-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmnemo_mcp.py
More file actions
3983 lines (3344 loc) · 148 KB
/
mnemo_mcp.py
File metadata and controls
3983 lines (3344 loc) · 148 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
mnemo MCP server — content-addressed project memory for Claude Code
Exposes mnemo operations as MCP tools that Claude can call natively.
Run with: claude mcp add mnemo -- uv run --with fastmcp fastmcp run mnemo_mcp.py
Or for development:
uv run --with fastmcp fastmcp dev mnemo_mcp.py
Tools:
memory_claim — commit a project fact (or batch of facts) to the memory tree
memory_update — supersede an existing claim with new info
memory_reinforce — mark an existing claim as still current
memory_link — create a directional relationship between two nodes
memory_query — look up a node by address (prefix ok)
memory_verify — verify anchored claims against the actual codebase
memory_search — search active memory by keyword
memory_provenance — trace a claim back to its origins
memory_compress — compress a set of nodes into a summary
memory_session_compress — compress current work cycle into a summary
memory_status — active set size, pressure level, last root
memory_diff — what changed since last root
memory_soul — generate the current project knowledge document
memory_reroot — recompute the root from active nodes
memory_infer — passive pattern inference from session logs
memory_help — return usage guide for claude-code / quick / all
memory_write — write a file + auto-claim the change
memory_edit — edit a file + stale warnings + auto-claim
memory_glob — glob with tree coverage annotation per file
Session continuity:
mnemo_handoff.py generates structured handoff nodes at session compress time
and provides first-recall orientation priming on session start (turns 1-2).
"""
import json
import os
import time
from fastmcp import FastMCP
from mnemo import (
Store, Node, GENESIS,
supersede, compress, reroot,
build_active_context,
propose_supersessions,
generate_soul_doc,
discover_store,
)
from mnemo_associate import associate, retrieve_relevant
from mnemo_log import emit, configure as log_configure
from mnemo_explore import explore as _explore
from mnemo_grep import grep as _grep
from mnemo_plan import plan as _plan
from mnemo_read import read as _read
from mnemo_infer import infer as _infer
from mnemo_handoff import generate_handoff, build_orientation
from mnemo_arc import (
create_arc, update_arc, complete_arc, pause_arc,
find_active_arcs, match_session_to_arcs, detect_arc_candidates,
)
from mnemo_anchor import compute_content_hash, update_file_index
from mnemo_map import map_path as _map_path
from mnemo_coverage import coverage as _coverage, format_report as _format_coverage
from mnemo_session import (
load_or_create_session, get_session_store,
promote_chain, promote_nodes, promote_all_preliminary,
archive_session, session_summary, gc_sessions,
list_preliminary_chains,
)
from mnemo_fs import (
get_project_root as _get_project_root,
normalize_path as _normalize_path,
nodes_for_file as _nodes_for_file,
check_stale_anchors as _check_stale_anchors,
auto_claim as _auto_claim,
fs_write as _fs_write,
fs_edit as _fs_edit,
fs_glob as _fs_glob,
format_glob_with_coverage as _format_glob_with_coverage,
)
# --- Project registry ---
# Cross-project registry: maps project name -> absolute store path.
# Lives under the user's home directory so every MCP server instance shares it.
REGISTRY_DIR = os.path.expanduser("~/.mnemo")
REGISTRY_PATH = os.path.join(REGISTRY_DIR, "projects.json")
def _load_registry() -> dict[str, str]:
    """Load project name -> store path mapping. Canonical format: {name: path}.

    Returns an empty dict when the registry file is missing or unparseable.
    A legacy list-of-entries format is converted to the canonical dict form.
    """
    try:
        with open(REGISTRY_PATH, "r", encoding="utf-8") as f:
            data = json.load(f)
        if isinstance(data, dict):
            # Drop malformed (non-string) values defensively.
            return {k: v for k, v in data.items() if isinstance(v, str)}
        if isinstance(data, list):
            # Recover from legacy list format: [{"name": ..., "path": ...}, ...]
            result = {}
            for entry in data:
                if isinstance(entry, dict) and "path" in entry:
                    # Fall back to the store's parent directory name when the
                    # entry carries no explicit name. (os is already imported
                    # at module level — no per-iteration re-import needed.)
                    name = entry.get("name") or os.path.basename(
                        os.path.dirname(entry["path"]))
                    result[name] = entry["path"]
            return result
        return {}
    except (FileNotFoundError, json.JSONDecodeError):
        return {}
def _save_registry(registry: dict[str, str]) -> None:
    """Write the registry to disk atomically (temp file + rename)."""
    os.makedirs(REGISTRY_DIR, exist_ok=True)
    tmp_path = REGISTRY_PATH + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as handle:
        json.dump(registry, handle, indent=2)
    os.replace(tmp_path, REGISTRY_PATH)
def _register_project(name: str, store_path: str) -> None:
    """Record (or refresh) a project's store path in the registry."""
    entries = _load_registry()
    entries[name] = os.path.abspath(store_path)
    _save_registry(entries)
