API Documentation

Router

class emit.router.core.Router(message_class=None, node_modules=None, node_package=None)

A router object. Holds routes and references to functions for dispatch.

__init__(message_class=None, node_modules=None, node_package=None)

Create a new router object. All parameters are optional.

Parameters:
  • message_class (emit.messages.Message or subclass) – wrapper class for messages passed to nodes
  • node_modules (list of str, or None) – a list of modules that contain nodes
  • node_package (str, or None) – if any node_modules are relative, the package they are resolved against

Exceptions: None

Returns: None

__call__(**kwargs)

Route a message to all nodes marked as entry points.

Note

Unlike other points in this API, this function does not accept a single dictionary argument. A dictionary must be expanded to keyword arguments instead.
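As a sketch of what this note means in practice, a payload held in a dictionary has to be unpacked with ** before it reaches the router. The entry_router function here is a hypothetical stand-in for a Router instance, not part of Emit; it only mimics a keyword-only call signature.

```python
def entry_router(**kwargs):
    """Hypothetical stand-in for a Router instance: keyword arguments only."""
    return dict(kwargs)


payload = {'numer': 7, 'denom': 3}

# Unpack the dictionary into keyword arguments before calling.
message = entry_router(**payload)

# Passing the dictionary positionally fails, because the callable
# accepts no positional parameters.
try:
    entry_router(payload)
except TypeError:
    positional_rejected = True
else:
    positional_rejected = False
```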

node(fields, subscribe_to=None, entry_point=False, ignore=None, **wrapper_options)

Decorate a function to make it a node.

Parameters:
  • fields (ordered iterable of str) – fields that this function returns
  • subscribe_to (str or iterable of str) – functions in the graph to subscribe to. These indicators can be regular expressions.
  • ignore (str or iterable of str) – functions in the graph to ignore (these can also be regular expressions). Useful for excluding specific functions matched by a broad regex.
  • entry_point (bool) – Set to True to mark this as an entry point - that is, this function will be called when the router is called directly.

In addition to all of the above, you can define a wrap_node function on a subclass of Router, which will need to receive node and an options dictionary. Any extra options passed to node will be passed down to the options dictionary. See emit.router.CeleryRouter.wrap_node as an example.

Returns: decorated and wrapped function, or decorator if called directly
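The interplay of subscribe_to and ignore can be pictured with a small stand-alone sketch. It is not Emit's implementation: resolve_subscriptions is a hypothetical helper, and it assumes that a pattern must match the whole node name (re.fullmatch), which may differ from the rule Emit applies.

```python
import re


def _as_list(patterns):
    """Accept None, a single str, or an iterable of str."""
    if patterns is None:
        return []
    if isinstance(patterns, str):
        return [patterns]
    return list(patterns)


def resolve_subscriptions(node_names, subscribe_to, ignore=None):
    """Return names matching a subscribe_to pattern and no ignore pattern.

    Assumption: patterns are matched against the full name.
    """
    wanted = _as_list(subscribe_to)
    unwanted = _as_list(ignore)

    def matches_any(name, patterns):
        return any(re.fullmatch(pattern, name) for pattern in patterns)

    return [name for name in node_names
            if matches_any(name, wanted) and not matches_any(name, unwanted)]


nodes = ['parse_document', 'parse_header', 'count_words']

# A broad regex subscribes to both parse_* nodes; ignore removes one of them.
selected = resolve_subscriptions(nodes, 'parse_.*', ignore=['parse_header'])
```

Here selected contains only 'parse_document', which is the kind of narrowing the ignore parameter is described as being useful for.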

Examples

Multiple fields:

@router.node(['quotient', 'remainder'])
def division_with_remainder(msg):
    return msg.numer // msg.denom, msg.numer % msg.denom

This function would end up returning a dictionary that looks something like:

{'quotient': 2, 'remainder': 1}

The next node in the graph would receive an emit.messages.Message with “quotient” and “remainder” fields.

Emitting multiple values:

@router.node(['word'])
def parse_document(msg):
    for word in msg.document.clean().split(' '):
        yield word

If the function returns a generator, Emit will gather the values together and make sure the generator exits cleanly before returning (this may change in the future via a flag). Therefore, the return value will look like this:

({'word': "I've"},
 {'word': 'got'},
 {'word': 'a'},
 {'word': 'lovely'},
 {'word': 'bunch'},
 {'word': 'of'},
 {'word': 'coconuts'})

Each message in the tuple will be passed on individually in the graph.
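The gathering behaviour described for generators can be approximated with the sketch below. gather_results is a hypothetical helper written for illustration, and the node here takes a plain string rather than a Message so that the example stays self-contained.

```python
import inspect


def gather_results(func, *args, **kwargs):
    """Call func; if it returns a generator, exhaust it into a tuple."""
    result = func(*args, **kwargs)
    if inspect.isgenerator(result):
        try:
            return tuple(result)
        finally:
            # Closing an exhausted generator is a no-op, but this makes
            # sure it exits cleanly if iteration stopped early.
            result.close()
    return (result,)


def parse_document(document):
    for word in document.split(' '):
        yield word


words = gather_results(parse_document, "I've got a lovely bunch")

# One single-field message per yielded value.
messages = tuple({'word': word} for word in words)
```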

add_entry_point(destination)

Add an entry point.

Parameters: destination (str) – node to route to initially

disable_routing()

Disable routing (usually for testing purposes).

dispatch(origin, destination, message)

Dispatch a message to a named function.

Parameters:
  • origin (str) – name of the origin node
  • destination (str) – destination to dispatch to
  • message (emit.messages.Message or subclass) – message to dispatch

enable_routing()

Enable routing (after calling disable_routing).

get_message_from_call(*args, **kwargs)

Get message object from a call.

Raises: TypeError (if the format is not what we expect)

This is where arguments to nodes are turned into Messages. Arguments are parsed in the following order:

  • A single positional argument (a dict)
  • No positional arguments and a number of keyword arguments
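The two accepted call shapes can be sketched as follows. This is an illustration only: message_from_call is a hypothetical function that returns a plain dict, whereas the real method is described as producing a Message, and the exact error text is an assumption.

```python
def message_from_call(*args, **kwargs):
    """Build a dict from one positional dict or from keyword arguments."""
    # Shape 1: a single positional argument that is a dict.
    if len(args) == 1 and not kwargs and isinstance(args[0], dict):
        return dict(args[0])
    # Shape 2: no positional arguments, any number of keyword arguments.
    if not args:
        return dict(kwargs)
    # Anything else is not a format this sketch understands.
    raise TypeError('expected a single dict or keyword arguments only')


from_dict = message_from_call({'numer': 7, 'denom': 3})
from_kwargs = message_from_call(numer=7, denom=3)
```

Both calls produce the same mapping; mixing positional and keyword arguments, or passing several positional arguments, raises TypeError in this sketch.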

get_name(func)

Get the name to reference a function by.

Parameters: func (callable) – function to get the name of

regenerate_routes()

Regenerate the routes after a new route is added.

register(name, func, fields, subscribe_to, entry_point, ignore)

Register a named function in the graph.

Parameters:
  • name (str) – name to register
  • func (callable) – function to remember and call

fields, subscribe_to, entry_point and ignore are the same as in Router.node().

register_ignore(origins, destination)

Add routes to the ignore dictionary.

Parameters:
  • origins (str or iterable of str) – a number of origins to register
  • destination (str) – where the origins should point to

The ignore dictionary takes the following form:

{'node_a': set(['node_b', 'node_c']),
 'node_b': set(['node_d'])}

register_route(origins, destination)

Add routes to the routing dictionary.

Parameters:
  • origins (str or iterable of str or None) – a number of origins to register
  • destination (str) – where the origins should point to

The routing dictionary takes the following form:

{'node_a': set(['node_b', 'node_c']),
 'node_b': set(['node_d'])}
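A dictionary of that shape can be built up with a few lines of Python. The sketch below is a simplified stand-in, not Emit's code: it treats origins as literal names (no regular expressions) and assumes that origins=None registers nothing.

```python
from collections import defaultdict

# Maps each origin name to the set of destinations subscribed to it.
routes = defaultdict(set)


def register_route(origins, destination):
    """Record that destination receives messages from each origin."""
    if origins is None:
        return  # assumption: nothing to register
    if isinstance(origins, str):
        origins = [origins]
    for origin in origins:
        routes[origin].add(destination)


register_route('node_a', 'node_b')
register_route(['node_a'], 'node_c')
register_route('node_b', 'node_d')
register_route(None, 'node_e')
```

Using sets means that registering the same pair twice leaves the dictionary unchanged.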

resolve_node_modules()

Import the modules specified in __init__.

route(origin, message)

Using the routing dictionary, dispatch a message to all subscribers

Parameters:
  • origin (str) – name of the origin node
  • message (emit.messages.Message or subclass) – message to dispatch
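Fanning a message out to subscribers amounts to a lookup in the routing dictionary followed by one call per destination. The sketch below assumes a plain dict of handlers and plain dict messages; route_message and handlers are hypothetical names, and destinations are visited in sorted order only to keep the result deterministic.

```python
def route_message(routes, handlers, origin, message):
    """Call every handler subscribed to origin; return results by name."""
    results = {}
    for destination in sorted(routes.get(origin, ())):
        results[destination] = handlers[destination](message)
    return results


routes = {'node_a': {'node_b', 'node_c'}}
handlers = {
    'node_b': lambda msg: msg['x'] + 1,
    'node_c': lambda msg: msg['x'] * 2,
}

delivered = route_message(routes, handlers, 'node_a', {'x': 10})

# An origin without subscribers results in no calls at all.
nothing = route_message(routes, handlers, 'node_z', {'x': 10})
```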

wrap_as_node(func)

Wrap a function as a node.

wrap_result(name, result)

Wrap a result from a function with its stated fields.

Parameters:
  • name (str) – name of the function whose fields are looked up
  • result (anything) – return value from function. Will be converted to tuple.

Raises: ValueError if name has no associated fields

Returns: dict
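Pairing a return value with its declared fields is essentially a zip. The sketch below is a hypothetical stand-alone version: it takes the field registry as an explicit argument (fields_by_name) and assumes that a non-tuple result is wrapped in a one-element tuple, which may not be exactly how Emit performs the conversion.

```python
def wrap_result(fields_by_name, name, result):
    """Return a dict pairing the fields registered for name with result."""
    if name not in fields_by_name:
        raise ValueError('no fields registered for %r' % (name,))
    if not isinstance(result, tuple):
        result = (result,)  # assumption: scalars become a 1-tuple
    return dict(zip(fields_by_name[name], result))


fields_by_name = {
    'division_with_remainder': ('quotient', 'remainder'),
    'parse_document': ('word',),
}

wrapped = wrap_result(fields_by_name, 'division_with_remainder', (2, 1))
single = wrap_result(fields_by_name, 'parse_document', 'coconuts')
```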

Message

class emit.messages.Message(*args, **kwargs)

Convenient wrapper around a dictionary to provide attribute access

as_dict()

Representation of this message as a dictionary.

Returns: dict

as_json()

Representation of this message as a JSON string.

Returns: str
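The idea of a dictionary wrapper with attribute access can be sketched in a few lines. AttrMessage is a hypothetical class for illustration and makes no claim about how emit.messages.Message is implemented; sorting keys in as_json is a choice made here to keep the output stable.

```python
import json


class AttrMessage(object):
    """Hypothetical dict wrapper exposing keys as attributes."""

    def __init__(self, *args, **kwargs):
        self._data = dict(*args, **kwargs)

    def __getattr__(self, name):
        # Only called when normal attribute lookup fails.
        data = self.__dict__.get('_data', {})
        try:
            return data[name]
        except KeyError:
            raise AttributeError(name)

    def as_dict(self):
        return dict(self._data)

    def as_json(self):
        return json.dumps(self._data, sort_keys=True)


msg = AttrMessage({'quotient': 2}, remainder=1)
quotient = msg.quotient
as_json = msg.as_json()
```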

class emit.messages.NoResult

Single value to return from a node to stop further processing.
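A sentinel of this kind is typically compared by identity. The sketch below uses a hypothetical StopMarker class in place of NoResult and assumes that the node returns the class itself rather than an instance; how Emit checks for NoResult internally is not stated here.

```python
class StopMarker(object):
    """Hypothetical sentinel: returned by a node to halt processing."""


def only_even(number):
    # Forward even numbers; signal "nothing to pass on" for odd ones.
    return number if number % 2 == 0 else StopMarker


def run_pipeline(numbers):
    forwarded = []
    for number in numbers:
        result = only_even(number)
        if result is StopMarker:
            continue  # stop further processing for this message
        forwarded.append(result)
    return forwarded


forwarded = run_pipeline([1, 2, 3, 4])
```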

Multilang

class emit.multilang.ShellNode

Callable object to wrap communication to a node in another language.

To use this, subclass ShellNode, providing “command”. Decorate it however you like.

Messages will be passed in on lines in msgpack format. This class expects similar output: msgpack messages separated by a newline.

__call__(msg)

Call the command specified, processing output.

deserialize(msg)

Deserialize output to a Python object.

get_command()

Get the command as a list.

get_cwd()

Get the directory to change to before running the command.
