dns.py 3.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. import struct
  2. import dns
  3. import dns.rdtypes.txtbase, dns.rdtypes.svcbbase
  4. import dns.rdtypes.ANY.CDS, dns.rdtypes.ANY.DLV, dns.rdtypes.ANY.DS
  5. def _strip_quotes_decorator(func):
  6. return lambda *args, **kwargs: func(*args, **kwargs)[1:-1]
  7. # Ensure that dnspython agrees with pdns' expectations for SVCB / HTTPS parameters.
  8. # WARNING: This is a global side-effect. It can't be done by extending a class, because dnspython hardcodes the use of
  9. # their dns.rdtypes.svcbbase.*Param classes in the global dns.rdtypes.svcbbase._class_for_key dictionary. We either have
  10. # to globally mess with that dict and insert our custom class, or we just mess with their classes directly.
  11. dns.rdtypes.svcbbase.ALPNParam.to_text = _strip_quotes_decorator(dns.rdtypes.svcbbase.ALPNParam.to_text)
  12. dns.rdtypes.svcbbase.IPv4HintParam.to_text = _strip_quotes_decorator(dns.rdtypes.svcbbase.IPv4HintParam.to_text)
  13. dns.rdtypes.svcbbase.IPv6HintParam.to_text = _strip_quotes_decorator(dns.rdtypes.svcbbase.IPv6HintParam.to_text)
  14. dns.rdtypes.svcbbase.MandatoryParam.to_text = _strip_quotes_decorator(dns.rdtypes.svcbbase.MandatoryParam.to_text)
  15. dns.rdtypes.svcbbase.PortParam.to_text = _strip_quotes_decorator(dns.rdtypes.svcbbase.PortParam.to_text)
  16. @dns.immutable.immutable
  17. class LongQuotedTXT(dns.rdtypes.txtbase.TXTBase):
  18. """
  19. A TXT record like RFC 1035, but
  20. - allows arbitrarily long tokens, and
  21. - all tokens must be quoted.
  22. """
  23. def __init__(self, rdclass, rdtype, strings):
  24. # Same as in parent class, but with max_length=None. Note that we are calling __init__ from the grandparent.
  25. super(dns.rdtypes.txtbase.TXTBase, self).__init__(rdclass, rdtype)
  26. self.strings = self._as_tuple(strings,
  27. lambda x: self._as_bytes(x, True, max_length=None))
  28. @classmethod
  29. def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
  30. strings = []
  31. for token in tok.get_remaining():
  32. token = token.unescape_to_bytes()
  33. # The 'if' below is always true in the current code, but we
  34. # are leaving this check in in case things change some day.
  35. if not token.is_quoted_string():
  36. raise dns.exception.SyntaxError("Content must be quoted.")
  37. strings.append(token.value)
  38. if len(strings) == 0:
  39. raise dns.exception.UnexpectedEnd
  40. return cls(rdclass, rdtype, strings)
  41. def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
  42. for long_s in self.strings:
  43. for s in [long_s[i:i+255] for i in range(0, max(len(long_s), 1), 255)]:
  44. l = len(s)
  45. assert l < 256
  46. file.write(struct.pack('!B', l))
  47. file.write(s)
  48. # TODO remove when https://github.com/rthalley/dnspython/pull/625 is in the main codebase
  49. class _DigestLengthMixin():
  50. _digest_length_by_type = {
  51. 1: 20, # SHA-1, RFC 3658 Sec. 2.4
  52. 2: 32, # SHA-256, RFC 4509 Sec. 2.2
  53. 3: 32, # GOST R 34.11-94, RFC 5933 Sec. 4 in conjunction with RFC 4490 Sec. 2.1
  54. 4: 48, # SHA-384, RFC 6605 Sec. 2
  55. }
  56. def __init__(self, *args, **kwargs):
  57. super().__init__(*args, **kwargs)
  58. try:
  59. if self.digest_type == 0: # reserved, RFC 3658 Sec. 2.4
  60. raise ValueError('digest type 0 is reserved')
  61. expected_length = _DigestLengthMixin._digest_length_by_type[self.digest_type]
  62. except KeyError:
  63. raise ValueError('unknown digest type')
  64. if len(self.digest) != expected_length:
  65. raise ValueError('digest length inconsistent with digest type')
  66. @dns.immutable.immutable
  67. class CDS(_DigestLengthMixin, dns.rdtypes.ANY.CDS.CDS):
  68. pass
  69. @dns.immutable.immutable
  70. class DLV(_DigestLengthMixin, dns.rdtypes.ANY.DLV.DLV):
  71. pass
  72. @dns.immutable.immutable
  73. class DS(_DigestLengthMixin, dns.rdtypes.ANY.DS.DS):
  74. pass