From e62b4ae9ebde659b95150f1b8a9d4c0b53f27d52 Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Sun, 22 Mar 2026 13:17:15 +0800 Subject: [PATCH] iouring --- src/Makefile | 21 +- src/ae.c | 18 +- src/ae_io_uring.c | 266 ++++++++++++++++++ src/config.c | 6 + src/config.h | 2 + src/io_uring_batch.c | 489 +++++++++++++++++++++++++++++++++ src/io_uring_batch.h | 77 ++++++ src/iothread.c | 34 +++ src/networking.c | 26 ++ src/server.c | 24 ++ src/server.h | 21 ++ tests/integration/io-uring.tcl | 94 +++++++ 12 files changed, 1070 insertions(+), 8 deletions(-) create mode 100644 src/ae_io_uring.c create mode 100644 src/io_uring_batch.c create mode 100644 src/io_uring_batch.h create mode 100644 tests/integration/io-uring.tcl diff --git a/src/Makefile b/src/Makefile index b3ebd13b87d..751e50b0257 100644 --- a/src/Makefile +++ b/src/Makefile @@ -343,6 +343,24 @@ ifneq ($(SKIP_VEC_SETS),yes) FINAL_CFLAGS+=-DINCLUDE_VEC_SETS=1 endif +# io_uring support (Linux 5.6+, requires liburing) +LIBURING_LIBS= +LIBURING_PKGCONFIG := $(shell $(PKG_CONFIG) --exists liburing 2>/dev/null && echo $$?) 
+ifeq ($(LIBURING_PKGCONFIG),0) + LIBURING_LIBS=$(shell $(PKG_CONFIG) --libs liburing) + LIBURING_CFLAGS=$(shell $(PKG_CONFIG) --cflags liburing) +else + LIBURING_LIBS=-luring + LIBURING_CFLAGS= +endif + +ifeq ($(USE_IO_URING),yes) +ifeq ($(uname_S),Linux) + FINAL_CFLAGS+= -DHAVE_IO_URING $(LIBURING_CFLAGS) + FINAL_LIBS+= $(LIBURING_LIBS) +endif +endif + ifndef V define MAKE_INSTALL @printf ' %b %b\n' $(LINKCOLOR)INSTALL$(ENDCOLOR) $(BINCOLOR)$(1)$(ENDCOLOR) 1>&2 @@ -382,7 +400,7 @@ endif REDIS_SERVER_NAME=redis-server$(PROG_SUFFIX) REDIS_SENTINEL_NAME=redis-sentinel$(PROG_SUFFIX) -REDIS_SERVER_OBJ=threads_mngr.o memory_prefetch.o adlist.o quicklist.o ae.o anet.o dict.o ebuckets.o eventnotifier.o iothread.o mstr.o entry.o kvstore.o fwtree.o estore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_asm.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o lolwut8.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o keymeta.o chk.o hotkeys.o gcra.o +REDIS_SERVER_OBJ=threads_mngr.o memory_prefetch.o adlist.o quicklist.o ae.o anet.o dict.o ebuckets.o eventnotifier.o iothread.o mstr.o entry.o kvstore.o fwtree.o estore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o 
networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_asm.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o lolwut8.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o keymeta.o chk.o hotkeys.o gcra.o io_uring_batch.o REDIS_CLI_NAME=redis-cli$(PROG_SUFFIX) REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o ae.o redisassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o REDIS_BENCHMARK_NAME=redis-benchmark$(PROG_SUFFIX) @@ -417,6 +435,7 @@ persist-settings: distclean echo MALLOC=$(MALLOC) >> .make-settings echo BUILD_TLS=$(BUILD_TLS) >> .make-settings echo USE_SYSTEMD=$(USE_SYSTEMD) >> .make-settings + echo USE_IO_URING=$(USE_IO_URING) >> .make-settings echo CFLAGS=$(CFLAGS) >> .make-settings echo LDFLAGS=$(LDFLAGS) >> .make-settings echo REDIS_CFLAGS=$(REDIS_CFLAGS) >> .make-settings diff --git a/src/ae.c b/src/ae.c index 733c88d52d3..81f26bbdfee 100644 --- a/src/ae.c +++ b/src/ae.c @@ -29,16 +29,20 @@ /* Include the best multiplexing layer supported by this system. * The following should be ordered by performances, descending. 
*/ -#ifdef HAVE_EVPORT -#include "ae_evport.c" +#ifdef HAVE_IO_URING +#include "ae_io_uring.c" #else - #ifdef HAVE_EPOLL - #include "ae_epoll.c" + #ifdef HAVE_EVPORT + #include "ae_evport.c" #else - #ifdef HAVE_KQUEUE - #include "ae_kqueue.c" + #ifdef HAVE_EPOLL + #include "ae_epoll.c" #else - #include "ae_select.c" + #ifdef HAVE_KQUEUE + #include "ae_kqueue.c" + #else + #include "ae_select.c" + #endif #endif #endif #endif diff --git a/src/ae_io_uring.c b/src/ae_io_uring.c new file mode 100644 index 00000000000..ea25a3b20f8 --- /dev/null +++ b/src/ae_io_uring.c @@ -0,0 +1,266 @@ +/* Linux io_uring based ae.c module with epoll fallback + * + * Copyright (c) 2024-Present, Redis Ltd. + * All rights reserved. + * + * Licensed under your choice of (a) the Redis Source Available License 2.0 + * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the + * GNU Affero General Public License v3 (AGPLv3). + * + * This module uses io_uring multishot poll for event readiness notification. + * Requires Linux 5.6+ and liburing. If io_uring initialization fails at + * runtime (old kernel, insufficient resources), falls back to epoll + * transparently within the same binary. 
+ */ + +#include +#include +#include + +#define AE_IO_URING_RING_ENTRIES 4096 + +#define IOURING_USERDATA(fd, mask) (((uint64_t)(unsigned)(fd)) | ((uint64_t)(mask) << 32)) +#define IOURING_USERDATA_FD(ud) ((int)((ud) & 0xFFFFFFFF)) +#define IOURING_USERDATA_MASK(ud) ((int)((ud) >> 32)) + +#define IOURING_TAG_BATCH_READ 0x100 +#define IOURING_TAG_BATCH_WRITE 0x200 + +typedef struct aeApiState { + int use_io_uring; + /* epoll fallback state */ + int epfd; + struct epoll_event *epoll_events; + /* io_uring state */ + struct io_uring ring; + int ring_entries; +} aeApiState; + +static int aeApiCreate(aeEventLoop *eventLoop) { + aeApiState *state = zmalloc(sizeof(aeApiState)); + if (!state) return -1; + memset(state, 0, sizeof(*state)); + state->epfd = -1; + + int entries = eventLoop->setsize; + if (entries < AE_IO_URING_RING_ENTRIES) + entries = AE_IO_URING_RING_ENTRIES; + + if (io_uring_queue_init(entries, &state->ring, 0) == 0) { + state->use_io_uring = 1; + state->ring_entries = entries; + eventLoop->apidata = state; + return 0; + } + + /* io_uring unavailable, fall back to epoll */ + state->use_io_uring = 0; + state->epoll_events = zmalloc(sizeof(struct epoll_event) * eventLoop->setsize); + if (!state->epoll_events) { + zfree(state); + return -1; + } + state->epfd = epoll_create(1024); + if (state->epfd == -1) { + zfree(state->epoll_events); + zfree(state); + return -1; + } + anetCloexec(state->epfd); + eventLoop->apidata = state; + return 0; +} + +static int aeApiResize(aeEventLoop *eventLoop, int setsize) { + aeApiState *state = eventLoop->apidata; + if (state->use_io_uring) return 0; + state->epoll_events = zrealloc(state->epoll_events, + sizeof(struct epoll_event) * setsize); + return 0; +} + +static void aeApiFree(aeEventLoop *eventLoop) { + aeApiState *state = eventLoop->apidata; + if (state->use_io_uring) { + io_uring_queue_exit(&state->ring); + } else { + close(state->epfd); + zfree(state->epoll_events); + } + zfree(state); +} + +static int 
aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) { + aeApiState *state = eventLoop->apidata; + + if (state->use_io_uring) { + int old_mask = eventLoop->events[fd].mask; + int full_mask = old_mask | mask; + + /* Cancel existing multishot poll if modifying */ + if (old_mask != AE_NONE) { + struct io_uring_sqe *sqe = io_uring_get_sqe(&state->ring); + if (!sqe) return -1; + io_uring_prep_poll_remove(sqe, IOURING_USERDATA(fd, old_mask)); + io_uring_sqe_set_data64(sqe, 0); + io_uring_submit(&state->ring); + + struct io_uring_cqe *cqe; + io_uring_wait_cqe(&state->ring, &cqe); + io_uring_cqe_seen(&state->ring, cqe); + } + + struct io_uring_sqe *sqe = io_uring_get_sqe(&state->ring); + if (!sqe) return -1; + + unsigned poll_mask = 0; + if (full_mask & AE_READABLE) poll_mask |= POLLIN; + if (full_mask & AE_WRITABLE) poll_mask |= POLLOUT; + + io_uring_prep_poll_multishot(sqe, fd, poll_mask); + io_uring_sqe_set_data64(sqe, IOURING_USERDATA(fd, full_mask)); + io_uring_submit(&state->ring); + return 0; + } + + /* epoll fallback */ + struct epoll_event ee = {0}; + int op = eventLoop->events[fd].mask == AE_NONE ? 
+ EPOLL_CTL_ADD : EPOLL_CTL_MOD; + ee.events = 0; + mask |= eventLoop->events[fd].mask; + if (mask & AE_READABLE) ee.events |= EPOLLIN; + if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; + ee.data.fd = fd; + if (epoll_ctl(state->epfd, op, fd, &ee) == -1) return -1; + return 0; +} + +static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) { + aeApiState *state = eventLoop->apidata; + + if (state->use_io_uring) { + int old_mask = eventLoop->events[fd].mask; + int new_mask = old_mask & (~delmask); + + struct io_uring_sqe *sqe = io_uring_get_sqe(&state->ring); + if (!sqe) return; + io_uring_prep_poll_remove(sqe, IOURING_USERDATA(fd, old_mask)); + io_uring_sqe_set_data64(sqe, 0); + io_uring_submit(&state->ring); + + struct io_uring_cqe *cqe; + io_uring_wait_cqe(&state->ring, &cqe); + io_uring_cqe_seen(&state->ring, cqe); + + if (new_mask != AE_NONE) { + sqe = io_uring_get_sqe(&state->ring); + if (!sqe) return; + + unsigned poll_mask = 0; + if (new_mask & AE_READABLE) poll_mask |= POLLIN; + if (new_mask & AE_WRITABLE) poll_mask |= POLLOUT; + + io_uring_prep_poll_multishot(sqe, fd, poll_mask); + io_uring_sqe_set_data64(sqe, IOURING_USERDATA(fd, new_mask)); + io_uring_submit(&state->ring); + } + return; + } + + /* epoll fallback */ + struct epoll_event ee = {0}; + int mask = eventLoop->events[fd].mask & (~delmask); + ee.events = 0; + if (mask & AE_READABLE) ee.events |= EPOLLIN; + if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; + ee.data.fd = fd; + if (mask != AE_NONE) { + epoll_ctl(state->epfd, EPOLL_CTL_MOD, fd, &ee); + } else { + epoll_ctl(state->epfd, EPOLL_CTL_DEL, fd, &ee); + } +} + +static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { + aeApiState *state = eventLoop->apidata; + + if (state->use_io_uring) { + int numevents = 0; + struct io_uring_cqe *cqe; + + if (tvp != NULL) { + struct __kernel_timespec ts; + ts.tv_sec = tvp->tv_sec; + ts.tv_nsec = tvp->tv_usec * 1000; + int ret = io_uring_submit_and_wait_timeout(&state->ring, &cqe, + 1, 
&ts, NULL); + if (ret < 0 && ret != -ETIME && ret != -EINTR) + return 0; + } else { + int ret = io_uring_submit_and_wait(&state->ring, 1); + if (ret < 0 && ret != -EINTR) + return 0; + } + + unsigned head; + int seen = 0; + io_uring_for_each_cqe(&state->ring, head, cqe) { + uint64_t ud = io_uring_cqe_get_data64(cqe); + int tag = IOURING_USERDATA_MASK(ud); + seen++; + + /* Skip cancel completions and batch I/O completions */ + if (ud == 0 || tag >= IOURING_TAG_BATCH_READ) + continue; + + int fd = IOURING_USERDATA_FD(ud); + int res = cqe->res; + int mask = 0; + + if (res < 0) { + mask = AE_READABLE | AE_WRITABLE; + } else { + if (res & POLLIN) mask |= AE_READABLE; + if (res & POLLOUT) mask |= AE_WRITABLE; + if (res & POLLERR) mask |= AE_READABLE | AE_WRITABLE; + if (res & POLLHUP) mask |= AE_READABLE | AE_WRITABLE; + } + + if (mask && numevents < eventLoop->setsize) { + eventLoop->fired[numevents].fd = fd; + eventLoop->fired[numevents].mask = mask; + numevents++; + } + } + io_uring_cq_advance(&state->ring, seen); + return numevents; + } + + /* epoll fallback */ + int retval, numevents = 0; + retval = epoll_wait(state->epfd, state->epoll_events, eventLoop->setsize, + tvp ? 
(tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1); + if (retval > 0) { + int j; + numevents = retval; + for (j = 0; j < numevents; j++) { + int mask = 0; + struct epoll_event *e = state->epoll_events + j; + + if (e->events & EPOLLIN) mask |= AE_READABLE; + if (e->events & EPOLLOUT) mask |= AE_WRITABLE; + if (e->events & EPOLLERR) mask |= AE_WRITABLE | AE_READABLE; + if (e->events & EPOLLHUP) mask |= AE_WRITABLE | AE_READABLE; + eventLoop->fired[j].fd = e->data.fd; + eventLoop->fired[j].mask = mask; + } + } else if (retval == -1 && errno != EINTR) { + panic("aeApiPoll: epoll_wait, %s", strerror(errno)); + } + return numevents; +} + +static char *aeApiName(void) { + return "io_uring"; +} diff --git a/src/config.c b/src/config.c index e02cd64e5b9..d04d873c5f0 100644 --- a/src/config.c +++ b/src/config.c @@ -3206,6 +3206,12 @@ standardConfig static_configs[] = { createIntConfig("databases", NULL, IMMUTABLE_CONFIG, 1, INT_MAX, server.dbnum, 16, INTEGER_CONFIG, NULL, NULL), createIntConfig("port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.port, 6379, INTEGER_CONFIG, NULL, updatePort), /* TCP port. 
*/ createIntConfig("io-threads", NULL, DEBUG_CONFIG | IMMUTABLE_CONFIG, 1, 128, server.io_threads_num, 1, INTEGER_CONFIG, NULL, NULL), /* Single threaded by default */ +#ifdef HAVE_IO_URING + createBoolConfig("io-uring-enabled", NULL, IMMUTABLE_CONFIG, server.io_uring_enabled, 1, NULL, NULL), + createBoolConfig("io-uring-batch-writes", NULL, IMMUTABLE_CONFIG, server.io_uring_batch_writes, 1, NULL, NULL), + createBoolConfig("io-uring-batch-reads", NULL, IMMUTABLE_CONFIG, server.io_uring_batch_reads, 0, NULL, NULL), + createIntConfig("io-uring-sq-size", NULL, IMMUTABLE_CONFIG, 64, 16384, server.io_uring_sq_size, 1024, INTEGER_CONFIG, NULL, NULL), +#endif createIntConfig("prefetch-batch-max-size", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, PREFETCH_BATCH_MAX_SIZE, server.prefetch_batch_max_size, 16, INTEGER_CONFIG, NULL, NULL), createIntConfig("auto-aof-rewrite-percentage", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.aof_rewrite_perc, 100, INTEGER_CONFIG, NULL, NULL), createIntConfig("cluster-replica-validity-factor", "cluster-slave-validity-factor", MODIFIABLE_CONFIG, 0, INT_MAX, server.cluster_slave_validity_factor, 10, INTEGER_CONFIG, NULL, NULL), /* Slave max data age factor. */ diff --git a/src/config.h b/src/config.h index beb6bc815b2..9bc12443bc3 100644 --- a/src/config.h +++ b/src/config.h @@ -84,6 +84,8 @@ /* Test for polling API */ #ifdef __linux__ #define HAVE_EPOLL 1 +/* HAVE_IO_URING is defined via Makefile when USE_IO_URING=yes. + * Requires Linux 5.6+ and liburing. */ #endif /* Test for accept4() */ diff --git a/src/io_uring_batch.c b/src/io_uring_batch.c new file mode 100644 index 00000000000..e36208fbead --- /dev/null +++ b/src/io_uring_batch.c @@ -0,0 +1,489 @@ +/* io_uring batch I/O engine for Redis + * + * Copyright (c) 2024-Present, Redis Ltd. + * All rights reserved. 
+ * + * Licensed under your choice of (a) the Redis Source Available License 2.0 + * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the + * GNU Affero General Public License v3 (AGPLv3). + */ + +#include "server.h" + +#ifdef HAVE_IO_URING + +#include "io_uring_batch.h" +#include +#include + +#define IO_BATCH_INITIAL_CAPACITY 64 + +/* Encode operation index + type into CQE user_data for completion routing */ +#define BATCH_USERDATA(idx, type) (((uint64_t)(unsigned)(idx)) | ((uint64_t)(type) << 32)) +#define BATCH_USERDATA_IDX(ud) ((int)((ud) & 0xFFFFFFFF)) +#define BATCH_USERDATA_TYPE(ud) ((int)((ud) >> 32)) + +ioBatchState *ioBatchCreate(int sq_size) { + ioBatchState *batch = zmalloc(sizeof(ioBatchState)); + if (!batch) return NULL; + memset(batch, 0, sizeof(*batch)); + + if (sq_size <= 0) sq_size = IO_BATCH_DEFAULT_SQ_SIZE; + batch->sq_size = sq_size; + + if (io_uring_queue_init(sq_size, &batch->ring, 0) < 0) { + zfree(batch); + return NULL; + } + batch->ring_initialized = 1; + + batch->write_capacity = IO_BATCH_INITIAL_CAPACITY; + batch->write_ops = zmalloc(sizeof(ioBatchOp) * batch->write_capacity); + batch->write_count = 0; + + batch->read_capacity = IO_BATCH_INITIAL_CAPACITY; + batch->read_ops = zmalloc(sizeof(ioBatchOp) * batch->read_capacity); + batch->read_count = 0; + + batch->inflight_writes = 0; + batch->inflight_reads = 0; + batch->batch_writes_enabled = 1; + batch->batch_reads_enabled = 0; + + return batch; +} + +void ioBatchFree(ioBatchState *batch) { + if (!batch) return; + + /* Free any pending iovec arrays */ + for (int i = 0; i < batch->write_count; i++) { + if (batch->write_ops[i].iov) + zfree(batch->write_ops[i].iov); + } + for (int i = 0; i < batch->read_count; i++) { + if (batch->read_ops[i].iov) + zfree(batch->read_ops[i].iov); + } + + if (batch->ring_initialized) + io_uring_queue_exit(&batch->ring); + + zfree(batch->write_ops); + zfree(batch->read_ops); + zfree(batch); +} + +static void ioBatchGrowWriteOps(ioBatchState 
*batch) { + batch->write_capacity *= 2; + batch->write_ops = zrealloc(batch->write_ops, + sizeof(ioBatchOp) * batch->write_capacity); +} + +static void ioBatchGrowReadOps(ioBatchState *batch) { + batch->read_capacity *= 2; + batch->read_ops = zrealloc(batch->read_ops, + sizeof(ioBatchOp) * batch->read_capacity); +} + +/* Queue a write operation. The caller provides an iovec array that has been + * built from the client's output buffers. The iovec is OWNED by the batch + * state and will be freed after completion or on cleanup. */ +int ioBatchAddWrite(ioBatchState *batch, struct client *c, int fd, + struct iovec *iov, int iovcnt, size_t total_len) { + if (!batch || !batch->batch_writes_enabled) return -1; + if (batch->write_count >= batch->write_capacity) + ioBatchGrowWriteOps(batch); + + ioBatchOp *op = &batch->write_ops[batch->write_count]; + op->c = c; + op->fd = fd; + op->type = IO_BATCH_OP_WRITE; + op->iov = iov; + op->iovcnt = iovcnt; + op->total_len = total_len; + batch->write_count++; + return 0; +} + +/* Submit all queued write operations to the io_uring ring. */ +int ioBatchSubmitWrites(ioBatchState *batch) { + if (!batch || batch->write_count == 0) return 0; + + int submitted = 0; + for (int i = 0; i < batch->write_count; i++) { + ioBatchOp *op = &batch->write_ops[i]; + struct io_uring_sqe *sqe = io_uring_get_sqe(&batch->ring); + if (!sqe) break; + + io_uring_prep_writev(sqe, op->fd, op->iov, op->iovcnt, 0); + sqe->flags |= IOSQE_ASYNC; + io_uring_sqe_set_data64(sqe, BATCH_USERDATA(i, IO_BATCH_OP_WRITE)); + submitted++; + } + + if (submitted > 0) { + int ret = io_uring_submit(&batch->ring); + if (ret < 0) { + serverLog(LL_WARNING, "io_uring batch write submit failed: %s", + strerror(-ret)); + return -1; + } + batch->inflight_writes = submitted; + } + return submitted; +} + +/* Harvest write completion CQEs. Returns number of completions processed. + * For each completed write, updates the client's sentlen and frees + * consumed reply blocks. 
*/ +int ioBatchHarvestWrites(ioBatchState *batch) { + if (!batch || batch->inflight_writes == 0) return 0; + + struct io_uring_cqe *cqe; + unsigned head; + int harvested = 0; + int seen = 0; + + io_uring_for_each_cqe(&batch->ring, head, cqe) { + uint64_t ud = io_uring_cqe_get_data64(cqe); + int type = BATCH_USERDATA_TYPE(ud); + int idx = BATCH_USERDATA_IDX(ud); + seen++; + + if (type != IO_BATCH_OP_WRITE) continue; + if (idx < 0 || idx >= batch->write_count) continue; + + ioBatchOp *op = &batch->write_ops[idx]; + client *c = op->c; + + if (cqe->res >= 0) { + ssize_t nwritten = cqe->res; + c->net_output_bytes += nwritten; + + /* Update sentlen for the fixed buffer part */ + if (c->bufpos > 0) { + ssize_t buf_remaining = c->bufpos - c->sentlen; + if (nwritten >= buf_remaining) { + nwritten -= buf_remaining; + c->bufpos = 0; + c->sentlen = 0; + } else { + c->sentlen += nwritten; + nwritten = 0; + } + } + + /* Update sentlen for reply list blocks */ + while (nwritten > 0 && listLength(c->reply)) { + clientReplyBlock *o = listNodeValue(listFirst(c->reply)); + ssize_t block_remaining = o->used - c->sentlen; + if (nwritten >= block_remaining) { + nwritten -= block_remaining; + c->reply_bytes -= o->size; + listDelNode(c->reply, listFirst(c->reply)); + c->sentlen = 0; + } else { + c->sentlen += nwritten; + nwritten = 0; + } + } + } else if (cqe->res != -EAGAIN) { + /* Write error: mark connection for closing */ + c->conn->last_errno = -cqe->res; + if (c->conn->state == CONN_STATE_CONNECTED) + c->conn->state = CONN_STATE_ERROR; + } + + /* Free the iovec for this op */ + if (op->iov) { + zfree(op->iov); + op->iov = NULL; + } + harvested++; + } + io_uring_cq_advance(&batch->ring, seen); + + batch->inflight_writes -= harvested; + if (batch->inflight_writes < 0) batch->inflight_writes = 0; + + /* Reset write queue for next batch */ + batch->write_count = 0; + return harvested; +} + +/* Queue a read operation for batch submission. 
*/ +int ioBatchAddRead(ioBatchState *batch, struct client *c, int fd, + void *buf, size_t buf_len) { + if (!batch || !batch->batch_reads_enabled) return -1; + if (batch->read_count >= batch->read_capacity) + ioBatchGrowReadOps(batch); + + ioBatchOp *op = &batch->read_ops[batch->read_count]; + op->c = c; + op->fd = fd; + op->type = IO_BATCH_OP_READ; + + /* Build a single-element iovec */ + op->iov = zmalloc(sizeof(struct iovec)); + op->iov[0].iov_base = buf; + op->iov[0].iov_len = buf_len; + op->iovcnt = 1; + op->total_len = buf_len; + batch->read_count++; + return 0; +} + +/* Submit all queued read operations. */ +int ioBatchSubmitReads(ioBatchState *batch) { + if (!batch || batch->read_count == 0) return 0; + + int submitted = 0; + for (int i = 0; i < batch->read_count; i++) { + ioBatchOp *op = &batch->read_ops[i]; + struct io_uring_sqe *sqe = io_uring_get_sqe(&batch->ring); + if (!sqe) break; + + io_uring_prep_readv(sqe, op->fd, op->iov, op->iovcnt, 0); + sqe->flags |= IOSQE_ASYNC; + io_uring_sqe_set_data64(sqe, BATCH_USERDATA(i, IO_BATCH_OP_READ)); + submitted++; + } + + if (submitted > 0) { + int ret = io_uring_submit(&batch->ring); + if (ret < 0) { + serverLog(LL_WARNING, "io_uring batch read submit failed: %s", + strerror(-ret)); + return -1; + } + batch->inflight_reads = submitted; + } + return submitted; +} + +/* Harvest read completions. For each completed read, updates the + * client's querybuf and triggers input processing. 
*/ +int ioBatchHarvestReads(ioBatchState *batch) { + if (!batch || batch->inflight_reads == 0) return 0; + + struct io_uring_cqe *cqe; + unsigned head; + int harvested = 0; + int seen = 0; + + io_uring_for_each_cqe(&batch->ring, head, cqe) { + uint64_t ud = io_uring_cqe_get_data64(cqe); + int type = BATCH_USERDATA_TYPE(ud); + int idx = BATCH_USERDATA_IDX(ud); + seen++; + + if (type != IO_BATCH_OP_READ) continue; + if (idx < 0 || idx >= batch->read_count) continue; + + ioBatchOp *op = &batch->read_ops[idx]; + client *c = op->c; + + if (cqe->res > 0) { + ssize_t nread = cqe->res; + sdsIncrLen(c->querybuf, nread); + c->net_input_bytes += nread; + c->lastinteraction = server.unixtime; + atomicIncr(server.stat_net_input_bytes, nread); + } else if (cqe->res == 0) { + /* EOF */ + c->conn->state = CONN_STATE_CLOSED; + } else if (cqe->res != -EAGAIN) { + c->conn->last_errno = -cqe->res; + if (c->conn->state == CONN_STATE_CONNECTED) + c->conn->state = CONN_STATE_ERROR; + } + + if (op->iov) { + zfree(op->iov); + op->iov = NULL; + } + harvested++; + } + io_uring_cq_advance(&batch->ring, seen); + + batch->inflight_reads -= harvested; + if (batch->inflight_reads < 0) batch->inflight_reads = 0; + + batch->read_count = 0; + return harvested; +} + +/* Build an iovec from a client's output buffers (buf + reply list) and + * queue it for batch submission. Returns 0 on success, -1 on failure. */ +int ioBatchAddClientWrite(struct client *c, ioBatchState *batch) { + if (!batch || !c) return -1; + + int max_iov = IO_BATCH_MAX_IOVEC; + struct iovec *iov = zmalloc(sizeof(struct iovec) * max_iov); + int iovcnt = 0; + size_t total = 0; + + /* Add the fixed buffer */ + if (c->bufpos > 0 && !c->buf_encoded) { + iov[iovcnt].iov_base = c->buf + c->sentlen; + iov[iovcnt].iov_len = c->bufpos - c->sentlen; + total += iov[iovcnt].iov_len; + iovcnt++; + } + + /* Add reply list blocks */ + if (listLength(c->reply) > 0) { + size_t offset = c->bufpos > 0 ? 
0 : c->sentlen; + listIter iter; + listNode *next; + listRewind(c->reply, &iter); + while ((next = listNext(&iter)) && iovcnt < max_iov) { + clientReplyBlock *o = listNodeValue(next); + if (o->used == 0) continue; + if (o->buf_encoded) { + /* Encoded blocks need the complex writev path; bail out + * and let the caller use the synchronous code path. */ + zfree(iov); + return -1; + } + iov[iovcnt].iov_base = o->buf + offset; + iov[iovcnt].iov_len = o->used - offset; + total += iov[iovcnt].iov_len; + iovcnt++; + offset = 0; + } + } + + if (iovcnt == 0) { + zfree(iov); + return 0; + } + + return ioBatchAddWrite(batch, c, c->conn->fd, iov, iovcnt, total); +} + +/* Prepare a client's querybuf for a batch read and queue the operation. */ +int ioBatchAddClientRead(struct client *c, ioBatchState *batch) { + if (!batch || !c || !c->conn || c->conn->fd < 0) return -1; + + size_t readlen = PROTO_IOBUF_LEN; + if (c->querybuf == NULL) { + c->querybuf = sdsempty(); + } + + size_t qblen = sdslen(c->querybuf); + if (sdsavail(c->querybuf) < readlen) { + c->querybuf = sdsMakeRoomFor(c->querybuf, readlen - sdsavail(c->querybuf)); + } + + void *buf = c->querybuf + qblen; + size_t buf_len = sdsavail(c->querybuf); + + return ioBatchAddRead(batch, c, c->conn->fd, buf, buf_len); +} + +int ioBatchPendingWrites(ioBatchState *batch) { + return batch ? batch->write_count : 0; +} + +int ioBatchPendingReads(ioBatchState *batch) { + return batch ? batch->read_count : 0; +} + +int ioBatchInflight(ioBatchState *batch) { + return batch ? (batch->inflight_writes + batch->inflight_reads) : 0; +} + +/* ============ Server-level io_uring lifecycle ============ */ + +void initIOUring(void) { + if (!server.io_uring_enabled) { + serverLog(LL_VERBOSE, "io_uring: disabled by configuration."); + return; + } + + server.io_uring_batch = ioBatchCreate(server.io_uring_sq_size); + if (!server.io_uring_batch) { + serverLog(LL_WARNING, + "io_uring: failed to initialize batch I/O ring (sq_size=%d). 
" + "Falling back to synchronous I/O.", + server.io_uring_sq_size); + server.io_uring_enabled = 0; + return; + } + + server.io_uring_batch->batch_writes_enabled = server.io_uring_batch_writes; + server.io_uring_batch->batch_reads_enabled = server.io_uring_batch_reads; + + serverLog(LL_NOTICE, + "io_uring: initialized (sq_size=%d, batch_writes=%s, batch_reads=%s)", + server.io_uring_sq_size, + server.io_uring_batch_writes ? "yes" : "no", + server.io_uring_batch_reads ? "yes" : "no"); +} + +void freeIOUring(void) { + if (server.io_uring_batch) { + ioBatchFree(server.io_uring_batch); + server.io_uring_batch = NULL; + } +} + +void harvestIOUringCompletions(void) { + if (!server.io_uring_batch) return; + if (ioBatchInflight(server.io_uring_batch) > 0) { + ioBatchHarvestWrites(server.io_uring_batch); + ioBatchHarvestReads(server.io_uring_batch); + } +} + +/* Batch read handler: given an array of fired fds from aeApiPoll, + * submit io_uring read operations for all eligible readable clients. + * Returns the number of fds handled via batch (these should be skipped + * in the normal dispatch loop). 
*/ +int handleBatchReadsIOUring(int *fds, int nfds) { + ioBatchState *batch = server.io_uring_batch; + if (!batch || !batch->batch_reads_enabled || nfds == 0) return 0; + + int batched = 0; + for (int i = 0; i < nfds; i++) { + int fd = fds[i]; + if (fd < 0) continue; + + /* Get the connection from the event loop's client data */ + void *cd = aeGetFileClientData(server.el, fd); + if (!cd) continue; + + connection *conn = (connection *)cd; + if (!conn->read_handler || conn->state != CONN_STATE_CONNECTED) + continue; + + client *c = connGetPrivateData(conn); + if (!c) continue; + + /* Skip clients that shouldn't be batch-read */ + if (!(c->io_flags & CLIENT_IO_READ_ENABLED)) continue; + if (c->flags & (CLIENT_BLOCKED | CLIENT_CLOSE_ASAP)) continue; + if (c->flags & CLIENT_SLAVE) continue; + + if (ioBatchAddClientRead(c, batch) == 0) { + fds[i] = -1; /* Mark as handled */ + batched++; + } + } + + if (batched > 0) { + ioBatchSubmitReads(batch); + } + return batched; +} + +#else /* !HAVE_IO_URING */ + +/* Provide empty stubs when io_uring is not available. + * The config system and server code guard calls with #ifdef HAVE_IO_URING, + * but the .o file must still compile cleanly. */ +typedef int io_uring_batch_not_available; + +#endif /* HAVE_IO_URING */ diff --git a/src/io_uring_batch.h b/src/io_uring_batch.h new file mode 100644 index 00000000000..7ebb3927e9e --- /dev/null +++ b/src/io_uring_batch.h @@ -0,0 +1,77 @@ +/* io_uring batch I/O engine for Redis + * + * Copyright (c) 2024-Present, Redis Ltd. + * All rights reserved. + * + * Licensed under your choice of (a) the Redis Source Available License 2.0 + * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the + * GNU Affero General Public License v3 (AGPLv3). 
+ */ + +#ifndef __IO_URING_BATCH_H +#define __IO_URING_BATCH_H + +#ifdef HAVE_IO_URING + +#include +#include + +#define IO_BATCH_DEFAULT_SQ_SIZE 1024 +#define IO_BATCH_MAX_IOVEC 128 +#define IO_BATCH_OP_WRITE 1 +#define IO_BATCH_OP_READ 2 + +struct client; + +typedef struct ioBatchOp { + struct client *c; + int fd; + int type; + struct iovec *iov; + int iovcnt; + size_t total_len; +} ioBatchOp; + +typedef struct ioBatchState { + struct io_uring ring; + int ring_initialized; + ioBatchOp *write_ops; + int write_count; + int write_capacity; + ioBatchOp *read_ops; + int read_count; + int read_capacity; + int inflight_writes; + int inflight_reads; + int sq_size; + int batch_writes_enabled; + int batch_reads_enabled; +} ioBatchState; + +/* Lifecycle */ +ioBatchState *ioBatchCreate(int sq_size); +void ioBatchFree(ioBatchState *batch); + +/* Write batching */ +int ioBatchAddWrite(ioBatchState *batch, struct client *c, int fd, + struct iovec *iov, int iovcnt, size_t total_len); +int ioBatchSubmitWrites(ioBatchState *batch); +int ioBatchHarvestWrites(ioBatchState *batch); + +/* Read batching */ +int ioBatchAddRead(ioBatchState *batch, struct client *c, int fd, + void *buf, size_t buf_len); +int ioBatchSubmitReads(ioBatchState *batch); +int ioBatchHarvestReads(ioBatchState *batch); + +/* High-level helpers that build iovecs from client buffers */ +int ioBatchAddClientWrite(struct client *c, ioBatchState *batch); +int ioBatchAddClientRead(struct client *c, ioBatchState *batch); + +/* Query state */ +int ioBatchPendingWrites(ioBatchState *batch); +int ioBatchPendingReads(ioBatchState *batch); +int ioBatchInflight(ioBatchState *batch); + +#endif /* HAVE_IO_URING */ +#endif /* __IO_URING_BATCH_H */ diff --git a/src/iothread.c b/src/iothread.c index 981edb9515e..8eb00b74c87 100644 --- a/src/iothread.c +++ b/src/iothread.c @@ -796,6 +796,14 @@ void IOThreadBeforeSleep(struct aeEventLoop *el) { * notification (write fd and wake up) is costly. 
*/ dont_sleep = 1; } + +#ifdef HAVE_IO_URING + /* Submit any pending batch writes for this IO thread's clients */ + if (t->io_uring_batch && ioBatchPendingWrites(t->io_uring_batch) > 0) { + ioBatchSubmitWrites(t->io_uring_batch); + } +#endif + if (!dont_sleep) { atomicSetWithSync(t->running, 0); /* Not running if going to sleep. */ /* Try to process clients from main thread again, since before we set @@ -815,6 +823,14 @@ void IOThreadBeforeSleep(struct aeEventLoop *el) { void IOThreadAfterSleep(struct aeEventLoop *el) { IOThread *t = el->privdata[0]; +#ifdef HAVE_IO_URING + /* Harvest io_uring batch completions for this IO thread */ + if (t->io_uring_batch && ioBatchInflight(t->io_uring_batch) > 0) { + ioBatchHarvestWrites(t->io_uring_batch); + ioBatchHarvestReads(t->io_uring_batch); + } +#endif + /* Set the IO thread to running state, so the main thread can deliver * clients to it without extra notifications. */ atomicSetWithSync(t->running, 1); @@ -903,6 +919,18 @@ void initThreadedIO(void) { #endif pthread_mutex_init(&t->pending_clients_mutex, attr); +#ifdef HAVE_IO_URING + if (server.io_uring_enabled) { + t->io_uring_batch = ioBatchCreate(server.io_uring_sq_size); + if (t->io_uring_batch) { + t->io_uring_batch->batch_writes_enabled = server.io_uring_batch_writes; + t->io_uring_batch->batch_reads_enabled = server.io_uring_batch_reads; + } + } else { + t->io_uring_batch = NULL; + } +#endif + t->pending_clients_notifier = createEventNotifier(); if (aeCreateFileEvent(t->el, getReadEventFd(t->pending_clients_notifier), AE_READABLE, handleClientsFromMainThread, t) != AE_OK) @@ -946,6 +974,12 @@ void killIOThreads(void) { int err, j; for (j = 1; j < server.io_threads_num; j++) { +#ifdef HAVE_IO_URING + if (IOThreads[j].io_uring_batch) { + ioBatchFree(IOThreads[j].io_uring_batch); + IOThreads[j].io_uring_batch = NULL; + } +#endif if (IOThreads[j].tid == pthread_self()) continue; if (IOThreads[j].tid && pthread_cancel(IOThreads[j].tid) == 0) { if ((err = 
pthread_join(IOThreads[j].tid,NULL)) != 0) { diff --git a/src/networking.c b/src/networking.c index beed81e7437..ecb59da901a 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2789,6 +2789,11 @@ int handleClientsWithPendingWrites(void) { listNode *ln; int processed = listLength(server.clients_pending_write); +#ifdef HAVE_IO_URING + int use_batch = server.io_uring_enabled && server.io_uring_batch_writes && + server.io_uring_batch != NULL; /* evaluated once per call, before the client loop */ +#endif + listRewind(server.clients_pending_write,&li); while((ln = listNext(&li))) { client *c = listNodeValue(ln); @@ -2817,6 +2822,19 @@ int handleClientsWithPendingWrites(void) { continue; } +#ifdef HAVE_IO_URING + /* When io_uring batch writes are enabled, queue writes for batch + * submission instead of writing synchronously. We only batch + * non-slave, non-encoded simple buffer + reply list writes. */ + if (use_batch && !(c->flags & CLIENT_SLAVE) && !c->buf_encoded && + c->conn && c->conn->fd >= 0) /* NOTE(review): this tests the raw fd only -- confirm TLS connections are excluded before their data is queued for a direct fd write */ + { + if (ioBatchAddClientWrite(c, server.io_uring_batch) == 0) + continue; /* queued: the synchronous writeToClient() below is skipped for this client */ + /* Fall through to synchronous write if batching fails */ + } +#endif + /* Try to write buffers to the client socket. 
*/ if (writeToClient(c,0) == C_ERR) continue; @@ -2826,6 +2844,14 @@ int handleClientsWithPendingWrites(void) { installClientWriteHandler(c); } } + +#ifdef HAVE_IO_URING + /* Submit all queued batch writes in a single syscall. NOTE(review): the return value is ignored -- confirm submit failures are handled inside ioBatchSubmitWrites(). */ + if (use_batch && ioBatchPendingWrites(server.io_uring_batch) > 0) { + ioBatchSubmitWrites(server.io_uring_batch); + } +#endif + return processed; } diff --git a/src/server.c b/src/server.c index a0245d7108d..5ee3b09429f 100644 --- a/src/server.c +++ b/src/server.c @@ -2099,6 +2099,11 @@ void afterSleep(struct aeEventLoop *eventLoop) { server.el_cmd_cnt_start = server.stat_numcommands; } +#ifdef HAVE_IO_URING + /* Harvest io_uring batch I/O completions after waking up from poll. Called unconditionally here; presumably the helper copes with server.io_uring_batch == NULL -- TODO confirm */ + harvestIOUringCompletions(); +#endif + /* Set running after waking up */ if (server.io_threads_num > 1) atomicSetWithSync(server.running, 1); @@ -2938,6 +2943,9 @@ void initServer(void) { sizeof(server.client_pause_per_purpose)); server.postponed_clients = listCreate(); server.events_processed_while_blocked = 0; +#ifdef HAVE_IO_URING + server.io_uring_batch = NULL; /* stays NULL until initIOUring() runs from InitServerLast() -- presumably that is where it is created */ +#endif server.system_memory_size = zmalloc_get_memory_size(); server.blocked_last_cron = 0; server.blocking_op_nesting = 0; @@ -3209,6 +3217,9 @@ void initListeners(void) { void InitServerLast(void) { bioInit(); initThreadedIO(); +#ifdef HAVE_IO_URING + initIOUring(); /* runs after initThreadedIO(), which sets up the per-IO-thread batches */ +#endif set_jemalloc_bg_thread(server.jemalloc_bg_thread); server.initial_memory_usage = zmalloc_used_memory(); } @@ -6308,6 +6319,16 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) { "config_file:%s\r\n", server.configfile ? 
server.configfile : "", "io_threads_active:%i\r\n", server.io_threads_active)); +#ifdef HAVE_IO_URING + info = sdscatfmt(info, /* values are the server.io_uring_* fields themselves, not the state of server.io_uring_batch */ + "io_uring_enabled:%i\r\n" + "io_uring_batch_writes:%i\r\n" + "io_uring_batch_reads:%i\r\n", + server.io_uring_enabled, + server.io_uring_batch_writes, + server.io_uring_batch_reads); +#endif + /* Conditional properties */ if (isShutdownInitiated()) { info = sdscatfmt(info, @@ -8134,6 +8155,9 @@ int main(int argc, char **argv) { setOOMScoreAdj(-1); aeMain(server.el); +#ifdef HAVE_IO_URING + freeIOUring(); /* reached only once aeMain() returns; runs before the event loop is deleted */ +#endif aeDeleteEventLoop(server.el); return 0; } diff --git a/src/server.h b/src/server.h index bd154507263..967e9142937 100644 --- a/src/server.h +++ b/src/server.h @@ -22,6 +22,7 @@ #include "atomicvar.h" #include "commands.h" #include "object.h" +#include "io_uring_batch.h" #include #include @@ -1626,6 +1627,9 @@ typedef struct __attribute__((aligned(CACHE_LINE_SIZE))) { pthread_mutex_t pending_clients_mutex; /* Mutex for pending write list */ list *pending_clients_to_main_thread; /* Clients that are waiting to be executed by the main thread. */ list *clients; /* IO thread managed clients. 
*/ +#ifdef HAVE_IO_URING + ioBatchState *io_uring_batch; /* Per-thread batch I/O state; initThreadedIO() leaves it NULL when io_uring is disabled or ioBatchCreate() fails */ +#endif } IOThread; /* Context for streaming replDataBuf to database */ @@ -2024,6 +2028,14 @@ struct redisServer { pendingCommandPool cmd_pool; /* Shared pool for reusing pendingCommand, * only when IO threads disabled */ int prefetch_batch_max_size;/* Maximum number of keys to prefetch in a single batch */ +#ifdef HAVE_IO_URING + /* io_uring batch I/O state for the main thread (set to NULL by initServer()) */ + ioBatchState *io_uring_batch; + int io_uring_enabled; /* Master switch for io_uring */ + int io_uring_batch_writes; /* Enable batch writes via io_uring */ + int io_uring_batch_reads; /* Enable batch reads via io_uring */ + int io_uring_sq_size; /* Submission queue size; passed to ioBatchCreate() for each IO thread */ +#endif long long events_processed_while_blocked; /* processEventsWhileBlocked() */ int enable_protected_configs; /* Enable the modification of protected configs, see PROTECTED_ACTION_ALLOWED_* */ int enable_debug_cmd; /* Enable DEBUG commands, see PROTECTED_ACTION_ALLOWED_* */ @@ -3248,6 +3260,15 @@ void keepClientInMainThread(client *c); void fetchClientFromIOThread(client *c); int isClientMustHandledByMainThread(client *c); +/* io_uring_batch.c - io_uring batch I/O (main-thread entry points; the ioBatch* primitives are declared in io_uring_batch.h) */ +#ifdef HAVE_IO_URING +void initIOUring(void); +void freeIOUring(void); +int handleClientsWithPendingWritesIOUring(void); +int handleBatchReadsIOUring(int *fds, int nfds); +void harvestIOUringCompletions(void); +#endif + /* logreqres.c - logging of requests and responses */ void reqresReset(client *c, int free_buf); void reqresSaveClientReplyOffset(client *c); diff --git a/tests/integration/io-uring.tcl b/tests/integration/io-uring.tcl new file mode 100644 index 00000000000..35e305cf3a2 --- /dev/null +++ b/tests/integration/io-uring.tcl @@ -0,0 +1,94 @@ +# Test io_uring integration when compiled with USE_IO_URING=yes + +proc io_uring_enabled {} { + set info [r info server] + if {[string match "*io_uring_enabled:1*" $info]} { + return 1 + } + return 0 +} + +# 
Basic io_uring test: verify server starts and serves requests
+tags {"io_uring"} {
+    start_server {} {
+        # `r` needs a running server, so probe io_uring once here and reuse the result.
+        set io_uring_on [io_uring_enabled]
+
+        test "io_uring - server info reports io_uring status" {
+            set info [r info server]
+            # If compiled with io_uring, the fields should be present
+            if {[string match "*io_uring_enabled*" $info]} {
+                assert_match "*io_uring_enabled:*" $info
+                assert_match "*io_uring_batch_writes:*" $info
+                assert_match "*io_uring_batch_reads:*" $info
+            }
+        }
+
+        test "io_uring - basic SET/GET works" {
+            r set mykey myvalue
+            assert_equal "myvalue" [r get mykey]
+        }
+
+        test "io_uring - pipeline works" {
+            # Pipeline via a deferring client: send every SET, then drain the replies.
+            set rd [redis_deferring_client]
+            for {set i 0} {$i < 100} {incr i} {
+                $rd set "key:$i" "value:$i"
+            }
+            for {set i 0} {$i < 100} {incr i} {
+                assert_equal OK [$rd read]
+            }
+            $rd close
+            for {set i 0} {$i < 100} {incr i} {
+                assert_equal "value:$i" [r get "key:$i"]
+            }
+        }
+
+        test "io_uring - multiple clients" {
+            set clients {}
+            for {set i 0} {$i < 10} {incr i} {
+                lappend clients [redis_deferring_client]
+            }
+            foreach rd $clients {
+                $rd set testkey testval
+            }
+            foreach rd $clients {
+                assert_equal OK [$rd read]
+            }
+            assert_equal "testval" [r get testkey]
+            foreach rd $clients {
+                $rd close
+            }
+        }
+
+        test "io_uring - event loop API name" {
+            set info [r info server]
+            # "io_uring*" when active; plain epoll is accepted as the older-kernel fallback.
+            if {$io_uring_on} {
+                set ok [expr {[string match "*multiplexing_api:io_uring*" $info] ||
+                              [string match "*multiplexing_api:epoll*" $info]}]
+                assert {$ok}
+            }
+        }
+    }
+
+    # Test with io_uring batch writes (only when the probe above reported io_uring on)
+    if {$io_uring_on} {
+        start_server {overrides {io-uring-batch-writes yes}} {
+            test "io_uring batch writes - bulk write operations" {
+                set payload [string repeat "x" 100]
+                set rd [redis_deferring_client]
+                for {set i 0} {$i < 1000} {incr i} {
+                    $rd set "bw:$i" $payload
+                }
+                for {set i 0} {$i < 1000} {incr i} {
+                    assert_equal OK [$rd read]
+                }
+                $rd close
+                for {set i 0} {$i < 1000} {incr i} {
+                    assert_equal $payload [r get "bw:$i"]
+                }
+            }
+        }
+    }
+}