-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbridge.py
More file actions
2027 lines (1793 loc) · 80.8 KB
/
Copy pathbridge.py
File metadata and controls
2027 lines (1793 loc) · 80.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
"""Claude Code Telegram Bridge
Thin relay: Telegram messages -> claude -p -> Telegram responses.
Claude Code is the brain. This script is just a phone line.
Conversation continuity: each forum topic (or DM chat) maintains its own
session ID so consecutive messages share context. Sessions auto-expire
after 3 days of inactivity. Use /clearnew to start a fresh session in the
current topic.
Forum topics: Enable "Topics" in your Telegram group settings. Each topic
becomes an independent Claude session, running in parallel.
"""
import asyncio
import json
import os
import select
import shutil
import signal
import subprocess
import sys
import time
from collections.abc import Callable
# When run as `python bridge.py`, this module is loaded as `__main__`.
# Submodules under `patchbay/commands/` do `import bridge` to reach handler
# helpers via `bridge.X` (so test monkeypatches against `bridge.X` are
# visible). Without this alias, that import would re-execute bridge.py as a
# second module named `bridge`, and the re-entry would hit this file's own
# `from patchbay.commands.lifecycle import cmd_cancel, ...` statement (further
# down in bridge.py) while lifecycle is still mid-import → ImportError.
# Make `__main__` and `bridge` the same module.
sys.modules.setdefault("bridge", sys.modules[__name__]) # noqa: E402
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
from typing import NamedTuple
from dotenv import load_dotenv
load_dotenv(Path(__file__).parent / ".env")
import telegramify_markdown # noqa: E402
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update # noqa: E402
from telegram.constants import ParseMode # noqa: E402
from telegram.error import ChatMigrated, Forbidden, RetryAfter # noqa: E402
from telegram.ext import ( # noqa: E402
Application,
CallbackQueryHandler,
CommandHandler,
ContextTypes,
MessageHandler,
filters,
)
from patchbay.markdown_config import configure_telegramify # noqa: E402
configure_telegramify()
# ---------------------------------------------------------------------------
# Import from package modules — these are the canonical implementations.
# Re-export at module level for backward compatibility with existing tests
# and validate.py.
# ---------------------------------------------------------------------------
from patchbay.config import ( # noqa: E402
ACTIVITY_LOG, # noqa: F401 — used by tests via bridge.ACTIVITY_LOG
ANSI_RE,
BOT_TOKEN,
CHAT_PROJECTS_FILE,
CLAUDE_PATH,
DEFAULT_HARNESS,
FORGE_QUEUE_DIR, # noqa: F401 — used by tests via bridge.FORGE_QUEUE_DIR
MAX_QUEUED_MESSAGES,
MAX_TIMEOUT,
MAX_TURNS,
MAX_WORKERS,
PA_PLUGIN_DIR,
PENDING_DIR,
DOC_DIR,
PHOTO_DIR,
QUOTA_HIT_PREFIX,
RESTART_DRAIN_TIMEOUT,
RESTART_NOTIFY_FILE,
SEND_RETRY_ATTEMPTS,
SEND_RETRY_BASE_DELAY,
SESSION_DIR, # noqa: F401 — used by tests via bridge.SESSION_DIR
SESSION_EXPIRY,
SESSION_KEY_RE,
SHUTDOWN_PROCESS_TIMEOUT,
STALL_POLL_INTERVAL,
STALL_TIMEOUT,
TELEGRAM_MSG_LIMIT,
TYPING_INTERVAL,
USAGE_WEEKLY_TOKEN_CAP,
VALID_HARNESSES,
WORKING_DIR,
logger,
)
from patchbay.sessions import ( # noqa: E402
PENDING_MAX_ATTEMPTS,
_sanitize_session_key, # noqa: F401 — used by tests via bridge._sanitize_session_key
_session_key,
archive_failed_pending,
bump_pending_attempts,
clear_pending,
clear_session,
consume_stall_kill,
get_session_id,
mark_stall_kill,
save_pending,
save_session_id,
)
from patchbay.harness import ( # noqa: E402
CAPABILITIES_BY_NAME,
ClaudeSdkHarness,
ToolUse, # noqa: F401 — re-exported for tests
TurnError,
TurnFinal,
TurnRequest,
)
from patchbay.parser import ( # noqa: E402
_parse_events, # noqa: F401 — re-exported for validate.py smoke tests
is_empty_success_response,
parse_claude_response,
)
from patchbay.quota import ( # noqa: E402
handoff_to_forge as _handoff_to_forge_impl,
is_quota_error as _is_quota_error_impl,
)
from patchbay.activity import log_activity # noqa: E402
from patchbay.file_send import extract_file_sentinels, send_files # noqa: E402
from patchbay.outbound import get_recent_outbound, log_outbound_response # noqa: E402
from patchbay.text_split import ( # noqa: E402
is_markdownv2_balanced,
split_for_telegram,
)
from patchbay.models import ( # noqa: E402
DEFAULT_MODEL,
VALID_MODELS,
extract_model_prefix,
get_chat_model,
resolve_model,
set_chat_model,
)
from patchbay.efforts import ( # noqa: E402
DEFAULT_EFFORT,
VALID_EFFORTS,
get_chat_effort,
resolve_effort,
set_chat_effort,
)
from patchbay.projects import ( # noqa: E402
_load_chat_projects,
_parse_project_entry,
get_all_projects as _get_all_projects,
get_chat_agent,
get_chat_harness,
get_chat_title,
get_chat_working_dir,
set_chat_harness,
set_chat_project,
set_chat_title,
)
# Backward-compatible names for functions that were renamed.
# Each alias binds the old underscore-prefixed name to the object imported
# from the patchbay package above, so both names refer to the same callable.
_is_quota_error = _is_quota_error_impl
_handoff_to_forge = _handoff_to_forge_impl
_log_activity = log_activity
# Module-level _ANSI_RE / _SESSION_KEY_RE for backward compat: same objects
# as the patchbay.config constants, under their old private names.
_ANSI_RE = ANSI_RE
_SESSION_KEY_RE = SESSION_KEY_RE
# Shared thread pool, sized by MAX_WORKERS from patchbay.config.
# NOTE(review): presumably the pool that sync `run_claude` calls are
# dispatched onto via `loop.run_in_executor` — confirm at the call site.
_executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)
# ---------------------------------------------------------------------------
# Per-session state
#
# Consolidates what used to be six parallel dicts keyed by session_key into a
# single SessionState object. The legacy module-level dicts below remain as
# the live data store during the migration; each subsequent commit will move
# one field from the legacy dicts onto SessionState until they can all be
# deleted. New code should read/write through `_get_session_state(key)`.
# ---------------------------------------------------------------------------
class QueuedMessage(NamedTuple):
    """A debounced queued message awaiting processing.

    Carries `pending_id` so the corresponding pending file (saved at queue
    time) can be cleared after the queued batch delivers, and so that
    SIGTERM-tearing-the-loop-down before drain leaves the file in place
    for replay_pending() on the next bridge start.
    """

    # Message text exactly as it was handed to `_claim_or_queue`.
    text: str
    # Id of the on-disk pending file for this message. `_claim_or_queue`
    # defaults this to "" when the caller does not supply one.
    pending_id: str
