ソースを参照

feat(api): also take email addresses as abuse cases

Nils Wisiol 3 年 前
コミット
4662aca817

+ 30 - 5
api/desecapi/management/commands/stop-abuse.py

@@ -1,21 +1,46 @@
 from django.core.management import BaseCommand
+from django.db.models import Q
 
+from api import settings
 from desecapi.models import RRset, Domain, User
 from desecapi.pdns_change_tracker import PDNSChangeTracker
 
 
 class Command(BaseCommand):
-    help = 'Removes all DNS records from the given domain and suspends the associated user'
+    help = 'Removes all DNS records from domains given either by name or by email address of their owner. ' \
+           'Locks all implicated user accounts.'
 
     def add_arguments(self, parser):
-        parser.add_argument('domain-name', nargs='*', help='Domain(s) to remove all DNS records from')
+        parser.add_argument('names', nargs='*',
+                            help='Domain(s) and User(s) to truncate and disable identified by name and email addresses')
 
     def handle(self, *args, **options):
         with PDNSChangeTracker():
-            domains = Domain.objects.filter(name__in=options['domain-name'])
-            users = User.objects.filter(domains__name__in=options['domain-name'])
-            rrsets = RRset.objects.filter(domain__name__in=options['domain-name']).exclude(type='NS', subname='')
+            # domains to truncate: all domains given and all domains belonging to a user given
+            domains = Domain.objects.filter(
+                Q(name__in=options['names']) |
+                Q(owner__email__in=options['names'])
+            )
+
+            # users to lock: all associated with any of the domains and all given
+            users = User.objects.filter(
+                Q(domains__name__in=options['names']) |
+                Q(email__in=options['names'])
+            )
+
+            # rrsets to delete: all belonging to (all domains given and all domains belonging to a user given)
+            rrsets = RRset.objects.filter(
+                Q(domain__name__in=options['names']) |
+                Q(domain__owner__email__in=options['names'])
+            )
+
             print(f'Deleting {rrsets.count()} RRset(s) from {domains.count()} domain(s); '
                   f'disabling {users.count()} associated user account(s).')
+
+            # delete rrsets and create default NS records
             rrsets.delete()
+            for d in domains:
+                RRset.objects.create(domain=d, subname='', type='NS', ttl=3600, contents=settings.DEFAULT_NS)
+
+        # lock users
         users.update(is_active=False)

+ 56 - 6
api/desecapi/tests/test_stop_abuse.py

@@ -1,5 +1,6 @@
 from django.core import management
 
+from api import settings
 from desecapi import models
 from desecapi.tests.base import DomainOwnerTestCase
 
@@ -20,31 +21,68 @@ class StopAbuseCommandTest(DomainOwnerTestCase):
         # test implicit by absence assertPdnsRequests
         management.call_command('stop-abuse')
 
-    def test_remove_rrsets(self):
+    def test_remove_rrsets_by_domain_name(self):
         with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_domain.name)):
             management.call_command('stop-abuse', self.my_domain)
         self.assertEqual(models.RRset.objects.filter(domain__name=self.my_domain.name).count(), 1)  # only NS left
+        self.assertEqual(
+            set(models.RR.objects.filter(rrset__domain__name=self.my_domain.name).values_list('content', flat=True)),
+            set(settings.DEFAULT_NS),
+        )
 
-    def test_disable_user(self):
+    def test_remove_rrsets_by_email(self):
+        with self.assertPdnsRequests(
+            *[self.requests_desec_rr_sets_update(name=d.name) for d in self.my_domains],
+            expect_order=False,
+        ):
+            management.call_command('stop-abuse', self.owner.email)
+        self.assertEqual(models.RRset.objects.filter(domain__name=self.my_domain.name).count(), 1)  # only NS left
+        self.assertEqual(
+            set(models.RR.objects.filter(rrset__domain__name=self.my_domain.name).values_list('content', flat=True)),
+            set(settings.DEFAULT_NS),
+        )
+
+    def test_disable_user_by_domain_name(self):
         with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_domain.name)):
             management.call_command('stop-abuse', self.my_domain)
         self.owner.refresh_from_db()
         self.assertEqual(self.owner.is_active, False)
 
-    def test_keep_other_owned_domains(self):
+    def test_disable_user_by_email(self):
+        with self.assertPdnsRequests(
+            *[self.requests_desec_rr_sets_update(name=d.name) for d in self.my_domains],
+            expect_order=False,
+        ):
+            management.call_command('stop-abuse', self.owner.email)
+        self.owner.refresh_from_db()
+        self.assertEqual(self.owner.is_active, False)
+
+    def test_keep_other_owned_domains_name(self):
         with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_domain.name)):
             management.call_command('stop-abuse', self.my_domain)
         self.assertGreater(models.RRset.objects.filter(domain__name=self.my_domains[1].name).count(), 1)
 
+    def test_dont_keep_other_owned_domains_email(self):
+        with self.assertPdnsRequests(
+            *[self.requests_desec_rr_sets_update(name=d.name) for d in self.my_domains],
+            expect_order=False,
+        ):
+            management.call_command('stop-abuse', self.owner.email)
+        self.assertEqual(models.RRset.objects.filter(domain__name=self.my_domains[1].name).count(), 1)
+
     def test_only_disable_owner(self):
-        with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_domain.name)):
-            management.call_command('stop-abuse', self.my_domain)
+        with self.assertPdnsRequests(
+            self.requests_desec_rr_sets_update(name=self.my_domains[0].name),
+            self.requests_desec_rr_sets_update(name=self.my_domains[1].name),
+            expect_order=False,
+        ):
+            management.call_command('stop-abuse', self.my_domain, self.owner.email)
         self.my_domain.owner.refresh_from_db()
         self.other_domain.owner.refresh_from_db()
         self.assertEqual(self.my_domain.owner.is_active, False)
         self.assertEqual(self.other_domain.owner.is_active, True)
 
-    def test_disable_owners(self):
+    def test_disable_owners_by_domain_name(self):
         with self.assertPdnsRequests(
             self.requests_desec_rr_sets_update(name=self.my_domain.name),
             self.requests_desec_rr_sets_update(name=self.other_domain.name),
@@ -55,3 +93,15 @@ class StopAbuseCommandTest(DomainOwnerTestCase):
         self.other_domain.owner.refresh_from_db()
         self.assertEqual(self.my_domain.owner.is_active, False)
         self.assertEqual(self.other_domain.owner.is_active, False)
+
+    def test_disable_owners_by_email(self):
+        with self.assertPdnsRequests(
+            *[self.requests_desec_rr_sets_update(name=d.name) for d in self.my_domains + self.other_domains],
+            expect_order=False,
+        ):
+            management.call_command('stop-abuse', self.owner.email, *[d.owner.email for d in self.other_domains])
+        self.my_domain.owner.refresh_from_db()
+        self.other_domain.owner.refresh_from_db()
+        self.assertEqual(self.my_domain.owner.is_active, False)
+        self.assertEqual(self.other_domain.owner.is_active, False)
+