Simple Durable Background Job Processing Using Database as a Queue
Why Background Job Processing?
Background job processing is not a new concept. It has been around for a long time. The main idea is to offload the long processing tasks from the foreground (e.g. web server or job dispatcher) to a set of workers that run in the background in parallel. There are a few benefits for this:
- This avoids having long running tasks such as sending emails, processing images, documents etc from blocking the foreground HTTP server. This ensures the quick and snappy response to user requests.
- Via decoupling, we can scale the background workers independently from the foreground, which allows better resource utilization and scalability.
- Typically the background jobs are backed by a durable storage such as a queue or database to ensure the jobs are not lost when the workers are stoppped or restarted. This brings greater reliability and fault tolerance.
Why Database as a Queue?
Using database as a queue for backgroud job processing might sounds like a heresy as majority of the cloud vendors encourage you to burn money on their own queue services. Also historically database (RDBMS) doesn't have a particularly good reputation as a queue service, mainly due to the blockish nature of the transaction isolation (this will be explained in greater detail later in section Dequeueing A Task). However with recent years advancements in CPUs and SSDs and the RDBMS features, database is now a pretty viable option - in fact it works at massive scale for hyper-scalers such as Meta. Besides for your average "twitter-for-cats" web service, scalability is not much of an issue. There are a few benefits of using database as a queue for background job processing:
- Simplicity: A database and your language runtime are all you need. It's always nice to cut down the number of dependencies as much as possible.
- Transactional: Due to the transactional nature of RDBMS, we can ensure the consistency of the job processing with commit/rollback. The bonus point is that if your queue and your business logic shares the same database, you can pretty much guarantee the atomicity of the job processing.
- Durable: The job queue is essentially backed up as a table in the database, which means it can survive the most catastrophic failure.
In the following example we will implement the background job processing using sqlite3 and SQLModel and Python.
Historically sqlite3 lacks the ability to have multi-process support, but this has largely been solved via the write-ahead logging feature. Replication support from Litestream also helps you to run production-grade sqlite3 database system in a distributed environment at a dirt cheap price.
Basic Data Model
Here is the queue item definition:
class TaskStatus(Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
DEFAULT_PRIORITY = 5
class SQLModel(_SQLModel, registry=registry()):
metadata = MetaData()
class TaskItem(SQLModel, table=True):
id: int = Field(primary_key=True)
args: List[Any] = Field(sa_column=Column(JSON))
kwargs: Dict[str, Any] = Field(sa_column=Column(JSON))
func: str
result: Any = Field(sa_column=Column(JSON))
error: Optional[str] = Field(default=None, nullable=True)
status: TaskStatus = Field(default=TaskStatus.PENDING, index=True)
generation_id: int = Field(default=1)
created_at: datetime = Field(default=datetime.now(UTC))
updated_at: datetime = Field(default=datetime.now(UTC))
priority: int = Field(default=DEFAULT_PRIORITY)
In the above code, we define a TaskItem
class that represents a single job item in the queue. It has a status
field that indicates the current state of the job, and a priority
field that indicates the priority of the job.
As we want the function and its arguments to be queueable, we define func
, args
and kwargs
fields. We also define result
, error
and status
fields to store the result of the job, the error if the job failed, and the current status of the job.
For args
, kwargs
and result
, we use the JSON
type to store the data. This allows us to store arbitrary data that can be serialized and deserialized into json format, but it's probably gotta fail miserably for any complicated data structure.
Enqueueing a Task
This is how a task enqueueing looks like:
def enqueue_task(
session: Session,
fn: Callable[..., Awaitable[Any]],
*args: List[Any],
priority: int = DEFAULT_PRIORITY,
**kwargs: Dict[str, Any],
):
fn_module = fn.__module__
fn_name = fn.__name__
task = TaskItem(
func=f"{fn_module}.{fn_name}",
args=args,
kwargs=kwargs,
priority=priority,
)
session.add(task)
session.commit()
return task.id
This function takes a function and its arguments, and enqueues it into the database. It returns the id of the task item. Nothing fancy here.
Dequeueing a Task
Naive Not-working Approach
This is where the tricky part comes in. In a conventional RDBMS queueing system there are always multiple workers that are working simultaneously. This means that there is a high chance that multiple workers will try to dequeue the same task at the same time. Like the naive approach below. The system will end up with many duplicated tasks
START TRANSACTION;
SELECT * FROM task_items WHERE status = 'pending' ORDER BY priority DESC LIMIT 1;
-- start processing
COMMIT;
Using FOR UPDATE
To avoid this racing condition systems often use the FOR UPDATE
clause to lock the row for the duration of the job processing within a transaction:
START TRANSACTION;
SELECT * FROM task_items WHERE status = 'pending' FOR UPDATE ORDER BY priority DESC LIMIT 1;
-- start processing
COMMIT;
This approach ensures that when multiple workers try to dequeue the same task simultaneously, only one worker will be able to claim the task and start processing. The downside of this approach is that other worker's transactions will be blocked until the first worker commits the transaction, which is not great for performance.
Using FOR UPDATE SKIP LOCKED
In MySQL and Postgres you can mitigate the contention by using the SELECT ... FOR UPDATE SKIP LOCKED
. It is introduced in MySQL 8.0.1 and Postgres 9.5.
START TRANSACTION;
SELECT * FROM task_items WHERE status = 'pending' FOR UPDATE SKIP LOCKED ORDER BY priority DESC LIMIT 1;
-- start processing
COMMIT;
In this case if multiple workers try to dequeue tasks at the same time. The first worker will claim whatever task that is at top of the selected. For the others they will simply ignore that task and try the records next to it.
The SELECT ... FOR UPDATE SKIP LOCKED
sounds perfect. In fact the state-of-the-art solid-queue uses this approach.
pg_try_advisory_lock
The pg_try_advisory_lock
function (a non-blocking alternative to pg_advisory_lock
) is a PostgreSQL-specific function that attempts to acquire an advisory lock on a specific key (in our case the task id). If the lock is already held by another session, the function returns false immediately, possibly running the lock acquisition in a loop. In our specific case this can be used as a non-blocking alternative to SELECT ... FOR UPDATE SKIP LOCKED
.
Optimistic Locking (The actual implementation)
The SELECT ... FOR UPDATE SKIP LOCKED
and pg_try_advisory_lock
both sounds promising. However sqlite3 doesn't support SELECT ... FOR UPDATE
, neither does it support pg_try_advisory_lock
for obvious reasons.
So we need to find a different approach. The following is the actual implementation of the task dequeueing:
def dequeue_task(session: Session):
task = session.exec(
select(TaskItem)
.where(TaskItem.status == TaskStatus.PENDING)
.order_by(TaskItem.priority.desc())
).first()
if not task:
return None
# an optimistic lock to prevent race condition
result = session.exec(
update(TaskItem)
.where(TaskItem.id == task.id)
.where(TaskItem.generation_id == task.generation_id)
.values(
status=TaskStatus.RUNNING,
generation_id=task.generation_id + 1,
updated_at=datetime.now(UTC),
)
)
if result.rowcount == 0:
return dequeue_task(session)
session.refresh(task)
return task
In this implementation we use optimistic locking to implement lock-free task dequeueing. The generation_id
field is used to implement the optimistic lock. When a worker claims a task, it increments the generation_id
field. If another worker tries to claim the same task at the same time, it will fail because the generation_id
field will not match - This is because the generation_id
is incremented by the first worker atomically. In such case the other worker will simply try again. Given the (task_id
, status
) claiming is fairly quick and snappy. It is likely to succeed in the next attempt.
This approach ensures:
- Multiple workers won't dequeue the same task concurrently.
- No contention on the task table as there is no invovement of transactional locks.
The concept of generation_id
is certainly not new. You can find it like for like in Kubernetes .metadata.resourceVersion
, it is used in a similar fashion.
Here is a little helper function to await for the completion of the task:
async def await_task_completion(
session: Session,
task_id: int,
timeout: float = 5,
interval: float = 0.1,
) -> TaskItem:
start = time.time()
while True:
task = session.exec(select(TaskItem).where(TaskItem.id == task_id)).first()
logger.info("task status", task_id=task_id, status=task.status)
if task.status == TaskStatus.COMPLETED or task.status == TaskStatus.FAILED:
return task
if time.time() - start > timeout:
raise TimeoutError(
f"Task {task_id} did not complete within {timeout} seconds"
)
await asyncio.sleep(interval)
The background job worker
You can find it at here - https://gist.github.com/jingkaihe/65840eb7f6176d1c0e4136f3d77e5e24#file-dbq-py-L122-L172. It's fairly straitforward so I won't go into the details.
See it in action
from dbq import (
Worker,
enqueue_task,
await_task_completion,
SQLModel,
)
from sqlmodel import Session, create_engine
import asyncio
import structlog
logger = structlog.get_logger(__name__)
async def simple_task(a: int, b: int, text: str):
await asyncio.sleep(1)
print(text)
return {
"added": a + b,
"text": text,
}
async def main():
engine = create_engine("sqlite:///:memory:")
SQLModel.metadata.create_all(engine)
session = Session(engine)
worker = Worker(session, concurrency=5)
worker_task = asyncio.create_task(worker.start())
try:
task_ids = [
enqueue_task(session, simple_task, i, i + 1, f"Hello, World {i}!")
for i in range(10)
]
await_tasks = [await_task_completion(session, task_id) for task_id in task_ids]
await asyncio.gather(*await_tasks)
finally:
await worker.stop()
worker_task.cancel()
try:
await worker_task
except asyncio.CancelledError:
logger.info("worker task cancelled")
if __name__ == "__main__":
asyncio.run(main())
I run it with time python main.py
and it runs under 3 seconds:
real 0m2.942s
user 0m0.827s
sys 0m0.174s
Conclusion
Obviously this is not a full-blown background job processing system like Celery however it is a simple and effective way to implement background job processing via just using the database without introducing any other dependencies, which always come with a cost in terms of complexity and operational overhead.
You can find the full implementation at https://gist.github.com/jingkaihe/65840eb7f6176d1c0e4136f3d77e5e24.