Skip to content

Commit b62fc5a

Browse files
committed
Gracefully handle payload not available error
1 parent 0d17383 commit b62fc5a

17 files changed

Lines changed: 203 additions & 129 deletions

File tree

core/capabilities/remote/executable/request/client_request.go

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -35,19 +35,20 @@ type clientResponse struct {
3535
}
3636

3737
type ClientRequest struct {
38-
id string
39-
cancelFn context.CancelFunc
40-
responseCh chan clientResponse
41-
createdAt time.Time
42-
responseIDCount map[[32]byte]int
43-
meteringResponses map[[32]byte][]commoncap.MeteringNodeDetail
44-
errorCount map[string]int
45-
totalErrorCount int
46-
responseReceived map[p2ptypes.PeerID]bool
47-
lggr logger.Logger
48-
ocr3Configs map[string]ocrtypes.ContractConfig
49-
workflowExecutionID string
50-
referenceID string
38+
id string
39+
cancelFn context.CancelFunc
40+
responseCh chan clientResponse
41+
createdAt time.Time
42+
responseIDCount map[[32]byte]int
43+
meteringResponses map[[32]byte][]commoncap.MeteringNodeDetail
44+
errorCount map[string]int
45+
totalErrorCount int
46+
payloadNotAvailableCount int
47+
responseReceived map[p2ptypes.PeerID]bool
48+
lggr logger.Logger
49+
ocr3Configs map[string]ocrtypes.ContractConfig
50+
workflowExecutionID string
51+
referenceID string
5152

5253
requiredIdenticalResponses int
5354
remoteNodeCount int
@@ -386,6 +387,16 @@ func (c *ClientRequest) OnMessage(_ context.Context, msg *types.MessageBody) err
386387
c.lggr.Warn("received multiple different errors for the same request, number of different errors received: %d", len(c.errorCount))
387388
}
388389