@dataclass
class SessionState:
    """All per-session runtime state, keyed by session_key in `_sessions`.

    Instances are created lazily by `_get_session_state`. Every field has a
    default, so a fresh `SessionState()` represents an idle session: no
    subprocess, no harness, an empty queue and `processing=False`.
    """

    # Subprocess harnesses (pi) only; mirrored by the harness's proc_setter.
    proc: subprocess.Popen | None = None
    # Whichever harness instance is driving the active turn.
    harness: object | None = None
    # Loop owned by _drive_harness_sync; needed for cross-loop cancel of
    # SDK-based harnesses.
    worker_loop: asyncio.AbstractEventLoop | None = None
    # time.time() when processing began; reset to None when the lane is released.
    started_at: float | None = None
    # Last time the stall detector saw activity.
    last_event_at: float | None = None
    # Debounced messages awaiting processing.
    queue: list[QueuedMessage] = field(default_factory=list)
    # True while a claude run is in flight for this key.
    processing: bool = False
    # Guards the check/claim/enqueue/release transitions on `processing` and
    # `queue` (see _claim_or_queue, _drain_next, _release_processing).
    lock: asyncio.Lock = field(default_factory=asyncio.Lock)
    # cc-sdk-mop: holds the MOP instance for the lifetime of the SDK client
    # so its in-process MCP tools and Stop-hook closure stay valid. Cleared
    # after the turn completes.
    mop: object | None = None
# Owned by patchbay.runtime so command handlers (in patchbay/commands/) can
# import the same dict without a circular dependency on bridge. The local
# alias keeps existing call sites unchanged.
from patchbay.runtime import sessions as _sessions # noqa: E402
def _get_session_state(key: str) -> SessionState:
    """Look up the SessionState registered under `key`, creating it on demand.

    A missing entry is replaced by a freshly constructed `SessionState`,
    which is stored in `_sessions` before being returned, so repeated calls
    with the same key hand back the same object.
    """
    existing = _sessions.get(key)
    if existing is not None:
        return existing

    # First time this key is seen: register an idle state for it.
    fresh = SessionState()
    _sessions[key] = fresh
    return fresh
def _iter_active_procs() -> list[tuple[str, subprocess.Popen]]:
    """List (session_key, proc) pairs for sessions that hold a subprocess handle.

    Only sessions whose `proc` field is set are reported. That covers the
    subprocess harnesses (pi); SDK-based sessions (cc-sdk), where the harness
    owns its own subprocess, never populate `proc` and so are absent. Callers
    that need the backend-agnostic view (e.g. /ping, stall detector) should
    use `_iter_active_sessions` instead.
    """
    with_proc = []
    for session_key, session in _sessions.items():
        if session.proc is not None:
            with_proc.append((session_key, session.proc))
    return with_proc
