Skip to content

Commit 624ec90

Browse files
authored
feat: Add changes to allow redis-watcher to work with flask authz + redis 5
1 parent 2ba1131 commit 624ec90

2 files changed

Lines changed: 133 additions & 24 deletions

File tree

redis_watcher/watcher.py

Lines changed: 132 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -15,24 +15,39 @@
1515
import json
1616
import logging
1717
from threading import Thread, Lock, Event
18+
import time
1819

1920
from casbin.model import Model
2021
from redis.client import Redis, PubSub
22+
from redis.backoff import ExponentialBackoff
23+
from redis.retry import Retry as RedisRetry
2124

2225
from redis_watcher.options import WatcherOptions
2326

2427

2528
class RedisWatcher:
26-
def __init__(self):
29+
def __init__(self, logger=None):
2730
self.mutex: Lock = Lock()
2831
self.sub_client: PubSub = None
2932
self.pub_client: Redis = None
3033
self.options: WatcherOptions = None
3134
self.close = None
35+
self.sleep = 0
36+
self.execute_update = False
3237
self.callback: callable = None
3338
self.subscribe_thread: Thread = Thread(target=self.subscribe, daemon=True)
3439
self.subscribe_event = Event()
35-
self.logger = logging.getLogger(__name__)
40+
41+
self.logger = logger if logger else logging.getLogger(__name__)
42+
43+
def recreate_thread(self):
44+
self.sleep = 10
45+
self.execute_update = True
46+
self.subscribe_thread: Thread = Thread(target=self.subscribe, daemon=True)
47+
self.subscribe_event = Event()
48+
self.close = False
49+
self.subscribe_thread.start()
50+
self.subscribe_event.wait(timeout=1)
3651

3752
def init_config(self, option: WatcherOptions):
3853
if option.optional_update_callback:
@@ -47,6 +62,51 @@ def set_update_callback(self, callback: callable):
4762
with self.mutex:
4863
self.callback = callback
4964

65+
def _get_redis_conn(self):
66+
"""
67+
Creates a new redis connection instance
68+
"""
69+
rds = Redis(
70+
host=self.options.host,
71+
port=self.options.port,
72+
password=self.options.password,
73+
ssl=self.options.ssl,
74+
retry=RedisRetry(ExponentialBackoff(), 3),
75+
)
76+
return rds
77+
78+
def init_publisher_subscriber(self, init_pub=True, init_sub=True):
79+
"""
80+
Initialize the publisher and subscriber subscribers
81+
NOTE: A new Redis connection is created for the publisher and subscriber because since Redis5
82+
the connection needs to be created by thread
83+
Args:
84+
init_pub (bool, optional): Whether to initialize the publisher subscriber. Defaults to True.
85+
init_sub (bool, optional): Whether to initialize the publisher subscriber. Defaults to True.
86+
"""
87+
try:
88+
if init_pub:
89+
rds = self._get_redis_conn()
90+
if not rds.ping():
91+
raise Exception("Redis not responding.")
92+
self.pub_client = rds.client()
93+
94+
if init_sub:
95+
rds = self._get_redis_conn()
96+
if not rds.ping():
97+
raise Exception("Redis not responding.")
98+
self.sub_client = rds.client().pubsub()
99+
except Exception as e:
100+
if self.pub_client:
101+
self.pub_client.close()
102+
if self.sub_client:
103+
self.sub_client.close()
104+
self.pub_client = None
105+
self.sub_client = None
106+
print(
107+
f"Casbin Redis Watcher error: {e}. Publisher/Subscriber failed to be initialized {self.options.local_ID}"
108+
)
109+
50110
def update(self):
51111
def func():
52112
with self.mutex:
@@ -103,12 +163,16 @@ def func():
103163
def default_callback_func(msg: str):
104164
print("callback: " + msg)
105165

106-
@staticmethod
107-
def log_record(f: callable):
166+
def log_record(self, f: callable):
108167
try:
168+
if not self.pub_client:
169+
rds = self._get_redis_conn()
170+
self.pub_client = rds.client()
109171
result = f()
110172
except Exception as e:
111-
print(f"Casbin Redis Watcher error: {e}")
173+
if self.pub_client:
174+
self.pub_client.close()
175+
print(f"Casbin Redis Watcher error: {e}. Publisher failure on the worker {self.options.local_ID}")
112176
else:
113177
return result
114178

