diff --git a/relay-dynamic-config/src/feature.rs b/relay-dynamic-config/src/feature.rs
index 0f4ac8ae73..9c650263c7 100644
--- a/relay-dynamic-config/src/feature.rs
+++ b/relay-dynamic-config/src/feature.rs
@@ -120,6 +120,18 @@ pub enum Feature {
     /// Enables OTLP spans to use the Span V2 processing pipeline in Relay.
     #[serde(rename = "organizations:span-v2-otlp-processing")]
     SpanV2OtlpProcessing,
+    /// Enable the experimental Span Attachment subset of the Span V2 processing pipeline in Relay.
+    #[serde(rename = "projects:span-v2-attachment-processing")]
+    SpanV2AttachmentProcessing,
+    /// Enable the experimental Trace Attachment pipeline in Relay.
+    #[serde(rename = "projects:trace-attachment-processing")]
+    TraceAttachmentProcessing,
+    /// Enable the new Replay pipeline in Relay.
+    #[serde(rename = "organizations:new-replay-processing")]
+    NewReplayProcessing,
+    /// Enable the new Error processing pipeline in Relay.
+    #[serde(rename = "organizations:relay-new-error-processing")]
+    NewErrorProcessing,
     /// This feature has been deprecated and is kept for external Relays.
     #[doc(hidden)]
     #[serde(rename = "projects:span-metrics-extraction")]
@@ -132,15 +144,6 @@ pub enum Feature {
     #[doc(hidden)]
     #[serde(rename = "organizations:indexed-spans-extraction")]
     DeprecatedExtractSpansFromEvent,
-    /// Enable the experimental Span Attachment subset of the Span V2 processing pipeline in Relay.
-    #[serde(rename = "projects:span-v2-attachment-processing")]
-    SpanV2AttachmentProcessing,
-    /// Enable the experimental Trace Attachment pipeline in Relay.
-    #[serde(rename = "projects:trace-attachment-processing")]
-    TraceAttachmentProcessing,
-    /// Enable the new Replay pipeline in Relay.
-    #[serde(rename = "organizations:new-replay-processing")]
-    NewReplayProcessing,
     /// Forward compatibility.
     #[doc(hidden)]
     #[serde(other)]
diff --git a/relay-server/src/constants.rs b/relay-server/src/constants.rs
index 8ec401609c..d359ad6749 100644
--- a/relay-server/src/constants.rs
+++ b/relay-server/src/constants.rs
@@ -3,6 +3,9 @@ use std::time::Duration;
 
 include!(concat!(env!("OUT_DIR"), "/constants.gen.rs"));
 
+/// Name of the custom tag in the crash user data for Sentry event payloads.
+pub const SENTRY_CRASH_PAYLOAD_KEY: &str = "__sentry";
+
 /// Name of the event attachment.
 ///
 /// This is a special attachment that can contain a sentry event payload encoded as message pack.
diff --git a/relay-server/src/envelope/mod.rs b/relay-server/src/envelope/mod.rs
index 4ae4fec494..7a1ba33894 100644
--- a/relay-server/src/envelope/mod.rs
+++ b/relay-server/src/envelope/mod.rs
@@ -210,6 +210,15 @@ impl EnvelopeHeaders {
     pub fn sent_at(&self) -> Option<DateTime<Utc>> {
         self.sent_at
     }
+
+    /// Returns the specified header value, if present.
+    pub fn get_header<K>(&self, name: &K) -> Option<&Value>
+    where
+        String: Borrow<K>,
+        K: Ord + ?Sized,
+    {
+        self.other.get(name)
+    }
 }
 
 #[doc(hidden)]
@@ -476,7 +485,7 @@ impl Envelope {
         String: Borrow<K>,
         K: Ord + ?Sized,
     {
-        self.headers.other.get(name)
+        self.headers.get_header(name)
    }
 
     /// Sets the specified header value, returning the previous one if present.
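The new `EnvelopeHeaders::get_header` uses the standard-library `Borrow` pattern for map lookups: because `String: Borrow<str>`, a map keyed by `String` can be queried with a plain `&str` and no key allocation. A minimal, self-contained sketch of the same pattern (the `Headers` type and its `String` values are stand-ins, not Relay's actual definitions):

```rust
use std::borrow::Borrow;
use std::collections::BTreeMap;

/// Stand-in for the `other` headers map.
struct Headers {
    other: BTreeMap<String, String>,
}

impl Headers {
    /// Generic over the borrowed key form, like `get_header` above.
    fn get_header<K>(&self, name: &K) -> Option<&String>
    where
        String: Borrow<K>,
        K: Ord + ?Sized,
    {
        self.other.get(name)
    }
}

fn main() {
    let mut other = BTreeMap::new();
    other.insert("unreal_user".to_owned(), "jane".to_owned());
    let headers = Headers { other };
    // Lookup with `&str`; no `String` needs to be allocated for the key.
    assert_eq!(headers.get_header("unreal_user"), Some(&"jane".to_owned()));
}
```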
diff --git a/relay-server/src/managed/counted.rs b/relay-server/src/managed/counted.rs
index dbe85f0bbd..61b983d388 100644
--- a/relay-server/src/managed/counted.rs
+++ b/relay-server/src/managed/counted.rs
@@ -211,7 +211,7 @@ where
     }
 }
 
-impl<T: Counted> Counted for Vec<T> {
+impl<T: Counted> Counted for [T] {
     fn quantities(&self) -> Quantities {
         let mut quantities = BTreeMap::new();
         for element in self {
             for (category, size) in element.quantities() {
                 *quantities.entry(category).or_default() += size;
             }
         }
         quantities.into_iter().collect()
     }
 }
 
+impl<T: Counted> Counted for Vec<T> {
+    fn quantities(&self) -> Quantities {
+        self.as_slice().quantities()
+    }
+}
+
 impl<T: Counted, const N: usize> Counted for SmallVec<[T; N]> {
     fn quantities(&self) -> Quantities {
-        let mut quantities = BTreeMap::new();
-        for element in self {
-            for (category, size) in element.quantities() {
-                *quantities.entry(category).or_default() += size;
-            }
-        }
-        quantities.into_iter().collect()
+        self.as_slice().quantities()
     }
 }
diff --git a/relay-server/src/processing/common.rs b/relay-server/src/processing/common.rs
index ecf8682c97..d127c1e5a4 100644
--- a/relay-server/src/processing/common.rs
+++ b/relay-server/src/processing/common.rs
@@ -1,9 +1,8 @@
 use crate::Envelope;
 use crate::managed::{Managed, Rejected};
 use crate::processing::ForwardContext;
-#[cfg(feature = "processing")]
-use crate::processing::StoreHandle;
 use crate::processing::check_ins::CheckInsProcessor;
+use crate::processing::errors::ErrorsProcessor;
 use crate::processing::logs::LogsProcessor;
 use crate::processing::profile_chunks::ProfileChunksProcessor;
 use crate::processing::replays::ReplaysProcessor;
@@ -37,7 +36,7 @@ macro_rules! outputs {
         #[cfg(feature = "processing")]
         fn forward_store(
             self,
-            s: StoreHandle<'_>,
+            s: crate::processing::StoreHandle<'_>,
             ctx: ForwardContext<'_>,
         ) -> Result<(), Rejected<()>> {
             match self {
@@ -60,6 +59,7 @@ macro_rules! outputs {
 
 outputs!(
     CheckIns => CheckInsProcessor,
+    Errors => ErrorsProcessor,
     Logs => LogsProcessor,
     ProfileChunks => ProfileChunksProcessor,
     Sessions => SessionsProcessor,
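The counted.rs change above deduplicates the container implementations by moving the summing logic onto the unsized slice and delegating from the owned containers. The same pattern in a self-contained sketch (a toy `Counted` trait, not the real one):

```rust
trait Counted {
    fn count(&self) -> usize;
}

struct Attachment; // hypothetical element type

impl Counted for Attachment {
    fn count(&self) -> usize {
        1
    }
}

// Implement the trait once for the slice...
impl<T: Counted> Counted for [T] {
    fn count(&self) -> usize {
        self.iter().map(Counted::count).sum()
    }
}

// ...then every owned container becomes a one-line delegation.
impl<T: Counted> Counted for Vec<T> {
    fn count(&self) -> usize {
        self.as_slice().count() // single source of truth
    }
}

fn main() {
    let v = vec![Attachment, Attachment];
    assert_eq!(v.count(), 2);
}
```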
diff --git a/relay-server/src/processing/errors/dynamic_sampling.rs b/relay-server/src/processing/errors/dynamic_sampling.rs
new file mode 100644
index 0000000000..b07ef3a641
--- /dev/null
+++ b/relay-server/src/processing/errors/dynamic_sampling.rs
@@ -0,0 +1,89 @@
+use chrono::Utc;
+use relay_event_schema::protocol::{Contexts, TraceContext};
+use relay_protocol::{Annotated, Empty as _};
+use relay_sampling::config::RuleType;
+use relay_sampling::evaluation::SamplingEvaluator;
+
+use crate::managed::Managed;
+use crate::processing::Context;
+use crate::processing::errors::ExpandedError;
+use crate::processing::errors::errors::SentryError as _;
+use crate::utils::SamplingResult;
+
+/// Applies a dynamic sampling decision onto the error.
+///
+/// The function validates the DSC and tags the error event with the sampling decision
+/// of the associated trace.
+pub async fn apply(error: &mut Managed<ExpandedError>, ctx: Context<'_>) {
+    // Only run in processing Relays, so the decision is not computed multiple times. It is also
+    // the most accurate place to run, as other Relays may have unsupported inbound filter or
+    // sampling configs.
+    if !ctx.is_processing() {
+        return;
+    }
+
+    if ctx.sampling_project_info.is_none() {
+        // If there is a DSC, the current project does not have access to the sampling project
+        // -> remove the DSC.
+        error.modify(|error, _| error.headers.remove_dsc());
+        return;
+    }
+
+    if let Some(sampled) = is_trace_fully_sampled(error, ctx).await {
+        error.modify(|error, _| tag_error_with_sampling_decision(error, sampled));
+    };
+}
+
+fn tag_error_with_sampling_decision(error: &mut ExpandedError, sampled: bool) {
+    let Some(event) = error.error.event_mut().value_mut() else {
+        return;
+    };
+
+    // We want to get the trace context, in which we will inject the `sampled` field.
+    let context = event
+        .contexts
+        .get_or_insert_with(Contexts::new)
+        .get_or_default::<TraceContext>();
+
+    // We only update `sampled` if it was not set before; otherwise we would override the value
+    // set by downstream Relays, which would lead to more complex debugging in case of problems.
+    if context.sampled.is_empty() {
+        relay_log::trace!("tagged error with `sampled = {}` flag", sampled);
+        context.sampled = Annotated::new(sampled);
+    }
+}
+
+/// Runs dynamic sampling if the DSC and root project state are present, and returns whether
+/// transactions received with this DSC and project state would be kept or dropped by dynamic
+/// sampling.
+async fn is_trace_fully_sampled(error: &ExpandedError, ctx: Context<'_>) -> Option<bool> {
+    let dsc = error.headers.dsc()?;
+
+    let sampling_config = ctx
+        .sampling_project_info
+        .and_then(|s| s.config.sampling.as_ref())
+        .and_then(|s| s.as_ref().ok())?;
+
+    if sampling_config.unsupported() {
+        if ctx.is_processing() {
+            relay_log::error!("found unsupported rules even as processing relay");
+        }
+
+        return None;
+    }
+
+    // If the sampled field is not set, we prefer not to tag the error, since we have no clue
+    // whether the head of the trace was kept or dropped on the client side.
+    // In addition, if the head of the trace was dropped on the client, we immediately mark the
+    // trace as not fully sampled.
+    if !dsc.sampled? {
+        return Some(false);
+    }
+
+    let evaluator = SamplingEvaluator::new(Utc::now());
+
+    let rules = sampling_config.filter_rules(RuleType::Trace);
+
+    let evaluation = evaluator.match_rules(*dsc.trace_id, dsc, rules).await;
+    Some(SamplingResult::from(evaluation).decision().is_keep())
+}
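The decision in `is_trace_fully_sampled` is three-valued: no tag at all, tagged `false`, or the outcome of the trace rules. Condensed into a sketch with plain `Option<bool>` inputs (a hypothetical simplification of the real DSC and rule evaluation):

```rust
/// `dsc_sampled` mirrors `dsc.sampled`; `rules_keep` stands in for the
/// `SamplingEvaluator` verdict on the trace rules.
fn trace_fully_sampled(dsc_sampled: Option<bool>, rules_keep: bool) -> Option<bool> {
    // No client-side decision recorded: don't tag the error at all.
    let head_sampled = dsc_sampled?;
    if !head_sampled {
        // The head of the trace was dropped on the client: the trace can
        // never be fully sampled, regardless of the server-side rules.
        return Some(false);
    }
    // The head was kept: the server-side trace rules decide.
    Some(rules_keep)
}

fn main() {
    assert_eq!(trace_fully_sampled(None, true), None);
    assert_eq!(trace_fully_sampled(Some(false), true), Some(false));
    assert_eq!(trace_fully_sampled(Some(true), false), Some(false));
    assert_eq!(trace_fully_sampled(Some(true), true), Some(true));
}
```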
diff --git a/relay-server/src/processing/errors/errors/apple_crash_report.rs b/relay-server/src/processing/errors/errors/apple_crash_report.rs
new file mode 100644
index 0000000000..18753625e0
--- /dev/null
+++ b/relay-server/src/processing/errors/errors/apple_crash_report.rs
@@ -0,0 +1,75 @@
+use relay_event_schema::protocol::Event;
+use relay_protocol::Annotated;
+
+use crate::envelope::{AttachmentType, ItemType, Items};
+use crate::managed::{Counted, Quantities};
+use crate::processing::errors::Result;
+use crate::processing::errors::errors::{
+    Context, ErrorRef, ErrorRefMut, ParsedError, SentryError, utils,
+};
+
+#[derive(Debug)]
+pub struct AppleCrashReport {
+    pub event: Annotated<Event>,
+    pub attachments: Items,
+    pub user_reports: Items,
+}
+
+impl SentryError for AppleCrashReport {
+    fn try_expand(items: &mut Items, _ctx: Context<'_>) -> Result<Option<ParsedError<Self>>> {
+        let Some(apple_crash_report) = utils::take_item_by(items, |item| {
+            item.attachment_type() == Some(&AttachmentType::AppleCrashReport)
+        }) else {
+            return Ok(None);
+        };
+
+        let mut event = match utils::take_item_of_type(items, ItemType::Event) {
+            Some(event) => utils::event_from_json_payload(event, None)?,
+            None => Annotated::empty(),
+        };
+
+        // TODO: write metrics
+        crate::utils::process_apple_crash_report(
+            event.get_or_insert_with(Event::default),
+            &apple_crash_report.payload(),
+        );
+
+        let mut attachments = items
+            .drain_filter(|item| *item.ty() == ItemType::Attachment)
+            .collect::<Items>();
+        attachments.push(apple_crash_report);
+
+        let error = Self {
+            event,
+            attachments,
+            user_reports: utils::take_items_of_type(items, ItemType::UserReport),
+        };
+
+        Ok(Some(ParsedError {
+            error,
+            fully_normalized: false,
+        }))
+    }
+
+    fn as_ref(&self) -> ErrorRef<'_> {
+        ErrorRef {
+            event: &self.event,
+            attachments: &self.attachments,
+            user_reports: &self.user_reports,
+        }
+    }
+
+    fn as_ref_mut(&mut self) -> ErrorRefMut<'_> {
+        ErrorRefMut {
+            event: &mut self.event,
+            attachments: &mut self.attachments,
+            user_reports: Some(&mut self.user_reports),
+        }
+    }
+}
+
+impl Counted for AppleCrashReport {
+    fn quantities(&self) -> Quantities {
+        self.as_ref().to_quantities()
+    }
+}
diff --git a/relay-server/src/processing/errors/errors/attachments.rs b/relay-server/src/processing/errors/errors/attachments.rs
new file mode 100644
index 0000000000..4ee0d8eeeb
--- /dev/null
+++ b/relay-server/src/processing/errors/errors/attachments.rs
@@ -0,0 +1,74 @@
+use relay_event_schema::protocol::Event;
+use relay_protocol::Annotated;
+
+use crate::envelope::{AttachmentType, ItemType, Items};
+use crate::managed::{Counted, Quantities};
+use crate::processing::errors::Result;
+use crate::processing::errors::errors::{
+    Context, ErrorRef, ErrorRefMut, ParsedError, SentryError, utils,
+};
+
+#[derive(Debug)]
+pub struct Attachments {
+    pub event: Annotated<Event>,
+    pub attachments: Items,
+    pub user_reports: Items,
+}
+
+impl SentryError for Attachments {
+    fn try_expand(items: &mut Items, ctx: Context<'_>) -> Result<Option<ParsedError<Self>>> {
+        let ev = utils::take_item_by(items, |item| {
+            item.attachment_type() == Some(&AttachmentType::EventPayload)
+        });
+        let b1 = utils::take_item_by(items, |item| {
+            item.attachment_type() == Some(&AttachmentType::Breadcrumbs)
+        });
+        let b2 = utils::take_item_by(items, |item| {
+            item.attachment_type() == Some(&AttachmentType::Breadcrumbs)
+        });
+
+        if ev.is_none() && b1.is_none() && b2.is_none() {
+            return Ok(None);
+        }
+
+        let (event, _) = crate::services::processor::event::event_from_attachments(
+            ctx.processing.config,
+            ev,
+            b1,
+            b2,
+        )?;
+
+        let error = Self {
+            event,
+            attachments: utils::take_items_of_type(items, ItemType::Attachment),
+            user_reports: utils::take_items_of_type(items, ItemType::UserReport),
+        };
+
+        Ok(Some(ParsedError {
+            error,
+            fully_normalized: false,
+        }))
+    }
+
+    fn as_ref(&self) -> ErrorRef<'_> {
+        ErrorRef {
+            event: &self.event,
+            attachments: &self.attachments,
+            user_reports: &self.user_reports,
+        }
+    }
+
+    fn as_ref_mut(&mut self) -> ErrorRefMut<'_> {
+        ErrorRefMut {
+            event: &mut self.event,
+            attachments: &mut self.attachments,
+            user_reports: Some(&mut self.user_reports),
+        }
+    }
+}
+
+impl Counted for Attachments {
+    fn quantities(&self) -> Quantities {
+        self.as_ref().to_quantities()
+    }
+}
diff --git a/relay-server/src/processing/errors/errors/form_data.rs b/relay-server/src/processing/errors/errors/form_data.rs
new file mode 100644
index 0000000000..49e9bebe92
--- /dev/null
+++ b/relay-server/src/processing/errors/errors/form_data.rs
@@ -0,0 +1,64 @@
+use relay_event_schema::protocol::Event;
+use relay_protocol::Annotated;
+
+use crate::envelope::{ItemType, Items};
+use crate::managed::{Counted, Quantities};
+use crate::processing::errors::Result;
+use crate::processing::errors::errors::{
+    Context, ErrorRef, ErrorRefMut, ParsedError, SentryError, utils,
+};
+use crate::services::processor::ProcessingError;
+
+#[derive(Debug)]
+pub struct FormData {
+    pub event: Annotated<Event>,
+    pub attachments: Items,
+    pub user_reports: Items,
+}
+
+impl SentryError for FormData {
+    fn try_expand(items: &mut Items, _ctx: Context<'_>) -> Result<Option<ParsedError<Self>>> {
+        let Some(form_data) = utils::take_item_of_type(items, ItemType::FormData) else {
+            return Ok(None);
+        };
+
+        let event = {
+            let mut value = serde_json::Value::Object(Default::default());
+            crate::services::processor::event::merge_formdata(&mut value, form_data);
+            Annotated::deserialize_with_meta(value).map_err(ProcessingError::InvalidJson)
+        }?;
+
+        let error = Self {
+            event,
+            attachments: utils::take_items_of_type(items, ItemType::Attachment),
+            user_reports: utils::take_items_of_type(items, ItemType::UserReport),
+        };
+
+        Ok(Some(ParsedError {
+            error,
+            fully_normalized: false,
+        }))
+    }
+
+    fn as_ref(&self) -> ErrorRef<'_> {
+        ErrorRef {
+            event: &self.event,
+            attachments: &self.attachments,
+            user_reports: &self.user_reports,
+        }
+    }
+
+    fn as_ref_mut(&mut self) -> ErrorRefMut<'_> {
+        ErrorRefMut {
+            event: &mut self.event,
+            attachments: &mut self.attachments,
+            user_reports: Some(&mut self.user_reports),
+        }
+    }
+}
+
+impl Counted for FormData {
+    fn quantities(&self) -> Quantities {
+        self.as_ref().to_quantities()
+    }
+}
diff --git a/relay-server/src/processing/errors/errors/generic.rs b/relay-server/src/processing/errors/errors/generic.rs
new file mode 100644
index 0000000000..e9a5cbc6bd
--- /dev/null
+++ b/relay-server/src/processing/errors/errors/generic.rs
@@ -0,0 +1,58 @@
+use relay_event_schema::protocol::Event;
+use relay_protocol::Annotated;
+
+use crate::envelope::{ItemType, Items};
+use crate::managed::{Counted, Quantities};
+use crate::processing::errors::Result;
+use crate::processing::errors::errors::{
+    Context, ErrorRef, ErrorRefMut, ParsedError, SentryError, utils,
+};
+
+#[derive(Debug)]
+pub struct Generic {
+    pub event: Annotated<Event>,
+    pub attachments: Items,
+    pub user_reports: Items,
+}
+
+impl SentryError for Generic {
+    fn try_expand(items: &mut Items, _ctx: Context<'_>) -> Result<Option<ParsedError<Self>>> {
+        let Some(ev) = utils::take_item_of_type(items, ItemType::Event) else {
+            return Ok(None);
+        };
+
+        let fully_normalized = ev.fully_normalized();
+        let error = Self {
+            event: utils::event_from_json_payload(ev, None)?,
+            attachments: utils::take_items_of_type(items, ItemType::Attachment),
+            user_reports: utils::take_items_of_type(items, ItemType::UserReport),
+        };
+
+        Ok(Some(ParsedError {
+            error,
+            fully_normalized,
+        }))
+    }
+
+    fn as_ref(&self) -> ErrorRef<'_> {
+        ErrorRef {
+            event: &self.event,
+            attachments: &self.attachments,
+            user_reports: &self.user_reports,
+        }
+    }
+
+    fn as_ref_mut(&mut self) -> ErrorRefMut<'_> {
+        ErrorRefMut {
+            event: &mut self.event,
+            attachments: &mut self.attachments,
+            user_reports: Some(&mut self.user_reports),
+        }
+    }
+}
+
+impl Counted for Generic {
+    fn quantities(&self) -> Quantities {
+        self.as_ref().to_quantities()
+    }
+}
diff --git a/relay-server/src/processing/errors/errors/minidump.rs b/relay-server/src/processing/errors/errors/minidump.rs
new file mode 100644
index 0000000000..c5bacc0bb2
--- /dev/null
+++ b/relay-server/src/processing/errors/errors/minidump.rs
@@ -0,0 +1,75 @@
+use relay_event_schema::protocol::Event;
+use relay_protocol::Annotated;
+
+use crate::envelope::{AttachmentType, ItemType, Items};
+use crate::managed::{Counted, Quantities};
+use crate::processing::errors::Result;
+use crate::processing::errors::errors::{
+    Context, ErrorRef, ErrorRefMut, ParsedError, SentryError, utils,
+};
+
+#[derive(Debug)]
+pub struct Minidump {
+    pub event: Annotated<Event>,
+    pub attachments: Items,
+    pub user_reports: Items,
+}
+
+impl SentryError for Minidump {
+    fn try_expand(items: &mut Items, _ctx: Context<'_>) -> Result<Option<ParsedError<Self>>> {
+        let Some(minidump) = utils::take_item_by(items, |item| {
+            item.attachment_type() == Some(&AttachmentType::Minidump)
+        }) else {
+            return Ok(None);
+        };
+
+        let mut event = match utils::take_item_of_type(items, ItemType::Event) {
+            Some(event) => utils::event_from_json_payload(event, None)?,
+            None => Annotated::empty(),
+        };
+
+        // TODO: write metrics
+        crate::utils::process_minidump(
+            event.get_or_insert_with(Event::default),
+            &minidump.payload(),
+        );
+
+        let mut attachments = items
+            .drain_filter(|item| *item.ty() == ItemType::Attachment)
+            .collect::<Items>();
+        attachments.push(minidump);
+
+        let error = Self {
+            event,
+            attachments,
+            user_reports: utils::take_items_of_type(items, ItemType::UserReport),
+        };
+
+        Ok(Some(ParsedError {
+            error,
+            fully_normalized: false,
+        }))
+    }
+
+    fn as_ref(&self) -> ErrorRef<'_> {
+        ErrorRef {
+            event: &self.event,
+            attachments: &self.attachments,
+            user_reports: &self.user_reports,
+        }
+    }
+
+    fn as_ref_mut(&mut self) -> ErrorRefMut<'_> {
+        ErrorRefMut {
+            event: &mut self.event,
+            attachments: &mut self.attachments,
+            user_reports: Some(&mut self.user_reports),
+        }
+    }
+}
+
+impl Counted for Minidump {
+    fn quantities(&self) -> Quantities {
+        self.as_ref().to_quantities()
+    }
+}
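Both the minidump and the Apple crash report expanders synthesize an event when the envelope carries none, via `get_or_insert_with(Event::default)` on the annotated event. The same idea with `Option` as a stand-in for `Annotated` (toy types, not Relay's):

```rust
#[derive(Default, Debug)]
struct Event {
    platform: Option<String>,
}

fn process_minidump(event: &mut Option<Event>) {
    // Mirrors `event.get_or_insert_with(Event::default)` above: create the
    // event in place if the envelope had no explicit event item, then let
    // the crash-report processor fill it.
    let event = event.get_or_insert_with(Event::default);
    event.platform = Some("native".to_owned());
}

fn main() {
    let mut without_event = None;
    process_minidump(&mut without_event);
    assert!(without_event.is_some());
}
```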
diff --git a/relay-server/src/processing/errors/errors/mod.rs b/relay-server/src/processing/errors/errors/mod.rs
new file mode 100644
index 0000000000..e580887378
--- /dev/null
+++ b/relay-server/src/processing/errors/errors/mod.rs
@@ -0,0 +1,274 @@
+use relay_event_schema::protocol::Event;
+use relay_protocol::{Annotated, Empty};
+use relay_quotas::DataCategory;
+
+use crate::envelope::{ContentType, EnvelopeHeaders, Item, ItemType, Items};
+use crate::managed::{Counted, Quantities};
+use crate::processing::errors::Result;
+use crate::processing::{self, ForwardContext};
+use crate::services::processor::ProcessingError;
+use crate::statsd::RelayTimers;
+
+mod apple_crash_report;
+mod attachments;
+mod form_data;
+mod generic;
+mod minidump;
+mod nnswitch;
+mod playstation;
+mod raw_security;
+mod security;
+mod unreal;
+mod user_report_v2;
+mod utils;
+
+pub use self::apple_crash_report::*;
+pub use self::attachments::*;
+pub use self::form_data::*;
+pub use self::generic::*;
+pub use self::minidump::*;
+pub use self::nnswitch::*;
+pub use self::playstation::*;
+pub use self::raw_security::*;
+pub use self::security::*;
+pub use self::unreal::*;
+pub use self::user_report_v2::*;
+
+#[derive(Debug, Copy, Clone)]
+pub struct ErrorRef<'a> {
+    pub event: &'a Annotated<Event>,
+    pub attachments: &'a [Item],
+    pub user_reports: &'a [Item],
+}
+
+impl ErrorRef<'_> {
+    fn to_quantities(self) -> Quantities {
+        let mut quantities = self.attachments.quantities();
+        quantities.extend(self.user_reports.quantities());
+        quantities.push((DataCategory::Error, 1));
+        quantities
+    }
+}
+
+#[derive(Debug)]
+pub struct ErrorRefMut<'a> {
+    pub event: &'a mut Annotated<Event>,
+    pub attachments: &'a mut Items,
+    pub user_reports: Option<&'a mut Items>,
+}
+
+#[derive(Debug)]
+pub struct ErrorItems {
+    pub event: Option<Item>,
+    pub attachments: Items,
+    pub user_reports: Items,
+    pub other: Items,
+}
+
+impl From<ErrorItems> for Items {
+    fn from(value: ErrorItems) -> Self {
+        let ErrorItems {
+            event,
+            attachments,
+            user_reports,
+            other,
+        } = value;
+
+        let mut items = attachments;
+        items.reserve_exact(event.is_some() as usize + user_reports.len() + other.len());
+        items.extend(user_reports);
+        items.extend(other);
+        if let Some(event) = event {
+            items.push(event);
+        }
+
+        items
+    }
+}
+
+#[derive(Debug, Clone, Copy)]
+pub struct Context<'a> {
+    pub envelope: &'a EnvelopeHeaders,
+    pub processing: processing::Context<'a>,
+}
+
+#[cfg(test)]
+impl Context<'static> {
+    /// Returns a [`Context`] with default values for testing.
+    pub fn for_test() -> Self {
+        use std::sync::LazyLock;
+
+        static ENVELOPE: LazyLock<Box<crate::Envelope>> =
+            LazyLock::new(|| crate::testutils::new_envelope(false, ""));
+
+        Self {
+            envelope: ENVELOPE.headers(),
+            processing: processing::Context::for_test(),
+        }
+    }
+}
+
+/// A shape of error Sentry supports.
+pub trait SentryError: Counted {
+    /// Attempts to parse this error from the passed `items`.
+    ///
+    /// If parsing modifies the passed `items`, it must either return an error, indicating the
+    /// passed items are invalid, or it must return a fully constructed [`Self`].
+    ///
+    /// The parser may return `Ok(None)` when none of the passed items match this shape of error.
+    fn try_expand(items: &mut Items, ctx: Context<'_>) -> Result<Option<ParsedError<Self>>>
+    where
+        Self: Sized;
+
+    /// Post expansion processing phase for the error.
+    ///
+    /// Most error events do not need a specific post processing phase and should prefer doing
+    /// processing and validation during [expansion](Self::try_expand).
+    fn process(&mut self, ctx: Context<'_>) -> Result<()> {
+        let _ = ctx;
+        Ok(())
+    }
+
+    /// Serializes the error back into [`Items`], ready to be attached to an envelope.
+    ///
+    /// The default implementation serializes all items exposed through [`Self::as_ref_mut`].
+    /// Errors which deal with more items must override this implementation.
+    fn serialize(mut self, _ctx: ForwardContext<'_>) -> Result<ErrorItems>
+    where
+        Self: Sized,
+    {
+        let ErrorRefMut {
+            event,
+            attachments,
+            user_reports,
+        } = self.as_ref_mut();
+
+        let event = std::mem::take(event);
+        let event = if !event.is_empty() {
+            let data = relay_statsd::metric!(timer(RelayTimers::EventProcessingSerialization), {
+                event.to_json().map_err(ProcessingError::SerializeFailed)?
+            });
+
+            let event_type = event
+                .value()
+                .and_then(|event| event.ty.value().copied())
+                .unwrap_or_default();
+
+            let mut item = Item::new(ItemType::from_event_type(event_type));
+            item.set_payload(ContentType::Json, data);
+            Some(item)
+        } else {
+            None
+        };
+
+        Ok(ErrorItems {
+            event,
+            attachments: std::mem::take(attachments),
+            user_reports: user_reports.map(std::mem::take).unwrap_or_default(),
+            other: Items::new(),
+        })
+    }
+
+    /// A reference to the contained error data.
+    fn as_ref(&self) -> ErrorRef<'_>;
+    /// A mutable reference to the contained error data.
+    fn as_ref_mut(&mut self) -> ErrorRefMut<'_>;
+
+    /// A shorthand to access the contained error event.
+    fn event(&self) -> &Annotated<Event> {
+        self.as_ref().event
+    }
+    /// A shorthand to access the contained error event mutably.
+    fn event_mut(&mut self) -> &mut Annotated<Event> {
+        self.as_ref_mut().event
+    }
+}
+
+macro_rules! gen_error_kind {
+    ($($name:ident,)*) => {
+        #[derive(Debug)]
+        pub enum ErrorKind {
+            $($name($name),)*
+        }
+
+        impl SentryError for ErrorKind {
+            fn try_expand(items: &mut Items, ctx: Context<'_>) -> Result<Option<ParsedError<Self>>> {
+                $(
+                    if let Some(p) = <$name as SentryError>::try_expand(items, ctx)? {
+                        return Ok(Some(ParsedError {
+                            error: p.error.into(),
+                            fully_normalized: p.fully_normalized,
+                        }))
+                    };
+                )*
+
+                Ok(None)
+            }
+
+            fn process(&mut self, ctx: Context<'_>) -> Result<()> {
+                match self {
+                    $(Self::$name(error) => error.process(ctx),)*
+                }
+            }
+
+            fn serialize(self, ctx: ForwardContext<'_>) -> Result<ErrorItems> {
+                match self {
+                    $(Self::$name(error) => error.serialize(ctx),)*
+                }
+            }
+
+            fn as_ref(&self) -> ErrorRef<'_> {
+                match self {
+                    $(Self::$name(error) => error.as_ref(),)*
+                }
+            }
+
+            fn as_ref_mut(&mut self) -> ErrorRefMut<'_> {
+                match self {
+                    $(Self::$name(error) => error.as_ref_mut(),)*
+                }
+            }
+        }
+
+        $(
+            impl From<$name> for ErrorKind {
+                fn from(value: $name) -> Self {
+                    Self::$name(value)
+                }
+            }
+        )*
+
+        impl Counted for ErrorKind {
+            fn quantities(&self) -> Quantities {
+                match self {
+                    $(Self::$name(error) => error.quantities(),)*
+                }
+            }
+        }
+    };
+}
+
+// Order of these types is important, from most specific to least specific.
+//
+// For example a Minidump crash may contain an error, which would also be picked up by the generic
+// error.
+gen_error_kind![
+    Nnswitch,
+    Unreal,
+    Minidump,
+    AppleCrashReport,
+    Playstation,
+    Security,
+    RawSecurity,
+    UserReportV2,
+    FormData,
+    Attachments,
+    Generic,
+];
+
+// TODO: this may need a better name
+#[derive(Debug)]
+pub struct ParsedError<T> {
+    pub error: T,
+    pub fully_normalized: bool,
+}
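The `gen_error_kind!` invocation above is order-sensitive: each shape's `try_expand` runs in sequence and the first one that claims the items wins. A reduced sketch of that first-match-wins expansion (toy parsers, not the real item matching):

```rust
#[derive(Debug, PartialEq)]
enum Kind {
    Minidump,
    Generic,
}

fn try_minidump(items: &[&str]) -> Option<Kind> {
    items.contains(&"minidump").then_some(Kind::Minidump)
}

fn try_generic(items: &[&str]) -> Option<Kind> {
    items.contains(&"event").then_some(Kind::Generic)
}

fn expand(items: &[&str]) -> Option<Kind> {
    // Order matters: a minidump envelope may also carry an event item and
    // would be mis-classified if `try_generic` ran first.
    try_minidump(items).or_else(|| try_generic(items))
}

fn main() {
    assert_eq!(expand(&["event", "minidump"]), Some(Kind::Minidump));
    assert_eq!(expand(&["event"]), Some(Kind::Generic));
}
```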
diff --git a/relay-server/src/processing/errors/errors/nnswitch.rs b/relay-server/src/processing/errors/errors/nnswitch.rs
new file mode 100644
index 0000000000..b7a966bfcf
--- /dev/null
+++ b/relay-server/src/processing/errors/errors/nnswitch.rs
@@ -0,0 +1,437 @@
+//! Nintendo Switch crash report processing code.
+//!
+//! These functions are included only in the processing mode.
+
+use bytes::{Buf, Bytes};
+use relay_event_schema::protocol::Event;
+use relay_protocol::Annotated;
+use std::sync::OnceLock;
+use zstd::bulk::Decompressor as ZstdDecompressor;
+
+use crate::Envelope;
+use crate::envelope::{EnvelopeError, Item, ItemType, Items};
+use crate::managed::{Counted, Quantities};
+use crate::processing::errors::Result;
+use crate::processing::errors::errors::{
+    Context, ErrorRef, ErrorRefMut, ParsedError, SentryError, utils,
+};
+use crate::services::processor::ProcessingError;
+
+/// Magic number indicating the dying message file is encoded by the sentry-switch SDK.
+const SENTRY_MAGIC: &[u8] = "sntr".as_bytes();
+
+/// The file name that Nintendo uses in the events they forward.
+const DYING_MESSAGE_FILENAME: &str = "dying_message.dat";
+
+/// Limit the size of the decompressed data to prevent an invalid frame blowing up memory usage.
+const MAX_DECOMPRESSED_SIZE: usize = 100 * 1024;
+
+#[derive(Debug)]
+pub struct Nnswitch {
+    pub event: Annotated<Event>,
+    pub attachments: Items,
+    pub user_reports: Items,
+}
+
+impl SentryError for Nnswitch {
+    fn try_expand(items: &mut Items, _ctx: Context<'_>) -> Result<Option<ParsedError<Self>>> {
+        let Some(dying_message) = utils::take_item_by(items, is_dying_message) else {
+            return Ok(None);
+        };
+
+        let event = utils::take_item_of_type(items, ItemType::Event);
+
+        let mut attachments = items
+            .drain_filter(|item| *item.ty() == ItemType::Attachment)
+            .collect::<Items>();
+
+        let dying_message = expand_dying_message(dying_message.payload())
+            .map_err(ProcessingError::InvalidNintendoDyingMessage)?;
+
+        attachments.extend(dying_message.attachments);
+
+        let event = match (event, dying_message.event) {
+            (Some(event), Some(dying_message)) => merge_events(event, dying_message)
+                .map_err(SwitchProcessingError::InvalidJson)
+                .map_err(ProcessingError::InvalidNintendoDyingMessage)?,
+            (Some(event), None) => utils::event_from_json_payload(event, None)?,
+            (None, Some(event)) => utils::event_from_json_payload(event, None)?,
+            (None, None) => return Err(ProcessingError::NoEventPayload.into()),
+        };
+
+        let error = Self {
+            event,
+            attachments,
+            user_reports: utils::take_items_of_type(items, ItemType::UserReport),
+        };
+
+        Ok(Some(ParsedError {
+            error,
+            fully_normalized: false,
+        }))
+    }
+
+    fn as_ref(&self) -> ErrorRef<'_> {
+        ErrorRef {
+            event: &self.event,
+            attachments: &self.attachments,
+            user_reports: &self.user_reports,
+        }
+    }
+
+    fn as_ref_mut(&mut self) -> ErrorRefMut<'_> {
+        ErrorRefMut {
+            event: &mut self.event,
+            attachments: &mut self.attachments,
+            user_reports: Some(&mut self.user_reports),
+        }
+    }
+}
+
+impl Counted for Nnswitch {
+    fn quantities(&self) -> Quantities {
+        self.as_ref().to_quantities()
+    }
+}
+
+/// An error returned when parsing the dying message attachment.
+#[derive(Debug, thiserror::Error)]
+pub enum SwitchProcessingError {
+    #[error("invalid json")]
+    InvalidJson(#[source] serde_json::Error),
+    #[error("envelope parsing failed")]
+    EnvelopeParsing(#[from] EnvelopeError),
+    #[error("unexpected EOF, expected {expected:?}")]
+    UnexpectedEof { expected: &'static str },
+    #[error("invalid {0:?} ({1:?})")]
+    InvalidValue(&'static str, usize),
+    #[error("Zstandard error")]
+    Zstandard(#[source] std::io::Error),
+}
+
+fn merge_events(
+    from_envelope: Item,
+    from_dying_messages: Item,
+) -> Result<Annotated<Event>, serde_json::Error> {
+    let from_envelope = serde_json::from_slice(&from_envelope.payload())?;
+    let mut from_dying_message = serde_json::from_slice(&from_dying_messages.payload())?;
+
+    // Uses the dying message as a base and fills it with values from the event.
+    crate::utils::merge_values(&mut from_dying_message, from_envelope);
+
+    Annotated::<Event>::deserialize_with_meta(from_dying_message)
+}
+
+fn is_dying_message(item: &crate::envelope::Item) -> bool {
+    item.ty() == &ItemType::Attachment
+        && item.payload().starts_with(SENTRY_MAGIC)
+        && item.filename() == Some(DYING_MESSAGE_FILENAME)
+}
+
+#[derive(Debug, Default)]
+struct ExpandedDyingMessage {
+    event: Option<Item>,
+    attachments: Items,
+}
+
+/// Parses DyingMessage contents and updates the envelope.
+/// See dying_message.md for the documentation.
+fn expand_dying_message(
+    mut payload: Bytes,
+) -> Result<ExpandedDyingMessage, SwitchProcessingError> {
+    payload.advance(SENTRY_MAGIC.len());
+    let version = payload
+        .try_get_u8()
+        .map_err(|_| SwitchProcessingError::UnexpectedEof {
+            expected: "version",
+        })?;
+
+    match version {
+        0 => expand_dying_message_v0(payload),
+        _ => Err(SwitchProcessingError::InvalidValue(
+            "version",
+            version as usize,
+        )),
+    }
+}
+
+/// DyingMessage protocol v0 parser.
+fn expand_dying_message_v0(
+    mut payload: Bytes,
+) -> Result<ExpandedDyingMessage, SwitchProcessingError> {
+    let encoding_byte = payload
+        .try_get_u8()
+        .map_err(|_| SwitchProcessingError::UnexpectedEof {
+            expected: "encoding",
+        })?;
+    let format = (encoding_byte >> 6) & 0b0000_0011;
+    let compression = (encoding_byte >> 4) & 0b0000_0011;
+    let compression_arg = encoding_byte & 0b0000_1111;
+
+    let compressed_length =
+        payload
+            .try_get_u16()
+            .map_err(|_| SwitchProcessingError::UnexpectedEof {
+                expected: "compressed data length",
+            })?;
+    let data = decompress_data(
+        payload,
+        compressed_length as usize,
+        compression,
+        compression_arg,
+    )?;
+
+    match format {
+        0 => expand_dying_message_from_envelope_items(data),
+        _ => Err(SwitchProcessingError::InvalidValue(
+            "payload format",
+            format as usize,
+        )),
+    }
+}
+
+/// Merges envelope items with the ones contained in the DyingMessage.
+fn expand_dying_message_from_envelope_items(
+    data: Bytes,
+) -> Result<ExpandedDyingMessage, SwitchProcessingError> {
+    let mut items =
+        Envelope::parse_items_bytes(data).map_err(SwitchProcessingError::EnvelopeParsing)?;
+
+    let event = utils::take_item_of_type(&mut items, ItemType::Event);
+    let attachments = items
+        .drain_filter(|item| *item.ty() == ItemType::Attachment)
+        .collect();
+
+    if !items.is_empty() {
+        // Ignore unsupported items instead of failing, to keep forward compatibility.
+        relay_log::debug!(
+            "Ignoring {} unsupported items in the dying message",
+            items.len()
+        );
+    }
+
+    Ok(ExpandedDyingMessage { event, attachments })
+}
+
+fn decompress_data(
+    payload: Bytes,
+    compressed_length: usize,
+    compression: u8,
+    compression_arg: u8,
+) -> Result<Bytes, SwitchProcessingError> {
+    if payload.len() < compressed_length {
+        return Err(SwitchProcessingError::InvalidValue(
+            "compressed data length",
+            compressed_length,
+        ));
+    }
+
+    let data = payload.slice(0..compressed_length);
+    match compression {
+        // No compression
+        0 => Ok(data),
+        // Zstandard
+        1 => decompress_data_zstd(data, compression_arg)
+            .map(Bytes::from)
+            .map_err(SwitchProcessingError::Zstandard),
+        _ => Err(SwitchProcessingError::InvalidValue(
+            "compression format",
+            compression as usize,
+        )),
+    }
+}
+
+fn get_zstd_dictionary(id: usize) -> Option<&'static zstd::dict::DecoderDictionary<'static>> {
+    // Inlined dictionary binary data.
+    static ZSTD_DICTIONARIES: &[&[u8]] = &[
+        // index 0 = empty dictionary (a.k.a "none")
+        b"",
+    ];
+
+    // We initialize dictionaries (from their binary representation) only once and reuse them
+    // when decompressing.
+    static ZSTD_DEC_DICTIONARIES: OnceLock<
+        [zstd::dict::DecoderDictionary<'static>; ZSTD_DICTIONARIES.len()],
+    > = OnceLock::new();
+    let dictionaries = ZSTD_DEC_DICTIONARIES.get_or_init(|| {
+        let mut dictionaries: [zstd::dict::DecoderDictionary<'static>; ZSTD_DICTIONARIES.len()] =
+            [zstd::dict::DecoderDictionary::new(ZSTD_DICTIONARIES[0])];
+        for i in 0..ZSTD_DICTIONARIES.len() {
+            dictionaries[i] = zstd::dict::DecoderDictionary::new(ZSTD_DICTIONARIES[i]);
+        }
+        dictionaries
+    });
+
+    dictionaries.get(id)
+}
+
+fn decompress_data_zstd(data: Bytes, dictionary_id: u8) -> std::io::Result<Vec<u8>> {
+    let dictionary = get_zstd_dictionary(dictionary_id as usize)
+        .ok_or(std::io::Error::other("Unknown compression dictionary"))?;
+
+    let mut decompressor = ZstdDecompressor::with_prepared_dictionary(dictionary)?;
+    decompressor.decompress(data.as_ref(), MAX_DECOMPRESSED_SIZE)
+}
+
+#[cfg(test)]
+mod tests {
+    use relay_protocol::assert_annotated_snapshot;
+    use std::io::Write;
+
+    use super::*;
+    use crate::envelope::{ContentType, Item};
+    use zstd::bulk::Compressor as ZstdCompressor;
+
+    #[test]
+    fn test_is_dying_message() {
+        let mut item = Item::new(ItemType::Attachment);
+        item.set_filename("any");
+        item.set_payload(ContentType::OctetStream, Bytes::from("sntrASDF"));
+        assert!(!is_dying_message(&item));
+        item.set_filename(DYING_MESSAGE_FILENAME);
+        assert!(is_dying_message(&item));
+        item.set_payload(ContentType::OctetStream, Bytes::from("FOO"));
+        assert!(!is_dying_message(&item));
+    }
+
+    fn create_envelope_items(dying_message: Bytes) -> Items {
+        // Note: the attachment length specified in the "outer" envelope attachment is very
+        // important. Otherwise parsing would fail because the inner one can contain line-breaks.
+        let envelope = r#"{"event_id":"9ec79c33ec9942ab8353589fcb2e04dc","dsn":"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"}
+{"type":"event"}
+{"message":"hello world","level":"error","map":{"a":"val"}}
+{"type":"attachment","filename":"dying_message.dat","length":<LENGTH>}
+"#.replace("<LENGTH>", &dying_message.len().to_string());
+
+        let mut envelope =
+            Envelope::parse_bytes([Bytes::from(envelope), dying_message].concat().into()).unwrap();
+
+        envelope.take_items_by(|_| true)
+    }
+
+    #[test]
+    fn test_expand_uncompressed_envelope_items() {
+        // The attachment content is as follows:
+        // - 4 bytes magic = sntr
+        // - 1 byte version = 0
+        // - 1 byte encoding = 0b0000_0000 - i.e. envelope items, uncompressed
+        // - 2 bytes data length = 98 bytes - 0x0062 in big endian representation
+        // - 98 bytes of content
+        let mut items = create_envelope_items(Bytes::from(
+            "sntr\0\0\0\x62\
+            {\"type\":\"event\"}\n\
+            {\"foo\":\"bar\",\"level\":\"info\",\"map\":{\"b\":\"c\"}}\n\
+            {\"type\":\"attachment\",\"length\":2}\n\
+            Hi\n",
+        ));
+
+        assert_eq!(items[1].ty(), &ItemType::Attachment);
+        assert_eq!(items[1].filename(), Some(DYING_MESSAGE_FILENAME));
+        assert_eq!(items[1].payload().len(), 106);
+
+        let parsed = Nnswitch::try_expand(&mut items, Context::for_test())
+            .unwrap()
+            .unwrap();
+
+        assert_annotated_snapshot!(parsed.error.event, @r#"
+        {
+          "level": "info",
+          "logentry": {
+            "formatted": "hello world"
+          },
+          "foo": "bar",
+          "map": {
+            "a": "val",
+            "b": "c"
+          }
+        }
+        "#);
+        assert_eq!(parsed.error.attachments.len(), 1);
+        assert_eq!(parsed.error.attachments[0].ty(), &ItemType::Attachment);
+        assert_eq!(parsed.error.attachments[0].filename(), None);
+        assert_eq!(parsed.error.attachments[0].payload(), "Hi".as_bytes());
+    }
+
+    #[test]
+    fn test_expand_compressed_envelope_items() {
+        // encoding 0x10 - i.e. envelope items, Zstandard compressed, no dictionary
+        let dying_message = create_compressed_dying_message(0x10);
+        let mut items = create_envelope_items(dying_message.into());
+
+        let parsed = Nnswitch::try_expand(&mut items, Context::for_test())
+            .unwrap()
+            .unwrap();
+
+        assert_annotated_snapshot!(parsed.error.event, @r#"
+        {
+          "level": "info",
+          "logentry": {
+            "formatted": "hello world"
+          },
+          "foo": "bar",
+          "map": {
+            "a": "val",
+            "b": "c"
+          }
+        }
+        "#);
+        assert_eq!(parsed.error.attachments.len(), 1);
+        assert_eq!(parsed.error.attachments[0].ty(), &ItemType::Attachment);
+        assert_eq!(parsed.error.attachments[0].filename(), None);
+        assert_eq!(parsed.error.attachments[0].payload(), "Hi".as_bytes());
+    }
+
+    fn create_compressed_dying_message(encoding: u8) -> Vec<u8> {
+        // The attachment content is as follows:
+        // - 4 bytes magic = sntr
+        // - 1 byte version = 0
+        // - 1 byte encoding
+        // - 2 bytes data length = N bytes - in big endian representation
+        // - N bytes of compressed content (Zstandard)
+        let mut compressor = ZstdCompressor::new(3).unwrap();
+        let compressed_data = compressor
+            .compress(
+                b"\
+                {\"type\":\"event\"}\n\
+                {\"foo\":\"bar\",\"level\":\"info\",\"map\":{\"b\":\"c\"}}\n\
+                {\"type\":\"attachment\",\"length\":2}\n\
+                Hi\n\
+                ",
+            )
+            .unwrap();
+        let mut dying_message: Vec<u8> = Vec::new();
+        dying_message.write_all(b"sntr\0").unwrap();
+        dying_message.write_all(&[encoding]).unwrap();
+        dying_message
+            .write_all(&(compressed_data.len() as u16).to_be_bytes())
+            .unwrap();
+        dying_message.write_all(&compressed_data).unwrap();
+        dying_message
+    }
+
+    #[test]
+    fn test_expand_fails_with_unknown_dictionary() {
+        // encoding 0x11 - i.e. envelope items, Zstandard compressed, dictionary ID 1
+        let dying_message = create_compressed_dying_message(0b0001_0001);
+        let mut items = create_envelope_items(dying_message.into());
+
+        assert!(Nnswitch::try_expand(&mut items, Context::for_test()).is_err());
+    }
+
+    #[test]
+    fn test_expand_fails_on_invalid_data() {
+        let mut items = create_envelope_items(Bytes::from(
+            "sntr\0\0\0\x62\
+            {\"type\":\"event\"}\n\
+            ",
+        ));
+        assert!(Nnswitch::try_expand(&mut items, Context::for_test()).is_err());
+    }
+
+    #[test]
+    fn test_expand_works_with_empty_data() {
+        let mut items = create_envelope_items(Bytes::from("sntr\0\0\0\0"));
+
+        let _ = Nnswitch::try_expand(&mut items, Context::for_test())
+            .unwrap()
+            .unwrap();
+    }
+}
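For reference, the v0 encoding byte layout parsed by `expand_dying_message_v0`, as a standalone sketch of the same bit unpacking:

```rust
/// Bits 7-6: payload format, bits 5-4: compression, bits 3-0: compression
/// argument (e.g. the Zstandard dictionary ID).
fn unpack(encoding: u8) -> (u8, u8, u8) {
    let format = (encoding >> 6) & 0b11;
    let compression = (encoding >> 4) & 0b11;
    let compression_arg = encoding & 0b1111;
    (format, compression, compression_arg)
}

fn main() {
    // 0b0001_0001: envelope items, Zstandard compressed, dictionary ID 1 —
    // the encoding exercised by `test_expand_fails_with_unknown_dictionary`.
    assert_eq!(unpack(0b0001_0001), (0, 1, 1));
}
```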
diff --git a/relay-server/src/processing/errors/errors/playstation.rs b/relay-server/src/processing/errors/errors/playstation.rs
new file mode 100644
index 0000000000..a54d38e98a
--- /dev/null
+++ b/relay-server/src/processing/errors/errors/playstation.rs
@@ -0,0 +1,163 @@
+use relay_dynamic_config::Feature;
+use relay_event_schema::protocol::Event;
+use relay_protocol::Annotated;
+
+use crate::envelope::{AttachmentType, Item, ItemType, Items};
+use crate::managed::{Counted, Quantities};
+use crate::processing::errors::Result;
+use crate::processing::errors::errors::{
+    Context, ErrorRef, ErrorRefMut, ParsedError, SentryError, utils,
+};
+
+#[derive(Debug)]
+pub struct Playstation {
+    pub event: Annotated<Event>,
+    pub attachments: Items,
+    pub user_reports: Items,
+}
+
+impl SentryError for Playstation {
+    #[cfg(not(sentry))]
+    fn try_expand(_items: &mut Items, _ctx: Context<'_>) -> Result<Option<ParsedError<Self>>> {
+        Ok(None)
+    }
+
+    #[cfg(sentry)]
+    fn try_expand(items: &mut Items, ctx: Context<'_>) -> Result<Option<ParsedError<Self>>> {
+        use crate::constants::SENTRY_CRASH_PAYLOAD_KEY;
+        use crate::services::processor::ProcessingError;
+        use crate::statsd::RelayCounters;
+
+        if ctx.processing.should_filter(Feature::PlaystationIngestion) {
+            return Ok(None);
+        }
+
+        let Some(prosperodump) = utils::take_item_by(items, |item| {
+            item.attachment_type() == Some(&AttachmentType::Prosperodump)
+        }) else {
+            return Ok(None);
+        };
+
+        relay_statsd::metric!(counter(RelayCounters::PlaystationProcessing) += 1);
+
+        let data = relay_prosperoconv::extract_data(&prosperodump.payload()).map_err(|err| {
+            ProcessingError::InvalidPlaystationDump(format!("Failed to extract data: {err}"))
+        })?;
+        let prospero_dump = relay_prosperoconv::ProsperoDump::parse(&data).map_err(|err| {
+            ProcessingError::InvalidPlaystationDump(format!("Failed to parse dump: {err}"))
+        })?;
+        let minidump_buffer = relay_prosperoconv::write_dump(&prospero_dump).map_err(|err| {
+            ProcessingError::InvalidPlaystationDump(format!("Failed to create minidump: {err}"))
+        })?;
+
+        let event = utils::take_item_of_type(items, ItemType::Event);
+        let prospero_event = prospero_dump.userdata.get(SENTRY_CRASH_PAYLOAD_KEY);
+
+        let mut event = match (event, prospero_event) {
+            (Some(event), Some(prospero)) => {
+                merge_events(&event, prospero.as_bytes()).map_err(ProcessingError::InvalidJson)?
+            }
+            (Some(event), None) => utils::event_from_json_payload(event, None)?,
+            (None, Some(prospero)) => utils::event_from_json(prospero.as_bytes(), None)?,
+            (None, None) => Annotated::empty(),
+        };
+
+        // If `__sentry` is not a key in the userdata, do the legacy extraction.
+        // This should be removed once all customers have migrated to the new format.
+        if prospero_event.is_none() {
+            crate::services::processor::playstation::legacy_userdata_extraction(
+                event.get_or_insert_with(Default::default),
+                &prospero_dump,
+            );
+        }
+        crate::services::processor::playstation::merge_playstation_context(
+            event.get_or_insert_with(Default::default),
+            &prospero_dump,
+        );
+
+        let mut attachments = items
+            .drain_filter(|item| *item.ty() == ItemType::Attachment)
+            .collect::<Items>();
+
+        attachments.push({
+            let mut item = Item::new(ItemType::Attachment);
+            item.set_filename("generated_minidump.dmp");
+            item.set_payload(crate::envelope::ContentType::Minidump, minidump_buffer);
+            item.set_attachment_type(AttachmentType::Minidump);
+            item
+        });
+
+        attachments.extend(prospero_dump.files.iter().map(|file| {
+            let mut item = Item::new(ItemType::Attachment);
+            item.set_filename(file.name);
+            item.set_attachment_type(AttachmentType::Attachment);
+            item.set_payload(
+                crate::services::processor::playstation::infer_content_type(file.name),
+                file.contents.to_owned(),
+            );
+            item
+        }));
+
+        let console_log = {
+            let mut console_log = prospero_dump.system_log.into_owned();
+            console_log.extend(prospero_dump.log_lines);
+            console_log
+        };
+        if !console_log.is_empty() {
+            attachments.push({
+                let mut item = Item::new(ItemType::Attachment);
+                item.set_filename("console.log");
+                item.set_payload(crate::envelope::ContentType::Text, console_log.into_bytes());
+                item.set_attachment_type(AttachmentType::Attachment);
+                item
+            })
+        }
+
+        let error = Self {
+            event,
+            attachments,
+            user_reports: utils::take_items_of_type(items, ItemType::UserReport),
+        };
+
+        Ok(Some(ParsedError {
+            error,
+            fully_normalized: false,
+        }))
+    }
+
+    fn as_ref(&self) -> ErrorRef<'_> {
+        ErrorRef {
+            event: &self.event,
+            attachments: &self.attachments,
+            user_reports: &self.user_reports,
+        }
+    }
+
+    fn as_ref_mut(&mut self) -> ErrorRefMut<'_> {
+        ErrorRefMut {
+            event: &mut self.event,
+            attachments: &mut self.attachments,
+            user_reports: Some(&mut self.user_reports),
+        }
+    }
+}
+
+impl Counted for Playstation {
+    fn quantities(&self) -> Quantities {
+        self.as_ref().to_quantities()
+    }
+}
+
+#[cfg(sentry)]
+fn merge_events(
+    from_envelope: &Item,
+    from_prospero: &[u8],
+) -> Result<Annotated<Event>, serde_json::Error> {
+    let from_envelope = serde_json::from_slice(&from_envelope.payload())?;
+    let mut from_prospero = serde_json::from_slice(from_prospero)?;
+
+    // Uses the event embedded in the crash dump as a base and fills it with values from the
+    // envelope event.
+    crate::utils::merge_values(&mut from_prospero, from_envelope);
+
+    Annotated::<Event>::deserialize_with_meta(from_prospero)
+}
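The PlayStation expander compiles to a no-op outside internal builds: under `#[cfg(not(sentry))]` it returns `Ok(None)`, so envelopes simply fall through to the next shape in `ErrorKind`. A sketch of this compile-time opt-out (a hypothetical `internal` cfg and toy types stand in for the real ones):

```rust
#[derive(Debug)]
struct Report;

// Without the (hypothetical) `internal` cfg, the expander never matches and
// the items fall through to the next parser in the chain.
#[cfg(not(internal))]
fn try_expand(_items: &mut Vec<String>) -> Option<Report> {
    None
}

#[cfg(internal)]
fn try_expand(items: &mut Vec<String>) -> Option<Report> {
    let index = items.iter().position(|item| item == "prosperodump")?;
    items.remove(index);
    Some(Report)
}

fn main() {
    let mut items = vec!["prosperodump".to_owned()];
    println!("{:?}", try_expand(&mut items));
}
```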
diff --git a/relay-server/src/processing/errors/errors/raw_security.rs b/relay-server/src/processing/errors/errors/raw_security.rs
new file mode 100644
index 0000000000..a1332b828f
--- /dev/null
+++ b/relay-server/src/processing/errors/errors/raw_security.rs
@@ -0,0 +1,62 @@
+use relay_event_schema::protocol::Event;
+use relay_protocol::Annotated;
+
+use crate::envelope::{ItemType, Items};
+use crate::managed::{Counted, Quantities};
+use crate::processing::errors::Result;
+use crate::processing::errors::errors::{
+    Context, ErrorRef, ErrorRefMut, ParsedError, SentryError, utils,
+};
+
+#[derive(Debug)]
+pub struct RawSecurity {
+    pub event: Annotated<Event>,
+    pub attachments: Items,
+    pub user_reports: Items,
+}
+
+impl SentryError for RawSecurity {
+    fn try_expand(items: &mut Items, ctx: Context<'_>) -> Result<Option<ParsedError<Self>>> {
+        let Some(item) = utils::take_item_of_type(items, ItemType::RawSecurity) else {
+            return Ok(None);
+        };
+
+        let (event, _) = crate::services::processor::event::event_from_security_report(
+            item,
+            ctx.envelope.meta(),
+        )?;
+
+        let error = Self {
+            event,
+            attachments: utils::take_items_of_type(items, ItemType::Attachment),
+            user_reports: utils::take_items_of_type(items, ItemType::UserReport),
+        };
+
+        Ok(Some(ParsedError {
+            error,
+            fully_normalized: false,
+        }))
+    }
+
+    fn as_ref(&self) -> ErrorRef<'_> {
+        ErrorRef {
+            event: &self.event,
+            attachments: &self.attachments,
+            user_reports: &self.user_reports,
+        }
+    }
+
+    fn as_ref_mut(&mut self) -> ErrorRefMut<'_> {
+        ErrorRefMut {
+            event: &mut self.event,
+            attachments: &mut self.attachments,
+            user_reports: Some(&mut self.user_reports),
+        }
+    }
+}
+
+impl Counted for RawSecurity {
+    fn quantities(&self) -> Quantities {
+        self.as_ref().to_quantities()
+    }
+}
diff --git a/relay-server/src/processing/errors/errors/security.rs b/relay-server/src/processing/errors/errors/security.rs
new file mode 100644
index 0000000000..4101be1158
--- /dev/null
+++ b/relay-server/src/processing/errors/errors/security.rs
@@ -0,0 +1,57 @@
+use relay_event_schema::protocol::Event;
+use relay_protocol::Annotated;
+
+use crate::envelope::{ItemType, Items};
+use crate::managed::{Counted, Quantities};
+use crate::processing::errors::Result;
+use crate::processing::errors::errors::{
+    Context, ErrorRef, ErrorRefMut, ParsedError, SentryError, utils,
+};
+
+#[derive(Debug)]
+pub struct Security {
+    pub event: Annotated<Event>,
+    pub attachments: Items,
+    pub user_reports: Items,
+}
+
+impl SentryError for Security {
+    fn try_expand(items: &mut Items, _ctx: Context<'_>) -> Result<Option<ParsedError<Self>>> {
+        let Some(ev) = utils::take_item_of_type(items, ItemType::Security) else {
+            return Ok(None);
+        };
+
+        let error = Self {
+            event: utils::event_from_json_payload(ev, None)?,
+            attachments: utils::take_items_of_type(items, ItemType::Attachment),
+            user_reports: utils::take_items_of_type(items, ItemType::UserReport),
+        };
+
+        Ok(Some(ParsedError {
+            error,
+            fully_normalized: false,
+        }))
+    }
+
+    fn as_ref(&self) -> ErrorRef<'_> {
+        ErrorRef {
+            event: &self.event,
+            attachments: &self.attachments,
+            user_reports: &self.user_reports,
+        }
+    }
+
+    fn as_ref_mut(&mut self) -> ErrorRefMut<'_> {
+        ErrorRefMut {
+            event: &mut self.event,
+            attachments: &mut self.attachments,
+            user_reports: Some(&mut self.user_reports),
+        }
+    }
+}
+
+impl Counted for Security {
+    fn quantities(&self) -> Quantities {
+        self.as_ref().to_quantities()
+    }
+}
diff --git a/relay-server/src/processing/errors/errors/unreal.rs b/relay-server/src/processing/errors/errors/unreal.rs
new file mode 100644
index 0000000000..1c592d5bfa
--- /dev/null
+++ b/relay-server/src/processing/errors/errors/unreal.rs
@@ -0,0 +1,93 @@
+use relay_event_schema::protocol::Event;
+use relay_protocol::Annotated;
+
+use crate::envelope::{ItemType, Items};
+use crate::managed::{Counted, Quantities};
+use crate::processing::errors::Result;
+use crate::processing::errors::errors::{
+    Context, ErrorRef, ErrorRefMut, ParsedError, SentryError, utils,
+};
+use crate::services::processor::ProcessingError;
+use crate::utils::UnrealExpansion;
+
+#[derive(Debug)]
+pub struct Unreal {
+    pub event: Annotated<Event>,
+    pub attachments: Items,
+    pub user_reports: Items,
+}
+
+impl SentryError for Unreal {
+    fn try_expand(items: &mut Items, ctx: Context<'_>) -> Result<Option<ParsedError<Self>>> {
+        let Some(report) = utils::take_item_of_type(items, ItemType::UnrealReport) else {
+            return Ok(None);
+        };
+
+        let UnrealExpansion {
+            event,
+            mut attachments,
+        } = crate::utils::expand_unreal(report, ctx.processing.config)?;
+
+        let event = match utils::take_item_of_type(items, ItemType::Event).or(event) {
+            Some(event) => utils::event_from_json_payload(event, None)?,
+            // `process` fills this event in later. Ideally the event would already be filled in
+            // here during expansion, but it is split into two phases for now, to keep
+            // compatibility with the existing Unreal code.
+            None => Annotated::empty(),
+        };
+
+        attachments.extend(items.drain_filter(|item| *item.ty() == ItemType::Attachment));
+
+        let error = Self {
+            event,
+            attachments,
+            user_reports: utils::take_items_of_type(items, ItemType::UserReport),
+        };
+
+        Ok(Some(ParsedError {
+            error,
+            fully_normalized: false,
+        }))
+    }
+
+    fn process(&mut self, ctx: Context<'_>) -> Result<()> {
+        let event_id = ctx.envelope.event_id().unwrap_or_default();
+        debug_assert_ne!(event_id, Default::default(), "event id must always be set");
+
+        let user_header = ctx
+            .envelope
+            .get_header(crate::constants::UNREAL_USER_HEADER)
+            .and_then(|v| v.as_str());
+
+        if let Some(result) =
+            crate::utils::process_unreal(event_id, &mut self.event, &self.attachments, user_header)
+                .map_err(ProcessingError::InvalidUnrealReport)?
+        {
+            self.user_reports.extend(result.user_reports);
+        }
+
+        Ok(())
+    }
+
+    fn as_ref(&self) -> ErrorRef<'_> {
+        ErrorRef {
+            event: &self.event,
+            attachments: &self.attachments,
+            user_reports: &self.user_reports,
+        }
+    }
+
+    fn as_ref_mut(&mut self) -> ErrorRefMut<'_> {
+        ErrorRefMut {
+            event: &mut self.event,
+            attachments: &mut self.attachments,
+            user_reports: Some(&mut self.user_reports),
+        }
+    }
+}
+
+impl Counted for Unreal {
+    fn quantities(&self) -> Quantities {
+        self.as_ref().to_quantities()
+    }
+}
diff --git a/relay-server/src/processing/errors/errors/user_report_v2.rs b/relay-server/src/processing/errors/errors/user_report_v2.rs
new file mode 100644
index 0000000000..8abe209bfc
--- /dev/null
+++ b/relay-server/src/processing/errors/errors/user_report_v2.rs
@@ -0,0 +1,61 @@
+use relay_base_schema::events::EventType;
+use relay_event_schema::protocol::Event;
+use relay_protocol::Annotated;
+
+use crate::envelope::{ItemType, Items};
+use crate::managed::{Counted, Quantities};
+use crate::processing::errors::Result;
+use crate::processing::errors::errors::{
+    Context, ErrorRef, ErrorRefMut, ParsedError, SentryError, utils,
+};
+use crate::statsd::RelayCounters;
+
+#[derive(Debug)]
+pub struct UserReportV2 {
+    pub event: Annotated<Event>,
+    pub attachments: Items,
+}
+
+impl SentryError for UserReportV2 {
+    fn try_expand(items: &mut Items, _ctx: Context<'_>) -> Result<Option<ParsedError<Self>>> {
+        let Some(ev) = utils::take_item_of_type(items, ItemType::UserReportV2) else {
+            return Ok(None);
+        };
+
+        let error = Self {
+            event: utils::event_from_json_payload(ev, EventType::UserReportV2)?,
+            attachments: utils::take_items_of_type(items, ItemType::Attachment),
+        };
+
+        relay_statsd::metric!(
+            counter(RelayCounters::FeedbackAttachments) += error.attachments.len() as u64
+        );
+
+        Ok(Some(ParsedError {
+            error,
+            fully_normalized: false,
+        }))
+    }
+
+    fn as_ref(&self) -> ErrorRef<'_> {
+        ErrorRef {
+            event: &self.event,
+            attachments: &self.attachments,
+            user_reports: &[],
+        }
+    }
+
+    fn as_ref_mut(&mut self) -> ErrorRefMut<'_> {
+        ErrorRefMut {
+            event: &mut self.event,
+            attachments: &mut self.attachments,
+            user_reports: None,
+        }
+    }
+}
+
+impl Counted for UserReportV2 {
+    fn quantities(&self) -> Quantities {
+        self.as_ref().to_quantities()
+    }
+}
diff --git a/relay-server/src/processing/errors/errors/utils.rs b/relay-server/src/processing/errors/errors/utils.rs
new file mode 100644
index 0000000000..b36c52f9cb
--- /dev/null
+++ b/relay-server/src/processing/errors/errors/utils.rs
@@ -0,0 +1,51 @@
+use relay_base_schema::events::EventType;
+use relay_event_schema::protocol::Event;
+use relay_protocol::Annotated;
+
+use crate::envelope::{Item, ItemType, Items};
+use crate::processing::errors::{Error, Result};
+
+pub fn take_item_by<F>(items: &mut Items, f: F) -> Option<Item>
+where
+    F: FnMut(&Item) -> bool,
+{
+    let index = items.iter().position(f);
+    index.map(|index| items.swap_remove(index))
+}
+
+pub fn take_item_of_type(items: &mut Items, ty: ItemType) -> Option<Item> {
+    take_item_by(items, |item| item.ty() == &ty)
+}
+
+pub fn take_items_by<F>(items: &mut Items, mut f: F) -> Items
+where
+    F: FnMut(&Item) -> bool,
+{
+    items.drain_filter(|item| f(item)).collect()
+}
+
+pub fn take_items_of_type(items: &mut Items, ty: ItemType) -> Items {
+    take_items_by(items, |item| item.ty() == &ty)
+}
+
+pub fn event_from_json_payload(
+    item: Item,
+    event_type: impl Into<Option<EventType>>,
+) -> Result<Annotated<Event>> {
+    event_from_json(&item.payload(), event_type)
+}
+
+pub fn event_from_json(
+    payload: &[u8],
+    event_type: impl Into<Option<EventType>>,
+) -> Result<Annotated<Event>> {
+    let mut event = Annotated::<Event>::from_json_bytes(payload).map_err(Error::InvalidJson)?;
+
+    if let Some(event_value) = event.value_mut()
+        && let Some(event_type) = event_type.into()
+    {
+        event_value.ty.set_value(Some(event_type));
+    }
+
+    Ok(event)
+}
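Note that `take_item_by` uses `swap_remove`, which is O(1) but does not preserve the order of the remaining items; that is presumably acceptable here since the expanders drain the rest by item type rather than by position. Its behavior in isolation:

```rust
fn take_first_even(items: &mut Vec<u32>) -> Option<u32> {
    // Same structure as `take_item_by`: find the index, then swap_remove it.
    let index = items.iter().position(|i| i % 2 == 0)?;
    Some(items.swap_remove(index))
}

fn main() {
    let mut items = vec![1, 2, 3, 4];
    assert_eq!(take_first_even(&mut items), Some(2));
    // The last element was swapped into the hole left by `2`.
    assert_eq!(items, vec![1, 4, 3]);
}
```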
diff --git a/relay-server/src/processing/errors/filter.rs b/relay-server/src/processing/errors/filter.rs
new file mode 100644
index 0000000000..1fb0c5cf7d
--- /dev/null
+++ b/relay-server/src/processing/errors/filter.rs
@@ -0,0 +1,13 @@
+use crate::managed::Managed;
+use crate::processing::errors::errors::SentryError as _;
+use crate::processing::errors::{ExpandedError, Result};
+use crate::processing::{self, Context};
+use crate::services::processor::ProcessingError;
+
+/// Runs inbound filters on the [`ExpandedError`].
+pub fn filter(error: &Managed<ExpandedError>, ctx: Context<'_>) -> Result<()> {
+    let _ = processing::utils::event::filter(&error.headers, error.error.event(), ctx)
+        .map_err(ProcessingError::EventFiltered)?;
+
+    Ok(())
+}
diff --git a/relay-server/src/processing/errors/mod.rs b/relay-server/src/processing/errors/mod.rs
new file mode 100644
index 0000000000..f09f5a54b2
--- /dev/null
+++ b/relay-server/src/processing/errors/mod.rs
@@ -0,0 +1,208 @@
+use std::sync::Arc;
+
+use crate::Envelope;
+use crate::envelope::{EnvelopeHeaders, Item, ItemType, Items};
+use crate::managed::{
+    Counted, Managed, ManagedEnvelope, ManagedResult as _, OutcomeError, Quantities, Rejected,
+};
+use crate::processing::errors::errors::SentryError as _;
+use crate::processing::utils::event::EventFullyNormalized;
+use crate::processing::{self, Context, Forward, Output, QuotaRateLimiter};
+use crate::services::outcome::{DiscardReason, Outcome};
+use crate::services::processor::ProcessingError;
+
+mod dynamic_sampling;
+#[allow(
+    clippy::module_inception,
+    reason = "all error types of the errors processor"
+)]
+mod errors;
+mod filter;
+mod process;
+
+pub use errors::SwitchProcessingError;
+use relay_event_normalization::GeoIpLookup;
+use relay_event_schema::protocol::Metrics;
+
+type Result<T, E = Error> = std::result::Result<T, E>;
+
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+    #[error("TODO")]
+    InvalidJson(serde_json::Error),
+    #[error("envelope processor failed")]
+    ProcessingFailed(#[from] ProcessingError),
+}
+
+impl OutcomeError for Error {
+    type Error = Error;
+
+    fn consume(self) -> (Option<Outcome>, Self::Error) {
+        let outcome = match &self {
+            Self::InvalidJson(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
+            Self::ProcessingFailed(e) => e.to_outcome(),
+        };
+        (outcome, self)
+    }
+}
+
+/// A processor for Error Events.
+///
+/// It processes all kinds of error events, user feedback, crashes, ...
+pub struct ErrorsProcessor {
+    limiter: Arc<QuotaRateLimiter>,
+    geoip_lookup: GeoIpLookup,
+}
+
+impl ErrorsProcessor {
+    /// Creates a new [`Self`].
+    pub fn new(limiter: Arc<QuotaRateLimiter>, geoip_lookup: GeoIpLookup) -> Self {
+        Self {
+            limiter,
+            geoip_lookup,
+        }
+    }
+}
+
+impl processing::Processor for ErrorsProcessor {
+    type UnitOfWork = SerializedError;
+    type Output = ErrorOutput;
+    type Error = Error;
+
+    fn prepare_envelope(
+        &self,
+        envelope: &mut ManagedEnvelope,
+    ) -> Option<Managed<Self::UnitOfWork>> {
+        let has_transaction = envelope
+            .envelope()
+            .items()
+            .any(|item| item.ty() == &ItemType::Transaction);
+
+        if has_transaction {
+            return None;
+        }
+
+        let items = envelope.envelope_mut().take_items_by(Item::requires_event);
+
+        let errors = SerializedError {
+            headers: envelope.envelope().headers().clone(),
+            items,
+        };
+
+        Some(Managed::with_meta_from(envelope, errors))
+    }
+
+    async fn process(
+        &self,
+        error: Managed<Self::UnitOfWork>,
+        ctx: Context<'_>,
+    ) -> Result<Output<Self::Output>, Rejected<Self::Error>> {
+        let mut error = process::expand(error, ctx);
+
+        process::process(&mut error, ctx)?;
+
+        process::finalize(&mut error, ctx)?;
+        process::normalize(&mut error, &self.geoip_lookup, ctx)?;
+
+        filter::filter(&error, ctx).reject(&error)?;
+
+        dynamic_sampling::apply(&mut error, ctx).await;
+
+        let mut error = self.limiter.enforce_quotas(error, ctx).await?;
+
+        process::scrub(&mut error, ctx)?;
+
+        Ok(Output::just(ErrorOutput(error)))
+    }
+}
+
+#[derive(Debug)]
+pub struct SerializedError {
+    /// Original envelope headers.
+    headers: EnvelopeHeaders,
+    /// List of items which can be processed as an error.
+    ///
+    /// This is a mixture of items, all of which return `true` for [`Item::requires_event`].
+    items: Items,
+}
+
+impl Counted for SerializedError {
+    fn quantities(&self) -> Quantities {
+        self.items.quantities()
+    }
+}
+
+#[derive(Debug)]
+struct Flags {
+    pub fully_normalized: EventFullyNormalized,
+}
+
+#[derive(Debug)]
+struct ExpandedError {
+    // TODO: event_id is a very important header, maybe pull it out to a field
+    pub headers: EnvelopeHeaders,
+    pub flags: Flags,
+    pub metrics: Metrics,
+
+    pub error: errors::ErrorKind,
+}
+
+impl Counted for ExpandedError {
+    fn quantities(&self) -> Quantities {
+        self.error.quantities()
+    }
+}
+
+impl processing::RateLimited for Managed<ExpandedError> {
+    type Output = Self;
+    type Error = Error;
+
+    async fn enforce<R>(
+        self,
+        _rate_limiter: R,
+        _ctx: processing::Context<'_>,
+    ) -> Result<Self::Output, Rejected<Self::Error>>
+    where
+        R: processing::RateLimiter,
+    {
+        Ok(self)
+    }
+}
+
+#[derive(Debug)]
+pub struct ErrorOutput(Managed<ExpandedError>);
+
+impl Forward for ErrorOutput {
+    fn serialize_envelope(
+        self,
+        ctx: processing::ForwardContext<'_>,
+    ) -> Result<Managed<Box<Envelope>>, Rejected<()>> {
+        self.0
+            .try_map(|errors, _| {
+                let mut items = errors.error.serialize(ctx)?;
+
+                if let Some(event) = items.event.as_mut() {
+                    event.set_fully_normalized(errors.flags.fully_normalized.0);
+                }
+
+                // TODO: size limits?
+
+                Ok::<_, Error>(Envelope::from_parts(errors.headers, items.into()))
+            })
+            .map_err(|err| err.map(|_| ()))
+    }
+
+    #[cfg(feature = "processing")]
+    fn forward_store(
+        self,
+        s: processing::StoreHandle<'_>,
+        ctx: processing::ForwardContext<'_>,
+    ) -> Result<(), Rejected<()>> {
+        let envelope = self.serialize_envelope(ctx)?;
+        let envelope = ManagedEnvelope::from(envelope).into_processed();
+
+        s.store(crate::services::store::StoreEnvelope { envelope });
+
+        Ok(())
+    }
+}
diff --git a/relay-server/src/processing/errors/process.rs b/relay-server/src/processing/errors/process.rs
new file mode 100644
index 0000000000..0834adf7fa
--- /dev/null
+++ b/relay-server/src/processing/errors/process.rs
@@ -0,0 +1,160 @@
+use bytes::Bytes;
+use relay_event_normalization::GeoIpLookup;
+use relay_event_schema::protocol::UserReport;
+
+use crate::envelope::{ContentType, Items};
+use crate::managed::{Managed, RecordKeeper, Rejected};
+use crate::processing::errors::errors::{self, ErrorKind, SentryError as _};
+use crate::processing::errors::{Error, ExpandedError, Flags, Result, SerializedError};
+use crate::processing::utils::event::EventFullyNormalized;
+use crate::processing::{self, Context};
+use crate::services::processor::ProcessingError;
+
+pub fn expand(error: Managed<SerializedError>, ctx: Context<'_>) -> Managed<ExpandedError> {
+    error.map(|error, records| do_expand(error, ctx, records).unwrap())
+}
+
+fn do_expand(
+    mut error: SerializedError,
+    ctx: Context<'_>,
+    _: &mut RecordKeeper<'_>,
+) -> Result<ExpandedError> {
+    let is_trusted = error.headers.meta().request_trust().is_trusted();
+
+    let ctx = errors::Context {
+        envelope: &error.headers,
+        processing: ctx,
+    };
+
+    // TODO: support the "only expand errors in processing" usecase
+    let parsed = ErrorKind::try_expand(&mut error.items, ctx)
+        .unwrap()
+        .unwrap();
+
+    // TODO: think about forward compatibility with the remaining `items`.
+    // TODO: event size limit(s), maybe in serialize?
+
+    Ok(ExpandedError {
+        headers: error.headers,
+        flags: Flags {
+            fully_normalized: EventFullyNormalized(is_trusted && parsed.fully_normalized),
+        },
+        metrics: Default::default(),
+        error: parsed.error,
+    })
+}
+
+pub fn process(
+    error: &mut Managed<ExpandedError>,
+    ctx: Context<'_>,
+) -> Result<(), Rejected<Error>> {
+    error.try_modify(|error, records| {
+        error.error.process(errors::Context {
+            envelope: &error.headers,
+            processing: ctx,
+        })?;
+
+        if let Some(user_reports) = error.error.as_ref_mut().user_reports {
+            process_user_reports(user_reports, records);
+        }
+
+        Ok::<_, Error>(())
+    })
+}
+
+pub fn finalize(
+    error: &mut Managed<ExpandedError>,
+    ctx: Context<'_>,
+) -> Result<(), Rejected<Error>> {
+    error.try_modify(|error, _| {
+        let e = error.error.as_ref_mut();
+
+        processing::utils::event::finalize(
+            &error.headers,
+            e.event,
+            e.attachments.iter(),
+            &mut error.metrics,
+            ctx.config,
+        )?;
+
+        Ok::<_, Error>(())
+    })
+}
+
+pub fn normalize(
+    error: &mut Managed<ExpandedError>,
+    geoip_lookup: &GeoIpLookup,
+    ctx: Context<'_>,
+) -> Result<(), Rejected<Error>> {
+    let scoping = error.scoping();
+
+    error.try_modify(|error, _| {
+        let e = error.error.as_ref_mut();
+
+        error.flags.fully_normalized = processing::utils::event::normalize(
+            &error.headers,
+            e.event,
+            error.flags.fully_normalized,
+            scoping.project_id,
+            ctx,
+            geoip_lookup,
+        )?;
+
+        Ok::<_, Error>(())
+    })
+}
+
+pub fn scrub(error: &mut Managed<ExpandedError>, ctx: Context<'_>) -> Result<(), Rejected<Error>> {
+    error.try_modify(|error, _| {
+        let e = error.error.as_ref_mut();
+
+        processing::utils::event::scrub(e.event, ctx.project_info)?;
+        processing::utils::attachments::scrub(e.attachments.iter_mut(), ctx.project_info);
+
+        Ok::<_, Error>(())
+    })
+}
+
+/// Validates and normalizes all user report items in the envelope.
+///
+/// User feedback items are removed from the envelope if they contain invalid JSON or if the
+/// JSON violates the schema (basic type validation). Otherwise, their normalized representation
+/// is written back into the item.
+fn process_user_reports(user_reports: &mut Items, records: &mut RecordKeeper<'_>) {
+    for mut user_report in std::mem::take(user_reports) {
+        let data = match process_user_report(user_report.payload()) {
+            Ok(data) => data,
+            Err(err) => {
+                records.reject_err(err, user_report);
+                continue;
+            }
+        };
+        user_report.set_payload(ContentType::Json, data);
+        user_reports.push(user_report);
+    }
+}
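The trailing-newline quirk handled by `process_user_report` below is worth pinning in a test. A sketch; the exact `UserReport` field set is an assumption here:

```rust
#[cfg(test)]
mod user_report_tests {
    use super::*;

    #[test]
    fn normalizes_report_with_trailing_newline() {
        // Valid JSON plus the trailing newline sent by the broken SDK.
        // Field set of `UserReport` assumed for illustration.
        let raw = Bytes::from_static(
            b"{\"event_id\":\"52df9022835246eeb317dbd739ccd059\",\"name\":\"John\",\"email\":\"john@example.com\",\"comments\":\"it broke\"}\n",
        );

        let normalized = process_user_report(raw).expect("report should parse");

        // The payload is re-serialized compactly, without the trailing newline.
        assert!(!normalized.ends_with(b"\n"));
    }
}
```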
+
+fn process_user_report(user_report: Bytes) -> Result<Bytes> {
+    // A customer SDK sends invalid reports with a trailing `\n`. Strip it here: even if the
+    // SDK gets updated/fixed, many old versions with the broken behavior will remain out there.
+    let user_report = trim_whitespaces(&user_report);
+
+    let report =
+        serde_json::from_slice::<UserReport>(user_report).map_err(ProcessingError::InvalidJson)?;
+
+    serde_json::to_string(&report)
+        .map(Bytes::from)
+        .map_err(ProcessingError::SerializeFailed)
+        .map_err(Into::into)
+}
+
+fn trim_whitespaces(data: &[u8]) -> &[u8] {
+    let Some(from) = data.iter().position(|x| !x.is_ascii_whitespace()) else {
+        return &[];
+    };
+    let Some(to) = data.iter().rposition(|x| !x.is_ascii_whitespace()) else {
+        return &[];
+    };
+    &data[from..to + 1]
+}
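`trim_whitespaces` is small enough to pin down exhaustively; a sketch of its edge cases:

```rust
#[test]
fn trim_whitespaces_handles_edges() {
    // Leading and trailing ASCII whitespace is stripped, inner bytes are kept.
    assert_eq!(trim_whitespaces(b" \t{\"a\": 1}\r\n"), b"{\"a\": 1}");
    // All-whitespace and empty inputs collapse to an empty slice.
    assert_eq!(trim_whitespaces(b" \t\r\n"), b"");
    assert_eq!(trim_whitespaces(b""), b"");
}
```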
diff --git a/relay-server/src/processing/mod.rs b/relay-server/src/processing/mod.rs
index a33f8ba7bf..4d003034b7 100644
--- a/relay-server/src/processing/mod.rs
+++ b/relay-server/src/processing/mod.rs
@@ -24,6 +24,7 @@ pub use self::forward::*;
 pub use self::limits::*;
 
 pub mod check_ins;
+pub mod errors;
 pub mod logs;
 pub mod profile_chunks;
 pub mod replays;
diff --git a/relay-server/src/processing/transactions/process.rs b/relay-server/src/processing/transactions/process.rs
index 79dced9fd4..014e52673b 100644
--- a/relay-server/src/processing/transactions/process.rs
+++ b/relay-server/src/processing/transactions/process.rs
@@ -199,7 +199,7 @@ pub fn run_inbound_filters(
     work: &Managed>,
     ctx: Context<'_>,
 ) -> Result<FiltersStatus, Rejected<Error>> {
-    utils::event::filter(&work.headers, &work.event, &ctx)
+    utils::event::filter(&work.headers, &work.event, ctx)
         .map_err(ProcessingError::EventFiltered)
         .map_err(Error::from)
         .reject(work)
diff --git a/relay-server/src/processing/utils/event.rs b/relay-server/src/processing/utils/event.rs
index 92ca842b66..c1cbb8100f 100644
--- a/relay-server/src/processing/utils/event.rs
+++ b/relay-server/src/processing/utils/event.rs
@@ -346,7 +346,7 @@ pub enum FiltersStatus {
 pub fn filter(
     headers: &EnvelopeHeaders,
     event: &Annotated<Event>,
-    ctx: &Context,
+    ctx: Context,
 ) -> Result<FiltersStatus, FilterStatKey> {
     let event = match event.value() {
         Some(event) => event,
@@ -384,7 +384,7 @@
 }
 
 /// New type representing the normalization state of the event.
-#[derive(Copy, Clone)]
+#[derive(Debug, Copy, Clone)]
 pub struct EventFullyNormalized(pub bool);
 
 impl EventFullyNormalized {
diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs
index 5ebaf0c1ec..17e4ff6619 100644
--- a/relay-server/src/services/processor.rs
+++ b/relay-server/src/services/processor.rs
@@ -48,6 +48,7 @@ use crate::metrics::{MetricOutcomes, MetricsLimiter, MinimalTrackableBucket};
 use crate::metrics_extraction::transactions::ExtractedMetrics;
 use crate::metrics_extraction::transactions::types::ExtractMetricsError;
 use crate::processing::check_ins::CheckInsProcessor;
+use crate::processing::errors::ErrorsProcessor;
 use crate::processing::logs::LogsProcessor;
 use crate::processing::profile_chunks::ProfileChunksProcessor;
 use crate::processing::replays::ReplaysProcessor;
@@ -94,7 +95,7 @@ use {
 
 mod attachment;
 mod dynamic_sampling;
-mod event;
+pub mod event;
 mod metrics;
 mod nel;
 mod profile;
@@ -103,7 +104,7 @@ mod report;
 mod span;
 
 #[cfg(all(sentry, feature = "processing"))]
-mod playstation;
+pub mod playstation;
 mod standalone;
 #[cfg(feature = "processing")]
 mod unreal;
@@ -1160,6 +1161,7 @@ struct InnerProcessor {
 }
 
 struct Processing {
+    errors: ErrorsProcessor,
     logs: LogsProcessor,
     trace_metrics: TraceMetricsProcessor,
     spans: SpansProcessor,
@@ -1247,6 +1249,7 @@ impl EnvelopeProcessorService {
                 .map(CardinalityLimiter::new),
             metric_outcomes,
             processing: Processing {
+                errors: ErrorsProcessor::new(Arc::clone(&quota_limiter), geoip_lookup.clone()),
                 logs: LogsProcessor::new(Arc::clone(&quota_limiter)),
                 trace_metrics: TraceMetricsProcessor::new(Arc::clone(&quota_limiter)),
                 spans: SpansProcessor::new(Arc::clone(&quota_limiter), geoip_lookup.clone()),
@@ -1386,11 +1389,11 @@ impl EnvelopeProcessorService {
             &self.inner.geoip_lookup,
         )?;
         let filter_run =
-            processing::utils::event::filter(managed_envelope.envelope().headers(), &event, &ctx)
+            processing::utils::event::filter(managed_envelope.envelope().headers(), &event, ctx)
                 .map_err(|err| {
-                managed_envelope.reject(Outcome::Filtered(err.clone()));
-                ProcessingError::EventFiltered(err)
-            })?;
+                    managed_envelope.reject(Outcome::Filtered(err.clone()));
+                    ProcessingError::EventFiltered(err)
+                })?;
 
         if self.inner.config.processing_enabled() || matches!(filter_run, FiltersStatus::Ok) {
             dynamic_sampling::tag_error_with_sampling_decision(
@@ -1671,7 +1674,18 @@ impl EnvelopeProcessorService {
         relay_log::trace!("Processing {group} group", group = group.variant());
 
         match group {
-            ProcessingGroup::Error => run!(process_errors, project_id, ctx),
+            ProcessingGroup::Error => {
+                if ctx.project_info.has_feature(Feature::NewErrorProcessing) {
+                    self.process_with_processor(
+                        &self.inner.processing.errors,
+                        managed_envelope,
+                        ctx,
+                    )
+                    .await
+                } else {
+                    run!(process_errors, project_id, ctx)
+                }
+            }
             ProcessingGroup::Transaction => {
                 self.process_with_processor(
                     &self.inner.processing.transactions,
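The new pipeline stays opt-in per project. A sketch of how the flag arrives via project config; `FeatureSet` and its `has` method are assumptions about the `relay-dynamic-config` API:

```rust
#[cfg(test)]
mod feature_gate_tests {
    use relay_dynamic_config::Feature;

    #[test]
    fn error_pipeline_is_feature_gated() {
        // Round-trip through JSON, the way project configs arrive over the wire.
        let json = serde_json::to_value([Feature::NewErrorProcessing]).unwrap();
        let features: relay_dynamic_config::FeatureSet =
            serde_json::from_value(json).unwrap();

        assert!(features.has(Feature::NewErrorProcessing));
    }
}
```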
diff --git a/relay-server/src/services/processor/event.rs b/relay-server/src/services/processor/event.rs
index c756fd9265..9e4f208de3 100644
--- a/relay-server/src/services/processor/event.rs
+++ b/relay-server/src/services/processor/event.rs
@@ -219,7 +219,7 @@ fn event_from_json_payload(
     Ok((event, item.len()))
 }
 
-fn event_from_security_report(
+pub fn event_from_security_report(
     item: Item,
     meta: &RequestMeta,
 ) -> Result<ExtractedEvent, ProcessingError> {
@@ -341,7 +341,7 @@ fn parse_msgpack_breadcrumbs(
     Ok(breadcrumbs)
 }
 
-fn event_from_attachments(
+pub fn event_from_attachments(
     config: &Config,
     event_item: Option<Item>,
     breadcrumbs_item1: Option<Item>,
@@ -393,7 +393,7 @@ fn event_from_attachments(
     Ok((event, len))
 }
 
-fn merge_formdata(target: &mut SerdeValue, item: Item) {
+pub fn merge_formdata(target: &mut SerdeValue, item: Item) {
     let payload = item.payload();
     let mut aggregator = ChunkedFormDataAggregator::new();
 
diff --git a/relay-server/src/services/processor/nnswitch.rs b/relay-server/src/services/processor/nnswitch.rs
index ce1faadfaf..237b0c52bf 100644
--- a/relay-server/src/services/processor/nnswitch.rs
+++ b/relay-server/src/services/processor/nnswitch.rs
@@ -2,14 +2,17 @@
 //!
 //! These functions are included only in the processing mode.
 
+use bytes::{Buf, Bytes};
+use std::sync::OnceLock;
+use zstd::bulk::Decompressor as ZstdDecompressor;
+
 use crate::Envelope;
-use crate::envelope::{ContentType, EnvelopeError, Item, ItemType};
+use crate::envelope::{ContentType, Item, ItemType};
 use crate::managed::TypedEnvelope;
 use crate::services::processor::{ErrorGroup, ProcessingError};
 use crate::utils;
-use bytes::{Buf, Bytes};
-use std::sync::OnceLock;
-use zstd::bulk::Decompressor as ZstdDecompressor;
+
+pub use crate::processing::errors::SwitchProcessingError;
 
 /// Magic number indicating the dying message file is encoded by sentry-switch SDK.
 const SENTRY_MAGIC: &[u8] = "sntr".as_bytes();
@@ -22,21 +25,6 @@
 const MAX_DECOMPRESSED_SIZE: usize = 100 * 1024;
 
 type Result<T> = std::result::Result<T, SwitchProcessingError>;
 
-/// An error returned when parsing the dying message attachment.
-#[derive(Debug, thiserror::Error)]
-pub enum SwitchProcessingError {
-    #[error("invalid json")]
-    InvalidJson(#[source] serde_json::Error),
-    #[error("envelope parsing failed")]
-    EnvelopeParsing(#[from] EnvelopeError),
-    #[error("unexpected EOF, expected {expected:?}")]
-    UnexpectedEof { expected: String },
-    #[error("invalid {0:?} ({1:?})")]
-    InvalidValue(String, usize),
-    #[error("Zstandard error")]
-    Zstandard(#[source] std::io::Error),
-}
-
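The dying-message header parsed below is bit-packed. A concrete byte makes the layout easier to review (the value is hypothetical; which numbers are valid is decided by the `match` arms in `expand_dying_message_v0` and `decompress_data`):

```rust
#[test]
fn encoding_byte_layout() {
    // Hypothetical value: format = 1, compression = 2, low nibble unused here.
    let encoding_byte: u8 = 0b01_10_0000;

    // The same shifts and masks as `expand_dying_message_v0`.
    let format = (encoding_byte >> 6) & 0b0000_0011;
    let compression = (encoding_byte >> 4) & 0b0000_0011;

    assert_eq!((format, compression), (1, 2));
}
```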
 /// Expands Nintendo Switch crash-reports.
 ///
 /// If the envelope does NOT contain a `dying_message.dat` attachment, it doesn't do anything.
@@ -81,12 +69,12 @@ fn expand_dying_message(mut payload: Bytes, envelope: &mut Envelope) -> Result<()> {
     let version = payload
         .try_get_u8()
         .map_err(|_| SwitchProcessingError::UnexpectedEof {
-            expected: "version".into(),
+            expected: "version",
         })?;
     match version {
         0 => expand_dying_message_v0(payload, envelope),
         _ => Err(SwitchProcessingError::InvalidValue(
-            "version".into(),
+            "version",
             version as usize,
         )),
     }
@@ -97,7 +85,7 @@ fn expand_dying_message_v0(mut payload: Bytes, envelope: &mut Envelope) -> Result<()> {
     let encoding_byte = payload
         .try_get_u8()
        .map_err(|_| SwitchProcessingError::UnexpectedEof {
-            expected: "encoding".into(),
+            expected: "encoding",
         })?;
     let format = (encoding_byte >> 6) & 0b0000_0011;
     let compression = (encoding_byte >> 4) & 0b0000_0011;
@@ -107,7 +95,7 @@
     payload
         .try_get_u16()
         .map_err(|_| SwitchProcessingError::UnexpectedEof {
-            expected: "compressed data length".into(),
+            expected: "compressed data length",
         })?;
     let data = decompress_data(
         payload,
@@ -119,7 +107,7 @@
     match format {
         0 => expand_dying_message_from_envelope_items(data, envelope),
         _ => Err(SwitchProcessingError::InvalidValue(
-            "payload format".into(),
+            "payload format",
             format as usize,
         )),
     }
@@ -160,7 +148,7 @@ fn decompress_data(
 ) -> Result<Bytes> {
     if payload.len() < compressed_length {
         return Err(SwitchProcessingError::InvalidValue(
-            "compressed data length".into(),
+            "compressed data length",
             compressed_length,
         ));
     }
@@ -174,7 +162,7 @@
             .map(Bytes::from)
             .map_err(SwitchProcessingError::Zstandard),
         _ => Err(SwitchProcessingError::InvalidValue(
-            "compression format".into(),
+            "compression format",
             compression as usize,
         )),
     }
diff --git a/relay-server/src/services/processor/playstation.rs b/relay-server/src/services/processor/playstation.rs
index 03bf19b8cf..b8d2f542f8 100644
--- a/relay-server/src/services/processor/playstation.rs
+++ b/relay-server/src/services/processor/playstation.rs
@@ -12,6 +12,7 @@
 use relay_event_schema::protocol::{Event, TagEntry};
 use relay_prosperoconv::{self, ProsperoDump};
 use relay_protocol::{Annotated, Empty, Object};
 
+use crate::constants::SENTRY_CRASH_PAYLOAD_KEY;
 use crate::envelope::{AttachmentType, ContentType, Item, ItemType};
 use crate::managed::TypedEnvelope;
 use crate::services::processor::metric;
@@ -20,9 +21,6 @@
 use crate::services::projects::project::ProjectInfo;
 use crate::statsd::RelayCounters;
 use crate::utils;
 
-/// Name of the custom tag in the UserData for Sentry event payloads.
-const SENTRY_PAYLOAD_KEY: &str = "__sentry";
-
 pub fn expand(
     managed_envelope: &mut TypedEnvelope<ErrorGroup>,
     config: &Config,
@@ -48,7 +46,7 @@ pub fn expand(
         ProcessingError::InvalidPlaystationDump(format!("Failed to create minidump: {err}"))
     })?;
 
-    if let Some(json) = prospero_dump.userdata.get(SENTRY_PAYLOAD_KEY) {
+    if let Some(json) = prospero_dump.userdata.get(SENTRY_CRASH_PAYLOAD_KEY) {
         let event = envelope.take_item_by(|item| item.ty() == &ItemType::Event);
         let event_item = merge_or_create_event_item(json, event);
         envelope.add_item(event_item);
@@ -92,7 +90,10 @@ pub fn process(
     // If "__sentry" is not a key in the userdata, do the legacy extraction.
     // This should be removed once all customers have migrated to the new format.
-    if !&prospero_dump.userdata.contains_key(SENTRY_PAYLOAD_KEY) {
+    if !&prospero_dump
+        .userdata
+        .contains_key(SENTRY_CRASH_PAYLOAD_KEY)
+    {
         legacy_userdata_extraction(event, &prospero_dump);
     }
     merge_playstation_context(event, &prospero_dump);
@@ -134,7 +135,7 @@ fn add_attachments(
     }
 }
 
-fn legacy_userdata_extraction(event: &mut Event, prospero: &ProsperoDump) {
+pub fn legacy_userdata_extraction(event: &mut Event, prospero: &ProsperoDump) {
     let contexts = event.contexts.get_or_insert_with(Contexts::default);
     let tags = event.tags.value_mut().get_or_insert_with(Tags::default);
     macro_rules! add_tag {
@@ -203,7 +204,7 @@
     }
 }
 
-fn merge_playstation_context(event: &mut Event, prospero: &ProsperoDump) {
+pub fn merge_playstation_context(event: &mut Event, prospero: &ProsperoDump) {
     let contexts = event.contexts.get_or_insert_with(Contexts::default);
 
     let platform = "PS5";
@@ -270,7 +271,7 @@
     });
 }
 
-fn infer_content_type(filename: &str) -> ContentType {
+pub fn infer_content_type(filename: &str) -> ContentType {
     // Since we only receive a limited selection of files through this mechanism, this simple
     // logic should be enough.
     let extension = filename.rsplit('.').next().map(str::to_lowercase);
diff --git a/relay-server/src/utils/param_parser.rs b/relay-server/src/utils/param_parser.rs
index fa036728d9..1f5ff4cf83 100644
--- a/relay-server/src/utils/param_parser.rs
+++ b/relay-server/src/utils/param_parser.rs
@@ -31,22 +31,23 @@ where
     }
 }
 
-/// Merge two serde values.
+/// Merge two [`serde_json::Value`] items.
+///
+/// Fills the value `a` with values from `b`; existing values in `a` are never overwritten.
 ///
 /// Taken (with small changes) from stack overflow answer:
 /// .
 pub fn merge_values(a: &mut Value, b: Value) {
     match (a, b) {
-        //recursively merge dicts
-        (a @ &mut Value::Object(_), Value::Object(b)) => {
-            let a = a.as_object_mut().unwrap();
+        // Recursively merge dicts
+        (Value::Object(a), Value::Object(b)) => {
             for (k, v) in b {
                 merge_values(a.entry(k).or_insert(Value::Null), v);
             }
         }
-        //fill in missing left values
+        // Fill in missing left values
         (a @ &mut Value::Null, b) => *a = b,
-        //do not override existing values that are not maps
+        // Do not override existing values that are not maps
         (_a, _b) => {}
     }
 }
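The rewritten doc comment is best checked against a concrete merge; a sketch of the fill-without-overwrite semantics:

```rust
#[test]
fn merge_values_fills_without_overwriting() {
    use serde_json::json;

    let mut a = json!({"user": {"id": 1}, "release": null});
    let b = json!({"user": {"id": 2, "email": "jane@example.com"}, "release": "1.0"});

    merge_values(&mut a, b);

    // `user.id` keeps its value from `a`; missing keys and nulls are filled from `b`.
    assert_eq!(
        a,
        json!({"user": {"id": 1, "email": "jane@example.com"}, "release": "1.0"})
    );
}
```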
diff --git a/relay-server/src/utils/unreal.rs b/relay-server/src/utils/unreal.rs
index 078bdcd403..8d15ea28fb 100644
--- a/relay-server/src/utils/unreal.rs
+++ b/relay-server/src/utils/unreal.rs
@@ -12,23 +12,24 @@ use symbolic_unreal::{
 };
 
 use crate::constants::{
-    ITEM_NAME_BREADCRUMBS1, ITEM_NAME_BREADCRUMBS2, ITEM_NAME_EVENT, UNREAL_USER_HEADER,
+    ITEM_NAME_BREADCRUMBS1, ITEM_NAME_BREADCRUMBS2, ITEM_NAME_EVENT, SENTRY_CRASH_PAYLOAD_KEY,
+    UNREAL_USER_HEADER,
 };
-use crate::envelope::{AttachmentType, ContentType, Envelope, Item, ItemType};
+use crate::envelope::{AttachmentType, ContentType, Envelope, Item, ItemType, Items};
 use crate::services::processor::ProcessingError;
 
 /// Maximum number of unreal logs to parse for breadcrumbs.
 const MAX_NUM_UNREAL_LOGS: usize = 40;
 
-/// Name of the custom XML tag in Unreal GameData for Sentry event payloads.
-const SENTRY_PAYLOAD_KEY: &str = "__sentry";
-
 /// Client SDK name used for the event payload to identify the UE4 crash reporter.
 const CLIENT_SDK_NAME: &str = "unreal.crashreporter";
 
 fn get_event_item(data: &[u8]) -> Result<Option<Item>, Unreal4Error> {
     let mut context = Unreal4Context::parse(data)?;
-    let json = match context.game_data.remove(SENTRY_PAYLOAD_KEY) {
+    let json = match context.game_data.remove(SENTRY_CRASH_PAYLOAD_KEY) {
         Some(json) if !json.is_empty() => json,
         _ => return Ok(None),
     };
@@ -52,13 +53,38 @@ pub fn expand_unreal_envelope(
     envelope: &mut Envelope,
     config: &Config,
 ) -> Result<(), ProcessingError> {
-    let payload = unreal_item.payload();
-    let crash = Unreal4Crash::parse_with_limit(&payload, config.max_envelope_size())?;
-
-    let mut has_event = envelope
+    let has_event = envelope
         .get_item_by(|item| item.ty() == &ItemType::Event)
         .is_some();
 
+    let expansion = expand_unreal(unreal_item, config)?;
+
+    if !has_event && let Some(event) = expansion.event {
+        envelope.add_item(event);
+    }
+
+    for attachment in expansion.attachments {
+        envelope.add_item(attachment);
+    }
+
+    if let Err(offender) = super::check_envelope_size_limits(config, envelope) {
+        return Err(ProcessingError::PayloadTooLarge(offender));
+    }
+
+    Ok(())
+}
+
+/// Expands an Unreal 4 item and returns the expanded items.
+pub fn expand_unreal(
+    unreal_item: Item,
+    config: &Config,
+) -> Result<UnrealExpansion, Unreal4Error> {
+    let mut event = None;
+    let mut attachments = Items::new();
+
+    let payload = unreal_item.payload();
+    let crash = Unreal4Crash::parse_with_limit(&payload, config.max_envelope_size())?;
+
     for file in crash.files() {
         let (content_type, attachment_type) = match file.ty() {
             Unreal4FileType::Minidump => (ContentType::Minidump, AttachmentType::Minidump),
@@ -76,27 +102,29 @@
             },
         };
 
-        if !has_event
-            && attachment_type == AttachmentType::UnrealContext
-            && let Some(event_item) = get_event_item(file.data())?
-        {
-            envelope.add_item(event_item);
-            has_event = true;
+        if event.is_none() && attachment_type == AttachmentType::UnrealContext {
+            event = get_event_item(file.data())?;
         }
 
         let mut item = Item::new(ItemType::Attachment);
         item.set_filename(file.name());
         // TODO: This clones data. Update symbolic to allow moving the bytes out.
+        //
+        // See: .
         item.set_payload(content_type, file.data().to_owned());
         item.set_attachment_type(attachment_type);
-        envelope.add_item(item);
+        attachments.push(item);
     }
 
-    if let Err(offender) = super::check_envelope_size_limits(config, envelope) {
-        return Err(ProcessingError::PayloadTooLarge(offender));
-    }
+    Ok(UnrealExpansion { event, attachments })
 
-    Ok(())
+}
+
+/// Expansion from an Unreal 4 report.
+pub struct UnrealExpansion {
+    /// The error event if the crash contained one.
+    pub event: Option<Item>,
+    /// Files of the report as attachments.
+    pub attachments: Items,
+}
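With parsing split out of `expand_unreal_envelope`, callers decide where the expanded items go. A hypothetical caller (the helper name is invented; the `?` conversion assumes the existing `From<Unreal4Error> for ProcessingError`):

```rust
use relay_config::Config;

use crate::envelope::{Envelope, Item};
use crate::services::processor::ProcessingError;
use crate::utils::unreal::expand_unreal;

/// Hypothetical: mirrors `expand_unreal_envelope` above, minus the envelope
/// size check, to show the split between parsing and envelope mutation.
fn add_expanded_items(
    unreal_item: Item,
    envelope: &mut Envelope,
    config: &Config,
) -> Result<(), ProcessingError> {
    let expansion = expand_unreal(unreal_item, config)?;

    if let Some(event_item) = expansion.event {
        envelope.add_item(event_item);
    }
    for attachment in expansion.attachments {
        envelope.add_item(attachment);
    }

    Ok(())
}
```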
 
 fn merge_unreal_user_info(event: &mut Event, user_info: &str) {
@@ -311,7 +339,7 @@ fn merge_unreal_context(event: &mut Event, context: Unreal4Context) {
     let filtered_keys = context
         .game_data
         .into_iter()
-        .filter(|(key, _)| key != SENTRY_PAYLOAD_KEY)
+        .filter(|(key, _)| key != SENTRY_CRASH_PAYLOAD_KEY)
         .map(|(key, value)| (key, Annotated::new(Value::String(value))));
 
     game_context.extend(filtered_keys);
@@ -350,17 +378,47 @@ pub fn process_unreal_envelope(
     let user_header = envelope
         .get_header(UNREAL_USER_HEADER)
         .and_then(Value::as_str);
-    let context_item =
-        envelope.get_item_by(|item| item.attachment_type() == Some(&AttachmentType::UnrealContext));
-    let mut logs_items = envelope
-        .items()
+
+    // The `unwrap_or_default` here can produce an invalid user report if the envelope id
+    // is indeed missing. This should not happen under normal circumstances since the EventId is
+    // created statically.
+    let event_id = envelope.event_id().unwrap_or_default();
+    debug_assert!(!event_id.is_nil());
+
+    match process_unreal(event_id, event, envelope.items(), user_header)? {
+        Some(r) => {
+            for item in r.user_reports {
+                envelope.add_item(item);
+            }
+            Ok(true)
+        }
+        None => Ok(false),
+    }
+}
+
+/// Processes an unreal crash report.
+///
+/// The `user_header` should be extracted from the [`UNREAL_USER_HEADER`] envelope header.
+pub fn process_unreal<'a>(
+    event_id: EventId,
+    event: &mut Annotated<Event>,
+    attachments: impl IntoIterator<Item = &'a Item> + Clone,
+    user_header: Option<&str>,
+) -> Result<Option<ProcessedUnrealReport>, Unreal4Error> {
+    let context_item = attachments
+        .clone()
+        .into_iter()
+        .find(|item| item.attachment_type() == Some(&AttachmentType::UnrealContext));
+
+    let mut logs_items = attachments
+        .into_iter()
         .filter(|item| item.attachment_type() == Some(&AttachmentType::UnrealLogs))
         .map(|item| item.payload())
         .peekable();
 
     // Early exit if there is no information.
     if user_header.is_none() && context_item.is_none() && logs_items.peek().is_none() {
-        return Ok(false);
+        return Ok(None);
     }
 
     // If we have UE4 info, ensure an event is there to fill. DO NOT fill if there is no unreal
@@ -373,23 +431,30 @@
     merge_unreal_logs(event, logs_items)?;
 
+    let mut user_reports = Items::new();
     if let Some(context_item) = context_item {
         let mut context = Unreal4Context::parse(&context_item.payload())?;
 
-        // the `unwrap_or_default` here can produce an invalid user report if the envelope id
-        // is indeed missing. This should not happen under normal circumstances since the EventId is
-        // created statically.
-        let event_id = envelope.event_id().unwrap_or_default();
-        debug_assert!(!event_id.is_nil());
-
         if let Some(report) = get_unreal_user_report(event_id, &mut context) {
-            envelope.add_item(report);
+            user_reports.push(report);
         }
 
         merge_unreal_context(event, context);
     }
 
-    Ok(true)
+    Ok(Some(ProcessedUnrealReport { user_reports }))
+}
+
+/// Result when processing an unreal report.
+pub struct ProcessedUnrealReport {
+    /// User reports contained in the report.
+    pub user_reports: Items,
 }
 
 #[cfg(test)]