Celery task in a Flask request context

Posted on Tue 03 November 2015 in Code • Tagged with Celery, Flask, Python, queue, request context

Celery is an asynchronous task worker that’s frequently used for background processing in Python web apps. Rather than performing a time-consuming task within the request loop, we delegate it to a queue so that a worker process can pick it up when ready. The immediate benefit is much better latency for the end user. Pros also include easier scalability, since you can adjust the number of workers to your task load.

Examples of tasks that are usually best done in the background vary from fetching data through a third-party API to sending emails, and from pushing mobile notifications to pruning the database of stale records. Anything that may take more than a couple hundred milliseconds to complete — and isn’t absolutely essential to the current HTTP request — is typically a good candidate for asynchronous processing.

In Celery, workers are just regular Python processes, often running the exact same code that’s powering the app’s web frontend [1]. But unlike most of that code, they aren’t servicing any HTTP requests: they simply run some function with given arguments, both specified by whoever sent the task for execution. Indeed, those functions don’t even know who or what asked for them to be executed.

Neat, right? This is what we usually praise as decoupling, or separation of concerns. These are valuable qualities in their own right, regardless of the UI and scaling benefits mentioned earlier.

Not quite web

But those qualities come with a trade-off. Task code is no longer web frontend code: it doesn’t run within the comfy environment of our web framework of choice. Losing it may be quite unnerving, actually, because in a typical web application there will be many things tied directly to the HTTP request pipeline. After all, responding to HTTP requests is what web applications do, so it often makes perfect sense to e.g. marry the request flow with database transactions, committing or rolling those back according to the HTTP status code that the app produced.

Tasks may also require a database, though, if only to assert the expected state of the world. The same goes for memcache, a Redis instance, or basically any resource used by the frontend code. In fact, it’s quite possible that the very reason we delegate work to a task is to shift lengthy interactions with those external systems away from the UI. Obviously, we’re going to need them for that!

Fake a request

So one way or another, our tasks will most likely need some initialization and/or cleanup code. And since it’s probably the same code that most HTTP request handlers require and use already, why not just pretend we’re handling a request after all?

In Flask, we can pull that off rather easily. The test_request_context method is conveniently provided to allow for faking the request context, that is, an execution environment for HTTP handlers. As the name suggests, it is used mostly for testing, but there is nothing stopping us from using it in tasks run by Celery.
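As a minimal sketch of what the fake context provides, the same function can be called with and without it. The app object and the describe_request helper below are hypothetical stand-ins for illustration, not part of the original project:

```python
from flask import Flask, has_request_context, request

app = Flask(__name__)  # hypothetical stand-in for the real application


def describe_request():
    """Return basic request data, or None when no request context is active."""
    if not has_request_context():
        return None
    return (request.method, request.path, request.args.get('page'))


# Outside of any HTTP request (as in a plain Celery worker) there is no context.
outside = describe_request()

# Inside the fake context, code that reads `request` works as usual.
with app.test_request_context('/users', method='POST', query_string='page=2'):
    inside = describe_request()
```

Here outside ends up as None, while inside holds the method, path and query argument of the faked request.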

We probably don’t want to call it directly, though. What would be better is to have Celery prepare the context first, and then run the task code as if it was an HTTP handler. For even better results, the context would preserve information extracted from the actual HTTP request, one that has sent the task for execution. Moving some work to the background would then be a trivial matter, for both the task and the original handler would operate within the same environment.

Convenient? I believe so. And as I’ll demonstrate next, it isn’t very complicated to implement either.

Wrap the decorator?

At least one piece of the solution should stand out as pretty obvious. Since our intention is to wrap the task’s code in some additional packaging — the request context — it seems fairly natural to write our own @task decorator:

import functools

from myapp import app, celery


def task(**task_options):
    """Decorator function to apply to Celery tasks.
    Executes the actual task inside Flask's test request context.
    """
    def decorator(func):
        """Actual decorator."""
        @celery.task(**task_options)
        @functools.wraps(func)
        def wrapped(*args, **kwargs):
            with app.test_request_context():
                return func(*args, **kwargs)

        return wrapped

    return decorator

Here, app is the Flask application, and celery is the Celery object that’s often configured alongside it.

The Task class

While this technique will give us a request context inside the task code, it won’t be the request context from which the task has been sent for execution. To replicate that context correctly, we need additional support on the sender side.

Internally, Celery converts every function we annotate with the @task decorator into a subclass of celery.app.task.Task. This process can be customized, for example by providing an explicit base= parameter to @task which specifies a custom Task subclass to use in place of the default one. Within that subclass, we’re free to override any functionality that we need to.

In our case, we’ll scrape the current request context from Flask for all relevant information, and later use it to recreate the context in the task code.

But before we get to that, let’s just create the Task subclass and move the above execution logic to it. This way, users won’t have to use a completely new @task decorator which would needlessly couple them to a specific Celery app instance:

from celery import Task

from myapp import app

class RequestContextTask(Task):
    """Base class for tasks that run inside a Flask request context."""
    abstract = True

    def __call__(self, *args, **kwargs):
        with app.test_request_context():
            return super(RequestContextTask, self).__call__(*args, **kwargs)

Instead, they can either set this class as base= for a specific task:

@celery.task(base=RequestContextTask)
def sync_user_data(user_id):
    # ...

or make it the new default for all their tasks:

celery = Celery(...)
celery.Task = RequestContextTask

Invocation patterns

When the frontend asks for a task to be executed, it most often uses the Task.delay method. It packages a payload containing the task arguments and sends it off through a broker (usually an AMQP-based queue, such as RabbitMQ) so that a Celery worker can pick it up and actually execute it.

But there are other means of task invocation. We can even run it “in place”, locally and synchronously, which is especially useful for various testing scenarios. Lastly, a task can also be retried from within its own code, terminating its current run and scheduling another attempt for some future date.

Obviously, for the RequestContextTask to be useful, it needs to behave correctly in every situation. Therefore we need to cover all the entry points I’ve mentioned — the asynchronous call, a synchronous invocation, and a task retry:

class RequestContextTask(Task):
    # ...

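    def apply_async(self, args=None, kwargs=None, **rest):
        # The context travels inside kwargs, so make sure there is a dict.
        kwargs = {} if kwargs is None else kwargs
        self._include_context(kwargs)
        return super(RequestContextTask, self) \
            .apply_async(args, kwargs, **rest)

    def apply(self, args=None, kwargs=None, **rest):
        kwargs = {} if kwargs is None else kwargs
        self._include_context(kwargs)
        return super(RequestContextTask, self) \
            .apply(args, kwargs, **rest)

    def retry(self, args=None, kwargs=None, **rest):
        # Leave kwargs=None alone: Celery then reuses the kwargs of the
        # current task request, so only amend kwargs passed explicitly.
        if kwargs is not None:
            self._include_context(kwargs)
        return super(RequestContextTask, self) \
            .retry(args, kwargs, **rest)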

Note that Task.delay calls Task.apply_async internally, so of those two it’s only apply_async that we have to override.
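The relationship between the two methods can be illustrated without Celery at all. The ToyTask class below is a deliberately simplified stand-in (an assumption made for illustration, not Celery's actual implementation) that only mimics delay forwarding to apply_async:

```python
class ToyTask:
    """Simplified stand-in for a task class; NOT the real celery.Task."""

    def __init__(self):
        self.sent = []  # records what would have gone to the broker

    def delay(self, *args, **kwargs):
        # Star-argument shortcut that forwards to apply_async.
        return self.apply_async(args, kwargs)

    def apply_async(self, args=None, kwargs=None, **options):
        self.sent.append((tuple(args or ()), dict(kwargs or {})))
        return len(self.sent)


class TaggingTask(ToyTask):
    """Overrides only apply_async, yet calls made via delay() are covered too."""

    def apply_async(self, args=None, kwargs=None, **options):
        kwargs = dict(kwargs or {})
        kwargs['_tag'] = 'from-frontend'  # hypothetical extra argument
        return super().apply_async(args, kwargs, **options)


task = TaggingTask()
task.delay(42, verbose=True)
```

After the call, task.sent contains the original arguments plus the extra _tag entry, even though delay itself was never overridden.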

Context in a box

As you can deduce right away, the Flask-related magic is meant to go in the _include_context method. The idea is to prepare arguments for the eventual invocation of Flask.test_request_context, and pass them through an extra task parameter. Those arguments are relatively uncomplicated: they are just a medley of various pieces of information that we can easily obtain from Flask’s request object:

from flask import has_request_context, request


class RequestContextTask(Task):
    CONTEXT_ARG_NAME = '_flask_request_context'

    # ...

    def _include_context(self, kwargs):
        """Includes all the information about current HTTP request context
        as an additional argument to the task.
        """
        if not has_request_context():
            return

        context = {
            'path': request.path,
            'base_url': request.url_root,
            'method': request.method,
            'headers': dict(request.headers),
        }
        if '?' in request.url:
            context['query_string'] = request.url[(request.url.find('?') + 1):]

        kwargs[self.CONTEXT_ARG_NAME] = context
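To get a feel for what such a context dictionary carries, here is a small round trip that involves no Celery at all. It is only a sketch: the app object, the capture_context and summarize_request helpers, and the X-Request-Id header are hypothetical names chosen for illustration.

```python
from flask import Flask, request

app = Flask(__name__)  # hypothetical stand-in for the real application


def capture_context():
    """Collect the same pieces of request data as the method above."""
    context = {
        'path': request.path,
        'base_url': request.url_root,
        'method': request.method,
        'headers': dict(request.headers),
    }
    if '?' in request.url:
        context['query_string'] = request.url[(request.url.find('?') + 1):]
    return context


def summarize_request():
    """Read a few values through Flask's `request` proxy."""
    return (request.method, request.path,
            request.args.get('page'), request.headers.get('X-Request-Id'))


# "Frontend" side: a request is being handled, so we take a snapshot of it.
with app.test_request_context('/users/42?page=2', method='POST',
                              headers={'X-Request-Id': 'abc123'}):
    original = summarize_request()
    context = capture_context()

# "Worker" side: no request is active here, so we rebuild one from the dict.
with app.test_request_context(**context):
    recreated = summarize_request()
```

Both summaries come out identical, which is the property the task base class relies on.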

On the worker side, we simply unpack them and recreate the context:

from flask import make_response

class RequestContextTask(Task):
    # ...
    def __call__(self, *args, **kwargs):
        call = lambda: super(RequestContextTask, self).__call__(*args, **kwargs)

        context = kwargs.pop(self.CONTEXT_ARG_NAME, None)
        if context is None or has_request_context():
            return call()

        with app.test_request_context(**context):
            result = call()
            app.process_response(make_response(result or ''))

        return result

The only tricky part is calling Flask.process_response at the end. We need to do that for the @after_request hooks to execute correctly. This is quite crucial, because those hooks are where you’d normally put important cleanup code, like commit/rollback of the database transaction.
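The effect of that call can be checked in isolation. In this sketch (with a hypothetical stand-in app and a hook that merely records status codes), the @after_request hook fires even though no real HTTP request was ever dispatched:

```python
from flask import Flask, make_response

app = Flask(__name__)  # hypothetical stand-in for the real application
seen_statuses = []


@app.after_request
def record_status(response):
    # A real hook would commit or roll back a transaction here.
    seen_statuses.append(response.status_code)
    return response


with app.test_request_context('/'):
    result = 'task result'
    # Without this call the hook above would never run.
    app.process_response(make_response(result or ''))
```

Afterwards seen_statuses contains a single 200, the default status of a response built from a plain string.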

Complete solution

To see how all those code snippets fit together, see this gist. You may also want to have a look at the article on Celery integration in Flask docs for some tips on how to integrate it with your own project.


  1. This isn’t strictly necessary, as Celery supports sending tasks for execution by explicit name. For that request to reach the worker, however, the task broker configuration must be correctly shared between the sender and the worker. 


CSS class helper for Jinja

Posted on Thu 22 October 2015 in Code • Tagged with Jinja, CSS, Python, Flask

One of the great uses for a templating engine such as Jinja is reducing the clutter in our HTML source files. Even with the steady advances in the CSS [1] standards, various div.wrapper, div.container, div.content, and other presentational elements are still a common fact of life. Unless you’re one of the cool kids who use Web Components with Polymer, your main option for abstracting this boilerplate away is defining some more general template macros.

As with any kind of abstraction, it’s crucial to balance broad applicability with a potential for finely-tuned customization. In the case of wrapping HTML snippets in Jinja macros, an easy way to maintain flexibility is to allow the caller to specify some crucial attributes of the root element:

{#
  Renders the markup for a <button> from Twitter Bootstrap.
#}
{% macro button(text=none, style='default', type='button', class='') %}
  <button type="{{ type }}"
        class="btn btn-{{ style }}{% if class %} {{ class }}{% endif %}"
        {{ kwargs|xmlattr }}>
    {{ text }}
  </button>
{% endmacro %}

An element id would be a good example, and in the above macro it’s handled implicitly thanks to the {{ kwargs|xmlattr }} stanza.

class, however, is not that simple, because a macro like that usually needs to supply some CSS classes of its own. The operation of combining them with additional ones, passed by the caller, is awfully textual and error-prone. It’s easy, for example, to forget about the crucial space and run two class names together.

As if CSS wasn’t difficult enough to debug already!

Let them have list

The root cause of all those problems is working at a level that’s too low for the task. The value for a class attribute may be encoded as a string, but it’s fundamentally a list of tokens. In the modern DOM API, for example, it is represented as exactly that: a DOMTokenList.
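The difference between the two levels can be shown in a few lines of plain Python. The function names below are made up for this illustration:

```python
def join_classes_naively(own, extra):
    """Textual concatenation: easy to forget the separating space."""
    return own + extra


def join_classes(own, extra):
    """Token-based merge: split both values, deduplicate, join back."""
    tokens = set(own.split()) | set(extra.split())
    return " ".join(sorted(tokens))


broken = join_classes_naively('btn btn-default', 'large')
merged = join_classes('btn btn-default', 'btn  large')
```

The naive version runs two class names together (btn-defaultlarge), while the token-based one tolerates stray whitespace and duplicates.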

I’ve found it helpful to replicate a similar mechanism in Jinja templates. The result is a ClassList wrapper whose code I quote here in full:

from collections.abc import Iterable, MutableSet  # plain `collections` on Python 2


class ClassList(MutableSet):
    """Data structure for holding, and ultimately returning as a single string,
    a set of identifiers that should be managed like CSS classes.
    """
    def __init__(self, arg=None):
        """Constructor.
        :param arg: A single class name or an iterable thereof.
        """
        classes = []  # used when no argument is given
        if isinstance(arg, str):  # `basestring` on Python 2
            classes = arg.split()
        elif isinstance(arg, Iterable):
            classes = arg
        elif arg is not None:
            raise TypeError(
                "expected a string or string iterable, got %r" % type(arg))

        self.classes = set(filter(None, classes))

    def __contains__(self, class_):
        return class_ in self.classes

    def __iter__(self):
        return iter(self.classes)

    def __len__(self):
        return len(self.classes)

    def add(self, *classes):
        for class_ in classes:
            self.classes.add(class_)

    def discard(self, *classes):
        for class_ in classes:
            self.classes.discard(class_)

    def __str__(self):
        return " ".join(sorted(self.classes))

    def __html__(self):
        return 'class="%s"' % self if self else ""

To make it work with Flask, adorn the class with the Flask.template_global decorator:

from myflaskapp import app

@app.template_global('classlist')
class ClassList(MutableSet):
    # ...

Otherwise, if you’re working with a raw Jinja Environment, simply add ClassList to its global namespace directly:

jinja_env.globals['classlist'] = ClassList

In either case, I recommend following the apparent Jinja convention of naming template symbols as lowercasewithoutspaces.

Becoming classy

Usage of this new classlist helper is relatively straightforward. Since it accepts either a space-separated string or an iterable of CSS class names, a macro can wrap anything the caller would pass it as a value for the class attribute:

{% macro button(text=none, style='default', type='button', class='') %}
  {% set classes = classlist(class) %}
  {# ... #}

The classlist is capable of producing the complete class attribute syntax (i.e. class="..."), or of omitting it entirely if it would be empty. All we need to do is evaluate it using the normal Jinja expression syntax:

  <button type="{{ type }}" {{ classes }} {{ kwargs|xmlattr }}>

Before that, though, we may want to include some additional classes that are specific to our macro. The button macro, for example, needs to add the Bootstrap-specific btn and btn-$STYLE classes to the <button> element it produces [2]:

  {% do classes.add('btn', 'btn-' + style) %}

After executing this statement, the final class attribute contains both the caller-provided classes and the two that were added explicitly.
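The pieces above can be tried out end to end with a bare Jinja Environment. This is only a sketch under a few assumptions: MiniClassList is a trimmed-down, hypothetical stand-in for the full ClassList, autoescaping is switched on explicitly (Flask enables it for .html templates) so that {{ classes }} goes through __html__, and the template is an inline string rather than a macro file.

```python
from jinja2 import Environment


class MiniClassList:
    """Trimmed-down stand-in for ClassList, just enough for this demo."""

    def __init__(self, arg=None):
        if isinstance(arg, str):
            self.classes = set(arg.split())
        else:
            self.classes = set(arg or ())

    def add(self, *classes):
        self.classes.update(classes)

    def __html__(self):
        joined = " ".join(sorted(self.classes))
        return 'class="%s"' % joined if joined else ""


# The `do` extension is needed for the {% do %} statement.
env = Environment(extensions=['jinja2.ext.do'], autoescape=True)
env.globals['classlist'] = MiniClassList

template = env.from_string(
    "{% set classes = classlist(extra) %}"
    "{% do classes.add('btn', 'btn-' + style) %}"
    "<button {{ classes }}>{{ text }}</button>"
)
html = template.render(extra='large', style='primary', text='Save')
```

The rendered markup carries the caller's class together with the two added inside the template, in sorted order.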


  1. Believe it or not, we can finally center the content vertically without much of an issue! 

  2. The {% do %} block in Jinja allows executing statements (such as function calls) without evaluating the values they return. It is not exposed by default, but adding the standard jinja2.ext.do extension to the Environment makes it available. 


Reload Jinja macro files

Posted on Thu 24 September 2015 in Code • Tagged with Python, Jinja, Flask, Werkzeug, Flask-Script

The integration between the Jinja templating engine and the Flask microframework for Python is quite seamless most of the time. It also facilitates rapid development: when we run our web application through Flask’s development server, any changes in Python code will get picked up immediately without us having to restart it by hand. Similarly, Jinja’s default template loader will detect whether any template has been modified after its last compilation and recompile it if necessary.

One instance when it doesn’t seem to work that well, though, is template files that only contain Jinja macros. They apparently aren’t subject to the same caching rules that apply to regular templates. Modifications to those files alone may not be picked up by the Jinja Environment, causing render_template calls in Flask to (indirectly) use their stale versions.

I’ll be watching you

This problem can be alleviated by making the server watch those macro files explicitly, not unlike the Python sources it already monitors. When running a Flask server through the Flask.run method, you can pass additional keyword arguments which aren’t handled by Flask itself, but by the underlying WSGI scaffolding called Werkzeug. The automatic reloader is actually part of it and is quite configurable. Most importantly, it allows passing a list of extra_files= to watch:

import os

app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 5000)),
        extra_files=[os.path.join(app.root_path, app.template_folder, 'macros.html')])

But of course it’s tedious and error-prone to keep this list up to date manually, so let’s try to do better.

…all of you

I’m going to assume you’re keeping all your Jinja macro files inside a single directory. This makes sense, as macros are reusable pieces of template code that are imported by multiple regular templates, so they shouldn’t be scattered around the codebase without some order. The folder may be named macros, util, common, or similar; what’s important is that all the macros have a designated place in the project source tree.

Under this assumption, it’s not difficult to programmatically compute their complete list [1]:

from pathlib import Path

from myapplication import app  # Flask application object


#: Directories under app's template folder that contain files
#: with only Jinja macros.
JINJA_MACRO_DIRS = ('macros',)


def get_extra_files():
    """Returns an iterable of the extra files that the Flask server
    should monitor for changes and reload the app if any has been modified.
    """
    app_root = Path(app.root_path)

    for dirname in JINJA_MACRO_DIRS:
        macros_dir = app_root / app.template_folder / dirname
        for filepath in macros_dir.rglob('*.html'):
            yield str(filepath)

If you’re using Flask blueprints, in addition to the global application templates you’d also want to monitor any blueprint-specific macro files:

        for bp in (app.blueprints or {}).values():
            if not bp.template_folder:
                continue  # blueprint without its own templates
            macros_dir = Path(bp.root_path) / bp.template_folder / dirname
            for filepath in macros_dir.rglob('*.html'):
                yield str(filepath)
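The directory walk itself can be tried without a Flask application. The sketch below uses a throwaway directory tree; find_macro_files and the file names are hypothetical, and the paths are returned relative to the template root purely to keep the result readable:

```python
import tempfile
from pathlib import Path


def find_macro_files(template_root, macro_dirs=('macros',)):
    """Yield macro files under the given directories, relative to the root."""
    root = Path(template_root)
    for dirname in macro_dirs:
        # rglob() descends into subdirectories as well.
        for filepath in sorted((root / dirname).rglob('*.html')):
            yield filepath.relative_to(root).as_posix()


with tempfile.TemporaryDirectory() as tmp:
    templates = Path(tmp)
    (templates / 'macros' / 'forms').mkdir(parents=True)
    (templates / 'macros' / 'layout.html').write_text('{# macros #}')
    (templates / 'macros' / 'forms' / 'inputs.html').write_text('{# macros #}')
    (templates / 'index.html').write_text('<html></html>')  # a regular template

    found = list(find_macro_files(templates))
```

Only the two files under macros/ are reported; the regular index.html template is left to Jinja's own reloading.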

A new start

How you’re going to use the get_extra_files function from the snippet above depends on how exactly you’re running the Flask development server. In the simplest case, when Flask.run is invoked at the top-level scope of some module, it’s pretty obvious:

app.run(..., extra_files=list(get_extra_files()))

More realistically, though, you may be using the Flask-Script extension for all management tasks, including starting the server. If so, calling Flask.run will be relegated to it, so you’ll need to override the Server command to inject additional parameters there:

import os

from flask_script import Manager, Server as _Server

from myapplication import app


class Server(_Server):
    def __init__(self, *args, **kwargs):
        if kwargs.get('use_reloader', True):
            kwargs.setdefault('extra_files', []).extend(get_extra_files())
        super(Server, self).__init__(*args, **kwargs)


manager = Manager(app, with_default_commands=False)

server = Server(port=int(os.environ.get('PORT', 5000)))
manager.add_command('server', server)

This new server command should work just like the default one provided by Flask-Script out of the box, except that all your Jinja macro files will now be monitored for changes.


  1. Like before, I’m using the pathlib module, for which there is a backport if you’re using Python 3.3 or older. 


Automatic error pages in Flask

Posted on Tue 08 September 2015 in Code • Tagged with Flask, Python, HTTP, errors

In Flask, a popular web framework for Python, it’s pretty easy to implement custom handlers for specific HTTP errors. All you have to do is to write a function and annotate it with the @errorhandler decorator:

@app.errorhandler(404)
def not_found(error):
    return render_template('errors/404.html')

Albeit simple, the above example is actually very realistic. There’s rarely anything else to do in response to a serious request error than to send back some HTML with an appropriate message. If you have more handlers like that, though, you’ll notice they get pretty repetitive: for each erroneous HTTP status code (404, 403, 400, etc.), pick a corresponding template and simply render it.

Which is, of course, why we’d want to deal with them all in a smarter way and with less boilerplate.

Just add a template

Ideally, we would like to avoid writing any Python code at all for each individual error handler. Since all we’re doing revolves around predefined templates, let’s just define the handlers automatically based on the HTML files themselves.

Assuming we store them in the same template directory (say, errors/) and name their files after numeric status codes (e.g. 404.html), getting all those codes is quite simple [1]:

from pathlib import Path, PurePath

from mywebapp import app


#: Path to the directory where HTML templates for error pages are stored.
ERRORS_DIR = PurePath('errors')


def get_supported_error_codes():
    """Returns an iterable of HTTP status codes for which we have
    custom error page templates defined.
    """
    error_templates_dir = Path(app.root_path, app.template_folder, ERRORS_DIR)

    potential_error_templates = (
        entry for entry in error_templates_dir.glob('*.html')
        if entry.is_file())
    for template in potential_error_templates:
        try:
            code = int(template.stem)  # e.g. 404.html
        except ValueError:
            pass  # could be some base.html template, or similar
        else:
            if code < 400:
                app.logger.warning(
                    "Found error template for non-error HTTP status %s", code)
                continue
            yield code
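The filename filtering can be exercised on its own against a temporary directory. This is a simplified sketch: error_codes_in is a hypothetical helper that takes the directory as an argument and silently skips non-error codes instead of logging a warning.

```python
import tempfile
from pathlib import Path


def error_codes_in(directory):
    """Yield HTTP error codes that have a matching <code>.html file."""
    for entry in sorted(Path(directory).glob('*.html')):
        try:
            code = int(entry.stem)  # '404.html' -> 404
        except ValueError:
            continue  # e.g. base.html
        if code >= 400:
            yield code


with tempfile.TemporaryDirectory() as tmp:
    for name in ('404.html', '500.html', 'base.html', '200.html', 'notes.txt'):
        (Path(tmp) / name).write_text('')

    codes = list(error_codes_in(tmp))
```

Of the five files, only 404.html and 500.html qualify as error page templates.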

Once we have them, we can try wiring up the handlers programmatically and making them do the right thing.

One function to handle them all

Although I’ve used a plural here, in reality we only need a single handler, as it’ll be perfectly capable of dealing with any HTTP error we bind it to. To help distinguish between the different status codes, Flask by default invokes our error handlers with an HTTPException argument that has the code as an attribute:

from flask import render_template
from jinja2 import TemplateNotFound


def error_handler(error):
    """Universal handler for all HTTP errors.

    :param error: :class:`~werkzeug.exceptions.HTTPException`
                  representing the HTTP error.

    :return: HTTP response to be returned
    """
    code = getattr(error, 'code', None)
    if not code:
        app.logger.warning(
            "Got spurious argument to HTTP error handler: %r", error)
        return FATAL_ERROR_RESPONSE

    app.logger.debug("HTTP error %s, rendering error page", code)
    template = ERRORS_DIR / ('%s.html' % code)
    try:
        return render_template(str(template)), code
    except TemplateNotFound:
        # shouldn't happen if the handler has been wired up properly
        app.logger.fatal("Missing template for HTTP error page: %s", template)
        return FATAL_ERROR_RESPONSE

#: Response emitted when an error occurs in the error handler itself.
FATAL_ERROR_RESPONSE = ("Internal Server Error", 500)

Catching the TemplateNotFound exception is admittedly a little paranoid here, as it can occur pretty much exclusively due to a programming error elsewhere. Feel free to treat it as a failed assertion about the application’s internal state and e.g. convert it to an AssertionError if desirable.

The setup

The final step is to actually set up the handler(s):

for code in get_supported_error_codes():
    app.errorhandler(code)(error_handler)

It may look a bit wonky, but it’s just a simple desugaring of the standard Python decorator syntax from the first example. There exists a more direct approach of putting the handler inside the app.error_handler_spec dictionary, but it is discouraged by the Flask documentation.

Where to put the above code, though? I posit it’s perfectly fine to place it in the module’s global scope, because the error handlers (and other request handlers) are traditionally defined at import time anyway.
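The equivalence between the two spellings is a general Python feature and can be seen with any decorator factory. The handles function and registry dictionary below are hypothetical, merely shaped like Flask's errorhandler:

```python
registry = {}


def handles(code):
    """Hypothetical decorator factory that records a handler for a code."""
    def decorator(func):
        registry[code] = func
        return func
    return decorator


# Decorator syntax...
@handles(404)
def not_found(error):
    return "not found"


def generic(error):
    return "error"


# ...and its desugared form, which is convenient inside a loop.
for code in (400, 403):
    handles(code)(generic)
```

Both spellings end up calling the inner decorator with the function, so all three codes are registered.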

Also notice that the default error handling we’ve defined here doesn’t preclude more specialized variants for select HTTP codes. All you have to do is ensure that your custom @app.errorhandler definition occurs after the above loop.
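A compact way to check this behavior is Flask's test client. In the sketch below the application, the /secret route, and the plain-text responses are assumptions made for brevity; the handlers return strings instead of rendering templates:

```python
from flask import Flask, abort

app = Flask(__name__)  # hypothetical stand-in for the real application


def error_handler(error):
    """Generic handler; `error.code` carries the HTTP status."""
    return "Generic page for %s" % error.code, error.code


for code in (403, 404):
    app.errorhandler(code)(error_handler)


# Registered after the loop, so it replaces the generic handler for 404.
@app.errorhandler(404)
def not_found(error):
    return "Custom 404 page", 404


@app.route('/secret')
def secret():
    abort(403)


client = app.test_client()
forbidden = client.get('/secret')
missing = client.get('/no-such-page')
```

The 403 response still comes from the generic handler, while the 404 one is produced by the specialized function.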


  1. Note that I’m using the pathlib module here, and you should, too, since it’s all around awesome. However, it has only been part of the standard library since Python 3.4, so on older versions you will need to get the backport first. 
