Welcome to Emit’s documentation!

Emit is a library that hooks into distributed systems like Celery or RQ (or just local memory) to provide subscriptions and notifications for your functions. It is designed to make processing streams of information a whole lot easier.

You may want to start at Getting Started. Other highlights include integration with Celery and RQ and Emit’s multi-language capabilities.

Contents:

Getting Started

Installing

You can install emit with pip:

pip install emit

Quickstart

For a sampler, we’re going to make a simple command-line application that will take and count all the words in a document, giving you the top 5.

Put the following into graph.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from __future__ import print_function
from collections import Counter
from emit import Router
import sys

router = Router()


def prefix(name):
    return '%s.%s' % (__name__, name)


@router.node(('word',), entry_point=True)
def words(msg):
    print('got document')
    for word in msg.document.strip().split(' '):
        yield word


WORDS = Counter()


@router.node(('word', 'count'), prefix('words'))
def count_word(msg):
    print('got word (%s)' % msg.word)

    global WORDS
    WORDS.update([msg.word])

    return msg.word, WORDS[msg.word]

if __name__ == '__main__':
    router(document=sys.stdin.read())

    print()
    print('Top 5 words:')
    for word, count in WORDS.most_common(5):
        print('    %s: %s' % (word, count))

(incidentally, this file is available in the project directory as examples/simple/graph.py.)

Now on the command line: echo "the rain in spain falls mainly on the plain" | python graph.py. You should get some output that looks similar to the following

got document
got word (the)
got word (rain)
got word (in)
got word (spain)
got word (falls)
got word (mainly)
got word (on)
got word (the)
got word (plain)

Top 5 words:
    the: 2
    on: 1
    plain: 1
    mainly: 1
    rain: 1

Breaking it Down

First, we need to construct a router:

router = Router()

Since we’re keeping everything in-memory, we don’t need to specify anything to get this to work properly. It should “Just Work(TM)”.

Next, we define a function to split apart a document on spaces to get words:

@router.node(('word',), entry_point=True)
def words(msg):
    print('got document')
    for word in msg.document.strip().split(' '):
        yield word

Router provides a decorator (node). The first argument is the fields that the decorated function returns. These are wrapped in a message and passed around between functions.

We don’t specify any subscriptions on this function, since it really doesn’t need any. In fact, it’s an entry point, so we specify that instead. This specifically means that if you call the router directly it will delegate to this function. There can be multiple functions with entry_point set to true on a given Router.

If the decorated function is a generator, each yielded value is treated as a separate input into the next nodes in the graph.

Splitting the document into parts is only as useful as what we can do with the words, so let’s count them now:

WORDS = Counter()
@router.node(('word', 'count'), prefix('words'))
def count_word(msg):
    print('got word (%s)' % msg.word)

    global WORDS
    WORDS.update([msg.word])

    return msg.word, WORDS[msg.word]

There’s a little less going on in this function. We just update a Counter builtin, and then return the word and the count to be passed down the graph. In real life, you’d probably persist this value in a database to allow multiple workers to process different parts of the stream.

In non-entry nodes, the second argument of router.node is a string or list of functions to subscribe to. These need to be fully qualified when you’re using Celery, but for now they’re fine.

Now that we’ve defined both functions, it’s time to send some data into our graph:

    router(document=sys.stdin.read())

Calling this graph is easy, since we defined a function as an entry point. You can call any of the functions (or the router itself) by using keyword arguments or passing a dictionary.

In the end, data flows through the graph like this:

_images/graph2.png

Next Steps

Now that you’ve got this under your belt, check out how to integrate your graph with RQ or Celery.

Distributing Work

Emit follows the philosophy that routing execution of tasks over the network is best handled by an external library. Currently, there are two integrations: RQ and Celery.

In addition, you may want to write your own for an as-of-yet unknown backend.

Contents

Using RQ to Distribute Processing

Note

RQ does not currently work on Python 3. Emit should work with it (as it works with Python 2) when Python 3 support is ready.

RQ is a module that makes distributed processing easy. It’s similar to Celery, but simpler and only for Python and Redis. We’ll be using the same example as we did in the Celery example.

_images/graph1.png
Installing

If you have a very recent version of pip, Emit can be installed pre-bundled with RQ by installing with the following extra:

pip install emit[rq-routing]

Otherwise, you’ll need to install these dependencies:

rq>=0.3.4
redis>=2.7.2
Setting up RQ

Create an app.py file for your RQ Router initializaition code to live in:

1
2
3
4
5
6
7
8
9
'simple rq app'
from redis import Redis
from emit.router.rq import RQRouter

