Add a regression test for a kernel bug where IORING_CQE_F_BUF_MORE was silently dropped during bundle recv retries with provided buffer rings + incremental recv + bundle recv. The kernel's io_recv_finish() merges CQE flags across retry iterations using: cflags = req->cqe.flags | (cflags & CQE_F_MASK); CQE_F_MASK did not include IORING_CQE_F_BUF_MORE, so the flag was lost on the final retry iteration when a buffer entry was partially consumed. This caused userspace to incorrectly advance its buffer ring head past an entry the kernel still considers in use. The test uses non-aligned send chunks (3000 bytes vs 4096-byte buffer entries) and sqe->len = 0 to avoid MSHOT_CAP blocking the retry path, then verifies that BUF_MORE is set on every CQE where the cumulative consumption is not a multiple of the entry size. The retry path triggering isn't 100% deterministic so we tried forcing that by prefilling the socket with data at start so that when we read, we are more likely to receive data and thus trigger a retry. Signed-off-by: Clément Léger Assisted-by: Claude:claude-opus-4.6 --- test/Makefile | 1 + test/recv-bundle-inc-buf-more.c | 395 ++++++++++++++++++++++++++++++++ 2 files changed, 396 insertions(+) create mode 100644 test/recv-bundle-inc-buf-more.c diff --git a/test/Makefile b/test/Makefile index d1cd2470..e11ebff7 100644 --- a/test/Makefile +++ b/test/Makefile @@ -216,6 +216,7 @@ test_srcs := \ read-mshot-empty.c \ read-mshot-stdin.c \ read-write.c \ + recv-bundle-inc-buf-more.c \ recv-bundle-short-ooo.c \ recv-inc-ooo.c \ recv-msgall.c \ diff --git a/test/recv-bundle-inc-buf-more.c b/test/recv-bundle-inc-buf-more.c new file mode 100644 index 00000000..fc7bac55 --- /dev/null +++ b/test/recv-bundle-inc-buf-more.c @@ -0,0 +1,395 @@ +/* SPDX-License-Identifier: MIT */ +/* + * Test that IORING_CQE_F_BUF_MORE survives bundle recv retries with + * incremental provided buffer rings. + * + * Bug: io_recv_finish() merges CQE flags across bundle retry iterations + * using: + * + * cflags = req->cqe.flags | (cflags & CQE_F_MASK); + * + * CQE_F_MASK did not include IORING_CQE_F_BUF_MORE, so the flag was + * silently dropped on the final retry iteration when a buffer entry was + * partially consumed. Userspace would then wrongfully advance its + * buffer ring head past an entry the kernel still considers in use. + * + * To trigger the bundle retry path reliably: + * + * 1. Use sqe->len = 0 so that mshot_len = 0 and IORING_RECV_MSHOT_CAP + * is never set (it would prevent retries via IORING_RECV_NO_RETRY). + * + * 2. Use multiple small buffer entries in an incremental buffer ring. + * On multishot continuation calls, msg_inq from the previous recv + * limits max_len, which provides fewer entries than available, so + * REQ_F_BL_EMPTY is not set. + * + * 3. Use a sender thread that fills the socket and continues sending + * so that after a recv that consumes the limited buffer, msg_inq > 1 + * (more data arrived), triggering the bundle retry. + * + * When the retry's first iteration fully consumes one or more entries + * and the second iteration partially consumes the next, BUF_MORE must + * be set. On the buggy kernel, the merge drops it. + * + * Fixed by adding IORING_CQE_F_BUF_MORE to CQE_F_MASK. + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "liburing.h" +#include "helpers.h" + +/* + * Multiple small entries: the kernel will provide a subset when + * max_len < total capacity, leaving entries for the retry. + */ +#define NR_BUFS 256 +#define BUF_ENTRY_SIZE 4096 +#define TOTAL_BUF_SIZE (NR_BUFS * BUF_ENTRY_SIZE) /* 1 MB */ +#define BUF_BGID 1 +#define BUF_MASK (NR_BUFS - 1) + +/* + * Total data to send per iteration. Must be less than TOTAL_BUF_SIZE + * so the buffer ring is never fully consumed in a single iteration. + */ +#define TOTAL_SEND (512 * 1024) + +/* + * The sender pre-fills the socket with this much data before the + * receiver arms the multishot recv. This ensures msg_inq is non-zero + * from the start and that data keeps arriving during recv processing, + * making the bundle retry path much more likely to fire. + */ +#define PREFILL_SEND (TOTAL_SEND / 2) + +/* Number of iterations to increase the chance of hitting the retry path */ +#define NR_ITERATIONS 20 + +/* + * Intentionally NOT a multiple of BUF_ENTRY_SIZE so that the data + * in the TCP receive buffer is not aligned to entry boundaries. + * This ensures msg_inq values that cause partial-entry consumption. + */ +#define SEND_CHUNK 3000 + +static int no_bundle_inc; + +struct send_data { + int fd; + int total; + int prefill; + pthread_barrier_t barrier; +}; + +static int send_bytes(int fd, unsigned char *data, int bytes) +{ + int sent = 0; + + while (sent < bytes) { + int chunk = bytes - sent; + int ret; + + if (chunk > SEND_CHUNK) + chunk = SEND_CHUNK; + + ret = send(fd, data, chunk, 0); + if (ret < 0) { + if (errno == EPIPE) + return sent; + perror("send"); + return -1; + } + sent += ret; + } + + return sent; +} + +static void *sender_fn(void *arg) +{ + struct send_data *sd = arg; + unsigned char *data; + + data = malloc(SEND_CHUNK); + if (!data) + return (void *)(intptr_t)1; + + /* + * Pre-fill the socket with data before the receiver arms the + * multishot recv. This ensures the recv sees data immediately + * and that continuations have data flowing in, making the + * bundle retry path fire more reliably. + */ + if (send_bytes(sd->fd, data, sd->prefill) < 0) { + free(data); + return (void *)(intptr_t)1; + } + + /* Signal receiver that data is ready */ + pthread_barrier_wait(&sd->barrier); + + /* Continue sending the rest */ + if (send_bytes(sd->fd, data, sd->total - sd->prefill) < 0) { + free(data); + return (void *)(intptr_t)1; + } + + /* Signal EOF */ + shutdown(sd->fd, SHUT_WR); + free(data); + return NULL; +} + +static int arm_recv(struct io_uring *ring, int fd) +{ + struct io_uring_sqe *sqe; + int ret; + + sqe = io_uring_get_sqe(ring); + /* + * sqe->len = 0: critical for avoiding MSHOT_CAP which would + * block the retry path via IORING_RECV_NO_RETRY. + */ + io_uring_prep_recv_multishot(sqe, fd, NULL, 0, 0); + sqe->ioprio |= IORING_RECVSEND_BUNDLE; + sqe->buf_group = BUF_BGID; + sqe->flags |= IOSQE_BUFFER_SELECT; + sqe->user_data = 1; + + ret = io_uring_submit(ring); + if (ret != 1) { + fprintf(stderr, "submit: %d\n", ret); + return 1; + } + return 0; +} + +static int test(void) +{ + struct io_uring_buf_ring *br; + struct io_uring_params p = { }; + struct io_uring ring; + struct send_data sd; + pthread_t sender; + unsigned char *buf; + int fds[2]; + int ret, val; + int recv_bytes = 0; + int buf_more_missing = 0; + int partial_cqes = 0; + void *tret; + + p.cq_entries = 4096; + p.flags = IORING_SETUP_CQSIZE; + ret = io_uring_queue_init_params(16, &ring, &p); + if (ret) { + fprintf(stderr, "ring init: %d\n", ret); + return T_EXIT_FAIL; + } + + if (!(p.features & IORING_FEAT_RECVSEND_BUNDLE)) { + io_uring_queue_exit(&ring); + return T_EXIT_SKIP; + } + + if (posix_memalign((void **)&buf, 4096, TOTAL_BUF_SIZE)) { + io_uring_queue_exit(&ring); + return T_EXIT_FAIL; + } + memset(buf, 0, TOTAL_BUF_SIZE); + + br = io_uring_setup_buf_ring(&ring, NR_BUFS, BUF_BGID, + IOU_PBUF_RING_INC, &ret); + if (!br) { + free(buf); + io_uring_queue_exit(&ring); + if (ret == -EINVAL) { + no_bundle_inc = 1; + return T_EXIT_SKIP; + } + fprintf(stderr, "buf ring setup: %d\n", ret); + return T_EXIT_FAIL; + } + + for (int i = 0; i < NR_BUFS; i++) { + io_uring_buf_ring_add(br, buf + i * BUF_ENTRY_SIZE, + BUF_ENTRY_SIZE, i, BUF_MASK, i); + } + io_uring_buf_ring_advance(br, NR_BUFS); + + ret = t_create_socket_pair(fds, true); + if (ret) { + fprintf(stderr, "socket pair: %d\n", ret); + goto fail; + } + + val = 1; + setsockopt(fds[1], IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)); + + /* Start sender thread */ + sd.fd = fds[1]; + sd.total = TOTAL_SEND; + sd.prefill = PREFILL_SEND; + pthread_barrier_init(&sd.barrier, NULL, 2); + + ret = pthread_create(&sender, NULL, sender_fn, &sd); + if (ret) { + fprintf(stderr, "pthread_create: %d\n", ret); + goto fail; + } + + /* Wait for sender to pre-fill the socket, then arm recv */ + pthread_barrier_wait(&sd.barrier); + if (arm_recv(&ring, fds[0])) + goto join_fail; + + /* Collect completions */ + while (recv_bytes < TOTAL_SEND) { + struct io_uring_cqe *cqe; + struct __kernel_timespec ts = { .tv_sec = 10, }; + + ret = io_uring_wait_cqe_timeout(&ring, &cqe, &ts); + if (ret) { + fprintf(stderr, "wait: %d (recv'd %d/%d)\n", + ret, recv_bytes, TOTAL_SEND); + goto join_fail; + } + + if (cqe->res == -ENOBUFS) { + io_uring_cqe_seen(&ring, cqe); + continue; + } + if (cqe->res == -EINVAL) { + io_uring_cqe_seen(&ring, cqe); + no_bundle_inc = 1; + pthread_join(sender, NULL); + close(fds[0]); + close(fds[1]); + free(buf); + io_uring_queue_exit(&ring); + return T_EXIT_SKIP; + } + if (cqe->res <= 0) { + if (cqe->res == 0 && recv_bytes >= TOTAL_SEND) { + io_uring_cqe_seen(&ring, cqe); + break; + } + fprintf(stderr, "recv error: res=%d recv=%d\n", + cqe->res, recv_bytes); + io_uring_cqe_seen(&ring, cqe); + goto join_fail; + } + + if (!(cqe->flags & IORING_CQE_F_BUFFER)) { + fprintf(stderr, "IORING_CQE_F_BUFFER not set\n"); + io_uring_cqe_seen(&ring, cqe); + goto join_fail; + } + + /* + * If the total bytes consumed so far is not a multiple of + * BUF_ENTRY_SIZE, the last buffer entry was only partially + * used and BUF_MORE must be set. We check the cumulative + * total (not just cqe->res) because a previous CQE may + * have partially consumed an entry that this CQE finishes. + * + * On a buggy kernel where the bundle retry drops BUF_MORE + * from the merge, this check catches the regression. + * + * Note: only check when the buffer ring hasn't been + * fully consumed (recv_bytes + cqe->res < TOTAL_BUF_SIZE). + */ + if (((recv_bytes + cqe->res) % BUF_ENTRY_SIZE) != 0 && + (recv_bytes + cqe->res) < TOTAL_BUF_SIZE) { + partial_cqes++; + if (!(cqe->flags & IORING_CQE_F_BUF_MORE)) { + fprintf(stderr, + "FAIL: BUF_MORE not set after partial " + "entry consumption!\n" + " cqe->res=%d flags=0x%x " + "recv_bytes=%d\n", + cqe->res, cqe->flags, recv_bytes); + buf_more_missing = 1; + } + } + + recv_bytes += cqe->res; + + if (!(cqe->flags & IORING_CQE_F_MORE) && + recv_bytes < TOTAL_SEND) { + io_uring_cqe_seen(&ring, cqe); + if (arm_recv(&ring, fds[0])) + goto join_fail; + continue; + } + + io_uring_cqe_seen(&ring, cqe); + } + + pthread_join(sender, &tret); + + if (tret) { + fprintf(stderr, "sender thread failed\n"); + goto fail; + } + + if (buf_more_missing) + goto fail; + + /* + * If no partial-entry CQEs were seen, the retry path was + * likely not exercised. Return -1 so the caller can retry. + */ + if (!partial_cqes) + ret = -1; + else + ret = T_EXIT_PASS; + + goto out; + +join_fail: + pthread_join(sender, NULL); +fail: + ret = T_EXIT_FAIL; +out: + close(fds[0]); + close(fds[1]); + free(buf); + io_uring_queue_exit(&ring); + return ret; +} + +int main(int argc, char *argv[]) +{ + int i, ret; + + if (argc > 1) + return T_EXIT_SKIP; + + /* + * The retry path depends on TCP timing and is not fully deterministic. + * Loop until we get a definitive result: PASS means partial-entry + * CQEs were seen and BUF_MORE was correctly set on all of them. + * FAIL/SKIP stop immediately. -1 means inconclusive (no partial + * CQEs seen), so retry. If all iterations are inconclusive, fail. + */ + for (i = 0; i < NR_ITERATIONS; i++) { + ret = test(); + if (ret == T_EXIT_PASS || ret == T_EXIT_FAIL || + ret == T_EXIT_SKIP) + return ret; + } + + fprintf(stderr, "no partial-entry CQEs seen after %d iterations\n", + NR_ITERATIONS); + return T_EXIT_FAIL; +} -- 2.53.0-Meta