Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import aaanalysis.utils as ut
from ._progress import (
_resolve_shared,
_worker_shared,
_reset_progress,
_cleanup_mp_manager,
)
Expand Down Expand Up @@ -244,10 +245,15 @@ def _mp_scale_assignment(scales_chunk, shared_max_progress, shared_value_lock, p
prefer_multiprocessing=False,
)

# Manager proxies pickle to loky workers unchanged; the thread-safe fallback
# (Manager unavailable) is passed as None so each worker uses its own defaults.
w_max_progress, w_value_lock, w_print_lock = _worker_shared(
shared_max_progress, shared_value_lock, print_lock
)
scale_chunks = np.array_split(list(dict_all_scales.keys()), n_jobs)
with Parallel(n_jobs=n_jobs) as parallel:
results = parallel(
delayed(_mp_scale_assignment)(chunk, shared_max_progress, shared_value_lock, print_lock)
delayed(_mp_scale_assignment)(chunk, w_max_progress, w_value_lock, w_print_lock)
for chunk in scale_chunks
)

Expand Down
81 changes: 77 additions & 4 deletions aaanalysis/feature_engineering/_backend/cpp/_filters/_progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,16 @@
"An attempt has been made to start a new process before the current process has finished..."
- Create multiprocessing shared objects lazily (only when needed) and only from the main process.
- Always allow passing shared_* objects explicitly to support true cross-process progress updates.

Graceful-degradation contract:
- The multiprocessing.Manager only backs a *cosmetic* cross-process progress bar.
In some non-interactive contexts (``python -c``, heredocs, certain subprocess
shells) the Manager's spawn/pipe handshake fails with EOFError / OSError.
Manager creation is therefore best-effort: on failure ``_get_mp_shared()``
returns None (leaving all module globals untouched) and ``_resolve_shared()``
falls back to the thread-safe ``DEFAULT_SHARED_*`` objects, so the run still
completes single-process instead of aborting. When the Manager succeeds,
behavior is unchanged.
"""
import threading
import multiprocessing as mp
Expand Down Expand Up @@ -50,9 +60,22 @@ def _get_mp_shared():
"""
Lazily create a multiprocessing.Manager + shared objects.

Returns a ``(shared_max_progress, shared_value_lock, print_lock)`` triple, or
None when Manager-backed shared state is unavailable. None is returned when:
- called from a spawned worker (must never start a Manager there), or
- the Manager could not be created (best-effort; see below).

IMPORTANT:
- Must only be called from the main process.
- Never called at import time.

Best-effort Manager creation: a ``multiprocessing.Manager`` spawns a helper
process and talks to it over a pipe. In some non-interactive contexts
(``python -c``, heredocs, certain subprocess shells) that handshake fails with
EOFError / OSError. Since the Manager only backs a cosmetic cross-process
progress bar, such a failure is non-fatal: we return None (leaving every module
global untouched and the refcount unbumped) and the caller degrades to the
thread-safe ``DEFAULT_SHARED_*`` objects via ``_resolve_shared()``.
"""
global _MP_MANAGER, _MP_SHARED_MAX_PROGRESS, _MP_SHARED_VALUE_LOCK, _MP_PRINT_LOCK, _MP_MANAGER_REFCOUNT

Expand All @@ -61,10 +84,31 @@ def _get_mp_shared():
return None

if _MP_MANAGER is None:
_MP_MANAGER = mp.Manager()
_MP_SHARED_MAX_PROGRESS = _MP_MANAGER.Value("d", 0.0)
_MP_SHARED_VALUE_LOCK = _MP_MANAGER.Lock()
_MP_PRINT_LOCK = _MP_MANAGER.Lock()
# Build into locals first, commit to globals only on full success, so a
# partial failure never leaves half-initialized globals or a bumped
# refcount. The broad ``except`` is intentional and scoped to just this
# creation block: the Manager only powers a cosmetic progress bar, so any
# spawn/handshake failure (EOFError / OSError and friends) must degrade to
# the thread-safe defaults instead of aborting the run.
manager = None
try:
manager = mp.Manager()
shared_max_progress = manager.Value("d", 0.0)
shared_value_lock = manager.Lock()
print_lock = manager.Lock()
except Exception:
# Shut down a half-started Manager process (if any) so it does not leak,
# then signal "no shared state" -> caller uses DEFAULT_SHARED_*.
if manager is not None:
try:
manager.shutdown()
except Exception:
pass
return None
_MP_MANAGER = manager
_MP_SHARED_MAX_PROGRESS = shared_max_progress
_MP_SHARED_VALUE_LOCK = shared_value_lock
_MP_PRINT_LOCK = print_lock
_MP_MANAGER_REFCOUNT = 0

_MP_MANAGER_REFCOUNT += 1
Expand Down Expand Up @@ -119,6 +163,35 @@ def _resolve_shared(shared_max_progress=None, shared_value_lock=None, print_lock
return DEFAULT_SHARED_MAX_PROGRESS, DEFAULT_SHARED_VALUE_LOCK, DEFAULT_PRINT_LOCK


def _worker_shared(shared_max_progress, shared_value_lock, print_lock):
"""
Adapt resolved progress objects for handing to spawned *process* workers.

