import traceback
import tornado.web
from asgiref.sync import sync_to_async
from django.conf import settings
from django.contrib.staticfiles.handlers import StaticFilesHandler
from django.core.management.base import SystemCheckError
from django.core.wsgi import get_wsgi_application
from django.db import OperationalError, connection
from hurricane.metrics import (
HealthMetric,
ReadinessMetric,
RequestCounterMetric,
RequestQueueLengthMetric,
ResponseTimeAverageMetric,
StartupTimeMetric,
)
from hurricane.server.loggers import logger
from hurricane.server.wsgi import HurricaneWSGIContainer
from hurricane.webhooks import LivenessWebhook, ReadinessWebhook
from hurricane.webhooks.base import WebhookStatus
class DjangoHandler(tornado.web.RequestHandler):
    """
    Handler that forwards all standard requests to the django application. The forwarding is done by the
    Hurricane WSGI Container, which is currently based on the tornado WSGI Container.
    """

    def initialize(self):
        """
        Create the Hurricane WSGI Container around django's WSGI application.
        """
        wsgi_application = get_wsgi_application()
        self.django = HurricaneWSGIContainer(self, wsgi_application)

    async def prepare(self) -> None:
        """
        Pass the incoming request to the django application via the WSGI Container, then mark the request as
        finished, log it and run ``on_finish``.
        """
        incoming_request = self.request
        await self.django(incoming_request)
        # Once the container call has returned, the request lifecycle is closed out by hand.
        self._finished = True
        self._log()
        self.on_finish()
class DjangoStaticFilesHandler(DjangoHandler):
    """
    Handler that forwards all static requests to the django application. Django's WSGI application is wrapped
    in ``StaticFilesHandler`` before it is handed to the Hurricane WSGI Container.
    """

    def initialize(self):
        """
        Create the Hurricane WSGI Container around the ``StaticFilesHandler``-wrapped WSGI application.
        """
        static_application = StaticFilesHandler(get_wsgi_application())
        self.django = HurricaneWSGIContainer(self, static_application)
class DjangoProbeHandler(tornado.web.RequestHandler):
    """
    Parent class for all specific probe handlers.

    Subclasses override ``_check`` (what a GET/POST runs once startup has finished) and ``_probe_check`` (the
    probe specific part that runs after the django checks passed). ``_custom_check_wrapper`` calls
    ``self.check``, which is not set here and has to be provided by the subclass.
    """

    def compute_etag(self):
        """
        Return ``None`` instead of computing an ETag for the response.
        """
        return None

    def set_extra_headers(self, path):
        """
        Setting of extra headers for cache-control, namely: no-store, no-cache, must-revalidate and max-age=0. It
        means that information on requests and responses will not be stored.
        """
        # NOTE(review): nothing in this class calls this hook, and ``path`` is unused - confirm that tornado
        # actually invokes it for a plain RequestHandler, otherwise the header is never sent.
        self.set_header("Cache-Control", "no-store, no-cache, must-revalidate, max-age=0")

    async def _check(self):
        """
        Hook that is awaited by ``_check_startup_wrapper`` once the startup has finished. The base
        implementation does nothing; probe handlers override it.
        """
        pass

    async def _check_startup_wrapper(self):
        """
        Run ``_check`` if ``StartupTimeMetric`` is set, otherwise answer with status 400.
        """
        if StartupTimeMetric.get():
            await self._check()
        else:
            self.set_status(400)

    def _probe_check(self):
        """
        Hook for the probe specific part of the check. ``_custom_check_wrapper`` calls it after the django
        checks and the database check went through without an exception. The base implementation does nothing.
        """
        pass

    async def get(self):
        """
        Get method, which runs the check.
        """
        await self._check_startup_wrapper()

    async def post(self):
        """
        Post method, which runs the check.
        """
        await self._check_startup_wrapper()

    @sync_to_async
    def _ensure_connection(self):
        """
        Call ``connection.ensure_connection()`` of the django database connection, wrapped with
        ``sync_to_async`` so that it can be awaited.
        """
        connection.ensure_connection()

    async def _custom_check_wrapper(self, tag, metric, webhook, webhook_url):
        """
        Checking application on several errors. Catches SystemCheckErrors of django system check framework and
        checks the connection to the database.

        Runs ``self.check`` with the given tag (deployment checks included), ensures the database connection if
        ``settings.DATABASES`` is set and, if neither raised, runs ``_probe_check``. A ``SystemCheckError`` or
        ``OperationalError`` results in status 500 and the metric being reported as failed.

        :param tag: tag passed to ``self.check`` to select the checks to run
        :param metric: metric class whose value is updated with the outcome
        :param webhook: webhook class which is run when the metric value changes
        :param webhook_url: url the webhook is sent to; a falsy value means that no webhook is sent
        """
        # Holds the formatted traceback of a caught exception; it is only used as a "check failed" flag.
        got_exception = None
        try:
            async_check = sync_to_async(self.check)
            await async_check(tags=[tag], include_deployment_checks=True)
            if settings.DATABASES:
                # once a connection has been established, this will be successful
                # (even if the connection is gone later on)
                await self._ensure_connection()
        except SystemCheckError as e:
            got_exception = traceback.format_exc()
            self._write_error(msg="check error", e=e)
        except OperationalError as e:
            got_exception = traceback.format_exc()
            self._write_error(msg="database error", e=e)
        else:
            self._probe_check()
            # The probe specific check may have rejected the request by setting an error status (and may
            # already have reported the failure itself). Reporting success on top of that would flip the
            # metric back to True and trigger a contradictory SUCCEEDED webhook on every failing probe.
            if self.get_status() < 400:
                self._update_health_metric_no_exception(metric, webhook, webhook_url)
        finally:
            if got_exception:
                self.set_status(500)
                self._update_health_metric_exception(metric, webhook, webhook_url)

    def _update_health_metric_no_exception(self, metric, webhook, webhook_url):
        """
        Set the metric to ``True`` and send a SUCCEEDED webhook, but only if the metric value is currently
        falsy, i.e. only when the value actually changes.
        """
        if not metric.get():
            metric_change = True
            metric.set(metric_change)
            self._send_webhook(metric, webhook, webhook_url, WebhookStatus.SUCCEEDED, metric_change)

    def _update_health_metric_exception(self, metric, webhook, webhook_url):
        """
        Set the metric to ``False`` and send a FAILED webhook, but only if the metric value is currently truthy
        or ``None``, i.e. only when the value actually changes.
        """
        if metric.get() or metric.get() is None:
            metric_change = False
            metric.set(metric_change)
            self._send_webhook(metric, webhook, webhook_url, WebhookStatus.FAILED, metric_change)

    def _write_error(self, msg, e=None):
        """
        Write the error message to the response. The text of the exception is only included if
        ``settings.DEBUG`` is set.
        """
        if settings.DEBUG:
            self.write(f"django {msg}: {e}")
        else:
            self.write(f"{msg}")

    def _send_webhook(self, metric, webhook, webhook_url, status, metric_change):
        """
        Log the metric change and run the webhook with the given status. Does nothing if ``webhook_url`` is
        falsy.
        """
        if webhook_url:
            logger.info(
                f"{metric.code.capitalize()} metric changed to {metric_change}. {webhook.code.capitalize()} webhook with status {status} triggered"
            )
            webhook().run(url=webhook_url, status=status)
