conftest.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234
  1. import json
  2. import os
  3. import random
  4. import string
  5. from json import JSONDecodeError
  6. from typing import Optional, Tuple, Iterable, Callable
  7. import dns
  8. import dns.name
  9. import dns.query
  10. import pytest
  11. import requests
  12. from requests.exceptions import SSLError
  13. @pytest.fixture()
  14. def random_email() -> Callable[[], str]:
  15. return lambda: (
  16. "".join(random.choice(string.ascii_letters) for _ in range(10))
  17. + "@desec.test"
  18. )
  19. @pytest.fixture()
  20. def random_password() -> Callable[[], str]:
  21. return lambda: "".join(random.choice(string.ascii_letters) for _ in range(16))
  22. @pytest.fixture()
  23. def random_domainname() -> Callable[[], str]:
  24. return lambda: (
  25. "".join(random.choice(string.ascii_lowercase) for _ in range(16))
  26. + ".test"
  27. )
  28. @pytest.fixture()
  29. def random_local_public_suffix_domainname() -> Callable[[], str]:
  30. return lambda: (
  31. "".join(random.choice(string.ascii_lowercase) for _ in range(16))
  32. + ".dedyn."
  33. + os.environ['DESECSTACK_DOMAIN']
  34. )
  35. class DeSECAPIV1Client:
  36. base_url = "https://desec." + os.environ["DESECSTACK_DOMAIN"] + "/api/v1"
  37. headers = {
  38. "Accept": "application/json",
  39. "Content-Type": "application/json",
  40. "User-Agent": "e2e2",
  41. }
  42. def __init__(self) -> None:
  43. super().__init__()
  44. self.email = None
  45. self.password = None
  46. self.domains = []
  47. self.verify = True
  48. # We support two certificate verification methods
  49. # (1) against self-signed certificates, if /autocert path is present
  50. # (this is usually the case when run inside a docker container)
  51. # (2) against the default certificate store, if /autocert is not available
  52. # (this is usually the case when run outside a docker container)
  53. self.verify = True
  54. self.verify_alt = f'/autocert/desec.{os.environ["DESECSTACK_DOMAIN"]}.cer'
  55. @staticmethod
  56. def _filter_response_output(output: dict) -> dict:
  57. try:
  58. output['challenge'] = output['challenge'][:10] + '...'
  59. except (KeyError, TypeError):
  60. pass
  61. return output
  62. def _do_request(self, *args, **kwargs):
  63. try:
  64. return requests.request(*args, **kwargs, verify=self.verify)
  65. except SSLError:
  66. self.verify, self.verify_alt = self.verify_alt, self.verify
  67. return requests.request(*args, **kwargs, verify=self.verify) # if problem persists, let it raise
  68. def _request(self, method: str, *, path: str, data: Optional[dict] = None, **kwargs) -> requests.Response:
  69. if data is not None:
  70. data = json.dumps(data)
  71. url = self.base_url + path
  72. print(f"API >>> {method} {url}")
  73. if data:
  74. print(f"API >>> {type(data)}: {data}")
  75. response = self._do_request(
  76. method,
  77. url,
  78. data=data,
  79. headers=self.headers,
  80. **kwargs,
  81. )
  82. print(f"API <<< {response.status_code}")
  83. if response.text:
  84. try:
  85. print(f"API <<< {self._filter_response_output(response.json())}")
  86. except JSONDecodeError:
  87. print(f"API <<< {response.text}")
  88. return response
  89. def get(self, path: str, **kwargs) -> requests.Response:
  90. return self._request("GET", path=path, **kwargs)
  91. def post(self, path: str, data: Optional[dict] = None, **kwargs) -> requests.Response:
  92. return self._request("POST", path=path, data=data, **kwargs)
  93. def delete(self, path: str, **kwargs) -> requests.Response:
  94. return self._request("DELETE", path=path, **kwargs)
  95. def register(self, email: str, password: str) -> Tuple[requests.Response, requests.Response]:
  96. self.email = email
  97. self.password = password
  98. captcha = self.post("/captcha/")
  99. return captcha, self.post(
  100. "/auth/",
  101. data={
  102. "email": email,
  103. "password": password,
  104. "captcha": {
  105. "id": captcha.json()["id"],
  106. "solution": captcha.json()[
  107. "content"
  108. ], # available via e2e configuration magic
  109. },
  110. },
  111. )
  112. def login(self, email: str, password: str) -> requests.Response:
  113. token = self.post(
  114. "/auth/login/", data={"email": email, "password": password}
  115. )
  116. self.headers["Authorization"] = f'Token {token.json()["token"]}'
  117. return token
  118. def domain_list(self) -> requests.Response:
  119. return self.get("/domains/")
  120. def domain_create(self, name) -> requests.Response:
  121. self.domains.append(name)
  122. return self.post(
  123. "/domains/",
  124. data={
  125. "name": name,
  126. }
  127. )
  128. def domain_destroy(self, name) -> requests.Response:
  129. self.domains.remove(name)
  130. return self.delete(f"/domains/{name}/")
  131. def rr_set_create(self, domain_name: str, rr_type: str, records: Iterable[str], subname: str = '',
  132. ttl: int = 3600) -> requests.Response:
  133. return self.post(
  134. f"/domains/{domain_name}/rrsets/",
  135. data={
  136. "subname": subname,
  137. "type": rr_type,
  138. "ttl": ttl,
  139. "records": records,
  140. }
  141. )
  142. @pytest.fixture
  143. def api_anon() -> DeSECAPIV1Client:
  144. """
  145. Anonymous access to the API.
  146. """
  147. return DeSECAPIV1Client()
  148. @pytest.fixture()
  149. def api_user(api_anon, random_email, random_password) -> DeSECAPIV1Client:
  150. """
  151. Access to the API with a fresh user account (zero domains, one token). Authorization header
  152. is preconfigured, email address and password are randomly chosen.
  153. """
  154. api = api_anon
  155. email = random_email()
  156. password = random_password()
  157. api.register(email, password)
  158. api.login(email, password)
  159. return api
  160. @pytest.fixture()
  161. def api_user_domain(api_user, random_domainname) -> DeSECAPIV1Client:
  162. """
  163. Access to the API with a fresh user account that owns a domain with random name. The domain has
  164. no records other than the default ones.
  165. """
  166. api_user.domain_create(random_domainname())
  167. return api_user
  168. class NSClient:
  169. where = None
  170. def query(self, qname: str, qtype: str):
  171. print(f'DNS >>> {qname}/{qtype} @{self.where}')
  172. qname = dns.name.from_text(qname)
  173. qtype = dns.rdatatype.from_text(qtype)
  174. answer = dns.query.tcp(
  175. q=dns.message.make_query(qname, qtype),
  176. where=self.where,
  177. timeout=2
  178. )
  179. try:
  180. section = dns.message.AUTHORITY if qtype == dns.rdatatype.from_text('NS') else dns.message.ANSWER
  181. response = answer.find_rrset(section, qname, dns.rdataclass.IN, qtype)
  182. print(f'DNS <<< {response}')
  183. return {i.to_text() for i in response.items}
  184. except KeyError:
  185. print('DNS <<< !!! not found !!! Complete Answer below:\n' + answer.to_text())
  186. return {}
  187. class NSLordClient(NSClient):
  188. where = os.environ["DESECSTACK_IPV4_REAR_PREFIX16"] + '.0.129'
  189. @pytest.fixture()
  190. def ns_lord() -> NSLordClient:
  191. return NSLordClient()