On Thu, May 24, 2018 at 11:04:40AM +0800, Ming Lei wrote:
When the allocation process is scheduled back and the mapped hw queue has changed, fake one extra wake-up on the previous queue to compensate for the missed wake-up, so that other allocations on the previous queue won't be starved.
This patch fixes one request allocation hang issue, which can be triggered easily in case of very low nr_request.
Thanks, Ming, this looks better. One comment below.
Cc: <stable@vger.kernel.org>
Cc: Omar Sandoval <osandov@fb.com>
Signed-off-by: Ming Lei <ming.lei@redhat.com>
V3:
- fix comments as suggested by Jens
- remove the wrapper as suggested by Omar
V2:
- fix build failure
 block/blk-mq-tag.c      | 12 ++++++++++++
 include/linux/sbitmap.h |  7 +++++++
 lib/sbitmap.c           | 22 ++++++++++++----------
 3 files changed, 31 insertions(+), 10 deletions(-)
diff --git a/block/blk-mq-tag.c b/block/blk-mq-tag.c index 336dde07b230..a4e58fc28a06 100644 --- a/block/blk-mq-tag.c +++ b/block/blk-mq-tag.c @@ -134,6 +134,8 @@ unsigned int blk_mq_get_tag(struct blk_mq_alloc_data *data) ws = bt_wait_ptr(bt, data->hctx); drop_ctx = data->ctx == NULL; do {
struct sbitmap_queue *bt_prev;
- /*
- We're out of tags on this hardware queue, kick any
- pending IO submits before going to sleep waiting for
@@ -159,6 +161,7 @@ unsigned int blk_mq_get_tag(struct blk_mq_alloc_data *data) if (data->ctx) blk_mq_put_ctx(data->ctx);
+ bt_prev = bt;
io_schedule();
data->ctx = blk_mq_get_ctx(data->q); @@ -170,6 +173,15 @@ unsigned int blk_mq_get_tag(struct blk_mq_alloc_data *data) bt = &tags->bitmap_tags; finish_wait(&ws->wait, &wait);
/*
* If destination hw queue is changed, fake wake up on
* previous queue for compensating the wake up miss, so
* other allocations on previous queue won't be starved.
*/
if (bt != bt_prev)
sbitmap_queue_wake_up(bt_prev);
- ws = bt_wait_ptr(bt, data->hctx); } while (1);
diff --git a/include/linux/sbitmap.h b/include/linux/sbitmap.h index 841585f6e5f2..bba9d80191b7 100644 --- a/include/linux/sbitmap.h +++ b/include/linux/sbitmap.h @@ -484,6 +484,13 @@ static inline struct sbq_wait_state *sbq_wait_ptr(struct sbitmap_queue *sbq, void sbitmap_queue_wake_all(struct sbitmap_queue *sbq); /**
- sbitmap_queue_wake_up() - Wake up some of waiters in one waitqueue
- on a &struct sbitmap_queue.
- @sbq: Bitmap queue to wake up.
- */
+void sbitmap_queue_wake_up(struct sbitmap_queue *sbq);
+/**
- sbitmap_queue_show() - Dump &struct sbitmap_queue information to a &struct
- seq_file.
- @sbq: Bitmap queue to show.
diff --git a/lib/sbitmap.c b/lib/sbitmap.c index e6a9c06ec70c..14e027a33ffa 100644 --- a/lib/sbitmap.c +++ b/lib/sbitmap.c @@ -335,8 +335,9 @@ void sbitmap_queue_resize(struct sbitmap_queue *sbq, unsigned int depth) if (sbq->wake_batch != wake_batch) { WRITE_ONCE(sbq->wake_batch, wake_batch); /*
- * Pairs with the memory barrier in sbq_wake_up() to ensure that
- * the batch size is updated before the wait counts.
+ * Pairs with the memory barrier in sbitmap_queue_wake_up()
+ * to ensure that the batch size is updated before the wait
+ * counts.
  */
 smp_mb__before_atomic();
 for (i = 0; i < SBQ_WAIT_QUEUES; i++)
@@ -425,7 +426,7 @@ static struct sbq_wait_state *sbq_wake_ptr(struct sbitmap_queue *sbq) return NULL; } -static void sbq_wake_up(struct sbitmap_queue *sbq) +void sbitmap_queue_wake_up(struct sbitmap_queue *sbq) { struct sbq_wait_state *ws; unsigned int wake_batch;
Just after this is:
	/*
	 * Pairs with the memory barrier in set_current_state() to ensure the
	 * proper ordering of clear_bit()/waitqueue_active() in the waker and
	 * test_and_set_bit_lock()/prepare_to_wait()/finish_wait() in the
	 * waiter. See the comment on waitqueue_active(). This is __after_atomic
	 * because we just did clear_bit_unlock() in the caller.
	 */
	smp_mb__after_atomic();
But there's no atomic operation before this in blk_mq_get_tag(). I don't think this memory barrier is necessary for the blk_mq_get_tag() case, so let's move it to sbitmap_queue_clear():
diff --git a/lib/sbitmap.c b/lib/sbitmap.c
index 14e027a33ffa..537328a98c06 100644
--- a/lib/sbitmap.c
+++ b/lib/sbitmap.c
@@ -432,15 +432,6 @@ void sbitmap_queue_wake_up(struct sbitmap_queue *sbq)
 	unsigned int wake_batch;
 	int wait_cnt;
 
-	/*
-	 * Pairs with the memory barrier in set_current_state() to ensure the
-	 * proper ordering of clear_bit()/waitqueue_active() in the waker and
-	 * test_and_set_bit_lock()/prepare_to_wait()/finish_wait() in the
-	 * waiter. See the comment on waitqueue_active(). This is __after_atomic
-	 * because we just did clear_bit_unlock() in the caller.
-	 */
-	smp_mb__after_atomic();
-
 	ws = sbq_wake_ptr(sbq);
 	if (!ws)
 		return;
@@ -472,6 +463,13 @@ void sbitmap_queue_clear(struct sbitmap_queue *sbq, unsigned int nr,
 			 unsigned int cpu)
 {
 	sbitmap_clear_bit_unlock(&sbq->sb, nr);
+	/*
+	 * Pairs with the memory barrier in set_current_state() to ensure the
+	 * proper ordering of clear_bit_unlock()/waitqueue_active() in the waker
+	 * and test_and_set_bit_lock()/prepare_to_wait()/finish_wait() in the
+	 * waiter. See the comment on waitqueue_active().
+	 */
+	smp_mb__after_atomic();
 	sbitmap_queue_wake_up(sbq);
 	if (likely(!sbq->round_robin && nr < sbq->sb.depth))
 		*per_cpu_ptr(sbq->alloc_hint, cpu) = nr;
That comment also mentioned clear_bit() but we do clear_bit_unlock() now, so we can update that at the same time.
@@ -454,23 +455,24 @@ static void sbq_wake_up(struct sbitmap_queue *sbq) */ smp_mb__before_atomic(); /*
- * If there are concurrent callers to sbq_wake_up(), the last
- * one to decrement the wait count below zero will bump it back
- * up. If there is a concurrent resize, the count reset will
- * either cause the cmpxchg to fail or overwrite after the
- * cmpxchg.
+ * If there are concurrent callers to sbitmap_queue_wake_up(),
+ * the last one to decrement the wait count below zero will
+ * bump it back up. If there is a concurrent resize, the count
+ * reset will either cause the cmpxchg to fail or overwrite
+ * after the cmpxchg.
  */
 atomic_cmpxchg(&ws->wait_cnt, wait_cnt, wait_cnt + wake_batch);
 sbq_index_atomic_inc(&sbq->wake_index);
 wake_up_nr(&ws->wait, wake_batch);
 }
} +EXPORT_SYMBOL_GPL(sbitmap_queue_wake_up); void sbitmap_queue_clear(struct sbitmap_queue *sbq, unsigned int nr, unsigned int cpu) { sbitmap_clear_bit_unlock(&sbq->sb, nr);
- sbq_wake_up(sbq);
+ sbitmap_queue_wake_up(sbq);
 if (likely(!sbq->round_robin && nr < sbq->sb.depth))
 *per_cpu_ptr(sbq->alloc_hint, cpu) = nr;
} @@ -482,7 +484,7 @@ void sbitmap_queue_wake_all(struct sbitmap_queue *sbq) /* * Pairs with the memory barrier in set_current_state() like in
- * sbq_wake_up().
+ * sbitmap_queue_wake_up().
  */
 smp_mb();
 wake_index = atomic_read(&sbq->wake_index);
-- 2.9.5