-
Notifications
You must be signed in to change notification settings - Fork 217
feat(boxsdk): shared network client and token storage #1299
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: combined-sdk
Are you sure you want to change the base?
Changes from all commits
35b368a
0cc8d59
3c5ec83
b140dda
8b00f82
0a15681
b1bb0f2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,6 +19,21 @@ | |
| from ..util.datetime_formatter import normalize_date_to_rfc3339_format | ||
| from ..util.shared_link import get_shared_link_header | ||
| from ..util.deprecation_decorator import deprecated | ||
| from ..auth.developer_token_auth import DeveloperTokenAuth | ||
| from ..auth.jwt_auth import JWTAuth | ||
| from ..auth.ccg_auth import CCGAuth | ||
| from ..auth.oauth2 import OAuth2 as LegacyOAuth2 | ||
| from ..util.token_storage_adapter import LegacyTokenStorageAdapter | ||
|
|
||
| from box_sdk_gen.box.developer_token_auth import BoxDeveloperTokenAuth | ||
| from box_sdk_gen.box.oauth import BoxOAuth, OAuthConfig | ||
| from box_sdk_gen.box.jwt_auth import BoxJWTAuth, JWTConfig | ||
| from box_sdk_gen.box.ccg_auth import BoxCCGAuth, CCGConfig | ||
| from box_sdk_gen.client import BoxClient | ||
| from box_sdk_gen.networking.network import NetworkSession | ||
| from box_sdk_gen.networking.base_urls import BaseUrls | ||
| from box_sdk_gen.networking.retries import BoxRetryStrategy | ||
| from box_sdk_gen.schemas.access_token import AccessToken | ||
|
|
||
| if TYPE_CHECKING: | ||
| from boxsdk import OAuth2 | ||
|
|
@@ -2009,3 +2024,316 @@ def get_ai_agent_default_config( | |
| session=self._session, | ||
| response_object=box_response.json(), | ||
| ) | ||
|
|
||
| def get_authentication(self, *, token_storage=None): | ||
| """ | ||
| Extract authentication configuration from this legacy client and convert it | ||
| to a generated SDK Authentication object. | ||
|
|
||
| This method supports the following legacy authentication types: | ||
| - DeveloperTokenAuth -> BoxDeveloperTokenAuth | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. LegacyOAuth2? |
||
| - OAuth2 -> BoxOAuth | ||
| - JWTAuth -> BoxJWTAuth | ||
| - CCGAuth -> BoxCCGAuth | ||
|
|
||
| :param token_storage: | ||
| Optional TokenStorage instance for the generated SDK. | ||
| If not provided, an adapter will be created to bridge legacy token storage. | ||
| :return: | ||
| Authentication object compatible with the generated SDK (box_sdk_gen). | ||
| :raises ValueError: | ||
| If the authentication type is not supported or required credentials are missing. | ||
| """ | ||
| oauth = self._oauth | ||
|
|
||
| # Developer Token Authentication | ||
| if isinstance(oauth, DeveloperTokenAuth): | ||
| token = oauth.access_token | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe extract if body to separate private methods for better readability |
||
| if not token: | ||
| raise ValueError("Developer token is not available") | ||
| return BoxDeveloperTokenAuth(token=token) | ||
|
|
||
| # OAuth 2.0 Authentication | ||
| # Check if it's OAuth2 (but not DeveloperTokenAuth, JWTAuth, or CCGAuth) | ||
| if isinstance(oauth, LegacyOAuth2) and not isinstance( | ||
| oauth, (DeveloperTokenAuth, JWTAuth, CCGAuth) | ||
| ): | ||
| # It's OAuth2 | ||
| client_id = getattr(oauth, '_client_id', None) | ||
| client_secret = getattr(oauth, '_client_secret', None) | ||
|
|
||
| if not client_id or not client_secret: | ||
| raise ValueError("OAuth2 client_id and client_secret are required") | ||
|
|
||
| # Create token storage adapter if not provided | ||
| if token_storage is None: | ||
| # Create adapter from legacy OAuth2's token storage | ||
| def get_tokens(): | ||
| return oauth._get_tokens() | ||
|
|
||
| def store_tokens(access_token, refresh_token): | ||
| oauth._store_tokens(access_token, refresh_token) | ||
|
|
||
| token_storage = LegacyTokenStorageAdapter( | ||
| get_tokens=get_tokens, store_tokens=store_tokens | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. use lambda? |
||
| ) | ||
|
|
||
| config = OAuthConfig( | ||
| client_id=client_id, | ||
| client_secret=client_secret, | ||
| token_storage=token_storage, | ||
| ) | ||
|
|
||
| # Pre-populate with existing tokens if available | ||
| auth = BoxOAuth(config=config) | ||
| access_token, refresh_token = oauth._get_tokens() | ||
| if access_token: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this needed? Wouldn't old token storage be called to get token? |
||
| existing_token = AccessToken( | ||
| access_token=access_token, | ||
| refresh_token=refresh_token, | ||
| expires_in=3600, # Default, actual expiry not available | ||
| token_type='bearer', | ||
| ) | ||
| token_storage.store(existing_token) | ||
|
|
||
| return auth | ||
|
|
||
| # JWT Authentication | ||
| if isinstance(oauth, JWTAuth): | ||
| client_id = getattr(oauth, '_client_id', None) | ||
| client_secret = getattr(oauth, '_client_secret', None) | ||
| jwt_key_id = getattr(oauth, '_jwt_key_id', None) | ||
| rsa_private_key = getattr(oauth, '_rsa_private_key', None) | ||
| enterprise_id = getattr(oauth, '_enterprise_id', None) | ||
| user_id = getattr(oauth, '_user_id', None) | ||
|
|
||
| if not all([client_id, client_secret, jwt_key_id, rsa_private_key]): | ||
| raise ValueError( | ||
| "JWT authentication requires client_id, client_secret, jwt_key_id, and private key" | ||
| ) | ||
|
|
||
| # Convert RSA private key to string format | ||
| # Note: If the key was originally encrypted, we can't extract the passphrase | ||
| # from the normalized RSAPrivateKey object. We'll serialize it unencrypted. | ||
| from cryptography.hazmat.primitives import serialization | ||
|
|
||
| try: | ||
| # Serialize the key to PEM format (unencrypted) | ||
| # This works even if the original key was encrypted, as the | ||
| # normalized RSAPrivateKey object is already decrypted | ||
| private_key_pem = rsa_private_key.private_bytes( | ||
| encoding=serialization.Encoding.PEM, | ||
| format=serialization.PrivateFormat.PKCS8, | ||
| encryption_algorithm=serialization.NoEncryption(), | ||
| ).decode('utf-8') | ||
| # Passphrase is not needed since we're serializing unencrypted | ||
| # The generated SDK will handle encryption if needed | ||
| passphrase = '' | ||
| except Exception as e: | ||
| raise ValueError( | ||
| f"Cannot serialize private key: {e}. " | ||
| "Please ensure the private key is valid." | ||
| ) from e | ||
|
|
||
| # Create token storage adapter if not provided | ||
| if token_storage is None: | ||
| from box_sdk_gen.box.token_storage import InMemoryTokenStorage | ||
|
|
||
| token_storage = InMemoryTokenStorage() | ||
|
|
||
| config = JWTConfig( | ||
| client_id=client_id, | ||
| client_secret=client_secret, | ||
| jwt_key_id=jwt_key_id, | ||
| private_key=private_key_pem, | ||
| private_key_passphrase=passphrase, | ||
| enterprise_id=enterprise_id, | ||
| user_id=user_id, | ||
| token_storage=token_storage, | ||
| ) | ||
|
|
||
| auth = BoxJWTAuth(config=config) | ||
|
|
||
| # Handle user vs enterprise scope | ||
| if user_id: | ||
| auth = auth.with_user_subject(user_id, token_storage=token_storage) | ||
|
|
||
| return auth | ||
|
|
||
| # CCG (Client Credentials Grant) Authentication | ||
| if isinstance(oauth, CCGAuth): | ||
| client_id = getattr(oauth, '_client_id', None) | ||
| client_secret = getattr(oauth, '_client_secret', None) | ||
| enterprise_id = getattr(oauth, '_enterprise_id', None) | ||
| user_id = getattr(oauth, '_user_id', None) | ||
|
|
||
| if not client_id or not client_secret: | ||
| raise ValueError( | ||
| "CCG authentication requires client_id and client_secret" | ||
| ) | ||
|
|
||
| # Create token storage adapter if not provided | ||
| if token_storage is None: | ||
| from box_sdk_gen.box.token_storage import InMemoryTokenStorage | ||
|
|
||
| token_storage = InMemoryTokenStorage() | ||
|
|
||
| config = CCGConfig( | ||
| client_id=client_id, | ||
| client_secret=client_secret, | ||
| enterprise_id=enterprise_id, | ||
| user_id=user_id, | ||
| token_storage=token_storage, | ||
| ) | ||
|
|
||
| auth = BoxCCGAuth(config=config) | ||
|
|
||
| # Handle user vs enterprise scope | ||
| if user_id: | ||
| auth = auth.with_user_subject(user_id, token_storage=token_storage) | ||
|
|
||
| return auth | ||
|
|
||
| raise ValueError( | ||
| f"Unsupported authentication type: {type(oauth).__name__}. " | ||
| "Supported types: DeveloperTokenAuth, OAuth2, JWTAuth, CCGAuth" | ||
| ) | ||
|
|
||
| def get_network_session( | ||
| self, | ||
| *, | ||
| network_client=None, | ||
| retry_strategy=None, | ||
| data_sanitizer=None, | ||
| additional_headers=None, | ||
| ): | ||
| """ | ||
| Extract network configuration from this legacy client and convert it | ||
| to a generated SDK NetworkSession object. | ||
|
|
||
| :param network_client: | ||
| Optional NetworkClient instance for the generated SDK. | ||
| If not provided, a default will be created with proxy support if configured. | ||
| :param retry_strategy: | ||
| Optional RetryStrategy instance for the generated SDK. | ||
| If not provided, one will be created from legacy retry settings. | ||
| :param data_sanitizer: | ||
| Optional DataSanitizer instance for the generated SDK. | ||
| :param additional_headers: | ||
| Optional dictionary of additional HTTP headers to merge with legacy headers. | ||
| :return: | ||
| NetworkSession object compatible with the generated SDK (box_sdk_gen). | ||
| """ | ||
| session = self._session | ||
| api_config = session.api_config | ||
| proxy_config = session.proxy_config | ||
|
|
||
| # Extract base URLs | ||
| base_url = getattr(api_config, 'BASE_API_URL', 'https://api.box.com/2.0') | ||
| # Remove version suffix if present | ||
| if base_url.endswith('/2.0'): | ||
| base_url = base_url[:-4] | ||
| elif base_url.endswith('/2'): | ||
| base_url = base_url[:-2] | ||
|
|
||
| upload_url = getattr(api_config, 'UPLOAD_URL', 'https://upload.box.com/api/2.0') | ||
| # Remove version suffix if present | ||
| if upload_url.endswith('/2.0'): | ||
| upload_url = upload_url[:-4] | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. extract to a method |
||
| elif upload_url.endswith('/2'): | ||
| upload_url = upload_url[:-2] | ||
|
|
||
| oauth2_url = getattr( | ||
| api_config, 'OAUTH2_AUTHORIZE_URL', 'https://account.box.com/api/oauth2' | ||
| ) | ||
| # Extract base OAuth URL | ||
| if '/authorize' in oauth2_url: | ||
| oauth2_url = oauth2_url[: oauth2_url.rindex('/authorize')] | ||
|
|
||
| base_urls = BaseUrls( | ||
| base_url=base_url, upload_url=upload_url, oauth_2_url=oauth2_url | ||
| ) | ||
|
|
||
| # Extract or create retry strategy | ||
| if retry_strategy is None: | ||
| max_retries = getattr(api_config, 'MAX_RETRY_ATTEMPTS', 5) | ||
| retry_base_interval = getattr(session, '_retry_base_interval', 1.0) | ||
| retry_strategy = BoxRetryStrategy( | ||
| max_attempts=max_retries, retry_base_interval=retry_base_interval | ||
| ) | ||
|
|
||
| # Handle proxy configuration | ||
| proxy_url = None | ||
| if proxy_config and hasattr(proxy_config, 'URL') and proxy_config.URL: | ||
| proxy_url = proxy_config.URL | ||
| # Handle authenticated proxy | ||
| if hasattr(proxy_config, 'AUTH') and proxy_config.AUTH: | ||
| auth = proxy_config.AUTH | ||
| if isinstance(auth, dict) and 'user' in auth and 'password' in auth: | ||
| scheme = ( | ||
| proxy_url.split('://', 1)[0] if '://' in proxy_url else 'http' | ||
| ) | ||
| # Extract host from URL | ||
| host = proxy_url.split('//')[1] if '//' in proxy_url else proxy_url | ||
| proxy_url = f"{scheme}://{auth['user']}:{auth['password']}@{host}" | ||
|
|
||
| # Merge custom headers | ||
| headers = {} | ||
| if hasattr(session, '_default_headers'): | ||
| headers.update(session._default_headers.copy()) | ||
| if additional_headers: | ||
| headers.update(additional_headers) | ||
|
|
||
| # Create network session | ||
| network_session = NetworkSession( | ||
| base_urls=base_urls, | ||
| network_client=network_client, | ||
| retry_strategy=retry_strategy, | ||
| additional_headers=headers if headers else None, | ||
| proxy_url=proxy_url, | ||
| data_sanitizer=data_sanitizer, | ||
| ) | ||
|
|
||
| return network_session | ||
|
|
||
| def get_sdk_gen_client(self, *, auth_options=None, network_options=None): | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. missing typehints |
||
| """ | ||
| Create a fully configured generated SDK client from this legacy client. | ||
|
|
||
| This method combines get_authentication() and get_network_session() to create | ||
| a BoxClient instance that shares authentication and network configuration | ||
| with this legacy client. | ||
|
|
||
| :param auth_options: | ||
| Optional dictionary with authentication options: | ||
| - token_storage: Custom TokenStorage instance | ||
| :param network_options: | ||
| Optional dictionary with network options: | ||
| - network_client: Custom NetworkClient instance | ||
| - retry_strategy: Custom RetryStrategy instance | ||
| - data_sanitizer: Custom DataSanitizer instance | ||
| - additional_headers: Dictionary of additional HTTP headers | ||
| :return: | ||
| BoxClient instance from box_sdk_gen, fully configured with shared settings. | ||
| """ | ||
| # Extract authentication | ||
| token_storage = None | ||
| if auth_options and 'token_storage' in auth_options: | ||
| token_storage = auth_options['token_storage'] | ||
|
|
||
| auth = self.get_authentication(token_storage=token_storage) | ||
|
|
||
| # Extract network session | ||
| network_kwargs = {} | ||
| if network_options: | ||
| network_kwargs = { | ||
| 'network_client': network_options.get('network_client'), | ||
| 'retry_strategy': network_options.get('retry_strategy'), | ||
| 'data_sanitizer': network_options.get('data_sanitizer'), | ||
| 'additional_headers': network_options.get('additional_headers'), | ||
| } | ||
|
|
||
| network_session = self.get_network_session(**network_kwargs) | ||
|
|
||
| # Create and return fully configured client | ||
| return BoxClient(auth=auth, network_session=network_session) | ||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
missing typehint for token_storage and returned type