base.py 35 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902
  1. import base64
  2. import operator
  3. import random
  4. import re
  5. import string
  6. from contextlib import nullcontext
  7. from functools import partial, reduce
  8. from unittest import mock
  9. from django.utils import timezone
  10. from httpretty import httpretty, core as hr_core
  11. from rest_framework.reverse import reverse
  12. from rest_framework.test import APITestCase, APIClient
  13. from rest_framework.utils import json
  14. from api import settings
  15. from desecapi.models import User, Domain, Token, RRset, RR
  16. from desecapi.views import DomainList
  17. class DesecAPIClient(APIClient):
  18. @staticmethod
  19. def _http_header_base64_conversion(content):
  20. return base64.b64encode(content.encode()).decode()
  21. def set_credentials(self, authorization):
  22. self.credentials(HTTP_AUTHORIZATION=authorization)
  23. def set_credentials_basic_auth(self, part_1, part_2=None):
  24. if not part_1 and not part_2:
  25. self.set_credentials('')
  26. else:
  27. s = part_1 if not part_2 else '%s:%s' % (part_1, part_2)
  28. self.set_credentials('Basic ' + self._http_header_base64_conversion(s))
  29. def set_credentials_token_auth(self, token):
  30. if token is None:
  31. self.set_credentials('')
  32. else:
  33. self.set_credentials('Token ' + token)
  34. def __init__(self, *args, **kwargs):
  35. super().__init__(*args, **kwargs)
  36. self.reverse = DesecTestCase.reverse
  37. def bulk_patch_rr_sets(self, domain_name, payload):
  38. return self.patch(
  39. self.reverse('v1:rrsets', name=domain_name),
  40. payload,
  41. )
  42. def bulk_post_rr_sets(self, domain_name, payload):
  43. return self.post(
  44. self.reverse('v1:rrsets', name=domain_name),
  45. payload,
  46. )
  47. def bulk_put_rr_sets(self, domain_name, payload):
  48. return self.put(
  49. self.reverse('v1:rrsets', name=domain_name),
  50. payload,
  51. )
  52. def post_rr_set(self, domain_name, **kwargs):
  53. kwargs.setdefault('subname', '')
  54. kwargs.setdefault('ttl', 60)
  55. return self.post(
  56. self.reverse('v1:rrsets', name=domain_name),
  57. kwargs,
  58. )
  59. def get_rr_sets(self, domain_name, **kwargs):
  60. return self.get(
  61. self.reverse('v1:rrsets', name=domain_name),
  62. kwargs
  63. )
  64. def get_rr_set(self, domain_name, subname, type_):
  65. return self.get(
  66. self.reverse('v1:rrset@', name=domain_name, subname=subname, type=type_)
  67. )
  68. def put_rr_set(self, domain_name, subname, type_, **kwargs):
  69. return self.put(
  70. self.reverse('v1:rrset@', name=domain_name, subname=subname, type=type_),
  71. kwargs
  72. )
  73. def patch_rr_set(self, domain_name, subname, type_, **kwargs):
  74. return self.patch(
  75. self.reverse('v1:rrset@', name=domain_name, subname=subname, type=type_),
  76. kwargs
  77. )
  78. def delete_rr_set(self, domain_name, subname, type_):
  79. return self.delete(
  80. self.reverse('v1:rrset@', name=domain_name, subname=subname, type=type_)
  81. )
  82. # TODO add and use {post,get,delete,...}_domain
  83. class AssertRequestsContextManager:
  84. """
  85. Checks that in its context, certain expected requests are made.
  86. """
  87. @classmethod
  88. def _flatten_nested_lists(cls, l):
  89. for i in l:
  90. if isinstance(i, list) or isinstance(i, tuple):
  91. yield from cls._flatten_nested_lists(i)
  92. else:
  93. yield i
  94. def __init__(self, test_case, expected_requests, single_expectation_single_request=True, expect_order=True):
  95. """
  96. Initialize a context that checks for made HTTP requests.
  97. Args:
  98. test_case: Test case in which this context lives. Used to fail test if observed requests do not meet
  99. expectations.
  100. expected_requests: (Possibly nested) list of requests, represented by kwarg-dictionaries for
  101. `httpretty.register_uri`.
  102. single_expectation_single_request: If True (default), each expected request needs to be matched by exactly
  103. one observed request.
  104. expect_order: If True (default), requests made are expected in order of expectations given.
  105. """
  106. self.test_case = test_case
  107. self.expected_requests = list(self._flatten_nested_lists(expected_requests))
  108. self.single_expectation_single_request = single_expectation_single_request
  109. self.expect_order = expect_order
  110. self.old_httpretty_entries = None
  111. def __enter__(self):
  112. hr_core.POTENTIAL_HTTP_PORTS.add(8081) # FIXME should depend on self.expected_requests
  113. self.expected_requests = self.expected_requests
  114. self.old_httpretty_entries = httpretty._entries.copy() # FIXME accessing private properties of httpretty
  115. for request in self.expected_requests:
  116. httpretty.register_uri(**request)
  117. @staticmethod
  118. def _find_matching_request(pattern, requests):
  119. for request in requests:
  120. if pattern['method'] == request[0] and pattern['uri'].match(request[1]):
  121. return request
  122. return None
  123. def __exit__(self, exc_type, exc_val, exc_tb):
  124. # organize seen requests in a primitive data structure
  125. seen_requests = [
  126. (r.command, 'http://%s%s' % (r.headers['Host'], r.path)) for r in httpretty.latest_requests
  127. ]
  128. httpretty.reset()
  129. hr_core.POTENTIAL_HTTP_PORTS.add(8081) # FIXME should depend on self.expected_requests
  130. httpretty._entries = self.old_httpretty_entries
  131. unmatched_requests = seen_requests[:]
  132. # go through expected requests one by one
  133. requests_to_check = list(self.expected_requests)[:]
  134. while requests_to_check:
  135. request = requests_to_check.pop(0)
  136. # match request
  137. match = None
  138. if self.expect_order:
  139. if not self.single_expectation_single_request:
  140. raise ValueError(
  141. 'Checking of multiple (possibly zero) requests per expectation and checking of request '
  142. 'order simultaneously is not implemented, sorry.')
  143. if unmatched_requests:
  144. match = self._find_matching_request(request, [unmatched_requests[0]])
  145. else:
  146. match = self._find_matching_request(
  147. request, unmatched_requests if self.single_expectation_single_request else seen_requests)
  148. # check match
  149. if not match and self.single_expectation_single_request:
  150. self.test_case.fail(('Expected to see a %s request on\n\n%s,\n\nbut only saw these %i '
  151. 'requests:\n\n%s\n\nAll expected requests:\n\n%s\n\n'
  152. 'Hint: check for possible duplicates in your expectation.\n' +
  153. ('Hint: Is the expectation order correct?' if self.expect_order else '')) % (
  154. request['method'], request['uri'], len(seen_requests),
  155. '\n'.join(map(str, seen_requests)),
  156. '\n'.join(map(str, [(r['method'], r['uri']) for r in self.expected_requests])),
  157. ))
  158. if match:
  159. unmatched_requests.remove(match)
  160. # see if any requests were unexpected
  161. if unmatched_requests and self.single_expectation_single_request:
  162. self.test_case.fail('While waiting for %i request(s), we saw %i unexpected request(s). The unexpected '
  163. 'request(s) was/were:\n\n%s\n\nAll recorded requests:\n\n%s\n\nAll expected requests:'
  164. '\n\n%s' % (
  165. len(self.expected_requests),
  166. len(unmatched_requests),
  167. '\n'.join(map(str, unmatched_requests)),
  168. '\n'.join(map(str, seen_requests)),
  169. '\n'.join(map(str, [(r['method'], r['uri']) for r in self.expected_requests])),
  170. ))
  171. class MockPDNSTestCase(APITestCase):
  172. """
  173. This test case provides a "mocked Internet" environment with a mock pdns API interface. All internet connections,
  174. HTTP or otherwise, that this test case is unaware of will result in an exception.
  175. By default, requests are intercepted but unexpected will result in a failed test. To set pdns API request
  176. expectations, use the `with MockPDNSTestCase.assertPdns*` context managers.
  177. Subclasses may not touch httpretty.enable() or httpretty.disable(). For 'local' usage, httpretty.register_uri()
  178. and httpretty.reset() may be used.
  179. """
  180. PDNS_ZONES = r'/zones'
  181. PDNS_ZONE_CRYPTO_KEYS = r'/zones/(?P<id>[^/]+)/cryptokeys'
  182. PDNS_ZONE = r'/zones/(?P<id>[^/]+)'
  183. PDNS_ZONE_AXFR = r'/zones/(?P<id>[^/]+)/axfr-retrieve'
  184. @classmethod
  185. def get_full_pdns_url(cls, path_regex, ns='LORD', **kwargs):
  186. api = getattr(settings, 'NS%s_PDNS_API' % ns)
  187. return re.compile('^' + api + cls.fill_regex_groups(path_regex, **kwargs) + '$')
  188. @classmethod
  189. def fill_regex_groups(cls, template, **kwargs):
  190. s = template
  191. for name, value in kwargs.items():
  192. if value is None:
  193. continue
  194. pattern = r'\(\?P\<%s\>[^\)]+\)' % name
  195. if not re.search(pattern, s):
  196. raise ValueError('Tried to fill field %s in template %s, but it does not exist.' % (name, template))
  197. s = re.sub(
  198. pattern=pattern,
  199. repl=value,
  200. string=s,
  201. )
  202. return s
  203. @classmethod
  204. def _pdns_zone_id_heuristic(cls, name):
  205. """
  206. Returns an educated guess of the pdns zone id for a given zone name.
  207. """
  208. if not name:
  209. return None
  210. name = cls._normalize_name(name)
  211. return name.translate(str.maketrans({'/': '=2F', '_': '=5F'})) # make sure =5F is not lower-cased
  212. @classmethod
  213. def _normalize_name(cls, arg):
  214. if not arg:
  215. return None
  216. if not isinstance(arg, list):
  217. return cls._normalize_name([arg])[0]
  218. else:
  219. return [x.rstrip('.') + '.' for x in arg]
  220. @classmethod
  221. def request_pdns_zone_create(cls, ns):
  222. return {
  223. 'method': 'POST',
  224. 'uri': cls.get_full_pdns_url(cls.PDNS_ZONES, ns=ns),
  225. 'status': 201,
  226. 'body': None,
  227. }
  228. @classmethod
  229. def request_pdns_zone_create_422(cls):
  230. request = cls.request_pdns_zone_create(ns='LORD')
  231. request['status'] = 422
  232. return request
  233. @classmethod
  234. def request_pdns_zone_create_already_exists(cls, existing_domains=None):
  235. existing_domains = cls._normalize_name(existing_domains)
  236. def request_callback(r, _, response_headers):
  237. body = json.loads(r.parsed_body)
  238. if not existing_domains or body['name'] in existing_domains:
  239. return [422, response_headers, json.dumps({'error': 'Domain \'%s\' already exists' % body['name']})]
  240. else:
  241. return [200, response_headers, '']
  242. request = cls.request_pdns_zone_create_422()
  243. request['body'] = request_callback
  244. request.pop('status')
  245. return request
  246. @classmethod
  247. def request_pdns_zone_delete(cls, name=None, ns='LORD'):
  248. return {
  249. 'method': 'DELETE',
  250. 'uri': cls.get_full_pdns_url(cls.PDNS_ZONE, ns=ns, id=cls._pdns_zone_id_heuristic(name)),
  251. 'status': 200,
  252. 'body': None,
  253. }
  254. @classmethod
  255. def request_pdns_zone_update(cls, name=None):
  256. return {
  257. 'method': 'PATCH',
  258. 'uri': cls.get_full_pdns_url(cls.PDNS_ZONE, id=cls._pdns_zone_id_heuristic(name)),
  259. 'status': 200,
  260. 'body': None,
  261. }
  262. @classmethod
  263. def request_pdns_zone_update_unknown_type(cls, name=None, unknown_types=None):
  264. def request_callback(r, _, response_headers):
  265. body = json.loads(r.parsed_body)
  266. if not unknown_types or body['rrsets'][0]['type'] in unknown_types:
  267. return [
  268. 422, response_headers,
  269. json.dumps({'error': 'Mocked error. Unknown RR type %s.' % body['rrsets'][0]['type']})
  270. ]
  271. else:
  272. return [200, response_headers, None]
  273. request = cls.request_pdns_zone_update(name)
  274. request['body'] = request_callback
  275. request.pop('status')
  276. return request
  277. @classmethod
  278. def request_pdns_zone_retrieve(cls, name=None):
  279. return {
  280. 'method': 'GET',
  281. 'uri': cls.get_full_pdns_url(cls.PDNS_ZONE, id=cls._pdns_zone_id_heuristic(name)),
  282. 'status': 200,
  283. 'body': json.dumps({
  284. 'rrsets': [{
  285. 'comments': [],
  286. 'name': cls._normalize_name(name) if name else 'test.mock.',
  287. 'ttl': 60,
  288. 'type': 'NS',
  289. 'records': [{'content': ns} for ns in settings.DEFAULT_NS],
  290. }]
  291. }),
  292. }
  293. @classmethod
  294. def request_pdns_zone_retrieve_crypto_keys(cls, name=None):
  295. return {
  296. 'method': 'GET',
  297. 'uri': cls.get_full_pdns_url(cls.PDNS_ZONE_CRYPTO_KEYS, id=cls._pdns_zone_id_heuristic(name)),
  298. 'status': 200,
  299. 'body': json.dumps([
  300. {
  301. 'active': True,
  302. 'algorithm': 'ECDSAP256SHA256',
  303. 'bits': 256,
  304. 'dnskey': '257 3 13 EVBcsqrnOp6RGWtsrr9QW8cUtt/'
  305. 'WI5C81RcCZDTGNI9elAiMQlxRdnic+7V+b7jJDE2vgY08qAbxiNh5NdzkzA==',
  306. 'ds': [
  307. '62745 13 1 642d70d9bb84903ca4c4ca08a6e4f1e9465aeaa6',
  308. '62745 13 2 5cddaeaa383e2ea7de49bd1212bf520228f0e3b334626517e5f6a68eb85b48f6',
  309. '62745 13 4 b3f2565901ddcb0b78337301cf863d1045774377bca05c7ad69e17a167734b92'
  310. '9f0a49b7edcca913eb6f5dfeac4645b8'
  311. ],
  312. 'flags': 257,
  313. 'id': 179425943,
  314. 'keytype': key_type,
  315. 'type': 'Cryptokey',
  316. }
  317. for key_type in ['csk', 'ksk', 'zsk']
  318. ])
  319. }
  320. @classmethod
  321. def request_pdns_zone_axfr(cls, name=None):
  322. return {
  323. 'method': 'PUT',
  324. 'uri': cls.get_full_pdns_url(cls.PDNS_ZONE_AXFR, ns='MASTER', id=cls._pdns_zone_id_heuristic(name)),
  325. 'status': 200,
  326. 'body': None,
  327. }
  328. def assertPdnsRequests(self, *expected_requests, expect_order=True):
  329. """
  330. Assert the given requests are made. To build requests, use the `MockPDNSTestCase.request_*` functions.
  331. Unmet expectations will fail the test.
  332. Args:
  333. *expected_requests: List of expected requests.
  334. expect_order: If True (default), the order of observed requests is checked.
  335. """
  336. return AssertRequestsContextManager(
  337. test_case=self,
  338. expected_requests=expected_requests,
  339. expect_order=expect_order,
  340. )
  341. def assertPdnsNoRequestsBut(self, *expected_requests):
  342. """
  343. Assert no requests other than the given ones are made. Each request can be matched more than once, unmatched
  344. expectations WILL NOT fail the test.
  345. Args:
  346. *expected_requests: List of acceptable requests to be made.
  347. """
  348. return AssertRequestsContextManager(
  349. test_case=self,
  350. expected_requests=expected_requests,
  351. single_expectation_single_request=False,
  352. expect_order=False,
  353. )
  354. def assertPdnsZoneCreation(self):
  355. """
  356. Asserts that nslord is contact and a zone is created.
  357. """
  358. return AssertRequestsContextManager(
  359. test_case=self,
  360. expected_requests=[
  361. self.request_pdns_zone_create(ns='LORD'),
  362. self.request_pdns_zone_create(ns='MASTER')
  363. ],
  364. )
  365. def assertPdnsZoneDeletion(self, name=None):
  366. """
  367. Asserts that nslord and nsmaster are contacted to delete a zone.
  368. Args:
  369. name: If given, the test is restricted to the name of this zone.
  370. """
  371. return AssertRequestsContextManager(
  372. test_case=self,
  373. expected_requests=[
  374. self.request_pdns_zone_delete(ns='LORD', name=name),
  375. self.request_pdns_zone_delete(ns='MASTER', name=name),
  376. ],
  377. )
  378. def assertStatus(self, response, status):
  379. if response.status_code != status:
  380. self.fail((
  381. 'Expected a response with status %i, but saw response with status %i. ' +
  382. (
  383. '\n@@@@@ THE REQUEST CAUSING THIS RESPONSE WAS UNEXPECTED BY THE TEST @@@@@\n'
  384. if response.status_code == 599 else ''
  385. ) +
  386. 'The response was %s.\n'
  387. 'The response body was\n\n%s') % (
  388. status,
  389. response.status_code,
  390. response,
  391. str(response.data).replace('\\n', '\n') if hasattr(response, 'data') else '',
  392. ))
  393. @classmethod
  394. def setUpTestData(cls):
  395. httpretty.enable(allow_net_connect=False)
  396. httpretty.reset()
  397. hr_core.POTENTIAL_HTTP_PORTS.add(8081) # FIXME static dependency on settings variable
  398. for request in [
  399. cls.request_pdns_zone_create(ns='LORD'),
  400. cls.request_pdns_zone_create(ns='MASTER'),
  401. cls.request_pdns_zone_axfr(),
  402. cls.request_pdns_zone_update(),
  403. cls.request_pdns_zone_retrieve_crypto_keys(),
  404. cls.request_pdns_zone_retrieve()
  405. ]:
  406. httpretty.register_uri(**request)
  407. cls.setUpTestDataWithPdns()
  408. httpretty.reset()
  409. @classmethod
  410. def setUpTestDataWithPdns(cls):
  411. """
  412. Override this method to set up test data. During the run of this method, httpretty is configured to accept
  413. all pdns API requests.
  414. """
  415. pass
  416. @classmethod
  417. def tearDownClass(cls):
  418. super().tearDownClass()
  419. httpretty.disable()
  420. def setUp(self):
  421. def request_callback(r, _, response_headers):
  422. return [
  423. 599,
  424. response_headers,
  425. json.dumps(
  426. {
  427. 'MockPDNSTestCase': 'This response was generated upon an unexpected request.',
  428. 'request': str(r),
  429. 'method': str(r.method),
  430. 'requestline': str(r.raw_requestline),
  431. 'host': str(r.headers['Host']) if 'Host' in r.headers else None,
  432. 'headers': {str(key): str(value) for key, value in r.headers.items()},
  433. },
  434. indent=4
  435. )
  436. ]
  437. super().setUp()
  438. httpretty.reset()
  439. hr_core.POTENTIAL_HTTP_PORTS.add(8081) # FIXME should depend on self.expected_requests
  440. for method in [
  441. httpretty.GET, httpretty.PUT, httpretty.POST, httpretty.DELETE, httpretty.HEAD, httpretty.PATCH,
  442. httpretty.OPTIONS, httpretty.CONNECT
  443. ]:
  444. for ns in ['LORD', 'MASTER']:
  445. httpretty.register_uri(
  446. method,
  447. self.get_full_pdns_url('.*', ns),
  448. body=request_callback,
  449. status=599,
  450. priority=-100,
  451. )
  452. class DesecTestCase(MockPDNSTestCase):
  453. """
  454. This test case is run in the "standard" deSEC e.V. setting, i.e. with an API that is aware of the public suffix
  455. domains AUTO_DELEGATION_DOMAINS.
  456. The test case aims to be as close to the deployment as possible and may be extended as the deployment evolves.
  457. The test case provides an admin user and a regular user for testing.
  458. """
  459. client_class = DesecAPIClient
  460. AUTO_DELEGATION_DOMAINS = settings.LOCAL_PUBLIC_SUFFIXES
  461. PUBLIC_SUFFIXES = {'de', 'com', 'io', 'gov.cd', 'edu.ec', 'xxx', 'pinb.gov.pl', 'valer.ostfold.no',
  462. 'kota.aichi.jp', 's3.amazonaws.com', 'wildcard.ck'}
  463. @classmethod
  464. def reverse(cls, view_name, **kwargs):
  465. return reverse(view_name, kwargs=kwargs)
  466. @classmethod
  467. def setUpTestDataWithPdns(cls):
  468. super().setUpTestDataWithPdns()
  469. random.seed(0xde5ec)
  470. cls.admin = cls.create_user(is_admin=True)
  471. cls.add_domains = [cls.create_domain(name=name) for name in cls.AUTO_DELEGATION_DOMAINS]
  472. cls.user = cls.create_user()
  473. @classmethod
  474. def random_string(cls, length=6, chars=string.ascii_letters + string.digits):
  475. return ''.join(random.choice(chars) for _ in range(length))
  476. @classmethod
  477. def random_password(cls, length=12):
  478. return cls.random_string(
  479. length,
  480. chars=string.ascii_letters + string.digits + string.punctuation +
  481. 'some 💩🐬 UTF-8: “红色联合”对“四·二八兵团”总部大楼的攻击已持续了两天"'
  482. )
  483. @classmethod
  484. def random_ip(cls, proto=None):
  485. proto = proto or random.choice([4, 6])
  486. if proto == 4:
  487. return '.'.join([str(random.randrange(256)) for _ in range(4)])
  488. elif proto == 6:
  489. return '2001:' + ':'.join(['%x' % random.randrange(16**4) for _ in range(7)])
  490. else:
  491. raise ValueError('Unknown IP protocol version %s. Expected int 4 or int 6.' % str(proto))
  492. @classmethod
  493. def random_username(cls, host=None):
  494. host = host or cls.random_domain_name(cls.PUBLIC_SUFFIXES)
  495. return cls.random_string() + '+test@' + host.lower()
  496. @classmethod
  497. def random_domain_name(cls, suffix=None):
  498. if not suffix:
  499. suffix = cls.PUBLIC_SUFFIXES
  500. if isinstance(suffix, set):
  501. suffix = random.sample(suffix, 1)[0]
  502. return (random.choice(string.ascii_letters) + cls.random_string() + '--test' + '.' + suffix).lower()
  503. @classmethod
  504. def create_token(cls, user):
  505. token = Token.objects.create(user=user)
  506. token.save()
  507. return token
  508. @classmethod
  509. def create_user(cls, **kwargs):
  510. kwargs.setdefault('email', cls.random_username())
  511. user = User(**kwargs)
  512. user.plain_password = cls.random_string(length=12)
  513. user.set_password(user.plain_password)
  514. user.save()
  515. return user
  516. @classmethod
  517. def create_domain(cls, suffix=None, **kwargs):
  518. kwargs.setdefault('owner', cls.create_user())
  519. kwargs.setdefault('name', cls.random_domain_name(suffix))
  520. domain = Domain(**kwargs)
  521. domain.save()
  522. return domain
  523. @classmethod
  524. def create_rr_set(cls, domain, records, **kwargs):
  525. if isinstance(domain, str):
  526. domain = Domain.objects.get_or_create(name=domain)
  527. domain.save()
  528. rr_set = RRset(domain=domain, **kwargs)
  529. rr_set.save()
  530. for r in records:
  531. RR(content=r, rrset=rr_set).save()
  532. return rr_set
  533. @classmethod
  534. def _find_auto_delegation_zone(cls, name):
  535. if not name:
  536. return None
  537. parents = [parent for parent in cls.AUTO_DELEGATION_DOMAINS if name.endswith('.' + parent)]
  538. if not parents:
  539. raise ValueError('Could not find auto delegation zone for zone %s; searched in %s' % (
  540. name,
  541. cls.AUTO_DELEGATION_DOMAINS
  542. ))
  543. return parents[0]
  544. @classmethod
  545. def requests_desec_domain_creation(cls, name=None):
  546. return [
  547. cls.request_pdns_zone_create(ns='LORD'),
  548. cls.request_pdns_zone_create(ns='MASTER'),
  549. cls.request_pdns_zone_axfr(name=name),
  550. cls.request_pdns_zone_retrieve(name=name),
  551. cls.request_pdns_zone_retrieve_crypto_keys(name=name),
  552. ]
  553. @classmethod
  554. def requests_desec_domain_deletion(cls, name=None):
  555. return [
  556. cls.request_pdns_zone_delete(name=name, ns='LORD'),
  557. cls.request_pdns_zone_delete(name=name, ns='MASTER'),
  558. ]
  559. @classmethod
  560. def requests_desec_domain_creation_auto_delegation(cls, name=None):
  561. delegate_at = cls._find_auto_delegation_zone(name)
  562. return cls.requests_desec_domain_creation(name=name) + [
  563. cls.request_pdns_zone_update(name=delegate_at),
  564. cls.request_pdns_zone_axfr(name=delegate_at),
  565. cls.request_pdns_zone_retrieve_crypto_keys(name=name),
  566. ]
  567. @classmethod
  568. def requests_desec_domain_deletion_auto_delegation(cls, name=None):
  569. delegate_at = cls._find_auto_delegation_zone(name)
  570. return [
  571. cls.request_pdns_zone_update(name=delegate_at),
  572. cls.request_pdns_zone_axfr(name=delegate_at),
  573. cls.request_pdns_zone_delete(name=name, ns='LORD'),
  574. cls.request_pdns_zone_delete(name=name, ns='MASTER'),
  575. ]
  576. @classmethod
  577. def requests_desec_rr_sets_update(cls, name=None):
  578. return [
  579. cls.request_pdns_zone_update(name=name),
  580. cls.request_pdns_zone_axfr(name=name),
  581. ]
  582. class DomainOwnerTestCase(DesecTestCase):
  583. """
  584. This test case creates a domain owner, some domains for her and some domains that are owned by other users.
  585. DomainOwnerTestCase.client is authenticated with the owner's token.
  586. """
  587. DYN = False
  588. NUM_OWNED_DOMAINS = 2
  589. NUM_OTHER_DOMAINS = 20
  590. owner = None
  591. my_domains = None
  592. other_domains = None
  593. my_domain = None
  594. other_domain = None
  595. token = None
  596. def _mock_get_public_suffix(self, domain_name, public_suffixes=None):
  597. if public_suffixes is None:
  598. public_suffixes = settings.LOCAL_PUBLIC_SUFFIXES | self.PUBLIC_SUFFIXES
  599. # Poor man's PSL interpreter. First, find all known suffixes covering the domain.
  600. suffixes = [suffix for suffix in public_suffixes
  601. if '.{}'.format(domain_name).endswith('.{}'.format(suffix))]
  602. # Also, consider TLD.
  603. suffixes += [domain_name.rsplit('.')[-1]]
  604. # Select the candidate with the most labels.
  605. return max(suffixes, key=lambda suffix: suffix.count('.'))
  606. @staticmethod
  607. def _mock_is_public_suffix(name):
  608. return name == DomainList.psl.get_public_suffix(name)
  609. def get_psl_context_manager(self, side_effect_parameter):
  610. if side_effect_parameter is None:
  611. return nullcontext()
  612. if callable(side_effect_parameter):
  613. side_effect = side_effect_parameter
  614. else:
  615. side_effect = partial(self._mock_get_public_suffix, public_suffixes=[side_effect_parameter])
  616. return mock.patch.object(DomainList.psl, 'get_public_suffix', side_effect=side_effect)
  617. def setUpMockPatch(self):
  618. mock.patch.object(DomainList.psl, 'get_public_suffix', side_effect=self._mock_get_public_suffix).start()
  619. mock.patch.object(DomainList.psl, 'is_public_suffix', side_effect=self._mock_is_public_suffix).start()
  620. self.addCleanup(mock.patch.stopall)
  621. @classmethod
  622. def setUpTestDataWithPdns(cls):
  623. super().setUpTestDataWithPdns()
  624. cls.owner = cls.create_user(dyn=cls.DYN)
  625. cls.my_domains = [
  626. cls.create_domain(suffix=cls.AUTO_DELEGATION_DOMAINS if cls.DYN else None, owner=cls.owner)
  627. for _ in range(cls.NUM_OWNED_DOMAINS)
  628. ]
  629. cls.other_domains = [
  630. cls.create_domain(suffix=cls.AUTO_DELEGATION_DOMAINS if cls.DYN else None)
  631. for _ in range(cls.NUM_OTHER_DOMAINS)
  632. ]
  633. cls.my_domain = cls.my_domains[0]
  634. cls.other_domain = cls.other_domains[0]
  635. cls.create_rr_set(cls.my_domain, ['127.0.0.1', '127.0.1.1'], type='A', ttl=123)
  636. cls.create_rr_set(cls.other_domain, ['40.1.1.1', '40.2.2.2'], type='A', ttl=456)
  637. cls.token = cls.create_token(user=cls.owner)
  638. def setUp(self):
  639. super().setUp()
  640. self.client.credentials(HTTP_AUTHORIZATION='Token ' + self.token.key)
  641. self.setUpMockPatch()
  642. class LockedDomainOwnerTestCase(DomainOwnerTestCase):
  643. @classmethod
  644. def setUpTestData(cls):
  645. super().setUpTestData()
  646. cls.owner.locked = timezone.now()
  647. cls.owner.save()
  648. class DynDomainOwnerTestCase(DomainOwnerTestCase):
  649. DYN = True
  650. @classmethod
  651. def request_pdns_zone_axfr(cls, name=None):
  652. return super().request_pdns_zone_axfr(name.lower() if name else None)
  653. @classmethod
  654. def request_pdns_zone_update(cls, name=None):
  655. return super().request_pdns_zone_update(name.lower() if name else None)
  656. def _assertDynDNS12Update(self, requests, mock_remote_addr='', **kwargs):
  657. with self.assertPdnsRequests(requests):
  658. if mock_remote_addr:
  659. return self.client.get(self.reverse('v1:dyndns12update'), kwargs, REMOTE_ADDR=mock_remote_addr)
  660. else:
  661. return self.client.get(self.reverse('v1:dyndns12update'), kwargs)
  662. def assertDynDNS12Update(self, domain_name=None, mock_remote_addr='', **kwargs):
  663. pdns_name = self._normalize_name(domain_name).lower() if domain_name else None
  664. return self._assertDynDNS12Update(
  665. [self.request_pdns_zone_update(name=pdns_name), self.request_pdns_zone_axfr(name=pdns_name)],
  666. mock_remote_addr,
  667. **kwargs
  668. )
  669. def assertDynDNS12NoUpdate(self, mock_remote_addr='', **kwargs):
  670. return self._assertDynDNS12Update([], mock_remote_addr, **kwargs)
  671. def setUp(self):
  672. super().setUp()
  673. self.client_token_authorized = self.client_class()
  674. self.client.set_credentials_basic_auth(self.my_domain.name.lower(), self.token.key)
  675. self.client_token_authorized.set_credentials_token_auth(self.token.key)
  676. class AuthenticatedRRSetBaseTestCase(DomainOwnerTestCase):
  677. DEAD_TYPES = ['ALIAS', 'DNAME']
  678. RESTRICTED_TYPES = ['SOA', 'RRSIG', 'DNSKEY', 'NSEC3PARAM', 'OPT']
  679. # see https://doc.powerdns.com/md/types/
  680. PDNS_RR_TYPES = ['A', 'AAAA', 'AFSDB', 'ALIAS', 'CAA', 'CERT', 'CDNSKEY', 'CDS', 'CNAME', 'DNSKEY', 'DNAME', 'DS',
  681. 'HINFO', 'KEY', 'LOC', 'MX', 'NAPTR', 'NS', 'NSEC', 'NSEC3', 'NSEC3PARAM', 'OPENPGPKEY', 'PTR',
  682. 'RP', 'RRSIG', 'SOA', 'SPF', 'SSHFP', 'SRV', 'TKEY', 'TSIG', 'TLSA', 'SMIMEA', 'TXT', 'URI']
  683. ALLOWED_TYPES = ['A', 'AAAA', 'AFSDB', 'CAA', 'CERT', 'CDNSKEY', 'CDS', 'CNAME', 'DS', 'HINFO', 'KEY', 'LOC', 'MX',
  684. 'NAPTR', 'NS', 'NSEC', 'NSEC3', 'OPENPGPKEY', 'PTR', 'RP', 'SPF', 'SSHFP', 'SRV', 'TKEY', 'TSIG',
  685. 'TLSA', 'SMIMEA', 'TXT', 'URI']
  686. SUBNAMES = ['foo', 'bar.baz', 'q.w.e.r.t', '*', '*.foobar', '_', '-foo.test', '_bar']
  687. @classmethod
  688. def _test_rr_sets(cls, subname=None, type_=None, records=None, ttl=None):
  689. """
  690. Gives a list of example RR sets for testing.
  691. Args:
  692. subname: Filter by subname. None to allow any.
  693. type_: Filter by type. None to allow any.
  694. records: Filter by records. Must match exactly. None to allow any.
  695. ttl: Filter by ttl. None to allow any.
  696. Returns: Returns a list of tuples that represents example RR sets represented as 4-tuples consisting of
  697. subname, type_, records, ttl
  698. """
  699. # TODO add more examples of cls.ALLOWED_TYPES
  700. # NOTE The validity of the RRset contents it *not* verified. We currently leave this task to pdns.
  701. rr_sets = [
  702. ('', 'A', ['1.2.3.4'], 120),
  703. ('test', 'A', ['2.2.3.4'], 120),
  704. ('test', 'TXT', ['"foobar"'], 120),
  705. ] + [
  706. (subname_, 'TXT', ['"hey ho, let\'s go!"'], 134)
  707. for subname_ in cls.SUBNAMES
  708. ] + [
  709. (subname_, type_, ['10 mx1.example.com.'], 101)
  710. for subname_ in cls.SUBNAMES
  711. for type_ in ['MX', 'SPF']
  712. ] + [
  713. (subname_, 'A', ['1.2.3.4'], 187)
  714. for subname_ in cls.SUBNAMES
  715. ]
  716. if subname or type_ or records or ttl:
  717. rr_sets = [
  718. rr_set for rr_set in rr_sets
  719. if (
  720. (subname is None or subname == rr_set[0]) and
  721. (type_ is None or type_ == rr_set[1]) and
  722. (records is None or records == rr_set[2]) and
  723. (ttl is None or ttl == rr_set[3])
  724. )
  725. ]
  726. return rr_sets
  727. @classmethod
  728. def setUpTestDataWithPdns(cls):
  729. super().setUpTestDataWithPdns()
  730. # TODO this test does not cover "dyn" / auto delegation domains
  731. cls.my_empty_domain = cls.create_domain(suffix='', owner=cls.owner)
  732. cls.my_rr_set_domain = cls.create_domain(suffix='', owner=cls.owner)
  733. cls.other_rr_set_domain = cls.create_domain(suffix='')
  734. for domain in [cls.my_rr_set_domain, cls.other_rr_set_domain]:
  735. for (subname, type_, records, ttl) in cls._test_rr_sets():
  736. cls.create_rr_set(domain, subname=subname, type=type_, records=records, ttl=ttl)
  737. def assertRRSet(self, response_rr, domain=None, subname=None, records=None, type_=None, **kwargs):
  738. kwargs['domain'] = domain
  739. kwargs['subname'] = subname
  740. kwargs['records'] = records
  741. kwargs['type'] = type_
  742. for key, value in kwargs.items():
  743. if value is not None:
  744. self.assertEqual(
  745. response_rr[key], value,
  746. 'RR set did not have the expected %s: Expected "%s" but was "%s" in %s' % (
  747. key, value, response_rr[key], response_rr
  748. )
  749. )
  750. @staticmethod
  751. def _count_occurrences_by_mask(rr_sets, masks):
  752. def _cmp(key, a, b):
  753. if key == 'records':
  754. a = sorted(a)
  755. b = sorted(b)
  756. return a == b
  757. def _filter_rr_sets_by_mask(rr_sets_, mask):
  758. return [rr_set for rr_set in rr_sets_
  759. if reduce(operator.and_, [_cmp(key, rr_set.get(key, None), value) for key, value in mask.items()])
  760. ]
  761. return [len(_filter_rr_sets_by_mask(rr_sets, mask)) for mask in masks]
  762. def assertRRSetsCount(self, rr_sets, masks, count=1):
  763. actual_counts = self._count_occurrences_by_mask(rr_sets, masks)
  764. if not all([actual_count == count for actual_count in actual_counts]):
  765. self.fail('Expected to find %i RR set(s) for each of %s, but distribution is %s in %s.' % (
  766. count, masks, actual_counts, rr_sets
  767. ))
  768. def assertContainsRRSets(self, rr_sets_haystack, rr_sets_needle):
  769. if not all(self._count_occurrences_by_mask(rr_sets_haystack, rr_sets_needle)):
  770. self.fail('Expected to find RR sets with %s, but only got %s.' % (
  771. rr_sets_needle, rr_sets_haystack
  772. ))