# test_replication.py (4.3 KB)

# (line-number gutter 1-110 of the original listing; not part of the program)
  1. from base64 import b64decode
  2. import os
  3. import socket
  4. import dns.query
  5. import pytest
  6. from conftest import DeSECAPIV1Client, return_eventually, query_replication, random_domainname, assert_eventually, \
  7. FaketimeShift
  8. some_ds_records = [
  9. '60604 8 1 ef66f772935b412376c8445c4442b802b0322814',
  10. '60604 8 2 c2739629145faaf464ff1bc65612fd1eb5766e80c96932d808edfb55d1e1f2ce',
  11. '60604 8 4 5943dac4fc4aad637445f483b0f43bd4152fab19250fd26df82bf12020a7f7101caa17e723cf433f43d2bbed11231e03',
  12. ]
  13. def test_signature_rotation(api_user_domain: DeSECAPIV1Client):
  14. name = random_domainname()
  15. api_user_domain.domain_create(name)
  16. rrsig = return_eventually(lambda: query_replication(name, "", 'RRSIG', covers='SOA'), timeout=20)
  17. with FaketimeShift(days=7):
  18. assert_eventually(lambda: rrsig != query_replication(name, "", 'RRSIG', covers='SOA'), timeout=60)
  19. def test_zone_deletion(api_user_domain: DeSECAPIV1Client):
  20. name = api_user_domain.domain
  21. assert_eventually(lambda: query_replication(name, "", 'SOA') is not None, timeout=20)
  22. api_user_domain.domain_destroy(name)
  23. assert_eventually(lambda: query_replication(name, "", 'SOA') is None, timeout=20)
  24. @pytest.mark.performance
  25. def test_signature_rotation_performance(api_user_domain: DeSECAPIV1Client):
  26. root_domain = api_user_domain.domain
  27. # test configuration
  28. bulk_block_size = 500
  29. domain_sizes = {
  30. # number of delegations: number of zones
  31. 2000: 1,
  32. 1000: 2,
  33. 10: 10,
  34. }
  35. # create test domains
  36. domain_names = {
  37. num_delegations: [random_domainname() + f'.num-ds-{num_delegations}.' + root_domain for _ in range(num_zones)]
  38. for num_delegations, num_zones in domain_sizes.items()
  39. }
  40. for num_delegations, names in domain_names.items():
  41. for name in names:
  42. # create a domain with name `name` and `num_delegations` delegations
  43. api_user_domain.domain_create(name)
  44. for a in range(0, num_delegations, bulk_block_size): # run block-wise to avoid exceeding max request size
  45. r = api_user_domain.rr_set_create_bulk(
  46. name,
  47. [
  48. {"subname": f'x{i}', "type": "DS", "ttl": 3600, "records": some_ds_records}
  49. for i in range(a, a + bulk_block_size)
  50. ] + [
  51. {"subname": f'x{i}', "type": "NS", "ttl": 3600, "records": ['ns1.test.', 'ns2.test.']}
  52. for i in range(a, a + bulk_block_size)
  53. ]
  54. )
  55. assert r.status_code == 200
  56. # retrieve all SOA RRSIGs
  57. soa_rrsig = {}
  58. for names in domain_names.values():
  59. for name in names:
  60. soa_rrsig[name] = return_eventually(lambda: query_replication(name, "", 'RRSIG', covers='SOA'), timeout=20)
  61. # rotate signatures
  62. with FaketimeShift(days=7):
  63. # assert SOA RRSIG has been updated
  64. for names in domain_names.values():
  65. for name in names:
  66. assert_eventually(
  67. lambda: soa_rrsig[name] != query_replication(name, "", 'RRSIG', covers='SOA'),
  68. timeout=600, # depending on number of domains in the database, this value requires increase
  69. )
  70. def test_tsig_axfr(api_user_domain: DeSECAPIV1Client):
  71. ns_ip = socket.gethostbyname('nsmaster')
  72. def count_xfr_rrsets(**kwargs):
  73. xfr = dns.query.xfr(ns_ip, api_user_domain.domain, **kwargs)
  74. zone = dns.zone.from_xfr(xfr)
  75. ## from dnspython 2.2.0 on
  76. #zone = dns.zone.Zone(api_user_domain.domain)
  77. #query, _ = dns.xfr.make_query(zone, **kwargs)
  78. #dns.query.inbound_xfr(ns_ip, zone, query)
  79. return sum(1 for _ in zone.iterate_rdatasets())
  80. with pytest.raises(dns.xfr.TransferError) as exc_info:
  81. count_xfr_rrsets()
  82. assert exc_info.value.rcode == dns.rcode.NOTAUTH
  83. keyring = {'default.': b64decode('XXXXXXXXXXXXXXXXXXXXXX==')}
  84. with pytest.raises(dns.xfr.TransferError) as exc_info:
  85. count_xfr_rrsets(keyring=keyring, keyname=None)
  86. assert exc_info.value.rcode == dns.rcode.NOTAUTH
  87. keyring = {'default.': b64decode(os.environ['DESECSTACK_NSMASTER_TSIGKEY'])}
  88. assert_eventually(lambda: count_xfr_rrsets(keyring=keyring, keyname=None) > 5, timeout=20, retry_on=(Exception,))