Move server state (io_uring ring, zcrx area, receive tracking) from global variables into struct thread_ctx and thread the server side. The main thread creates a single listening socket, spawns N worker threads (each setting up its own io_uring and zcrx instance), then accepts N connections and distributes them to the workers via pthread barriers for synchronization. Signed-off-by: Juanlu Herrero --- .../selftests/drivers/net/hw/iou-zcrx.c | 247 ++++++++++-------- 1 file changed, 140 insertions(+), 107 deletions(-) diff --git a/tools/testing/selftests/drivers/net/hw/iou-zcrx.c b/tools/testing/selftests/drivers/net/hw/iou-zcrx.c index 6185c855b85c..646682167bb0 100644 --- a/tools/testing/selftests/drivers/net/hw/iou-zcrx.c +++ b/tools/testing/selftests/drivers/net/hw/iou-zcrx.c @@ -89,20 +89,22 @@ static bool cfg_dry_run; static int cfg_num_threads = 1; static char *payload; +static pthread_barrier_t barrier; struct thread_ctx { + struct io_uring ring; + void *area_ptr; + void *ring_ptr; + size_t ring_size; + struct io_uring_zcrx_rq rq_ring; + unsigned long area_token; + int connfd; + bool stop; + size_t received; + int queue_id; int thread_id; }; -static void *area_ptr; -static void *ring_ptr; -static size_t ring_size; -static struct io_uring_zcrx_rq rq_ring; -static unsigned long area_token; -static int connfd; -static bool stop; -static size_t received; - static unsigned long gettimeofday_ms(void) { struct timeval tv; @@ -145,7 +147,7 @@ static inline size_t get_refill_ring_size(unsigned int rq_entries) return ALIGN_UP(size, page_size); } -static void setup_zcrx(struct io_uring *ring) +static void setup_zcrx(struct thread_ctx *ctx) { unsigned int ifindex; unsigned int rq_entries = 4096; @@ -156,58 +158,58 @@ static void setup_zcrx(struct io_uring *ring) error(1, 0, "bad interface name: %s", cfg_ifname); if (cfg_rx_buf_len && cfg_rx_buf_len != page_size) { - area_ptr = mmap(NULL, - AREA_SIZE, - PROT_READ | PROT_WRITE, - MAP_ANONYMOUS | MAP_PRIVATE | - MAP_HUGETLB 
| MAP_HUGE_2MB, - -1, - 0); - if (area_ptr == MAP_FAILED) { + ctx->area_ptr = mmap(NULL, + AREA_SIZE, + PROT_READ | PROT_WRITE, + MAP_ANONYMOUS | MAP_PRIVATE | + MAP_HUGETLB | MAP_HUGE_2MB, + -1, + 0); + if (ctx->area_ptr == MAP_FAILED) { printf("Can't allocate huge pages\n"); exit(SKIP_CODE); } } else { - area_ptr = mmap(NULL, - AREA_SIZE, - PROT_READ | PROT_WRITE, - MAP_ANONYMOUS | MAP_PRIVATE, - 0, - 0); - if (area_ptr == MAP_FAILED) + ctx->area_ptr = mmap(NULL, + AREA_SIZE, + PROT_READ | PROT_WRITE, + MAP_ANONYMOUS | MAP_PRIVATE, + 0, + 0); + if (ctx->area_ptr == MAP_FAILED) error(1, 0, "mmap(): zero copy area"); } - ring_size = get_refill_ring_size(rq_entries); - ring_ptr = mmap(NULL, - ring_size, - PROT_READ | PROT_WRITE, - MAP_ANONYMOUS | MAP_PRIVATE, - 0, - 0); + ctx->ring_size = get_refill_ring_size(rq_entries); + ctx->ring_ptr = mmap(NULL, + ctx->ring_size, + PROT_READ | PROT_WRITE, + MAP_ANONYMOUS | MAP_PRIVATE, + 0, + 0); struct io_uring_region_desc region_reg = { - .size = ring_size, - .user_addr = (__u64)(unsigned long)ring_ptr, + .size = ctx->ring_size, + .user_addr = (__u64)(unsigned long)ctx->ring_ptr, .flags = IORING_MEM_REGION_TYPE_USER, }; struct io_uring_zcrx_area_reg area_reg = { - .addr = (__u64)(unsigned long)area_ptr, + .addr = (__u64)(unsigned long)ctx->area_ptr, .len = AREA_SIZE, .flags = 0, }; struct t_io_uring_zcrx_ifq_reg reg = { .if_idx = ifindex, - .if_rxq = cfg_queue_id, + .if_rxq = ctx->queue_id, .rq_entries = rq_entries, .area_ptr = (__u64)(unsigned long)&area_reg, .region_ptr = (__u64)(unsigned long)&region_reg, .rx_buf_len = cfg_rx_buf_len, }; - ret = io_uring_register_ifq(ring, (void *)&reg); + ret = io_uring_register_ifq(&ctx->ring, (void *)&reg); if (cfg_rx_buf_len && (ret == -EINVAL || ret == -EOPNOTSUPP || ret == -ERANGE)) { printf("Large chunks are not supported %i\n", ret); @@ -216,64 +218,40 @@ static void setup_zcrx(struct io_uring *ring) error(1, 0, "io_uring_register_ifq(): %d", ret); } - rq_ring.khead = (unsigned int *)((char 
*)ring_ptr + reg.offsets.head); - rq_ring.ktail = (unsigned int *)((char *)ring_ptr + reg.offsets.tail); - rq_ring.rqes = (struct io_uring_zcrx_rqe *)((char *)ring_ptr + reg.offsets.rqes); - rq_ring.rq_tail = 0; - rq_ring.ring_entries = reg.rq_entries; - - area_token = area_reg.rq_area_token; -} - -static void add_accept(struct io_uring *ring, int sockfd) -{ - struct io_uring_sqe *sqe; - - sqe = io_uring_get_sqe(ring); + ctx->rq_ring.khead = (unsigned int *)((char *)ctx->ring_ptr + reg.offsets.head); + ctx->rq_ring.ktail = (unsigned int *)((char *)ctx->ring_ptr + reg.offsets.tail); + ctx->rq_ring.rqes = (struct io_uring_zcrx_rqe *)((char *)ctx->ring_ptr + reg.offsets.rqes); + ctx->rq_ring.rq_tail = 0; + ctx->rq_ring.ring_entries = reg.rq_entries; - io_uring_prep_accept(sqe, sockfd, NULL, NULL, 0); - sqe->user_data = 1; + ctx->area_token = area_reg.rq_area_token; } -static void add_recvzc(struct io_uring *ring, int sockfd) +static void add_recvzc(struct thread_ctx *ctx, int sockfd) { struct io_uring_sqe *sqe; - sqe = io_uring_get_sqe(ring); + sqe = io_uring_get_sqe(&ctx->ring); io_uring_prep_rw(IORING_OP_RECV_ZC, sqe, sockfd, NULL, 0, 0); sqe->ioprio |= IORING_RECV_MULTISHOT; sqe->user_data = 2; } -static void add_recvzc_oneshot(struct io_uring *ring, int sockfd, size_t len) +static void add_recvzc_oneshot(struct thread_ctx *ctx, int sockfd, size_t len) { struct io_uring_sqe *sqe; - sqe = io_uring_get_sqe(ring); + sqe = io_uring_get_sqe(&ctx->ring); io_uring_prep_rw(IORING_OP_RECV_ZC, sqe, sockfd, NULL, len, 0); sqe->ioprio |= IORING_RECV_MULTISHOT; sqe->user_data = 2; } -static void process_accept(struct io_uring *ring, struct io_uring_cqe *cqe) +static void process_recvzc(struct thread_ctx *ctx, struct io_uring_cqe *cqe) { - if (cqe->res < 0) - error(1, 0, "accept()"); - if (connfd) - error(1, 0, "Unexpected second connection"); - - connfd = cqe->res; - if (cfg_oneshot) - add_recvzc_oneshot(ring, connfd, page_size); - else - add_recvzc(ring, connfd); -} - -static 
void process_recvzc(struct io_uring *ring, struct io_uring_cqe *cqe) -{ - unsigned rq_mask = rq_ring.ring_entries - 1; + unsigned rq_mask = ctx->rq_ring.ring_entries - 1; struct io_uring_zcrx_cqe *rcqe; struct io_uring_zcrx_rqe *rqe; uint64_t mask; @@ -282,7 +260,7 @@ static void process_recvzc(struct io_uring *ring, struct io_uring_cqe *cqe) int i; if (cqe->res == 0 && cqe->flags == 0 && cfg_oneshot_recvs == 0) { - stop = true; + ctx->stop = true; return; } @@ -291,59 +269,99 @@ static void process_recvzc(struct io_uring *ring, struct io_uring_cqe *cqe) if (cfg_oneshot) { if (cqe->res == 0 && cqe->flags == 0 && cfg_oneshot_recvs) { - add_recvzc_oneshot(ring, connfd, page_size); + add_recvzc_oneshot(ctx, ctx->connfd, page_size); cfg_oneshot_recvs--; } } else if (!(cqe->flags & IORING_CQE_F_MORE)) { - add_recvzc(ring, connfd); + add_recvzc(ctx, ctx->connfd); } rcqe = (struct io_uring_zcrx_cqe *)(cqe + 1); n = cqe->res; mask = (1ULL << IORING_ZCRX_AREA_SHIFT) - 1; - data = (char *)area_ptr + (rcqe->off & mask); + data = (char *)ctx->area_ptr + (rcqe->off & mask); for (i = 0; i < n; i++) { - if (*(data + i) != payload[(received + i)]) + if (*(data + i) != payload[(ctx->received + i)]) error(1, 0, "payload mismatch at %d", i); } - received += n; + ctx->received += n; - rqe = &rq_ring.rqes[(rq_ring.rq_tail & rq_mask)]; - rqe->off = (rcqe->off & ~IORING_ZCRX_AREA_MASK) | area_token; + rqe = &ctx->rq_ring.rqes[(ctx->rq_ring.rq_tail & rq_mask)]; + rqe->off = (rcqe->off & ~IORING_ZCRX_AREA_MASK) | ctx->area_token; rqe->len = cqe->res; - io_uring_smp_store_release(rq_ring.ktail, ++rq_ring.rq_tail); + io_uring_smp_store_release(ctx->rq_ring.ktail, ++ctx->rq_ring.rq_tail); } -static void server_loop(struct io_uring *ring) +static void server_loop(struct thread_ctx *ctx) { struct io_uring_cqe *cqe; unsigned int count = 0; unsigned int head; int i, ret; - io_uring_submit_and_wait(ring, 1); + io_uring_submit_and_wait(&ctx->ring, 1); - io_uring_for_each_cqe(ring, head, cqe) { - if 
(cqe->user_data == 1) - process_accept(ring, cqe); - else if (cqe->user_data == 2) - process_recvzc(ring, cqe); + io_uring_for_each_cqe(&ctx->ring, head, cqe) { + if (cqe->user_data == 2) + process_recvzc(ctx, cqe); else error(1, 0, "unknown cqe"); count++; } - io_uring_cq_advance(ring, count); + io_uring_cq_advance(&ctx->ring, count); } -static void run_server(void) +static void *server_worker(void *arg) { + struct thread_ctx *ctx = arg; unsigned int flags = 0; - struct io_uring ring; - int fd, enable, ret; uint64_t tstop; + flags |= IORING_SETUP_COOP_TASKRUN; + flags |= IORING_SETUP_SINGLE_ISSUER; + flags |= IORING_SETUP_DEFER_TASKRUN; + flags |= IORING_SETUP_SUBMIT_ALL; + flags |= IORING_SETUP_CQE32; + + io_uring_queue_init(512, &ctx->ring, flags); + + setup_zcrx(ctx); + + pthread_barrier_wait(&barrier); + + if (cfg_dry_run) + return NULL; + + pthread_barrier_wait(&barrier); + + if (cfg_oneshot) + add_recvzc_oneshot(ctx, ctx->connfd, page_size); + else + add_recvzc(ctx, ctx->connfd); + + tstop = gettimeofday_ms() + 5000; + while (!ctx->stop && gettimeofday_ms() < tstop) + server_loop(ctx); + + if (!ctx->stop) + error(1, 0, "test failed\n"); + + return NULL; +} + +static void run_server(void) +{ + struct thread_ctx *ctxs; + pthread_t *threads; + int fd, ret, i, enable; + + ctxs = calloc(cfg_num_threads, sizeof(*ctxs)); + threads = calloc(cfg_num_threads, sizeof(*threads)); + if (!ctxs || !threads) + error(1, 0, "calloc()"); + fd = socket(AF_INET6, SOCK_STREAM, 0); if (fd == -1) error(1, 0, "socket()"); @@ -360,26 +378,41 @@ static void run_server(void) if (listen(fd, 1024) < 0) error(1, 0, "listen()"); - flags |= IORING_SETUP_COOP_TASKRUN; - flags |= IORING_SETUP_SINGLE_ISSUER; - flags |= IORING_SETUP_DEFER_TASKRUN; - flags |= IORING_SETUP_SUBMIT_ALL; - flags |= IORING_SETUP_CQE32; + pthread_barrier_init(&barrier, NULL, cfg_num_threads + 1); + + for (i = 0; i < cfg_num_threads; i++) { + ctxs[i].queue_id = cfg_queue_id + i; + ctxs[i].thread_id = i; + } - 
io_uring_queue_init(512, &ring, flags); + for (i = 0; i < cfg_num_threads; i++) { + ret = pthread_create(&threads[i], NULL, server_worker, + &ctxs[i]); + if (ret) + error(1, ret, "pthread_create()"); + } + + pthread_barrier_wait(&barrier); - setup_zcrx(&ring); if (cfg_dry_run) - return; + goto join; + + for (i = 0; i < cfg_num_threads; i++) { + ctxs[i].connfd = accept(fd, NULL, NULL); + if (ctxs[i].connfd < 0) + error(1, 0, "accept()"); + } - add_accept(&ring, fd); + pthread_barrier_wait(&barrier); - tstop = gettimeofday_ms() + 5000; - while (!stop && gettimeofday_ms() < tstop) - server_loop(&ring); +join: + for (i = 0; i < cfg_num_threads; i++) + pthread_join(threads[i], NULL); - if (!stop) - error(1, 0, "test failed\n"); + pthread_barrier_destroy(&barrier); + close(fd); + free(threads); + free(ctxs); } static void *client_worker(void *arg) -- 2.53.0