Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 26 additions & 17 deletions crates/mirror-bin/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use mirror_core::{run_mirror, MetricLabels, MIRROR_LABELS};
use mirror_fs::{FilesystemSink, FilesystemSinkConfig};
use mirror_kafka::{KafkaSink, KafkaSinkConfig, KafkaSource, KafkaSourceConfig};
use mirror_s3::{S3Sink, S3SinkConfig};
use tracing::Instrument;
use tracing_subscriber::EnvFilter;

#[derive(Parser)]
Expand Down Expand Up @@ -687,24 +688,32 @@ async fn spawn_mirror(
.map_err(|e| anyhow::anyhow!("opening tee for mirror {name}: {e}"))?;

let destinations_log = dest_descriptions.join(",");
Ok(tokio::spawn(async move {
tracing::info!(
mirror = %name,
destinations = %destinations_log,
compaction,
"loop start"
);
let result = MIRROR_LABELS
.scope(
labels,
run_mirror(source, tee, shutdown_signal(shutdown_rx)),
)
.await;
match result {
Ok(()) => Ok(()),
Err(e) => Err(anyhow::anyhow!("mirror {name}: {e}")),
// Single span carries `mirror = <name>` onto every event emitted
// from the spawned task — including the mirror-core logs
// (`starting mirror`, `heartbeat`, etc.) that don't otherwise have
// access to the operator-chosen mirror name. MIRROR_LABELS still
// carries topic+partition for metric labeling separately.
let span = tracing::info_span!("mirror", name = %name);
Ok(tokio::spawn(
async move {
tracing::info!(
destinations = %destinations_log,
compaction,
"loop start"
);
let result = MIRROR_LABELS
.scope(
labels,
run_mirror(source, tee, shutdown_signal(shutdown_rx)),
)
.await;
match result {
Ok(()) => Ok(()),
Err(e) => Err(anyhow::anyhow!("mirror {name}: {e}")),
}
}
}))
.instrument(span),
))
}

async fn open_inner_sink(
Expand Down
165 changes: 140 additions & 25 deletions crates/mirror-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,40 @@ pub struct Header {
pub value: Option<Vec<u8>>,
}

/// Which buffer trigger caused a blob sink (FS / S3) to flush. Used
/// only as a label on the `flushed batch` log line so operators can
/// tell why a given snapshot was emitted without grepping for the
/// matching threshold in the config.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FlushTrigger {
/// `flush.max-offsets` reached: the buffer accumulated as many
/// records as the config allows.
MaxOffsets,
/// `flush.max-bytes` reached: the buffered payload size hit the
/// configured byte cap.
MaxBytes,
/// `flush.max-time-ms` elapsed since the first record landed in
/// the buffer.
MaxTime,
/// `flush.daily.at-utc` wall-clock boundary crossed.
Daily,
/// Explicit `Sink::flush` (graceful shutdown, end-of-test, or any
/// other operator-driven flush).
Explicit,
}

impl FlushTrigger {
pub fn as_str(self) -> &'static str {
match self {
FlushTrigger::MaxOffsets => "max-offsets",
FlushTrigger::MaxBytes => "max-bytes",
FlushTrigger::MaxTime => "max-time",
FlushTrigger::Daily => "daily",
FlushTrigger::Explicit => "explicit",
}
}
}

/// Topic-schema declaration for a record column (`key` or `value`).
///
/// This is the runtime representation shared by all sinks. The
Expand Down Expand Up @@ -225,24 +259,35 @@ pub trait Sink: Send {
false
}

