dns_utils.py 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
from typing import List, Optional, Tuple

import dns.resolver
  3. def _get_dns_resolver():
  4. my_resolver = dns.resolver.Resolver()
  5. # 1.1.1.1 is CloudFlare's public DNS server
  6. my_resolver.nameservers = ["1.1.1.1"]
  7. return my_resolver
  8. def get_cname_record(hostname) -> Optional[str]:
  9. """Return the CNAME record if exists for a domain, WITHOUT the trailing period at the end"""
  10. try:
  11. answers = _get_dns_resolver().query(hostname, "CNAME")
  12. except Exception:
  13. return None
  14. for a in answers:
  15. ret = a.to_text()
  16. return ret[:-1]
  17. return None
  18. def get_mx_domains(hostname) -> [(int, str)]:
  19. """return list of (priority, domain name).
  20. domain name ends with a "." at the end.
  21. """
  22. try:
  23. answers = _get_dns_resolver().query(hostname, "MX")
  24. except Exception:
  25. return []
  26. ret = []
  27. for a in answers:
  28. record = a.to_text() # for ex '20 alt2.aspmx.l.google.com.'
  29. parts = record.split(" ")
  30. ret.append((int(parts[0]), parts[1]))
  31. return ret
  32. _include_spf = "include:"
  33. def get_spf_domain(hostname) -> [str]:
  34. """return all domains listed in *include:*"""
  35. try:
  36. answers = _get_dns_resolver().query(hostname, "TXT")
  37. except Exception:
  38. return []
  39. ret = []
  40. for a in answers: # type: dns.rdtypes.ANY.TXT.TXT
  41. for record in a.strings:
  42. record = record.decode() # record is bytes
  43. if record.startswith("v=spf1"):
  44. parts = record.split(" ")
  45. for part in parts:
  46. if part.startswith(_include_spf):
  47. ret.append(part[part.find(_include_spf) + len(_include_spf) :])
  48. return ret
  49. def get_txt_record(hostname) -> [str]:
  50. """return all domains listed in *include:*"""
  51. try:
  52. answers = _get_dns_resolver().query(hostname, "TXT")
  53. except Exception:
  54. return []
  55. ret = []
  56. for a in answers: # type: dns.rdtypes.ANY.TXT.TXT
  57. for record in a.strings:
  58. record = record.decode() # record is bytes
  59. ret.append(record)
  60. return ret