From 0c9c7578fc1f27cae6a718939120858799ba9924 Mon Sep 17 00:00:00 2001 From: rkishner Date: Thu, 28 May 2026 10:45:19 +0300 Subject: [PATCH] utils: replace pexpect.spawn with pty+subprocess to avoid forkpty Co-authored-by: Cursor Signed-off-by: rkishner --- utilities/console.py | 79 +++++-- utilities/unittests/test_console.py | 319 +++++++++++++++++++++++++--- 2 files changed, 355 insertions(+), 43 deletions(-) diff --git a/utilities/console.py b/utilities/console.py index 9ad746566c..5b48c09ed9 100644 --- a/utilities/console.py +++ b/utilities/console.py @@ -1,9 +1,13 @@ import logging import os +import pty +import shlex +import subprocess import pexpect +import pexpect.fdpexpect from ocp_resources.virtual_machine import VirtualMachine -from timeout_sampler import TimeoutSampler, retry +from timeout_sampler import TimeoutExpiredError, TimeoutSampler, retry from utilities.constants import ( TIMEOUT_5MIN, @@ -58,7 +62,8 @@ def __init__( password or getattr(self.vm, "login_params", {}).get("password") or self.vm.password # type: ignore[attr-defined] ) self.timeout = timeout - self.child = None + self.child: pexpect.fdpexpect.fdspawn | None = None + self._proc: subprocess.Popen[bytes] | None = None self.login_prompt = "login:" self.prompt = prompt if prompt else [r"#", r"\$"] self.kubeconfig = kubeconfig @@ -68,13 +73,14 @@ def __init__( @retry(wait_timeout=TIMEOUT_5MIN, sleep=TIMEOUT_10SEC) def connect(self): LOGGER.info(f"Connect to {self.vm.name} console") - self.console_eof_sampler(func=pexpect.spawn, command=self.cmd, timeout=self.timeout) - try: + self.console_eof_sampler() self._connect() - except Exception: + except TimeoutExpiredError, pexpect.exceptions.ExceptionPexpect: LOGGER.exception(f"Failed to connect to {self.vm.name} console.") - self.child.close() + if self.child is not None: + self.child.close() + self._terminate_proc() raise return self.child @@ -95,8 +101,8 @@ def _connect(self): LOGGER.info(f"{self.vm.name}: Got prompt {self.prompt}") def disconnect(self): - if self.child.terminated: - self.console_eof_sampler(func=pexpect.spawn, command=self.cmd, timeout=self.timeout) + if self._proc is not None and self._proc.poll() is not None: + self.console_eof_sampler() try: self.child.send("\n\n") @@ -107,16 +113,63 @@ def disconnect(self): self.child.expect("login:") finally: self.child.close() + self._terminate_proc() + + def _spawn_console(self) -> pexpect.fdpexpect.fdspawn: + """ + Creates a pty pair and spawns virtctl via subprocess, returning an fdspawn + wrapping the master end. - def console_eof_sampler(self, func, command, timeout): + Uses pty.openpty() + subprocess.Popen instead of pexpect.spawn to avoid + os.forkpty(), which is deprecated in multi-threaded processes (Python 3.12+). + """ + self._terminate_proc() + master_fd, slave_fd = pty.openpty() + proc: subprocess.Popen[bytes] | None = None + try: + proc = subprocess.Popen( + shlex.split(self.cmd), + stdin=slave_fd, + stdout=slave_fd, + stderr=slave_fd, + start_new_session=True, + ) + child = pexpect.fdpexpect.fdspawn(fd=master_fd, encoding="utf-8", timeout=self.timeout) + except OSError, ValueError, pexpect.exceptions.ExceptionPexpect: + if proc is not None: + proc.terminate() + try: + proc.wait(timeout=TIMEOUT_10SEC) + except subprocess.TimeoutExpired: + proc.kill() + proc.wait(timeout=TIMEOUT_10SEC) + os.close(master_fd) + raise + finally: + os.close(slave_fd) + + self._proc = proc + return child + + def _terminate_proc(self) -> None: + if self._proc is not None: + try: + if self._proc.poll() is None: + self._proc.terminate() + self._proc.wait(timeout=TIMEOUT_10SEC) + except subprocess.TimeoutExpired: + LOGGER.warning(f"Force killing unresponsive console process for {self.vm.name}") + self._proc.kill() + self._proc.wait(timeout=TIMEOUT_10SEC) + finally: + self._proc = None + + def console_eof_sampler(self) -> None: sampler = TimeoutSampler( wait_timeout=TIMEOUT_5MIN, sleep=5, - func=func, + func=self._spawn_console, exceptions_dict={pexpect.exceptions.EOF: []}, - command=command, - timeout=timeout, - encoding="utf-8", ) for sample in sampler: if sample: diff --git a/utilities/unittests/test_console.py b/utilities/unittests/test_console.py index ca8bae0396..bdc7dc9014 100644 --- a/utilities/unittests/test_console.py +++ b/utilities/unittests/test_console.py @@ -1,13 +1,19 @@ -# Generated using Claude cli - """Unit tests for console module""" import os +import subprocess from unittest.mock import MagicMock, mock_open, patch +import pexpect +import pytest from console import Console +def _single_attempt_sampler(func, func_args=(), **kwargs): + """Drop-in replacement for TimeoutSampler that runs func once with no retries.""" + return iter([func(*func_args)]) + + class TestConsole: """Test cases for Console class""" @@ -313,10 +319,9 @@ def test_console_disconnect_no_username(self, mock_get_dir): exit_calls = [call for call in console.child.send.call_args_list if "exit" in str(call)] assert len(exit_calls) == 0 - @patch("console.pexpect") @patch("console.get_data_collector_base_directory") - def test_console_disconnect_terminated_child(self, mock_get_dir, mock_pexpect): - """Test disconnect method when child is terminated""" + def test_console_disconnect_terminated_child(self, mock_get_dir): + """Test disconnect method when the console subprocess has exited""" mock_get_dir.return_value = "/tmp/data" mock_vm = MagicMock() mock_vm.name = "test-vm" @@ -327,19 +332,14 @@ def test_console_disconnect_terminated_child(self, mock_get_dir, mock_pexpect): console = Console(vm=mock_vm) console.child = MagicMock() - console.child.terminated = True + mock_proc = MagicMock() + mock_proc.poll.return_value = 0 + console._proc = mock_proc - # Mock the console_eof_sampler - console.console_eof_sampler = MagicMock() + with patch.object(console, "console_eof_sampler") as mock_eof_sampler: + console.disconnect() - console.disconnect() - - # Should call console_eof_sampler when child is terminated - console.console_eof_sampler.assert_called_once_with( - func=mock_pexpect.spawn, - command=console.cmd, - timeout=console.timeout, - ) + mock_eof_sampler.assert_called_once_with() @patch("console.TimeoutSampler") @patch("builtins.open", new_callable=mock_open) @@ -363,18 +363,12 @@ def test_console_eof_sampler_success(self, mock_get_dir, mock_file_open, mock_ti mock_sampler_instance.__iter__.return_value = [mock_sample] mock_timeout_sampler.return_value = mock_sampler_instance - mock_func = MagicMock() - command = "test-command" - timeout = 30 - - console.console_eof_sampler(func=mock_func, command=command, timeout=timeout) + console.console_eof_sampler() - # Should create TimeoutSampler with correct parameters + # Should create TimeoutSampler with _spawn_console as the retry function mock_timeout_sampler.assert_called_once() call_args = mock_timeout_sampler.call_args - assert call_args[1]["func"] == mock_func - assert call_args[1]["command"] == command - assert call_args[1]["timeout"] == timeout + assert call_args[1]["func"] == console._spawn_console # Should set child and logfile assert console.child == mock_sample @@ -400,11 +394,276 @@ def test_console_eof_sampler_no_sample(self, mock_get_dir, mock_timeout_sampler) mock_sampler_instance.__iter__.return_value = [None] mock_timeout_sampler.return_value = mock_sampler_instance - mock_func = MagicMock() - command = "test-command" - timeout = 30 - - console.console_eof_sampler(func=mock_func, command=command, timeout=timeout) + console.console_eof_sampler() # Should not change child when no valid sample is found assert console.child == original_child + + @patch("console.os.close") + @patch("console.pexpect.fdpexpect.fdspawn") + @patch("console.subprocess.Popen") + @patch("console.pty.openpty") + @patch("console.get_data_collector_base_directory") + def test_spawn_console_success( + self, + mock_get_dir, + mock_openpty, + mock_popen, + mock_fdspawn, + mock_os_close, + ): + """Test _spawn_console creates fdspawn and stores subprocess handle""" + mock_get_dir.return_value = "/tmp/data" + mock_vm = MagicMock() + mock_vm.name = "test-vm" + mock_vm.namespace = None + + mock_openpty.return_value = (10, 11) + mock_proc = MagicMock() + mock_popen.return_value = mock_proc + mock_child = MagicMock() + mock_fdspawn.return_value = mock_child + + console = Console(vm=mock_vm) + result = console._spawn_console() + + assert result == mock_child + assert console._proc == mock_proc + mock_popen.assert_called_once() + mock_fdspawn.assert_called_once_with(fd=10, encoding="utf-8", timeout=30) + mock_os_close.assert_called_once_with(11) + + @patch("console.os.close") + @patch("console.subprocess.Popen") + @patch("console.pty.openpty") + @patch("console.get_data_collector_base_directory") + def test_spawn_console_popen_failure(self, mock_get_dir, mock_openpty, mock_popen, mock_os_close): + """Test _spawn_console closes pty fds when subprocess spawn fails""" + mock_get_dir.return_value = "/tmp/data" + mock_vm = MagicMock() + mock_vm.name = "test-vm" + mock_vm.namespace = None + + mock_openpty.return_value = (10, 11) + mock_popen.side_effect = OSError("spawn failed") + + console = Console(vm=mock_vm) + with pytest.raises(OSError, match="spawn failed"): + console._spawn_console() + + mock_os_close.assert_any_call(10) + mock_os_close.assert_any_call(11) + assert console._proc is None + + @patch("console.os.close") + @patch("console.pexpect.fdpexpect.fdspawn") + @patch("console.subprocess.Popen") + @patch("console.pty.openpty") + @patch("console.get_data_collector_base_directory") + def test_spawn_console_fdspawn_failure( + self, + mock_get_dir, + mock_openpty, + mock_popen, + mock_fdspawn, + mock_os_close, + ): + """Test _spawn_console terminates subprocess when fdspawn fails""" + mock_get_dir.return_value = "/tmp/data" + mock_vm = MagicMock() + mock_vm.name = "test-vm" + mock_vm.namespace = None + + mock_openpty.return_value = (10, 11) + mock_proc = MagicMock() + mock_popen.return_value = mock_proc + mock_fdspawn.side_effect = pexpect.exceptions.ExceptionPexpect("fdspawn failed") + + console = Console(vm=mock_vm) + with pytest.raises(pexpect.exceptions.ExceptionPexpect, match="fdspawn failed"): + console._spawn_console() + + mock_proc.terminate.assert_called_once() + mock_proc.wait.assert_called_once() + mock_os_close.assert_any_call(10) + mock_os_close.assert_any_call(11) + assert console._proc is None + + @patch("console.os.close") + @patch("console.pexpect.fdpexpect.fdspawn") + @patch("console.subprocess.Popen") + @patch("console.pty.openpty") + @patch("console.get_data_collector_base_directory") + def test_spawn_console_fdspawn_failure_force_kill( + self, + mock_get_dir, + mock_openpty, + mock_popen, + mock_fdspawn, + mock_os_close, + ): + """Test _spawn_console force-kills subprocess when terminate times out""" + mock_get_dir.return_value = "/tmp/data" + mock_vm = MagicMock() + mock_vm.name = "test-vm" + mock_vm.namespace = None + + mock_openpty.return_value = (10, 11) + mock_proc = MagicMock() + mock_popen.return_value = mock_proc + mock_proc.wait.side_effect = [subprocess.TimeoutExpired(cmd="virtctl", timeout=10), None] + mock_fdspawn.side_effect = pexpect.exceptions.ExceptionPexpect("fdspawn failed") + + console = Console(vm=mock_vm) + with pytest.raises(pexpect.exceptions.ExceptionPexpect, match="fdspawn failed"): + console._spawn_console() + + mock_proc.terminate.assert_called_once() + mock_proc.kill.assert_called_once() + assert mock_proc.wait.call_count == 2 + mock_os_close.assert_any_call(10) # master_fd closed in except block + mock_os_close.assert_any_call(11) # slave_fd closed in finally block + + @patch("console.get_data_collector_base_directory") + def test_terminate_proc_running_process(self, mock_get_dir): + """Test _terminate_proc gracefully terminates a running subprocess""" + mock_get_dir.return_value = "/tmp/data" + mock_vm = MagicMock() + mock_vm.name = "test-vm" + mock_vm.namespace = None + + mock_proc = MagicMock() + mock_proc.poll.return_value = None + + console = Console(vm=mock_vm) + console._proc = mock_proc + console._terminate_proc() + + mock_proc.terminate.assert_called_once() + mock_proc.wait.assert_called_once_with(timeout=10) + assert console._proc is None + + @patch("console.get_data_collector_base_directory") + def test_terminate_proc_force_kill(self, mock_get_dir): + """Test _terminate_proc force-kills subprocess when terminate times out""" + mock_get_dir.return_value = "/tmp/data" + mock_vm = MagicMock() + mock_vm.name = "test-vm" + mock_vm.namespace = None + + mock_proc = MagicMock() + mock_proc.poll.return_value = None + mock_proc.wait.side_effect = [subprocess.TimeoutExpired(cmd="virtctl", timeout=10), None] + + console = Console(vm=mock_vm) + console._proc = mock_proc + console._terminate_proc() + + mock_proc.terminate.assert_called_once() + mock_proc.kill.assert_called_once() + assert mock_proc.wait.call_count == 2 + assert console._proc is None + + @patch("console.get_data_collector_base_directory") + def test_terminate_proc_already_exited(self, mock_get_dir): + """Test _terminate_proc waits without terminate when process already exited""" + mock_get_dir.return_value = "/tmp/data" + mock_vm = MagicMock() + mock_vm.name = "test-vm" + mock_vm.namespace = None + + mock_proc = MagicMock() + mock_proc.poll.return_value = 0 + + console = Console(vm=mock_vm) + console._proc = mock_proc + console._terminate_proc() + + mock_proc.terminate.assert_not_called() + mock_proc.wait.assert_called_once_with(timeout=10) + assert console._proc is None + + @patch("console.get_data_collector_base_directory") + def test_console_connect_failure_cleanup(self, mock_get_dir): + """Test connect cleans up child and subprocess on failure""" + mock_get_dir.return_value = "/tmp/data" + mock_vm = MagicMock() + mock_vm.name = "test-vm" + mock_vm.namespace = None + mock_vm.username = "user" + mock_vm.password = "pass" + mock_vm.login_params = {} + + console = Console(vm=mock_vm) + mock_child = MagicMock() + + with ( + patch("timeout_sampler.TimeoutSampler", _single_attempt_sampler), + patch.object(console, "console_eof_sampler") as mock_sampler, + patch.object(console, "_connect", side_effect=pexpect.exceptions.TIMEOUT("login prompt not seen")), + patch.object(console, "_terminate_proc") as mock_terminate, + ): + mock_sampler.side_effect = lambda: setattr(console, "child", mock_child) + with pytest.raises(pexpect.exceptions.TIMEOUT): + console.connect() + + mock_child.close.assert_called_once() + mock_terminate.assert_called_once() + + @patch("console.get_data_collector_base_directory") + def test_console_connect_failure_no_child(self, mock_get_dir): + """Test connect terminates subprocess when child was never set""" + mock_get_dir.return_value = "/tmp/data" + mock_vm = MagicMock() + mock_vm.name = "test-vm" + mock_vm.namespace = None + mock_vm.username = "user" + mock_vm.password = "pass" + mock_vm.login_params = {} + + console = Console(vm=mock_vm) + + with ( + patch("timeout_sampler.TimeoutSampler", _single_attempt_sampler), + patch.object(console, "console_eof_sampler"), + patch.object(console, "_connect", side_effect=pexpect.exceptions.TIMEOUT("login prompt not seen")), + patch.object(console, "_terminate_proc") as mock_terminate, + ): + with pytest.raises(pexpect.exceptions.TIMEOUT): + console.connect() + + mock_terminate.assert_called_once() + + @patch("console.os.close") + @patch("console.pexpect.fdpexpect.fdspawn") + @patch("console.subprocess.Popen") + @patch("console.pty.openpty") + @patch("console.get_data_collector_base_directory") + def test_spawn_console_terminates_previous_proc_on_retry( + self, + mock_get_dir, + mock_openpty, + mock_popen, + mock_fdspawn, + mock_os_close, + ): + """Test _spawn_console terminates any previous subprocess before spawning a new one""" + mock_get_dir.return_value = "/tmp/data" + mock_vm = MagicMock() + mock_vm.name = "test-vm" + mock_vm.namespace = None + + mock_openpty.return_value = (10, 11) + old_proc = MagicMock() + old_proc.poll.return_value = None + new_proc = MagicMock() + mock_popen.return_value = new_proc + mock_fdspawn.return_value = MagicMock() + + console = Console(vm=mock_vm) + console._proc = old_proc + + console._spawn_console() + + old_proc.terminate.assert_called_once() + assert console._proc == new_proc