From df41b8ce9432ca60491850e7c57463ff00146b6a Mon Sep 17 00:00:00 2001 From: Carme Pamy Date: Fri, 20 Sep 2024 15:40:53 -0400 Subject: [PATCH 1/3] feat: add functionality to recreate threads when die because redis gets disconnected or other cause + create separted redis connections to make it work with redis5 --- casbin_redis_watcher/options.py | 1 + casbin_redis_watcher/watcher.py | 153 +++++++++++++++++++++++++++----- requirements.txt | 2 +- 3 files changed, 132 insertions(+), 24 deletions(-) diff --git a/casbin_redis_watcher/options.py b/casbin_redis_watcher/options.py index 12307b1..0216217 100644 --- a/casbin_redis_watcher/options.py +++ b/casbin_redis_watcher/options.py @@ -25,6 +25,7 @@ class WatcherOptions: channel = None ignore_self = None local_ID = None + timeout = 20 optional_update_callback = None def init_config(self): diff --git a/casbin_redis_watcher/watcher.py b/casbin_redis_watcher/watcher.py index 869e6b2..c4502c9 100644 --- a/casbin_redis_watcher/watcher.py +++ b/casbin_redis_watcher/watcher.py @@ -15,24 +15,42 @@ import json import logging from threading import Thread, Lock, Event +import time from casbin.model import Model from redis.client import Redis, PubSub +from redis.backoff import ExponentialBackoff +from redis.retry import Retry as RedisRetry + from casbin_redis_watcher.options import WatcherOptions class RedisWatcher: - def __init__(self): + def __init__(self, logger=None): self.mutex: Lock = Lock() self.sub_client: PubSub = None self.pub_client: Redis = None self.options: WatcherOptions = None self.close = None + self.sleep = 0 + self.execute_update = False self.callback: callable = None self.subscribe_thread: Thread = Thread(target=self.subscribe, daemon=True) self.subscribe_event = Event() - self.logger = logging.getLogger(__name__) + + self.logger = logger if logger else logging.getLogger(__name__) + + def recreate_thread(self): + self.sleep = 10 + self.execute_update = True + self.subscribe_thread: Thread = Thread(target=self.subscribe, + daemon=True + ) + self.subscribe_event = Event() + self.close = False + self.subscribe_thread.start() + self.subscribe_event.wait(timeout=1) def init_config(self, option: WatcherOptions): if option.optional_update_callback: @@ -47,6 +65,47 @@ def set_update_callback(self, callback: callable): with self.mutex: self.callback = callback + def _get_redis_conn(self): + """ + Creates a new redis connection instance + """ + rds = Redis(host=self.options.host, port=self.options.port, + password=self.options.password, + ssl=self.options.ssl, + retry=RedisRetry(ExponentialBackoff(), 3) + ) + return rds + + def init_publisher_subscriber(self, init_pub=True, init_sub=True): + """ + Initialize the publisher and subscriber subscribers + NOTE: A new Redis connection is created for the publisher and subscriber because since Redis5 + the connection needs to be created by thread + Args: + init_pub (bool, optional): Whether to initialize the publisher subscriber. Defaults to True. + init_sub (bool, optional): Whether to initialize the publisher subscriber. Defaults to True. + """ + try: + if init_pub: + rds = self._get_redis_conn() + if not rds.ping(): + raise Exception("Redis not responding.") + self.pub_client = rds.client() + + if init_sub: + rds = self._get_redis_conn() + if not rds.ping(): + raise Exception("Redis not responding.") + self.sub_client = rds.client().pubsub() + except Exception as e: + if self.pub_client: + self.pub_client.close() + if self.sub_client: + self.sub_client.close() + self.pub_client = None + self.sub_client = None + print(f"Casbin Redis Watcher error: {e}. Publisher/Subscriber failed to be initialized {self.options.local_ID}") + def update(self): def func(): with self.mutex: @@ -103,12 +162,16 @@ def func(): def default_callback_func(msg: str): print("callback: " + msg) - @staticmethod - def log_record(f: callable): + def log_record(self, f: callable): try: + if not self.pub_client: + rds = self._get_redis_conn() + self.pub_client = rds.client() result = f() except Exception as e: - print(f"Casbin Redis Watcher error: {e}") + if self.pub_client: + self.pub_client.close() + print(f"Casbin Redis Watcher error: {e}. Publisher failure on the worker {self.options.local_ID}") else: return result @@ -117,13 +180,63 @@ def unsubscribe(psc: PubSub): return psc.unsubscribe() def subscribe(self): - self.sub_client.subscribe(self.options.channel) - for item in self.sub_client.listen(): - if not self.subscribe_event.is_set(): - self.subscribe_event.set() - if item is not None and item["type"] == "message": - with self.mutex: - self.callback(str(item)) + time.sleep(self.sleep) + try: + if not self.sub_client: + rds = self._get_redis_conn() + self.sub_client = rds.client().pubsub() + self.sub_client.subscribe(self.options.channel) + print(f"Waiting for casbin updates... in the worker: {self.options.local_ID}") + if self.execute_update: + self.update() + try: + for item in self.sub_client.listen(): + if not self.subscribe_event.is_set(): + self.subscribe_event.set() + if item is not None and item["type"] == "message": + try: + with self.mutex: + self.callback(str(item)) + except Exception as listen_exc: + print("Casbin Redis watcher failed sending update to teh callback function " + " process due to: {}".format(str(listen_exc))) + if self.sub_client: + self.sub_client.close() + break + except Exception as sub_exc: + print("Casbin Redis watcher failed to get message from redis due to {}".format(str(sub_exc))) + if self.sub_client: + self.sub_client.close() + except Exception as redis_exc: + print("Casbin Redis watcher failed to subscribe due to: {}" + .format(str(redis_exc))) + finally: + if self.sub_client: + self.sub_client.close() + + def should_reload(self, recreate=True): + """ + Checks is the thread and event are still alive, if they are not they are recreated. + If they were recreated the watcher should reload the policies. + Args: + recreate(bool): recreates the thread if it's dead for redis timeouts + """ + try: + if self.subscribe_thread.is_alive() and self.subscribe_event.is_set(): + return False + else: + if recreate and not self.subscribe_thread.is_alive(): + print(f"Casbin Redis Watcher will be recreated for the worker {self.options.local_ID} in 10 secs.") + self.recreate_thread() + return True + except Exception: + return True + + def update_callback(self): + """ + This method was created to cover the function that flask_authz calls + """ + self.update() class MSG: @@ -140,18 +253,15 @@ def marshal_binary(self): @staticmethod def unmarshal_binary(data: bytes): loaded = json.loads(data) + loaded.pop("params", None) return MSG(**loaded) -def new_watcher(option: WatcherOptions): +def new_watcher(option: WatcherOptions, logger=None): option.init_config() - w = RedisWatcher() - rds = Redis(host=option.host, port=option.port, password=option.password, ssl=option.ssl) - if rds.ping() is False: - raise Exception("Redis server is not available.") - w.sub_client = rds.client().pubsub() - w.pub_client = rds.client() + w = RedisWatcher(logger) w.init_config(option) + w.init_publisher_subscriber() w.close = False w.subscribe_thread.start() w.subscribe_event.wait(timeout=5) @@ -161,10 +271,7 @@ def new_watcher(option: WatcherOptions): def new_publish_watcher(option: WatcherOptions): option.init_config() w = RedisWatcher() - rds = Redis(host=option.host, port=option.port, password=option.password, ssl=option.ssl) - if rds.ping() is False: - raise Exception("Redis server is not available.") - w.pub_client = rds.client() w.init_config(option) + w.init_publisher_subscriber(init_sub=False) w.close = False return w diff --git a/requirements.txt b/requirements.txt index 3c34b5b..7551798 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,2 @@ casbin~=1.18 -redis==4.5.2 +redis From 54db533b43e1db463523a063d3a33a6d9e234595 Mon Sep 17 00:00:00 2001 From: Carme Pamy Date: Fri, 20 Sep 2024 15:44:39 -0400 Subject: [PATCH 2/3] ci: only run the release job in master and format code --- .github/workflows/release.yml | 1 + casbin_redis_watcher/options.py | 1 - casbin_redis_watcher/watcher.py | 30 ++++++++++++++++-------------- 3 files changed, 17 insertions(+), 15 deletions(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 32fbcd4..d24beb8 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -72,6 +72,7 @@ jobs: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} release: + if: ${{ github.ref_name == 'master' }} name: Release runs-on: ubuntu-latest needs: [ test, coveralls ] diff --git a/casbin_redis_watcher/options.py b/casbin_redis_watcher/options.py index 0216217..12307b1 100644 --- a/casbin_redis_watcher/options.py +++ b/casbin_redis_watcher/options.py @@ -25,7 +25,6 @@ class WatcherOptions: channel = None ignore_self = None local_ID = None - timeout = 20 optional_update_callback = None def init_config(self): diff --git a/casbin_redis_watcher/watcher.py b/casbin_redis_watcher/watcher.py index c4502c9..6bbed56 100644 --- a/casbin_redis_watcher/watcher.py +++ b/casbin_redis_watcher/watcher.py @@ -22,7 +22,6 @@ from redis.backoff import ExponentialBackoff from redis.retry import Retry as RedisRetry - from casbin_redis_watcher.options import WatcherOptions @@ -44,9 +43,7 @@ def __init__(self, logger=None): def recreate_thread(self): self.sleep = 10 self.execute_update = True - self.subscribe_thread: Thread = Thread(target=self.subscribe, - daemon=True - ) + self.subscribe_thread: Thread = Thread(target=self.subscribe, daemon=True) self.subscribe_event = Event() self.close = False self.subscribe_thread.start() @@ -69,11 +66,13 @@ def _get_redis_conn(self): """ Creates a new redis connection instance """ - rds = Redis(host=self.options.host, port=self.options.port, - password=self.options.password, - ssl=self.options.ssl, - retry=RedisRetry(ExponentialBackoff(), 3) - ) + rds = Redis( + host=self.options.host, + port=self.options.port, + password=self.options.password, + ssl=self.options.ssl, + retry=RedisRetry(ExponentialBackoff(), 3), + ) return rds def init_publisher_subscriber(self, init_pub=True, init_sub=True): @@ -104,7 +103,9 @@ def init_publisher_subscriber(self, init_pub=True, init_sub=True): self.sub_client.close() self.pub_client = None self.sub_client = None - print(f"Casbin Redis Watcher error: {e}. Publisher/Subscriber failed to be initialized {self.options.local_ID}") + print( + f"Casbin Redis Watcher error: {e}. Publisher/Subscriber failed to be initialized {self.options.local_ID}" + ) def update(self): def func(): @@ -198,8 +199,10 @@ def subscribe(self): with self.mutex: self.callback(str(item)) except Exception as listen_exc: - print("Casbin Redis watcher failed sending update to teh callback function " - " process due to: {}".format(str(listen_exc))) + print( + "Casbin Redis watcher failed sending update to teh callback function " + " process due to: {}".format(str(listen_exc)) + ) if self.sub_client: self.sub_client.close() break @@ -208,8 +211,7 @@ def subscribe(self): if self.sub_client: self.sub_client.close() except Exception as redis_exc: - print("Casbin Redis watcher failed to subscribe due to: {}" - .format(str(redis_exc))) + print("Casbin Redis watcher failed to subscribe due to: {}".format(str(redis_exc))) finally: if self.sub_client: self.sub_client.close() From eafe0317a6fd95050adfb5757826e49d0e1e3a8e Mon Sep 17 00:00:00 2001 From: Carme Pamy Date: Wed, 7 May 2025 14:51:16 -0400 Subject: [PATCH 3/3] ci: revert change --- .github/workflows/release.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index d24beb8..32fbcd4 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -72,7 +72,6 @@ jobs: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} release: - if: ${{ github.ref_name == 'master' }} name: Release runs-on: ubuntu-latest needs: [ test, coveralls ]