Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.markdown
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Pending

- Make the agent fork-safe. Under forking app servers (Puma cluster/`preload_app!`, etc.) the agent's background threads were started in the master and left alive across `fork()`, which could intermittently deadlock worker boot (a thread holding a lock at fork time). The agent now hooks `Process._fork` (Ruby 3.1+) to stop its threads immediately before a fork and restart them in both parent and child. (#618)
- Guard the agent's `Thread.new` calls against `ThreadError` ("can't alloc thread") so a process at its thread/pid limit logs and degrades instead of aborting agent startup. (#618)
- Only start the error-service background worker when `errors_enabled` is true (previously it started unconditionally). (#618)

# 6.2.0

- Fix compatibility with `http >= 6.0.0` (#613)
Expand Down
1 change: 1 addition & 0 deletions lib/scout_apm.rb
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ module ScoutApm
require 'scout_apm/config'
require 'scout_apm/environment'
require 'scout_apm/agent'
require 'scout_apm/fork_safety'
require 'scout_apm/logger'
require 'scout_apm/reporting'
require 'scout_apm/layaway'
Expand Down
94 changes: 83 additions & 11 deletions lib/scout_apm/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ def install(force=false)

logger.info "Scout Agent [#{ScoutApm::VERSION}] Initialized"

# Hook fork() so the agent's threads are never left alive across a fork
# (the cause of intermittent worker-boot deadlocks under forking servers).
ScoutApm::ForkSafety.install

if should_load_instruments? || force
instrument_manager.install!
install_background_job_integrations
Expand Down Expand Up @@ -66,7 +70,7 @@ def start(opts={})

if context.started?
start_background_worker unless background_worker_running?
start_error_service_background_worker unless error_service_background_worker_running?
start_error_service_background_worker if error_service_enabled? && !error_service_background_worker_running?
return
end

Expand All @@ -82,7 +86,7 @@ def start(opts={})
@app_server_load ||= AppServerLoad.new(context).run

start_background_worker
start_error_service_background_worker
start_error_service_background_worker if error_service_enabled?
end

def instrument_manager
Expand Down Expand Up @@ -142,6 +146,47 @@ def should_load_instruments?
context.config.value('monitor')
end

###############
# Fork hooks #
###############

# Called (via ScoutApm::ForkSafety) on the parent side just before fork().
# Tears down every agent-owned thread *fast* (no graceful join / flush -- a
# join could block on an in-flight request and we must not be mid-operation
# when fork() runs). The recorder is reset so the child rebuilds it lazily.
def stop_threads_for_fork
logger.debug "[ForkSafety] Stopping agent threads before fork (pid #{Process.pid})" if background_worker_running?

if @app_server_load && @app_server_load.respond_to?(:stop)
@app_server_load.stop
end
@app_server_load = nil

@background_worker.stop if @background_worker
@background_worker_thread.kill if @background_worker_thread && @background_worker_thread.alive?
@background_worker_thread = nil

@error_service_background_worker.stop if @error_service_background_worker
@error_service_background_worker_thread.kill if @error_service_background_worker_thread && @error_service_background_worker_thread.alive?
@error_service_background_worker_thread = nil

context.reset_recorder_for_fork!
end

# Called on both the parent and the child after fork() returns. Restarts the
# agent's threads so each process has a fresh, working set. No-op unless the
# agent had already started (and monitoring is on).
def restart_after_fork
return unless context.started?
return unless context.config.value('monitor')

logger.debug "[ForkSafety] Restarting agent threads after fork (pid #{Process.pid})"

@app_server_load = AppServerLoad.new(context).run
start_background_worker(true)
start_error_service_background_worker if error_service_enabled?
end

#################################
# Background Worker Lifecycle #
#################################
Expand All @@ -168,15 +213,27 @@ def start_background_worker(quiet=false)

logger.info "Initializing worker thread."

ScoutApm::Agent::ExitHandler.new(context).install
# Install once per process. at_exit blocks are inherited across fork, so a
# forked child already has the handler and must not stack another one.
unless @exit_handler_installed
ScoutApm::Agent::ExitHandler.new(context).install
@exit_handler_installed = true
end

periodic_work = ScoutApm::PeriodicWork.new(context)

@background_worker = ScoutApm::BackgroundWorker.new(context)
@background_worker_thread = Thread.new do
@background_worker.start {
periodic_work.run
}
begin
@background_worker_thread = Thread.new do
@background_worker.start {
periodic_work.run
}
end
rescue ThreadError => e
logger.warn "Unable to start background worker thread: #{e.message}. Metrics will not be reported from this process."
@background_worker = nil
@background_worker_thread = nil
return false
end

return true
Expand Down Expand Up @@ -204,14 +261,29 @@ def background_worker_running?
# seconds to batch error reports
ERROR_SEND_FREQUENCY = 5
def start_error_service_background_worker
return false if error_service_background_worker_running?

periodic_work = ScoutApm::ErrorService::PeriodicWork.new(context)

@error_service_background_worker = ScoutApm::BackgroundWorker.new(context, ERROR_SEND_FREQUENCY)
@error_service_background_worker_thread = Thread.new do
@error_service_background_worker.start {
periodic_work.run
}
begin
@error_service_background_worker_thread = Thread.new do
@error_service_background_worker.start {
periodic_work.run
}
end
rescue ThreadError => e
logger.warn "Unable to start error service worker thread: #{e.message}. Errors will not be reported from this process."
@error_service_background_worker = nil
@error_service_background_worker_thread = nil
return false
end

return true
end

def error_service_enabled?
context.config.value('errors_enabled')
end

def error_service_background_worker_running?
Expand Down
13 changes: 13 additions & 0 deletions lib/scout_apm/agent_context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,19 @@ def recorder=(recorder)
@recorder = recorder
end

# On the fork() path: stop the current recorder (kills the async recorder
# thread, if any) and drop the memo so the child lazily builds a fresh one.
def reset_recorder_for_fork!
if @recorder && @recorder.respond_to?(:stop)
begin
@recorder.stop
rescue => e
logger.debug("Error stopping recorder for fork: #{e.message}")
end
end
@recorder = nil
end

# I believe this is only useful for testing?
def environment=(env)
@environment = env
Expand Down
7 changes: 7 additions & 0 deletions lib/scout_apm/app_server_load.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ def run
logger.debug("Failed Startup Info - #{e.message} \n\t#{e.backtrace.join("\t\n")}")
end

# Stops the reporting thread immediately. Used on the fork() path so a
# blocked startup-info report is never alive when the process forks.
def stop
@thread.kill if @thread && @thread.alive?
@thread = nil
end

def data
{
:language => 'ruby',
Expand Down
73 changes: 73 additions & 0 deletions lib/scout_apm/fork_safety.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
module ScoutApm
# Makes the agent safe across +fork()+.
#
# The agent runs several background threads (metrics worker, error-service
# worker, async recorder, app-server-load reporter). Threads are not inherited
# by a forked child, and worse, a thread that is mid-operation (holding a lock
# in the resolver, OpenSSL, malloc, the logger, ...) at the instant of +fork()+
# leaves the child holding that lock with no thread alive to release it -- an
# intermittent boot deadlock. Forking app servers (Puma cluster / preload,
# Unicorn) start the agent in the master and then fork, hitting exactly this.
#
# We cannot reliably detect "this process is about to fork" at boot (e.g. under
# `rails server`, Puma has not configured itself yet when our Railtie runs), so
# instead we hook +Process._fork+ -- invoked by every +Kernel#fork+ /
# +Process.fork+, including Puma's worker forks -- and:
#
# * before the fork: stop the agent's threads so none is alive at fork time
# * after the fork (in BOTH parent and child): restart them
#
# Restarting on both sides keeps monitoring alive in the surviving parent (an
# app may fork for reasons unrelated to a web worker) and gives each child a
# fresh, working set of threads.
#
# +Process._fork+ exists only on Ruby >= 3.1, so this is a no-op on older
# Rubies, which keep the previous mitigations (Puma before_worker_boot hook,
# first-request middleware start).
module ForkSafety
@installed = false

def self.install
return if @installed
return unless Process.respond_to?(:_fork)

Process.singleton_class.prepend(ProcessHook)
@installed = true
end

def self.installed?
@installed
end

# Parent side, just before the actual fork.
def self.prepare_for_fork
ScoutApm::Agent.instance.stop_threads_for_fork
rescue => e
log("Error preparing for fork: #{e.message}")
end

# Runs in both the parent and the child after the fork returns.
def self.complete_fork
ScoutApm::Agent.instance.restart_after_fork
rescue => e
log("Error restarting after fork: #{e.message}")
end

def self.log(message)
ScoutApm::Agent.instance.context.logger.debug("[ForkSafety] #{message}")
rescue
# Never let logging failures escape into the host's fork path.
end

module ProcessHook
# +super+ (the real fork) is called exactly once and outside any rescue, so
# the agent's bookkeeping can never prevent or duplicate the host's fork.
def _fork
ScoutApm::ForkSafety.prepare_for_fork
pid = super
ScoutApm::ForkSafety.complete_fork
pid
end
end
end
end
71 changes: 71 additions & 0 deletions test/unit/agent_fork_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
require 'test_helper'

class AgentForkTest < Minitest::Test
# A fresh, non-singleton Agent backed by real defaults + coercions.
def build_agent(overrides = {})
agent = ScoutApm::Agent.new
overlays = [
FakeConfigOverlay.new(overrides),
ScoutApm::Config::ConfigDefaults.new,
ScoutApm::Config::ConfigNull.new,
]
agent.context.config = ScoutApm::Config.new(agent.context, overlays)
# Don't register real at_exit handlers from these throwaway agents.
ScoutApm::Agent::ExitHandler.any_instance.stubs(:install)
agent
end

def test_error_service_enabled_reflects_config
assert build_agent('errors_enabled' => true).error_service_enabled?
refute build_agent('errors_enabled' => false).error_service_enabled?
end

def test_start_background_worker_survives_thread_alloc_failure
agent = build_agent('monitor' => true)
Thread.stubs(:new).raises(ThreadError.new("can't alloc thread"))

result = agent.start_background_worker(true) # must not raise

refute result
refute agent.background_worker_running?
end

def test_start_error_service_worker_survives_thread_alloc_failure
agent = build_agent('monitor' => true, 'errors_enabled' => true)
Thread.stubs(:new).raises(ThreadError.new("can't alloc thread"))

result = agent.start_error_service_background_worker # must not raise

refute result
refute agent.error_service_background_worker_running?
end

def test_stop_threads_for_fork_kills_running_worker
agent = build_agent('monitor' => true)
assert agent.start_background_worker(true)
assert agent.background_worker_running?

agent.stop_threads_for_fork

refute agent.background_worker_running?
end

def test_stop_threads_for_fork_resets_and_stops_recorder
agent = build_agent('monitor' => true)
fake_recorder = Object.new
def fake_recorder.stop; @stopped = true; end
def fake_recorder.stopped?; @stopped; end
agent.context.recorder = fake_recorder

agent.stop_threads_for_fork

assert fake_recorder.stopped?, "recorder should be stopped on the fork path"
assert_nil agent.context.instance_variable_get(:@recorder), "recorder memo should be cleared for lazy rebuild"
end

def test_restart_after_fork_is_noop_when_not_started
agent = build_agent('monitor' => true) # never started
agent.restart_after_fork
refute agent.background_worker_running?
end
end
31 changes: 31 additions & 0 deletions test/unit/fork_safety_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
require 'test_helper'

require 'scout_apm/fork_safety'

class ForkSafetyTest < Minitest::Test
def test_install_is_idempotent
ScoutApm::ForkSafety.install
before = ScoutApm::ForkSafety.installed?
ScoutApm::ForkSafety.install
assert_equal before, ScoutApm::ForkSafety.installed?
end

def test_hooks_process_fork_when_supported
ScoutApm::ForkSafety.install
if Process.respond_to?(:_fork)
assert ScoutApm::ForkSafety.installed?
assert Process.singleton_class.ancestors.include?(ScoutApm::ForkSafety::ProcessHook)
else
refute ScoutApm::ForkSafety.installed?
end
end

def test_prepare_and_complete_never_raise
# Even if the agent's bookkeeping blows up, the fork path must not raise.
ScoutApm::Agent.instance.stubs(:stop_threads_for_fork).raises(StandardError.new("boom"))
ScoutApm::Agent.instance.stubs(:restart_after_fork).raises(StandardError.new("boom"))

ScoutApm::ForkSafety.prepare_for_fork
ScoutApm::ForkSafety.complete_fork
end
end
Loading