Introduce tun_ring_consume(), which wraps ptr_ring_consume() and calls
__tun_wake_queue(). The latter wakes the stopped netdev subqueue once half
of the ring capacity has been consumed, tracked via the new cons_cnt field
in tun_file. As a safety net, the queue is also woken on the last consumed
entry if it leaves the ring empty.

The point is to allow the queue to be stopped when it gets full, which is
required for traffic shaping and is implemented by the following patch
"avoid ptr_ring tail-drop when a qdisc is present".

Some implementation details:
- tun_ring_recv() replaces ptr_ring_consume() with tun_ring_consume().
- tun_queue_purge() also replaces ptr_ring_consume() with
  tun_ring_consume(), so the queue is properly woken on purge.
- __tun_detach() takes the tx_ring.consumer_lock to avoid racing with the
  consumer on queue_index.
- cons_cnt is reset in tun_attach() so the half-ring wake threshold is
  valid for the new ring size after ptr_ring_resize().
- The pairing of the smp_mb() in __tun_wake_queue() is explained in the
  upcoming patch.
- tun_queue_resize() wakes all queues after resizing, holding the
  respective tx_ring.consumer_lock, and resets cons_cnt to avoid a
  possibly stale stopped queue.

Without the corresponding queue stopping, this patch alone causes no
regression for a tap setup sending to a qemu VM: 1.132 Mpps to 1.134 Mpps.

Details: AMD Ryzen 5 5600X at 4.3 GHz, 3200 MHz RAM, isolated QEMU
threads, pktgen sender; Avg over 50 runs @ 100,000,000 packets;
SRSO and spectre v2 mitigations disabled.
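For reference, the wake condition introduced below in __tun_wake_queue()
boils down to the following sketch (condensed from the hunk below, not the
literal code; txq and consumed are as in the function):

	/* With tx_ring.consumer_lock held, after 'consumed' entries were
	 * taken out of the ring:
	 */
	if (netif_tx_queue_stopped(txq)) {
		tfile->cons_cnt += consumed;
		/* Wake after half the ring was freed, or once it is empty. */
		if (tfile->cons_cnt >= tfile->tx_ring.size / 2 ||
		    __ptr_ring_empty(&tfile->tx_ring)) {
			netif_tx_wake_queue(txq);
			tfile->cons_cnt = 0;
		}
	}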
Co-developed-by: Tim Gebauer
Signed-off-by: Tim Gebauer
Signed-off-by: Simon Schippers
---
 drivers/net/tun.c | 73 +++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 64 insertions(+), 9 deletions(-)

diff --git a/drivers/net/tun.c b/drivers/net/tun.c
index b183189f1853..b24cc899a890 100644
--- a/drivers/net/tun.c
+++ b/drivers/net/tun.c
@@ -145,6 +145,8 @@ struct tun_file {
 	struct list_head next;
 	struct tun_struct *detached;
 	struct ptr_ring tx_ring;
+	/* Protected by tx_ring.consumer_lock */
+	int cons_cnt;
 	struct xdp_rxq_info xdp_rxq;
 };
 
@@ -557,11 +559,43 @@ void tun_ptr_free(void *ptr)
 }
 EXPORT_SYMBOL_GPL(tun_ptr_free);
 
-static void tun_queue_purge(struct tun_file *tfile)
+/* Callers must hold ring.consumer_lock */
+static void __tun_wake_queue(struct tun_struct *tun,
+			     struct tun_file *tfile, int consumed)
+{
+	struct netdev_queue *txq = netdev_get_tx_queue(tun->dev,
+						       tfile->queue_index);
+
+	/* Paired with smp_mb__after_atomic() in tun_net_xmit() */
+	smp_mb();
+	if (netif_tx_queue_stopped(txq)) {
+		tfile->cons_cnt += consumed;
+		if (tfile->cons_cnt >= tfile->tx_ring.size / 2 ||
+		    __ptr_ring_empty(&tfile->tx_ring)) {
+			netif_tx_wake_queue(txq);
+			tfile->cons_cnt = 0;
+		}
+	}
+}
+
+static void *tun_ring_consume(struct tun_struct *tun, struct tun_file *tfile)
+{
+	void *ptr;
+
+	spin_lock(&tfile->tx_ring.consumer_lock);
+	ptr = __ptr_ring_consume(&tfile->tx_ring);
+	if (ptr)
+		__tun_wake_queue(tun, tfile, 1);
+
+	spin_unlock(&tfile->tx_ring.consumer_lock);
+	return ptr;
+}
+
+static void tun_queue_purge(struct tun_struct *tun, struct tun_file *tfile)
 {
 	void *ptr;
 
-	while ((ptr = ptr_ring_consume(&tfile->tx_ring)) != NULL)
+	while ((ptr = tun_ring_consume(tun, tfile)) != NULL)
 		tun_ptr_free(ptr);
 
 	skb_queue_purge(&tfile->sk.sk_write_queue);
@@ -588,8 +622,10 @@ static void __tun_detach(struct tun_file *tfile, bool clean)
 		rcu_assign_pointer(tun->tfiles[index],
 				   tun->tfiles[tun->numqueues - 1]);
 		ntfile = rtnl_dereference(tun->tfiles[index]);
+		spin_lock(&ntfile->tx_ring.consumer_lock);
 		ntfile->queue_index = index;
 		ntfile->xdp_rxq.queue_index = index;
+		spin_unlock(&ntfile->tx_ring.consumer_lock);
 		rcu_assign_pointer(tun->tfiles[tun->numqueues - 1],
 				   NULL);
 
@@ -605,7 +641,7 @@ static void __tun_detach(struct tun_file *tfile, bool clean)
 			synchronize_net();
 			tun_flow_delete_by_queue(tun, tun->numqueues + 1);
 			/* Drop read queue */
-			tun_queue_purge(tfile);
+			tun_queue_purge(tun, tfile);
 			tun_set_real_num_queues(tun);
 		} else if (tfile->detached && clean) {
 			tun = tun_enable_queue(tfile);
@@ -670,14 +706,14 @@ static void tun_detach_all(struct net_device *dev)
 		tfile = rtnl_dereference(tun->tfiles[i]);
 		tun_napi_del(tfile);
 		/* Drop read queue */
-		tun_queue_purge(tfile);
+		tun_queue_purge(tun, tfile);
 		xdp_rxq_info_unreg(&tfile->xdp_rxq);
 		sock_put(&tfile->sk);
 	}
 	list_for_each_entry_safe(tfile, tmp, &tun->disabled, next) {
 		tun_napi_del(tfile);
 		tun_enable_queue(tfile);
-		tun_queue_purge(tfile);
+		tun_queue_purge(tun, tfile);
 		xdp_rxq_info_unreg(&tfile->xdp_rxq);
 		sock_put(&tfile->sk);
 	}
@@ -687,6 +723,13 @@ static void tun_detach_all(struct net_device *dev)
 		module_put(THIS_MODULE);
 }
 
+static void tun_reset_cons_cnt(struct tun_file *tfile)
+{
+	spin_lock(&tfile->tx_ring.consumer_lock);
+	tfile->cons_cnt = 0;
+	spin_unlock(&tfile->tx_ring.consumer_lock);
+}
+
 static int tun_attach(struct tun_struct *tun, struct file *file,
 		      bool skip_filter, bool napi, bool napi_frags,
 		      bool publish_tun)
@@ -730,6 +773,7 @@ static int tun_attach(struct tun_struct *tun, struct file *file,
 		goto out;
 	}
 
+	tun_reset_cons_cnt(tfile);
 	tfile->queue_index = tun->numqueues;
 	tfile->socket.sk->sk_shutdown &= ~RCV_SHUTDOWN;
 
@@ -2115,13 +2159,14 @@ static ssize_t tun_put_user(struct tun_struct *tun,
 	return total;
 }
 
-static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
+static void *tun_ring_recv(struct tun_struct *tun, struct tun_file *tfile,
+			   int noblock, int *err)
 {
 	DECLARE_WAITQUEUE(wait, current);
 	void *ptr = NULL;
 	int error = 0;
 
-	ptr = ptr_ring_consume(&tfile->tx_ring);
+	ptr = tun_ring_consume(tun, tfile);
 	if (ptr)
 		goto out;
 	if (noblock) {
@@ -2133,7 +2178,7 @@ static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
 
 	while (1) {
 		set_current_state(TASK_INTERRUPTIBLE);
-		ptr = ptr_ring_consume(&tfile->tx_ring);
+		ptr = tun_ring_consume(tun, tfile);
 		if (ptr)
 			break;
 		if (signal_pending(current)) {
@@ -2170,7 +2215,7 @@ static ssize_t tun_do_read(struct tun_struct *tun, struct tun_file *tfile,
 
 	if (!ptr) {
 		/* Read frames from ring */
-		ptr = tun_ring_recv(tfile, noblock, &err);
+		ptr = tun_ring_recv(tun, tfile, noblock, &err);
 		if (!ptr)
 			return err;
 	}
@@ -3622,6 +3667,16 @@ static int tun_queue_resize(struct tun_struct *tun)
 				       dev->tx_queue_len, GFP_KERNEL,
 				       tun_ptr_free);
 
+	if (!ret) {
+		for (i = 0; i < tun->numqueues; i++) {
+			tfile = rtnl_dereference(tun->tfiles[i]);
+			spin_lock(&tfile->tx_ring.consumer_lock);
+			netif_wake_subqueue(tun->dev, tfile->queue_index);
+			tfile->cons_cnt = 0;
+			spin_unlock(&tfile->tx_ring.consumer_lock);
+		}
+	}
+
 	kfree(rings);
 	return ret;
 }
--
2.43.0

Add tun_wake_queue() to tun.c and export it for use by vhost-net. The
function validates that the file belongs to a tun/tap device and that the
tfile exists, dereferences the tun_struct under RCU, and delegates to
__tun_wake_queue(). vhost_net_buf_produce() now calls tun_wake_queue()
after a successful batched consume of the ring to allow the netdev
subqueue to be woken up.
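For illustration, the consumer-side calling pattern that the vhost-net
hunk below implements is roughly the following (ring, queue, file and n
stand for nvq->rx_ring, rxq->queue, sk->sk_socket->file and rxq->tail in
the actual hunk):

	/* Batched consume under the consumer lock, then let tun/tap wake
	 * its stopped subqueue based on how many entries were freed.
	 */
	spin_lock(&ring->consumer_lock);
	n = __ptr_ring_consume_batched(ring, queue, VHOST_NET_BATCH);
	if (n)
		tun_wake_queue(file, n);
	spin_unlock(&ring->consumer_lock);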
The point is to allow the queue to be stopped when it gets full, which is
required for traffic shaping and is implemented by the following patch
"avoid ptr_ring tail-drop when a qdisc is present".

Without the corresponding queue stopping, this patch alone causes no
throughput regression for a tap+vhost-net setup sending to a qemu VM:
3.857 Mpps to 3.891 Mpps.

Details: AMD Ryzen 5 5600X at 4.3 GHz, 3200 MHz RAM, isolated QEMU
threads, XDP drop program active in VM, pktgen sender; Avg over 50 runs
@ 100,000,000 packets. SRSO and spectre v2 mitigations disabled.

Co-developed-by: Tim Gebauer
Signed-off-by: Tim Gebauer
Signed-off-by: Simon Schippers
---
 drivers/net/tun.c      | 23 +++++++++++++++++++++++
 drivers/vhost/net.c    | 21 +++++++++++++++------
 include/linux/if_tun.h |  3 +++
 3 files changed, 41 insertions(+), 6 deletions(-)

diff --git a/drivers/net/tun.c b/drivers/net/tun.c
index b24cc899a890..4ee1ed6e815a 100644
--- a/drivers/net/tun.c
+++ b/drivers/net/tun.c
@@ -3785,6 +3785,29 @@ struct ptr_ring *tun_get_tx_ring(struct file *file)
 }
 EXPORT_SYMBOL_GPL(tun_get_tx_ring);
 
+/* Callers must hold ring.consumer_lock */
+void tun_wake_queue(struct file *file, int consumed)
+{
+	struct tun_file *tfile;
+	struct tun_struct *tun;
+
+	if (file->f_op != &tun_fops)
+		return;
+
+	tfile = file->private_data;
+	if (!tfile)
+		return;
+
+	rcu_read_lock();
+
+	tun = rcu_dereference(tfile->tun);
+	if (tun)
+		__tun_wake_queue(tun, tfile, consumed);
+
+	rcu_read_unlock();
+}
+EXPORT_SYMBOL_GPL(tun_wake_queue);
+
 module_init(tun_init);
 module_exit(tun_cleanup);
 MODULE_DESCRIPTION(DRV_DESCRIPTION);
diff --git a/drivers/vhost/net.c b/drivers/vhost/net.c
index 80965181920c..ee583d6cc0fa 100644
--- a/drivers/vhost/net.c
+++ b/drivers/vhost/net.c
@@ -176,13 +176,21 @@ static void *vhost_net_buf_consume(struct vhost_net_buf *rxq)
 	return ret;
 }
 
-static int vhost_net_buf_produce(struct vhost_net_virtqueue *nvq)
+static int vhost_net_buf_produce(struct sock *sk,
+				 struct vhost_net_virtqueue *nvq)
 {
+	struct file *file = sk->sk_socket->file;
 	struct vhost_net_buf *rxq = &nvq->rxq;
 
 	rxq->head = 0;
-	rxq->tail = ptr_ring_consume_batched(nvq->rx_ring, rxq->queue,
-					     VHOST_NET_BATCH);
+	spin_lock(&nvq->rx_ring->consumer_lock);
+	rxq->tail = __ptr_ring_consume_batched(nvq->rx_ring, rxq->queue,
+					       VHOST_NET_BATCH);
+
+	if (rxq->tail)
+		tun_wake_queue(file, rxq->tail);
+
+	spin_unlock(&nvq->rx_ring->consumer_lock);
 	return rxq->tail;
 }
 
@@ -209,14 +217,15 @@ static int vhost_net_buf_peek_len(void *ptr)
 	return __skb_array_len_with_tag(ptr);
 }
 
-static int vhost_net_buf_peek(struct vhost_net_virtqueue *nvq)
+static int vhost_net_buf_peek(struct sock *sk,
+			      struct vhost_net_virtqueue *nvq)
 {
 	struct vhost_net_buf *rxq = &nvq->rxq;
 
 	if (!vhost_net_buf_is_empty(rxq))
 		goto out;
 
-	if (!vhost_net_buf_produce(nvq))
+	if (!vhost_net_buf_produce(sk, nvq))
 		return 0;
 
 out:
@@ -995,7 +1004,7 @@ static int peek_head_len(struct vhost_net_virtqueue *rvq, struct sock *sk)
 	unsigned long flags;
 
 	if (rvq->rx_ring)
-		return vhost_net_buf_peek(rvq);
+		return vhost_net_buf_peek(sk, rvq);
 
 	spin_lock_irqsave(&sk->sk_receive_queue.lock, flags);
 	head = skb_peek(&sk->sk_receive_queue);
diff --git a/include/linux/if_tun.h b/include/linux/if_tun.h
index 80166eb62f41..5f3e206c7a73 100644
--- a/include/linux/if_tun.h
+++ b/include/linux/if_tun.h
@@ -22,6 +22,7 @@ struct tun_msg_ctl {
 #if defined(CONFIG_TUN) || defined(CONFIG_TUN_MODULE)
 struct socket *tun_get_socket(struct file *);
 struct ptr_ring *tun_get_tx_ring(struct file *file);
+void tun_wake_queue(struct file *file, int consumed);
 
 static inline bool tun_is_xdp_frame(void *ptr)
 {
@@ -55,6 +56,8 @@ static inline struct ptr_ring *tun_get_tx_ring(struct file *f)
 	return ERR_PTR(-EINVAL);
 }
 
+static inline void tun_wake_queue(struct file *f, int consumed) {}
+
 static inline bool tun_is_xdp_frame(void *ptr)
 {
 	return false;
--
2.43.0

This patch moves the check for available free space for a new entry into
a separate function. Existing callers that only check for a non-zero
return value are unaffected; __ptr_ring_produce() now returns -EINVAL for
a zero-size ring and -ENOSPC when full, whereas before both cases
returned -ENOSPC.

The new helper allows callers to determine in advance whether subsequent
__ptr_ring_produce() calls will succeed. This information can, for
example, be used to temporarily stop producing until
__ptr_ring_check_produce() indicates that space is available again.
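As a toy illustration (not part of this series), a producer could poll the
helper until a consumer frees a slot; cpu_relax() is the compiler barrier
the helper's comment asks for:

	spin_lock(&r->producer_lock);
	/* Back off while the ring is full, retry once a slot is free. */
	while (__ptr_ring_check_produce(r) == -ENOSPC)
		cpu_relax();
	__ptr_ring_produce(r, ptr);
	spin_unlock(&r->producer_lock);

The following patch uses the helper differently: instead of spinning,
tun_net_xmit() stops the netdev queue and relies on the consumer to wake
it again.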
Co-developed-by: Tim Gebauer
Signed-off-by: Tim Gebauer
Signed-off-by: Simon Schippers
---
 include/linux/ptr_ring.h | 20 ++++++++++++++++++--
 1 file changed, 18 insertions(+), 2 deletions(-)

diff --git a/include/linux/ptr_ring.h b/include/linux/ptr_ring.h
index d2c3629bbe45..c95e891903f0 100644
--- a/include/linux/ptr_ring.h
+++ b/include/linux/ptr_ring.h
@@ -96,6 +96,20 @@ static inline bool ptr_ring_full_bh(struct ptr_ring *r)
 	return ret;
 }
 
+/* Note: callers invoking this in a loop must use a compiler barrier,
+ * for example cpu_relax(). Callers must hold producer_lock.
+ */
+static inline int __ptr_ring_check_produce(struct ptr_ring *r)
+{
+	if (unlikely(!r->size))
+		return -EINVAL;
+
+	if (data_race(r->queue[r->producer]))
+		return -ENOSPC;
+
+	return 0;
+}
+
 /* Note: callers invoking this in a loop must use a compiler barrier,
  * for example cpu_relax(). Callers must hold producer_lock.
  * Callers are responsible for making sure pointer that is being queued
@@ -103,8 +117,10 @@ static inline bool ptr_ring_full_bh(struct ptr_ring *r)
  */
 static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
 {
-	if (unlikely(!r->size) || data_race(r->queue[r->producer]))
-		return -ENOSPC;
+	int p = __ptr_ring_check_produce(r);
+
+	if (p)
+		return p;
 
 	/* Make sure the pointer we are storing points to a valid data. */
 	/* Pairs with the dependency ordering in __ptr_ring_consume. */
--
2.43.0

This commit prevents tail-drop when a qdisc is present and the ptr_ring
becomes full. Once the ring reaches capacity after a produce attempt, the
netdev queue is stopped instead of dropping subsequent packets. If no
qdisc is present, the previous tail-drop behavior is preserved.

If producing an entry fails anyway due to a race, tun_net_xmit() drops the
packet. Such races are expected because LLTX is enabled and the transmit
path operates without the usual locking.

On the consumer side, __tun_wake_queue() races with the producer for
waking/stopping the netdev queue, which could result in a stalled queue.
Therefore, an smp_mb__after_atomic() is introduced that pairs with the
smp_mb() of the consumer. It follows the principle of store buffering
described in tools/memory-model/Documentation/recipes.txt (see the sketch
after this list):
- The producer in tun_net_xmit() first sets __QUEUE_STATE_DRV_XOFF,
  followed by an smp_mb__after_atomic() (= smp_mb()), and then reads the
  ring with __ptr_ring_check_produce().
- The consumer in __tun_wake_queue() first writes zero to the ring in
  __ptr_ring_consume(), followed by an smp_mb(), and then reads the queue
  status with netif_tx_queue_stopped().
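The pairing can be summarized as the usual SB-with-fences litmus test,
with queue_stopped standing for __QUEUE_STATE_DRV_XOFF and ring_slot for
the ring entry inspected by __ptr_ring_check_produce() (an illustrative
mapping, not taken verbatim from recipes.txt):

	C tun-wake-sb

	{
		queue_stopped=0;
		ring_slot=1;
	}

	P0(int *queue_stopped, int *ring_slot)
	{
		int r0;

		WRITE_ONCE(*queue_stopped, 1);	/* netif_tx_stop_queue() */
		smp_mb();			/* smp_mb__after_atomic() */
		r0 = READ_ONCE(*ring_slot);	/* __ptr_ring_check_produce() */
	}

	P1(int *queue_stopped, int *ring_slot)
	{
		int r1;

		WRITE_ONCE(*ring_slot, 0);	/* __ptr_ring_consume() clears the slot */
		smp_mb();			/* smp_mb() in __tun_wake_queue() */
		r1 = READ_ONCE(*queue_stopped);	/* netif_tx_queue_stopped() */
	}

	exists (0:r0=1 /\ 1:r1=0) (* forbidden: both sides would skip the wake-up *)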
=> Following the aforementioned principle, it is impossible for the
producer to see a full ring (and therefore not wake the queue on the
re-check) while the consumer simultaneously fails to see a stopped queue
(and therefore also does not wake it).

Benchmarks:
The benchmarks show a slight regression in raw transmission performance
when using two sending threads. Packet loss also occurs only in the
two-thread sending case; no packet loss was observed with a single
sending thread.

Test setup: AMD Ryzen 5 5600X at 4.3 GHz, 3200 MHz RAM, isolated QEMU
threads; Avg over 50 runs @ 100,000,000 packets. SRSO and spectre v2
mitigations disabled.
Note for tap+vhost-net: an XDP drop program is active in the VM, which
makes it ~2.5x faster than plain tap; plain tap is slower due to more
syscalls (high utilization of entry_SYSRETQ_unsafe_stack in perf).

+--------------------------+--------------+----------------+----------+
| 1 thread                 | Stock        | Patched with   | diff     |
| sending                  |              | fq_codel qdisc |          |
+------------+-------------+--------------+----------------+----------+
| TAP        | Received    | 1.132 Mpps   | 1.123 Mpps     | -0.8%    |
|            +-------------+--------------+----------------+----------+
|            | Lost/s      | 3.765 Mpps   | 0 pps          |          |
+------------+-------------+--------------+----------------+----------+
| TAP        | Received    | 3.857 Mpps   | 3.901 Mpps     | +1.1%    |
|            +-------------+--------------+----------------+----------+
| +vhost-net | Lost/s      | 0.802 Mpps   | 0 pps          |          |
+------------+-------------+--------------+----------------+----------+

+--------------------------+--------------+----------------+----------+
| 2 threads                | Stock        | Patched with   | diff     |
| sending                  |              | fq_codel qdisc |          |
+------------+-------------+--------------+----------------+----------+
| TAP        | Received    | 1.115 Mpps   | 1.081 Mpps     | -3.0%    |
|            +-------------+--------------+----------------+----------+
|            | Lost/s      | 8.490 Mpps   | 391 pps        |          |
+------------+-------------+--------------+----------------+----------+
| TAP        | Received    | 3.664 Mpps   | 3.555 Mpps     | -3.0%    |
|            +-------------+--------------+----------------+----------+
| +vhost-net | Lost/s      | 5.330 Mpps   | 938 pps        |          |
+------------+-------------+--------------+----------------+----------+

Co-developed-by: Tim Gebauer
Signed-off-by: Tim Gebauer
Signed-off-by: Simon Schippers
---
 drivers/net/tun.c | 25 +++++++++++++++++++++++--
 1 file changed, 23 insertions(+), 2 deletions(-)

diff --git a/drivers/net/tun.c b/drivers/net/tun.c
index 4ee1ed6e815a..e56358878c36 100644
--- a/drivers/net/tun.c
+++ b/drivers/net/tun.c
@@ -1052,6 +1052,7 @@ static netdev_tx_t tun_net_xmit(struct sk_buff *skb, struct net_device *dev)
 	struct netdev_queue *queue;
 	struct tun_file *tfile;
 	int len = skb->len;
+	int ret;
 
 	rcu_read_lock();
 	tfile = rcu_dereference(tun->tfiles[txq]);
@@ -1106,13 +1107,33 @@ static netdev_tx_t tun_net_xmit(struct sk_buff *skb, struct net_device *dev)
 
 	nf_reset_ct(skb);
 
-	if (ptr_ring_produce(&tfile->tx_ring, skb)) {
+	queue = netdev_get_tx_queue(dev, txq);
+
+	spin_lock(&tfile->tx_ring.producer_lock);
+	ret = __ptr_ring_produce(&tfile->tx_ring, skb);
+	if (!qdisc_txq_has_no_queue(queue) &&
+	    __ptr_ring_check_produce(&tfile->tx_ring) == -ENOSPC) {
+		netif_tx_stop_queue(queue);
+		/* Paired with smp_mb() in __tun_wake_queue() */
+		smp_mb__after_atomic();
+		if (!__ptr_ring_check_produce(&tfile->tx_ring))
+			netif_tx_wake_queue(queue);
+	}
+	spin_unlock(&tfile->tx_ring.producer_lock);
+
+	if (ret) {
+		/* This should be a rare case if a qdisc is present, but
+		 * can happen due to lltx.
+		 * Since skb_tx_timestamp(), skb_orphan(),
+		 * run_ebpf_filter() and pskb_trim() could have tinkered
+		 * with the SKB, returning NETDEV_TX_BUSY is unsafe and
+		 * we must drop instead.
+		 */
 		drop_reason = SKB_DROP_REASON_FULL_RING;
 		goto drop;
 	}
 
 	/* dev->lltx requires to do our own update of trans_start */
-	queue = netdev_get_tx_queue(dev, txq);
 	txq_trans_cond_update(queue);
 
 	/* Notify and wake up reader process */
--
2.43.0