Extending Router

To extend emit.Router (for example, to add a new dispatch backend), override the following methods:

__init__(self, your_args, *args, **kwargs)
Accept the backend-specific arguments first, then pass the remaining *args and **kwargs on to Router.__init__. This is the pattern used by the current dispatch backends.
dispatch(self, origin, destination, message)
Dispatch message to the destination node. Typically passes origin along (as _origin) with the message.
wrap_node(self, node, options)
Given a wrapped function (node), do any additional processing on the function or node and return the result. Arguments to Router.node that the router does not handle itself are passed in as the options dictionary.

Example

See the following example (the current RQRouter implementation):

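from emit import Router
from rq.decorators import job  # RQ's decorator that adds .delay() to a function


class RQRouter(Router):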
    'Router specifically for RQ routing'
    def __init__(self, redis_connection, *args, **kwargs):
        '''\
        Specific routing when using RQ

        :param redis_connection: a redis connection to send to all the tasks
                                 (can be overridden in :py:meth:`Router.node`.)
        :type redis_connection: :py:class:`redis.Redis`
        '''
        super(RQRouter, self).__init__(*args, **kwargs)
        self.redis_connection = redis_connection
        self.logger.debug('Initialized RQ Router')

    def dispatch(self, origin, destination, message):
        'dispatch through RQ'
        func = self.functions[destination]
        self.logger.debug('enqueueing %r', func)
        return func.delay(_origin=origin, **message)

    def wrap_node(self, node, options):
        '''
        we have the option to construct nodes here, so we can use different
        queues for nodes without having to have different queue objects.
        '''
        job_kwargs = {
            'queue': options.get('queue', 'default'),
            'connection': options.get('connection', self.redis_connection),
            'timeout': options.get('timeout', None),
            'result_ttl': options.get('result_ttl', 500),
        }

        return job(**job_kwargs)(node)
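
The same three hooks can be sketched without any external queue. The block below is a minimal illustration, not emit's implementation: the Router class in it is a simplified stand-in written only so the example runs on its own, and RecordingRouter, its log argument, and the label option are hypothetical names. With the real library you would subclass emit.Router instead.

```python
import functools


class Router(object):
    """Simplified stand-in for emit.Router (an assumption, NOT the real class).

    It models only the pieces described above: a registry of node functions,
    a node decorator that forwards its keyword arguments to wrap_node as
    options, and the dispatch hook.
    """

    def __init__(self):
        self.functions = {}

    def node(self, name, **options):
        def decorator(func):
            # Unhandled keyword arguments arrive in wrap_node as a dict.
            self.functions[name] = self.wrap_node(func, options)
            return self.functions[name]
        return decorator

    def wrap_node(self, node, options):
        return node

    def dispatch(self, origin, destination, message):
        raise NotImplementedError


class RecordingRouter(Router):
    """Hypothetical backend: runs nodes in-process and records each dispatch."""

    def __init__(self, log, *args, **kwargs):
        # Backend-specific argument first; everything else goes to the base.
        super(RecordingRouter, self).__init__(*args, **kwargs)
        self.log = log

    def dispatch(self, origin, destination, message):
        func = self.functions[destination]
        self.log.append((origin, destination))
        # Pass the origin along as _origin, together with the message.
        return func(_origin=origin, **message)

    def wrap_node(self, node, options):
        # 'label' is a made-up option used only for this illustration.
        label = options.get('label', node.__name__)

        @functools.wraps(node)
        def wrapped(**message):
            return '%s: %s' % (label, node(**message))
        return wrapped


log = []
router = RecordingRouter(log)


@router.node('double', label='doubler')
def double(_origin, value):
    return value * 2


result = router.dispatch('start', 'double', {'value': 21})
print(result)  # doubler: 42
print(log)     # [('start', 'double')]
```

As in RQRouter, per-node settings are read from options with a fallback default, so each node can be configured at registration time without changing the router itself.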