base.py 51 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480
  1. import base64
  2. import operator
  3. import random
  4. import re
  5. import string
  6. from contextlib import nullcontext
  7. from functools import partial, reduce
  8. from json import JSONDecodeError
  9. from typing import Union, List, Dict, Set
  10. from unittest import mock
  11. from django.conf import settings
  12. from django.contrib.auth.hashers import check_password
  13. from django.core import mail
  14. from django.db import connection
  15. from httpretty import httpretty, core as hr_core
  16. from rest_framework import status
  17. from rest_framework.reverse import reverse
  18. from rest_framework.test import APITestCase, APIClient
  19. from rest_framework.utils import json
  20. from desecapi.models import User, Domain, Token, RRset, RR
  21. from desecapi.models.domains import psl
  22. from desecapi.models.records import (
  23. RR_SET_TYPES_AUTOMATIC,
  24. RR_SET_TYPES_UNSUPPORTED,
  25. RR_SET_TYPES_MANAGEABLE,
  26. )
  27. class DesecAPIClient(APIClient):
  28. @staticmethod
  29. def _http_header_base64_conversion(content):
  30. return base64.b64encode(content.encode()).decode()
  31. def set_credentials(self, authorization):
  32. self.credentials(HTTP_AUTHORIZATION=authorization)
  33. def set_credentials_basic_auth(self, part_1, part_2=None):
  34. if not part_1 and not part_2:
  35. self.set_credentials("")
  36. else:
  37. s = part_1 if not part_2 else "%s:%s" % (part_1, part_2)
  38. self.set_credentials("Basic " + self._http_header_base64_conversion(s))
  39. def set_credentials_token_auth(self, token):
  40. if token is None:
  41. self.set_credentials("")
  42. else:
  43. self.set_credentials("Token " + token)
  44. def __init__(self, *args, **kwargs):
  45. super().__init__(*args, **kwargs)
  46. self.reverse = DesecTestCase.reverse
  47. def bulk_patch_rr_sets(self, domain_name, payload):
  48. return self.patch(
  49. self.reverse("v1:rrsets", name=domain_name),
  50. payload,
  51. )
  52. def bulk_post_rr_sets(self, domain_name, payload):
  53. return self.post(
  54. self.reverse("v1:rrsets", name=domain_name),
  55. payload,
  56. )
  57. def bulk_put_rr_sets(self, domain_name, payload):
  58. return self.put(
  59. self.reverse("v1:rrsets", name=domain_name),
  60. payload,
  61. )
  62. def post_rr_set(self, domain_name, **kwargs):
  63. data = kwargs or None
  64. if data:
  65. data.setdefault("ttl", 60)
  66. return self.post(
  67. self.reverse("v1:rrsets", name=domain_name),
  68. data=data,
  69. )
  70. def get_rr_sets(self, domain_name, **kwargs):
  71. return self.get(
  72. self.reverse("v1:rrsets", name=domain_name) + kwargs.pop("query", ""),
  73. kwargs,
  74. )
  75. def get_rr_set(self, domain_name, subname, type_):
  76. return self.get(
  77. self.reverse("v1:rrset@", name=domain_name, subname=subname, type=type_)
  78. )
  79. def put_rr_set(self, domain_name, subname, type_, data):
  80. return self.put(
  81. self.reverse("v1:rrset@", name=domain_name, subname=subname, type=type_),
  82. data,
  83. )
  84. def patch_rr_set(self, domain_name, subname, type_, data):
  85. return self.patch(
  86. self.reverse("v1:rrset@", name=domain_name, subname=subname, type=type_),
  87. data,
  88. )
  89. def delete_rr_set(self, domain_name, subname, type_):
  90. return self.delete(
  91. self.reverse("v1:rrset@", name=domain_name, subname=subname, type=type_)
  92. )
  93. # TODO add and use {post,get,delete,...}_domain
  94. class SQLiteReadUncommitted:
  95. def __init__(self):
  96. self.read_uncommitted = None
  97. def __enter__(self):
  98. with connection.cursor() as cursor:
  99. cursor.execute("PRAGMA read_uncommitted;")
  100. self.read_uncommitted = True if cursor.fetchone()[0] else False
  101. cursor.execute("PRAGMA read_uncommitted = true;")
  102. def __exit__(self, exc_type, exc_val, exc_tb):
  103. if self.read_uncommitted is None:
  104. return
  105. with connection.cursor() as cursor:
  106. if self.read_uncommitted:
  107. cursor.execute("PRAGMA read_uncommitted = true;")
  108. else:
  109. cursor.execute("PRAGMA read_uncommitted = false;")
  110. class AssertRequestsContextManager:
  111. """
  112. Checks that in its context, certain expected requests are made.
  113. """
  114. @classmethod
  115. def _flatten_nested_lists(cls, l):
  116. for i in l:
  117. if isinstance(i, list) or isinstance(i, tuple):
  118. yield from cls._flatten_nested_lists(i)
  119. else:
  120. yield i
  121. def __init__(
  122. self,
  123. test_case,
  124. expected_requests,
  125. single_expectation_single_request=True,
  126. expect_order=True,
  127. exit_hook=None,
  128. ):
  129. """
  130. Initialize a context that checks for made HTTP requests.
  131. Args:
  132. test_case: Test case in which this context lives. Used to fail test if observed requests do not meet
  133. expectations.
  134. expected_requests: (Possibly nested) list of requests, represented by kwarg-dictionaries for
  135. `httpretty.register_uri`.
  136. single_expectation_single_request: If True (default), each expected request needs to be matched by exactly
  137. one observed request.
  138. expect_order: If True (default), requests made are expected in order of expectations given.
  139. """
  140. self.test_case = test_case
  141. self.expected_requests = list(self._flatten_nested_lists(expected_requests))
  142. self.single_expectation_single_request = single_expectation_single_request
  143. self.expect_order = expect_order
  144. self.old_httpretty_entries = None
  145. self.exit_hook = exit_hook
  146. def __enter__(self):
  147. hr_core.POTENTIAL_HTTP_PORTS.add(
  148. 8081
  149. ) # FIXME should depend on self.expected_requests
  150. # noinspection PyProtectedMember
  151. self.old_httpretty_entries = (
  152. httpretty._entries.copy()
  153. ) # FIXME accessing private properties of httpretty
  154. for request in self.expected_requests:
  155. httpretty.register_uri(**request)
  156. @staticmethod
  157. def _find_matching_request(pattern, requests):
  158. for request in requests:
  159. if pattern["method"] == request[0] and pattern["uri"].match(request[1]):
  160. if pattern.get("payload") and pattern["payload"] not in request[2]:
  161. continue
  162. return request
  163. return None
  164. def __exit__(self, exc_type, exc_val, exc_tb):
  165. # call exit hook
  166. if callable(self.exit_hook):
  167. self.exit_hook()
  168. # organize seen requests in a primitive data structure
  169. seen_requests = [
  170. (r.command, "http://%s%s" % (r.headers["Host"], r.path), r.parsed_body)
  171. for r in httpretty.latest_requests
  172. ]
  173. httpretty.reset()
  174. hr_core.POTENTIAL_HTTP_PORTS.add(
  175. 8081
  176. ) # FIXME should depend on self.expected_requests
  177. httpretty._entries = self.old_httpretty_entries
  178. unmatched_requests = seen_requests[:]
  179. # go through expected requests one by one
  180. requests_to_check = list(self.expected_requests)[:]
  181. while requests_to_check:
  182. request = requests_to_check.pop(0)
  183. # match request
  184. match = None
  185. if self.expect_order:
  186. if not self.single_expectation_single_request:
  187. raise ValueError(
  188. "Checking of multiple (possibly zero) requests per expectation and checking of request "
  189. "order simultaneously is not implemented, sorry."
  190. )
  191. if unmatched_requests:
  192. match = self._find_matching_request(
  193. request, [unmatched_requests[0]]
  194. )
  195. else:
  196. match = self._find_matching_request(
  197. request,
  198. unmatched_requests
  199. if self.single_expectation_single_request
  200. else seen_requests,
  201. )
  202. # check match
  203. if not match and self.single_expectation_single_request:
  204. self.test_case.fail(
  205. (
  206. "Expected to see a %s request on\n\n%s,\n\nbut only saw these %i "
  207. "requests:\n\n%s\n\nAll expected requests:\n\n%s\n\n"
  208. "Hint: check for possible duplicates in your expectation.\n"
  209. + (
  210. "Hint: Is the expectation order correct?"
  211. if self.expect_order
  212. else ""
  213. )
  214. )
  215. % (
  216. request["method"],
  217. request["uri"],
  218. len(seen_requests),
  219. "\n".join(map(str, seen_requests)),
  220. "\n".join(
  221. map(
  222. str,
  223. [
  224. (r["method"], r["uri"])
  225. for r in self.expected_requests
  226. ],
  227. )
  228. ),
  229. )
  230. )
  231. if match:
  232. unmatched_requests.remove(match)
  233. # see if any requests were unexpected
  234. if unmatched_requests and self.single_expectation_single_request:
  235. self.test_case.fail(
  236. "While waiting for %i request(s), we saw %i unexpected request(s). The unexpected "
  237. "request(s) was/were:\n\n%s\n\nAll recorded requests:\n\n%s\n\nAll expected requests:"
  238. "\n\n%s"
  239. % (
  240. len(self.expected_requests),
  241. len(unmatched_requests),
  242. "\n".join(map(str, unmatched_requests)),
  243. "\n".join(map(str, seen_requests)),
  244. "\n".join(
  245. map(
  246. str,
  247. [(r["method"], r["uri"]) for r in self.expected_requests],
  248. )
  249. ),
  250. )
  251. )
  252. class MockPDNSTestCase(APITestCase):
  253. """
  254. This test case provides a "mocked Internet" environment with a mock pdns API interface. All internet connections,
  255. HTTP or otherwise, that this test case is unaware of will result in an exception.
  256. By default, requests are intercepted but unexpected will result in a failed test. To set pdns API request
  257. expectations, use the `with MockPDNSTestCase.assertPdns*` context managers.
  258. Subclasses may not touch httpretty.enable() or httpretty.disable(). For 'local' usage, httpretty.register_uri()
  259. and httpretty.reset() may be used.
  260. """
  261. PDNS_ZONES = r"/zones\?rrsets=false"
  262. PDNS_ZONE_CRYPTO_KEYS = r"/zones/(?P<id>[^/]+)/cryptokeys"
  263. PDNS_ZONE = r"/zones/(?P<id>[^/]+)"
  264. PDNS_ZONE_AXFR = r"/zones/(?P<id>[^/]+)/axfr-retrieve"
  265. PDNS_ZONE_EXPORT = r"/zones/(?P<id>[^/]+)/export"
  266. PCH_ZONE_CREATE = r"/zones"
  267. PCH_ZONE_DELETE = r"/zones"
  268. @classmethod
  269. def get_full_pdns_url(cls, path_regex, ns="LORD", **kwargs):
  270. api = getattr(settings, "NS%s_PDNS_API" % ns)
  271. return re.compile("^" + api + cls.fill_regex_groups(path_regex, **kwargs) + "$")
  272. @classmethod
  273. def fill_regex_groups(cls, template, **kwargs):
  274. s = template
  275. for name, value in kwargs.items():
  276. if value is None:
  277. continue
  278. pattern = r"\(\?P\<%s\>[^\)]+\)" % name
  279. if not re.search(pattern, s):
  280. raise ValueError(
  281. "Tried to fill field %s in template %s, but it does not exist."
  282. % (name, template)
  283. )
  284. s = re.sub(
  285. pattern=pattern,
  286. repl=value,
  287. string=s,
  288. )
  289. return s
  290. @classmethod
  291. def _pdns_zone_id_heuristic(cls, name):
  292. """
  293. Returns an educated guess of the pdns zone id for a given zone name.
  294. """
  295. if not name:
  296. return None
  297. name = cls._normalize_name(name)
  298. return name.translate(
  299. str.maketrans({"/": "=2F", "_": "=5F"})
  300. ) # make sure =5F is not lower-cased
  301. @classmethod
  302. def _normalize_name(cls, arg):
  303. if not arg:
  304. return None
  305. if not isinstance(arg, list):
  306. return cls._normalize_name([arg])[0]
  307. else:
  308. return [x.rstrip(".") + "." for x in arg]
  309. @classmethod
  310. def request_pdns_zone_create(cls, ns, **kwargs):
  311. return {
  312. "method": "POST",
  313. "uri": cls.get_full_pdns_url(cls.PDNS_ZONES, ns=ns),
  314. "status": 201,
  315. "body": "",
  316. "match_querystring": True,
  317. **kwargs,
  318. }
  319. def request_pdns_zone_create_assert_name(self, ns, name):
  320. def request_callback(r, _, response_headers):
  321. body = json.loads(r.parsed_body)
  322. self.failIf(
  323. "name" not in body.keys(),
  324. "pdns domain creation request malformed: did not contain a domain name.",
  325. )
  326. try: # if an assertion fails, an exception is raised. We want to send a reply anyway!
  327. self.assertEqual(
  328. name,
  329. body["name"],
  330. "Expected to see a domain creation request with name %s, "
  331. "but name %s was sent." % (name, body["name"]),
  332. )
  333. finally:
  334. return [201, response_headers, ""]
  335. request = self.request_pdns_zone_create(ns)
  336. request.pop("status")
  337. # noinspection PyTypeChecker
  338. request["body"] = request_callback
  339. return request
  340. @classmethod
  341. def request_pdns_zone_create_422(cls):
  342. request = cls.request_pdns_zone_create(ns="LORD")
  343. request["status"] = 422
  344. return request
  345. @classmethod
  346. def request_pdns_zone_delete(cls, name=None, ns="LORD"):
  347. return {
  348. "method": "DELETE",
  349. "uri": cls.get_full_pdns_url(
  350. cls.PDNS_ZONE, ns=ns, id=cls._pdns_zone_id_heuristic(name)
  351. ),
  352. "status": 200,
  353. "body": "",
  354. }
  355. @classmethod
  356. def request_pdns_zone_update(cls, name=None):
  357. return {
  358. "method": "PATCH",
  359. "uri": cls.get_full_pdns_url(
  360. cls.PDNS_ZONE, id=cls._pdns_zone_id_heuristic(name)
  361. ),
  362. "status": 200,
  363. "body": "",
  364. }
  365. def request_pdns_zone_update_assert_body(
  366. self, name: str = None, updated_rr_sets: Union[List[RRset], Dict] = None
  367. ):
  368. if updated_rr_sets is None:
  369. updated_rr_sets = []
  370. def request_callback(r, _, response_headers):
  371. if not updated_rr_sets:
  372. # nothing to assert
  373. return [200, response_headers, ""]
  374. body = json.loads(r.parsed_body)
  375. self.failIf(
  376. "rrsets" not in body.keys(),
  377. "pdns zone update request malformed: did not contain a list of RR sets.",
  378. )
  379. try: # if an assertion fails, an exception is raised. We want to send a reply anyway!
  380. with SQLiteReadUncommitted(): # tests are wrapped in uncommitted transactions, so we need to see inside
  381. # convert updated_rr_sets into a plain data type, if Django models were given
  382. if isinstance(updated_rr_sets, list):
  383. updated_rr_sets_dict = {}
  384. for rr_set in updated_rr_sets:
  385. updated_rr_sets_dict[
  386. (rr_set.type, rr_set.subname, rr_set.ttl)
  387. ] = rrs = []
  388. for rr in rr_set.records.all():
  389. rrs.append(rr.content)
  390. elif isinstance(updated_rr_sets, dict):
  391. updated_rr_sets_dict = updated_rr_sets
  392. else:
  393. raise ValueError(
  394. "updated_rr_sets must be a list of RRSets or a dict."
  395. )
  396. # check expectations
  397. self.assertEqual(
  398. len(updated_rr_sets_dict),
  399. len(body["rrsets"]),
  400. "Saw an unexpected number of RR set updates: expected %i, intercepted %i."
  401. % (len(updated_rr_sets_dict), len(body["rrsets"])),
  402. )
  403. for (
  404. exp_type,
  405. exp_subname,
  406. exp_ttl,
  407. ), exp_records in updated_rr_sets_dict.items():
  408. expected_name = (
  409. ".".join(filter(None, [exp_subname, name])) + "."
  410. )
  411. for seen_rr_set in body["rrsets"]:
  412. if (
  413. expected_name == seen_rr_set["name"]
  414. and exp_type == seen_rr_set["type"]
  415. ):
  416. # TODO replace the following asserts by assertTTL, assertRecords, ... or similar
  417. if len(exp_records):
  418. self.assertEqual(exp_ttl, seen_rr_set["ttl"])
  419. self.assertEqual(
  420. set(exp_records),
  421. set(
  422. [rr["content"] for rr in seen_rr_set["records"]]
  423. ),
  424. )
  425. break
  426. else:
  427. # we did not break out, i.e. we did not find a matching RR set in body['rrsets']
  428. self.fail(
  429. "Expected to see an pdns zone update request for RR set of domain `%s` with name "
  430. "`%s` and type `%s`, but did not see one. Seen update request on %s for RR sets:"
  431. "\n\n%s"
  432. % (
  433. name,
  434. expected_name,
  435. exp_type,
  436. request["uri"],
  437. json.dumps(body["rrsets"], indent=4),
  438. )
  439. )
  440. finally:
  441. return [200, response_headers, ""]
  442. request = self.request_pdns_zone_update(name)
  443. request.pop("status")
  444. # noinspection PyTypeChecker
  445. request["body"] = request_callback
  446. return request
  447. @classmethod
  448. def request_pdns_zone_retrieve(cls, name=None):
  449. return {
  450. "method": "GET",
  451. "uri": cls.get_full_pdns_url(
  452. cls.PDNS_ZONE, id=cls._pdns_zone_id_heuristic(name)
  453. ),
  454. "status": 200,
  455. "body": json.dumps(
  456. {
  457. "rrsets": [
  458. {
  459. "comments": [],
  460. "name": cls._normalize_name(name) if name else "test.mock.",
  461. "ttl": 60,
  462. "type": "NS",
  463. "records": [{"content": ns} for ns in settings.DEFAULT_NS],
  464. }
  465. ]
  466. }
  467. ),
  468. }
  469. @staticmethod
  470. def get_body_pdns_zone_retrieve_crypto_keys():
  471. common_body = {
  472. "algorithm": "ECDSAP256SHA256",
  473. "bits": 256,
  474. "dnskey": "257 3 13 EVBcsqrnOp6RGWtsrr9QW8cUtt/WI5C81RcCZDTGNI9elAiMQlxRdnic+7V+b7jJDE2vgY08qAbxiNh5NdzkzA==",
  475. "id": 179425943,
  476. "published": True,
  477. "type": "Cryptokey",
  478. }
  479. common_cds = [
  480. "62745 13 2 5cddaeaa383e2ea7de49bd1212bf520228f0e3b334626517e5f6a68eb85b48f6",
  481. "62745 13 4 b3f2565901ddcb0b78337301cf863d1045774377bca05c7ad69e17a167734b929f0a49b7edcca913eb6f5dfeac4645b8",
  482. ]
  483. return [
  484. {
  485. **common_body,
  486. "flags": 257,
  487. "keytype": "csk",
  488. "cds": common_cds,
  489. },
  490. {
  491. **common_body,
  492. "flags": 257,
  493. "keytype": "ksk",
  494. "cds": common_cds,
  495. },
  496. {
  497. **common_body,
  498. "flags": 256,
  499. "keytype": "zsk",
  500. },
  501. ]
  502. @classmethod
  503. def request_pdns_zone_retrieve_zone_export(cls, name=None):
  504. return {
  505. "method": "GET",
  506. "uri": cls.get_full_pdns_url(
  507. cls.PDNS_ZONE_EXPORT, id=cls._pdns_zone_id_heuristic(name)
  508. ),
  509. "status": 200,
  510. "body": "Zone export dummy!",
  511. }
  512. @classmethod
  513. def request_pdns_zone_retrieve_crypto_keys(cls, name=None):
  514. return {
  515. "method": "GET",
  516. "uri": cls.get_full_pdns_url(
  517. cls.PDNS_ZONE_CRYPTO_KEYS, id=cls._pdns_zone_id_heuristic(name)
  518. ),
  519. "status": 200,
  520. "body": json.dumps(cls.get_body_pdns_zone_retrieve_crypto_keys()),
  521. }
  522. @classmethod
  523. def request_pdns_zone_axfr(cls, name=None):
  524. return {
  525. "method": "PUT",
  526. "uri": cls.get_full_pdns_url(
  527. cls.PDNS_ZONE_AXFR, ns="MASTER", id=cls._pdns_zone_id_heuristic(name)
  528. ),
  529. "status": 200,
  530. "body": "",
  531. }
  532. @classmethod
  533. def request_pdns_update_catalog(cls):
  534. return {
  535. "method": "PATCH",
  536. "uri": cls.get_full_pdns_url(
  537. cls.PDNS_ZONE,
  538. ns="MASTER",
  539. id=cls._pdns_zone_id_heuristic("catalog.internal"),
  540. ),
  541. "status": 204,
  542. "body": "",
  543. "priority": 1, # avoid collision with DELETE zones/(?P<id>[^/]+)$ (httpretty does not match the method)
  544. }
  545. def request_pch_zone_create(self, name, **kwargs):
  546. def request_callback(r, _, response_headers):
  547. try:
  548. self.assertEqual(
  549. r.parsed_body,
  550. {"zones": [name]},
  551. f"Expected PCH zone creation request for {name}, but got '{r.parsed_body}'.",
  552. )
  553. finally:
  554. return [
  555. 201,
  556. response_headers,
  557. json.dumps(
  558. {
  559. "status": True,
  560. "message": "Zone(s) ADDED",
  561. "zones": [name],
  562. }
  563. ),
  564. ]
  565. return {
  566. "method": "POST",
  567. "uri": re.compile("^" + settings.PCH_API + self.PCH_ZONE_CREATE),
  568. "body": request_callback,
  569. **kwargs,
  570. }
  571. def request_pch_zone_delete(self, name, **kwargs):
  572. def request_callback(r, _, response_headers):
  573. try:
  574. self.assertEqual(
  575. r.parsed_body,
  576. {"zones": [name]},
  577. f"Expected PCH zone deletion request for {name}, but got '{r.parsed_body}'.",
  578. )
  579. finally:
  580. return [
  581. 200,
  582. response_headers,
  583. json.dumps(
  584. {
  585. "status": True,
  586. "message": "Zone(s) deleted",
  587. "zones": [name],
  588. }
  589. ),
  590. ]
  591. return {
  592. "method": "DELETE",
  593. "uri": re.compile("^" + settings.PCH_API + self.PCH_ZONE_DELETE),
  594. "body": request_callback,
  595. **kwargs,
  596. }
  597. def assertRequests(self, *expected_requests, expect_order=True, exit_hook=None):
  598. """
  599. Assert the given requests are made. To build requests, use the `MockPDNSTestCase.request_*` functions.
  600. Unmet expectations will fail the test.
  601. Args:
  602. *expected_requests: List of expected requests.
  603. expect_order: If True (default), the order of observed requests is checked.
  604. exit_hook: If given a callable, it is called when the context manager exits.
  605. """
  606. return AssertRequestsContextManager(
  607. test_case=self,
  608. expected_requests=expected_requests,
  609. expect_order=expect_order,
  610. exit_hook=exit_hook,
  611. )
  612. def assertNoRequestsBut(self, *expected_requests):
  613. """
  614. Assert no requests other than the given ones are made. Each request can be matched more than once, unmatched
  615. expectations WILL NOT fail the test.
  616. Args:
  617. *expected_requests: List of acceptable requests to be made.
  618. """
  619. return AssertRequestsContextManager(
  620. test_case=self,
  621. expected_requests=expected_requests,
  622. single_expectation_single_request=False,
  623. expect_order=False,
  624. )
  625. def assertZoneCreation(self, name):
  626. """
  627. Asserts that nslord, nsmaster and PCH are contacted for zone creation.
  628. Name is only asserted for requests to PCH.
  629. """
  630. return AssertRequestsContextManager(
  631. test_case=self,
  632. expected_requests=[
  633. self.request_pdns_zone_create(ns="LORD"),
  634. self.request_pdns_zone_create(ns="MASTER"),
  635. self.request_pch_zone_create(name=name),
  636. ],
  637. )
  638. def assertZoneDeletion(self, name):
  639. """
  640. Asserts that nslord, nsmaster and PCH are contacted for zone deletion.
  641. """
  642. return AssertRequestsContextManager(
  643. test_case=self,
  644. expected_requests=[
  645. self.request_pdns_zone_delete(ns="LORD", name=name),
  646. self.request_pdns_zone_delete(ns="MASTER", name=name),
  647. self.request_pch_zone_delete(name=name),
  648. ],
  649. )
  650. def assertStatus(self, response, status):
  651. if response.status_code != status:
  652. self.fail(
  653. (
  654. "Expected a response with status %i, but saw response with status %i. "
  655. + (
  656. "\n@@@@@ THE REQUEST CAUSING THIS RESPONSE WAS UNEXPECTED BY THE TEST @@@@@\n"
  657. if response.status_code == 599
  658. else ""
  659. )
  660. + "The response was %s.\n"
  661. "The response body was\n\n%s"
  662. )
  663. % (
  664. status,
  665. response.status_code,
  666. response,
  667. str(response.data).replace("\\n", "\n")
  668. if hasattr(response, "data")
  669. else "",
  670. )
  671. )
  672. def assertResponse(self, response, code=None, body=None):
  673. if code:
  674. self.assertStatus(response, code)
  675. if body:
  676. self.assertJSONEqual(response.content, body)
  677. def assertToken(self, plain, user=None):
  678. user = user or self.owner
  679. self.assertTrue(
  680. any(
  681. check_password(plain, hashed, preferred="pbkdf2_sha256_iter1")
  682. for hashed in Token.objects.filter(user=user).values_list(
  683. "key", flat=True
  684. )
  685. )
  686. )
  687. self.assertEqual(len(Token.make_hash(plain).split("$")), 4)
  688. @classmethod
  689. def setUpTestData(cls):
  690. httpretty.enable(allow_net_connect=False)
  691. httpretty.reset()
  692. hr_core.POTENTIAL_HTTP_PORTS.add(
  693. 8081
  694. ) # FIXME static dependency on settings variable
  695. for request in [
  696. # TODO delete not in this list - is this even needed?
  697. cls.request_pdns_zone_create(ns="LORD"),
  698. cls.request_pdns_zone_create(ns="MASTER"),
  699. cls.request_pdns_zone_axfr(),
  700. cls.request_pdns_zone_update(),
  701. cls.request_pdns_zone_retrieve_crypto_keys(),
  702. cls.request_pdns_zone_retrieve(),
  703. ]:
  704. httpretty.register_uri(**request)
  705. cls.setUpTestDataWithPdns()
  706. httpretty.reset()
  707. @classmethod
  708. def setUpTestDataWithPdns(cls):
  709. """
  710. Override this method to set up test data. During the run of this method, httpretty is configured to accept
  711. all pdns API requests.
  712. """
  713. pass
  714. @classmethod
  715. def tearDownClass(cls):
  716. super().tearDownClass()
  717. httpretty.disable()
  718. def setUp(self):
  719. def request_callback(r, _, response_headers):
  720. try:
  721. request = json.loads(r.parsed_body)
  722. except JSONDecodeError:
  723. request = r.parsed_body
  724. return [
  725. 599,
  726. response_headers,
  727. json.dumps(
  728. {
  729. "MockPDNSTestCase": "This response was generated upon an unexpected request.",
  730. "request": request,
  731. "method": str(r.method),
  732. "requestline": str(r.raw_requestline),
  733. "host": str(r.headers["Host"]) if "Host" in r.headers else None,
  734. "headers": {
  735. str(key): str(value) for key, value in r.headers.items()
  736. },
  737. },
  738. indent=4,
  739. ),
  740. ]
  741. super().setUp()
  742. httpretty.reset()
  743. hr_core.POTENTIAL_HTTP_PORTS.add(
  744. 8081
  745. ) # FIXME should depend on self.expected_requests
  746. for method in [
  747. httpretty.GET,
  748. httpretty.PUT,
  749. httpretty.POST,
  750. httpretty.DELETE,
  751. httpretty.HEAD,
  752. httpretty.PATCH,
  753. httpretty.OPTIONS,
  754. httpretty.CONNECT,
  755. ]:
  756. for ns in ["LORD", "MASTER"]:
  757. httpretty.register_uri(
  758. method,
  759. self.get_full_pdns_url(".*", ns),
  760. body=request_callback,
  761. status=599,
  762. priority=-100,
  763. )
  764. httpretty.register_uri(
  765. method,
  766. re.compile("^" + settings.PCH_API + ".*"),
  767. body=request_callback,
  768. status=599,
  769. priority=-100,
  770. )
  771. class DesecTestCase(MockPDNSTestCase):
  772. """
  773. This test case is run in the "standard" deSEC e.V. setting, i.e. with an API that is aware of the public suffix
  774. domains AUTO_DELEGATION_DOMAINS.
  775. The test case aims to be as close to the deployment as possible and may be extended as the deployment evolves.
  776. The test case provides an admin user and a regular user for testing.
  777. """
  778. client_class = DesecAPIClient
  779. AUTO_DELEGATION_DOMAINS = settings.LOCAL_PUBLIC_SUFFIXES
  780. PUBLIC_SUFFIXES = {
  781. "de",
  782. "com",
  783. "io",
  784. "gov.cd",
  785. "edu.ec",
  786. "xxx",
  787. "pinb.gov.pl",
  788. "valer.ostfold.no",
  789. "kota.aichi.jp",
  790. "s3.amazonaws.com",
  791. "wildcard.ck",
  792. }
  793. SUPPORTED_RR_SET_TYPES = {
  794. "A",
  795. "AAAA",
  796. "AFSDB",
  797. "APL",
  798. "CAA",
  799. "CDNSKEY",
  800. "CDS",
  801. "CERT",
  802. "CNAME",
  803. "CSYNC",
  804. "DHCID",
  805. "DNAME",
  806. "DNSKEY",
  807. "DLV",
  808. "DS",
  809. "EUI48",
  810. "EUI64",
  811. "HINFO",
  812. "HTTPS",
  813. "KX",
  814. "L32",
  815. "L64",
  816. "LOC",
  817. "LP",
  818. "MX",
  819. "NAPTR",
  820. "NID",
  821. "NS",
  822. "OPENPGPKEY",
  823. "PTR",
  824. "RP",
  825. "SMIMEA",
  826. "SPF",
  827. "SRV",
  828. "SSHFP",
  829. "SVCB",
  830. "TLSA",
  831. "TXT",
  832. "URI",
  833. }
  834. admin = None
  835. auto_delegation_domains = None
  836. user = None
  837. @classmethod
  838. def reverse(cls, view_name, **kwargs):
  839. return reverse(view_name, kwargs=kwargs)
  840. @classmethod
  841. def setUpTestDataWithPdns(cls):
  842. super().setUpTestDataWithPdns()
  843. random.seed(0xDE5EC)
  844. cls.admin = cls.create_user(is_admin=True)
  845. cls.auto_delegation_domains = [
  846. cls.create_domain(name=name) for name in cls.AUTO_DELEGATION_DOMAINS
  847. ]
  848. cls.user = cls.create_user()
  849. @classmethod
  850. def random_string(cls, length=6, chars=string.ascii_letters + string.digits):
  851. return "".join(random.choice(chars) for _ in range(length))
  852. @classmethod
  853. def random_password(cls, length=12):
  854. return cls.random_string(
  855. length,
  856. chars=string.ascii_letters
  857. + string.digits
  858. + string.punctuation
  859. + 'some 💩🐬 UTF-8: “红色联合”对“四·二八兵团”总部大楼的攻击已持续了两天"',
  860. )
  861. @classmethod
  862. def random_ip(cls, proto=None):
  863. proto = proto or random.choice([4, 6])
  864. if proto == 4:
  865. return ".".join([str(random.randrange(256)) for _ in range(4)])
  866. elif proto == 6:
  867. return "2001:" + ":".join(
  868. ["%x" % random.randrange(16**4) for _ in range(7)]
  869. )
  870. else:
  871. raise ValueError(
  872. "Unknown IP protocol version %s. Expected int 4 or int 6." % str(proto)
  873. )
  874. @classmethod
  875. def random_username(cls, host=None):
  876. host = host or cls.random_domain_name(cls.PUBLIC_SUFFIXES)
  877. return cls.random_string() + "+test@" + host.lower()
  878. @classmethod
  879. def random_domain_name(cls, suffix=None):
  880. if not suffix:
  881. suffix = cls.PUBLIC_SUFFIXES
  882. if isinstance(suffix, set):
  883. suffix = random.sample(suffix, 1)[0]
  884. return (
  885. random.choice(string.ascii_letters)
  886. + cls.random_string()
  887. + "--test"
  888. + "."
  889. + suffix
  890. ).lower()
  891. @classmethod
  892. def has_local_suffix(cls, domain_name: str):
  893. return any(
  894. [
  895. domain_name.endswith(f".{suffix}")
  896. for suffix in settings.LOCAL_PUBLIC_SUFFIXES
  897. ]
  898. )
  899. @classmethod
  900. def create_token(cls, user, **kwargs):
  901. token = Token.objects.create(user=user, **kwargs)
  902. token.save()
  903. return token
  904. @classmethod
  905. def create_user(cls, needs_captcha=False, **kwargs):
  906. kwargs.setdefault("email", cls.random_username())
  907. user = User(needs_captcha=needs_captcha, **kwargs)
  908. user.plain_password = cls.random_string(length=12)
  909. user.set_password(user.plain_password)
  910. user.save()
  911. return user
  912. @classmethod
  913. def create_domain(cls, suffix=None, **kwargs):
  914. kwargs.setdefault("owner", cls.create_user())
  915. kwargs.setdefault("name", cls.random_domain_name(suffix))
  916. domain = Domain(**kwargs)
  917. domain.save()
  918. return domain
  919. @classmethod
  920. def create_rr_set(cls, domain, records, **kwargs):
  921. if isinstance(domain, str):
  922. domain = Domain.objects.get(name=domain)
  923. domain.save()
  924. rr_set = RRset(domain=domain, **kwargs)
  925. rr_set.save()
  926. for r in records:
  927. RR(content=r, rrset=rr_set).save()
  928. return rr_set
  929. @classmethod
  930. def _find_auto_delegation_zone(cls, name):
  931. if not name:
  932. return None
  933. parents = [
  934. parent
  935. for parent in cls.AUTO_DELEGATION_DOMAINS
  936. if name.endswith("." + parent)
  937. ]
  938. if not parents:
  939. raise ValueError(
  940. "Could not find auto delegation zone for zone %s; searched in %s"
  941. % (name, cls.AUTO_DELEGATION_DOMAINS)
  942. )
  943. return parents[0]
  944. def requests_desec_domain_creation(self, name=None, axfr=True, keys=True):
  945. soa_content = "get.desec.io. get.desec.io. 1 86400 3600 2419200 3600"
  946. requests = [
  947. self.request_pdns_zone_create(ns="LORD", payload=soa_content),
  948. self.request_pdns_zone_create(ns="MASTER"),
  949. self.request_pdns_update_catalog(),
  950. self.request_pch_zone_create(name=name),
  951. ]
  952. if axfr:
  953. requests.append(self.request_pdns_zone_axfr(name=name))
  954. if keys:
  955. requests.append(self.request_pdns_zone_retrieve_crypto_keys(name=name))
  956. return requests
  957. def requests_desec_domain_deletion(self, domain):
  958. requests = [
  959. self.request_pdns_zone_delete(name=domain.name, ns="LORD"),
  960. self.request_pdns_zone_delete(name=domain.name, ns="MASTER"),
  961. self.request_pdns_update_catalog(),
  962. self.request_pch_zone_delete(name=domain.name),
  963. ]
  964. if domain.is_locally_registrable:
  965. delegate_at = self._find_auto_delegation_zone(domain.name)
  966. requests += [
  967. self.request_pdns_zone_update(name=delegate_at),
  968. self.request_pdns_zone_axfr(name=delegate_at),
  969. ]
  970. return requests
  971. def requests_desec_domain_creation_auto_delegation(self, name=None):
  972. delegate_at = self._find_auto_delegation_zone(name)
  973. return self.requests_desec_domain_creation(name=name) + [
  974. self.request_pdns_zone_update(name=delegate_at),
  975. self.request_pdns_zone_axfr(name=delegate_at),
  976. ]
  977. @classmethod
  978. def requests_desec_rr_sets_update(cls, name=None):
  979. return [
  980. cls.request_pdns_zone_update(name=name),
  981. cls.request_pdns_zone_axfr(name=name),
  982. ]
  983. def assertRRSet(
  984. self, response_rr, domain=None, subname=None, records=None, type_=None, **kwargs
  985. ):
  986. kwargs["domain"] = domain
  987. kwargs["subname"] = subname
  988. kwargs["records"] = records
  989. kwargs["type"] = type_
  990. for key, value in kwargs.items():
  991. if value is not None:
  992. self.assertEqual(
  993. response_rr[key],
  994. value,
  995. 'RR set did not have the expected %s: Expected "%s" but was "%s" in %s'
  996. % (key, value, response_rr[key], response_rr),
  997. )
  998. def assertRRsetDB(
  999. self,
  1000. domain: Domain,
  1001. subname: str,
  1002. type_: str,
  1003. ttl: int = None,
  1004. rr_contents: Set[str] = None,
  1005. ):
  1006. if rr_contents is not None:
  1007. try:
  1008. has_rr_contents = {
  1009. rr.content
  1010. for rr in domain.rrset_set.get(
  1011. subname=subname, type=type_
  1012. ).records.all()
  1013. }
  1014. except RRset.DoesNotExist:
  1015. has_rr_contents = set()
  1016. self.assertSetEqual(
  1017. has_rr_contents,
  1018. rr_contents,
  1019. f'{domain.name}: RRset for subname="{subname}" and type={type_} did not have the expected records '
  1020. f"{rr_contents}, but had {has_rr_contents}.",
  1021. )
  1022. if ttl is not None:
  1023. has_ttl = domain.rrset_set.get(subname=subname, type=type_).ttl
  1024. self.assertEqual(
  1025. has_ttl,
  1026. ttl,
  1027. f'{domain.name}: RRset for subname="{subname}" and type={type_} did not '
  1028. f"have the expected TTL of {ttl}, but had {has_ttl}.",
  1029. )
  1030. @staticmethod
  1031. def _count_occurrences_by_mask(rr_sets, masks):
  1032. def _cmp(key, a, b):
  1033. if key == "records":
  1034. a = sorted(a)
  1035. b = sorted(b)
  1036. return a == b
  1037. def _filter_rr_sets_by_mask(rr_sets_, mask):
  1038. return [
  1039. rr_set
  1040. for rr_set in rr_sets_
  1041. if reduce(
  1042. operator.and_,
  1043. [
  1044. _cmp(key, rr_set.get(key, None), value)
  1045. for key, value in mask.items()
  1046. ],
  1047. )
  1048. ]
  1049. return [len(_filter_rr_sets_by_mask(rr_sets, mask)) for mask in masks]
  1050. def assertRRSetsCount(self, rr_sets, masks, count=1):
  1051. actual_counts = self._count_occurrences_by_mask(rr_sets, masks)
  1052. if not all([actual_count == count for actual_count in actual_counts]):
  1053. self.fail(
  1054. "Expected to find %i RR set(s) for each of %s, but distribution is %s in %s."
  1055. % (count, masks, actual_counts, rr_sets)
  1056. )
  1057. def assertContainsRRSets(self, rr_sets_haystack, rr_sets_needle):
  1058. if not all(self._count_occurrences_by_mask(rr_sets_haystack, rr_sets_needle)):
  1059. self.fail(
  1060. "Expected to find RR sets with %s, but only got %s."
  1061. % (rr_sets_needle, rr_sets_haystack)
  1062. )
  1063. def assertContains(
  1064. self, response, text, count=None, status_code=200, msg_prefix="", html=False
  1065. ):
  1066. # convenience method to check the status separately, which yields nicer error messages
  1067. self.assertStatus(response, status_code)
  1068. # same for the substring check
  1069. self.assertIn(
  1070. text,
  1071. response.content.decode(response.charset),
  1072. f"Could not find {text} in the following response:\n{response.content.decode(response.charset)}",
  1073. )
  1074. return super().assertContains(
  1075. response, text, count, status_code, msg_prefix, html
  1076. )
  1077. def assertAllSupportedRRSetTypes(self, types):
  1078. self.assertEqual(
  1079. types,
  1080. self.SUPPORTED_RR_SET_TYPES,
  1081. "Either some RR types given are unsupported, or not all "
  1082. "supported RR types were in the given set.",
  1083. )
  1084. def assertEmailSent(
  1085. self,
  1086. subject_contains="",
  1087. body_contains=None,
  1088. recipient=None,
  1089. reset=True,
  1090. pattern=None,
  1091. ):
  1092. total = 1
  1093. self.assertEqual(
  1094. len(mail.outbox),
  1095. total,
  1096. "Expected %i message in the outbox, but found %i."
  1097. % (total, len(mail.outbox)),
  1098. )
  1099. email = mail.outbox[-1]
  1100. self.assertTrue(
  1101. subject_contains in email.subject,
  1102. "Expected '%s' in the email subject, but found '%s'"
  1103. % (subject_contains, email.subject),
  1104. )
  1105. if type(body_contains) != list:
  1106. body_contains = [] if body_contains is None else [body_contains]
  1107. for elem in body_contains:
  1108. self.assertTrue(
  1109. elem in email.body,
  1110. f"Expected '{elem}' in the email body, but found '{email.body}'",
  1111. )
  1112. if recipient is not None:
  1113. if isinstance(recipient, list):
  1114. self.assertListEqual(recipient, email.recipients())
  1115. else:
  1116. self.assertIn(recipient, email.recipients())
  1117. body = email.body
  1118. self.assertIn("user_id = ", body)
  1119. if reset:
  1120. mail.outbox = []
  1121. return body if not pattern else re.search(pattern, body).group(1)
  1122. def assertConfirmationLinkRedirect(self, confirmation_link):
  1123. response = self.client.get(confirmation_link)
  1124. self.assertResponse(response, status.HTTP_406_NOT_ACCEPTABLE)
  1125. response = self.client.get(confirmation_link, HTTP_ACCEPT="text/html")
  1126. self.assertResponse(response, status.HTTP_302_FOUND)
  1127. self.assertNoEmailSent()
  1128. def assertNoEmailSent(self):
  1129. self.assertFalse(
  1130. mail.outbox,
  1131. "Expected no email to be sent, but %i were sent. First subject line is '%s'."
  1132. % (len(mail.outbox), mail.outbox[0].subject if mail.outbox else "<n/a>"),
  1133. )
  1134. class PublicSuffixMockMixin:
  1135. def _mock_get_public_suffix(self, domain_name, public_suffixes=None):
  1136. if public_suffixes is None:
  1137. public_suffixes = settings.LOCAL_PUBLIC_SUFFIXES | self.PUBLIC_SUFFIXES
  1138. # Poor man's PSL interpreter. First, find all known suffixes covering the domain.
  1139. suffixes = [
  1140. suffix
  1141. for suffix in public_suffixes
  1142. if ".{}".format(domain_name).endswith(".{}".format(suffix))
  1143. ]
  1144. # Also, consider TLD.
  1145. suffixes += [domain_name.rsplit(".")[-1]]
  1146. # Select the candidate with the most labels.
  1147. return max(suffixes, key=lambda suffix: suffix.count("."))
  1148. @staticmethod
  1149. def _mock_is_public_suffix(name):
  1150. return name == psl.get_public_suffix(name)
  1151. def get_psl_context_manager(self, side_effect_parameter):
  1152. if side_effect_parameter is None:
  1153. return nullcontext()
  1154. if callable(side_effect_parameter):
  1155. side_effect = side_effect_parameter
  1156. else:
  1157. side_effect = partial(
  1158. self._mock_get_public_suffix,
  1159. public_suffixes=[side_effect_parameter]
  1160. if not isinstance(side_effect_parameter, list)
  1161. else list(side_effect_parameter),
  1162. )
  1163. return mock.patch.object(psl, "get_public_suffix", side_effect=side_effect)
  1164. def setUpMockPatch(self):
  1165. mock.patch.object(
  1166. psl, "get_public_suffix", side_effect=self._mock_get_public_suffix
  1167. ).start()
  1168. mock.patch.object(
  1169. psl, "is_public_suffix", side_effect=self._mock_is_public_suffix
  1170. ).start()
  1171. self.addCleanup(mock.patch.stopall)
  1172. class DomainOwnerTestCase(DesecTestCase, PublicSuffixMockMixin):
  1173. """
  1174. This test case creates a domain owner, some domains for her and some domains that are owned by other users.
  1175. DomainOwnerTestCase.client is authenticated with the owner's token.
  1176. """
  1177. DYN = False
  1178. NUM_OWNED_DOMAINS = 2
  1179. NUM_OTHER_DOMAINS = 20
  1180. owner = None
  1181. my_domains = None
  1182. other_domains = None
  1183. my_domain = None
  1184. other_domain = None
  1185. token = None
  1186. @classmethod
  1187. def setUpTestDataWithPdns(cls):
  1188. super().setUpTestDataWithPdns()
  1189. cls.owner = cls.create_user()
  1190. domain_kwargs = {"suffix": cls.AUTO_DELEGATION_DOMAINS if cls.DYN else None}
  1191. if cls.DYN:
  1192. domain_kwargs["minimum_ttl"] = 60
  1193. cls.my_domains = [
  1194. cls.create_domain(owner=cls.owner, **domain_kwargs)
  1195. for _ in range(cls.NUM_OWNED_DOMAINS)
  1196. ]
  1197. cls.other_domains = [
  1198. cls.create_domain(**domain_kwargs) for _ in range(cls.NUM_OTHER_DOMAINS)
  1199. ]
  1200. if cls.DYN:
  1201. for domain in cls.my_domains + cls.other_domains:
  1202. parent_domain = Domain.objects.get(name=domain.parent_domain_name)
  1203. parent_domain.update_delegation(domain)
  1204. cls.my_domain = cls.my_domains[0]
  1205. cls.other_domain = cls.other_domains[0]
  1206. cls.create_rr_set(cls.my_domain, ["127.0.0.1", "3.2.2.3"], type="A", ttl=123)
  1207. cls.create_rr_set(cls.other_domain, ["40.1.1.1"], type="A", ttl=456)
  1208. cls.token = cls.create_token(user=cls.owner)
  1209. def setUp(self):
  1210. super().setUp()
  1211. self.client.credentials(HTTP_AUTHORIZATION="Token " + self.token.plain)
  1212. self.setUpMockPatch()
  1213. class DynDomainOwnerTestCase(DomainOwnerTestCase):
  1214. DYN = True
  1215. @classmethod
  1216. def request_pdns_zone_axfr(cls, name=None):
  1217. return super().request_pdns_zone_axfr(name.lower() if name else None)
  1218. @classmethod
  1219. def request_pdns_zone_update(cls, name=None):
  1220. return super().request_pdns_zone_update(name.lower() if name else None)
  1221. def _assertDynDNS12Update(self, requests, mock_remote_addr="", **kwargs):
  1222. with self.assertRequests(requests):
  1223. if mock_remote_addr:
  1224. return self.client.get(
  1225. self.reverse("v1:dyndns12update"),
  1226. kwargs,
  1227. REMOTE_ADDR=mock_remote_addr,
  1228. )
  1229. else:
  1230. return self.client.get(self.reverse("v1:dyndns12update"), kwargs)
  1231. def assertDynDNS12Update(
  1232. self, domain_name=None, mock_remote_addr="", expect_update=True, **kwargs
  1233. ):
  1234. if expect_update:
  1235. pdns_name = (
  1236. self._normalize_name(domain_name).lower() if domain_name else None
  1237. )
  1238. requests = [
  1239. self.request_pdns_zone_update(name=pdns_name),
  1240. self.request_pdns_zone_axfr(name=pdns_name),
  1241. ]
  1242. else:
  1243. requests = []
  1244. return self._assertDynDNS12Update(requests, mock_remote_addr, **kwargs)
  1245. def assertDynDNS12NoUpdate(self, *args, **kwargs):
  1246. return self.assertDynDNS12Update(expect_update=False, *args, **kwargs)
  1247. def setUp(self):
  1248. super().setUp()
  1249. self.client_token_authorized = self.client_class()
  1250. self.client.set_credentials_basic_auth(
  1251. self.my_domain.name.lower(), self.token.plain
  1252. )
  1253. self.client_token_authorized.set_credentials_token_auth(self.token.plain)
  1254. class AuthenticatedRRSetBaseTestCase(DomainOwnerTestCase):
  1255. UNSUPPORTED_TYPES = RR_SET_TYPES_UNSUPPORTED
  1256. AUTOMATIC_TYPES = RR_SET_TYPES_AUTOMATIC
  1257. ALLOWED_TYPES = RR_SET_TYPES_MANAGEABLE
  1258. SUBNAMES = [
  1259. "foo",
  1260. "bar.baz",
  1261. "q.w.e.r.t",
  1262. "*",
  1263. "*.foobar",
  1264. "_",
  1265. "-foo.test",
  1266. "_bar",
  1267. ]
  1268. @classmethod
  1269. def _test_rr_sets(cls, subname=None, type_=None, records=None, ttl=None):
  1270. """
  1271. Gives a list of example RR sets for testing.
  1272. Args:
  1273. subname: Filter by subname. None to allow any.
  1274. type_: Filter by type. None to allow any.
  1275. records: Filter by records. Must match exactly. None to allow any.
  1276. ttl: Filter by ttl. None to allow any.
  1277. Returns: Returns a list of tuples that represents example RR sets represented as 4-tuples consisting of
  1278. subname, type_, records, ttl
  1279. """
  1280. # TODO add more examples of cls.ALLOWED_TYPES
  1281. # NOTE The validity of the RRset contents it *not* verified. We currently leave this task to pdns.
  1282. rr_sets = (
  1283. [
  1284. ("", "A", ["1.2.3.4"], 3620),
  1285. ("test", "A", ["2.2.3.4"], 3620),
  1286. ("test", "TXT", ['"foobar"'], 3620),
  1287. ]
  1288. + [
  1289. (subname_, "TXT", ['"hey ho, let\'s go!"'], 134)
  1290. for subname_ in cls.SUBNAMES
  1291. ]
  1292. + [
  1293. (subname_, type_, ["10 mx1.example.com."], 101)
  1294. for subname_ in cls.SUBNAMES
  1295. for type_ in ["MX", "SPF"]
  1296. ]
  1297. + [(subname_, "A", ["1.2.3.4"], 187) for subname_ in cls.SUBNAMES]
  1298. )
  1299. if subname or type_ or records or ttl:
  1300. rr_sets = [
  1301. rr_set
  1302. for rr_set in rr_sets
  1303. if (
  1304. (subname is None or subname == rr_set[0])
  1305. and (type_ is None or type_ == rr_set[1])
  1306. and (records is None or records == rr_set[2])
  1307. and (ttl is None or ttl == rr_set[3])
  1308. )
  1309. ]
  1310. return rr_sets
  1311. @classmethod
  1312. def setUpTestDataWithPdns(cls):
  1313. super().setUpTestDataWithPdns()
  1314. # TODO this test does not cover "dyn" / auto delegation domains
  1315. cls.my_empty_domain = cls.create_domain(suffix="", owner=cls.owner)
  1316. cls.my_rr_set_domain = cls.create_domain(suffix="", owner=cls.owner)
  1317. cls.other_rr_set_domain = cls.create_domain(suffix="")
  1318. for domain in [cls.my_rr_set_domain, cls.other_rr_set_domain]:
  1319. for subname, type_, records, ttl in cls._test_rr_sets():
  1320. cls.create_rr_set(
  1321. domain, subname=subname, type=type_, records=records, ttl=ttl
  1322. )