52 changes: 52 additions & 0 deletions .github/workflows/docs.yml
@@ -0,0 +1,52 @@
name: Deploy Documentation

on:
  push:
    branches: [master]
  workflow_dispatch:

permissions:
  contents: read
  pages: write
  id-token: write

concurrency:
  group: "pages"
  cancel-in-progress: false

jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
          cache: 'pip'

      - name: Install dependencies
        run: |
          pip install mkdocs-material mkdocstrings[python]
          pip install -e .

      - name: Build documentation
        run: mkdocs build --strict

      - name: Upload artifact
        uses: actions/upload-pages-artifact@v3
        with:
          path: site

  deploy:
    environment:
      name: github-pages
      url: ${{ steps.deployment.outputs.page_url }}
    runs-on: ubuntu-latest
    needs: build
    steps:
      - name: Deploy to GitHub Pages
        id: deployment
        uses: actions/deploy-pages@v4
3 changes: 3 additions & 0 deletions .gitignore
@@ -67,3 +67,6 @@ config.ini
*.asc

*.sqlite3

# MkDocs build output
site/
68 changes: 68 additions & 0 deletions README.md
@@ -17,6 +17,61 @@ path('django_simple_queue/', include('django_simple_queue.urls')),

## Usage

### Create a custom settings file `task_worker_settings.py`
If you want a lean task worker process that does not load every Django app, create a custom settings file in the same
directory as `<project>/settings.py`. It imports the web server settings but drops the apps that only the web server
needs. In our tests, this decreased the RAM usage of a single task worker from `400-500 MB` to just `30-50 MB`[^1].

The number of subprocesses (as reported by htop) also drops from `10-21` to just `1`.

Here is an example:

```python
# task_worker_settings.py
from .settings import *

# Application definition - keep only what's needed for task processing
INSTALLED_APPS = [
    # django packages - minimal required
    "django.contrib.contenttypes",  # Required for DB models
    "django_simple_queue",
    # Only apps needed for task processing
    "<project>",
]

# Remove all middleware
MIDDLEWARE = []

# Remove template settings - task workers don't need templates
TEMPLATES = []

# Disable static/media file handling
STATICFILES_DIRS = ()
STATIC_URL = None
STATIC_ROOT = None
MEDIA_ROOT = None
MEDIA_URL = None

# Disable unnecessary Django features
USE_I18N = False
USE_TZ = True # Keep timezone support if your tasks rely on it

# Optimize database connections
DATABASES["default"]["CONN_MAX_AGE"] = None # Persistent connections
DATABASES["default"]["OPTIONS"] = {
    "connect_timeout": 10,
}

# Remove unnecessary authentication settings
AUTH_PASSWORD_VALIDATORS = []

# Disable admin
ADMIN_ENABLED = False

# Disable URL configuration
ROOT_URLCONF = None
```

Start the worker process as follows:
````
python manage.py task_worker
@@ -31,3 +86,16 @@ create_task(
)
````
The task queue can be viewed at /django_simple_queue/task

## Concurrency and locking

To prevent multiple workers from picking the same task, the worker command (`django_simple_queue/management/commands/task_worker.py`) uses database-level pessimistic locking when claiming tasks:

- It wraps the selection in a transaction and queries with `select_for_update(skip_locked=True)` to lock a single queued task row and skip any rows currently locked by another worker.
- Once a task is selected under the lock, the worker immediately marks it as `In progress` (`Task.PROGRESS`) within the same transaction. Only after claiming does it spawn a subprocess to execute the task.
- If the database backend does not support `skip_locked`, the code falls back to `select_for_update()` without the `skip_locked` argument. While this still provides row-level locking on supported backends, `skip_locked` offers better concurrency characteristics.

Recommended backends: For robust concurrent processing with multiple workers, use a database that supports `SELECT ... FOR UPDATE SKIP LOCKED` (e.g., PostgreSQL). SQLite may not provide full locking semantics for this pattern; it is best suited for development or single-worker setups.
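
The claim-then-mark step described above can be illustrated without Django. The sketch below is a stand-in, not the package's code: it uses Python's `sqlite3` module and `BEGIN IMMEDIATE`, which takes SQLite's database-wide write lock, in place of `select_for_update(skip_locked=True)`. The `task` table, the status codes, and the `claim_next_task` helper are all hypothetical names chosen for illustration.

```python
import sqlite3
from typing import Optional

QUEUED, PROGRESS = 0, 1  # hypothetical status codes, not the package's values


def claim_next_task(conn: sqlite3.Connection) -> Optional[int]:
    """Claim one queued task and return its id, or None if the queue is empty."""
    # BEGIN IMMEDIATE acquires the write lock up front, so two workers cannot
    # both read the same queued row before either one updates it.
    conn.execute("BEGIN IMMEDIATE")
    try:
        row = conn.execute(
            "SELECT id FROM task WHERE status = ? ORDER BY id LIMIT 1", (QUEUED,)
        ).fetchone()
        if row is None:
            conn.execute("COMMIT")
            return None
        # Mark the task as in progress inside the same transaction as the read.
        conn.execute("UPDATE task SET status = ? WHERE id = ?", (PROGRESS, row[0]))
        conn.execute("COMMIT")
        return row[0]
    except Exception:
        conn.execute("ROLLBACK")
        raise


# isolation_level=None disables implicit transactions so BEGIN/COMMIT are explicit.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE task (id INTEGER PRIMARY KEY, status INTEGER NOT NULL)")
conn.executemany("INSERT INTO task (status) VALUES (?)", [(QUEUED,), (QUEUED,)])

first = claim_next_task(conn)
second = claim_next_task(conn)
third = claim_next_task(conn)
```

Because the lock covers the whole database, concurrent workers wait behind one another instead of skipping locked rows, which is consistent with the recommendation to keep SQLite for development or single-worker setups.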

[^1]: The metric used is Resident Set Size from the `psutil` Python module, which double counts shared libraries and is
slightly more than actual RAM usage.
62 changes: 54 additions & 8 deletions django_simple_queue/admin.py
@@ -1,29 +1,75 @@
-from django.contrib import admin
-from django.shortcuts import reverse
-from django.utils.safestring import mark_safe
-from django.contrib import messages
+"""
+Django admin configuration for django-simple-queue.
+
+Provides a customized admin interface for viewing and managing tasks.
+"""
+from __future__ import annotations
+
+from django.contrib import admin, messages
+from django.db.models import QuerySet
+from django.http import HttpRequest
+from django.urls import reverse
+from django.utils.html import format_html
+from django.utils.safestring import SafeString
 from django.utils.translation import ngettext
 from django_simple_queue.models import Task


 @admin.register(Task)
 class TaskAdmin(admin.ModelAdmin):
+    """
+    Admin interface for Task model.
+
+    Features:
+        - All fields are read-only when editing an existing task
+        - Status column links to the task status page
+        - "Enqueue" action to re-queue selected tasks
+        - Search by task ID, callable path, and output
+        - Filter by status, created, and modified dates
+
+    Note:
+        Tasks are generally created programmatically via ``create_task()``,
+        but the admin interface is useful for monitoring and re-queuing
+        failed tasks.
+    """

     def get_readonly_fields(self, request, obj=None):
         if obj:
             self.readonly_fields = [field.name for field in obj.__class__._meta.fields]
         return self.readonly_fields

-    def status_page_link(self, obj):
-        return mark_safe("<a href='{}?task_id={}', target='_blank'>{}</a>".format(
+    def status_page_link(self, obj: Task) -> SafeString:
+        """
+        Generate a clickable link to the task status page.
+
+        Used as a column in the admin list view. Opens in a new tab.
+
+        Args:
+            obj: The Task instance.
+
+        Returns:
+            Safe HTML link to the task status view.
+        """
+        return format_html(
+            '<a href="{}?task_id={}" target="_blank">{}</a>',
             reverse('django_simple_queue:task'),
             obj.id,
             obj.get_status_display(),
-        ))
+        )
     status_page_link.short_description = "Status"

     @admin.action(description='Enqueue')
-    def enqueue_tasks(self, request, queryset):
+    def enqueue_tasks(self, request: HttpRequest, queryset: QuerySet[Task]) -> None:
+        """
+        Admin action to re-queue selected tasks.
+
+        Changes the status of selected tasks back to QUEUED so they will
+        be picked up by a worker. Useful for retrying failed tasks.
+
+        Args:
+            request: The HTTP request.
+            queryset: QuerySet of selected Task instances.
+        """
         updated = queryset.update(status=Task.QUEUED)
         self.message_user(request, ngettext(
             '%d task was successfully enqueued.',
87 changes: 87 additions & 0 deletions django_simple_queue/conf.py
@@ -0,0 +1,87 @@
"""
Configuration settings for django_simple_queue.

Settings are read from Django's settings.py with the DJANGO_SIMPLE_QUEUE_ prefix.
"""
from __future__ import annotations

from django.conf import settings


def get_allowed_tasks() -> set[str] | None:
    """
    Returns the set of allowed task callables.

    Configure in settings.py:
        DJANGO_SIMPLE_QUEUE_ALLOWED_TASKS = {
            "myapp.tasks.process_order",
            "myapp.tasks.send_email",
        }

    If not set or set to None, ALL tasks are allowed (unsafe, but backwards-compatible).
    Set to an empty set to disallow all tasks.
    """
    allowed = getattr(settings, "DJANGO_SIMPLE_QUEUE_ALLOWED_TASKS", None)
    if allowed is None:
        return None  # No restriction (backwards-compatible but unsafe)
    return set(allowed)


def is_task_allowed(task_path: str) -> bool:
    """
    Check if a task path is in the allowed list.

    Args:
        task_path: Dotted path to the callable (e.g., "myapp.tasks.process_order")

    Returns:
        True if allowed, False if not allowed.
        If DJANGO_SIMPLE_QUEUE_ALLOWED_TASKS is not configured, returns True (permissive).
    """
    allowed = get_allowed_tasks()
    if allowed is None:
        # No allowlist configured - permissive mode (backwards-compatible)
        return True
    return task_path in allowed


def get_max_output_size() -> int:
    """
    Returns the maximum allowed output size in bytes.

    Configure in settings.py:
        DJANGO_SIMPLE_QUEUE_MAX_OUTPUT_SIZE = 1_000_000  # 1MB

    Default: 10MB
    """
    return getattr(settings, "DJANGO_SIMPLE_QUEUE_MAX_OUTPUT_SIZE", 10 * 1024 * 1024)


def get_max_args_size() -> int:
    """
    Returns the maximum allowed args JSON size in bytes.

    Configure in settings.py:
        DJANGO_SIMPLE_QUEUE_MAX_ARGS_SIZE = 100_000  # 100KB

    Default: 1MB
    """
    return getattr(settings, "DJANGO_SIMPLE_QUEUE_MAX_ARGS_SIZE", 1024 * 1024)

def get_task_timeout() -> int | None:
    """
    Returns the maximum execution time for a task in seconds.

    Configure in settings.py:
        DJANGO_SIMPLE_QUEUE_TASK_TIMEOUT = 300  # 5 minutes

    If the setting is absent, the default of 3600 seconds (1 hour) applies.
    Set it to None, 0, or a negative value to disable the timeout; this
    function then returns None and tasks can run indefinitely.

    Default: 3600 (1 hour)
    """
    timeout = getattr(settings, "DJANGO_SIMPLE_QUEUE_TASK_TIMEOUT", 3600)
    if timeout is None or timeout <= 0:
        return None
    return timeout
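
How a worker might apply the allowlist and args-size settings is not part of this file, so the following is only a sketch under stated assumptions. `ALLOWED_TASKS` and `MAX_ARGS_SIZE` are hypothetical stand-ins for the values returned by `get_allowed_tasks()` and `get_max_args_size()`, and `resolve_task` is an illustrative helper, not a function from the package. It shows one plausible ordering: check the dotted path and payload size first, and import the callable only after both checks pass.

```python
import importlib
from typing import Any, Callable, Optional, Set

# Hypothetical stand-ins for get_allowed_tasks() and get_max_args_size().
ALLOWED_TASKS: Optional[Set[str]] = {"json.dumps"}
MAX_ARGS_SIZE = 1024 * 1024  # mirrors the 1MB default documented above


def resolve_task(task_path: str, args_json: str) -> Callable[..., Any]:
    """Validate a task request, then import and return its callable."""
    # Reject unlisted paths before importing anything, so an untrusted task
    # path cannot trigger import side effects or reach arbitrary callables.
    if ALLOWED_TASKS is not None and task_path not in ALLOWED_TASKS:
        raise PermissionError(f"task {task_path!r} is not in the allowlist")
    # Measure the encoded size, since the limit is expressed in bytes.
    if len(args_json.encode("utf-8")) > MAX_ARGS_SIZE:
        raise ValueError("task args exceed the configured size limit")
    module_path, _, attr = task_path.rpartition(".")
    module = importlib.import_module(module_path)
    return getattr(module, attr)


func = resolve_task("json.dumps", "[1, 2]")
result = func([1, 2])

try:
    resolve_task("os.system", '{"command": "id"}')
    blocked = False
except PermissionError:
    blocked = True
```

Setting `ALLOWED_TASKS` to `None` in this sketch reproduces the permissive mode described in `is_task_allowed`, where every path is accepted.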