diff --git a/Dockerfile b/Dockerfile index 8e7f933e2..75f413847 100644 --- a/Dockerfile +++ b/Dockerfile @@ -119,10 +119,11 @@ RUN go install gotest.tools/gotestsum@v${GOTESTSUM_VERSION} ENV PATH="${GOPATH}/bin:${PATH}" ENV GOLANGCI_LINT_CACHE=${GOCACHE}/golangci-lint -# Create /dapps directory owned by cartesi for Docker named volume pre-population. +# Create directories owned by cartesi for Docker named volume pre-population. # When a named volume is first mounted here, Docker copies this ownership. USER root RUN mkdir -p /dapps && chown cartesi:cartesi /dapps +RUN mkdir -p /var/lib/cartesi-rollups-node/logs && chown cartesi:cartesi /var/lib/cartesi-rollups-node/logs USER cartesi # ============================================================================= diff --git a/Makefile b/Makefile index 28b8279b0..2d6fc6f77 100644 --- a/Makefile +++ b/Makefile @@ -449,33 +449,14 @@ test-with-compose: ## Run all tests using docker compose with auto-shutdown integration-test-local: build echo-dapp reject-loop-dapp exception-loop-dapp ## Run integration tests locally (requires: make start && eval $$(make env)) @cartesi-rollups-cli db init - @NODE_LOG_FILE=$$(mktemp /tmp/rollups-node-log.XXXXXX); \ - echo "Node log file: $$NODE_LOG_FILE"; \ - echo "Starting node in background..."; \ - env CARTESI_ADVANCER_POLLING_INTERVAL=1 \ - CARTESI_VALIDATOR_POLLING_INTERVAL=1 \ - CARTESI_CLAIMER_POLLING_INTERVAL=1 \ - CARTESI_PRT_POLLING_INTERVAL=1 \ - cartesi-rollups-node > $$NODE_LOG_FILE 2>&1 & NODE_PID=$$!; \ - tail -f $$NODE_LOG_FILE & TAIL_PID=$$!; \ - trap 'kill $$TAIL_PID 2>/dev/null; echo "Stopping node (pid $$NODE_PID)..."; kill $$NODE_PID 2>/dev/null; wait $$NODE_PID 2>/dev/null; echo "Node logs saved to $$NODE_LOG_FILE"' EXIT; \ - echo "Waiting for node to become healthy..."; \ - attempts=0; \ - until curl -sf http://localhost:10000/readyz >/dev/null 2>&1; do \ - attempts=$$((attempts + 1)); \ - if [ $$attempts -ge 60 ]; then \ - echo "ERROR: Node failed to become healthy after 120 seconds"; \ - echo "Last 50 lines of node log:"; \ - tail -50 $$NODE_LOG_FILE; \ - exit 1; \ - fi; \ + @if lsof -ti:10000 >/dev/null 2>&1; then \ + echo "Killing stale node on port 10000..."; \ + kill $$(lsof -ti:10000) 2>/dev/null || true; \ sleep 2; \ - done; \ - echo "Node is healthy. Running integration tests..."; \ - export CARTESI_TEST_DAPP_PATH=$(CURDIR)/applications/echo-dapp; \ + fi + @export CARTESI_TEST_DAPP_PATH=$(CURDIR)/applications/echo-dapp; \ export CARTESI_TEST_REJECT_DAPP_PATH=$(CURDIR)/applications/reject-loop-dapp; \ export CARTESI_TEST_EXCEPTION_DAPP_PATH=$(CURDIR)/applications/exception-loop-dapp; \ - export CARTESI_TEST_NODE_LOG_FILE=$$NODE_LOG_FILE; \ $(MAKE) integration-test ci-test: ## Run the full CI test pipeline locally (lint + unit + integration) diff --git a/internal/claimer/blockchain.go b/internal/claimer/blockchain.go index 9d4cb71a5..e275ef5cc 100644 --- a/internal/claimer/blockchain.go +++ b/internal/claimer/blockchain.go @@ -90,7 +90,7 @@ func (cb *claimerBlockchain) submitClaimToBlockchain( tx, err := ic.SubmitClaim(cb.txOpts, application.IApplicationAddress, lastBlockNumber, *epoch.OutputsMerkleRoot) if err != nil { - cb.logger.Error("submitClaimToBlockchain:failed", + cb.logger.Warn("submitClaimToBlockchain:failed", "appContractAddress", application.IApplicationAddress, "claimHash", *epoch.OutputsMerkleRoot, "last_block", epoch.LastBlock, @@ -225,7 +225,13 @@ func (cb *claimerBlockchain) findClaimAcceptedEventAndSucc( onHit := newOnHit(ctx, application.IApplicationAddress, filter, func(it *iconsensus.IConsensusClaimAcceptedIterator) { event := it.Event - if (len(events) > 0) || claimAcceptedEventMatches(application, epoch, event) { + // Match on epoch identity (app + lastBlock) without + // requiring the merkle root to match. This ensures + // that a ClaimAccepted event from a different claim + // (outvoting in Quorum) is returned to the caller, + // where claimAcceptedEventMatches detects the + // mismatch and sets the app as inoperable. + if (len(events) > 0) || claimAcceptedEventMatchesEpoch(application, epoch, event) { events = append(events, event) } }, @@ -256,6 +262,13 @@ func (cb *claimerBlockchain) getConsensusAddress( return ethutil.GetConsensus(ctx, cb.client, app.IApplicationAddress) } +// isNotFirstClaimError checks whether an error from submitClaim is +// a NotFirstClaim revert, indicating the claim was already submitted +// on-chain (e.g., before a node restart). +func isNotFirstClaimError(err error) bool { + return ethutil.IsCustomError(err, iconsensus.IConsensusMetaData, "NotFirstClaim") +} + // poll a transaction for its receipt func (cb *claimerBlockchain) pollTransaction( ctx context.Context, diff --git a/internal/claimer/claimer.go b/internal/claimer/claimer.go index 9cc05fd4e..5fcbbac97 100644 --- a/internal/claimer/claimer.go +++ b/internal/claimer/claimer.go @@ -335,6 +335,70 @@ func (s *Service) submitClaimsAndUpdateDatabase( ) txHash, err := s.blockchain.submitClaimToBlockchain(ic, app, currEpoch) if err != nil { + // NotFirstClaim handling after restart. + // + // Gas estimation (eth_estimateGas) simulates + // the call before broadcasting, so the revert + // is caught without spending gas. This relies + // on txOpts.GasLimit == 0 (the default); if + // GasLimit were pre-set, the tx would skip + // estimation and revert on-chain. + // + // Authority: submitClaim checks a per-epoch + // bitmap. Any duplicate (same epoch, regardless + // of merkle root) reverts with NotFirstClaim. + // After restart this is benign — the node + // recomputed the same claim that was already + // on-chain. Both ClaimSubmitted and + // ClaimAccepted events were already emitted + // (Authority emits both atomically). + // + // Quorum: submitClaim first checks if this + // validator already voted for the SAME claim + // (same app + lastBlock + merkleRoot). If so, + // it silently returns — no revert, no event. + // It only reverts with NotFirstClaim when the + // validator voted for a DIFFERENT merkleRoot + // in the same epoch (checked via allVotes + // bitmap). After restart, this means the node + // recomputed a different claim hash than what + // it submitted pre-restart — a determinism + // violation. ClaimSubmitted was emitted for + // the original vote; ClaimAccepted is emitted + // only once a majority of validators agree. + if isNotFirstClaimError(err) { + if app.ConsensusType == model.Consensus_Quorum { + // Quorum only reverts with NotFirstClaim + // when the merkle root differs. This is + // unrecoverable: computation is expected + // to be deterministic, so recomputing + // will produce the same divergent hash. + err = s.setApplicationInoperable( + s.Context, + app, + "NotFirstClaim from Quorum consensus: "+ + "computed claim hash %s differs from "+ + "previously submitted claim for "+ + "epoch with last_block %d. "+ + "Possible determinism violation or "+ + "machine state corruption.", + hashToHex(currEpoch.OutputsMerkleRoot), + currEpoch.LastBlock, + ) + delete(computedEpochs, key) + errs = append(errs, err) + continue + } + s.Logger.Info( + "Claim already on-chain, "+ + "waiting for event sync", + "app", app.IApplicationAddress, + "claim_hash", + hashToHex(currEpoch.OutputsMerkleRoot), + "last_block", currEpoch.LastBlock, + ) + continue + } delete(computedEpochs, key) errs = append(errs, err) continue @@ -584,6 +648,18 @@ func claimAcceptedEventMatches(application *model.Application, epoch *model.Epoc epoch.LastBlock == event.LastProcessedBlockNumber.Uint64() } +// claimAcceptedEventMatchesEpoch checks if a ClaimAccepted event belongs to +// the same epoch (app + lastBlock) regardless of the merkle root. This is +// used to detect outvoting in Quorum: a ClaimAccepted event exists for the +// epoch but with a different merkle root than what this node submitted. +func claimAcceptedEventMatchesEpoch(application *model.Application, epoch *model.Epoch, event *iconsensus.IConsensusClaimAccepted) bool { + if application == nil || epoch == nil || event == nil { + return false + } + return application.IApplicationAddress == event.AppContract && + epoch.LastBlock == event.LastProcessedBlockNumber.Uint64() +} + func (s *Service) String() string { return s.Name } diff --git a/internal/claimer/claimer_test.go b/internal/claimer/claimer_test.go index eb774db53..648e18de3 100644 --- a/internal/claimer/claimer_test.go +++ b/internal/claimer/claimer_test.go @@ -207,10 +207,10 @@ func makeApplication() *model.Application { func makeEpoch(id int64, status model.EpochStatus, i uint64) *model.Epoch { outputsMerkleRoot := common.HexToHash("0x01") // dummy value - txHash := common.HexToHash("0x02") // dummy value + txHash := common.HexToHash("0x02") // dummy value return repotest.NewEpochBuilder(id). WithIndex(i). - WithBlocks(i * 10, i * 10 + 9). + WithBlocks(i*10, i*10+9). WithStatus(status). WithClaimTransactionHash(txHash). WithOutputsMerkleRoot(outputsMerkleRoot). @@ -267,6 +267,33 @@ func makeAcceptedEvent(app *model.Application, epoch *model.Epoch) *iconsensus.I } } +// rpcDataError simulates an RPC error with revert data, as returned by +// eth_estimateGas when the contract reverts. +type rpcDataError struct { + code int + msg string + data any +} + +func (e *rpcDataError) Error() string { return e.msg } +func (e *rpcDataError) ErrorCode() int { return e.code } +func (e *rpcDataError) ErrorData() any { return e.data } + +// notFirstClaimError creates an error that mimics a NotFirstClaim revert +// from eth_estimateGas, with the ABI error selector as revert data. +func notFirstClaimError() error { + parsed, _ := iconsensus.IConsensusMetaData.GetAbi() + id := parsed.Errors["NotFirstClaim"].ID + selector := fmt.Sprintf("0x%x", id[:4]) + return &rpcDataError{ + code: 3, + msg: "execution reverted", + data: selector + "000000000000000000000000" + + "01000000000000000000000000000000000000000000000000000000000000" + + "0000000000000000000000000000000000000000000000000000000000000027", + } +} + // ////////////////////////////////////////////////////////////////////////////// // Success // ////////////////////////////////////////////////////////////////////////////// @@ -581,6 +608,66 @@ func TestSubmitFailedClaim(t *testing.T) { assert.Equal(t, 0, len(errs)) } +// TestNotFirstClaimHandledGracefully verifies that when submitClaim reverts +// with NotFirstClaim (e.g., after a node restart where claimsInFlight was +// lost), the claimer handles it gracefully — no error, no claimsInFlight +// entry, and the claim is left for event sync to pick up. +func TestNotFirstClaimHandledGracefully(t *testing.T) { + m, r, b := newServiceMock() + defer r.AssertExpectations(t) + defer b.AssertExpectations(t) + + endBlock := big.NewInt(40) + app := makeApplication() + currEpoch := makeComputedEpoch(app, 3) + var prevEvent *iconsensus.IConsensusClaimSubmitted + var currEvent *iconsensus.IConsensusClaimSubmitted + + b.On("getConsensusAddress", mock.Anything, app). + Return(app.IConsensusAddress, nil).Once() + b.On("findClaimSubmittedEventAndSucc", mock.Anything, app, currEpoch, currEpoch.LastBlock+1, endBlock.Uint64()). + Return(&iconsensus.IConsensus{}, prevEvent, currEvent, nil).Once() + // submitClaim reverts with NotFirstClaim (caught by eth_estimateGas). + b.On("submitClaimToBlockchain", mock.Anything, app, currEpoch). + Return(common.Hash{}, notFirstClaimError()).Once() + + errs := m.submitClaimsAndUpdateDatabase( + makeEpochMap(), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock) + assert.Equal(t, 0, len(errs)) + assert.Equal(t, 0, len(m.claimsInFlight)) +} + +// TestNotFirstClaimQuorumSetsInoperable verifies that when submitClaim reverts +// with NotFirstClaim for a Quorum app, the claimer marks the application as +// inoperable. In Quorum, NotFirstClaim means the validator previously submitted +// a different merkle root — a determinism violation. +func TestNotFirstClaimQuorumSetsInoperable(t *testing.T) { + m, r, b := newServiceMock() + defer r.AssertExpectations(t) + defer b.AssertExpectations(t) + + endBlock := big.NewInt(40) + app := makeApplication() + app.ConsensusType = model.Consensus_Quorum + currEpoch := makeComputedEpoch(app, 3) + var prevEvent *iconsensus.IConsensusClaimSubmitted + var currEvent *iconsensus.IConsensusClaimSubmitted + + b.On("getConsensusAddress", mock.Anything, app). + Return(app.IConsensusAddress, nil).Once() + b.On("findClaimSubmittedEventAndSucc", mock.Anything, app, currEpoch, currEpoch.LastBlock+1, endBlock.Uint64()). + Return(&iconsensus.IConsensus{}, prevEvent, currEvent, nil).Once() + b.On("submitClaimToBlockchain", mock.Anything, app, currEpoch). + Return(common.Hash{}, notFirstClaimError()).Once() + r.On("UpdateApplicationState", mock.Anything, int64(0), model.ApplicationState_Inoperable, mock.Anything). + Return(nil).Once() + + errs := m.submitClaimsAndUpdateDatabase( + makeEpochMap(), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock) + assert.Equal(t, 1, len(errs)) + assert.Equal(t, 0, len(m.claimsInFlight)) +} + // !claimSubmittedMatche(prevClaim, prevEvent) func TestSubmitClaimWithAntecessorMismatch(t *testing.T) { m, r, b := newServiceMock() diff --git a/internal/evmreader/output.go b/internal/evmreader/output.go index 951f32f3c..545b349bd 100644 --- a/internal/evmreader/output.go +++ b/internal/evmreader/output.go @@ -129,6 +129,9 @@ func (r *Service) readAndUpdateOutputs( nextSearchBlock := lastOutputCheck + 1 outputExecutedEvents, err := r.readOutputExecutionsFromBlockChain(ctx, app, nextSearchBlock, mostRecentBlockNumber) if err != nil { + if errors.Is(err, context.Canceled) { + return // shutting down + } r.Logger.Error("Error reading output events", "application", app.application.Name, "address", app.application.IApplicationAddress, "error", err) diff --git a/internal/evmreader/sealedepochs.go b/internal/evmreader/sealedepochs.go index 7622539db..49addc7fa 100644 --- a/internal/evmreader/sealedepochs.go +++ b/internal/evmreader/sealedepochs.go @@ -88,6 +88,9 @@ func (r *Service) checkForEpochsAndInputs( err := r.processApplicationSealedEpochs(ctx, app, mostRecentBlockNumber) if err != nil { + if errors.Is(err, context.Canceled) { + return // shutting down + } r.Logger.Error("Error processing application sealed epochs", "application", app.application.Name, "consensus_address", app.application.IConsensusAddress, @@ -97,6 +100,9 @@ func (r *Service) checkForEpochsAndInputs( err = r.processApplicationOpenEpoch(ctx, app, mostRecentBlockNumber) if err != nil { + if errors.Is(err, context.Canceled) { + return // shutting down + } r.Logger.Error("Error processing application open epoch", "application", app.application.Name, "consensus_address", app.application.IConsensusAddress, diff --git a/internal/manager/instance.go b/internal/manager/instance.go index bf1c602ee..cca4e4250 100644 --- a/internal/manager/instance.go +++ b/internal/manager/instance.go @@ -178,6 +178,21 @@ func (m *MachineInstanceImpl) Synchronize(ctx context.Context, repo MachineRepos "app_processed_inputs", m.application.ProcessedInputs, "machine_processed_inputs", currentProcessed) + // Validate machine vs app state before querying the DB. + // This must be done upfront because the ListInputs query uses OFFSET to skip + // already-processed inputs, and when OFFSET >= total rows the query returns + // 0 rows — making the COUNT(*) OVER() window function value unavailable. + if currentProcessed > m.application.ProcessedInputs { + return fmt.Errorf( + "%w: machine has processed %d inputs but app only has %d", + ErrMachineSynchronization, currentProcessed, m.application.ProcessedInputs) + } + if currentProcessed == m.application.ProcessedInputs { + m.logger.Info("No inputs to replay during synchronization", + "address", appAddress) + return nil + } + initialProcessedInputs := currentProcessed replayed := uint64(0) toReplay := uint64(0) @@ -201,17 +216,7 @@ func (m *MachineInstanceImpl) Synchronize(ctx context.Context, repo MachineRepos m.logger.Error(errorMsg, "address", appAddress) return fmt.Errorf("%w: %s", ErrMachineSynchronization, errorMsg) } - if currentProcessed > totalCount { - return fmt.Errorf( - "%w: machine has processed %d inputs but DB only has %d", - ErrMachineSynchronization, currentProcessed, totalCount) - } toReplay = totalCount - currentProcessed - if toReplay == 0 { - m.logger.Info("No inputs to replay during synchronization", - "address", appAddress) - return nil - } } for _, input := range inputs { diff --git a/internal/manager/instance_test.go b/internal/manager/instance_test.go index 9f42fd130..1c3befeec 100644 --- a/internal/manager/instance_test.go +++ b/internal/manager/instance_test.go @@ -1217,7 +1217,10 @@ func (r *mockSyncRepository) ListInputs( } start := p.Offset if start >= uint64(len(r.inputs)) { - return nil, r.totalCount, nil + // Match real PostgreSQL behavior: COUNT(*) OVER() is a per-row window + // value. When OFFSET skips all rows, 0 rows are returned and the total + // count is never scanned — the caller sees 0, not the real total. + return nil, 0, nil } end := start + p.Limit if p.Limit == 0 || end > uint64(len(r.inputs)) { @@ -1354,7 +1357,7 @@ func (s *MachineInstanceSuite) TestSynchronize() { err := inst.Synchronize(context.Background(), repo, 1000) require.Error(err) require.ErrorIs(err, ErrMachineSynchronization) - require.Contains(err.Error(), "machine has processed 5 inputs but DB only has 3") + require.Contains(err.Error(), "machine has processed 5 inputs but app only has 3") }) s.Run("CountMismatch", func() { diff --git a/internal/prt/idaveconsensus_adapter.go b/internal/prt/idaveconsensus_adapter.go index 69cd19c2a..af9913d65 100644 --- a/internal/prt/idaveconsensus_adapter.go +++ b/internal/prt/idaveconsensus_adapter.go @@ -47,6 +47,20 @@ func (a *DaveConsensusAdapterImpl) CanSettle(opts *bind.CallOpts) (CanSettleResu }, nil } +// IsEpochSettled checks on-chain whether an epoch has already been settled by +// comparing the given epoch number against the current sealed epoch. If the +// sealed epoch has advanced past it, the settlement was already performed. +// This prevents duplicate Settle calls after a node restart. +func (a *DaveConsensusAdapterImpl) IsEpochSettled( + opts *bind.CallOpts, epochNumber uint64, +) (bool, error) { + sealed, err := a.consensus.GetCurrentSealedEpoch(opts) + if err != nil { + return false, err + } + return sealed.EpochNumber.Uint64() > epochNumber, nil +} + func (a *DaveConsensusAdapterImpl) Settle( opts *bind.TransactOpts, epochNumber *big.Int, outputsMerkleRoot [32]byte, proof [][32]byte, diff --git a/internal/prt/itournament_adapter.go b/internal/prt/itournament_adapter.go index a27d008f4..3baf53db4 100644 --- a/internal/prt/itournament_adapter.go +++ b/internal/prt/itournament_adapter.go @@ -46,8 +46,8 @@ func NewITournamentAdapter( func (a *ITournamentAdapterImpl) Result(opts *bind.CallOpts) (bool, [32]byte, [32]byte, error) { result, err := a.tournament.ArbitrationResult(opts) // ArbitrationResult reverts when it has finished with no winners - if info, ok := ExtractJsonErrorInfo(err); ok && info.HasData { - if dataStr, ok := info.Data.(string); ok && dataStr == TournamentFailedNoWinner { + if info, ok := ExtractJSONErrorInfo(err); ok && info.HasData { + if ethutil.MatchesSelector(info.Data, TournamentFailedNoWinner) { return true, [32]byte{}, [32]byte{}, nil } } @@ -72,6 +72,20 @@ func (a *ITournamentAdapterImpl) BondValue(opts *bind.CallOpts) (*big.Int, error return a.tournament.BondValue(opts) } +// IsCommitmentJoined checks on-chain whether a commitment has already been +// joined to this tournament. It calls the contract's getCommitment method +// and checks if the returned finalState is non-zero (indicating the commitment +// exists). This prevents duplicate JoinTournament calls after a node restart. +func (a *ITournamentAdapterImpl) IsCommitmentJoined( + opts *bind.CallOpts, commitmentRoot [32]byte, +) (bool, error) { + result, err := a.tournament.GetCommitment(opts, commitmentRoot) + if err != nil { + return false, err + } + return result.FinalState != [32]byte{}, nil +} + func (a *ITournamentAdapterImpl) JoinTournament( opts *bind.TransactOpts, finalState [32]byte, proof [][32]byte, leftNode [32]byte, rightNode [32]byte, diff --git a/internal/prt/prt.go b/internal/prt/prt.go index 04dcab9ef..5cda631af 100644 --- a/internal/prt/prt.go +++ b/internal/prt/prt.go @@ -679,12 +679,39 @@ func (s *Service) trySettle(ctx context.Context, app *Application, mostRecentBlo "epoch %d has missing required fields for settlement", epoch.Index) } + // Check on-chain if the epoch was already settled (e.g., after a node + // restart where settleInFlight was lost). CanSettle only checks if + // the tournament has finished, not if settlement was already performed. + alreadySettled, err := consensus.IsEpochSettled(callOpts, currentEpochIndex) + if err != nil { + s.Logger.Error("failed to check if epoch is already settled", "application", app.Name, + "epoch_index", currentEpochIndex, "error", err) + return err + } + if alreadySettled { + s.Logger.Info("Epoch already settled on-chain, waiting for event sync", + "application", app.Name, "epoch_index", currentEpochIndex) + return nil + } + s.Logger.Info("Sending Settle transaction", "application", app.Name, "epoch_index", epoch.Index, "outputs_merkle_root", epoch.OutputsMerkleRoot.String()) tx, err := consensus.Settle(s.txOpts, result.EpochNumber, *epoch.OutputsMerkleRoot, hashSliceToByteSlice(epoch.OutputsMerkleProof)) if err != nil { + // The contract reverts with IncorrectEpochNumber when the epoch was + // already settled. This can happen after a restart if the on-chain + // check (IsEpochSettled) used a slightly stale block number, or if + // another entity settled the epoch concurrently. + if isIncorrectEpochNumberError(err) { + s.Logger.Info( + "Epoch already settled on-chain (detected via revert), "+ + "waiting for event sync", + "application", app.Name, + "epoch_index", result.EpochNumber.Uint64()) + return nil + } s.Logger.Error("failed to send Settle transaction", "application", app.Name, "epoch_index", result.EpochNumber.Uint64(), "error", err) return err @@ -772,6 +799,23 @@ func (s *Service) reactToTournament(ctx context.Context, app *Application, mostR Context: ctx, BlockNumber: new(big.Int).SetUint64(mostRecentBlock), } + + // Check on-chain if the commitment was already joined (e.g., after a node + // restart where joinInFlight was lost and the DB event sync hasn't caught up). + alreadyJoined, err := tournamentAdapter.IsCommitmentJoined(callOpts, *epoch.Commitment) + if err != nil { + s.Logger.Error("failed to check commitment on-chain", "application", app.Name, + "epoch_index", currentEpochIndex, "tournament", epoch.TournamentAddress.Hex(), + "commitment", epoch.Commitment.Hex(), "error", err) + return err + } + if alreadyJoined { + s.Logger.Info("Commitment already joined on-chain, waiting for event sync", + "application", app.Name, "epoch_index", currentEpochIndex, + "tournament", epoch.TournamentAddress.Hex(), "commitment", epoch.Commitment.Hex()) + return nil + } + bondValue, err := tournamentAdapter.BondValue(callOpts) if err != nil { s.Logger.Error("failed to fetch tournament bond value", "application", app.Name, @@ -798,6 +842,17 @@ func (s *Service) reactToTournament(ctx context.Context, app *Application, mostR tx, err := tournamentAdapter.JoinTournament(&txOptsWithValue, *epoch.MachineHash, hashSliceToByteSlice(epoch.CommitmentProof), leftNode, rightNode) if err != nil { + // The contract reverts with "clock is initialized" (a require string + // from Clock.sol) when the commitment was already joined. This can + // happen after a restart if the on-chain check (IsCommitmentJoined) + // used a slightly stale block number. + // Matched via ABI-decoded Error(string) revert data, not err.Error(). + if isRevertReason(err, TournamentClockInitialized) { + s.Logger.Info("Commitment already joined on-chain (detected via revert), waiting for event sync", + "application", app.Name, "epoch_index", currentEpochIndex, + "tournament", epoch.TournamentAddress.Hex(), "commitment", epoch.Commitment.Hex()) + return nil + } s.Logger.Error("failed to send join tournament transaction", "application", app.Name, "epoch_index", currentEpochIndex, "error", err) return err @@ -831,3 +886,10 @@ func (s *Service) validateApplication(ctx context.Context, app *Application) err } return nil } + +// isIncorrectEpochNumberError checks whether an error from Settle is an +// IncorrectEpochNumber revert, indicating the epoch was already settled +// on-chain (e.g., before a node restart or by another entity). +func isIncorrectEpochNumberError(err error) bool { + return ethutil.IsCustomError(err, idaveconsensus.IDaveConsensusMetaData, "IncorrectEpochNumber") +} diff --git a/internal/prt/types.go b/internal/prt/types.go index 7ef5fa0c8..91343c057 100644 --- a/internal/prt/types.go +++ b/internal/prt/types.go @@ -33,6 +33,7 @@ type TournamentAdapter interface { Constants(opts *bind.CallOpts) (TournamentConstants, error) TimeFinished(opts *bind.CallOpts) (bool, uint64, error) BondValue(opts *bind.CallOpts) (*big.Int, error) + IsCommitmentJoined(opts *bind.CallOpts, commitmentRoot [32]byte) (bool, error) JoinTournament(opts *bind.TransactOpts, finalState [32]byte, proof [][32]byte, leftNode [32]byte, rightNode [32]byte) (*types.Transaction, error) } @@ -41,6 +42,7 @@ type TournamentAdapter interface { type DaveConsensusAdapter interface { ParseEpochSealed(log types.Log) (*idaveconsensus.IDaveConsensusEpochSealed, error) CanSettle(opts *bind.CallOpts) (CanSettleResult, error) + IsEpochSettled(opts *bind.CallOpts, epochNumber uint64) (bool, error) Settle(opts *bind.TransactOpts, epochNumber *big.Int, outputsMerkleRoot [32]byte, proof [][32]byte) (*types.Transaction, error) } @@ -89,3 +91,8 @@ func (l TournamentLevel) String() string { } const TournamentFailedNoWinner string = "0xb3045ef8" + +// TournamentClockInitialized is the revert reason from the tournament contract +// when joinTournament is called for a commitment that was already joined. +// This happens after a node restart when the in-memory joinInFlight map is lost. +const TournamentClockInitialized string = "clock is initialized" diff --git a/internal/prt/util.go b/internal/prt/util.go index 8bd20f196..00694aa86 100644 --- a/internal/prt/util.go +++ b/internal/prt/util.go @@ -4,44 +4,61 @@ package prt import ( - "errors" + "encoding/hex" + "strings" "unsafe" + "github.com/cartesi/rollups-node/pkg/ethutil" + + "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/rpc" ) -type JSONRPCInfo struct { - Code int - Message string - Data any - HasCode bool - HasData bool -} +// Re-export for backward compatibility within the package. +type JSONRPCInfo = ethutil.JSONRPCInfo -func ExtractJsonErrorInfo(err error) (JSONRPCInfo, bool) { - var out JSONRPCInfo - if err == nil { - return out, false - } +var ExtractJSONErrorInfo = ethutil.ExtractJSONErrorInfo - var e rpc.Error - if errors.As(err, &e) { - out.Code = e.ErrorCode() - out.Message = e.Error() - out.HasCode = true - } +// errorStringSelector is the 4-byte selector for Solidity's Error(string), +// used by require(condition, "reason") statements. This is keccak256("Error(string)")[:4] +// and is a well-known, stable Solidity ABI constant (0x08c379a0). +var errorStringSelector = [4]byte{0x08, 0xc3, 0x79, 0xa0} - var de rpc.DataError - if errors.As(err, &de) { - out.Data = de.ErrorData() - out.HasData = true - if !out.HasCode { - out.Message = de.Error() - } +// isRevertReason checks whether an RPC error contains an Error(string) revert +// with the given reason. It extracts the rpc.DataError, decodes the +// ABI-encoded Error(string) payload, and compares the decoded string. +// This is more robust than matching err.Error() because it operates on +// structured revert data, independent of how the RPC provider formats errors. +func isRevertReason(err error, reason string) bool { + info, ok := ExtractJSONErrorInfo(err) + if !ok || !info.HasData { + return false } - - return out, out.HasCode || out.HasData + dataStr, ok := info.Data.(string) + if !ok { + return false + } + dataStr = strings.TrimPrefix(dataStr, "0x") + data, decErr := hex.DecodeString(dataStr) + if decErr != nil || len(data) < 4 { + return false + } + // Check for the Error(string) selector: 0x08c379a0. + if [4]byte(data[:4]) != errorStringSelector { + return false + } + // ABI-decode the string argument. + stringType, typeErr := abi.NewType("string", "", nil) + if typeErr != nil { + return false + } + args := abi.Arguments{{Type: stringType}} + values, decErr := args.Unpack(data[4:]) + if decErr != nil || len(values) == 0 { + return false + } + decoded, ok := values[0].(string) + return ok && decoded == reason } // hashSliceToByteSlice converts []common.Hash to [][32]byte without copying. diff --git a/internal/prt/util_test.go b/internal/prt/util_test.go new file mode 100644 index 000000000..afdb08fd6 --- /dev/null +++ b/internal/prt/util_test.go @@ -0,0 +1,89 @@ +// (c) Cartesi and individual authors (see AUTHORS) +// SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +package prt + +import ( + "encoding/hex" + "errors" + "testing" + + "github.com/stretchr/testify/assert" +) + +// rpcDataError simulates an RPC error with revert data, as returned by +// eth_estimateGas when a contract reverts. +type rpcDataError struct { + code int + msg string + data any +} + +func (e *rpcDataError) Error() string { return e.msg } +func (e *rpcDataError) ErrorCode() int { return e.code } +func (e *rpcDataError) ErrorData() any { return e.data } + +// buildErrorStringRevert builds the hex-encoded revert data for +// Solidity's Error(string) — the encoding used by require(cond, "reason"). +func buildErrorStringRevert(reason string) string { + // Error(string) selector: 0x08c379a0 + selector := []byte{0x08, 0xc3, 0x79, 0xa0} + // ABI encoding: offset (0x20) + length + padded string bytes + offset := make([]byte, 32) + offset[31] = 0x20 + length := make([]byte, 32) + length[31] = byte(len(reason)) + // Pad string data to 32-byte boundary. + data := []byte(reason) + if rem := len(data) % 32; rem != 0 { + data = append(data, make([]byte, 32-rem)...) + } + var buf []byte + buf = append(buf, selector...) + buf = append(buf, offset...) + buf = append(buf, length...) + buf = append(buf, data...) + return "0x" + hex.EncodeToString(buf) +} + +func TestIsRevertReason(t *testing.T) { + t.Run("MatchesExactReason", func(t *testing.T) { + data := buildErrorStringRevert("clock is initialized") + err := &rpcDataError{code: 3, msg: "execution reverted", data: data} + assert.True(t, isRevertReason(err, "clock is initialized")) + }) + + t.Run("DoesNotMatchDifferentReason", func(t *testing.T) { + data := buildErrorStringRevert("clock is initialized") + err := &rpcDataError{code: 3, msg: "execution reverted", data: data} + assert.False(t, isRevertReason(err, "something else")) + }) + + t.Run("DoesNotMatchSubstring", func(t *testing.T) { + data := buildErrorStringRevert("clock is initialized and running") + err := &rpcDataError{code: 3, msg: "execution reverted", data: data} + assert.False(t, isRevertReason(err, "clock is initialized")) + }) + + t.Run("ReturnsFalseForNilError", func(t *testing.T) { + assert.False(t, isRevertReason(nil, "clock is initialized")) + }) + + t.Run("ReturnsFalseForNonRPCError", func(t *testing.T) { + err := errors.New("clock is initialized") + assert.False(t, isRevertReason(err, "clock is initialized")) + }) + + t.Run("ReturnsFalseForCustomErrorSelector", func(t *testing.T) { + // Custom error selector (not Error(string)) + err := &rpcDataError{code: 3, msg: "execution reverted", data: "0xb3045ef8"} + assert.False(t, isRevertReason(err, "clock is initialized")) + }) + + t.Run("HandlesWithout0xPrefix", func(t *testing.T) { + data := buildErrorStringRevert("clock is initialized") + // Strip the "0x" prefix — some RPC providers omit it. + err := &rpcDataError{code: 3, msg: "execution reverted", data: data[2:]} + assert.True(t, isRevertReason(err, "clock is initialized")) + }) +} diff --git a/pkg/ethutil/rpcerror.go b/pkg/ethutil/rpcerror.go new file mode 100644 index 000000000..76b5674e7 --- /dev/null +++ b/pkg/ethutil/rpcerror.go @@ -0,0 +1,92 @@ +// (c) Cartesi and individual authors (see AUTHORS) +// SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +package ethutil + +import ( + "bytes" + "errors" + "fmt" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/rpc" +) + +// JSONRPCInfo contains structured error information extracted from a JSON-RPC error response. +type JSONRPCInfo struct { + Code int + Message string + Data any + HasCode bool + HasData bool +} + +// ExtractJSONErrorInfo attempts to extract JSON-RPC error details from an error. +// It checks for rpc.Error (code + message) and rpc.DataError (error data). +// Returns the extracted info and true if the error contained JSON-RPC details. +func ExtractJSONErrorInfo(err error) (JSONRPCInfo, bool) { + var out JSONRPCInfo + if err == nil { + return out, false + } + + var e rpc.Error + if errors.As(err, &e) { + out.Code = e.ErrorCode() + out.Message = e.Error() + out.HasCode = true + } + + var de rpc.DataError + if errors.As(err, &de) { + out.Data = de.ErrorData() + out.HasData = true + if !out.HasCode { + out.Message = de.Error() + } + } + + return out, out.HasCode || out.HasData +} + +// MatchesSelector checks whether RPC error data starts with the given 4-byte selector. +// Handles varying representations across Ethereum clients: hex strings (with/without +// 0x prefix, any case) and raw []byte. +func MatchesSelector(data any, selector string) bool { + expected := common.FromHex(selector) + var got []byte + switch d := data.(type) { + case string: + got = common.FromHex(d) + case []byte: + got = d + default: + return false + } + return len(got) >= len(expected) && bytes.Equal(got[:len(expected)], expected) +} + +// IsCustomError checks whether an RPC error is a specific custom Solidity error +// defined in the given contract metadata. It extracts the revert data from the +// error and compares its 4-byte selector against the ABI-derived selector for +// errorName. This is case-insensitive and handles 0x prefix variations. +func IsCustomError(err error, metadata *bind.MetaData, errorName string) bool { + if metadata == nil { + return false + } + info, ok := ExtractJSONErrorInfo(err) + if !ok || !info.HasData { + return false + } + parsed, _ := metadata.GetAbi() + if parsed == nil { + return false + } + abiErr, ok := parsed.Errors[errorName] + if !ok { + return false + } + selector := fmt.Sprintf("0x%x", abiErr.ID[:4]) + return MatchesSelector(info.Data, selector) +} diff --git a/pkg/ethutil/rpcerror_test.go b/pkg/ethutil/rpcerror_test.go new file mode 100644 index 000000000..ccfeea374 --- /dev/null +++ b/pkg/ethutil/rpcerror_test.go @@ -0,0 +1,108 @@ +// (c) Cartesi and individual authors (see AUTHORS) +// SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +package ethutil + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" +) + +// rpcDataError simulates an RPC error with revert data, as returned by +// eth_estimateGas when a contract reverts. +type rpcDataError struct { + code int + msg string + data any +} + +func (e *rpcDataError) Error() string { return e.msg } +func (e *rpcDataError) ErrorCode() int { return e.code } +func (e *rpcDataError) ErrorData() any { return e.data } + +func TestExtractJSONErrorInfo(t *testing.T) { + t.Run("NilError", func(t *testing.T) { + info, ok := ExtractJSONErrorInfo(nil) + assert.False(t, ok) + assert.False(t, info.HasCode) + assert.False(t, info.HasData) + }) + + t.Run("PlainError", func(t *testing.T) { + _, ok := ExtractJSONErrorInfo(errors.New("plain")) + assert.False(t, ok) + }) + + t.Run("RPCDataError", func(t *testing.T) { + err := &rpcDataError{code: 3, msg: "execution reverted", data: "0xdeadbeef"} + info, ok := ExtractJSONErrorInfo(err) + assert.True(t, ok) + assert.True(t, info.HasCode) + assert.True(t, info.HasData) + assert.Equal(t, 3, info.Code) + assert.Equal(t, "0xdeadbeef", info.Data) + }) +} + +func TestMatchesSelector(t *testing.T) { + t.Run("LowercaseHexWithPrefix", func(t *testing.T) { + assert.True(t, MatchesSelector("0xaabbccdd0000", "0xaabbccdd")) + }) + + t.Run("UppercaseHexWithPrefix", func(t *testing.T) { + assert.True(t, MatchesSelector("0xAABBCCDD0000", "0xaabbccdd")) + }) + + t.Run("MixedCaseHexWithPrefix", func(t *testing.T) { + assert.True(t, MatchesSelector("0xAaBbCcDd0000", "0xaabbccdd")) + }) + + t.Run("WithoutPrefix", func(t *testing.T) { + assert.True(t, MatchesSelector("aabbccdd0000", "0xaabbccdd")) + }) + + t.Run("ExactLength", func(t *testing.T) { + assert.True(t, MatchesSelector("0xaabbccdd", "0xaabbccdd")) + }) + + t.Run("DifferentSelector", func(t *testing.T) { + assert.False(t, MatchesSelector("0x11223344", "0xaabbccdd")) + }) + + t.Run("TooShort", func(t *testing.T) { + assert.False(t, MatchesSelector("0xaabb", "0xaabbccdd")) + }) + + t.Run("EmptyData", func(t *testing.T) { + assert.False(t, MatchesSelector("", "0xaabbccdd")) + }) + + t.Run("RawBytes", func(t *testing.T) { + assert.True(t, MatchesSelector([]byte{0xaa, 0xbb, 0xcc, 0xdd, 0x00}, "0xaabbccdd")) + }) + + t.Run("NonStringNonBytes", func(t *testing.T) { + assert.False(t, MatchesSelector(42, "0xaabbccdd")) + }) + + t.Run("NilData", func(t *testing.T) { + assert.False(t, MatchesSelector(nil, "0xaabbccdd")) + }) +} + +func TestIsCustomError(t *testing.T) { + t.Run("NilError", func(t *testing.T) { + assert.False(t, IsCustomError(nil, nil, "Foo")) + }) + + t.Run("NonRPCError", func(t *testing.T) { + assert.False(t, IsCustomError(errors.New("plain"), nil, "Foo")) + }) + + t.Run("NilMetadata", func(t *testing.T) { + err := &rpcDataError{code: 3, msg: "revert", data: "0xaabbccdd"} + assert.False(t, IsCustomError(err, nil, "Foo")) + }) +} diff --git a/pkg/machine/implementation_test.go b/pkg/machine/implementation_test.go index 60cf9a567..b56183948 100644 --- a/pkg/machine/implementation_test.go +++ b/pkg/machine/implementation_test.go @@ -1249,11 +1249,11 @@ func (s *ImplementationSuite) TestCheckContext() { err = checkContext(canceledCtx) require.ErrorIs(err, ErrCanceled) - // Test deadline exceeded context - timeoutCtx, cancel := context.WithTimeout(ctx, 1*time.Millisecond) + // Test deadline exceeded context — use a deadline in the past so it's + // already expired without relying on sleep timing. + expiredCtx, cancel := context.WithDeadline(ctx, time.Now().Add(-time.Second)) defer cancel() - time.Sleep(2 * time.Millisecond) - err = checkContext(timeoutCtx) + err = checkContext(expiredCtx) require.ErrorIs(err, ErrDeadlineExceeded) // Test nil context (should not panic) diff --git a/scripts/run-integration-tests.sh b/scripts/run-integration-tests.sh index 6c611fd84..449695630 100755 --- a/scripts/run-integration-tests.sh +++ b/scripts/run-integration-tests.sh @@ -3,50 +3,24 @@ # SPDX-License-Identifier: Apache-2.0 (see LICENSE) # # Entrypoint for the integration-test container. -# Waits for the node to become healthy, starts a background health monitor, -# then runs the Go integration test suite. +# The node process is started and managed by TestMain in the Go test suite, +# so this script only needs to set up PATH and run the tests. # -# Usage: run-integration-tests.sh +# Usage: run-integration-tests.sh set -eu -NODE_URL="${1:-http://node:10000}" - -echo "Waiting for node to become healthy..." -attempts=0 -until curl -sf "${NODE_URL}/readyz"; do - attempts=$((attempts + 1)) - if [ "$attempts" -ge 60 ]; then - echo "ERROR: Node failed to become healthy after 120 seconds" - exit 1 - fi - sleep 2 -done -echo "Node is healthy. Running integration tests..." - -# Monitor node health in background; kill the entire process group if -# the node crashes. Uses a failure counter (3 consecutive misses) to -# tolerate transient blips such as GC pauses. -(fail_count=0; while sleep 5; do - if ! curl -sf "${NODE_URL}/readyz" >/dev/null 2>&1; then - fail_count=$((fail_count + 1)) - echo "WARNING: Node health check failed ($fail_count/3)" - if [ "$fail_count" -ge 3 ]; then - echo "ERROR: Node unhealthy after 3 consecutive checks, aborting tests." - kill 0 2>/dev/null - exit 1 - fi - else - fail_count=0 - fi -done) & -HEALTH_PID=$! -trap "kill $HEALTH_PID 2>/dev/null" EXIT - export PATH="/opt/go/bin:/build/cartesi/go/rollups-node:$PATH" -# Smoke-check: verify the CLI binary is on PATH before running the suite. +# Smoke-check: verify the required binaries are on PATH. which cartesi-rollups-cli || { echo "ERROR: cartesi-rollups-cli not found on PATH"; exit 1; } +which cartesi-rollups-node || { echo "ERROR: cartesi-rollups-node not found on PATH"; exit 1; } + +# Print the node log on exit so it appears in docker compose logs. +NODE_LOG="${CARTESI_TEST_NODE_LOG_FILE:-}" +if [ -n "$NODE_LOG" ]; then + trap 'echo "=== NODE LOG ==="; cat "$NODE_LOG" 2>/dev/null || true' EXIT +fi # Timeout must be less than the CI job timeout-minutes (60) to produce # a useful go test panic instead of an abrupt CI kill. diff --git a/test/compose/compose.integration.yaml b/test/compose/compose.integration.yaml index 0caab9aa5..037e2be3f 100644 --- a/test/compose/compose.integration.yaml +++ b/test/compose/compose.integration.yaml @@ -40,8 +40,8 @@ services: POSTGRES_DB: rollupsdb migration: - image: cartesi/rollups-node:devel - command: cartesi-rollups-cli db init + image: cartesi/rollups-node:tester + command: /build/cartesi/go/rollups-node/cartesi-rollups-cli db init depends_on: database: condition: service_healthy @@ -74,49 +74,25 @@ services: - ../downloads:/usr/share/cartesi-machine/images restart: "no" - node: - image: cartesi/rollups-node:devel - init: true - command: ["bash", "-o", "pipefail", "-c", "cartesi-rollups-node 2>&1 | tee /var/lib/cartesi-rollups-node/logs/node.log"] - depends_on: - migration: - condition: service_completed_successfully - ethereum_provider: - condition: service_healthy - dapp-builder: - condition: service_completed_successfully - volumes: - - dapp_images:/var/lib/cartesi-rollups-node/dapps - - node_logs:/var/lib/cartesi-rollups-node/logs - networks: - - devnet - healthcheck: - test: ["CMD", "curl", "-G", "-f", "http://127.0.0.1:10000/readyz"] - interval: 5s - timeout: 5s - retries: 30 - start_period: 10s - environment: - <<: *env - CARTESI_BLOCKCHAIN_DEFAULT_BLOCK: latest - CARTESI_ADVANCER_POLLING_INTERVAL: 1 - CARTESI_VALIDATOR_POLLING_INTERVAL: 1 - CARTESI_CLAIMER_POLLING_INTERVAL: 1 - CARTESI_PRT_POLLING_INTERVAL: 1 - + # The node is started and managed by TestMain inside the test process. + # This ensures all tests (including restart and snapshot policy tests) + # run with the same infrastructure in both local and CI environments. integration-test: image: cartesi/rollups-node:tester profiles: [integration-test] + init: true entrypoint: ["bash"] - command: ["/scripts/run-integration-tests.sh", "http://node:10000"] + command: ["/scripts/run-integration-tests.sh"] depends_on: - node: + migration: + condition: service_completed_successfully + ethereum_provider: condition: service_healthy dapp-builder: condition: service_completed_successfully volumes: - dapp_images:/var/lib/cartesi-rollups-node/dapps:ro - - node_logs:/var/lib/cartesi-rollups-node/logs:ro + - node_logs:/var/lib/cartesi-rollups-node/logs - ../downloads:/usr/share/cartesi-machine/images - ../../scripts/run-integration-tests.sh:/scripts/run-integration-tests.sh:ro networks: @@ -124,11 +100,12 @@ services: restart: "no" environment: <<: *env + CARTESI_BLOCKCHAIN_DEFAULT_BLOCK: latest CARTESI_TEST_DAPP_PATH: /var/lib/cartesi-rollups-node/dapps/echo-dapp CARTESI_TEST_REJECT_DAPP_PATH: /var/lib/cartesi-rollups-node/dapps/reject-loop-dapp CARTESI_TEST_EXCEPTION_DAPP_PATH: /var/lib/cartesi-rollups-node/dapps/exception-loop-dapp CARTESI_TEST_NODE_LOG_FILE: /var/lib/cartesi-rollups-node/logs/node.log - CARTESI_INSPECT_URL: http://node:10012/ + CARTESI_INSPECT_URL: http://localhost:10012/ volumes: dapp_images: diff --git a/test/integration/main_test.go b/test/integration/main_test.go index 1e82da4da..fac22fe4f 100644 --- a/test/integration/main_test.go +++ b/test/integration/main_test.go @@ -6,36 +6,115 @@ package integration import ( + "context" "flag" "fmt" "os" "testing" + "time" ) -// TestMain enforces that integration tests run sequentially. -// These tests share a single Anvil blockchain instance and PRT tests call -// anvilMine() which globally advances the block number, affecting Authority -// epoch boundaries. Running tests in parallel would cause subtle timing races. +// TestMain manages the node process and enforces sequential test execution. +// +// If no node is already running on port 10000 (e.g., in Docker Compose), +// TestMain starts the node binary as a subprocess, waits for health, and +// stops it after all tests complete. This makes the node lifecycle +// transparent to individual test suites — they don't need to know whether +// the node was started by the test or by an external process. +// +// Restart/snapshot tests call stopSharedNode/startSharedNode to exercise +// the node's synchronization path. When the node is externally managed +// (Compose), those tests are skipped. func TestMain(m *testing.M) { + var createdSnapshotsDir string flag.Parse() if testing.Short() { fmt.Fprintln(os.Stderr, "skipping integration tests in short mode") os.Exit(0) } - // Warn if -test.parallel is set above 1. The test binary flag is already - // parsed by flag.Parse(), so we can inspect it directly via GOMAXPROCS or - // by checking the flag value. Go's testing package uses -test.parallel to - // control the maximum number of tests running in parallel; the default is - // GOMAXPROCS which may be >1. + + // Enforce sequential execution — tests share blockchain state. p := flag.Lookup("test.parallel") if p != nil && p.Value.String() != "1" { fmt.Fprintln(os.Stderr, "WARNING: integration tests must not run in parallel "+ "(-test.parallel should be 1). Forcing -test.parallel=1.") if err := p.Value.Set("1"); err != nil { - fmt.Fprintf(os.Stderr, "failed to set -test.parallel=1: %v\n", err) + fmt.Fprintf(os.Stderr, + "failed to set -test.parallel=1: %v\n", err) + os.Exit(1) + } + } + + // Start the node if none is running (local execution). + // In Docker Compose, the node is a separate container and is already + // running — we detect this by checking if port 10000 is in use. + if nodePortAvailable() { + logPath := os.Getenv("CARTESI_TEST_NODE_LOG_FILE") + if logPath == "" { + f, err := os.CreateTemp("", "rollups-node-integration-*.log") + if err != nil { + fmt.Fprintf(os.Stderr, + "failed to create node log file: %v\n", err) + os.Exit(1) + } + logPath = f.Name() + f.Close() + os.Setenv("CARTESI_TEST_NODE_LOG_FILE", logPath) + } + + // Use a temporary directory for snapshots so they don't pollute the + // repo and are cleaned up even if the test fails. + if os.Getenv("CARTESI_SNAPSHOTS_DIR") == "" { + snapshotsDir, err := os.MkdirTemp("", "rollups-node-snapshots-*") + if err != nil { + fmt.Fprintf(os.Stderr, + "failed to create snapshots dir: %v\n", err) + os.Exit(1) + } + os.Setenv("CARTESI_SNAPSHOTS_DIR", snapshotsDir) + createdSnapshotsDir = snapshotsDir + fmt.Fprintf(os.Stderr, "Snapshots dir: %s\n", snapshotsDir) + } + + fmt.Fprintf(os.Stderr, "Starting node (log: %s)...\n", logPath) + + var err error + sharedNode, err = startNodeWithLog(logPath) + if err != nil { + fmt.Fprintf(os.Stderr, "failed to start node: %v\n", err) os.Exit(1) } + + ctx, cancel := context.WithTimeout( + context.Background(), 2*time.Minute) + if err := sharedNode.waitForHealth(ctx, nil); err != nil { + cancel() + sharedNode.stop(nil) + fmt.Fprintf(os.Stderr, + "node failed to become healthy: %v\n", err) + os.Exit(1) + } + cancel() + fmt.Fprintln(os.Stderr, "Node is healthy. Running integration tests...") + } else { + fmt.Fprintln(os.Stderr, + "Node already running on port 10000 (external). "+ + "Restart tests will be skipped.") + } + + code := m.Run() + + if sharedNode != nil { + fmt.Fprintln(os.Stderr, "Stopping node...") + sharedNode.stop(nil) } - os.Exit(m.Run()) + + // Clean up snapshots directory only if we created the temp dir ourselves. + if createdSnapshotsDir != "" { + fmt.Fprintf(os.Stderr, "Cleaning up snapshots dir: %s\n", createdSnapshotsDir) + os.RemoveAll(createdSnapshotsDir) + } + + os.Exit(code) } diff --git a/test/integration/node_helpers_test.go b/test/integration/node_helpers_test.go new file mode 100644 index 000000000..e914ae427 --- /dev/null +++ b/test/integration/node_helpers_test.go @@ -0,0 +1,208 @@ +// (c) Cartesi and individual authors (see AUTHORS) +// SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +//go:build endtoendtests + +package integration + +import ( + "context" + "fmt" + "net" + "net/http" + "os" + "os/exec" + "testing" + "time" +) + +const nodeBinary = "cartesi-rollups-node" + +// sharedNode is the test-managed node process, started by TestMain when no +// external node is running. All test suites share this instance. Restart +// tests stop and restart it via stopSharedNode/startSharedNode. +// When nil, the node is externally managed (e.g., Docker Compose) and +// restart tests are skipped. +var sharedNode *nodeProcess + +// isNodeSelfManaged returns true if TestMain started the node process. +// When false, the node is externally managed and cannot be restarted. +func isNodeSelfManaged() bool { + return sharedNode != nil +} + +// stopSharedNode stops the test-managed node. Panics if the node is +// externally managed. +func stopSharedNode(t testing.TB) { + if sharedNode == nil { + t.Fatal("cannot stop node: not managed by tests (running in compose?)") + } + sharedNode.stop(t) + sharedNode = nil +} + +// startSharedNode starts a new test-managed node, reusing the existing log +// file. Call this after stopSharedNode to restart the node. +func startSharedNode(t testing.TB) { + if sharedNode != nil { + t.Fatal("cannot start node: already running") + } + + logPath := os.Getenv("CARTESI_TEST_NODE_LOG_FILE") + var err error + sharedNode, err = startNodeWithLog(logPath) + if err != nil { + t.Fatalf("failed to start node: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + if err := sharedNode.waitForHealth(ctx, t); err != nil { + sharedNode.stop(t) + sharedNode = nil + t.Fatalf("node failed to become healthy: %v", err) + } +} + +// nodePortAvailable returns true if the node's telemetry port (10000) is free. +func nodePortAvailable() bool { + conn, err := net.DialTimeout("tcp", "localhost:10000", time.Second) + if err != nil { + return true + } + conn.Close() + return false +} + +// nodeProcess represents a running node subprocess managed by the test. +type nodeProcess struct { + cmd *exec.Cmd + logFile *os.File + tail *exec.Cmd // tail -f process for live log streaming + tty *os.File // /dev/tty FD used by tail; closed in stop() +} + +// startNodeWithLog starts the node binary as a subprocess, appending output +// to the given log file path. The node inherits the current environment +// (database connection, blockchain endpoint, etc.) and additionally sets +// fast polling intervals for test responsiveness. +// +// A background `tail -f` process streams the log file to the terminal so +// the user can see node output in real time. This must be a separate process +// because `go test` captures the test process's stdout/stderr. +func startNodeWithLog(logPath string) (*nodeProcess, error) { + if _, err := exec.LookPath(nodeBinary); err != nil { + return nil, fmt.Errorf("%s not found on PATH: %w", nodeBinary, err) + } + + logFile, err := os.OpenFile( //nolint:gosec + logPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) + if err != nil { + return nil, fmt.Errorf("open log file %s: %w", logPath, err) + } + + cmd := exec.Command(nodeBinary) //nolint:gosec + cmd.Stdout = logFile + cmd.Stderr = logFile + cmd.Env = append(os.Environ(), + "CARTESI_ADVANCER_POLLING_INTERVAL=1", + "CARTESI_VALIDATOR_POLLING_INTERVAL=1", + "CARTESI_CLAIMER_POLLING_INTERVAL=1", + "CARTESI_PRT_POLLING_INTERVAL=1", + ) + + if err := cmd.Start(); err != nil { + logFile.Close() + return nil, fmt.Errorf("start node: %w", err) + } + + // Stream the log file to the terminal via a separate tail process. + // We write to /dev/tty to bypass go test and gotestsum's output capture, + // so the user sees node logs in real time just like the old Makefile did. + // Falls back silently if /dev/tty is not available (e.g., CI, compose). + tty, ttyErr := os.OpenFile("/dev/tty", os.O_WRONLY, 0) //nolint:gosec + if ttyErr != nil { + return &nodeProcess{cmd: cmd, logFile: logFile}, nil + } + + tail := exec.Command("tail", "-f", logPath) //nolint:gosec + tail.Stdout = tty + tail.Stderr = tty + if err := tail.Start(); err != nil { + tty.Close() + return &nodeProcess{cmd: cmd, logFile: logFile}, nil + } + + return &nodeProcess{cmd: cmd, logFile: logFile, tail: tail, tty: tty}, nil +} + +// waitForHealth polls the node's readyz endpoint until it responds 200 OK +// or the context is cancelled. +func (n *nodeProcess) waitForHealth(ctx context.Context, t testing.TB) error { + client := &http.Client{Timeout: 2 * time.Second} + return pollUntil(ctx, 2*time.Second, func() (bool, error) { + req, err := http.NewRequestWithContext( + ctx, "GET", "http://localhost:10000/readyz", nil) + if err != nil { + return false, nil + } + resp, err := client.Do(req) + if err != nil { + if t != nil { + t.Log(" waiting for node health...") + } + return false, nil + } + defer resp.Body.Close() + return resp.StatusCode == http.StatusOK, nil + }) +} + +// stop sends an interrupt signal to the node process and waits for it to exit. +// On Unix this is SIGINT (same as Ctrl+C), triggering the node's graceful +// shutdown. Also kills the tail process if running. +func (n *nodeProcess) stop(t testing.TB) { + if n.cmd.Process == nil { + return + } + + pid := n.cmd.Process.Pid + if t != nil { + t.Logf(" stopping node (pid=%d)...", pid) + } + + // Stop the tail process first so it doesn't print shutdown logs. + if n.tail != nil && n.tail.Process != nil { + _ = n.tail.Process.Kill() + _ = n.tail.Wait() + } + if n.tty != nil { + n.tty.Close() + } + + // Send interrupt for graceful shutdown. + if err := n.cmd.Process.Signal(os.Interrupt); err != nil { + if t != nil { + t.Logf(" signal failed, killing: %v", err) + } + _ = n.cmd.Process.Kill() + } + + // Wait for exit with a timeout — if the node hangs during shutdown, + // fall back to SIGKILL so the test suite doesn't hang indefinitely. + done := make(chan error, 1) + go func() { done <- n.cmd.Wait() }() + select { + case <-done: + case <-time.After(30 * time.Second): + if t != nil { + t.Log(" node did not exit within 30s, sending SIGKILL") + } + _ = n.cmd.Process.Kill() + <-done + } + n.logFile.Close() + if t != nil { + t.Logf(" node stopped (pid=%d)", pid) + } +} diff --git a/test/integration/restart_test.go b/test/integration/restart_test.go new file mode 100644 index 000000000..093b33fc8 --- /dev/null +++ b/test/integration/restart_test.go @@ -0,0 +1,315 @@ +// (c) Cartesi and individual authors (see AUTHORS) +// SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +//go:build endtoendtests + +// IMPORTANT: These integration tests share a single Anvil blockchain instance. +// PRT tests call anvilMine() which globally advances the block number, affecting +// Authority epoch boundaries. Tests MUST NOT run in parallel (no t.Parallel()). +// The go test runner executes tests within a package sequentially by default, +// which is required for correctness here. + +package integration + +import ( + "context" + "regexp" + "testing" + "time" + + "github.com/cartesi/rollups-node/internal/model" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" +) + +type RestartSuite struct { + suite.Suite + LogChecker + ctx context.Context + cancel context.CancelFunc + app1Name string + app2Name string +} + +func TestRestart(t *testing.T) { + if !isNodeSelfManaged() { + t.Skip("skipping: node is externally managed (compose); " + + "restart tests require test-managed node") + } + suite.Run(t, new(RestartSuite)) +} + +func (s *RestartSuite) SetupSuite() { + s.ctx, s.cancel = context.WithTimeout( + context.Background(), 25*time.Minute) +} + +func (s *RestartSuite) TearDownSuite() { + // If a test failed mid-restart, the node may be stopped. Restart it + // so subsequent test suites have a running node. + if sharedNode == nil { + s.T().Log("Restarting shared node for subsequent tests...") + startSharedNode(s.T()) + } + s.cancel() +} + +func (s *RestartSuite) SetupTest() { + s.StartLogCapture() + s.app1Name = "" + s.app2Name = "" +} + +func (s *RestartSuite) TearDownTest() { + for _, name := range []string{s.app1Name, s.app2Name} { + if name != "" { + s.T().Logf("Disabling application %s", name) + if err := disableApplication(s.ctx, name); err != nil { + s.T().Logf( + "warning: failed to disable %s: %v", name, err) + } + } + } + s.CheckLogs(s.T()) +} + +// restartConfig configures the shared restart test flow. +type restartConfig struct { + // ExtraDeployArgs are additional CLI flags for deploy (e.g., "--prt"). + ExtraDeployArgs []string + // PreClaimHook, if non-nil, is called after outputs are verified + // but before the claim/execution phase. + PreClaimHook func( + ctx context.Context, t testing.TB, + require *require.Assertions, appName string, + ) +} + +// runRestartTest is the shared flow for both Authority and PRT restart tests. +// It deploys two apps, processes inputs, restarts the node, sends more inputs, +// and verifies the full L1 pipeline. +func (s *RestartSuite) runRestartTest(cfg restartConfig) { + require := s.Require() + dappPath := envOrDefault( + "CARTESI_TEST_DAPP_PATH", "applications/echo-dapp") + defer timed(s.T(), "full restart multi-app lifecycle test")() + + // === Phase 1: Deploy two apps and process inputs === + + s.T().Log("--- Phase 1: Deploy two apps and process inputs before restart ---") + + func() { + defer timed(s.T(), "deploy two echo-dapps")() + + deployArgs1 := append( + []string{"--salt", uniqueSalt()}, cfg.ExtraDeployArgs...) + deployArgs2 := append( + []string{"--salt", uniqueSalt()}, cfg.ExtraDeployArgs...) + + s.T().Logf(" deploying app-1: name=%s", s.app1Name) + addr1, err := deployApplication( + s.ctx, s.app1Name, dappPath, deployArgs1...) + require.NoError(err, "deploy app-1") + s.T().Logf(" app-1 deployed at %s", addr1) + + s.T().Logf(" deploying app-2: name=%s", s.app2Name) + addr2, err := deployApplication( + s.ctx, s.app2Name, dappPath, deployArgs2...) + require.NoError(err, "deploy app-2") + s.T().Logf(" app-2 deployed at %s", addr2) + + err = anvilSetBalance(s.ctx, addr1, oneEtherWei) + require.NoError(err, "fund app-1 contract") + err = anvilSetBalance(s.ctx, addr2, oneEtherWei) + require.NoError(err, "fund app-2 contract") + s.T().Log(" both apps funded with 1 ETH") + }() + + s.T().Log("Sending one input to each app before restart...") + idx1, _, err := sendInput(s.ctx, s.app1Name, "pre-restart-1") + require.NoError(err, "send input to app-1") + s.T().Logf(" app-1: input sent (index=%d)", idx1) + + idx2, _, err := sendInput(s.ctx, s.app2Name, "pre-restart-2") + require.NoError(err, "send input to app-2") + s.T().Logf(" app-2: input sent (index=%d)", idx2) + + func() { + defer timed(s.T(), "wait for pre-restart inputs")() + processCtx, processCancel := context.WithTimeout( + s.ctx, inputProcessingTimeout) + defer processCancel() + + input1, err := waitForInputProcessed( + processCtx, s.T(), s.app1Name, idx1) + require.NoError(err, "wait for app-1 input processing") + require.Equal( + model.InputCompletionStatus_Accepted, input1.Status) + s.T().Log(" app-1: input ACCEPTED") + + input2, err := waitForInputProcessed( + processCtx, s.T(), s.app2Name, idx2) + require.NoError(err, "wait for app-2 input processing") + require.Equal( + model.InputCompletionStatus_Accepted, input2.Status) + s.T().Log(" app-2: input ACCEPTED") + }() + + s.T().Log("Verifying pre-restart outputs...") + outputs1, err := readOutputs(s.ctx, s.app1Name) + require.NoError(err, "read app-1 outputs") + require.Equal( + uint64(echoOutputsPerInput), outputs1.Pagination.TotalCount, + "app-1 should have %d outputs", echoOutputsPerInput) + + outputs2, err := readOutputs(s.ctx, s.app2Name) + require.NoError(err, "read app-2 outputs") + require.Equal( + uint64(echoOutputsPerInput), outputs2.Pagination.TotalCount, + "app-2 should have %d outputs", echoOutputsPerInput) + s.T().Logf(" both apps have %d outputs each — correct", + echoOutputsPerInput) + + // === Phase 2: Stop and restart the node === + + s.T().Log("--- Phase 2: Restarting node to test machine synchronization ---") + func() { + defer timed(s.T(), "node restart cycle")() + stopSharedNode(s.T()) + startSharedNode(s.T()) + }() + + // === Phase 3: Send more inputs and verify === + + s.T().Log("--- Phase 3: Send inputs after restart and verify ---") + + s.T().Log("Sending one input to each app after restart...") + idx1b, _, err := sendInput(s.ctx, s.app1Name, "post-restart-1") + require.NoError(err, "send post-restart input to app-1") + s.T().Logf(" app-1: input sent (index=%d)", idx1b) + + idx2b, _, err := sendInput(s.ctx, s.app2Name, "post-restart-2") + require.NoError(err, "send post-restart input to app-2") + s.T().Logf(" app-2: input sent (index=%d)", idx2b) + + func() { + defer timed(s.T(), "wait for post-restart inputs")() + processCtx, processCancel := context.WithTimeout( + s.ctx, inputProcessingTimeout) + defer processCancel() + + input1b, err := waitForInputProcessed( + processCtx, s.T(), s.app1Name, idx1b) + require.NoError(err, "wait for app-1 post-restart input") + require.Equal( + model.InputCompletionStatus_Accepted, input1b.Status) + s.T().Log(" app-1: post-restart input ACCEPTED") + + input2b, err := waitForInputProcessed( + processCtx, s.T(), s.app2Name, idx2b) + require.NoError(err, "wait for app-2 post-restart input") + require.Equal( + model.InputCompletionStatus_Accepted, input2b.Status) + s.T().Log(" app-2: post-restart input ACCEPTED") + }() + + s.T().Log("Verifying post-restart outputs...") + outputs1after, err := readOutputs(s.ctx, s.app1Name) + require.NoError(err, "read app-1 outputs after restart") + require.Equal( + uint64(2*echoOutputsPerInput), + outputs1after.Pagination.TotalCount, + "app-1 should have %d outputs after 2 inputs", + 2*echoOutputsPerInput) + + outputs2after, err := readOutputs(s.ctx, s.app2Name) + require.NoError(err, "read app-2 outputs after restart") + require.Equal( + uint64(2*echoOutputsPerInput), + outputs2after.Pagination.TotalCount, + "app-2 should have %d outputs after 2 inputs", + 2*echoOutputsPerInput) + s.T().Logf( + " both apps have %d outputs each after restart — correct", + 2*echoOutputsPerInput) + + s.T().Log("Node survived restart and both apps continued processing") + + // === Optional pre-claim hook (e.g. PRT tournament settlement) === + + if cfg.PreClaimHook != nil { + cfg.PreClaimHook(s.ctx, s.T(), require, s.app1Name) + } + + // === Verify full L1 pipeline for app-1 === + + s.T().Log("Verifying claim and execution for app-1...") + epochIndex := outputs1.Data[0].EpochIndex + + var voucherIdx, noticeIdx uint64 + voucherFound, noticeFound := false, false + for _, out := range outputs1.Data { + if out.DecodedData == nil { + continue + } + switch out.DecodedData.Type { + case "Voucher": + voucherIdx = out.Index + voucherFound = true + case "Notice": + noticeIdx = out.Index + noticeFound = true + } + } + require.True(voucherFound, "app-1: voucher not found") + require.True(noticeFound, "app-1: notice not found") + + verifyClaimAndExecute(s.ctx, s.T(), require, verifyAndExecuteConfig{ + AppName: s.app1Name, + EpochIndex: epochIndex, + EpochOutputs: outputs1.Data, + VoucherIdx: voucherIdx, + NoticeIdx: noticeIdx, + }) + + s.T().Log("=== Restart lifecycle complete: " + + "both apps survived restart, full L1 pipeline verified ===") +} + +// TestRestartMultiAppAuthority tests restart with Authority consensus. +func (s *RestartSuite) TestRestartMultiAppAuthority() { + s.app1Name = uniqueAppName("restart-auth-1") + s.app2Name = uniqueAppName("restart-auth-2") + s.runRestartTest(restartConfig{}) +} + +// TestRestartMultiAppPrt tests restart with PRT (Dave) consensus. +func (s *RestartSuite) TestRestartMultiAppPrt() { + // PRT settlement mines hundreds of blocks rapidly, which can cause + // transient BlockOutOfRangeError in the EVM reader. + s.SetAllowedErrors(AllowedError{ + Pattern: regexp.MustCompile(`BlockOutOfRangeError`), + Reason: "transient Anvil error during rapid block mining in PRT settlement", + }) + + s.app1Name = uniqueAppName("restart-prt-1") + s.app2Name = uniqueAppName("restart-prt-2") + + endpoint := envOrDefault( + "CARTESI_BLOCKCHAIN_HTTP_ENDPOINT", "http://localhost:8545") + ethClient, err := ethclient.Dial(endpoint) + s.Require().NoError(err, "dial ethclient") + defer ethClient.Close() + + s.runRestartTest(restartConfig{ + ExtraDeployArgs: []string{"--prt"}, + PreClaimHook: func( + ctx context.Context, t testing.TB, + require *require.Assertions, appName string, + ) { + settleTournament(ctx, t, require, ethClient, appName, 0) + settleTournament(ctx, t, require, ethClient, appName, 1) + }, + }) +} diff --git a/test/integration/snapshot_policy_test.go b/test/integration/snapshot_policy_test.go new file mode 100644 index 000000000..24265c69c --- /dev/null +++ b/test/integration/snapshot_policy_test.go @@ -0,0 +1,331 @@ +// (c) Cartesi and individual authors (see AUTHORS) +// SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +//go:build endtoendtests + +// IMPORTANT: These integration tests share a single Anvil blockchain instance. +// PRT tests call anvilMine() which globally advances the block number, affecting +// Authority epoch boundaries. Tests MUST NOT run in parallel (no t.Parallel()). +// The go test runner executes tests within a package sequentially by default, +// which is required for correctness here. + +package integration + +import ( + "context" + "regexp" + "strings" + "testing" + "time" + + "github.com/cartesi/rollups-node/internal/model" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" +) + +type SnapshotPolicySuite struct { + suite.Suite + LogChecker + ctx context.Context + cancel context.CancelFunc + appName string +} + +func TestSnapshotPolicy(t *testing.T) { + if !isNodeSelfManaged() { + t.Skip("skipping: node is externally managed (compose); " + + "snapshot policy tests require test-managed node for restart") + } + suite.Run(t, new(SnapshotPolicySuite)) +} + +func (s *SnapshotPolicySuite) SetupSuite() { + s.ctx, s.cancel = context.WithTimeout( + context.Background(), 30*time.Minute) +} + +func (s *SnapshotPolicySuite) TearDownSuite() { + // If a test failed mid-restart, the node may be stopped. Restart it + // so subsequent test suites have a running node. + if sharedNode == nil { + s.T().Log("Restarting shared node for subsequent tests...") + startSharedNode(s.T()) + } + s.cancel() +} + +func (s *SnapshotPolicySuite) SetupTest() { + s.StartLogCapture() + s.appName = "" +} + +func (s *SnapshotPolicySuite) TearDownTest() { + if s.appName != "" { + s.T().Logf("Disabling application %s", s.appName) + if err := disableApplication(s.ctx, s.appName); err != nil { + s.T().Logf( + "warning: failed to disable %s: %v", s.appName, err) + } + } + s.CheckLogs(s.T()) +} + +// deployWithSnapshotPolicy deploys an echo-dapp in disabled state, sets the +// snapshot policy via CLI, then enables the application. This approach avoids +// creating a JSON execution-parameters file with all the default values. +func (s *SnapshotPolicySuite) deployWithSnapshotPolicy( + appName, dappPath string, + policy model.SnapshotPolicy, + extraDeployArgs ...string, +) string { + require := s.Require() + t := s.T() + + deployArgs := append( + []string{"--salt", uniqueSalt(), "--enable=false"}, + extraDeployArgs...) + + t.Logf(" deploying %s in disabled state...", appName) + addr, err := deployApplication( + s.ctx, appName, dappPath, deployArgs...) + require.NoError(err, "deploy echo-dapp (disabled)") + t.Logf(" deployed at %s (disabled)", addr) + + t.Logf(" setting snapshot_policy=%s via CLI...", policy) + _, err = runCLI(s.ctx, + "app", "execution-parameters", "set", + appName, "snapshot_policy", string(policy)) + require.NoError(err, "set snapshot policy") + + t.Logf(" enabling application %s...", appName) + _, err = runCLI(s.ctx, + "app", "status", appName, "enabled", "--yes") + require.NoError(err, "enable application") + + return addr +} + +// snapshotPolicyConfig configures a snapshot policy test. +type snapshotPolicyConfig struct { + // Policy is the snapshot policy to test. + Policy model.SnapshotPolicy + // ExtraDeployArgs are additional CLI flags for deploy (e.g., "--prt"). + ExtraDeployArgs []string + // PreClaimHook, if non-nil, is called before claim/execution verification. + PreClaimHook func( + ctx context.Context, t testing.TB, + require *require.Assertions, appName string, + ) +} + +// runSnapshotPolicyTest runs the shared test flow for a given snapshot policy: +// deploy with policy, send input, restart node, send another, verify pipeline. +func (s *SnapshotPolicySuite) runSnapshotPolicyTest(cfg snapshotPolicyConfig) { + require := s.Require() + dappPath := envOrDefault( + "CARTESI_TEST_DAPP_PATH", "applications/echo-dapp") + policyStr := strings.ToLower( + strings.ReplaceAll(string(cfg.Policy), "_", "-")) + s.appName = uniqueAppName("snap-" + policyStr) + defer timed(s.T(), "snapshot "+policyStr+" test")() + + // === Deploy with snapshot policy === + + s.T().Logf( + "--- Setup: deploying echo-dapp with %s snapshot policy ---", + policyStr) + addr := s.deployWithSnapshotPolicy( + s.appName, dappPath, cfg.Policy, cfg.ExtraDeployArgs...) + + err := anvilSetBalance(s.ctx, addr, oneEtherWei) + require.NoError(err, "fund application contract") + s.T().Log(" funded application contract with 1 ETH") + + // === Send first input === + + s.T().Logf("Sending first input (snap-%s-1)...", policyStr) + idx1, _, err := sendInput( + s.ctx, s.appName, "snap-"+policyStr+"-1") + require.NoError(err, "send first input") + s.T().Logf(" input sent (index=%d)", idx1) + + func() { + defer timed(s.T(), "wait for first input")() + processCtx, processCancel := context.WithTimeout( + s.ctx, inputProcessingTimeout) + defer processCancel() + + input, err := waitForInputProcessed( + processCtx, s.T(), s.appName, idx1) + require.NoError(err, "wait for first input processing") + require.Equal( + model.InputCompletionStatus_Accepted, input.Status) + s.T().Log(" first input ACCEPTED") + }() + + outputs, err := readOutputs(s.ctx, s.appName) + require.NoError(err, "read outputs after first input") + require.Equal( + uint64(echoOutputsPerInput), outputs.Pagination.TotalCount, + "should have %d outputs after 1 input", echoOutputsPerInput) + s.T().Logf(" verified %d outputs after first input", + echoOutputsPerInput) + + // === Restart node to test snapshot loading === + + s.T().Logf( + "Restarting node to test snapshot loading with %s policy...", + policyStr) + func() { + defer timed(s.T(), "node restart cycle ("+policyStr+")")() + stopSharedNode(s.T()) + startSharedNode(s.T()) + }() + + // === Send second input after restart === + + s.T().Logf( + "Sending second input after restart (snap-%s-2)...", policyStr) + idx2, _, err := sendInput( + s.ctx, s.appName, "snap-"+policyStr+"-2") + require.NoError(err, "send second input after restart") + s.T().Logf(" input sent (index=%d)", idx2) + + func() { + defer timed(s.T(), "wait for second input after restart")() + processCtx, processCancel := context.WithTimeout( + s.ctx, inputProcessingTimeout) + defer processCancel() + + input, err := waitForInputProcessed( + processCtx, s.T(), s.appName, idx2) + require.NoError(err, "wait for second input after restart") + require.Equal( + model.InputCompletionStatus_Accepted, input.Status) + s.T().Log(" second input ACCEPTED after restart") + }() + + outputsAfter, err := readOutputs(s.ctx, s.appName) + require.NoError(err, "read outputs after restart") + require.Equal( + uint64(2*echoOutputsPerInput), + outputsAfter.Pagination.TotalCount, + "should have %d outputs after 2 inputs", + 2*echoOutputsPerInput) + s.T().Logf(" verified %d outputs after restart", + 2*echoOutputsPerInput) + + // === Optional pre-claim hook (e.g. PRT tournament settlement) === + + if cfg.PreClaimHook != nil { + cfg.PreClaimHook(s.ctx, s.T(), require, s.appName) + } + + // === Verify full L1 pipeline === + + s.T().Log("Verifying claim and execution...") + epochIndex := outputs.Data[0].EpochIndex + + var voucherIdx, noticeIdx uint64 + voucherFound, noticeFound := false, false + for _, out := range outputs.Data { + if out.DecodedData == nil { + continue + } + switch out.DecodedData.Type { + case "Voucher": + voucherIdx = out.Index + voucherFound = true + case "Notice": + noticeIdx = out.Index + noticeFound = true + } + } + require.True(voucherFound, "voucher output not found") + require.True(noticeFound, "notice output not found") + + verifyClaimAndExecute(s.ctx, s.T(), require, verifyAndExecuteConfig{ + AppName: s.appName, + EpochIndex: epochIndex, + EpochOutputs: outputs.Data, + VoucherIdx: voucherIdx, + NoticeIdx: noticeIdx, + }) + + s.T().Logf("=== Snapshot %s test complete ===", policyStr) +} + +// TestSnapshotPolicyEveryInput tests the EVERY_INPUT snapshot policy +// with Authority consensus. +func (s *SnapshotPolicySuite) TestSnapshotPolicyEveryInput() { + s.runSnapshotPolicyTest(snapshotPolicyConfig{ + Policy: model.SnapshotPolicy_EveryInput, + }) +} + +// TestSnapshotPolicyEveryEpoch tests the EVERY_EPOCH snapshot policy +// with Authority consensus. +func (s *SnapshotPolicySuite) TestSnapshotPolicyEveryEpoch() { + s.runSnapshotPolicyTest(snapshotPolicyConfig{ + Policy: model.SnapshotPolicy_EveryEpoch, + }) +} + +// TestSnapshotPolicyEveryInputPrt tests the EVERY_INPUT snapshot policy +// with PRT (Dave) consensus. +func (s *SnapshotPolicySuite) TestSnapshotPolicyEveryInputPrt() { + // PRT settlement mines hundreds of blocks rapidly, which can cause + // transient BlockOutOfRangeError in the EVM reader. + s.SetAllowedErrors(AllowedError{ + Pattern: regexp.MustCompile(`BlockOutOfRangeError`), + Reason: "transient Anvil error during rapid block mining in PRT settlement", + }) + + endpoint := envOrDefault( + "CARTESI_BLOCKCHAIN_HTTP_ENDPOINT", "http://localhost:8545") + ethClient, err := ethclient.Dial(endpoint) + s.Require().NoError(err, "dial ethclient") + defer ethClient.Close() + + s.runSnapshotPolicyTest(snapshotPolicyConfig{ + Policy: model.SnapshotPolicy_EveryInput, + ExtraDeployArgs: []string{"--prt"}, + PreClaimHook: func( + ctx context.Context, t testing.TB, + require *require.Assertions, appName string, + ) { + settleTournament(ctx, t, require, ethClient, appName, 0) + settleTournament(ctx, t, require, ethClient, appName, 1) + }, + }) +} + +// TestSnapshotPolicyEveryEpochPrt tests the EVERY_EPOCH snapshot policy +// with PRT (Dave) consensus. +func (s *SnapshotPolicySuite) TestSnapshotPolicyEveryEpochPrt() { + // PRT settlement mines hundreds of blocks rapidly, which can cause + // transient BlockOutOfRangeError in the EVM reader. + s.SetAllowedErrors(AllowedError{ + Pattern: regexp.MustCompile(`BlockOutOfRangeError`), + Reason: "transient Anvil error during rapid block mining in PRT settlement", + }) + + endpoint := envOrDefault( + "CARTESI_BLOCKCHAIN_HTTP_ENDPOINT", "http://localhost:8545") + ethClient, err := ethclient.Dial(endpoint) + s.Require().NoError(err, "dial ethclient") + defer ethClient.Close() + + s.runSnapshotPolicyTest(snapshotPolicyConfig{ + Policy: model.SnapshotPolicy_EveryEpoch, + ExtraDeployArgs: []string{"--prt"}, + PreClaimHook: func( + ctx context.Context, t testing.TB, + require *require.Assertions, appName string, + ) { + settleTournament(ctx, t, require, ethClient, appName, 0) + settleTournament(ctx, t, require, ethClient, appName, 1) + }, + }) +}