Source code for hurricane.amqp.worker

import os
import time
from typing import Type

import tornado.ioloop
from django.conf import settings

from hurricane.amqp import logger
from hurricane.amqp.basehandler import _AMQPConsumer

[docs]class AMQPClient(object): """ This is the AMQP Client that will reconnect, if the nested handler instance indicates that a reconnect is necessary. """ def __init__( self, consumer_klass: Type[_AMQPConsumer], queue_name: str, exchange_name: str, amqp_host: str, amqp_port: int, amqp_vhost: str, ): self._reconnect_delay = 0 # load user if hasattr(settings, "AMQP_USER"): user = settings.AMQP_USER elif os.getenv("AMQP_USER"): user = os.getenv("AMQP_USER") else: user = None # load password if hasattr(settings, "AMQP_PASSWORD"): password = settings.AMQP_PASSWORD elif os.getenv("AMQP_PASSWORD"): password = os.getenv("AMQP_PASSWORD") else: password = None self._consumer_args = (queue_name, exchange_name, amqp_host, amqp_port, amqp_vhost, user, password) self._consumer_klass = consumer_klass self._consumer = self._consumer_klass(*self._consumer_args)
[docs] def run(self, reconnect: bool = False) -> None: """ If reconnect is True, AMQP consumer is running in auto-connect mode. In this case consumer will be executed. If any exception occurs, consumer will be disconnected and after some delay will be reconnected. Then consumer will be restarted. KeyboardInterrupt exception is handled differently and stops consumer. In this case IOLoop will be terminated. If reconnect is false, consumer will be started, but no exceptions and interruptions will be tolerated. """ if reconnect:"AMQP consumer running in auto-reconnect mode") while True: try: except KeyboardInterrupt: self._consumer.stop() break self._maybe_reconnect() else:"Terminating Hurricane AMQP client") loop = tornado.ioloop.IOLoop.current() loop.stop()
def _maybe_reconnect(self) -> None: if self._consumer.should_reconnect: self._consumer.stop() reconnect_delay = self._get_reconnect_delay() logger.warning("Reconnecting after %d seconds", reconnect_delay) time.sleep(reconnect_delay) self._consumer = self._consumer_klass(*self._consumer_args) def _get_reconnect_delay(self): if self._consumer.was_consuming: self._reconnect_delay = 0 else: self._reconnect_delay += 1 self._reconnect_delay = min(self._reconnect_delay, 30) return self._reconnect_delay