Add multi-port support to the iou-zcrx test binary and a new
rss_multiqueue Python test variant that exercises multi-queue zero-copy
receive with per-port flow rule steering.

In multi-port mode, the server creates N listening sockets on
consecutive ports (cfg_port, cfg_port+1, ...) and uses epoll to accept
one connection per socket. Each client thread connects to its
corresponding port. Per-port ntuple flow rules steer traffic to
different NIC hardware queues, each with its own zcrx instance.

For single-thread mode (the default), behavior is unchanged: one socket
on cfg_port, one thread, one queue.

Signed-off-by: Juanlu Herrero
---
 .../selftests/drivers/net/hw/iou-zcrx.c | 81 ++++++++++++++----- .../selftests/drivers/net/hw/iou-zcrx.py | 45 ++++++++++-
 2 files changed, 104 insertions(+), 22 deletions(-)

diff --git a/tools/testing/selftests/drivers/net/hw/iou-zcrx.c b/tools/testing/selftests/drivers/net/hw/iou-zcrx.c
index 646682167bb0..1f33d7127185 100644
--- a/tools/testing/selftests/drivers/net/hw/iou-zcrx.c
+++ b/tools/testing/selftests/drivers/net/hw/iou-zcrx.c
@@ -102,6 +102,7 @@ struct thread_ctx {
 	bool stop;
 	size_t received;
 	int queue_id;
+	int port;
 	int thread_id;
 };
 
@@ -353,35 +354,47 @@ static void *server_worker(void *arg)
 
 static void run_server(void)
 {
+	struct epoll_event ev, events[64];
 	struct thread_ctx *ctxs;
+	struct sockaddr_in6 addr;
 	pthread_t *threads;
-	int fd, ret, i, enable;
+	int *fds;
+	int epfd, nfds, accepted;
+	int ret, i, enable;
 
 	ctxs = calloc(cfg_num_threads, sizeof(*ctxs));
 	threads = calloc(cfg_num_threads, sizeof(*threads));
-	if (!ctxs || !threads)
+	fds = calloc(cfg_num_threads, sizeof(*fds));
+	if (!ctxs || !threads || !fds)
 		error(1, 0, "calloc()");
 
-	fd = socket(AF_INET6, SOCK_STREAM, 0);
-	if (fd == -1)
-		error(1, 0, "socket()");
+	for (i = 0; i < cfg_num_threads; i++) {
+		fds[i] = socket(AF_INET6, SOCK_STREAM, 0);
+		if (fds[i] == -1)
+			error(1, 0, "socket()");
+
+		enable = 1;
+		ret = setsockopt(fds[i], SOL_SOCKET, SO_REUSEADDR,
+				 &enable, sizeof(int));
+		if (ret < 0)
+			error(1, 0, "setsockopt(SO_REUSEADDR)");
 
-	enable = 1;
-	ret = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int));
-	if (ret < 0)
-		error(1, 0, "setsockopt(SO_REUSEADDR)");
+		addr = cfg_addr;
+		addr.sin6_port = htons(cfg_port + i);
 
-	ret = bind(fd, (struct sockaddr *)&cfg_addr, sizeof(cfg_addr));
-	if (ret < 0)
-		error(1, 0, "bind()");
+		ret = bind(fds[i], (struct sockaddr *)&addr, sizeof(addr));
+		if (ret < 0)
+			error(1, 0, "bind()");
 
-	if (listen(fd, 1024) < 0)
-		error(1, 0, "listen()");
+		if (listen(fds[i], 1024) < 0)
+			error(1, 0, "listen()");
+	}
 
 	pthread_barrier_init(&barrier, NULL, cfg_num_threads + 1);
 
 	for (i = 0; i < cfg_num_threads; i++) {
 		ctxs[i].queue_id = cfg_queue_id + i;
+		ctxs[i].port = cfg_port + i;
 		ctxs[i].thread_id = i;
 	}
 
@@ -397,12 +410,36 @@ static void run_server(void)
 	if (cfg_dry_run)
 		goto join;
 
+	epfd = epoll_create1(0);
+	if (epfd < 0)
+		error(1, 0, "epoll_create1()");
+
 	for (i = 0; i < cfg_num_threads; i++) {
-		ctxs[i].connfd = accept(fd, NULL, NULL);
-		if (ctxs[i].connfd < 0)
-			error(1, 0, "accept()");
+		ev.events = EPOLLIN;
+		ev.data.u32 = i;
+		if (epoll_ctl(epfd, EPOLL_CTL_ADD, fds[i], &ev) < 0)
+			error(1, 0, "epoll_ctl()");
 	}
 
+	accepted = 0;
+	while (accepted < cfg_num_threads) {
+		nfds = epoll_wait(epfd, events, 64, 5000);
+		if (nfds < 0)
+			error(1, 0, "epoll_wait()");
+		if (nfds == 0)
+			error(1, 0, "epoll_wait() timeout");
+
+		for (i = 0; i < nfds; i++) {
+			int idx = events[i].data.u32;
+
+			ctxs[idx].connfd = accept(fds[idx], NULL, NULL);
+			if (ctxs[idx].connfd < 0)
+				error(1, 0, "accept()");
+			accepted++;
+		}
+	}
+
+	close(epfd);
 	pthread_barrier_wait(&barrier);
 
 join:
