Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions internal/claimer/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ func (cb *claimerBlockchain) findClaimSubmittedEventAndSucc(
) {
ic, err := iconsensus.NewIConsensus(application.IConsensusAddress, cb.client)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, fmt.Errorf("creating IConsensus binding for submitted events of application: %v, epoch: %v (%v): %w",
application.IApplicationAddress, epoch.Index, epoch.VirtualIndex, err)
}
oracle := newOracle(ic.GetNumberOfSubmittedClaims)
events := []*iconsensus.IConsensusClaimSubmitted{}
Expand All @@ -178,11 +179,13 @@ func (cb *claimerBlockchain) findClaimSubmittedEventAndSucc(

numSubmittedClaims, err := oracle(ctx, epoch.LastBlock)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, fmt.Errorf("querying number of submitted claims for epoch %v (%v) at block %d: %w",
epoch.Index, epoch.VirtualIndex, epoch.LastBlock, err)
}
_, err = ethutil.FindTransitions(ctx, fromBlock, toBlock, numSubmittedClaims, oracle, onHit)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to walk ClaimSubmitted transitions: %w", err)
return nil, nil, nil, fmt.Errorf("walking ClaimSubmitted transitions for application: %v, epoch %v (%v): %w",
application.IApplicationAddress, epoch.Index, epoch.VirtualIndex, err)
}

if len(events) == 0 {
Expand Down Expand Up @@ -210,7 +213,7 @@ func (cb *claimerBlockchain) findClaimAcceptedEventAndSucc(
) {
ic, err := iconsensus.NewIConsensus(application.IConsensusAddress, cb.client)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, fmt.Errorf("creating IConsensus binding for accepted events: %w", err)
}

oracle := newOracle(ic.GetNumberOfAcceptedClaims)
Expand Down Expand Up @@ -239,11 +242,12 @@ func (cb *claimerBlockchain) findClaimAcceptedEventAndSucc(

numAcceptedClaims, err := oracle(ctx, epoch.LastBlock)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, fmt.Errorf("querying number of accepted claims at block %d: %w", epoch.LastBlock, err)
}
_, err = ethutil.FindTransitions(ctx, fromBlock, toBlock, numAcceptedClaims, oracle, onHit)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to walk ClaimAccepted transitions: %w", err)
return nil, nil, nil, fmt.Errorf("walking ClaimAccepted transitions for application: %v, epoch %v (%v): %w",
application.IApplicationAddress, epoch.Index, epoch.VirtualIndex, err)
}

