Add a multithreaded server with a two-phase architecture: a main thread runs an epoll loop on the listening socket and dispatches each accepted connfd to a worker thread by direct array assignment. After the accept loop ends, a barrier release lets each worker submit one IORING_OP_RECV_ZC SQE per assigned connfd (tagged with a connection index in user_data) and process completions in its own io_uring CQE loop. Each per-worker connfd array has a single writer (main, before barrier) and a single reader (the worker, after barrier), so no eventfd, mutex, or queue is required. With multiple queues, connections are dispatched to the correct worker by SO_INCOMING_NAPI_ID using a NAPI-ID-to-thread lookup table populated via a new -n option. Signed-off-by: Juanlu Herrero --- .../selftests/drivers/net/hw/iou-zcrx.c | 238 +++++++++++++----- 1 file changed, 171 insertions(+), 67 deletions(-) diff --git a/tools/testing/selftests/drivers/net/hw/iou-zcrx.c b/tools/testing/selftests/drivers/net/hw/iou-zcrx.c index 6eb738ef4b5cc..03ae5228cb5a4 100644 --- a/tools/testing/selftests/drivers/net/hw/iou-zcrx.c +++ b/tools/testing/selftests/drivers/net/hw/iou-zcrx.c @@ -87,8 +87,15 @@ static struct sockaddr_in6 cfg_addr; static unsigned int cfg_rx_buf_len; static bool cfg_dry_run; static int cfg_num_threads = 1; +static int cfg_napi_ids[64]; +static int cfg_num_napi_ids; static char *payload; +static pthread_barrier_t barrier; + +#define MAX_CONNS_PER_WORKER 64 +#define FIRST_ACCEPT_TIMEOUT_MS 4000 +#define ACCEPT_TIMEOUT_MS 200 struct thread_ctx { struct io_uring ring; @@ -97,9 +104,11 @@ struct thread_ctx { size_t ring_size; struct io_uring_zcrx_rq rq_ring; unsigned long area_token; - int connfd; - bool stop; - size_t received; + int queue_id; + + int connfds[MAX_CONNS_PER_WORKER]; + size_t received[MAX_CONNS_PER_WORKER]; + int nr_conns; }; static unsigned long gettimeofday_ms(void) @@ -199,7 +208,7 @@ static void setup_zcrx(struct thread_ctx *ctx) struct t_io_uring_zcrx_ifq_reg reg = 
{ .if_idx = ifindex, - .if_rxq = cfg_queue_id, + .if_rxq = ctx->queue_id, .rq_entries = rq_entries, .area_ptr = (__u64)(unsigned long)&area_reg, .region_ptr = (__u64)(unsigned long)&region_reg, @@ -224,53 +233,32 @@ static void setup_zcrx(struct thread_ctx *ctx) ctx->area_token = area_reg.rq_area_token; } -static void add_accept(struct thread_ctx *ctx, int sockfd) +static void add_recvzc(struct thread_ctx *ctx, int conn_idx) { struct io_uring_sqe *sqe; sqe = io_uring_get_sqe(&ctx->ring); - io_uring_prep_accept(sqe, sockfd, NULL, NULL, 0); - sqe->user_data = 1; -} - -static void add_recvzc(struct thread_ctx *ctx, int sockfd) -{ - struct io_uring_sqe *sqe; - - sqe = io_uring_get_sqe(&ctx->ring); - - io_uring_prep_rw(IORING_OP_RECV_ZC, sqe, sockfd, NULL, 0, 0); + io_uring_prep_rw(IORING_OP_RECV_ZC, sqe, ctx->connfds[conn_idx], + NULL, 0, 0); sqe->ioprio |= IORING_RECV_MULTISHOT; - sqe->user_data = 2; + sqe->user_data = conn_idx; } -static void add_recvzc_oneshot(struct thread_ctx *ctx, int sockfd, size_t len) +static void add_recvzc_oneshot(struct thread_ctx *ctx, int conn_idx, size_t len) { struct io_uring_sqe *sqe; sqe = io_uring_get_sqe(&ctx->ring); - io_uring_prep_rw(IORING_OP_RECV_ZC, sqe, sockfd, NULL, len, 0); + io_uring_prep_rw(IORING_OP_RECV_ZC, sqe, ctx->connfds[conn_idx], + NULL, len, 0); sqe->ioprio |= IORING_RECV_MULTISHOT; - sqe->user_data = 2; + sqe->user_data = conn_idx; } -static void process_accept(struct thread_ctx *ctx, struct io_uring_cqe *cqe) -{ - if (cqe->res < 0) - error(1, 0, "accept()"); - if (ctx->connfd) - error(1, 0, "Unexpected second connection"); - - ctx->connfd = cqe->res; - if (cfg_oneshot) - add_recvzc_oneshot(ctx, ctx->connfd, page_size); - else - add_recvzc(ctx, ctx->connfd); -} - -static void process_recvzc(struct thread_ctx *ctx, struct io_uring_cqe *cqe) +static void process_recvzc(struct thread_ctx *ctx, struct io_uring_cqe *cqe, + int conn_idx) { unsigned rq_mask = ctx->rq_ring.ring_entries - 1; struct io_uring_zcrx_cqe *rcqe; 
@@ -281,7 +269,7 @@ static void process_recvzc(struct thread_ctx *ctx, struct io_uring_cqe *cqe) int i; if (cqe->res == 0 && cqe->flags == 0 && cfg_oneshot_recvs == 0) { - ctx->stop = true; + ctx->nr_conns--; return; } @@ -290,11 +278,11 @@ static void process_recvzc(struct thread_ctx *ctx, struct io_uring_cqe *cqe) if (cfg_oneshot) { if (cqe->res == 0 && cqe->flags == 0 && cfg_oneshot_recvs) { - add_recvzc_oneshot(ctx, ctx->connfd, page_size); + add_recvzc_oneshot(ctx, conn_idx, page_size); cfg_oneshot_recvs--; } } else if (!(cqe->flags & IORING_CQE_F_MORE)) { - add_recvzc(ctx, ctx->connfd); + add_recvzc(ctx, conn_idx); } rcqe = (struct io_uring_zcrx_cqe *)(cqe + 1); @@ -304,10 +292,10 @@ static void process_recvzc(struct thread_ctx *ctx, struct io_uring_cqe *cqe) data = (char *)ctx->area_ptr + (rcqe->off & mask); for (i = 0; i < n; i++) { - if (*(data + i) != payload[(ctx->received + i)]) + if (*(data + i) != payload[(ctx->received[conn_idx] + i)]) error(1, 0, "payload mismatch at %d", i); } - ctx->received += n; + ctx->received[conn_idx] += n; rqe = &ctx->rq_ring.rqes[(ctx->rq_ring.rq_tail & rq_mask)]; rqe->off = (rcqe->off & ~IORING_ZCRX_AREA_MASK) | ctx->area_token; @@ -320,28 +308,80 @@ static void server_loop(struct thread_ctx *ctx) struct io_uring_cqe *cqe; unsigned int count = 0; unsigned int head; - int i, ret; io_uring_submit_and_wait(&ctx->ring, 1); io_uring_for_each_cqe(&ctx->ring, head, cqe) { - if (cqe->user_data == 1) - process_accept(ctx, cqe); - else if (cqe->user_data == 2) - process_recvzc(ctx, cqe); - else - error(1, 0, "unknown cqe"); + process_recvzc(ctx, cqe, cqe->user_data); count++; } io_uring_cq_advance(&ctx->ring, count); } -static void run_server(void) +static void *server_worker(void *arg) { - struct thread_ctx ctx = {}; + struct thread_ctx *ctx = arg; unsigned int flags = 0; - int fd, enable, ret; uint64_t tstop; + int i; + + flags |= IORING_SETUP_COOP_TASKRUN; + flags |= IORING_SETUP_SINGLE_ISSUER; + flags |= 
IORING_SETUP_DEFER_TASKRUN; + flags |= IORING_SETUP_SUBMIT_ALL; + flags |= IORING_SETUP_CQE32; + + io_uring_queue_init(512, &ctx->ring, flags); + setup_zcrx(ctx); + + pthread_barrier_wait(&barrier); + + if (cfg_dry_run) + return NULL; + + pthread_barrier_wait(&barrier); + + for (i = 0; i < ctx->nr_conns; i++) { + if (cfg_oneshot) + add_recvzc_oneshot(ctx, i, page_size); + else + add_recvzc(ctx, i); + } + + tstop = gettimeofday_ms() + 5000; + while (ctx->nr_conns > 0 && gettimeofday_ms() < tstop) + server_loop(ctx); + + if (ctx->nr_conns != 0) + error(1, 0, "test failed: %d connections incomplete", + ctx->nr_conns); + + return NULL; +} + +static int find_thread_by_napi(int napi_id) +{ + int i; + + for (i = 0; i < cfg_num_napi_ids; i++) { + if (cfg_napi_ids[i] == napi_id) + return i; + } + return -1; +} + +static void run_server(void) +{ + struct epoll_event ev = { .events = EPOLLIN }; + int timeout_ms = FIRST_ACCEPT_TIMEOUT_MS; + struct thread_ctx *ctxs; + pthread_t *threads; + int fd, epfd, ret, enable, i; + + ctxs = calloc(cfg_num_threads, sizeof(*ctxs)); + threads = calloc(cfg_num_threads, sizeof(*threads)); + if (!ctxs || !threads) + error(1, 0, "calloc()"); fd = socket(AF_INET6, SOCK_STREAM, 0); if (fd == -1) @@ -359,26 +399,78 @@ static void run_server(void) if (listen(fd, 1024) < 0) error(1, 0, "listen()"); - flags |= IORING_SETUP_COOP_TASKRUN; - flags |= IORING_SETUP_SINGLE_ISSUER; - flags |= IORING_SETUP_DEFER_TASKRUN; - flags |= IORING_SETUP_SUBMIT_ALL; - flags |= IORING_SETUP_CQE32; + pthread_barrier_init(&barrier, NULL, cfg_num_threads + 1); - io_uring_queue_init(512, &ctx.ring, flags); + for (i = 0; i < cfg_num_threads; i++) + ctxs[i].queue_id = cfg_queue_id + i; + + for (i = 0; i < cfg_num_threads; i++) { + ret = pthread_create(&threads[i], NULL, server_worker, &ctxs[i]); + if (ret) + error(1, ret, "pthread_create()"); + } + + pthread_barrier_wait(&barrier); - setup_zcrx(&ctx); if (cfg_dry_run) - return; + goto join; - add_accept(&ctx, fd); + epfd = 
epoll_create1(0); + if (epfd < 0) + error(1, errno, "epoll_create1()"); - tstop = gettimeofday_ms() + 5000; - while (!ctx.stop && gettimeofday_ms() < tstop) - server_loop(&ctx); + if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev) < 0) + error(1, errno, "epoll_ctl()"); + + while (1) { + struct epoll_event out_ev; + int nfds, idx, connfd; + + nfds = epoll_wait(epfd, &out_ev, 1, timeout_ms); + if (nfds < 0) + error(1, errno, "epoll_wait()"); + if (nfds == 0) + break; + timeout_ms = ACCEPT_TIMEOUT_MS; + + connfd = accept(fd, NULL, NULL); + if (connfd < 0) + error(1, errno, "accept()"); + + if (cfg_num_napi_ids > 0) { + int napi_id; + socklen_t len = sizeof(napi_id); + + ret = getsockopt(connfd, SOL_SOCKET, + SO_INCOMING_NAPI_ID, + &napi_id, &len); + if (ret < 0) + error(1, errno, "getsockopt(SO_INCOMING_NAPI_ID)"); + + idx = find_thread_by_napi(napi_id); + if (idx < 0) + error(1, 0, "unknown NAPI ID: %d", napi_id); + } else { + idx = 0; + } + + if (ctxs[idx].nr_conns >= MAX_CONNS_PER_WORKER) + error(1, 0, "worker %d connection overflow", idx); + ctxs[idx].connfds[ctxs[idx].nr_conns++] = connfd; + } - if (!ctx.stop) - error(1, 0, "test failed\n"); + close(epfd); + + pthread_barrier_wait(&barrier); + +join: + for (i = 0; i < cfg_num_threads; i++) + pthread_join(threads[i], NULL); + + pthread_barrier_destroy(&barrier); + close(fd); + free(threads); + free(ctxs); } static void *client_worker(void *arg) @@ -438,8 +530,8 @@ static void run_client(void) static void usage(const char *filepath) { error(1, 0, "Usage: %s (-4|-6) (-s|-c) -h -p " - "-l -i -q -t", - filepath); + "-l -i -q -t " + "-n", filepath); } static void parse_opts(int argc, char **argv) @@ -457,7 +549,7 @@ static void parse_opts(int argc, char **argv) usage(argv[0]); cfg_payload_len = max_payload_len; - while ((c = getopt(argc, argv, "sch:p:l:i:q:o:z:x:dt:")) != -1) { + while ((c = getopt(argc, argv, "sch:p:l:i:q:o:z:x:dt:n:")) != -1) { switch (c) { case 's': if (cfg_client) @@ -501,6 +593,18 @@ static void 
parse_opts(int argc, char **argv) case 't': cfg_num_threads = strtoul(optarg, NULL, 0); break; + case 'n': { + char *tok, *str = optarg; + + cfg_num_napi_ids = 0; + while ((tok = strsep(&str, ",")) != NULL) { + if (cfg_num_napi_ids >= 64) + error(1, 0, "too many NAPI IDs"); + cfg_napi_ids[cfg_num_napi_ids++] = + strtoul(tok, NULL, 0); + } + break; + } } } -- 2.52.0