pch.py 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354
  1. import json
  2. import requests
  3. from django.conf import settings
  4. from desecapi import metrics
  5. from desecapi.exceptions import PCHException
  6. _config = {
  7. "base_url": settings.PCH_API,
  8. "token": settings.PCH_API_TOKEN,
  9. }
  10. def _pch_request(
  11. method,
  12. *,
  13. path,
  14. expect_status,
  15. data=None,
  16. accept="application/json",
  17. ):
  18. if data is not None:
  19. data = json.dumps(data)
  20. headers = {
  21. "Accept": accept,
  22. "User-Agent": "desecapi",
  23. "Authorization": _config["token"],
  24. }
  25. r = requests.request(method, _config["base_url"] + path, data=data, headers=headers)
  26. if r.status_code not in expect_status:
  27. metrics.get("desecapi_pch_request_failure").labels(
  28. method, path, r.status_code
  29. ).inc()
  30. raise PCHException(response=r)
  31. metrics.get("desecapi_pch_request_success").labels(method, r.status_code).inc()
  32. return r
  33. def _post(path, data, **kwargs):
  34. return _pch_request("post", path=path, data=data, **kwargs)
  35. def _delete(path, data, **kwargs):
  36. return _pch_request("delete", path=path, data=data, **kwargs)
  37. def create_domains(domains):
  38. _post("/zones", {"zones": domains}, expect_status=[201])
  39. def delete_domains(domains):
  40. _delete("/zones", {"zones": domains}, expect_status=[200])