|
@@ -278,6 +278,35 @@ class DeSECAPIV1Client:
|
|
|
# Join again
|
|
|
return {' '.join(param) for param in params}
|
|
|
|
|
|
+ def assert_rrsets(self, rrsets: dict, init_rrsets: dict = None) -> None:
|
|
|
+ init_rrsets = init_rrsets or {}
|
|
|
+
|
|
|
+ def normalize_rrset(rrset, qtype):
|
|
|
+ if not rrset:
|
|
|
+ return None, None
|
|
|
+ ttl, records = rrset.ttl, {rr.to_text() for rr in rrset}
|
|
|
+ if qtype not in ('CDS', 'CDNSKEY', 'DNSKEY'):
|
|
|
+ return ttl, records
|
|
|
+ return ttl, {' '.join(map(lambda x: x.replace(' ', ''), record.split(' ', 3))) for record in records}
|
|
|
+
|
|
|
+ rrsets_api = {
|
|
|
+ (rrset['subname'], rrset['type']): (rrset['ttl'], set(rrset['records']))
|
|
|
+ for rrset in self.get(f'/domains/{self.domain}/rrsets/').json()
|
|
|
+ }
|
|
|
+ rrsets_dns = {
|
|
|
+ (subname, qtype): normalize_rrset(NSLordClient.query(f'{subname}.{self.domain}'.lstrip('.'), qtype), qtype)
|
|
|
+ for (subname, qtype) in rrsets.keys()
|
|
|
+ }
|
|
|
+
|
|
|
+ for (subname, qtype), v in rrsets.items():
|
|
|
+ (ttl, records) = v or init_rrsets[(subname, qtype)] # if None, check init_rrsets
|
|
|
+ if not records:
|
|
|
+ assert (subname, qtype) not in rrsets_api
|
|
|
+ assert not rrsets_dns[(subname, qtype)][1]
|
|
|
+ else:
|
|
|
+ assert rrsets_api[(subname, qtype)] == (ttl, records)
|
|
|
+ assert rrsets_dns[(subname, qtype)] == (ttl, records)
|
|
|
+
|
|
|
|
|
|
class DeSECAPIV2Client(DeSECAPIV1Client):
|
|
|
base_url = "https://desec." + os.environ["DESECSTACK_DOMAIN"] + "/api/v2"
|
|
@@ -329,40 +358,13 @@ def api_user_domain_rrsets(api_user_domain, init_rrsets: dict) -> DeSECAPIV1Clie
|
|
|
Access to the API with a fresh user account that owns a domain with random name. The domain is
|
|
|
equipped with RRsets from init_rrsets.
|
|
|
"""
|
|
|
-
|
|
|
- def _normalize_rrset(rrset, qtype):
|
|
|
- if not rrset:
|
|
|
- return None, None
|
|
|
- ttl, records = rrset.ttl, {rr.to_text() for rr in rrset}
|
|
|
- if qtype not in ('CDS', 'CDNSKEY', 'DNSKEY'):
|
|
|
- return ttl, records
|
|
|
- return ttl, {' '.join(map(lambda x: x.replace(' ', ''), record.split(' ', 3))) for record in records}
|
|
|
-
|
|
|
- def _assert_rrsets(self, rrsets):
|
|
|
- rrsets_api = {
|
|
|
- (rrset['subname'], rrset['type']): (rrset['ttl'], set(rrset['records']))
|
|
|
- for rrset in self.get(f'/domains/{self.domain}/rrsets/').json()
|
|
|
- }
|
|
|
- rrsets_dns = {
|
|
|
- (subname, qtype): _normalize_rrset(NSLordClient.query(f'{subname}.{self.domain}'.lstrip('.'), qtype), qtype)
|
|
|
- for subname, qtype in rrsets.keys()
|
|
|
- }
|
|
|
-
|
|
|
- for k, v in rrsets.items():
|
|
|
- v = v or init_rrsets[k] # if None, check against init_rrsets
|
|
|
- if not v[1]:
|
|
|
- assert k not in rrsets_api
|
|
|
- assert not rrsets_dns[k][1]
|
|
|
- else:
|
|
|
- assert rrsets_api[k] == v
|
|
|
- assert rrsets_dns[k] == v
|
|
|
-
|
|
|
- api_user_domain.assert_rrsets = _assert_rrsets.__get__(api_user_domain) # very hacky way of adding a method
|
|
|
-
|
|
|
- api_user_domain.post(f"/domains/{api_user_domain.domain}/rrsets/", data=[
|
|
|
- {"subname": k[0], "type": k[1], "ttl": v[0], "records": list(v[1])}
|
|
|
- for k, v in init_rrsets.items()
|
|
|
- ])
|
|
|
+ api_user_domain.rr_set_create_bulk(
|
|
|
+ api_user_domain.domain,
|
|
|
+ data=[
|
|
|
+ {"subname": subname, "type": qtype, "ttl": ttl, "records": list(records)}
|
|
|
+ for (subname, qtype), (ttl, records) in init_rrsets.items()
|
|
|
+ ]
|
|
|
+ )
|
|
|
api_user_domain.assert_rrsets(init_rrsets)
|
|
|
return api_user_domain
|
|
|
|