From: Jason Xing This patch is to prepare for later batch xmit in generic path. Add a new socket option to provide an alternative to achieve a higher overall throughput. skb_batch will be used to store newly allocated skb at one time in the xmit path. Signed-off-by: Jason Xing --- Documentation/networking/af_xdp.rst | 9 ++++++++ include/net/xdp_sock.h | 2 ++ include/uapi/linux/if_xdp.h | 1 + net/xdp/xsk.c | 32 +++++++++++++++++++++++++++++ tools/include/uapi/linux/if_xdp.h | 1 + 5 files changed, 45 insertions(+) diff --git a/Documentation/networking/af_xdp.rst b/Documentation/networking/af_xdp.rst index 50d92084a49c..1194bdfaf61e 100644 --- a/Documentation/networking/af_xdp.rst +++ b/Documentation/networking/af_xdp.rst @@ -447,6 +447,15 @@ mode to allow application to tune the per-socket maximum iteration for better throughput and less frequency of send syscall. Allowed range is [32, xs->tx->nentries]. +XDP_GENERIC_XMIT_BATCH +---------------------- + +It provides an option that allows application to use batch xmit in the copy +mode. Batch process minimizes the number of grabbing/releasing queue lock +without redundant actions compared to before to gain the overall performance +improvement whereas it might increase the latency of per packet. The maximum +value shouldn't be larger than xs->max_tx_budget. + XDP_STATISTICS getsockopt ------------------------- diff --git a/include/net/xdp_sock.h b/include/net/xdp_sock.h index ce587a225661..b5a3e37da8db 100644 --- a/include/net/xdp_sock.h +++ b/include/net/xdp_sock.h @@ -61,6 +61,7 @@ struct xdp_sock { XSK_BOUND, XSK_UNBOUND, } state; + struct sk_buff **skb_batch; struct xsk_queue *tx ____cacheline_aligned_in_smp; struct list_head tx_list; @@ -70,6 +71,7 @@ struct xdp_sock { * preventing other XSKs from being starved. */ u32 tx_budget_spent; + u32 generic_xmit_batch; /* Statistics */ u64 rx_dropped; diff --git a/include/uapi/linux/if_xdp.h b/include/uapi/linux/if_xdp.h index 23a062781468..44cb72cd328e 100644 --- a/include/uapi/linux/if_xdp.h +++ b/include/uapi/linux/if_xdp.h @@ -80,6 +80,7 @@ struct xdp_mmap_offsets { #define XDP_STATISTICS 7 #define XDP_OPTIONS 8 #define XDP_MAX_TX_SKB_BUDGET 9 +#define XDP_GENERIC_XMIT_BATCH 10 struct xdp_umem_reg { __u64 addr; /* Start of packet data area */ diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c index 9c3acecc14b1..7a149f4ac273 100644 --- a/net/xdp/xsk.c +++ b/net/xdp/xsk.c @@ -1122,6 +1122,7 @@ static int xsk_release(struct socket *sock) xskq_destroy(xs->tx); xskq_destroy(xs->fq_tmp); xskq_destroy(xs->cq_tmp); + kfree(xs->skb_batch); sock_orphan(sk); sock->sk = NULL; @@ -1456,6 +1457,37 @@ static int xsk_setsockopt(struct socket *sock, int level, int optname, WRITE_ONCE(xs->max_tx_budget, budget); return 0; } + case XDP_GENERIC_XMIT_BATCH: + { + unsigned int batch, batch_alloc_len; + struct sk_buff **new; + + if (optlen != sizeof(batch)) + return -EINVAL; + if (copy_from_sockptr(&batch, optval, sizeof(batch))) + return -EFAULT; + if (batch > xs->max_tx_budget) + return -EACCES; + + mutex_lock(&xs->mutex); + if (!batch) { + kfree(xs->skb_batch); + xs->generic_xmit_batch = 0; + goto out; + } + batch_alloc_len = sizeof(struct sk_buff *) * batch; + new = kmalloc(batch_alloc_len, GFP_KERNEL); + if (!new) + return -ENOMEM; + if (xs->skb_batch) + kfree(xs->skb_batch); + + xs->skb_batch = new; + xs->generic_xmit_batch = batch; +out: + mutex_unlock(&xs->mutex); + return 0; + } default: break; } diff --git a/tools/include/uapi/linux/if_xdp.h b/tools/include/uapi/linux/if_xdp.h index 23a062781468..44cb72cd328e 100644 --- a/tools/include/uapi/linux/if_xdp.h +++ b/tools/include/uapi/linux/if_xdp.h @@ -80,6 +80,7 @@ struct xdp_mmap_offsets { #define XDP_STATISTICS 7 #define XDP_OPTIONS 8 #define XDP_MAX_TX_SKB_BUDGET 9 +#define XDP_GENERIC_XMIT_BATCH 10 struct xdp_umem_reg { __u64 addr; /* Start of packet data area */ -- 2.41.3 From: Jason Xing Zerocopy mode has a good feature named multi buffer while copy mode has to transmit skb one by one like normal flows. The latter might lose the bypass power to some extent because of grabbing/releasing the same tx queue lock and disabling/enabling bh and stuff on a packet basis. Contending the same queue lock will bring a worse result. This patch supports batch feature by permitting owning the queue lock to send the generic_xmit_batch number of packets at one time. To further achieve a better result, some codes[1] are removed on purpose from xsk_direct_xmit_batch() as referred to __dev_direct_xmit(). [1] 1. advance the device check to granularity of sendto syscall. 2. remove validating packets because of its uselessness. 3. remove operation of softnet_data.xmit.recursion because it's not necessary. 4. remove BQL flow control. We don't need to do BQL control because it probably limit the speed. An ideal scenario is to use a standalone and clean tx queue to send packets only for xsk. Less competition shows better performance results. Experiments: 1) Tested on virtio_net: With this patch series applied, the performance number of xdpsock[2] goes up by 33%. Before, it was 767743 pps; while after it was 1021486 pps. If we test with another thread competing the same queue, a 28% increase (from 405466 pps to 521076 pps) can be observed. 2) Tested on ixgbe: The results of zerocopy and copy mode are respectively 1303277 pps and 1187347 pps. After this socket option took effect, copy mode reaches 1472367 which was higher than zerocopy mode impressively. [2]: ./xdpsock -i eth1 -t -S -s 64 It's worth mentioning batch process might bring high latency in certain cases. The recommended value is 32. Signed-off-by: Jason Xing --- include/linux/netdevice.h | 2 + net/core/dev.c | 18 +++++++ net/xdp/xsk.c | 103 ++++++++++++++++++++++++++++++++++++-- 3 files changed, 120 insertions(+), 3 deletions(-) diff --git a/include/linux/netdevice.h b/include/linux/netdevice.h index 5e5de4b0a433..27738894daa7 100644 --- a/include/linux/netdevice.h +++ b/include/linux/netdevice.h @@ -3352,6 +3352,8 @@ u16 dev_pick_tx_zero(struct net_device *dev, struct sk_buff *skb, int __dev_queue_xmit(struct sk_buff *skb, struct net_device *sb_dev); int __dev_direct_xmit(struct sk_buff *skb, u16 queue_id); +int xsk_direct_xmit_batch(struct sk_buff **skb, struct net_device *dev, + struct netdev_queue *txq, u32 max_batch, u32 *cur); static inline int dev_queue_xmit(struct sk_buff *skb) { diff --git a/net/core/dev.c b/net/core/dev.c index 68dc47d7e700..7a512bd38806 100644 --- a/net/core/dev.c +++ b/net/core/dev.c @@ -4742,6 +4742,24 @@ int __dev_queue_xmit(struct sk_buff *skb, struct net_device *sb_dev) } EXPORT_SYMBOL(__dev_queue_xmit); +int xsk_direct_xmit_batch(struct sk_buff **skb, struct net_device *dev, + struct netdev_queue *txq, u32 max_batch, u32 *cur) +{ + int ret = NETDEV_TX_BUSY; + + local_bh_disable(); + HARD_TX_LOCK(dev, txq, smp_processor_id()); + for (; *cur < max_batch; (*cur)++) { + ret = netdev_start_xmit(skb[*cur], dev, txq, false); + if (ret != NETDEV_TX_OK) + break; + } + HARD_TX_UNLOCK(dev, txq); + local_bh_enable(); + + return ret; +} + int __dev_direct_xmit(struct sk_buff *skb, u16 queue_id) { struct net_device *dev = skb->dev; diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c index 7a149f4ac273..92ad82472776 100644 --- a/net/xdp/xsk.c +++ b/net/xdp/xsk.c @@ -780,9 +780,102 @@ static struct sk_buff *xsk_build_skb(struct xdp_sock *xs, return ERR_PTR(err); } -static int __xsk_generic_xmit(struct sock *sk) +static int __xsk_generic_xmit_batch(struct xdp_sock *xs) +{ + u32 max_batch = READ_ONCE(xs->generic_xmit_batch); + struct sk_buff **skb = xs->skb_batch; + struct net_device *dev = xs->dev; + struct netdev_queue *txq; + bool sent_frame = false; + struct xdp_desc desc; + u32 i = 0, j = 0; + u32 max_budget; + int err = 0; + + mutex_lock(&xs->mutex); + + /* Since we dropped the RCU read lock, the socket state might have changed. */ + if (unlikely(!xsk_is_bound(xs))) { + err = -ENXIO; + goto out; + } + + if (xs->queue_id >= dev->real_num_tx_queues) + goto out; + + if (unlikely(!netif_running(dev) || + !netif_carrier_ok(dev))) + goto out; + + max_budget = READ_ONCE(xs->max_tx_budget); + txq = netdev_get_tx_queue(dev, xs->queue_id); + do { + for (; i < max_batch && xskq_cons_peek_desc(xs->tx, &desc, xs->pool); i++) { + if (max_budget-- == 0) { + err = -EAGAIN; + break; + } + /* This is the backpressure mechanism for the Tx path. + * Reserve space in the completion queue and only proceed + * if there is space in it. This avoids having to implement + * any buffering in the Tx path. + */ + err = xsk_cq_reserve_addr_locked(xs->pool, desc.addr); + if (err) { + err = -EAGAIN; + break; + } + + skb[i] = xsk_build_skb(xs, &desc); + if (IS_ERR(skb[i])) { + err = PTR_ERR(skb[i]); + break; + } + + xskq_cons_release(xs->tx); + + if (xp_mb_desc(&desc)) + xs->skb = skb[i]; + } + + if (i) { + err = xsk_direct_xmit_batch(skb, dev, txq, i, &j); + if (err == NETDEV_TX_BUSY) { + err = -EAGAIN; + } else if (err == NET_XMIT_DROP) { + j++; + err = -EBUSY; + } + + sent_frame = true; + xs->skb = NULL; + } + + if (err) + goto out; + i = j = 0; + } while (xskq_cons_peek_desc(xs->tx, &desc, xs->pool)); + + if (xskq_has_descs(xs->tx)) { + if (xs->skb) + xsk_drop_skb(xs->skb); + xskq_cons_release(xs->tx); + } + +out: + for (; j < i; j++) { + xskq_cons_cancel_n(xs->tx, xsk_get_num_desc(skb[j])); + xsk_consume_skb(skb[j]); + } + if (sent_frame) + __xsk_tx_release(xs); + + mutex_unlock(&xs->mutex); + return err; +} + +static int __xsk_generic_xmit(struct xdp_sock *xs) { - struct xdp_sock *xs = xdp_sk(sk); bool sent_frame = false; struct xdp_desc desc; struct sk_buff *skb; @@ -871,11 +964,15 @@ static int __xsk_generic_xmit(struct sock *sk) static int xsk_generic_xmit(struct sock *sk) { + struct xdp_sock *xs = xdp_sk(sk); int ret; /* Drop the RCU lock since the SKB path might sleep. */ rcu_read_unlock(); - ret = __xsk_generic_xmit(sk); + if (READ_ONCE(xs->generic_xmit_batch)) + ret = __xsk_generic_xmit_batch(xs); + else + ret = __xsk_generic_xmit(xs); /* Reaquire RCU lock before going into common code. */ rcu_read_lock(); -- 2.41.3