From 197b42dc27a8a6652c058611b88f32e7a0a2561f Mon Sep 17 00:00:00 2001 From: ssd04 Date: Tue, 5 May 2026 13:44:39 +0300 Subject: [PATCH 1/3] add context for managed signatures broadcast --- consensus/spos/bls/v2/subroundSignature.go | 34 +++- .../spos/bls/v2/subroundSignature_test.go | 162 ++++++++++++++++++ 2 files changed, 189 insertions(+), 7 deletions(-) diff --git a/consensus/spos/bls/v2/subroundSignature.go b/consensus/spos/bls/v2/subroundSignature.go index 69873cce8cf..a8399e68eb4 100644 --- a/consensus/spos/bls/v2/subroundSignature.go +++ b/consensus/spos/bls/v2/subroundSignature.go @@ -217,15 +217,15 @@ func (sr *subroundSignature) doSignatureConsensusCheck() bool { return false } -func (sr *subroundSignature) waitForSingatures() { +func (sr *subroundSignature) waitForSingatures( + timeLeft time.Duration, +) { done := make(chan struct{}) go func() { sr.SignaturesWaitGroup().Wait() close(done) }() - timeLeft := sr.RoundHandler().RemainingTime(sr.RoundHandler().TimeStamp(), time.Duration(sr.EndTime())) - select { case <-done: sr.SignaturesCtxCancel() @@ -238,7 +238,12 @@ func (sr *subroundSignature) waitForSingatures() { func (sr *subroundSignature) doSignatureJobForManagedKeys(ctx context.Context) bool { // wait for optimistic signatures creation to finish - sr.waitForSingatures() + timeLeft := sr.RoundHandler().RemainingTime(sr.RoundHandler().TimeStamp(), time.Duration(sr.EndTime())) + + sigCtx, cancel := context.WithTimeout(ctx, timeLeft) + defer cancel() + + sr.waitForSingatures(timeLeft) numMultiKeysSignaturesSent := int32(0) sentSigForAllKeys := atomicCore.Flag{} @@ -256,24 +261,32 @@ func (sr *subroundSignature) doSignatureJobForManagedKeys(ctx context.Context) b continue } + select { + case <-sigCtx.Done(): + log.Debug("doSignatureJobForManagedKeys: timeout while sending signatures") + return false + default: + } + err := checkGoRoutinesThrottler(ctx, sr.signatureThrottler) if err != nil { + log.Debug("doSignatureJobForManagedKeys.checkGoRoutinesThrottler", "err", err) return false } sr.signatureThrottler.StartProcessing() wg.Add(1) - go func(ctx context.Context, idx int, pk string) { + go func(sigCtx context.Context, idx int, pk string) { defer sr.signatureThrottler.EndProcessing() - signatureSent := sr.sendSignatureForManagedKey(ctx, idx, pk) + signatureSent := sr.sendSignatureForManagedKey(sigCtx, idx, pk) if signatureSent { atomic.AddInt32(&numMultiKeysSignaturesSent, 1) } else { sentSigForAllKeys.SetValue(false) } wg.Done() - }(ctx, idx, pk) + }(sigCtx, idx, pk) } wg.Wait() @@ -290,6 +303,13 @@ func (sr *subroundSignature) sendSignatureForManagedKey(ctx context.Context, idx nonce := sr.GetHeader().GetNonce() currentHash := sr.GetData() + select { + case <-ctx.Done(): + log.Debug("sendSignatureForManagedKey: timeout while sending signature", "idx", idx, "pk", pk) + return false + default: + } + signatureShare, err := sr.SigningHandler().SignatureShare(uint16(idx)) if err != nil { // signature share not found (optimistic signature share creation was not triggered) diff --git a/consensus/spos/bls/v2/subroundSignature_test.go b/consensus/spos/bls/v2/subroundSignature_test.go index 76bd07bf1f8..9868ada68dd 100644 --- a/consensus/spos/bls/v2/subroundSignature_test.go +++ b/consensus/spos/bls/v2/subroundSignature_test.go @@ -950,6 +950,168 @@ func TestSubroundSignature_DoSignatureJobForManagedKeys(t *testing.T) { assert.Equal(t, expectedBroadcastMap, signaturesBroadcast) }) + t.Run("should work until context is cancelled", func(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + container := consensusMocks.InitConsensusCore() + enableEpochsHandler := &enableEpochsHandlerMock.EnableEpochsHandlerStub{ + IsFlagEnabledInEpochCalled: func(flag core.EnableEpochFlag, epoch uint32) bool { + return flag == common.AndromedaFlag + }, + } + container.SetEnableEpochsHandler(enableEpochsHandler) + + numCalls := 0 + signingHandler := &consensusMocks.SigningHandlerStub{ + SignatureShareCalled: func(index uint16) ([]byte, error) { + return nil, expectedErr + }, + CreateSignatureShareForPublicKeyCalled: func(_ context.Context, msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { + numCalls++ + if numCalls >= 3 { + cancel() + } + + return []byte("SIG"), nil + }, + } + container.SetSigningHandler(signingHandler) + consensusState := initializers.InitConsensusStateWithKeysHandler( + &testscommon.KeysHandlerStub{ + IsKeyManagedByCurrentNodeCalled: func(pkBytes []byte) bool { + return true + }, + }, + ) + ch := make(chan bool, 1) + + sr, _ := spos.NewSubround( + bls.SrBlock, + bls.SrSignature, + bls.SrEndRound, + roundTimeDuration, + 0.7, + 0.85, + "(SIGNATURE)", + consensusState, + ch, + executeStoredMessages, + container, + chainID, + currentPid, + &statusHandler.AppStatusHandlerStub{}, + ) + + signatureSentForPks := make(map[string]struct{}) + mutex := sync.Mutex{} + srSignature, _ := v2.NewSubroundSignature( + sr, + &statusHandler.AppStatusHandlerStub{}, + &testscommon.SentSignatureTrackerStub{ + SignatureSentCalled: func(pkBytes []byte) { + mutex.Lock() + signatureSentForPks[string(pkBytes)] = struct{}{} + mutex.Unlock() + }, + }, + &consensusMocks.SposWorkerMock{}, + &dataRetrieverMock.ThrottlerStub{}, + ) + + sr.SetHeader(&block.Header{}) + signaturesBroadcast := make(map[string]int) + container.SetBroadcastMessenger(&consensusMocks.BroadcastMessengerMock{ + BroadcastConsensusMessageCalled: func(message *consensus.Message) error { + mutex.Lock() + signaturesBroadcast[string(message.PubKey)]++ + mutex.Unlock() + return nil + }, + }) + + sr.SetSelfPubKey("OTHER") + + r := srSignature.DoSignatureJobForManagedKeys(ctx) + assert.False(t, r) + + numFinishedJobs := 0 + for _, pk := range sr.ConsensusGroup() { + isJobDone, err := sr.JobDone(pk, bls.SrSignature) + assert.NoError(t, err) + + if isJobDone { + numFinishedJobs++ + } + } + assert.Equal(t, 3, numFinishedJobs) + + assert.Equal(t, 3, len(signaturesBroadcast)) + }) + + t.Run("context done should return early", func(t *testing.T) { + t.Parallel() + + container := consensusMocks.InitConsensusCore() + enableEpochsHandler := &enableEpochsHandlerMock.EnableEpochsHandlerStub{ + IsFlagEnabledInEpochCalled: func(flag core.EnableEpochFlag, epoch uint32) bool { + return flag == common.AndromedaFlag + }, + } + container.SetEnableEpochsHandler(enableEpochsHandler) + + consensusState := initializers.InitConsensusStateWithKeysHandler( + &testscommon.KeysHandlerStub{ + IsKeyManagedByCurrentNodeCalled: func(pkBytes []byte) bool { + return true + }, + }, + ) + ch := make(chan bool, 1) + + sr, _ := spos.NewSubround( + bls.SrBlock, + bls.SrSignature, + bls.SrEndRound, + roundTimeDuration, + 0.7, + 0.85, + "(SIGNATURE)", + consensusState, + ch, + executeStoredMessages, + container, + chainID, + currentPid, + &statusHandler.AppStatusHandlerStub{}, + ) + + srSignature, _ := v2.NewSubroundSignature( + sr, + &statusHandler.AppStatusHandlerStub{}, + &testscommon.SentSignatureTrackerStub{}, + &consensusMocks.SposWorkerMock{}, + &dataRetrieverMock.ThrottlerStub{}, + ) + + sr.SetHeader(&block.Header{}) + sr.SetSelfPubKey("OTHER") + + ctx, cancel := context.WithCancel(context.TODO()) + cancel() + + r := srSignature.DoSignatureJobForManagedKeys(ctx) + assert.False(t, r) + + for _, pk := range sr.ConsensusGroup() { + isJobDone, err := sr.JobDone(pk, bls.SrSignature) + assert.NoError(t, err) + assert.False(t, isJobDone) + } + }) + t.Run("should fail", func(t *testing.T) { t.Parallel() container := consensusMocks.InitConsensusCore() From d20fca5459a933bffc19b769191ee82e0a7b45c6 Mon Sep 17 00:00:00 2001 From: ssd04 Date: Tue, 5 May 2026 13:49:26 +0300 Subject: [PATCH 2/3] update tests --- consensus/spos/bls/v2/subroundSignature_test.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/consensus/spos/bls/v2/subroundSignature_test.go b/consensus/spos/bls/v2/subroundSignature_test.go index 9868ada68dd..df91c241b34 100644 --- a/consensus/spos/bls/v2/subroundSignature_test.go +++ b/consensus/spos/bls/v2/subroundSignature_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sync" + "sync/atomic" "testing" "github.com/multiversx/mx-chain-core-go/core" @@ -964,15 +965,16 @@ func TestSubroundSignature_DoSignatureJobForManagedKeys(t *testing.T) { } container.SetEnableEpochsHandler(enableEpochsHandler) - numCalls := 0 + numCalls := int32(0) signingHandler := &consensusMocks.SigningHandlerStub{ SignatureShareCalled: func(index uint16) ([]byte, error) { return nil, expectedErr }, CreateSignatureShareForPublicKeyCalled: func(_ context.Context, msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { - numCalls++ - if numCalls >= 3 { + atomic.AddInt32(&numCalls, 1) + if atomic.LoadInt32(&numCalls) > 3 { cancel() + return nil, expectedErr } return []byte("SIG"), nil From 7faae14d99e01703dc051d37b33575b79cb39468 Mon Sep 17 00:00:00 2001 From: ssd04 Date: Tue, 5 May 2026 16:04:01 +0300 Subject: [PATCH 3/3] log for aggregated sig --- consensus/spos/bls/v2/subroundEndRound.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/consensus/spos/bls/v2/subroundEndRound.go b/consensus/spos/bls/v2/subroundEndRound.go index 2384bc15944..9a5f9d8b626 100644 --- a/consensus/spos/bls/v2/subroundEndRound.go +++ b/consensus/spos/bls/v2/subroundEndRound.go @@ -387,6 +387,11 @@ func (sr *subroundEndRound) sendProof() (bool, error) { return false, err } + log.Debug("step 3: aggregate signature has been created", + "PubKeysBitmap", bitmap, + "AggregateSignature", sig, + ) + // Re-check grace period after aggregation which may have been slow under CPU contention if !sr.shouldSendProof() { return false, nil @@ -697,8 +702,6 @@ func (sr *subroundEndRound) createAndBroadcastProof( } log.Debug("step 3: block header proof has been sent", - "PubKeysBitmap", bitmap, - "AggregateSignature", signature, "proof sender", hex.EncodeToString([]byte(sender))) return nil