Skip to content

handle cleanup.policy=compact and reduce log verbosity#1

Merged
solsson merged 3 commits into
mainfrom
fix-compaction-only-source-delivers-gap
Jun 5, 2026
Merged

handle cleanup.policy=compact and reduce log verbosity#1
solsson merged 3 commits into
mainfrom
fix-compaction-only-source-delivers-gap

Conversation

@solsson

@solsson solsson commented Jun 5, 2026

Copy link
Copy Markdown
Contributor

This PR started as a single bugfix but is growing based on actual pre-production use. The initial issue was:

source delivered offset N, expected 0 on a real compacted topic, with compaction: log

The compaction:log mirror's bootstrap branch (c2e64c1) handles
delete-records-trimmed topics, but not cleanup.policy=compact-only
topics. The two have different Kafka semantics for log_start_offset /
low_watermark, and the bug presents on the latter.

Symptom

mirror with compaction: log against a topic with cleanup.policy=compact, retention.ms=-1, no manual delete-records. On startup:

INFO  mirror_core: heartbeat expected_offset=0 progressed=0
ERROR mirror_v3: mirror task errored; exiting process mirror=mymirror
ERROR mirror_v3: mirror exited with error
        error=mirror mymirror: source delivered offset 461, expected 0

source_low_watermark=0 is NOT a fetch_watermarks bug — the broker
genuinely reports 0 for a cleanup.policy=compact topic, see below.
fetch_high_watermark works (1215814 is the broker's actual HWM); the
fix in 7fa70e7 didn't help here because there's nothing for it to fetch
differently.

Why delete-records and cleanup.policy=compact look different

rd_kafka_query_watermark_offsets's low is the broker's
LogStartOffset. The two cleanup policies advance it differently:

Cleanup policy Advances LogStartOffset? delivered after seek(0)
delete by retention.ms / .bytes first surviving offset
compact,delete by retention + DeleteRecords first surviving offset
compact only no — segment start stays at 0 first surviving offset (gap)

For compact-only, the broker keeps LogStartOffset = 0 because it
hasn't deleted anything — it has deduplicated by key within the
segment. The fetch from offset 0 silently jumps forward to whatever
surviving record exists next; the client sees a gap, not an error.

The c2e64c1 bootstrap branch keys off sink_start < low_watermark:

let expected = if sink_start < low_watermark {
    if sink.allows_compacted_source() {
        sink.align_to_source_low_watermark(low_watermark).await?;
        low_watermark
    } else {
        return Err(MirrorError::SourceCompactedBelowExpected {});
    }
} else {
    sink_start
};
source.seek(expected).await?;

For our case, sink_start=0, low_watermark=0, the alignment branch is
skipped, the mirror seeks 0, the broker delivers 461, the gap check
fires, the mirror exits. Same crash the commit set out to prevent —
just via a different cleanup policy.

Why the e2e doesn't catch this

e2e/tests/compacted_source_with_compaction_log.rs:

// Topic created with cleanup.policy=compact,delete —
// the only combined policy on which Redpanda accepts manual
// DeleteRecords.

…and then it advances the low via delete-records. The commit message
even calls this out:

E2e coverage uses delete-records as a deterministic stand-in for
broker-side compaction (the consumer sees an identical post-trim
low watermark).

That premise is the bug. Post-delete-records the broker reports the
advanced low watermark; after compaction alone it doesn't.
kafka_source_low_watermark_contract.rs has the same shape — calls
trim_records_before(...) — so neither test exercises the real
compaction path.

Note: kafka_source_low_watermark_contract.rs should stay — it
documents the broker contract for the post-delete-records case,
which is the path 7fa70e7 fixed and remains the right behaviour. The
new test proposed below (compaction-only repro) complements it, it
doesn't replace it. Keeping both pins each path explicitly so a future
maintainer reading the suite can tell delete-records and pure
compaction apart without re-deriving the semantics.

Repro recipe

Topic spec (single partition for brevity):

cleanup.policy=compact
retention.ms=-1
retention.bytes=-1
max.message.bytes=500006
segment.ms=<small enough to force a roll>
min.cleanable.dirty.ratio=0.01
  1. Produce 1000 records over enough keys that compaction has work
    (e.g. cycle through 50 keys with values "v{i}").
  2. Force segment rolls (close-segment producer fence or
    rpk topic alter-config segment.ms=1).
  3. Wait for the log-cleaner to compact — verify the on-disk segment
    file is now smaller than the original record count.
  4. rpk topic describe --print-partitions — confirm
    start-offset = 0, end-offset = 1000, AND the actual earliest
    readable offset is > 0 (verify by consuming from offset 0 and
    noting where the first delivered record lands).
  5. Point a compaction: log mirror at the topic. Expect crash with
    source delivered offset N, expected 0.

Step 3 is the bit the existing e2e is missing. Adding a real-Kafka
compaction stack to the harness is the cleanest fix, but a single-broker
Redpanda with min.cleanable.dirty.ratio low + a forced segment roll
should reproduce the LogStartOffset=0 semantics deterministically.

Proposed fix

Under compaction: log, both the per-record gate in
run_mirror_with_heartbeat and the sink's own write invariant
should treat expected as a minimum, not an exact match. The
bootstrap branch's low_watermark inspection is still a useful hint
(it lets us pre-align before the first delivery on delete-records
topics, avoiding a fetch round-trip), but it can't be authoritative
for cleanup.policy=compact topics — only the broker's first
delivered offset can. And once the first record lands in the sink's
buffer, every subsequent compaction-dropped key produces another
forward gap that has to be tolerated mid-stream.

There's a tempting "minimal" version of this fix that only touches
run_mirror_with_heartbeat: detect the forward gap and call
Sink::align_to_source_low_watermark(delivered) to push the sink's
durable position forward. That works exactly once (at bootstrap, when
the buffer is empty); on the second gap the sink's empty-buffer
invariant trips with align_to_source_low_watermark called in inconsistent state: buffer=1 durable=461 low_watermark=466. The
production repro hits this on the second record — keys 462..465 were
dominated by later records and dropped by compaction, so the broker
jumps from 461 to 466 immediately.

So the gap-acceptance has to move into the sink. Split into:

  1. MirrorError::SourceOffsetMismatch -> two variants.
    SourceWentBackwards { expected, got } (delivered < expected;
    hard error in all modes — the destination chain can't un-commit)
    and SourceGapAboveExpected { expected, got } (delivered >
    expected; hard error in append mode, accepted in compaction:log).
    The blanket name conflated two very different invariants.

  2. mirror-core run loop (crates/mirror-core/src/lib.rs:446,
    the poll_result = source.poll_one() arm of the existing
    tokio::select!): replace the strict equality check with the
    three-way branch above. ~10 lines of additional logic inside
    the existing Some(record) => { … } block; shutdown and
    heartbeat arms unchanged. No call to
    align_to_source_low_watermark from this path — the gap goes
    straight to sink.write().

  3. mirror-fs / mirror-s3 sinks (write + next_expected_offset +
    flush_locked): derive the floor via a small buffered_head()
    helper that reads last_buffered.source_offset + 1 under
    compaction:log (or durable_position when the buffer is empty),
    falling back to the existing durable + len shape in append
    mode. write rejects < floor; rejects > floor only in append
    mode. flush_locked derives the snapshot's to from the highest
    buffered source-offset under compaction:log so the
    <from>-<to>.<ext> filename reflects the actual offset span the
    snapshot covers, gaps and all.

  4. Sink::align_to_source_low_watermark stays bootstrap-only.
    Its empty-buffer precondition is now correct again — only the
    bootstrap branch calls it, and only when the sink really is
    empty. Doc comment updated to make the constraint explicit.

Sketch (run loop):

while let Some(record) = source.poll_one().await? {
    if record.source_offset < expected {
        return Err(MirrorError::SourceWentBackwards {
            expected, got: record.source_offset,
        });
    }
    if record.source_offset > expected {
        if sink.allows_compacted_source() {
            // A counter, not a log line — see "Observability"
            // below for the rationale.
            let (topic, partition) = current_labels();
            metrics::counter!(
                "mirror_v3_source_offset_gap_records_total",
                "topic" => topic, "partition" => partition,
            ).increment(1);
            expected = record.source_offset;
        } else {
            return Err(MirrorError::SourceGapAboveExpected {
                expected, got: record.source_offset,
            });
        }
    }
    sink.write(record).await?;
    expected = expected.checked_add(1).expect("source offset overflowed u64");
}

Sketch (sink write, same shape for mirror-fs and mirror-s3):

let expected = self.buffered_head(); // helper, mode-aware
if record.source_offset < expected {
    return Err(SinkError::UnexpectedPosition { expected, actual: record.source_offset });
}
if !matches!(self.compaction, Some(CompactionMode::Log))
    && record.source_offset != expected {
    return Err(SinkError::UnexpectedPosition { expected, actual: record.source_offset });
}
// proceed

Key properties this preserves:

  • Append-mode mirrors still fail loud on any gap. Both layers
    enforce the corrupt-chain invariant for non-compaction sinks.
  • No silent gap acceptance for log-compaction restart drift.
    Forward gaps only land under compaction:log AND
    allows_compacted_source() — the operator's explicit opt-in.
  • The bootstrap pre-align stays as an optimisation. For
    delete-records and delete policies, low_watermark > 0 still
    lets us pre-seek without paying a delivery round-trip.
  • align_to_source_low_watermark stays narrow. No call-site or
    signature changes; its empty-buffer invariant stays load-bearing.

Idle-drift check (on_disk == durable) under compaction:log is
already loose per the c2e64c1 change, so it should not need further
adjustment.

E2e coverage gap

The existing tests need a real compaction reproducer, not a
delete-records stand-in. Two routes:

  1. Real compaction in single-broker Redpanda. Tunable knobs:
    log_compaction_interval_ms (default 10000), segment.ms,
    min.cleanable.dirty.ratio. After producing + rolling segments,
    poll until segment file shrinks before asserting low/high. Flakier
    than delete-records but exercises the actual code path. Probably
    kafka_source_low_watermark_reports_zero_on_compaction_only to
    document the broker contract (asserting low == 0 after real
    compaction), plus
    mirror_compaction_log_handles_gap_from_seek_zero covering the
    delivered-above-expected alignment path proposed above.

  2. Multi-broker Apache Kafka stack variant in the harness. The
    maintainer's 7fa70e7 commit message already flags this as a missing
    piece. Worth doing for its own sake (catches multi-broker metadata
    races), and incidentally would expose the compaction-only gap with
    a more realistic policy mix.

Workarounds available to operators today

  • cleanup.policy=compact,delete + DeleteRecords to manually
    advance the broker's LogStartOffset past the compaction gap. Loses
    the benefit of compaction-as-archive (records are now genuinely
    deleted, not just deduplicated), so only useful as a one-off
    cleanup before the mirror's first run.
  • Seed the destination chain with an empty snapshot file whose
    name encodes the broker's actual earliest deliverable offset
    (<from>-<from>.parquet). The mirror's next_expected_offset then
    reads as that value, the bootstrap branch's sink_start >= low_watermark check sees no compaction (correctly), and the run
    loop's gap check matches the delivered offset.
    Brittle (depends on internal blob-naming behaviour) and operator-only.
  • Switch the mirror to a kafka destination temporarily until the
    fix lands, accepting that the parquet archive side is unavailable.

None of these are good defaults. The proposed fix removes the need.


Reported against Yolean/mirror-v3 main @ af21853
(c2e64c1 + 7fa70e7 + …) by a downstream consumer.

solsson and others added 3 commits June 5, 2026 10:25
The bootstrap path from c2e64c1 handled `delete-records`-trimmed
sources but not `cleanup.policy=compact`-only sources. The two have
different broker semantics: `delete-records` advances LogStartOffset,
compact alone does not. `query_watermark_offsets` therefore returns
the advanced value for the first case but 0 for the second, even
though compaction has dropped the early offsets. The gap also shows
up mid-stream every time the broker delivers a key whose intermediate
offsets were superseded by later records.

Symptom (see PR description for the full repro and analysis):

  starting mirror start_offset=0 sink_next_expected=0
                  source_low_watermark=0 compaction="log"
  ERROR mirror exited with error
        error=mirror <name>: source delivered offset 461, expected 0

The bootstrap pre-align saw 0 < 0, skipped, the loop seeked(0), the
broker delivered 461 (the earliest surviving record after key dedup),
and the strict equality gate fired.

This commit makes both the per-record gate in the run loop AND the
sink's `write` treat `expected` as a *minimum* under compaction:log:

  - mirror-core run loop:
      delivered < expected            -> SourceWentBackwards
      delivered > expected, append    -> SourceGapAboveExpected
      delivered > expected, log       -> warn + bump expected, write
      delivered == expected           -> unchanged
  - mirror-fs / mirror-s3 sinks:
      `write()` derives the floor via a new `buffered_head` helper
      which reads `last_buffered.source_offset + 1` under
      compaction:log (or `durable_position` when the buffer is
      empty). Backwards is always an error; forward gaps are rejected
      only in append mode.
      `flush_locked()` derives the snapshot file's `to` from the
      highest buffered source-offset under compaction:log instead of
      `durable + len - 1`, so the on-disk `<from>-<to>` range
      reflects the actual gap span.
      `next_expected_offset()` returns the new helper's value too.

The earlier draft of this fix called `align_to_source_low_watermark`
from the run loop on every mid-stream gap; that tripped the sink's
empty-buffer invariant on the second gap. Moving gap acceptance
into the sink keeps `align_to_source_low_watermark` what it was —
a bootstrap-only, empty-buffer operation — and lets the run loop
stay just a per-record gate.

Other changes:

  - Splits the previous blanket SourceOffsetMismatch into two named
    variants. The conflation made the new gap-acceptance path
    indistinguishable from the genuine corruption case in error
    messages and downstream matching.
  - Updates the doc comment on Sink::align_to_source_low_watermark
    to clarify it stays bootstrap-only.
  - Adds three unit tests to crates/mirror-core/tests/loop_invariants.rs:
      `errors_on_source_going_backwards` (SourceWentBackwards under
        compaction:log)
      `compaction_log_accepts_gap_from_compact_only_topic`
        (bootstrap-time gap; broker reports low_watermark = 0 but
        delivers 461)
      `compaction_log_accepts_repeated_gaps_mid_stream` (production
        repro: deliveries at 461, 466, 470 — the case the earlier
        align-mid-stream attempt missed)
  - WriteInspector in the test harness now forwards
    allows_compacted_source and accepts forward gaps under
    compaction:log so the new tests can exercise the full path
    through the run loop.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Four operator-facing log changes, no behavioural impact:

  - **Per-mirror span on every event.** mirror-bin wraps each
    spawned mirror task in `tracing::info_span!("mirror", name=%name)`
    via `.instrument(span)`, so `mirror=<name>` appears on every
    event emitted from mirror-core (`starting mirror`, `heartbeat`,
    …) and the sinks without each call site having to thread the
    name. MIRROR_LABELS still carries topic+partition for metric
    labeling separately.

  - **Flush trigger label.** New `pub enum FlushTrigger {
    MaxOffsets, MaxBytes, MaxTime, Daily, Explicit }` in mirror-core.
    `should_flush` in mirror-fs and mirror-s3 returns
    `Option<FlushTrigger>`; `flush_locked` takes the trigger as a
    parameter; the `flushed batch` log carries `trigger=<reason>`
    so operators can tell why a snapshot was emitted without
    cross-referencing the config thresholds.

  - **Gap counter instead of per-record log.** On a heavily-compacted
    source, the mid-stream gap path can fire on every delivered
    record (one per surviving key after upstream dedup) — any log
    level there scales with ~10^6 lines per restart. Replaced with
    `mirror_v3_source_offset_gap_records_total{topic,partition}`,
    a counter operators can graph or alert on. The startup
    `loop start … compaction="log"` INFO line is still the
    one-shot "expect gaps here" signal.

  - **Heartbeat to DEBUG.** It fires per clock interval, not per
    record batch, and the `mirror_v3_destination_offset_verified`
    gauge plus the `flushed batch` line already cover liveness for
    SREs. DEBUG keeps it discoverable via
    `RUST_LOG=mirror_core=debug` without taking a slot in default
    operator logs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
CI's `cargo fmt --all -- --check` failed on three line-break choices
rustfmt would have rewritten — the `async fn align_to_source_low_watermark`
signature in the mock sink, and the matches!-and-condition guards in
mirror-fs::write and mirror-s3::write. No functional change; the
diff is whitespace-only.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@solsson solsson changed the title WIP fix issues found at first downstream use handle cleanup.policy=compact and reduce log verbosity Jun 5, 2026
@solsson solsson merged commit 478997f into main Jun 5, 2026
3 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant