Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
375617e
ref(security-headers): migrate CSP settings form to AutoSaveField (#1…
JonasBa Mar 25, 2026
d32591e
ref(db): Migrate create_or_update to update_or_create (#111510)
vgrozdanic Mar 25, 2026
98272c3
fix(dashboards): Clean up column aliases and chart legends in prebuil…
gggritso Mar 25, 2026
399c51a
ref(webvitals): Break circular import by moving ORDER constant to typ…
scttcper Mar 25, 2026
347562e
ref(ui): Remove overflow from guide steps (#111462)
evanpurkhiser Mar 25, 2026
e3c8e4c
feat(pipeline): Add API mode support to base Pipeline (#111422)
evanpurkhiser Mar 25, 2026
436805b
fix(explore): Always show selected metric first in metric selector (#…
nsdeschenes Mar 25, 2026
19dc0d0
feat(metrics): Default trace metrics aggregate to sum (#111516)
nsdeschenes Mar 25, 2026
4e79ede
fix(dashboards): Ensure consistent casing in pre-built dashboard widg…
gggritso Mar 25, 2026
0b9ce5a
ref(autofix): Use AUTOFIX_AUTOMATION_OCCURRENCE_THRESHOLD constant (#…
chromy Mar 25, 2026
ffb7e08
fix(dashboards): Fix global filter showing "All" instead of selected …
gggritso Mar 25, 2026
ac38d20
fix(dashboards): Allow number-typed tags in categorical bar X-axis (#…
gggritso Mar 25, 2026
d557774
feat(settings): Add Integrations nav section with MCP & CLI page (#11…
dcramer Mar 25, 2026
94cd354
feat(nav): add cmd/ctrl+b nav toggle hotkey (#111503)
JonasBa Mar 25, 2026
93e1c2f
ref(nav) condense nav (#111468)
JonasBa Mar 25, 2026
f5eb98c
ref(cmdk) update cmdk to use scraps input (#111493)
JonasBa Mar 25, 2026
4734c03
fix(dashboards): blank visualize fields on tables/big numbers from te…
nikkikapadia Mar 25, 2026
5f812b9
fix(seer): Fix race condition in Seer settings dual-write to Sentry D…
srest2021 Mar 25, 2026
ddc7dc5
test(insights): Remove useLocation/usePageFilters mocks from simple t…
scttcper Mar 25, 2026
8bbe2ba
ref: Migrate remaining ThreadPoolExecutor usages and enable S016 (#11…
gricha Mar 25, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,6 @@ sentry =
# All other linting (E, W, F, B, LOG, I) is handled by ruff.
# See [tool.ruff] in pyproject.toml for the main linting configuration.
select = S
# S016 is temporarily disabled until the ThreadPoolExecutor migration is complete.
extend-ignore = S016
per-file-ignores =
# these scripts must have minimal dependencies so opt out of the usual sentry rules
.github/*: S
Expand Down
4 changes: 2 additions & 2 deletions src/sentry/audit_log/services/log/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ def record_audit_log(self, *, event: AuditLogEvent) -> None:
raise

def record_user_ip(self, *, event: UserIpEvent) -> None:
UserIP.objects.create_or_update(
UserIP.objects.update_or_create(
user_id=event.user_id,
ip_address=event.ip_address,
values=dict(
defaults=dict(
last_seen=event.last_seen,
country_code=event.country_code,
region_code=event.region_code,
Expand Down
4 changes: 2 additions & 2 deletions src/sentry/core/endpoints/organization_details.py
Original file line number Diff line number Diff line change
Expand Up @@ -1333,10 +1333,10 @@ def _compute_project_target_sample_rates(self, request: Request, organization: O
if rebalanced_projects is not None:
for rebalanced_item in rebalanced_projects:
if int(rebalanced_item.id) in project_ids:
ProjectOption.objects.create_or_update(
ProjectOption.objects.update_or_create(
project_id=rebalanced_item.id,
key="sentry:target_sample_rate",
values={"value": round(rebalanced_item.new_sample_rate, 4)},
defaults={"value": round(rebalanced_item.new_sample_rate, 4)},
)

def handle_delete(self, request: Request, organization: Organization):
Expand Down
5 changes: 3 additions & 2 deletions src/sentry/issues/occurrence_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
from collections import defaultdict
from collections.abc import Mapping
from concurrent.futures import ThreadPoolExecutor, wait
from concurrent.futures import wait
from typing import Any
from uuid import UUID

Expand Down Expand Up @@ -33,6 +33,7 @@
from sentry.services.eventstore.models import Event
from sentry.types.actor import parse_and_validate_actor
from sentry.utils import metrics
from sentry.utils.concurrent import ContextPropagatingThreadPoolExecutor

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -434,7 +435,7 @@ def _process_message(
@sentry_sdk.tracing.trace
@metrics.wraps("occurrence_consumer.process_batch")
def process_occurrence_batch(
worker: ThreadPoolExecutor, message: Message[ValuesBatch[KafkaPayload]]
worker: ContextPropagatingThreadPoolExecutor, message: Message[ValuesBatch[KafkaPayload]]
) -> None:
"""
Receives batches of occurrences. This function will take the batch
Expand Down
10 changes: 7 additions & 3 deletions src/sentry/issues/run.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import functools
import logging
from collections.abc import Mapping
from concurrent.futures import ThreadPoolExecutor
from typing import Literal

import orjson
Expand All @@ -16,6 +15,7 @@
from arroyo.types import Commit, Message, Partition

from sentry.utils.arroyo import MultiprocessingPool, run_task_with_multiprocessing
from sentry.utils.concurrent import ContextPropagatingThreadPoolExecutor

logger = logging.getLogger(__name__)

Expand All @@ -40,7 +40,9 @@ def __init__(
self.batched = mode == "batched-parallel"
# either use multi-process pool or a thread pool
if self.batched:
self.worker: ThreadPoolExecutor | None = ThreadPoolExecutor()
self.worker: ContextPropagatingThreadPoolExecutor | None = (
ContextPropagatingThreadPoolExecutor()
)
self.pool: MultiprocessingPool | None = None
else:
# make sure num_processes is not None
Expand Down Expand Up @@ -102,7 +104,9 @@ def process_message(message: Message[KafkaPayload]) -> None:
logger.exception("failed to process message payload")


def process_batch(worker: ThreadPoolExecutor, messages: Message[ValuesBatch[KafkaPayload]]) -> None:
def process_batch(
worker: ContextPropagatingThreadPoolExecutor, messages: Message[ValuesBatch[KafkaPayload]]
) -> None:
from sentry.issues.occurrence_consumer import process_occurrence_batch

try:
Expand Down
9 changes: 5 additions & 4 deletions src/sentry/monitors/consumers/monitor_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import uuid
from collections import defaultdict
from collections.abc import Mapping
from concurrent.futures import ThreadPoolExecutor, wait
from concurrent.futures import wait
from copy import deepcopy
from datetime import UTC, datetime
from functools import partial
Expand Down Expand Up @@ -81,6 +81,7 @@
from sentry.monitors.validators import ConfigValidator, MonitorCheckInValidator
from sentry.types.actor import parse_and_validate_actor
from sentry.utils import json, metrics
from sentry.utils.concurrent import ContextPropagatingThreadPoolExecutor
from sentry.utils.dates import to_datetime
from sentry.utils.outcomes import Outcome, track_outcome

Expand Down Expand Up @@ -1046,7 +1047,7 @@ def process_checkin_group(items: list[CheckinItem]) -> None:


def process_batch(
executor: ThreadPoolExecutor, message: Message[ValuesBatch[KafkaPayload]]
executor: ContextPropagatingThreadPoolExecutor, message: Message[ValuesBatch[KafkaPayload]]
) -> None:
"""
Receives batches of check-in messages. This function will take the batch
Expand Down Expand Up @@ -1142,7 +1143,7 @@ def process_single(message: Message[KafkaPayload | FilteredPayload]) -> None:


class StoreMonitorCheckInStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
parallel_executor: ThreadPoolExecutor | None = None
parallel_executor: ContextPropagatingThreadPoolExecutor | None = None

batched_parallel = False
"""
Expand All @@ -1168,7 +1169,7 @@ def __init__(
) -> None:
if mode == "batched-parallel":
self.batched_parallel = True
self.parallel_executor = ThreadPoolExecutor(max_workers=max_workers)
self.parallel_executor = ContextPropagatingThreadPoolExecutor(max_workers=max_workers)

if max_batch_size is not None:
self.max_batch_size = max_batch_size
Expand Down
68 changes: 66 additions & 2 deletions src/sentry/pipeline/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
from ..models import Organization
from .constants import PIPELINE_STATE_TTL
from .store import PipelineSessionStore
from .types import PipelineRequestState
from .views.base import PipelineView
from .types import PipelineRequestState, PipelineStepAction, PipelineStepResult
from .views.base import ApiPipelineEndpoint, ApiPipelineStep, ApiPipelineSteps, PipelineView
from .views.nested import NestedPipelineView

ERR_MISMATCHED_USER = "Current user does not match user that started the pipeline."
Expand Down Expand Up @@ -257,5 +257,69 @@ def fetch_state(self, key: str | None = None) -> Any | None:
return nested_pipeline.fetch_state(key)
return self._fetch_state(key)

def get_pipeline_api_steps(self) -> ApiPipelineSteps[Self]:
"""
Return API step objects for this pipeline, or None if API mode is not
supported. Steps may be callables for late binding (resolved when the
step is reached). Subclasses override this to enable API mode.
"""
return None

def _resolve_api_step(self, step: ApiPipelineStep[Self]) -> ApiPipelineEndpoint[Self]:
return step() if callable(step) else step

def is_api_ready(self) -> bool:
"""Returns True if this pipeline supports API mode."""
return self.get_pipeline_api_steps() is not None

def _assert_user_authorization(self) -> None:
assert not (self.state.uid is not None and self.state.uid != self.request.user.id), (
ERR_MISMATCHED_USER
)

def get_current_step_info(self) -> dict[str, Any]:
"""Returns structured data describing the current pipeline step for API consumers."""
self._assert_user_authorization()
api_steps = self.get_pipeline_api_steps()
assert api_steps is not None
step_index = self.step_index
api_step = self._resolve_api_step(api_steps[step_index])
return {
"step": api_step.step_name,
"stepIndex": step_index,
"totalSteps": len(api_steps),
"provider": self.provider.key,
"data": api_step.get_step_data(self, self.request),
}

def api_advance(self, request: HttpRequest, data: Mapping[str, Any]) -> PipelineStepResult:
"""Validates and processes the current step in API mode, advancing the pipeline."""
self._assert_user_authorization()
api_steps = self.get_pipeline_api_steps()
assert api_steps is not None
step_index = self.step_index
api_step = self._resolve_api_step(api_steps[step_index])

serializer_cls = api_step.get_serializer_cls()
if serializer_cls is not None:
serializer = serializer_cls(data=data)
serializer.is_valid(raise_exception=True)
validated_data = serializer.validated_data
else:
validated_data = data

result = api_step.handle_post(validated_data, self, request)

if result.action == PipelineStepAction.ADVANCE:
self.state.step_index = step_index + 1
if self.step_index >= len(api_steps):
return self.api_finish_pipeline()

return result

def api_finish_pipeline(self) -> PipelineStepResult:
"""Called when all pipeline steps complete in API mode. Subclasses must override."""
raise NotImplementedError

def get_logger(self) -> logging.Logger:
return logging.getLogger(f"sentry.integration.{self.provider.key}")
9 changes: 8 additions & 1 deletion src/sentry/pipeline/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from collections.abc import Callable, Mapping, Sequence
from typing import Any

from sentry.pipeline.views.base import PipelineView
from sentry.pipeline.views.base import ApiPipelineSteps, PipelineView


class PipelineProvider[P](abc.ABC):
Expand Down Expand Up @@ -36,6 +36,13 @@ def get_pipeline_views(self) -> Sequence[PipelineView[P] | Callable[[], Pipeline
>>> return [OAuthInitView(), OAuthCallbackView()]
"""

def get_pipeline_api_steps(self) -> ApiPipelineSteps[P]:
"""
Return API step objects for this provider's pipeline, or None if API
mode is not supported. Override to enable the pipeline API.
"""
return None

def update_config(self, config: Mapping[str, Any]) -> None:
"""
Use update_config to allow additional provider configuration be assigned to
Expand Down
39 changes: 38 additions & 1 deletion src/sentry/pipeline/types.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from __future__ import annotations

from dataclasses import dataclass
from dataclasses import dataclass, field
from enum import Enum
from typing import Any

from sentry.db.models.base import Model
from sentry.organizations.services.organization import RpcOrganization
Expand All @@ -24,3 +26,38 @@ class PipelineAnalyticsEntry:

event_type: str
pipeline_type: str


class PipelineStepAction(str, Enum):
ADVANCE = "advance"
STAY = "stay"
ERROR = "error"
COMPLETE = "complete"


@dataclass
class PipelineStepResult:
action: PipelineStepAction
data: dict[str, Any] = field(default_factory=dict)

@classmethod
def advance(cls, data: dict[str, Any] | None = None) -> PipelineStepResult:
return cls(action=PipelineStepAction.ADVANCE, data=data or {})

@classmethod
def stay(cls, data: dict[str, Any] | None = None) -> PipelineStepResult:
return cls(action=PipelineStepAction.STAY, data=data or {})

@classmethod
def error(cls, message: str) -> PipelineStepResult:
return cls(action=PipelineStepAction.ERROR, data={"detail": message})

@classmethod
def complete(cls, data: dict[str, Any] | None = None) -> PipelineStepResult:
return cls(action=PipelineStepAction.COMPLETE, data=data or {})

def serialize(self) -> dict[str, Any]:
result: dict[str, Any] = {"status": self.action.value}
if self.data:
result["data"] = self.data
return result
47 changes: 46 additions & 1 deletion src/sentry/pipeline/views/base.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,66 @@
from __future__ import annotations

from collections.abc import Mapping
from collections.abc import Callable, Mapping, Sequence
from typing import Any, Protocol

from django.http.request import HttpRequest
from django.http.response import HttpResponseBase

from sentry.pipeline.types import PipelineStepResult
from sentry.utils import json
from sentry.web.client_config import get_client_config
from sentry.web.frontend.base import determine_active_organization
from sentry.web.helpers import render_to_response


class PipelineView[P](Protocol):
"""Legacy protocol for template/redirect-based pipeline steps.

Deprecated: new pipeline steps should implement ApiPipelineEndpoint instead.
Existing PipelineView implementations will be migrated incrementally as each
integration adopts API mode. Once all providers have been converted, this
protocol and the dispatch-based flow in Pipeline.current_step will be removed.
"""

def dispatch(self, request: HttpRequest, pipeline: P) -> HttpResponseBase: ...


class ApiPipelineEndpoint[P, D = Any, V = Any](Protocol):
"""Protocol for a pipeline step that supports API mode.

This replaces the legacy PipelineView dispatch() approach. Instead of
rendering templates and handling redirects server-side, each step exposes
structured data via get_step_data() and accepts validated input via
handle_post(), allowing a React frontend to drive the flow through JSON
API calls.

Each provider returns a list of these via get_pipeline_api_steps(), one
per pipeline view. The pipeline is considered API-ready when
get_pipeline_api_steps() returns any steps (see Pipeline.is_api_ready).

P: the pipeline type (e.g. IntegrationPipeline)
D: the TypedDict returned by get_step_data — typed step data for the frontend
V: the type of validated_data passed to handle_post (defaults to Any)
"""

step_name: str

def get_step_data(self, pipeline: P, request: HttpRequest) -> D: ...

def get_serializer_cls(self) -> type | None: ...

def handle_post(
self,
validated_data: V,
pipeline: P,
request: HttpRequest,
) -> PipelineStepResult: ...


type ApiPipelineStep[P] = ApiPipelineEndpoint[P] | Callable[[], ApiPipelineEndpoint[P]]
type ApiPipelineSteps[P] = Sequence[ApiPipelineStep[P]] | None


def render_react_view(
request: HttpRequest,
pipeline_name: str,
Expand Down
7 changes: 4 additions & 3 deletions src/sentry/remote_subscriptions/consumers/result_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import multiprocessing
from collections import defaultdict
from collections.abc import Mapping
from concurrent.futures import ThreadPoolExecutor, wait
from concurrent.futures import wait
from functools import partial
from typing import Generic, Literal, TypeVar

Expand All @@ -23,6 +23,7 @@
from sentry.remote_subscriptions.models import BaseRemoteSubscription
from sentry.utils import metrics
from sentry.utils.arroyo import MultiprocessingPool, run_task_with_multiprocessing
from sentry.utils.concurrent import ContextPropagatingThreadPoolExecutor
from sentry.utils.retries import TimedRetryPolicy

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -94,7 +95,7 @@ def handle_result(self, subscription: U | None, result: T):


class ResultsStrategyFactory(ProcessingStrategyFactory[KafkaPayload], Generic[T, U]):
parallel_executor: ThreadPoolExecutor | None = None
parallel_executor: ContextPropagatingThreadPoolExecutor | None = None

batched_parallel = False
"""
Expand Down Expand Up @@ -137,7 +138,7 @@ def __init__(
self.result_processor = self.result_processor_cls(use_subscription_lock=mode == "parallel")
if mode == "batched-parallel":
self.batched_parallel = True
self.parallel_executor = ThreadPoolExecutor(max_workers=max_workers)
self.parallel_executor = ContextPropagatingThreadPoolExecutor(max_workers=max_workers)
if max_workers is None:
metric_tags["workers"] = "default"
else:
Expand Down
Loading
Loading