From 0b2742ea0bd8db725c71c79dbd3828e1f7518ef4 Mon Sep 17 00:00:00 2001 From: Robert Baldyga Date: Thu, 19 Mar 2026 16:44:59 +0100 Subject: [PATCH 1/4] Introduce get_syslog_path() Signed-off-by: Robert Baldyga --- test_tools/os_tools.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/test_tools/os_tools.py b/test_tools/os_tools.py index e087448..cfa810c 100644 --- a/test_tools/os_tools.py +++ b/test_tools/os_tools.py @@ -49,6 +49,14 @@ class SystemManagerType(Enum): systemd = 1 +def get_syslog_path(): + for path in ["/var/log/messages", "/var/log/syslog"]: + result = TestRun.executor.run(f"test -f {path}") + if result.exit_code == 0: + return path + raise FileNotFoundError("Could not find syslog file") + + def get_distro(): output = TestRun.executor.run( "cat /etc/os-release | grep -e \"^ID=\" | awk -F= '{print$2}' | tr -d '\"'" From cd1f0b8dae43704c19a4610ba1858cf677992293 Mon Sep 17 00:00:00 2001 From: Robert Baldyga Date: Thu, 19 Mar 2026 16:45:42 +0100 Subject: [PATCH 2/4] scsi_debug: Handle syslog parsing properly Signed-off-by: Robert Baldyga --- test_tools/scsi_debug.py | 60 +++++++++++++++++++++++----------------- 1 file changed, 34 insertions(+), 26 deletions(-) diff --git a/test_tools/scsi_debug.py b/test_tools/scsi_debug.py index d5fea09..7f47bc3 100644 --- a/test_tools/scsi_debug.py +++ b/test_tools/scsi_debug.py @@ -1,69 +1,81 @@ # # Copyright(c) 2022 Intel Corporation +# Copyright(c) 2026 Unvertical # SPDX-License-Identifier: BSD-3-Clause # import re from core.test_run import TestRun - -syslog_path = "/var/log/messages" +from test_tools.os_tools import get_syslog_path class Logs: - last_read_line = 1 + last_read_line = 0 + syslog_path = None FLUSH = re.compile(r"scsi_debug:[\s\S]*cmd 35") FUA = re.compile(r"scsi_debug:[\s\S]*cmd 2a 08") + @staticmethod + def mark(): + """Set syslog position to current end so subsequent reads only see new entries.""" + if Logs.syslog_path is None: + Logs.syslog_path = get_syslog_path() + + line_count = TestRun.executor.run_expect_success( + f"wc -l < {Logs.syslog_path}" + ).stdout.strip() + Logs.last_read_line = int(line_count) + @staticmethod def check_syslog_for_signals(): - Logs.check_syslog_for_flush() - Logs.check_syslog_for_fua() + log_lines = Logs._read_syslog() + flush_count = Logs._count_logs(log_lines, Logs.FLUSH) + fua_count = Logs._count_logs(log_lines, Logs.FUA) + Logs._validate_logs_amount(fua_count, "FUA") + Logs._validate_logs_amount(flush_count, "FLUSH") @staticmethod def check_syslog_for_flush(): """Check syslog for FLUSH logs""" - log_lines = Logs._read_syslog(Logs.last_read_line) + log_lines = Logs._read_syslog() flush_logs_counter = Logs._count_logs(log_lines, Logs.FLUSH) - log_type = "FLUSH" - Logs._validate_logs_amount(flush_logs_counter, log_type) + Logs._validate_logs_amount(flush_logs_counter, "FLUSH") @staticmethod def check_syslog_for_fua(): """Check syslog for FUA logs""" - log_lines = Logs._read_syslog(Logs.last_read_line) + log_lines = Logs._read_syslog() fua_logs_counter = Logs._count_logs(log_lines, Logs.FUA) - log_type = "FUA" - Logs._validate_logs_amount(fua_logs_counter, log_type) + Logs._validate_logs_amount(fua_logs_counter, "FUA") @staticmethod - def _read_syslog(last_read_line: int): - """Read recent lines in syslog, mark last line and return read lines as list.""" + def _read_syslog(): + """Read syslog lines since last mark and advance the position.""" + if Logs.syslog_path is None: + Logs.syslog_path = get_syslog_path() + + start_line = Logs.last_read_line + 1 log_lines = TestRun.executor.run_expect_success( - f"tail -qn +{last_read_line} {syslog_path}" + f"tail -qn +{start_line} {Logs.syslog_path}" ).stdout.splitlines() - # mark last read line to continue next reading from here Logs.last_read_line += len(log_lines) - return log_lines @staticmethod def _count_logs(log_lines: list, expected_log): """Count specified log in list and return its amount.""" logs_counter = 0 - for line in log_lines: - is_log_in_line = expected_log.search(line) - if is_log_in_line is not None: + if expected_log.search(line) is not None: logs_counter += 1 - return logs_counter @staticmethod def _validate_logs_amount(logs_counter: int, log_type: str): - """Validate amount of logs and return""" + """Validate amount of logs.""" if logs_counter == 0: - if Logs._is_flush(log_type): + if log_type == "FLUSH": TestRun.LOGGER.error(f"{log_type} log not occured") else: TestRun.LOGGER.warning(f"{log_type} log not occured") @@ -71,7 +83,3 @@ def _validate_logs_amount(logs_counter: int, log_type: str): TestRun.LOGGER.warning(f"{log_type} log occured only once.") else: TestRun.LOGGER.info(f"{log_type} log occured {logs_counter} times.") - - @staticmethod - def _is_flush(log_type: str): - return log_type == "FLUSH" From 227ee2e89348529478025ee1c21f5763fea3a03c Mon Sep 17 00:00:00 2001 From: Robert Baldyga Date: Thu, 19 Mar 2026 18:08:44 +0100 Subject: [PATCH 3/4] scsi_debug: Move plugin into tool There is no reason for it to be a plugin. Make it a tool instead. Signed-off-by: Robert Baldyga --- internal_plugins/scsi_debug/__init__.py | 39 -------- storage_devices/device.py | 8 +- test_tools/scsi_debug.py | 125 ++++++++++++++---------- 3 files changed, 77 insertions(+), 95 deletions(-) delete mode 100644 internal_plugins/scsi_debug/__init__.py diff --git a/internal_plugins/scsi_debug/__init__.py b/internal_plugins/scsi_debug/__init__.py deleted file mode 100644 index f843b7c..0000000 --- a/internal_plugins/scsi_debug/__init__.py +++ /dev/null @@ -1,39 +0,0 @@ -# -# Copyright(c) 2020-2021 Intel Corporation -# SPDX-License-Identifier: BSD-3-Clause -# -from time import sleep - -from core.test_run_utils import TestRun -from storage_devices.device import Device -from connection.utils.output import CmdException -from test_tools.os_tools import load_kernel_module, is_kernel_module_loaded, unload_kernel_module - - -class ScsiDebug: - def __init__(self, params, config): - self.params = params - self.module_name = "scsi_debug" - - def pre_setup(self): - pass - - def post_setup(self): - self.reload() - - def reload(self): - self.teardown() - sleep(1) - load_output = load_kernel_module(self.module_name, self.params) - if load_output.exit_code != 0: - raise CmdException(f"Failed to load {self.module_name} module", load_output) - TestRun.LOGGER.info(f"{self.module_name} loaded successfully.") - sleep(10) - TestRun.scsi_debug_devices = Device.get_scsi_debug_devices() - - def teardown(self): - if is_kernel_module_loaded(self.module_name): - unload_kernel_module(self.module_name) - - -plugin_class = ScsiDebug diff --git a/storage_devices/device.py b/storage_devices/device.py index a56b0bb..453eb0c 100644 --- a/storage_devices/device.py +++ b/storage_devices/device.py @@ -1,6 +1,7 @@ # # Copyright(c) 2019-2022 Intel Corporation # Copyright(c) 2023-2024 Huawei Technologies Co., Ltd. +# Copyright(c) 2026 Unvertical # SPDX-License-Identifier: BSD-3-Clause # @@ -116,10 +117,3 @@ def __str__(self): def __repr__(self): return str(self) - - @staticmethod - def get_scsi_debug_devices(): - scsi_debug_devices = TestRun.executor.run_expect_success( - "lsscsi --scsi_id | grep scsi_debug").stdout - return [Device(f'/dev/disk/by-id/scsi-{device.split()[-1]}') - for device in scsi_debug_devices.splitlines()] diff --git a/test_tools/scsi_debug.py b/test_tools/scsi_debug.py index 7f47bc3..6ddbe5c 100644 --- a/test_tools/scsi_debug.py +++ b/test_tools/scsi_debug.py @@ -1,85 +1,112 @@ # -# Copyright(c) 2022 Intel Corporation +# Copyright(c) 2020-2022 Intel Corporation # Copyright(c) 2026 Unvertical # SPDX-License-Identifier: BSD-3-Clause # import re +from time import sleep +from connection.utils.output import CmdException from core.test_run import TestRun -from test_tools.os_tools import get_syslog_path +from storage_devices.device import Device +from test_tools.os_tools import ( + get_syslog_path, + is_kernel_module_loaded, + load_kernel_module, + unload_kernel_module, +) +MODULE_NAME = "scsi_debug" -class Logs: - last_read_line = 0 - syslog_path = None - FLUSH = re.compile(r"scsi_debug:[\s\S]*cmd 35") - FUA = re.compile(r"scsi_debug:[\s\S]*cmd 2a 08") +FLUSH = re.compile(r"scsi_debug:[\s\S]*cmd 35") +FUA = re.compile(r"scsi_debug:[\s\S]*cmd 2a 08") + + +class ScsiDebug: + def __init__(self, params): + self.params = params + self.syslog_path = None + self.last_read_line = 0 + self.reload() + + def reload(self): + self.unload() + sleep(1) + load_output = load_kernel_module(MODULE_NAME, self.params) + if load_output.exit_code != 0: + raise CmdException(f"Failed to load {MODULE_NAME} module", load_output) + TestRun.LOGGER.info(f"{MODULE_NAME} loaded successfully.") + sleep(10) @staticmethod - def mark(): + def unload(): + if is_kernel_module_loaded(MODULE_NAME): + unload_kernel_module(MODULE_NAME) + + def get_devices(self): + scsi_debug_devices = TestRun.executor.run_expect_success( + "lsscsi --scsi_id | grep scsi_debug").stdout + return [Device(f'/dev/disk/by-id/scsi-{device.split()[-1]}') + for device in scsi_debug_devices.splitlines()] + + def mark(self): """Set syslog position to current end so subsequent reads only see new entries.""" - if Logs.syslog_path is None: - Logs.syslog_path = get_syslog_path() + if self.syslog_path is None: + self.syslog_path = get_syslog_path() line_count = TestRun.executor.run_expect_success( - f"wc -l < {Logs.syslog_path}" + f"wc -l < {self.syslog_path}" ).stdout.strip() - Logs.last_read_line = int(line_count) + self.last_read_line = int(line_count) - @staticmethod - def check_syslog_for_signals(): - log_lines = Logs._read_syslog() - flush_count = Logs._count_logs(log_lines, Logs.FLUSH) - fua_count = Logs._count_logs(log_lines, Logs.FUA) - Logs._validate_logs_amount(fua_count, "FUA") - Logs._validate_logs_amount(flush_count, "FLUSH") + def check_for_signals(self): + log_lines = self._read_syslog() + flush_count, fua_count = self._count_logs(log_lines) + self._validate_logs_amount(fua_count, "FUA") + self._validate_logs_amount(flush_count, "FLUSH") - @staticmethod - def check_syslog_for_flush(): - """Check syslog for FLUSH logs""" - log_lines = Logs._read_syslog() - flush_logs_counter = Logs._count_logs(log_lines, Logs.FLUSH) - Logs._validate_logs_amount(flush_logs_counter, "FLUSH") + def check_for_flush(self): + log_lines = self._read_syslog() + flush_count, _ = self._count_logs(log_lines) + self._validate_logs_amount(flush_count, "FLUSH") - @staticmethod - def check_syslog_for_fua(): - """Check syslog for FUA logs""" - log_lines = Logs._read_syslog() - fua_logs_counter = Logs._count_logs(log_lines, Logs.FUA) - Logs._validate_logs_amount(fua_logs_counter, "FUA") + def check_for_fua(self): + log_lines = self._read_syslog() + _, fua_count = self._count_logs(log_lines) + self._validate_logs_amount(fua_count, "FUA") - @staticmethod - def _read_syslog(): + def _read_syslog(self): """Read syslog lines since last mark and advance the position.""" - if Logs.syslog_path is None: - Logs.syslog_path = get_syslog_path() + if self.syslog_path is None: + self.syslog_path = get_syslog_path() - start_line = Logs.last_read_line + 1 + start_line = self.last_read_line + 1 log_lines = TestRun.executor.run_expect_success( - f"tail -qn +{start_line} {Logs.syslog_path}" + f"tail -qn +{start_line} {self.syslog_path}" ).stdout.splitlines() - Logs.last_read_line += len(log_lines) + self.last_read_line += len(log_lines) return log_lines @staticmethod - def _count_logs(log_lines: list, expected_log): - """Count specified log in list and return its amount.""" - logs_counter = 0 + def _count_logs(log_lines): + flush_count = 0 + fua_count = 0 for line in log_lines: - if expected_log.search(line) is not None: - logs_counter += 1 - return logs_counter + if FLUSH.search(line): + flush_count += 1 + if FUA.search(line): + fua_count += 1 + return flush_count, fua_count @staticmethod - def _validate_logs_amount(logs_counter: int, log_type: str): - """Validate amount of logs.""" - if logs_counter == 0: + def _validate_logs_amount(count, log_type): + if count == 0: if log_type == "FLUSH": TestRun.LOGGER.error(f"{log_type} log not occured") else: TestRun.LOGGER.warning(f"{log_type} log not occured") - elif logs_counter == 1: + elif count == 1: TestRun.LOGGER.warning(f"{log_type} log occured only once.") else: - TestRun.LOGGER.info(f"{log_type} log occured {logs_counter} times.") + TestRun.LOGGER.info(f"{log_type} log occured {count} times.") From d9970662e8b72dab199eee36da479c612e3cf77f Mon Sep 17 00:00:00 2001 From: Robert Baldyga Date: Thu, 19 Mar 2026 20:52:20 +0100 Subject: [PATCH 4/4] scsi_debug: Decouple test logic from the tool Signed-off-by: Robert Baldyga --- test_tools/scsi_debug.py | 28 +++++----------------------- 1 file changed, 5 insertions(+), 23 deletions(-) diff --git a/test_tools/scsi_debug.py b/test_tools/scsi_debug.py index 6ddbe5c..205ff4f 100644 --- a/test_tools/scsi_debug.py +++ b/test_tools/scsi_debug.py @@ -50,7 +50,7 @@ def get_devices(self): return [Device(f'/dev/disk/by-id/scsi-{device.split()[-1]}') for device in scsi_debug_devices.splitlines()] - def mark(self): + def reset_stats(self): """Set syslog position to current end so subsequent reads only see new entries.""" if self.syslog_path is None: self.syslog_path = get_syslog_path() @@ -60,21 +60,15 @@ def mark(self): ).stdout.strip() self.last_read_line = int(line_count) - def check_for_signals(self): - log_lines = self._read_syslog() - flush_count, fua_count = self._count_logs(log_lines) - self._validate_logs_amount(fua_count, "FUA") - self._validate_logs_amount(flush_count, "FLUSH") - - def check_for_flush(self): + def get_flush_count(self): log_lines = self._read_syslog() flush_count, _ = self._count_logs(log_lines) - self._validate_logs_amount(flush_count, "FLUSH") + return flush_count - def check_for_fua(self): + def get_fua_count(self): log_lines = self._read_syslog() _, fua_count = self._count_logs(log_lines) - self._validate_logs_amount(fua_count, "FUA") + return fua_count def _read_syslog(self): """Read syslog lines since last mark and advance the position.""" @@ -98,15 +92,3 @@ def _count_logs(log_lines): if FUA.search(line): fua_count += 1 return flush_count, fua_count - - @staticmethod - def _validate_logs_amount(count, log_type): - if count == 0: - if log_type == "FLUSH": - TestRun.LOGGER.error(f"{log_type} log not occured") - else: - TestRun.LOGGER.warning(f"{log_type} log not occured") - elif count == 1: - TestRun.LOGGER.warning(f"{log_type} log occured only once.") - else: - TestRun.LOGGER.info(f"{log_type} log occured {count} times.")