import logging

router = RQRouter(redis_connection=Redis(), node_modules=['tasks'])

logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.DEBUG)

The RQRouter class only needs to know what Redis connection you want to use. The rest of the options are specified at the node level.

Next we’ll define (in tasks.py) a function to take a document and emit each word:

@router.node(('word',), entry_point=True)
def emit_words(msg):
    for word in msg.document.strip().split(' '):
        yield word

Without any arguments, RQ tasks will go to the ‘default’ queue. If you don’t want to mess with queues, this will just work.

If you want to set some attributes, however, you can:

@router.node(('word', 'count'), subscribe_to='tasks.emit_words', queue='words')
def tally_word(msg):
    redis = Redis()
    return msg.word, redis.zincrby('celery_emit_example', msg.word, 1)

Enqueued functions for this node will be put on the “words” node. You’ll need to specify which nodes to listen to when running rqworker.

The available parameters:

parameter default effect
queue 'default' specify a queue to route to.
connection supplied connection a different connection - be careful with this, as you’ll need to specify the connection string on the worker
timeout None timeout (in seconds) of a task
result_ttl 500 TTL (in seconds) of results
Running the Graph

We just need to start the RQ worker:

rqworker default words

And enter the following on the command line to start something fun processing (if you’d like, the relevant code is in examples/rq/kickoff.py in the project directory, start it and get a prompt with ipython -i kickoff.py):

from app import router
import random
words = 'the rain in spain falls mainly on the plain'.split(' ')
router(document=' '.join(random.choice(words) for i in range(50)))

And you should see the rqworker window quickly scrolling by with updated totals. Run the command a couple more times, if you like, and you’ll see the totals keep going up.

Performance

Because of the way RQ forks tasks, the graph is rebuilt for every task. To speed up this process, do it once on worker initialization. You can use this snippet (adapted from the RQ worker documentation)

#!/usr/bin/env python
import sys
import rq

# Preload libraries
from app import router
router.resolve_node_modules()

# Provide queue names to listen to as arguments to this script,
# similar to rqworker
with rq.Connection():
    qs = map(rq.Queue, sys.argv[1:]) or [rq.Queue()]

    w = rq.Worker(qs)
    w.work()

Using Celery to Distribute Processing

Warning

Celery doesn’t work quite right under Python 3.3. It works fine under 2.6-3.2 and pypy. Follow Bug 1107 on Celery for progress.

Emit makes it simple to use celery to distribute realtime processing across many worker nodes. To demonstrate this, we’ll be scaling our quickstart example

We’ll be making, in essence, this graph:

_images/graph1.png
Installing

If you have a very recent version of pip, Emit can be installed pre-bundled with celery by installing with the following extra:

pip install emit[celery-routing]

Otherwise, you’ll need celery>=3.0.13, as well as the libraries for whatever broker you’ll be using.

Setting up Celery

Create an app.py file for your celery initializaition code to live in:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
'simple celery app'
from celery import Celery
from emit.router.celery import CeleryRouter

import logging

app = Celery(
    'celery_emit_example',
    broker='redis://'
)
app.conf.update(
    CELERY_IMPORTS=('tasks',)
)

router = CeleryRouter(celery_task=app.task, node_modules=['tasks'])

logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.DEBUG)

Take note that Router is initialized using the default celery task in this case. This is probably the best way to do it, since per-task settings should belong in the task (possible in Emit’s decorator), and app-level configuration should be on the app object (as on line 10).

Next we’ll define (in tasks.py) a function to take a document and emit each word:

@router.node(('word',), entry_point=True)
def emit_words(msg):
    for word in msg.document.strip().split(' '):
        yield word

We don’t have to give any special syntax to get these tasks to work with celery: since we specified it in the router, they just do.

However, if you want to give special celery attributes to a particular function, you can do that too:

@router.node(('word', 'count'), subscribe_to='tasks.emit_words', celery_task=app.task(rate_limit='5/s'))
def tally_word(msg):
    redis = Redis()
    return msg.word, redis.zincrby('celery_emit_example', msg.word, 1)

Obviously rate limiting to 5 per second in this case is a bit contrived, but you get the general idea: it’s easy to configure tasks within the decorator by passing in the celery decorator.

The available parameters:

parameter default effect
celery_task None override the supplied celery task with a node-specific tas
Running the Graph

We’ll need to boot up the celery daemon:

celery worker -A app.app -l INFO -E

And enter the following on the command line to start something fun processing (if you’d like, the relevant code is in examples/celery/kickoff.py in the project directory, start it and get a prompt with ipython -i kickoff.py):

