[Solved]-Django REST Framework: Per-user throttles

18 👍

I figured out a way to do this by extending the UserRateThrottle and adding special users to my settings file.

This class just overrides the allow_request method, adding some special logic to see if usernames are listed in the OVERRIDE_THROTTLE_RATES variable:

class ExceptionalUserRateThrottle(UserRateThrottle):
    """
    User rate throttle that lets selected usernames use their own rate.

    Usernames listed in ``settings.REST_FRAMEWORK['OVERRIDE_THROTTLE_RATES']``
    (a mapping of username -> rate string such as ``'10000/hour'``) are
    throttled at that rate; everybody else keeps the rate this throttle was
    configured with.
    """

    def allow_request(self, request, view):
        """
        Give special access to a few special accounts.

        Mirrors code in super class with minor tweaks.

        Returns True when the request is let through (either because no
        rate / cache key applies or because ``throttle_success`` says so),
        otherwise the result of ``throttle_failure``.
        """
        if self.rate is None:
            return True

        self.key = self.get_cache_key(request, view)
        if self.key is None:
            return True

        self.history = self.cache.get(self.key, [])
        self.now = self.timer()

        # Adjust if user has special privileges. The override table is
        # optional: a project that never defined it must behave exactly like
        # the plain user throttle instead of failing with KeyError.
        rest_settings = getattr(settings, 'REST_FRAMEWORK', None) or {}
        overrides = rest_settings.get('OVERRIDE_THROTTLE_RATES') or {}
        # request.user may be unset; treat that as "no override".
        username = getattr(request.user, 'username', None)
        override_rate = overrides.get(username, None)
        if override_rate is not None:
            self.num_requests, self.duration = self.parse_rate(override_rate)

        # Drop any requests from the history which have now passed the
        # throttle duration
        while self.history and self.history[-1] <= self.now - self.duration:
            self.history.pop()
        if len(self.history) >= self.num_requests:
            return self.throttle_failure()
        return self.throttle_success()

To use this, just add this class to your DEFAULT_THROTTLE_CLASSES, then put some special users into OVERRIDE_THROTTLE_RATES like so:

'DEFAULT_THROTTLE_CLASSES': (
    'rest_framework.throttling.AnonRateThrottle',
    'cl.api.utils.ExceptionalUserRateThrottle',
),
'DEFAULT_THROTTLE_RATES': {
    'anon': '100/day',
    'user': '1000/hour',
},
'OVERRIDE_THROTTLE_RATES': {
    'scout': '10000/hour',
    'scout_test': '10000/hour',
},
👤 mlissner

2 👍

I found a solution by customizing Django REST Framework's throttling.

It blocks a particular user after 3 login attempts (keyed on the user ID that exists in my application), and it blocks an IP address after 6 login attempts for anonymous users.

prevent.py :-

#!/usr/bin/python

from collections import Counter

from rest_framework.throttling import SimpleRateThrottle
from django.contrib.auth.models import User


class UserLoginRateThrottle(SimpleRateThrottle):
    """
    Throttle login attempts per targeted account, else per client.

    When the ``username`` in the request data matches an existing ``User``,
    attempts are counted against that user's primary key and limited to
    ``user_num_requests`` per window (never more than the scope's configured
    rate). Otherwise attempts are counted against ``get_ident(request)`` and
    limited by the scope's configured rate alone.

    The cached history holds request timestamps only, newest first.
    """
    scope = 'loginAttempts'

    # Attempts allowed per window against a single existing account.
    user_num_requests = 3

    def _get_target_user_pk(self, request):
        """Return the pk of the user named in the request data, or None."""
        username = request.data.get('username')
        if not username:
            return None
        return (
            User.objects.filter(username=username)
            .values_list('pk', flat=True)
            .first()
        )

    def get_cache_key(self, request, view):
        """
        Build the cache key for this request.

        Also records on the instance whether the key refers to an existing
        account, so that ``allow_request`` can pick the limit without a
        second database query.
        NOTE(review): this keeps per-request state on the throttle instance;
        assumes a fresh throttle instance per request - confirm.
        """
        user_pk = self._get_target_user_pk(request)
        self._targets_known_user = user_pk is not None
        ident = user_pk if self._targets_known_user else self.get_ident(request)

        return self.cache_format % {
            'scope': self.scope,
            'ident': ident
        }

    def allow_request(self, request, view):
        """
        Implement the check to see if the request should be throttled.
        On success calls `throttle_success`.
        On failure calls `throttle_failure`.
        """
        if self.rate is None:
            return True

        self.key = self.get_cache_key(request, view)
        if self.key is None:
            return True

        self.history = self.cache.get(self.key, [])
        self.now = self.timer()

        # Known accounts get the stricter limit. Storing it in num_requests
        # keeps everything that reads num_requests afterwards consistent
        # with the limit that was actually enforced.
        if getattr(self, '_targets_known_user', False):
            self.num_requests = min(self.num_requests, self.user_num_requests)

        # Drop timestamps that have fallen out of the throttle window.
        while self.history and self.history[-1] <= self.now - self.duration:
            self.history.pop()

        if len(self.history) >= self.num_requests:
            return self.throttle_failure()
        return self.throttle_success(request)

    def throttle_success(self, request=None):
        """
        Inserts the current request's timestamp along with the key
        into the cache.

        ``request`` is accepted for backward compatibility and is unused;
        it is optional so the method can also be called without arguments.
        """
        self.history.insert(0, self.now)
        self.cache.set(self.key, self.history, self.duration)
        return True

