Skip to content

Commit 9c1dee7

Browse files
committed
shared event hub connection across sessions
1 parent 4f26f7f commit 9c1dee7

4 files changed

Lines changed: 738 additions & 70 deletions

File tree

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
# :coding: utf-8
2+
# :copyright: Copyright (c) 2024 ftrack
3+
4+
"""Session-scoped EventHub proxy for tracking subscriptions."""
5+
6+
from __future__ import absolute_import
7+
8+
from builtins import object
9+
10+
11+
class SessionEventHubProxy(object):
12+
"""Proxy for EventHub that tracks subscriptions per session.
13+
14+
This wrapper intercepts subscribe() and unsubscribe() calls to maintain
15+
a session-local list of subscriber IDs. When the session is closed, only
16+
the subscribers created by that session are unsubscribed, leaving other
17+
sessions' subscribers intact on the shared EventHub.
18+
"""
19+
20+
def __init__(self, event_hub, session):
21+
"""Initialize the proxy.
22+
23+
Args:
24+
event_hub: The underlying EventHub instance (may be shared).
25+
session: The Session instance that owns this proxy.
26+
"""
27+
self._event_hub = event_hub
28+
self._session = session
29+
30+
def subscribe(self, subscription, callback, subscriber=None, priority=100):
31+
"""Subscribe to events and track the subscriber ID for this session.
32+
33+
All arguments are passed through to the underlying EventHub.subscribe().
34+
The returned subscriber ID is tracked for cleanup when this session closes.
35+
36+
Args:
37+
subscription (str): The subscription expression.
38+
callback (callable): The callback to invoke when matching events occur.
39+
subscriber (dict): Optional subscriber metadata.
40+
priority (int): Optional priority (lower = earlier execution).
41+
42+
Returns:
43+
str: The subscriber identifier.
44+
"""
45+
subscriber_id = self._event_hub.subscribe(
46+
subscription, callback, subscriber=subscriber, priority=priority
47+
)
48+
49+
# Track this subscriber for session cleanup
50+
if not hasattr(self._session, '_session_subscribers'):
51+
self._session._session_subscribers = []
52+
self._session._session_subscribers.append(subscriber_id)
53+
54+
return subscriber_id
55+
56+
def unsubscribe(self, subscriber_identifier):
57+
"""Unsubscribe and remove from session tracking.
58+
59+
Args:
60+
subscriber_identifier (str): The subscriber identifier to unsubscribe.
61+
"""
62+
self._event_hub.unsubscribe(subscriber_identifier)
63+
64+
# Remove from session tracking
65+
if hasattr(self._session, '_session_subscribers'):
66+
if subscriber_identifier in self._session._session_subscribers:
67+
self._session._session_subscribers.remove(
68+
subscriber_identifier)
69+
70+
def __getattr__(self, name):
71+
"""Delegate all other method/attribute access to the underlying EventHub.
72+
73+
This allows the proxy to be used transparently as if it were the
74+
actual EventHub instance.
75+
76+
Args:
77+
name (str): The attribute name to access.
78+
79+
Returns:
80+
The attribute from the underlying EventHub.
81+
"""
82+
return getattr(self._event_hub, name)
83+
84+
def __repr__(self):
85+
"""Return string representation."""
86+
return '<SessionEventHubProxy for {0}>'.format(self._event_hub)
Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
# :coding: utf-8
2+
# :copyright: Copyright (c) 2024 ftrack
3+
4+
"""EventHub singleton registry for connection sharing across sessions."""
5+
6+
from __future__ import absolute_import
7+
8+
import threading
9+
import weakref
10+
import logging
11+
12+
13+
class EventHubRegistry(object):
14+
"""Global registry for shared EventHub instances.
15+
16+
Manages EventHub instances to ensure that sessions with identical
17+
credentials can share the same WebSocket connection to the server.
18+
19+
This prevents connection exhaustion when multiple Session instances
20+
are created with the same credentials.
21+
"""
22+
23+
def __init__(self):
24+
"""Initialize the registry."""
25+
self._hubs = {} # (server_url, api_user, api_key) -> EventHub
26+
self._hub_sessions = {} # hub_key -> WeakSet of session weakrefs
27+
self._hub_connect_locks = {} # hub_key -> Lock (for first connection)
28+
self._lock = threading.RLock()
29+
self.logger = logging.getLogger(
30+
__name__ + '.' + self.__class__.__name__)
31+
32+
def get_or_create(
33+
self,
34+
server_url,
35+
api_user,
36+
api_key,
37+
headers=None,
38+
cookies=None,
39+
auto_connect=False,
40+
):
41+
"""Get existing EventHub or create new one for credentials.
42+
43+
Args:
44+
server_url (str): The ftrack server URL.
45+
api_user (str): The API user to authenticate as.
46+
api_key (str): The API key to authenticate with.
47+
headers (dict): Optional custom headers.
48+
cookies (dict): Optional custom cookies.
49+
auto_connect (bool): Whether to automatically connect the hub.
50+
51+
Returns:
52+
EventHub: Shared EventHub instance for these credentials.
53+
54+
Note:
55+
Headers and cookies are only used when creating a new hub.
56+
If a hub already exists for these credentials, the provided
57+
headers/cookies are ignored (the existing hub is returned).
58+
"""
59+
# Create key from credentials
60+
# Note: headers/cookies not included in key for simplicity
61+
# This means sessions with same credentials share hub even if headers differ
62+
key = (server_url, api_user, api_key)
63+
64+
with self._lock:
65+
if key not in self._hubs:
66+
# Import here to avoid circular dependency
67+
import ftrack_api.event.hub
68+
69+
# Create new hub
70+
hub = ftrack_api.event.hub.EventHub(
71+
server_url, api_user, api_key, headers=headers, cookies=cookies
72+
)
73+
74+
self._hubs[key] = hub
75+
self._hub_sessions[key] = weakref.WeakSet()
76+
self._hub_connect_locks[key] = threading.Lock()
77+
78+
self.logger.debug(
79+
'Created new shared EventHub for {0}@{1}'.format(
80+
api_user, server_url
81+
)
82+
)
83+
84+
# Connect if requested
85+
if auto_connect:
86+
self._ensure_connected(key, hub)
87+
else:
88+
existing_hub = self._hubs[key]
89+
self.logger.debug(
90+
'Reusing existing EventHub for {0}@{1} ({2} active sessions)'.format(
91+
api_user, server_url, len(self._hub_sessions[key])
92+
)
93+
)
94+
95+
# If auto_connect requested and hub not connected, connect it
96+
if auto_connect and not existing_hub.connected:
97+
self._ensure_connected(key, existing_hub)
98+
99+
hub = existing_hub
100+
101+
return hub
102+
103+
def _ensure_connected(self, hub_key, hub):
104+
"""Ensure the hub is connected (thread-safe).
105+
106+
Args:
107+
hub_key: The hub registry key.
108+
hub: The EventHub instance to connect.
109+
"""
110+
# Use per-hub lock to ensure only one thread initiates connection
111+
with self._hub_connect_locks[hub_key]:
112+
if not hub.connected and not hub._connection_initialised:
113+
hub.init_connection()
114+
# Connect in background thread
115+
connect_thread = threading.Thread(target=hub.connect)
116+
connect_thread.daemon = True
117+
connect_thread.start()
118+
119+
def register_session(self, hub_key, session_weakref):
120+
"""Register a session as using this hub.
121+
122+
Args:
123+
hub_key: The hub registry key (server_url, api_user, api_key).
124+
session_weakref: Weak reference to the session.
125+
"""
126+
with self._lock:
127+
if hub_key in self._hub_sessions:
128+
self._hub_sessions[hub_key].add(session_weakref)
129+
130+
def on_session_deleted(self, hub_key):
131+
"""Callback when a session is garbage collected.
132+
133+
Args:
134+
hub_key: The hub registry key.
135+
"""
136+
with self._lock:
137+
if hub_key not in self._hub_sessions:
138+
return
139+
140+
# WeakSet automatically removes dead references
141+
active_sessions = len(self._hub_sessions[hub_key])
142+
143+
if active_sessions == 0:
144+
# No more sessions using this hub - disconnect and clean up
145+
hub = self._hubs.pop(hub_key, None)
146+
self._hub_sessions.pop(hub_key, None)
147+
self._hub_connect_locks.pop(hub_key, None)
148+
149+
if hub:
150+
try:
151+
if hub.connected:
152+
# Disconnect without unsubscribing - subscribers already cleaned up
153+
hub.disconnect(unsubscribe=False)
154+
self.logger.debug(
155+
'Disconnected unused shared EventHub for {0}@{1}'.format(
156+
hub_key[1], hub_key[0]
157+
)
158+
)
159+
except Exception as error:
160+
self.logger.debug(
161+
'Error disconnecting unused EventHub: {0}'.format(
162+
error)
163+
)
164+
165+
def get_session_count(self, hub_key):
166+
"""Get the number of active sessions for a hub.
167+
168+
Args:
169+
hub_key: The hub registry key.
170+
171+
Returns:
172+
int: Number of active sessions using this hub.
173+
"""
174+
with self._lock:
175+
if hub_key in self._hub_sessions:
176+
return len(self._hub_sessions[hub_key])
177+
return 0
178+
179+
180+
# Global singleton instance
181+
_GLOBAL_EVENT_HUB_REGISTRY = EventHubRegistry()
182+
183+
184+
def get_event_hub_registry():
185+
"""Get the global EventHub registry instance.
186+
187+
Returns:
188+
EventHubRegistry: The global registry instance.
189+
"""
190+
return _GLOBAL_EVENT_HUB_REGISTRY

0 commit comments

Comments
 (0)