diff --git a/client.go b/client.go index 939f33ca..e738ed68 100644 --- a/client.go +++ b/client.go @@ -363,6 +363,58 @@ func (c *Client) DeleteDeadJob(diedAt int64, jobID string) error { return nil } +// Retry retries failed jobs of a specific type from the dead queue. +func (c *Client) RetryDeadOfType(jobType string) error { + // Get queues for job names + queues, err := c.Queues() + if err != nil { + logError("client.retry_all_dead_jobs.queues", err) + return err + } + + fmt.Println("Queues: %+V", queues) + // Extract job names + var jobNames []string + for _, q := range queues { + if q.JobName == jobType { + jobNames = append(jobNames, q.JobName) + } + } + fmt.Println("JobNames: %+V", jobNames) + + script := redis.NewScript(len(jobNames)+1, redisLuaRequeueAllDeadCmd) + + args := make([]interface{}, 0, len(jobNames)+1+3) + args = append(args, redisKeyDead(c.namespace)) // KEY[1] + for _, jobName := range jobNames { + args = append(args, redisKeyJobs(c.namespace, jobName)) // KEY[2, 3, ...] + } + args = append(args, redisKeyJobsPrefix(c.namespace)) // ARGV[1] + args = append(args, nowEpochSeconds()) + args = append(args, 1000) + + conn := c.pool.Get() + defer conn.Close() + + fmt.Println("args: %+V", args) + // Cap iterations for safety (which could reprocess 1k*1k jobs). + // This is conceptually an infinite loop but let's be careful. + for i := 0; i < 1000; i++ { + res, err := redis.Int64(script.Do(conn, args...)) + if err != nil { + logError("client.retry_all_dead_jobs.do", err) + return err + } + + if res == 0 { + break + } + } + + return nil + +} + // RetryDeadJob retries a dead job. The job will be re-queued on the normal work queue for eventual processing by a worker. func (c *Client) RetryDeadJob(diedAt int64, jobID string) error { // Get queues for job names diff --git a/webui/webui.go b/webui/webui.go index 51e250d5..17914f9c 100644 --- a/webui/webui.go +++ b/webui/webui.go @@ -57,6 +57,7 @@ func NewServer(namespace string, pool *redis.Pool, hostPort string) *Server { router.Get("/dead_jobs", (*context).deadJobs) router.Post("/delete_dead_job/:died_at:\\d.*/:job_id", (*context).deleteDeadJob) router.Post("/retry_dead_job/:died_at:\\d.*/:job_id", (*context).retryDeadJob) + router.Post("/retry_dead_job_type/:job_type", (*context).retryDeadJobOfType) router.Post("/delete_all_dead_jobs", (*context).deleteAllDeadJobs) router.Post("/retry_all_dead_jobs", (*context).retryAllDeadJobs) @@ -205,6 +206,12 @@ func (c *context) retryDeadJob(rw web.ResponseWriter, r *web.Request) { render(rw, map[string]string{"status": "ok"}, err) } +func (c *context) retryDeadJobOfType(rw web.ResponseWriter, r *web.Request) { + err := c.client.RetryDeadOfType(r.PathParams["job_type"]) + + render(rw, map[string]string{"status": "ok"}, err) +} + func (c *context) deleteAllDeadJobs(rw web.ResponseWriter, r *web.Request) { err := c.client.DeleteAllDeadJobs() render(rw, map[string]string{"status": "ok"}, err)