dns.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. import re
  2. import struct
  3. from ipaddress import IPv6Address
  4. import dns
  5. import dns.name
  6. import dns.rdtypes.txtbase, dns.rdtypes.svcbbase
  7. import dns.rdtypes.ANY.CDS, dns.rdtypes.ANY.DLV, dns.rdtypes.ANY.DS, dns.rdtypes.ANY.MX, dns.rdtypes.ANY.NS
  8. import dns.rdtypes.IN.AAAA, dns.rdtypes.IN.SRV
  9. def _strip_quotes_decorator(func):
  10. return lambda *args, **kwargs: func(*args, **kwargs)[1:-1]
  11. # Ensure that dnspython agrees with pdns' expectations for SVCB / HTTPS parameters.
  12. # WARNING: This is a global side-effect. It can't be done by extending a class, because dnspython hardcodes the use of
  13. # their dns.rdtypes.svcbbase.*Param classes in the global dns.rdtypes.svcbbase._class_for_key dictionary. We either have
  14. # to globally mess with that dict and insert our custom class, or we just mess with their classes directly.
  15. dns.rdtypes.svcbbase.ALPNParam.to_text = _strip_quotes_decorator(dns.rdtypes.svcbbase.ALPNParam.to_text)
  16. dns.rdtypes.svcbbase.IPv4HintParam.to_text = _strip_quotes_decorator(dns.rdtypes.svcbbase.IPv4HintParam.to_text)
  17. dns.rdtypes.svcbbase.IPv6HintParam.to_text = _strip_quotes_decorator(dns.rdtypes.svcbbase.IPv6HintParam.to_text)
  18. dns.rdtypes.svcbbase.MandatoryParam.to_text = _strip_quotes_decorator(dns.rdtypes.svcbbase.MandatoryParam.to_text)
  19. dns.rdtypes.svcbbase.PortParam.to_text = _strip_quotes_decorator(dns.rdtypes.svcbbase.PortParam.to_text)
  20. @dns.immutable.immutable
  21. class AAAA(dns.rdtypes.IN.AAAA.AAAA):
  22. def to_text(self, origin=None, relativize=True, **kw):
  23. address = super().to_text(origin, relativize, **kw)
  24. return IPv6Address(address).compressed
  25. @dns.immutable.immutable
  26. class LongQuotedTXT(dns.rdtypes.txtbase.TXTBase):
  27. """
  28. A TXT record like RFC 1035, but
  29. - allows arbitrarily long tokens, and
  30. - all tokens must be quoted.
  31. """
  32. def __init__(self, rdclass, rdtype, strings):
  33. # Same as in parent class, but with max_length=None. Note that we are calling __init__ from the grandparent.
  34. super(dns.rdtypes.txtbase.TXTBase, self).__init__(rdclass, rdtype)
  35. self.strings = self._as_tuple(strings,
  36. lambda x: self._as_bytes(x, True, max_length=None))
  37. @classmethod
  38. def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
  39. strings = []
  40. for token in tok.get_remaining():
  41. token = token.unescape_to_bytes()
  42. # The 'if' below is always true in the current code, but we
  43. # are leaving this check in in case things change some day.
  44. if not token.is_quoted_string():
  45. raise dns.exception.SyntaxError("Content must be quoted.")
  46. strings.append(token.value)
  47. if len(strings) == 0:
  48. raise dns.exception.UnexpectedEnd
  49. return cls(rdclass, rdtype, strings)
  50. def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
  51. for long_s in self.strings:
  52. for s in [long_s[i:i+255] for i in range(0, max(len(long_s), 1), 255)]:
  53. l = len(s)
  54. assert l < 256
  55. file.write(struct.pack('!B', l))
  56. file.write(s)
  57. # TODO remove when https://github.com/rthalley/dnspython/pull/625 is in the main codebase
  58. class _DigestLengthMixin:
  59. _digest_length_by_type = {
  60. 1: 20, # SHA-1, RFC 3658 Sec. 2.4
  61. 2: 32, # SHA-256, RFC 4509 Sec. 2.2
  62. 3: 32, # GOST R 34.11-94, RFC 5933 Sec. 4 in conjunction with RFC 4490 Sec. 2.1
  63. 4: 48, # SHA-384, RFC 6605 Sec. 2
  64. }
  65. def __init__(self, *args, **kwargs):
  66. super().__init__(*args, **kwargs)
  67. try:
  68. if self.digest_type == 0: # reserved, RFC 3658 Sec. 2.4
  69. raise ValueError('digest type 0 is reserved')
  70. expected_length = _DigestLengthMixin._digest_length_by_type[self.digest_type]
  71. except KeyError:
  72. raise ValueError('unknown digest type')
  73. if len(self.digest) != expected_length:
  74. raise ValueError('digest length inconsistent with digest type')
  75. @dns.immutable.immutable
  76. class CDS(_DigestLengthMixin, dns.rdtypes.ANY.CDS.CDS):
  77. pass
  78. @dns.immutable.immutable
  79. class DLV(_DigestLengthMixin, dns.rdtypes.ANY.DLV.DLV):
  80. pass
  81. @dns.immutable.immutable
  82. class DS(_DigestLengthMixin, dns.rdtypes.ANY.DS.DS):
  83. pass
  84. def _HostnameMixin(name_field, *, allow_root):
  85. # Taken from https://github.com/PowerDNS/pdns/blob/4646277d05f293777a3d2423a3b188ccdf42c6bc/pdns/dnsname.cc#L419
  86. hostname_re = re.compile(r'^(([A-Za-z0-9]([A-Za-z0-9-]*[A-Za-z0-9])?)\.)+$')
  87. class Mixin:
  88. def to_text(self, origin=None, relativize=True, **kw):
  89. name = getattr(self, name_field)
  90. if not (allow_root and name == dns.name.root) and hostname_re.match(str(name)) is None:
  91. raise ValueError(f'invalid {name_field}: {name}')
  92. return super().to_text(origin, relativize, **kw)
  93. return Mixin
  94. @dns.immutable.immutable
  95. class MX(_HostnameMixin('exchange', allow_root=True), dns.rdtypes.ANY.MX.MX):
  96. pass
  97. @dns.immutable.immutable
  98. class NS(_HostnameMixin('target', allow_root=False), dns.rdtypes.ANY.NS.NS):
  99. pass
  100. @dns.immutable.immutable
  101. class SRV(_HostnameMixin('target', allow_root=True), dns.rdtypes.IN.SRV.SRV):
  102. pass