Skip to content
This repository was archived by the owner on Oct 4, 2023. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,41 @@ Context Manager
with rate_limiter:
do_something()

Consume
~~~~~~~

For the case of limiting the rate of a specific unit metric rather than the execution
of an operation as a whole, the ``ratelimiter`` module provides the consume attribute.
This allows to limit our execution at a rate of bytes/s instead of block-operations/s.
Note that bytes/s is just an example, similar to any other unit such as database
capacitiy per second etc. The consume attribute can be modified during execution to account
for varying unit sizes.

.. code:: python

from ratelimiter import RateLimiter

rate_limiter = RateLimiter(max_calls=10, period=1, consume=2)
# Time duration of the following operation will be doubled
for i in range(100):
with rate_limiter:
do_something()

.. code:: python

from ratelimiter import RateLimiter

rate_limiter = RateLimiter(max_calls=1024, period=1)
# Stream of data
packets = [ bytearray(512), bytearray(2048), bytearray(1024), bytearray(512) ]

for packet in packets:
with rate_limiter:
send(packet)
# Consume the length of the packet
rate_limiter.consume = packet.length


Callback
~~~~~~~~

Expand Down
33 changes: 28 additions & 5 deletions ratelimiter/_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import functools
import threading
import collections
import math


class RateLimiter(object):
Expand All @@ -12,14 +13,16 @@ class RateLimiter(object):
requests for a time period.
"""

def __init__(self, max_calls, period=1.0, callback=None):
def __init__(self, max_calls, period=1.0, callback=None, consume=1):
"""Initialize a RateLimiter object which enforces as much as max_calls
operations on period (eventually floating) number of seconds.
"""
if period <= 0:
raise ValueError('Rate limiting period should be > 0')
if max_calls <= 0:
raise ValueError('Rate limiting number of calls should be > 0')
if consume < 1:
raise ValueError('Number of calls to consume should be >= 1')

# We're using a deque to store the last execution timestamps, not for
# its maxlen attribute, but to allow constant time front removal.
Expand All @@ -28,6 +31,7 @@ def __init__(self, max_calls, period=1.0, callback=None):
self.period = period
self.max_calls = max_calls
self.callback = callback
self.consume = consume
self._lock = threading.Lock()
self._alock = None

Expand All @@ -49,8 +53,9 @@ def __enter__(self):
# We want to ensure that no more than max_calls were run in the allowed
# period. For this, we store the last timestamps of each call and run
# the rate verification upon each __enter__ call.
if len(self.calls) >= self.max_calls:
until = time.time() + self.period - self._timespan
if len(self.calls) + self.consume > self.max_calls:
cycles = math.ceil(self.consume / self.max_calls)
until = time.time() + cycles * self.period - self._timespan
if self.callback:
t = threading.Thread(target=self.callback, args=(until,))
t.daemon = True
Expand All @@ -63,7 +68,8 @@ def __enter__(self):
def __exit__(self, exc_type, exc_val, exc_tb):
with self._lock:
# Store the last operation timestamp.
self.calls.append(time.time())
timestamps = [ time.time() ] * self.consume
self.calls.extend(timestamps)

# Pop the timestamp list front (ie: the older calls) until the sum goes
# back below the period. This is our 'sliding period' window.
Expand All @@ -72,4 +78,21 @@ def __exit__(self, exc_type, exc_val, exc_tb):

@property
def _timespan(self):
return self.calls[-1] - self.calls[0]
return self.calls[-1] - self.calls[0] if self.calls else 0

@property
def consume(self):
"""The consume attribute allows to modify the number of calls
that will be consumed with a single rate limited run."""
return self._consume

@consume.setter
def consume(self, value):
if value >= 1:
self._consume = value
else:
raise ValueError('Number of calls to consume should be >= 1')

@consume.deleter
def consume(self):
raise AttributeError('Consume attribute cannot be dereferenced')
76 changes: 68 additions & 8 deletions tests/test_ratelimiter.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,15 @@ def validate_call_times(self, ts, max_calls, period):
self.assertGreaterEqual(ts[i + max_calls] - ts[i], period)

def test_bad_args(self):
self.assertRaises(ValueError, RateLimiter, -1, self.period)
self.assertRaises(ValueError, RateLimiter, +1, -self.period)
self.assertRaises(ValueError, RateLimiter, -self.max_calls, self.period)
self.assertRaises(ValueError, RateLimiter, self.max_calls, -self.period)
self.assertRaises(ValueError, RateLimiter, self.max_calls, self.period, consume=0)
self.assertRaises(AttributeError, delattr, RateLimiter, 'consume')

def test_limit_1(self):
with Timer() as timer:
obj = RateLimiter(self.max_calls, self.period)
for i in range(self.max_calls + 1):
for _ in range(self.max_calls + 1):
with obj:
# After the 'self.max_calls' iteration the execution
# inside the context manager should be blocked
Expand All @@ -69,17 +71,51 @@ def test_limit_1(self):
# iterations is greater or equal to the 'self.period' then blocking
# and sleeping after the 'self.max_calls' iteration has been occured.
self.assertGreaterEqual(timer.duration, self.period)
self.assertLessEqual(timer.duration, 2 * self.period)

def test_limit_2(self):
calls = []
obj = RateLimiter(self.max_calls, self.period)
for i in range(3 * self.max_calls):
for _ in range(3 * self.max_calls):
with obj:
calls.append(time.time())
calls.extend([time.time()] * obj.consume)

self.assertEqual(len(calls), 3 * self.max_calls)
self.validate_call_times(calls, self.max_calls, self.period)

def test_limit_3(self):
with Timer() as timer:
obj = RateLimiter(self.max_calls, self.period, consume=2)
for _ in range(self.max_calls+1):
with obj:
pass

self.assertGreaterEqual(timer.duration, self.period * obj.consume)
self.assertLessEqual(timer.duration, self.period * (obj.consume + 1))

def test_limit_4(self):
calls = []
obj = RateLimiter(self.max_calls, self.period, consume=3)
for _ in range(3 * self.max_calls):
with obj:
calls.extend([time.time()] * obj.consume)

self.assertEqual(len(calls), 3 * self.max_calls * obj.consume)
self.validate_call_times(calls, self.max_calls, self.period)

def test_limit_5(self):
calls = []
obj = RateLimiter(self.max_calls, self.period)
with Timer() as timer:
while obj.consume < 2 * self.max_calls:
with obj:
calls.extend([time.time()] * obj.consume)
obj.consume += 1

self.assertEqual(len(calls), sum(range(2 * self.max_calls)))
self.assertGreaterEqual(timer.duration, self.period * math.ceil(len(calls) / self.max_calls))
self.validate_call_times(calls, 2 * self.max_calls - 1, 2 * self.period)

def test_decorator_1(self):
@RateLimiter(self.max_calls, self.period)
def f():
Expand All @@ -88,25 +124,49 @@ def f():
pass

with Timer() as timer:
[f() for i in range(self.max_calls + 1)]
[f() for _ in range(self.max_calls + 1)]

# The sum of the time in the iterations without the rate limit blocking
# is way lower than 'self.period'. If the duration of the all
# iterations is greater or equal to the 'self.period' then blocking
# and sleeping after the 'self.max_calls' iteration has been occured.
self.assertGreaterEqual(timer.duration, self.period)
self.assertLessEqual(timer.duration, 2 * self.period)

def test_decorator_2(self):
@RateLimiter(self.max_calls, self.period)
def f():
f.calls.append(time.time())
f.calls.extend([time.time()])
f.calls = []

[f() for i in range(3 * self.max_calls)]
[f() for _ in range(3 * self.max_calls)]

self.assertEqual(len(f.calls), 3 * self.max_calls)
self.validate_call_times(f.calls, self.max_calls, self.period)

def test_decorator_3(self):
consume = 2
@RateLimiter(self.max_calls, self.period, consume=consume)
def f():
pass
with Timer() as timer:
[f() for _ in range(self.max_calls + 1)]

self.assertGreaterEqual(timer.duration, self.period * consume)
self.assertLessEqual(timer.duration, self.period * (consume + 1))

def test_decorator_4(self):
consume = 3
@RateLimiter(self.max_calls, self.period, consume=consume)
def f():
f.calls.extend([time.time()] * consume)
f.calls = []

[f() for _ in range(3 * self.max_calls)]

self.assertEqual(len(f.calls), 3 * self.max_calls * consume)
self.validate_call_times(f.calls, self.max_calls, self.period)

def test_random(self):
for _ in range(10):
calls = []
Expand Down