From 3776bf1847f01299a3d46e3413e0670f6d3e7dcb Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Wed, 5 Apr 2023 08:50:25 -0700 Subject: [PATCH 1/2] remove loader --- cmd/flow-archive-indexer/main.go | 12 +----- cmd/flow-archive-live/main.go | 7 +--- service/mapper/transitions.go | 4 +- service/mapper/transitions_internal_test.go | 44 +-------------------- testing/mocks/loader.go | 41 ------------------- 5 files changed, 5 insertions(+), 103 deletions(-) delete mode 100644 testing/mocks/loader.go diff --git a/cmd/flow-archive-indexer/main.go b/cmd/flow-archive-indexer/main.go index 5ad1bbfc9..24a166868 100644 --- a/cmd/flow-archive-indexer/main.go +++ b/cmd/flow-archive-indexer/main.go @@ -30,7 +30,6 @@ import ( "github.com/onflow/flow-archive/models/archive" "github.com/onflow/flow-archive/service/chain" "github.com/onflow/flow-archive/service/index" - "github.com/onflow/flow-archive/service/loader" "github.com/onflow/flow-archive/service/mapper" "github.com/onflow/flow-archive/service/storage" "github.com/onflow/flow-archive/service/triereader" @@ -116,7 +115,6 @@ func run() int { // Check if index already exists. read := index.NewReader(log, indexDB, storage) - first, err := read.First() empty := errors.Is(err, badger.ErrKeyNotFound) if err != nil && !empty { log.Error().Err(err).Msg("could not get first height from index reader") @@ -152,8 +150,6 @@ func run() int { }() // Initialize the transitions with the dependencies and add them to the FSM. 
- var load mapper.Loader - load = loader.FromIndex(log, storage, indexDB) bootstrap := flagCheckpoint != "" if empty { file, err := os.Open(flagCheckpoint) @@ -162,7 +158,6 @@ func run() int { return failure } file.Close() - load = loader.FromCheckpointFile(flagCheckpoint, &log) } else if bootstrap { file, err := os.Open(flagCheckpoint) if err != nil { @@ -170,14 +165,9 @@ func run() int { return failure } file.Close() - initialize := loader.FromCheckpointFile(flagCheckpoint, &log) - load = loader.FromIndex(log, storage, indexDB, - loader.WithInitializer(initialize), - loader.WithExclude(loader.ExcludeAtOrBelow(first)), - ) } - transitions := mapper.NewTransitions(log, load, disk, feed, read, write, + transitions := mapper.NewTransitions(log, disk, feed, read, write, mapper.WithSkipRegisters(flagSkip), ) state := mapper.EmptyState(flagCheckpoint) diff --git a/cmd/flow-archive-live/main.go b/cmd/flow-archive-live/main.go index 90f2ca5c9..8504af135 100644 --- a/cmd/flow-archive-live/main.go +++ b/cmd/flow-archive-live/main.go @@ -49,7 +49,6 @@ import ( "github.com/onflow/flow-archive/service/cloud" "github.com/onflow/flow-archive/service/index" "github.com/onflow/flow-archive/service/initializer" - "github.com/onflow/flow-archive/service/loader" "github.com/onflow/flow-archive/service/mapper" "github.com/onflow/flow-archive/service/metrics" "github.com/onflow/flow-archive/service/profiler" @@ -306,10 +305,6 @@ func run() int { follow.AddOnBlockFinalizedConsumer(stream.OnBlockFinalized) follow.AddOnBlockFinalizedConsumer(consensus.OnBlockFinalized) - // If we have an empty database, we want a loader to bootstrap from the - // checkpoint; if we don't, we can optionally use the root checkpoint to - // speed up the restart/restoration. - load := loader.FromIndex(log, storage, indexDB) // If metrics are enabled, the mapper should use the metrics writer. Otherwise, it can // use the regular one. 
writer := archive.Writer(write) @@ -321,7 +316,7 @@ func run() int { // At this point, we can initialize the core business logic of the indexer, // with the mapper's finite state machine and transitions. We also want to // load and inject the root checkpoint if it is given as a parameter. - transitions := mapper.NewTransitions(log, load, consensus, execution, read, writer, + transitions := mapper.NewTransitions(log, consensus, execution, read, writer, mapper.WithSkipRegisters(flagSkip), ) state := mapper.EmptyState(flagCheckpoint) diff --git a/service/mapper/transitions.go b/service/mapper/transitions.go index 35fecca7b..d83c3ca9f 100644 --- a/service/mapper/transitions.go +++ b/service/mapper/transitions.go @@ -37,7 +37,6 @@ type TransitionFunc func(*State) error type Transitions struct { cfg Config log zerolog.Logger - load Loader chain archive.Chain updates TrieUpdates read archive.Reader @@ -46,7 +45,7 @@ type Transitions struct { } // NewTransitions returns a Transitions component using the given dependencies and using the given options -func NewTransitions(log zerolog.Logger, load Loader, chain archive.Chain, updates TrieUpdates, read archive.Reader, write archive.Writer, options ...Option) *Transitions { +func NewTransitions(log zerolog.Logger, chain archive.Chain, updates TrieUpdates, read archive.Reader, write archive.Writer, options ...Option) *Transitions { cfg := DefaultConfig for _, option := range options { @@ -56,7 +55,6 @@ func NewTransitions(log zerolog.Logger, load Loader, chain archive.Chain, update t := Transitions{ log: log.With().Str("component", "mapper_transitions").Logger(), cfg: cfg, - load: load, chain: chain, updates: updates, read: read, diff --git a/service/mapper/transitions_internal_test.go b/service/mapper/transitions_internal_test.go index 7437ce7e2..8afe5f696 100644 --- a/service/mapper/transitions_internal_test.go +++ b/service/mapper/transitions_internal_test.go @@ -36,13 +36,12 @@ import ( func TestNewTransitions(t *testing.T) 
{ t.Run("nominal case, without options", func(t *testing.T) { - load := mocks.BaselineLoader(t) chain := mocks.BaselineChain(t) updates := mocks.BaselineParser(t) read := mocks.BaselineReader(t) write := mocks.BaselineWriter(t) - tr := NewTransitions(mocks.NoopLogger, load, chain, updates, read, write) + tr := NewTransitions(mocks.NoopLogger, chain, updates, read, write) assert.NotNil(t, tr) assert.Equal(t, chain, tr.chain) @@ -53,14 +52,13 @@ func TestNewTransitions(t *testing.T) { }) t.Run("nominal case, with option", func(t *testing.T) { - load := mocks.BaselineLoader(t) chain := mocks.BaselineChain(t) updates := mocks.BaselineParser(t) read := mocks.BaselineReader(t) write := mocks.BaselineWriter(t) skip := true - tr := NewTransitions(mocks.NoopLogger, load, chain, updates, read, write, + tr := NewTransitions(mocks.NoopLogger, chain, updates, read, write, WithSkipRegisters(skip), ) @@ -917,11 +915,6 @@ func TestTransitions_ResumeIndexing(t *testing.T) { return nil } - loader := mocks.BaselineLoader(t) - loader.TrieFunc = func() (*trie.MTrie, error) { - return tree, nil - } - reader := mocks.BaselineReader(t) reader.LastFunc = func() (uint64, error) { return header.Height, nil @@ -937,7 +930,6 @@ func TestTransitions_ResumeIndexing(t *testing.T) { StatusResume, withReader(reader), withWriter(writer), - withLoader(loader), withChain(chain), ) @@ -956,11 +948,6 @@ func TestTransitions_ResumeIndexing(t *testing.T) { return 0, mocks.GenericError } - loader := mocks.BaselineLoader(t) - loader.TrieFunc = func() (*trie.MTrie, error) { - return tree, nil - } - reader := mocks.BaselineReader(t) reader.LastFunc = func() (uint64, error) { return header.Height, nil @@ -973,7 +960,6 @@ func TestTransitions_ResumeIndexing(t *testing.T) { t, StatusResume, withReader(reader), - withLoader(loader), withChain(chain), ) @@ -995,11 +981,6 @@ func TestTransitions_ResumeIndexing(t *testing.T) { return mocks.GenericError } - loader := mocks.BaselineLoader(t) - loader.TrieFunc = func() 
(*trie.MTrie, error) { - return tree, nil - } - reader := mocks.BaselineReader(t) reader.LastFunc = func() (uint64, error) { return header.Height, nil @@ -1013,7 +994,6 @@ func TestTransitions_ResumeIndexing(t *testing.T) { StatusResume, withWriter(writer), withReader(reader), - withLoader(loader), withChain(chain), ) @@ -1030,11 +1010,6 @@ func TestTransitions_ResumeIndexing(t *testing.T) { return header.Height, nil } - loader := mocks.BaselineLoader(t) - loader.TrieFunc = func() (*trie.MTrie, error) { - return tree, nil - } - reader := mocks.BaselineReader(t) reader.LastFunc = func() (uint64, error) { return 0, mocks.GenericError @@ -1047,7 +1022,6 @@ func TestTransitions_ResumeIndexing(t *testing.T) { t, StatusResume, withReader(reader), - withLoader(loader), withChain(chain), ) @@ -1064,11 +1038,6 @@ func TestTransitions_ResumeIndexing(t *testing.T) { return header.Height, nil } - loader := mocks.BaselineLoader(t) - loader.TrieFunc = func() (*trie.MTrie, error) { - return tree, nil - } - reader := mocks.BaselineReader(t) reader.LastFunc = func() (uint64, error) { return header.Height, nil @@ -1081,7 +1050,6 @@ func TestTransitions_ResumeIndexing(t *testing.T) { t, StatusForward, withReader(reader), - withLoader(loader), withChain(chain), ) @@ -1133,7 +1101,6 @@ func createTrieWithNPayloads(t *testing.T, n int) *trie.MTrie { func baselineFSM(t *testing.T, status Status, opts ...func(tr *Transitions)) (*Transitions, *State) { t.Helper() - load := mocks.BaselineLoader(t) chain := mocks.BaselineChain(t) updates := mocks.BaselineParser(t) read := mocks.BaselineReader(t) @@ -1148,7 +1115,6 @@ func baselineFSM(t *testing.T, status Status, opts ...func(tr *Transitions)) (*T WaitInterval: 0, }, log: mocks.NoopLogger, - load: load, chain: chain, updates: updates, read: read, @@ -1171,12 +1137,6 @@ func baselineFSM(t *testing.T, status Status, opts ...func(tr *Transitions)) (*T return &tr, &st } -func withLoader(load Loader) func(*Transitions) { - return func(tr 
*Transitions) { - tr.load = load - } -} - func withChain(chain archive.Chain) func(*Transitions) { return func(tr *Transitions) { tr.chain = chain diff --git a/testing/mocks/loader.go b/testing/mocks/loader.go deleted file mode 100644 index c52750151..000000000 --- a/testing/mocks/loader.go +++ /dev/null @@ -1,41 +0,0 @@ -// Copyright 2021 Optakt Labs OÜ -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not -// use this file except in compliance with the License. You may obtain a copy of -// the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -// License for the specific language governing permissions and limitations under -// the License. - -package mocks - -import ( - "testing" - - "github.com/onflow/flow-go/ledger/complete/mtrie/trie" -) - -type Loader struct { - TrieFunc func() (*trie.MTrie, error) -} - -func BaselineLoader(t *testing.T) *Loader { - t.Helper() - - l := Loader{ - TrieFunc: func() (*trie.MTrie, error) { - return GenericTrie, nil - }, - } - - return &l -} - -func (l *Loader) Trie() (*trie.MTrie, error) { - return l.TrieFunc() -} From be8798d89c6bf77fec274307225a84fa121fdba1 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Wed, 5 Apr 2023 09:42:39 -0700 Subject: [PATCH 2/2] add todos --- cmd/flow-archive-live/main.go | 57 +++++++++++++++++++++++++++++++++++ service/tracker/consensus.go | 1 + 2 files changed, 58 insertions(+) diff --git a/cmd/flow-archive-live/main.go b/cmd/flow-archive-live/main.go index 8504af135..a34a680d1 100644 --- a/cmd/flow-archive-live/main.go +++ b/cmd/flow-archive-live/main.go @@ -108,6 +108,14 @@ func run() int { pflag.Parse() + // TODO(leo): add startup reporting + // on startup, it's useful to print the actual value of all the flags 
for debugging purposes. + // on startup, it's useful to print the current state, such as: + // - whether the database has been bootstrapped + // - the last indexed height (since the FSM will continue from last indexed height and index the next, useful for debugging when indexing is halted) + // - the last finalized height (since the follower engine will continue finalizing from this height, and useful for debugging when finalization is halted) + // - report metrics with the current state + // Increase the GOMAXPROCS value in order to use the full IOPS available, see: // https://groups.google.com/g/golang-nuts/c/jPb_h3TvlKE _ = runtime.GOMAXPROCS(128) @@ -291,6 +299,19 @@ func run() int { log.Error().Err(err).Msg("could not initialize execution tracker") return failure } + // TODO(leo): use follower state to retrieve block data + // this creates a consensus tracker in order to retrieve finalized + // blocks by height. This is not recommended, because finalized blocks + // and sealed blocks should be retrieved from protocol state, rather than + // retrieving from database (protocolDB) directly. + // the question is: how to get the protocol state, the consensus follower + // is supposed to sync blocks from peers and use consensus algorithm to finalize + // blocks, which means the follower creates a protocol state internally, + // which is called follower state. The follower state is the correct module + // to retrieve finalized or sealed blocks by height. + // However, the consensus follower didn't expose the follower state, it's better + // to adjust the consensus follower creation function to return follower state + // module as well. 
consensus, err := tracker.NewConsensus(log, protocolDB, execution) if err != nil { log.Error().Err(err).Msg("could not initialize consensus tracker") @@ -302,6 +323,15 @@ func run() int { // will use the callback to make additional data available to the mapper, // while the cloud streamer will use the callback to download execution data // for finalized blocks. + // TODO(leo): use jobqueue + // here, callbacks are used to notify stream (trie updates downloader) that + // a new finalized block can be downloaded. + // this wouldn't be a problem until indexing falls behind, in which case, + // there will be lots of finalized and un-indexed blocks buffered in memory, + // potentially causing OOM. A better way is to notify the latest finalized height only + // and only pre-fetch up to a certain distance. Such a job queue has been implemented in + // [jobqueue](https://github.com/onflow/flow-go/blob/master/module/jobqueue/README.md) + // the jobqueue also allows concurrently working on multiple blocks follow.AddOnBlockFinalizedConsumer(stream.OnBlockFinalized) follow.AddOnBlockFinalizedConsumer(consensus.OnBlockFinalized) @@ -310,15 +340,42 @@ func run() int { writer := archive.Writer(write) metricsEnabled := flagMetricsAddr != "" if metricsEnabled { + // TODO(leo): add metrics for finalized height, and trie updates download + // the indexer depends on the stream (trie updates downloader) and consensus follower (blocks downloader) + // to fetch the data to be indexed. + // the stream, the consensus follower and the indexer are all working independently. + // therefore, we need to monitor the progress of each module independently as well. + // however here, the metrics are currently only added to the writer (the indexer). + // they should also be added to other modules in order to monitor changes in finalized height and indexed height + // as well as the trie updates downloading activities. 
writer = metrics.NewMetricsWriter(write) } // At this point, we can initialize the core business logic of the indexer, // with the mapper's finite state machine and transitions. We also want to // load and inject the root checkpoint if it is given as a parameter. + // TODO(leo): use follower state + // the write will index both the block and trie updates for each block, + // actually, it is not necessary to index the block for the finalized height, because + // protocol state has already indexed the block by height. + // in other words, indexing the block would be creating a duplicated index. + // We could implement the read so that the block is read from the protocol state, rather + // than from a duplicated index. transitions := mapper.NewTransitions(log, consensus, execution, read, writer, mapper.WithSkipRegisters(flagSkip), ) + // TODO(leo): replace FSM with job queue + // FSM makes it clear about state transition, however, it also limits the writer + // to index only one block at a time, which would be very slow, especially in the case when + // the archive node needs to catch up. + // if multiple blocks are finalized, technically, they can be indexed concurrently, meaning + // concurrently fetching data for multiple blocks and index them. we just need to carefully + // set a limit on the number of blocks to be concurrently processed. 
+ // This feature has already been implemented in + // [jobqueue](https://github.com/onflow/flow-go/blob/master/module/jobqueue/README.md), which + // would be a good candidate for replacing FSM + // also indexing the trie updates and indexing the block data can also be done concurrently, which + // FSM doesn't allow state := mapper.EmptyState(flagCheckpoint) fsm := mapper.NewFSM(state, mapper.WithTransition(mapper.StatusInitialize, transitions.InitializeMapper), diff --git a/service/tracker/consensus.go b/service/tracker/consensus.go index fd2a3efe6..cb0906546 100644 --- a/service/tracker/consensus.go +++ b/service/tracker/consensus.go @@ -73,6 +73,7 @@ func (c *Consensus) OnBlockFinalized(block *model.Block) { c.log.Error().Err(err).Hex("block", blockID[:]).Msg("could not get header") return } + // TODO(leo): protect by mutex, otherwise not safe for concurrent use c.last = header.Height c.log.Info().Hex("block", blockID[:]).Uint64("height", header.Height).Msg("block finalization processed") }