def _resolve_project(name: str) -> Store | None:
    """Resolve a registered project name to a Store.

    Returns None when the name is unknown or its directory no longer exists.
    """
    store_path = _load_registry().get(name)
    if not store_path or not os.path.isdir(store_path):
        return None
    return Store(store_path)
# --- Configuration ---
def _detect_store_path() -> tuple[str, bool]:
    """
    Auto-detect the project store via v2 discover_store() semantics.

    Returns (store_path_str, is_v2):
      is_v2 True  -> found .mnemo/ by walking up from CWD.
      is_v2 False -> fell back to MNEMO_STORE env or ~/mnemo (v1 compat).

    MNEMO_PROJECT_DIR overrides the CWD used for discovery — useful when
    the MCP server is launched from a different working directory (e.g.
    from Claude Desktop, which doesn't support the cwd config field).
    """
    cwd_override = os.environ.get("MNEMO_PROJECT_DIR", "")
    store_path, is_v2 = discover_store(cwd=cwd_override or None)
    path_str = str(store_path)
    if is_v2:
        # v2 stores are auto-registered under their project directory's name.
        from pathlib import Path as _Path
        dir_name = _Path(path_str).parent.name
        _register_project(dir_name, path_str)
    return path_str, is_v2
# Resolve the active store once at import time. Tuple unpacking replaces the
# throwaway `_store_path_result` indexing of the original.
STORE_PATH, STORE_IS_V2 = _detect_store_path()
GLOBAL_PATH = os.environ.get("MNEMO_GLOBAL", os.path.expanduser("~/.mnemo/global"))
# Recall turns before the session auto-compresses (see memory_recall).
COMPRESS_INTERVAL = int(os.environ.get("MNEMO_COMPRESS_INTERVAL", "15"))
store = Store(STORE_PATH)
global_store = Store(GLOBAL_PATH)
log_configure(STORE_PATH)
emit("status", "system",
     f"mnemo MCP session started — project: {STORE_PATH}, global: {GLOBAL_PATH}")
# --- Session tracking ---
# Mutable module-level session state; persisted via _save_session_state().
_session_turns = 0  # recall turns this cycle (incremented in memory_recall)
_session_addrs: list[str] = []  # node addresses created/modified this cycle
_recalled_recent: list[set[str]] = []  # addresses surfaced on recent turns (last 5)
_file_visits: dict[str, int] = {}  # basename -> read count this session
# --- Session store (v2 only) ---
_session_id: str = ""  # set by _init_session_store on v2 stores
_session_store: Store | None = None  # ephemeral working memory
_recently_extended_chain_ids: set[str] = set()  # chains extended this session (continuity boost)
def _init_session_store():
    """Create/load the ephemeral session store (v2 project stores only)."""
    global _session_id, _session_store
    if not STORE_IS_V2:
        return
    from pathlib import Path as _Path
    state_path = _Path(STORE_PATH) / "session_state.json"
    _session_id, _session_store = load_or_create_session(store, state_path)
_init_session_store()
def _is_recently_recalled(addr: str) -> bool:
    """Check whether *addr* appeared in any of the last 5 recall turns."""
    for turn_set in _recalled_recent:
        if addr in turn_set:
            return True
    return False
def _get_store(project: str = "") -> Store:
    """Return the store for *project*, or the active store when empty.

    Raises:
        ValueError: when *project* is not present in the registry.
    """
    if not project:
        return store
    target = _resolve_project(project)
    if not target:
        raise ValueError(f"Unknown project '{project}'. Use memory_projects() to list registered projects.")
    return target
def _save_session_state():
    """Persist session cycle state to disk so it survives MCP restarts."""
    snapshot = {
        "session_turns": _session_turns,
        "session_addrs": _session_addrs,
        "recalled_recent": [sorted(s) for s in _recalled_recent],
        "file_visits": _file_visits,
        "session_id": _session_id,
        "recently_extended_chain_ids": sorted(_recently_extended_chain_ids),
        "saved_at": time.time(),
    }
    target = os.path.join(STORE_PATH, "session_state.json")
    tmp_target = target + ".tmp"
    try:
        with open(tmp_target, "w", encoding="utf-8") as handle:
            json.dump(snapshot, handle)
        os.replace(tmp_target, target)  # atomic on both POSIX and Windows
    except Exception:
        # Best-effort persistence: a failed save is non-fatal.
        # Just drop the temp file if the rename never happened.
        try:
            os.unlink(tmp_target)
        except OSError:
            pass
