Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions core/services/ocr2/plugins/llo/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"net/http"
"net/http/httptest"
"net/url"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -419,6 +420,32 @@ func createSingleDecimalBridge(t *testing.T, name string, i int, p decimal.Decim
return bridgeName
}

// createSingleDecimalCountingBridge is like createSingleDecimalBridge but increments
// callCount on every request the bridge receives, letting tests detect whether a
// stream is (or is no longer) being observed by the DON.
//
// The bridge is an httptest server that validates the incoming payload and always
// responds with the fixed decimal p. It is registered in the bridge ORM under a
// name derived from name and i, which is returned for use in stream job specs.
// The server is shut down automatically via t.Cleanup.
func createSingleDecimalCountingBridge(t *testing.T, name string, i int, p decimal.Decimal, borm bridges.ORM, callCount *atomic.Uint64) (bridgeName string) {
	ctx := testutils.Context(t)
	bridge := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
		// Count every observation request, even if the body check below fails.
		callCount.Add(1)
		b, err := io.ReadAll(req.Body)
		require.NoError(t, err)
		require.JSONEq(t, `{"data":{"data":"foo"}}`, string(b))

		res.WriteHeader(http.StatusOK)
		val := p.String()
		resp := fmt.Sprintf(`{"result": %s}`, val)
		_, err = res.Write([]byte(resp))
		require.NoError(t, err)
	}))
	t.Cleanup(bridge.Close)

	// Fail fast on a malformed test-server URL instead of silently discarding
	// the parse error (previously `u, _ := url.Parse(...)`).
	u, err := url.Parse(bridge.URL)
	require.NoError(t, err)

	bridgeName = fmt.Sprintf("bridge-%s-%d", name, i)
	require.NoError(t, borm.CreateBridgeType(ctx, &bridges.BridgeType{
		Name: bridges.BridgeName(bridgeName),
		URL:  models.WebURL(*u),
	}))

	return bridgeName
}

