diff --git a/README.rst b/README.rst index 5fc9551..43b93d2 100644 --- a/README.rst +++ b/README.rst @@ -40,6 +40,41 @@ Context Manager with rate_limiter: do_something() +Consume +~~~~~~~ + +For the case of limiting the rate of a specific unit metric rather than the execution +of an operation as a whole, the ``ratelimiter`` module provides the consume attribute. +This allows limiting execution at a rate of bytes/s instead of block-operations/s. +Note that bytes/s is just an example, similar to any other unit such as database +capacity per second etc. The consume attribute can be modified during execution to account +for varying unit sizes. + +.. code:: python + + from ratelimiter import RateLimiter + + rate_limiter = RateLimiter(max_calls=10, period=1, consume=2) + # Time duration of the following operation will be doubled + for i in range(100): + with rate_limiter: + do_something() + +.. code:: python + + from ratelimiter import RateLimiter + + rate_limiter = RateLimiter(max_calls=1024, period=1) + # Stream of data + packets = [ bytearray(512), bytearray(2048), bytearray(1024), bytearray(512) ] + + for packet in packets: + with rate_limiter: + send(packet) + # Consume the length of the packet + rate_limiter.consume = len(packet) + + Callback ~~~~~~~~ diff --git a/ratelimiter/_sync.py b/ratelimiter/_sync.py index 577dfb4..d9c7ac2 100644 --- a/ratelimiter/_sync.py +++ b/ratelimiter/_sync.py @@ -4,6 +4,7 @@ import functools import threading import collections +import math class RateLimiter(object): @@ -12,7 +13,7 @@ class RateLimiter(object): requests for a time period. """ - def __init__(self, max_calls, period=1.0, callback=None): + def __init__(self, max_calls, period=1.0, callback=None, consume=1): """Initialize a RateLimiter object which enforces as much as max_calls operations on period (eventually floating) number of seconds. 
""" @@ -20,6 +21,8 @@ def __init__(self, max_calls, period=1.0, callback=None): raise ValueError('Rate limiting period should be > 0') if max_calls <= 0: raise ValueError('Rate limiting number of calls should be > 0') + if consume < 1: + raise ValueError('Number of calls to consume should be >= 1') # We're using a deque to store the last execution timestamps, not for # its maxlen attribute, but to allow constant time front removal. @@ -28,6 +31,7 @@ def __init__(self, max_calls, period=1.0, callback=None): self.period = period self.max_calls = max_calls self.callback = callback + self.consume = consume self._lock = threading.Lock() self._alock = None @@ -49,8 +53,9 @@ def __enter__(self): # We want to ensure that no more than max_calls were run in the allowed # period. For this, we store the last timestamps of each call and run # the rate verification upon each __enter__ call. - if len(self.calls) >= self.max_calls: - until = time.time() + self.period - self._timespan + if len(self.calls) + self.consume > self.max_calls: + cycles = math.ceil(self.consume / self.max_calls) + until = time.time() + cycles * self.period - self._timespan if self.callback: t = threading.Thread(target=self.callback, args=(until,)) t.daemon = True @@ -63,7 +68,8 @@ def __enter__(self): def __exit__(self, exc_type, exc_val, exc_tb): with self._lock: # Store the last operation timestamp. - self.calls.append(time.time()) + timestamps = [ time.time() ] * self.consume + self.calls.extend(timestamps) # Pop the timestamp list front (ie: the older calls) until the sum goes # back below the period. This is our 'sliding period' window. 
@@ -72,4 +78,21 @@ def __exit__(self, exc_type, exc_val, exc_tb): @property def _timespan(self): - return self.calls[-1] - self.calls[0] + return self.calls[-1] - self.calls[0] if self.calls else 0 + + @property + def consume(self): + """The consume attribute allows to modify the number of calls + that will be consumed with a single rate limited run.""" + return self._consume + + @consume.setter + def consume(self, value): + if value >= 1: + self._consume = value + else: + raise ValueError('Number of calls to consume should be >= 1') + + @consume.deleter + def consume(self): + raise AttributeError('Consume attribute cannot be dereferenced') diff --git a/tests/test_ratelimiter.py b/tests/test_ratelimiter.py index abe5ed4..58df8e0 100644 --- a/tests/test_ratelimiter.py +++ b/tests/test_ratelimiter.py @@ -52,13 +52,15 @@ def validate_call_times(self, ts, max_calls, period): self.assertGreaterEqual(ts[i + max_calls] - ts[i], period) def test_bad_args(self): - self.assertRaises(ValueError, RateLimiter, -1, self.period) - self.assertRaises(ValueError, RateLimiter, +1, -self.period) + self.assertRaises(ValueError, RateLimiter, -self.max_calls, self.period) + self.assertRaises(ValueError, RateLimiter, self.max_calls, -self.period) + self.assertRaises(ValueError, RateLimiter, self.max_calls, self.period, consume=0) + self.assertRaises(AttributeError, delattr, RateLimiter, 'consume') def test_limit_1(self): with Timer() as timer: obj = RateLimiter(self.max_calls, self.period) - for i in range(self.max_calls + 1): + for _ in range(self.max_calls + 1): with obj: # After the 'self.max_calls' iteration the execution # inside the context manager should be blocked @@ -69,17 +71,51 @@ def test_limit_1(self): # iterations is greater or equal to the 'self.period' then blocking # and sleeping after the 'self.max_calls' iteration has been occured. 
self.assertGreaterEqual(timer.duration, self.period) + self.assertLessEqual(timer.duration, 2 * self.period) def test_limit_2(self): calls = [] obj = RateLimiter(self.max_calls, self.period) - for i in range(3 * self.max_calls): + for _ in range(3 * self.max_calls): with obj: - calls.append(time.time()) + calls.extend([time.time()] * obj.consume) self.assertEqual(len(calls), 3 * self.max_calls) self.validate_call_times(calls, self.max_calls, self.period) + def test_limit_3(self): + with Timer() as timer: + obj = RateLimiter(self.max_calls, self.period, consume=2) + for _ in range(self.max_calls+1): + with obj: + pass + + self.assertGreaterEqual(timer.duration, self.period * obj.consume) + self.assertLessEqual(timer.duration, self.period * (obj.consume + 1)) + + def test_limit_4(self): + calls = [] + obj = RateLimiter(self.max_calls, self.period, consume=3) + for _ in range(3 * self.max_calls): + with obj: + calls.extend([time.time()] * obj.consume) + + self.assertEqual(len(calls), 3 * self.max_calls * obj.consume) + self.validate_call_times(calls, self.max_calls, self.period) + + def test_limit_5(self): + calls = [] + obj = RateLimiter(self.max_calls, self.period) + with Timer() as timer: + while obj.consume < 2 * self.max_calls: + with obj: + calls.extend([time.time()] * obj.consume) + obj.consume += 1 + + self.assertEqual(len(calls), sum(range(2 * self.max_calls))) + self.assertGreaterEqual(timer.duration, self.period * math.ceil(len(calls) / self.max_calls)) + self.validate_call_times(calls, 2 * self.max_calls - 1, 2 * self.period) + def test_decorator_1(self): @RateLimiter(self.max_calls, self.period) def f(): @@ -88,25 +124,49 @@ def f(): pass with Timer() as timer: - [f() for i in range(self.max_calls + 1)] + [f() for _ in range(self.max_calls + 1)] # The sum of the time in the iterations without the rate limit blocking # is way lower than 'self.period'. 
If the duration of the all # iterations is greater or equal to the 'self.period' then blocking # and sleeping after the 'self.max_calls' iteration has been occured. self.assertGreaterEqual(timer.duration, self.period) + self.assertLessEqual(timer.duration, 2 * self.period) def test_decorator_2(self): @RateLimiter(self.max_calls, self.period) def f(): - f.calls.append(time.time()) + f.calls.extend([time.time()]) f.calls = [] - [f() for i in range(3 * self.max_calls)] + [f() for _ in range(3 * self.max_calls)] self.assertEqual(len(f.calls), 3 * self.max_calls) self.validate_call_times(f.calls, self.max_calls, self.period) + def test_decorator_3(self): + consume = 2 + @RateLimiter(self.max_calls, self.period, consume=consume) + def f(): + pass + with Timer() as timer: + [f() for _ in range(self.max_calls + 1)] + + self.assertGreaterEqual(timer.duration, self.period * consume) + self.assertLessEqual(timer.duration, self.period * (consume + 1)) + + def test_decorator_4(self): + consume = 3 + @RateLimiter(self.max_calls, self.period, consume=consume) + def f(): + f.calls.extend([time.time()] * consume) + f.calls = [] + + [f() for _ in range(3 * self.max_calls)] + + self.assertEqual(len(f.calls), 3 * self.max_calls * consume) + self.validate_call_times(f.calls, self.max_calls, self.period) + def test_random(self): for _ in range(10): calls = []