Add option to get execution data from AN/Observer#89
Conversation
| pflag.StringVar(&flagSeedAddress, "seed-address", "", "host address of seed node to follow consensus") | ||
| pflag.StringVar(&flagSeedKey, "seed-key", "", "hex-encoded public network key of seed node to follow consensus") | ||
| pflag.BoolVarP(&flagTracing, "tracing", "t", false, "enable tracing for this instance") | ||
| pflag.BoolVarP(&flagDisableGCP, "disable-cloud-streaming", "g", false, "disable streaming exec data from GCP and use the Access Node instead") |
There was a problem hiding this comment.
rather having a separate option to disable GCP, could we just enable it only if bucket is set?
| ctx := context.Background() | ||
|
|
||
| // initialize clients | ||
| opts := []grpc.DialOption{grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(grpcutils.DefaultMaxMsgSize)), |
There was a problem hiding this comment.
This should be configurable. grpcutils.DefaultMaxMsgSize most likely won't be enough for large blocks.
| // data is available at the moment. | ||
| func (e *ExecDataStreamer) Next() (*uploader.BlockData, error) { | ||
| // same implementation as GCPStreamer | ||
| go e.poll() |
There was a problem hiding this comment.
You could simplify this logic a bit by running a fixed size worker pool that continuously reads from e.queue fills e.buffer up to the configured size. Then there is no need for explicit polling and you can tune the number of worker needed to keep up with indexing
| b, err := e.accessApi.GetBlockByID(e.ctx, br) | ||
| if err != nil { | ||
| errs = multierror.Append(errs, fmt.Errorf("failed to get block data for blockID (%s): %w", blockID, err)) | ||
| } |
There was a problem hiding this comment.
could we just get this from the local storage?
There was a problem hiding this comment.
we'd only have the header in local
There was a problem hiding this comment.
why only the header? the archive node is running a consensus follower which should be indexing full blocks
| tx, err := e.accessApi.GetTransactionResultsByBlockID(e.ctx, txr) | ||
| if err != nil { | ||
| errs = multierror.Append(errs, fmt.Errorf("failed to get transaction results for blockID (%s): %w", blockID, err)) | ||
| } |
There was a problem hiding this comment.
Are we planning to serve the transaction result data? if not, maybe we can just stop indexing it?
If we want to keep it, this approach is fine for now, but we'll have to handle the case where the response size is too large. In that case, we need to get the total tx count (from the list of collections in the exec data), and call GetTransactionResultByIndex for each index to fetch the results individually.
There was a problem hiding this comment.
we now serve transactions from the access API implemented in Archive, but yes, I want to remove this call entirely, left as a todo.
| streamer = cloud.NewGCPStreamer(log, bucket, | ||
| cloud.WithCatchupBlocks(blockIDs), | ||
| ) | ||
| } |
There was a problem hiding this comment.
we need to print log about which streamer end up being used
| }() | ||
| bucket := client.Bucket(flagBucket) | ||
| streamer = cloud.NewGCPStreamer(log, bucket, | ||
| cloud.WithCatchupBlocks(blockIDs), |
There was a problem hiding this comment.
I'm afraid the number of blockIDs might be too big especially the indexing speed is far behind.
Could we at least log the total number of blockIDs here?
| return fmt.Errorf("could not pull execution record (name: %s): %w", blockID, err) | ||
| } | ||
|
|
||
| e.log.Debug(). |
There was a problem hiding this comment.
This log is worth to be INFO level
| e.log.Debug(). | |
| e.log.Info(). |
Goal of this PR
introduces a second way to get exec data via the exec data API from a trusted node
Fixes #32