diff --git a/CHANGELOG.markdown b/CHANGELOG.markdown index 7bd8c177..abae181d 100644 --- a/CHANGELOG.markdown +++ b/CHANGELOG.markdown @@ -1,5 +1,9 @@ # Pending +- Make the agent fork-safe. Under forking app servers (Puma cluster/`preload_app!`, etc.) the agent's background threads were started in the master and left alive across `fork()`, which could intermittently deadlock worker boot (a thread holding a lock at fork time). The agent now hooks `Process._fork` (Ruby 3.1+) to stop its threads immediately before a fork and restart them in both parent and child. (#618) +- Guard the agent's `Thread.new` calls against `ThreadError` ("can't alloc thread") so a process at its thread/pid limit logs and degrades instead of aborting agent startup. (#618) +- Only start the error-service background worker when `errors_enabled` is true (previously it started unconditionally). (#618) + # 6.2.0 - Fix compatibility with `http >= 6.0.0` (#613) diff --git a/lib/scout_apm.rb b/lib/scout_apm.rb index c77244af..da37814a 100644 --- a/lib/scout_apm.rb +++ b/lib/scout_apm.rb @@ -128,6 +128,7 @@ module ScoutApm require 'scout_apm/config' require 'scout_apm/environment' require 'scout_apm/agent' +require 'scout_apm/fork_safety' require 'scout_apm/logger' require 'scout_apm/reporting' require 'scout_apm/layaway' diff --git a/lib/scout_apm/agent.rb b/lib/scout_apm/agent.rb index e94dadbe..1ce96d1c 100644 --- a/lib/scout_apm/agent.rb +++ b/lib/scout_apm/agent.rb @@ -37,6 +37,10 @@ def install(force=false) logger.info "Scout Agent [#{ScoutApm::VERSION}] Initialized" + # Hook fork() so the agent's threads are never left alive across a fork + # (the cause of intermittent worker-boot deadlocks under forking servers). + ScoutApm::ForkSafety.install + if should_load_instruments? || force instrument_manager.install! install_background_job_integrations @@ -66,7 +70,7 @@ def start(opts={}) if context.started? start_background_worker unless background_worker_running? - start_error_service_background_worker unless error_service_background_worker_running? + start_error_service_background_worker if error_service_enabled? && !error_service_background_worker_running? return end @@ -82,7 +86,7 @@ def start(opts={}) @app_server_load ||= AppServerLoad.new(context).run start_background_worker - start_error_service_background_worker + start_error_service_background_worker if error_service_enabled? end def instrument_manager @@ -142,6 +146,47 @@ def should_load_instruments? context.config.value('monitor') end + ############### + # Fork hooks # + ############### + + # Called (via ScoutApm::ForkSafety) on the parent side just before fork(). + # Tears down every agent-owned thread *fast* (no graceful join / flush -- a + # join could block on an in-flight request and we must not be mid-operation + # when fork() runs). The recorder is reset so the child rebuilds it lazily. + def stop_threads_for_fork + logger.debug "[ForkSafety] Stopping agent threads before fork (pid #{Process.pid})" if background_worker_running? + + if @app_server_load && @app_server_load.respond_to?(:stop) + @app_server_load.stop + end + @app_server_load = nil + + @background_worker.stop if @background_worker + @background_worker_thread.kill if @background_worker_thread && @background_worker_thread.alive? + @background_worker_thread = nil + + @error_service_background_worker.stop if @error_service_background_worker + @error_service_background_worker_thread.kill if @error_service_background_worker_thread && @error_service_background_worker_thread.alive? + @error_service_background_worker_thread = nil + + context.reset_recorder_for_fork! + end + + # Called on both the parent and the child after fork() returns. Restarts the + # agent's threads so each process has a fresh, working set. No-op unless the + # agent had already started (and monitoring is on). + def restart_after_fork + return unless context.started? + return unless context.config.value('monitor') + + logger.debug "[ForkSafety] Restarting agent threads after fork (pid #{Process.pid})" + + @app_server_load = AppServerLoad.new(context).run + start_background_worker(true) + start_error_service_background_worker if error_service_enabled? + end + ################################# # Background Worker Lifecycle # ################################# @@ -168,15 +213,27 @@ def start_background_worker(quiet=false) logger.info "Initializing worker thread." - ScoutApm::Agent::ExitHandler.new(context).install + # Install once per process. at_exit blocks are inherited across fork, so a + # forked child already has the handler and must not stack another one. + unless @exit_handler_installed + ScoutApm::Agent::ExitHandler.new(context).install + @exit_handler_installed = true + end periodic_work = ScoutApm::PeriodicWork.new(context) @background_worker = ScoutApm::BackgroundWorker.new(context) - @background_worker_thread = Thread.new do - @background_worker.start { - periodic_work.run - } + begin + @background_worker_thread = Thread.new do + @background_worker.start { + periodic_work.run + } + end + rescue ThreadError => e + logger.warn "Unable to start background worker thread: #{e.message}. Metrics will not be reported from this process." + @background_worker = nil + @background_worker_thread = nil + return false end return true @@ -204,14 +261,29 @@ def background_worker_running? # seconds to batch error reports ERROR_SEND_FREQUENCY = 5 def start_error_service_background_worker + return false if error_service_background_worker_running? + periodic_work = ScoutApm::ErrorService::PeriodicWork.new(context) @error_service_background_worker = ScoutApm::BackgroundWorker.new(context, ERROR_SEND_FREQUENCY) - @error_service_background_worker_thread = Thread.new do - @error_service_background_worker.start { - periodic_work.run - } + begin + @error_service_background_worker_thread = Thread.new do + @error_service_background_worker.start { + periodic_work.run + } + end + rescue ThreadError => e + logger.warn "Unable to start error service worker thread: #{e.message}. Errors will not be reported from this process." + @error_service_background_worker = nil + @error_service_background_worker_thread = nil + return false end + + return true + end + + def error_service_enabled? + context.config.value('errors_enabled') end def error_service_background_worker_running? diff --git a/lib/scout_apm/agent_context.rb b/lib/scout_apm/agent_context.rb index bd07bc5c..6225c7ab 100644 --- a/lib/scout_apm/agent_context.rb +++ b/lib/scout_apm/agent_context.rb @@ -210,6 +210,19 @@ def recorder=(recorder) @recorder = recorder end + # On the fork() path: stop the current recorder (kills the async recorder + # thread, if any) and drop the memo so the child lazily builds a fresh one. + def reset_recorder_for_fork! + if @recorder && @recorder.respond_to?(:stop) + begin + @recorder.stop + rescue => e + logger.debug("Error stopping recorder for fork: #{e.message}") + end + end + @recorder = nil + end + # I believe this is only useful for testing? def environment=(env) @environment = env diff --git a/lib/scout_apm/app_server_load.rb b/lib/scout_apm/app_server_load.rb index 066e8377..2c9fb8c6 100644 --- a/lib/scout_apm/app_server_load.rb +++ b/lib/scout_apm/app_server_load.rb @@ -28,6 +28,13 @@ def run logger.debug("Failed Startup Info - #{e.message} \n\t#{e.backtrace.join("\t\n")}") end + # Stops the reporting thread immediately. Used on the fork() path so a + # blocked startup-info report is never alive when the process forks. + def stop + @thread.kill if @thread && @thread.alive? + @thread = nil + end + def data { :language => 'ruby', diff --git a/lib/scout_apm/fork_safety.rb b/lib/scout_apm/fork_safety.rb new file mode 100644 index 00000000..a36b93b2 --- /dev/null +++ b/lib/scout_apm/fork_safety.rb @@ -0,0 +1,73 @@ +module ScoutApm + # Makes the agent safe across +fork()+. + # + # The agent runs several background threads (metrics worker, error-service + # worker, async recorder, app-server-load reporter). Threads are not inherited + # by a forked child, and worse, a thread that is mid-operation (holding a lock + # in the resolver, OpenSSL, malloc, the logger, ...) at the instant of +fork()+ + # leaves the child holding that lock with no thread alive to release it -- an + # intermittent boot deadlock. Forking app servers (Puma cluster / preload, + # Unicorn) start the agent in the master and then fork, hitting exactly this. + # + # We cannot reliably detect "this process is about to fork" at boot (e.g. under + # `rails server`, Puma has not configured itself yet when our Railtie runs), so + # instead we hook +Process._fork+ -- invoked by every +Kernel#fork+ / + # +Process.fork+, including Puma's worker forks -- and: + # + # * before the fork: stop the agent's threads so none is alive at fork time + # * after the fork (in BOTH parent and child): restart them + # + # Restarting on both sides keeps monitoring alive in the surviving parent (an + # app may fork for reasons unrelated to a web worker) and gives each child a + # fresh, working set of threads. + # + # +Process._fork+ exists only on Ruby >= 3.1, so this is a no-op on older + # Rubies, which keep the previous mitigations (Puma before_worker_boot hook, + # first-request middleware start). + module ForkSafety + @installed = false + + def self.install + return if @installed + return unless Process.respond_to?(:_fork) + + Process.singleton_class.prepend(ProcessHook) + @installed = true + end + + def self.installed? + @installed + end + + # Parent side, just before the actual fork. + def self.prepare_for_fork + ScoutApm::Agent.instance.stop_threads_for_fork + rescue => e + log("Error preparing for fork: #{e.message}") + end + + # Runs in both the parent and the child after the fork returns. + def self.complete_fork + ScoutApm::Agent.instance.restart_after_fork + rescue => e + log("Error restarting after fork: #{e.message}") + end + + def self.log(message) + ScoutApm::Agent.instance.context.logger.debug("[ForkSafety] #{message}") + rescue + # Never let logging failures escape into the host's fork path. + end + + module ProcessHook + # +super+ (the real fork) is called exactly once and outside any rescue, so + # the agent's bookkeeping can never prevent or duplicate the host's fork. + def _fork + ScoutApm::ForkSafety.prepare_for_fork + pid = super + ScoutApm::ForkSafety.complete_fork + pid + end + end + end +end diff --git a/test/unit/agent_fork_test.rb b/test/unit/agent_fork_test.rb new file mode 100644 index 00000000..fa71e18e --- /dev/null +++ b/test/unit/agent_fork_test.rb @@ -0,0 +1,71 @@ +require 'test_helper' + +class AgentForkTest < Minitest::Test + # A fresh, non-singleton Agent backed by real defaults + coercions. + def build_agent(overrides = {}) + agent = ScoutApm::Agent.new + overlays = [ + FakeConfigOverlay.new(overrides), + ScoutApm::Config::ConfigDefaults.new, + ScoutApm::Config::ConfigNull.new, + ] + agent.context.config = ScoutApm::Config.new(agent.context, overlays) + # Don't register real at_exit handlers from these throwaway agents. + ScoutApm::Agent::ExitHandler.any_instance.stubs(:install) + agent + end + + def test_error_service_enabled_reflects_config + assert build_agent('errors_enabled' => true).error_service_enabled? + refute build_agent('errors_enabled' => false).error_service_enabled? + end + + def test_start_background_worker_survives_thread_alloc_failure + agent = build_agent('monitor' => true) + Thread.stubs(:new).raises(ThreadError.new("can't alloc thread")) + + result = agent.start_background_worker(true) # must not raise + + refute result + refute agent.background_worker_running? + end + + def test_start_error_service_worker_survives_thread_alloc_failure + agent = build_agent('monitor' => true, 'errors_enabled' => true) + Thread.stubs(:new).raises(ThreadError.new("can't alloc thread")) + + result = agent.start_error_service_background_worker # must not raise + + refute result + refute agent.error_service_background_worker_running? + end + + def test_stop_threads_for_fork_kills_running_worker + agent = build_agent('monitor' => true) + assert agent.start_background_worker(true) + assert agent.background_worker_running? + + agent.stop_threads_for_fork + + refute agent.background_worker_running? + end + + def test_stop_threads_for_fork_resets_and_stops_recorder + agent = build_agent('monitor' => true) + fake_recorder = Object.new + def fake_recorder.stop; @stopped = true; end + def fake_recorder.stopped?; @stopped; end + agent.context.recorder = fake_recorder + + agent.stop_threads_for_fork + + assert fake_recorder.stopped?, "recorder should be stopped on the fork path" + assert_nil agent.context.instance_variable_get(:@recorder), "recorder memo should be cleared for lazy rebuild" + end + + def test_restart_after_fork_is_noop_when_not_started + agent = build_agent('monitor' => true) # never started + agent.restart_after_fork + refute agent.background_worker_running? + end +end diff --git a/test/unit/fork_safety_test.rb b/test/unit/fork_safety_test.rb new file mode 100644 index 00000000..68c47c1a --- /dev/null +++ b/test/unit/fork_safety_test.rb @@ -0,0 +1,31 @@ +require 'test_helper' + +require 'scout_apm/fork_safety' + +class ForkSafetyTest < Minitest::Test + def test_install_is_idempotent + ScoutApm::ForkSafety.install + before = ScoutApm::ForkSafety.installed? + ScoutApm::ForkSafety.install + assert_equal before, ScoutApm::ForkSafety.installed? + end + + def test_hooks_process_fork_when_supported + ScoutApm::ForkSafety.install + if Process.respond_to?(:_fork) + assert ScoutApm::ForkSafety.installed? + assert Process.singleton_class.ancestors.include?(ScoutApm::ForkSafety::ProcessHook) + else + refute ScoutApm::ForkSafety.installed? + end + end + + def test_prepare_and_complete_never_raise + # Even if the agent's bookkeeping blows up, the fork path must not raise. + ScoutApm::Agent.instance.stubs(:stop_threads_for_fork).raises(StandardError.new("boom")) + ScoutApm::Agent.instance.stubs(:restart_after_fork).raises(StandardError.new("boom")) + + ScoutApm::ForkSafety.prepare_for_fork + ScoutApm::ForkSafety.complete_fork + end +end