diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 92eb4e29621..74a71520f6a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -15,7 +15,9 @@ jobs: - name: make # Fail build if there are warnings # build with TLS just for compilation coverage - run: make REDIS_CFLAGS='-Werror' BUILD_TLS=yes + run: | + sudo apt-get install -y libzstd-dev + make REDIS_CFLAGS='-Werror' BUILD_TLS=yes - name: test run: | sudo apt-get install tcl8.6 tclx @@ -33,7 +35,9 @@ jobs: - uses: actions/checkout@v6 - name: make # build with TLS module just for compilation coverage - run: make SANITIZER=address REDIS_CFLAGS='-Werror -DDEBUG_ASSERTIONS -DREDIS_TEST' BUILD_TLS=module + run: | + sudo apt-get install -y libzstd-dev + make SANITIZER=address REDIS_CFLAGS='-Werror -DDEBUG_ASSERTIONS -DREDIS_TEST' BUILD_TLS=module - name: testprep run: sudo apt-get install tcl8.6 tclx -y - name: test @@ -48,7 +52,8 @@ jobs: run: | sed -i 's|http://deb.debian.org/debian|http://archive.debian.org/debian|g' /etc/apt/sources.list sed -i 's|http://security.debian.org|http://archive.debian.org/debian-security|g' /etc/apt/sources.list - apt-get update && apt-get install -y build-essential + echo "deb http://archive.debian.org/debian buster-backports main" >> /etc/apt/sources.list + apt-get update && apt-get install -y build-essential libzstd-dev/buster-backports make REDIS_CFLAGS='-Werror' build-macos-latest: @@ -58,7 +63,9 @@ jobs: - name: make # Fail build if there are warnings # build with TLS just for compilation coverage - run: make REDIS_CFLAGS='-Werror' BUILD_TLS=yes + run: | + brew install zstd + make REDIS_CFLAGS='-Werror' BUILD_TLS=yes build-32bit: runs-on: ubuntu-latest @@ -66,7 +73,8 @@ jobs: - uses: actions/checkout@v6 - name: make run: | - sudo apt-get update && sudo apt-get install libc6-dev-i386 gcc-multilib + sudo dpkg --add-architecture i386 + sudo apt-get update && sudo apt-get install libc6-dev-i386 gcc-multilib libzstd-dev:i386 make 
REDIS_CFLAGS='-Werror' 32bit build-libc-malloc: @@ -74,7 +82,9 @@ jobs: steps: - uses: actions/checkout@v6 - name: make - run: make REDIS_CFLAGS='-Werror' MALLOC=libc + run: | + sudo apt-get install -y libzstd-dev + make REDIS_CFLAGS='-Werror' MALLOC=libc build-centos-jemalloc: runs-on: ubuntu-latest @@ -83,7 +93,7 @@ jobs: - uses: actions/checkout@v6 - name: make run: | - dnf -y install which gcc make + dnf -y install which gcc make libzstd-devel make REDIS_CFLAGS='-Werror' build-old-chain-jemalloc: @@ -100,6 +110,6 @@ jobs: apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 40976EAF437D05B5 apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 3B4FE6ACC0B21F32 apt-get update - apt-get install -y make gcc-4.8 + apt-get install -y make gcc-4.8 libzstd-dev update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-4.8 100 make CC=gcc REDIS_CFLAGS='-Werror' diff --git a/.github/workflows/daily.yml b/.github/workflows/daily.yml index fd067686c1b..250da230478 100644 --- a/.github/workflows/daily.yml +++ b/.github/workflows/daily.yml @@ -11,7 +11,7 @@ on: inputs: skipjobs: description: 'jobs to skip (delete the ones you wanna keep, do not leave empty)' - default: 'valgrind,sanitizer,tls,freebsd,macos,alpine,32bit,iothreads,ubuntu,centos,malloc,specific,fortify,reply-schema,oldTC,defrag,vectorset,assert-keyspace,arm' + default: 'valgrind,sanitizer,tls,freebsd,macos,alpine,32bit,iothreads,ubuntu,centos,malloc,specific,fortify,reply-schema,oldTC,defrag,vectorset,assert-keyspace,arm,compression' skiptests: description: 'tests to skip (delete the ones you wanna keep, do not leave empty)' default: 'redis,modules,sentinel,cluster,unittest' @@ -52,7 +52,9 @@ jobs: repository: ${{ env.GITHUB_REPOSITORY }} ref: ${{ env.GITHUB_HEAD_REF }} - name: make - run: make REDIS_CFLAGS='-Werror -DREDIS_TEST -DDEBUG_ASSERTIONS' + run: | + sudo apt-get install -y libzstd-dev + make REDIS_CFLAGS='-Werror -DREDIS_TEST -DDEBUG_ASSERTIONS' - name: testprep run: sudo apt-get install 
tcl8.6 tclx - name: test @@ -240,7 +242,7 @@ jobs: ref: ${{ env.GITHUB_HEAD_REF }} - name: make run: | - apt-get update && apt-get install -y make gcc + apt-get update && apt-get install -y make gcc libzstd-dev # Also enables jemalloc's sized deallocation checks to catch sdallocx()/zfree_with_size() misuse. make CC=gcc REDIS_CFLAGS='-Werror -DREDIS_TEST -U_FORTIFY_SOURCE -D_FORTIFY_SOURCE=3' JEMALLOC_CONFIGURE_OPTS='--enable-opt-size-checks' - name: testprep @@ -279,7 +281,9 @@ jobs: repository: ${{ env.GITHUB_REPOSITORY }} ref: ${{ env.GITHUB_HEAD_REF }} - name: make - run: make MALLOC=libc REDIS_CFLAGS='-Werror' + run: | + sudo apt-get install -y libzstd-dev + make MALLOC=libc REDIS_CFLAGS='-Werror' - name: testprep run: sudo apt-get install tcl8.6 tclx - name: test @@ -313,7 +317,9 @@ jobs: repository: ${{ env.GITHUB_REPOSITORY }} ref: ${{ env.GITHUB_HEAD_REF }} - name: make - run: make MALLOC=libc CFLAGS=-DNO_MALLOC_USABLE_SIZE REDIS_CFLAGS='-Werror' + run: | + sudo apt-get install -y libzstd-dev + make MALLOC=libc CFLAGS=-DNO_MALLOC_USABLE_SIZE REDIS_CFLAGS='-Werror' - name: testprep run: sudo apt-get install tcl8.6 tclx - name: test @@ -348,7 +354,8 @@ jobs: ref: ${{ env.GITHUB_HEAD_REF }} - name: make run: | - sudo apt-get update && sudo apt-get install libc6-dev-i386 gcc-multilib + sudo dpkg --add-architecture i386 + sudo apt-get update && sudo apt-get install libc6-dev-i386 gcc-multilib libzstd-dev:i386 make 32bit REDIS_CFLAGS='-Werror -DREDIS_TEST' make -C tests/modules 32bit # the script below doesn't have an argument, we must build manually ahead of time - name: testprep @@ -388,6 +395,7 @@ jobs: ref: ${{ env.GITHUB_HEAD_REF }} - name: make run: | + sudo apt-get install -y libzstd-dev make BUILD_TLS=yes REDIS_CFLAGS='-Werror' - name: testprep run: | @@ -428,6 +436,7 @@ jobs: ref: ${{ env.GITHUB_HEAD_REF }} - name: make run: | + sudo apt-get install -y libzstd-dev make BUILD_TLS=yes REDIS_CFLAGS='-Werror' - name: testprep run: | @@ -468,6 +477,7 @@ jobs: 
ref: ${{ env.GITHUB_HEAD_REF }} - name: make run: | + sudo apt-get install -y libzstd-dev make REDIS_CFLAGS='-Werror' - name: testprep run: sudo apt-get install tcl8.6 tclx @@ -478,6 +488,43 @@ jobs: if: true && !contains(github.event.inputs.skiptests, 'cluster') run: ./runtest-cluster --config io-threads 4 ${{github.event.inputs.cluster_test_args}} + test-ubuntu-client-compression: + runs-on: ubuntu-latest + if: | + (github.event_name == 'workflow_dispatch' || (github.event_name != 'workflow_dispatch' && github.repository == 'redis/redis')) && + !contains(github.event.inputs.skipjobs, 'compression') + timeout-minutes: 14400 + steps: + - name: prep + if: github.event_name == 'workflow_dispatch' + run: | + echo "GITHUB_REPOSITORY=${{github.event.inputs.use_repo}}" >> $GITHUB_ENV + echo "GITHUB_HEAD_REF=${{github.event.inputs.use_git_ref}}" >> $GITHUB_ENV + echo "skipjobs: ${{github.event.inputs.skipjobs}}" + echo "skiptests: ${{github.event.inputs.skiptests}}" + echo "test_args: ${{github.event.inputs.test_args}}" + echo "cluster_test_args: ${{github.event.inputs.cluster_test_args}}" + - uses: actions/checkout@v6 + with: + repository: ${{ env.GITHUB_REPOSITORY }} + ref: ${{ env.GITHUB_HEAD_REF }} + - name: make + run: | + sudo apt-get install -y libzstd-dev + make REDIS_CFLAGS='-Werror' + - name: testprep + run: sudo apt-get install tcl8.6 tclx + - name: test + if: true && !contains(github.event.inputs.skiptests, 'redis') + # Running with less than default(16) clients as compression slows down + # replication and some tests start to fail because of timing errors. 
+ # This is similar to why we run tsan with only 1 client which is even + # more extreme case + run: ./runtest --clients 4 --config io-threads 4 --config repl-compression 1 --compression --accurate --verbose --tags "repl network iothreads psync2" --dump-logs ${{github.event.inputs.test_args}} + - name: cluster tests + if: true && !contains(github.event.inputs.skiptests, 'cluster') + run: ./runtest-cluster --config io-threads 4 --config repl-compression 1 --compression ${{github.event.inputs.cluster_test_args}} + test-ubuntu-reclaim-cache: runs-on: ubuntu-latest if: | @@ -500,6 +547,7 @@ jobs: ref: ${{ env.GITHUB_HEAD_REF }} - name: make run: | + sudo apt-get install -y libzstd-dev make REDIS_CFLAGS='-Werror' - name: testprep run: | @@ -577,7 +625,9 @@ jobs: repository: ${{ env.GITHUB_REPOSITORY }} ref: ${{ env.GITHUB_HEAD_REF }} - name: make - run: make valgrind REDIS_CFLAGS='-Werror -DREDIS_TEST' + run: | + sudo apt-get update && sudo apt-get install -y libzstd-dev + make valgrind REDIS_CFLAGS='-Werror -DREDIS_TEST' - name: testprep run: | sudo apt-get update @@ -610,7 +660,9 @@ jobs: repository: ${{ env.GITHUB_REPOSITORY }} ref: ${{ env.GITHUB_HEAD_REF }} - name: make - run: make valgrind REDIS_CFLAGS='-Werror -DREDIS_TEST' + run: | + sudo apt-get update && sudo apt-get install -y libzstd-dev + make valgrind REDIS_CFLAGS='-Werror -DREDIS_TEST' - name: testprep run: | sudo apt-get update @@ -642,7 +694,9 @@ jobs: repository: ${{ env.GITHUB_REPOSITORY }} ref: ${{ env.GITHUB_HEAD_REF }} - name: make - run: make valgrind CFLAGS="-DNO_MALLOC_USABLE_SIZE -DREDIS_TEST" REDIS_CFLAGS='-Werror' + run: | + sudo apt-get update && sudo apt-get install -y libzstd-dev + make valgrind CFLAGS="-DNO_MALLOC_USABLE_SIZE -DREDIS_TEST" REDIS_CFLAGS='-Werror' - name: testprep run: | sudo apt-get update @@ -672,7 +726,9 @@ jobs: repository: ${{ env.GITHUB_REPOSITORY }} ref: ${{ env.GITHUB_HEAD_REF }} - name: make - run: make valgrind CFLAGS="-DNO_MALLOC_USABLE_SIZE -DREDIS_TEST" 
REDIS_CFLAGS='-Werror' + run: | + sudo apt-get update && sudo apt-get install -y libzstd-dev + make valgrind CFLAGS="-DNO_MALLOC_USABLE_SIZE -DREDIS_TEST" REDIS_CFLAGS='-Werror' - name: testprep run: | sudo apt-get update @@ -709,7 +765,9 @@ jobs: repository: ${{ env.GITHUB_REPOSITORY }} ref: ${{ env.GITHUB_HEAD_REF }} - name: make - run: make SANITIZER=address REDIS_CFLAGS='-DREDIS_TEST -Werror -DDEBUG_ASSERTIONS' + run: | + sudo apt-get install -y libzstd-dev + make SANITIZER=address REDIS_CFLAGS='-DREDIS_TEST -Werror -DDEBUG_ASSERTIONS' - name: testprep run: | sudo apt-get update @@ -750,7 +808,9 @@ jobs: repository: ${{ env.GITHUB_REPOSITORY }} ref: ${{ env.GITHUB_HEAD_REF }} - name: make - run: make SANITIZER=memory REDIS_CFLAGS='-DREDIS_TEST -Werror -DDEBUG_ASSERTIONS' + run: | + sudo apt-get install -y libzstd-dev + make SANITIZER=memory REDIS_CFLAGS='-DREDIS_TEST -Werror -DDEBUG_ASSERTIONS' - name: testprep run: | sudo apt-get update @@ -794,7 +854,9 @@ jobs: repository: ${{ env.GITHUB_REPOSITORY }} ref: ${{ env.GITHUB_HEAD_REF }} - name: make - run: make SANITIZER=undefined REDIS_CFLAGS='-DREDIS_TEST -Werror' SKIP_VEC_SETS=yes LUA_DEBUG=yes # we (ab)use this flow to also check Lua C API violations + run: | + sudo apt-get install -y libzstd-dev + make SANITIZER=undefined REDIS_CFLAGS='-DREDIS_TEST -Werror' SKIP_VEC_SETS=yes LUA_DEBUG=yes # we (ab)use this flow to also check Lua C API violations - name: testprep run: | sudo apt-get update @@ -841,7 +903,9 @@ jobs: - name: make # TODO Investigate why jemalloc with clang TSan crash on start; # with gcc TSan, jemalloc works modulo sentinel tests hanging. 
- run: make SANITIZER=thread USE_JEMALLOC=no REDIS_CFLAGS='-DREDIS_TEST -Werror -DDEBUG_ASSERTIONS' + run: | + sudo apt-get install -y libzstd-dev + make SANITIZER=thread USE_JEMALLOC=no REDIS_CFLAGS='-DREDIS_TEST -Werror -DDEBUG_ASSERTIONS' - name: testprep run: | sudo apt-get update @@ -879,7 +943,7 @@ jobs: ref: ${{ env.GITHUB_HEAD_REF }} - name: make run: | - dnf -y install which gcc make + dnf -y install which gcc make libzstd-devel make REDIS_CFLAGS='-Werror' - name: testprep run: | @@ -918,7 +982,7 @@ jobs: ref: ${{ env.GITHUB_HEAD_REF }} - name: make run: | - dnf -y install which gcc make openssl-devel openssl + dnf -y install which gcc make openssl-devel openssl libzstd-devel make BUILD_TLS=module REDIS_CFLAGS='-Werror' - name: testprep run: | @@ -961,7 +1025,7 @@ jobs: ref: ${{ env.GITHUB_HEAD_REF }} - name: make run: | - dnf -y install which gcc make openssl-devel openssl + dnf -y install which gcc make openssl-devel openssl libzstd-devel make BUILD_TLS=module REDIS_CFLAGS='-Werror' - name: testprep run: | @@ -1002,7 +1066,9 @@ jobs: repository: ${{ env.GITHUB_REPOSITORY }} ref: ${{ env.GITHUB_HEAD_REF }} - name: make - run: make REDIS_CFLAGS='-Werror' + run: | + brew install zstd + make REDIS_CFLAGS='-Werror' - name: test if: true && !contains(github.event.inputs.skiptests, 'redis') run: ./runtest --accurate --verbose --clients 1 --no-latency --dump-logs ${{github.event.inputs.test_args}} @@ -1028,7 +1094,9 @@ jobs: repository: ${{ env.GITHUB_REPOSITORY }} ref: ${{ env.GITHUB_HEAD_REF }} - name: make - run: make REDIS_CFLAGS='-Werror' + run: | + brew install zstd + make REDIS_CFLAGS='-Werror' - name: sentinel tests if: true && !contains(github.event.inputs.skiptests, 'sentinel') run: ./runtest-sentinel ${{github.event.inputs.cluster_test_args}} @@ -1054,7 +1122,9 @@ jobs: repository: ${{ env.GITHUB_REPOSITORY }} ref: ${{ env.GITHUB_HEAD_REF }} - name: make - run: make REDIS_CFLAGS='-Werror' + run: | + brew install zstd + make REDIS_CFLAGS='-Werror' - 
name: cluster tests if: true && !contains(github.event.inputs.skiptests, 'cluster') run: ./runtest-cluster ${{github.event.inputs.cluster_test_args}} @@ -1086,7 +1156,9 @@ jobs: repository: ${{ env.GITHUB_REPOSITORY }} ref: ${{ env.GITHUB_HEAD_REF }} - name: make - run: make REDIS_CFLAGS='-Werror -DREDIS_TEST' + run: | + brew install zstd + make REDIS_CFLAGS='-Werror -DREDIS_TEST' test-freebsd: runs-on: ubuntu-latest @@ -1112,7 +1184,7 @@ jobs: version: 13.2 shell: bash run: | - sudo pkg install -y bash gmake lang/tcl86 lang/tclX gcc + sudo pkg install -y bash gmake lang/tcl86 lang/tclX gcc zstd devel/pkgconf gmake ./runtest --single unit/keyspace --single unit/auth --single unit/networking --single unit/protocol @@ -1138,7 +1210,7 @@ jobs: ref: ${{ env.GITHUB_HEAD_REF }} - name: make run: | - apk add build-base + apk add build-base zstd-dev make REDIS_CFLAGS='-Werror' - name: testprep run: apk add tcl procps tclx @@ -1174,7 +1246,7 @@ jobs: ref: ${{ env.GITHUB_HEAD_REF }} - name: make run: | - apk add build-base + apk add build-base zstd-dev make REDIS_CFLAGS='-Werror' USE_JEMALLOC=no CFLAGS=-DUSE_MALLOC_USABLE_SIZE - name: testprep run: apk add tcl procps tclx @@ -1209,7 +1281,9 @@ jobs: repository: ${{ env.GITHUB_REPOSITORY }} ref: ${{ env.GITHUB_HEAD_REF }} - name: make - run: make REDIS_CFLAGS='-Werror -DLOG_REQ_RES' + run: | + sudo apt-get install -y libzstd-dev + make REDIS_CFLAGS='-Werror -DLOG_REQ_RES' - name: testprep run: sudo apt-get install tcl8.6 tclx - name: test @@ -1262,7 +1336,7 @@ jobs: apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 40976EAF437D05B5 apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 3B4FE6ACC0B21F32 apt-get update - apt-get install -y make gcc-4.8 + apt-get install -y make gcc-4.8 libzstd-dev update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-4.8 100 make CC=gcc REDIS_CFLAGS='-Werror' - name: testprep @@ -1307,7 +1381,7 @@ jobs: apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 40976EAF437D05B5 
apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 3B4FE6ACC0B21F32 apt-get update - apt-get install -y make gcc-4.8 openssl libssl-dev + apt-get install -y make gcc-4.8 openssl libssl-dev libzstd-dev update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-4.8 100 make CC=gcc BUILD_TLS=module REDIS_CFLAGS='-Werror' - name: testprep @@ -1357,7 +1431,7 @@ jobs: apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 40976EAF437D05B5 apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 3B4FE6ACC0B21F32 apt-get update - apt-get install -y make gcc-4.8 openssl libssl-dev + apt-get install -y make gcc-4.8 openssl libssl-dev libzstd-dev update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-4.8 100 make BUILD_TLS=module CC=gcc REDIS_CFLAGS='-Werror' - name: testprep @@ -1398,7 +1472,9 @@ jobs: repository: ${{ env.GITHUB_REPOSITORY }} ref: ${{ env.GITHUB_HEAD_REF }} - name: make - run: make SANITIZER=address DEBUG_DEFRAG=force REDIS_CFLAGS='-Werror' + run: | + sudo apt-get install -y libzstd-dev + make SANITIZER=address DEBUG_DEFRAG=force REDIS_CFLAGS='-Werror' - name: testprep run: sudo apt-get install tcl8.6 tclx - name: test @@ -1426,7 +1502,9 @@ jobs: repository: ${{ env.GITHUB_REPOSITORY }} ref: ${{ env.GITHUB_HEAD_REF }} - name: make - run: make REDIS_CFLAGS='-Werror -DREDIS_TEST' + run: | + sudo apt-get install -y libzstd-dev + make REDIS_CFLAGS='-Werror -DREDIS_TEST' - name: testprep run: | sudo apt-get install tcl8.6 tclx diff --git a/redis.conf b/redis.conf index 9151c8fc8e5..7077ac2a112 100644 --- a/redis.conf +++ b/redis.conf @@ -839,6 +839,16 @@ replica-priority 100 # By default min-replicas-to-write is set to 0 (feature disabled) and # min-replicas-max-lag is set to 10. +# Enable client compression for replication at specified level when connecting to +# master 0-22 (0 for disabled compression). Currently only zstd compression lib +# is supported hence the compression values 1-22 are related to it. 
+# NOTE: This setting only works when io-threads > 1 +repl-compression 0 + +# When compression is enabled specify maximum latency in ms before the +# compressed data is flushed and send to the socket. +compression-max-latency 100 + # A Redis master is able to list the address and port of the attached # replicas in different ways. For example the "INFO replication" section # offers this information, which is used, among other tools, by diff --git a/src/Makefile b/src/Makefile index 8939f89e07c..4c04d6f95e1 100644 --- a/src/Makefile +++ b/src/Makefile @@ -152,6 +152,18 @@ endif FINAL_CFLAGS=$(STD) $(WARN) $(OPT) $(DEBUG) $(CFLAGS) $(REDIS_CFLAGS) FINAL_LDFLAGS=$(LDFLAGS) $(OPT) $(REDIS_LDFLAGS) $(DEBUG) FINAL_LIBS=-lm + +# Detect libzstd via pkg-config, fall back to -lzstd +LIBZSTD_PKGCONFIG := $(shell $(PKG_CONFIG) --exists libzstd && echo $$?) +ifeq ($(LIBZSTD_PKGCONFIG),0) + LIBZSTD_CFLAGS=$(shell $(PKG_CONFIG) --cflags libzstd) + LIBZSTD_LIBS=$(shell $(PKG_CONFIG) --libs libzstd) +else + LIBZSTD_CFLAGS= + LIBZSTD_LIBS=-lzstd +endif +FINAL_CFLAGS+=$(LIBZSTD_CFLAGS) +FINAL_LIBS+=$(LIBZSTD_LIBS) DEBUG=-g -ggdb # Linux ARM32 needs -latomic at linking time @@ -384,7 +396,7 @@ endif REDIS_SERVER_NAME=redis-server$(PROG_SUFFIX) REDIS_SENTINEL_NAME=redis-sentinel$(PROG_SUFFIX) -REDIS_SERVER_OBJ=threads_mngr.o memory_prefetch.o adlist.o quicklist.o ae.o anet.o dict.o ebuckets.o eventnotifier.o iothread.o mstr.o entry.o kvstore.o fwtree.o estore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o t_array.o sparsearray.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_asm.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o 
sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o lolwut8.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o keymeta.o chk.o hotkeys.o gcra.o vector.o fast_float_strtod.o +REDIS_SERVER_OBJ=threads_mngr.o memory_prefetch.o adlist.o quicklist.o ae.o anet.o dict.o ebuckets.o eventnotifier.o iothread.o mstr.o entry.o kvstore.o fwtree.o estore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o t_array.o sparsearray.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_asm.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o lolwut8.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o keymeta.o chk.o hotkeys.o gcra.o vector.o fast_float_strtod.o client_comp.o REDIS_CLI_NAME=redis-cli$(PROG_SUFFIX) REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o ae.o redisassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o 
REDIS_BENCHMARK_NAME=redis-benchmark$(PROG_SUFFIX)
diff --git a/src/client_comp.c b/src/client_comp.c
new file mode 100644
index 00000000000..c0529823fe7
--- /dev/null
+++ b/src/client_comp.c
@@ -0,0 +1,647 @@
+#include "server.h"
+#include "client_comp.h"
+#include <zstd.h>
+
+/* Abstraction over compression library. The init and compress/decompress
+ * callbacks return 0 on success and -1 on failure (see the zstd
+ * implementations below); `end` releases the library context. */
+typedef struct compressionType {
+    int (*init_compress)(struct compressionState *st, int level);
+    int (*init_decompress)(struct compressionState *st);
+    int (*compress)(struct compressionState *st, int flush);
+    int (*decompress)(struct compressionState *st);
+    void (*end)(struct compressionState *st);
+} compressionType;
+
+/* Temporary buffer used by compression library to store compressed/decompressed
+ * data. */
+typedef struct {
+    unsigned char *data;
+    int size;       /* Capacity of `data` in bytes */
+    int written;    /* Number of bytes stored in `data` so far */
+    int consumed;   /* Number of stored bytes already taken out of `data` */
+} tempBuf;
+
+/* Main compression state struct */
+struct compressionState {
+    const compressionType *type;
+    tempBuf input;  /* Data fed to the library: uncompressed data when
+                     * compressing, compressed data when decompressing */
+    tempBuf output; /* Data produced by the library: compressed data when
+                     * compressing, decompressed data when decompressing */
+    union {
+        ZSTD_CStream *zstdCCtx; /* Zstd compression ctx */
+        ZSTD_DStream *zstdDCtx; /* Zstd decompression ctx */
+    } ctx;
+    int write_flush_pending; /* write flush not yet completed */
+    int read_flush_pending; /* read flush not yet completed */
+    int handle_pending; /* Set while processing pending decompressed data, in
+                         * which case we must not read from the socket. */
+    mstime_t last_write; /* Time (ms) of the last socket write. Used to check
+                          * if it's time to flush the buffer */
+    compressionDirection dir;
+};
+
+static int decompressInto(compressionState *state, char *buf, size_t buflen);
+
+/* --- zstd --- */
+
+/* Create the zstd compression stream at `level` and allocate both temporary
+ * buffers. Return 0 on success, -1 on failure (nothing is left allocated). */
+static int zstdInitCompress(compressionState *st, int level) {
+    st->ctx.zstdCCtx = ZSTD_createCStream();
+    if (!st->ctx.zstdCCtx) {
+        serverLog(LL_NOTICE, "Failed to create ZSTD compression context");
+        return -1;
+    }
+    size_t res = ZSTD_CCtx_setParameter(st->ctx.zstdCCtx, ZSTD_c_compressionLevel, level);
+    if (ZSTD_isError(res)) {
+        ZSTD_freeCStream(st->ctx.zstdCCtx);
+        /* Don't leave a dangling pointer behind: the state is torn down by
+         * the caller after a failed init. */
+        st->ctx.zstdCCtx = NULL;
+        serverLog(LL_NOTICE, "Failed to set compression level for ZSTD compression context");
+        return -1;
+    }
+
+    /* temp buf storing compressed data */
+    size_t outSize = ZSTD_CStreamOutSize();
+    st->output.data = zmalloc(outSize);
+    st->output.size = outSize;
+    st->output.written = 0;
+    st->output.consumed = 0;
+
+    /* temp buf storing uncompressed data */
+    size_t inSize = ZSTD_CStreamInSize();
+    st->input.data = zmalloc(inSize);
+    st->input.size = inSize;
+    st->input.written = 0;
+    st->input.consumed = 0;
+
+    st->write_flush_pending = 0;
+
+    return 0;
+}
+
+/* Create the zstd decompression stream and allocate both temporary buffers.
+ * Return 0 on success, -1 if the stream could not be created. */
+static int zstdInitDecompress(compressionState *st) {
+    st->ctx.zstdDCtx = ZSTD_createDStream();
+    if (!st->ctx.zstdDCtx) {
+        serverLog(LL_NOTICE, "Failed to create ZSTD decompression context");
+        return -1;
+    }
+
+    /* temp buf storing compressed data */
+    size_t inSize = ZSTD_DStreamInSize();
+    st->input.data = zmalloc(inSize);
+    st->input.size = inSize;
+    st->input.written = 0;
+    st->input.consumed = 0;
+
+    /* temp buf storing decompressed data */
+    size_t outSize = ZSTD_DStreamOutSize();
+    st->output.data = zmalloc(outSize);
+    st->output.size = outSize;
+    st->output.written = 0;
+    st->output.consumed = 0;
+
+    st->read_flush_pending = 0;
+
+    return 0;
+}
+
+/* Compress the not yet consumed part of the input buffer, appending the result
+ * to the output buffer. A non-zero `flush` (or a flush still in progress)
+ * ends the current frame. Return 0 on success, -1 on a zstd error. */
+static int zstdCompress(compressionState *st, int flush) {
+    ZSTD_inBuffer input = {
+        .src = st->input.data,
+        .size = st->input.written,
+        .pos = st->input.consumed
+    };
+    ZSTD_outBuffer output = {
+        .dst = st->output.data,
+        .size = st->output.size,
+        .pos = st->output.written
+    };
+
+    ZSTD_EndDirective directive;
+    /* We use ZSTD_e_end instead of ZSTD_e_flush when we want to flush zstd's
+     * internal buffers.
+     * This flushes zstd's internal buffers but also ends the current frame.
+     * This of course lowers the compression ratio but massively increases speed
+     * on the decompression side also, as it doesn't need to wait for more data.
+     * The resulting compression ratio is still very good (tested with default
+     * compression level). */
+    if (flush || st->write_flush_pending)
+        directive = ZSTD_e_end;
+    else
+        directive = ZSTD_e_continue;
+
+    size_t ret;
+    do {
+        ret = ZSTD_compressStream2(st->ctx.zstdCCtx, &output, &input, directive);
+        if (ZSTD_isError(ret)) {
+            serverLog(LL_WARNING, "zstd compress error: %s", ZSTD_getErrorName(ret));
+            return -1;
+        }
+    } while (ret > 0 && output.pos < output.size);
+
+    /* If we pass a directive different than ZSTD_e_continue to zstd we want to
+     * keep using that directive until compressStream2 returns 0. By keeping
+     * this flag raised we know we are in the process of flushing data, i.e we
+     * cannot use ZSTD_e_continue before we have flushed it all. */
+    st->write_flush_pending = (directive == ZSTD_e_end && ret > 0);
+
+    st->input.consumed = input.pos;
+    st->output.written = output.pos;
+
+    return 0;
+}
+
+/* Decompress the not yet consumed part of the input buffer, appending the
+ * result to the output buffer. Return 0 on success, -1 on a zstd error. */
+static int zstdDecompress(compressionState *st) {
+    ZSTD_inBuffer input = {
+        .src = st->input.data,
+        .size = st->input.written,
+        .pos = st->input.consumed
+    };
+    ZSTD_outBuffer output = {
+        .dst = st->output.data,
+        .size = st->output.size,
+        .pos = st->output.written
+    };
+
+    size_t ret = ZSTD_decompressStream(st->ctx.zstdDCtx, &output, &input);
+    if (ZSTD_isError(ret)) {
+        serverLog(LL_NOTICE, "zstd decompress error: %s", ZSTD_getErrorName(ret));
+        return -1;
+    }
+
+    /* Don't try to flush again if we already tried, no more progress can be
+     * made without additional input. */
+    st->read_flush_pending = !st->read_flush_pending && (ret > 0);
+
+    st->input.consumed = input.pos;
+    st->output.written = output.pos;
+
+    return 0;
+}
+
+/* Release the zstd context that matches the state's direction. */
+static void zstdEnd(compressionState *st) {
+    if (st->dir == COMPRESS && st->ctx.zstdCCtx) {
+        /* Flush any pending data so context is in a good state before closing it */
+        if (st->write_flush_pending) {
+            size_t sz = ZSTD_CStreamOutSize();
+            char *tmp = zmalloc(sz);
+            ZSTD_inBuffer input = {
+                .src = NULL,
+                .size = 0,
+                .pos = 0
+            };
+            ZSTD_outBuffer output = {
+                .dst = tmp,
+                .size = sz,
+                .pos = 0
+            };
+            size_t remaining;
+            do {
+                /* Just ignore the output, we are closing the compression state
+                 * anyways */
+                output.pos = 0;
+                remaining = ZSTD_compressStream2(st->ctx.zstdCCtx, &output, &input, ZSTD_e_end);
+                /* zstd reports errors as very large size_t values, so a bare
+                 * `> 0` test would never terminate on a persistent error. */
+            } while (remaining > 0 && !ZSTD_isError(remaining));
+            zfree(tmp);
+        }
+        ZSTD_freeCStream(st->ctx.zstdCCtx);
+        st->ctx.zstdCCtx = NULL;
+    } else if (st->dir == DECOMPRESS && st->ctx.zstdDCtx) {
+        ZSTD_freeDStream(st->ctx.zstdDCtx);
+        st->ctx.zstdDCtx = NULL;
+    }
+}
+
+static const compressionType zstdType = {
+    .init_compress = zstdInitCompress,
+    .init_decompress = zstdInitDecompress,
+    .compress = zstdCompress,
+    .decompress = zstdDecompress,
+    .end = zstdEnd,
+};
+
+/* Create compression state for the client. The state starts zeroed, bound to
+ * the zstd backend and with an invalid direction until it is initialized. */
+static void compressionStateCreate(client *c) {
+    compressionState *st = zcalloc(sizeof(compressionState));
+    st->type = &zstdType;
+    st->last_write = 0;
+    st->write_flush_pending = 0;
+    st->read_flush_pending = 0;
+    st->handle_pending = 0;
+    st->dir = CD_INVALID;
+
+    c->compression_state = st;
+}
+
+/* Release the library context, both temporary buffers and the state itself.
+ * No-op when `state` is NULL. */
+static void compressionStateDestroy(compressionState *state) {
+    if (state == NULL) return;
+
+    state->type->end(state);
+    zfree(state->input.data);
+    zfree(state->output.data);
+    zfree(state);
+}
+
+/* Create and initialize a compression state for the client. No-op if already
+ * initialized. `dir` indicates the compression direction, i.e if the client
+ * will compress or decompress data.
+ * Currently only viable for master/replica clients.
*/
+static int clientCreateCompressionState(client *c, compressionDirection dir) {
+    /* A state is already attached to this client: nothing to set up. */
+    if (c->compression_state) return 1;
+
+    compressionStateCreate(c);
+    compressionState *st = c->compression_state;
+    int init_failed;
+
+    switch (dir) {
+    case COMPRESS:
+        serverAssert(c->compression_level > 0 && c->flags & CLIENT_SLAVE);
+        init_failed = (st->type->init_compress(st, c->compression_level) == -1);
+        break;
+    case DECOMPRESS:
+        serverAssert(server.repl_master_compression_level > 0);
+        init_failed = (st->type->init_decompress(st) == -1);
+        break;
+    default:
+        /* Inaccessible */
+        serverAssert(0);
+        return 1;
+    }
+
+    /* The backend could not be initialized: drop the half-built state. */
+    if (init_failed) {
+        compressionStateDestroy(c->compression_state);
+        c->compression_state = NULL;
+        return 0;
+    }
+
+    st->dir = dir;
+
+    if (dir == COMPRESS) {
+        serverLog(LL_NOTICE, "Initialized compression at level %d for client #%llu...",
+                  c->compression_level, (unsigned long long)c->id);
+    } else {
+        serverLog(LL_NOTICE, "Decompression for master client initialized.");
+    }
+
+    return 1;
+}
+
+/* Tear down the client's compression state, if any, and clear the client's
+ * compression level and its compression I/O flag. */
+void clientDestroyCompressionState(client *c) {
+    if (!c->compression_state) return;
+
+    compressionStateDestroy(c->compression_state);
+    c->compression_state = NULL;
+    c->compression_level = 0;
+    c->io_flags &= ~CLIENT_IO_COMPRESSION_ENABLED;
+
+    /* Role suffix appended to the client id in the log line. */
+    const char *role = "";
+    if (c->flags & CLIENT_MASTER)
+        role = " (master)";
+    else if (c->flags & CLIENT_SLAVE)
+        role = " (slave)";
+
+    serverLog(LL_NOTICE, "Compression state for client #%llu%s destroyed...",
+              (unsigned long long)c->id, role);
+}
+
+/* Enable client compression and create compression state if not present.
+ * Currently only valid for primary/replica clients.
+ * Return 0 if compression state was not created and failed to be initialized,
+ * 1 if compression was enabled.
*/
+int clientEnableCompression(client *c, compressionDirection dir) {
+    serverAssert(c->flags & (CLIENT_MASTER | CLIENT_SLAVE));
+
+    int created = clientCreateCompressionState(c, dir);
+    if (created) {
+        c->io_flags |= CLIENT_IO_COMPRESSION_ENABLED;
+        return 1;
+    }
+    return 0;
+}
+
+/* Clear the compression I/O flag; the compression state itself is kept. */
+void clientDisableCompression(client *c) {
+    c->io_flags &= ~CLIENT_IO_COMPRESSION_ENABLED;
+}
+
+/* Return 1 if writes to this client have to go through the compressor. This
+ * does not look at CLIENT_IO_COMPRESSION_ENABLED: once the compressed stream
+ * has started the peer expects compressed data, no matter which thread the
+ * client is currently running on. */
+int clientCompressesWrites(client *c) {
+    compressionState *st = c->compression_state;
+    if (!st) return 0;
+    return st->dir == COMPRESS;
+}
+
+/* Return 1 if reads from this client have to go through the decompressor. */
+int clientDecompressesReads(client *c) {
+    compressionState *st = c->compression_state;
+    if (!st) return 0;
+    return st->dir == DECOMPRESS;
+}
+
+/* Compress any data fed for compression (see clientCompressAndWriteBuf) and
+ * write to socket. Compression library may not return compressed data
+ * immediately so this call may not write anything to socket.
+ * Force flushes the compressed buffer according to compression_max_latency.
+ * Return 0 on success and 1 on error. On a compression error the client is
+ * scheduled for close as the compressed stream cannot be recovered; on a
+ * socket error the caller can inspect the connection state.
*/
+int compressAndWrite(client *c, int *tot_written) {
+    /* Report a defined byte count on every path, early exits included. */
+    *tot_written = 0;
+
+    if (c->compression_level <= 0)
+        return 0;
+
+    compressionState *state = c->compression_state;
+    serverAssert(state);
+
+    /* All available uncompressed data was consumed so we need to reset the
+     * uncompressed buffer */
+    if (state->input.written == state->input.size &&
+        state->input.consumed == state->input.size)
+    {
+        state->input.written = 0;
+        state->input.consumed = 0;
+    }
+
+    if (state->output.written < state->output.size) {
+        /* Force flush after `compression_max_latency` ms have passed.
+         * Note, this only makes sense when we have enough space for compressing
+         * data. */
+        int flush = mstime() - state->last_write > server.compression_max_latency;
+        if (state->type->compress(state, flush) == -1) {
+            /* The compressed stream is broken, there is no way to recover:
+             * close the link and let the replica resync. */
+            freeClientAsync(c);
+            return 1;
+        }
+    }
+
+    /* Try to write all the data available in the compressed buffer. Nothing
+     * is written when the compressor produced no output: a zero-length write
+     * is pointless on a plain socket and undefined for SSL_write(). */
+    int ret = 0;
+    int towrite =
+        state->output.written - state->output.consumed;
+    while (towrite > 0) {
+        int written = connWrite(
+            c->conn, state->output.data + state->output.consumed, towrite);
+        if (written < 0) {
+            /* Fall through so the bytes already sent are still accounted for
+             * in the flush timer below. */
+            ret = 1;
+            break;
+        }
+        /* No progress was made: stop instead of spinning on the same write. */
+        if (written == 0) break;
+
+        state->output.consumed += written;
+        *tot_written += written;
+        towrite -= written;
+
+        /* All of the compressed data was sent to the socket so we need to reset
+         * the compression buffer. */
+        if (state->output.consumed == state->output.size) {
+            serverAssert(towrite == 0 && state->output.written == state->output.size);
+
+            state->output.written = 0;
+            state->output.consumed = 0;
+        }
+    }
+
+    if (*tot_written > 0)
+        state->last_write = mstime();
+
+    return ret;
+}
+
+/* Compress `len` bytes from `data` and write the compressed output to the
+ * client's connection. The compression library may buffer data internally,
+ * so consuming input doesn't imply an actual socket write.
+ * Return the number of bytes consumed from `data` (uncompressed), or -1 on
+ * connection error. `*socket_written` is set to the number of (compressed)
+ * bytes actually written to the socket. */
+ssize_t clientCompressAndWriteBuf(client *c, const char *data, size_t len, ssize_t *socket_written) {
+    compressionState *state = c->compression_state;
+    serverAssert(state && state->dir == COMPRESS);
+
+    *socket_written = 0;
+    size_t consumed = 0;
+    while (consumed != len) {
+        /* Stage as much input as fits in the free part of the uncompressed
+         * buffer. */
+        int to_consume =
+            min(state->input.size - state->input.written, (int)(len - consumed));
+        serverAssert(to_consume >= 0);
+
+        memcpy(state->input.data + state->input.written,
+               data + consumed, to_consume);
+
+        state->input.written += to_consume;
+        consumed += to_consume;
+
+        /* Write whatever we have available in the compressed buffer */
+        int written = 0;
+        int err = compressAndWrite(c, &written);
+
+        /* Count the bytes first: a call that ends with an error may still
+         * have pushed part of the compressed buffer to the socket. */
+        *socket_written += written;
+
+        if (err) {
+            if (connGetState(c->conn) != CONN_STATE_CONNECTED) {
+                return -1;
+            }
+            /* Connection still usable: report what was staged so far. */
+            return consumed;
+        }
+
+        /* Nothing was written and no compressed bytes are waiting to be
+         * sent: stop here. */
+        if (written == 0 && state->output.written == state->output.consumed)
+            break;
+    }
+
+    return consumed;
+}
+
+/* Read compressed data from the client's connection and decompress it into
+ * `buf`. Tries to read and decompress as much as possible.
+ * Return the number of decompressed bytes placed in `buf`, 0 if the peer
+ * closed the connection, or -1 if no data is available or on error (on a
+ * decompression error `c->read_error` is set as the stream cannot be
+ * recovered).
+ * We may not be able to consume all the data we read from the socket in one
+ * call (e.g. `buf` is full while decompressed data is still pending). Such
+ * leftovers are processed via compressionProcessPendingReads.
*/
+int clientReadAndDecompress(client *c, char *buf, size_t buf_len, size_t *socket_read) {
+    compressionState *state = c->compression_state;
+    serverAssert(state && state->dir == DECOMPRESS);
+
+    *socket_read = 0;
+    size_t decompressed = 0;
+    do {
+        int curr = decompressInto(state, buf + decompressed, buf_len - decompressed);
+        if (curr < 0) {
+            /* Decompression error: the compressed stream is broken, there is
+             * no way to recover, so close the connection with the master. */
+            c->read_error = CLIENT_READ_CONN_DISCONNECTED;
+            return -1;
+        }
+        decompressed += curr;
+
+        int nread = 0;
+        /* If the handle_pending flag is raised we only decompress whatever data
+         * we have already read from the socket without reading anything more.
+         * Socket reading will happen when the event loop handles the read
+         * event, in which case the handle_pending flag wouldn't be raised.
+         *
+         * Also skip the read when the staging buffer has no free space: a
+         * zero-length read would return 0, which cannot be told apart from
+         * the peer closing the connection. */
+        if (!state->handle_pending &&
+            state->input.size > state->input.written)
+        {
+            nread = connRead(c->conn,
+                             state->input.data + state->input.written,
+                             state->input.size - state->input.written);
+
+            if (nread < 0 && connGetState(c->conn) == CONN_STATE_ERROR)
+                return -1;
+            /* Even if nread == 0 we continue the loop until decompressInto has
+             * nothing more it can do. */
+            if (nread > 0) {
+                *socket_read += nread;
+                state->input.written += nread;
+            }
+        }
+
+        /* Neither the decompressor nor the socket made progress. */
+        if (curr <= 0 && nread <= 0) break;
+    } while (decompressed < buf_len);
+
+    /* Nothing produced while the connection is still up: report "no data
+     * available" rather than 0, which callers treat as a closed peer. */
+    if (decompressed == 0 && connGetState(c->conn) == CONN_STATE_CONNECTED)
+        return -1;
+
+    return decompressed;
+}
+
+/* Decompress input compressed data and put it in `buf`. If decompressed data
+ * is more than buflen this function must be called again so output data can
+ * be consumed. If buflen is sufficiently large this function will decompress
+ * as much data as possible.
*/
+static int decompressInto(compressionState *state, char *buf, size_t buflen) {
+    /* Returns the number of bytes copied into `buf`, or -1 when the
+     * decompress callback fails. Note that on failure the bytes already
+     * copied into `buf` by earlier loop iterations are not reported. */
+    if (buflen == 0)
+        return 0;
+
+    /* Number of decompressed bytes copied into `buf` so far. */
+    int consumed = 0;
+
+    /* Decompress as much data as possible: keep going while `buf` has room
+     * and there is a pending flush, unprocessed compressed input, or
+     * decompressed output that was not copied out yet. */
+    while ((size_t)consumed < buflen &&
+           (state->read_flush_pending ||
+            state->input.written > state->input.consumed ||
+            state->output.written > state->output.consumed))
+    {
+        /* Reset the decompressed buffer if all the available data is consumed */
+        if (state->output.consumed == state->output.size) {
+            state->output.written = 0;
+            state->output.consumed = 0;
+        }
+
+        /* Run the decompressor when a flush is pending and the output buffer
+         * has free space, or when there is compressed input left. */
+        if ((state->read_flush_pending && state->output.size > state->output.written) ||
+            state->input.written > state->input.consumed)
+        {
+            if (state->type->decompress(state) == -1) {
+                return -1;
+            }
+        }
+
+        /* Copy the decompressed data to the output buffer */
+        if (state->output.written > state->output.consumed) {
+            size_t nonconsumed_decompressed = state->output.written - state->output.consumed;
+            /* Bounded by the room left in `buf`. */
+            int to_consume = min(buflen - consumed, nonconsumed_decompressed);
+            memcpy(buf + consumed, state->output.data + state->output.consumed, to_consume);
+
+            state->output.consumed += to_consume;
+            consumed += to_consume;
+        }
+    }
+
+    /* Same reset as at the top of the loop, for the case the last copy
+     * drained a completely filled decompressed buffer. */
+    if (state->output.consumed == state->output.size)
+    {
+        state->output.written = 0;
+        state->output.consumed = 0;
+    }
+
+    /* Reset the compressed buffer if we decompressed all the available data */
+    if (state->input.consumed == state->input.size) {
+        state->input.written = 0;
+        state->input.consumed = 0;
+    }
+
+    serverAssert((size_t)consumed <= buflen);
+    return consumed;
+}
+
+/* Read data from input_buf and decompress it immediately. The result is written
+ * into output_buf.
+ * Return number of bytes decompressed. *consumed stores number of bytes consumed
+ * from input_buf.
+ * Note, that we may have enough compressed data inside input buf so that decompressing
+ * it will exceed output_len.
The function must be run in a loop until input_buf
+ * is fully consumed - so make sure to have free space in output_buf on each call.
+ * Return -1 if the client has no compression state, or if decompression fails
+ * before this call produced any byte. */
+int readFromBufAndDecompress(client *c, char *input_buf, size_t input_len,
+                             char *output_buf, size_t output_len, size_t *consumed)
+{
+    compressionState *state = c->compression_state;
+    if (!state) {
+        return -1;
+    }
+
+    int tot_decompressed = 0;
+    *consumed = 0;
+    while (*consumed <= input_len && (size_t)tot_decompressed < output_len) {
+        /* Stage as much input as fits in the free part of the compressed
+         * buffer; this may be 0, in which case the iteration only drains
+         * what is already staged. */
+        int to_consume =
+            min(state->input.size - state->input.written,
+                (int)(input_len - *consumed));
+
+        if (to_consume)
+            memcpy(state->input.data + state->input.written,
+                   input_buf + *consumed, to_consume);
+
+        *consumed += to_consume;
+
+        state->input.written += to_consume;
+
+        int decompressed = decompressInto(state, output_buf + tot_decompressed,
+                                          output_len - tot_decompressed);
+        if (decompressed < 0) {
+            /* Decompression failed. Do not report it as "no progress":
+             * surface the error unless earlier iterations already produced
+             * data the caller must not lose. In that case the bytes are
+             * returned now and the error is expected to show up again on
+             * the next call (NOTE(review): assumes the decompressor keeps
+             * failing on a broken stream - confirm). */
+            if (tot_decompressed == 0) return -1;
+            break;
+        }
+        if (decompressed == 0)
+            break;
+        tot_decompressed += decompressed;
+    }
+
+    return tot_decompressed;
+}
+
+/* Check if we need to flush compressed data. Compression library may wait for
+ * a lot of compressed data before it finishes a frame and gives it back to user.
+ * While this gives the best compression ratio it introduces a lot of latency so
+ * we make a compromise and flush periodically. */
+int clientHasPendingCompressionFlush(client *c) {
+    compressionState *state = c->compression_state;
+    if (!state) return 0;
+    if (state->dir != COMPRESS) return 0;
+
+    return state->write_flush_pending || (mstime() - state->last_write >= server.compression_max_latency);
+}
+
+/* Check if we still have pending compressed data. This may mean that either the
+ * compression library still has data in its internal buffers, we still have
+ * compressed data that needs to be consumed by the library or we have stored
+ * decompressed data that we still have not consumed.
*/
+int clientHasPendingCompressedData(client *c) {
+    compressionState *state = c->compression_state;
+    if (state == NULL || state->dir != DECOMPRESS) return 0;
+
+    /* A flush is pending and the decompressed buffer has room for it. */
+    if (state->read_flush_pending && state->output.size > state->output.written)
+        return 1;
+
+    /* Compressed input that was not handed to the library yet. */
+    if (state->input.written > state->input.consumed)
+        return 1;
+
+    /* Decompressed output that was not copied out yet. */
+    return state->output.written > state->output.consumed;
+}
+
+/* Drain leftovers of the given compression clients. A read event does not
+ * always consume everything that is available (see
+ * clientHasPendingCompressedData), e.g. when the query buffer filled up
+ * while decompressed data was still pending. For every such client the read
+ * handler is invoked again with `handle_pending` raised, i.e. in
+ * decompress-only mode without touching the socket. Meant to be called from
+ * the IO thread's beforeSleep.
+ * Return how many clients still have pending data afterwards, so the caller
+ * knows it should not block waiting for events. */
+int compressionProcessPendingReads(list *compression_clients) {
+    int still_pending = 0;
+    listIter iter;
+    listNode *node;
+
+    if (listLength(compression_clients) == 0) return 0;
+
+    listRewind(compression_clients, &iter);
+    while ((node = listNext(&iter)) != NULL) {
+        client *c = listNodeValue(node);
+
+        /* Skip clients about to be closed, clients with nothing pending and
+         * clients that have no read handler installed. */
+        if ((c->io_flags & CLIENT_IO_CLOSE_ASAP) ||
+            !clientHasPendingCompressedData(c) ||
+            !connHasReadHandler(c->conn))
+        {
+            continue;
+        }
+
+        compressionState *state = c->compression_state;
+        state->handle_pending = 1;
+        readQueryFromClient(c->conn);
+        state->handle_pending = 0;
+
+        if (c->io_flags & CLIENT_IO_CLOSE_ASAP) continue;
+        if (clientHasPendingCompressedData(c)) still_pending++;
+    }
+    return still_pending;
+}
diff --git a/src/client_comp.h b/src/client_comp.h
new file mode 100644
index 00000000000..db0bff531ee
--- /dev/null
+++ b/src/client_comp.h
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) 2009-Present, Redis Ltd.
+ * All rights reserved.
+ * + * Licensed under your choice of (a) the Redis Source Available License 2.0 + * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the + * GNU Affero General Public License v3 (AGPLv3). + */ + +#ifndef __CLIENT_COMP_H +#define __CLIENT_COMP_H + +#include + +#include "adlist.h" + +/* Opaque handle to client's compression state used internally by client_comp */ +typedef struct compressionState compressionState; + +struct client; + +typedef enum { + CD_INVALID, + COMPRESS, + DECOMPRESS, +} compressionDirection; + +int clientEnableCompression(struct client *c, compressionDirection dir); +void clientDisableCompression(struct client *c); +void clientDestroyCompressionState(struct client *c); + +int clientCompressesWrites(struct client *c); +int clientDecompressesReads(struct client *c); + +ssize_t clientCompressAndWriteBuf(struct client *c, const char *data, size_t len, + ssize_t *socket_written); +int clientReadAndDecompress(struct client *c, char *buf, size_t buf_len, + size_t *socket_read); + +int compressAndWrite(struct client *c, int *tot_written); + +int readFromBufAndDecompress(struct client *c, char *input_buf, size_t input_len, + char *output_buf, size_t output_len, + size_t *consumed); + +int clientHasPendingCompressionFlush(struct client *c); +int clientHasPendingCompressedData(struct client *c); + +int compressionProcessPendingReads(list *compression_clients); + +#endif /* __CLIENT_COMP_H */ diff --git a/src/config.c b/src/config.c index 97fa58a11d5..01145e23928 100644 --- a/src/config.c +++ b/src/config.c @@ -3304,6 +3304,8 @@ standardConfig static_configs[] = { createIntConfig("cluster-slot-migration-max-archived-tasks", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 1, INT_MAX, server.asm_max_archived_tasks, 32, INTEGER_CONFIG, NULL, NULL), createIntConfig("lookahead", NULL, MODIFIABLE_CONFIG, 1, INT_MAX, server.lookahead, REDIS_DEFAULT_LOOKAHEAD, INTEGER_CONFIG, NULL, NULL), createIntConfig("slowlog-entry-max-argc", NULL, 
MODIFIABLE_CONFIG, 2, INT_MAX, server.slowlog_max_argc, 32, INTEGER_CONFIG, NULL, NULL), + createIntConfig("repl-compression", NULL, MODIFIABLE_CONFIG, 0, 22, server.repl_compression, 0, INTEGER_CONFIG, NULL, NULL), + createIntConfig("compression-max-latency", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.compression_max_latency, 100, INTEGER_CONFIG, NULL, NULL), /* 100ms */ /* Unsigned int configs */ createUIntConfig("maxclients", NULL, MODIFIABLE_CONFIG, 1, UINT_MAX, server.maxclients, 10000, INTEGER_CONFIG, NULL, updateMaxclients), diff --git a/src/iothread.c b/src/iothread.c index 3ee3e674b12..80bf659415d 100644 --- a/src/iothread.c +++ b/src/iothread.c @@ -10,6 +10,12 @@ #include "server.h" +#define IO_DEFAULT_HZ CONFIG_DEFAULT_HZ + +/* Replicates the behaviour of run_with_period used in serverCron but for + * IO threads. IO threads use default Hz for now. */ +#define run_with_period_io(_t_, _ms_) _run_with_period((_t_)->cronloops, (_ms_), IO_DEFAULT_HZ) + /* IO threads. */ IOThread IOThreads[IO_THREADS_MAX_NUM]; @@ -124,6 +130,13 @@ void enqueuePendingClientsToMainThread(client *c, int unbind) { listUnlinkNode(t->clients, c->io_thread_client_list_node); listLinkNodeTail(t->pending_clients_to_main_thread, c->io_thread_client_list_node); c->io_thread_client_list_node = NULL; + + if (listSearchKey(t->compression_clients, c) == + &c->io_thread_compression_clients_node) + { + listUnlinkNode(t->compression_clients, + &c->io_thread_compression_clients_node); + } } } @@ -149,6 +162,17 @@ void enqueuePendingClienstToIOThreads(client *c) { c->io_lastinteraction = c->lastinteraction; } + /* Check here prevents data races with IO thread which may also check + * this flag. 
*/ + if (!(c->io_flags & CLIENT_IO_COMPRESSION_ENABLED)) { + if (c->compression_level > 0) { + clientEnableCompression(c, COMPRESS); + } + if (c->flags & CLIENT_MASTER && server.repl_master_compression_level > 0) { + clientEnableCompression(c, DECOMPRESS); + } + } + c->running_tid = c->tid; listAddNodeHead(mainThreadPendingClientsToIOThreads[c->tid], c); } @@ -158,10 +182,24 @@ void enqueuePendingClienstToIOThreads(client *c) { void unbindClientFromIOThreadEventLoop(client *c) { serverAssert(c->tid != IOTHREAD_MAIN_THREAD_ID && c->running_tid == IOTHREAD_MAIN_THREAD_ID); - if (!connHasEventLoop(c->conn)) return; + /* If the client is not bound to an event loop there is nothing to do, + * unless the client uses repl compression in which case we need to unlink + * it from IO Thread compression_clients list. */ + if (!connHasEventLoop(c->conn) && !c->compression_state) return; + /* As calling in main thread, we should pause the io thread to make it safe. */ pauseIOThread(c->tid); connUnbindEventLoop(c->conn); + IOThread *t = &IOThreads[c->tid]; + /* We need to remove the client from the compression_clients list so it + * won't be processed in IOThreadCompressionCron anymore */ + if (listSearchKey(t->compression_clients, c) == + &c->io_thread_compression_clients_node) + { + listUnlinkNode(t->compression_clients, + &c->io_thread_compression_clients_node); + clientDisableCompression(c); + } resumeIOThread(c->tid); } @@ -304,7 +342,7 @@ void assignClientToIOThread(client *c) { * write, and then put it in the list, main thread will send these clients * to IO thread in beforeSleep. 
*/ connUnbindEventLoop(c->conn); - c->io_flags &= ~(CLIENT_IO_READ_ENABLED | CLIENT_IO_WRITE_ENABLED); + c->io_flags &= ~(CLIENT_IO_READ_ENABLED | CLIENT_IO_WRITE_ENABLED | CLIENT_IO_COMPRESSION_ENABLED); enqueuePendingClienstToIOThreads(c); } @@ -765,6 +803,14 @@ int processClientsFromMainThread(IOThread *t) { connSetReadHandler(c->conn, readQueryFromClient); } + /* Add the client to the compression clients list. */ + if (c->compression_state != NULL && + listSearchKey(t->compression_clients, c) == NULL) + { + listLinkNodeTail(t->compression_clients, + &c->io_thread_compression_clients_node); + } + /* If the client has pending replies, write replies to client. */ if (clientHasPendingReplies(c)) { writeToClient(c, 0); @@ -784,8 +830,13 @@ void IOThreadBeforeSleep(struct aeEventLoop *el) { /* Handle pending data(typical TLS). */ connTypeProcessPendingData(el); - /* If any connection type(typical TLS) still has pending unread data don't sleep at all. */ - int dont_sleep = connTypeHasPendingData(el); + /* Handle decompressed data that wasn't fully consumed by the last read + * event (e.g. the query buffer filled up). */ + int compression_pending = compressionProcessPendingReads(t->compression_clients); + + /* If any connection type(typical TLS) or compression client still has + * pending unread data don't sleep at all. */ + int dont_sleep = connTypeHasPendingData(el) || compression_pending > 0; /* Process clients from main thread, since the main thread may deliver clients * without notification during IO thread processing events. */ @@ -825,7 +876,7 @@ void IOThreadClientsCron(IOThread *t) { /* Process at least a few clients while we are at it, even if we need * to process less than CLIENTS_CRON_MIN_ITERATIONS to meet our contract * of processing each client once per second. 
*/
-    int iterations = listLength(t->clients) / CONFIG_DEFAULT_HZ;
+    int iterations = listLength(t->clients) / IO_DEFAULT_HZ;
     if (iterations < CLIENTS_CRON_MIN_ITERATIONS) {
         iterations = CLIENTS_CRON_MIN_ITERATIONS;
     }
@@ -841,7 +892,52 @@ void IOThreadClientsCron(IOThread *t) {
     }
 }
 
-/* This is the IO thread timer interrupt, CONFIG_DEFAULT_HZ times per second.
+/* Walk the compression clients of the given IO thread and force a
+ * compressAndWrite for those that report a pending compression flush (see
+ * clientHasPendingCompressionFlush). */
+void IOThreadCompressionCron(IOThread *t) {
+    if (listLength(t->compression_clients) == 0) return;
+
+    listIter li;
+    listNode *ln;
+    listRewind(t->compression_clients, &li);
+    /* TODO: if compression is generalized for all types of clients this cron
+     * will need to only process a portion of the clients (similar to
+     * IOThreadClientsCron) for performance reasons. I.e in such case the
+     * clientHasPendingCompressionFlush check may be moved directly to
+     * IOThreadClientsCron. */
+    while ((ln = listNext(&li))) {
+        client *c = listNodeValue(ln);
+
+        if (c->io_flags & CLIENT_IO_CLOSE_ASAP) continue;
+        serverAssert(c->compression_state);
+
+        /* Usually compressAndWrite is called via clientCompressAndWriteBuf
+         * when repl data is written to the replica, but when compression
+         * maximum latency ms have passed we want to force flush the
+         * compression buffer so we don't have much delays between writes to
+         * the socket */
+        if (clientHasPendingCompressionFlush(c)) {
+            /* Only master/replica clients support client compression for now. */
+            serverAssert(c->flags & CLIENT_SLAVE);
+
+            int written = 0;
+            int err = compressAndWrite(c, &written);
+
+            /* Bytes may have reached the socket even when the call reports
+             * an error (partial write), so account for them first. */
+            if (written > 0) {
+                c->net_output_bytes += written;
+                atomicIncr(server.stat_net_repl_output_bytes, written);
+            }
+
+            if (err && connGetState(c->conn) != CONN_STATE_CONNECTED)
+                freeClientAsync(c);
+        }
+    }
+}
+
+/* This is the IO thread timer interrupt, IO_DEFAULT_HZ times per second.
  * The current responsibility is to detect clients that have been stuck in the
  * IO thread for too long and hand them over to the main thread for handling.
*/ int IOThreadCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { @@ -849,10 +942,14 @@ int IOThreadCron(struct aeEventLoop *eventLoop, long long id, void *clientData) UNUSED(id); IOThread *t = clientData; + run_with_period_io(t, server.compression_max_latency) IOThreadCompressionCron(t); + /* Run cron tasks for the clients in the IO thread. */ IOThreadClientsCron(t); - return 1000/CONFIG_DEFAULT_HZ; + t->cronloops++; + + return 1000/IO_DEFAULT_HZ; } /* The main function of IO thread, it will run an event loop. The mian thread @@ -894,6 +991,8 @@ void initThreadedIO(void) { t->processing_clients = listCreate(); t->pending_clients_to_main_thread = listCreate(); t->clients = listCreate(); + t->compression_clients = listCreate(); + t->cronloops = 0; atomicSetWithSync(t->paused, IO_THREAD_UNPAUSED); atomicSetWithSync(t->running, 0); diff --git a/src/networking.c b/src/networking.c index 2f5384c3b99..6e3e77b94f1 100644 --- a/src/networking.c +++ b/src/networking.c @@ -227,6 +227,7 @@ client *createClient(connection *conn) { c->sockname = NULL; c->client_list_node = NULL; c->io_thread_client_list_node = NULL; + listInitNode(&c->io_thread_compression_clients_node, c); c->postponed_list_node = NULL; c->client_tracking_redirection = 0; c->client_tracking_prefixes = NULL; @@ -256,6 +257,8 @@ client *createClient(connection *conn) { c->stat_avg_pipeline_length_cnt = 0; c->task = NULL; c->node_id = NULL; + c->compression_level = 0; + c->compression_state = NULL; atomicSet(c->pending_read, 0); return c; } @@ -2199,6 +2202,9 @@ void freeClient(client *c) { listDelNode(server.clients_to_close,ln); } + /* Disable compression if present */ + clientDestroyCompressionState(c); + /* If it is our master that's being disconnected we should make sure * to cache the state to try a partial resynchronization later. 
* @@ -2216,7 +2222,8 @@ void freeClient(client *c) { /* Log link disconnection with slave */ if (clientTypeIsSlave(c)) { - const char *type = c->flags & CLIENT_REPL_RDB_CHANNEL ? " (rdbchannel)" : ""; + int is_rdb_ch = c->flags & CLIENT_REPL_RDB_CHANNEL; + const char *type = is_rdb_ch ? " (rdbchannel)" : ""; serverLog(LL_NOTICE,"Connection with replica%s %s lost.", type, replicationGetSlaveName(c)); } @@ -2717,6 +2724,52 @@ static inline int _writeToClientNonSlave(client *c, ssize_t *nwritten) { return C_OK; } +static inline int _writeToClientSlaveIOThread(client *c, ssize_t *nwritten) { + replBufBlock *o = listNodeValue(c->io_curr_repl_node); + /* The IO thread must not send data beyond the bound position. */ + size_t pos = c->io_curr_repl_node == c->io_bound_repl_node ? + c->io_bound_block_pos : o->used; + if (pos > c->io_curr_block_pos) { + ssize_t consumed; + if (clientCompressesWrites(c)) { + /* Note, that consumed is how much bytes we've read from the repl + * buffer, whereas socket_written is the number of compressed bytes + * actually written to the socket, which would most certainly be a + * different number. */ + ssize_t socket_written = 0; + consumed = clientCompressAndWriteBuf(c, o->buf+c->io_curr_block_pos, + pos-c->io_curr_block_pos, + &socket_written); + if (consumed <= 0) { + if (consumed < 0) *nwritten = -1; + return C_ERR; + } + *nwritten += socket_written; + /* Since nwritten stores the number of compressed bytes written to + * socket we also store the uncompressed size for stats. */ + atomicIncr(server.stat_net_repl_uncompressed_bytes, consumed); + } else { + consumed = connWrite(c->conn, o->buf+c->io_curr_block_pos, + pos-c->io_curr_block_pos); + if (consumed <= 0) { + if (consumed < 0) *nwritten = -1; + return C_ERR; + } + *nwritten += consumed; + } + + /* Advance the block position with consumed, because that's how much + * bytes were read from the repl buffer node. 
*/ + c->io_curr_block_pos += consumed; + } + /* If we fully sent the object and there are more nodes to send, go to the next one. */ + if (c->io_curr_block_pos == pos && c->io_curr_repl_node != c->io_bound_repl_node) { + c->io_curr_repl_node = listNextNode(c->io_curr_repl_node); + c->io_curr_block_pos = 0; + } + return C_OK; +} + /* This function does actual writing output buffers for slave client types, * it is called by writeToClient. * If we write successfully, it returns C_OK, otherwise, C_ERR is returned, @@ -2727,33 +2780,37 @@ static inline int _writeToClientSlave(client *c, ssize_t *nwritten) { serverAssert(c->bufpos == 0 && listLength(c->reply) == 0); if (c->running_tid != IOTHREAD_MAIN_THREAD_ID) { - replBufBlock *o = listNodeValue(c->io_curr_repl_node); - /* The IO thread must not send data beyond the bound position. */ - size_t pos = c->io_curr_repl_node == c->io_bound_repl_node ? - c->io_bound_block_pos : o->used; - if (pos > c->io_curr_block_pos) { - *nwritten = connWrite(c->conn, o->buf+c->io_curr_block_pos, - pos-c->io_curr_block_pos); - if (*nwritten <= 0) return C_ERR; - c->io_curr_block_pos += *nwritten; - } - /* If we fully sent the object and there are more nodes to send, go to the next one. */ - if (c->io_curr_block_pos == pos && c->io_curr_repl_node != c->io_bound_repl_node) { - c->io_curr_repl_node = listNextNode(c->io_curr_repl_node); - c->io_curr_block_pos = 0; - } - return C_OK; + return _writeToClientSlaveIOThread(c, nwritten); } replBufBlock *o = listNodeValue(c->ref_repl_buf_node); serverAssert(o->used >= c->ref_block_pos); /* Send current block if it is not fully sent. 
*/ if (o->used > c->ref_block_pos) { - *nwritten = connWrite(c->conn, o->buf+c->ref_block_pos, - o->used-c->ref_block_pos); - if (*nwritten <= 0) return C_ERR; - c->ref_block_pos += *nwritten; + ssize_t consumed; + if (clientCompressesWrites(c)) { + ssize_t socket_written = 0; + consumed = clientCompressAndWriteBuf(c, o->buf+c->ref_block_pos, + o->used-c->ref_block_pos, + &socket_written); + if (consumed <= 0) { + if (consumed < 0) *nwritten = -1; + return C_ERR; + } + *nwritten = socket_written; + atomicIncr(server.stat_net_repl_uncompressed_bytes, consumed); + } else { + consumed = connWrite(c->conn, o->buf+c->ref_block_pos, + o->used-c->ref_block_pos); + if (consumed <= 0) { + if (consumed < 0) *nwritten = -1; + return C_ERR; + } + *nwritten = consumed; + } + c->ref_block_pos += consumed; } + /* If we fully sent the object on head, go to the next one. */ listNode *next = listNextNode(c->ref_repl_buf_node); if (next && c->ref_block_pos == o->used) { @@ -3821,7 +3878,13 @@ void readQueryFromClient(connection *conn) { int nread, big_arg = 0; size_t qblen, readlen; - if (!(c->io_flags & CLIENT_IO_READ_ENABLED)) { + /* We have to read compressed data but compression for this client is + * currently disabled. This could happened f.e when client was just fetched + * to main thread. */ + int pending_compression_read = c->compression_state && + !(c->io_flags & CLIENT_IO_COMPRESSION_ENABLED); + + if (!(c->io_flags & CLIENT_IO_READ_ENABLED) || pending_compression_read) { atomicSetWithSync(c->pending_read, 1); return; } else if (server.io_threads_num > 1) { @@ -3901,12 +3964,25 @@ void readQueryFromClient(connection *conn) { /* Read as much as possible from the socket to save read(2) system calls. 
*/ readlen = sdsavail(c->querybuf); } - nread = connRead(c->conn, c->querybuf+qblen, readlen); + size_t network_read = 0; + if (clientDecompressesReads(c)) { + /* nread is the number of decompressed bytes placed in the query + * buffer, whereas network_read is the number of (compressed) bytes + * actually read from the socket. */ + nread = clientReadAndDecompress(c, c->querybuf+qblen, readlen, &network_read); + } else { + nread = connRead(c->conn, c->querybuf+qblen, readlen); + if (nread > 0) network_read = nread; + } if (nread == -1) { - if (connGetState(conn) == CONN_STATE_CONNECTED) { + /* clientReadAndDecompress sets read_error on a decompression error, + * in which case the link is not recoverable even if the connection + * is still alive. */ + if (c->read_error == 0 && connGetState(conn) == CONN_STATE_CONNECTED) { goto done; } else { - c->read_error = CLIENT_READ_CONN_DISCONNECTED; + if (c->read_error == 0) + c->read_error = CLIENT_READ_CONN_DISCONNECTED; freeClientAsync(c); goto done; } @@ -3928,6 +4004,9 @@ void readQueryFromClient(connection *conn) { * c->lastinteraction will be updated during processClientsFromIOThread */ c->io_lastinteraction = server.unixtime; + if (clientDecompressesReads(c)) + atomicIncr(server.stat_net_repl_decompressed_bytes, nread); + if (c->flags & CLIENT_MASTER) { if (c->running_tid == IOTHREAD_MAIN_THREAD_ID) { c->read_reploff += nread; @@ -3935,11 +4014,11 @@ void readQueryFromClient(connection *conn) { /* Same comment as for c->io_lastinteraction */ c->io_read_reploff += nread; } - atomicIncr(server.stat_net_repl_input_bytes, nread); + atomicIncr(server.stat_net_repl_input_bytes, network_read); } else { - atomicIncr(server.stat_net_input_bytes, nread); + atomicIncr(server.stat_net_input_bytes, network_read); } - c->net_input_bytes += nread; + c->net_input_bytes += network_read; if (!(c->flags & CLIENT_MASTER) && /* The commands cached in the MULTI/EXEC queue have not been executed yet, diff --git a/src/replication.c 
b/src/replication.c index aaedabd123f..73f4ad851e6 100644 --- a/src/replication.c +++ b/src/replication.c @@ -146,7 +146,8 @@ void putReplicasInPendingClientsToIOThreads(void) { * also send them to the IO thread. */ if (replica->flags & CLIENT_PENDING_WRITE || clientHasPendingReplies(replica) || - replicaFromIOThreadHasPendingRead(replica)) + replicaFromIOThreadHasPendingRead(replica) || + clientHasPendingCompressionFlush(replica)) { enqueuePendingClienstToIOThreads(replica); } @@ -1482,6 +1483,37 @@ void replconfCommand(client *c) { server.repl_diskless_sync) { c->slave_capa |= SLAVE_CAPA_RDB_CHANNEL_REPL; } + } else if (!strcasecmp(c->argv[j]->ptr, "compression")) { + long level; + + if ((getLongFromObjectOrReply(c,c->argv[j+1], &level,NULL) != C_OK)) + return; + + if (server.io_threads_num > 1) { + if (level == (long)server.repl_compression) { + serverLog(LL_NOTICE, "Client #%llu request for replication " + "compression with level %ld accepted", + (unsigned long long)c->id, level); + + c->compression_level = level; + } else { + serverLog(LL_NOTICE, "Client #%llu request for replication " + "compression rejected. Compression levels differ.", + (unsigned long long)c->id); + c->compression_level = 0; + addReplyErrorFormat(c, "Requested level %ld differs from master's level: %d", + level, server.repl_compression); + return; + } + } else { + serverLog(LL_NOTICE, "Client #%llu request for replication " + "compression rejected. Replication compression is only " + "enabled with IO threads.", + (unsigned long long)c->id); + c->compression_level = 0; + addReplyError(c, "Master has not enabled replication compression"); + return; + } } else if (!strcasecmp(c->argv[j]->ptr,"ack")) { /* REPLCONF ACK is used by slave to inform the master the amount * of replication stream that it processed so far. It is an @@ -1518,6 +1550,7 @@ void replconfCommand(client *c) { * that 'ack' might be received before we detect bgsave is done. 
*/ if (c->replstate == SLAVE_STATE_SEND_BULK_AND_STREAM) replicaPutOnline(c); + /* Note: this command does not reply anything! */ return; } else if (!strcasecmp(c->argv[j]->ptr,"getack")) { @@ -2700,6 +2733,12 @@ void readSyncBulkPayload(connection *conn) { if (server.repl_backlog == NULL) createReplicationBacklog(); serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Finished with success"); + /* If we agreed on compression with the master then setup compression on the + * master client. At this point we're done reading all non compressed payload + * from the master */ + if (server.repl_master_compression_level) + clientEnableCompression(server.master, DECOMPRESS); + if (server.supervised_mode == SUPERVISED_SYSTEMD) { redisCommunicateSystemd("STATUS=MASTER <-> REPLICA sync: Finished with success. Ready to accept connections in read-write mode.\n"); } @@ -3182,6 +3221,14 @@ void syncWithMaster(connection *conn) { if (err) goto write_error; } + /* Try to setup client compression if we're configured so */ + if (server.repl_compression > 0) { + sds level = sdsfromlonglong(server.repl_compression); + err = sendCommand(conn, "REPLCONF", "compression", level, NULL); + sdsfree(level); + if (err) goto write_error; + } + /* Inform the master of our (slave) capabilities. * * EOF: supports EOF-style RDB transfer for diskless replication. @@ -3250,7 +3297,7 @@ void syncWithMaster(connection *conn) { } if (server.repl_state == REPL_STATE_RECEIVE_REQ_REPLY && !no_compress_checksum) - server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY; + server.repl_state = REPL_STATE_RECEIVE_CLIENT_COMP; /* Receive REPLCONF REQUEST reply (rdb-no-compress and rdb-no-checksum). 
*/ if (server.repl_state == REPL_STATE_RECEIVE_REQ_REPLY) { @@ -3263,6 +3310,32 @@ void syncWithMaster(connection *conn) { "REPLCONF rdb-no-compress/checksum: %s", err); } sdsfree(err); + server.repl_state = REPL_STATE_RECEIVE_CLIENT_COMP; + return; + } + + if (server.repl_state == REPL_STATE_RECEIVE_CLIENT_COMP && server.repl_compression <= 0) + server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY; + + if (server.repl_state == REPL_STATE_RECEIVE_CLIENT_COMP) { + err = receiveSynchronousResponse(conn); + if (err == NULL) goto no_response_error; + + /* Ignore the error if any, not all the Redis versions support + * REPLCONF compression. */ + if (err[0] == '-') { + serverLog(LL_NOTICE, + "(Non critical) Replication compression not enabled: %s", + err); + server.repl_compression = 0; + server.repl_master_compression_level = 0; + } else { + serverLog(LL_NOTICE, "Master agreed to compression level %d", + server.repl_compression); + server.repl_master_compression_level = server.repl_compression; + } + sdsfree(err); + err = NULL; server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY; return; } @@ -3334,6 +3407,9 @@ void syncWithMaster(connection *conn) { if (psync_result == PSYNC_CONTINUE) { serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Master accepted a Partial Resynchronization."); + if (server.repl_master_compression_level > 0) { + clientEnableCompression(server.master, DECOMPRESS); + } if (server.supervised_mode == SUPERVISED_SYSTEMD) { redisCommunicateSystemd("STATUS=MASTER <-> REPLICA sync: Partial Resynchronization accepted. 
Ready to accept connections in read-write mode.\n"); } @@ -4126,6 +4202,9 @@ int replDataBufStreamToDb(replDataBuf *buf, replDataBufToDbCtx *ctx) { int ret = C_OK; client *c = ctx->client; + if (server.repl_master_compression_level > 0) + serverAssert(server.master->compression_state); + blockingOperationStarts(); while ((n = listFirst(buf->blocks))) { replDataBufBlock *o = listNodeValue(n); @@ -4134,30 +4213,59 @@ int replDataBufStreamToDb(replDataBuf *buf, replDataBufToDbCtx *ctx) { size_t processed = 0; while (processed < o->used) { - size_t bytes = min(PROTO_IOBUF_LEN, o->used - processed); - c->querybuf = sdscatlen(c->querybuf, &o->buf[processed], bytes); - c->read_reploff += (long long int) bytes; + /* Consumed bytes from the current block in the current iteration. */ + size_t consumed = 0; + + if (server.repl_master_compression_level > 0) { + c->querybuf = sdsMakeRoomFor(c->querybuf, PROTO_IOBUF_LEN); + + size_t qblen = sdslen(c->querybuf); + size_t avail = sdsavail(c->querybuf); + int decompressed = readFromBufAndDecompress(c, o->buf + processed, + o->used - processed, + c->querybuf + qblen, + avail, + &consumed); + serverAssert(decompressed >= 0); + sdsIncrLen(c->querybuf, decompressed); + c->read_reploff += (long long) decompressed; + c->io_read_reploff += (long long int) decompressed; + + atomicIncr(server.stat_net_repl_decompressed_bytes, decompressed); + } else { + size_t bytes = min(PROTO_IOBUF_LEN, o->used - processed); + c->querybuf = sdscatlen(c->querybuf, &o->buf[processed], bytes); + c->read_reploff += (long long int) bytes; + c->io_read_reploff += (long long int) bytes; + consumed = bytes; + } c->lastinteraction = server.unixtime; /* We don't expect error return value but just in case. 
*/ ret = processInputBuffer(c); if (ret != C_OK) break; - processed += bytes; - buf->used -= bytes; + processed += consumed; + buf->used -= consumed; - if (server.repl_debug_pause & REPL_DEBUG_ON_STREAMING_REPL_BUF) + if (server.repl_debug_pause & REPL_DEBUG_ON_STREAMING_REPL_BUF) { debugPauseProcess(); + /* Always process events after debug break as tests may have + * run other debug commands we need to process and it's not + * always possible to enter the below condition relying on + * `server.loading_process_events_interval_bytes`. */ + processEventsWhileBlocked(); + } /* Check if we should yield back to the event loop */ if (server.loading_process_events_interval_bytes && - ((ctx->applied_offset + bytes) / server.loading_process_events_interval_bytes > + ((ctx->applied_offset + consumed) / server.loading_process_events_interval_bytes > ctx->applied_offset / server.loading_process_events_interval_bytes)) { ctx->yield_callback(ctx); processEventsWhileBlocked(); } - ctx->applied_offset += bytes; + ctx->applied_offset += consumed; /* Check if we should continue processing */ if (!ctx->should_continue(ctx)) { diff --git a/src/server.c b/src/server.c index df660175e09..b0a541c30cd 100644 --- a/src/server.c +++ b/src/server.c @@ -2915,6 +2915,8 @@ void resetServerStats(void) { atomicSet(server.stat_net_output_bytes, 0); atomicSet(server.stat_net_repl_input_bytes, 0); atomicSet(server.stat_net_repl_output_bytes, 0); + atomicSet(server.stat_net_repl_uncompressed_bytes, 0); + atomicSet(server.stat_net_repl_decompressed_bytes, 0); server.stat_unexpected_error_replies = 0; server.stat_total_error_replies = 0; server.stat_dump_payload_sanitizations = 0; @@ -6642,6 +6644,7 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) { long long stat_total_client_process_input_buff_events; long long stat_avg_pipeline_length_sum; long long stat_avg_pipeline_length_cnt; + long long stat_net_repl_uncompressed_bytes, stat_net_repl_decompressed_bytes; long long
current_eviction_exceeded_time = server.stat_last_eviction_exceeded_time ? (long long) elapsedUs(server.stat_last_eviction_exceeded_time): 0; long long current_active_defrag_time = server.stat_last_active_defrag_time ? @@ -6655,6 +6658,8 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) { atomicGet(server.stat_total_client_process_input_buff_events, stat_total_client_process_input_buff_events); atomicGet(server.stat_avg_pipeline_length_sum, stat_avg_pipeline_length_sum); atomicGet(server.stat_avg_pipeline_length_cnt, stat_avg_pipeline_length_cnt); + atomicGet(server.stat_net_repl_uncompressed_bytes, stat_net_repl_uncompressed_bytes); + atomicGet(server.stat_net_repl_decompressed_bytes, stat_net_repl_decompressed_bytes); /* If we calculated the total reads and writes in the threads section, * we don't need to do it again, and also keep the values consistent. */ @@ -6747,6 +6752,12 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) { if (!server.cluster_enabled && server.cluster_compatibility_sample_ratio) { info = sdscatprintf(info, "cluster_incompatible_ops:%lld\r\n", server.stat_cluster_incompatible_ops); } + if (stat_net_repl_uncompressed_bytes > 0) { + info = sdscatprintf(info, "total_net_repl_uncompressed_bytes:%lld\r\n", stat_net_repl_uncompressed_bytes); + } + if (stat_net_repl_decompressed_bytes > 0) { + info = sdscatprintf(info, "total_net_repl_decompressed_bytes:%lld\r\n", stat_net_repl_decompressed_bytes); + } } /* Replication */ diff --git a/src/server.h b/src/server.h index 9318eec686d..9803d0fc23d 100644 --- a/src/server.h +++ b/src/server.h @@ -67,6 +67,7 @@ typedef long long ustime_t; /* microsecond time type. 
*/ #include "connection.h" /* Connection abstraction */ #include "eventnotifier.h" /* Event notification */ #include "memory_prefetch.h" +#include "client_comp.h" /* Forward declarations needed by redismodule.h and keymeta.h */ struct redisObject; @@ -460,6 +461,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; #define CLIENT_IO_REUSABLE_QUERYBUFFER (1ULL<<3) /* The client is using the reusable query buffer. */ #define CLIENT_IO_CLOSE_ASAP (1ULL<<4) /* Close this client ASAP in IO thread. */ #define CLIENT_IO_PENDING_CRON (1ULL<<5) /* The client is pending cron job, to be processed in main thread. */ +#define CLIENT_IO_COMPRESSION_ENABLED (1ULL<<6) /* Compression is enabled for this client. */ /* Definitions for client read errors. These error codes are used to indicate * various issues that can occur while reading or parsing data from a client. */ @@ -526,6 +528,7 @@ typedef enum { REPL_STATE_RECEIVE_PORT_REPLY, /* Wait for REPLCONF reply */ REPL_STATE_RECEIVE_IP_REPLY, /* Wait for REPLCONF reply */ REPL_STATE_RECEIVE_REQ_REPLY, /* Wait for REPLCONF reply */ + REPL_STATE_RECEIVE_CLIENT_COMP, /* Wait for REPLCONF compression reply */ REPL_STATE_RECEIVE_CAPA_REPLY, /* Wait for REPLCONF reply */ REPL_STATE_SEND_PSYNC, /* Send PSYNC */ REPL_STATE_RECEIVE_PSYNC_REPLY, /* Wait for PSYNC reply */ @@ -810,10 +813,12 @@ typedef enum { #endif #define NOTIFY_ALL (NOTIFY_GENERIC | NOTIFY_STRING | NOTIFY_LIST | NOTIFY_SET | NOTIFY_HASH | NOTIFY_ZSET | NOTIFY_EXPIRED | NOTIFY_EVICTED | NOTIFY_STREAM | NOTIFY_MODULE | NOTIFY_ARRAY) /* A flag */ +#define _run_with_period(_cronloops_, _ms_, _hz_) if (((_ms_) <= 1000/(_hz_)) || !((_cronloops_)%((_ms_)/(1000/(_hz_))))) + /* Using the following macro you can run code inside serverCron() with the * specified period, specified in milliseconds. * The actual resolution depends on server.hz.
*/ -#define run_with_period(_ms_) if (((_ms_) <= 1000/server.hz) || !(server.cronloops%((_ms_)/(1000/server.hz)))) +#define run_with_period(_ms_) _run_with_period(server.cronloops, (_ms_), server.hz) /* We can print the stacktrace, so our assert is defined this way: */ #define serverAssertWithInfo(_c,_o,_e) (likely(_e)?(void)0 : (_serverAssertWithInfo(_c,_o,#_e,__FILE__,__LINE__),redis_unreachable())) @@ -1581,6 +1586,7 @@ typedef struct client { sds sockname; /* Cached connection target address. */ listNode *client_list_node; /* list node in client list */ listNode *io_thread_client_list_node; /* list node in io thread client list */ + listNode io_thread_compression_clients_node; /* list node in io thread compression clients list */ listNode *postponed_list_node; /* list node within the postponed list */ void *module_blocked_client; /* Pointer to the RedisModuleBlockedClient associated with this * client. This is set in case of module authentication before the @@ -1597,7 +1603,10 @@ typedef struct client { void *auth_module; /* The module that owns the callback, which is used * to disconnect the client if the module is * unloaded for cleanup. Opaque for Redis Core.*/ - + compressionState *compression_state; /* Opaque handle to compression state */ + int compression_level; /* Compression level (0 means no compression). + * Currently not relevant for non-replication + * connections. */ /* If this client is in tracking mode and this field is non zero, * invalidation messages for keys fetched by this client will be sent to * the specified client ID. */ @@ -1679,6 +1688,8 @@ typedef struct __attribute__((aligned(CACHE_LINE_SIZE))) { list *clients; /* IO thread managed clients.
*/ redisAtomic long long io_reads_processed; /* Number of read events processed */ redisAtomic long long io_writes_processed; /* Number of write events processed */ + list *compression_clients; /* Clients that write/read compressed data */ + size_t cronloops; } IOThread; extern IOThread IOThreads[IO_THREADS_MAX_NUM]; @@ -2147,6 +2158,8 @@ struct redisServer { redisAtomic long long stat_net_output_bytes; /* Bytes written to network. */ redisAtomic long long stat_net_repl_input_bytes; /* Bytes read during replication, added to stat_net_input_bytes in 'info'. */ redisAtomic long long stat_net_repl_output_bytes; /* Bytes written during replication, added to stat_net_output_bytes in 'info'. */ + redisAtomic long long stat_net_repl_uncompressed_bytes; /* Bytes read from repl buffer before being compressed and written during replication. */ + redisAtomic long long stat_net_repl_decompressed_bytes; /* Decompressed bytes after reading compressed data during replication. */ size_t stat_current_cow_peak; /* Peak size of copy on write bytes. */ size_t stat_current_cow_bytes; /* Copy on write bytes while child is active. */ monotime stat_current_cow_updated; /* Last update time of stat_current_cow_bytes */ @@ -2409,6 +2422,8 @@ struct redisServer { char master_replid[CONFIG_RUN_ID_SIZE+1]; /* Master PSYNC runid. */ long long master_initial_offset; /* Master PSYNC offset. */ int repl_slave_lazy_flush; /* Lazy FLUSHALL before loading DB? */ + int repl_compression; /* Compression level the slave requests from the master (<= 0: don't request compression) */ + int repl_master_compression_level; /* Compression level agreed with master */ /* Synchronous replication. */ list *clients_waiting_acks; /* Clients waiting in WAIT or WAITAOF. */ int get_ack_from_slaves; /* If true we send REPLCONF GETACK.
*/ @@ -2429,6 +2444,7 @@ struct redisServer { int oom_score_adj_values[CONFIG_OOM_COUNT]; /* Linux oom_score_adj configuration */ int oom_score_adj; /* If true, oom_score_adj is managed */ int disable_thp; /* If true, disable THP by syscall */ + int compression_max_latency; /* flush interval for client compression in ms */ /* Blocked clients */ unsigned int blocked_clients; /* # of clients executing a blocking cmd.*/ unsigned int blocked_clients_by_type[BLOCKED_NUM]; diff --git a/tests/instances.tcl b/tests/instances.tcl index 8f0a23bd298..35c3ef71352 100644 --- a/tests/instances.tcl +++ b/tests/instances.tcl @@ -41,6 +41,7 @@ set ::run_matching {} ; # If non empty, only tests matching pattern are run. set ::stop_on_failure 0 set ::loop 0 set ::tsan 0 +set ::compression 0 if {[catch {cd tmp}]} { puts "tmp directory not found." @@ -313,6 +314,8 @@ proc parse_options {} { set ::force_resp3 1 } elseif {$opt eq {--tsan}} { set ::tsan 1 + } elseif {$opt eq {--compression}} { + set ::compression 1 } elseif {$opt eq "--help"} { puts "--single Only runs tests specified by pattern." puts "--dont-clean Keep log files on exit." diff --git a/tests/integration/psync2-master-restart.tcl b/tests/integration/psync2-master-restart.tcl index 5971e74e12d..d1640b90a32 100644 --- a/tests/integration/psync2-master-restart.tcl +++ b/tests/integration/psync2-master-restart.tcl @@ -90,7 +90,7 @@ start_server {} { restart_server 0 true false true now set master [srv 0 client] } - wait_for_condition 50 1000 { + wait_for_condition 100 1000 { [status $replica master_link_status] eq {up} && [status $sub_replica master_link_status] eq {up} } else { @@ -131,7 +131,6 @@ start_server {} { $offset == [status $replica master_repl_offset] && $offset == [status $sub_replica master_repl_offset] } else { - show_cluster_status fail "Replicas and master offsets were unable to match *exactly*." 
} @@ -196,7 +195,7 @@ start_server {} { after 20 - wait_for_condition 500 100 { + wait_for_condition 1000 100 { [status $master master_repl_offset] == [status $replica master_repl_offset] && [status $master master_repl_offset] == [status $sub_replica master_repl_offset] } else { diff --git a/tests/integration/replication-buffer.tcl b/tests/integration/replication-buffer.tcl index 11e604c7562..b57f1289d80 100644 --- a/tests/integration/replication-buffer.tcl +++ b/tests/integration/replication-buffer.tcl @@ -76,7 +76,17 @@ start_server {} { # replication buffer, so all replicas output memory is not # more than double of replication buffer. set repl_buf_mem [s mem_total_replication_buffers] - set extra_mem [expr {[s used_memory]-$before_used-1024*1024}] + set repl_full_sync_buf_mem [s mem_replica_full_sync_buffer] + + set compression_extra_mem 0 + if {$::compression} { + # each replica holds 2 temp user-space buffers of size 128 KB related + # to compression + set numreplicas 3 + set compression_extra_mem [expr {(2 * 128) * 1024 * $numreplicas}] + } + set extra_mem [expr {[s used_memory] - $before_used - 1024*1024 - $compression_extra_mem}] + if {$rdbchannel == "yes"} { # master's replication buffers should not grow assert {$extra_mem < 1024*1024} @@ -162,6 +172,15 @@ start_server {} { # Generating RDB will take 1000 seconds $master config set rdb-key-save-delay 1000000 populate 1000 master 10000 + + # When compression is enabled the repl buffer may be consumed by the + # compression library faster than the data is actually sent. We still + # trim the replication buffer in that case. In order for the following + # test to work we rely on replica2 being slow. If repl-rdb-channel is + # enabled though the repl buffer may be consumed faster than we expect.
+ if {$::compression} { + $replica2 config set repl-rdb-channel no + } $replica2 replicaof $master_host $master_port # Make sure replica2 is waiting bgsave wait_for_condition 5000 100 { @@ -174,6 +193,11 @@ start_server {} { # the slow replica2 kept replication buffer. populate 20000 master 10000 assert {[s repl_backlog_histlen] > [expr 10000*10000]} + + # revert the config + if {$::compression} { + $replica2 config set repl-rdb-channel yes + } } # Wait replica1 catch up with the master @@ -224,10 +248,12 @@ start_server {} { # Since we trim replication backlog inrementally, replication backlog # memory may take time to be reclaimed. - wait_for_condition 1000 100 { - [s repl_backlog_histlen] < [expr 10000*10000] - } else { - fail "Replication backlog memory is not smaller" + if {!$::compression} { + wait_for_condition 1000 100 { + [s repl_backlog_histlen] < [expr 10000*10000] + } else { + fail "Replication backlog memory is not smaller" + } } resume_process $replica2_pid } diff --git a/tests/integration/replication-rdbchannel.tcl b/tests/integration/replication-rdbchannel.tcl index 0fc955bd6f0..531a8edd246 100644 --- a/tests/integration/replication-rdbchannel.tcl +++ b/tests/integration/replication-rdbchannel.tcl @@ -299,8 +299,12 @@ start_server {tags {"repl external:skip debug_defrag:skip"}} { assert_lessthan [expr $peak_master_used_mem - $prev_used - $backlog_size] 1000000 assert_lessthan $peak_master_rpl_buf [expr {$backlog_size + 1000000}] assert_lessthan $peak_master_slave_buf_size 1000000 - # buffers in the replica are more than 5mb - assert_morethan $peak_replica_buf_size 5000000 + # buffers in the replica are more than 5mb. 
Skip when testing + # compression as replica buf contains compressed data in that case + # which is a lot smaller + if {$::compression == 0} { + assert_morethan $peak_replica_buf_size 5000000 + } stop_write_load $load_handle } @@ -343,11 +347,17 @@ start_server {tags {"repl external:skip"}} { fail "replica didn't start sync" } - # Create some traffic on replication stream - populate 100 master 100000 + # Create some traffic on replication stream. + # In case of compression generate a command stream that is not well + # compressed so we can reach the buffer limits easier. + if {$::compression} { + populate 1000 master 500000 0 false 0 true + } else { + populate 100 master 100000 + } # Wait for replica's buffer limit reached - wait_for_log_messages -1 {"*Replication buffer limit has been reached*"} 0 1000 10 + wait_for_log_messages -1 {"*Replication buffer limit has been reached*"} 0 100 10 # Speed up loading $replica config set key-load-delay 0 @@ -361,7 +371,12 @@ start_server {tags {"repl external:skip"}} { } # Verify sync was not interrupted. - assert_equal [s 0 sync_full] [expr $prev_sync_full + 1] + # In the compression case the master's output buffer limits could be + # reached when replica stops accumulating command stream since we + # are sending a lot more data in this case. 
+ if {$::compression == 0} { + assert_equal [s 0 sync_full] [expr $prev_sync_full + 1] + } # Verify db's are identical assert_morethan [$master dbsize] 0 @@ -434,7 +449,11 @@ start_server {tags {"repl external:skip debug_defrag:skip"}} { } # Generate replication traffic of ~20mb to disconnect the slave on obuf limit - populate 20 master 1000000 -1 + if {$::compression} { + populate 200 master 1000000 -1 false 0 true + } else { + populate 20 master 1000000 -1 + } wait_for_log_messages -1 {"*Client * closed * for overcoming of output buffer limits.*"} $loglines 1000 10 $replica config set key-load-delay 0 diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl index 0611a970eed..697d392544b 100644 --- a/tests/integration/replication.tcl +++ b/tests/integration/replication.tcl @@ -108,13 +108,13 @@ start_server {tags {"repl external:skip"}} { $B config set loglevel debug r set test foo assert_equal [r getset test bar] foo - wait_for_condition 500 10 { + wait_for_condition 1000 10 { [$A get test] eq "bar" } else { fail "getset wasn't propagated" } assert_equal [r set test vaz get] bar - wait_for_condition 500 10 { + wait_for_condition 1000 10 { [$A get test] eq "vaz" } else { fail "set get wasn't propagated" @@ -241,8 +241,8 @@ start_server {tags {"repl external:skip"}} { s role } {slave} - wait_for_sync r test {Sync should have transferred keys from master} { + wait_for_sync r r get mykey } {foo} @@ -367,7 +367,7 @@ foreach mdl {no yes} rdbchannel {no yes} { # Wait for all the three slaves to reach the "online" # state from the POV of the master. 
- set retry 500 + set retry 1000 while {$retry} { set info [r -3 info] if {[string match {*slave0:*state=online*slave1:*state=online*slave2:*state=online*} $info]} { @@ -810,7 +810,7 @@ test {diskless loading short read} { for {set i 0} {$i < $attempts} {incr i} { # wait for the replica to start reading the rdb # using the log file since the replica only responds to INFO once in 2mb - set res [wait_for_log_messages -1 {"*Loading DB in memory*"} $loglines 2000 1] + set res [wait_for_log_messages -1 {"*Loading DB in memory*"} $loglines 10000 1] set loglines [lindex $res 1] # add some additional random sleep so that we kill the master on a different place each time @@ -1551,7 +1551,7 @@ start_server {tags {"repl external:skip"}} { foreach disklessload {disabled on-empty-db} { test "Replica should reply LOADING while flushing a large db (disklessload: $disklessload)" { - start_server {} { + start_server {tags {"repl"}} { set replica [srv 0 client] start_server {} { set master [srv 0 client] diff --git a/tests/support/test.tcl b/tests/support/test.tcl index 01ea8e717aa..3d416d451fe 100644 --- a/tests/support/test.tcl +++ b/tests/support/test.tcl @@ -126,6 +126,9 @@ proc assert_refcount_morethan {key ref} { # max retries and delay between retries. Otherwise the 'elsescript' is # executed. 
proc wait_for_condition {maxtries delay e _else_ elsescript} { + if {$::compression} { + set maxtries [expr $maxtries * 3] + } if {$_else_ ne "else"} { error "$_else_ must be equal to \"else\"" } diff --git a/tests/support/util.tcl b/tests/support/util.tcl index e46da150ad7..146e7d63674 100644 --- a/tests/support/util.tcl +++ b/tests/support/util.tcl @@ -136,6 +136,10 @@ proc wait_for_sync r { if {$::tsan} { set maxtries 100 } + # same for compression + if {$::compression} { + set maxtries 200 + } wait_for_condition $maxtries 100 { [status $r master_link_status] eq "up" @@ -150,6 +154,9 @@ proc wait_replica_online {r {replica_id 0} {maxtries 50} {delay 100}} { if {$::tsan} { set maxtries [expr {$maxtries * 2}] } + if {$::compression} { + set maxtries [expr {$maxtries * 3}] + } wait_for_condition $maxtries $delay { [string match "*slave$replica_id:*,state=online*" [$r info replication]] @@ -165,6 +172,9 @@ proc wait_for_ofs_sync {r1 r2} { if {$::tsan} { set maxtries 100 } + if {$::compression} { + set maxtries 150 + } wait_for_condition $maxtries 100 { [status $r1 master_repl_offset] eq [status $r2 master_repl_offset] } else { @@ -221,6 +231,9 @@ proc verify_log_message {srv_idx pattern from_line} { # wait for pattern to be found in server's stdout after certain line number # return value is a list containing the line that matched the pattern and the line number proc wait_for_log_messages {srv_idx patterns from_line maxtries delay} { + if {$::compression} { + set maxtries [expr {$maxtries * 3}] + } set retry $maxtries set next_line [expr $from_line + 1] ;# searching form the line after set stdout [srv $srv_idx stdout] @@ -656,11 +669,19 @@ proc stop_bg_complex_data {handle} { # Write num keys with the given key prefix and value size (in bytes). If idx is # given, it's the index (AKA level) used with the srv procedure and it specifies # to which Redis instance to write the keys. 
-proc populate {num {prefix key:} {size 3} {idx 0} {prints false} {expires 0}} { +proc populate {num {prefix key:} {size 3} {idx 0} {prints false} {expires 0} {random false}} { r $idx deferred 1 if {$num > 16} {set pipeline 16} else {set pipeline $num} - set val [string repeat A $size] + if {$random} { + set baseval [randstring $size $size] + } else { + set val [string repeat A $size] + } for {set j 0} {$j < $pipeline} {incr j} { + if {$random} { + set jstr [format "%08d" $j] + set val [string replace $baseval 0 7 $jstr] + } if {$expires > 0} { r $idx set $prefix$j $val ex $expires } else { @@ -669,6 +690,10 @@ proc populate {num {prefix key:} {size 3} {idx 0} {prints false} {expires 0}} { if {$prints} {puts $j} } for {} {$j < $num} {incr j} { + if {$random} { + set jstr [format "%08d" $j] + set val [string replace $baseval 0 7 $jstr] + } if {$expires > 0} { r $idx set $prefix$j $val ex $expires } else { @@ -754,7 +779,11 @@ proc resume_process {pid} { after 100 } - wait_for_condition 50 1000 { + set max_attempts 50 + if {$::compression} { + set max_attempts 150 + } + wait_for_condition $max_attempts 1000 { [string match "R*" [exec ps -o state= -p $pid]] || [string match "S*" [exec ps -o state= -p $pid]] } else { diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index 2f73df1c469..f7fb27b47ef 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -90,6 +90,7 @@ set ::large_memory 0 set ::log_req_res 0 set ::force_resp3 0 set ::debug_defrag 0 +set ::compression 0 # Set to 1 when we are running in client mode. The Redis test uses a # server-client model to run tests simultaneously. The server instance @@ -728,6 +729,8 @@ for {set j 0} {$j < [llength $argv]} {incr j} { set ::ignoredigest 1 } elseif {$opt eq {--debug-defrag}} { set ::debug_defrag 1 + } elseif {$opt eq {--compression}} { + set ::compression 1 } elseif {$opt eq {--help}} { print_help_screen exit 0