from app import router
import random
words = 'the rain in spain falls mainly on the plain'.split(' ')
router(document=' '.join(random.choice(words) for i in range(50)))

You should get something like the following:

({'word': 'the'},
 {'word': 'spain'},
 {'word': 'in'},
 # ...
 {'word': 'falls'},
 {'word': 'falls'},
 {'word': 'mainly'})

And you should see the celery window quickly scrolling by with updated totals. Run the command a couple more times, if you like, and you’ll see the totals keep going up.

Extending Router

To extend emit.Router (for example, to add a new dispatch backend) it’s most helpful to override the following methods:

__init__(self, your_args, *args, **kwargs)
This is the __init__ pattern used by the current dispatch backends.
dispatch(origin, destination, message)
Do dispatching. Typically passes along origin (as _origin) with the message.
wrap_node(node, options)
Given a wrapped function (node), do additional processing on the function or node. Unhandled arguments to Router.node are passed as a dictionary as options.
Example

See the following example (the current RQRouter implementation):

class RQRouter(Router):
    'Router specifically for RQ routing'
    def __init__(self, redis_connection, *args, **kwargs):
        '''\
        Specific routing when using RQ

        :param redis_connection: a redis connection to send to all the tasks
                                 (can be overridden in :py:meth:`Router.node`.)
        :type redis_connection: :py:class:`redis.Redis`
        '''
        super(RQRouter, self).__init__(*args, **kwargs)
        self.redis_connection = redis_connection
        self.logger.debug('Initialized RQ Router')

    def dispatch(self, origin, destination, message):
        'dispatch through RQ'
        func = self.functions[destination]
        self.logger.debug('enqueueing %r', func)
        return func.delay(_origin=origin, **message)

    def wrap_node(self, node, options):
        '''
        we have the option to construct nodes here, so we can use different
        queues for nodes without having to have different queue objects.
        '''
        job_kwargs = {
            'queue': options.get('queue', 'default'),
            'connection': options.get('connection', self.redis_connection),
            'timeout': options.get('timeout', None),
            'result_ttl': options.get('result_ttl', 500),
        }

        return job(**job_kwargs)(node)

Using Emit in Other Languages

You can use Emit in other languages through the multilang API.

Defining Tasks

We’re going to define a node that takes a number and emits each integer in that range. Let’s do it with Ruby! (why not?)

require "json"
message = JSON.parse(STDIN.read)

message["count"].times do |i|
  puts i.to_json
end

(the equivalent in Python is in examples/multilang/test.py)

The messages passed in and out are expected to be in JSON format. Output from the functions should be json strings separated by newlines.

Creating a Node

We’ll be subclassing emit.multilang.ShellNode to tell emit how to execute our task:

@router.node(('n',))
class RubyShellNode(ShellNode):
    command = 'bundle exec ruby test.rb'

After that, you can call your node and subscribe as normal.

Terminology

Graph
A directed graph. Not actually implemented as an object, but referenced throughout the project as the final construct of a router, nodes, and subscriptions. You can generate an image of the graph using the emit-digraph utility.
Router
An object (implemented in emit.router.Router or subclasses) that keeps references to functions and their names and handles dispatch. It generally knows where everything is and where it’s going.
Node
A function or callable class that receives messages, processes them in its own way, and passes them on down the graph. In this sense, this output could be called a “stream”.
Subscription/Route
An edge in the graph. It is directed, and so only flows one way. Circular subscriptions can be created (by two nodes subscribing to each other’s streams), but they have a high probability of creating an infinite loop and so should be used carefully. Subscriptions also can exist as a special case for an entry point to the graph.

Regex Routing

You can subscribe to arbitrary node’s output by providing a regular expression. In this example, we’ll use Redis’ pubsub capabilities to notify an external receiver of all tasks passing through the graph.

The product of this example is in examples/regex/graph.py.

First, we’ll create a function that yields multiple return values. In this case, we’re going to naively parse a HTTP querystring.

@router.node(('key', 'value'), entry_point=True)
def parse_querystring(msg):
    'parse a querystring into keys and values'
    for part in msg.querystring.strip().lstrip('?').split('&'):
        key, value = part.split('=')
        yield key, value

Now we’re going to count keys and values:

@router.node(('key', 'value', 'count'), prefix('parse_querystring'))
def count_keyval(msg):
    count = redis.zincrby('querystring_count.%s' % msg.key, msg.value, 1)
    return msg.key, msg.value, count