joblib's process backends (loky) pickle whatever is captured in each
``delayed(...)`` call. Manager-backed proxies pickle fine and stay shared
across processes, so they are returned unchanged (unified progress bar,
byte-identical to the Manager-available path). The thread-safe
``DEFAULT_SHARED_*`` fallbacks (used when the Manager could not be created;
see ``_get_mp_shared``) are NOT picklable to spawned processes: a
``threading.Lock`` cannot cross the process boundary. In that degraded case
return ``(None, None, None)`` so each worker self-resolves its own
process-local defaults instead of aborting the whole run with a
``PicklingError``. Progress then reports per-worker (cosmetic degradation),
but the computation still completes.

Only needed at process-backed dispatch sites; threading-backed parallelism
shares the objects in-process and does not pickle them.
"""
is_thread_default = (
shared_max_progress is DEFAULT_SHARED_MAX_PROGRESS
or shared_value_lock is DEFAULT_SHARED_VALUE_LOCK
or print_lock is DEFAULT_PRINT_LOCK
)
if is_thread_default:
return None, None, None
return shared_max_progress, shared_value_lock, print_lock


def _reset_progress(shared_max_progress, shared_value_lock):
"""Reset shared progress to 0 in a safe way."""
with shared_value_lock:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from .._split import SplitRange
from ._progress import (
_resolve_shared,
_worker_shared,
_reset_progress,
_cleanup_mp_manager,
)
Expand Down Expand Up @@ -337,10 +338,15 @@ def _mp_pre_filtering_info(scale_idx_chunk, shared_max_progress, shared_value_lo
prefer_multiprocessing=False,
)

# Manager proxies pickle to loky workers unchanged; the thread-safe fallback
# (Manager unavailable) is passed as None so each worker uses its own defaults.
w_max_progress, w_value_lock, w_print_lock = _worker_shared(
shared_max_progress, shared_value_lock, print_lock
)
scale_idx_chunks = np.array_split(np.arange(len(list_scales)), n_jobs)
with Parallel(n_jobs=n_jobs) as parallel:
results = parallel(
delayed(_mp_pre_filtering_info)(chunk, shared_max_progress, shared_value_lock, print_lock)
delayed(_mp_pre_filtering_info)(chunk, w_max_progress, w_value_lock, w_print_lock)
for chunk in scale_idx_chunks
)

Expand Down
11 changes: 11 additions & 0 deletions docs/source/index/release_notes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,17 @@ Changed
full-path import such as ``from aaanalysis.protein_design import SeqMut`` must become
``from aaanalysis.protein_engineering import SeqMut``.

Fixed
~~~~~

- :meth:`~aaanalysis.CPP.run` with ``n_jobs > 1`` no longer crashes in non-interactive
contexts (e.g. ``python -c``, heredocs, some subprocess shells) where starting a
``multiprocessing.Manager`` for the cross-process progress bar raised ``EOFError`` /
``OSError``. The Manager is now created best-effort: on failure CPP degrades to the
thread-safe, single-process progress path and the run completes normally instead of
aborting (previously the only workaround was ``n_jobs=1``). When the Manager is
available, behavior and output are unchanged.


Version 1.0 (Stable Version)
--------------------------------
Expand Down
193 changes: 193 additions & 0 deletions tests/unit/cpp_tests/test_progress_backend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
"""This is a script to test the shared-progress / multiprocessing backend of CPP
(``_backend/cpp/_filters/_progress.py``): the lazy Manager creation, its
graceful degradation to the thread-safe defaults when the Manager cannot be
spawned (issue #339 — EOFError in ``python -c`` / heredoc / subprocess shells),
and the resolve/cleanup helpers.

Driven directly at the backend because the Manager-failure arm is not reachable
from the CPP frontend (it depends on the interpreter's process-launch context).
"""
import multiprocessing as mp

import pytest

import aaanalysis as aa
import aaanalysis.feature_engineering._backend.cpp._filters._progress as prog

aa.options["verbose"] = False


# I Helper Functions
def _reset_module_globals():
"""Restore the module's lazy-Manager globals to their pristine (import) state."""
# Best-effort shutdown of any live Manager left by a previous test.
if prog._MP_MANAGER is not None:
try:
prog._MP_MANAGER.shutdown()
except Exception:
pass
prog._MP_MANAGER = None
prog._MP_SHARED_MAX_PROGRESS = None
prog._MP_SHARED_VALUE_LOCK = None
prog._MP_PRINT_LOCK = None
prog._MP_MANAGER_REFCOUNT = 0


@pytest.fixture(autouse=True)
def _clean_progress_globals():
"""Ensure each test starts and ends with pristine module globals."""
_reset_module_globals()
yield
_reset_module_globals()


# II Test Classes
class TestGetMpShared:
"""The lazy Manager creation and its graceful-degradation arm (issue #339)."""

def test_happy_path_returns_triple(self):
# When the Manager can be created, a 3-tuple of shared objects is returned
# and the module globals are populated (byte-identical to previous behavior).
result = prog._get_mp_shared()
assert isinstance(result, tuple)
assert len(result) == 3
shared_max_progress, shared_value_lock, print_lock = result
assert shared_max_progress is not None
assert prog._MP_MANAGER is not None
assert prog._MP_MANAGER_REFCOUNT == 1
# The returned triple mirrors the stored globals.
assert shared_max_progress is prog._MP_SHARED_MAX_PROGRESS
assert shared_value_lock is prog._MP_SHARED_VALUE_LOCK
assert print_lock is prog._MP_PRINT_LOCK
prog._cleanup_mp_manager()

def test_manager_eoferror_returns_none(self, monkeypatch):
# Simulate the issue-#339 failure: Manager() raises EOFError. The function
# must return None WITHOUT raising.
def _raise_eof(*args, **kwargs):
raise EOFError("simulated Manager pipe EOF")

monkeypatch.setattr(prog.mp, "Manager", _raise_eof)
assert prog._get_mp_shared() is None

def test_manager_oserror_returns_none(self, monkeypatch):
# The other realistic failure class (OSError) also degrades, not aborts.
def _raise_os(*args, **kwargs):
raise OSError("simulated Manager spawn failure")

monkeypatch.setattr(prog.mp, "Manager", _raise_os)
assert prog._get_mp_shared() is None

def test_failure_leaves_globals_clean(self, monkeypatch):
# A failed attempt must not partially-initialize globals or bump the refcount.
def _raise_eof(*args, **kwargs):
raise EOFError("simulated Manager pipe EOF")

monkeypatch.setattr(prog.mp, "Manager", _raise_eof)
prog._get_mp_shared()
assert prog._MP_MANAGER is None
assert prog._MP_SHARED_MAX_PROGRESS is None
assert prog._MP_SHARED_VALUE_LOCK is None
assert prog._MP_PRINT_LOCK is None
assert prog._MP_MANAGER_REFCOUNT == 0

def test_failure_then_cleanup_is_safe(self, monkeypatch):
# Cleanup after a never-created Manager must be a no-op (guards on None).
def _raise_eof(*args, **kwargs):
raise EOFError("simulated Manager pipe EOF")

monkeypatch.setattr(prog.mp, "Manager", _raise_eof)
assert prog._get_mp_shared() is None
# Must not raise even though no Manager was ever created.
prog._cleanup_mp_manager()
assert prog._MP_MANAGER is None

def test_partial_allocation_failure_returns_none(self, monkeypatch):
# Manager() succeeds but a later .Value/.Lock allocation fails: still None,
# globals stay clean, and the half-started Manager is shut down (no leak).
class _FakeManager:
def __init__(self):
self.shutdown_called = False

def Value(self, *args, **kwargs):
raise OSError("simulated allocation failure")

def Lock(self): # pragma: no cover - not reached (Value fails first)
raise AssertionError("Lock should not be reached")

def shutdown(self):
self.shutdown_called = True

created = {}

def _fake_manager_factory(*args, **kwargs):
m = _FakeManager()
created["m"] = m
return m

monkeypatch.setattr(prog.mp, "Manager", _fake_manager_factory)
assert prog._get_mp_shared() is None
assert prog._MP_MANAGER is None
assert prog._MP_MANAGER_REFCOUNT == 0
assert created["m"].shutdown_called is True


class TestResolveShared:
"""``_resolve_shared`` priority: explicit > multiprocessing > thread defaults."""

def test_explicit_objects_passthrough(self):
# Explicitly passed shared objects are returned unchanged.
smp, svl, pl = object(), object(), object()
out = prog._resolve_shared(shared_max_progress=smp, shared_value_lock=svl,
print_lock=pl, prefer_multiprocessing=True)
assert out == (smp, svl, pl)

def test_prefer_multiprocessing_uses_manager(self):
# prefer_multiprocessing=True in the main process returns the Manager triple.
out = prog._resolve_shared(prefer_multiprocessing=True)
assert isinstance(out, tuple) and len(out) == 3
assert out == (prog._MP_SHARED_MAX_PROGRESS, prog._MP_SHARED_VALUE_LOCK, prog._MP_PRINT_LOCK)
prog._cleanup_mp_manager()

def test_prefer_multiprocessing_falls_back_to_defaults_on_failure(self, monkeypatch):
# When the Manager cannot be created, _resolve_shared returns the
# thread-safe DEFAULT_SHARED_* triple (the core degradation contract).
def _raise_eof(*args, **kwargs):
raise EOFError("simulated Manager pipe EOF")

monkeypatch.setattr(prog.mp, "Manager", _raise_eof)
out = prog._resolve_shared(prefer_multiprocessing=True)
assert out == (prog.DEFAULT_SHARED_MAX_PROGRESS,
prog.DEFAULT_SHARED_VALUE_LOCK,
prog.DEFAULT_PRINT_LOCK)

def test_default_when_not_preferring_multiprocessing(self):
# Without prefer_multiprocessing, the thread-safe defaults are used.
out = prog._resolve_shared(prefer_multiprocessing=False)
assert out == (prog.DEFAULT_SHARED_MAX_PROGRESS,
prog.DEFAULT_SHARED_VALUE_LOCK,
prog.DEFAULT_PRINT_LOCK)


class TestParallelRunDegrades:
"""End-to-end: CPP.run(n_jobs=2) still completes when the Manager fails."""

def test_cpp_run_completes_when_manager_unavailable(self, monkeypatch):
# Simulate the non-notebook context: force the Manager to raise, then run
# the parallel CPP path. It must complete (degrading to thread-safe
# progress) instead of crashing with EOFError.
import warnings

def _raise_eof(*args, **kwargs):
raise EOFError("simulated Manager pipe EOF")

monkeypatch.setattr(prog.mp, "Manager", _raise_eof)
df_seq = aa.load_dataset(name="DOM_GSEC", n=8)
labels = df_seq["label"].to_list()
df_parts = aa.SequenceFeature().get_df_parts(df_seq=df_seq)
df_scales = aa.load_scales(top60_n=20).T.head(8).T
with warnings.catch_warnings():
warnings.simplefilter("ignore")
df_feat = aa.CPP(df_parts=df_parts, df_scales=df_scales,
verbose=True).run(labels=labels, n_filter=10, n_jobs=2)
assert len(df_feat) == 10
Loading