Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed

- `JobCountByQueueAndState` now returns consistent results across drivers, including requested queues with zero jobs, and deduplicates repeated queue names in input. This resolves an issue with the sqlite driver in River UI reported in [riverqueue/riverui#496](https://github.com/riverqueue/riverui#496). [PR #1140](https://github.com/riverqueue/river/pull/1140).

## [0.30.2] - 2026-01-26

### Fixed
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

50 changes: 50 additions & 0 deletions riverdriver/riverdrivertest/riverdrivertest.go
Original file line number Diff line number Diff line change
Expand Up @@ -787,6 +787,56 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
require.Equal(t, int64(1), countsByQueue[1].CountAvailable)
require.Equal(t, int64(1), countsByQueue[1].CountRunning)
})

t.Run("IncludesRequestedQueuesThatHaveNoJobs", func(t *testing.T) {
t.Parallel()

exec, _ := setup(ctx, t)

_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue2"), State: ptrutil.Ptr(rivertype.JobStateAvailable)})
_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue2"), State: ptrutil.Ptr(rivertype.JobStateRunning)})

countsByQueue, err := exec.JobCountByQueueAndState(ctx, &riverdriver.JobCountByQueueAndStateParams{
QueueNames: []string{"queue1", "queue2"},
Schema: "",
})
require.NoError(t, err)

require.Len(t, countsByQueue, 2)

require.Equal(t, "queue1", countsByQueue[0].Queue)
require.Equal(t, int64(0), countsByQueue[0].CountAvailable)
require.Equal(t, int64(0), countsByQueue[0].CountRunning)

require.Equal(t, "queue2", countsByQueue[1].Queue)
require.Equal(t, int64(1), countsByQueue[1].CountAvailable)
require.Equal(t, int64(1), countsByQueue[1].CountRunning)
})

t.Run("InputQueueNamesAreDeduplicated", func(t *testing.T) {
t.Parallel()

exec, _ := setup(ctx, t)

_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue2"), State: ptrutil.Ptr(rivertype.JobStateAvailable)})
_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue2"), State: ptrutil.Ptr(rivertype.JobStateRunning)})

countsByQueue, err := exec.JobCountByQueueAndState(ctx, &riverdriver.JobCountByQueueAndStateParams{
QueueNames: []string{"queue2", "queue1", "queue1"},
Schema: "",
})
require.NoError(t, err)

require.Len(t, countsByQueue, 2)

require.Equal(t, "queue1", countsByQueue[0].Queue)
require.Equal(t, int64(0), countsByQueue[0].CountAvailable)
require.Equal(t, int64(0), countsByQueue[0].CountRunning)

require.Equal(t, "queue2", countsByQueue[1].Queue)
require.Equal(t, int64(1), countsByQueue[1].CountAvailable)
require.Equal(t, int64(1), countsByQueue[1].CountRunning)
})
})

t.Run("JobCountByState", func(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ GROUP BY state;

-- name: JobCountByQueueAndState :many
WITH all_queues AS (
SELECT unnest(@queue_names::text[])::text AS queue
SELECT DISTINCT unnest(@queue_names::text[])::text AS queue
),

running_job_counts AS (
Expand Down
2 changes: 1 addition & 1 deletion riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 36 additions & 4 deletions riverdriver/riversqlite/river_sqlite_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,14 +295,46 @@ func (e *Executor) JobCountByQueueAndState(ctx context.Context, params *riverdri
if err != nil {
return nil, interpretError(err)
}
results := make([]*riverdriver.JobCountByQueueAndStateResult, len(rows))
for i, row := range rows {
results[i] = &riverdriver.JobCountByQueueAndStateResult{

// The PostgreSQL drivers implement this query with an `all_queues` CTE and
// LEFT JOINs, so they return one row per requested queue, including queues
// that currently have no jobs. The input queue list is deduplicated in SQL.
// The SQLite sqlc driver only reliably supports `sqlc.slice` in `IN (...)`,
// and we haven't found a workable way to bind a parameterized list through
// `json_each(...)` to produce equivalent SQL. The SQLite SQL query therefore
// returns only queues with matching rows, and this wrapper fills in missing
// queues to match PostgreSQL behavior.
countsByQueue := make(map[string]struct {
CountAvailable int64
CountRunning int64
}, len(rows))
for _, row := range rows {
countsByQueue[row.Queue] = struct {
CountAvailable int64
CountRunning int64
}{
CountAvailable: row.CountAvailable,
CountRunning: row.CountRunning,
Queue: row.Queue,
}
}

queueNames := slices.Clone(params.QueueNames)
slices.Sort(queueNames)
queueNames = slices.Compact(queueNames)

results := make([]*riverdriver.JobCountByQueueAndStateResult, 0, len(queueNames))
for _, queueName := range queueNames {
result := &riverdriver.JobCountByQueueAndStateResult{
Queue: queueName,
}
if counts, ok := countsByQueue[queueName]; ok {
result.CountAvailable = counts.CountAvailable
result.CountRunning = counts.CountRunning
}

results = append(results, result)
}

return results, nil
}

Expand Down
Loading