[feat] Add retransmission mechanism for MooncakeStoreClient#94

Open
0oshowero0 wants to merge 3 commits into
Ascend:mainfrom
0oshowero0:add_retry_for_mooncake

Conversation

@0oshowero0 (Collaborator) commented May 11, 2026

Background

When using MooncakeStore as TQ's backend, we observe occasional transmission errors during verl e2e runs:

E0508 17:18:06.011560 731271 tcp_transport.cpp:708] TcpTransport::getConnection failed to create connection to 61.28.30.25:16181. Error: connect: Connection timed out
E0508 17:18:06.011600 731277 tcp_transport.cpp:886] TcpTransport::startTransfer failed to get connection to 61.28.30.25:15816
E0508 17:18:06.011888 731271 transfer_task.cpp:281] Batch 281200032997056 completed with task failures: task_ids=[0]
E0508 17:18:06.011895 731271 client_service.cpp:1100] Transfer failed for key: 68108@uid with error: -800
E0508 17:18:06.011996 731271 real_client.cpp:2253] BatchGet failed for key '68108@uid': TRANSFER_FAIL

These Connection timed out / TRANSFER_FAIL (error: -800) errors are transient network issues that typically resolve on a subsequent attempt. However, the previous client implementation had no retry logic whatsoever:

  • On the tensor path, any single key returning a negative status code would trigger an immediate RuntimeError, failing the entire batch and crashing the training job.
  • On the bytes path, the failure was far worse: get_batch returns b"" for keys that encountered a transfer failure, and the client blindly passed these empty bytes through pickle.loads(... if result != b"" else None), treating them as legitimate None values.

This led to silent data corruption: a training worker could proceed with corrupted or missing data without ever knowing that a transmission failure had occurred, compromising model correctness.
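The bytes-path ambiguity can be reproduced in isolation. A minimal sketch (the `raw_results` values here are fabricated for illustration) showing how the old `pickle.loads(result) if result != b"" else None` pattern maps a failed transfer and a genuinely stored `None` to the same value:

```python
import pickle

# Hypothetical results from get_batch: one real value, one legitimately
# stored None, and one transfer failure that MooncakeStore reports as b"".
raw_results = [pickle.dumps({"loss": 0.5}), pickle.dumps(None), b""]

# The old deserialization treats the b"" failure sentinel exactly like a
# pickled None, so the caller cannot tell the two cases apart.
decoded = [pickle.loads(r) if r != b"" else None for r in raw_results]

assert decoded[1] is None and decoded[2] is None  # indistinguishable
```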

This PR addresses all failure modes on both the read (get) and write (put) paths by adding controlled retries that isolate failed keys and attempt retransmission before giving up.

Summary

This PR introduces a retry mechanism for transient failures in MooncakeStoreClient, covering both read (get) and write (put) operations, for both tensor and non-tensor data paths.

Previously, the client had zero tolerance for transient errors:

  • Tensor read (_get_tensors_thread_worker): a single key failure (ret < 0) would immediately raise RuntimeError, causing the entire batch to fail.
  • Non-tensor read (_get_bytes_thread_worker): no failure detection at all. Empty byte strings (b"") — which MooncakeStore returns on transmission failures — were silently deserialized as None, making it impossible for callers to distinguish between "value is None" and "transfer failed".
  • Tensor write (_put_tensors_thread_worker): any single key returning a non-zero status would immediately abort the entire batch with RuntimeError.
  • Non-tensor write (_put_bytes_thread_worker): a single upsert_batch failure would immediately abort the batch with RuntimeError.

This change adds up to 3 retries with 1-second backoff across all four paths. For paths that expose per-key status codes, only the failed subset of keys is retried on each attempt.
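The retry shape described above can be sketched as follows. This is an illustrative reduction, not the client's actual code: `store_get` and `is_failure` are hypothetical stand-ins for the backend call (`batch_get_into` / `get_batch`) and the per-result failure test (`ret < 0` or `result == b""`):

```python
import time

MAX_RETRIES = 3
RETRY_DELAY_SECONDS = 1.0

def get_with_retry(store_get, keys, is_failure, delay=RETRY_DELAY_SECONDS):
    """Fetch `keys`, then retry only the failed subset, up to MAX_RETRIES times.

    `store_get(keys)` returns one result per key; `is_failure(result)` decides
    whether that key must be retried.
    """
    results = dict(zip(keys, store_get(keys)))
    failed = [k for k in keys if is_failure(results[k])]
    for attempt in range(1, MAX_RETRIES + 1):
        if not failed:
            break
        time.sleep(delay)  # fixed 1-second backoff between attempts
        results.update(zip(failed, store_get(failed)))  # retry failed subset only
        failed = [k for k in failed if is_failure(results[k])]
    if failed:
        raise RuntimeError(f"get failed for keys {failed} after {MAX_RETRIES} retries")
    return [results[k] for k in keys]
```

A transient failure that clears on the second attempt is then absorbed without the caller noticing, while a persistent failure still surfaces as a `RuntimeError` instead of corrupt data.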

Future Work

  • Replace the b"" heuristic in _get_bytes_thread_worker with proper per-key error codes once MooncakeStore exposes them, then upgrade the exhausted-retry path from logger.error to raise RuntimeError.
  • When MooncakeStore supports per-key status codes for upsert_batch and get_batch, switch the bytes write/read paths from whole-batch retry to per-key selective retry, matching the tensor-path behaviour.
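If MooncakeStore eventually exposes per-key status codes for the bytes path, the selective retry could look like the sketch below. Everything here is hypothetical: `get_batch_with_status` is an assumed future API returning `(values, codes)` with a negative code per failed key, and backoff is omitted for brevity:

```python
def selective_bytes_retry(get_batch_with_status, keys, max_retries=3):
    """Retry only keys whose (hypothetical) per-key status code is negative."""
    values = [None] * len(keys)
    pending = list(range(len(keys)))  # indices still awaiting a good result
    for attempt in range(max_retries + 1):
        if not pending:
            break
        batch_values, codes = get_batch_with_status([keys[i] for i in pending])
        still_failed = []
        for j, code in enumerate(codes):
            if code < 0:
                still_failed.append(pending[j])
            else:
                values[pending[j]] = batch_values[j]
        pending = still_failed
    if pending:
        raise RuntimeError(f"transfer failed for keys {[keys[i] for i in pending]}")
    return values
```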

Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Copilot AI review requested due to automatic review settings May 11, 2026 09:25
@ascend-robot

CLA Signature Pass

0oshowero0, thanks for your pull request. All authors of the commits have signed the CLA. 👍

Copilot AI (Contributor) left a comment

Pull request overview

This PR adds retry logic to MooncakeStoreClient.get() to make transient MooncakeStore transfer failures less likely to crash training jobs or silently corrupt returned values.

Changes:

  • Added retry-with-backoff for the tensor retrieval path (batch_get_into) by retrying only the failed subset of keys.
  • Added retry-with-backoff for the non-tensor retrieval path (get_batch) by detecting failures via b"" and retrying only the failed subset.
  • Increased BATCH_SIZE_LIMIT from 200 to 400.


Comment on lines +37 to +40

```python
BATCH_SIZE_LIMIT: int = 400
MAX_WORKER_THREADS = 4
MAX_RETRIES = 3
RETRY_DELAY_SECONDS = 1.0
```
Comment on lines +259 to +275

```python
for attempt in range(1, MAX_RETRIES + 1):
    # Reuse the originally allocated pointers; no need to allocate/register new buffers.
    retry_ptrs = [batch_buffer_ptrs[i] for i in current_failed_indices]
    retry_nbytes = [batch_nbytes[i] for i in current_failed_indices]

    retry_codes = self._store.batch_get_into(current_failed_keys, retry_ptrs, retry_nbytes)

    next_failed_indices = []
    next_failed_keys = []
    next_failed_codes = []

    for i, ret in enumerate(retry_codes):
        if ret < 0:
            next_failed_indices.append(current_failed_indices[i])
            next_failed_keys.append(current_failed_keys[i])
            next_failed_codes.append(ret)
```
Comment on lines +313 to +327

```python
for attempt in range(1, MAX_RETRIES + 1):
    retry_results = self._store.get_batch(current_failed_keys)

    next_failed_keys = []
    next_failed_indices = []

    for i, result in enumerate(retry_results):
        original_idx = current_failed_indices[i]
        if result == b"":
            next_failed_keys.append(current_failed_keys[i])
            next_failed_indices.append(original_idx)
        else:
            # Write the successfully retried value back to its original slot immediately.
            raw_results[original_idx] = result
```
Comment on lines +337 to +346

```diff
 else:
     # All retries exhausted.
     # FIXME: raise error here when we can distinguish transmission failures from empty values
     logger.error(
         f"get_batch failed for keys {current_failed_keys} after retrying {MAX_RETRIES} times. "
         f"Please validate if the values corresponding to these keys are `None` during put."
     )

-return results, indexes
+deserialized_results = [pickle.loads(result) if result != b"" else None for result in raw_results]
+return deserialized_results, indexes
```
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
@ascend-robot: CLA Signature Pass

Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
@0oshowero0 0oshowero0 requested a review from Copilot May 12, 2026 02:14
@ascend-robot: CLA Signature Pass

Copilot AI (Contributor) left a comment

Pull request overview

Copilot reviewed 1 out of 1 changed files in this pull request and generated 5 comments.


```python
retry_results = self._store.batch_upsert_from(
    current_failed_keys, retry_ptrs, retry_sizes, config=self.replica_config
)
```

```python
retry_ptrs = [batch_buffer_ptrs[i] for i in current_failed_indices]
retry_nbytes = [batch_nbytes[i] for i in current_failed_indices]

retry_codes = self._store.batch_get_into(current_failed_keys, retry_ptrs, retry_nbytes)
```
Comment on lines +392 to +400

```python
for attempt in range(1, MAX_RETRIES + 1):
    retry_results = self._store.get_batch(current_failed_keys)

    next_failed_keys = []
    next_failed_indices = []

    for i, result in enumerate(retry_results):
        original_idx = current_failed_indices[i]
        if result == b"":
```
Comment on lines +37 to +40

```python
BATCH_SIZE_LIMIT: int = 400
MAX_WORKER_THREADS = 4
MAX_RETRIES = 3
RETRY_DELAY_SECONDS = 1.0
```
Comment on lines +419 to +423

```python
else:
    # All retries exhausted.
    raise RuntimeError(
        f"get_batch failed for keys {current_failed_keys} after retrying {MAX_RETRIES} times."
    )
```
@ji-huazhong (Collaborator) commented:

Perhaps we need to add a debug mode to verify the consistency of data transmission and reception? WDYT?

@0oshowero0 (Collaborator, Author) replied:

I believe that should be handled by the communication backend.
