Jonathan Cutrer · Engineering · 5 min read

A Minimal Task Queue in Python Without Celery

For background jobs that don't need a distributed system. PostgreSQL LISTEN/NOTIFY, asyncio, and about 150 lines of code.

Celery is a serious piece of infrastructure. Message broker, result backend, worker processes, beat scheduler — it’s the right tool when you need distributed task execution at scale. It’s significant overkill when you need to run three types of background jobs on a single-server application that processes a few hundred tasks per day.

I’ve built the lightweight version of this a few times now. Here’s the current iteration.

The Design

The core idea: tasks are rows in a PostgreSQL table, worker processes poll for unclaimed rows, and LISTEN/NOTIFY replaces a timer-based poll for low-latency dispatch.

CREATE TABLE tasks (
    id          BIGSERIAL PRIMARY KEY,
    task_type   TEXT NOT NULL,
    payload     JSONB NOT NULL DEFAULT '{}',
    status      TEXT NOT NULL DEFAULT 'pending'
                CHECK (status IN ('pending', 'running', 'done', 'failed')),
    priority    INT NOT NULL DEFAULT 0,
    created_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    started_at  TIMESTAMPTZ,
    finished_at TIMESTAMPTZ,
    error       TEXT,
    retries     INT NOT NULL DEFAULT 0,
    max_retries INT NOT NULL DEFAULT 3
);

CREATE INDEX idx_tasks_pending ON tasks (priority DESC, created_at ASC)
    WHERE status = 'pending';

-- Function to notify on new task insert
CREATE OR REPLACE FUNCTION notify_task_insert()
RETURNS TRIGGER AS $$
BEGIN
    PERFORM pg_notify('new_task', NEW.id::text);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER task_insert_notify
    AFTER INSERT ON tasks
    FOR EACH ROW EXECUTE FUNCTION notify_task_insert();

The partial index on (priority DESC, created_at ASC) WHERE status = 'pending' is what makes the claim query fast — it only indexes pending tasks, so the index stays small as completed tasks accumulate.

Claiming a Task Atomically

The critical section: multiple workers competing for the same task. This must be atomic or you get double-execution.

async def claim_next_task(conn) -> dict | None:
    """Atomically claim the highest-priority pending task."""
    row = await conn.fetchrow("""
        UPDATE tasks
        SET status = 'running', started_at = NOW()
        WHERE id = (
            SELECT id FROM tasks
            WHERE status = 'pending'
            ORDER BY priority DESC, created_at ASC
            FOR UPDATE SKIP LOCKED
            LIMIT 1
        )
        RETURNING *
    """)
    return dict(row) if row else None

FOR UPDATE SKIP LOCKED is the key. It acquires a row-level lock on the selected task, but instead of blocking when it encounters a locked row (which another worker is already claiming), it skips to the next one. This lets multiple workers run simultaneously without stepping on each other.

Without SKIP LOCKED, workers would block waiting for locks and you’d get serial execution under contention — the opposite of what you want.
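
To see the property we rely on, here is a plain-Python sketch of the claim invariant, using an in-memory dict as a stand-in for the table and an asyncio.Lock standing in for the row-level lock. It illustrates only the guarantee (each task is claimed exactly once, even with concurrent workers), not the actual lock protocol:

```python
import asyncio

async def demo():
    # In-memory stand-in for the tasks table: id -> status.
    tasks = {1: "pending", 2: "pending", 3: "pending"}
    claim_lock = asyncio.Lock()  # plays the role of the row-level lock
    claims: list[tuple[str, int]] = []

    async def worker(name: str):
        while True:
            async with claim_lock:
                # Equivalent of the UPDATE ... SKIP LOCKED claim: take the
                # first still-pending row, or give up if there are none.
                pending = [tid for tid, s in tasks.items() if s == "pending"]
                if not pending:
                    return
                tasks[pending[0]] = "running"
                claims.append((name, pending[0]))
            await asyncio.sleep(0)  # yield so the workers interleave

    await asyncio.gather(worker("w1"), worker("w2"))
    return claims

claims = asyncio.run(demo())
print(sorted(tid for _, tid in claims))  # [1, 2, 3], each task exactly once
```

In the real system the database provides this guarantee, which is why the worker code itself needs no locks at all.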

The Worker Loop

import asyncio
import asyncpg
import json
import logging
from typing import Callable, Awaitable

logger = logging.getLogger(__name__)

TaskHandler = Callable[[dict], Awaitable[None]]

class TaskWorker:
    def __init__(self, dsn: str, handlers: dict[str, TaskHandler]):
        self.dsn = dsn
        self.handlers = handlers
        self._pool: asyncpg.Pool | None = None

    async def start(self):
        self._pool = await asyncpg.create_pool(self.dsn, min_size=2, max_size=10)
        logger.info("Worker started, listening for tasks")
        await asyncio.gather(
            self._listen_loop(),
            self._drain_loop(),
        )

    async def _listen_loop(self):
        """Subscribe to NOTIFY and claim tasks on new inserts."""
        async with self._pool.acquire() as conn:
            await conn.add_listener("new_task", self._on_notify)
            while True:
                await asyncio.sleep(3600)  # hold the listening connection checked out

    async def _on_notify(self, conn, pid, channel, payload):
        # Fire-and-forget; if this task is ever lost, the drain loop
        # will pick the row up on its next sweep.
        asyncio.create_task(self._process_one())

    async def _drain_loop(self):
        """Catch any tasks that slipped through on startup or restart."""
        while True:
            # Drain the whole backlog, then sleep until the next sweep.
            while await self._process_one():
                pass
            await asyncio.sleep(5)

    async def _process_one(self) -> bool:
        """Claim and run one task. Returns True if a task was claimed."""
        async with self._pool.acquire() as conn:
            task = await claim_next_task(conn)
            if not task:
                return False

            handler = self.handlers.get(task["task_type"])
            if not handler:
                logger.warning(f"No handler for task_type={task['task_type']}")
                await conn.execute(
                    "UPDATE tasks SET status='failed', error=$1, finished_at=NOW() WHERE id=$2",
                    f"No handler registered for {task['task_type']}", task["id"]
                )
                return True

            try:
                # asyncpg returns JSONB columns as JSON strings unless a type
                # codec is registered, so decode before calling the handler.
                payload = task["payload"]
                if isinstance(payload, str):
                    payload = json.loads(payload)
                await handler(payload)
                await conn.execute(
                    "UPDATE tasks SET status='done', finished_at=NOW() WHERE id=$1",
                    task["id"]
                )
                logger.info(f"Task {task['id']} ({task['task_type']}) completed")
            except Exception as e:
                retries = task["retries"] + 1
                if retries < task["max_retries"]:
                    await conn.execute(
                        "UPDATE tasks SET status='pending', retries=$1, error=$2 WHERE id=$3",
                        retries, str(e), task["id"]
                    )
                    logger.warning(f"Task {task['id']} failed (retry {retries}/{task['max_retries']}): {e}")
                else:
                    await conn.execute(
                        "UPDATE tasks SET status='failed', retries=$1, error=$2, finished_at=NOW() WHERE id=$3",
                        retries, str(e), task["id"]
                    )
                    logger.error(f"Task {task['id']} failed permanently: {e}")
            return True

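One subtlety worth spelling out: because the counter is incremented before the retries < max_retries check, a task gets max_retries attempts in total (the first run plus max_retries - 1 retries), not max_retries retries. A pure-Python restatement of the branch above makes that easy to check:

```python
def next_state(retries: int, max_retries: int) -> tuple[str, int]:
    """Mirror the worker's retry branch: given a task's current retries
    counter at the moment it fails, return its new status and counter."""
    retries += 1
    if retries < max_retries:
        return "pending", retries
    return "failed", retries

# Walk a task with the default max_retries = 3 through repeated failures.
history = []
status, retries = "pending", 0
while status == "pending":
    status, retries = next_state(retries, 3)
    history.append((status, retries))
print(history)  # [('pending', 1), ('pending', 2), ('failed', 3)]
```

If you want max_retries to mean retries after the first attempt, change the comparison to <=.
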
Registering Handlers

async def send_welcome_email(payload: dict):
    user_id = payload["user_id"]
    # ... email logic
    logger.info(f"Welcome email sent to user {user_id}")

async def process_uploaded_file(payload: dict):
    file_path = payload["file_path"]
    # ... processing logic

import os

# Start the worker
worker = TaskWorker(
    dsn=os.environ["DATABASE_URL"],
    handlers={
        "send_welcome_email": send_welcome_email,
        "process_uploaded_file": process_uploaded_file,
    }
)

asyncio.run(worker.start())

Enqueuing From Your Application

async def enqueue(conn, task_type: str, payload: dict, priority: int = 0):
    await conn.execute(
        "INSERT INTO tasks (task_type, payload, priority) VALUES ($1, $2, $3)",
        task_type, json.dumps(payload), priority
    )

The trigger fires automatically, and the worker picks the task up within milliseconds. There is no polling delay in normal operation; the _drain_loop exists as a safety net. Note that a retried task is set back to 'pending' with a plain UPDATE, which does not fire the insert trigger, so retries are also picked up by the drain sweep.
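
The interplay of the two loops, edge-triggered dispatch via NOTIFY plus a level-triggered polling sweep, can be sketched with the standard library alone, using an asyncio.Event as a stand-in for the notification channel. This is an illustration of the control flow only, not of the real transport:

```python
import asyncio

async def demo():
    queue: list[str] = []     # stand-in for pending rows
    wakeup = asyncio.Event()  # stand-in for NOTIFY on the channel
    done: list[str] = []

    async def worker():
        while len(done) < 3:
            try:
                # Wait for a notification, but never longer than the drain
                # interval (5 s in the real worker, shortened here).
                await asyncio.wait_for(wakeup.wait(), timeout=0.05)
            except asyncio.TimeoutError:
                pass  # drain sweep: check for work anyway
            wakeup.clear()
            while queue:  # drain everything that is pending
                done.append(queue.pop(0))

    async def producer():
        for name in ("a", "b", "c"):
            queue.append(name)  # the INSERT
            wakeup.set()        # the trigger firing pg_notify
            await asyncio.sleep(0.01)

    await asyncio.gather(worker(), producer())
    return done

done = asyncio.run(demo())
print(done)  # ['a', 'b', 'c']
```

Clearing the event before draining, rather than after, is what prevents a lost wakeup: a notification that arrives mid-drain leaves the event set, so the next wait returns immediately.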

What This Doesn’t Do

No broker-based distribution (the SKIP LOCKED approach does work for multiple worker processes, even on different machines, as long as they all point at the same Postgres instance). No scheduled/cron tasks. No task result retrieval beyond checking the database row. No per-task-type rate limiting or concurrency controls.

For any of those, evaluate Celery, ARQ, or Dramatiq. This is for the cases where you don’t need those things and don’t want to operate the infrastructure they require.

The complete version of this, including the schema, worker, and a FastAPI endpoint for enqueueing, fits in about 250 lines of Python.
