Add a small selftest that stresses pipe->mutex contention by spawning N writer threads that hammer a single pipe with multi-page writes, plus M reader threads that drain. Each writer records its own write() latency samples into a log2-bucketed histogram; main aggregates and prints total writes, throughput, average and percentile (p50/p99/p99.9) latencies, and the maximum observed latency. This was used to validate "fs/pipe: bulk pre-allocate pages outside pipe->mutex in anon_pipe_write". By default it sweeps over writers={1,2,5} x readers={1,5,10} using 64KB writes for 3s on a 1 MB pipe (~27s total); -w/-r switch to a single configuration and -s/-d/-p tune msgsize/duration/pipe size. Output is one-line-per-metric with a "---" separator between configurations so two runs (e.g. baseline vs patched) can be diffed directly. Pass --memory-pressure to fork stress-ng (--vm 4 --vm-bytes 80% --vm-method all) for the duration of the run, so alloc_page() in anon_pipe_write() routinely hits direct reclaim. The flag fails fast if stress-ng is not on $PATH. The program exits 0 on a clean run and reports its results to stdout, so it integrates with the kselftest framework via TEST_GEN_PROGS. Signed-off-by: Breno Leitao --- tools/testing/selftests/Makefile | 1 + tools/testing/selftests/pipe/.gitignore | 1 + tools/testing/selftests/pipe/Makefile | 9 + tools/testing/selftests/pipe/pipe_bench.c | 351 ++++++++++++++++++++++++++++++ 4 files changed, 362 insertions(+) diff --git a/tools/testing/selftests/Makefile b/tools/testing/selftests/Makefile index 6e59b8f63e416..bcd9db9d292ca 100644 --- a/tools/testing/selftests/Makefile +++ b/tools/testing/selftests/Makefile @@ -91,6 +91,7 @@ TARGETS += pcie_bwctrl TARGETS += perf_events TARGETS += pidfd TARGETS += pid_namespace +TARGETS += pipe TARGETS += power_supply TARGETS += powerpc TARGETS += prctl diff --git a/tools/testing/selftests/pipe/.gitignore b/tools/testing/selftests/pipe/.gitignore new file mode 100644 index 0000000000000..20b549361a152 --- /dev/null +++ b/tools/testing/selftests/pipe/.gitignore @@ -0,0 +1 @@ +pipe_bench diff --git a/tools/testing/selftests/pipe/Makefile b/tools/testing/selftests/pipe/Makefile new file mode 100644 index 0000000000000..1810c680117b3 --- /dev/null +++ b/tools/testing/selftests/pipe/Makefile @@ -0,0 +1,9 @@ +# SPDX-License-Identifier: GPL-2.0 +# Copyright (c) 2026 Meta Platforms, Inc. and affiliates +# Copyright (c) 2026 Breno Leitao + +CFLAGS += -O2 -Wall -Wextra -pthread + +TEST_GEN_PROGS := pipe_bench + +include ../lib.mk diff --git a/tools/testing/selftests/pipe/pipe_bench.c b/tools/testing/selftests/pipe/pipe_bench.c new file mode 100644 index 0000000000000..4b4ee6c8c0ced --- /dev/null +++ b/tools/testing/selftests/pipe/pipe_bench.c @@ -0,0 +1,351 @@ +// SPDX-License-Identifier: GPL-2.0 +/* + * pipe_bench - exercise pipe->mutex contention under concurrent writers. + * + * N writer threads hammer a single pipe with multi-page writes; M reader + * threads drain it. Each writer records its own write() latency histogram. + * Multi-page writes (msgsize >= PAGE_SIZE) force the loop in + * anon_pipe_write() to call alloc_page(GFP_HIGHUSER | __GFP_ACCOUNT) under + * pipe->mutex, which is the critical section the patch shrinks. + * + * By default the benchmark sweeps writers in {1, 2, 5} x readers in + * {1, 5, 10} and prints one block per configuration so two runs (e.g. + * baseline vs patched) can be diffed directly. Pass -w and -r to run a + * single configuration instead. Pass --memory-pressure to spawn stress-ng + * alongside the sweep so the per-page alloc_page() path under pipe->mutex + * has to dip into reclaim. + * + * Copyright (c) 2026 Meta Platforms, Inc. and affiliates + * Copyright (c) 2026 Breno Leitao + */ + +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define ARRAY_SIZE(a) (sizeof(a) / sizeof((a)[0])) +#define HIST_BUCKETS 32 + +static size_t g_msgsize = 16 * 4096; +static int g_duration = 3; +static int g_pipe_size = 1024 * 1024; +static int g_memory_pressure; + +static atomic_int g_stop; +static int g_pipe[2]; + +struct wstats { + uint64_t writes; + uint64_t bytes; + uint64_t lat_sum_ns; + uint64_t lat_max_ns; + uint64_t lat_hist[HIST_BUCKETS]; +}; + +static inline uint64_t now_ns(void) +{ + struct timespec ts; + + clock_gettime(CLOCK_MONOTONIC, &ts); + return (uint64_t)ts.tv_sec * 1000000000ull + ts.tv_nsec; +} + +static inline int log2_bucket(uint64_t v) +{ + int b = 0; + + if (!v) + return 0; + while (v >>= 1) + b++; + return b < HIST_BUCKETS ? b : HIST_BUCKETS - 1; +} + +static void *writer(void *arg) +{ + struct wstats *s = arg; + char *buf = aligned_alloc(4096, g_msgsize); + + if (!buf) + return NULL; + memset(buf, 0xAA, g_msgsize); + + while (!atomic_load_explicit(&g_stop, memory_order_relaxed)) { + uint64_t t0 = now_ns(); + ssize_t n = write(g_pipe[1], buf, g_msgsize); + uint64_t dt = now_ns() - t0; + + if (n > 0) { + s->writes++; + s->bytes += n; + s->lat_sum_ns += dt; + if (dt > s->lat_max_ns) + s->lat_max_ns = dt; + s->lat_hist[log2_bucket(dt)]++; + } else if (n < 0 && errno == EPIPE) { + break; + } + } + free(buf); + return NULL; +} + +static void *reader(void *arg) +{ + char *buf = aligned_alloc(4096, g_msgsize); + + (void)arg; + if (!buf) + return NULL; + /* + * Drain until EOF (write end closed by main). g_stop is not checked + * here on purpose: writers may be blocked in write() with the pipe + * full when g_stop is set, so the reader must keep draining until + * main closes the write end. + */ + for (;;) { + ssize_t n = read(g_pipe[0], buf, g_msgsize); + + if (n <= 0) + break; + } + free(buf); + return NULL; +} + +static void summarize(struct wstats *all, int nw, int nr) +{ + uint64_t total_writes = 0, total_bytes = 0, total_lat = 0; + uint64_t max_lat = 0; + uint64_t agg[HIST_BUCKETS] = {0}; + uint64_t p50_target, p99_target, p999_target; + uint64_t cum = 0, p50 = 0, p99 = 0, p999 = 0; + uint64_t avg_ns; + double sec; + + for (int i = 0; i < nw; i++) { + total_writes += all[i].writes; + total_bytes += all[i].bytes; + total_lat += all[i].lat_sum_ns; + if (all[i].lat_max_ns > max_lat) + max_lat = all[i].lat_max_ns; + for (int b = 0; b < HIST_BUCKETS; b++) + agg[b] += all[i].lat_hist[b]; + } + + p50_target = total_writes * 50 / 100; + p99_target = total_writes * 99 / 100; + p999_target = total_writes * 999 / 1000; + + for (int b = 0; b < HIST_BUCKETS; b++) { + cum += agg[b]; + if (!p50 && cum >= p50_target) + p50 = 1ULL << b; + if (!p99 && cum >= p99_target) + p99 = 1ULL << b; + if (!p999 && cum >= p999_target) + p999 = 1ULL << b; + } + + sec = g_duration; + avg_ns = total_writes ? total_lat / total_writes : 0; + + printf("config: writers=%d readers=%d msgsize=%zu duration=%d pipe_size=%d memory_pressure=%s\n", + nw, nr, g_msgsize, g_duration, g_pipe_size, + g_memory_pressure ? "yes" : "no"); + printf("writes: total=%llu rate=%.0f/s\n", + (unsigned long long)total_writes, total_writes / sec); + printf("throughput_MBps: %.2f\n", + (total_bytes / sec) / (1024.0 * 1024.0)); + printf("lat_avg_ns: %llu\n", (unsigned long long)avg_ns); + printf("lat_p50_ns_upper: %llu\n", (unsigned long long)p50); + printf("lat_p99_ns_upper: %llu\n", (unsigned long long)p99); + printf("lat_p999_ns_upper: %llu\n", (unsigned long long)p999); + printf("lat_max_ns: %llu\n", (unsigned long long)max_lat); +} + +static pid_t spawn_stress_ng(void) +{ + pid_t pid = fork(); + + if (pid < 0) { + perror("fork"); + return -1; + } + if (pid == 0) { + execlp("stress-ng", "stress-ng", + "--vm", "4", "--vm-bytes", "80%", + "--vm-method", "all", + (char *)NULL); + fprintf(stderr, "exec stress-ng failed: %s\n", + strerror(errno)); + _exit(127); + } + /* Give stress-ng a moment to map its VM regions before measuring. */ + sleep(1); + return pid; +} + +static void kill_stress_ng(pid_t pid) +{ + int status; + + if (pid <= 0) + return; + kill(pid, SIGTERM); + for (int i = 0; i < 20; i++) { + if (waitpid(pid, &status, WNOHANG) > 0) + return; + usleep(100 * 1000); + } + kill(pid, SIGKILL); + waitpid(pid, &status, 0); +} + +static int run_one(int nw, int nr) +{ + pthread_t *wt, *rt; + struct wstats *ws; + + atomic_store(&g_stop, 0); + + if (pipe(g_pipe) < 0) { + perror("pipe"); + return -1; + } + if (fcntl(g_pipe[1], F_SETPIPE_SZ, g_pipe_size) < 0) + perror("F_SETPIPE_SZ (continuing)"); + + wt = calloc(nw, sizeof(*wt)); + rt = calloc(nr, sizeof(*rt)); + ws = calloc(nw, sizeof(*ws)); + + if (!wt || !rt || !ws) { + fprintf(stderr, "alloc failed\n"); + free(wt); + free(rt); + free(ws); + close(g_pipe[0]); + close(g_pipe[1]); + return -1; + } + + for (int i = 0; i < nr; i++) + pthread_create(&rt[i], NULL, reader, NULL); + for (int i = 0; i < nw; i++) + pthread_create(&wt[i], NULL, writer, &ws[i]); + + sleep(g_duration); + atomic_store(&g_stop, 1); + + /* + * Close write end first so any writer blocked in write() gets EPIPE + * and exits, and so the readers see EOF after draining. + */ + close(g_pipe[1]); + for (int i = 0; i < nw; i++) + pthread_join(wt[i], NULL); + for (int i = 0; i < nr; i++) + pthread_join(rt[i], NULL); + close(g_pipe[0]); + + summarize(ws, nw, nr); + fflush(stdout); + + free(wt); + free(rt); + free(ws); + return 0; +} + +int main(int argc, char **argv) +{ + static const int writers_sweep[] = {1, 2, 5}; + static const int readers_sweep[] = {1, 5, 10}; + static const struct option long_opts[] = { + {"memory-pressure", no_argument, NULL, 'M'}, + {0, 0, 0, 0}, + }; + int writers_override = 0, readers_override = 0; + pid_t stress_pid = -1; + int rc = 0, opt; + + while ((opt = getopt_long(argc, argv, "w:r:s:d:p:", + long_opts, NULL)) != -1) { + switch (opt) { + case 'w': + writers_override = atoi(optarg); + break; + case 'r': + readers_override = atoi(optarg); + break; + case 's': + g_msgsize = atol(optarg); + break; + case 'd': + g_duration = atoi(optarg); + break; + case 'p': + g_pipe_size = atoi(optarg); + break; + case 'M': + g_memory_pressure = 1; + break; + default: + fprintf(stderr, + "usage: %s [-w writers] [-r readers] [-s msgsize] [-d secs] [-p pipe_size] [--memory-pressure]\n" + " default: sweep writers={1,2,5} x readers={1,5,10}\n" + " --memory-pressure: spawn stress-ng (--vm 4 --vm-bytes 80%% --vm-method all) for the run\n", + argv[0]); + return 1; + } + } + + signal(SIGPIPE, SIG_IGN); + setvbuf(stdout, NULL, _IOLBF, 0); + setvbuf(stderr, NULL, _IOLBF, 0); + + fprintf(stderr, "pid=%d\n", getpid()); + fflush(stderr); + + if (g_memory_pressure) { + stress_pid = spawn_stress_ng(); + if (stress_pid < 0) { + fprintf(stderr, + "memory_pressure requested but stress-ng could not be spawned\n"); + return 1; + } + } + + if (writers_override > 0 || readers_override > 0) { + int nw = writers_override > 0 ? writers_override : 1; + int nr = readers_override > 0 ? readers_override : 1; + + rc = run_one(nw, nr) < 0 ? 1 : 0; + goto out; + } + + for (size_t i = 0; i < ARRAY_SIZE(writers_sweep); i++) { + for (size_t j = 0; j < ARRAY_SIZE(readers_sweep); j++) { + printf("---\n"); + if (run_one(writers_sweep[i], readers_sweep[j]) < 0) { + rc = 1; + goto out; + } + } + } +out: + kill_stress_ng(stress_pid); + return rc; +} -- 2.53.0-Meta