import asyncio
import traceback
import tornado.web
from asgiref.sync import async_to_sync, sync_to_async
from django.conf import settings
from django.core.management.base import SystemCheckError
from django.core.wsgi import get_wsgi_application
from django.db import OperationalError, connection
from hurricane.metrics import (
HealthMetric,
ReadinessMetric,
RequestCounterMetric,
RequestQueueLengthMetric,
ResponseTimeAverageMetric,
StartupTimeMetric,
)
from hurricane.server.loggers import logger
from hurricane.server.wsgi import HurricaneWSGIContainer
from hurricane.webhooks import LivenessWebhook, ReadinessWebhook
from hurricane.webhooks.base import WebhookStatus
[docs]class DjangoHandler(tornado.web.RequestHandler):
"""
This handler transmits all standard requests to django application. Currently it uses WSGI Container based on
tornado WSGI Container.
"""
[docs] def initialize(self):
"""
Initialization of Hurricane WSGI Container.
"""
self.django = HurricaneWSGIContainer(self, get_wsgi_application())
[docs] async def prepare(self) -> None:
"""
Transmitting incoming request to django application via WSGI Container.
"""
await self.django(self.request)
self._finished = True
self._log()
self.on_finish()
[docs]class DjangoProbeHandler(tornado.web.RequestHandler):
"""
Parent class for all specific probe handlers.
"""
[docs] def compute_etag(self):
return None
[docs] def set_extra_headers(self, path):
"""
Setting of extra headers for cache-control, namely: no-store, no-cache, must-revalidate and max-age=0. It
means that information on requests and responses will not be stored.
"""
self.set_header("Cache-Control", "no-store, no-cache, must-revalidate, max-age=0")
def _check(self):
"""
Checking application on several errors. Catches SystemCheckErrors of django system check framework and
checks the connection to the database.
"""
pass
[docs] async def get(self):
"""
Get method, which runs the check.
"""
await self._check()
[docs] async def post(self):
"""
Post method, which runs the check.
"""
await self._check()
[docs]class DjangoLivenessHandler(DjangoProbeHandler):
"""
This handler runs with every call to the probe endpoint which is supposed to be used
"""
def initialize(self, check_handler, webhook_url, max_lifetime):
self.check = check_handler
self.liveness_webhook = webhook_url
self.max_lifetime = max_lifetime
@sync_to_async
def ensure_connection(self):
connection.ensure_connection()
async def _check(self):
if StartupTimeMetric.get():
await self._check_liveness()
else:
self.set_status(400)
def _update_health_metric(self, liveness_webhook, got_exception):
if not got_exception:
if HealthMetric.get() is not True:
HealthMetric.set(True)
if liveness_webhook:
logger.info("Health metric changed to True. Liveness webhook with status succeeded triggered")
LivenessWebhook().run(url=liveness_webhook, status=WebhookStatus.SUCCEEDED)
elif HealthMetric.get() is not False:
HealthMetric.set(False)
if liveness_webhook:
logger.info("Health metric changed to False. Liveness webhook with status failed triggered")
LivenessWebhook().run(url=liveness_webhook, status=WebhookStatus.FAILED, error_trace=got_exception)
async def _check_liveness(self):
if self.max_lifetime and RequestCounterMetric.get() > self.max_lifetime:
self.set_status(400)
return None
got_exception = None
try:
async_check = sync_to_async(self.check)
await async_check(tags=["hurricane"], include_deployment_checks=True)
if settings.DATABASES:
# once a connection has been established, this will be successful
# (even if the connection is gone later on)
await self.ensure_connection()
except SystemCheckError as e:
got_exception = traceback.format_exc()
self._write_error(msg="check error", e=e)
except OperationalError as e:
got_exception = traceback.format_exc()
self._write_error(msg="database error", e=e)
else:
if response_average_time := ResponseTimeAverageMetric.get():
self.write(
f"Average response time: {response_average_time:.2f}ms Request "
f"queue size: {RequestQueueLengthMetric.get()} Rx"
)
else:
self.write("alive")
self._update_health_metric(self.liveness_webhook, got_exception)
finally:
if got_exception:
self.set_status(500)
self._update_health_metric(self.liveness_webhook, got_exception)
def _write_error(self, msg, e=None):
if settings.DEBUG:
self.write(f"django {msg}: " + str(e))
else:
self.write(f"{msg}")
[docs]class DjangoReadinessHandler(DjangoProbeHandler):
"""
This handler runs with every call to the probe endpoint which is supposed to be used
with Kubernetes 'Readiness Probes'. The DjangoCheckHandler calls Django's Check Framework which
can be used to determine the application's health state during its operation.
"""
def initialize(self, req_queue_len, webhook_url):
self.request_queue_length = req_queue_len
self.readiness_webhook = webhook_url
async def _check(self):
if StartupTimeMetric.get() and RequestQueueLengthMetric.get() > self.request_queue_length:
self.set_status(400)
if ReadinessMetric.get() is not False:
ReadinessMetric.set(False)
if self.readiness_webhook:
logger.info("Readiness metric changed to False. Webhook with status failed triggered")
ReadinessWebhook().run(url=self.readiness_webhook, status=WebhookStatus.FAILED)
elif StartupTimeMetric.get() and RequestQueueLengthMetric.get() <= self.request_queue_length:
self.set_status(200)
if ReadinessMetric.get() is not True:
ReadinessMetric.set(True)
if self.readiness_webhook:
logger.info("Readiness metric changed to True. Webhook with status succeeded triggered")
ReadinessWebhook().run(url=self.readiness_webhook, status=WebhookStatus.SUCCEEDED)
else:
self.set_status(400)
[docs]class DjangoStartupHandler(DjangoProbeHandler):
"""
This handler runs with every call to the probe endpoint which is supposed to be used with Kubernetes
'Startup Probes'. It returns 400 response for post and get requests, if StartupTimeMetric is not set, what means
that the application is still in the startup phase. As soon as StartupTimeMetric is set, this handler returns 200
response upon request, which indicates, that startup phase is finished and Kubernetes can now poll
liveness/readiness probes.
"""
async def _check(self):
if StartupTimeMetric.get():
self.write(f"Startup was finished {StartupTimeMetric.get()}")
self.set_status(200)
else:
self.set_status(400)