func createBridge(t *testing.T, bridgeName string, responseJSON string, borm bridges.ORM) {
ctx := testutils.Context(t)
bridge := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
Expand Down
184 changes: 184 additions & 0 deletions core/services/ocr2/plugins/llo/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"net/http/httptest"
"sort"
"strings"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -2692,6 +2693,189 @@ channelDefinitionsContractFromBlock = %d`, serverURL, serverPubKey, donID, confi
})
}

// TestIntegration_LLO_tombstone_stops_observations_and_reports checks that once a channel is
// tombstoned, the DON stops observing its streams (no bridge traffic for those stream jobs)
// and no longer transmits reports for that channel.
//
// Structure: phase 1 runs two channels (1 → streamIDActive, 2 → streamIDTombstone) and waits
// for JSON reports from both; phase 2 re-publishes channel definitions with channel 2
// tombstoned and asserts that reports for channel 2 stop, then that bridge traffic for
// streamIDTombstone stops while streamIDActive continues to be observed.
func TestIntegration_LLO_tombstone_stops_observations_and_reports(t *testing.T) {
	t.Parallel()

	const (
		// salt offsets the deterministic key material (CSA keys below are derived
		// from salt+i) — presumably to avoid collisions with other parallel tests
		// in this file; confirm against the other tests' salt values.
		salt              = 500
		donID             = uint32(777666)
		streamIDActive    = uint32(190) // observed for the whole test (channel 1)
		streamIDTombstone = uint32(191) // observed only until channel 2 is tombstoned
	)

	offchainConfig := datastreamsllo.OffchainConfig{
		ProtocolVersion:                     1,
		DefaultMinReportIntervalNanoseconds: uint64(1 * time.Second),
		EnableObservationCompression:        true,
	}

	// Deterministic per-node CSA keys, derived from salt+i.
	clientCSAKeys := make([]csakey.KeyV2, nNodes)
	clientPubKeys := make([]ed25519.PublicKey, nNodes)
	for i := range nNodes {
		k := big.NewInt(int64(salt + i))
		key := csakey.MustNewV2XXXTestingOnly(k)
		clientCSAKeys[i] = key
		clientPubKeys[i] = key.PublicKey
	}

	steve, backend, configurator, configuratorAddress, _, _, _, _, configStore, configStoreAddress, _, _, _, _ := setupBlockchain(t)
	fromBlock := 1

	// Bootstrap node the oracle nodes peer with.
	bootstrapCSAKey := csakey.MustNewV2XXXTestingOnly(big.NewInt(salt - 1))
	bootstrapNodePort := freeport.GetOne(t)
	appBootstrap, bootstrapPeerID, _, bootstrapKb, _ := setupNode(t, bootstrapNodePort, "bootstrap_llo_tombstone", backend, bootstrapCSAKey, nil)
	bootstrapNode := Node{App: appBootstrap, KeyBundle: bootstrapKb}

	// Mercury server sink: every transmitted report arrives on packetCh. The large
	// buffer keeps the server from blocking while the test drains packets slowly.
	packetCh := make(chan *packet, 100000)
	serverKey := csakey.MustNewV2XXXTestingOnly(big.NewInt(salt - 2))
	serverPubKey := serverKey.PublicKey
	srv := NewMercuryServer(t, serverKey, packetCh)
	serverURL := startMercuryServer(t, srv, clientPubKeys)

	oracles, nodes := setupNodes(t, nNodes, backend, clientCSAKeys, func(c *chainlink.Config) {
		c.Mercury.Transmitter.Protocol = ptr(config.MercuryTransmitterProtocolGRPC)
	})

	chainID := testutils.SimulatedChainID
	relayType := "evm"
	relayConfig := fmt.Sprintf(`
chainID = "%s"
fromBlock = %d
lloDonID = %d
lloConfigMode = "bluegreen"
`, chainID, fromBlock, donID)
	addBootstrapJob(t, bootstrapNode, configuratorAddress, "job-tombstone", relayType, relayConfig)

	pluginConfig := fmt.Sprintf(`servers = { "%s" = "%x" }
donID = %d
channelDefinitionsContractAddress = "0x%x"
channelDefinitionsContractFromBlock = %d`, serverURL, serverPubKey, donID, configStoreAddress, fromBlock)

	// Per-stream bridge call counters (shared across all nodes' bridges for the
	// same stream) let us detect whether the DON is still making observations
	// for a stream after the channel definitions change.
	var streamACalls, streamBCalls atomic.Uint64
	priceA := decimal.NewFromFloat(111.1)
	priceB := decimal.NewFromFloat(222.2)
	for i, node := range nodes {
		bridgeA := createSingleDecimalCountingBridge(t, "tomb-active", i, priceA, node.App.BridgeORM(), &streamACalls)
		addSingleDecimalStreamJob(t, node, streamIDActive, bridgeA)
		bridgeB := createSingleDecimalCountingBridge(t, "tomb-stone", i, priceB, node.App.BridgeORM(), &streamBCalls)
		addSingleDecimalStreamJob(t, node, streamIDTombstone, bridgeB)
		addLLOJob(
			t,
			node,
			configuratorAddress,
			bootstrapPeerID,
			bootstrapNodePort,
			clientPubKeys[i],
			"tombstone-stream-test",
			pluginConfig,
			relayType,
			relayConfig,
		)
	}

	// Phase 1: both channels live. Channel 1 reports streamIDActive, channel 2
	// reports streamIDTombstone; both use the JSON report format so the payloads
	// can be decoded below.
	channelDefinitions := llotypes.ChannelDefinitions{
		1: {
			ReportFormat: llotypes.ReportFormatJSON,
			Streams: []llotypes.Stream{
				{StreamID: streamIDActive, Aggregator: llotypes.AggregatorMedian},
			},
		},
		2: {
			ReportFormat: llotypes.ReportFormatJSON,
			Streams: []llotypes.Stream{
				{StreamID: streamIDTombstone, Aggregator: llotypes.AggregatorMedian},
			},
		},
	}
	url, sha := newChannelDefinitionsServer(t, channelDefinitions)
	_, err := configStore.SetChannelDefinitions(steve, donID, url, sha)
	require.NoError(t, err)
	backend.Commit()

	setProductionConfig(
		t, donID, steve, backend, configurator, configuratorAddress, nodes,
		WithOracles(oracles), WithOffchainConfig(offchainConfig),
	)

	// Wait until at least one JSON report has been seen for each of channel 1 and
	// channel 2, proving both channels produce reports before the tombstone lands.
	// Each Eventually attempt pulls one packet (2s per-attempt receive timeout) and
	// returns whether both channels have been seen so far.
	seenChannels := make(map[uint32]bool)
	require.Eventually(t, func() bool {
		pckt, err := receiveWithTimeout(t, packetCh, 2*time.Second)
		if err != nil {
			// No packet this attempt; report accumulated progress.
			return len(seenChannels) == 2
		}
		req := pckt.req
		if req.ReportFormat != uint32(llotypes.ReportFormatJSON) {
			return len(seenChannels) == 2
		}
		_, _, r, _, err := (datastreamsllo.JSONReportCodec{}).UnpackDecode(req.Payload)
		if err != nil {
			return len(seenChannels) == 2
		}
		if r.ChannelID == 1 || r.ChannelID == 2 {
			seenChannels[r.ChannelID] = true
		}
		return len(seenChannels) == 2
	}, reportTimeout, 100*time.Millisecond, "expected reports for channel 1 and 2 before tombstone")
	require.Greater(t, streamBCalls.Load(), uint64(0), "stream for channel 2 should be observed before tombstone")

	// Phase 2: tombstone channel 2 only; channel 1 keeps observing streamIDActive.
	tombstonedDefs := llotypes.ChannelDefinitions{
		1: {
			ReportFormat: llotypes.ReportFormatJSON,
			Streams: []llotypes.Stream{
				{StreamID: streamIDActive, Aggregator: llotypes.AggregatorMedian},
			},
		},
		2: {
			ReportFormat: llotypes.ReportFormatJSON,
			Tombstone:    true,
			Streams: []llotypes.Stream{
				{StreamID: streamIDTombstone, Aggregator: llotypes.AggregatorMedian},
			},
		},
	}
	url2, sha2 := newChannelDefinitionsServer(t, tombstonedDefs)
	_, err = configStore.SetChannelDefinitions(steve, donID, url2, sha2)
	require.NoError(t, err)
	backend.Commit()

	// Outer Eventually retries until a full checkNoReportsWindow passes during
	// which no decodable JSON report for the tombstoned channel is received.
	// Reports for channel 2 may still be in flight right after the tombstone, so
	// early windows are expected to fail and be retried.
	tombstonedChannel := map[uint32]bool{2: true}
	checkNoReportsWindow := 5 * time.Second
	require.Eventually(t, func() bool {
		start := time.Now()
		sawTombstoned := false
		for time.Since(start) < checkNoReportsWindow {
			pckt, err := receiveWithTimeout(t, packetCh, 1*time.Second)
			if err != nil {
				// Receive timeout: no packet at all is fine; keep scanning the window.
				continue
			}
			req := pckt.req
			if req.ReportFormat != uint32(llotypes.ReportFormatJSON) {
				continue
			}
			_, _, r, _, err := (datastreamsllo.JSONReportCodec{}).UnpackDecode(req.Payload)
			if err == nil && tombstonedChannel[r.ChannelID] {
				sawTombstoned = true
				break
			}
		}
		return !sawTombstoned
	}, 45*time.Second, 200*time.Millisecond, "channel 2 should stop producing reports after tombstone")

	// After reports for channel 2 have stopped, bridge traffic for streamIDTombstone should stop
	// while streamIDActive continues to be observed. Snapshot both counters, wait
	// longer than the 1s min report interval, then require B frozen and A growing.
	bCallsAfterReportsStopped := streamBCalls.Load()
	aCallsAfterReportsStopped := streamACalls.Load()
	time.Sleep(6 * time.Second)
	require.Equal(t, bCallsAfterReportsStopped, streamBCalls.Load(),
		"tombstoned channel's stream should not be observed (no additional bridge calls)")
	require.Greater(t, streamACalls.Load(), aCallsAfterReportsStopped,
		"active channel's stream should still be observed")
}

func setupNodes(t *testing.T, nNodes int, backend evmtypes.Backend, clientCSAKeys []csakey.KeyV2, f func(*chainlink.Config)) (oracles []confighelper.OracleIdentityExtra, nodes []Node) {
ports := freeport.GetN(t, nNodes)
for i := range nNodes {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1325,6 +1325,80 @@ func Test_ChannelDefinitionCache_OwnerAndAdderMerging(t *testing.T) {
"tombstoned channel 600 should be dropped, channel 601 should remain")
})

// Once the owner's on-chain definitions omit a previously tombstoned channel, the
// in-memory cache no longer carries it for the owner source. The merged map from
// Definitions(prevOutcome) is what the LLO plugin treats as the channel set for an
// OCR outcome, so the dropped channel must not resurface — neither on the first
// merge nor when a prior merge is fed back in as prev on later rounds.
//
// NOTE: relies on state built by the preceding subtests (owner tombstone + omit
// flow); running only this subtest via go test -run will not work.
t.Run("dropped tombstoned channel stays out of merged outcome after cache update", func(t *testing.T) {
	observedLogs.TakeAll()

	// Simulated previous OCR outcome: owner channels 600 (tombstoned) and 601,
	// plus adder channel 602.
	prevOutcome := llotypes.ChannelDefinitions{
		600: {
			ReportFormat: llotypes.ReportFormatJSON,
			Streams: []llotypes.Stream{
				{StreamID: 1, Aggregator: llotypes.AggregatorMedian},
			},
			Source:    channeldefinitions.SourceOwner,
			Tombstone: true,
		},
		601: {
			ReportFormat: llotypes.ReportFormatJSON,
			Streams: []llotypes.Stream{
				{StreamID: 2, Aggregator: llotypes.AggregatorMode},
			},
			Source:    channeldefinitions.SourceOwner,
			Tombstone: false,
		},
		602: {
			ReportFormat: llotypes.ReportFormatJSON,
			Streams: []llotypes.Stream{
				{StreamID: 3, Aggregator: llotypes.AggregatorMedian},
			},
			Source:    adder1ID,
			Tombstone: false,
		},
	}

	// First merge: 600 must already be gone; its siblings stay.
	firstMerge := cdc.Definitions(prevOutcome)
	require.NotContains(t, firstMerge, llotypes.ChannelID(600), "merged outcome should not contain dropped tombstoned channel 600")
	require.Contains(t, firstMerge, llotypes.ChannelID(601), "merged outcome should still contain channel 601")
	require.Contains(t, firstMerge, llotypes.ChannelID(602), "merged outcome should still contain adder channel 602")

	// Next round: feed the prior merged definitions back in as prev, exactly as
	// the plugin would on a subsequent observation.
	secondMerge := cdc.Definitions(firstMerge)
	require.NotContains(t, secondMerge, llotypes.ChannelID(600), "channel 600 must not reappear in merged outcome on subsequent Definitions(prev) calls")
	require.Contains(t, secondMerge, llotypes.ChannelID(601))
	require.Contains(t, secondMerge, llotypes.ChannelID(602))

	// The persisted owner-source definitions should also omit 600 once the cache
	// state reaches the ORM.
	require.Eventually(t, func() bool {
		loaded, err := orm.LoadChannelDefinitions(testutils.Context(t), configStoreAddress, donID)
		if err != nil || loaded == nil || loaded.Format != channeldefinitions.MultiChannelDefinitionsFormat {
			return false
		}
		var sources map[uint32]llotypes2.SourceDefinition
		if json.Unmarshal(loaded.Definitions, &sources) != nil {
			return false
		}
		ownerSrc, ok := sources[channeldefinitions.SourceOwner]
		if !ok || len(ownerSrc.Definitions) == 0 {
			return false
		}
		_, ownerHas600 := ownerSrc.Definitions[600]
		return !ownerHas600
	}, 5*time.Second, 100*time.Millisecond,
		"persisted owner source definitions should not list channel 600 after owner omitted it from the cache")
})

t.Run("multiple adders can add different channels", func(t *testing.T) {
observedLogs.TakeAll()

Expand Down
Loading