def _load_session_state():
    """Restore session state from disk. Discards if >2 hours stale.

    Best-effort: any failure (missing file, bad JSON, malformed fields)
    leaves the session starting fresh.
    """
    global _session_turns, _session_addrs, _recalled_recent, _file_visits
    global _recently_extended_chain_ids
    path = os.path.join(STORE_PATH, "session_state.json")
    try:
        with open(path, "r", encoding="utf-8") as f:
            state = json.load(f)
        # Discard if stale (>2 hours)
        if time.time() - state.get("saved_at", 0) > 7200:
            return
        _session_turns = state.get("session_turns", 0)
        _session_addrs = state.get("session_addrs", [])
        _recalled_recent = [set(s) for s in state.get("recalled_recent", [])]
        _file_visits = state.get("file_visits", {})
        _recently_extended_chain_ids = set(state.get("recently_extended_chain_ids", []))
        # NOTE(review): "session_id" is written by _save_session_state but not
        # restored here — _init_session_store owns _session_id; confirm intended.
    except Exception:
        # Original listed (FileNotFoundError, json.JSONDecodeError, Exception);
        # Exception subsumes the others, so state the catch-all honestly.
        pass  # no state to restore, start fresh
_load_session_state()
def _build_session_context() -> dict:
    """Assemble the session-context dict consumed by retrieval and read tools."""
    context: dict = {}
    context["session_addrs"] = set(_session_addrs)
    context["recalled_recent"] = _recalled_recent
    context["recently_extended_chain_ids"] = _recently_extended_chain_ids
    return context
