Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,27 @@
# Changelog

## [0.4.1] - 2026-06-09

### Added
- FTS5 token filter: drops pure-digit and single-letter tokens, which `detail=column` would otherwise parse as column names (ported from Opprime)
- Knowledge auto-retrieval: record_hit() on every auto-search hit for cache warmth
- _recorded_tc: track total tool calls per loop execution for SkillOpt gate integration
- _build_gmem_summary: standalone helper for archive_store compression summary construction
- tools/remember_info.py: unified memory route — auto-classifies user content into Knowledge/Notes/Experience
- tools/archive_search.py: Archive Store search tool for cross-session memory recall

### Changed
- kernel.py: merged Opprime improvements (FTS5 safety filter + Knowledge hit recording + loop tc tracking)
- session.py: synchronized with Opprime (fixes for L1 compaction type mismatch, image_url filtering)
- experience.py: synchronized with Opprime (improved JSON error tolerance)
- storage.py: synchronized with Opprime (aging mechanism for Knowledge entries)
- archive_store.py: generalized agent-specific DB fallback paths to `~/gbase-home/` (public-ready)

### Fixed
- knowledge.py: FTS5 MATCH returning 0 matches due to unpurged detail=column token pollution
- session.py: a null `content` field caused a DeepSeek serde enum-variant mismatch (HTTP 400 error)
- session.py: L1 compression type mismatch (passing str instead of list[dict])

## [0.4.0] - 2026-06-02

### Added
Expand All @@ -14,6 +36,18 @@
- L4 permanent notes: auto-serialize when tool calls >= 5 or reply >= 500 chars
- 20+ new tools: self_edit, feishu_send, note_tool, knowledge, mirror_tool, chain, mail, security_watch, etc.

### Added
- ArchiveStore: full-text conversation archive with LIKE-based semantic retrieval (replaces LLM compression)
- Session: removed 3-layer LLM compression, replaced with ArchiveStore append/search
- Territory safety: cross-agent read/write access control (write blocking, read warning)
- RSI Dual-Knob: task intent classification → dynamic temperature control
- Time-decay weighted retrieval: M3 sparse attention inspired (7d full / 7-30d linear / 30d+ exponential)
- Entity conflict detection: Cosmos 3 inspired, auto-detect contradictions on write
- Hot query cache (LRU, max 64) for high-frequency entity lookups
- Archive trash recovery: deleted entries saved to `data/archive_trash/` as grep-able JSONL
- L4 permanent notes: auto-serialize when tool calls >= 5 or reply >= 500 chars
- 20+ new tools: self_edit, feishu_send, note_tool, knowledge, mirror_tool, chain, mail, security_watch, etc.

