Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5075,9 +5075,9 @@ dependencies = [

[[package]]
name = "sentry-kafka-schemas"
version = "2.1.21"
version = "2.1.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "18d151a1fd11abf859f75d199a49ae16c4e0460d4461171ae2c6a477599ca6d7"
checksum = "e9e5223681aafffe1d97fe54ce0808e2ea642bea0ef1ce568ed966a03e3193ff"
dependencies = [
"jsonschema",
"prost 0.14.3",
Expand Down Expand Up @@ -5167,9 +5167,9 @@ dependencies = [

[[package]]
name = "sentry_protos"
version = "0.5.0"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4544fc955b4587df17166521e836334c337c994d839f07e906d2838274d0d90a"
checksum = "dcca2ce7135d2721bd635ff5a55ce8f3b124f854a643a1f8dfc82410b04487df"
dependencies = [
"prost 0.14.3",
"prost-types",
Expand Down
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,12 @@ sentry = { version = "0.41.0", default-features = false, features = [
"transport",
] }
sentry-core = "0.41.0"
sentry-kafka-schemas = { version = "2.1.14", default-features = false }
sentry-kafka-schemas = { version = "2.1.23", default-features = false }
sentry-release-parser = { version = "1.4.0", default-features = false, features = [
"semver-1",
] }
sentry-types = "0.41.0"
sentry_protos = "0.5.0"
sentry_protos = "0.7.0"
serde = { version = "=1.0.228", features = ["derive", "rc"] }
serde-transcode = "1.1.1"
serde-vars = "0.3.1"
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ dev = [
"pyyaml>=6.0.3",
"redis>=5.0.1",
"requests>=2.32.5",
"sentry-protos>=0.4.14",
"sentry-protos>=0.7.0",
"sentry-sdk>=2.50.0",
"types-protobuf>=6.30.2.20250703",
"types-pyyaml>=6.0.12.20241230",
Expand Down
17 changes: 13 additions & 4 deletions relay-base-schema/src/data_category.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::events::EventType;
/// An error that occurs if a number cannot be converted into a [`DataCategory`].
#[derive(Debug, PartialEq, thiserror::Error)]
#[error("Unknown numeric data category {0} can not be converted into a DataCategory.")]
pub struct UnknownDataCategory(pub u8);
pub struct UnknownDataCategory(pub u32);

/// Classifies the type of data that is being ingested.
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
Expand Down Expand Up @@ -420,11 +420,20 @@ impl TryFrom<u8> for DataCategory {
32 => Ok(Self::InstallableBuild),
33 => Ok(Self::TraceMetric),
34 => Ok(Self::SeerUser),
other => Err(UnknownDataCategory(other)),
other => Err(UnknownDataCategory(other as u32)),
}
}
}

impl TryFrom<u32> for DataCategory {
type Error = UnknownDataCategory;

fn try_from(value: u32) -> Result<Self, UnknownDataCategory> {
let value = u8::try_from(value).map_err(|_| UnknownDataCategory(value))?;
value.try_into()
}
}

/// The unit in which a data category is measured.
///
/// This enum specifies how quantities for different data categories are measured,
Expand Down Expand Up @@ -543,8 +552,8 @@ mod tests {
// If this test fails, update the numeric bounds so that the first assertion
// maps to the last variant in the enum and the second assertion produces an error
// that the DataCategory does not exist.
assert_eq!(DataCategory::try_from(34), Ok(DataCategory::SeerUser));
assert_eq!(DataCategory::try_from(35), Err(UnknownDataCategory(35)));
assert_eq!(DataCategory::try_from(34u8), Ok(DataCategory::SeerUser));
assert_eq!(DataCategory::try_from(35u8), Err(UnknownDataCategory(35)));
}

#[test]
Expand Down
10 changes: 10 additions & 0 deletions relay-dynamic-config/src/global.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,16 @@ pub struct Options {
)]
pub sessions_eap_rollout_rate: f32,

/// Rollout rate for accepted outcomes being emitted by EAP instead of Relay.
///
/// Rate needs to be between `0.0` and `1.0`.
#[serde(
rename = "relay.eap-outcomes.rollout-rate",
deserialize_with = "default_on_error",
skip_serializing_if = "is_default"
)]
pub eap_outcomes_rollout_rate: f32,

/// All other unknown options.
#[serde(flatten)]
other: HashMap<String, Value>,
Expand Down
26 changes: 26 additions & 0 deletions relay-server/src/managed/counted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,32 @@ impl Counted for SessionAggregateItem {
}
}

#[cfg(feature = "processing")]
impl Counted for sentry_protos::snuba::v1::Outcomes {
fn quantities(&self) -> Quantities {
self.category_count
.iter()
.inspect(|cc| {
debug_assert!(DataCategory::try_from(cc.data_category).is_ok());
debug_assert!(usize::try_from(cc.quantity).is_ok());
})
.filter_map(|cc| {
Some((
DataCategory::try_from(cc.data_category).ok()?,
usize::try_from(cc.quantity).ok()?,
))
})
.collect()
}
}

