瀏覽代碼

feat(api): management command for abuse cases

Nils Wisiol 3 年之前
父節點
當前提交
eab2d367c5
共有 2 個文件被更改,包括 78 次插入0 次删除
  1. 21 0
      api/desecapi/management/commands/stop-abuse.py
  2. 57 0
      api/desecapi/tests/test_stop_abuse.py

+ 21 - 0
api/desecapi/management/commands/stop-abuse.py

@@ -0,0 +1,21 @@
+from django.core.management import BaseCommand
+
+from desecapi.models import RRset, Domain, User
+from desecapi.pdns_change_tracker import PDNSChangeTracker
+
+
+class Command(BaseCommand):
+    help = 'Removes all DNS records from the given domain and suspends the associated user'
+
+    def add_arguments(self, parser):
+        parser.add_argument('domain-name', nargs='*', help='Domain(s) to remove all DNS records from')
+
+    def handle(self, *args, **options):
+        with PDNSChangeTracker():
+            domains = Domain.objects.filter(name__in=options['domain-name'])
+            users = User.objects.filter(domains__name__in=options['domain-name'])
+            rrsets = RRset.objects.filter(domain__name__in=options['domain-name']).exclude(type='NS', subname='')
+            print(f'Deleting {rrsets.count()} RRset(s) from {domains.count()} domain(s); '
+                  f'disabling {users.count()} associated user account(s).')
+            rrsets.delete()
+        users.update(is_active=False)

+ 57 - 0
api/desecapi/tests/test_stop_abuse.py

@@ -0,0 +1,57 @@
+from django.core import management
+
+from desecapi import models
+from desecapi.tests.base import DomainOwnerTestCase
+
+
+class StopAbuseCommandTest(DomainOwnerTestCase):
+
+    @classmethod
+    def setUpTestDataWithPdns(cls):
+        super().setUpTestDataWithPdns()
+        cls.create_rr_set(cls.my_domains[1], ['127.0.0.1', '127.0.1.1'], type='A', ttl=123)
+        cls.create_rr_set(cls.other_domains[1], ['40.1.1.1', '40.2.2.2'], type='A', ttl=456)
+        for d in cls.my_domains + cls.other_domains:
+            cls.create_rr_set(d, ['ns1.example.', 'ns2.example.'], type='NS', ttl=456)
+            cls.create_rr_set(d, ['ns1.example.', 'ns2.example.'], type='NS', ttl=456, subname='subname')
+            cls.create_rr_set(d, ['"foo"'], type='TXT', ttl=456)
+
+    def test_noop(self):
+        # test implicit by absence assertPdnsRequests
+        management.call_command('stop-abuse')
+
+    def test_remove_rrsets(self):
+        with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_domain.name)):
+            management.call_command('stop-abuse', self.my_domain)
+        self.assertEqual(models.RRset.objects.filter(domain__name=self.my_domain.name).count(), 1)  # only NS left
+
+    def test_disable_user(self):
+        with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_domain.name)):
+            management.call_command('stop-abuse', self.my_domain)
+        self.owner.refresh_from_db()
+        self.assertEqual(self.owner.is_active, False)
+
+    def test_keep_other_owned_domains(self):
+        with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_domain.name)):
+            management.call_command('stop-abuse', self.my_domain)
+        self.assertGreater(models.RRset.objects.filter(domain__name=self.my_domains[1].name).count(), 1)
+
+    def test_only_disable_owner(self):
+        with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_domain.name)):
+            management.call_command('stop-abuse', self.my_domain)
+        self.my_domain.owner.refresh_from_db()
+        self.other_domain.owner.refresh_from_db()
+        self.assertEqual(self.my_domain.owner.is_active, False)
+        self.assertEqual(self.other_domain.owner.is_active, True)
+
+    def test_disable_owners(self):
+        with self.assertPdnsRequests(
+            self.requests_desec_rr_sets_update(name=self.my_domain.name),
+            self.requests_desec_rr_sets_update(name=self.other_domain.name),
+            expect_order=False,
+        ):
+            management.call_command('stop-abuse', self.my_domain, self.other_domain)
+        self.my_domain.owner.refresh_from_db()
+        self.other_domain.owner.refresh_from_db()
+        self.assertEqual(self.my_domain.owner.is_active, False)
+        self.assertEqual(self.other_domain.owner.is_active, False)