Source code for hurricane.webhooks.base

import asyncio
import functools
import socket
import time
import typing
from concurrent.futures.thread import ThreadPoolExecutor
from enum import Enum

import requests
from django.conf import settings
from requests import RequestException

from hurricane.server.loggers import logger


[docs]class WebhookStatus(Enum): FAILED = "failed" SUCCEEDED = "succeeded" WARNING = "warning"
[docs]class Webhook: """ Base class for webhooks in the registry. Run function initiates sending of webhook to the specified url. """ code = None data = {} def __init__(self, code=None): if code: self.code = code
[docs] @classmethod def get_from_registry(cls): """ Getting webhook from registry using the code. """ from hurricane.webhooks import webhook_registry return webhook_registry.get(cls.code)
[docs] def run(self, url: str, status: WebhookStatus, error_trace: str = None, close_loop: bool = False, loop=None): """ Initiates the sending of webhook in an asynchronous manner. Also specifies the callback of the async process, which handles the feedback and either logs success or failure of a webhook sending process. Parameters ---------- url : Url, which webhook should be sent to status : can be either WebhookStatus.FAILED or WebhookStatus.SUCCEEDED depending on the success or failure of the process, which should be indicated by the webhook error_trace : specifies the error trace of the preceding failure close_loop : specifies, whether the main loop should be closed or be left running """ if url: self.set_traceback(error_trace) self.set_status(status) self.set_timestamp() self.set_podname() self.set_version() current_loop = loop or asyncio.get_event_loop() executor = ThreadPoolExecutor(max_workers=1) fut = current_loop.run_in_executor(executor, self._send_webhook, self.get_message(), url, close_loop) # callback runs after run_in_executor is done callback_wrapper = functools.partial( self._callback_webhook_exception_check, url=url, close_loop=close_loop, loop=loop ) fut.add_done_callback(callback_wrapper) if not url and close_loop: logger.warning("No webhook can be sent, as no url is specified") self._callback_webhook_exception_check(future=None, url="", close_loop=True, loop=loop)
def _send_webhook(self, data: dict, webhook_url: str, close_loop: bool): # sending webhook request to the specified url logger.info(f"Start sending {self.code} webhook to {webhook_url}") response = requests.post(webhook_url, timeout=5, json=data) response.raise_for_status() def set_traceback(self, error_trace: str): self.data["traceback"] = error_trace def set_timestamp(self): self.data["timestamp"] = int(time.time()) def set_podname(self): self.data["podname"] = socket.gethostname() def set_status(self, status: WebhookStatus): self.data["status"] = status.value def set_version(self): if hasattr(settings, "HURRICANE_VERSION"): self.data["version"] = settings.HURRICANE_VERSION else: self.data["version"] = None def get_message(self): return self.data @staticmethod def _callback_webhook_exception_check( future: typing.Union[asyncio.Future, None], url: str, close_loop: bool, loop=None ): # checks if sending webhook had any failures, it indicates, that command was successfully executed # but sending webhook has failed if future: try: future.result() except RequestException as e: logger.warning(f"Sending webhook to {url} has failed due to {e}") if close_loop: logger.info("Loop will be closed") loop.stop()