Jonathan Cutrer · Engineering · 5 min read
A Minimal Task Queue in Python Without Celery
For background jobs that don't need a distributed system. PostgreSQL LISTEN/NOTIFY, asyncio, and about 150 lines of code.
Celery is a serious piece of infrastructure. Message broker, result backend, worker processes, beat scheduler — it’s the right tool when you need distributed task execution at scale. It’s significant overkill when you need to run three types of background jobs on a single-server application that processes a few hundred tasks per day.
I’ve built the lightweight version of this a few times now. Here’s the current iteration.
The Design
The core idea: tasks are rows in a PostgreSQL table, worker processes poll for unclaimed rows, and LISTEN/NOTIFY replaces a timer-based poll for low-latency dispatch.
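Each task moves through a small set of states: claimed tasks go from pending to running, then to done, failed, or back to pending for a retry. A sketch of the legal transitions in plain Python (illustrative only, not part of the queue itself):

```python
# Legal status transitions for a task row (illustrative only).
TRANSITIONS = {
    "pending": {"running"},                    # a worker claims the task
    "running": {"done", "failed", "pending"},  # success, permanent failure, or retry
    "done": set(),                             # terminal
    "failed": set(),                           # terminal
}

def can_move(old: str, new: str) -> bool:
    """Return True if the status change is one the queue performs."""
    return new in TRANSITIONS[old]
```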
CREATE TABLE tasks (
    id          BIGSERIAL PRIMARY KEY,
    task_type   TEXT NOT NULL,
    payload     JSONB NOT NULL DEFAULT '{}',
    status      TEXT NOT NULL DEFAULT 'pending'
        CHECK (status IN ('pending', 'running', 'done', 'failed')),
    priority    INT NOT NULL DEFAULT 0,
    created_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    started_at  TIMESTAMPTZ,
    finished_at TIMESTAMPTZ,
    error       TEXT,
    retries     INT NOT NULL DEFAULT 0,
    max_retries INT NOT NULL DEFAULT 3
);

CREATE INDEX idx_tasks_pending ON tasks (priority DESC, created_at ASC)
    WHERE status = 'pending';

-- Function to notify on new task insert
CREATE OR REPLACE FUNCTION notify_task_insert()
RETURNS TRIGGER AS $$
BEGIN
    PERFORM pg_notify('new_task', NEW.id::text);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER task_insert_notify
AFTER INSERT ON tasks
FOR EACH ROW EXECUTE FUNCTION notify_task_insert();

The partial index on (priority DESC, created_at ASC) WHERE status = 'pending' is what makes the claim query fast — it only indexes pending tasks, so the index stays small as completed tasks accumulate.
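The dispatch order that index encodes (highest priority first, oldest first within a priority) can be mimicked in plain Python with a sort key. A sketch with illustrative rows:

```python
from datetime import datetime, timezone

# Illustrative pending rows: (id, priority, created_at)
pending = [
    (1, 0, datetime(2024, 1, 1, 9, 0, tzinfo=timezone.utc)),
    (2, 5, datetime(2024, 1, 1, 9, 5, tzinfo=timezone.utc)),
    (3, 5, datetime(2024, 1, 1, 9, 1, tzinfo=timezone.utc)),
]

# ORDER BY priority DESC, created_at ASC == sort by (-priority, created_at)
dispatch_order = sorted(pending, key=lambda r: (-r[1], r[2]))
ids = [r[0] for r in dispatch_order]
```

Both priority-5 tasks run before the priority-0 one, with the older of the two first.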
Claiming a Task Atomically
The critical section: multiple workers competing for the same task. This must be atomic or you get double-execution.
async def claim_next_task(conn) -> dict | None:
    """Atomically claim the highest-priority pending task."""
    row = await conn.fetchrow("""
        UPDATE tasks
        SET status = 'running', started_at = NOW()
        WHERE id = (
            SELECT id FROM tasks
            WHERE status = 'pending'
            ORDER BY priority DESC, created_at ASC
            LIMIT 1
            FOR UPDATE SKIP LOCKED
        )
        RETURNING *
    """)
    return dict(row) if row else None

FOR UPDATE SKIP LOCKED is the key. It acquires a row-level lock on the selected task, but instead of blocking when it encounters a locked row (which another worker is already claiming), it skips to the next one. This lets multiple workers run simultaneously without stepping on each other.
Without SKIP LOCKED, workers would block waiting for locks and you’d get serial execution under contention — the opposite of what you want.
The Worker Loop
import asyncio
import json
import logging
from typing import Awaitable, Callable

import asyncpg

logger = logging.getLogger(__name__)

TaskHandler = Callable[[dict], Awaitable[None]]

class TaskWorker:
    def __init__(self, dsn: str, handlers: dict[str, TaskHandler]):
        self.dsn = dsn
        self.handlers = handlers
        self._pool: asyncpg.Pool | None = None

    async def start(self):
        self._pool = await asyncpg.create_pool(self.dsn, min_size=2, max_size=10)
        logger.info("Worker started, listening for tasks")
        await asyncio.gather(
            self._listen_loop(),
            self._drain_loop(),
        )

    async def _listen_loop(self):
        """Subscribe to NOTIFY and claim tasks on new inserts."""
        async with self._pool.acquire() as conn:
            await conn.add_listener("new_task", self._on_notify)
            while True:
                await asyncio.sleep(3600)  # keep the listening connection alive

    async def _on_notify(self, conn, pid, channel, payload):
        asyncio.create_task(self._process_one())

    async def _drain_loop(self):
        """Catch any tasks that slipped through on startup or restart."""
        while True:
            await self._process_one()
            await asyncio.sleep(5)

    async def _process_one(self):
        async with self._pool.acquire() as conn:
            task = await claim_next_task(conn)
            if not task:
                return

            handler = self.handlers.get(task["task_type"])
            if not handler:
                logger.warning(f"No handler for task_type={task['task_type']}")
                await conn.execute(
                    "UPDATE tasks SET status='failed', error=$1, finished_at=NOW() WHERE id=$2",
                    f"No handler registered for {task['task_type']}", task["id"]
                )
                return

            # asyncpg returns JSONB as text unless a type codec is registered,
            # so decode the payload before handing it to the handler.
            payload = task["payload"]
            if isinstance(payload, str):
                payload = json.loads(payload)

            try:
                await handler(payload)
                await conn.execute(
                    "UPDATE tasks SET status='done', finished_at=NOW() WHERE id=$1",
                    task["id"]
                )
                logger.info(f"Task {task['id']} ({task['task_type']}) completed")
            except Exception as e:
                retries = task["retries"] + 1
                if retries < task["max_retries"]:
                    await conn.execute(
                        "UPDATE tasks SET status='pending', retries=$1, error=$2 WHERE id=$3",
                        retries, str(e), task["id"]
                    )
                    logger.warning(
                        f"Task {task['id']} failed (retry {retries}/{task['max_retries']}): {e}"
                    )
                else:
                    await conn.execute(
                        "UPDATE tasks SET status='failed', retries=$1, error=$2, finished_at=NOW() WHERE id=$3",
                        retries, str(e), task["id"]
                    )
                    logger.error(f"Task {task['id']} failed permanently: {e}")

Registering Handlers
import os

async def send_welcome_email(payload: dict):
    user_id = payload["user_id"]
    # ... email logic
    logger.info(f"Welcome email sent to user {user_id}")

async def process_uploaded_file(payload: dict):
    file_path = payload["file_path"]
    # ... processing logic

# Start the worker
worker = TaskWorker(
    dsn=os.environ["DATABASE_URL"],
    handlers={
        "send_welcome_email": send_welcome_email,
        "process_uploaded_file": process_uploaded_file,
    },
)
asyncio.run(worker.start())

Enqueuing From Your Application
async def enqueue(conn, task_type: str, payload: dict, priority: int = 0):
    await conn.execute(
        "INSERT INTO tasks (task_type, payload, priority) VALUES ($1, $2, $3)",
        task_type, json.dumps(payload), priority
    )

The trigger fires automatically, and the worker picks the task up within milliseconds. There's no polling delay in normal operation; the _drain_loop exists only as a safety net.
What This Doesn’t Do
No multi-machine orchestration (though SKIP LOCKED coordinates entirely through Postgres, so workers in separate processes, or even on separate machines pointed at the same database, won't double-execute). No scheduled or cron-style tasks. No task result retrieval beyond reading the database row. No per-task-type rate limiting or concurrency controls.
For any of those, evaluate Celery, ARQ, or Dramatiq. This is for the cases where you don’t need those things and don’t want to operate the infrastructure they require.
The complete version of this, including the schema, worker, and a FastAPI endpoint for enqueueing, fits in about 250 lines of Python.