From 5e50e5d1d1e8941b6c1f15c8e4ed9d14771bdff1 Mon Sep 17 00:00:00 2001 From: junkmd Date: Tue, 17 Feb 2026 22:51:26 +0900 Subject: [PATCH 1/4] fix: Update `test_logutil.py` to prevent CI log clutter and improve isolation. - Use direct `logging.Logger()` instantiation in `test_emit`. - Bypasses global registration to ensure test isolation. - Disable logger propagation for test `Logger` instances. - Sets `logger.propagate = False` to prevent interference from other handlers. --- comtypes/test/test_logutil.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/comtypes/test/test_logutil.py b/comtypes/test/test_logutil.py index 04f3bd32..73b7250b 100644 --- a/comtypes/test/test_logutil.py +++ b/comtypes/test/test_logutil.py @@ -226,8 +226,15 @@ class Test_NTDebugHandler(ut.TestCase): def test_emit(self): ready = threading.Event() handler = NTDebugHandler() - logger = logging.getLogger("test_ntdebug_handler") + # Direct `Logger()` instantiation for test isolation: bypasses global + # registration and prevents any side effects / cross-test pollution. + # (The official 'Loggers should NEVER be instantiated directly' rule + # targets production code where hierarchy and propagation matter; + # here we want neither.) + # https://docs.python.org/3/library/logging.html#logger-objects + logger = logging.Logger("test_ntdebug_handler") # Clear existing handlers to prevent interference from other tests + logger.propagate = False logger.handlers = [] logger.addHandler(handler) logger.setLevel(logging.INFO) From f5b16463b4ff67773fbb5e1d45724bb6ee6e6dc1 Mon Sep 17 00:00:00 2001 From: junkmd Date: Tue, 17 Feb 2026 22:51:26 +0900 Subject: [PATCH 2/4] refactor: Extract debug string listener into a separate function in `test_logutil.py`. Improves modularity and readability of the debug string capturing logic. --- comtypes/test/test_logutil.py | 46 +++++++++++++++++++---------------- 1 file changed, 25 insertions(+), 21 deletions(-) diff --git a/comtypes/test/test_logutil.py b/comtypes/test/test_logutil.py index 73b7250b..6eee109f 100644 --- a/comtypes/test/test_logutil.py +++ b/comtypes/test/test_logutil.py @@ -172,6 +172,29 @@ def open_dbwin_debug_channels() -> Iterator[tuple[int, int, int]]: yield (h_buffer_ready, h_data_ready, p_view) +def _listen_on_dbwin_channel( + interval_ms: int, + messages: Queue, + ready: threading.Event, + stop: threading.Event, + pid: int, +) -> None: + # Create/open named events and file mapping for interprocess communication. + # These objects are part of the Windows Debugging API contract. + with open_dbwin_debug_channels() as (h_buffer_ready, h_data_ready, p_view): + ready.set() # Signal to the main thread that listener is ready. + while not stop.is_set(): # Loop until the main thread signals to finish. + _SetEvent(h_buffer_ready) # Signal readiness to `OutputDebugString`. + # Wait for `OutputDebugString` to signal that data is ready. + if _WaitForSingleObject(h_data_ready, interval_ms) == WAIT_OBJECT_0: + # Debug string buffer format: [4 bytes: PID][N bytes: string]. + # Check if the process ID in the buffer matches the current PID. + if ctypes.cast(p_view, POINTER(DWORD)).contents.value == pid: + # Extract the null-terminated string, skipping the PID, + # and put it into the queue. + messages.put(ctypes.string_at(p_view + 4).strip(b"\x00")) + + @contextlib.contextmanager def capture_debug_strings(ready: threading.Event, *, interval: int) -> Iterator[Queue]: """Context manager to capture debug strings emitted via `OutputDebugString`. @@ -179,28 +202,9 @@ def capture_debug_strings(ready: threading.Event, *, interval: int) -> Iterator[ """ captured = Queue() finished = threading.Event() - - def _listener( - q: Queue, rdy: threading.Event, fin: threading.Event, pid: int - ) -> None: - # Create/open named events and file mapping for interprocess communication. - # These objects are part of the Windows Debugging API contract. - with open_dbwin_debug_channels() as (h_buffer_ready, h_data_ready, p_view): - rdy.set() # Signal to the main thread that listener is ready. - while not fin.is_set(): # Loop until the main thread signals to finish. - _SetEvent(h_buffer_ready) # Signal readiness to `OutputDebugString`. - # Wait for `OutputDebugString` to signal that data is ready. - if _WaitForSingleObject(h_data_ready, interval) == WAIT_OBJECT_0: - # Debug string buffer format: [4 bytes: PID][N bytes: string]. - # Check if the process ID in the buffer matches the current PID. - if ctypes.cast(p_view, POINTER(DWORD)).contents.value == pid: - # Extract the null-terminated string, skipping the PID, - # and put it into the queue. - q.put(ctypes.string_at(p_view + 4).strip(b"\x00")) - th = threading.Thread( - target=_listener, - args=(captured, ready, finished, _GetCurrentProcessId()), + target=_listen_on_dbwin_channel, + args=(interval, captured, ready, finished, _GetCurrentProcessId()), daemon=True, ) th.start() From 9f7fb33d40d0f52ede74ef39d71761142d63a397 Mon Sep 17 00:00:00 2001 From: junkmd Date: Tue, 17 Feb 2026 22:51:26 +0900 Subject: [PATCH 3/4] refactor: Streamline debug string capture API in `test_logutil.py`. --- comtypes/test/test_logutil.py | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/comtypes/test/test_logutil.py b/comtypes/test/test_logutil.py index 6eee109f..8556971d 100644 --- a/comtypes/test/test_logutil.py +++ b/comtypes/test/test_logutil.py @@ -196,10 +196,7 @@ def _listen_on_dbwin_channel( @contextlib.contextmanager -def capture_debug_strings(ready: threading.Event, *, interval: int) -> Iterator[Queue]: - """Context manager to capture debug strings emitted via `OutputDebugString`. - Spawns a listener thread to monitor the debug channels. - """ +def _run_dbwin_listener(ready: threading.Event, interval: int) -> Iterator[Queue]: captured = Queue() finished = threading.Event() th = threading.Thread( @@ -215,11 +212,22 @@ def capture_debug_strings(ready: threading.Event, *, interval: int) -> Iterator[ th.join() +@contextlib.contextmanager +def capture_debug_strings(*, timeout: float, interval: float) -> Iterator[Queue]: + """Context manager to capture debug strings emitted via `OutputDebugString`. + Spawns a listener thread to monitor the debug channels. + + Parameters are floats in seconds. + """ + ready = threading.Event() + with _run_dbwin_listener(ready, int(interval * 1000)) as messages: + ready.wait(timeout=timeout) # Wait for the listener to be ready + yield messages + + class Test_OutputDebugStringW(ut.TestCase): def test(self): - ready = threading.Event() - with capture_debug_strings(ready, interval=100) as cap: - ready.wait(timeout=5) # Wait for the listener to be ready + with capture_debug_strings(timeout=5, interval=0.1) as cap: OutputDebugStringW("hello world") OutputDebugStringW("test message") self.assertEqual(cap.get(), b"hello world") @@ -228,7 +236,6 @@ def test(self): class Test_NTDebugHandler(ut.TestCase): def test_emit(self): - ready = threading.Event() handler = NTDebugHandler() # Direct `Logger()` instantiation for test isolation: bypasses global # registration and prevents any side effects / cross-test pollution. @@ -242,8 +249,7 @@ def test_emit(self): logger.handlers = [] logger.addHandler(handler) logger.setLevel(logging.INFO) - with capture_debug_strings(ready, interval=100) as cap: - ready.wait(timeout=5) # Wait for the listener to be ready + with capture_debug_strings(timeout=5, interval=0.1) as cap: msg = "This is a test message from NTDebugHandler." logger.info(msg) logger.removeHandler(handler) From 20baa783d8b87407de24c5038fedee6221619674 Mon Sep 17 00:00:00 2001 From: junkmd Date: Tue, 17 Feb 2026 22:51:26 +0900 Subject: [PATCH 4/4] refactor: Improve clarity of variables in `_run_dbwin_listener` in `test_logutil.py`. --- comtypes/test/test_logutil.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/comtypes/test/test_logutil.py b/comtypes/test/test_logutil.py index 8556971d..4d116daf 100644 --- a/comtypes/test/test_logutil.py +++ b/comtypes/test/test_logutil.py @@ -196,19 +196,19 @@ def _listen_on_dbwin_channel( @contextlib.contextmanager -def _run_dbwin_listener(ready: threading.Event, interval: int) -> Iterator[Queue]: - captured = Queue() - finished = threading.Event() +def _run_dbwin_listener(ready: threading.Event, interval_ms: int) -> Iterator[Queue]: + messages = Queue() + stop = threading.Event() th = threading.Thread( target=_listen_on_dbwin_channel, - args=(interval, captured, ready, finished, _GetCurrentProcessId()), + args=(interval_ms, messages, ready, stop, _GetCurrentProcessId()), daemon=True, ) th.start() try: - yield captured + yield messages finally: - finished.set() + stop.set() th.join()