|
@@ -281,6 +281,25 @@ class DeSECAPIV1Client:
|
|
|
def assert_rrsets(self, rrsets: dict, init_rrsets: dict = None) -> None:
|
|
|
init_rrsets = init_rrsets or {}
|
|
|
|
|
|
+ # Build expectation from rrsets and init_rrsets
|
|
|
+ rrsets_to_check = {
|
|
|
+ (subname, qtype): rrsets.get((subname, qtype)) or init_rrsets[(subname, qtype)]
|
|
|
+ for (subname, qtype) in rrsets.keys()
|
|
|
+ }
|
|
|
+
|
|
|
+ rrsets_expected = {
|
|
|
+ (subname, qtype): (ttl, records)
|
|
|
+ for (subname, qtype), (ttl, records) in rrsets_to_check.items()
|
|
|
+ if records
|
|
|
+ }
|
|
|
+
|
|
|
+ rrsets_unexpected = {
|
|
|
+ (subname, qtype): (ttl, records)
|
|
|
+ for (subname, qtype), (ttl, records) in rrsets_to_check.items()
|
|
|
+ if not records
|
|
|
+ }
|
|
|
+
|
|
|
+ # Query DNS for RR set values
|
|
|
def normalize_rrset(rrset, qtype):
|
|
|
if not rrset:
|
|
|
return None, None
|
|
@@ -289,23 +308,26 @@ class DeSECAPIV1Client:
|
|
|
return ttl, records
|
|
|
return ttl, {' '.join(map(lambda x: x.replace(' ', ''), record.split(' ', 3))) for record in records}
|
|
|
|
|
|
+ rrsets_dns = {
|
|
|
+ (subname, qtype): normalize_rrset(NSLordClient.query(f'{subname}.{self.domain}'.lstrip('.'), qtype), qtype)
|
|
|
+ for (subname, qtype) in rrsets_to_check.keys()
|
|
|
+ }
|
|
|
+ rrsets_dns = {k: v for k, v in rrsets_dns.items() if v != (None, None)}
|
|
|
+
|
|
|
+ # Query API for RR set values
|
|
|
rrsets_api = {
|
|
|
(rrset['subname'], rrset['type']): (rrset['ttl'], set(rrset['records']))
|
|
|
for rrset in self.get(f'/domains/{self.domain}/rrsets/').json()
|
|
|
- }
|
|
|
- rrsets_dns = {
|
|
|
- (subname, qtype): normalize_rrset(NSLordClient.query(f'{subname}.{self.domain}'.lstrip('.'), qtype), qtype)
|
|
|
- for (subname, qtype) in rrsets.keys()
|
|
|
+ if (rrset['subname'], rrset['type']) in rrsets_to_check
|
|
|
}
|
|
|
|
|
|
- for (subname, qtype), v in rrsets.items():
|
|
|
- (ttl, records) = v or init_rrsets[(subname, qtype)] # if None, check init_rrsets
|
|
|
- if not records:
|
|
|
- assert (subname, qtype) not in rrsets_api
|
|
|
- assert not rrsets_dns[(subname, qtype)][1]
|
|
|
- else:
|
|
|
- assert rrsets_api[(subname, qtype)] == (ttl, records)
|
|
|
- assert rrsets_dns[(subname, qtype)] == (ttl, records)
|
|
|
+ # Assert DNS responses fulfil expectations
|
|
|
+ assert rrsets_dns.keys() & rrsets_unexpected.keys() == set()
|
|
|
+ assert rrsets_expected == rrsets_dns
|
|
|
+
|
|
|
+ # Assert API responses fulfil expectations
|
|
|
+ assert rrsets_api.keys() & rrsets_unexpected.keys() == set()
|
|
|
+ assert rrsets_expected == rrsets_api
|
|
|
|
|
|
|
|
|
class DeSECAPIV2Client(DeSECAPIV1Client):
|