class DjangoLivenessHandler(DjangoProbeHandler):
    """
    This handler runs with every call to the liveness probe endpoint. It runs the checks tagged "liveness",
    reports the outcome to ``HealthMetric`` and the liveness webhook, and answers with status 400 once the
    request counter exceeds the configured maximum lifetime.
    """

    def initialize(self, check_handler, webhook_url, max_lifetime):
        """
        Store the check callable, the webhook url and the maximum lifetime, and select the liveness specific
        tag, metric and webhook.
        """
        self.tag = "liveness"
        self.metric = HealthMetric
        self.liveness_webhook = LivenessWebhook
        self.liveness_webhook_url = webhook_url
        self.check = check_handler
        self.max_lifetime = max_lifetime

    async def _check(self):
        """
        Run the common check wrapper with the liveness tag, metric and webhook.
        """
        await self._custom_check_wrapper(
            self.tag,
            self.metric,
            self.liveness_webhook,
            self.liveness_webhook_url,
        )

    def _probe_check(self):
        """
        Answer with status 400 if a maximum lifetime is set and the request counter exceeds it. Otherwise write
        the average response time and the request queue size, or "alive" if there is no average response time.
        """
        lifetime_limit = self.max_lifetime
        if lifetime_limit and RequestCounterMetric.get() > lifetime_limit:
            self.set_status(400)
            return

        average_time = ResponseTimeAverageMetric.get()
        if not average_time:
            self.write("alive")
            return

        queue_size = RequestQueueLengthMetric.get()
        self.write(f"Average response time: {average_time:.2f}ms Request queue size: {queue_size} Rx")
class DjangoReadinessHandler(DjangoProbeHandler):
    """
    This handler runs with every call to the probe endpoint which is supposed to be used
    with Kubernetes 'Readiness Probes'. The DjangoCheckHandler calls Django's Check Framework which
    can be used to determine the application's health state during its operation.
    """

    def initialize(self, check_handler, req_queue_len, webhook_url):
        """
        Store the check callable, the request queue length threshold and the webhook url, and select the
        readiness specific tag, metric and webhook.
        """
        self.check = check_handler
        self.request_queue_length = req_queue_len
        self.readiness_webhook_url = webhook_url
        self.readiness_webhook = ReadinessWebhook
        self.metric = ReadinessMetric
        self.tag = "readiness"

    async def _check(self):
        """
        Run the common check wrapper with the readiness tag, metric and webhook.
        """
        await self._custom_check_wrapper(self.tag, self.metric, self.readiness_webhook, self.readiness_webhook_url)

    def _probe_check(self):
        """
        Compare the current request queue length with the configured threshold. Above the threshold the
        response status is 400 and the metric is reported as failed, otherwise the status is 200 and the metric
        is reported as succeeded.
        """
        # Read the metric once, so that both outcomes are decided on the same value and exactly one of the
        # two branches always runs.
        queue_length = RequestQueueLengthMetric.get()
        if queue_length > self.request_queue_length:
            self.set_status(400)
            self._update_health_metric_exception(self.metric, self.readiness_webhook, self.readiness_webhook_url)
        else:
            self.set_status(200)
            self._update_health_metric_no_exception(self.metric, self.readiness_webhook, self.readiness_webhook_url)
class DjangoStartupHandler(DjangoProbeHandler):
    """
    This handler runs with every call to the probe endpoint which is supposed to be used with Kubernetes
    'Startup Probes'. As long as StartupTimeMetric is not set, the application is still in the startup phase and
    get and post requests are answered with a 400 response. As soon as StartupTimeMetric is set, the handler
    answers with a 200 response, which indicates that the startup phase is finished and Kubernetes can now poll
    the liveness/readiness probes.
    """

    async def _check(self):
        """
        Run the startup specific probe check.
        """
        await self._probe_check()

    async def _probe_check(self):
        """
        Write the value of StartupTimeMetric to the response and set the status to 200.
        """
        startup_time = StartupTimeMetric.get()
        self.write(f"Startup was finished {startup_time}")
        self.set_status(200)