From 5c99597a7e9f41d27b5f358b55c870e881336c40 Mon Sep 17 00:00:00 2001 From: Josh Newman <3987603+josh-newman@users.noreply.github.com> Date: Wed, 16 Sep 2020 19:29:04 +0000 Subject: [PATCH 1/5] Push reader --- push_read.go | 80 +++++++++++++++++++++++++++++++++++++++++++++++ push_read_test.go | 31 ++++++++++++++++++ slice_test.go | 2 +- 3 files changed, 112 insertions(+), 1 deletion(-) create mode 100644 push_read.go create mode 100644 push_read_test.go diff --git a/push_read.go b/push_read.go new file mode 100644 index 0000000..695cf22 --- /dev/null +++ b/push_read.go @@ -0,0 +1,80 @@ +package bigslice + +import ( + "fmt" + "reflect" + + "github.com/grailbio/base/must" + "github.com/grailbio/bigslice/slicefunc" + "github.com/grailbio/bigslice/sliceio" + "github.com/grailbio/bigslice/typecheck" +) + +func PushReader(nshard int, sinkRead interface{}, prags ...Pragma) Slice { + fn, ok := slicefunc.Of(sinkRead) + if !ok || fn.In.NumOut() != 2 || fn.In.Out(0).Kind() != reflect.Int { + typecheck.Panicf(1, "pushreader: invalid reader function type %T", sinkRead) + } + + var ( + sinkType = fn.In.Out(1) + errorType = reflect.TypeOf((*error)(nil)).Elem() + errorNilValue = reflect.Zero(errorType) + ) + + type state struct { + sunkC chan []reflect.Value + err error + } + readerFuncImpl := func(args []reflect.Value) []reflect.Value { + state := args[1].Interface().(*state) + if state.sunkC == nil { + state.sunkC = make(chan []reflect.Value, defaultChunksize) + sinkImpl := func(args []reflect.Value) []reflect.Value { + state.sunkC <- args + return nil + } + sinkFunc := reflect.MakeFunc(sinkType, sinkImpl) + go func() { + defer close(state.sunkC) + defer func() { + if p := recover(); p != nil { + state.err = fmt.Errorf("pushreader: panic from read: %v", p) + } + }() + outs := reflect.ValueOf(sinkRead).Call([]reflect.Value{args[0], sinkFunc}) + if errI := outs[0].Interface(); errI != nil { + state.err = errI.(error) + } + }() + } + + var rows int + for rows < args[2].Len() { + row := <-state.sunkC + if row == nil { + state.err = sliceio.EOF + break + } + must.True(len(row) == len(args[2:]), "%v, %v", len(row), len(args[2:])) + for c := range row { + args[2+c].Index(rows).Set(row[c]) + } + rows++ + } + errValue := errorNilValue + if state.err != nil { + errValue = reflect.ValueOf(state.err) + } + return []reflect.Value{reflect.ValueOf(rows), errValue} + } + readerFuncArgTypes := []reflect.Type{reflect.TypeOf(int(0)), reflect.TypeOf(&state{})} + for i := 0; i < sinkType.NumIn(); i++ { + readerFuncArgTypes = append(readerFuncArgTypes, reflect.SliceOf(sinkType.In(i))) + } + readerFuncType := reflect.FuncOf(readerFuncArgTypes, + []reflect.Type{reflect.TypeOf(int(0)), errorType}, false) + readerFunc := reflect.MakeFunc(readerFuncType, readerFuncImpl) + + return ReaderFunc(nshard, readerFunc.Interface(), prags...) +} diff --git a/push_read_test.go b/push_read_test.go new file mode 100644 index 0000000..3f80c53 --- /dev/null +++ b/push_read_test.go @@ -0,0 +1,31 @@ +package bigslice_test + +import ( + "testing" + + fuzz "github.com/google/gofuzz" + "github.com/grailbio/bigslice" +) + +func TestPushReader(t *testing.T) { + const ( + N = 3 + Nshard = 1 + ) + slice := bigslice.PushReader(Nshard, func(shard int, sink func(string, int)) error { + fuzzer := fuzz.NewWithSeed(1) + var row struct { + string + int + } + for i := 0; i < N; i++ { + fuzzer.Fuzz(&row) + sink(row.string, row.int) + } + return nil + }) + // Map everything to the same key so we can count them. + slice = bigslice.Map(slice, func(s string, i int) (key string, count int) { return "", 1 }) + slice = bigslice.Fold(slice, func(a, e int) int { return a + e }) + assertEqual(t, slice, false, []string{""}, []int{N * Nshard}) +} diff --git a/slice_test.go b/slice_test.go index d8a0083..971a5f2 100644 --- a/slice_test.go +++ b/slice_test.go @@ -325,7 +325,7 @@ func TestReaderFunc(t *testing.T) { t.Errorf("%d (of %d) nonzero rows", nnonzero, len(strings)) } if state.Fuzzer == nil { - state.Fuzzer = fuzz.New() + state.Fuzzer = fuzz.NewWithSeed(1) } state.NumElements(1, len(strings)) var ( From 434562ae0e0d25e7d89b31980b4c84b8de79edb0 Mon Sep 17 00:00:00 2001 From: Josh Newman <3987603+josh-newman@users.noreply.github.com> Date: Fri, 6 Nov 2020 03:31:38 +0000 Subject: [PATCH 2/5] Comparative benchmark --- push_read_test.go | 105 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 105 insertions(+) diff --git a/push_read_test.go b/push_read_test.go index 3f80c53..5505255 100644 --- a/push_read_test.go +++ b/push_read_test.go @@ -1,10 +1,17 @@ package bigslice_test import ( + "context" + "math" + "math/rand" + "strconv" "testing" fuzz "github.com/google/gofuzz" + "github.com/grailbio/base/must" "github.com/grailbio/bigslice" + "github.com/grailbio/bigslice/exec" + "github.com/grailbio/bigslice/sliceio" ) func TestPushReader(t *testing.T) { @@ -29,3 +36,101 @@ func TestPushReader(t *testing.T) { slice = bigslice.Fold(slice, func(a, e int) int { return a + e }) assertEqual(t, slice, false, []string{""}, []int{N * Nshard}) } + +// On an m5d.2xlarge on EC2: +// goos: linux +// goarch: amd64 +// pkg: github.com/grailbio/bigslice +// BenchmarkReaders/10/pushreader-8 110 10705539 ns/op +// BenchmarkReaders/10/readerfunc-8 100 10036689 ns/op +// BenchmarkReaders/1000/pushreader-8 8 131828675 ns/op +// BenchmarkReaders/1000/readerfunc-8 12 97414219 ns/op +// BenchmarkReaders/100000/pushreader-8 1 11181803529 ns/op +// BenchmarkReaders/100000/readerfunc-8 1 8714653324 ns/op +// PASS +// ok github.com/grailbio/bigslice 26.135s +func BenchmarkReaders(b *testing.B) { + ctx := context.Background() + sess := exec.Start(exec.Local) + for _, rowsPerShard := range []int{10, 1000, 100000} { + lastResult := -1 + checkResult := func(sliceResult *exec.Result) { + scanner := sliceResult.Scanner() + var result int + must.True(scanner.Scan(ctx, &result)) + if lastResult < 0 { + lastResult = result + } + must.True(lastResult == result) + } + + b.Run(strconv.Itoa(rowsPerShard), func(b *testing.B) { + b.Run("pushreader", func(b *testing.B) { + for i := 0; i < b.N; i++ { + checkResult(sess.Must(ctx, benchmarkPushReader, int64(1), rowsPerShard, true)) + } + }) + b.Run("readerfunc", func(b *testing.B) { + for i := 0; i < b.N; i++ { + checkResult(sess.Must(ctx, benchmarkPushReader, int64(1), rowsPerShard, false)) + } + }) + }) + } + sess.Shutdown() +} + +var benchmarkPushReader = bigslice.Func(func( + seed int64, + rowsPerShard int, + usePush bool, +) bigslice.Slice { + const nShards = 100 + shardSeeds := make([]int64, nShards) + rnd := rand.New(rand.NewSource(seed)) + for i := range shardSeeds { + shardSeeds[i] = rnd.Int63() + } + var slice bigslice.Slice + if usePush { + slice = bigslice.PushReader(nShards, func(shard int, row func(int32)) error { + shardRnd := rand.New(rand.NewSource(shardSeeds[shard])) + for i := 0; i < rowsPerShard; i++ { + row(shardRnd.Int31()) + } + return nil + }) + } else { + type state struct { + *rand.Rand + doneRows int + } + slice = bigslice.ReaderFunc(nShards, func(shard int, state *state, nums []int32) (int, error) { + if state.Rand == nil { + state.Rand = rand.New(rand.NewSource(shardSeeds[shard])) + } + if state.doneRows == rowsPerShard { + return 0, sliceio.EOF + } + var i int + for state.doneRows < rowsPerShard && i < len(nums) { + nums[i] = state.Rand.Int31() + state.doneRows++ + i++ + } + return i, nil + }) + } + slice = bigslice.Map(slice, func(num int32) (joinKey int, _ int32) { + return 0, num + }) + slice = bigslice.Fold(slice, func(accum int, num int32) int { + if num < math.MaxInt32/2 { + return accum + } + return accum + 1 + }) + return bigslice.Map(slice, func(joinKey, accum int) int { + return accum + }) +}) From a75067d36b4fe1f583e86256179c996de8449ddc Mon Sep 17 00:00:00 2001 From: Josh Newman <3987603+josh-newman@users.noreply.github.com> Date: Tue, 10 Nov 2020 03:11:35 +0000 Subject: [PATCH 3/5] vectorize chan ops --- push_read.go | 87 +++++++++++++++++++++++++++++++++++----------------- 1 file changed, 59 insertions(+), 28 deletions(-) diff --git a/push_read.go b/push_read.go index 695cf22..f7d5fa0 100644 --- a/push_read.go +++ b/push_read.go @@ -4,7 +4,6 @@ import ( "fmt" "reflect" - "github.com/grailbio/base/must" "github.com/grailbio/bigslice/slicefunc" "github.com/grailbio/bigslice/sliceio" "github.com/grailbio/bigslice/typecheck" @@ -22,51 +21,72 @@ func PushReader(nshard int, sinkRead interface{}, prags ...Pragma) Slice { errorNilValue = reflect.Zero(errorType) ) + type chunkResult struct { + n int + err error + } type state struct { - sunkC chan []reflect.Value - err error + emptyC chan readerChunk + sink struct { + filling readerChunk + result chunkResult + } + doneC chan chunkResult } readerFuncImpl := func(args []reflect.Value) []reflect.Value { - state := args[1].Interface().(*state) - if state.sunkC == nil { - state.sunkC = make(chan []reflect.Value, defaultChunksize) + var ( + shard = args[0] + state = args[1].Interface().(*state) + chunk readerChunk = args[2:] + ) + if state.emptyC == nil { + state.emptyC = make(chan readerChunk) + state.doneC = make(chan chunkResult) + sinkSend := func() { + state.doneC <- state.sink.result + state.sink.filling = nil + state.sink.result = chunkResult{} + } sinkImpl := func(args []reflect.Value) []reflect.Value { - state.sunkC <- args + if state.sink.filling == nil { + state.sink.filling = <-state.emptyC + } + state.sink.filling.SetRow(state.sink.result.n, args) + state.sink.result.n++ + if state.sink.result.n == state.sink.filling.Len() { + sinkSend() + } return nil } sinkFunc := reflect.MakeFunc(sinkType, sinkImpl) go func() { - defer close(state.sunkC) + defer close(state.emptyC) // Panic if another send is attempted. + defer close(state.doneC) defer func() { if p := recover(); p != nil { - state.err = fmt.Errorf("pushreader: panic from read: %v", p) + state.sink.result.err = fmt.Errorf("pushreader: panic from read: %v", p) + } else { + state.sink.result.err = sliceio.EOF + } + // Make sure it's our turn to send our last result. + if state.sink.filling == nil { + state.sink.filling = <-state.emptyC } + sinkSend() }() - outs := reflect.ValueOf(sinkRead).Call([]reflect.Value{args[0], sinkFunc}) + outs := reflect.ValueOf(sinkRead).Call([]reflect.Value{shard, sinkFunc}) if errI := outs[0].Interface(); errI != nil { - state.err = errI.(error) + state.sink.result.err = errI.(error) } }() } - - var rows int - for rows < args[2].Len() { - row := <-state.sunkC - if row == nil { - state.err = sliceio.EOF - break - } - must.True(len(row) == len(args[2:]), "%v, %v", len(row), len(args[2:])) - for c := range row { - args[2+c].Index(rows).Set(row[c]) - } - rows++ - } + state.emptyC <- chunk + result := <-state.doneC errValue := errorNilValue - if state.err != nil { - errValue = reflect.ValueOf(state.err) + if result.err != nil { + errValue = reflect.ValueOf(result.err) } - return []reflect.Value{reflect.ValueOf(rows), errValue} + return []reflect.Value{reflect.ValueOf(result.n), errValue} } readerFuncArgTypes := []reflect.Type{reflect.TypeOf(int(0)), reflect.TypeOf(&state{})} for i := 0; i < sinkType.NumIn(); i++ { @@ -78,3 +98,14 @@ func PushReader(nshard int, sinkRead interface{}, prags ...Pragma) Slice { return ReaderFunc(nshard, readerFunc.Interface(), prags...) } + +// TODO: Consider implementing Slice directly (instead of via ReaderFunc) and using Frame. +type readerChunk []reflect.Value + +func (c readerChunk) Len() int { return c[0].Len() } + +func (c readerChunk) SetRow(r int, vals []reflect.Value) { + for col := 0; col < len(c); col++ { + c[col].Index(r).Set(vals[col]) + } +} From d22fdead5e185d3614e268bd5800b256fd7b5550 Mon Sep 17 00:00:00 2001 From: Josh Newman <3987603+josh-newman@users.noreply.github.com> Date: Tue, 10 Nov 2020 03:12:06 +0000 Subject: [PATCH 4/5] simulate more expensive per-row work in benchmark --- push_read_test.go | 105 +++++++++++++++++++++++++++++----------------- 1 file changed, 67 insertions(+), 38 deletions(-) diff --git a/push_read_test.go b/push_read_test.go index 5505255..2c1adc6 100644 --- a/push_read_test.go +++ b/push_read_test.go @@ -2,9 +2,9 @@ package bigslice_test import ( "context" + "fmt" "math" "math/rand" - "strconv" "testing" fuzz "github.com/google/gofuzz" @@ -16,8 +16,8 @@ import ( func TestPushReader(t *testing.T) { const ( - N = 3 - Nshard = 1 + N = 1000 + Nshard = 10 ) slice := bigslice.PushReader(Nshard, func(shard int, sink func(string, int)) error { fuzzer := fuzz.NewWithSeed(1) @@ -41,62 +41,76 @@ func TestPushReader(t *testing.T) { // goos: linux // goarch: amd64 // pkg: github.com/grailbio/bigslice -// BenchmarkReaders/10/pushreader-8 110 10705539 ns/op -// BenchmarkReaders/10/readerfunc-8 100 10036689 ns/op -// BenchmarkReaders/1000/pushreader-8 8 131828675 ns/op -// BenchmarkReaders/1000/readerfunc-8 12 97414219 ns/op -// BenchmarkReaders/100000/pushreader-8 1 11181803529 ns/op -// BenchmarkReaders/100000/readerfunc-8 1 8714653324 ns/op +// BenchmarkReaders/heavy=false/n=10/standard-8 100 10641845 ns/op +// BenchmarkReaders/heavy=false/n=10/push-8 100 11062321 ns/op +// BenchmarkReaders/heavy=false/n=1000/standard-8 12 98823548 ns/op +// BenchmarkReaders/heavy=false/n=1000/push-8 9 117393717 ns/op +// BenchmarkReaders/heavy=true/n=10/standard-8 20 55009760 ns/op +// BenchmarkReaders/heavy=true/n=10/push-8 20 56645978 ns/op +// BenchmarkReaders/heavy=true/n=1000/standard-8 1 4544902923 ns/op +// BenchmarkReaders/heavy=true/n=1000/push-8 1 4555043499 ns/op // PASS // ok github.com/grailbio/bigslice 26.135s func BenchmarkReaders(b *testing.B) { ctx := context.Background() sess := exec.Start(exec.Local) - for _, rowsPerShard := range []int{10, 1000, 100000} { - lastResult := -1 - checkResult := func(sliceResult *exec.Result) { - scanner := sliceResult.Scanner() - var result int - must.True(scanner.Scan(ctx, &result)) - if lastResult < 0 { - lastResult = result + for _, heavyWork := range []bool{false, true} { + for _, rowsPerShard := range []int{10, 1000} { + lastResult := -1 + checkResult := func(sliceResult *exec.Result) { + scanner := sliceResult.Scanner() + var result int + must.True(scanner.Scan(ctx, &result)) + if lastResult < 0 { + lastResult = result + } + must.True(lastResult == result) } - must.True(lastResult == result) - } - - b.Run(strconv.Itoa(rowsPerShard), func(b *testing.B) { - b.Run("pushreader", func(b *testing.B) { + opts := benchmarkOpts{ + Seed: 1, + RowsPerShard: rowsPerShard, + HeavyWork: heavyWork, + } + b.Run(fmt.Sprintf("heavy=%t/n=%d/standard", heavyWork, rowsPerShard), func(b *testing.B) { for i := 0; i < b.N; i++ { - checkResult(sess.Must(ctx, benchmarkPushReader, int64(1), rowsPerShard, true)) + checkResult(sess.Must(ctx, benchmarkPushReader, opts)) } }) - b.Run("readerfunc", func(b *testing.B) { + opts.PushReader = true + b.Run(fmt.Sprintf("heavy=%t/n=%d/push", heavyWork, rowsPerShard), func(b *testing.B) { for i := 0; i < b.N; i++ { - checkResult(sess.Must(ctx, benchmarkPushReader, int64(1), rowsPerShard, false)) + checkResult(sess.Must(ctx, benchmarkPushReader, opts)) } }) - }) + } } sess.Shutdown() } -var benchmarkPushReader = bigslice.Func(func( - seed int64, - rowsPerShard int, - usePush bool, -) bigslice.Slice { +type benchmarkOpts struct { + Seed int64 + RowsPerShard int + PushReader bool + HeavyWork bool +} + +var benchmarkPushReader = bigslice.Func(func(opts benchmarkOpts) bigslice.Slice { const nShards = 100 shardSeeds := make([]int64, nShards) - rnd := rand.New(rand.NewSource(seed)) + rnd := rand.New(rand.NewSource(opts.Seed)) for i := range shardSeeds { shardSeeds[i] = rnd.Int63() } var slice bigslice.Slice - if usePush { + if opts.PushReader { slice = bigslice.PushReader(nShards, func(shard int, row func(int32)) error { shardRnd := rand.New(rand.NewSource(shardSeeds[shard])) - for i := 0; i < rowsPerShard; i++ { - row(shardRnd.Int31()) + for i := 0; i < opts.RowsPerShard; i++ { + if opts.HeavyWork { + row(heavyWork(shardRnd)) + } else { + row(lightWork(shardRnd)) + } } return nil }) @@ -109,12 +123,16 @@ var benchmarkPushReader = bigslice.Func(func( if state.Rand == nil { state.Rand = rand.New(rand.NewSource(shardSeeds[shard])) } - if state.doneRows == rowsPerShard { + if state.doneRows == opts.RowsPerShard { return 0, sliceio.EOF } var i int - for state.doneRows < rowsPerShard && i < len(nums) { - nums[i] = state.Rand.Int31() + for state.doneRows < opts.RowsPerShard && i < len(nums) { + if opts.HeavyWork { + nums[i] = heavyWork(state.Rand) + } else { + nums[i] = lightWork(state.Rand) + } state.doneRows++ i++ } @@ -134,3 +152,14 @@ var benchmarkPushReader = bigslice.Func(func( return accum }) }) + +func heavyWork(r *rand.Rand) int32 { + for i := 0; i < 10000; i++ { + _ = r.Int() + } + return lightWork(r) +} + +func lightWork(r *rand.Rand) int32 { + return r.Int31() +} From 1746e1c7517970ce5c334b7f193dccd38ba20e6e Mon Sep 17 00:00:00 2001 From: Josh Newman <3987603+josh-newman@users.noreply.github.com> Date: Tue, 10 Nov 2020 03:32:52 +0000 Subject: [PATCH 5/5] fix error reflection --- push_read.go | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/push_read.go b/push_read.go index f7d5fa0..b98d10d 100644 --- a/push_read.go +++ b/push_read.go @@ -14,12 +14,7 @@ func PushReader(nshard int, sinkRead interface{}, prags ...Pragma) Slice { if !ok || fn.In.NumOut() != 2 || fn.In.Out(0).Kind() != reflect.Int { typecheck.Panicf(1, "pushreader: invalid reader function type %T", sinkRead) } - - var ( - sinkType = fn.In.Out(1) - errorType = reflect.TypeOf((*error)(nil)).Elem() - errorNilValue = reflect.Zero(errorType) - ) + sinkType := fn.In.Out(1) type chunkResult struct { n int @@ -82,18 +77,14 @@ func PushReader(nshard int, sinkRead interface{}, prags ...Pragma) Slice { } state.emptyC <- chunk result := <-state.doneC - errValue := errorNilValue - if result.err != nil { - errValue = reflect.ValueOf(result.err) - } - return []reflect.Value{reflect.ValueOf(result.n), errValue} + return []reflect.Value{reflect.ValueOf(result.n), reflect.ValueOf(&result.err).Elem()} } readerFuncArgTypes := []reflect.Type{reflect.TypeOf(int(0)), reflect.TypeOf(&state{})} for i := 0; i < sinkType.NumIn(); i++ { readerFuncArgTypes = append(readerFuncArgTypes, reflect.SliceOf(sinkType.In(i))) } readerFuncType := reflect.FuncOf(readerFuncArgTypes, - []reflect.Type{reflect.TypeOf(int(0)), errorType}, false) + []reflect.Type{reflect.TypeOf(int(0)), reflect.TypeOf((*error)(nil)).Elem()}, false) readerFunc := reflect.MakeFunc(readerFuncType, readerFuncImpl) return ReaderFunc(nshard, readerFunc.Interface(), prags...)