Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions nisystemlink/clients/artifact/_artifact_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,21 @@
post,
response_handler,
)
from nisystemlink.clients.core._uplink._multipart_retry import (
retryable_multipart_request,
)
from nisystemlink.clients.core.helpers._iterator_file_like import IteratorFileLike
from requests.models import Response
from uplink import Part, Path
from uplink import Part, Path, retry

from . import models


def _iter_content_filelike_wrapper(response: Response) -> IteratorFileLike:
return IteratorFileLike(response.iter_content(chunk_size=4096))


@retry(when=retry.when.status(429), stop=retry.stop.after_attempt(5))
class ArtifactClient(BaseClient):
def __init__(self, configuration: core.HttpConfiguration | None = None):
"""Initialize an instance.
Expand All @@ -35,9 +43,10 @@ def __init__(self, configuration: core.HttpConfiguration | None = None):

super().__init__(configuration, base_path="/ninbartifact/v1/")

@post("artifacts")
@retryable_multipart_request()
@post("artifacts", args=[Part("workspace"), Part("artifact")])
def __upload_artifact(
self, workspace: Part, artifact: Part
self, workspace: str, artifact: BinaryIO
) -> models.UploadArtifactResponse:
"""Uploads an artifact using multipart/form-data headers to send the file payload in the HTTP body.

