123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136 |
- import json
- import os
- import random
- import string
- import time
- from datetime import datetime
- from tempfile import TemporaryDirectory
- from django.test import testcases
- from rest_framework import status
- from desecapi.replication import Repository
- from desecapi.tests.base import DesecTestCase
- class ReplicationTest(DesecTestCase):
- def test_serials(self):
- url = self.reverse('v1:serial')
- zones = [
- {'name': 'test.example.', 'edited_serial': 12345},
- {'name': 'example.org.', 'edited_serial': 54321},
- ]
- serials = {zone['name']: zone['edited_serial'] for zone in zones}
- pdns_requests = [{
- 'method': 'GET',
- 'uri': self.get_full_pdns_url(r'/zones', ns='MASTER'),
- 'status': 200,
- 'body': json.dumps(zones),
- }]
- # Run twice to make sure cache output varies on remote address
- for i in range(2):
- response = self.client.get(path=url, REMOTE_ADDR='123.8.0.2')
- self.assertStatus(response, status.HTTP_401_UNAUTHORIZED)
- with self.assertPdnsRequests(pdns_requests):
- response = self.client.get(path=url, REMOTE_ADDR='10.8.0.2')
- self.assertStatus(response, status.HTTP_200_OK)
- self.assertEqual(response.data, serials)
- # Do not expect pdns request in next iteration (result will be cached)
- pdns_requests = []
- class RepositoryTest(testcases.TestCase):
- def assertGit(self, path):
- self.assertTrue(
- os.path.exists(os.path.join(path, '.git')),
- f'Expected a git repository at {path} but did not find .git subdirectory.'
- )
- def assertHead(self, repo, message=None, sha=None):
- actual_sha, actual_message = repo.get_head()
- if actual_sha is None:
- self.fail(f'Expected HEAD to have commit message "{message}" and hash "{sha}", but repository has no '
- f'commits.')
- if sha:
- self.assertEqual(actual_sha, sha, f'Expected HEAD to have hash "{sha}" but had "{actual_sha}".')
- if message:
- self.assertIn(
- message, actual_message,
- f'Expected "{message}" to appear in the last commit message, but only found "{actual_message}".',
- )
- def assertHasCommit(self, repo: Repository, commit_id):
- self.assertIsNotNone(
- repo.get_commit(commit_id)[0], f'Expected repository to have commit {commit_id}, but it had not.'
- )
- def assertHasCommits(self, repo: Repository, commit_id_list):
- for commit in commit_id_list:
- self.assertHasCommit(repo, commit)
- def assertHasNotCommit(self, repo: Repository, commit_id):
- self.assertIsNone(
- repo.get_commit(commit_id)[0], f'Expected repository to not have commit {commit_id}, but it had.'
- )
- def assertHasNotCommits(self, repo: Repository, commit_id_list):
- for commit in commit_id_list:
- self.assertHasNotCommit(repo, commit)
- def assertNoCommits(self, repo: Repository):
- head = repo.get_head()
- self.assertEqual(head, (None, None), f'Expected that repository has no commits, but HEAD was {head}.')
- @staticmethod
- def _random_string(length):
- return ''.join(random.choices(string.ascii_lowercase, k=length))
- def _random_commit(self, repo: Repository, message=''):
- with open(os.path.join(repo.path, self._random_string(16)), 'w') as f:
- f.write(self._random_string(500))
- repo.commit_all(message)
- return repo.get_head()[0]
- def _random_commits(self, num, repo: Repository, message=''):
- return [self._random_commit(repo, message) for _ in range(num)]
- def test_init(self):
- with TemporaryDirectory() as path:
- repo = Repository(path)
- repo.init()
- self.assertGit(path)
- def test_commit(self):
- with TemporaryDirectory() as path:
- repo = Repository(path)
- repo.init()
- repo.commit_all('commit1')
- self.assertNoCommits(repo)
- with open(os.path.join(path, 'test_commit'), 'w') as f:
- f.write('foo')
- repo.commit_all('commit2')
- self.assertHead(repo, message='commit2')
- def test_remove_history(self):
- with TemporaryDirectory() as path:
- repo = Repository(path)
- repo.init()
- remove = self._random_commits(5, repo, 'to be removed') # we're going to remove these 'old' commits
- keep = self._random_commits(1, repo, 'anchor to be kept') # as sync anchor, the last 'old' commit is kept
- cutoff = datetime.now()
- time.sleep(1)
- keep += self._random_commits(5, repo, 'to be kept') # we're going to keep these 'new' commits
- self.assertHasCommits(repo, remove + keep)
- repo.remove_history(before=cutoff)
- self.assertHasCommits(repo, keep)
- self.assertHasNotCommits(repo, remove)
|