Using Celery to Distribute Processing

Warning

Celery doesn’t work quite right under Python 3.3. It works fine under 2.6–3.2 and PyPy. Follow Bug 1107 on Celery for progress.

Emit makes it simple to use Celery to distribute realtime processing across many worker nodes. To demonstrate this, we’ll be scaling our quickstart example.

We’ll be making, in essence, this graph:

[image: graph1.png — the processing graph]

Installing

If you have a very recent version of pip, Emit can be installed pre-bundled with celery by installing with the following extra:

pip install emit[celery-routing]

Otherwise, you’ll need celery>=3.0.13, as well as the client libraries for whatever broker you’ll be using.
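Without the extra, a manual install might look like the following (assuming Redis as the broker, as in the example below; the redis client package is our assumption here):

```shell
pip install "celery>=3.0.13" redis
```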

Setting up Celery

Create an app.py file for your celery initialization code to live in:

'simple celery app'
from celery import Celery
from emit.router.celery import CeleryRouter

import logging

app = Celery(
    'celery_emit_example',
    broker='redis://'
)
app.conf.update(
    CELERY_IMPORTS=('tasks',)
)

router = CeleryRouter(celery_task=app.task, node_modules=['tasks'])

logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.DEBUG)

Note that CeleryRouter is initialized with the default celery task in this case. This is probably the best way to do it, since per-task settings should belong to the task (possible in Emit’s decorator), and app-level configuration should be on the app object (as in the app.conf.update call above).

Next we’ll define (in tasks.py) a function to take a document and emit each word:

@router.node(('word',), entry_point=True)
def emit_words(msg):
    for word in msg.document.strip().split(' '):
        yield word

We don’t have to give any special syntax to get these tasks to work with celery: since we specified it in the router, they just do.
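Conceptually, each value yielded by a node becomes a message keyed by the field names declared in router.node. A rough sketch of that wrapping (not Emit’s actual internals):

```python
def emit_words(document):
    # split the document on spaces and yield one word at a time,
    # just like the tasks.py node above
    for word in document.strip().split(' '):
        yield word

# each yielded value is wrapped into a message keyed by the
# declared field name ('word',)
messages = [{'word': w} for w in emit_words('the rain in spain')]
print(messages)
# → [{'word': 'the'}, {'word': 'rain'}, {'word': 'in'}, {'word': 'spain'}]
```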

However, if you want to give special celery attributes to a particular function, you can do that too:

from redis import Redis

@router.node(('word', 'count'), subscribe_to='tasks.emit_words', celery_task=app.task(rate_limit='5/s'))
def tally_word(msg):
    redis = Redis()
    return msg.word, redis.zincrby('celery_emit_example', msg.word, 1)

Rate limiting to 5 per second is a bit contrived in this case, but you get the general idea: it’s easy to configure tasks within the decorator by passing in the celery decorator.
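If you don’t have Redis handy, the tally step can be sketched with an in-memory Counter standing in for the sorted set (a hypothetical simplification, not how the node above persists counts):

```python
from collections import Counter

tally = Counter()

def tally_word(word):
    # increment the word's count and return the new total,
    # mirroring what zincrby does against the Redis sorted set
    tally[word] += 1
    return word, tally[word]

print(tally_word('spain'))  # → ('spain', 1)
print(tally_word('spain'))  # → ('spain', 2)
```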

The available parameters:

parameter     default   effect
-----------   -------   ------------------------------------------------------------
celery_task   None      override the supplied celery task with a node-specific task

Running the Graph

We’ll need to boot up a Celery worker:

celery worker -A app.app -l INFO -E

Then enter the following on the command line to start something fun processing. (If you’d like, the relevant code is in examples/celery/kickoff.py in the project directory; run ipython -i kickoff.py to start it and get a prompt.)

from app import router
import random
words = 'the rain in spain falls mainly on the plain'.split(' ')
router(document=' '.join(random.choice(words) for i in range(50)))

You should get something like the following:

({'word': 'the'},
 {'word': 'spain'},
 {'word': 'in'},
 # ...
 {'word': 'falls'},
 {'word': 'falls'},
 {'word': 'mainly'})

And you should see the celery window quickly scrolling by with updated totals. Run the command a couple more times, if you like, and you’ll see the totals keep going up.
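The totals converge toward each word’s frequency in the source sentence. A quick local simulation of one round, with no Celery or Redis involved (the seed is fixed here only to make the run repeatable):

```python
import random
from collections import Counter

random.seed(0)  # deterministic for the example
words = 'the rain in spain falls mainly on the plain'.split(' ')

# build a 50-word document the same way kickoff.py does
document = ' '.join(random.choice(words) for i in range(50))

# tally the words, as the Celery workers would collectively
counts = Counter(document.split(' '))
print(counts.most_common(3))
```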