Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 2 additions & 8 deletions message/validation/consensus_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
spectypes "github.com/ssvlabs/ssv-spec/types"

"github.com/ssvlabs/ssv/protocol/v2/message"
"github.com/ssvlabs/ssv/protocol/v2/qbft"
"github.com/ssvlabs/ssv/protocol/v2/qbft/roundtimer"
ssvtypes "github.com/ssvlabs/ssv/protocol/v2/types"
)
Expand Down Expand Up @@ -116,15 +117,8 @@ func (mv *messageValidator) validateConsensusMessageSemantics(
return ErrPrepareOrCommitWithFullData
}

hashedFullData, err := specqbft.HashDataRoot(signedSSVMessage.FullData)
if err != nil {
e := ErrFullDataHash
e.innerErr = err
return e
}

// Rule: Full data hash must match root
if hashedFullData != consensusMessage.Root {
if qbft.HashDataRoot(signedSSVMessage.FullData) != consensusMessage.Root {
return ErrInvalidHash
}
}
Expand Down
1 change: 0 additions & 1 deletion message/validation/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ var (
ErrSignersNotSorted = Error{text: "signers are not sorted", reject: true}
ErrInconsistentSigners = Error{text: "signer is not expected", reject: true}
ErrInvalidHash = Error{text: "root doesn't match full data hash", reject: true}
ErrFullDataHash = Error{text: "couldn't hash root", reject: true}
ErrUndecodableMessageData = Error{text: "message data could not be decoded", reject: true}
ErrEventMessage = Error{text: "unexpected event message", reject: true}
ErrUnknownSSVMessageType = Error{text: "unknown SSV message type", reject: true}
Expand Down
4 changes: 2 additions & 2 deletions message/validation/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/ssvlabs/ssv/operator/duties/dutystore"
"github.com/ssvlabs/ssv/operator/storage"
"github.com/ssvlabs/ssv/protocol/v2/message"
"github.com/ssvlabs/ssv/protocol/v2/qbft"
"github.com/ssvlabs/ssv/protocol/v2/qbft/roundtimer"
ssvtypes "github.com/ssvlabs/ssv/protocol/v2/types"
registrystorage "github.com/ssvlabs/ssv/registry/storage"
Expand Down Expand Up @@ -1559,8 +1560,7 @@ func Test_ValidateSSVMessage(t *testing.T) {

anotherFullData := []byte{1}
signedSSVMessage = generateSignedMessage(ks, committeeIdentifier, slot, func(message *specqbft.Message) {
message.Root, err = specqbft.HashDataRoot(anotherFullData)
require.NoError(t, err)
message.Root = qbft.HashDataRoot(anotherFullData)
})
signedSSVMessage.FullData = anotherFullData

Expand Down
6 changes: 2 additions & 4 deletions protocol/v2/qbft/controller/decided.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
spectypes "github.com/ssvlabs/ssv-spec/types"
"go.uber.org/zap"

"github.com/ssvlabs/ssv/protocol/v2/qbft"
"github.com/ssvlabs/ssv/protocol/v2/qbft/instance"
"github.com/ssvlabs/ssv/protocol/v2/ssv"
)
Expand Down Expand Up @@ -97,10 +98,7 @@ func (c *Controller) ValidateDecided(msg *specqbft.ProcessingMessage) error {
return fmt.Errorf("invalid decided msg: %w", err)
}

r, err := specqbft.HashDataRoot(msg.SignedMessage.FullData)
if err != nil {
return fmt.Errorf("could not hash input data: %w", err)
}
r := qbft.HashDataRoot(msg.SignedMessage.FullData)
if !bytes.Equal(r[:], msg.QBFTMessage.Root[:]) {
return spectypes.NewError(spectypes.RootHashInvalidErrorCode, "H(data) != root")
}
Expand Down
27 changes: 13 additions & 14 deletions protocol/v2/qbft/instance/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,7 @@ func (i *Instance) Start(
// TODO align spec to add else to avoid broadcast errored proposal
}

startValueRoot, err := specqbft.HashDataRoot(i.StartValue)
if err != nil {
logger.Warn("❗ failed to hash instance start value", zap.Error(err))
span.SetStatus(codes.Error, err.Error())
return
}
startValueRoot := qbft.HashDataRoot(i.StartValue)
logger = logger.With(zap.String("qbft_start_value_root", hex.EncodeToString(startValueRoot[:])))

const eventMsg = "📢 leader broadcasting proposal message"
Expand Down Expand Up @@ -172,8 +167,8 @@ func (i *Instance) MarkIrrelevant() {
}

func (i *Instance) Broadcast(msg *spectypes.SignedSSVMessage) error {
if !i.CanProcessMessages() {
return spectypes.NewError(spectypes.InstanceStoppedProcessingMessagesErrorCode, "instance stopped processing messages")
if !i.IsRelevant() {
return spectypes.NewError(spectypes.InstanceStoppedProcessingMessagesErrorCode, "instance is no longer considered relevant")
}

return i.GetConfig().GetNetwork().Broadcast(msg.SSVMessage.GetID(), msg)
Expand All @@ -191,8 +186,8 @@ func allSigners(all []*specqbft.ProcessingMessage) []spectypes.OperatorID {
// The returned bool/value pair reports whether this call newly decided the
// instance. Callers that need the post-call state should inspect State/IsDecided.
func (i *Instance) ProcessMsg(ctx context.Context, logger *zap.Logger, msg *specqbft.ProcessingMessage) (decided bool, decidedValue []byte, aggregatedCommit *spectypes.SignedSSVMessage, err error) {
if !i.CanProcessMessages() {
return false, nil, nil, spectypes.NewError(spectypes.InstanceStoppedProcessingMessagesErrorCode, "instance stopped processing messages")
if !i.IsRelevant() {
return false, nil, nil, spectypes.NewError(spectypes.InstanceStoppedProcessingMessagesErrorCode, "instance is no longer considered relevant")
}

if err := i.BaseMsgValidation(msg); err != nil {
Expand Down Expand Up @@ -304,12 +299,16 @@ func (i *Instance) Decode(data []byte) error {
return json.Unmarshal(data, &i)
}

// bumpToRound sets round and sends current round metrics.
// bumpToRound pushes this instance to a higher round, also scheduling a timeout for it.
func (i *Instance) bumpToRound(round specqbft.Round) {
i.State.Round = round
if round > i.State.Round {
i.State.ProposalAcceptedForCurrentRound = nil
i.State.Round = round
i.roundTimer.TimeoutForRound(round)
}
}

// CanProcessMessages will return true if instance can process messages
func (i *Instance) CanProcessMessages() bool {
// IsRelevant will return true if instance can process messages
func (i *Instance) IsRelevant() bool {
return !i.markedIrrelevant && i.State.Round < i.config.GetCutOffRound()
}
1 change: 0 additions & 1 deletion protocol/v2/qbft/instance/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ type roundChangeReason string
const (
reasonTimeout roundChangeReason = "timeout"
reasonPartialQuorum roundChangeReason = "partial-quorum"
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about keeping reasonJustified? Once the leader-path round-bump issue (open thread on round_change.go) is addressed and a real round change happens in the justified path, there will be no labeled metric for it. Operators currently can distinguish 'round changed because of justified RC quorum' from 'timeout' / 'partial-quorum' — losing that makes it harder to diagnose why a cluster keeps round-changing.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair concern. After digging in, I found that this metric was effectively double-counting in the pre-PR code: by the time the leader-on-justified-quorum branch fired, State.Round had already been advanced (and recorded as a round change) via reasonTimeout or reasonPartialQuorum. The reasoning:

  1. RoundChangeContainer is populated only inside uponRoundChange:34 — single call site (grep-verified).
  2. Inside uponRoundChange, partial-quorum is checked after AddFirstMsgForSignerAndRound on every call.
  3. Partial-quorum threshold (f+1) is strictly less than full quorum (2f+1) by definition.
  4. processMsgF.Run() serializes processing.

So as RCs stream in, the (f+1)-th always triggers uponChangeRoundPartialQuorum → bumps State.Round — before the (2f+1)-th arrives and the leader branch fires. By the time the leader branch executes, bumpToRound(newRound) is always a no-op (State.Round == newRound already). No real round change happens in this branch.

Soft confirmation: if this future-round leader case were a real scenario in normal flow, ssv-spec tests would have a scenario for it — they don't. Leaving the metric removed.

(Julien's bumpToRound fix in 7ddb771 is still worth keeping for defense-in-depth against future refactors, but the metric would just be a perpetual zero.)

reasonJustified roundChangeReason = "justified"
)

var (
Expand Down
6 changes: 2 additions & 4 deletions protocol/v2/qbft/instance/prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"go.uber.org/zap"

"github.com/ssvlabs/ssv/observability/log/fields"
"github.com/ssvlabs/ssv/protocol/v2/qbft"
ssvtypes "github.com/ssvlabs/ssv/protocol/v2/types"
)

Expand Down Expand Up @@ -70,10 +71,7 @@ func (i *Instance) getRoundChangeJustification() ([]*specqbft.ProcessingMessage,
return nil, nil
}

r, err := specqbft.HashDataRoot(i.State.LastPreparedValue)
if err != nil {
return nil, fmt.Errorf("could not hash input data: %w", err)
}
r := qbft.HashDataRoot(i.State.LastPreparedValue)

prepareMsgs := i.State.PrepareContainer.MessagesForRound(i.State.LastPreparedRound)
ret := make([]*specqbft.ProcessingMessage, 0)
Expand Down
43 changes: 16 additions & 27 deletions protocol/v2/qbft/instance/proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
spectypes "github.com/ssvlabs/ssv-spec/types"
"go.uber.org/zap"

"github.com/ssvlabs/ssv/protocol/v2/qbft"
ssvtypes "github.com/ssvlabs/ssv/protocol/v2/types"
)

Expand All @@ -28,24 +29,21 @@ func (i *Instance) uponProposal(ctx context.Context, logger *zap.Logger, msg *sp

logger.Debug("📬 got proposal message")

i.State.ProposalAcceptedForCurrentRound = msg

currentRound := i.State.Round
msgRound := msg.QBFTMessage.Round

// A future justified proposal should bump us into future round and reset the round timer.
if msgRound > i.State.Round {
i.roundTimer.TimeoutForRound(msgRound)
}
i.bumpToRound(msgRound)

i.metrics.EndStage(ctx, msgRound)
i.metrics.EndStage(ctx, currentRound)
i.metrics.StartStage(stagePrepare)

r, err := specqbft.HashDataRoot(msg.SignedMessage.FullData)
if err != nil {
return fmt.Errorf("could not hash input data: %w", err)
}
// A future justified proposal should move us into the future round, hence we try to bump the round here.
// Always move on to the message-round. The round-change message broadcast is best-effort: the QBFT
// cluster as a whole can progress further even if our round-change message cannot be created/broadcast
// for whatever reason.
i.bumpToRound(msgRound)

i.State.ProposalAcceptedForCurrentRound = msg

r := qbft.HashDataRoot(msg.SignedMessage.FullData)
prepare, err := i.CreatePrepare(msgRound, r)
if err != nil {
return fmt.Errorf("could not create prepare msg: %w", err)
Expand Down Expand Up @@ -83,10 +81,7 @@ func (i *Instance) isValidProposal(msg *specqbft.ProcessingMessage) error {
}

// verify full data integrity
r, err := specqbft.HashDataRoot(msg.SignedMessage.FullData)
if err != nil {
return fmt.Errorf("could not hash input data: %w", err)
}
r := qbft.HashDataRoot(msg.SignedMessage.FullData)
if !bytes.Equal(msg.QBFTMessage.Root[:], r[:]) {
return spectypes.NewError(spectypes.RootHashInvalidErrorCode, "H(data) != root")
}
Expand Down Expand Up @@ -189,10 +184,7 @@ func (i *Instance) isProposalJustification(
}

// proposed fullData must equal highest prepared fullData
r, err := specqbft.HashDataRoot(fullData)
if err != nil {
return fmt.Errorf("could not hash input data: %w", err)
}
r := qbft.HashDataRoot(fullData)
if !bytes.Equal(r[:], rcMsg.QBFTMessage.Root[:]) {
return errors.New("proposed data doesn't match highest prepared")
}
Expand Down Expand Up @@ -233,16 +225,13 @@ func (i *Instance) ProposerForRound(round specqbft.Round) spectypes.OperatorID {
extractSignedPrepares(prepares));
*/
func (i *Instance) CreateProposal(fullData []byte, roundChanges, prepares []*specqbft.ProcessingMessage) (*spectypes.SignedSSVMessage, error) {
r, err := specqbft.HashDataRoot(fullData)
if err != nil {
return nil, fmt.Errorf("could not hash input data: %w", err)
}
r := qbft.HashDataRoot(fullData)

roundChangeSignedMessages := make([]*spectypes.SignedSSVMessage, 0)
roundChangeSignedMessages := make([]*spectypes.SignedSSVMessage, 0, len(roundChanges))
for _, msg := range roundChanges {
roundChangeSignedMessages = append(roundChangeSignedMessages, msg.SignedMessage)
}
prepareSignedMessages := make([]*spectypes.SignedSSVMessage, 0)
prepareSignedMessages := make([]*spectypes.SignedSSVMessage, 0, len(prepares))
for _, msg := range prepares {
prepareSignedMessages = append(prepareSignedMessages, msg.SignedMessage)
}
Expand Down
76 changes: 38 additions & 38 deletions protocol/v2/qbft/instance/round_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import (
spectypes "github.com/ssvlabs/ssv-spec/types"
"go.uber.org/zap"

"github.com/ssvlabs/ssv/observability"
"github.com/ssvlabs/ssv/observability/log/fields"
"github.com/ssvlabs/ssv/protocol/v2/qbft"
ssvtypes "github.com/ssvlabs/ssv/protocol/v2/types"
)

Expand All @@ -22,9 +24,8 @@ func (i *Instance) uponRoundChange(
logger *zap.Logger,
msg *specqbft.ProcessingMessage,
) error {
prevRound := i.State.Round
logger = logger.With(
zap.Uint64("qbft_instance_round", uint64(prevRound)),
zap.Uint64("qbft_instance_round", uint64(i.State.Round)),
zap.Uint64("qbft_instance_height", uint64(i.State.Height)),
)

Expand Down Expand Up @@ -53,6 +54,19 @@ func (i *Instance) uponRoundChange(
}

if justifiedRoundChangeMsg != nil {
i.metrics.EndStage(ctx, i.State.Round)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we consider advancing the stage only after the broadcast actually succeeds? EndStage / StartStage now run before CreateProposal / Broadcast here (and similarly at lines 110-112 for the partial-quorum path). I fear that if CreateProposal or MarshalJustifications returns an error, the metric will report stageProposal while nothing was emitted, which can make dashboards misleading.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keeping consistent with the "best-effort broadcast" framing this PR uses throughout: the protocol stage advances when we move on (we've already built the new message and the state machine has progressed); broadcast failures are tracked independently. Gating stage advancement on a successful broadcast would create a different inconsistency — on broadcast failure the instance would still be in the new round but the dashboard would show the old stage.

i.metrics.StartStage(stageProposal)

// hasReceivedProposalJustificationForLeadingRound accepts future-round quorums, but CreateProposal
// uses i.State.Round as the proposal round and MessagesForRound(i.State.Round) selects the
// round-change justifications - so bump to the justified round here before building the proposal.
//
// In practice this branch never observes a future-round quorum: on natural message flow, the
// partial-quorum branch (f+1 < 2f+1) fires on an earlier uponRoundChange call and advances
// State.Round before this branch can see a full quorum, so bumpToRound here is always a no-op.
// Kept as defense-in-depth in case the message-processing flow changes.
i.bumpToRound(justifiedRoundChangeMsg.QBFTMessage.Round)

roundChangeJustificationSignedMessages, _ := justifiedRoundChangeMsg.QBFTMessage.GetRoundChangeJustifications() // no need to check error, check on isValidRoundChange

roundChangeJustification := make([]*specqbft.ProcessingMessage, 0)
Expand All @@ -66,68 +80,60 @@ func (i *Instance) uponRoundChange(

proposal, err := i.CreateProposal(
valueToPropose,
i.State.RoundChangeContainer.MessagesForRound(prevRound), // TODO - might be optimized to include only necessary quorum
i.State.RoundChangeContainer.MessagesForRound(i.State.Round), // TODO - might be optimized to include only necessary quorum
Comment thread
julienh-ssv marked this conversation as resolved.
roundChangeJustification,
)
if err != nil {
return fmt.Errorf("failed to create proposal: %w", err)
}

valueToProposeRoot, err := specqbft.HashDataRoot(valueToPropose)
if err != nil {
return fmt.Errorf("failed to hash value-to-propose: %w", err)
}
valueToProposeRoot := qbft.HashDataRoot(valueToPropose)
logger = logger.With(zap.String("qbft_value_to_propose_root", hex.EncodeToString(valueToProposeRoot[:])))

i.metrics.EndStage(ctx, prevRound)
i.metrics.StartStage(stageProposal)

i.metrics.RecordRoundChange(ctx, prevRound, reasonJustified)

logger.Debug("🔄 got justified round change, leader broadcasting proposal message",
zap.Any("round_change_signers", allSigners(i.State.RoundChangeContainer.MessagesForRound(prevRound))),
zap.Any("round_change_signers", allSigners(i.State.RoundChangeContainer.MessagesForRound(i.State.Round))),
)

if err := i.Broadcast(proposal); err != nil {
return fmt.Errorf("failed to broadcast proposal message: %w", err)
}
} else if partialQuorum, rcs := i.hasReceivedPartialQuorum(); partialQuorum {
newRound := minRound(rcs)
if newRound <= prevRound {
if newRound <= i.State.Round {
// No need to advance round, we've already changed it.
return nil
}

i.metrics.EndStage(ctx, prevRound)
i.metrics.StartStage(stageRoundChange)

i.metrics.RecordRoundChange(ctx, prevRound, reasonPartialQuorum)

err := i.uponChangeRoundPartialQuorum(logger, newRound)
if err != nil {
if err := i.uponChangeRoundPartialQuorum(ctx, logger, newRound); err != nil {
return err
}
}
return nil
}

func (i *Instance) uponChangeRoundPartialQuorum(logger *zap.Logger, newRound specqbft.Round) error {
func (i *Instance) uponChangeRoundPartialQuorum(ctx context.Context, logger *zap.Logger, newRound specqbft.Round) error {
ctx, span := tracer.Start(ctx, observability.InstrumentName(observabilityNamespace, "qbft.instance.change_round_partial_quorum"))
defer span.End()

prevRound := i.State.Round

i.metrics.EndStage(ctx, prevRound)
i.metrics.StartStage(stageRoundChange)
i.metrics.RecordRoundChange(ctx, prevRound, reasonPartialQuorum)

// Always move on to the next round. The round-change message broadcast is best-effort: the QBFT
// cluster as a whole can progress further even if our round-change message cannot be created/broadcast
// for whatever reason.
i.bumpToRound(newRound)
i.State.ProposalAcceptedForCurrentRound = nil

i.roundTimer.TimeoutForRound(newRound)
startValueRoot := qbft.HashDataRoot(i.StartValue)
logger = logger.With(zap.String("qbft_start_value_root", hex.EncodeToString(startValueRoot[:])))

roundChange, err := i.CreateRoundChange(newRound)
if err != nil {
return fmt.Errorf("failed to create round change message: %w", err)
}

startValueRoot, err := specqbft.HashDataRoot(i.StartValue)
if err != nil {
return fmt.Errorf("failed to hash instance start value: %w", err)
}
logger = logger.With(zap.String("qbft_start_value_root", hex.EncodeToString(startValueRoot[:])))

logger.Debug("📢 broadcasting round change message (got partial quorum)",
zap.Uint64("qbft_new_round", uint64(newRound)),
zap.Any("round_change_signers", roundChange.OperatorIDs),
Expand Down Expand Up @@ -282,10 +288,7 @@ func (i *Instance) validRoundChangeForDataIgnoreSignature(
// Addition to formal spec
// We add these extra checks on the msg itself to filter out round change msgs with invalid justifications before they are inserted into msg containers
if msg.QBFTMessage.RoundChangePrepared() {
r, err := specqbft.HashDataRoot(fullData)
if err != nil {
return fmt.Errorf("could not hash input data: %w", err)
}
r := qbft.HashDataRoot(fullData)

// validate prepare message justifications
prepareSignedMsgs, _ := msg.QBFTMessage.GetRoundChangeJustifications() // no need to check error, checked on msg.QBFTMessage.Validate()
Expand Down Expand Up @@ -384,10 +387,7 @@ func (i *Instance) getRoundChangeData() (specqbft.Round, [32]byte, []byte, []*sp
return specqbft.NoRound, [32]byte{}, nil, nil, fmt.Errorf("could not get round change justification: %w", err)
}

r, err := specqbft.HashDataRoot(i.State.LastPreparedValue)
if err != nil {
return specqbft.NoRound, [32]byte{}, nil, nil, fmt.Errorf("could not hash input data: %w", err)
}
r := qbft.HashDataRoot(i.State.LastPreparedValue)

return i.State.LastPreparedRound, r, i.State.LastPreparedValue, justifications, nil
}
Expand Down
Loading