replication.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. import os
  2. import subprocess
  3. from datetime import datetime, timedelta
  4. from typing import List
  5. import dns.query
  6. import dns.zone
  7. from celery import shared_task
  8. from django.utils import timezone
  9. from desecapi import models
  10. class ReplicationException(Exception):
  11. def __init__(self, message, **kwargs):
  12. super().__init__(message)
  13. for k, v in kwargs.items():
  14. self.__setattr__(k, v)
  15. class GitRepositoryException(ReplicationException):
  16. pass
  17. class UnsupportedZoneNameException(ReplicationException):
  18. pass
  19. class Repository:
  20. # TODO replication performance could potentially(*) be further improved by allowing to run multiple AXFR in
  21. # parallel, and then use a file lock to synchronize git file system actions
  22. # (*) but only if the signing server can sign multiple requests in parallel
  23. _config = {
  24. 'user.email': 'api@desec.internal',
  25. 'user.name': 'deSEC API',
  26. }
  27. def __init__(self, path):
  28. self.path = path
  29. def _git(self, *args):
  30. cmd = ['/usr/bin/git'] + list(args)
  31. print('>>> ' + str(cmd))
  32. with subprocess.Popen(
  33. cmd,
  34. bufsize=0,
  35. cwd=self.path,
  36. stderr=subprocess.PIPE,
  37. stdout=subprocess.PIPE,
  38. env={'HOME': '/'}, # Celery does not adjust $HOME when dropping privleges
  39. ) as p:
  40. rcode = p.wait()
  41. stderr = p.stderr.read()
  42. stdout = p.stdout.read()
  43. try:
  44. stderr, stdout = stderr.decode(), stdout.decode()
  45. except UnicodeDecodeError:
  46. GitRepositoryException('git stdout or stderr was not valid unicode!',
  47. cmd=cmd, rcode=rcode, stderr=stderr, stdout=stdout)
  48. print('\n'.join('<<< ' + s for s in stdout.split('\n')))
  49. return cmd, rcode, stdout, stderr
  50. def _git_do(self, *args):
  51. cmd, rcode, stdout, stderr = self._git(*args)
  52. if rcode != 0:
  53. raise GitRepositoryException(f'{cmd} returned nonzero error code',
  54. cmd=cmd, rcode=rcode, stdout=stdout, stderr=stderr)
  55. if stderr.strip():
  56. raise GitRepositoryException(f'{cmd} returned non-empty error output',
  57. cmd=cmd, rcode=rcode, stdout=stdout, stderr=stderr)
  58. return stdout
  59. def _git_check(self, *args):
  60. _, rcode, _, _ = self._git(*args)
  61. return rcode
  62. def commit_all(self, msg=None):
  63. self._git_do('add', '.')
  64. if self._git_check('diff', '--exit-code', '--numstat', '--staged'):
  65. self._git_do('commit', '-m', msg or 'update')
  66. def init(self):
  67. self._git_do('init', '-b', 'main')
  68. for k, v in self._config.items():
  69. self._git_do('config', k, v)
  70. def get_head(self):
  71. return self.get_commit('HEAD')
  72. def get_commit(self, rev):
  73. try:
  74. commit_hash, commit_msg = self._git_do('show', rev, '--format=%H%n%s', '-s').split('\n', 1)
  75. return commit_hash, commit_msg[:-1]
  76. except GitRepositoryException:
  77. return None, None
  78. def remove_history(self, before: datetime):
  79. rev = self._git_do('log', f'--before={before.isoformat()}Z', '-1', '--format=%H')
  80. with open(os.path.join(self.path, '.git', 'shallow'), 'w') as f:
  81. f.writelines([rev])
  82. self._git_do('reflog', 'expire', '--expire=now', '--all')
  83. self._git_do('gc', '--prune=now') # prune only
  84. self._git_do('gc') # remaining garbage collection (e.g. compressing file revisions)
  85. class ZoneRepository(Repository):
  86. AXFR_SOURCE = '172.16.1.11'
  87. def __init__(self, path):
  88. super().__init__(path)
  89. self._config['gc.auto'] = '0'
  90. if not os.path.exists(os.path.join(self.path, '.git')):
  91. self.init()
  92. self.commit_all(msg='Inception or Recovery')
  93. update_all.delay()
  94. def refresh(self, name):
  95. if '/' in name or '\x00' in name:
  96. raise UnsupportedZoneNameException
  97. # obtain AXFR
  98. timeout = 60 # if AXFR take longer, the timeout must be increased (see also settings.py)
  99. try:
  100. xfr = list(dns.query.xfr(self.AXFR_SOURCE, name, timeout=timeout))
  101. except dns.query.TransferError as e:
  102. if e.rcode == dns.rcode.Rcode.NOTAUTH:
  103. self._delete_zone(name)
  104. else:
  105. raise
  106. else:
  107. self._update_zone(name, xfr)
  108. def _update_zone(self, name: str, xfr: List[dns.message.QueryMessage]):
  109. z = dns.zone.from_xfr(xfr, check_origin=False)
  110. try:
  111. print(f'New SOA for {name}: '
  112. f'{z.get_rrset(name="", rdtype=dns.rdatatype.SOA).to_text()}')
  113. print(f' Signature: '
  114. f'{z.get_rrset(name="", rdtype=dns.rdatatype.RRSIG, covers=dns.rdatatype.SOA).to_text()}')
  115. except AttributeError:
  116. print(f'WARNING {name} has no SOA record?!')
  117. # TODO sort AXFR? (but take care with SOA)
  118. # stable output can be achieved with
  119. # output = '\n'.join(sorted('\n'.split(z.to_text())))
  120. # but we need to see first if the frontend can handle this messed up zone file
  121. # write zone file
  122. filename = os.path.join(self.path, name + '.zone')
  123. with open(filename + '~', 'w') as f:
  124. f.write(f'; Generated by deSEC at {datetime.utcnow()}Z\n') # TODO if sorting, remove this to avoid overhead
  125. z.to_file(f)
  126. os.rename(filename + '~', filename)
  127. def _delete_zone(self, name: str):
  128. os.remove(os.path.join(self.path, name + '.zone'))
  129. ZONE_REPOSITORY_PATH = '/zones'
  130. @shared_task(queue='replication')
  131. def update(name: str):
  132. # TODO this task runs through following steps:
  133. # (1) retrieve AXFR (dedyn.io 01/2021: 8.5s)
  134. # (2) parse AXFR (dedyn.io 01/2021: 1.8s)
  135. # (3) write AXFR into zone file (dedyn.io 01/2021: 2.3s)
  136. # (4) commit into git repository (dedyn.io 01/2021: 0.5s)
  137. # To enhance performance, steps 1-3 can be executed in parallel for multiple zones with multiprocessing.
  138. # Step 4, which takes 0.5s even for very large zones, can only be executed by a single worker, as
  139. # two parallel git commits will fail
  140. print(f'updating {name}')
  141. t = timezone.now()
  142. zones = ZoneRepository(ZONE_REPOSITORY_PATH)
  143. zones.refresh(name)
  144. zones.commit_all(f'Update for {name}')
  145. models.Domain.objects.filter(name=name).update(replicated=timezone.now(), replication_duration=timezone.now() - t)
  146. @shared_task(queue='replication', priority=9)
  147. def update_all():
  148. names = models.Domain.objects.all().values_list('name', flat=True)
  149. print(f'Queuing replication for all {len(names)} zones.')
  150. for name in names:
  151. update.s(name).apply_async(priority=1)
  152. @shared_task(queue='replication')
  153. def remove_history():
  154. before = datetime.now() - timedelta(days=2)
  155. print(f'Cleaning repo data from before {before}')
  156. zones = ZoneRepository(ZONE_REPOSITORY_PATH)
  157. zones.remove_history(before=before)