import asyncio
import functools
import os
import signal
import sys
import time

import tornado.autoreload
import tornado.ioloop
import tornado.web
from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
from django.utils.module_loading import import_string

from hurricane.amqp import logger
from hurricane.amqp.basehandler import _AMQPConsumer
from hurricane.amqp.worker import AMQPClient
from hurricane.metrics import StartupTimeMetric
from hurricane.server import make_probe_server, sanitize_probes

class Command(BaseCommand):
    """
    Starting a Tornado-powered Django AMQP 0-9-1 consumer.

    Implements the ``consume`` command as a management command for a django
    application. The command can be called using
    ``python manage.py consume <handler> <arguments>``.

    Arguments:
        - ``handler`` - the Hurricane AMQP handler class (dotted path, positional)
        - ``--queue`` - the AMQP 0-9-1 queue to consume from
        - ``--exchange`` - the AMQP 0-9-1 exchange to declare
        - ``--amqp-port`` - the message broker connection port
        - ``--amqp-host`` - the host address of the message broker
        - ``--amqp-vhost`` - the virtual host of the message broker to use with this consumer
        - ``--startup-probe`` - the exposed path (default is /startup) for probes to check startup
        - ``--readiness-probe`` - the exposed path (default is /ready) for probes to check readiness
        - ``--liveness-probe`` - the exposed path (default is /alive) for probes to check liveness
        - ``--probe-port`` - the port for Tornado probe route to listen on
        - ``--req-queue-len`` - threshold of length of queue of request, which is considered for readiness probe
        - ``--no-probe`` - disable probe endpoint
        - ``--no-metrics`` - disable metrics collection
        - ``--autoreload`` - reload code on change
        - ``--debug`` - set Tornado's Debug flag
        - ``--reconnect`` - try to reconnect this client automatically as the broker is available again
        - ``--webhook-url`` - url for webhooks
        - ``--max-lifetime`` - if specified, maximum requests after which pod is restarted
    """

    help = "Start a Tornado-powered Django AMQP 0-9-1 consumer"

    def add_arguments(self, parser):
        """
        Defines arguments, that can be accepted with ``consume`` command.

        :param parser: the argument parser handed in by Django; every option
            of the command is registered on it.
        """
        parser.add_argument("--queue", type=str, default="", help="The AMQP 0-9-1 queue to consume from")
        parser.add_argument("--exchange", type=str, default="", help="The AMQP 0-9-1 exchange to declare")
        parser.add_argument(
            "--amqp-port",
            type=int,
            help="The message broker connection port",
        )
        parser.add_argument(
            "--amqp-host",
            type=str,
            help="The host address of the message broker",
        )
        parser.add_argument(
            "--amqp-vhost",
            type=str,
            help="The virtual host of the message broker to use with this consumer",
        )
        # the handler is the only positional (and therefore mandatory) argument
        parser.add_argument("handler", type=str, help="The Hurricane AMQP handler class (dotted path)")
        parser.add_argument(
            "--liveness-probe",
            type=str,
            default="/alive",
            help="The exposed path (default is /alive) for probes to check liveness",
        )
        parser.add_argument(
            "--readiness-probe",
            type=str,
            default="/ready",
            help="The exposed path (default is /ready) for probes to check readiness",
        )
        parser.add_argument(
            "--startup-probe",
            type=str,
            default="/startup",
            help="The exposed path (default is /startup) for probes to check startup",
        )
        parser.add_argument(
            "--probe-port",
            type=int,
            default=8001,
            help="The port for Tornado probe route to listen on",
        )
        parser.add_argument("--req-queue-len", type=int, default=10, help="Length of the request queue")
        parser.add_argument("--no-probe", action="store_true", help="Disable probe endpoint")
        parser.add_argument("--no-metrics", action="store_true", help="Disable metrics collection")
        parser.add_argument("--autoreload", action="store_true", help="Reload code on change")
        parser.add_argument("--debug", action="store_true", help="Set Tornado's Debug flag")
        parser.add_argument(
            "--reconnect",
            action="store_true",
            help="Try to reconnect this client automatically as the broker is available again",
        )
        parser.add_argument(
            "--webhook-url",
            type=str,
            help="Url for webhooks",
        )
        parser.add_argument(
            "--max-lifetime",
            type=int,
            default=None,
            help="Maximum requests after which pod is restarted",
        )

    def handle(self, *args, **options):
        """
        Defines functionalities for different arguments. After all arguments were processed, it starts the async
        event loop.

        Steps, in order: optionally start Tornado's autoreload, sanitize the
        probe paths, start the probe application unless ``--no-probe`` is set,
        resolve the broker connection values, import and validate the handler
        class, build the AMQP client, register SIGINT/SIGTERM handlers, record
        the startup time and finally run the worker.

        :raises CommandError: if the handler is not a subclass of
            ``_AMQPConsumer`` or the broker host/port cannot be resolved.
        """
        start_time = time.time()
        logger.info("Starting a Tornado-powered Django AMQP consumer")

        if options["autoreload"]:
            tornado.autoreload.start()
            logger.info("Autoreload was performed")

        # sanitize probes: returns regexps for probes in options and their representations for logging
        # (the representations are not needed by this command)
        options, _ = sanitize_probes(options)

        # set the probe routes
        if not options["no_probe"]:
            logger.info(f"Probe application running on port {options['probe_port']}")
            probe_application = make_probe_server(options, self.check)
            probe_application.listen(options["probe_port"])
        else:
            logger.info("No probe application running")

        connection = self.set_connection_values(options)

        # load the handler class
        _amqp_consumer = import_string(options["handler"])
        if not issubclass(_amqp_consumer, _AMQPConsumer):
            logger.error(f"The type {_amqp_consumer} is not subclass of _AMQPConsumer")
            raise CommandError("Cannot start the consumer due to an implementation error")

        worker = AMQPClient(
            _amqp_consumer,
            queue_name=options["queue"],
            exchange_name=options["exchange"],
            amqp_host=connection["amqp_host"],
            amqp_port=connection["amqp_port"],
            amqp_vhost=connection["amqp_vhost"],
        )

        # prepare the io loop
        loop = asyncio.get_event_loop()

        def ask_exit(signame):
            # stop the loop first, then terminate the process with exit code 0
            logger.info(f"Received signal {signame}. Shutting down now.")
            loop.stop()
            sys.exit(0)

        for signame in ("SIGINT", "SIGTERM"):
            loop.add_signal_handler(getattr(signal, signame), functools.partial(ask_exit, signame))

        # everything up to this point counts as startup time
        end_time = time.time()
        time_elapsed = end_time - start_time
        StartupTimeMetric.set(time_elapsed)

        worker.run(options["reconnect"])

    def set_connection_values(self, options):
        """
        Resolve the message broker connection values.

        Each value is looked up in this order: the command line option, the
        Django setting (``AMQP_HOST`` / ``AMQP_PORT`` / ``AMQP_VHOST``), the
        environment variable of the same name. Host and port are mandatory;
        the virtual host falls back to ``"/"``.

        :param options: the parsed command options.
        :return: a dict with the keys ``amqp_host``, ``amqp_port`` and
            ``amqp_vhost``.
        :raises CommandError: if no host or no port could be determined.
        """
        # load connection data
        connection = {}

        if options.get("amqp_host"):
            connection["amqp_host"] = options["amqp_host"]
        elif hasattr(settings, "AMQP_HOST"):
            connection["amqp_host"] = settings.AMQP_HOST
        elif os.getenv("AMQP_HOST"):
            connection["amqp_host"] = os.getenv("AMQP_HOST")
        else:
            raise CommandError(
                "The amqp host must not be empty: set it either as environment AMQP_HOST, "
                "in the django settings as AMQP_HOST or as optional argument --amqp-host"
            )

        if options.get("amqp_port"):
            connection["amqp_port"] = options["amqp_port"]
        elif hasattr(settings, "AMQP_PORT"):
            connection["amqp_port"] = settings.AMQP_PORT
        elif os.getenv("AMQP_PORT"):
            # environment variables are strings, the port is used as an integer
            connection["amqp_port"] = int(os.getenv("AMQP_PORT"))
        else:
            raise CommandError(
                "The amqp port must not be empty: set it either as environment AMQP_PORT, "
                "in the django settings as AMQP_PORT or as optional argument --amqp-port"
            )

        # the virtual host is optional and defaults to the root vhost
        connection["amqp_vhost"] = "/"
        if options.get("amqp_vhost"):
            connection["amqp_vhost"] = options["amqp_vhost"]
        elif hasattr(settings, "AMQP_VHOST"):
            connection["amqp_vhost"] = settings.AMQP_VHOST
        elif os.getenv("AMQP_VHOST"):
            connection["amqp_vhost"] = os.getenv("AMQP_VHOST")

        return connection