Celery task in a Flask request context

Posted on Tue 03 November 2015 in Code • Tagged with Celery, Flask, Python, queue, request contextLeave a comment

Celery is an asynchronous task worker that’s frequently used for background processing in Python web apps. Rather than performing a time-consuming task within the request loop, we delegate it to a queue so that a worker process can pick it up when ready. The immediate benefit is much better latency for the end user. Pros also include easier scalability, since you can adjust the number of workers to your task load.

Examples of tasks that are usually best done in the background vary from fetching data through a third-party API to sending emails, and from pushing mobile notifications to pruning the database of stale records. Anything that may take more than a couple hundred milliseconds to complete — and isn’t absolutely essential to the current HTTP request — is typically a good candidate for asynchronous processing.

In Celery, workers are just regular Python processes, often running the exact same code that’s powering the app’s web frontend1. But unlike most of that code, they aren’t servicing any HTTP requests — they simply run some function with given arguments, both specified by whomever sent a task for execution. Indeed, those functions don’t even know who or what asked for them to be executed.

Neat, right? It is what we usually compliment as decoupling, or separation of concerns. They are valuable qualities even regardless of the UI and scaling benefits mentioned earlier.

Not quite web

But those qualities come with a trade-off. Task code is no longer a web frontend code: it doesn’t run within the comfy environment of our web framework of choice. Losing it may be quite unnerving, actually, because in a typical web application there will be many things tied directly to the HTTP request pipeline. After all, this is what web applications do — respond to HTTP requests — so it often makes perfect sense e.g. to marry the request flow with database transactions, committing or rolling those back according to HTTP status code that the app produced.

Tasks may also require a database, though, if only to assert the expected state of the world. Similar goes for memcache, a Redis instance, or basically any resource used by the frontend code. Alas, it’s quite possible the very reason we delegate work to a task is to shift lengthy interactions with those external systems away from the UI. Obviously, we’re going to need them for that!

Fake a request

So one way or another, our tasks will most likely need some initialization and/or cleanup code. And since it’s probably the same code that most HTTP request handlers require and use already, why not just pretend we’re handling a request after all?

In Flask, we can pull that off rather easily. The test_request_context method is conveniently provided to allow for faking the request context — that is, an execution environment for HTTP handlers. Like the name suggest, it is used mostly for testing, but there is nothing stopping us from using it in tasks run by Celery.

We probably don’t want to call it directly, though. What would be better is to have Celery prepare the context first, and then run the task code as if it was an HTTP handler. For even better results, the context would preserve information extracted from the actual HTTP request, one that has sent the task for execution. Moving some work to the background would then be a trivial matter, for both the task and the original handler would operate within the same environment.

Convenient? I believe so. And as I’ll demonstrate next, it isn’t very complicated to implement either.

Wrap the decorator?

At least one piece of the solution should stand out as pretty obvious. Since our intention is to wrap the task’s code in some additional packaging — the request context — it seems fairly natural to write our own @task decorator:

import functools

from myapp import app, celery


def task(**kwargs):
    """Decorator function to apply to Celery tasks.
    Executes the actual task inside Flask's test request context.
    """
    def decorator(func):
        """Actual decorator."""
        @celery.task(**kwargs)
        @functools.wraps(func)
        def wrapped(*args, **kwargs):
            with app.test_request_context():
                return func(*args, **kwargs)

        return wrapped

    return decorator

Here, app is the Flask application, and celery is the Celery object that’s often configured alongside it.

The Task class

While this technique will give us a request context inside the task code, it won’t be the request context from which the task has been sent for execution. To replicate that context correctly, we need additional support on the sender side.

Internally, Celery is converting every function we annotate with @task decorator into a subclass of Celery.app.task.Task. This process can be customized, for example by providing an explicit base= parameter to @task which specifies a custom Task subclass to use in place of the default one. Within that subclass, we’re free to override any functionality that we need to.

In our case, we’ll scrape the current request context from Flask for all relevant information, and later use it to recreate the context in the task code.

