diff --git a/enqueue.go b/enqueue.go index 0d2f763e..80548ca3 100644 --- a/enqueue.go +++ b/enqueue.go @@ -1,6 +1,7 @@ package work import ( + "strconv" "sync" "time" @@ -120,7 +121,7 @@ func (e *Enqueuer) EnqueueUniqueIn(jobName string, secondsFromNow int64, args ma // In order to add robustness to the system, jobs are only unique for 24 hours after they're enqueued. This is mostly relevant for scheduled jobs. // EnqueueUniqueByKey returns the job if it was enqueued and nil if it wasn't func (e *Enqueuer) EnqueueUniqueByKey(jobName string, args map[string]interface{}, keyMap map[string]interface{}) (*Job, error) { - enqueue, job, err := e.uniqueJobHelper(jobName, args, keyMap) + enqueue, job, err := e.uniqueJobHelper(jobName, args, keyMap, 0) if err != nil { return nil, err } @@ -136,7 +137,7 @@ func (e *Enqueuer) EnqueueUniqueByKey(jobName string, args map[string]interface{ // EnqueueUniqueInByKey enqueues a job in the scheduled job queue that is unique on specified key for execution in secondsFromNow seconds. See EnqueueUnique for the semantics of unique jobs. // Subsequent calls with same key will update arguments func (e *Enqueuer) EnqueueUniqueInByKey(jobName string, secondsFromNow int64, args map[string]interface{}, keyMap map[string]interface{}) (*ScheduledJob, error) { - enqueue, job, err := e.uniqueJobHelper(jobName, args, keyMap) + enqueue, job, err := e.uniqueJobHelper(jobName, args, keyMap, secondsFromNow) if err != nil { return nil, err } @@ -181,7 +182,7 @@ func (e *Enqueuer) addToKnownJobs(conn redis.Conn, jobName string) error { type enqueueFnType func(*int64) (string, error) -func (e *Enqueuer) uniqueJobHelper(jobName string, args map[string]interface{}, keyMap map[string]interface{}) (enqueueFnType, *Job, error) { +func (e *Enqueuer) uniqueJobHelper(jobName string, args map[string]interface{}, keyMap map[string]interface{}, uniqTTL int64) (enqueueFnType, *Job, error) { useDefaultKeys := false if keyMap == nil { useDefaultKeys = true @@ -232,9 +233,9 @@ func (e *Enqueuer) uniqueJobHelper(jobName string, args map[string]interface{}, } if runAt != nil { // Scheduled job so different job queue with additional arg - scriptArgs[0] = redisKeyScheduled(e.Namespace) // KEY[1] - scriptArgs = append(scriptArgs, *runAt) // ARGV[3] - + scriptArgs[0] = redisKeyScheduled(e.Namespace) // KEY[1] + scriptArgs = append(scriptArgs, *runAt) // ARGV[3] + scriptArgs = append(scriptArgs, strconv.FormatInt(uniqTTL, 10)) // ARGV[4] script = e.enqueueUniqueInScript } diff --git a/go.mod b/go.mod index 224eab98..a16ce294 100644 --- a/go.mod +++ b/go.mod @@ -15,13 +15,11 @@ require ( github.com/garyburd/redigo v1.6.0 // indirect github.com/gocraft/health v0.0.0-20170925182251-8675af27fef0 github.com/gocraft/web v0.0.0-20190207150652-9707327fb69b - github.com/gocraft/work v0.5.1 github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b // indirect github.com/gomodule/redigo v2.0.0+incompatible github.com/jrallison/go-workers v0.0.0-20180112190529-dbf81d0b75bb github.com/kr/pretty v0.2.0 // indirect github.com/orfjackal/nanospec.go v0.0.0-20120727230329-de4694c1d701 // indirect - github.com/robfig/cron v1.2.0 // indirect github.com/robfig/cron/v3 v3.0.1 github.com/stretchr/testify v1.5.1 github.com/youtube/vitess v2.1.1+incompatible // indirect diff --git a/go.sum b/go.sum index 63aa5f0a..89fa3ef1 100644 --- a/go.sum +++ b/go.sum @@ -24,8 +24,6 @@ github.com/gocraft/health v0.0.0-20170925182251-8675af27fef0 h1:pKjeDsx7HGGbjr7V github.com/gocraft/health v0.0.0-20170925182251-8675af27fef0/go.mod h1:rWibcVfwbUxi/QXW84U7vNTcIcZFd6miwbt8ritxh/Y= github.com/gocraft/web v0.0.0-20190207150652-9707327fb69b h1:g2Qcs0B+vOQE1L3a7WQ/JUUSzJnHbTz14qkJSqEWcF4= github.com/gocraft/web v0.0.0-20190207150652-9707327fb69b/go.mod h1:Ag7UMbZNGrnHwaXPJOUKJIVgx4QOWMOWZngrvsN6qak= -github.com/gocraft/work v0.5.1 h1:3bRjMiOo6N4zcRgZWV3Y7uX7R22SF+A9bPTk4xRXr34= -github.com/gocraft/work v0.5.1/go.mod h1:pc3n9Pb5FAESPPGfM0nL+7Q1xtgtRnF8rr/azzhQVlM= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0= @@ -41,8 +39,6 @@ github.com/orfjackal/nanospec.go v0.0.0-20120727230329-de4694c1d701 h1:yOXfzNV7q github.com/orfjackal/nanospec.go v0.0.0-20120727230329-de4694c1d701/go.mod h1:VtBIF1XX0c1nKkeAPk8i4aXkYopqQgfDqolHUIHPwNI= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ= -github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/redis.go b/redis.go index 417eb481..53b09d00 100644 --- a/redis.go +++ b/redis.go @@ -364,12 +364,13 @@ return 'dup' // ARGV[1] = job // ARGV[2] = updated job or just a 1 if arguments don't update // ARGV[3] = epoch seconds for job to be run at +// ARGV[4] = uniq key ttl var redisLuaEnqueueUniqueIn = ` -if redis.call('set', KEYS[2], ARGV[2], 'NX', 'EX', '86400') then +if redis.call('set', KEYS[2], ARGV[2], 'NX', 'EX', ARGV[4]) then redis.call('zadd', KEYS[1], ARGV[3], ARGV[1]) return 'ok' else - redis.call('set', KEYS[2], ARGV[2], 'EX', '86400') + redis.call('set', KEYS[2], ARGV[2], 'EX', ARGV[4]) end return 'dup' `