/// Called by the run loop after the bootstrap branch has decided
/// to skip from this sink's `next_expected_offset()` to the
/// source's `low_watermark` — i.e. when [`Self::allows_compacted_source`]
/// returned true and the source has been compacted past the sink's
/// current durable position. The sink must advance its internal
/// "next expected offset" to `low_watermark` so that:
/// 1. the next [`Self::next_expected_offset`] call returns
/// `low_watermark` (idle-drift check stays consistent);
/// 2. [`Self::write`] accepts the next incoming record at
/// `low_watermark` (the per-record gate stays consistent);
/// 3. blob/file naming reflects `low_watermark` as the new
/// Called by the run loop to advance this sink's internal
/// "next expected offset" to a higher value. The sink must update
/// state so that:
/// 1. the next [`Self::next_expected_offset`] call returns the
/// new value (idle-drift check stays consistent);
/// 2. [`Self::write`] accepts the next incoming record at the
/// new value (the per-record gate stays consistent);
/// 3. blob/file naming reflects the new value as the new
/// starting offset for the snapshot range.
///
/// Only ever called when both invariants hold:
/// `allows_compacted_source() == true` and the loop's first call
/// to `next_expected_offset()` returned a value strictly less than
/// `low_watermark`. Default impl is a no-op (sinks that don't
/// override `allows_compacted_source` never see this call).
/// Two call sites today, both guarded by
/// `allows_compacted_source() == true`:
///
/// - **Bootstrap pre-align.** The run loop's bootstrap branch
/// calls this with the source's `low_watermark` when the
/// source has been compacted/trimmed past the sink's current
/// durable position (`sink.next_expected_offset() < low_watermark`).
/// The argument is therefore the broker's reported low
/// watermark.
/// - **First-delivery alignment.** Inside the run loop, when the
/// broker delivers an offset *above* `expected` (the
/// `cleanup.policy=compact` case where `LogStartOffset = 0`
/// masks the actual deliverable start), this is called with
/// that delivered offset before the record is written.
///
/// Either way the argument is "the new authoritative start offset",
/// not specifically a watermark. Default impl is a no-op (sinks
/// that don't override `allows_compacted_source` never see this
/// call).
async fn align_to_source_low_watermark(&mut self, low_watermark: u64) -> Result<(), SinkError> {
let _ = low_watermark;
Ok(())
Expand All @@ -269,11 +314,23 @@ pub enum MirrorError {
Source(#[from] SourceError),
#[error(transparent)]
Sink(#[from] SinkError),
/// Source delivered a record whose offset does not match what we
/// asked for. Indicates a Kafka client bug, a producer that skipped
/// offsets (impossible in normal Kafka), or a logic error.
#[error("source delivered offset {actual}, expected {expected}")]
SourceOffsetMismatch { expected: u64, actual: u64 },
/// Source delivered an offset *below* `expected`. Always a hard
/// error: a Kafka client bug, a producer that rewound, or a
/// destination chain that has somehow advanced past the broker.
#[error("source delivered offset {got}, expected at least {expected} (went backwards)")]
SourceWentBackwards { expected: u64, got: u64 },
/// Source delivered an offset *above* `expected`. Hard error in
/// append mode (would leave a gap in the destination chain).
/// Recoverable under `compaction: log`: the run loop aligns the
/// sink to the delivered offset and continues — the broker's
/// `LogStartOffset` reports 0 for a `cleanup.policy=compact`
/// topic even when the earliest deliverable record is much later
/// (compaction deduplicates by key but does not advance the
/// segment start), so the bootstrap pre-align can only be a hint
/// and the first-delivery offset is the authoritative starting
/// point.
#[error("source delivered offset {got}, expected {expected} (gap above expected)")]
SourceGapAboveExpected { expected: u64, got: u64 },
/// Sink's view of next-expected-offset diverged from what we
/// believed while we were idle. Indicates an out-of-band write or
/// a topic reset.
Expand Down Expand Up @@ -436,7 +493,15 @@ where
}
} => {
let progressed = expected - last_heartbeat_offset;
tracing::info!(
// Heartbeat fires per clock interval, not per record
// batch. SRE-facing liveness is the
// `mirror_v3_destination_offset_verified` gauge and
// the existing `flushed batch` line; the heartbeat is
// primarily diagnostic for the "no records arriving"
// case. DEBUG keeps it discoverable via
// `RUST_LOG=mirror_core=debug` without taking a slot
// in default operator logs.
tracing::debug!(
expected_offset = expected,
progressed,
"heartbeat"
Expand All @@ -446,12 +511,62 @@ where
poll_result = source.poll_one() => {
match poll_result? {
Some(record) => {
if record.source_offset != expected {
return Err(MirrorError::SourceOffsetMismatch {
if record.source_offset < expected {
// Always a hard error: cannot rewrite a
// record we've already committed to the
// destination chain.
return Err(MirrorError::SourceWentBackwards {
expected,
actual: record.source_offset,
got: record.source_offset,
});
}
if record.source_offset > expected {
if sink.allows_compacted_source() {
// `cleanup.policy=compact` leaves
// `LogStartOffset` at 0 even when the
// earliest deliverable record is much
// later; the bootstrap pre-align (which
// uses `low_watermark`) misses this
// case and gaps also surface mid-stream
// every time the broker dropped a
// superseded record. The sink's `write`
// accepts forward gaps under
// `compaction:log` so we only bump the
// local `expected` tracker here. The
// bootstrap-time `align_to_source_low_watermark`
// is still called (with an empty
// buffer) so the first snapshot file's
// `from` reflects the broker's low
// watermark when that path applies.
//
// Not logged per-record: a compacted
// topic can have a gap on every
// delivered record (one per surviving
// key after upstream dedup), so any
// log level here scales with millions
// of lines per restart. Observability
// for gap rate is the dedicated
// counter below — plot a rate or
// alert on a threshold rather than
// reading logs. The startup `loop
// start … compaction="log"` INFO
// line is the one-shot "expect gaps
// here" signal.
let (topic_l, partition_l) = current_labels();
metrics::counter!(
"mirror_v3_source_offset_gap_records_total",
"topic" => topic_l,
"partition" => partition_l,
)
.increment(1);
expected = record.source_offset;
} else {
return Err(MirrorError::SourceGapAboveExpected {
expected,
got: record.source_offset,
});
}
}
sink.write(record).await?;
expected = expected
.checked_add(1)
Expand Down
Loading