diff --git a/core/services/ocr2/plugins/llo/helpers_test.go b/core/services/ocr2/plugins/llo/helpers_test.go index 2f471dfdeaf..788a13b592b 100644 --- a/core/services/ocr2/plugins/llo/helpers_test.go +++ b/core/services/ocr2/plugins/llo/helpers_test.go @@ -12,6 +12,7 @@ import ( "net/http" "net/http/httptest" "net/url" + "sync/atomic" "testing" "time" @@ -419,6 +420,32 @@ func createSingleDecimalBridge(t *testing.T, name string, i int, p decimal.Decim return bridgeName } +// createSingleDecimalCountingBridge is like createSingleDecimalBridge but increments callCount on each bridge request. +func createSingleDecimalCountingBridge(t *testing.T, name string, i int, p decimal.Decimal, borm bridges.ORM, callCount *atomic.Uint64) (bridgeName string) { + ctx := testutils.Context(t) + bridge := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { + callCount.Add(1) + b, err := io.ReadAll(req.Body) + require.NoError(t, err) + require.JSONEq(t, `{"data":{"data":"foo"}}`, string(b)) + + res.WriteHeader(http.StatusOK) + val := p.String() + resp := fmt.Sprintf(`{"result": %s}`, val) + _, err = res.Write([]byte(resp)) + require.NoError(t, err) + })) + t.Cleanup(bridge.Close) + u, _ := url.Parse(bridge.URL) + bridgeName = fmt.Sprintf("bridge-%s-%d", name, i) + require.NoError(t, borm.CreateBridgeType(ctx, &bridges.BridgeType{ + Name: bridges.BridgeName(bridgeName), + URL: models.WebURL(*u), + })) + + return bridgeName +} + func createBridge(t *testing.T, bridgeName string, responseJSON string, borm bridges.ORM) { ctx := testutils.Context(t) bridge := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { diff --git a/core/services/ocr2/plugins/llo/integration_test.go b/core/services/ocr2/plugins/llo/integration_test.go index 3007d77b2ff..08ac67f4a60 100644 --- a/core/services/ocr2/plugins/llo/integration_test.go +++ b/core/services/ocr2/plugins/llo/integration_test.go @@ -11,6 +11,7 @@ import ( "net/http/httptest" "sort" "strings" + "sync/atomic" "testing" "time" @@ -2692,6 +2693,189 @@ channelDefinitionsContractFromBlock = %d`, serverURL, serverPubKey, donID, confi }) } +// TestIntegration_LLO_tombstone_stops_observations_and_reports checks that once a channel is +// tombstoned, the DON stops observing its streams (no bridge traffic for those stream jobs) +// and no longer transmits reports for that channel. +func TestIntegration_LLO_tombstone_stops_observations_and_reports(t *testing.T) { + t.Parallel() + + const ( + salt = 500 + donID = uint32(777666) + streamIDActive = uint32(190) + streamIDTombstone = uint32(191) + ) + + offchainConfig := datastreamsllo.OffchainConfig{ + ProtocolVersion: 1, + DefaultMinReportIntervalNanoseconds: uint64(1 * time.Second), + EnableObservationCompression: true, + } + + clientCSAKeys := make([]csakey.KeyV2, nNodes) + clientPubKeys := make([]ed25519.PublicKey, nNodes) + for i := range nNodes { + k := big.NewInt(int64(salt + i)) + key := csakey.MustNewV2XXXTestingOnly(k) + clientCSAKeys[i] = key + clientPubKeys[i] = key.PublicKey + } + + steve, backend, configurator, configuratorAddress, _, _, _, _, configStore, configStoreAddress, _, _, _, _ := setupBlockchain(t) + fromBlock := 1 + + bootstrapCSAKey := csakey.MustNewV2XXXTestingOnly(big.NewInt(salt - 1)) + bootstrapNodePort := freeport.GetOne(t) + appBootstrap, bootstrapPeerID, _, bootstrapKb, _ := setupNode(t, bootstrapNodePort, "bootstrap_llo_tombstone", backend, bootstrapCSAKey, nil) + bootstrapNode := Node{App: appBootstrap, KeyBundle: bootstrapKb} + + packetCh := make(chan *packet, 100000) + serverKey := csakey.MustNewV2XXXTestingOnly(big.NewInt(salt - 2)) + serverPubKey := serverKey.PublicKey + srv := NewMercuryServer(t, serverKey, packetCh) + serverURL := startMercuryServer(t, srv, clientPubKeys) + + oracles, nodes := setupNodes(t, nNodes, backend, clientCSAKeys, func(c *chainlink.Config) { + c.Mercury.Transmitter.Protocol = ptr(config.MercuryTransmitterProtocolGRPC) + }) + + chainID := testutils.SimulatedChainID + relayType := "evm" + relayConfig := fmt.Sprintf(` +chainID = "%s" +fromBlock = %d +lloDonID = %d +lloConfigMode = "bluegreen" +`, chainID, fromBlock, donID) + addBootstrapJob(t, bootstrapNode, configuratorAddress, "job-tombstone", relayType, relayConfig) + + pluginConfig := fmt.Sprintf(`servers = { "%s" = "%x" } +donID = %d +channelDefinitionsContractAddress = "0x%x" +channelDefinitionsContractFromBlock = %d`, serverURL, serverPubKey, donID, configStoreAddress, fromBlock) + + var streamACalls, streamBCalls atomic.Uint64 + priceA := decimal.NewFromFloat(111.1) + priceB := decimal.NewFromFloat(222.2) + for i, node := range nodes { + bridgeA := createSingleDecimalCountingBridge(t, "tomb-active", i, priceA, node.App.BridgeORM(), &streamACalls) + addSingleDecimalStreamJob(t, node, streamIDActive, bridgeA) + bridgeB := createSingleDecimalCountingBridge(t, "tomb-stone", i, priceB, node.App.BridgeORM(), &streamBCalls) + addSingleDecimalStreamJob(t, node, streamIDTombstone, bridgeB) + addLLOJob( + t, + node, + configuratorAddress, + bootstrapPeerID, + bootstrapNodePort, + clientPubKeys[i], + "tombstone-stream-test", + pluginConfig, + relayType, + relayConfig, + ) + } + + channelDefinitions := llotypes.ChannelDefinitions{ + 1: { + ReportFormat: llotypes.ReportFormatJSON, + Streams: []llotypes.Stream{ + {StreamID: streamIDActive, Aggregator: llotypes.AggregatorMedian}, + }, + }, + 2: { + ReportFormat: llotypes.ReportFormatJSON, + Streams: []llotypes.Stream{ + {StreamID: streamIDTombstone, Aggregator: llotypes.AggregatorMedian}, + }, + }, + } + url, sha := newChannelDefinitionsServer(t, channelDefinitions) + _, err := configStore.SetChannelDefinitions(steve, donID, url, sha) + require.NoError(t, err) + backend.Commit() + + setProductionConfig( + t, donID, steve, backend, configurator, configuratorAddress, nodes, + WithOracles(oracles), WithOffchainConfig(offchainConfig), + ) + + seenChannels := make(map[uint32]bool) + require.Eventually(t, func() bool { + pckt, err := receiveWithTimeout(t, packetCh, 2*time.Second) + if err != nil { + return false + } + req := pckt.req + if req.ReportFormat != uint32(llotypes.ReportFormatJSON) { + return len(seenChannels) == 2 + } + _, _, r, _, err := (datastreamsllo.JSONReportCodec{}).UnpackDecode(req.Payload) + if err != nil { + return len(seenChannels) == 2 + } + if r.ChannelID == 1 || r.ChannelID == 2 { + seenChannels[r.ChannelID] = true + } + return len(seenChannels) == 2 + }, reportTimeout, 100*time.Millisecond, "expected reports for channel 1 and 2 before tombstone") + require.Greater(t, streamBCalls.Load(), uint64(0), "stream for channel 2 should be observed before tombstone") + + // Tombstone channel 2 only; channel 1 keeps observing streamIDActive. + tombstonedDefs := llotypes.ChannelDefinitions{ + 1: { + ReportFormat: llotypes.ReportFormatJSON, + Streams: []llotypes.Stream{ + {StreamID: streamIDActive, Aggregator: llotypes.AggregatorMedian}, + }, + }, + 2: { + ReportFormat: llotypes.ReportFormatJSON, + Tombstone: true, + Streams: []llotypes.Stream{ + {StreamID: streamIDTombstone, Aggregator: llotypes.AggregatorMedian}, + }, + }, + } + url2, sha2 := newChannelDefinitionsServer(t, tombstonedDefs) + _, err = configStore.SetChannelDefinitions(steve, donID, url2, sha2) + require.NoError(t, err) + backend.Commit() + + tombstonedChannel := map[uint32]bool{2: true} + checkNoReportsWindow := 5 * time.Second + require.Eventually(t, func() bool { + start := time.Now() + sawTombstoned := false + for time.Since(start) < checkNoReportsWindow { + pckt, err := receiveWithTimeout(t, packetCh, 1*time.Second) + if err != nil { + continue + } + req := pckt.req + if req.ReportFormat != uint32(llotypes.ReportFormatJSON) { + continue + } + _, _, r, _, err := (datastreamsllo.JSONReportCodec{}).UnpackDecode(req.Payload) + if err == nil && tombstonedChannel[r.ChannelID] { + sawTombstoned = true + break + } + } + return !sawTombstoned + }, 45*time.Second, 200*time.Millisecond, "channel 2 should stop producing reports after tombstone") + + // After reports for channel 2 have stopped, bridge traffic for streamIDTombstone should stop + // while streamIDActive continues to be observed. + bCallsAfterReportsStopped := streamBCalls.Load() + aCallsAfterReportsStopped := streamACalls.Load() + time.Sleep(6 * time.Second) + require.Equal(t, bCallsAfterReportsStopped, streamBCalls.Load(), + "tombstoned channel's stream should not be observed (no additional bridge calls)") + require.Greater(t, streamACalls.Load(), aCallsAfterReportsStopped, + "active channel's stream should still be observed") +} + func setupNodes(t *testing.T, nNodes int, backend evmtypes.Backend, clientCSAKeys []csakey.KeyV2, f func(*chainlink.Config)) (oracles []confighelper.OracleIdentityExtra, nodes []Node) { ports := freeport.GetN(t, nNodes) for i := range nNodes { diff --git a/core/services/ocr2/plugins/llo/onchain_channel_definition_cache_integration_test.go b/core/services/ocr2/plugins/llo/onchain_channel_definition_cache_integration_test.go index bad4e9e913e..bc1ec5a0071 100644 --- a/core/services/ocr2/plugins/llo/onchain_channel_definition_cache_integration_test.go +++ b/core/services/ocr2/plugins/llo/onchain_channel_definition_cache_integration_test.go @@ -1325,6 +1325,80 @@ func Test_ChannelDefinitionCache_OwnerAndAdderMerging(t *testing.T) { "tombstoned channel 600 should be dropped, channel 601 should remain") }) + // After the owner omits a tombstoned channel from their on-chain definitions, the in-memory + // cache no longer carries that channel for the owner source. The merged map returned by + // Definitions(prevOutcome) is what the LLO plugin treats as channel definitions for the OCR + // outcome; it must stay free of the dropped channel on later rounds too. + // + // Depends on prior subtests in this test (owner tombstone + omit flow); do not run in + // isolation with go test -run matching only this subtest name. + t.Run("dropped tombstoned channel stays out of merged outcome after cache update", func(t *testing.T) { + observedLogs.TakeAll() + + prevSimulatingOCROutcome := llotypes.ChannelDefinitions{ + 600: { + ReportFormat: llotypes.ReportFormatJSON, + Streams: []llotypes.Stream{ + {StreamID: 1, Aggregator: llotypes.AggregatorMedian}, + }, + Source: channeldefinitions.SourceOwner, + Tombstone: true, + }, + 601: { + ReportFormat: llotypes.ReportFormatJSON, + Streams: []llotypes.Stream{ + {StreamID: 2, Aggregator: llotypes.AggregatorMode}, + }, + Source: channeldefinitions.SourceOwner, + Tombstone: false, + }, + 602: { + ReportFormat: llotypes.ReportFormatJSON, + Streams: []llotypes.Stream{ + {StreamID: 3, Aggregator: llotypes.AggregatorMedian}, + }, + Source: adder1ID, + Tombstone: false, + }, + } + + mergedOutcome := cdc.Definitions(prevSimulatingOCROutcome) + _, has600 := mergedOutcome[600] + require.False(t, has600, "merged outcome should not contain dropped tombstoned channel 600") + _, has601 := mergedOutcome[601] + require.True(t, has601, "merged outcome should still contain channel 601") + _, has602 := mergedOutcome[602] + require.True(t, has602, "merged outcome should still contain adder channel 602") + + // Simulate the next observation round: prev is the prior merged channel definitions. + mergedAgain := cdc.Definitions(mergedOutcome) + _, still600 := mergedAgain[600] + require.False(t, still600, "channel 600 must not reappear in merged outcome on subsequent Definitions(prev) calls") + require.Contains(t, mergedAgain, llotypes.ChannelID(601)) + require.Contains(t, mergedAgain, llotypes.ChannelID(602)) + + require.Eventually(t, func() bool { + loaded, err := orm.LoadChannelDefinitions(testutils.Context(t), configStoreAddress, donID) + if err != nil || loaded == nil { + return false + } + if loaded.Format != channeldefinitions.MultiChannelDefinitionsFormat { + return false + } + var sources map[uint32]llotypes2.SourceDefinition + if err = json.Unmarshal(loaded.Definitions, &sources); err != nil { + return false + } + ownerSrc, ok := sources[channeldefinitions.SourceOwner] + if !ok { + return false + } + _, ownerHas600 := ownerSrc.Definitions[600] + return !ownerHas600 && len(ownerSrc.Definitions) > 0 + }, 5*time.Second, 100*time.Millisecond, + "persisted owner source definitions should not list channel 600 after owner omitted it from the cache") + }) + t.Run("multiple adders can add different channels", func(t *testing.T) { observedLogs.TakeAll()