from ipaddress import IPv4Network import re from itertools import product from math import ceil, floor from django.conf import settings from django.core.exceptions import ValidationError from django.core.management import call_command from rest_framework import status from desecapi.models import Domain, RR, RRset from desecapi.models.records import RR_SET_TYPES_AUTOMATIC, RR_SET_TYPES_UNSUPPORTED from desecapi.tests.base import DesecTestCase, AuthenticatedRRSetBaseTestCase class UnauthenticatedRRSetTestCase(DesecTestCase): def test_unauthorized_access(self): url = self.reverse("v1:rrsets", name="example.com") for method in [ self.client.get, self.client.post, self.client.put, self.client.delete, self.client.patch, ]: response = method(url) self.assertStatus(response, status.HTTP_401_UNAUTHORIZED) class AuthenticatedRRSetTestCase(AuthenticatedRRSetBaseTestCase): def test_subname_validity(self): for subname in [ "aEroport", "AEROPORT", "aéroport", "a" * 64, ]: with self.assertRaises(ValidationError): RRset(domain=self.my_domain, subname=subname, ttl=60, type="A").save() for subname in [ "aeroport", "a" * 63, ]: RRset(domain=self.my_domain, subname=subname, ttl=60, type="A").save() def test_retrieve_my_rr_sets(self): for response in [ self.client.get_rr_sets(self.my_domain.name), self.client.get_rr_sets(self.my_domain.name, subname=""), ]: self.assertStatus(response, status.HTTP_200_OK) self.assertEqual(len(response.data), 1, response.data) def test_retrieve_my_rr_sets_pagination(self): def convert_links(links): mapping = {} for link in links.split(", "): _url, label = link.split("; ") label = re.search('rel="(.*)"', label).group(1) _url = _url[1:-1] assert label not in mapping mapping[label] = _url return mapping def assertPaginationResponse( response, expected_length, expected_directional_links=[] ): self.assertStatus(response, status.HTTP_200_OK) self.assertEqual(len(response.data), expected_length) _links = convert_links(response["Link"]) self.assertEqual( len(_links), len(expected_directional_links) + 1 ) # directional links, plus "first" self.assertTrue(_links["first"].endswith("/?cursor=")) for directional_link in expected_directional_links: self.assertEqual( _links["first"].find("/?cursor="), _links[directional_link].find("/?cursor="), ) self.assertTrue(len(_links[directional_link]) > len(_links["first"])) # Prepare extra records so that we get three pages (total: n + 1) n = int(settings.REST_FRAMEWORK["PAGE_SIZE"] * 2.5) RRset.objects.bulk_create( [ RRset(domain=self.my_domain, subname=str(i), ttl=123, type="A") for i in range(n) ] ) # No pagination response = self.client.get_rr_sets(self.my_domain.name) self.assertStatus(response, status.HTTP_400_BAD_REQUEST) self.assertEqual( response.data["detail"], f'Pagination required. You can query up to {settings.REST_FRAMEWORK["PAGE_SIZE"]} items at a time ({n+1} total). ' "Please use the `first` page link (see Link header).", ) links = convert_links(response["Link"]) self.assertEqual(len(links), 1) self.assertTrue(links["first"].endswith("/?cursor=")) # First page url = links["first"] response = self.client.get(url) assertPaginationResponse( response, settings.REST_FRAMEWORK["PAGE_SIZE"], ["next"] ) # Next url = convert_links(response["Link"])["next"] response = self.client.get(url) assertPaginationResponse( response, settings.REST_FRAMEWORK["PAGE_SIZE"], ["next", "prev"] ) data_next = response.data.copy() # Next-next (last) page url = convert_links(response["Link"])["next"] response = self.client.get(url) assertPaginationResponse(response, n / 5 + 1, ["prev"]) # Prev url = convert_links(response["Link"])["prev"] response = self.client.get(url) assertPaginationResponse( response, settings.REST_FRAMEWORK["PAGE_SIZE"], ["next", "prev"] ) # Make sure that one step forward equals two steps forward and one step back self.assertEqual(response.data, data_next) def test_retrieve_other_rr_sets(self): self.assertStatus( self.client.get_rr_sets(self.other_domain.name), status.HTTP_404_NOT_FOUND ) self.assertStatus( self.client.get_rr_sets(self.other_domain.name, subname="test"), status.HTTP_404_NOT_FOUND, ) self.assertStatus( self.client.get_rr_sets(self.other_domain.name, type="A"), status.HTTP_404_NOT_FOUND, ) def test_retrieve_my_rr_sets_filter(self): response = self.client.get_rr_sets(self.my_rr_set_domain.name, query="?cursor=") self.assertStatus(response, status.HTTP_200_OK) expected_number_of_rrsets = min( len(self._test_rr_sets()), settings.REST_FRAMEWORK["PAGE_SIZE"] ) self.assertEqual(len(response.data), expected_number_of_rrsets) for subname in self.SUBNAMES: response = self.client.get_rr_sets( self.my_rr_set_domain.name, subname=subname ) self.assertStatus(response, status.HTTP_200_OK) self.assertRRSetsCount( response.data, [dict(subname=subname)], count=len(self._test_rr_sets(subname=subname)), ) for type_ in self.ALLOWED_TYPES: response = self.client.get_rr_sets(self.my_rr_set_domain.name, type=type_) self.assertStatus(response, status.HTTP_200_OK) def test_create_my_rr_sets(self): for subname in [ None, "create-my-rr-sets", "foo.create-my-rr-sets", "bar.baz.foo.create-my-rr-sets", ]: for data in [ {"subname": subname, "records": ["1.2.3.4"], "ttl": 3660, "type": "A"}, { "subname": "" if subname is None else subname, "records": ["desec.io."], "ttl": 36900, "type": "PTR", }, { "subname": "" if subname is None else subname, "ttl": 3650, "type": "TXT", "records": ['"foo"'], }, { "subname": f"{subname}.cname".lower(), "ttl": 3600, "type": "CNAME", "records": ["example.com."], }, ]: # Try POST with missing subname if data["subname"] is None: data.pop("subname") with self.assertPdnsRequests( self.requests_desec_rr_sets_update(name=self.my_empty_domain.name) ): response = self.client.post_rr_set( domain_name=self.my_empty_domain.name, **data ) self.assertStatus(response, status.HTTP_201_CREATED) self.assertTrue( all( field in response.data for field in [ "created", "domain", "subname", "name", "records", "ttl", "type", "touched", ] ) ) self.assertEqual( self.my_empty_domain.touched, max( rrset.touched for rrset in self.my_empty_domain.rrset_set.all() ), ) # Check for uniqueness on second attempt response = self.client.post_rr_set( domain_name=self.my_empty_domain.name, **data ) self.assertContains( response, "Another RRset with the same subdomain and type exists for this domain.", status_code=status.HTTP_400_BAD_REQUEST, ) response = self.client.get_rr_sets(self.my_empty_domain.name) self.assertStatus(response, status.HTTP_200_OK) self.assertRRSetsCount(response.data, [data]) response = self.client.get_rr_set( self.my_empty_domain.name, data.get("subname", ""), data["type"] ) self.assertStatus(response, status.HTTP_200_OK) self.assertRRSet(response.data, **data) def test_create_my_rr_sets_type_restriction(self): for subname in [ "", "create-my-rr-sets", "foo.create-my-rr-sets", "bar.baz.foo.create-my-rr-sets", ]: for data in ( [ {"subname": subname, "ttl": 60, "type": "a"}, { "subname": subname, "records": ["10 example.com."], "ttl": 60, "type": "txt", }, ] + [ { "subname": subname, "records": ["10 example.com."], "ttl": 60, "type": type_, } for type_ in self.UNSUPPORTED_TYPES ] + [ { "subname": subname, "records": [ "get.desec.io. get.desec.io. 2584 10800 3600 604800 60" ], "ttl": 60, "type": type_, } for type_ in self.AUTOMATIC_TYPES ] ): response = self.client.post_rr_set(self.my_domain.name, **data) self.assertStatus(response, status.HTTP_400_BAD_REQUEST) response = self.client.get_rr_sets(self.my_domain.name) self.assertStatus(response, status.HTTP_200_OK) self.assertRRSetsCount(response.data, [data], count=0) def test_create_my_rr_sets_only_at_apex(self): for type_, records in { "DNSKEY": ["257 3 15 l02Woi0iS8Aa25FQkUd9RMzZHJpBoRQwAQEX1SxZJA4="], }.items(): data = { "subname": "non-apex", "ttl": 3600, "type": type_, "records": records, } r = self.client.post_rr_set(self.my_empty_domain.name, **data) self.assertContains( r, f"{type_} RRset must have empty subname", status_code=status.HTTP_400_BAD_REQUEST, ) def test_create_my_rr_sets_restricted_at_apex(self): for type_, records in { "CNAME": ["foobar.com."], "DS": ["45586 5 1 D0FDF996D1AF2CCDBDC942B02CB02D379629E20B"], }.items(): data = {"subname": "", "ttl": 3600, "type": type_, "records": records} r = self.client.post_rr_set(self.my_empty_domain.name, **data) self.assertContains( r, f"{type_} RRset cannot have empty subname", status_code=status.HTTP_400_BAD_REQUEST, ) def test_create_my_rr_sets_cname_multiple_records(self): for records in (["foobar.com.", "foobar.com."], ["foobar.com.", "foobar.org."]): data = {"subname": "asdf", "ttl": 3600, "type": "CNAME", "records": records} response = self.client.post_rr_set(self.my_empty_domain.name, **data) self.assertContains( response, "CNAME RRset cannot have multiple records", status_code=status.HTTP_400_BAD_REQUEST, ) def test_create_my_rr_sets_dname_multiple_records(self): for records in (["foobar.com.", "foobar.com."], ["foobar.com.", "foobar.org."]): data = {"subname": "asdf", "ttl": 3600, "type": "DNAME", "records": records} response = self.client.post_rr_set(self.my_empty_domain.name, **data) self.assertContains( response, "DNAME RRset cannot have multiple records", status_code=status.HTTP_400_BAD_REQUEST, ) def test_create_my_rr_sets_cname_exclusivity(self): self.create_rr_set(self.my_domain, ["1.2.3.4"], type="A", ttl=3600, subname="a") self.create_rr_set( self.my_domain, ["example.com."], type="CNAME", ttl=3600, subname="cname" ) # Can't add a CNAME where something else is data = { "subname": "a", "ttl": 3600, "type": "CNAME", "records": ["foobar.com."], } response = self.client.post_rr_set(self.my_domain.name, **data) self.assertStatus(response, status.HTTP_400_BAD_REQUEST) # Can't add something else where a CNAME is data = {"subname": "cname", "ttl": 3600, "type": "A", "records": ["4.3.2.1"]} response = self.client.post_rr_set(self.my_domain.name, **data) self.assertStatus(response, status.HTTP_400_BAD_REQUEST) def test_create_my_rr_sets_without_records(self): for subname in [ "", "create-my-rr-sets", "foo.create-my-rr-sets", "bar.baz.foo.create-my-rr-sets", ]: for data in [ {"subname": subname, "records": [], "ttl": 60, "type": "A"}, {"subname": subname, "ttl": 60, "type": "A"}, ]: response = self.client.post_rr_set(self.my_empty_domain.name, **data) self.assertStatus(response, status.HTTP_400_BAD_REQUEST) response = self.client.get_rr_sets(self.my_empty_domain.name) self.assertStatus(response, status.HTTP_200_OK) self.assertRRSetsCount(response.data, [], count=0) def test_create_other_rr_sets(self): data = {"records": ["1.2.3.4"], "ttl": 60, "type": "A"} response = self.client.post_rr_set(self.other_domain.name, **data) self.assertStatus(response, status.HTTP_404_NOT_FOUND) @staticmethod def _create_test_txt_record(record, type_="TXT"): return { "records": [f"{record}"], "ttl": 3600, "type": type_, "subname": f"name{len(record)}", } def test_create_my_rr_sets_chunk_too_long(self): for l, t in product([1, 255, 256, 498], ["TXT", "SPF"]): with self.assertPdnsRequests( self.requests_desec_rr_sets_update(self.my_empty_domain.name) ): response = self.client.post_rr_set( self.my_empty_domain.name, **self._create_test_txt_record(f'"{"A" * l}"', t), ) self.assertStatus(response, status.HTTP_201_CREATED) with self.assertPdnsRequests( self.requests_desec_rr_sets_update(self.my_empty_domain.name) ): self.client.delete_rr_set( self.my_empty_domain.name, type_=t, subname=f"name{l+2}" ) def test_create_my_rr_sets_too_long_content(self): def token(length): if length == 0: return "" if length <= 255: return f'"{"A" * length}"' return f"{token(255)} " * (length // 255) + token(length % 255) def p2w_length(length): return ceil(length / 255 * 256) def w2p_length(length): return floor(length / 256 * 255) max_wirelength = 64000 max_preslength = w2p_length(max_wirelength) assert max_preslength == 63750 assert p2w_length(max_preslength) == 64000 assert p2w_length(max_preslength + 1) == 64002 for t in ["SPF", "TXT"]: response = self.client.post_rr_set( self.my_empty_domain.name, # record of wire length 501 bytes in chunks of max 255 each (RFC 4408) **self._create_test_txt_record(token(max_preslength + 1), t), ) self.assertStatus(response, status.HTTP_400_BAD_REQUEST) self.assertIn( f"Ensure this value has no more than {max_wirelength} byte in wire format (it has {p2w_length(max_preslength + 1)}).", str(response.data), ) with self.assertPdnsRequests( self.requests_desec_rr_sets_update(self.my_empty_domain.name) ): response = self.client.post_rr_set( self.my_empty_domain.name, # record of wire length 500 bytes in chunks of max 255 each (RFC 4408) **self._create_test_txt_record(token(max_preslength)), ) self.assertStatus(response, status.HTTP_201_CREATED) def test_create_my_rr_sets_too_large_rrset(self): network = IPv4Network("127.0.0.0/20") # size: 4096 IP addresses data = { "records": [str(ip) for ip in network], "ttl": 3600, "type": "A", "subname": "name", } response = self.client.post_rr_set(self.my_empty_domain.name, **data) self.assertStatus(response, status.HTTP_400_BAD_REQUEST) excess_length = 28743 + len(self.my_empty_domain.name) self.assertIn( f"Total length of RRset exceeds limit by {excess_length} bytes.", str(response.data), ) def test_create_my_rr_sets_twice(self): data = {"records": ["1.2.3.4"], "ttl": 3660, "type": "A"} with self.assertPdnsRequests( self.requests_desec_rr_sets_update(self.my_empty_domain.name) ): response = self.client.post_rr_set(self.my_empty_domain.name, **data) self.assertStatus(response, status.HTTP_201_CREATED) data["records"][0] = "3.2.2.1" response = self.client.post_rr_set(self.my_empty_domain.name, **data) self.assertStatus(response, status.HTTP_400_BAD_REQUEST) def test_create_my_rr_sets_duplicate_content(self): for records in [ ["::1", "0::1"], # TODO add more examples ]: data = {"records": records, "ttl": 3660, "type": "AAAA"} response = self.client.post_rr_set(self.my_empty_domain.name, **data) self.assertContains( response, "Duplicate", status_code=status.HTTP_400_BAD_REQUEST ) def test_create_my_rr_sets_upper_case(self): for subname in ["asdF", "cAse", "asdf.FOO", "--F", "ALLCAPS"]: data = {"records": ["1.2.3.4"], "ttl": 60, "type": "A", "subname": subname} response = self.client.post_rr_set(self.my_empty_domain.name, **data) self.assertStatus(response, status.HTTP_400_BAD_REQUEST) self.assertIn("Subname can only use (lowercase)", str(response.data)) def test_create_my_rr_sets_subname_too_long(self): name = ( "9.1.f.1.0.6.1.d.f.2.2.b.9.1.6.3.c.2.f.7.9.5.2.4.7.8.3.2.6.6.c.6.9.3.5.6.9.2.4.d.c.e.a.e.f.1.8" ".b.a.c.9.0.6.2.c.b.c.1.6.3.8.2.7.9.0.5.2.a.c.f.f.2.6.a.c.3.c.e.3.0.6.1.8.0.7.4.0.1.0.0.2.ip6.test" ) domain = self.create_domain(name=name, owner=self.owner) subname = ( "e.8.c.f.e.0.9.f.4.9.1.f.1.0.6.1.d.f.2.2.b.9.1.6.3.c.9.3.5.6.9.2.4.d.c.e.a.e.f.1.8.b.a.c.9.0" ".6.2.c.b.c.1.6.3.8.2.6.7.9.0.5.2.a.c.f.f.2.6" ) data = {"subname": subname, "records": ["1.2.3.4"], "ttl": 3600, "type": "A"} response = self.client.post_rr_set(domain.name, **data) self.assertStatus(response, status.HTTP_400_BAD_REQUEST) self.assertEqual(response.data["subname"][0].code, "name_too_long") def test_create_my_rr_sets_subname_too_many_dots(self): for subname in ["dottest.", ".dottest", "dot..test"]: data = { "subname": subname, "records": ["10 example.com."], "ttl": 3600, "type": "MX", } response = self.client.post_rr_set(self.my_domain.name, **data) self.assertStatus(response, status.HTTP_400_BAD_REQUEST) response = self.client.get_rr_sets(self.my_domain.name) self.assertStatus(response, status.HTTP_200_OK) self.assertRRSetsCount(response.data, [data], count=0) def test_create_my_rr_sets_empty_payload(self): response = self.client.post_rr_set(self.my_empty_domain.name) self.assertContains( response, "No data provided", status_code=status.HTTP_400_BAD_REQUEST ) def test_create_my_rr_sets_cname_two_records(self): data = { "subname": "sub", "records": ["example.com.", "example.org."], "ttl": 3600, "type": "CNAME", } response = self.client.post_rr_set(self.my_domain.name, **data) self.assertStatus(response, status.HTTP_400_BAD_REQUEST) def test_create_my_rr_sets_canonical_content(self): # TODO fill in more examples datas = [ # record type: (non-canonical input, canonical output expectation) ("A", ("127.0.0.1", "127.0.0.1")), ("AAAA", ("0000::0000:0001", "::1")), ("AAAA", ("::ffff:127.0.0.1", "::ffff:7f00:1")), ("AAAA", ("2001:db8::128.2.129.4", "2001:db8::8002:8104")), ("AFSDB", ("02 turquoise.FEMTO.edu.", "2 turquoise.femto.edu.")), ( "APL", ( "2:FF00:0:0:0:0::/8 !1:192.168.38.0/28", "2:ff00::/8 !1:192.168.38.0/28", ), ), ("CAA", ('0128 "issue" "letsencrypt.org"', '128 issue "letsencrypt.org"')), ( "CDNSKEY", ( "0256 03 08 AwEAAday3UX323uVzQqtOMQ7EHQYfD5Ofv4akjQGN2zY5AgB/2jmdR/+ 1PvXFqzKCAGJv4wjABEBNWLLFm7ew1hHMDZEKVL17aml0EBKI6Dsz6Mx t6n7ScvLtHaFRKaxT4i2JxiuVhKdQR9XGMiWAPQKrRM5SLG0P+2F+TLK l3D0L/cD", "256 3 8 AwEAAday3UX323uVzQqtOMQ7EHQYfD5Ofv4akjQGN2zY5AgB/2jmdR/+1PvXFqzKCAGJv4wjABEBNWLLFm7ew1hHMDZEKVL17aml0EBKI6Dsz6Mxt6n7ScvLtHaFRKaxT4i2JxiuVhKdQR9XGMiWAPQKrRM5SLG0P+2F+TLKl3D0L/cD", ), ), ( "CDNSKEY", ( "257 3 8 AwEAAcw5QLr0IjC0wKbGoBPQv4qmeqHy9mvL5qGQTuaG5TSrNqEAR6b/ qvxDx6my4JmEmjUPA1JeEI9YfTUieMr2UZflu7aIbZFLw0vqiYrywCGr CHXLalOrEOmrvAxLvq4vHtuTlH7JIszzYBSes8g1vle6KG7xXiP3U5Ll 96Qiu6bZ31rlMQSPB20xbqJJh6psNSrQs41QvdcXAej+K2Hl1Wd8kPri ec4AgiBEh8sk5Pp8W9ROLQ7PcbqqttFaW2m7N/Wy4qcFU13roWKDEAst bxH5CHPoBfZSbIwK4KM6BK/uDHpSPIbiOvOCW+lvu9TAiZPc0oysY6as lO7jXv16Gws=", "257 3 8 AwEAAcw5QLr0IjC0wKbGoBPQv4qmeqHy9mvL5qGQTuaG5TSrNqEAR6b/qvxDx6my4JmEmjUPA1JeEI9YfTUieMr2UZflu7aIbZFLw0vqiYrywCGrCHXLalOrEOmrvAxLvq4vHtuTlH7JIszzYBSes8g1vle6KG7xXiP3U5Ll96Qiu6bZ31rlMQSPB20xbqJJh6psNSrQs41QvdcXAej+K2Hl1Wd8kPriec4AgiBEh8sk5Pp8W9ROLQ7PcbqqttFaW2m7N/Wy4qcFU13roWKDEAstbxH5CHPoBfZSbIwK4KM6BK/uDHpSPIbiOvOCW+lvu9TAiZPc0oysY6aslO7jXv16Gws=", ), ), ( "CDNSKEY", ( "257 3 13 aCoEWYBBVsP9Fek2oC8yqU8ocKmnS1iDSFZNORnQuHKtJ9Wpyz+kNryq uB78Pyk/NTEoai5bxoipVQQXzHlzyg==", "257 3 13 aCoEWYBBVsP9Fek2oC8yqU8ocKmnS1iDSFZNORnQuHKtJ9Wpyz+kNryquB78Pyk/NTEoai5bxoipVQQXzHlzyg==", ), ), ("CDNSKEY", ("0 3 0 AA==", "0 3 0 AA==")), ( "CDS", ( "047883 013 02 43BD262211B2A748335149408F67BC95B9A4A3174FD86E6A83830380 446E7AFD", "47883 13 2 43BD262211B2A748335149408F67BC95B9A4A3174FD86E6A83830380446E7AFD".lower(), ), ), ("CDS", ("0 0 0 00", "0 0 0 00")), ("CERT", ("06 00 00 sadfdd==", "6 0 0 sadfdQ==")), ("CNAME", ("EXAMPLE.COM.", "example.com.")), ("CSYNC", ("066 03 NS AAAA A", "66 3 A NS AAAA")), ("DHCID", ("xxxx", "xxxx")), ( "DLV", ( "6454 8 2 5CBA665A006F6487625C6218522F09BD3673C25FA10F25CB18459AA1 0DF1F520", "6454 8 2 5CBA665A006F6487625C6218522F09BD3673C25FA10F25CB18459AA10DF1F520".lower(), ), ), ( "DLV", ( "6454 8 2 5C BA665A006F6487625C6218522F09BD3673C25FA10F25CB18459AA1 0DF1F520", "6454 8 2 5CBA665A006F6487625C6218522F09BD3673C25FA10F25CB18459AA10DF1F520".lower(), ), ), ("DNAME", ("EXAMPLE.COM.", "example.com.")), ( "DNSKEY", ( "0256 03 08 AwEAAday3UX323uVzQqtOMQ7EHQYfD5Ofv4akjQGN2zY5AgB/2jmdR/+ 1PvXFqzKCAGJv4wjABEBNWLLFm7ew1hHMDZEKVL17aml0EBKI6Dsz6Mx t6n7ScvLtHaFRKaxT4i2JxiuVhKdQR9XGMiWAPQKrRM5SLG0P+2F+TLK l3D0L/cD", "256 3 8 AwEAAday3UX323uVzQqtOMQ7EHQYfD5Ofv4akjQGN2zY5AgB/2jmdR/+1PvXFqzKCAGJv4wjABEBNWLLFm7ew1hHMDZEKVL17aml0EBKI6Dsz6Mxt6n7ScvLtHaFRKaxT4i2JxiuVhKdQR9XGMiWAPQKrRM5SLG0P+2F+TLKl3D0L/cD", ), ), ( "DNSKEY", ( "257 3 8 AwEAAcw5QLr0IjC0wKbGoBPQv4qmeqHy9mvL5qGQTuaG5TSrNqEAR6b/ qvxDx6my4JmEmjUPA1JeEI9YfTUieMr2UZflu7aIbZFLw0vqiYrywCGr CHXLalOrEOmrvAxLvq4vHtuTlH7JIszzYBSes8g1vle6KG7xXiP3U5Ll 96Qiu6bZ31rlMQSPB20xbqJJh6psNSrQs41QvdcXAej+K2Hl1Wd8kPri ec4AgiBEh8sk5Pp8W9ROLQ7PcbqqttFaW2m7N/Wy4qcFU13roWKDEAst bxH5CHPoBfZSbIwK4KM6BK/uDHpSPIbiOvOCW+lvu9TAiZPc0oysY6as lO7jXv16Gws=", "257 3 8 AwEAAcw5QLr0IjC0wKbGoBPQv4qmeqHy9mvL5qGQTuaG5TSrNqEAR6b/qvxDx6my4JmEmjUPA1JeEI9YfTUieMr2UZflu7aIbZFLw0vqiYrywCGrCHXLalOrEOmrvAxLvq4vHtuTlH7JIszzYBSes8g1vle6KG7xXiP3U5Ll96Qiu6bZ31rlMQSPB20xbqJJh6psNSrQs41QvdcXAej+K2Hl1Wd8kPriec4AgiBEh8sk5Pp8W9ROLQ7PcbqqttFaW2m7N/Wy4qcFU13roWKDEAstbxH5CHPoBfZSbIwK4KM6BK/uDHpSPIbiOvOCW+lvu9TAiZPc0oysY6aslO7jXv16Gws=", ), ), ( "DNSKEY", ( "257 3 13 aCoEWYBBVsP9Fek2oC8yqU8ocKmnS1iDSFZNORnQuHKtJ9Wpyz+kNryq uB78Pyk/NTEoai5bxoipVQQXzHlzyg==", "257 3 13 aCoEWYBBVsP9Fek2oC8yqU8ocKmnS1iDSFZNORnQuHKtJ9Wpyz+kNryquB78Pyk/NTEoai5bxoipVQQXzHlzyg==", ), ), ( "DS", ( "6454 8 2 5CBA665A006F6487625C6218522F09BD3673C25FA10F25CB18459AA1 0DF1F520", "6454 8 2 5CBA665A006F6487625C6218522F09BD3673C25FA10F25CB18459AA10DF1F520".lower(), ), ), ( "DS", ( "6454 8 2 5C BA665A006F6487625C6218522F09BD3673C25FA10F25CB18459AA1 0DF1F520", "6454 8 2 5CBA665A006F6487625C6218522F09BD3673C25FA10F25CB18459AA10DF1F520".lower(), ), ), ("EUI48", ("AA-BB-CC-DD-EE-FF", "aa-bb-cc-dd-ee-ff")), ("EUI64", ("AA-BB-CC-DD-EE-FF-aa-aa", "aa-bb-cc-dd-ee-ff-aa-aa")), ("HINFO", ("cpu os", '"cpu" "os"')), ("HINFO", ('"cpu" "os"', '"cpu" "os"')), ( "HTTPS", ("01 h3POOL.exaMPLe. aLPn=h2,h3", "1 h3POOL.exaMPLe. alpn=h2,h3"), ), ( "HTTPS", ( "01 h3POOL.exaMPLe. aLPn=h2,h3 ECH=MTIzLi4uCg==", '1 h3POOL.exaMPLe. alpn=h2,h3 ech="MTIzLi4uCg=="', ), ), # ('IPSECKEY', ('01 00 02 . ASDFAF==', '1 0 2 . ASDFAA==')), # ('IPSECKEY', ('01 00 02 . 000000==', '1 0 2 . 00000w==')), ("KX", ("010 example.com.", "10 example.com.")), ("L32", ("010 10.1.2.0", "10 10.1.2.0")), ("L64", ("010 2001:0Db8:2140:2000", "10 2001:0db8:2140:2000")), ( "LOC", ( "023 012 59 N 042 022 48.500 W 65.00m 20.00m 10.00m 10.00m", "23 12 59.000 N 42 22 48.500 W 65.00m 20.00m 10.00m 10.00m", ), ), ("LP", ("010 l64-subnet1.example.com.", "10 l64-subnet1.example.com.")), ("MX", ("10 010.1.1.1.", "10 010.1.1.1.")), ("MX", ("010 010.1.1.2.", "10 010.1.1.2.")), ("MX", ("0 .", "0 .")), ( "NAPTR", ( '100 50 "s" "z3950+I2L+I2C" "" _z3950._tcp.gatech.edu.', '100 50 "s" "z3950+I2L+I2C" "" _z3950._tcp.gatech.edu.', ), ), ("NID", ("010 0014:4fff:ff20:Ee64", "10 0014:4fff:ff20:ee64")), ("NS", ("EXaMPLE.COM.", "example.com.")), ( "OPENPGPKEY", ( "mG8EXtVIsRMFK4EEACIDAwQSZPNqE4tS xLFJYhX+uabSgMrhOqUizJhkLx82", "mG8EXtVIsRMFK4EEACIDAwQSZPNqE4tSxLFJYhX+uabSgMrhOqUizJhkLx82", ), ), ("PTR", ("EXAMPLE.COM.", "example.com.")), ("RP", ("hostmaster.EXAMPLE.com. .", "hostmaster.example.com. .")), ("SMIMEA", ("3 01 0 aaBBccddeeff", "3 1 0 aabbccddeeff")), ( "SPF", ( '"v=spf1 ip4:10.1" ".1.1 ip4:127" ".0.0.0/16 ip4:192.168.0.0/27 include:example.com -all"', '"v=spf1 ip4:10.1" ".1.1 ip4:127" ".0.0.0/16 ip4:192.168.0.0/27 include:example.com -all"', ), ), ("SPF", ('"foo" "bar"', '"foo" "bar"')), ("SPF", ('"foobar"', '"foobar"')), ("SRV", ("0 000 0 .", "0 0 0 .")), ("SRV", ("100 1 5061 EXAMPLE.com.", "100 1 5061 example.com.")), ("SRV", ("100 1 5061 example.com.", "100 1 5061 example.com.")), ("SSHFP", ("2 2 aabbccEEddff", "2 2 aabbcceeddff")), ( "SVCB", ( "2 sVc2.example.NET. IPV6hint=2001:db8:00:0::2 port=01234", "2 sVc2.example.NET. port=1234 ipv6hint=2001:db8::2", ), ), ( "SVCB", ( "2 sVc2.example.NET. ECH=MjIyLi4uCg== IPV6hint=2001:db8:00:0::2 port=01234", '2 sVc2.example.NET. port=1234 ech="MjIyLi4uCg==" ipv6hint=2001:db8::2', ), ), ( "TLSA", ( "3 0001 1 000AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", "3 1 1 000aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", ), ), ( "TLSA", ( "003 00 002 696B8F6B92A913560b23ef5720c378881faffe74432d04eb35db957c0a93987b47adf26abb5dac10ba482597ae16edb069b511bec3e26010d1927bf6392760dd", "3 0 2 696b8f6b92a913560b23ef5720c378881faffe74432d04eb35db957c0a93987b47adf26abb5dac10ba482597ae16edb069b511bec3e26010d1927bf6392760dd", ), ), ( "TLSA", ( "3 0 2 696B8F6B92A913560b23ef5720c378881faffe74432d04eb35db957c0a93987b47adf26abb5dac10ba482597ae16edb069b511bec3e26010d1927bf6392760dd696B8F6B92A913560b23ef5720c378881faffe74432d04eb35db957c0a93987b47adf26abb5dac10ba482597ae16edb069b511bec3e26010d1927bf6392760dd", "3 0 2 696b8f6b92a913560b23ef5720c378881faffe74432d04eb35db957c0a93987b47adf26abb5dac10ba482597ae16edb069b511bec3e26010d1927bf6392760dd696b8f6b92a913560b23ef5720c378881faffe74432d04eb35db957c0a93987b47adf26abb5dac10ba482597ae16edb069b511bec3e26010d1927bf6392760dd", ), ), ("TXT", ('"foo" "bar"', '"foo" "bar"')), ("TXT", ('"foobar"', '"foobar"')), ("TXT", ('"foo" "" "bar"', '"foo" "" "bar"')), ("TXT", ('"" "" "foo" "" "bar"', '"" "" "foo" "" "bar"')), ( "TXT", ( r'"\130\164name\164Boss\164type\1611"', r'"\130\164name\164Boss\164type\1611"', ), ), ( "TXT", ( '"' + "".join(rf"\{n:03}" for n in range(256)) + '"', # all bytes r'"\000\001\002\003\004\005\006\007\008\009\010\011\012\013\014\015\016\017\018\019\020\021\022\023\024\025\026\027\028\029\030\031 !\"#$%&' + "'" + r'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz{|}~\127\128\129\130\131\132\133\134\135\136\137\138\139\140\141\142\143\144\145\146\147\148\149\150\151\152\153\154\155\156\157\158\159\160\161\162\163\164\165\166\167\168\169\170\171\172\173\174\175\176\177\178\179\180\181\182\183\184\185\186\187\188\189\190\191\192\193\194\195\196\197\198\199\200\201\202\203\204\205\206\207\208\209\210\211\212\213\214\215\216\217\218\219\220\221\222\223\224\225\226\227\228\229\230\231\232\233\234\235\236\237\238\239\240\241\242\243\244\245\246\247\248\249\250\251\252\253\254" "\255"', ), ), ( "URI", ( '10 01 "ftp://ftp1.example.com/public"', '10 1 "ftp://ftp1.example.com/public"', ), ), ] for t, (record, canonical_record) in datas: if not record: continue subname = "" if t == "DNSKEY" else "test" data = {"records": [record], "ttl": 3660, "type": t, "subname": subname} with self.assertPdnsRequests( self.requests_desec_rr_sets_update(name=self.my_empty_domain.name) ): response = self.client.post_rr_set(self.my_empty_domain.name, **data) self.assertStatus(response, status.HTTP_201_CREATED) self.assertEqual( canonical_record, response.data["records"][0], f"For RR set type {t}, expected '{canonical_record}' to be the canonical form of " f'\'{record}\', but saw \'{response.data["records"][0]}\'.', ) with self.assertPdnsRequests( self.requests_desec_rr_sets_update(name=self.my_empty_domain.name) ): response = self.client.delete_rr_set( self.my_empty_domain.name, subname=subname, type_=t ) self.assertStatus(response, status.HTTP_204_NO_CONTENT) self.assertAllSupportedRRSetTypes(set(t for t, _ in datas)) def test_create_my_rr_sets_known_type_benign(self): # TODO fill in more examples datas = { "A": ["127.0.0.1", "127.0.0.2"], "AAAA": ["::1", "::2"], "AFSDB": ["2 turquoise.femto.edu."], "APL": [ # from RFC 3123 Sec. 4 "1:192.168.32.0/21 !1:192.168.38.0/28", "1:192.168.42.0/26 1:192.168.42.64/26 1:192.168.42.128/25", "1:127.0.0.1/32 1:172.16.64.0/22", "1:224.0.0.0/4 2:FF00:0:0:0:0:0:0:0/8", # made-up (not from RFC) "1:1.2.3.4/32 2:::/128", "2:FF00::/8 !1:192.168.38.0/28", ], "CAA": [ '128 issue "letsencrypt.org"', '128 iodef "mailto:desec@example.com"', '1 issue "letsencrypt.org"', ], "CERT": ["06 0 0 sadfdd==", "IPGP 0 0 sadfdd=="], "CDNSKEY": [ "256 3 8 AwEAAday3UX323uVzQqtOMQ7EHQYfD5Ofv4akjQGN2zY5AgB/2jmdR/+ 1PvXFqzKCAGJv4wjABEBNWLLFm7ew1hHMDZEKVL17aml0EBKI6Dsz6Mx t6n7ScvLtHaFRKaxT4i2JxiuVhKdQR9XGMiWAPQKrRM5SLG0P+2F+TLK l3D0L/cD", "257 3 8 AwEAAcw5QLr0IjC0wKbGoBPQv4qmeqHy9mvL5qGQTuaG5TSrNqEAR6b/ qvxDx6my4JmEmjUPA1JeEI9YfTUieMr2UZflu7aIbZFLw0vqiYrywCGr CHXLalOrEOmrvAxLvq4vHtuTlH7JIszzYBSes8g1vle6KG7xXiP3U5Ll 96Qiu6bZ31rlMQSPB20xbqJJh6psNSrQs41QvdcXAej+K2Hl1Wd8kPri ec4AgiBEh8sk5Pp8W9ROLQ7PcbqqttFaW2m7N/Wy4qcFU13roWKDEAst bxH5CHPoBfZSbIwK4KM6BK/uDHpSPIbiOvOCW+lvu9TAiZPc0oysY6as lO7jXv16Gws=", "257 3 13 aCoEWYBBVsP9Fek2oC8yqU8ocKmnS1iDSFZNORnQuHKtJ9Wpyz+kNryq uB78Pyk/NTEoai5bxoipVQQXzHlzyg==", ], "CDS": [ "6454 8 1 24396E17E36D031F71C354B06A979A67A01F503E", "6454 8 2 5CBA665A006F6487625C6218522F09BD3673C25FA10F25CB18459AA1 0DF1F520", "62703 13 2 085BF1EE0ADBBC99D4D9328229EBDCAEC5FAB20E38610072AD055474 4C7AF4A0", "61655 13 4 C838A5C66FCBF83B8B6B50C3CEEC3524777FE4AF8A9FE0172ECAD242 48B0CA1A216DD0D538F20C130DD3059538204B04", "6454 8 5 24396E17E36D031F71C354B06A979A67A01F503E", ], "CNAME": ["example.com."], "CSYNC": ["0 0", "66 1 A", "66 2 AAAA", "66 3 A NS AAAA", "66 15 NSEC"], "DHCID": ["aaaaaaaaaaaa", "aa aaa aaaa a a a"], "DLV": [ "6454 8 1 24396E17E36D031F71C354B06A979A67A01F503E", "6454 8 2 5CBA665A006F6487625C6218522F09BD3673C25FA10F25CB18459AA1 0DF1F520", "62703 13 2 085BF1EE0ADBBC99D4D9328229EBDCAEC5FAB20E38610072AD055474 4C7AF4A0", "61655 13 4 C838A5C66FCBF83B8B6B50C3CEEC3524777FE4AF8A9FE0172ECAD242 48B0CA1A216DD0D538F20C130DD3059538204B04", "6454 8 5 24396E17E36D031F71C354B06A979A67A01F503E", ], "DNAME": ["example.com."], "DNSKEY": [ "256 3 8 AwEAAday3UX323uVzQqtOMQ7EHQYfD5Ofv4akjQGN2zY5AgB/2jmdR/+ 1PvXFqzKCAGJv4wjABEBNWLLFm7ew1hHMDZEKVL17aml0EBKI6Dsz6Mx t6n7ScvLtHaFRKaxT4i2JxiuVhKdQR9XGMiWAPQKrRM5SLG0P+2F+TLK l3D0L/cD", "257 3 8 AwEAAcw5QLr0IjC0wKbGoBPQv4qmeqHy9mvL5qGQTuaG5TSrNqEAR6b/ qvxDx6my4JmEmjUPA1JeEI9YfTUieMr2UZflu7aIbZFLw0vqiYrywCGr CHXLalOrEOmrvAxLvq4vHtuTlH7JIszzYBSes8g1vle6KG7xXiP3U5Ll 96Qiu6bZ31rlMQSPB20xbqJJh6psNSrQs41QvdcXAej+K2Hl1Wd8kPri ec4AgiBEh8sk5Pp8W9ROLQ7PcbqqttFaW2m7N/Wy4qcFU13roWKDEAst bxH5CHPoBfZSbIwK4KM6BK/uDHpSPIbiOvOCW+lvu9TAiZPc0oysY6as lO7jXv16Gws=", "257 3 13 aCoEWYBBVsP9Fek2oC8yqU8ocKmnS1iDSFZNORnQuHKtJ9Wpyz+kNryq uB78Pyk/NTEoai5bxoipVQQXzHlzyg==", ], "DS": [ "6454 8 1 24396E17E36D031F71C354B06A979A67A01F503E", "6454 8 2 5CBA665A006F6487625C6218522F09BD3673C25FA10F25CB18459AA1 0DF1F520", "62703 13 2 085BF1EE0ADBBC99D4D9328229EBDCAEC5FAB20E38610072AD055474 4C7AF4A0", "61655 13 4 C838A5C66FCBF83B8B6B50C3CEEC3524777FE4AF8A9FE0172ECAD242 48B0CA1A216DD0D538F20C130DD3059538204B04", "6454 8 5 24396E17E36D031F71C354B06A979A67A01F503E", ], "EUI48": ["aa-bb-cc-dd-ee-ff", "AA-BB-CC-DD-EE-FF"], "EUI64": ["aa-bb-cc-dd-ee-ff-00-11", "AA-BB-CC-DD-EE-FF-00-11"], "HINFO": ['"ARMv8-A" "Linux"'], "HTTPS": [ # from https://www.ietf.org/archive/id/draft-ietf-dnsop-svcb-https-06.html#name-examples, with ech base64'd "1 . alpn=h3", "0 pool.svc.example.", '1 h3pool.example. alpn=h2,h3 ech="MTIzLi4uCg=="', '2 . alpn=h2 ech="YWJjLi4uCg=="', # made-up (not from RFC) "1 pool.svc.example. no-default-alpn alpn=h2 port=1234 ipv4hint=192.168.123.1", "2 . ech=... key65333=ex1 key65444=ex2 mandatory=key65444,ech", # see #section-7 ], # 'IPSECKEY': [ # '12 0 2 . asdfdf==', # '03 1 1 127.0.0.1 asdfdf==', # '10 02 2 bade::affe AQNRU3mG7TVTO2BkR47usntb102uFJtugbo6BSGvgqt4AQ==', # '12 3 01 example.com. asdfdf==', # ], "KX": ["4 example.com.", "28 io."], "L32": ["010 10.1.2.0", "65535 1.2.3.4"], "L64": ["010 2001:0DB8:1140:1000", "10 2001:0DB8:1140:1000"], "LOC": ["23 12 59.000 N 42 22 48.500 W 65.00m 20.00m 10.00m 10.00m"], "LP": ["10 l64-subnet1.example.com.", "65535 ."], "MX": ["10 example.com.", "20 1.1.1.1."], "NAPTR": ['100 50 "s" "z3950+I2L+I2C" "" _z3950._tcp.gatech.edu.'], "NID": ["65535 0014:4fff:ff20:ee64"], "NS": ["ns1.example.com."], "OPENPGPKEY": [ "mG8EXtVIsRMFK4EEACIDAwQSZPNqE4tSxLFJYhX+uabSgMrhOqUizJhkLx82", # key incomplete "YWFh\xf0\x9f\x92\xa9YWFh", # valid as non-alphabet bytes will be ignored ], "PTR": ["example.com.", "*.example.com."], "RP": ["hostmaster.example.com. ."], "SMIMEA": ["3 1 0 aabbccddeeff"], "SPF": [ '"v=spf1 include:example.com ~all"', '"v=spf1 ip4:10.1.1.1 ip4:127.0.0.0/16 ip4:192.168.0.0/27 include:example.com -all"', '"spf2.0/pra,mfrom ip6:2001:558:fe14:76:68:87:28:0/120 -all"', ], "SRV": ["0 0 0 .", "100 1 5061 example.com."], "SSHFP": ["2 2 aabbcceeddff"], "SVCB": [ "0 svc4-baz.example.net.", "1 . key65333=...", '2 svc2.example.net. ech="MjIyLi4uCg==" ipv6hint=2001:db8::2 port=1234', ], "TLSA": [ "3 1 1 AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", "3 0 2 696b8f6b92a913560b23ef5720c378881faffe74432d04eb35db957c0a93987b47adf26abb5dac10ba482597ae16edb069b511bec3e26010d1927bf6392760dd", "3 0 2 696b8f6b92a913560b23ef5720c378881faffe74432d04eb35db957c0a93987b47adf26abb5dac10ba482597ae16edb069b511bec3e26010d1927bf6392760dd696b8f6b92a913560b23ef5720c378881faffe74432d04eb35db957c0a93987b47adf26abb5dac10ba482597ae16edb069b511bec3e26010d1927bf6392760dd", ], "TXT": [ '"foobar"', '"foo" "bar"', '"“红色联合”对“四·二八兵团”总部大楼的攻击已持续了两天"', '"new\\010line"' '"🧥 👚 👕 👖 👔 👗 👙 👘 👠 👡 👢 👞 👟 🥾 🥿 🧦 🧤 🧣 🎩 🧢 👒 🎓 ⛑ 👑 👝 👛 👜 💼 🎒 👓 🕶 🥽 🥼 🌂 🧵"', ], "URI": ['10 1 "ftp://ftp1.example.com/public"'], } self.assertAllSupportedRRSetTypes(set(datas.keys())) for t, records in datas.items(): subname = "" if t == "DNSKEY" else "test" for r in records: data = {"records": [r], "ttl": 3660, "type": t, "subname": subname} with self.assertPdnsRequests( self.requests_desec_rr_sets_update(name=self.my_empty_domain.name) ): response = self.client.post_rr_set( self.my_empty_domain.name, **data ) self.assertStatus(response, status.HTTP_201_CREATED) with self.assertPdnsRequests( self.requests_desec_rr_sets_update(name=self.my_empty_domain.name) ): response = self.client.delete_rr_set( self.my_empty_domain.name, subname=subname, type_=t ) self.assertStatus(response, status.HTTP_204_NO_CONTENT) def test_create_my_rr_sets_known_type_invalid(self): # TODO fill in more examples datas = { # recordtype: [list of examples expected to be rejected, individually] "A": [ "127.0.0.999", "127.000.0.01", "127.0.0.256", "::1", "foobar", "10.0.1", "10!", ], "AAAA": ["::g", "1:1:1:1:1:1:1:1:", "1:1:1:1:1:1:1:1:1"], "AFSDB": ["example.com.", "1 1", "1 de"], "APL": [ "0:192.168.32.0/21 !1:192.168.38.0/28", "1:192.168.32.0/21 !!1:192.168.38.0/28", "1:192.168.32.0/33", "18:12345/2", "1:127.0.0.1", "2:FF00:0:0:0:0:0:0:0:0/8" "2:::/129", ], "CAA": ['43235 issue "letsencrypt.org"'], "CERT": ["6 0 sadfdd=="], "CDNSKEY": [ "a 3 13 aCoEWYBBVsP9Fek2oC8yqU8ocKmnS1iDSFZNORnQuHKtJ9Wpyz+kNryq uB78Pyk/NTEoai5bxoipVQQXzHlzyg==" "257 b 13 aCoEWYBBVsP9Fek2oC8yqU8ocKmnS1iDSFZNORnQuHKtJ9Wpyz+kNryq uB78Pyk/NTEoai5bxoipVQQXzHlzyg==" "257 3 c aCoEWYBBVsP9Fek2oC8yqU8ocKmnS1iDSFZNORnQuHKtJ9Wpyz+kNryq uB78Pyk/NTEoai5bxoipVQQXzHlzyg==" "257 3 13 d", "0 3 0 0", ], "CDS": [ "a 8 1 24396E17E36D031F71C354B06A979A67A01F503E", "-6454 8 1 24396E17E36D031F71C354B06A979A67A01F503E", "6454 b 1 24396E17E36D031F71C354B06A979A67A01F503E", "6454 8 c 24396E17E36D031F71C354B06A979A67A01F503E", "6454 8 1 d", "6454 8 0 24396E17E36D031F71C354B06A979A67A01F503E", "6454 8 1 aabbccddeeff", "0 0 0 0", ], "CNAME": ["example.com", "10 example.com."], "CSYNC": ["0 -1 A", "444 65536 A", "0 3 AAA"], "DHCID": ["x", "xx", "xxx"], "DLV": [ "a 8 1 24396E17E36D031F71C354B06A979A67A01F503E", "-6454 8 1 24396E17E36D031F71C354B06A979A67A01F503E", "6454 b 1 24396E17E36D031F71C354B06A979A67A01F503E", "6454 8 c 24396E17E36D031F71C354B06A979A67A01F503E", "6454 8 1 d", "6454 8 0 24396E17E36D031F71C354B06A979A67A01F503E", "6454 8 1 aabbccddeeff", ], "DNAME": ["example.com", "10 example.com."], "DNSKEY": [ "a 3 13 aCoEWYBBVsP9Fek2oC8yqU8ocKmnS1iDSFZNORnQuHKtJ9Wpyz+kNryq uB78Pyk/NTEoai5bxoipVQQXzHlzyg==" "257 b 13 aCoEWYBBVsP9Fek2oC8yqU8ocKmnS1iDSFZNORnQuHKtJ9Wpyz+kNryq uB78Pyk/NTEoai5bxoipVQQXzHlzyg==" "257 3 c aCoEWYBBVsP9Fek2oC8yqU8ocKmnS1iDSFZNORnQuHKtJ9Wpyz+kNryq uB78Pyk/NTEoai5bxoipVQQXzHlzyg==" "257 3 13 d" ], "DS": [ "a 8 1 24396E17E36D031F71C354B06A979A67A01F503E", "-6454 8 1 24396E17E36D031F71C354B06A979A67A01F503E", "6454 b 1 24396E17E36D031F71C354B06A979A67A01F503E", "6454 8 c 24396E17E36D031F71C354B06A979A67A01F503E", "6454 8 1 d", "6454 8 0 24396E17E36D031F71C354B06A979A67A01F503E", "6454 8 1 aabbccddeeff", ], "EUI48": ["aa-bb-ccdd-ee-ff", "AA-BB-CC-DD-EE-GG"], "EUI64": ["aa-bb-cc-dd-ee-ff-gg-11", "AA-BB-C C-DD-EE-FF-00-11"], "HINFO": ['"ARMv8-A"', f'"a" "{"b"*256}"'], "HTTPS": [ # from https://tools.ietf.org/html/draft-ietf-dnsop-svcb-https-02#section-10.3, with ech base64'd '1 h3pool alpn=h2,h3 ech="MTIzLi4uCg=="', # made-up (not from RFC) "0 pool.svc.example. no-default-alpn port=1234 ipv4hint=192.168.123.1", # no keys in alias mode "1 pool.svc.example. no-default-alpn port=1234 ipv4hint=192.168.123.1 ipv4hint=192.168.123.2", # dup ], # 'IPSECKEY': [], "KX": ["-1 example.com", "10 example.com"], "L32": ["65536 10.1.2.0", "5 a.1.2.0", "10 10.1.02.0"], "L64": ["65536 2001:0DB8:4140:4000", "5 01:0DB8:4140:4000"], "LOC": [ "23 12 61.000 N 42 22 48.500 W 65.00m 20.00m 10.00m 10.00m", "foo", "1.1.1.1", ], "LP": [ "10 l64-subnet1.example.com", "-3 l64-subnet1.example.com.", "65536 l64-subnet1.example.com.", ], "MX": [ "10 example.com", "example.com.", "-5 asdf.", "65537 asdf.", "10 _foo.example.com.", "10 $url.", ], "NAPTR": [ '100 50 "s" "z3950+I2L+I2C" "" _z3950._tcp.gatech.edu', '100 50 "s" "" _z3950._tcp.gatech.edu.', '100 50 3 2 "z3950+I2L+I2C" "" _z3950._tcp.gatech.edu.', ], "NID": ["010 14:4fff:ff20:Ee64", "d 0014:4fff:ff20:ee64", "20 ::14::ee64"], "NS": ["ns1.example.com", "127.0.0.1", "_foobar.example.dedyn.io.", "."], "OPENPGPKEY": ["1 2 3"], "PTR": ['"example.com."', "10 *.example.com."], "RP": ["hostmaster.example.com.", "10 foo."], "SMIMEA": ["3 1 0 aGVsbG8gd29ybGQh"], "SPF": ['"v=spf1', "v=spf1 include:example.com ~all"], "SRV": [ "0 0 0 0", "100 5061 example.com.", "0 0 16920 _foo.example.com.", "0 0 16920 $url.", ], "SSHFP": ["aabbcceeddff"], "SVCB": [ "0 svc4-baz.example.net. keys=val", "1 not.fully.qualified key65333=...", '2 duplicate.key. ech="MjIyLi4uCg==" ech="MjIyLi4uCg=="', ], "TLSA": ["3 1 1 AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"], "TXT": [ 'foob"ar', "v=spf1 include:example.com ~all", '"foo\nbar"', '"\x00" "NUL byte yo"', '"' + "".join(rf"\{n:03}" for n in range(257)) + '"', # \256 does not exist ], "URI": ['"1" "2" "3"'], } self.assertAllSupportedRRSetTypes(set(datas.keys())) for t, records in datas.items(): for r in records: data = {"records": [r], "ttl": 3660, "type": t, "subname": ""} response = self.client.post_rr_set(self.my_empty_domain.name, **data) self.assertNotContains( response, "Duplicate", status_code=status.HTTP_400_BAD_REQUEST ) def test_create_my_rr_sets_txt_splitting(self): for t in ["TXT", "SPF"]: for l in [200, 255, 256, 300, 400]: data = { "records": [f'"{"a"*l}"'], "ttl": 3660, "type": t, "subname": f"x{l}", } with self.assertPdnsRequests( self.requests_desec_rr_sets_update(name=self.my_empty_domain.name) ): response = self.client.post_rr_set( self.my_empty_domain.name, **data ) self.assertStatus(response, status.HTTP_201_CREATED) response = self.client.get_rr_set(self.my_empty_domain.name, f"x{l}", t) num_tokens = response.data["records"][0].count(" ") + 1 num_tokens_expected = l // 256 + 1 self.assertEqual( num_tokens, num_tokens_expected, f"For a {t} record with a token of length of {l}, expected to see " f"{num_tokens_expected} tokens in the canonical format, but saw {num_tokens}.", ) self.assertEqual( "".join(r.strip('" ') for r in response.data["records"][0]), "a" * l ) def test_create_my_rr_sets_unknown_type(self): for _type in ["AA", "ASDF"] + list( RR_SET_TYPES_AUTOMATIC | RR_SET_TYPES_UNSUPPORTED ): response = self.client.post_rr_set( self.my_domain.name, records=["1234"], ttl=3660, type=_type ) self.assertContains( response, text="managed automatically" if _type in RR_SET_TYPES_AUTOMATIC else "type is currently unsupported", status_code=status.HTTP_400_BAD_REQUEST, ) def test_create_my_rr_sets_ttl_too_small(self): ttl = settings.MINIMUM_TTL_DEFAULT - 1 response = self.client.post_rr_set( self.my_empty_domain.name, records=["1.2.3.4"], ttl=ttl, type="A" ) self.assertStatus(response, status.HTTP_400_BAD_REQUEST) detail = f"Ensure this value is greater than or equal to {self.my_empty_domain.minimum_ttl}." self.assertEqual(response.data["ttl"][0], detail) ttl += 1 with self.assertPdnsRequests( self.requests_desec_rr_sets_update(name=self.my_empty_domain.name) ): response = self.client.post_rr_set( self.my_empty_domain.name, records=["1.2.23.4"], ttl=ttl, type="A" ) self.assertStatus(response, status.HTTP_201_CREATED) def test_create_my_rr_sets_ttl_too_large(self): max_ttl = 24 * 3600 response = self.client.post_rr_set( self.my_empty_domain.name, records=["1.2.3.4"], ttl=max_ttl + 1, type="A" ) self.assertStatus(response, status.HTTP_400_BAD_REQUEST) detail = f"Ensure this value is less than or equal to {max_ttl}." self.assertEqual(response.data["ttl"][0], detail) with self.assertPdnsRequests( self.requests_desec_rr_sets_update(name=self.my_empty_domain.name) ): response = self.client.post_rr_set( self.my_empty_domain.name, records=["1.2.23.4"], ttl=max_ttl, type="A" ) self.assertStatus(response, status.HTTP_201_CREATED) def test_retrieve_my_rr_sets_apex(self): response = self.client.get_rr_set( self.my_rr_set_domain.name, subname="", type_="A" ) self.assertStatus(response, status.HTTP_200_OK) self.assertEqual(response.data["records"][0], "1.2.3.4") self.assertEqual(response.data["ttl"], 3620) def test_retrieve_my_rr_sets_restricted_types(self): for type_ in self.AUTOMATIC_TYPES: response = self.client.get_rr_sets(self.my_domain.name, type=type_) self.assertStatus(response, status.HTTP_403_FORBIDDEN) response = self.client.get_rr_sets( self.my_domain.name, type=type_, subname="" ) self.assertStatus(response, status.HTTP_403_FORBIDDEN) def test_update_my_rr_sets(self): for subname in self.SUBNAMES: with self.assertPdnsRequests( self.requests_desec_rr_sets_update(name=self.my_rr_set_domain.name) ): data = { "records": ["2.2.3.4"], "ttl": 3630, "type": "A", "subname": subname, } response = self.client.put_rr_set( self.my_rr_set_domain.name, subname, "A", data ) self.assertStatus(response, status.HTTP_200_OK) response = self.client.get_rr_set(self.my_rr_set_domain.name, subname, "A") self.assertStatus(response, status.HTTP_200_OK) self.assertEqual(response.data["records"], ["2.2.3.4"]) self.assertEqual(response.data["ttl"], 3630) response = self.client.put_rr_set( self.my_rr_set_domain.name, subname, "A", {"records": ["2.2.3.5"]} ) self.assertStatus(response, status.HTTP_400_BAD_REQUEST) response = self.client.put_rr_set( self.my_rr_set_domain.name, subname, "A", {"ttl": 3637} ) self.assertStatus(response, status.HTTP_400_BAD_REQUEST) def test_update_my_rr_set_with_invalid_payload_type(self): for subname in self.SUBNAMES: data = [ {"records": ["2.2.3.4"], "ttl": 30, "type": "A", "subname": subname} ] response = self.client.put_rr_set( self.my_rr_set_domain.name, subname, "A", data ) self.assertStatus(response, status.HTTP_400_BAD_REQUEST) self.assertEquals( response.data["non_field_errors"][0], "Invalid data. Expected a dictionary, but got list.", ) data = "foobar" response = self.client.put_rr_set( self.my_rr_set_domain.name, subname, "A", data ) self.assertStatus(response, status.HTTP_400_BAD_REQUEST) self.assertEquals( response.data["non_field_errors"][0], "Invalid data. Expected a dictionary, but got str.", ) def test_partially_update_my_rr_sets(self): for subname in self.SUBNAMES: current_rr_set = self.client.get_rr_set( self.my_rr_set_domain.name, subname, "A" ).data for data in [ {"records": ["2.2.3.4"], "ttl": 3630}, {"records": ["3.2.3.4"]}, {"records": ["3.2.3.4", "9.8.8.7"]}, {"ttl": 3637}, ]: with self.assertPdnsRequests( self.requests_desec_rr_sets_update(name=self.my_rr_set_domain.name) ): response = self.client.patch_rr_set( self.my_rr_set_domain.name, subname, "A", data ) self.assertStatus(response, status.HTTP_200_OK) response = self.client.get_rr_set( self.my_rr_set_domain.name, subname, "A" ) self.assertStatus(response, status.HTTP_200_OK) current_rr_set.update(data) self.assertEqual( set(response.data["records"]), set(current_rr_set["records"]) ) self.assertEqual(response.data["ttl"], current_rr_set["ttl"]) response = self.client.patch_rr_set( self.my_rr_set_domain.name, subname, "A", {} ) self.assertStatus(response, status.HTTP_200_OK) def test_rr_sets_touched_if_noop(self): for subname in self.SUBNAMES: touched_old = RRset.objects.get( domain=self.my_rr_set_domain, type="A", subname=subname ).touched response = self.client.patch_rr_set( self.my_rr_set_domain.name, subname, "A", {} ) self.assertStatus(response, status.HTTP_200_OK) touched_new = RRset.objects.get( domain=self.my_rr_set_domain, type="A", subname=subname ).touched self.assertGreater(touched_new, touched_old) self.assertEqual( Domain.objects.get(name=self.my_rr_set_domain.name).touched, touched_new ) def test_partially_update_other_rr_sets(self): data = {"records": ["3.2.3.4"], "ttl": 334} for subname in self.SUBNAMES: response = self.client.patch_rr_set( self.other_rr_set_domain.name, subname, "A", data ) self.assertStatus(response, status.HTTP_404_NOT_FOUND) def test_update_other_rr_sets(self): data = {"ttl": 305} for subname in self.SUBNAMES: response = self.client.patch_rr_set( self.other_rr_set_domain.name, subname, "A", data ) self.assertStatus(response, status.HTTP_404_NOT_FOUND) def test_update_essential_properties(self): # Changing the subname is expected to cause an error url = self.reverse( "v1:rrset", name=self.my_rr_set_domain.name, subname="test", type="A" ) data = {"records": ["3.2.3.4"], "ttl": 3620, "subname": "test2", "type": "A"} response = self.client.patch(url, data) self.assertStatus(response, status.HTTP_400_BAD_REQUEST) self.assertEquals(response.data["subname"][0].code, "read-only-on-update") response = self.client.put(url, data) self.assertStatus(response, status.HTTP_400_BAD_REQUEST) self.assertEquals(response.data["subname"][0].code, "read-only-on-update") # Changing the type is expected to cause an error data = {"records": ["3.2.3.4"], "ttl": 3620, "subname": "test", "type": "TXT"} response = self.client.patch(url, data) self.assertStatus(response, status.HTTP_400_BAD_REQUEST) self.assertEquals(response.data["type"][0].code, "read-only-on-update") response = self.client.put(url, data) self.assertStatus(response, status.HTTP_400_BAD_REQUEST) self.assertEquals(response.data["type"][0].code, "read-only-on-update") # Changing "created" is no-op response = self.client.get(url) data = response.data created = data["created"] data["created"] = "2019-07-19T17:22:49.575717Z" response = self.client.patch(url, data) self.assertStatus(response, status.HTTP_200_OK) response = self.client.put(url, data) self.assertStatus(response, status.HTTP_200_OK) # Check that nothing changed response = self.client.get(url) self.assertStatus(response, status.HTTP_200_OK) self.assertEqual(response.data["records"][0], "2.2.3.4") self.assertEqual(response.data["ttl"], 3620) self.assertEqual( response.data["name"], "test." + self.my_rr_set_domain.name + "." ) self.assertEqual(response.data["subname"], "test") self.assertEqual(response.data["type"], "A") self.assertEqual(response.data["created"], created) # This is expected to work, but the fields are ignored data = {"records": ["3.2.3.4"], "name": "example.com.", "domain": "example.com"} with self.assertPdnsRequests( self.requests_desec_rr_sets_update(name=self.my_rr_set_domain.name) ): response = self.client.patch(url, data) self.assertStatus(response, status.HTTP_200_OK) response = self.client.get(url) self.assertStatus(response, status.HTTP_200_OK) self.assertEqual(response.data["records"][0], "3.2.3.4") self.assertEqual(response.data["domain"], self.my_rr_set_domain.name) self.assertEqual( response.data["name"], "test." + self.my_rr_set_domain.name + "." ) def test_update_unknown_rrset(self): url = self.reverse( "v1:rrset", name=self.my_rr_set_domain.name, subname="doesnotexist", type="A", ) data = {"records": ["3.2.3.4"], "ttl": 3620} response = self.client.patch(url, data) self.assertStatus(response, status.HTTP_404_NOT_FOUND) response = self.client.put(url, data) self.assertStatus(response, status.HTTP_404_NOT_FOUND) def test_delete_my_rr_sets_with_patch(self): data = {"records": []} for subname in self.SUBNAMES: with self.assertPdnsRequests( self.requests_desec_rr_sets_update(name=self.my_rr_set_domain.name) ): response = self.client.patch_rr_set( self.my_rr_set_domain.name, subname, "A", data ) self.assertStatus(response, status.HTTP_204_NO_CONTENT) # Deletion is only idempotent via DELETE. For PATCH/PUT, the view raises 404 if the instance does not # exist. By that time, the view has not parsed the payload yet and does not know it is a deletion. response = self.client.patch_rr_set( self.my_rr_set_domain.name, subname, "A", data ) self.assertStatus(response, status.HTTP_404_NOT_FOUND) response = self.client.get_rr_set(self.my_rr_set_domain.name, subname, "A") self.assertStatus(response, status.HTTP_404_NOT_FOUND) def test_delete_my_rr_sets_with_delete(self): for subname in self.SUBNAMES: with self.assertPdnsRequests( self.requests_desec_rr_sets_update(name=self.my_rr_set_domain.name) ): response = self.client.delete_rr_set( self.my_rr_set_domain.name, subname=subname, type_="A" ) self.assertStatus(response, status.HTTP_204_NO_CONTENT) domain = Domain.objects.get(name=self.my_rr_set_domain.name) self.assertEqual(domain.touched, domain.published) response = self.client.delete_rr_set( self.my_rr_set_domain.name, subname=subname, type_="A" ) self.assertStatus(response, status.HTTP_204_NO_CONTENT) response = self.client.get_rr_set( self.my_rr_set_domain.name, subname=subname, type_="A" ) self.assertStatus(response, status.HTTP_404_NOT_FOUND) def test_delete_other_rr_sets(self): data = {"records": []} for subname in self.SUBNAMES: # Try PATCH empty response = self.client.patch_rr_set( self.other_rr_set_domain.name, subname, "A", data ) self.assertStatus(response, status.HTTP_404_NOT_FOUND) # Try DELETE response = self.client.delete_rr_set( self.other_rr_set_domain.name, subname, "A" ) self.assertStatus(response, status.HTTP_404_NOT_FOUND) # Make sure it actually is still there self.assertGreater( len( self.other_rr_set_domain.rrset_set.filter(subname=subname, type="A") ), 0, ) def test_import_rr_sets(self): with self.assertPdnsRequests( self.request_pdns_zone_retrieve(name=self.my_domain.name) ): call_command("sync-from-pdns", self.my_domain.name) for response in [ self.client.get_rr_sets(self.my_domain.name), self.client.get_rr_sets(self.my_domain.name, subname=""), ]: self.assertStatus(response, status.HTTP_200_OK) self.assertEqual(len(response.data), 1, response.data) self.assertContainsRRSets( response.data, [dict(subname="", records=settings.DEFAULT_NS, type="NS")], ) def test_extra_dnskeys(self): name = "ietf.org" dnskeys = [ "256 3 5 AwEAAdDECajHaTjfSoNTY58WcBah1BxPKVIHBz4IfLjfqMvium4lgKtKZLe97DgJ5/NQrNEGGQmr6fKvUj67cfrZUojZ2cGRiz" "VhgkOqZ9scaTVXNuXLM5Tw7VWOVIceeXAuuH2mPIiEV6MhJYUsW6dvmNsJ4XwCgNgroAmXhoMEiWEjBB+wjYZQ5GtZHBFKVXACSWTiCtdd" "HcueOeSVPi5WH94VlubhHfiytNPZLrObhUCHT6k0tNE6phLoHnXWU+6vpsYpz6GhMw/R9BFxW5PdPFIWBgoWk2/XFVRSKG9Lr61b2z1R12" "6xeUwvw46RVy3hanV3vNO7LM5HniqaYclBbhk=", "257 3 5 AwEAAavjQ1H6pE8FV8LGP0wQBFVL0EM9BRfqxz9p/sZ+8AByqyFHLdZcHoOGF7CgB5OKYMvGOgysuYQloPlwbq7Ws5WywbutbX" "yG24lMWy4jijlJUsaFrS5EvUu4ydmuRc/TGnEXnN1XQkO+waIT4cLtrmcWjoY8Oqud6lDaJdj1cKr2nX1NrmMRowIu3DIVtGbQJmzpukpD" "VZaYMMAm8M5vz4U2vRCVETLgDoQ7rhsiD127J8gVExjO8B0113jCajbFRcMtUtFTjH4z7jXP2ZzDcXsgpe4LYFuenFQAcRBRlE6oaykHR7" "rlPqqmw58nIELJUFoMcb/BdRLgbyTeurFlnxs=", ] expected_ds = [ "45586 5 2 67fcd7e0b9e0366309f3b6f7476dff931d5226edc5348cd80fd82a081dfcf6ee", "45586 5 4 aee6931c7790c428bca35dab9179cb27f042715e38e5a8adb6bb24c57c21c65dbd02a5b09887787f30128bfac8b6f0b5", ] domain = Domain.objects.create(name=name, owner=self.owner) rrset = domain.rrset_set.create(subname="", type="DNSKEY", ttl=3600) rrset.records.bulk_create( [RR(rrset=rrset, content=dnskey) for dnskey in dnskeys] ) url = self.reverse("v1:domain-detail", name=domain.name) with self.assertPdnsRequests( self.request_pdns_zone_retrieve_crypto_keys(name=domain.name) ): response = self.client.get(url) self.assertStatus(response, status.HTTP_200_OK) self.assertEqual( response.data["keys"], [ { "dnskey": key["dnskey"], "ds": key["cds"] if key["flags"] & 1 else [], "flags": key["flags"], "keytype": key["keytype"], "managed": True, } for key in self.get_body_pdns_zone_retrieve_crypto_keys() ] + [ { "dnskey": dnskeys[0], "ds": [], "flags": 256, "keytype": None, "managed": False, }, { "dnskey": dnskeys[1], "ds": expected_ds, "flags": 257, "keytype": None, "managed": False, }, ], )