Commit 2300e624 authored by Alessandro Rubini's avatar Alessandro Rubini

buffers: properly handle retr_block called by push

buffer->store_block calls trigger->push_block when it receives the
first block. The trigger user fires directly , and if the device
accepts the data immediately, our data_done() will retr_block from the
buffer.  This happens with zio-zero, cset 1, and leads to a deadlock.

To allow immediate success of push_block down to device level, release
the lock before calling push_block. The code here also handles
pathologic situations like two processes storing at the same time.
Signed-off-by: Alessandro Rubini's avatarAlessandro Rubini <rubini@gnudd.com>
Acked-by: 's avatarFederico Vaga <federico.vaga@gmail.com>
parent 20817fbf
......@@ -102,15 +102,26 @@ static void zbk_free_block(struct zio_bi *bi, struct zio_block *block)
}
/* When write() stores the first block, we try pushing it */
static inline int __try_push(struct zio_ti *ti, struct zio_channel *chan,
static inline int __try_push(struct zio_bi *bi, struct zio_channel *chan,
struct zio_block *block)
{
struct zio_ti *ti = chan->cset->ti;
int pushed;
/* chek if trigger is disabled */
if (unlikely((ti->flags & ZIO_STATUS) == ZIO_DISABLED))
return 0;
if (ti->t_op->push_block(ti, chan, block) < 0)
return 0;
return 1;
/*
* If push succeeds and the device eats data immediately,
* the trigger may call retr_block right now. So
* release the lock but also say we can't retrieve now.
*/
bi->flags |= ZBI_PUSHING;
spin_unlock(&bi->lock);
pushed = (ti->t_op->push_block(ti, chan, block) == 0);
spin_lock(&bi->lock);
bi->flags &= ~ZBI_PUSHING;
return pushed;
}
/* Store is called by the trigger (for input) or by f->write (for output) */
......@@ -135,20 +146,20 @@ static int zbk_store_block(struct zio_bi *bi, struct zio_block *block)
spin_lock(&bi->lock);
if (zbki->nitem == bi->zattr_set.std_zattr[ZATTR_ZBUF_MAXLEN].value)
goto out_unlock;
list_add_tail(&item->list, &zbki->list);
if (!zbki->nitem) {
if (unlikely(output))
pushed = __try_push(chan->cset->ti, chan, block);
pushed = __try_push(bi, chan, block);
else
awake = 1;
}
if (likely(!pushed)) {
zbki->nitem++;
list_add_tail(&item->list, &zbki->list);
}
if (pushed)
list_del(&item->list);
zbki->nitem += !pushed;
spin_unlock(&bi->lock);
/* if input, awake user space */
if (awake && ((bi->flags & ZIO_DIR) == ZIO_DIR_INPUT))
/* if first input, awake user space */
if (awake)
wake_up_interruptible(&bi->q);
return 0;
......@@ -169,7 +180,7 @@ static struct zio_block *zbk_retr_block(struct zio_bi *bi)
zbki = to_zbki(bi);
spin_lock(&bi->lock);
if (list_empty(&zbki->list))
if (!zbki->nitem || bi->flags & ZBI_PUSHING)
goto out_unlock;
first = zbki->list.next;
item = list_entry(first, struct zbk_item, list);
......
......@@ -162,15 +162,26 @@ static void zbk_free_block(struct zio_bi *bi, struct zio_block *block)
}
/* When write() stores the first block, we try pushing it */
static inline int __try_push(struct zio_ti *ti, struct zio_channel *chan,
static inline int __try_push(struct zio_bi *bi, struct zio_channel *chan,
struct zio_block *block)
{
struct zio_ti *ti = chan->cset->ti;
int pushed;
/* chek if trigger is disabled */
if (unlikely((ti->flags & ZIO_STATUS) == ZIO_DISABLED))
return 0;
if (ti->t_op->push_block(ti, chan, block) < 0)
return 0;
return 1;
/*
* If push succeeds and the device eats data immediately,
* the trigger may call retr_block right now. So
* release the lock but also say we can't retrieve now.
*/
bi->flags |= ZBI_PUSHING;
spin_unlock(&bi->lock);
pushed = (ti->t_op->push_block(ti, chan, block) == 0);
spin_lock(&bi->lock);
bi->flags &= ~ZBI_PUSHING;
return pushed;
}
/* Store is called by the trigger (for input) or by f->write (for output) */
......@@ -179,7 +190,7 @@ static int zbk_store_block(struct zio_bi *bi, struct zio_block *block)
struct zbk_instance *zbki = to_zbki(bi);
struct zio_channel *chan = bi->chan;
struct zbk_item *item;
int awake = 0, pushed = 0, output;
int awake = 0, pushed = 0, output, first;
pr_debug("%s:%d (%p, %p)\n", __func__, __LINE__, bi, block);
......@@ -193,20 +204,21 @@ static int zbk_store_block(struct zio_bi *bi, struct zio_block *block)
/* add to the buffer instance or push to the trigger */
spin_lock(&bi->lock);
if (list_empty(&zbki->list)) {
first = list_empty(&zbki->list);
list_add_tail(&item->list, &zbki->list);
if (first) {
if (unlikely(output))
pushed = __try_push(chan->cset->ti, chan, block);
pushed = __try_push(bi, chan, block);
else
awake = 1;
}
if (likely(!pushed))
list_add_tail(&item->list, &zbki->list);
if (pushed)
list_del(&item->list);
spin_unlock(&bi->lock);
/* if input, awake user space */
if (awake && ((bi->flags & ZIO_DIR) == ZIO_DIR_INPUT))
/* if first input, awake user space */
if (awake)
wake_up_interruptible(&bi->q);
return 0;
}
......@@ -222,7 +234,7 @@ static struct zio_block *zbk_retr_block(struct zio_bi *bi)
zbki = to_zbki(bi);
spin_lock(&bi->lock);
if (list_empty(&zbki->list))
if (list_empty(&zbki->list) || bi->flags & ZBI_PUSHING)
goto out_unlock;
first = zbki->list.next;
item = list_entry(first, struct zbk_item, list);
......
......@@ -80,6 +80,12 @@ struct zio_bi {
};
#define to_zio_bi(obj) container_of(obj, struct zio_bi, head.dev)
/* first 4bit are reserved for zio object universal flags */
enum zbi_flag_mask {
ZBI_PUSHING = 0x10, /* a push is being performed */
};
/* The block is the basic data item being transferred */
struct zio_block {
unsigned long ctrl_flags;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment