On 24. 03. 24, 23:23, Sasha Levin wrote:
From: Jens Axboe <axboe@kernel.dk>
[ Upstream commit 9fe3eaea4a3530ca34a8d8ff00b1848c528789ca ]
If we have a ton of notifications coming in, we can be looping in here for a long time. This can be problematic for various reasons, mostly because we can starve userspace. If the application is waiting on N events, then only re-run if we need more events.
This commit breaks test/recv-multishot.c from liburing. The test fails with:

  early error: res 4
  test stream=1 wait_each=0 recvmsg=0 early_error=0 defer=1 failed
The behaviour is the same in 6.9-rc2 (which contains the commit too).
Reverting the commit on top of 6.8.2 makes the test pass again.
Should the test be updated or is the commit wrong?
Thanks.
Fixes: c0e0d6ba25f1 ("io_uring: add IORING_SETUP_DEFER_TASKRUN")
Signed-off-by: Jens Axboe <axboe@kernel.dk>
Signed-off-by: Sasha Levin <sashal@kernel.org>
 io_uring/io_uring.c | 44 +++++++++++++++++++++++++++++---------------
 1 file changed, 29 insertions(+), 15 deletions(-)
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index 5233a20d01b54..39dfb83dc9fc4 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -1389,7 +1389,20 @@ static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
 	}
 }
 
-static int __io_run_local_work(struct io_ring_ctx *ctx, struct io_tw_state *ts)
+static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
+				       int min_events)
+{
+	if (llist_empty(&ctx->work_llist))
+		return false;
+	if (events < min_events)
+		return true;
+	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
+		atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
+	return false;
+}
+
+static int __io_run_local_work(struct io_ring_ctx *ctx, struct io_tw_state *ts,
+			       int min_events)
 {
 	struct llist_node *node;
 	unsigned int loops = 0;
@@ -1418,18 +1431,20 @@ static int __io_run_local_work(struct io_ring_ctx *ctx, struct io_tw_state *ts)
 	}
 	loops++;
 
-	if (!llist_empty(&ctx->work_llist))
+	if (io_run_local_work_continue(ctx, ret, min_events))
 		goto again;
 	if (ts->locked) {
 		io_submit_flush_completions(ctx);
-		if (!llist_empty(&ctx->work_llist))
+		if (io_run_local_work_continue(ctx, ret, min_events))
 			goto again;
 	}
+
 	trace_io_uring_local_work_run(ctx, ret, loops);
 	return ret;
 }
 
-static inline int io_run_local_work_locked(struct io_ring_ctx *ctx)
+static inline int io_run_local_work_locked(struct io_ring_ctx *ctx,
+					   int min_events)
 {
 	struct io_tw_state ts = { .locked = true, };
 	int ret;
@@ -1437,20 +1452,20 @@ static inline int io_run_local_work_locked(struct io_ring_ctx *ctx)
 	if (llist_empty(&ctx->work_llist))
 		return 0;
 
-	ret = __io_run_local_work(ctx, &ts);
+	ret = __io_run_local_work(ctx, &ts, min_events);
 	/* shouldn't happen! */
 	if (WARN_ON_ONCE(!ts.locked))
 		mutex_lock(&ctx->uring_lock);
 	return ret;
 }
 
-static int io_run_local_work(struct io_ring_ctx *ctx)
+static int io_run_local_work(struct io_ring_ctx *ctx, int min_events)
 {
 	struct io_tw_state ts = {};
 	int ret;
 
 	ts.locked = mutex_trylock(&ctx->uring_lock);
-	ret = __io_run_local_work(ctx, &ts);
+	ret = __io_run_local_work(ctx, &ts, min_events);
 	if (ts.locked)
 		mutex_unlock(&ctx->uring_lock);
 
@@ -1646,7 +1661,7 @@ static int io_iopoll_check(struct io_ring_ctx *ctx, long min)
 		    io_task_work_pending(ctx)) {
 			u32 tail = ctx->cached_cq_tail;
 
-			(void) io_run_local_work_locked(ctx);
+			(void) io_run_local_work_locked(ctx, min);
 
 			if (task_work_pending(current) ||
 			    wq_list_empty(&ctx->iopoll_list)) {
@@ -2489,7 +2504,7 @@ int io_run_task_work_sig(struct io_ring_ctx *ctx)
 {
 	if (!llist_empty(&ctx->work_llist)) {
 		__set_current_state(TASK_RUNNING);
-		if (io_run_local_work(ctx) > 0)
+		if (io_run_local_work(ctx, INT_MAX) > 0)
 			return 0;
 	}
 	if (io_run_task_work() > 0)
@@ -2557,7 +2572,7 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
 	if (!io_allowed_run_tw(ctx))
 		return -EEXIST;
 	if (!llist_empty(&ctx->work_llist))
-		io_run_local_work(ctx);
+		io_run_local_work(ctx, min_events);
 	io_run_task_work();
 	io_cqring_overflow_flush(ctx);
 	/* if user messes with these they will just get an early return */
@@ -2595,11 +2610,10 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
 
 	trace_io_uring_cqring_wait(ctx, min_events);
 	do {
+		int nr_wait = (int) iowq.cq_tail - READ_ONCE(ctx->rings->cq.tail);
 		unsigned long check_cq;
 
 		if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
-			int nr_wait = (int) iowq.cq_tail - READ_ONCE(ctx->rings->cq.tail);
-
 			atomic_set(&ctx->cq_wait_nr, nr_wait);
 			set_current_state(TASK_INTERRUPTIBLE);
 		} else {
@@ -2618,7 +2632,7 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
 		 */
 		io_run_task_work();
 		if (!llist_empty(&ctx->work_llist))
-			io_run_local_work(ctx);
+			io_run_local_work(ctx, nr_wait);
 
 		/*
 		 * Non-local task_work will be run on exit to userspace, but
@@ -3273,7 +3287,7 @@ static __cold bool io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
 
 	if ((ctx->flags & IORING_SETUP_DEFER_TASKRUN) &&
 	    io_allowed_defer_tw_run(ctx))
-		ret |= io_run_local_work(ctx) > 0;
+		ret |= io_run_local_work(ctx, INT_MAX) > 0;
 	ret |= io_cancel_defer_files(ctx, task, cancel_all);
 	mutex_lock(&ctx->uring_lock);
 	ret |= io_poll_remove_all(ctx, task, cancel_all);
@@ -3635,7 +3649,7 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
 			 * it should handle ownership problems if any.
 			 */
 			if (ctx->flags & IORING_SETUP_DEFER_TASKRUN)
-				(void)io_run_local_work_locked(ctx);
+				(void)io_run_local_work_locked(ctx, min_complete);
 		}
 		mutex_unlock(&ctx->uring_lock);
 	}