diff --git a/google/auth/_constants.py b/google/auth/_constants.py index 28e47025f..45c5b785c 100644 --- a/google/auth/_constants.py +++ b/google/auth/_constants.py @@ -1,5 +1,5 @@ """Shared constants.""" -_SERVICE_ACCOUNT_TRUST_BOUNDARY_LOOKUP_ENDPOINT = "https://iamcredentials.{universe_domain}/v1/projects/-/serviceAccounts/{service_account_email}/allowedLocations" -_WORKFORCE_POOL_TRUST_BOUNDARY_LOOKUP_ENDPOINT = "https://iamcredentials.{universe_domain}/v1/locations/global/workforcePools/{pool_id}/allowedLocations" -_WORKLOAD_IDENTITY_POOL_TRUST_BOUNDARY_LOOKUP_ENDPOINT = "https://iamcredentials.{universe_domain}/v1/projects/{project_number}/locations/global/workloadIdentityPools/{pool_id}/allowedLocations" +_SERVICE_ACCOUNT_REGIONAL_ACCESS_BOUNDARY_LOOKUP_ENDPOINT = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/{service_account_email}/allowedLocations" +_WORKFORCE_POOL_REGIONAL_ACCESS_BOUNDARY_LOOKUP_ENDPOINT = "https://iamcredentials.googleapis.com/v1/locations/global/workforcePools/{pool_id}/allowedLocations" +_WORKLOAD_IDENTITY_POOL_REGIONAL_ACCESS_BOUNDARY_LOOKUP_ENDPOINT = "https://iamcredentials.googleapis.com/v1/projects/{project_number}/locations/global/workloadIdentityPools/{pool_id}/allowedLocations" diff --git a/google/auth/_helpers.py b/google/auth/_helpers.py index 750631aa5..5a7e71623 100644 --- a/google/auth/_helpers.py +++ b/google/auth/_helpers.py @@ -314,8 +314,7 @@ def get_bool_from_env(variable_name, default=False): The environment variable is interpreted as a boolean with the following (case-insensitive) rules: - "true", "1" are considered true. - - "false", "0" are considered false. - Any other values will raise an exception. + - Any other value (or unset) is considered false. Args: variable_name (str): The name of the environment variable. @@ -324,10 +323,6 @@ def get_bool_from_env(variable_name, default=False): Returns: bool: The boolean value of the environment variable. - - Raises: - google.auth.exceptions.InvalidValue: If the environment variable is - set to a value that can not be interpreted as a boolean. """ value = os.environ.get(variable_name) @@ -338,14 +333,8 @@ def get_bool_from_env(variable_name, default=False): if value in ("true", "1"): return True - elif value in ("false", "0"): - return False else: - raise exceptions.InvalidValue( - 'Environment variable "{}" must be one of "true", "false", "1", or "0".'.format( - variable_name - ) - ) + return False def is_python_3(): diff --git a/google/auth/_regional_access_boundary_utils.py b/google/auth/_regional_access_boundary_utils.py new file mode 100644 index 000000000..c878a83a8 --- /dev/null +++ b/google/auth/_regional_access_boundary_utils.py @@ -0,0 +1,111 @@ +"""Utilities for Regional Access Boundary management.""" + +import datetime +import logging +import threading + +from google.auth import _helpers + +_LOGGER = logging.getLogger(__name__) + + +# The default lifetime for a cached Regional Access Boundary. +DEFAULT_REGIONAL_ACCESS_BOUNDARY_TTL = datetime.timedelta(hours=6) + +# The period of time prior to the boundary's expiration when a background refresh +# is proactively triggered. +REGIONAL_ACCESS_BOUNDARY_REFRESH_THRESHOLD = datetime.timedelta(hours=1) + +# The initial cooldown period for a failed Regional Access Boundary lookup. +DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN = datetime.timedelta(minutes=15) + +# The maximum cooldown period for a failed Regional Access Boundary lookup. +MAX_REGIONAL_ACCESS_BOUNDARY_COOLDOWN = datetime.timedelta(hours=6) + + +class _RegionalAccessBoundaryRefreshThread(threading.Thread): + """Thread for background refreshing of the Regional Access Boundary.""" + + def __init__(self, credentials, request): + super(_RegionalAccessBoundaryRefreshThread, self).__init__() + self.daemon = True + self._credentials = credentials + self._request = request + + def run(self): + """ + Performs the Regional Access Boundary lookup and updates the credential's state. + + This method is run in a separate thread. It delegates the actual lookup + to the credentials object's `_lookup_regional_access_boundary` method. + Based on the lookup's outcome (success or complete failure after retries), + it updates the credential's cached Regional Access Boundary information, + its expiry, its cooldown expiry, and its exponential cooldown duration. + """ + regional_access_boundary_info = ( + self._credentials._lookup_regional_access_boundary(self._request) + ) + + with self._credentials._stale_boundary_lock: # Acquire the lock + if regional_access_boundary_info: + # On success, update the boundary and its expiry, and clear any cooldown. + self._credentials._regional_access_boundary = ( + regional_access_boundary_info + ) + self._credentials._regional_access_boundary_expiry = ( + _helpers.utcnow() + DEFAULT_REGIONAL_ACCESS_BOUNDARY_TTL + ) + self._credentials._regional_access_boundary_cooldown_expiry = None + # Reset the cooldown duration on success. + self._credentials._current_rab_cooldown_duration = ( + DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN + ) + if _helpers.is_logging_enabled(_LOGGER): + _LOGGER.debug( + "Asynchronous Regional Access Boundary lookup successful." + ) + else: + # On complete failure, calculate the next exponential cooldown duration and set the cooldown expiry. + if _helpers.is_logging_enabled(_LOGGER): + _LOGGER.warning( + "Asynchronous Regional Access Boundary lookup failed. Entering cooldown." + ) + self._credentials._regional_access_boundary_cooldown_expiry = ( + _helpers.utcnow() + self._credentials._current_rab_cooldown_duration + ) + new_cooldown_duration = ( + self._credentials._current_rab_cooldown_duration * 2 + ) + self._credentials._current_rab_cooldown_duration = min( + new_cooldown_duration, MAX_REGIONAL_ACCESS_BOUNDARY_COOLDOWN + ) + # If the proactive refresh failed, clear any existing expired RAB data. + # This ensures we don't continue using stale data. + self._credentials._regional_access_boundary = None + self._credentials._regional_access_boundary_expiry = None + + +class _RegionalAccessBoundaryRefreshManager(object): + """Manages a thread for background refreshing of the Regional Access Boundary.""" + + def __init__(self): + self._lock = threading.Lock() + self._worker = None + + def start_refresh(self, credentials, request): + """ + Starts a background thread to refresh the Regional Access Boundary if one is not already running. + + Args: + credentials (CredentialsWithRegionalAccessBoundary): The credentials + to refresh. + request (google.auth.transport.Request): The object used to make + HTTP requests. + """ + with self._lock: + if self._worker and self._worker.is_alive(): + # A refresh is already in progress. + return + + self._worker = _RegionalAccessBoundaryRefreshThread(credentials, request) + self._worker.start() diff --git a/google/auth/compute_engine/credentials.py b/google/auth/compute_engine/credentials.py index 9507e837f..9af8aa020 100644 --- a/google/auth/compute_engine/credentials.py +++ b/google/auth/compute_engine/credentials.py @@ -20,7 +20,9 @@ """ import datetime +import logging +from google.auth import _constants from google.auth import _helpers from google.auth import credentials from google.auth import exceptions @@ -30,16 +32,14 @@ from google.auth.compute_engine import _metadata from google.oauth2 import _client -_TRUST_BOUNDARY_LOOKUP_ENDPOINT = ( - "https://iamcredentials.{}/v1/projects/-/serviceAccounts/{}/allowedLocations" -) +_LOGGER = logging.getLogger(__name__) class Credentials( credentials.Scoped, credentials.CredentialsWithQuotaProject, credentials.CredentialsWithUniverseDomain, - credentials.CredentialsWithTrustBoundary, + credentials.CredentialsWithRegionalAccessBoundary, ): """Compute Engine Credentials. @@ -66,7 +66,6 @@ def __init__( scopes=None, default_scopes=None, universe_domain=None, - trust_boundary=None, ): """ Args: @@ -82,7 +81,6 @@ def __init__( provided or None, credential will attempt to fetch the value from metadata server. If metadata server doesn't have universe domain endpoint, then the default googleapis.com will be used. - trust_boundary (Mapping[str,str]): A credential trust boundary. """ super(Credentials, self).__init__() self._service_account_email = service_account_email @@ -93,7 +91,6 @@ def __init__( if universe_domain: self._universe_domain = universe_domain self._universe_domain_cached = True - self._trust_boundary = trust_boundary def _retrieve_info(self, request): """Retrieve information about the service account. @@ -146,8 +143,8 @@ def _perform_refresh_token(self, request): new_exc = exceptions.RefreshError(caught_exc) raise new_exc from caught_exc - def _build_trust_boundary_lookup_url(self): - """Builds and returns the URL for the trust boundary lookup API for GCE.""" + def _build_regional_access_boundary_lookup_url(self): + """Builds and returns the URL for the regional access boundary lookup API for GCE.""" # If the service account email is 'default', we need to get the # actual email address from the metadata server. if self._service_account_email == "default": @@ -157,24 +154,26 @@ def _build_trust_boundary_lookup_url(self): try: info = _metadata.get_service_account_info(request, "default") if not info or "email" not in info: - raise exceptions.RefreshError( + _LOGGER.error( "Unexpected response from metadata server: " - "service account info is missing 'email' field." + "service account info is missing 'email' field. Cannot build Regional Access Boundary lookup URL." ) + return None self._service_account_email = info["email"] except exceptions.TransportError as e: # If fetching the service account email fails due to a transport error, - # it means we cannot build the trust boundary lookup URL. - # Wrap this in a RefreshError so it's caught by _refresh_trust_boundary. - raise exceptions.RefreshError( - "Failed to get service account email for trust boundary lookup: {}".format( - e - ) - ) from e + # it means we cannot build the regional access boundary lookup URL. + _LOGGER.error( + "Failed to get service account email to build Regional Access Boundary lookup URL: %s", + e, + ) + return None - return _TRUST_BOUNDARY_LOOKUP_ENDPOINT.format( - self.universe_domain, self.service_account_email + return ( + _constants._SERVICE_ACCOUNT_REGIONAL_ACCESS_BOUNDARY_LOOKUP_ENDPOINT.format( + service_account_email=self.service_account_email + ) ) @property @@ -211,17 +210,22 @@ def get_cred_info(self): "principal": self.service_account_email, } - @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) - def with_quota_project(self, quota_project_id): + def _make_copy(self): creds = self.__class__( service_account_email=self._service_account_email, - quota_project_id=quota_project_id, + quota_project_id=self._quota_project_id, scopes=self._scopes, default_scopes=self._default_scopes, universe_domain=self._universe_domain, - trust_boundary=self._trust_boundary, ) creds._universe_domain_cached = self._universe_domain_cached + self._copy_regional_access_boundary_state(creds) + return creds + + @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) + def with_quota_project(self, quota_project_id): + creds = self._make_copy() + creds._quota_project_id = quota_project_id return creds @_helpers.copy_docstring(credentials.Scoped) @@ -229,39 +233,16 @@ def with_scopes(self, scopes, default_scopes=None): # Compute Engine credentials can not be scoped (the metadata service # ignores the scopes parameter). App Engine, Cloud Run and Flex support # requesting scopes. - creds = self.__class__( - scopes=scopes, - default_scopes=default_scopes, - service_account_email=self._service_account_email, - quota_project_id=self._quota_project_id, - universe_domain=self._universe_domain, - trust_boundary=self._trust_boundary, - ) - creds._universe_domain_cached = self._universe_domain_cached + creds = self._make_copy() + creds._scopes = scopes + creds._default_scopes = default_scopes return creds @_helpers.copy_docstring(credentials.CredentialsWithUniverseDomain) def with_universe_domain(self, universe_domain): - return self.__class__( - scopes=self._scopes, - default_scopes=self._default_scopes, - service_account_email=self._service_account_email, - quota_project_id=self._quota_project_id, - trust_boundary=self._trust_boundary, - universe_domain=universe_domain, - ) - - @_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary) - def with_trust_boundary(self, trust_boundary): - creds = self.__class__( - service_account_email=self._service_account_email, - quota_project_id=self._quota_project_id, - scopes=self._scopes, - default_scopes=self._default_scopes, - universe_domain=self._universe_domain, - trust_boundary=trust_boundary, - ) - creds._universe_domain_cached = self._universe_domain_cached + creds = self._make_copy() + creds._universe_domain = universe_domain + creds._universe_domain_cached = True return creds diff --git a/google/auth/credentials.py b/google/auth/credentials.py index cdb206532..5629c3872 100644 --- a/google/auth/credentials.py +++ b/google/auth/credentials.py @@ -19,17 +19,17 @@ from enum import Enum import logging import os -from typing import List +import threading +from urllib.parse import urlparse from google.auth import _helpers, environment_vars +from google.auth import _regional_access_boundary_utils from google.auth import exceptions from google.auth import metrics from google.auth._credentials_base import _BaseCredentials from google.auth._refresh_worker import RefreshThreadManager DEFAULT_UNIVERSE_DOMAIN = "googleapis.com" -NO_OP_TRUST_BOUNDARY_LOCATIONS: List[str] = [] -NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS = "0x0" _LOGGER = logging.getLogger("google.auth._default") @@ -61,10 +61,6 @@ def __init__(self): If this is None, the token is assumed to never expire.""" self._quota_project_id = None """Optional[str]: Project to use for quota and billing purposes.""" - self._trust_boundary = None - """Optional[dict]: Cache of a trust boundary response which has a list - of allowed regions and an encoded string representation of credentials - trust boundary.""" self._universe_domain = DEFAULT_UNIVERSE_DOMAIN """Optional[str]: The universe domain value, default is googleapis.com """ @@ -290,8 +286,21 @@ def with_universe_domain(self, universe_domain): ) -class CredentialsWithTrustBoundary(Credentials): - """Abstract base for credentials supporting ``with_trust_boundary`` factory""" +class CredentialsWithRegionalAccessBoundary(Credentials): + """Abstract base for credentials supporting regional access boundary configuration.""" + + def __init__(self, *args, **kwargs): + super(CredentialsWithRegionalAccessBoundary, self).__init__(*args, **kwargs) + self._regional_access_boundary = None + self._regional_access_boundary_expiry = None + self._regional_access_boundary_cooldown_expiry = None + self._regional_access_boundary_refresh_manager = ( + _regional_access_boundary_utils._RegionalAccessBoundaryRefreshManager() + ) + self._current_rab_cooldown_duration = ( + _regional_access_boundary_utils.DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN + ) + self._stale_boundary_lock = threading.Lock() @abc.abstractmethod def _perform_refresh_token(self, request): @@ -307,29 +316,110 @@ def _perform_refresh_token(self, request): """ raise NotImplementedError("_perform_refresh_token must be implemented") - def with_trust_boundary(self, trust_boundary): - """Returns a copy of these credentials with a modified trust boundary. - + def _with_regional_access_boundary(self, regional_access_boundary): + """Returns a copy of these credentials with a modified Regional Access Boundary. + This is an internal method used by credential factory methods (e.g., from_info) + to seed the RAB cache when a new boundary is fetched or explicitly provided. + Because this represents a newly acquired boundary, it is granted a fresh + default TTL and any previous cooldowns are cleared. To copy an existing boundary's + state (including its remaining TTL and cooldowns), use `_copy_regional_access_boundary_state`. Args: - trust_boundary Mapping[str, str]: The trust boundary to use for the - credential. This should be a map with a "locations" key that maps to - a list of GCP regions, and a "encodedLocations" key that maps to a - hex string. - + regional_access_boundary (dict): Must contain an "encodedLocations" key. Returns: google.auth.credentials.Credentials: A new credentials instance. + Raises: + google.auth.exceptions.InvalidValue: If the input is malformed. + """ + if ( + not isinstance(regional_access_boundary, dict) + or "encodedLocations" not in regional_access_boundary + ): + raise exceptions.InvalidValue( + "regional_access_boundary must be a dictionary with an 'encodedLocations' key." + ) + + new_creds = self._make_copy() + new_creds._regional_access_boundary = regional_access_boundary + + new_creds._regional_access_boundary_expiry = ( + _helpers.utcnow() + + _regional_access_boundary_utils.DEFAULT_REGIONAL_ACCESS_BOUNDARY_TTL + ) + + new_creds._regional_access_boundary_cooldown_expiry = None + + return new_creds + + def _copy_regional_access_boundary_state(self, target): + """Copies the regional access boundary state to another instance.""" + target._regional_access_boundary = self._regional_access_boundary + target._regional_access_boundary_expiry = self._regional_access_boundary_expiry + target._regional_access_boundary_cooldown_expiry = ( + self._regional_access_boundary_cooldown_expiry + ) + target._current_rab_cooldown_duration = self._current_rab_cooldown_duration + # Create a new lock for the target instance to ensure independent thread-safety. + target._stale_boundary_lock = threading.Lock() + + def _maybe_start_regional_access_boundary_refresh(self, request, url): + """ + Starts a background thread to refresh the Regional Access Boundary if needed. + + This method checks if a refresh is necessary and if one is not already + in progress or in a cooldown period. If so, it starts a background + thread to perform the lookup. + + Args: + request (google.auth.transport.Request): The object used to make + HTTP requests. + url (str): The URL of the request. """ - raise NotImplementedError("This credential does not support trust boundaries.") + try: + # Do not perform a lookup if the request is for a regional endpoint. + hostname = urlparse(url).hostname + if hostname and ( + hostname.endswith(".rep.googleapis.com") + or hostname.endswith(".rep.sandbox.googleapis.com") + ): + return + except (ValueError, TypeError): + # If the URL is malformed, proceed with the default lookup behavior. + pass + + # A refresh is only needed if the feature is enabled. + if not self._is_regional_access_boundary_lookup_required(): + return - def _is_trust_boundary_lookup_required(self): - """Checks if a trust boundary lookup is required. + # Don't start a new refresh if the Regional Access Boundary info is still fresh. + refresh_threshold = ( + _regional_access_boundary_utils.REGIONAL_ACCESS_BOUNDARY_REFRESH_THRESHOLD + ) + if ( + self._regional_access_boundary + and self._regional_access_boundary_expiry + and _helpers.utcnow() + < (self._regional_access_boundary_expiry - refresh_threshold) + ): + return + + # Don't start a new refresh if the cooldown is still in effect. + if ( + self._regional_access_boundary_cooldown_expiry + and _helpers.utcnow() < self._regional_access_boundary_cooldown_expiry + ): + return + + # If all checks pass, start the background refresh. + self._regional_access_boundary_refresh_manager.start_refresh(self, request) + + def _is_regional_access_boundary_lookup_required(self): + """Checks if a Regional Access Boundary lookup is required. A lookup is required if the feature is enabled via an environment - variable, the universe domain is supported, and a no-op boundary - is not already cached. + variable and the universe domain is supported. Returns: - bool: True if a trust boundary lookup is required, False otherwise. + bool: True if a Regional Access Boundary lookup is required, False otherwise. """ # 1. Check if the feature is enabled via environment variable. if not _helpers.get_bool_from_env( @@ -337,111 +427,87 @@ def _is_trust_boundary_lookup_required(self): ): return False - # 2. Skip trust boundary flow for non-default universe domains. + # 2. Skip for non-default universe domains. if self.universe_domain != DEFAULT_UNIVERSE_DOMAIN: return False - # 3. Do not trigger refresh if credential has a cached no-op trust boundary. - return not self._has_no_op_trust_boundary() + return True - def _get_trust_boundary_header(self): - if self._trust_boundary is not None: - if self._has_no_op_trust_boundary(): - # STS expects an empty string if the trust boundary value is no-op. - return {"x-allowed-locations": ""} - else: - return {"x-allowed-locations": self._trust_boundary["encodedLocations"]} + def _get_regional_access_boundary_header(self): + if self._regional_access_boundary is not None: + return { + "x-allowed-locations": self._regional_access_boundary[ + "encodedLocations" + ] + } return {} def apply(self, headers, token=None): """Apply the token to the authentication header.""" super().apply(headers, token) - headers.update(self._get_trust_boundary_header()) - def refresh(self, request): - """Refreshes the access token and the trust boundary. + boundary_header = self._get_regional_access_boundary_header() + if boundary_header: + headers.update(boundary_header) + else: + # If we have no boundary to add, ensure the header is not present + # from a previous, stale state. We use pop() with a default to + # avoid a KeyError if the header was never there. + headers.pop("x-allowed-locations", None) - This method calls the subclass's token refresh logic and then - refreshes the trust boundary if applicable. + def before_request(self, request, method, url, headers): + """Refreshes the access token and triggers the Regional Access Boundary + lookup if necessary. """ - self._perform_refresh_token(request) - self._refresh_trust_boundary(request) - - def _refresh_trust_boundary(self, request): - """Triggers a refresh of the trust boundary and updates the cache if necessary. + super(CredentialsWithRegionalAccessBoundary, self).before_request( + request, method, url, headers + ) + self._maybe_start_regional_access_boundary_refresh(request, url) - Args: - request (google.auth.transport.Request): The object used to make - HTTP requests. + def refresh(self, request): + """Refreshes the access token. - Raises: - google.auth.exceptions.RefreshError: If the trust boundary could - not be refreshed and no cached value is available. + This method calls the subclass's token refresh logic. The Regional + Access Boundary is refreshed separately in a non-blocking way. """ - if not self._is_trust_boundary_lookup_required(): - return - try: - self._trust_boundary = self._lookup_trust_boundary(request) - except exceptions.RefreshError as error: - # If the call to the lookup API failed, check if there is a trust boundary - # already cached. If there is, do nothing. If not, then throw the error. - if self._trust_boundary is None: - raise error - if _helpers.is_logging_enabled(_LOGGER): - _LOGGER.debug( - "Using cached trust boundary due to refresh error: %s", error - ) - return + self._perform_refresh_token(request) - def _lookup_trust_boundary(self, request): - """Calls the trust boundary lookup API to refresh the trust boundary cache. + def _lookup_regional_access_boundary(self, request): + """Calls the Regional Access Boundary lookup API to retrieve the Regional Access Boundary information. Args: request (google.auth.transport.Request): The object used to make HTTP requests. Returns: - trust_boundary (dict): The trust boundary object returned by the lookup API. - - Raises: - google.auth.exceptions.RefreshError: If the trust boundary could not be - retrieved. + Optional[dict]: The Regional Access Boundary information returned by the lookup API, or None if the lookup failed. """ from google.oauth2 import _client - url = self._build_trust_boundary_lookup_url() + url = self._build_regional_access_boundary_lookup_url() if not url: - raise exceptions.InvalidValue("Failed to build trust boundary lookup URL.") + _LOGGER.error("Failed to build Regional Access Boundary lookup URL.") + return None headers = {} self._apply(headers) - headers.update(self._get_trust_boundary_header()) - return _client._lookup_trust_boundary(request, url, headers=headers) + headers.update(self._get_regional_access_boundary_header()) + return _client._lookup_regional_access_boundary(request, url, headers=headers) @abc.abstractmethod - def _build_trust_boundary_lookup_url(self): + def _build_regional_access_boundary_lookup_url(self): """ - Builds and returns the URL for the trust boundary lookup API. + Builds and returns the URL for the Regional Access Boundary lookup API. This method should be implemented by subclasses to provide the specific URL based on the credential type and its properties. Returns: - str: The URL for the trust boundary lookup endpoint, or None + str: The URL for the Regional Access Boundary lookup endpoint, or None if lookup should be skipped (e.g., for non-applicable universe domains). """ raise NotImplementedError( - "_build_trust_boundary_lookup_url must be implemented" - ) - - def _has_no_op_trust_boundary(self): - # A no-op trust boundary is indicated by encodedLocations being "0x0". - # The "locations" list may or may not be present as an empty list. - if self._trust_boundary is None: - return False - return ( - self._trust_boundary.get("encodedLocations") - == NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS + "_build_regional_access_boundary_lookup_url must be implemented" ) diff --git a/google/auth/external_account.py b/google/auth/external_account.py index 05874eda7..7caaca105 100644 --- a/google/auth/external_account.py +++ b/google/auth/external_account.py @@ -34,6 +34,7 @@ import functools import io import json +import logging import re from google.auth import _constants @@ -45,6 +46,8 @@ from google.oauth2 import sts from google.oauth2 import utils +_LOGGER = logging.getLogger(__name__) + # External account JSON type identifier. _EXTERNAL_ACCOUNT_JSON_TYPE = "external_account" # The token exchange grant_type used for exchanging credentials. @@ -82,7 +85,7 @@ class Credentials( credentials.Scoped, credentials.CredentialsWithQuotaProject, credentials.CredentialsWithTokenUri, - credentials.CredentialsWithTrustBoundary, + credentials.CredentialsWithRegionalAccessBoundary, metaclass=abc.ABCMeta, ): """Base class for all external account credentials. @@ -117,7 +120,6 @@ def __init__( default_scopes=None, workforce_pool_user_project=None, universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN, - trust_boundary=None, ): """Instantiates an external account credentials object. @@ -150,7 +152,6 @@ def __init__( billing/quota. universe_domain (str): The universe domain. The default universe domain is googleapis.com. - trust_boundary (str): String representation of trust boundary meta. Raises: google.auth.exceptions.RefreshError: If the generateAccessToken endpoint returned an error. @@ -176,7 +177,6 @@ def __init__( self._scopes = scopes self._default_scopes = default_scopes self._workforce_pool_user_project = workforce_pool_user_project - self._trust_boundary = trust_boundary if self._client_id: self._client_auth = utils.ClientAuthentication( @@ -242,7 +242,6 @@ def _constructor_args(self): "scopes": self._scopes, "default_scopes": self._default_scopes, "universe_domain": self._universe_domain, - "trust_boundary": self._trust_boundary, } if not self.is_workforce_pool: args.pop("workforce_pool_user_project") @@ -349,6 +348,7 @@ def with_scopes(self, scopes, default_scopes=None): scoped = self.__class__(**kwargs) scoped._cred_file_path = self._cred_file_path scoped._metrics_options = self._metrics_options + self._copy_regional_access_boundary_state(scoped) return scoped @abc.abstractmethod @@ -417,20 +417,9 @@ def refresh(self, request): """Refreshes the access token. For impersonated credentials, this method will refresh the underlying - source credentials and the impersonated credentials. For non-impersonated - credentials, it will refresh the access token and the trust boundary. + source credentials and the impersonated credentials. """ self._perform_refresh_token(request) - self._handle_trust_boundary(request) - - def _handle_trust_boundary(self, request): - # If we are impersonating, the trust boundary is handled by the - # impersonated credentials object. We need to get it from there. - if self._service_account_impersonation_url: - self._trust_boundary = self._impersonated_credentials._trust_boundary - else: - # Otherwise, refresh the trust boundary for the external account. - self._refresh_trust_boundary(request) def _perform_refresh_token(self, request, cert_fingerprint=None): scopes = self._scopes if self._scopes is not None else self._default_scopes @@ -486,8 +475,8 @@ def _perform_refresh_token(self, request, cert_fingerprint=None): self.expiry = now + lifetime - def _build_trust_boundary_lookup_url(self): - """Builds and returns the URL for the trust boundary lookup API.""" + def _build_regional_access_boundary_lookup_url(self): + """Builds and returns the URL for the Regional Access Boundary lookup API.""" url = None # Try to parse as a workload identity pool. # Audience format: //iam.googleapis.com/projects/PROJECT_NUMBER/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID @@ -497,8 +486,7 @@ def _build_trust_boundary_lookup_url(self): ) if workload_match: project_number, pool_id = workload_match.groups() - url = _constants._WORKLOAD_IDENTITY_POOL_TRUST_BOUNDARY_LOOKUP_ENDPOINT.format( - universe_domain=self._universe_domain, + url = _constants._WORKLOAD_IDENTITY_POOL_REGIONAL_ACCESS_BOUNDARY_LOOKUP_ENDPOINT.format( project_number=project_number, pool_id=pool_id, ) @@ -510,21 +498,26 @@ def _build_trust_boundary_lookup_url(self): ) if workforce_match: pool_id = workforce_match.groups()[0] - url = _constants._WORKFORCE_POOL_TRUST_BOUNDARY_LOOKUP_ENDPOINT.format( - universe_domain=self._universe_domain, pool_id=pool_id + url = _constants._WORKFORCE_POOL_REGIONAL_ACCESS_BOUNDARY_LOOKUP_ENDPOINT.format( + pool_id=pool_id ) if url: return url else: # If both fail, the audience format is invalid. - raise exceptions.InvalidValue("Invalid audience format.") + _LOGGER.error( + "Invalid audience format for Regional Access Boundary lookup: %s", + self._audience, + ) + return None def _make_copy(self): kwargs = self._constructor_args() new_cred = self.__class__(**kwargs) new_cred._cred_file_path = self._cred_file_path new_cred._metrics_options = self._metrics_options + self._copy_regional_access_boundary_state(new_cred) return new_cred @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) @@ -546,12 +539,6 @@ def with_universe_domain(self, universe_domain): cred._universe_domain = universe_domain return cred - @_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary) - def with_trust_boundary(self, trust_boundary): - cred = self._make_copy() - cred._trust_boundary = trust_boundary - return cred - def _should_initialize_impersonated_credentials(self): return ( self._service_account_impersonation_url is not None @@ -600,7 +587,6 @@ def _initialize_impersonated_credentials(self): lifetime=self._service_account_impersonation_options.get( "token_lifetime_seconds" ), - trust_boundary=self._trust_boundary, ) def _create_default_metrics_options(self): @@ -667,7 +653,7 @@ def from_info(cls, info, **kwargs): Raises: InvalidValue: For invalid parameters. """ - return cls( + initial_creds = cls( audience=info.get("audience"), subject_token_type=info.get("subject_token_type"), token_url=info.get("token_url"), @@ -687,10 +673,17 @@ def from_info(cls, info, **kwargs): universe_domain=info.get( "universe_domain", credentials.DEFAULT_UNIVERSE_DOMAIN ), - trust_boundary=info.get("trust_boundary"), **kwargs ) + regional_access_boundary = info.get("regional_access_boundary") + if regional_access_boundary: + initial_creds = initial_creds._with_regional_access_boundary( + regional_access_boundary + ) + + return initial_creds + @classmethod def from_file(cls, filename, **kwargs): """Creates a Credentials instance from an external account json file. diff --git a/google/auth/external_account_authorized_user.py b/google/auth/external_account_authorized_user.py index 680fce628..77fa92293 100644 --- a/google/auth/external_account_authorized_user.py +++ b/google/auth/external_account_authorized_user.py @@ -36,6 +36,7 @@ import datetime import io import json +import logging import re from google.auth import _constants @@ -45,6 +46,8 @@ from google.oauth2 import sts from google.oauth2 import utils +_LOGGER = logging.getLogger(__name__) + _EXTERNAL_ACCOUNT_AUTHORIZED_USER_JSON_TYPE = "external_account_authorized_user" @@ -52,7 +55,7 @@ class Credentials( credentials.CredentialsWithQuotaProject, credentials.ReadOnlyScoped, credentials.CredentialsWithTokenUri, - credentials.CredentialsWithTrustBoundary, + credentials.CredentialsWithRegionalAccessBoundary, ): """Credentials for External Account Authorized Users. @@ -87,7 +90,6 @@ def __init__( scopes=None, quota_project_id=None, universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN, - trust_boundary=None, ): """Instantiates a external account authorized user credentials object. @@ -113,7 +115,6 @@ def __init__( create the credentials. universe_domain (Optional[str]): The universe domain. The default value is googleapis.com. - trust_boundary (Mapping[str,str]): A credential trust boundary. Returns: google.auth.external_account_authorized_user.Credentials: The @@ -134,7 +135,6 @@ def __init__( self._scopes = scopes self._universe_domain = universe_domain or credentials.DEFAULT_UNIVERSE_DOMAIN self._cred_file_path = None - self._trust_boundary = trust_boundary if not self.valid and not self.can_refresh: raise exceptions.InvalidOperation( @@ -182,7 +182,6 @@ def constructor_args(self): "scopes": self._scopes, "quota_project_id": self._quota_project_id, "universe_domain": self._universe_domain, - "trust_boundary": self._trust_boundary, } @property @@ -308,18 +307,29 @@ def _perform_refresh_token(self, request): if "refresh_token" in response_data: self._refresh_token = response_data["refresh_token"] - def _build_trust_boundary_lookup_url(self): - """Builds and returns the URL for the trust boundary lookup API.""" + def _build_regional_access_boundary_lookup_url(self): + """Builds and returns the URL for the Regional Access Boundary lookup API. + + Returns: + Optional[str]: The URL for the Regional Access Boundary lookup endpoint, or None + if the URL cannot be built due to an invalid workforce pool audience format. + """ # Audience format: //iam.googleapis.com/locations/global/workforcePools/POOL_ID/providers/PROVIDER_ID match = re.search(r"locations/[^/]+/workforcePools/([^/]+)", self._audience) if not match: - raise exceptions.InvalidValue("Invalid workforce pool audience format.") + _LOGGER.error( + "Invalid workforce pool audience format for Regional Access Boundary lookup: %s", + self._audience, + ) + return None pool_id = match.groups()[0] - return _constants._WORKFORCE_POOL_TRUST_BOUNDARY_LOOKUP_ENDPOINT.format( - universe_domain=self._universe_domain, pool_id=pool_id + return ( + _constants._WORKFORCE_POOL_REGIONAL_ACCESS_BOUNDARY_LOOKUP_ENDPOINT.format( + pool_id=pool_id + ) ) def revoke(self, request): @@ -359,6 +369,7 @@ def _make_copy(self): kwargs = self.constructor_args() cred = self.__class__(**kwargs) cred._cred_file_path = self._cred_file_path + self._copy_regional_access_boundary_state(cred) return cred @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) @@ -379,12 +390,6 @@ def with_universe_domain(self, universe_domain): cred._universe_domain = universe_domain return cred - @_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary) - def with_trust_boundary(self, trust_boundary): - cred = self._make_copy() - cred._trust_boundary = trust_boundary - return cred - @classmethod def from_info(cls, info, **kwargs): """Creates a Credentials instance from parsed external account info. @@ -414,7 +419,7 @@ def from_info(cls, info, **kwargs): expiry = datetime.datetime.strptime( expiry.rstrip("Z").split(".")[0], "%Y-%m-%dT%H:%M:%S" ) - return cls( + initial_creds = cls( audience=info.get("audience"), refresh_token=info.get("refresh_token"), token_url=info.get("token_url"), @@ -429,10 +434,17 @@ def from_info(cls, info, **kwargs): universe_domain=info.get( "universe_domain", credentials.DEFAULT_UNIVERSE_DOMAIN ), - trust_boundary=info.get("trust_boundary"), **kwargs ) + regional_access_boundary = info.get("regional_access_boundary") + if regional_access_boundary: + initial_creds = initial_creds._with_regional_access_boundary( + regional_access_boundary + ) + + return initial_creds + @classmethod def from_file(cls, filename, **kwargs): """Creates a Credentials instance from an external account json file. diff --git a/google/auth/identity_pool.py b/google/auth/identity_pool.py index 50b2a83e4..30819ef04 100644 --- a/google/auth/identity_pool.py +++ b/google/auth/identity_pool.py @@ -572,4 +572,3 @@ def refresh(self, request): ) self._perform_refresh_token(request, cert_fingerprint=cert_fingerprint) - self._handle_trust_boundary(request) diff --git a/google/auth/impersonated_credentials.py b/google/auth/impersonated_credentials.py index 304f0606e..4d65bfd36 100644 --- a/google/auth/impersonated_credentials.py +++ b/google/auth/impersonated_credentials.py @@ -30,7 +30,9 @@ from datetime import datetime import http.client as http_client import json +import logging +from google.auth import _constants from google.auth import _exponential_backoff from google.auth import _helpers from google.auth import credentials @@ -40,15 +42,14 @@ from google.auth import metrics from google.oauth2 import _client +_LOGGER = logging.getLogger(__name__) _REFRESH_ERROR = "Unable to acquire impersonated credentials" _DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds _GOOGLE_OAUTH2_TOKEN_ENDPOINT = "https://oauth2.googleapis.com/token" -_TRUST_BOUNDARY_LOOKUP_ENDPOINT = ( - "https://iamcredentials.{}/v1/projects/-/serviceAccounts/{}/allowedLocations" -) + _SOURCE_CREDENTIAL_AUTHORIZED_USER_TYPE = "authorized_user" _SOURCE_CREDENTIAL_SERVICE_ACCOUNT_TYPE = "service_account" @@ -123,7 +124,7 @@ class Credentials( credentials.Scoped, credentials.CredentialsWithQuotaProject, credentials.Signing, - credentials.CredentialsWithTrustBoundary, + credentials.CredentialsWithRegionalAccessBoundary, ): """This module defines impersonated credentials which are essentially impersonated identities. @@ -204,7 +205,6 @@ def __init__( lifetime=_DEFAULT_TOKEN_LIFETIME_SECS, quota_project_id=None, iam_endpoint_override=None, - trust_boundary=None, ): """ Args: @@ -235,7 +235,6 @@ def __init__( subject (Optional[str]): sub field of a JWT. This field should only be set if you wish to impersonate as a user. This feature is useful when using domain wide delegation. - trust_boundary (Mapping[str,str]): A credential trust boundary. """ super(Credentials, self).__init__() @@ -267,7 +266,6 @@ def __init__( self._quota_project_id = quota_project_id self._iam_endpoint_override = iam_endpoint_override self._cred_file_path = None - self._trust_boundary = trust_boundary def _metric_header_for_usage(self): return metrics.CRED_TYPE_SA_IMPERSONATE @@ -344,26 +342,26 @@ def _perform_refresh_token(self, request): iam_endpoint_override=self._iam_endpoint_override, ) - def _build_trust_boundary_lookup_url(self): - """Builds and returns the URL for the trust boundary lookup API. + def _build_regional_access_boundary_lookup_url(self): + """Builds and returns the URL for the Regional Access Boundary lookup API. This method constructs the specific URL for the IAM Credentials API's `allowedLocations` endpoint, using the credential's universe domain and service account email. - Raises: - ValueError: If `self.service_account_email` is None or an empty - string, as it's required to form the URL. - Returns: - str: The URL for the trust boundary lookup endpoint. + Optional[str]: The URL for the Regional Access Boundary lookup endpoint, or None + if the service account email is missing. """ if not self.service_account_email: - raise ValueError( - "Service account email is required to build the trust boundary lookup URL." + _LOGGER.error( + "Service account email is required to build the Regional Access Boundary lookup URL for impersonated credentials." + ) + return None + return ( + _constants._SERVICE_ACCOUNT_REGIONAL_ACCESS_BOUNDARY_LOOKUP_ENDPOINT.format( + service_account_email=self.service_account_email ) - return _TRUST_BOUNDARY_LOOKUP_ENDPOINT.format( - self.universe_domain, self.service_account_email ) def sign_bytes(self, message): @@ -435,15 +433,9 @@ def _make_copy(self): lifetime=self._lifetime, quota_project_id=self._quota_project_id, iam_endpoint_override=self._iam_endpoint_override, - trust_boundary=self._trust_boundary, ) cred._cred_file_path = self._cred_file_path - return cred - - @_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary) - def with_trust_boundary(self, trust_boundary): - cred = self._make_copy() - cred._trust_boundary = trust_boundary + self._copy_regional_access_boundary_state(cred) return cred @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) @@ -527,17 +519,23 @@ def from_impersonated_service_account_info(cls, info, scopes=None): delegates = info.get("delegates") quota_project_id = info.get("quota_project_id") scopes = scopes or info.get("scopes") - trust_boundary = info.get("trust_boundary") - return cls( + initial_creds = cls( source_credentials, target_principal, scopes, delegates, quota_project_id=quota_project_id, - trust_boundary=trust_boundary, ) + regional_access_boundary = info.get("regional_access_boundary") + if regional_access_boundary: + initial_creds = initial_creds._with_regional_access_boundary( + regional_access_boundary + ) + + return initial_creds + class IDTokenCredentials(credentials.CredentialsWithQuotaProject): """Open ID Connect ID Token-based service account credentials.""" diff --git a/google/oauth2/_client.py b/google/oauth2/_client.py index d4db42007..4b5a0c1f7 100644 --- a/google/oauth2/_client.py +++ b/google/oauth2/_client.py @@ -26,6 +26,7 @@ import datetime import http.client as http_client import json +import logging import urllib from google.auth import _exponential_backoff @@ -36,6 +37,8 @@ from google.auth import metrics from google.auth import transport +_LOGGER = logging.getLogger(__name__) + _URLENCODED_CONTENT_TYPE = "application/x-www-form-urlencoded" _JSON_CONTENT_TYPE = "application/json" _JWT_GRANT_TYPE = "urn:ietf:params:oauth:grant-type:jwt-bearer" @@ -514,20 +517,20 @@ def refresh_grant( return _handle_refresh_grant_response(response_data, refresh_token) -def _lookup_trust_boundary(request, url, headers=None): - """Implements the global lookup of a credential trust boundary. +def _lookup_regional_access_boundary(request, url, headers=None): + """Implements the global lookup of a credential Regional Access Boundary. For the lookup, we send a request to the global lookup endpoint and then parse the response. Service account credentials, workload identity - pools and workforce pools implementation may have trust boundaries configured. + pools and workforce pools implementation may have Regional Access Boundaries configured. Args: request (google.auth.transport.Request): A callable used to make HTTP requests. - url (str): The trust boundary lookup url. + url (str): The Regional Access Boundary lookup url. headers (Optional[Mapping[str, str]]): The headers for the request. Returns: - Mapping[str,list|str]: A dictionary containing + Optional[Mapping[str,list|str]]: A dictionary containing "locations" as a list of allowed locations as strings and - "encodedLocations" as a hex string. + "encodedLocations" as a hex string, or None if the lookup failed. e.g: { "locations": [ @@ -535,61 +538,66 @@ def _lookup_trust_boundary(request, url, headers=None): ], "encodedLocations": "0xA30" } - If the credential is not set up with explicit trust boundaries, a trust boundary - of "all" will be returned as a default response. - { - "locations": [], - "encodedLocations": "0x0" - } - Raises: - exceptions.RefreshError: If the response status code is not 200. - exceptions.MalformedError: If the response is not in a valid format. """ - response_data = _lookup_trust_boundary_request(request, url, headers=headers) - # In case of no-op response, the "locations" list may or may not be present as an empty list. + response_data = _lookup_regional_access_boundary_request( + request, url, headers=headers + ) + if response_data is None: + # Error was already logged by _lookup_regional_access_boundary_request + return None + if "encodedLocations" not in response_data: - raise exceptions.MalformedError( - "Invalid trust boundary info: {}".format(response_data) + _LOGGER.error( + "Regional Access Boundary response malformed: missing 'encodedLocations' key in %s", + response_data, ) + return None return response_data -def _lookup_trust_boundary_request(request, url, can_retry=True, headers=None): - """Makes a request to the trust boundary lookup endpoint. +def _lookup_regional_access_boundary_request( + request, url, can_retry=True, headers=None +): + """Makes a request to the Regional Access Boundary lookup endpoint. Args: request (google.auth.transport.Request): A callable used to make HTTP requests. - url (str): The trust boundary lookup url. + url (str): The Regional Access Boundary lookup url. can_retry (bool): Enable or disable request retry behavior. Defaults to true. headers (Optional[Mapping[str, str]]): The headers for the request. Returns: - Mapping[str, str]: The JSON-decoded response data. - - Raises: - google.auth.exceptions.RefreshError: If the token endpoint returned - an error. + Optional[Mapping[str, str]]: The JSON-decoded response data on success, or None on failure. """ ( response_status_ok, response_data, retryable_error, - ) = _lookup_trust_boundary_request_no_throw(request, url, can_retry, headers) + ) = _lookup_regional_access_boundary_request_no_throw( + request, url, can_retry, headers + ) if not response_status_ok: - _handle_error_response(response_data, retryable_error) + _LOGGER.warning( + "Regional Access Boundary HTTP request failed after retries: response_data=%s, retryable_error=%s", + response_data, + retryable_error, + ) + return None return response_data -def _lookup_trust_boundary_request_no_throw(request, url, can_retry=True, headers=None): - """Makes a request to the trust boundary lookup endpoint. This +def _lookup_regional_access_boundary_request_no_throw( + request, url, can_retry=True, headers=None +): + """Makes a request to the Regional Access Boundary lookup endpoint. This function doesn't throw on response errors. Args: request (google.auth.transport.Request): A callable used to make HTTP requests. - url (str): The trust boundary lookup url. + url (str): The Regional Access Boundary lookup url. can_retry (bool): Enable or disable request retry behavior. Defaults to true. headers (Optional[Mapping[str, str]]): The headers for the request. @@ -603,7 +611,7 @@ def _lookup_trust_boundary_request_no_throw(request, url, can_retry=True, header response_data = {} retryable_error = False - retries = _exponential_backoff.ExponentialBackoff() + retries = _exponential_backoff.ExponentialBackoff(total_attempts=6) for _ in retries: response = request(method="GET", url=url, headers=headers) response_body = ( @@ -624,6 +632,9 @@ def _lookup_trust_boundary_request_no_throw(request, url, can_retry=True, header retryable_error = _can_retry( status_code=response.status, response_data=response_data ) + # Add 502 (Bad Gateway) as a retryable error for RAB lookups. + if response.status == http_client.BAD_GATEWAY: + retryable_error = True if not can_retry or not retryable_error: return False, response_data, retryable_error diff --git a/google/oauth2/_service_account_async.py b/google/oauth2/_service_account_async.py index cfd315a7f..fa6cfb7b7 100644 --- a/google/oauth2/_service_account_async.py +++ b/google/oauth2/_service_account_async.py @@ -75,6 +75,13 @@ async def refresh(self, request): self.token = access_token self.expiry = expiry + @_helpers.copy_docstring(credentials_async.Credentials) + async def before_request(self, request, method, url, headers): + # Explicit override to bypass synchronous CredentialsWithRegionalAccessBoundary. + await credentials_async.Credentials.before_request( + self, request, method, url, headers + ) + class IDTokenCredentials( service_account.IDTokenCredentials, @@ -130,3 +137,11 @@ async def refresh(self, request): ) self.token = access_token self.expiry = expiry + + @_helpers.copy_docstring(credentials_async.Credentials) + async def before_request(self, request, method, url, headers): + # Explicit override to bypass synchronous CredentialsWithRegionalAccessBoundary + # and disable Regional Access Boundary refresh for async credentials. + await credentials_async.Credentials.before_request( + self, request, method, url, headers + ) diff --git a/google/oauth2/credentials.py b/google/oauth2/credentials.py index ae60223b4..5c8878f34 100644 --- a/google/oauth2/credentials.py +++ b/google/oauth2/credentials.py @@ -87,7 +87,6 @@ def __init__( refresh_handler=None, enable_reauth_refresh=False, granted_scopes=None, - trust_boundary=None, universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN, account=None, ): @@ -131,7 +130,6 @@ def __init__( granted_scopes (Optional[Sequence[str]]): The scopes that were consented/granted by the user. This could be different from the requested scopes and it could be empty if granted and requested scopes were same. - trust_boundary (str): String representation of trust boundary meta. universe_domain (Optional[str]): The universe domain. The default universe domain is googleapis.com. account (Optional[str]): The account associated with the credential. @@ -154,7 +152,6 @@ def __init__( self._rapt_token = rapt_token self.refresh_handler = refresh_handler self._enable_reauth_refresh = enable_reauth_refresh - self._trust_boundary = trust_boundary self._universe_domain = universe_domain or credentials.DEFAULT_UNIVERSE_DOMAIN self._account = account or "" self._cred_file_path = None @@ -192,7 +189,6 @@ def __setstate__(self, d): self._quota_project_id = d.get("_quota_project_id") self._rapt_token = d.get("_rapt_token") self._enable_reauth_refresh = d.get("_enable_reauth_refresh") - self._trust_boundary = d.get("_trust_boundary") self._universe_domain = ( d.get("_universe_domain") or credentials.DEFAULT_UNIVERSE_DOMAIN ) @@ -300,7 +296,6 @@ def _make_copy(self): quota_project_id=self.quota_project_id, rapt_token=self.rapt_token, enable_reauth_refresh=self._enable_reauth_refresh, - trust_boundary=self._trust_boundary, universe_domain=self._universe_domain, account=self._account, ) @@ -494,7 +489,6 @@ def from_authorized_user_info(cls, info, scopes=None): quota_project_id=info.get("quota_project_id"), # may not exist expiry=expiry, rapt_token=info.get("rapt_token"), # may not exist - trust_boundary=info.get("trust_boundary"), # may not exist universe_domain=info.get("universe_domain"), # may not exist account=info.get("account", ""), # may not exist ) diff --git a/google/oauth2/service_account.py b/google/oauth2/service_account.py index f897b3b75..3e4bbdc4c 100644 --- a/google/oauth2/service_account.py +++ b/google/oauth2/service_account.py @@ -72,6 +72,7 @@ import copy import datetime +import logging from google.auth import _constants from google.auth import _helpers @@ -83,6 +84,8 @@ from google.auth import metrics from google.oauth2 import _client +_LOGGER = logging.getLogger(__name__) + _DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds _GOOGLE_OAUTH2_TOKEN_ENDPOINT = "https://oauth2.googleapis.com/token" @@ -92,7 +95,7 @@ class Credentials( credentials.Scoped, credentials.CredentialsWithQuotaProject, credentials.CredentialsWithTokenUri, - credentials.CredentialsWithTrustBoundary, + credentials.CredentialsWithRegionalAccessBoundary, ): """Service account credentials @@ -142,7 +145,6 @@ def __init__( additional_claims=None, always_use_jwt_access=False, universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN, - trust_boundary=None, ): """ Args: @@ -166,7 +168,6 @@ def __init__( universe_domain (str): The universe domain. The default universe domain is googleapis.com. For default value self signed jwt is used for token refresh. - trust_boundary (Mapping[str,str]): A credential trust boundary. .. note:: Typically one of the helper constructors :meth:`from_service_account_file` or @@ -196,7 +197,6 @@ def __init__( self._additional_claims = additional_claims else: self._additional_claims = {} - self._trust_boundary = trust_boundary @classmethod def _from_signer_and_info(cls, signer, info, **kwargs): @@ -214,7 +214,7 @@ def _from_signer_and_info(cls, signer, info, **kwargs): Raises: ValueError: If the info is not in the expected format. """ - return cls( + initial_creds = cls( signer, service_account_email=info["client_email"], token_uri=info["token_uri"], @@ -222,9 +222,14 @@ def _from_signer_and_info(cls, signer, info, **kwargs): universe_domain=info.get( "universe_domain", credentials.DEFAULT_UNIVERSE_DOMAIN ), - trust_boundary=info.get("trust_boundary"), **kwargs, ) + regional_access_boundary = info.get("regional_access_boundary") + if regional_access_boundary: + initial_creds = initial_creds._with_regional_access_boundary( + regional_access_boundary + ) + return initial_creds @classmethod def from_service_account_info(cls, info, **kwargs): @@ -296,9 +301,9 @@ def _make_copy(self): additional_claims=self._additional_claims.copy(), always_use_jwt_access=self._always_use_jwt_access, universe_domain=self._universe_domain, - trust_boundary=self._trust_boundary, ) cred._cred_file_path = self._cred_file_path + self._copy_regional_access_boundary_state(cred) return cred @_helpers.copy_docstring(credentials.Scoped) @@ -384,12 +389,6 @@ def with_token_uri(self, token_uri): cred._token_uri = token_uri return cred - @_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary) - def with_trust_boundary(self, trust_boundary): - cred = self._make_copy() - cred._trust_boundary = trust_boundary - return cred - def _make_authorization_grant_assertion(self): """Create the OAuth 2.0 assertion. @@ -433,7 +432,7 @@ def _metric_header_for_usage(self): return metrics.CRED_TYPE_SA_JWT return metrics.CRED_TYPE_SA_ASSERTION - @_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary) + @_helpers.copy_docstring(credentials.CredentialsWithRegionalAccessBoundary) def _perform_refresh_token(self, request): if self._always_use_jwt_access and not self._jwt_credentials: # If self signed jwt should be used but jwt credential is not @@ -499,27 +498,26 @@ def _create_self_signed_jwt(self, audience): self, audience ) - def _build_trust_boundary_lookup_url(self): - """Builds and returns the URL for the trust boundary lookup API. + def _build_regional_access_boundary_lookup_url(self): + """Builds and returns the URL for the Regional Access Boundary lookup API. This method constructs the specific URL for the IAM Credentials API's `allowedLocations` endpoint, using the credential's universe domain and service account email. - Raises: - ValueError: If `self.service_account_email` is None or an empty - string, as it's required to form the URL. - Returns: - str: The URL for the trust boundary lookup endpoint. + Optional[str]: The URL for the Regional Access Boundary lookup endpoint, or None + if the service account email is missing. """ if not self.service_account_email: - raise ValueError( - "Service account email is required to build the trust boundary lookup URL." + _LOGGER.error( + "Service account email is required to build the Regional Access Boundary lookup URL for service account credentials." + ) + return None + return ( + _constants._SERVICE_ACCOUNT_REGIONAL_ACCESS_BOUNDARY_LOOKUP_ENDPOINT.format( + service_account_email=self._service_account_email, ) - return _constants._SERVICE_ACCOUNT_TRUST_BOUNDARY_LOOKUP_ENDPOINT.format( - universe_domain=self._universe_domain, - service_account_email=self._service_account_email, ) @_helpers.copy_docstring(credentials.Signing) diff --git a/tests/compute_engine/test_credentials.py b/tests/compute_engine/test_credentials.py index ab7d45e04..9ed0865fc 100644 --- a/tests/compute_engine/test_credentials.py +++ b/tests/compute_engine/test_credentials.py @@ -26,7 +26,6 @@ from google.auth import transport from google.auth.compute_engine import credentials from google.auth.transport import requests -from google.oauth2 import _client SAMPLE_ID_TOKEN_EXP = 1584393400 @@ -62,9 +61,8 @@ class TestCredentials(object): credentials = None credentials_with_all_fields = None - VALID_TRUST_BOUNDARY = {"encodedLocations": "valid-encoded-locations"} - NO_OP_TRUST_BOUNDARY = {"encodedLocations": ""} - EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/default/allowedLocations" + VALID_REGIONAL_ACCESS_BOUNDARY = {"encodedLocations": "valid-encoded-locations"} + EXPECTED_REGIONAL_ACCESS_BOUNDARY_LOOKUP_URL = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/default/allowedLocations" @pytest.fixture(autouse=True) def credentials_fixture(self): @@ -258,13 +256,13 @@ def test_with_universe_domain(self): assert creds.universe_domain == "universe_domain" assert creds._universe_domain_cached - def test_with_trust_boundary(self): + def test_with_regional_access_boundary(self): creds = self.credentials_with_all_fields new_boundary = {"encodedLocations": "new_boundary"} - new_creds = creds.with_trust_boundary(new_boundary) + new_creds = creds._with_regional_access_boundary(new_boundary) assert new_creds is not creds - assert new_creds._trust_boundary == new_boundary + assert new_creds._regional_access_boundary == new_boundary assert new_creds._service_account_email == creds._service_account_email assert new_creds._quota_project_id == creds._quota_project_id assert new_creds._scopes == creds._scopes @@ -309,10 +307,10 @@ def test_user_provided_universe_domain(self, get_universe_domain): # domain endpoint. get_universe_domain.assert_not_called() - @mock.patch("google.oauth2._client._lookup_trust_boundary", autospec=True) + @mock.patch("google.oauth2._client._lookup_regional_access_boundary", autospec=True) @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) - def test_refresh_trust_boundary_lookup_skipped_if_env_var_not_true( - self, mock_metadata_get, mock_lookup_tb + def test_refresh_regional_access_boundary_lookup_skipped_if_env_var_not_true( + self, mock_metadata_get, mock_lookup_rab ): creds = self.credentials request = mock.Mock() @@ -325,17 +323,18 @@ def test_refresh_trust_boundary_lookup_skipped_if_env_var_not_true( ] with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "false"} + os.environ, + {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "false"}, ): creds.refresh(request) - mock_lookup_tb.assert_not_called() - assert creds._trust_boundary is None + mock_lookup_rab.assert_not_called() + assert creds._regional_access_boundary is None - @mock.patch("google.oauth2._client._lookup_trust_boundary", autospec=True) + @mock.patch("google.oauth2._client._lookup_regional_access_boundary", autospec=True) @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) - def test_refresh_trust_boundary_lookup_skipped_if_env_var_missing( - self, mock_metadata_get, mock_lookup_tb + def test_refresh_regional_access_boundary_lookup_skipped_if_env_var_missing( + self, mock_metadata_get, mock_lookup_rab ): creds = self.credentials request = mock.Mock() @@ -350,234 +349,25 @@ def test_refresh_trust_boundary_lookup_skipped_if_env_var_missing( with mock.patch.dict(os.environ, clear=True): creds.refresh(request) - mock_lookup_tb.assert_not_called() - assert creds._trust_boundary is None - - @mock.patch.object(_client, "_lookup_trust_boundary", autospec=True) - @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) - def test_refresh_trust_boundary_lookup_success( - self, mock_metadata_get, mock_lookup_tb - ): - mock_lookup_tb.return_value = { - "locations": ["us-central1"], - "encodedLocations": "0xABC", - } - creds = self.credentials - request = mock.Mock() - - # The first call to _metadata.get is for service account info, the second - # for the access token, and the third for the universe domain. - mock_metadata_get.side_effect = [ - # from _retrieve_info - {"email": "resolved-email@example.com", "scopes": ["scope1"]}, - # from get_service_account_token - {"access_token": "mock_token", "expires_in": 3600}, - # from get_universe_domain - "", - ] - - with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ): - creds.refresh(request) - - # Verify _metadata.get was called three times. - assert mock_metadata_get.call_count == 3 - # Verify lookup_trust_boundary was called with correct URL and token - expected_url = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/resolved-email@example.com/allowedLocations" - mock_lookup_tb.assert_called_once_with( - request, expected_url, headers={"authorization": "Bearer mock_token"} - ) - # Verify trust boundary was set - assert creds._trust_boundary == { - "locations": ["us-central1"], - "encodedLocations": "0xABC", - } - - # Verify x-allowed-locations header is set by apply() - headers_applied = {} - creds.apply(headers_applied) - assert headers_applied["x-allowed-locations"] == "0xABC" - - @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) - @mock.patch.object(_client, "_lookup_trust_boundary", autospec=True) - def test_refresh_trust_boundary_lookup_fails_no_cache( - self, mock_lookup_tb, mock_metadata_get - ): - mock_lookup_tb.side_effect = exceptions.RefreshError("Lookup failed") - creds = self.credentials - request = mock.Mock() - - # Mock metadata calls for token, universe domain, and service account info - mock_metadata_get.side_effect = [ - # from _retrieve_info - {"email": "resolved-email@example.com", "scopes": ["scope1"]}, - # from get_service_account_token - {"access_token": "mock_token", "expires_in": 3600}, - # from get_universe_domain - "", - ] - - with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ): - with pytest.raises(exceptions.RefreshError, match="Lookup failed"): - creds.refresh(request) - - assert creds._trust_boundary is None - assert mock_metadata_get.call_count == 3 - mock_lookup_tb.assert_called_once() - - @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) - @mock.patch.object(_client, "_lookup_trust_boundary", autospec=True) - def test_refresh_trust_boundary_lookup_fails_with_cached_data( - self, mock_lookup_tb, mock_metadata_get - ): - # First refresh: Successfully fetch a valid trust boundary. - mock_lookup_tb.return_value = { - "locations": ["us-central1"], - "encodedLocations": "0xABC", - } - mock_metadata_get.side_effect = [ - # from _retrieve_info - {"email": "resolved-email@example.com", "scopes": ["scope1"]}, - # from get_service_account_token - {"access_token": "mock_token_1", "expires_in": 3600}, - # from get_universe_domain - "", - ] - creds = self.credentials - request = mock.Mock() - - with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ): - creds.refresh(request) - - assert creds._trust_boundary == { - "locations": ["us-central1"], - "encodedLocations": "0xABC", - } - mock_lookup_tb.assert_called_once() - - # Second refresh: Mock lookup to fail, but expect cached data to be preserved. - mock_lookup_tb.reset_mock() - mock_lookup_tb.side_effect = exceptions.RefreshError("Lookup failed") - - with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ): - # This refresh should not raise an error because a cached value exists. - mock_metadata_get.reset_mock() - mock_metadata_get.side_effect = [ - # from _retrieve_info - {"email": "resolved-email@example.com", "scopes": ["scope1"]}, - # from get_service_account_token - {"access_token": "mock_token_2", "expires_in": 3600}, - # from get_universe_domain - "", - ] - creds.refresh(request) - - assert creds._trust_boundary == { - "locations": ["us-central1"], - "encodedLocations": "0xABC", - } - mock_lookup_tb.assert_called_once() - - @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) - @mock.patch.object(_client, "_lookup_trust_boundary", autospec=True) - def test_refresh_fetches_no_op_trust_boundary( - self, mock_lookup_tb, mock_metadata_get - ): - mock_lookup_tb.return_value = {"locations": [], "encodedLocations": "0x0"} - creds = self.credentials - request = mock.Mock() - - mock_metadata_get.side_effect = [ - # from _retrieve_info - {"email": "resolved-email@example.com", "scopes": ["scope1"]}, - # from get_service_account_token - {"access_token": "mock_token", "expires_in": 3600}, - # from get_universe_domain - "", - ] - - with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ): - creds.refresh(request) - - assert creds._trust_boundary == {"locations": [], "encodedLocations": "0x0"} - assert mock_metadata_get.call_count == 3 - expected_url = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/resolved-email@example.com/allowedLocations" - mock_lookup_tb.assert_called_once_with( - request, expected_url, headers={"authorization": "Bearer mock_token"} - ) - # Verify that an empty header was added. - headers_applied = {} - creds.apply(headers_applied) - assert headers_applied["x-allowed-locations"] == "" - - @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) - @mock.patch.object(_client, "_lookup_trust_boundary", autospec=True) - def test_refresh_starts_with_no_op_trust_boundary_skips_lookup( - self, mock_lookup_tb, mock_metadata_get - ): - creds = self.credentials - # Use pre-cache universe domain to avoid an extra metadata call. - creds._universe_domain_cached = True - creds._trust_boundary = {"locations": [], "encodedLocations": "0x0"} - request = mock.Mock() - - mock_metadata_get.side_effect = [ - # from _retrieve_info - {"email": "resolved-email@example.com", "scopes": ["scope1"]}, - # from get_service_account_token - {"access_token": "mock_token", "expires_in": 3600}, - ] - - with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ): - creds.refresh(request) - - # Verify trust boundary remained NO_OP - assert creds._trust_boundary == {"locations": [], "encodedLocations": "0x0"} - # Lookup should be skipped - mock_lookup_tb.assert_not_called() - # Two metadata calls for token refresh should have happened. - assert mock_metadata_get.call_count == 2 - - # Verify that an empty header was added. - headers_applied = {} - creds.apply(headers_applied) - assert headers_applied["x-allowed-locations"] == "" + mock_lookup_rab.assert_not_called() + assert creds._regional_access_boundary is None @mock.patch( "google.auth.compute_engine._metadata.get_service_account_info", autospec=True ) - @mock.patch( - "google.auth.compute_engine._metadata.get_universe_domain", autospec=True - ) - def test_build_trust_boundary_lookup_url_default_email( - self, mock_get_universe_domain, mock_get_service_account_info + def test_build_regional_access_boundary_lookup_url_default_email( + self, mock_get_service_account_info ): - # Test with default service account email, which needs resolution - creds = self.credentials - creds._service_account_email = "default" mock_get_service_account_info.return_value = { "email": "resolved-email@example.com" } - mock_get_universe_domain.return_value = "googleapis.com" - - url = creds._build_trust_boundary_lookup_url() + creds = self.credentials + creds._universe_domain_cached = True + url = creds._build_regional_access_boundary_lookup_url() mock_get_service_account_info.assert_called_once_with(mock.ANY, "default") - mock_get_universe_domain.assert_called_once_with(mock.ANY) - assert url == ( - "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/resolved-email@example.com/allowedLocations" - ) + expected_url = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/resolved-email@example.com/allowedLocations" + assert url == expected_url @mock.patch( "google.auth.compute_engine._metadata.get_service_account_info", autospec=True @@ -585,7 +375,7 @@ def test_build_trust_boundary_lookup_url_default_email( @mock.patch( "google.auth.compute_engine._metadata.get_universe_domain", autospec=True ) - def test_build_trust_boundary_lookup_url_explicit_email( + def test_build_regional_access_boundary_lookup_url_explicit_email( self, mock_get_universe_domain, mock_get_service_account_info ): # Test with an explicit service account email, no resolution needed @@ -593,58 +383,34 @@ def test_build_trust_boundary_lookup_url_explicit_email( creds._service_account_email = FAKE_SERVICE_ACCOUNT_EMAIL mock_get_universe_domain.return_value = "googleapis.com" - url = creds._build_trust_boundary_lookup_url() + url = creds._build_regional_access_boundary_lookup_url() mock_get_service_account_info.assert_not_called() - mock_get_universe_domain.assert_called_once_with(mock.ANY) assert url == ( "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/foo@bar.com/allowedLocations" ) - @mock.patch( - "google.auth.compute_engine._metadata.get_service_account_info", autospec=True - ) @mock.patch( "google.auth.compute_engine._metadata.get_universe_domain", autospec=True ) - def test_build_trust_boundary_lookup_url_non_default_universe( - self, mock_get_universe_domain, mock_get_service_account_info - ): - # Test with a non-default universe domain - creds = self.credentials_with_all_fields - - url = creds._build_trust_boundary_lookup_url() - - # Universe domain is cached and email is explicit, so no metadata calls needed. - mock_get_service_account_info.assert_not_called() - mock_get_universe_domain.assert_not_called() - assert url == ( - "https://iamcredentials.fake-universe-domain/v1/projects/-/serviceAccounts/foo@bar.com/allowedLocations" - ) - @mock.patch( "google.auth.compute_engine._metadata.get_service_account_info", autospec=True ) - def test_build_trust_boundary_lookup_url_get_service_account_info_fails( - self, mock_get_service_account_info + def test_build_regional_access_boundary_lookup_url_get_service_account_info_fails( + self, mock_get_service_account_info, mock_get_universe_domain ): - # Test scenario where get_service_account_info fails mock_get_service_account_info.side_effect = exceptions.TransportError( - "Failed to get info" + "Metadata server error" ) creds = self.credentials - creds._service_account_email = "default" - - with pytest.raises( - exceptions.RefreshError, - match=r"Failed to get service account email for trust boundary lookup: .*", - ): - creds._build_trust_boundary_lookup_url() + url = creds._build_regional_access_boundary_lookup_url() + assert url is None + mock_get_service_account_info.assert_called_once() @mock.patch( "google.auth.compute_engine._metadata.get_service_account_info", autospec=True ) - def test_build_trust_boundary_lookup_url_no_email( + def test_build_regional_access_boundary_lookup_url_no_email( self, mock_get_service_account_info ): # Test with default service account email, which needs resolution, but metadata @@ -653,10 +419,8 @@ def test_build_trust_boundary_lookup_url_no_email( creds._service_account_email = "default" mock_get_service_account_info.return_value = {"scopes": ["one", "two"]} - with pytest.raises(exceptions.RefreshError) as excinfo: - creds._build_trust_boundary_lookup_url() - - assert excinfo.match(r"missing 'email' field") + url = creds._build_regional_access_boundary_lookup_url() + assert url is None @mock.patch("google.auth.compute_engine._metadata.get") @mock.patch("google.auth._agent_identity_utils.get_agent_identity_certificate_path") diff --git a/tests/oauth2/test__client.py b/tests/oauth2/test__client.py index 6a01b1bac..ad510f903 100644 --- a/tests/oauth2/test__client.py +++ b/tests/oauth2/test__client.py @@ -632,7 +632,7 @@ def test__token_endpoint_request_no_throw_with_retry(can_retry): assert mock_request.call_count == 1 -def test_lookup_trust_boundary(): +def test_lookup_regional_access_boundary(): response_data = { "locations": ["us-central1", "us-east1"], "encodedLocations": "0x80080000000000", @@ -647,7 +647,9 @@ def test_lookup_trust_boundary(): url = "http://example.com" headers = {"Authorization": "Bearer access_token"} - response = _client._lookup_trust_boundary(mock_request, url, headers=headers) + response = _client._lookup_regional_access_boundary( + mock_request, url, headers=headers + ) assert response["encodedLocations"] == "0x80080000000000" assert response["locations"] == ["us-central1", "us-east1"] @@ -655,47 +657,7 @@ def test_lookup_trust_boundary(): mock_request.assert_called_once_with(method="GET", url=url, headers=headers) -def test_lookup_trust_boundary_no_op_response_without_locations(): - response_data = {"encodedLocations": "0x0"} - - mock_response = mock.create_autospec(transport.Response, instance=True) - mock_response.status = http_client.OK - mock_response.data = json.dumps(response_data).encode("utf-8") - - mock_request = mock.create_autospec(transport.Request) - mock_request.return_value = mock_response - - url = "http://example.com" - headers = {"Authorization": "Bearer access_token"} - # for the response to be valid, we only need encodedLocations to be present. - response = _client._lookup_trust_boundary(mock_request, url, headers=headers) - assert response["encodedLocations"] == "0x0" - assert "locations" not in response - - mock_request.assert_called_once_with(method="GET", url=url, headers=headers) - - -def test_lookup_trust_boundary_no_op_response(): - response_data = {"locations": [], "encodedLocations": "0x0"} - - mock_response = mock.create_autospec(transport.Response, instance=True) - mock_response.status = http_client.OK - mock_response.data = json.dumps(response_data).encode("utf-8") - - mock_request = mock.create_autospec(transport.Request) - mock_request.return_value = mock_response - - url = "http://example.com" - headers = {"Authorization": "Bearer access_token"} - response = _client._lookup_trust_boundary(mock_request, url, headers=headers) - - assert response["encodedLocations"] == "0x0" - assert response["locations"] == [] - - mock_request.assert_called_once_with(method="GET", url=url, headers=headers) - - -def test_lookup_trust_boundary_error(): +def test_lookup_regional_access_boundary_error(): mock_response = mock.create_autospec(transport.Response, instance=True) mock_response.status = http_client.INTERNAL_SERVER_ERROR mock_response.data = "this is an error message" @@ -705,33 +667,40 @@ def test_lookup_trust_boundary_error(): url = "http://example.com" headers = {"Authorization": "Bearer access_token"} - with pytest.raises(exceptions.RefreshError) as excinfo: - _client._lookup_trust_boundary(mock_request, url, headers=headers) - assert excinfo.match("this is an error message") + result = _client._lookup_regional_access_boundary( + mock_request, url, headers=headers + ) + assert result is None mock_request.assert_called_with(method="GET", url=url, headers=headers) -def test_lookup_trust_boundary_missing_encoded_locations(): - response_data = {"locations": [], "bad_field": "0x0"} - +@pytest.mark.parametrize( + "status_code", + [ + http_client.NOT_FOUND, + http_client.FORBIDDEN, + ], +) +def test_lookup_regional_access_boundary_non_retryable_error(status_code): mock_response = mock.create_autospec(transport.Response, instance=True) - mock_response.status = http_client.OK - mock_response.data = json.dumps(response_data).encode("utf-8") + mock_response.status = status_code + mock_response.data = "Error" mock_request = mock.create_autospec(transport.Request) mock_request.return_value = mock_response url = "http://example.com" headers = {"Authorization": "Bearer access_token"} - with pytest.raises(exceptions.MalformedError) as excinfo: - _client._lookup_trust_boundary(mock_request, url, headers=headers) - assert excinfo.match("Invalid trust boundary info") - + result = _client._lookup_regional_access_boundary( + mock_request, url, headers=headers + ) + assert result is None + # Non-retryable errors should only be called once. mock_request.assert_called_once_with(method="GET", url=url, headers=headers) -def test_lookup_trust_boundary_internal_failure_and_retry_failure_error(): +def test_lookup_regional_access_boundary_internal_failure_and_retry_failure_error(): retryable_error = mock.create_autospec(transport.Response, instance=True) retryable_error.status = http_client.BAD_REQUEST retryable_error.data = json.dumps({"error_description": "internal_failure"}).encode( @@ -749,10 +718,10 @@ def test_lookup_trust_boundary_internal_failure_and_retry_failure_error(): request.side_effect = [retryable_error, retryable_error, unretryable_error] headers = {"Authorization": "Bearer access_token"} - with pytest.raises(exceptions.RefreshError): - _client._lookup_trust_boundary_request( - request, "http://example.com", headers=headers - ) + result = _client._lookup_regional_access_boundary_request( + request, "http://example.com", headers=headers + ) + assert result is None # request should be called three times. Two retryable errors and one # unretryable error to break the retry loop. assert request.call_count == 3 @@ -760,14 +729,17 @@ def test_lookup_trust_boundary_internal_failure_and_retry_failure_error(): assert call[1]["headers"] == headers -def test_lookup_trust_boundary_internal_failure_and_retry_succeeds(): +def test_lookup_regional_access_boundary_internal_failure_and_retry_succeeds(): retryable_error = mock.create_autospec(transport.Response, instance=True) retryable_error.status = http_client.BAD_REQUEST retryable_error.data = json.dumps({"error_description": "internal_failure"}).encode( "utf-8" ) - response_data = {"locations": [], "encodedLocations": "0x0"} + response_data = { + "locations": ["us-central1", "us-east1"], + "encodedLocations": "0xVALIDHEX", + } response = mock.create_autospec(transport.Response, instance=True) response.status = http_client.OK response.data = json.dumps(response_data).encode("utf-8") @@ -777,7 +749,7 @@ def test_lookup_trust_boundary_internal_failure_and_retry_succeeds(): headers = {"Authorization": "Bearer access_token"} request.side_effect = [retryable_error, response] - _ = _client._lookup_trust_boundary_request( + _ = _client._lookup_regional_access_boundary_request( request, "http://example.com", headers=headers ) @@ -786,7 +758,7 @@ def test_lookup_trust_boundary_internal_failure_and_retry_succeeds(): assert call[1]["headers"] == headers -def test_lookup_trust_boundary_with_headers(): +def test_lookup_regional_access_boundary_with_headers(): response_data = { "locations": ["us-central1", "us-east1"], "encodedLocations": "0x80080000000000", @@ -800,7 +772,9 @@ def test_lookup_trust_boundary_with_headers(): mock_request.return_value = mock_response headers = {"Authorization": "Bearer access_token", "x-test-header": "test-value"} - _client._lookup_trust_boundary(mock_request, "http://example.com", headers=headers) + _client._lookup_regional_access_boundary( + mock_request, "http://example.com", headers=headers + ) mock_request.assert_called_once_with( method="GET", url="http://example.com", headers=headers diff --git a/tests/oauth2/test_service_account.py b/tests/oauth2/test_service_account.py index 788f52263..1d6eded2e 100644 --- a/tests/oauth2/test_service_account.py +++ b/tests/oauth2/test_service_account.py @@ -20,9 +20,7 @@ import pytest # type: ignore from google.auth import _helpers -from google.auth import credentials from google.auth import crypt -from google.auth import environment_vars from google.auth import exceptions from google.auth import iam from google.auth import jwt @@ -60,10 +58,6 @@ class TestCredentials(object): SERVICE_ACCOUNT_EMAIL = "service-account@example.com" TOKEN_URI = "https://example.com/oauth2/token" - NO_OP_TRUST_BOUNDARY = { - "locations": credentials.NO_OP_TRUST_BOUNDARY_LOCATIONS, - "encodedLocations": credentials.NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS, - } VALID_TRUST_BOUNDARY = { "locations": ["us-central1", "us-east1"], "encodedLocations": "0xVALIDHEXSA", @@ -77,14 +71,12 @@ class TestCredentials(object): def make_credentials( cls, universe_domain=DEFAULT_UNIVERSE_DOMAIN, - trust_boundary=None, # Align with Credentials class default ): return service_account.Credentials( SIGNER, cls.SERVICE_ACCOUNT_EMAIL, cls.TOKEN_URI, universe_domain=universe_domain, - trust_boundary=trust_boundary, ) def test_get_cred_info(self): @@ -236,6 +228,26 @@ def test_with_quota_project(self): new_credentials.apply(hdrs, token="tok") assert "x-goog-user-project" in hdrs + def test_with_regional_access_boundary(self): + credentials = self.make_credentials() + new_credentials = credentials._with_regional_access_boundary( + {"encodedLocations": "new_boundary"} + ) + assert new_credentials is not credentials + assert new_credentials._regional_access_boundary == { + "encodedLocations": "new_boundary" + } + + def test_build_regional_access_boundary_lookup_url(self): + credentials = self.make_credentials() + expected_url = ( + "https://iamcredentials.googleapis.com/v1/projects/-/" + "serviceAccounts/{}/allowedLocations".format( + credentials.service_account_email + ) + ) + assert credentials._build_regional_access_boundary_lookup_url() == expected_url + def test_with_token_uri(self): credentials = self.make_credentials() new_token_uri = "https://example2.com/oauth2/token" @@ -270,18 +282,6 @@ def test__with_always_use_jwt_access_non_default_universe_domain(self): "always_use_jwt_access should be True for non-default universe domain" ) - def test_with_trust_boundary(self): - credentials = self.make_credentials() - new_boundary = {"encodedLocations": "new_boundary"} - new_credentials = credentials.with_trust_boundary(new_boundary) - - assert new_credentials is not credentials - assert new_credentials._trust_boundary == new_boundary - assert new_credentials._signer == credentials._signer - assert ( - new_credentials.service_account_email == credentials.service_account_email - ) - def test__make_authorization_grant_assertion(self): credentials = self.make_credentials() token = credentials._make_authorization_grant_assertion() @@ -529,39 +529,6 @@ def test_refresh_success(self, jwt_grant): # Check that the credentials are valid (have a token and are not expired). assert credentials.valid - # Trust boundary should be None since env var is not set and no initial - # boundary was provided. - assert credentials._trust_boundary is None - - @mock.patch("google.oauth2._client._lookup_trust_boundary") - @mock.patch("google.oauth2._client.jwt_grant", autospec=True) - def test_refresh_skips_trust_boundary_lookup_non_default_universe( - self, mock_jwt_grant, mock_lookup_trust_boundary - ): - # Create credentials with a non-default universe domain - credentials = self.make_credentials(universe_domain=FAKE_UNIVERSE_DOMAIN) - token = "token" - mock_jwt_grant.return_value = ( - token, - _helpers.utcnow() + datetime.timedelta(seconds=500), - {}, - ) - request = mock.create_autospec(transport.Request, instance=True) - - with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ): - credentials.refresh(request) - - # Ensure jwt_grant was called (token refresh happened) - mock_jwt_grant.assert_called_once() - # Ensure trust boundary lookup was not called - mock_lookup_trust_boundary.assert_not_called() - # Verify that x-allowed-locations header is not set by apply() - headers_applied = {} - credentials.apply(headers_applied) - assert "x-allowed-locations" not in headers_applied - @mock.patch("google.oauth2._client.jwt_grant", autospec=True) def test_before_request_refreshes(self, jwt_grant): credentials = self.make_credentials() @@ -670,208 +637,6 @@ def test_refresh_non_gdu_domain_wide_delegation_not_supported(self): credentials.refresh(None) assert excinfo.match("domain wide delegation is not supported") - @mock.patch("google.oauth2._client._lookup_trust_boundary") - @mock.patch("google.oauth2._client.jwt_grant", autospec=True) - def test_refresh_success_with_valid_trust_boundary( - self, mock_jwt_grant, mock_lookup_trust_boundary - ): - # Start with no boundary. - credentials = self.make_credentials(trust_boundary=None) - token = "token" - mock_jwt_grant.return_value = ( - token, - _helpers.utcnow() + datetime.timedelta(seconds=500), - {}, - ) - request = mock.create_autospec(transport.Request, instance=True) - - # Mock the trust boundary lookup to return a valid boundary. - mock_lookup_trust_boundary.return_value = self.VALID_TRUST_BOUNDARY - - with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ): - credentials.refresh(request) - - assert credentials.valid - assert credentials.token == token - - # Verify trust boundary was set. - assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY - - # Verify the mock was called with the correct URL. - mock_lookup_trust_boundary.assert_called_once_with( - request, - self.EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE, - headers={"authorization": "Bearer token"}, - ) - - # Verify x-allowed-locations header is set correctly by apply(). - headers_applied = {} - credentials.apply(headers_applied) - assert ( - headers_applied["x-allowed-locations"] - == self.VALID_TRUST_BOUNDARY["encodedLocations"] - ) - - @mock.patch("google.oauth2._client._lookup_trust_boundary") - @mock.patch("google.oauth2._client.jwt_grant", autospec=True) - def test_refresh_fetches_no_op_trust_boundary( - self, mock_jwt_grant, mock_lookup_trust_boundary - ): - # Start with no trust boundary - credentials = self.make_credentials(trust_boundary=None) - token = "token" - mock_jwt_grant.return_value = ( - token, - _helpers.utcnow() + datetime.timedelta(seconds=500), - {}, - ) - request = mock.create_autospec(transport.Request, instance=True) - - mock_lookup_trust_boundary.return_value = self.NO_OP_TRUST_BOUNDARY - - with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ): - credentials.refresh(request) - - assert credentials.valid - assert credentials.token == token - assert credentials._trust_boundary == self.NO_OP_TRUST_BOUNDARY - mock_lookup_trust_boundary.assert_called_once_with( - request, - self.EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE, - headers={"authorization": "Bearer token"}, - ) - headers_applied = {} - credentials.apply(headers_applied) - assert headers_applied["x-allowed-locations"] == "" - - @mock.patch("google.oauth2._client._lookup_trust_boundary") - @mock.patch("google.oauth2._client.jwt_grant", autospec=True) - def test_refresh_starts_with_no_op_trust_boundary_skips_lookup( - self, mock_jwt_grant, mock_lookup_trust_boundary - ): - credentials = self.make_credentials( - trust_boundary=self.NO_OP_TRUST_BOUNDARY - ) # Start with NO_OP - token = "token" - mock_jwt_grant.return_value = ( - token, - _helpers.utcnow() + datetime.timedelta(seconds=500), - {}, - ) - request = mock.create_autospec(transport.Request, instance=True) - - with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ): - credentials.refresh(request) - - assert credentials.valid - assert credentials.token == token - # Verify trust boundary remained NO_OP - assert credentials._trust_boundary == self.NO_OP_TRUST_BOUNDARY - - # Lookup should be skipped - mock_lookup_trust_boundary.assert_not_called() - - # Verify that an empty header was added. - headers_applied = {} - credentials.apply(headers_applied) - assert headers_applied["x-allowed-locations"] == "" - - @mock.patch("google.oauth2._client._lookup_trust_boundary") - @mock.patch("google.oauth2._client.jwt_grant", autospec=True) - def test_refresh_trust_boundary_lookup_fails_no_cache( - self, mock_jwt_grant, mock_lookup_trust_boundary - ): - # Start with no trust boundary - credentials = self.make_credentials(trust_boundary=None) - mock_lookup_trust_boundary.side_effect = exceptions.RefreshError( - "Lookup failed" - ) - mock_jwt_grant.return_value = ( - "mock_access_token", - _helpers.utcnow() + datetime.timedelta(seconds=3600), - {}, - ) - request = mock.create_autospec(transport.Request, instance=True) - - # Mock the trust boundary lookup to raise an error - with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ), pytest.raises(exceptions.RefreshError, match="Lookup failed"): - credentials.refresh(request) - - assert credentials._trust_boundary is None - mock_lookup_trust_boundary.assert_called_once() - - @mock.patch("google.oauth2._client._lookup_trust_boundary") - @mock.patch("google.oauth2._client.jwt_grant", autospec=True) - def test_refresh_trust_boundary_lookup_fails_with_cached_data( - self, mock_jwt_grant, mock_lookup_trust_boundary - ): - # Initial setup: Credentials with no trust boundary. - credentials = self.make_credentials(trust_boundary=None) - token = "token" - mock_jwt_grant.return_value = ( - token, - _helpers.utcnow() + datetime.timedelta(seconds=500), - {}, - ) - request = mock.create_autospec(transport.Request, instance=True) - - # First refresh: Successfully fetch a valid trust boundary. - mock_lookup_trust_boundary.return_value = self.VALID_TRUST_BOUNDARY - with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ): - credentials.refresh(request) - - assert credentials.valid - assert credentials.token == token - assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY - mock_lookup_trust_boundary.assert_called_once_with( - request, - self.EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE, - headers={"authorization": "Bearer token"}, - ) - - # Second refresh: Mock lookup to fail, but expect cached data to be preserved. - mock_lookup_trust_boundary.reset_mock() - mock_lookup_trust_boundary.side_effect = exceptions.RefreshError( - "Lookup failed" - ) - - with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ): - credentials.refresh(request) # This should NOT raise an exception - - assert credentials.valid # Credentials should still be valid - assert ( - credentials._trust_boundary == self.VALID_TRUST_BOUNDARY - ) # Cached data should be preserved - mock_lookup_trust_boundary.assert_called_once_with( - request, - self.EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE, - headers={ - "authorization": "Bearer token", - "x-allowed-locations": self.VALID_TRUST_BOUNDARY["encodedLocations"], - }, - ) # Lookup should have been attempted again - - def test_build_trust_boundary_lookup_url_no_email(self): - credentials = self.make_credentials() - credentials._service_account_email = None - - with pytest.raises(ValueError) as excinfo: - credentials._build_trust_boundary_lookup_url() - - assert "Service account email is required" in str(excinfo.value) - class TestIDTokenCredentials(object): SERVICE_ACCOUNT_EMAIL = "service-account@example.com" diff --git a/tests/test__helpers.py b/tests/test__helpers.py index 4d12abd86..a167902f6 100644 --- a/tests/test__helpers.py +++ b/tests/test__helpers.py @@ -20,7 +20,7 @@ import pytest # type: ignore -from google.auth import _helpers, exceptions +from google.auth import _helpers # _MOCK_BASE_LOGGER_NAME is the base logger namespace used for testing. _MOCK_BASE_LOGGER_NAME = "foogle" @@ -251,14 +251,11 @@ def test_get_bool_from_env(monkeypatch): # Test invalid value monkeypatch.setenv("TEST_VAR", "invalid_value") - with pytest.raises(exceptions.InvalidValue) as excinfo: - _helpers.get_bool_from_env("TEST_VAR") - assert 'must be one of "true", "false", "1", or "0"' in str(excinfo.value) + assert _helpers.get_bool_from_env("TEST_VAR") is False # Test empty string value monkeypatch.setenv("TEST_VAR", "") - with pytest.raises(exceptions.InvalidValue): - _helpers.get_bool_from_env("TEST_VAR") + assert _helpers.get_bool_from_env("TEST_VAR") is False def test_hash_sensitive_info_basic(): diff --git a/tests/test_aws.py b/tests/test_aws.py index b6b1ca231..a8f6fb152 100644 --- a/tests/test_aws.py +++ b/tests/test_aws.py @@ -973,7 +973,6 @@ def test_from_info_full_options(self, mock_init): quota_project_id=QUOTA_PROJECT_ID, workforce_pool_user_project=None, universe_domain=DEFAULT_UNIVERSE_DOMAIN, - trust_boundary=None, ) @mock.patch.object(aws.Credentials, "__init__", return_value=None) @@ -1003,7 +1002,6 @@ def test_from_info_required_options_only(self, mock_init): quota_project_id=None, workforce_pool_user_project=None, universe_domain=DEFAULT_UNIVERSE_DOMAIN, - trust_boundary=None, ) @mock.patch.object(aws.Credentials, "__init__", return_value=None) @@ -1035,7 +1033,6 @@ def test_from_info_supplier(self, mock_init): quota_project_id=None, workforce_pool_user_project=None, universe_domain=DEFAULT_UNIVERSE_DOMAIN, - trust_boundary=None, ) @mock.patch.object(aws.Credentials, "__init__", return_value=None) @@ -1073,7 +1070,6 @@ def test_from_file_full_options(self, mock_init, tmpdir): quota_project_id=QUOTA_PROJECT_ID, workforce_pool_user_project=None, universe_domain=DEFAULT_UNIVERSE_DOMAIN, - trust_boundary=None, ) @mock.patch.object(aws.Credentials, "__init__", return_value=None) @@ -1104,7 +1100,6 @@ def test_from_file_required_options_only(self, mock_init, tmpdir): quota_project_id=None, workforce_pool_user_project=None, universe_domain=DEFAULT_UNIVERSE_DOMAIN, - trust_boundary=None, ) def test_constructor_invalid_credential_source(self): @@ -2058,9 +2053,6 @@ def test_refresh_success_with_impersonation_ignore_default_scopes( "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]), "x-goog-user-project": QUOTA_PROJECT_ID, "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, - # TODO(negarb): Uncomment and update when trust boundary is supported - # for external account credentials. - # "x-allowed-locations": "0x0", } impersonation_request_data = { "delegates": None, @@ -2153,7 +2145,6 @@ def test_refresh_success_with_impersonation_use_default_scopes( "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]), "x-goog-user-project": QUOTA_PROJECT_ID, "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, - # "x-allowed-locations": "0x0", } impersonation_request_data = { "delegates": None, @@ -2348,7 +2339,6 @@ def test_refresh_success_with_supplier_with_impersonation( "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]), "x-goog-user-project": QUOTA_PROJECT_ID, "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, - # "x-allowed-locations": "0x0", } impersonation_request_data = { "delegates": None, diff --git a/tests/test_credentials.py b/tests/test_credentials.py index 0d64f0e0a..993836ddb 100644 --- a/tests/test_credentials.py +++ b/tests/test_credentials.py @@ -23,10 +23,14 @@ from google.auth import credentials from google.auth import environment_vars from google.auth import exceptions -from google.oauth2 import _client -class CredentialsImpl(credentials.CredentialsWithTrustBoundary): +class CredentialsImpl(credentials.CredentialsWithRegionalAccessBoundary): + def __init__(self, universe_domain=None): + super(CredentialsImpl, self).__init__() + if universe_domain: + self._universe_domain = universe_domain + def _perform_refresh_token(self, request): self.token = "refreshed-token" self.expiry = ( @@ -38,10 +42,15 @@ def _perform_refresh_token(self, request): def with_quota_project(self, quota_project_id): raise NotImplementedError() - def _build_trust_boundary_lookup_url(self): + def _build_regional_access_boundary_lookup_url(self): # Using self.token here to make the URL dynamic for testing purposes return "http://mock.url/lookup_for_{}".format(self.token) + def _make_copy(self): + new_credentials = self.__class__() + self._copy_regional_access_boundary_state(new_credentials) + return new_credentials + class CredentialsImplWithMetrics(credentials.Credentials): def refresh(self, request): @@ -119,10 +128,13 @@ def test_before_request(): assert "x-allowed-locations" not in headers -def test_before_request_with_trust_boundary(): +def test_before_request_with_regional_access_boundary(): DUMMY_BOUNDARY = "0xA30" credentials = CredentialsImpl() - credentials._trust_boundary = {"locations": [], "encodedLocations": DUMMY_BOUNDARY} + credentials._regional_access_boundary = { + "locations": [], + "encodedLocations": DUMMY_BOUNDARY, + } request = mock.Mock() headers = {} @@ -366,113 +378,251 @@ def test_token_state_no_expiry(): c.before_request(request, "http://example.com", "GET", {}) -class TestCredentialsWithTrustBoundary(object): - @mock.patch.object(_client, "_lookup_trust_boundary") - def test_lookup_trust_boundary_env_var_not_true(self, mock_lookup_tb): +class TestCredentialsWithRegionalAccessBoundary(object): + @mock.patch( + "google.auth._regional_access_boundary_utils._RegionalAccessBoundaryRefreshManager.start_refresh" + ) + def test_maybe_start_refresh_is_skipped_if_env_var_not_set( + self, mock_start_refresh + ): creds = CredentialsImpl() - request = mock.Mock() + with mock.patch.dict(os.environ, clear=True): + creds._maybe_start_regional_access_boundary_refresh( + mock.Mock(), "http://example.com" + ) + mock_start_refresh.assert_not_called() - # Ensure env var is not "true" + @mock.patch( + "google.auth._regional_access_boundary_utils._RegionalAccessBoundaryRefreshManager.start_refresh" + ) + def test_maybe_start_refresh_is_skipped_if_not_expired(self, mock_start_refresh): + creds = CredentialsImpl() + creds._regional_access_boundary = {"encodedLocations": "0xABC"} + creds._regional_access_boundary_expiry = _helpers.utcnow() + datetime.timedelta( + hours=2 + ) with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "false"} + os.environ, + {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}, ): - result = creds._refresh_trust_boundary(request) + creds._maybe_start_regional_access_boundary_refresh( + mock.Mock(), "http://example.com" + ) + mock_start_refresh.assert_not_called() - assert result is None - mock_lookup_tb.assert_not_called() - - @mock.patch.object(_client, "_lookup_trust_boundary") - def test_lookup_trust_boundary_env_var_missing(self, mock_lookup_tb): + @mock.patch( + "google.auth._regional_access_boundary_utils._RegionalAccessBoundaryRefreshManager.start_refresh" + ) + def test_maybe_start_refresh_triggered_if_soft_expired(self, mock_start_refresh): creds = CredentialsImpl() + creds._regional_access_boundary = {"encodedLocations": "0xABC"} + + creds._regional_access_boundary_expiry = _helpers.utcnow() + datetime.timedelta( + minutes=30 + ) request = mock.Mock() + with mock.patch.dict( + os.environ, + {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}, + ): + creds._maybe_start_regional_access_boundary_refresh( + request, "http://example.com" + ) + mock_start_refresh.assert_called_once_with(creds, request) - # Ensure env var is missing - with mock.patch.dict(os.environ, clear=True): - result = creds._refresh_trust_boundary(request) + @mock.patch( + "google.auth._regional_access_boundary_utils._RegionalAccessBoundaryRefreshManager.start_refresh" + ) + def test_maybe_start_refresh_is_skipped_if_cooldown_active( + self, mock_start_refresh + ): + creds = CredentialsImpl() + creds._regional_access_boundary_cooldown_expiry = ( + _helpers.utcnow() + datetime.timedelta(minutes=5) + ) + with mock.patch.dict( + os.environ, + {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}, + ): + creds._maybe_start_regional_access_boundary_refresh( + mock.Mock(), "http://example.com" + ) + mock_start_refresh.assert_not_called() - assert result is None - mock_lookup_tb.assert_not_called() + @mock.patch( + "google.auth._regional_access_boundary_utils._RegionalAccessBoundaryRefreshManager.start_refresh" + ) + def test_maybe_start_refresh_is_skipped_for_regional_endpoint( + self, mock_start_refresh + ): + creds = CredentialsImpl() + with mock.patch.dict( + os.environ, + {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}, + ): + creds._maybe_start_regional_access_boundary_refresh( + mock.Mock(), "https://my-service.us-east1.rep.googleapis.com" + ) + mock_start_refresh.assert_not_called() - @mock.patch.object(_client, "_lookup_trust_boundary") - def test_lookup_trust_boundary_non_default_universe(self, mock_lookup_tb): + @mock.patch( + "google.auth._regional_access_boundary_utils._RegionalAccessBoundaryRefreshManager.start_refresh" + ) + def test_maybe_start_refresh_is_triggered(self, mock_start_refresh): creds = CredentialsImpl() - creds._universe_domain = "my.universe.com" # Non-GDU request = mock.Mock() - with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + os.environ, + {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}, ): - result = creds._refresh_trust_boundary(request) + creds._maybe_start_regional_access_boundary_refresh( + request, "http://example.com" + ) + mock_start_refresh.assert_called_once_with(creds, request) - assert result is None - mock_lookup_tb.assert_not_called() + def test_get_regional_access_boundary_header(self): + creds = CredentialsImpl() + creds._regional_access_boundary = {"encodedLocations": "0xABC"} + headers = creds._get_regional_access_boundary_header() + assert headers == {"x-allowed-locations": "0xABC"} + + creds._regional_access_boundary = None + headers = creds._get_regional_access_boundary_header() + assert headers == {} + + def test_copy_regional_access_boundary_state(self): + source_creds = CredentialsImpl() + source_creds._regional_access_boundary = {"encodedLocations": "0xABC"} + source_creds._regional_access_boundary_expiry = _helpers.utcnow() + source_creds._regional_access_boundary_cooldown_expiry = _helpers.utcnow() + + target_creds = CredentialsImpl() + source_creds._copy_regional_access_boundary_state(target_creds) + + assert ( + target_creds._regional_access_boundary + == source_creds._regional_access_boundary + ) + assert ( + target_creds._regional_access_boundary_expiry + == source_creds._regional_access_boundary_expiry + ) + assert ( + target_creds._regional_access_boundary_cooldown_expiry + == source_creds._regional_access_boundary_cooldown_expiry + ) - @mock.patch.object(_client, "_lookup_trust_boundary") - def test_lookup_trust_boundary_calls_client_and_build_url(self, mock_lookup_tb): + def test_with_regional_access_boundary_valid_input(self): creds = CredentialsImpl() - creds.token = "test_token" # For _build_trust_boundary_lookup_url - request = mock.Mock() - expected_url = "http://mock.url/lookup_for_test_token" - expected_boundary_info = {"encodedLocations": "0xABC"} - mock_lookup_tb.return_value = expected_boundary_info + rab_info = {"encodedLocations": "new_location"} + new_creds = creds._with_regional_access_boundary(rab_info) - # Mock _build_trust_boundary_lookup_url to ensure it's called. - mock_build_url = mock.Mock(return_value=expected_url) - creds._build_trust_boundary_lookup_url = mock_build_url + assert new_creds._regional_access_boundary == rab_info + assert new_creds._regional_access_boundary_expiry is not None + assert new_creds._regional_access_boundary_cooldown_expiry is None + def test_with_regional_access_boundary_malformed_input(self): + creds = CredentialsImpl() + with pytest.raises( + exceptions.InvalidValue, + match="regional_access_boundary must be a dictionary with an 'encodedLocations' key.", + ): + creds._with_regional_access_boundary({"bad_key": "bad_value"}) + with pytest.raises( + exceptions.InvalidValue, + match="regional_access_boundary must be a dictionary with an 'encodedLocations' key.", + ): + creds._with_regional_access_boundary("not_a_dict") + + @mock.patch( + "google.auth._regional_access_boundary_utils._RegionalAccessBoundaryRefreshManager.start_refresh" + ) + def test_maybe_start_refresh_is_skipped_if_non_default_universe_domain( + self, mock_start_refresh + ): + creds = CredentialsImpl(universe_domain="not.googleapis.com") + with mock.patch.dict( + os.environ, + {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}, + ): + creds._maybe_start_regional_access_boundary_refresh( + mock.Mock(), "http://example.com" + ) + mock_start_refresh.assert_not_called() + + @mock.patch( + "google.auth._regional_access_boundary_utils._RegionalAccessBoundaryRefreshManager.start_refresh" + ) + def test_maybe_start_refresh_handles_url_parse_errors(self, mock_start_refresh): + creds = CredentialsImpl() + request = mock.Mock() with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + os.environ, + {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}, ): - result = creds._lookup_trust_boundary(request) + creds._maybe_start_regional_access_boundary_refresh(request, "http://[") + mock_start_refresh.assert_called_once_with(creds, request) + + @mock.patch("google.oauth2._client._lookup_regional_access_boundary") + @mock.patch.object(CredentialsImpl, "_build_regional_access_boundary_lookup_url") + def test_lookup_regional_access_boundary_success( + self, mock_build_url, mock_lookup_rab + ): + creds = CredentialsImpl() + creds.token = "token" + request = mock.Mock() + mock_build_url.return_value = "http://rab.example.com" + mock_lookup_rab.return_value = {"encodedLocations": "success"} + + result = creds._lookup_regional_access_boundary(request) - assert result == expected_boundary_info mock_build_url.assert_called_once() - expected_headers = {"authorization": "Bearer test_token"} - mock_lookup_tb.assert_called_once_with( - request, expected_url, headers=expected_headers + mock_lookup_rab.assert_called_once_with( + request, "http://rab.example.com", headers={"authorization": "Bearer token"} ) + assert result == {"encodedLocations": "success"} - @mock.patch.object(_client, "_lookup_trust_boundary") - def test_lookup_trust_boundary_build_url_returns_none(self, mock_lookup_tb): + @mock.patch("google.oauth2._client._lookup_regional_access_boundary") + @mock.patch.object(CredentialsImpl, "_build_regional_access_boundary_lookup_url") + def test_lookup_regional_access_boundary_failure( + self, mock_build_url, mock_lookup_rab + ): creds = CredentialsImpl() + creds.token = "token" request = mock.Mock() + mock_build_url.return_value = "http://rab.example.com" + mock_lookup_rab.return_value = None - # Mock _build_trust_boundary_lookup_url to return None - mock_build_url = mock.Mock(return_value=None) - creds._build_trust_boundary_lookup_url = mock_build_url + result = creds._lookup_regional_access_boundary(request) - with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ): - with pytest.raises( - exceptions.InvalidValue, - match="Failed to build trust boundary lookup URL.", - ): - creds._lookup_trust_boundary(request) - - mock_build_url.assert_called_once() # Ensure _build_trust_boundary_lookup_url was called - mock_lookup_tb.assert_not_called() # Ensure _client.lookup_trust_boundary was not called - - @mock.patch("google.auth.credentials._LOGGER") - @mock.patch("google.auth._helpers.is_logging_enabled", return_value=True) - @mock.patch.object(_client, "_lookup_trust_boundary") - def test_refresh_trust_boundary_fails_with_cached_data_and_logging( - self, mock_lookup_tb, mock_is_logging_enabled, mock_logger + mock_build_url.assert_called_once() + mock_lookup_rab.assert_called_once_with( + request, "http://rab.example.com", headers={"authorization": "Bearer token"} + ) + assert result is None + + @mock.patch("google.oauth2._client._lookup_regional_access_boundary") + @mock.patch.object(CredentialsImpl, "_build_regional_access_boundary_lookup_url") + def test_lookup_regional_access_boundary_null_url( + self, mock_build_url, mock_lookup_rab ): creds = CredentialsImpl() - creds._trust_boundary = {"encodedLocations": "0xABC"} + creds.token = "token" request = mock.Mock() + mock_build_url.return_value = None - refresh_error = exceptions.RefreshError("Lookup failed") - mock_lookup_tb.side_effect = refresh_error + result = creds._lookup_regional_access_boundary(request) - with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ): - creds.refresh(request) + mock_build_url.assert_called_once() + mock_lookup_rab.assert_not_called() + assert result is None - mock_lookup_tb.assert_called_once() - mock_is_logging_enabled.assert_called_once_with(mock_logger) - mock_logger.debug.assert_called_once_with( - "Using cached trust boundary due to refresh error: %s", refresh_error + def test_credentials_with_regional_access_boundary_initialization(self): + creds = CredentialsImpl() + assert creds._regional_access_boundary is None + assert creds._regional_access_boundary_expiry is None + assert creds._regional_access_boundary_cooldown_expiry is None + assert creds._current_rab_cooldown_duration == ( + credentials._regional_access_boundary_utils.DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN ) + assert creds._stale_boundary_lock is not None diff --git a/tests/test_external_account.py b/tests/test_external_account.py index e50a1b8c0..30cd49b11 100644 --- a/tests/test_external_account.py +++ b/tests/test_external_account.py @@ -15,14 +15,12 @@ import datetime import http.client as http_client import json -import os from unittest import mock import urllib import pytest # type: ignore from google.auth import _helpers -from google.auth import environment_vars from google.auth import exceptions from google.auth import external_account from google.auth import transport @@ -128,7 +126,6 @@ class TestCredentials(object): "status": "INVALID_ARGUMENT", } } - NO_OP_TRUST_BOUNDARY = {"locations": [], "encodedLocations": "0x0"} VALID_TRUST_BOUNDARY = { "locations": ["us-central1", "us-east1"], "encodedLocations": "0xVALIDHEXSA", @@ -158,7 +155,6 @@ def make_credentials( service_account_impersonation_url=None, service_account_impersonation_options={}, universe_domain=DEFAULT_UNIVERSE_DOMAIN, - trust_boundary=None, ): return CredentialsImpl( audience=cls.AUDIENCE, @@ -174,7 +170,6 @@ def make_credentials( scopes=scopes, default_scopes=default_scopes, universe_domain=universe_domain, - trust_boundary=trust_boundary, ) @classmethod @@ -187,7 +182,6 @@ def make_workforce_pool_credentials( default_scopes=None, service_account_impersonation_url=None, workforce_pool_user_project=None, - trust_boundary=None, ): return CredentialsImpl( audience=cls.WORKFORCE_AUDIENCE, @@ -201,7 +195,6 @@ def make_workforce_pool_credentials( scopes=scopes, default_scopes=default_scopes, workforce_pool_user_project=workforce_pool_user_project, - trust_boundary=trust_boundary, ) @classmethod @@ -358,6 +351,16 @@ def test_with_scopes(self): assert scoped_credentials.has_scopes(["email"]) assert not scoped_credentials.requires_scopes + def test_with_scopes_copies_regional_access_boundary(self): + credentials = self.make_credentials() + credentials = credentials._with_regional_access_boundary( + self.VALID_TRUST_BOUNDARY + ) + scoped_credentials = credentials.with_scopes(["email"]) + + assert scoped_credentials.has_scopes(["email"]) + assert scoped_credentials._regional_access_boundary == self.VALID_TRUST_BOUNDARY + def test_with_scopes_workforce_pool(self): credentials = self.make_workforce_pool_credentials( workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT @@ -435,7 +438,6 @@ def test_with_scopes_full_options_propagated(self): scopes=["email"], default_scopes=["default2"], universe_domain=DEFAULT_UNIVERSE_DOMAIN, - trust_boundary=None, ) def test_with_token_uri(self): @@ -524,7 +526,6 @@ def test_with_quota_project_full_options_propagated(self): scopes=self.SCOPES, default_scopes=["default1"], universe_domain=DEFAULT_UNIVERSE_DOMAIN, - trust_boundary=None, ) # Confirm with_quota_project sets the correct quota project after @@ -719,24 +720,6 @@ def test_refresh_without_client_auth_success( assert not credentials.expired assert credentials.token == response["access_token"] - @mock.patch("google.auth.external_account.Credentials._lookup_trust_boundary") - def test_refresh_skips_trust_boundary_lookup_when_disabled( - self, mock_lookup_trust_boundary - ): - credentials = self.make_credentials() - request = self.make_mock_request( - status=http_client.OK, data=self.SUCCESS_RESPONSE - ) - - credentials.refresh(request) - - assert credentials.valid - assert credentials.token == self.SUCCESS_RESPONSE["access_token"] - mock_lookup_trust_boundary.assert_not_called() - headers_applied = {} - credentials.apply(headers_applied) - assert "x-allowed-locations" not in headers_applied - def test_perform_refresh_token_with_cert_fingerprint(self): credentials = self.make_credentials() credentials._sts_client = mock.MagicMock() @@ -755,240 +738,6 @@ def test_perform_refresh_token_with_cert_fingerprint(self): _, kwargs = credentials._sts_client.exchange_token.call_args assert kwargs["additional_options"]["bindCertFingerprint"] == "my-fingerprint" - def test_refresh_skips_sending_allowed_locations_header_with_trust_boundary(self): - # This test verifies that the x-allowed-locations header is not sent with - # the STS request even if a trust boundary is cached. - trust_boundary_value = {"encodedLocations": "0x12345"} - headers = { - "Content-Type": "application/x-www-form-urlencoded", - "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false", - } - request_data = { - "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", - "audience": self.AUDIENCE, - "requested_token_type": "urn:ietf:params:oauth:token-type:access_token", - "subject_token": "subject_token_0", - "subject_token_type": self.SUBJECT_TOKEN_TYPE, - } - request = self.make_mock_request( - status=http_client.OK, data=self.SUCCESS_RESPONSE - ) - credentials = self.make_credentials() - # Set a cached trust boundary. - credentials._trust_boundary = trust_boundary_value - - with mock.patch( - "google.auth.metrics.python_and_auth_lib_version", - return_value=LANG_LIBRARY_METRICS_HEADER_VALUE, - ): - credentials.refresh(request) - - self.assert_token_request_kwargs(request.call_args[1], headers, request_data) - - def test_refresh_on_impersonated_credential_skips_parent_trust_boundary_lookup( - self, - ): - # This test verifies that the top-level impersonating credential - # does not perform a trust boundary lookup. - request = self.make_mock_request( - status=http_client.OK, - data=self.SUCCESS_RESPONSE, - impersonation_status=http_client.OK, - impersonation_data={ - "accessToken": "SA_ACCESS_TOKEN", - "expireTime": "2025-01-01T00:00:00Z", - }, - ) - credentials = self.make_credentials( - service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL - ) - - with mock.patch.object( - credentials, "_refresh_trust_boundary", autospec=True - ) as mock_refresh_trust_boundary: - credentials.refresh(request) - - mock_refresh_trust_boundary.assert_not_called() - - def test_refresh_fetches_no_op_trust_boundary(self): - request = self.make_mock_request( - status=http_client.OK, data=self.SUCCESS_RESPONSE - ) - credentials = self.make_credentials() - - with mock.patch.object( - credentials, - "_lookup_trust_boundary", - return_value=self.NO_OP_TRUST_BOUNDARY, - ) as mock_lookup, mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ): - credentials.refresh(request) - - mock_lookup.assert_called_once() - headers = {} - credentials.apply(headers) - assert headers["x-allowed-locations"] == "" - - def test_refresh_skips_lookup_with_cached_no_op_boundary(self): - request = self.make_mock_request( - status=http_client.OK, data=self.SUCCESS_RESPONSE - ) - credentials = self.make_credentials() - credentials._trust_boundary = self.NO_OP_TRUST_BOUNDARY - - with mock.patch.object( - credentials, "_lookup_trust_boundary" - ) as mock_lookup, mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ): - credentials.refresh(request) - - mock_lookup.assert_not_called() - headers = {} - credentials.apply(headers) - assert headers["x-allowed-locations"] == "" - - def test_refresh_fails_on_lookup_failure_with_no_cache(self): - request = self.make_mock_request( - status=http_client.OK, data=self.SUCCESS_RESPONSE - ) - credentials = self.make_credentials() - - with mock.patch.object( - credentials, - "_lookup_trust_boundary", - side_effect=exceptions.RefreshError("Lookup failed"), - ) as mock_lookup, mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ), pytest.raises( - exceptions.RefreshError, match="Lookup failed" - ): - credentials.refresh(request) - - mock_lookup.assert_called_once() - - def test_refresh_uses_cached_boundary_on_lookup_failure(self): - request = self.make_mock_request( - status=http_client.OK, data=self.SUCCESS_RESPONSE - ) - credentials = self.make_credentials() - credentials._trust_boundary = {"encodedLocations": "0x123"} - - with mock.patch.object( - credentials, - "_lookup_trust_boundary", - side_effect=exceptions.RefreshError("Lookup failed"), - ) as mock_lookup, mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ): - credentials.refresh(request) - - mock_lookup.assert_called_once() - headers = {} - credentials.apply(headers) - assert headers["x-allowed-locations"] == "0x123" - - def test_refresh_propagates_trust_boundary_to_impersonated_credential(self): - request = self.make_mock_request( - status=http_client.OK, data=self.SUCCESS_RESPONSE - ) - credentials = self.make_credentials( - service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL, - trust_boundary=self.VALID_TRUST_BOUNDARY, - ) - impersonated_creds_mock = mock.Mock() - impersonated_creds_mock._trust_boundary = self.VALID_TRUST_BOUNDARY - - with mock.patch( - "google.auth.external_account.impersonated_credentials.Credentials", - return_value=impersonated_creds_mock, - ) as mock_impersonated_creds, mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ): - credentials.refresh(request) - - mock_impersonated_creds.assert_called_once_with( - source_credentials=mock.ANY, - target_principal=mock.ANY, - target_scopes=mock.ANY, - quota_project_id=mock.ANY, - iam_endpoint_override=mock.ANY, - lifetime=mock.ANY, - trust_boundary=self.VALID_TRUST_BOUNDARY, - ) - assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY - - def test_build_trust_boundary_lookup_url_workload(self): - credentials = self.make_credentials() - expected_url = "https://iamcredentials.googleapis.com/v1/projects/123456/locations/global/workloadIdentityPools/POOL_ID/allowedLocations" - assert credentials._build_trust_boundary_lookup_url() == expected_url - - def test_build_trust_boundary_lookup_url_workforce(self): - credentials = self.make_workforce_pool_credentials() - expected_url = "https://iamcredentials.googleapis.com/v1/locations/global/workforcePools/POOL_ID/allowedLocations" - assert credentials._build_trust_boundary_lookup_url() == expected_url - - @pytest.mark.parametrize( - "audience", - [ - "invalid", - "//iam.googleapis.com/projects/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID", - "//iam.googleapis.com/locations/global/workforcsePools//providers/provider-id", - ], - ) - def test_build_trust_boundary_lookup_url_invalid_audience(self, audience): - credentials = self.make_credentials() - credentials._audience = audience - with pytest.raises(exceptions.InvalidValue, match="Invalid audience format."): - credentials._build_trust_boundary_lookup_url() - - def test_refresh_fetches_trust_boundary_workload(self): - request = self.make_mock_request( - status=http_client.OK, data=self.SUCCESS_RESPONSE - ) - credentials = self.make_credentials() - - with mock.patch.object( - credentials, - "_lookup_trust_boundary", - return_value=self.VALID_TRUST_BOUNDARY, - ) as mock_lookup, mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ): - credentials.refresh(request) - - mock_lookup.assert_called_once() - headers = {} - credentials.apply(headers) - assert ( - headers["x-allowed-locations"] - == self.VALID_TRUST_BOUNDARY["encodedLocations"] - ) - - def test_refresh_fetches_trust_boundary_workforce(self): - request = self.make_mock_request( - status=http_client.OK, data=self.SUCCESS_RESPONSE - ) - credentials = self.make_workforce_pool_credentials() - - with mock.patch.object( - credentials, - "_lookup_trust_boundary", - return_value=self.VALID_TRUST_BOUNDARY, - ) as mock_lookup, mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ): - credentials.refresh(request) - - mock_lookup.assert_called_once() - headers = {} - credentials.apply(headers) - assert ( - headers["x-allowed-locations"] - == self.VALID_TRUST_BOUNDARY["encodedLocations"] - ) - @mock.patch( "google.auth.metrics.python_and_auth_lib_version", return_value=LANG_LIBRARY_METRICS_HEADER_VALUE, @@ -1990,34 +1739,37 @@ def test_before_request_expired(self, utcnow): "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]) } - def test_refresh_impersonation_trust_boundary(self): - request = self.make_mock_request( - status=http_client.OK, - data=self.SUCCESS_RESPONSE, - impersonation_status=http_client.OK, - impersonation_data={ - "accessToken": "SA_ACCESS_TOKEN", - "expireTime": "2025-01-01T00:00:00Z", - }, - ) - credentials = self.make_credentials( - service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL - ) - impersonated_creds_mock = mock.Mock() - impersonated_creds_mock._trust_boundary = self.VALID_TRUST_BOUNDARY + def test_build_regional_access_boundary_lookup_url_workload(self): + credentials = self.make_credentials() + expected_url = "https://iamcredentials.googleapis.com/v1/projects/123456/locations/global/workloadIdentityPools/POOL_ID/allowedLocations" + assert credentials._build_regional_access_boundary_lookup_url() == expected_url - with mock.patch( - "google.auth.external_account.impersonated_credentials.Credentials", - return_value=impersonated_creds_mock, - ): - credentials.refresh(request) + def test_build_regional_access_boundary_lookup_url_workforce(self): + credentials = self.make_workforce_pool_credentials() + expected_url = "https://iamcredentials.googleapis.com/v1/locations/global/workforcePools/POOL_ID/allowedLocations" + assert credentials._build_regional_access_boundary_lookup_url() == expected_url - assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY + @pytest.mark.parametrize( + "audience", + [ + "invalid", + "//iam.googleapis.com/projects/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID", + "//iam.googleapis.com/locations/global/workforcsePools//providers/provider-id", + ], + ) + def test_build_regional_access_boundary_lookup_url_invalid_audience(self, audience): + credentials = self.make_credentials() + credentials._audience = audience + assert credentials._build_regional_access_boundary_lookup_url() is None - def test_with_trust_boundary(self): + def test_with_regional_access_boundary(self): credentials = self.make_credentials() - new_credentials = credentials.with_trust_boundary(self.VALID_TRUST_BOUNDARY) - assert new_credentials._trust_boundary == self.VALID_TRUST_BOUNDARY + new_credentials = credentials._with_regional_access_boundary( + {"encodedLocations": "new_boundary"} + ) + assert new_credentials._regional_access_boundary == { + "encodedLocations": "new_boundary" + } @mock.patch("google.auth._helpers.utcnow") def test_before_request_impersonation_expired(self, utcnow): diff --git a/tests/test_external_account_authorized_user.py b/tests/test_external_account_authorized_user.py index b023ddf3d..e7ff6d951 100644 --- a/tests/test_external_account_authorized_user.py +++ b/tests/test_external_account_authorized_user.py @@ -15,12 +15,10 @@ import datetime import http.client as http_client import json -import os from unittest import mock import pytest # type: ignore -from google.auth import environment_vars from google.auth import exceptions from google.auth import external_account_authorized_user from google.auth import transport @@ -559,26 +557,6 @@ def test_with_universe_domain(self): assert new_creds._quota_project_id == QUOTA_PROJECT_ID assert new_creds.universe_domain == FAKE_UNIVERSE_DOMAIN - def test_with_trust_boundary(self): - creds = self.make_credentials( - token=ACCESS_TOKEN, - expiry=NOW, - revoke_url=REVOKE_URL, - quota_project_id=QUOTA_PROJECT_ID, - ) - new_creds = creds.with_trust_boundary({"encodedLocations": "new_boundary"}) - assert new_creds._audience == creds._audience - assert new_creds._refresh_token == creds.refresh_token - assert new_creds._token_url == creds._token_url - assert new_creds._token_info_url == creds._token_info_url - assert new_creds._client_id == creds._client_id - assert new_creds._client_secret == creds._client_secret - assert new_creds.token == creds.token - assert new_creds.expiry == creds.expiry - assert new_creds._revoke_url == creds._revoke_url - assert new_creds._quota_project_id == QUOTA_PROJECT_ID - assert new_creds._trust_boundary == {"encodedLocations": "new_boundary"} - def test_from_file_required_options_only(self, tmpdir): from_creds = self.make_credentials() config_file = tmpdir.join("config.json") @@ -623,64 +601,33 @@ def test_from_file_full_options(self, tmpdir): assert creds._revoke_url == REVOKE_URL assert creds._quota_project_id == QUOTA_PROJECT_ID - def test_refresh_fetches_trust_boundary(self): - request = self.make_mock_request( - status=http_client.OK, - data={"access_token": ACCESS_TOKEN, "expires_in": 3600}, + def test_with_regional_access_boundary(self): + creds = self.make_credentials( + token=ACCESS_TOKEN, + expiry=NOW, + revoke_url=REVOKE_URL, + quota_project_id=QUOTA_PROJECT_ID, ) - credentials = self.make_credentials() - - with mock.patch.object( - credentials, - "_lookup_trust_boundary", - return_value={"encodedLocations": "0x123"}, - ) as mock_lookup, mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ): - credentials.refresh(request) - - mock_lookup.assert_called_once() - headers = {} - credentials.apply(headers) - assert headers["x-allowed-locations"] == "0x123" - - def test_refresh_skips_trust_boundary_lookup_when_disabled(self): - request = self.make_mock_request( - status=http_client.OK, - data={"access_token": ACCESS_TOKEN, "expires_in": 3600}, + new_creds = creds._with_regional_access_boundary( + {"encodedLocations": "new_boundary"} ) - credentials = self.make_credentials() - - with mock.patch.object( - credentials, "_lookup_trust_boundary" - ) as mock_lookup, mock.patch.dict(os.environ, {}, clear=True): - credentials.refresh(request) - - mock_lookup.assert_not_called() - headers = {} - credentials.apply(headers) - assert "x-allowed-locations" not in headers + assert new_creds is not creds + assert new_creds._regional_access_boundary == { + "encodedLocations": "new_boundary" + } - def test_build_trust_boundary_lookup_url(self): + def test_build_regional_access_boundary_lookup_url(self): credentials = self.make_credentials() expected_url = "https://iamcredentials.googleapis.com/v1/locations/global/workforcePools/POOL_ID/allowedLocations" - assert credentials._build_trust_boundary_lookup_url() == expected_url + assert credentials._build_regional_access_boundary_lookup_url() == expected_url @pytest.mark.parametrize( "audience", [ "invalid", - "//iam.googleapis.com/locations/global/workforcePools/", "//iam.googleapis.com/locations/global/providers/", - "//iam.googleapis.com/workforcePools/POOL_ID/providers/PROVIDER_ID", ], ) - def test_build_trust_boundary_lookup_url_invalid_audience(self, audience): + def test_build_regional_access_boundary_lookup_url_invalid_audience(self, audience): credentials = self.make_credentials(audience=audience) - with pytest.raises(exceptions.InvalidValue): - credentials._build_trust_boundary_lookup_url() - - def test_build_trust_boundary_lookup_url_different_universe(self): - credentials = self.make_credentials(universe_domain=FAKE_UNIVERSE_DOMAIN) - expected_url = "https://iamcredentials.fake-universe-domain/v1/locations/global/workforcePools/POOL_ID/allowedLocations" - assert credentials._build_trust_boundary_lookup_url() == expected_url + assert credentials._build_regional_access_boundary_lookup_url() is None diff --git a/tests/test_identity_pool.py b/tests/test_identity_pool.py index c68fac647..77b9de229 100644 --- a/tests/test_identity_pool.py +++ b/tests/test_identity_pool.py @@ -385,9 +385,6 @@ def assert_underlying_credentials_refresh( "Content-Type": "application/json", "authorization": "Bearer {}".format(token_response["access_token"]), "x-goog-api-client": metrics_header_value, - # TODO(negarb): Uncomment and update when trust boundary is supported - # for external account credentials. - # "x-allowed-locations": "0x0", } impersonation_request_data = { "delegates": None, @@ -508,7 +505,6 @@ def test_from_info_full_options(self, mock_init): quota_project_id=QUOTA_PROJECT_ID, workforce_pool_user_project=None, universe_domain=DEFAULT_UNIVERSE_DOMAIN, - trust_boundary=None, ) @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None) @@ -538,7 +534,6 @@ def test_from_info_required_options_only(self, mock_init): quota_project_id=None, workforce_pool_user_project=None, universe_domain=DEFAULT_UNIVERSE_DOMAIN, - trust_boundary=None, ) @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None) @@ -570,7 +565,6 @@ def test_from_info_supplier(self, mock_init): quota_project_id=None, workforce_pool_user_project=None, universe_domain=DEFAULT_UNIVERSE_DOMAIN, - trust_boundary=None, ) @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None) @@ -601,7 +595,6 @@ def test_from_info_workforce_pool(self, mock_init): quota_project_id=None, workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT, universe_domain=DEFAULT_UNIVERSE_DOMAIN, - trust_boundary=None, ) @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None) @@ -638,7 +631,6 @@ def test_from_file_full_options(self, mock_init, tmpdir): quota_project_id=QUOTA_PROJECT_ID, workforce_pool_user_project=None, universe_domain=DEFAULT_UNIVERSE_DOMAIN, - trust_boundary=None, ) @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None) @@ -669,7 +661,6 @@ def test_from_file_required_options_only(self, mock_init, tmpdir): quota_project_id=None, workforce_pool_user_project=None, universe_domain=DEFAULT_UNIVERSE_DOMAIN, - trust_boundary=None, ) @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None) @@ -701,7 +692,6 @@ def test_from_file_workforce_pool(self, mock_init, tmpdir): quota_project_id=None, workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT, universe_domain=DEFAULT_UNIVERSE_DOMAIN, - trust_boundary=None, ) def test_constructor_nonworkforce_with_workforce_pool_user_project(self): diff --git a/tests/test_impersonated_credentials.py b/tests/test_impersonated_credentials.py index 3ff7281a6..6c5dd4aaf 100644 --- a/tests/test_impersonated_credentials.py +++ b/tests/test_impersonated_credentials.py @@ -22,7 +22,6 @@ import pytest # type: ignore from google.auth import _helpers -from google.auth import credentials as auth_credentials from google.auth import crypt from google.auth import environment_vars from google.auth import exceptions @@ -128,10 +127,6 @@ class TestImpersonatedCredentials(object): # Because Python 2.7: DELEGATES = [] # type: ignore LIFETIME = 3600 - NO_OP_TRUST_BOUNDARY = { - "locations": auth_credentials.NO_OP_TRUST_BOUNDARY_LOCATIONS, - "encodedLocations": auth_credentials.NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS, - } VALID_TRUST_BOUNDARY = { "locations": ["us-central1", "us-east1"], "encodedLocations": "0xVALIDHEX", @@ -142,7 +137,7 @@ class TestImpersonatedCredentials(object): ) FAKE_UNIVERSE_DOMAIN = "universe.foo" SOURCE_CREDENTIALS = service_account.Credentials( - SIGNER, SERVICE_ACCOUNT_EMAIL, TOKEN_URI, trust_boundary=NO_OP_TRUST_BOUNDARY + SIGNER, SERVICE_ACCOUNT_EMAIL, TOKEN_URI ) USER_SOURCE_CREDENTIALS = credentials.Credentials(token="ABCDE") IAM_ENDPOINT_OVERRIDE = ( @@ -157,7 +152,6 @@ def make_credentials( target_principal=TARGET_PRINCIPAL, subject=None, iam_endpoint_override=None, - trust_boundary=None, # Align with Credentials class default ): return Credentials( source_credentials=source_credentials, @@ -167,7 +161,6 @@ def make_credentials( lifetime=lifetime, subject=subject, iam_endpoint_override=iam_endpoint_override, - trust_boundary=trust_boundary, ) def test_from_impersonated_service_account_info(self): @@ -178,17 +171,6 @@ def test_from_impersonated_service_account_info(self): ) assert isinstance(credentials, impersonated_credentials.Credentials) - def test_from_impersonated_service_account_info_with_trust_boundary(self): - info = copy.deepcopy(IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_INFO) - info["trust_boundary"] = self.VALID_TRUST_BOUNDARY - credentials = ( - impersonated_credentials.Credentials.from_impersonated_service_account_info( - info - ) - ) - assert isinstance(credentials, impersonated_credentials.Credentials) - assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY - def test_from_impersonated_service_account_info_with_invalid_source_credentials_type( self, ): @@ -311,12 +293,9 @@ def test_token_usage_metrics(self): assert headers["x-goog-api-client"] == "cred-type/imp" @pytest.mark.parametrize("use_data_bytes", [True, False]) - @mock.patch("google.oauth2._client._lookup_trust_boundary") - def test_refresh_success( - self, mock_lookup_trust_boundary, use_data_bytes, mock_donor_credentials - ): + def test_refresh_success(self, use_data_bytes, mock_donor_credentials): # Start with no boundary. - credentials = self.make_credentials(lifetime=None, trust_boundary=None) + credentials = self.make_credentials(lifetime=None) token = "token" expire_time = ( @@ -330,9 +309,6 @@ def test_refresh_success( use_data_bytes=use_data_bytes, ) - # Mock the trust boundary lookup to return a valid value. - mock_lookup_trust_boundary.return_value = self.VALID_TRUST_BOUNDARY - with mock.patch.dict( os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} ), mock.patch( @@ -348,34 +324,8 @@ def test_refresh_success( == ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE ) - # Verify that the x-allowed-locations header from the source credential - # was applied. The source credential has a NO_OP boundary, so the - # header should be an empty string. - request_kwargs = request.call_args[1] - assert "headers" in request_kwargs - assert "x-allowed-locations" in request_kwargs["headers"] - assert request_kwargs["headers"]["x-allowed-locations"] == "" - - # Verify trust boundary was set. - assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY - - # Verify the mock was called with the correct URL. - mock_lookup_trust_boundary.assert_called_once_with( - request, - self.EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE, - headers={"authorization": "Bearer token"}, - ) - - # Verify x-allowed-locations header is set correctly by apply(). - headers_applied = {} - credentials.apply(headers_applied) - assert ( - headers_applied["x-allowed-locations"] - == self.VALID_TRUST_BOUNDARY["encodedLocations"] - ) - - def test_refresh_source_creds_no_trust_boundary(self): - # Use a source credential that does not support trust boundaries. + def test_refresh_source_creds_no_regional_access_boundary(self): + # Use a source credential that does not support regional access boundaries. source_credentials = credentials.Credentials(token="source_token") creds = self.make_credentials(source_credentials=source_credentials) token = "impersonated_token" @@ -396,191 +346,6 @@ def test_refresh_source_creds_no_trust_boundary(self): request_kwargs = request.call_args[1] assert "x-allowed-locations" not in request_kwargs["headers"] - @mock.patch("google.oauth2._client._lookup_trust_boundary") - def test_refresh_trust_boundary_lookup_fails_no_cache( - self, mock_lookup_trust_boundary, mock_donor_credentials - ): - # Start with no trust boundary - credentials = self.make_credentials(lifetime=None, trust_boundary=None) - token = "token" - - expire_time = ( - _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) - ).isoformat("T") + "Z" - response_body = {"accessToken": token, "expireTime": expire_time} - - request = self.make_request( - data=json.dumps(response_body), status=http_client.OK - ) - - # Mock the trust boundary lookup to raise an error - mock_lookup_trust_boundary.side_effect = exceptions.RefreshError( - "Lookup failed" - ) - - with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ), pytest.raises(exceptions.RefreshError) as excinfo: - credentials.refresh(request) - - assert "Lookup failed" in str(excinfo.value) - assert credentials._trust_boundary is None # Still no trust boundary - mock_lookup_trust_boundary.assert_called_once() - - @mock.patch("google.oauth2._client._lookup_trust_boundary") - def test_refresh_fetches_no_op_trust_boundary( - self, mock_lookup_trust_boundary, mock_donor_credentials - ): - # Start with no trust boundary - credentials = self.make_credentials(lifetime=None, trust_boundary=None) - token = "token" - expire_time = ( - _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) - ).isoformat("T") + "Z" - response_body = {"accessToken": token, "expireTime": expire_time} - request = self.make_request( - data=json.dumps(response_body), status=http_client.OK - ) - - mock_lookup_trust_boundary.return_value = ( - self.NO_OP_TRUST_BOUNDARY - ) # Mock returns NO_OP - - with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ), mock.patch( - "google.auth.metrics.token_request_access_token_impersonate", - return_value=ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, - ): - credentials.refresh(request) - - assert credentials._trust_boundary == self.NO_OP_TRUST_BOUNDARY - mock_lookup_trust_boundary.assert_called_once_with( - request, - self.EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE, - headers={"authorization": "Bearer token"}, - ) - headers_applied = {} - credentials.apply(headers_applied) - assert headers_applied["x-allowed-locations"] == "" - - @mock.patch("google.oauth2._client._lookup_trust_boundary") - def test_refresh_skips_trust_boundary_lookup_non_default_universe( - self, mock_lookup_trust_boundary - ): - # Create source credentials with a non-default universe domain - source_credentials = service_account.Credentials( - SIGNER, - "some@email.com", - TOKEN_URI, - universe_domain=self.FAKE_UNIVERSE_DOMAIN, - ) - # Create impersonated credentials using the non-default source credentials - credentials = self.make_credentials(source_credentials=source_credentials) - - # Mock the IAM credentials API call for generateAccessToken - token = "token" - expire_time = ( - _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) - ).isoformat("T") + "Z" - response_body = {"accessToken": token, "expireTime": expire_time} - request = self.make_request( - data=json.dumps(response_body), status=http_client.OK - ) - - with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ): - credentials.refresh(request) - - # Ensure trust boundary lookup was not called - mock_lookup_trust_boundary.assert_not_called() - # Verify that x-allowed-locations header is not set by apply() - headers_applied = {} - credentials.apply(headers_applied) - assert "x-allowed-locations" not in headers_applied - - @mock.patch("google.oauth2._client._lookup_trust_boundary") - def test_refresh_starts_with_no_op_trust_boundary_skips_lookup( - self, mock_lookup_trust_boundary, mock_donor_credentials - ): - credentials = self.make_credentials( - lifetime=None, trust_boundary=self.NO_OP_TRUST_BOUNDARY - ) # Start with NO_OP - token = "token" - expire_time = ( - _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) - ).isoformat("T") + "Z" - response_body = {"accessToken": token, "expireTime": expire_time} - request = self.make_request( - data=json.dumps(response_body), status=http_client.OK - ) - with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ), mock.patch( - "google.auth.metrics.token_request_access_token_impersonate", - return_value=ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, - ): - credentials.refresh(request) - - # Verify trust boundary remained NO_OP - assert credentials._trust_boundary == self.NO_OP_TRUST_BOUNDARY - - # Lookup should be skipped - mock_lookup_trust_boundary.assert_not_called() - - # Verify that an empty header was added. - headers_applied = {} - credentials.apply(headers_applied) - assert headers_applied["x-allowed-locations"] == "" - - @mock.patch("google.oauth2._client._lookup_trust_boundary") - def test_refresh_trust_boundary_lookup_fails_with_cached_data2( - self, mock_lookup_trust_boundary, mock_donor_credentials - ): - # Start with no trust boundary - credentials = self.make_credentials(lifetime=None, trust_boundary=None) - token = "token" - - expire_time = ( - _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) - ).isoformat("T") + "Z" - response_body = {"accessToken": token, "expireTime": expire_time} - - request = self.make_request( - data=json.dumps(response_body), status=http_client.OK - ) - - # First refresh: Successfully fetch a valid trust boundary. - mock_lookup_trust_boundary.return_value = self.VALID_TRUST_BOUNDARY - with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ), mock.patch( - "google.auth.metrics.token_request_access_token_impersonate", - return_value=ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, - ): - credentials.refresh(request) - - assert credentials.valid - # Verify trust boundary was set. - assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY - mock_lookup_trust_boundary.assert_called_once() - - # Second refresh: Mock lookup to fail, but expect cached data to be preserved. - mock_lookup_trust_boundary.reset_mock() - mock_lookup_trust_boundary.side_effect = exceptions.RefreshError( - "Lookup failed" - ) - - with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ): - credentials.refresh(request) - - assert credentials.valid - assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY - mock_lookup_trust_boundary.assert_called_once() - @pytest.mark.parametrize("use_data_bytes", [True, False]) def test_refresh_with_subject_success(self, use_data_bytes, mock_dwd_credentials): credentials = self.make_credentials(subject="test@email.com", lifetime=None) @@ -981,15 +746,14 @@ def test_with_scopes(self): assert credentials.requires_scopes is False assert credentials._target_scopes == ["fake_scope1", "fake_scope2"] - def test_with_trust_boundary(self): + def test_with_regional_access_boundary(self): credentials = self.make_credentials() new_boundary = {"encodedLocations": "new_boundary"} - new_credentials = credentials.with_trust_boundary(new_boundary) + new_credentials = credentials._with_regional_access_boundary(new_boundary) assert new_credentials is not credentials - assert new_credentials._trust_boundary == new_boundary + assert new_credentials._regional_access_boundary == new_boundary # The source credentials should be a copy, not the same object. - # But they should be functionally equivalent. assert ( new_credentials._source_credentials is not credentials._source_credentials ) @@ -1004,13 +768,18 @@ def test_with_trust_boundary(self): ) assert new_credentials._target_principal == credentials._target_principal - def test_build_trust_boundary_lookup_url_no_email(self): + def test_build_regional_access_boundary_lookup_url_no_email(self): credentials = self.make_credentials(target_principal=None) - with pytest.raises(ValueError) as excinfo: - credentials._build_trust_boundary_lookup_url() + assert credentials._build_regional_access_boundary_lookup_url() is None - assert "Service account email is required" in str(excinfo.value) + def test_build_regional_access_boundary_lookup_url_success(self): + credentials = self.make_credentials() + # Ensure service_account_email is properly set by default mock + expected_url = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/{}/allowedLocations".format( + credentials.service_account_email + ) + assert credentials._build_regional_access_boundary_lookup_url() == expected_url def test_with_scopes_provide_default_scopes(self): credentials = self.make_credentials() diff --git a/tests/test_pluggable.py b/tests/test_pluggable.py index b2764361b..70e8c188a 100644 --- a/tests/test_pluggable.py +++ b/tests/test_pluggable.py @@ -274,7 +274,6 @@ def test_from_info_full_options(self, mock_init): quota_project_id=QUOTA_PROJECT_ID, workforce_pool_user_project=None, universe_domain=DEFAULT_UNIVERSE_DOMAIN, - trust_boundary=None, ) @mock.patch.object(pluggable.Credentials, "__init__", return_value=None) @@ -303,7 +302,6 @@ def test_from_info_required_options_only(self, mock_init): quota_project_id=None, workforce_pool_user_project=None, universe_domain=DEFAULT_UNIVERSE_DOMAIN, - trust_boundary=None, ) @mock.patch.object(pluggable.Credentials, "__init__", return_value=None) @@ -339,7 +337,6 @@ def test_from_file_full_options(self, mock_init, tmpdir): quota_project_id=QUOTA_PROJECT_ID, workforce_pool_user_project=None, universe_domain=DEFAULT_UNIVERSE_DOMAIN, - trust_boundary=None, ) @mock.patch.object(pluggable.Credentials, "__init__", return_value=None) @@ -369,7 +366,6 @@ def test_from_file_required_options_only(self, mock_init, tmpdir): quota_project_id=None, workforce_pool_user_project=None, universe_domain=DEFAULT_UNIVERSE_DOMAIN, - trust_boundary=None, ) def test_constructor_invalid_options(self):