390+
if commoncap.ErrResponsePayloadNotAvailable.Is(errors.New(msg.ErrorMsg)) {
391+
c.payloadNotAvailableCount++
392+
if c.payloadNotAvailableCount == c.remoteNodeCount-c.requiredIdenticalResponses+1 {
393+
// return an error to indicate unexpected state, but do not send an error as we might still receive a response with valid attestation.
394+
return fmt.Errorf("unexpected state: received %d payload not available responses, while max allowed is %d. This means a bug in the code, please investigate",
395+
c.payloadNotAvailableCount, c.remoteNodeCount-c.requiredIdenticalResponses)
396+
}
397+
return nil
398+
}
399+
389400
if c.errorCount[msg.ErrorMsg] == c.requiredIdenticalResponses {
390401
c.sendResponse(clientResponse{Err: fmt.Errorf("%s : %s", msg.Error, msg.ErrorMsg)})
391402
} else if c.totalErrorCount == c.remoteNodeCount-c.requiredIdenticalResponses+1 {
@@ -421,7 +432,7 @@ func (c *ClientRequest) verifyAttestation(resp commoncap.CapabilityResponse, met
421432
}
422433

423434
reportData := commoncap.ResponseToReportData(c.workflowExecutionID, c.referenceID, resp.Payload.Value, metering.SpendUnit, metering.SpendValue)
424-
sigData := ocr2key.ReportToSigData3(attestation.ConfigDigest, attestation.SequenceNumber, reportData)
435+
sigData := ocr2key.ReportToSigData3(attestation.ConfigDigest, attestation.SequenceNumber, reportData[:])
425436
signed := make([]bool, len(cfg.Signers))
426437
for _, sig := range attestation.Sigs {
427438
if int(sig.Signer) > len(cfg.Signers) {

core/capabilities/remote/executable/request/client_request_internal_test.go

Lines changed: 10 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,9 @@ func Test_ClientRequest_VerifyAttestation(t *testing.T) {
3838

3939
reportData := commoncap.ResponseToReportData(workflowExecutionID, referenceID, valueBytes, spendUnit, spendValue)
4040

41-
sig1, err := kb1.Sign3(configDigest, seqNr, reportData)
41+
sig1, err := kb1.Sign3(configDigest, seqNr, reportData[:])
4242
require.NoError(t, err)
43-
sig2, err := kb2.Sign3(configDigest, seqNr, reportData)
43+
sig2, err := kb2.Sign3(configDigest, seqNr, reportData[:])
4444
require.NoError(t, err)
4545

4646
ocr3Configs := map[string]ocrtypes.ContractConfig{
@@ -77,7 +77,7 @@ func Test_ClientRequest_VerifyAttestation(t *testing.T) {
7777

7878
t.Run("nil ocr3Configs returns error", func(t *testing.T) {
7979
cNil := &ClientRequest{workflowExecutionID: workflowExecutionID, referenceID: referenceID, lggr: logger.Test(t), ocr3Configs: nil}
80-
err := cNil.verifyAttestation(validResp)
80+
err := cNil.verifyAttestation(validResp, validResp.Metadata.Metering[0])
8181
require.Error(t, err)
8282
require.Contains(t, err.Error(), "OCR3 configs not provided")
8383
})
@@ -89,7 +89,7 @@ func Test_ClientRequest_VerifyAttestation(t *testing.T) {
8989
lggr: logger.Test(t),
9090
ocr3Configs: map[string]ocrtypes.ContractConfig{},
9191
}
92-
err := cBad.verifyAttestation(validResp)
92+
err := cBad.verifyAttestation(validResp, validResp.Metadata.Metering[0])
9393
require.Error(t, err)
9494
require.Contains(t, err.Error(), "not found")
9595
})
@@ -106,34 +106,11 @@ func Test_ClientRequest_VerifyAttestation(t *testing.T) {
106106
},
107107
Payload: &anypb.Any{TypeUrl: "type.googleapis.com/values.v1.Map", Value: valueBytes},
108108
}
109-
err := c.verifyAttestation(respFewSigs)
109+
err := c.verifyAttestation(respFewSigs, validResp.Metadata.Metering[0])
110110
require.Error(t, err)
111111
require.Contains(t, err.Error(), "not enough signatures")
112112
})
113113

114-
t.Run("unexpected number of metering records returns error", func(t *testing.T) {
115-
respBadMetering := commoncap.CapabilityResponse{
116-
Metadata: commoncap.ResponseMetadata{
117-
Metering: []commoncap.MeteringNodeDetail{
118-
{SpendUnit: spendUnit, SpendValue: spendValue},
119-
{SpendUnit: "other", SpendValue: "99"},
120-
},
121-
OCRAttestation: &commoncap.ResponseOCRAttestation{
122-
ConfigDigest: configDigest,
123-
SequenceNumber: seqNr,
124-
Sigs: []commoncap.AttributedSignature{
125-
{Signer: 0, Signature: sig1},
126-
{Signer: 1, Signature: sig2},
127-
},
128-
},
129-
},
130-
Payload: &anypb.Any{TypeUrl: "type.googleapis.com/values.v1.Map", Value: valueBytes},
131-
}
132-
err := c.verifyAttestation(respBadMetering)
133-
require.Error(t, err)
134-
require.Contains(t, err.Error(), "unexpected number of metering records")
135-
})
136-
137114
t.Run("invalid signer index returns error", func(t *testing.T) {
138115
respBadSigner := commoncap.CapabilityResponse{
139116
Metadata: commoncap.ResponseMetadata{
@@ -149,7 +126,7 @@ func Test_ClientRequest_VerifyAttestation(t *testing.T) {
149126
},
150127
Payload: &anypb.Any{TypeUrl: "type.googleapis.com/values.v1.Map", Value: valueBytes},
151128
}
152-
err := c.verifyAttestation(respBadSigner)
129+
err := c.verifyAttestation(respBadSigner, validResp.Metadata.Metering[0])
153130
require.Error(t, err)
154131
require.Contains(t, err.Error(), "invalid signer index")
155132
})
@@ -169,7 +146,7 @@ func Test_ClientRequest_VerifyAttestation(t *testing.T) {
169146
},
170147
Payload: &anypb.Any{TypeUrl: "type.googleapis.com/values.v1.Map", Value: valueBytes},
171148
}
172-
err := c.verifyAttestation(respDupSig)
149+
err := c.verifyAttestation(respDupSig, validResp.Metadata.Metering[0])
173150
require.Error(t, err)
174151
require.Contains(t, err.Error(), "duplicate signature")
175152
})
@@ -192,7 +169,7 @@ func Test_ClientRequest_VerifyAttestation(t *testing.T) {
192169
},
193170
Payload: &anypb.Any{TypeUrl: "type.googleapis.com/values.v1.Map", Value: valueBytes},
194171
}
195-
err = c.verifyAttestation(respBadSig)
172+
err = c.verifyAttestation(respBadSig, validResp.Metadata.Metering[0])
196173
require.Error(t, err)
197174
require.Contains(t, err.Error(), "invalid signature")
198175
})
@@ -213,13 +190,13 @@ func Test_ClientRequest_VerifyAttestation(t *testing.T) {
213190
},
214191
Payload: &anypb.Any{TypeUrl: "x", Value: wrongBytes},
215192
}
216-
err := c.verifyAttestation(respWrongPayload)
193+
err := c.verifyAttestation(respWrongPayload, validResp.Metadata.Metering[0])
217194
require.Error(t, err)
218195
require.Contains(t, err.Error(), "invalid signature")
219196
})
220197

