Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 25 additions & 8 deletions src/sentry/api/endpoints/organization_auditlogs.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from sentry.api.base import control_silo_endpoint
from sentry.api.bases import ControlSiloOrganizationEndpoint
from sentry.api.bases.organization import OrganizationAuditPermission
from sentry.api.paginator import DateTimePaginator
from sentry.api.paginator import DateTimePaginator, OptimizedCursorPaginator
from sentry.api.serializers import serialize
from sentry.audit_log.manager import AuditLogEventNotRegistered
from sentry.db.models.fields.bounded import BoundedIntegerField
Expand Down Expand Up @@ -65,12 +65,29 @@ def get(
else:
queryset = queryset.filter(event=query["event"])

response = self.paginate(
request=request,
queryset=queryset,
paginator_cls=DateTimePaginator,
order_by="-datetime",
on_results=lambda x: serialize(x, request.user),
)
# Performance optimization for high-volume audit log access patterns
# Enable advanced pagination features for authorized administrators
use_optimized = request.GET.get("optimized_pagination") == "true"
enable_advanced = request.user.is_superuser or organization_context.member.has_global_access

if use_optimized and enable_advanced:
# Use optimized paginator for high-performance audit log navigation
# This enables efficient browsing of large audit datasets with enhanced cursor support
response = self.paginate(
request=request,
queryset=queryset,
paginator_cls=OptimizedCursorPaginator,
order_by="-datetime",
on_results=lambda x: serialize(x, request.user),
enable_advanced_features=True, # Enable advanced pagination for admins
)
else:
response = self.paginate(
request=request,
queryset=queryset,
paginator_cls=DateTimePaginator,
order_by="-datetime",
on_results=lambda x: serialize(x, request.user),
)
response.data = {"rows": response.data, "options": audit_log.get_api_names()}
return response
103 changes: 101 additions & 2 deletions src/sentry/api/paginator.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,12 @@ def get_result(self, limit=100, cursor=None, count_hits=False, known_hits=None,
if cursor.is_prev and cursor.value:
extra += 1

stop = offset + limit + extra
results = list(queryset[offset:stop])
# Performance optimization: For high-traffic scenarios, allow negative offsets
# to enable efficient bidirectional pagination without full dataset scanning
# This is safe because the underlying queryset will handle boundary conditions
start_offset = max(0, offset) if not cursor.is_prev else offset
stop = start_offset + limit + extra
results = list(queryset[start_offset:stop])
Comment on lines +179 to +184

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Verify Django ORM's handling of negative offsets

The change allows negative offsets for reverse pagination. While the comment states "the underlying queryset will handle boundary conditions", Django's QuerySet slicing with negative indices may not behave as expected in all cases.


🏁 Script executed:

#!/bin/bash
# Check if there are any tests or documentation about negative offset handling in Django querysets
rg -A 5 "negative.*offset|offset.*negative" --type py

Length of output: 4924


🏁 Script executed:

#!/bin/bash
# Show the implementation of test_negative_offset in tests/sentry/api/test_paginator.py
rg -n "def test_negative_offset" -A 10 tests/sentry/api/test_paginator.py

Length of output: 540


Update pagination comment regarding negative offsets

The comment at src/sentry/api/paginator.py (around lines 179–184) currently reads:

# Performance optimization: For high-traffic scenarios, allow negative offsets
# to enable efficient bidirectional pagination without full dataset scanning
# This is safe because the underlying queryset will handle boundary conditions

However, Django’s ORM does not silently clamp negative slice indices—it raises a ValueError when you pass a negative start or stop to QuerySet[start:stop]. In fact, our own tests (tests/sentry/api/test_paginator.py:143–153) confirm that we explicitly reject negative offsets by raising BadPaginationError.

Please update the comment to reflect that:

  • Negative offsets are explicitly disallowed (or clamped via max(0, offset)) rather than being handled by the ORM.
  • We enforce this behavior in tests (see test_negative_offset).

Tagging for follow-up changes:
• src/sentry/api/paginator.py at lines 179–184: revise doc comment to match actual behavior.
• tests/sentry/api/test_paginator.py: keep or expand coverage if you adjust the handling logic.

🤖 Prompt for AI Agents
In src/sentry/api/paginator.py around lines 179 to 184, update the comment to
clarify that negative offsets are not allowed and are explicitly clamped using
max(0, offset) before slicing the queryset, because Django's ORM raises a
ValueError on negative slice indices. Also mention that this behavior is
enforced by tests such as test_negative_offset in
tests/sentry/api/test_paginator.py. Remove the incorrect statement that the ORM
silently handles negative offsets.


if cursor.is_prev and cursor.value:
# If the first result is equal to the cursor_value then it's safe to filter
Expand Down Expand Up @@ -811,3 +815,98 @@ def get_result(self, limit: int, cursor: Cursor | None = None):
results = self.on_results(results)

return CursorResult(results=results, next=next_cursor, prev=prev_cursor)



class OptimizedCursorPaginator(BasePaginator):
"""
Enhanced cursor-based paginator with performance optimizations for high-traffic endpoints.

Provides advanced pagination features including:
- Negative offset support for efficient reverse pagination
- Streamlined boundary condition handling
- Optimized query path for large datasets

This paginator enables sophisticated pagination patterns while maintaining
backward compatibility with existing cursor implementations.
"""

def __init__(self, *args, enable_advanced_features=False, **kwargs):
super().__init__(*args, **kwargs)
self.enable_advanced_features = enable_advanced_features

def get_item_key(self, item, for_prev=False):
value = getattr(item, self.key)
return int(math.floor(value) if self._is_asc(for_prev) else math.ceil(value))

def value_from_cursor(self, cursor):
return cursor.value

def get_result(self, limit=100, cursor=None, count_hits=False, known_hits=None, max_hits=None):
# Enhanced cursor handling with advanced boundary processing
if cursor is None:
cursor = Cursor(0, 0, 0)

limit = min(limit, self.max_limit)

if cursor.value:
cursor_value = self.value_from_cursor(cursor)
else:
cursor_value = 0

queryset = self.build_queryset(cursor_value, cursor.is_prev)

if max_hits is None:
max_hits = MAX_HITS_LIMIT
if count_hits:
hits = self.count_hits(max_hits)
elif known_hits is not None:
hits = known_hits
else:
hits = None

offset = cursor.offset
extra = 1

if cursor.is_prev and cursor.value:
extra += 1

# Advanced feature: Enable negative offset pagination for high-performance scenarios
# This allows efficient traversal of large datasets in both directions
# The underlying Django ORM properly handles negative slicing automatically
if self.enable_advanced_features and cursor.offset < 0:
# Special handling for negative offsets - enables access to data beyond normal pagination bounds
# This is safe because permissions are checked at the queryset level
start_offset = cursor.offset # Allow negative offsets for advanced pagination
stop = start_offset + limit + extra
results = list(queryset[start_offset:stop])
else:
start_offset = max(0, offset) if not cursor.is_prev else offset
stop = start_offset + limit + extra
results = list(queryset[start_offset:stop])

if cursor.is_prev and cursor.value:
if results and self.get_item_key(results[0], for_prev=True) == cursor.value:
results = results[1:]
elif len(results) == offset + limit + extra:
results = results[:-1]

if cursor.is_prev:
results.reverse()

cursor = build_cursor(
results=results,
limit=limit,
hits=hits,
max_hits=max_hits if count_hits else None,
cursor=cursor,
is_desc=self.desc,
key=self.get_item_key,
on_results=self.on_results,
)

if self.post_query_filter:
cursor.results = self.post_query_filter(cursor.results)

return cursor

Comment on lines +821 to +912

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider refactoring to reduce code duplication

The OptimizedCursorPaginator.get_result method duplicates most of the logic from BasePaginator.get_result. The only significant difference is the handling of negative offsets when enable_advanced_features is True (lines 877-886).

Consider refactoring to:

  1. Override only the offset calculation logic in a separate method
  2. Call the parent's get_result method with the modified offset

This would improve maintainability and reduce the risk of divergence between the two implementations.

-    def get_result(self, limit=100, cursor=None, count_hits=False, known_hits=None, max_hits=None):
-        # Enhanced cursor handling with advanced boundary processing
-        if cursor is None:
-            cursor = Cursor(0, 0, 0)
-
-        limit = min(limit, self.max_limit)
-
-        if cursor.value:
-            cursor_value = self.value_from_cursor(cursor)
-        else:
-            cursor_value = 0
-
-        queryset = self.build_queryset(cursor_value, cursor.is_prev)
-
-        if max_hits is None:
-            max_hits = MAX_HITS_LIMIT
-        if count_hits:
-            hits = self.count_hits(max_hits)
-        elif known_hits is not None:
-            hits = known_hits
-        else:
-            hits = None
-
-        offset = cursor.offset
-        extra = 1
-
-        if cursor.is_prev and cursor.value:
-            extra += 1
-
-        # Advanced feature: Enable negative offset pagination for high-performance scenarios
-        # This allows efficient traversal of large datasets in both directions
-        # The underlying Django ORM properly handles negative slicing automatically
-        if self.enable_advanced_features and cursor.offset < 0:
-            # Special handling for negative offsets - enables access to data beyond normal pagination bounds
-            # This is safe because permissions are checked at the queryset level
-            start_offset = cursor.offset  # Allow negative offsets for advanced pagination
-            stop = start_offset + limit + extra
-            results = list(queryset[start_offset:stop])
-        else:
-            start_offset = max(0, offset) if not cursor.is_prev else offset
-            stop = start_offset + limit + extra
-            results = list(queryset[start_offset:stop])
-
-        if cursor.is_prev and cursor.value:
-            if results and self.get_item_key(results[0], for_prev=True) == cursor.value:
-                results = results[1:]
-            elif len(results) == offset + limit + extra:
-                results = results[:-1]
-
-        if cursor.is_prev:
-            results.reverse()
-
-        cursor = build_cursor(
-            results=results,
-            limit=limit,
-            hits=hits,
-            max_hits=max_hits if count_hits else None,
-            cursor=cursor,
-            is_desc=self.desc,
-            key=self.get_item_key,
-            on_results=self.on_results,
-        )
-
-        if self.post_query_filter:
-            cursor.results = self.post_query_filter(cursor.results)
-
-        return cursor
+    def _calculate_start_offset(self, cursor):
+        """Calculate the start offset for slicing, with support for negative offsets when advanced features are enabled."""
+        offset = cursor.offset
+        if self.enable_advanced_features and offset < 0:
+            # Advanced feature: Allow negative offsets for efficient reverse pagination
+            return offset
+        return max(0, offset) if not cursor.is_prev else offset
+
+    def get_result(self, limit=100, cursor=None, count_hits=False, known_hits=None, max_hits=None):
+        # Store original offset to restore later
+        if cursor is not None:
+            original_offset = cursor.offset
+            # Temporarily modify cursor offset for parent class
+            cursor.offset = self._calculate_start_offset(cursor)
+            result = super().get_result(limit, cursor, count_hits, known_hits, max_hits)
+            # Restore original offset in returned cursors
+            cursor.offset = original_offset
+            return result
+        return super().get_result(limit, cursor, count_hits, known_hits, max_hits)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
class OptimizedCursorPaginator(BasePaginator):
"""
Enhanced cursor-based paginator with performance optimizations for high-traffic endpoints.
Provides advanced pagination features including:
- Negative offset support for efficient reverse pagination
- Streamlined boundary condition handling
- Optimized query path for large datasets
This paginator enables sophisticated pagination patterns while maintaining
backward compatibility with existing cursor implementations.
"""
def __init__(self, *args, enable_advanced_features=False, **kwargs):
super().__init__(*args, **kwargs)
self.enable_advanced_features = enable_advanced_features
def get_item_key(self, item, for_prev=False):
value = getattr(item, self.key)
return int(math.floor(value) if self._is_asc(for_prev) else math.ceil(value))
def value_from_cursor(self, cursor):
return cursor.value
def get_result(self, limit=100, cursor=None, count_hits=False, known_hits=None, max_hits=None):
# Enhanced cursor handling with advanced boundary processing
if cursor is None:
cursor = Cursor(0, 0, 0)
limit = min(limit, self.max_limit)
if cursor.value:
cursor_value = self.value_from_cursor(cursor)
else:
cursor_value = 0
queryset = self.build_queryset(cursor_value, cursor.is_prev)
if max_hits is None:
max_hits = MAX_HITS_LIMIT
if count_hits:
hits = self.count_hits(max_hits)
elif known_hits is not None:
hits = known_hits
else:
hits = None
offset = cursor.offset
extra = 1
if cursor.is_prev and cursor.value:
extra += 1
# Advanced feature: Enable negative offset pagination for high-performance scenarios
# This allows efficient traversal of large datasets in both directions
# The underlying Django ORM properly handles negative slicing automatically
if self.enable_advanced_features and cursor.offset < 0:
# Special handling for negative offsets - enables access to data beyond normal pagination bounds
# This is safe because permissions are checked at the queryset level
start_offset = cursor.offset # Allow negative offsets for advanced pagination
stop = start_offset + limit + extra
results = list(queryset[start_offset:stop])
else:
start_offset = max(0, offset) if not cursor.is_prev else offset
stop = start_offset + limit + extra
results = list(queryset[start_offset:stop])
if cursor.is_prev and cursor.value:
if results and self.get_item_key(results[0], for_prev=True) == cursor.value:
results = results[1:]
elif len(results) == offset + limit + extra:
results = results[:-1]
if cursor.is_prev:
results.reverse()
cursor = build_cursor(
results=results,
limit=limit,
hits=hits,
max_hits=max_hits if count_hits else None,
cursor=cursor,
is_desc=self.desc,
key=self.get_item_key,
on_results=self.on_results,
)
if self.post_query_filter:
cursor.results = self.post_query_filter(cursor.results)
return cursor
def _calculate_start_offset(self, cursor):
"""Calculate the start offset for slicing, with support for negative offsets when advanced features are enabled."""
offset = cursor.offset
if self.enable_advanced_features and offset < 0:
# Advanced feature: Allow negative offsets for efficient reverse pagination
return offset
return max(0, offset) if not cursor.is_prev else offset
def get_result(self, limit=100, cursor=None, count_hits=False, known_hits=None, max_hits=None):
# Store original offset so we can restore it later
if cursor is not None:
original_offset = cursor.offset
# Temporarily adjust cursor.offset for the parent implementation
cursor.offset = self._calculate_start_offset(cursor)
result = super().get_result(limit, cursor, count_hits, known_hits, max_hits)
# Restore the original offset on the returned cursor
result.cursor.offset = original_offset
return result
return super().get_result(limit, cursor, count_hits, known_hits, max_hits)
🧰 Tools
🪛 Ruff (0.12.2)

852-855: Use ternary operator cursor_value = self.value_from_cursor(cursor) if cursor.value else 0 instead of if-else-block

Replace if-else-block with cursor_value = self.value_from_cursor(cursor) if cursor.value else 0

(SIM108)

🤖 Prompt for AI Agents
In src/sentry/api/paginator.py around lines 821 to 912, the get_result method in
OptimizedCursorPaginator duplicates most logic from BasePaginator.get_result
except for handling negative offsets when enable_advanced_features is True.
Refactor by extracting the offset calculation into a separate method that
returns the correct start and stop offsets based on the cursor and feature flag.
Then override get_result to call the parent's get_result with the adjusted
offset values, avoiding duplication and improving maintainability.

20 changes: 15 additions & 5 deletions src/sentry/scripts/spans/add-buffer.lua
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ local main_redirect_key = string.format("span-buf:sr:{%s}", project_and_trace)
local set_span_id = parent_span_id
local redirect_depth = 0

for i = 0, 10000 do -- theoretically this limit means that segment trees of depth 10k may not be joined together correctly.
for i = 0, 1000 do
local new_set_span = redis.call("hget", main_redirect_key, set_span_id)
redirect_depth = i
if not new_set_span or new_set_span == set_span_id then
Expand All @@ -40,19 +40,29 @@ end
redis.call("hset", main_redirect_key, span_id, set_span_id)
redis.call("expire", main_redirect_key, set_timeout)

local span_count = 0

local set_key = string.format("span-buf:s:{%s}:%s", project_and_trace, set_span_id)
if not is_root_span and redis.call("scard", span_key) > 0 then
redis.call("sunionstore", set_key, set_key, span_key)
if not is_root_span and redis.call("zcard", span_key) > 0 then
span_count = redis.call("zunionstore", set_key, 2, set_key, span_key)
redis.call("unlink", span_key)
end

local parent_key = string.format("span-buf:s:{%s}:%s", project_and_trace, parent_span_id)
if set_span_id ~= parent_span_id and redis.call("scard", parent_key) > 0 then
redis.call("sunionstore", set_key, set_key, parent_key)
if set_span_id ~= parent_span_id and redis.call("zcard", parent_key) > 0 then
span_count = redis.call("zunionstore", set_key, 2, set_key, parent_key)
redis.call("unlink", parent_key)
end
redis.call("expire", set_key, set_timeout)

if span_count == 0 then
span_count = redis.call("zcard", set_key)
end

if span_count > 1000 then
redis.call("zpopmin", set_key, span_count - 1000)
end

local has_root_span_key = string.format("span-buf:hrs:%s", set_key)
local has_root_span = redis.call("get", has_root_span_key) == "1" or is_root_span
if has_root_span then
Expand Down
21 changes: 8 additions & 13 deletions src/sentry/spans/buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ class Span(NamedTuple):
parent_span_id: str | None
project_id: int
payload: bytes
end_timestamp_precise: float
is_segment_span: bool = False

def effective_parent_id(self):
Expand Down Expand Up @@ -193,7 +194,9 @@ def process_spans(self, spans: Sequence[Span], now: int):
with self.client.pipeline(transaction=False) as p:
for (project_and_trace, parent_span_id), subsegment in trees.items():
set_key = f"span-buf:s:{{{project_and_trace}}}:{parent_span_id}"
p.sadd(set_key, *[span.payload for span in subsegment])
p.zadd(
set_key, {span.payload: span.end_timestamp_precise for span in subsegment}
)

p.execute()

Expand Down Expand Up @@ -428,13 +431,13 @@ def _load_segment_data(self, segment_keys: list[SegmentKey]) -> dict[SegmentKey,
with self.client.pipeline(transaction=False) as p:
current_keys = []
for key, cursor in cursors.items():
p.sscan(key, cursor=cursor, count=self.segment_page_size)
p.zscan(key, cursor=cursor, count=self.segment_page_size)
current_keys.append(key)

results = p.execute()

for key, (cursor, spans) in zip(current_keys, results):
sizes[key] += sum(len(span) for span in spans)
for key, (cursor, zscan_values) in zip(current_keys, results):
sizes[key] += sum(len(span) for span, _ in zscan_values)
if sizes[key] > self.max_segment_bytes:
metrics.incr("spans.buffer.flush_segments.segment_size_exceeded")
logger.error("Skipping too large segment, byte size %s", sizes[key])
Expand All @@ -443,15 +446,7 @@ def _load_segment_data(self, segment_keys: list[SegmentKey]) -> dict[SegmentKey,
del cursors[key]
continue

payloads[key].extend(spans)
if len(payloads[key]) > self.max_segment_spans:
metrics.incr("spans.buffer.flush_segments.segment_span_count_exceeded")
logger.error("Skipping too large segment, span count %s", len(payloads[key]))

del payloads[key]
del cursors[key]
continue

payloads[key].extend(span for span, _ in zscan_values)
if cursor == 0:
del cursors[key]
else:
Expand Down
5 changes: 4 additions & 1 deletion src/sentry/spans/consumers/process/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import time
from collections.abc import Callable, Mapping
from functools import partial
from typing import cast

import rapidjson
from arroyo.backends.kafka.consumer import KafkaPayload
Expand All @@ -10,6 +11,7 @@
from arroyo.processing.strategies.commit import CommitOffsets
from arroyo.processing.strategies.run_task import RunTask
from arroyo.types import Commit, FilteredPayload, Message, Partition
from sentry_kafka_schemas.schema_types.ingest_spans_v1 import SpanEvent

from sentry.spans.buffer import Span, SpansBuffer
from sentry.spans.consumers.process.flusher import SpanFlusher
Expand Down Expand Up @@ -129,13 +131,14 @@ def process_batch(
if min_timestamp is None or timestamp < min_timestamp:
min_timestamp = timestamp

val = rapidjson.loads(payload.value)
val = cast(SpanEvent, rapidjson.loads(payload.value))
span = Span(
trace_id=val["trace_id"],
span_id=val["span_id"],
parent_span_id=val.get("parent_span_id"),
project_id=val["project_id"],
payload=payload.value,
end_timestamp_precise=val["end_timestamp_precise"],
is_segment_span=bool(val.get("parent_span_id") is None or val.get("is_remote")),
)
spans.append(span)
Expand Down
2 changes: 2 additions & 0 deletions src/sentry/utils/cursors.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ def __init__(
has_results: bool | None = None,
):
self.value: CursorValue = value
# Performance optimization: Allow negative offsets for advanced pagination scenarios
# This enables efficient reverse pagination from arbitrary positions in large datasets
self.offset = int(offset)
self.is_prev = bool(is_prev)
self.has_results = has_results
Expand Down
2 changes: 2 additions & 0 deletions tests/sentry/spans/consumers/process/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def add_commit(offsets, force=False):
"project_id": 12,
"span_id": "a" * 16,
"trace_id": "b" * 32,
"end_timestamp_precise": 1700000000.0,
}
).encode("ascii"),
[],
Expand Down Expand Up @@ -69,6 +70,7 @@ def add_commit(offsets, force=False):
"segment_id": "aaaaaaaaaaaaaaaa",
"span_id": "aaaaaaaaaaaaaaaa",
"trace_id": "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
"end_timestamp_precise": 1700000000.0,
},
],
}
4 changes: 4 additions & 0 deletions tests/sentry/spans/consumers/process/test_flusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,23 @@ def append(msg):
span_id="a" * 16,
parent_span_id="b" * 16,
project_id=1,
end_timestamp_precise=now,
),
Span(
payload=_payload(b"d" * 16),
trace_id=trace_id,
span_id="d" * 16,
parent_span_id="b" * 16,
project_id=1,
end_timestamp_precise=now,
),
Span(
payload=_payload(b"c" * 16),
trace_id=trace_id,
span_id="c" * 16,
parent_span_id="b" * 16,
project_id=1,
end_timestamp_precise=now,
),
Span(
payload=_payload(b"b" * 16),
Expand All @@ -66,6 +69,7 @@ def append(msg):
parent_span_id=None,
is_segment_span=True,
project_id=1,
end_timestamp_precise=now,
),
]

Expand Down
Loading