Expand All @@ -49,6 +58,7 @@ def __upload_artifact(
UploadArtifactResponse: The response containing the artifact ID.

"""
...

def upload_artifact(
self, workspace: str, artifact: BinaryIO
Expand All @@ -70,9 +80,6 @@ def upload_artifact(

return response

def _iter_content_filelike_wrapper(response: Response) -> IteratorFileLike:
return IteratorFileLike(response.iter_content(chunk_size=4096))

@response_handler(_iter_content_filelike_wrapper)
@get("artifacts/{id}")
def download_artifact(self, id: Path) -> IteratorFileLike:
Expand Down
189 changes: 189 additions & 0 deletions nisystemlink/clients/core/_uplink/_multipart_retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
"""Helpers for multipart requests that need retry-safe stream handling.

The decorator exported by this module is intended for multipart requests that may
be retried by Uplink. It preserves the caller's stream position on the initial
send, rewinds seekable multipart parts only on retry attempts, and aborts the
retry if any part cannot be rewound.

When a retry is aborted because a part cannot be rewound, the original response
or exception that triggered the retry is surfaced back through Uplink's normal
response and error handling pipeline instead of sending a malformed follow-up
request.
"""

import io
from enum import auto, Enum
from typing import Any, Callable, cast, TypeVar

from requests import Response
from uplink import decorators
from uplink.clients.io import transitions
from uplink.clients.io.interfaces import RequestTemplate

F = TypeVar("F", bound=Callable[..., Any])


class _RewindResult(Enum):
REWOUND = auto()
FAILED = auto()
NOT_NEEDED = auto()


def _rewind_retryable_part(part: object) -> _RewindResult:
"""Rewind the first seekable multipart payload contained in ``part``.

Returns ``_RewindResult.REWOUND`` when a multipart payload was successfully
rewound to the start of the stream. Returns
``_RewindResult.FAILED`` when a stream payload appears to need rewinding but
rejects it. Returns ``_RewindResult.NOT_NEEDED`` when the part contains no
stream payload that requires rewinding, such as simple string fields.
"""
if hasattr(part, "seek"):
seekable = getattr(part, "seekable", None)
if callable(seekable):
try:
if not cast(Any, seekable)():
return _RewindResult.FAILED
except (OSError, io.UnsupportedOperation):
return _RewindResult.FAILED

try:
cast(Any, part).seek(0)
except (OSError, io.UnsupportedOperation):
return _RewindResult.FAILED
return _RewindResult.REWOUND

if isinstance(part, tuple):
for item in part:
rewind_result = _rewind_retryable_part(item)
if rewind_result is not _RewindResult.NOT_NEEDED:
return rewind_result

return _RewindResult.NOT_NEEDED


def _get_saved_retry_action(
response: Response | None,
exception_info: tuple[type[BaseException], BaseException, Any] | None,
) -> Any:
"""Return the original retry-triggering failure as an Uplink transition."""
if response is not None:
return transitions.finish(response)

if exception_info is not None:
return transitions.fail(*exception_info)

return None


class _RetryableMultipartRequestTemplate(RequestTemplate):
"""Track multipart retry state and rewind streams only for retry sends.

The first request attempt is left untouched so callers can intentionally
provide streams positioned away from offset 0. On later attempts, each file
part must be rewound to the beginning before the request is sent again.

If rewinding fails for any part, the retry is cancelled and the original
response or exception that caused the retry is returned to Uplink so normal
error handling can surface it to the caller.
"""

def __init__(self) -> None:
self._attempted_request_ids: set[int] = set()
self._responses_by_request_id: dict[int, Response] = {}
self._exceptions_by_request_id: dict[
int, tuple[type[BaseException], BaseException, Any]
] = {}

def _clear_request_state(self, request_id: int) -> None:
self._attempted_request_ids.discard(request_id)
self._responses_by_request_id.pop(request_id, None)
self._exceptions_by_request_id.pop(request_id, None)

def before_request(self, request: tuple[str, str, dict[str, Any]]) -> Any:
_, _, extras = request
request_id = id(request)
if request_id not in self._attempted_request_ids:
self._attempted_request_ids.add(request_id)
return None

for part in extras.get("files", {}).values():
if _rewind_retryable_part(part) is _RewindResult.FAILED:
retry_action = _get_saved_retry_action(
self._responses_by_request_id.get(request_id),
self._exceptions_by_request_id.get(request_id),
)
self._clear_request_state(request_id)
return retry_action
return None
Comment thread
rbell517 marked this conversation as resolved.

def after_response(
self, request: tuple[str, str, dict[str, Any]], response: Response
) -> None:
request_id = id(request)
self._responses_by_request_id[request_id] = response
self._exceptions_by_request_id.pop(request_id, None)
return None
Comment thread
rbell517 marked this conversation as resolved.

def after_exception(
self,
request: tuple[str, str, dict[str, Any]],
exc_type: type[BaseException],
exc_val: BaseException,
exc_tb: Any,
) -> None:
request_id = id(request)
self._exceptions_by_request_id[request_id] = (exc_type, exc_val, exc_tb)
self._responses_by_request_id.pop(request_id, None)
return None


class _RetryableMultipartCleanupTemplate(RequestTemplate):
def __init__(self, retry_template: _RetryableMultipartRequestTemplate) -> None:
self._retry_template = retry_template

def after_response(
self, request: tuple[str, str, dict[str, Any]], response: Response
) -> None:
self._retry_template._clear_request_state(id(request))
return None

def after_exception(
self,
request: tuple[str, str, dict[str, Any]],
exc_type: type[BaseException],
exc_val: BaseException,
exc_tb: Any,
) -> None:
self._retry_template._clear_request_state(id(request))
return None
Comment thread
rbell517 marked this conversation as resolved.


class _RetryableMultipartRequest(decorators.MethodAnnotation):
def modify_request(self, request_builder: Any) -> None:
retryable_template = _RetryableMultipartRequestTemplate()
# Insert ahead of Uplink's retry template so this helper can see the
# original retry-triggering response/exception and short-circuit future
# attempts when a multipart stream cannot be rewound safely.
request_builder._request_templates.insert(0, retryable_template)
request_builder._request_templates.append(
_RetryableMultipartCleanupTemplate(retryable_template)
)


def retryable_multipart_request() -> Callable[[F], F]:
"""Create a decorator for multipart requests with retry-safe stream handling.

Behavior:
- The initial send preserves the caller-provided stream position.
- Retry attempts rewind seekable multipart payloads back to offset 0.
- Multipart fields that do not contain streams, such as simple strings, are
left alone and do not block retries.
- If a retry attempt cannot rewind a payload, the retry is cancelled and the
original retry-triggering response or exception is surfaced.
"""

def decorator(func: F) -> F:
return _RetryableMultipartRequest()(func) # type: ignore[return-value]

return decorator
4 changes: 4 additions & 0 deletions nisystemlink/clients/feeds/_feeds_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
from nisystemlink.clients import core
from nisystemlink.clients.core._uplink._base_client import BaseClient
from nisystemlink.clients.core._uplink._methods import delete, get, post
from nisystemlink.clients.core._uplink._multipart_retry import (
retryable_multipart_request,
)
from uplink import Part, Path, Query, retry

from . import models
Expand Down Expand Up @@ -90,6 +93,7 @@ def query_feeds(

return response

@retryable_multipart_request()
@post(
"feeds/{feedId}/packages",
args=[Path(name="feedId"), Part(), Query(name="shouldOverwrite")],
Expand Down
5 changes: 5 additions & 0 deletions nisystemlink/clients/file/_file_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
post,
response_handler,
)
from nisystemlink.clients.core._uplink._multipart_retry import (
retryable_multipart_request,
)
from nisystemlink.clients.core.helpers import IteratorFileLike
from requests.models import Response
from uplink import Body, Field, params, Part, Path, Query, retry
Expand Down Expand Up @@ -331,6 +334,7 @@ def download_file(self, id: str) -> IteratorFileLike:
"""

@response_handler(_file_uri_response_handler)
@retryable_multipart_request()
@post("service-groups/Default/upload-files")
def __upload_file(
self,
Expand Down Expand Up @@ -433,6 +437,7 @@ def start_upload_session(
],
)
@response_handler(lambda response: None)
@retryable_multipart_request()
def append_to_upload_session(
self,
session_id: str,
Expand Down
31 changes: 21 additions & 10 deletions nisystemlink/clients/notebook/_notebook_client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import io
from typing import List
from typing import BinaryIO, List

from nisystemlink.clients import core
from nisystemlink.clients.core._api_error import ApiError
Expand All @@ -14,6 +14,9 @@
put,
response_handler,
)
from nisystemlink.clients.core._uplink._multipart_retry import (
retryable_multipart_request,
)
from nisystemlink.clients.core.helpers._iterator_file_like import IteratorFileLike
from uplink import Part, Path, retry

Expand Down Expand Up @@ -54,12 +57,16 @@ def get_notebook(self, id: str) -> models.NotebookMetadata:
"""
...

@put("ninotebook/v1/notebook/{id}")
@retryable_multipart_request()
@put(
"ninotebook/v1/notebook/{id}",
args=[Path("id"), Part("metadata"), Part("content")],
)
def __update_notebook(
self,
id: Path,
metadata: Part = None,
content: Part = None,
id: str,
metadata: io.BytesIO | None = None,
content: BinaryIO | None = None,
) -> models.NotebookMetadata:
"""Updates a notebook metadata by ID.

Expand All @@ -81,7 +88,7 @@ def update_notebook(
self,
id: str,
metadata: models.NotebookMetadata | None = None,
content: io.BufferedReader | None = None,
content: BinaryIO | None = None,
) -> models.NotebookMetadata:
"""Updates a notebook metadata by ID.

Expand Down Expand Up @@ -121,11 +128,15 @@ def delete_notebook(self, id: str) -> None:
"""
...

@post("ninotebook/v1/notebook")
@retryable_multipart_request()
@post(
"ninotebook/v1/notebook",
args=[Part("metadata"), Part("content")],
)
def __create_notebook(
self,
metadata: Part,
content: Part,
metadata: io.BytesIO,
content: BinaryIO,
) -> models.NotebookMetadata:
"""Creates a new notebook.

Expand All @@ -145,7 +156,7 @@ def __create_notebook(
def create_notebook(
self,
metadata: models.NotebookMetadata,
content: io.BufferedReader,
content: BinaryIO,
) -> models.NotebookMetadata:
"""Creates a new notebook.

Expand Down
Loading
Loading