# FastMCP server instance. The instructions text is passed to the MCP client
# and bakes in the store path resolved at startup (STORE_PATH).
mcp = FastMCP("mnemo", instructions=f"""You have a project memory. It persists across sessions.
This is the project's knowledge base — what's been learned about the
codebase, what decisions were made and why, what conventions to follow,
what's known to break, what's been tried before. When you start a fresh
session, this is how you get oriented fast instead of rediscovering
everything from scratch.
EVERY TURN, call memory_recall with what the user just said. Every
single turn. This is how you access project context. Don't think about
whether to do it — just do it. The result tells you what the project
already knows. Let it inform your response naturally.
memory_claim — when something worth preserving happens. Architecture
decisions, conventions established, bugs discovered, dependency
constraints, module responsibilities, approaches that worked or failed.
Don't store noise. Store what would make a future instance productive
on this project faster. The bar: "would a fresh instance need this?"
memory_update — when project knowledge has changed. A dependency was
upgraded, an approach was abandoned, a module was restructured, a bug
was fixed. Always reference what it replaces. The old knowledge stays
addressable but the active path moves forward.
memory_reinforce — when existing knowledge gets confirmed as still
current. Doesn't create new nodes, just marks existing ones as fresh.
memory_link — when two pieces of knowledge are related. "This decision
was caused_by this constraint." "This pattern depends_on this dependency."
Links turn the tree into a graph — recall follows them automatically.
Relationship types: relates_to, caused_by, depends_on, blocks, enables, contradicts.
memory_compress — when memory_status shows pressure, or when a cluster
of related facts can be summarized. Compression is lossy in content
but lossless in provenance.
Domains: architecture, decisions, patterns, tasks, issues, dependencies, history, context
The memory store is at: {STORE_PATH}
""")
# ===================================================================
# Associative recall — direct retrieval, no LLM intermediary
# ===================================================================
@mcp.tool()
def memory_recall(message: str, project: str = "") -> str:
    """
    Associative recall. Call this every turn with what the user said.
    Surfaces what the project already knows that's relevant.

    Depth adapts automatically — brief/trivial messages get minimal context,
    substantive questions get full context. You don't need to manage this.

    When project is specified, queries that project's store instead of the
    active one. Cross-project recall is lightweight — no extraction or
    session tracking.

    Args:
        message: What the user said (or a brief summary of the topic)
        project: Optional project name to query instead of the active store
    """
    # Cross-project recall — lightweight, no session tracking or extraction
    if project:
        try:
            target = _get_store(project)
        except ValueError as e:
            return str(e)
        relevant = retrieve_relevant(message, target, max_nodes=5)
        if not relevant:
            return f"(nothing found in {project})"
        lines = [f"From {project}:"]
        for item in relevant:
            domain = item["node"].meta.get("domain", "?")
            content = item["node"].content[:120]
            addr = item["node"].addr[:8]
            lines.append(f" [{domain}] {content} [{addr}]")
        return "\n".join(lines)
    global _session_turns
    _session_turns += 1
    # Session start = first couple of turns with nothing claimed yet.
    is_session_start = _session_turns <= 2 and not _session_addrs
    # Build session context for temporal relevance scoring
    session_context = _build_session_context()
    # Adaptive retrieval — max_nodes=0 lets signal density control depth
    result = associate(message, store, narrative=True, max_nodes=0,
                       session_context=session_context)
    density = result.get("signal_density", "medium")
    # First-recall priming — inject orientation context on session start
    orientation = ""
    if is_session_start:
        orientation_text = build_orientation(store, global_store)
        if orientation_text:
            orientation = orientation_text + "\n\n"
            emit("recall", "subconscious", "session orientation injected",
                 detail={"session_start": True})
    if not result["preload"]:
        emit("recall", "subconscious", "nothing surfaced",
             detail={"message": message[:100], "relevant_count": 0,
                     "session_turns": _session_turns, "density": density})
        recall_body = orientation or "(nothing comes to mind)"
    else:
        emit("recall", "subconscious", result["preload"],
             addresses=result["relevant_addrs"],
             detail={"message": message[:100],
                     "relevant_count": result["relevant_count"],
                     "tension_count": result["tension_count"],
                     "session_turns": _session_turns, "density": density})
        preload = result["preload"]
        # If only always-active pinned nodes surfaced, signal it — no domain hits
        if result["relevant_count"] > 0 and "── Chain" not in preload and not is_session_start:
            preload += "\n\n(no chain hits — always-active context only)"
        recall_body = orientation + preload
    response = recall_body
    # Track what was recalled for session affinity on future turns
    _recalled_recent.append(set(result["relevant_addrs"]))
    if len(_recalled_recent) > 5:
        _recalled_recent.pop(0)
    _save_session_state()
    # Update recall metadata on surfaced nodes
    for addr in result["relevant_addrs"]:
        node = store.get(addr)
        if node:
            node.meta["recall_count"] = node.meta.get("recall_count", 0) + 1
            node.meta["last_recalled"] = time.time()
            store.put(node)
    # Query global store for cross-project knowledge (user prefs, general patterns)
    if global_store.get_active():
        global_relevant = retrieve_relevant(message, global_store, max_nodes=3)
        if global_relevant:
            fragments = []
            for item in global_relevant:
                # Apply penalty — project context takes priority
                if item["score"] * 0.7 > 0.5:
                    content = item["node"].content
                    addr = item["node"].addr
                    fragments.append(f"{content} [{addr[:8]}]")
            if fragments:
                response += "\n\nGlobal context: " + " — ".join(fragments)
    # Cross-project fallback — when active store has sparse hits, check other
    # registered projects. Capped at 3 total hits to avoid noise.
    # Only fires when relevant_count < 2 (active store nearly empty on this topic).
    if result["relevant_count"] < 2:
        registry = _load_registry()
        cross_hits = []
        active_store_path = os.path.abspath(STORE_PATH)
        for proj_name, proj_path in registry.items():
            if not os.path.isdir(proj_path):
                continue
            if os.path.abspath(proj_path) == active_store_path:
                continue  # skip the active project
            if len(cross_hits) >= 3:
                break
            try:
                from mnemo import Store as _Store
                proj_store = _Store(proj_path)
                if not proj_store.get_active():
                    continue
                proj_relevant = retrieve_relevant(message, proj_store, max_nodes=2)
                for item in proj_relevant[:2]:
                    # Higher bar (0.6) than in-project hits to limit noise.
                    if item["score"] > 0.6 and len(cross_hits) < 3:
                        cross_hits.append((proj_name, item["node"]))
            except Exception:
                continue
        if cross_hits:
            fragments = []
            for proj_name, node in cross_hits:
                domain = node.meta.get("domain", "?")
                fragments.append(
                    f" [{proj_name}/{domain}] {node.content[:120]} [{node.addr[:8]}]"
                )
            response += "\n\nCross-project context:\n" + "\n".join(fragments)
            emit("recall", "subconscious",
                 f"cross-project: {len(cross_hits)} hit(s) from other projects",
                 detail={"projects": list({p for p, _ in cross_hits}),
                         "count": len(cross_hits)})
    # Auto-compress when threshold reached — don't nudge, just do it
    if _session_turns >= COMPRESS_INTERVAL and _session_addrs:
        # Derive a summary from the cycle nodes (domain breakdown + snippets)
        active = store.get_active()
        cycle_nodes = [store.get(a) for a in _session_addrs if a in active]
        cycle_nodes = [n for n in cycle_nodes if n]
        domain_counts: dict[str, int] = {}
        for n in cycle_nodes:
            d = n.meta.get("domain", "context")
            domain_counts[d] = domain_counts.get(d, 0) + 1
        domain_str = ", ".join(
            f"{d}({c})" for d, c in
            sorted(domain_counts.items(), key=lambda x: -x[1])
        )
        snippets = "; ".join(n.content[:60] for n in cycle_nodes[:3])
        # Capture counters BEFORE memory_session_compress resets session state.
        captured_turns = _session_turns
        captured_count = len(_session_addrs)
        auto_summary = f"auto-compress turn {captured_turns}: [{domain_str}] {snippets}"
        memory_session_compress(auto_summary)
        response += (
            f"\n\n[Auto-compressed {captured_count} nodes at turn {captured_turns}.]"
        )
    # Nudge for claiming when significant work has happened without claims
    elif _session_turns >= 10 and not _session_addrs:
        response += (
            "\n\n[Claim nudge: 10+ turns with no claims stored. "
            "If decisions were made or reasoning discussed, "
            "capture the 'why' with memory_claim before it fades.]"
        )
    # Queue extraction for this turn (runs in background thread)
    # NOTE(review): no extraction call is visible here — the comment above may
    # refer to code elsewhere in the file; confirm.
    return response
