Browse Source

feat(e2e2): use dnspython RRset objects

Nils Wisiol 3 years ago
parent
commit
97342c2da0

+ 6 - 4
test/e2e2/conftest.py

@@ -331,9 +331,11 @@ def api_user_domain_rrsets(api_user_domain, init_rrsets: dict) -> DeSECAPIV1Clie
     """
 
     def _normalize_rrset(rrset, qtype):
+        if not rrset:
+            return None, None
+        ttl, records = rrset.ttl, {rr.to_text() for rr in rrset}
         if qtype not in ('CDS', 'CDNSKEY', 'DNSKEY'):
-            return rrset
-        ttl, records = rrset
+            return ttl, records
         return ttl, {' '.join(map(lambda x: x.replace(' ', ''), record.split(' ', 3))) for record in records}
 
     def _assert_rrsets(self, rrsets):
@@ -405,10 +407,10 @@ class NSClient:
             section = dns.message.AUTHORITY if qtype == dns.rdatatype.from_text('NS') else dns.message.ANSWER
             response = answer.find_rrset(section, qname, dns.rdataclass.IN, qtype)
             tsprint(f'DNS <<< {response}')
-            return response.ttl, {i.to_text() for i in response.items}
+            return response
         except KeyError:
             tsprint('DNS <<< !!! not found !!! Complete Answer below:\n' + answer.to_text())
-            return None, set()
+            return None
 
 
 class NSLordClient(NSClient):

+ 1 - 1
test/e2e2/spec/test_api_domains.py

@@ -9,7 +9,7 @@ def test_create(api_user: DeSECAPIV1Client):
 
 def test_get(api_user_domain: DeSECAPIV1Client):
     domain = api_user_domain.get(f"/domains/{api_user_domain.domain}/").json()
-    assert NSLordClient.query(api_user_domain.domain, 'CDS')[1] == set(domain['keys'][0]['ds'])
+    assert {rr.to_text() for rr in NSLordClient.query(api_user_domain.domain, 'CDS')} == set(domain['keys'][0]['ds'])
     assert domain['name'] == api_user_domain.domain
 
 

+ 14 - 7
test/e2e2/spec/test_api_rr.py

@@ -379,9 +379,12 @@ def test_create_valid_canonical(api_user_domain: DeSECAPIV1Client, rr_type: str,
     if value is not None:
         assert api_user_domain.rr_set_create(domain_name, rr_type, [value], subname=subname).status_code == 201
         expected.add(value)
-    _, rrset = NSLordClient.query(f'{subname}.{domain_name}'.strip('.'), rr_type)
+    rrset = {rr.to_text() for rr in NSLordClient.query(f'{subname}.{domain_name}'.strip('.'), rr_type)}
     assert rrset == expected
-    assert_eventually(lambda: query_replication(domain_name, subname, rr_type) == expected)
+    assert_eventually(
+        lambda: query_replication(domain_name, subname, rr_type) == expected,
+        timeout=20,
+    )
 
 
 @pytest.mark.parametrize("rr_type,value", generate_params(VALID_RECORDS_NON_CANONICAL))
@@ -395,9 +398,13 @@ def test_create_valid_non_canonical(api_user_domain: DeSECAPIV1Client, rr_type:
     if value is not None:
         assert api_user_domain.rr_set_create(domain_name, rr_type, [value], subname=subname).status_code == 201
         expected.add(value)
-    _, rrset = NSLordClient.query(f'{subname}.{domain_name}'.strip('.'), rr_type)
+    rrset = NSLordClient.query(f'{subname}.{domain_name}'.strip('.'), rr_type)
     assert len(rrset) == len(expected)
-    assert_eventually(lambda: len(query_replication(domain_name, subname, rr_type)) == len(expected))
+    assert_eventually(
+        lambda: len(query_replication(domain_name, subname, rr_type)) == len(expected),
+        retry_on=(TypeError, AssertionError),
+        timeout=20,
+    )
 
 
 @pytest.mark.parametrize("rr_type,value", INVALID_RECORDS_PARAMS)
@@ -408,7 +415,7 @@ def test_create_invalid(api_user_domain: DeSECAPIV1Client, rr_type: str, value:
 def test_create_long_subname(api_user_domain: DeSECAPIV1Client):
     subname = 'a' * 63
     assert api_user_domain.rr_set_create(api_user_domain.domain, "AAAA", ["::1"], subname=subname).status_code == 201
-    assert NSLordClient.query(f"{subname}.{api_user_domain.domain}", "AAAA")[1] == {"::1"}
+    assert NSLordClient.query(f"{subname}.{api_user_domain.domain}", "AAAA")[0].to_text() == "::1"
     assert_eventually(lambda: query_replication(api_user_domain.domain, subname, "AAAA") == {"::1"})
 
 
@@ -419,10 +426,10 @@ def test_add_remove_DNSKEY(api_user_domain: DeSECAPIV1Client):
     # After adding another DNSKEY, we expect it to be part of the nameserver's response (along with the automatic ones)
     value = '257 3 13 aCoEWYBBVsP9Fek2oC8yqU8ocKmnS1iD SFZNORnQuHKtJ9Wpyz+kNryquB78Pyk/ NTEoai5bxoipVQQXzHlzyg=='
     assert api_user_domain.rr_set_create(domain_name, 'DNSKEY', [value], subname='').status_code == 201
-    assert NSLordClient.query(domain_name, 'DNSKEY')[1] == auto_dnskeys | {value}
+    assert {rr.to_text() for rr in NSLordClient.query(domain_name, 'DNSKEY')} == auto_dnskeys | {value}
     assert_eventually(lambda: query_replication(domain_name, '', 'DNSKEY') == auto_dnskeys | {value})
 
     # After deleting it, we expect that the automatically managed ones are still there
     assert api_user_domain.rr_set_delete(domain_name, "DNSKEY", subname='').status_code == 204
-    assert NSLordClient.query(domain_name, 'DNSKEY')[1] == auto_dnskeys
+    assert {rr.to_text() for rr in NSLordClient.query(domain_name, 'DNSKEY')} == auto_dnskeys
     assert_eventually(lambda: query_replication(domain_name, '', 'DNSKEY') == auto_dnskeys)

+ 4 - 1
test/e2e2/spec/test_dyndns.py

@@ -41,7 +41,10 @@ def test(api_user_lps_domain: DeSECAPIV1Client, auth_method, base_url, subname):
             }
             for qtype in ['A', 'AAAA']
         }
-        rrs_dns = {qtype: NSLordClient.query(params.get('hostname', domain), qtype)[1] for qtype in ['A', 'AAAA']}
+        rrs_dns = {
+            qtype: {rr.to_text() for rr in NSLordClient.query(params.get('hostname', domain), qtype) or []}
+            for qtype in ['A', 'AAAA']
+        }
 
         for expected_net, qtype in [(expected_ipv4, 'A'), (expected_ipv6, 'AAAA')]:
             assert len(rrs_api[qtype]) == (1 if expected_net else 0)