diff --git a/dedupqueue_test.go b/dedupqueue_test.go index 5458442..79d4158 100644 --- a/dedupqueue_test.go +++ b/dedupqueue_test.go @@ -4,6 +4,7 @@ import ( "sync" "sync/atomic" "testing" + "testing/synctest" "time" "github.com/stretchr/testify/require" @@ -37,35 +38,38 @@ func TestDedupQueueSimple(t *testing.T) { } func TestDedupQueueParallel(t *testing.T) { - // Make a store that counts the requests to it - var requests int64 - store := &TestStore{ - GetChunkFunc: func(ChunkID) (*Chunk, error) { - time.Sleep(time.Millisecond) // make it artificially slow to not complete too early - atomic.AddInt64(&requests, 1) - return NewChunk([]byte{0}), nil - }, - } - q := NewDedupQueue(store) + synctest.Test(t, func(t *testing.T) { + // Make a store that counts the requests to it + var requests atomic.Int64 + store := &TestStore{ + GetChunkFunc: func(ChunkID) (*Chunk, error) { + // The fake clock only advances once all other goroutines are + // blocked, so this guarantees they all registered as waiters + // on this request before it completes + time.Sleep(time.Millisecond) + requests.Add(1) + return NewChunk([]byte{0}), nil + }, + } + q := NewDedupQueue(store) - var ( - wg sync.WaitGroup - start = make(chan struct{}) - ) + var ( + wg sync.WaitGroup + start = make(chan struct{}) + ) - // Start several goroutines all asking for the same chunk from the store - for range 10 { - wg.Add(1) - go func() { - <-start - q.GetChunk(ChunkID{0}) - wg.Done() - }() - } + // Start several goroutines all asking for the same chunk from the store + for range 10 { + wg.Go(func() { + <-start + q.GetChunk(ChunkID{0}) + }) + } - close(start) - wg.Wait() + close(start) + wg.Wait() - // There should ideally be just one requests that was done on the upstream store - require.LessOrEqual(t, requests, int64(1), "requests to the store") + // There should be just one request that was done on the upstream store + require.EqualValues(t, 1, requests.Load(), "requests to the store") + }) } diff --git a/writededupqueue_test.go b/writededupqueue_test.go index 5cced68..8d52115 100644 --- a/writededupqueue_test.go +++ b/writededupqueue_test.go @@ -2,6 +2,7 @@ package desync import ( "testing" + "testing/synctest" "time" "github.com/stretchr/testify/require" @@ -10,22 +11,36 @@ import ( // Test read access before write access to ensure a failing read doesn't // impact the write operation (should use separate queues). func TestWriteDedupQueueParallelReadWrite(t *testing.T) { - c := NewChunk([]byte{1, 2, 3, 4}) - sleeping := make(chan struct{}) - store := &TestStore{ - // Slow GetChunk operation - GetChunkFunc: func(id ChunkID) (*Chunk, error) { - close(sleeping) - time.Sleep(time.Second) - return nil, ChunkMissing{id} - }, - } - q := NewWriteDedupQueue(store) + synctest.Test(t, func(t *testing.T) { + c := NewChunk([]byte{1, 2, 3, 4}) + sleeping := make(chan struct{}) + store := &TestStore{ + // Slow GetChunk operation + GetChunkFunc: func(id ChunkID) (*Chunk, error) { + close(sleeping) + time.Sleep(time.Second) + return nil, ChunkMissing{id} + }, + } + q := NewWriteDedupQueue(store) - // Queue us a slow HasChunk() operation, then perform a StoreChunk(). The store - // operation should not be impacted by the ongoing read - go q.GetChunk(c.ID()) - <-sleeping + // Queue up a slow GetChunk() operation, then perform a StoreChunk(). The store + // operation should not be impacted by the ongoing read + done := make(chan struct{}) + go func() { + defer close(done) + q.GetChunk(c.ID()) + }() + <-sleeping - require.NoError(t, q.StoreChunk(c)) + start := time.Now() + require.NoError(t, q.StoreChunk(c)) + + // The fake clock only advances while all goroutines are blocked, so any + // time passing here means the write waited for the in-flight read + require.Zero(t, time.Since(start), "StoreChunk() blocked on the ongoing GetChunk()") + + // Wait for the read to finish, all goroutines must be done before the test returns + <-done + }) }