|
@@ -1,4 +1,6 @@
|
|
import base64
|
|
import base64
|
|
|
|
+from functools import reduce
|
|
|
|
+import operator
|
|
import random
|
|
import random
|
|
import re
|
|
import re
|
|
import string
|
|
import string
|
|
@@ -663,11 +665,11 @@ class DomainOwnerTestCase(DesecTestCase):
|
|
cls.owner = cls.create_user(dyn=cls.DYN)
|
|
cls.owner = cls.create_user(dyn=cls.DYN)
|
|
|
|
|
|
cls.my_domains = [
|
|
cls.my_domains = [
|
|
- cls.create_domain(suffix=random.choice(cls.AUTO_DELEGATION_DOMAINS) if cls.DYN else '', owner=cls.owner)
|
|
|
|
|
|
+ cls.create_domain(suffix=random.choice(cls.AUTO_DELEGATION_DOMAINS) if cls.DYN else None, owner=cls.owner)
|
|
for _ in range(cls.NUM_OWNED_DOMAINS)
|
|
for _ in range(cls.NUM_OWNED_DOMAINS)
|
|
]
|
|
]
|
|
cls.other_domains = [
|
|
cls.other_domains = [
|
|
- cls.create_domain(suffix=random.choice(cls.AUTO_DELEGATION_DOMAINS) if cls.DYN else '')
|
|
|
|
|
|
+ cls.create_domain(suffix=random.choice(cls.AUTO_DELEGATION_DOMAINS) if cls.DYN else None)
|
|
for _ in range(cls.NUM_OTHER_DOMAINS)
|
|
for _ in range(cls.NUM_OTHER_DOMAINS)
|
|
]
|
|
]
|
|
|
|
|
|
@@ -727,3 +729,109 @@ class DynDomainOwnerTestCase(DomainOwnerTestCase):
|
|
self.client_token_authorized = self.client_class()
|
|
self.client_token_authorized = self.client_class()
|
|
self.client.set_credentials_basic_auth(self.my_domain.name, self.token.key)
|
|
self.client.set_credentials_basic_auth(self.my_domain.name, self.token.key)
|
|
self.client_token_authorized.set_credentials_token_auth(self.token.key)
|
|
self.client_token_authorized.set_credentials_token_auth(self.token.key)
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+class AuthenticatedRRSetBaseTestCase(DomainOwnerTestCase):
|
|
|
|
+ DEAD_TYPES = ['ALIAS', 'DNAME']
|
|
|
|
+ RESTRICTED_TYPES = ['SOA', 'RRSIG', 'DNSKEY', 'NSEC3PARAM', 'OPT']
|
|
|
|
+
|
|
|
|
+ # see https://doc.powerdns.com/md/types/
|
|
|
|
+ PDNS_RR_TYPES = ['A', 'AAAA', 'AFSDB', 'ALIAS', 'CAA', 'CERT', 'CDNSKEY', 'CDS', 'CNAME', 'DNSKEY', 'DNAME', 'DS',
|
|
|
|
+ 'HINFO', 'KEY', 'LOC', 'MX', 'NAPTR', 'NS', 'NSEC', 'NSEC3', 'NSEC3PARAM', 'OPENPGPKEY', 'PTR',
|
|
|
|
+ 'RP', 'RRSIG', 'SOA', 'SPF', 'SSHFP', 'SRV', 'TKEY', 'TSIG', 'TLSA', 'SMIMEA', 'TXT', 'URI']
|
|
|
|
+ ALLOWED_TYPES = ['A', 'AAAA', 'AFSDB', 'CAA', 'CERT', 'CDNSKEY', 'CDS', 'CNAME', 'DS', 'HINFO', 'KEY', 'LOC', 'MX',
|
|
|
|
+ 'NAPTR', 'NS', 'NSEC', 'NSEC3', 'OPENPGPKEY', 'PTR', 'RP', 'SPF', 'SSHFP', 'SRV', 'TKEY', 'TSIG',
|
|
|
|
+ 'TLSA', 'SMIMEA', 'TXT', 'URI']
|
|
|
|
+
|
|
|
|
+ SUBNAMES = ['foo', 'bar.baz', 'q.w.e.r.t', '*', '*.foobar', '_', '-foo.test', '_bar']
|
|
|
|
+
|
|
|
|
+ @classmethod
|
|
|
|
+ def _test_rr_sets(cls, subname=None, type_=None, records=None, ttl=None):
|
|
|
|
+ """
|
|
|
|
+ Gives a list of example RR sets for testing.
|
|
|
|
+ Args:
|
|
|
|
+ subname: Filter by subname. None to allow any.
|
|
|
|
+ type_: Filter by type. None to allow any.
|
|
|
|
+ records: Filter by records. Must match exactly. None to allow any.
|
|
|
|
+ ttl: Filter by ttl. None to allow any.
|
|
|
|
+
|
|
|
|
+ Returns: Returns a list of tuples that represents example RR sets represented as 4-tuples consisting of
|
|
|
|
+ subname, type_, records, ttl
|
|
|
|
+ """
|
|
|
|
+ # TODO add more examples of cls.ALLOWED_TYPES
|
|
|
|
+ # NOTE The validity of the RRset contents it *not* verified. We currently leave this task to pdns.
|
|
|
|
+ rr_sets = [
|
|
|
|
+ ('', 'A', ['1.2.3.4'], 120),
|
|
|
|
+ ('test', 'A', ['2.2.3.4'], 120),
|
|
|
|
+ ('test', 'TXT', ['"foobar"'], 120),
|
|
|
|
+ ] + [
|
|
|
|
+ (subname_, 'TXT', ['"hey ho, let\'s go!"'], 134)
|
|
|
|
+ for subname_ in cls.SUBNAMES
|
|
|
|
+ ] + [
|
|
|
|
+ (subname_, type_, ['10 mx1.example.com.'], 101)
|
|
|
|
+ for subname_ in cls.SUBNAMES
|
|
|
|
+ for type_ in ['MX', 'SPF']
|
|
|
|
+ ] + [
|
|
|
|
+ (subname_, 'A', ['1.2.3.4'], 187)
|
|
|
|
+ for subname_ in cls.SUBNAMES
|
|
|
|
+ ]
|
|
|
|
+
|
|
|
|
+ if subname or type_ or records or ttl:
|
|
|
|
+ rr_sets = [
|
|
|
|
+ rr_set for rr_set in rr_sets
|
|
|
|
+ if (
|
|
|
|
+ (subname is None or subname == rr_set[0]) and
|
|
|
|
+ (type_ is None or type_ == rr_set[1]) and
|
|
|
|
+ (records is None or records == rr_set[2]) and
|
|
|
|
+ (ttl is None or ttl == rr_set[3])
|
|
|
|
+ )
|
|
|
|
+ ]
|
|
|
|
+ return rr_sets
|
|
|
|
+
|
|
|
|
+ @classmethod
|
|
|
|
+ def setUpTestDataWithPdns(cls):
|
|
|
|
+ super().setUpTestDataWithPdns()
|
|
|
|
+ # TODO this test does not cover "dyn" / auto delegation domains
|
|
|
|
+ cls.my_empty_domain = cls.create_domain(suffix='', owner=cls.owner)
|
|
|
|
+ cls.my_rr_set_domain = cls.create_domain(suffix='', owner=cls.owner)
|
|
|
|
+ cls.other_rr_set_domain = cls.create_domain(suffix='')
|
|
|
|
+ for domain in [cls.my_rr_set_domain, cls.other_rr_set_domain]:
|
|
|
|
+ for (subname, type_, records, ttl) in cls._test_rr_sets():
|
|
|
|
+ cls.create_rr_set(domain, subname=subname, type=type_, records=records, ttl=ttl)
|
|
|
|
+
|
|
|
|
+ def assertRRSet(self, response_rr, domain=None, subname=None, records=None, type_=None, **kwargs):
|
|
|
|
+ kwargs['domain'] = domain
|
|
|
|
+ kwargs['subname'] = subname
|
|
|
|
+ kwargs['records'] = records
|
|
|
|
+ kwargs['type'] = type_
|
|
|
|
+
|
|
|
|
+ for key, value in kwargs.items():
|
|
|
|
+ if value is not None:
|
|
|
|
+ self.assertEqual(
|
|
|
|
+ response_rr[key], value,
|
|
|
|
+ 'RR set did not have the expected %s: Expected "%s" but was "%s" in %s' % (
|
|
|
|
+ key, value, response_rr[key], response_rr
|
|
|
|
+ )
|
|
|
|
+ )
|
|
|
|
+
|
|
|
|
+ @staticmethod
|
|
|
|
+ def _count_occurrences_by_mask(rr_sets, masks):
|
|
|
|
+ def _filter_rr_sets_by_mask(rr_sets_, mask):
|
|
|
|
+ return [rr_set for rr_set in rr_sets_
|
|
|
|
+ if reduce(operator.and_, [rr_set.get(key, None) == value for key, value in mask.items()])
|
|
|
|
+ ]
|
|
|
|
+
|
|
|
|
+ return [len(_filter_rr_sets_by_mask(rr_sets, mask)) for mask in masks]
|
|
|
|
+
|
|
|
|
+ def assertRRSetsCount(self, rr_sets, masks, count=1):
|
|
|
|
+ actual_counts = self._count_occurrences_by_mask(rr_sets, masks)
|
|
|
|
+ if not all([actual_count == count for actual_count in actual_counts]):
|
|
|
|
+ self.fail('Expected to find %i RR set(s) for each of %s, but distribution is %s in %s.' % (
|
|
|
|
+ count, masks, actual_counts, rr_sets
|
|
|
|
+ ))
|
|
|
|
+
|
|
|
|
+ def assertContainsRRSets(self, rr_sets_haystack, rr_sets_needle):
|
|
|
|
+ if not all(self._count_occurrences_by_mask(rr_sets_haystack, rr_sets_needle)):
|
|
|
|
+ self.fail('Expected to find RR sets with %s, but only got %s.' % (
|
|
|
|
+ rr_sets_needle, rr_sets_haystack
|
|
|
|
+ ))
|