123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354 |
- import json
- import requests
- from django.conf import settings
- from desecapi import metrics
- from desecapi.exceptions import PCHException
# Connection parameters for the PCH API, read once from Django settings
# at import time.
_config = dict(
    base_url=settings.PCH_API,
    token=settings.PCH_API_TOKEN,
)
def _pch_request(
    method,
    *,
    path,
    expect_status,
    data=None,
    accept="application/json",
    timeout=60,
):
    """Send an HTTP request to the PCH API and return the response.

    Args:
        method: HTTP method name handed to ``requests.request``.
        path: Path appended verbatim to the configured base URL.
        expect_status: Container of status codes that count as success.
        data: Optional payload; serialized with ``json.dumps`` unless None.
        accept: Value sent in the ``Accept`` request header.
        timeout: Seconds forwarded to ``requests.request`` as ``timeout``.
            Without a timeout, ``requests`` waits indefinitely on an
            unresponsive server.

    Returns:
        The ``requests`` response object when its status code is expected.

    Raises:
        PCHException: If the response status code is not in
            ``expect_status``; the response is attached to the exception.

    Exceptions raised by ``requests`` itself (connection errors, timeouts)
    are not caught here and propagate to the caller.
    """
    body = None if data is None else json.dumps(data)
    headers = {
        "Accept": accept,
        "User-Agent": "desecapi",
        "Authorization": _config["token"],
    }
    response = requests.request(
        method,
        _config["base_url"] + path,
        data=body,
        headers=headers,
        timeout=timeout,
    )

    if response.status_code not in expect_status:
        # Count the failure (labelled with the path) before raising.
        metrics.get("desecapi_pch_request_failure").labels(
            method, path, response.status_code
        ).inc()
        raise PCHException(response=response)

    metrics.get("desecapi_pch_request_success").labels(
        method, response.status_code
    ).inc()
    return response
def _post(path, data, **kwargs):
    """Issue a ``post`` request to ``path`` with ``data`` via ``_pch_request``.

    Extra keyword arguments are forwarded unchanged; the response returned
    by ``_pch_request`` is handed back to the caller.
    """
    response = _pch_request("post", data=data, path=path, **kwargs)
    return response
def _delete(path, data, **kwargs):
    """Issue a ``delete`` request to ``path`` with ``data`` via ``_pch_request``.

    Extra keyword arguments are forwarded unchanged; the response returned
    by ``_pch_request`` is handed back to the caller.
    """
    response = _pch_request("delete", data=data, path=path, **kwargs)
    return response
def create_domains(domains):
    """POST ``domains`` to ``/zones`` under the ``zones`` key.

    A 201 response is the only status treated as success; the response
    itself is discarded and nothing is returned.
    """
    payload = {"zones": domains}
    _post("/zones", payload, expect_status=[201])
def delete_domains(domains):
    """Send a DELETE to ``/zones`` with ``domains`` under the ``zones`` key.

    A 200 response is the only status treated as success; the response
    itself is discarded and nothing is returned.
    """
    payload = {"zones": domains}
    _delete("/zones", payload, expect_status=[200])
|