From: Jason Xing Add a union member for the completion queue, only in copy mode for now. The purpose is to replace the cq_cached_prod_lock with an atomic operation to improve performance. Note that the completion queue in zerocopy mode doesn't need to be converted because the whole process is lockless. Signed-off-by: Jason Xing --- net/xdp/xsk_queue.h | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h index 1eb8d9f8b104..44cc01555c0b 100644 --- a/net/xdp/xsk_queue.h +++ b/net/xdp/xsk_queue.h @@ -40,7 +40,11 @@ struct xdp_umem_ring { struct xsk_queue { u32 ring_mask; u32 nentries; - u32 cached_prod; + union { + u32 cached_prod; + /* Used for cq in copy mode only */ + atomic_t cached_prod_atomic; + }; u32 cached_cons; struct xdp_ring *ring; u64 invalid_descs; -- 2.41.3 From: Jason Xing No functional changes here. Add a new parameter as a preparation to help the completion queue in copy mode convert to an atomic type in the rest of this series. The patch also keeps the unified interface. Signed-off-by: Jason Xing --- net/xdp/xsk.c | 8 ++++---- net/xdp/xsk_queue.h | 31 +++++++++++++++++++------------ 2 files changed, 23 insertions(+), 16 deletions(-) diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c index bcfd400e9cf8..4e95b894f218 100644 --- a/net/xdp/xsk.c +++ b/net/xdp/xsk.c @@ -276,7 +276,7 @@ static int __xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp, u32 len) xs->rx_dropped++; return -ENOMEM; } - if (xskq_prod_nb_free(xs->rx, num_desc) < num_desc) { + if (xskq_prod_nb_free(xs->rx, num_desc, false) < num_desc) { xs->rx_queue_full++; return -ENOBUFS; } @@ -519,7 +519,7 @@ u32 xsk_tx_peek_release_desc_batch(struct xsk_buff_pool *pool, u32 nb_pkts) * packets. This avoids having to implement any buffering in * the Tx path. 
*/ - nb_pkts = xskq_prod_nb_free(pool->cq, nb_pkts); + nb_pkts = xskq_prod_nb_free(pool->cq, nb_pkts, false); if (!nb_pkts) goto out; @@ -551,7 +551,7 @@ static int xsk_cq_reserve_locked(struct xsk_buff_pool *pool) int ret; spin_lock(&pool->cq_cached_prod_lock); - ret = xskq_prod_reserve(pool->cq); + ret = xskq_prod_reserve(pool->cq, false); spin_unlock(&pool->cq_cached_prod_lock); return ret; @@ -588,7 +588,7 @@ static void xsk_cq_submit_addr_locked(struct xsk_buff_pool *pool, static void xsk_cq_cancel_locked(struct xsk_buff_pool *pool, u32 n) { spin_lock(&pool->cq_cached_prod_lock); - xskq_prod_cancel_n(pool->cq, n); + xskq_prod_cancel_n(pool->cq, n, false); spin_unlock(&pool->cq_cached_prod_lock); } diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h index 44cc01555c0b..7b4d9b954584 100644 --- a/net/xdp/xsk_queue.h +++ b/net/xdp/xsk_queue.h @@ -378,37 +378,44 @@ static inline u32 xskq_get_prod(struct xsk_queue *q) return READ_ONCE(q->ring->producer); } -static inline u32 xskq_prod_nb_free(struct xsk_queue *q, u32 max) +static inline u32 xskq_prod_nb_free(struct xsk_queue *q, u32 max, bool atomic) { - u32 free_entries = q->nentries - (q->cached_prod - q->cached_cons); + u32 cached_prod = atomic ? atomic_read(&q->cached_prod_atomic) : q->cached_prod; + u32 free_entries = q->nentries - (cached_prod - q->cached_cons); if (free_entries >= max) return max; /* Refresh the local tail pointer */ q->cached_cons = READ_ONCE(q->ring->consumer); - free_entries = q->nentries - (q->cached_prod - q->cached_cons); + free_entries = q->nentries - (cached_prod - q->cached_cons); return free_entries >= max ? max : free_entries; } -static inline bool xskq_prod_is_full(struct xsk_queue *q) +static inline bool xskq_prod_is_full(struct xsk_queue *q, bool atomic) { - return xskq_prod_nb_free(q, 1) ? false : true; + return xskq_prod_nb_free(q, 1, atomic) ? 
false : true; } -static inline void xskq_prod_cancel_n(struct xsk_queue *q, u32 cnt) +static inline void xskq_prod_cancel_n(struct xsk_queue *q, u32 cnt, bool atomic) { - q->cached_prod -= cnt; + if (atomic) + atomic_sub(cnt, &q->cached_prod_atomic); + else + q->cached_prod -= cnt; } -static inline int xskq_prod_reserve(struct xsk_queue *q) +static inline int xskq_prod_reserve(struct xsk_queue *q, bool atomic) { - if (xskq_prod_is_full(q)) + if (xskq_prod_is_full(q, atomic)) return -ENOSPC; /* A, matches D */ - q->cached_prod++; + if (atomic) + atomic_inc(&q->cached_prod_atomic); + else + q->cached_prod++; return 0; } @@ -416,7 +423,7 @@ static inline int xskq_prod_reserve_addr(struct xsk_queue *q, u64 addr) { struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring; - if (xskq_prod_is_full(q)) + if (xskq_prod_is_full(q, false)) return -ENOSPC; /* A, matches D */ @@ -450,7 +457,7 @@ static inline int xskq_prod_reserve_desc(struct xsk_queue *q, struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring; u32 idx; - if (xskq_prod_is_full(q)) + if (xskq_prod_is_full(q, false)) return -ENOBUFS; /* A, matches D */ -- 2.41.3 From: Jason Xing Now it's time to convert the cq in the generic path to atomic operations to achieve higher performance. I measured an improvement of around 5% across different platforms. Signed-off-by: Jason Xing --- include/net/xsk_buff_pool.h | 5 ----- net/xdp/xsk.c | 12 ++---------- net/xdp/xsk_buff_pool.c | 1 - 3 files changed, 2 insertions(+), 16 deletions(-) diff --git a/include/net/xsk_buff_pool.h b/include/net/xsk_buff_pool.h index 92a2358c6ce3..0b1abdb99c9e 100644 --- a/include/net/xsk_buff_pool.h +++ b/include/net/xsk_buff_pool.h @@ -90,11 +90,6 @@ struct xsk_buff_pool { * destructor callback. */ spinlock_t cq_prod_lock; - /* Mutual exclusion of the completion ring in the SKB mode. - * Protect: when sockets share a single cq when the same netdev - * and queue id is shared. 
- */ - spinlock_t cq_cached_prod_lock; struct xdp_buff_xsk *free_heads[]; }; diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c index 4e95b894f218..6b99a7eeb952 100644 --- a/net/xdp/xsk.c +++ b/net/xdp/xsk.c @@ -548,13 +548,7 @@ static int xsk_wakeup(struct xdp_sock *xs, u8 flags) static int xsk_cq_reserve_locked(struct xsk_buff_pool *pool) { - int ret; - - spin_lock(&pool->cq_cached_prod_lock); - ret = xskq_prod_reserve(pool->cq, false); - spin_unlock(&pool->cq_cached_prod_lock); - - return ret; + return xskq_prod_reserve(pool->cq, true); } static void xsk_cq_submit_addr_locked(struct xsk_buff_pool *pool, @@ -587,9 +581,7 @@ static void xsk_cq_submit_addr_locked(struct xsk_buff_pool *pool, static void xsk_cq_cancel_locked(struct xsk_buff_pool *pool, u32 n) { - spin_lock(&pool->cq_cached_prod_lock); - xskq_prod_cancel_n(pool->cq, n, false); - spin_unlock(&pool->cq_cached_prod_lock); + xskq_prod_cancel_n(pool->cq, n, true); } static void xsk_inc_num_desc(struct sk_buff *skb) diff --git a/net/xdp/xsk_buff_pool.c b/net/xdp/xsk_buff_pool.c index 51526034c42a..9539f121b290 100644 --- a/net/xdp/xsk_buff_pool.c +++ b/net/xdp/xsk_buff_pool.c @@ -91,7 +91,6 @@ struct xsk_buff_pool *xp_create_and_assign_umem(struct xdp_sock *xs, INIT_LIST_HEAD(&pool->xsk_tx_list); spin_lock_init(&pool->xsk_tx_list_lock); spin_lock_init(&pool->cq_prod_lock); - spin_lock_init(&pool->cq_cached_prod_lock); refcount_set(&pool->users, 1); pool->fq = xs->fq_tmp; -- 2.41.3