import logging from celery import shared_task from django.conf import settings from django.core.mail import get_connection from django.core.mail.backends.base import BaseEmailBackend from djcelery_email.utils import dict_to_email, email_to_dict from desecapi import metrics logger = logging.getLogger(__name__) class MultiLaneEmailBackend(BaseEmailBackend): config = {"ignore_result": True} default_backend = "django.core.mail.backends.smtp.EmailBackend" def __init__(self, lane: str = None, fail_silently=False, **kwargs): lane = lane or next(iter(settings.TASK_CONFIG)) self.config.update(name=lane, queue=lane) self.config.update(settings.TASK_CONFIG[lane]) self.task_kwargs = kwargs.copy() # Make a copy to ensure we don't modify input dict when we set the 'lane' self.task_kwargs["debug"] = self.task_kwargs.pop("debug", {}).copy() self.task_kwargs["debug"]["lane"] = lane super().__init__(fail_silently) def send_messages(self, email_messages): dict_messages = [email_to_dict(msg) for msg in email_messages] TASKS[self.config["name"]].delay(dict_messages, **self.task_kwargs) return len(email_messages) @staticmethod def _run_task(messages, debug, **kwargs): logger.warning("Sending queued email, details: %s", debug) kwargs.setdefault( "backend", kwargs.pop("backbackend", MultiLaneEmailBackend.default_backend) ) with get_connection(**kwargs) as connection: return connection.send_messages( [dict_to_email(message) for message in messages] ) @property def task(self): return shared_task(**self.config)(self._run_task) # Define tasks so that Celery can discovery them TASKS = { name: MultiLaneEmailBackend(lane=name, fail_silently=True).task for name in settings.TASK_CONFIG if name.startswith("email_") }