Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions relay-dynamic-config/src/feature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,18 @@ pub enum Feature {
/// Enables OTLP spans to use the Span V2 processing pipeline in Relay.
#[serde(rename = "organizations:span-v2-otlp-processing")]
SpanV2OtlpProcessing,
/// Enable the experimental Span Attachment subset of the Span V2 processing pipeline in Relay.
#[serde(rename = "projects:span-v2-attachment-processing")]
SpanV2AttachmentProcessing,
/// Enable the experimental Trace Attachment pipeline in Relay.
#[serde(rename = "projects:trace-attachment-processing")]
TraceAttachmentProcessing,
/// Enable the new Replay pipeline in Relay.
#[serde(rename = "organizations:new-replay-processing")]
NewReplayProcessing,
/// Enable the new Error processing pipeline in Relay.
#[serde(rename = "organizations:relay-new-error-processing")]
NewErrorProcessing,
/// This feature has been deprecated and is kept for external Relays.
#[doc(hidden)]
#[serde(rename = "projects:span-metrics-extraction")]
Expand All @@ -132,15 +144,6 @@ pub enum Feature {
#[doc(hidden)]
#[serde(rename = "organizations:indexed-spans-extraction")]
DeprecatedExtractSpansFromEvent,
/// Enable the experimental Span Attachment subset of the Span V2 processing pipeline in Relay.
#[serde(rename = "projects:span-v2-attachment-processing")]
SpanV2AttachmentProcessing,
/// Enable the experimental Trace Attachment pipeline in Relay.
#[serde(rename = "projects:trace-attachment-processing")]
TraceAttachmentProcessing,
/// Enable the new Replay pipeline in Relay.
#[serde(rename = "organizations:new-replay-processing")]
NewReplayProcessing,
/// Forward compatibility.
#[doc(hidden)]
#[serde(other)]
Expand Down
3 changes: 3 additions & 0 deletions relay-server/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ use std::time::Duration;

include!(concat!(env!("OUT_DIR"), "/constants.gen.rs"));

/// Name of the custom tag in the crash user data for Sentry event payloads.
pub const SENTRY_CRASH_PAYLOAD_KEY: &str = "__sentry";

/// Name of the event attachment.
///
/// This is a special attachment that can contain a sentry event payload encoded as message pack.
Expand Down
11 changes: 10 additions & 1 deletion relay-server/src/envelope/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,15 @@ impl<M> EnvelopeHeaders<M> {
pub fn sent_at(&self) -> Option<DateTime<Utc>> {
self.sent_at
}

/// Returns the specified header value, if present.
///
/// Only consults the free-form `other` header map; headers stored in dedicated
/// struct fields (e.g. `sent_at`) are not found through this lookup.
pub fn get_header<K>(&self, name: &K) -> Option<&Value>
where
    // Standard map-lookup bounds: any key type `K` that `String` borrows as
    // (e.g. `str`) can be used without allocating an owned key.
    String: Borrow<K>,
    K: Ord + ?Sized,
{
    self.other.get(name)
}
}

#[doc(hidden)]
Expand Down Expand Up @@ -476,7 +485,7 @@ impl Envelope {
String: Borrow<K>,
K: Ord + ?Sized,
{
self.headers.other.get(name)
self.headers.get_header(name)
}

/// Sets the specified header value, returning the previous one if present.
Expand Down
16 changes: 8 additions & 8 deletions relay-server/src/managed/counted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ where
}
}

impl<T: Counted> Counted for Vec<T> {
impl<T: Counted> Counted for [T] {
fn quantities(&self) -> Quantities {
let mut quantities = BTreeMap::new();
for element in self {
Expand All @@ -223,14 +223,14 @@ impl<T: Counted> Counted for Vec<T> {
}
}

/// Counts a `Vec` by delegating to the slice implementation.
impl<T: Counted> Counted for Vec<T> {
    fn quantities(&self) -> Quantities {
        self.as_slice().quantities()
    }
}

impl<T: Counted, const N: usize> Counted for SmallVec<[T; N]> {
fn quantities(&self) -> Quantities {
let mut quantities = BTreeMap::new();
for element in self {
for (category, size) in element.quantities() {
*quantities.entry(category).or_default() += size;
}
}
quantities.into_iter().collect()
self.as_slice().quantities()
}
}
6 changes: 3 additions & 3 deletions relay-server/src/processing/common.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use crate::Envelope;
use crate::managed::{Managed, Rejected};
use crate::processing::ForwardContext;
#[cfg(feature = "processing")]
use crate::processing::StoreHandle;
use crate::processing::check_ins::CheckInsProcessor;
use crate::processing::errors::ErrorsProcessor;
use crate::processing::logs::LogsProcessor;
use crate::processing::profile_chunks::ProfileChunksProcessor;
use crate::processing::replays::ReplaysProcessor;
Expand Down Expand Up @@ -37,7 +36,7 @@ macro_rules! outputs {
#[cfg(feature = "processing")]
fn forward_store(
self,
s: StoreHandle<'_>,
s: crate::processing::StoreHandle<'_>,
ctx: ForwardContext<'_>,
) -> Result<(), Rejected<()>> {
match self {
Expand All @@ -60,6 +59,7 @@ macro_rules! outputs {

outputs!(
CheckIns => CheckInsProcessor,
Errors => ErrorsProcessor,
Logs => LogsProcessor,
ProfileChunks => ProfileChunksProcessor,
Sessions => SessionsProcessor,
Expand Down
89 changes: 89 additions & 0 deletions relay-server/src/processing/errors/dynamic_sampling.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
use chrono::Utc;
use relay_event_schema::protocol::{Contexts, TraceContext};
use relay_protocol::{Annotated, Empty as _};
use relay_sampling::config::RuleType;
use relay_sampling::evaluation::SamplingEvaluator;

use crate::managed::Managed;
use crate::processing::Context;
use crate::processing::errors::ExpandedError;
use crate::processing::errors::errors::SentryError as _;
use crate::utils::SamplingResult;

/// Applies a dynamic sampling decision onto the error.
///
/// Validates the DSC and tags the error event with the sampling decision of the
/// associated trace.
pub async fn apply(error: &mut Managed<ExpandedError>, ctx: Context<'_>) {
    // Restrict this to processing Relays: it avoids computing the decision multiple
    // times, and it is the most accurate place, since other Relays may run with
    // unsupported inbound filter or sampling configs.
    if !ctx.is_processing() {
        return;
    }

    match ctx.sampling_project_info {
        // The current project has no access to the sampling project, so a DSC (if any)
        // cannot be validated -> remove it.
        None => error.modify(|error, _| error.headers.remove_dsc()),
        Some(_) => {
            if let Some(sampled) = is_trace_fully_sampled(error, ctx).await {
                error.modify(|error, _| tag_error_with_sampling_decision(error, sampled));
            }
        }
    }
}

/// Writes the trace-level `sampled` decision into the event's trace context.
///
/// An already-set `sampled` value is never overwritten, so decisions made by
/// downstream Relays are preserved.
fn tag_error_with_sampling_decision(error: &mut ExpandedError, sampled: bool) {
    if let Some(event) = error.error.event_mut().value_mut() {
        // The flag lives on the trace context; create the contexts map and the
        // trace context as needed.
        let trace_context = event
            .contexts
            .get_or_insert_with(Contexts::new)
            .get_or_default::<TraceContext>();

        // Only fill in `sampled` when it is unset. Overriding a value set by a
        // downstream Relay would lead to more complex debugging in case of problems.
        if trace_context.sampled.is_empty() {
            relay_log::trace!("tagged error with `sampled = {}` flag", sampled);
            trace_context.sampled = Annotated::new(sampled);
        }
    }
}

/// Evaluates whether the trace referenced by the error's DSC was fully sampled.
///
/// Returns `None` when no decision can be made: there is no DSC, no usable sampling
/// config for the root project, the config contains unsupported rules, or the client
/// did not report a head-of-trace sampling decision.
async fn is_trace_fully_sampled(error: &ExpandedError, ctx: Context<'_>) -> Option<bool> {
    let dsc = error.headers.dsc()?;

    let config = ctx
        .sampling_project_info
        .and_then(|info| info.config.sampling.as_ref())
        .and_then(|sampling| sampling.as_ref().ok())?;

    if config.unsupported() {
        // A processing Relay is expected to understand every rule; anything else is a bug.
        if ctx.is_processing() {
            relay_log::error!("found unsupported rules even as processing relay");
        }
        return None;
    }

    // If the `sampled` field is unset, prefer not to tag the error: there is no clue
    // whether the head of the trace was kept or dropped on the client side. A head
    // dropped on the client immediately means the trace is not fully sampled.
    match dsc.sampled? {
        false => Some(false),
        true => {
            let rules = config.filter_rules(RuleType::Trace);
            let evaluation = SamplingEvaluator::new(Utc::now())
                .match_rules(*dsc.trace_id, dsc, rules)
                .await;
            Some(SamplingResult::from(evaluation).decision().is_keep())
        }
    }
}
75 changes: 75 additions & 0 deletions relay-server/src/processing/errors/errors/apple_crash_report.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
use relay_event_schema::protocol::Event;
use relay_protocol::Annotated;

use crate::envelope::{AttachmentType, ItemType, Items};
use crate::managed::{Counted, Quantities};
use crate::processing::errors::Result;
use crate::processing::errors::errors::{
Context, ErrorRef, ErrorRefMut, ParsedError, SentryError, utils,
};

/// An error expanded from an Apple crash report attachment.
#[derive(Debug)]
pub struct AppleCrashReport {
    // Event payload; populated from the crash report (merged with an explicit
    // event item, when one was present in the envelope).
    pub event: Annotated<Event>,
    // All attachment items, including the crash report item itself.
    pub attachments: Items,
    // User report items submitted alongside the crash.
    pub user_reports: Items,
}

impl SentryError for AppleCrashReport {
    /// Expands an Apple crash report from the envelope items, if one is present.
    fn try_expand(items: &mut Items, _ctx: Context<'_>) -> Result<Option<ParsedError<Self>>> {
        let report = match utils::take_item_by(items, |item| {
            item.attachment_type() == Some(&AttachmentType::AppleCrashReport)
        }) {
            Some(report) => report,
            None => return Ok(None),
        };

        // Start from an explicit event payload when there is one; otherwise the
        // event is synthesized entirely from the crash report below.
        let mut event = if let Some(item) = utils::take_item_of_type(items, ItemType::Event) {
            utils::event_from_json_payload(item, None)?
        } else {
            Annotated::empty()
        };

        // TODO: write metrics
        crate::utils::process_apple_crash_report(
            event.get_or_insert_with(Event::default),
            &report.payload(),
        );

        // The crash report stays in the envelope as a regular attachment.
        let mut attachments: Items = items
            .drain_filter(|item| *item.ty() == ItemType::Attachment)
            .collect();
        attachments.push(report);

        let user_reports = utils::take_items_of_type(items, ItemType::UserReport);

        Ok(Some(ParsedError {
            error: Self {
                event,
                attachments,
                user_reports,
            },
            fully_normalized: false,
        }))
    }

    fn as_ref(&self) -> ErrorRef<'_> {
        ErrorRef {
            event: &self.event,
            attachments: &self.attachments,
            user_reports: &self.user_reports,
        }
    }

    fn as_ref_mut(&mut self) -> ErrorRefMut<'_> {
        ErrorRefMut {
            event: &mut self.event,
            attachments: &mut self.attachments,
            user_reports: Some(&mut self.user_reports),
        }
    }
}

impl Counted for AppleCrashReport {
    fn quantities(&self) -> Quantities {
        // Delegate to the shared counting logic on `ErrorRef`.
        self.as_ref().to_quantities()
    }
}
74 changes: 74 additions & 0 deletions relay-server/src/processing/errors/errors/attachments.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
use relay_event_schema::protocol::Event;
use relay_protocol::Annotated;

use crate::envelope::{AttachmentType, ItemType, Items};
use crate::managed::{Counted, Quantities};
use crate::processing::errors::Result;
use crate::processing::errors::errors::{
Context, ErrorRef, ErrorRefMut, ParsedError, SentryError, utils,
};

/// An error expanded from event-payload and/or breadcrumb attachments.
#[derive(Debug)]
pub struct Attachments {
    // Event assembled from the event-payload / breadcrumb attachments.
    pub event: Annotated<Event>,
    // Remaining attachment items from the envelope.
    pub attachments: Items,
    // User report items submitted alongside the error.
    pub user_reports: Items,
}

impl SentryError for Attachments {
    /// Expands an error from event-payload / breadcrumb attachments, if any are present.
    fn try_expand(items: &mut Items, ctx: Context<'_>) -> Result<Option<ParsedError<Self>>> {
        let event_item = utils::take_item_by(items, |item| {
            item.attachment_type() == Some(&AttachmentType::EventPayload)
        });
        let breadcrumbs1 = utils::take_item_by(items, |item| {
            item.attachment_type() == Some(&AttachmentType::Breadcrumbs)
        });
        let breadcrumbs2 = utils::take_item_by(items, |item| {
            item.attachment_type() == Some(&AttachmentType::Breadcrumbs)
        });

        // Only bail out when none of the relevant attachments exist. The previous
        // condition `a && b || c` parsed as `(a && b) || c`, which rejected every
        // envelope without a second breadcrumbs attachment — and did so after the
        // items had already been taken out of `items`, silently dropping them.
        if event_item.is_none() && breadcrumbs1.is_none() && breadcrumbs2.is_none() {
            return Ok(None);
        }

        let (event, _) = crate::services::processor::event::event_from_attachments(
            ctx.processing.config,
            event_item,
            breadcrumbs1,
            breadcrumbs2,
        )?;

        let error = Self {
            event,
            attachments: utils::take_items_of_type(items, ItemType::Attachment),
            user_reports: utils::take_items_of_type(items, ItemType::UserReport),
        };

        Ok(Some(ParsedError {
            error,
            fully_normalized: false,
        }))
    }

    fn as_ref(&self) -> ErrorRef<'_> {
        ErrorRef {
            event: &self.event,
            attachments: &self.attachments,
            user_reports: &self.user_reports,
        }
    }

    fn as_ref_mut(&mut self) -> ErrorRefMut<'_> {
        ErrorRefMut {
            event: &mut self.event,
            attachments: &mut self.attachments,
            user_reports: Some(&mut self.user_reports),
        }
    }
}

impl Counted for Attachments {
    fn quantities(&self) -> Quantities {
        // Delegate to the shared counting logic on `ErrorRef`.
        self.as_ref().to_quantities()
    }
}
Loading