from __future__ import annotations import email import json import logging import os import re import sqlite3 import uuid from collections import Counter, defaultdict from collections.abc import Iterable from datetime import datetime from datetime import timedelta as td from email.message import EmailMessage from itertools import islice from secrets import token_urlsafe from typing import Literal, TypedDict, cast from urllib.parse import urlencode, urlparse from uuid import UUID from zoneinfo import ZoneInfo from cronsim import CronSim, CronSimError from django.conf import settings from django.contrib import messages from django.contrib.auth.decorators import login_required from django.core import signing from django.core.exceptions import PermissionDenied from django.db.models import Case, Count, F, Q, QuerySet, When from django.db.models.functions import Substr from django.http import ( Http404, HttpRequest, HttpResponse, HttpResponseBadRequest, HttpResponseForbidden, JsonResponse, ) from django.shortcuts import get_object_or_404, redirect, render from django.template.loader import get_template, render_to_string from django.urls import reverse from django.utils.timezone import now from django.views.decorators.csrf import csrf_exempt from django.views.decorators.http import require_POST from django_stubs_ext import WithAnnotations from oncalendar import OnCalendar, OnCalendarError from pydantic import BaseModel, TypeAdapter, ValidationError from hc.accounts.http import AuthenticatedHttpRequest from hc.accounts.models import Member, Profile, Project from hc.api.models import ( DEFAULT_GRACE, DEFAULT_TIMEOUT, MAX_DURATION, Channel, Check, Flip, Notification, Ping, TokenBucket, ) from hc.api.transports import Signal, Telegram, TransportError from hc.front import forms from hc.front.decorators import require_setting from hc.front.templatetags.hc_extras import ( down_title, num_down_title, site_hostname, sortchecks, ) from hc.lib import curl from hc.lib.badges import get_badge_url from hc.lib.tz import all_timezones from hc.lib.urls import absolute_reverse logger = logging.getLogger(__name__) VALID_SORT_VALUES = ("name", "-name", "last_ping", "-last_ping", "created") STATUS_TEXT_TMPL = get_template("front/log_status_text.html") LAST_PING_TMPL = get_template("front/last_ping_cell.html") EVENTS_TMPL = get_template("front/details_events.html") DOWNTIMES_TMPL = get_template("front/details_downtimes.html") def _tags_counts(checks: Iterable[Check]) -> tuple[list[tuple[str, str, str]], int]: num_down = 0 grace = set() counts: Counter[str] = Counter() down_counts: Counter[str] = Counter() for check in checks: counts.update(check.tags_list()) if check.cached_status == "down": num_down += 1 down_counts.update(check.tags_list()) elif check.cached_status == "grace": grace.update(check.tags_list()) result = [] for tag in counts: if tag in down_counts: status = "down" text = f"{down_counts[tag]} of {counts[tag]} down" else: status = "grace" if tag in grace else "up" text = f"{counts[tag]} up" result.append((tag, status, text)) return result, num_down def _common_timezones(checks: Iterable[Check]) -> list[str]: counter: Counter[str] = Counter() for check in checks: counter[check.tz] += 1 return [tz for tz, _ in counter.most_common(3)] def _get_check_for_user( request: HttpRequest, code: UUID, preload_owner_profile: bool = False ) -> tuple[Check, bool]: """Return specified check if current user has access to it. If `preload_owner_profile` is `True`, the returned check's project.owner.profile will be already loaded. This helps avoid extra SQL queries if the caller later looks up the project owner's check_limit or ping_log_limit. """ assert request.user.is_authenticated q = Check.objects.select_related("project") if preload_owner_profile: q = q.select_related("project__owner__profile") check = get_object_or_404(q, code=code) if request.user.is_superuser: return check, True if request.user.id == check.project.owner_id: return check, True membership = get_object_or_404(Member, project=check.project, user=request.user) return check, membership.is_rw def _get_rw_check_for_user(request: HttpRequest, code: UUID) -> Check: check, rw = _get_check_for_user(request, code) if not rw: raise PermissionDenied return check def _get_channel_for_user(request: HttpRequest, code: UUID) -> tuple[Channel, bool]: """Return specified channel if current user has access to it.""" assert request.user.is_authenticated channel = get_object_or_404(Channel.objects.select_related("project"), code=code) if request.user.is_superuser: return channel, True if request.user.id == channel.project.owner_id: return channel, True membership = get_object_or_404(Member, project=channel.project, user=request.user) return channel, membership.is_rw def _get_rw_channel_for_user(request: HttpRequest, code: UUID) -> Channel: channel, rw = _get_channel_for_user(request, code) if not rw: raise PermissionDenied return channel def _get_project_for_user(request: HttpRequest, code: UUID) -> tuple[Project, bool]: """Check access, return (project, rw) tuple.""" project = get_object_or_404(Project, code=code) if request.user.is_superuser: return project, True if request.user.id == project.owner_id: return project, True membership = get_object_or_404(Member, project=project, user=request.user) return project, membership.is_rw def _get_rw_project_for_user(request: HttpRequest, code: UUID) -> Project: """Check access, return (project, rw) tuple.""" project, rw = _get_project_for_user(request, code) if not rw: raise PermissionDenied return project def _refresh_last_active_date(request: AuthenticatedHttpRequest) -> None: """Update last_active_date if it is more than a day old.""" profile = request.profile if profile.last_active_date is None or (now() - profile.last_active_date).days > 0: profile.last_active_date = now() profile.save() # Also modify session to trigger session cookie refresh # and push forward its expiry date: request.session["last_active"] = profile.last_active_date.timestamp() return None def _get_referer_qs(request: HttpRequest) -> str: parsed = urlparse(request.META.get("HTTP_REFERER", "")) if parsed.query: assert isinstance(parsed.query, str) return "?" + parsed.query return "" def _status_match(check: Check, statuses: set[str]) -> bool: if "started" in statuses and check.last_start: return True return check.cached_status in statuses @login_required def checks(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse: _refresh_last_active_date(request) project, rw = _get_project_for_user(request, code) if request.GET.get("sort") in VALID_SORT_VALUES: request.profile.sort = request.GET["sort"] request.profile.save() if request.GET.get("urls") in ("uuid", "slug") and rw: project.show_slugs = request.GET["urls"] == "slug" project.save() if request.session.get("last_project_id") != project.id: request.session["last_project_id"] = project.id q = Check.objects.filter(project=project) q = q.select_related("project") checks = list(q.prefetch_related("channel_set")) sortchecks(checks, request.profile.sort) tags_counts, num_down = _tags_counts(checks) tags_counts.sort(key=lambda item: item[0].lower()) is_group = Case(When(kind="group", then=0), default=1) channels = project.channel_set.annotate(is_group=is_group) # Sort groups first, then in the creation order channels = channels.order_by("is_group", "created") hidden_checks = set() # Hide checks that don't match selected tags: selected_tags = set(request.GET.getlist("tag", [])) if selected_tags: for check in checks: if not selected_tags.issubset(check.tags_list()): hidden_checks.add(check) # Hide checks that don't match the search string: search = request.GET.get("search", "") if search: for check in checks: haystack = f"{check.name}\n{check.slug}\n{check.code}" if search not in haystack.lower(): hidden_checks.add(check) # Hide checks that don't match status filters selected_statuses = set(request.GET.getlist("status", [])) if selected_statuses: for check in checks: if not _status_match(check, selected_statuses): hidden_checks.add(check) # Figure out which checks have ambiguous ping URLs seen, ambiguous = set(), set() if project.show_slugs: for check in checks: if check.slug and check.slug in seen: ambiguous.add(check.slug) else: seen.add(check.slug) # Do we need to show the "Last Duration" header? show_last_duration = False for check in checks: if check.clamped_last_duration(): show_last_duration = True break ctx = { "page": "checks", "rw": rw, "checks": checks, "channels": channels, "num_down": num_down, "tags": tags_counts, "ping_endpoint": settings.PING_ENDPOINT, "common_timezones": _common_timezones(checks), "timezones": all_timezones, "project": project, "num_available": project.num_checks_available(), "sort": request.profile.sort, "selected_tags": selected_tags, "selected_statuses": selected_statuses, "search": search, "hidden_checks": hidden_checks, "num_visible": len(checks) - len(hidden_checks), "ambiguous": ambiguous, "show_last_duration": show_last_duration, } return render(request, "front/checks.html", ctx) def status(request: HttpRequest, code: UUID) -> HttpResponse: if not request.user.is_authenticated: return HttpResponseForbidden() project, rw = _get_project_for_user(request, code) checks = list(Check.objects.filter(project=project)) details = [] for check in checks: ctx = {"check": check} details.append( { "code": str(check.code), "status": check.get_status(), "last_ping": LAST_PING_TMPL.render(ctx).strip(), "started": check.last_start is not None, } ) tags_counts, num_down = _tags_counts(checks) tags = {tag: (status, tooltip) for tag, status, tooltip in tags_counts} return JsonResponse( {"details": details, "tags": tags, "title": num_down_title(num_down)} ) @login_required @require_POST def switch_channel( request: AuthenticatedHttpRequest, code: UUID, channel_code: UUID ) -> HttpResponse: check = _get_rw_check_for_user(request, code) channel = get_object_or_404(Channel, code=channel_code) if channel.project_id != check.project_id: return HttpResponseBadRequest() if request.POST.get("state") == "on": channel.checks.add(check) else: channel.checks.remove(check) return HttpResponse() class ProjectStatus(TypedDict): status: str started: bool def _get_project_summary(profile: Profile) -> dict[UUID, ProjectStatus]: statuses: dict[UUID, ProjectStatus] = defaultdict( lambda: {"status": "up", "started": False} ) q = profile.checks_from_all_projects() q = q.annotate(project_code=F("project__code")) for check in q: summary = statuses[check.project_code] if check.last_start: summary["started"] = True if summary["status"] != "down": status = check.get_status() if status == "down" or (status == "grace" and summary["status"] == "up"): summary["status"] = status return statuses def index(request: HttpRequest) -> HttpResponse: if not request.user.is_authenticated: return redirect("hc-login") # We now know user is logged, tell the type checker request.profile exists- request = cast(AuthenticatedHttpRequest, request) _refresh_last_active_date(request) summary = _get_project_summary(request.profile) if "refresh" in request.GET: return JsonResponse({str(k): v for k, v in summary.items()}) q = request.profile.projects() q = q.annotate(n_checks=Count("check", distinct=True)) q = q.annotate(n_channels=Count("channel", distinct=True)) q = q.annotate(owner_email=F("owner__email")) projects = list(q) any_down = False for project in projects: setattr(project, "overall_status", summary[project.code]["status"]) setattr(project, "any_started", summary[project.code]["started"]) if summary[project.code]["status"] == "down": any_down = True # The list returned by projects() is already sorted . Do an additional sorting pass # to move projects with overall_status=down to the front (without changing their # relative order) projects.sort(key=lambda p: getattr(p, "overall_status") != "down") ctx = { "page": "projects", "projects": projects, "last_project_id": request.session.get("last_project_id"), "any_down": any_down, } return render(request, "front/projects.html", ctx) @login_required def projects_menu(request: AuthenticatedHttpRequest) -> HttpResponse: projects = list(request.profile.projects()) statuses: dict[int, str] = defaultdict(lambda: "up") for check in Check.objects.filter(project__in=projects): old_status = statuses[check.project_id] if old_status != "down": status = check.get_status() if status == "down" or (status == "grace" and old_status == "up"): statuses[check.project_id] = status for p in projects: setattr(p, "overall_status", statuses[p.id]) return render(request, "front/projects_menu.html", {"projects": projects}) def dashboard(request: HttpRequest) -> HttpResponse: return render(request, "front/dashboard.html", {}) def _replace_placeholders(doc: str, html: str) -> str: if doc.startswith("self_hosted"): return html limit = settings.PING_BODY_LIMIT or 100 if limit % 1000 == 0: limit_fmt = f"{limit // 1000} kB" else: limit_fmt = f"{limit} bytes" replaces = { "{{ default_timeout }}": str(int(DEFAULT_TIMEOUT.total_seconds())), "{{ default_grace }}": str(int(DEFAULT_GRACE.total_seconds())), "SITE_NAME": settings.SITE_NAME, "SITE_ROOT": settings.SITE_ROOT, "SITE_HOSTNAME": site_hostname(), "SITE_SCHEME": urlparse(settings.SITE_ROOT).scheme, "PING_ENDPOINT": settings.PING_ENDPOINT, "PING_URL": settings.PING_ENDPOINT + "your-uuid-here", "PING_BODY_LIMIT_FORMATTED": limit_fmt, "PING_BODY_LIMIT": str(limit), "IMG_URL": os.path.join(settings.STATIC_URL, "img/docs"), } for placeholder, value in replaces.items(): html = html.replace(placeholder, value) return html def serve_doc(request: HttpRequest, doc: str = "introduction") -> HttpResponse: # Filenames in /templates/docs/ consist of lowercase letters and underscores, # -- make sure we don't accept anything else if not re.match(r"^[0-9a-z_]+$", doc): raise Http404("not found") path = settings.BASE_DIR / f"templates/docs/{doc}.html-fragment" if not path.exists(): raise Http404("not found") with path.open("r", encoding="utf-8") as f: content = f.read() content = _replace_placeholders(doc, content) ctx = { "page": "docs", "section": doc, "content": content, "first_line": content.split("\n")[0], } return render(request, "front/docs_single.html", ctx) @csrf_exempt def docs_search(request: HttpRequest) -> HttpResponse: form = forms.SearchForm(request.GET) if not form.is_valid(): return render(request, "front/docs_search.html", {"results": []}) query = """ SELECT slug, title, snippet(docs, 2, '', '', '…', 10) FROM docs WHERE docs MATCH ? ORDER BY bm25(docs, 2.0, 10.0, 1.0) LIMIT 8 """ # Wrap the query in double quotes to get a valid FTS string # https://www.sqlite.org/fts5.html#full_text_query_syntax q = '"%s"' % form.cleaned_data["q"] con = sqlite3.connect(settings.BASE_DIR / "search.db") cur = con.cursor() res = cur.execute(query, (q,)) ctx = {"results": res.fetchall()} return render(request, "front/docs_search.html", ctx) def docs_cron(request: HttpRequest) -> HttpResponse: return render(request, "front/docs_cron.html", {"page": "docs-cron"}) @require_POST @login_required def add_check(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse: project = _get_rw_project_for_user(request, code) if project.num_checks_available() <= 0: return HttpResponseBadRequest() form = forms.AddCheckForm(request.POST) if not form.is_valid(): return HttpResponseBadRequest() check = Check(project=project) check.name = form.cleaned_data["name"] check.slug = form.cleaned_data["slug"] check.tags = form.cleaned_data["tags"] check.kind = form.cleaned_data["kind"] check.timeout = form.cleaned_data["timeout"] check.schedule = form.cleaned_data["schedule"] check.tz = form.cleaned_data["tz"] check.grace = form.cleaned_data["grace"] check.save() check.assign_all_channels() url = reverse("hc-checks", args=[project.code]) url += _get_referer_qs(request) # Preserve selected tags and search return redirect(url) @require_POST @login_required def update_name(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse: check = _get_rw_check_for_user(request, code) form = forms.NameTagsForm(request.POST) if form.is_valid(): check.name = form.cleaned_data["name"] check.slug = form.cleaned_data["slug"] check.tags = form.cleaned_data["tags"] check.desc = form.cleaned_data["desc"] check.save() if "/details/" in request.META.get("HTTP_REFERER", ""): return redirect("hc-details", code) url = reverse("hc-checks", args=[check.project.code]) url += _get_referer_qs(request) # Preserve selected tags and search return redirect(url) @require_POST @login_required def filtering_rules(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse: check = _get_rw_check_for_user(request, code) form = forms.FilteringRulesForm(request.POST) if form.is_valid(): check.filter_subject = form.cleaned_data["filter_subject"] check.filter_body = form.cleaned_data["filter_body"] check.start_kw = form.cleaned_data["start_kw"] check.success_kw = form.cleaned_data["success_kw"] check.failure_kw = form.cleaned_data["failure_kw"] check.methods = form.cleaned_data["methods"] check.manual_resume = form.cleaned_data["manual_resume"] check.save() return redirect("hc-details", code) @require_POST @login_required def update_timeout(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse: check = _get_rw_check_for_user(request, code) kind = request.POST.get("kind") if kind == "simple": simple_form = forms.TimeoutForm(request.POST) if not simple_form.is_valid(): return HttpResponseBadRequest() check.kind = "simple" check.timeout = simple_form.cleaned_data["timeout"] check.grace = simple_form.cleaned_data["grace"] elif kind == "cron": cron_form = forms.CronForm(request.POST) if not cron_form.is_valid(): return HttpResponseBadRequest() check.kind = "cron" check.schedule = cron_form.cleaned_data["schedule"] check.tz = cron_form.cleaned_data["tz"] check.grace = cron_form.cleaned_data["grace"] elif kind == "oncalendar": oncalendar_form = forms.OnCalendarForm(request.POST) if not oncalendar_form.is_valid(): return HttpResponseBadRequest() check.kind = "oncalendar" check.schedule = oncalendar_form.cleaned_data["schedule"] check.tz = oncalendar_form.cleaned_data["tz"] check.grace = oncalendar_form.cleaned_data["grace"] check.alert_after = check.going_down_after() if check.status == "up": assert check.alert_after if check.alert_after < now(): # Checks can flip from "up" to "down" state as a result of changing check's # schedule. We don't want to send notifications when changing schedule # interactively in the web UI. So we update the `alert_after` and `status` # fields, and create a Flip object here the same way as `sendalerts` would # do, but without sending an actual alert. # # We need to create the Flip object because otherwise the calculation # in Check.downtimes() will come out wrong (when this check later comes up, # we will have no record of when it went down). check.create_flip("down", mark_as_processed=True) check.alert_after = None check.status = "down" check.save() if "/details/" in request.META.get("HTTP_REFERER", ""): return redirect("hc-details", code) url = reverse("hc-checks", args=[check.project.code]) url += _get_referer_qs(request) # Preserve selected tags and search return redirect(url) @require_POST def cron_preview(request: HttpRequest) -> HttpResponse: schedule = request.POST.get("schedule", "") tz = request.POST.get("tz") ctx: dict[str, object] = {"tz": tz} if tz not in all_timezones: ctx["bad_tz"] = True return render(request, "front/cron_preview.html", ctx) now_local = now().astimezone(ZoneInfo(tz)) try: it = CronSim(schedule, now_local) ctx["dates"] = list(islice(it, 0, 6)) ctx["desc"] = it.explain() except CronSimError: ctx["bad_schedule"] = True if not ctx.get("dates"): ctx["bad_schedule"] = True return render(request, "front/cron_preview.html", ctx) @require_POST @login_required def oncalendar_preview(request: HttpRequest) -> HttpResponse: schedule = request.POST.get("schedule", "") tz = request.POST.get("tz") ctx: dict[str, object] = {"tz": tz, "dates": []} if tz not in all_timezones: ctx["bad_tz"] = True return render(request, "front/oncalendar_preview.html", ctx) now_local = now().astimezone(ZoneInfo(tz)) try: it = OnCalendar(schedule, now_local) iterations = 6 if tz == "UTC" else 4 ctx["dates"] = list(islice(it, 0, iterations)) except OnCalendarError: ctx["bad_schedule"] = True if not ctx["dates"]: ctx["bad_schedule"] = True return render(request, "front/oncalendar_preview.html", ctx) def validate_schedule(request: HttpRequest) -> HttpResponse: kind = request.GET.get("kind", "") iterator: type[CronSim] | type[OnCalendar] if kind == "cron": iterator = CronSim elif kind == "oncalendar": iterator = OnCalendar else: return HttpResponseBadRequest() schedule = request.GET.get("schedule", "") result = True try: # Does cronsim/oncalendar accept the schedule? it = iterator(schedule, now()) # Can it calculate the next datetime? next(it) except (CronSimError, OnCalendarError, StopIteration): result = False return JsonResponse({"result": result}) @login_required def ping_details( request: AuthenticatedHttpRequest, code: UUID, n: int | None = None ) -> HttpResponse: check, rw = _get_check_for_user(request, code) q = Ping.objects.filter(owner=check) if n: q = q.filter(n=n) else: # When n is not specified, look up the most recent success or failure, # ignoring "start", "log", "ign" events q = q.exclude(kind__in=("start", "log", "ign")) try: ping = q.latest("created") except Ping.DoesNotExist: return render(request, "front/ping_details_not_found.html") body = ping.get_body() ctx = { "check": check, "ping": ping, "body": body, "plain": None, "html": None, "active": None, } if ping.scheme == "email" and body: parsed = email.message_from_string(body, policy=email.policy.SMTP) assert isinstance(parsed, EmailMessage) ctx["subject"] = parsed.get("subject", "") # The "active" tab is set to show the value that's successfully parsed last. # Per the current implementation, this means that if both plain text and HTML # content are present, the ping details dialog will initially display the HTML # content, otherwise - only one content type exists, and we default to that # (either plain text or HTML, at least one of them should exist in a # valid email). # # NOTE: If both plain text and html have not been parsed successfully the # "active" tab is not set at all, but currently this is not an issue since in # this case the "ping details" template does not render any tabs. plain_mime_part = parsed.get_body(("plain",)) if plain_mime_part: assert isinstance(plain_mime_part, EmailMessage) ctx["plain"] = plain_mime_part.get_content() ctx["active"] = "plain" html_mime_part = parsed.get_body(("html",)) if html_mime_part: assert isinstance(html_mime_part, EmailMessage) ctx["html"] = html_mime_part.get_content() ctx["active"] = "html" return render(request, "front/ping_details.html", ctx) @login_required def ping_body(request: AuthenticatedHttpRequest, code: UUID, n: int) -> HttpResponse: check, rw = _get_check_for_user(request, code) ping = get_object_or_404(Ping, owner=check, n=n) body = ping.get_body_bytes() if not body: raise Http404("not found") response = HttpResponse(body, content_type="application/octet-stream") filename = "%s-%s" % (check.code, ping.n) response["Content-Disposition"] = f'attachment; filename="{filename}.txt"' return response @require_POST @login_required def pause(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse: check = _get_rw_check_for_user(request, code) # Track the status change for correct downtime calculation in Check.downtimes() check.create_flip("paused", mark_as_processed=True) check.status = "paused" check.last_start = None check.alert_after = None check.save() # After pausing a check we must check if all checks are up, # and Profile.next_nag_date needs to be cleared out: check.project.update_next_nag_dates() # Don't redirect after an AJAX request: if request.META.get("HTTP_X_REQUESTED_WITH") == "XMLHttpRequest": return HttpResponse() return redirect("hc-details", code) @require_POST @login_required def resume(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse: check = _get_rw_check_for_user(request, code) if check.status != "paused": return HttpResponseBadRequest() check.create_flip("new", mark_as_processed=True) check.status = "new" check.last_start = None check.last_ping = None check.alert_after = None check.save() return redirect("hc-details", code) @require_POST @login_required def remove_check(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse: check = _get_rw_check_for_user(request, code) project = check.project check.lock_and_delete() return redirect("hc-checks", project.code) @require_POST @login_required def clear_events(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse: check = _get_rw_check_for_user(request, code) check.status = "new" check.last_ping = None check.last_start = None check.last_duration = None check.has_confirmation_link = False check.alert_after = None check.save() check.ping_set.all().delete() check.notification_set.all().delete() check.flip_set.all().delete() return redirect("hc-details", code) class PingAnnotations(TypedDict): body_raw_preview: bytes def _get_events( check: Check, page_limit: int, start: datetime, end: datetime, kinds: tuple[str, ...] | None = None, ) -> list[Notification | WithAnnotations[Ping, PingAnnotations] | Flip]: # Sorting by "n" instead of "id" is important here. Both give the same # query results, but sorting by "id" can cause postgres to pick # api_ping.id index (slow if the api_ping table is big). Sorting by # "n" works around the problem--postgres picks the api_ping.owner_id index. pq = check.visible_pings.order_by("-n") pq = pq.filter(created__gte=start, created__lte=end) if kinds is not None: kinds_filter = Q(kind__in=kinds) if "success" in kinds: kinds_filter = kinds_filter | Q(kind__isnull=True) | Q(kind="") pq = pq.filter(kinds_filter) # Optimization: defer loading body_raw, instead load its first 150 bytes # as "body_raw_preview". This reduces both network I/O to database, and disk I/O # on the database host if the database contains large request bodies. pq = pq.defer("body_raw") pq = pq.annotate(body_raw_preview=Substr("body_raw", 1, 151)) pings = list(pq[:page_limit]) # Optimization: the template will access Ping.duration, which would generate a # SQL query per displayed ping. Since we've already fetched a list of pings, # for some of them we can calculate durations more efficiently, without causing # additional SQL queries: starts: dict[UUID | None, datetime | None] = {} num_misses = 0 for ping in reversed(pings): if ping.kind == "start": starts[ping.rid] = ping.created elif ping.kind in (None, "", "fail"): if ping.rid not in starts: # We haven't seen a start, success or fail event for this rid. # Will need to fall back to Ping.duration(). num_misses += 1 else: ping.duration = None matching_start = starts[ping.rid] if matching_start is not None: if ping.created - matching_start < MAX_DURATION: ping.duration = ping.created - matching_start starts[ping.rid] = None # If we will need to fall back to Ping.duration() more than 10 times # then disable duration display altogether: if num_misses > 10: for ping in pings: ping.duration = None alerts: list[Notification] = [] if kinds and "notification" in kinds: aq = check.notification_set.order_by("-created") aq = aq.filter(created__gte=start, created__lte=end, check_status="down") aq = aq.select_related("channel") alerts = list(aq[:page_limit]) flips: list[Flip] = [] if kinds is None or "flip" in kinds: fq = check.flip_set.order_by("-created") fq = fq.filter(created__gte=start, created__lte=end) flips = list(fq[:page_limit]) events = pings + alerts + flips # Sort events by the timestamp. # If timestamps are equal, put flips chronologically after pings events.sort(key=lambda el: (el.created, isinstance(el, Flip)), reverse=True) return events[:page_limit] @login_required def log(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse: check, rw = _get_check_for_user(request, code, preload_owner_profile=True) smin = check.created smax = now() oldest_ping = check.visible_pings.order_by("n").first() if oldest_ping: smin = max(smin, oldest_ping.created) events = _get_events(check, 1000, start=smin, end=smax) ctx = { "page": "log", "project": check.project, "check": check, "min": smin, "max": smax, "events": events, "oldest_ping": oldest_ping, } if events: # A full precision timestamp of the most recent event. # This will be used client-side for fetching live updates to specify # "return any events after *this* point". ctx["last_event_timestamp"] = events[0].created.timestamp() return render(request, "front/log.html", ctx) @login_required def details(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse: _refresh_last_active_date(request) check, rw = _get_check_for_user(request, code, preload_owner_profile=True) if request.GET.get("urls") in ("uuid", "slug") and rw: check.project.show_slugs = request.GET["urls"] == "slug" check.project.save() all_channels = check.project.channel_set.order_by("created") regular_channels: list[Channel] = [] group_channels: list[Channel] = [] for channel in all_channels: channels = group_channels if channel.kind == "group" else regular_channels channels.append(channel) all_tags = set() sibling_checks = Check.objects.filter(project=check.project).only("tags", "tz") for sibling in sibling_checks: if sibling.tags: all_tags.update(sibling.tags.split(" ")) ctx = { "page": "details", "project": check.project, "check": check, "rw": rw, "channels": regular_channels, "group_channels": group_channels, "enabled_channels": list(check.channel_set.all()), "common_timezones": _common_timezones(sibling_checks), "timezones": all_timezones, "downtimes": check.downtimes(3, request.profile.tz), "tz": request.profile.tz, "is_copied": "copied" in request.GET, "all_tags": " ".join(sorted(all_tags)), } return render(request, "front/details.html", ctx) @login_required def uncloak(request: AuthenticatedHttpRequest, unique_key: str) -> HttpResponse: for check in request.profile.checks_from_all_projects().only("code"): if check.unique_key == unique_key: return redirect("hc-details", check.code) raise Http404("not found") @login_required def transfer(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse: check = _get_rw_check_for_user(request, code) if request.method == "POST": form = forms.TransferForm(request.POST) if not form.is_valid(): return HttpResponseBadRequest() target_project = _get_rw_project_for_user(request, form.cleaned_data["project"]) if target_project.owner_id != check.project.owner_id: if target_project.num_checks_available() <= 0: return HttpResponseBadRequest() check.project = target_project check.save() check.assign_all_channels() messages.success(request, "Check transferred successfully!") return redirect("hc-details", code) ctx = {"check": check} return render(request, "front/transfer_modal.html", ctx) @require_POST @login_required def copy(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse: check = _get_rw_check_for_user(request, code) if check.project.num_checks_available() <= 0: return HttpResponseBadRequest() new_name = check.name + " (copy)" # Make sure we don't exceed the 100 character db field limit: if len(new_name) > 100: new_name = check.name[:90] + "... (copy)" new_slug = check.slug + "-copy" if len(new_slug) > 100: new_slug = "" copied = Check(project=check.project) copied.name = new_name copied.slug = new_slug copied.desc, copied.tags = check.desc, check.tags copied.filter_subject = check.filter_subject copied.filter_body = check.filter_body copied.start_kw = check.start_kw copied.success_kw = check.success_kw copied.failure_kw = check.failure_kw copied.methods = check.methods copied.manual_resume = check.manual_resume copied.kind = check.kind copied.timeout, copied.grace = check.timeout, check.grace copied.schedule, copied.tz = check.schedule, check.tz copied.save() copied.channel_set.add(*check.channel_set.all()) url = reverse("hc-details", args=[copied.code]) return redirect(url + "?copied") def status_single(request: HttpRequest, code: UUID) -> HttpResponse: if not request.user.is_authenticated: return HttpResponseForbidden() # We now know user is logged, tell the type checker request.profile exists- request = cast(AuthenticatedHttpRequest, request) check, rw = _get_check_for_user(request, code, preload_owner_profile=True) status = check.get_status() events = _get_events(check, 30, start=check.created, end=now()) updated = "1" if len(events): updated = str(events[0].created.timestamp()) doc = { "status": status, "status_text": STATUS_TEXT_TMPL.render({"check": check, "rw": rw}), "title": down_title(check), "updated": updated, "started": check.last_start is not None, } if updated != request.GET.get("u"): doc["events"] = EVENTS_TMPL.render({"check": check, "events": events}) downtimes = check.downtimes(3, request.profile.tz) doc["downtimes"] = DOWNTIMES_TMPL.render( {"downtimes": downtimes, "tz": request.profile.tz} ) return JsonResponse(doc) @login_required def badges(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse: project, rw = _get_project_for_user(request, code) if request.method == "POST": form = forms.BadgeSettingsForm(request.POST) if not form.is_valid(): return HttpResponseBadRequest() fmt = form.cleaned_data["fmt"] states = form.cleaned_data["states"] with_late = True if states == "3" else False if form.cleaned_data["target"] == "all": label = settings.MASTER_BADGE_LABEL url = get_badge_url(project.badge_key, "*", fmt, with_late) elif form.cleaned_data["target"] == "tag": label = form.cleaned_data["tag"] url = get_badge_url(project.badge_key, label, fmt, with_late) elif form.cleaned_data["target"] == "check": check = project.check_set.get(code=form.cleaned_data["check"]) url = absolute_reverse( "hc-badge-check", args=[states, check.prepare_badge_key(), fmt] ) label = check.name_then_code() if fmt == "shields": url = "https://img.shields.io/endpoint?" + urlencode({"url": url}) ctx = {"fmt": fmt, "label": label, "url": url} return render(request, "front/badges_preview.html", ctx) checks = list(project.check_set.order_by("name")) tags = set() for check in checks: tags.update(check.tags_list()) sorted_tags = sorted(tags, key=lambda s: s.lower()) ctx = { "project": project, "page": "badges", "checks": checks, "tags": sorted_tags, "fmt": "svg", "label": settings.MASTER_BADGE_LABEL, "url": get_badge_url(project.badge_key, "*"), } return render(request, "front/badges.html", ctx) @login_required def channels(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse: project, rw = _get_project_for_user(request, code) if request.method == "POST": if not rw: return HttpResponseForbidden() channel_code = request.POST["channel"] try: channel = Channel.objects.get(code=channel_code) except Channel.DoesNotExist: return HttpResponseBadRequest() if channel.project_id != project.id: return HttpResponseForbidden() new_checks = [] for key in request.POST: if key.startswith("check-"): check_code = key[6:] try: check = Check.objects.get(code=check_code) except Check.DoesNotExist: return HttpResponseBadRequest() if check.project_id != project.id: return HttpResponseForbidden() new_checks.append(check) channel.checks.set(new_checks) return redirect("hc-channels", project.code) channels = project.channel_set.annotate(n_checks=Count("checks")) # Sort groups first, then in the creation order channels = channels.annotate(is_group=Case(When(kind="group", then=0), default=1)) channels = channels.order_by("is_group", "created") ctx = { "page": "channels", "rw": rw, "project": project, "profile": project.owner_profile, "channels": channels, "enable_apprise": settings.APPRISE_ENABLED is True, "enable_call": bool(settings.TWILIO_AUTH), "enable_discord": bool(settings.DISCORD_CLIENT_ID), "enable_matrix": bool(settings.MATRIX_ACCESS_TOKEN), "enable_mattermost": settings.MATTERMOST_ENABLED is True, "enable_msteams": settings.MSTEAMS_ENABLED is True, "enable_opsgenie": settings.OPSGENIE_ENABLED is True, "enable_pagertree": settings.PAGERTREE_ENABLED is True, "enable_pd": settings.PD_ENABLED is True, "enable_prometheus": settings.PROMETHEUS_ENABLED is True, "enable_pushbullet": bool(settings.PUSHBULLET_CLIENT_ID), "enable_pushover": bool(settings.PUSHOVER_API_TOKEN), "enable_rocketchat": settings.ROCKETCHAT_ENABLED is True, "enable_shell": settings.SHELL_ENABLED is True, "enable_signal": bool(settings.SIGNAL_CLI_SOCKET), "enable_slack": settings.SLACK_ENABLED is True, "enable_slack_btn": bool(settings.SLACK_CLIENT_ID), "enable_sms": bool(settings.TWILIO_AUTH), "enable_spike": settings.SPIKE_ENABLED is True, "enable_telegram": bool(settings.TELEGRAM_TOKEN), "enable_trello": bool(settings.TRELLO_APP_KEY), "enable_victorops": settings.VICTOROPS_ENABLED is True, "enable_webhooks": settings.WEBHOOKS_ENABLED is True, "enable_whatsapp": settings.TWILIO_USE_WHATSAPP, "enable_zulip": settings.ZULIP_ENABLED is True, "use_payments": settings.USE_PAYMENTS, } return render(request, "front/channels.html", ctx) @login_required def channel_checks(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse: channel = _get_rw_channel_for_user(request, code) assigned = set(channel.checks.values_list("code", flat=True).distinct()) checks = channel.project.check_set.order_by("created") ctx = {"checks": checks, "assigned": assigned, "channel": channel} return render(request, "front/channel_checks.html", ctx) @require_POST @login_required def update_channel_name(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse: channel = _get_rw_channel_for_user(request, code) form = forms.ChannelNameForm(request.POST) if form.is_valid(): channel.name = form.cleaned_data["name"] channel.save() return redirect("hc-channels", channel.project.code) def verify_email(request: HttpRequest, code: UUID, token: str) -> HttpResponse: channel = get_object_or_404(Channel, code=code) if channel.make_token() == token: channel.email_verified = True channel.save() return render(request, "front/verify_email_success.html") return render(request, "bad_link.html") @csrf_exempt def unsubscribe_email( request: HttpRequest, code: UUID, signed_token: str ) -> HttpResponse: ctx = {} # Some email servers open links in emails to check for malicious content. # To work around this, on GET requests we serve a confirmation form. # If the signature is at least 5 minutes old, we also include JS code to # auto-submit the form. signer = signing.TimestampSigner(salt="alerts") # First, check the signature without looking at the timestamp: try: token = signer.unsign(signed_token) except signing.BadSignature: return render(request, "bad_link.html") # Then, check if timestamp is older than 5 minutes: try: signer.unsign(signed_token, max_age=300) except signing.SignatureExpired: ctx["autosubmit"] = True channel = get_object_or_404(Channel, code=code, kind="email") if channel.make_token() != token: return render(request, "bad_link.html") if request.method != "POST": return render(request, "accounts/unsubscribe_submit.html", ctx) channel.delete() return render(request, "front/unsubscribe_success.html") @require_POST @login_required def send_test_notification( request: AuthenticatedHttpRequest, code: UUID ) -> HttpResponse: channel, rw = _get_channel_for_user(request, code) dummy = Check(name="TEST", status="down", project=channel.project) dummy.last_ping = now() - td(days=1) dummy.n_pings = 42 dummy_flip = Flip(owner=dummy) dummy_flip.created = now() dummy_flip.old_status = "up" dummy_flip.new_status = "down" # Delete all older test notifications for this channel Notification.objects.filter(channel=channel, owner=None).delete() # Send the test notification error = channel.notify(dummy_flip, is_test=True) if error == "no-op": # This channel may be configured to send "up" notifications only. dummy_flip.old_status = "down" dummy_flip.new_status = "up" error = channel.notify(dummy_flip, is_test=True) if error: messages.warning(request, "Could not send a test notification. %s." % error) else: messages.success(request, "Test notification sent!") return redirect("hc-channels", channel.project.code) @require_POST @login_required def remove_channel(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse: channel = _get_rw_channel_for_user(request, code) project = channel.project channel.delete() return redirect("hc-channels", project.code) def email_form(request: AuthenticatedHttpRequest, channel: Channel) -> HttpResponse: adding = channel._state.adding if request.method == "POST": form = forms.EmailForm(request.POST) if form.is_valid(): if channel.disabled or form.cleaned_data["value"] != channel.email.value: channel.disabled = False if not settings.EMAIL_USE_VERIFICATION: # In self-hosted setting, administrator can set # EMAIL_USE_VERIFICATION=False to disable email verification channel.email_verified = True elif form.cleaned_data["value"] == request.user.email: # If the user is adding *their own* address # we skip the verification step channel.email_verified = True else: channel.email_verified = False channel.value = form.get_value() channel.save() if adding: channel.assign_all_checks() if not channel.email_verified: channel.send_verify_link() return redirect("hc-channels", channel.project.code) elif adding: form = forms.EmailForm() else: form = forms.EmailForm( { "value": channel.email.value, "up": channel.email.notify_up, "down": channel.email.notify_down, } ) ctx = { "page": "channels", "project": channel.project, "use_verification": settings.EMAIL_USE_VERIFICATION, "form": form, "is_new": adding, } return render(request, "integrations/email_form.html", ctx) @login_required def add_email(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse: project = _get_rw_project_for_user(request, code) channel = Channel(project=project, kind="email") return email_form(request, channel) @login_required def edit_channel(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse: channel = _get_rw_channel_for_user(request, code) if channel.kind == "email": return email_form(request, channel) elif channel.kind == "webhook": return webhook_form(request, channel) elif channel.kind == "sms": return sms_form(request, channel) elif channel.kind == "signal": return signal_form(request, channel) elif channel.kind == "whatsapp": return whatsapp_form(request, channel) elif channel.kind == "ntfy": return ntfy_form(request, channel) elif channel.kind == "group": return group_form(request, channel) return HttpResponseBadRequest() @require_setting("WEBHOOKS_ENABLED") def webhook_form(request: HttpRequest, channel: Channel) -> HttpResponse: adding = channel._state.adding if request.method == "POST": form = forms.WebhookForm(request.POST) if form.is_valid(): channel.name = form.cleaned_data["name"] channel.value = form.get_value() channel.save() if adding: channel.assign_all_checks() return redirect("hc-channels", channel.project.code) elif adding: form = forms.WebhookForm() else: def flatten(d: dict[str, str]) -> str: return "\n".join("%s: %s" % pair for pair in d.items()) doc = json.loads(channel.value) doc["headers_down"] = flatten(doc["headers_down"]) doc["headers_up"] = flatten(doc["headers_up"]) doc["name"] = channel.name form = forms.WebhookForm(doc) ctx = { "page": "channels", "project": channel.project, "form": form, "is_new": adding, } return render(request, "integrations/webhook_form.html", ctx) @require_setting("WEBHOOKS_ENABLED") @login_required def add_webhook(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse: project = _get_rw_project_for_user(request, code) channel = Channel(project=project, kind="webhook") return webhook_form(request, channel) @require_setting("SHELL_ENABLED") @login_required def add_shell(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse: project = _get_rw_project_for_user(request, code) if request.method == "POST": form = forms.AddShellForm(request.POST) if form.is_valid(): channel = Channel(project=project, kind="shell") channel.value = form.get_value() channel.save() channel.assign_all_checks() return redirect("hc-channels", project.code) else: form = forms.AddShellForm() ctx = { "page": "channels", "project": project, "form": form, } return render(request, "integrations/add_shell.html", ctx) @require_setting("PD_ENABLED") @login_required def add_pd(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse: project = _get_rw_project_for_user(request, code) # Simple Install Flow if settings.PD_APP_ID: state = token_urlsafe() redirect_url = absolute_reverse("hc-add-pd-complete") redirect_url += "?" + urlencode({"state": state}) install_url = "https://app.pagerduty.com/install/integration?" + urlencode( {"app_id": settings.PD_APP_ID, "redirect_url": redirect_url, "version": "2"} ) ctx = {"page": "channels", "project": project, "install_url": install_url} request.session["pagerduty"] = (state, str(project.code)) return render(request, "integrations/add_pd_simple.html", ctx) if request.method == "POST": form = forms.AddPdForm(request.POST) if form.is_valid(): channel = Channel(project=project, kind="pd") channel.value = form.cleaned_data["value"] channel.save() channel.assign_all_checks() return redirect("hc-channels", project.code) else: form = forms.AddPdForm() ctx = {"page": "channels", "project": project, "form": form} return render(request, "integrations/add_pd.html", ctx) @require_setting("PD_ENABLED") @require_setting("PD_APP_ID") @login_required def add_pd_complete(request: AuthenticatedHttpRequest) -> HttpResponse: if "pagerduty" not in request.session: return HttpResponseBadRequest() state, code_str = request.session.pop("pagerduty") code = UUID(code_str) if request.GET.get("state") != state: return HttpResponseForbidden() project = _get_rw_project_for_user(request, code) doc = json.loads(request.GET["config"]) for item in doc["integration_keys"]: channel = Channel(kind="pd", project=project) channel.name = item["name"] channel.value = json.dumps( {"service_key": item["integration_key"], "account": doc["account"]["name"]} ) channel.save() channel.assign_all_checks() messages.success(request, "The PagerDuty integration has been added!") return redirect("hc-channels", project.code) @require_setting("PD_ENABLED") @require_setting("PD_APP_ID") def pd_help(request: HttpRequest) -> HttpResponse: ctx = {"page": "channels"} return render(request, "integrations/add_pd_simple.html", ctx) @require_setting("PAGERTREE_ENABLED") @login_required def add_pagertree(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse: project = _get_rw_project_for_user(request, code) if request.method == "POST": form = forms.AddUrlForm(request.POST) if form.is_valid(): channel = Channel(project=project, kind="pagertree") channel.value = form.cleaned_data["value"] channel.save() channel.assign_all_checks() return redirect("hc-channels", project.code) else: form = forms.AddUrlForm() ctx = {"page": "channels", "project": project, "form": form} return render(request, "integrations/add_pagertree.html", ctx) @require_setting("SLACK_ENABLED") @login_required def add_slack(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse: project = _get_rw_project_for_user(request, code) if request.method == "POST": form = forms.AddUrlForm(request.POST) if form.is_valid(): channel = Channel(project=project, kind="slack") channel.value = form.cleaned_data["value"] channel.save() channel.assign_all_checks() return redirect("hc-channels", project.code) else: form = forms.AddUrlForm() ctx = { "page": "channels", "form": form, } return render(request, "integrations/add_slack.html", ctx) @require_setting("SLACK_ENABLED") @require_setting("SLACK_CLIENT_ID") def slack_help(request: HttpRequest) -> HttpResponse: ctx = {"page": "channels"} return render(request, "integrations/add_slack_btn.html", ctx) @require_setting("SLACK_ENABLED") @require_setting("SLACK_CLIENT_ID") @login_required def add_slack_btn(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse: project = _get_rw_project_for_user(request, code) state = token_urlsafe() authorize_url = "https://slack.com/oauth/v2/authorize?" + urlencode( { "scope": "incoming-webhook", "client_id": settings.SLACK_CLIENT_ID, "state": state, } ) ctx = { "project": project, "page": "channels", "authorize_url": authorize_url, } request.session["add_slack"] = (state, str(project.code)) return render(request, "integrations/add_slack_btn.html", ctx) @require_setting("SLACK_ENABLED") @require_setting("SLACK_CLIENT_ID") @login_required def add_slack_complete(request: AuthenticatedHttpRequest) -> HttpResponse: if "add_slack" not in request.session: return HttpResponseForbidden() state, code_str = request.session.pop("add_slack") code = UUID(code_str) project = _get_rw_project_for_user(request, code) if request.GET.get("error") == "access_denied": messages.warning(request, "Slack setup was cancelled.") return redirect("hc-channels", project.code) if request.GET.get("state") != state: return HttpResponseForbidden() data = { "client_id": settings.SLACK_CLIENT_ID, "client_secret": settings.SLACK_CLIENT_SECRET, "code": request.GET.get("code"), } result = curl.post("https://slack.com/api/oauth.v2.access", data) doc = result.json() if not isinstance(doc, dict) or not doc.get("ok"): messages.warning( request, "Received an unexpected response from Slack. Integration not added.", ) logger.warning("Unexpected Slack OAuth response: %s", result.content) return redirect("hc-channels", project.code) channel = Channel(kind="slack", project=project) channel.value = result.text if channel.slack_channel: channel.name = channel.slack_channel channel.save() channel.assign_all_checks() messages.success(request, "Success, integration added!") return redirect("hc-channels", project.code) @require_setting("MATTERMOST_ENABLED") def mattermost_help(request: HttpRequest) -> HttpResponse: return render(request, "integrations/add_mattermost.html") @require_setting("MATTERMOST_ENABLED") @login_required def add_mattermost(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse: project = _get_rw_project_for_user(request, code) if request.method == "POST": form = forms.AddUrlForm(request.POST) if form.is_valid(): channel = Channel(project=project, kind="mattermost") channel.value = form.cleaned_data["value"] channel.save() channel.assign_all_checks() return redirect("hc-channels", project.code) else: form = forms.AddUrlForm() ctx = {"page": "channels", "form": form, "project": project} return render(request, "integrations/add_mattermost.html", ctx) @require_setting("ROCKETCHAT_ENABLED") def rocketchat_help(request: HttpRequest) -> HttpResponse: return render(request, "integrations/add_rocketchat.html") @require_setting("ROCKETCHAT_ENABLED") @login_required def add_rocketchat(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse: project = _get_rw_project_for_user(request, code) if request.method == "POST": form = forms.AddUrlForm(request.POST) if form.is_valid(): channel = Channel(project=project, kind="rocketchat") channel.value = form.cleaned_data["value"] channel.save() channel.assign_all_checks() return redirect("hc-channels", project.code) else: form = forms.AddUrlForm() ctx = {"page": "channels", "form": form, "project": project} return render(request, "integrations/add_rocketchat.html", ctx) @require_setting("PUSHBULLET_CLIENT_ID") @login_required def add_pushbullet(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse: project = _get_rw_project_for_user(request, code) state = token_urlsafe() authorize_url = "https://www.pushbullet.com/authorize?" + urlencode( { "client_id": settings.PUSHBULLET_CLIENT_ID, "redirect_uri": absolute_reverse(add_pushbullet_complete), "response_type": "code", "state": state, } ) ctx = { "page": "channels", "project": project, "authorize_url": authorize_url, } request.session["add_pushbullet"] = (state, str(project.code)) return render(request, "integrations/add_pushbullet.html", ctx) class PushbulletOAuthResponse(BaseModel): access_token: str @require_setting("PUSHBULLET_CLIENT_ID") @login_required def add_pushbullet_complete(request: AuthenticatedHttpRequest) -> HttpResponse: if "add_pushbullet" not in request.session: return HttpResponseForbidden() state, code_str = request.session.pop("add_pushbullet") code = UUID(code_str) project = _get_rw_project_for_user(request, code) if request.GET.get("error") == "access_denied": messages.warning(request, "Pushbullet setup was cancelled.") return redirect("hc-channels", project.code) if request.GET.get("state") != state: return HttpResponseForbidden() data = { "client_id": settings.PUSHBULLET_CLIENT_ID, "client_secret": settings.PUSHBULLET_CLIENT_SECRET, "code": request.GET.get("code"), "grant_type": "authorization_code", } result = curl.post("https://api.pushbullet.com/oauth2/token", data) try: doc = PushbulletOAuthResponse.model_validate_json(result.content, strict=True) except ValidationError: logger.warning("Unexpected Pushbullet OAuth response: %s", result.content) messages.warning( request, "Received an unexpected response from Pushbullet. Integration not added.", ) return redirect("hc-channels", project.code) channel = Channel(kind="pushbullet", project=project) channel.value = doc.access_token channel.save() channel.assign_all_checks() messages.success(request, "The Pushbullet integration has been added!") return redirect("hc-channels", project.code) @require_setting("DISCORD_CLIENT_ID") @login_required def add_discord(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse: project = _get_rw_project_for_user(request, code) state = token_urlsafe() auth_url = "https://discordapp.com/api/oauth2/authorize?" + urlencode( { "client_id": settings.DISCORD_CLIENT_ID, "scope": "webhook.incoming", "redirect_uri": absolute_reverse(add_discord_complete), "response_type": "code", "state": state, } ) ctx = {"page": "channels", "project": project, "authorize_url": auth_url} request.session["add_discord"] = (state, str(project.code)) return render(request, "integrations/add_discord.html", ctx) @require_setting("DISCORD_CLIENT_ID") @login_required def add_discord_complete(request: AuthenticatedHttpRequest) -> HttpResponse: if "add_discord" not in request.session: return HttpResponseForbidden() state, code_str = request.session.pop("add_discord") code = UUID(code_str) project = _get_rw_project_for_user(request, code) if request.GET.get("error") == "access_denied": messages.warning(request, "Discord setup was cancelled.") return redirect("hc-channels", project.code) if request.GET.get("state") != state: return HttpResponseForbidden() data = { "client_id": settings.DISCORD_CLIENT_ID, "client_secret": settings.DISCORD_CLIENT_SECRET, "code": request.GET.get("code"), "grant_type": "authorization_code", "redirect_uri": absolute_reverse(add_discord_complete), } result = curl.post("https://discordapp.com/api/oauth2/token", data) doc = result.json() if isinstance(doc, dict) and doc.get("code") == 30007: e = "maximum number of webhooks reached" messages.warning(request, f"Response from Discord: {e}. Integration not added.") return redirect("hc-channels", project.code) if not isinstance(doc, dict) or "access_token" not in doc: messages.warning( request, "Received an unexpected response from Discord. Integration not added.", ) logger.warning("Unexpected Discord OAuth response: %s", result.content) return redirect("hc-channels", project.code) channel = Channel(kind="discord", project=project) channel.value = result.text channel.save() channel.assign_all_checks() messages.success(request, "The Discord integration has been added!") return redirect("hc-channels", project.code) @require_setting("PUSHOVER_API_TOKEN") def pushover_help(request: HttpRequest) -> HttpResponse: ctx = {"page": "channels"} return render(request, "integrations/add_pushover_help.html", ctx) @require_setting("PUSHOVER_API_TOKEN") @login_required def add_pushover(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse: project = _get_rw_project_for_user(request, code) if request.method == "POST": state = token_urlsafe().lower() failure_url = absolute_reverse("hc-channels", args=[project.code]) success_url = absolute_reverse("hc-add-pushover", args=[project.code]) success_url += "?" + urlencode( { "state": state, "prio": request.POST.get("po_priority", "0"), "prio_up": request.POST.get("po_priority_up", "0"), } ) assert settings.PUSHOVER_SUBSCRIPTION_URL subscription_url = ( settings.PUSHOVER_SUBSCRIPTION_URL + "?" + urlencode({"success": success_url, "failure": failure_url}) ) request.session["pushover"] = state return redirect(subscription_url) # Handle successful subscriptions if "pushover_user_key" in request.GET: if "pushover" not in request.session: return HttpResponseForbidden() state = request.session.pop("pushover") if request.GET.get("state") != state: return HttpResponseForbidden() if request.GET.get("pushover_unsubscribed") == "1": # Unsubscription: delete all Pushover channels for this project Channel.objects.filter(project=project, kind="po").delete() return redirect("hc-channels", project.code) form = forms.AddPushoverForm(request.GET) if not form.is_valid(): return HttpResponseBadRequest() channel = Channel(project=project, kind="po") channel.value = form.get_value() channel.save() channel.assign_all_checks() messages.success(request, "The Pushover integration has been added!") return redirect("hc-channels", project.code) # Show Integration Settings form ctx = { "page": "channels", "project": project, "po_retry_delay": td(seconds=settings.PUSHOVER_EMERGENCY_RETRY_DELAY), "po_expiration": td(seconds=settings.PUSHOVER_EMERGENCY_EXPIRATION), } return render(request, "integrations/add_pushover.html", ctx) @require_setting("OPSGENIE_ENABLED") @login_required def add_opsgenie(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse: project = _get_rw_project_for_user(request, code) if request.method == "POST": form = forms.AddOpsgenieForm(request.POST) if form.is_valid(): channel = Channel(project=project, kind="opsgenie") v = {"region": form.cleaned_data["region"], "key": form.cleaned_data["key"]} channel.value = json.dumps(v) channel.save() channel.assign_all_checks() return redirect("hc-channels", project.code) else: form = forms.AddOpsgenieForm() ctx = {"page": "channels", "project": project, "form": form} return render(request, "integrations/add_opsgenie.html", ctx) @require_setting("VICTOROPS_ENABLED") @login_required def add_victorops(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse: project = _get_rw_project_for_user(request, code) if request.method == "POST": form = forms.AddUrlForm(request.POST) if form.is_valid(): channel = Channel(project=project, kind="victorops") channel.value = form.cleaned_data["value"] channel.save() channel.assign_all_checks() return redirect("hc-channels", project.code) else: form = forms.AddUrlForm() ctx = {"page": "channels", "project": project, "form": form} return render(request, "integrations/add_victorops.html", ctx) @require_setting("ZULIP_ENABLED") @login_required def add_zulip(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse: project = _get_rw_project_for_user(request, code) if request.method == "POST": form = forms.AddZulipForm(request.POST) if form.is_valid(): channel = Channel(project=project, kind="zulip") channel.value = form.get_value() channel.save() channel.assign_all_checks() return redirect("hc-channels", project.code) else: form = forms.AddZulipForm() ctx = {"page": "channels", "project": project, "form": form} return render(request, "integrations/add_zulip.html", ctx) class TelegramChat(BaseModel): id: int type: Literal["group", "private", "supergroup", "channel"] title: str | None = None username: str | None = None class TelegramMessage(BaseModel): chat: TelegramChat text: str message_thread_id: int | None = None class TelegramCallback(BaseModel): message: TelegramMessage @classmethod def load(self, data: bytes) -> TelegramCallback: doc = json.loads(data.decode()) if "channel_post" in doc: # Telegram's "channel_post" key uses the same structure as "message". # To keep the validation and view logic simple, if the payload # contains "channel_post", copy it to "message", and proceed as usual. doc["message"] = doc["channel_post"] return TelegramCallback.model_validate(doc, strict=True) @csrf_exempt @require_POST def telegram_bot(request: HttpRequest) -> HttpResponse: try: doc = TelegramCallback.load(request.body) except ValidationError: # We don't recognize the message format, but don't want Telegram # retrying this over and over again, so respond with 200 OK return HttpResponse() except ValueError: return HttpResponseBadRequest() if "/start" not in doc.message.text: return HttpResponse() chat = doc.message.chat recipient = { "id": chat.id, "type": chat.type, "name": chat.title or chat.username, "thread_id": doc.message.message_thread_id, } invite = render_to_string( "integrations/telegram_invite.html", {"qs": signing.dumps(recipient)}, ) try: Telegram.send(chat.id, doc.message.message_thread_id, invite) except TransportError: # Swallow the error and return HTTP 200 OK, otherwise Telegram will # hit the webhook again and again. pass return HttpResponse() @require_setting("TELEGRAM_TOKEN") def telegram_help(request: HttpRequest) -> HttpResponse: ctx = { "page": "channels", "bot_name": settings.TELEGRAM_BOT_NAME, } return render(request, "integrations/add_telegram.html", ctx) @require_setting("TELEGRAM_TOKEN") @login_required def add_telegram(request: AuthenticatedHttpRequest) -> HttpResponse: recipient = None if qs := request.META["QUERY_STRING"]: try: recipient = signing.loads(qs, max_age=600) assert isinstance(recipient, dict) except signing.BadSignature: return render(request, "bad_link.html") if request.method == "POST": form = forms.AddTelegramForm(request.POST) if not form.is_valid(): return HttpResponseBadRequest() project = _get_rw_project_for_user(request, form.cleaned_data["project"]) channel = Channel(project=project, kind="telegram") channel.value = json.dumps(recipient) channel.save() channel.assign_all_checks() messages.success(request, "The Telegram integration has been added!") return redirect("hc-channels", project.code) ctx = { "page": "channels", "projects": request.profile.projects(), "recipient": recipient, "bot_name": settings.TELEGRAM_BOT_NAME, } return render(request, "integrations/add_telegram.html", ctx) @require_setting("TWILIO_AUTH") def sms_form(request: HttpRequest, channel: Channel) -> HttpResponse: adding = channel._state.adding if request.method == "POST": form = forms.PhoneUpDownForm(request.POST) if form.is_valid(): channel.name = form.cleaned_data["label"] channel.value = form.get_json() channel.save() if adding: channel.assign_all_checks() return redirect("hc-channels", channel.project.code) elif adding: form = forms.PhoneUpDownForm(initial={"up": False}) else: form = forms.PhoneUpDownForm( { "label": channel.name, "phone": channel.phone.value, "up": channel.phone.notify_up, "down": channel.phone.notify_down, } ) ctx = { "page": "channels", "project": channel.project, "form": form, "profile": channel.project.owner_profile, "is_new": adding, } return render(request, "integrations/sms_form.html", ctx) @require_setting("TWILIO_AUTH") @login_required def add_sms(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse: project = _get_rw_project_for_user(request, code) channel = Channel(project=project, kind="sms") return sms_form(request, channel) @require_setting("TWILIO_AUTH") @login_required def add_call(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse: project = _get_rw_project_for_user(request, code) if request.method == "POST": form = forms.PhoneNumberForm(request.POST) if form.is_valid(): channel = Channel(project=project, kind="call") channel.name = form.cleaned_data["label"] channel.value = form.get_json() channel.save() channel.assign_all_checks() return redirect("hc-channels", project.code) else: form = forms.PhoneNumberForm() ctx = { "page": "channels", "project": project, "form": form, "profile": project.owner_profile, } return render(request, "integrations/add_call.html", ctx) @require_setting("TWILIO_USE_WHATSAPP") def whatsapp_form(request: HttpRequest, channel: Channel) -> HttpResponse: adding = channel._state.adding if request.method == "POST": form = forms.PhoneUpDownForm(request.POST) if form.is_valid(): channel.name = form.cleaned_data["label"] channel.value = form.get_json() channel.save() if adding: channel.assign_all_checks() return redirect("hc-channels", channel.project.code) elif adding: form = forms.PhoneUpDownForm() else: form = forms.PhoneUpDownForm( { "label": channel.name, "phone": channel.phone.value, "up": channel.phone.notify_up, "down": channel.phone.notify_down, } ) ctx = { "page": "channels", "project": channel.project, "form": form, "profile": channel.project.owner_profile, "is_new": adding, } return render(request, "integrations/whatsapp_form.html", ctx) @require_setting("TWILIO_USE_WHATSAPP") @login_required def add_whatsapp(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse: project = _get_rw_project_for_user(request, code) channel = Channel(project=project, kind="whatsapp") return whatsapp_form(request, channel) @require_setting("SIGNAL_CLI_SOCKET") def signal_form(request: HttpRequest, channel: Channel) -> HttpResponse: adding = channel._state.adding if request.method == "POST": form = forms.PhoneUpDownForm(request.POST) if form.is_valid(): channel.name = form.cleaned_data["label"] channel.value = form.get_json() channel.save() if adding: channel.assign_all_checks() return redirect("hc-channels", channel.project.code) elif adding: form = forms.PhoneUpDownForm() else: form = forms.PhoneUpDownForm( { "label": channel.name, "phone": channel.phone.value, "up": channel.phone.notify_up, "down": channel.phone.notify_down, } ) ctx = { "page": "channels", "project": channel.project, "form": form, "is_new": adding, } return render(request, "integrations/signal_form.html", ctx) @require_setting("SIGNAL_CLI_SOCKET") @login_required def add_signal(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse: project = _get_rw_project_for_user(request, code) channel = Channel(project=project, kind="signal") return signal_form(request, channel) @require_setting("TRELLO_APP_KEY") @login_required def add_trello(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse: project = _get_rw_project_for_user(request, code) if request.method == "POST": form = forms.AddTrelloForm(request.POST) if not form.is_valid(): return HttpResponseBadRequest() channel = Channel(project=project, kind="trello") channel.value = form.get_value() channel.save() channel.assign_all_checks() return redirect("hc-channels", project.code) return_url = absolute_reverse("hc-add-trello", args=[project.code]) authorize_url = "https://trello.com/1/authorize?" + urlencode( { "expiration": "never", "name": settings.SITE_NAME, "scope": "read,write", "response_type": "token", "key": settings.TRELLO_APP_KEY, "return_url": return_url, } ) ctx = { "page": "channels", "project": project, "authorize_url": authorize_url, } return render(request, "integrations/add_trello.html", ctx) @require_setting("MATRIX_ACCESS_TOKEN") @login_required def add_matrix(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse: project = _get_rw_project_for_user(request, code) if request.method == "POST": form = forms.AddMatrixForm(request.POST) if form.is_valid(): channel = Channel(project=project, kind="matrix") channel.value = form.cleaned_data["room_id"] # If user supplied room alias instead of ID, use it as channel name alias = form.cleaned_data["alias"] if not alias.startswith("!"): channel.name = alias channel.save() channel.assign_all_checks() messages.success(request, "The Matrix integration has been added!") return redirect("hc-channels", project.code) else: form = forms.AddMatrixForm() ctx = { "page": "channels", "project": project, "form": form, "matrix_user_id": settings.MATRIX_USER_ID, } return render(request, "integrations/add_matrix.html", ctx) @require_setting("APPRISE_ENABLED") @login_required def add_apprise(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse: project = _get_rw_project_for_user(request, code) if request.method == "POST": form = forms.AddAppriseForm(request.POST) if form.is_valid(): channel = Channel(project=project, kind="apprise") channel.value = form.cleaned_data["url"] channel.save() channel.assign_all_checks() messages.success(request, "The Apprise integration has been added!") return redirect("hc-channels", project.code) else: form = forms.AddAppriseForm() ctx = {"page": "channels", "project": project, "form": form} return render(request, "integrations/add_apprise.html", ctx) class TrelloList(BaseModel): id: str name: str class TrelloBoard(BaseModel): id: str name: str lists: list[TrelloList] TrelloBoards = TypeAdapter(list[TrelloBoard]) @require_setting("TRELLO_APP_KEY") @login_required @require_POST def trello_settings(request: AuthenticatedHttpRequest) -> HttpResponse: token = request.POST.get("token", "") url = "https://api.trello.com/1/members/me/boards" assert settings.TRELLO_APP_KEY params = { "key": settings.TRELLO_APP_KEY, "token": token, "filter": "open", "fields": "id,name", "lists": "open", "list_fields": "id,name", } result = curl.get(url, params) try: boards = TrelloBoards.validate_json(result.content) except ValidationError: logger.warning("Unexpected Trello API response: %s", result.content) return render(request, "integrations/trello_settings.html", {"error": 1}) num_lists = sum(len(board.lists) for board in boards) ctx = {"token": token, "boards": boards, "num_lists": num_lists} return render(request, "integrations/trello_settings.html", ctx) @require_setting("MSTEAMS_ENABLED") @login_required def add_msteams(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse: project = _get_rw_project_for_user(request, code) if request.method == "POST": form = forms.AddUrlForm(request.POST) if form.is_valid(): channel = Channel(project=project, kind="msteamsw") channel.value = form.cleaned_data["value"] channel.save() channel.assign_all_checks() return redirect("hc-channels", project.code) else: form = forms.AddUrlForm() ctx = {"page": "channels", "project": project, "form": form} return render(request, "integrations/add_msteams.html", ctx) @require_setting("PROMETHEUS_ENABLED") @login_required def add_prometheus(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse: project, rw = _get_project_for_user(request, code) ctx = { "page": "channels", "project": project, "site_scheme": urlparse(settings.SITE_ROOT).scheme, } return render(request, "integrations/add_prometheus.html", ctx) @require_setting("PROMETHEUS_ENABLED") def metrics(request: HttpRequest, code: UUID, key: str) -> HttpResponse: if len(key) != 32: return HttpResponseBadRequest() q = Project.objects.filter(code=code, api_key_readonly=key) try: project = q.get() except Project.DoesNotExist: return HttpResponseForbidden() checks = Check.objects.filter(project_id=project.id).order_by("id") def esc(s: str) -> str: return s.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n") def output(checks: QuerySet[Check]) -> Iterable[str]: help = "Whether the check is currently up (1 for yes, 0 for no)." yield f"# HELP hc_check_up {help}\n" yield "# TYPE hc_check_up gauge\n" TMPL = """hc_check_up{name="%s", tags="%s", unique_key="%s"} %d\n""" for check in checks: value = 0 if check.get_status() == "down" else 1 yield TMPL % (esc(check.name), esc(check.tags), check.unique_key, value) yield "\n" help = "Whether the check is currently started (1 for yes, 0 for no)." yield f"# HELP hc_check_started {help}\n" yield "# TYPE hc_check_started gauge\n" TMPL = """hc_check_started{name="%s", tags="%s", unique_key="%s"} %d\n""" for check in checks: value = 1 if check.last_start is not None else 0 yield TMPL % (esc(check.name), esc(check.tags), check.unique_key, value) all_tags, down_tags, num_down = set(), set(), 0 for check in checks: all_tags.update(check.tags_list()) if check.get_status() == "down": num_down += 1 down_tags.update(check.tags_list()) yield "\n" help = "Whether all checks with this tag are up (1 for yes, 0 for no)." yield f"# HELP hc_tag_up {help}\n" yield "# TYPE hc_tag_up gauge\n" TMPL = """hc_tag_up{tag="%s"} %d\n""" for tag in sorted(all_tags): value = 0 if tag in down_tags else 1 yield TMPL % (esc(tag), value) yield "\n" yield "# HELP hc_checks_total The total number of checks.\n" yield "# TYPE hc_checks_total gauge\n" yield "hc_checks_total %d\n" % len(checks) yield "\n" yield "# HELP hc_checks_down_total The number of checks currently down.\n" yield "# TYPE hc_checks_down_total gauge\n" yield "hc_checks_down_total %d\n" % num_down return HttpResponse(output(checks), content_type="text/plain") @require_setting("SPIKE_ENABLED") @login_required def add_spike(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse: project = _get_rw_project_for_user(request, code) if request.method == "POST": form = forms.AddUrlForm(request.POST) if form.is_valid(): channel = Channel(project=project, kind="spike") channel.value = form.cleaned_data["value"] channel.save() channel.assign_all_checks() return redirect("hc-channels", project.code) else: form = forms.AddUrlForm() ctx = {"page": "channels", "project": project, "form": form} return render(request, "integrations/add_spike.html", ctx) @login_required def add_gotify(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse: project = _get_rw_project_for_user(request, code) if request.method == "POST": form = forms.AddGotifyForm(request.POST) if form.is_valid(): channel = Channel(project=project, kind="gotify") channel.value = form.get_value() channel.save() channel.assign_all_checks() return redirect("hc-channels", project.code) else: form = forms.AddGotifyForm() ctx = {"page": "channels", "project": project, "form": form} return render(request, "integrations/add_gotify.html", ctx) def group_form(request: HttpRequest, channel: Channel) -> HttpResponse: adding = channel._state.adding if request.method == "POST": form = forms.GroupForm(request.POST, project=channel.project) if form.is_valid(): channel.name = form.cleaned_data["label"] channel.value = form.get_value() channel.save() if adding: channel.assign_all_checks() return redirect("hc-channels", channel.project.code) elif adding: form = forms.GroupForm(project=channel.project) else: # Filter out unavailable channels channels = list(channel.group_channels.values_list("code", flat=True)) form = forms.GroupForm( {"channels": channels, "label": channel.name}, project=channel.project ) ctx = {"page": "channels", "project": channel.project, "form": form} return render(request, "integrations/group_form.html", ctx) @login_required def add_group(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse: project = _get_rw_project_for_user(request, code) channel = Channel(project=project, kind="group") return group_form(request, channel) def ntfy_form(request: HttpRequest, channel: Channel) -> HttpResponse: adding = channel._state.adding if request.method == "POST": form = forms.NtfyForm(request.POST) if form.is_valid(): channel.value = form.get_value() channel.save() if adding: channel.assign_all_checks() return redirect("hc-channels", channel.project.code) elif adding: form = forms.NtfyForm() else: form = forms.NtfyForm( { "topic": channel.ntfy.topic, "url": channel.ntfy.url, "priority": channel.ntfy.priority, "priority_up": channel.ntfy.priority_up, "token": channel.ntfy.token, } ) ctx = {"page": "channels", "project": channel.project, "form": form} return render(request, "integrations/ntfy_form.html", ctx) @login_required def add_ntfy(request: AuthenticatedHttpRequest, code: UUID) -> HttpResponse: project = _get_rw_project_for_user(request, code) channel = Channel(project=project, kind="ntfy") return ntfy_form(request, channel) @require_setting("SIGNAL_CLI_SOCKET") @login_required def signal_captcha(request: AuthenticatedHttpRequest) -> HttpResponse: if not request.user.is_superuser: return HttpResponseForbidden() ctx = {"challenge": request.GET.get("challenge", "")} if request.method == "POST": challenge = request.POST.get("challenge", "") captcha = request.POST.get("captcha", "") if captcha.startswith("signalcaptcha://"): captcha = captcha[16:] payload = { "jsonrpc": "2.0", "method": "submitRateLimitChallenge", "params": {"challenge": str(challenge), "captcha": captcha}, "id": str(uuid.uuid4()), } payload_bytes = (json.dumps(payload) + "\n").encode() for reply_bytes in Signal._read_replies(payload_bytes): try: reply = json.loads(reply_bytes.decode()) except ValueError: ctx["result"] = "submitRateLimitChallenge failed" break if reply.get("id") == payload["id"]: ctx["result"] = reply_bytes.decode() break return render(request, "front/signal_captcha.html", ctx) @require_setting("SIGNAL_CLI_SOCKET") @login_required @require_POST def verify_signal_number(request: AuthenticatedHttpRequest) -> HttpResponse: def render_result(result: str | None) -> HttpResponse: return render(request, "integrations/signal_result.html", {"result": result}) # Enforce per-account rate limit (50 verifications per day) if not TokenBucket.authorize_signal_verification(request.user): return render_result("Verification rate limit exceeded") form = forms.PhoneNumberForm(request.POST) if not form.is_valid(): return render_result("Invalid phone number") phone = form.cleaned_data["phone"] # Enforce per-recipient rate limit (6 messages per minute) if not TokenBucket.authorize_signal(phone): return render_result("Verification rate limit exceeded") try: Signal.send(phone, f"Test message from {settings.SITE_NAME}") except TransportError as e: return render_result(e.message) # Success! return render_result(None) def log_events(request: HttpRequest, code: UUID) -> HttpResponse: if not request.user.is_authenticated: return HttpResponseForbidden() check, rw = _get_check_for_user(request, code, preload_owner_profile=True) form = forms.LogFiltersForm(request.GET) if not form.is_valid(): return HttpResponseBadRequest() if form.cleaned_data["u"]: # We are live-loading more events start = form.cleaned_data["u"] + td(microseconds=1) end = now() else: # We're applying new filters start = check.created end = form.cleaned_data["end"] or now() # clamp start to the date of the oldest visible ping oldest_ping = check.visible_pings.order_by("n").first() if oldest_ping: start = max(start, oldest_ping.created) events = _get_events(check, 1000, start=start, end=end, kinds=form.kinds()) response = render(request, "front/log_rows.html", {"events": events}) if events: # Include a full precision timestamp of the most recent event in a # response header. This will be used client-side for fetching live updates # to specify "return any events after *this* point". response["X-Last-Event-Timestamp"] = str(events[0].created.timestamp()) return response # Forks: add custom views after this line