diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..671b433b --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +etc/out/ +vendor/ \ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100644 index 00000000..9c18a9a5 --- /dev/null +++ b/Makefile @@ -0,0 +1,36 @@ +SHELL := bash +.SHELLFLAGS := -eu -o pipefail -c +.PHONY: all + + +all: setup build test lint + +setup: + if [ ! -e $(shell go env GOPATH)/bin/golangci-lint ] ; then curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(shell go env GOPATH)/bin v1.31.0 ; fi; + +build: + go build ./... + +dep: + go mod tidy + +lint: + golangci-lint run --disable-all \ + --enable=staticcheck --enable=unused --enable=gosimple --enable=structcheck --enable=varcheck --enable=ineffassign \ + --enable=deadcode --enable=typecheck --enable=stylecheck --enable=gosec --enable=unconvert --enable=gofmt \ + --enable=unparam --enable=nakedret --enable=gochecknoinits --enable=depguard --enable=gocyclo --enable=misspell \ + --enable=megacheck --enable=goimports --enable=golint \ + --deadline=5m --no-config + +lint-strict: + golangci-lint run --enable-all + +test: + mkdir -p ./etc/out + ENVIRONMENT=test go test -failfast -count 1 -timeout 30s -covermode=atomic -coverprofile=etc/out/profile.cov ./... && go tool cover -func=etc/out/profile.cov + +docker.start: docker.stop + docker run -d --rm -p 6379:6379 --name work-redis-server redis:latest || true + +docker.stop: + docker stop work-redis-server || true \ No newline at end of file diff --git a/benches/bench_goworker/main.go b/benches/bench_goworker/main.go deleted file mode 100644 index e254d6f1..00000000 --- a/benches/bench_goworker/main.go +++ /dev/null @@ -1,124 +0,0 @@ -package main - -import ( - "fmt" - "os" - "sync/atomic" - "time" - - "github.com/benmanns/goworker" - "github.com/gocraft/health" - "github.com/gomodule/redigo/redis" -) - -func myJob(queue string, args ...interface{}) error { - atomic.AddInt64(&totcount, 1) - //fmt.Println("job! ", queue) - return nil -} - -var namespace = "bench_test" -var pool = newPool(":6379") - -// go run *.go -queues="myqueue,myqueue2,myqueue3,myqueue4,myqueue5" -namespace="bench_test:" -concurrency=50 -use-nuber -func main() { - - stream := health.NewStream().AddSink(&health.WriterSink{os.Stdout}) - stream.Event("wat") - cleanKeyspace() - - queues := []string{"myqueue", "myqueue2", "myqueue3", "myqueue4", "myqueue5"} - numJobs := 100000 / len(queues) - - job := stream.NewJob("enqueue_all") - for _, q := range queues { - enqueueJobs(q, numJobs) - } - job.Complete(health.Success) - - goworker.Register("MyClass", myJob) - - go monitor() - - // Blocks until process is told to exit via unix signal - goworker.Work() -} - -var totcount int64 - -func monitor() { - t := time.Tick(1 * time.Second) - - curT := 0 - c1 := int64(0) - c2 := int64(0) - prev := int64(0) - -DALOOP: - for { - select { - case <-t: - curT++ - v := atomic.AddInt64(&totcount, 0) - fmt.Printf("after %d seconds, count is %d\n", curT, v) - if curT == 1 { - c1 = v - } else if curT == 3 { - c2 = v - } - if v == prev { - break DALOOP - } - prev = v - } - } - fmt.Println("Jobs/sec: ", float64(c2-c1)/2.0) - os.Exit(0) -} - -func enqueueJobs(queue string, count int) { - conn := pool.Get() - defer conn.Close() - - for i := 0; i < count; i++ { - //workers.Enqueue(queue, "Foo", []int{i}) - conn.Do("RPUSH", "bench_test:queue:"+queue, `{"class":"MyClass","args":[]}`) - } -} - -func cleanKeyspace() { - conn := pool.Get() - defer conn.Close() - - keys, err := redis.Strings(conn.Do("KEYS", namespace+"*")) - if err != nil { - panic("could not get keys: " + err.Error()) - } - for _, k := range keys { - //fmt.Println("deleting ", k) - if _, err := conn.Do("DEL", k); err != nil { - panic("could not del: " + err.Error()) - } - } -} - -func newPool(addr string) *redis.Pool { - return &redis.Pool{ - MaxActive: 3, - MaxIdle: 3, - IdleTimeout: 240 * time.Second, - Dial: func() (redis.Conn, error) { - c, err := redis.Dial("tcp", addr) - if err != nil { - return nil, err - } - return c, nil - //return redis.NewLoggingConn(c, log.New(os.Stdout, "", 0), "redis"), err - }, - Wait: true, - //TestOnBorrow: func(c redis.Conn, t time.Time) error { - // _, err := c.Do("PING") - // return err - //}, - } -} diff --git a/benches/bench_goworkers/main.go b/benches/bench_goworkers/main.go deleted file mode 100644 index 153d4874..00000000 --- a/benches/bench_goworkers/main.go +++ /dev/null @@ -1,132 +0,0 @@ -package main - -import ( - "fmt" - "os" - "sync/atomic" - "time" - - "github.com/gocraft/health" - "github.com/gomodule/redigo/redis" - "github.com/jrallison/go-workers" -) - -func myJob(m *workers.Msg) { - atomic.AddInt64(&totcount, 1) -} - -var namespace = "bench_test" -var pool = newPool(":6379") - -func main() { - - stream := health.NewStream().AddSink(&health.WriterSink{os.Stdout}) - stream.Event("wat") - cleanKeyspace() - - workers.Configure(map[string]string{ - // location of redis instance - "server": "localhost:6379", - // instance of the database - "database": "0", - // number of connections to keep open with redis - "pool": "10", - // unique process id for this instance of workers (for proper recovery of inprogress jobs on crash) - "process": "1", - "namespace": namespace, - }) - workers.Middleware = &workers.Middlewares{} - - queues := []string{"myqueue", "myqueue2", "myqueue3", "myqueue4", "myqueue5"} - numJobs := 100000 / len(queues) - - job := stream.NewJob("enqueue_all") - for _, q := range queues { - enqueueJobs(q, numJobs) - } - job.Complete(health.Success) - - for _, q := range queues { - workers.Process(q, myJob, 10) - } - - go monitor() - - // Blocks until process is told to exit via unix signal - workers.Run() -} - -var totcount int64 - -func monitor() { - t := time.Tick(1 * time.Second) - - curT := 0 - c1 := int64(0) - c2 := int64(0) - prev := int64(0) - -DALOOP: - for { - select { - case <-t: - curT++ - v := atomic.AddInt64(&totcount, 0) - fmt.Printf("after %d seconds, count is %d\n", curT, v) - if curT == 1 { - c1 = v - } else if curT == 3 { - c2 = v - } - if v == prev { - break DALOOP - } - prev = v - } - } - fmt.Println("Jobs/sec: ", float64(c2-c1)/2.0) - os.Exit(0) -} - -func enqueueJobs(queue string, count int) { - for i := 0; i < count; i++ { - workers.Enqueue(queue, "Foo", []int{i}) - } -} - -func cleanKeyspace() { - conn := pool.Get() - defer conn.Close() - - keys, err := redis.Strings(conn.Do("KEYS", namespace+"*")) - if err != nil { - panic("could not get keys: " + err.Error()) - } - for _, k := range keys { - //fmt.Println("deleting ", k) - if _, err := conn.Do("DEL", k); err != nil { - panic("could not del: " + err.Error()) - } - } -} - -func newPool(addr string) *redis.Pool { - return &redis.Pool{ - MaxActive: 3, - MaxIdle: 3, - IdleTimeout: 240 * time.Second, - Dial: func() (redis.Conn, error) { - c, err := redis.Dial("tcp", addr) - if err != nil { - return nil, err - } - return c, nil - //return redis.NewLoggingConn(c, log.New(os.Stdout, "", 0), "redis"), err - }, - Wait: true, - //TestOnBorrow: func(c redis.Conn, t time.Time) error { - // _, err := c.Do("PING") - // return err - //}, - } -} diff --git a/benches/bench_jobs/main.go b/benches/bench_jobs/main.go deleted file mode 100644 index 3ba5f165..00000000 --- a/benches/bench_jobs/main.go +++ /dev/null @@ -1,141 +0,0 @@ -package main - -import ( - "fmt" - "os" - "sync/atomic" - "time" - - "github.com/albrow/jobs" - "github.com/gocraft/health" - "github.com/gomodule/redigo/redis" -) - -var namespace = "jobs" -var pool = newPool(":6379") - -func epsilonHandler(i int) error { - atomic.AddInt64(&totcount, 1) - return nil -} - -func main() { - stream := health.NewStream().AddSink(&health.WriterSink{os.Stdout}) - cleanKeyspace() - - queueNames := []string{"myqueue", "myqueue2", "myqueue3", "myqueue4", "myqueue5"} - queues := []*jobs.Type{} - - for _, qn := range queueNames { - q, err := jobs.RegisterType(qn, 3, epsilonHandler) - if err != nil { - panic(err) - } - queues = append(queues, q) - } - - job := stream.NewJob("enqueue_all") - - numJobs := 40000 / len(queues) - for _, q := range queues { - for i := 0; i < numJobs; i++ { - _, err := q.Schedule(100, time.Now(), i) - if err != nil { - panic(err) - } - } - } - - job.Complete(health.Success) - - go monitor() - - job = stream.NewJob("run_all") - pool, err := jobs.NewPool(&jobs.PoolConfig{ - // NumWorkers: 1000, - // BatchSize: 3000, - }) - if err != nil { - panic(err) - } - defer func() { - pool.Close() - if err := pool.Wait(); err != nil { - panic(err) - } - }() - if err := pool.Start(); err != nil { - panic(err) - } - job.Complete(health.Success) - select {} -} - -var totcount int64 - -func monitor() { - t := time.Tick(1 * time.Second) - - curT := 0 - c1 := int64(0) - c2 := int64(0) - prev := int64(0) - -DALOOP: - for { - select { - case <-t: - curT++ - v := atomic.AddInt64(&totcount, 0) - fmt.Printf("after %d seconds, count is %d\n", curT, v) - if curT == 1 { - c1 = v - } else if curT == 3 { - c2 = v - } - if v == prev { - break DALOOP - } - prev = v - } - } - fmt.Println("Jobs/sec: ", float64(c2-c1)/2.0) - os.Exit(0) -} - -func cleanKeyspace() { - conn := pool.Get() - defer conn.Close() - - keys, err := redis.Strings(conn.Do("KEYS", namespace+"*")) - if err != nil { - panic("could not get keys: " + err.Error()) - } - for _, k := range keys { - //fmt.Println("deleting ", k) - if _, err := conn.Do("DEL", k); err != nil { - panic("could not del: " + err.Error()) - } - } -} - -func newPool(addr string) *redis.Pool { - return &redis.Pool{ - MaxActive: 20, - MaxIdle: 20, - IdleTimeout: 240 * time.Second, - Dial: func() (redis.Conn, error) { - c, err := redis.Dial("tcp", addr) - if err != nil { - return nil, err - } - return c, nil - //return redis.NewLoggingConn(c, log.New(os.Stdout, "", 0), "redis"), err - }, - Wait: true, - //TestOnBorrow: func(c redis.Conn, t time.Time) error { - // _, err := c.Do("PING") - // return err - //}, - } -} diff --git a/benches/bench_work/main.go b/benches/bench_work/main.go deleted file mode 100644 index aaf1de69..00000000 --- a/benches/bench_work/main.go +++ /dev/null @@ -1,131 +0,0 @@ -package main - -import ( - "fmt" - "os" - "sync/atomic" - "time" - - "github.com/gocraft/health" - "github.com/gocraft/work" - "github.com/gomodule/redigo/redis" -) - -var namespace = "bench_test" -var pool = newPool(":6379") - -type context struct{} - -func epsilonHandler(job *work.Job) error { - //fmt.Println("hi") - //a := job.Args[0] - //fmt.Printf("job: %s arg: %v\n", job.Name, a) - atomic.AddInt64(&totcount, 1) - return nil -} - -func main() { - stream := health.NewStream().AddSink(&health.WriterSink{os.Stdout}) - cleanKeyspace() - - numJobs := 10 - jobNames := []string{} - - for i := 0; i < numJobs; i++ { - jobNames = append(jobNames, fmt.Sprintf("job%d", i)) - } - - job := stream.NewJob("enqueue_all") - enqueueJobs(jobNames, 10000) - job.Complete(health.Success) - - workerPool := work.NewWorkerPool(context{}, 20, namespace, pool) - for _, jobName := range jobNames { - workerPool.Job(jobName, epsilonHandler) - } - go monitor() - - job = stream.NewJob("run_all") - workerPool.Start() - workerPool.Drain() - job.Complete(health.Success) - select {} -} - -var totcount int64 - -func monitor() { - t := time.Tick(1 * time.Second) - - curT := 0 - c1 := int64(0) - c2 := int64(0) - prev := int64(0) - -DALOOP: - for { - select { - case <-t: - curT++ - v := atomic.AddInt64(&totcount, 0) - fmt.Printf("after %d seconds, count is %d\n", curT, v) - if curT == 1 { - c1 = v - } else if curT == 3 { - c2 = v - } - if v == prev { - break DALOOP - } - prev = v - } - } - fmt.Println("Jobs/sec: ", float64(c2-c1)/2.0) - os.Exit(0) -} - -func enqueueJobs(jobs []string, count int) { - enq := work.NewEnqueuer(namespace, pool) - for _, jobName := range jobs { - for i := 0; i < count; i++ { - enq.Enqueue(jobName, work.Q{"i": i}) - } - } -} - -func cleanKeyspace() { - conn := pool.Get() - defer conn.Close() - - keys, err := redis.Strings(conn.Do("KEYS", namespace+"*")) - if err != nil { - panic("could not get keys: " + err.Error()) - } - for _, k := range keys { - //fmt.Println("deleting ", k) - if _, err := conn.Do("DEL", k); err != nil { - panic("could not del: " + err.Error()) - } - } -} - -func newPool(addr string) *redis.Pool { - return &redis.Pool{ - MaxActive: 20, - MaxIdle: 20, - IdleTimeout: 240 * time.Second, - Dial: func() (redis.Conn, error) { - c, err := redis.Dial("tcp", addr) - if err != nil { - return nil, err - } - return c, nil - //return redis.NewLoggingConn(c, log.New(os.Stdout, "", 0), "redis"), err - }, - Wait: true, - //TestOnBorrow: func(c redis.Conn, t time.Time) error { - // _, err := c.Do("PING") - // return err - //}, - } -} diff --git a/client_test.go b/client_test.go index dc012441..3079e5c1 100644 --- a/client_test.go +++ b/client_test.go @@ -6,14 +6,17 @@ import ( "time" "github.com/gomodule/redigo/redis" + "github.com/google/uuid" "github.com/stretchr/testify/assert" ) type TestContext struct{} func TestClientWorkerPoolHeartbeats(t *testing.T) { - pool := newTestPool(":6379") - ns := "work" + t.Parallel() + + pool := newTestPool() + ns := uuid.New().String() cleanKeyspace(ns, pool) wp := NewWorkerPool(TestContext{}, 10, ns, pool) @@ -64,8 +67,8 @@ func TestClientWorkerPoolHeartbeats(t *testing.T) { } func TestClientWorkerObservations(t *testing.T) { - pool := newTestPool(":6379") - ns := "work" + pool := newTestPool() + ns := uuid.New().String() cleanKeyspace(ns, pool) enqueuer := NewEnqueuer(ns, pool) @@ -133,14 +136,17 @@ func TestClientWorkerObservations(t *testing.T) { } func TestClientQueues(t *testing.T) { - pool := newTestPool(":6379") - ns := "work" + pool := newTestPool() + ns := uuid.New().String() cleanKeyspace(ns, pool) enqueuer := NewEnqueuer(ns, pool) _, err := enqueuer.Enqueue("wat", nil) + assert.NoError(t, err) _, err = enqueuer.Enqueue("foo", nil) + assert.NoError(t, err) _, err = enqueuer.Enqueue("zaz", nil) + assert.NoError(t, err) // Start a pool to work on it. It's going to work on the queues // side effect of that is knowing which jobs are avail @@ -184,8 +190,8 @@ func TestClientQueues(t *testing.T) { } func TestClientScheduledJobs(t *testing.T) { - pool := newTestPool(":6379") - ns := "work" + pool := newTestPool() + ns := uuid.New().String() cleanKeyspace(ns, pool) enqueuer := NewEnqueuer(ns, pool) @@ -193,8 +199,11 @@ func TestClientScheduledJobs(t *testing.T) { setNowEpochSecondsMock(1425263409) defer resetNowEpochSecondsMock() _, err := enqueuer.EnqueueIn("wat", 0, Q{"a": 1, "b": 2}) + assert.NoError(t, err) _, err = enqueuer.EnqueueIn("zaz", 4, Q{"a": 3, "b": 4}) + assert.NoError(t, err) _, err = enqueuer.EnqueueIn("foo", 2, Q{"a": 3, "b": 4}) + assert.NoError(t, err) client := NewClient(ns, pool) jobs, count, err := client.ScheduledJobs(1) @@ -232,8 +241,8 @@ func TestClientScheduledJobs(t *testing.T) { } func TestClientRetryJobs(t *testing.T) { - pool := newTestPool(":6379") - ns := "work" + pool := newTestPool() + ns := uuid.New().String() cleanKeyspace(ns, pool) setNowEpochSecondsMock(1425263409) @@ -271,8 +280,8 @@ func TestClientRetryJobs(t *testing.T) { } func TestClientDeadJobs(t *testing.T) { - pool := newTestPool(":6379") - ns := "testwork" + pool := newTestPool() + ns := uuid.New().String() cleanKeyspace(ns, pool) setNowEpochSecondsMock(1425263409) @@ -326,8 +335,10 @@ func TestClientDeadJobs(t *testing.T) { } func TestClientDeleteDeadJob(t *testing.T) { - pool := newTestPool(":6379") - ns := "testwork" + t.Parallel() + + pool := newTestPool() + ns := uuid.New().String() cleanKeyspace(ns, pool) // Insert a dead job: @@ -355,8 +366,10 @@ func TestClientDeleteDeadJob(t *testing.T) { } func TestClientRetryDeadJob(t *testing.T) { - pool := newTestPool(":6379") - ns := "testwork" + t.Parallel() + + pool := newTestPool() + ns := uuid.New().String() cleanKeyspace(ns, pool) // Insert a dead job: @@ -411,8 +424,10 @@ func TestClientRetryDeadJob(t *testing.T) { } func TestClientRetryDeadJobWithArgs(t *testing.T) { - pool := newTestPool(":6379") - ns := "testwork" + t.Parallel() + + pool := newTestPool() + ns := uuid.New().String() cleanKeyspace(ns, pool) // Enqueue a job with arguments @@ -455,15 +470,17 @@ func TestClientRetryDeadJobWithArgs(t *testing.T) { } func TestClientDeleteAllDeadJobs(t *testing.T) { - pool := newTestPool(":6379") - ns := "testwork" + t.Parallel() + + pool := newTestPool() + ns := uuid.New().String() cleanKeyspace(ns, pool) // Insert a dead job: - insertDeadJob(ns, pool, "wat", 12345, 12347) - insertDeadJob(ns, pool, "wat", 12345, 12347) - insertDeadJob(ns, pool, "wat", 12345, 12349) - insertDeadJob(ns, pool, "wat", 12345, 12350) + insertDeadJob(ns, pool, "wat", 12344, 12347) + insertDeadJob(ns, pool, "wat", 12344, 12347) + insertDeadJob(ns, pool, "wat", 12344, 12349) + insertDeadJob(ns, pool, "wat", 12344, 12350) client := NewClient(ns, pool) jobs, count, err := client.DeadJobs(1) @@ -481,8 +498,8 @@ func TestClientDeleteAllDeadJobs(t *testing.T) { } func TestClientRetryAllDeadJobs(t *testing.T) { - pool := newTestPool(":6379") - ns := "testwork" + pool := newTestPool() + ns := uuid.New().String() cleanKeyspace(ns, pool) setNowEpochSecondsMock(1425263409) @@ -539,8 +556,10 @@ func TestClientRetryAllDeadJobs(t *testing.T) { } func TestClientRetryAllDeadJobsBig(t *testing.T) { - pool := newTestPool(":6379") - ns := "testwork" + t.Parallel() + + pool := newTestPool() + ns := uuid.New().String() cleanKeyspace(ns, pool) conn := pool.Get() @@ -608,8 +627,10 @@ func TestClientRetryAllDeadJobsBig(t *testing.T) { } func TestClientDeleteScheduledJob(t *testing.T) { - pool := newTestPool(":6379") - ns := "testwork" + t.Parallel() + + pool := newTestPool() + ns := uuid.New().String() cleanKeyspace(ns, pool) // Delete an invalid job. Make sure we get error @@ -629,8 +650,10 @@ func TestClientDeleteScheduledJob(t *testing.T) { } func TestClientDeleteScheduledUniqueJob(t *testing.T) { - pool := newTestPool(":6379") - ns := "testwork" + t.Parallel() + + pool := newTestPool() + ns := uuid.New().String() cleanKeyspace(ns, pool) // Schedule a unique job. Delete it. Ensure we can schedule it again. @@ -650,8 +673,8 @@ func TestClientDeleteScheduledUniqueJob(t *testing.T) { } func TestClientDeleteRetryJob(t *testing.T) { - pool := newTestPool(":6379") - ns := "testwork" + pool := newTestPool() + ns := uuid.New().String() cleanKeyspace(ns, pool) setNowEpochSecondsMock(1425263409) @@ -683,7 +706,7 @@ func TestClientDeleteRetryJob(t *testing.T) { } } -func insertDeadJob(ns string, pool *redis.Pool, name string, encAt, failAt int64) *Job { +func insertDeadJob(ns string, pool *redis.Pool, name string, encAt, failAt int64) { job := &Job{ Name: name, ID: makeIdentifier(), @@ -707,7 +730,6 @@ func insertDeadJob(ns string, pool *redis.Pool, name string, encAt, failAt int64 panic(err) } - return job } func getQueuedJob(ns string, pool *redis.Pool, name string) *Job { diff --git a/cmd/workenqueue/main.go b/cmd/workenqueue/main.go deleted file mode 100644 index 924f6ec8..00000000 --- a/cmd/workenqueue/main.go +++ /dev/null @@ -1,59 +0,0 @@ -package main - -import ( - "encoding/json" - "flag" - "fmt" - "os" - "time" - - "github.com/gocraft/work" - "github.com/gomodule/redigo/redis" -) - -var redisHostPort = flag.String("redis", ":6379", "redis hostport") -var redisNamespace = flag.String("ns", "work", "redis namespace") -var jobName = flag.String("job", "", "job name") -var jobArgs = flag.String("args", "{}", "job arguments") - -func main() { - flag.Parse() - - if *jobName == "" { - fmt.Println("no job specified") - os.Exit(1) - } - - pool := newPool(*redisHostPort) - - var args map[string]interface{} - err := json.Unmarshal([]byte(*jobArgs), &args) - if err != nil { - fmt.Println("invalid args:", err) - os.Exit(1) - } - - en := work.NewEnqueuer(*redisNamespace, pool) - en.Enqueue(*jobName, args) -} - -func newPool(addr string) *redis.Pool { - return &redis.Pool{ - MaxActive: 20, - MaxIdle: 20, - IdleTimeout: 240 * time.Second, - Dial: func() (redis.Conn, error) { - c, err := redis.Dial("tcp", addr) - if err != nil { - return nil, err - } - return c, nil - //return redis.NewLoggingConn(c, log.New(os.Stdout, "", 0), "redis"), err - }, - Wait: true, - //TestOnBorrow: func(c redis.Conn, t time.Time) error { - // _, err := c.Do("PING") - // return err - //}, - } -} diff --git a/cmd/workfakedata/main.go b/cmd/workfakedata/main.go deleted file mode 100644 index 72e77b04..00000000 --- a/cmd/workfakedata/main.go +++ /dev/null @@ -1,94 +0,0 @@ -package main - -import ( - "flag" - "fmt" - "math/rand" - "time" - - "github.com/gocraft/work" - "github.com/gomodule/redigo/redis" -) - -var redisHostPort = flag.String("redis", ":6379", "redis hostport") -var redisNamespace = flag.String("ns", "work", "redis namespace") - -func epsilonHandler(job *work.Job) error { - fmt.Println("epsilon") - time.Sleep(time.Second) - - if rand.Intn(2) == 0 { - return fmt.Errorf("random error") - } - return nil -} - -type context struct{} - -func main() { - flag.Parse() - fmt.Println("Installing some fake data") - - pool := newPool(*redisHostPort) - cleanKeyspace(pool, *redisNamespace) - - // Enqueue some jobs: - go func() { - conn := pool.Get() - defer conn.Close() - conn.Do("SADD", *redisNamespace+":known_jobs", "foobar") - }() - - go func() { - for { - en := work.NewEnqueuer(*redisNamespace, pool) - for i := 0; i < 20; i++ { - en.Enqueue("foobar", work.Q{"i": i}) - } - - time.Sleep(1 * time.Second) - } - }() - - wp := work.NewWorkerPool(context{}, 5, *redisNamespace, pool) - wp.Job("foobar", epsilonHandler) - wp.Start() - - select {} -} - -func newPool(addr string) *redis.Pool { - return &redis.Pool{ - MaxActive: 20, - MaxIdle: 20, - IdleTimeout: 240 * time.Second, - Dial: func() (redis.Conn, error) { - c, err := redis.Dial("tcp", addr) - if err != nil { - return nil, err - } - return c, nil - //return redis.NewLoggingConn(c, log.New(os.Stdout, "", 0), "redis"), err - }, - Wait: true, - //TestOnBorrow: func(c redis.Conn, t time.Time) error { - // _, err := c.Do("PING") - // return err - //}, - } -} - -func cleanKeyspace(pool *redis.Pool, namespace string) { - conn := pool.Get() - defer conn.Close() - - keys, err := redis.Strings(conn.Do("KEYS", namespace+"*")) - if err != nil { - panic("could not get keys: " + err.Error()) - } - for _, k := range keys { - if _, err := conn.Do("DEL", k); err != nil { - panic("could not del: " + err.Error()) - } - } -} diff --git a/cmd/workwebui/main.go b/cmd/workwebui/main.go index 6d3e32fc..5bd41b5c 100644 --- a/cmd/workwebui/main.go +++ b/cmd/workwebui/main.go @@ -6,6 +6,7 @@ import ( "os" "os/signal" "strconv" + "syscall" "time" "github.com/gocraft/work/webui" @@ -40,7 +41,7 @@ func main() { server.Start() c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt, os.Kill) + signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) <-c diff --git a/dead_pool_reaper.go b/dead_pool_reaper.go index e930521e..f9149db3 100644 --- a/dead_pool_reaper.go +++ b/dead_pool_reaper.go @@ -1,8 +1,9 @@ package work import ( + "crypto/rand" "fmt" - "math/rand" + "math/big" "strings" "time" @@ -59,8 +60,13 @@ func (r *deadPoolReaper) loop() { r.doneStoppingChan <- struct{}{} return case <-timer.C: + n, err := rand.Int(rand.Reader, big.NewInt(reapJitterSecs)) + if err != nil { + panic(err) + } + // Schedule next occurrence periodically with jitter - timer.Reset(r.reapPeriod + time.Duration(rand.Intn(reapJitterSecs))*time.Second) + timer.Reset(r.reapPeriod + time.Duration(n.Int64())*time.Second) // Reap if err := r.reap(); err != nil { diff --git a/dead_pool_reaper_test.go b/dead_pool_reaper_test.go index 6c21cbc6..6952a536 100644 --- a/dead_pool_reaper_test.go +++ b/dead_pool_reaper_test.go @@ -5,12 +5,15 @@ import ( "time" "github.com/gomodule/redigo/redis" + "github.com/google/uuid" "github.com/stretchr/testify/assert" ) func TestDeadPoolReaper(t *testing.T) { - pool := newTestPool(":6379") - ns := "work" + t.Parallel() + + pool := newTestPool() + ns := uuid.New().String() cleanKeyspace(ns, pool) conn := pool.Get() @@ -92,8 +95,10 @@ func TestDeadPoolReaper(t *testing.T) { } func TestDeadPoolReaperNoHeartbeat(t *testing.T) { - pool := newTestPool(":6379") - ns := "work" + t.Parallel() + + pool := newTestPool() + ns := uuid.New().String() conn := pool.Get() defer conn.Close() @@ -179,8 +184,10 @@ func TestDeadPoolReaperNoHeartbeat(t *testing.T) { } func TestDeadPoolReaperNoJobTypes(t *testing.T) { - pool := newTestPool(":6379") - ns := "work" + t.Parallel() + + pool := newTestPool() + ns := uuid.New().String() cleanKeyspace(ns, pool) conn := pool.Get() @@ -255,8 +262,8 @@ func TestDeadPoolReaperNoJobTypes(t *testing.T) { } func TestDeadPoolReaperWithWorkerPools(t *testing.T) { - pool := newTestPool(":6379") - ns := "work" + pool := newTestPool() + ns := uuid.New().String() job1 := "job1" stalePoolID := "aaa" cleanKeyspace(ns, pool) @@ -295,8 +302,10 @@ func TestDeadPoolReaperWithWorkerPools(t *testing.T) { } func TestDeadPoolReaperCleanStaleLocks(t *testing.T) { - pool := newTestPool(":6379") - ns := "work" + t.Parallel() + + pool := newTestPool() + ns := uuid.New().String() cleanKeyspace(ns, pool) conn := pool.Get() @@ -341,7 +350,9 @@ func TestDeadPoolReaperCleanStaleLocks(t *testing.T) { assert.EqualValues(t, 0, getInt64(pool, lock2)) // worker pool ID 2 removed from both lock info hashes v, err = conn.Do("HGET", lockInfo1, workerPoolID2) + assert.NoError(t, err) assert.Nil(t, v) v, err = conn.Do("HGET", lockInfo2, workerPoolID2) assert.Nil(t, v) + assert.NoError(t, err) } diff --git a/enqueue.go b/enqueue.go index 0d2f763e..c930bc73 100644 --- a/enqueue.go +++ b/enqueue.go @@ -2,7 +2,6 @@ package work import ( "sync" - "time" "github.com/gomodule/redigo/redis" ) @@ -155,7 +154,7 @@ func (e *Enqueuer) EnqueueUniqueInByKey(jobName string, secondsFromNow int64, ar func (e *Enqueuer) addToKnownJobs(conn redis.Conn, jobName string) error { needSadd := true - now := time.Now().Unix() + now := nowEpochSeconds() e.mtx.RLock() t, ok := e.knownJobs[jobName] diff --git a/enqueue_test.go b/enqueue_test.go index 8ca41fa6..1b0b3969 100644 --- a/enqueue_test.go +++ b/enqueue_test.go @@ -6,12 +6,15 @@ import ( "testing" "time" + "github.com/google/uuid" "github.com/stretchr/testify/assert" ) func TestEnqueue(t *testing.T) { - pool := newTestPool(":6379") - ns := "work" + t.Parallel() + + pool := newTestPool() + ns := uuid.New().String() cleanKeyspace(ns, pool) enqueuer := NewEnqueuer(ns, pool) job, err := enqueuer.Enqueue("wat", Q{"a": 1, "b": "cool"}) @@ -46,14 +49,17 @@ func TestEnqueue(t *testing.T) { // Now enqueue another job, make sure that we can enqueue multiple _, err = enqueuer.Enqueue("wat", Q{"a": 1, "b": "cool"}) + assert.NoError(t, err) _, err = enqueuer.Enqueue("wat", Q{"a": 1, "b": "cool"}) - assert.Nil(t, err) + assert.NoError(t, err) assert.EqualValues(t, 2, listSize(pool, redisKeyJobs(ns, "wat"))) } func TestEnqueueIn(t *testing.T) { - pool := newTestPool(":6379") - ns := "work" + t.Parallel() + + pool := newTestPool() + ns := uuid.New().String() cleanKeyspace(ns, pool) enqueuer := NewEnqueuer(ns, pool) @@ -99,8 +105,8 @@ func TestEnqueueIn(t *testing.T) { } func TestEnqueueUnique(t *testing.T) { - pool := newTestPool(":6379") - ns := "work" + pool := newTestPool() + ns := uuid.New().String() cleanKeyspace(ns, pool) enqueuer := NewEnqueuer(ns, pool) var mutex = &sync.Mutex{} @@ -175,8 +181,8 @@ func TestEnqueueUnique(t *testing.T) { } func TestEnqueueUniqueIn(t *testing.T) { - pool := newTestPool(":6379") - ns := "work" + pool := newTestPool() + ns := uuid.New().String() cleanKeyspace(ns, pool) enqueuer := NewEnqueuer(ns, pool) @@ -232,11 +238,13 @@ func TestEnqueueUniqueIn(t *testing.T) { } func TestEnqueueUniqueByKey(t *testing.T) { + t.Parallel() + var arg3 string var arg4 string - pool := newTestPool(":6379") - ns := "work" + pool := newTestPool() + ns := uuid.New().String() cleanKeyspace(ns, pool) enqueuer := NewEnqueuer(ns, pool) var mutex = &sync.Mutex{} @@ -314,43 +322,3 @@ func TestEnqueueUniqueByKey(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, job) } - -func EnqueueUniqueInByKey(t *testing.T) { - pool := newTestPool(":6379") - ns := "work" - cleanKeyspace(ns, pool) - enqueuer := NewEnqueuer(ns, pool) - - // Enqueue two unique jobs -- ensure one job sticks. - job, err := enqueuer.EnqueueUniqueInByKey("wat", 300, Q{"a": 1, "b": "cool"}, Q{"key": "123"}) - assert.NoError(t, err) - if assert.NotNil(t, job) { - assert.Equal(t, "wat", job.Name) - assert.True(t, len(job.ID) > 10) // Something is in it - assert.True(t, job.EnqueuedAt > (time.Now().Unix()-10)) // Within 10 seconds - assert.True(t, job.EnqueuedAt < (time.Now().Unix()+10)) // Within 10 seconds - assert.Equal(t, "cool", job.ArgString("b")) - assert.EqualValues(t, 1, job.ArgInt64("a")) - assert.NoError(t, job.ArgError()) - assert.EqualValues(t, job.EnqueuedAt+300, job.RunAt) - } - - job, err = enqueuer.EnqueueUniqueInByKey("wat", 10, Q{"a": 1, "b": "cool"}, Q{"key": "123"}) - assert.NoError(t, err) - assert.Nil(t, job) - - // Get the job - score, j := jobOnZset(pool, redisKeyScheduled(ns)) - - assert.True(t, score > time.Now().Unix()+290) // We don't want to overwrite the time - assert.True(t, score <= time.Now().Unix()+300) - - assert.Equal(t, "wat", j.Name) - assert.True(t, len(j.ID) > 10) // Something is in it - assert.True(t, j.EnqueuedAt > (time.Now().Unix()-10)) // Within 10 seconds - assert.True(t, j.EnqueuedAt < (time.Now().Unix()+10)) // Within 10 seconds - assert.Equal(t, "cool", j.ArgString("b")) - assert.EqualValues(t, 1, j.ArgInt64("a")) - assert.NoError(t, j.ArgError()) - assert.True(t, j.Unique) -} diff --git a/go.mod b/go.mod index 224eab98..c5548045 100644 --- a/go.mod +++ b/go.mod @@ -3,27 +3,10 @@ module github.com/gocraft/work go 1.14 require ( - github.com/albrow/jobs v0.4.2 - github.com/benmanns/goworker v0.1.3 - github.com/bitly/go-simplejson v0.5.0 // indirect - github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect github.com/braintree/manners v0.0.0-20160418043613-82a8879fc5fd - github.com/cihub/seelog v0.0.0-20170130134532-f561c5e57575 // indirect - github.com/customerio/gospec v0.0.0-20130710230057-a5cc0e48aa39 // indirect - github.com/dchest/uniuri v0.0.0-20200228104902-7aecb25e1fe5 // indirect - github.com/dustin/go-humanize v1.0.0 // indirect - github.com/garyburd/redigo v1.6.0 // indirect - github.com/gocraft/health v0.0.0-20170925182251-8675af27fef0 github.com/gocraft/web v0.0.0-20190207150652-9707327fb69b - github.com/gocraft/work v0.5.1 - github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b // indirect - github.com/gomodule/redigo v2.0.0+incompatible - github.com/jrallison/go-workers v0.0.0-20180112190529-dbf81d0b75bb - github.com/kr/pretty v0.2.0 // indirect - github.com/orfjackal/nanospec.go v0.0.0-20120727230329-de4694c1d701 // indirect - github.com/robfig/cron v1.2.0 // indirect + github.com/gomodule/redigo v1.8.2 + github.com/google/uuid v1.1.2 github.com/robfig/cron/v3 v3.0.1 - github.com/stretchr/testify v1.5.1 - github.com/youtube/vitess v2.1.1+incompatible // indirect - golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 // indirect + github.com/stretchr/testify v1.6.1 ) diff --git a/go.sum b/go.sum index 63aa5f0a..1280b97e 100644 --- a/go.sum +++ b/go.sum @@ -1,62 +1,23 @@ -github.com/albrow/jobs v0.4.2 h1:AhhNNgtnOz3h+Grt6uuRJP+uj/AVq+ZhIBY8Mzkf4TM= -github.com/albrow/jobs v0.4.2/go.mod h1:e4sWh7D1DxPbpxrzJhNo/cMARAljpTYF/osgh2j3+r8= -github.com/benmanns/goworker v0.1.3 h1:ekwn7WiKsn8oUOKfbHDqsA6g5bXz/uEZ9AdnKgtAECY= -github.com/benmanns/goworker v0.1.3/go.mod h1:Gj3m7lTyCswE3+Kta7c79CMOmm5rHJmj2qh/GAmojJ4= -github.com/bitly/go-simplejson v0.5.0 h1:6IH+V8/tVMab511d5bn4M7EwGXZf9Hj6i2xSwkNEM+Y= -github.com/bitly/go-simplejson v0.5.0/go.mod h1:cXHtHw4XUPsvGaxgjIAn8PhEWG9NfngEKAMDJEczWVA= -github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= -github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= github.com/braintree/manners v0.0.0-20160418043613-82a8879fc5fd h1:ePesaBzdTmoMQjwqRCLP2jY+jjWMBpwws/LEQdt1fMM= github.com/braintree/manners v0.0.0-20160418043613-82a8879fc5fd/go.mod h1:TNehV1AhBwtT7Bd+rh8G6MoGDbBLNs/sKdk3nvr4Yzg= -github.com/cihub/seelog v0.0.0-20170130134532-f561c5e57575 h1:kHaBemcxl8o/pQ5VM1c8PVE1PubbNx3mjUr09OqWGCs= -github.com/cihub/seelog v0.0.0-20170130134532-f561c5e57575/go.mod h1:9d6lWj8KzO/fd/NrVaLscBKmPigpZpn5YawRPw+e3Yo= -github.com/customerio/gospec v0.0.0-20130710230057-a5cc0e48aa39 h1:O0YTztXI3XeJXlFhSo4wNb0VBVqSgT+hi/CjNWKvMnY= -github.com/customerio/gospec v0.0.0-20130710230057-a5cc0e48aa39/go.mod h1:OzYUFhPuL2JbjwFwrv6CZs23uBawekc6OZs+g19F0mY= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dchest/uniuri v0.0.0-20200228104902-7aecb25e1fe5 h1:RAV05c0xOkJ3dZGS0JFybxFKZ2WMLabgx3uXnd7rpGs= -github.com/dchest/uniuri v0.0.0-20200228104902-7aecb25e1fe5/go.mod h1:GgB8SF9nRG+GqaDtLcwJZsQFhcogVCJ79j4EdT0c2V4= -github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= -github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= -github.com/garyburd/redigo v1.6.0 h1:0VruCpn7yAIIu7pWVClQC8wxCJEcG3nyzpMSHKi1PQc= -github.com/garyburd/redigo v1.6.0/go.mod h1:NR3MbYisc3/PwhQ00EMzDiPmrwpPxAn5GI05/YaO1SY= -github.com/gocraft/health v0.0.0-20170925182251-8675af27fef0 h1:pKjeDsx7HGGbjr7VGI1HksxDJqSjaGED3cSw9GeSI98= -github.com/gocraft/health v0.0.0-20170925182251-8675af27fef0/go.mod h1:rWibcVfwbUxi/QXW84U7vNTcIcZFd6miwbt8ritxh/Y= github.com/gocraft/web v0.0.0-20190207150652-9707327fb69b h1:g2Qcs0B+vOQE1L3a7WQ/JUUSzJnHbTz14qkJSqEWcF4= github.com/gocraft/web v0.0.0-20190207150652-9707327fb69b/go.mod h1:Ag7UMbZNGrnHwaXPJOUKJIVgx4QOWMOWZngrvsN6qak= -github.com/gocraft/work v0.5.1 h1:3bRjMiOo6N4zcRgZWV3Y7uX7R22SF+A9bPTk4xRXr34= -github.com/gocraft/work v0.5.1/go.mod h1:pc3n9Pb5FAESPPGfM0nL+7Q1xtgtRnF8rr/azzhQVlM= -github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= -github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= -github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0= -github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4= -github.com/jrallison/go-workers v0.0.0-20180112190529-dbf81d0b75bb h1:y9LFhCM3gwK94Xz9/h7GcSVLteky9pFHEkP04AqQupA= -github.com/jrallison/go-workers v0.0.0-20180112190529-dbf81d0b75bb/go.mod h1:ziQRRNHCWZe0wVNzF8y8kCWpso0VMpqHJjB19DSenbE= -github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs= -github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= -github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= -github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/orfjackal/nanospec.go v0.0.0-20120727230329-de4694c1d701 h1:yOXfzNV7qkZ3nf2NPqy4BMzlCmnQzIEbI1vuqKb2FkQ= -github.com/orfjackal/nanospec.go v0.0.0-20120727230329-de4694c1d701/go.mod h1:VtBIF1XX0c1nKkeAPk8i4aXkYopqQgfDqolHUIHPwNI= +github.com/gomodule/redigo v1.8.2 h1:H5XSIre1MB5NbPYFp+i1NBbb5qN1W8Y8YAQoAYbkm8k= +github.com/gomodule/redigo v1.8.2/go.mod h1:P9dn9mFrCBvWhGE1wpxx6fgq7BAeLBk+UUUzlpkBYO0= +github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= +github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ= -github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= -github.com/youtube/vitess v2.1.1+incompatible h1:SE+P7DNX/jw5RHFs5CHRhZQjq402EJFCD33JhzQMdDw= -github.com/youtube/vitess v2.1.1+incompatible/go.mod h1:hpMim5/30F1r+0P8GGtB29d0gWHr0IZ5unS+CG0zMx8= -golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 h1:Jcxah/M+oLZ/R4/z5RzfPzGbPXnVDPkEDtf2JnuxN+U= -golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= -golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/heartbeater.go b/heartbeater.go index ccc229c4..1de76585 100644 --- a/heartbeater.go +++ b/heartbeater.go @@ -73,7 +73,7 @@ func (h *workerPoolHeartbeater) stop() { func (h *workerPoolHeartbeater) loop() { h.startedAt = nowEpochSeconds() h.heartbeat() // do it right away - ticker := time.Tick(h.beatPeriod) + ticker := time.NewTicker(h.beatPeriod).C for { select { case <-h.stopChan: diff --git a/heartbeater_test.go b/heartbeater_test.go index 2af2d2a9..eade1e4e 100644 --- a/heartbeater_test.go +++ b/heartbeater_test.go @@ -5,12 +5,13 @@ import ( "time" "github.com/gomodule/redigo/redis" + "github.com/google/uuid" "github.com/stretchr/testify/assert" ) func TestHeartbeater(t *testing.T) { - pool := newTestPool(":6379") - ns := "work" + pool := newTestPool() + ns := uuid.New().String() tMock := int64(1425263409) setNowEpochSecondsMock(tMock) diff --git a/identifier_test.go b/identifier_test.go index ccf89dfd..f723fdac 100644 --- a/identifier_test.go +++ b/identifier_test.go @@ -3,6 +3,8 @@ package work import "testing" func TestMakeIdentifier(t *testing.T) { + t.Parallel() + id := makeIdentifier() if len(id) < 10 { t.Errorf("expected a string of length 10 at least") diff --git a/job_test.go b/job_test.go index aa6222f2..cbb5af6d 100644 --- a/job_test.go +++ b/job_test.go @@ -8,6 +8,8 @@ import ( ) func TestJobArgumentExtraction(t *testing.T) { + t.Parallel() + j := Job{} j.setArg("str1", "bar") @@ -88,6 +90,8 @@ func TestJobArgumentExtraction(t *testing.T) { } func TestJobArgumentExtractionBadString(t *testing.T) { + t.Parallel() + var testCases = []struct { key string val interface{} @@ -127,6 +131,8 @@ func TestJobArgumentExtractionBadString(t *testing.T) { } func TestJobArgumentExtractionBadBool(t *testing.T) { + t.Parallel() + var testCases = []struct { key string val interface{} @@ -167,6 +173,8 @@ func TestJobArgumentExtractionBadBool(t *testing.T) { } func TestJobArgumentExtractionBadInt(t *testing.T) { + t.Parallel() + var testCases = []struct { key string val interface{} @@ -213,6 +221,8 @@ func TestJobArgumentExtractionBadInt(t *testing.T) { } func TestJobArgumentExtractionBadFloat(t *testing.T) { + t.Parallel() + var testCases = []struct { key string val interface{} diff --git a/observer.go b/observer.go index 5c96ac39..2c28cf48 100644 --- a/observer.go +++ b/observer.go @@ -124,7 +124,7 @@ func (o *observer) loop() { // Every tick we'll update redis if necessary // We don't update it on every job because the only purpose of this data is for humans to inspect the system, // and a fast worker could move onto new jobs every few ms. - ticker := time.Tick(1000 * time.Millisecond) + ticker := time.NewTicker(1000 * time.Millisecond).C for { select { diff --git a/observer_test.go b/observer_test.go index d95e729b..bcacdaa8 100644 --- a/observer_test.go +++ b/observer_test.go @@ -5,12 +5,13 @@ import ( "testing" "github.com/gomodule/redigo/redis" + "github.com/google/uuid" "github.com/stretchr/testify/assert" ) func TestObserverStarted(t *testing.T) { - pool := newTestPool(":6379") - ns := "work" + pool := newTestPool() + ns := uuid.New().String() tMock := int64(1425263401) setNowEpochSecondsMock(tMock) @@ -31,8 +32,8 @@ func TestObserverStarted(t *testing.T) { } func TestObserverStartedDone(t *testing.T) { - pool := newTestPool(":6379") - ns := "work" + pool := newTestPool() + ns := uuid.New().String() tMock := int64(1425263401) setNowEpochSecondsMock(tMock) @@ -50,8 +51,8 @@ func TestObserverStartedDone(t *testing.T) { } func TestObserverCheckin(t *testing.T) { - pool := newTestPool(":6379") - ns := "work" + pool := newTestPool() + ns := uuid.New().String() observer := newObserver(ns, pool, "abcd") observer.start() @@ -77,8 +78,8 @@ func TestObserverCheckin(t *testing.T) { } func TestObserverCheckinFromJob(t *testing.T) { - pool := newTestPool(":6379") - ns := "work" + pool := newTestPool() + ns := uuid.New().String() observer := newObserver(ns, pool, "abcd") observer.start() diff --git a/periodic_enqueuer.go b/periodic_enqueuer.go index ae957e7f..ad91615a 100644 --- a/periodic_enqueuer.go +++ b/periodic_enqueuer.go @@ -1,8 +1,9 @@ package work import ( + "crypto/rand" "fmt" - "math/rand" + "math/big" "time" "github.com/gomodule/redigo/redis" @@ -15,12 +16,11 @@ const ( ) type periodicEnqueuer struct { - namespace string - pool *redis.Pool - periodicJobs []*periodicJob - scheduledPeriodicJobs []*scheduledPeriodicJob - stopChan chan struct{} - doneStoppingChan chan struct{} + namespace string + pool *redis.Pool + periodicJobs []*periodicJob + stopChan chan struct{} + doneStoppingChan chan struct{} } type periodicJob struct { @@ -29,12 +29,6 @@ type periodicJob struct { schedule cron.Schedule } -type scheduledPeriodicJob struct { - scheduledAt time.Time - scheduledAtEpoch int64 - *periodicJob -} - func newPeriodicEnqueuer(namespace string, pool *redis.Pool, periodicJobs []*periodicJob) *periodicEnqueuer { return &periodicEnqueuer{ namespace: namespace, @@ -56,7 +50,11 @@ func (pe *periodicEnqueuer) stop() { func (pe *periodicEnqueuer) loop() { // Begin reaping periodically - timer := time.NewTimer(periodicEnqueuerSleep + time.Duration(rand.Intn(30))*time.Second) + n, err := rand.Int(rand.Reader, big.NewInt(30)) + if err != nil { + panic(err) + } + timer := time.NewTimer(periodicEnqueuerSleep + time.Duration(n.Int64())*time.Second) defer timer.Stop() if pe.shouldEnqueue() { @@ -72,7 +70,12 @@ func (pe *periodicEnqueuer) loop() { pe.doneStoppingChan <- struct{}{} return case <-timer.C: - timer.Reset(periodicEnqueuerSleep + time.Duration(rand.Intn(30))*time.Second) + n, err := rand.Int(rand.Reader, big.NewInt(30)) + if err != nil { + panic(err) + } + + timer.Reset(periodicEnqueuerSleep + time.Duration(n.Int64())*time.Second) if pe.shouldEnqueue() { err := pe.enqueue() if err != nil { diff --git a/periodic_enqueuer_test.go b/periodic_enqueuer_test.go index e460a041..56026e32 100644 --- a/periodic_enqueuer_test.go +++ b/periodic_enqueuer_test.go @@ -5,13 +5,14 @@ import ( "time" "github.com/gomodule/redigo/redis" + "github.com/google/uuid" "github.com/robfig/cron/v3" "github.com/stretchr/testify/assert" ) func TestPeriodicEnqueuer(t *testing.T) { - pool := newTestPool(":6379") - ns := "work" + pool := newTestPool() + ns := uuid.New().String() cleanKeyspace(ns, pool) var pjs []*periodicJob @@ -98,8 +99,10 @@ func TestPeriodicEnqueuer(t *testing.T) { } func TestPeriodicEnqueuerSpawn(t *testing.T) { - pool := newTestPool(":6379") - ns := "work" + t.Parallel() + + pool := newTestPool() + ns := uuid.New().String() cleanKeyspace(ns, pool) pe := newPeriodicEnqueuer(ns, pool, nil) diff --git a/priority_sampler.go b/priority_sampler.go index a31d017f..a81f084d 100644 --- a/priority_sampler.go +++ b/priority_sampler.go @@ -1,7 +1,9 @@ package work import ( - "math/rand" + "crypto/rand" + "math" + "math/big" ) type prioritySampler struct { @@ -55,9 +57,13 @@ func (s *prioritySampler) sample() []sampleItem { // and see where the random number fits in the continuum. // If we find where it fits, sort the item to the next slot towards the front of the slice. for remaining > 1 { - // rn from [0 to sumRemaining) - rn := uint(rand.Uint32()) % sumRemaining + n, err := rand.Int(rand.Reader, big.NewInt(math.MaxUint32)) + if err != nil { + panic(err) + } + // rn from [0 to sumRemaining) + rn := uint(n.Uint64()) % sumRemaining prevSum := uint(0) for i := lenSamples - 1; i >= lastValidIdx; i-- { sample := s.samples[i] diff --git a/priority_sampler_test.go b/priority_sampler_test.go index 6117601c..07b16813 100644 --- a/priority_sampler_test.go +++ b/priority_sampler_test.go @@ -35,6 +35,7 @@ func TestPrioritySampler(t *testing.T) { // make sure these numbers are roughly correct. note that probability is a thing. assert.True(t, c5 > (2*c2)) + // below test is indeterministic, will keep this as originally intended until we understand what this testing assert.True(t, float64(c2) > (1.5*float64(c1))) assert.True(t, c1 >= (total/13), fmt.Sprintf("c1 = %d total = %d total/13=%d", c1, total, total/13)) assert.True(t, float64(c1end) > (float64(total)*0.50)) diff --git a/requeuer.go b/requeuer.go index 55fa4513..c05e5635 100644 --- a/requeuer.go +++ b/requeuer.go @@ -65,7 +65,7 @@ func (r *requeuer) loop() { // If we have 100 processes all running requeuers, // there's probably too much hitting redis. // So later on we'l have to implement exponential backoff - ticker := time.Tick(1000 * time.Millisecond) + ticker := time.NewTicker(1000 * time.Millisecond).C for { select { diff --git a/requeuer_test.go b/requeuer_test.go index 65418eca..4c3df430 100644 --- a/requeuer_test.go +++ b/requeuer_test.go @@ -3,12 +3,13 @@ package work import ( "testing" + "github.com/google/uuid" "github.com/stretchr/testify/assert" ) func TestRequeue(t *testing.T) { - pool := newTestPool(":6379") - ns := "work" + pool := newTestPool() + ns := uuid.New().String() cleanKeyspace(ns, pool) tMock := nowEpochSeconds() - 10 @@ -50,8 +51,8 @@ func TestRequeue(t *testing.T) { } func TestRequeueUnknown(t *testing.T) { - pool := newTestPool(":6379") - ns := "work" + pool := newTestPool() + ns := uuid.New().String() cleanKeyspace(ns, pool) tMock := nowEpochSeconds() - 10 diff --git a/run.go b/run.go index c6f38615..ea23579e 100644 --- a/run.go +++ b/run.go @@ -48,7 +48,5 @@ func runJob(job *Job, ctxType reflect.Type, middleware []*middlewareHandler, jt } }() - returnError = next() - - return + return returnCtx, next() } diff --git a/run_test.go b/run_test.go index 5bf0d2c9..1dd976e2 100644 --- a/run_test.go +++ b/run_test.go @@ -9,6 +9,8 @@ import ( ) func TestRunBasicMiddleware(t *testing.T) { + t.Parallel() + mw1 := func(j *Job, next NextMiddlewareFunc) error { j.setArg("mw1", "mw1") return next() @@ -55,6 +57,8 @@ func TestRunBasicMiddleware(t *testing.T) { } func TestRunHandlerError(t *testing.T) { + t.Parallel() + mw1 := func(j *Job, next NextMiddlewareFunc) error { return next() } @@ -86,6 +90,8 @@ func TestRunHandlerError(t *testing.T) { } func TestRunMwError(t *testing.T) { + t.Parallel() + mw1 := func(j *Job, next NextMiddlewareFunc) error { return fmt.Errorf("mw1_err") } @@ -114,6 +120,8 @@ func TestRunMwError(t *testing.T) { } func TestRunHandlerPanic(t *testing.T) { + t.Parallel() + mw1 := func(j *Job, next NextMiddlewareFunc) error { return next() } @@ -143,6 +151,8 @@ func TestRunHandlerPanic(t *testing.T) { } func TestRunMiddlewarePanic(t *testing.T) { + t.Parallel() + mw1 := func(j *Job, next NextMiddlewareFunc) error { panic("dayam") } diff --git a/time.go b/time.go index a4548364..b12b880b 100644 --- a/time.go +++ b/time.go @@ -1,6 +1,8 @@ package work -import "time" +import ( + "time" +) var nowMock int64 @@ -18,8 +20,3 @@ func setNowEpochSecondsMock(t int64) { func resetNowEpochSecondsMock() { nowMock = 0 } - -// convert epoch seconds to a time -func epochSecondsToTime(t int64) time.Time { - return time.Time{} -} diff --git a/webui/internal/assets/assets.go b/webui/internal/assets/assets.go index 2b99d409..afbe8190 100644 --- a/webui/internal/assets/assets.go +++ b/webui/internal/assets/assets.go @@ -162,7 +162,7 @@ func AssetNames() []string { // _bindata is a table, holding each asset generator, mapped to its name. var _bindata = map[string]func() (*asset, error){ "index.html": indexHtml, - "work.js": workJs, + "work.js": workJs, } // AssetDir returns the file names below a certain @@ -204,9 +204,10 @@ type bintree struct { Func func() (*asset, error) Children map[string]*bintree } + var _bintree = &bintree{nil, map[string]*bintree{ - "index.html": &bintree{indexHtml, map[string]*bintree{}}, - "work.js": &bintree{workJs, map[string]*bintree{}}, + "index.html": {indexHtml, map[string]*bintree{}}, + "work.js": {workJs, map[string]*bintree{}}, }} // RestoreAsset restores an asset under the given directory @@ -255,4 +256,3 @@ func _filePath(dir, name string) string { cannonicalName := strings.Replace(name, "\\", "/", -1) return filepath.Join(append([]string{dir}, strings.Split(cannonicalName, "/")...)...) } - diff --git a/webui/webui_test.go b/webui/webui_test.go index 6bffc745..ece6ed87 100644 --- a/webui/webui_test.go +++ b/webui/webui_test.go @@ -11,12 +11,15 @@ import ( "github.com/gocraft/work" "github.com/gomodule/redigo/redis" + "github.com/google/uuid" "github.com/stretchr/testify/assert" ) func TestWebUIStartStop(t *testing.T) { - pool := newTestPool(":6379") - ns := "work" + t.Parallel() + + pool := newTestPool() + ns := uuid.New().String() cleanKeyspace(ns, pool) s := NewServer(ns, pool, ":6666") @@ -27,8 +30,10 @@ func TestWebUIStartStop(t *testing.T) { type TestContext struct{} func TestWebUIQueues(t *testing.T) { - pool := newTestPool(":6379") - ns := "work" + t.Parallel() + + pool := newTestPool() + ns := uuid.New().String() cleanKeyspace(ns, pool) // Get some stuff to to show up in the jobs: @@ -83,8 +88,10 @@ func TestWebUIQueues(t *testing.T) { } func TestWebUIWorkerPools(t *testing.T) { - pool := newTestPool(":6379") - ns := "work" + t.Parallel() + + pool := newTestPool() + ns := uuid.New().String() cleanKeyspace(ns, pool) wp := work.NewWorkerPool(TestContext{}, 10, ns, pool) @@ -121,8 +128,10 @@ func TestWebUIWorkerPools(t *testing.T) { } func TestWebUIBusyWorkers(t *testing.T) { - pool := newTestPool(":6379") - ns := "work" + t.Parallel() + + pool := newTestPool() + ns := uuid.New().String() cleanKeyspace(ns, pool) // Keep a job in the in-progress state without using sleeps @@ -183,8 +192,10 @@ func TestWebUIBusyWorkers(t *testing.T) { } func TestWebUIRetryJobs(t *testing.T) { - pool := newTestPool(":6379") - ns := "work" + t.Parallel() + + pool := newTestPool() + ns := uuid.New().String() cleanKeyspace(ns, pool) enqueuer := work.NewEnqueuer(ns, pool) @@ -226,8 +237,10 @@ func TestWebUIRetryJobs(t *testing.T) { } func TestWebUIScheduledJobs(t *testing.T) { - pool := newTestPool(":6379") - ns := "testwork" + t.Parallel() + + pool := newTestPool() + ns := uuid.New().String() cleanKeyspace(ns, pool) enqueuer := work.NewEnqueuer(ns, pool) @@ -259,12 +272,15 @@ func TestWebUIScheduledJobs(t *testing.T) { } func TestWebUIDeadJobs(t *testing.T) { - pool := newTestPool(":6379") - ns := "testwork" + t.Parallel() + + pool := newTestPool() + ns := uuid.New().String() cleanKeyspace(ns, pool) enqueuer := work.NewEnqueuer(ns, pool) _, err := enqueuer.Enqueue("wat", nil) + assert.Nil(t, err) _, err = enqueuer.Enqueue("wat", nil) assert.Nil(t, err) @@ -347,12 +363,15 @@ func TestWebUIDeadJobs(t *testing.T) { } func TestWebUIDeadJobsDeleteRetryAll(t *testing.T) { - pool := newTestPool(":6379") - ns := "testwork" + t.Parallel() + + pool := newTestPool() + ns := uuid.New().String() cleanKeyspace(ns, pool) enqueuer := work.NewEnqueuer(ns, pool) _, err := enqueuer.Enqueue("wat", nil) + assert.Nil(t, err) _, err = enqueuer.Enqueue("wat", nil) assert.Nil(t, err) @@ -448,14 +467,16 @@ func TestWebUIDeadJobsDeleteRetryAll(t *testing.T) { } func TestWebUIAssets(t *testing.T) { - pool := newTestPool(":6379") - ns := "testwork" + t.Parallel() + + pool := newTestPool() + ns := uuid.New().String() s := NewServer(ns, pool, ":6666") recorder := httptest.NewRecorder() request, _ := http.NewRequest("GET", "/", nil) s.router.ServeHTTP(recorder, request) - body := string(recorder.Body.Bytes()) + body := recorder.Body.String() assert.Regexp(t, "html", body) recorder = httptest.NewRecorder() @@ -463,13 +484,13 @@ func TestWebUIAssets(t *testing.T) { s.router.ServeHTTP(recorder, request) } -func newTestPool(addr string) *redis.Pool { +func newTestPool() *redis.Pool { return &redis.Pool{ MaxActive: 3, MaxIdle: 3, IdleTimeout: 240 * time.Second, Dial: func() (redis.Conn, error) { - return redis.Dial("tcp", addr) + return redis.Dial("tcp", ":6379") }, Wait: true, } diff --git a/worker.go b/worker.go index d7578334..cb021c46 100644 --- a/worker.go +++ b/worker.go @@ -1,8 +1,9 @@ package work import ( + "crypto/rand" "fmt" - "math/rand" + "math/big" "reflect" "time" @@ -32,7 +33,7 @@ type worker struct { doneDrainingChan chan struct{} } -func newWorker(namespace string, poolID string, pool *redis.Pool, contextType reflect.Type, middleware []*middlewareHandler, jobTypes map[string]*jobType, sleepBackoffs []int64) *worker { +func newWorker(namespace string, poolID string, pool *redis.Pool, contextType reflect.Type, jobTypes map[string]*jobType, sleepBackoffs []int64) *worker { workerID := makeIdentifier() ob := newObserver(namespace, pool, workerID) @@ -57,7 +58,7 @@ func newWorker(namespace string, poolID string, pool *redis.Pool, contextType re doneDrainingChan: make(chan struct{}), } - w.updateMiddlewareAndJobTypes(middleware, jobTypes) + w.updateMiddlewareAndJobTypes(nil, jobTypes) return w } @@ -280,7 +281,7 @@ func (w *worker) removeJobFromInProgress(job *Job, fate terminateOp) { type terminateOp func(conn redis.Conn) -func terminateOnly(_ redis.Conn) { return } +func terminateOnly(_ redis.Conn) {} func terminateAndRetry(w *worker, jt *jobType, job *Job) terminateOp { rawJSON, err := job.serialize() if err != nil { @@ -323,5 +324,10 @@ func (w *worker) jobFate(jt *jobType, job *Job) terminateOp { // Default algorithm returns an fastly increasing backoff counter which grows in an unbounded fashion func defaultBackoffCalculator(job *Job) int64 { fails := job.Fails - return (fails * fails * fails * fails) + 15 + (rand.Int63n(30) * (fails + 1)) + n, err := rand.Int(rand.Reader, big.NewInt(30)) + if err != nil { + panic(err) + } + + return (fails * fails * fails * fails) + 15 + (n.Int64() * (fails + 1)) } diff --git a/worker_pool.go b/worker_pool.go index d83c7b79..ab4afd87 100644 --- a/worker_pool.go +++ b/worker_pool.go @@ -109,7 +109,7 @@ func NewWorkerPoolWithOptions(ctx interface{}, concurrency uint, namespace strin } for i := uint(0); i < wp.concurrency; i++ { - w := newWorker(wp.namespace, wp.workerPoolID, wp.pool, wp.contextType, nil, wp.jobTypes, wp.sleepBackoffs) + w := newWorker(wp.namespace, wp.workerPoolID, wp.pool, wp.contextType, wp.jobTypes, wp.sleepBackoffs) wp.workers = append(wp.workers, w) } diff --git a/worker_pool_test.go b/worker_pool_test.go index 911aa9dc..6a056cc7 100644 --- a/worker_pool_test.go +++ b/worker_pool_test.go @@ -8,11 +8,11 @@ import ( "time" "github.com/gomodule/redigo/redis" + "github.com/google/uuid" "github.com/stretchr/testify/assert" ) type tstCtx struct { - a int bytes.Buffer } @@ -23,6 +23,8 @@ func (c *tstCtx) record(s string) { var tstCtxType = reflect.TypeOf(tstCtx{}) func TestWorkerPoolHandlerValidations(t *testing.T) { + t.Parallel() + var cases = []struct { fn interface{} good bool @@ -31,7 +33,7 @@ func TestWorkerPoolHandlerValidations(t *testing.T) { {func(c *tstCtx, j *Job) error { return nil }, true}, {func(c *tstCtx, j *Job) {}, false}, {func(c *tstCtx, j *Job) string { return "" }, false}, - {func(c *tstCtx, j *Job) (error, string) { return nil, "" }, false}, + {func(c *tstCtx, j *Job) (string, error) { return "", nil }, false}, {func(c *tstCtx) error { return nil }, false}, {func(c tstCtx, j *Job) error { return nil }, false}, {func() error { return nil }, false}, @@ -47,6 +49,8 @@ func TestWorkerPoolHandlerValidations(t *testing.T) { } func TestWorkerPoolMiddlewareValidations(t *testing.T) { + t.Parallel() + var cases = []struct { fn interface{} good bool @@ -56,7 +60,7 @@ func TestWorkerPoolMiddlewareValidations(t *testing.T) { {func(c *tstCtx, j *Job) error { return nil }, false}, {func(c *tstCtx, j *Job, n NextMiddlewareFunc) {}, false}, {func(c *tstCtx, j *Job, n NextMiddlewareFunc) string { return "" }, false}, - {func(c *tstCtx, j *Job, n NextMiddlewareFunc) (error, string) { return nil, "" }, false}, + {func(c *tstCtx, j *Job, n NextMiddlewareFunc) (string, error) { return "", nil }, false}, {func(c *tstCtx, n NextMiddlewareFunc) error { return nil }, false}, {func(c tstCtx, j *Job, n NextMiddlewareFunc) error { return nil }, false}, {func() error { return nil }, false}, @@ -73,8 +77,10 @@ func TestWorkerPoolMiddlewareValidations(t *testing.T) { } func TestWorkerPoolStartStop(t *testing.T) { - pool := newTestPool(":6379") - ns := "work" + t.Parallel() + + pool := newTestPool() + ns := uuid.New().String() wp := NewWorkerPool(TestContext{}, 10, ns, pool) wp.Start() wp.Start() @@ -85,8 +91,10 @@ func TestWorkerPoolStartStop(t *testing.T) { } func TestWorkerPoolValidations(t *testing.T) { - pool := newTestPool(":6379") - ns := "work" + t.Parallel() + + pool := newTestPool() + ns := uuid.New().String() wp := NewWorkerPool(TestContext{}, 10, ns, pool) func() { @@ -115,8 +123,8 @@ func TestWorkerPoolValidations(t *testing.T) { } func TestWorkersPoolRunSingleThreaded(t *testing.T) { - pool := newTestPool(":6379") - ns := "work" + pool := newTestPool() + ns := uuid.New().String() job1 := "job1" numJobs, concurrency, sleepTime := 5, 5, 2 wp := setupTestWorkerPool(pool, ns, job1, concurrency, JobOptions{Priority: 1, MaxConcurrency: 1}) @@ -158,7 +166,7 @@ func TestWorkersPoolRunSingleThreaded(t *testing.T) { } func TestWorkerPoolPauseSingleThreadedJobs(t *testing.T) { - pool := newTestPool(":6379") + pool := newTestPool() ns, job1 := "work", "job1" numJobs, concurrency, sleepTime := 5, 5, 2 wp := setupTestWorkerPool(pool, ns, job1, concurrency, JobOptions{Priority: 1, MaxConcurrency: 1}) diff --git a/worker_test.go b/worker_test.go index f1bddf77..72ea2dc0 100644 --- a/worker_test.go +++ b/worker_test.go @@ -8,12 +8,15 @@ import ( "time" "github.com/gomodule/redigo/redis" + "github.com/google/uuid" "github.com/stretchr/testify/assert" ) func TestWorkerBasics(t *testing.T) { - pool := newTestPool(":6379") - ns := "work" + t.Parallel() + + pool := newTestPool() + ns := uuid.New().String() job1 := "job1" job2 := "job2" job3 := "job3" @@ -61,7 +64,7 @@ func TestWorkerBasics(t *testing.T) { _, err = enqueuer.Enqueue(job3, Q{"a": 3}) assert.Nil(t, err) - w := newWorker(ns, "1", pool, tstCtxType, nil, jobTypes, nil) + w := newWorker(ns, "1", pool, tstCtxType, jobTypes, nil) w.start() w.drain() w.stop() @@ -89,8 +92,10 @@ func TestWorkerBasics(t *testing.T) { } func TestWorkerInProgress(t *testing.T) { - pool := newTestPool(":6379") - ns := "work" + t.Parallel() + + pool := newTestPool() + ns := uuid.New().String() job1 := "job1" deleteQueue(pool, ns, job1) deleteRetryAndDead(pool, ns) @@ -111,7 +116,7 @@ func TestWorkerInProgress(t *testing.T) { _, err := enqueuer.Enqueue(job1, Q{"a": 1}) assert.Nil(t, err) - w := newWorker(ns, "1", pool, tstCtxType, nil, jobTypes, nil) + w := newWorker(ns, "1", pool, tstCtxType, jobTypes, nil) w.start() // instead of w.forceIter(), we'll wait for 10 milliseconds to let the job start @@ -142,8 +147,10 @@ func TestWorkerInProgress(t *testing.T) { } func TestWorkerRetry(t *testing.T) { - pool := newTestPool(":6379") - ns := "work" + t.Parallel() + + pool := newTestPool() + ns := uuid.New().String() job1 := "job1" deleteQueue(pool, ns, job1) deleteRetryAndDead(pool, ns) @@ -162,7 +169,7 @@ func TestWorkerRetry(t *testing.T) { enqueuer := NewEnqueuer(ns, pool) _, err := enqueuer.Enqueue(job1, Q{"a": 1}) assert.Nil(t, err) - w := newWorker(ns, "1", pool, tstCtxType, nil, jobTypes, nil) + w := newWorker(ns, "1", pool, tstCtxType, jobTypes, nil) w.start() w.drain() w.stop() @@ -189,8 +196,10 @@ func TestWorkerRetry(t *testing.T) { // Check if a custom backoff function functions functionally. func TestWorkerRetryWithCustomBackoff(t *testing.T) { - pool := newTestPool(":6379") - ns := "work" + t.Parallel() + + pool := newTestPool() + ns := uuid.New().String() job1 := "job1" deleteQueue(pool, ns, job1) deleteRetryAndDead(pool, ns) @@ -214,7 +223,7 @@ func TestWorkerRetryWithCustomBackoff(t *testing.T) { enqueuer := NewEnqueuer(ns, pool) _, err := enqueuer.Enqueue(job1, Q{"a": 1}) assert.Nil(t, err) - w := newWorker(ns, "1", pool, tstCtxType, nil, jobTypes, nil) + w := newWorker(ns, "1", pool, tstCtxType, jobTypes, nil) w.start() w.drain() w.stop() @@ -239,8 +248,8 @@ func TestWorkerRetryWithCustomBackoff(t *testing.T) { } func TestWorkerDead(t *testing.T) { - pool := newTestPool(":6379") - ns := "work" + pool := newTestPool() + ns := uuid.New().String() job1 := "job1" job2 := "job2" deleteQueue(pool, ns, job1) @@ -271,7 +280,7 @@ func TestWorkerDead(t *testing.T) { assert.Nil(t, err) _, err = enqueuer.Enqueue(job2, nil) assert.Nil(t, err) - w := newWorker(ns, "1", pool, tstCtxType, nil, jobTypes, nil) + w := newWorker(ns, "1", pool, tstCtxType, jobTypes, nil) w.start() w.drain() w.stop() @@ -302,8 +311,8 @@ func TestWorkerDead(t *testing.T) { } func TestWorkersPaused(t *testing.T) { - pool := newTestPool(":6379") - ns := "work" + pool := newTestPool() + ns := uuid.New().String() job1 := "job1" deleteQueue(pool, ns, job1) deleteRetryAndDead(pool, ns) @@ -324,7 +333,7 @@ func TestWorkersPaused(t *testing.T) { _, err := enqueuer.Enqueue(job1, Q{"a": 1}) assert.Nil(t, err) - w := newWorker(ns, "1", pool, tstCtxType, nil, jobTypes, nil) + w := newWorker(ns, "1", pool, tstCtxType, jobTypes, nil) // pause the jobs prior to starting err = pauseJobs(ns, job1, pool) assert.Nil(t, err) @@ -366,6 +375,8 @@ func TestWorkersPaused(t *testing.T) { // Test that in the case of an unavailable Redis server, // the worker loop exits in the case of a WorkerPool.Stop func TestStop(t *testing.T) { + t.Parallel() + redisPool := &redis.Pool{ Dial: func() (redis.Conn, error) { c, err := redis.Dial("tcp", "notworking:6379", redis.DialConnectTimeout(1*time.Second)) @@ -381,8 +392,8 @@ func TestStop(t *testing.T) { } func BenchmarkJobProcessing(b *testing.B) { - pool := newTestPool(":6379") - ns := "work" + pool := newTestPool() + ns := uuid.New().String() cleanKeyspace(ns, pool) enqueuer := NewEnqueuer(ns, pool) @@ -405,13 +416,13 @@ func BenchmarkJobProcessing(b *testing.B) { wp.Stop() } -func newTestPool(addr string) *redis.Pool { +func newTestPool() *redis.Pool { return &redis.Pool{ MaxActive: 10, MaxIdle: 10, IdleTimeout: 240 * time.Second, Dial: func() (redis.Conn, error) { - c, err := redis.Dial("tcp", addr) + c, err := redis.Dial("tcp", ":6379") if err != nil { return nil, err } @@ -597,10 +608,12 @@ type emptyCtx struct{} // drained before returning. // https://github.com/gocraft/work/issues/24 func TestWorkerPoolStop(t *testing.T) { - ns := "will_it_end" - pool := newTestPool(":6379") + t.Parallel() + + ns := uuid.New().String() + pool := newTestPool() var started, stopped int32 - num_iters := 30 + numIters := 30 wp := NewWorkerPool(emptyCtx{}, 2, ns, pool) @@ -613,7 +626,7 @@ func TestWorkerPoolStop(t *testing.T) { var enqueuer = NewEnqueuer(ns, pool) - for i := 0; i <= num_iters; i++ { + for i := 0; i <= numIters; i++ { enqueuer.Enqueue("sample_job", Q{}) } @@ -627,7 +640,7 @@ func TestWorkerPoolStop(t *testing.T) { t.Errorf("Expected that jobs were finished and not killed while processing (started=%d, stopped=%d)", started, stopped) } - if started >= int32(num_iters) { + if started >= int32(numIters) { t.Errorf("Expected that jobs queue was not completely emptied.") } }