The wrappers tun_ring_consume_batched/tap_ring_consume_batched mirror
the existing tun_ring_consume/tap_ring_consume wrappers. They consume a
batch of entries from the ptr_ring and then wake the netdev queue
whenever entries are invalidated, i.e. made available to the producer
again.

To avoid waking the netdev queue while the ptr_ring is still full, the
netdev queue is checked for being stopped before entries are
invalidated. This way the netdev queue can be woken safely after the
invalidation. The READ_ONCE() in __ptr_ring_peek(), paired with the
smp_wmb() in __ptr_ring_produce() called from tun_net_xmit(), guarantees
that the information about the netdev queue being stopped is visible
after __ptr_ring_peek() has been called.

Co-developed-by: Tim Gebauer
Signed-off-by: Tim Gebauer
Signed-off-by: Simon Schippers
---
Note: a consumer-side usage sketch is appended after the patch, for
illustration only.

 drivers/net/tap.c      | 52 ++++++++++++++++++++++++++++++++++++++++
 drivers/net/tun.c      | 54 ++++++++++++++++++++++++++++++++++++++++++
 include/linux/if_tap.h |  6 +++++
 include/linux/if_tun.h |  7 ++++++
 4 files changed, 119 insertions(+)

diff --git a/drivers/net/tap.c b/drivers/net/tap.c
index f8292721a9d6..651d48612329 100644
--- a/drivers/net/tap.c
+++ b/drivers/net/tap.c
@@ -1216,6 +1216,58 @@ struct socket *tap_get_socket(struct file *file)
 }
 EXPORT_SYMBOL_GPL(tap_get_socket);
 
+int tap_ring_consume_batched(struct file *file,
+			     void **array, int n)
+{
+	struct tap_queue *q = file->private_data;
+	struct netdev_queue *txq;
+	struct net_device *dev;
+	bool will_invalidate;
+	bool stopped;
+	void *ptr;
+	int i;
+
+	spin_lock(&q->ring.consumer_lock);
+	ptr = __ptr_ring_peek(&q->ring);
+
+	if (!ptr) {
+		spin_unlock(&q->ring.consumer_lock);
+		return 0;
+	}
+
+	i = 0;
+	do {
+		/* Check whether the netdev queue is stopped before the
+		 * entry is zeroed out, so that no ptr gets produced in
+		 * the meantime, which could lead to waking even though
+		 * the ptr_ring is full. barrier() enforces the ordering.
+		 */
+		will_invalidate = __ptr_ring_will_invalidate(&q->ring);
+		if (unlikely(will_invalidate)) {
+			rcu_read_lock();
+			dev = rcu_dereference(q->tap)->dev;
+			txq = netdev_get_tx_queue(dev, q->queue_index);
+			stopped = netif_tx_queue_stopped(txq);
+		}
+		barrier();
+		__ptr_ring_discard_one(&q->ring, will_invalidate);
+
+		if (unlikely(will_invalidate)) {
+			if (stopped)
+				netif_tx_wake_queue(txq);
+			rcu_read_unlock();
+		}
+
+		array[i++] = ptr;
+		if (i >= n)
+			break;
+	} while ((ptr = __ptr_ring_peek(&q->ring)));
+	spin_unlock(&q->ring.consumer_lock);
+
+	return i;
+}
+EXPORT_SYMBOL_GPL(tap_ring_consume_batched);
+
 struct ptr_ring *tap_get_ptr_ring(struct file *file)
 {
 	struct tap_queue *q;
diff --git a/drivers/net/tun.c b/drivers/net/tun.c
index 682df8157b55..7566b22780fb 100644
--- a/drivers/net/tun.c
+++ b/drivers/net/tun.c
@@ -3759,6 +3759,60 @@ struct socket *tun_get_socket(struct file *file)
 }
 EXPORT_SYMBOL_GPL(tun_get_socket);
 
+int tun_ring_consume_batched(struct file *file,
+			     void **array, int n)
+{
+	struct tun_file *tfile = file->private_data;
+	struct netdev_queue *txq;
+	struct net_device *dev;
+	bool will_invalidate;
+	bool stopped;
+	void *ptr;
+	int i;
+
+	spin_lock(&tfile->tx_ring.consumer_lock);
+	ptr = __ptr_ring_peek(&tfile->tx_ring);
+
+	if (!ptr) {
+		spin_unlock(&tfile->tx_ring.consumer_lock);
+		return 0;
+	}
+
+	i = 0;
+	do {
+		/* Check whether the netdev queue is stopped before the
+		 * entry is zeroed out, so that no ptr gets produced in
+		 * the meantime, which could lead to waking even though
+		 * the ptr_ring is full. barrier() enforces the ordering.
+		 */
+		will_invalidate =
+			__ptr_ring_will_invalidate(&tfile->tx_ring);
+		if (unlikely(will_invalidate)) {
+			rcu_read_lock();
+			dev = rcu_dereference(tfile->tun)->dev;
+			txq = netdev_get_tx_queue(dev,
+						  tfile->queue_index);
+			stopped = netif_tx_queue_stopped(txq);
+		}
+		barrier();
+		__ptr_ring_discard_one(&tfile->tx_ring, will_invalidate);
+
+		if (unlikely(will_invalidate)) {
+			if (stopped)
+				netif_tx_wake_queue(txq);
+			rcu_read_unlock();
+		}
+
+		array[i++] = ptr;
+		if (i >= n)
+			break;
+	} while ((ptr = __ptr_ring_peek(&tfile->tx_ring)));
+	spin_unlock(&tfile->tx_ring.consumer_lock);
+
+	return i;
+}
+EXPORT_SYMBOL_GPL(tun_ring_consume_batched);
+
 struct ptr_ring *tun_get_tx_ring(struct file *file)
 {
 	struct tun_file *tfile;
diff --git a/include/linux/if_tap.h b/include/linux/if_tap.h
index 553552fa635c..2e5542d6aef4 100644
--- a/include/linux/if_tap.h
+++ b/include/linux/if_tap.h
@@ -11,6 +11,7 @@ struct socket;
 #if IS_ENABLED(CONFIG_TAP)
 struct socket *tap_get_socket(struct file *);
 struct ptr_ring *tap_get_ptr_ring(struct file *file);
+int tap_ring_consume_batched(struct file *file, void **array, int n);
 #else
 #include <linux/err.h>
 #include <linux/errno.h>
@@ -22,6 +23,11 @@ static inline struct ptr_ring *tap_get_ptr_ring(struct file *f)
 {
 	return ERR_PTR(-EINVAL);
 }
+static inline int tap_ring_consume_batched(struct file *f,
+					   void **array, int n)
+{
+	return 0;
+}
 #endif /* CONFIG_TAP */
 
 /*
diff --git a/include/linux/if_tun.h b/include/linux/if_tun.h
index 80166eb62f41..5b41525ac007 100644
--- a/include/linux/if_tun.h
+++ b/include/linux/if_tun.h
@@ -22,6 +22,7 @@ struct tun_msg_ctl {
 #if defined(CONFIG_TUN) || defined(CONFIG_TUN_MODULE)
 struct socket *tun_get_socket(struct file *);
 struct ptr_ring *tun_get_tx_ring(struct file *file);
+int tun_ring_consume_batched(struct file *file, void **array, int n);
 
 static inline bool tun_is_xdp_frame(void *ptr)
 {
@@ -55,6 +56,12 @@ static inline struct ptr_ring *tun_get_tx_ring(struct file *f)
 	return ERR_PTR(-EINVAL);
 }
 
+static inline int tun_ring_consume_batched(struct file *file,
+					   void **array, int n)
+{
+	return 0;
+}
+
 static inline bool tun_is_xdp_frame(void *ptr)
 {
 	return false;
-- 
2.43.0
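
A minimal consumer-side sketch, for illustration only and not part of
the patch: it assumes a backend that already holds a struct file
reference to an attached tun or tap queue and that previously called
ptr_ring_consume_batched() directly on the ring obtained via
tun_get_tx_ring()/tap_get_ptr_ring(). The names backend_fill_batch()
and BACKEND_BATCH are made up for this example.

#include <linux/err.h>
#include <linux/fs.h>
#include <linux/if_tun.h>
#include <linux/if_tap.h>

#define BACKEND_BATCH 64

/* Fill @batch with up to BACKEND_BATCH pointers taken from the tun/tap
 * TX ring behind @sock_file. The wrappers consume under the ring's
 * consumer_lock and wake the device TX queue once consumed slots are
 * invalidated, so the backend does not have to care about that.
 */
static int backend_fill_batch(struct file *sock_file, void **batch)
{
	/* tun_get_tx_ring()/tap_get_ptr_ring() return ERR_PTR() for a
	 * file they do not recognize, so they can be used to pick the
	 * matching wrapper. A real backend would likely do this once
	 * at setup time and cache the result instead of probing per
	 * batch.
	 */
	if (!IS_ERR(tun_get_tx_ring(sock_file)))
		return tun_ring_consume_batched(sock_file, batch,
						BACKEND_BATCH);

	return tap_ring_consume_batched(sock_file, batch, BACKEND_BATCH);
}

The returned count mirrors ptr_ring_consume_batched(): zero means the
ring is currently empty, a positive value is the number of pointers
written to @batch.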