瀏覽代碼

feat(api): extend abuse protection to check email hostnames

When user register, we check how many accounts with the same email
hostname were registered in the past. If there are too many, we
ask the user to solve a captcha.

This commit extends the code to be easily extended by more abuse
heuristics.
Nils Wisiol 8 年之前
父節點
當前提交
9d1bdbcba5

+ 1 - 1
api/desecapi/management/commands/privacy-chores.py

@@ -9,7 +9,7 @@ class Command(BaseCommand):
 
     def handle(self, *args, **kwargs):
 
-        users = User.objects.filter(created__lt=timezone.now()-timedelta(hours=settings.ABUSE_LOCK_ACCOUNT_BY_REGISTRATION_IP_PERIOD_HRS))
+        users = User.objects.filter(created__lt=timezone.now()-timedelta(hours=settings.ABUSE_BY_REMOTE_IP_PERIOD_HRS))
         for u in users:
             u.registration_remote_ip = ''
             u.save() # TODO bulk update?

+ 4 - 1
api/desecapi/settings.py

@@ -155,5 +155,8 @@ NORECAPTCHA_SECRET_KEY = os.environ['DESECSTACK_NORECAPTCHA_SECRET_KEY']
 NORECAPTCHA_WIDGET_TEMPLATE = 'captcha-widget.html'
 
 # abuse protection
-ABUSE_LOCK_ACCOUNT_BY_REGISTRATION_IP_PERIOD_HRS = 48
+ABUSE_BY_REMOTE_IP_LIMIT = 1
+ABUSE_BY_REMOTE_IP_PERIOD_HRS = 48
+ABUSE_BY_EMAIL_HOSTNAME_LIMIT = 1
+ABUSE_BY_EMAIL_HOSTNAME_PERIOD_HRS = 24
 LIMIT_USER_DOMAIN_COUNT_DEFAULT = 5

+ 1 - 1
api/desecapi/tests/testprivacychores.py

@@ -22,7 +22,7 @@ class PrivacyChoresCommandTest(TestCase):
             registration_remote_ip='1.3.3.8',
         ).save()
         user2 = User.objects.get(email=name2)
-        user2.created = timezone.now()-timedelta(hours=settings.ABUSE_LOCK_ACCOUNT_BY_REGISTRATION_IP_PERIOD_HRS+1)
+        user2.created = timezone.now()-timedelta(hours=settings.ABUSE_BY_REMOTE_IP_PERIOD_HRS+1)
         user2.save()
 
         user_count = User.objects.all().count()

+ 56 - 1
api/desecapi/tests/testregistration.py

@@ -87,7 +87,7 @@ class RegistrationTest(APITestCase):
         self.assertEqual(user.captcha_required, False)
 
         #fake registration time
-        user.created = timezone.now() - timedelta(hours=settings.ABUSE_LOCK_ACCOUNT_BY_REGISTRATION_IP_PERIOD_HRS+1)
+        user.created = timezone.now() - timedelta(hours=settings.ABUSE_BY_REMOTE_IP_PERIOD_HRS+1)
         user.save()
 
         url = reverse('register')
@@ -110,3 +110,58 @@ class RegistrationTest(APITestCase):
         send_account_lock_email(None, user)
 
         self.assertEqual(len(mail.outbox), outboxlen+1)