### Changed
- kernel.py: archive_store init + semantic bridge search/save (disabled old online LLM compression)
- session.py: replaced compress_l1/l2/async_compress with archive-driven context building
Expand Down
74 changes: 65 additions & 9 deletions lib/archive_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
hits = store.search("query keywords")
"""

import contextlib
import json
import logging
import os
Expand Down Expand Up @@ -136,7 +135,10 @@ def append(self, role: str, content: str | list | dict, *, priority: int = 0, so
if not content:
return

content = json.dumps(content, ensure_ascii=False) if isinstance(content, (list, dict)) else str(content)
if isinstance(content, (list, dict)):
content = json.dumps(content, ensure_ascii=False)
else:
content = str(content)

if len(content) > _MAX_CONTENT_CHARS:
content = content[:_MAX_CONTENT_CHARS] + "..."
Expand Down Expand Up @@ -520,7 +522,7 @@ def search(self, query: str, top_k: int = _DEFAULT_SEARCH_TOP_K,
params: list = [self.session_key]

# 关键词条件
kw_conditions = " OR ".join("content LIKE ? COLLATE NOCASE" for _ in keywords)
kw_conditions = " OR ".join(f"content LIKE ? COLLATE NOCASE" for _ in keywords)
where_parts.append(f"({kw_conditions})")
params.extend(f"%{k}%" for k in keywords)

Expand Down Expand Up @@ -552,7 +554,7 @@ def search(self, query: str, top_k: int = _DEFAULT_SEARCH_TOP_K,

# 计算每条记录的命中关键词数(作为粗糙的 BM25 替代)
scored = []
for content, role, ts, priority, _source_id, _eid in all_rows:
for content, role, ts, priority, source_id, eid in all_rows:
if not content:
continue
hits = sum(1 for kw in keywords if kw in content or kw.lower() in content.lower())
Expand Down Expand Up @@ -632,11 +634,11 @@ def _extract_keywords(self, query: str) -> list[str]:
"这样", "那样", "可能", "需要", "之后", "之前", "现在",
"我们", "他们", "你们", "自己", "一些", "这些", "那些",
"谢谢", "你好", "请问", "好的", "是的", "知道", "觉得",
"然后", "或者", "除了", "不想", "想要", "打算",
"看到", "听说", "告诉", "我的", "你的", "他的",
"然后", "或者", "还是", "除了", "不想", "想要", "打算",
"看到", "听说", "觉得", "告诉", "我的", "你的", "他的",
"大家", "东西", "时候", "不错", "真的", "非常", "很多",
"工作", "生活", "事情", "感觉", "方面", "一点", "一定",
"还有", "出来",
"还有", "因为", "出来",
}
keywords = [k for k in keywords if k not in _COMMON_BIGRAMS]

Expand Down Expand Up @@ -726,8 +728,10 @@ def close(self):
self.flush()

def __del__(self):
with contextlib.suppress(Exception):
try:
self.close()
except Exception:
pass


# ── 旧数据迁移 ─────────────────────────────────────
Expand Down Expand Up @@ -765,6 +769,58 @@ def _save_trash(session_key: str, rows: list[tuple]):
logger.warning("归档Write失败(不影响主流程): %s", e)


def recent_global(limit: int = 10, hours: int = 72) -> dict:
    """Return the most recent markers across all sessions (Phase 4 hook).

    Rows are filtered by timestamp only, never by ``session_key``; the result
    is meant for injecting same-topic history when a session is warmed up.

    Args:
        limit: Maximum number of markers to return.
        hours: Look-back window in hours, measured from the current time.

    Returns:
        A dict with three keys:

        * ``markers`` - newest first; each item carries ``marker`` (cut to 120
          characters), the raw ``timestamp``, ``time_str`` (local time,
          ``MM-DD HH:MM``) and ``session`` (last ``:``-separated part of the
          session key, cut to 20 characters).
        * ``count`` - number of items in ``markers``.
        * ``db`` - path of the database that was used, or ``None`` when no
          ``archive.db`` could be found.

        A failed query is logged and reported as an empty marker list with
        ``db`` still set, so callers never see an exception from the lookup.
    """
    import datetime
    import sqlite3
    import time

    # archive.db may sit next to the package or under the shared home dir;
    # the first existing candidate wins.
    candidates = [
        os.path.join(os.path.dirname(__file__), "..", "data", "archive.db"),
        os.path.expanduser("~/gbase-home/data/archive.db"),
    ]
    db_path = next(
        (p for p in map(os.path.abspath, candidates) if os.path.exists(p)),
        None,
    )
    if not db_path:
        return {"markers": [], "count": 0, "db": None}

    cutoff_ts = time.time() - hours * 3600

    with _LOCK:
        conn = None
        try:
            conn = sqlite3.connect(db_path)
            rows = conn.execute(
                "SELECT marker, timestamp, session_key FROM archive_markers "
                "WHERE timestamp >= ? "
                "ORDER BY timestamp DESC LIMIT ?",
                (cutoff_ts, limit),
            ).fetchall()
        except Exception as exc:
            # Best-effort lookup: report the failure instead of hiding it,
            # but keep the caller's flow alive.
            logger.warning("recent_global query failed (%s): %s", db_path, exc)
            return {"markers": [], "count": 0, "db": db_path}
        finally:
            # Close on every path so a failing query cannot leak the handle.
            if conn is not None:
                conn.close()

    result = []
    for marker, ts, skey in rows:
        dt = datetime.datetime.fromtimestamp(ts)
        result.append({
            # A NULL marker column must not break the whole listing.
            "marker": (marker or "")[:120],
            "timestamp": ts,
            "time_str": dt.strftime("%m-%d %H:%M"),
            "session": skey.split(":")[-1][:20] if skey else "",
        })

    return {"markers": result, "count": len(result), "db": db_path}


def _copy_old_data(dat_db_path: str, archive_db_path: str):
"""从 dat.db 导入旧 experience/knowledge 数据到 archive.db(一次性)。"""
if not os.path.exists(dat_db_path):
Expand All @@ -779,7 +835,7 @@ def _copy_old_data(dat_db_path: str, archive_db_path: str):
cursor = conn.cursor()

# 从 entries table 找 experience 和 knowledge
for tbl, _pri in [("entries", 1)]:
for tbl, pri in [("entries", 1)]:
try:
cursor.execute(f"SELECT content, type FROM {tbl} WHERE content IS NOT NULL AND content != ''")
for content, typ in cursor.fetchall():
Expand Down
54 changes: 33 additions & 21 deletions lib/experience.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,14 @@
"summary": "此次任务工具调用次数偏多({tool_calls_count}次),下次同类任务应该先规划再调工具",
"confidence": "medium",
},
{
"name": "short_reply",
"check": lambda ctx: len(ctx.get("reply", "")) < 80,
"summary": "回复长度偏短({reply_len}字),下次应尽量提供更完整的回答",
"confidence": "low",
},

{
"name": "api_error",
"check": lambda ctx: ctx.get("has_api_error", False),
"summary": "工具调用时有 API 错误,下次应注意检查工具是否可用",
"confidence": "high",
},
# ── 反脆弱: 失败尝试也写入经验,不静默Rollback ──
# ── 反脆弱: 失败尝试也写入经验,不静默回滚 ──
{
"name": "failed_action",
"check": lambda ctx: bool(ctx.get("has_failure", False)),
Expand All @@ -62,16 +57,16 @@
{
"name": "failed_rollback",
"check": lambda ctx: bool(ctx.get("rollback_occurred", False)),
"summary": "执行Rollback: [{rollback_action}] 验证失败,已Rollback。这条路走不通。",
"summary": "执行回滚: [{rollback_action}] 验证失败,已回滚。这条路走不通。",
"confidence": "medium",
},
# ── 反脆弱: 成功模式提炼(成功比失败更需要分析)──
{
"name": "success_pattern",
"check": lambda ctx: ctx.get("tool_calls_count", 0) > 0
"check": lambda ctx: ctx.get("tool_calls_count", 0) >= 3
and not ctx.get("has_api_error", False)
and not ctx.get("has_failure", False),
"summary": "成功完成[{task_theme}],工具调用{successful_calls}次。有效策略:{effective_strategy}",
"summary": "有效模式: [{task_theme}] 用 {tool_calls_count} 次工具调用完成",
"confidence": "medium",
},
]
Expand Down Expand Up @@ -150,11 +145,11 @@ def _is_duplicate_rule(storage: "store_module.Storage", rule_name: str) -> bool:
"""


