After consuming the last visible record, ringbuf_process_ring() publishes the consumer position and checks the producer position. These operations lack a full StoreLoad barrier. A producer can therefore commit a new record but read the old consumer position while the consumer reads the old producer position. The producer sends no notification and the consumer waits despite a queued record. Insert a full barrier between publishing a consumer position and the next producer position load. When a record bound or callback ends the current invocation first, execute the barrier before returning so the load in a later invocation completes the same handshake. Add an edge-triggered epoll test that drains one record per call while a concurrent producer fills the ring. Without the barrier, a missed notification leaves the producer dropping records from a full ring while the consumer times out. Document that bounded consumers and callbacks that terminate consumption must drain before waiting again. Fixes: bf99c936f947 ("libbpf: Add BPF ring buffer support") Reported-by: Andrew Werner Reported-by: Sashiko Closes: https://lore.kernel.org/bpf/20260614015716.945AF1F000E9@smtp.kernel.org/ Assisted-by: Codex:gpt-5.5 Signed-off-by: Tamir Duberstein --- tools/lib/bpf/libbpf.h | 23 +++++++ tools/lib/bpf/ringbuf.c | 24 +++++-- tools/testing/selftests/bpf/prog_tests/ringbuf.c | 83 ++++++++++++++++++++++++ 3 files changed, 123 insertions(+), 7 deletions(-) diff --git a/tools/lib/bpf/libbpf.h b/tools/lib/bpf/libbpf.h index ae46b17feaa6..3a649ed87034 100644 --- a/tools/lib/bpf/libbpf.h +++ b/tools/lib/bpf/libbpf.h @@ -1440,6 +1440,11 @@ struct ring; struct user_ring_buffer; /* Callback-based consumption is unsupported for BPF_F_RB_OVERWRITE maps. */ +/* + * A negative return stops consumption; non-negative values continue. Stopping + * can leave records queued without a new readiness notification. Before + * waiting for readiness again, consume until no records remain. + */ typedef int (*ring_buffer_sample_fn)(void *ctx, void *data, size_t size); struct ring_buffer_opts { @@ -1456,6 +1461,20 @@ LIBBPF_API int ring_buffer__add(struct ring_buffer *rb, int map_fd, ring_buffer_sample_fn sample_cb, void *ctx); LIBBPF_API int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms); LIBBPF_API int ring_buffer__consume(struct ring_buffer *rb); + +/** + * @brief **ring_buffer__consume_n()** consumes up to a requested number of + * records from a ring buffer manager without event polling. + * + * @param rb A ring buffer manager object. + * @param n Maximum number of records to consume. + * @return The number of records consumed, or a negative error code on failure. + * + * Reaching the requested bound does not establish that every ring is empty. + * Records can remain queued without a new readiness notification. Before + * calling ring_buffer__poll() or waiting on ring_buffer__epoll_fd(), call + * ring_buffer__consume() until it returns 0. + */ LIBBPF_API int ring_buffer__consume_n(struct ring_buffer *rb, size_t n); LIBBPF_API int ring_buffer__epoll_fd(const struct ring_buffer *rb); @@ -1538,6 +1557,10 @@ LIBBPF_API int ring__consume(struct ring *r); * @param r A ringbuffer object. * @param n Maximum number of records to consume. * @return The number of records consumed, or a negative error code on failure. + * + * Reaching the requested bound does not establish that the ring is empty. + * Records can remain queued without a new readiness notification. Before + * waiting on ring__map_fd(), call ring__consume() until it returns 0. */ LIBBPF_API int ring__consume_n(struct ring *r, size_t n); diff --git a/tools/lib/bpf/ringbuf.c b/tools/lib/bpf/ringbuf.c index 141f2cbe56eb..0598f6c2f7da 100644 --- a/tools/lib/bpf/ringbuf.c +++ b/tools/lib/bpf/ringbuf.c @@ -271,7 +271,7 @@ static int64_t ringbuf_process_ring(struct ring *r, size_t n) return 0; cons_pos = __atomic_load_n(r->consumer_pos, __ATOMIC_ACQUIRE); - do { + for (;;) { got_new_data = false; prod_pos = __atomic_load_n(r->producer_pos, __ATOMIC_ACQUIRE); /* Positions wrap; the consumer cannot logically pass the producer. */ @@ -279,9 +279,9 @@ static int64_t ringbuf_process_ring(struct ring *r, size_t n) len_ptr = r->data + (cons_pos & r->mask); len = __atomic_load_n(len_ptr, __ATOMIC_ACQUIRE); - /* sample not committed yet, bail out for now */ + /* Retry a busy record once after publishing prior records. */ if (len & BPF_RINGBUF_BUSY_BIT) - goto done; + break; got_new_data = true; cons_pos += roundup_len(len); @@ -294,7 +294,8 @@ static int64_t ringbuf_process_ring(struct ring *r, size_t n) __atomic_store_n(r->consumer_pos, cons_pos, __ATOMIC_RELEASE); - return err; + cnt = err; + break; } cnt++; } @@ -303,10 +304,19 @@ static int64_t ringbuf_process_ring(struct ring *r, size_t n) __ATOMIC_RELEASE); if (cnt >= n) - goto done; + break; } - } while (got_new_data); -done: + if (!got_new_data) + break; + + /* + * Order the published consumer position before the next + * producer-position load, whether below or in a later invocation. + */ + __atomic_thread_fence(__ATOMIC_SEQ_CST); + if (cnt < 0 || cnt >= n) + break; + } return cnt; } diff --git a/tools/testing/selftests/bpf/prog_tests/ringbuf.c b/tools/testing/selftests/bpf/prog_tests/ringbuf.c index 29be2476c478..0d45a766a580 100644 --- a/tools/testing/selftests/bpf/prog_tests/ringbuf.c +++ b/tools/testing/selftests/bpf/prog_tests/ringbuf.c @@ -492,6 +492,87 @@ static void ringbuf_null_cb_subtest(void) test_ringbuf_n_lskel__destroy(skel_n); } +#define N_WAKEUP_SAMPLES 20000 + +struct wakeup_ctx { + bool stop; +}; + +static void *wakeup_producer(void *arg) +{ + struct wakeup_ctx *ctx = arg; + + while (!__atomic_load_n(&ctx->stop, __ATOMIC_RELAXED)) + syscall(__NR_getpgid); + return NULL; +} + +static void ringbuf_wakeup_subtest(void) +{ + struct test_ringbuf_n_lskel *skel_n; + struct ring_buffer *ringbuf = NULL; + struct epoll_event event = { + .events = EPOLLIN | EPOLLET, + }; + struct wakeup_ctx ctx = {}; + pthread_t producer; + int epoll_fd = -1; + int err, total = 0; + + skel_n = test_ringbuf_n_lskel__open(); + if (!ASSERT_OK_PTR(skel_n, "test_ringbuf_n_lskel__open")) + return; + + skel_n->maps.ringbuf.max_entries = getpagesize(); + skel_n->bss->pid = getpid(); + skel_n->bss->value = SAMPLE_VALUE; + + err = test_ringbuf_n_lskel__load(skel_n); + if (!ASSERT_OK(err, "test_ringbuf_n_lskel__load")) + goto cleanup; + + err = test_ringbuf_n_lskel__attach(skel_n); + if (!ASSERT_OK(err, "test_ringbuf_n_lskel__attach")) + goto cleanup; + + ringbuf = ring_buffer__new(skel_n->maps.ringbuf.map_fd, + process_noop_sample, NULL, NULL); + if (!ASSERT_OK_PTR(ringbuf, "ring_buffer__new")) + goto cleanup; + + epoll_fd = epoll_create1(EPOLL_CLOEXEC); + if (!ASSERT_OK_FD(epoll_fd, "epoll_create1")) + goto cleanup_ringbuf; + + err = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, skel_n->maps.ringbuf.map_fd, + &event); + if (!ASSERT_OK(err, "epoll_ctl")) + goto cleanup_epoll; + + err = pthread_create(&producer, NULL, wakeup_producer, &ctx); + if (!ASSERT_OK(err, "pthread_create")) + goto cleanup_epoll; + + while (total < N_WAKEUP_SAMPLES) { + err = epoll_wait(epoll_fd, &event, 1, 1000); + if (!ASSERT_EQ(err, 1, "epoll_wait")) + break; + while ((err = ring_buffer__consume_n(ringbuf, 1)) > 0) + total += err; + if (!ASSERT_OK(err, "ring_buffer__consume_n")) + break; + } + + __atomic_store_n(&ctx.stop, true, __ATOMIC_RELAXED); + pthread_join(producer, NULL); +cleanup_epoll: + close(epoll_fd); +cleanup_ringbuf: + ring_buffer__free(ringbuf); +cleanup: + test_ringbuf_n_lskel__destroy(skel_n); +} + static void ringbuf_n_subtest(void) { struct test_ringbuf_n_lskel *skel_n; @@ -709,6 +790,8 @@ void test_ringbuf(void) ringbuf_n_subtest(); if (test__start_subtest("ringbuf_null_cb")) ringbuf_null_cb_subtest(); + if (test__start_subtest("ringbuf_wakeup")) + ringbuf_wakeup_subtest(); if (test__start_subtest("ringbuf_map_key")) ringbuf_map_key_subtest(); if (test__start_subtest("ringbuf_write")) -- 2.55.0.rc0.159.gbe5d7338c2