Skip to content
Merged
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,11 @@ RUN go install gotest.tools/gotestsum@v${GOTESTSUM_VERSION}
ENV PATH="${GOPATH}/bin:${PATH}"
ENV GOLANGCI_LINT_CACHE=${GOCACHE}/golangci-lint

# Create /dapps directory owned by cartesi for Docker named volume pre-population.
# Create directories owned by cartesi for Docker named volume pre-population.
# When a named volume is first mounted here, Docker copies this ownership.
USER root
RUN mkdir -p /dapps && chown cartesi:cartesi /dapps
RUN mkdir -p /var/lib/cartesi-rollups-node/logs && chown cartesi:cartesi /var/lib/cartesi-rollups-node/logs
USER cartesi

# =============================================================================
Expand Down
29 changes: 5 additions & 24 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -449,33 +449,14 @@ test-with-compose: ## Run all tests using docker compose with auto-shutdown

integration-test-local: build echo-dapp reject-loop-dapp exception-loop-dapp ## Run integration tests locally (requires: make start && eval $$(make env))
@cartesi-rollups-cli db init
@NODE_LOG_FILE=$$(mktemp /tmp/rollups-node-log.XXXXXX); \
echo "Node log file: $$NODE_LOG_FILE"; \
echo "Starting node in background..."; \
env CARTESI_ADVANCER_POLLING_INTERVAL=1 \
CARTESI_VALIDATOR_POLLING_INTERVAL=1 \
CARTESI_CLAIMER_POLLING_INTERVAL=1 \
CARTESI_PRT_POLLING_INTERVAL=1 \
cartesi-rollups-node > $$NODE_LOG_FILE 2>&1 & NODE_PID=$$!; \
tail -f $$NODE_LOG_FILE & TAIL_PID=$$!; \
trap 'kill $$TAIL_PID 2>/dev/null; echo "Stopping node (pid $$NODE_PID)..."; kill $$NODE_PID 2>/dev/null; wait $$NODE_PID 2>/dev/null; echo "Node logs saved to $$NODE_LOG_FILE"' EXIT; \
echo "Waiting for node to become healthy..."; \
attempts=0; \
until curl -sf http://localhost:10000/readyz >/dev/null 2>&1; do \
attempts=$$((attempts + 1)); \
if [ $$attempts -ge 60 ]; then \
echo "ERROR: Node failed to become healthy after 120 seconds"; \
echo "Last 50 lines of node log:"; \
tail -50 $$NODE_LOG_FILE; \
exit 1; \
fi; \
@if lsof -ti:10000 >/dev/null 2>&1; then \
echo "Killing stale node on port 10000..."; \
kill $$(lsof -ti:10000) 2>/dev/null || true; \
sleep 2; \
done; \
echo "Node is healthy. Running integration tests..."; \
export CARTESI_TEST_DAPP_PATH=$(CURDIR)/applications/echo-dapp; \
fi
@export CARTESI_TEST_DAPP_PATH=$(CURDIR)/applications/echo-dapp; \
export CARTESI_TEST_REJECT_DAPP_PATH=$(CURDIR)/applications/reject-loop-dapp; \
export CARTESI_TEST_EXCEPTION_DAPP_PATH=$(CURDIR)/applications/exception-loop-dapp; \
export CARTESI_TEST_NODE_LOG_FILE=$$NODE_LOG_FILE; \
$(MAKE) integration-test

