From: Jason Xing

Add a new socket option, XDP_GENERIC_XMIT_BATCH, as an alternative way to
achieve a higher overall throughput with the rest of the series applied.

Initialize skb_cache and desc_batch in setsockopt() under xs->mutex
protection. skb_cache will be used to store the skbs newly allocated in
one go in the xmit path. desc_batch will be used to temporarily store
descriptors from the pool.

Signed-off-by: Jason Xing
---
 Documentation/networking/af_xdp.rst | 11 +++++++
 include/net/xdp_sock.h              |  3 ++
 include/uapi/linux/if_xdp.h         |  1 +
 net/xdp/xsk.c                       | 47 +++++++++++++++++++++++++++++
 tools/include/uapi/linux/if_xdp.h   |  1 +
 5 files changed, 63 insertions(+)

diff --git a/Documentation/networking/af_xdp.rst b/Documentation/networking/af_xdp.rst
index 50d92084a49c..decb4da80db4 100644
--- a/Documentation/networking/af_xdp.rst
+++ b/Documentation/networking/af_xdp.rst
@@ -447,6 +447,17 @@ mode to allow application to tune the per-socket maximum iteration for better
 throughput and less frequency of send syscall.
 Allowed range is [32, xs->tx->nentries].
 
+XDP_GENERIC_XMIT_BATCH
+----------------------
+
+This option allows an application to use batch xmit in copy mode. The
+batch process first tries to allocate a number of skbs through the bulk
+allocation mechanism and then sends them out in one go, minimizing how
+often a few locks (such as the cache lock and the queue lock) are grabbed
+and released. It normally improves overall throughput, as observed with
+the xdpsock benchmark, though it might increase per-packet latency.
+The value must not be larger than xs->max_tx_budget.
+
 XDP_STATISTICS getsockopt
 -------------------------
 
diff --git a/include/net/xdp_sock.h b/include/net/xdp_sock.h
index ce587a225661..c2b05268b8ad 100644
--- a/include/net/xdp_sock.h
+++ b/include/net/xdp_sock.h
@@ -70,6 +70,7 @@ struct xdp_sock {
	 * preventing other XSKs from being starved.
	 */
	u32 tx_budget_spent;
+	u32 generic_xmit_batch;
 
	/* Statistics */
	u64 rx_dropped;
@@ -89,6 +90,8 @@ struct xdp_sock {
	struct mutex mutex;
	struct xsk_queue *fq_tmp; /* Only as tmp storage before bind */
	struct xsk_queue *cq_tmp; /* Only as tmp storage before bind */
+	struct sk_buff **skb_cache;
+	struct xdp_desc *desc_batch;
 };
 
 /*
diff --git a/include/uapi/linux/if_xdp.h b/include/uapi/linux/if_xdp.h
index 23a062781468..44cb72cd328e 100644
--- a/include/uapi/linux/if_xdp.h
+++ b/include/uapi/linux/if_xdp.h
@@ -80,6 +80,7 @@ struct xdp_mmap_offsets {
 #define XDP_STATISTICS			7
 #define XDP_OPTIONS			8
 #define XDP_MAX_TX_SKB_BUDGET		9
+#define XDP_GENERIC_XMIT_BATCH		10
 
 struct xdp_umem_reg {
	__u64 addr; /* Start of packet data area */
diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c
index 9c3acecc14b1..e75a6e2bab83 100644
--- a/net/xdp/xsk.c
+++ b/net/xdp/xsk.c
@@ -1122,6 +1122,8 @@ static int xsk_release(struct socket *sock)
	xskq_destroy(xs->tx);
	xskq_destroy(xs->fq_tmp);
	xskq_destroy(xs->cq_tmp);
+	kfree(xs->skb_cache);
+	kvfree(xs->desc_batch);
 
	sock_orphan(sk);
	sock->sk = NULL;
@@ -1456,6 +1458,51 @@ static int xsk_setsockopt(struct socket *sock, int level, int optname,
		WRITE_ONCE(xs->max_tx_budget, budget);
		return 0;
	}
+	case XDP_GENERIC_XMIT_BATCH:
+	{
+		struct xdp_desc *descs;
+		struct sk_buff **skbs;
+		unsigned int batch;
+		int ret = 0;
+
+		if (optlen != sizeof(batch))
+			return -EINVAL;
+		if (copy_from_sockptr(&batch, optval, sizeof(batch)))
+			return -EFAULT;
+		if (batch > xs->max_tx_budget)
+			return -EACCES;
+
+		mutex_lock(&xs->mutex);
+		if (!batch) {
+			kfree(xs->skb_cache);
+			kvfree(xs->desc_batch);
+			xs->skb_cache = NULL;
+			xs->desc_batch = NULL;
+			xs->generic_xmit_batch = 0;
+			goto out;
+		}
+
+		skbs = kmalloc_array(batch, sizeof(*skbs), GFP_KERNEL);
+		if (!skbs) {
+			ret = -ENOMEM;
+			goto out;
+		}
+		descs = kvcalloc(batch, sizeof(*xs->desc_batch), GFP_KERNEL);
+		if (!descs) {
+			kfree(skbs);
+			ret = -ENOMEM;
+			goto out;
+		}
+		kfree(xs->skb_cache);
+		kvfree(xs->desc_batch);
+
+		xs->skb_cache = skbs;
+		xs->desc_batch = descs;
+		xs->generic_xmit_batch = batch;
+out:
+		mutex_unlock(&xs->mutex);
+		return ret;
+	}
	default:
		break;
	}
diff --git a/tools/include/uapi/linux/if_xdp.h b/tools/include/uapi/linux/if_xdp.h
index 23a062781468..44cb72cd328e 100644
--- a/tools/include/uapi/linux/if_xdp.h
+++ b/tools/include/uapi/linux/if_xdp.h
@@ -80,6 +80,7 @@ struct xdp_mmap_offsets {
 #define XDP_STATISTICS			7
 #define XDP_OPTIONS			8
 #define XDP_MAX_TX_SKB_BUDGET		9
+#define XDP_GENERIC_XMIT_BATCH		10
 
 struct xdp_umem_reg {
	__u64 addr; /* Start of packet data area */
-- 
2.41.3

From: Jason Xing

Add a new parameter so that the generic xmit path can call this interface
in subsequent patches.

Prior to this patch, pool->tx_descs in xskq_cons_read_desc_batch() is only
used to store a small number of descs in zerocopy mode. Later, a similar
per-socket cache, xs->desc_batch, will be used in copy mode, so let the
caller pass the descriptor array as a parameter.
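To illustrate the resulting call sites (the copy-mode one is only added
later in this series and is shown here as a sketch):

  /* zerocopy path, napi context: scratch array owned by the pool */
  nb_pkts = xskq_cons_read_desc_batch(xs->tx, pool, pool->tx_descs, nb_pkts);

  /* generic/copy mode, later in this series: per-socket scratch array */
  nb_descs = xskq_cons_read_desc_batch(xs->tx, pool, xs->desc_batch, nb_descs);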
Signed-off-by: Jason Xing
---
 net/xdp/xsk.c       | 2 +-
 net/xdp/xsk_queue.h | 3 +--
 2 files changed, 2 insertions(+), 3 deletions(-)

diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c
index e75a6e2bab83..173ad49379c3 100644
--- a/net/xdp/xsk.c
+++ b/net/xdp/xsk.c
@@ -509,7 +509,7 @@ u32 xsk_tx_peek_release_desc_batch(struct xsk_buff_pool *pool, u32 nb_pkts)
	if (!nb_pkts)
		goto out;
 
-	nb_pkts = xskq_cons_read_desc_batch(xs->tx, pool, nb_pkts);
+	nb_pkts = xskq_cons_read_desc_batch(xs->tx, pool, pool->tx_descs, nb_pkts);
	if (!nb_pkts) {
		xs->tx->queue_empty_descs++;
		goto out;
diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h
index 46d87e961ad6..47741b4c285d 100644
--- a/net/xdp/xsk_queue.h
+++ b/net/xdp/xsk_queue.h
@@ -235,10 +235,9 @@ static inline void parse_desc(struct xsk_queue *q, struct xsk_buff_pool *pool,
 
 static inline u32 xskq_cons_read_desc_batch(struct xsk_queue *q,
					     struct xsk_buff_pool *pool,
-					     u32 max)
+					     struct xdp_desc *descs, u32 max)
 {
	u32 cached_cons = q->cached_cons, nb_entries = 0;
-	struct xdp_desc *descs = pool->tx_descs;
	u32 total_descs = 0, nr_frags = 0;
 
	/* track first entry, if stumble upon *any* invalid descriptor, rewind
-- 
2.41.3

From: Jason Xing

Add a xskq_prod_write_addr_batch_locked() helper for batch xmit.

xskq_prod_write_addr_batch() is used in the napi poll environment, which
already runs in softirq context, so it doesn't need any lock protection.
Later this function will be used in the generic xmit path, which does not
run in irq context, so the locked version added by this patch is needed.

Also make xskq_prod_write_addr_batch() return nb_pkts, counting how many
skbs (rather than descs) will be used in one round of batch xmit, so that
the main batch xmit function can decide how many skbs to allocate.

Note that xskq_prod_write_addr_batch() was designed to help zerocopy mode
because it only cares about the descriptors/data itself.

Signed-off-by: Jason Xing
---
 net/xdp/xsk_queue.h | 26 +++++++++++++++++++++++---
 1 file changed, 23 insertions(+), 3 deletions(-)

diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h
index 47741b4c285d..c444a1e29838 100644
--- a/net/xdp/xsk_queue.h
+++ b/net/xdp/xsk_queue.h
@@ -389,17 +389,37 @@ static inline int xskq_prod_reserve_addr(struct xsk_queue *q, u64 addr)
	return 0;
 }
 
-static inline void xskq_prod_write_addr_batch(struct xsk_queue *q, struct xdp_desc *descs,
-					      u32 nb_entries)
+static inline u32 xskq_prod_write_addr_batch(struct xsk_queue *q, struct xdp_desc *descs,
+					     u32 nb_entries)
 {
	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
	u32 i, cached_prod;
+	u32 nb_pkts = 0;
 
	/* A, matches D */
	cached_prod = q->cached_prod;
-	for (i = 0; i < nb_entries; i++)
+	for (i = 0; i < nb_entries; i++) {
		ring->desc[cached_prod++ & q->ring_mask] = descs[i].addr;
+		if (!xp_mb_desc(&descs[i]))
+			nb_pkts++;
+	}
	q->cached_prod = cached_prod;
+
+	return nb_pkts;
+}
+
+static inline u32
+xskq_prod_write_addr_batch_locked(struct xsk_buff_pool *pool,
+				  struct xdp_desc *descs, u32 nb_entries)
+{
+	unsigned long flags;
+	u32 nb_pkts;
+
+	spin_lock_irqsave(&pool->cq_lock, flags);
+	nb_pkts = xskq_prod_write_addr_batch(pool->cq, descs, nb_entries);
+	spin_unlock_irqrestore(&pool->cq_lock, flags);
+
+	return nb_pkts;
 }
 
 static inline int xskq_prod_reserve_desc(struct xsk_queue *q,
-- 
2.41.3

From: Jason Xing

Batch xmit mode needs to allocate and build skbs in one go. To avoid
reinventing the wheel, use xsk_build_skb() as the second half of the
initialization of each skb.
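Roughly, the intended split looks like this (sketch only; the
bulk-allocation half is introduced by a later patch in this series):

  /* first half (later patch): bulk-allocate raw skbs into xs->skb_cache */
  kmem_cache_alloc_bulk(net_hotdata.skbuff_cache, gfp_mask, n, (void **)skbs);
  /* ... per-skb init (kmalloc_reserve() + __build_skb_around()) elided ... */

  /* second half (this patch): reuse the existing per-descriptor logic */
  skb = xsk_build_skb(xs, skbs[i], &desc);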
The original xsk_build_skb() allocates a new skb itself by calling
sock_alloc_send_skb(), whether in copy mode or zerocopy mode. Add a new
'allocated_skb' parameter to let callers pass in an already allocated skb
in order to support the later xmit batch feature. At that point, another
skb-building function will allocate a new skb and pass it to
xsk_build_skb() to finish the rest of the building process, like
initializing structures and copying data.

Signed-off-by: Jason Xing
---
 include/net/xdp_sock.h |  4 ++++
 net/xdp/xsk.c          | 23 ++++++++++++++++-------
 2 files changed, 20 insertions(+), 7 deletions(-)

diff --git a/include/net/xdp_sock.h b/include/net/xdp_sock.h
index c2b05268b8ad..cbba880c27c3 100644
--- a/include/net/xdp_sock.h
+++ b/include/net/xdp_sock.h
@@ -123,6 +123,10 @@ struct xsk_tx_metadata_ops {
	void (*tmo_request_launch_time)(u64 launch_time, void *priv);
 };
+
+struct sk_buff *xsk_build_skb(struct xdp_sock *xs,
+			      struct sk_buff *allocated_skb,
+			      struct xdp_desc *desc);
 
 #ifdef CONFIG_XDP_SOCKETS
 
 int xsk_generic_rcv(struct xdp_sock *xs, struct xdp_buff *xdp);
diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c
index 173ad49379c3..213d6100e405 100644
--- a/net/xdp/xsk.c
+++ b/net/xdp/xsk.c
@@ -605,6 +605,7 @@ static void xsk_drop_skb(struct sk_buff *skb)
 }
 
 static struct sk_buff *xsk_build_skb_zerocopy(struct xdp_sock *xs,
+					      struct sk_buff *allocated_skb,
					      struct xdp_desc *desc)
 {
	struct xsk_buff_pool *pool = xs->pool;
@@ -618,7 +619,10 @@ static struct sk_buff *xsk_build_skb_zerocopy(struct xdp_sock *xs,
	if (!skb) {
		hr = max(NET_SKB_PAD, L1_CACHE_ALIGN(xs->dev->needed_headroom));
 
-		skb = sock_alloc_send_skb(&xs->sk, hr, 1, &err);
+		if (!allocated_skb)
+			skb = sock_alloc_send_skb(&xs->sk, hr, 1, &err);
+		else
+			skb = allocated_skb;
		if (unlikely(!skb))
			return ERR_PTR(err);
 
@@ -657,8 +661,9 @@ static struct sk_buff *xsk_build_skb_zerocopy(struct xdp_sock *xs,
	return skb;
 }
 
-static struct sk_buff *xsk_build_skb(struct xdp_sock *xs,
-				     struct xdp_desc *desc)
+struct sk_buff *xsk_build_skb(struct xdp_sock *xs,
+			      struct sk_buff *allocated_skb,
+			      struct xdp_desc *desc)
 {
	struct xsk_tx_metadata *meta = NULL;
	struct net_device *dev = xs->dev;
@@ -667,7 +672,7 @@ static struct sk_buff *xsk_build_skb(struct xdp_sock *xs,
	int err;
 
	if (dev->priv_flags & IFF_TX_SKB_NO_LINEAR) {
-		skb = xsk_build_skb_zerocopy(xs, desc);
+		skb = xsk_build_skb_zerocopy(xs, allocated_skb, desc);
		if (IS_ERR(skb)) {
			err = PTR_ERR(skb);
			goto free_err;
@@ -683,8 +688,12 @@ static struct sk_buff *xsk_build_skb(struct xdp_sock *xs,
		first_frag = true;
 
		hr = max(NET_SKB_PAD, L1_CACHE_ALIGN(dev->needed_headroom));
-		tr = dev->needed_tailroom;
-		skb = sock_alloc_send_skb(&xs->sk, hr + len + tr, 1, &err);
+		if (!allocated_skb) {
+			tr = dev->needed_tailroom;
+			skb = sock_alloc_send_skb(&xs->sk, hr + len + tr, 1, &err);
+		} else {
+			skb = allocated_skb;
+		}
		if (unlikely(!skb))
			goto free_err;
 
@@ -818,7 +827,7 @@ static int __xsk_generic_xmit(struct sock *sk)
			goto out;
		}
 
-		skb = xsk_build_skb(xs, &desc);
+		skb = xsk_build_skb(xs, NULL, &desc);
		if (IS_ERR(skb)) {
			err = PTR_ERR(skb);
			if (err != -EOVERFLOW)
-- 
2.41.3

From: Jason Xing

Support allocating and building skbs in batch.

This patch uses kmem_cache_alloc_bulk() to do the batch allocation, which
relies on the global common cache 'net_hotdata.skbuff_cache'. Use a
standalone xsk skb cache (namely, xs->skb_cache) to store the skbs instead
of resorting to napi_alloc_cache, which was designed for softirq conditions.
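The skbs taken from this cache are finished via the parameter added in the
previous patch, so both calling styles now coexist (sketch):

  /* existing slow path: let xsk_build_skb() allocate the skb internally */
  skb = xsk_build_skb(xs, NULL, &desc);

  /* batch path added here: hand in an skb taken from xs->skb_cache */
  skb = xsk_build_skb(xs, skbs[xs->skb_count - 1 - i], &descs[j]);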
In case a memory shortage occurs, to avoid frequently allocating skbs and
then freeing part of them, consume the allocated skbs from the cache in
reverse order (i.e. from 10, 9, ..., 2, 1, 0).

After allocating memory for each of the skbs, in a 'for' loop, the patch
borrows part of __alloc_skb() to initialize the skb and then calls
xsk_build_skb() to complete the rest of the process, like copying data.

Since no fclone flag is passed during allocation, on the freeing side
napi_consume_skb() in the tx completion path puts the skb back into the
different, global cache 'net_hotdata.skbuff_cache', which implements
deferred skb freeing to avoid freeing skbs one by one.

Signed-off-by: Jason Xing
---
 include/net/xdp_sock.h |   3 ++
 net/core/skbuff.c      | 103 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 106 insertions(+)

diff --git a/include/net/xdp_sock.h b/include/net/xdp_sock.h
index cbba880c27c3..b533317409df 100644
--- a/include/net/xdp_sock.h
+++ b/include/net/xdp_sock.h
@@ -92,6 +92,7 @@ struct xdp_sock {
	struct xsk_queue *cq_tmp; /* Only as tmp storage before bind */
	struct sk_buff **skb_cache;
	struct xdp_desc *desc_batch;
+	unsigned int skb_count;
 };
 
 /*
@@ -127,6 +128,8 @@ struct xsk_tx_metadata_ops {
 struct sk_buff *xsk_build_skb(struct xdp_sock *xs,
			      struct sk_buff *allocated_skb,
			      struct xdp_desc *desc);
+int xsk_alloc_batch_skb(struct xdp_sock *xs, u32 nb_pkts, u32 nb_descs,
+			int *consumed, int *start, int *end);
 
 #ifdef CONFIG_XDP_SOCKETS
 
 int xsk_generic_rcv(struct xdp_sock *xs, struct xdp_buff *xdp);
diff --git a/net/core/skbuff.c b/net/core/skbuff.c
index ee0274417948..c9071e56d133 100644
--- a/net/core/skbuff.c
+++ b/net/core/skbuff.c
@@ -80,6 +80,8 @@
 #include 
 #include 
 #include 
+#include 
+#include 
 
 #include 
 #include 
@@ -614,6 +616,107 @@ static void *kmalloc_reserve(unsigned int *size, gfp_t flags, int node,
	return obj;
 }
 
+int xsk_alloc_batch_skb(struct xdp_sock *xs, u32 nb_pkts, u32 nb_descs,
+			int *consumed, int *start, int *end)
+{
+	struct xdp_desc *descs = xs->desc_batch;
+	struct sk_buff **skbs = xs->skb_cache;
+	gfp_t gfp_mask = xs->sk.sk_allocation;
+	struct net_device *dev = xs->dev;
+	int node = NUMA_NO_NODE;
+	struct sk_buff *skb;
+	u32 i = 0, j = 0;
+	bool pfmemalloc;
+	u32 base_len;
+	int err = 0;
+	u8 *data;
+
+	base_len = max(NET_SKB_PAD, L1_CACHE_ALIGN(dev->needed_headroom));
+	if (!(dev->priv_flags & IFF_TX_SKB_NO_LINEAR))
+		base_len += dev->needed_tailroom;
+
+	if (xs->skb_count >= nb_pkts)
+		goto build;
+
+	if (xs->skb) {
+		i = 1;
+		xs->skb_count++;
+	}
+
+	xs->skb_count += kmem_cache_alloc_bulk(net_hotdata.skbuff_cache,
+					       gfp_mask, nb_pkts - xs->skb_count,
+					       (void **)&skbs[xs->skb_count]);
+	if (xs->skb_count < nb_pkts)
+		nb_pkts = xs->skb_count;
+
+build:
+	for (i = 0, j = 0; j < nb_descs; j++) {
+		if (!xs->skb) {
+			u32 size = base_len + descs[j].len;
+
+			/* In case we don't have enough allocated skbs */
+			if (i >= nb_pkts) {
+				err = -EAGAIN;
+				break;
+			}
+
+			if (sk_wmem_alloc_get(&xs->sk) > READ_ONCE(xs->sk.sk_sndbuf)) {
+				err = -EAGAIN;
+				break;
+			}
+
+			skb = skbs[xs->skb_count - 1 - i];
+
+			prefetchw(skb);
+			/* We do our best to align skb_shared_info on a separate cache
+			 * line. It usually works because kmalloc(X > SMP_CACHE_BYTES) gives
+			 * aligned memory blocks, unless SLUB/SLAB debug is enabled.
+			 * Both skb->head and skb_shared_info are cache line aligned.
+			 */
+			data = kmalloc_reserve(&size, gfp_mask, node, &pfmemalloc);
+			if (unlikely(!data)) {
+				err = -ENOBUFS;
+				break;
+			}
+			/* kmalloc_size_roundup() might give us more room than requested.
+			 * Put skb_shared_info exactly at the end of allocated zone,
+			 * to allow max possible filling before reallocation.
+			 */
+			prefetchw(data + SKB_WITH_OVERHEAD(size));
+
+			memset(skb, 0, offsetof(struct sk_buff, tail));
+			__build_skb_around(skb, data, size);
+			skb->pfmemalloc = pfmemalloc;
+			skb_set_owner_w(skb, &xs->sk);
+		} else if (unlikely(i == 0)) {
+			/* We have an skb in the cache that was left over from last time */
+			kmem_cache_free(net_hotdata.skbuff_cache, skbs[xs->skb_count - 1]);
+			skbs[xs->skb_count - 1] = xs->skb;
+		}
+
+		skb = xsk_build_skb(xs, skb, &descs[j]);
+		if (IS_ERR(skb)) {
+			err = PTR_ERR(skb);
+			break;
+		}
+
+		if (xp_mb_desc(&descs[j])) {
+			xs->skb = skb;
+			continue;
+		}
+
+		xs->skb = NULL;
+		i++;
+	}
+
+	*consumed = j;
+	*start = xs->skb_count - 1;
+	*end = xs->skb_count - i;
+	xs->skb_count -= i;
+
+	return err;
+}
+
 /* Allocate a new skbuff. We do this ourselves so we can fill in a few
  * 'private' fields and also do memory statistics to find all the
  * [BEEP] leaks.
-- 
2.41.3

From: Jason Xing

Add the batch xmit logic. Grab the lock and disable bottom halves only
once, then send all the aggregated packets in one loop.

Since the previous patch consumes skbs from xs->skb_cache in reverse
order, this patch sends each skb out from 'start' down to 'end', where
'start' is not smaller than 'end'.

Signed-off-by: Jason Xing
---
 include/linux/netdevice.h |  3 +++
 net/core/dev.c            | 19 +++++++++++++++++++
 2 files changed, 22 insertions(+)

diff --git a/include/linux/netdevice.h b/include/linux/netdevice.h
index 5e5de4b0a433..8e2688e3f2e4 100644
--- a/include/linux/netdevice.h
+++ b/include/linux/netdevice.h
@@ -3352,6 +3352,9 @@ u16 dev_pick_tx_zero(struct net_device *dev, struct sk_buff *skb,
 int __dev_queue_xmit(struct sk_buff *skb, struct net_device *sb_dev);
 int __dev_direct_xmit(struct sk_buff *skb, u16 queue_id);
+int xsk_direct_xmit_batch(struct sk_buff **skbs, struct net_device *dev,
+			  struct netdev_queue *txq, int *cur,
+			  int start, int end);
 
 static inline int dev_queue_xmit(struct sk_buff *skb)
 {
diff --git a/net/core/dev.c b/net/core/dev.c
index 68dc47d7e700..a5a6b9a199e9 100644
--- a/net/core/dev.c
+++ b/net/core/dev.c
@@ -4742,6 +4742,25 @@ int __dev_queue_xmit(struct sk_buff *skb, struct net_device *sb_dev)
 }
 EXPORT_SYMBOL(__dev_queue_xmit);
 
+int xsk_direct_xmit_batch(struct sk_buff **skbs, struct net_device *dev,
+			  struct netdev_queue *txq, int *cur,
+			  int start, int end)
+{
+	int ret = NETDEV_TX_BUSY;
+
+	local_bh_disable();
+	HARD_TX_LOCK(dev, txq, smp_processor_id());
+	for (*cur = start; *cur >= end; (*cur)--) {
+		ret = netdev_start_xmit(skbs[*cur], dev, txq, false);
+		if (ret != NETDEV_TX_OK)
+			break;
+	}
+	HARD_TX_UNLOCK(dev, txq);
+	local_bh_enable();
+
+	return ret;
+}
+
 int __dev_direct_xmit(struct sk_buff *skb, u16 queue_id)
 {
	struct net_device *dev = skb->dev;
-- 
2.41.3

From: Jason Xing

Implement __xsk_generic_xmit_batch(), the core function of batch xmit and
a batch version of __xsk_generic_xmit(). The whole logic is divided into
the following steps:
1. check if there are enough available slots in the tx ring and the
   completion ring.
2. read descriptors from the tx ring into xs->desc_batch in batches.
3. reserve enough slots in the completion ring to avoid backpressure.
4. allocate and build skbs in batches.
5. send all the possible packets out in one go.
Signed-off-by: Jason Xing
---
 net/xdp/xsk.c | 117 ++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 117 insertions(+)

diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c
index 213d6100e405..90089a6e78b2 100644
--- a/net/xdp/xsk.c
+++ b/net/xdp/xsk.c
@@ -789,6 +789,123 @@ struct sk_buff *xsk_build_skb(struct xdp_sock *xs,
	return ERR_PTR(err);
 }
 
+static int __xsk_generic_xmit_batch(struct xdp_sock *xs)
+{
+	struct xdp_desc *descs = xs->desc_batch;
+	struct xsk_buff_pool *pool = xs->pool;
+	struct sk_buff **skbs = xs->skb_cache;
+	u32 nb_pkts, nb_descs, cons_descs;
+	struct net_device *dev = xs->dev;
+	int start = 0, end = 0, cur = -1;
+	u32 i = 0, max_budget;
+	struct netdev_queue *txq;
+	bool sent_frame = false;
+	u32 max_batch, expected;
+	int err = 0;
+
+	mutex_lock(&xs->mutex);
+
+	/* Since we dropped the RCU read lock, the socket state might have changed. */
+	if (unlikely(!xsk_is_bound(xs))) {
+		err = -ENXIO;
+		goto out;
+	}
+
+	if (xs->queue_id >= dev->real_num_tx_queues)
+		goto out;
+
+	if (unlikely(!netif_running(dev) ||
+		     !netif_carrier_ok(dev)))
+		goto out;
+
+	max_budget = READ_ONCE(xs->max_tx_budget);
+	max_batch = xs->generic_xmit_batch;
+	txq = netdev_get_tx_queue(dev, xs->queue_id);
+
+	for (i = 0; i < max_budget; i += cons_descs) {
+		expected = max_budget - i;
+		expected = max_batch > expected ? expected : max_batch;
+		nb_descs = xskq_cons_nb_entries(xs->tx, expected);
+		if (!nb_descs)
+			goto out;
+
+		/* This is the backpressure mechanism for the Tx path. Try to
+		 * reserve space in the completion queue for all packets, but
+		 * if there are fewer slots available, just process that many
+		 * packets. This avoids having to implement any buffering in
+		 * the Tx path.
+		 */
+		nb_descs = xskq_prod_nb_free(pool->cq, nb_descs);
+		if (!nb_descs) {
+			err = -EAGAIN;
+			goto out;
+		}
+
+		nb_descs = xskq_cons_read_desc_batch(xs->tx, pool, descs, nb_descs);
+		if (!nb_descs) {
+			err = -EAGAIN;
+			xs->tx->queue_empty_descs++;
+			goto out;
+		}
+
+		nb_pkts = xskq_prod_write_addr_batch_locked(pool, descs, nb_descs);
+
+		err = xsk_alloc_batch_skb(xs, nb_pkts, nb_descs, &cons_descs, &start, &end);
+		/* Return 'nb_descs - cons_descs' descs to the pool if the
+		 * batch allocation partially fails
+		 */
+		if (cons_descs < nb_descs) {
+			xskq_cons_cancel_n(xs->tx, nb_descs - cons_descs);
+			xsk_cq_cancel_locked(xs->pool, nb_descs - cons_descs);
+		}
+
+		if (start >= end) {
+			int err_xmit;
+
+			err_xmit = xsk_direct_xmit_batch(skbs, dev, txq,
+							 &cur, start, end);
+			if (err_xmit == NETDEV_TX_BUSY) {
+				err = -EAGAIN;
+			} else if (err_xmit == NET_XMIT_DROP) {
+				cur++;
+				err = -EBUSY;
+			}
+
+			sent_frame = true;
+			xs->skb = NULL;
+		}
+
+		if (err)
+			goto out;
+
+		start = 0;
+		end = 0;
+		cur = -1;
+	}
+
+	/* The maximum budget of descriptors has been consumed */
+	err = -EAGAIN;
+
+	if (xskq_has_descs(xs->tx)) {
+		if (xs->skb)
+			xsk_drop_skb(xs->skb);
+	}
+
+out:
+	/* If cur is not smaller than end, we must clear the rest of the
+	 * skbs staying in the skb_cache
+	 */
+	for (; cur >= end; cur--) {
+		xskq_cons_cancel_n(xs->tx, xsk_get_num_desc(skbs[cur]));
+		xsk_consume_skb(skbs[cur]);
+	}
+	if (sent_frame)
+		__xsk_tx_release(xs);
+
+	mutex_unlock(&xs->mutex);
+	return err;
+}
+
 static int __xsk_generic_xmit(struct sock *sk)
-- 
2.41.3

From: Jason Xing

- Move xs->mutex into xsk_generic_xmit() to prevent a race condition when
  the application changes generic_xmit_batch concurrently.
- Enable batch xmit.
This makes the whole feature finally work.

Signed-off-by: Jason Xing
---
 net/xdp/xsk.c | 17 ++++++++---------
 1 file changed, 8 insertions(+), 9 deletions(-)

diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c
index 90089a6e78b2..34fd54ad4768 100644
--- a/net/xdp/xsk.c
+++ b/net/xdp/xsk.c
@@ -803,8 +803,6 @@ static int __xsk_generic_xmit_batch(struct xdp_sock *xs)
	u32 max_batch, expected;
	int err = 0;
 
-	mutex_lock(&xs->mutex);
-
	/* Since we dropped the RCU read lock, the socket state might have changed. */
	if (unlikely(!xsk_is_bound(xs))) {
		err = -ENXIO;
@@ -902,21 +900,17 @@ static int __xsk_generic_xmit_batch(struct xdp_sock *xs)
	if (sent_frame)
		__xsk_tx_release(xs);
 
-	mutex_unlock(&xs->mutex);
	return err;
 }
 
-static int __xsk_generic_xmit(struct sock *sk)
+static int __xsk_generic_xmit(struct xdp_sock *xs)
 {
-	struct xdp_sock *xs = xdp_sk(sk);
	bool sent_frame = false;
	struct xdp_desc desc;
	struct sk_buff *skb;
	u32 max_batch;
	int err = 0;
 
-	mutex_lock(&xs->mutex);
-
	/* Since we dropped the RCU read lock, the socket state might have changed. */
	if (unlikely(!xsk_is_bound(xs))) {
		err = -ENXIO;
@@ -991,17 +985,22 @@ static int __xsk_generic_xmit(struct sock *sk)
	if (sent_frame)
		__xsk_tx_release(xs);
 
-	mutex_unlock(&xs->mutex);
	return err;
 }
 
 static int xsk_generic_xmit(struct sock *sk)
 {
+	struct xdp_sock *xs = xdp_sk(sk);
	int ret;
 
	/* Drop the RCU lock since the SKB path might sleep. */
	rcu_read_unlock();
-	ret = __xsk_generic_xmit(sk);
+	mutex_lock(&xs->mutex);
+	if (xs->generic_xmit_batch)
+		ret = __xsk_generic_xmit_batch(xs);
+	else
+		ret = __xsk_generic_xmit(xs);
+	mutex_unlock(&xs->mutex);
	/* Reaquire RCU lock before going into common code. */
	rcu_read_lock();
-- 
2.41.3

From: Jason Xing

Only set xmit_more to false for the last skb. In theory, clearing
xmit_more only for the last packet sent in each round brings benefits such
as avoiding triggering too many irqs.

Compared to the numbers for the batch mode, a huge improvement (26%) can
be seen with the i40e driver, while a slight decrease (10%) is seen with
virtio_net.

Suggested-by: Jesper Dangaard Brouer
Signed-off-by: Jason Xing
---
Considering the different implementations in VMs and on hosts, I'm not
sure if we need to add another setsockopt to control this...
---
 net/core/dev.c | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/net/core/dev.c b/net/core/dev.c
index a5a6b9a199e9..9d28a3d0ce3b 100644
--- a/net/core/dev.c
+++ b/net/core/dev.c
@@ -4751,7 +4751,9 @@ int xsk_direct_xmit_batch(struct sk_buff **skbs, struct net_device *dev,
	local_bh_disable();
	HARD_TX_LOCK(dev, txq, smp_processor_id());
	for (*cur = start; *cur >= end; (*cur)--) {
-		ret = netdev_start_xmit(skbs[*cur], dev, txq, false);
+		bool more = *cur != end;
+
+		ret = netdev_start_xmit(skbs[*cur], dev, txq, more);
		if (ret != NETDEV_TX_OK)
			break;
	}
-- 
2.41.3
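For reviewers, a minimal userspace sketch of enabling the feature added by
this series (illustrative only: error handling is elided, the budget value
is arbitrary, and xsk_fd is assumed to be an already-created AF_XDP socket):

  #include <sys/socket.h>
  #include <linux/if_xdp.h>

  unsigned int budget = 256;	/* example value, range [32, tx nentries] */
  unsigned int batch = 64;	/* must not exceed the budget, or -EACCES */

  setsockopt(xsk_fd, SOL_XDP, XDP_MAX_TX_SKB_BUDGET, &budget, sizeof(budget));
  setsockopt(xsk_fd, SOL_XDP, XDP_GENERIC_XMIT_BATCH, &batch, sizeof(batch));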