diff --git a/README.md b/README.md index 98e440b..5703cf7 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,8 @@ AI adaptation guides: - English: [`docs/AI_ASSISTANT_DEPLOYMENT.md`](./docs/AI_ASSISTANT_DEPLOYMENT.md) - 中文:[`docs/AI_ASSISTANT_DEPLOYMENT.zh-CN.md`](./docs/AI_ASSISTANT_DEPLOYMENT.zh-CN.md) - AI debug runbook: [`docs/AI_DEBUG_RUNBOOK.md`](./docs/AI_DEBUG_RUNBOOK.md) +- IPC integration: [`docs/IPC.md`](./docs/IPC.md) +- IPC 集成说明: [`docs/IPC.zh-CN.md`](./docs/IPC.zh-CN.md) ## What This Project Does diff --git a/README.zh-CN.md b/README.zh-CN.md index 0d39305..cb414c6 100644 --- a/README.zh-CN.md +++ b/README.zh-CN.md @@ -8,6 +8,8 @@ AI 适配指南: - English: [`docs/AI_ASSISTANT_DEPLOYMENT.md`](./docs/AI_ASSISTANT_DEPLOYMENT.md) - 中文:[`docs/AI_ASSISTANT_DEPLOYMENT.zh-CN.md`](./docs/AI_ASSISTANT_DEPLOYMENT.zh-CN.md) - AI 调试 Runbook:[`docs/AI_DEBUG_RUNBOOK.md`](./docs/AI_DEBUG_RUNBOOK.md) +- IPC integration: [`docs/IPC.md`](./docs/IPC.md) +- IPC 集成说明:[`docs/IPC.zh-CN.md`](./docs/IPC.zh-CN.md) ## 这个项目解决什么问题 diff --git a/docs/IPC.md b/docs/IPC.md new file mode 100644 index 0000000..55b0e57 --- /dev/null +++ b/docs/IPC.md @@ -0,0 +1,129 @@ +# IPC Integration + +VibeMouse exposes two IPC paths: + +- Built-in `agent <-> listener` transport: `stdio + LPJSON` +- External control transport: stable local endpoint for command-only clients + +The message schema is defined in [`shared/schema/ipc.schema.json`](../shared/schema/ipc.schema.json). + +## External Command Endpoint + +External programs should connect to the stable local endpoint exposed by the agent: + +- Windows: `\\.\pipe\vibemouse` +- Linux/macOS: `${XDG_RUNTIME_DIR:-temp}/vibemouse.sock` + +The current endpoint is also written to `status.json` as `ipc_socket`. + +Notes: + +- External clients should send `command` messages only. +- External clients should not send `event` messages. +- The built-in listener path still uses `stdio`; it does not use the external endpoint. + +## Framing + +All IPC messages use LPJSON: + +1. 4-byte little-endian unsigned length prefix +2. UTF-8 JSON payload + +Example payload: + +```json +{"type":"command","command":"toggle_recording"} +``` + +## Message Types + +Two message shapes are supported: + +### Command Message + +```json +{"type":"command","command":"shutdown"} +``` + +### Event Message + +```json +{"type":"event","event":"mouse.side_front.press"} +``` + +`event` messages are intended for the listener-to-agent path. External integrations should generally use `command`. + +## Supported Commands + +| Command | Description | +|---|---| +| `noop` | No operation; event is ignored | +| `toggle_recording` | Start or stop voice recording | +| `trigger_secondary_action` | In idle: send Enter. In recording: stop and send transcript to OpenClaw | +| `submit_recording` | Stop recording and send transcript to OpenClaw | +| `send_enter` | Send Enter key to the focused input | +| `workspace_left` | Switch workspace left | +| `workspace_right` | Switch workspace right | +| `reload_config` | Reload `config.json` | +| `shutdown` | Gracefully shut down the agent | + +Canonical source: [`shared/protocol/COMMANDS.md`](../shared/protocol/COMMANDS.md) + +## Supported Events + +| Event | Description | +|---|---| +| `mouse.side_front.press` | Front side button press | +| `mouse.side_rear.press` | Rear side button press | +| `hotkey.record_toggle` | Recording toggle hotkey | +| `hotkey.recording_submit` | Recording submit hotkey | +| `gesture.up` | Upward gesture | +| `gesture.down` | Downward gesture | +| `gesture.left` | Leftward gesture | +| `gesture.right` | Rightward gesture | + +Canonical source: [`shared/protocol/EVENTS.md`](../shared/protocol/EVENTS.md) + +## Client Examples + +### Python: Unix Socket + +```python +import json +import socket +import struct + +endpoint = "/tmp/vibemouse.sock" +payload = {"type": "command", "command": "toggle_recording"} +body = json.dumps(payload).encode("utf-8") +frame = struct.pack(" listener`:`stdio + LPJSON` +- 外部控制通道:面向第三方程序的稳定本地 endpoint,仅接收命令 + +消息 schema 定义见 [`shared/schema/ipc.schema.json`](../shared/schema/ipc.schema.json)。 + +## 外部命令通道地址 + +外部程序集成时,应该连接 agent 暴露出来的稳定本地地址: + +- Windows: `\\.\pipe\vibemouse` +- Linux/macOS: `${XDG_RUNTIME_DIR:-temp}/vibemouse.sock` + +agent 也会把当前地址写入 `status.json` 的 `ipc_socket` 字段。 + +注意: + +- 外部客户端应只发送 `command` 消息 +- 外部客户端不应发送 `event` 消息 +- 内建 listener 链路仍然走 `stdio`,不会走这个外部 endpoint + +## 帧格式 + +所有 IPC 消息都使用 LPJSON: + +1. 前 4 字节:小端无符号整数,表示后续 JSON 长度 +2. 后续内容:UTF-8 JSON payload + +示例: + +```json +{"type":"command","command":"toggle_recording"} +``` + +## 消息类型 + +支持两种消息结构: + +### Command Message + +```json +{"type":"command","command":"shutdown"} +``` + +### Event Message + +```json +{"type":"event","event":"mouse.side_front.press"} +``` + +`event` 主要用于 listener -> agent。外部集成一般只需要发送 `command`。 + +## 当前支持的命令 + +| Command | 说明 | +|---|---| +| `noop` | 空操作 | +| `toggle_recording` | 开始或停止录音 | +| `trigger_secondary_action` | 空闲时发送 Enter,录音时停止并提交到 OpenClaw | +| `submit_recording` | 停止录音并提交到 OpenClaw | +| `send_enter` | 向当前焦点输入框发送 Enter | +| `workspace_left` | 切换到左侧工作区 | +| `workspace_right` | 切换到右侧工作区 | +| `reload_config` | 重新加载 `config.json` | +| `shutdown` | 优雅关闭 agent | + +规范来源:[`shared/protocol/COMMANDS.md`](../shared/protocol/COMMANDS.md) + +## 当前支持的事件 + +| Event | 说明 | +|---|---| +| `mouse.side_front.press` | 前侧键按下 | +| `mouse.side_rear.press` | 后侧键按下 | +| `hotkey.record_toggle` | 录音切换热键 | +| `hotkey.recording_submit` | 录音提交热键 | +| `gesture.up` | 上手势 | +| `gesture.down` | 下手势 | +| `gesture.left` | 左手势 | +| `gesture.right` | 右手势 | + +规范来源:[`shared/protocol/EVENTS.md`](../shared/protocol/EVENTS.md) + +## 客户端示例 + +### Python: Unix Socket + +```python +import json +import socket +import struct + +endpoint = "/tmp/vibemouse.sock" +payload = {"type": "command", "command": "toggle_recording"} +body = json.dumps(payload).encode("utf-8") +frame = struct.pack(" None: {"recording": False, "state": "idle", "listener_mode": "inline"}, ) - def test_set_recording_status_includes_ipc_port_when_command_server_is_running(self) -> None: + def test_set_recording_status_includes_ipc_socket_when_command_server_is_running(self) -> None: subject = self._make_subject() with tempfile.TemporaryDirectory(prefix="vibemouse-status-") as tmp: status_file = Path(tmp) / "status.json" setattr(subject, "_config", SimpleNamespace(status_file=status_file)) - setattr(subject, "_command_server", SimpleNamespace(port=43125)) + setattr(subject, "_command_server", SimpleNamespace(endpoint="test://vibemouse")) set_status = cast( Callable[[bool], None], @@ -131,7 +131,7 @@ def test_set_recording_status_includes_ipc_port_when_command_server_is_running(s self.assertEqual( payload, { - "ipc_port": 43125, + "ipc_socket": "test://vibemouse", "listener_mode": "inline", "recording": False, "state": "idle", diff --git a/tests/core/test_output.py b/tests/core/test_output.py index ded1992..b4ff941 100644 --- a/tests/core/test_output.py +++ b/tests/core/test_output.py @@ -48,7 +48,10 @@ def fake_run(*args: object, **kwargs: object) -> SimpleNamespace: captured_timeouts.append(timeout) return SimpleNamespace(returncode=0, stdout="1\n") - with patch("vibemouse.system_integration.subprocess.run", side_effect=fake_run): + with ( + patch("vibemouse.system_integration.sys.platform", "linux"), + patch("vibemouse.system_integration.subprocess.run", side_effect=fake_run), + ): subject = self._make_subject() probe = cast(Callable[[], bool], getattr(subject, "_is_text_input_focused")) call_probe: Callable[[], bool] = probe @@ -57,25 +60,31 @@ def fake_run(*args: object, **kwargs: object) -> SimpleNamespace: self.assertTrue(result) self.assertEqual(captured_timeouts, [1.5]) - @patch( - "vibemouse.system_integration.subprocess.run", - side_effect=subprocess.TimeoutExpired( - cmd=["python3", "-c", "..."], timeout=1.5 - ), - ) - def test_focus_probe_timeout_returns_false(self, _mock_run: object) -> None: - subject = self._make_subject() - probe = cast(Callable[[], bool], getattr(subject, "_is_text_input_focused")) - self.assertFalse(probe()) - - @patch( - "vibemouse.system_integration.subprocess.run", - side_effect=OSError("spawn failed"), - ) - def test_focus_probe_oserror_returns_false(self, _mock_run: object) -> None: - subject = self._make_subject() - probe = cast(Callable[[], bool], getattr(subject, "_is_text_input_focused")) - self.assertFalse(probe()) + def test_focus_probe_timeout_returns_false(self) -> None: + with ( + patch("vibemouse.system_integration.sys.platform", "linux"), + patch( + "vibemouse.system_integration.subprocess.run", + side_effect=subprocess.TimeoutExpired( + cmd=["python3", "-c", "..."], timeout=1.5 + ), + ), + ): + subject = self._make_subject() + probe = cast(Callable[[], bool], getattr(subject, "_is_text_input_focused")) + self.assertFalse(probe()) + + def test_focus_probe_oserror_returns_false(self) -> None: + with ( + patch("vibemouse.system_integration.sys.platform", "linux"), + patch( + "vibemouse.system_integration.subprocess.run", + side_effect=OSError("spawn failed"), + ), + ): + subject = self._make_subject() + probe = cast(Callable[[], bool], getattr(subject, "_is_text_input_focused")) + self.assertFalse(probe()) def test_focus_probe_prefers_system_integration_result(self) -> None: subject = self._make_subject() diff --git a/tests/ipc/test_client_server.py b/tests/ipc/test_client_server.py index a4382db..8cce976 100644 --- a/tests/ipc/test_client_server.py +++ b/tests/ipc/test_client_server.py @@ -6,12 +6,16 @@ import json import socket import struct +import sys import threading import time +from pathlib import Path + +import pytest from vibemouse.ipc.client import IPCClient from vibemouse.ipc.messages import make_command_message, write_lpjson_frame -from vibemouse.ipc.server import AgentCommandServer, IPCServer +from vibemouse.ipc.server import AgentCommandServer, IPCServer, default_command_endpoint def test_ipc_client_send_event() -> None: @@ -78,7 +82,17 @@ def test_ipc_server_receives_command_when_handler_is_configured() -> None: assert received == ["reload_config"] -def test_agent_command_server_accepts_loopback_command() -> None: +def test_default_command_endpoint_uses_stable_local_address() -> None: + endpoint = default_command_endpoint() + if sys.platform == "win32": + assert endpoint == r"\\.\pipe\vibemouse" + else: + assert Path(endpoint).name == "vibemouse.sock" + + +def test_agent_command_server_accepts_local_command() -> None: + if sys.platform == "win32": + pytest.skip("End-to-end named pipe client coverage is not implemented in this test") received: list[str] = [] ready = threading.Event() @@ -89,7 +103,9 @@ def on_command(command_name: str) -> None: server = AgentCommandServer(on_command=on_command) server.start() try: - with socket.create_connection(("127.0.0.1", server.port), timeout=2) as conn: + with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as conn: + conn.settimeout(2) + conn.connect(server.endpoint) stream = conn.makefile("rwb") write_lpjson_frame(stream, make_command_message("shutdown")) stream.close() diff --git a/tests/listener/test_keyboard_listener.py b/tests/listener/test_keyboard_listener.py index a1bd506..0d4932c 100644 --- a/tests/listener/test_keyboard_listener.py +++ b/tests/listener/test_keyboard_listener.py @@ -2,6 +2,7 @@ import unittest from collections.abc import Callable +from types import SimpleNamespace from typing import cast from vibemouse.core.commands import EVENT_HOTKEY_RECORD_TOGGLE @@ -82,3 +83,22 @@ def test_dispatch_hotkey_emits_configured_event(self) -> None: dispatch() self.assertEqual(seen, [EVENT_HOTKEY_RECORD_TOGGLE]) + + def test_windows_modifier_keycodes_are_normalized(self) -> None: + listener = KeyboardHotkeyListener( + on_hotkey=_noop, keycodes=(16, 91, 134), debounce_s=0.0 + ) + process = cast( + Callable[[int, int], bool], getattr(listener, "_process_key_event") + ) + + self.assertFalse(process(160, 1)) + self.assertFalse(process(92, 1)) + self.assertTrue(process(134, 1)) + + def test_extract_windows_keycode_prefers_vk_attribute(self) -> None: + extract = cast( + Callable[[object], int | None], + getattr(KeyboardHotkeyListener, "_extract_windows_keycode"), + ) + self.assertEqual(extract(SimpleNamespace(vk=160)), 16) diff --git a/tests/listener/test_mouse_listener.py b/tests/listener/test_mouse_listener.py index 83709fa..362fbe1 100644 --- a/tests/listener/test_mouse_listener.py +++ b/tests/listener/test_mouse_listener.py @@ -1170,6 +1170,73 @@ def _import_module(name: str): begin_suppress.assert_not_called() end_suppress.assert_not_called() + def test_pynput_ignores_right_click_when_not_right_trigger(self) -> None: + class _FakeButton: + def __init__(self, name: str) -> None: + self._name = name + + def __str__(self) -> str: + return f"Button.{self._name}" + + class _FakeMouseListener: + def __init__( + self, + *, + on_click: Callable[[int, int, object, bool], None], + on_move: Callable[[int, int], None], + ) -> None: + self._on_click = on_click + self._on_move = on_move + + def start(self) -> None: + self._on_click(10, 20, fake_button_holder.right, True) + self._on_click(10, 20, fake_button_holder.right, False) + + def stop(self) -> None: + return + + fake_button_holder = type("_FakeButtonHolder", (), {"right": _FakeButton("right")}) + + def _listener_ctor( + *, on_click: Callable[[int, int, object, bool], None], on_move: Callable[[int, int], None] + ) -> _FakeMouseListener: + return _FakeMouseListener(on_click=on_click, on_move=on_move) + + fake_mouse_module = type( + "_FakeMouseModule", + (), + { + "Listener": _listener_ctor, + "Button": fake_button_holder, + }, + ) + + listener = SideButtonListener( + on_front_press=_noop_button, + on_rear_press=_noop_button, + front_button="x1", + rear_button="x2", + gestures_enabled=True, + gesture_trigger_button="rear", + system_integration=_NeutralSystemIntegration(), + ) + + def _import_module(name: str): + if name == "pynput.mouse": + return fake_mouse_module + return _real_import_module(name) + + with ( + patch( + "vibemouse.mouse_listener.importlib.import_module", + side_effect=_import_module, + ), + patch.object(listener, "_dispatch_click_async") as dispatch_click_async, + ): + listener._run_pynput(timeout_s=0.2) + + dispatch_click_async.assert_not_called() + class _GrabDeviceStub: def __init__(self, *, fail_ungrab_times: int = 0) -> None: diff --git a/tests/ops/test_deploy.py b/tests/ops/test_deploy.py index 60e81e3..2e3424d 100644 --- a/tests/ops/test_deploy.py +++ b/tests/ops/test_deploy.py @@ -10,18 +10,21 @@ build_deploy_env, render_env_file, render_service_file, + render_windows_launcher, + render_windows_startup_file, run_deploy, ) class DeployHelpersTests(unittest.TestCase): def test_build_deploy_env_applies_preset_and_override(self) -> None: - env_map = build_deploy_env( - preset="fast", - openclaw_command="openclaw --profile prod", - openclaw_agent="ops", - openclaw_retries=5, - ) + with patch("vibemouse.deploy._is_windows", return_value=False): + env_map = build_deploy_env( + preset="fast", + openclaw_command="openclaw --profile prod", + openclaw_agent="ops", + openclaw_retries=5, + ) self.assertEqual( env_map["VIBEMOUSE_OPENCLAW_COMMAND"], "openclaw --profile prod" @@ -31,6 +34,26 @@ def test_build_deploy_env_applies_preset_and_override(self) -> None: self.assertEqual(env_map["VIBEMOUSE_OPENCLAW_RETRIES"], "5") self.assertEqual(env_map["VIBEMOUSE_BUTTON_DEBOUNCE_MS"], "120") + def test_build_deploy_env_uses_windows_status_file_on_windows(self) -> None: + with ( + patch("vibemouse.deploy._is_windows", return_value=True), + patch( + "vibemouse.deploy._windows_local_dir", + return_value=Path("C:/Users/Test/AppData/Local"), + ), + ): + env_map = build_deploy_env( + preset="stable", + openclaw_command="openclaw", + openclaw_agent="main", + openclaw_retries=None, + ) + + self.assertEqual( + env_map["VIBEMOUSE_STATUS_FILE"], + str(Path("C:/Users/Test/AppData/Local/VibeMouse/vibemouse-status.json")), + ) + def test_render_env_file_quotes_values(self) -> None: content = render_env_file( { @@ -57,9 +80,30 @@ def test_render_service_file_contains_paths(self) -> None: self.assertIn("StandardOutput=append:/tmp/vibemouse.log", service) self.assertIn("StandardError=append:/tmp/vibemouse.log", service) + def test_render_windows_launcher_contains_env_log_and_command(self) -> None: + env_file = Path("C:/Temp/deploy.env") + log_file = Path("C:/Temp/service.log") + launcher = render_windows_launcher( + env_file=env_file, + log_file=log_file, + exec_start='"C:/Python/python.exe" -m vibemouse.main run', + ) + + self.assertIn(str(env_file), launcher) + self.assertIn(str(log_file), launcher) + self.assertIn("-m vibemouse.main run", launcher) + + def test_render_windows_startup_file_points_to_launcher(self) -> None: + startup = render_windows_startup_file( + launcher_file=Path("C:/Temp/vibemouse-launch.ps1") + ) + + self.assertIn("powershell.exe", startup) + self.assertIn("vibemouse-launch.ps1", startup) + class DeployCommandTests(unittest.TestCase): - def test_run_deploy_dry_run_does_not_write_files(self) -> None: + def test_run_deploy_linux_dry_run_does_not_write_files(self) -> None: with tempfile.TemporaryDirectory(prefix="vibemouse-deploy-") as tmp: env_file = Path(tmp) / "deploy.env" service_file = Path(tmp) / "vibemouse.service" @@ -76,13 +120,41 @@ def test_run_deploy_dry_run_does_not_write_files(self) -> None: dry_run=True, ) - rc = run_deploy(args) + with patch("vibemouse.deploy._is_windows", return_value=False): + rc = run_deploy(args) self.assertEqual(rc, 0) self.assertFalse(env_file.exists()) self.assertFalse(service_file.exists()) - def test_run_deploy_skip_systemctl_writes_files_and_runs_doctor(self) -> None: + def test_run_deploy_windows_dry_run_does_not_write_files(self) -> None: + with tempfile.TemporaryDirectory(prefix="vibemouse-deploy-") as tmp: + env_file = Path(tmp) / "deploy.env" + launcher_file = Path(tmp) / "vibemouse-launch.ps1" + startup_file = Path(tmp) / "vibemouse.vbs" + args = argparse.Namespace( + preset="stable", + env_file=str(env_file), + launcher_file=str(launcher_file), + startup_file=str(startup_file), + log_file=str(Path(tmp) / "service.log"), + openclaw_command="openclaw", + openclaw_agent="main", + openclaw_retries=None, + exec_start="python -m vibemouse.main run", + skip_register=False, + dry_run=True, + ) + + with patch("vibemouse.deploy._is_windows", return_value=True): + rc = run_deploy(args) + + self.assertEqual(rc, 0) + self.assertFalse(env_file.exists()) + self.assertFalse(launcher_file.exists()) + self.assertFalse(startup_file.exists()) + + def test_run_deploy_linux_skip_systemctl_writes_files_and_runs_doctor(self) -> None: with tempfile.TemporaryDirectory(prefix="vibemouse-deploy-") as tmp: env_file = Path(tmp) / "deploy.env" service_file = Path(tmp) / "vibemouse.service" @@ -99,7 +171,10 @@ def test_run_deploy_skip_systemctl_writes_files_and_runs_doctor(self) -> None: dry_run=False, ) - with patch("vibemouse.deploy.run_doctor", return_value=0) as run_doctor: + with ( + patch("vibemouse.deploy._is_windows", return_value=False), + patch("vibemouse.deploy.run_doctor", return_value=0) as run_doctor, + ): rc = run_deploy(args) self.assertEqual(rc, 0) @@ -108,6 +183,38 @@ def test_run_deploy_skip_systemctl_writes_files_and_runs_doctor(self) -> None: self.assertTrue(service_file.exists()) self.assertIn('VIBEMOUSE_OPENCLAW_AGENT="ops"', env_file.read_text()) + def test_run_deploy_windows_writes_launcher_and_runs_doctor(self) -> None: + with tempfile.TemporaryDirectory(prefix="vibemouse-deploy-") as tmp: + env_file = Path(tmp) / "deploy.env" + launcher_file = Path(tmp) / "vibemouse-launch.ps1" + startup_file = Path(tmp) / "vibemouse.vbs" + args = argparse.Namespace( + preset="stable", + env_file=str(env_file), + launcher_file=str(launcher_file), + startup_file=str(startup_file), + log_file=str(Path(tmp) / "service.log"), + openclaw_command="openclaw --profile prod", + openclaw_agent="ops", + openclaw_retries=2, + exec_start="python -m vibemouse.main run", + skip_register=False, + dry_run=False, + ) + + with ( + patch("vibemouse.deploy._is_windows", return_value=True), + patch("vibemouse.deploy.run_doctor", return_value=0) as run_doctor, + ): + rc = run_deploy(args) + + self.assertEqual(rc, 0) + self.assertEqual(run_doctor.call_count, 1) + self.assertTrue(env_file.exists()) + self.assertTrue(launcher_file.exists()) + self.assertTrue(startup_file.exists()) + self.assertIn('VIBEMOUSE_OPENCLAW_AGENT="ops"', env_file.read_text()) + def test_run_deploy_rejects_negative_retry_override(self) -> None: args = argparse.Namespace( preset="stable", @@ -121,5 +228,6 @@ def test_run_deploy_rejects_negative_retry_override(self) -> None: skip_systemctl=True, dry_run=True, ) - rc = run_deploy(args) + with patch("vibemouse.deploy._is_windows", return_value=False): + rc = run_deploy(args) self.assertEqual(rc, 1) diff --git a/tests/ops/test_doctor.py b/tests/ops/test_doctor.py index d2476b3..0d50c17 100644 --- a/tests/ops/test_doctor.py +++ b/tests/ops/test_doctor.py @@ -15,6 +15,9 @@ _fix_hyprland_return_bind_conflict, _check_hyprland_return_bind_conflict, _check_openclaw, + _check_windows_background_process, + _check_windows_input_hooks, + _check_windows_startup_entry, _parse_openclaw_command, run_doctor, ) @@ -180,6 +183,7 @@ def fake_run(cmd: list[str], *, timeout: float) -> SimpleNamespace: def test_apply_doctor_fixes_runs_both_fixers(self) -> None: with ( + patch("vibemouse.doctor.sys.platform", "linux"), patch("vibemouse.doctor._fix_hyprland_return_bind_conflict") as fix_bind, patch("vibemouse.doctor._ensure_user_service_active") as fix_service, ): @@ -188,10 +192,51 @@ def test_apply_doctor_fixes_runs_both_fixers(self) -> None: self.assertEqual(fix_bind.call_count, 1) self.assertEqual(fix_service.call_count, 1) + def test_windows_input_hooks_fail_when_pynput_import_fails(self) -> None: + with patch( + "vibemouse.doctor.importlib.import_module", + side_effect=ModuleNotFoundError("pynput"), + ): + check = _check_windows_input_hooks() + + self.assertEqual(check.status, "fail") + self.assertIn("pynput.mouse", check.detail) + + def test_windows_startup_entry_warns_when_missing(self) -> None: + with ( + patch( + "vibemouse.doctor._windows_startup_file", + return_value=Path("C:/Users/Test/AppData/Roaming/Microsoft/Windows/Start Menu/Programs/Startup/vibemouse.vbs"), + ), + patch( + "vibemouse.doctor._windows_launcher_file", + return_value=Path("C:/Users/Test/AppData/Roaming/VibeMouse/vibemouse-launch.ps1"), + ), + ): + check = _check_windows_startup_entry() + + self.assertEqual(check.status, "warn") + self.assertIn("startup entry not found", check.detail) + + def test_windows_background_process_reports_running_process(self) -> None: + with patch( + "vibemouse.doctor._run_subprocess", + return_value=SimpleNamespace( + returncode=0, + stdout="python -m vibemouse.main run\n", + stderr="", + ), + ): + check = _check_windows_background_process() + + self.assertEqual(check.status, "ok") + self.assertIn("detected 1 VibeMouse process", check.detail) + class DoctorCommandTests(unittest.TestCase): def test_run_doctor_returns_nonzero_when_fail_exists(self) -> None: with ( + patch("vibemouse.doctor.sys.platform", "linux"), patch( "vibemouse.doctor._check_config_load", return_value=( @@ -216,6 +261,7 @@ def test_run_doctor_returns_nonzero_when_fail_exists(self) -> None: def test_run_doctor_with_fix_invokes_fix_path(self) -> None: with ( + patch("vibemouse.doctor.sys.platform", "linux"), patch("vibemouse.doctor._apply_doctor_fixes") as apply_fixes, patch( "vibemouse.doctor._check_config_load", @@ -258,3 +304,55 @@ def test_run_doctor_with_fix_invokes_fix_path(self) -> None: self.assertEqual(rc, 0) self.assertEqual(apply_fixes.call_count, 1) + + def test_run_doctor_on_windows_uses_windows_checks(self) -> None: + with ( + patch("vibemouse.doctor.sys.platform", "win32"), + patch( + "vibemouse.doctor._check_config_load", + return_value=( + DoctorCheck("config", "ok", "ok"), + cast( + AppConfig, + cast( + object, + SimpleNamespace( + openclaw_command="openclaw", + openclaw_agent="main", + sample_rate=16000, + channels=1, + ), + ), + ), + ), + ), + patch("vibemouse.doctor._check_openclaw", return_value=[]), + patch( + "vibemouse.doctor._check_audio_input", + return_value=DoctorCheck("audio", "ok", "ok"), + ), + patch( + "vibemouse.doctor._check_windows_input_hooks", + return_value=DoctorCheck("hooks", "ok", "ok"), + ) as hooks_check, + patch( + "vibemouse.doctor._check_windows_startup_entry", + return_value=DoctorCheck("startup", "ok", "ok"), + ) as startup_check, + patch( + "vibemouse.doctor._check_windows_background_process", + return_value=DoctorCheck("process", "ok", "ok"), + ) as process_check, + patch("vibemouse.doctor._check_input_device_permissions") as linux_input, + patch("vibemouse.doctor._check_hyprland_return_bind_conflict") as linux_bind, + patch("vibemouse.doctor._check_user_service_state") as linux_service, + ): + rc = run_doctor() + + self.assertEqual(rc, 0) + self.assertEqual(hooks_check.call_count, 1) + self.assertEqual(startup_check.call_count, 1) + self.assertEqual(process_check.call_count, 1) + self.assertEqual(linux_input.call_count, 0) + self.assertEqual(linux_bind.call_count, 0) + self.assertEqual(linux_service.call_count, 0) diff --git a/tests/platform/test_system_integration.py b/tests/platform/test_system_integration.py index fbfc4c1..d7c3620 100644 --- a/tests/platform/test_system_integration.py +++ b/tests/platform/test_system_integration.py @@ -8,6 +8,7 @@ from vibemouse.system_integration import ( HyprlandSystemIntegration, NoopSystemIntegration, + WindowsSystemIntegration, create_system_integration, detect_hyprland_session, is_browser_window_payload, @@ -37,9 +38,9 @@ def test_factory_returns_noop_integration(self) -> None: integration = create_system_integration(env={}, platform_name="linux") self.assertIsInstance(integration, NoopSystemIntegration) - def test_factory_returns_noop_on_non_hyprland_windows(self) -> None: + def test_factory_returns_windows_integration_on_non_hyprland_windows(self) -> None: integration = create_system_integration(env={}, platform_name="win32") - self.assertIsInstance(integration, NoopSystemIntegration) + self.assertIsInstance(integration, WindowsSystemIntegration) def test_factory_returns_noop_on_non_hyprland_macos(self) -> None: integration = create_system_integration(env={}, platform_name="darwin") @@ -163,16 +164,22 @@ def test_browser_payload_detection_false_for_terminal_window(self) -> None: self.assertFalse(is_browser_window_payload(payload)) def test_probe_text_input_focus_returns_true_when_script_outputs_one(self) -> None: - with patch( - "vibemouse.system_integration.subprocess.run", - return_value=SimpleNamespace(returncode=0, stdout="1\n"), + with ( + patch("vibemouse.system_integration.sys.platform", "linux"), + patch( + "vibemouse.system_integration.subprocess.run", + return_value=SimpleNamespace(returncode=0, stdout="1\n"), + ), ): self.assertTrue(probe_text_input_focus_via_atspi()) def test_probe_text_input_focus_timeout_returns_false(self) -> None: - with patch( - "vibemouse.system_integration.subprocess.run", - side_effect=subprocess.TimeoutExpired(cmd=["python3"], timeout=1.5), + with ( + patch("vibemouse.system_integration.sys.platform", "linux"), + patch( + "vibemouse.system_integration.subprocess.run", + side_effect=subprocess.TimeoutExpired(cmd=["python3"], timeout=1.5), + ), ): self.assertFalse(probe_text_input_focus_via_atspi()) @@ -200,3 +207,27 @@ def generate_keyboard_event( def test_probe_send_enter_without_module_returns_false(self) -> None: self.assertFalse(probe_send_enter_via_atspi(atspi_module=None, lazy_load=False)) + + +class WindowsSystemIntegrationTests(unittest.TestCase): + def test_windows_paste_shortcuts_match_terminal_conventions(self) -> None: + integration = WindowsSystemIntegration() + self.assertEqual( + integration.paste_shortcuts(terminal_active=True), + (("CTRL SHIFT", "V"), ("SHIFT", "Insert"), ("CTRL", "V")), + ) + self.assertEqual( + integration.paste_shortcuts(terminal_active=False), + (("CTRL", "V"),), + ) + + def test_windows_switch_workspace_uses_ctrl_win_arrow(self) -> None: + integration = WindowsSystemIntegration() + with patch.object(integration, "send_shortcut", return_value=True) as send_shortcut: + ok = integration.switch_workspace("left") + + self.assertTrue(ok) + self.assertEqual( + send_shortcut.call_args.kwargs, + {"mod": "CTRL WIN", "key": "Left"}, + ) diff --git a/tests/test_config.py b/tests/test_config.py index d4fbf07..e1f51fa 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -1,7 +1,9 @@ from __future__ import annotations import os +import tempfile import unittest +from pathlib import Path from unittest.mock import patch from vibemouse.config import load_config @@ -35,7 +37,8 @@ def test_defaults_disable_trust_remote_code(self) -> None: self.assertEqual(config.openclaw_retries, 0) self.assertEqual(config.front_button, "x1") self.assertEqual(config.rear_button, "x2") - self.assertEqual(config.record_hotkey_keycodes, (42, 125, 193)) + expected_hotkeys = (16, 91, 134) if os.name == "nt" else (42, 125, 193) + self.assertEqual(config.record_hotkey_keycodes, expected_hotkeys) self.assertIsNone(config.recording_submit_keycode) self.assertEqual(config.bindings, {}) @@ -53,6 +56,17 @@ def test_record_hotkey_keycodes_can_be_configured(self) -> None: self.assertEqual(config.record_hotkey_keycodes, (58, 125, 193)) + def test_windows_defaults_use_windows_hotkeys(self) -> None: + with tempfile.TemporaryDirectory(prefix="vibemouse-config-") as tmp: + config_path = Path(tmp) / "config.json" + with ( + patch("vibemouse.config.schema.sys.platform", "win32"), + patch.dict(os.environ, {}, clear=True), + ): + config = load_config(config_path) + + self.assertEqual(config.record_hotkey_keycodes, (16, 91, 134)) + def test_duplicate_record_hotkey_keycodes_are_rejected(self) -> None: with patch.dict( os.environ, diff --git a/tests/test_config_store.py b/tests/test_config_store.py index 74bb4fb..db57bf5 100644 --- a/tests/test_config_store.py +++ b/tests/test_config_store.py @@ -1,6 +1,7 @@ from __future__ import annotations import json +import os import tempfile import unittest from pathlib import Path @@ -20,7 +21,8 @@ def test_missing_config_file_uses_defaults(self) -> None: self.assertEqual(config.log_level, "INFO") self.assertEqual(config.front_button, "x1") self.assertEqual(config.rear_button, "x2") - self.assertEqual(config.record_hotkey_keycodes, (42, 125, 193)) + expected_hotkeys = (16, 91, 134) if os.name == "nt" else (42, 125, 193) + self.assertEqual(config.record_hotkey_keycodes, expected_hotkeys) self.assertEqual(config.status_file.name, "vibemouse-status.json") def test_json_config_values_are_loaded(self) -> None: diff --git a/vibemouse/config/schema.py b/vibemouse/config/schema.py index 370986e..26da99c 100644 --- a/vibemouse/config/schema.py +++ b/vibemouse/config/schema.py @@ -97,6 +97,9 @@ def default_status_file() -> Path: def build_default_config_document() -> dict[str, object]: + record_hotkey_keycodes = ( + [16, 91, 134] if sys.platform.startswith("win") else [42, 125, 193] + ) return { "schema_version": LATEST_CONFIG_SCHEMA_VERSION, "bindings": {}, @@ -119,7 +122,7 @@ def build_default_config_document() -> dict[str, object]: "input": { "front_button": "x1", "rear_button": "x2", - "record_hotkey_keycodes": [42, 125, 193], + "record_hotkey_keycodes": record_hotkey_keycodes, "recording_submit_keycode": None, "button_debounce_ms": 150, "gestures_enabled": False, diff --git a/vibemouse/core/app.py b/vibemouse/core/app.py index e48e784..f430162 100644 --- a/vibemouse/core/app.py +++ b/vibemouse/core/app.py @@ -551,7 +551,10 @@ def _start_command_server(self) -> None: return self._command_server = AgentCommandServer(on_command=self._execute_command) self._command_server.start() - _LOG.info("Agent command server listening on 127.0.0.1:%s", self._command_server.port) + _LOG.info( + "Agent command server listening on %s", + self._command_server.endpoint, + ) def _start_listener_mode(self) -> None: if self._listener_mode == "child": @@ -629,8 +632,10 @@ def _set_recording_status( "listener_mode": mode, } command_server = getattr(self, "_command_server", None) - if command_server is not None and getattr(command_server, "port", 0): - payload["ipc_port"] = int(command_server.port) + if command_server is not None: + endpoint = getattr(command_server, "endpoint", "") + if isinstance(endpoint, str) and endpoint: + payload["ipc_socket"] = endpoint try: write_status(self._config.status_file, payload) except Exception: diff --git a/vibemouse/ipc/__init__.py b/vibemouse/ipc/__init__.py index 1b17322..ac88778 100644 --- a/vibemouse/ipc/__init__.py +++ b/vibemouse/ipc/__init__.py @@ -12,13 +12,14 @@ serialize_message, write_lpjson_frame, ) -from vibemouse.ipc.server import AgentCommandServer, IPCServer +from vibemouse.ipc.server import AgentCommandServer, IPCServer, default_command_endpoint __all__ = [ "AgentCommandServer", "binary_reader", "binary_writer", "CommandMessage", + "default_command_endpoint", "EventMessage", "IPCClient", "IPCServer", diff --git a/vibemouse/ipc/server.py b/vibemouse/ipc/server.py index a0fe50c..8b3f894 100644 --- a/vibemouse/ipc/server.py +++ b/vibemouse/ipc/server.py @@ -1,11 +1,14 @@ -"""IPC servers for stream and loopback command transports.""" +"""IPC servers for stream and local command transports.""" from __future__ import annotations import logging +import os import socket +import sys +import tempfile import threading -from typing import Any, Callable +from typing import Any, Callable, Protocol from vibemouse.ipc.messages import ( _decode_lpjson, @@ -18,6 +21,21 @@ _LOG = logging.getLogger(__name__) + +class _ReadableStream(Protocol): + def read(self, size: int) -> bytes: ... + + def close(self) -> None: ... + + +def default_command_endpoint() -> str: + """Return the stable local command endpoint for the current platform.""" + if sys.platform == "win32": + return r"\\.\pipe\vibemouse" + runtime_dir = os.getenv("XDG_RUNTIME_DIR", tempfile.gettempdir()) + return os.path.join(runtime_dir, "vibemouse.sock") + + class IPCServer: """ Server that reads events from a listener child's stdout and optionally @@ -89,49 +107,67 @@ def _run(self) -> None: class AgentCommandServer: - """Loopback-only command server for external clients driving the agent.""" + """Stable local command server for external clients driving the agent.""" def __init__( self, *, on_command: Callable[[str], None], - host: str = "127.0.0.1", - port: int = 0, + endpoint: str | None = None, ) -> None: self._on_command = on_command - self._host = host - self._requested_port = port - self._listener: socket.socket | None = None - self._port = 0 + self._endpoint = endpoint or default_command_endpoint() + self._listener: socket.socket | int | None = None self._running = False self._accept_thread: threading.Thread | None = None self._client_threads: set[threading.Thread] = set() - self._client_connections: set[socket.socket] = set() + self._client_connections: set[Any] = set() self._clients_lock = threading.Lock() + @property + def endpoint(self) -> str: + return self._endpoint + @property def port(self) -> int: - return self._port + # Backward-compatible attribute for older callers; stable endpoints do not use a port. + return 0 def start(self) -> None: if self._listener is not None: return - listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - listener.bind((self._host, self._requested_port)) + self._running = True + if sys.platform == "win32": + self._accept_thread = threading.Thread( + target=self._accept_loop_pipe, + daemon=True, + ) + self._accept_thread.start() + return + + endpoint_dir = os.path.dirname(self._endpoint) + if endpoint_dir: + os.makedirs(endpoint_dir, exist_ok=True) + try: + os.unlink(self._endpoint) + except FileNotFoundError: + pass + listener = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + listener.bind(self._endpoint) listener.listen() listener.settimeout(0.2) self._listener = listener - self._port = int(listener.getsockname()[1]) - self._running = True - self._accept_thread = threading.Thread(target=self._accept_loop, daemon=True) + self._accept_thread = threading.Thread(target=self._accept_loop_socket, daemon=True) self._accept_thread.start() def stop(self) -> None: self._running = False listener = self._listener self._listener = None - if listener is not None: + if sys.platform == "win32": + if isinstance(listener, int): + _close_pipe_handle(listener) + elif isinstance(listener, socket.socket): try: listener.close() except OSError: @@ -143,6 +179,8 @@ def stop(self) -> None: conn.close() except OSError: pass + except Exception: + pass if self._accept_thread is not None: self._accept_thread.join(timeout=2) self._accept_thread = None @@ -153,11 +191,17 @@ def stop(self) -> None: with self._clients_lock: self._client_threads.clear() self._client_connections.clear() - self._port = 0 + if sys.platform != "win32": + try: + os.unlink(self._endpoint) + except FileNotFoundError: + pass + except OSError: + pass - def _accept_loop(self) -> None: + def _accept_loop_socket(self) -> None: listener = self._listener - if listener is None: + if not isinstance(listener, socket.socket): return while self._running: try: @@ -171,8 +215,32 @@ def _accept_loop(self) -> None: with self._clients_lock: self._client_connections.add(conn) thread = threading.Thread( - target=self._serve_client, - args=(conn,), + target=self._serve_stream, + args=(conn.makefile("rwb"), conn), + daemon=True, + ) + with self._clients_lock: + self._client_threads.add(thread) + thread.start() + self._running = False + + def _accept_loop_pipe(self) -> None: + while self._running: + handle = _create_named_pipe(self._endpoint) + self._listener = handle + if handle is None: + break + if not _connect_named_pipe(handle): + _close_pipe_handle(handle) + if self._running: + _LOG.warning("Command server named pipe connect failed") + continue + stream = _NamedPipeStream(handle) + with self._clients_lock: + self._client_connections.add(stream) + thread = threading.Thread( + target=self._serve_stream, + args=(stream, stream), daemon=True, ) with self._clients_lock: @@ -180,8 +248,7 @@ def _accept_loop(self) -> None: thread.start() self._running = False - def _serve_client(self, conn: socket.socket) -> None: - stream = conn.makefile("rwb") + def _serve_stream(self, stream: _ReadableStream, connection: Any) -> None: current = threading.current_thread() try: while self._running: @@ -203,10 +270,133 @@ def _serve_client(self, conn: socket.socket) -> None: stream.close() except OSError: pass + except Exception: + pass try: - conn.close() + connection.close() except OSError: pass + except Exception: + pass with self._clients_lock: - self._client_connections.discard(conn) + self._client_connections.discard(connection) self._client_threads.discard(current) + + +if sys.platform == "win32": + import ctypes + from ctypes import wintypes + + _INVALID_HANDLE_VALUE = ctypes.c_void_p(-1).value + _ERROR_BROKEN_PIPE = 109 + _ERROR_PIPE_CONNECTED = 535 + _PIPE_ACCESS_INBOUND = 0x00000001 + _PIPE_TYPE_BYTE = 0x00000000 + _PIPE_READMODE_BYTE = 0x00000000 + _PIPE_WAIT = 0x00000000 + _PIPE_UNLIMITED_INSTANCES = 255 + + _kernel32 = ctypes.WinDLL("kernel32", use_last_error=True) + _kernel32.CreateNamedPipeW.argtypes = [ + wintypes.LPCWSTR, + wintypes.DWORD, + wintypes.DWORD, + wintypes.DWORD, + wintypes.DWORD, + wintypes.DWORD, + wintypes.DWORD, + wintypes.LPVOID, + ] + _kernel32.CreateNamedPipeW.restype = wintypes.HANDLE + _kernel32.ConnectNamedPipe.argtypes = [wintypes.HANDLE, wintypes.LPVOID] + _kernel32.ConnectNamedPipe.restype = wintypes.BOOL + _kernel32.ReadFile.argtypes = [ + wintypes.HANDLE, + wintypes.LPVOID, + wintypes.DWORD, + ctypes.POINTER(wintypes.DWORD), + wintypes.LPVOID, + ] + _kernel32.ReadFile.restype = wintypes.BOOL + _kernel32.DisconnectNamedPipe.argtypes = [wintypes.HANDLE] + _kernel32.DisconnectNamedPipe.restype = wintypes.BOOL + _kernel32.CloseHandle.argtypes = [wintypes.HANDLE] + _kernel32.CloseHandle.restype = wintypes.BOOL + + + def _create_named_pipe(endpoint: str) -> int | None: + handle = _kernel32.CreateNamedPipeW( + endpoint, + _PIPE_ACCESS_INBOUND, + _PIPE_TYPE_BYTE | _PIPE_READMODE_BYTE | _PIPE_WAIT, + _PIPE_UNLIMITED_INSTANCES, + 65536, + 65536, + 0, + None, + ) + if handle == _INVALID_HANDLE_VALUE: + error = ctypes.get_last_error() + _LOG.warning("Failed to create named pipe %s: winerror=%s", endpoint, error) + return None + return int(handle) + + + def _connect_named_pipe(handle: int) -> bool: + ok = bool(_kernel32.ConnectNamedPipe(handle, None)) + if ok: + return True + error = ctypes.get_last_error() + return error == _ERROR_PIPE_CONNECTED + + + def _close_pipe_handle(handle: int) -> None: + try: + _kernel32.DisconnectNamedPipe(handle) + except Exception: + pass + _kernel32.CloseHandle(handle) + + + class _NamedPipeStream: + def __init__(self, handle: int) -> None: + self._handle = handle + self._closed = False + + def read(self, size: int) -> bytes: + if self._closed: + return b"" + chunks: list[bytes] = [] + remaining = size + while remaining > 0: + buffer = ctypes.create_string_buffer(remaining) + read = wintypes.DWORD(0) + ok = bool( + _kernel32.ReadFile( + self._handle, + buffer, + remaining, + ctypes.byref(read), + None, + ) + ) + if not ok: + error = ctypes.get_last_error() + if error == _ERROR_BROKEN_PIPE: + if not chunks: + return b"" + raise ValueError("Pipe closed mid-frame") + raise OSError(f"Named pipe read failed: winerror={error}") + if read.value == 0: + return b"" if not chunks else b"".join(chunks) + chunk = buffer.raw[: read.value] + chunks.append(chunk) + remaining -= read.value + return b"".join(chunks) + + def close(self) -> None: + if self._closed: + return + self._closed = True + _close_pipe_handle(self._handle) + diff --git a/vibemouse/listener/keyboard_listener.py b/vibemouse/listener/keyboard_listener.py index c90a0dd..1daaf9d 100644 --- a/vibemouse/listener/keyboard_listener.py +++ b/vibemouse/listener/keyboard_listener.py @@ -2,6 +2,7 @@ import importlib import select +import sys import threading import time from collections.abc import Callable @@ -32,7 +33,9 @@ def __init__( self._on_hotkey: HotkeyCallback | None = on_hotkey self._on_event: EventCallback | None = on_event self._event_name: str | None = event_name - self._combo: frozenset[int] = frozenset(keycodes) + self._combo: frozenset[int] = frozenset( + self._normalize_configured_keycode(code) for code in keycodes + ) self._debounce_s: float = max(0.0, debounce_s) self._rescan_interval_s: float = max(0.2, rescan_interval_s) self._state_lock: threading.Lock = threading.Lock() @@ -58,7 +61,10 @@ def _run(self) -> None: last_error_summary: str | None = None while not self._stop.is_set(): try: - self._run_evdev() + if sys.platform.startswith("win"): + self._run_pynput(timeout_s=self._rescan_interval_s) + else: + self._run_evdev() self._reset_pressed_state() continue except Exception as error: @@ -89,7 +95,10 @@ def _run_evdev(self) -> None: try: caps = dev.capabilities() key_cap = caps.get(ecodes.EV_KEY, []) - if not any(code in key_cap for code in self._combo): + normalized_caps = { + self._normalize_configured_keycode(int(code)) for code in key_cap + } + if not any(code in normalized_caps for code in self._combo): dev.close() continue if ecodes.KEY_A not in key_cap: @@ -132,17 +141,52 @@ def _run_evdev(self) -> None: for dev in devices: dev.close() + def _run_pynput(self, *, timeout_s: float | None = None) -> None: + try: + keyboard_module = importlib.import_module("pynput.keyboard") + except Exception as error: + raise RuntimeError("pynput.keyboard is not available") from error + + listener_ctor = cast(_KeyboardListenerCtor, getattr(keyboard_module, "Listener")) + + def on_press(key: object) -> None: + keycode = self._extract_windows_keycode(key) + if keycode is None: + return + if self._process_key_event(keycode, 1): + self._dispatch_hotkey() + + def on_release(key: object) -> None: + keycode = self._extract_windows_keycode(key) + if keycode is None: + return + _ = self._process_key_event(keycode, 0) + + listener = listener_ctor(on_press=on_press, on_release=on_release) + listener.start() + deadline: float | None = None + if timeout_s is not None: + deadline = time.monotonic() + max(0.2, timeout_s) + try: + while not self._stop.is_set(): + if deadline is not None and time.monotonic() >= deadline: + return + time.sleep(0.2) + finally: + listener.stop() + def _reset_pressed_state(self) -> None: with self._state_lock: self._pressed.clear() self._combo_latched = False def _process_key_event(self, keycode: int, value: int) -> bool: + normalized = self._normalize_runtime_keycode(keycode) with self._state_lock: if value == 1: - self._pressed.add(keycode) + self._pressed.add(normalized) elif value == 0: - self._pressed.discard(keycode) + self._pressed.discard(normalized) else: return False @@ -160,6 +204,41 @@ def _process_key_event(self, keycode: int, value: int) -> bool: return True return False + @classmethod + def _normalize_configured_keycode(cls, keycode: int) -> int: + return cls._normalize_windows_vk(keycode) + + @classmethod + def _normalize_runtime_keycode(cls, keycode: int) -> int: + return cls._normalize_windows_vk(keycode) + + @staticmethod + def _normalize_windows_vk(keycode: int) -> int: + mapping = { + 160: 16, + 161: 16, + 162: 17, + 163: 17, + 164: 18, + 165: 18, + 92: 91, + } + return mapping.get(int(keycode), int(keycode)) + + @classmethod + def _extract_windows_keycode(cls, key: object) -> int | None: + vk = getattr(key, "vk", None) + if isinstance(vk, int): + return cls._normalize_windows_vk(vk) + value = getattr(key, "value", None) + nested_vk = getattr(value, "vk", None) + if isinstance(nested_vk, int): + return cls._normalize_windows_vk(nested_vk) + char = getattr(key, "char", None) + if isinstance(char, str) and len(char) == 1: + return ord(char.upper()) + return None + def _dispatch_hotkey(self) -> None: event_name = self._event_name if self._on_event is not None and isinstance(event_name, str) and event_name: @@ -197,3 +276,18 @@ def __call__(self) -> list[str]: ... class _Ecodes(Protocol): EV_KEY: int KEY_A: int + + +class _KeyboardListener(Protocol): + def start(self) -> None: ... + + def stop(self) -> None: ... + + +class _KeyboardListenerCtor(Protocol): + def __call__( + self, + *, + on_press: Callable[[object], None] | None = None, + on_release: Callable[[object], None] | None = None, + ) -> _KeyboardListener: ... diff --git a/vibemouse/listener/mouse_listener.py b/vibemouse/listener/mouse_listener.py index f85a912..16c63c5 100644 --- a/vibemouse/listener/mouse_listener.py +++ b/vibemouse/listener/mouse_listener.py @@ -100,6 +100,9 @@ def __init__( self._right_tap_timeout_s: float = 0.30 self._right_click_slop_px: int = 8 self._right_hold_suppress_timeout_s: float = 8.0 + self._injected_click_lock: threading.Lock = threading.Lock() + self._ignore_injected_right_click_events: int = 0 + self._ignore_injected_right_click_until: float = 0.0 self._stop: threading.Event = threading.Event() self._thread: threading.Thread | None = None @@ -349,7 +352,11 @@ def _run_pynput(self, *, timeout_s: float | None = None) -> None: front_candidates = button_map[self._front_button] rear_candidates = button_map[self._rear_button] - right_candidates = {"right", "button2"} + right_candidates = ( + {"right", "button2"} + if self._gestures_enabled and self._gesture_trigger_button == "right" + else set() + ) def on_click(x: int, y: int, button: object, pressed: bool) -> None: btn_name = str(button).lower().split(".")[-1] @@ -363,6 +370,9 @@ def on_click(x: int, y: int, button: object, pressed: bool) -> None: if button_label is None: return + if button_label == "right" and self._should_ignore_injected_right_click(): + _LOG.debug("Ignoring self-injected right click event from pynput backend") + return if self._gestures_enabled and self._is_gesture_trigger_button(button_label): if button_label == "right": @@ -537,6 +547,7 @@ def _consume_right_trigger_release(self) -> tuple[bool, str | None]: return should_replay, direction def _dispatch_right_click(self) -> None: + self._arm_injected_right_click_ignore() mouse_module = importlib.import_module("pynput.mouse") controller_ctor = cast( _MouseControllerCtor, @@ -551,6 +562,25 @@ def _dispatch_right_click(self) -> None: time.sleep(0.012) controller.release(button_holder.right) + def _arm_injected_right_click_ignore(self) -> None: + with self._injected_click_lock: + # Ignore the next synthetic press/release pair that we inject ourselves. + self._ignore_injected_right_click_events = 2 + self._ignore_injected_right_click_until = time.monotonic() + 0.5 + + def _should_ignore_injected_right_click(self) -> bool: + with self._injected_click_lock: + if self._ignore_injected_right_click_events <= 0: + return False + if time.monotonic() > self._ignore_injected_right_click_until: + self._ignore_injected_right_click_events = 0 + self._ignore_injected_right_click_until = 0.0 + return False + self._ignore_injected_right_click_events -= 1 + if self._ignore_injected_right_click_events <= 0: + self._ignore_injected_right_click_until = 0.0 + return True + def _start_gesture_capture( self, *, diff --git a/vibemouse/ops/deploy.py b/vibemouse/ops/deploy.py index ebe2418..3765213 100644 --- a/vibemouse/ops/deploy.py +++ b/vibemouse/ops/deploy.py @@ -1,6 +1,7 @@ from __future__ import annotations import argparse +import os import shlex import shutil import subprocess @@ -42,18 +43,13 @@ def configure_deploy_parser(parser: argparse.ArgumentParser) -> None: ) _ = parser.add_argument( "--env-file", - default=str(Path.home() / ".config" / "vibemouse" / "deploy.env"), - help="path to generated EnvironmentFile", - ) - _ = parser.add_argument( - "--service-file", - default=str(Path.home() / ".config" / "systemd" / "user" / "vibemouse.service"), - help="path to generated systemd user service file", + default=str(_default_env_file()), + help="path to generated environment file", ) _ = parser.add_argument( "--log-file", - default=str(Path.home() / ".local" / "state" / "vibemouse" / "service.log"), - help="path to persistent service log file", + default=str(_default_log_file()), + help="path to persistent log file", ) _ = parser.add_argument( "--openclaw-command", @@ -76,17 +72,42 @@ def configure_deploy_parser(parser: argparse.ArgumentParser) -> None: default=None, help="override ExecStart command", ) - _ = parser.add_argument( - "--skip-systemctl", - action="store_true", - help="skip systemctl enable/restart operations", - ) _ = parser.add_argument( "--dry-run", action="store_true", help="print plan without writing files", ) + if _is_windows(): + _ = parser.add_argument( + "--launcher-file", + default=str(_default_windows_launcher_file()), + help="path to generated PowerShell launcher", + ) + _ = parser.add_argument( + "--startup-file", + default=str(_default_windows_startup_file()), + help="path to generated Startup-folder entry", + ) + _ = parser.add_argument( + "--skip-register", + action="store_true", + help="skip creating the Startup-folder entry", + ) + else: + _ = parser.add_argument( + "--service-file", + default=str( + Path.home() / ".config" / "systemd" / "user" / "vibemouse.service" + ), + help="path to generated systemd user service file", + ) + _ = parser.add_argument( + "--skip-systemctl", + action="store_true", + help="skip systemctl enable/restart operations", + ) + def run_deploy(args: argparse.Namespace) -> int: preset = str(getattr(args, "preset", "stable")) @@ -108,7 +129,6 @@ def run_deploy(args: argparse.Namespace) -> int: return 1 env_path = Path(str(getattr(args, "env_file", ""))).expanduser() - service_path = Path(str(getattr(args, "service_file", ""))).expanduser() log_path = Path(str(getattr(args, "log_file", ""))).expanduser() exec_start = _resolve_exec_start(str(getattr(args, "exec_start", "") or "")) @@ -119,14 +139,45 @@ def run_deploy(args: argparse.Namespace) -> int: openclaw_retries=retries_override, ) env_content = render_env_file(env_map) + + if _is_windows(): + launcher_path = Path(str(getattr(args, "launcher_file", ""))).expanduser() + startup_path = Path(str(getattr(args, "startup_file", ""))).expanduser() + launcher_content = render_windows_launcher( + env_file=env_path, + log_file=log_path, + exec_start=exec_start, + ) + startup_content = render_windows_startup_file(launcher_file=launcher_path) + + if bool(getattr(args, "dry_run", False)): + print(f"[DRY-RUN] would write {env_path}") + print(f"[DRY-RUN] would write {launcher_path}") + if not bool(getattr(args, "skip_register", False)): + print(f"[DRY-RUN] would write {startup_path}") + print(f"[DRY-RUN] preset={preset}") + print(f"[DRY-RUN] exec_start={exec_start}") + return 0 + + _write_text(env_path, env_content) + _write_text(launcher_path, launcher_content) + print(f"Wrote {env_path}") + print(f"Wrote {launcher_path}") + if not bool(getattr(args, "skip_register", False)): + _write_text(startup_path, startup_content) + print(f"Wrote {startup_path}") + + print("Running doctor checks...") + return run_doctor() + + service_path = Path(str(getattr(args, "service_file", ""))).expanduser() service_content = render_service_file( env_file=env_path, log_file=log_path, exec_start=exec_start, ) - dry_run = bool(getattr(args, "dry_run", False)) - if dry_run: + if bool(getattr(args, "dry_run", False)): print(f"[DRY-RUN] would write {env_path}") print(f"[DRY-RUN] would write {service_path}") print(f"[DRY-RUN] preset={preset}") @@ -158,6 +209,11 @@ def build_deploy_env( openclaw_agent: str, openclaw_retries: int | None, ) -> dict[str, str]: + status_file = ( + _default_windows_status_file() + if _is_windows() + else "%t/vibemouse-status.json" + ) base = { "VIBEMOUSE_BACKEND": "funasr_onnx", "VIBEMOUSE_DEVICE": "cpu", @@ -166,7 +222,7 @@ def build_deploy_env( "VIBEMOUSE_OPENCLAW_COMMAND": openclaw_command, "VIBEMOUSE_OPENCLAW_AGENT": openclaw_agent, "VIBEMOUSE_OPENCLAW_TIMEOUT_S": "20.0", - "VIBEMOUSE_STATUS_FILE": "%t/vibemouse-status.json", + "VIBEMOUSE_STATUS_FILE": status_file, } base.update(_PRESET_OVERRIDES[preset]) if openclaw_retries is not None: @@ -177,7 +233,7 @@ def build_deploy_env( def render_env_file(env_map: dict[str, str]) -> str: lines = [ "# Generated by `vibemouse deploy`.", - "# Edit values if needed, then: systemctl --user restart vibemouse.service", + "# Edit values if needed, then restart VibeMouse.", ] for key in sorted(env_map.keys()): lines.append(f"{key}={_quote_env_value(env_map[key])}") @@ -212,6 +268,56 @@ def render_service_file(*, env_file: Path, log_file: Path, exec_start: str) -> s return "\n".join(lines) +def render_windows_launcher(*, env_file: Path, log_file: Path, exec_start: str) -> str: + lines = [ + '$ErrorActionPreference = "Stop"', + f"$envFile = '{_ps_single_quote(str(env_file))}'", + f"$logFile = '{_ps_single_quote(str(log_file))}'", + "$commandLine = @'", + exec_start, + "'@.Trim()", + "", + "if (Test-Path $envFile) {", + " Get-Content $envFile | ForEach-Object {", + " $line = $_.Trim()", + ' if (-not $line -or $line.StartsWith("#")) { return }', + ' $parts = $line -split "=", 2', + " if ($parts.Length -ne 2) { return }", + " $name = $parts[0].Trim()", + " $value = $parts[1].Trim()", + ' if ($value.Length -ge 2 -and $value.StartsWith(\'"\') -and $value.EndsWith(\'"\')) {', + " $value = $value.Substring(1, $value.Length - 2)", + ' $value = $value.Replace(\'\\\\\', \'\\\')', + ' $value = $value.Replace(\'\\"\', \'"\')', + " }", + ' [System.Environment]::SetEnvironmentVariable($name, $value, "Process")', + " }", + "}", + "", + "$logDir = Split-Path -Parent $logFile", + 'New-Item -ItemType Directory -Path $logDir -Force | Out-Null', + '& cmd.exe /d /c "$commandLine >> `"$logFile`" 2>>&1"', + "exit $LASTEXITCODE", + "", + ] + return "\n".join(lines) + + +def render_windows_startup_file(*, launcher_file: Path) -> str: + launcher = str(launcher_file).replace('"', '""') + command = ( + 'powershell.exe -NoProfile -ExecutionPolicy Bypass -WindowStyle Hidden ' + + f'-File "{launcher}"' + ) + return "\n".join( + [ + 'Set shell = CreateObject("WScript.Shell")', + f'shell.Run "{command.replace(chr(34), chr(34) * 2)}", 0, False', + "", + ] + ) + + def _quote_env_value(value: str) -> str: escaped = value.replace("\\", "\\\\").replace('"', '\\"') return f'"{escaped}"' @@ -224,10 +330,10 @@ def _resolve_exec_start(raw_exec_start: str) -> str: vibemouse_bin = shutil.which("vibemouse") if vibemouse_bin: - return f"{vibemouse_bin} run" + return f'{_quote_shell_path(vibemouse_bin)} run' python_bin = sys.executable - return f"{python_bin} -m vibemouse.main run" + return f'{_quote_shell_path(python_bin)} -m vibemouse.main run' def _write_text(path: Path, content: str) -> None: @@ -266,3 +372,66 @@ def validate_openclaw_command(raw: str) -> bool: except ValueError: return False return bool(parts) + + +def _is_windows() -> bool: + return sys.platform.startswith("win") + + +def _default_env_file() -> Path: + if _is_windows(): + return _windows_roaming_dir() / "VibeMouse" / "deploy.env" + return Path.home() / ".config" / "vibemouse" / "deploy.env" + + +def _default_log_file() -> Path: + if _is_windows(): + return _windows_local_dir() / "VibeMouse" / "service.log" + return Path.home() / ".local" / "state" / "vibemouse" / "service.log" + + +def _default_windows_status_file() -> str: + return str(_windows_local_dir() / "VibeMouse" / "vibemouse-status.json") + + +def _default_windows_launcher_file() -> Path: + return _windows_roaming_dir() / "VibeMouse" / "vibemouse-launch.ps1" + + +def _default_windows_startup_file() -> Path: + return _windows_startup_dir() / "vibemouse.vbs" + + +def _windows_roaming_dir() -> Path: + appdata = os.getenv("APPDATA") + if appdata: + return Path(appdata) + return Path.home() / "AppData" / "Roaming" + + +def _windows_local_dir() -> Path: + localappdata = os.getenv("LOCALAPPDATA") + if localappdata: + return Path(localappdata) + return Path.home() / "AppData" / "Local" + + +def _windows_startup_dir() -> Path: + return ( + _windows_roaming_dir() + / "Microsoft" + / "Windows" + / "Start Menu" + / "Programs" + / "Startup" + ) + + +def _quote_shell_path(path: str) -> str: + if " " in path or "\t" in path: + return f'"{path}"' + return path + + +def _ps_single_quote(value: str) -> str: + return value.replace("'", "''") diff --git a/vibemouse/ops/doctor.py b/vibemouse/ops/doctor.py index 3cdbac1..84f0e7b 100644 --- a/vibemouse/ops/doctor.py +++ b/vibemouse/ops/doctor.py @@ -2,6 +2,7 @@ import importlib import json +import os import shlex import shutil import subprocess @@ -33,10 +34,14 @@ def run_doctor(*, apply_fixes: bool = False) -> int: checks.extend(_check_openclaw(config)) checks.append(_check_audio_input(config)) - checks.append(_check_input_device_permissions(config)) - - checks.append(_check_hyprland_return_bind_conflict(config)) - checks.append(_check_user_service_state()) + if sys.platform.startswith("win"): + checks.append(_check_windows_input_hooks()) + checks.append(_check_windows_startup_entry()) + checks.append(_check_windows_background_process()) + else: + checks.append(_check_input_device_permissions(config)) + checks.append(_check_hyprland_return_bind_conflict(config)) + checks.append(_check_user_service_state()) _print_checks(checks) @@ -47,6 +52,8 @@ def run_doctor(*, apply_fixes: bool = False) -> int: def _apply_doctor_fixes() -> None: + if sys.platform.startswith("win"): + return _fix_hyprland_return_bind_conflict() _ensure_user_service_active() @@ -346,6 +353,98 @@ def _check_audio_input(config: AppConfig | None) -> DoctorCheck: ) +def _check_windows_input_hooks() -> DoctorCheck: + failures: list[str] = [] + for module_name in ("pynput.mouse", "pynput.keyboard"): + try: + _ = importlib.import_module(module_name) + except Exception as error: + failures.append(f"{module_name}: {error}") + + if failures: + return DoctorCheck( + name="input-hooks", + status="fail", + detail="; ".join(failures), + ) + + return DoctorCheck( + name="input-hooks", + status="ok", + detail="pynput mouse and keyboard hooks import successfully", + ) + + +def _check_windows_startup_entry() -> DoctorCheck: + startup_file = _windows_startup_file() + launcher_file = _windows_launcher_file() + + if startup_file.exists() and launcher_file.exists(): + return DoctorCheck( + name="startup-entry", + status="ok", + detail=f"startup entry present: {startup_file}", + ) + + if launcher_file.exists(): + return DoctorCheck( + name="startup-entry", + status="warn", + detail=f"launcher exists but startup entry is missing: {startup_file}", + ) + + return DoctorCheck( + name="startup-entry", + status="warn", + detail=f"startup entry not found: {startup_file}", + ) + + +def _check_windows_background_process() -> DoctorCheck: + script = ( + "$matches = Get-CimInstance Win32_Process | " + "Where-Object { $_.ProcessId -ne $PID -and $_.CommandLine -and (" + "$_.CommandLine -like '*vibemouse.main run*' -or " + "$_.CommandLine -like '*vibemouse-launch.ps1*' -or " + "$_.CommandLine -like '*vibemouse.exe run*'" + ") } | " + "ForEach-Object { $_.CommandLine }; " + "$matches" + ) + probe = _run_subprocess( + ["powershell", "-NoProfile", "-Command", script], + timeout=8.0, + ) + if probe is None: + return DoctorCheck( + name="background-process", + status="warn", + detail="could not query running processes", + ) + + if probe.returncode != 0: + stderr = probe.stderr.strip() + return DoctorCheck( + name="background-process", + status="warn", + detail=stderr or "process query failed", + ) + + lines = [line.strip() for line in probe.stdout.splitlines() if line.strip()] + if lines: + return DoctorCheck( + name="background-process", + status="ok", + detail=f"detected {len(lines)} VibeMouse process(es)", + ) + + return DoctorCheck( + name="background-process", + status="warn", + detail="no running VibeMouse process detected", + ) + + def _check_input_device_permissions(config: AppConfig | None) -> DoctorCheck: if not sys.platform.startswith("linux"): return DoctorCheck( @@ -608,3 +707,26 @@ def _print_checks(checks: list[DoctorCheck]) -> None: "fail": "[FAIL]", }.get(check.status, "[INFO]") print(f"{badge} {check.name}: {check.detail}") + + +def _windows_roaming_dir() -> Path: + appdata = os.getenv("APPDATA") + if appdata: + return Path(appdata) + return Path.home() / "AppData" / "Roaming" + + +def _windows_startup_file() -> Path: + return ( + _windows_roaming_dir() + / "Microsoft" + / "Windows" + / "Start Menu" + / "Programs" + / "Startup" + / "vibemouse.vbs" + ) + + +def _windows_launcher_file() -> Path: + return _windows_roaming_dir() / "VibeMouse" / "vibemouse-launch.ps1" diff --git a/vibemouse/platform/system_integration.py b/vibemouse/platform/system_integration.py index 9d9f43c..47ffb3b 100644 --- a/vibemouse/platform/system_integration.py +++ b/vibemouse/platform/system_integration.py @@ -1,13 +1,18 @@ from __future__ import annotations +import ctypes import importlib import json import os import subprocess import sys +import time from collections.abc import Mapping +from pathlib import Path from typing import Protocol, cast +from ctypes import wintypes + _TERMINAL_CLASS_HINTS: set[str] = { "foot", @@ -29,6 +34,9 @@ "warp", "windowsterminal", "wt", + "cascadia_hosting_window_class", + "consolewindowclass", + "mintty", } _TERMINAL_TITLE_HINTS: set[str] = { @@ -39,6 +47,7 @@ "fish", "powershell", "cmd.exe", + "pwsh", } _BROWSER_CLASS_HINTS: set[str] = { @@ -56,16 +65,104 @@ "opera", "thorium", "browser", + "chrome_widgetwin_1", + "mozillawindowclass", +} + +_WINDOWS_TEXT_CONTROL_CLASSES: set[str] = { + "edit", + "richedit20a", + "richedit20w", + "richedit50w", + "scintilla", + "consolewindowclass", + "cascadia_hosting_window_class", } +if sys.platform.startswith("win"): + _USER32 = ctypes.WinDLL("user32", use_last_error=True) + _KERNEL32 = ctypes.WinDLL("kernel32", use_last_error=True) +else: + _USER32 = None + _KERNEL32 = None + + +class _Point(ctypes.Structure): + _fields_ = [ + ("x", wintypes.LONG), + ("y", wintypes.LONG), + ] + + +class _Rect(ctypes.Structure): + _fields_ = [ + ("left", wintypes.LONG), + ("top", wintypes.LONG), + ("right", wintypes.LONG), + ("bottom", wintypes.LONG), + ] + + +class _GuiThreadInfo(ctypes.Structure): + _fields_ = [ + ("cbSize", wintypes.DWORD), + ("flags", wintypes.DWORD), + ("hwndActive", wintypes.HWND), + ("hwndFocus", wintypes.HWND), + ("hwndCapture", wintypes.HWND), + ("hwndMenuOwner", wintypes.HWND), + ("hwndMoveSize", wintypes.HWND), + ("hwndCaret", wintypes.HWND), + ("rcCaret", _Rect), + ] + + +if _USER32 is not None and _KERNEL32 is not None: + _USER32.GetForegroundWindow.restype = wintypes.HWND + _USER32.GetWindowTextLengthW.argtypes = [wintypes.HWND] + _USER32.GetWindowTextLengthW.restype = ctypes.c_int + _USER32.GetWindowTextW.argtypes = [wintypes.HWND, wintypes.LPWSTR, ctypes.c_int] + _USER32.GetWindowTextW.restype = ctypes.c_int + _USER32.GetClassNameW.argtypes = [wintypes.HWND, wintypes.LPWSTR, ctypes.c_int] + _USER32.GetClassNameW.restype = ctypes.c_int + _USER32.GetWindowThreadProcessId.argtypes = [ + wintypes.HWND, + ctypes.POINTER(wintypes.DWORD), + ] + _USER32.GetWindowThreadProcessId.restype = wintypes.DWORD + _USER32.GetCursorPos.argtypes = [ctypes.POINTER(_Point)] + _USER32.GetCursorPos.restype = wintypes.BOOL + _USER32.SetCursorPos.argtypes = [ctypes.c_int, ctypes.c_int] + _USER32.SetCursorPos.restype = wintypes.BOOL + _USER32.GetGUIThreadInfo.argtypes = [ + wintypes.DWORD, + ctypes.POINTER(_GuiThreadInfo), + ] + _USER32.GetGUIThreadInfo.restype = wintypes.BOOL + + _KERNEL32.OpenProcess.argtypes = [wintypes.DWORD, wintypes.BOOL, wintypes.DWORD] + _KERNEL32.OpenProcess.restype = wintypes.HANDLE + _KERNEL32.QueryFullProcessImageNameW.argtypes = [ + wintypes.HANDLE, + wintypes.DWORD, + wintypes.LPWSTR, + ctypes.POINTER(wintypes.DWORD), + ] + _KERNEL32.QueryFullProcessImageNameW.restype = wintypes.BOOL + _KERNEL32.CloseHandle.argtypes = [wintypes.HANDLE] + _KERNEL32.CloseHandle.restype = wintypes.BOOL + + def is_terminal_window_payload(payload: Mapping[str, object]) -> bool: window_class = str(payload.get("class", "")).lower() initial_class = str(payload.get("initialClass", "")).lower() title = str(payload.get("title", "")).lower() + process = str(payload.get("process", "")).lower() if any( - hint in window_class or hint in initial_class for hint in _TERMINAL_CLASS_HINTS + hint in window_class or hint in initial_class or hint in process + for hint in _TERMINAL_CLASS_HINTS ): return True @@ -76,9 +173,11 @@ def is_browser_window_payload(payload: Mapping[str, object]) -> bool: window_class = str(payload.get("class", "")).lower() initial_class = str(payload.get("initialClass", "")).lower() title = str(payload.get("title", "")).lower() + process = str(payload.get("process", "")).lower() if any( - hint in window_class or hint in initial_class for hint in _BROWSER_CLASS_HINTS + hint in window_class or hint in initial_class or hint in process + for hint in _BROWSER_CLASS_HINTS ): return True @@ -149,6 +248,268 @@ def paste_shortcuts(self, *, terminal_active: bool) -> tuple[tuple[str, str], .. return () +class WindowsSystemIntegration: + def __init__(self) -> None: + self._shortcut_controller: object | None = None + self._shortcut_keys: object | None = None + + @property + def is_hyprland(self) -> bool: + return False + + def send_shortcut(self, *, mod: str, key: str) -> bool: + controller, key_holder = self._load_keyboard_backend() + if controller is None or key_holder is None: + return False + + modifiers: list[object] = [] + for token in mod.strip().upper().split(): + if not token: + continue + resolved = self._resolve_modifier(token, key_holder) + if resolved is None: + return False + modifiers.append(resolved) + + target = self._resolve_key(key, key_holder) + if target is None: + return False + + pressed: list[object] = [] + try: + for item in modifiers: + controller.press(item) + pressed.append(item) + controller.press(target) + pressed.append(target) + time.sleep(0.012) + return True + except Exception: + return False + finally: + for item in reversed(pressed): + try: + controller.release(item) + except Exception: + pass + + def active_window(self) -> dict[str, object] | None: + hwnd = self._foreground_window_handle() + if hwnd is None: + return None + + pid = self._window_process_id(hwnd) + payload: dict[str, object] = { + "class": self._window_class_name(hwnd), + "initialClass": self._window_class_name(hwnd), + "title": self._window_title(hwnd), + } + if pid is not None: + payload["pid"] = pid + process_name = self._process_name(pid) + if process_name: + payload["process"] = process_name + return payload + + def cursor_position(self) -> tuple[int, int] | None: + if _USER32 is None: + return None + point = _Point() + if not bool(_USER32.GetCursorPos(ctypes.byref(point))): + return None + return point.x, point.y + + def move_cursor(self, *, x: int, y: int) -> bool: + if _USER32 is None: + return False + return bool(_USER32.SetCursorPos(int(x), int(y))) + + def switch_workspace(self, direction: str) -> bool: + key = "Left" if direction == "left" else "Right" + return self.send_shortcut(mod="CTRL WIN", key=key) + + def is_text_input_focused(self) -> bool | None: + focus_hwnd = self._focused_window_handle() + if focus_hwnd is None: + return None + + class_name = self._window_class_name(focus_hwnd).lower() + payload = { + "class": class_name, + "initialClass": class_name, + "title": self._window_title(focus_hwnd).lower(), + } + if is_terminal_window_payload(payload): + return True + if class_name in _WINDOWS_TEXT_CONTROL_CLASSES: + return True + return "edit" in class_name or "textarea" in class_name + + def send_enter_via_accessibility(self) -> bool | None: + return None + + def is_terminal_window_active(self) -> bool | None: + payload = self.active_window() + if payload is None: + return False + return is_terminal_window_payload(payload) + + def paste_shortcuts(self, *, terminal_active: bool) -> tuple[tuple[str, str], ...]: + if terminal_active: + return ( + ("CTRL SHIFT", "V"), + ("SHIFT", "Insert"), + ("CTRL", "V"), + ) + return (("CTRL", "V"),) + + def _load_keyboard_backend(self) -> tuple[object | None, object | None]: + if self._shortcut_controller is not None and self._shortcut_keys is not None: + return self._shortcut_controller, self._shortcut_keys + + try: + keyboard_module = importlib.import_module("pynput.keyboard") + except Exception: + return None, None + + controller_ctor = getattr(keyboard_module, "Controller", None) + key_holder = getattr(keyboard_module, "Key", None) + if controller_ctor is None or key_holder is None: + return None, None + + try: + controller = controller_ctor() + except Exception: + return None, None + + self._shortcut_controller = cast(object, controller) + self._shortcut_keys = cast(object, key_holder) + return self._shortcut_controller, self._shortcut_keys + + @staticmethod + def _resolve_modifier(token: str, key_holder: object) -> object | None: + mapping = { + "CTRL": "ctrl", + "CONTROL": "ctrl", + "SHIFT": "shift", + "ALT": "alt", + "WIN": "cmd", + "META": "cmd", + "CMD": "cmd", + "SUPER": "cmd", + } + attr = mapping.get(token) + if attr is None: + return None + return getattr(key_holder, attr, None) + + @staticmethod + def _resolve_key(raw_key: str, key_holder: object) -> object | None: + normalized = raw_key.strip().upper() + special_mapping = { + "RETURN": "enter", + "ENTER": "enter", + "INSERT": "insert", + "LEFT": "left", + "RIGHT": "right", + "UP": "up", + "DOWN": "down", + "TAB": "tab", + "ESC": "esc", + "ESCAPE": "esc", + "SPACE": "space", + } + attr = special_mapping.get(normalized) + if attr is not None: + return getattr(key_holder, attr, None) + + if len(normalized) == 1: + return normalized.lower() + + if normalized.startswith("F") and normalized[1:].isdigit(): + return getattr(key_holder, normalized.lower(), None) + + return None + + @staticmethod + def _foreground_window_handle() -> int | None: + if _USER32 is None: + return None + hwnd = _USER32.GetForegroundWindow() + return int(hwnd) if hwnd else None + + @staticmethod + def _window_title(hwnd: int) -> str: + if _USER32 is None: + return "" + length = int(_USER32.GetWindowTextLengthW(hwnd)) + if length <= 0: + return "" + buffer = ctypes.create_unicode_buffer(length + 1) + _ = _USER32.GetWindowTextW(hwnd, buffer, len(buffer)) + return buffer.value + + @staticmethod + def _window_class_name(hwnd: int) -> str: + if _USER32 is None: + return "" + buffer = ctypes.create_unicode_buffer(256) + copied = int(_USER32.GetClassNameW(hwnd, buffer, len(buffer))) + if copied <= 0: + return "" + return buffer.value + + @staticmethod + def _window_process_id(hwnd: int | None) -> int | None: + if _USER32 is None or hwnd is None: + return None + pid = wintypes.DWORD(0) + _ = _USER32.GetWindowThreadProcessId(hwnd, ctypes.byref(pid)) + return int(pid.value) or None + + @staticmethod + def _process_name(pid: int | None) -> str | None: + if _KERNEL32 is None or pid is None: + return None + + process_handle = _KERNEL32.OpenProcess(0x1000, False, pid) + if not process_handle: + return None + try: + buffer = ctypes.create_unicode_buffer(32768) + size = wintypes.DWORD(len(buffer)) + ok = bool( + _KERNEL32.QueryFullProcessImageNameW( + process_handle, + 0, + buffer, + ctypes.byref(size), + ) + ) + if not ok: + return None + return Path(buffer.value).name + finally: + _ = _KERNEL32.CloseHandle(process_handle) + + def _focused_window_handle(self) -> int | None: + hwnd = self._foreground_window_handle() + if hwnd is None or _USER32 is None: + return hwnd + + thread_id = _USER32.GetWindowThreadProcessId(hwnd, None) + if not thread_id: + return hwnd + + info = _GuiThreadInfo() + info.cbSize = ctypes.sizeof(_GuiThreadInfo) + if not bool(_USER32.GetGUIThreadInfo(thread_id, ctypes.byref(info))): + return hwnd + + focus_hwnd = info.hwndFocus or info.hwndActive + return int(focus_hwnd) if focus_hwnd else hwnd + + class HyprlandSystemIntegration: @property def is_hyprland(self) -> bool: @@ -263,12 +624,17 @@ def create_system_integration( if detect_hyprland_session(env=env): return HyprlandSystemIntegration() - _ = platform_name if platform_name is not None else sys.platform + normalized_platform = platform_name if platform_name is not None else sys.platform + if normalized_platform.startswith("win"): + return WindowsSystemIntegration() return NoopSystemIntegration() def probe_text_input_focus_via_atspi(*, timeout_s: float = 1.5) -> bool: + if not sys.platform.startswith("linux"): + return False + script = ( "import gi\n" "gi.require_version('Atspi', '2.0')\n" @@ -303,6 +669,9 @@ def probe_text_input_focus_via_atspi(*, timeout_s: float = 1.5) -> bool: def load_atspi_module() -> object | None: + if not sys.platform.startswith("linux"): + return None + try: gi = importlib.import_module("gi") require_version = cast(_RequireVersionFn, getattr(gi, "require_version"))