
Simple Durable Background Job Processing Using Database as a Queue

Why Background Job Processing?

Background job processing is not a new concept; it has been around for a long time. The main idea is to offload long-running tasks from the foreground (e.g. a web server or job dispatcher) to a set of workers that run in the background in parallel. There are a few benefits to this:

  • It prevents long-running tasks such as sending emails or processing images and documents from blocking the foreground HTTP server, which keeps responses to user requests quick and snappy.
  • Via decoupling, we can scale the background workers independently from the foreground, which allows better resource utilization and scalability.
  • Typically the background jobs are backed by durable storage such as a queue or a database, so jobs are not lost when the workers are stopped or restarted. This brings greater reliability and fault tolerance.

Why Database as a Queue?

Using a database as a queue for background job processing might sound like heresy, as the majority of cloud vendors encourage you to burn money on their own queue services. Historically, relational databases (RDBMS) also haven't had a great reputation as queue backends, mainly due to the blocking nature of transaction isolation (explained in greater detail later, in the section Dequeueing a Task). However, with recent advancements in CPUs, SSDs and RDBMS features, a database is now a pretty viable option - in fact it works at massive scale for hyper-scalers such as Meta. Besides, for your average "twitter-for-cats" web service, scalability is not much of an issue. There are a few benefits to using a database as a queue for background job processing:

  • Simplicity: A database and your language runtime are all you need. It's always nice to cut down the number of dependencies as much as possible.
  • Transactional: Due to the transactional nature of an RDBMS, we can ensure the consistency of job processing with commit/rollback. The bonus is that if your queue and your business logic share the same database, you can enqueue a job atomically with the business write it belongs to.
  • Durable: The job queue is just another table in the database, which means jobs survive worker crashes and restarts and inherit whatever backup and replication story your database already has.

In the following example we will implement background job processing in Python using sqlite3 and SQLModel.

Historically sqlite3 has had poor support for concurrent access from multiple processes, but this has largely been addressed by the write-ahead logging (WAL) feature. Replication support from Litestream also lets you run a production-grade sqlite3 setup in a distributed environment at a dirt-cheap price.
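For reference, WAL mode is enabled with a PRAGMA on each connection. Here is a minimal sketch using SQLAlchemy's connect event (the tasks.db filename and the busy_timeout value are illustrative, not part of the implementation below):

from sqlalchemy import create_engine, event

engine = create_engine("sqlite:///tasks.db")


@event.listens_for(engine, "connect")
def enable_wal(dbapi_connection, connection_record):
    # WAL lets readers proceed while a single writer appends to the log.
    cursor = dbapi_connection.cursor()
    cursor.execute("PRAGMA journal_mode=WAL;")
    cursor.execute("PRAGMA busy_timeout=5000;")  # wait up to 5s instead of failing on a locked db
    cursor.close()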

Basic Data Model

Here is the queue item definition:

import asyncio
import time
from datetime import UTC, datetime
from enum import Enum
from typing import Any, Awaitable, Callable, Dict, List, Optional

import structlog
from sqlalchemy import JSON, Column, MetaData, update
from sqlalchemy.orm import registry
from sqlmodel import Field, Session, SQLModel as _SQLModel, select

logger = structlog.get_logger(__name__)


class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


DEFAULT_PRIORITY = 5


class SQLModel(_SQLModel, registry=registry()):
    metadata = MetaData()


class TaskItem(SQLModel, table=True):
    id: int = Field(primary_key=True)
    args: List[Any] = Field(sa_column=Column(JSON))
    kwargs: Dict[str, Any] = Field(sa_column=Column(JSON))
    func: str

    result: Any = Field(default=None, sa_column=Column(JSON))
    error: Optional[str] = Field(default=None, nullable=True)
    status: TaskStatus = Field(default=TaskStatus.PENDING, index=True)

    generation_id: int = Field(default=1)
    # default_factory so the timestamp is computed per row, not once at import time
    created_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
    updated_at: datetime = Field(default_factory=lambda: datetime.now(UTC))

    priority: int = Field(default=DEFAULT_PRIORITY)

In the above code, we define a TaskItem class that represents a single job item in the queue. It has a status field that indicates the current state of the job, and a priority field that indicates the priority of the job.

As we want the function and its arguments to be queueable, we define func, args and kwargs fields. We also define result, error and status fields to store the result of the job, the error if the job failed, and the current status of the job.

For args, kwargs and result we use the JSON column type. This lets us store arbitrary data that can be serialized to and from JSON, but it will fail for anything more complicated that isn't JSON-serializable.
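In other words, every argument and result has to round-trip through json.dumps/json.loads. A quick illustration of the limitation (the values here are hypothetical):

import json
from datetime import UTC, datetime

json.dumps({"a": 1, "text": "hello"})    # fine: plain dicts, lists, strings and numbers
json.dumps({"when": datetime.now(UTC)})  # raises TypeError: Object of type datetime is not JSON serializable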

Enqueueing a Task

This is what enqueueing a task looks like:

def enqueue_task(
    session: Session,
    fn: Callable[..., Awaitable[Any]],
    *args: Any,
    priority: int = DEFAULT_PRIORITY,
    **kwargs: Any,
):
    fn_module = fn.__module__
    fn_name = fn.__name__

    task = TaskItem(
        func=f"{fn_module}.{fn_name}",
        args=list(args),  # store positional args as a JSON-friendly list
        kwargs=kwargs,
        priority=priority,
    )
    session.add(task)
    session.commit()

    return task.id

This function takes a function and its arguments, and enqueues it into the database. It returns the id of the task item. Nothing fancy here.
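Since the task table lives next to your business tables (the "bonus" from earlier), you can also enqueue a job atomically with a business write by sharing one transaction. A minimal sketch, assuming a hypothetical Order model and a hypothetical send_order_confirmation handler (enqueue_task above commits on its own, so here the TaskItem is added directly):

class Order(SQLModel, table=True):
    # Hypothetical business table, for illustration only.
    id: int = Field(primary_key=True)
    total_cents: int


def place_order_and_enqueue_email(session: Session, order: Order) -> int:
    session.add(order)
    session.flush()  # assigns order.id inside the open transaction, without committing

    task = TaskItem(
        func="myapp.emails.send_order_confirmation",  # hypothetical handler path
        args=[order.id],
        kwargs={},
    )
    session.add(task)
    session.commit()  # either both rows are persisted, or neither is
    return task.id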

Dequeueing a Task

Naive Not-working Approach

This is where the tricky part comes in. In a conventional RDBMS-backed queueing system there are always multiple workers running simultaneously, which means there is a high chance that multiple workers will try to dequeue the same task at the same time. With the naive approach below, several workers can select the same row and the system ends up processing the same task multiple times:

START TRANSACTION;
SELECT * FROM task_items WHERE status = 'pending' ORDER BY priority DESC LIMIT 1;
-- start processing
COMMIT;

Using FOR UPDATE

To avoid this race condition, systems often use the FOR UPDATE clause to lock the row for the duration of the job processing within a transaction:

START TRANSACTION;
SELECT * FROM task_items WHERE status = 'pending' ORDER BY priority DESC LIMIT 1 FOR UPDATE;
-- start processing
COMMIT;

This approach ensures that when multiple workers try to dequeue simultaneously, only one of them is able to claim the task and start processing. The downside is that the other workers' transactions are blocked until the first worker commits, which is not great for performance.

Using FOR UPDATE SKIP LOCKED

In MySQL and Postgres you can mitigate this contention by using SELECT ... FOR UPDATE SKIP LOCKED, introduced in MySQL 8.0.1 and Postgres 9.5.

START TRANSACTION;
SELECT * FROM task_items WHERE status = 'pending' ORDER BY priority DESC LIMIT 1 FOR UPDATE SKIP LOCKED;
-- start processing
COMMIT;

In this case, when multiple workers try to dequeue tasks at the same time, the first worker claims whatever task sits at the top of the result set, while the others simply skip that locked row and move on to the next pending records.

The SELECT ... FOR UPDATE SKIP LOCKED sounds perfect. In fact the state-of-the-art solid-queue uses this approach.
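If you are on Postgres or MySQL, the same pattern can be expressed through SQLAlchemy's with_for_update. A minimal sketch of what claiming a task could look like (this is illustrative and not used in the sqlite3 implementation below):

def dequeue_task_skip_locked(session: Session):
    # Emits SELECT ... FOR UPDATE SKIP LOCKED; Postgres/MySQL only.
    task = session.exec(
        select(TaskItem)
        .where(TaskItem.status == TaskStatus.PENDING)
        .order_by(TaskItem.priority.desc())
        .limit(1)
        .with_for_update(skip_locked=True)
    ).first()
    if task is not None:
        task.status = TaskStatus.RUNNING
        session.commit()  # committing releases the row lock
    return task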

pg_try_advisory_lock

The pg_try_advisory_lock function (a non-blocking alternative to pg_advisory_lock) is a PostgreSQL-specific function that attempts to acquire an advisory lock on a given key (in our case the task id). If the lock is already held by another session, the function returns false immediately, so the caller can retry in a loop or simply move on to the next candidate task. In our case this can serve as another non-blocking alternative to SELECT ... FOR UPDATE SKIP LOCKED.
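A rough sketch of how that might be called from SQLAlchemy (Postgres only, so it is not used in the sqlite3 implementation below; the lock would be released with pg_advisory_unlock once the task is done):

from sqlalchemy import text


def try_claim_with_advisory_lock(session: Session, task_id: int) -> bool:
    # Returns True if this session now holds the lock, False immediately otherwise.
    locked = session.execute(
        text("SELECT pg_try_advisory_lock(:task_id)"),
        {"task_id": task_id},
    ).scalar()
    return bool(locked)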

Optimistic Locking (The actual implementation)

SELECT ... FOR UPDATE SKIP LOCKED and pg_try_advisory_lock both sound promising. However, sqlite3 supports neither SELECT ... FOR UPDATE nor, for obvious reasons, pg_try_advisory_lock.

So we need to find a different approach. The following is the actual implementation of the task dequeueing:

def dequeue_task(session: Session):
    task = session.exec(
        select(TaskItem)
        .where(TaskItem.status == TaskStatus.PENDING)
        .order_by(TaskItem.priority.desc())
    ).first()

    if not task:
        return None

    # an optimistic lock to prevent race condition
    result = session.exec(
        update(TaskItem)
        .where(TaskItem.id == task.id)
        .where(TaskItem.generation_id == task.generation_id)
        .values(
            status=TaskStatus.RUNNING,
            generation_id=task.generation_id + 1,
            updated_at=datetime.now(UTC),
        )
    )
    if result.rowcount == 0:
        return dequeue_task(session)

    session.refresh(task)
    return task

In this implementation we use optimistic locking to achieve lock-free task dequeueing. The generation_id field implements the optimistic lock: when a worker claims a task it increments generation_id, and the UPDATE only matches the row if generation_id still holds the value the worker originally read. If another worker tries to claim the same task at the same time, its UPDATE matches zero rows, because the first worker has already bumped generation_id atomically. In that case the losing worker simply tries again; since the claim is just one quick UPDATE keyed on (id, generation_id), the retry is likely to succeed on the next attempt.

This approach ensures:

  • Multiple workers won't dequeue the same task concurrently.
  • No lock contention on the task table, since no long-held transactional row locks are involved.

The concept of a generation_id is certainly not new. Kubernetes uses .metadata.resourceVersion in a very similar fashion for optimistic concurrency control.

Here is a little helper function to wait for the completion of a task:

async def await_task_completion(
    session: Session,
    task_id: int,
    timeout: float = 5,
    interval: float = 0.1,
) -> TaskItem:
    start = time.time()
    while True:
        task = session.exec(select(TaskItem).where(TaskItem.id == task_id)).first()
        logger.info("task status", task_id=task_id, status=task.status)
        if task.status == TaskStatus.COMPLETED or task.status == TaskStatus.FAILED:
            return task
        if time.time() - start > timeout:
            raise TimeoutError(
                f"Task {task_id} did not complete within {timeout} seconds"
            )
        await asyncio.sleep(interval)

The background job worker

You can find it here: https://gist.github.com/jingkaihe/65840eb7f6176d1c0e4136f3d77e5e24#file-dbq-py-L122-L172. It's fairly straightforward so I won't go into the details.
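For a rough idea of its shape, here is a stripped-down sketch of what such a worker loop might look like (this is not the exact implementation from the gist; it assumes the models and the dequeue_task helper defined above):

import importlib


class Worker:
    def __init__(self, session: Session, concurrency: int = 1, poll_interval: float = 0.1):
        self.session = session
        self.concurrency = concurrency
        self.poll_interval = poll_interval
        self._stopping = False

    async def start(self):
        # Run one polling loop per unit of concurrency.
        await asyncio.gather(*(self._run_loop() for _ in range(self.concurrency)))

    async def stop(self):
        self._stopping = True

    async def _run_loop(self):
        while not self._stopping:
            task = dequeue_task(self.session)
            if task is None:
                await asyncio.sleep(self.poll_interval)
                continue
            await self._execute(task)

    async def _execute(self, task: TaskItem):
        # Resolve "module.function" back into a callable and run it.
        module_name, func_name = task.func.rsplit(".", 1)
        fn = getattr(importlib.import_module(module_name), func_name)
        try:
            task.result = await fn(*task.args, **task.kwargs)
            task.status = TaskStatus.COMPLETED
        except Exception as exc:
            task.error = str(exc)
            task.status = TaskStatus.FAILED
        task.updated_at = datetime.now(UTC)
        self.session.add(task)
        self.session.commit()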

See it in action

from dbq import (
    Worker,
    enqueue_task,
    await_task_completion,
    SQLModel,
)
from sqlmodel import Session, create_engine
import asyncio
import structlog

logger = structlog.get_logger(__name__)


async def simple_task(a: int, b: int, text: str):
    await asyncio.sleep(1)
    print(text)
    return {
        "added": a + b,
        "text": text,
    }


async def main():
    engine = create_engine("sqlite:///:memory:")
    SQLModel.metadata.create_all(engine)
    session = Session(engine)
    worker = Worker(session, concurrency=5)
    worker_task = asyncio.create_task(worker.start())
    try:
        task_ids = [
            enqueue_task(session, simple_task, i, i + 1, f"Hello, World {i}!")
            for i in range(10)
        ]
        await_tasks = [await_task_completion(session, task_id) for task_id in task_ids]
        await asyncio.gather(*await_tasks)
    finally:
        await worker.stop()
        worker_task.cancel()
        try:
            await worker_task
        except asyncio.CancelledError:
            logger.info("worker task cancelled")


if __name__ == "__main__":
    asyncio.run(main())

I run it with time python main.py and it finishes in under 3 seconds:

real    0m2.942s
user    0m0.827s
sys     0m0.174s

Conclusion

Obviously this is not a full-blown background job processing system like Celery. However, it is a simple and effective way to implement background job processing using just the database, without introducing any other dependencies, which always come with a cost in complexity and operational overhead.

You can find the full implementation at https://gist.github.com/jingkaihe/65840eb7f6176d1c0e4136f3d77e5e24.