From: Jason Xing

Add a union member for the completion queue, used only in copy mode for
now. The purpose is to replace cq_cached_prod_lock with an atomic
operation to improve performance. Note that the completion queue in
zerocopy mode doesn't need to be converted because that whole path is
already lockless.

Signed-off-by: Jason Xing
---
 net/xdp/xsk_queue.h | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h
index 1eb8d9f8b104..44cc01555c0b 100644
--- a/net/xdp/xsk_queue.h
+++ b/net/xdp/xsk_queue.h
@@ -40,7 +40,11 @@ struct xdp_umem_ring {
 struct xsk_queue {
 	u32 ring_mask;
 	u32 nentries;
-	u32 cached_prod;
+	union {
+		u32 cached_prod;
+		/* Used for cq in copy mode only */
+		atomic_t cached_prod_atomic;
+	};
 	u32 cached_cons;
 	struct xdp_ring *ring;
 	u64 invalid_descs;
--
2.41.3

From: Jason Xing

Use dedicated helpers for cached_prod on the generic (copy mode) path
instead of extending the shared functions, so that the zerocopy feature
is not affected. These helpers use atomic operations.

Signed-off-by: Jason Xing
---
 net/xdp/xsk.c       |  4 ++--
 net/xdp/xsk_queue.h | 21 ++++++++++++++++++---
 2 files changed, 20 insertions(+), 5 deletions(-)

diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c
index bcfd400e9cf8..b63409b1422e 100644
--- a/net/xdp/xsk.c
+++ b/net/xdp/xsk.c
@@ -551,7 +551,7 @@ static int xsk_cq_reserve_locked(struct xsk_buff_pool *pool)
 	int ret;
 
 	spin_lock(&pool->cq_cached_prod_lock);
-	ret = xskq_prod_reserve(pool->cq);
+	ret = xsk_cq_cached_prod_reserve(pool->cq);
 	spin_unlock(&pool->cq_cached_prod_lock);
 
 	return ret;
@@ -588,7 +588,7 @@ static void xsk_cq_submit_addr_locked(struct xsk_buff_pool *pool,
 static void xsk_cq_cancel_locked(struct xsk_buff_pool *pool, u32 n)
 {
 	spin_lock(&pool->cq_cached_prod_lock);
-	xskq_prod_cancel_n(pool->cq, n);
+	atomic_sub(n, &pool->cq->cached_prod_atomic);
 	spin_unlock(&pool->cq_cached_prod_lock);
 }
 
diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h
index 44cc01555c0b..3a023791b273 100644
--- a/net/xdp/xsk_queue.h
+++ b/net/xdp/xsk_queue.h
@@ -402,13 +402,28 @@ static inline void xskq_prod_cancel_n(struct xsk_queue *q, u32 cnt)
 	q->cached_prod -= cnt;
 }
 
-static inline int xskq_prod_reserve(struct xsk_queue *q)
+static inline bool xsk_cq_cached_prod_nb_free(struct xsk_queue *q)
 {
-	if (xskq_prod_is_full(q))
+	u32 cached_prod = atomic_read(&q->cached_prod_atomic);
+	u32 free_entries = q->nentries - (cached_prod - q->cached_cons);
+
+	if (free_entries)
+		return true;
+
+	/* Refresh the local tail pointer */
+	q->cached_cons = READ_ONCE(q->ring->consumer);
+	free_entries = q->nentries - (cached_prod - q->cached_cons);
+
+	return free_entries ? true : false;
+}
+
+static inline int xsk_cq_cached_prod_reserve(struct xsk_queue *q)
+{
+	if (!xsk_cq_cached_prod_nb_free(q))
 		return -ENOSPC;
 
 	/* A, matches D */
-	q->cached_prod++;
+	atomic_inc(&q->cached_prod_atomic);
 	return 0;
 }
--
2.41.3
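As context for the helper added above: the free-entry check is plain unsigned
ring arithmetic, so it stays correct even when the 32-bit producer index wraps
around. Below is a minimal user-space sketch of that arithmetic, not the
kernel code itself; the model_* names are hypothetical and C11 stdatomic
stands in for the kernel's atomic_t and READ_ONCE().

/*
 * Minimal user-space model (illustrative only) of the free-entry check
 * performed by xsk_cq_cached_prod_nb_free().  The model_* names are
 * hypothetical; C11 stdatomic stands in for atomic_t/READ_ONCE().
 */
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

struct model_queue {
	uint32_t nentries;		/* ring size, a power of two */
	atomic_uint cached_prod;	/* producer-side cached head */
	uint32_t cached_cons;		/* producer's stale copy of the tail */
	uint32_t ring_consumer;		/* stands in for q->ring->consumer */
};

/* Same arithmetic as the patch: (prod - cons) is computed in unsigned
 * 32-bit math, so the result is correct across index wraparound. */
static int model_cq_nb_free(struct model_queue *q)
{
	uint32_t cached_prod = atomic_load(&q->cached_prod);
	uint32_t free_entries = q->nentries - (cached_prod - q->cached_cons);

	if (free_entries)
		return 1;

	/* Refresh the local tail pointer, then re-check */
	q->cached_cons = q->ring_consumer;
	free_entries = q->nentries - (cached_prod - q->cached_cons);

	return free_entries != 0;
}

int main(void)
{
	struct model_queue q = {
		.nentries = 4,
		.cached_prod = UINT32_MAX - 1,	/* indices about to wrap */
		.cached_cons = UINT32_MAX - 1,
		.ring_consumer = UINT32_MAX - 1,
	};
	unsigned int reserved = 0;

	/* Reserve entries until the cached view says the ring is full */
	while (model_cq_nb_free(&q)) {
		atomic_fetch_add(&q.cached_prod, 1);
		reserved++;
	}

	printf("reserved %u of %u entries despite wraparound\n",
	       reserved, (unsigned int)q.nentries);
	return 0;
}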
From: Jason Xing

Remove the spin lock protection and adjust the related functions
accordingly. Now cached_prod is fully converted to an atomic, which
improves performance by around 5% across different platforms.

Signed-off-by: Jason Xing
---
 include/net/xsk_buff_pool.h |  5 -----
 net/xdp/xsk.c               | 21 ++++-----------------
 net/xdp/xsk_buff_pool.c     |  1 -
 3 files changed, 4 insertions(+), 23 deletions(-)

diff --git a/include/net/xsk_buff_pool.h b/include/net/xsk_buff_pool.h
index 92a2358c6ce3..0b1abdb99c9e 100644
--- a/include/net/xsk_buff_pool.h
+++ b/include/net/xsk_buff_pool.h
@@ -90,11 +90,6 @@ struct xsk_buff_pool {
 	 * destructor callback.
 	 */
 	spinlock_t cq_prod_lock;
-	/* Mutual exclusion of the completion ring in the SKB mode.
-	 * Protect: when sockets share a single cq when the same netdev
-	 * and queue id is shared.
-	 */
-	spinlock_t cq_cached_prod_lock;
 	struct xdp_buff_xsk *free_heads[];
 };
 
diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c
index b63409b1422e..ae8a92c168b8 100644
--- a/net/xdp/xsk.c
+++ b/net/xdp/xsk.c
@@ -546,17 +546,6 @@ static int xsk_wakeup(struct xdp_sock *xs, u8 flags)
 	return dev->netdev_ops->ndo_xsk_wakeup(dev, xs->queue_id, flags);
 }
 
-static int xsk_cq_reserve_locked(struct xsk_buff_pool *pool)
-{
-	int ret;
-
-	spin_lock(&pool->cq_cached_prod_lock);
-	ret = xsk_cq_cached_prod_reserve(pool->cq);
-	spin_unlock(&pool->cq_cached_prod_lock);
-
-	return ret;
-}
-
 static void xsk_cq_submit_addr_locked(struct xsk_buff_pool *pool,
 				      struct sk_buff *skb)
 {
@@ -585,11 +574,9 @@ static void xsk_cq_submit_addr_locked(struct xsk_buff_pool *pool,
 	spin_unlock_irqrestore(&pool->cq_prod_lock, flags);
 }
 
-static void xsk_cq_cancel_locked(struct xsk_buff_pool *pool, u32 n)
+static void xsk_cq_cached_prod_cancel(struct xsk_buff_pool *pool, u32 n)
 {
-	spin_lock(&pool->cq_cached_prod_lock);
 	atomic_sub(n, &pool->cq->cached_prod_atomic);
-	spin_unlock(&pool->cq_cached_prod_lock);
 }
 
 static void xsk_inc_num_desc(struct sk_buff *skb)
@@ -643,7 +630,7 @@ static void xsk_consume_skb(struct sk_buff *skb)
 	}
 
 	skb->destructor = sock_wfree;
-	xsk_cq_cancel_locked(xs->pool, num_descs);
+	xsk_cq_cached_prod_cancel(xs->pool, num_descs);
 	/* Free skb without triggering the perf drop trace */
 	consume_skb(skb);
 	xs->skb = NULL;
@@ -860,7 +847,7 @@ static struct sk_buff *xsk_build_skb(struct xdp_sock *xs,
 		xskq_cons_release(xs->tx);
 	} else {
 		/* Let application retry */
-		xsk_cq_cancel_locked(xs->pool, 1);
+		xsk_cq_cached_prod_cancel(xs->pool, 1);
 	}
 
 	return ERR_PTR(err);
@@ -898,7 +885,7 @@ static int __xsk_generic_xmit(struct sock *sk)
 		 * if there is space in it. This avoids having to implement
 		 * any buffering in the Tx path.
 		 */
-		err = xsk_cq_reserve_locked(xs->pool);
+		err = xsk_cq_cached_prod_reserve(xs->pool->cq);
 		if (err) {
 			err = -EAGAIN;
 			goto out;
diff --git a/net/xdp/xsk_buff_pool.c b/net/xdp/xsk_buff_pool.c
index 51526034c42a..9539f121b290 100644
--- a/net/xdp/xsk_buff_pool.c
+++ b/net/xdp/xsk_buff_pool.c
@@ -91,7 +91,6 @@ struct xsk_buff_pool *xp_create_and_assign_umem(struct xdp_sock *xs,
 	INIT_LIST_HEAD(&pool->xsk_tx_list);
 	spin_lock_init(&pool->xsk_tx_list_lock);
 	spin_lock_init(&pool->cq_prod_lock);
-	spin_lock_init(&pool->cq_cached_prod_lock);
 	refcount_set(&pool->users, 1);
 
 	pool->fq = xs->fq_tmp;
--
2.41.3
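For reference, once the lock is gone the generic xmit path's interaction with
the cached producer reduces to reserve (atomic increment) and cancel (atomic
subtract) when the skb cannot be built or sent. A rough user-space model of
that reserve/cancel flow is sketched below; the model_* helpers are
illustrative stand-ins using C11 stdatomic, not the functions added by this
series.

/*
 * Illustrative-only model of the reserve/cancel lifecycle that
 * __xsk_generic_xmit() and xsk_build_skb() follow after this series:
 * reserve a cq slot by bumping the atomic cached producer, and give it
 * back if building the skb fails.  The model_* helpers are hypothetical
 * stand-ins, not kernel API.
 */
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

struct model_cq {
	uint32_t nentries;
	atomic_uint cached_prod;
	uint32_t cached_cons;	/* producer-local copy of the consumer index */
};

static bool model_cq_has_space(struct model_cq *cq)
{
	uint32_t prod = atomic_load(&cq->cached_prod);

	return cq->nentries - (prod - cq->cached_cons) != 0;
}

/* Loosely mirrors xsk_cq_cached_prod_reserve(): fail when full, else bump */
static int model_cq_reserve(struct model_cq *cq)
{
	if (!model_cq_has_space(cq))
		return -1;		/* stands in for -ENOSPC */
	atomic_fetch_add(&cq->cached_prod, 1);
	return 0;
}

/* Loosely mirrors xsk_cq_cached_prod_cancel(): hand back n unused slots */
static void model_cq_cancel(struct model_cq *cq, uint32_t n)
{
	atomic_fetch_sub(&cq->cached_prod, n);
}

int main(void)
{
	struct model_cq cq = { .nentries = 2 };

	/* Typical flow: reserve first, cancel if building the skb fails */
	if (model_cq_reserve(&cq) == 0) {
		bool build_failed = true;	/* pretend skb allocation failed */

		if (build_failed)
			model_cq_cancel(&cq, 1);
	}

	printf("cached_prod back to %u\n",
	       (unsigned int)atomic_load(&cq.cached_prod));
	return 0;
}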