Commit d1de3b3

Commit message: x

1 parent c1fd0f7 commit d1de3b3

6 files changed: +46 additions, -45 deletions

block/components.go

Lines changed: 1 addition & 0 deletions

@@ -100,6 +100,7 @@ func (bc *Components) Start(ctx context.Context) error {
 func (bc *Components) Stop() error {
 	var errs error
 	if bc.Executor != nil {
+		println("+++ stopping executor")
 		if err := bc.Executor.Stop(); err != nil {
 			errs = errors.Join(errs, fmt.Errorf("failed to stop executor: %w", err))
 		}
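
Note: the surrounding Stop() collects shutdown failures with errors.Join instead of returning on the first one. A minimal sketch of that pattern, using a hypothetical stopper interface rather than the repository's actual Components type:

package example

import (
	"errors"
	"fmt"
)

// stopper is a hypothetical stand-in for any stoppable sub-component.
type stopper interface {
	Stop() error
}

// stopAll stops every component and joins the failures into a single error,
// so one failing component does not prevent the others from being stopped.
func stopAll(components ...stopper) error {
	var errs error
	for _, c := range components {
		if err := c.Stop(); err != nil {
			errs = errors.Join(errs, fmt.Errorf("failed to stop component: %w", err))
		}
	}
	return errs
}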

node/failover.go

Lines changed: 16 additions & 5 deletions

@@ -165,16 +165,14 @@ func setupFailoverState(
 }
 
 func (f *failoverState) Run(ctx context.Context) (multiErr error) {
-	//wg, ctx := errgroup.WithContext(ctx)
-	var wg = errgroup.Group{}
+	var wg errgroup.Group
 	wg.Go(func() error {
 		f.logger.Info().Str("addr", f.rpcServer.Addr).Msg("Started RPC server")
 		if err := f.rpcServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
 			return err
 		}
 		return nil
 	})
-	defer f.rpcServer.Shutdown(context.Background()) // nolint: errcheck
 
 	if err := f.p2pClient.Start(ctx); err != nil {
 		return fmt.Errorf("start p2p: %w", err)
@@ -184,8 +182,12 @@ func (f *failoverState) Run(ctx context.Context) (multiErr error) {
 	if err := f.headerSyncService.Start(ctx); err != nil {
 		return fmt.Errorf("error while starting header sync service: %w", err)
 	}
+
 	defer func() {
-		if err := f.headerSyncService.Stop(context.Background()); err != nil && !errors.Is(err, context.Canceled) {
+		shutdownCtx, done := context.WithTimeout(context.Background(), 3*time.Second)
+		defer done()
+
+		if err := f.headerSyncService.Stop(shutdownCtx); err != nil && !errors.Is(err, context.Canceled) {
 			multiErr = errors.Join(multiErr, fmt.Errorf("stopping header sync: %w", err))
 		}
 	}()
@@ -194,7 +196,10 @@ func (f *failoverState) Run(ctx context.Context) (multiErr error) {
 		return fmt.Errorf("error while starting data sync service: %w", err)
 	}
 	defer func() {
-		if err := f.dataSyncService.Stop(context.Background()); err != nil && !errors.Is(err, context.Canceled) {
+		shutdownCtx, done := context.WithTimeout(context.Background(), 3*time.Second)
+		defer done()
+
+		if err := f.dataSyncService.Stop(shutdownCtx); err != nil && !errors.Is(err, context.Canceled) {
 			multiErr = errors.Join(multiErr, fmt.Errorf("stopping data sync: %w", err))
 		}
 	}()
@@ -211,6 +216,12 @@ func (f *failoverState) Run(ctx context.Context) (multiErr error) {
 		}
 	}()
 
+	defer func() { // shutdown first
+		shutdownCtx, done := context.WithTimeout(context.Background(), 3*time.Second)
+		defer done()
+		_ = f.rpcServer.Shutdown(shutdownCtx)
+	}()
+
 	return wg.Wait()
 }
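
Note: the rewritten Run follows a common errgroup + http.Server shutdown shape: the listener runs inside the group, and shutdown is driven by a short, freshly created context so a hung connection cannot block exit indefinitely. A minimal, self-contained sketch of that general pattern (not a copy of failoverState.Run), assuming a plain http.Server:

package main

import (
	"context"
	"errors"
	"net/http"
	"time"

	"golang.org/x/sync/errgroup"
)

// run serves srv until ctx is canceled, then shuts it down with a bounded
// context. Wait returns only after the listener goroutine has exited.
func run(ctx context.Context, srv *http.Server) error {
	var wg errgroup.Group

	wg.Go(func() error {
		// ErrServerClosed is the expected result of Shutdown, not a failure.
		if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
			return err
		}
		return nil
	})

	wg.Go(func() error {
		<-ctx.Done() // wait for the caller to request shutdown
		// Use a fresh context: the parent is already canceled at this point.
		shutdownCtx, done := context.WithTimeout(context.Background(), 3*time.Second)
		defer done()
		return srv.Shutdown(shutdownCtx)
	})

	return wg.Wait()
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	if err := run(ctx, &http.Server{Addr: "127.0.0.1:0"}); err != nil {
		panic(err)
	}
}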

node/full_node_integration_test.go

Lines changed: 6 additions & 16 deletions

@@ -84,16 +84,14 @@ func TestTxGossipingMultipleNodesDAIncluded(t *testing.T) {
 
 	numNodes := 4
 	nodes, cleanups := createNodesWithCleanup(t, numNodes, config)
-	for _, cleanup := range cleanups {
-		defer cleanup()
-	}
 
 	ctxs, cancels := createNodeContexts(numNodes)
 	var runningWg sync.WaitGroup
 
 	errChan := make(chan error, numNodes)
 	// Start only the sequencer first
 	startNodeInBackground(t, nodes, ctxs, &runningWg, 0, errChan)
+	t.Cleanup(func() { shutdownAndWait(t, cleanups, &runningWg, 10*time.Second) })
 
 	// Wait for the first block to be produced by the sequencer
 	err := waitForFirstBlock(nodes[0], Header)
@@ -165,16 +163,13 @@ func TestFastDASync(t *testing.T) {
 	config.DA.BlockTime = evconfig.DurationWrapper{Duration: 200 * time.Millisecond}
 
 	nodes, cleanups := createNodesWithCleanup(t, 2, config)
-	for _, cleanup := range cleanups {
-		defer cleanup()
-	}
-
 	ctxs, cancels := createNodeContexts(len(nodes))
 	var runningWg sync.WaitGroup
 
 	errChan := make(chan error, len(nodes))
 	// Start only the first node
 	startNodeInBackground(t, nodes, ctxs, &runningWg, 0, errChan)
+	t.Cleanup(func() { shutdownAndWait(t, cleanups, &runningWg, 10*time.Second) })
 
 	// Wait for the first node to produce a few blocks
 	blocksToWaitFor := uint64(2)
@@ -185,6 +180,7 @@
 
 	// Now start the second node and time its sync
 	startNodeInBackground(t, nodes, ctxs, &runningWg, 1, errChan)
+
 	start := time.Now()
 	// Wait for the second node to catch up to the first node
 	require.NoError(waitForAtLeastNBlocks(nodes[1], blocksToWaitFor, Store))
@@ -327,6 +323,7 @@ func testSingleSequencerSingleFullNode(t *testing.T, source Source) {
 
 	// Start the sequencer first
 	startNodeInBackground(t, nodes, ctxs, &runningWg, 0, errChan)
+	t.Cleanup(func() { shutdownAndWait(t, cancels, &runningWg, 10*time.Second) })
 
 	// Wait for the sequencer to produce at first block
 	require.NoError(waitForFirstBlock(nodes[0], source))
@@ -346,9 +343,6 @@
 
 	// Verify both nodes are synced using the helper
 	require.NoError(verifyNodesSynced(nodes[0], nodes[1], source))
-
-	// Cancel all node contexts to signal shutdown and wait
-	shutdownAndWait(t, cancels, &runningWg, 5*time.Second)
 }
 
 // testSingleSequencerTwoFullNodes sets up a single sequencer and two full nodes, starts the sequencer, waits for it to produce a block, then starts the full nodes.
@@ -370,6 +364,7 @@
 
 	// Start the sequencer first
 	startNodeInBackground(t, nodes, ctxs, &runningWg, 0, errChan)
+	t.Cleanup(func() { shutdownAndWait(t, cancels, &runningWg, 10*time.Second) })
 
 	// Wait for the sequencer to produce at first block
 	require.NoError(waitForFirstBlock(nodes[0], source))
@@ -397,9 +392,6 @@
 	for i := 1; i < numNodes; i++ {
 		require.NoError(verifyNodesSynced(nodes[0], nodes[i], source))
 	}
-
-	// Cancel all node contexts to signal shutdown and wait
-	shutdownAndWait(t, cancels, &runningWg, 5*time.Second)
 }
 
 // testSingleSequencerSingleFullNodeTrustedHash sets up a single sequencer and a single full node with a trusted hash, starts the sequencer, waits for it to produce a block, then starts the full node with the trusted hash.
@@ -421,6 +413,7 @@ func testSingleSequencerSingleFullNodeTrustedHash(t *testing.T, source Source) {
 
 	// Start the sequencer first
 	startNodeInBackground(t, nodes, ctxs, &runningWg, 0, errChan)
+	t.Cleanup(func() { shutdownAndWait(t, cancels, &runningWg, 10*time.Second) })
 
 	// Wait for the sequencer to produce at first block
 	require.NoError(waitForFirstBlock(nodes[0], source))
@@ -461,9 +454,6 @@
 
 	// Verify both nodes are synced using the helper
 	require.NoError(verifyNodesSynced(nodes[0], nodes[1], source))
-
-	// Cancel all node contexts to signal shutdown and wait
-	shutdownAndWait(t, cancels, &runningWg, 5*time.Second)
 }
 
 // TestTwoChainsInOneNamespace verifies that two chains in the same namespace can coexist without any issues.
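
Note: the recurring change in this file is moving shutdown into t.Cleanup immediately after the nodes are started, so teardown also runs when a later assertion aborts the test, instead of relying on a shutdownAndWait call at the end of the test body. A minimal sketch of that pattern with a hypothetical background worker (startWorker is not a helper from this repository):

package example

import (
	"context"
	"sync"
	"testing"
	"time"
)

// startWorker stands in for startNodeInBackground: it runs until ctx is
// canceled and reports completion through wg.
func startWorker(ctx context.Context, wg *sync.WaitGroup) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		<-ctx.Done()
	}()
}

func TestCleanupRunsEvenOnFailure(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup

	startWorker(ctx, &wg)

	// Registered right after start: this still runs if a later require aborts
	// the test, unlike a shutdown call placed at the end of the function.
	t.Cleanup(func() {
		cancel()
		done := make(chan struct{})
		go func() { wg.Wait(); close(done) }()
		select {
		case <-done:
		case <-time.After(10 * time.Second):
			t.Error("worker did not shut down in time")
		}
	})

	// ... assertions against the running worker would go here ...
}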

node/helpers_test.go

Lines changed: 6 additions & 8 deletions

@@ -107,6 +107,10 @@ func newTestNode(
 	remoteSigner, err := remote_signer.NewNoopSigner(genesisValidatorKey)
 	require.NoError(t, err)
 
+	logger := zerolog.Nop()
+	if testing.Verbose() {
+		logger = zerolog.New(zerolog.NewTestWriter(t))
+	}
 	node, err := NewNode(
 		config,
 		executor,
@@ -117,7 +121,7 @@
 		genesis,
 		ds,
 		DefaultMetricsProvider(evconfig.DefaultInstrumentationConfig()),
-		zerolog.Nop(),
+		logger,
 		NodeOptions{},
 	)
 	require.NoError(t, err)
@@ -190,12 +194,6 @@ func createNodesWithCleanup(t *testing.T, num int, config evconfig.Config) ([]*F
 	// Update cleanup to cancel the context instead of calling Stop
 	cleanup := func() {
 		stopDAHeightTicker()
-		// slow down shutdown to let caches persist
-		for _, n := range nodes {
-			if n.IsRunning() {
-				time.Sleep(time.Second / 10)
-			}
-		}
 	}
 
 	nodes[0], cleanups[0] = aggNode.(*FullNode), cleanup
@@ -268,7 +266,7 @@
 }
 
 // Helper to cancel all contexts and wait for goroutines with timeout
-func shutdownAndWait(t *testing.T, cancels []context.CancelFunc, wg *sync.WaitGroup, timeout time.Duration) {
+func shutdownAndWait[T ~func()](t *testing.T, cancels []T, wg *sync.WaitGroup, timeout time.Duration) {
 	for _, cancel := range cancels {
 		cancel()
 	}
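
Note: making shutdownAndWait generic over T ~func() lets callers pass either a []context.CancelFunc or a slice of plain cleanup closures (such as the cleanups returned by createNodesWithCleanup) without converting element by element, since both have func() as their underlying type. A minimal sketch of the same shape, assuming the timeout is enforced by waiting on the WaitGroup from a goroutine:

package example

import (
	"context"
	"sync"
	"testing"
	"time"
)

// shutdownAndWaitSketch invokes every cancel/cleanup function, then waits for
// the WaitGroup with an upper bound so a stuck goroutine fails the test
// instead of hanging it.
func shutdownAndWaitSketch[T ~func()](t *testing.T, cancels []T, wg *sync.WaitGroup, timeout time.Duration) {
	t.Helper()
	for _, cancel := range cancels {
		cancel()
	}
	done := make(chan struct{})
	go func() { wg.Wait(); close(done) }()
	select {
	case <-done:
	case <-time.After(timeout):
		t.Error("goroutines did not finish before timeout")
	}
}

func TestShutdownAndWaitSketch(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	wg.Add(1)
	go func() { defer wg.Done(); <-ctx.Done() }()

	// Works with CancelFuncs ...
	shutdownAndWaitSketch(t, []context.CancelFunc{cancel}, &wg, time.Second)
	// ... and with ordinary func() cleanups.
	shutdownAndWaitSketch(t, []func(){func() {}}, &wg, time.Second)
}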

node/single_sequencer_integration_test.go

Lines changed: 16 additions & 15 deletions

@@ -6,6 +6,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"io"
 	"net/http"
 	"sync"
 	"testing"
@@ -68,6 +69,7 @@ func (s *FullNodeTestSuite) SetupTest() {
 
 	// Start the node in a goroutine using Run instead of Start
 	s.startNodeInBackground(s.node)
+	s.T().Cleanup(func() { shutdownAndWait(s.T(), []context.CancelFunc{s.cancel}, &s.runningWg, 10*time.Second) })
 
 	// Verify that the node is running and producing blocks
 	err := waitForFirstBlock(s.node, Header)
@@ -242,6 +244,7 @@ func TestStateRecovery(t *testing.T) {
 
 	// Start the sequencer first
 	startNodeInBackground(t, []*FullNode{node}, []context.Context{ctx}, &runningWg, 0, nil)
+	t.Cleanup(func() { shutdownAndWait(t, []context.CancelFunc{cancel}, &runningWg, 10*time.Second) })
 
 	blocksToWaitFor := uint64(20)
 	// Wait for the sequencer to produce at first block
@@ -285,6 +288,7 @@ func TestMaxPendingHeadersAndData(t *testing.T) {
 
 	var runningWg sync.WaitGroup
 	startNodeInBackground(t, []*FullNode{node}, []context.Context{ctx}, &runningWg, 0, nil)
+	t.Cleanup(func() { shutdownAndWait(t, []context.CancelFunc{cancel}, &runningWg, 10*time.Second) })
 
 	// Wait blocks to be produced up to max pending
 	numExtraBlocks := uint64(5)
@@ -294,9 +298,6 @@
 	height, err := getNodeHeight(node, Store)
 	require.NoError(err)
 	require.LessOrEqual(height, config.Node.MaxPendingHeadersAndData)
-
-	// Stop the node and wait for shutdown
-	shutdownAndWait(t, []context.CancelFunc{cancel}, &runningWg, 10*time.Second)
 }
 
 // TestBatchQueueThrottlingWithDAFailure tests that when DA layer fails and MaxPendingHeadersAndData
@@ -334,7 +335,9 @@ func TestBatchQueueThrottlingWithDAFailure(t *testing.T) {
 	var runningWg sync.WaitGroup
 	errChan := make(chan error, 1)
 	startNodeInBackground(t, []*FullNode{node}, []context.Context{ctx}, &runningWg, 0, errChan)
+	t.Cleanup(func() { shutdownAndWait(t, []context.CancelFunc{cancel}, &runningWg, 10*time.Second) })
 	require.Len(errChan, 0, "Expected no errors when starting node")
+
 	// Wait for the node to start producing blocks
 	waitForBlockN(t, 1, node, config.Node.BlockTime.Duration)
 
@@ -387,13 +390,6 @@
 	require.NoError(err)
 	t.Logf("Final height: %d", finalHeight)
 	cancel() // stop the node
-	if v, ok := node.leaderElection.(*singleRoleElector); ok {
-		// skip cache persistence to avoid race condition with shutdown
-		v.runnable.(*failoverState).bc.Cache = nil
-	} else {
-		time.Sleep(time.Second)
-	}
-
 	// The height should not have increased much due to MaxPendingHeadersAndData limit
 	// Allow at most 3 additional blocks due to timing and pending blocks in queue
 	heightIncrease := finalHeight - heightAfterDAFailure
@@ -427,6 +423,7 @@ func waitForBlockN(t *testing.T, n uint64, node *FullNode, blockInterval time.Du
 		return got >= n
 	}, timeout[0], blockInterval/2)
 }
+
 func TestReadinessEndpointWhenBlockProductionStops(t *testing.T) {
 	require := require.New(t)
 
@@ -443,14 +440,20 @@ func TestReadinessEndpointWhenBlockProductionStops(t *testing.T) {
 	defer cancel()
 
 	var runningWg sync.WaitGroup
-	startNodeInBackground(t, []*FullNode{node}, []context.Context{ctx}, &runningWg, 0, nil)
+	errChan := make(chan error, 1)
+	startNodeInBackground(t, []*FullNode{node}, []context.Context{ctx}, &runningWg, 0, errChan)
+	t.Cleanup(func() { shutdownAndWait(t, []context.CancelFunc{cancel}, &runningWg, 10*time.Second) })
+	require.Len(errChan, 0, "Expected no errors when starting node")
 
-	waitForBlockN(t, 3, node, config.Node.BlockTime.Duration)
+	waitForBlockN(t, 2, node, config.Node.BlockTime.Duration)
+	require.Len(errChan, 0, "Expected no errors when starting node")
 
 	resp, err := http.Get("http://" + config.RPC.Address + "/health/ready")
 	require.NoError(err)
-	require.Equal(http.StatusOK, resp.StatusCode, "Readiness should be READY while producing blocks")
+	body, err := io.ReadAll(resp.Body)
+	require.NoError(err)
 	resp.Body.Close()
+	require.Equal(http.StatusOK, resp.StatusCode, "Readiness should be READY while producing blocks: %s", body)
 
 	time.Sleep(time.Duration(config.Node.MaxPendingHeadersAndData+2) * config.Node.BlockTime.Duration)
 
@@ -466,6 +469,4 @@
 		defer resp.Body.Close()
 		return resp.StatusCode == http.StatusServiceUnavailable
 	}, 10*time.Second, 100*time.Millisecond, "Readiness should be UNREADY after aggregator stops producing blocks (5x block time)")
-
-	shutdownAndWait(t, []context.CancelFunc{cancel}, &runningWg, 10*time.Second)
 }
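
Note: in the readiness test, the response body is now read before the status assertion so that a failure message shows what /health/ready actually returned, not just the unexpected status code. A minimal sketch of that diagnostic pattern against a hypothetical httptest handler (not the node's real readiness endpoint):

package example

import (
	"io"
	"net/http"
	"net/http/httptest"
	"testing"

	"github.com/stretchr/testify/require"
)

func TestReadinessBodyInFailureMessage(t *testing.T) {
	require := require.New(t)

	// Stand-in for the node's readiness endpoint.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte("READY"))
	}))
	defer srv.Close()

	resp, err := http.Get(srv.URL + "/health/ready")
	require.NoError(err)

	// Read the body first; if the assertion below ever fails, the message
	// carries the endpoint's actual response.
	body, err := io.ReadAll(resp.Body)
	require.NoError(err)
	resp.Body.Close()

	require.Equal(http.StatusOK, resp.StatusCode, "readiness should be READY: %s", body)
}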

scripts/test.mk

Lines changed: 1 addition & 1 deletion

@@ -18,7 +18,7 @@ test-all: test test-docker-e2e
 ## test-integration: Running integration tests
 test-integration:
 	@echo "--> Running integration tests"
-	@cd node && go test -mod=readonly -failfast -timeout=15m -tags='integration' ./...
+	@cd node && go test -mod=readonly -failfast -v -timeout=15m -tags='integration' ./...
 .PHONY: test-integration
 
 ## test-e2e: Running e2e tests
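
Note: adding -v here pairs with the node/helpers_test.go change above: newTestNode only wires a visible logger when testing.Verbose() reports true, so integration runs from this target now get per-test log output. A minimal sketch of that gating, assuming zerolog as used in the diff:

package example

import (
	"testing"

	"github.com/rs/zerolog"
)

// newTestLogger returns a no-op logger by default and a per-test writer when
// the test binary runs with -v (go test -v), mirroring the helper change.
func newTestLogger(t *testing.T) zerolog.Logger {
	if testing.Verbose() {
		return zerolog.New(zerolog.NewTestWriter(t))
	}
	return zerolog.Nop()
}

func TestLoggerGatedByVerbose(t *testing.T) {
	logger := newTestLogger(t)
	logger.Info().Msg("only visible with -v")
}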