# ===================================================================
# Core tools
# ===================================================================
@mcp.tool()
def memory_claim(content: str = "", domain: str = "", confidence: float = 0.8,
                 batch: list[dict] = None,
                 scope: str = "project",
                 anchors: list[dict] = None,
                 priority: float = 0,
                 project: str = "",
                 chain_id: str = "",
                 chain_name: str = "",
                 preliminary: bool = False,
                 agent_id: str = "",
                 ttl_days: float = 0) -> str:
    """
    Commit facts to the memory tree.

    Can store a single claim via the content/domain params, or multiple
    claims at once via the batch param. Use batch when you have several
    facts to store — it's one tool call instead of many.

    Scope controls where the claim is stored:
    - "project" (default): stored in the project-specific memory tree
    - "global": stored in the cross-project global memory (~/.mnemo/global/)
    Use global for user preferences, general conventions, workflow patterns,
    and knowledge that transcends any single codebase.

    Chain assignment (v2 stores):
    - chain_id: append this node to an existing chain (use memory_chains to list)
    - chain_name: create a new chain with this node as its seed (sets the summary)
    - Neither: node is standalone; extraction sidecar may propose chain membership later

    Args:
        content: The claim as a standalone fact (ignored if batch is provided)
        domain: Category — architecture, decisions, patterns, tasks, issues, dependencies, history, context (ignored if batch is provided)
        confidence: 0.0-1.0 how established this fact is (ignored if batch is provided)
        batch: List of claims, each with keys: content (str), domain (str), confidence (float, optional, default 0.8), scope (str, optional, default "project"), anchors (list, optional), priority (float, optional, default 0)
        scope: "project" or "global" — where to store the claim (ignored if batch is provided)
        anchors: Optional verification anchors — list of dicts like {"type": "file", "path": "..."}, {"type": "grep", "pattern": "...", "path": "..."}, {"type": "dependency", "name": "..."} (ignored if batch is provided)
        priority: Score boost for high-importance nodes (0 = normal, 0.5 = moderate, 1.0 = high). Use for user preferences, working agreements, critical invariants. (ignored if batch is provided)
        project: Optional project name — store the claim in that project's tree instead of the active one
        chain_id: Append this node to an existing chain (v2 stores only)
        chain_name: Create a new chain with this node as seed; chain_name becomes the summary (v2 stores only)
        preliminary: If True (v2 only), store in the session store as a preliminary node.
            Preliminary nodes won't appear in project recall until promoted via memory_promote.
            Use for work-in-progress reasoning you're not ready to commit.
        agent_id: Agent attribution (v2 multi-agent). Tags the node with the owning agent.
            Used for retrieval boosting and chain-diff output. Optional in single-agent sessions.
        ttl_days: Time-to-live in days. Node is silently skipped in retrieval after this
            many days from creation. Use for ephemeral working notes, task state,
            and anything that's only relevant for the current sprint. 0 = no expiry.
    """
    claims = []
    if batch:
        claims = batch
    elif not content or not domain:
        return "Either provide content + domain for a single claim, or batch for multiple."
    else:
        # Normalize the single-claim form into a one-element batch.
        claims = [{"content": content, "domain": domain,
                   "confidence": confidence, "scope": scope,
                   "anchors": anchors, "priority": priority}]
    results = []
    claimed_addrs = []
    for claim in claims:
        c = claim.get("content", "")
        d = claim.get("domain", "")
        conf = claim.get("confidence", 0.8)
        claim_scope = claim.get("scope", "project")
        claim_priority = claim.get("priority", 0)
        if not c or not d:
            results.append(f"Skipped (missing content or domain): {c[:40]}")
            continue
        # Resolve target store: preliminary > project param > scope > active store
        use_session = (preliminary and STORE_IS_V2
                       and _session_store is not None
                       and claim_scope == "project"
                       and not project)
        if use_session:
            target_store = _session_store
        elif project:
            try:
                target_store = _get_store(project)
            except ValueError as e:
                # NOTE(review): aborts the whole batch on an unknown project —
                # earlier claims in this loop are already stored; confirm intended.
                return str(e)
        elif claim_scope == "global":
            target_store = global_store
        else:
            target_store = store
        meta = {
            "domain": d,
            "confidence": conf,
            "source": "live",
            "scope": claim_scope,
        }
        if claim_priority:
            meta["priority"] = claim_priority
        if agent_id:
            meta["agent_id"] = agent_id
        # Per-claim ttl_days (batch) falls back to the top-level parameter.
        claim_ttl = claim.get("ttl_days", ttl_days)
        if claim_ttl:
            meta["ttl_days"] = claim_ttl
        # Attach verified anchors if provided
        claim_anchors = claim.get("anchors") or anchors
        if claim_anchors:
            from mnemo_verify import validate_anchor
            processed = []
            for a in claim_anchors:
                # content_hash anchors skip validate_anchor; others must pass it.
                if not validate_anchor(a) and a.get("type") != "content_hash":
                    continue
                # Auto-compute content_hash when context_lines provided but hash missing
                if (a.get("type") == "content_hash"
                        and a.get("context_lines")
                        and not a.get("content_hash")):
                    a = dict(a)  # don't mutate caller's dict
                    a["content_hash"] = compute_content_hash(a["context_lines"])
                processed.append(a)
            if processed:
                meta["anchors"] = processed
        node = Node(
            type="leaf",
            content=c,
            meta=meta,
        )
        target_store.put(node)
        active = target_store.get_active()
        active.add(node.addr)
        target_store.set_active(active)
        # Register content_hash anchors in the file index
        if meta.get("anchors"):
            update_file_index(target_store, node)
        # Only track project-scope claims in session (global is user-level)
        if claim_scope == "project" and not use_session:
            _session_addrs.append(node.addr)
        # Chain assignment (v2 stores only, project scope only)
        chain_tag = ""
        if claim_scope == "project" and STORE_IS_V2:
            try:
                from mnemo_chains import extend_chain, create_chain
                if chain_id:
                    ok = extend_chain(target_store, chain_id, node.addr)
                    if ok:
                        chain_tag = f" (→ chain {chain_id[:10]})"
                        _recently_extended_chain_ids.add(chain_id)
                    else:
                        chain_tag = f" (chain {chain_id[:10]} not found)"
                elif chain_name:
                    chain_authority = 0.0
                    new_chain_id = create_chain(
                        target_store, node.addr,
                        domain=d, summary=chain_name,
                        agent_id=agent_id or None,
                        authority=chain_authority,
                    )
                    chain_tag = f" (new chain {new_chain_id})"
                    _recently_extended_chain_ids.add(new_chain_id)
            except Exception:
                pass  # chain ops are non-fatal
        # NOTE(review): claimed_addrs is accumulated but not used in this
        # function's visible body — confirm whether it's dead or used elsewhere.
        claimed_addrs.append(node.addr)
        scope_tag = " [global]" if claim_scope == "global" else ""
        prelim_tag = " [preliminary]" if use_session else ""
        emit("claim", "conscious", f"[{d}]{scope_tag}{prelim_tag} {c}",
             addresses=[node.addr], domain=d,
             detail={"confidence": conf, "scope": claim_scope,
                     "preliminary": use_session,
                     "chain_id": chain_id or None,
                     "chain_name": chain_name or None})
        results.append(
            f"Stored [{d}]{scope_tag}{prelim_tag} {node.addr}: {c}{chain_tag}"
        )
    _save_session_state()
    return "\n".join(results)
