base.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674
  1. import base64
  2. import random
  3. import re
  4. import string
  5. from django.utils import timezone
  6. from httpretty import httpretty, core as hr_core
  7. from rest_framework.reverse import reverse
  8. from rest_framework.test import APITestCase, APIClient
  9. from rest_framework.utils import json
  10. from api import settings
  11. from desecapi.models import User, Domain, Token, RRset, RR
  12. class DesecAPIClient(APIClient):
  13. def __init__(self, *args, **kwargs):
  14. super().__init__(*args, **kwargs)
  15. self.reverse = DesecTestCase.reverse
  16. def post_rr_set(self, domain_name, **kwargs):
  17. kwargs.setdefault('subname', '')
  18. kwargs.setdefault('ttl', 60)
  19. return self.post(
  20. self.reverse('v1:rrsets', name=domain_name),
  21. kwargs,
  22. )
  23. def get_rr_sets(self, domain_name, **kwargs):
  24. # FIXME add 'v1:rrset@', there seems to be a bug with that
  25. return self.get(
  26. self.reverse('v1:rrsets', name=domain_name),
  27. kwargs
  28. )
  29. def get_rr_set(self, domain_name, subname, type_):
  30. # FIXME add 'v1:rrset@', there seems to be a bug with that
  31. return self.get(
  32. self.reverse('v1:rrset', name=domain_name, subname=subname, type=type_)
  33. )
  34. def put_rr_set(self, domain_name, subname, type_, **kwargs):
  35. # FIXME add 'v1:rrset@', there seems to be a bug with that
  36. return self.put(
  37. self.reverse('v1:rrset', name=domain_name, subname=subname, type=type_),
  38. kwargs
  39. )
  40. def patch_rr_set(self, domain_name, subname, type_, **kwargs):
  41. # FIXME add 'v1:rrset@', there seems to be a bug with that
  42. return self.patch(
  43. self.reverse('v1:rrset', name=domain_name, subname=subname, type=type_),
  44. kwargs
  45. )
  46. def delete_rr_set(self, domain_name, subname, type_):
  47. # FIXME add 'v1:rrset@', there seems to be a bug with that
  48. return self.delete(
  49. self.reverse('v1:rrset', name=domain_name, subname=subname, type=type_)
  50. )
  51. # TODO add and use {post,get,delete,...}_domain
  52. class AssertRequestsContextManager:
  53. """
  54. Checks that in its context, certain expected requests are made.
  55. """
  56. @classmethod
  57. def _flatten_nested_lists(cls, l):
  58. for i in l:
  59. if isinstance(i, list) or isinstance(i, tuple):
  60. yield from cls._flatten_nested_lists(i)
  61. else:
  62. yield i
  63. def __init__(self, test_case, expected_requests, single_expectation_single_request=True, expect_order=True):
  64. """
  65. Initialize a context that checks for made HTTP requests.
  66. Args:
  67. test_case: Test case in which this context lives. Used to fail test if observed requests do not meet
  68. expectations.
  69. expected_requests: (Possibly nested) list of requests, represented by kwarg-dictionaries for
  70. `httpretty.register_uri`.
  71. single_expectation_single_request: If True (default), each expected request needs to be matched by exactly
  72. one observed request.
  73. expect_order: If True (default), requests made are expected in order of expectations given.
  74. """
  75. self.test_case = test_case
  76. self.expected_requests = list(self._flatten_nested_lists(expected_requests))
  77. self.single_expectation_single_request = single_expectation_single_request
  78. self.expect_order = expect_order
  79. def __enter__(self):
  80. hr_core.POTENTIAL_HTTP_PORTS.add(8081) # FIXME should depend on self.expected_requests
  81. self.expected_requests = self.expected_requests
  82. for request in self.expected_requests:
  83. httpretty.register_uri(**request)
  84. @staticmethod
  85. def _find_matching_request(pattern, requests):
  86. for request in requests:
  87. if pattern['method'] == request[0] and pattern['uri'].match(request[1]):
  88. return request
  89. return None
  90. def __exit__(self, exc_type, exc_val, exc_tb):
  91. # organize seen requests in a primitive data structure
  92. seen_requests = [
  93. (r.command, 'http://%s%s' % (r.headers['Host'], r.path)) for r in httpretty.latest_requests
  94. ]
  95. httpretty.reset()
  96. unmatched_requests = seen_requests[:]
  97. # go through expected requests one by one
  98. requests_to_check = list(self.expected_requests)[:]
  99. while requests_to_check:
  100. request = requests_to_check.pop(0)
  101. # match request
  102. match = None
  103. if self.expect_order:
  104. if not self.single_expectation_single_request:
  105. raise ValueError(
  106. 'Checking of multiple (possibly zero) requests per expectation and checking of request '
  107. 'order simultaneously is not implemented, sorry.')
  108. if unmatched_requests:
  109. match = self._find_matching_request(request, [unmatched_requests[0]])
  110. else:
  111. match = self._find_matching_request(
  112. request, unmatched_requests if self.single_expectation_single_request else seen_requests)
  113. # check match
  114. if not match and self.single_expectation_single_request:
  115. self.test_case.fail(('Expected to see a %s request on\n\n%s,\n\nbut only saw these %i '
  116. 'requests:\n\n%s\n\nAll expected requests:\n\n%s\n\n'
  117. 'Hint: check for possible duplicates in your expectation.\n' +
  118. ('Hint: Is the expectation order correct?' if self.expect_order else '')) % (
  119. request['method'], request['uri'], len(seen_requests),
  120. '\n'.join(map(str, seen_requests)),
  121. '\n'.join(map(str, [(r['method'], r['uri']) for r in self.expected_requests])),
  122. ))
  123. if match:
  124. unmatched_requests.remove(match)
  125. # see if any requests were unexpected
  126. if unmatched_requests and self.single_expectation_single_request:
  127. self.test_case.fail('While waiting for %i request(s), we saw %i unexpected request(s). The unexpected '
  128. 'request(s) was/were:\n\n%s\n\nAll recorded requests:\n\n%s\n\nAll expected requests:'
  129. '\n\n%s' % (
  130. len(self.expected_requests),
  131. len(unmatched_requests),
  132. '\n'.join(map(str, unmatched_requests)),
  133. '\n'.join(map(str, seen_requests)),
  134. '\n'.join(map(str, [(r['method'], r['uri']) for r in self.expected_requests])),
  135. ))
  136. class MockPDNSTestCase(APITestCase):
  137. """
  138. This test case provides a "mocked Internet" environment with a mock pdns API interface. All internet connections,
  139. HTTP or otherwise, that this test case is unaware of will result in an exception.
  140. By default, requests are intercepted but unexpected and result in a failed test. To set pdns API request
  141. expectations, use the `with MockPDNSTestCase.assertPdns*` functions.
  142. Subclasses may not touch httpretty.enable() or httpretty.disable(). For 'local' usage, httpretty.register_uri()
  143. and httpretty.reset() may be used.
  144. """
  145. PDNS_ZONES = r'/zones'
  146. PDNS_ZONE_CRYPTO_KEYS = r'/zones/(?P<id>[^/]+)/cryptokeys'
  147. PDNS_ZONE = r'/zones/(?P<id>[^/]+)'
  148. PDNS_ZONE_NOTIFY = r'/zones/(?P<id>[^/]+)/notify'
  149. @classmethod
  150. def get_full_pdns_url(cls, path_regex, ns='LORD', **kwargs):
  151. api = getattr(settings, 'NS%s_PDNS_API' % ns)
  152. return re.compile('^' + api + cls.fill_regex_groups(path_regex, **kwargs) + '$')
  153. @classmethod
  154. def fill_regex_groups(cls, template, **kwargs):
  155. s = template
  156. for name, value in kwargs.items():
  157. if value is None:
  158. continue
  159. pattern = r'\(\?P\<%s\>[^\)]+\)' % name
  160. if not re.search(pattern, s):
  161. raise ValueError('Tried to fill field %s in template %s, but it does not exist.' % (name, template))
  162. s = re.sub(
  163. pattern=pattern,
  164. repl=value,
  165. string=s,
  166. )
  167. return s
  168. @classmethod
  169. def _pdns_zone_id_heuristic(cls, name):
  170. """
  171. Returns an educated guess of the pdns zone id for a given zone name.
  172. """
  173. if not name:
  174. return None
  175. name = name.translate(str.maketrans({'/': '=2F', '_': '=5F'}))
  176. return cls._normalize_name(name)
  177. @classmethod
  178. def _normalize_name(cls, arg):
  179. if not arg:
  180. return None
  181. if not isinstance(arg, list):
  182. return cls._normalize_name([arg])[0]
  183. else:
  184. return [x if x.endswith('.') else x + '.' for x in arg]
  185. @classmethod
  186. def request_pdns_zone_create(cls):
  187. return {
  188. 'method': 'POST',
  189. 'uri': cls.get_full_pdns_url(cls.PDNS_ZONES),
  190. 'status': 201,
  191. 'body': None,
  192. }
  193. @classmethod
  194. def request_pdns_zone_create_422(cls):
  195. request = cls.request_pdns_zone_create()
  196. request['status'] = 422
  197. return request
  198. @classmethod
  199. def request_pdns_zone_create_already_exists(cls, existing_domains=None):
  200. existing_domains = cls._normalize_name(existing_domains)
  201. def request_callback(r, _, response_headers):
  202. body = json.loads(r.parsed_body)
  203. if not existing_domains or body['name'] in existing_domains:
  204. return [422, response_headers, json.dumps({'error': 'Domain \'%s\' already exists' % body['name']})]
  205. else:
  206. return [200, response_headers, None]
  207. request = cls.request_pdns_zone_create_422()
  208. request['body'] = request_callback
  209. request.pop('status')
  210. return request
  211. @classmethod
  212. def request_pdns_zone_delete(cls, name=None, ns='LORD'):
  213. return {
  214. 'method': 'DELETE',
  215. 'uri': cls.get_full_pdns_url(cls.PDNS_ZONE, ns=ns, id=cls._pdns_zone_id_heuristic(name)),
  216. 'status': 200,
  217. 'body': None,
  218. }
  219. @classmethod
  220. def request_pdns_zone_update(cls, name=None):
  221. return {
  222. 'method': 'PATCH',
  223. 'uri': cls.get_full_pdns_url(cls.PDNS_ZONE, id=cls._pdns_zone_id_heuristic(name)),
  224. 'status': 200,
  225. 'body': None,
  226. }
  227. @classmethod
  228. def request_pdns_zone_update_unknown_type(cls, name=None, unknown_types=None):
  229. def request_callback(r, _, response_headers):
  230. body = json.loads(r.parsed_body)
  231. if not unknown_types or body['rrsets'][0]['type'] in unknown_types:
  232. return [
  233. 422, response_headers,
  234. json.dumps({'error': 'Mocked error. Unknown RR type %s.' % body['rrsets'][0]['type']})
  235. ]
  236. else:
  237. return [200, response_headers, None]
  238. request = cls.request_pdns_zone_update(name)
  239. request['body'] = request_callback
  240. request.pop('status')
  241. return request
  242. @classmethod
  243. def request_pdns_zone_retrieve(cls, name=None):
  244. return {
  245. 'method': 'GET',
  246. 'uri': cls.get_full_pdns_url(cls.PDNS_ZONE, id=cls._pdns_zone_id_heuristic(name)),
  247. 'status': 200,
  248. 'body': json.dumps({
  249. 'rrsets': [{
  250. 'comments': [],
  251. 'name': cls._normalize_name(name) if name else 'test.mock.',
  252. 'ttl': 60,
  253. 'type': 'NS',
  254. 'records': [{'content': ns} for ns in settings.DEFAULT_NS],
  255. }]
  256. }),
  257. }
  258. @classmethod
  259. def request_pdns_zone_retrieve_crypto_keys(cls, name=None):
  260. return {
  261. 'method': 'GET',
  262. 'uri': cls.get_full_pdns_url(cls.PDNS_ZONE_CRYPTO_KEYS, id=cls._pdns_zone_id_heuristic(name)),
  263. 'status': 200,
  264. 'body': json.dumps([
  265. {
  266. 'active': True,
  267. 'algorithm': 'ECDSAP256SHA256',
  268. 'bits': 256,
  269. 'dnskey': '257 3 13 EVBcsqrnOp6RGWtsrr9QW8cUtt/'
  270. 'WI5C81RcCZDTGNI9elAiMQlxRdnic+7V+b7jJDE2vgY08qAbxiNh5NdzkzA==',
  271. 'ds': [
  272. '62745 13 1 642d70d9bb84903ca4c4ca08a6e4f1e9465aeaa6',
  273. '62745 13 2 5cddaeaa383e2ea7de49bd1212bf520228f0e3b334626517e5f6a68eb85b48f6',
  274. '62745 13 4 b3f2565901ddcb0b78337301cf863d1045774377bca05c7ad69e17a167734b92'
  275. '9f0a49b7edcca913eb6f5dfeac4645b8'
  276. ],
  277. 'flags': 257,
  278. 'id': 179425943,
  279. 'keytype': key_type,
  280. 'type': 'Cryptokey',
  281. }
  282. for key_type in ['csk', 'ksk', 'zsk']
  283. ])
  284. }
  285. @classmethod
  286. def request_pdns_zone_notify(cls, name=None):
  287. return {
  288. 'method': 'PUT',
  289. 'uri': cls.get_full_pdns_url(cls.PDNS_ZONE_NOTIFY, id=cls._pdns_zone_id_heuristic(name)),
  290. 'status': 200,
  291. 'body': None,
  292. }
  293. def assertPdnsRequests(self, *expected_requests, expect_order=True):
  294. """
  295. Assert the given requests are made. To build requests, use the `MockPDNSTestCase.request_*` functions.
  296. Unmet expectations will fail the test.
  297. Args:
  298. *expected_requests: List of expected requests.
  299. expect_order: If True (default), the order of observed requests is checked.
  300. """
  301. return AssertRequestsContextManager(
  302. test_case=self,
  303. expected_requests=expected_requests,
  304. expect_order=expect_order,
  305. )
  306. def assertPdnsNoRequestsBut(self, *expected_requests):
  307. """
  308. Assert no requests other than the given ones are made. Each request can be matched more than once, unmatched
  309. expectations WILL NOT fail the test.
  310. Args:
  311. *expected_requests: List of acceptable requests to be made.
  312. """
  313. return AssertRequestsContextManager(
  314. test_case=self,
  315. expected_requests=expected_requests,
  316. single_expectation_single_request=False,
  317. expect_order=False,
  318. )
  319. def assertPdnsZoneCreation(self):
  320. """
  321. Asserts that nslord is contact and a zone is created.
  322. """
  323. return AssertRequestsContextManager(
  324. test_case=self,
  325. expected_requests=[
  326. self.request_pdns_zone_create()
  327. ],
  328. )
  329. def assertPdnsZoneDeletion(self, name=None):
  330. """
  331. Asserts that nslord and nsmaster are contacted to delete a zone.
  332. Args:
  333. name: If given, the test is restricted to the name of this zone.
  334. """
  335. return AssertRequestsContextManager(
  336. test_case=self,
  337. expected_requests=[
  338. self.request_pdns_zone_delete(ns='LORD', name=None),
  339. self.request_pdns_zone_delete(ns='MASTER', name=None),
  340. ],
  341. )
  342. @classmethod
  343. def setUpTestData(cls):
  344. httpretty.enable(allow_net_connect=False)
  345. httpretty.reset()
  346. hr_core.POTENTIAL_HTTP_PORTS.add(8081) # FIXME static dependency on settings variable
  347. for request in [
  348. cls.request_pdns_zone_create(),
  349. cls.request_pdns_zone_notify(),
  350. cls.request_pdns_zone_update(),
  351. cls.request_pdns_zone_retrieve_crypto_keys(),
  352. cls.request_pdns_zone_retrieve()
  353. ]:
  354. httpretty.register_uri(**request)
  355. cls.setUpTestDataWithPdns()
  356. httpretty.reset()
  357. @classmethod
  358. def setUpTestDataWithPdns(cls):
  359. """
  360. Override this method to set up test data. During the run of this method, httpretty is configured to accept
  361. all pdns API requests.
  362. """
  363. pass
  364. @classmethod
  365. def tearDownClass(cls):
  366. super().tearDownClass()
  367. httpretty.disable()
  368. def setUp(self):
  369. super().setUp()
  370. httpretty.reset()
  371. class DesecTestCase(MockPDNSTestCase):
  372. """
  373. This test case is run in the "standard" deSEC e.V. setting, i.e. with an API that is aware of the public suffix
  374. domains AUTO_DELEGATION_DOMAINS.
  375. The test case aims to be as close to the deployment as possible and may be extended as the deployment evolves.
  376. The test case provides an admin user and a regular user for testing.
  377. """
  378. client_class = DesecAPIClient
  379. AUTO_DELEGATION_DOMAINS = ['dedyn.io'] # TODO replace with project wide settings
  380. PUBLIC_SUFFIXES = ['de', 'com', 'io', 'gov.cd', 'edu.ec', 'xxx', 'pinb.gov.pl', 'valer.ostfold.no', 'kota.aichi.jp']
  381. @classmethod
  382. def reverse(cls, view_name, **kwargs):
  383. return reverse(view_name, kwargs=kwargs)
  384. @classmethod
  385. def setUpTestDataWithPdns(cls):
  386. super().setUpTestDataWithPdns()
  387. random.seed(0xde5ec)
  388. cls.admin = cls.create_user(is_admin=True)
  389. cls.add_domains = [cls.create_domain(name=name) for name in cls.AUTO_DELEGATION_DOMAINS]
  390. cls.user = cls.create_user()
  391. @classmethod
  392. def random_string(cls, length=6, chars=string.ascii_letters + string.digits):
  393. return ''.join(random.choice(chars) for _ in range(length))
  394. @classmethod
  395. def random_password(cls, length=12):
  396. return cls.random_string(
  397. length,
  398. chars=string.ascii_letters + string.digits + string.punctuation +
  399. 'some 💩🐬 UTF-8: “红色联合”对“四·二八兵团”总部大楼的攻击已持续了两天"'
  400. )
  401. @classmethod
  402. def random_ip(cls, proto=None):
  403. proto = proto or random.choice([4, 6])
  404. if proto == 4:
  405. return '.'.join([str(random.randrange(256)) for _ in range(4)])
  406. elif proto == 6:
  407. return '2001:' + ':'.join(['%x' % random.randrange(16**4) for _ in range(7)])
  408. else:
  409. raise ValueError('Unknown IP protocol version %s. Expected int 4 or int 6.' % str(proto))
  410. @classmethod
  411. def random_username(cls, host=None):
  412. host = host or cls.random_domain_name(suffix=random.choice(cls.PUBLIC_SUFFIXES))
  413. return cls.random_string() + '+test@' + host.lower()
  414. @classmethod
  415. def random_domain_name(cls, suffix=None):
  416. if not suffix:
  417. suffix = random.choice(cls.PUBLIC_SUFFIXES)
  418. return random.choice(string.ascii_lowercase) + cls.random_string() + '--test' + '.' + suffix
  419. @classmethod
  420. def create_token(cls, user):
  421. token = Token.objects.create(user=user)
  422. token.save()
  423. return token
  424. @classmethod
  425. def create_user(cls, **kwargs):
  426. kwargs.setdefault('email', cls.random_username())
  427. user = User(**kwargs)
  428. user.plain_password = cls.random_string(length=12)
  429. user.set_password(user.plain_password)
  430. user.save()
  431. return user
  432. @classmethod
  433. def create_domain(cls, suffix=None, **kwargs):
  434. kwargs.setdefault('owner', cls.create_user())
  435. kwargs.setdefault('name', cls.random_domain_name(suffix=suffix))
  436. domain = Domain(**kwargs)
  437. domain.save()
  438. return domain
  439. @classmethod
  440. def create_rr_set(cls, domain, records, **kwargs):
  441. if isinstance(domain, str):
  442. domain = Domain.objects.get_or_create(name=domain)
  443. domain.save()
  444. rr_set = RRset(domain=domain, **kwargs)
  445. rr_set.save()
  446. for r in records:
  447. RR(content=r, rrset=rr_set).save()
  448. return rr_set
  449. @classmethod
  450. def _find_auto_delegation_zone(cls, name):
  451. if not name:
  452. return None
  453. parents = [parent for parent in cls.AUTO_DELEGATION_DOMAINS if name.endswith('.' + parent)]
  454. if not parents:
  455. raise ValueError('Could not find auto delegation zone for zone %s; searched in %s' % (
  456. name,
  457. cls.AUTO_DELEGATION_DOMAINS
  458. ))
  459. return parents[0]
  460. @classmethod
  461. def requests_desec_domain_creation(cls, name=None):
  462. return [
  463. cls.request_pdns_zone_create(),
  464. cls.request_pdns_zone_notify(name=name),
  465. cls.request_pdns_zone_retrieve(name=name),
  466. cls.request_pdns_zone_retrieve_crypto_keys(name=name),
  467. ]
  468. @classmethod
  469. def requests_desec_domain_deletion(cls, name=None):
  470. return [
  471. cls.request_pdns_zone_delete(name=name, ns='LORD'),
  472. cls.request_pdns_zone_delete(name=name, ns='MASTER'),
  473. ]
  474. @classmethod
  475. def requests_desec_domain_creation_auto_delegation(cls, name=None):
  476. delegate_at = cls._find_auto_delegation_zone(name)
  477. return cls.requests_desec_domain_creation(name=name) + [
  478. cls.request_pdns_zone_update(name=delegate_at),
  479. cls.request_pdns_zone_notify(name=delegate_at),
  480. cls.request_pdns_zone_retrieve_crypto_keys(name=name),
  481. ]
  482. @classmethod
  483. def requests_desec_domain_deletion_auto_delegation(cls, name=None):
  484. delegate_at = cls._find_auto_delegation_zone(name)
  485. return [
  486. cls.request_pdns_zone_update(name=delegate_at),
  487. cls.request_pdns_zone_notify(name=delegate_at),
  488. cls.request_pdns_zone_delete(name=name, ns='LORD'),
  489. cls.request_pdns_zone_delete(name=name, ns='MASTER'),
  490. ]
  491. @classmethod
  492. def requests_desec_rr_sets_update(cls, name=None):
  493. return [
  494. cls.request_pdns_zone_update(name=name),
  495. cls.request_pdns_zone_notify(name=name),
  496. ]
  497. class DomainOwnerTestCase(DesecTestCase):
  498. """
  499. This test case creates a domain owner, some domains for her and some domains that are owned by other users.
  500. DomainOwnerTestCase.client is authenticated with the owner's token.
  501. """
  502. DYN = False
  503. NUM_OWNED_DOMAINS = 2
  504. NUM_OTHER_DOMAINS = 20
  505. @classmethod
  506. def setUpTestDataWithPdns(cls):
  507. super().setUpTestDataWithPdns()
  508. cls.owner = cls.create_user(dyn=cls.DYN)
  509. cls.my_domains = [
  510. cls.create_domain(suffix=random.choice(cls.AUTO_DELEGATION_DOMAINS) if cls.DYN else '', owner=cls.owner)
  511. for _ in range(cls.NUM_OWNED_DOMAINS)
  512. ]
  513. cls.other_domains = [
  514. cls.create_domain(suffix=random.choice(cls.AUTO_DELEGATION_DOMAINS) if cls.DYN else '')
  515. for _ in range(cls.NUM_OTHER_DOMAINS)
  516. ]
  517. cls.my_domain = cls.my_domains[0]
  518. cls.other_domain = cls.other_domains[0]
  519. cls.create_rr_set(cls.my_domain, ['127.0.0.1', '127.0.1.1'], type='A', ttl=123)
  520. cls.create_rr_set(cls.other_domain, ['40.1.1.1', '40.2.2.2'], type='A', ttl=456)
  521. cls.token = cls.create_token(user=cls.owner)
  522. def setUp(self):
  523. super().setUp()
  524. self.client.credentials(HTTP_AUTHORIZATION='Token ' + self.token.key)
  525. class LockedDomainOwnerTestCase(DomainOwnerTestCase):
  526. @classmethod
  527. def setUpTestData(cls):
  528. super().setUpTestData()
  529. cls.owner.locked = timezone.now()
  530. cls.owner.save()
  531. class DynDomainOwnerTestCase(DomainOwnerTestCase):
  532. DYN = True
  533. @staticmethod
  534. def _http_header_base64_conversion(content):
  535. return base64.b64encode(content.encode()).decode()
  536. @staticmethod
  537. def _set_credentials(client, authorization):
  538. client.credentials(HTTP_AUTHORIZATION=authorization)
  539. @classmethod
  540. def _set_credentials_basic_auth(cls, client, user_name, token):
  541. if not user_name and not token:
  542. cls._set_credentials(client, '')
  543. else:
  544. cls._set_credentials(client, 'Basic ' + DynDomainOwnerTestCase._http_header_base64_conversion(
  545. user_name + ':' + token
  546. )
  547. )
  548. @classmethod
  549. def _set_credentials_token_auth(cls, client, token):
  550. if token is None:
  551. cls._set_credentials(client, '')
  552. else:
  553. cls._set_credentials(client, 'Token ' + token)
  554. def _assertDynDNS12Update(self, requests, mock_remote_addr='', **kwargs):
  555. with self.assertPdnsRequests(requests):
  556. if mock_remote_addr:
  557. return self.client.get(self.reverse('v1:dyndns12update'), kwargs, REMOTE_ADDR=mock_remote_addr)
  558. else:
  559. return self.client.get(self.reverse('v1:dyndns12update'), kwargs)
  560. def assertDynDNS12Update(self, domain_name=None, mock_remote_addr='', **kwargs):
  561. return self._assertDynDNS12Update(
  562. [self.request_pdns_zone_update(name=domain_name), self.request_pdns_zone_notify(name=domain_name)],
  563. mock_remote_addr,
  564. **kwargs
  565. )
  566. def assertDynDNS12NoUpdate(self, mock_remote_addr='', **kwargs):
  567. return self._assertDynDNS12Update([], mock_remote_addr, **kwargs)
  568. def setUp(self):
  569. super().setUp()
  570. self.client_token_authorized = self.client_class()
  571. self._set_credentials_basic_auth(self.client, self.my_domain.name, self.token.key)
  572. self._set_credentials_token_auth(self.client_token_authorized, self.token.key)