Extending Router
To extend emit.Router (for example, to add a new dispatch backend), the most useful methods to override are:
- __init__(self, your_args, *args, **kwargs)
  Take your backend-specific arguments first and pass the remaining *args and **kwargs through to Router.__init__. This is the __init__ pattern used by the current dispatch backends.
- dispatch(origin, destination, message)
  Perform the dispatch. Typically this passes origin along (as _origin) together with the message.
- wrap_node(node, options)
  Given a wrapped function (node), do any additional processing on the function or node. Keyword arguments that Router.node does not handle itself are passed in as the options dictionary.
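The three hooks above can be sketched without any queueing library. The following is a minimal illustration, not emit's actual code: the Router class here is a simplified stand-in written only for this example (the real Router.node takes more arguments), and DeferredRouter, pending, priority, and flush are hypothetical names.

```python
class Router(object):
    """Simplified stand-in for emit.Router (NOT the real class)."""

    def __init__(self):
        self.functions = {}

    def node(self, name, **options):
        # Assumption: unhandled keyword arguments are forwarded to wrap_node.
        def decorator(func):
            wrapped = self.wrap_node(func, options)
            self.functions[name] = wrapped
            return wrapped
        return decorator

    def wrap_node(self, node, options):
        return node

    def dispatch(self, origin, destination, message):
        return self.functions[destination](_origin=origin, **message)


class DeferredRouter(Router):
    """Hypothetical backend: queue dispatches in memory, run them on flush()."""

    def __init__(self, pending, *args, **kwargs):
        # Backend-specific argument first, everything else goes to the base class.
        super(DeferredRouter, self).__init__(*args, **kwargs)
        self.pending = pending

    def dispatch(self, origin, destination, message):
        # Pass origin along as _origin together with the message fields.
        func = self.functions[destination]
        self.pending.append((func, dict(message, _origin=origin)))

    def wrap_node(self, node, options):
        # Extra per-node processing driven by the options dictionary.
        node.priority = options.get('priority', 0)
        return node

    def flush(self):
        # Run queued calls in the order they were dispatched.
        results = []
        while self.pending:
            func, kwargs = self.pending.pop(0)
            results.append(func(**kwargs))
        return results


router = DeferredRouter([])


@router.node('double', priority=5)
def double(x, _origin=None):
    return {'x': x * 2, 'from': _origin}


router.dispatch('start', 'double', {'x': 21})  # queued, not run yet
queued = len(router.pending)
results = router.flush()
```

The backend-specific state (pending) lives on the subclass, while the base class keeps ownership of the function registry, which is the same split the __init__ pattern above is meant to preserve.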
Example
See the following example (the current RQRouter implementation):
# Imports are not shown in the original listing; these are the assumed
# sources of Router and the RQ job decorator.
from emit import Router
from rq.decorators import job


class RQRouter(Router):
    'Router specifically for RQ routing'
    def __init__(self, redis_connection, *args, **kwargs):
        '''\
        Specific routing when using RQ

        :param redis_connection: a redis connection to send to all the tasks
                                 (can be overridden in :py:meth:`Router.node`.)
        :type redis_connection: :py:class:`redis.Redis`
        '''
        super(RQRouter, self).__init__(*args, **kwargs)
        self.redis_connection = redis_connection
        self.logger.debug('Initialized RQ Router')

    def dispatch(self, origin, destination, message):
        'dispatch through RQ'
        func = self.functions[destination]
        self.logger.debug('enqueueing %r', func)
        # func was wrapped by rq's job decorator in wrap_node, so it has .delay
        return func.delay(_origin=origin, **message)

    def wrap_node(self, node, options):
        '''
        we have the option to construct nodes here, so we can use different
        queues for nodes without having to have different queue objects.
        '''
        # per-node options fall back to router-wide defaults
        job_kwargs = {
            'queue': options.get('queue', 'default'),
            'connection': options.get('connection', self.redis_connection),
            'timeout': options.get('timeout', None),
            'result_ttl': options.get('result_ttl', 500),
        }
        return job(**job_kwargs)(node)
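To see how the per-node options interact with the router-wide defaults in wrap_node, the defaulting logic can be pulled out and run on its own. This is only a sketch: build_job_kwargs is a hypothetical helper that is not part of emit, and router_connection is a placeholder object standing in for a redis.Redis instance.

```python
def build_job_kwargs(options, default_connection):
    """Mirror the defaulting in RQRouter.wrap_node (hypothetical helper)."""
    return {
        'queue': options.get('queue', 'default'),
        'connection': options.get('connection', default_connection),
        'timeout': options.get('timeout', None),
        'result_ttl': options.get('result_ttl', 500),
    }


# Placeholder standing in for a redis.Redis instance.
router_connection = object()

# No per-node options: every value comes from the defaults.
defaults = build_job_kwargs({}, router_connection)

# Per-node options override only the keys they name.
custom = build_job_kwargs({'queue': 'high', 'timeout': 30}, router_connection)
```

A node registered with no extra options lands on the 'default' queue with the router's own connection, while a node that names a queue or timeout changes only those keys.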