From: Jason Xing This function __xsk_generic_xmit_batch() is the core function in batches xmit, implement a batch version of __xsk_generic_xmit(). The whole logic is divided into sections: 1. check if we have enough available slots in tx ring and completion ring. 2. read descriptors from tx ring into pool->tx_descs in batches 3. reserve enough slots in completion ring to avoid backpressure 4. allocate and build skbs in batches 5. send all the possible packets in batches at one time Signed-off-by: Jason Xing --- net/xdp/xsk.c | 116 ++++++++++++++++++++++++++++++++++++++++++++ net/xdp/xsk_queue.h | 8 +++ 2 files changed, 124 insertions(+) diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c index c26e26cb4dda..e1ad2ac2b39a 100644 --- a/net/xdp/xsk.c +++ b/net/xdp/xsk.c @@ -920,6 +920,122 @@ struct sk_buff *xsk_build_skb(struct xdp_sock *xs, return ERR_PTR(err); } +static int __xsk_generic_xmit_batch(struct xdp_sock *xs) +{ + struct xsk_buff_pool *pool = xs->pool; + struct xsk_batch *batch = &xs->batch; + struct xdp_desc *descs = batch->desc_cache; + struct net_device *dev = xs->dev; + u32 max_batch, max_budget; + bool sent_frame = false; + struct sk_buff *skb; + u32 cons_descs; + int err = 0; + u32 i = 0; + + mutex_lock(&xs->mutex); + + /* Since we dropped the RCU read lock, the socket state might have changed. */ + if (unlikely(!xsk_is_bound(xs))) { + err = -ENXIO; + goto out; + } + + if (xs->queue_id >= dev->real_num_tx_queues) { + err = -ENXIO; + goto out; + } + + if (unlikely(!netif_running(dev) || !netif_carrier_ok(dev))) { + err = -ENETDOWN; + goto out; + } + + max_budget = READ_ONCE(xs->max_tx_budget); + max_batch = batch->generic_xmit_batch; + + for (i = 0; i < max_budget; i += cons_descs) { + u32 nb_pkts = 0; + u32 nb_descs; + + nb_descs = min(max_batch, max_budget - i); + nb_descs = xskq_cons_nb_entries(xs->tx, nb_descs); + if (!nb_descs) + goto out; + + /* This is the backpressure mechanism for the Tx path. Try to + * reserve space in the completion queue for all packets, but + * if there are fewer slots available, just process that many + * packets. This avoids having to implement any buffering in + * the Tx path. + */ + nb_descs = xsk_cq_reserve_locked(pool, nb_descs); + if (!nb_descs) { + err = -EAGAIN; + goto out; + } + + cons_descs = xskq_cons_read_desc_batch_copy(xs->tx, pool, descs, + nb_descs, &nb_pkts); + if (cons_descs < nb_descs) { + u32 delta = nb_descs - cons_descs; + + xsk_cq_cancel_locked(pool, delta); + xs->tx->queue_empty_descs += delta; + if (!cons_descs) { + err = -EAGAIN; + goto out; + } + nb_descs = cons_descs; + } + + cons_descs = xsk_alloc_batch_skb(xs, nb_pkts, nb_descs, &err); + /* Return 'nb_descs - cons_descs' number of descs to the + * pool if the batch allocation partially fails + */ + if (cons_descs < nb_descs) { + xskq_cons_cancel_n(xs->tx, nb_descs - cons_descs); + xsk_cq_cancel_locked(pool, nb_descs - cons_descs); + } + + if (!skb_queue_empty(&batch->send_queue)) { + int err_xmit; + + err_xmit = xsk_direct_xmit_batch(xs, dev); + if (err_xmit == NETDEV_TX_BUSY) + err = -EAGAIN; + else if (err_xmit == NET_XMIT_DROP) + err = -EBUSY; + + sent_frame = true; + } + + if (err) + goto out; + } + + /* Maximum budget of descriptors have been consumed */ + if (xskq_has_descs(xs->tx)) + err = -EAGAIN; + +out: + if (xs->skb) + xsk_drop_skb(xs->skb); + + /* If send_queue has more pending skbs, we must to clear + * the rest of them. + */ + while ((skb = __skb_dequeue(&batch->send_queue)) != NULL) { + xskq_cons_cancel_n(xs->tx, xsk_get_num_desc(skb)); + xsk_consume_skb(skb); + } + if (sent_frame) + __xsk_tx_release(xs); + + mutex_unlock(&xs->mutex); + return err; +} + static int __xsk_generic_xmit(struct sock *sk) { struct xdp_sock *xs = xdp_sk(sk); diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h index 34cc07d6115e..c3b97c6f2910 100644 --- a/net/xdp/xsk_queue.h +++ b/net/xdp/xsk_queue.h @@ -314,6 +314,14 @@ xskq_cons_read_desc_batch(struct xsk_queue *q, struct xsk_buff_pool *pool, NULL, pool->xdp_zc_max_segs); } +static inline u32 +xskq_cons_read_desc_batch_copy(struct xsk_queue *q, struct xsk_buff_pool *pool, + struct xdp_desc *descs, u32 max, u32 *nb_pkts) +{ + return __xskq_cons_read_desc_batch(q, pool, descs, max, + nb_pkts, MAX_SKB_FRAGS); +} + /* Functions for consumers */ static inline void __xskq_cons_release(struct xsk_queue *q) -- 2.41.3