ci-test: ## Run the full CI test pipeline locally (lint + unit + integration)
Expand Down
17 changes: 15 additions & 2 deletions internal/claimer/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (cb *claimerBlockchain) submitClaimToBlockchain(
tx, err := ic.SubmitClaim(cb.txOpts, application.IApplicationAddress,
lastBlockNumber, *epoch.OutputsMerkleRoot)
if err != nil {
cb.logger.Error("submitClaimToBlockchain:failed",
cb.logger.Warn("submitClaimToBlockchain:failed",
"appContractAddress", application.IApplicationAddress,
"claimHash", *epoch.OutputsMerkleRoot,
"last_block", epoch.LastBlock,
Expand Down Expand Up @@ -225,7 +225,13 @@ func (cb *claimerBlockchain) findClaimAcceptedEventAndSucc(
onHit := newOnHit(ctx, application.IApplicationAddress, filter,
func(it *iconsensus.IConsensusClaimAcceptedIterator) {
event := it.Event
if (len(events) > 0) || claimAcceptedEventMatches(application, epoch, event) {
// Match on epoch identity (app + lastBlock) without
// requiring the merkle root to match. This ensures
// that a ClaimAccepted event from a different claim
// (outvoting in Quorum) is returned to the caller,
// where claimAcceptedEventMatches detects the
// mismatch and sets the app as inoperable.
if (len(events) > 0) || claimAcceptedEventMatchesEpoch(application, epoch, event) {
events = append(events, event)
}
},
Expand Down Expand Up @@ -256,6 +262,13 @@ func (cb *claimerBlockchain) getConsensusAddress(
return ethutil.GetConsensus(ctx, cb.client, app.IApplicationAddress)
}

// isNotFirstClaimError checks whether an error from submitClaim is
// a NotFirstClaim revert, indicating the claim was already submitted
// on-chain (e.g., before a node restart).
//
// The check matches the error's revert data against the NotFirstClaim
// custom error declared in the IConsensus contract ABI metadata.
func isNotFirstClaimError(err error) bool {
	return ethutil.IsCustomError(err, iconsensus.IConsensusMetaData, "NotFirstClaim")
}

// poll a transaction for its receipt
func (cb *claimerBlockchain) pollTransaction(
ctx context.Context,
Expand Down
76 changes: 76 additions & 0 deletions internal/claimer/claimer.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,70 @@ func (s *Service) submitClaimsAndUpdateDatabase(
)
txHash, err := s.blockchain.submitClaimToBlockchain(ic, app, currEpoch)
if err != nil {
// NotFirstClaim handling after restart.
//
// Gas estimation (eth_estimateGas) simulates
// the call before broadcasting, so the revert
// is caught without spending gas. This relies
// on txOpts.GasLimit == 0 (the default); if
// GasLimit were pre-set, the tx would skip
// estimation and revert on-chain.
//
// Authority: submitClaim checks a per-epoch
// bitmap. Any duplicate (same epoch, regardless
// of merkle root) reverts with NotFirstClaim.
// After restart this is benign — the node
// recomputed the same claim that was already
// on-chain. Both ClaimSubmitted and
// ClaimAccepted events were already emitted
// (Authority emits both atomically).
//
// Quorum: submitClaim first checks if this
// validator already voted for the SAME claim
// (same app + lastBlock + merkleRoot). If so,
// it silently returns — no revert, no event.
// It only reverts with NotFirstClaim when the
// validator voted for a DIFFERENT merkleRoot
// in the same epoch (checked via allVotes
// bitmap). After restart, this means the node
// recomputed a different claim hash than what
// it submitted pre-restart — a determinism
// violation. ClaimSubmitted was emitted for
// the original vote; ClaimAccepted is emitted
// only once a majority of validators agree.
if isNotFirstClaimError(err) {
if app.ConsensusType == model.Consensus_Quorum {
// Quorum only reverts with NotFirstClaim
// when the merkle root differs. This is
// unrecoverable: computation is expected
// to be deterministic, so recomputing
// will produce the same divergent hash.
err = s.setApplicationInoperable(
s.Context,
app,
"NotFirstClaim from Quorum consensus: "+
"computed claim hash %s differs from "+
"previously submitted claim for "+
"epoch with last_block %d. "+
"Possible determinism violation or "+
"machine state corruption.",
hashToHex(currEpoch.OutputsMerkleRoot),
currEpoch.LastBlock,
)
delete(computedEpochs, key)
errs = append(errs, err)
continue
}
s.Logger.Info(
"Claim already on-chain, "+
"waiting for event sync",
"app", app.IApplicationAddress,
"claim_hash",
hashToHex(currEpoch.OutputsMerkleRoot),
"last_block", currEpoch.LastBlock,
)
continue
}
delete(computedEpochs, key)
errs = append(errs, err)
continue
Expand Down Expand Up @@ -584,6 +648,18 @@ func claimAcceptedEventMatches(application *model.Application, epoch *model.Epoc
epoch.LastBlock == event.LastProcessedBlockNumber.Uint64()
}

// claimAcceptedEventMatchesEpoch reports whether a ClaimAccepted event refers
// to the same epoch (application contract + last processed block) as the given
// epoch, deliberately ignoring the merkle root. It is used to detect outvoting
// in Quorum: a ClaimAccepted event may exist for the epoch but carry a merkle
// root different from the one this node submitted.
func claimAcceptedEventMatchesEpoch(application *model.Application, epoch *model.Epoch, event *iconsensus.IConsensusClaimAccepted) bool {
	if application == nil || epoch == nil || event == nil {
		return false
	}
	sameApp := application.IApplicationAddress == event.AppContract
	sameLastBlock := epoch.LastBlock == event.LastProcessedBlockNumber.Uint64()
	return sameApp && sameLastBlock
}

// String returns the service name, satisfying fmt.Stringer.
func (s *Service) String() string {
	return s.Name
}
91 changes: 89 additions & 2 deletions internal/claimer/claimer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,10 @@ func makeApplication() *model.Application {

func makeEpoch(id int64, status model.EpochStatus, i uint64) *model.Epoch {
outputsMerkleRoot := common.HexToHash("0x01") // dummy value
txHash := common.HexToHash("0x02") // dummy value
txHash := common.HexToHash("0x02") // dummy value
return repotest.NewEpochBuilder(id).
WithIndex(i).
WithBlocks(i * 10, i * 10 + 9).
WithBlocks(i*10, i*10+9).
WithStatus(status).
WithClaimTransactionHash(txHash).
WithOutputsMerkleRoot(outputsMerkleRoot).
Expand Down Expand Up @@ -267,6 +267,33 @@ func makeAcceptedEvent(app *model.Application, epoch *model.Epoch) *iconsensus.I
}
}

// rpcDataError simulates a JSON-RPC error that carries revert data, matching
// the shape returned by eth_estimateGas when a contract call reverts.
type rpcDataError struct {
	code int
	msg  string
	data any
}

// Error returns the error message.
func (e *rpcDataError) Error() string {
	return e.msg
}

// ErrorCode returns the JSON-RPC error code.
func (e *rpcDataError) ErrorCode() int {
	return e.code
}

// ErrorData returns the attached revert data.
func (e *rpcDataError) ErrorData() any {
	return e.data
}

// notFirstClaimError creates an error that mimics a NotFirstClaim revert
// from eth_estimateGas, with the ABI error selector as revert data.
//
// The parse error from GetAbi is deliberately discarded: the embedded
// contract metadata is a compile-time constant, so parsing is assumed
// infallible here.
//
// NOTE(review): the hex bytes appended after the 4-byte selector look
// like hand-written filler for the error's argument encoding — confirm
// against the actual NotFirstClaim ABI if the decoder ever starts
// validating argument data rather than just the selector.
func notFirstClaimError() error {
	parsed, _ := iconsensus.IConsensusMetaData.GetAbi()
	id := parsed.Errors["NotFirstClaim"].ID
	selector := fmt.Sprintf("0x%x", id[:4])
	return &rpcDataError{
		code: 3,
		msg:  "execution reverted",
		data: selector + "000000000000000000000000" +
			"01000000000000000000000000000000000000000000000000000000000000" +
			"0000000000000000000000000000000000000000000000000000000000000027",
	}
}

// //////////////////////////////////////////////////////////////////////////////
// Success
// //////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -581,6 +608,66 @@ func TestSubmitFailedClaim(t *testing.T) {
assert.Equal(t, 0, len(errs))
}

// TestNotFirstClaimHandledGracefully verifies that when submitClaim reverts
// with NotFirstClaim (e.g., after a node restart where claimsInFlight was
// lost), the claimer handles it gracefully — no error, no claimsInFlight
// entry, and the claim is left for event sync to pick up.
func TestNotFirstClaimHandledGracefully(t *testing.T) {
	m, r, b := newServiceMock()
	defer r.AssertExpectations(t)
	defer b.AssertExpectations(t)

	endBlock := big.NewInt(40)
	app := makeApplication()
	currEpoch := makeComputedEpoch(app, 3)
	// nil events: no prior ClaimSubmitted is found on-chain for this epoch,
	// so the claimer proceeds to submit.
	var prevEvent *iconsensus.IConsensusClaimSubmitted
	var currEvent *iconsensus.IConsensusClaimSubmitted

	b.On("getConsensusAddress", mock.Anything, app).
		Return(app.IConsensusAddress, nil).Once()
	b.On("findClaimSubmittedEventAndSucc", mock.Anything, app, currEpoch, currEpoch.LastBlock+1, endBlock.Uint64()).
		Return(&iconsensus.IConsensus{}, prevEvent, currEvent, nil).Once()
	// submitClaim reverts with NotFirstClaim (caught by eth_estimateGas).
	b.On("submitClaimToBlockchain", mock.Anything, app, currEpoch).
		Return(common.Hash{}, notFirstClaimError()).Once()

	errs := m.submitClaimsAndUpdateDatabase(
		makeEpochMap(), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
	// Non-Quorum consensus: the duplicate submission is benign, so no error
	// is reported and no claim is left tracked as in-flight.
	assert.Equal(t, 0, len(errs))
	assert.Equal(t, 0, len(m.claimsInFlight))
}

// TestNotFirstClaimQuorumSetsInoperable verifies that when submitClaim reverts
// with NotFirstClaim for a Quorum app, the claimer marks the application as
// inoperable. In Quorum, NotFirstClaim means the validator previously submitted
// a different merkle root — a determinism violation.
func TestNotFirstClaimQuorumSetsInoperable(t *testing.T) {
	m, r, b := newServiceMock()
	defer r.AssertExpectations(t)
	defer b.AssertExpectations(t)

	endBlock := big.NewInt(40)
	app := makeApplication()
	// Quorum consensus triggers the inoperable path on NotFirstClaim.
	app.ConsensusType = model.Consensus_Quorum
	currEpoch := makeComputedEpoch(app, 3)
	// nil events: no prior ClaimSubmitted is found on-chain for this epoch.
	var prevEvent *iconsensus.IConsensusClaimSubmitted
	var currEvent *iconsensus.IConsensusClaimSubmitted

	b.On("getConsensusAddress", mock.Anything, app).
		Return(app.IConsensusAddress, nil).Once()
	b.On("findClaimSubmittedEventAndSucc", mock.Anything, app, currEpoch, currEpoch.LastBlock+1, endBlock.Uint64()).
		Return(&iconsensus.IConsensus{}, prevEvent, currEvent, nil).Once()
	b.On("submitClaimToBlockchain", mock.Anything, app, currEpoch).
		Return(common.Hash{}, notFirstClaimError()).Once()
	// Expect the app to be flipped to Inoperable in the repository.
	r.On("UpdateApplicationState", mock.Anything, int64(0), model.ApplicationState_Inoperable, mock.Anything).
		Return(nil).Once()

	errs := m.submitClaimsAndUpdateDatabase(
		makeEpochMap(), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
	// Exactly one error is surfaced, and nothing remains in flight.
	assert.Equal(t, 1, len(errs))
	assert.Equal(t, 0, len(m.claimsInFlight))
}

// !claimSubmittedMatches(prevClaim, prevEvent)
func TestSubmitClaimWithAntecessorMismatch(t *testing.T) {
m, r, b := newServiceMock()
Expand Down
3 changes: 3 additions & 0 deletions internal/evmreader/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ func (r *Service) readAndUpdateOutputs(
nextSearchBlock := lastOutputCheck + 1
outputExecutedEvents, err := r.readOutputExecutionsFromBlockChain(ctx, app, nextSearchBlock, mostRecentBlockNumber)
if err != nil {
if errors.Is(err, context.Canceled) {
return // shutting down
}
r.Logger.Error("Error reading output events",
"application", app.application.Name, "address", app.application.IApplicationAddress,
"error", err)
Expand Down
6 changes: 6 additions & 0 deletions internal/evmreader/sealedepochs.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ func (r *Service) checkForEpochsAndInputs(

err := r.processApplicationSealedEpochs(ctx, app, mostRecentBlockNumber)
if err != nil {
if errors.Is(err, context.Canceled) {
return // shutting down
}
r.Logger.Error("Error processing application sealed epochs",
"application", app.application.Name,
"consensus_address", app.application.IConsensusAddress,
Expand All @@ -97,6 +100,9 @@ func (r *Service) checkForEpochsAndInputs(

err = r.processApplicationOpenEpoch(ctx, app, mostRecentBlockNumber)
if err != nil {
if errors.Is(err, context.Canceled) {
return // shutting down
}
r.Logger.Error("Error processing application open epoch",
"application", app.application.Name,
"consensus_address", app.application.IConsensusAddress,
Expand Down
25 changes: 15 additions & 10 deletions internal/manager/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,21 @@ func (m *MachineInstanceImpl) Synchronize(ctx context.Context, repo MachineRepos
"app_processed_inputs", m.application.ProcessedInputs,
"machine_processed_inputs", currentProcessed)

// Validate machine vs app state before querying the DB.
// This must be done upfront because the ListInputs query uses OFFSET to skip
// already-processed inputs, and when OFFSET >= total rows the query returns
// 0 rows — making the COUNT(*) OVER() window function value unavailable.
if currentProcessed > m.application.ProcessedInputs {
return fmt.Errorf(
"%w: machine has processed %d inputs but app only has %d",
ErrMachineSynchronization, currentProcessed, m.application.ProcessedInputs)
}
if currentProcessed == m.application.ProcessedInputs {
m.logger.Info("No inputs to replay during synchronization",
"address", appAddress)
return nil
}

initialProcessedInputs := currentProcessed
replayed := uint64(0)
toReplay := uint64(0)
Expand All @@ -201,17 +216,7 @@ func (m *MachineInstanceImpl) Synchronize(ctx context.Context, repo MachineRepos
m.logger.Error(errorMsg, "address", appAddress)
return fmt.Errorf("%w: %s", ErrMachineSynchronization, errorMsg)
}
if currentProcessed > totalCount {
return fmt.Errorf(
"%w: machine has processed %d inputs but DB only has %d",
ErrMachineSynchronization, currentProcessed, totalCount)
}
toReplay = totalCount - currentProcessed
if toReplay == 0 {
m.logger.Info("No inputs to replay during synchronization",
"address", appAddress)
return nil
}
}

for _, input := range inputs {
Expand Down
7 changes: 5 additions & 2 deletions internal/manager/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1217,7 +1217,10 @@ func (r *mockSyncRepository) ListInputs(
}
start := p.Offset
if start >= uint64(len(r.inputs)) {
return nil, r.totalCount, nil
// Match real PostgreSQL behavior: COUNT(*) OVER() is a per-row window
// value. When OFFSET skips all rows, 0 rows are returned and the total
// count is never scanned — the caller sees 0, not the real total.
return nil, 0, nil
}
end := start + p.Limit
if p.Limit == 0 || end > uint64(len(r.inputs)) {
Expand Down Expand Up @@ -1354,7 +1357,7 @@ func (s *MachineInstanceSuite) TestSynchronize() {
err := inst.Synchronize(context.Background(), repo, 1000)
require.Error(err)
require.ErrorIs(err, ErrMachineSynchronization)
require.Contains(err.Error(), "machine has processed 5 inputs but DB only has 3")
require.Contains(err.Error(), "machine has processed 5 inputs but app only has 3")
})

s.Run("CountMismatch", func() {
Expand Down
Loading
Loading