Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 67 additions & 8 deletions bins/nittei/src/telemetry.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,24 @@
use opentelemetry::{global, propagation::TextMapCompositePropagator, trace::TracerProvider};
use opentelemetry::{
global,
propagation::TextMapCompositePropagator,
trace::{TraceContextExt, TracerProvider},
};
use opentelemetry_datadog::{ApiVersion, DatadogPipelineBuilder, DatadogPropagator};
use opentelemetry_otlp::{WithExportConfig, WithHttpConfig};
use opentelemetry_sdk::{
Resource,
propagation::TraceContextPropagator,
trace::{self, RandomIdGenerator, Sampler, SdkTracerProvider},
};
use tracing::warn;
use tracing_subscriber::{EnvFilter, Registry, layer::SubscriberExt};
use tracing::{Event, Subscriber, warn};
use tracing_subscriber::{
EnvFilter,
Registry,
field::VisitOutput,
fmt::{FmtContext, FormatEvent, format::Writer},
layer::SubscriberExt,
registry::LookupSpan,
};

/// Register a subscriber as global default to process span data.
///
Expand Down Expand Up @@ -57,11 +68,7 @@ pub fn init_subscriber() -> anyhow::Result<()> {
if let Some(telemetry_layer) = telemetry_layer {
let subscriber = Registry::default()
.with(env_filter)
.with(
tracing_subscriber::fmt::layer()
.json()
.with_current_span(false),
)
.with(tracing_subscriber::fmt::layer().event_format(DatadogFormatter))
.with(telemetry_layer);

// Set the global subscriber
Expand Down Expand Up @@ -191,3 +198,55 @@ fn get_sampler() -> Sampler {
// (2) if no parent, then the trace id ratio
Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased(ratio_to_sample)))
}

/// Empty struct to implement the FormatEvent trait
struct DatadogFormatter;

/// Format the event to be compatible with Datadog
/// This allows to correlate the logs with the traces
impl<S, N> FormatEvent<S, N> for DatadogFormatter
where
S: Subscriber + for<'a> LookupSpan<'a>,
N: for<'a> tracing_subscriber::fmt::FormatFields<'a> + 'static,
{
fn format_event(
&self,
ctx: &FmtContext<'_, S, N>,
mut writer: Writer<'_>,
event: &Event<'_>,
) -> std::fmt::Result {
// Start the JSON object
write!(writer, "{{")?;

// Add trace and span IDs if available
if let Some(span) = ctx.lookup_current() {
let extensions = span.extensions();
let otel_ctx = extensions.get::<opentelemetry::Context>();
if let Some(ctx) = otel_ctx {
if let Some(span) = ctx
.span()
.span_context()
.is_valid()
.then(|| ctx.span().span_context().clone())
{
write!(
writer,
"\"dd.trace_id\":\"{}\",\"dd.span_id\":\"{}\",",
span.trace_id(),
span.span_id()
)?;
}
}
}

// Format the event fields
let mut visitor = tracing_subscriber::fmt::format::JsonVisitor::new(&mut writer);
event.record(&mut visitor);
visitor.finish()?;

// Close the JSON object
write!(writer, "}}")?;

Ok(())
}
}