def _iter_active_sessions() -> list[tuple[str, "SessionState"]]:
    """List (session_key, state) pairs for every session currently running a turn.

    Harness-agnostic: used by /ping, the stall detector, /restart, and
    graceful shutdown so SDK turns are visible too.

    A session is reported when `state.proc` is set (subprocess harnesses)
    or `state.harness` is set (SDK harnesses); either one is enough.
    """
    running = []
    for session_key, session in _sessions.items():
        # Idle means *both* handles are absent; anything else is active.
        if session.proc is None and session.harness is None:
            continue
        running.append((session_key, session))
    return running
async def _interrupt_session_async(state: "SessionState") -> None:
    """Request a soft stop of the turn running in `state`.

    Subprocess harnesses (pi) receive SIGINT so they get a chance to finish
    cleanly; an OSError while signalling is ignored. SDK harnesses have no
    gentler signal available, so they are handed to `_cancel_session_async`
    and behave exactly like a hard cancel.
    """
    from signal import SIGINT as _SIGINT

    child = state.proc
    if child is None:
        # SDK harness: no finer-grained interrupt available — delegate to cancel.
        await _cancel_session_async(state)
        return

    try:
        child.send_signal(_SIGINT)
    except OSError:
        # Best effort: failing to deliver the signal is not an error here.
        pass
async def _cancel_session_async(state: "SessionState") -> None:
    """Hard cancel — SIGKILL for subprocess harnesses, task-cancel for SDK.

    Backend-agnostic dispatch:

    * subprocess harnesses (pi) — `state.proc` is set; SIGKILL via
      `proc.kill()`. Sync, immediate; the harness's drain returns and
      `_drive_harness_sync` exits naturally.
    * SDK harnesses (cc-sdk, cc-sdk-mop) — no subprocess handle the
      bridge can reach (the SDK owns it). Schedule `harness.cancel()`
      on the worker-thread loop captured in `state.worker_loop` via
      `run_coroutine_threadsafe`, await with a short timeout so a
      wedged loop can't hang us.

    Idempotent and silent on error — callers (cmd_kill, stall detector,
    shutdown) treat this as a fire-and-forget request. Returns None in
    every case; does nothing when neither a proc nor a harness+loop pair
    is present.
    """
    proc = state.proc
    if proc is not None:
        try:
            proc.kill()
        except OSError:
            pass
        return

    harness = state.harness
    loop = state.worker_loop
    if harness is None or loop is None:
        return

    cancel_coro = None
    try:
        cancel_coro = harness.cancel()
        future = asyncio.run_coroutine_threadsafe(cancel_coro, loop)
    except RuntimeError:
        # Worker loop closed between our read and the schedule — nothing to
        # do. Close the coroutine object explicitly so it is not reported as
        # "coroutine ... was never awaited" when it is garbage-collected.
        if cancel_coro is not None:
            cancel_coro.close()
        return

    try:
        await asyncio.wait_for(asyncio.wrap_future(future), timeout=5.0)
    except Exception:  # noqa: BLE001 — silence everything (incl. timeout) during cancel
        pass
async def _try_inflight_push(
    state: SessionState,
    text: str,
    session_key: str,
    label: str,
) -> bool:
    """Try to hand `text` to the live SDK client of the session's running turn.

    Returns True only when an in-flight cc-sdk turn accepted the message via
    `ClaudeSdkHarness.push()`. A False result tells the caller to use the
    regular claim/queue path instead; that happens when:

    * the session is not processing a turn, OR
    * there is no usable worker loop, or the active harness is not a
      ClaudeSdkHarness with a live client (cc-sdk-mop, pi, between-turn
      states), OR
    * scheduling or awaiting push() failed or timed out (5 s) — a broken
      inflight path must never swallow the message silently.

    `state.lock` is held only while the harness and loop references are
    read. The push itself runs off-lock because it crosses event loops via
    `run_coroutine_threadsafe` and may block briefly on the SDK transport
    write.
    """
    async with state.lock:
        if not state.processing:
            return False
        live_harness, target_loop = state.harness, state.worker_loop

    pushable = (
        target_loop is not None
        and not target_loop.is_closed()
        and isinstance(live_harness, ClaudeSdkHarness)
        and live_harness._live_client is not None
    )
    if not pushable:
        return False

    try:
        scheduled = asyncio.run_coroutine_threadsafe(live_harness.push(text), target_loop)
        accepted = await asyncio.wait_for(asyncio.wrap_future(scheduled), timeout=5.0)
    except Exception as exc:  # noqa: BLE001 — never let a failed push lose the message; bridge falls back to queue
        logger.warning(
            "Inflight push failed for %s (%s): %s — falling back to queue",
            session_key,
            label,
            exc,
        )
        return False

    if accepted:
        logger.info("Pushed %s inflight into running cc-sdk session for %s", label, session_key)
        _log_activity(
            f"{label}_pushed_inflight",
            session_key=session_key,
            text_len=len(text),
        )
    return bool(accepted)