@@ -410,23 +447,29 @@ static void run_server(void)
 		pthread_join(threads[i], NULL);
 
 	pthread_barrier_destroy(&barrier);
-	close(fd);
+	for (i = 0; i < cfg_num_threads; i++)
+		close(fds[i]);
+	free(fds);
 	free(threads);
 	free(ctxs);
 }
 
 static void *client_worker(void *arg)
 {
+	struct thread_ctx *ctx = arg;
+	struct sockaddr_in6 addr = cfg_addr;
 	ssize_t to_send = cfg_send_size;
 	ssize_t sent = 0;
 	ssize_t chunk, res;
 	int fd;
 
+	addr.sin6_port = htons(cfg_port + ctx->thread_id);
+
 	fd = socket(AF_INET6, SOCK_STREAM, 0);
 	if (fd == -1)
 		error(1, 0, "socket()");
 
-	if (connect(fd, (struct sockaddr *)&cfg_addr, sizeof(cfg_addr)))
+	if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)))
 		error(1, 0, "connect()");
 
 	while (to_send) {
diff --git a/tools/testing/selftests/drivers/net/hw/iou-zcrx.py b/tools/testing/selftests/drivers/net/hw/iou-zcrx.py
index e81724cb5542..c918cdaf6b1b 100755
--- a/tools/testing/selftests/drivers/net/hw/iou-zcrx.py
+++ b/tools/testing/selftests/drivers/net/hw/iou-zcrx.py
@@ -35,6 +35,12 @@ def set_flow_rule(cfg):
     return int(values)
 
 
+def set_flow_rule_port(cfg, port, queue):
+    output = ethtool(f"-N {cfg.ifname} flow-type tcp6 dst-port {port} action {queue}").stdout
+    values = re.search(r'ID (\d+)', output).group(1)
+    return int(values)
+
+
 def set_flow_rule_rss(cfg, rss_ctx_id):
     output = ethtool(f"-N {cfg.ifname} flow-type tcp6 dst-port {cfg.port} context {rss_ctx_id}").stdout
     values = re.search(r'ID (\d+)', output).group(1)
@@ -100,18 +106,51 @@ def rss(cfg):
     defer(ethtool, f"-N {cfg.ifname} delete {flow_rule_id}")
 
 
+def rss_multiqueue(cfg):
+    channels = cfg.ethnl.channels_get({'header': {'dev-index': cfg.ifindex}})
+    channels = channels['combined-count']
+    if channels < 3:
+        raise KsftSkipEx('Test requires NETIF with at least 3 combined channels')
+
+    rings = cfg.ethnl.rings_get({'header': {'dev-index': cfg.ifindex}})
+    rx_rings = rings['rx']
+    hds_thresh = rings.get('hds-thresh', 0)
+
+    cfg.ethnl.rings_set({'header': {'dev-index': cfg.ifindex},
+                         'tcp-data-split': 'enabled',
+                         'hds-thresh': 0,
+                         'rx': 64})
+    defer(cfg.ethnl.rings_set, {'header': {'dev-index': cfg.ifindex},
+                                'tcp-data-split': 'unknown',
+                                'hds-thresh': hds_thresh,
+                                'rx': rx_rings})
+    defer(mp_clear_wait, cfg)
+
+    cfg.num_threads = 2
+    cfg.target = channels - cfg.num_threads
+    ethtool(f"-X {cfg.ifname} equal {cfg.target}")
+    defer(ethtool, f"-X {cfg.ifname} default")
+
+    for i in range(cfg.num_threads):
+        flow_rule_id = set_flow_rule_port(cfg, cfg.port + i, cfg.target + i)
+        defer(ethtool, f"-N {cfg.ifname} delete {flow_rule_id}")
+
+
 @ksft_variants([
     KsftNamedVariant("single", single),
     KsftNamedVariant("rss", rss),
+    KsftNamedVariant("rss_multiqueue", rss_multiqueue),
 ])
 def test_zcrx(cfg, setup) -> None:
     cfg.require_ipver('6')
 
+    cfg.num_threads = getattr(cfg, 'num_threads', 1)
     setup(cfg)
-    rx_cmd = f"{cfg.bin_local} -s -p {cfg.port} -i {cfg.ifname} -q {cfg.target}"
-    tx_cmd = f"{cfg.bin_remote} -c -h {cfg.addr_v['6']} -p {cfg.port} -l 12840"
+    rx_cmd = f"{cfg.bin_local} -s -p {cfg.port} -i {cfg.ifname} -q {cfg.target} -t {cfg.num_threads}"
+    tx_cmd = f"{cfg.bin_remote} -c -h {cfg.addr_v['6']} -p {cfg.port} -l 12840 -t {cfg.num_threads}"
     with bkg(rx_cmd, exit_wait=True):
-        wait_port_listen(cfg.port, proto="tcp")
+        for i in range(cfg.num_threads):
+            wait_port_listen(cfg.port + i, proto="tcp")
         cmd(tx_cmd, host=cfg.remote)
 
 
-- 
2.53.0