From 06c4566251eb11c913f1a6cb9ce2ed9e760cffcd Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Mon, 16 Jun 2025 10:41:07 +0200 Subject: [PATCH 01/21] ref(span-buffer): Add compression --- src/sentry/spans/buffer.py | 43 ++++++++++++++++-- tests/sentry/spans/test_buffer.py | 72 +++++++++++++++++++++++++++++++ 2 files changed, 112 insertions(+), 3 deletions(-) diff --git a/src/sentry/spans/buffer.py b/src/sentry/spans/buffer.py index c9ef32cd136dcc..c0b4975c90810f 100644 --- a/src/sentry/spans/buffer.py +++ b/src/sentry/spans/buffer.py @@ -70,6 +70,7 @@ from typing import Any, NamedTuple import rapidjson +import zstandard from django.conf import settings from django.utils.functional import cached_property from sentry_redis_tools.clients import RedisCluster, StrictRedis @@ -153,6 +154,9 @@ def __init__(self, assigned_shards: list[int]): self.assigned_shards = list(assigned_shards) self.add_buffer_sha: str | None = None self.any_shard_at_limit = False + # Reuse compressor/decompressor objects for better performance + self._zstd_compressor = zstandard.ZstdCompressor(level=0) + self._zstd_decompressor = zstandard.ZstdDecompressor() @cached_property def client(self) -> RedisCluster[bytes] | StrictRedis[bytes]: @@ -187,7 +191,9 @@ def process_spans(self, spans: Sequence[Span], now: int): with self.client.pipeline(transaction=False) as p: for (project_and_trace, parent_span_id), subsegment in trees.items(): set_key = self._get_span_key(project_and_trace, parent_span_id) - p.sadd(set_key, *[span.payload for span in subsegment]) + # Compress multiple spans together into batches + compressed = self._compress_span_payloads([span.payload for span in subsegment]) + p.sadd(set_key, compressed) p.execute() @@ -310,6 +316,32 @@ def _group_by_parent(self, spans: Sequence[Span]) -> dict[tuple[str, str], list[ return trees + def _compress_span_payloads(self, payloads: list[bytes]) -> bytes: + combined = b"\x00".join(payloads) + original_size = len(combined) + + with 
metrics.timer("spans.buffer.compression.cpu_time"): + compressed = self._zstd_compressor.compress(combined) + + compressed_size = len(compressed) + + compression_ratio = compressed_size / original_size if original_size > 0 else 0 + metrics.timing("spans.buffer.compression.original_size", original_size) + metrics.timing("spans.buffer.compression.compressed_size", compressed_size) + metrics.timing("spans.buffer.compression.compression_ratio", compression_ratio) + + return compressed + + def _decompress_batch(self, compressed_data: bytes) -> list[bytes]: + # Check for zstd magic header (0xFD2FB528 in little-endian) -- + # backwards compat with code that did not write compressed payloads. + with metrics.timer("spans.buffer.decompression.cpu_time"): + if not compressed_data.startswith(b"\x28\xb5\x2f\xfd"): + return [compressed_data] + + decompressed_buffer = self._zstd_decompressor.decompress(compressed_data) + return decompressed_buffer.split(b"\x00") + def record_stored_segments(self): with metrics.timer("spans.buffer.get_stored_segments"): with self.client.pipeline(transaction=False) as p: @@ -438,7 +470,12 @@ def _load_segment_data(self, segment_keys: list[SegmentKey]) -> dict[SegmentKey, results = p.execute() for key, (cursor, spans) in zip(current_keys, results): - sizes[key] += sum(len(span) for span in spans) + decompressed_spans = [] + + for span_data in spans: + decompressed_spans.extend(self._decompress_batch(span_data)) + + sizes[key] += sum(len(span) for span in decompressed_spans) if sizes[key] > max_segment_bytes: metrics.incr("spans.buffer.flush_segments.segment_size_exceeded") logger.warning("Skipping too large segment, byte size %s", sizes[key]) @@ -447,7 +484,7 @@ def _load_segment_data(self, segment_keys: list[SegmentKey]) -> dict[SegmentKey, del cursors[key] continue - payloads[key].extend(spans) + payloads[key].extend(decompressed_spans) if len(payloads[key]) > max_segment_spans: 
metrics.incr("spans.buffer.flush_segments.segment_span_count_exceeded") logger.warning("Skipping too large segment, span count %s", len(payloads[key])) diff --git a/tests/sentry/spans/test_buffer.py b/tests/sentry/spans/test_buffer.py index 366f5aeec281c1..057efc35d6881f 100644 --- a/tests/sentry/spans/test_buffer.py +++ b/tests/sentry/spans/test_buffer.py @@ -513,3 +513,75 @@ def test_flush_rebalance(buffer: SpansBuffer): assert not rv assert_clean(buffer.client) + + +def test_compression_functionality(buffer): + """Test that compression is working correctly.""" + + def make_payload(span_id: str): + return rapidjson.dumps( + { + "span_id": span_id, + "trace_id": "a" * 32, + "data": {"message": "x" * 1000}, # Large payload to compress well + "extra_data": {"field": "y" * 500}, + } + ).encode("ascii") + + spans = [ + Span( + payload=make_payload("b" * 16), + trace_id="a" * 32, + span_id="b" * 16, # This will be the root span + parent_span_id=None, + project_id=1, + is_segment_span=True, + ), + Span( + payload=make_payload("a" * 16), + trace_id="a" * 32, + span_id="a" * 16, + parent_span_id="b" * 16, + project_id=1, + ), + Span( + payload=make_payload("c" * 16), + trace_id="a" * 32, + span_id="c" * 16, + parent_span_id="b" * 16, + project_id=1, + ), + ] + + # Process spans - should compress them together + buffer.process_spans(spans, now=0) + + # Verify data is stored (and compressed internally) + segment_key = _segment_id(1, "a" * 32, "b" * 16) + stored_data = buffer.client.smembers(segment_key) + + # Should have stored data (compressed or uncompressed fallback) + assert len(stored_data) > 0 + + # Verify we can flush and get back the original data + segments = buffer.flush_segments(now=11) + assert len(segments) == 1 + + segment = list(segments.values())[0] + assert len(segment.spans) == 3 + + # Verify the payloads are correctly reconstructed + span_ids = set() + for span in segment.spans: + assert "data" in span.payload + assert "extra_data" in span.payload + 
assert span.payload["data"]["message"] == "x" * 1000 + assert span.payload["extra_data"]["field"] == "y" * 500 + span_ids.add(span.payload["span_id"]) + + # Verify we got all expected span IDs + expected_span_ids = {"a" * 16, "b" * 16, "c" * 16} + assert span_ids == expected_span_ids + + buffer.done_flush_segments(segments) + assert_clean(buffer.client) From 6bca6f87a7392eaafa068e2ced6b51afeeebca35 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Mon, 16 Jun 2025 14:28:49 +0200 Subject: [PATCH 02/21] dictionary training (dictionary is sketchy) --- src/sentry/spans/buffer.py | 16 +++++++++++++--- src/sentry/spans/zstd_dict.bin | Bin 0 -> 8192 bytes 2 files changed, 13 insertions(+), 3 deletions(-) create mode 100644 src/sentry/spans/zstd_dict.bin diff --git a/src/sentry/spans/buffer.py b/src/sentry/spans/buffer.py index c0b4975c90810f..bf810e7f824568 100644 --- a/src/sentry/spans/buffer.py +++ b/src/sentry/spans/buffer.py @@ -66,6 +66,7 @@ import itertools import logging import math +import os from collections.abc import Generator, MutableMapping, Sequence from typing import Any, NamedTuple @@ -92,6 +93,14 @@ logger = logging.getLogger(__name__) +def _load_zstd_dictionary() -> zstandard.ZstdCompressionDict: + """Load the zstd dictionary for span compression.""" + dict_path = os.path.join(os.path.dirname(__file__), "zstd_dict.bin") + with open(dict_path, "rb") as f: + dict_data = f.read() + return zstandard.ZstdCompressionDict(dict_data) + + def _segment_key_to_span_id(segment_key: SegmentKey) -> bytes: return parse_segment_key(segment_key)[-1] @@ -154,9 +163,10 @@ def __init__(self, assigned_shards: list[int]): self.assigned_shards = list(assigned_shards) self.add_buffer_sha: str | None = None self.any_shard_at_limit = False - # Reuse compressor/decompressor objects for better performance - self._zstd_compressor = zstandard.ZstdCompressor(level=0) - self._zstd_decompressor = zstandard.ZstdDecompressor() + # Load custom dictionary and create 
compressor/decompressor objects + self._zstd_dict = _load_zstd_dictionary() + self._zstd_compressor = zstandard.ZstdCompressor(level=0, dict_data=self._zstd_dict) + self._zstd_decompressor = zstandard.ZstdDecompressor(dict_data=self._zstd_dict) @cached_property def client(self) -> RedisCluster[bytes] | StrictRedis[bytes]: diff --git a/src/sentry/spans/zstd_dict.bin b/src/sentry/spans/zstd_dict.bin new file mode 100644 index 0000000000000000000000000000000000000000..3c00b2226f0e06a80da3c4e54b842103a3e34d1a GIT binary patch literal 8192 zcmbVROOGR08J&btKw*JbcNNmh~_Be6dj$Iz5iAD+_ zh>%#21wR0kf50yQMM5lD!ve8kgV-V=Avm}Esfr z{`(*Q^ttC=yZ?t*UwHk~AN=Cu-@fyw;}?JS`#&2T_!i1Pt#=@)wAB|d-z^Y2Z5B2Pu*~ydZ*gj ziU>J@L8q}ELvYu37IKZRVz2wp3^M{YV`{rRUVv<#g8>^U-j8baV`b}Xx z$4i>!mgUjb@(d%Du~st6tyO~1^+W!uPMc>~>lwjprGz4aXv4X-n`bTu!{K0-!gv&W z&z?Pdua}-;d;LMN7@^r3B}~Xp1}VLZ#2PMX2!RvnR5AVZ!i@$$a>X&tLrn7H-efv{ zD}}`@%nH+ZB&5Zxltf0o35jJcg=Haflb!R#=_C}>m~}RBi`jbI=FFthjBB5~wHTbZ zX<6as9;uUw(s}7smYA+gkS>aY7cb0S@F0WsLOZ2B!FZfc7fZfG33XNx4QJ`N_x1qRI>lS2Pt&w4X*%nNaYXF&;167A1t6}S z%u-zsMi;Xa7Z#4~+_4DgcKkL09ewrAJWa2<(^Z>ze)MJ&(9zYdxVrrB=Rq3AKB9T? 
zwTbAyCcNxCQU1r`o zTHd!4I>wN0=oltDkzw-Y&uIW3aqh)ff+O2ta;$SDih8#zoUq*#fZ(_S*joJ^=dfC{IID|Qyb zk(!K)gg+6OQ_Rx|VauZ?A$zL1W+axKb{0Y{hpG03a5Hdy2oFYenvUGCENVK=gW)ns zgRwywWjXN40|uy+NxA3&RgBtL7C22IAzhFNR1+#W7nD|#8qK6Fqcsw+ph9$4$R!Kk z&T2QUoAkn`%Sn@Cc$QEKEW=~i!uBl_yan*oyn}F{q~;ZV4HJNwG>Wa++P6}o0IRa% zoD*(vYfQ5+9ZbrC5WCoFNV8dg;;srzU^BJ5z#`u(|4Z^Tc~g)G-%GMI-&vE{?xJKz zb3w9#wlmByz3Q)Kcv6ETHc9%*`Ku2w&yt!JEQIBZFA*5RF`}gRi74P#vFUDL5Eg8- z(I+XC4LgBhG|3@~ECVd!n<(!DhQiO@z<|vG%k99Zr}GW8ty!+)WuieloK&{pcJaU+ zyiA!WKub8*bCZ`TP?s@LQ82LNSmX6xCYlqfM8yU2jUec{UjF9U`O%4+3{K_%>KUwi zM^7zX2Vc2CiJFDvNbe$ahp7tVOH6q=j`Lx`{pj)SVmh7|aT4K@q`217Q?FSr@2*`v zM~GpTQAoof@&*^LW_7wO4~*+kBV};gX>TPWT4D3x?3LH>z+{T?5N1Mz(m@7$HfL&Vp+Yg-JrNS?_JeM8c>t)s6))(6xY-MIg2;z?@gy z3oDpWk|-L34_X=*nc>Xy29?&5O-e^)$kI7Wg|>T%S6Xj~^jUKe6DVcd0z%T1wA2KK z$V`{?vKp9ExO5xYxZ1-`MI?g2fa1_M66FFM zxy5lER>Ezdu1YFiQ1E(SEA%C_sbB!hp-sEEhIpE#{#!FpSqYp`ecCeu;btjkA(+VX zj>Of>V)^^@0o6;PvQW(15*KyBY<}5@+>X5nl)5mYGJsxzZQsRQlvv9^1?Dy;Pzt4y zt(ZmP5xnGjgJJKmm_XTv+NP-auEnM)4rZlT{9>VkQ_A?4V7e7!w>iJ5FxsRq?X4US zp22JzhX3TekyCIaW{5AFZT6+wS@VYT<7wSxjmZ^C+a?Gaa#^9ZBqd)8zAojCo7Yc@ z7z?gW{bGS@HgC+WdtLI-db2ASg2h)f3Cur)m7rX#k1}T&g$S&n!y7sBn%y!|qP@i? 
zpwpAG45*?hLy?rg zU=aY$Z3EjYlkW(D>D-sna}j@=(As0{L#o?tA<%B$w0yVWx3$G@7Xm`cZ6Oe0d}Xpv zD8599#B-)}AjuJ#9nnRQ?a-hWz>TgDAh^<|K^6csDGM6CNg16R$9(qA7#}#0s+em{ zgFsvqYymYZWFTWjCF}?R!8wYhWGuMOWZEc+cISc;q(_bhb!b*v8_dz6K`vDh$a^{$ z9Oc&aEQnpkd04hCT2)`f)>OryOen#`T@VG+#D0Pn9`;A$dXNj8lyV9t+EYBS27_AF z6JM57>+0#wz`DTMHO|~c0(+4WCFl)#f;Nz7dQ7XxAB<4U4PxI-FTiw7HMmy8lP8nn z)DFCgrkm0V+5$MsxS5@tEt53I8d9iGc|3+#Y0k)l6CxTw&i)U4E>cTxuuDJ#(6*`v4GVu~=Uxzd#sM6n{0 zZcqp4dNPK_0mLyL+79eDQFOd|e)Q#|Z@ls7J4a2ic=Y`TXK$Px4dT&Pj=owV;Kmxi zG~#X&0H{F^qC_arkdx!VQlCtt``7fpw`{x^m$h}GQ|^n-=w?s+*i8$`?gZmDG&c>5 zl3WX&&}zKz(jmgAcb%jrQX0zPAS2Nlf`fLV;D(kE1fK*9={>i!g8;oKiy8ziA&S01 zk+|2kgor}#Od6mYGN1_cn_Rjj>6T?F0%s+1ZtMjV3&PT?ZU9$Q65~*o1t(Z6CE?nw zH-XO{?+Q|qx8=fuJ%{k`PItKlL9OXF{97z=5Z*{xBtZ*LR?X#Z?`0oHG<9EFY}0Ly zXi7`m!LiRp%dK!AE{N3ypdFTZqch9FPb)p|Bw4pynTe6kpd69tXruFtc@J`B>8dOm zkn1N_l1l8=ujpDmrHjVZ$!US4hX>_uOWE%A>WW5gKLkU@R+3ZH!Z^U89H4otip3dC zrIpY^Lz3c1YjbWkNz+i6wGq(uRx9gJbVS=h5HA%3zbM_6wIvY>Dm^7 z;BI^cZx3d(d77PI#jOAXt!&C#5lneEO{2?SVW)f!XE=SZoH z(re-^sMtE3%-|$5sEA%l@yc2eE1V5viuPReJVWnda7z)FL~bA4B{Z9bMPFO;=yl0} zj{HTnv{(Hy^qp>!ziV>aYH6TZrc|YfQqUi}e09SS1{F%8=EGteK!5q?K-e7w*yYo6PP?EeFQ C!zl~^ literal 0 HcmV?d00001 From f6b036a5792c668df38a2a87c401796c4aaf0def Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Mon, 16 Jun 2025 14:54:59 +0200 Subject: [PATCH 03/21] Revert "dictionary training (dictionary is sketchy)" This reverts commit 6bca6f87a7392eaafa068e2ced6b51afeeebca35. 
--- src/sentry/spans/buffer.py | 16 +++------------- src/sentry/spans/zstd_dict.bin | Bin 8192 -> 0 bytes 2 files changed, 3 insertions(+), 13 deletions(-) delete mode 100644 src/sentry/spans/zstd_dict.bin diff --git a/src/sentry/spans/buffer.py b/src/sentry/spans/buffer.py index bf810e7f824568..c0b4975c90810f 100644 --- a/src/sentry/spans/buffer.py +++ b/src/sentry/spans/buffer.py @@ -66,7 +66,6 @@ import itertools import logging import math -import os from collections.abc import Generator, MutableMapping, Sequence from typing import Any, NamedTuple @@ -93,14 +92,6 @@ logger = logging.getLogger(__name__) -def _load_zstd_dictionary() -> zstandard.ZstdCompressionDict: - """Load the zstd dictionary for span compression.""" - dict_path = os.path.join(os.path.dirname(__file__), "zstd_dict.bin") - with open(dict_path, "rb") as f: - dict_data = f.read() - return zstandard.ZstdCompressionDict(dict_data) - - def _segment_key_to_span_id(segment_key: SegmentKey) -> bytes: return parse_segment_key(segment_key)[-1] @@ -163,10 +154,9 @@ def __init__(self, assigned_shards: list[int]): self.assigned_shards = list(assigned_shards) self.add_buffer_sha: str | None = None self.any_shard_at_limit = False - # Load custom dictionary and create compressor/decompressor objects - self._zstd_dict = _load_zstd_dictionary() - self._zstd_compressor = zstandard.ZstdCompressor(level=0, dict_data=self._zstd_dict) - self._zstd_decompressor = zstandard.ZstdDecompressor(dict_data=self._zstd_dict) + # Reuse compressor/decompressor objects for better performance + self._zstd_compressor = zstandard.ZstdCompressor(level=0) + self._zstd_decompressor = zstandard.ZstdDecompressor() @cached_property def client(self) -> RedisCluster[bytes] | StrictRedis[bytes]: diff --git a/src/sentry/spans/zstd_dict.bin b/src/sentry/spans/zstd_dict.bin deleted file mode 100644 index 3c00b2226f0e06a80da3c4e54b842103a3e34d1a..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 8192 
zcmbVROOGR08J&btKw*JbcNNmh~_Be6dj$Iz5iAD+_ zh>%#21wR0kf50yQMM5lD!ve8kgV-V=Avm}Esfr z{`(*Q^ttC=yZ?t*UwHk~AN=Cu-@fyw;}?JS`#&2T_!i1Pt#=@)wAB|d-z^Y2Z5B2Pu*~ydZ*gj ziU>J@L8q}ELvYu37IKZRVz2wp3^M{YV`{rRUVv<#g8>^U-j8baV`b}Xx z$4i>!mgUjb@(d%Du~st6tyO~1^+W!uPMc>~>lwjprGz4aXv4X-n`bTu!{K0-!gv&W z&z?Pdua}-;d;LMN7@^r3B}~Xp1}VLZ#2PMX2!RvnR5AVZ!i@$$a>X&tLrn7H-efv{ zD}}`@%nH+ZB&5Zxltf0o35jJcg=Haflb!R#=_C}>m~}RBi`jbI=FFthjBB5~wHTbZ zX<6as9;uUw(s}7smYA+gkS>aY7cb0S@F0WsLOZ2B!FZfc7fZfG33XNx4QJ`N_x1qRI>lS2Pt&w4X*%nNaYXF&;167A1t6}S z%u-zsMi;Xa7Z#4~+_4DgcKkL09ewrAJWa2<(^Z>ze)MJ&(9zYdxVrrB=Rq3AKB9T? zwTbAyCcNxCQU1r`o zTHd!4I>wN0=oltDkzw-Y&uIW3aqh)ff+O2ta;$SDih8#zoUq*#fZ(_S*joJ^=dfC{IID|Qyb zk(!K)gg+6OQ_Rx|VauZ?A$zL1W+axKb{0Y{hpG03a5Hdy2oFYenvUGCENVK=gW)ns zgRwywWjXN40|uy+NxA3&RgBtL7C22IAzhFNR1+#W7nD|#8qK6Fqcsw+ph9$4$R!Kk z&T2QUoAkn`%Sn@Cc$QEKEW=~i!uBl_yan*oyn}F{q~;ZV4HJNwG>Wa++P6}o0IRa% zoD*(vYfQ5+9ZbrC5WCoFNV8dg;;srzU^BJ5z#`u(|4Z^Tc~g)G-%GMI-&vE{?xJKz zb3w9#wlmByz3Q)Kcv6ETHc9%*`Ku2w&yt!JEQIBZFA*5RF`}gRi74P#vFUDL5Eg8- z(I+XC4LgBhG|3@~ECVd!n<(!DhQiO@z<|vG%k99Zr}GW8ty!+)WuieloK&{pcJaU+ zyiA!WKub8*bCZ`TP?s@LQ82LNSmX6xCYlqfM8yU2jUec{UjF9U`O%4+3{K_%>KUwi zM^7zX2Vc2CiJFDvNbe$ahp7tVOH6q=j`Lx`{pj)SVmh7|aT4K@q`217Q?FSr@2*`v zM~GpTQAoof@&*^LW_7wO4~*+kBV};gX>TPWT4D3x?3LH>z+{T?5N1Mz(m@7$HfL&Vp+Yg-JrNS?_JeM8c>t)s6))(6xY-MIg2;z?@gy z3oDpWk|-L34_X=*nc>Xy29?&5O-e^)$kI7Wg|>T%S6Xj~^jUKe6DVcd0z%T1wA2KK z$V`{?vKp9ExO5xYxZ1-`MI?g2fa1_M66FFM zxy5lER>Ezdu1YFiQ1E(SEA%C_sbB!hp-sEEhIpE#{#!FpSqYp`ecCeu;btjkA(+VX zj>Of>V)^^@0o6;PvQW(15*KyBY<}5@+>X5nl)5mYGJsxzZQsRQlvv9^1?Dy;Pzt4y zt(ZmP5xnGjgJJKmm_XTv+NP-auEnM)4rZlT{9>VkQ_A?4V7e7!w>iJ5FxsRq?X4US zp22JzhX3TekyCIaW{5AFZT6+wS@VYT<7wSxjmZ^C+a?Gaa#^9ZBqd)8zAojCo7Yc@ z7z?gW{bGS@HgC+WdtLI-db2ASg2h)f3Cur)m7rX#k1}T&g$S&n!y7sBn%y!|qP@i? 
zpwpAG45*?hLy?rg zU=aY$Z3EjYlkW(D>D-sna}j@=(As0{L#o?tA<%B$w0yVWx3$G@7Xm`cZ6Oe0d}Xpv zD8599#B-)}AjuJ#9nnRQ?a-hWz>TgDAh^<|K^6csDGM6CNg16R$9(qA7#}#0s+em{ zgFsvqYymYZWFTWjCF}?R!8wYhWGuMOWZEc+cISc;q(_bhb!b*v8_dz6K`vDh$a^{$ z9Oc&aEQnpkd04hCT2)`f)>OryOen#`T@VG+#D0Pn9`;A$dXNj8lyV9t+EYBS27_AF z6JM57>+0#wz`DTMHO|~c0(+4WCFl)#f;Nz7dQ7XxAB<4U4PxI-FTiw7HMmy8lP8nn z)DFCgrkm0V+5$MsxS5@tEt53I8d9iGc|3+#Y0k)l6CxTw&i)U4E>cTxuuDJ#(6*`v4GVu~=Uxzd#sM6n{0 zZcqp4dNPK_0mLyL+79eDQFOd|e)Q#|Z@ls7J4a2ic=Y`TXK$Px4dT&Pj=owV;Kmxi zG~#X&0H{F^qC_arkdx!VQlCtt``7fpw`{x^m$h}GQ|^n-=w?s+*i8$`?gZmDG&c>5 zl3WX&&}zKz(jmgAcb%jrQX0zPAS2Nlf`fLV;D(kE1fK*9={>i!g8;oKiy8ziA&S01 zk+|2kgor}#Od6mYGN1_cn_Rjj>6T?F0%s+1ZtMjV3&PT?ZU9$Q65~*o1t(Z6CE?nw zH-XO{?+Q|qx8=fuJ%{k`PItKlL9OXF{97z=5Z*{xBtZ*LR?X#Z?`0oHG<9EFY}0Ly zXi7`m!LiRp%dK!AE{N3ypdFTZqch9FPb)p|Bw4pynTe6kpd69tXruFtc@J`B>8dOm zkn1N_l1l8=ujpDmrHjVZ$!US4hX>_uOWE%A>WW5gKLkU@R+3ZH!Z^U89H4otip3dC zrIpY^Lz3c1YjbWkNz+i6wGq(uRx9gJbVS=h5HA%3zbM_6wIvY>Dm^7 z;BI^cZx3d(d77PI#jOAXt!&C#5lneEO{2?SVW)f!XE=SZoH z(re-^sMtE3%-|$5sEA%l@yc2eE1V5viuPReJVWnda7z)FL~bA4B{Z9bMPFO;=yl0} zj{HTnv{(Hy^qp>!ziV>aYH6TZrc|YfQqUi}e09SS1{F%8=EGteK!5q?K-e7w*yYo6PP?EeFQ C!zl~^ From d1240725fc50676d68e4ffadc2c3e8c850732074 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Mon, 16 Jun 2025 19:54:07 +0200 Subject: [PATCH 04/21] try zlib --- src/sentry/spans/buffer.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/src/sentry/spans/buffer.py b/src/sentry/spans/buffer.py index c0b4975c90810f..e85a8f1078b4da 100644 --- a/src/sentry/spans/buffer.py +++ b/src/sentry/spans/buffer.py @@ -66,11 +66,11 @@ import itertools import logging import math +import zlib from collections.abc import Generator, MutableMapping, Sequence from typing import Any, NamedTuple import rapidjson -import zstandard from django.conf import settings from django.utils.functional import cached_property from sentry_redis_tools.clients import RedisCluster, StrictRedis @@ -154,9 +154,7 @@ def 
__init__(self, assigned_shards: list[int]): self.assigned_shards = list(assigned_shards) self.add_buffer_sha: str | None = None self.any_shard_at_limit = False - # Reuse compressor/decompressor objects for better performance - self._zstd_compressor = zstandard.ZstdCompressor(level=0) - self._zstd_decompressor = zstandard.ZstdDecompressor() + # LZ4 doesn't need persistent compressor/decompressor objects @cached_property def client(self) -> RedisCluster[bytes] | StrictRedis[bytes]: @@ -321,7 +319,7 @@ def _compress_span_payloads(self, payloads: list[bytes]) -> bytes: original_size = len(combined) with metrics.timer("spans.buffer.compression.cpu_time"): - compressed = self._zstd_compressor.compress(combined) + compressed = zlib.compress(combined) compressed_size = len(compressed) @@ -333,13 +331,13 @@ def _compress_span_payloads(self, payloads: list[bytes]) -> bytes: return compressed def _decompress_batch(self, compressed_data: bytes) -> list[bytes]: - # Check for zstd magic header (0xFD2FB528 in little-endian) -- + # Check for zlib magic header (0x789C for default compression) -- # backwards compat with code that did not write compressed payloads. with metrics.timer("spans.buffer.decompression.cpu_time"): - if not compressed_data.startswith(b"\x28\xb5\x2f\xfd"): + if not compressed_data.startswith(b"\x78\x9c"): return [compressed_data] - decompressed_buffer = self._zstd_decompressor.decompress(compressed_data) + decompressed_buffer = zlib.decompress(compressed_data) return decompressed_buffer.split(b"\x00") def record_stored_segments(self): From b040b91c563a9ef702350a6a68d8e5e20959df64 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Tue, 17 Jun 2025 14:42:56 +0200 Subject: [PATCH 05/21] Revert "try zlib" This reverts commit d1240725fc50676d68e4ffadc2c3e8c850732074. 
--- src/sentry/spans/buffer.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/sentry/spans/buffer.py b/src/sentry/spans/buffer.py index e85a8f1078b4da..c0b4975c90810f 100644 --- a/src/sentry/spans/buffer.py +++ b/src/sentry/spans/buffer.py @@ -66,11 +66,11 @@ import itertools import logging import math -import zlib from collections.abc import Generator, MutableMapping, Sequence from typing import Any, NamedTuple import rapidjson +import zstandard from django.conf import settings from django.utils.functional import cached_property from sentry_redis_tools.clients import RedisCluster, StrictRedis @@ -154,7 +154,9 @@ def __init__(self, assigned_shards: list[int]): self.assigned_shards = list(assigned_shards) self.add_buffer_sha: str | None = None self.any_shard_at_limit = False - # LZ4 doesn't need persistent compressor/decompressor objects + # Reuse compressor/decompressor objects for better performance + self._zstd_compressor = zstandard.ZstdCompressor(level=0) + self._zstd_decompressor = zstandard.ZstdDecompressor() @cached_property def client(self) -> RedisCluster[bytes] | StrictRedis[bytes]: @@ -319,7 +321,7 @@ def _compress_span_payloads(self, payloads: list[bytes]) -> bytes: original_size = len(combined) with metrics.timer("spans.buffer.compression.cpu_time"): - compressed = zlib.compress(combined) + compressed = self._zstd_compressor.compress(combined) compressed_size = len(compressed) @@ -331,13 +333,13 @@ def _compress_span_payloads(self, payloads: list[bytes]) -> bytes: return compressed def _decompress_batch(self, compressed_data: bytes) -> list[bytes]: - # Check for zlib magic header (0x789C for default compression) -- + # Check for zstd magic header (0xFD2FB528 in little-endian) -- # backwards compat with code that did not write compressed payloads. 
with metrics.timer("spans.buffer.decompression.cpu_time"): - if not compressed_data.startswith(b"\x78\x9c"): + if not compressed_data.startswith(b"\x28\xb5\x2f\xfd"): return [compressed_data] - decompressed_buffer = zlib.decompress(compressed_data) + decompressed_buffer = self._zstd_decompressor.decompress(compressed_data) return decompressed_buffer.split(b"\x00") def record_stored_segments(self): From ad0778db549f6b2349e2487217a1ebe5c062abb2 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Tue, 17 Jun 2025 16:52:20 +0200 Subject: [PATCH 06/21] mp wip --- src/sentry/spans/consumers/process/flusher.py | 122 ++++++++++++------ .../spans/consumers/process/test_consumer.py | 5 + 2 files changed, 89 insertions(+), 38 deletions(-) diff --git a/src/sentry/spans/consumers/process/flusher.py b/src/sentry/spans/consumers/process/flusher.py index efc13358dc81be..e96037f300b297 100644 --- a/src/sentry/spans/consumers/process/flusher.py +++ b/src/sentry/spans/consumers/process/flusher.py @@ -27,7 +27,8 @@ class SpanFlusher(ProcessingStrategy[FilteredPayload | int]): """ - A background thread that polls Redis for new segments to flush and to produce to Kafka. + A background multiprocessing manager that polls Redis for new segments to flush and to produce to Kafka. + Creates one process per shard for parallel processing. This is a processing step to be embedded into the consumer that writes to Redis. 
It takes and fowards integer messages that represent recently @@ -53,49 +54,66 @@ def __init__( self.current_drift = mp_context.Value("i", 0) self.backpressure_since = mp_context.Value("i", 0) self.healthy_since = mp_context.Value("i", 0) - self.process_restarts = 0 + self.process_restarts = {shard: 0 for shard in buffer.assigned_shards} self.produce_to_pipe = produce_to_pipe - self._create_process() + # Create one process per shard + self.processes = {} + self.shard_healthy_since = { + shard: mp_context.Value("i", int(time.time())) for shard in buffer.assigned_shards + } - def _create_process(self): + self._create_processes() + + def _create_processes(self): + # Create one process per shard + for shard in self.buffer.assigned_shards: + self._create_process_for_shard(shard) + + def _create_process_for_shard(self, shard: int): # Optimistically reset healthy_since to avoid a race between the # starting process and the next flush cycle. Keep back pressure across # the restart, however. - self.healthy_since.value = int(time.time()) + self.shard_healthy_since[shard].value = int(time.time()) + + # Create a buffer for this specific shard + shard_buffer = SpansBuffer([shard]) make_process: Callable[..., multiprocessing.context.SpawnProcess | threading.Thread] if self.produce_to_pipe is None: target = run_with_initialized_sentry( - SpanFlusher.main, + SpanFlusher.flush_shard_main, # unpickling buffer will import sentry, so it needs to be # pickled separately. 
at the same time, pickling # synchronization primitives like multiprocessing.Value can # only be done by the Process - self.buffer, + shard_buffer, ) make_process = self.mp_context.Process else: - target = partial(SpanFlusher.main, self.buffer) + target = partial(SpanFlusher.flush_shard_main, shard_buffer) make_process = threading.Thread - self.process = make_process( + process = make_process( target=target, args=( + shard, self.stopped, self.current_drift, self.backpressure_since, - self.healthy_since, + self.shard_healthy_since[shard], self.produce_to_pipe, ), daemon=True, ) - self.process.start() + process.start() + self.processes[shard] = process @staticmethod - def main( + def flush_shard_main( buffer: SpansBuffer, + shard: int, stopped, current_drift, backpressure_since, @@ -103,6 +121,7 @@ def main( produce_to_pipe: Callable[[KafkaPayload], None] | None, ) -> None: sentry_sdk.set_tag("sentry_spans_buffer_component", "flusher") + sentry_sdk.set_tag("sentry_spans_buffer_shard", shard) try: producer_futures = [] @@ -140,17 +159,21 @@ def produce(payload: KafkaPayload) -> None: time.sleep(1) continue - with metrics.timer("spans.buffer.flusher.produce"): + with metrics.timer("spans.buffer.flusher.produce", tags={"shard": shard}): for _, flushed_segment in flushed_segments.items(): if not flushed_segment.spans: continue spans = [span.payload for span in flushed_segment.spans] kafka_payload = KafkaPayload(None, orjson.dumps({"spans": spans}), []) - metrics.timing("spans.buffer.segment_size_bytes", len(kafka_payload.value)) + metrics.timing( + "spans.buffer.segment_size_bytes", + len(kafka_payload.value), + tags={"shard": shard}, + ) produce(kafka_payload) - with metrics.timer("spans.buffer.flusher.wait_produce"): + with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shard": shard}): for future in producer_futures: future.result() @@ -169,27 +192,37 @@ def produce(payload: KafkaPayload) -> None: def poll(self) -> None: self.next_step.poll() - def 
_ensure_process_alive(self) -> None: + def _ensure_processes_alive(self) -> None: max_unhealthy_seconds = options.get("spans.buffer.flusher.max-unhealthy-seconds") - if not self.process.is_alive(): - exitcode = getattr(self.process, "exitcode", "unknown") - cause = f"no_process_{exitcode}" - elif int(time.time()) - self.healthy_since.value > max_unhealthy_seconds: - cause = "hang" - else: - return # healthy - metrics.incr("spans.buffer.flusher_unhealthy", tags={"cause": cause}) - if self.process_restarts > MAX_PROCESS_RESTARTS: - raise RuntimeError(f"flusher process crashed repeatedly ({cause}), restarting consumer") + for shard in self.buffer.assigned_shards: + process = self.processes.get(shard) + if not process: + continue + + cause = None + if not process.is_alive(): + exitcode = getattr(process, "exitcode", "unknown") + cause = f"no_process_{exitcode}" + elif int(time.time()) - self.shard_healthy_since[shard].value > max_unhealthy_seconds: + cause = "hang" + + if cause is None: + continue # healthy + + metrics.incr("spans.buffer.flusher_unhealthy", tags={"cause": cause, "shard": shard}) + if self.process_restarts[shard] > MAX_PROCESS_RESTARTS: + raise RuntimeError( + f"flusher process for shard {shard} crashed repeatedly ({cause}), restarting consumer" + ) - try: - self.process.kill() - except ValueError: - pass # Process already closed, ignore + try: + process.kill() + except (ValueError, AttributeError): + pass # Process already closed, ignore - self.process_restarts += 1 - self._create_process() + self.process_restarts[shard] += 1 + self._create_process_for_shard(shard) def submit(self, message: Message[FilteredPayload | int]) -> None: # Note that submit is not actually a hot path. Their message payloads @@ -197,7 +230,7 @@ def submit(self, message: Message[FilteredPayload | int]) -> None: # per second at most. 
If anything, self.poll() might even be called # more often than submit() - self._ensure_process_alive() + self._ensure_processes_alive() self.buffer.record_stored_segments() @@ -219,6 +252,12 @@ def submit(self, message: Message[FilteredPayload | int]) -> None: self.current_drift.value = drift = message.payload - int(time.time()) metrics.timing("spans.buffer.flusher.drift", drift) + # Update healthy_since to be the minimum across all shards + min_healthy_time = min( + healthy_since.value for healthy_since in self.shard_healthy_since.values() + ) + self.healthy_since.value = min_healthy_time + # We also pause insertion into Redis if Redis is too full. In this case # we cannot allow the flusher to progress either, as it would write # partial/fragmented segments to buffered-segments topic. We have to @@ -251,15 +290,22 @@ def close(self) -> None: self.next_step.close() def join(self, timeout: float | None = None): - # set stopped flag first so we can "flush" the background thread while + # set stopped flag first so we can "flush" the background threads while # next_step is also shutting down. we can do two things at once! 
self.stopped.value = True deadline = time.time() + timeout if timeout else None self.next_step.join(timeout) - while self.process.is_alive() and (deadline is None or deadline > time.time()): - time.sleep(0.1) + # Wait for all processes to finish + for shard, process in self.processes.items(): + if deadline is not None: + remaining_time = deadline - time.time() + if remaining_time <= 0: + break + + while process.is_alive() and (deadline is None or deadline > time.time()): + time.sleep(0.1) - if isinstance(self.process, multiprocessing.Process): - self.process.terminate() + if isinstance(process, multiprocessing.Process): + process.terminate() diff --git a/tests/sentry/spans/consumers/process/test_consumer.py b/tests/sentry/spans/consumers/process/test_consumer.py index f12a7ab386c43f..57859f0ef6a5f0 100644 --- a/tests/sentry/spans/consumers/process/test_consumer.py +++ b/tests/sentry/spans/consumers/process/test_consumer.py @@ -1,5 +1,6 @@ from datetime import datetime +import time import pytest import rapidjson from arroyo.backends.kafka import KafkaPayload @@ -55,6 +56,10 @@ def add_commit(offsets, force=False): step.poll() fac._flusher.current_drift.value = 9000 # "advance" our "clock" + + step.poll() + # Give flusher threads time to process after drift change + time.sleep(0.1) step.join() From ca063c5d3ad8e7ad397e7b85fe370d6c820c5bd4 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Tue, 17 Jun 2025 17:06:57 +0200 Subject: [PATCH 07/21] flusher-processes option --- src/sentry/consumers/__init__.py | 10 +- src/sentry/spans/consumers/process/factory.py | 3 + src/sentry/spans/consumers/process/flusher.py | 162 +++++++++++++++--- .../spans/consumers/process/test_consumer.py | 46 ++++- 4 files changed, 194 insertions(+), 27 deletions(-) diff --git a/src/sentry/consumers/__init__.py b/src/sentry/consumers/__init__.py index 345912eebb8fd9..3fb6b7d963446f 100644 --- a/src/sentry/consumers/__init__.py +++ b/src/sentry/consumers/__init__.py @@ -427,7 +427,15 @@ 
def ingest_transactions_options() -> list[click.Option]: "topic": Topic.INGEST_SPANS, "dlq_topic": Topic.INGEST_SPANS_DLQ, "strategy_factory": "sentry.spans.consumers.process.factory.ProcessSpansStrategyFactory", - "click_options": multiprocessing_options(default_max_batch_size=100), + "click_options": [ + *multiprocessing_options(default_max_batch_size=100), + click.Option( + ["--flusher-processes", "flusher_processes"], + default=None, + type=int, + help="Maximum number of processes for the span flusher. Defaults to number of shards if not specified.", + ), + ], }, "process-segments": { "topic": Topic.BUFFERED_SEGMENTS, diff --git a/src/sentry/spans/consumers/process/factory.py b/src/sentry/spans/consumers/process/factory.py index 8a4e8a6fbdf3b2..90ef08f6fc24a2 100644 --- a/src/sentry/spans/consumers/process/factory.py +++ b/src/sentry/spans/consumers/process/factory.py @@ -38,6 +38,7 @@ def __init__( num_processes: int, input_block_size: int | None, output_block_size: int | None, + flusher_processes: int | None = None, produce_to_pipe: Callable[[KafkaPayload], None] | None = None, ): super().__init__() @@ -48,6 +49,7 @@ def __init__( self.input_block_size = input_block_size self.output_block_size = output_block_size self.num_processes = num_processes + self.flusher_processes = flusher_processes self.produce_to_pipe = produce_to_pipe if self.num_processes != 1: @@ -69,6 +71,7 @@ def create_with_partitions( flusher = self._flusher = SpanFlusher( buffer, next_step=committer, + max_processes=self.flusher_processes, produce_to_pipe=self.produce_to_pipe, ) diff --git a/src/sentry/spans/consumers/process/flusher.py b/src/sentry/spans/consumers/process/flusher.py index e96037f300b297..c585f538957767 100644 --- a/src/sentry/spans/consumers/process/flusher.py +++ b/src/sentry/spans/consumers/process/flusher.py @@ -43,10 +43,12 @@ def __init__( self, buffer: SpansBuffer, next_step: ProcessingStrategy[FilteredPayload | int], + max_processes: int | None = None, 
produce_to_pipe: Callable[[KafkaPayload], None] | None = None, ): self.buffer = buffer self.next_step = next_step + self.max_processes = max_processes or len(buffer.assigned_shards) self.mp_context = mp_context = multiprocessing.get_context("spawn") self.stopped = mp_context.Value("i", 0) @@ -57,7 +59,15 @@ def __init__( self.process_restarts = {shard: 0 for shard in buffer.assigned_shards} self.produce_to_pipe = produce_to_pipe - # Create one process per shard + # Determine which shards get their own processes vs shared processes + self.active_shards = min(self.max_processes, len(buffer.assigned_shards)) + self.shard_to_process_map = {} + for i, shard in enumerate(buffer.assigned_shards): + process_index = i % self.active_shards + if process_index not in self.shard_to_process_map: + self.shard_to_process_map[process_index] = [] + self.shard_to_process_map[process_index].append(shard) + self.processes = {} self.shard_healthy_since = { shard: mp_context.Value("i", int(time.time())) for shard in buffer.assigned_shards @@ -66,23 +76,24 @@ def __init__( self._create_processes() def _create_processes(self): - # Create one process per shard - for shard in self.buffer.assigned_shards: - self._create_process_for_shard(shard) + # Create processes based on shard mapping + for process_index, shards in self.shard_to_process_map.items(): + self._create_process_for_shards(process_index, shards) - def _create_process_for_shard(self, shard: int): + def _create_process_for_shards(self, process_index: int, shards: list[int]): # Optimistically reset healthy_since to avoid a race between the # starting process and the next flush cycle. Keep back pressure across # the restart, however. 
- self.shard_healthy_since[shard].value = int(time.time()) + for shard in shards: + self.shard_healthy_since[shard].value = int(time.time()) - # Create a buffer for this specific shard - shard_buffer = SpansBuffer([shard]) + # Create a buffer for these specific shards + shard_buffer = SpansBuffer(shards) make_process: Callable[..., multiprocessing.context.SpawnProcess | threading.Thread] if self.produce_to_pipe is None: target = run_with_initialized_sentry( - SpanFlusher.flush_shard_main, + SpanFlusher.flush_shards_main, # unpickling buffer will import sentry, so it needs to be # pickled separately. at the same time, pickling # synchronization primitives like multiprocessing.Value can @@ -91,24 +102,115 @@ def _create_process_for_shard(self, shard: int): ) make_process = self.mp_context.Process else: - target = partial(SpanFlusher.flush_shard_main, shard_buffer) + target = partial(SpanFlusher.flush_shards_main, shard_buffer) make_process = threading.Thread process = make_process( target=target, args=( - shard, + shards, self.stopped, self.current_drift, self.backpressure_since, - self.shard_healthy_since[shard], + [self.shard_healthy_since[shard] for shard in shards], self.produce_to_pipe, ), daemon=True, ) process.start() - self.processes[shard] = process + self.processes[process_index] = process + + def _create_process_for_shard(self, shard: int): + # Find which process this shard belongs to and restart that process + for process_index, shards in self.shard_to_process_map.items(): + if shard in shards: + self._create_process_for_shards(process_index, shards) + break + + @staticmethod + def flush_shards_main( + buffer: SpansBuffer, + shards: list[int], + stopped, + current_drift, + backpressure_since, + healthy_since_list, + produce_to_pipe: Callable[[KafkaPayload], None] | None, + ) -> None: + sentry_sdk.set_tag("sentry_spans_buffer_component", "flusher") + sentry_sdk.set_tag("sentry_spans_buffer_shards", ",".join(map(str, shards))) + + try: + producer_futures = 
[] + + if produce_to_pipe is not None: + produce = produce_to_pipe + producer = None + else: + cluster_name = get_topic_definition(Topic.BUFFERED_SEGMENTS)["cluster"] + + producer_config = get_kafka_producer_cluster_options(cluster_name) + producer = KafkaProducer(build_kafka_configuration(default_config=producer_config)) + topic = ArroyoTopic( + get_topic_definition(Topic.BUFFERED_SEGMENTS)["real_topic_name"] + ) + + def produce(payload: KafkaPayload) -> None: + producer_futures.append(producer.produce(topic, payload)) + + while not stopped.value: + system_now = int(time.time()) + now = system_now + current_drift.value + flushed_segments = buffer.flush_segments(now=now) + + # Check backpressure flag set by buffer + if buffer.any_shard_at_limit: + if backpressure_since.value == 0: + backpressure_since.value = system_now + else: + backpressure_since.value = 0 + + # Update healthy_since for all shards handled by this process + for healthy_since in healthy_since_list: + healthy_since.value = system_now + + if not flushed_segments: + time.sleep(1) + continue + + for shard in shards: + with metrics.timer("spans.buffer.flusher.produce", tags={"shard": shard}): + for _, flushed_segment in flushed_segments.items(): + if not flushed_segment.spans: + continue + + spans = [span.payload for span in flushed_segment.spans] + kafka_payload = KafkaPayload(None, orjson.dumps({"spans": spans}), []) + metrics.timing( + "spans.buffer.segment_size_bytes", + len(kafka_payload.value), + tags={"shard": shard}, + ) + produce(kafka_payload) + + with metrics.timer( + "spans.buffer.flusher.wait_produce", tags={"shards": ",".join(map(str, shards))} + ): + for future in producer_futures: + future.result() + + producer_futures.clear() + + buffer.done_flush_segments(flushed_segments) + + if producer is not None: + producer.close() + except KeyboardInterrupt: + pass + except Exception: + sentry_sdk.capture_exception() + raise @staticmethod def flush_shard_main( @@ -195,34 +297,46 @@ def poll(self) 
-> None: def _ensure_processes_alive(self) -> None: max_unhealthy_seconds = options.get("spans.buffer.flusher.max-unhealthy-seconds") - for shard in self.buffer.assigned_shards: - process = self.processes.get(shard) + for process_index, process in self.processes.items(): if not process: continue + shards = self.shard_to_process_map[process_index] + cause = None if not process.is_alive(): exitcode = getattr(process, "exitcode", "unknown") cause = f"no_process_{exitcode}" - elif int(time.time()) - self.shard_healthy_since[shard].value > max_unhealthy_seconds: - cause = "hang" + else: + # Check if any shard handled by this process is unhealthy + for shard in shards: + if ( + int(time.time()) - self.shard_healthy_since[shard].value + > max_unhealthy_seconds + ): + cause = "hang" + break if cause is None: continue # healthy - metrics.incr("spans.buffer.flusher_unhealthy", tags={"cause": cause, "shard": shard}) - if self.process_restarts[shard] > MAX_PROCESS_RESTARTS: - raise RuntimeError( - f"flusher process for shard {shard} crashed repeatedly ({cause}), restarting consumer" + # Report unhealthy for all shards handled by this process + for shard in shards: + metrics.incr( + "spans.buffer.flusher_unhealthy", tags={"cause": cause, "shard": shard} ) + if self.process_restarts[shard] > MAX_PROCESS_RESTARTS: + raise RuntimeError( + f"flusher process for shard {shard} crashed repeatedly ({cause}), restarting consumer" + ) + self.process_restarts[shard] += 1 try: process.kill() except (ValueError, AttributeError): pass # Process already closed, ignore - self.process_restarts[shard] += 1 - self._create_process_for_shard(shard) + self._create_process_for_shards(process_index, shards) def submit(self, message: Message[FilteredPayload | int]) -> None: # Note that submit is not actually a hot path. 
Their message payloads @@ -298,7 +412,7 @@ def join(self, timeout: float | None = None): self.next_step.join(timeout) # Wait for all processes to finish - for shard, process in self.processes.items(): + for process_index, process in self.processes.items(): if deadline is not None: remaining_time = deadline - time.time() if remaining_time <= 0: diff --git a/tests/sentry/spans/consumers/process/test_consumer.py b/tests/sentry/spans/consumers/process/test_consumer.py index 57859f0ef6a5f0..70f71992e6e449 100644 --- a/tests/sentry/spans/consumers/process/test_consumer.py +++ b/tests/sentry/spans/consumers/process/test_consumer.py @@ -1,6 +1,6 @@ +import time from datetime import datetime -import time import pytest import rapidjson from arroyo.backends.kafka import KafkaPayload @@ -56,7 +56,7 @@ def add_commit(offsets, force=False): step.poll() fac._flusher.current_drift.value = 9000 # "advance" our "clock" - + step.poll() # Give flusher threads time to process after drift change time.sleep(0.1) @@ -79,3 +79,45 @@ def add_commit(offsets, force=False): }, ], } + + +@pytest.mark.django_db +def test_flusher_processes_limit(monkeypatch): + """Test that flusher respects the max_processes limit""" + # Flush very aggressively to make test pass instantly + monkeypatch.setattr("time.sleep", lambda _: None) + + topic = Topic("test") + messages: list[KafkaPayload] = [] + + # Create factory with limited flusher processes + fac = ProcessSpansStrategyFactory( + max_batch_size=10, + max_batch_time=10, + num_processes=1, + input_block_size=None, + output_block_size=None, + flusher_processes=2, # Limit to 2 processes even if more shards + produce_to_pipe=messages.append, + ) + + commits = [] + + def add_commit(offsets, force=False): + commits.append(offsets) + + # Create with 4 partitions/shards to test process sharing + partitions = {Partition(topic, i): 0 for i in range(4)} + step = fac.create_with_partitions(add_commit, partitions) + + # Verify that flusher uses at most 2 processes + 
flusher = fac._flusher + assert len(flusher.processes) == 2 + assert flusher.max_processes == 2 + assert flusher.active_shards == 2 + + # Verify shards are distributed across processes + total_shards = sum(len(shards) for shards in flusher.shard_to_process_map.values()) + assert total_shards == 4 # All 4 shards should be assigned + + step.join() From 77fa5b59110dc99ee19f7ba7f4375d9603ef345b Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Wed, 18 Jun 2025 17:51:29 +0200 Subject: [PATCH 08/21] change default --- src/sentry/consumers/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sentry/consumers/__init__.py b/src/sentry/consumers/__init__.py index 3fb6b7d963446f..0aa60a4d6e4222 100644 --- a/src/sentry/consumers/__init__.py +++ b/src/sentry/consumers/__init__.py @@ -431,9 +431,9 @@ def ingest_transactions_options() -> list[click.Option]: *multiprocessing_options(default_max_batch_size=100), click.Option( ["--flusher-processes", "flusher_processes"], - default=None, + default=1, type=int, - help="Maximum number of processes for the span flusher. Defaults to number of shards if not specified.", + help="Maximum number of processes for the span flusher. 
Defaults to 1.", ), ], }, From 5bd833a60aeb93ba773eb29e9fc57ae48ca49beb Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Wed, 18 Jun 2025 18:16:12 +0200 Subject: [PATCH 09/21] fix typing --- CLAUDE.md | 11 +++++++++++ src/sentry/spans/consumers/process/flusher.py | 7 ++++--- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 97b7d0f6eea07f..cc56fcd90d8521 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -445,6 +445,17 @@ for org in organizations: # RIGHT: Use prefetch_related organizations.prefetch_related('projects') + +# WRONG: Use hasattr() for unions +x: str | None = "hello" +if hasattr(x, "replace"): + x = x.replace("e", "a") + +# RIGHT: Use isinstance() +x: str | None = "hello" +if isinstance(x, str): + x = x.replace("e", "a") + ``` ### Frontend diff --git a/src/sentry/spans/consumers/process/flusher.py b/src/sentry/spans/consumers/process/flusher.py index 7851e7def6be4b..e2658a855dc318 100644 --- a/src/sentry/spans/consumers/process/flusher.py +++ b/src/sentry/spans/consumers/process/flusher.py @@ -61,14 +61,14 @@ def __init__( # Determine which shards get their own processes vs shared processes self.active_shards = min(self.max_processes, len(buffer.assigned_shards)) - self.shard_to_process_map = {} + self.shard_to_process_map: dict[int, list[int]] = {} for i, shard in enumerate(buffer.assigned_shards): process_index = i % self.active_shards if process_index not in self.shard_to_process_map: self.shard_to_process_map[process_index] = [] self.shard_to_process_map[process_index].append(shard) - self.processes = {} + self.processes: dict[int, multiprocessing.context.SpawnProcess | threading.Thread] = {} self.shard_healthy_since = { shard: mp_context.Value("i", int(time.time())) for shard in buffer.assigned_shards } @@ -332,7 +332,8 @@ def _ensure_processes_alive(self) -> None: self.process_restarts[shard] += 1 try: - process.kill() + if isinstance(process, multiprocessing.Process): + process.kill() except (ValueError, 
AttributeError): pass # Process already closed, ignore From d77bdb1293b6eee47d2fcc3377c9b1aa83fda03a Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Wed, 18 Jun 2025 18:31:02 +0200 Subject: [PATCH 10/21] remove dead code --- src/sentry/spans/consumers/process/flusher.py | 79 ------------------- 1 file changed, 79 deletions(-) diff --git a/src/sentry/spans/consumers/process/flusher.py b/src/sentry/spans/consumers/process/flusher.py index e2658a855dc318..5362050d03b977 100644 --- a/src/sentry/spans/consumers/process/flusher.py +++ b/src/sentry/spans/consumers/process/flusher.py @@ -212,85 +212,6 @@ def produce(payload: KafkaPayload) -> None: sentry_sdk.capture_exception() raise - @staticmethod - def flush_shard_main( - buffer: SpansBuffer, - shard: int, - stopped, - current_drift, - backpressure_since, - healthy_since, - produce_to_pipe: Callable[[KafkaPayload], None] | None, - ) -> None: - sentry_sdk.set_tag("sentry_spans_buffer_component", "flusher") - sentry_sdk.set_tag("sentry_spans_buffer_shard", shard) - - try: - producer_futures = [] - - if produce_to_pipe is not None: - produce = produce_to_pipe - producer = None - else: - cluster_name = get_topic_definition(Topic.BUFFERED_SEGMENTS)["cluster"] - - producer_config = get_kafka_producer_cluster_options(cluster_name) - producer = KafkaProducer(build_kafka_configuration(default_config=producer_config)) - topic = ArroyoTopic( - get_topic_definition(Topic.BUFFERED_SEGMENTS)["real_topic_name"] - ) - - def produce(payload: KafkaPayload) -> None: - producer_futures.append(producer.produce(topic, payload)) - - while not stopped.value: - system_now = int(time.time()) - now = system_now + current_drift.value - flushed_segments = buffer.flush_segments(now=now) - - # Check backpressure flag set by buffer - if buffer.any_shard_at_limit: - if backpressure_since.value == 0: - backpressure_since.value = system_now - else: - backpressure_since.value = 0 - - healthy_since.value = system_now - - if not flushed_segments: 
- time.sleep(1) - continue - - with metrics.timer("spans.buffer.flusher.produce", tags={"shard": shard}): - for _, flushed_segment in flushed_segments.items(): - if not flushed_segment.spans: - continue - - spans = [span.payload for span in flushed_segment.spans] - kafka_payload = KafkaPayload(None, orjson.dumps({"spans": spans}), []) - metrics.timing( - "spans.buffer.segment_size_bytes", - len(kafka_payload.value), - tags={"shard": shard}, - ) - produce(kafka_payload) - - with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shard": shard}): - for future in producer_futures: - future.result() - - producer_futures.clear() - - buffer.done_flush_segments(flushed_segments) - - if producer is not None: - producer.close() - except KeyboardInterrupt: - pass - except Exception: - sentry_sdk.capture_exception() - raise - def poll(self) -> None: self.next_step.poll() From 73a371e4077fcbca158967760976473973890e04 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Mon, 23 Jun 2025 11:45:22 +0200 Subject: [PATCH 11/21] wip --- src/sentry/spans/consumers/process/flusher.py | 56 +++++++++---------- .../spans/consumers/process/test_consumer.py | 6 +- 2 files changed, 30 insertions(+), 32 deletions(-) diff --git a/src/sentry/spans/consumers/process/flusher.py b/src/sentry/spans/consumers/process/flusher.py index 5362050d03b977..ae9f6df3f501b3 100644 --- a/src/sentry/spans/consumers/process/flusher.py +++ b/src/sentry/spans/consumers/process/flusher.py @@ -54,24 +54,24 @@ def __init__( self.stopped = mp_context.Value("i", 0) self.redis_was_full = False self.current_drift = mp_context.Value("i", 0) - self.backpressure_since = mp_context.Value("i", 0) self.healthy_since = mp_context.Value("i", 0) self.process_restarts = {shard: 0 for shard in buffer.assigned_shards} self.produce_to_pipe = produce_to_pipe # Determine which shards get their own processes vs shared processes - self.active_shards = min(self.max_processes, len(buffer.assigned_shards)) - 
self.shard_to_process_map: dict[int, list[int]] = {} + self.num_processes = min(self.max_processes, len(buffer.assigned_shards)) + self.shard_to_process_map: dict[int, list[int]] = {i: [] for i in range(self.num_processes)} for i, shard in enumerate(buffer.assigned_shards): - process_index = i % self.active_shards - if process_index not in self.shard_to_process_map: - self.shard_to_process_map[process_index] = [] + process_index = i % self.num_processes self.shard_to_process_map[process_index].append(shard) self.processes: dict[int, multiprocessing.context.SpawnProcess | threading.Thread] = {} self.shard_healthy_since = { shard: mp_context.Value("i", int(time.time())) for shard in buffer.assigned_shards } + self.process_backpressure_since = { + process_index: mp_context.Value("i", 0) for process_index in self.shard_to_process_map + } self._create_processes() @@ -111,7 +111,7 @@ def _create_process_for_shards(self, process_index: int, shards: list[int]): shards, self.stopped, self.current_drift, - self.backpressure_since, + self.process_backpressure_since[process_index], [self.shard_healthy_since[shard] for shard in shards], self.produce_to_pipe, ), @@ -138,8 +138,9 @@ def flush_shards_main( healthy_since_list, produce_to_pipe: Callable[[KafkaPayload], None] | None, ) -> None: + shard_tag = ",".join(map(str, shards)) sentry_sdk.set_tag("sentry_spans_buffer_component", "flusher") - sentry_sdk.set_tag("sentry_spans_buffer_shards", ",".join(map(str, shards))) + sentry_sdk.set_tag("sentry_spans_buffer_shards", shard_tag) try: producer_futures = [] @@ -179,24 +180,21 @@ def produce(payload: KafkaPayload) -> None: time.sleep(1) continue - for shard in shards: - with metrics.timer("spans.buffer.flusher.produce", tags={"shard": shard}): - for _, flushed_segment in flushed_segments.items(): - if not flushed_segment.spans: - continue - - spans = [span.payload for span in flushed_segment.spans] - kafka_payload = KafkaPayload(None, orjson.dumps({"spans": spans}), []) - 
metrics.timing( - "spans.buffer.segment_size_bytes", - len(kafka_payload.value), - tags={"shard": shard}, - ) - produce(kafka_payload) - - with metrics.timer( - "spans.buffer.flusher.wait_produce", tags={"shards": ",".join(map(str, shards))} - ): + with metrics.timer("spans.buffer.flusher.produce", tags={"shard": shard_tag}): + for _, flushed_segment in flushed_segments.items(): + if not flushed_segment.spans: + continue + + spans = [span.payload for span in flushed_segment.spans] + kafka_payload = KafkaPayload(None, orjson.dumps({"spans": spans}), []) + metrics.timing( + "spans.buffer.segment_size_bytes", + len(kafka_payload.value), + tags={"shard": shard_tag}, + ) + produce(kafka_payload) + + with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shards": shard_tag}): for future in producer_futures: future.result() @@ -275,9 +273,9 @@ def submit(self, message: Message[FilteredPayload | int]) -> None: # efforts, it is still always going to be less durable than Kafka. # Minimizing our Redis memory usage also makes COGS easier to reason # about. 
- if self.backpressure_since.value > 0: - backpressure_secs = options.get("spans.buffer.flusher.backpressure-seconds") - if int(time.time()) - self.backpressure_since.value > backpressure_secs: + backpressure_secs = options.get("spans.buffer.flusher.backpressure-seconds") + for backpressure_since in self.process_backpressure_since.values(): + if int(time.time()) - backpressure_since.value > backpressure_secs: metrics.incr("spans.buffer.flusher.backpressure") raise MessageRejected() diff --git a/tests/sentry/spans/consumers/process/test_consumer.py b/tests/sentry/spans/consumers/process/test_consumer.py index 70f71992e6e449..914d5681f265c8 100644 --- a/tests/sentry/spans/consumers/process/test_consumer.py +++ b/tests/sentry/spans/consumers/process/test_consumer.py @@ -9,7 +9,7 @@ from sentry.spans.consumers.process.factory import ProcessSpansStrategyFactory -@pytest.mark.django_db +@pytest.mark.django_db(transaction=True) def test_basic(monkeypatch): # Flush very aggressively to make test pass instantly monkeypatch.setattr("time.sleep", lambda _: None) @@ -81,7 +81,7 @@ def add_commit(offsets, force=False): } -@pytest.mark.django_db +@pytest.mark.django_db(transaction=True) def test_flusher_processes_limit(monkeypatch): """Test that flusher respects the max_processes limit""" # Flush very aggressively to make test pass instantly @@ -114,7 +114,7 @@ def add_commit(offsets, force=False): flusher = fac._flusher assert len(flusher.processes) == 2 assert flusher.max_processes == 2 - assert flusher.active_shards == 2 + assert flusher.num_processes == 2 # Verify shards are distributed across processes total_shards = sum(len(shards) for shards in flusher.shard_to_process_map.values()) From 6b21925a80aefdc9684ae5f717189c894ef2a63b Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Mon, 23 Jun 2025 11:51:45 +0200 Subject: [PATCH 12/21] fix assert --- tests/sentry/spans/consumers/process/test_flusher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/tests/sentry/spans/consumers/process/test_flusher.py b/tests/sentry/spans/consumers/process/test_flusher.py index 6365653ae1f920..0e886ffac28a95 100644 --- a/tests/sentry/spans/consumers/process/test_flusher.py +++ b/tests/sentry/spans/consumers/process/test_flusher.py @@ -80,4 +80,4 @@ def append(msg): assert messages - assert flusher.backpressure_since.value + assert any(x.value for x in flusher.process_backpressure_since.values()) From 44070e082db6bcbaf032ce4e7cfe6b1d4af42ec5 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Mon, 23 Jun 2025 11:55:43 +0200 Subject: [PATCH 13/21] fix test --- src/sentry/spans/consumers/process/flusher.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/sentry/spans/consumers/process/flusher.py b/src/sentry/spans/consumers/process/flusher.py index ae9f6df3f501b3..270fd15ffddef4 100644 --- a/src/sentry/spans/consumers/process/flusher.py +++ b/src/sentry/spans/consumers/process/flusher.py @@ -275,7 +275,10 @@ def submit(self, message: Message[FilteredPayload | int]) -> None: # about. 
backpressure_secs = options.get("spans.buffer.flusher.backpressure-seconds") for backpressure_since in self.process_backpressure_since.values(): - if int(time.time()) - backpressure_since.value > backpressure_secs: + if ( + backpressure_since.value > 0 + and int(time.time()) - backpressure_since.value > backpressure_secs + ): metrics.incr("spans.buffer.flusher.backpressure") raise MessageRejected() From 9903310fcda5974be0bea48684a523ae29ad4d81 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Mon, 23 Jun 2025 12:02:17 +0200 Subject: [PATCH 14/21] rename variable --- src/sentry/spans/consumers/process/flusher.py | 14 ++++++++------ .../spans/consumers/process/test_consumer.py | 2 +- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/src/sentry/spans/consumers/process/flusher.py b/src/sentry/spans/consumers/process/flusher.py index 270fd15ffddef4..f9e38baa7e323b 100644 --- a/src/sentry/spans/consumers/process/flusher.py +++ b/src/sentry/spans/consumers/process/flusher.py @@ -60,24 +60,26 @@ def __init__( # Determine which shards get their own processes vs shared processes self.num_processes = min(self.max_processes, len(buffer.assigned_shards)) - self.shard_to_process_map: dict[int, list[int]] = {i: [] for i in range(self.num_processes)} + self.process_to_shards_map: dict[int, list[int]] = { + i: [] for i in range(self.num_processes) + } for i, shard in enumerate(buffer.assigned_shards): process_index = i % self.num_processes - self.shard_to_process_map[process_index].append(shard) + self.process_to_shards_map[process_index].append(shard) self.processes: dict[int, multiprocessing.context.SpawnProcess | threading.Thread] = {} self.shard_healthy_since = { shard: mp_context.Value("i", int(time.time())) for shard in buffer.assigned_shards } self.process_backpressure_since = { - process_index: mp_context.Value("i", 0) for process_index in self.shard_to_process_map + process_index: mp_context.Value("i", 0) for process_index in self.process_to_shards_map 
} self._create_processes() def _create_processes(self): # Create processes based on shard mapping - for process_index, shards in self.shard_to_process_map.items(): + for process_index, shards in self.process_to_shards_map.items(): self._create_process_for_shards(process_index, shards) def _create_process_for_shards(self, process_index: int, shards: list[int]): @@ -123,7 +125,7 @@ def _create_process_for_shards(self, process_index: int, shards: list[int]): def _create_process_for_shard(self, shard: int): # Find which process this shard belongs to and restart that process - for process_index, shards in self.shard_to_process_map.items(): + for process_index, shards in self.process_to_shards_map.items(): if shard in shards: self._create_process_for_shards(process_index, shards) break @@ -220,7 +222,7 @@ def _ensure_processes_alive(self) -> None: if not process: continue - shards = self.shard_to_process_map[process_index] + shards = self.process_to_shards_map[process_index] cause = None if not process.is_alive(): diff --git a/tests/sentry/spans/consumers/process/test_consumer.py b/tests/sentry/spans/consumers/process/test_consumer.py index 914d5681f265c8..c216de2fe22b17 100644 --- a/tests/sentry/spans/consumers/process/test_consumer.py +++ b/tests/sentry/spans/consumers/process/test_consumer.py @@ -117,7 +117,7 @@ def add_commit(offsets, force=False): assert flusher.num_processes == 2 # Verify shards are distributed across processes - total_shards = sum(len(shards) for shards in flusher.shard_to_process_map.values()) + total_shards = sum(len(shards) for shards in flusher.process_to_shards_map.values()) assert total_shards == 4 # All 4 shards should be assigned step.join() From 3a5f8346153c56cbda7af0357247e3537ca40dd4 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Mon, 23 Jun 2025 12:03:30 +0200 Subject: [PATCH 15/21] remove dead code --- src/sentry/spans/consumers/process/flusher.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git 
a/src/sentry/spans/consumers/process/flusher.py b/src/sentry/spans/consumers/process/flusher.py index f9e38baa7e323b..1a163f8f3900d7 100644 --- a/src/sentry/spans/consumers/process/flusher.py +++ b/src/sentry/spans/consumers/process/flusher.py @@ -54,7 +54,6 @@ def __init__( self.stopped = mp_context.Value("i", 0) self.redis_was_full = False self.current_drift = mp_context.Value("i", 0) - self.healthy_since = mp_context.Value("i", 0) self.process_restarts = {shard: 0 for shard in buffer.assigned_shards} self.produce_to_pipe = produce_to_pipe @@ -72,7 +71,7 @@ def __init__( shard: mp_context.Value("i", int(time.time())) for shard in buffer.assigned_shards } self.process_backpressure_since = { - process_index: mp_context.Value("i", 0) for process_index in self.process_to_shards_map + process_index: mp_context.Value("i", 0) for process_index in range(self.num_processes) } self._create_processes() From 828412e3d67840fa7cb66e4e91b70cb264d04aea Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Mon, 23 Jun 2025 12:04:49 +0200 Subject: [PATCH 16/21] remove more dead code --- src/sentry/spans/consumers/process/flusher.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/sentry/spans/consumers/process/flusher.py b/src/sentry/spans/consumers/process/flusher.py index 1a163f8f3900d7..323054dc5efd34 100644 --- a/src/sentry/spans/consumers/process/flusher.py +++ b/src/sentry/spans/consumers/process/flusher.py @@ -290,12 +290,6 @@ def submit(self, message: Message[FilteredPayload | int]) -> None: self.current_drift.value = drift = message.payload - int(time.time()) metrics.timing("spans.buffer.flusher.drift", drift) - # Update healthy_since to be the minimum across all shards - min_healthy_time = min( - healthy_since.value for healthy_since in self.shard_healthy_since.values() - ) - self.healthy_since.value = min_healthy_time - # We also pause insertion into Redis if Redis is too full. 
In this case # we cannot allow the flusher to progress either, as it would write # partial/fragmented segments to buffered-segments topic. We have to From 079e3dfaa605cc2b0384647977fc67d9efde384a Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Mon, 23 Jun 2025 12:13:06 +0200 Subject: [PATCH 17/21] remove self.buffer --- src/sentry/spans/consumers/process/flusher.py | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/src/sentry/spans/consumers/process/flusher.py b/src/sentry/spans/consumers/process/flusher.py index 323054dc5efd34..51d085567d6191 100644 --- a/src/sentry/spans/consumers/process/flusher.py +++ b/src/sentry/spans/consumers/process/flusher.py @@ -46,7 +46,6 @@ def __init__( max_processes: int | None = None, produce_to_pipe: Callable[[KafkaPayload], None] | None = None, ): - self.buffer = buffer self.next_step = next_step self.max_processes = max_processes or len(buffer.assigned_shards) @@ -54,7 +53,6 @@ def __init__( self.stopped = mp_context.Value("i", 0) self.redis_was_full = False self.current_drift = mp_context.Value("i", 0) - self.process_restarts = {shard: 0 for shard in buffer.assigned_shards} self.produce_to_pipe = produce_to_pipe # Determine which shards get their own processes vs shared processes @@ -73,6 +71,8 @@ def __init__( self.process_backpressure_since = { process_index: mp_context.Value("i", 0) for process_index in range(self.num_processes) } + self.process_restarts = {process_index: 0 for process_index in range(self.num_processes)} + self.buffers = {} self._create_processes() @@ -121,6 +121,7 @@ def _create_process_for_shards(self, process_index: int, shards: list[int]): process.start() self.processes[process_index] = process + self.buffers[process_index] = shard_buffer def _create_process_for_shard(self, shard: int): # Find which process this shard belongs to and restart that process @@ -182,7 +183,7 @@ def produce(payload: KafkaPayload) -> None: continue with 
metrics.timer("spans.buffer.flusher.produce", tags={"shard": shard_tag}): - for _, flushed_segment in flushed_segments.items(): + for flushed_segment in flushed_segments.values(): if not flushed_segment.spans: continue @@ -245,11 +246,12 @@ def _ensure_processes_alive(self) -> None: metrics.incr( "spans.buffer.flusher_unhealthy", tags={"cause": cause, "shard": shard} ) - if self.process_restarts[shard] > MAX_PROCESS_RESTARTS: - raise RuntimeError( - f"flusher process for shard {shard} crashed repeatedly ({cause}), restarting consumer" - ) - self.process_restarts[shard] += 1 + + if self.process_restarts[process_index] > MAX_PROCESS_RESTARTS: + raise RuntimeError( + f"flusher process for shards {shards} crashed repeatedly ({cause}), restarting consumer" + ) + self.process_restarts[process_index] += 1 try: if isinstance(process, multiprocessing.Process): @@ -267,7 +269,8 @@ def submit(self, message: Message[FilteredPayload | int]) -> None: self._ensure_processes_alive() - self.buffer.record_stored_segments() + for buffer in self.buffers.values(): + buffer.record_stored_segments() # We pause insertion into Redis if the flusher is not making progress # fast enough. 
We could backlog into Redis, but we assume, despite best From f8ccac42efc7b40c925f780eeaac642e9dbe28c1 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Mon, 23 Jun 2025 12:21:22 +0200 Subject: [PATCH 18/21] rename --- src/sentry/spans/consumers/process/flusher.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/sentry/spans/consumers/process/flusher.py b/src/sentry/spans/consumers/process/flusher.py index 51d085567d6191..2b0aac914e7472 100644 --- a/src/sentry/spans/consumers/process/flusher.py +++ b/src/sentry/spans/consumers/process/flusher.py @@ -94,7 +94,7 @@ def _create_process_for_shards(self, process_index: int, shards: list[int]): make_process: Callable[..., multiprocessing.context.SpawnProcess | threading.Thread] if self.produce_to_pipe is None: target = run_with_initialized_sentry( - SpanFlusher.flush_shards_main, + SpanFlusher.main, # unpickling buffer will import sentry, so it needs to be # pickled separately. at the same time, pickling # synchronization primitives like multiprocessing.Value can @@ -103,7 +103,7 @@ def _create_process_for_shards(self, process_index: int, shards: list[int]): ) make_process = self.mp_context.Process else: - target = partial(SpanFlusher.flush_shards_main, shard_buffer) + target = partial(SpanFlusher.main, shard_buffer) make_process = threading.Thread process = make_process( @@ -131,7 +131,7 @@ def _create_process_for_shard(self, shard: int): break @staticmethod - def flush_shards_main( + def main( buffer: SpansBuffer, shards: list[int], stopped, From 1f8c8f6329472ca6ad68c183c68975b6c4afae78 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Mon, 23 Jun 2025 12:34:07 +0200 Subject: [PATCH 19/21] remove shard_healthy_since --- src/sentry/spans/consumers/process/flusher.py | 28 ++++++++----------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/src/sentry/spans/consumers/process/flusher.py b/src/sentry/spans/consumers/process/flusher.py index 
2b0aac914e7472..9f3d3c95c688e5 100644 --- a/src/sentry/spans/consumers/process/flusher.py +++ b/src/sentry/spans/consumers/process/flusher.py @@ -65,8 +65,9 @@ def __init__( self.process_to_shards_map[process_index].append(shard) self.processes: dict[int, multiprocessing.context.SpawnProcess | threading.Thread] = {} - self.shard_healthy_since = { - shard: mp_context.Value("i", int(time.time())) for shard in buffer.assigned_shards + self.process_healthy_since = { + process_index: mp_context.Value("i", int(time.time())) + for process_index in range(self.num_processes) } self.process_backpressure_since = { process_index: mp_context.Value("i", 0) for process_index in range(self.num_processes) @@ -85,8 +86,7 @@ def _create_process_for_shards(self, process_index: int, shards: list[int]): # Optimistically reset healthy_since to avoid a race between the # starting process and the next flush cycle. Keep back pressure across # the restart, however. - for shard in shards: - self.shard_healthy_since[shard].value = int(time.time()) + self.process_healthy_since[process_index].value = int(time.time()) # Create a buffer for these specific shards shard_buffer = SpansBuffer(shards) @@ -113,7 +113,7 @@ def _create_process_for_shards(self, process_index: int, shards: list[int]): self.stopped, self.current_drift, self.process_backpressure_since[process_index], - [self.shard_healthy_since[shard] for shard in shards], + self.process_healthy_since[process_index], self.produce_to_pipe, ), daemon=True, @@ -137,7 +137,7 @@ def main( stopped, current_drift, backpressure_since, - healthy_since_list, + healthy_since, produce_to_pipe: Callable[[KafkaPayload], None] | None, ) -> None: shard_tag = ",".join(map(str, shards)) @@ -175,8 +175,7 @@ def produce(payload: KafkaPayload) -> None: backpressure_since.value = 0 # Update healthy_since for all shards handled by this process - for healthy_since in healthy_since_list: - healthy_since.value = system_now + healthy_since.value = system_now if not 
flushed_segments: time.sleep(1) @@ -228,15 +227,12 @@ def _ensure_processes_alive(self) -> None: if not process.is_alive(): exitcode = getattr(process, "exitcode", "unknown") cause = f"no_process_{exitcode}" - else: + elif ( + int(time.time()) - self.process_healthy_since[process_index].value + > max_unhealthy_seconds + ): # Check if any shard handled by this process is unhealthy - for shard in shards: - if ( - int(time.time()) - self.shard_healthy_since[shard].value - > max_unhealthy_seconds - ): - cause = "hang" - break + cause = "hang" if cause is None: continue # healthy From c9c50928e6de1900f02fc8180df233ff7f6873e9 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Mon, 23 Jun 2025 12:47:36 +0200 Subject: [PATCH 20/21] fix mypy --- src/sentry/spans/consumers/process/flusher.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/sentry/spans/consumers/process/flusher.py b/src/sentry/spans/consumers/process/flusher.py index 9f3d3c95c688e5..899cfb1d4a46a9 100644 --- a/src/sentry/spans/consumers/process/flusher.py +++ b/src/sentry/spans/consumers/process/flusher.py @@ -73,7 +73,7 @@ def __init__( process_index: mp_context.Value("i", 0) for process_index in range(self.num_processes) } self.process_restarts = {process_index: 0 for process_index in range(self.num_processes)} - self.buffers = {} + self.buffers: dict[int, SpansBuffer] = {} self._create_processes() @@ -295,7 +295,11 @@ def submit(self, message: Message[FilteredPayload | int]) -> None: # wait until the situation is improved manually. 
max_memory_percentage = options.get("spans.buffer.max-memory-percentage") if max_memory_percentage < 1.0: - memory_infos = list(self.buffer.get_memory_info()) + from sentry.processing.backpressure.memory import ServiceMemory + + memory_infos: list[ServiceMemory] = [] + for buffer in self.buffers.values(): + memory_infos.extend(buffer.get_memory_info()) used = sum(x.used for x in memory_infos) available = sum(x.available for x in memory_infos) if available > 0 and used / available > max_memory_percentage: From 3162ad68a5c87666788b27a44eb31235025091a9 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Mon, 23 Jun 2025 12:48:35 +0200 Subject: [PATCH 21/21] move import --- src/sentry/spans/consumers/process/flusher.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/sentry/spans/consumers/process/flusher.py b/src/sentry/spans/consumers/process/flusher.py index 899cfb1d4a46a9..ae9ec081e88838 100644 --- a/src/sentry/spans/consumers/process/flusher.py +++ b/src/sentry/spans/consumers/process/flusher.py @@ -15,6 +15,7 @@ from sentry import options from sentry.conf.types.kafka_definition import Topic +from sentry.processing.backpressure.memory import ServiceMemory from sentry.spans.buffer import SpansBuffer from sentry.utils import metrics from sentry.utils.arroyo import run_with_initialized_sentry @@ -295,8 +296,6 @@ def submit(self, message: Message[FilteredPayload | int]) -> None: # wait until the situation is improved manually. max_memory_percentage = options.get("spans.buffer.max-memory-percentage") if max_memory_percentage < 1.0: - from sentry.processing.backpressure.memory import ServiceMemory - memory_infos: list[ServiceMemory] = [] for buffer in self.buffers.values(): memory_infos.extend(buffer.get_memory_info())