if len(events) == 0 {
Expand Down Expand Up @@ -283,7 +287,7 @@ func (cb *claimerBlockchain) pollTransaction(
if errors.Is(err, ethereum.NotFound) {
return false, nil, nil
}
return false, nil, err
return false, nil, fmt.Errorf("fetching transaction receipt for %v: %w", txHash, err)
}

// receipt must be committed before use. Return false until it is.
Expand All @@ -309,7 +313,7 @@ func (cb *claimerBlockchain) getDefaultBlockNumber(ctx context.Context) (*big.In

hdr, err := cb.client.HeaderByNumber(ctx, big.NewInt(nr))
if err != nil {
return nil, err
return nil, fmt.Errorf("fetching header for block %v: %w", nr, err)
}
return hdr.Number, nil
}
8 changes: 4 additions & 4 deletions internal/claimer/claimer.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (s *Service) checkClaimsInFlight(
// NOTE: there is no point in trying the other applications on a database error
// so we just return and try again later (next tick)
if err != nil {
return err
return fmt.Errorf("updating epoch %d (%d) with submitted claim: %w", computedEpoch.Index, computedEpoch.VirtualIndex, err)
}

// we expect apps[key] to always exist,
Expand Down Expand Up @@ -203,7 +203,7 @@ func (s *Service) findClaimSubmittedEventAndSucc(
ic, prevClaimSubmissionEvent, currClaimSubmissionEvent, err :=
s.blockchain.findClaimSubmittedEventAndSucc(ctx, app, prevEpoch, fromBlock, toBlock)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, fmt.Errorf("finding claim submitted event for epoch %d (%d): %w", prevEpoch.Index, prevEpoch.VirtualIndex, err)
}

if prevClaimSubmissionEvent == nil {
Expand Down Expand Up @@ -439,7 +439,7 @@ func (s *Service) findClaimAcceptedEventAndSucc(
ic, prevClaimAcceptanceEvent, currClaimAcceptanceEvent, err :=
s.blockchain.findClaimAcceptedEventAndSucc(ctx, app, prevEpoch, fromBlock, toBlock)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, fmt.Errorf("finding claim accepted event for epoch %d (%d): %w", prevEpoch.Index, prevEpoch.VirtualIndex, err)
}

if prevClaimAcceptanceEvent == nil {
Expand Down Expand Up @@ -565,7 +565,7 @@ func (s *Service) checkConsensusForAddressChange(
) error {
newConsensusAddress, err := s.blockchain.getConsensusAddress(s.Context, app)
if err != nil {
return err
return fmt.Errorf("getting consensus address for app %v: %w", app.IApplicationAddress, err)
}
if app.IConsensusAddress != newConsensusAddress {
err = s.setApplicationInoperable(
Expand Down
10 changes: 5 additions & 5 deletions internal/claimer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,17 @@ func Create(ctx context.Context, c *CreateInfo) (*Service, error) {

err = service.Create(ctx, &c.CreateInfo, &s.Service)
if err != nil {
return nil, err
return nil, fmt.Errorf("creating base service: %w", err)
}

nodeConfig, err := setupPersistentConfig(ctx, s.Logger, c.Repository, &c.Config)
if err != nil {
return nil, err
return nil, fmt.Errorf("setting up persistent config: %w", err)
}

chainId, err := c.EthConn.ChainID(ctx)
if err != nil {
return nil, err
return nil, fmt.Errorf("querying chain ID: %w", err)
}
if chainId.Uint64() != c.Config.BlockchainId {
return nil, fmt.Errorf("chainId mismatch: network %d != provided %d", chainId.Uint64(), c.Config.BlockchainId)
Expand All @@ -99,7 +99,7 @@ func Create(ctx context.Context, c *CreateInfo) (*Service, error) {
if s.submissionEnabled {
txOpts, err = auth.GetTransactOpts(ctx, chainId)
if err != nil {
return nil, err
return nil, fmt.Errorf("getting transaction options: %w", err)
}
}

Expand Down Expand Up @@ -190,7 +190,7 @@ func setupPersistentConfig(
logger.Info("Initializing claimer persistent config", "config", nc.Value)
err = repository.SaveNodeConfig(ctx, repo, &nc)
if err != nil {
return nil, err
return nil, fmt.Errorf("saving claimer persistent config: %w", err)
}
return &nc.Value, nil
} else if err == nil {
Expand Down
28 changes: 14 additions & 14 deletions internal/repository/postgres/claimer.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (r *PostgresRepository) selectOldestClaimPerApp(
sqlStr, args := stmt.Sql()
rows, err := tx.Query(ctx, sqlStr, args...)
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("querying oldest claim per app (status=%v): %w", epochStatus, err)
}
defer rows.Close()

Expand Down Expand Up @@ -126,13 +126,13 @@ func (r *PostgresRepository) selectOldestClaimPerApp(
&application.UpdatedAt,
)
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("scanning epoch/application row: %w", err)
}
epochs[application.ID] = &epoch
applications[application.ID] = &application
}
if err := rows.Err(); err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("iterating oldest claim rows: %w", err)
}
return epochs, applications, nil
}
Expand Down Expand Up @@ -186,7 +186,7 @@ func (r *PostgresRepository) selectNewestAcceptedClaimPerApp(
sqlStr, args := stmt.Sql()
rows, err := tx.Query(ctx, sqlStr, args...)
if err != nil {
return nil, err
return nil, fmt.Errorf("querying newest accepted claim per app: %w", err)
}
defer rows.Close()

Expand All @@ -206,12 +206,12 @@ func (r *PostgresRepository) selectNewestAcceptedClaimPerApp(
&epoch.UpdatedAt,
)
if err != nil {
return nil, err
return nil, fmt.Errorf("scanning accepted epoch row: %w", err)
}
epochs[epoch.ApplicationID] = &epoch
}
if err := rows.Err(); err != nil {
return nil, err
return nil, fmt.Errorf("iterating accepted claim rows: %w", err)
}
return epochs, nil
}
Expand All @@ -227,19 +227,19 @@ func (r *PostgresRepository) SelectSubmittedClaimPairsPerApp(ctx context.Context
AccessMode: pgx.ReadOnly,
})
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, fmt.Errorf("beginning read-only transaction for submitted claims: %w", err)
}
// Read-only tx: rollback releases the snapshot, equivalent to commit.
defer tx.Rollback(ctx) //nolint:errcheck

computed, applications, err := r.selectOldestClaimPerApp(ctx, tx, model.EpochStatus_ClaimComputed)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, fmt.Errorf("selecting oldest computed claim per app: %w", err)
}

acceptedOrSubmitted, err := r.selectNewestAcceptedClaimPerApp(ctx, tx, true)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, fmt.Errorf("selecting newest accepted claim per app: %w", err)
}

return acceptedOrSubmitted, computed, applications, err
Expand All @@ -256,19 +256,19 @@ func (r *PostgresRepository) SelectAcceptedClaimPairsPerApp(ctx context.Context)
AccessMode: pgx.ReadOnly,
})
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, fmt.Errorf("beginning read-only transaction for accepted claims: %w", err)
}
// Read-only tx: rollback releases the snapshot, equivalent to commit.
defer tx.Rollback(ctx) //nolint:errcheck

submitted, applications, err := r.selectOldestClaimPerApp(ctx, tx, model.EpochStatus_ClaimSubmitted)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, fmt.Errorf("selecting oldest submitted claim per app: %w", err)
}

accepted, err := r.selectNewestAcceptedClaimPerApp(ctx, tx, false)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, fmt.Errorf("selecting newest accepted claim per app: %w", err)
}

return accepted, submitted, applications, err
Expand Down Expand Up @@ -301,7 +301,7 @@ func (r *PostgresRepository) UpdateEpochWithSubmittedClaim(
sqlStr, args := updStmt.Sql()
cmd, err := r.db.Exec(ctx, sqlStr, args...)
if err != nil {
return err
return fmt.Errorf("executing update for submitted claim (app=%d, index=%d): %w", applicationID, index, err)
}
if cmd.RowsAffected() == 0 {
return repository.ErrNoUpdate
Expand Down Expand Up @@ -333,7 +333,7 @@ func (r *PostgresRepository) UpdateEpochWithAcceptedClaim(
sqlStr, args := updStmt.Sql()
cmd, err := r.db.Exec(ctx, sqlStr, args...)
if err != nil {
return err
return fmt.Errorf("executing update for accepted claim (app=%d, index=%d): %w", applicationID, index, err)
}
if cmd.RowsAffected() == 0 {
return repository.ErrNoUpdate
Expand Down
Loading