From dbfd159f2d041459f51b9e1131259c63e5e50f03 Mon Sep 17 00:00:00 2001 From: eyal Date: Tue, 28 Apr 2026 14:30:38 -0700 Subject: [PATCH 1/2] Improve resilience by adding a uwsgi-style process manager --- pysoa/server/server.py | 76 ++- pysoa/server/standalone.py | 155 ++++-- .../test_server/test_handle_next_request.py | 191 +++++++ tests/unit/server/test_standalone.py | 476 ++++++++++++++++-- 4 files changed, 814 insertions(+), 84 deletions(-) diff --git a/pysoa/server/server.py b/pysoa/server/server.py index c4084d4f..0287292d 100644 --- a/pysoa/server/server.py +++ b/pysoa/server/server.py @@ -240,6 +240,20 @@ class DictWrapper(RecursivelyCensoredDictWrapper): self._skip_django_database_cleanup = False + # Set by main() when the server is started as a forked child process. These are shared-memory + # cells written here and read by the parent _ProcessMonitor to detect hung workers. + self._ping_timestamp = None # type: Any + + def _ping_parent(self): + # type: () -> None + """ + Update the shared-memory timestamp that the parent _ProcessMonitor polls to detect hung workers. + Called whenever the child makes meaningful progress: request received, request complete, or idle. + No-op when not running as a forked child (i.e. when _ping_timestamp is None). + """ + if self._ping_timestamp is not None: + self._ping_timestamp.value = time.monotonic() + def handle_next_request(self): # type: () -> None """ Retrieves the next request from the transport, or returns if it times out (no request has been made), and then @@ -258,14 +272,17 @@ def handle_next_request(self): # type: () -> None self.logger.warning('Thought to be impossible, but the transport returned None') raise MessageReceiveTimeout() except MessageReceiveTimeout: - # no new message, nothing to do + # no new message, nothing to do — ping the parent to confirm we are alive and idle + self._ping_parent() self._idle_timer.stop() self.perform_idle_actions() self._set_busy_metrics(False) self._idle_timer.start() return - # We are no longer idle, so stop the timer, reset for the next idle period, and indicate busy in the gauges + # We are no longer idle, so stop the timer, reset for the next idle period, and indicate busy in the gauges. + # Ping the parent to reset the watchdog clock now that a request has been received. + self._ping_parent() self._idle_timer.stop() self._idle_timer = None self._set_busy_metrics(True) @@ -358,6 +375,8 @@ def attr_filter(attrib, _value): # type: (attr.Attribute, Any) -> bool PySOALogContextFilter.clear_logging_request_context() self.perform_post_request_actions() self._set_busy_metrics(False) + # Ping the parent to signal that request processing is complete and reset the watchdog clock. + self._ping_parent() def make_client(self, context, extra_context=None, **kwargs): # type: (Context, Optional[Context], **Any) -> Client @@ -960,7 +979,42 @@ def initialize(cls, settings): # type: (ServerSettings) -> Type[Server] return cls @classmethod - def main(cls, forked_process_id=None): # type: (Optional[int]) -> None + def _validate_receive_timeout_vs_ping_timeout(cls, settings, ping_timeout): + # type: (Any, int) -> None + """ + Raise ValueError if the transport's receive_timeout_in_seconds exceeds half the ping watchdog + timeout. If receive_timeout_in_seconds > ping_timeout/2 the child could stay blocked in a + single BLPOP call for longer than the watchdog allows, causing spurious kills of healthy workers. + + Only checked when running in forked mode (ping_timeout is not None). + Silently skipped for non-Redis transports. + """ + transport_kwargs = settings['transport'].get('kwargs', {}) + receive_timeout = transport_kwargs.get('receive_timeout_in_seconds') + if receive_timeout is None: + # The setting wasn't overridden; fall back to the Redis transport's built-in default so + # the check still catches mismatched ping-timeout values on default configurations. + try: + from pysoa.common.transport.redis_gateway.core import RedisTransportCore # noqa: local import + receive_timeout = attr.fields(RedisTransportCore).receive_timeout_in_seconds.default + except (ImportError, attr.exceptions.NotAnAttrsClassError): + return # non-Redis transport; nothing to validate + if receive_timeout > ping_timeout / 2: + raise ValueError( + 'transport.kwargs.receive_timeout_in_seconds ({rt}s) must be at most half the ' + '--ping-timeout ({pt}s), otherwise the ping watchdog may not detect hung workers ' + 'reliably. Lower receive_timeout_in_seconds to {max}s or raise --ping-timeout to ' + 'at least {min_pt}s.'.format( + rt=receive_timeout, + pt=ping_timeout, + max=ping_timeout // 2, + min_pt=receive_timeout * 2, + ) + ) + + @classmethod + def main(cls, forked_process_id=None, _ping_timestamp=None): + # type: (Optional[int], Any) -> None """ Command-line entry point for running a PySOA server. The chain of method calls is as follows:: @@ -1010,6 +1064,14 @@ def main(cls, forked_process_id=None): # type: (Optional[int]) -> None help='The settings module to use', required=True, ) + # Read the parent's ping-timeout so we can validate the transport receive timeout against it. + # parse_known_args is used throughout, so unknown arguments (e.g. --fork) are silently ignored. + parser.add_argument( + '--ping-timeout', + type=int, + default=None, + help=argparse.SUPPRESS, # internal; set by standalone.py + ) cmd_options, _ = parser.parse_known_args(sys.argv[1:]) # Load settings from the given file (or use Django and grab from its settings) @@ -1044,6 +1106,11 @@ def main(cls, forked_process_id=None): # type: (Optional[int]) -> None if not cls.service_name: raise AttributeError('Server subclass must set service_name') + # When running as a forked child, validate that the transport's receive timeout is short enough + # for the parent's ping watchdog to reliably detect hangs. + if forked_process_id is not None and cmd_options.ping_timeout is not None: + cls._validate_receive_timeout_vs_ping_timeout(settings, cmd_options.ping_timeout) + PySOALogContextFilter.set_service_name(cls.service_name) # Set up logging @@ -1059,5 +1126,8 @@ def main(cls, forked_process_id=None): # type: (Optional[int]) -> None # Set up server and signal handling server = cls.initialize(settings)(settings, forked_process_id) # type: Server + # Wire up the shared-memory ping timestamp supplied by the parent _ProcessMonitor (if any). + server._ping_timestamp = _ping_timestamp + # Start server event loop server.run() diff --git a/pysoa/server/standalone.py b/pysoa/server/standalone.py index 13588b39..d77f3d36 100644 --- a/pysoa/server/standalone.py +++ b/pysoa/server/standalone.py @@ -66,10 +66,11 @@ def _get_arg_parser(): # type: () -> argparse.ArgumentParser parser = argparse.ArgumentParser() parser.add_argument( '-f', '--fork-processes', '--fork', - help='The number of processes to fork (if 0, 1, or none, no process is forked; the server is run directly)', + help='The number of worker processes to fork (defaults to 1; use 0 to run the server directly in the current ' + 'process without forking, which is not recommended for production)', required=False, type=int, - default=0, + default=1, ) parser.add_argument( '--no-respawn', @@ -80,6 +81,23 @@ def _get_arg_parser(): # type: () -> argparse.ArgumentParser action='store_true', default=False, ) + parser.add_argument( + '--process-shutdown-timeout', + help='When shutting down, the number of seconds to wait for a worker process to exit after sending SIGTERM ' + 'before escalating to SIGKILL (default: 30)', + required=False, + type=int, + default=30, + ) + parser.add_argument( + '--ping-timeout', + help='The number of seconds a worker process may spend processing a single request before the parent kills ' + 'it with SIGKILL. The child process pings the parent when it starts and finishes each request; if the ' + 'parent does not receive a ping within this timeout, it assumes the child is hung (default: 10)', + required=False, + type=int, + default=10, + ) parser.add_argument( '--use-file-watcher', help='If specified, PySOA will watch service files for changes and restart the service automatically. If no ' @@ -108,11 +126,32 @@ class _ProcessMonitor(threading.Thread): A helper thread that manages starting, monitoring, terminating and, upon premature termination, restarting of forked child server processes. """ - def __init__(self, index, signal_context, respawn, **kwargs): # type: (int, _SignalContext, bool, Any) -> None + _POLL_INTERVAL = 1.0 # seconds between join() polls during normal operation + + def __init__( + self, + index, # type: int + signal_context, # type: _SignalContext + respawn, # type: bool + shutdown_timeout, # type: int + ping_timeout, # type: int + **kwargs # type: Any + ): # type: (...) -> None self.index = index self.signal_context = signal_context self.respawn = respawn - self.process_kwargs = kwargs + self.shutdown_timeout = shutdown_timeout + self.ping_timeout = ping_timeout + + # Shared-memory timestamp written by the child on every meaningful event + # (request received, request complete, idle timeout) and read by this thread + # to detect workers that are hung mid-request. + self._ping_timestamp = multiprocessing.Value('d', time.monotonic()) + + # Inject the timestamp into the positional args forwarded to server_class.main(). + existing_args = kwargs.pop('args', ()) + self.process_kwargs = dict(kwargs, args=existing_args + (self._ping_timestamp,)) + self.process = None # type: Optional[multiprocessing.Process] self.one_minute_restart_times = collections.deque(maxlen=8) # type: Deque[float] self.fifteen_second_restart_times = collections.deque(maxlen=3) # type: Deque[float] @@ -127,53 +166,95 @@ def terminate(self): # type: () -> None self.process.terminate() def _start_process(self): # type: () -> None + # Refresh the ping timestamp before starting a new child so the replacement is + # not immediately killed due to a stale timestamp left by a crashed/killed worker. + self._ping_timestamp.value = time.monotonic() self.process = multiprocessing.Process(**self.process_kwargs) self.process.start() + def _kill_process(self, reason=None): # type: (Optional[str]) -> None + """Forcefully kill the child process with SIGKILL and wait for it to exit.""" + if not self.process: + return + if reason is None: + reason = 'did not exit within {} seconds of SIGTERM'.format(self.shutdown_timeout) + sys.stdout.write( + 'Server process #{} {}; sending SIGKILL.\n'.format(self.index, reason) + ) + sys.stdout.flush() + try: + os.kill(self.process.pid, signal.SIGKILL) + except OSError: + pass # process may have exited between the is_alive() check and the kill + self.process.join() + def run(self): # type: () -> None self._start_process() assert self.process is not None - while not self.signal_context.signaled: - self.process.join() - time.sleep(0.01) - if self.signal_context.signaled or not self.respawn: + while True: + # Poll with a short timeout so we can react to shutdown signals promptly + # rather than blocking indefinitely in join(). + self.process.join(timeout=self._POLL_INTERVAL) + + if not self.process.is_alive(): + # Child exited on its own (normal exit, harakiri os._exit, or crash). + if self.signal_context.signaled or not self.respawn: + break + + t = time.time() + + if ( + len(self.one_minute_restart_times) == self.one_minute_restart_times.maxlen and + t - self.one_minute_restart_times[0] < 60 + ): + sys.stdout.write( + 'Server process #{} has crashed too many times ({}) in the last minute; ' + 'not respawning.\n'.format(self.index, self.one_minute_restart_times.maxlen), + ) + sys.stdout.flush() + break + elif ( + len(self.fifteen_second_restart_times) == self.fifteen_second_restart_times.maxlen and + t - self.fifteen_second_restart_times[0] < 15 + ): + sys.stdout.write( + 'Server process #{} has crashed too many times ({}) in the last 15 seconds; ' + 'not respawning.\n'.format(self.index, self.fifteen_second_restart_times.maxlen), + ) + sys.stdout.flush() + break + else: + sys.stdout.write('Re-spawning failed server process #{}\n'.format(self.index)) + sys.stdout.flush() + self.one_minute_restart_times.append(t) + self.fifteen_second_restart_times.append(t) + self._start_process() + + continue + + if self.signal_context.signaled: + # The parent received a shutdown signal and already sent SIGTERM via terminate(). + # Give the child a grace period to exit cleanly, then escalate to SIGKILL. + self.process.join(timeout=self.shutdown_timeout) + if self.process.is_alive(): + self._kill_process() break - t = time.time() - - if ( - len(self.one_minute_restart_times) == self.one_minute_restart_times.maxlen and - t - self.one_minute_restart_times[0] < 60 - ): - sys.stdout.write( - 'Server process #{} has crashed too many times ({}) in the last minute; ' - 'not respawning.\n'.format(self.index, self.one_minute_restart_times.maxlen), + # Check whether the child is hung. The child pings us (updates _ping_timestamp) + # on every meaningful event: request received, request complete, or idle timeout. + # If the timestamp is stale for longer than ping_timeout, the child is stuck. + if time.monotonic() - self._ping_timestamp.value > self.ping_timeout: + self._kill_process( + 'has not pinged in more than {} seconds'.format(self.ping_timeout) ) - sys.stdout.flush() - break - elif ( - len(self.fifteen_second_restart_times) == self.fifteen_second_restart_times.maxlen and - t - self.fifteen_second_restart_times[0] < 15 - ): - sys.stdout.write( - 'Server process #{} has crashed too many times ({}) in the last 15 seconds; ' - 'not respawning.\n'.format(self.index, self.fifteen_second_restart_times.maxlen), - ) - sys.stdout.flush() - break - else: - sys.stdout.write('Re-spawning failed server process #{}\n'.format(self.index)) - sys.stdout.flush() - self.one_minute_restart_times.append(t) - self.fifteen_second_restart_times.append(t) - self._start_process() + # The process is now dead; loop back so is_alive() triggers the respawn logic above. self.process = None def _run_server(args, server_class): # type: (argparse.Namespace, Type[Server]) -> None - if args.fork_processes > 1: + if args.fork_processes >= 1: cpu_count = multiprocessing.cpu_count() num_processes = args.fork_processes max_processes = cpu_count * 5 @@ -222,6 +303,8 @@ def signaled(_signal_number, _stack_frame): index=i, signal_context=signal_context, respawn=not args.no_respawn, + shutdown_timeout=args.process_shutdown_timeout, + ping_timeout=args.ping_timeout, target=server_class.main, name='pysoa-worker-{}'.format(i), args=(i, ), diff --git a/tests/unit/server/test_server/test_handle_next_request.py b/tests/unit/server/test_server/test_handle_next_request.py index efbaed90..ab28b710 100644 --- a/tests/unit/server/test_server/test_handle_next_request.py +++ b/tests/unit/server/test_server/test_handle_next_request.py @@ -3,6 +3,8 @@ unicode_literals, ) +import multiprocessing +import time from typing import Mapping from unittest import TestCase @@ -10,9 +12,11 @@ import six from pysoa.common.transport.base import ServerTransport +from pysoa.common.transport.errors import MessageReceiveTimeout from pysoa.server.server import Server from pysoa.server.types import ActionType from pysoa.test import factories +from pysoa.test.compatibility import mock class HandleNextRequestServer(Server): @@ -38,6 +42,17 @@ def get_response(self): return self._response +@fields.ClassConfigurationSchema.provider(fields.Dictionary({})) +class TimeoutServerTransport(ServerTransport): + """Transport that always raises MessageReceiveTimeout (simulates an idle server).""" + + def receive_request_message(self): + raise MessageReceiveTimeout() + + def send_response_message(self, request_id, meta, body): + pass + + class TestProcessNextRequests(TestCase): def test_emtpy_request_returns_job_response_error(self): """ @@ -56,3 +71,179 @@ def test_emtpy_request_returns_job_response_error(self): errors = response['errors'] self.assertEqual(len(errors), 3) self.assertEqual({'actions', 'control', 'context'}, set([e.get('field', None) for e in errors])) + + +# --------------------------------------------------------------------------- +# Tests for _ping_parent and the ping calls inside handle_next_request +# --------------------------------------------------------------------------- + + +class TestServerPingParent(TestCase): + """Tests for the child-process heartbeat mechanism (_ping_parent).""" + + def _make_server(self): + settings = factories.ServerSettingsFactory() + return HandleNextRequestServer(settings=settings) + + def _make_server_with_ping(self): + """Return a server wired up with a real shared-memory ping timestamp.""" + server = self._make_server() + server._ping_timestamp = multiprocessing.Value('d', 0.0) + return server + + # ------------------------------------------------------------------ + # _ping_parent unit tests + # ------------------------------------------------------------------ + + def test_ping_parent_noop_when_no_shared_memory(self): + """_ping_parent must not raise when the server is not a forked child.""" + server = self._make_server() + self.assertIsNone(server._ping_timestamp) + # Should complete without error. + server._ping_parent() + + def test_ping_parent_updates_timestamp(self): + """_ping_parent writes a fresh monotonic timestamp.""" + server = self._make_server_with_ping() + + before = time.monotonic() + server._ping_parent() + after = time.monotonic() + + self.assertGreaterEqual(server._ping_timestamp.value, before) + self.assertLessEqual(server._ping_timestamp.value, after) + + def test_ping_parent_updates_timestamp_on_each_call(self): + """Each _ping_parent call must write a monotonically non-decreasing timestamp.""" + server = self._make_server_with_ping() + + server._ping_parent() + ts1 = server._ping_timestamp.value + + server._ping_parent() + ts2 = server._ping_timestamp.value + + self.assertGreaterEqual(ts2, ts1) + + # ------------------------------------------------------------------ + # Ping calls inside handle_next_request + # ------------------------------------------------------------------ + + def test_ping_on_receive_timeout(self): + """handle_next_request pings the parent on MessageReceiveTimeout (idle).""" + server = self._make_server_with_ping() + server.transport = TimeoutServerTransport(server.service_name) + + before = time.monotonic() + server.handle_next_request() + + self.assertGreaterEqual(server._ping_timestamp.value, before, + 'Server should have updated ping timestamp on idle timeout') + + def test_ping_when_request_received(self): + """handle_next_request pings the parent after a request is dequeued.""" + server = self._make_server_with_ping() + server.transport = SimplePassthroughServerTransport(server.service_name) + server.transport.set_request({}) + + ping_calls = [] + original_ping = server._ping_parent + + def _capture_ping(): + ping_calls.append(time.monotonic()) + original_ping() + + with mock.patch.object(server, '_ping_parent', side_effect=_capture_ping): + server.handle_next_request() + + # At least two pings: one on receive, one in the finally block. + self.assertGreaterEqual(len(ping_calls), 2, + 'Expected _ping_parent to be called at least twice per request') + + def test_ping_after_request_completes(self): + """handle_next_request pings the parent in the finally block after the request finishes.""" + server = self._make_server_with_ping() + server.transport = SimplePassthroughServerTransport(server.service_name) + server.transport.set_request({}) + + before = time.monotonic() + server.handle_next_request() + + self.assertGreaterEqual(server._ping_timestamp.value, before, + 'Server should have updated ping timestamp after the request finished') + + def test_ping_after_request_even_on_exception(self): + """The finally-block ping must fire even when request processing raises an exception.""" + server = self._make_server_with_ping() + server.transport = SimplePassthroughServerTransport(server.service_name) + server.transport.set_request({}) + + before = time.monotonic() + with mock.patch.object(server, 'process_job', side_effect=RuntimeError('boom')): + try: + server.handle_next_request() + except RuntimeError: + pass + + self.assertGreaterEqual(server._ping_timestamp.value, before, + 'Server must ping even when an exception escapes handle_next_request') + + +# --------------------------------------------------------------------------- +# Tests for the receive_timeout vs ping_timeout validation +# --------------------------------------------------------------------------- + + +class TestReceiveTimeoutValidation(TestCase): + """ + Verify that _validate_receive_timeout_vs_ping_timeout raises ValueError when + receive_timeout_in_seconds exceeds half the ping-timeout, which would make the + watchdog unreliable. + + We test the extracted classmethod directly to avoid mocking Server.main's + heavy infrastructure (settings loading, logging, server instantiation). + """ + + def _make_settings(self, receive_timeout): + """Return a minimal fake settings mapping with the given transport receive timeout.""" + kwargs = {} if receive_timeout is None else {'receive_timeout_in_seconds': receive_timeout} + return {'transport': {'object': SimplePassthroughServerTransport, 'kwargs': kwargs}} + + def test_raises_when_receive_timeout_exceeds_half_ping_timeout(self): + """receive_timeout_in_seconds > ping_timeout/2 must raise ValueError.""" + settings = self._make_settings(receive_timeout=8) + with self.assertRaises(ValueError) as ctx: + HandleNextRequestServer._validate_receive_timeout_vs_ping_timeout(settings, ping_timeout=10) + self.assertIn('receive_timeout_in_seconds', str(ctx.exception)) + self.assertIn('ping-timeout', str(ctx.exception)) + + def test_does_not_raise_when_receive_timeout_equals_half_ping_timeout(self): + """receive_timeout_in_seconds == ping_timeout/2 is exactly on the boundary and must not raise.""" + settings = self._make_settings(receive_timeout=5) + HandleNextRequestServer._validate_receive_timeout_vs_ping_timeout(settings, ping_timeout=10) + + def test_does_not_raise_when_receive_timeout_is_below_half_ping_timeout(self): + """receive_timeout_in_seconds < ping_timeout/2 must not raise.""" + settings = self._make_settings(receive_timeout=3) + HandleNextRequestServer._validate_receive_timeout_vs_ping_timeout(settings, ping_timeout=10) + + def test_uses_redis_default_when_timeout_not_in_kwargs(self): + """When receive_timeout_in_seconds is absent from kwargs the Redis default (5) is used.""" + # The Redis default is 5; with ping_timeout=8 the check is 5 > 4 → should raise. + settings = self._make_settings(receive_timeout=None) + # Inject a fake Redis transport to trigger the default-fallback branch. + from pysoa.common.transport.redis_gateway.server import RedisServerTransport + settings['transport']['object'] = RedisServerTransport + with self.assertRaises(ValueError): + HandleNextRequestServer._validate_receive_timeout_vs_ping_timeout(settings, ping_timeout=8) + + def test_skips_check_for_non_redis_transport_without_timeout(self): + """Non-Redis transports that don't expose receive_timeout_in_seconds are silently skipped.""" + settings = self._make_settings(receive_timeout=None) + # SimplePassthroughServerTransport is not a Redis transport; import should fail gracefully. + with mock.patch('pysoa.server.server.Server._validate_receive_timeout_vs_ping_timeout', + wraps=HandleNextRequestServer._validate_receive_timeout_vs_ping_timeout): + # Patch the Redis import to simulate a non-Redis environment. + with mock.patch.dict('sys.modules', {'pysoa.common.transport.redis_gateway.core': None}): + # Should not raise even though ping_timeout=1 would normally flag any timeout > 0.5. + HandleNextRequestServer._validate_receive_timeout_vs_ping_timeout(settings, ping_timeout=1) diff --git a/tests/unit/server/test_standalone.py b/tests/unit/server/test_standalone.py index 29d11aa3..3dc0bb47 100644 --- a/tests/unit/server/test_standalone.py +++ b/tests/unit/server/test_standalone.py @@ -3,10 +3,13 @@ unicode_literals, ) +import collections import datetime +import multiprocessing import os import signal import sys +import threading import time from types import ModuleType from typing import Optional @@ -67,10 +70,11 @@ def setUp(self): def tearDown(self): sys.argv = self.prev_argv - def test_no_arguments(self): + def test_no_arguments_explicit_no_fork(self): + """With --fork 0, the server runs directly in the current process (no subprocess).""" server_getter = mock.MagicMock() - sys.argv = ['/path/to/example_service/standalone.py'] + sys.argv = ['/path/to/example_service/standalone.py', '--fork', '0'] standalone.simple_main(server_getter) # type: ignore @@ -89,7 +93,7 @@ def test_only_file_watcher_argument_no_values(self, mock_get_reloader): self.assertFalse(server_getter.return_value.main.called) assert mock_get_reloader.call_count == 1 - assert mock_get_reloader.call_args_list[0][0][0] in ('', 'pytest', 'coverage') + assert mock_get_reloader.call_args_list[0][0][0] in ('', 'pytest', 'pytest.__main__', 'coverage') assert mock_get_reloader.call_args_list[0][0][1] is None assert mock_get_reloader.call_args_list[0][1]['signal_forks'] is False @@ -111,12 +115,12 @@ def test_only_file_watcher_argument_some_values(self, mock_get_reloader): self.assertFalse(server_getter.return_value.main.called) assert mock_get_reloader.call_count == 1 - assert mock_get_reloader.call_args_list[0][0][0] in ('', 'pytest', 'coverage') + assert mock_get_reloader.call_args_list[0][0][0] in ('', 'pytest', 'pytest.__main__', 'coverage') assert mock_get_reloader.call_args_list[0][0][1] == ['example', 'pysoa', 'conformity'] assert mock_get_reloader.call_args_list[0][1]['signal_forks'] is False self.assertEqual(1, mock_get_reloader.return_value.main.call_count) - self.assertEqual(0, mock_get_reloader.return_value.main.call_args_list[0][0][1][0].fork_processes) + self.assertEqual(1, mock_get_reloader.return_value.main.call_args_list[0][0][1][0].fork_processes) self.assertEqual( server_getter.return_value, mock_get_reloader.return_value.main.call_args_list[0][0][1][1], @@ -134,7 +138,7 @@ def test_file_watcher_argument_no_values_with_forking(self, mock_get_reloader): self.assertFalse(server_getter.return_value.main.called) assert mock_get_reloader.call_count == 1 - assert mock_get_reloader.call_args_list[0][0][0] in ('', 'pytest', 'coverage') + assert mock_get_reloader.call_args_list[0][0][0] in ('', 'pytest', 'pytest.__main__', 'coverage') assert mock_get_reloader.call_args_list[0][0][1] is None assert mock_get_reloader.call_args_list[0][1]['signal_forks'] is True @@ -157,7 +161,7 @@ def test_file_watcher_argument_some_values_with_forking(self, mock_get_reloader) self.assertFalse(server_getter.return_value.main.called) assert mock_get_reloader.call_count == 1 - assert mock_get_reloader.call_args_list[0][0][0] in ('', 'pytest', 'coverage') + assert mock_get_reloader.call_args_list[0][0][0] in ('', 'pytest', 'pytest.__main__', 'coverage') assert mock_get_reloader.call_args_list[0][0][1] == ['pysoa'] assert mock_get_reloader.call_args_list[0][1]['signal_forks'] is True @@ -184,6 +188,9 @@ def test_only_forking_not_limited(self, mock_cpu_count, mock_process): prev_sighup = signal.signal(signal.SIGHUP, signal.SIG_IGN) processes = [mock.MagicMock() for _ in range(0, 10)] + # Each mock process must report is_alive()=False so the polling loop can exit. + for p in processes: + p.is_alive.return_value = False mock_process.side_effect = processes standalone.simple_main(server_getter) # type: ignore @@ -196,6 +203,8 @@ def test_only_forking_not_limited(self, mock_cpu_count, mock_process): found_ids = [0 for _ in range(0, 11)] i = 1 for call in mock_process.call_args_list: + # args[0] is still the forked_process_id; additional elements are the + # shared-memory ping cells appended by _ProcessMonitor.__init__. id = call[1]['args'][0] self.assertEqual(server_getter.return_value.main, call[1]['target']) self.assertEqual('pysoa-worker-{}'.format(id), call[1]['name']) @@ -234,6 +243,8 @@ def test_only_forking_limited(self, mock_cpu_count, mock_process): prev_sighup = signal.signal(signal.SIGHUP, signal.SIG_IGN) processes = [mock.MagicMock() for _ in range(0, 5)] + for p in processes: + p.is_alive.return_value = False mock_process.side_effect = processes standalone.simple_main(server_getter) # type: ignore @@ -281,6 +292,17 @@ def _join_se(): @mock.patch('multiprocessing.Process') @mock.patch('multiprocessing.cpu_count') def test_forking_with_default_respawn(self, mock_cpu_count, mock_process): + """ + Verifies respawn rate-limiting behaviour: + + * Worker 2 (quick-dying): crashes 3 times within 15 s → respawn stops after 4 total. + * Worker 3 (slow-dying): crashes 8 times within 60 s → respawn stops after 9 total. + * Worker 1 (living): stays alive until SIGTERM is sent; only 1 instance created. + + With the new polling model a process is considered dead when is_alive() returns + False (not when join() returns). Time is advanced via freezegun ticks inside the + constructor side-effect so that crash-rate deque checks see the right timestamps. + """ server_getter = mock.MagicMock() mock_cpu_count.return_value = 2 @@ -298,6 +320,10 @@ def test_forking_with_default_respawn(self, mock_cpu_count, mock_process): slow_dying_processes = [] bad_processes = [] + # Shared flag: once the living worker fires SIGTERM, all living is_alive() + # calls return False so every monitor thread can exit cleanly. + living_should_die = False + def patched_freeze_time(): # TODO Until https://github.com/spulec/freezegun/issues/307 is fixed f = freezegun.freeze_time() @@ -308,40 +334,52 @@ def patched_freeze_time(): def tick_six_se(): frozen_time.tick(datetime.timedelta(seconds=6)) - def tick_twenty_se(): - frozen_time.tick(datetime.timedelta(seconds=20)) - - def signal_se(): - os.kill(os.getpid(), signal.SIGTERM) - time.sleep(0.3) - def se(target, name, args): + nonlocal living_should_die process = mock.MagicMock() + # culprit[2][0] is the forked_process_id; additional elements are + # the shared-memory ping cells injected by _ProcessMonitor.__init__. process.culprit = (target, name, args) - if args[0] == 1: - # If it's the first process, we want to actually live. This tests normal operation. + worker_id = args[0] + + if worker_id == 1: + # Living worker: stays alive until SIGTERM fires. living_processes.append(process) - if len(living_processes) == 6: - raise ValueError('Too many, too many!') - elif len(living_processes) == 5: - process.join.side_effect = signal_se - else: - process.join.side_effect = tick_twenty_se - if len(living_processes) == 1: - time.sleep(3) # sleep 3 seconds so that all of these happen after quick- and slow-dying - elif args[0] == 2: - # If it's the second process, we want to die quickly. This tests the 15-second respawn limit. + if len(living_processes) > 1: + raise ValueError('Living worker spawned more than once!') + + def signal_join(*a, **kw): + nonlocal living_should_die + os.kill(os.getpid(), signal.SIGTERM) + living_should_die = True + time.sleep(0.3) + + process.join.side_effect = signal_join + process.is_alive.side_effect = lambda: not living_should_die + + # Delay creation so quick- and slow-dying workers get a head start. + time.sleep(0.05) + + elif worker_id == 2: + # Quick-dying: crashes immediately, hits 15-second rate limit. quick_dying_processes.append(process) - # no sleep so that all of these happen before any ticks, before slow-dying and living - elif args[0] == 3: - # If it's the third process, we want to die slowly. This tests the 60-second respawn limit. + process.is_alive.return_value = False # dies on first poll + + elif worker_id == 3: + # Slow-dying: each crash advances frozen time by 6 s; the 8-crash + # 60-second window fills before 60 s elapses (8 × 6 = 48 s < 60 s). slow_dying_processes.append(process) - process.join.side_effect = tick_six_se + tick_six_se() # advance time when the process is *created* + process.is_alive.return_value = False # dies on first poll + + # Slight delay to ensure quick-dying finishes first. if len(slow_dying_processes) == 1: - time.sleep(1) # sleep 1 second so that all of these happen after quick-dying + time.sleep(0.02) + else: bad_processes.append((target, name, args)) - raise ValueError('Nope nope nope') + raise ValueError('Unexpected worker id {}'.format(worker_id)) + return process mock_process.side_effect = se @@ -352,35 +390,34 @@ def se(target, name, args): assert server_getter.return_value.main.called is False assert len(bad_processes) == 0 + + # 1 initial + 3 respawns before hitting 15-second crash limit. assert len(quick_dying_processes) == 4 + + # 1 initial + 8 respawns before hitting 60-second crash limit. assert len(slow_dying_processes) == 9 - assert len(living_processes) == 5 - for i, p in enumerate(living_processes): + # Living worker is never respawned (it stays alive until shutdown). + assert len(living_processes) == 1 + + for p in living_processes: assert p.culprit[0] is server_getter.return_value.main assert p.culprit[1] == 'pysoa-worker-1' - assert p.culprit[2] == (1, ) - if i < 5: - p.start.assert_called_once_with() - p.join.assert_called_once_with() - for p in living_processes[:-1]: - assert p.terminate.called is False - living_processes[-1].terminate.assert_called_once_with() + assert p.culprit[2][0] == 1 + p.start.assert_called_once_with() for p in quick_dying_processes: assert p.culprit[0] is server_getter.return_value.main assert p.culprit[1] == 'pysoa-worker-2' - assert p.culprit[2] == (2, ) + assert p.culprit[2][0] == 2 p.start.assert_called_once_with() - p.join.assert_called_once_with() assert p.terminate.called is False for p in slow_dying_processes: assert p.culprit[0] is server_getter.return_value.main assert p.culprit[1] == 'pysoa-worker-3' - assert p.culprit[2] == (3, ) + assert p.culprit[2][0] == 3 p.start.assert_called_once_with() - p.join.assert_called_once_with() assert p.terminate.called is False finally: if prev_sigint is not None: @@ -389,3 +426,352 @@ def se(target, name, args): signal.signal(signal.SIGTERM, prev_sigterm or signal.SIG_IGN) if prev_sighup is not None: signal.signal(signal.SIGHUP, prev_sighup or signal.SIG_IGN) + + +# --------------------------------------------------------------------------- +# Tests for the new default fork count and CLI argument defaults +# --------------------------------------------------------------------------- + + +class TestDefaultForkCount(unittest.TestCase): + """Verify that the default number of fork processes is 1.""" + + def setUp(self): + self.assertIsNotNone(standalone, 'setup_module did not run correctly') + self.prev_argv = sys.argv + + def tearDown(self): + sys.argv = self.prev_argv + + @mock.patch('multiprocessing.Process') + @mock.patch('multiprocessing.cpu_count') + def test_default_fork_processes_is_one(self, mock_cpu_count, mock_process): + """With no -f flag a single worker process is forked (not a direct main() call).""" + mock_cpu_count.return_value = 4 + sys.argv = ['/path/to/example_service/standalone.py', '--no-respawn'] + + mock_proc = mock.MagicMock() + mock_proc.is_alive.return_value = False + mock_process.return_value = mock_proc + + prev_sigint = prev_sigterm = prev_sighup = None + try: + prev_sigint = signal.signal(signal.SIGINT, signal.SIG_IGN) + prev_sigterm = signal.signal(signal.SIGTERM, signal.SIG_IGN) + prev_sighup = signal.signal(signal.SIGHUP, signal.SIG_IGN) + + standalone.simple_main(mock.MagicMock()) # type: ignore + + self.assertEqual(1, mock_process.call_count, 'Exactly one worker process expected') + self.assertEqual(1, mock_process.call_args_list[0][1]['args'][0]) + finally: + if prev_sigint is not None: + signal.signal(signal.SIGINT, prev_sigint or signal.SIG_IGN) + if prev_sigterm is not None: + signal.signal(signal.SIGTERM, prev_sigterm or signal.SIG_IGN) + if prev_sighup is not None: + signal.signal(signal.SIGHUP, prev_sighup or signal.SIG_IGN) + + def test_ping_timeout_default_is_ten_seconds(self): + parser = standalone._get_arg_parser() # type: ignore + args = parser.parse_args([]) + self.assertEqual(10, args.ping_timeout) + + def test_process_shutdown_timeout_default_is_thirty_seconds(self): + parser = standalone._get_arg_parser() # type: ignore + args = parser.parse_args([]) + self.assertEqual(30, args.process_shutdown_timeout) + + +# --------------------------------------------------------------------------- +# Tests for _ProcessMonitor ping mechanism (parent-side watchdog) +# --------------------------------------------------------------------------- + + +class TestProcessMonitorPingMechanism(unittest.TestCase): + """ + Unit tests for the parent-side watchdog that kills hung child processes. + + Each test creates a _ProcessMonitor directly, injects a fake child-process + object, and runs the monitor thread with a very short poll interval so the + suite stays fast. + """ + + _ORIGINAL_POLL_INTERVAL = None + + def setUp(self): + self.assertIsNotNone(standalone, 'setup_module did not run correctly') + TestProcessMonitorPingMechanism._ORIGINAL_POLL_INTERVAL = ( + standalone._ProcessMonitor._POLL_INTERVAL # type: ignore + ) + standalone._ProcessMonitor._POLL_INTERVAL = 0.01 # type: ignore + + def tearDown(self): + standalone._ProcessMonitor._POLL_INTERVAL = ( # type: ignore + TestProcessMonitorPingMechanism._ORIGINAL_POLL_INTERVAL + ) + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + def _make_monitor(self, ping_timeout=10, shutdown_timeout=30, respawn=False): + """Create a _ProcessMonitor whose child process is controlled by the test.""" + signal_context = standalone._SignalContext() # type: ignore + monitor = standalone._ProcessMonitor( # type: ignore + index=1, + signal_context=signal_context, + respawn=respawn, + shutdown_timeout=shutdown_timeout, + ping_timeout=ping_timeout, + target=mock.MagicMock(), + name='test-worker', + args=(1,), + ) + return monitor, signal_context + + # ------------------------------------------------------------------ + # Shared-memory injection + # ------------------------------------------------------------------ + + def test_ping_cells_injected_into_child_args(self): + """ping_timestamp is appended to the child args tuple.""" + monitor, _ = self._make_monitor() + child_args = monitor.process_kwargs['args'] + self.assertEqual(1, child_args[0]) # forked_process_id + self.assertIs(child_args[1], monitor._ping_timestamp) # shared Value('d') + self.assertEqual(2, len(child_args)) + + def test_ping_timestamp_is_shared_double(self): + """_ping_timestamp is a multiprocessing.Value that can hold a monotonic float.""" + monitor, _ = self._make_monitor() + self.assertIsInstance(monitor._ping_timestamp, multiprocessing.sharedctypes.Synchronized) + # Should be able to store a full-precision float (as used by time.monotonic()). + ts = time.monotonic() + monitor._ping_timestamp.value = ts + self.assertAlmostEqual(ts, monitor._ping_timestamp.value, places=6) + + # ------------------------------------------------------------------ + # Ping-based SIGKILL + # ------------------------------------------------------------------ + + def test_kills_child_with_stale_ping(self): + """SIGKILL is sent whenever the ping timestamp is stale beyond ping_timeout.""" + monitor, _ = self._make_monitor(ping_timeout=5, respawn=False) + + # Simulate: child has not pinged for 6 s (> 5 s timeout). + monitor._ping_timestamp.value = time.monotonic() - 6 + + fake = mock.MagicMock() + fake.pid = 9999 + # First poll: alive (hung). After SIGKILL: dead. + fake.is_alive.side_effect = [True, False] + monitor.process = fake + + sent = [] + with mock.patch.object(monitor, '_start_process', side_effect=lambda: None): + with mock.patch('os.kill', side_effect=lambda pid, sig: sent.append(sig)): + monitor.start() + monitor.join(timeout=5.0) + + self.assertFalse(monitor.is_alive(), 'Monitor thread did not exit') + self.assertIn(signal.SIGKILL, sent, 'Expected SIGKILL for hung child') + + def test_fresh_ping_prevents_kill(self): + """A child with a fresh ping timestamp must not be killed.""" + monitor, _ = self._make_monitor(ping_timeout=5, respawn=False) + + monitor._ping_timestamp.value = time.monotonic() # fresh + + counter = [0] + + def _is_alive(): + counter[0] += 1 + return counter[0] <= 4 + + fake = mock.MagicMock() + fake.pid = 9999 + fake.is_alive.side_effect = _is_alive + monitor.process = fake + + sent = [] + with mock.patch.object(monitor, '_start_process', side_effect=lambda: None): + with mock.patch('os.kill', side_effect=lambda pid, sig: sent.append(sig)): + monitor.start() + monitor.join(timeout=5.0) + + self.assertNotIn(signal.SIGKILL, sent, 'Child with fresh ping must not be killed') + + # ------------------------------------------------------------------ + # Shutdown SIGKILL escalation + # ------------------------------------------------------------------ + + def test_sigkill_escalation_after_shutdown_timeout(self): + """SIGTERM is escalated to SIGKILL when the child ignores the shutdown signal.""" + monitor, signal_context = self._make_monitor(shutdown_timeout=0, respawn=False) + + fake = mock.MagicMock() + fake.pid = 9999 + fake.is_alive.return_value = True # never exits on its own + + sent = [] + + def _os_kill(pid, sig): + sent.append(sig) + if sig == signal.SIGKILL: + fake.is_alive.return_value = False # dead after SIGKILL + + def _trigger(): + time.sleep(0.05) + signal_context.signaled = True + monitor.terminate() + + monitor.process = fake + with mock.patch.object(monitor, '_start_process', side_effect=lambda: None): + with mock.patch('os.kill', side_effect=_os_kill): + monitor.start() + threading.Thread(target=_trigger, daemon=True).start() + monitor.join(timeout=10.0) + + self.assertFalse(monitor.is_alive(), 'Monitor thread did not exit') + self.assertIn(signal.SIGKILL, sent, 'Expected SIGKILL escalation after timeout') + + def test_no_sigkill_when_child_exits_gracefully(self): + """A child that exits during the shutdown join window must not receive SIGKILL.""" + monitor, signal_context = self._make_monitor(shutdown_timeout=30, respawn=False) + + fake = mock.MagicMock() + fake.pid = 9999 + fake.is_alive.return_value = True + + def _graceful_join(*args, **kwargs): + # Simulate graceful exit the moment the parent waits. + fake.is_alive.return_value = False + + fake.join.side_effect = _graceful_join + + sent = [] + + def _trigger(): + time.sleep(0.05) + signal_context.signaled = True + monitor.terminate() + + monitor.process = fake + with mock.patch.object(monitor, '_start_process', side_effect=lambda: None): + with mock.patch('os.kill', side_effect=lambda pid, sig: sent.append(sig)): + monitor.start() + threading.Thread(target=_trigger, daemon=True).start() + monitor.join(timeout=10.0) + + self.assertNotIn(signal.SIGKILL, sent, 'Gracefully-exiting child must not be SIGKILL-ed') + + # ------------------------------------------------------------------ + # Respawn after ping-kill + # ------------------------------------------------------------------ + + def test_respawns_after_ping_kill(self): + """The monitor spawns a replacement process after killing a hung child.""" + monitor, signal_context = self._make_monitor(ping_timeout=5, respawn=True) + + calls = [0] + + def _start_proc(): + calls[0] += 1 + p = mock.MagicMock() + p.pid = 9000 + calls[0] + + if calls[0] == 1: + # First spawn: stuck — stale ping. + monitor._ping_timestamp.value = time.monotonic() - 6 + p.is_alive.side_effect = [True, False] # True → stuck, False → after kill + else: + # Second spawn: exits immediately and signals shutdown. + p.is_alive.return_value = False + signal_context.signaled = True + + monitor.process = p + + with mock.patch.object(monitor, '_start_process', side_effect=_start_proc): + with mock.patch('os.kill'): + monitor._start_process() # wire up first process + monitor.start() + monitor.join(timeout=5.0) + + self.assertFalse(monitor.is_alive(), 'Monitor thread did not exit') + self.assertEqual(2, calls[0], 'Expected initial spawn + one respawn after ping-kill') + + def test_start_process_resets_ping_state(self): + """ + _start_process refreshes the ping timestamp before spawning a new child. + + Without this reset, a stale _ping_timestamp left by a crashed/killed worker + would cause its replacement to be immediately SIGKILL-ed. + """ + monitor, _ = self._make_monitor() + + # Simulate a stale timestamp left by a previously killed/crashed worker. + monitor._ping_timestamp.value = 0.0 # epoch — obviously stale + + fake_process = mock.MagicMock() + before = time.monotonic() + + with mock.patch('multiprocessing.Process', return_value=fake_process) as mock_mp: + monitor._start_process() + + self.assertGreaterEqual(monitor._ping_timestamp.value, before, + '_start_process must refresh the ping timestamp') + mock_mp.assert_called_once() + fake_process.start.assert_called_once_with() + + def test_replacement_not_killed_due_to_stale_ping_from_previous_child(self): + """ + After a ping-kill the replacement child must not be immediately SIGKILL-ed due to + the stale _ping_timestamp left by the dead worker. + + We let the real _start_process run (so the timestamp reset actually fires) and + only mock multiprocessing.Process to return a controllable fake child object. + """ + monitor, signal_context = self._make_monitor(ping_timeout=5, respawn=True) + + spawn_count = [0] + sigkills_per_spawn = collections.defaultdict(int) + + def make_process(**kwargs): + spawn_count[0] += 1 + n = spawn_count[0] + p = mock.MagicMock() + p.pid = 9000 + n + + if n == 1: + # First child: simulate it writing a stale timestamp to shared memory + # (as a real child would when it started handling a request and then hung). + def _start_first(): + monitor._ping_timestamp.value = time.monotonic() - 6 + p.start.side_effect = _start_first + # Alive on first poll (stuck), dead on second (after SIGKILL). + p.is_alive.side_effect = [True, False] + else: + # Replacement: by this point _start_process has reset the ping state. + # The replacement is alive for one poll (exercising the ping check) and + # then exits naturally. Signal shutdown so the monitor doesn't respawn again. + def _start_replacement(): + signal_context.signaled = True + p.start.side_effect = _start_replacement + p.is_alive.side_effect = [True, False] + + return p + + def _os_kill(pid, sig): + if sig == signal.SIGKILL: + sigkills_per_spawn[spawn_count[0]] += 1 + + with mock.patch('multiprocessing.Process', side_effect=make_process): + with mock.patch('os.kill', side_effect=_os_kill): + monitor.start() + monitor.join(timeout=5.0) + + self.assertFalse(monitor.is_alive(), 'Monitor thread did not exit') + self.assertEqual(2, spawn_count[0], 'Expected initial spawn + one respawn') + self.assertEqual(1, sigkills_per_spawn[1], 'Expected exactly one SIGKILL for the stuck child') + self.assertEqual(0, sigkills_per_spawn[2], 'Replacement child must not be SIGKILL-ed') From d61d03c298041a287775b4d36d97581eeb7a7d9e Mon Sep 17 00:00:00 2001 From: eyal Date: Tue, 28 Apr 2026 16:30:11 -0700 Subject: [PATCH 2/2] Fix issues from cursor code review --- pysoa/server/server.py | 27 +++- pysoa/server/standalone.py | 76 +++++++-- .../test_server/test_handle_next_request.py | 97 ++++++++++++ tests/unit/server/test_standalone.py | 147 ++++++++++++++++-- 4 files changed, 310 insertions(+), 37 deletions(-) diff --git a/pysoa/server/server.py b/pysoa/server/server.py index 0287292d..bcb4e670 100644 --- a/pysoa/server/server.py +++ b/pysoa/server/server.py @@ -240,8 +240,8 @@ class DictWrapper(RecursivelyCensoredDictWrapper): self._skip_django_database_cleanup = False - # Set by main() when the server is started as a forked child process. These are shared-memory - # cells written here and read by the parent _ProcessMonitor to detect hung workers. + # Set by main() when the server is started as a forked child process. This is a shared-memory + # cell written here and read by the parent _ProcessMonitor to detect hung workers. self._ping_timestamp = None # type: Any def _ping_parent(self): @@ -999,7 +999,8 @@ def _validate_receive_timeout_vs_ping_timeout(cls, settings, ping_timeout): receive_timeout = attr.fields(RedisTransportCore).receive_timeout_in_seconds.default except (ImportError, attr.exceptions.NotAnAttrsClassError): return # non-Redis transport; nothing to validate - if receive_timeout > ping_timeout / 2: + max_receive_timeout = ping_timeout / 2 + if receive_timeout > max_receive_timeout: raise ValueError( 'transport.kwargs.receive_timeout_in_seconds ({rt}s) must be at most half the ' '--ping-timeout ({pt}s), otherwise the ping watchdog may not detect hung workers ' @@ -1007,14 +1008,14 @@ def _validate_receive_timeout_vs_ping_timeout(cls, settings, ping_timeout): 'at least {min_pt}s.'.format( rt=receive_timeout, pt=ping_timeout, - max=ping_timeout // 2, + max=max_receive_timeout, min_pt=receive_timeout * 2, ) ) @classmethod - def main(cls, forked_process_id=None, _ping_timestamp=None): - # type: (Optional[int], Any) -> None + def main(cls, forked_process_id=None, _ping_timestamp=None, _ping_timeout=None): + # type: (Optional[int], Any, Optional[int]) -> None """ Command-line entry point for running a PySOA server. The chain of method calls is as follows:: @@ -1047,6 +1048,10 @@ def main(cls, forked_process_id=None, _ping_timestamp=None): file, etc. For example, if the `--fork` argument is used with the value 5 (creating five child processes), this argument will have the values 1, 2, 3, 4, and 5 across the five respective child processes. + :param _ping_timeout: The watchdog timeout (in seconds) configured in the parent _ProcessMonitor, injected + directly so that the validation works on multiprocessing spawn-mode platforms (Windows, + macOS Python 3.13+) where the child process does not inherit the parent's sys.argv. + When provided, it takes priority over the --ping-timeout value parsed from sys.argv. """ parser = argparse.ArgumentParser( description='Server for the {} SOA service'.format(cls.service_name), @@ -1108,8 +1113,14 @@ def main(cls, forked_process_id=None, _ping_timestamp=None): # When running as a forked child, validate that the transport's receive timeout is short enough # for the parent's ping watchdog to reliably detect hangs. - if forked_process_id is not None and cmd_options.ping_timeout is not None: - cls._validate_receive_timeout_vs_ping_timeout(settings, cmd_options.ping_timeout) + # + # Prefer the explicitly-injected _ping_timeout (passed as a positional arg by _ProcessMonitor) + # over the sys.argv-parsed value. On multiprocessing "spawn" start-mode platforms (Windows, + # macOS Python 3.13+) child processes do not inherit the parent's sys.argv, so + # cmd_options.ping_timeout would be None and the check would be silently skipped. + effective_ping_timeout = _ping_timeout if _ping_timeout is not None else cmd_options.ping_timeout + if forked_process_id is not None and effective_ping_timeout is not None: + cls._validate_receive_timeout_vs_ping_timeout(settings, effective_ping_timeout) PySOALogContextFilter.set_service_name(cls.service_name) diff --git a/pysoa/server/standalone.py b/pysoa/server/standalone.py index d77f3d36..367990a5 100644 --- a/pysoa/server/standalone.py +++ b/pysoa/server/standalone.py @@ -98,6 +98,15 @@ def _get_arg_parser(): # type: () -> argparse.ArgumentParser type=int, default=10, ) + parser.add_argument( + '--startup-timeout', + help='The number of seconds allowed for a worker process to start up and send its first ping before the ' + 'parent kills it with SIGKILL. This is separate from --ping-timeout, which governs per-request watchdog ' + 'checks after the first ping has been received (default: 60)', + required=False, + type=int, + default=60, + ) parser.add_argument( '--use-file-watcher', help='If specified, PySOA will watch service files for changes and restart the service automatically. If no ' @@ -135,6 +144,7 @@ def __init__( respawn, # type: bool shutdown_timeout, # type: int ping_timeout, # type: int + startup_timeout, # type: int **kwargs # type: Any ): # type: (...) -> None self.index = index @@ -142,15 +152,24 @@ def __init__( self.respawn = respawn self.shutdown_timeout = shutdown_timeout self.ping_timeout = ping_timeout + self.startup_timeout = startup_timeout # Shared-memory timestamp written by the child on every meaningful event # (request received, request complete, idle timeout) and read by this thread # to detect workers that are hung mid-request. self._ping_timestamp = multiprocessing.Value('d', time.monotonic()) - # Inject the timestamp into the positional args forwarded to server_class.main(). + # Tracks when the current child process was last (re)started; used to distinguish + # the startup phase (before the first child ping) from normal operation. + # Initialised here and refreshed by _start_process() on each (re)spawn. + self._process_started_at = time.monotonic() + + # Inject the timestamp and ping_timeout into the positional args forwarded to + # server_class.main(). Both are passed explicitly so that the values are available + # in the child process even on "spawn" start-mode platforms (Windows, macOS Python + # 3.13+) where the child does not inherit the parent's sys.argv. existing_args = kwargs.pop('args', ()) - self.process_kwargs = dict(kwargs, args=existing_args + (self._ping_timestamp,)) + self.process_kwargs = dict(kwargs, args=existing_args + (self._ping_timestamp, self.ping_timeout)) self.process = None # type: Optional[multiprocessing.Process] self.one_minute_restart_times = collections.deque(maxlen=8) # type: Deque[float] @@ -166,9 +185,15 @@ def terminate(self): # type: () -> None self.process.terminate() def _start_process(self): # type: () -> None - # Refresh the ping timestamp before starting a new child so the replacement is - # not immediately killed due to a stale timestamp left by a crashed/killed worker. - self._ping_timestamp.value = time.monotonic() + # Record the start time and reset the ping timestamp to that same instant. + # Both are set atomically (same value) so that: + # _ping_timestamp.value == _process_started_at → child has not yet pinged + # _ping_timestamp.value > _process_started_at → child has sent at least one ping + # This also prevents a stale timestamp left by a crashed/killed worker from causing + # the replacement child to be immediately SIGKILL-ed. + t = time.monotonic() + self._process_started_at = t + self._ping_timestamp.value = t self.process = multiprocessing.Process(**self.process_kwargs) self.process.start() @@ -186,7 +211,12 @@ def _kill_process(self, reason=None): # type: (Optional[str]) -> None os.kill(self.process.pid, signal.SIGKILL) except OSError: pass # process may have exited between the is_alive() check and the kill - self.process.join() + self.process.join(timeout=self.shutdown_timeout) + if self.process.is_alive(): + sys.stdout.write( + 'Server process #{} could not be killed with SIGKILL; giving up.\n'.format(self.index) + ) + sys.stdout.flush() def run(self): # type: () -> None self._start_process() @@ -241,14 +271,29 @@ def run(self): # type: () -> None self._kill_process() break - # Check whether the child is hung. The child pings us (updates _ping_timestamp) - # on every meaningful event: request received, request complete, or idle timeout. - # If the timestamp is stale for longer than ping_timeout, the child is stuck. - if time.monotonic() - self._ping_timestamp.value > self.ping_timeout: - self._kill_process( - 'has not pinged in more than {} seconds'.format(self.ping_timeout) - ) - # The process is now dead; loop back so is_alive() triggers the respawn logic above. + # Check whether the child is hung. + # _start_process sets both _ping_timestamp and _process_started_at to the same + # instant, so: + # _ping_timestamp.value == _process_started_at → child has never pinged + # _ping_timestamp.value > _process_started_at → at least one ping received + # Before the first ping we apply startup_timeout (measured from process start) + # to give slow-starting services room to initialise. After the first ping we + # apply ping_timeout (measured from the last ping) for per-request hang detection. + now = time.monotonic() + last_ping = self._ping_timestamp.value + has_pinged = last_ping > self._process_started_at + if has_pinged: + if now - last_ping > self.ping_timeout: + self._kill_process( + 'has not pinged in more than {} seconds'.format(self.ping_timeout) + ) + # The process is now dead; loop back so is_alive() triggers the respawn logic above. + else: + if now - self._process_started_at > self.startup_timeout: + self._kill_process( + 'did not ping within startup timeout of {} seconds'.format(self.startup_timeout) + ) + # The process is now dead; loop back so is_alive() triggers the respawn logic above. self.process = None @@ -305,6 +350,7 @@ def signaled(_signal_number, _stack_frame): respawn=not args.no_respawn, shutdown_timeout=args.process_shutdown_timeout, ping_timeout=args.ping_timeout, + startup_timeout=args.startup_timeout, target=server_class.main, name='pysoa-worker-{}'.format(i), args=(i, ), @@ -352,7 +398,7 @@ def _run_server_reloader_wrapper(args, server_class): # type: (argparse.Namespa autoreload.get_reloader( module_name or '', args.use_file_watcher, - signal_forks=args.fork_processes > 1 + signal_forks=args.fork_processes >= 1 ).main( _run_server, (args, server_class), diff --git a/tests/unit/server/test_server/test_handle_next_request.py b/tests/unit/server/test_server/test_handle_next_request.py index ab28b710..8a6b4174 100644 --- a/tests/unit/server/test_server/test_handle_next_request.py +++ b/tests/unit/server/test_server/test_handle_next_request.py @@ -247,3 +247,100 @@ def test_skips_check_for_non_redis_transport_without_timeout(self): with mock.patch.dict('sys.modules', {'pysoa.common.transport.redis_gateway.core': None}): # Should not raise even though ping_timeout=1 would normally flag any timeout > 0.5. HandleNextRequestServer._validate_receive_timeout_vs_ping_timeout(settings, ping_timeout=1) + + +# --------------------------------------------------------------------------- +# Tests that verify the _ping_timeout parameter path in Server.main() works +# correctly even when sys.argv does not contain --ping-timeout (spawn mode). +# --------------------------------------------------------------------------- + + +class TestMainPingTimeoutParameter(TestCase): + """ + Verify that Server.main() uses the explicit _ping_timeout keyword argument + (not sys.argv) to drive the receive_timeout validation. This exercises the + fix for the bug where spawn-mode child processes (Windows, macOS Python + 3.13+) would silently skip the validation because their sys.argv doesn't + contain --ping-timeout. + """ + + def _run_main_stub(self, forked_process_id, _ping_timeout, extra_argv=None): + """ + Invoke HandleNextRequestServer.main() with all heavy infrastructure + mocked so that only the validation-dispatch logic is exercised. + + Returns the list of ping_timeout values that + _validate_receive_timeout_vs_ping_timeout was called with. + + On spawn-mode platforms (Windows, macOS Python 3.13+) the child's + sys.argv is rebuilt without the parent's flags; we simulate that by + omitting --ping-timeout from the argv while still supplying -s so + that argparse is satisfied. The explicit _ping_timeout kwarg must + carry the value instead. + """ + called_with = [] + + def fake_validate(settings, ping_timeout): + called_with.append(ping_timeout) + + # Minimal argv: -s is required by HandleNextRequestServer (no use_django). + # --ping-timeout is deliberately absent to simulate spawn-mode child argv. + argv = ['/path/to/standalone.py', '-s', 'fake.settings.module'] + if extra_argv: + argv.extend(extra_argv) + + import types + fake_settings_module = types.ModuleType('fake.settings.module') + fake_settings_module.SOA_SERVER_SETTINGS = {} # irrelevant; settings_class is mocked + + with mock.patch.object(HandleNextRequestServer, '_validate_receive_timeout_vs_ping_timeout', + side_effect=fake_validate): + with mock.patch('sys.argv', argv): + with mock.patch('importlib.import_module', return_value=fake_settings_module): + with mock.patch.object(HandleNextRequestServer, 'settings_class', + return_value=mock.MagicMock()): + with mock.patch('logging.config.dictConfig'): + with mock.patch.object(HandleNextRequestServer, 'initialize', + return_value=HandleNextRequestServer): + with mock.patch.object(HandleNextRequestServer, 'run'): + HandleNextRequestServer.main( + forked_process_id=forked_process_id, + _ping_timestamp=None, + _ping_timeout=_ping_timeout, + ) + return called_with + + def test_validation_fires_via_explicit_ping_timeout_arg(self): + """ + When _ping_timeout is supplied as a keyword argument to main(), the + validation must run even if sys.argv has no --ping-timeout flag. + This simulates a spawn-mode child where sys.argv is reset. + """ + called_with = self._run_main_stub(forked_process_id=1, _ping_timeout=10) + self.assertEqual([10], called_with, + 'Validation should have been called exactly once with ping_timeout=10') + + def test_validation_skipped_when_both_sources_absent(self): + """ + If neither _ping_timeout nor --ping-timeout in sys.argv is present, the + validation must be skipped (no spurious error for non-forked direct + invocations or pre-existing deployments that haven't updated yet). + """ + called_with = self._run_main_stub(forked_process_id=1, _ping_timeout=None) + self.assertEqual([], called_with, + 'Validation should not be called when ping_timeout is absent from both sources') + + def test_explicit_arg_takes_priority_over_sys_argv(self): + """ + When _ping_timeout is given explicitly *and* --ping-timeout is also in + sys.argv (fork-mode on Unix), the explicit value wins so that the parent + is the single source of truth. + """ + # sys.argv carries a different value (30) to demonstrate the priority. + called_with = self._run_main_stub( + forked_process_id=1, + _ping_timeout=10, # explicit value — should win + extra_argv=['--ping-timeout', '30'], # sys.argv value — should be ignored + ) + self.assertEqual([10], called_with, + 'Explicit _ping_timeout=10 should take priority over sys.argv --ping-timeout 30') diff --git a/tests/unit/server/test_standalone.py b/tests/unit/server/test_standalone.py index 3dc0bb47..5cca80de 100644 --- a/tests/unit/server/test_standalone.py +++ b/tests/unit/server/test_standalone.py @@ -95,7 +95,8 @@ def test_only_file_watcher_argument_no_values(self, mock_get_reloader): assert mock_get_reloader.call_count == 1 assert mock_get_reloader.call_args_list[0][0][0] in ('', 'pytest', 'pytest.__main__', 'coverage') assert mock_get_reloader.call_args_list[0][0][1] is None - assert mock_get_reloader.call_args_list[0][1]['signal_forks'] is False + # Default fork_processes=1 means signal_forks is True (the reloader must signal the worker). + assert mock_get_reloader.call_args_list[0][1]['signal_forks'] is True self.assertEqual(1, mock_get_reloader.return_value.main.call_count) self.assertEqual( @@ -117,7 +118,8 @@ def test_only_file_watcher_argument_some_values(self, mock_get_reloader): assert mock_get_reloader.call_count == 1 assert mock_get_reloader.call_args_list[0][0][0] in ('', 'pytest', 'pytest.__main__', 'coverage') assert mock_get_reloader.call_args_list[0][0][1] == ['example', 'pysoa', 'conformity'] - assert mock_get_reloader.call_args_list[0][1]['signal_forks'] is False + # Default fork_processes=1 means signal_forks is True (the reloader must signal the worker). + assert mock_get_reloader.call_args_list[0][1]['signal_forks'] is True self.assertEqual(1, mock_get_reloader.return_value.main.call_count) self.assertEqual(1, mock_get_reloader.return_value.main.call_args_list[0][0][1][0].fork_processes) @@ -482,6 +484,11 @@ def test_process_shutdown_timeout_default_is_thirty_seconds(self): args = parser.parse_args([]) self.assertEqual(30, args.process_shutdown_timeout) + def test_startup_timeout_default_is_sixty_seconds(self): + parser = standalone._get_arg_parser() # type: ignore + args = parser.parse_args([]) + self.assertEqual(60, args.startup_timeout) + # --------------------------------------------------------------------------- # Tests for _ProcessMonitor ping mechanism (parent-side watchdog) @@ -515,7 +522,7 @@ def tearDown(self): # Internal helpers # ------------------------------------------------------------------ - def _make_monitor(self, ping_timeout=10, shutdown_timeout=30, respawn=False): + def _make_monitor(self, ping_timeout=10, shutdown_timeout=30, respawn=False, startup_timeout=60): """Create a _ProcessMonitor whose child process is controlled by the test.""" signal_context = standalone._SignalContext() # type: ignore monitor = standalone._ProcessMonitor( # type: ignore @@ -524,6 +531,7 @@ def _make_monitor(self, ping_timeout=10, shutdown_timeout=30, respawn=False): respawn=respawn, shutdown_timeout=shutdown_timeout, ping_timeout=ping_timeout, + startup_timeout=startup_timeout, target=mock.MagicMock(), name='test-worker', args=(1,), @@ -535,12 +543,20 @@ def _make_monitor(self, ping_timeout=10, shutdown_timeout=30, respawn=False): # ------------------------------------------------------------------ def test_ping_cells_injected_into_child_args(self): - """ping_timestamp is appended to the child args tuple.""" - monitor, _ = self._make_monitor() + """ping_timestamp and ping_timeout are appended to the child args tuple.""" + monitor, _ = self._make_monitor(ping_timeout=42) child_args = monitor.process_kwargs['args'] self.assertEqual(1, child_args[0]) # forked_process_id self.assertIs(child_args[1], monitor._ping_timestamp) # shared Value('d') - self.assertEqual(2, len(child_args)) + self.assertEqual(42, child_args[2]) # ping_timeout explicit value + self.assertEqual(3, len(child_args)) + + def test_ping_timeout_injected_into_child_args(self): + """ping_timeout is passed explicitly so the child can validate without sys.argv.""" + monitor, _ = self._make_monitor(ping_timeout=15) + child_args = monitor.process_kwargs['args'] + # args[2] is the _ping_timeout parameter in server_class.main(). + self.assertEqual(15, child_args[2]) def test_ping_timestamp_is_shared_double(self): """_ping_timestamp is a multiprocessing.Value that can hold a monotonic float.""" @@ -559,13 +575,17 @@ def test_kills_child_with_stale_ping(self): """SIGKILL is sent whenever the ping timestamp is stale beyond ping_timeout.""" monitor, _ = self._make_monitor(ping_timeout=5, respawn=False) - # Simulate: child has not pinged for 6 s (> 5 s timeout). + # Simulate: process started long ago, last ping was 6 s ago (> 5 s timeout). + # Setting _process_started_at to the past ensures has_pinged is True so that + # the per-request ping_timeout branch (not the startup branch) is exercised. + monitor._process_started_at = time.monotonic() - 100 monitor._ping_timestamp.value = time.monotonic() - 6 fake = mock.MagicMock() fake.pid = 9999 - # First poll: alive (hung). After SIGKILL: dead. - fake.is_alive.side_effect = [True, False] + # Calls: (1) main-loop poll → alive/hung, (2) _kill_process is_alive check → dead, + # (3) main-loop poll on next iteration → still dead, triggering the "exited" branch. + fake.is_alive.side_effect = [True, False, False] monitor.process = fake sent = [] @@ -581,6 +601,9 @@ def test_fresh_ping_prevents_kill(self): """A child with a fresh ping timestamp must not be killed.""" monitor, _ = self._make_monitor(ping_timeout=5, respawn=False) + # Push _process_started_at into the past so has_pinged is True, then set a + # fresh ping to confirm the watchdog does not fire. + monitor._process_started_at = time.monotonic() - 100 monitor._ping_timestamp.value = time.monotonic() # fresh counter = [0] @@ -683,8 +706,13 @@ def _start_proc(): if calls[0] == 1: # First spawn: stuck — stale ping. + # Push _process_started_at into the past so has_pinged is True and the + # per-request ping_timeout branch fires (not the startup branch). + monitor._process_started_at = time.monotonic() - 100 monitor._ping_timestamp.value = time.monotonic() - 6 - p.is_alive.side_effect = [True, False] # True → stuck, False → after kill + # Calls: (1) main-loop → alive/hung, (2) _kill_process is_alive check → dead, + # (3) main-loop next iteration → still dead, triggering respawn. + p.is_alive.side_effect = [True, False, False] else: # Second spawn: exits immediately and signals shutdown. p.is_alive.return_value = False @@ -703,15 +731,18 @@ def _start_proc(): def test_start_process_resets_ping_state(self): """ - _start_process refreshes the ping timestamp before spawning a new child. + _start_process refreshes both the ping timestamp and _process_started_at before + spawning a new child, setting them to the same instant so the has_pinged check + starts in the 'not yet pinged' state. Without this reset, a stale _ping_timestamp left by a crashed/killed worker would cause its replacement to be immediately SIGKILL-ed. """ monitor, _ = self._make_monitor() - # Simulate a stale timestamp left by a previously killed/crashed worker. + # Simulate stale state left by a previously killed/crashed worker. monitor._ping_timestamp.value = 0.0 # epoch — obviously stale + monitor._process_started_at = 0.0 fake_process = mock.MagicMock() before = time.monotonic() @@ -721,6 +752,11 @@ def test_start_process_resets_ping_state(self): self.assertGreaterEqual(monitor._ping_timestamp.value, before, '_start_process must refresh the ping timestamp') + self.assertGreaterEqual(monitor._process_started_at, before, + '_start_process must refresh _process_started_at') + self.assertEqual(monitor._ping_timestamp.value, monitor._process_started_at, + '_ping_timestamp and _process_started_at must be equal after _start_process ' + 'so the child starts in the startup phase') mock_mp.assert_called_once() fake_process.start.assert_called_once_with() @@ -746,11 +782,15 @@ def make_process(**kwargs): if n == 1: # First child: simulate it writing a stale timestamp to shared memory # (as a real child would when it started handling a request and then hung). + # Also push _process_started_at into the past so has_pinged is True and + # the per-request ping_timeout branch (not the startup branch) fires. def _start_first(): + monitor._process_started_at = time.monotonic() - 100 monitor._ping_timestamp.value = time.monotonic() - 6 p.start.side_effect = _start_first - # Alive on first poll (stuck), dead on second (after SIGKILL). - p.is_alive.side_effect = [True, False] + # Calls: (1) main-loop → alive/hung, (2) _kill_process is_alive check → dead, + # (3) main-loop next iteration → still dead, triggering the respawn path. + p.is_alive.side_effect = [True, False, False] else: # Replacement: by this point _start_process has reset the ping state. # The replacement is alive for one poll (exercising the ping check) and @@ -775,3 +815,82 @@ def _os_kill(pid, sig): self.assertEqual(2, spawn_count[0], 'Expected initial spawn + one respawn') self.assertEqual(1, sigkills_per_spawn[1], 'Expected exactly one SIGKILL for the stuck child') self.assertEqual(0, sigkills_per_spawn[2], 'Replacement child must not be SIGKILL-ed') + + # ------------------------------------------------------------------ + # Startup timeout + # ------------------------------------------------------------------ + + def test_startup_timeout_kills_child_that_never_pings(self): + """A child that never sends its first ping is killed after startup_timeout.""" + # startup_timeout=0 means any positive elapsed time exceeds the budget. + monitor, _ = self._make_monitor(ping_timeout=60, startup_timeout=0, respawn=False) + + fake = mock.MagicMock() + fake.pid = 9999 + # Child never pings: _ping_timestamp.value == _process_started_at throughout. + # Calls: (1) main-loop → alive/hung, (2) _kill_process is_alive check → dead, + # (3) main-loop next iteration → still dead, triggering the "exited" branch. + fake.is_alive.side_effect = [True, False, False] + monitor.process = fake + + sent = [] + with mock.patch.object(monitor, '_start_process', side_effect=lambda: None): + with mock.patch('os.kill', side_effect=lambda pid, sig: sent.append(sig)): + monitor.start() + monitor.join(timeout=5.0) + + self.assertFalse(monitor.is_alive(), 'Monitor thread did not exit') + self.assertIn(signal.SIGKILL, sent, 'Expected SIGKILL for child that never pinged') + + def test_startup_timeout_prevents_kill_within_startup_window(self): + """A child that has not yet pinged is not killed while still within startup_timeout.""" + # ping_timeout=1 is very short, but startup_timeout=60 keeps the child alive + # until it exits naturally (before the startup window expires). + monitor, _ = self._make_monitor(ping_timeout=1, startup_timeout=60, respawn=False) + + counter = [0] + + def _is_alive(): + counter[0] += 1 + return counter[0] <= 4 # alive for 4 polls, then exits naturally + + fake = mock.MagicMock() + fake.pid = 9999 + fake.is_alive.side_effect = _is_alive + monitor.process = fake + + sent = [] + with mock.patch.object(monitor, '_start_process', side_effect=lambda: None): + with mock.patch('os.kill', side_effect=lambda pid, sig: sent.append(sig)): + monitor.start() + monitor.join(timeout=5.0) + + self.assertNotIn(signal.SIGKILL, sent, + 'Child still within startup window must not be killed even if ping_timeout is short') + + def test_ping_timeout_applies_after_first_ping(self): + """Once a child has pinged, ping_timeout governs hang detection (not startup_timeout).""" + # startup_timeout=60 (would protect a non-pinging child) but the child has already + # pinged once and is now 6 s stale, which exceeds ping_timeout=5. + monitor, _ = self._make_monitor(ping_timeout=5, startup_timeout=60, respawn=False) + + # Simulate: child pinged once long ago (process started 100 s ago, last ping 6 s ago). + monitor._process_started_at = time.monotonic() - 100 + monitor._ping_timestamp.value = time.monotonic() - 6 + + fake = mock.MagicMock() + fake.pid = 9999 + # Calls: (1) main-loop → alive/hung, (2) _kill_process is_alive check → dead, + # (3) main-loop next iteration → still dead, triggering the "exited" branch. + fake.is_alive.side_effect = [True, False, False] + monitor.process = fake + + sent = [] + with mock.patch.object(monitor, '_start_process', side_effect=lambda: None): + with mock.patch('os.kill', side_effect=lambda pid, sig: sent.append(sig)): + monitor.start() + monitor.join(timeout=5.0) + + self.assertFalse(monitor.is_alive(), 'Monitor thread did not exit') + self.assertIn(signal.SIGKILL, sent, + 'Expected SIGKILL once ping is stale beyond ping_timeout, even with long startup_timeout')