#[cfg(feature = "processing")]
impl Counted for sentry_protos::snuba::v1::TraceItem {
fn quantities(&self) -> Quantities {
self.outcomes.quantities()
}
}

impl<T> Counted for &T
where
T: Counted,
Expand Down
10 changes: 5 additions & 5 deletions relay-server/src/processing/logs/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ use uuid::Uuid;

use crate::envelope::WithHeader;
use crate::processing::logs::{Error, Result};
use crate::processing::utils::store::{extract_meta_attributes, proto_timestamp, uuid_to_item_id};
use crate::processing::utils::store::{
extract_meta_attributes, proto_timestamp, quantities_to_trace_item_outcomes, uuid_to_item_id,
};
use crate::processing::{self, Counted, Retention};
use crate::services::outcome::DiscardReason;
use crate::services::store::StoreTraceItem;
Expand Down Expand Up @@ -74,12 +76,10 @@ pub fn convert(log: WithHeader<OurLog>, ctx: &Context) -> Result<StoreTraceItem>
attributes: attributes(meta, attrs, fields),
client_sample_rate: 1.0,
server_sample_rate: 1.0,
outcomes: Some(quantities_to_trace_item_outcomes(quantities, ctx.scoping)),
};

Ok(StoreTraceItem {
trace_item,
quantities,
})
Ok(StoreTraceItem { trace_item })
}

/// Fields on the log message which are stored as fields.
Expand Down
10 changes: 7 additions & 3 deletions relay-server/src/processing/trace_attachments/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ use relay_protocol::{Annotated, IntoValue, Value};
use relay_quotas::Scoping;
use sentry_protos::snuba::v1::{AnyValue, TraceItem, TraceItemType, any_value};

