34 commits
58ccee9
test: sink-trait matrix + loop_invariants against real FilesystemSink
Jun 5, 2026
0a3f747
test: discoverable ignored-tests for documented coverage gaps
Jun 5, 2026
fb322af
test: palette of testability enablers for TDD-style spec authoring
Jun 5, 2026
ed6041c
proposes a webhook feature to close the KKV gap
solsson Jun 5, 2026
0610422
notify: phase 1 — config types + validation for the `notify` block
Jun 5, 2026
5932458
notify: phase 2 — Notifier trait + NoOpNotifier wired into run loop
Jun 5, 2026
98fdd48
notify: phase 3a+3b — kkv-v1 dispatcher with outcome-driven retry
Jun 5, 2026
d0cca71
notify: phase 3c — source-consume debounce buffer
Jun 5, 2026
431e14a
notify: phase 5 — wire KkvV1Notifier into mirror-bin
Jun 5, 2026
67cdbf3
notify: phase 3d — DNS-A fan-out for K8s headless services
Jun 5, 2026
7e7b620
notify: phase 4a — FlushObserver trait + FS/S3/Tee wiring
Jun 5, 2026
b689e21
notify: phase 4b — FlushDispatcher for trigger.on: destination-flush
Jun 5, 2026
34610aa
notify: phase 6 — e2e tests for source-consume + destination-flush
Jun 5, 2026
a13c74d
cache: serve /q/health/ready alias for kkv-client onReady compat
solsson Jun 5, 2026
d0780e3
ci: single-arch image build on PRs
Jun 6, 2026
c7aa408
style: drop mdash and phase references per CLAUDE.md
Jun 6, 2026
41fbb96
notify-kkv: include the `v: 1` protocol-version field in the body
solsson Jun 6, 2026
5ef7c9e
notify-kkv: suppress until per-mirror bootstrap_hwm
solsson Jun 6, 2026
0905f9d
cache-v1: per-mirror paths + cache-v1-main singleton alias
solsson Jun 6, 2026
1978fb8
e2e: catch the harness up to the cache-v1 + suppression API changes
Jun 7, 2026
129e0d7
core: add Source::commit_through + Source::fetch_committed_offset
Jun 7, 2026
16bdd79
kafka: implement Source::commit_through + fetch_committed_offset
Jun 7, 2026
b501c0f
core: add WriteObserver + AckSink + KafkaSink/MockSink wiring
Jun 7, 2026
ad4fcd2
notify-kkv: wire AckSink into KkvV1Notifier + FlushDispatcher
Jun 7, 2026
f825821
mirror-bin: per-mirror AckTracker + periodic source-commit task
Jun 7, 2026
1406613
core: per-mirror suppression_threshold from committed offset
Jun 7, 2026
66cb8b9
core: MirrorStatus enum replaces caught_up: AtomicBool
Jun 7, 2026
4150af5
mirror-bin: per-mirror readiness poller (lag + assignment)
Jun 7, 2026
d6fd3cd
config: per-destination affects-readiness field (default true)
Jun 7, 2026
fb8ccb1
cache: structured /q/health/ready JSON + 503 body on per-mirror routes
Jun 7, 2026
6055e33
e2e: dev2 reproducer for between-pods notify gap
Jun 7, 2026
4cd7697
docs: deployment-strategy requirement + structured readiness
Jun 7, 2026
731d53f
mirror-bin: register every enabled mirror, not just slot-needing ones
solsson Jun 7, 2026
0c60257
e2e: cache_v1 spec assertion follows the per-mirror path rename
solsson Jun 8, 2026
12 changes: 10 additions & 2 deletions .github/workflows/ci.yaml
@@ -1,7 +1,7 @@
name: ci
# Same shape as Yolean/envoyimage's echo.yaml: separate verify and
# publish phases, image push gated on the full e2e suite passing
# first. (No upstream-image cron job there's nothing for this repo
# first. (No upstream-image cron job; there's nothing for this repo
# to mirror in the registry sense.)
#
# Third-party actions are pinned to a 40-char commit SHA with the
@@ -102,7 +102,15 @@ jobs:
uses: docker/build-push-action@bcafcacb16a39f128d818304e6c9c0c18556b85f # v7.1.0
with:
context: .
platforms: linux/amd64,linux/arm64
# PR/QA runs build linux/amd64 only; the arm64 leg runs under
# QEMU on amd64 runners and dominates wall time (Rust +
# librdkafka cross-compile). Push events build both arches
# because releases ship multi-arch. See issue #2.
# GHA's per-branch cache scope means PR caches don't warm main
# anyway, so dropping arm64 from PR runs is the simplest
# effective fix; switching to a registry-backed cache would
# share across branches but needs PR write access to ghcr.
platforms: ${{ github.event_name == 'pull_request' && 'linux/amd64' || 'linux/amd64,linux/arm64' }}
push: ${{ github.event_name != 'pull_request' }}
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
28 changes: 28 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 3 additions & 0 deletions Cargo.toml
@@ -8,6 +8,7 @@ members = [
"crates/mirror-kafka",
"crates/mirror-fs",
"crates/mirror-s3",
"crates/mirror-notify-kkv",
"crates/mirror-bin",
"crates/xtask",
"e2e",
@@ -28,6 +29,7 @@ mirror-envelope = { path = "crates/mirror-envelope" }
mirror-kafka = { path = "crates/mirror-kafka" }
mirror-fs = { path = "crates/mirror-fs" }
mirror-s3 = { path = "crates/mirror-s3" }
mirror-notify-kkv = { path = "crates/mirror-notify-kkv" }

serde = { version = "1", features = ["derive"] }
serde_json = "1"
@@ -60,6 +62,7 @@ utoipa = { version = "5", features = ["axum_extras"] }
utoipa-axum = "0.2"
utoipa-scalar = { version = "0.3", features = ["axum"] }
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls"] }
url = "2"
indexmap = "2"

[profile.release]
4 changes: 2 additions & 2 deletions KAFKA_KEYVALUE_DROPIN_REPLACEMENT.md
@@ -188,9 +188,9 @@ parity with KKV.
| ------------------------------------------- | ------------------------------------- |
| onupdate webhook dispatcher | mirror-v3 does not implement (deferred to a future PR). If a current dependent uses Yolean's KKV in sidecar mode and relies on onupdate, mirror-v3 is **not** a drop-in for them yet. |
| `POST /_admin/v1/shutdown[/{exitcode}]` | mirror-v3 has it; not compared |
| `/q/health` / `/q/health/ready` (Quarkus) | mirror-v3 does not implement; we expose `/metrics` (Prometheus) on the metrics port instead |
| `/q/health/ready` (Quarkus) | mirror-v3 implements as a drop-in: same path, same `200`/`503` codes, plus a structured `ReadinessReport` JSON body that names any unhealthy mirror by status enum. Existing `@yolean/kafka-keyvalue` Node clients work unchanged. `/q/health` (the wider SmallRye umbrella) is not implemented; we expose `/metrics` (Prometheus) on the metrics port instead |
| Multi-partition `/cache/v1/offset/{t}/{p}` | the fixture topic uses 1 partition; the multi-partition case is unit-tested in `mirror-cache`'s handler tests |
| Readiness 503 timing | both serve 503 before catch-up, sticky after; deeper compare would need a controlled-rate producer |
| Readiness 503 timing | KKV: `caught_up` flips false→true once and sticks. mirror-v3: non-sticky; it tracks per-mirror lag against the broker high-watermark, source-partition assignment, and per-destination flush progress, and falls back to 503 if any of those degrades. A per-destination YAML opt-out (`affects-readiness: false`) excludes best-effort secondary sinks from the readiness computation. |

## Open

65 changes: 58 additions & 7 deletions README.md
@@ -65,16 +65,18 @@ A minimal PodMonitor for the checkit chart points at port 9090; the standard pro

### `/cache/v1` (drop-in for `Yolean/kafka-keyvalue`)

Per-mirror opt-in via `http-access: { api: cache-v1 }`. When at least one mirror has it set, `mirror-v3 run` starts a second HTTP server on `0.0.0.0:8080` (override with `MIRROR_V3_CACHE_PORT`) that exposes the KKV `/cache/v1` surface:
Per-mirror opt-in via `http-access: { cache-v1: {} }`. When at least one mirror has it set, `mirror-v3 run` starts a second HTTP server on `0.0.0.0:8080` (override with `MIRROR_V3_CACHE_PORT`) that exposes the KKV-shaped surface under each opt-in mirror's name:

```
GET /cache/v1/raw/{key} → value bytes (application/octet-stream), 404 if absent
GET /cache/v1/offset/{topic}/{partition} → decimal text
GET /cache/v1/keys → newline-separated keys
GET /cache/v1/values → newline-separated raw values
GET /cache/v1/{mirror}/raw/{key} → value bytes (application/octet-stream), 404 if absent
GET /cache/v1/{mirror}/offset/{topic}/{partition} → decimal text
GET /cache/v1/{mirror}/keys → newline-separated keys
GET /cache/v1/{mirror}/values → newline-separated raw values
```

Reads carry `x-kkv-last-seen-offsets: <JSON>` and return **503** until every opt-in mirror has caught up to the source's high-watermark captured at startup — same readiness contract as KKV, so dependents don't transiently see an older state across reloads. The cache view updates per-record from the consume loop, decoupled from disk flush cadence (set `flush.max-time-ms` high to save bucket ops without sacrificing freshness). Updates are monotonic; if a future feature ever rewinds source consumption, the cache stays at the highest offset seen.
Each mirror owns its own `key → latest-value` view; a key only shows up under the mirror that consumed it. Reads carry `x-kkv-last-seen-offsets: <JSON>` and return **503** until that mirror has caught up to its source's high-watermark captured at startup — same readiness contract as KKV, so dependents don't transiently see an older state across reloads. The view updates per-record from the consume loop, decoupled from disk flush cadence (set `flush.max-time-ms` high to save bucket ops without sacrificing freshness). Updates are monotonic; if a future feature ever rewinds source consumption, the cache stays at the highest offset seen.
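
The monotonic-update rule above can be sketched as follows. This is a minimal illustration, not the crate's actual data structure: `CacheView`, its fields, and its method names are hypothetical, and tombstone (null-value) handling is left out.

```rust
use std::collections::HashMap;

/// Hypothetical per-mirror view: key -> latest value, plus the highest
/// offset applied so far for each (topic, partition).
#[derive(Default)]
pub struct CacheView {
    values: HashMap<Vec<u8>, Vec<u8>>,
    last_seen: HashMap<(String, i32), i64>,
}

impl CacheView {
    /// Apply one consumed record. Returns false, and changes nothing, when
    /// the record's offset is not newer than what the view already holds.
    pub fn apply(
        &mut self,
        topic: &str,
        partition: i32,
        offset: i64,
        key: &[u8],
        value: &[u8],
    ) -> bool {
        let slot = self
            .last_seen
            .entry((topic.to_string(), partition))
            .or_insert(-1);
        if offset <= *slot {
            return false; // stale or replayed record: keep the newer state
        }
        *slot = offset;
        self.values.insert(key.to_vec(), value.to_vec());
        true
    }

    /// Latest value for a key, if this mirror has consumed one.
    pub fn get(&self, key: &[u8]) -> Option<&[u8]> {
        self.values.get(key).map(|v| v.as_slice())
    }

    /// Highest offset applied for the partition, if any record was applied.
    pub fn last_seen_offset(&self, topic: &str, partition: i32) -> Option<i64> {
        self.last_seen.get(&(topic.to_string(), partition)).copied()
    }
}
```

With this shape, replaying offset 3 after offset 5 has been applied leaves both the stored value and the reported offset untouched, which is the behavior the paragraph describes for a rewound source.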

To keep existing kkv consumers working unmodified during a migration, **one** mirror per process may additionally set `cache-v1-main: {}`. That mounts the unprefixed `/cache/v1/...` paths onto that mirror's view (alias-only — same handlers, no separate data path). The validator rejects more than one `cache-v1-main` in the config. Mirror names that collide with the literal path segments `raw | offset | keys | values` are rejected.
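
The two validator rules in the paragraph above could look roughly like this. The struct and function names are hypothetical and the real config types are likely richer. The sketch also assumes the reserved-name check applies to every mirror, which the text does not state explicitly.

```rust
/// Hypothetical, simplified view of one mirror's `http-access` settings.
pub struct MirrorHttpAccess {
    pub name: String,
    pub cache_v1: bool,
    pub cache_v1_main: bool,
}

/// Literal path segments a mirror name must not shadow under /cache/v1/.
const RESERVED_SEGMENTS: [&str; 4] = ["raw", "offset", "keys", "values"];

pub fn validate_cache_v1(mirrors: &[MirrorHttpAccess]) -> Result<(), String> {
    // Rule 1: a mirror name must not collide with a literal path segment.
    // (Assumption: checked for all mirrors, not only opt-in ones.)
    for m in mirrors {
        if RESERVED_SEGMENTS.contains(&m.name.as_str()) {
            return Err(format!(
                "mirror name `{}` collides with a /cache/v1 path segment",
                m.name
            ));
        }
    }
    // Rule 2: at most one mirror per process may claim the unprefixed alias.
    let mains: Vec<&str> = mirrors
        .iter()
        .filter(|m| m.cache_v1_main)
        .map(|m| m.name.as_str())
        .collect();
    if mains.len() > 1 {
        return Err(format!(
            "at most one mirror may set cache-v1-main, found: {}",
            mains.join(", ")
        ));
    }
    Ok(())
}
```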


Also exposed on the same port:

@@ -153,7 +155,56 @@ docker run --rm -v "$PWD/examples:/cfg" mirror-v3:dev validate --config /cfg/kaf

## Operational invariants

- **One process owns at most one mirror per `(topic, partition)`.** Run with `replicas: 1` and `strategy.type: Recreate` in Kubernetes for every mirror-v3 deployment. This is non-negotiable — two writers will race on destination naming and trip the corrupt-chain detector on the next restart.
- **One process owns at most one mirror per `(topic, partition)`.** Run with `replicas: 1` and either `strategy.type: Recreate` or `RollingUpdate` with `maxSurge: 0` and `maxUnavailable: 1` for every mirror-v3 deployment. This is non-negotiable on two counts:
1. **Destination races.** Two writers will race on destination naming and trip the corrupt-chain detector on the next restart.
2. **Source-side coordination.** mirror-v3 uses `assign()` instead of `subscribe()` for its Kafka consumer, so there is no consumer-group coordinator deciding which pod owns the partition. Two pods up at once would both consume the same partition and race the consumer-offset commit log.
- **VersityGW specifically:** `If-None-Match: *` is silently ignored (v1.4.1, POSIX backend, verified in e2e), so the deployment guarantee is the *only* atomicity layer for the cross-process race. AWS S3 honors `If-None-Match: *` and gives API-level atomicity on top of the deployment guarantee.
- **Any unrecoverable error in any mirror exits the entire process.** Restart correctness is the recovery mechanism; supervision belongs to the orchestrator.
- **For blob destinations, a `(from, to)` filename/key is the durable "offset"** — atomic rename (FS) or single-shot `PutObject` (S3) makes it visible. The destination listing is the source of truth on startup.

## Readiness

`GET /q/health/ready` returns a structured JSON body in every state:

```json
{
"ready": "ready" | "warming" | "degraded",
"mirrors": [
{
"name": "userstate",
"status": "ready" | "warming" | "lag_behind_source"
| "source_unassigned" | "destination_lagging",
"source": {
"topic": "userstate", "partition": 0, "assigned": true,
"end_offset": 12345, "last_applied_offset": 12345, "lag": 0
},
"destination": { "name": "userstate-gcs", "lag": 5 }
}
],
"unhealthy": ["userstate"]
}
```

HTTP status is `200` iff every mirror is `ready`; `503` otherwise. The drop-in `@yolean/kafka-keyvalue` Node client only inspects the status code, so the body is transparent to legacy consumers but greppable for on-call.

Per-mirror `/cache/v1/{mirror}/...` routes return the matching `mirrors[i]` element as the `503` body, so a polling consumer sees a meaningful retry signal instead of an opaque `503`.
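
As a rough sketch of the status-code rule, the aggregate could be computed like this. The variant names are inferred from the JSON status strings above and may not match the real `MirrorStatus` enum. `MirrorReport` and `readiness` are hypothetical names, and the top-level `ready` field is deliberately left out because its derivation is not specified here.

```rust
/// Assumed variants, mirroring the JSON `status` strings.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MirrorStatus {
    Ready,
    Warming,
    LagBehindSource,
    SourceUnassigned,
    DestinationLagging,
}

/// Hypothetical, trimmed-down per-mirror entry.
pub struct MirrorReport {
    pub name: String,
    pub status: MirrorStatus,
}

/// Returns the HTTP status code and the `unhealthy` name list:
/// 200 iff every mirror is `Ready`, 503 otherwise.
pub fn readiness(mirrors: &[MirrorReport]) -> (u16, Vec<String>) {
    let unhealthy: Vec<String> = mirrors
        .iter()
        .filter(|m| m.status != MirrorStatus::Ready)
        .map(|m| m.name.clone())
        .collect();
    let code = if unhealthy.is_empty() { 200 } else { 503 };
    (code, unhealthy)
}
```

Note that an empty mirror list yields `200` in this sketch; whether the real service treats "no mirrors" as ready is not stated in the text.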

Tuning:

- `MIRROR_V3_READINESS_LAG` (default `0`) — offsets of lag tolerated before `LagBehindSource` fires.
- `MIRROR_V3_READINESS_POLL_MS` (default `2000`) — how often each mirror's broker high-watermark + consumer assignment is re-checked. `0` disables the poller.
- `MIRROR_V3_OFFSET_COMMIT_INTERVAL_MS` (default `5000`) — how often the supervisor commits the consumer's progress back to the broker. `0` disables (the mirror still works but loses the between-pods notify guarantee on the next restart).
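
A minimal sketch of how these three variables and their documented defaults might be read. The struct, the function names, and the choice to treat an unparsable value as an error are assumptions. The lookup is injected so the parsing can be exercised without touching the process environment.

```rust
use std::time::Duration;

/// Hypothetical holder for the three tuning knobs.
#[derive(Debug, PartialEq, Eq)]
pub struct ReadinessTuning {
    pub lag_tolerance: u64,
    /// `None` means the readiness poller is disabled (`0`).
    pub poll_interval: Option<Duration>,
    /// `None` means periodic source commits are disabled (`0`).
    pub commit_interval: Option<Duration>,
}

fn parse_u64(
    lookup: &dyn Fn(&str) -> Option<String>,
    name: &str,
    default: u64,
) -> Result<u64, String> {
    match lookup(name) {
        None => Ok(default),
        Some(raw) => raw
            .trim()
            .parse::<u64>()
            .map_err(|e| format!("{name}={raw:?}: {e}")),
    }
}

/// `0` disables the periodic task; anything else is a millisecond interval.
fn interval(ms: u64) -> Option<Duration> {
    if ms == 0 {
        None
    } else {
        Some(Duration::from_millis(ms))
    }
}

pub fn tuning_from(lookup: &dyn Fn(&str) -> Option<String>) -> Result<ReadinessTuning, String> {
    Ok(ReadinessTuning {
        lag_tolerance: parse_u64(lookup, "MIRROR_V3_READINESS_LAG", 0)?,
        poll_interval: interval(parse_u64(lookup, "MIRROR_V3_READINESS_POLL_MS", 2000)?),
        commit_interval: interval(parse_u64(
            lookup,
            "MIRROR_V3_OFFSET_COMMIT_INTERVAL_MS",
            5000,
        )?),
    })
}
```

In a binary this would be called as `tuning_from(&|k| std::env::var(k).ok())`.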

Per-destination opt-out:

```yaml
destinations:
- type: filesystem
root: /var/lib/mirror-v3
# affects-readiness: true # default
- type: kafka
bootstrap-servers: ghost-cluster:9092
affects-readiness: false # best-effort secondary
```

A destination with `affects-readiness: false` still records its `flushed_through` for observability but is skipped when computing `DestinationLagging`. Use it for observability replicas or archival sinks that must not flip consumer-pod readiness when they fall behind.
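
The opt-out can be pictured with a small sketch. The lag definition (`last_applied_offset - flushed_through`) and the `max_lag` threshold are assumptions made for illustration, as are the type and function names. The text only says that opted-out destinations keep recording `flushed_through` and are skipped when computing `DestinationLagging`.

```rust
/// Hypothetical per-destination progress record.
pub struct DestinationProgress {
    pub name: String,
    /// Highest source offset durably flushed to this destination.
    pub flushed_through: i64,
    /// Mirrors the `affects-readiness` YAML field (default true).
    pub affects_readiness: bool,
}

/// Names of destinations whose flush progress should hold readiness back.
/// Destinations with `affects_readiness == false` are never returned,
/// however far behind they are.
pub fn lagging_destinations(
    last_applied_offset: i64,
    max_lag: i64,
    dests: &[DestinationProgress],
) -> Vec<&str> {
    dests
        .iter()
        .filter(|d| d.affects_readiness)
        .filter(|d| last_applied_offset - d.flushed_through > max_lag)
        .map(|d| d.name.as_str())
        .collect()
}
```

Under these assumptions a mirror would report `destination_lagging` only when the returned list is non-empty, so a stalled best-effort sink cannot flip the pod to `503`.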