# test_dyndns.py: tests for the dynDNS update endpoints (update.dedyn.* / update6.dedyn.*)
  1. import ipaddress
  2. import os
  3. from conftest import DeSECAPIV1Client, query_replication, NSLordClient, assert_eventually
  4. import base64
  5. import pytest
  6. ipv4_net = os.environ['DESECSTACK_IPV4_REAR_PREFIX16'] + '.0.127'
  7. ipv6_net = os.environ['DESECSTACK_IPV6_SUBNET']
  8. update_url = "https://update.dedyn." + os.environ["DESECSTACK_DOMAIN"] + "/"
  9. update6_url = "https://update6.dedyn." + os.environ["DESECSTACK_DOMAIN"] + "/"
  10. @pytest.mark.parametrize("subname", [None, '', 'foo', '*.bar'])
  11. @pytest.mark.parametrize("base_url", [update_url, update6_url])
  12. @pytest.mark.parametrize("auth_method", ['basic', 'token', 'query'])
  13. def test(api_user_lps_domain: DeSECAPIV1Client, auth_method, base_url, subname):
  14. domain = api_user_lps_domain.domain
  15. api_headers = api_user_lps_domain.headers.copy()
  16. def _ips_in_network(ip_set, network):
  17. return all(ipaddress.ip_address(ip) in ipaddress.ip_network(network) for ip in ip_set)
  18. def do_test(url, headers, params, expected_ipv4, expected_ipv6, subname):
  19. subname = subname or ''
  20. api_user_lps_domain.headers = headers.copy()
  21. response = api_user_lps_domain.get(url, params=params)
  22. assert response.status_code == 200
  23. assert response.text == 'good'
  24. api_user_lps_domain.headers = api_headers.copy()
  25. rrs_api = {
  26. qtype: {
  27. record
  28. for rrset in api_user_lps_domain.get(f'/domains/{domain}/rrsets/?subname={subname}&type={qtype}').json()
  29. for record in rrset['records']
  30. }
  31. for qtype in ['A', 'AAAA']
  32. }
  33. rrs_dns = {qtype: NSLordClient.query(params.get('hostname', domain), qtype)[1] for qtype in ['A', 'AAAA']}
  34. for expected_net, qtype in [(expected_ipv4, 'A'), (expected_ipv6, 'AAAA')]:
  35. assert len(rrs_api[qtype]) == (1 if expected_net else 0)
  36. assert len(rrs_dns[qtype]) == (1 if expected_net else 0)
  37. assert _ips_in_network(rrs_api[qtype], expected_net)
  38. assert _ips_in_network(rrs_dns[qtype], expected_net)
  39. assert_eventually(lambda: _ips_in_network(query_replication(domain, '', qtype), expected_net))
  40. headers = {}
  41. params = {}
  42. if auth_method == 'token':
  43. headers['Authorization'] = api_user_lps_domain.headers["Authorization"]
  44. elif auth_method == 'basic':
  45. credentials = base64.b64encode(f'{api_user_lps_domain.domain}:{api_user_lps_domain.token}'.encode()).decode()
  46. headers["Authorization"] = f'Basic {credentials}'
  47. elif auth_method == 'query':
  48. params = {'username': api_user_lps_domain.domain, 'password': api_user_lps_domain.token}
  49. else:
  50. raise ValueError
  51. if subname is not None:
  52. params['hostname'] = f'{subname}.{domain}'.lstrip('.')
  53. update6 = base_url.startswith('https://update6.')
  54. do_test(base_url, headers, params, expected_ipv4=None if update6 else ipv4_net,
  55. expected_ipv6=ipv6_net if update6 else None, subname=subname)
  56. for extra_params, expected_ipv4, expected_ipv6 in [
  57. [dict(ip='1.2.3.4'), '1.2.3.4', ipv6_net if update6 else None],
  58. [dict(ip='', ipv6='bade::affe'), None, 'bade::affe'],
  59. [dict(ipv6='dead::beef'), None if update6 else ipv4_net, 'dead::beef'],
  60. [dict(ip='1.3.3.7', ipv6=''), '1.3.3.7', None],
  61. [dict(ip='192.168.1.1', ipv6='::1'), '192.168.1.1', '::1'],
  62. ]:
  63. do_test(base_url + 'update/', headers, dict(params, **extra_params), expected_ipv4, expected_ipv6, subname)