fix(rab): run async background boundary refresh on detached session #17441
nbayati wants to merge 17 commits into
Prevents fast primary API calls from closing the underlying aiohttp session mid-flight and breaking background Regional Access Boundary (RAB) lookups. Adds a clone() method to async Request adapters to run background refreshes on an independent session, closing it cleanly when finished.
Code Review
This pull request introduces request cloning for asynchronous regional access boundary refreshes to prevent race conditions where a foreground call closes a shared session. It adds a clone() method to the aiohttp request adapters and ensures that cloned requests are cleanly closed. The review feedback recommends ensuring the inspect module is imported and improving the exception handling when closing the cloned request by catching general exceptions and logging a warning instead of silently ignoring specific errors.
        await self._session.close()
        self._closed = True

    def clone(self) -> "Request":
The current clone() implementation creates a brand-new, default ClientSession. This drops all customized connection-level configurations on the original transport, such as proxies, SSL/TLS contexts, and private CA certificates (which are configured on the underlying TCPConnector), along with session-level default headers, cookies, basic auth, and timeouts. This will cause requests to fail in secure enterprise environments requiring custom proxy or cert setups.
Done. I updated Request.clone() to fully reconstruct the underlying network connector (TCPConnector and UnixConnector) so that custom SSL contexts, IP bindings, and connection limits are preserved. I also made sure that all session-level defaults (including headers, cookies, basic auth, and custom timeouts) are completely mirrored in the cloned session.
    def _clone(self) -> "Request":
        """Creates a copy of this request adapter.
        The base implementation returns `self` (an identical shared instance).
nit: I still think a name other than clone should be considered. Gemini suggests _isolate() or _branch(). But this doesn't matter too much if it's internal
    return new_request, cloned_callable, is_cloned


async def _close_cloned_request(lookup_request, is_cloned):
nit: It seems like _prepare_async_lookup_callable and _close_cloned_request would be a good fit for a context manager. That would let us encapsulate these three variables, and enforce automatic closing.
Gemini put this together:
@contextlib.asynccontextmanager
async def _managed_lookup_callable(request):
    """An async context manager that prepares a cloned lookup callable
    and guarantees its transport is closed on exit.
    """
    lookup_callable, lookup_request, is_cloned = _prepare_async_lookup_callable(request)
    try:
        yield lookup_callable
    finally:
        await _close_cloned_request(lookup_request, is_cloned)

# ... Inside your class/function where _worker is defined:
async def _worker():
    try:
        async with _managed_lookup_callable(request) as lookup_callable:
            regional_access_boundary_info = (
                await credentials._lookup_regional_access_boundary(lookup_callable)
            )
    except Exception as e:
        if _helpers.is_logging_enabled(_LOGGER):
            _LOGGER.warning(
                "Failed regional access boundary lookup: %s",
                e,
                exc_info=True
            )
        regional_access_boundary_info = None
But this is just a suggestion that came to mind, I think it's fine to merge as-is too.
lsirac
left a comment
Here are 3 critical, subtle architectural findings regarding the custom _clone() implementation that was recently introduced for async Regional Access Boundary refreshes.
# Copy underlying connection pool settings (SSL context, IP bindings, limits).
orig_connector = getattr(self._session, "_connector", None)
if orig_connector and not orig_connector.closed:
    if isinstance(orig_connector, aiohttp.TCPConnector):
The _clone() implementation explicitly checks only for standard TCPConnector and UnixConnector instances. If the original session is configured with a custom, proxy, or subclassed connector (such as corporate SOCKS or tunneling proxies), the check falls through and the cloned session is created with a default, direct-connection TCPConnector.
This silently drops the proxy/custom configuration and routes traffic directly over the public internet, which will fail or violate security constraints in enterprise/isolated cloud environments. We should either explicitly support proxy preservation or raise a clear transport exception if an unsupported custom connector is detected.
That's a really good point.
We can't really support proxy preservation because third-party aiohttp connectors have arbitrary, unknown constructor signatures (meaning we have no way to instantiate a fresh detached copy of them dynamically), and simply shallow-copying the existing connector is unsafe due to shared socket pools. This leaves us two options: fall back to reusing the customer's transport and hope that we don't encounter the bug this PR is trying to fix, or raise the exception as you suggested and accept this as a limitation of RAB.
I've decided not to fall back to reusing the customer's transport when we can't clone it. The problem isn't only that the RAB call would fail: if the foreground task closes the session while the background worker is actively reading from it, the forceful mid-flight socket truncation can leave complex corporate proxy connections in a hung or corrupted state, so the effects wouldn't be limited to our RAB calls. So I've added the else: raise exceptions.TransportError(...) block, since raising the error here is the safest path. The exception will trigger the 15-minute cooldown and allow the user's main request to proceed safely.
I thought about disabling RAB permanently if we can't clone the transport (thinking what's the point of entering cooldown if we're going to keep trying to clone it and fail), but decided against it. I realized that because credentials objects are frequently instantiated globally and shared across multiple different clients and API surfaces, there's a chance that the next call would be executed over entirely different transports, making the RAB call possible.
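To illustrate the fail-closed choice discussed above, here is a minimal sketch that uses stand-in classes rather than aiohttp or google-auth. `TCPConnector`, `SocksConnector`, `TransportError`, and `clone_connector` are hypothetical names for this example only, and the exact-type comparison is a choice made for the sketch (the PR snippet under review uses `isinstance`).

```python
class TransportError(Exception):
    """Stand-in for google.auth.exceptions.TransportError (assumption)."""


class TCPConnector:
    """Stand-in for a standard connector; the fields are hypothetical."""

    def __init__(self, ssl_context=None, limit=100):
        self.ssl_context = ssl_context
        self.limit = limit


class SocksConnector(TCPConnector):
    """Stand-in for a third-party proxy connector with its own constructor."""

    def __init__(self, proxy_url, **kwargs):
        super().__init__(**kwargs)
        self.proxy_url = proxy_url


def clone_connector(connector):
    """Rebuild a known connector type, or refuse instead of guessing."""
    # Exact type match: a subclass may carry state we cannot reconstruct.
    if type(connector) is TCPConnector:
        return TCPConnector(
            ssl_context=connector.ssl_context,
            limit=connector.limit,
        )
    # Fail closed rather than silently dropping proxy configuration.
    raise TransportError(
        f"Cannot clone connector of type {type(connector).__name__}"
    )


original = TCPConnector(ssl_context="custom-ca", limit=5)
copy = clone_connector(original)

try:
    clone_connector(SocksConnector("socks5://proxy.example"))
    unsupported_raised = False
except TransportError:
    unsupported_raised = True
```

In this sketch the caller would treat the raised error like any other failed lookup, which matches the cooldown behavior described in the reply.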
… aiohttp cloning and add unit tests
    lookup_callable,
    lookup_request,
    is_cloned,
) = _prepare_async_lookup_callable(request)
With the current implementation, the ClientSession can still be closed and the request can still fail: because the cloned request is created inside the background worker, start_refresh() may return before _clone() runs.
Done. I've updated the implementation to perform request cloning synchronously inside start_refresh before dispatching the background _worker task, ensuring the session configuration is captured before the foreground block exits.
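The ordering fix can be sketched roughly as follows. `FakeRequest` is a hypothetical stand-in, not the google-auth adapter, and this `start_refresh` only mirrors the idea of cloning before the task is scheduled; it is not the PR's actual code.

```python
import asyncio


class FakeRequest:
    """Hypothetical request adapter used only for this illustration."""

    def __init__(self, config):
        self.config = config
        self.closed = False

    def clone(self):
        # Cloning a closed adapter is treated as an error in this sketch.
        if self.closed:
            raise RuntimeError("Session is closed")
        return FakeRequest(dict(self.config))

    async def close(self):
        self.closed = True


def start_refresh(request):
    # Clone synchronously, while the caller's session is still open.
    detached = request.clone()

    async def _worker():
        try:
            await asyncio.sleep(0)  # stands in for the boundary lookup
            return detached.config
        finally:
            await detached.close()  # the worker owns the detached copy

    return asyncio.create_task(_worker())


async def main():
    request = FakeRequest({"timeout": 30})
    task = start_refresh(request)
    await request.close()  # foreground finishes before the worker runs
    return await task


result = asyncio.run(main())
```

If `request.clone()` were moved inside `_worker`, the `await request.close()` in `main` would run first and the clone would raise, which is the failure mode the reviewer pointed out.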
Returns:
    google.auth.aio.transport.aiohttp.Request: A request adapter copy
        running a new aiohttp.ClientSession with identical connection,
        proxy, and session configurations.
This docstring can be reworded. I think the current implementation of clone does not actually clone everything. It also doesn't mention unsupported cases.
Done. Reworded the docstring to document that only standard TCP and Unix connector types are supported, that the DNS resolver is explicitly excluded from the clone, and documented the TransportError exception.
maybe_coro = lookup_request.close()
if is_async := inspect.iscoroutine(maybe_coro):
I believe we are missing valid awaitable cases here as written (e.g. a Future returned by a custom transport).
Done. Updated _close_cloned_request to check inspect.isawaitable instead of inspect.iscoroutine to support asyncio.Future and other awaitables, and added a unit test.
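The difference between the two checks can be seen in a small standard-library sketch. `close_if_needed`, `FutureTransport`, and `SyncTransport` are hypothetical names for illustration and are not part of the PR.

```python
import asyncio
import inspect


async def close_if_needed(transport):
    """Call close() and await the result only when it is awaitable."""
    result = transport.close()
    if inspect.isawaitable(result):
        await result


class FutureTransport:
    """Hypothetical transport whose close() returns an asyncio.Future."""

    def __init__(self):
        self.closed = False

    def close(self):
        future = asyncio.get_running_loop().create_future()
        self.closed = True
        future.set_result(None)
        return future


class SyncTransport:
    """Hypothetical transport with a plain synchronous close()."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


async def main():
    future_transport = FutureTransport()
    sync_transport = SyncTransport()
    await close_if_needed(future_transport)
    await close_if_needed(sync_transport)

    # A Future is awaitable but is not a coroutine object.
    probe = asyncio.get_running_loop().create_future()
    probe.set_result(None)
    return (
        future_transport.closed,
        sync_transport.closed,
        inspect.iscoroutine(probe),
        inspect.isawaitable(probe),
    )


outcome = asyncio.run(main())
```

With `inspect.iscoroutine`, the Future returned by `FutureTransport.close()` would have been skipped and never awaited.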
…losure race in RAB background refresh
When AuthorizedSession.request() makes an API call, it runs inside a temporary aiohttp ClientSession block. If our background Regional Access Boundary (RAB) refresh worker naively shares this exact same session, a fast primary call (like an instant 401/403 or a quick CRM check) will exit its block and close the active socket mid-flight. This causes the background worker to silently fail with "RuntimeError: Session is closed" and forces the RAB manager into a 15-minute cooldown.
This commit solves the race condition by giving the background worker its own separate transport session:
To recreate the bug consistently and verify our fix, I used a reproduction script (paste/6312404345552896) that simulates a fast main API call (like an instant 401/403 Edge drop or highly optimized endpoint) by cancelling the primary request 30ms after it is dispatched.
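The race described above can be approximated with a stand-in session. This is not the referenced reproduction script: `FakeSession`, its `get` method, and the delays are assumptions chosen so that the foreground call finishes before the background lookup does.

```python
import asyncio


class FakeSession:
    """Hypothetical session that fails once it has been closed."""

    def __init__(self):
        self.closed = False

    async def get(self, delay):
        await asyncio.sleep(delay)  # simulated network latency
        if self.closed:
            raise RuntimeError("Session is closed")
        return "ok"

    async def close(self):
        self.closed = True


async def background_lookup(session):
    try:
        return await session.get(delay=0.05)
    except RuntimeError as exc:
        return f"failed: {exc}"


async def foreground_call(shared):
    session = FakeSession()
    # Either reuse the foreground session or give the worker its own.
    lookup_session = session if shared else FakeSession()
    task = asyncio.create_task(background_lookup(lookup_session))
    await asyncio.sleep(0.01)  # the fast primary call returns early
    await session.close()      # leaving the temporary session block
    result = await task
    if not shared:
        await lookup_session.close()
    return result


shared_result = asyncio.run(foreground_call(shared=True))
detached_result = asyncio.run(foreground_call(shared=False))
```

In this sketch only the shared-session run fails, because the detached session's lifetime no longer depends on when the foreground call returns.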