221198
t.Run("valid attestation succeeds", func(t *testing.T) {
222-
err := c.verifyAttestation(validResp)
199+
err := c.verifyAttestation(validResp, validResp.Metadata.Metering[0])
223200
require.NoError(t, err)
224201
})
225202
}

core/capabilities/remote/executable/request/client_request_test.go

Lines changed: 129 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -336,29 +336,15 @@ func Test_ClientRequest_MessageValidation(t *testing.T) {
336336
assert.Equal(t, resp, values.NewString("response1"))
337337
})
338338
t.Run("Execute Request With Valid Attestation", func(t *testing.T) {
339-
ctx := t.Context()
340-
capabilityPeers, capDonInfo, capInfo := capabilityDon(t, 4, 1)
339+
const F = 1
340+
const N = 3*F + 1
341+
capabilityPeers, capDonInfo, capInfo := capabilityDon(t, N, F)
341342

342343
configDigest := ocrtypes.ConfigDigest{1, 2, 3, 4, 5}
343344
kb1, err := ocr2key.New(corekeys.EVM)
344345
require.NoError(t, err)
345346
kb2, err := ocr2key.New(corekeys.EVM)
346347
require.NoError(t, err)
347-
dispatcher := &clientRequestTestDispatcher{msgs: make(chan *types.MessageBody, 100)}
348-
req, err := request.NewClientExecuteRequest(ctx, logger.Test(t), capabilityRequest, capInfo,
349-
workflowDonInfo, dispatcher, 10*time.Minute, nil, "", map[string]ocrtypes.ContractConfig{
350-
pb.OCR3ConfigDefaultKey: {
351-
ConfigDigest: configDigest,
352-
Signers: []ocrtypes.OnchainPublicKey{kb1.PublicKey(), kb2.PublicKey()},
353-
F: 1,
354-
},
355-
})
356-
require.NoError(t, err)
357-
defer req.Cancel(errors.New("test end"))
358-
359-
<-dispatcher.msgs
360-
<-dispatcher.msgs
361-
assert.Empty(t, dispatcher.msgs)
362348

363349
seqNr := uint64(100)
364350

@@ -372,9 +358,9 @@ func Test_ClientRequest_MessageValidation(t *testing.T) {
372358
spendUnit, spendValue := "testunit", "42"
373359
reportData := commoncap.ResponseToReportData(capabilityRequest.Metadata.WorkflowExecutionID, capabilityRequest.Metadata.ReferenceID, payloadAsAny.Value, spendUnit, spendValue)
374360

375-
sig1, err := kb1.Sign3(configDigest, seqNr, reportData)
361+
sig1, err := kb1.Sign3(configDigest, seqNr, reportData[:])
376362
require.NoError(t, err)
377-
sig2, err := kb2.Sign3(configDigest, seqNr, reportData)
363+
sig2, err := kb2.Sign3(configDigest, seqNr, reportData[:])
378364
require.NoError(t, err)
379365

380366
rawResponseWithAttestation, err := pb.MarshalCapabilityResponse(commoncap.CapabilityResponse{
@@ -395,31 +381,135 @@ func Test_ClientRequest_MessageValidation(t *testing.T) {
395381
})
396382
require.NoError(t, err)
397383

398-
msg := &types.MessageBody{
399-
CapabilityId: capInfo.ID,
400-
CapabilityDonId: capDonInfo.ID,
401-
CallerDonId: workflowDonInfo.ID,
402-
Method: types.MethodExecute,
403-
Payload: rawResponseWithAttestation,
404-
MessageId: []byte("messageID"),
384+
ocrConfigs := map[string]ocrtypes.ContractConfig{
385+
pb.OCR3ConfigDefaultKey: {
386+
ConfigDigest: configDigest,
387+
Signers: []ocrtypes.OnchainPublicKey{kb1.PublicKey(), kb2.PublicKey()},
388+
F: F,
389+
},
405390
}
406-
msg.Sender = capabilityPeers[0][:]
407-
err = req.OnMessage(ctx, msg)
408-
require.NoError(t, err)
409391

410-
response := <-req.ResponseChan()
411-
capResponse, err := pb.UnmarshalCapabilityResponse(response.Result)
412-
require.NoError(t, err)
392+
assertValidResponse := func(t *testing.T, result []byte) {
393+
capResponse, err := pb.UnmarshalCapabilityResponse(result)
394+
require.NoError(t, err)
413395

414-
var pbValue pbvalues.Value
415-
require.NoError(t, capResponse.Payload.UnmarshalTo(&pbValue))
416-
receivedValue, err := values.FromProto(&pbValue)
417-
require.NoError(t, err)
396+
var pbValue pbvalues.Value
397+
require.NoError(t, capResponse.Payload.UnmarshalTo(&pbValue))
398+
receivedValue, err := values.FromProto(&pbValue)
399+
require.NoError(t, err)
400+
401+
var receivedMap map[string]int
402+
require.NoError(t, receivedValue.UnwrapTo(&receivedMap))
403+
404+
assert.Equal(t, 42, receivedMap["number"])
405+
}
418406

419-
var receivedMap map[string]int
420-
require.NoError(t, receivedValue.UnwrapTo(&receivedMap))
407+
t.Run("succeeds on first peer with valid attestation", func(t *testing.T) {
408+
ctx := t.Context()
421409

422-
assert.Equal(t, 42, receivedMap["number"])
410+
dispatcher := &clientRequestTestDispatcher{msgs: make(chan *types.MessageBody, 100)}
411+
req, err := request.NewClientExecuteRequest(ctx, logger.Test(t), capabilityRequest, capInfo,
412+
workflowDonInfo, dispatcher, 10*time.Minute, nil, "", ocrConfigs)
413+
require.NoError(t, err)
414+
defer req.Cancel(errors.New("test end"))
415+
416+
for range N {
417+
<-dispatcher.msgs
418+
}
419+
420+
assert.Empty(t, dispatcher.msgs)
421+
422+
msg := &types.MessageBody{
423+
CapabilityId: capInfo.ID,
424+
CapabilityDonId: capDonInfo.ID,
425+
CallerDonId: workflowDonInfo.ID,
426+
Method: types.MethodExecute,
427+
Payload: rawResponseWithAttestation,
428+
MessageId: []byte("messageID"),
429+
}
430+
msg.Sender = capabilityPeers[0][:]
431+
err = req.OnMessage(ctx, msg)
432+
require.NoError(t, err)
433+
434+
response := <-req.ResponseChan()
435+
assertValidResponse(t, response.Result)
436+
})
437+
438+
t.Run("2F peers return ErrResponsePayloadNotAvailable then success", func(t *testing.T) {
439+
ctx := t.Context()
440+
dispatcher := &clientRequestTestDispatcher{msgs: make(chan *types.MessageBody, 100)}
441+
req, err := request.NewClientExecuteRequest(ctx, logger.Test(t), capabilityRequest, capInfo,
442+
workflowDonInfo, dispatcher, 10*time.Minute, nil, "", ocrConfigs)
443+
require.NoError(t, err)
444+
defer req.Cancel(errors.New("test end"))
445+
446+
for range N {
447+
<-dispatcher.msgs
448+
}
449+
450+
assert.Empty(t, dispatcher.msgs)
451+
452+
for i := range 2 * F {
453+
msgNA := &types.MessageBody{
454+
CapabilityId: capInfo.ID,
455+
CapabilityDonId: capDonInfo.ID,
456+
CallerDonId: workflowDonInfo.ID,
457+
Method: types.MethodExecute,
458+
MessageId: []byte("messageID"),
459+
Error: types.Error_INTERNAL_ERROR,
460+
ErrorMsg: commoncap.ErrResponsePayloadNotAvailable.Error(),
461+
}
462+
msgNA.Sender = capabilityPeers[i][:]
463+
require.NoError(t, req.OnMessage(ctx, msgNA))
464+
}
465+
466+
msgOK := &types.MessageBody{
467+
CapabilityId: capInfo.ID,
468+
CapabilityDonId: capDonInfo.ID,
469+
CallerDonId: workflowDonInfo.ID,
470+
Method: types.MethodExecute,
471+
Payload: rawResponseWithAttestation,
472+
MessageId: []byte("messageID"),
473+
}
474+
msgOK.Sender = capabilityPeers[2*F][:]
475+
require.NoError(t, req.OnMessage(ctx, msgOK))
476+
477+
response := <-req.ResponseChan()
478+
assertValidResponse(t, response.Result)
479+
})
480+
481+
t.Run("2F+1 peers return ErrResponsePayloadNotAvailable", func(t *testing.T) {
482+
ctx := t.Context()
483+
dispatcher := &clientRequestTestDispatcher{msgs: make(chan *types.MessageBody, 100)}
484+
req, err := request.NewClientExecuteRequest(ctx, logger.Test(t), capabilityRequest, capInfo,
485+
workflowDonInfo, dispatcher, 10*time.Minute, nil, "", ocrConfigs)
486+
require.NoError(t, err)
487+
defer req.Cancel(errors.New("test end"))
488+
489+
for range N {
490+
<-dispatcher.msgs
491+
}
492+
493+
assert.Empty(t, dispatcher.msgs)
494+
495+
noPayloadMsg := types.MessageBody{
496+
CapabilityId: capInfo.ID,
497+
CapabilityDonId: capDonInfo.ID,
498+
CallerDonId: workflowDonInfo.ID,
499+
Method: types.MethodExecute,
500+
MessageId: []byte("messageID"),
501+
Error: types.Error_INTERNAL_ERROR,
502+
ErrorMsg: commoncap.ErrResponsePayloadNotAvailable.Error(),
503+
}
504+
505+
for i := range 2 * F {
506+
noPayloadMsg.Sender = capabilityPeers[i][:]
507+
require.NoError(t, req.OnMessage(ctx, &noPayloadMsg))
508+
}
509+
510+
noPayloadMsg.Sender = capabilityPeers[2*F][:]
511+
require.Error(t, req.OnMessage(ctx, &noPayloadMsg))
512+
})
423513
})
424514

425515
t.Run("Executes full schedule", func(t *testing.T) {

core/scripts/go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ require (
4646
github.com/shopspring/decimal v1.4.0
4747
github.com/smartcontractkit/chainlink-automation v0.8.1
4848
github.com/smartcontractkit/chainlink-ccip v0.1.1-solana.0.20260317185256-d5f7db87ae70
49-
github.com/smartcontractkit/chainlink-common v0.10.1-0.20260319170028-82a189c9c1bc
50-
github.com/smartcontractkit/chainlink-common/keystore v1.0.3-0.20260319170028-82a189c9c1bc
49+
github.com/smartcontractkit/chainlink-common v0.10.1-0.20260325173120-bbf92524d041
50+
github.com/smartcontractkit/chainlink-common/keystore v1.0.3-0.20260319174204-76aac329aab9
5151
github.com/smartcontractkit/chainlink-data-streams v0.1.12
5252
github.com/smartcontractkit/chainlink-deployments-framework v0.86.3
5353
github.com/smartcontractkit/chainlink-evm v0.3.4-0.20260318010722-59d4165024f1

core/scripts/go.sum

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)