Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions cli/operator/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1357,11 +1357,9 @@ func initSlotPruning(ctx context.Context, stores *ibftstorage.ParticipantStores,

// async perform initial slot gc
_ = stores.Each(func(_ spectypes.BeaconRole, store ibftstorage.ParticipantStore) error {
wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
store.Prune(ctx, threshold)
}()
})
return nil
})

Expand Down
6 changes: 2 additions & 4 deletions eth/executionclient/execution_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1016,9 +1016,7 @@ func TestSubscribeFilterLogs(t *testing.T) {
// Create a goroutine to collect logs
var receivedLogs []ethtypes.Log
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
for i := 0; i < 3; i++ {
select {
case log := <-logCh:
Expand All @@ -1030,7 +1028,7 @@ func TestSubscribeFilterLogs(t *testing.T) {
return
}
}
}()
})

// Create blocks with transactions
err = env.createBlocksWithLogs(contract, 3, 10*time.Millisecond)
Expand Down
7 changes: 2 additions & 5 deletions hprobe/prober.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,14 @@ func (p *HealthProber) ProbeAll(ctx context.Context) error {
errsCh := make(chan error)

p.components.Range(func(name string, n pComponent) bool {
wg.Add(1)
go func() {
defer wg.Done()

wg.Go(func() {
err := p.probeComponent(ctx, n)
if err != nil {
// Relay the error and quit early.
errsCh <- fmt.Errorf("probe component %s: %w", name, err)
cancel()
}
}()
})
return true
})

Expand Down
6 changes: 2 additions & 4 deletions ibft/storage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,11 +211,9 @@ func TestSlotCleanupJob(t *testing.T) {

// run normal gc
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
storage.PruneContinuously(ctx, tickerProv, 1)
}()
})

mockTimeChan <- time.Now()
mockSlotChan <- phase0.Slot(5)
Expand Down
26 changes: 8 additions & 18 deletions network/p2p/p2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,27 +41,23 @@ func TestCurrentSubnetsConcurrentAccess(t *testing.T) {
var wg sync.WaitGroup
start := make(chan struct{})

wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
<-start
for i := uint64(0); i < 5000; i++ {
subnets := commons.ZeroSubnets
subnets.Set(i % commons.SubnetsCount)
n.setCurrentSubnets(subnets)
}
}()
})

wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
<-start
for i := 0; i < 5000; i++ {
subnets := n.ActiveSubnets()
_ = subnets.ActiveCount()
_ = subnets.StringHex()
}
}()
})

close(start)
wg.Wait()
Expand Down Expand Up @@ -107,10 +103,8 @@ func TestP2pNetwork_SubscribeBroadcast(t *testing.T) {
broadcastErrCh <- err
}
}
wg.Add(1)

