From: Jason Xing Add a new socket option to provide an alternative to achieve a higher overall throughput with the rest of the series applied. As the corresponding documentation I added says, it might increase the latency because the heavy allocation cannot be avoided especially when the shortage of memory occurs. So this patch doesn't turn this feature on by default. Add generic_xmit_batch to determine how many descriptors are handled at one time. It shouldn't be larger than max_tx_budget or smaller than one, which is the default value (disabling batch mode). Introduce skb_cache when setting setsockopt with xs->mutex protection to store newly allocated skbs at one time. Introduce desc_cache to temporarily cache what descriptors the xsk is about to send each round. Signed-off-by: Jason Xing --- Documentation/networking/af_xdp.rst | 17 +++++++++++ include/net/xdp_sock.h | 7 +++++ include/uapi/linux/if_xdp.h | 1 + net/xdp/xsk.c | 47 +++++++++++++++++++++++++++++ tools/include/uapi/linux/if_xdp.h | 1 + 5 files changed, 73 insertions(+) diff --git a/Documentation/networking/af_xdp.rst b/Documentation/networking/af_xdp.rst index 50d92084a49c..7a8d219efe71 100644 --- a/Documentation/networking/af_xdp.rst +++ b/Documentation/networking/af_xdp.rst @@ -447,6 +447,23 @@ mode to allow application to tune the per-socket maximum iteration for better throughput and less frequency of send syscall. Allowed range is [32, xs->tx->nentries]. +XDP_GENERIC_XMIT_BATCH +---------------------- + +It provides an option that allows application to use batch xmit in the copy +mode. Batch process tries to allocate a certain number of skbs through bulk +mechanism first and then initialize them and finally send them out at one +time. 
+It applies efficient bulk allocation/deallocation functions, avoids frequently +grabbing/releasing a few locks (like cache lock and queue lock), minimizing +triggering IRQs from the driver side, which generally yields an overall +performance improvement as observed by xdpsock benchmark. +Potential side effect is that it might increase the per-packet latency +due to memory allocation that is unavoidable and time-consuming. +Setting a relatively large value of batch size could benefit scenarios +like bulk transmission. The maximum value shouldn't be larger than +xs->max_tx_budget. + XDP_STATISTICS getsockopt ------------------------- diff --git a/include/net/xdp_sock.h b/include/net/xdp_sock.h index 23e8861e8b25..965cab9a0465 100644 --- a/include/net/xdp_sock.h +++ b/include/net/xdp_sock.h @@ -45,6 +45,12 @@ struct xsk_map { struct xdp_sock __rcu *xsk_map[]; }; +struct xsk_batch { + u32 generic_xmit_batch; + struct sk_buff **skb_cache; + struct xdp_desc *desc_cache; +}; + struct xdp_sock { /* struct sock must be the first member of struct xdp_sock */ struct sock sk; @@ -89,6 +95,7 @@ struct xdp_sock { struct mutex mutex; struct xsk_queue *fq_tmp; /* Only as tmp storage before bind */ struct xsk_queue *cq_tmp; /* Only as tmp storage before bind */ + struct xsk_batch batch; }; /* diff --git a/include/uapi/linux/if_xdp.h b/include/uapi/linux/if_xdp.h index 23a062781468..44cb72cd328e 100644 --- a/include/uapi/linux/if_xdp.h +++ b/include/uapi/linux/if_xdp.h @@ -80,6 +80,7 @@ struct xdp_mmap_offsets { #define XDP_STATISTICS 7 #define XDP_OPTIONS 8 #define XDP_MAX_TX_SKB_BUDGET 9 +#define XDP_GENERIC_XMIT_BATCH 10 struct xdp_umem_reg { __u64 addr; /* Start of packet data area */ diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c index 6149f6a79897..6122db8606fe 100644 --- a/net/xdp/xsk.c +++ b/net/xdp/xsk.c @@ -1218,6 +1218,16 @@ static void xsk_delete_from_maps(struct xdp_sock *xs) } } +static void xsk_batch_reset(struct xsk_batch *batch, struct sk_buff **skbs, + struct 
xdp_desc *descs, unsigned int size) +{ + kfree(batch->skb_cache); + kvfree(batch->desc_cache); + batch->skb_cache = skbs; + batch->desc_cache = descs; + batch->generic_xmit_batch = size; +} + static int xsk_release(struct socket *sock) { struct sock *sk = sock->sk; @@ -1247,6 +1257,7 @@ static int xsk_release(struct socket *sock) xskq_destroy(xs->tx); xskq_destroy(xs->fq_tmp); xskq_destroy(xs->cq_tmp); + xsk_batch_reset(&xs->batch, NULL, NULL, 0); sock_orphan(sk); sock->sk = NULL; @@ -1588,6 +1599,42 @@ static int xsk_setsockopt(struct socket *sock, int level, int optname, WRITE_ONCE(xs->max_tx_budget, budget); return 0; } + case XDP_GENERIC_XMIT_BATCH: + { + struct xsk_buff_pool *pool = xs->pool; + struct xsk_batch *batch = &xs->batch; + struct xdp_desc *descs; + struct sk_buff **skbs; + unsigned int size; + int ret = 0; + + if (optlen != sizeof(size)) + return -EINVAL; + if (copy_from_sockptr(&size, optval, sizeof(size))) + return -EFAULT; + if (size == batch->generic_xmit_batch) + return 0; + if (!size || size > xs->max_tx_budget || !pool) + return -EACCES; + + mutex_lock(&xs->mutex); + skbs = kmalloc(size * sizeof(struct sk_buff *), GFP_KERNEL); + if (!skbs) { + ret = -ENOMEM; + goto out; + } + descs = kvcalloc(size, sizeof(struct xdp_desc), GFP_KERNEL); + if (!descs) { + kfree(skbs); + ret = -ENOMEM; + goto out; + } + + xsk_batch_reset(batch, skbs, descs, size); +out: + mutex_unlock(&xs->mutex); + return ret; + } default: break; } diff --git a/tools/include/uapi/linux/if_xdp.h b/tools/include/uapi/linux/if_xdp.h index 23a062781468..44cb72cd328e 100644 --- a/tools/include/uapi/linux/if_xdp.h +++ b/tools/include/uapi/linux/if_xdp.h @@ -80,6 +80,7 @@ struct xdp_mmap_offsets { #define XDP_STATISTICS 7 #define XDP_OPTIONS 8 #define XDP_MAX_TX_SKB_BUDGET 9 +#define XDP_GENERIC_XMIT_BATCH 10 struct xdp_umem_reg { __u64 addr; /* Start of packet data area */ -- 2.41.3 From: Jason Xing To avoid reinvent the wheel, the patch provides a way to let batch feature reuse 
xsk_build_skb() as the rest process of the whole initialization just after the skb is allocated. The original xsk_build_skb() itself allocates a new skb by calling sock_alloc_send_skb whether in copy mode or zerocopy mode. Add a new parameter allocated skb to let other callers to pass an already allocated skb to support later xmit batch feature. It replaces the previous allocation of memory function with a bulk one. Signed-off-by: Jason Xing --- include/net/xdp_sock.h | 3 +++ net/xdp/xsk.c | 23 ++++++++++++++++------- 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/include/net/xdp_sock.h b/include/net/xdp_sock.h index 965cab9a0465..90c709fd1239 100644 --- a/include/net/xdp_sock.h +++ b/include/net/xdp_sock.h @@ -133,6 +133,9 @@ int xsk_generic_rcv(struct xdp_sock *xs, struct xdp_buff *xdp); int __xsk_map_redirect(struct xdp_sock *xs, struct xdp_buff *xdp); void __xsk_map_flush(struct list_head *flush_list); INDIRECT_CALLABLE_DECLARE(void xsk_destruct_skb(struct sk_buff *)); +struct sk_buff *xsk_build_skb(struct xdp_sock *xs, + struct sk_buff *allocated_skb, + struct xdp_desc *desc); /** * xsk_tx_metadata_to_compl - Save enough relevant metadata information diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c index 6122db8606fe..ecd5b9c424b8 100644 --- a/net/xdp/xsk.c +++ b/net/xdp/xsk.c @@ -718,6 +718,7 @@ static int xsk_skb_metadata(struct sk_buff *skb, void *buffer, } static struct sk_buff *xsk_build_skb_zerocopy(struct xdp_sock *xs, + struct sk_buff *allocated_skb, struct xdp_desc *desc) { struct xsk_buff_pool *pool = xs->pool; @@ -734,7 +735,10 @@ static struct sk_buff *xsk_build_skb_zerocopy(struct xdp_sock *xs, if (!skb) { hr = max(NET_SKB_PAD, L1_CACHE_ALIGN(xs->dev->needed_headroom)); - skb = sock_alloc_send_skb(&xs->sk, hr, 1, &err); + if (!allocated_skb) + skb = sock_alloc_send_skb(&xs->sk, hr, 1, &err); + else + skb = allocated_skb; if (unlikely(!skb)) return ERR_PTR(err); @@ -799,15 +803,16 @@ static struct sk_buff *xsk_build_skb_zerocopy(struct 
xdp_sock *xs, return skb; } -static struct sk_buff *xsk_build_skb(struct xdp_sock *xs, - struct xdp_desc *desc) +struct sk_buff *xsk_build_skb(struct xdp_sock *xs, + struct sk_buff *allocated_skb, + struct xdp_desc *desc) { struct net_device *dev = xs->dev; struct sk_buff *skb = xs->skb; int err; if (dev->priv_flags & IFF_TX_SKB_NO_LINEAR) { - skb = xsk_build_skb_zerocopy(xs, desc); + skb = xsk_build_skb_zerocopy(xs, allocated_skb, desc); if (IS_ERR(skb)) { err = PTR_ERR(skb); skb = NULL; @@ -822,8 +827,12 @@ static struct sk_buff *xsk_build_skb(struct xdp_sock *xs, if (!skb) { hr = max(NET_SKB_PAD, L1_CACHE_ALIGN(dev->needed_headroom)); - tr = dev->needed_tailroom; - skb = sock_alloc_send_skb(&xs->sk, hr + len + tr, 1, &err); + if (!allocated_skb) { + tr = dev->needed_tailroom; + skb = sock_alloc_send_skb(&xs->sk, hr + len + tr, 1, &err); + } else { + skb = allocated_skb; + } if (unlikely(!skb)) goto free_err; @@ -943,7 +952,7 @@ static int __xsk_generic_xmit(struct sock *sk) goto out; } - skb = xsk_build_skb(xs, &desc); + skb = xsk_build_skb(xs, NULL, &desc); if (IS_ERR(skb)) { err = PTR_ERR(skb); if (err != -EOVERFLOW) -- 2.41.3 From: Jason Xing Support allocating and building skbs in batch. There are three steps for one batched allocation: 1. Reserve the skb and count the skb->truesize. It provides a way that for later patch to speed up small data transmission by diminishing the impact of kmalloc_reserve(). 2. Add the total of truesize to sk_wmem_alloc at one time. The load and store of sk_wmem_alloc is time-consuming, so this batch process makes it gain the performance improvement. 3. Copy data and then finish initialization of each skb. This patch uses kmem_cache_alloc_bulk() to complete the batch allocation which relies on the global common cache 'net_hotdata.skbuff_cache'. Use a xsk standalone skb cache (namely, xs->skb_cache) to store allocated skbs instead of resorting to napi_alloc_cache that was designed for softirq condition. 
After allocating memory for each of skbs, in a 'for' loop, the patch borrows part of __alloc_skb() to initialize skb and then calls xsk_build_skb() to complete the rest of initialization process, like copying data and stuff. To achieve a better result, the allocation function only uses the function we need to keep it super clean, like skb_set_owner_w() that is simplified into two lines of codes. Add batch.send_queue and use the skb->list to make skbs into one chain so that they can be easily sent which is shown in the subsequent patches. In terms of freeing skbs process, napi_consume_skb() in the tx completion would put the skb into global cache 'net_hotdata.skbuff_cache' that implements the deferred freeing skb feature to avoid freeing skb one by one to improve the performance. Signed-off-by: Jason Xing --- include/net/xdp_sock.h | 3 + net/core/skbuff.c | 121 +++++++++++++++++++++++++++++++++++++++++ net/xdp/xsk.c | 7 +++ 3 files changed, 131 insertions(+) diff --git a/include/net/xdp_sock.h b/include/net/xdp_sock.h index 90c709fd1239..84f0aee3fb10 100644 --- a/include/net/xdp_sock.h +++ b/include/net/xdp_sock.h @@ -47,8 +47,10 @@ struct xsk_map { struct xsk_batch { u32 generic_xmit_batch; + unsigned int skb_count; struct sk_buff **skb_cache; struct xdp_desc *desc_cache; + struct sk_buff_head send_queue; }; struct xdp_sock { @@ -136,6 +138,7 @@ INDIRECT_CALLABLE_DECLARE(void xsk_destruct_skb(struct sk_buff *)); struct sk_buff *xsk_build_skb(struct xdp_sock *xs, struct sk_buff *allocated_skb, struct xdp_desc *desc); +int xsk_alloc_batch_skb(struct xdp_sock *xs, u32 nb_pkts, u32 nb_descs, int *err); /** * xsk_tx_metadata_to_compl - Save enough relevant metadata information diff --git a/net/core/skbuff.c b/net/core/skbuff.c index 4045d7c484a1..f29cecacd8bb 100644 --- a/net/core/skbuff.c +++ b/net/core/skbuff.c @@ -83,6 +83,7 @@ #include #include #include +#include #include #include @@ -647,6 +648,126 @@ static void *kmalloc_reserve(unsigned int *size, gfp_t flags, 
int node, return obj; } +#ifdef CONFIG_XDP_SOCKETS +int xsk_alloc_batch_skb(struct xdp_sock *xs, u32 nb_pkts, u32 nb_descs, int *err) +{ + struct xsk_batch *batch = &xs->batch; + struct xdp_desc *descs = batch->desc_cache; + struct sk_buff **skbs = batch->skb_cache; + u32 alloc_descs, base_len, wmem, sndbuf; + gfp_t gfp_mask = xs->sk.sk_allocation; + u32 skb_count = batch->skb_count; + struct net_device *dev = xs->dev; + unsigned int total_truesize = 0; + struct sk_buff *skb = NULL; + int node = NUMA_NO_NODE; + u32 i = 0, j, k = 0; + bool need_alloc; + u8 *data; + + base_len = max(NET_SKB_PAD, L1_CACHE_ALIGN(dev->needed_headroom)); + if (!(dev->priv_flags & IFF_TX_SKB_NO_LINEAR)) + base_len += dev->needed_tailroom; + + if (xs->skb) + nb_pkts--; + + if (skb_count >= nb_pkts) + goto alloc_data; + + skb_count += kmem_cache_alloc_bulk(net_hotdata.skbuff_cache, + gfp_mask, + nb_pkts - skb_count, + (void **)&skbs[skb_count]); + if (skb_count < nb_pkts) + nb_pkts = skb_count; + +alloc_data: + /* + * Phase 1: Allocate data buffers and initialize SKBs. + * Pre-scan descriptors to determine packet boundaries, so we can + * batch the sk_wmem_alloc charge in Phase 2. 
+ */ + need_alloc = !xs->skb; + wmem = sk_wmem_alloc_get(&xs->sk); + sndbuf = READ_ONCE(xs->sk.sk_sndbuf); + for (j = 0; j < nb_descs; j++) { + if (need_alloc) { + u32 size = base_len; + + if (!(dev->priv_flags & IFF_TX_SKB_NO_LINEAR)) + size += descs[j].len; + + if (i >= nb_pkts) { + *err = -EAGAIN; + break; + } + + if (wmem + size + total_truesize > sndbuf) { + *err = -EAGAIN; + break; + } + + skb = skbs[skb_count - 1 - i]; + skbuff_clear(skb); + data = kmalloc_reserve(&size, gfp_mask, node, skb); + if (unlikely(!data)) { + *err = -ENOBUFS; + break; + } + __finalize_skb_around(skb, data, size); + /* Replace skb_set_owner_w() with the following */ + skb->sk = &xs->sk; + skb->destructor = sock_wfree; + total_truesize += skb->truesize; + i++; + need_alloc = false; + } + if (!xp_mb_desc(&descs[j])) + need_alloc = true; + } + alloc_descs = j; + + /* + * Phase 2: Batch charge sk_wmem_alloc. + * One refcount_add() replaces N per-SKB skb_set_owner_w() calls, + * which gains much performance improvement. + */ + if (total_truesize) + refcount_add(total_truesize, &xs->sk.sk_wmem_alloc); + + /* Phase 3: Build SKBs with packet data */ + for (j = 0; j < alloc_descs; j++) { + if (!xs->skb) { + skb = skbs[skb_count - 1 - k]; + k++; + } + + skb = xsk_build_skb(xs, skb, &descs[j]); + if (IS_ERR(skb)) { + *err = PTR_ERR(skb); + break; + } + + if (xp_mb_desc(&descs[j])) { + xs->skb = skb; + continue; + } + + xs->skb = NULL; + __skb_queue_tail(&batch->send_queue, skb); + } + + /* Phase 4: Reclaim unused allocated SKBs */ + while (k < i) + kfree_skb(skbs[skb_count - 1 - k++]); + + batch->skb_count = skb_count - i; + + return j; +} +#endif + /* Allocate a new skbuff. We do this ourselves so we can fill in a few * 'private' fields and also do memory statistics to find all the * [BEEP] leaks. 
diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c index ecd5b9c424b8..f97bc9cf9b9a 100644 --- a/net/xdp/xsk.c +++ b/net/xdp/xsk.c @@ -25,6 +25,7 @@ #include #include #include +#include #include #include #include @@ -1230,10 +1231,15 @@ static void xsk_delete_from_maps(struct xdp_sock *xs) static void xsk_batch_reset(struct xsk_batch *batch, struct sk_buff **skbs, struct xdp_desc *descs, unsigned int size) { + if (batch->skb_count) + kmem_cache_free_bulk(net_hotdata.skbuff_cache, + batch->skb_count, + (void **)batch->skb_cache); kfree(batch->skb_cache); kvfree(batch->desc_cache); batch->skb_cache = skbs; batch->desc_cache = descs; + batch->skb_count = 0; batch->generic_xmit_batch = size; } @@ -1946,6 +1952,7 @@ static int xsk_create(struct net *net, struct socket *sock, int protocol, INIT_LIST_HEAD(&xs->map_list); spin_lock_init(&xs->map_list_lock); + __skb_queue_head_init(&xs->batch.send_queue); mutex_lock(&net->xdp.lock); sk_add_node_rcu(sk, &net->xdp.list); -- 2.41.3 From: Jason Xing It's beneficial for small data transmission. Replace per-SKB kmalloc_reserve() with on-demand bulk allocation from skb_small_head_cache for small packets. Add a persistent per-socket data buffer cache (batch.data_cache / batch.data_count) that survives across batch cycles, similar to how batch.send_queue caches built SKBs. Inside the Phase-1 per-descriptor loop, when a small packet needs a data buffer and the cache is empty, a single kmem_cache_alloc_bulk() refills it with generic_xmit_batch objects. Subsequent small packets pop directly from the cache. Large packets bypass the cache entirely and fall back to kmalloc_reserve(). Unused buffers remain in the cache for the next batch. I observed that kmalloc_reserve() consumes nearly 40% which seems unavoidable at the first glance, thinking adding the bulk mechanism should contribute to the performance. That's the motivation of this patch. Now, the feature gives us around 10% improvement. 
Signed-off-by: Jason Xing --- include/net/xdp_sock.h | 2 ++ net/core/skbuff.c | 27 ++++++++++++++++++++++----- net/xdp/xsk.c | 24 ++++++++++++++++++++---- 3 files changed, 44 insertions(+), 9 deletions(-) diff --git a/include/net/xdp_sock.h b/include/net/xdp_sock.h index 84f0aee3fb10..2151aab8f0a1 100644 --- a/include/net/xdp_sock.h +++ b/include/net/xdp_sock.h @@ -51,6 +51,8 @@ struct xsk_batch { struct sk_buff **skb_cache; struct xdp_desc *desc_cache; struct sk_buff_head send_queue; + unsigned int data_count; + void **data_cache; }; struct xdp_sock { diff --git a/net/core/skbuff.c b/net/core/skbuff.c index f29cecacd8bb..5726b1566b2b 100644 --- a/net/core/skbuff.c +++ b/net/core/skbuff.c @@ -661,9 +661,11 @@ int xsk_alloc_batch_skb(struct xdp_sock *xs, u32 nb_pkts, u32 nb_descs, int *err unsigned int total_truesize = 0; struct sk_buff *skb = NULL; int node = NUMA_NO_NODE; + void **dc = batch->data_cache; + unsigned int dc_count = batch->data_count; u32 i = 0, j, k = 0; bool need_alloc; - u8 *data; + void *data; base_len = max(NET_SKB_PAD, L1_CACHE_ALIGN(dev->needed_headroom)); if (!(dev->priv_flags & IFF_TX_SKB_NO_LINEAR)) @@ -683,6 +685,13 @@ int xsk_alloc_batch_skb(struct xdp_sock *xs, u32 nb_pkts, u32 nb_descs, int *err nb_pkts = skb_count; alloc_data: + if (dc_count < nb_pkts && !(gfp_mask & KMALLOC_NOT_NORMAL_BITS)) + dc_count += kmem_cache_alloc_bulk( + net_hotdata.skb_small_head_cache, + gfp_mask | __GFP_NOMEMALLOC | __GFP_NOWARN, + batch->generic_xmit_batch - dc_count, + &dc[dc_count]); + /* * Phase 1: Allocate data buffers and initialize SKBs. 
* Pre-scan descriptors to determine packet boundaries, so we can @@ -710,10 +719,17 @@ int xsk_alloc_batch_skb(struct xdp_sock *xs, u32 nb_pkts, u32 nb_descs, int *err skb = skbs[skb_count - 1 - i]; skbuff_clear(skb); - data = kmalloc_reserve(&size, gfp_mask, node, skb); - if (unlikely(!data)) { - *err = -ENOBUFS; - break; + if (dc_count && + SKB_HEAD_ALIGN(size) <= SKB_SMALL_HEAD_CACHE_SIZE) { + data = dc[--dc_count]; + size = SKB_SMALL_HEAD_CACHE_SIZE; + } else { + data = kmalloc_reserve(&size, gfp_mask, + node, skb); + if (unlikely(!data)) { + *err = -ENOBUFS; + break; + } } __finalize_skb_around(skb, data, size); /* Replace skb_set_owner_w() with the following */ @@ -762,6 +778,7 @@ int xsk_alloc_batch_skb(struct xdp_sock *xs, u32 nb_pkts, u32 nb_descs, int *err while (k < i) kfree_skb(skbs[skb_count - 1 - k++]); + batch->data_count = dc_count; batch->skb_count = skb_count - i; return j; diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c index f97bc9cf9b9a..7a6991bc19a8 100644 --- a/net/xdp/xsk.c +++ b/net/xdp/xsk.c @@ -1229,14 +1229,22 @@ static void xsk_delete_from_maps(struct xdp_sock *xs) } static void xsk_batch_reset(struct xsk_batch *batch, struct sk_buff **skbs, - struct xdp_desc *descs, unsigned int size) -{ + struct xdp_desc *descs, void **data, + unsigned int size) +{ + if (batch->data_count) + kmem_cache_free_bulk(net_hotdata.skb_small_head_cache, + batch->data_count, + batch->data_cache); + kfree(batch->data_cache); if (batch->skb_count) kmem_cache_free_bulk(net_hotdata.skbuff_cache, batch->skb_count, (void **)batch->skb_cache); kfree(batch->skb_cache); kvfree(batch->desc_cache); + batch->data_cache = data; + batch->data_count = 0; batch->skb_cache = skbs; batch->desc_cache = descs; batch->skb_count = 0; @@ -1272,7 +1280,7 @@ static int xsk_release(struct socket *sock) xskq_destroy(xs->tx); xskq_destroy(xs->fq_tmp); xskq_destroy(xs->cq_tmp); - xsk_batch_reset(&xs->batch, NULL, NULL, 0); + xsk_batch_reset(&xs->batch, NULL, NULL, NULL, 0); sock_orphan(sk); 
sock->sk = NULL; @@ -1620,6 +1628,7 @@ static int xsk_setsockopt(struct socket *sock, int level, int optname, struct xsk_batch *batch = &xs->batch; struct xdp_desc *descs; struct sk_buff **skbs; + void **data; unsigned int size; int ret = 0; @@ -1638,14 +1647,21 @@ static int xsk_setsockopt(struct socket *sock, int level, int optname, ret = -ENOMEM; goto out; } + data = kmalloc_array(size, sizeof(void *), GFP_KERNEL); + if (!data) { + kfree(skbs); + ret = -ENOMEM; + goto out; + } descs = kvcalloc(size, sizeof(struct xdp_desc), GFP_KERNEL); if (!descs) { + kfree(data); kfree(skbs); ret = -ENOMEM; goto out; } - xsk_batch_reset(batch, skbs, descs, size); + xsk_batch_reset(batch, skbs, descs, data, size); out: mutex_unlock(&xs->mutex); return ret; -- 2.41.3 From: Jason Xing Add batch xmit logic. Grab the lock and disable bottom halves only once, then send all the aggregated packets in one loop. Via skb->list, the already built skbs can be handled one by one. Signed-off-by: Jason Xing --- include/net/xdp_sock.h | 1 + net/core/dev.c | 41 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+) diff --git a/include/net/xdp_sock.h b/include/net/xdp_sock.h index 2151aab8f0a1..0609e3b04279 100644 --- a/include/net/xdp_sock.h +++ b/include/net/xdp_sock.h @@ -141,6 +141,7 @@ struct sk_buff *xsk_build_skb(struct xdp_sock *xs, struct sk_buff *allocated_skb, struct xdp_desc *desc); int xsk_alloc_batch_skb(struct xdp_sock *xs, u32 nb_pkts, u32 nb_descs, int *err); +int xsk_direct_xmit_batch(struct xdp_sock *xs, struct net_device *dev); /** * xsk_tx_metadata_to_compl - Save enough relevant metadata information diff --git a/net/core/dev.c b/net/core/dev.c index 4519f0e59beb..e33a2406d8ca 100644 --- a/net/core/dev.c +++ b/net/core/dev.c @@ -163,6 +163,7 @@ #include #include #include +#include #include "dev.h" #include "devmem.h" @@ -4893,6 +4894,46 @@ int __dev_queue_xmit(struct sk_buff *skb, struct net_device *sb_dev) } EXPORT_SYMBOL(__dev_queue_xmit); +int 
xsk_direct_xmit_batch(struct xdp_sock *xs, struct net_device *dev) +{ + u16 queue_id = xs->queue_id; + struct netdev_queue *txq = netdev_get_tx_queue(dev, queue_id); + struct sk_buff_head *send_queue = &xs->batch.send_queue; + int ret = NETDEV_TX_BUSY; + struct sk_buff *skb; + + local_bh_disable(); + HARD_TX_LOCK(dev, txq, smp_processor_id()); + while ((skb = __skb_dequeue(send_queue)) != NULL) { + struct sk_buff *orig_skb = skb; + bool again = false; + + skb = validate_xmit_skb_list(skb, dev, &again); + if (skb != orig_skb) { + dev_core_stats_tx_dropped_inc(dev); + kfree_skb_list(skb); + ret = NET_XMIT_DROP; + break; + } + + if (netif_xmit_frozen_or_drv_stopped(txq)) { + __skb_queue_head(send_queue, skb); + break; + } + skb_set_queue_mapping(skb, queue_id); + ret = netdev_start_xmit(skb, dev, txq, false); + if (ret != NETDEV_TX_OK) { + if (ret == NETDEV_TX_BUSY) + __skb_queue_head(send_queue, skb); + break; + } + } + HARD_TX_UNLOCK(dev, txq); + local_bh_enable(); + + return ret; +} + int __dev_direct_xmit(struct sk_buff *skb, u16 queue_id) { struct net_device *dev = skb->dev; -- 2.41.3 From: Jason Xing Only set xmit.more false for the last skb. In theory, only making xmit.more false for the last packets to be sent in each round can bring much benefit like avoid triggering too many irqs. Compared to the numbers for batch mode, a huge improvement (26%) can be seen on i40e/ixgbe driver since the cost of triggering irqs is expensive. 
Suggested-by: Jesper Dangaard Brouer Signed-off-by: Jason Xing --- net/core/dev.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/net/core/dev.c b/net/core/dev.c index e33a2406d8ca..a6abd621a7f3 100644 --- a/net/core/dev.c +++ b/net/core/dev.c @@ -4901,6 +4901,7 @@ int xsk_direct_xmit_batch(struct xdp_sock *xs, struct net_device *dev) struct sk_buff_head *send_queue = &xs->batch.send_queue; int ret = NETDEV_TX_BUSY; struct sk_buff *skb; + bool more = true; local_bh_disable(); HARD_TX_LOCK(dev, txq, smp_processor_id()); @@ -4920,8 +4921,12 @@ int xsk_direct_xmit_batch(struct xdp_sock *xs, struct net_device *dev) __skb_queue_head(send_queue, skb); break; } + + if (!skb_peek(send_queue)) + more = false; + skb_set_queue_mapping(skb, queue_id); - ret = netdev_start_xmit(skb, dev, txq, false); + ret = netdev_start_xmit(skb, dev, txq, more); if (ret != NETDEV_TX_OK) { if (ret == NETDEV_TX_BUSY) __skb_queue_head(send_queue, skb); -- 2.41.3 From: Jason Xing This patch moves the SG check ahead, which is the only place we need to handle very carefully because either in xsk_build_skb_zerocopy() or in multi-buffer mode nr_frags (in skb_needs_linearize()) is used[1]. In most cases, for xsk, it's totally not needed to validate and check the skb in validate_xmit_skb_list() that adds numerous checks in the extremely hot path. In this kind of workload, even the overhead of mathematical operations is not trivial. Performance-wise, I run './xdpsock -i enp2s0f0np0 -t -S -s 64' on 1Gb/sec ixgbe driver to verify. 
It stably goes up by 5.48% [1]: https://lore.kernel.org/all/20251125115754.46793-1-kerneljasonxing@gmail.com/ Signed-off-by: Jason Xing --- net/core/dev.c | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/net/core/dev.c b/net/core/dev.c index a6abd621a7f3..aa38993b9dd4 100644 --- a/net/core/dev.c +++ b/net/core/dev.c @@ -4899,6 +4899,7 @@ int xsk_direct_xmit_batch(struct xdp_sock *xs, struct net_device *dev) u16 queue_id = xs->queue_id; struct netdev_queue *txq = netdev_get_tx_queue(dev, queue_id); struct sk_buff_head *send_queue = &xs->batch.send_queue; + bool need_validate = !(dev->features & NETIF_F_SG); int ret = NETDEV_TX_BUSY; struct sk_buff *skb; bool more = true; @@ -4906,15 +4907,17 @@ int xsk_direct_xmit_batch(struct xdp_sock *xs, struct net_device *dev) local_bh_disable(); HARD_TX_LOCK(dev, txq, smp_processor_id()); while ((skb = __skb_dequeue(send_queue)) != NULL) { - struct sk_buff *orig_skb = skb; - bool again = false; - - skb = validate_xmit_skb_list(skb, dev, &again); - if (skb != orig_skb) { - dev_core_stats_tx_dropped_inc(dev); - kfree_skb_list(skb); - ret = NET_XMIT_DROP; - break; - } + if (unlikely(need_validate)) { + struct sk_buff *orig_skb = skb; + bool again = false; + + skb = validate_xmit_skb_list(skb, dev, &again); + if (skb != orig_skb) { + dev_core_stats_tx_dropped_inc(dev); + kfree_skb_list(skb); + ret = NET_XMIT_DROP; + break; + } } if (netif_xmit_frozen_or_drv_stopped(txq)) { -- 2.41.3 From: Jason Xing Rename the last parameter to nb_descs for more accurate naming. Next patch will add a real nb_pkts parameter to help copy mode count how many packets are needed. No functional change here. 
Signed-off-by: Jason Xing --- net/xdp/xsk.c | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c index 7a6991bc19a8..6cd2e58e170c 100644 --- a/net/xdp/xsk.c +++ b/net/xdp/xsk.c @@ -485,16 +485,16 @@ EXPORT_SYMBOL(xsk_tx_peek_desc); static u32 xsk_tx_peek_release_fallback(struct xsk_buff_pool *pool, u32 max_entries) { struct xdp_desc *descs = pool->tx_descs; - u32 nb_pkts = 0; + u32 nb_descs = 0; - while (nb_pkts < max_entries && xsk_tx_peek_desc(pool, &descs[nb_pkts])) - nb_pkts++; + while (nb_descs < max_entries && xsk_tx_peek_desc(pool, &descs[nb_descs])) + nb_descs++; xsk_tx_release(pool); - return nb_pkts; + return nb_descs; } -u32 xsk_tx_peek_release_desc_batch(struct xsk_buff_pool *pool, u32 nb_pkts) +u32 xsk_tx_peek_release_desc_batch(struct xsk_buff_pool *pool, u32 nb_descs) { struct xdp_sock *xs; @@ -502,16 +502,16 @@ u32 xsk_tx_peek_release_desc_batch(struct xsk_buff_pool *pool, u32 nb_pkts) if (!list_is_singular(&pool->xsk_tx_list)) { /* Fallback to the non-batched version */ rcu_read_unlock(); - return xsk_tx_peek_release_fallback(pool, nb_pkts); + return xsk_tx_peek_release_fallback(pool, nb_descs); } xs = list_first_or_null_rcu(&pool->xsk_tx_list, struct xdp_sock, tx_list); if (!xs) { - nb_pkts = 0; + nb_descs = 0; goto out; } - nb_pkts = xskq_cons_nb_entries(xs->tx, nb_pkts); + nb_descs = xskq_cons_nb_entries(xs->tx, nb_descs); /* This is the backpressure mechanism for the Tx path. Try to * reserve space in the completion queue for all packets, but @@ -519,23 +519,23 @@ u32 xsk_tx_peek_release_desc_batch(struct xsk_buff_pool *pool, u32 nb_pkts) * packets. This avoids having to implement any buffering in * the Tx path. 
*/ - nb_pkts = xskq_prod_nb_free(pool->cq, nb_pkts); - if (!nb_pkts) + nb_descs = xskq_prod_nb_free(pool->cq, nb_descs); + if (!nb_descs) goto out; - nb_pkts = xskq_cons_read_desc_batch(xs->tx, pool, nb_pkts); - if (!nb_pkts) { + nb_descs = xskq_cons_read_desc_batch(xs->tx, pool, nb_descs); + if (!nb_descs) { xs->tx->queue_empty_descs++; goto out; } __xskq_cons_release(xs->tx); - xskq_prod_write_addr_batch(pool->cq, pool->tx_descs, nb_pkts); + xskq_prod_write_addr_batch(pool->cq, pool->tx_descs, nb_descs); xs->sk.sk_write_space(&xs->sk); out: rcu_read_unlock(); - return nb_pkts; + return nb_descs; } EXPORT_SYMBOL(xsk_tx_peek_release_desc_batch); -- 2.41.3 From: Jason Xing Add a new parameter nb_pkts to count how many packets are needed practically by copy mode with the help of XDP_PKT_CONTD option. Add descs to provide a way to pass xs->desc_cache to store the descriptors for copy mode. Signed-off-by: Jason Xing --- net/xdp/xsk_queue.h | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h index ec08d9c102b1..354f6fe86893 100644 --- a/net/xdp/xsk_queue.h +++ b/net/xdp/xsk_queue.h @@ -263,12 +263,12 @@ static inline void parse_desc(struct xsk_queue *q, struct xsk_buff_pool *pool, parsed->mb = xp_mb_desc(desc); } -static inline -u32 xskq_cons_read_desc_batch(struct xsk_queue *q, struct xsk_buff_pool *pool, - u32 max) +static inline u32 +__xskq_cons_read_desc_batch(struct xsk_queue *q, struct xsk_buff_pool *pool, + struct xdp_desc *descs, u32 max, u32 *nb_pkts, + u32 max_segs) { u32 cached_cons = q->cached_cons, nb_entries = 0; - struct xdp_desc *descs = pool->tx_descs; u32 total_descs = 0, nr_frags = 0; /* track first entry, if stumble upon *any* invalid descriptor, rewind @@ -288,9 +288,11 @@ u32 xskq_cons_read_desc_batch(struct xsk_queue *q, struct xsk_buff_pool *pool, if (likely(!parsed.mb)) { total_descs += (nr_frags + 1); nr_frags = 0; + if (nb_pkts) + (*nb_pkts)++; } else { nr_frags++; - 
if (nr_frags == pool->xdp_zc_max_segs) { + if (nr_frags == max_segs) { nr_frags = 0; break; } @@ -304,6 +306,14 @@ u32 xskq_cons_read_desc_batch(struct xsk_queue *q, struct xsk_buff_pool *pool, return total_descs; } +static inline u32 +xskq_cons_read_desc_batch(struct xsk_queue *q, struct xsk_buff_pool *pool, + u32 max) +{ + return __xskq_cons_read_desc_batch(q, pool, pool->tx_descs, max, + NULL, pool->xdp_zc_max_segs); +} + /* Functions for consumers */ static inline void __xskq_cons_release(struct xsk_queue *q) -- 2.41.3 From: Jason Xing Previously it only reserves one slot. The patch extends it to n to cover the batch mode. Signed-off-by: Jason Xing --- net/xdp/xsk.c | 12 ++++++++---- net/xdp/xsk_queue.h | 12 +++++++----- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c index 6cd2e58e170c..c26e26cb4dda 100644 --- a/net/xdp/xsk.c +++ b/net/xdp/xsk.c @@ -546,12 +546,17 @@ static int xsk_wakeup(struct xdp_sock *xs, u8 flags) return dev->netdev_ops->ndo_xsk_wakeup(dev, xs->queue_id, flags); } -static int xsk_cq_reserve_locked(struct xsk_buff_pool *pool) +/* The function tries to reserve as many descs as possible. If there + * is no single slot to allocate, return zero. Otherwise, return how + * many slots are available, even though it might stop reserving at + * certain point. + */ +static int xsk_cq_reserve_locked(struct xsk_buff_pool *pool, u32 n) { int ret; spin_lock(&pool->cq->cq_cached_prod_lock); - ret = xskq_prod_reserve(pool->cq); + ret = xskq_prod_reserve(pool->cq, n); spin_unlock(&pool->cq->cq_cached_prod_lock); return ret; @@ -947,8 +952,7 @@ static int __xsk_generic_xmit(struct sock *sk) * if there is space in it. This avoids having to implement * any buffering in the Tx path. 
*/ - err = xsk_cq_reserve_locked(xs->pool); - if (err) { + if (!xsk_cq_reserve_locked(xs->pool, 1)) { err = -EAGAIN; goto out; } diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h index 354f6fe86893..34cc07d6115e 100644 --- a/net/xdp/xsk_queue.h +++ b/net/xdp/xsk_queue.h @@ -413,14 +413,16 @@ static inline void xskq_prod_cancel_n(struct xsk_queue *q, u32 cnt) q->cached_prod -= cnt; } -static inline int xskq_prod_reserve(struct xsk_queue *q) +static inline int xskq_prod_reserve(struct xsk_queue *q, u32 n) { - if (xskq_prod_is_full(q)) - return -ENOSPC; + u32 nr_free = xskq_prod_nb_free(q, n); + + if (!nr_free) + return 0; /* A, matches D */ - q->cached_prod++; - return 0; + q->cached_prod += nr_free; + return nr_free; } static inline int xskq_prod_reserve_addr(struct xsk_queue *q, u64 addr) -- 2.41.3 From: Jason Xing This function __xsk_generic_xmit_batch() is the core function of batch xmit; it implements a batch version of __xsk_generic_xmit(). The whole logic is divided into sections: 1. check if we have enough available slots in tx ring and completion ring. 2. read descriptors from tx ring into xs->batch.desc_cache in batches 3. reserve enough slots in completion ring to avoid backpressure 4. allocate and build skbs in batches 5.
send all the possible packets in batches at one time Signed-off-by: Jason Xing --- net/xdp/xsk.c | 116 ++++++++++++++++++++++++++++++++++++++++++++ net/xdp/xsk_queue.h | 8 +++ 2 files changed, 124 insertions(+) diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c index c26e26cb4dda..e1ad2ac2b39a 100644 --- a/net/xdp/xsk.c +++ b/net/xdp/xsk.c @@ -920,6 +920,122 @@ struct sk_buff *xsk_build_skb(struct xdp_sock *xs, return ERR_PTR(err); } +static int __xsk_generic_xmit_batch(struct xdp_sock *xs) +{ + struct xsk_buff_pool *pool = xs->pool; + struct xsk_batch *batch = &xs->batch; + struct xdp_desc *descs = batch->desc_cache; + struct net_device *dev = xs->dev; + u32 max_batch, max_budget; + bool sent_frame = false; + struct sk_buff *skb; + u32 cons_descs; + int err = 0; + u32 i = 0; + + mutex_lock(&xs->mutex); + + /* Since we dropped the RCU read lock, the socket state might have changed. */ + if (unlikely(!xsk_is_bound(xs))) { + err = -ENXIO; + goto out; + } + + if (xs->queue_id >= dev->real_num_tx_queues) { + err = -ENXIO; + goto out; + } + + if (unlikely(!netif_running(dev) || !netif_carrier_ok(dev))) { + err = -ENETDOWN; + goto out; + } + + max_budget = READ_ONCE(xs->max_tx_budget); + max_batch = batch->generic_xmit_batch; + + for (i = 0; i < max_budget; i += cons_descs) { + u32 nb_pkts = 0; + u32 nb_descs; + + nb_descs = min(max_batch, max_budget - i); + nb_descs = xskq_cons_nb_entries(xs->tx, nb_descs); + if (!nb_descs) + goto out; + + /* This is the backpressure mechanism for the Tx path. Try to + * reserve space in the completion queue for all packets, but + * if there are fewer slots available, just process that many + * packets. This avoids having to implement any buffering in + * the Tx path. 
+ */ + nb_descs = xsk_cq_reserve_locked(pool, nb_descs); + if (!nb_descs) { + err = -EAGAIN; + goto out; + } + + cons_descs = xskq_cons_read_desc_batch_copy(xs->tx, pool, descs, + nb_descs, &nb_pkts); + if (cons_descs < nb_descs) { + u32 delta = nb_descs - cons_descs; + + xsk_cq_cancel_locked(pool, delta); + xs->tx->queue_empty_descs += delta; + if (!cons_descs) { + err = -EAGAIN; + goto out; + } + nb_descs = cons_descs; + } + + cons_descs = xsk_alloc_batch_skb(xs, nb_pkts, nb_descs, &err); + /* Return 'nb_descs - cons_descs' number of descs to the + * pool if the batch allocation partially fails + */ + if (cons_descs < nb_descs) { + xskq_cons_cancel_n(xs->tx, nb_descs - cons_descs); + xsk_cq_cancel_locked(pool, nb_descs - cons_descs); + } + + if (!skb_queue_empty(&batch->send_queue)) { + int err_xmit; + + err_xmit = xsk_direct_xmit_batch(xs, dev); + if (err_xmit == NETDEV_TX_BUSY) + err = -EAGAIN; + else if (err_xmit == NET_XMIT_DROP) + err = -EBUSY; + + sent_frame = true; + } + + if (err) + goto out; + } + + /* Maximum budget of descriptors has been consumed */ + if (xskq_has_descs(xs->tx)) + err = -EAGAIN; + +out: + if (xs->skb) + xsk_drop_skb(xs->skb); + + /* If send_queue has more pending skbs, we must clear + * the rest of them.
+ */ + while ((skb = __skb_dequeue(&batch->send_queue)) != NULL) { + xskq_cons_cancel_n(xs->tx, xsk_get_num_desc(skb)); + xsk_consume_skb(skb); + } + if (sent_frame) + __xsk_tx_release(xs); + + mutex_unlock(&xs->mutex); + return err; +} + static int __xsk_generic_xmit(struct sock *sk) { struct xdp_sock *xs = xdp_sk(sk); diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h index 34cc07d6115e..c3b97c6f2910 100644 --- a/net/xdp/xsk_queue.h +++ b/net/xdp/xsk_queue.h @@ -314,6 +314,14 @@ xskq_cons_read_desc_batch(struct xsk_queue *q, struct xsk_buff_pool *pool, NULL, pool->xdp_zc_max_segs); } +static inline u32 +xskq_cons_read_desc_batch_copy(struct xsk_queue *q, struct xsk_buff_pool *pool, + struct xdp_desc *descs, u32 max, u32 *nb_pkts) +{ + return __xskq_cons_read_desc_batch(q, pool, descs, max, + nb_pkts, MAX_SKB_FRAGS); +} + /* Functions for consumers */ static inline void __xskq_cons_release(struct xsk_queue *q) -- 2.41.3 From: Jason Xing perf c2c profiling of the AF_XDP generic-copy batch TX path reveals that ~45% of all cache-line contention (HITM) comes from a single cacheline inside struct xsk_buff_pool. The sendmsg CPU reads pool geometry fields (addrs, chunk_size, headroom, tx_metadata_len, etc.) in the validate and build hot path, while the NAPI TX-completion CPU writes cq_prod_lock (via xsk_destruct_skb -> xsk_cq_submit_addr_locked) and cached_need_wakeup (via xsk_set/clear_tx_need_wakeup) on the same cacheline—classic false sharing. Group the read-mostly fields together and move the write-heavy fields (cq_prod_lock, cached_need_wakeup, free_heads_cnt) behind a ____cacheline_aligned_in_smp boundary. This adds one extra cacheline (64 bytes) to the per-pool allocation but eliminates cross-CPU false sharing between the TX sendmsg and TX completion paths. This reorganization improves overall performance by 5-6%, as measured with xdpsock. After this, the only remaining hotspot is refcount processing (about 6%), which has already been batched in this series to minimize its impact.
Signed-off-by: Jason Xing --- include/net/xsk_buff_pool.h | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/include/net/xsk_buff_pool.h b/include/net/xsk_buff_pool.h index ccb3b350001f..b1b11e3aa273 100644 --- a/include/net/xsk_buff_pool.h +++ b/include/net/xsk_buff_pool.h @@ -73,23 +73,27 @@ struct xsk_buff_pool { u64 addrs_cnt; u32 free_list_cnt; u32 dma_pages_cnt; - u32 free_heads_cnt; + + /* Read-mostly fields */ u32 headroom; u32 chunk_size; u32 chunk_shift; u32 frame_len; u32 xdp_zc_max_segs; u8 tx_metadata_len; /* inherited from umem */ - u8 cached_need_wakeup; bool uses_need_wakeup; bool unaligned; bool tx_sw_csum; void *addrs; + + /* Write-heavy fields */ /* Mutual exclusion of the completion ring in the SKB mode. * Protect: NAPI TX thread and sendmsg error paths in the SKB * destructor callback. */ - spinlock_t cq_prod_lock; + spinlock_t cq_prod_lock ____cacheline_aligned_in_smp; + u8 cached_need_wakeup; + u32 free_heads_cnt; struct xdp_buff_xsk *free_heads[]; }; -- 2.41.3 From: Jason Xing Add a new helper xsk_init_batch() used in xsk_create() with the default value 1. Obsolete __xsk_generic_xmit. Signed-off-by: Jason Xing --- net/xdp/xsk.c | 151 +++++++++++++------------------------------------- 1 file changed, 37 insertions(+), 114 deletions(-) diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c index e1ad2ac2b39a..be341290e42c 100644 --- a/net/xdp/xsk.c +++ b/net/xdp/xsk.c @@ -1036,101 +1036,14 @@ static int __xsk_generic_xmit_batch(struct xdp_sock *xs) return err; } -static int __xsk_generic_xmit(struct sock *sk) -{ - struct xdp_sock *xs = xdp_sk(sk); - bool sent_frame = false; - struct xdp_desc desc; - struct sk_buff *skb; - u32 max_batch; - int err = 0; - - mutex_lock(&xs->mutex); - - /* Since we dropped the RCU read lock, the socket state might have changed. 
*/ - if (unlikely(!xsk_is_bound(xs))) { - err = -ENXIO; - goto out; - } - - if (xs->queue_id >= xs->dev->real_num_tx_queues) - goto out; - - max_batch = READ_ONCE(xs->max_tx_budget); - while (xskq_cons_peek_desc(xs->tx, &desc, xs->pool)) { - if (max_batch-- == 0) { - err = -EAGAIN; - goto out; - } - - /* This is the backpressure mechanism for the Tx path. - * Reserve space in the completion queue and only proceed - * if there is space in it. This avoids having to implement - * any buffering in the Tx path. - */ - if (!xsk_cq_reserve_locked(xs->pool, 1)) { - err = -EAGAIN; - goto out; - } - - skb = xsk_build_skb(xs, NULL, &desc); - if (IS_ERR(skb)) { - err = PTR_ERR(skb); - if (err != -EOVERFLOW) - goto out; - err = 0; - continue; - } - - xskq_cons_release(xs->tx); - - if (xp_mb_desc(&desc)) { - xs->skb = skb; - continue; - } - - err = __dev_direct_xmit(skb, xs->queue_id); - if (err == NETDEV_TX_BUSY) { - /* Tell user-space to retry the send */ - xskq_cons_cancel_n(xs->tx, xsk_get_num_desc(skb)); - xsk_consume_skb(skb); - err = -EAGAIN; - goto out; - } - - /* Ignore NET_XMIT_CN as packet might have been sent */ - if (err == NET_XMIT_DROP) { - /* SKB completed but not sent */ - err = -EBUSY; - xs->skb = NULL; - goto out; - } - - sent_frame = true; - xs->skb = NULL; - } - - if (xskq_has_descs(xs->tx)) { - if (xs->skb) - xsk_drop_skb(xs->skb); - xskq_cons_release(xs->tx); - } - -out: - if (sent_frame) - __xsk_tx_release(xs); - - mutex_unlock(&xs->mutex); - return err; -} - static int xsk_generic_xmit(struct sock *sk) { + struct xdp_sock *xs = xdp_sk(sk); int ret; /* Drop the RCU lock since the SKB path might sleep. */ rcu_read_unlock(); - ret = __xsk_generic_xmit(sk); + ret = __xsk_generic_xmit_batch(xs); /* Reaquire RCU lock before going into common code. 
*/ rcu_read_lock(); @@ -1626,6 +1539,34 @@ struct xdp_umem_reg_v1 { __u32 headroom; }; +static int xsk_init_batch(struct xsk_batch *batch, unsigned int size) +{ + struct xdp_desc *descs; + struct sk_buff **skbs; + void **data; + + skbs = kmalloc(size * sizeof(struct sk_buff *), GFP_KERNEL); + if (!skbs) + return -ENOMEM; + + data = kmalloc_array(size, sizeof(void *), GFP_KERNEL); + if (!data) { + kfree(skbs); + return -ENOMEM; + } + + descs = kvcalloc(size, sizeof(struct xdp_desc), GFP_KERNEL); + if (!descs) { + kfree(data); + kfree(skbs); + return -ENOMEM; + } + + xsk_batch_reset(batch, skbs, descs, data, size); + + return 0; +} + static int xsk_setsockopt(struct socket *sock, int level, int optname, sockptr_t optval, unsigned int optlen) { @@ -1746,9 +1687,6 @@ static int xsk_setsockopt(struct socket *sock, int level, int optname, { struct xsk_buff_pool *pool = xs->pool; struct xsk_batch *batch = &xs->batch; - struct xdp_desc *descs; - struct sk_buff **skbs; - void **data; unsigned int size; int ret = 0; @@ -1762,27 +1700,7 @@ static int xsk_setsockopt(struct socket *sock, int level, int optname, return -EACCES; mutex_lock(&xs->mutex); - skbs = kmalloc(size * sizeof(struct sk_buff *), GFP_KERNEL); - if (!skbs) { - ret = -ENOMEM; - goto out; - } - data = kmalloc_array(size, sizeof(void *), GFP_KERNEL); - if (!data) { - kfree(skbs); - ret = -ENOMEM; - goto out; - } - descs = kvcalloc(size, sizeof(struct xdp_desc), GFP_KERNEL); - if (!descs) { - kfree(data); - kfree(skbs); - ret = -ENOMEM; - goto out; - } - - xsk_batch_reset(batch, skbs, descs, data, size); -out: + ret = xsk_init_batch(batch, size); mutex_unlock(&xs->mutex); return ret; } @@ -2056,6 +1974,7 @@ static int xsk_create(struct net *net, struct socket *sock, int protocol, { struct xdp_sock *xs; struct sock *sk; + int ret; if (!ns_capable(net->user_ns, CAP_NET_RAW)) return -EPERM; @@ -2071,6 +1990,11 @@ static int xsk_create(struct net *net, struct socket *sock, int protocol, if (!sk) return -ENOBUFS; + xs 
= xdp_sk(sk); + ret = xsk_init_batch(&xs->batch, 1); + if (ret) + return ret; + sock->ops = &xsk_proto_ops; sock_init_data(sock, sk); @@ -2081,7 +2005,6 @@ static int xsk_create(struct net *net, struct socket *sock, int protocol, sock_set_flag(sk, SOCK_RCU_FREE); - xs = xdp_sk(sk); xs->state = XSK_READY; xs->max_tx_budget = TX_BATCH_SIZE; mutex_init(&xs->mutex); -- 2.41.3 From: Jason Xing Three targeted optimizations for the batch copy-mode TX hot path: Replace skb_store_bits() with memcpy() for the single-buffer first-desc path. After skb_reserve() + skb_put(), the SKB is freshly allocated with all data in the linear area and no frags, so skb_store_bits() degenerates to memcpy(skb->data, buffer, len) but carries unnecessary function call overhead, offset validation, and frag iteration logic. Inline UMEM address computation in Phase 3 and pass the pre-computed buffer pointer to xsk_build_skb(), avoiding the per-packet non-inlined xp_raw_get_data() (EXPORT_SYMBOL) call chain: xsk_buff_raw_get_data -> xp_raw_get_data -> __xp_raw_get_addr + __xp_raw_get_data. In the batch loop the pool->addrs and pool->unaligned are invariant, so we cache them once and compute each buffer address inline. Prefetch the *next* descriptor's UMEM data buffer at the top of the Phase 3 loop, hiding the memory latency of the upcoming memcpy. Together these changes yield a stable 3-4% performance improvement.
Signed-off-by: Jason Xing --- include/net/xdp_sock.h | 3 ++- net/core/skbuff.c | 18 ++++++++++++++++-- net/xdp/xsk.c | 15 ++++++--------- 3 files changed, 24 insertions(+), 12 deletions(-) diff --git a/include/net/xdp_sock.h b/include/net/xdp_sock.h index 0609e3b04279..5e05236c7fba 100644 --- a/include/net/xdp_sock.h +++ b/include/net/xdp_sock.h @@ -139,7 +139,8 @@ void __xsk_map_flush(struct list_head *flush_list); INDIRECT_CALLABLE_DECLARE(void xsk_destruct_skb(struct sk_buff *)); struct sk_buff *xsk_build_skb(struct xdp_sock *xs, struct sk_buff *allocated_skb, - struct xdp_desc *desc); + struct xdp_desc *desc, + void *buffer); int xsk_alloc_batch_skb(struct xdp_sock *xs, u32 nb_pkts, u32 nb_descs, int *err); int xsk_direct_xmit_batch(struct xdp_sock *xs, struct net_device *dev); diff --git a/net/core/skbuff.c b/net/core/skbuff.c index 5726b1566b2b..bef5270e6332 100644 --- a/net/core/skbuff.c +++ b/net/core/skbuff.c @@ -752,14 +752,28 @@ int xsk_alloc_batch_skb(struct xdp_sock *xs, u32 nb_pkts, u32 nb_descs, int *err if (total_truesize) refcount_add(total_truesize, &xs->sk.sk_wmem_alloc); - /* Phase 3: Build SKBs with packet data */ + /* Phase 3: Build SKBs with packet data. 
*/ + struct xsk_buff_pool *pool = xs->pool; + void *pool_addrs = pool->addrs; + bool unaligned = pool->unaligned; + for (j = 0; j < alloc_descs; j++) { + u64 addr = descs[j].addr; + void *buffer; + + if (unaligned) + addr = xp_unaligned_add_offset_to_addr(addr); + buffer = pool_addrs + addr; + + if (j + 1 < alloc_descs) + prefetch(pool_addrs + descs[j + 1].addr); + if (!xs->skb) { skb = skbs[skb_count - 1 - k]; k++; } - skb = xsk_build_skb(xs, skb, &descs[j]); + skb = xsk_build_skb(xs, skb, &descs[j], buffer); if (IS_ERR(skb)) { *err = PTR_ERR(skb); break; diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c index be341290e42c..3bf81b838075 100644 --- a/net/xdp/xsk.c +++ b/net/xdp/xsk.c @@ -811,7 +811,8 @@ static struct sk_buff *xsk_build_skb_zerocopy(struct xdp_sock *xs, struct sk_buff *xsk_build_skb(struct xdp_sock *xs, struct sk_buff *allocated_skb, - struct xdp_desc *desc) + struct xdp_desc *desc, + void *buffer) { struct net_device *dev = xs->dev; struct sk_buff *skb = xs->skb; @@ -825,11 +826,10 @@ struct sk_buff *xsk_build_skb(struct xdp_sock *xs, goto free_err; } } else { - u32 hr, tr, len; - void *buffer; + u32 hr, tr, len = desc->len; - buffer = xsk_buff_raw_get_data(xs->pool, desc->addr); - len = desc->len; + if (!buffer) + buffer = xsk_buff_raw_get_data(xs->pool, desc->addr); if (!skb) { hr = max(NET_SKB_PAD, L1_CACHE_ALIGN(dev->needed_headroom)); @@ -844,10 +844,7 @@ struct sk_buff *xsk_build_skb(struct xdp_sock *xs, skb_reserve(skb, hr); skb_put(skb, len); - - err = skb_store_bits(skb, 0, buffer, len); - if (unlikely(err)) - goto free_err; + memcpy(skb->data, buffer, len); xsk_skb_init_misc(skb, xs, desc->addr); if (desc->options & XDP_TX_METADATA) { -- 2.41.3