Next, we’ll make a function that publishes to Redis on every message:

@router.node(tuple(), '.+')
def notify_on_emit(msg):
    redis.publish('notify_on_emit.%s' % msg._origin, msg.as_json())
    return NoResult

Now, when you call router(querystring='?a=1&b=2&c=3'), notify_on_emit will publish seven messages: three with origin “graph.parse_querystring”, three with origin “graph.count_keyval”, and one with origin “__entry_point”. The graph ends up looking like this:

_images/graph3.png

You can also specify ignores in Router.node, which can cut a little fat out of an otherwise greedy regex.

Command Line Utilities

emit_digraph - Generate Graph Images

Emit ships with a command-line program to inspect a graph: emit_digraph. Use it to generate graphs like this:

_images/graph.png

emit_digraph will output the code graphviz needs to properly generate the graph. (You’ll need graphviz installed on your machine for this to render properly.) To use it, pass it the path of your router. (for example, emit_digraph app.router in the Celery example.) The output should look something like this:

digraph router {
"tasks.clean_words" -> "tasks.tally_words";
"tasks.clean_text" -> "tasks.clean_words";
"tasks.tweet_text" -> "tasks.count_messages";
"__entry_point" -> "tasks.tweet_text";
}

to make graphviz generate a PNG of the graph, pipe it into the following command:

emit_digraph app.router | dot -T png -o graph.png

Logging

Emit is set up to handle logging using Python’s standard logger. It currently uses the following levels:

  • DEBUG: task registration and calls - very verbose
  • INFO: route registration, receipts

So far there’s been no need for anything above INFO, but that may change in the future.

Setting Up Logging

In some file (I recommend the file where the router is initialized, but your project may vary) insert the following lines:

import logging
logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.DEBUG) # or INFO etc.

Setting Up Logging in Django

In your logging config, add a logger for “emit”. Like so:

LOGGING = {
    # snip formatters, filters, handlers, etc
    'loggers': {
        # other loggers here
        'emit': {
            'handlers': ['console'],
            'level': 'INFO',
        }
    }
}

Testing

Testing your functions is pretty easy. Just call Router.disable_routing. Something like this:

from unittest import TestCase
from yourapp.app import router
from yourapp.tasks import do_task

class DoTaskTests(TestCase):
    def setUp(self):
        router.disable_routing()

    def test_blah(self):
        assert True

To re-enable routing, you’d call Router.enable_routing.

API Documentation

Router

class emit.router.core.Router(message_class=None, node_modules=None, node_package=None)

A router object. Holds routes and references to functions for dispatch

__init__(message_class=None, node_modules=None, node_package=None)

Create a new router object. All parameters are optional.

Parameters:
  • message_class (emit.message.Message or subclass) – wrapper class for messages passed to nodes
  • node_modules (a list of str, or None.) – a list of modules that contain nodes
  • node_package (str, or None.) – if any node_modules are relative, the path to base off of.
Exceptions:

None

Returns:

None

__call__(**kwargs)

Route a message to all nodes marked as entry points.

Note

This function does not optionally accept a single argument (dictionary) as other points in this API do - it must be expanded to keyword arguments in this case.

node(fields, subscribe_to=None, entry_point=False, ignore=None, emit_immediately=False, **wrapper_options)

Decorate a function to make it a node.

Note

decorating as a node changes the function signature. Nodes should accept a single argument, which will be a emit.message.Message. Nodes can be called directly by providing a dictionary argument or a set of keyword arguments. Other uses will raise a TypeError.

Parameters:
  • fields (ordered iterable of str) – fields that this function returns
  • subscribe_to (str or iterable of str) – functions in the graph to subscribe to. These indicators can be regular expressions.
  • ignore (str or iterable of str) – functions in the graph to ignore (also uses regular expressions.) Useful for ignoring specific functions in a broad regex.
  • entry_point (bool) – Set to True to mark this as an entry point - that is, this function will be called when the router is called directly.
  • emit_immediately (bool) – route generator’s messages immediately, instead of waiting for the entire list of values

In addition to all of the above, you can define a wrap_node function on a subclass of Router, which will need to receive node and an options dictionary. Any extra options passed to node will be passed down to the options dictionary. See emit.router.CeleryRouter.wrap_node as an example.

Returns:decorated and wrapped function, or decorator if called directly

Examples

Multiple fields:

@router.node(['quotient', 'remainder'])
def division_with_remainder(msg):
    return msg.numer / msg.denom, msg.numer % msg.denom

This function would end up returing a dictionary that looked something like:

{'quotient': 2, 'remainder': 1}

