diff --git a/nisystemlink/clients/artifact/_artifact_client.py b/nisystemlink/clients/artifact/_artifact_client.py index c98cdda3..9615b700 100644 --- a/nisystemlink/clients/artifact/_artifact_client.py +++ b/nisystemlink/clients/artifact/_artifact_client.py @@ -10,13 +10,21 @@ post, response_handler, ) +from nisystemlink.clients.core._uplink._multipart_retry import ( + retryable_multipart_request, +) from nisystemlink.clients.core.helpers._iterator_file_like import IteratorFileLike from requests.models import Response -from uplink import Part, Path +from uplink import Part, Path, retry from . import models +def _iter_content_filelike_wrapper(response: Response) -> IteratorFileLike: + return IteratorFileLike(response.iter_content(chunk_size=4096)) + + +@retry(when=retry.when.status(429), stop=retry.stop.after_attempt(5)) class ArtifactClient(BaseClient): def __init__(self, configuration: core.HttpConfiguration | None = None): """Initialize an instance. @@ -35,9 +43,10 @@ def __init__(self, configuration: core.HttpConfiguration | None = None): super().__init__(configuration, base_path="/ninbartifact/v1/") - @post("artifacts") + @retryable_multipart_request() + @post("artifacts", args=[Part("workspace"), Part("artifact")]) def __upload_artifact( - self, workspace: Part, artifact: Part + self, workspace: str, artifact: BinaryIO ) -> models.UploadArtifactResponse: """Uploads an artifact using multipart/form-data headers to send the file payload in the HTTP body. @@ -49,6 +58,7 @@ def __upload_artifact( UploadArtifactResponse: The response containing the artifact ID. """ + ... def upload_artifact( self, workspace: str, artifact: BinaryIO @@ -70,9 +80,6 @@ def upload_artifact( return response - def _iter_content_filelike_wrapper(response: Response) -> IteratorFileLike: - return IteratorFileLike(response.iter_content(chunk_size=4096)) - @response_handler(_iter_content_filelike_wrapper) @get("artifacts/{id}") def download_artifact(self, id: Path) -> IteratorFileLike: diff --git a/nisystemlink/clients/core/_uplink/_multipart_retry.py b/nisystemlink/clients/core/_uplink/_multipart_retry.py new file mode 100644 index 00000000..23e5a928 --- /dev/null +++ b/nisystemlink/clients/core/_uplink/_multipart_retry.py @@ -0,0 +1,189 @@ +"""Helpers for multipart requests that need retry-safe stream handling. + +The decorator exported by this module is intended for multipart requests that may +be retried by Uplink. It preserves the caller's stream position on the initial +send, rewinds seekable multipart parts only on retry attempts, and aborts the +retry if any part cannot be rewound. + +When a retry is aborted because a part cannot be rewound, the original response +or exception that triggered the retry is surfaced back through Uplink's normal +response and error handling pipeline instead of sending a malformed follow-up +request. +""" + +import io +from enum import auto, Enum +from typing import Any, Callable, cast, TypeVar + +from requests import Response +from uplink import decorators +from uplink.clients.io import transitions +from uplink.clients.io.interfaces import RequestTemplate + +F = TypeVar("F", bound=Callable[..., Any]) + + +class _RewindResult(Enum): + REWOUND = auto() + FAILED = auto() + NOT_NEEDED = auto() + + +def _rewind_retryable_part(part: object) -> _RewindResult: + """Rewind the first seekable multipart payload contained in ``part``. + + Returns ``_RewindResult.REWOUND`` when a multipart payload was successfully + rewound to the start of the stream. Returns + ``_RewindResult.FAILED`` when a stream payload appears to need rewinding but + rejects it. Returns ``_RewindResult.NOT_NEEDED`` when the part contains no + stream payload that requires rewinding, such as simple string fields. + """ + if hasattr(part, "seek"): + seekable = getattr(part, "seekable", None) + if callable(seekable): + try: + if not cast(Any, seekable)(): + return _RewindResult.FAILED + except (OSError, io.UnsupportedOperation): + return _RewindResult.FAILED + + try: + cast(Any, part).seek(0) + except (OSError, io.UnsupportedOperation): + return _RewindResult.FAILED + return _RewindResult.REWOUND + + if isinstance(part, tuple): + for item in part: + rewind_result = _rewind_retryable_part(item) + if rewind_result is not _RewindResult.NOT_NEEDED: + return rewind_result + + return _RewindResult.NOT_NEEDED + + +def _get_saved_retry_action( + response: Response | None, + exception_info: tuple[type[BaseException], BaseException, Any] | None, +) -> Any: + """Return the original retry-triggering failure as an Uplink transition.""" + if response is not None: + return transitions.finish(response) + + if exception_info is not None: + return transitions.fail(*exception_info) + + return None + + +class _RetryableMultipartRequestTemplate(RequestTemplate): + """Track multipart retry state and rewind streams only for retry sends. + + The first request attempt is left untouched so callers can intentionally + provide streams positioned away from offset 0. On later attempts, each file + part must be rewound to the beginning before the request is sent again. + + If rewinding fails for any part, the retry is cancelled and the original + response or exception that caused the retry is returned to Uplink so normal + error handling can surface it to the caller. + """ + + def __init__(self) -> None: + self._attempted_request_ids: set[int] = set() + self._responses_by_request_id: dict[int, Response] = {} + self._exceptions_by_request_id: dict[ + int, tuple[type[BaseException], BaseException, Any] + ] = {} + + def _clear_request_state(self, request_id: int) -> None: + self._attempted_request_ids.discard(request_id) + self._responses_by_request_id.pop(request_id, None) + self._exceptions_by_request_id.pop(request_id, None) + + def before_request(self, request: tuple[str, str, dict[str, Any]]) -> Any: + _, _, extras = request + request_id = id(request) + if request_id not in self._attempted_request_ids: + self._attempted_request_ids.add(request_id) + return None + + for part in extras.get("files", {}).values(): + if _rewind_retryable_part(part) is _RewindResult.FAILED: + retry_action = _get_saved_retry_action( + self._responses_by_request_id.get(request_id), + self._exceptions_by_request_id.get(request_id), + ) + self._clear_request_state(request_id) + return retry_action + return None + + def after_response( + self, request: tuple[str, str, dict[str, Any]], response: Response + ) -> None: + request_id = id(request) + self._responses_by_request_id[request_id] = response + self._exceptions_by_request_id.pop(request_id, None) + return None + + def after_exception( + self, + request: tuple[str, str, dict[str, Any]], + exc_type: type[BaseException], + exc_val: BaseException, + exc_tb: Any, + ) -> None: + request_id = id(request) + self._exceptions_by_request_id[request_id] = (exc_type, exc_val, exc_tb) + self._responses_by_request_id.pop(request_id, None) + return None + + +class _RetryableMultipartCleanupTemplate(RequestTemplate): + def __init__(self, retry_template: _RetryableMultipartRequestTemplate) -> None: + self._retry_template = retry_template + + def after_response( + self, request: tuple[str, str, dict[str, Any]], response: Response + ) -> None: + self._retry_template._clear_request_state(id(request)) + return None + + def after_exception( + self, + request: tuple[str, str, dict[str, Any]], + exc_type: type[BaseException], + exc_val: BaseException, + exc_tb: Any, + ) -> None: + self._retry_template._clear_request_state(id(request)) + return None + + +class _RetryableMultipartRequest(decorators.MethodAnnotation): + def modify_request(self, request_builder: Any) -> None: + retryable_template = _RetryableMultipartRequestTemplate() + # Insert ahead of Uplink's retry template so this helper can see the + # original retry-triggering response/exception and short-circuit future + # attempts when a multipart stream cannot be rewound safely. + request_builder._request_templates.insert(0, retryable_template) + request_builder._request_templates.append( + _RetryableMultipartCleanupTemplate(retryable_template) + ) + + +def retryable_multipart_request() -> Callable[[F], F]: + """Create a decorator for multipart requests with retry-safe stream handling. + + Behavior: + - The initial send preserves the caller-provided stream position. + - Retry attempts rewind seekable multipart payloads back to offset 0. + - Multipart fields that do not contain streams, such as simple strings, are + left alone and do not block retries. + - If a retry attempt cannot rewind a payload, the retry is cancelled and the + original retry-triggering response or exception is surfaced. + """ + + def decorator(func: F) -> F: + return _RetryableMultipartRequest()(func) # type: ignore[return-value] + + return decorator diff --git a/nisystemlink/clients/feeds/_feeds_client.py b/nisystemlink/clients/feeds/_feeds_client.py index 34e14a56..14969268 100644 --- a/nisystemlink/clients/feeds/_feeds_client.py +++ b/nisystemlink/clients/feeds/_feeds_client.py @@ -5,6 +5,9 @@ from nisystemlink.clients import core from nisystemlink.clients.core._uplink._base_client import BaseClient from nisystemlink.clients.core._uplink._methods import delete, get, post +from nisystemlink.clients.core._uplink._multipart_retry import ( + retryable_multipart_request, +) from uplink import Part, Path, Query, retry from . import models @@ -90,6 +93,7 @@ def query_feeds( return response + @retryable_multipart_request() @post( "feeds/{feedId}/packages", args=[Path(name="feedId"), Part(), Query(name="shouldOverwrite")], diff --git a/nisystemlink/clients/file/_file_client.py b/nisystemlink/clients/file/_file_client.py index bf86a015..14feaea4 100644 --- a/nisystemlink/clients/file/_file_client.py +++ b/nisystemlink/clients/file/_file_client.py @@ -14,6 +14,9 @@ post, response_handler, ) +from nisystemlink.clients.core._uplink._multipart_retry import ( + retryable_multipart_request, +) from nisystemlink.clients.core.helpers import IteratorFileLike from requests.models import Response from uplink import Body, Field, params, Part, Path, Query, retry @@ -331,6 +334,7 @@ def download_file(self, id: str) -> IteratorFileLike: """ @response_handler(_file_uri_response_handler) + @retryable_multipart_request() @post("service-groups/Default/upload-files") def __upload_file( self, @@ -433,6 +437,7 @@ def start_upload_session( ], ) @response_handler(lambda response: None) + @retryable_multipart_request() def append_to_upload_session( self, session_id: str, diff --git a/nisystemlink/clients/notebook/_notebook_client.py b/nisystemlink/clients/notebook/_notebook_client.py index 4bbc2233..cd78ab66 100644 --- a/nisystemlink/clients/notebook/_notebook_client.py +++ b/nisystemlink/clients/notebook/_notebook_client.py @@ -1,5 +1,5 @@ import io -from typing import List +from typing import BinaryIO, List from nisystemlink.clients import core from nisystemlink.clients.core._api_error import ApiError @@ -14,6 +14,9 @@ put, response_handler, ) +from nisystemlink.clients.core._uplink._multipart_retry import ( + retryable_multipart_request, +) from nisystemlink.clients.core.helpers._iterator_file_like import IteratorFileLike from uplink import Part, Path, retry @@ -54,12 +57,16 @@ def get_notebook(self, id: str) -> models.NotebookMetadata: """ ... - @put("ninotebook/v1/notebook/{id}") + @retryable_multipart_request() + @put( + "ninotebook/v1/notebook/{id}", + args=[Path("id"), Part("metadata"), Part("content")], + ) def __update_notebook( self, - id: Path, - metadata: Part = None, - content: Part = None, + id: str, + metadata: io.BytesIO | None = None, + content: BinaryIO | None = None, ) -> models.NotebookMetadata: """Updates a notebook metadata by ID. @@ -81,7 +88,7 @@ def update_notebook( self, id: str, metadata: models.NotebookMetadata | None = None, - content: io.BufferedReader | None = None, + content: BinaryIO | None = None, ) -> models.NotebookMetadata: """Updates a notebook metadata by ID. @@ -121,11 +128,15 @@ def delete_notebook(self, id: str) -> None: """ ... - @post("ninotebook/v1/notebook") + @retryable_multipart_request() + @post( + "ninotebook/v1/notebook", + args=[Part("metadata"), Part("content")], + ) def __create_notebook( self, - metadata: Part, - content: Part, + metadata: io.BytesIO, + content: BinaryIO, ) -> models.NotebookMetadata: """Creates a new notebook. @@ -145,7 +156,7 @@ def __create_notebook( def create_notebook( self, metadata: models.NotebookMetadata, - content: io.BufferedReader, + content: BinaryIO, ) -> models.NotebookMetadata: """Creates a new notebook. diff --git a/tests/core/test_multipart_retry.py b/tests/core/test_multipart_retry.py new file mode 100644 index 00000000..648c8abb --- /dev/null +++ b/tests/core/test_multipart_retry.py @@ -0,0 +1,333 @@ +import io +from typing import Any, cast + +import pytest +import responses +from nisystemlink.clients.core import ApiException +from nisystemlink.clients.core._uplink._base_client import _handle_http_status +from nisystemlink.clients.core._uplink._multipart_retry import ( + _RetryableMultipartCleanupTemplate, + _RetryableMultipartRequestTemplate, + retryable_multipart_request, +) +from requests import Response +from uplink import Consumer, Part, post, retry +from uplink.clients.io import CompositeRequestTemplate +from uplink.clients.io import state as uplink_state +from uplink.clients.io.interfaces import RequestTemplate + + +class _NonSeekableStream: + def seekable(self) -> bool: + return False + + def seek(self, offset: int) -> None: + raise AssertionError("seek should not be called for non-seekable streams") + + +class _FailingSeekStream: + def __init__(self, data: bytes = b"artifact") -> None: + self._buffer = io.BytesIO(data) + + def read(self, size: int = -1) -> bytes: + return self._buffer.read(size) + + def seek(self, offset: int) -> None: + raise OSError("cannot rewind") + + +class _StaticTransitionTemplate(RequestTemplate): + def __init__(self, response_transition=None, exception_transition=None) -> None: + self._response_transition = response_transition + self._exception_transition = exception_transition + + def after_response(self, request, response): + return self._response_transition + + def after_exception(self, request, exc_type, exc_val, exc_tb): + return self._exception_transition + + +@retry( + when=retry.when.status(429), + stop=retry.stop.after_attempt(2), + backoff=retry.backoff.fixed(0), +) +class _MultipartRetryTestConsumer(Consumer): + def __init__(self): + super().__init__( + base_url="https://example.com/", + hooks=[_handle_http_status], + ) + + @retryable_multipart_request() + @post("upload", args=[Part("artifact")]) + def upload(self, artifact): + pass + + +class TestRetryableMultipartRequestTemplate: + def test__before_request_on_initial_attempt__does_not_rewind_parts(self): + direct_part = io.BytesIO(b"direct") + direct_part.seek(3) + tuple_part = io.BytesIO(b"tuple") + tuple_part.seek(2) + request = ( + "POST", + "https://example.com/upload", + { + "files": { + "direct": direct_part, + "tuple": ("tuple.bin", tuple_part, "application/octet-stream"), + } + }, + ) + + _RetryableMultipartRequestTemplate().before_request(request) + + assert direct_part.tell() == 3 + assert tuple_part.tell() == 2 + + def test__before_request_on_retry__rewinds_parts(self): + direct_part = io.BytesIO(b"direct") + direct_part.seek(3) + tuple_part = io.BytesIO(b"tuple") + tuple_part.seek(2) + request = ( + "POST", + "https://example.com/upload", + { + "files": { + "direct": direct_part, + "tuple": ("tuple.bin", tuple_part, "application/octet-stream"), + } + }, + ) + template = _RetryableMultipartRequestTemplate() + + template.before_request(request) + direct_part.read() + tuple_part.read() + template.before_request(request) + + assert direct_part.tell() == 0 + assert tuple_part.tell() == 0 + + def test__before_request_with_non_seekable_parts__does_not_raise(self): + request = ( + "POST", + "https://example.com/upload", + { + "files": { + "non_seekable": _NonSeekableStream(), + "failing_seek": _FailingSeekStream(), + } + }, + ) + template = _RetryableMultipartRequestTemplate() + + template.before_request(request) + template.before_request(request) + + def test__before_request_when_rewind_fails_after_response__finishes_with_saved_response( + self, + ): + request = ( + "POST", + "https://example.com/upload", + { + "files": { + "failing_seek": _FailingSeekStream(), + } + }, + ) + response = Response() + response.status_code = 429 + response.url = "https://example.com/upload" + template = _RetryableMultipartRequestTemplate() + + template.before_request(request) + template.after_response(request, response) + action = template.before_request(request) + + assert action is not None + next_state = action(uplink_state.BeforeRequest(request)) + assert isinstance(next_state, uplink_state.Finish) + assert next_state.response is response + request_id = id(request) + assert request_id not in template._attempted_request_ids + assert request_id not in template._responses_by_request_id + assert request_id not in template._exceptions_by_request_id + + def test__terminal_response_after_retry_pipeline__clears_saved_retry_state(self): + request = ( + "POST", + "https://example.com/upload", + { + "files": { + "artifact": ("artifact.bin", io.BytesIO(b"artifact")), + } + }, + ) + response = Response() + response.status_code = 200 + response.url = "https://example.com/upload" + template = _RetryableMultipartRequestTemplate() + composite = CompositeRequestTemplate( + [ + template, + _StaticTransitionTemplate(), + _RetryableMultipartCleanupTemplate(template), + ] + ) + + template.before_request(request) + composite.after_response(request, response) + + request_id = id(request) + assert request_id not in template._attempted_request_ids + assert request_id not in template._responses_by_request_id + assert request_id not in template._exceptions_by_request_id + + def test__retry_transition_after_response__preserves_saved_retry_state(self): + request = ( + "POST", + "https://example.com/upload", + { + "files": { + "artifact": ("artifact.bin", io.BytesIO(b"artifact")), + } + }, + ) + response = Response() + response.status_code = 503 + response.url = "https://example.com/upload" + retry_transition = object() + template = _RetryableMultipartRequestTemplate() + composite = CompositeRequestTemplate( + [ + template, + _StaticTransitionTemplate(response_transition=retry_transition), + _RetryableMultipartCleanupTemplate(template), + ] + ) + + template.before_request(request) + + assert composite.after_response(request, response) is retry_transition + + request_id = id(request) + assert request_id in template._attempted_request_ids + assert template._responses_by_request_id[request_id] is response + assert request_id not in template._exceptions_by_request_id + + def test__terminal_exception_after_retry_pipeline__clears_saved_retry_state(self): + request = ( + "POST", + "https://example.com/upload", + { + "files": { + "artifact": ("artifact.bin", io.BytesIO(b"artifact")), + } + }, + ) + exception = RuntimeError("boom") + template = _RetryableMultipartRequestTemplate() + composite = CompositeRequestTemplate( + [ + template, + _StaticTransitionTemplate(), + _RetryableMultipartCleanupTemplate(template), + ] + ) + + template.before_request(request) + composite.after_exception(request, RuntimeError, exception, None) + + request_id = id(request) + assert request_id not in template._attempted_request_ids + assert request_id not in template._responses_by_request_id + assert request_id not in template._exceptions_by_request_id + + def test__before_request_on_retry_with_string_only_parts__allows_retry(self): + request = ( + "POST", + "https://example.com/upload", + { + "files": { + "workspace": "workspace-id", + "metadata": (None, '{"name": "example"}'), + } + }, + ) + response = Response() + response.status_code = 429 + response.url = "https://example.com/upload" + template = _RetryableMultipartRequestTemplate() + + template.before_request(request) + template.after_response(request, response) + action = template.before_request(request) + + assert action is None + + +class TestRetryableMultipartRequestIntegration: + @responses.activate + def test__upload_with_unrewindable_stream_after_rate_limit__raises_original_api_exception( + self, + ): + responses.post("https://example.com/upload", status=429) + consumer = _MultipartRetryTestConsumer() + + with pytest.raises(ApiException) as exc_info: + consumer.upload( + artifact=( + "artifact.bin", + _FailingSeekStream(), + "application/octet-stream", + ) + ) + + assert exc_info.value.http_status_code == 429 + assert len(responses.calls) == 1 + + @responses.activate + def test__upload_with_rewindable_stream_after_rate_limit__retries_and_succeeds( + self, + ): + request_bodies = [] + + def response_callback(request): + body = request.body + if isinstance(body, str): + body = body.encode("utf-8") + request_bodies.append(body) + + if len(request_bodies) == 1: + return (429, {}, "") + return (200, {}, "ok") + + responses.add_callback( + responses.POST, + "https://example.com/upload", + callback=response_callback, + ) + consumer = _MultipartRetryTestConsumer() + artifact_content = b"rewindable-artifact" + + consumer.upload( + artifact=( + "artifact.bin", + io.BytesIO(artifact_content), + "application/octet-stream", + ) + ) + + assert len(responses.calls) == 2 + last_response = cast(Any, responses.calls[-1]).response + assert last_response is not None + assert last_response.status_code == 200 + assert len(request_bodies) == 2 + assert artifact_content in request_bodies[0] + assert artifact_content in request_bodies[1] diff --git a/tests/integration/artifact/test_artifact.py b/tests/integration/artifact/test_artifact.py index edecd961..889cba67 100644 --- a/tests/integration/artifact/test_artifact.py +++ b/tests/integration/artifact/test_artifact.py @@ -2,11 +2,17 @@ from typing import List import pytest +import responses from nisystemlink.clients.artifact import ArtifactClient from nisystemlink.clients.artifact.models._upload_artifact_response import ( UploadArtifactResponse, ) from nisystemlink.clients.core._http_configuration import HttpConfiguration +from responses.registries import OrderedRegistry +from uplink.clients.io import blocking_strategy as uplink_blocking_strategy + +BASE_URL = "https://test-api.lifecyclesolutions.ni.com" +DEFAULT_WORKSPACE = "2300760d-38c4-48a1-9acb-800260812337" @pytest.fixture(scope="class") @@ -23,7 +29,7 @@ def create_artifact(client: ArtifactClient): def _create_artifact( content: bytes = b"test content", cleanup: bool = True, - workspace: str = "2300760d-38c4-48a1-9acb-800260812337", + workspace: str = DEFAULT_WORKSPACE, ): # Used the main-test default workspace since the client for creating a workspace has not been added yet artifact_stream = io.BytesIO(content) @@ -51,6 +57,24 @@ def test__upload_artifact__artifact_uploaded( assert upload_response is not None assert upload_response.id is not None + def test__upload_artifact_after_rate_limit_retry__artifact_uploaded( + self, create_artifact, monkeypatch: pytest.MonkeyPatch + ): + monkeypatch.setattr(uplink_blocking_strategy.time, "sleep", lambda _: None) + + with responses.RequestsMock(registry=OrderedRegistry) as request_mock: + request_mock.add( + responses.POST, + f"{BASE_URL}/ninbartifact/v1/artifacts", + status=429, + ) + request_mock.add_passthru(f"{BASE_URL}/ninbartifact/v1/artifacts") + + upload_response: UploadArtifactResponse = create_artifact() + + assert upload_response is not None + assert upload_response.id is not None + def test__download_artifact__artifact_downloaded( self, client: ArtifactClient, create_artifact ): diff --git a/tests/integration/feeds/test_feeds_client.py b/tests/integration/feeds/test_feeds_client.py index e7d00d6d..b820d69e 100644 --- a/tests/integration/feeds/test_feeds_client.py +++ b/tests/integration/feeds/test_feeds_client.py @@ -6,10 +6,14 @@ from typing import BinaryIO, Callable import pytest +import responses from nisystemlink.clients.core import ApiException from nisystemlink.clients.feeds import FeedsClient from nisystemlink.clients.feeds.models import CreateFeedRequest, Platform +from responses.registries import OrderedRegistry +from uplink.clients.io import blocking_strategy as uplink_blocking_strategy +BASE_URL = "https://test-api.lifecyclesolutions.ni.com" FEED_DESCRIPTION = "Sample feed for uploading packages" PACKAGE_PATH = str( Path(__file__).parent.resolve() @@ -236,6 +240,46 @@ def test__upload_package_content__invalid_feed_id_raises( feed_id=invalid_id, ) + def test__upload_package_content_after_rate_limit_retry__upload_package_content_succeeds( + self, + client: FeedsClient, + create_feed: Callable, + create_feed_request: Callable, + get_feed_name: Callable, + monkeypatch: pytest.MonkeyPatch, + ): + create_feed_request_body = create_feed_request( + feed_name=get_feed_name(), + description=FEED_DESCRIPTION, + platform=Platform.WINDOWS, + ) + create_feed_resp = create_feed(create_feed_request_body) + assert create_feed_resp.id is not None + + response = None + + monkeypatch.setattr(uplink_blocking_strategy.time, "sleep", lambda _: None) + + with responses.RequestsMock(registry=OrderedRegistry) as request_mock: + request_mock.add( + responses.POST, + f"{BASE_URL}/nifeed/v1/feeds/{create_feed_resp.id}/packages", + status=429, + ) + request_mock.add_passthru( + f"{BASE_URL}/nifeed/v1/feeds/{create_feed_resp.id}/packages" + ) + + with open(PACKAGE_PATH, "rb") as package: + response = client.upload_package_content( + feed_id=create_feed_resp.id, + package=package, + overwrite=True, + ) + + assert response is not None + assert response.id is not None + def test__delete_windows_feed__succeeds( self, client: FeedsClient, diff --git a/tests/integration/file/test_file_client.py b/tests/integration/file/test_file_client.py index 7dfa8534..d8bdfa2f 100644 --- a/tests/integration/file/test_file_client.py +++ b/tests/integration/file/test_file_client.py @@ -9,6 +9,7 @@ import backoff # type: ignore import pytest # type: ignore +import responses from nisystemlink.clients.core import ApiException from nisystemlink.clients.file import FileClient from nisystemlink.clients.file.models import ( @@ -20,7 +21,10 @@ UpdateMetadataRequest, ) from nisystemlink.clients.file.utilities import rename_file +from responses.registries import OrderedRegistry +from uplink.clients.io import blocking_strategy as uplink_blocking_strategy +BASE_URL = "https://test-api.lifecyclesolutions.ni.com" FILE_NOT_FOUND_ERR = "Not Found" PREFIX = "File Client Tests-" TEST_FILE_DATA = b"This is a test file binary content." @@ -115,6 +119,41 @@ def test__upload_file_with_metadata__succeeds( len(files.available_files[0].properties.keys()) == len(metadata) + 1 ) # Name + 1 custom property + def test__upload_file_after_rate_limit_retry__upload_file_succeeds( + self, client: FileClient, monkeypatch: pytest.MonkeyPatch + ): + """Retrying a file upload should succeed against the real endpoint.""" + test_file = io.BytesIO(TEST_FILE_DATA) + test_file.name = "retry-safe-file.bin" + file_id = None + + monkeypatch.setattr(uplink_blocking_strategy.time, "sleep", lambda _: None) + + try: + with responses.RequestsMock(registry=OrderedRegistry) as request_mock: + request_mock.add( + responses.POST, + f"{BASE_URL}/nifile/v1/service-groups/Default/upload-files", + status=429, + ) + request_mock.add_passthru( + f"{BASE_URL}/nifile/v1/service-groups/Default/upload-files" + ) + + file_id = client.upload_file(file=test_file) + + assert file_id is not None + + files = client.get_files(ids=[file_id]) + assert files.total_count == 1 + assert len(files.available_files) == 1 + assert files.available_files[0].id == file_id + assert files.available_files[0].properties is not None + assert files.available_files[0].properties["Name"] == test_file.name + finally: + if file_id: + client.delete_file(id=file_id) + def test__upload_get_delete_files__succeeds( self, client: FileClient, test_file, random_filename_extension ): diff --git a/tests/integration/notebook/test_notebook_client.py b/tests/integration/notebook/test_notebook_client.py index 104e790b..78e85ce4 100644 --- a/tests/integration/notebook/test_notebook_client.py +++ b/tests/integration/notebook/test_notebook_client.py @@ -14,6 +14,8 @@ QueryExecutionsRequest, QueryNotebookRequest, ) +from responses.registries import OrderedRegistry +from uplink.clients.io import blocking_strategy as uplink_blocking_strategy TEST_FILE_DATA = b"This is a test notebook binary content." PREFIX = "Notebook Client Tests-" @@ -284,6 +286,78 @@ def test__get_notebook_content_by_invalid_id__raises_ApiException_NotFound( with pytest.raises(ApiException, match="Not Found"): client.get_notebook_content(id="invalid_id") + def test__create_notebook_after_rate_limit_retry__notebook_created_with_valid_metadata( + self, + client: NotebookClient, + random_filename: str, + monkeypatch: pytest.MonkeyPatch, + ): + metadata = NotebookMetadata(name=random_filename) + notebook = None + notebook_id = None + + monkeypatch.setattr(uplink_blocking_strategy.time, "sleep", lambda _: None) + + with responses.RequestsMock(registry=OrderedRegistry) as request_mock: + request_mock.add( + responses.POST, + f"{BASE_URL}/ninotebook/v1/notebook", + status=429, + ) + request_mock.add_passthru(f"{BASE_URL}/ninotebook/v1/notebook") + + with open("tests/integration/notebook/sample_file.ipynb", "rb") as file: + notebook = client.create_notebook(metadata=metadata, content=file) + notebook_id = notebook.id + + try: + assert notebook is not None + assert notebook.id is not None + assert notebook.name == metadata.name + assert notebook.workspace is not None + assert notebook.created_by is not None + assert notebook.created_at is not None + finally: + if notebook_id: + client.delete_notebook(id=notebook_id) + + def test__update_notebook_metadata_after_rate_limit_retry__update_notebook_metadata_succeeds( + self, + client: NotebookClient, + create_notebook, + random_filename: str, + monkeypatch: pytest.MonkeyPatch, + ): + metadata = NotebookMetadata(name=random_filename) + notebook = create_notebook(metadata=metadata) + + [filename, extension] = random_filename.split(".") + new_name = f"{filename}-retry-updated.{extension}" + notebook.name = new_name + notebook.properties = {"key": "value"} + response = None + + monkeypatch.setattr(uplink_blocking_strategy.time, "sleep", lambda _: None) + + with responses.RequestsMock(registry=OrderedRegistry) as request_mock: + request_mock.add( + responses.PUT, + f"{BASE_URL}/ninotebook/v1/notebook/{notebook.id}", + status=429, + ) + request_mock.add_passthru( + f"{BASE_URL}/ninotebook/v1/notebook/{notebook.id}" + ) + + response = client.update_notebook(id=notebook.id, metadata=notebook) + + assert response is not None + assert response.id == notebook.id + assert response.name == new_name + assert response.properties == notebook.properties + assert response.updated_by is not None + assert response.updated_at is not None + @responses.activate def test__create_executions_with_valid_notebook_id__returns_executions_with_right_fields( self,