Browse Source

feat(tests): migrate former Chakram e2e tests to pytest e2e2, fixes #534

Peter Thomassen 4 năm trước cách đây
mục cha
commit
c2e517b010

+ 2 - 2
docker-compose.test-e2e2.yml

@@ -82,11 +82,11 @@ services:
         ipv4_address: ${DESECSTACK_IPV4_REAR_PREFIX16}.0.127
     extra_hosts:
     - "checkipv4.dedyn.${DESECSTACK_DOMAIN}:${DESECSTACK_IPV4_REAR_PREFIX16}.0.128"
-    - "checkipv6.dedyn.${DESECSTACK_DOMAIN}:${DESECSTACK_IPV4_REAR_PREFIX16}.0.128"
+    - "checkipv6.dedyn.${DESECSTACK_DOMAIN}:${DESECSTACK_IPV6_ADDRESS}"
     - "checkip.dedyn.${DESECSTACK_DOMAIN}:${DESECSTACK_IPV4_REAR_PREFIX16}.0.128"
     - "dedyn.${DESECSTACK_DOMAIN}:${DESECSTACK_IPV4_REAR_PREFIX16}.0.128"
     - "desec.${DESECSTACK_DOMAIN}:${DESECSTACK_IPV4_REAR_PREFIX16}.0.128"
-    - "update6.dedyn.${DESECSTACK_DOMAIN}:${DESECSTACK_IPV4_REAR_PREFIX16}.0.128"
+    - "update6.dedyn.${DESECSTACK_DOMAIN}:${DESECSTACK_IPV6_ADDRESS}"
     - "update.dedyn.${DESECSTACK_DOMAIN}:${DESECSTACK_IPV4_REAR_PREFIX16}.0.128"
     - "www.dedyn.${DESECSTACK_DOMAIN}:${DESECSTACK_IPV4_REAR_PREFIX16}.0.128"
     - "www.desec.${DESECSTACK_DOMAIN}:${DESECSTACK_IPV4_REAR_PREFIX16}.0.128"

+ 104 - 33
test/e2e2/conftest.py

@@ -1,3 +1,4 @@
+import glob
 import json
 import os
 import random
@@ -72,31 +73,20 @@ def random_password() -> str:
     return "".join(random.choice(string.ascii_letters) for _ in range(16))
 
 
-def random_domainname() -> str:
-    return (
-        "".join(random.choice(string.ascii_lowercase) for _ in range(16))
-        + ".test"
-    )
-
-
-def random_local_public_suffix_domainname() -> str:
-    return (
-        "".join(random.choice(string.ascii_lowercase) for _ in range(16))
-        + ".dedyn."
-        + os.environ['DESECSTACK_DOMAIN']
-    )
+def random_domainname(suffix='test') -> str:
+    return "".join(random.choice(string.ascii_lowercase) for _ in range(16)) + f'.{suffix}'
 
 
 class DeSECAPIV1Client:
     base_url = "https://desec." + os.environ["DESECSTACK_DOMAIN"] + "/api/v1"
-    headers = {
-        "Accept": "application/json",
-        "Content-Type": "application/json",
-        "User-Agent": "e2e2",
-    }
 
     def __init__(self) -> None:
         super().__init__()
+        self.headers = {  # instance-local
+            "Accept": "application/json",
+            "Content-Type": "application/json",
+            "User-Agent": "e2e2",
+        }
         self.email = None
         self.password = None
         self.domains = {}
@@ -107,10 +97,7 @@ class DeSECAPIV1Client:
         # (2) against the default certificate store, if /autocert is not available
         # (this is usually the case when run outside a docker container)
         self.verify = True
-        self.verify_alt = [
-            f'/autocert/desec.{os.environ["DESECSTACK_DOMAIN"]}.cer',
-            f'/autocert/get.desec.{os.environ["DESECSTACK_DOMAIN"]}.cer',
-        ]
+        self.verify_alt = glob.glob('/autocert/*.cer')
 
     @staticmethod
     def _filter_response_output(output: dict) -> dict:
@@ -128,16 +115,19 @@ class DeSECAPIV1Client:
             return None
 
     def _do_request(self, *args, **kwargs):
-        verify_list = [self.verify] + self.verify_alt
-        # do not verify SSL if we're in faketime (cert will be expired!?)
-        if faketime_get() != '+0d':
+        if 'verify' in kwargs:
+            verify_list = [kwargs.pop('verify')]
+        elif faketime_get() != '+0d':
+            # do not verify SSL if we're in faketime (cert will be expired!?)
             verify_list = [False]
