Skip to content
This repository was archived by the owner on Mar 19, 2024. It is now read-only.
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 1 addition & 11 deletions cmd/flow-archive-indexer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/onflow/flow-archive/models/archive"
"github.com/onflow/flow-archive/service/chain"
"github.com/onflow/flow-archive/service/index"
"github.com/onflow/flow-archive/service/loader"
"github.com/onflow/flow-archive/service/mapper"
"github.com/onflow/flow-archive/service/storage"
"github.com/onflow/flow-archive/service/triereader"
Expand Down Expand Up @@ -116,7 +115,6 @@ func run() int {

// Check if index already exists.
read := index.NewReader(log, indexDB, storage)
first, err := read.First()
empty := errors.Is(err, badger.ErrKeyNotFound)
if err != nil && !empty {
log.Error().Err(err).Msg("could not get first height from index reader")
Expand Down Expand Up @@ -152,8 +150,6 @@ func run() int {
}()

// Initialize the transitions with the dependencies and add them to the FSM.
var load mapper.Loader
load = loader.FromIndex(log, storage, indexDB)
bootstrap := flagCheckpoint != ""
if empty {
file, err := os.Open(flagCheckpoint)
Expand All @@ -162,22 +158,16 @@ func run() int {
return failure
}
file.Close()
load = loader.FromCheckpointFile(flagCheckpoint, &log)
} else if bootstrap {
file, err := os.Open(flagCheckpoint)
if err != nil {
log.Error().Err(err).Msg("could not open checkpoint file")
return failure
}
file.Close()
initialize := loader.FromCheckpointFile(flagCheckpoint, &log)
load = loader.FromIndex(log, storage, indexDB,
loader.WithInitializer(initialize),
loader.WithExclude(loader.ExcludeAtOrBelow(first)),
)
}

transitions := mapper.NewTransitions(log, load, disk, feed, read, write,
transitions := mapper.NewTransitions(log, disk, feed, read, write,
mapper.WithSkipRegisters(flagSkip),
)
state := mapper.EmptyState(flagCheckpoint)
Expand Down
64 changes: 58 additions & 6 deletions cmd/flow-archive-live/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ import (
"github.com/onflow/flow-archive/service/cloud"
"github.com/onflow/flow-archive/service/index"
"github.com/onflow/flow-archive/service/initializer"
"github.com/onflow/flow-archive/service/loader"
"github.com/onflow/flow-archive/service/mapper"
"github.com/onflow/flow-archive/service/metrics"
"github.com/onflow/flow-archive/service/profiler"
Expand Down Expand Up @@ -109,6 +108,14 @@ func run() int {

pflag.Parse()

// TODO(leo): add startup reporting
// on startup, it's useful to print the actual value of all the flags for debugging purposes.
// on startup, it's also useful to print the current state, such as:
// - whether the database has been bootstrapped
// - the last indexed height (since the FSM will continue from the last indexed height and index the next one; useful for debugging when indexing has halted)
// - the last finalized height (since the follower engine will continue finalizing from this height; useful for debugging when finalization has halted)
// - report metrics with the current state

// Increase the GOMAXPROCS value in order to use the full IOPS available, see:
// https://groups.google.com/g/golang-nuts/c/jPb_h3TvlKE
_ = runtime.GOMAXPROCS(128)
Expand Down Expand Up @@ -292,6 +299,19 @@ func run() int {
log.Error().Err(err).Msg("could not initialize execution tracker")
return failure
}
// TODO(leo): use follower state to retrieve block data
// this creates a consensus tracker in order to retrieve finalized
// blocks by height. This is not recommended, because finalized blocks
// and sealed blocks should be retrieved from the protocol state, rather than
// retrieved from the database (protocolDB) directly.
// the question is: how to get the protocol state? the consensus follower
// is supposed to sync blocks from peers and use the consensus algorithm to finalize
// blocks, which means the follower creates a protocol state internally,
// which is called the follower state. The follower state is the correct module
// to retrieve finalized or sealed blocks by height.
// However, the consensus follower doesn't expose the follower state, so it would be
// better to adjust the consensus follower creation function to return the follower state
// module as well.
consensus, err := tracker.NewConsensus(log, protocolDB, execution)
if err != nil {
log.Error().Err(err).Msg("could not initialize consensus tracker")
Expand All @@ -303,27 +323,59 @@ func run() int {
// will use the callback to make additional data available to the mapper,
// while the cloud streamer will use the callback to download execution data
// for finalized blocks.
// TODO(leo): use jobqueue
// here, callbacks are used to notify stream (trie updates downloader) that
// a new finalized block can be downloaded.
// this isn't a problem until indexing falls behind, in which case
// there will be lots of finalized but un-indexed blocks buffered in memory,
// potentially causing OOM. A better way is to notify the latest finalized height only
// and only pre-fetch up to a certain distance. Such a job queue has been implemented in
// [jobqueue](https://github.com/onflow/flow-go/blob/master/module/jobqueue/README.md)
// the jobqueue also allows concurrently working on multiple blocks
follow.AddOnBlockFinalizedConsumer(stream.OnBlockFinalized)
follow.AddOnBlockFinalizedConsumer(consensus.OnBlockFinalized)

// If we have an empty database, we want a loader to bootstrap from the
// checkpoint; if we don't, we can optionally use the root checkpoint to
// speed up the restart/restoration.
load := loader.FromIndex(log, storage, indexDB)
// If metrics are enabled, the mapper should use the metrics writer. Otherwise, it can
// use the regular one.
writer := archive.Writer(write)
metricsEnabled := flagMetricsAddr != ""
if metricsEnabled {
// TODO(leo): add metrics for finalized height, and trie updates download
// the indexer depends on the stream (trie updates downloader) and consensus follower (blocks downloader)
// to fetch the data to be indexed.
// the stream, the consensus follower and the indexer all work independently.
// therefore, we need to monitor the progress of each module independently as well.
// however, the metrics are currently only added to the writer (the indexer).
// they should also be added to the other modules in order to monitor changes in finalized height and indexed height
// as well as the trie updates downloading activities.
writer = metrics.NewMetricsWriter(write)
}

// At this point, we can initialize the core business logic of the indexer,
// with the mapper's finite state machine and transitions. We also want to
// load and inject the root checkpoint if it is given as a parameter.
transitions := mapper.NewTransitions(log, load, consensus, execution, read, writer,
// TODO(leo): use follower state
// the write will index both the block and trie updates for each block.
// actually, it is not necessary to index the block for the finalized height, because
// the protocol state has already indexed the block by height.
// in other words, indexing the block would create a duplicate index.
// We could implement the read so that the block is read from the protocol state, rather
// than from a duplicate index.
transitions := mapper.NewTransitions(log, consensus, execution, read, writer,
mapper.WithSkipRegisters(flagSkip),
)
// TODO(leo): replace FSM with job queue
// FSM makes the state transitions clear; however, it also limits the writer
// to indexing only one block at a time, which would be very slow, especially in the case when
// the archive node needs to catch up.
// if multiple blocks are finalized, technically, they can be indexed concurrently, meaning
// concurrently fetching data for multiple blocks and indexing them. we just need to carefully
// set a limit on the number of blocks to be concurrently processed.
// This feature has already been implemented in
// [jobqueue](https://github.com/onflow/flow-go/blob/master/module/jobqueue/README.md),
// which would be a good candidate for replacing FSM.
// also, indexing the trie updates and indexing the block data can be done concurrently, which
// FSM doesn't allow
state := mapper.EmptyState(flagCheckpoint)
fsm := mapper.NewFSM(state,
mapper.WithTransition(mapper.StatusInitialize, transitions.InitializeMapper),
Expand Down
4 changes: 1 addition & 3 deletions service/mapper/transitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ type TransitionFunc func(*State) error
type Transitions struct {
cfg Config
log zerolog.Logger
load Loader
chain archive.Chain
updates TrieUpdates
read archive.Reader
Expand All @@ -46,7 +45,7 @@ type Transitions struct {
}

// NewTransitions returns a Transitions component using the given dependencies and using the given options
func NewTransitions(log zerolog.Logger, load Loader, chain archive.Chain, updates TrieUpdates, read archive.Reader, write archive.Writer, options ...Option) *Transitions {
func NewTransitions(log zerolog.Logger, chain archive.Chain, updates TrieUpdates, read archive.Reader, write archive.Writer, options ...Option) *Transitions {

cfg := DefaultConfig
for _, option := range options {
Expand All @@ -56,7 +55,6 @@ func NewTransitions(log zerolog.Logger, load Loader, chain archive.Chain, update
t := Transitions{
log: log.With().Str("component", "mapper_transitions").Logger(),
cfg: cfg,
load: load,
chain: chain,
updates: updates,
read: read,
Expand Down
44 changes: 2 additions & 42 deletions service/mapper/transitions_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,12 @@ import (

func TestNewTransitions(t *testing.T) {
t.Run("nominal case, without options", func(t *testing.T) {
load := mocks.BaselineLoader(t)
chain := mocks.BaselineChain(t)
updates := mocks.BaselineParser(t)
read := mocks.BaselineReader(t)
write := mocks.BaselineWriter(t)

tr := NewTransitions(mocks.NoopLogger, load, chain, updates, read, write)
tr := NewTransitions(mocks.NoopLogger, chain, updates, read, write)

assert.NotNil(t, tr)
assert.Equal(t, chain, tr.chain)
Expand All @@ -53,14 +52,13 @@ func TestNewTransitions(t *testing.T) {
})

t.Run("nominal case, with option", func(t *testing.T) {
load := mocks.BaselineLoader(t)
chain := mocks.BaselineChain(t)
updates := mocks.BaselineParser(t)
read := mocks.BaselineReader(t)
write := mocks.BaselineWriter(t)

skip := true
tr := NewTransitions(mocks.NoopLogger, load, chain, updates, read, write,
tr := NewTransitions(mocks.NoopLogger, chain, updates, read, write,
WithSkipRegisters(skip),
)

Expand Down Expand Up @@ -917,11 +915,6 @@ func TestTransitions_ResumeIndexing(t *testing.T) {
return nil
}

loader := mocks.BaselineLoader(t)
loader.TrieFunc = func() (*trie.MTrie, error) {
return tree, nil
}

reader := mocks.BaselineReader(t)
reader.LastFunc = func() (uint64, error) {
return header.Height, nil
Expand All @@ -937,7 +930,6 @@ func TestTransitions_ResumeIndexing(t *testing.T) {
StatusResume,
withReader(reader),
withWriter(writer),
withLoader(loader),
withChain(chain),
)

Expand All @@ -956,11 +948,6 @@ func TestTransitions_ResumeIndexing(t *testing.T) {
return 0, mocks.GenericError
}

loader := mocks.BaselineLoader(t)
loader.TrieFunc = func() (*trie.MTrie, error) {
return tree, nil
}

reader := mocks.BaselineReader(t)
reader.LastFunc = func() (uint64, error) {
return header.Height, nil
Expand All @@ -973,7 +960,6 @@ func TestTransitions_ResumeIndexing(t *testing.T) {
t,
StatusResume,
withReader(reader),
withLoader(loader),
withChain(chain),
)

Expand All @@ -995,11 +981,6 @@ func TestTransitions_ResumeIndexing(t *testing.T) {
return mocks.GenericError
}

loader := mocks.BaselineLoader(t)
loader.TrieFunc = func() (*trie.MTrie, error) {
return tree, nil
}

reader := mocks.BaselineReader(t)
reader.LastFunc = func() (uint64, error) {
return header.Height, nil
Expand All @@ -1013,7 +994,6 @@ func TestTransitions_ResumeIndexing(t *testing.T) {
StatusResume,
withWriter(writer),
withReader(reader),
withLoader(loader),
withChain(chain),
)

Expand All @@ -1030,11 +1010,6 @@ func TestTransitions_ResumeIndexing(t *testing.T) {
return header.Height, nil
}

loader := mocks.BaselineLoader(t)
loader.TrieFunc = func() (*trie.MTrie, error) {
return tree, nil
}

reader := mocks.BaselineReader(t)
reader.LastFunc = func() (uint64, error) {
return 0, mocks.GenericError
Expand All @@ -1047,7 +1022,6 @@ func TestTransitions_ResumeIndexing(t *testing.T) {
t,
StatusResume,
withReader(reader),
withLoader(loader),
withChain(chain),
)

Expand All @@ -1064,11 +1038,6 @@ func TestTransitions_ResumeIndexing(t *testing.T) {
return header.Height, nil
}

loader := mocks.BaselineLoader(t)
loader.TrieFunc = func() (*trie.MTrie, error) {
return tree, nil
}

reader := mocks.BaselineReader(t)
reader.LastFunc = func() (uint64, error) {
return header.Height, nil
Expand All @@ -1081,7 +1050,6 @@ func TestTransitions_ResumeIndexing(t *testing.T) {
t,
StatusForward,
withReader(reader),
withLoader(loader),
withChain(chain),
)

Expand Down Expand Up @@ -1133,7 +1101,6 @@ func createTrieWithNPayloads(t *testing.T, n int) *trie.MTrie {
func baselineFSM(t *testing.T, status Status, opts ...func(tr *Transitions)) (*Transitions, *State) {
t.Helper()

load := mocks.BaselineLoader(t)
chain := mocks.BaselineChain(t)
updates := mocks.BaselineParser(t)
read := mocks.BaselineReader(t)
Expand All @@ -1148,7 +1115,6 @@ func baselineFSM(t *testing.T, status Status, opts ...func(tr *Transitions)) (*T
WaitInterval: 0,
},
log: mocks.NoopLogger,
load: load,
chain: chain,
updates: updates,
read: read,
Expand All @@ -1171,12 +1137,6 @@ func baselineFSM(t *testing.T, status Status, opts ...func(tr *Transitions)) (*T
return &tr, &st
}

// withLoader returns a test option that overrides the loader dependency of
// the Transitions it is applied to, setting its load field to the given load.
func withLoader(load Loader) func(*Transitions) {
return func(tr *Transitions) {
tr.load = load
}
}

func withChain(chain archive.Chain) func(*Transitions) {
return func(tr *Transitions) {
tr.chain = chain
Expand Down
1 change: 1 addition & 0 deletions service/tracker/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func (c *Consensus) OnBlockFinalized(block *model.Block) {
c.log.Error().Err(err).Hex("block", blockID[:]).Msg("could not get header")
return
}
// TODO(leo): protect c.last with a mutex; otherwise this write is not concurrency-safe
c.last = header.Height
c.log.Info().Hex("block", blockID[:]).Uint64("height", header.Height).Msg("block finalization processed")
}
Expand Down
41 changes: 0 additions & 41 deletions testing/mocks/loader.go

This file was deleted.