# ── Experience extraction器 ──────────────────────────────────────────
# ── 经验提取器 ──────────────────────────────────────────


class ExperienceEngine:
"""Experience Engine。绑定到一个 Storage 实例上运作。"""
"""经验引擎。绑定到一个 Storage 实例上运作。"""

def __init__(self, storage: store_module.Storage):
self.storage = storage
Expand Down Expand Up @@ -204,10 +199,10 @@ async def extract(

if _is_duplicate_rule(self.storage, rule_name):
self._skip_count[rule_name] = self._skip_count.get(rule_name, 0) + 1
logger.debug("Experience deduplication跳过: rule=%s (已跳过%d次)", rule_name, self._skip_count[rule_name])
logger.debug("经验去重跳过: rule=%s (已跳过%d次)", rule_name, self._skip_count[rule_name])
return

logger.info("Experience extraction(规则): %s", rule_result["summary"][:60])
logger.info("经验提取(规则): %s", rule_result["summary"][:60])
# --- 如果是成功完成任务自动刻入 insight ---
if tool_calls_count > 0 and not has_api_error and rule_result["type"] != "insight" and not has_failure:
_record_success_insight(self, user_message, tool_calls_count)
Expand Down Expand Up @@ -247,7 +242,7 @@ async def extract(
try:
await self._llm_extract(context, llm_client)
except Exception as e:
logger.warning("Experience extraction(LLM)失败: %s", e)
logger.warning("经验提取(LLM)失败: %s", e)

async def _llm_extract(self, context: dict, client):
"""元认知反思提取 — 从「发生了什么」升级到「为什么发生、如何避免、什么条件下该用不同策略」。
Expand All @@ -270,10 +265,27 @@ async def _llm_extract(self, context: dict, client):
)
text = response.choices[0].message.content.strip()
if text == "null" or not text:
logger.debug("Experience extraction(LLM): 无有价值教训")
logger.debug("经验提取(LLM): 无有价值教训")
return

# 类型防御:LLM 可能返回不完整 JSON(被截断的末尾)
is_clean = False
for try_idx in range(3):
try:
result = json.loads(text)
is_clean = True
break
except json.JSONDecodeError:
# 尝试找到最晚的完整 JSON 截止点
last_brace = text.rfind("}")
if last_brace > 0:
text = text[:last_brace + 1]
else:
break
if not is_clean:
logger.warning("经验提取(LLM): JSON 解析失败,跳过")
return

result = json.loads(text)
if "summary" in result:
# 构建结构化 entry
summary = result["summary"][:200]
Expand Down Expand Up @@ -316,19 +328,19 @@ async def _llm_extract(self, context: dict, client):
except Exception:
pass

logger.info("Experience extraction(元认知反思): %s", summary[:60])
logger.info("经验提取(元认知反思): %s", summary[:60])

# --- 自动刻入 insight(成功任务不留空洞) ---
if context.get("tool_calls_count", 0) > 0 and not context.get("has_api_error", False):
_record_success_insight(self, context.get("user_message", ""), context["tool_calls_count"])

except (json.JSONDecodeError, KeyError) as e:
logger.debug("Experience extraction(LLM)解析失败: %s | 原始响应: %s", e, text[:200] if 'text' in dir() else "N/A")
logger.debug("经验提取(LLM)解析失败: %s | 原始响应: %s", e, text[:200] if 'text' in dir() else "N/A")
except Exception as e:
logger.debug("Experience extraction(LLM)异常: %s", e)
logger.debug("经验提取(LLM)异常: %s", e)

def search(self, query: str, limit: int = 5) -> list[dict]:
"""搜索经验库。优先 FTS5 全文Search,无结果时回退 LIKE 模糊匹配。
"""搜索经验库。优先 FTS5 全文检索,无结果时回退 LIKE 模糊匹配。

排序逻辑:
- 先按 BM25 相关性分 + 内容长度惩罚(太长降级)
Expand Down
35 changes: 35 additions & 0 deletions lib/kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,14 @@ def _build_dynamic_system_prompt(self) -> str:
# 中文多字词,拆单字也加进去
for _ch in _w:
_fts_tokens.append(f"{_ch}*")
# FTS5 detail=column 下纯数字/纯单字母 token 会被解析为 column name
_fts_tokens = [t for t in _fts_tokens
if not _import_re.match(r'^\d+$', t)
and not _import_re.match(r'^[a-zA-Z]$', t)
and len(t) > 1]
# 过滤后保底:至少保留原始词保证有查询内容
if not _fts_tokens:
_fts_tokens = [f"{_w}*" for _w in _words if len(_w) > 1]
_fts_query = " OR ".join(_fts_tokens)[:500]
_results = []
with _storage._lock:
Expand Down Expand Up @@ -503,6 +511,12 @@ def _build_dynamic_system_prompt(self) -> str:
+ "\n".join(_results)
)
parts.append(_know_text)
# GMem Phase 1A1: 自动检索命中后 record_hit
for _hit_r in _rows:
try:
_storage.record_hit(_hit_r[0])
except Exception:
logger.exception("记录 hit 失败 (id=%s)", _hit_r[0])
logger.info("Knowledge 自动Search: 命中 %d 条", len(_results))
else:
logger.info("Knowledge 自动Search: 无命中")
Expand Down Expand Up @@ -1711,7 +1725,28 @@ async def _run_one_tool(tc):
if session:
session.append(tr)

# SkillOpt: 记录本轮工具调用数
self._recorded_tc = getattr(self, "_recorded_tc", 0) + len(msg.tool_calls)

# 递归至多 15 层
if depth + 1 >= MAX_TOOL_DEPTH:
return await self._loop(messages, tools, depth=depth + 1, session=session)
return await self._loop(messages, tools, depth=depth + 1, session=session)


# ── GMem Phase B1: 构建压缩摘要(供 archive_store 存档) ──
def _build_gmem_summary(stats: dict, session) -> str:
    """Build the compression-checkpoint summary text from session statistics.

    Args:
        stats: Session statistics; ``messages`` and ``compactions`` are read
            and default to 0 when absent.
        session: Session-like object. ``get_compaction_level()`` and
            ``get_all_compactions()`` are called only when the object
            provides them.

    Returns:
        A header line followed by one bullet for each of the last three
        compactions that is a ``str`` or a ``dict`` (other types are ignored).
        Every bullet is capped at 200 characters. Returns ``""`` if anything
        goes wrong while building the text.
    """
    try:
        header = (
            f"上下文压缩 checkpoint — 消息数: {stats.get('messages', 0)}, "
            f"压缩次数: {stats.get('compactions', 0)}, "
            f"层级: {session.get_compaction_level() if hasattr(session, 'get_compaction_level') else 0}"
        )
        lines = [header]
        if hasattr(session, "get_all_compactions"):
            # Only the three most recent compactions are summarised.
            for entry in session.get_all_compactions()[-3:]:
                if isinstance(entry, str):
                    text = entry
                elif isinstance(entry, dict):
                    # Fall back to the dict's repr when it has no summary.
                    text = entry.get("summary", str(entry))
                else:
                    continue
                # Same 200-character cap for every bullet, including dict
                # summaries, so one oversized entry cannot bloat the archive.
                lines.append(f" · {str(text)[:200]}")
        return "\n".join(lines)
    except Exception:
        # Best-effort helper: never break the caller, but leave a trace.
        logger.debug("_build_gmem_summary failed", exc_info=True)
        return ""
Loading
Loading