go func() {
defer wg.Done()
wg.Go(func() {
msgCommittee1 := generateCommitteeMsg(spectestingutils.Testing4SharesSet(), 1)
msgCommittee3 := generateCommitteeMsg(spectestingutils.Testing4SharesSet(), 3)
msgProposer := generateValidatorMsg(spectestingutils.Testing4SharesSet(), 4, spectypes.RoleProposer)
Expand All @@ -128,13 +122,9 @@ func TestP2pNetwork_SubscribeBroadcast(t *testing.T) {
recordBroadcastErr(node2.Broadcast(msgSyncCommitteeContribution.SSVMessage.GetID(), msgSyncCommitteeContribution))
<-time.After(time.Millisecond * 20)
recordBroadcastErr(node1.Broadcast(msgRoleVoluntaryExit.SSVMessage.GetID(), msgRoleVoluntaryExit))
}()

wg.Add(1)

go func() {
defer wg.Done()
})

wg.Go(func() {
msgCommittee1 := generateCommitteeMsg(spectestingutils.Testing4SharesSet(), 1)
msgCommittee2 := generateCommitteeMsg(spectestingutils.Testing4SharesSet(), 2)
msgCommittee3 := generateCommitteeMsg(spectestingutils.Testing4SharesSet(), 3)
Expand All @@ -151,7 +141,7 @@ func TestP2pNetwork_SubscribeBroadcast(t *testing.T) {
recordBroadcastErr(node1.Broadcast(msgProposer.SSVMessage.GetID(), msgProposer))
recordBroadcastErr(node1.Broadcast(msgSyncCommitteeContribution.SSVMessage.GetID(), msgSyncCommitteeContribution))
recordBroadcastErr(node2.Broadcast(msgRoleVoluntaryExit.SSVMessage.GetID(), msgRoleVoluntaryExit))
}()
})

wg.Wait()
close(broadcastErrCh)
Expand Down
6 changes: 2 additions & 4 deletions network/topics/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,7 @@ func baseTest(t *testing.T, ctx context.Context, logger *zap.Logger, peers []*P,
wg.Wait()

// let the messages propagate
wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
// check number of peers and messages
for i := 0; i < nValidators; i++ {
wg.Add(1)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about using a separate innerWg for these goroutines? Seems that calling wg.Add(1) from inside a closure that's itself counted by the same wg is fragile — it works today because the for-loop always runs to completion before the outer goroutine returns, but a future early-return (e.g. on ctx cancel) could let the trailing wg.Wait() unblock before all inner Add calls land, racing any subsequent reuse of the same wg.

Expand All @@ -178,7 +176,7 @@ func baseTest(t *testing.T, ctx context.Context, logger *zap.Logger, peers []*P,
}
}(cids[i])
}
}()
})
wg.Wait()

t.Log("unsubscribing")
Expand Down
7 changes: 2 additions & 5 deletions operator/duties/attester.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,10 +436,7 @@ func (h *AttesterHandler) fetchAndProcessDuties(ctx context.Context, logger *zap
attribute.Int("ssv.validator.duty.subscriptions", len(subscriptions)),
))

h.backgroundTasks.Add(1)
go func() {
defer h.backgroundTasks.Done()

h.backgroundTasks.Go(func() {
// Cannot use parent-context itself here, have to create independent instance
// to be able to continue working in background.
subscriptionCtx, cancel := context.WithCancel(h.ctx)
Expand All @@ -448,7 +445,7 @@ func (h *AttesterHandler) fetchAndProcessDuties(ctx context.Context, logger *zap
if err := h.beaconNode.SubmitBeaconCommitteeSubscriptions(subscriptionCtx, subscriptions); err != nil {
h.logger.Error("failed to submit beacon committee subscription", zap.Error(err))
}
}()
})

span.SetStatus(codes.Ok, "")
return nil
Expand Down
44 changes: 14 additions & 30 deletions operator/duties/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,30 +231,22 @@
// This call is blocking.
handler.HandleInitialDuties(s.ctx)

s.backgroundTasks.Add(1)
go func() {
defer s.backgroundTasks.Done()
s.backgroundTasks.Go(func() {
handler.HandleDuties(s.ctx)
}()
})
}

s.backgroundTasks.Add(1)
go func() {
defer s.backgroundTasks.Done()
s.backgroundTasks.Go(func() {
indicesChangeFeed.FanOut(s.ctx, s.indicesChgCh)
}()
})

s.backgroundTasks.Add(1)
go func() {
defer s.backgroundTasks.Done()
s.backgroundTasks.Go(func() {
reorgEventsFeed.FanOut(s.ctx, s.reorgCh)
}()
})

s.backgroundTasks.Add(1)
go func() {
defer s.backgroundTasks.Done()
s.backgroundTasks.Go(func() {
s.SlotTicker(s.ctx)
}()
})

s.logger.Info("duty scheduler has started")

Expand All @@ -272,9 +264,7 @@
return fmt.Errorf("failed to subscribe to head events: %w", err)
}

s.backgroundTasks.Add(1)
go func() {
defer s.backgroundTasks.Done()
s.backgroundTasks.Go(func() {
for {
select {
case <-ctx.Done():
Expand All @@ -292,7 +282,7 @@
headEventHandler(ctx, headEvent)
}
}
}()
})

