Source code for hurricane.amqp.basehandler

import asyncio
import functools
from typing import List

import pika
from pika.adapters.tornado_connection import TornadoConnection

from hurricane.amqp import logger


class _AMQPConsumer:

    """
    This is Hurricane's base AMQP consumer that will handle unexpected interactions
    with the message broker such as channel and connection closures.
    If the message broker closes the connection, this class will stop and indicate
    that reconnection is necessary.
    """

    EXCHANGE_TYPE = None

    def __init__(
        self,
        queue_name: str,
        exchange_name: str,
        host: str,
        port: int,
        vhost: str = None,
        username: str = None,
        password: str = None,
    ):
        """
        Create a new instance of the consumer class, passing in the AMQP
        URL used to connect to broker.
        """
        self.should_reconnect = False
        self.was_consuming = False

        self._queue_name = queue_name
        self._exchange_name = exchange_name
        self._host = host
        self._port = port
        self._vhost = vhost or "/"
        self._username = username
        self._password = password

        self._connection = None
        self._channel = None
        self._closing = False
        self._consumer_tag = None
        self._consuming = False
        self._prefetch_count = 1

    def connect(self) -> TornadoConnection:

        """
        This method connects to the broker, returning the connection handle.
        """

        logger.info(f"Connecting to {self._host}:{self._port}{self._vhost}")
        # set amqp credentials
        if self._username:
            credentials = pika.PlainCredentials(self._username, self._password)
            # set amqp connection parameters
            parameters = pika.ConnectionParameters(
                host=self._host,
                port=self._port,
                virtual_host=self._vhost,
                credentials=credentials,
            )
        else:
            parameters = pika.ConnectionParameters(
                host=self._host,
                port=self._port,
                virtual_host=self._vhost,
            )

        # connect
        connection = TornadoConnection(
            parameters=parameters,
            on_open_callback=self.on_connection_open,
            on_open_error_callback=self.on_connection_open_error,
            on_close_callback=self.on_connection_closed,
        )
        return connection

    def close_connection(self):
        self._consuming = False
        if self._connection.is_closing or self._connection.is_closed:
            logger.info("Connection is closing or already closed")
        else:
            logger.info("Closing connection")
            self._connection.close()

    def on_connection_open(self, _unused_connection: pika.SelectConnection):

        """
        This method is called by pika once the connection to the broker has
        been established.
        """

        logger.info("Connection opened")
        self.open_channel()

    def on_connection_open_error(self, _unused_connection: pika.SelectConnection, err: Exception):

        """
        This method is called by pika if the connection to the broker
        can't be established.
        :param pika.SelectConnection _unused_connection: The connection
        :param Exception err: The error
        """

        logger.error(f"Connection open failed: {err}")
        self.reconnect()

    def on_connection_closed(self, _unused_connection: pika.SelectConnection, reason: Exception):

        """
        This method is invoked by pika when the connection to the broker is
        closed unexpectedly. Since it is unexpected, we will reconnect to
        the broker if it disconnects.
        """

        self._channel = None
        if self._closing:
            self._connection.ioloop.stop()
        else:
            logger.warning("Connection closed, reconnect necessary: %s", reason)
            self.reconnect()

    def reconnect(self):

        """
        Will be invoked if the connection can't be opened or is
        closed. Indicates that a reconnect is necessary then stops the
        ioloop.
        """

        self.should_reconnect = True
        self.stop()

    def open_channel(self):

        """
        Open a new channel with the broker by issuing the Channel.Open RPC
        command. When the broker responds that the channel is open, the
        on_channel_open callback will be invoked by pika.
        """

        logger.info("Creating a new channel")
        self._connection.channel(on_open_callback=self.on_channel_open)

    def on_channel_open(self, channel: pika.channel.Channel):

        """
        This method is invoked by pika when the channel has been opened.
        The channel object is passed in so we can make use of it.
        Since the channel is now open, we'll declare the exchange to use.
        """

        logger.info("Channel opened")
        self._channel = channel
        self.add_on_channel_close_callback()
        self.setup_exchange(self._exchange_name)

    def add_on_channel_close_callback(self):

        """
        This method tells pika to call the on_channel_closed method if
        the broker unexpectedly closes the channel.
        """

        logger.info("Adding channel close callback")
        self._channel.add_on_close_callback(self.on_channel_closed)

    def on_channel_closed(self, channel: pika.channel.Channel, reason: Exception):

        """
        Invoked by pika when the broker unexpectedly closes the channel.
        Channels are usually closed if you attempt to do something that
        violates the protocol, such as re-declare an exchange or queue with
        different parameters. In this case, we'll close the connection
        to shutdown the object.
        """

        logger.warning("Channel %i was closed: %s", channel, reason)
        self.close_connection()

    def setup_exchange(self, exchange_name: str) -> None:

        """
        Setup the exchange on the broker by invoking the Exchange.Declare RPC
        command. When it is complete, the on_exchange_declareok method will
        be invoked by pika.
        """

        logger.info(f"Declaring exchange: {exchange_name}")
        # Note: using functools.partial is not required, it is demonstrating
        # how arbitrary data can be passed to the callback when it is called
        cb = functools.partial(self.on_exchange_declareok, userdata=exchange_name)
        self._channel.exchange_declare(exchange=exchange_name, exchange_type=self.EXCHANGE_TYPE, callback=cb)

    def on_exchange_declareok(self, _unused_frame: pika.frame.Method, userdata: str) -> None:

        """
        Invoked by pika when the broker has finished the Exchange.Declare RPC
        command.
        """

        logger.info("Exchange declared: %s", userdata)
        self.setup_queue(self._queue_name)

    def setup_queue(self, queue_name: str) -> None:

        """
        Setup the queue on the broker by invoking the Queue.Declare RPC
        command. When it is complete, the on_queue_declareok method will
        be invoked by pika.
        :param str|unicode queue_name: The name of the queue to declare.
        """

        logger.info(f"Declaring queue {queue_name}")
        cb = functools.partial(self.on_queue_declareok, userdata=queue_name)
        self._channel.queue_declare(queue=queue_name, callback=cb)

    def get_routing_keys(self, queue_name: str) -> List[str]:

        """
        Generate a list of binding keys for this queue. This method will
        be called from on_queue_declareok in order to bind the declared queue
        on to one or multiple routing keys.
        """

        return []

    def on_queue_declareok(self, _unused_frame: pika.frame.Method, userdata: str) -> None:

        """
        Method invoked by pika when the Queue.Declare RPC call made in
        setup_queue has completed. In this method we will bind the queue
        and exchange together with the routing key by issuing the Queue.Bind
        RPC command. When this command is complete, the on_bindok method will
        be invoked by pika.
        """

        queue_name = userdata
        logger.info(f"Binding to {queue_name}")
        routing_keys = self.get_routing_keys(queue_name)
        if len(routing_keys) == 0:
            # no routing key applicable
            cb = functools.partial(self.on_bindok, queue_name=queue_name)
            self._channel.queue_bind(queue_name, exchange=self._exchange_name, callback=cb)
        else:
            for routing_key in routing_keys:
                cb = functools.partial(self.on_bindok, queue_name=queue_name, routing_key=routing_key)
                self._channel.queue_bind(queue_name, routing_key=routing_key, exchange=self._exchange_name, callback=cb)

    def on_bindok(self, _unused_frame: pika.frame.Method, queue_name: str, routing_key: str = None):

        """
        Invoked by pika when the Queue.Bind method has completed. At this
        point we will set the prefetch count for the channel.
        """

        if routing_key:
            logger.info(f"Queue bound: {queue_name} with routing key {routing_key}")
        else:
            logger.info(f"Queue bound: {queue_name}")
        self.set_qos()

    def set_qos(self) -> None:

        """
        This method sets up the consumer prefetch to only be delivered
        one message at a time. The consumer must acknowledge this message
        before the broker will deliver another one. You should experiment
        with different prefetch values to achieve desired performance.
        """

        self._channel.basic_qos(prefetch_count=self._prefetch_count, callback=self.on_basic_qos_ok)

    def on_basic_qos_ok(self, _unused_frame) -> None:

        """
        Invoked by pika when the Basic.QoS method has completed. At this
        point we will start consuming messages by calling start_consuming
        which will invoke the needed RPC commands to start the process.
        :param pika.frame.Method _unused_frame: The Basic.QosOk response frame
        """

        logger.info(f"QOS set to: {self._prefetch_count}")
        self.start_consuming()

    def start_consuming(self) -> None:

        """
        This method sets up the consumer by first calling
        add_on_cancel_callback so that the object is notified if the broker
        cancels the consumer.
        """

        logger.info("Issuing consumer")
        self.add_on_cancel_callback()
        self._consumer_tag = self._channel.basic_consume(self._queue_name, self.on_message)
        self.was_consuming = True
        self._consuming = True

    def add_on_cancel_callback(self) -> None:

        """
        Add a callback that will be invoked if the broker cancels the consumer
        for some reason. If the broker does cancel the consumer,
        on_consumer_cancelled will be invoked by pika.
        """

        logger.info("Adding consumer cancellation callback")
        self._channel.add_on_cancel_callback(self.on_consumer_cancelled)

    def on_consumer_cancelled(self, method_frame: pika.frame.Method) -> None:

        """
        Invoked by pika when the broker sends a Basic.Cancel for a consumer
        receiving messages.
        """

        logger.info(f"Consumer was cancelled remotely, shutting down: {method_frame}")
        if self._channel:
            self._channel.close()

    def on_message(
        self,
        _unused_channel: pika.channel.Channel,
        basic_deliver: pika.spec.Basic.Deliver,
        properties: pika.spec.BasicProperties,
        body: str,
    ) -> None:

        """
        Invoked by pika when a message is delivered from the broker.
        """

        logger.error(
            "Received message # %s from %s",
            basic_deliver.delivery_tag,
            properties.app_id,
        )
        self.reject_message(basic_deliver.delivery_tag)
        raise NotImplementedError("The on_message method must be implemented by a handler class")

    def acknowledge_message(self, delivery_tag) -> None:

        """
        Acknowledge the message delivery from the broker by sending a
        Basic.Ack RPC method for the delivery tag.
        :param int delivery_tag: The delivery tag from the Basic.Deliver frame
        """

        logger.info("Acknowledging message %s", delivery_tag)
        self._channel.basic_ack(delivery_tag)

    def reject_message(self, delivery_tag, requeue: bool = False) -> None:

        """
        Reject the message delivery from the broker.
        """

        logger.info("Rejecting message %s", delivery_tag)
        self._channel.basic_nack(delivery_tag, requeue=requeue)

    def stop_consuming(self) -> None:

        """
        Tell the broker that you would like to stop consuming by sending the
        Basic.Cancel RPC command.
        """

        if self._channel:
            logger.info("Sending a Basic.Cancel command to the broker")
            cb = functools.partial(self.on_cancelok, userdata=self._consumer_tag)
            self._channel.basic_cancel(self._consumer_tag, cb)

    def on_cancelok(self, _unused_frame: pika.frame.Method, userdata: str) -> None:

        """
        This method is invoked by pika when the broker acknowledges the
        cancellation of a consumer. At this point we will close the channel.
        This will invoke the on_channel_closed method once the channel has been
        closed, which will in-turn close the connection.
        """

        self._consuming = False
        logger.info(f"The broker acknowledged the cancellation of the consumer: {userdata}")
        self.close_channel()

    def close_channel(self) -> None:

        """
        Call to close the channel cleanly by issuing the
        Channel.Close RPC command.
        """

        logger.info("Closing the channel")
        self._channel.close()

    def run(self) -> None:

        """
        Run the consumer by connecting to the broker and then
        starting the IOLoop to block and allow the SelectConnection to operate.
        """

        self._connection = self.connect()
        self._connection.ioloop.start()

    def stop(self) -> None:

        """
        Cleanly shutdown the connection by stopping the consumer.
        """

        if not self._closing:
            self._closing = True
            logger.info("Stopping")
            if self._consuming:
                self.stop_consuming()
                logger.info("Start called")
                self._connection.ioloop.stop()
            else:
                self._connection.ioloop.stop()
            logger.info("Stopped consuming")


[docs]class TopicHandler(_AMQPConsumer): """ This handler implements Hurricane's base AMQP consumer that handles unexpected interactions with the message broker such as channel and connection closures. The ``EXCHANGE_TYPE`` is *topic*. """ EXCHANGE_TYPE = "topic"