Views.py :-

from .prevent import UserLoginRateThrottle
   ....
   ....
   ....
   class ObtainAuthToken(auth_views.ObtainAuthToken):
       throttle_classes = (UserLoginRateThrottle,)  # use this throttle on your login view

       def post(self, request, *args, **kwargs):
           ....
           ....

Settings.py :-

Django-rest-framework

REST_FRAMEWORK = {
    ...
    ...
    ...
    'DEFAULT_THROTTLE_CLASSES': (
        'rest_framework.throttling.UserRateThrottle',

    ),
    'DEFAULT_THROTTLE_RATES': {
        'loginAttempts': '6/hr',
        'user': '1000/min',
    }
}
👤 Mr Singh

1 👍

I know this is a pretty old thread, and the accepted answer was helpful for me as well. I wanted to show how you can have multiple user rate throttles in place, in this case with additional, separate rates for root users (superusers).

from rest_framework.settings import api_settings
from django.core.exceptions import ImproperlyConfigured

class RootRateThrottle(UserRateThrottle):
    """
    Limits the rate of API calls that may be made by a given user, with a
    separate rate table for superusers.

    The user id will be used as a unique cache key if the user is
    authenticated.  For anonymous requests, the IP address of the request will
    be used.

    The rate for the throttle's ``scope`` is read from
    ``settings.REST_FRAMEWORK["ROOT_THROTTLE_RATES"]`` when the requesting
    user is a superuser and from ``api_settings.DEFAULT_THROTTLE_RATES``
    otherwise.
    """

    def get_cache_key(self, request, view):
        """
        Return the cache key for this request, or None to skip throttling.

        As a side effect, recomputes ``rate``, ``num_requests`` and
        ``duration`` for the requesting user, because the rate depends on
        who is asking.
        """
        if request.user.is_authenticated:
            ident = request.user.pk
        else:
            ident = self.get_ident(request)

        self.rate = self.get_rate(request)
        logger.debug(
            "Throttling rate for %s: %s", request.user, self.rate
        )

        # A rate of None means "do not throttle this scope". Returning None
        # here avoids feeding None into parse_rate and the later arithmetic.
        # NOTE(review): relies on allow_request treating a None key as
        # "allow" - confirm against the base class in use.
        if self.rate is None:
            return None

        self.num_requests, self.duration = self.parse_rate(self.rate)
        return self.cache_format % {
            'scope': self.scope,
            'ident': ident
        }

    def get_rate(self, request=None):
        """
        Determine the string representation of the allowed request rate.

        Without a request (or for a non-superuser) the default rate table is
        used; for a superuser the root rate table is used.

        Raises ImproperlyConfigured when the throttle has no scope, when the
        root rate table is missing, or when the chosen table has no entry
        for the scope.
        """
        if not getattr(self, 'scope', None):
            msg = ("You must set either `.scope` or `.rate` for '%s' throttle" %
                   self.__class__.__name__)
            raise ImproperlyConfigured(msg)

        if request is not None and request.user.is_superuser:
            rest_settings = getattr(settings, 'REST_FRAMEWORK', None) or {}
            throttle_rates = rest_settings.get("ROOT_THROTTLE_RATES")
            if throttle_rates is None:
                # Report a configuration problem instead of a bare KeyError.
                raise ImproperlyConfigured(
                    "ROOT_THROTTLE_RATES is not set in REST_FRAMEWORK settings"
                )
            missing_msg = "No root throttle rate set for '%s' scope"
        else:
            throttle_rates = api_settings.DEFAULT_THROTTLE_RATES
            missing_msg = "No default throttle rate set for '%s' scope"

        try:
            return throttle_rates[self.scope]
        except KeyError:
            raise ImproperlyConfigured(missing_msg % self.scope)

class ByMinuteRateThrottle(RootRateThrottle):
    """Root-aware throttle whose rate is looked up under the 'minute' scope."""
    scope = 'minute'


class ByHourRateThrottle(RootRateThrottle):
    """Root-aware throttle whose rate is looked up under the 'hour' scope."""
    scope = 'hour'


class ByDayRateThrottle(RootRateThrottle):
    """Root-aware throttle whose rate is looked up under the 'day' scope."""
    scope = 'day'

The settings part then looks like this:

'DEFAULT_THROTTLE_CLASSES': [
    'threedi_api.throttling.ByMinuteRateThrottle',
    'threedi_api.throttling.ByHourRateThrottle',
    'threedi_api.throttling.ByDayRateThrottle',
],
'DEFAULT_THROTTLE_RATES': {
    'minute': '100/min',
    'hour': '1000/hour',
    'day': '5000/day',
},
'ROOT_THROTTLE_RATES': {
    'minute': '200/min',
    'hour': '2000/hour',
    'day': '10000/day',
},
👤 LarsVegas

Leave a comment