@mcp.tool()
def memory_update(old_address: str, new_content: str, reason: str = "",
                  domain: str = "") -> str:
    """
    Replace an existing claim with updated information.

    The superseded claim stays addressable; the active path routes through
    the replacement. Domain and confidence carry over from the old node
    unless a domain override is supplied.

    Args:
        old_address: Address (or prefix) of the claim being replaced
        new_content: The updated claim text
        reason: Why this changed (e.g. "dependency upgraded", "module restructured")
        domain: Override domain (leave empty to inherit from old node)
    """
    old_node = store.get(old_address)
    if not old_node:
        emit("update", "conscious", f"NOT FOUND: {old_address}")
        return f"Not found: {old_address}"
    replacement = supersede(old_node.addr, new_content, store,
                            reason=reason, domain=domain)
    # Updating a node that was recently surfaced counts as a recall hit.
    if _is_recently_recalled(old_node.addr):
        fresh = store.get(replacement)
        if fresh:
            fresh.meta["recall_hits"] = fresh.meta.get("recall_hits", 0) + 1
            store.put(fresh)
    _session_addrs.append(replacement)
    _save_session_state()
    emit("update", "conscious",
         f"{old_node.addr[:8]} -> {replacement[:8]}: {new_content}",
         addresses=[old_node.addr, replacement],
         domain=old_node.meta.get("domain"),
         detail={"reason": reason, "old_content": old_node.content})
    return f"Updated {old_node.addr[:8]} -> {replacement}: {new_content}"
