Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 84 additions & 3 deletions pysoa/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,20 @@ class DictWrapper(RecursivelyCensoredDictWrapper):

self._skip_django_database_cleanup = False

# Set by main() when the server is started as a forked child process. This is a shared-memory
# cell written here and read by the parent _ProcessMonitor to detect hung workers.
self._ping_timestamp = None # type: Any

def _ping_parent(self):
    # type: () -> None
    """
    Record a fresh heartbeat in the ping cell that the parent process monitor reads.

    Stores the current ``time.monotonic()`` reading in ``self._ping_timestamp.value``. The server calls
    this whenever it makes observable progress (a request arrived, a request finished, or an idle
    receive timed out) so that a watchdog comparing the stored value against its own clock can tell a
    live worker from a hung one.

    When ``self._ping_timestamp`` is ``None`` (no cell was supplied, e.g. the server is not running as
    a forked child), nothing is written and the call is a no-op.
    """
    heartbeat_cell = self._ping_timestamp
    if heartbeat_cell is None:
        # No cell was wired up, so there is nobody to notify.
        return
    heartbeat_cell.value = time.monotonic()

def handle_next_request(self): # type: () -> None
"""
Retrieves the next request from the transport, or returns if it times out (no request has been made), and then
Expand All @@ -258,14 +272,17 @@ def handle_next_request(self): # type: () -> None
self.logger.warning('Thought to be impossible, but the transport returned None')
raise MessageReceiveTimeout()
except MessageReceiveTimeout:
# no new message, nothing to do
# no new message, nothing to do — ping the parent to confirm we are alive and idle
self._ping_parent()
self._idle_timer.stop()
self.perform_idle_actions()
self._set_busy_metrics(False)
self._idle_timer.start()
return

# We are no longer idle, so stop the timer, reset for the next idle period, and indicate busy in the gauges
# We are no longer idle, so stop the timer, reset for the next idle period, and indicate busy in the gauges.
# Ping the parent to reset the watchdog clock now that a request has been received.
self._ping_parent()
self._idle_timer.stop()
self._idle_timer = None
self._set_busy_metrics(True)
Expand Down Expand Up @@ -358,6 +375,8 @@ def attr_filter(attrib, _value): # type: (attr.Attribute, Any) -> bool
PySOALogContextFilter.clear_logging_request_context()
self.perform_post_request_actions()
self._set_busy_metrics(False)
# Ping the parent to signal that request processing is complete and reset the watchdog clock.
self._ping_parent()

def make_client(self, context, extra_context=None, **kwargs):
# type: (Context, Optional[Context], **Any) -> Client
Expand Down Expand Up @@ -960,7 +979,43 @@ def initialize(cls, settings): # type: (ServerSettings) -> Type[Server]
return cls

@classmethod
def main(cls, forked_process_id=None): # type: (Optional[int]) -> None
def _validate_receive_timeout_vs_ping_timeout(cls, settings, ping_timeout):
    # type: (Any, int) -> None
    """
    Reject configurations whose transport receive timeout exceeds half of the ping watchdog timeout.

    The rationale is that a worker blocked in a single transport receive call for longer than the
    watchdog allows cannot refresh its ping in time, so a healthy worker could be killed as "hung".

    The receive timeout is read from ``settings['transport']['kwargs']['receive_timeout_in_seconds']``.
    When that key is absent (or ``None``), the default declared on the ``receive_timeout_in_seconds``
    attrs field of ``RedisTransportCore`` is used instead. If that class cannot be imported, or is not
    an attrs class, the check is skipped and the method returns without validating anything.

    NOTE(review): the fallback does not look at which transport is configured; it applies the Redis
    default whenever ``RedisTransportCore`` is importable. Confirm that is intended for non-Redis
    transports.

    :param settings: The server settings mapping; must contain a ``'transport'`` mapping.
    :param ping_timeout: The watchdog timeout in seconds. This method does not handle ``None``; the
                         caller is expected to invoke it only when a ping timeout is configured.

    :raises: ValueError if the receive timeout is greater than ``ping_timeout / 2``.
    """
    configured_timeout = settings['transport'].get('kwargs', {}).get('receive_timeout_in_seconds')
    if configured_timeout is not None:
        receive_limit = configured_timeout
    else:
        # Nothing was overridden in the settings, so borrow the Redis transport's built-in default
        # and validate against that instead.
        try:
            from pysoa.common.transport.redis_gateway.core import RedisTransportCore  # noqa: local import
            receive_limit = attr.fields(RedisTransportCore).receive_timeout_in_seconds.default
        except (ImportError, attr.exceptions.NotAnAttrsClassError):
            # No usable Redis transport core is available; there is no default to compare against.
            return

    ceiling = ping_timeout / 2
    if receive_limit > ceiling:
        template = (
            'transport.kwargs.receive_timeout_in_seconds ({rt}s) must be at most half the '
            '--ping-timeout ({pt}s), otherwise the ping watchdog may not detect hung workers '
            'reliably. Lower receive_timeout_in_seconds to {max}s or raise --ping-timeout to '
            'at least {min_pt}s.'
        )
        raise ValueError(
            template.format(
                rt=receive_limit,
                pt=ping_timeout,
                max=ceiling,
                min_pt=receive_limit * 2,
            )
        )

@classmethod
def main(cls, forked_process_id=None, _ping_timestamp=None, _ping_timeout=None):
# type: (Optional[int], Any, Optional[int]) -> None
"""
Command-line entry point for running a PySOA server. The chain of method calls is as follows::

