Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 31 additions & 27 deletions dedupqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"sync"
"sync/atomic"
"testing"
"testing/synctest"
"time"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -37,35 +38,38 @@ func TestDedupQueueSimple(t *testing.T) {
}

func TestDedupQueueParallel(t *testing.T) {
// Make a store that counts the requests to it
var requests int64
store := &TestStore{
GetChunkFunc: func(ChunkID) (*Chunk, error) {
time.Sleep(time.Millisecond) // make it artificially slow to not complete too early
atomic.AddInt64(&requests, 1)
return NewChunk([]byte{0}), nil
},
}
q := NewDedupQueue(store)
synctest.Test(t, func(t *testing.T) {
// Make a store that counts the requests to it
var requests atomic.Int64
store := &TestStore{
GetChunkFunc: func(ChunkID) (*Chunk, error) {
// The fake clock only advances once all other goroutines are
// blocked, so this guarantees they all registered as waiters
// on this request before it completes
time.Sleep(time.Millisecond)
requests.Add(1)
return NewChunk([]byte{0}), nil
},
}
q := NewDedupQueue(store)

var (
wg sync.WaitGroup
start = make(chan struct{})
)
var (
wg sync.WaitGroup
start = make(chan struct{})
)

// Start several goroutines all asking for the same chunk from the store
for range 10 {
wg.Add(1)
go func() {
<-start
q.GetChunk(ChunkID{0})
wg.Done()
}()
}
// Start several goroutines all asking for the same chunk from the store
for range 10 {
wg.Go(func() {
<-start
q.GetChunk(ChunkID{0})
})
}

close(start)
wg.Wait()
close(start)
wg.Wait()

// There should ideally be just one requests that was done on the upstream store
require.LessOrEqual(t, requests, int64(1), "requests to the store")
// There should be just one request that was done on the upstream store
require.EqualValues(t, 1, requests.Load(), "requests to the store")
})
}
47 changes: 31 additions & 16 deletions writededupqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package desync

import (
"testing"
"testing/synctest"
"time"

"github.com/stretchr/testify/require"
Expand All @@ -10,22 +11,36 @@ import (
// Test read access before write access to ensure a failing read doesn't
// impact the write operation (should use separate queues).
func TestWriteDedupQueueParallelReadWrite(t *testing.T) {
c := NewChunk([]byte{1, 2, 3, 4})
sleeping := make(chan struct{})
store := &TestStore{
// Slow GetChunk operation
GetChunkFunc: func(id ChunkID) (*Chunk, error) {
close(sleeping)
time.Sleep(time.Second)
return nil, ChunkMissing{id}
},
}
q := NewWriteDedupQueue(store)
synctest.Test(t, func(t *testing.T) {
c := NewChunk([]byte{1, 2, 3, 4})
sleeping := make(chan struct{})
store := &TestStore{
// Slow GetChunk operation
GetChunkFunc: func(id ChunkID) (*Chunk, error) {
close(sleeping)
time.Sleep(time.Second)
return nil, ChunkMissing{id}
},
}
q := NewWriteDedupQueue(store)

// Queue us a slow HasChunk() operation, then perform a StoreChunk(). The store
// operation should not be impacted by the ongoing read
go q.GetChunk(c.ID())
<-sleeping
// Queue up a slow GetChunk() operation, then perform a StoreChunk(). The store
// operation should not be impacted by the ongoing read
done := make(chan struct{})
go func() {
defer close(done)
q.GetChunk(c.ID())
}()
<-sleeping

require.NoError(t, q.StoreChunk(c))
start := time.Now()
require.NoError(t, q.StoreChunk(c))

// The fake clock only advances while all goroutines are blocked, so any
// time passing here means the write waited for the in-flight read
require.Zero(t, time.Since(start), "StoreChunk() blocked on the ongoing GetChunk()")

// Wait for the read to finish, all goroutines must be done before the test returns
<-done
})
}
Loading