conftest.py 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. import json
  2. import os
  3. import random
  4. import re
  5. import string
  6. from json import JSONDecodeError
  7. from typing import Optional, Tuple, Iterable, Callable
  8. import dns
  9. import dns.name
  10. import dns.query
  11. import pytest
  12. import requests
  13. from requests.exceptions import SSLError
  def random_mixed_case_string(n):
      """
      Return a random string of ``n`` ASCII letters that contains at least one
      lowercase and at least one uppercase letter.

      :param n: Total length of the string; must be at least 2 so that both
          cases can be present.
      :return: The shuffled mixed-case string.
      :raises ValueError: If ``n`` is smaller than 2.
      """
      if n < 2:
          # Without this guard, random.randint(1, n - 1) fails with an opaque
          # "empty range" ValueError; raise the same type with a clear message.
          raise ValueError(f'n must be at least 2 to mix both cases, got {n}')
      # k lowercase letters (1 <= k <= n - 1) leaves at least one uppercase letter.
      k = random.randint(1, n - 1)
      s = random.choices(string.ascii_lowercase, k=k) + random.choices(string.ascii_uppercase, k=n - k)
      random.shuffle(s)
      return ''.join(s)
  @pytest.fixture()
  def random_email() -> Callable[[], str]:
      """
      Provide a factory that builds a new random email address on every call.

      Both the local part and the first domain label are 10-character
      mixed-case strings; the address always ends in ``.desec.test``.
      """
      def make_email() -> str:
          local_part = random_mixed_case_string(10)
          domain_label = random_mixed_case_string(10)
          return f'{local_part}@{domain_label}.desec.test'

      return make_email
  @pytest.fixture()
  def random_password() -> Callable[[], str]:
      """
      Provide a factory that builds a new 16-character password of ASCII
      letters on every call.
      """
      def make_password() -> str:
          letters = [random.choice(string.ascii_letters) for _ in range(16)]
          return "".join(letters)

      return make_password
  @pytest.fixture()
  def random_domainname() -> Callable[[], str]:
      """
      Provide a factory that builds a new domain name on every call: a random
      16-letter lowercase label directly under ``.test``.
      """
      def make_name() -> str:
          label = "".join([random.choice(string.ascii_lowercase) for _ in range(16)])
          return f"{label}.test"

      return make_name
  @pytest.fixture()
  def random_local_public_suffix_domainname() -> Callable[[], str]:
      """
      Provide a factory that builds a new domain name on every call: a random
      16-letter lowercase label under ``dedyn.<DESECSTACK_DOMAIN>``.

      The ``DESECSTACK_DOMAIN`` environment variable is read each time the
      factory is called, not when the fixture is set up.
      """
      def make_name() -> str:
          label = "".join([random.choice(string.ascii_lowercase) for _ in range(16)])
          stack_domain = os.environ['DESECSTACK_DOMAIN']
          return f"{label}.dedyn.{stack_domain}"

      return make_name
  class DeSECAPIV1Client:
      """
      Small HTTP client for the API rooted at
      ``https://desec.<DESECSTACK_DOMAIN>/api/v1``.

      All requests and responses are logged to stdout with ``API >>>`` /
      ``API <<<`` prefixes. Each instance keeps its own headers (so a login on
      one client does not affect another), the credentials passed to
      ``register()``, and the list of domain names created through it.
      """

      base_url = "https://desec." + os.environ["DESECSTACK_DOMAIN"] + "/api/v1"

      # Default request headers. Every instance works on its own copy (see
      # __init__), so this mapping is never mutated.
      headers = {
          "Accept": "application/json",
          "Content-Type": "application/json",
          "User-Agent": "e2e2",
      }

      def __init__(self) -> None:
          super().__init__()
          self.email = None
          self.password = None
          self.domains = []

          # login() stores an Authorization header. Copy the defaults so the
          # token stays on this instance instead of leaking into the class-level
          # dict shared by all clients.
          self.headers = dict(self.headers)

          # We support two certificate verification methods
          # (1) against self-signed certificates, if /autocert path is present
          # (this is usually the case when run inside a docker container)
          # (2) against the default certificate store, if /autocert is not available
          # (this is usually the case when run outside a docker container)
          self.verify = True
          self.verify_alt = [
              f'/autocert/desec.{os.environ["DESECSTACK_DOMAIN"]}.cer',
              f'/autocert/get.desec.{os.environ["DESECSTACK_DOMAIN"]}.cer',
          ]

      @staticmethod
      def _filter_response_output(output: dict) -> dict:
          """
          Shorten the ``challenge`` value of a response body for logging.

          The value is truncated in place to its first 10 characters plus
          ``...``. Bodies without a ``challenge`` key, or that do not support
          this kind of access, are returned unchanged.
          """
          try:
              output['challenge'] = output['challenge'][:10] + '...'
          except (KeyError, TypeError):
              pass
          return output

      def _do_request(self, *args, **kwargs):
          """
          Call ``requests.request`` and fall back through the known
          certificate verification options.

          The currently preferred option (``self.verify``) is tried first,
          followed by the alternatives in ``self.verify_alt``. The first option
          that does not raise ``SSLError`` becomes the new preference.

          :raises SSLError: The last SSL error, if no option succeeded.
          """
          verify_list = [self.verify] + self.verify_alt
          exc = None
          for verify in verify_list:
              try:
                  reply = requests.request(*args, **kwargs, verify=verify)
              except SSLError as e:
                  print(f'API <<< SSL could not verify against "{verify}"')
                  exc = e
              else:
                  # note verification preference for next time
                  self.verify = verify
                  self.verify_alt = verify_list
                  self.verify_alt.remove(self.verify)
                  return reply
          print('API <<< SSL could not be verified against any verification method')
          raise exc

      def _request(self, method: str, *, path: str, data: Optional[dict] = None, **kwargs) -> requests.Response:
          """
          Send a request, log it together with the response, and return the
          response.

          :param method: HTTP method name.
          :param path: Either a path that is appended to ``base_url`` or an
              absolute ``http(s)://`` URL that is used as is.
          :param data: Optional payload; serialized to JSON when not ``None``.
          :param kwargs: Passed through to ``requests.request``.
          """
          if data is not None:
              data = json.dumps(data)
          url = self.base_url + path if re.match(r'^https?://', path) is None else path

          print(f"API >>> {method} {url}")
          if data:
              print(f"API >>> {type(data)}: {data}")

          response = self._do_request(
              method,
              url,
              data=data,
              headers=self.headers,
              **kwargs,
          )

          print(f"API <<< {response.status_code}")
          if response.text:
              try:
                  print(f"API <<< {self._filter_response_output(response.json())}")
              except JSONDecodeError:
                  # Not a JSON body; log the raw text instead.
                  print(f"API <<< {response.text}")
          return response

      def get(self, path: str, **kwargs) -> requests.Response:
          """Send a GET request to ``path``."""
          return self._request("GET", path=path, **kwargs)

      def post(self, path: str, data: Optional[dict] = None, **kwargs) -> requests.Response:
          """Send a POST request to ``path`` with an optional JSON payload."""
          return self._request("POST", path=path, data=data, **kwargs)

      def delete(self, path: str, **kwargs) -> requests.Response:
          """Send a DELETE request to ``path``."""
          return self._request("DELETE", path=path, **kwargs)

      def register(self, email: str, password: str) -> Tuple[requests.Response, requests.Response]:
          """
          Request a captcha and register an account with it.

          The credentials are remembered on the instance before any request is
          sent.

          :return: Tuple of (captcha response, registration response).
          """
          self.email = email
          self.password = password
          captcha = self.post("/captcha/")
          captcha_body = captcha.json()  # parse the captcha response once
          return captcha, self.post(
              "/auth/",
              data={
                  "email": email,
                  "password": password,
                  "captcha": {
                      "id": captcha_body["id"],
                      # available via e2e configuration magic
                      "solution": captcha_body["content"],
                  },
              },
          )

      def login(self, email: str, password: str) -> requests.Response:
          """
          Log in and, if the response carries a token, use it for all further
          requests made by this instance.

          :return: The login response.
          """
          response = self.post(
              "/auth/login/", data={"email": email, "password": password}
          )
          token = response.json().get('token')
          if token is not None:
              self.headers["Authorization"] = f'Token {token}'
          return response

      def domain_list(self) -> requests.Response:
          """List the domains visible to this client."""
          return self.get("/domains/")

      def domain_create(self, name) -> requests.Response:
          """
          Create the domain ``name``.

          The name is added to ``self.domains`` before the request is sent,
          i.e. regardless of the outcome of the request.
          """
          self.domains.append(name)
          return self.post(
              "/domains/",
              data={
                  "name": name,
              }
          )

      def domain_destroy(self, name) -> requests.Response:
          """
          Delete the domain ``name``.

          :raises ValueError: If ``name`` is not in ``self.domains``.
          """
          self.domains.remove(name)
          return self.delete(f"/domains/{name}/")

      def rr_set_create(self, domain_name: str, rr_type: str, records: Iterable[str], subname: str = '',
                        ttl: int = 3600) -> requests.Response:
          """
          Create an RR set in ``domain_name``.

          :param domain_name: Domain the RR set belongs to.
          :param rr_type: Record type, sent as ``type``.
          :param records: Record contents; any iterable of strings.
          :param subname: Subname of the RR set, empty by default.
          :param ttl: TTL of the RR set, 3600 by default.
          """
          return self.post(
              f"/domains/{domain_name}/rrsets/",
              data={
                  "subname": subname,
                  "type": rr_type,
                  "ttl": ttl,
                  # json.dumps cannot serialize sets or generators, so turn any
                  # iterable into a list first.
                  "records": list(records),
              }
          )
  @pytest.fixture()
  def api_anon() -> DeSECAPIV1Client:
      """
      Anonymous access to the API: a freshly constructed client on which this
      fixture performs neither registration nor login.
      """
      client = DeSECAPIV1Client()
      return client
  @pytest.fixture()
  def api_user(random_email, random_password) -> DeSECAPIV1Client:
      """
      Access to the API with a fresh user account (zero domains, one token).

      A new client registers and then logs in with a randomly chosen email
      address and password, so the Authorization header is preconfigured.
      The responses of both calls are not inspected here.
      """
      client = DeSECAPIV1Client()
      credentials = (random_email(), random_password())
      client.register(*credentials)
      client.login(*credentials)
      return client
  @pytest.fixture()
  def api_user_domain(api_user, random_domainname) -> DeSECAPIV1Client:
      """
      Access to the API with a fresh user account that owns a domain with a
      random name. No records are added to the domain here, so it has only
      the default ones.
      """
      domain_name = random_domainname()
      api_user.domain_create(domain_name)
      return api_user
  class NSClient:
      """
      Minimal DNS client that queries a single name server over TCP and logs
      every query and answer to stdout with ``DNS >>>`` / ``DNS <<<`` prefixes.
      """

      # Address of the name server to query; None here, set by subclasses.
      where = None

      def query(self, qname: str, qtype: str):
          """
          Query ``self.where`` for ``qname``/``qtype`` with a 2 second timeout.

          NS records are looked up in the AUTHORITY section of the reply, all
          other types in the ANSWER section.

          :return: Set of the matching records in text form. If no matching
              RR set is found, an empty dict (not a set) is returned.
          """
          print(f'DNS >>> {qname}/{qtype} @{self.where}')
          name = dns.name.from_text(qname)
          rdtype = dns.rdatatype.from_text(qtype)
          reply = dns.query.tcp(
              q=dns.message.make_query(name, rdtype),
              where=self.where,
              timeout=2,
          )

          try:
              if rdtype == dns.rdatatype.from_text('NS'):
                  section = dns.message.AUTHORITY
              else:
                  section = dns.message.ANSWER
              rrset = reply.find_rrset(section, name, dns.rdataclass.IN, rdtype)
              print(f'DNS <<< {rrset}')
              return {record.to_text() for record in rrset.items}
          except KeyError:
              print('DNS <<< !!! not found !!! Complete Answer below:\n' + reply.to_text())
              # NOTE(review): this is an empty dict although the success path
              # returns a set; kept as is because callers may compare against it.
              return {}
  class NSLordClient(NSClient):
      """
      NSClient that queries the address formed by the
      ``DESECSTACK_IPV4_REAR_PREFIX16`` environment variable followed by
      ``.0.129``. The variable is read once, when the class is defined.
      """

      where = f'{os.environ["DESECSTACK_IPV4_REAR_PREFIX16"]}.0.129'
  @pytest.fixture()
  def ns_lord() -> NSLordClient:
      """
      Provide a new NSLordClient instance for DNS queries.
      """
      client = NSLordClient()
      return client