From: Pavel Begunkov asml.silence@gmail.com
[ Upstream commit aa1df3a360a0c50e0f0086a785d75c2785c29967 ]
Overflowing CQEs may result in reordering, which is buggy in case of links, F_MORE and so on. If we guarantee that we don't reorder for the unlikely event of a CQ ring overflow, then we can further extend this to not have to terminate multishot requests if it happens. For other operations, like zerocopy sends, we have no choice but to honor CQE ordering.
Reported-by: Dylan Yudaken dylany@fb.com Signed-off-by: Pavel Begunkov asml.silence@gmail.com Link: https://lore.kernel.org/r/ec3bc55687b0768bbe20fb62d7d06cfced7d7e70.166389203... Signed-off-by: Jens Axboe axboe@kernel.dk Signed-off-by: Sasha Levin sashal@kernel.org --- io_uring/io_uring.c | 12 ++++++++++-- io_uring/io_uring.h | 12 +++++++++--- 2 files changed, 19 insertions(+), 5 deletions(-)
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c index 242d896c00f3..13af6b56ebd2 100644 --- a/io_uring/io_uring.c +++ b/io_uring/io_uring.c @@ -567,7 +567,7 @@ static bool __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
io_cq_lock(ctx); while (!list_empty(&ctx->cq_overflow_list)) { - struct io_uring_cqe *cqe = io_get_cqe(ctx); + struct io_uring_cqe *cqe = io_get_cqe_overflow(ctx, true); struct io_overflow_cqe *ocqe;
if (!cqe && !force) @@ -694,12 +694,19 @@ bool io_req_cqe_overflow(struct io_kiocb *req) * control dependency is enough as we're using WRITE_ONCE to * fill the cq entry */ -struct io_uring_cqe *__io_get_cqe(struct io_ring_ctx *ctx) +struct io_uring_cqe *__io_get_cqe(struct io_ring_ctx *ctx, bool overflow) { struct io_rings *rings = ctx->rings; unsigned int off = ctx->cached_cq_tail & (ctx->cq_entries - 1); unsigned int free, queued, len;
+ /* + * Posting into the CQ when there are pending overflowed CQEs may break + * ordering guarantees, which will affect links, F_MORE users and more. + * Force overflow the completion. + */ + if (!overflow && (ctx->check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT))) + return NULL;
/* userspace may cheat modifying the tail, be safe and do min */ queued = min(__io_cqring_events(ctx), ctx->cq_entries); @@ -2228,6 +2235,7 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
do { io_cqring_overflow_flush(ctx); + if (io_cqring_events(ctx) >= min_events) return 0; if (!io_run_task_work()) diff --git a/io_uring/io_uring.h b/io_uring/io_uring.h index 2f73f83af960..45809ae6f64e 100644 --- a/io_uring/io_uring.h +++ b/io_uring/io_uring.h @@ -24,7 +24,7 @@ enum { IOU_STOP_MULTISHOT = -ECANCELED, };
-struct io_uring_cqe *__io_get_cqe(struct io_ring_ctx *ctx); +struct io_uring_cqe *__io_get_cqe(struct io_ring_ctx *ctx, bool overflow); bool io_req_cqe_overflow(struct io_kiocb *req); int io_run_task_work_sig(void); void io_req_complete_failed(struct io_kiocb *req, s32 res); @@ -91,7 +91,8 @@ static inline void io_cq_lock(struct io_ring_ctx *ctx)
void io_cq_unlock_post(struct io_ring_ctx *ctx);
-static inline struct io_uring_cqe *io_get_cqe(struct io_ring_ctx *ctx) +static inline struct io_uring_cqe *io_get_cqe_overflow(struct io_ring_ctx *ctx, + bool overflow) { if (likely(ctx->cqe_cached < ctx->cqe_sentinel)) { struct io_uring_cqe *cqe = ctx->cqe_cached; @@ -103,7 +104,12 @@ static inline struct io_uring_cqe *io_get_cqe(struct io_ring_ctx *ctx) return cqe; }
- return __io_get_cqe(ctx); + return __io_get_cqe(ctx, overflow); +} + +static inline struct io_uring_cqe *io_get_cqe(struct io_ring_ctx *ctx) +{ + return io_get_cqe_overflow(ctx, false); }
static inline bool __io_fill_cqe_req(struct io_ring_ctx *ctx,