+
+    def test_multiple_registration_captcha_required_same_email_host(self):
+        outboxlen = len(mail.outbox)
+
+        url = reverse('register')
+        for i in range(settings.ABUSE_BY_EMAIL_HOSTNAME_LIMIT):
+            data = {'email': utils.generateRandomString() + '@test-same-email.desec.io', 'password': utils.generateRandomString(size=12)}
+            response = self.client.post(url, data, REMOTE_ADDR=utils.generateRandomString())
+            self.assertEqual(response.status_code, status.HTTP_201_CREATED)
+            user = models.User.objects.get(email=data['email'])
+            self.assertEqual(user.email, data['email'])
+            self.assertEqual(user.captcha_required, False)
+
+        self.assertEqual(len(mail.outbox), outboxlen)
+
+        url = reverse('register')
+        data = {'email': utils.generateRandomString() + '@test-same-email.desec.io',
+                'password': utils.generateRandomString(size=12)}
+        response = self.client.post(url, data, REMOTE_ADDR=utils.generateRandomString())
+        self.assertEqual(response.status_code, status.HTTP_201_CREATED)
+        user = models.User.objects.get(email=data['email'])
+        self.assertEqual(user.email, data['email'])
+        self.assertEqual(user.captcha_required, True)
+
+        self.assertEqual(len(mail.outbox), outboxlen + 1)
+
+    def test_multiple_registration_no_captcha_required_same_email_host_long_time(self):
+        outboxlen = len(mail.outbox)
+
+        url = reverse('register')
+        for i in range(settings.ABUSE_BY_EMAIL_HOSTNAME_LIMIT):
+            data = {'email': utils.generateRandomString() + '@test-same-email-1.desec.io', 'password': utils.generateRandomString(size=12)}
+            response = self.client.post(url, data, REMOTE_ADDR=utils.generateRandomString())
+            self.assertEqual(response.status_code, status.HTTP_201_CREATED)
+            user = models.User.objects.get(email=data['email'])
+            self.assertEqual(user.email, data['email'])
+            self.assertEqual(user.captcha_required, False)
+
+            #fake registration time
+            user = models.User.objects.get(email=data['email'])
+            user.created = timezone.now() - timedelta(hours=settings.ABUSE_BY_REMOTE_IP_PERIOD_HRS+1)
+            user.save()
+
+        self.assertEqual(len(mail.outbox), outboxlen)
+
+        url = reverse('register')
+        data = {'email': utils.generateRandomString() + '@test-same-email-1.desec.io',
+                'password': utils.generateRandomString(size=12)}
+        response = self.client.post(url, data, REMOTE_ADDR=utils.generateRandomString())
+        self.assertEqual(response.status_code, status.HTTP_201_CREATED)
+        user = models.User.objects.get(email=data['email'])
+        self.assertEqual(user.email, data['email'])
+        self.assertEqual(user.captcha_required, False)
+
+        self.assertEqual(len(mail.outbox), outboxlen)

+ 1 - 1
api/desecapi/tests/utils.py

@@ -12,7 +12,7 @@ class utils(object):
 
     @classmethod
     def generateUsername(cls):
-        return cls.generateRandomString() + '@desec.io'
+        return cls.generateRandomString() + '@' + cls.generateRandomString() + 'desec.io'
 
     @classmethod
     def generateDomainname(cls):

+ 13 - 4
api/desecapi/views.py

@@ -290,10 +290,19 @@ class RegistrationView(views.RegistrationView):
         return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers)
 
     def perform_create(self, serializer, remote_ip):
-        captcha = User.objects.filter(
-                created__gte=timezone.now()-timedelta(hours=settings.ABUSE_LOCK_ACCOUNT_BY_REGISTRATION_IP_PERIOD_HRS),
-                registration_remote_ip=remote_ip
-            ).exists()
+        captcha = \
+            (
+                User.objects.filter(
+                    created__gte=timezone.now()-timedelta(hours=settings.ABUSE_BY_REMOTE_IP_PERIOD_HRS),
+                    registration_remote_ip=remote_ip
+                ).count() >= settings.ABUSE_BY_REMOTE_IP_LIMIT
+                or
+                User.objects.filter(
+                    created__gte=timezone.now() - timedelta(hours=settings.ABUSE_BY_EMAIL_HOSTNAME_PERIOD_HRS),
+                    email__endswith=serializer.validated_data['email'].split('@')[-1]
+                ).count() >= settings.ABUSE_BY_EMAIL_HOSTNAME_LIMIT
+            )
+
         user = serializer.save(registration_remote_ip=remote_ip, captcha_required=captcha)
         if captcha:
             send_account_lock_email(self.request, user)