async def _claim_or_queue(state: SessionState, text: str, pending_id: str = "") -> tuple[str, int | None]:
    """Take ownership of the session's processing lane, or enqueue the message.

    Possible results:
        ("claimed", None) — the caller now owns processing for this session;
                            `processing` is set and `started_at` stamped.
        ("queued", depth) — the message was appended; `depth` is the queue
                            length after the append.
        ("full", None)    — the queue already holds MAX_QUEUED_MESSAGES
                            entries; the caller should drop the message.

    The whole check+claim+enqueue sequence runs under `state.lock`, closing
    a debounce race where two messages arriving in the same event-loop tick
    could both see `processing` as False and stomp on each other.

    `pending_id` is the on-disk pending-file id for this message; it rides
    along in the queue entry so queued messages survive a SIGTERM-mid-drain —
    replay_pending() on the next bridge start re-runs anything still on disk.
    """
    async with state.lock:
        if state.processing:
            # Lane is busy: enqueue unless the backlog is already at capacity.
            backlog = state.queue
            if len(backlog) >= MAX_QUEUED_MESSAGES:
                return ("full", None)
            backlog.append(QueuedMessage(text=text, pending_id=pending_id))
            return ("queued", len(backlog))

        state.processing = True
        state.started_at = time.time()
        return ("claimed", None)
async def _drain_next(state: SessionState) -> list[QueuedMessage] | None:
    """Hand back the queued batch, or give up the processing lane.

    With messages waiting, the whole queue is returned and replaced by an
    empty list; the caller keeps ownership of processing. With nothing
    waiting, `processing` and `started_at` are cleared and None is returned.

    Both outcomes happen under `state.lock`. This is the only safe place to
    release ownership: otherwise a concurrent _claim_or_queue could see
    processing=True, queue a message, and then have it orphaned when the
    flag is cleared.
    """
    async with state.lock:
        if state.queue:
            batch, state.queue = state.queue, []
            return batch

        state.processing = False
        state.started_at = None
        return None
async def _release_processing(state: SessionState) -> None:
    """Unconditionally release the processing lane for `state`.

    Defensive cleanup for exception paths; the happy path releases through
    `_drain_next`. Clears `processing` and `started_at` under `state.lock`
    and leaves the queue untouched.
    """
    async with state.lock:
        state.processing, state.started_at = False, None
# Track remote-control process (only one at a time, keyed by session key).
# All three start out as None, meaning no remote-control process is active.
# NOTE(review): `_remote_proc` / `_remote_proc_key` are assigned outside the
# portion of the file reviewed here — confirm their writers before relying
# on them being set together.
_remote_proc: subprocess.Popen | None = None
_remote_proc_key: str | None = None
# Assigned by `_start_remote_drain` to the daemon thread draining stdout.
_remote_drain_thread: object | None = None  # threading.Thread when active
def _start_remote_drain(proc: subprocess.Popen) -> None:
    """Drain proc.stdout in a daemon thread for the rest of its life.

    Without this, `claude remote-control` can deadlock on a full stdout
    pipe (~64KB on macOS) once the initial-output capture window closes
    and nobody is reading anymore. The drain just discards lines —
    everything the user needs (connection info, etc.) was already
    captured during the initial 10s window. Audit §17.

    The stream is consumed by iterating it directly, which stops at EOF for
    both text-mode and binary-mode pipes. (A `readline` loop with a `""`
    sentinel never terminates on a binary pipe, because EOF there is `b""`.)
    If the process was started without a stdout pipe (`proc.stdout is None`)
    the thread exits immediately instead of raising.

    Args:
        proc: The running process whose stdout should be discarded.

    Side effects:
        Starts a daemon thread named "remote-control-drain" and stores it in
        the module-level `_remote_drain_thread`. The stream is closed once
        draining ends.
    """
    import threading

    global _remote_drain_thread

    def _drain() -> None:
        stream = proc.stdout
        if stream is None:
            # No pipe was attached — nothing can fill up, nothing to drain.
            return
        try:
            for _ in stream:
                pass
        except (OSError, ValueError):
            # Pipe broke or was closed underneath us; draining is best-effort.
            pass
        finally:
            try:
                stream.close()
            except OSError:
                pass

    t = threading.Thread(target=_drain, name="remote-control-drain", daemon=True)
    t.start()
    _remote_drain_thread = t
