-
Notifications
You must be signed in to change notification settings - Fork 0
Span Buffer Multiprocess Enhancement with Health Monitoring #10
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
akshayutture-augment
wants to merge
1
commit into
span-flusher-stable
Choose a base branch
from
span-flusher-multiprocess
base: span-flusher-stable
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
Health monitoring and restart logic: watch process type check and thread-mode restarts
The per-process health logic is a good upgrade, but a couple of details are worth revisiting:
Process type check before
kill()Here
processinstances are created viaself.mp_context.Process(...), whose type ismultiprocessing.context.SpawnProcess. Relying onisinstance(process, multiprocessing.Process)may or may not match depending on the exact class hierarchy and could silently skip the kill step in some environments.To make this robust and independent of the concrete multiprocessing implementation, consider checking capabilities instead of type, e.g.:
while explicitly excluding threads if you want:
Thread mode (
produce_to_pipe) restart behaviorIn thread mode,
processis athreading.Thread, so thekill()branch is skipped. If a thread is considered “hung” (health exceeds threshold),_create_process_for_shardswill start an additional thread for the same shard set without stopping the old one. That’s acceptable in tests but could lead to multiple flusher threads per shard ifproduce_to_pipeis ever used outside unit tests.If you expect thread mode beyond tests, it would be safer to special-case threads (e.g., don’t auto-restart on hang, or track/join and only create one replacement).
Given the current docstring says
produce_to_pipeis for unit testing, treating these as future-proofing rather than critical bugs seems reasonable.🏁 Script executed:
Length of output: 1908
🏁 Script executed:
Length of output: 4162
Fix broken
isinstancecheck for process type and implement proper thread cleanupYour concerns are confirmed by the code. Both issues are real:
Line 249:
isinstance(process, multiprocessing.Process)fails for SpawnProcessThe Python test confirms that
multiprocessing.get_context("spawn").Process(...)returnsSpawnProcess, which is not an instance ofmultiprocessing.Process. This meansprocess.kill()at line 253 is never executed. Old multiprocessing processes become orphans and accumulate as zombie processes.Fix: Use
hasattr(process, "kill")instead:Line 124 assigns new thread without stopping old one
In thread mode,
_create_process_for_shardscreates a new thread and overwrites the dict entry at line 124 without stopping the old thread. Both threads continue running for the same shard set, creating duplicates.Fix: For thread mode, either (a) join the old thread before creating a new one, or (b) prevent auto-restart on hang for threads, or (c) explicitly exclude threads from kill logic and don't auto-restart them.
🧰 Tools
🪛 Ruff (0.14.5)
248-250: Avoid specifying long messages outside the exception class
(TRY003)