test_dyndns.py 3.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. import ipaddress
  2. import os
  3. from conftest import DeSECAPIV1Client, query_replication, NSLordClient, assert_eventually
  4. import base64
  5. import pytest
  # Test configuration derived from the DESECSTACK_* environment variables.
  # A missing variable raises KeyError as soon as this module is imported.
  # `ipv4_net` / `ipv6_net` serve as the expected address/network in the test below.
  ipv4_net = f"{os.environ['DESECSTACK_IPV4_REAR_PREFIX16']}.0.127"
  ipv6_net = os.environ['DESECSTACK_IPV6_SUBNET']
  # Base URLs of the two dynDNS update hosts exercised by the test.
  update_url = f"https://update.dedyn.{os.environ['DESECSTACK_DOMAIN']}/"
  update6_url = f"https://update6.dedyn.{os.environ['DESECSTACK_DOMAIN']}/"
  @pytest.mark.parametrize("subname", [None, '', 'foo', '*.bar'])
  @pytest.mark.parametrize("base_url", [update_url, update6_url])
  @pytest.mark.parametrize("auth_method", ['basic', 'token', 'query'])
  def test(api_user_lps_domain: DeSECAPIV1Client, auth_method, base_url, subname):
      """Send dynDNS update requests and check the resulting A/AAAA records.

      For every combination of authentication method, update host and subname,
      a plain request is sent to ``base_url``, followed by several requests to
      ``base_url + 'update/'`` that carry explicit ``ip`` / ``ipv6`` parameters.
      After each request, the records reported by the API, by ``NSLordClient``
      and by ``query_replication`` are compared with the expected addresses.

      Args:
          api_user_lps_domain: API client; its ``domain``, ``token`` and
              ``headers`` attributes are read, and ``headers`` is swapped
              temporarily while an update request is sent.
          auth_method: ``'basic'``, ``'token'`` or ``'query'``.
          base_url: ``update_url`` or ``update6_url``.
          subname: Subname to update, or ``None`` to send no ``hostname``
              parameter at all.

      Raises:
          ValueError: If ``auth_method`` is not one of the supported values.
      """
      domain = api_user_lps_domain.domain
      # Remember the client's regular API headers so they can be restored after
      # each update request, which is sent with its own credentials.
      api_headers = api_user_lps_domain.headers.copy()

      def _ips_in_network(ip_set, network):
          """Return True if every address in ``ip_set`` lies within ``network``.

          An empty ``ip_set`` yields True; ``network`` is only evaluated when
          there is at least one address to check.
          """
          if ip_set:
              assert network is not None, "`network` was None when `ip_set` wasn't empty."
          return all(ipaddress.ip_address(ip) in ipaddress.ip_network(network) for ip in ip_set)

      def do_test(url, headers, params, expected_ipv4, expected_ipv6, subname):
          """Send one update request and verify the A/AAAA records afterwards.

          ``expected_ipv4`` / ``expected_ipv6`` are an address or network string,
          or ``None`` when no record of that type is expected.
          """
          subname = subname or ''

          api_user_lps_domain.headers = headers.copy()
          try:
              response = api_user_lps_domain.get(url, params=params)
          finally:
              # Restore the API credentials even if the request raises, so the
              # client is never left carrying the dynDNS headers.
              api_user_lps_domain.headers = api_headers.copy()
          assert response.status_code == 200
          assert response.text == 'good'

          # Records as reported by the API, keyed by record type.
          rrs_api = {
              qtype: {
                  record
                  for rrset in api_user_lps_domain.get(f'/domains/{domain}/rrsets/?subname={subname}&type={qtype}').json()
                  for record in rrset['records']
              }
              for qtype in ['A', 'AAAA']
          }
          # Records as answered by NSLordClient for the updated hostname
          # (the bare domain when no hostname parameter was sent).
          rrs_dns = {qtype: NSLordClient.query(params.get('hostname', domain), qtype)[1] for qtype in ['A', 'AAAA']}

          for expected_net, qtype in [(expected_ipv4, 'A'), (expected_ipv6, 'AAAA')]:
              expected_count = 1 if expected_net else 0
              assert len(rrs_api[qtype]) == expected_count
              assert len(rrs_dns[qtype]) == expected_count
              assert _ips_in_network(rrs_api[qtype], expected_net)
              assert _ips_in_network(rrs_dns[qtype], expected_net)
              # Check replication for the subname that was actually updated;
              # querying the apex ('') left this check vacuous for non-apex names.
              # NOTE(review): assumes query_replication takes a subname relative
              # to `domain`, as the previous '' argument suggests - confirm.
              assert_eventually(lambda: _ips_in_network(query_replication(domain, subname, qtype), expected_net))

      headers = {}
      params = {}
      if auth_method == 'token':
          headers['Authorization'] = api_user_lps_domain.headers["Authorization"]
      elif auth_method == 'basic':
          credentials = base64.b64encode(f'{api_user_lps_domain.domain}:{api_user_lps_domain.token}'.encode()).decode()
          headers["Authorization"] = f'Basic {credentials}'
      elif auth_method == 'query':
          params = {'username': api_user_lps_domain.domain, 'password': api_user_lps_domain.token}
      else:
          raise ValueError(f'Unsupported auth_method: {auth_method!r}')

      if subname is not None:
          # An empty subname leaves a leading dot, which is stripped to give the bare domain.
          params['hostname'] = f'{subname}.{domain}'.lstrip('.')

      update6 = base_url.startswith('https://update6.')

      # Request without explicit addresses.
      do_test(base_url, headers, params, expected_ipv4=None if update6 else ipv4_net,
              expected_ipv6=ipv6_net if update6 else None, subname=subname)

      # Requests with explicit ip/ipv6 parameters: (extra params, expected A, expected AAAA).
      for extra_params, expected_ipv4, expected_ipv6 in [
          (dict(ip='1.2.3.4'), '1.2.3.4', ipv6_net if update6 else None),
          (dict(ip='', ipv6='bade::affe'), None, 'bade::affe'),
          (dict(ipv6='dead::beef'), None if update6 else ipv4_net, 'dead::beef'),
          (dict(ip='1.3.3.7', ipv6=''), '1.3.3.7', None),
          (dict(ip='192.168.1.1', ipv6='::1'), '192.168.1.1', '::1'),
      ]:
          do_test(base_url + 'update/', headers, dict(params, **extra_params), expected_ipv4, expected_ipv6, subname)