return nil
}
Expand Down Expand Up @@ -487,18 +477,14 @@
}

recordDutyScheduled(ctx, duty.RunnerRole(), slotDelay)

s.backgroundTasks.Add(1)
go func() {
defer s.backgroundTasks.Done()

s.backgroundTasks.Go(func() {
// Cannot use parent-context itself here, have to create independent instance
// to be able to continue working in background.
dutyCtx, cancel := context.WithDeadline(s.ctx, dutyDeadline)
defer cancel()

s.dutyExecutor.ExecuteDuty(dutyCtx, logger, duty)
}()
})
}

span.SetStatus(codes.Ok, "")
Expand Down Expand Up @@ -540,9 +526,7 @@

recordDutyScheduled(ctx, duty.RunnerRole(), slotDelay)

s.backgroundTasks.Add(1)
go func() {
defer s.backgroundTasks.Done()
s.backgroundTasks.Go(func() {

Check failure on line 529 in operator/duties/scheduler.go

View workflow job for this annotation

GitHub Actions / lint

unnecessary leading newline (whitespace)

// Cannot use parent-context itself here, have to create independent instance
// to be able to continue working in background.
Expand All @@ -551,7 +535,7 @@

s.waitOneThirdIntoSlotOrValidBlock(duty.Slot)
s.dutyExecutor.ExecuteCommitteeDuty(dutyCtx, logger, committee.id, duty)
}()
})
}

span.SetStatus(codes.Ok, "")
Comment thread
purelualight marked this conversation as resolved.
Expand Down
7 changes: 2 additions & 5 deletions operator/duties/sync_committee.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,10 +333,7 @@ func (h *SyncCommitteeHandler) fetchAndProcessDuties(ctx context.Context, epoch
attribute.Int("ssv.validator.duty.subscriptions", len(subscriptions)),
))

h.backgroundTasks.Add(1)
go func() {
defer h.backgroundTasks.Done()

h.backgroundTasks.Go(func() {
// Cannot use parent-context itself here, have to create independent instance
// to be able to continue working in background.
subscriptionCtx, cancel := context.WithCancel(h.ctx)
Expand All @@ -345,7 +342,7 @@ func (h *SyncCommitteeHandler) fetchAndProcessDuties(ctx context.Context, epoch
if err := h.beaconNode.SubmitSyncCommitteeSubscriptions(subscriptionCtx, subscriptions); err != nil {
h.logger.Error("failed to subscribe sync committee to subnet", zap.Error(err))
}
}()
})

span.SetStatus(codes.Ok, "")
return nil
Expand Down
6 changes: 2 additions & 4 deletions operator/dutytracer/collector_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,9 @@ func BenchmarkTracer(b *testing.B) {

var wg sync.WaitGroup
for _, msg := range traces[:actualCount] {
wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
_ = collector.Collect(b.Context(), msg, dummyVerify)
}()
})
}
wg.Wait()
}
Expand Down
7 changes: 2 additions & 5 deletions operator/validator/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,7 @@ func TestRouter(t *testing.T) {
count := 0

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()

wg.Go(func() {
cn := router.GetMessageChan()
for msg := range cn {
require.NotNil(t, msg)
Expand All @@ -39,7 +36,7 @@ func TestRouter(t *testing.T) {
return
}
}
}()
})

for i := 0; i < expectedCount; i++ {
msg := &queue.SSVMessage{
Expand Down
6 changes: 2 additions & 4 deletions protocol/v2/qbft/instance/process_msg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,12 @@ func TestProcessMsgConcurrentAccess(t *testing.T) {
errs := make(chan error, workers)
var wg sync.WaitGroup
for i := 0; i < workers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
// Intentionally reuse the same message pointer to exercise dedup/idempotent
// processing while ProcessMsg serializes handler execution through processMsgF.
_, _, _, err := env.inst.ProcessMsg(context.Background(), zap.NewNop(), msg)
errs <- err
}()
})
}
wg.Wait()
close(errs)
Expand Down
Loading
Loading