# Message debounce: batch messages that arrive while Claude is processing.
# NOTE(review): no debounce state is defined under this header; the queue of
# debounced messages is the `SessionState.queue` field. This line looks like
# a leftover section header — confirm and remove if so.
# Flag to block new messages during graceful shutdown
_shutting_down = False
# Bot instance (set in post_init)
_bot_instance = None
# Bridge's primary asyncio loop, captured in post_init. Used by the
# cc-sdk-mop deliver closure to schedule `bot.send_message` on the loop
# where the bot's httpx client was created — calling it from inside the
# cc-sdk-mop dispatch's temporary `asyncio.run(...)` loop poisons the bot.
# `run_claude` raises RuntimeError if the cc-sdk-mop path is taken while
# this is still None.
_main_loop: asyncio.AbstractEventLoop | None = None
# Captured once at runtime module import time — used by /health to report
# process uptime. Local alias preserves existing _BRIDGE_STARTED_AT call sites.
from patchbay.runtime import BRIDGE_STARTED_AT as _BRIDGE_STARTED_AT # noqa: E402
# ---------------------------------------------------------------------------
# Claude invocation
# ---------------------------------------------------------------------------
def _drive_harness_sync(
    harness,
    req: TurnRequest,
    state: "SessionState | None" = None,
) -> list:
    """Run one harness turn to completion from sync code and return its events.

    `run_claude` is sync (called via `loop.run_in_executor` by the
    orchestrator), while the harness exposes `run_turn(req)` as an async
    iterator. This helper bridges the two by starting a fresh event loop
    with `asyncio.run`, draining the iterator, and handing back everything
    it yielded as a list, in order. One loop per call is fine — harness work
    is dominated by the subprocess wait, not loop overhead.

    When `state` is given, the running loop and the harness instance are
    mirrored into `state.worker_loop` / `state.harness` for the duration of
    the turn. `_cancel_session_async` reads those fields to dispatch a
    cancel: subprocess harnesses are killed via `state.proc.kill()`, SDK
    harnesses via `run_coroutine_threadsafe(harness.cancel(),
    state.worker_loop)`. Both fields are reset to None in `finally`, whether
    the turn finishes or raises.
    """
    mirror = state is not None

    async def _collect() -> list:
        if mirror:
            state.worker_loop = asyncio.get_running_loop()
            state.harness = harness
        try:
            return [evt async for evt in harness.run_turn(req)]
        finally:
            if mirror:
                state.harness = None
                state.worker_loop = None

    return asyncio.run(_collect())