The next node in the graph would recieve a emit.message.Message with “quotient” and “remainder” fields.

Emitting multiple values:

@router.node(['word'])
def parse_document(msg):
    for word in msg.document.clean().split(' '):
        yield word

If the function returns a generator, Emit will gather the values together and make sure the generator exits cleanly before returning (but this may change in the future via a flag.) Therefore, the return value will look like this:

({'word': "I've"},
 {'word': 'got'},
 {'word': 'a'},
 {'word': 'lovely'},
 {'word': 'bunch'},
 {'word': 'of'},
 {'word': 'coconuts'})

Each message in the tuple will be passed on individually in the graph.

add_entry_point(destination)

Add an entry point

Parameters:destination (str) – node to route to initially
disable_routing()

disable routing (usually for testing purposes)

dispatch(origin, destination, message)

dispatch a message to a named function

Parameters:
  • destination (str) – destination to dispatch to
  • message (emit.message.Message or subclass) – message to dispatch
enable_routing()

enable routing (after calling disable_routing)

get_message_from_call(*args, **kwargs)

Get message object from a call.

Raises:TypeError (if the format is not what we expect)

This is where arguments to nodes are turned into Messages. Arguments are parsed in the following order:

  • A single positional argument (a dict)
  • No positional arguments and a number of keyword arguments
get_name(func)

Get the name to reference a function by

Parameters:func (callable) – function to get the name of
regenerate_routes()

regenerate the routes after a new route is added

register(name, func, fields, subscribe_to, entry_point, ignore)

Register a named function in the graph

Parameters:
  • name (str) – name to register
  • func (callable) – function to remember and call

fields, subscribe_to and entry_point are the same as in Router.node().

register_ignore(origins, destination)

Add routes to the ignore dictionary

Parameters:
  • origins (str or iterable of str) – a number of origins to register
  • destination (str) – where the origins should point to

Ignore dictionary takes the following form:

{'node_a': set(['node_b', 'node_c']),
 'node_b': set(['node_d'])}
register_route(origins, destination)

Add routes to the routing dictionary

Parameters:
  • origins (str or iterable of str or None) – a number of origins to register
  • destination (str) – where the origins should point to

Routing dictionary takes the following form:

{'node_a': set(['node_b', 'node_c']),
 'node_b': set(['node_d'])}
resolve_node_modules()

import the modules specified in init

route(origin, message)

Using the routing dictionary, dispatch a message to all subscribers

Parameters:
  • origin (str) – name of the origin node
  • message (emit.message.Message or subclass) – message to dispatch
wrap_as_node(func, emit_immediately=False)

wrap a function as a node

wrap_result(name, result)

Wrap a result from a function with it’s stated fields

Parameters:
  • name (str) – fields to look up
  • result (anything) – return value from function. Will be converted to tuple.
Raises:

ValueError if name has no associated fields

Returns:

dict

Message

class emit.messages.Message(*args, **kwargs)

Convenient wrapper around a dictionary to provide attribute access

as_dict()

representation of this message as a dictionary

Returns:dict
as_json()

representation of this message as a json object

Returns:str
class emit.messages.NoResult

single value to return from a node to stop further processing

Multilang

class emit.multilang.ShellNode

callable object to wrap communication to a node in another language

to use this, subclass ShellNode, providing “command”. Decorate it however you feel like.

Messages will be passed in on lines in msgpack format. This class expects similar output: msgpack messages separated by a newline.

__call__(msg)

call the command specified, processing output

deserialize(msg)

deserialize output to a Python object

get_command()

get the command as a list

get_cwd()

get directory to change to before running the command

Changelog

0.5.0

  • Nodes which return single values will now be wrapped in a tuple, for consistency with generator nodes. Routing will proceed as before.
  • You can now pass “emit_immediately” into a node definition to immediately route your message, without waiting for the rest of the generator to finish.

0.4.0

  • Optional install bundles for installing with RQ or Celery
  • Move modules around to make API more consistent. Notably, emit.router.Router is now in emit.router.core with RQ and Celery backends in emit.router.rq and emit.router.celery, respectively.
  • Huge cleanup of codebase in general, especially test suite and setup.py.

0.3.0

0.2.0

  • New argument for node: ignores. Pass it some regex to ignore items in otherwise broad subscriptions.
  • Add support for Python 2.6

0.1.0

  • Initial Release to PyPI

Supported Pythons:

Indices and tables

License

Copyright (c) 2012-2013 Brian Hicks

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
of the Software, and to permit persons to whom the Software is furnished to do
so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.