But before we get to that, let’s just create the Task subclass and move the above execution logic to it. This way, users won’t have to use a completely new @task decorator which would needlessly couple them to a specific Celery app instance:

from celery import Task

class RequestContextTask(Task):
    """Base class for tasks that run inside a Flask request context."""
    abstract = True

    def __call__(self, *args, **kwargs):
        with app.test_request_context():
            return super(RequestContextTask, self).__call__(*args, **kwargs)

Instead, they can either set this class as base= for a specific task:

@celery.task(base=RequestContextTask)
def sync_user_data(user_id):
    # ...

or make it into new default for all their tasks:

celery = Celery(...)
celery.Task = RequestContextTask

Invocation patterns

When the frontend asks for a task to be executed, it most often uses the Task.delay method. It will package a payload contaning task arguments, and send it off through a broker — usually an AMQP-based queue, such as RabbitMQ — so that a Celery worker can pick it up and actually execute.

But there are other means of task invocation. We can even run it “in place”, locally and synchronously, which is especially useful for various testing scenarios. Lastly, a task can also be retried from within its own code, terminating its current run and scheduling another attempt for some future date.

Obviously, for the RequestContextTask to be useful, it needs to behave correctly in every situation. Therefore we need to cover all the entry points I’ve mentioned — the asynchronous call, a synchronous invocation, and a task retry:

class RequestContextTask(Task):
    # ...

    def apply_async(self, args=None, kwargs=None, **rest):
        self._include_context(kwargs)
        return super(RequestContextTask, self) \
            .apply_async(args, kwargs, **rest)

    def apply(self, args=None, kwargs=None, **rest):
        self._include_context(kwargs)
        return super(RequestContextTask, self) \
            .apply(args, kwargs, **rest)

    def retry(self, args=None, kwargs=None, **rest):
        self._include_context(kwargs)
        return super(RequestContextTask, self) \
            .retry(args, kwargs, **rest)

Note that Task.apply_async is being called internally by Task.delay, so it’s only that first method that we have to override.

Context in a box

As you can deduce right away, the Flask-related magic is meant to go in the _include_context method. The idea is to prepare arguments for the eventual invocation of Flask.test_request_context, and pass them through an extra task parameter. Those arguments are relatively uncomplicated: they are just a medley of various pieces of information that we can easily obtain from the Flask’s request object:

from flask import has_request_context, request


class RequestContextTask(Task):
    CONTEXT_ARG_NAME = '_flask_request_context'

    # ...

    def _include_request_context(self, kwargs):
        """Includes all the information about current HTTP request context
        as an additional argument to the task.
        """
        if not has_request_context():
            return

        context = {
            'path': request.path,
            'base_url': request.url_root,
            'method': request.method,
            'headers': dict(request.headers),
        }
        if '?' in request.url:
            context['query_string'] = request.url[(request.url.find('?') + 1):]

        kwargs[self.CONTEXT_ARG_NAME] = context

On the worker side, we simply unpack them and recreate the context:

from flask import make_response

class RequestContextTask(Task):
    # ...
    def __call__(self, *args, **kwargs):
        call = lambda: super(RequestContextTask, self).__call__(*args, **kwargs)

        context = kwargs.pop(self.CONTEXT_ARG_NAME, None)
        if context is None or has_request_context():
            return call()

        with app.test_request_context(**context):
            result = call()
            app.process_response(make_response(result or ''))

        return result

The only tricky part is calling Flask.process_response at the end. We need to do that for the @after_request hooks to execute correctly. This is quite crucial, because those hooks are where you’d normally put important cleanup code, like commit/rollback of the database transaction.

Complete solution

To see how all those code snippets fit together, see this gist. You may also want to have a look at the article on Celery integration in Flask docs for some tips on how to integrate it with your own project.


  1. This isn’t strictly necessary, as Celery supports sending tasks for execution by explicit name. For that request to reach the worker, however, the task broker configuration must be correctly shared between sender and the worker. 

Continue reading