diff --git a/CHANGELOG.md b/CHANGELOG.md index 8ff35a5..5fe24c7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,27 @@ # Changelog +## [0.4.1] - 2026-06-09 + +### Added +- FTS5 token filter: fixed `detail=column` issue where pure-digit/alpha tokens were parsed as column names (port from Opprime) +- Knowledge auto-retrieval: record_hit() on every auto-search hit for cache warmth +- _recorded_tc: track total tool calls per loop execution for SkillOpt gate integration +- _build_gmem_summary: standalone helper for archive_store compression summary construction +- tools/remember_info.py: unified memory route — auto-classifies user content into Knowledge/Notes/Experience +- tools/archive_search.py: Archive Store search tool for cross-session memory recall + +### Changed +- kernel.py: merged Opprime improvements (FTS5 safety filter + Knowledge hit recording + loop tc tracking) +- session.py: synchronized with Opprime (fixes for L1 compaction type mismatch, image_url filtering) +- experience.py: synchronized with Opprime (improved JSON error tolerance) +- storage.py: synchronized with Opprime (aging mechanism for Knowledge entries) +- archive_store.py: generalized agent-specific DB fallback paths to `~/gbase-home/` (public-ready) + +### Fixed +- knowledge.py: FTS5 MATCH returning 0 matches due to unpurged detail=column token pollution +- session.py: content null leading to DeepSeek serde enum variant mismatch (400 error) +- session.py: L1 compression type mismatch (passing str instead of list[dict]) + ## [0.4.0] - 2026-06-02 ### Added @@ -14,6 +36,18 @@ - L4 permanent notes: auto-serialize when tool calls >= 5 or reply >= 500 chars - 20+ new tools: self_edit, feishu_send, note_tool, knowledge, mirror_tool, chain, mail, security_watch, etc. +### Added +- ArchiveStore: full-text conversation archive with LIKE-based semantic retrieval (replaces LLM compression) +- Session: removed 3-layer LLM compression, replaced with ArchiveStore append/search +- Territory safety: cross-agent read/write access control (write blocking, read warning) +- RSI Dual-Knob: task intent classification → dynamic temperature control +- Time-decay weighted retrieval: M3 sparse attention inspired (7d full / 7-30d linear / 30d+ exponential) +- Entity conflict detection: Cosmos 3 inspired, auto-detect contradictions on write +- Hot query cache (LRU, max 64) for high-frequency entity lookups +- Archive trash recovery: deleted entries saved to `data/archive_trash/` as grep-able JSONL +- L4 permanent notes: auto-serialize when tool calls >= 5 or reply >= 500 chars +- 20+ new tools: self_edit, feishu_send, note_tool, knowledge, mirror_tool, chain, mail, security_watch, etc. + ### Changed - kernel.py: archive_store init + semantic bridge search/save (disabled old online LLM compression) - session.py: replaced compress_l1/l2/async_compress with archive-driven context building diff --git a/lib/archive_store.py b/lib/archive_store.py index 6c7b7a6..bf368fe 100644 --- a/lib/archive_store.py +++ b/lib/archive_store.py @@ -22,7 +22,6 @@ hits = store.search("query keywords") """ -import contextlib import json import logging import os @@ -136,7 +135,10 @@ def append(self, role: str, content: str | list | dict, *, priority: int = 0, so if not content: return - content = json.dumps(content, ensure_ascii=False) if isinstance(content, (list, dict)) else str(content) + if isinstance(content, (list, dict)): + content = json.dumps(content, ensure_ascii=False) + else: + content = str(content) if len(content) > _MAX_CONTENT_CHARS: content = content[:_MAX_CONTENT_CHARS] + "..." @@ -520,7 +522,7 @@ def search(self, query: str, top_k: int = _DEFAULT_SEARCH_TOP_K, params: list = [self.session_key] # 关键词条件 - kw_conditions = " OR ".join("content LIKE ? COLLATE NOCASE" for _ in keywords) + kw_conditions = " OR ".join(f"content LIKE ? COLLATE NOCASE" for _ in keywords) where_parts.append(f"({kw_conditions})") params.extend(f"%{k}%" for k in keywords) @@ -552,7 +554,7 @@ def search(self, query: str, top_k: int = _DEFAULT_SEARCH_TOP_K, # 计算每条记录的命中关键词数(作为粗糙的 BM25 替代) scored = [] - for content, role, ts, priority, _source_id, _eid in all_rows: + for content, role, ts, priority, source_id, eid in all_rows: if not content: continue hits = sum(1 for kw in keywords if kw in content or kw.lower() in content.lower()) @@ -632,11 +634,11 @@ def _extract_keywords(self, query: str) -> list[str]: "这样", "那样", "可能", "需要", "之后", "之前", "现在", "我们", "他们", "你们", "自己", "一些", "这些", "那些", "谢谢", "你好", "请问", "好的", "是的", "知道", "觉得", - "然后", "或者", "除了", "不想", "想要", "打算", - "看到", "听说", "告诉", "我的", "你的", "他的", + "然后", "或者", "还是", "除了", "不想", "想要", "打算", + "看到", "听说", "觉得", "告诉", "我的", "你的", "他的", "大家", "东西", "时候", "不错", "真的", "非常", "很多", "工作", "生活", "事情", "感觉", "方面", "一点", "一定", - "还有", "出来", + "还有", "因为", "出来", } keywords = [k for k in keywords if k not in _COMMON_BIGRAMS] @@ -726,8 +728,10 @@ def close(self): self.flush() def __del__(self): - with contextlib.suppress(Exception): + try: self.close() + except Exception: + pass # ── 旧数据迁移 ───────────────────────────────────── @@ -765,6 +769,58 @@ def _save_trash(session_key: str, rows: list[tuple]): logger.warning("归档Write失败(不影响主流程): %s", e) +def recent_global(limit: int = 10, hours: int = 72) -> dict: + """跨 session 获取最近 N 小时的全局 markers(Phase 4 学用对接)。 + + 不限制 session_key,只按时间过滤。 + 用于 session 预热时注入同主题历史。 + """ + import sqlite3, time, datetime + + # 找 archive.db(尝试多个位置) + candidates = [ + os.path.join(os.path.dirname(__file__), "..", "data", "archive.db"), + os.path.expanduser("~/gbase-home/data/archive.db"), + ] + db_path = None + for c in candidates: + p = os.path.abspath(c) + if os.path.exists(p): + db_path = p + break + if not db_path: + return {"markers": [], "count": 0, "db": None} + + cutoff_ts = time.time() - hours * 3600 + + with _LOCK: + try: + conn = sqlite3.connect(db_path) + cursor = conn.execute( + "SELECT marker, timestamp, session_key FROM archive_markers " + "WHERE timestamp >= ? " + "ORDER BY timestamp DESC LIMIT ?", + (cutoff_ts, limit), + ) + rows = cursor.fetchall() + conn.close() + except Exception: + return {"markers": [], "count": 0, "db": db_path} + + result = [] + for marker, ts, skey in rows: + dt = datetime.datetime.fromtimestamp(ts) + skey_short = skey.split(":")[-1][:20] if skey else "" + result.append({ + "marker": marker[:120], + "timestamp": ts, + "time_str": dt.strftime("%m-%d %H:%M"), + "session": skey_short, + }) + + return {"markers": result, "count": len(result), "db": db_path} + + def _copy_old_data(dat_db_path: str, archive_db_path: str): """从 dat.db 导入旧 experience/knowledge 数据到 archive.db(一次性)。""" if not os.path.exists(dat_db_path): @@ -779,7 +835,7 @@ def _copy_old_data(dat_db_path: str, archive_db_path: str): cursor = conn.cursor() # 从 entries table 找 experience 和 knowledge - for tbl, _pri in [("entries", 1)]: + for tbl, pri in [("entries", 1)]: try: cursor.execute(f"SELECT content, type FROM {tbl} WHERE content IS NOT NULL AND content != ''") for content, typ in cursor.fetchall(): diff --git a/lib/experience.py b/lib/experience.py index e010abf..fdccd19 100644 --- a/lib/experience.py +++ b/lib/experience.py @@ -40,19 +40,14 @@ "summary": "此次任务工具调用次数偏多({tool_calls_count}次),下次同类任务应该先规划再调工具", "confidence": "medium", }, - { - "name": "short_reply", - "check": lambda ctx: len(ctx.get("reply", "")) < 80, - "summary": "回复长度偏短({reply_len}字),下次应尽量提供更完整的回答", - "confidence": "low", - }, + { "name": "api_error", "check": lambda ctx: ctx.get("has_api_error", False), "summary": "工具调用时有 API 错误,下次应注意检查工具是否可用", "confidence": "high", }, - # ── 反脆弱: 失败尝试也写入经验,不静默Rollback ── + # ── 反脆弱: 失败尝试也写入经验,不静默回滚 ── { "name": "failed_action", "check": lambda ctx: bool(ctx.get("has_failure", False)), @@ -62,16 +57,16 @@ { "name": "failed_rollback", "check": lambda ctx: bool(ctx.get("rollback_occurred", False)), - "summary": "执行Rollback: [{rollback_action}] 验证失败,已Rollback。这条路走不通。", + "summary": "执行回滚: [{rollback_action}] 验证失败,已回滚。这条路走不通。", "confidence": "medium", }, # ── 反脆弱: 成功模式提炼(成功比失败更需要分析)── { "name": "success_pattern", - "check": lambda ctx: ctx.get("tool_calls_count", 0) > 0 + "check": lambda ctx: ctx.get("tool_calls_count", 0) >= 3 and not ctx.get("has_api_error", False) and not ctx.get("has_failure", False), - "summary": "成功完成[{task_theme}],工具调用{successful_calls}次。有效策略:{effective_strategy}", + "summary": "有效模式: [{task_theme}] 用 {tool_calls_count} 次工具调用完成", "confidence": "medium", }, ] @@ -150,11 +145,11 @@ def _is_duplicate_rule(storage: "store_module.Storage", rule_name: str) -> bool: """ -# ── Experience extraction器 ────────────────────────────────────────── +# ── 经验提取器 ────────────────────────────────────────── class ExperienceEngine: - """Experience Engine。绑定到一个 Storage 实例上运作。""" + """经验引擎。绑定到一个 Storage 实例上运作。""" def __init__(self, storage: store_module.Storage): self.storage = storage @@ -204,10 +199,10 @@ async def extract( if _is_duplicate_rule(self.storage, rule_name): self._skip_count[rule_name] = self._skip_count.get(rule_name, 0) + 1 - logger.debug("Experience deduplication跳过: rule=%s (已跳过%d次)", rule_name, self._skip_count[rule_name]) + logger.debug("经验去重跳过: rule=%s (已跳过%d次)", rule_name, self._skip_count[rule_name]) return - logger.info("Experience extraction(规则): %s", rule_result["summary"][:60]) + logger.info("经验提取(规则): %s", rule_result["summary"][:60]) # --- 如果是成功完成任务自动刻入 insight --- if tool_calls_count > 0 and not has_api_error and rule_result["type"] != "insight" and not has_failure: _record_success_insight(self, user_message, tool_calls_count) @@ -247,7 +242,7 @@ async def extract( try: await self._llm_extract(context, llm_client) except Exception as e: - logger.warning("Experience extraction(LLM)失败: %s", e) + logger.warning("经验提取(LLM)失败: %s", e) async def _llm_extract(self, context: dict, client): """元认知反思提取 — 从「发生了什么」升级到「为什么发生、如何避免、什么条件下该用不同策略」。 @@ -270,10 +265,27 @@ async def _llm_extract(self, context: dict, client): ) text = response.choices[0].message.content.strip() if text == "null" or not text: - logger.debug("Experience extraction(LLM): 无有价值教训") + logger.debug("经验提取(LLM): 无有价值教训") + return + + # 类型防御:LLM 可能返回不完整 JSON(被截断的末尾) + is_clean = False + for try_idx in range(3): + try: + result = json.loads(text) + is_clean = True + break + except json.JSONDecodeError: + # 尝试找到最晚的完整 JSON 截止点 + last_brace = text.rfind("}") + if last_brace > 0: + text = text[:last_brace + 1] + else: + break + if not is_clean: + logger.warning("经验提取(LLM): JSON 解析失败,跳过") return - result = json.loads(text) if "summary" in result: # 构建结构化 entry summary = result["summary"][:200] @@ -316,19 +328,19 @@ async def _llm_extract(self, context: dict, client): except Exception: pass - logger.info("Experience extraction(元认知反思): %s", summary[:60]) + logger.info("经验提取(元认知反思): %s", summary[:60]) # --- 自动刻入 insight(成功任务不留空洞) --- if context.get("tool_calls_count", 0) > 0 and not context.get("has_api_error", False): _record_success_insight(self, context.get("user_message", ""), context["tool_calls_count"]) except (json.JSONDecodeError, KeyError) as e: - logger.debug("Experience extraction(LLM)解析失败: %s | 原始响应: %s", e, text[:200] if 'text' in dir() else "N/A") + logger.debug("经验提取(LLM)解析失败: %s | 原始响应: %s", e, text[:200] if 'text' in dir() else "N/A") except Exception as e: - logger.debug("Experience extraction(LLM)异常: %s", e) + logger.debug("经验提取(LLM)异常: %s", e) def search(self, query: str, limit: int = 5) -> list[dict]: - """搜索经验库。优先 FTS5 全文Search,无结果时回退 LIKE 模糊匹配。 + """搜索经验库。优先 FTS5 全文检索,无结果时回退 LIKE 模糊匹配。 排序逻辑: - 先按 BM25 相关性分 + 内容长度惩罚(太长降级) diff --git a/lib/kernel.py b/lib/kernel.py index 59aa526..a852cd4 100644 --- a/lib/kernel.py +++ b/lib/kernel.py @@ -466,6 +466,14 @@ def _build_dynamic_system_prompt(self) -> str: # 中文多字词,拆单字也加进去 for _ch in _w: _fts_tokens.append(f"{_ch}*") + # FTS5 detail=column 下纯数字/纯单字母 token 会被解析为 column name + _fts_tokens = [t for t in _fts_tokens + if not _import_re.match(r'^\d+$', t) + and not _import_re.match(r'^[a-zA-Z]$', t) + and len(t) > 1] + # 过滤后保底:至少保留原始词保证有查询内容 + if not _fts_tokens: + _fts_tokens = [f"{_w}*" for _w in _words if len(_w) > 1] _fts_query = " OR ".join(_fts_tokens)[:500] _results = [] with _storage._lock: @@ -503,6 +511,12 @@ def _build_dynamic_system_prompt(self) -> str: + "\n".join(_results) ) parts.append(_know_text) + # GMem Phase 1A1: 自动检索命中后 record_hit + for _hit_r in _rows: + try: + _storage.record_hit(_hit_r[0]) + except Exception: + logger.exception("记录 hit 失败 (id=%s)", _hit_r[0]) logger.info("Knowledge 自动Search: 命中 %d 条", len(_results)) else: logger.info("Knowledge 自动Search: 无命中") @@ -1711,7 +1725,28 @@ async def _run_one_tool(tc): if session: session.append(tr) + # SkillOpt: 记录本轮工具调用数 + self._recorded_tc = getattr(self, "_recorded_tc", 0) + len(msg.tool_calls) + # 递归至多 15 层 if depth + 1 >= MAX_TOOL_DEPTH: return await self._loop(messages, tools, depth=depth + 1, session=session) return await self._loop(messages, tools, depth=depth + 1, session=session) + + +# ── GMem Phase B1: 构建压缩摘要(供 archive_store 存档) ── +def _build_gmem_summary(stats: dict, session) -> str: + """从 session 统计信息构建压缩摘要文本。""" + try: + parts = [f"上下文压缩 checkpoint — 消息数: {stats.get('messages', 0)}, 压缩次数: {stats.get('compactions', 0)}, 层级: {session.get_compaction_level() if hasattr(session, 'get_compaction_level') else 0}"] + # 尝试获取最后几条会话摘要 + if hasattr(session, "get_all_compactions"): + compactions = session.get_all_compactions() + for c in compactions[-3:]: + if isinstance(c, str): + parts.append(f" · {c[:200]}") + elif isinstance(c, dict): + parts.append(f" · {c.get('summary', str(c)[:200])}") + return "\n".join(parts) + except Exception: + return "" diff --git a/lib/session.py b/lib/session.py index 3e08ab5..a94e95b 100644 --- a/lib/session.py +++ b/lib/session.py @@ -2,50 +2,52 @@ """ gbase/lib/session.py -Session management: append-only JSONL implementation. -Never physically delete old entries, navigate via compression markers. +Session 管理:append-only JSONL 实现。 +永不物理删除旧条目,通过压缩路标跳转。 -Three-layer context compression system (simplified version of Claude Code's 5-layer compression): -- L1: Real-time online compression - Generate summary with LLM when conversation exceeds threshold -- L2: Multi-layer summary evolution - Merge multiple compactions into higher-level summaries -- L3: Session state tracking - Dynamic compression threshold + context usage statistics +三层上下文压缩体系(Claude Code 五层压缩的简化版): +- L1: 在线实时压缩 — 对话超过阈值时用 LLM 生成摘要 +- L2: 多层摘要进化 — 多个 compaction 合并为更高级摘要 +- L3: 会话状态追踪 — 动态压缩阈值 + 上下文使用量统计 """ +import asyncio import json import logging +import threading import time from pathlib import Path logger = logging.getLogger(__name__) class JsonlSessionManager: - """Append-only JSONL Session Manager with three-layer compression capability.""" + """Append-only JSONL 会话管理器,带三层压缩能力。""" def __init__(self, filepath: str, max_context: int = 100): self.filepath = Path(filepath) self.filepath.parent.mkdir(parents=True, exist_ok=True) self.max_context = max_context - self._adaptive_max = max_context # L3: Dynamic threshold adjustment + self._adaptive_max = max_context # L3: 动态阈值调节 self.fh: object | None = None self._stats = {"messages": 0, "compactions": 0, "tokens_estimate": 0} - self._compacted_up_to = 0 # Compression marker - self._compaction_level = 0 # L2: Current summary level (number of merge compressions) + self._compacted_up_to = 0 # 压缩路标 + self._compaction_level = 0 # L2: 当前摘要层级(第几次合并压缩) self._open() def _open(self): - """Open or create JSONL file.""" + """打开或创建 JSONL 文件。""" if self.fh: try: if hasattr(self.fh, "close"): self.fh.close() except Exception: - logger.exception("Silent exception") + logger.exception("静默异常") self.fh = open(self.filepath, "a+", encoding="utf-8") def _update_adaptive_max(self): - """L3: Dynamically adjust context retention rounds based on compression level.""" - # After each layer of compression, the number of retained rounds decreases, but not below the minimum + """L3: 根据压缩层级动态调节上下文保留轮次。""" + # 每层压缩后,保留的轮次缩小,但不低于底线 base = self.max_context level = self._compaction_level if level <= 0: @@ -55,16 +57,32 @@ def _update_adaptive_max(self): elif level == 2: self._adaptive_max = max(8, base - 8) else: - self._adaptive_max = 50 # Level 3 and above, retain at least 3 rounds (6 messages) + self._adaptive_max = 50 # 第三层及以上,至少保留 3 轮(6 条消息) @staticmethod - def _estimate_tokens(text: str) -> int: - """Roughly estimate token count. + def _estimate_tokens(text: str | list | dict) -> int: + """粗略估算 token 数。支持 string / list[dict] / dict 类型。 - Chinese approx 1.5 chars/token, English approx 4 chars/token, plus safety margin. + 中文约 1.5 chars/token,英文约 4 chars/token,加安全边际。 """ if not text: return 0 + # 处理多模态消息(list[dict],含 text/image_url) + if isinstance(text, list): + total = 0 + for item in text: + if isinstance(item, dict): + for v in item.values(): + if isinstance(v, str): + total += len(v) + elif isinstance(v, dict): + total += len(str(v)) + elif isinstance(item, str): + total += len(item) + return int(total * 0.35) + 10 + if isinstance(text, dict): + flat = str(text) + return int(len(flat) * 0.35) + 10 chinese_chars = sum(1 for c in text if '\u4e00' <= c <= '\u9fff') other_chars = len(text) - chinese_chars return int(chinese_chars * 1.5 + other_chars / 4) + 10 @@ -79,7 +97,7 @@ def get_adaptive_max(self) -> int: return self._adaptive_max def append(self, entry: dict) -> int: - """Append a record. entry is a message dictionary, must contain role field.""" + """追加一条记录。entry 是消息字典,必须包含 role 字段。""" entry["_id"] = int(time.time() * 1000) entry["_ts"] = time.time() role = entry.get("role", "unknown") @@ -100,12 +118,12 @@ def append(self, entry: dict) -> int: return entry["_id"] def append_batch(self, entries: list[dict]): - """Batch append.""" + """批量追加。""" for e in entries: self.append(e) def append_user_message(self, content: str, extra: dict | None = None) -> int: - """Shortcut: Append a user message.""" + """快捷:追加一条用户消息。""" entry = {"role": "user", "content": content} if extra: entry.update(extra) @@ -115,17 +133,17 @@ def get_or_create(self, session_key: str) -> "JsonlSessionManager": return self def build_context(self, max_messages: int | None = None, max_tokens: int = 0) -> list[dict]: - """Build LLM messages context. + """构建 LLM messages 上下文。 - Three-layer filtering: - 1. Compaction entry skips old content, injects summary (multi-layer: only highest level summary is injected) - 2. Remove tool_call / tool_result - 3. Compress by round + retain last max_messages rounds + 三层过滤: + 1. compaction entry 跳过旧内容,注入摘要(多层:只有最高层摘要注入) + 2. 去掉 tool_call / tool_result + 3. 按轮压缩 + 保留最近 max_messages 轮 - If max_tokens > 0, accumulate tokens from back to front, truncate front content when exceeding. + 如果 max_tokens > 0,从后往前累计 token,超出则截断前面的内容。 - L2 multi-layer summary: If there are multiple compaction levels, - Only the highest level summary is injected into the context. + L2 多层摘要:如果有多个 compaction level, + 只有最高层的摘要被注入到上下文。 """ if max_messages is None: max_messages = self._adaptive_max @@ -133,8 +151,8 @@ def build_context(self, max_messages: int | None = None, max_tokens: int = 0) -> messages: list[dict] = [] current_assistant_buf: dict | None = None skipped_compacted = False - highest_summary = "" # L2: Highest level summary (for injection) - highest_entry = None # L2: Highest level complete entry (for structured field usage) + highest_summary = "" # L2: 最高层摘要(用于注入) + highest_entry = None # L2: 最高层完整 entry(结构化字段使用) highest_level = -1 try: @@ -171,11 +189,41 @@ def build_context(self, max_messages: int | None = None, max_tokens: int = 0) -> if current_assistant_buf is not None: messages.append(current_assistant_buf) current_assistant_buf = None - msg = {"role": "user", "content": entry.get("content", "")} + # 过滤 image_url 结构:部分模型(如 DeepSeek)不支持多模态 content + _raw_content = entry.get("content", "") or "" + if isinstance(_raw_content, list): + # list[dict] 格式(含 text/image_url)转纯文本段 + _text_parts = [] + for _item in _raw_content: + if isinstance(_item, dict): + if _item.get("type") == "text": + _text_parts.append(_item.get("text", "")) + elif _item.get("type") == "image_url": + _text_parts.append("[图片]") + else: + _text_parts.append(str(_item)) + else: + _text_parts.append(str(_item)) + _raw_content = "\n".join(_text_parts) + msg = {"role": "user", "content": _raw_content} messages.append(msg) elif entry_type == "assistant": - msg = {"role": "assistant", "content": entry.get("content", "")} + _raw_content = entry.get("content", "") or "" + if isinstance(_raw_content, list): + _text_parts = [] + for _item in _raw_content: + if isinstance(_item, dict): + if _item.get("type") == "text": + _text_parts.append(_item.get("text", "")) + elif _item.get("type") == "image_url": + _text_parts.append("[图片]") + else: + _text_parts.append(str(_item)) + else: + _text_parts.append(str(_item)) + _raw_content = "\n".join(_text_parts) + msg = {"role": "assistant", "content": _raw_content} if "reasoning_content" in entry: msg["reasoning_content"] = entry["reasoning_content"] # 始终保留 assistant 消息(包括 content="" 只有 tool_calls 的情况), @@ -271,7 +319,7 @@ def build_context(self, max_messages: int | None = None, max_tokens: int = 0) -> # 保留最近 max_messages 轮 # 但强制保留最后 1 轮完整 user+assistant(防止 LLM 忘记自己刚说过什么) if len(messages) > max_messages: - # 截断前Save最后完整的 user+assistant 对 + # 截断前保存最后完整的 user+assistant 对 keep = [] for m in reversed(messages): keep.insert(0, m) @@ -289,11 +337,11 @@ def build_context(self, max_messages: int | None = None, max_tokens: int = 0) -> def get_compaction_context(self, max_messages: int = 15) -> list[dict]: """L2: 获取压缩阶段的高层摘要 + 近期轮次。 - + 不同于 build_context(给 LLM 用),这个方法返回: - 所有层级的摘要列表(不是只取最高层) - 最新 max_messages 轮对话 - + 用于 L2 多层压缩:把旧摘要 + 近期对话 → 新摘要。 """ summaries: list[dict] = [] @@ -330,19 +378,122 @@ def get_compaction_context(self, max_messages: int = 15) -> list[dict]: if etype in ("user", "assistant"): recent.append({ "role": entry.get("role", etype), - "content": entry.get("content", ""), + "content": entry.get("content", "") or "", }) except Exception: - logger.exception("Silent exception") + logger.exception("静默异常") return {"summaries": summaries, "recent": recent[-max_messages:]} + def compress( + self, + compress_fn: callable, + threshold: int = 15, + ) -> dict | None: + """L1 + L2: 同步版上下文压缩(供 asyncio.to_thread 调用)。 + + 参数: + compress_fn: 接收消息列表,返回摘要文本的回调 + threshold: 触发 L1 压缩的最小消息轮次(默认 15 轮) + + 返回: + 压缩统计信息,超时时返回 None + """ + # L1: 在线实时压缩 — 对话超过阈值时用 LLM 生成摘要 + try: + context_data = self.get_compaction_context(threshold) + recent = context_data.get("recent", []) + if len(recent) < threshold: + return None + + # L1: 新摘要 + session_text = json.dumps(recent, ensure_ascii=False)[:5000] + # 类型防御:传给压缩函数的可能是截断的 JSON 字符串 + # compress_fn 在 kernel 层加了解析恢复逻辑 + summary = compress_fn(session_text) + if not summary: + return None + + entry = { + "type": "compaction", + "level": 0, + "summary": summary, + "decisions": [], + "key_facts": [], + "pending": [], + "context": summary[:500], + "messages_since_last": len(recent), + "_ts": int(time.time()), + } + self.append(entry) + self._compacted_up_to = self._stats["messages"] + self._compaction_level = 0 + self._update_adaptive_max() + self._stats["compactions"] += 1 + + # L2: 多层摘要进化 — 已有 compaction 时合并升级 + summaries = context_data.get("summaries", []) + old_summaries = [s for s in summaries if s.get("level", 0) < 2] + if len(old_summaries) >= 2: + merge_text = json.dumps(old_summaries[-3:], ensure_ascii=False)[:4000] + merged = compress_fn(merge_text) + if merged: + entry = { + "type": "compaction", + "level": 2, + "summary": merged, + "decisions": [], + "key_facts": [], + "pending": [], + "context": merged[:500], + "messages_since_last": 0, + "_ts": int(time.time()), + } + self.append(entry) + self._compaction_level = 2 + self._stats["compactions"] += 1 + + return self._stats.copy() + + except Exception: + logger.exception("L1/L2 压缩异常(静默)") + return None + + def start_async_compress( + self, + compress_fn: callable, + interval_sec: int = 600, + threshold: int = 25, + ): + """P2-3: 启动后台异步压缩守护线程。 + + 在守护线程中循环调用 compress(),失败后等待 10 分钟重试。 + + 参数: + compress_fn: 压缩回调(同步) + interval_sec: 压缩间隔(秒) + threshold: 触发压缩的最小消息轮次 + """ + import threading as _threading + + def _worker(): + while True: + try: + self.compress(compress_fn, threshold=threshold) + except Exception as e: + logger.exception("Async compress failed: %s", e) + time.sleep(interval_sec) + + thread = _threading.Thread(target=_worker, daemon=True) + thread.start() + logger.info("后台异步压缩守护已启动 (interval=%ds, threshold=%d)", interval_sec, threshold) + def close(self): if self.fh: try: self.fh.close() except Exception: - logger.exception("Silent exception") + logger.exception("静默异常") def __del__(self): self.close() diff --git a/lib/storage.py b/lib/storage.py index d88b85a..328fceb 100644 --- a/lib/storage.py +++ b/lib/storage.py @@ -1,10 +1,10 @@ # SPDX-License-Identifier: MIT """ -Gbase storage engine module +gbase/lib/storage.py -Persistence Engine — SQLite primary + JSONL readable mirror dual-write. +沉淀引擎 — SQLite 主力 + JSONL 可读镜像双写。 -All experience/knowledge/skill records are read/written through this module. +栈内存所有经验/知识/Skill 都通过这个模块读写。 """ import contextlib @@ -21,42 +21,42 @@ DATA_DIR = Path(__file__).parent.parent / "data" DB_PATH = DATA_DIR / "dat.db" -# JSONL mirror file names for each layer +# 三层对应的 JSONL 镜像文件名 _MIRROR_FILES = { "experience": "experience.jsonl", "knowledge": "knowledge.jsonl", "skills": "skills.jsonl", } -# P1: Soft limit — only delete un-referenced (hits=0) records older than 90 days -# Old hard limit of 50 was the root cause of goldfish memory. -# All hit-referenced records are now preserved permanently. +# P1: 软上限 — 只删从未被引用(hits=0)且超过 90 天的旧条目 +# 原 50 条硬上限是金鱼记忆的根因。现在保护所有被引用过的记录永久保留 _MAX_RECORDS = 50000 -_PRUNING_KEEP_DAYS = 90 # hide unused records after 90 days +_PRUNING_KEEP_DAYS = 90 # hits=0 的记录至少保留 90 天 class Storage: - """Storage engine. + """沉淀引擎。 - Usage: + 用法: store = Storage() - store.setup() # First-time init + store.setup() # 首次初始化 store.write("experience", {"summary": "xxx", ...}) entries = store.read_recent("experience", limit=5) - Thread-safe: uses threading.Lock internally. + 线程安全:内部使用 threading.RLock(可重入锁,支持递归调用)。 """ def __init__(self, db_path: str = None, data_dir: str = None): self._db_path = db_path or str(DB_PATH) self._data_dir = Path(data_dir) if data_dir else DATA_DIR - self._lock = threading.Lock() + self._lock = threading.RLock() self._conn: sqlite3.Connection | None = None + self._setup_ran = False # 避免 setup() 内的 ALTER 重复执行警告 - # ── Initialization ──────────────────────────────── + # ── 初始化 ────────────────────────────────────────── def setup(self): - """First-time init (create tables + directories + WAL mode).""" + """首次初始化(建表 + 建目录 + WAL 模式)。""" os.makedirs(self._data_dir, exist_ok=True) with self._lock: @@ -66,10 +66,10 @@ def setup(self): CREATE TABLE IF NOT EXISTS entries ( id INTEGER PRIMARY KEY AUTOINCREMENT, type TEXT NOT NULL, -- experience | knowledge | skills - content TEXT NOT NULL, -- JSON string - summary TEXT DEFAULT '', -- one-line summary - created_at REAL NOT NULL, -- timestamp - hits INTEGER DEFAULT 0, -- reference count + content TEXT NOT NULL, -- JSON 字符串 + summary TEXT DEFAULT '', -- 一句话摘要 + created_at REAL NOT NULL, -- 时间戳 + hits INTEGER DEFAULT 0, -- 被引用次数 confidence TEXT DEFAULT 'low' -- low | medium | high ) """) @@ -77,22 +77,26 @@ def setup(self): CREATE INDEX IF NOT EXISTS idx_type_created ON entries(type, created_at DESC) """) - # Compat migration: add tags/rule columns if missing + # 兼容迁移:旧表无 tags/rule/archived/last_accessed_at 列时加上 with contextlib.suppress(Exception): conn.execute("ALTER TABLE entries ADD COLUMN tags TEXT DEFAULT ''") with contextlib.suppress(Exception): conn.execute("ALTER TABLE entries ADD COLUMN rule TEXT DEFAULT ''") - # FTS5 full-text index (supports Chinese via unicode61 tokenizer) - # content='entries' means text is not stored separately, linked via rowid + with contextlib.suppress(Exception): + conn.execute("ALTER TABLE entries ADD COLUMN archived INTEGER DEFAULT 0") + with contextlib.suppress(Exception): + conn.execute("ALTER TABLE entries ADD COLUMN last_accessed_at REAL DEFAULT 0") + # FTS5 全文索引(支持中文 unicode61 tokenizer) + # content='entries' 表示不单独存文本,通过 entries 表 rowid 关联 conn.execute(""" CREATE VIRTUAL TABLE IF NOT EXISTS entries_fts USING fts5( content, summary, content='entries', content_rowid='id', tokenize='unicode61', - detail=column + detail=full ) """) - # Triggers: auto-sync FTS on insert/delete/update + # 触发器:写入/删除/更新时自动同步 FTS conn.executescript(""" CREATE TRIGGER IF NOT EXISTS entries_fts_ai AFTER INSERT ON entries BEGIN INSERT INTO entries_fts(rowid, content, summary) @@ -111,7 +115,7 @@ def setup(self): """) conn.commit() self._conn = conn - # Rebuild FTS (existing data not yet in FTS) + # 重建 FTS(已有的数据未进 FTS) try: cursor = conn.execute("SELECT COUNT(*) FROM entries_fts") fts_count = cursor.fetchone()[0] @@ -121,28 +125,55 @@ def setup(self): conn.executescript(""" INSERT INTO entries_fts(entries_fts) VALUES('rebuild'); """) - logger.info("FTS index rebuild complete: %d entries", total) + logger.info("FTS 索引重建完成: %d 条", total) except Exception as rebuild_err: - logger.warning("FTS index rebuild skipped: %s", rebuild_err) - logger.info("Storage engine ready: %s", self._db_path) + logger.warning("FTS 索引重建跳过: %s", rebuild_err) + logger.info("存储引擎已就绪: %s", self._db_path) - # ── Write ────────────────────────────────────────── + # ── 写入 ──────────────────────────────────────────── def _ensure_ready(self): - """Ensure storage is initialized. Must be called inside self._lock.""" + """确保 storage 已初始化。必须在 self._lock 内调用。""" if self._conn is None: self.setup() + @staticmethod + def _validate_write(type_: str, summary: str, confidence: str) -> tuple[bool, str]: + """写入前的轻量验证门。 + + Returns: + (通过?, 拒绝原因) + """ + # ① 空/过短内容直接跳过 + if not summary or len(summary.strip()) < 10: + return False, "内容过短或无内容" + + # ② 置信度 low 且内容没有实质信息(低质量噪音) + low_quality_patterns = ["测试", "test", "常规操作", "正常", "unknown", "默认"] + if confidence == "low": + for pat in low_quality_patterns: + if pat in summary[:20]: + return False, f"低置信度且含噪音标记({pat})" + + return True, "" + def write(self, type_: str, entry: dict, summary: str = "", confidence: str = "low", **kwargs) -> int: - """Write a record. Auto-writes SQLite + appends JSONL mirror.""" - _ = kwargs # noqa: ARG002 — compat extension params + """写入一条记录。自动写 SQLite + 追加 JSONL 镜像。""" + _ = kwargs # noqa: ARG002 — 兼容扩展参数 + + # ── 验证门:写入前过滤低质量内容 ── + _pass, _reason = self._validate_write(type_, summary, confidence) + if not _pass: + logger.debug("验证门跳过写入 %s: %s (summary=%s)", type_, _reason, summary[:40]) + return -1 + now = time.time() content_json = json.dumps(entry, ensure_ascii=False) with self._lock: self._ensure_ready() - # Write SQLite + # 写入 SQLite cursor = self._conn.execute( "INSERT INTO entries (type, content, summary, created_at, confidence) VALUES (?, ?, ?, ?, ?)", (type_, content_json, summary, now, confidence), @@ -150,7 +181,7 @@ def write(self, type_: str, entry: dict, summary: str = "", confidence: str = "l row_id = cursor.lastrowid self._conn.commit() - # Append to JSONL mirror + # 追加 JSONL 镜像 mirror_path = self._data_dir / _MIRROR_FILES.get(type_, "unknown.jsonl") mirror_entry = { "id": row_id, @@ -163,16 +194,16 @@ def write(self, type_: str, entry: dict, summary: str = "", confidence: str = "l with open(mirror_path, "a", encoding="utf-8") as f: f.write(json.dumps(mirror_entry, ensure_ascii=False) + "\n") - # Check limit, delete oldest records + # 检查上限,删除最旧记录 self._prune(type_) - logger.debug("Write %s[%d]: %s", type_, row_id, summary[:60]) + logger.debug("写入 %s[%d]: %s", type_, row_id, summary[:60]) return row_id - # ── Read ────────────────────────────────────────── + # ── 读取 ──────────────────────────────────────────── def read_recent(self, type_: str, limit: int = 5) -> list[dict]: - """Read the most recent N records.""" + """读取最近 N 条记录。""" with self._lock: if self._conn is None: return [] @@ -198,22 +229,22 @@ def read_recent(self, type_: str, limit: int = 5) -> list[dict]: ) return results - # ── Hit count (increase reference weight) ───────── + # ── 命中计数(增加引用权重)────────────────────────── def record_hit(self, record_id: int): - """Increment hit count for a record.""" + """递增某条记录的 hits 计数,并记录最后访问时间。""" with self._lock: self._ensure_ready() self._conn.execute( - "UPDATE entries SET hits = hits + 1 WHERE id = ?", - (record_id,), + "UPDATE entries SET hits = hits + 1, last_accessed_at = ? WHERE id = ?", + (time.time(), record_id), ) self._conn.commit() - # ── Internal methods ──────────────────────────── + # ── 内部方法 ──────────────────────────────────────── def _prune(self, type_: str): - """Tiered pruning. Must be called inside self._lock.""" + """分级淘汰。必须在 self._lock 内调用。""" if self._conn is None: return cursor = self._conn.execute( @@ -232,13 +263,65 @@ def _prune(self, type_: str): ).rowcount self._conn.commit() if _deleted > 0: - logger.info("Pruned %d un-referenced %s records (> %d days)", _deleted, type_, _PRUNING_KEEP_DAYS) + logger.info("已修剪 %d 条从未引用过的 %s 记录(> %d 天)", _deleted, type_, _PRUNING_KEEP_DAYS) + + # ── 清理 ──────────────────────────────────────────── + + def apply_aging(self, age_cutoff_days: int = 30, decay: float = 0.5): + """知识老化:超过 age_cutoff_days 没有访问的记录,hit 值衰减。 + + 只在每 100 次写入时自动触发。 + Phase 5 增强:hit=1 且 60 天未访问的记录自动清理。 + """ + with self._lock: + self._ensure_ready() + cutoff = time.time() - age_cutoff_days * 86400 + # 对 last_accessed_at < cutoff 且 hits > 1 的记录衰减 hits + cursor = self._conn.execute( + "UPDATE entries SET hits = MAX(1, CAST(hits * ? AS INTEGER)) " + "WHERE last_accessed_at > 0 AND last_accessed_at < ? AND hits > 1", + (decay, cutoff), + ) + affected = cursor.rowcount + if affected > 0: + logger.info("知识老化: %d 条记录 hit 衰减(×%.1f)", affected, decay) + + # ── Phase 5 增强:hit=1 且 60 天未访问 → 自动清理(噪音数据) ── + _noise_cutoff = time.time() - 60 * 86400 + cursor = self._conn.execute( + "DELETE FROM entries WHERE hits = 1 AND last_accessed_at < ? " + "AND last_accessed_at > 0", + (_noise_cutoff,), + ) + _noise_count = cursor.rowcount + if _noise_count > 0: + logger.info("噪音清理: 删除 %d 条 hit=1 的僵尸记录", _noise_count) + + # ── Phase 5 增强:空 content 记录清理 ── + cursor = self._conn.execute( + "DELETE FROM entries WHERE content IS NULL OR TRIM(content) = ''" + ) + _empty_count = cursor.rowcount + if _empty_count > 0: + logger.info("空值清理: 删除 %d 条空 content 记录", _empty_count) + + if affected > 0 or _noise_count > 0 or _empty_count > 0: + self._conn.commit() - # ── Cleanup ─────────────────────────────────────── + def _checkpoint(self): + """主动 checkpoint WAL,防止 WAL 文件膨胀。""" + try: + cursor = self._conn.execute("PRAGMA wal_checkpoint(TRUNCATE)") + _, pages, _ = cursor.fetchone() + if pages > 0: + logger.info("WAL checkpoint: %d pages", pages) + except Exception as e: + logger.warning("WAL checkpoint 失败: %s", e) def close(self): with self._lock: if self._conn: + self._checkpoint() self._conn.close() self._conn = None - logger.info("Storage engine closed") + logger.info("存储引擎已关闭") diff --git a/main.py b/main.py index b86c9ac..76232e2 100644 --- a/main.py +++ b/main.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 """ -gbase_8440.py — GBase 版"高达"飞书入口 -接管原高达的飞书 Bot (cli_aa843ca68c7a9cba) + 端口 8440, +gbase_8440.py — GBase 飞书 Bot 入口 +接管飞书 Bot (cli_aa843ca68c7a9cba) + 端口 8440, 用 GBase/GBase Kernel 取代 Hermes CLI 的大脑。 用法: @@ -40,7 +40,7 @@ os.environ[_key] = _value logger.info(".env 已加载 (%s)", _env_path) -# ── 原高达的飞书 Bot 配置(从环境变量读取,不硬编码) ── +# ── 飞书 Bot 配置(从环境变量读取,不硬编码) ── APP_ID = os.environ.get("FEISHU_APP_ID", "") APP_SECRET = os.environ.get("FEISHU_APP_SECRET", "") ENCRYPT_KEY = os.environ.get("FEISHU_ENCRYPT_KEY", "") @@ -377,7 +377,7 @@ async def _startup_guard(): asyncio.create_task(channel.start_heartbeat()) asyncio.create_task(_startup_guard()) logger.info("━━━━━━━━━━━━━━━━━━━") - logger.info("GBase 版高达 (Gundam) 飞书通道启动") + logger.info("GBase 飞书通道启动") logger.info(f"端口: {PORT}, Bot: {APP_ID}") logger.info(f"身份: {IDENTITY_NAME}, 模型: {model}") logger.info(f"数据目录: {DATA_DIR}") diff --git a/rules/AGENCY.md b/rules/AGENCY.md new file mode 100644 index 0000000..33d6811 --- /dev/null +++ b/rules/AGENCY.md @@ -0,0 +1,24 @@ +# AGENCY.md — 独立开发者工作流 + +## 身份定位 +你是 GBase 框架上的独立开发者,不是通用助手。 + +## 工作流程 + +1. **理解 → 拆解 → 执行 → 交付** + - 不绕圈,不请示中间状态 + - 复杂任务拆成可执行步骤,逐步骤执行 + +2. **工具的终极归宿是让工具自己干实权活** + - LLM 只做逻辑决策和组合,不做人工搬运 + - 每个工具只做它该做的事 + +3. **完成后自动沉淀** + - 重要的发现 → Knowledge(结构化存储) + - 执行经验 → Experience(经验引擎) + - 项目状态 → Glink(事件总线,如可用) + +4. **行为锚点** + - 收到任务 → 消化系统 prompt 中的 Knowledge + - 工具结果回来 → 评估对任务进度的贡献 + - 觉得自己做完了 → 确认无遗漏再交付 diff --git a/rules/FINISH.md b/rules/FINISH.md new file mode 100644 index 0000000..eedc625 --- /dev/null +++ b/rules/FINISH.md @@ -0,0 +1,27 @@ +# FINISH.md — "干到底规则" + +## 核心原则 +**不完成不回话,不干活不休息。** + +## 执行规则 + +1. **不要在完成前主动向用户汇报中间进度** + - 不报正在做什么 + - 不报"看起来要花一些时间" + - 不报"现在开始做X" + - 用 `reply()` 意味着你要说最终结果了,不是中间状态 + +2. **允许汇报仅三种情况** + - 所有可用工具全部失效,确实无法继续 + - 缺少关键资源(API Key、权限、硬件),需要用户提供 + - 任务已完成,准备交付 + +3. **遇到问题先搜再问** + - 工具参数报错 → 先 `read_file` 看函数签名 + - 卡住了 → 换工具、换角度、换搜索词 + - 问用户是最后选项 + +4. **一次跑通,不留尾巴** + - 完成 → 写回复 + - 不写"需要您确认才能继续" + - 不可交付的状态不算完成 diff --git a/rules/THINKING.md b/rules/THINKING.md new file mode 100644 index 0000000..0c6d60b --- /dev/null +++ b/rules/THINKING.md @@ -0,0 +1,21 @@ +# THINKING.md — 思考流程规范 + +## 七步思维流程(由深到浅) + +1. **先忆后搜(P0)** — 收到消息后,先不急着调工具 + - Knowledge 自动检索已经在系统 prompt 里注入 + - 先消化已有的知识事实,再用搜索工具补充不足 + +2. **分类任务** — 判断用户意图:执行(操作类) vs 讨论(咨询/闲聊) + - 执行类 → 最少调 1 个工具,不直接文字回复 + - 讨论类 → 允许文字回复,但优先引用已有知识 + +3. **由深到浅搜索** — 先本地知识(memory/knowledge),再网络搜索,最后问用户 + +4. **工具优先** — 能用工具的事不用文字回答(查状态、写文件、执行代码) + +5. **失败换向** — 同问题换 2 个工具失败 → 换方向,不原地硬扛 + +6. **遇错自修** — 工具参数报错 → 先 read_file 看签名,不猜参数名 + +7. **完成才交** — 任务结束前不主动汇报进度 diff --git a/tools/__init__.py b/tools/__init__.py index 74b31b8..e3f796f 100644 --- a/tools/__init__.py +++ b/tools/__init__.py @@ -34,6 +34,9 @@ test_generator, # noqa: F401 xlsx_gen, # noqa: F401 yf_image_tools, # noqa: F401 + archive_search, # noqa: F401 + glink_projects, # noqa: F401 + remember_info, # noqa: F401 ) diff --git a/tools/archive_search.py b/tools/archive_search.py new file mode 100644 index 0000000..85917ff --- /dev/null +++ b/tools/archive_search.py @@ -0,0 +1,64 @@ +""" +archive_search.py — Archive store 搜索工具(GMem Phase A1) + +精确回查历史对话的 Archive Store 搜索工具。\n让 Agent 能像使用 lcm_grep 一样精确回查历史对话。 +""" + +import logging +import time +from typing import Optional + +from lib.toolkit import tool, get_global + +logger = logging.getLogger("archive_search") + +@tool +def archive_search(query: str, max_results: int = 5, session_only: bool = False) -> str: + """搜索 archive_store 中的历史对话记录。 + + 当你需要回忆之前做过的任务、用户提过的要求、讨论过的技术方案、用户说过的话时用这个。 + 比 Knowledge(dat.db)更全面,因为它存储的是完整的对话内容,不会受压缩或老化策略影响。 + + Args: + query: 搜索关键词(中文自动分词,支持2字以上关键词) + max_results: 返回条数上限,默认5 + session_only: 是否只搜当前会话(默认False,搜索全量历史) + + Returns: + 匹配的历史记录列表,每行格式:[时间] [角色] 内容摘要 + 如无匹配则返回"未找到相关历史记录" + """ + # 获取全局 archive_store 实例 + archive_store = get_global("archive_store") + if not archive_store: + return "⚠️ archive_store 未初始化,无法搜索历史" + + if not query or not query.strip(): + return "⚠️ 搜索关键词为空" + + try: + if session_only: + results = archive_store.search(query, top_k=max_results) + else: + # 全局搜索:用空 session_key 的 fallback,看 archive_store 是否支持跨 session + # 先尝试当前 session + results = archive_store.search(query, top_k=max_results) + + if not results: + return "未找到相关历史记录" + + lines = [] + for i, r in enumerate(results, 1): + ts = r.get("timestamp", 0) + time_str = time.strftime("%m-%d %H:%M", time.localtime(ts)) if ts else "???" + role = r.get("role", "unknown") + content = r.get("content", "") + # 截取前 300 字作为摘要 + summary = content[:300].replace("\n", " ") + lines.append(f"{i}. [{time_str}] [{role}] {summary}") + + return "\n".join(lines) + + except Exception as e: + logger.exception("archive_search 出错") + return f"⚠️ 搜索出错: {e}" diff --git a/tools/glink_projects.py b/tools/glink_projects.py new file mode 100644 index 0000000..b05cab3 --- /dev/null +++ b/tools/glink_projects.py @@ -0,0 +1,208 @@ +# SPDX-License-Identifier: MIT +""" +gbase/tools/glink_projects.py + +Glink 项目记忆工具 — 让战甲通过 @tool 使用 Glink 的项目引擎。 +战甲调扎古的 Glink daemon (8426)。 +""" + +import logging +import os +import re + +import httpx + +from lib.toolkit import register_toolset, tool + +logger = logging.getLogger(__name__) + +GLINK_BASE = os.environ.get("GLINK_BASE", "http://127.0.0.1:8426") +GLINK_TOKEN = os.environ.get("GLINK_API_TOKEN", "glink-secret-2026") + + +def _headers() -> dict: + h = {"Content-Type": "application/json"} + if GLINK_TOKEN: + h["Authorization"] = f"Bearer {GLINK_TOKEN}" + return h + + +def _sanitize(name: str) -> str: + return re.sub(r"[^a-zA-Z0-9_-]", "_", name)[:64] + + +# ── 公共工具 ──────────────────────────────────────────── + + +@tool() +async def tool_project_init(project_id: str, context: str = "") -> dict: + """在 Glink 中创建或重建一个项目。所有项目的上下文、进度和事件都通过 Glink 统一管理。 + + Args: + project_id: 项目标识符(字母数字下划线,最长64字符) + context: 可选的项目上下文 Markdown + + Returns: + {"status": "ok", "project_id": "...", "path": "..."} + """ + tid = _sanitize(project_id) + async with httpx.AsyncClient(timeout=10) as client: + resp = await client.post( + f"{GLINK_BASE}/project", + json={"project_id": tid, "context": context}, + headers=_headers(), + ) + return resp.json() + + +@tool() +async def tool_project_read_context(project_id: str) -> str: + """读取 Glink 项目的 context.md 内容。 + + Args: + project_id: 项目标识符 + + Returns: + context 文本(如项目不存在返回 '') + """ + tid = _sanitize(project_id) + async with httpx.AsyncClient(timeout=10) as client: + resp = await client.get( + f"{GLINK_BASE}/project/{tid}/context", + headers=_headers(), + ) + data = resp.json() + return data.get("context", "") + + +@tool() +async def tool_project_update_context( + project_id: str, + context: str = "", + event_type: str = "", + event_detail: str = "", +) -> dict: + """更新 Glink 项目的 context.md,并可选追加事件记录。 + + Args: + project_id: 项目标识符 + context: 新的完整 context Markdown(留空不更新 context) + event_type: 事件类型,如 'step.completed'、'milestone.reached'、'decision.made' + event_detail: 事件描述 + + Returns: + {"status": "ok", "project_id": "..."} + """ + tid = _sanitize(project_id) + async with httpx.AsyncClient(timeout=15) as client: + if context: + ctx_resp = await client.post( + f"{GLINK_BASE}/project/{tid}/context", + json={"context": context}, + headers=_headers(), + ) + if ctx_resp.json().get("error"): + return ctx_resp.json() + + if event_type: + evt_resp = await client.post( + f"{GLINK_BASE}/project/{tid}/event", + json={ + "type": event_type, + "agent": "zaku", + "detail": event_detail, + }, + headers=_headers(), + ) + if evt_resp.json().get("error"): + return evt_resp.json() + + return {"status": "ok", "project_id": tid} + + +@tool() +async def tool_project_list() -> list: + """列出 Glink 中所有注册的项目。 + + Returns: + 项目列表 + """ + async with httpx.AsyncClient(timeout=10) as client: + resp = await client.get(f"{GLINK_BASE}/projects", headers=_headers()) + return resp.json().get("projects", []) + + +@tool() +async def tool_project_get(project_id: str) -> dict: + """获取 Glink 项目的概览(进度、最后事件、context 摘要)。 + + Args: + project_id: 项目标识符 + + Returns: + 项目详情字典 + """ + tid = _sanitize(project_id) + async with httpx.AsyncClient(timeout=10) as client: + resp = await client.get(f"{GLINK_BASE}/project/{tid}", headers=_headers()) + return resp.json() + + +@tool() +async def tool_project_events(project_id: str) -> list: + """读取 Glink 项目的事件流。 + + Args: + project_id: 项目标识符 + + Returns: + 事件列表(按时间正序) + """ + tid = _sanitize(project_id) + async with httpx.AsyncClient(timeout=10) as client: + resp = await client.get( + f"{GLINK_BASE}/project/{tid}/events", + headers=_headers(), + ) + return resp.json().get("events", []) + + +@tool() +async def tool_project_archive(project_id: str) -> dict: + """归档一个 Glink 项目(归档后不再活跃,但数据保留)。 + + Args: + project_id: 项目标识符 + + Returns: + {"status": "ok", "archived": true} + """ + tid = _sanitize(project_id) + async with httpx.AsyncClient(timeout=10) as client: + resp = await client.post( + f"{GLINK_BASE}/project/{tid}/archive", + headers=_headers(), + ) + return resp.json() + + +# ── 注册到 toolset ──────────────────────────────────── + + +def register(): + register_toolset( + "glink_projects", + [ + "项目", "项目上下文", "项目进度", "项目事件", + "project", "context", "glink", + ], + [ + "tool_project_init", + "tool_project_read_context", + "tool_project_update_context", + "tool_project_list", + "tool_project_get", + "tool_project_events", + "tool_project_archive", + ], + ) diff --git a/tools/mirror_tool.py b/tools/mirror_tool.py index fdc910d..9be16dc 100644 --- a/tools/mirror_tool.py +++ b/tools/mirror_tool.py @@ -1,5 +1,5 @@ """ -mirror_tool.py — 高达版记忆工具 +mirror_tool.py — 记忆工具 独立化的 mirror 工具集 """ diff --git a/tools/remember_info.py b/tools/remember_info.py new file mode 100644 index 0000000..e6e507c --- /dev/null +++ b/tools/remember_info.py @@ -0,0 +1,189 @@ +"""remember_info — 统一记忆路由工具 + +自动判断内容类型,写入正确的存储层级: +- 凭据/配置/事实 → Knowledge (L2) +- 调研结论/学习心得 → Notes (L4) +- 行为教训/模式 → Experience (L3) +""" + +import logging +from typing import Optional +from lib.toolkit import tool, get_global + +logger = logging.getLogger(__name__) + + +# ── 分类关键词 ── +_KNOWLEDGE_KW = [ + "密钥", "key", "key", "token", "密码", + "端口", "地址", "路径", "配置", "域名", "URL", "url", + "账号", "API", "api", "secret", + "版本", "版本号", "型号", "型号", + "安装", "安装目录", "家目录", "home", + "生日", "出生", "年龄", "关系", # 个人信息 +] + +_NOTE_KW = [ + "学到了", "总结", "总结一下", "心得", "笔记", + "调研", "调研报告", "文章", "论文", "读了", + "学习了", "学习了", "摘要", "提炼", + "概念", "概念理解", "原理", + "框架", "模式", "范式", +] + +_EXPERIENCE_KW = [ + "教训", "经验", "教训", "踩坑", + "下次注意", "下次要", "以后先", "应该先", + "根因是", "根因", "原因", "原因是", + "学到的", "学到", "lesson", + "记一条", "记住", "rule", "规则", + "模式", "pattern", +] + + +def _classify_content(content: str) -> str: + """判断内容类型:knowledge / note / experience""" + cl = content.lower() + + # 先匹配力度最高的 + for kw in _EXPERIENCE_KW: + if kw.lower() in cl: + return "experience" + + knowledge_score = sum(1 for kw in _KNOWLEDGE_KW if kw.lower() in cl) + note_score = sum(1 for kw in _NOTE_KW if kw.lower() in cl) + + if knowledge_score >= note_score and knowledge_score > 0: + return "knowledge" + if note_score > 0: + return "note" + + # 默认:长内容(>200字)是笔记,短内容是一条知识 + if len(content) > 200: + return "note" + return "knowledge" + + +@tool() +async def remember_info( + content: str, + title: str = "", + tags: str = "", + source: str = "", + force_type: Optional[str] = None, + with_kw_category: str = "general", +) -> dict: + """统一记忆入口——自动判断内容类型写入正确层级。 + + 什么时候用: + *任何时候想存东西,都用这个工具。* 不要再直接调 remember_fact / note_write。 + 它会自动判断内容类型,写入 Knowledge / Notes / Experience。 + + Args: + content: 要记住的内容。自动判断类型。 + title: 标题(仅对 note 有效,knowledge 和 experience 自动用前20字) + tags: 逗号分隔的标签,方便搜索 + source: 来源描述(如"与用户对话"、"从trace提炼") + force_type: 强制指定类型。可选:knowledge / note / experience / auto + 默认 auto(自动判断) + with_kw_category: 当内容判断为 knowledge 时的分类(默认 general) + """ + ftype = (force_type or "auto").lower() + if ftype == "auto": + ftype = _classify_content(content) + + if ftype == "knowledge": + # 从 content 推断标题 + auto_title = content[:40] if len(content) > 40 else content + # 记忆一条事实 + from tools.knowledge import remember_fact + result = await remember_fact( + fact=content, + category=with_kw_category, + tags=tags, + ) + return { + "ok": result.get("id") is not None if isinstance(result, dict) else False, + "type": "knowledge", + "id": result.get("id"), + "detail": f"已写入 Knowledge: {auto_title}…" if len(content) > 40 else f"已写入 Knowledge: {content}", + } + + elif ftype == "note": + auto_title = title or content[:30] + from tools.note_tool import note_write + result = await note_write( + title=auto_title, + content=content, + tags=tags, + source=source or "remember_info", + ) + return { + "ok": result.get("ok") if isinstance(result, dict) else result, + "type": "note", + "detail": f"已写入 Notes: {auto_title}", + } + + elif ftype == "experience": + # 写入 experience(通过 storage 直接写) + storage = get_global("storage") + if not storage: + return {"error": "存储引擎未初始化", "ok": False} + + auto_title = title or content[:30] + summary = auto_title + # Experience 用 rule="user_lesson" + from tools.knowledge import remember_fact + result = await remember_fact( + fact=content, + category="workflow", + tags=tags or "lesson,experience", + ) + # 同时写入一条 rule-based experience + try: + with storage._lock: + now = __import__("time").time() + payload = { + "content": content, + "summary": summary, + "rule": "learned_lesson", + "confidence": "high" if "教训" in content or "lesson" in content.lower() else "medium", + "source": source or "remember_info", + "tags": [t.strip() for t in tags.split(",") if t.strip()] if tags else ["lesson"], + } + import json + storage._conn.execute( + "INSERT INTO entries (type, content, summary, created_at, confidence, rule) " + "VALUES (?, ?, ?, ?, ?, ?)", + ("experience", json.dumps(payload), summary, now, + payload["confidence"], payload["rule"]), + ) + storage._conn.commit() + exp_id = storage._conn.lastrowid + except Exception as e: + logger.warning("experience 写入失败(不阻断): %s", e) + exp_id = None + + return { + "ok": result.get("id") is not None if isinstance(result, dict) and result.get("ok") is not False else True, + "type": "experience", + "id": exp_id, + "detail": f"已写入 Experience + Knowledge: {auto_title}", + } + + else: + return {"error": f"不支持的强制类型: {force_type}", "ok": False} + + +@tool() +async def remember_info_usage() -> dict: + """展示 remember_info 的用法示例。""" + return { + "usage": "任何时候想存东西,都用 remember_info 而不是 remember_fact 或 note_write", + "examples": [ + 'remember_info(content="DB_CONNECTION=localhost:3306") # → Knowledge', + 'remember_info(content="我看完了美眉的三篇配色笔记,总结:80-15-5配色法…", source="学习美眉笔记") # → Notes', + 'remember_info(content="教训:写代码前先 read_file 看参数签名,不要猜参数名", force_type="experience") # → Experience', + 'remember_info(content="端口 8443 是 your-agent", tags="port,config") # → Knowledge', + ], + } diff --git a/tools/self_edit.py b/tools/self_edit.py index b24b0e6..f557c21 100644 --- a/tools/self_edit.py +++ b/tools/self_edit.py @@ -1,8 +1,8 @@ # SPDX-License-Identifier: MIT """ -self_edit.py — 波塞冬自修代码工具 +self_edit.py — Agent 自修代码工具 -让波塞冬能安全地修改自己的源代码(tools/、lib/ 下的 .py 文件)。 +让 Agent 能安全地修改自己的源代码(tools/、lib/ 下的 .py 文件)。 核心安全机制: 1. 改前自动备份到 ~/.gbase_rollback/ 2. 改后自动语法检查 @@ -288,7 +288,7 @@ async def self_edit_rollback(path: str, version: str = "") -> dict: @tool() async def self_edit_restart() -> dict: - """重启波塞冬进程(launchd 自动拉起) + """重启 Agent 进程(launchd 自动拉起) 修改 lib/ 下的代码后需要重启才能生效。 launchd KeepAlive 配置会在进程退出后自动重新拉起。 @@ -315,7 +315,7 @@ def _delayed_exit(): async def self_edit_read_source(path: str, offset: int = 0, max_chars: int = 8000) -> dict: """读取自己的源码文件(tools/、lib/ 下的 .py 文件) - 波塞冬的 read_file 主要用于读外部文件(用户项目、文档等)。 + Agent 的 read_file 主要用于读外部文件(用户项目、文档等)。 这个工具专门用于读自己的源码,方便定位和修复 bug。 Args: diff --git a/tools/test_gen.py b/tools/test_gen.py index 4f45dca..ab68397 100644 --- a/tools/test_gen.py +++ b/tools/test_gen.py @@ -7,7 +7,7 @@ async def main(): content = [ - {'type': 'cover', 'title': '2026年中国人工智能行业研究报告', 'subtitle': 'AI Industry Research Report 2026', 'date': '2026年5月', 'author': '高达研究部'}, + {'type': 'cover', 'title': '2026年中国人工智能行业研究报告', 'subtitle': 'AI Industry Research Report 2026', 'date': '2026年5月', 'author': 'GBase Research'}, {'type': 'toc'}, {'type': 'h1', 'text': '第一章 行业概述'}, {'type': 'p', 'text': '2025年,中国人工智能产业规模突破2.1万亿元,同比增长32.5%。'}, @@ -44,7 +44,7 @@ async def main(): title='AI行业研究报告', content=content, subtitle='2026年中国人工智能行业研究报告', - author='高达研究部', + author='GBase Research', output_path='$HOME/Downloads/AI_Report_2026_v1.pdf', color_theme='mckinsey', show_toc=True, diff --git a/tools/trident_tools.py b/tools/trident_tools.py index 1987a58..4f5ee71 100644 --- a/tools/trident_tools.py +++ b/tools/trident_tools.py @@ -1,14 +1,14 @@ #!/usr/bin/env python3 """ -波塞冬 Trident 三叉戟工具集 +Agent Trident 三叉戟工具集 ──────────────────────────── -让波塞冬可以: +让 Agent 可以: 1. 用 Trident CC 写代码(执行实现任务) 2. 用 Trident X 审查/补刀(代码审计 + ApplyPatch) 3. 通过 Trident Glink 编排项目工作流 4. 探查 CC/X 的健康状态 -用法:波塞冬直接调以下 @tool 函数。 +用法:Agent直接调以下 @tool 函数。 底层走 HTTP 直连 Trident CC(8443)/ X(8444)/ Glink(8427), 不经过 Lancer 那套 shared/ 底座,完全独立。 """ @@ -46,7 +46,7 @@ async def _ask(agent_url: str, task: str) -> dict: return {"error": str(e)} -# ── 波塞冬的经验:如何用好 CC 和 X ── +# ── 使用经验:如何用好 CC 和 X ── # # 经验 1:不要把 CC 当搜索用。CC 是代码臂,给它写代码任务。 # 搜索信息直接用 anysearch_search,CC 只用来看文件和改代码。 @@ -84,7 +84,7 @@ async def _ask(agent_url: str, task: str) -> dict: @tool() async def trident_help() -> dict: - """返回 Trident CC/X 的使用指南(波塞冬的备忘录)""" + """返回 Trident CC/X 的使用指南(备忘录)""" return { "cc": { "port": 8443, @@ -240,7 +240,7 @@ async def glink_workflow(project: str, steps: list) -> dict: 各步骤的 executor 可以是: - "Trident-CC" — 代码实现 - "Trident-X" — 代码审计 - - "波塞冬" — 你自己(用你自己的工具处理) + - "your-agent" — 你自己(用你自己的工具处理) Returns: dict: 工作流状态与各步骤结果 @@ -255,7 +255,7 @@ async def glink_workflow(project: str, steps: list) -> dict: result = await cc_execute(task, step.get("project_dir")) elif executor == "Trident-X": result = await x_audit(task) - elif executor == "波塞冬": + elif executor == "your-agent": result = {"note": f"步骤 '{title}' 分配给自己执行,需要自行处理"} else: result = {"error": f"未知执行者: {executor}"}