Source code for emit.router.core

'router for emit'
from functools import wraps
import importlib
import logging
import re
from types import GeneratorType

from emit.messages import Message, NoResult


[docs]class Router(object): 'A router object. Holds routes and references to functions for dispatch'
[docs] def __init__(self, message_class=None, node_modules=None, node_package=None): '''\ Create a new router object. All parameters are optional. :param message_class: wrapper class for messages passed to nodes :type message_class: :py:class:`emit.message.Message` or subclass :param node_modules: a list of modules that contain nodes :type node_modules: a list of :py:class:`str`, or ``None``. :param node_package: if any node_modules are relative, the path to base off of. :type node_package: :py:class:`str`, or ``None``. :exceptions: None :returns: None ''' self.routes = {} self.names = set() self.regexes = {} self.ignore_regexes = {} self.fields = {} self.functions = {} self.message_class = message_class or Message # manage imported packages, lazily importing before the first message # is routed. self.resolved_node_modules = [] self.node_modules = node_modules or [] self.node_package = node_package self.logger = logging.getLogger(__name__ + '.Router') self.logger.debug('Initialized Router') self.routing_enabled = True
[docs] def __call__(self, **kwargs): '''\ Route a message to all nodes marked as entry points. .. note:: This function does not optionally accept a single argument (dictionary) as other points in this API do - it must be expanded to keyword arguments in this case. ''' self.logger.info('Calling entry point with %r', kwargs) self.route('__entry_point', kwargs)
[docs] def wrap_as_node(self, func): 'wrap a function as a node' name = self.get_name(func) @wraps(func) def wrapped(*args, **kwargs): 'wrapped version of func' message = self.get_message_from_call(*args, **kwargs) self.logger.info('calling "%s" with %r', name, message) result = func(message) # functions can return multiple values ("emit" multiple times) # by yielding instead of returning. Handle this case by making # a list of the results and processing them all after the # generator successfully exits. If we were to process them as # they came out of the generator, we might get a partially # processed input sent down the graph. This may be possible in # the future via a flag. if isinstance(result, GeneratorType): results = [ self.wrap_result(name, item) for item in result if item is not NoResult ] self.logger.debug( '%s returned generator yielding %d items', func, len(results) ) [self.route(name, item) for item in results] return tuple(results) # the case of a direct return is simpler. wrap, route, and # return the value. else: if result is NoResult: return result result = self.wrap_result(name, result) self.logger.debug( '%s returned single value %s', func, result ) self.route(name, result) return result return wrapped
[docs] def node(self, fields, subscribe_to=None, entry_point=False, ignore=None, **wrapper_options): '''\ Decorate a function to make it a node. .. note:: decorating as a node changes the function signature. Nodes should accept a single argument, which will be a :py:class:`emit.message.Message`. Nodes can be called directly by providing a dictionary argument or a set of keyword arguments. Other uses will raise a ``TypeError``. :param fields: fields that this function returns :type fields: ordered iterable of :py:class:`str` :param subscribe_to: functions in the graph to subscribe to. These indicators can be regular expressions. :type subscribe_to: :py:class:`str` or iterable of :py:class:`str` :param ignore: functions in the graph to ignore (also uses regular expressions.) Useful for ignoring specific functions in a broad regex. :type ignore: :py:class:`str` or iterable of :py:class:`str` :param entry_point: Set to ``True`` to mark this as an entry point - that is, this function will be called when the router is called directly. :type entry_point: :py:class:`bool` In addition to all of the above, you can define a ``wrap_node`` function on a subclass of Router, which will need to receive node and an options dictionary. Any extra options passed to node will be passed down to the options dictionary. See :py:class:`emit.router.CeleryRouter.wrap_node` as an example. :returns: decorated and wrapped function, or decorator if called directly ''' def outer(func): 'outer level function' # create a wrapper function self.logger.debug('wrapping %s', func) wrapped = self.wrap_as_node(func) if hasattr(self, 'wrap_node'): self.logger.debug('wrapping node "%s" in custom wrapper', wrapped) wrapped = self.wrap_node(wrapped, wrapper_options) # register the task in the graph name = self.get_name(func) self.register( name, wrapped, fields, subscribe_to, entry_point, ignore ) return wrapped return outer
[docs] def resolve_node_modules(self): 'import the modules specified in init' if not self.resolved_node_modules: try: self.resolved_node_modules = [ importlib.import_module(mod, self.node_package) for mod in self.node_modules ] except ImportError: self.resolved_node_modules = [] raise return self.resolved_node_modules
[docs] def get_message_from_call(self, *args, **kwargs): '''\ Get message object from a call. :raises: :py:exc:`TypeError` (if the format is not what we expect) This is where arguments to nodes are turned into Messages. Arguments are parsed in the following order: - A single positional argument (a :py:class:`dict`) - No positional arguments and a number of keyword arguments ''' if len(args) == 1 and isinstance(args[0], dict): # then it's a message self.logger.debug('called with arg dictionary') result = args[0] elif len(args) == 0 and kwargs != {}: # then it's a set of kwargs self.logger.debug('called with kwargs') result = kwargs else: # it's neither, and we don't handle that self.logger.error( 'get_message_from_call could not handle "%r", "%r"', args, kwargs ) raise TypeError('Pass either keyword arguments or a dictionary argument') return self.message_class(result)
[docs] def register(self, name, func, fields, subscribe_to, entry_point, ignore): ''' Register a named function in the graph :param name: name to register :type name: :py:class:`str` :param func: function to remember and call :type func: callable ``fields``, ``subscribe_to`` and ``entry_point`` are the same as in :py:meth:`Router.node`. ''' self.fields[name] = fields self.functions[name] = func self.register_route(subscribe_to, name) if ignore: self.register_ignore(ignore, name) if entry_point: self.add_entry_point(name) self.logger.info('registered %s', name)
[docs] def add_entry_point(self, destination): '''\ Add an entry point :param destination: node to route to initially :type destination: str ''' self.routes.setdefault('__entry_point', set()).add(destination) return self.routes['__entry_point']
[docs] def register_route(self, origins, destination): ''' Add routes to the routing dictionary :param origins: a number of origins to register :type origins: :py:class:`str` or iterable of :py:class:`str` or None :param destination: where the origins should point to :type destination: :py:class:`str` Routing dictionary takes the following form:: {'node_a': set(['node_b', 'node_c']), 'node_b': set(['node_d'])} ''' self.names.add(destination) self.logger.debug('added "%s" to names', destination) origins = origins or [] # remove None if not isinstance(origins, list): origins = [origins] self.regexes.setdefault(destination, [re.compile(origin) for origin in origins]) self.regenerate_routes() return self.regexes[destination]
[docs] def register_ignore(self, origins, destination): ''' Add routes to the ignore dictionary :param origins: a number of origins to register :type origins: :py:class:`str` or iterable of :py:class:`str` :param destination: where the origins should point to :type destination: :py:class:`str` Ignore dictionary takes the following form:: {'node_a': set(['node_b', 'node_c']), 'node_b': set(['node_d'])} ''' if not isinstance(origins, list): origins = [origins] self.ignore_regexes.setdefault(destination, [re.compile(origin) for origin in origins]) self.regenerate_routes() return self.ignore_regexes[destination]
[docs] def regenerate_routes(self): 'regenerate the routes after a new route is added' for destination, origins in self.regexes.items(): # we want only the names that match the destination regexes. resolved = [ name for name in self.names if name is not destination and any(origin.search(name) for origin in origins) ] ignores = self.ignore_regexes.get(destination, []) for origin in resolved: destinations = self.routes.setdefault(origin, set()) if any(ignore.search(origin) for ignore in ignores): self.logger.info('ignoring route "%s" -> "%s"', origin, destination) try: destinations.remove(destination) self.logger.debug('removed "%s" -> "%s"', origin, destination) except KeyError: pass continue if destination not in destinations: self.logger.info('added route "%s" -> "%s"', origin, destination) destinations.add(destination)
[docs] def disable_routing(self): 'disable routing (usually for testing purposes)' self.routing_enabled = False
[docs] def enable_routing(self): 'enable routing (after calling ``disable_routing``)' self.routing_enabled = True
[docs] def route(self, origin, message): '''\ Using the routing dictionary, dispatch a message to all subscribers :param origin: name of the origin node :type origin: :py:class:`str` :param message: message to dispatch :type message: :py:class:`emit.message.Message` or subclass ''' # side-effect: we have to know all the routes before we can route. But # we can't resolve them while the object is initializing, so we have to # do it just in time to route. self.resolve_node_modules() if not self.routing_enabled: return subs = self.routes.get(origin, set()) for destination in subs: self.logger.debug('routing "%s" -> "%s"', origin, destination) self.dispatch(origin, destination, message)
[docs] def dispatch(self, origin, destination, message): '''\ dispatch a message to a named function :param destination: destination to dispatch to :type destination: :py:class:`str` :param message: message to dispatch :type message: :py:class:`emit.message.Message` or subclass ''' func = self.functions[destination] self.logger.debug('calling %r directly', func) return func(_origin=origin, **message)
[docs] def wrap_result(self, name, result): ''' Wrap a result from a function with it's stated fields :param name: fields to look up :type name: :py:class:`str` :param result: return value from function. Will be converted to tuple. :type result: anything :raises: :py:exc:`ValueError` if name has no associated fields :returns: :py:class:`dict` ''' if not isinstance(result, tuple): result = tuple([result]) try: return dict(zip(self.fields[name], result)) except KeyError: msg = '"%s" has no associated fields' self.logger.exception(msg, name) raise ValueError(msg % name)
[docs] def get_name(self, func): ''' Get the name to reference a function by :param func: function to get the name of :type func: callable ''' if hasattr(func, 'name'): return func.name return '%s.%s' % ( func.__module__, func.__name__ )

Project Versions

This Page