MX-17193 Unique chunks processor integration test#7323
Conversation
|
❌ Integration Tests completed with failures or errors. 📊 MultiversX Automated Test Report: View Report 🔄 Build Details:
🚀 Environment Variables:
|
|
❌ Integration Tests completed with failures or errors. 📊 MultiversX Automated Test Report: View Report 🔄 Build Details:
🚀 Environment Variables:
|
|
❌ Integration Tests completed with failures or errors. 📊 MultiversX Automated Test Report: View Report 🔄 Build Details:
🚀 Environment Variables:
|
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## feat/supernova-async-exec #7323 +/- ##
=============================================================
- Coverage 77.53% 77.49% -0.05%
=============================================================
Files 878 878
Lines 122173 122238 +65
=============================================================
- Hits 94728 94725 -3
- Misses 21135 21202 +67
- Partials 6310 6311 +1 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Pull Request Overview
This pull request adds an integration test for the unique chunks processor/deduplication functionality. The test verifies that the system correctly deduplicates messages when the same transaction packets are broadcast multiple times across shards.
- Added
GenerateAndSendDuplicatedBulkTransactionsmethod to support sending duplicated transaction batches for testing - Created integration test to validate message deduplication across shard boundaries
- Implemented
countingMessageProcessorhelper to track message counts during deduplication testing
Reviewed Changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 11 comments.
| File | Description |
|---|---|
| node/nodeTesting.go | Added new GenerateAndSendDuplicatedBulkTransactions function to generate and broadcast duplicated transaction batches for testing deduplication logic |
| integrationTests/deduplication/single_deduplication_test.go | Implemented integration test that verifies deduplication by sending duplicate packets and validating that interceptors properly deduplicate them |
| integrationTests/deduplication/countingMessageProcessor.go | Created helper processor to count received messages for validation in deduplication tests |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| num := atomic.LoadUint32(&mp.numMessagesReceived) + 1 | ||
| atomic.AddUint32(&mp.numMessagesReceived, 1) |
There was a problem hiding this comment.
Inefficient atomic operation: loading the value with atomic.LoadUint32(&mp.numMessagesReceived) and then incrementing with atomic.AddUint32(&mp.numMessagesReceived, 1) is redundant. The return value of atomic.AddUint32 already gives the new value. Consider using: num := atomic.AddUint32(&mp.numMessagesReceived, 1).
| num := atomic.LoadUint32(&mp.numMessagesReceived) + 1 | |
| atomic.AddUint32(&mp.numMessagesReceived, 1) | |
| num := atomic.AddUint32(&mp.numMessagesReceived, 1) |
| // TestNode_ShouldDeduplicateDuplicateMessages | ||
| // Scenario: | ||
| // | ||
| // messenger1 -> (duplicates) -> node0(interceptor) -> (cross-shard) -> messenger2 | ||
| func TestNode_ShouldDeduplicateMessages(t *testing.T) { |
There was a problem hiding this comment.
The function name in the comment TestNode_ShouldDeduplicateDuplicateMessages doesn't match the actual function name TestNode_ShouldDeduplicateMessages. Either update the comment to match the function name or vice versa.
| integrationTests.WhiteListTxs(nodes, txs) | ||
| } | ||
|
|
||
| duplicatedTxCount := 5 // each batch sent 5 times |
There was a problem hiding this comment.
The variable name duplicatedTxCount is misleading. It represents the number of times each batch/packet is duplicated, not the count of duplicated transactions. Consider renaming to numDuplicationsPerPacket or duplicationsPerBatch for clarity.
|
|
||
| atomic.AddInt32(¤tSendingGoRoutines, int32(len(packets))) | ||
| for _, buff := range packets { | ||
| for i := 0; i < num_duplications; i++ { |
There was a problem hiding this comment.
Variable name uses snake_case (num_duplications) instead of camelCase, which is inconsistent with Go naming conventions. Should be numDuplications.
| for i := 0; i < num_duplications; i++ { | |
| for i := 0; i < numDuplications; i++ { |
| logger "github.com/multiversx/mx-chain-logger-go" | ||
| ) | ||
|
|
||
| var log = logger.GetOrCreate("api/gin") |
There was a problem hiding this comment.
Logger is initialized with incorrect package name "api/gin". This should reflect the actual package path, which is "deduplication" or "integrationTests/deduplication".
| var log = logger.GetOrCreate("api/gin") | |
| var log = logger.GetOrCreate("deduplication") |
| for _, n := range nodes { | ||
| _ = logger.SetLogLevel("*:ERROR") | ||
| n.Close() | ||
| } |
There was a problem hiding this comment.
The messenger2 messenger is created but not properly cleaned up in the defer block. Only node0 and messenger1 are cleaned up via the nodes slice, but messenger2 should also be closed to prevent resource leaks. Add _ = messenger2.Close() to the cleanup code.
| } | |
| } | |
| _ = messenger2.Close() |
|
|
||
| // --- validate deduplication | ||
| txRecvNode0 := atomic.LoadInt32(&node0.CounterTxRecv) | ||
| msgsRecvNode0AfterDeduplication := node0Proc.NumMessagesReceived() //node0.MainMessenger.(*integrationTests.CountingMessenger).MsgCount |
There was a problem hiding this comment.
Remove commented-out code //node0.MainMessenger.(*integrationTests.CountingMessenger).MsgCount. If this was for debugging or reference, it should be deleted in the final version.
| msgsRecvNode0AfterDeduplication := node0Proc.NumMessagesReceived() //node0.MainMessenger.(*integrationTests.CountingMessenger).MsgCount | |
| msgsRecvNode0AfterDeduplication := node0Proc.NumMessagesReceived() |
| atomic.AddInt32(¤tSendingGoRoutines, int32(len(packets))) | ||
| for _, buff := range packets { | ||
| for i := 0; i < num_duplications; i++ { | ||
| n.networkComponents.NetworkMessenger().BroadcastOnChannel( | ||
| txsSender.SendTransactionsPipe, | ||
| identifier, | ||
| buff, | ||
| ) | ||
| } | ||
|
|
||
| atomic.AddInt32(¤tSendingGoRoutines, -1) |
There was a problem hiding this comment.
Incorrect atomic counter management. The code increments currentSendingGoRoutines by len(packets) (line 237) but then broadcasts each packet num_duplications times before decrementing once per packet (line 247). This doesn't accurately reflect the actual workload. The counter should account for the total number of broadcasts (len(packets) * num_duplications) or be restructured to properly track concurrent sending operations.
| fmt.Println("messenger2 received total messages:", msgsRecvMessenger2) | ||
| assert.Equal(t, int32(numPackets), int32(msgsRecvNode0AfterDeduplication), "interceptor should have deduplicated messages") | ||
| assert.Equal(t, int32(numPackets), int32(msgsRecvMessenger2), "messenger2 should have received deduplicated messages") | ||
| assert.Equal(t, int32(numPackets*duplicatedTxCount), int32(msgsRecvMessenger1), "messenger1 should have received all messages it sent") |
There was a problem hiding this comment.
[nitpick] The assertion logic appears questionable. The test expects messenger1 to receive numPackets * duplicatedTxCount messages, suggesting it receives all the duplicates it sent. However, this is testing whether messenger1 echoes back its own messages, which may not be the intended test behavior. Consider clarifying the test's intent: if messenger1 should only send (not receive), this assertion should be removed or the message processor registration on line 47-48 should be removed. If self-receiving is intentional, add a comment explaining why.
| assert.Equal(t, int32(numPackets*duplicatedTxCount), int32(msgsRecvMessenger1), "messenger1 should have received all messages it sent") |
| } | ||
|
|
||
| func (n *Node) GenerateAndSendDuplicatedBulkTransactions( | ||
| num_duplications int, |
There was a problem hiding this comment.
Variable name uses snake_case (num_duplications) instead of camelCase, which is inconsistent with Go naming conventions. Should be numDuplications.
| num_duplications int, | |
| numDuplications int, |
|
❌ Integration Tests completed with failures or errors. 📊 MultiversX Automated Test Report: View Report 🔄 Build Details:
🚀 Environment Variables:
|
|
❌ Integration Tests completed with failures or errors. 📊 MultiversX Automated Test Report: View Report 🔄 Build Details:
🚀 Environment Variables:
|
|
❌ Integration Tests completed with failures or errors. 📊 MultiversX Automated Test Report: View Report 🔄 Build Details:
🚀 Environment Variables:
|
|
❌ Integration Tests completed with failures or errors. 📊 MultiversX Automated Test Report: View Report 🔄 Build Details:
🚀 Environment Variables:
|
|
✅ Integration Tests passed successfully! 📊 MultiversX Automated Test Report: View Report 🔄 Build Details:
🚀 Environment Variables:
|
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| } | ||
|
|
||
| func (n *Node) GenerateAndSendDuplicatedBulkTransactions( | ||
| num_duplications int, |
There was a problem hiding this comment.
Parameter num_duplications uses snake_case, which is inconsistent with Go naming conventions and the rest of this file. Rename it to numDuplications and update its uses accordingly.
| num_duplications int, | |
| numDuplications int, |
| mutTransactions := sync.RWMutex{} | ||
| txsBuff := make([][]byte, 0) | ||
|
|
There was a problem hiding this comment.
mutTransactions is declared as sync.RWMutex but is only ever used with Lock/Unlock (no RLock). Consider using sync.Mutex and pre-allocating txsBuff/txs with capacity numOfTxs to avoid repeated slice growth while goroutines append.
| atomic.AddInt32(¤tSendingGoRoutines, int32(len(packets))) | ||
| for _, buff := range packets { | ||
| for i := 0; i < num_duplications; i++ { | ||
| n.networkComponents.NetworkMessenger().BroadcastOnChannel( | ||
| txsSender.SendTransactionsPipe, | ||
| identifier, | ||
| buff, | ||
| ) | ||
| } | ||
|
|
||
| atomic.AddInt32(¤tSendingGoRoutines, -1) | ||
| } |
There was a problem hiding this comment.
currentSendingGoRoutines is incremented by len(packets) but this method broadcasts each packet num_duplications times. This makes the throttle counter inconsistent with the actual number of sends performed, potentially bypassing ErrSystemBusyGeneratingTransactions under load. Adjust the counter to reflect len(packets) * numDuplications (or document why duplicates shouldn’t be counted).
| whiteList func([]*transaction.Transaction), | ||
| chainID []byte, | ||
| minTxVersion uint32, | ||
| ) (numMessages int, err error) { |
There was a problem hiding this comment.
The named return value numMessages is misleading: the function returns len(packets) (number of packed chunks), not the number of broadcasts performed (which would be len(packets) * numDuplications). Consider renaming the return value (e.g. numPackets) or adjusting what is returned to match the name.
| ) (numMessages int, err error) { | |
| ) (numPackets int, err error) { |
Reasoning behind the pull request
Proposed changes
Testing procedure
Pre-requisites
Based on the Contributing Guidelines the PR author and the reviewers must check the following requirements are met:
featbranch created?featbranch merging, do all satellite projects have a proper tag insidego.mod?