# -*- encoding: utf-8 -*-
import traceback
from functools import wraps
import collections
# from collections import namedtupla
import pika
import logging
import threading
import time
import json
import sys
import os
# add amqppy path
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
import amqppy
from amqppy import utils
logger = logging.getLogger(__name__)
_ChannelExchange = collections.namedtuple('ChannelExchange', ['channel', 'exchange'])
[docs]class Worker(object):
""" This class handles a worker that listens for incoming Topics and Rpc requests.
:param str broker: The URL for connection to RabbitMQ. Eg: 'amqp://serviceuser:password@rabbit.host:5672//'
"""
def __init__(self, broker, heartbeat_sec=None):
# map(callback) -> (channel, exchange)
self._callbacks = {}
self.quit = False
self.thread = None
self._conn = None
try:
logger.debug("connecting to broker \'{}\'".format(broker))
self._conn = utils._create_connection(broker=broker, heartbeat_sec=heartbeat_sec)
finally:
if self._conn and self._conn.is_open:
logger.debug("connected")
else:
error = "Cannot connect to broker \'{}\'".format(broker)
logger.error(error)
raise amqppy.BrokenConnection(error)
def __del__(self):
# logger.debug("consumer worker destructor")
self._close()
def _close(self):
for callback in self._callbacks:
if self._callbacks[callback].channel and self._callbacks[callback].channel.is_open:
self._callbacks[callback].channel.close()
self._callbacks = {}
if self._conn and self._conn.is_open:
logger.debug('closing connection')
self._conn.close()
self._conn = None
logger.debug('connection closed')
def _create_channel(self, exchange, callback):
try:
channel = self._conn.channel()
channel.exchange_declare(exchange=exchange, exchange_type="topic", passive=True)
self._callbacks[callback] = _ChannelExchange(channel, exchange)
return channel
except Exception:
channel = self._conn.channel()
channel.exchange_declare(exchange=exchange, exchange_type="topic", passive=False, durable=True, auto_delete=True)
self._callbacks[callback] = _ChannelExchange(channel, exchange)
return channel
[docs] def stop(self):
""" Stops listening and close all channels and the connection
"""
logger.debug("stop")
self.quit = True
self._join()
self._close()
[docs] def add_request(self, routing_key, on_request_callback, exchange=amqppy.AMQP_EXCHANGE, durable=False, auto_delete=True,
exclusive=False):
""" Registers a new consumer for a RPC reply task. These tasks will be executed when a RPC request is invoked by
publisher.Rpc.request().
:param str rounting_key: It defines the subscription interest. In terms of AMQP the routing key to bind on
:param method on_request_callback: Called when a Rpc request is invoked, it should return the reply.
:param str exchange: The exchange you want to publish the message.
:param bool durable: Queue messages survives a reboot of RabbitMQ.
:param bool auto_delete: Queues will auto-delete after use.
:param bool exclusive: Ensures that is the unique consumer
"""
logger.debug("adding request, exchange: {}, topic: {} --> {}".format(exchange, routing_key, on_request_callback))
channel = self._create_channel(exchange, on_request_callback)
channel.queue_declare(queue=routing_key, durable=durable, auto_delete=auto_delete)
channel.queue_bind(queue=routing_key, exchange=exchange, routing_key=routing_key)
channel.confirm_delivery()
try:
channel.basic_consume(
exclusive=exclusive,
queue=routing_key,
consumer_callback=self._profiler_wrapper_request(on_request_callback),
no_ack=True)
except pika.exceptions.ChannelClosed as e:
if "in exclusive use" in str(e):
raise amqppy.ExclusiveQueue(str(e))
else:
raise e
return self # Fluent pattern
def _profiler_wrapper_request(self, on_request_callback):
@wraps(on_request_callback)
def _wrapper(*args, **kwargs):
logger.debug("request \'{}\'.*args: {}".format(on_request_callback.__name__, args))
# process request arguments
deliver = args[1]
properties = args[2]
message = args[3]
# convert message body to string
# output.decode('string-escape').strip('"')
if isinstance(message, bytes):
message = message.decode("utf-8")
if not isinstance(message, str):
logger.warning("_profiler_wrapper_request, type: {}, body: {}".format(type(message), message))
logger.debug("Starting request \'{}\'".format(on_request_callback.__name__))
# response = on_request_callback(*args, **kwargs)
start = time.time()
rpc_exception = None
try:
response = {
# message is text, it should be converted in dictionary at request func
"result": on_request_callback(exchange=deliver.exchange, routing_key=deliver.routing_key, headers=properties.headers, body=message),
}
except Exception as e:
logger.warning("Exception in request \'{}\', routing_key: {}\n{}".format(on_request_callback.__name__,
deliver.routing_key,
traceback.format_exc()))
rpc_exception = e
response = {
"error": utils._ensure_utf8(rpc_exception)
}
elapsed = time.time() - start
logger.debug('Request \'{}\' finished. Time elapsed: {}'.format(on_request_callback.__name__, elapsed))
# sending response back
channel = self._callbacks[on_request_callback].channel
exchange = self._callbacks[on_request_callback].exchange
routing_key = properties.reply_to
logger.debug('Sending RPC response to routing key: {}'.format(routing_key))
try:
publish_result = channel.basic_publish(
exchange=exchange,
routing_key=routing_key,
properties=pika.BasicProperties(
correlation_id=properties.correlation_id,
content_encoding='utf-8',
content_type='application/json'),
body=json.dumps(response, ensure_ascii=False).encode('utf8'),
mandatory=True)
if not publish_result:
raise amqppy.PublishNotRouted("Request response was not routed")
except amqppy.PublishNotRouted:
# don't raise it
logger.warning("RPC response it has not been published, it might be due to a response waiting timeout")
except Exception:
logger.error('Exception on publish message to routing_key: {}. Exception message: {}'.format(
routing_key, traceback.format_exc()))
logger.debug('RPC response sent.')
# throw exception to the rpc server/consumer
if rpc_exception:
raise rpc_exception
return _wrapper
[docs] def add_topic(self, routing_key, on_topic_callback, queue=None, exclusive=False, exchange=amqppy.AMQP_EXCHANGE, durable=False,
auto_delete=True, no_ack=True, **kwargs):
""" Registers a new consumer for a Topic subscriber. These tasks will be executed when a Topic is published by
publisher.Topic.publish().
:param str rounting_key: The routing key to bind on.
:param method on_topic_callback: Called when a topic is published.
:param str queue: The name of the queue. If it is not provided the queue will be named the same as the 'routing_key'.
:param bool exclusive: Only one consumer is allowed.
:param str exchange: The exchange you want to publish the message.
:param bool durable: Queue messages survives a reboot of RabbitMQ.
:param bool auto_delete: Queues will auto-delete after use.
:param bool no_ack: Tell the broker that ACK reply is not needed. If it is False, an ACK will be sent automatically each \
time a message is consumed unless a amqppy.AbortConsume or amqppy.DeadLetterMessage is raised.
"""
logger.debug("adding topic, exchange: {}, topic: {} --> {}".format(exchange, routing_key, on_topic_callback, kwargs))
self.no_ack = no_ack
channel = self._create_channel(exchange, on_topic_callback)
queue_name = queue if queue else routing_key
channel.queue_declare(queue=queue_name, durable=durable, auto_delete=auto_delete,
arguments=kwargs)
channel.queue_bind(queue=queue_name, exchange=exchange, routing_key=routing_key)
try:
channel.basic_consume(
queue=queue_name,
exclusive=exclusive,
consumer_callback=self._profiler_wrapper_topic(on_topic_callback),
no_ack=no_ack)
except pika.exceptions.ChannelClosed as e:
if "in exclusive use" in str(e):
raise amqppy.ExclusiveQueue(str(e))
else:
raise e
return self # Fluent pattern
def _profiler_wrapper_topic(self, on_topic_callback):
@wraps(on_topic_callback)
def _wrapper(*args, **kwargs):
logger.debug("topic \'{}\'.*args: {}".format(on_topic_callback.__name__, args))
# logger.debug("request \'{}\'.**kwargs: {}".format(on_topic_callback.__name__, kwargs))
# process request arguments
deliver = args[1]
properties = args[2]
message = args[3]
# convert message body to string
if isinstance(message, bytes):
message = message.decode("utf-8")
if not utils._is_string(message):
logger.warning("_profiler_wrapper_topic, message in not string, is of type: {}".format(type(message)))
# logger.debug("Properties vars: {}".format(vars(properties)))
logger.debug("Starting request \'{}\'".format(on_topic_callback.__name__))
start = time.time()
try:
on_topic_callback(exchange=deliver.exchange, routing_key=deliver.routing_key, headers=properties.headers, body=message)
if not self.no_ack:
self._callbacks[on_topic_callback].channel.basic_ack(delivery_tag=deliver.delivery_tag)
logger.debug("ACK sent")
except amqppy.AbortConsume as e:
logger.warning("AbortConsume exception: {}".format(e))
except amqppy.DeadLetterMessage as e:
logger.warning("DeadLetterMessage exception: {}".format(e))
self._callbacks[on_topic_callback].channel.basic_reject(delivery_tag=deliver.delivery_tag, requeue=False)
finally:
elapsed = time.time() - start
logger.debug('Request \'{}\' finished. Time elapsed: {}'.format(on_topic_callback.__name__, elapsed))
return _wrapper
[docs] def run(self):
""" Start worker to listen. This will block the execution until the worker is stopped or an uncaught Exception """
logger.info('Running worker, waiting for the first message...')
try:
while not self.quit:
self._conn.process_data_events(0.5)
finally:
logger.info("exiting from worker run")
[docs] def run_async(self):
""" Start asynchronously worker to listen. The execution thread will follow after this call, hence is not blocked. """
self.thread = threading.Thread(target=self.run)
self.thread.start()
return self # Fluent pattern
def _join(self):
""" Waits until worker has ended """
if self.thread and self.thread.is_alive():
self.thread.join()