+        else:
+            verify_list = [self.verify] + self.verify_alt
 
         exc = None
         for verify in verify_list:
             try:
                 with warnings.catch_warnings():
-                    if verify_list == [False]:
+                    if not verify:
                         # Suppress insecurity warning if we do not want to verify
                         warnings.filterwarnings('ignore', category=InsecureRequestWarning)
                     reply = requests.request(*args, **kwargs, verify=verify)
@@ -190,6 +180,9 @@ class DeSECAPIV1Client:
     def get(self, path: str, **kwargs) -> requests.Response:
         return self._request("GET", path=path, **kwargs)
 
+    def options(self, path: str, **kwargs) -> requests.Response:
+        return self._request("OPTIONS", path=path, **kwargs)
+
     def post(self, path: str, data: Optional[dict] = None, **kwargs) -> requests.Response:
         return self._request("POST", path=path, data=data, **kwargs)
 
@@ -221,9 +214,9 @@ class DeSECAPIV1Client:
         response = self.post(
             "/auth/login/", data={"email": email, "password": password}
         )
-        token = response.json().get('token')
-        if token is not None:
-            self.headers["Authorization"] = f'Token {response.json()["token"]}'
+        self.token = response.json().get('token')
+        if self.token is not None:
+            self.headers["Authorization"] = f'Token {self.token}'
             self.patch(  # make token last forever
                 f"/auth/tokens/{response.json().get('id')}/",
                 data={'max_unused_period': None, 'max_age': None}
@@ -286,6 +279,10 @@ class DeSECAPIV1Client:
         return {' '.join(param) for param in params}
 
 
+class DeSECAPIV2Client(DeSECAPIV1Client):
+    base_url = "https://desec." + os.environ["DESECSTACK_DOMAIN"] + "/api/v2"
+
+
 @pytest.fixture
 def api_anon() -> DeSECAPIV1Client:
     """
@@ -294,6 +291,14 @@ def api_anon() -> DeSECAPIV1Client:
     return DeSECAPIV1Client()
 
 
+@pytest.fixture
+def api_anon_v2() -> DeSECAPIV2Client:
+    """
+    Anonymous access to the API.
+    """
+    return DeSECAPIV2Client()
+
+
 @pytest.fixture()
 def api_user() -> DeSECAPIV1Client:
     """
@@ -318,6 +323,71 @@ def api_user_domain(api_user) -> DeSECAPIV1Client:
     return api_user
 
 
+@pytest.fixture()
+def api_user_domain_rrsets(api_user_domain, init_rrsets: dict) -> DeSECAPIV1Client:
+    """
+    Access to the API with a fresh user account that owns a domain with random name. The domain is
+    equipped with RRsets from init_rrsets.
+    """
+
+    def _normalize_rrset(rrset, qtype):
+        if qtype not in ('CDS', 'CDNSKEY', 'DNSKEY'):
+            return rrset
+        ttl, records = rrset
+        return ttl, {' '.join(map(lambda x: x.replace(' ', ''), record.split(' ', 3))) for record in records}
+
+    def _assert_rrsets(self, rrsets):
+        rrsets_api = {
+            (rrset['subname'], rrset['type']): (rrset['ttl'], set(rrset['records']))
+            for rrset in self.get(f'/domains/{self.domain}/rrsets/').json()
+        }
+        rrsets_dns = {
+            (subname, qtype): _normalize_rrset(NSLordClient.query(f'{subname}.{self.domain}'.lstrip('.'), qtype), qtype)
+            for subname, qtype in rrsets.keys()
+        }
+
+        for k, v in rrsets.items():
+            v = v or init_rrsets[k]  # if None, check against init_rrsets
+            if not v[1]:
+                assert k not in rrsets_api
+                assert not rrsets_dns[k][1]
+            else:
+                assert rrsets_api[k] == v
+                assert rrsets_dns[k] == v
+
+    api_user_domain.assert_rrsets = _assert_rrsets.__get__(api_user_domain)  # very hacky way of adding a method
+
+    api_user_domain.post(f"/domains/{api_user_domain.domain}/rrsets/", data=[
+        {"subname": k[0], "type": k[1], "ttl": v[0], "records": list(v[1])}
+        for k, v in init_rrsets.items()
+    ])
+    api_user_domain.assert_rrsets(init_rrsets)
+    return api_user_domain
+
+
+api_user_lps = api_user
+
+
+@pytest.fixture()
+def lps(api_user_lps) -> DeSECAPIV1Client:
+    """
+    Access to the API with a fresh user account that owns a local public suffix.
+    """
+    lps = "dedyn." + os.environ['DESECSTACK_DOMAIN']
+    api_user_lps.domain_create(lps)  # may return 400 if exists, but that's ok
+    return lps
+
+
+@pytest.fixture()
+def api_user_lps_domain(api_user, lps) -> DeSECAPIV1Client:
+    """
+    Access to the API with a fresh user account that owns a domain with random name under a local public suffix.
+    The domain has no records other than the default ones.
+    """
+    api_user.domain_create(random_domainname(suffix=lps))
+    return api_user
+
+
 class NSClient:
     where = None
 
@@ -335,10 +405,10 @@ class NSClient:
             section = dns.message.AUTHORITY if qtype == dns.rdatatype.from_text('NS') else dns.message.ANSWER
             response = answer.find_rrset(section, qname, dns.rdataclass.IN, qtype)
             tsprint(f'DNS <<< {response}')
-            return {i.to_text() for i in response.items}
+            return response.ttl, {i.to_text() for i in response.items}
         except KeyError:
             tsprint('DNS <<< !!! not found !!! Complete Answer below:\n' + answer.to_text())
-            return {}
+            return None, set()
 
 
 class NSLordClient(NSClient):
@@ -393,10 +463,11 @@ def return_eventually(expression: callable, min_pause: float = .1, max_pause: fl
             wait = min(2 * wait, max_pause)
 
 
-def assert_eventually(assertion: callable, min_pause: float = .1, max_pause: float = 2, timeout: float = 5) -> None:
+def assert_eventually(assertion: callable, min_pause: float = .1, max_pause: float = 2, timeout: float = 5,
+                      retry_on: Tuple[type] = (AssertionError,)) -> None:
     def _assert():
         assert assertion()
-    return_eventually(_assert, min_pause, max_pause, timeout, retry_on=(AssertionError,))
+    return_eventually(_assert, min_pause, max_pause, timeout, retry_on=retry_on)
 
 
 def faketime(t: str):

+ 1 - 0
test/e2e2/requirements.txt

@@ -1,4 +1,5 @@
 pytest
+pytest-schema
 pytest-xdist
 requests
 dnspython~=2.1.0

+ 17 - 1
test/e2e2/spec/test_api_basic.py

@@ -1,6 +1,6 @@
 import os
 
-from conftest import DeSECAPIV1Client
+from conftest import DeSECAPIV1Client, DeSECAPIV2Client
 
 
 def test_homepage(api_anon: DeSECAPIV1Client):
@@ -10,6 +10,22 @@ def test_homepage(api_anon: DeSECAPIV1Client):
         "reset-password": f"{api_anon.base_url}/auth/account/reset-password/",
     }
 
+def test_homepage_CORS(api_anon: DeSECAPIV1Client):
+    api_anon.headers['Origin'] = 'http://foo.example'
+    assert api_anon.get("/").headers['access-control-allow-origin'] == '*'
+
+    api_anon.headers['Access-Control-Request-Headers'] = 'Authorization'
+    api_anon.headers['Access-Control-Request-Method'] = 'POST'
+    assert 'authorization' in api_anon.options("/").headers['access-control-allow-headers'].split(', ')
+
+
+def test_homepage_v2(api_anon_v2: DeSECAPIV2Client):
+    assert api_anon_v2.get("/").json() == {
+        "register": f"{api_anon_v2.base_url}/auth/",
+        "login": f"{api_anon_v2.base_url}/auth/login/",
+        "reset-password": f"{api_anon_v2.base_url}/auth/account/reset-password/",
+    }
+
 
 def test_get_desec_io(api_anon: DeSECAPIV1Client):
     response = api_anon.get("https://get.desec." + os.environ['DESECSTACK_DOMAIN'], allow_redirects=False)

+ 5 - 5
test/e2e2/spec/test_api_rr.py

@@ -369,7 +369,7 @@ def test_create_valid_canonical(api_user_domain: DeSECAPIV1Client, rr_type: str,
     if value is not None:
         assert api_user_domain.rr_set_create(domain_name, rr_type, [value], subname=subname).status_code == 201
         expected.add(value)
-    rrset = NSLordClient.query(f'{subname}.{domain_name}'.strip('.'), rr_type)
+    _, rrset = NSLordClient.query(f'{subname}.{domain_name}'.strip('.'), rr_type)
     assert rrset == expected
     assert_eventually(lambda: query_replication(domain_name, subname, rr_type) == expected)
 
@@ -385,7 +385,7 @@ def test_create_valid_non_canonical(api_user_domain: DeSECAPIV1Client, rr_type:
     if value is not None:
         assert api_user_domain.rr_set_create(domain_name, rr_type, [value], subname=subname).status_code == 201
         expected.add(value)
-    rrset = NSLordClient.query(f'{subname}.{domain_name}'.strip('.'), rr_type)
+    _, rrset = NSLordClient.query(f'{subname}.{domain_name}'.strip('.'), rr_type)
     assert len(rrset) == len(expected)
     assert_eventually(lambda: len(query_replication(domain_name, subname, rr_type)) == len(expected))
 
@@ -398,7 +398,7 @@ def test_create_invalid(api_user_domain: DeSECAPIV1Client, rr_type: str, value:
 def test_create_long_subname(api_user_domain: DeSECAPIV1Client):
     subname = 'a' * 63
     assert api_user_domain.rr_set_create(api_user_domain.domain, "AAAA", ["::1"], subname=subname).status_code == 201
-    assert NSLordClient.query(f"{subname}.{api_user_domain.domain}", "AAAA") == {"::1"}
+    assert NSLordClient.query(f"{subname}.{api_user_domain.domain}", "AAAA")[1] == {"::1"}
     assert_eventually(lambda: query_replication(api_user_domain.domain, subname, "AAAA") == {"::1"})
 
 
@@ -409,10 +409,10 @@ def test_add_remove_DNSKEY(api_user_domain: DeSECAPIV1Client):
     # After adding another DNSKEY, we expect it to be part of the nameserver's response (along with the automatic ones)
     value = '257 3 13 aCoEWYBBVsP9Fek2oC8yqU8ocKmnS1iD SFZNORnQuHKtJ9Wpyz+kNryquB78Pyk/ NTEoai5bxoipVQQXzHlzyg=='
     assert api_user_domain.rr_set_create(domain_name, 'DNSKEY', [value], subname='').status_code == 201
-    assert NSLordClient.query(domain_name, 'DNSKEY') == auto_dnskeys | {value}
+    assert NSLordClient.query(domain_name, 'DNSKEY')[1] == auto_dnskeys | {value}
     assert_eventually(lambda: query_replication(domain_name, '', 'DNSKEY') == auto_dnskeys | {value})
 
     # After deleting it, we expect that the automatically managed ones are still there
     assert api_user_domain.rr_set_delete(domain_name, "DNSKEY", subname='').status_code == 204
-    assert NSLordClient.query(domain_name, 'DNSKEY') == auto_dnskeys
+    assert NSLordClient.query(domain_name, 'DNSKEY')[1] == auto_dnskeys
     assert_eventually(lambda: query_replication(domain_name, '', 'DNSKEY') == auto_dnskeys)

+ 60 - 0
test/e2e2/spec/test_api_rrset.py

@@ -0,0 +1,60 @@
+import pytest
+
+from conftest import DeSECAPIV1Client
+
+
+@pytest.mark.parametrize("init_rrsets", [
+    {
+        ('www', 'A'): (3600, {'1.2.3.4'}),
+        ('www', 'AAAA'): (3600, {'::1'}),
+        ('one', 'CNAME'): (3600, {'some.example.net.'}),
+        ('other', 'TXT'): (3600, {'"foo" "bar"', '"bar" "foo"'}),
+    }
+])
+@pytest.mark.parametrize("rrsets", [
+    {  # create three RRsets
+        ('a' * 63, 'A'): (7000, {'4.3.2.1', '7.6.5.4'}),
+        ('b', 'PTR'): (7000, {'1.foo.bar.com.', '2.bar.foo.net.'}),
+        ('c.' + 'a' * 63, 'MX'): (7000, {'10 mail.something.net.'}),
+    },
+    {  # update three RRsets
+        ('www', 'A'): None,  # ensure value from init_rrset is still there
+        ('www', 'AAAA'): (7000, {'6666::6666', '7777::7777'}),
+        ('one', 'CNAME'): (7000, {'other.example.net.'}),
+        ('other', 'TXT'): (7000, {'"foobar"'}),
+    },
+    {  # delete three RRsets
+        ('www', 'A'): (7000, {}),
+        ('www', 'AAAA'): None,  # ensure value from init_rrset is still there
+        ('one', 'CNAME'): (7000, {}),
+        ('other', 'TXT'): (7000, {}),
+    },
+    {  # create, update, delete
+        ('a' * 63, 'A'): (7000, {'4.3.2.1', '7.6.5.4'}),
+        ('www', 'A'): None,  # ensure value from init_rrset is still there
+        ('www', 'AAAA'): (7000, {'6666::6666', '7777::7777'}),
+        ('one', 'CNAME'): None,  # ensure value from init_rrset is still there
+        ('other', 'TXT'): (7000, {}),
+    },
+    {  # complex usecase
+        ('', 'A'): (3600, {'1.2.3.4', '255.254.253.252'}),  # create apex reocrd
+        ('*', 'MX'): (3601, {'0 mx.example.net.'}),  # create wildcard record
+        ('www', 'AAAA'): (3602, {}),  # remove existing record
+        ('www', 'A'): (7000, {'4.3.2.1', '7.6.5.4'}),  # update existing record
+        ('one', 'A'): (3603, {'1.1.1.1'}),  # configure A instead of ...
+        ('one', 'CNAME'): (3603, {}),  # ... CNAME
+        ('other', 'CNAME'): (3603, {'cname.example.com.'}),  # configure CNAME instead of ...
+        ('other', 'TXT'): (3600, {}),  # ... TXT
+        ('nonexistent', 'DNAME'): (3600, {}),  # delete something that doesn't exist
+        ('sub', 'DNSKEY'): (3600, {'257 3 15 l02Woi0iS8Aa25FQkUd9RMzZHJpBoRQwAQEX1SxZJA4='}),  # non-apex DNSSEC
+        ('sub', 'CDNSKEY'): (3600, {'257 3 15 l02Woi0iS8Aa25FQkUd9RMzZHJpBoRQwAQEX1SxZJA4='}),  # non-apex DNSSEC
+        ('sub', 'CDS'): (3600, {'35217 15 2 401781b934e392de492ec77ae2e15d70f6575a1c0bc59c5275c04ebe80c6614c'}),  # dto.
+    },
+])
+def test(api_user_domain_rrsets: DeSECAPIV1Client, rrsets: dict):
+    api_user_domain_rrsets.patch(f"/domains/{api_user_domain_rrsets.domain}/rrsets/", data=[
+        {"subname": k[0], "type": k[1], "ttl": v[0], "records": list(v[1])}
+        for k, v in rrsets.items()
+        if v is not None
+    ])
+    api_user_domain_rrsets.assert_rrsets(rrsets)

+ 43 - 0
test/e2e2/spec/test_donation.py

@@ -0,0 +1,43 @@
+from conftest import DeSECAPIV1Client
+
+import pytest
+from pytest_schema import schema, Optional
+
+
+donation = {
+    'name': str,
+    'iban': str,
+    'bic': str,
+    'amount': str,
+    'mref': str,
+    'interval': int,
+    'message': str,
+    'email': str,
+}
+
+
+@pytest.mark.parametrize("data", [
+    {
+        "name": "Drama Queen",
+        "iban": "DE89 3704 0044 0532 0130 00",
+        "bic": "MARKDEF1100",
+        "amount": "3.14",
+        "message": "foobar",
+        "email": "drama@queen.world",
+    },
+    {
+        "name": "Drama Queen",
+        "iban": "DE89370400440532013000",
+        "bic": "MARKDEF1100",
+        "amount": "3.14",
+    },
+])
+def test_response(api_anon: DeSECAPIV1Client, data):
+    response = api_anon.post("/donation/", data=data)
+    assert response.status_code == 201
+    assert schema(donation, ignore_extra_keys=False) == response.json()
+
+
+@pytest.mark.skip(reason="not sure how to test")
+def test_confirmation_email():
+    pass

+ 77 - 0
test/e2e2/spec/test_dyndns.py

@@ -0,0 +1,77 @@
+import ipaddress
+import os
+
+from conftest import DeSECAPIV1Client, query_replication, NSLordClient, assert_eventually
+
+import base64
+import pytest
+
+
+ipv4_net = os.environ['DESECSTACK_IPV4_REAR_PREFIX16'] + '.0.127'
+ipv6_net = os.environ['DESECSTACK_IPV6_SUBNET']
+update_url = "https://update.dedyn." + os.environ["DESECSTACK_DOMAIN"] + "/"
+update6_url = "https://update6.dedyn." + os.environ["DESECSTACK_DOMAIN"] + "/"
+
+
+@pytest.mark.parametrize("subname", [None, '', 'foo', '*.bar'])
+@pytest.mark.parametrize("base_url", [update_url, update6_url])
+@pytest.mark.parametrize("auth_method", ['basic', 'token', 'query'])
+def test(api_user_lps_domain: DeSECAPIV1Client, auth_method, base_url, subname):
+    domain = api_user_lps_domain.domain
+    api_headers = api_user_lps_domain.headers.copy()
+
+    def _ips_in_network(ip_set, network):
+        return all(ipaddress.ip_address(ip) in ipaddress.ip_network(network) for ip in ip_set)
+
+    def do_test(url, headers, params, expected_ipv4, expected_ipv6, subname):
+        subname = subname or ''
+        api_user_lps_domain.headers = headers.copy()
+        response = api_user_lps_domain.get(url, params=params)
+        assert response.status_code == 200
+        assert response.text == 'good'
+
+        api_user_lps_domain.headers = api_headers.copy()
+        rrs_api = {
+            qtype: {
+                record
+                for rrset in api_user_lps_domain.get(f'/domains/{domain}/rrsets/?subname={subname}&type={qtype}').json()
+                for record in rrset['records']
+            }
+            for qtype in ['A', 'AAAA']
+        }
+        rrs_dns = {qtype: NSLordClient.query(params.get('hostname', domain), qtype)[1] for qtype in ['A', 'AAAA']}
+
+        for expected_net, qtype in [(expected_ipv4, 'A'), (expected_ipv6, 'AAAA')]:
+            assert len(rrs_api[qtype]) == (1 if expected_net else 0)
+            assert len(rrs_dns[qtype]) == (1 if expected_net else 0)
+            assert _ips_in_network(rrs_api[qtype], expected_net)
+            assert _ips_in_network(rrs_dns[qtype], expected_net)
+            assert_eventually(lambda: _ips_in_network(query_replication(domain, '', qtype), expected_net))
+
+    headers = {}
+    params = {}
+    if auth_method == 'token':
+        headers['Authorization'] = api_user_lps_domain.headers["Authorization"]
+    elif auth_method == 'basic':
+        credentials = base64.b64encode(f'{api_user_lps_domain.domain}:{api_user_lps_domain.token}'.encode()).decode()
+        headers["Authorization"] = f'Basic {credentials}'
+    elif auth_method == 'query':
+        params = {'username': api_user_lps_domain.domain, 'password': api_user_lps_domain.token}
+    else:
+        raise ValueError
+
+    if subname is not None:
+        params['hostname'] = f'{subname}.{domain}'.lstrip('.')
+
+    update6 = base_url.startswith('https://update6.')
+    do_test(base_url, headers, params, expected_ipv4=None if update6 else ipv4_net,
+            expected_ipv6=ipv6_net if update6 else None, subname=subname)
+
+    for extra_params, expected_ipv4, expected_ipv6 in [
+        [dict(ip='1.2.3.4'), '1.2.3.4', ipv6_net if update6 else None],
+        [dict(ip='', ipv6='bade::affe'), None, 'bade::affe'],
+        [dict(ipv6='dead::beef'), None if update6 else ipv4_net, 'dead::beef'],
+        [dict(ip='1.3.3.7', ipv6=''), '1.3.3.7', None],
+        [dict(ip='192.168.1.1', ipv6='::1'), '192.168.1.1', '::1'],
+    ]:
+        do_test(base_url + 'update/', headers, dict(params, **extra_params), expected_ipv4, expected_ipv6, subname)

+ 103 - 0
test/e2e2/spec/test_www.py

@@ -0,0 +1,103 @@
+import ipaddress
+import os
+import socket
+
+import pytest
+from requests import exceptions
+
+
+https_url = "https://desec." + os.environ["DESECSTACK_DOMAIN"] + "/"
+ipv4 = os.environ['DESECSTACK_IPV4_REAR_PREFIX16'] + '.0.128'
+ipv6 = os.environ['DESECSTACK_IPV6_ADDRESS']
+
+
+class HostsOverride:
+    def __init__(self, host, ip):
+        self.cache = {host: ip}
+
+    def __enter__(self):
+        self._getaddrinfo = socket.getaddrinfo
+        socket.getaddrinfo = self.getaddrinfo
+
+    def __exit__(self, type, value, traceback):
+        socket.getaddrinfo = self._getaddrinfo
+
+    def getaddrinfo(self, host, *args, **kwargs):
+        try:
+            host = self.cache[host]
+        except KeyError:
+            pass
+        return self._getaddrinfo(host, *args, **kwargs)
+
+
+@pytest.mark.parametrize("hostname", [
+    f'{subname}.{os.environ["DESECSTACK_DOMAIN"]}' for subname in (
+        'dedyn',
+        'www.dedyn',
+        'get.desec',
+    )
+])
+@pytest.mark.parametrize("protocol", ['http', 'https'])
+def test_redirects(api_anon, protocol, hostname):
+    api_anon.headers = {}
+    expected_locations = [https_url]
+    if protocol == 'http':
+        expected_locations.append(f'https://{hostname}/')
+    if hostname.startswith('www.'):
+        expected_locations.append('{}://{}/'.format(protocol, hostname.removeprefix('www.')))
+    response = api_anon.get(f'{protocol}://{hostname}/', allow_redirects=False)
+    assert response.headers['Location'] in expected_locations
+
+
+@pytest.mark.parametrize("hostname", [
+    f'{subname}.{os.environ["DESECSTACK_DOMAIN"]}' for subname in (
+        'checkip.dedyn',
+        'checkipv4.dedyn',
+        'checkipv6.dedyn',
+    )
+])
+@pytest.mark.parametrize("protocol", ['http', 'https'])
+@pytest.mark.parametrize("server_ip", [ipv4, ipv6])
+def test_checkip(api_anon, server_ip, protocol, hostname):
+    api_anon.headers = {}
+    ip_version = 'v6' if ':' in server_ip else 'v4'
+    with HostsOverride(hostname, server_ip):
+        if not hostname.startswith('checkip.') and not hostname.startswith(f'checkip{ip_version}.'):
+            with pytest.raises(exceptions.ConnectionError) as excinfo:
+                api_anon.get(f'{protocol}://{hostname}/', allow_redirects=False, verify=False)
+            assert 'RemoteDisconnected' in str(excinfo)
+            return
+
+        response = api_anon.get(f'{protocol}://{hostname}/', allow_redirects=False)
+        if protocol == 'http':
+            assert response.headers['Location'] == f'https://{hostname}/'
+        else:
+            factories = {'v4': ipaddress.IPv4Address, 'v6': ipaddress.IPv6Address}
+            assert factories[ip_version](response.text)
+
+
+@pytest.mark.parametrize("hostname", [ipv4, f'[{ipv6}]'])
+@pytest.mark.parametrize("protocol", ['http', 'https'])
+def test_unknown_hosts(api_anon, protocol, hostname):
+    api_anon.headers = {}
+    with pytest.raises(exceptions.ConnectionError) as excinfo:
+        api_anon.get(f'{protocol}://{hostname}/', allow_redirects=False, verify=False)
+    assert 'RemoteDisconnected' in str(excinfo)
+
+
+def test_security_headers(api_anon):
+    api_anon.headers = {}
+    expected_headers = {
+        'Strict-Transport-Security': 'max-age=31536000; includeSubdomains; preload',
+        'Content-Security-Policy': "default-src 'self'; frame-src 'none'; connect-src 'self'; font-src 'self'; "
+                                   "img-src 'self' data:; media-src data:; script-src 'self' 'unsafe-eval'; "
+                                   "style-src 'self' 'unsafe-inline'; base-uri 'self'; frame-ancestors 'none'; "
+                                   "block-all-mixed-content; form-action 'none';",
+        'X-Frame-Options': 'deny',
+        'X-Content-Type-Options': 'nosniff',
+        'Referrer-Policy': 'strict-origin-when-cross-origin',
+        'X-XSS-Protection': '1; mode=block',
+    }
+    response = api_anon.get(https_url)
+    for k, v in expected_headers.items():
+        assert response.headers[k] == v