
feat(api): introduce git-based zone file repository

Nils Wisiol, 4 years ago
commit e37f5ea896

+ 2 - 1
.github/workflows/main.yml

@@ -49,6 +49,7 @@ jobs:
         cd $GITHUB_WORKSPACE
         docker-compose build
         docker-compose -f docker-compose.yml -f docker-compose.test-e2e.yml build test-e2e
+        docker-compose -f docker-compose.yml -f docker-compose.test-e2e2.yml build test-e2e2
 
     - name: Check for missing migrations
       run: |
@@ -57,7 +58,7 @@ jobs:
 
     - name: Run e2e2 Tests
       run: |
-        docker-compose -f docker-compose.yml -f docker-compose.test-e2e2.yml run -T test-e2e2 bash -c "./apiwait 300 && python3 -m pytest -vv ."
+        docker-compose -f docker-compose.yml -f docker-compose.test-e2e2.yml run -T test-e2e2 sh -c "./apiwait 300 && python3 -m pytest -vv ."
 
     - name: e2e2 Tests Logs and Cleanup
       if: always()

+ 19 - 0
README.md

@@ -370,3 +370,22 @@ While there are certainly many ways to get started hacking desec-stack, here is
     From this point on, you are set up to use most of PyCharm's convenience features.
 
     1. For PyCharm's Python Console, the environment variables of your `.env` file and `DJANGO_SETTINGS_MODULE=api.settings_quick_test` need to be configured in Settings › Build, Execution, Deployment › Console › Django Console. (Note that if you need to work with the database, you need to initialize it first by running all migrations; otherwise, the model tables will be missing from the database.)
+
+
+## Debugging
+
+### RabbitMQ
+
+To inspect RabbitMQ's message queues, use the RabbitMQ management plugin. First, port 15672 of the RabbitMQ
+container needs to be exposed (the default when using `docker-compose.dev.yml`). Then, inside the container, enable
+the plugin and create a user that can access the RabbitMQ data:
+
+```
+rabbitmq-plugins enable rabbitmq_management
+rabbitmqctl add_user admin admin
+rabbitmqctl set_user_tags admin administrator
+rabbitmqctl set_permissions admin '.*' '.*' '.*'
+```
+
+Then the web-based management interface will be available at http://localhost:15672 with user `admin` and password
+`admin`.
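
For scripted checks, the same data is exposed by the management plugin's HTTP API; a minimal sketch, assuming the `admin` user created above and port 15672 mapped to localhost:

```
import requests

# List all queues with their message counts via RabbitMQ's management HTTP API.
# Assumes the admin/admin user created above and port 15672 exposed on localhost.
response = requests.get('http://localhost:15672/api/queues', auth=('admin', 'admin'))
for queue in response.json():
    print(f"{queue['name']}: {queue.get('messages', 0)} messages")
```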

+ 9 - 1
api/Dockerfile

@@ -1,5 +1,8 @@
 FROM python:3.8-alpine
 
+COPY --from=trajano/alpine-libfaketime /faketime.so /lib/libfaketime.so
+RUN mkdir -p /etc/faketime
+
 RUN apk add --no-cache bash dcron postgresql-client sqlite
 
 RUN mkdir /usr/src/app
@@ -9,7 +12,7 @@ ENV PIP_DISABLE_PIP_VERSION_CHECK=1
 ENV PIP_NO_CACHE_DIR=1
 
 COPY requirements.txt /usr/src/app/
-RUN apk add --no-cache gcc freetype-dev libffi-dev musl-dev libmemcached-dev postgresql-dev jpeg-dev zlib-dev \
+RUN apk add --no-cache gcc freetype-dev libffi-dev musl-dev libmemcached-dev postgresql-dev jpeg-dev zlib-dev git \
     && pip install -r requirements.txt \
     && apk --no-cache del gcc
 RUN pip freeze
@@ -20,6 +23,11 @@ ADD ["cronhook/crontab", "cronhook/start-cron.sh", "/root/cronhook/"]
 RUN crontab /root/cronhook/crontab
 RUN chmod +x /root/cronhook/start-cron.sh
 
+RUN mkdir /zones /var/run/celerybeat-schedule \
+    && chown nobody /zones /var/run/celerybeat-schedule \
+    && chmod 755 /zones \
+    && chmod 700 /var/run/celerybeat-schedule
+
 COPY . /usr/src/app
 
 EXPOSE 8000

+ 45 - 4
api/api/celery.py

@@ -1,14 +1,55 @@
-import os
-from celery import Celery
-
+import logging
+import pprint
 
-os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'api.settings')
+import django.utils.log
+from celery import Celery
+from celery.signals import task_failure
 
 app = Celery('api', include='desecapi.mail_backends')
 app.config_from_object('django.conf:settings', namespace='CELERY')
 app.autodiscover_tasks()
 
 
+class CeleryFormatter(logging.Formatter):
+
+    def format(self, record):
+        return (
+            f'Task: { record.sender }\n'
+            f'Task arguments: { record.task_args }\n'
+            f'Task keyword arguments: { record.task_kwargs }\n'
+            f'Task ID: { record.task_id }\n'
+            f'Exception Information:\n{pprint.pformat(record.exception.__dict__)}'
+        )
+
+
 @app.task(bind=True)
 def debug_task(self):
     print('Request: {0!r}'.format(self.request))
+
+
+@task_failure.connect()
+def task_failure(task_id, exception, args, kwargs, traceback, einfo, **other_kwargs):
+    try:
+        sender = other_kwargs.get('sender').name
+    except AttributeError:
+        sender = '<unknown sender in task_failure>'
+
+    logger.error(
+        'Celery %s in %s', type(exception).__name__, sender,
+        extra={
+            'request': None,
+            'task_id': task_id,
+            'exception': exception,
+            'task_args': args,
+            'task_kwargs': pprint.pformat(kwargs),
+            'sender': sender,
+        },
+        exc_info=einfo,
+    )
+
+
+django.setup()
+logger = logging.getLogger(__name__)
+handler = django.utils.log.AdminEmailHandler()
+handler.setFormatter(CeleryFormatter())
+logger.addHandler(handler)
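
The formatter renders the extra attributes that the `task_failure` handler attaches to the log record; a minimal sketch exercising it with a synthetic record (all values below are made up for illustration):

```
import logging

from api.celery import CeleryFormatter  # assumes DJANGO_SETTINGS_MODULE is set, as in the containers

# Build a log record carrying the same extra attributes task_failure sets,
# then render it through CeleryFormatter. All values here are hypothetical.
record = logging.LogRecord('api.celery', logging.ERROR, __file__, 0, '', (), None)
record.sender = 'desecapi.replication.update'
record.task_id = 'some-task-id'
record.task_args = ('example.net',)
record.task_kwargs = '{}'
record.exception = ValueError('sample failure')
print(CeleryFormatter().format(record))
```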

+ 27 - 1
api/api/settings.py

@@ -12,7 +12,9 @@ https://docs.djangoproject.com/en/1.7/ref/settings/
 import os
 from datetime import timedelta
 
+from celery.schedules import crontab
 from django.conf.global_settings import PASSWORD_HASHERS as DEFAULT_PASSWORD_HASHERS
+from kombu import Queue, Exchange
 
 BASE_DIR = os.path.dirname(os.path.dirname(__file__))
 
@@ -167,14 +169,38 @@ NSMASTER_PDNS_API_TOKEN = os.environ['DESECSTACK_NSMASTER_APIKEY']
 CATALOG_ZONE = 'catalog.internal'
 
 # Celery
+# see https://docs.celeryproject.org/en/stable/history/whatsnew-4.0.html#latentcall-django-admonition
 CELERY_BROKER_URL = 'amqp://rabbitmq'
 CELERY_EMAIL_MESSAGE_EXTRA_ATTRIBUTES = []  # required because djcelery_email.utils accesses it
-CELERY_TASK_TIME_LIMIT = 30
+CELERY_TASK_TIME_LIMIT = 300  # as zones become larger, AXFR takes longer, so this needs to be increased
 TASK_CONFIG = {  # The first entry is the default queue
     'email_slow_lane': {'rate_limit': '3/m'},
     'email_fast_lane': {'rate_limit': '1/s'},
     'email_immediate_lane': {'rate_limit': None},
 }
+CELERY_TIMEZONE = 'UTC'  # timezone for task schedule below
+CELERY_BEAT_SCHEDULE = {
+    'rotate_signatures': {
+        'task': 'desecapi.replication.update_all',
+        'schedule': crontab(minute=0, hour=0, day_of_week='thursday'),
+        'options': {'priority': 5},  # priority must be higher than that of per-domain rotation jobs
+    },
+    'remove_history': {
+        'task': 'desecapi.replication.remove_history',
+        'schedule': crontab(minute=42, hour='*/3'),
+        'options': {'priority': 5},
+    },
+}
+CELERY_TASK_QUEUES = [
+    Queue(
+        'replication',
+        Exchange('replication'),
+        routing_key='replication',
+        queue_arguments={'x-max-priority': 10},  # make the replication queue a priority-queue
+    ),
+    # Other queues are created automatically
+]
+CELERY_BEAT_MAX_LOOP_INTERVAL = 15  # a low value is important for running the e2e2 tests in reasonable time
 
 # pdns accepts request payloads of this size.
 # This will hopefully soon be configurable: https://github.com/PowerDNS/pdns/pull/7550
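
On the priority queue declared above, a higher number means higher priority, so the beat jobs (priority 5) are served before the per-domain updates that `update_all` enqueues at priority 1. A sketch of how these priorities are attached when enqueuing, using the tasks from `desecapi/replication.py`:

```
from desecapi.replication import update, update_all

# Routine per-domain update, queued behind everything else (priority 1).
update.s('example.net').apply_async(priority=1)

# Full run at the beat jobs' priority; served before the per-domain backlog.
update_all.apply_async(priority=5)
```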

+ 4 - 3
api/desecapi/mail_backends.py

@@ -13,15 +13,15 @@ logger = logging.getLogger(__name__)
 
 
 class MultiLaneEmailBackend(BaseEmailBackend):
-    config = {'ignore_result': True, 'queue': 'celery'}
+    config = {'ignore_result': True}
     default_backend = 'django.core.mail.backends.smtp.EmailBackend'
 
     def __init__(self, lane: str = None, fail_silently=False, **kwargs):
         lane = lane or next(iter(settings.TASK_CONFIG))
-        self.config.update(name=lane)
+        self.config.update(name=lane, queue=lane)
         self.config.update(settings.TASK_CONFIG[lane])
         self.task_kwargs = kwargs.copy()
-        # Make a copy tor ensure we don't modify input dict when we set the 'lane'
+        # Make a copy to ensure we don't modify input dict when we set the 'lane'
         self.task_kwargs['debug'] = self.task_kwargs.pop('debug', {}).copy()
         self.task_kwargs['debug']['lane'] = lane
         super().__init__(fail_silently)
@@ -47,4 +47,5 @@ class MultiLaneEmailBackend(BaseEmailBackend):
 TASKS = {
     name: MultiLaneEmailBackend(lane=name, fail_silently=True).task
     for name in settings.TASK_CONFIG
+    if name.startswith('email_')
 }

+ 5 - 1
api/desecapi/metrics.py

@@ -1,4 +1,4 @@
-from prometheus_client import Counter, Histogram
+from prometheus_client import Counter, Histogram, Summary
 
 metrics = {}
 
@@ -15,6 +15,10 @@ def set_histogram(name, *args, **kwargs):
     metrics[name] = Histogram(name, *args, **kwargs)
 
 
+def set_summary(name, *args, **kwargs):
+    metrics[name] = Summary(name, *args, **kwargs)
+
+
 # models.py metrics
 set_counter('desecapi_captcha_content_created', 'number of times captcha content created', ['kind'])
 set_counter('desecapi_autodelegation_created', 'number of autodelegations added')
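
The new `set_summary` helper registers a `prometheus_client.Summary`, which tracks the count and sum of observed values; a minimal sketch with a hypothetical metric name:

```
from desecapi.metrics import metrics, set_summary

# Register a hypothetical Summary metric and record one observation;
# Prometheus will expose its _count and _sum series.
set_summary('desecapi_example_duration_seconds', 'duration of an example operation')
metrics['desecapi_example_duration_seconds'].observe(1.23)
```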

+ 23 - 0
api/desecapi/migrations/0014_replication.py

@@ -0,0 +1,23 @@
+# Generated by Django 3.1.5 on 2021-01-21 19:29
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('desecapi', '0013_user_needs_captcha'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='domain',
+            name='replicated',
+            field=models.DateTimeField(blank=True, null=True),
+        ),
+        migrations.AddField(
+            model_name='domain',
+            name='replication_duration',
+            field=models.DurationField(blank=True, null=True),
+        ),
+    ]

+ 2 - 0
api/desecapi/models.py

@@ -221,6 +221,8 @@ class Domain(ExportModelOperationsMixin('Domain'), models.Model):
                             validators=validate_domain_name)
     owner = models.ForeignKey(User, on_delete=models.PROTECT, related_name='domains')
     published = models.DateTimeField(null=True, blank=True)
+    replicated = models.DateTimeField(null=True, blank=True)
+    replication_duration = models.DurationField(null=True, blank=True)
     minimum_ttl = models.PositiveIntegerField(default=_minimum_ttl_default.__func__)
     renewal_state = models.IntegerField(choices=RenewalState.choices, default=RenewalState.IMMORTAL)
     renewal_changed = models.DateTimeField(auto_now_add=True)

+ 5 - 1
api/desecapi/pdns_change_tracker.py

@@ -6,7 +6,7 @@ from django.db.models.signals import post_save, post_delete
 from django.db.transaction import atomic
 from django.utils import timezone
 
-from desecapi import metrics
+from desecapi import metrics, replication
 from desecapi.models import RRset, RR, Domain
 from desecapi.pdns import _pdns_post, NSLORD, NSMASTER, _pdns_delete, _pdns_patch, _pdns_put, pdns_id, \
     construct_catalog_rrset
@@ -255,10 +255,12 @@ class PDNSChangeTracker:
         # TODO introduce two phase commit protocol
         changes = self._compute_changes()
         axfr_required = set()
+        replication_required = set()
         for change in changes:
             try:
                 change.pdns_do()
                 change.api_do()
+                replication_required.add(change.domain_name)
                 if change.axfr_required:
                     axfr_required.add(change.domain_name)
             except Exception as e:
@@ -268,6 +270,8 @@ class PDNSChangeTracker:
 
         self.transaction.__exit__(None, None, None)
 
+        for name in replication_required:
+            replication.update.delay(name)
         for name in axfr_required:
             _pdns_put(NSMASTER, '/zones/%s/axfr-retrieve' % pdns_id(name))
         Domain.objects.filter(name__in=axfr_required).update(published=timezone.now())

+ 200 - 0
api/desecapi/replication.py

@@ -0,0 +1,200 @@
+import os
+import subprocess
+from datetime import datetime, timedelta
+from typing import List
+
+import dns.query
+import dns.zone
+from celery import shared_task
+from django.utils import timezone
+
+from desecapi import models
+
+
+class ReplicationException(Exception):
+
+    def __init__(self, message, **kwargs):
+        super().__init__(message)
+        for k, v in kwargs.items():
+            self.__setattr__(k, v)
+
+
+class GitRepositoryException(ReplicationException):
+    pass
+
+
+class UnsupportedZoneNameException(ReplicationException):
+    pass
+
+
+class Repository:
+    # TODO replication performance could potentially(*) be further improved by running multiple AXFRs in
+    #  parallel, and then using a file lock to synchronize git file system actions
+    #  (*) but only if the signing server can sign multiple requests in parallel
+
+    _config = {
+        'user.email': 'api@desec.internal',
+        'user.name': 'deSEC API',
+    }
+
+    def __init__(self, path):
+        self.path = path
+
+    def _git(self, *args):
+        cmd = ['/usr/bin/git'] + list(args)
+        print('>>> ' + str(cmd))
+
+        with subprocess.Popen(
+                cmd,
+                bufsize=0,
+                cwd=self.path,
+                stderr=subprocess.PIPE,
+                stdout=subprocess.PIPE,
+                env={'HOME': '/'},  # Celery does not adjust $HOME when dropping privileges
+        ) as p:
+            rcode = p.wait()
+            stderr = p.stderr.read()
+            stdout = p.stdout.read()
+            try:
+                stderr, stdout = stderr.decode(), stdout.decode()
+            except UnicodeDecodeError:
+                raise GitRepositoryException('git stdout or stderr was not valid unicode!',
+                                             cmd=cmd, rcode=rcode, stderr=stderr, stdout=stdout)
+
+        print('\n'.join('<<< ' + s for s in stdout.split('\n')))
+        return cmd, rcode, stdout, stderr
+
+    def _git_do(self, *args):
+        cmd, rcode, stdout, stderr = self._git(*args)
+
+        if rcode != 0:
+            raise GitRepositoryException(f'{cmd} returned nonzero error code',
+                                         cmd=cmd, rcode=rcode, stdout=stdout, stderr=stderr)
+
+        if stderr.strip():
+            raise GitRepositoryException(f'{cmd} returned non-empty error output',
+                                         cmd=cmd, rcode=rcode, stdout=stdout, stderr=stderr)
+
+        return stdout
+
+    def _git_check(self, *args):
+        _, rcode, _, _ = self._git(*args)
+        return rcode
+
+    def commit_all(self, msg=None):
+        self._git_do('add', '.')
+        if self._git_check('diff', '--exit-code', '--numstat', '--staged'):
+            self._git_do('commit', '-m', msg or 'update')
+
+    def init(self):
+        self._git_do('init', '-b', 'main')
+        for k, v in self._config.items():
+            self._git_do('config', k, v)
+
+    def get_head(self):
+        return self.get_commit('HEAD')
+
+    def get_commit(self, rev):
+        try:
+            commit_hash, commit_msg = self._git_do('show', rev, '--format=%H%n%s', '-s').split('\n', 1)
+            return commit_hash, commit_msg[:-1]
+        except GitRepositoryException:
+            return None, None
+
+    def remove_history(self, before: datetime):
+        rev = self._git_do('log', f'--before={before.isoformat()}Z', '-1', '--format=%H')
+        with open(os.path.join(self.path, '.git', 'shallow'), 'w') as f:
+            f.writelines([rev])
+        self._git_do('reflog', 'expire', '--expire=now', '--all')
+        self._git_do('gc', '--prune=now')  # prune only
+        self._git_do('gc')  # remaining garbage collection (e.g. compressing file revisions)
+
+
+class ZoneRepository(Repository):
+    AXFR_SOURCE = '172.16.1.11'
+
+    def __init__(self, path):
+        super().__init__(path)
+        self._config['gc.auto'] = '0'
+        if not os.path.exists(os.path.join(self.path, '.git')):
+            self.init()
+            self.commit_all(msg='Inception or Recovery')
+            update_all.delay()
+
+    def refresh(self, name):
+        if '/' in name or '\x00' in name:
+            raise UnsupportedZoneNameException
+
+        # obtain AXFR
+        timeout = 60  # if the AXFR takes longer, the timeout must be increased (see also settings.py)
+        try:
+            xfr = list(dns.query.xfr(self.AXFR_SOURCE, name, timeout=timeout))
+        except dns.query.TransferError as e:
+            if e.rcode == dns.rcode.Rcode.NOTAUTH:
+                self._delete_zone(name)
+            else:
+                raise
+        else:
+            self._update_zone(name, xfr)
+
+    def _update_zone(self, name: str, xfr: List[dns.message.QueryMessage]):
+        z = dns.zone.from_xfr(xfr, check_origin=False)
+        try:
+            print(f'New SOA for {name}: '
+                  f'{z.get_rrset(name="", rdtype=dns.rdatatype.SOA).to_text()}')
+            print(f'         Signature: '
+                  f'{z.get_rrset(name="", rdtype=dns.rdatatype.RRSIG, covers=dns.rdatatype.SOA).to_text()}')
+        except AttributeError:
+            print(f'WARNING {name} has no SOA record?!')
+
+        # TODO sort AXFR? (but take care with SOA)
+        #  stable output can be achieved with
+        #  output = '\n'.join(sorted('\n'.split(z.to_text())))
+        #  but we need to see first if the frontend can handle this messed up zone file
+
+        # write zone file
+        filename = os.path.join(self.path, name + '.zone')
+        with open(filename + '~', 'w') as f:
+            f.write(f'; Generated by deSEC at {datetime.utcnow()}Z\n')  # TODO if sorting, remove this to avoid overhead
+            z.to_file(f)
+        os.rename(filename + '~', filename)
+
+    def _delete_zone(self, name: str):
+        os.remove(os.path.join(self.path, name + '.zone'))
+
+
+ZONE_REPOSITORY_PATH = '/zones'
+
+
+@shared_task(queue='replication')
+def update(name: str):
+    # TODO this task runs through following steps:
+    #  (1) retrieve AXFR  (dedyn.io 01/2021: 8.5s)
+    #  (2) parse AXFR     (dedyn.io 01/2021: 1.8s)
+    #  (3) write AXFR into zone file (dedyn.io 01/2021: 2.3s)
+    #  (4) commit into git repository  (dedyn.io 01/2021: 0.5s)
+    #  To enhance performance, steps 1-3 can be executed in parallel for multiple zones with multiprocessing.
+    #  Step 4, which takes 0.5s even for very large zones, can only be executed by a single worker, as
+    #  two parallel git commits will fail
+    print(f'updating {name}')
+    t = timezone.now()
+    zones = ZoneRepository(ZONE_REPOSITORY_PATH)
+    zones.refresh(name)
+    zones.commit_all(f'Update for {name}')
+    models.Domain.objects.filter(name=name).update(replicated=timezone.now(), replication_duration=timezone.now() - t)
+
+
+@shared_task(queue='replication', priority=9)
+def update_all():
+    names = models.Domain.objects.all().values_list('name', flat=True)
+    print(f'Queuing replication for all {len(names)} zones.')
+    for name in names:
+        update.s(name).apply_async(priority=1)
+
+
+@shared_task(queue='replication')
+def remove_history():
+    before = datetime.now() - timedelta(days=2)
+    print(f'Cleaning repo data from before {before}')
+    zones = ZoneRepository(ZONE_REPOSITORY_PATH)
+    zones.remove_history(before=before)
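
In production these tasks run on the `celery-replication` worker, but the repository wrapper can also be driven by hand; a minimal sketch, assuming a Django shell in a container with AXFR access to nslord and the `/zones` volume mounted:

```
from desecapi.replication import ZONE_REPOSITORY_PATH, ZoneRepository

# Refresh a single zone and commit the result, mirroring what update() does.
zones = ZoneRepository(ZONE_REPOSITORY_PATH)   # initializes the git repo on first use
zones.refresh('example.net')                   # AXFR the zone, rewrite example.net.zone
zones.commit_all(msg='Manual update for example.net')
print(zones.get_head())                        # -> (commit hash, commit message)
```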

+ 28 - 2
api/desecapi/tests/base.py

@@ -17,6 +17,7 @@ from rest_framework.reverse import reverse
 from rest_framework.test import APITestCase, APIClient
 from rest_framework.utils import json
 
+from desecapi import replication
 from desecapi.models import User, Domain, Token, RRset, RR, psl, RR_SET_TYPES_AUTOMATIC, RR_SET_TYPES_UNSUPPORTED, \
     RR_SET_TYPES_MANAGEABLE
 
@@ -140,7 +141,8 @@ class AssertRequestsContextManager:
             else:
                 yield i
 
-    def __init__(self, test_case, expected_requests, single_expectation_single_request=True, expect_order=True):
+    def __init__(self, test_case, expected_requests, single_expectation_single_request=True, expect_order=True,
+                 exit_hook=None):
         """
         Initialize a context that checks for made HTTP requests.
 
@@ -158,6 +160,7 @@ class AssertRequestsContextManager:
         self.single_expectation_single_request = single_expectation_single_request
         self.expect_order = expect_order
         self.old_httpretty_entries = None
+        self.exit_hook = exit_hook
 
     def __enter__(self):
         hr_core.POTENTIAL_HTTP_PORTS.add(8081)  # FIXME should depend on self.expected_requests
@@ -176,6 +179,10 @@ class AssertRequestsContextManager:
         return None
 
     def __exit__(self, exc_type, exc_val, exc_tb):
+        # call exit hook
+        if callable(self.exit_hook):
+            self.exit_hook()
+
         # organize seen requests in a primitive data structure
         seen_requests = [
             (r.command, 'http://%s%s' % (r.headers['Host'], r.path), r.parsed_body) for r in httpretty.latest_requests
@@ -467,18 +474,24 @@ class MockPDNSTestCase(APITestCase):
             'priority': 1,  # avoid collision with DELETE zones/(?P<id>[^/]+)$ (httpretty does not match the method)
         }
 
-    def assertPdnsRequests(self, *expected_requests, expect_order=True):
+    def __init__(self, methodName: str = ...) -> None:
+        super().__init__(methodName)
+        self._mock_replication = None
+
+    def assertPdnsRequests(self, *expected_requests, expect_order=True, exit_hook=None):
         """
         Assert the given requests are made. To build requests, use the `MockPDNSTestCase.request_*` functions.
         Unmet expectations will fail the test.
         Args:
             *expected_requests: List of expected requests.
             expect_order: If True (default), the order of observed requests is checked.
+            exit_hook: If given a callable, it is called when the context manager exits.
         """
         return AssertRequestsContextManager(
             test_case=self,
             expected_requests=expected_requests,
             expect_order=expect_order,
+            exit_hook=exit_hook,
         )
 
     def assertPdnsNoRequestsBut(self, *expected_requests):
@@ -549,6 +562,9 @@ class MockPDNSTestCase(APITestCase):
                             for hashed in Token.objects.filter(user=user).values_list('key', flat=True)))
         self.assertEqual(len(Token.make_hash(plain).split('$')), 4)
 
+    def assertReplication(self, name):
+        replication.update.delay.assert_called_with(name)
+
     @classmethod
     def setUpTestData(cls):
         httpretty.enable(allow_net_connect=False)
@@ -580,6 +596,7 @@ class MockPDNSTestCase(APITestCase):
         httpretty.disable()
 
     def setUp(self):
+        # configure mocks for nslord
         def request_callback(r, _, response_headers):
             try:
                 request = json.loads(r.parsed_body)
@@ -618,6 +635,15 @@ class MockPDNSTestCase(APITestCase):
                     priority=-100,
                 )
 
+        # configure mocks for replication
+        self._mock_replication = mock.patch('desecapi.replication.update.delay', return_value=None, wraps=None)
+        self._mock_replication.start()
+
+    def tearDown(self) -> None:
+        if self._mock_replication:
+            self._mock_replication.stop()
+        super().tearDown()
+
 
 class DesecTestCase(MockPDNSTestCase):
     """

+ 10 - 4
api/desecapi/tests/test_pdns_change_tracker.py

@@ -19,10 +19,16 @@ class PdnsChangeTrackerTestCase(DesecTestCase):
         cls.full_domain = Domain.objects.create(owner=cls.user, name=cls.random_domain_name())
 
     def assertPdnsZoneUpdate(self, name, rr_sets):
-        return self.assertPdnsRequests([
-            self.request_pdns_zone_update_assert_body(name, rr_sets),
-            self.request_pdns_zone_axfr(name),
-        ])
+        def _assert_replication():
+            self.assertReplication(name)
+
+        return self.assertPdnsRequests(
+            [
+                self.request_pdns_zone_update_assert_body(name, rr_sets),
+                self.request_pdns_zone_axfr(name),
+            ],
+            exit_hook=_assert_replication,
+        )
 
     def test_rrset_does_not_exist_exception(self):
         tracker = PDNSChangeTracker()

+ 103 - 1
api/desecapi/tests/test_replication.py

@@ -1,13 +1,21 @@
 import json
+import os
+import random
+import string
+import time
+from datetime import datetime
+from tempfile import TemporaryDirectory
 
+from django.test import testcases
 from rest_framework import status
 
+from desecapi.replication import Repository
 from desecapi.tests.base import DesecTestCase
 
 
 class ReplicationTest(DesecTestCase):
     def test_serials(self):
-        url=self.reverse('v1:serial')
+        url = self.reverse('v1:serial')
         zones = [
             {'name': 'test.example.', 'edited_serial': 12345},
             {'name': 'example.org.', 'edited_serial': 54321},
@@ -32,3 +40,97 @@ class ReplicationTest(DesecTestCase):
 
             # Do not expect pdns request in next iteration (result will be cached)
             pdns_requests = []
+
+
+class RepositoryTest(testcases.TestCase):
+
+    def assertGit(self, path):
+        self.assertTrue(
+            os.path.exists(os.path.join(path, '.git')),
+            f'Expected a git repository at {path} but did not find .git subdirectory.'
+        )
+
+    def assertHead(self, repo, message=None, sha=None):
+        actual_sha, actual_message = repo.get_head()
+        if actual_sha is None:
+            self.fail(f'Expected HEAD to have commit message "{message}" and hash "{sha}", but the repository has '
+                      f'no commits.')
+        if sha:
+            self.assertEqual(actual_sha, sha, f'Expected HEAD to have hash "{sha}" but had "{actual_sha}".')
+        if message:
+            self.assertIn(
+                message, actual_message,
+                f'Expected "{message}" to appear in the last commit message, but only found "{actual_message}".',
+            )
+
+    def assertHasCommit(self, repo: Repository, commit_id):
+        self.assertIsNotNone(
+            repo.get_commit(commit_id)[0], f'Expected repository to have commit {commit_id}, but it did not.'
+        )
+
+    def assertHasCommits(self, repo: Repository, commit_id_list):
+        for commit in commit_id_list:
+            self.assertHasCommit(repo, commit)
+
+    def assertHasNotCommit(self, repo: Repository, commit_id):
+        self.assertIsNone(
+            repo.get_commit(commit_id)[0], f'Expected repository to not have commit {commit_id}, but it did.'
+        )
+
+    def assertHasNotCommits(self, repo: Repository, commit_id_list):
+        for commit in commit_id_list:
+            self.assertHasNotCommit(repo, commit)
+
+    def assertNoCommits(self, repo: Repository):
+        head = repo.get_head()
+        self.assertEqual(head, (None, None), f'Expected that repository has no commits, but HEAD was {head}.')
+
+    @staticmethod
+    def _random_string(length):
+        return ''.join(random.choices(string.ascii_lowercase, k=length))
+
+    def _random_commit(self, repo: Repository, message=''):
+        with open(os.path.join(repo.path, self._random_string(16)), 'w') as f:
+            f.write(self._random_string(500))
+        repo.commit_all(message)
+        return repo.get_head()[0]
+
+    def _random_commits(self, num, repo: Repository, message=''):
+        return [self._random_commit(repo, message) for _ in range(num)]
+
+    def test_init(self):
+        with TemporaryDirectory() as path:
+            repo = Repository(path)
+            repo.init()
+            self.assertGit(path)
+
+    def test_commit(self):
+        with TemporaryDirectory() as path:
+            repo = Repository(path)
+            repo.init()
+            repo.commit_all('commit1')
+            self.assertNoCommits(repo)
+
+            with open(os.path.join(path, 'test_commit'), 'w') as f:
+                f.write('foo')
+
+            repo.commit_all('commit2')
+            self.assertHead(repo, message='commit2')
+
+    def test_remove_history(self):
+        with TemporaryDirectory() as path:
+            repo = Repository(path)
+            repo.init()
+
+            remove = self._random_commits(5, repo, 'to be removed')  # we're going to remove these 'old' commits
+            keep = self._random_commits(1, repo, 'anchor to be kept')  # as sync anchor, the last 'old' commit is kept
+            cutoff = datetime.now()
+            time.sleep(1)
+            keep += self._random_commits(5, repo, 'to be kept')  # we're going to keep these 'new' commits
+
+            self.assertHasCommits(repo, remove + keep)
+
+            repo.remove_history(before=cutoff)
+
+            self.assertHasCommits(repo, keep)
+            self.assertHasNotCommits(repo, remove)

+ 7 - 1
docker-compose.dev.yml

@@ -50,10 +50,16 @@ services:
       driver: "json-file"
 
   rabbitmq:
+    ports:
+    - "15672:15672"
+    logging:
+      driver: "json-file"
+
+  celery-email:
     logging:
       driver: "json-file"
 
-  celery:
+  celery-replication:
     logging:
       driver: "json-file"
 

+ 0 - 16
docker-compose.test-e2e.yml

@@ -5,14 +5,10 @@ services:
   www:
     environment:
     - DESECSTACK_E2E_TEST=TRUE # increase abuse limits and such
-    logging:
-      driver: "json-file"
 
   api:
     environment:
     - DESECSTACK_E2E_TEST=TRUE # increase abuse limits and such
-    logging:
-      driver: "json-file"
 
   nslord:
     networks:
@@ -20,16 +16,6 @@ services:
         ipv4_address: ${DESECSTACK_IPV4_REAR_PREFIX16}.0.129 # make nslord available for test-e2e
     environment:
     - DESECSTACK_NSLORD_CACHE_TTL=0
-    logging:
-      driver: "json-file"
-
-  nsmaster:
-    logging:
-      driver: "json-file"
-
-  memcached:
-    logging:
-      driver: "json-file"
 
   test-e2e:
     build: test/e2e
@@ -59,5 +45,3 @@ services:
     - "update.dedyn.${DESECSTACK_DOMAIN}:${DESECSTACK_IPV4_REAR_PREFIX16}.0.128"
     - "www.dedyn.${DESECSTACK_DOMAIN}:${DESECSTACK_IPV4_REAR_PREFIX16}.0.128"
     - "www.desec.${DESECSTACK_DOMAIN}:${DESECSTACK_IPV4_REAR_PREFIX16}.0.128"
-    logging:
-      driver: "json-file"

+ 42 - 0
docker-compose.test-e2e2.yml

@@ -11,6 +11,32 @@ services:
   api:
     environment:
     - DESECSTACK_E2E_TEST=TRUE # increase abuse limits and such
+    # faketime setup
+    - LD_PRELOAD=/lib/libfaketime.so
+    - FAKETIME_TIMESTAMP_FILE=/etc/faketime/faketime.rc
+    - FAKETIME_NO_CACHE=1
+    volumes:
+    - faketime:/etc/faketime/:ro
+
+  celery-email:
+    environment:
+    - DESECSTACK_E2E_TEST=TRUE # increase abuse limits and such
+    # faketime setup
+    - LD_PRELOAD=/lib/libfaketime.so
+    - FAKETIME_TIMESTAMP_FILE=/etc/faketime/faketime.rc
+    - FAKETIME_NO_CACHE=1
+    volumes:
+    - faketime:/etc/faketime/:ro
+
+  celery-replication:
+    environment:
+    - DESECSTACK_E2E_TEST=TRUE # increase abuse limits and such
+    # faketime setup
+    - LD_PRELOAD=/lib/libfaketime.so
+    - FAKETIME_TIMESTAMP_FILE=/etc/faketime/faketime.rc
+    - FAKETIME_NO_CACHE=1
+    volumes:
+    - faketime:/etc/faketime/:ro
 
   nslord:
     networks:
@@ -18,6 +44,15 @@ services:
         ipv4_address: ${DESECSTACK_IPV4_REAR_PREFIX16}.0.129 # make nslord available for test-e2e
     environment:
     - DESECSTACK_NSLORD_CACHE_TTL=0
+    # faketime setup
+    - LD_PRELOAD=/usr/lib/x86_64-linux-gnu/faketime/libfaketime.so.1
+    - FAKETIME_TIMESTAMP_FILE=/etc/faketime/faketime.rc
+    - FAKETIME_NO_CACHE=1
+    ports:
+    - "5311:53"
+    - "5311:53/udp"
+    volumes:
+    - faketime:/etc/faketime/:ro
 
   test-e2e2:
     build: test/e2e2
@@ -29,8 +64,14 @@ services:
     - DESECSTACK_IPV6_SUBNET
     - DESECSTACK_IPV6_ADDRESS
     - DESECSTACK_NSLORD_DEFAULT_TTL
+    # faketime setup
+    - LD_PRELOAD=/lib/libfaketime.so
+    - FAKETIME_TIMESTAMP_FILE=/etc/faketime/faketime.rc
+    - FAKETIME_NO_CACHE=1
     volumes:
     - autocert:/autocert/:ro
+    - zones:/zones:ro
+    - faketime:/etc/faketime/:rw
     mac_address: 06:42:ac:10:00:7f
     depends_on:
     - www
@@ -53,3 +94,4 @@ services:
 
 volumes:
   autocert:
+  faketime:

+ 56 - 4
docker-compose.yml

@@ -120,10 +120,13 @@ services:
     - dbapi
     - nslord
     - nsmaster
-    - celery
+    - celery-email
+    - celery-replication
     - memcached
     tmpfs:
     - /var/local/django_metrics:size=500m
+    volumes:
+    - zones:/zones:rw
     environment:
     - DESECSTACK_DOMAIN
     - DESECSTACK_NS
@@ -242,14 +245,17 @@ services:
         tag: "desec/rabbitmq"
     restart: unless-stopped
 
-  celery:
+  celery-email:
     build: api
     image: desec/dedyn-api:latest
     init: true
-    command: celery -A api worker -l info --uid nobody --gid nogroup
+    command: celery -A api worker -Q email_slow_lane,email_fast_lane,email_immediate_lane -c 8 -n email -l info --uid nobody --gid nogroup
     depends_on:
     - dbapi
     - nslord
+    - rabbitmq
+    volumes:
+    - zones:/zones:rw
     environment:
     - DESECSTACK_DOMAIN
     - DESECSTACK_NS
@@ -269,13 +275,57 @@ services:
     - DESECSTACK_NSLORD_DEFAULT_TTL
     - DESECSTACK_NSMASTER_APIKEY
     - DESECSTACK_MINIMUM_TTL_DEFAULT
+    - DJANGO_SETTINGS_MODULE=api.settings
     networks:
     - rearapi_celery
     - rearapi_dbapi
     logging:
       driver: "syslog"
       options:
-        tag: "desec/celery"
+        tag: "desec/celery-email"
+    restart: unless-stopped
+
+  celery-replication:
+    build: api
+    image: desec/dedyn-api:latest
+    init: true
+    command: celery -A api worker -E -B -s /var/run/celerybeat-schedule/db -Q replication -n replication -c 1 -l info --uid nobody --gid nogroup
+    depends_on:
+    - dbapi
+    - nslord
+    - rabbitmq
+    volumes:
+    - zones:/zones:rw
+    - celerybeat:/var/run/celerybeat-schedule
+    environment:
+    - DESECSTACK_DOMAIN
+    - DESECSTACK_NS
+    - DESECSTACK_API_ADMIN
+    - DESECSTACK_API_SEPA_CREDITOR_ID
+    - DESECSTACK_API_SEPA_CREDITOR_NAME
+    - DESECSTACK_API_EMAIL_HOST
+    - DESECSTACK_API_EMAIL_HOST_USER
+    - DESECSTACK_API_EMAIL_HOST_PASSWORD
+    - DESECSTACK_API_EMAIL_PORT
+    - DESECSTACK_API_SECRETKEY
+    - DESECSTACK_API_PSL_RESOLVER
+    - DESECSTACK_DBAPI_PASSWORD_desec
+    - DESECSTACK_IPV4_REAR_PREFIX16
+    - DESECSTACK_IPV6_SUBNET
+    - DESECSTACK_NSLORD_APIKEY
+    - DESECSTACK_NSLORD_DEFAULT_TTL
+    - DESECSTACK_NSMASTER_APIKEY
+    - DESECSTACK_MINIMUM_TTL_DEFAULT
+    - DJANGO_SETTINGS_MODULE=api.settings
+    networks:
+      rearapi_celery:
+      rearapi_dbapi:
+      rearapi_ns:
+        ipv4_address: ${DESECSTACK_IPV4_REAR_PREFIX16}.1.13
+    logging:
+      driver: "syslog"
+      options:
+        tag: "desec/celery-replication"
     restart: unless-stopped
 
   memcached:
@@ -365,6 +415,8 @@ volumes:
   prometheus:
   rabbitmq_data:
   webapp_dist:
+  zones:
+  celerybeat:
 
 networks:
   # Note that it is required that the front network ranks lower (in lexical order)

+ 1 - 0
nslord/Dockerfile

@@ -4,6 +4,7 @@ RUN apt-get update && apt-get install -y \
 		dnsutils \
 		net-tools \
 		dirmngr gnupg \
+		faketime \
 	--no-install-recommends && apt-get clean && rm -rf /var/lib/apt/lists/*
 
 RUN echo 'deb [arch=amd64] http://repo.powerdns.com/ubuntu bionic-auth-44 main' \

+ 1 - 1
nslord/conf/pdns.conf.var

@@ -1,4 +1,4 @@
-allow-axfr-ips=${DESECSTACK_IPV4_REAR_PREFIX16}.1.12
+allow-axfr-ips=${DESECSTACK_IPV4_REAR_PREFIX16}.1.12,${DESECSTACK_IPV4_REAR_PREFIX16}.1.13
 api=yes
 api-key=${DESECSTACK_NSLORD_APIKEY}
 default-api-rectify=no

+ 7 - 2
test/e2e2/Dockerfile

@@ -1,4 +1,9 @@
-FROM python:3.8
+FROM python:3.8-alpine
+
+RUN apk add --no-cache bash curl
+
+COPY --from=trajano/alpine-libfaketime /faketime.so /lib/libfaketime.so
+RUN mkdir -p /etc/faketime
 
 RUN mkdir /e2e
 WORKDIR /e2e
@@ -9,4 +14,4 @@ COPY apiwait .
 COPY *.py .
 COPY ./spec .
 
-CMD ./apiwait 45 && python3 -m pytest -vv .
+CMD ./apiwait 300 && python3 -m pytest -vv .

+ 137 - 31
test/e2e2/conftest.py

@@ -3,16 +3,25 @@ import os
 import random
 import re
 import string
+import time
+import warnings
+from datetime import datetime
 from json import JSONDecodeError
-from typing import Optional, Tuple, Iterable, Callable
+from typing import Optional, Tuple, Iterable
 
 import dns
 import dns.name
 import dns.query
 import dns.rdtypes.svcbbase
+import dns.zone
 import pytest
 import requests
 from requests.exceptions import SSLError
+from urllib3.exceptions import InsecureRequestWarning
+
+
+def tsprint(s, *args, **kwargs):
+    print(f"{datetime.now().strftime('%d-%b (%H:%M:%S)')} {s}", *args, **kwargs)
 
 
 def _strip_quotes_decorator(func):
@@ -37,27 +46,23 @@ def random_mixed_case_string(n):
     return ''.join(s)
 
 
-@pytest.fixture()
-def random_email() -> Callable[[], str]:
-    return lambda: f'{random_mixed_case_string(10)}@{random_mixed_case_string(10)}.desec.test'
+def random_email() -> str:
+    return f'{random_mixed_case_string(10)}@{random_mixed_case_string(10)}.desec.test'
 
 
-@pytest.fixture()
-def random_password() -> Callable[[], str]:
-    return lambda: "".join(random.choice(string.ascii_letters) for _ in range(16))
+def random_password() -> str:
+    return "".join(random.choice(string.ascii_letters) for _ in range(16))
 
 
-@pytest.fixture()
-def random_domainname() -> Callable[[], str]:
-    return lambda: (
+def random_domainname() -> str:
+    return (
         "".join(random.choice(string.ascii_lowercase) for _ in range(16))
         + ".test"
     )
 
 
-@pytest.fixture()
-def random_local_public_suffix_domainname() -> Callable[[], str]:
-    return lambda: (
+def random_local_public_suffix_domainname() -> str:
+    return (
         "".join(random.choice(string.ascii_lowercase) for _ in range(16))
         + ".dedyn."
         + os.environ['DESECSTACK_DOMAIN']
@@ -106,12 +111,20 @@ class DeSECAPIV1Client:
 
     def _do_request(self, *args, **kwargs):
         verify_list = [self.verify] + self.verify_alt
+        # do not verify SSL if we're in faketime (the certificate will appear expired)
+        if faketime_get() != '+0d':
+            verify_list = [False]
+
         exc = None
         for verify in verify_list:
             try:
-                reply = requests.request(*args, **kwargs, verify=verify)
+                with warnings.catch_warnings():
+                    if verify_list == [False]:
+                        # Suppress insecure-request warning if we do not want to verify
+                        warnings.filterwarnings('ignore', category=InsecureRequestWarning)
+                    reply = requests.request(*args, **kwargs, verify=verify)
             except SSLError as e:
-                print(f'API <<< SSL could not verify against "{verify}"')
+                tsprint(f'API <<< SSL could not verify against "{verify}"')
                 exc = e
             else:
                 # note verification preference for next time
@@ -119,7 +132,7 @@ class DeSECAPIV1Client:
                 self.verify_alt = verify_list
                 self.verify_alt.remove(self.verify)
                 return reply
-        print(f'API <<< SSL could not be verified against any verification method')
+        tsprint(f'API <<< SSL could not be verified against any verification method')
         raise exc
 
     def _request(self, method: str, *, path: str, data: Optional[dict] = None, **kwargs) -> requests.Response:
@@ -128,9 +141,9 @@ class DeSECAPIV1Client:
 
         url = self.base_url + path if re.match(r'^https?://', path) is None else path
 
-        print(f"API >>> {method} {url}")
+        tsprint(f"API >>> {method} {url}")
         if data:
-            print(f"API >>> {type(data)}: {data}")
+            tsprint(f"API >>> {type(data)}: {self._shorten(data)}")
 
         response = self._do_request(
             method,
@@ -140,21 +153,31 @@ class DeSECAPIV1Client:
             **kwargs,
         )
 
-        print(f"API <<< {response.status_code}")
+        tsprint(f"API <<< {response.status_code}")
         if response.text:
             try:
-                print(f"API <<< {self._filter_response_output(response.json())}")
+                tsprint(f"API <<< {self._shorten(str(self._filter_response_output(response.json())))}")
             except JSONDecodeError:
-                print(f"API <<< {response.text}")
+                tsprint(f"API <<< {response.text}")
 
         return response
 
+    @staticmethod
+    def _shorten(s: str):
+        if len(s) < 200:
+            return s
+        else:
+            return s[:50] + '...' + s[-50:]
+
     def get(self, path: str, **kwargs) -> requests.Response:
         return self._request("GET", path=path, **kwargs)
 
     def post(self, path: str, data: Optional[dict] = None, **kwargs) -> requests.Response:
         return self._request("POST", path=path, data=data, **kwargs)
 
+    def patch(self, path: str, data: Optional[dict] = None, **kwargs) -> requests.Response:
+        return self._request("PATCH", path=path, data=data, **kwargs)
+
     def delete(self, path: str, **kwargs) -> requests.Response:
         return self._request("DELETE", path=path, **kwargs)
 
@@ -183,6 +206,11 @@ class DeSECAPIV1Client:
         token = response.json().get('token')
         if token is not None:
             self.headers["Authorization"] = f'Token {response.json()["token"]}'
+            self.patch(  # make token last forever
+                f"/auth/tokens/{response.json().get('id')}/",
+                data={'max_unused_period': None, 'max_age': None}
+            )
+
         return response
 
     def domain_list(self) -> requests.Response:
@@ -214,6 +242,9 @@ class DeSECAPIV1Client:
             }
         )
 
+    def rr_set_create_bulk(self, domain_name: str, data: list) -> requests.Response:
+        return self.patch(f"/domains/{domain_name}/rrsets/", data=data)
+
     def rr_set_delete(self, domain_name: str, rr_type: str, subname: str = '') -> requests.Response:
         return self.delete(f"/domains/{domain_name}/rrsets/{subname}.../{rr_type}/")
 
@@ -246,7 +277,7 @@ def api_anon() -> DeSECAPIV1Client:
 
 
 @pytest.fixture()
-def api_user(random_email, random_password) -> DeSECAPIV1Client:
+def api_user() -> DeSECAPIV1Client:
     """
     Access to the API with a fresh user account (zero domains, one token). Authorization header
     is preconfigured, email address and password are randomly chosen.
@@ -260,7 +291,7 @@ def api_user(random_email, random_password) -> DeSECAPIV1Client:
 
 
 @pytest.fixture()
-def api_user_domain(api_user, random_domainname) -> DeSECAPIV1Client:
+def api_user_domain(api_user) -> DeSECAPIV1Client:
     """
     Access to the API with a fresh user account that owns a domain with random name. The domain has
     no records other than the default ones.
@@ -272,22 +303,23 @@ def api_user_domain(api_user, random_domainname) -> DeSECAPIV1Client:
 class NSClient:
     where = None
 
-    def query(self, qname: str, qtype: str):
-        print(f'DNS >>> {qname}/{qtype} @{self.where}')
+    @classmethod
+    def query(cls, qname: str, qtype: str):
+        tsprint(f'DNS >>> {qname}/{qtype} @{cls.where}')
         qname = dns.name.from_text(qname)
         qtype = dns.rdatatype.from_text(qtype)
         answer = dns.query.tcp(
             q=dns.message.make_query(qname, qtype),
-            where=self.where,
+            where=cls.where,
             timeout=2
         )
         try:
             section = dns.message.AUTHORITY if qtype == dns.rdatatype.from_text('NS') else dns.message.ANSWER
             response = answer.find_rrset(section, qname, dns.rdataclass.IN, qtype)
-            print(f'DNS <<< {response}')
+            tsprint(f'DNS <<< {response}')
             return {i.to_text() for i in response.items}
         except KeyError:
-            print('DNS <<< !!! not found !!! Complete Answer below:\n' + answer.to_text())
+            tsprint('DNS <<< !!! not found !!! Complete Answer below:\n' + answer.to_text())
             return {}
 
 
@@ -295,6 +327,80 @@ class NSLordClient(NSClient):
     where = os.environ["DESECSTACK_IPV4_REAR_PREFIX16"] + '.0.129'
 
 
-@pytest.fixture()
-def ns_lord() -> NSLordClient:
-    return NSLordClient()
+def query_replication(zone: str, qname: str, qtype: str, covers: str = None):
+    if qtype == 'RRSIG':
+        assert covers, 'If querying RRSIG, the covers parameter must be set to an RR type, e.g. SOA.'
+    else:
+        assert not covers
+        covers = dns.rdatatype.NONE
+
+    zonefile = os.path.join('/zones', zone + '.zone')
+    zone = dns.name.from_text(zone, origin=dns.name.root)
+    qname = dns.name.from_text(qname, origin=zone)
+
+    if not os.path.exists(zonefile):
+        tsprint(f'RPL <<< Zone file for {zone} not found '
+                f'(number of zones: {len(list(filter(lambda f: f.endswith(".zone"), os.listdir("/zones"))))})')
+        return None
+
+    try:
+        tsprint(f'RPL >>> {qname}/{qtype} in {zone}')
+        z = dns.zone.from_file(f=zonefile, origin=zone, relativize=False)
+        v = {i.to_text() for i in z.find_rrset(qname, qtype, covers=covers).items}
+        tsprint(f'RPL <<< {v}')
+        return v
+    except KeyError:
+        tsprint(f'RPL <<< RR Set {qname}/{qtype} not found')
+        return {}
+    except dns.zone.NoSOA:
+        tsprint(f'RPL <<< Zone {zone} not found')
+        return None
+
+
+def return_eventually(expression: callable, min_pause: float = .1, max_pause: float = 2, timeout: float = 5,
+                      retry_on: Tuple[type] = (Exception,)):
+    if not callable(expression):
+        raise ValueError('Expression given is not callable. Did you forget "lambda:"?')
+
+    wait = min_pause
+    started = datetime.now()
+    while True:
+        try:
+            return expression()
+        except retry_on as e:
+            if (datetime.now() - started).total_seconds() > timeout:
+                tsprint(f'{expression.__code__} failed with {e}, no more retries')
+                raise e
+            time.sleep(wait)
+            wait = min(2 * wait, max_pause)
+
+
+def assert_eventually(assertion: callable, min_pause: float = .1, max_pause: float = 2, timeout: float = 5) -> None:
+    def _assert():
+        assert assertion()
+    return_eventually(_assert, min_pause, max_pause, timeout, retry_on=(AssertionError,))
+
+
+def faketime(t: str):
+    print('FAKETIME', t)
+    with open('/etc/faketime/faketime.rc', 'w') as f:
+        f.write(t + '\n')
+
+
+def faketime_get():
+    try:
+        with open('/etc/faketime/faketime.rc', 'r') as f:
+            return f.readline().strip()
+    except FileNotFoundError:
+        return '+0d'
+
+
+def faketime_add(days: int):
+    assert days >= 0
+
+    current_faketime = faketime_get()
+    assert current_faketime[0] == '+'
+    assert current_faketime[-1] == 'd'
+    current_days = int(current_faketime[1:-1])
+
+    faketime(f'+{current_days + days:n}d')

+ 2 - 2
test/e2e2/spec/test_api_domains.py

@@ -1,7 +1,7 @@
-from conftest import DeSECAPIV1Client
+from conftest import DeSECAPIV1Client, random_domainname
 
 
-def test_create(api_user: DeSECAPIV1Client, random_domainname):
+def test_create(api_user: DeSECAPIV1Client):
     assert len(api_user.domain_list()) == 0
     assert api_user.domain_create(random_domainname()).status_code == 201
     assert len(api_user.domain_list()) == 1

+ 17 - 13
test/e2e2/spec/test_api_rr_validation.py → test/e2e2/spec/test_api_rr.py

@@ -2,7 +2,7 @@ from typing import List, Tuple
 
 import pytest
 
-from conftest import DeSECAPIV1Client, NSClient
+from conftest import DeSECAPIV1Client, query_replication, NSLordClient, assert_eventually
 
 
 def generate_params(dict_value_lists_by_type: dict) -> List[Tuple[str, str]]:
@@ -347,7 +347,7 @@ def test_soundness():
 
 
 @pytest.mark.parametrize("rr_type,value", generate_params(VALID_RECORDS_CANONICAL))
-def test_create_valid_canonical(api_user_domain: DeSECAPIV1Client, ns_lord: NSClient, rr_type: str, value: str):
+def test_create_valid_canonical(api_user_domain: DeSECAPIV1Client, rr_type: str, value: str):
     domain_name = api_user_domain.domain
     expected = set()
     subname = 'a'
@@ -357,12 +357,13 @@ def test_create_valid_canonical(api_user_domain: DeSECAPIV1Client, ns_lord: NSCl
     if value is not None:
         assert api_user_domain.rr_set_create(domain_name, rr_type, [value], subname=subname).status_code == 201
         expected.add(value)
-    rrset = ns_lord.query(f'{subname}.{domain_name}'.strip('.'), rr_type)
+    rrset = NSLordClient.query(f'{subname}.{domain_name}'.strip('.'), rr_type)
     assert rrset == expected
+    assert_eventually(lambda: query_replication(domain_name, subname, rr_type) == expected)
 
 
 @pytest.mark.parametrize("rr_type,value", generate_params(VALID_RECORDS_NON_CANONICAL))
-def test_create_valid_non_canonical(api_user_domain: DeSECAPIV1Client, ns_lord: NSClient, rr_type: str, value: str):
+def test_create_valid_non_canonical(api_user_domain: DeSECAPIV1Client, rr_type: str, value: str):
     domain_name = api_user_domain.domain
     expected = set()
     subname = 'a'
@@ -372,8 +373,9 @@ def test_create_valid_non_canonical(api_user_domain: DeSECAPIV1Client, ns_lord:
     if value is not None:
         assert api_user_domain.rr_set_create(domain_name, rr_type, [value], subname=subname).status_code == 201
         expected.add(value)
-    rrset = ns_lord.query(f'{subname}.{domain_name}'.strip('.'), rr_type)
+    rrset = NSLordClient.query(f'{subname}.{domain_name}'.strip('.'), rr_type)
     assert len(rrset) == len(expected)
+    assert_eventually(lambda: len(query_replication(domain_name, subname, rr_type)) == len(expected))
 
 
 @pytest.mark.parametrize("rr_type,value", INVALID_RECORDS_PARAMS)
@@ -381,22 +383,24 @@ def test_create_invalid(api_user_domain: DeSECAPIV1Client, rr_type: str, value:
     assert api_user_domain.rr_set_create(api_user_domain.domain, rr_type, [value]).status_code == 400
 
 
-def test_create_long_subname(api_user_domain: DeSECAPIV1Client, ns_lord: NSClient):
-    assert api_user_domain.rr_set_create(api_user_domain.domain, "AAAA", ["::1"], subname="a"*63).status_code == 201
-    assert ns_lord.query(f"{'a'*63}.{api_user_domain.domain}", "AAAA") == {"::1"}
+def test_create_long_subname(api_user_domain: DeSECAPIV1Client):
+    subname = 'a' * 63
+    assert api_user_domain.rr_set_create(api_user_domain.domain, "AAAA", ["::1"], subname=subname).status_code == 201
+    assert NSLordClient.query(f"{subname}.{api_user_domain.domain}", "AAAA") == {"::1"}
+    assert_eventually(lambda: query_replication(api_user_domain.domain, subname, "AAAA") == {"::1"})
 
 
-def test_add_remove_DNSKEY(api_user_domain: DeSECAPIV1Client, ns_lord: NSClient):
+def test_add_remove_DNSKEY(api_user_domain: DeSECAPIV1Client):
     domain_name = api_user_domain.domain
     auto_dnskeys = api_user_domain.get_key_params(domain_name, 'DNSKEY')
 
     # After adding another DNSKEY, we expect it to be part of the nameserver's response (along with the automatic ones)
     value = '257 3 13 aCoEWYBBVsP9Fek2oC8yqU8ocKmnS1iD SFZNORnQuHKtJ9Wpyz+kNryquB78Pyk/ NTEoai5bxoipVQQXzHlzyg=='
     assert api_user_domain.rr_set_create(domain_name, 'DNSKEY', [value], subname='').status_code == 201
-    rrset = ns_lord.query(f'{domain_name}'.strip('.'), 'DNSKEY')
-    assert rrset == auto_dnskeys | {value}
+    assert NSLordClient.query(domain_name, 'DNSKEY') == auto_dnskeys | {value}
+    assert_eventually(lambda: query_replication(domain_name, '', 'DNSKEY') == auto_dnskeys | {value})
 
     # After deleting it, we expect that the automatically managed ones are still there
     assert api_user_domain.rr_set_delete(domain_name, "DNSKEY", subname='').status_code == 204
-    rrset = ns_lord.query(f'{domain_name}'.strip('.'), 'DNSKEY')
-    assert rrset == auto_dnskeys
+    assert NSLordClient.query(domain_name, 'DNSKEY') == auto_dnskeys
+    assert_eventually(lambda: query_replication(domain_name, '', 'DNSKEY') == auto_dnskeys)

+ 75 - 0
test/e2e2/spec/test_replication.py

@@ -0,0 +1,75 @@
+from conftest import DeSECAPIV1Client, return_eventually, query_replication, random_domainname, assert_eventually, \
+    faketime_add
+
+some_ds_records = [
+    '60604 8 1 ef66f772935b412376c8445c4442b802b0322814',
+    '60604 8 2 c2739629145faaf464ff1bc65612fd1eb5766e80c96932d808edfb55d1e1f2ce',
+    '60604 8 4 5943dac4fc4aad637445f483b0f43bd4152fab19250fd26df82bf12020a7f7101caa17e723cf433f43d2bbed11231e03',
+]
+
+
+def test_signature_rotation(api_user_domain: DeSECAPIV1Client):
+    name = random_domainname()
+    api_user_domain.domain_create(name)
+    rrsig = return_eventually(lambda: query_replication(name, "", 'RRSIG', covers='SOA'), timeout=20)
+    faketime_add(days=7)
+    assert_eventually(lambda: rrsig != query_replication(name, "", 'RRSIG', covers='SOA'), timeout=60)
+
+
+def test_zone_deletion(api_user_domain: DeSECAPIV1Client):
+    name = api_user_domain.domain
+    assert_eventually(lambda: query_replication(name, "", 'SOA') is not None, timeout=20)
+    api_user_domain.domain_destroy(name)
+    assert_eventually(lambda: query_replication(name, "", 'SOA') is None, timeout=20)
+
+
+def test_signature_rotation_performance(api_user_domain: DeSECAPIV1Client):
+    root_domain = api_user_domain.domain
+
+    # test configuration
+    bulk_block_size = 500
+    domain_sizes = {
+        # number of delegations: number of zones
+        2000: 1,
+        1000: 2,
+        10: 10,
+    }
+
+    # create test domains
+    domain_names = {
+        num_delegations: [random_domainname() + f'.num-ds-{num_delegations}.' + root_domain for _ in range(num_zones)]
+        for num_delegations, num_zones in domain_sizes.items()
+    }
+    for num_delegations, names in domain_names.items():
+        for name in names:
+            # create a domain with name `name` and `num_delegations` delegations
+            api_user_domain.domain_create(name)
+            for a in range(0, num_delegations, bulk_block_size):  # run block-wise to avoid exceeding max request size
+                r = api_user_domain.rr_set_create_bulk(
+                    name,
+                    [
+                        {"subname": f'x{i}', "type": "DS", "ttl": 3600, "records": some_ds_records}
+                        for i in range(a, a + bulk_block_size)
+                    ] + [
+                        {"subname": f'x{i}', "type": "NS", "ttl": 3600, "records": ['ns1.test.', 'ns2.test.']}
+                        for i in range(a, a + bulk_block_size)
+                    ]
+                )
+                assert r.status_code == 200
+
+    # retrieve all SOA RRSIGs
+    soa_rrsig = {}
+    for names in domain_names.values():
+        for name in names:
+            soa_rrsig[name] = return_eventually(lambda: query_replication(name, "", 'RRSIG', covers='SOA'), timeout=20)
+
+    # rotate signatures
+    faketime_add(7)
+
+    # assert SOA RRSIG has been updated
+    for names in domain_names.values():
+        for name in names:
+            assert_eventually(
+                lambda: soa_rrsig[name] != query_replication(name, "", 'RRSIG', covers='SOA'),
+                timeout=600,  # depending on the number of domains in the database, this value may need to be increased
+            )