@mcp.tool()
def memory_reinforce(address: str) -> str:
    """
    Mark an existing claim as still current. Bumps freshness metadata
    without minting a new node — use after verifying something still holds.

    Args:
        address: Address (or prefix) of the claim to reinforce
    """
    target = store.get(address)
    if not target:
        emit("reinforce", "conscious", f"NOT FOUND: {address}")
        return f"Not found: {address}"
    target.meta["last_reinforced"] = time.time()
    bumped = target.meta.get("reinforcement_count", 0) + 1
    target.meta["reinforcement_count"] = bumped
    # Reinforcing a recently-surfaced node also counts as a recall hit.
    if _is_recently_recalled(target.addr):
        target.meta["recall_hits"] = target.meta.get("recall_hits", 0) + 1
    store.put(target)
    _save_session_state()
    emit("reinforce", "conscious",
         f"{target.addr[:8]}: {target.content}",
         addresses=[target.addr],
         domain=target.meta.get("domain"),
         detail={"reinforcement_count": bumped})
    return f"Reinforced {target.addr}: {target.content[:60]}"
@mcp.tool()
def memory_link(source: str, target: str, rel: str = "relates_to") -> str:
    """
    Create a directional link between two nodes, turning the tree into a graph.

    Links live in node metadata — addresses and provenance are untouched.
    When recall surfaces the source node, linked nodes get a relevance boost
    proportional to the source's score; causal links propagate stronger.

    Args:
        source: Address of the node to link FROM
        target: Address of the node to link TO
        rel: Relationship type — relates_to, caused_by, depends_on, blocks, enables, contradicts
    """
    valid_rels = {"relates_to", "caused_by", "depends_on", "blocks", "enables", "contradicts"}
    if rel not in valid_rels:
        return f"Invalid rel '{rel}'. Valid: {', '.join(sorted(valid_rels))}"
    src = store.get(source)
    if not src:
        return f"Source not found: {source}"
    tgt = store.get(target)
    if not tgt:
        return f"Target not found: {target}"
    links = src.meta.get("links", [])
    # Idempotent: an identical (addr, rel) pair is never stored twice.
    if any(entry.get("addr") == tgt.addr and entry.get("rel") == rel
           for entry in links):
        return f"Link already exists: {src.addr[:8]} --{rel}--> {tgt.addr[:8]}"
    links.append({"addr": tgt.addr, "rel": rel})
    src.meta["links"] = links
    # Linking from a recently-surfaced node counts as a recall hit.
    if _is_recently_recalled(src.addr):
        src.meta["recall_hits"] = src.meta.get("recall_hits", 0) + 1
    store.put(src)
    _save_session_state()
    emit("link", "conscious",
         f"{src.addr[:8]} --{rel}--> {tgt.addr[:8]}",
         addresses=[src.addr, tgt.addr],
         detail={"rel": rel,
                 "source_content": src.content[:60],
                 "target_content": tgt.content[:60]})
    return (
        f"Linked: {src.addr[:8]} --{rel}--> {tgt.addr[:8]}\n"
        f" from: {src.content[:60]}\n"
        f" to: {tgt.content[:60]}"
    )
# ===================================================================
# Query tools
# ===================================================================
@mcp.tool()
def memory_query(address: str) -> str:
    """
    Look up a node by its address (prefix matching supported).

    Args:
        address: Full address or prefix (e.g. "a7f3c2")
    """
    node = store.get(address)
    if not node:
        return f"Not found: {address}"

    now = time.time()
    age_days = int((now - node.created) / 86400)

    # Reinforcement history, if the node was ever reinforced.
    r_info = ""
    last_reinforced = node.meta.get("last_reinforced")
    if last_reinforced:
        r_days = int((now - last_reinforced) / 86400)
        r_count = node.meta.get("reinforcement_count", 0)
        r_info = f"\nReinforced: {r_count}x, last {r_days}d ago"

    # Fragments preserved when this node was produced by compression.
    pv_info = ""
    preserved = node.meta.get("preserved_values")
    if preserved:
        fragments = [entry["fragment"] for entry in preserved[:20]]
        pv_info = "\nPreserved values:\n " + "\n ".join(fragments)

    # Coverage score recorded on compress nodes.
    coverage = node.meta.get("coverage_score")
    c_info = "" if coverage is None else f"\nCoverage: {coverage:.0%}"

    # Reverse links — nodes that point AT this one.
    rl_info = ""
    incoming = store.get_reverse_links(node.addr)
    if incoming:
        rl_lines = []
        for link in incoming:
            origin = store.get(link["source_addr"])
            preview = origin.content[:50] if origin else "?"
            rl_lines.append(f" {link['source_addr'][:8]} --{link['rel']}--> here: {preview}")
        rl_info = "\nLinked FROM:\n" + "\n".join(rl_lines)

    # Anchors (file / grep / dependency) attached to this node.
    anchor_info = ""
    node_anchors = node.meta.get("anchors")
    if node_anchors:
        anchor_lines = []
        for anchor in node_anchors:
            kind = anchor.get("type", "?")
            if kind == "file":
                anchor_lines.append(f" file: {anchor.get('path', '?')}")
            elif kind == "grep":
                suffix = f" in {anchor['path']}" if anchor.get("path") else ""
                anchor_lines.append(f" grep: '{anchor.get('pattern', '?')}'{suffix}")
            elif kind == "dependency":
                anchor_lines.append(f" dependency: {anchor.get('name', '?')}")
        if anchor_lines:
            anchor_info = "\nAnchors:\n" + "\n".join(anchor_lines)

    emit("query", "conscious",
         f"{node.addr[:8]} [{node.meta.get('domain', '?')}] {age_days}d: {node.content}",
         addresses=[node.addr], domain=node.meta.get("domain"))
    return (
        f"addr: {node.addr}\n"
        f"type: {node.type}\n"
        f"domain: {node.meta.get('domain', 'n/a')}\n"
        f"age: {age_days}d\n"
        f"inputs: {node.inputs}{r_info}{c_info}{pv_info}{rl_info}{anchor_info}\n"
        f"content: {node.content}"
    )
