From: Jason Xing

For AF_XDP, the return value of the sendto() syscall doesn't reflect how
many descs were handled in the kernel. One use case is a user-space
application that wants to know the number of transmitted skbs after each
send and then decide whether to continue sending, e.g., was it stopped
because of the max tx budget? The following formula can be used after
sending to learn how many skbs/descs the kernel took care of:

  tx_queue.consumer_after - tx_queue.consumer_before

Prior to this patch, in non-zc mode, the consumer of the tx queue is not
immediately updated at the end of the sendto() syscall when an error
occurs, which leaves the consumer value out of date from the perspective
of user space. So this patch performs the store operation that publishes
the cached consumer value to the shared ring to handle the problem.

Besides the explicit errors surfaced in the while() loop in
__xsk_generic_xmit(), there are a few possible error cases that might be
neglected in the following call trace:

  __xsk_generic_xmit()
      xskq_cons_peek_desc()
          xskq_cons_read_desc()
              xskq_cons_is_valid_desc()

These also cause a premature exit from the while() loop even though not
all the descs have been consumed. Based on the above analysis, keying off
@sent_frame covers all the possible cases that could otherwise leave the
global consumer state out of date after __xsk_generic_xmit() finishes.

The patch also adds a common helper __xsk_tx_release() to keep aligned
with the zc mode usage in xsk_tx_release().

Signed-off-by: Jason Xing
---
v4
Link: https://lore.kernel.org/all/20250625101014.45066-1-kerneljasonxing@gmail.com/
1. use the common helper
2. keep aligned with the zc mode usage in xsk_tx_release()

v3
Link: https://lore.kernel.org/all/20250623073129.23290-1-kerneljasonxing@gmail.com/
1. use the xskq_has_descs helper.
2. add a selftest

v2
Link: https://lore.kernel.org/all/20250619093641.70700-1-kerneljasonxing@gmail.com/
1. filter out the good cases, because only the paths that return an error
   need the update.

Side notes:
1. In non-batched zero-copy mode, every caller of xsk_tx_peek_desc() ends
   with an xsk_tx_release() that publishes the local consumer to the
   global consumer state. So the zero-copy mode needs no change at all.
2. I have no strong preference between v1 (see the link above) and v2,
   because smp_store_release() shouldn't cause side effects. For the sake
   of exactness, v2 is the preferable one.
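3. For illustration, a minimal user-space sketch of the formula above,
   assuming the libxdp ring layout (struct xsk_ring_prod's consumer
   pointer maps the counter shared with the kernel, <xdp/xsk.h>);
   send_and_count() is a hypothetical helper, not part of this patch:

	#include <errno.h>
	#include <sys/socket.h>
	#include <xdp/xsk.h>	/* libxdp: struct xsk_ring_prod, xsk_socket__fd() */

	/* Sketch: return how many tx descs the kernel consumed during one
	 * sendto(). The acquire loads pair with the kernel's release store
	 * of the shared consumer counter.
	 */
	static __u32 send_and_count(struct xsk_socket *xsk, struct xsk_ring_prod *tx)
	{
		__u32 before = __atomic_load_n(tx->consumer, __ATOMIC_ACQUIRE);

		if (sendto(xsk_socket__fd(xsk), NULL, 0, MSG_DONTWAIT, NULL, 0) < 0 &&
		    errno != EAGAIN && errno != EBUSY)
			return 0;	/* hard error: nothing reliable to report */

		return __atomic_load_n(tx->consumer, __ATOMIC_ACQUIRE) - before;
	}

   A caller can compare the returned delta against the number of descs it
   queued to detect a premature exit such as hitting the max tx budget.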
---
 net/xdp/xsk.c | 17 ++++++++++-------
 1 file changed, 10 insertions(+), 7 deletions(-)

diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c
index 72c000c0ae5f..bd61b0bc9c24 100644
--- a/net/xdp/xsk.c
+++ b/net/xdp/xsk.c
@@ -300,6 +300,13 @@ static bool xsk_tx_writeable(struct xdp_sock *xs)
 	return true;
 }
 
+static void __xsk_tx_release(struct xdp_sock *xs)
+{
+	__xskq_cons_release(xs->tx);
+	if (xsk_tx_writeable(xs))
+		xs->sk.sk_write_space(&xs->sk);
+}
+
 static bool xsk_is_bound(struct xdp_sock *xs)
 {
 	if (READ_ONCE(xs->state) == XSK_BOUND) {
@@ -407,11 +414,8 @@ void xsk_tx_release(struct xsk_buff_pool *pool)
 	struct xdp_sock *xs;
 
 	rcu_read_lock();
-	list_for_each_entry_rcu(xs, &pool->xsk_tx_list, tx_list) {
-		__xskq_cons_release(xs->tx);
-		if (xsk_tx_writeable(xs))
-			xs->sk.sk_write_space(&xs->sk);
-	}
+	list_for_each_entry_rcu(xs, &pool->xsk_tx_list, tx_list)
+		__xsk_tx_release(xs);
 	rcu_read_unlock();
 }
 EXPORT_SYMBOL(xsk_tx_release);
@@ -858,8 +862,7 @@ static int __xsk_generic_xmit(struct sock *sk)
 
 out:
 	if (sent_frame)
-		if (xsk_tx_writeable(xs))
-			sk->sk_write_space(sk);
+		__xsk_tx_release(xs);
 
 	mutex_unlock(&xs->mutex);
 	return err;
-- 
2.41.3

From: Jason Xing

The consumer check covers only non-zc mode and non-STAT_TX_INVALID test
cases; the conditions are encoded in check_consumer().

The test policy is to set up the max-budget case, where the number of
descs in the tx queue is larger than the default max budget, namely 32,
to make sure that 1) the max_batch error is triggered in
__xsk_generic_xmit(), and 2) xskq_cons_peek_desc() doesn't get a chance
to update the global consumer state at the end. Hitting the max-budget
case is just one of the premature-exit cases, but they all lead to the
same result/action in __xsk_generic_xmit().

Signed-off-by: Jason Xing
---
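A note on the memory ordering used below: load_value() reads the shared
consumer with an acquire load, intended to observe the value the kernel
publishes with a release store. A minimal sketch of the pairing; the
kernel-side line is paraphrased from __xskq_cons_release() in
net/xdp/xsk_queue.h and is not part of this diff:

	/* kernel side, __xskq_cons_release() (paraphrased):
	 *	smp_store_release(&q->ring->consumer, q->cached_cons);
	 *
	 * user side (this selftest): the acquire below pairs with that
	 * release store, so the delta of two loads taken around a
	 * sendto() call is meaningful.
	 */
	static u32 load_value(u32 *a)
	{
		return __atomic_load_n(a, __ATOMIC_ACQUIRE);
	}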
 tools/testing/selftests/bpf/xskxceiver.c | 60 +++++++++++++++++++-----
 1 file changed, 48 insertions(+), 12 deletions(-)

diff --git a/tools/testing/selftests/bpf/xskxceiver.c b/tools/testing/selftests/bpf/xskxceiver.c
index 0ced4026ee44..694b0c0e1217 100644
--- a/tools/testing/selftests/bpf/xskxceiver.c
+++ b/tools/testing/selftests/bpf/xskxceiver.c
@@ -109,6 +109,8 @@
 
 #include
 
+#define MAX_TX_BUDGET_DEFAULT 32
+
 static bool opt_verbose;
 static bool opt_print_tests;
 static enum test_mode opt_mode = TEST_MODE_ALL;
@@ -1091,11 +1093,34 @@ static bool is_pkt_valid(struct pkt *pkt, void *buffer, u64 addr, u32 len)
 	return true;
 }
 
-static int kick_tx(struct xsk_socket_info *xsk)
+static u32 load_value(u32 *a)
 {
-	int ret;
+	return __atomic_load_n(a, __ATOMIC_ACQUIRE);
+}
+
+static int kick_tx_with_check(struct xsk_socket_info *xsk)
+{
+	int ret, cons_delta;
+	u32 prev_cons;
 
+	prev_cons = load_value(xsk->tx.consumer);
 	ret = sendto(xsk_socket__fd(xsk->xsk), NULL, 0, MSG_DONTWAIT, NULL, 0);
+	cons_delta = load_value(xsk->tx.consumer) - prev_cons;
+	if (cons_delta != MAX_TX_BUDGET_DEFAULT)
+		return TEST_FAILURE;
+
+	return ret;
+}
+
+static int kick_tx(struct xsk_socket_info *xsk, bool check_cons)
+{
+	u32 ready_to_send = load_value(xsk->tx.producer) - load_value(xsk->tx.consumer);
+	int ret;
+
+	if (!check_cons || ready_to_send <= MAX_TX_BUDGET_DEFAULT)
+		ret = sendto(xsk_socket__fd(xsk->xsk), NULL, 0, MSG_DONTWAIT, NULL, 0);
+	else
+		ret = kick_tx_with_check(xsk);
 	if (ret >= 0)
 		return TEST_PASS;
 	if (errno == ENOBUFS || errno == EAGAIN || errno == EBUSY || errno == ENETDOWN) {
@@ -1116,14 +1141,14 @@ static int kick_rx(struct xsk_socket_info *xsk)
 	return TEST_PASS;
 }
 
-static int complete_pkts(struct xsk_socket_info *xsk, int batch_size)
+static int complete_pkts(struct xsk_socket_info *xsk, int batch_size, bool check_cons)
 {
 	unsigned int rcvd;
 	u32 idx;
 	int ret;
 
 	if (xsk_ring_prod__needs_wakeup(&xsk->tx)) {
-		ret = kick_tx(xsk);
+		ret = kick_tx(xsk, check_cons);
 		if (ret)
 			return TEST_FAILURE;
 	}
@@ -1323,7 +1348,17 @@ static int receive_pkts(struct test_spec *test)
 	return TEST_PASS;
 }
 
-static int __send_pkts(struct ifobject *ifobject, struct xsk_socket_info *xsk, bool timeout)
+bool check_consumer(struct test_spec *test)
+{
+	if (test->mode & TEST_MODE_ZC ||
+	    !strncmp("STAT_TX_INVALID", test->name, MAX_TEST_NAME_SIZE))
+		return false;
+
+	return true;
+}
+
+static int __send_pkts(struct test_spec *test, struct ifobject *ifobject,
+		       struct xsk_socket_info *xsk, bool timeout)
 {
 	u32 i, idx = 0, valid_pkts = 0, valid_frags = 0, buffer_len;
 	struct pkt_stream *pkt_stream = xsk->pkt_stream;
@@ -1336,7 +1371,7 @@ static int __send_pkts(struct ifobject *ifobject, struct xsk_socket_info *xsk, b
 	/* pkts_in_flight might be negative if many invalid packets are sent */
 	if (pkts_in_flight >= (int)((umem_size(umem) - xsk->batch_size * buffer_len) /
 	    buffer_len)) {
-		ret = kick_tx(xsk);
+		ret = kick_tx(xsk, check_consumer(test));
 		if (ret)
 			return TEST_FAILURE;
 		return TEST_CONTINUE;
@@ -1365,7 +1400,7 @@ static int __send_pkts(struct ifobject *ifobject, struct xsk_socket_info *xsk, b
 			}
 		}
 
-		complete_pkts(xsk, xsk->batch_size);
+		complete_pkts(xsk, xsk->batch_size, check_consumer(test));
 	}
 
 	for (i = 0; i < xsk->batch_size; i++) {
@@ -1437,7 +1472,7 @@ static int __send_pkts(struct ifobject *ifobject, struct xsk_socket_info *xsk, b
 	}
 
 	if (!timeout) {
-		if (complete_pkts(xsk, i))
+		if (complete_pkts(xsk, i, check_consumer(test)))
 			return TEST_FAILURE;
 
 		usleep(10);
@@ -1447,7 +1482,7 @@ static int __send_pkts(struct ifobject *ifobject, struct xsk_socket_info *xsk, b
 	return TEST_CONTINUE;
 }
 
-static int wait_for_tx_completion(struct xsk_socket_info *xsk)
+static int wait_for_tx_completion(struct xsk_socket_info *xsk, bool check_cons)
 {
 	struct timeval tv_end, tv_now, tv_timeout = {THREAD_TMOUT, 0};
 	int ret;
@@ -1466,7 +1501,7 @@ static int wait_for_tx_completion(struct xsk_socket_info *xsk)
 			return TEST_FAILURE;
 		}
 
-		complete_pkts(xsk, xsk->batch_size);
+		complete_pkts(xsk, xsk->batch_size, check_cons);
 	}
 
 	return TEST_PASS;
@@ -1492,7 +1527,7 @@ static int send_pkts(struct test_spec *test, struct ifobject *ifobject)
 			__set_bit(i, bitmap);
 			continue;
 		}
-		ret = __send_pkts(ifobject, &ifobject->xsk_arr[i], timeout);
+		ret = __send_pkts(test, ifobject, &ifobject->xsk_arr[i], timeout);
 		if (ret == TEST_CONTINUE && !test->fail)
 			continue;
 
@@ -1502,7 +1537,8 @@ static int send_pkts(struct test_spec *test, struct ifobject *ifobject)
 		if (ret == TEST_PASS && timeout)
 			return ret;
 
-		ret = wait_for_tx_completion(&ifobject->xsk_arr[i]);
+		ret = wait_for_tx_completion(&ifobject->xsk_arr[i],
+					     check_consumer(test));
 		if (ret)
 			return TEST_FAILURE;
 	}
-- 
2.41.3