diff --git a/aaanalysis/feature_engineering/_backend/cpp/_filters/_assign.py b/aaanalysis/feature_engineering/_backend/cpp/_filters/_assign.py index d777d22b..6fc3d6b2 100644 --- a/aaanalysis/feature_engineering/_backend/cpp/_filters/_assign.py +++ b/aaanalysis/feature_engineering/_backend/cpp/_filters/_assign.py @@ -25,6 +25,7 @@ import aaanalysis.utils as ut from ._progress import ( _resolve_shared, + _worker_shared, _reset_progress, _cleanup_mp_manager, ) @@ -244,10 +245,15 @@ def _mp_scale_assignment(scales_chunk, shared_max_progress, shared_value_lock, p prefer_multiprocessing=False, ) + # Manager proxies pickle to loky workers unchanged; the thread-safe fallback + # (Manager unavailable) is passed as None so each worker uses its own defaults. + w_max_progress, w_value_lock, w_print_lock = _worker_shared( + shared_max_progress, shared_value_lock, print_lock + ) scale_chunks = np.array_split(list(dict_all_scales.keys()), n_jobs) with Parallel(n_jobs=n_jobs) as parallel: results = parallel( - delayed(_mp_scale_assignment)(chunk, shared_max_progress, shared_value_lock, print_lock) + delayed(_mp_scale_assignment)(chunk, w_max_progress, w_value_lock, w_print_lock) for chunk in scale_chunks ) diff --git a/aaanalysis/feature_engineering/_backend/cpp/_filters/_progress.py b/aaanalysis/feature_engineering/_backend/cpp/_filters/_progress.py index 764fa421..832b1fc9 100644 --- a/aaanalysis/feature_engineering/_backend/cpp/_filters/_progress.py +++ b/aaanalysis/feature_engineering/_backend/cpp/_filters/_progress.py @@ -12,6 +12,16 @@ "An attempt has been made to start a new process before the current process has finished..." - Create multiprocessing shared objects lazily (only when needed) and only from the main process. - Always allow passing shared_* objects explicitly to support true cross-process progress updates. + +Graceful-degradation contract: +- The multiprocessing.Manager only backs a *cosmetic* cross-process progress bar. + In some non-interactive contexts (``python -c``, heredocs, certain subprocess + shells) the Manager's spawn/pipe handshake fails with EOFError / OSError. + Manager creation is therefore best-effort: on failure ``_get_mp_shared()`` + returns None (leaving all module globals untouched) and ``_resolve_shared()`` + falls back to the thread-safe ``DEFAULT_SHARED_*`` objects, so the run still + completes single-process instead of aborting. When the Manager succeeds, + behavior is unchanged. """ import threading import multiprocessing as mp @@ -50,9 +60,22 @@ def _get_mp_shared(): """ Lazily create a multiprocessing.Manager + shared objects. + Returns a ``(shared_max_progress, shared_value_lock, print_lock)`` triple, or + None when Manager-backed shared state is unavailable. None is returned when: + - called from a spawned worker (must never start a Manager there), or + - the Manager could not be created (best-effort; see below). + IMPORTANT: - Must only be called from the main process. - Never called at import time. + + Best-effort Manager creation: a ``multiprocessing.Manager`` spawns a helper + process and talks to it over a pipe. In some non-interactive contexts + (``python -c``, heredocs, certain subprocess shells) that handshake fails with + EOFError / OSError. Since the Manager only backs a cosmetic cross-process + progress bar, such a failure is non-fatal: we return None (leaving every module + global untouched and the refcount unbumped) and the caller degrades to the + thread-safe ``DEFAULT_SHARED_*`` objects via ``_resolve_shared()``. """ global _MP_MANAGER, _MP_SHARED_MAX_PROGRESS, _MP_SHARED_VALUE_LOCK, _MP_PRINT_LOCK, _MP_MANAGER_REFCOUNT @@ -61,10 +84,31 @@ def _get_mp_shared(): return None if _MP_MANAGER is None: - _MP_MANAGER = mp.Manager() - _MP_SHARED_MAX_PROGRESS = _MP_MANAGER.Value("d", 0.0) - _MP_SHARED_VALUE_LOCK = _MP_MANAGER.Lock() - _MP_PRINT_LOCK = _MP_MANAGER.Lock() + # Build into locals first, commit to globals only on full success, so a + # partial failure never leaves half-initialized globals or a bumped + # refcount. The broad ``except`` is intentional and scoped to just this + # creation block: the Manager only powers a cosmetic progress bar, so any + # spawn/handshake failure (EOFError / OSError and friends) must degrade to + # the thread-safe defaults instead of aborting the run. + manager = None + try: + manager = mp.Manager() + shared_max_progress = manager.Value("d", 0.0) + shared_value_lock = manager.Lock() + print_lock = manager.Lock() + except Exception: + # Shut down a half-started Manager process (if any) so it does not leak, + # then signal "no shared state" -> caller uses DEFAULT_SHARED_*. + if manager is not None: + try: + manager.shutdown() + except Exception: + pass + return None + _MP_MANAGER = manager + _MP_SHARED_MAX_PROGRESS = shared_max_progress + _MP_SHARED_VALUE_LOCK = shared_value_lock + _MP_PRINT_LOCK = print_lock _MP_MANAGER_REFCOUNT = 0 _MP_MANAGER_REFCOUNT += 1 @@ -119,6 +163,35 @@ def _resolve_shared(shared_max_progress=None, shared_value_lock=None, print_lock return DEFAULT_SHARED_MAX_PROGRESS, DEFAULT_SHARED_VALUE_LOCK, DEFAULT_PRINT_LOCK +def _worker_shared(shared_max_progress, shared_value_lock, print_lock): + """ + Adapt resolved progress objects for handing to spawned *process* workers. + + joblib's process backends (loky) pickle whatever is captured in each + ``delayed(...)`` call. Manager-backed proxies pickle fine and stay shared + across processes, so they are returned unchanged (unified progress bar, + byte-identical to the Manager-available path). The thread-safe + ``DEFAULT_SHARED_*`` fallbacks (used when the Manager could not be created; + see ``_get_mp_shared``) are NOT picklable to spawned processes: a + ``threading.Lock`` cannot cross the process boundary. In that degraded case + return ``(None, None, None)`` so each worker self-resolves its own + process-local defaults instead of aborting the whole run with a + ``PicklingError``. Progress then reports per-worker (cosmetic degradation), + but the computation still completes. + + Only needed at process-backed dispatch sites; threading-backed parallelism + shares the objects in-process and does not pickle them. + """ + is_thread_default = ( + shared_max_progress is DEFAULT_SHARED_MAX_PROGRESS + or shared_value_lock is DEFAULT_SHARED_VALUE_LOCK + or print_lock is DEFAULT_PRINT_LOCK + ) + if is_thread_default: + return None, None, None + return shared_max_progress, shared_value_lock, print_lock + + def _reset_progress(shared_max_progress, shared_value_lock): """Reset shared progress to 0 in a safe way.""" with shared_value_lock: diff --git a/aaanalysis/feature_engineering/_backend/cpp/_filters/_stat_filter.py b/aaanalysis/feature_engineering/_backend/cpp/_filters/_stat_filter.py index a41bece3..d3185f4b 100644 --- a/aaanalysis/feature_engineering/_backend/cpp/_filters/_stat_filter.py +++ b/aaanalysis/feature_engineering/_backend/cpp/_filters/_stat_filter.py @@ -19,6 +19,7 @@ from .._split import SplitRange from ._progress import ( _resolve_shared, + _worker_shared, _reset_progress, _cleanup_mp_manager, ) @@ -337,10 +338,15 @@ def _mp_pre_filtering_info(scale_idx_chunk, shared_max_progress, shared_value_lo prefer_multiprocessing=False, ) + # Manager proxies pickle to loky workers unchanged; the thread-safe fallback + # (Manager unavailable) is passed as None so each worker uses its own defaults. + w_max_progress, w_value_lock, w_print_lock = _worker_shared( + shared_max_progress, shared_value_lock, print_lock + ) scale_idx_chunks = np.array_split(np.arange(len(list_scales)), n_jobs) with Parallel(n_jobs=n_jobs) as parallel: results = parallel( - delayed(_mp_pre_filtering_info)(chunk, shared_max_progress, shared_value_lock, print_lock) + delayed(_mp_pre_filtering_info)(chunk, w_max_progress, w_value_lock, w_print_lock) for chunk in scale_idx_chunks ) diff --git a/docs/source/index/release_notes.rst b/docs/source/index/release_notes.rst index ed7d1583..1350a4fd 100644 --- a/docs/source/index/release_notes.rst +++ b/docs/source/index/release_notes.rst @@ -363,6 +363,17 @@ Changed full-path import such as ``from aaanalysis.protein_design import SeqMut`` must become ``from aaanalysis.protein_engineering import SeqMut``. +Fixed +~~~~~ + +- :meth:`~aaanalysis.CPP.run` with ``n_jobs > 1`` no longer crashes in non-interactive + contexts (e.g. ``python -c``, heredocs, some subprocess shells) where starting a + ``multiprocessing.Manager`` for the cross-process progress bar raised ``EOFError`` / + ``OSError``. The Manager is now created best-effort: on failure CPP degrades to the + thread-safe, single-process progress path and the run completes normally instead of + aborting (previously the only workaround was ``n_jobs=1``). When the Manager is + available, behavior and output are unchanged. + Version 1.0 (Stable Version) -------------------------------- diff --git a/tests/unit/cpp_tests/test_progress_backend.py b/tests/unit/cpp_tests/test_progress_backend.py new file mode 100644 index 00000000..7336f89d --- /dev/null +++ b/tests/unit/cpp_tests/test_progress_backend.py @@ -0,0 +1,193 @@ +"""This is a script to test the shared-progress / multiprocessing backend of CPP +(``_backend/cpp/_filters/_progress.py``): the lazy Manager creation, its +graceful degradation to the thread-safe defaults when the Manager cannot be +spawned (issue #339 — EOFError in ``python -c`` / heredoc / subprocess shells), +and the resolve/cleanup helpers. + +Driven directly at the backend because the Manager-failure arm is not reachable +from the CPP frontend (it depends on the interpreter's process-launch context). +""" +import multiprocessing as mp + +import pytest + +import aaanalysis as aa +import aaanalysis.feature_engineering._backend.cpp._filters._progress as prog + +aa.options["verbose"] = False + + +# I Helper Functions +def _reset_module_globals(): + """Restore the module's lazy-Manager globals to their pristine (import) state.""" + # Best-effort shutdown of any live Manager left by a previous test. + if prog._MP_MANAGER is not None: + try: + prog._MP_MANAGER.shutdown() + except Exception: + pass + prog._MP_MANAGER = None + prog._MP_SHARED_MAX_PROGRESS = None + prog._MP_SHARED_VALUE_LOCK = None + prog._MP_PRINT_LOCK = None + prog._MP_MANAGER_REFCOUNT = 0 + + +@pytest.fixture(autouse=True) +def _clean_progress_globals(): + """Ensure each test starts and ends with pristine module globals.""" + _reset_module_globals() + yield + _reset_module_globals() + + +# II Test Classes +class TestGetMpShared: + """The lazy Manager creation and its graceful-degradation arm (issue #339).""" + + def test_happy_path_returns_triple(self): + # When the Manager can be created, a 3-tuple of shared objects is returned + # and the module globals are populated (byte-identical to previous behavior). + result = prog._get_mp_shared() + assert isinstance(result, tuple) + assert len(result) == 3 + shared_max_progress, shared_value_lock, print_lock = result + assert shared_max_progress is not None + assert prog._MP_MANAGER is not None + assert prog._MP_MANAGER_REFCOUNT == 1 + # The returned triple mirrors the stored globals. + assert shared_max_progress is prog._MP_SHARED_MAX_PROGRESS + assert shared_value_lock is prog._MP_SHARED_VALUE_LOCK + assert print_lock is prog._MP_PRINT_LOCK + prog._cleanup_mp_manager() + + def test_manager_eoferror_returns_none(self, monkeypatch): + # Simulate the issue-#339 failure: Manager() raises EOFError. The function + # must return None WITHOUT raising. + def _raise_eof(*args, **kwargs): + raise EOFError("simulated Manager pipe EOF") + + monkeypatch.setattr(prog.mp, "Manager", _raise_eof) + assert prog._get_mp_shared() is None + + def test_manager_oserror_returns_none(self, monkeypatch): + # The other realistic failure class (OSError) also degrades, not aborts. + def _raise_os(*args, **kwargs): + raise OSError("simulated Manager spawn failure") + + monkeypatch.setattr(prog.mp, "Manager", _raise_os) + assert prog._get_mp_shared() is None + + def test_failure_leaves_globals_clean(self, monkeypatch): + # A failed attempt must not partially-initialize globals or bump the refcount. + def _raise_eof(*args, **kwargs): + raise EOFError("simulated Manager pipe EOF") + + monkeypatch.setattr(prog.mp, "Manager", _raise_eof) + prog._get_mp_shared() + assert prog._MP_MANAGER is None + assert prog._MP_SHARED_MAX_PROGRESS is None + assert prog._MP_SHARED_VALUE_LOCK is None + assert prog._MP_PRINT_LOCK is None + assert prog._MP_MANAGER_REFCOUNT == 0 + + def test_failure_then_cleanup_is_safe(self, monkeypatch): + # Cleanup after a never-created Manager must be a no-op (guards on None). + def _raise_eof(*args, **kwargs): + raise EOFError("simulated Manager pipe EOF") + + monkeypatch.setattr(prog.mp, "Manager", _raise_eof) + assert prog._get_mp_shared() is None + # Must not raise even though no Manager was ever created. + prog._cleanup_mp_manager() + assert prog._MP_MANAGER is None + + def test_partial_allocation_failure_returns_none(self, monkeypatch): + # Manager() succeeds but a later .Value/.Lock allocation fails: still None, + # globals stay clean, and the half-started Manager is shut down (no leak). + class _FakeManager: + def __init__(self): + self.shutdown_called = False + + def Value(self, *args, **kwargs): + raise OSError("simulated allocation failure") + + def Lock(self): # pragma: no cover - not reached (Value fails first) + raise AssertionError("Lock should not be reached") + + def shutdown(self): + self.shutdown_called = True + + created = {} + + def _fake_manager_factory(*args, **kwargs): + m = _FakeManager() + created["m"] = m + return m + + monkeypatch.setattr(prog.mp, "Manager", _fake_manager_factory) + assert prog._get_mp_shared() is None + assert prog._MP_MANAGER is None + assert prog._MP_MANAGER_REFCOUNT == 0 + assert created["m"].shutdown_called is True + + +class TestResolveShared: + """``_resolve_shared`` priority: explicit > multiprocessing > thread defaults.""" + + def test_explicit_objects_passthrough(self): + # Explicitly passed shared objects are returned unchanged. + smp, svl, pl = object(), object(), object() + out = prog._resolve_shared(shared_max_progress=smp, shared_value_lock=svl, + print_lock=pl, prefer_multiprocessing=True) + assert out == (smp, svl, pl) + + def test_prefer_multiprocessing_uses_manager(self): + # prefer_multiprocessing=True in the main process returns the Manager triple. + out = prog._resolve_shared(prefer_multiprocessing=True) + assert isinstance(out, tuple) and len(out) == 3 + assert out == (prog._MP_SHARED_MAX_PROGRESS, prog._MP_SHARED_VALUE_LOCK, prog._MP_PRINT_LOCK) + prog._cleanup_mp_manager() + + def test_prefer_multiprocessing_falls_back_to_defaults_on_failure(self, monkeypatch): + # When the Manager cannot be created, _resolve_shared returns the + # thread-safe DEFAULT_SHARED_* triple (the core degradation contract). + def _raise_eof(*args, **kwargs): + raise EOFError("simulated Manager pipe EOF") + + monkeypatch.setattr(prog.mp, "Manager", _raise_eof) + out = prog._resolve_shared(prefer_multiprocessing=True) + assert out == (prog.DEFAULT_SHARED_MAX_PROGRESS, + prog.DEFAULT_SHARED_VALUE_LOCK, + prog.DEFAULT_PRINT_LOCK) + + def test_default_when_not_preferring_multiprocessing(self): + # Without prefer_multiprocessing, the thread-safe defaults are used. + out = prog._resolve_shared(prefer_multiprocessing=False) + assert out == (prog.DEFAULT_SHARED_MAX_PROGRESS, + prog.DEFAULT_SHARED_VALUE_LOCK, + prog.DEFAULT_PRINT_LOCK) + + +class TestParallelRunDegrades: + """End-to-end: CPP.run(n_jobs=2) still completes when the Manager fails.""" + + def test_cpp_run_completes_when_manager_unavailable(self, monkeypatch): + # Simulate the non-notebook context: force the Manager to raise, then run + # the parallel CPP path. It must complete (degrading to thread-safe + # progress) instead of crashing with EOFError. + import warnings + + def _raise_eof(*args, **kwargs): + raise EOFError("simulated Manager pipe EOF") + + monkeypatch.setattr(prog.mp, "Manager", _raise_eof) + df_seq = aa.load_dataset(name="DOM_GSEC", n=8) + labels = df_seq["label"].to_list() + df_parts = aa.SequenceFeature().get_df_parts(df_seq=df_seq) + df_scales = aa.load_scales(top60_n=20).T.head(8).T + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + df_feat = aa.CPP(df_parts=df_parts, df_scales=df_scales, + verbose=True).run(labels=labels, n_filter=10, n_jobs=2) + assert len(df_feat) == 10