@mcp.tool()
def memory_graph(
    address: str,
    depth: int = 2,
    rel_types: str = "",
    direction: str = "both",
) -> str:
    """
    Traverse the link graph from a node and render the subgraph.

    Unlike recall (which uses links to boost scores implicitly), this tool
    makes the graph structure explicit — useful for understanding how a decision
    connects to architecture, or how a bug relates to a dependency chain.

    Args:
        address: Node address to start from (prefix matching supported)
        depth: Hops to traverse (default 2; max useful is 3-4)
        rel_types: Comma-separated relationship types to follow
            (default: all — caused_by, depends_on, blocks, enables,
            relates_to, contradicts)
        direction: "forward" (this node → others), "reverse" (others → this),
            or "both" (default)
    """
    from mnemo_graph import traverse_graph, render_graph

    node = store.get(address)
    if not node:
        return f"Not found: {address}"

    # Clamp traversal depth to a sane window before walking the graph.
    depth = min(max(depth, 1), 5)
    wanted = [part.strip() for part in rel_types.split(",") if part.strip()]
    subgraph = traverse_graph(store, node.addr, depth=depth,
                              rel_types=wanted or None, direction=direction)
    rendered = render_graph(subgraph)
    emit("graph", "conscious",
         f"graph from {node.addr[:8]} depth={depth} nodes={len(subgraph['nodes'])}",
         addresses=[node.addr],
         detail={"depth": depth, "nodes": len(subgraph["nodes"]),
                 "edges": len(subgraph["edges"])})
    return rendered
@mcp.tool()
def memory_gap(topic: str, context: str = "") -> str:
    """
    Record a knowledge gap — something you're uncertain about that's
    potentially significant.

    Gaps surface in future recall like any issues node, so they can be
    answered by a future instance, another agent, or a conscious claim.
    Resolve a gap with memory_update(old_addr=<gap_addr>, content=<answer>).

    Args:
        topic: What you don't know (e.g. "how _find_relevant_nodes ranks results")
        context: Why it matters / where you encountered it (optional)
    """
    gap_text = f"[GAP] {topic} — {context}" if context else f"[GAP] {topic}"
    node = Node(
        type="leaf",
        content=gap_text,
        meta={
            "domain": "issues",
            "source": "conscious",
            "gap": True,
            "session_id": _session_id or "",
        },
    )
    store.put(node)

    # Mark the gap as part of the active working set for this session.
    active = store.get_active()
    active.add(node.addr)
    store.set_active(active)
    _save_session_state()

    emit("claim", "conscious", f"gap recorded: {topic[:80]}",
         addresses=[node.addr],
         detail={"topic": topic, "context": context})
    return (
        f"Gap recorded: [{node.addr[:8]}]\n"
        f" {gap_text}\n"
        f"Resolve with: memory_update(old_addr='{node.addr[:8]}', content='<answer>')"
    )
@mcp.tool()
def memory_ask(question: str, context: str = "", target_agent: str = "") -> str:
"""
Record a pending decision or question that needs ratification.
Unlike memory_gap (I don't know this), memory_ask is for choices you've
made that need confirmation, or questions directed at another agent.
Surfaces in recall with [ASK] marker until resolved.
Args:
question: The question or decision to ratify
context: Why it matters / what's at stake (optional)
target_agent: Auto-ping this agent with the question (optional)
"""
content = f"[ASK] {question}"
if context:
content += f" — {context}"
node = Node(
type="leaf",
content=content,
meta={
"domain": "issues",
"source": "conscious",
"ask": True,
"session_id": _session_id or "",
},
)
store.put(node)
active = store.get_active()
active.add(node.addr)
store.set_active(active)
_save_session_state()
emit("claim", "conscious", f"ask recorded: {question[:80]}",
addresses=[node.addr],
detail={"question": question, "target_agent": target_agent})
result = (
f"Ask recorded: [{node.addr[:8]}]\n"