Celery task in a Flask request context
Posted on Tue 03 November 2015 in Code
Celery is an asynchronous task queue that’s frequently used for background processing in Python web apps. Rather than performing a time-consuming task within the request loop, we delegate it to a queue so that a worker process can pick it up when ready. The immediate benefit is much better latency for the end user. Another benefit is easier scalability, since you can adjust the number of workers to your task load.
Examples of tasks that are usually best done in the background vary from fetching data through a third-party API to sending emails, and from pushing mobile notifications to pruning the database of stale records. Anything that may take more than a couple hundred milliseconds to complete — and isn’t absolutely essential to the current HTTP request — is typically a good candidate for asynchronous processing.
In Celery, workers are just regular Python processes, often running the exact same code that’s powering the app’s web frontend [1]. But unlike most of that code, they aren’t servicing any HTTP requests: they simply run some function with given arguments, both specified by whoever sent the task for execution. Indeed, those functions don’t even know who or what asked for them to be executed.
Neat, right? This is what we usually praise as decoupling, or separation of concerns. These are valuable qualities regardless of the UI and scaling benefits mentioned earlier.
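The worker model described above can be mimicked with nothing but the standard library. This is only a toy sketch and not how Celery is implemented: the `TASKS` registry, the `task` decorator and the `worker` function are hypothetical names, and an in-process `queue.Queue` stands in for a real broker.

```python
import queue
import threading

# Registry of functions a worker can run, keyed by task name (hypothetical).
TASKS = {}


def task(func):
    """Register a function so that workers can look it up by name."""
    TASKS[func.__name__] = func
    return func


@task
def add(x, y):
    return x + y


def worker(jobs, results):
    """Run queued tasks until a None sentinel arrives."""
    while True:
        job = jobs.get()
        if job is None:
            break
        name, args, kwargs = job
        # The function only sees its arguments, not who sent the job.
        results.put(TASKS[name](*args, **kwargs))


jobs, results = queue.Queue(), queue.Queue()
thread = threading.Thread(target=worker, args=(jobs, results))
thread.start()

jobs.put(("add", (2, 3), {}))  # the "frontend" enqueues and moves on
jobs.put(None)                 # tell the worker to stop
thread.join()
outcome = results.get()
```

The sender passes only a task name and its arguments, which is why the task function has no idea who asked for it to run.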
Not quite web
But those qualities come with a trade-off. Task code is no longer web frontend code: it doesn’t run within the comfy environment of our web framework of choice. Losing that environment can be quite unnerving, because in a typical web application many things are tied directly to the HTTP request pipeline. After all, responding to HTTP requests is what web applications do, so it often makes perfect sense to, for example, marry the request flow with database transactions, committing or rolling them back according to the HTTP status code that the app produced.
Tasks may also require a database, though, if only to assert the expected state of the world. The same goes for memcache, a Redis instance, or basically any resource used by the frontend code. Indeed, it’s quite possible that the very reason we delegate work to a task is to shift lengthy interactions with those external systems away from the UI. Obviously, we’re going to need them for that!
Fake a request
So one way or another, our tasks will most likely need some initialization and/or cleanup code. And since it’s probably the same code that most HTTP request handlers require and use already, why not just pretend we’re handling a request after all?
In Flask, we can pull that off rather easily.
The test_request_context method is conveniently provided to allow for faking the request context, that is, an execution environment for HTTP handlers. As the name suggests, it is used mostly for testing, but there is nothing stopping us from using it in tasks run by Celery.
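As a quick illustration of what the fake context provides, here is a minimal sketch. The URL, the HTTP method and the `describe_request` helper are made up for the example; only `test_request_context`, `has_request_context` and `request` come from Flask.

```python
from flask import Flask, has_request_context, request

app = Flask(__name__)


def describe_request():
    """Code that normally only works inside an HTTP handler."""
    return "{} {} page={}".format(
        request.method, request.path, request.args.get("page"))


# No HTTP server or client is involved anywhere in this script.
outside = has_request_context()

with app.test_request_context("/users/42?page=2", method="POST"):
    inside = has_request_context()
    description = describe_request()
```

Inside the `with` block, `request` behaves as if a real POST to `/users/42?page=2` were being handled. Outside of it, there is no request context at all.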
We probably don’t want to call it directly, though. It would be better to have Celery prepare the context first, and then run the task code as if it were an HTTP handler. For even better results, the context would preserve information extracted from the actual HTTP request, the one that sent the task for execution. Moving some work to the background would then be a trivial matter, because both the task and the original handler would operate within the same environment.
Convenient? I believe so. And as I’ll demonstrate next, it isn’t very complicated to implement either.
Wrap the decorator?
At least one piece of the solution should stand out as pretty obvious. Since our intention is to wrap the task’s code in some additional packaging (the request context), it seems fairly natural to write our own @task decorator:
import functools

from myapp import app, celery


def task(**task_options):
    """Decorator function to apply to Celery tasks.

    Executes the actual task inside Flask's test request context.
    """
    def decorator(func):
        """Actual decorator."""
        # task_options go to Celery; they are named differently from the
        # task's own **kwargs below so that the two are not confused.
        @celery.task(**task_options)
        @functools.wraps(func)
        def wrapped(*args, **kwargs):
            with app.test_request_context():
                return func(*args, **kwargs)
        return wrapped
    return decorator
Here, app is the Flask application, and celery is the Celery object that’s often configured alongside it.
The Task class
While this technique will give us a request context inside the task code, it won’t be the request context from which the task has been sent for execution. To replicate that context correctly, we need additional support on the sender side.
Internally, Celery converts every function we annotate with the @task decorator into a subclass of celery.app.task.Task. This process can be customized, for example by providing an explicit base= parameter to @task, which specifies a custom Task subclass to use in place of the default one. Within that subclass, we’re free to override any functionality that we need to.
In our case, we’ll scrape the current request context from Flask for all relevant information, and later use it to recreate the context in the task code.
But before we get to that, let’s just create the Task subclass and move the above execution logic to it. This way, users won’t have to use a completely new @task decorator which would needlessly couple them to a specific Celery app instance:
from celery import Task

from myapp import app


class RequestContextTask(Task):
    """Base class for tasks that run inside a Flask request context."""
    abstract = True

    def __call__(self, *args, **kwargs):
        with app.test_request_context():
            return super(RequestContextTask, self).__call__(*args, **kwargs)
Instead, they can either set this class as base= for a specific task:

@celery.task(base=RequestContextTask)
def sync_user_data(user_id):
    # ...

or make it the new default for all their tasks:

celery = Celery(...)
celery.Task = RequestContextTask
Invocation patterns
When the frontend asks for a task to be executed, it most often uses the Task.delay method. It packages a payload containing the task arguments and sends it off through a broker (usually an AMQP-based queue, such as RabbitMQ) so that a Celery worker can pick it up and actually execute it.
But there are other means of task invocation. We can even run it “in place”, locally and synchronously, which is especially useful for various testing scenarios. Lastly, a task can also be retried from within its own code, terminating its current run and scheduling another attempt for some future date.
Obviously, for the RequestContextTask to be useful, it needs to behave correctly in every situation. Therefore we need to cover all the entry points I’ve mentioned, namely the asynchronous call, a synchronous invocation, and a task retry:
class RequestContextTask(Task):
    # ...

    def apply_async(self, args=None, kwargs=None, **rest):
        self._include_context(kwargs)
        return super(RequestContextTask, self) \
            .apply_async(args, kwargs, **rest)

    def apply(self, args=None, kwargs=None, **rest):
        self._include_context(kwargs)
        return super(RequestContextTask, self) \
            .apply(args, kwargs, **rest)

    def retry(self, args=None, kwargs=None, **rest):
        self._include_context(kwargs)
        return super(RequestContextTask, self) \
            .retry(args, kwargs, **rest)
Note that Task.apply_async is called internally by Task.delay, so of those two methods it’s only apply_async that we have to override.
Context in a box
As you can deduce right away, the Flask-related magic is meant to go in the _include_context method. The idea is to prepare arguments for the eventual invocation of Flask.test_request_context, and pass them through an extra task parameter. Those arguments are relatively uncomplicated: they are just a medley of various pieces of information that we can easily obtain from Flask’s request object:
from flask import has_request_context, request


class RequestContextTask(Task):
    CONTEXT_ARG_NAME = '_flask_request_context'

    # ...

    def _include_context(self, kwargs):
        """Includes all the information about current HTTP request context
        as an additional argument to the task.
        """
        # Nothing to do outside of a request, or when the caller passed
        # no kwargs dict that the context could be attached to.
        if kwargs is None or not has_request_context():
            return

        context = {
            'path': request.path,
            'base_url': request.url_root,
            'method': request.method,
            'headers': dict(request.headers),
        }
        # Forward the raw query string, if the URL has one.
        if '?' in request.url:
            context['query_string'] = request.url[(request.url.find('?') + 1):]

        kwargs[self.CONTEXT_ARG_NAME] = context
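To see that such a dictionary is enough to rebuild an equivalent request, the round trip can be tried with Flask alone. This is a sketch under stated assumptions: `capture_context` is a hypothetical free function mirroring the method above, and the URL and the `X-Request-Id` header are made-up sample values.

```python
from flask import Flask, request

app = Flask(__name__)


def capture_context():
    """Collect the pieces of the current request needed to rebuild it."""
    context = {
        "path": request.path,
        "base_url": request.url_root,
        "method": request.method,
        "headers": dict(request.headers),
    }
    if "?" in request.url:
        context["query_string"] = request.url.split("?", 1)[1]
    return context


# Stand-in for the original HTTP request (all values are made up).
with app.test_request_context(
        "/users/42?page=2", method="POST",
        headers={"X-Request-Id": "abc123"}):
    context = capture_context()

# Stand-in for the worker: rebuild the request from the dictionary alone.
with app.test_request_context(**context):
    rebuilt = (request.method, request.path, request.args["page"],
               request.headers["X-Request-Id"])
```

The dictionary holds only strings and a plain dict of headers, so it should serialize without trouble when it travels through the broker as a task argument.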
On the worker side, we simply unpack them and recreate the context:
from flask import has_request_context, make_response

from myapp import app


class RequestContextTask(Task):
    # ...

    def __call__(self, *args, **kwargs):
        call = lambda: super(RequestContextTask, self).__call__(*args, **kwargs)

        # Take the serialized context out of kwargs before the task sees them.
        context = kwargs.pop(self.CONTEXT_ARG_NAME, None)
        if context is None or has_request_context():
            return call()

        with app.test_request_context(**context):
            result = call()
            # Push a fake response through so that @after_request hooks run.
            app.process_response(make_response(result or ''))
        return result
The only tricky part is calling Flask.process_response at the end. We need to do that for the @after_request hooks to execute correctly. This is quite crucial, because those hooks are where you’d normally put important cleanup code, like commit/rollback of the database transaction.
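The effect of that call can be observed in isolation. In the sketch below, `FakeSession` is a hypothetical stand-in for a database session and the commit-or-rollback rule is just one plausible policy; the point is only that `@after_request` hooks fire when a response goes through `process_response`, not when the context is entered.

```python
from flask import Flask, make_response

app = Flask(__name__)


class FakeSession:
    """Hypothetical stand-in for a database session."""

    def __init__(self):
        self.log = []

    def commit(self):
        self.log.append("commit")

    def rollback(self):
        self.log.append("rollback")


db = FakeSession()


@app.after_request
def finish_transaction(response):
    # Commit on success, roll back on client or server errors.
    if response.status_code < 400:
        db.commit()
    else:
        db.rollback()
    return response


with app.test_request_context():
    # Entering the context alone does not trigger @after_request hooks...
    log_before = list(db.log)
    # ...but pushing a response through process_response does.
    app.process_response(make_response("done"))
    app.process_response(make_response("oops", 500))
```

A task that skipped this step would leave such a transaction neither committed nor rolled back.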
Complete solution
To see how all those code snippets fit together, see this gist. You may also want to have a look at the article on Celery integration in Flask docs for some tips on how to integrate it with your own project.
[1] This isn’t strictly necessary, as Celery supports sending tasks for execution by explicit name. For that request to reach the worker, however, the task broker configuration must be correctly shared between the sender and the worker.