diff --git a/crates/mirror-bin/src/main.rs b/crates/mirror-bin/src/main.rs index 0a369eb..ed1224b 100644 --- a/crates/mirror-bin/src/main.rs +++ b/crates/mirror-bin/src/main.rs @@ -9,6 +9,7 @@ use mirror_core::{run_mirror, MetricLabels, MIRROR_LABELS}; use mirror_fs::{FilesystemSink, FilesystemSinkConfig}; use mirror_kafka::{KafkaSink, KafkaSinkConfig, KafkaSource, KafkaSourceConfig}; use mirror_s3::{S3Sink, S3SinkConfig}; +use tracing::Instrument; use tracing_subscriber::EnvFilter; #[derive(Parser)] @@ -687,24 +688,32 @@ async fn spawn_mirror( .map_err(|e| anyhow::anyhow!("opening tee for mirror {name}: {e}"))?; let destinations_log = dest_descriptions.join(","); - Ok(tokio::spawn(async move { - tracing::info!( - mirror = %name, - destinations = %destinations_log, - compaction, - "loop start" - ); - let result = MIRROR_LABELS - .scope( - labels, - run_mirror(source, tee, shutdown_signal(shutdown_rx)), - ) - .await; - match result { - Ok(()) => Ok(()), - Err(e) => Err(anyhow::anyhow!("mirror {name}: {e}")), + // Single `mirror` span carries the `name` field onto every event emitted + // from the spawned task — including the mirror-core logs + // (`starting mirror`, `heartbeat`, etc.) that don't otherwise have + // access to the operator-chosen mirror name. MIRROR_LABELS still + // carries topic+partition for metric labeling separately. 
+ let span = tracing::info_span!("mirror", name = %name); + Ok(tokio::spawn( + async move { + tracing::info!( + destinations = %destinations_log, + compaction, + "loop start" + ); + let result = MIRROR_LABELS + .scope( + labels, + run_mirror(source, tee, shutdown_signal(shutdown_rx)), + ) + .await; + match result { + Ok(()) => Ok(()), + Err(e) => Err(anyhow::anyhow!("mirror {name}: {e}")), + } } - })) + .instrument(span), + )) } async fn open_inner_sink( diff --git a/crates/mirror-core/src/lib.rs b/crates/mirror-core/src/lib.rs index fb09baf..e956cab 100644 --- a/crates/mirror-core/src/lib.rs +++ b/crates/mirror-core/src/lib.rs @@ -106,6 +106,40 @@ pub struct Header { pub value: Option>, } +/// Which buffer trigger caused a blob sink (FS / S3) to flush. Used +/// only as a label on the `flushed batch` log line so operators can +/// tell why a given snapshot was emitted without grepping for the +/// matching threshold in the config. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum FlushTrigger { + /// `flush.max-offsets` reached: the buffer accumulated as many + /// records as the config allows. + MaxOffsets, + /// `flush.max-bytes` reached: the buffered payload size hit the + /// configured byte cap. + MaxBytes, + /// `flush.max-time-ms` elapsed since the first record landed in + /// the buffer. + MaxTime, + /// `flush.daily.at-utc` wall-clock boundary crossed. + Daily, + /// Explicit `Sink::flush` (graceful shutdown, end-of-test, or any + /// other operator-driven flush). + Explicit, +} + +impl FlushTrigger { + pub fn as_str(self) -> &'static str { + match self { + FlushTrigger::MaxOffsets => "max-offsets", + FlushTrigger::MaxBytes => "max-bytes", + FlushTrigger::MaxTime => "max-time", + FlushTrigger::Daily => "daily", + FlushTrigger::Explicit => "explicit", + } + } +} + /// Topic-schema declaration for a record column (`key` or `value`). /// /// This is the runtime representation shared by all sinks. 
The @@ -225,24 +259,35 @@ pub trait Sink: Send { false } - /// Called by the run loop after the bootstrap branch has decided - /// to skip from this sink's `next_expected_offset()` to the - /// source's `low_watermark` — i.e. when [`Self::allows_compacted_source`] - /// returned true and the source has been compacted past the sink's - /// current durable position. The sink must advance its internal - /// "next expected offset" to `low_watermark` so that: - /// 1. the next [`Self::next_expected_offset`] call returns - /// `low_watermark` (idle-drift check stays consistent); - /// 2. [`Self::write`] accepts the next incoming record at - /// `low_watermark` (the per-record gate stays consistent); - /// 3. blob/file naming reflects `low_watermark` as the new + /// Called by the run loop to advance this sink's internal + /// "next expected offset" to a higher value. The sink must update + /// state so that: + /// 1. the next [`Self::next_expected_offset`] call returns the + /// new value (idle-drift check stays consistent); + /// 2. [`Self::write`] accepts the next incoming record at the + /// new value (the per-record gate stays consistent); + /// 3. blob/file naming reflects the new value as the new /// starting offset for the snapshot range. /// - /// Only ever called when both invariants hold: - /// `allows_compacted_source() == true` and the loop's first call - /// to `next_expected_offset()` returned a value strictly less than - /// `low_watermark`. Default impl is a no-op (sinks that don't - /// override `allows_compacted_source` never see this call). + /// Where this is (and is not) called today; every call is guarded by + /// `allows_compacted_source() == true`: + /// + /// - **Bootstrap pre-align.** The run loop's bootstrap branch + /// calls this with the source's `low_watermark` when the + /// source has been compacted/trimmed past the sink's current + /// durable position (`sink.next_expected_offset() < low_watermark`). 
+ /// The argument is therefore the broker's reported low + /// watermark. + /// - **Not used for delivery gaps.** Inside the run loop, when the + /// broker delivers an offset *above* `expected` (the + /// `cleanup.policy=compact` case where `LogStartOffset = 0` + /// masks the actual deliverable start), this is *not* called: + /// the loop only bumps `expected` and `write` accepts the gap. + /// + /// The argument is always "the new authoritative start offset" + /// for the sink. Default impl is a no-op (sinks + /// that don't override `allows_compacted_source` never see this + /// call). async fn align_to_source_low_watermark(&mut self, low_watermark: u64) -> Result<(), SinkError> { let _ = low_watermark; Ok(()) @@ -269,11 +314,23 @@ pub enum MirrorError { Source(#[from] SourceError), #[error(transparent)] Sink(#[from] SinkError), - /// Source delivered a record whose offset does not match what we - /// asked for. Indicates a Kafka client bug, a producer that skipped - /// offsets (impossible in normal Kafka), or a logic error. - #[error("source delivered offset {actual}, expected {expected}")] - SourceOffsetMismatch { expected: u64, actual: u64 }, + /// Source delivered an offset *below* `expected`. Always a hard + /// error: a Kafka client bug, a producer that rewound, or a + /// destination chain that has somehow advanced past the broker. + #[error("source delivered offset {got}, expected at least {expected} (went backwards)")] + SourceWentBackwards { expected: u64, got: u64 }, + /// Source delivered an offset *above* `expected`. Hard error in + /// append mode (would leave a gap in the destination chain). 
+ /// Recoverable under `compaction: log`: the run loop advances its + /// `expected` tracker to the delivered offset and continues — the broker's + /// `LogStartOffset` reports 0 for a `cleanup.policy=compact` + /// topic even when the earliest deliverable record is much later + /// (compaction deduplicates by key but does not advance the + /// segment start), so the bootstrap pre-align can only be a hint + /// and the first-delivery offset is the authoritative starting + /// point. + #[error("source delivered offset {got}, expected {expected} (gap above expected)")] + SourceGapAboveExpected { expected: u64, got: u64 }, /// Sink's view of next-expected-offset diverged from what we /// believed while we were idle. Indicates an out-of-band write or /// a topic reset. @@ -436,7 +493,15 @@ where } } => { let progressed = expected - last_heartbeat_offset; - tracing::info!( + // Heartbeat fires per clock interval, not per record + // batch. SRE-facing liveness is the + // `mirror_v3_destination_offset_verified` gauge and + // the existing `flushed batch` line; the heartbeat is + // primarily diagnostic for the "no records arriving" + // case. DEBUG keeps it discoverable via + // `RUST_LOG=mirror_core=debug` without taking a slot + // in default operator logs. + tracing::debug!( expected_offset = expected, progressed, "heartbeat" @@ -446,12 +511,62 @@ where poll_result = source.poll_one() => { match poll_result? { Some(record) => { - if record.source_offset != expected { - return Err(MirrorError::SourceOffsetMismatch { + if record.source_offset < expected { + // Always a hard error: cannot rewrite a + // record we've already committed to the + // destination chain. 
+ return Err(MirrorError::SourceWentBackwards { expected, - actual: record.source_offset, + got: record.source_offset, }); } + if record.source_offset > expected { + if sink.allows_compacted_source() { + // `cleanup.policy=compact` leaves + // `LogStartOffset` at 0 even when the + // earliest deliverable record is much + // later; the bootstrap pre-align (which + // uses `low_watermark`) misses this + // case and gaps also surface mid-stream + // every time the broker dropped a + // superseded record. The sink's `write` + // accepts forward gaps under + // `compaction:log` so we only bump the + // local `expected` tracker here. The + // bootstrap-time `align_to_source_low_watermark` + // is still called (with an empty + // buffer) so the first snapshot file's + // `from` reflects the broker's low + // watermark when that path applies. + // + // Not logged per-record: a compacted + // topic can have a gap on every + // delivered record (one per surviving + // key after upstream dedup), so any + // log level here scales with millions + // of lines per restart. Observability + // for gap rate is the dedicated + // counter below — plot a rate or + // alert on a threshold rather than + // reading logs. The startup `loop + // start … compaction="log"` INFO + // line is the one-shot "expect gaps + // here" signal. 
+ let (topic_l, partition_l) = current_labels(); + metrics::counter!( + "mirror_v3_source_offset_gap_records_total", + "topic" => topic_l, + "partition" => partition_l, + ) + .increment(1); + expected = record.source_offset; + } else { + return Err(MirrorError::SourceGapAboveExpected { + expected, + got: record.source_offset, + }); + } + } sink.write(record).await?; expected = expected .checked_add(1) diff --git a/crates/mirror-core/tests/loop_invariants.rs b/crates/mirror-core/tests/loop_invariants.rs index 65a41b1..2ce9d8a 100644 --- a/crates/mirror-core/tests/loop_invariants.rs +++ b/crates/mirror-core/tests/loop_invariants.rs @@ -63,8 +63,9 @@ fn processes_records_in_order() { } #[test] -fn errors_on_source_offset_gap() { - // Source skips from 10 directly to 12 — must be rejected. +fn errors_on_source_offset_gap_in_append_mode() { + // Source skips from 10 directly to 12 — must be rejected in + // append mode (sink does NOT allow_compacted_source). let source = MockSource::new([ MockSourceEvent::Record(rec(10)), MockSourceEvent::Record(rec(12)), @@ -73,14 +74,102 @@ fn errors_on_source_offset_gap() { let result = drive(run_mirror(source, sink, never())); match result { - Err(MirrorError::SourceOffsetMismatch { expected, actual }) => { + Err(MirrorError::SourceGapAboveExpected { expected, got }) => { assert_eq!(expected, 11); - assert_eq!(actual, 12); + assert_eq!(got, 12); } - other => panic!("expected SourceOffsetMismatch, got {other:?}"), + other => panic!("expected SourceGapAboveExpected, got {other:?}"), } } +#[test] +fn errors_on_source_going_backwards() { + // Source delivers 10 then 9. Always a hard error, in any + // compaction mode — destination chain can't un-commit. 
+ let source = MockSource::new([ + MockSourceEvent::Record(rec(10)), + MockSourceEvent::Record(rec(9)), + ]); + let sink = MockSink::starting_at(10).with_allows_compacted_source(true); + + let result = drive(run_mirror(source, sink, never())); + match result { + Err(MirrorError::SourceWentBackwards { expected, got }) => { + assert_eq!(expected, 11); + assert_eq!(got, 9); + } + other => panic!("expected SourceWentBackwards, got {other:?}"), + } +} + +#[test] +fn compaction_log_accepts_gap_from_compact_only_topic() { + // The scenario the upstream c2e64c11 bootstrap branch misses: + // `cleanup.policy=compact` leaves LogStartOffset = 0 so the + // broker reports low_watermark = 0, the bootstrap pre-align is + // a no-op, the loop seeks(0), and then the broker delivers a + // record at offset 461 (the earliest deliverable record after + // key dedup). Under compaction:log, the gap is legitimate — the + // loop must align the sink to the delivered offset and accept + // the record. + let source = MockSource::new([ + MockSourceEvent::Record(rec(461)), + MockSourceEvent::Error("stop".into()), + ]) + .with_low_watermark(0); // broker reports 0 for compact-only topic + let sink = MockSink::starting_at(0).with_allows_compacted_source(true); + let inspector = WriteInspector::wrap(sink); + let handle = inspector.handle(); + + let result = drive(run_mirror(source, handle, never())); + assert!( + matches!(result, Err(MirrorError::Source(_))), + "loop should exit on the source error after the aligned write, got: {result:?}" + ); + let writes = inspector.into_writes(); + assert_eq!( + writes.iter().map(|r| r.source_offset).collect::>(), + vec![461], + "the delivered record at offset 461 must be accepted under compaction:log \ + even though the bootstrap branch didn't pre-align (low_watermark was 0)" + ); +} + +#[test] +fn compaction_log_accepts_repeated_gaps_mid_stream() { + // Production repro: after the first aligned write at offset 461, + // the broker delivers offset 466 (gap 
of 4 — keys 462..465 were + // dominated by later records and dropped by compaction). The buffer + // is no longer empty so the old code's mid-stream + // `align_to_source_low_watermark` call tripped the sink's + // empty-buffer invariant. The fix moves gap acceptance into the + // sink's `write` (loosened under compaction:log); the run loop + // just bumps `expected` and writes. + let source = MockSource::new([ + MockSourceEvent::Record(rec(461)), + MockSourceEvent::Record(rec(466)), + MockSourceEvent::Record(rec(470)), + MockSourceEvent::Error("stop".into()), + ]) + .with_low_watermark(0); + let sink = MockSink::starting_at(0).with_allows_compacted_source(true); + let inspector = WriteInspector::wrap(sink); + let handle = inspector.handle(); + + let result = drive(run_mirror(source, handle, never())); + assert!( + matches!(result, Err(MirrorError::Source(_))), + "loop should exit on the source error after the gapped writes, got: {result:?}" + ); + let writes = inspector.into_writes(); + assert_eq!( + writes.iter().map(|r| r.source_offset).collect::>(), + vec![461, 466, 470], + "all delivered records must be accepted under compaction:log; \ + gaps between them are legitimate upstream compaction" + ); +} + #[test] fn errors_on_destination_drift_during_idle() { // After processing offset 10, an idle poll reveals the destination @@ -150,10 +239,14 @@ fn empty_destination_starts_at_zero_and_processes_first_record() { #[test] fn compacted_source_with_compaction_log_advances_to_low_watermark() { // Sink is empty (start = 0) and reports it tolerates a compacted - // source. The broker has trimmed offsets 0..460, so the earliest - // available offset is 461. The loop must seek to 461 and accept - // record 461 as the first record (rather than tripping - // SourceOffsetMismatch on "expected 0, got 461"). + // source. The broker has trimmed offsets 0..460 (delete-records + // case — low_watermark reports the advanced value), so the + // earliest available offset is 461. 
The loop's bootstrap branch + // must seek to 461 and accept record 461 as the first record + // (rather than tripping SourceGapAboveExpected on "expected 0, got + // 461"). Cf. `compaction_log_accepts_gap_from_compact_only_topic` + // which covers the cleanup.policy=compact case where low_watermark + // stays 0 and the alignment happens at first delivery instead. let source = MockSource::new([ MockSourceEvent::Record(rec(461)), MockSourceEvent::Error("stop".into()), @@ -281,6 +374,7 @@ use std::sync::{Arc, Mutex}; struct WriteInspector { writes: Arc>>, position: Arc>, + allows_compacted_source: bool, } impl WriteInspector { @@ -288,12 +382,14 @@ impl WriteInspector { Self { writes: Arc::new(Mutex::new(sink.writes)), position: Arc::new(Mutex::new(sink.running_position)), + allows_compacted_source: sink.allows_compacted_source, } } fn handle(&self) -> InspectorSink { InspectorSink { writes: Arc::clone(&self.writes), position: Arc::clone(&self.position), + allows_compacted_source: self.allows_compacted_source, } } fn into_writes(self) -> Vec { @@ -307,6 +403,7 @@ impl WriteInspector { struct InspectorSink { writes: Arc>>, position: Arc>, + allows_compacted_source: bool, } #[async_trait::async_trait] @@ -316,14 +413,32 @@ impl mirror_core::Sink for InspectorSink { } async fn write(&mut self, record: Record) -> Result<(), SinkError> { let mut pos = self.position.lock().unwrap(); - if record.source_offset != *pos { + if record.source_offset < *pos { + return Err(SinkError::UnexpectedPosition { + expected: *pos, + actual: record.source_offset, + }); + } + if !self.allows_compacted_source && record.source_offset != *pos { return Err(SinkError::UnexpectedPosition { expected: *pos, actual: record.source_offset, }); } - *pos += 1; + // Mirrors mirror-fs / mirror-s3 semantics: append mode is + // strict, compaction:log accepts forward gaps and bumps the + // tracker to the delivered offset + 1. 
+ *pos = record.source_offset + 1; self.writes.lock().unwrap().push(record); Ok(()) } + fn allows_compacted_source(&self) -> bool { + self.allows_compacted_source + } + async fn align_to_source_low_watermark(&mut self, low_watermark: u64) -> Result<(), SinkError> { + // Same semantics as MockSink's impl: bump running position so + // the next write() at `low_watermark` succeeds. + *self.position.lock().unwrap() = low_watermark; + Ok(()) + } } diff --git a/crates/mirror-fs/src/lib.rs b/crates/mirror-fs/src/lib.rs index b2f7202..310d813 100644 --- a/crates/mirror-fs/src/lib.rs +++ b/crates/mirror-fs/src/lib.rs @@ -24,7 +24,7 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use async_trait::async_trait; -use mirror_core::{Record, Sink, SinkError}; +use mirror_core::{FlushTrigger, Record, Sink, SinkError}; use mirror_envelope::{ColumnType, Format, ParquetCompression}; use tokio::io::AsyncWriteExt; @@ -221,7 +221,7 @@ impl FilesystemSink { // boundary is *always* advanced so we don't fire repeatedly // until tomorrow. if !self.buffer.is_empty() { - self.flush_locked().await?; + self.flush_locked(FlushTrigger::Daily).await?; } let mut t = next; let now = (self.clock)(); @@ -236,27 +236,63 @@ impl FilesystemSink { if self.buffer.is_empty() { return Ok(()); } - self.flush_locked().await + self.flush_locked(FlushTrigger::Explicit).await } - fn should_flush(&self) -> bool { + /// Lowest source-offset the sink will accept on the next `write`. + /// Under append mode this is `durable_position + buffer.len()` (the + /// chain is contiguous). Under `compaction:log` the buffer may + /// have gaps so the head is `last_buffered.source_offset + 1` + /// (or `durable_position` when the buffer is empty). 
+ fn buffered_head(&self) -> u64 { + match self.compaction { + Some(CompactionMode::Log) => self + .buffer + .last() + .map(|r| r.source_offset + 1) + .unwrap_or(self.durable_position), + _ => self.durable_position + self.buffer.len() as u64, + } + } + + fn should_flush(&self) -> Option { if self.buffer.is_empty() { - return false; + return None; + } + if self.buffer.len() as u64 >= self.flush.max_offsets { + return Some(FlushTrigger::MaxOffsets); } - let by_count = self.buffer.len() as u64 >= self.flush.max_offsets; - let by_bytes = self.buffer_bytes >= self.flush.max_bytes; - let by_time = self + if self.buffer_bytes >= self.flush.max_bytes { + return Some(FlushTrigger::MaxBytes); + } + if self .buffer_started .map(|t| t.elapsed() >= self.flush.max_time) - .unwrap_or(false); - by_count || by_bytes || by_time + .unwrap_or(false) + { + return Some(FlushTrigger::MaxTime); + } + None } - async fn flush_locked(&mut self) -> Result<(), SinkError> { + async fn flush_locked(&mut self, trigger: FlushTrigger) -> Result<(), SinkError> { debug_assert!(!self.buffer.is_empty()); let flush_started = Instant::now(); let from = self.durable_position; - let to = self.durable_position + self.buffer.len() as u64 - 1; + let to = match self.compaction { + // Under compaction:log the buffer may contain gaps in its + // source-offset sequence (records dropped by upstream + // compaction are skipped on delivery, not buffered as + // tombstones), so `durable + len - 1` would understate the + // actual high-water mark. Use the last buffered record's + // source_offset directly. 
+ Some(CompactionMode::Log) => self + .buffer + .last() + .map(|r| r.source_offset) + .expect("buffer non-empty by debug_assert above"), + _ => self.durable_position + self.buffer.len() as u64 - 1, + }; let count = self.buffer.len(); let ext = self.format.extension(); let final_name = naming::batch_filename(from, to, ext); @@ -352,6 +388,7 @@ impl FilesystemSink { bytes = encoded_len, elapsed_ms, interval_ms, + trigger = trigger.as_str(), "flushed batch" ); Ok(()) @@ -381,13 +418,26 @@ impl Sink for FilesystemSink { actual: on_disk, }); } - Ok(self.durable_position + self.buffer.len() as u64) + Ok(self.buffered_head()) } async fn write(&mut self, record: Record) -> Result<(), SinkError> { self.tick_daily().await?; - let expected = self.durable_position + self.buffer.len() as u64; - if record.source_offset != expected { + let expected = self.buffered_head(); + // Backwards is always a hard error. + if record.source_offset < expected { + return Err(SinkError::UnexpectedPosition { + expected, + actual: record.source_offset, + }); + } + // Append mode also rejects forward gaps (the destination + // chain forbids holes). Under compaction:log forward gaps + // are legitimate — the upstream may have compacted the + // intermediate offsets out and the snapshot only stores + // latest-per-key. 
+ if !matches!(self.compaction, Some(CompactionMode::Log)) && record.source_offset != expected + { return Err(SinkError::UnexpectedPosition { expected, actual: record.source_offset, @@ -439,8 +489,8 @@ impl Sink for FilesystemSink { if self.buffer_started.is_none() { self.buffer_started = Some(Instant::now()); } - if self.should_flush() { - self.flush_locked().await?; + if let Some(trigger) = self.should_flush() { + self.flush_locked(trigger).await?; } Ok(()) } diff --git a/crates/mirror-s3/src/lib.rs b/crates/mirror-s3/src/lib.rs index 93e3884..756d869 100644 --- a/crates/mirror-s3/src/lib.rs +++ b/crates/mirror-s3/src/lib.rs @@ -22,7 +22,7 @@ use std::time::{Duration, Instant}; use async_trait::async_trait; use bytes::Bytes; use futures::StreamExt; -use mirror_core::{Record, Sink, SinkError}; +use mirror_core::{FlushTrigger, Record, Sink, SinkError}; use mirror_envelope::{ColumnType, Format, ParquetCompression}; use mirror_fs::naming; use object_store::path::Path; @@ -185,7 +185,7 @@ impl S3Sink { return Ok(()); } if !self.buffer.is_empty() { - self.flush_locked().await?; + self.flush_locked(FlushTrigger::Daily).await?; } let mut t = next; let now = (self.clock)(); @@ -200,26 +200,61 @@ impl S3Sink { if self.buffer.is_empty() { return Ok(()); } - self.flush_locked().await + self.flush_locked(FlushTrigger::Explicit).await } - fn should_flush(&self) -> bool { + /// Lowest source-offset the sink will accept on the next `write`. + /// Append mode: `durable_position + buffer.len()` (contiguous chain). + /// Compaction:log: `last_buffered.source_offset + 1` (or + /// `durable_position` when the buffer is empty), so the buffer may + /// carry gaps in its source-offset sequence — see mirror-fs. 
+ fn buffered_head(&self) -> u64 { + match self.compaction { + Some(CompactionMode::Log) => self + .buffer + .last() + .map(|r| r.source_offset + 1) + .unwrap_or(self.durable_position), + _ => self.durable_position + self.buffer.len() as u64, + } + } + + fn should_flush(&self) -> Option { if self.buffer.is_empty() { - return false; + return None; + } + if self.buffer.len() as u64 >= self.flush.max_offsets { + return Some(FlushTrigger::MaxOffsets); + } + if self.buffer_bytes >= self.flush.max_bytes { + return Some(FlushTrigger::MaxBytes); + } + if self + .buffer_started + .map(|t| t.elapsed() >= self.flush.max_time) + .unwrap_or(false) + { + return Some(FlushTrigger::MaxTime); } - self.buffer.len() as u64 >= self.flush.max_offsets - || self.buffer_bytes >= self.flush.max_bytes - || self - .buffer_started - .map(|t| t.elapsed() >= self.flush.max_time) - .unwrap_or(false) + None } - async fn flush_locked(&mut self) -> Result<(), SinkError> { + async fn flush_locked(&mut self, trigger: FlushTrigger) -> Result<(), SinkError> { debug_assert!(!self.buffer.is_empty()); let flush_started = Instant::now(); let from = self.durable_position; - let to = self.durable_position + self.buffer.len() as u64 - 1; + let to = match self.compaction { + // Under compaction:log the buffer may contain gaps in its + // source-offset sequence; the snapshot covers everything + // from `durable_position` through the highest-seen offset. + // See mirror-fs flush_locked for the same reasoning. 
+ Some(CompactionMode::Log) => self + .buffer + .last() + .map(|r| r.source_offset) + .expect("buffer non-empty by debug_assert above"), + _ => self.durable_position + self.buffer.len() as u64 - 1, + }; let count = self.buffer.len(); let buffered_bytes = self.buffer_bytes; let name = naming::batch_filename(from, to, self.format.extension()); @@ -308,6 +343,7 @@ impl S3Sink { encoded_bytes, elapsed_ms, interval_ms, + trigger = trigger.as_str(), "flushed batch" ); Ok(()) @@ -343,13 +379,25 @@ impl Sink for S3Sink { actual: on_remote, }); } - Ok(self.durable_position + self.buffer.len() as u64) + Ok(self.buffered_head()) } async fn write(&mut self, record: Record) -> Result<(), SinkError> { self.tick_daily().await?; - let expected = self.durable_position + self.buffer.len() as u64; - if record.source_offset != expected { + let expected = self.buffered_head(); + // Backwards is always a hard error. + if record.source_offset < expected { + return Err(SinkError::UnexpectedPosition { + expected, + actual: record.source_offset, + }); + } + // Append mode rejects forward gaps too; compaction:log accepts + // them (the snapshot only stores latest-per-key, so the + // upstream-compacted intermediate offsets are legitimately + // absent). See mirror-fs::write for the same logic. 
+ if !matches!(self.compaction, Some(CompactionMode::Log)) && record.source_offset != expected + { return Err(SinkError::UnexpectedPosition { expected, actual: record.source_offset, @@ -395,8 +443,8 @@ impl Sink for S3Sink { if self.buffer_started.is_none() { self.buffer_started = Some(Instant::now()); } - if self.should_flush() { - self.flush_locked().await?; + if let Some(trigger) = self.should_flush() { + self.flush_locked(trigger).await?; } Ok(()) } diff --git a/e2e/tests/compacted_source_without_compaction_fails_clearly.rs b/e2e/tests/compacted_source_without_compaction_fails_clearly.rs index 07a313a..916e1bb 100644 --- a/e2e/tests/compacted_source_without_compaction_fails_clearly.rs +++ b/e2e/tests/compacted_source_without_compaction_fails_clearly.rs @@ -7,6 +7,11 @@ //! `SourceOffsetMismatch` after polling — would leave operators //! debugging a gap in the destination chain that mirror-v3 knew //! about all along. +//! +//! Naming note: the "old `SourceOffsetMismatch`" referenced in the +//! body of this file has since been split into `SourceWentBackwards` +//! and `SourceGapAboveExpected`; the latter is the variant that +//! would fire here today if the bootstrap branch didn't pre-empt it. use std::time::Duration;