|
@@ -278,7 +278,7 @@ class DeSECAPIV1Client:
|
|
|
# Join again
|
|
|
return {' '.join(param) for param in params}
|
|
|
|
|
|
- def assert_rrsets(self, rrsets: dict, init_rrsets: dict = None) -> None:
|
|
|
+ def assert_rrsets(self, rrsets: dict, init_rrsets: dict = None, via_dns=True, via_api=True) -> None:
|
|
|
init_rrsets = init_rrsets or {}
|
|
|
|
|
|
# Build expectation from rrsets and init_rrsets
|
|
@@ -315,20 +315,22 @@ class DeSECAPIV1Client:
|
|
|
}
|
|
|
return {k: v for k, v in rrsets_unfiltered.items() if v != (None, None)}
|
|
|
|
|
|
- # Query API for RR set values
|
|
|
- rrsets_api = {
|
|
|
- (rrset['subname'], rrset['type']): (rrset['ttl'], set(rrset['records']))
|
|
|
- for rrset in self.get(f'/domains/{self.domain}/rrsets/').json()
|
|
|
- if (rrset['subname'], rrset['type']) in rrsets_to_check
|
|
|
- }
|
|
|
+ if via_api:
|
|
|
+ # Query API for RR set values
|
|
|
+ rrsets_api = {
|
|
|
+ (rrset['subname'], rrset['type']): (rrset['ttl'], set(rrset['records']))
|
|
|
+ for rrset in self.get(f'/domains/{self.domain}/rrsets/').json()
|
|
|
+ if (rrset['subname'], rrset['type']) in rrsets_to_check
|
|
|
+ }
|
|
|
|
|
|
- # Assert API responses fulfil expectations
|
|
|
- assert rrsets_api.keys() & rrsets_unexpected.keys() == set()
|
|
|
- assert rrsets_expected == rrsets_api
|
|
|
+ # Assert API responses fulfil expectations
|
|
|
+ assert rrsets_api.keys() & rrsets_unexpected.keys() == set()
|
|
|
+ assert rrsets_expected == rrsets_api
|
|
|
|
|
|
- # Assert DNS responses fulfil expectations
|
|
|
- assert_eventually(lambda: rrsets_dns().keys() & rrsets_unexpected.keys() == set())
|
|
|
- assert_eventually(lambda: rrsets_expected == rrsets_dns())
|
|
|
+ if via_dns:
|
|
|
+ # Assert DNS responses fulfil expectations
|
|
|
+ assert_eventually(lambda: rrsets_dns().keys() & rrsets_unexpected.keys() == set())
|
|
|
+ assert_eventually(lambda: rrsets_expected == rrsets_dns())
|
|
|
|
|
|
|
|
|
|