From: Jason Xing No functional changes here. Add a new parameter as a prep to help completion queue in copy mode convert into atomic type in the rest of this series. The patch also keeps the unified interface. Signed-off-by: Jason Xing --- net/xdp/xsk.c | 8 ++++---- net/xdp/xsk_queue.h | 31 +++++++++++++++++++------------ 2 files changed, 23 insertions(+), 16 deletions(-) diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c index bcfd400e9cf8..4e95b894f218 100644 --- a/net/xdp/xsk.c +++ b/net/xdp/xsk.c @@ -276,7 +276,7 @@ static int __xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp, u32 len) xs->rx_dropped++; return -ENOMEM; } - if (xskq_prod_nb_free(xs->rx, num_desc) < num_desc) { + if (xskq_prod_nb_free(xs->rx, num_desc, false) < num_desc) { xs->rx_queue_full++; return -ENOBUFS; } @@ -519,7 +519,7 @@ u32 xsk_tx_peek_release_desc_batch(struct xsk_buff_pool *pool, u32 nb_pkts) * packets. This avoids having to implement any buffering in * the Tx path. */ - nb_pkts = xskq_prod_nb_free(pool->cq, nb_pkts); + nb_pkts = xskq_prod_nb_free(pool->cq, nb_pkts, false); if (!nb_pkts) goto out; @@ -551,7 +551,7 @@ static int xsk_cq_reserve_locked(struct xsk_buff_pool *pool) int ret; spin_lock(&pool->cq_cached_prod_lock); - ret = xskq_prod_reserve(pool->cq); + ret = xskq_prod_reserve(pool->cq, false); spin_unlock(&pool->cq_cached_prod_lock); return ret; @@ -588,7 +588,7 @@ static void xsk_cq_submit_addr_locked(struct xsk_buff_pool *pool, static void xsk_cq_cancel_locked(struct xsk_buff_pool *pool, u32 n) { spin_lock(&pool->cq_cached_prod_lock); - xskq_prod_cancel_n(pool->cq, n); + xskq_prod_cancel_n(pool->cq, n, false); spin_unlock(&pool->cq_cached_prod_lock); } diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h index 44cc01555c0b..7b4d9b954584 100644 --- a/net/xdp/xsk_queue.h +++ b/net/xdp/xsk_queue.h @@ -378,37 +378,44 @@ static inline u32 xskq_get_prod(struct xsk_queue *q) return READ_ONCE(q->ring->producer); } -static inline u32 xskq_prod_nb_free(struct xsk_queue *q, u32 max) +static inline u32 xskq_prod_nb_free(struct xsk_queue *q, u32 max, bool atomic) { - u32 free_entries = q->nentries - (q->cached_prod - q->cached_cons); + u32 cached_prod = atomic ? atomic_read(&q->cached_prod_atomic) : q->cached_prod; + u32 free_entries = q->nentries - (cached_prod - q->cached_cons); if (free_entries >= max) return max; /* Refresh the local tail pointer */ q->cached_cons = READ_ONCE(q->ring->consumer); - free_entries = q->nentries - (q->cached_prod - q->cached_cons); + free_entries = q->nentries - (cached_prod - q->cached_cons); return free_entries >= max ? max : free_entries; } -static inline bool xskq_prod_is_full(struct xsk_queue *q) +static inline bool xskq_prod_is_full(struct xsk_queue *q, bool atomic) { - return xskq_prod_nb_free(q, 1) ? false : true; + return xskq_prod_nb_free(q, 1, atomic) ? false : true; } -static inline void xskq_prod_cancel_n(struct xsk_queue *q, u32 cnt) +static inline void xskq_prod_cancel_n(struct xsk_queue *q, u32 cnt, bool atomic) { - q->cached_prod -= cnt; + if (atomic) + atomic_sub(cnt, &q->cached_prod_atomic); + else + q->cached_prod -= cnt; } -static inline int xskq_prod_reserve(struct xsk_queue *q) +static inline int xskq_prod_reserve(struct xsk_queue *q, bool atomic) { - if (xskq_prod_is_full(q)) + if (xskq_prod_is_full(q, atomic)) return -ENOSPC; /* A, matches D */ - q->cached_prod++; + if (atomic) + atomic_inc(&q->cached_prod_atomic); + else + q->cached_prod++; return 0; } @@ -416,7 +423,7 @@ static inline int xskq_prod_reserve_addr(struct xsk_queue *q, u64 addr) { struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring; - if (xskq_prod_is_full(q)) + if (xskq_prod_is_full(q, false)) return -ENOSPC; /* A, matches D */ @@ -450,7 +457,7 @@ static inline int xskq_prod_reserve_desc(struct xsk_queue *q, struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring; u32 idx; - if (xskq_prod_is_full(q)) + if (xskq_prod_is_full(q, false)) return -ENOBUFS; /* A, matches D */ -- 2.41.3