Expand Down Expand Up @@ -993,6 +1048,10 @@ def main(cls, forked_process_id=None): # type: (Optional[int]) -> None
file, etc. For example, if the `--fork` argument is used with the value 5 (creating
five child processes), this argument will have the values 1, 2, 3, 4, and 5 across
the five respective child processes.
:param _ping_timeout: The watchdog timeout (in seconds) configured in the parent _ProcessMonitor, injected
directly so that the validation works on multiprocessing spawn-mode platforms (Windows,
macOS Python 3.13+) where the child process does not inherit the parent's sys.argv.
When provided, it takes priority over the --ping-timeout value parsed from sys.argv.
"""
parser = argparse.ArgumentParser(
description='Server for the {} SOA service'.format(cls.service_name),
Expand All @@ -1010,6 +1069,14 @@ def main(cls, forked_process_id=None): # type: (Optional[int]) -> None
help='The settings module to use',
required=True,
)
# Read the parent's ping-timeout so we can validate the transport receive timeout against it.
# parse_known_args is used throughout, so unknown arguments (e.g. --fork) are silently ignored.
parser.add_argument(
'--ping-timeout',
type=int,
default=None,
help=argparse.SUPPRESS, # internal; set by standalone.py
)
cmd_options, _ = parser.parse_known_args(sys.argv[1:])

# Load settings from the given file (or use Django and grab from its settings)
Expand Down Expand Up @@ -1044,6 +1111,17 @@ def main(cls, forked_process_id=None): # type: (Optional[int]) -> None
if not cls.service_name:
raise AttributeError('Server subclass must set service_name')

# When running as a forked child, validate that the transport's receive timeout is short enough
# for the parent's ping watchdog to reliably detect hangs.
#
# Prefer the explicitly-injected _ping_timeout (passed as a positional arg by _ProcessMonitor)
# over the sys.argv-parsed value. On multiprocessing "spawn" start-mode platforms (Windows,
# macOS Python 3.13+) child processes do not inherit the parent's sys.argv, so
# cmd_options.ping_timeout would be None and the check would be silently skipped.
effective_ping_timeout = _ping_timeout if _ping_timeout is not None else cmd_options.ping_timeout
if forked_process_id is not None and effective_ping_timeout is not None:
cls._validate_receive_timeout_vs_ping_timeout(settings, effective_ping_timeout)

PySOALogContextFilter.set_service_name(cls.service_name)

# Set up logging
Expand All @@ -1059,5 +1137,8 @@ def main(cls, forked_process_id=None): # type: (Optional[int]) -> None
# Set up server and signal handling
server = cls.initialize(settings)(settings, forked_process_id) # type: Server

# Wire up the shared-memory ping timestamp supplied by the parent _ProcessMonitor (if any).
server._ping_timestamp = _ping_timestamp

# Start server event loop
server.run()
Loading