From: Ming Lei So far IORING_RECVSEND_FIXED_BUF is only honoured on the SEND_ZC path: part of the import wiring is already present for plain send, but the flag is not accepted there, and for recv the wiring is absent entirely. Targets such as ublk's NBD backend want to push/pull I/O data directly to/from an io_uring registered buffer over a plain send/recv on a TCP socket. Wire IORING_RECVSEND_FIXED_BUF into the plain IORING_OP_SEND and IORING_OP_RECV paths: - Accept the flag in SENDMSG_FLAGS / RECVMSG_FLAGS and, at prep time, restrict it to the non-vectorized IORING_OP_SEND / IORING_OP_RECV opcodes. The flag is mutually exclusive with buffer select, bundles and (for recv) multishot; when it is set, sqe->buf_index is recorded in req->buf_index. - For recv, set REQ_F_IMPORT_BUFFER in setup so the registered buffer is imported lazily at issue time, mirroring the send path. - In io_send()/io_recv(), import the registered buffer via io_import_reg_buf() (ITER_SOURCE for send, ITER_DEST for recv) and clear REQ_F_IMPORT_BUFFER. The resulting bvec iter persists in async_data, so MSG_WAITALL partial send/recv retries resume at the right offset. 
Signed-off-by: Ming Lei --- io_uring/net.c | 47 +++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 45 insertions(+), 2 deletions(-) diff --git a/io_uring/net.c b/io_uring/net.c index f01f1d25e930..c2bbf9dd2790 100644 --- a/io_uring/net.c +++ b/io_uring/net.c @@ -418,7 +418,8 @@ static int io_sendmsg_setup(struct io_kiocb *req, const struct io_uring_sqe *sqe return io_net_import_vec(req, kmsg, msg.msg_iov, msg.msg_iovlen, ITER_SOURCE); } -#define SENDMSG_FLAGS (IORING_RECVSEND_POLL_FIRST | IORING_RECVSEND_BUNDLE | IORING_SEND_VECTORIZED) +#define SENDMSG_FLAGS (IORING_RECVSEND_POLL_FIRST | IORING_RECVSEND_BUNDLE | \ + IORING_SEND_VECTORIZED | IORING_RECVSEND_FIXED_BUF) int io_sendmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) { @@ -431,6 +432,15 @@ int io_sendmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) sr->flags = READ_ONCE(sqe->ioprio); if (sr->flags & ~SENDMSG_FLAGS) return -EINVAL; + if (sr->flags & IORING_RECVSEND_FIXED_BUF) { + /* registered buffer send only supported for plain IORING_OP_SEND */ + if (req->opcode != IORING_OP_SEND || + (sr->flags & IORING_RECVSEND_BUNDLE) || + (sr->flags & IORING_SEND_VECTORIZED) || + (req->flags & REQ_F_BUFFER_SELECT)) + return -EINVAL; + req->buf_index = READ_ONCE(sqe->buf_index); + } sr->msg_flags = READ_ONCE(sqe->msg_flags) | MSG_NOSIGNAL; if (sr->msg_flags & MSG_DONTWAIT) req->flags |= REQ_F_NOWAIT; @@ -662,6 +672,15 @@ int io_send(struct io_kiocb *req, unsigned int issue_flags) (sr->flags & IORING_RECVSEND_POLL_FIRST)) return -EAGAIN; + if (req->flags & REQ_F_IMPORT_BUFFER) { + ret = io_import_reg_buf(req, &kmsg->msg.msg_iter, + (u64)(uintptr_t)sr->buf, sr->len, + ITER_SOURCE, issue_flags); + if (unlikely(ret)) + return ret; + req->flags &= ~REQ_F_IMPORT_BUFFER; + } + flags = sr->msg_flags; if (issue_flags & IO_URING_F_NONBLOCK) flags |= MSG_DONTWAIT; @@ -777,6 +796,10 @@ static int io_recvmsg_prep_setup(struct io_kiocb *req) if (req->flags & REQ_F_BUFFER_SELECT) return 0; + 
if (sr->flags & IORING_RECVSEND_FIXED_BUF) { + req->flags |= REQ_F_IMPORT_BUFFER; + return 0; + } return import_ubuf(ITER_DEST, sr->buf, sr->len, &kmsg->msg.msg_iter); } @@ -785,7 +808,7 @@ static int io_recvmsg_prep_setup(struct io_kiocb *req) } #define RECVMSG_FLAGS (IORING_RECVSEND_POLL_FIRST | IORING_RECV_MULTISHOT | \ - IORING_RECVSEND_BUNDLE) + IORING_RECVSEND_BUNDLE | IORING_RECVSEND_FIXED_BUF) int io_recvmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) { @@ -803,6 +826,14 @@ int io_recvmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) sr->flags = READ_ONCE(sqe->ioprio); if (sr->flags & ~RECVMSG_FLAGS) return -EINVAL; + if (sr->flags & IORING_RECVSEND_FIXED_BUF) { + /* registered buffer recv only for plain IORING_OP_RECV */ + if (req->opcode != IORING_OP_RECV || + (sr->flags & (IORING_RECV_MULTISHOT | IORING_RECVSEND_BUNDLE)) || + (req->flags & REQ_F_BUFFER_SELECT)) + return -EINVAL; + req->buf_index = READ_ONCE(sqe->buf_index); + } sr->msg_flags = READ_ONCE(sqe->msg_flags); if (sr->msg_flags & MSG_DONTWAIT) req->flags |= REQ_F_NOWAIT; @@ -1199,6 +1230,18 @@ int io_recv(struct io_kiocb *req, unsigned int issue_flags) if (force_nonblock) flags |= MSG_DONTWAIT; + if (req->flags & REQ_F_IMPORT_BUFFER) { + ret = io_import_reg_buf(req, &kmsg->msg.msg_iter, + (u64)(uintptr_t)sr->buf, sr->len, + ITER_DEST, issue_flags); + if (unlikely(ret)) { + kmsg->msg.msg_inq = -1; + sel.buf_list = NULL; + goto out_free; + } + req->flags &= ~REQ_F_IMPORT_BUFFER; + } + retry_multishot: sel.buf_list = NULL; if (io_do_buffer_select(req)) { -- 2.54.0 From: Ming Lei Exercise IORING_RECVSEND_FIXED_BUF on plain IORING_OP_SEND and IORING_OP_RECV: send-fixed, recv-fixed and both-fixed roundtrips with non-zero offsets into distinct registered buffers, a large MSG_WAITALL transfer to cover the persisted bvec iter across partial retries, plus negative cases (sendmsg, send-bundle, send-vectorized and recv-multishot rejected with -EINVAL, and a bad buf_index returning -EFAULT). 
Signed-off-by: Ming Lei --- test/Makefile | 1 + test/fixed-buf-send-recv.c | 311 +++++++++++++++++++++++++++++++++++++ 2 files changed, 312 insertions(+) create mode 100644 test/fixed-buf-send-recv.c diff --git a/test/Makefile b/test/Makefile index d1cd2470..effb3bae 100644 --- a/test/Makefile +++ b/test/Makefile @@ -132,6 +132,7 @@ test_srcs := \ file-verify.c \ fixed-buf-iter.c \ fixed-buf-merge.c \ + fixed-buf-send-recv.c \ fixed-hugepage.c \ fixed-link.c \ fixed-reuse.c \ diff --git a/test/fixed-buf-send-recv.c b/test/fixed-buf-send-recv.c new file mode 100644 index 00000000..d51bcf3f --- /dev/null +++ b/test/fixed-buf-send-recv.c @@ -0,0 +1,311 @@ +/* SPDX-License-Identifier: MIT */ +/* + * Test IORING_RECVSEND_FIXED_BUF on plain IORING_OP_SEND / IORING_OP_RECV. + * + * A registered (fixed) buffer can be used as the send source and/or the recv + * destination over a TCP socket via IORING_RECVSEND_FIXED_BUF. Covers: + * - send fixed -> recv normal + * - send normal -> recv fixed + * - send fixed -> recv fixed (both ends registered, non-zero offsets) + * - large MSG_WAITALL transfer (exercises the persisted bvec iter across + * partial send/recv retries) + * - negative cases: FIXED_BUF rejected on sendmsg, on bundle, on send + * vectorized, on recv multishot, and a bad buf_index -> -EFAULT. + */ +#include <errno.h> +#include <limits.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> +#include <sys/socket.h> + +#include "liburing.h" +#include "helpers.h" + +#define BUF_SIZE (128 * 1024) +#define OFF 4096 + +/* registered buffer indices */ +#define SBUF_IDX 0 +#define RBUF_IDX 1 + +static int no_fixed_buf; + +static void fill_pattern(unsigned char *buf, size_t len, unsigned seed) +{ + size_t i; + + for (i = 0; i < len; i++) + buf[i] = (unsigned char)((i + seed) & 0xff); +} + +/* + * Submit a paired send (user_data 1) + recv (user_data 2) and wait for both. + * Either side may use a registered buffer. 
Returns 0 on success with the + * received data verified against the sent pattern, -EINVAL if the kernel + * doesn't support the flag, or 1 on hard failure. + */ +static int do_roundtrip(struct io_uring *ring, int sfd, int rfd, + unsigned char *sptr, unsigned char *rptr, size_t len, + int s_fixed, int r_fixed, int waitall) +{ + struct io_uring_sqe *sqe; + struct io_uring_cqe *cqe; + int ret, i, sflags = 0, rflags = 0; + int s_res = INT_MIN, r_res = INT_MIN; + static unsigned seed; + + seed++; + if (waitall) { + sflags |= MSG_WAITALL; + rflags |= MSG_WAITALL; + } + + fill_pattern(sptr, len, seed); + memset(rptr, 0, len); + + sqe = io_uring_get_sqe(ring); + io_uring_prep_send(sqe, sfd, sptr, len, sflags); + if (s_fixed) { + sqe->ioprio |= IORING_RECVSEND_FIXED_BUF; + sqe->buf_index = SBUF_IDX; + } + sqe->user_data = 1; + + sqe = io_uring_get_sqe(ring); + io_uring_prep_recv(sqe, rfd, rptr, len, rflags); + if (r_fixed) { + sqe->ioprio |= IORING_RECVSEND_FIXED_BUF; + sqe->buf_index = RBUF_IDX; + } + sqe->user_data = 2; + + ret = io_uring_submit_and_wait(ring, 2); + if (ret != 2) { + fprintf(stderr, "submit_and_wait: %d\n", ret); + return 1; + } + + for (i = 0; i < 2; i++) { + ret = io_uring_peek_cqe(ring, &cqe); + if (ret) { + fprintf(stderr, "peek_cqe: %d\n", ret); + return 1; + } + if (cqe->user_data == 1) + s_res = cqe->res; + else + r_res = cqe->res; + io_uring_cqe_seen(ring, cqe); + } + + if (s_res == -EINVAL || r_res == -EINVAL) { + no_fixed_buf = 1; + return -EINVAL; + } + if (s_res != (int)len) { + fprintf(stderr, "send res %d, want %zu (s_fixed=%d)\n", + s_res, len, s_fixed); + return 1; + } + if (r_res != (int)len) { + fprintf(stderr, "recv res %d, want %zu (r_fixed=%d)\n", + r_res, len, r_fixed); + return 1; + } + if (memcmp(sptr, rptr, len)) { + fprintf(stderr, "data mismatch (s_fixed=%d r_fixed=%d len=%zu)\n", + s_fixed, r_fixed, len); + return 1; + } + return 0; +} + +/* Submit one sqe (already prepared by caller) and expect a specific res. 
*/ +static int expect_res(struct io_uring *ring, int expect) +{ + struct io_uring_cqe *cqe; + int ret, res; + + ret = io_uring_submit(ring); + if (ret != 1) { + fprintf(stderr, "submit: %d\n", ret); + return 1; + } + ret = io_uring_wait_cqe(ring, &cqe); + if (ret) { + fprintf(stderr, "wait_cqe: %d\n", ret); + return 1; + } + res = cqe->res; + io_uring_cqe_seen(ring, cqe); + if (res != expect) { + fprintf(stderr, "got res %d, expected %d\n", res, expect); + return 1; + } + return 0; +} + +static int test_negative(struct io_uring *ring, int sfd) +{ + struct io_uring_sqe *sqe; + struct msghdr msg = { }; + struct iovec iov; + static char nbuf[64]; + + /* sendmsg + FIXED_BUF is only allowed for plain send -> -EINVAL */ + iov.iov_base = nbuf; + iov.iov_len = sizeof(nbuf); + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + sqe = io_uring_get_sqe(ring); + io_uring_prep_sendmsg(sqe, sfd, &msg, 0); + sqe->ioprio |= IORING_RECVSEND_FIXED_BUF; + sqe->buf_index = SBUF_IDX; + sqe->user_data = 10; + if (expect_res(ring, -EINVAL)) { + fprintf(stderr, "sendmsg+fixed_buf not rejected\n"); + return 1; + } + + /* send + bundle + FIXED_BUF -> -EINVAL */ + sqe = io_uring_get_sqe(ring); + io_uring_prep_send(sqe, sfd, nbuf, sizeof(nbuf), 0); + sqe->ioprio |= IORING_RECVSEND_FIXED_BUF | IORING_RECVSEND_BUNDLE; + sqe->buf_index = SBUF_IDX; + sqe->user_data = 11; + if (expect_res(ring, -EINVAL)) { + fprintf(stderr, "send bundle+fixed_buf not rejected\n"); + return 1; + } + + /* send + vectorized + FIXED_BUF -> -EINVAL */ + sqe = io_uring_get_sqe(ring); + io_uring_prep_send(sqe, sfd, nbuf, sizeof(nbuf), 0); + sqe->ioprio |= IORING_RECVSEND_FIXED_BUF | IORING_SEND_VECTORIZED; + sqe->buf_index = SBUF_IDX; + sqe->user_data = 14; + if (expect_res(ring, -EINVAL)) { + fprintf(stderr, "send vectorized+fixed_buf not rejected\n"); + return 1; + } + + /* recv multishot + FIXED_BUF -> -EINVAL */ + sqe = io_uring_get_sqe(ring); + io_uring_prep_recv_multishot(sqe, sfd, nbuf, sizeof(nbuf), 0); + sqe->ioprio |= 
IORING_RECVSEND_FIXED_BUF; + sqe->buf_index = RBUF_IDX; + sqe->user_data = 12; + if (expect_res(ring, -EINVAL)) { + fprintf(stderr, "recv multishot+fixed_buf not rejected\n"); + return 1; + } + + /* send fixed with an unregistered buf_index -> -EFAULT at issue */ + sqe = io_uring_get_sqe(ring); + io_uring_prep_send(sqe, sfd, nbuf, sizeof(nbuf), 0); + sqe->ioprio |= IORING_RECVSEND_FIXED_BUF; + sqe->buf_index = 42; + sqe->user_data = 13; + if (expect_res(ring, -EFAULT)) { + fprintf(stderr, "send fixed bad index not -EFAULT\n"); + return 1; + } + + return 0; +} + +int main(int argc, char *argv[]) +{ + struct io_uring ring; + struct iovec regvec[2]; + unsigned char *sbuf, *rbuf, *hbuf; + int ret, fds[2]; + + if (argc > 1) + return T_EXIT_SKIP; + + if (posix_memalign((void **)&sbuf, 4096, BUF_SIZE) || + posix_memalign((void **)&rbuf, 4096, BUF_SIZE)) { + fprintf(stderr, "posix_memalign failed\n"); + return T_EXIT_FAIL; + } + hbuf = malloc(BUF_SIZE); + if (!hbuf) + return T_EXIT_FAIL; + + ret = io_uring_queue_init(8, &ring, 0); + if (ret) { + fprintf(stderr, "queue_init: %d\n", ret); + return T_EXIT_FAIL; + } + + regvec[SBUF_IDX].iov_base = sbuf; + regvec[SBUF_IDX].iov_len = BUF_SIZE; + regvec[RBUF_IDX].iov_base = rbuf; + regvec[RBUF_IDX].iov_len = BUF_SIZE; + ret = io_uring_register_buffers(&ring, regvec, 2); + if (ret) { + fprintf(stderr, "register_buffers: %d\n", ret); + return T_EXIT_FAIL; + } + + ret = t_create_socket_pair(fds, true); + if (ret) { + fprintf(stderr, "socket pair: %d\n", ret); + return T_EXIT_FAIL; + } + + /* send fixed -> recv normal (also doubles as feature detection) */ + ret = do_roundtrip(&ring, fds[1], fds[0], sbuf + OFF, hbuf, 4096, + 1, 0, 0); + if (ret == -EINVAL) { + fprintf(stderr, "IORING_RECVSEND_FIXED_BUF send unsupported, skip\n"); + return T_EXIT_SKIP; + } + if (ret) + goto fail; + + /* send normal -> recv fixed */ + ret = do_roundtrip(&ring, fds[1], fds[0], hbuf, rbuf + OFF, 4096, + 0, 1, 0); + if (ret == -EINVAL) { + fprintf(stderr, 
"IORING_RECVSEND_FIXED_BUF recv unsupported, skip\n"); + return T_EXIT_SKIP; + } + if (ret) + goto fail; + + /* send fixed -> recv fixed, non-zero offsets on both ends */ + ret = do_roundtrip(&ring, fds[1], fds[0], sbuf + OFF, rbuf + 2 * OFF, + 8192, 1, 1, 0); + if (ret) + goto fail; + + /* large transfer with MSG_WAITALL: persisted bvec iter across retries */ + ret = do_roundtrip(&ring, fds[1], fds[0], sbuf, rbuf, BUF_SIZE, + 1, 1, 1); + if (ret) + goto fail; + + /* and the other direction */ + ret = do_roundtrip(&ring, fds[0], fds[1], sbuf, rbuf, BUF_SIZE, + 1, 1, 1); + if (ret) + goto fail; + + if (test_negative(&ring, fds[1])) + goto fail; + + io_uring_queue_exit(&ring); + close(fds[0]); + close(fds[1]); + free(hbuf); + free(sbuf); + free(rbuf); + return T_EXIT_PASS; +fail: + fprintf(stderr, "test failed\n"); + return T_EXIT_FAIL; +} -- 2.54.0