123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213 |
- import json
- import re
- from hashlib import sha1
- import requests
- from django.conf import settings
- from django.core.exceptions import SuspiciousOperation
- from desecapi import metrics
- from desecapi.exceptions import PDNSException, RequestEntityTooLarge
- SUPPORTED_RRSET_TYPES = {
- # https://doc.powerdns.com/authoritative/appendices/types.html
- # "major" types
- "A",
- "AAAA",
- "AFSDB",
- "ALIAS",
- "APL",
- "CAA",
- "CERT",
- "CDNSKEY",
- "CDS",
- "CNAME",
- "CSYNC",
- "DNSKEY",
- "DNAME",
- "DS",
- "HINFO",
- "HTTPS",
- "KEY",
- "LOC",
- "MX",
- "NAPTR",
- "NS",
- "NSEC",
- "NSEC3",
- "NSEC3PARAM",
- "OPENPGPKEY",
- "PTR",
- "RP",
- "RRSIG",
- "SOA",
- "SPF",
- "SSHFP",
- "SRV",
- "SVCB",
- "TLSA",
- "SMIMEA",
- "TXT",
- "URI",
- # "additional" types, without obsolete ones
- "DHCID",
- "DLV",
- "EUI48",
- "EUI64",
- "IPSECKEY",
- "KX",
- "MINFO",
- "MR",
- "RKEY",
- "WKS",
- # https://doc.powerdns.com/authoritative/changelog/4.5.html#change-4.5.0-alpha1-New-Features
- "NID",
- "L32",
- "L64",
- "LP",
- }
- NSLORD = object()
- NSMASTER = object()
- _config = {
- NSLORD: {
- "base_url": settings.NSLORD_PDNS_API,
- "headers": {
- "Accept": "application/json",
- "User-Agent": "desecapi",
- "X-API-Key": settings.NSLORD_PDNS_API_TOKEN,
- },
- },
- NSMASTER: {
- "base_url": settings.NSMASTER_PDNS_API,
- "headers": {
- "Accept": "application/json",
- "User-Agent": "desecapi",
- "X-API-Key": settings.NSMASTER_PDNS_API_TOKEN,
- },
- },
- }
- def _pdns_request(method, *, server, path, data=None):
- if data is not None:
- data = json.dumps(data)
- if data is not None and len(data) > settings.PDNS_MAX_BODY_SIZE:
- raise RequestEntityTooLarge
- r = requests.request(
- method,
- _config[server]["base_url"] + path,
- data=data,
- headers=_config[server]["headers"],
- )
- if r.status_code not in range(200, 300):
- raise PDNSException(response=r)
- metrics.get("desecapi_pdns_request_success").labels(method, r.status_code).inc()
- return r
- def _pdns_post(server, path, data):
- return _pdns_request("post", server=server, path=path, data=data)
- def _pdns_patch(server, path, data):
- return _pdns_request("patch", server=server, path=path, data=data)
- def _pdns_get(server, path):
- return _pdns_request("get", server=server, path=path)
- def _pdns_put(server, path):
- return _pdns_request("put", server=server, path=path)
- def _pdns_delete(server, path):
- return _pdns_request("delete", server=server, path=path)
- def pdns_id(name):
- # See also pdns code, apiZoneNameToId() in ws-api.cc (with the exception of forward slash)
- if not re.match(r"^[a-zA-Z0-9_.-]+$", name):
- raise SuspiciousOperation("Invalid hostname " + name)
- name = name.translate(str.maketrans({"/": "=2F", "_": "=5F"}))
- return name.rstrip(".") + "."
- def get_keys(domain):
- """
- Retrieves a dict representation of the DNSSEC key information
- """
- r = _pdns_get(NSLORD, "/zones/%s/cryptokeys" % pdns_id(domain.name))
- metrics.get("desecapi_pdns_keys_fetched").inc()
- field_map = {
- "dnskey": "dnskey",
- "cds": "ds",
- "flags": "flags", # deprecated
- "keytype": "keytype", # deprecated
- }
- return [
- {v: key.get(k, []) for k, v in field_map.items()}
- for key in r.json()
- if key["published"]
- ]
- def get_zone(domain):
- """
- Retrieves a dict representation of the zone from pdns
- """
- r = _pdns_get(NSLORD, "/zones/" + pdns_id(domain.name))
- return r.json()
- def get_rrset_datas(domain):
- """
- Retrieves a dict representation of the RRsets in a given zone
- """
- return [
- {
- "domain": domain,
- "subname": rrset["name"][: -(len(domain.name) + 2)],
- "type": rrset["type"],
- "records": [record["content"] for record in rrset["records"]],
- "ttl": rrset["ttl"],
- }
- for rrset in get_zone(domain)["rrsets"]
- ]
- def construct_catalog_rrset(
- zone=None, delete=False, subname=None, qtype="PTR", rdata=None
- ):
- # subname can be generated from zone for convenience; exactly one needs to be given
- assert (zone is None) ^ (subname is None)
- # sanity check: one can't delete an rrset and give record data at the same time
- assert not (delete and rdata)
- if subname is None:
- zone = zone.rstrip(".") + "."
- m_unique = sha1(zone.encode()).hexdigest()
- subname = f"{m_unique}.zones"
- if rdata is None:
- rdata = zone
- return {
- "name": f"{subname}.{settings.CATALOG_ZONE}".strip(".") + ".",
- "type": qtype,
- "ttl": 0, # as per the specification
- "changetype": "REPLACE",
- "records": [] if delete else [{"content": rdata, "disabled": False}],
- }
- def get_serials():
- return {
- zone["name"]: zone["edited_serial"]
- for zone in _pdns_get(NSMASTER, "/zones").json()
- }
|