diff --git a/cmd/flow-archive-live/main.go b/cmd/flow-archive-live/main.go index e7a35d25..7dc78c55 100644 --- a/cmd/flow-archive-live/main.go +++ b/cmd/flow-archive-live/main.go @@ -18,6 +18,7 @@ import ( grpczerolog "github.com/grpc-ecosystem/go-grpc-middleware/providers/zerolog/v2" "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging" "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/tags" + "github.com/onflow/flow-go/utils/grpcutils" "github.com/rs/zerolog" "github.com/spf13/pflag" "google.golang.org/api/option" @@ -43,6 +44,7 @@ import ( "github.com/onflow/flow-archive/service/profiler" "github.com/onflow/flow-archive/service/storage" "github.com/onflow/flow-archive/service/storage2" + "github.com/onflow/flow-archive/service/stream" "github.com/onflow/flow-archive/service/tracker" ) @@ -85,11 +87,12 @@ func run() int { flagSeedAddress string flagSeedKey string flagTracing bool + flagMsgSize int ) pflag.StringVarP(&flagAddress, "address", "a", "127.0.0.1:5005", "bind address for serving DPS API") pflag.StringVarP(&flagAccessAddress, "address-access", "A", "127.0.0.1:9000", "address to serve Access API on") pflag.StringVarP(&flagBootstrap, "bootstrap", "b", "bootstrap", "path to directory with bootstrap information for spork") - pflag.StringVarP(&flagBucket, "bucket", "u", "", "Google Cloude Storage bucket with block data records") + pflag.StringVarP(&flagBucket, "bucket", "u", "", "Google Cloud Storage bucket with block data records") pflag.StringVarP(&flagCheckpoint, "checkpoint", "c", "", "path to root checkpoint file for execution state trie") pflag.StringVarP(&flagData, "data", "d", "data", "path to database directory for protocol data") pflag.StringVarP(&flagLevel, "level", "l", "info", "log output level") @@ -108,6 +111,7 @@ func run() int { pflag.StringVar(&flagSeedAddress, "seed-address", "", "host address of seed node to follow consensus") pflag.StringVar(&flagSeedKey, "seed-key", "", "hex-encoded public network key of seed 
node to follow consensus") pflag.BoolVarP(&flagTracing, "tracing", "t", false, "enable tracing for this instance") + pflag.IntVar(&flagMsgSize, "grpc-msg-size", grpcutils.DefaultMaxMsgSize, "size limit of grpc messages") pflag.Parse() @@ -282,6 +286,31 @@ func run() int { return failure } + // On the other side, we also need access to the execution data from either GCP or Access Node API + var streamer tracker.DataStreamer + if flagBucket == "" { + streamer = stream.NewExecDataStreamer(log, flagSeedAddress, flagMsgSize, + stream.WithCatchupBlocks(blockIDs), + ) + } else { + client, err := gcloud.NewClient(context.Background(), + option.WithoutAuthentication(), + ) + if err != nil { + log.Error().Err(err).Msg("could not connect GCP client") + return failure + } + defer func() { + err := client.Close() + if err != nil { + log.Error().Err(err).Msg("could not close GCP client") + } + }() + bucket := client.Bucket(flagBucket) + streamer = cloud.NewGCPStreamer(log, bucket, + cloud.WithCatchupBlocks(blockIDs), + ) + } log.Info().Msgf("%v blocks to catchup", len(blockIDs)) // On the other side, we also need access to the execution data. The cloud // streamer is responsible for retrieving block execution records from a @@ -300,16 +329,12 @@ func run() int { log.Error().Err(err).Msg("could not close GCP client") } }() - bucket := client.Bucket(flagBucket) - stream := cloud.NewGCPStreamer(log, bucket, - cloud.WithCatchupBlocks(blockIDs), - ) // Next, we can initialize our consensus and execution trackers. They are // responsible for tracking changes to the available data, for the consensus // follower and related consensus data on one side, and the cloud streamer // and available execution records on the other side. 
-	execution, err := tracker.NewExecution(log, protocolDB, stream)
+	execution, err := tracker.NewExecution(log, protocolDB, streamer)
 	if err != nil {
 		log.Error().Err(err).Msg("could not initialize execution tracker")
 		return failure
@@ -325,7 +350,7 @@ func run() int {
 	// will use the callback to make additional data available to the mapper,
 	// while the cloud streamer will use the callback to download execution data
 	// for finalized blocks.
-	follow.AddOnBlockFinalizedConsumer(stream.OnBlockFinalized)
+	follow.AddOnBlockFinalizedConsumer(streamer.OnBlockFinalized)
 	follow.AddOnBlockFinalizedConsumer(consensus.OnBlockFinalized)
 
 	// If metrics are enabled, the mapper should use the metrics writer. Otherwise, it can
diff --git a/service/stream/config.go b/service/stream/config.go
new file mode 100644
index 00000000..e899a365
--- /dev/null
+++ b/service/stream/config.go
@@ -0,0 +1,36 @@
+package stream
+
+import (
+	"github.com/onflow/flow-go/model/flow"
+)
+
+// DefaultConfig is the default configuration for the Streamer.
+var DefaultConfig = Config{
+	// small buffer size as there's almost no latency between API and consensus follower in ANs
+	BufferSize:    4,
+	CatchupBlocks: []flow.Identifier{},
+}
+
+// Config is the configuration for a Streamer.
+type Config struct {
+	BufferSize    uint              // maximum number of downloaded records kept in the streamer's buffer
+	CatchupBlocks []flow.Identifier // finalized blocks whose execution data records still need to be downloaded
+}
+
+// Option is a function that can be applied to a Config.
+type Option func(*Config)
+
+// WithBufferSize can be used to specify the buffer size for a Streamer to use.
+func WithBufferSize(size uint) Option {
+	return func(cfg *Config) {
+		cfg.BufferSize = size
+	}
+}
+
+// WithCatchupBlocks injects a number of block IDs that are already finalized,
+// but for which we still need to download the execution data records.
+func WithCatchupBlocks(blockIDs []flow.Identifier) Option {
+	return func(cfg *Config) {
+		cfg.CatchupBlocks = blockIDs
+	}
+}
diff --git a/service/stream/exec_data_streamer.go b/service/stream/exec_data_streamer.go
new file mode 100644
index 00000000..fee29ca9
--- /dev/null
+++ b/service/stream/exec_data_streamer.go
@@ -0,0 +1,198 @@
+package stream
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"sync/atomic"
+
+	"cloud.google.com/go/storage"
+	"github.com/hashicorp/go-multierror"
+	"github.com/onflow/flow-archive/models/archive"
+	"github.com/onflow/flow-go/consensus/hotstuff/model"
+	"github.com/onflow/flow-go/engine/execution/ingestion/uploader"
+	"github.com/onflow/flow-go/model/flow"
+	"github.com/onflow/flow/protobuf/go/flow/access"
+	execData "github.com/onflow/flow/protobuf/go/flow/executiondata"
+	"github.com/rs/zerolog"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
+)
+
+// ExecDataStreamer retrieves block execution records through the Access and
+// Execution Data gRPC APIs and buffers them until they are consumed via Next.
+type ExecDataStreamer struct {
+	log       zerolog.Logger
+	execApi   execData.ExecutionDataAPIClient
+	accessApi access.AccessAPIClient
+	queue     *archive.SafeDeque // queue of block identifiers for next downloads
+	buffer    *archive.SafeDeque // queue of downloaded execution data records
+	limit     uint               // buffer size limit for downloaded records
+	busy      uint32             // used as a guard to avoid concurrent polling
+	ctx       context.Context
+	chain     flow.ChainID
+}
+
+// NewExecDataStreamer creates a streamer that pulls execution records from the
+// node at accessAddr over an insecure gRPC connection, accepting responses of up
+// to msgSize bytes. Catch-up blocks passed through the options are queued for
+// download right away.
+//
+// NOTE(review): the signature cannot report failures, so connection and network
+// parameter errors are only logged; consider returning an error instead.
+func NewExecDataStreamer(log zerolog.Logger, accessAddr string, msgSize int, options ...Option) *ExecDataStreamer {
+	cfg := DefaultConfig
+	for _, option := range options {
+		option(&cfg)
+	}
+
+	ctx := context.Background()
+
+	// initialize clients
+	opts := []grpc.DialOption{
+		grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(msgSize)),
+		grpc.WithTransportCredentials(insecure.NewCredentials()),
+	}
+	conn, err := grpc.Dial(accessAddr, opts...)
+	if err != nil {
+		log.Error().Err(err).Str("address", accessAddr).Msg("could not dial access node")
+	}
+	execDataAPI := execData.NewExecutionDataAPIClient(conn)
+	accessAPI := access.NewAccessAPIClient(conn)
+
+	// get chainID from network params; the generated getter is nil-safe, so a
+	// failed request yields an empty chain ID instead of a nil dereference
+	params, err := accessAPI.GetNetworkParameters(ctx, &access.GetNetworkParametersRequest{})
+	if err != nil {
+		log.Error().Err(err).Msg("unable to get network params")
+	}
+	chain := flow.ChainID(params.GetChainId())
+
+	e := &ExecDataStreamer{
+		log:       log,
+		execApi:   execDataAPI,
+		accessApi: accessAPI,
+		queue:     archive.NewDeque(),
+		buffer:    archive.NewDeque(),
+		limit:     cfg.BufferSize,
+		busy:      0,
+		ctx:       ctx,
+		chain:     chain,
+	}
+
+	// Queue the catch-up blocks the same way OnBlockFinalized queues new ones.
+	for _, blockID := range cfg.CatchupBlocks {
+		e.queue.PushFront(blockID)
+	}
+
+	return e
+}
+
+// OnBlockFinalized is a callback for the Flow consensus follower. It is called
+// each time a block is finalized by the Flow consensus algorithm.
+func (e *ExecDataStreamer) OnBlockFinalized(block *model.Block) {
+	blockID := block.BlockID
+	// We push the block ID to the front of the queue; the streamer will try to
+	// download the blocks in a FIFO manner.
+	e.queue.PushFront(blockID)
+
+	e.log.Debug().Hex("block", blockID[:]).Msg("execution record queued for download")
+}
+
+// Next returns the next available block data. It returns an ErrUnavailable if no block
+// data is available at the moment.
+func (e *ExecDataStreamer) Next() (*uploader.BlockData, error) {
+	// kick off a background refill; poll itself guards against concurrent runs
+	go e.poll()
+	if e.buffer.Len() == 0 {
+		e.log.Debug().Msg("buffer empty, no execution record available")
+		return nil, archive.ErrUnavailable
+	}
+	record := e.buffer.PopBack()
+	return record.(*uploader.BlockData), nil
+}
+
+// poll downloads pending records unless another poll is already in progress.
+func (e *ExecDataStreamer) poll() {
+	if !atomic.CompareAndSwapUint32(&e.busy, 0, 1) {
+		return
+	}
+	defer atomic.StoreUint32(&e.busy, 0)
+	err := e.pullExecData()
+	// NOTE(review): this sentinel comes from the GCP streamer; gRPC errors presumably never match it — confirm.
+	if errors.Is(err, storage.ErrObjectNotExist) {
+		e.log.Debug().Msg("next execution record not available, download stopped")
+		return
+	}
+	if err != nil {
+		e.log.Error().Err(err).Msg("could not download execution records")
+	}
+}
+
+// pullExecData downloads records for queued blocks until the buffer is full, the queue is empty or a download fails.
+func (e *ExecDataStreamer) pullExecData() error {
+	for {
+		if uint(e.buffer.Len()) >= e.limit {
+			e.log.Debug().Uint("limit", e.limit).Msg("buffer full, stopping execution record download")
+			return nil
+		}
+
+		if uint(e.queue.Len()) == 0 {
+			e.log.Debug().Msg("queue empty, stopping execution record download")
+			return nil
+		}
+		blockID := e.queue.PopBack().(flow.Identifier)
+		record, err := e.getUploaderBlockData(blockID)
+		if err != nil {
+			e.queue.PushBack(blockID)
+			return fmt.Errorf("could not pull execution record (name: %s): %w", blockID, err)
+		}
+
+		e.log.Info().
+			Str("name", blockID.String()).
+			Uint64("height", record.Block.Header.Height).
+			Hex("block", blockID[:]).
+			Msg("pushing execution record into buffer")
+		e.buffer.PushFront(record)
+	}
+}
+
+// getUploaderBlockData requests the block, its transaction results and its execution data, combining any errors.
+func (e *ExecDataStreamer) getUploaderBlockData(blockID flow.Identifier) (*uploader.BlockData, error) {
+	// TODO : get rid of uploader.BlockData use
+	// we currently have to query additional data from ANs to keep a 1:1 match of the data from the GCP streamer
+	var errs *multierror.Error
+	// get block
+	br := &access.GetBlockByIDRequest{Id: blockID[:]}
+	b, err := e.accessApi.GetBlockByID(e.ctx, br)
+	if err != nil {
+		errs = multierror.Append(errs, fmt.Errorf("failed to get block data for blockID (%s): %w", blockID, err))
+	}
+
+	// get transactions
+	txr := &access.GetTransactionsByBlockIDRequest{BlockId: blockID[:]}
+	tx, err := e.accessApi.GetTransactionResultsByBlockID(e.ctx, txr)
+	if err != nil {
+		errs = multierror.Append(errs, fmt.Errorf("failed to get transaction results for blockID (%s): %w", blockID, err))
+	}
+	// get exec data
+	exr := &execData.GetExecutionDataByBlockIDRequest{BlockId: blockID[:]}
+	ex, err := e.execApi.GetExecutionDataByBlockID(e.ctx, exr)
+	if err != nil {
+		errs = multierror.Append(errs, fmt.Errorf("failed to get execution data for blockID (%s): %w", blockID, err))
+	}
+	if err := errs.ErrorOrNil(); err != nil {
+		return nil, err
+	}
+	return e.aggregateToBlockData(b, tx, ex)
+}
+
+// aggregateToBlockData aggregates the response data to uploader.BlockData; until implemented it returns an error.
+// TODO: remove this function when we use the streaming API
+func (e *ExecDataStreamer) aggregateToBlockData(
+	bl *access.BlockResponse,
+	tx *access.TransactionResultsResponse,
+	ex *execData.GetExecutionDataByBlockIDResponse,
+) (*uploader.BlockData, error) {
+	return nil, errors.New("aggregating responses into block data is not implemented")
+}
diff --git a/service/stream/exec_data_streamer_internal_test.go b/service/stream/exec_data_streamer_internal_test.go
new file mode 100644
index 00000000..cf424c96
--- /dev/null
+++ b/service/stream/exec_data_streamer_internal_test.go
@@ -0,0 +1,17 @@
+package stream
+
+import (
+	"testing"
+)
+
+func TestExecDataStreamer_NewExecDataStreamer(t *testing.T) {
+	t.Skip("TODO: implement")
+}
+
+func TestExecDataStreamer_Next(t *testing.T) {
+	t.Skip("TODO: implement")
+}
+
+func TestExecDataStreamer_OnBlockFinalized(t *testing.T) {
+	t.Skip("TODO: implement")
+}
diff --git a/service/tracker/record.go b/service/tracker/record.go
index 3085524b..2463590c 100644
--- a/service/tracker/record.go
+++ b/service/tracker/record.go
@@ -1,6 +1,7 @@
 package tracker
 
 import (
+	"github.com/onflow/flow-go/consensus/hotstuff/model"
 	"github.com/onflow/flow-go/engine/execution/ingestion/uploader"
 	"github.com/onflow/flow-go/model/flow"
 )
@@ -15,3 +16,9 @@ type RecordStreamer interface {
 type RecordHolder interface {
 	Record(blockID flow.Identifier) (*uploader.BlockData, error)
 }
+
+// DataStreamer is the common interface satisfied by both ExecDataStreamer and GCPStreamer.
+type DataStreamer interface {
+	Next() (*uploader.BlockData, error)
+	OnBlockFinalized(block *model.Block)
+}