def run_claude(
message: str,
session_key: str,
_retry: bool = False,
model: str | None = None,
max_turns_override: int | None = None,
) -> str:
"""Invoke a coding-agent harness for one turn, translate events to a string.
max_turns_override: when set, replaces MAX_TURNS for this invocation
only. Used by the OOM self-heal path (see patchbay/self_heal.py) to
retry with a tighter turn budget after a kill.
"""
session_id = get_session_id(session_key)
chat_cwd = get_chat_working_dir(session_key)
projects = _load_chat_projects()
entry = projects.get(session_key)
rel_path, _ = _parse_project_entry(entry)
project_info = f"~/Developer/{rel_path}" if rel_path else "~/Developer (default)"
agent_name = get_chat_agent(session_key)
system_prompt = (
"The user is messaging you via Telegram from their phone. "
"You have full access to all your MCP tools and can do real work. "
"For email access, use the himalaya CLI: 'himalaya envelope list --account icloud' or '--account gmail' to list emails, "
"'himalaya message read <id> --account <account>' to read them. "
"IMPORTANT: NEVER use the AskUserQuestion tool - it requires interactive terminal UI that doesn't work through Telegram. "
"Instead, ask questions as plain text in your response and let the user reply naturally.\n\n"
"VOICE — replies are for a phone screen, not a desk:\n"
"1. Lead with substance. No opener ('I'll do X', 'Now let me Y') and no closer ('Want me to...', 'Let me know...').\n"
"2. Prefer lists over prose when itemizing. Use NUMBERS for ordered/sequential items, LETTERS (a, b, c) for unordered items the user might reference back. Avoid plain bullets (•/-) — the user can't cite them shorthand.\n"
"3. Each list item is one line with at most one parenthetical for crucial detail. No nested lists.\n"
"4. Plain-English noun phrases. No class names, SDK type names, or `module.symbol` paths unless that's literally what's being changed.\n"
"5. Flag inactions explicitly: 'Did NOT X — <one short reason>'. The user needs to know what's still on their plate.\n"
"6. Tests as raw counts only ('17 new, 692 total'). No narrative around them.\n"
"7. Length: roughly 60-80 words for multi-item status updates, 10-30 words for single facts. When uncertain, cut.\n"
"8. The user complains 'too much' when answers are long, never when short.\n\n"
f"TURN LIMIT: This session has a {MAX_TURNS}-turn limit. If a task will take more than ~20 tool calls, "
"decompose it: do the critical/unblocking work now, create beads for the remaining subtasks, "
"then report what you did and what's queued. Don't get cut off mid-task.\n\n"
"CRITICAL: Your FINAL output MUST be a text response to the user — never end on a tool call. "
"If you've done work via tools, summarize what you did in a short text message at the end. "
"Even one sentence is acceptable; total silence is the only true failure. "
"If you don't produce text, the bridge has to burn an extra Claude query asking you to "
"summarize, and the user sees '(Completed N turns…)' until that retry returns. "
"Bottom line: always close the conversation with at least a brief text reply.\n\n"
"TOOL STDOUT IS INVISIBLE TO THE USER: Anything printed by Bash, Python scripts, or other tools "
"goes to YOUR context only — never to Telegram. If you run a script that prints prototypes, "
"tables, mockups, or any content you want the user to see, you MUST inline that content verbatim "
"in your text response. Never write 'three shapes above', 'see output', 'here's the result', "
"or any phrase that implies the user can see what the tool printed. If the tool output is what "
"you're showing them, paste it into your text message. This failure mode has bitten us before — "
"when in doubt, inline it.\n\n"
"FORMATTING: Telegram renders your replies as MarkdownV2 (converted from standard markdown by the bridge). "
"Use normal markdown — `inline code`, ```code blocks```, **bold**, *italic*, and block quotes all render. "
"Tables are NOT supported by Telegram and will be rendered as a plain code block, so prefer numbered/lettered "
"lists (per the VOICE rules above) or ASCII-aligned columns inside a ``` code block for tabular data.\n\n"
"HEADLESS — You and the user are not co-located. The user is on a phone via Telegram; "
"you are on a remote machine with no GUI session, no display, no one to click dialogs or "
"watch a screen. Don't depend on interactive computer use — yours or the user's. Reason from "
"code, tests, and static analysis. If a task genuinely needs a runtime/GUI step (rare), stop "
"and say so rather than asking the user to do it on their phone.\n\n"
"SHARED FILES: There is a ProtonDrive folder synced to this machine. "
"Find it at ~/Library/CloudStorage/ProtonDrive-*/Claude-Support (glob for the exact path). "
"You can drop files there (documents, images, exports) for the user to access from any device. "
"Photos sent from Telegram are already handled separately via the photo handler.\n\n"
"SENDING FILES TO TELEGRAM: To attach a file directly to your reply (image, gpx, pdf, "
"anything), include a sentinel anywhere in your response text:\n"
" [[send-file: /absolute/path/to/file.ext]]\n"
" [[send-file: /absolute/path/to/photo.png | optional caption]]\n"
"Image MIMEs are sent inline via sendPhoto; everything else as a document. "
"Path must be absolute. Photos cap at 10MB, documents at 50MB. "
"The sentinel itself is stripped from the message — write it on its own line. "
"Use this instead of dropping into ProtonDrive when the user wants the file in-chat.\n\n"
f"Telegram session key: {session_key}\n"
f"Working directory: {project_info}\n"
f"Chat projects config: {CHAT_PROJECTS_FILE}\n"
"You can change your own project directory by editing chat_projects.json "
"(map session key to a path relative to ~/Developer). "
"After changing it, tell the user to run /clearnew to pick up the new cwd."
)
# Inject recent outbound notifications so the session knows what was sent
recent = get_recent_outbound(session_key, max_age=86400.0)
if recent:
lines = []
for entry in recent[-3:]: # last 3 messages max
ts = datetime.fromtimestamp(entry["ts"]).strftime("%H:%M")
src = entry.get("source", "?")
txt = entry["text"][:500]
lines.append(f" [{ts}] ({src}): {txt}")
system_prompt += (
"\n\nRECENT NOTIFICATIONS sent to this thread (the user may be replying to one of these):\n"
+ "\n".join(lines)
)
if agent_name:
system_prompt += (
f"\n\nAGENT MODE: You are the '{agent_name}' agent from Fanta. "
f"CLAUDE.md has the full loading order — follow it. "
f"Do NOT read other agents' files. You are ONLY the {agent_name} agent. "
f"Skip MEMORY.md in Telegram context (per Fanta conventions)."
)
# If the previous run was killed by the stall detector, warn this run
# against repeating whatever headless-unsafe command likely hung it.
stall_info = consume_stall_kill(session_key)
if stall_info:
idle_min = stall_info.get("idle_minutes", 0)
system_prompt += (
f"\n\nPRIOR RUN KILLED: Your previous invocation in this session was terminated by the "
f"stall detector after {idle_min:.0f} minutes of zero CPU with no output. The most likely "
"cause is that you ran a command requiring a macOS TCC/GUI permission dialog (XCUITest, "
"`xcodebuild test` with UI tests, `osascript` targeting a GUI app you hadn't pre-approved, "
"Simulator boot, Instruments, etc.). The user is on their phone — they cannot click the dialog. "
"Look at your last tool call in the conversation history, DO NOT RETRY IT, and pick a "
"headless-safe alternative (swift test, tuist build, unit tests, static analysis)."
)
# Resolve model / effort the same way the legacy path did.
if not model:
model = resolve_model(session_key)
effort = resolve_effort(session_key)
# Resolve harness: per-chat override > DEFAULT_HARNESS env. Every
# supported harness is dispatched directly — selection is logged on
# every activity entry under `harness=` and `harness_requested=` so
# cross-harness comparison stays possible.
harness_name = get_chat_harness(session_key) or DEFAULT_HARNESS
if harness_name not in VALID_HARNESSES:
logger.warning(
"Unknown harness %r for %s; falling back to %s",
harness_name,
session_key,
DEFAULT_HARNESS,
)
harness_name = DEFAULT_HARNESS
effective_harness = harness_name
if session_id:
logger.info("Resuming session %s for %s", session_id[:12], session_key)
logger.info(
"Launching claude in %s for %s (model=%s, effort=%s, harness=%s)",
chat_cwd,
session_key,
model or "default",
effort,
effective_harness,
)
invoke_start = time.time()
_log_activity(
"turn_invoke",
session_key=session_key,
cwd=chat_cwd,
model=model or "default",
effort=effort,
resume=bool(session_id),
harness=effective_harness,
harness_requested=harness_name,
)
state = _get_session_state(session_key)
state.last_event_at = time.time()
def _on_progress() -> None:
st = _sessions.get(session_key)
if st is not None:
st.last_event_at = time.time()
def _proc_setter(proc: subprocess.Popen | None) -> None:
st = _sessions.get(session_key)
if st is not None:
st.proc = proc
# ----- cc-sdk-mop dispatch (in-process MCP + Stop hook) -----
# MOP is wired into the SDK client itself: model output goes through
# `mcp__mop__submit_message`, which calls bot.send_message directly.
# Because MOP delivers the response itself, run_claude returns "" and
# the orchestrator's _send_response is a no-op (extract_file_sentinels
# short-circuits on empty). The MOP instance must outlive the SDK
# client so its Stop hook closure stays valid — pinned via state.mop
# and cleared in the finally block.
if effective_harness == "cc-sdk-mop":
from claude_agent_sdk import (
AssistantMessage,
ClaudeSDKClient,
ResultMessage,
SystemMessage,
TextBlock,
)
from patchbay.harness.claude_sdk_mop import ClaudeSdkMopHarness
# Parse session_key → chat_id, thread_id for build_options.
if "_" in session_key:
chat_id_str, thread_id_str = session_key.split("_", 1)
chat_id = int(chat_id_str)
thread_id = int(thread_id_str)
else:
chat_id = int(session_key)
thread_id = None
rules_dir_env = os.environ.get("MOP_RULES_DIR")
rules_dir = Path(rules_dir_env) if rules_dir_env else None
if _main_loop is None:
raise RuntimeError(
"cc-sdk-mop dispatch invoked before post_init captured "
"the bridge's main loop — bot would be poisoned. Bug."
)
harness = ClaudeSdkMopHarness()
options, mop = harness.build_options(
bot=_bot_instance,
chat_id=chat_id,
thread_id=thread_id,
main_loop=_main_loop,
rules_dir=rules_dir,
session_key=session_key,
)
# Resume the existing Claude session so each turn keeps history.
# build_options returns a generic ClaudeAgentOptions; the per-turn
# session_id is the bridge's responsibility to wire in.
if session_id:
options.resume = session_id
# Pin mop to session state so the GC doesn't reap its Stop-hook
# closure mid-turn.
state.mop = mop
captured_session_id: str | None = None
plain_text_fragments: list[str] = []
async def _drive_mop_session() -> None:
nonlocal captured_session_id
async with ClaudeSDKClient(options=options) as client:
await client.query(message)
async for msg in client.receive_response():
_on_progress()
if isinstance(msg, SystemMessage):
sid = msg.data.get("session_id") if isinstance(msg.data, dict) else None
if sid:
captured_session_id = sid
elif isinstance(msg, AssistantMessage):
for block in getattr(msg, "content", []) or []:
if isinstance(block, TextBlock):
txt = getattr(block, "text", None)
if txt:
plain_text_fragments.append(txt)
elif isinstance(msg, ResultMessage):
sid = getattr(msg, "session_id", None)
if sid:
captured_session_id = sid
return
try:
asyncio.run(_drive_mop_session())
finally:
state.mop = None
duration = time.time() - invoke_start
if captured_session_id:
save_session_id(session_key, captured_session_id)
# Safety net: if MOP delivered nothing this turn (model didn't call
# submit_message and the Stop hook didn't compel it), fall back to
# the model's plain TextBlock output so the user never gets silence.
# The deliver closure carries `delivery_count`, which is incremented each
# time MOP successfully sends something to Telegram; it is checked here
# AFTER the SDK loop returns. Any positive count means the user received
# something through MOP and we should NOT also send the plain text.
deliver_closure = getattr(mop, "_patchbay_deliver", None)
raw_count = getattr(deliver_closure, "delivery_count", 0) if deliver_closure else 0
delivery_count = raw_count if isinstance(raw_count, int) else 0
delivered = delivery_count > 0
fallback_text = ""
if not delivered and plain_text_fragments:
fallback_text = "\n\n".join(s.strip() for s in plain_text_fragments if s.strip())
if fallback_text:
logger.warning(
"cc-sdk-mop fallback: MOP did not deliver but model produced %d chars of plain text — sending as regular reply",
len(fallback_text),
)
_log_activity(
"turn_complete",
session_key=session_key,
duration=duration,
elapsed_ms=int(duration * 1000),
turns_used=None,
exit_code=0,
response_len=len(fallback_text),
harness=effective_harness,
mop_delivery_count=delivery_count,
fallback=bool(fallback_text),
)
# MOP delivered via Telegram itself — empty string suppresses the
# orchestrator's redundant _send_response. If MOP delivered nothing
# we return the plain text so _send_response sends it.
return fallback_text
if effective_harness == "cc-sdk":
# cc-sdk owns its subprocess internally — there's no Popen handle
# for the bridge to mirror, so /kill / stall detector / shutdown
# route through `harness.cancel()` via _cancel_session_async
# instead. `state.proc` stays None for the duration of this turn.
harness = ClaudeSdkHarness(
cli_path=CLAUDE_PATH,
max_timeout_seconds=MAX_TIMEOUT,
on_progress=_on_progress,
max_turns_default=(
max_turns_override if max_turns_override is not None else MAX_TURNS
),
)
elif effective_harness == "pi":
# Pi (badlogicgames/pi) — multi-model coding agent. Uses its own
# session storage (~/.pi/agent/sessions). Subprocess-based, so
# proc_setter mirrors into state.proc for /kill / stall.
from patchbay.harness import PiHarness
harness = PiHarness(
max_timeout_seconds=MAX_TIMEOUT,
on_progress=_on_progress,
proc_setter=_proc_setter,
)
else:
raise RuntimeError(
f"Unhandled harness {effective_harness!r} after validation. Bug."
)
req = TurnRequest(
prompt=message,
session_key=session_key,
project_dir=Path(chat_cwd),
system_prompt=system_prompt,
resume_session_id=session_id,
model=model,
effort=effort,
allowed_tools=None,
disallowed_tools=["AskUserQuestion", "EnterPlanMode", "ExitPlanMode"],
max_turns=(max_turns_override if max_turns_override is not None else MAX_TURNS),
plugin_dir=PA_PLUGIN_DIR,
extra=None,
)
try:
events = _drive_harness_sync(harness, req, state)
except (asyncio.CancelledError, KeyboardInterrupt):
# /kill cancels the in-flight task; CancelledError unwinds out of
# asyncio.run() in _drive_harness_sync. Log a terminal activity event
# so /soak and the activity log see the kill — without this branch
# the turn shows only `turn_invoke` + `process_kill` and looks
# indistinguishable from a wedge. cmd_kill already replied to the
# user and cleared state.processing; we re-raise so the orchestrator
# skips its own send.
duration = time.time() - invoke_start
_log_activity(
"turn_cancelled",
session_key=session_key,
duration=duration,
elapsed_ms=int(duration * 1000),
harness=effective_harness,
)
raise
finally:
st = _sessions.get(session_key)
if st is not None:
st.proc = None
st.last_event_at = None
# Belt-and-suspenders: _drive_harness_sync clears these in its
# own finally too, but a hard exception out of asyncio.run could
# in principle leave them stale.
st.harness = None
st.worker_loop = None
duration = time.time() - invoke_start
final = events[-1] if events else None
# Failure path — TurnError. Each kind maps to an existing recovery branch.
if isinstance(final, TurnError):
if final.kind == "timeout":
_log_activity(
"turn_timeout",
session_key=session_key,
duration=duration,
elapsed_ms=int(duration * 1000),
harness=effective_harness,
)
return (
f"[Timed out after {MAX_TIMEOUT // 60} min] "
"Session preserved — send your message again to resume."
)
if final.kind == "corrupt_session" and not _retry:
stale = final.metadata.get("stale_session_id") or session_id
logger.warning(
"Stale session %s for %s, retrying fresh",
(stale or "?")[:12],
session_key,
)
clear_session(session_key)
return run_claude(message, session_key, _retry=True)
if final.kind == "oom" and not _retry:
from patchbay.self_heal import (
OOM_RETRY_MAX_TURNS,
OOM_RETRY_PROMPT_TRIM,
dispatch_repair,
)
rc = final.metadata.get("exit_code", 0)
result = dispatch_repair(
"claude_oom_137",
{"session_key": session_key, "returncode": rc},
)
if result.fixed:
logger.warning(
"OOM kill (rc=%d) for %s, retrying with max_turns=%d trimmed_prompt=%dch",
rc,
session_key,
OOM_RETRY_MAX_TURNS,
OOM_RETRY_PROMPT_TRIM,
)
return run_claude(
message[:OOM_RETRY_PROMPT_TRIM],
session_key,
_retry=True,
model=model,
max_turns_override=OOM_RETRY_MAX_TURNS,
)
if final.kind == "rate_limit":
stderr = final.metadata.get("stderr", "")
source = "stderr" if stderr else "events+stderr"
if stderr:
logger.warning(
"Quota/rate limit detected (no output) for %s: %s",
session_key,
stderr[:200],
)
else:
logger.warning("Quota/rate limit detected for %s", session_key)
_log_activity(
"quota_hit",
session_key=session_key,
duration=duration,
source=source,
harness=effective_harness,
)
return QUOTA_HIT_PREFIX + message
if final.kind == "max_turns":
sess_id = final.metadata.get("session_id")
if sess_id:
save_session_id(session_key, sess_id)
num_turns = final.metadata.get("num_turns")
_log_activity(
"turn_complete",
session_key=session_key,
duration=duration,
elapsed_ms=int(duration * 1000),
turns_used=num_turns,
exit_code=0,
response_len=len(final.message),
harness=effective_harness,
)
return final.message
# unknown / process_died — surface what we have.
rc = final.metadata.get("exit_code", 0)
stderr = final.metadata.get("stderr", "")
if rc and stderr:
logger.warning(
"Claude exited %d for %s. stderr: %s",