From 1d2378bb0a80902d7bdc72567c5964bae76640fa Mon Sep 17 00:00:00 2001 From: purelualight Date: Sat, 2 May 2026 00:50:03 +0800 Subject: [PATCH] refactor: use WaitGroup.Go to simplify code Signed-off-by: purelualight --- cli/operator/node.go | 6 +-- eth/executionclient/execution_client_test.go | 6 +-- hprobe/prober.go | 7 +-- ibft/storage/store_test.go | 6 +-- network/p2p/p2p_test.go | 26 ++++------- network/topics/controller_test.go | 6 +-- operator/duties/attester.go | 7 +-- operator/duties/scheduler.go | 44 ++++++------------- operator/duties/sync_committee.go | 7 +-- operator/dutytracer/collector_bench_test.go | 6 +-- operator/validator/router_test.go | 7 +-- protocol/v2/qbft/instance/process_msg_test.go | 6 +-- protocol/v2/ssv/queue/queue_test.go | 18 +++----- protocol/v2/ssv/runner/committee.go | 7 +-- .../v2/ssv/validator/committee_queue_test.go | 12 ++--- registry/storage/shares_test.go | 6 +-- utils/hashmap/hashmap_test.go | 19 +++----- utils/ttl/map_test.go | 24 ++++------ 18 files changed, 70 insertions(+), 150 deletions(-) diff --git a/cli/operator/node.go b/cli/operator/node.go index 48e68fad1c..af5f8904ac 100644 --- a/cli/operator/node.go +++ b/cli/operator/node.go @@ -1357,11 +1357,9 @@ func initSlotPruning(ctx context.Context, stores *ibftstorage.ParticipantStores, // async perform initial slot gc _ = stores.Each(func(_ spectypes.BeaconRole, store ibftstorage.ParticipantStore) error { - wg.Add(1) - go func() { - defer wg.Done() + wg.Go(func() { store.Prune(ctx, threshold) - }() + }) return nil }) diff --git a/eth/executionclient/execution_client_test.go b/eth/executionclient/execution_client_test.go index 0b5f443af5..8969d6d589 100644 --- a/eth/executionclient/execution_client_test.go +++ b/eth/executionclient/execution_client_test.go @@ -1016,9 +1016,7 @@ func TestSubscribeFilterLogs(t *testing.T) { // Create a goroutine to collect logs var receivedLogs []ethtypes.Log var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() + wg.Go(func() { for i := 0; i < 3; i++ { select { case log := <-logCh: @@ -1030,7 +1028,7 @@ func TestSubscribeFilterLogs(t *testing.T) { return } } - }() + }) // Create blocks with transactions err = env.createBlocksWithLogs(contract, 3, 10*time.Millisecond) diff --git a/hprobe/prober.go b/hprobe/prober.go index cfdc35c0a2..eba39b898e 100644 --- a/hprobe/prober.go +++ b/hprobe/prober.go @@ -53,17 +53,14 @@ func (p *HealthProber) ProbeAll(ctx context.Context) error { errsCh := make(chan error) p.components.Range(func(name string, n pComponent) bool { - wg.Add(1) - go func() { - defer wg.Done() - + wg.Go(func() { err := p.probeComponent(ctx, n) if err != nil { // Relay the error and quit early. errsCh <- fmt.Errorf("probe component %s: %w", name, err) cancel() } - }() + }) return true }) diff --git a/ibft/storage/store_test.go b/ibft/storage/store_test.go index 0abeda512a..2781f6ca4f 100644 --- a/ibft/storage/store_test.go +++ b/ibft/storage/store_test.go @@ -211,11 +211,9 @@ func TestSlotCleanupJob(t *testing.T) { // run normal gc var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() + wg.Go(func() { storage.PruneContinuously(ctx, tickerProv, 1) - }() + }) mockTimeChan <- time.Now() mockSlotChan <- phase0.Slot(5) diff --git a/network/p2p/p2p_test.go b/network/p2p/p2p_test.go index 6fb2125318..b8dd4a00d2 100644 --- a/network/p2p/p2p_test.go +++ b/network/p2p/p2p_test.go @@ -41,27 +41,23 @@ func TestCurrentSubnetsConcurrentAccess(t *testing.T) { var wg sync.WaitGroup start := make(chan struct{}) - wg.Add(1) - go func() { - defer wg.Done() + wg.Go(func() { <-start for i := uint64(0); i < 5000; i++ { subnets := commons.ZeroSubnets subnets.Set(i % commons.SubnetsCount) n.setCurrentSubnets(subnets) } - }() + }) - wg.Add(1) - go func() { - defer wg.Done() + wg.Go(func() { <-start for i := 0; i < 5000; i++ { subnets := n.ActiveSubnets() _ = subnets.ActiveCount() _ = subnets.StringHex() } - }() + }) close(start) wg.Wait() @@ -107,10 +103,8 @@ func TestP2pNetwork_SubscribeBroadcast(t *testing.T) { broadcastErrCh <- err } } - wg.Add(1) - go func() { - defer wg.Done() + wg.Go(func() { msgCommittee1 := generateCommitteeMsg(spectestingutils.Testing4SharesSet(), 1) msgCommittee3 := generateCommitteeMsg(spectestingutils.Testing4SharesSet(), 3) msgProposer := generateValidatorMsg(spectestingutils.Testing4SharesSet(), 4, spectypes.RoleProposer) @@ -128,13 +122,9 @@ func TestP2pNetwork_SubscribeBroadcast(t *testing.T) { recordBroadcastErr(node2.Broadcast(msgSyncCommitteeContribution.SSVMessage.GetID(), msgSyncCommitteeContribution)) <-time.After(time.Millisecond * 20) recordBroadcastErr(node1.Broadcast(msgRoleVoluntaryExit.SSVMessage.GetID(), msgRoleVoluntaryExit)) - }() - - wg.Add(1) - - go func() { - defer wg.Done() + }) + wg.Go(func() { msgCommittee1 := generateCommitteeMsg(spectestingutils.Testing4SharesSet(), 1) msgCommittee2 := generateCommitteeMsg(spectestingutils.Testing4SharesSet(), 2) msgCommittee3 := generateCommitteeMsg(spectestingutils.Testing4SharesSet(), 3) @@ -151,7 +141,7 @@ func TestP2pNetwork_SubscribeBroadcast(t *testing.T) { recordBroadcastErr(node1.Broadcast(msgProposer.SSVMessage.GetID(), msgProposer)) recordBroadcastErr(node1.Broadcast(msgSyncCommitteeContribution.SSVMessage.GetID(), msgSyncCommitteeContribution)) recordBroadcastErr(node2.Broadcast(msgRoleVoluntaryExit.SSVMessage.GetID(), msgRoleVoluntaryExit)) - }() + }) wg.Wait() close(broadcastErrCh) diff --git a/network/topics/controller_test.go b/network/topics/controller_test.go index 53d41fb2f9..e371e906f4 100644 --- a/network/topics/controller_test.go +++ b/network/topics/controller_test.go @@ -156,9 +156,7 @@ func baseTest(t *testing.T, ctx context.Context, logger *zap.Logger, peers []*P, wg.Wait() // let the messages propagate - wg.Add(1) - go func() { - defer wg.Done() + wg.Go(func() { // check number of peers and messages for i := 0; i < nValidators; i++ { wg.Add(1) @@ -178,7 +176,7 @@ func baseTest(t *testing.T, ctx context.Context, logger *zap.Logger, peers []*P, } }(cids[i]) } - }() + }) wg.Wait() t.Log("unsubscribing") diff --git a/operator/duties/attester.go b/operator/duties/attester.go index b66975db61..58d541875f 100644 --- a/operator/duties/attester.go +++ b/operator/duties/attester.go @@ -436,10 +436,7 @@ func (h *AttesterHandler) fetchAndProcessDuties(ctx context.Context, logger *zap attribute.Int("ssv.validator.duty.subscriptions", len(subscriptions)), )) - h.backgroundTasks.Add(1) - go func() { - defer h.backgroundTasks.Done() - + h.backgroundTasks.Go(func() { // Cannot use parent-context itself here, have to create independent instance // to be able to continue working in background. subscriptionCtx, cancel := context.WithCancel(h.ctx) @@ -448,7 +445,7 @@ func (h *AttesterHandler) fetchAndProcessDuties(ctx context.Context, logger *zap if err := h.beaconNode.SubmitBeaconCommitteeSubscriptions(subscriptionCtx, subscriptions); err != nil { h.logger.Error("failed to submit beacon committee subscription", zap.Error(err)) } - }() + }) span.SetStatus(codes.Ok, "") return nil diff --git a/operator/duties/scheduler.go b/operator/duties/scheduler.go index ae6b4164c9..77486075d6 100644 --- a/operator/duties/scheduler.go +++ b/operator/duties/scheduler.go @@ -231,30 +231,22 @@ func (s *Scheduler) Start(ctx context.Context) error { // This call is blocking. handler.HandleInitialDuties(s.ctx) - s.backgroundTasks.Add(1) - go func() { - defer s.backgroundTasks.Done() + s.backgroundTasks.Go(func() { handler.HandleDuties(s.ctx) - }() + }) } - s.backgroundTasks.Add(1) - go func() { - defer s.backgroundTasks.Done() + s.backgroundTasks.Go(func() { indicesChangeFeed.FanOut(s.ctx, s.indicesChgCh) - }() + }) - s.backgroundTasks.Add(1) - go func() { - defer s.backgroundTasks.Done() + s.backgroundTasks.Go(func() { reorgEventsFeed.FanOut(s.ctx, s.reorgCh) - }() + }) - s.backgroundTasks.Add(1) - go func() { - defer s.backgroundTasks.Done() + s.backgroundTasks.Go(func() { s.SlotTicker(s.ctx) - }() + }) s.logger.Info("duty scheduler has started") @@ -272,9 +264,7 @@ func (s *Scheduler) listenToHeadEvents(ctx context.Context) error { return fmt.Errorf("failed to subscribe to head events: %w", err) } - s.backgroundTasks.Add(1) - go func() { - defer s.backgroundTasks.Done() + s.backgroundTasks.Go(func() { for { select { case <-ctx.Done(): @@ -292,7 +282,7 @@ func (s *Scheduler) listenToHeadEvents(ctx context.Context) error { headEventHandler(ctx, headEvent) } } - }() + }) return nil } @@ -487,18 +477,14 @@ func (s *Scheduler) ExecuteDuties(ctx context.Context, duties []*spectypes.Valid } recordDutyScheduled(ctx, duty.RunnerRole(), slotDelay) - - s.backgroundTasks.Add(1) - go func() { - defer s.backgroundTasks.Done() - + s.backgroundTasks.Go(func() { // Cannot use parent-context itself here, have to create independent instance // to be able to continue working in background. dutyCtx, cancel := context.WithDeadline(s.ctx, dutyDeadline) defer cancel() s.dutyExecutor.ExecuteDuty(dutyCtx, logger, duty) - }() + }) } span.SetStatus(codes.Ok, "") @@ -540,9 +526,7 @@ func (s *Scheduler) ExecuteCommitteeDuties(ctx context.Context, duties committee recordDutyScheduled(ctx, duty.RunnerRole(), slotDelay) - s.backgroundTasks.Add(1) - go func() { - defer s.backgroundTasks.Done() + s.backgroundTasks.Go(func() { // Cannot use parent-context itself here, have to create independent instance // to be able to continue working in background. @@ -551,7 +535,7 @@ func (s *Scheduler) ExecuteCommitteeDuties(ctx context.Context, duties committee s.waitOneThirdIntoSlotOrValidBlock(duty.Slot) s.dutyExecutor.ExecuteCommitteeDuty(dutyCtx, logger, committee.id, duty) - }() + }) } span.SetStatus(codes.Ok, "") diff --git a/operator/duties/sync_committee.go b/operator/duties/sync_committee.go index cfaca99838..f555a7d3a0 100644 --- a/operator/duties/sync_committee.go +++ b/operator/duties/sync_committee.go @@ -333,10 +333,7 @@ func (h *SyncCommitteeHandler) fetchAndProcessDuties(ctx context.Context, epoch attribute.Int("ssv.validator.duty.subscriptions", len(subscriptions)), )) - h.backgroundTasks.Add(1) - go func() { - defer h.backgroundTasks.Done() - + h.backgroundTasks.Go(func() { // Cannot use parent-context itself here, have to create independent instance // to be able to continue working in background. subscriptionCtx, cancel := context.WithCancel(h.ctx) @@ -345,7 +342,7 @@ func (h *SyncCommitteeHandler) fetchAndProcessDuties(ctx context.Context, epoch if err := h.beaconNode.SubmitSyncCommitteeSubscriptions(subscriptionCtx, subscriptions); err != nil { h.logger.Error("failed to subscribe sync committee to subnet", zap.Error(err)) } - }() + }) span.SetStatus(codes.Ok, "") return nil diff --git a/operator/dutytracer/collector_bench_test.go b/operator/dutytracer/collector_bench_test.go index 26f3c47052..8757c53e9f 100644 --- a/operator/dutytracer/collector_bench_test.go +++ b/operator/dutytracer/collector_bench_test.go @@ -60,11 +60,9 @@ func BenchmarkTracer(b *testing.B) { var wg sync.WaitGroup for _, msg := range traces[:actualCount] { - wg.Add(1) - go func() { - defer wg.Done() + wg.Go(func() { _ = collector.Collect(b.Context(), msg, dummyVerify) - }() + }) } wg.Wait() } diff --git a/operator/validator/router_test.go b/operator/validator/router_test.go index 0ca2e9d0ea..b252864b86 100644 --- a/operator/validator/router_test.go +++ b/operator/validator/router_test.go @@ -27,10 +27,7 @@ func TestRouter(t *testing.T) { count := 0 var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - + wg.Go(func() { cn := router.GetMessageChan() for msg := range cn { require.NotNil(t, msg) @@ -39,7 +36,7 @@ func TestRouter(t *testing.T) { return } } - }() + }) for i := 0; i < expectedCount; i++ { msg := &queue.SSVMessage{ diff --git a/protocol/v2/qbft/instance/process_msg_test.go b/protocol/v2/qbft/instance/process_msg_test.go index b0f8eb2d3e..bf4d9158f0 100644 --- a/protocol/v2/qbft/instance/process_msg_test.go +++ b/protocol/v2/qbft/instance/process_msg_test.go @@ -119,14 +119,12 @@ func TestProcessMsgConcurrentAccess(t *testing.T) { errs := make(chan error, workers) var wg sync.WaitGroup for i := 0; i < workers; i++ { - wg.Add(1) - go func() { - defer wg.Done() + wg.Go(func() { // Intentionally reuse the same message pointer to exercise dedup/idempotent // processing while ProcessMsg serializes handler execution through processMsgF. _, _, _, err := env.inst.ProcessMsg(context.Background(), zap.NewNop(), msg) errs <- err - }() + }) } wg.Wait() close(errs) diff --git a/protocol/v2/ssv/queue/queue_test.go b/protocol/v2/ssv/queue/queue_test.go index 0460c53e84..d9d0f1cef1 100644 --- a/protocol/v2/ssv/queue/queue_test.go +++ b/protocol/v2/ssv/queue/queue_test.go @@ -367,9 +367,7 @@ func benchmarkPriorityQueueParallel(b *testing.B, factory func() Queue, lossy bo pushedCount atomic.Int64 ) for i := 0; i < pushers; i++ { - pushersWg.Add(1) - go func() { - defer pushersWg.Done() + pushersWg.Go(func() { for m := range messageStream { if lossy { queue.TryPush(m) @@ -379,7 +377,7 @@ func benchmarkPriorityQueueParallel(b *testing.B, factory func() Queue, lossy bo pushedCount.Add(1) time.Sleep(time.Duration(rand.Intn(300)) * time.Microsecond) } - }() + }) } // Assert pushed messages. @@ -398,9 +396,7 @@ func benchmarkPriorityQueueParallel(b *testing.B, factory func() Queue, lossy bo popped := make(chan *SSVMessage, messageCount*2) poppingCtx, stopPopping := context.WithCancel(b.Context()) for i := 0; i < poppers; i++ { - poppersWg.Add(1) - go func() { - defer poppersWg.Done() + poppersWg.Go(func() { for { msg := queue.Pop(poppingCtx, NewMessagePrioritizer(mockState), FilterAny) if msg == nil { @@ -408,7 +404,7 @@ func benchmarkPriorityQueueParallel(b *testing.B, factory func() Queue, lossy bo } popped <- msg } - }() + }) } // Wait for pushed messages assertion. @@ -475,9 +471,7 @@ func BenchmarkPriorityQueue_Concurrent(b *testing.B) { var pushersWg sync.WaitGroup var pushed atomic.Int32 for i := 0; i < 16; i++ { - pushersWg.Add(1) - go func() { - defer pushersWg.Done() + pushersWg.Go(func() { for n := b.N; n > 0; n-- { select { case msg := <-msgs: @@ -486,7 +480,7 @@ func BenchmarkPriorityQueue_Concurrent(b *testing.B) { default: } } - }() + }) } pushersCtx, cancel := context.WithCancel(b.Context()) diff --git a/protocol/v2/ssv/runner/committee.go b/protocol/v2/ssv/runner/committee.go index d588664a24..50c47a7985 100644 --- a/protocol/v2/ssv/runner/committee.go +++ b/protocol/v2/ssv/runner/committee.go @@ -294,11 +294,8 @@ func (r *CommitteeRunner) ProcessConsensus(ctx context.Context, logger *zap.Logg }() for range workerCount { - wg.Add(1) - - go func() { - defer wg.Done() + wg.Go(func() { for validatorDuty := range dutiesCh { if ctx.Err() != nil { return @@ -351,7 +348,7 @@ func (r *CommitteeRunner) ProcessConsensus(ctx context.Context, logger *zap.Logg return } } - }() + }) } go func() { diff --git a/protocol/v2/ssv/validator/committee_queue_test.go b/protocol/v2/ssv/validator/committee_queue_test.go index 415e063d11..8728cd01e1 100644 --- a/protocol/v2/ssv/validator/committee_queue_test.go +++ b/protocol/v2/ssv/validator/committee_queue_test.go @@ -1613,12 +1613,10 @@ func TestQueueLoadAndSaturationScenarios(t *testing.T) { consumerCtx, consumerCancel := context.WithCancel(ctx) var consumerWg sync.WaitGroup - consumerWg.Add(1) - go func() { - defer consumerWg.Done() + consumerWg.Go(func() { committee.ConsumeQueue(consumerCtx, logger, q, processFn, committeeRunner) - }() + }) // Fill with filtered Prepare messages for i := 0; i < queueCapacity; i++ { @@ -1668,12 +1666,10 @@ func TestQueueLoadAndSaturationScenarios(t *testing.T) { // Restart consumption consumer2Ctx, consumer2Cancel := context.WithCancel(ctx2) var consumer2Wg sync.WaitGroup - consumer2Wg.Add(1) - go func() { - defer consumer2Wg.Done() + consumer2Wg.Go(func() { committee.ConsumeQueue(consumer2Ctx, logger, q, processFn, committeeRunner) - }() + }) // Observe how many of the old Prepares now drain timeout := time.After(2 * time.Second) diff --git a/registry/storage/shares_test.go b/registry/storage/shares_test.go index 16cc76f388..7639884a0b 100644 --- a/registry/storage/shares_test.go +++ b/registry/storage/shares_test.go @@ -472,9 +472,7 @@ func TestSharesStorage_HighContentionConcurrency(t *testing.T) { defer cancel() for i := 0; i < 100; i++ { for _, op := range []string{"add", "update", "remove1", "remove4", "read"} { - wg.Add(1) - go func() { - defer wg.Done() + wg.Go(func() { for ctx.Err() == nil { switch op { case "add": @@ -500,7 +498,7 @@ func TestSharesStorage_HighContentionConcurrency(t *testing.T) { _ = storage.ValidatorStore.Committees() } } - }() + }) } } wg.Wait() diff --git a/utils/hashmap/hashmap_test.go b/utils/hashmap/hashmap_test.go index 00db29efe6..967b6dd4ff 100644 --- a/utils/hashmap/hashmap_test.go +++ b/utils/hashmap/hashmap_test.go @@ -441,19 +441,15 @@ func TestGetOrInsertHangIssue67(t *testing.T) { var wg sync.WaitGroup key := "key" - wg.Add(1) - go func() { - defer wg.Done() + wg.Go(func() { m.GetOrSet(key, 9) m.Delete(key) - }() + }) - wg.Add(1) - go func() { - defer wg.Done() + wg.Go(func() { m.GetOrSet(key, 9) m.Delete(key) - }() + }) wg.Wait() } @@ -485,10 +481,7 @@ func TestIssue1682(t *testing.T) { var errs []error var mu sync.Mutex for i := 0; i < 10; i++ { - wwg.Add(1) - go func() { - defer wwg.Done() - + wwg.Go(func() { m := New[string, validatorStatus]() var wg sync.WaitGroup var attempted sync.WaitGroup @@ -558,7 +551,7 @@ func TestIssue1682(t *testing.T) { case <-ticker.C: } } - }() + }) } wwg.Wait() require.Empty(t, errs) diff --git a/utils/ttl/map_test.go b/utils/ttl/map_test.go index 187bb5b21d..f089439333 100644 --- a/utils/ttl/map_test.go +++ b/utils/ttl/map_test.go @@ -441,19 +441,15 @@ func TestGetOrInsertHangIssue67(t *testing.T) { var wg sync.WaitGroup key := "key" - wg.Add(1) - go func() { - defer wg.Done() + wg.Go(func() { m.GetOrSet(key, 9) m.Delete(key) - }() + }) - wg.Add(1) - go func() { - defer wg.Done() + wg.Go(func() { m.GetOrSet(key, 9) m.Delete(key) - }() + }) wg.Wait() } @@ -485,25 +481,21 @@ func TestIssue1682(t *testing.T) { var errs []error var mu sync.Mutex for i := 0; i < 10; i++ { - wwg.Add(1) - go func() { - defer wwg.Done() + wwg.Go(func() { m := New[string, validatorStatus](t.Context(), 1*time.Hour, 1*time.Hour) var wg sync.WaitGroup for _, cmtID := range cmtIDs { n := 50 + randSeed().Intn(200) for j := 0; j < n; j++ { - wg.Add(1) - go func() { - defer wg.Done() + wg.Go(func() { time.Sleep(time.Duration(randSeed().Intn(2000)) * time.Millisecond) _, found := m.GetOrSet(cmtID, validatorStatusSubscribing) time.Sleep(time.Duration(randSeed().Intn(200)) * time.Millisecond) if !found { m.Set(cmtID, validatorStatusSubscribed) } - }() + }) } } @@ -545,7 +537,7 @@ func TestIssue1682(t *testing.T) { case <-ticker.C: } } - }() + }) } wwg.Wait() require.Empty(t, errs)