Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
6f2ffef
feat(fullhistory/streaming): meta-store catalog + one write protocol …
chowbao Jun 18, 2026
3556abf
feat(fullhistory/streaming): processChunk + catchupSource
chowbao Jun 18, 2026
0ca7e96
feat(fullhistory/streaming): tx-hash rolling rebuild + coverage protocol
chowbao Jun 18, 2026
b34373c
feat(fullhistory/streaming): postcondition resolver + executor
chowbao Jun 18, 2026
c445557
refactor(fullhistory): collapse per-chunk hot stores into one multi-C…
chowbao Jun 18, 2026
1e25396
feat(fullhistory/streaming): derived progress (deriveWatermark/derive…
chowbao Jun 18, 2026
f3e141c
feat(fullhistory/streaming): hot-DB ingestion loop
chowbao Jun 18, 2026
e6e2be1
feat(fullhistory/streaming): lifecycle tick + loop (clean shutdown)
chowbao Jun 18, 2026
04f8e1a
feat(fullhistory/streaming): startup orchestration (startStreaming)
chowbao Jun 18, 2026
8e93c29
test(fullhistory/streaming): close derived-progress coverage gaps
chowbao Jun 18, 2026
768e428
feat(fullhistory/streaming): config schema + validateConfig + single-…
chowbao Jun 18, 2026
a623034
feat(fullhistory/streaming): retention widen/shorten + reader-retenti…
chowbao Jun 18, 2026
4d1698e
feat(fullhistory/streaming): surgical recovery + hot-volume-loss hand…
chowbao Jun 18, 2026
8ffbf4b
feat(fullhistory/streaming): audit command (INV-1..4 invariant walks)
chowbao Jun 18, 2026
468a9ce
fix(full-history): audit completes (not aborts) when an INV-2 two-fro…
chowbao Jun 18, 2026
62233f2
feat(fullhistory/streaming): runnable streaming-daemon entrypoint + c…
chowbao Jun 18, 2026
938752c
fix(fullhistory/streaming): honor storage-path overrides in the data …
chowbao Jun 18, 2026
39719aa
feat(fullhistory/streaming): observability — metrics + structured log…
chowbao Jun 18, 2026
0c53519
fix(fullhistory/streaming): real steady-state liveness signal + log/l…
chowbao Jun 18, 2026
61e61fb
test(fullhistory/streaming): crash-injection + convergence suite (INV…
chowbao Jun 18, 2026
acd39ee
test(fullhistory/streaming): document convergence caveats; verify cas…
chowbao Jun 18, 2026
28c7839
test(fullhistory/streaming): end-to-end daemon integration (in-process)
chowbao Jun 18, 2026
29db0bb
test/docs(fullhistory/streaming): tx-hash format alignment + perf exp…
chowbao Jun 18, 2026
e6d7367
refactor(fullhistory/streaming): rename lfs->ledgers, catch_up->backf…
chowbao Jun 18, 2026
56df864
refactor(fullhistory/streaming): align execution layer to design c586…
chowbao Jun 18, 2026
b20a385
test(fullhistory/streaming): address review -- preserve assertion intent
chowbao Jun 18, 2026
d587b06
fix(fullhistory/streaming): join lifecycle goroutine per supervise it…
chowbao Jun 18, 2026
4338835
docs(fullhistory/streaming): address review -- comment accuracy + wid…
chowbao Jun 18, 2026
82c65d9
docs(full-history): add implementation issue breakdown + status trace…
chowbao Jun 18, 2026
43ccbb9
refactor(fullhistory/streaming): organize package -- doc map, layer-g…
chowbao Jun 19, 2026
15c0ee6
refactor(fullhistory): /simplify pass -- prune onto RetentionGate, sl…
chowbao Jun 19, 2026
eab0fa2
docs(fullhistory/streaming): clarify surgical-recovery Hi=live contra…
chowbao Jun 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
227 changes: 147 additions & 80 deletions cmd/stellar-rpc/internal/fullhistory/ingest/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,58 +14,37 @@ import (
"github.com/stellar/go-stellar-sdk/xdr"

"github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/chunk"
"github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/stores/eventstore"
"github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/stores/ledger"
"github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/stores/txhash"
"github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/stores/hotchunk"
)

// HotStores holds the long-lived, caller-owned hot stores injected into RunHot.
// The caller (the daemon) opens and closes these; RunHot only borrows them to
// build the per-type hot ingesters. A field left nil for an enabled data type is
// a configuration error caught by RunHot. Every hot store is chunk-bound (each
// instance accumulates exactly one chunk before being frozen into cold
// artifacts), so each injected store must already be bound to the chunk being
// ingested — RunHot rejects a mismatch up front.
// HotStores holds the long-lived, caller-owned shared per-chunk hot DB injected
// into RunHot. The caller (the daemon) opens and closes it; RunHot only borrows
// it to drive the per-ledger atomic ingest. Under decision (a) this is ONE
// multi-CF RocksDB instance (ledgers + events CFs + txhash CFs), not three
// independent stores. The DB is chunk-bound (it accumulates exactly one chunk
// before being frozen into cold artifacts), so the injected DB must already be
// bound to the chunk being ingested — RunHot rejects a mismatch up front. A nil
// DB with any data type enabled in cfg is a configuration error caught by
// RunHot.
type HotStores struct {
Ledgers *ledger.HotStore
Txhash *txhash.HotStore
Events *eventstore.HotStore
// HotDB is the shared per-chunk multi-CF hot DB. Required when any hot data
// type is enabled.
HotDB *hotchunk.DB
}

// buildHotIngesters constructs one HotIngester per data type enabled in cfg, in
// canonical ledgers→txhash→events order, from the injected stores. It errors if
// an enabled type's store is nil.
func buildHotIngesters(stores HotStores, sink MetricSink, cfg Config) ([]HotIngester, error) {
var ings []HotIngester
if cfg.Ledgers {
if stores.Ledgers == nil {
return nil, errors.New("ingest: Ledgers enabled but HotStores.Ledgers is nil")
}
ings = append(ings, NewLedgerHotIngester(stores.Ledgers, sink))
}
if cfg.Txhash {
if stores.Txhash == nil {
return nil, errors.New("ingest: Txhash enabled but HotStores.Txhash is nil")
}
ings = append(ings, NewTxhashHotIngester(stores.Txhash, sink))
}
if cfg.Events {
if stores.Events == nil {
return nil, errors.New("ingest: Events enabled but HotStores.Events is nil")
}
ings = append(ings, NewEventsHotIngester(stores.Events, sink))
}
return ings, nil
// ingestContributions maps the ingest Config's enabled data types onto the
// hotchunk.Ingest toggles that select which CFs the single per-ledger batch
// writes.
func ingestContributions(cfg Config) hotchunk.Ingest {
return hotchunk.Ingest{Ledgers: cfg.Ledgers, Txhash: cfg.Txhash, Events: cfg.Events}
}

// buildColdIngesters opens one ColdIngester per data type enabled in cfg,
// each opening its own per-chunk writer under coldDir/<type> (constructors
// create their own directories and freely overwrite any prior attempt's
// files — see the package doc's artifact model). The constructor table below
// is the single definition site of the canonical ledgers→txhash→events order
// (buildHotIngesters keeps its explicit if-ladder because its three injected
// store types differ). On any constructor error it closes the ingesters built
// so far and returns.
// is the single definition site of the canonical ledgers→txhash→events order.
// On any constructor error it closes the ingesters built so far and returns.
func buildColdIngesters(coldDir string, chunkID chunk.ID, sink MetricSink, cfg Config) ([]ColdIngester, error) {
ctors := []struct {
enabled bool
Expand Down Expand Up @@ -123,11 +102,12 @@ func closeColdAll(ings []ColdIngester, err error) error {
}

// RunHot opens one stream for chunkID from source and feeds each ledger (as a
// view) to a HotService over the enabled hot ingesters, built from the INJECTED,
// caller-owned stores in hotStores. Ingest errors abort fast; HotService.Ingest
// waits for all ingesters before the loop pulls again so the borrowed view is
// never read past its lifetime. The hot stores are NOT closed here — the caller
// owns their lifecycle.
// view) to a HotService backed by the INJECTED, caller-owned shared per-chunk
// hot DB in hotStores. Each ledger commits as ONE atomic synced WriteBatch
// across all enabled CFs (decision (a)); Ingest errors abort fast, and
// HotService.Ingest consumes the borrowed view synchronously before the loop
// pulls the next ledger. The hot DB is NOT closed here — the caller owns its
// lifecycle.
func RunHot(
ctx context.Context,
logger *supportlog.Entry,
Expand All @@ -140,47 +120,26 @@ func RunHot(
if verr := cfg.validate(); verr != nil {
return verr
}
// Every hot store is chunk-bound — each instance accumulates exactly one
// chunk's data before being frozen into the chunk's cold artifacts — and
// records its chunk at open time. An injected store bound to a different
// chunk than we're ingesting would silently interleave two chunks' data
// (ledgers, txhash) or fail every per-ledger write with an out-of-range
// offset (events, whose LedgerOffsets are chunk-relative), so catch the
// mismatch up front with a clear message. Nil stores are skipped here:
// buildHotIngesters rejects a nil store for an enabled type with a more
// specific error.
checkBinding := func(name string, got chunk.ID) error {
if got != chunkID {
return fmt.Errorf("ingest: RunHot chunk %d but injected %s store is bound to chunk %d",
uint32(chunkID), name, uint32(got))
}
return nil
}
if cfg.Ledgers && hotStores.Ledgers != nil {
if err := checkBinding("Ledgers", hotStores.Ledgers.ChunkID()); err != nil {
return err
}
}
if cfg.Txhash && hotStores.Txhash != nil {
if err := checkBinding("Txhash", hotStores.Txhash.ChunkID()); err != nil {
return err
}
}
if cfg.Events && hotStores.Events != nil {
if err := checkBinding("Events", hotStores.Events.ChunkID()); err != nil {
return err
}
anyEnabled := cfg.Ledgers || cfg.Txhash || cfg.Events
if anyEnabled && hotStores.HotDB == nil {
return errors.New("ingest: a hot data type is enabled but HotStores.HotDB is nil")
}
ings, berr := buildHotIngesters(hotStores, sink, cfg)
if berr != nil {
return berr
// The shared hot DB is chunk-bound — it accumulates exactly one chunk's
// data before being frozen into the chunk's cold artifacts — and records
// its chunk at open time. An injected DB bound to a different chunk than
// we're ingesting would silently interleave two chunks' data or fail every
// per-ledger events write with an out-of-range offset (LedgerOffsets are
// chunk-relative), so catch the mismatch up front with a clear message.
if hotStores.HotDB != nil && hotStores.HotDB.ChunkID() != chunkID {
return fmt.Errorf("ingest: RunHot chunk %d but injected hot DB is bound to chunk %d",
uint32(chunkID), uint32(hotStores.HotDB.ChunkID()))
}
stream, oerr := source.OpenStream(chunkID)
if oerr != nil {
return fmt.Errorf("open stream for chunk %d: %w", uint32(chunkID), oerr)
}
logger.Debugf("RunHot: ingesting chunk %d [%d, %d]", uint32(chunkID), chunkID.FirstLedger(), chunkID.LastLedger())
service := NewHotService(ings, sink)
service := NewHotService(hotStores.HotDB, ingestContributions(cfg), sink)
return drain(ctx, stream, chunkID, service)
}

Expand Down Expand Up @@ -235,6 +194,114 @@ func drain(ctx context.Context, stream ledgerbackend.LedgerStream, chunkID chunk
return nil
}

// ColdDirs names the per-data-type output root for one chunk's cold artifacts.
// Each field is the directory UNDER WHICH the matching cold ingester composes
// its {bucketID:05d}/ subdirectory — i.e. the same `coldDir` the per-type
// constructor (NewLedgerColdIngester / NewTxhashColdIngester /
// NewEventsColdIngester) takes. A field left "" for a data type enabled in cfg
// is a configuration error caught by RunColdChunk.
//
// RunCold derives these three roots from a single coldDir by appending the
// fixed dataType subdirectory (coldDir/ledgers, coldDir/txhash, coldDir/events).
// ColdDirs exists so a caller with a DIFFERENT on-disk layout (e.g. the
// streaming daemon, whose raw txhash runs live under txhash/raw, not txhash)
// can place each artifact at its own canonical path while reusing the very same
// cold ingesters, ColdService, and drain loop.
type ColdDirs struct {
Ledgers string
Txhash string
Events string
}

// buildColdIngestersIn opens one ColdIngester per data type enabled in cfg,
// each under its OWN root from dirs (rather than coldDir/<dataType>). It is the
// ColdDirs counterpart of buildColdIngesters: same constructors, same canonical
// ledgers→txhash→events order, same rollback-on-constructor-error semantics; it
// differs only in resolving each type's root from an explicit field instead of
// a fixed subdirectory of one coldDir.
func buildColdIngestersIn(dirs ColdDirs, chunkID chunk.ID, sink MetricSink, cfg Config) ([]ColdIngester, error) {
ctors := []struct {
enabled bool
dataType string
dir string
open func(string, chunk.ID, MetricSink) (ColdIngester, error)
}{
{cfg.Ledgers, dataTypeLedgers, dirs.Ledgers, NewLedgerColdIngester},
{cfg.Txhash, dataTypeTxhash, dirs.Txhash, NewTxhashColdIngester},
{cfg.Events, dataTypeEvents, dirs.Events, NewEventsColdIngester},
}
var ings []ColdIngester
for _, c := range ctors {
if !c.enabled {
continue
}
if c.dir == "" {
return nil, closeColdAll(ings, fmt.Errorf("ingest: %s enabled but ColdDirs.%s is empty", c.dataType, c.dataType))
}
ing, err := c.open(c.dir, chunkID, sink)
if err != nil {
return nil, closeColdAll(ings, fmt.Errorf("open %s cold ingester: %w", c.dataType, err))
}
ings = append(ings, ing)
}
return ings, nil
}

// RunColdChunk ingests EXACTLY ONE chunk's cold artifacts from source into the
// per-data-type roots named by dirs, in a single streaming pass over the
// chunk's ledgers. It is the single-chunk, explicit-layout sibling of RunCold:
// it reuses the same cold ingester constructors, the same ColdService, and the
// same drain loop (sequence/overrun validation, full-range completeness check
// before Finalize), differing only in (1) producing one chunk rather than N
// concurrent chunks and (2) taking explicit per-type output roots so a caller
// whose layout is not coldDir/<dataType> can still reuse the cold pipeline
// verbatim.
//
// The cold ingesters overwrite any prior attempt's files at their canonical
// paths (see the package doc's artifact model), so RunColdChunk is the
// re-materialization primitive the streaming freeze protocol drives: a partial
// file from a crashed attempt is inert scratch the next call overwrites.
func RunColdChunk(
ctx context.Context,
logger *supportlog.Entry,
source ChunkSource,
dirs ColdDirs,
chunkID chunk.ID,
sink MetricSink,
cfg Config,
) (err error) {
if verr := cfg.validate(); verr != nil {
return verr
}
sink = orNop(sink)
start := time.Now()
if cerr := ctx.Err(); cerr != nil {
sink.ColdChunkTotal(time.Since(start))
return cerr
}
stream, oerr := source.OpenStream(chunkID)
if oerr != nil {
sink.ColdChunkTotal(time.Since(start))
return fmt.Errorf("open stream for chunk %d: %w", uint32(chunkID), oerr)
}
ings, berr := buildColdIngestersIn(dirs, chunkID, sink, cfg)
if berr != nil {
sink.ColdChunkTotal(time.Since(start))
return berr
}
logger.Debugf("RunColdChunk: ingesting chunk %d [%d, %d]", uint32(chunkID), chunkID.FirstLedger(), chunkID.LastLedger())
service := NewColdService(ings, sink)
defer func() {
if cerr := service.Close(); cerr != nil {
err = errors.Join(err, fmt.Errorf("close: %w", cerr))
}
}()
if derr := drain(ctx, stream, chunkID, service); derr != nil {
return derr
}
return service.Finalize(ctx)
}

// RunCold ingests numChunks consecutive chunks starting at startChunk into the
// cold stores under coldDir, processing up to chunkWorkers chunks concurrently.
// Each chunk worker opens its own stream via source.OpenStream(chunkID), builds
Expand Down
Loading
Loading