From: Jason Xing

Add a new socket option that provides an alternative way to achieve a higher
overall throughput once the rest of the series is applied. As the
corresponding documentation added here says, it might increase latency
because the heavy allocation cannot be avoided, especially when memory is
short. So this patch does not enable the feature by default.

Add generic_xmit_batch to determine how many descriptors are handled at one
time. It must not be larger than max_tx_budget.

Introduce skb_cache, set up in setsockopt under xs->mutex protection, to
store the skbs that are newly allocated in one go.

Introduce desc_cache to temporarily cache the descriptors the xsk is about
to send in each round.

Signed-off-by: Jason Xing
---
 Documentation/networking/af_xdp.rst | 17 ++++++++++
 include/net/xdp_sock.h | 7 ++++
 include/uapi/linux/if_xdp.h | 1 +
 net/xdp/xsk.c | 51 +++++++++++++++++++++++++++++
 tools/include/uapi/linux/if_xdp.h | 1 +
 5 files changed, 77 insertions(+)

diff --git a/Documentation/networking/af_xdp.rst b/Documentation/networking/af_xdp.rst
index 50d92084a49c..7a8d219efe71 100644
--- a/Documentation/networking/af_xdp.rst
+++ b/Documentation/networking/af_xdp.rst
@@ -447,6 +447,23 @@ mode to allow application to tune the per-socket maximum iteration for better throughput and less frequency of send syscall. Allowed range is [32, xs->tx->nentries].
+XDP_GENERIC_XMIT_BATCH
+----------------------
+
+This option allows the application to use batch xmit in copy mode. The
+batch process first allocates a certain number of skbs through the bulk
+allocation mechanism, then initializes them, and finally sends them out
+in one go.
+It relies on efficient bulk allocation/deallocation functions, avoids
+frequently grabbing/releasing locks (such as the cache lock and the queue
+lock) and minimizes the number of IRQs triggered on the driver side, which
+generally improves the overall performance, as observed with the xdpsock
+benchmark.
+A potential side effect is increased per-packet latency due to the
+unavoidable and time-consuming memory allocation.
+Setting a relatively large batch size can benefit bulk transmission
+scenarios. The value must not be larger than xs->max_tx_budget.
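+
+A minimal sketch of enabling it from user space (fd is assumed to be an
+already bound AF_XDP socket)::
+
+  unsigned int batch = 64;
+
+  /* Fails with EACCES if the value exceeds the configured
+   * XDP_MAX_TX_SKB_BUDGET or the socket is not bound yet;
+   * a value of 0 disables batching again.
+   */
+  if (setsockopt(fd, SOL_XDP, XDP_GENERIC_XMIT_BATCH,
+                 &batch, sizeof(batch)))
+      perror("setsockopt(XDP_GENERIC_XMIT_BATCH)");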
+ XDP_STATISTICS getsockopt ------------------------- diff --git a/include/net/xdp_sock.h b/include/net/xdp_sock.h index ce587a225661..f33f1e7dcea2 100644 --- a/include/net/xdp_sock.h +++ b/include/net/xdp_sock.h @@ -45,6 +45,12 @@ struct xsk_map { struct xdp_sock __rcu *xsk_map[]; }; +struct xsk_batch { + u32 generic_xmit_batch; + struct sk_buff **skb_cache; + struct xdp_desc *desc_cache; +}; + struct xdp_sock { /* struct sock must be the first member of struct xdp_sock */ struct sock sk; @@ -89,6 +95,7 @@ struct xdp_sock { struct mutex mutex; struct xsk_queue *fq_tmp; /* Only as tmp storage before bind */ struct xsk_queue *cq_tmp; /* Only as tmp storage before bind */ + struct xsk_batch batch; }; /* diff --git a/include/uapi/linux/if_xdp.h b/include/uapi/linux/if_xdp.h index 23a062781468..44cb72cd328e 100644 --- a/include/uapi/linux/if_xdp.h +++ b/include/uapi/linux/if_xdp.h @@ -80,6 +80,7 @@ struct xdp_mmap_offsets { #define XDP_STATISTICS 7 #define XDP_OPTIONS 8 #define XDP_MAX_TX_SKB_BUDGET 9 +#define XDP_GENERIC_XMIT_BATCH 10 struct xdp_umem_reg { __u64 addr; /* Start of packet data area */ diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c index 7b0c68a70888..ace91800c447 100644 --- a/net/xdp/xsk.c +++ b/net/xdp/xsk.c @@ -1210,6 +1210,8 @@ static int xsk_release(struct socket *sock) xskq_destroy(xs->tx); xskq_destroy(xs->fq_tmp); xskq_destroy(xs->cq_tmp); + kfree(xs->batch.skb_cache); + kvfree(xs->batch.desc_cache); sock_orphan(sk); sock->sk = NULL; @@ -1544,6 +1546,55 @@ static int xsk_setsockopt(struct socket *sock, int level, int optname, WRITE_ONCE(xs->max_tx_budget, budget); return 0; } + case XDP_GENERIC_XMIT_BATCH: + { + struct xsk_buff_pool *pool = xs->pool; + struct xsk_batch *batch = &xs->batch; + struct xdp_desc *descs; + struct sk_buff **skbs; + unsigned int size; + int ret = 0; + + if (optlen != sizeof(size)) + return -EINVAL; + if (copy_from_sockptr(&size, optval, sizeof(size))) + return -EFAULT; + if (size == batch->generic_xmit_batch) + return 0; + if (size > xs->max_tx_budget || !pool) + return -EACCES; + + mutex_lock(&xs->mutex); + if (!size) { + kfree(batch->skb_cache); + kvfree(batch->desc_cache); + batch->generic_xmit_batch = 0; + goto out; + } + + skbs = kmalloc(size * sizeof(struct sk_buff *), GFP_KERNEL); + if (!skbs) { + ret = -ENOMEM; + goto out; + } + descs = kvcalloc(size, sizeof(struct xdp_desc), GFP_KERNEL); + if (!descs) { + kfree(skbs); + ret = -ENOMEM; + goto out; + } + if (batch->skb_cache) + kfree(batch->skb_cache); + if (batch->desc_cache) + kvfree(batch->desc_cache); + + batch->skb_cache = skbs; + batch->desc_cache = descs; + batch->generic_xmit_batch = size; +out: + mutex_unlock(&xs->mutex); + return ret; + } default: break; } diff --git a/tools/include/uapi/linux/if_xdp.h b/tools/include/uapi/linux/if_xdp.h index 23a062781468..44cb72cd328e 100644 --- a/tools/include/uapi/linux/if_xdp.h +++ b/tools/include/uapi/linux/if_xdp.h @@ -80,6 +80,7 @@ struct xdp_mmap_offsets { #define XDP_STATISTICS 7 #define XDP_OPTIONS 8 #define XDP_MAX_TX_SKB_BUDGET 9 +#define XDP_GENERIC_XMIT_BATCH 10 struct xdp_umem_reg { __u64 addr; /* Start of packet data area */ -- 2.41.3 From: Jason Xing To avoid reinvent the wheel, the patch provides a way to let batch feature to reuse xsk_build_skb() as the rest process of the whole initialization just after the skb is allocated. The original xsk_build_skb() itself allocates a new skb by calling sock_alloc_send_skb whether in copy mode or zerocopy mode. 
Add a new parameter, allocated_skb, to let other callers pass an already
allocated skb, in order to support the later batch xmit feature which
replaces the previous per-skb memory allocation with a bulk one.

Signed-off-by: Jason Xing
---
 include/net/xdp_sock.h | 3 +++
 net/xdp/xsk.c | 23 ++++++++++++++++-------
 2 files changed, 19 insertions(+), 7 deletions(-)

diff --git a/include/net/xdp_sock.h b/include/net/xdp_sock.h index f33f1e7dcea2..8944f4782eb6 100644 --- a/include/net/xdp_sock.h +++ b/include/net/xdp_sock.h @@ -127,6 +127,9 @@ struct xsk_tx_metadata_ops { void (*tmo_request_launch_time)(u64 launch_time, void *priv); }; +struct sk_buff *xsk_build_skb(struct xdp_sock *xs, + struct sk_buff *allocated_skb, + struct xdp_desc *desc); #ifdef CONFIG_XDP_SOCKETS int xsk_generic_rcv(struct xdp_sock *xs, struct xdp_buff *xdp);
diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c index ace91800c447..f9458347ff7b 100644 --- a/net/xdp/xsk.c +++ b/net/xdp/xsk.c @@ -697,6 +697,7 @@ static int xsk_skb_metadata(struct sk_buff *skb, void *buffer, } static struct sk_buff *xsk_build_skb_zerocopy(struct xdp_sock *xs, + struct sk_buff *allocated_skb, struct xdp_desc *desc) { struct xsk_buff_pool *pool = xs->pool; @@ -714,7 +715,10 @@ static struct sk_buff *xsk_build_skb_zerocopy(struct xdp_sock *xs, if (!skb) { hr = max(NET_SKB_PAD, L1_CACHE_ALIGN(xs->dev->needed_headroom)); - skb = sock_alloc_send_skb(&xs->sk, hr, 1, &err); + if (!allocated_skb) + skb = sock_alloc_send_skb(&xs->sk, hr, 1, &err); + else + skb = allocated_skb; if (unlikely(!skb)) return ERR_PTR(err); @@ -769,15 +773,16 @@ static struct sk_buff *xsk_build_skb_zerocopy(struct xdp_sock *xs, return skb; } -static struct sk_buff *xsk_build_skb(struct xdp_sock *xs, - struct xdp_desc *desc) +struct sk_buff *xsk_build_skb(struct xdp_sock *xs, + struct sk_buff *allocated_skb, + struct xdp_desc *desc) { struct net_device *dev = xs->dev; struct sk_buff *skb = xs->skb; int err; if (dev->priv_flags & IFF_TX_SKB_NO_LINEAR) { - skb = xsk_build_skb_zerocopy(xs, desc); + skb = xsk_build_skb_zerocopy(xs, allocated_skb, desc); if (IS_ERR(skb)) { err = PTR_ERR(skb); skb = NULL; @@ -792,8 +797,12 @@ static struct sk_buff *xsk_build_skb(struct xdp_sock *xs, if (!skb) { hr = max(NET_SKB_PAD, L1_CACHE_ALIGN(dev->needed_headroom)); - tr = dev->needed_tailroom; - skb = sock_alloc_send_skb(&xs->sk, hr + len + tr, 1, &err); + if (!allocated_skb) { + tr = dev->needed_tailroom; + skb = sock_alloc_send_skb(&xs->sk, hr + len + tr, 1, &err); + } else { + skb = allocated_skb; + } if (unlikely(!skb)) goto free_err; @@ -906,7 +915,7 @@ static int __xsk_generic_xmit(struct sock *sk) goto out; } - skb = xsk_build_skb(xs, &desc); + skb = xsk_build_skb(xs, NULL, &desc); if (IS_ERR(skb)) { err = PTR_ERR(skb); if (err != -EOVERFLOW) -- 2.41.3

From: Jason Xing

Support allocating and building skbs in batch.

This patch uses kmem_cache_alloc_bulk() to perform the batch allocation,
which relies on the global common cache 'net_hotdata.skbuff_cache'. Use an
xsk standalone skb cache (namely, xs->skb_cache) to store the allocated
skbs instead of resorting to napi_alloc_cache, which was designed for
softirq context.

After allocating memory for each skb, in a 'for' loop, the patch borrows
part of __alloc_skb() to initialize the skb and then calls xsk_build_skb()
to complete the rest of the initialization, such as copying the data.

Add batch.send_queue and use skb->list to chain the skbs together so that
they can easily be sent, as shown in the subsequent patches. A condensed
sketch of the allocation flow follows.
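A condensed view of the per-send flow (error paths, sk_wmem accounting and
the multi-buffer corner cases are omitted; the names match
xsk_alloc_batch_skb() in the diff below):

    /* Refill the per-socket cache with skb shells in one call. */
    kmem_cache_alloc_bulk(net_hotdata.skbuff_cache, gfp_mask,
                          nb_pkts - batch->skb_count,
                          (void **)&batch->skb_cache[batch->skb_count]);

    for (each descriptor in batch->desc_cache) {
        /* Borrowed from __alloc_skb(): give the shell a data area. */
        data = kmalloc_reserve(&size, gfp_mask, NUMA_NO_NODE, &pfmemalloc);
        __build_skb_around(skb, data, size);
        skb_set_owner_w(skb, &xs->sk);

        /* Copy the payload and metadata described by the descriptor. */
        skb = xsk_build_skb(xs, skb, desc);

        if (the descriptor completes a packet)
            __skb_queue_tail(&batch->send_queue, skb);
    }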
In terms of freeing skbs process, napi_consume_skb() in the tx completion would put the skb into global cache 'net_hotdata.skbuff_cache' that implements the deferred freeing skb feature to avoid freeing skb one by one to improve the performance. Signed-off-by: Jason Xing --- include/net/xdp_sock.h | 3 ++ net/core/skbuff.c | 101 +++++++++++++++++++++++++++++++++++++++++ net/xdp/xsk.c | 1 + 3 files changed, 105 insertions(+) diff --git a/include/net/xdp_sock.h b/include/net/xdp_sock.h index 8944f4782eb6..cb5aa8a314fe 100644 --- a/include/net/xdp_sock.h +++ b/include/net/xdp_sock.h @@ -47,8 +47,10 @@ struct xsk_map { struct xsk_batch { u32 generic_xmit_batch; + unsigned int skb_count; struct sk_buff **skb_cache; struct xdp_desc *desc_cache; + struct sk_buff_head send_queue; }; struct xdp_sock { @@ -130,6 +132,7 @@ struct xsk_tx_metadata_ops { struct sk_buff *xsk_build_skb(struct xdp_sock *xs, struct sk_buff *allocated_skb, struct xdp_desc *desc); +int xsk_alloc_batch_skb(struct xdp_sock *xs, u32 nb_pkts, u32 nb_descs, int *err); #ifdef CONFIG_XDP_SOCKETS int xsk_generic_rcv(struct xdp_sock *xs, struct xdp_buff *xdp); diff --git a/net/core/skbuff.c b/net/core/skbuff.c index bc12790017b0..5b6d3b4fa895 100644 --- a/net/core/skbuff.c +++ b/net/core/skbuff.c @@ -81,6 +81,8 @@ #include #include #include +#include +#include #include #include @@ -615,6 +617,105 @@ static void *kmalloc_reserve(unsigned int *size, gfp_t flags, int node, return obj; } +int xsk_alloc_batch_skb(struct xdp_sock *xs, u32 nb_pkts, u32 nb_descs, int *err) +{ + struct xsk_batch *batch = &xs->batch; + struct xdp_desc *descs = batch->desc_cache; + struct sk_buff **skbs = batch->skb_cache; + gfp_t gfp_mask = xs->sk.sk_allocation; + struct net_device *dev = xs->dev; + int node = NUMA_NO_NODE; + struct sk_buff *skb; + u32 i = 0, j = 0; + bool pfmemalloc; + u32 base_len; + u8 *data; + + base_len = max(NET_SKB_PAD, L1_CACHE_ALIGN(dev->needed_headroom)); + if (!(dev->priv_flags & IFF_TX_SKB_NO_LINEAR)) + base_len += dev->needed_tailroom; + + if (batch->skb_count >= nb_pkts) + goto build; + + if (xs->skb) { + i = 1; + batch->skb_count++; + } + + batch->skb_count += kmem_cache_alloc_bulk(net_hotdata.skbuff_cache, + gfp_mask, nb_pkts - batch->skb_count, + (void **)&skbs[batch->skb_count]); + if (batch->skb_count < nb_pkts) + nb_pkts = batch->skb_count; + +build: + for (i = 0, j = 0; j < nb_descs; j++) { + if (!xs->skb) { + u32 size = base_len + descs[j].len; + + /* In case we don't have enough allocated skbs */ + if (i >= nb_pkts) { + *err = -EAGAIN; + break; + } + + if (sk_wmem_alloc_get(&xs->sk) > READ_ONCE(xs->sk.sk_sndbuf)) { + *err = -EAGAIN; + break; + } + + skb = skbs[batch->skb_count - 1 - i]; + + prefetchw(skb); + /* We do our best to align skb_shared_info on a separate cache + * line. It usually works because kmalloc(X > SMP_CACHE_BYTES) gives + * aligned memory blocks, unless SLUB/SLAB debug is enabled. + * Both skb->head and skb_shared_info are cache line aligned. + */ + data = kmalloc_reserve(&size, gfp_mask, node, &pfmemalloc); + if (unlikely(!data)) { + *err = -ENOBUFS; + break; + } + /* kmalloc_size_roundup() might give us more room than requested. + * Put skb_shared_info exactly at the end of allocated zone, + * to allow max possible filling before reallocation. 
+ */ + prefetchw(data + SKB_WITH_OVERHEAD(size)); + + memset(skb, 0, offsetof(struct sk_buff, tail)); + __build_skb_around(skb, data, size); + skb->pfmemalloc = pfmemalloc; + skb_set_owner_w(skb, &xs->sk); + } else if (unlikely(i == 0)) { + /* We have a skb in cache that is left last time */ + kmem_cache_free(net_hotdata.skbuff_cache, + skbs[batch->skb_count - 1]); + skbs[batch->skb_count - 1] = xs->skb; + } + + skb = xsk_build_skb(xs, skb, &descs[j]); + if (IS_ERR(skb)) { + *err = PTR_ERR(skb); + break; + } + + if (xp_mb_desc(&descs[j])) { + xs->skb = skb; + continue; + } + + xs->skb = NULL; + i++; + __skb_queue_tail(&batch->send_queue, skb); + } + + batch->skb_count -= i; + + return j; +} + /* Allocate a new skbuff. We do this ourselves so we can fill in a few * 'private' fields and also do memory statistics to find all the * [BEEP] leaks. diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c index f9458347ff7b..cf45c7545124 100644 --- a/net/xdp/xsk.c +++ b/net/xdp/xsk.c @@ -1906,6 +1906,7 @@ static int xsk_create(struct net *net, struct socket *sock, int protocol, INIT_LIST_HEAD(&xs->map_list); spin_lock_init(&xs->map_list_lock); + __skb_queue_head_init(&xs->batch.send_queue); mutex_lock(&net->xdp.lock); sk_add_node_rcu(sk, &net->xdp.list); -- 2.41.3 From: Jason Xing Add batch xmit logic. Only grabbing the lock and disable bottom half once and sent all the aggregated packets in one loop. Via skb->list, the already built skbs can be handled one by one. Signed-off-by: Jason Xing --- include/net/xdp_sock.h | 1 + net/core/dev.c | 22 ++++++++++++++++++++++ 2 files changed, 23 insertions(+) diff --git a/include/net/xdp_sock.h b/include/net/xdp_sock.h index cb5aa8a314fe..5cdb8290f752 100644 --- a/include/net/xdp_sock.h +++ b/include/net/xdp_sock.h @@ -133,6 +133,7 @@ struct sk_buff *xsk_build_skb(struct xdp_sock *xs, struct sk_buff *allocated_skb, struct xdp_desc *desc); int xsk_alloc_batch_skb(struct xdp_sock *xs, u32 nb_pkts, u32 nb_descs, int *err); +int xsk_direct_xmit_batch(struct xdp_sock *xs, struct net_device *dev); #ifdef CONFIG_XDP_SOCKETS int xsk_generic_rcv(struct xdp_sock *xs, struct xdp_buff *xdp); diff --git a/net/core/dev.c b/net/core/dev.c index a64cef2c537e..32de76c79d29 100644 --- a/net/core/dev.c +++ b/net/core/dev.c @@ -163,6 +163,7 @@ #include #include #include +#include #include "dev.h" #include "devmem.h" @@ -4792,6 +4793,27 @@ int __dev_queue_xmit(struct sk_buff *skb, struct net_device *sb_dev) } EXPORT_SYMBOL(__dev_queue_xmit); +int xsk_direct_xmit_batch(struct xdp_sock *xs, struct net_device *dev) +{ + u16 queue_id = xs->queue_id; + struct netdev_queue *txq = netdev_get_tx_queue(dev, queue_id); + int ret = NETDEV_TX_BUSY; + struct sk_buff *skb; + + local_bh_disable(); + HARD_TX_LOCK(dev, txq, smp_processor_id()); + while ((skb = __skb_dequeue(&xs->batch.send_queue)) != NULL) { + skb_set_queue_mapping(skb, queue_id); + ret = netdev_start_xmit(skb, dev, txq, false); + if (ret != NETDEV_TX_OK) + break; + } + HARD_TX_UNLOCK(dev, txq); + local_bh_enable(); + + return ret; +} + int __dev_direct_xmit(struct sk_buff *skb, u16 queue_id) { struct net_device *dev = skb->dev; -- 2.41.3 From: Jason Xing Rename the last parameter to nb_descs for more accurate naming. Next patch will add a real nb_pkts parameter to help copy mode count how many pakcets are needed. No functional change here. 
Signed-off-by: Jason Xing --- net/xdp/xsk.c | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c index cf45c7545124..b057d10fcf6a 100644 --- a/net/xdp/xsk.c +++ b/net/xdp/xsk.c @@ -485,16 +485,16 @@ EXPORT_SYMBOL(xsk_tx_peek_desc); static u32 xsk_tx_peek_release_fallback(struct xsk_buff_pool *pool, u32 max_entries) { struct xdp_desc *descs = pool->tx_descs; - u32 nb_pkts = 0; + u32 nb_descs = 0; - while (nb_pkts < max_entries && xsk_tx_peek_desc(pool, &descs[nb_pkts])) - nb_pkts++; + while (nb_descs < max_entries && xsk_tx_peek_desc(pool, &descs[nb_descs])) + nb_descs++; xsk_tx_release(pool); - return nb_pkts; + return nb_descs; } -u32 xsk_tx_peek_release_desc_batch(struct xsk_buff_pool *pool, u32 nb_pkts) +u32 xsk_tx_peek_release_desc_batch(struct xsk_buff_pool *pool, u32 nb_descs) { struct xdp_sock *xs; @@ -502,16 +502,16 @@ u32 xsk_tx_peek_release_desc_batch(struct xsk_buff_pool *pool, u32 nb_pkts) if (!list_is_singular(&pool->xsk_tx_list)) { /* Fallback to the non-batched version */ rcu_read_unlock(); - return xsk_tx_peek_release_fallback(pool, nb_pkts); + return xsk_tx_peek_release_fallback(pool, nb_descs); } xs = list_first_or_null_rcu(&pool->xsk_tx_list, struct xdp_sock, tx_list); if (!xs) { - nb_pkts = 0; + nb_descs = 0; goto out; } - nb_pkts = xskq_cons_nb_entries(xs->tx, nb_pkts); + nb_descs = xskq_cons_nb_entries(xs->tx, nb_descs); /* This is the backpressure mechanism for the Tx path. Try to * reserve space in the completion queue for all packets, but @@ -519,23 +519,23 @@ u32 xsk_tx_peek_release_desc_batch(struct xsk_buff_pool *pool, u32 nb_pkts) * packets. This avoids having to implement any buffering in * the Tx path. */ - nb_pkts = xskq_prod_nb_free(pool->cq, nb_pkts); - if (!nb_pkts) + nb_descs = xskq_prod_nb_free(pool->cq, nb_descs); + if (!nb_descs) goto out; - nb_pkts = xskq_cons_read_desc_batch(xs->tx, pool, nb_pkts); - if (!nb_pkts) { + nb_descs = xskq_cons_read_desc_batch(xs->tx, pool, nb_descs); + if (!nb_descs) { xs->tx->queue_empty_descs++; goto out; } __xskq_cons_release(xs->tx); - xskq_prod_write_addr_batch(pool->cq, pool->tx_descs, nb_pkts); + xskq_prod_write_addr_batch(pool->cq, pool->tx_descs, nb_descs); xs->sk.sk_write_space(&xs->sk); out: rcu_read_unlock(); - return nb_pkts; + return nb_descs; } EXPORT_SYMBOL(xsk_tx_peek_release_desc_batch); -- 2.41.3 From: Jason Xing Add a new parameter nb_pkts to count how many packets are needed practically by copy mode with the help of XDP_PKT_CONTD option. Add descs to provide a way to pass xs->desc_cache to store the descriptors for copy mode. 
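For example, a frame that spans three descriptors (XDP_PKT_CONTD set on all
but the last one) adds 3 to nb_descs but only 1 to nb_pkts, which is exactly
the number of skbs the copy path has to allocate for it.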
Signed-off-by: Jason Xing --- net/xdp/xsk.c | 3 ++- net/xdp/xsk_queue.h | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c index b057d10fcf6a..d30090a8420f 100644 --- a/net/xdp/xsk.c +++ b/net/xdp/xsk.c @@ -523,7 +523,8 @@ u32 xsk_tx_peek_release_desc_batch(struct xsk_buff_pool *pool, u32 nb_descs) if (!nb_descs) goto out; - nb_descs = xskq_cons_read_desc_batch(xs->tx, pool, nb_descs); + nb_descs = xskq_cons_read_desc_batch(xs->tx, pool, pool->tx_descs, + nb_descs, NULL); if (!nb_descs) { xs->tx->queue_empty_descs++; goto out; diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h index f16f390370dc..9caa0cfe29de 100644 --- a/net/xdp/xsk_queue.h +++ b/net/xdp/xsk_queue.h @@ -235,10 +235,9 @@ static inline void parse_desc(struct xsk_queue *q, struct xsk_buff_pool *pool, static inline u32 xskq_cons_read_desc_batch(struct xsk_queue *q, struct xsk_buff_pool *pool, - u32 max) + struct xdp_desc *descs, u32 max, u32 *nb_pkts) { u32 cached_cons = q->cached_cons, nb_entries = 0; - struct xdp_desc *descs = pool->tx_descs; u32 total_descs = 0, nr_frags = 0; /* track first entry, if stumble upon *any* invalid descriptor, rewind @@ -258,6 +257,8 @@ u32 xskq_cons_read_desc_batch(struct xsk_queue *q, struct xsk_buff_pool *pool, if (likely(!parsed.mb)) { total_descs += (nr_frags + 1); nr_frags = 0; + if (nb_pkts) + (*nb_pkts)++; } else { nr_frags++; if (nr_frags == pool->xdp_zc_max_segs) { -- 2.41.3 From: Jason Xing This function __xsk_generic_xmit_batch() is the core function in batches xmit, implement a batch version of __xsk_generic_xmit(). The whole logic is divided into sections: 1. check if we have enough available slots in tx ring and completion ring. 2. read descriptors from tx ring into pool->tx_descs in batches 3. reserve enough slots in completion ring to avoid backpressure 4. allocate and build skbs in batches 5. send all the possible packets in batches at one time Signed-off-by: Jason Xing --- net/xdp/xsk.c | 108 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 108 insertions(+) diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c index d30090a8420f..1fa099653b7d 100644 --- a/net/xdp/xsk.c +++ b/net/xdp/xsk.c @@ -878,6 +878,114 @@ struct sk_buff *xsk_build_skb(struct xdp_sock *xs, return ERR_PTR(err); } +static int __xsk_generic_xmit_batch(struct xdp_sock *xs) +{ + struct xsk_batch *batch = &xs->batch; + struct xdp_desc *descs = batch->desc_cache; + struct xsk_buff_pool *pool = xs->pool; + u32 nb_pkts, nb_descs, cons_descs; + struct net_device *dev = xs->dev; + bool sent_frame = false; + u32 max_batch, expected; + u32 i = 0, max_budget; + struct sk_buff *skb; + int err = 0; + + mutex_lock(&xs->mutex); + + /* Since we dropped the RCU read lock, the socket state might have changed. */ + if (unlikely(!xsk_is_bound(xs))) { + err = -ENXIO; + goto out; + } + + if (xs->queue_id >= dev->real_num_tx_queues) + goto out; + + if (unlikely(!netif_running(dev) || + !netif_carrier_ok(dev))) + goto out; + + max_budget = READ_ONCE(xs->max_tx_budget); + max_batch = batch->generic_xmit_batch; + + for (i = 0; i < max_budget; i += cons_descs) { + expected = max_budget - i; + expected = max_batch > expected ? expected : max_batch; + nb_descs = xskq_cons_nb_entries(xs->tx, expected); + if (!nb_descs) + goto out; + + /* This is the backpressure mechanism for the Tx path. Try to + * reserve space in the completion queue for all packets, but + * if there are fewer slots available, just process that many + * packets. 
This avoids having to implement any buffering in + * the Tx path. + */ + nb_descs = xskq_prod_nb_free(pool->cq, nb_descs); + if (!nb_descs) { + err = -EAGAIN; + goto out; + } + + nb_pkts = 0; + nb_descs = xskq_cons_read_desc_batch(xs->tx, pool, descs, + nb_descs, &nb_pkts); + if (!nb_descs) { + err = -EAGAIN; + xs->tx->queue_empty_descs++; + goto out; + } + + cons_descs = xsk_alloc_batch_skb(xs, nb_pkts, nb_descs, &err); + /* Return 'nb_descs - cons_descs' number of descs to the + * pool if the batch allocation partially fails + */ + if (cons_descs < nb_descs) { + xskq_cons_cancel_n(xs->tx, nb_descs - cons_descs); + xsk_cq_cancel_locked(xs->pool, nb_descs - cons_descs); + } + + if (!skb_queue_empty(&batch->send_queue)) { + int err_xmit; + + err_xmit = xsk_direct_xmit_batch(xs, dev); + if (err_xmit == NETDEV_TX_BUSY) + err = -EAGAIN; + else if (err_xmit == NET_XMIT_DROP) + err = -EBUSY; + + sent_frame = true; + xs->skb = NULL; + } + + if (err) + goto out; + } + + /* Maximum budget of descriptors have been consumed */ + err = -EAGAIN; + + if (xskq_has_descs(xs->tx)) { + if (xs->skb) + xsk_drop_skb(xs->skb); + } + +out: + /* If send_queue has more pending skbs, we must to clear + * the rest of them. + */ + while ((skb = __skb_dequeue(&batch->send_queue)) != NULL) { + xskq_cons_cancel_n(xs->tx, xsk_get_num_desc(skb)); + xsk_consume_skb(skb); + } + if (sent_frame) + __xsk_tx_release(xs); + + mutex_unlock(&xs->mutex); + return err; +} + static int __xsk_generic_xmit(struct sock *sk) { struct xdp_sock *xs = xdp_sk(sk); -- 2.41.3 From: Jason Xing - Move xs->mutex into xsk_generic_xmit to prevent race condition when application manipulates generic_xmit_batch simultaneously. - Enable batch xmit eventually. Make the whole feature work eventually. Signed-off-by: Jason Xing --- net/xdp/xsk.c | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c index 1fa099653b7d..3741071c68fd 100644 --- a/net/xdp/xsk.c +++ b/net/xdp/xsk.c @@ -891,8 +891,6 @@ static int __xsk_generic_xmit_batch(struct xdp_sock *xs) struct sk_buff *skb; int err = 0; - mutex_lock(&xs->mutex); - /* Since we dropped the RCU read lock, the socket state might have changed. */ if (unlikely(!xsk_is_bound(xs))) { err = -ENXIO; @@ -982,21 +980,17 @@ static int __xsk_generic_xmit_batch(struct xdp_sock *xs) if (sent_frame) __xsk_tx_release(xs); - mutex_unlock(&xs->mutex); return err; } -static int __xsk_generic_xmit(struct sock *sk) +static int __xsk_generic_xmit(struct xdp_sock *xs) { - struct xdp_sock *xs = xdp_sk(sk); bool sent_frame = false; struct xdp_desc desc; struct sk_buff *skb; u32 max_batch; int err = 0; - mutex_lock(&xs->mutex); - /* Since we dropped the RCU read lock, the socket state might have changed. */ if (unlikely(!xsk_is_bound(xs))) { err = -ENXIO; @@ -1071,17 +1065,22 @@ static int __xsk_generic_xmit(struct sock *sk) if (sent_frame) __xsk_tx_release(xs); - mutex_unlock(&xs->mutex); return err; } static int xsk_generic_xmit(struct sock *sk) { + struct xdp_sock *xs = xdp_sk(sk); int ret; /* Drop the RCU lock since the SKB path might sleep. */ rcu_read_unlock(); - ret = __xsk_generic_xmit(sk); + mutex_lock(&xs->mutex); + if (xs->batch.generic_xmit_batch) + ret = __xsk_generic_xmit_batch(xs); + else + ret = __xsk_generic_xmit(xs); + mutex_unlock(&xs->mutex); /* Reaquire RCU lock before going into common code. */ rcu_read_lock(); -- 2.41.3 From: Jason Xing Only set xmit.more false for the last skb. 
In theory, setting xmit.more to false only for the last packet sent in each
round brings a clear benefit, such as avoiding triggering too many IRQs.
Compared with the numbers for batch mode alone, a large improvement (26%)
can be seen on the i40e/ixgbe drivers, since triggering IRQs is expensive.
A minimal driver-side sketch of how this hint is consumed follows the patch.

Suggested-by: Jesper Dangaard Brouer
Signed-off-by: Jason Xing
---
 net/core/dev.c | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/net/core/dev.c b/net/core/dev.c
index 32de76c79d29..549a95b9d96f 100644
--- a/net/core/dev.c
+++ b/net/core/dev.c
@@ -4797,14 +4797,18 @@ int xsk_direct_xmit_batch(struct xdp_sock *xs, struct net_device *dev)
 {
 	u16 queue_id = xs->queue_id;
 	struct netdev_queue *txq = netdev_get_tx_queue(dev, queue_id);
+	struct sk_buff_head *send_queue = &xs->batch.send_queue;
 	int ret = NETDEV_TX_BUSY;
 	struct sk_buff *skb;
+	bool more = true;
 
 	local_bh_disable();
 	HARD_TX_LOCK(dev, txq, smp_processor_id());
-	while ((skb = __skb_dequeue(&xs->batch.send_queue)) != NULL) {
+	while ((skb = __skb_dequeue(send_queue)) != NULL) {
+		if (!skb_peek(send_queue))
+			more = false;
 		skb_set_queue_mapping(skb, queue_id);
-		ret = netdev_start_xmit(skb, dev, txq, false);
+		ret = netdev_start_xmit(skb, dev, txq, more);
 		if (ret != NETDEV_TX_OK)
 			break;
 	}
-- 2.41.3
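For reference, the gain comes from how drivers consume the xmit_more hint:
in ndo_start_xmit they typically skip the doorbell/tail write while
netdev_xmit_more() reports that more packets are queued behind the current
one. A minimal, hypothetical sketch of that pattern (the foo_* names and
ring layout are made up and are not part of this series):

    #include <linux/netdevice.h>
    #include <linux/io.h>

    /* Hypothetical TX ring; real drivers (i40e, ixgbe, ...) have their own. */
    struct foo_tx_ring {
            void __iomem *tail;
            u16 next_to_use;
            u16 qid;
    };

    static netdev_tx_t foo_start_xmit(struct sk_buff *skb, struct net_device *dev)
    {
            struct foo_tx_ring *ring = netdev_priv(dev);    /* simplified lookup */

            /* ... map the skb and fill a hardware descriptor here ... */

            /* Ring the doorbell only when no further packet follows this
             * one, or when the queue is about to stop.
             */
            if (!netdev_xmit_more() ||
                netif_xmit_stopped(netdev_get_tx_queue(dev, ring->qid)))
                    writel(ring->next_to_use, ring->tail);

            return NETDEV_TX_OK;
    }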