Skip to content
This repository was archived by the owner on Mar 19, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 32 additions & 7 deletions cmd/flow-archive-live/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
grpczerolog "github.com/grpc-ecosystem/go-grpc-middleware/providers/zerolog/v2"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/tags"
"github.com/onflow/flow-go/utils/grpcutils"
"github.com/rs/zerolog"
"github.com/spf13/pflag"
"google.golang.org/api/option"
Expand All @@ -43,6 +44,7 @@ import (
"github.com/onflow/flow-archive/service/profiler"
"github.com/onflow/flow-archive/service/storage"
"github.com/onflow/flow-archive/service/storage2"
"github.com/onflow/flow-archive/service/stream"
"github.com/onflow/flow-archive/service/tracker"
)

Expand Down Expand Up @@ -85,11 +87,12 @@ func run() int {
flagSeedAddress string
flagSeedKey string
flagTracing bool
flagMsgSize int
)
pflag.StringVarP(&flagAddress, "address", "a", "127.0.0.1:5005", "bind address for serving DPS API")
pflag.StringVarP(&flagAccessAddress, "address-access", "A", "127.0.0.1:9000", "address to serve Access API on")
pflag.StringVarP(&flagBootstrap, "bootstrap", "b", "bootstrap", "path to directory with bootstrap information for spork")
pflag.StringVarP(&flagBucket, "bucket", "u", "", "Google Cloude Storage bucket with block data records")
pflag.StringVarP(&flagBucket, "bucket", "u", "", "Google Cloud Storage bucket with block data records")
pflag.StringVarP(&flagCheckpoint, "checkpoint", "c", "", "path to root checkpoint file for execution state trie")
pflag.StringVarP(&flagData, "data", "d", "data", "path to database directory for protocol data")
pflag.StringVarP(&flagLevel, "level", "l", "info", "log output level")
Expand All @@ -108,6 +111,7 @@ func run() int {
pflag.StringVar(&flagSeedAddress, "seed-address", "", "host address of seed node to follow consensus")
pflag.StringVar(&flagSeedKey, "seed-key", "", "hex-encoded public network key of seed node to follow consensus")
pflag.BoolVarP(&flagTracing, "tracing", "t", false, "enable tracing for this instance")
pflag.IntVar(&flagMsgSize, "grpc-msg-size", grpcutils.DefaultMaxMsgSize, "size limit of grpc messages")

pflag.Parse()

Expand Down Expand Up @@ -282,6 +286,31 @@ func run() int {
return failure
}

// On the other side, we also need access to the execution data from either GCP or Access Node API
var streamer tracker.DataStreamer
if flagBucket == "" {
streamer = stream.NewExecDataStreamer(log, flagSeedAddress, flagMsgSize,
stream.WithCatchupBlocks(blockIDs),
)
} else {
client, err := gcloud.NewClient(context.Background(),
option.WithoutAuthentication(),
)
if err != nil {
log.Error().Err(err).Msg("could not connect GCP client")
return failure
}
defer func() {
err := client.Close()
if err != nil {
log.Error().Err(err).Msg("could not close GCP client")
}
}()
bucket := client.Bucket(flagBucket)
streamer = cloud.NewGCPStreamer(log, bucket,
cloud.WithCatchupBlocks(blockIDs),

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm afraid the number of blockIDs might be too big, especially if the indexing speed is far behind.

Could we at least log the total number of blockIDs here?

)
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to log which streamer ends up being used.

log.Info().Msgf("%v blocks to catchup", len(blockIDs))
// On the other side, we also need access to the execution data. The cloud
// streamer is responsible for retrieving block execution records from a
Expand All @@ -300,16 +329,12 @@ func run() int {
log.Error().Err(err).Msg("could not close GCP client")
}
}()
bucket := client.Bucket(flagBucket)
stream := cloud.NewGCPStreamer(log, bucket,
cloud.WithCatchupBlocks(blockIDs),
)

// Next, we can initialize our consensus and execution trackers. They are
// responsible for tracking changes to the available data, for the consensus
// follower and related consensus data on one side, and the cloud streamer
// and available execution records on the other side.
execution, err := tracker.NewExecution(log, protocolDB, stream)
execution, err := tracker.NewExecution(log, protocolDB, streamer)
if err != nil {
log.Error().Err(err).Msg("could not initialize execution tracker")
return failure
Expand All @@ -325,7 +350,7 @@ func run() int {
// will use the callback to make additional data available to the mapper,
// while the cloud streamer will use the callback to download execution data
// for finalized blocks.
follow.AddOnBlockFinalizedConsumer(stream.OnBlockFinalized)
follow.AddOnBlockFinalizedConsumer(streamer.OnBlockFinalized)
follow.AddOnBlockFinalizedConsumer(consensus.OnBlockFinalized)

// If metrics are enabled, the mapper should use the metrics writer. Otherwise, it can
Expand Down
36 changes: 36 additions & 0 deletions service/stream/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package stream

import (
"github.com/onflow/flow-go/model/flow"
)

// DefaultConfig holds the settings a Streamer starts from before any Option
// is applied to it.
var DefaultConfig = Config{
	// The catch-up list starts out empty.
	CatchupBlocks: []flow.Identifier{},
	// A small buffer is enough, as there's almost no latency between the API
	// and the consensus follower in ANs.
	BufferSize: 4,
}

// Config is the configuration for a Streamer.
type Config struct {
// BufferSize is the buffer size for a Streamer to use.
BufferSize uint
// CatchupBlocks lists the IDs of blocks that are already finalized, but for
// which the execution data records still need to be downloaded.
CatchupBlocks []flow.Identifier
}

// Option is a function that can be applied to a Config. It receives a pointer
// to the Config so that it can modify it in place.
type Option func(*Config)

// WithBufferSize returns an Option that sets the buffer size a Streamer
// should use.
func WithBufferSize(size uint) Option {
	apply := func(target *Config) {
		target.BufferSize = size
	}
	return apply
}

// WithCatchupBlocks returns an Option that injects the IDs of blocks which are
// already finalized, but whose execution data records have yet to be
// downloaded. The slice is stored as given, without copying.
func WithCatchupBlocks(blockIDs []flow.Identifier) Option {
	apply := func(target *Config) {
		target.CatchupBlocks = blockIDs
	}
	return apply
}
179 changes: 179 additions & 0 deletions service/stream/exec_data_streamer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
package stream

import (
"context"
"errors"
"fmt"
"sync/atomic"

"cloud.google.com/go/storage"
"github.com/hashicorp/go-multierror"
"github.com/onflow/flow-archive/models/archive"
"github.com/onflow/flow-go/consensus/hotstuff/model"
"github.com/onflow/flow-go/engine/execution/ingestion/uploader"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow/protobuf/go/flow/access"
execData "github.com/onflow/flow/protobuf/go/flow/executiondata"
"github.com/rs/zerolog"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

// ExecDataStreamer queues block IDs and buffers the execution data records
// downloaded for them through the execution data and access API clients.
type ExecDataStreamer struct {
	log zerolog.Logger
	ctx context.Context // context used for the API requests

	// API clients and the chain they talk to
	accessApi access.AccessAPIClient
	execApi   execData.ExecutionDataAPIClient
	chain     flow.ChainID // chain ID taken from the network parameters

	// download state
	queue  *archive.SafeDeque // queue of block identifiers for next downloads
	buffer *archive.SafeDeque // queue of downloaded execution data records
	limit  uint               // buffer size limit for downloaded records
	busy   uint32             // used as a guard to avoid concurrent polling
}

// NewExecDataStreamer creates a streamer that retrieves execution data for
// finalized blocks from the access node at accessAddr.
//
// msgSize is the maximum size of a gRPC message the clients will receive. The
// connection is dialed with insecure transport credentials. Block IDs given
// through WithCatchupBlocks are queued for download right away.
//
// The constructor has no error return, so a failure to dial or to fetch the
// network parameters is only logged. In both cases the chain ID is left empty.
// NOTE(review): after a failed dial the API clients have no usable connection;
// consider a constructor variant that returns the error.
func NewExecDataStreamer(log zerolog.Logger, accessAddr string, msgSize int, options ...Option) *ExecDataStreamer {
	cfg := DefaultConfig
	for _, option := range options {
		option(&cfg)
	}

	ctx := context.Background()

	// initialize clients
	opts := []grpc.DialOption{
		grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(msgSize)),
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	}
	conn, dialErr := grpc.Dial(accessAddr, opts...)
	if dialErr != nil {
		// A zerolog event is only written once Msg (or Send) is called on it.
		log.Error().Err(dialErr).Str("address", accessAddr).Msg("could not dial access node")
	}
	execDataAPI := execData.NewExecutionDataAPIClient(conn)
	accessAPI := access.NewAccessAPIClient(conn)

	// get chainID from network params; skipped when there is no connection,
	// and the response is only read when the request succeeded
	var chain flow.ChainID
	if dialErr == nil {
		params, err := accessAPI.GetNetworkParameters(ctx, &access.GetNetworkParametersRequest{})
		if err != nil {
			log.Error().Err(err).Msg("unable to get network params")
		} else {
			chain = flow.ChainID(params.ChainId)
		}
	}

	e := &ExecDataStreamer{
		log:       log,
		execApi:   execDataAPI,
		accessApi: accessAPI,
		queue:     archive.NewDeque(),
		buffer:    archive.NewDeque(),
		limit:     cfg.BufferSize,
		busy:      0,
		ctx:       ctx,
		chain:     chain,
	}

	// Catch-up blocks enter the queue the same way OnBlockFinalized adds newly
	// finalized blocks, so they are downloaded before anything finalized later.
	// NOTE(review): assumes CatchupBlocks is ordered oldest first - confirm.
	for _, blockID := range cfg.CatchupBlocks {
		e.queue.PushFront(blockID)
	}

	return e
}

// OnBlockFinalized is the callback given to the Flow consensus follower, which
// calls it each time a block is finalized by the Flow consensus algorithm.
func (e *ExecDataStreamer) OnBlockFinalized(block *model.Block) {
	// New IDs enter at the front of the queue and the download loop takes them
	// from the back, so blocks are downloaded in a FIFO manner.
	id := block.BlockID
	e.queue.PushFront(id)

	e.log.Debug().
		Hex("block", id[:]).
		Msg("execution record queued for download")
}

// Next returns the next available block data. It returns an ErrUnavailable if no block
// data is available at the moment. Every call also starts a poll in the
// background that tries to refill the buffer, so a later call may succeed.
func (e *ExecDataStreamer) Next() (*uploader.BlockData, error) {
// same implementation as GCPStreamer
// The poll runs in its own goroutine, so Next does not wait for downloads.
go e.poll()

@peterargue peterargue Apr 5, 2023

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could simplify this logic a bit by running a fixed size worker pool that continuously reads from e.queue fills e.buffer up to the configured size. Then there is no need for explicit polling and you can tune the number of worker needed to keep up with indexing

if e.buffer.Len() == 0 {
e.log.Debug().Msg("buffer empty, no execution record available")
return nil, archive.ErrUnavailable
}
// The download loop pushes records to the front of the buffer, so the back
// holds the record that was downloaded first.
record := e.buffer.PopBack()
// Within this file, only *uploader.BlockData values are pushed into the buffer.
return record.(*uploader.BlockData), nil
}

// poll runs one download pass, unless another pass is already in progress,
// and logs the outcome if the pass ended with an error.
func (e *ExecDataStreamer) poll() {
	// same implementation as GCPStreamer: the busy flag is claimed atomically
	// so that only one pass runs at a time, and it is released when the pass
	// ends.
	claimed := atomic.CompareAndSwapUint32(&e.busy, 0, 1)
	if !claimed {
		return
	}
	defer atomic.StoreUint32(&e.busy, 0)

	switch err := e.pullExecData(); {
	case errors.Is(err, storage.ErrObjectNotExist):
		// NOTE(review): this sentinel comes from the cloud storage package;
		// presumably the API-based download never returns it - confirm.
		e.log.Debug().Msg("next execution record not available, download stopped")
	case err != nil:
		e.log.Error().Err(err).Msg("could not download execution records")
	}
}

// pullExecData downloads execution records for the queued block IDs until the
// buffer has reached its limit or the queue is empty, and returns nil in both
// of those cases.
//
// If a record cannot be retrieved, or comes back without a block header, the
// block ID is put back at the back of the queue so that it is the next one to
// be retried, and an error is returned.
//
// It uses a pointer receiver like the other methods of the type, so the
// streamer is not copied on every call.
func (e *ExecDataStreamer) pullExecData() error {
	for {
		// same implementation as GCPStreamer
		if uint(e.buffer.Len()) >= e.limit {
			e.log.Debug().Uint("limit", e.limit).Msg("buffer full, stopping execution record download")
			return nil
		}

		if e.queue.Len() == 0 {
			e.log.Debug().Msg("queue empty, stopping execution record download")
			return nil
		}

		blockID := e.queue.PopBack().(flow.Identifier)
		record, err := e.getUploaderBlockData(blockID)
		// The height is read from the record below, so a record without a
		// header is treated as a failed download rather than dereferenced.
		if err == nil && (record == nil || record.Block == nil || record.Block.Header == nil) {
			err = errors.New("execution record has no block header")
		}
		if err != nil {
			e.queue.PushBack(blockID)
			return fmt.Errorf("could not pull execution record (name: %s): %w", blockID, err)
		}

		e.log.Info().
			Str("name", blockID.String()).
			Uint64("height", record.Block.Header.Height).
			Hex("block", blockID[:]).
			Msg("pushing execution record into buffer")
		e.buffer.PushFront(record)
	}
}

// getUploaderBlockData requests the block, its transaction results and its
// execution data for blockID through the API clients, and hands the three
// responses to aggregateToBlockData. All three requests are always issued;
// their failures are collected and returned together as a single error.
func (e *ExecDataStreamer) getUploaderBlockData(blockID flow.Identifier) (*uploader.BlockData, error) {
// TODO : get rid of uploader.BlockData use
// we currently have to query additional data from ANs to keep a 1:1 match of the data from the GCP streamer
var errs *multierror.Error
// get block
br := &access.GetBlockByIDRequest{Id: blockID[:]}
b, err := e.accessApi.GetBlockByID(e.ctx, br)
if err != nil {
errs = multierror.Append(errs, fmt.Errorf("failed to get block data for blockID (%s): %w", blockID, err))
}
Comment on lines +147 to +150

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we just get this from the local storage?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we'd only have the header in local

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why only the header? the archive node is running a consensus follower which should be indexing full blocks


// get transactions
txr := &access.GetTransactionsByBlockIDRequest{BlockId: blockID[:]}
tx, err := e.accessApi.GetTransactionResultsByBlockID(e.ctx, txr)
if err != nil {
errs = multierror.Append(errs, fmt.Errorf("failed to get transaction results for blockID (%s): %w", blockID, err))
}
Comment on lines +154 to +157

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we planning to serve the transaction result data? if not, maybe we can just stop indexing it?

If we want to keep it, this approach is fine for now, but we'll have to handle the case where the response size is too large. In that case, we need to get the total tx count (from the list of collections in the exec data), and call GetTransactionResultByIndex for each index to fetch the results individually.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we now serve transactions from the access API implemented in Archive, but yes, I want to remove this call entirely, left as a todo.

// get exec data
exr := &execData.GetExecutionDataByBlockIDRequest{BlockId: blockID[:]}
ex, err := e.execApi.GetExecutionDataByBlockID(e.ctx, exr)
if err != nil {
errs = multierror.Append(errs, fmt.Errorf("failed to get execution data for blockID (%s): %w", blockID, err))
}
// errs is still nil when no request failed; ErrorOrNil then yields a nil
// error, so aggregation only runs when all three responses are available.
err = errs.ErrorOrNil()
if err != nil {
return nil, err
}
return e.aggregateToBlockData(b, tx, ex)
}

// aggregateToBlockData is meant to aggregate the response data into an
// uploader.BlockData. The aggregation is not implemented yet, so it returns an
// error instead of a nil record with a nil error, which the download loop
// would otherwise dereference.
// TODO: remove this function when we use the streaming API
func (e *ExecDataStreamer) aggregateToBlockData(
	bl *access.BlockResponse,
	tx *access.TransactionResultsResponse,
	ex *execData.GetExecutionDataByBlockIDResponse,
) (*uploader.BlockData, error) {
	return nil, errors.New("aggregation of execution data into block data is not implemented")
}
17 changes: 17 additions & 0 deletions service/stream/exec_data_streamer_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package stream

import (
"testing"
)

// TestExecDataStreamer_NewExecDataStreamer is a placeholder for the constructor
// test. It skips explicitly so that the missing coverage shows up in the test
// output instead of being reported as a pass.
func TestExecDataStreamer_NewExecDataStreamer(t *testing.T) {
	// TODO: implement
	t.Skip("not implemented")
}

// TestExecDataStreamer_Next is a placeholder for the test of Next. It skips
// explicitly so that the missing coverage shows up in the test output instead
// of being reported as a pass.
func TestExecDataStreamer_Next(t *testing.T) {
	// TODO: implement
	t.Skip("not implemented")
}

// TestExecDataStreamer_OnBlockFinalized is a placeholder for the test of
// OnBlockFinalized. It skips explicitly so that the missing coverage shows up
// in the test output instead of being reported as a pass.
func TestExecDataStreamer_OnBlockFinalized(t *testing.T) {
	// TODO: implement
	t.Skip("not implemented")
}
7 changes: 7 additions & 0 deletions service/tracker/record.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package tracker

import (
"github.com/onflow/flow-go/consensus/hotstuff/model"
"github.com/onflow/flow-go/engine/execution/ingestion/uploader"
"github.com/onflow/flow-go/model/flow"
)
Expand All @@ -15,3 +16,9 @@ type RecordStreamer interface {
type RecordHolder interface {
Record(blockID flow.Identifier) (*uploader.BlockData, error)
}

// DataStreamer is a general interface that allows implementations of ExecDataStreamer and GCPStreamer
type DataStreamer interface {
Next() (*uploader.BlockData, error)
OnBlockFinalized(block *model.Block)
}