use crate::managed::{Managed, Rejected};
use crate::managed::{Counted, Managed, Quantities, Rejected};
use crate::processing::Retention;
use crate::processing::trace_attachments::types::ExpandedAttachment;
use crate::processing::utils::store::{
AttributeMeta, extract_client_sample_rate, extract_meta_attributes, proto_timestamp,
uuid_to_item_id,
quantities_to_trace_item_outcomes, uuid_to_item_id,
};
use crate::services::outcome::{DiscardReason, Outcome};
use crate::services::upload::StoreAttachment;
Expand All @@ -25,6 +25,8 @@ pub fn convert(
let scoping = attachment.scoping();
let received_at = attachment.received_at();
attachment.try_map(|attachment, _record_keeper| {
let quantities = attachment.quantities();

let ExpandedAttachment {
parent_id,
meta,
Expand All @@ -37,7 +39,7 @@ pub fn convert(
retention,
server_sample_rate,
};
let trace_item = attachment_to_trace_item(meta, ctx)
let trace_item = attachment_to_trace_item(meta, quantities, ctx)
.ok_or(Outcome::Invalid(DiscardReason::InvalidTraceAttachment))?;

Ok::<_, Outcome>(StoreAttachment { trace_item, body })
Expand All @@ -62,6 +64,7 @@ struct Context {

fn attachment_to_trace_item(
meta: Annotated<TraceAttachmentMeta>,
quantities: Quantities,
ctx: Context,
) -> Option<TraceItem> {
let meta = meta.into_value()?;
Expand Down Expand Up @@ -99,6 +102,7 @@ fn attachment_to_trace_item(
retention_days: ctx.retention.standard as u32,
received: Some(proto_timestamp(ctx.received_at)),
downsampled_retention_days: ctx.retention.downsampled as u32,
outcomes: Some(quantities_to_trace_item_outcomes(quantities, ctx.scoping)),
};
Some(trace_item)
}
Expand Down
9 changes: 4 additions & 5 deletions relay-server/src/processing/trace_metrics/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ use uuid::Uuid;
use crate::envelope::WithHeader;
use crate::processing::trace_metrics::{Error, Result};
use crate::processing::utils::store::{
extract_client_sample_rate, extract_meta_attributes, uuid_to_item_id,
extract_client_sample_rate, extract_meta_attributes, quantities_to_trace_item_outcomes,
uuid_to_item_id,
};
use crate::processing::{self, Counted, Retention};
use crate::services::outcome::DiscardReason;
Expand Down Expand Up @@ -75,12 +76,10 @@ pub fn convert(metric: WithHeader<TraceMetric>, ctx: &Context) -> Result<StoreTr
attributes: attributes(meta, attrs, fields),
client_sample_rate,
server_sample_rate: 1.0,
outcomes: Some(quantities_to_trace_item_outcomes(quantities, ctx.scoping)),
};

Ok(StoreTraceItem {
trace_item,
quantities,
})
Ok(StoreTraceItem { trace_item })
}

fn ts(dt: DateTime<Utc>) -> Timestamp {
Expand Down
21 changes: 20 additions & 1 deletion relay-server/src/processing/utils/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@ use relay_conventions::CLIENT_SAMPLE_RATE;
use relay_event_schema::protocol::Attributes;
use relay_protocol::{Annotated, IntoValue, MetaTree, Value};

use sentry_protos::snuba::v1::{AnyValue, ArrayValue, any_value};
use relay_quotas::Scoping;
use sentry_protos::snuba::v1::{AnyValue, ArrayValue, CategoryCount, Outcomes, any_value};
use serde::Serialize;
use uuid::Uuid;

use crate::managed::Quantities;

/// Represents metadata extracted from Relay's annotated model.
///
/// This struct holds metadata about processing errors, transformations, and other
Expand Down Expand Up @@ -221,3 +224,19 @@ pub fn item_id_to_uuid(item_id: &[u8]) -> Result<Uuid, TryFromSliceError> {
let item_id = u128::from_le_bytes(item_id);
Ok(Uuid::from_u128(item_id))
}

/// Converts [`Quantities`] and [`Scoping`] into Trace Item [`Outcomes`].
pub fn quantities_to_trace_item_outcomes(q: Quantities, scoping: Scoping) -> Outcomes {
let category_count = q
.into_iter()
.map(|(category, quantity)| CategoryCount {
data_category: category as u32,
quantity: quantity as u64,
})
.collect();

Outcomes {
category_count,
key_id: scoping.key_id.unwrap_or(0),
}
}
30 changes: 19 additions & 11 deletions relay-server/src/services/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,16 +111,11 @@ pub struct StoreMetrics {
pub struct StoreTraceItem {
/// The final trace item which will be produced to Kafka.
pub trace_item: TraceItem,
/// Outcomes to be emitted when successfully producing the item to Kafka.
///
/// Note: this is only a temporary measure, long term these outcomes will be part of the trace
/// item and emitted by Snuba to guarantee a delivery to storage.
pub quantities: Quantities,
}

impl Counted for StoreTraceItem {
fn quantities(&self) -> Quantities {
self.quantities.clone()
self.trace_item.quantities()
}
}

Expand Down Expand Up @@ -631,18 +626,31 @@ impl StoreService {
let scoping = message.scoping();
let received_at = message.received_at();

let quantities = message.try_accept(|item| {
let eap_emits_outcomes = utils::is_rolled_out(
scoping.organization_id.value(),
self.global_config
.current()
.options
.eap_outcomes_rollout_rate,
)
.is_keep();

let outcomes = message.try_accept(|mut item| {
let outcomes = match eap_emits_outcomes {
true => None,
false => item.trace_item.outcomes.take(),
};

let message = KafkaMessage::for_item(scoping, item.trace_item);
self.produce(KafkaTopic::Items, message)
.map(|()| item.quantities)
self.produce(KafkaTopic::Items, message).map(|()| outcomes)
});

// Accepted outcomes when items have been successfully produced to rdkafka.
//
// This is only a temporary measure, long term these outcomes will be part of the trace
// item and emitted by Snuba to guarantee a delivery to storage.
if let Ok(quantities) = quantities {
for (category, quantity) in quantities {
if let Ok(Some(outcomes)) = outcomes {
for (category, quantity) in outcomes.quantities() {
self.outcome_aggregator.send(TrackOutcome {
category,
event_id: None,
Expand Down
1 change: 1 addition & 0 deletions relay-server/src/services/store/sessions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ pub fn to_trace_item(scoping: Scoping, bucket: Bucket, retention: u16) -> Option
attributes,
client_sample_rate: 1.0,
server_sample_rate: 1.0,
outcomes: None,
})
}

Expand Down
13 changes: 2 additions & 11 deletions relay-server/src/services/upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,10 @@ use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use objectstore_client::{Client, ExpirationPolicy, PutBuilder, Session, Usecase};
use relay_config::UploadServiceConfig;
use relay_quotas::DataCategory;
use relay_system::{
Addr, AsyncResponse, FromMessage, Interface, NoResponse, Receiver, Sender, Service,
};
use sentry_protos::snuba::v1::TraceItem;
use smallvec::smallvec;
use uuid::Uuid;

use crate::constants::DEFAULT_ATTACHMENT_RETENTION;
Expand Down Expand Up @@ -100,10 +98,7 @@ pub struct StoreAttachment {

impl Counted for StoreAttachment {
fn quantities(&self) -> Quantities {
smallvec![
(DataCategory::AttachmentItem, 1),
(DataCategory::Attachment, self.body.len()),
]
self.trace_item.quantities()
}
}

Expand Down Expand Up @@ -357,7 +352,6 @@ impl UploadServiceInner {
.map_err(Error::UploadFailed)
.reject(&managed)?;

let quantities = managed.quantities();
let body = Bytes::clone(&managed.body);

// Make sure that the attachment can be converted into a trace item:
Expand All @@ -366,10 +360,7 @@ impl UploadServiceInner {
trace_item,
body: _,
} = attachment;
Ok::<_, Error>(StoreTraceItem {
trace_item,
quantities,
})
Ok::<_, Error>(StoreTraceItem { trace_item })
})?;

// Upload the attachment:
Expand Down
Loading
Loading