Skip to content

Commit eae77cb

Browse files
committed
Stabilize runtime dock and headless streaming
- move headless live output out of chat bubbles into dock ticker/inspector - align runtime dock labels, ticker cache, and hidden-tab stream recovery - persist actor.activity state transitions and fix WeCom media decrypt portability
1 parent 6e65551 commit eae77cb

35 files changed

Lines changed: 2225 additions & 1701 deletions

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ authors = [
1414
license = "Apache-2.0"
1515
keywords = ["orchestrator","ai","rfd","pair","collaboration"]
1616
dependencies = [
17+
"cryptography>=41.0.0,<47.0",
1718
"PyYAML>=6.0,<7.0",
1819
"dingtalk-stream>=0.24.3",
1920
"fastapi>=0.110,<1.0",

src/cccc/contracts/v1/event.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
"actor.stop",
3131
"actor.restart",
3232
"actor.remove",
33+
"actor.activity",
3334
"context.sync",
3435
"chat.message",
3536
"chat.stream",
@@ -167,6 +168,13 @@ class ActorLifecycleData(BaseModel):
167168
model_config = ConfigDict(extra="forbid")
168169

169170

171+
class ActorActivityData(BaseModel):
172+
"""Periodic runtime status snapshot for running actors."""
173+
actors: List[Dict[str, Any]] = Field(default_factory=list)
174+
175+
model_config = ConfigDict(extra="allow")
176+
177+
170178
class ContextSyncData(BaseModel):
171179
version: str = ""
172180
changes: List[Dict[str, Any]] = Field(default_factory=list)
@@ -242,6 +250,7 @@ class Event(BaseModel):
242250
"actor.stop": ActorLifecycleData,
243251
"actor.restart": ActorLifecycleData,
244252
"actor.remove": ActorLifecycleData,
253+
"actor.activity": ActorActivityData,
245254
"context.sync": ContextSyncData,
246255
"chat.message": ChatMessageData,
247256
"chat.stream": ChatStreamData,

src/cccc/daemon/serve_ops.py

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,7 @@ def start_actor_activity_thread(
293293

294294
def _actor_activity_loop() -> None:
295295
interval = max(1.0, float(interval_seconds or 1.0))
296+
prev_runtime_by_group: Dict[str, Dict[str, Dict[str, Any]]] = {}
296297
while not stop_event.is_set():
297298
try:
298299
groups_base = home / "groups"
@@ -377,15 +378,72 @@ def _actor_activity_loop() -> None:
377378
actors_data.append(payload)
378379
actors_snapshot[aid] = payload
379380
replace_group_runtime(gid, actors_snapshot)
381+
382+
# Broadcast running actors in-memory (daemon IPC).
380383
if actors_data:
381-
event_broadcaster.publish({
384+
activity_event = {
382385
"id": uuid.uuid4().hex,
383386
"kind": "actor.activity",
384387
"ts": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
385388
"group_id": gid,
386389
"by": "system",
387390
"data": {"actors": actors_data},
388-
})
391+
}
392+
event_broadcaster.publish(activity_event)
393+
394+
# Detect working-state transitions so the web port
395+
# (which tails the ledger file) also receives the
396+
# update. Without this, actor.activity only flows
397+
# through the in-memory EventBroadcaster and never
398+
# reaches the web frontend.
399+
prev_snapshot = prev_runtime_by_group.get(gid, {})
400+
state_changed = len(actors_snapshot) != len(prev_snapshot)
401+
if not state_changed:
402+
for aid, payload in actors_snapshot.items():
403+
prev_actor = prev_snapshot.get(aid)
404+
if prev_actor is None or payload.get("effective_working_state") != prev_actor.get("effective_working_state"):
405+
state_changed = True
406+
break
407+
# Emit "stopped" entries for actors that disappeared
408+
# (crashed/stopped since last tick) so the web
409+
# frontend can clear stale "working" halos.
410+
stopped_entries: list[Dict[str, Any]] = []
411+
for prev_aid, prev_actor in prev_snapshot.items():
412+
if prev_aid not in actors_snapshot:
413+
prev_runner = str(prev_actor.get("runner_effective") or "pty").strip() or "pty"
414+
state_changed = True
415+
stopped_entries.append({
416+
"id": prev_aid,
417+
"running": False,
418+
"runner_effective": prev_runner,
419+
"idle_seconds": None,
420+
"effective_working_state": "stopped",
421+
"effective_working_reason": "runner_not_running",
422+
"effective_working_updated_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
423+
"effective_active_task_id": None,
424+
})
425+
prev_runtime_by_group[gid] = {
426+
aid: {
427+
"effective_working_state": p.get("effective_working_state"),
428+
"runner_effective": p.get("runner_effective"),
429+
}
430+
for aid, p in actors_snapshot.items()
431+
}
432+
if state_changed:
433+
ledger_actors = actors_data + stopped_entries
434+
if ledger_actors:
435+
try:
436+
from ..kernel.ledger import append_event
437+
append_event(
438+
group.ledger_path,
439+
kind="actor.activity",
440+
group_id=gid,
441+
scope_key="",
442+
by="system",
443+
data={"actors": ledger_actors},
444+
)
445+
except Exception:
446+
pass
389447
except Exception as e:
390448
_log_loop_error("actor_activity_tick failed", e)
391449
stop_event.wait(interval)

src/cccc/ports/im/adapters/wecom.py

Lines changed: 6 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import mimetypes
1919
import os
2020
import random
21-
import subprocess
2221
import threading
2322
import time
2423
import urllib.parse
@@ -28,6 +27,8 @@
2827
from pathlib import Path
2928
from typing import Any, Dict, List, Optional
3029

30+
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
31+
3132
from .base import IMAdapter, OutboundStreamHandle
3233

3334
# WeCom API limits
@@ -47,7 +48,6 @@
4748
# Keep the latest callback handle per chat for the lifetime of this bridge
4849
# process. We only need a bounded cache, not a time-based expiry.
4950
REPLY_REF_MAX_ENTRIES = 256
50-
WECOM_MEDIA_CIPHER = "aes-256-cbc"
5151
WECOM_MEDIA_BLOCK_SIZE = 32
5252

5353

@@ -249,30 +249,12 @@ def _strip_pkcs7_padding(self, raw: bytes, block_size: int = WECOM_MEDIA_BLOCK_S
249249
def _decrypt_media_bytes(self, encrypted: bytes, aes_key: str) -> bytes:
250250
key = self._decode_media_aes_key(aes_key)
251251
iv = key[:16]
252-
cmd = [
253-
"openssl",
254-
"enc",
255-
"-d",
256-
f"-{WECOM_MEDIA_CIPHER}",
257-
"-nopad",
258-
"-K",
259-
key.hex(),
260-
"-iv",
261-
iv.hex(),
262-
]
263252
try:
264-
proc = subprocess.run(
265-
cmd,
266-
input=encrypted,
267-
capture_output=True,
268-
check=False,
269-
)
253+
decryptor = Cipher(algorithms.AES(key), modes.CBC(iv)).decryptor()
254+
decrypted = decryptor.update(encrypted) + decryptor.finalize()
270255
except Exception as e:
271-
raise ValueError(f"wecom media decrypt command failed: {e}") from e
272-
if proc.returncode != 0:
273-
stderr = proc.stderr.decode("utf-8", errors="replace").strip()
274-
raise ValueError(f"wecom media decrypt failed: {stderr or f'openssl exit {proc.returncode}'}")
275-
return self._strip_pkcs7_padding(proc.stdout)
256+
raise ValueError(f"wecom media decrypt failed: {e}") from e
257+
return self._strip_pkcs7_padding(decrypted)
276258

277259
def _upload_media(self, raw: bytes, filename: str, media_type: str) -> str:
278260
boundary = "----cccc" + uuid.uuid4().hex

tests/test_codex_app_flow.py

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1539,6 +1539,168 @@ def publish(self, event: dict) -> None:
15391539
finally:
15401540
cleanup()
15411541

1542+
def test_actor_activity_thread_writes_ledger_on_state_change(self) -> None:
1543+
"""actor.activity should be written to ledger on working-state transitions."""
1544+
from cccc.daemon.serve_ops import start_actor_activity_thread
1545+
from cccc.kernel.actors import add_actor
1546+
from cccc.kernel.group import create_group, load_group
1547+
from cccc.kernel.registry import load_registry
1548+
1549+
home, cleanup = self._with_home()
1550+
try:
1551+
reg = load_registry()
1552+
created = create_group(reg, title="codex-ledger-activity", topic="")
1553+
group = load_group(created.group_id)
1554+
self.assertIsNotNone(group)
1555+
add_actor(group, actor_id="peer1", title="Peer 1", runtime="codex", runner="headless") # type: ignore[arg-type]
1556+
group.save() # type: ignore[union-attr]
1557+
1558+
# Re-load to get fresh ledger_path
1559+
group = load_group(created.group_id)
1560+
self.assertIsNotNone(group)
1561+
ledger_path = group.ledger_path # type: ignore[union-attr]
1562+
1563+
class _Broadcaster:
1564+
def publish(self, event: dict) -> None:
1565+
pass
1566+
1567+
status_holder = {"status": "working", "running": True}
1568+
1569+
class _CodexSupervisor:
1570+
@staticmethod
1571+
def get_state(group_id: str, actor_id: str) -> dict:
1572+
return {
1573+
"group_id": group_id,
1574+
"actor_id": actor_id,
1575+
"status": status_holder["status"],
1576+
"current_task_id": "turn-1",
1577+
"updated_at": "2026-04-02T10:00:00Z",
1578+
}
1579+
1580+
@staticmethod
1581+
def actor_running(_group_id: str, _actor_id: str) -> bool:
1582+
return bool(status_holder.get("running", True))
1583+
1584+
stop_event = threading.Event()
1585+
thread = start_actor_activity_thread(
1586+
stop_event=stop_event,
1587+
home=Path(home),
1588+
pty_supervisor=object(),
1589+
headless_supervisor=object(),
1590+
codex_supervisor=_CodexSupervisor(),
1591+
event_broadcaster=_Broadcaster(),
1592+
load_group=load_group,
1593+
interval_seconds=1.0,
1594+
)
1595+
try:
1596+
# First tick runs immediately: new actor → state_changed → writes to ledger
1597+
time.sleep(0.25)
1598+
# Verify ledger has actor.activity
1599+
import json
1600+
lines = ledger_path.read_text(encoding="utf-8").strip().split("\n")
1601+
activity_lines = [json.loads(line) for line in lines if '"actor.activity"' in line]
1602+
self.assertTrue(activity_lines, "First tick should write actor.activity to ledger")
1603+
self.assertEqual(activity_lines[-1]["data"]["actors"][0]["effective_working_state"], "working")
1604+
1605+
initial_count = len(activity_lines)
1606+
# Wait another tick (>1s interval) — no state change → no new ledger write
1607+
time.sleep(1.3)
1608+
lines2 = ledger_path.read_text(encoding="utf-8").strip().split("\n")
1609+
activity_lines2 = [json.loads(line) for line in lines2 if '"actor.activity"' in line]
1610+
self.assertEqual(len(activity_lines2), initial_count, "No state change should not add ledger entries")
1611+
1612+
# Change state: working → idle → should write to ledger on next tick
1613+
status_holder["status"] = "idle"
1614+
time.sleep(1.3)
1615+
lines3 = ledger_path.read_text(encoding="utf-8").strip().split("\n")
1616+
activity_lines3 = [json.loads(line) for line in lines3 if '"actor.activity"' in line]
1617+
self.assertGreater(len(activity_lines3), initial_count, "State change should add ledger entry")
1618+
self.assertEqual(activity_lines3[-1]["data"]["actors"][0]["effective_working_state"], "idle")
1619+
1620+
# Simulate actor stopping (actor_running returns False)
1621+
idle_count = len(activity_lines3)
1622+
status_holder["running"] = False
1623+
time.sleep(1.3)
1624+
lines4 = ledger_path.read_text(encoding="utf-8").strip().split("\n")
1625+
activity_lines4 = [json.loads(line) for line in lines4 if '"actor.activity"' in line]
1626+
self.assertGreater(len(activity_lines4), idle_count, "Actor stop should add ledger entry")
1627+
last_event = activity_lines4[-1]
1628+
stopped_actors = [a for a in last_event["data"]["actors"] if a["id"] == "peer1"]
1629+
self.assertEqual(len(stopped_actors), 1, "Stopped actor should appear in event")
1630+
self.assertEqual(stopped_actors[0]["effective_working_state"], "stopped")
1631+
self.assertFalse(stopped_actors[0]["running"])
1632+
finally:
1633+
stop_event.set()
1634+
thread.join(timeout=1.0)
1635+
finally:
1636+
cleanup()
1637+
1638+
def test_actor_activity_thread_preserves_runner_on_stopped_entry(self) -> None:
1639+
from cccc.daemon.serve_ops import start_actor_activity_thread
1640+
from cccc.kernel.actors import add_actor
1641+
from cccc.kernel.group import create_group, load_group
1642+
from cccc.kernel.registry import load_registry
1643+
1644+
home, cleanup = self._with_home()
1645+
try:
1646+
reg = load_registry()
1647+
created = create_group(reg, title="pty-ledger-activity", topic="")
1648+
group = load_group(created.group_id)
1649+
self.assertIsNotNone(group)
1650+
add_actor(group, actor_id="peer1", title="Peer 1", runtime="codex", runner="pty") # type: ignore[arg-type]
1651+
group.save() # type: ignore[union-attr]
1652+
1653+
group = load_group(created.group_id)
1654+
self.assertIsNotNone(group)
1655+
ledger_path = group.ledger_path # type: ignore[union-attr]
1656+
status_holder = {"running": True}
1657+
1658+
class _PtySupervisor:
1659+
@staticmethod
1660+
def actor_running(_group_id: str, _actor_id: str) -> bool:
1661+
return bool(status_holder.get("running", True))
1662+
1663+
@staticmethod
1664+
def idle_seconds(*, group_id: str, actor_id: str) -> float:
1665+
return 0.0
1666+
1667+
@staticmethod
1668+
def terminal_override(*, group_id: str, actor_id: str):
1669+
return None
1670+
1671+
class _Broadcaster:
1672+
def publish(self, event: dict) -> None:
1673+
pass
1674+
1675+
stop_event = threading.Event()
1676+
thread = start_actor_activity_thread(
1677+
stop_event=stop_event,
1678+
home=Path(home),
1679+
pty_supervisor=_PtySupervisor(),
1680+
headless_supervisor=object(),
1681+
codex_supervisor=object(),
1682+
event_broadcaster=_Broadcaster(),
1683+
load_group=load_group,
1684+
interval_seconds=1.0,
1685+
)
1686+
try:
1687+
time.sleep(0.25)
1688+
status_holder["running"] = False
1689+
time.sleep(1.3)
1690+
1691+
lines = ledger_path.read_text(encoding="utf-8").strip().split("\n")
1692+
activity_lines = [json.loads(line) for line in lines if '"actor.activity"' in line]
1693+
self.assertTrue(activity_lines, "Actor stop should write actor.activity to ledger")
1694+
stopped_actors = [a for a in activity_lines[-1]["data"]["actors"] if a["id"] == "peer1"]
1695+
self.assertEqual(len(stopped_actors), 1)
1696+
self.assertEqual(stopped_actors[0]["effective_working_state"], "stopped")
1697+
self.assertEqual(stopped_actors[0]["runner_effective"], "pty")
1698+
finally:
1699+
stop_event.set()
1700+
thread.join(timeout=1.0)
1701+
finally:
1702+
cleanup()
1703+
15421704
def test_codex_session_persists_headless_state_file(self) -> None:
15431705
from cccc.daemon.codex_app_sessions import CodexAppSession
15441706
from cccc.daemon.runner_state_ops import headless_state_path

tests/test_wecom_adapter_core.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""Tests for WecomAdapter core functionality (Steps 7-13)."""
22

3+
import subprocess
34
import sys
45
import tempfile
56
import threading
@@ -8,6 +9,16 @@
89
from pathlib import Path
910
from unittest.mock import patch
1011

12+
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
13+
14+
15+
def _encrypt_wecom_media_for_test(raw: bytes, aes_key: str) -> bytes:
16+
key = aes_key.encode("utf-8")
17+
pad = 32 - (len(raw) % 32)
18+
padded = raw + bytes([pad]) * pad
19+
encryptor = Cipher(algorithms.AES(key), modes.CBC(key[:16])).encryptor()
20+
return encryptor.update(padded) + encryptor.finalize()
21+
1122

1223
class TestWecomAuthFrames(unittest.TestCase):
1324
"""WeCom AI Bot auth now uses bot_id + secret over WebSocket."""
@@ -736,6 +747,25 @@ def fake_urlopen(req, timeout=0):
736747
self.assertEqual(raw, b"plain-bytes")
737748
mock_decrypt.assert_called_once_with(b"encrypted-bytes", "12345678901234567890123456789012")
738749

750+
def test_download_attachment_decrypts_direct_url_without_external_openssl(self):
751+
adapter = self._make_adapter()
752+
aes_key = "12345678901234567890123456789012"
753+
encrypted = _encrypt_wecom_media_for_test(b"plain attachment bytes", aes_key)
754+
755+
def fake_urlopen(req, timeout=0):
756+
self.assertEqual(req.full_url, "https://example.test/media.enc")
757+
self.assertEqual(timeout, 60)
758+
return _FakeHttpResponse(encrypted)
759+
760+
with patch("cccc.ports.im.adapters.wecom.urllib.request.urlopen", side_effect=fake_urlopen):
761+
with patch.object(subprocess, "run", side_effect=AssertionError("external openssl should not be used")):
762+
raw = adapter.download_attachment({
763+
"download_url": "https://example.test/media.enc",
764+
"aeskey": aes_key,
765+
})
766+
767+
self.assertEqual(raw, b"plain attachment bytes")
768+
739769
def test_download_attachment_uses_direct_url_without_decrypt_when_no_aeskey(self):
740770
adapter = self._make_adapter()
741771

0 commit comments

Comments
 (0)