-
-
Notifications
You must be signed in to change notification settings - Fork 182
Expand file tree
/
Copy pathwrapper_windows.py
More file actions
282 lines (222 loc) · 9.5 KB
/
wrapper_windows.py
File metadata and controls
282 lines (222 loc) · 9.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
"""Windows agent injection — uses Win32 WriteConsoleInput to type into the agent CLI.
Called by wrapper.py on Windows. Not imported on other platforms.
"""
import ctypes
from ctypes import wintypes
import subprocess
import sys
import time
if sys.platform != "win32":
raise ImportError("wrapper_windows only works on Windows")
kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
user32 = ctypes.WinDLL("user32", use_last_error=True)
STD_INPUT_HANDLE = -10
KEY_EVENT = 0x0001
VK_RETURN = 0x0D
# Window message constants used by the wm_setfocus Enter backend.
WM_SETFOCUS = 0x0007
WM_ACTIVATE = 0x0006
WA_ACTIVE = 1
class _CHAR_UNION(ctypes.Union):
_fields_ = [("UnicodeChar", wintypes.WCHAR), ("AsciiChar", wintypes.CHAR)]
class _KEY_EVENT_RECORD(ctypes.Structure):
_fields_ = [
("bKeyDown", wintypes.BOOL),
("wRepeatCount", wintypes.WORD),
("wVirtualKeyCode", wintypes.WORD),
("wVirtualScanCode", wintypes.WORD),
("uChar", _CHAR_UNION),
("dwControlKeyState", wintypes.DWORD),
]
class _EVENT_UNION(ctypes.Union):
_fields_ = [("KeyEvent", _KEY_EVENT_RECORD)]
class _INPUT_RECORD(ctypes.Structure):
_fields_ = [("EventType", wintypes.WORD), ("Event", _EVENT_UNION)]
def _write_key(handle, char: str, key_down: bool, vk: int = 0, scan: int = 0):
rec = _INPUT_RECORD()
rec.EventType = KEY_EVENT
evt = rec.Event.KeyEvent
evt.bKeyDown = key_down
evt.wRepeatCount = 1
evt.uChar.UnicodeChar = char
evt.wVirtualKeyCode = vk
evt.wVirtualScanCode = scan
written = wintypes.DWORD(0)
kernel32.WriteConsoleInputW(handle, ctypes.byref(rec), 1, ctypes.byref(written))
def _send_wm_setfocus():
"""Tell the console window it just received focus — some Node TUIs
(GitHub Copilot CLI) gate Enter processing on focus state, so this
makes them accept injected Enter without an actual focus change."""
hwnd = kernel32.GetConsoleWindow()
if not hwnd:
return
user32.SendMessageW(hwnd, WM_SETFOCUS, 0, 0)
user32.SendMessageW(hwnd, WM_ACTIVATE, WA_ACTIVE, 0)
def inject(text: str, *, delay: float = 0.3, enter_backend: str = "console_input"):
"""Inject text + Enter into the current console via WriteConsoleInput.
Uses batch WriteConsoleInputW for the text (all records in one call)
then a separate Enter keystroke after a scaled delay.
`enter_backend` controls how the final Enter is delivered:
- "console_input" (default): standard WriteConsoleInput + VK_RETURN.
Works for Claude/Codex/Gemini/Kimi/Qwen/Kilo/etc.
- "wm_setfocus": fake-focus message (WM_SETFOCUS + WM_ACTIVATE) to
the console window before sending VK_RETURN. Needed for GitHub
Copilot CLI, whose Ink-based input layer ignores Enter events
when the console window is unfocused.
"""
handle = kernel32.GetStdHandle(STD_INPUT_HANDLE)
# Build all key events at once (key down + key up per character)
n_events = len(text) * 2
if n_events > 0:
records = (_INPUT_RECORD * n_events)()
idx = 0
for ch in text:
for key_down in (True, False):
rec = records[idx]
rec.EventType = KEY_EVENT
evt = rec.Event.KeyEvent
evt.bKeyDown = key_down
evt.wRepeatCount = 1
evt.uChar.UnicodeChar = ch
evt.wVirtualKeyCode = 0
evt.wVirtualScanCode = 0
idx += 1
written = wintypes.DWORD(0)
kernel32.WriteConsoleInputW(handle, records, n_events, ctypes.byref(written))
# Scale delay with text length so longer prompts get more processing time
scaled_delay = max(delay, len(text) * 0.001)
time.sleep(scaled_delay)
if enter_backend == "wm_setfocus":
_send_wm_setfocus()
# Tiny pause for the window to process the focus message
time.sleep(0.05)
_write_key(handle, "\r", True, vk=VK_RETURN, scan=0x1C)
_write_key(handle, "\r", False, vk=VK_RETURN, scan=0x1C)
# ---------------------------------------------------------------------------
# Activity detection — console screen buffer hashing
# ---------------------------------------------------------------------------
STD_OUTPUT_HANDLE = -11
class _COORD(ctypes.Structure):
_fields_ = [("X", wintypes.SHORT), ("Y", wintypes.SHORT)]
class _SMALL_RECT(ctypes.Structure):
_fields_ = [
("Left", wintypes.SHORT),
("Top", wintypes.SHORT),
("Right", wintypes.SHORT),
("Bottom", wintypes.SHORT),
]
class _CONSOLE_SCREEN_BUFFER_INFO(ctypes.Structure):
_fields_ = [
("dwSize", _COORD),
("dwCursorPosition", _COORD),
("wAttributes", wintypes.WORD),
("srWindow", _SMALL_RECT),
("dwMaximumWindowSize", _COORD),
]
class _CHAR_INFO(ctypes.Structure):
_fields_ = [("Char", _CHAR_UNION), ("Attributes", wintypes.WORD)]
kernel32.GetConsoleScreenBufferInfo.argtypes = [
wintypes.HANDLE,
ctypes.POINTER(_CONSOLE_SCREEN_BUFFER_INFO),
]
kernel32.GetConsoleScreenBufferInfo.restype = wintypes.BOOL
kernel32.ReadConsoleOutputW.argtypes = [
wintypes.HANDLE,
ctypes.POINTER(_CHAR_INFO),
_COORD,
_COORD,
ctypes.POINTER(_SMALL_RECT),
]
kernel32.ReadConsoleOutputW.restype = wintypes.BOOL
def get_activity_checker(pid_holder, agent_name="unknown", trigger_flag=None):
"""Return a callable that detects agent activity by diffing visible characters.
Counts how many visible characters changed since last poll. Filters out
invisible buffer noise (ConPTY artifacts, cursor jitter, timer ticks) by
requiring a minimum number of changed cells. Uses hysteresis: goes active
immediately on significant change, requires sustained quiet to go idle.
trigger_flag: shared [bool] list — set to [True] by queue watcher when a
message is injected. Forces active state immediately (covers thinking phase).
pid_holder: not used for screen hashing, but kept for signature compatibility.
"""
import array as _array
import os as _os
last_chars = [None] # previous poll's character bytes
handle = kernel32.GetStdHandle(STD_OUTPUT_HANDLE)
MIN_CHANGED_CELLS = 10 # idle noise is 2-5 cells; real work is 50+
IDLE_COOLDOWN = 5 # need 5 consecutive idle polls (5s) before going idle
_consecutive_idle = [0]
_is_active = [False]
def check():
# External trigger: queue watcher injected a message → force active
triggered = False
if trigger_flag is not None and trigger_flag[0]:
trigger_flag[0] = False
triggered = True
_consecutive_idle[0] = 0
_is_active[0] = True
# Get buffer dimensions
csbi = _CONSOLE_SCREEN_BUFFER_INFO()
if not kernel32.GetConsoleScreenBufferInfo(handle, ctypes.byref(csbi)):
return _is_active[0]
rect = csbi.srWindow
width = rect.Right - rect.Left + 1
height = rect.Bottom - rect.Top + 1
if width <= 0 or height <= 0:
return _is_active[0]
# Read visible window
buffer_size = _COORD(width, height)
buffer_coord = _COORD(0, 0)
read_rect = _SMALL_RECT(rect.Left, rect.Top, rect.Right, rect.Bottom)
char_info_array = (_CHAR_INFO * (width * height))()
ok = kernel32.ReadConsoleOutputW(
handle, char_info_array, buffer_size, buffer_coord,
ctypes.byref(read_rect),
)
if not ok:
return _is_active[0]
# Extract visible characters only (skip attributes)
raw = bytes(char_info_array)
shorts = _array.array("H")
shorts.frombytes(raw)
char_data = shorts[::2].tobytes()
# Count how many characters actually changed
prev = last_chars[0]
n_changed = 0
if prev is not None and len(prev) == len(char_data):
if prev != char_data: # fast path: skip counting if identical
for i in range(0, len(prev), 2):
if prev[i:i+2] != char_data[i:i+2]:
n_changed += 1
significant = n_changed >= MIN_CHANGED_CELLS
last_chars[0] = char_data
# Hysteresis: active immediately on significant change or trigger,
# idle only after IDLE_COOLDOWN consecutive quiet polls
if significant or triggered:
_consecutive_idle[0] = 0
_is_active[0] = True
else:
_consecutive_idle[0] += 1
if _consecutive_idle[0] >= IDLE_COOLDOWN:
_is_active[0] = False
return _is_active[0]
return check
def run_agent(command, extra_args, cwd, env, queue_file, agent, no_restart, start_watcher, strip_env=None, pid_holder=None, session_name=None, inject_env=None, inject_delay: float = 0.3, enter_backend: str = "console_input"):
"""Run agent as a direct subprocess, inject via Win32 console."""
if inject_env:
env = {**env, **inject_env}
start_watcher(lambda text: inject(text, delay=inject_delay, enter_backend=enter_backend))
while True:
try:
proc = subprocess.Popen([command] + extra_args, cwd=cwd, env=env)
if pid_holder is not None:
pid_holder[0] = proc.pid
proc.wait()
if pid_holder is not None:
pid_holder[0] = None
if no_restart:
break
print(f"\n {agent.capitalize()} exited (code {proc.returncode}).")
print(f" Restarting in 3s... (Ctrl+C to quit)")
time.sleep(3)
except KeyboardInterrupt:
break