@@ -117,13 +181,64 @@ def unsubscribe(psc: PubSub):
117181
return psc.unsubscribe()
118182

119183
def subscribe(self):
120-
self.sub_client.subscribe(self.options.channel)
121-
for item in self.sub_client.listen():
122-
if not self.subscribe_event.is_set():
123-
self.subscribe_event.set()
124-
if item is not None and item["type"] == "message":
125-
with self.mutex:
126-
self.callback(str(item))
184+
time.sleep(self.sleep)
185+
try:
186+
if not self.sub_client:
187+
rds = self._get_redis_conn()
188+
self.sub_client = rds.client().pubsub()
189+
self.sub_client.subscribe(self.options.channel)
190+
print(f"Waiting for casbin updates... in the worker: {self.options.local_ID}")
191+
if self.execute_update:
192+
self.update()
193+
try:
194+
for item in self.sub_client.listen():
195+
if not self.subscribe_event.is_set():
196+
self.subscribe_event.set()
197+
if item is not None and item["type"] == "message":
198+
try:
199+
with self.mutex:
200+
self.callback(str(item))
201+
except Exception as listen_exc:
202+
print(
203+
"Casbin Redis watcher failed sending update to teh callback function "
204+
" process due to: {}".format(str(listen_exc))
205+
)
206+
if self.sub_client:
207+
self.sub_client.close()
208+
break
209+
except Exception as sub_exc:
210+
print("Casbin Redis watcher failed to get message from redis due to {}".format(str(sub_exc)))
211+
if self.sub_client:
212+
self.sub_client.close()
213+
except Exception as redis_exc:
214+
print("Casbin Redis watcher failed to subscribe due to: {}".format(str(redis_exc)))
215+
finally:
216+
if self.sub_client:
217+
self.sub_client.close()
218+
219+
def should_reload(self, recreate=True):
220+
"""
221+
Checks is the thread and event are still alive, if they are not they are recreated.
222+
If they were recreated the watcher should reload the policies.
223+
Args:
224+
recreate(bool): recreates the thread if it's dead for redis timeouts
225+
"""
226+
try:
227+
if self.subscribe_thread.is_alive() and self.subscribe_event.is_set():
228+
return False
229+
else:
230+
if recreate and not self.subscribe_thread.is_alive():
231+
print(f"Casbin Redis Watcher will be recreated for the worker {self.options.local_ID} in 10 secs.")
232+
self.recreate_thread()
233+
return True
234+
except Exception:
235+
return True
236+
237+
def update_callback(self):
238+
"""
239+
This method was created to cover the function that flask_authz calls
240+
"""
241+
self.update()
127242

128243

129244
class MSG:
@@ -140,18 +255,15 @@ def marshal_binary(self):
140255
@staticmethod
141256
def unmarshal_binary(data: bytes):
142257
loaded = json.loads(data)
258+
loaded.pop("params", None)
143259
return MSG(**loaded)
144260

145261

146-
def new_watcher(option: WatcherOptions):
262+
def new_watcher(option: WatcherOptions, logger=None):
147263
option.init_config()
148-
w = RedisWatcher()
149-
rds = Redis(host=option.host, port=option.port, password=option.password, ssl=option.ssl)
150-
if rds.ping() is False:
151-
raise Exception("Redis server is not available.")
152-
w.sub_client = rds.client().pubsub()
153-
w.pub_client = rds.client()
264+
w = RedisWatcher(logger)
154265
w.init_config(option)
266+
w.init_publisher_subscriber()
155267
w.close = False
156268
w.subscribe_thread.start()
157269
w.subscribe_event.wait(timeout=5)
@@ -161,10 +273,7 @@ def new_watcher(option: WatcherOptions):
161273
def new_publish_watcher(option: WatcherOptions):
162274
option.init_config()
163275
w = RedisWatcher()
164-
rds = Redis(host=option.host, port=option.port, password=option.password, ssl=option.ssl)
165-
if rds.ping() is False:
166-
raise Exception("Redis server is not available.")
167-
w.pub_client = rds.client()
168276
w.init_config(option)
277+
w.init_publisher_subscriber(init_sub=False)
169278
w.close = False
170279
return w

requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
casbin~=1.18
2-
redis==4.5.2
2+
redis

0 commit comments

Comments
 (0)