From e3b9e1bcdaea2e1c68439d8a3e3663b66cfdcf29 Mon Sep 17 00:00:00 2001 From: folbrich Date: Thu, 11 Jun 2026 20:48:00 +0200 Subject: [PATCH] Use testing/synctest in dedup queue tests Run TestDedupQueueParallel and TestWriteDedupQueueParallelReadWrite inside synctest bubbles. The fake clock makes the previously timing-dependent behavior deterministic: - TestDedupQueueParallel: the sleep in GetChunkFunc now guarantees all goroutines have registered as waiters before the first request completes, so the assertion can require exactly one upstream request instead of "ideally just one". - TestWriteDedupQueueParallelReadWrite: the test now asserts that no fake time elapses across StoreChunk(), proving the write does not queue behind the in-flight slow read. The 1s sleep no longer costs wall time and the background read goroutine no longer leaks past the test. --- dedupqueue_test.go | 58 ++++++++++++++++++++++------------------- writededupqueue_test.go | 47 +++++++++++++++++++++------------ 2 files changed, 62 insertions(+), 43 deletions(-) diff --git a/dedupqueue_test.go b/dedupqueue_test.go index 5458442..79d4158 100644 --- a/dedupqueue_test.go +++ b/dedupqueue_test.go @@ -4,6 +4,7 @@ import ( "sync" "sync/atomic" "testing" + "testing/synctest" "time" "github.com/stretchr/testify/require" @@ -37,35 +38,38 @@ func TestDedupQueueSimple(t *testing.T) { } func TestDedupQueueParallel(t *testing.T) { - // Make a store that counts the requests to it - var requests int64 - store := &TestStore{ - GetChunkFunc: func(ChunkID) (*Chunk, error) { - time.Sleep(time.Millisecond) // make it artificially slow to not complete too early - atomic.AddInt64(&requests, 1) - return NewChunk([]byte{0}), nil - }, - } - q := NewDedupQueue(store) + synctest.Test(t, func(t *testing.T) { + // Make a store that counts the requests to it + var requests atomic.Int64 + store := &TestStore{ + GetChunkFunc: func(ChunkID) (*Chunk, error) { + // The fake clock only advances once all other goroutines are + // blocked, so this guarantees they all registered as waiters + // on this request before it completes + time.Sleep(time.Millisecond) + requests.Add(1) + return NewChunk([]byte{0}), nil + }, + } + q := NewDedupQueue(store) - var ( - wg sync.WaitGroup - start = make(chan struct{}) - ) + var ( + wg sync.WaitGroup + start = make(chan struct{}) + ) - // Start several goroutines all asking for the same chunk from the store - for range 10 { - wg.Add(1) - go func() { - <-start - q.GetChunk(ChunkID{0}) - wg.Done() - }() - } + // Start several goroutines all asking for the same chunk from the store + for range 10 { + wg.Go(func() { + <-start + q.GetChunk(ChunkID{0}) + }) + } - close(start) - wg.Wait() + close(start) + wg.Wait() - // There should ideally be just one requests that was done on the upstream store - require.LessOrEqual(t, requests, int64(1), "requests to the store") + // There should be just one request that was done on the upstream store + require.EqualValues(t, 1, requests.Load(), "requests to the store") + }) } diff --git a/writededupqueue_test.go b/writededupqueue_test.go index 5cced68..8d52115 100644 --- a/writededupqueue_test.go +++ b/writededupqueue_test.go @@ -2,6 +2,7 @@ package desync import ( "testing" + "testing/synctest" "time" "github.com/stretchr/testify/require" @@ -10,22 +11,36 @@ import ( // Test read access before write access to ensure a failing read doesn't // impact the write operation (should use separate queues). func TestWriteDedupQueueParallelReadWrite(t *testing.T) { - c := NewChunk([]byte{1, 2, 3, 4}) - sleeping := make(chan struct{}) - store := &TestStore{ - // Slow GetChunk operation - GetChunkFunc: func(id ChunkID) (*Chunk, error) { - close(sleeping) - time.Sleep(time.Second) - return nil, ChunkMissing{id} - }, - } - q := NewWriteDedupQueue(store) + synctest.Test(t, func(t *testing.T) { + c := NewChunk([]byte{1, 2, 3, 4}) + sleeping := make(chan struct{}) + store := &TestStore{ + // Slow GetChunk operation + GetChunkFunc: func(id ChunkID) (*Chunk, error) { + close(sleeping) + time.Sleep(time.Second) + return nil, ChunkMissing{id} + }, + } + q := NewWriteDedupQueue(store) - // Queue us a slow HasChunk() operation, then perform a StoreChunk(). The store - // operation should not be impacted by the ongoing read - go q.GetChunk(c.ID()) - <-sleeping + // Queue up a slow GetChunk() operation, then perform a StoreChunk(). The store + // operation should not be impacted by the ongoing read + done := make(chan struct{}) + go func() { + defer close(done) + q.GetChunk(c.ID()) + }() + <-sleeping - require.NoError(t, q.StoreChunk(c)) + start := time.Now() + require.NoError(t, q.StoreChunk(c)) + + // The fake clock only advances while all goroutines are blocked, so any + // time passing here means the write waited for the in-flight read + require.Zero(t, time.Since(start), "StoreChunk() blocked on the ongoing GetChunk()") + + // Wait for the read to finish, all goroutines must be done before the test returns + <-done + }) }