# obsidian_sync.py
"""
Obsidian / Notion Sync — RAG over personal knowledge base.
Indexes all markdown notes from an Obsidian vault (or any .md folder)
into vector memory with frontmatter metadata for semantic search.
"""
import os
import re
import json
import hashlib
import logging
from typing import Dict, List, Any

logger = logging.getLogger("ObsidianSync")


class ObsidianSync:
    """
    Syncs a markdown note vault (Obsidian, Logseq, Notion exports, or plain
    .md files) into the agent's vector memory for Retrieval-Augmented
    Generation.

    Features:
    - YAML frontmatter extraction (tags, dates, links)
    - Delta sync — only re-indexes changed files (hash-based)
    - Chunked indexing — large notes split into ~450-token windows
    - Wikilink graph awareness ([[linked-note]] references)
    """

    CHUNK_SIZE = 1800    # ~450 tokens per chunk
    CHUNK_OVERLAP = 200  # overlap between chunks for context continuity

    def __init__(self, vector_memory, database, llm_provider=None):
        self.vmem = vector_memory
        self.db = database
        self.llm = llm_provider
        self._index_cache: Dict[str, str] = {}  # filepath -> content hash
        self._load_index_cache()
        logger.info("📚 ObsidianSync initialized.")

    # ─── Cache Persistence ───────────────────────────────

    def _cache_path(self) -> str:
        return "obsidian_sync_cache.json"

    def _load_index_cache(self):
        try:
            with open(self._cache_path(), "r", encoding="utf-8") as f:
                self._index_cache = json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            self._index_cache = {}

    def _save_index_cache(self):
        try:
            with open(self._cache_path(), "w", encoding="utf-8") as f:
                json.dump(self._index_cache, f)
        except OSError as e:
            # A failed cache write is non-fatal; the next sync simply re-hashes.
            logger.warning(f"Could not persist index cache: {e}")
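
    # The cache file is plain JSON mapping note paths to content hashes, e.g.
    # {"/vault/Daily/2024-01-15.md": "<32-hex-char md5>"} (hypothetical path).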

    def _file_hash(self, filepath: str) -> str:
        try:
            with open(filepath, "rb") as f:
                # MD5 is used only for change detection, not for security.
                return hashlib.md5(f.read()).hexdigest()
        except OSError:
            return ""

    # ─── Frontmatter Parsing ─────────────────────────────

    def _parse_frontmatter(self, content: str) -> tuple[Dict, str]:
        """Extract YAML frontmatter and return (metadata, body)."""
        meta = {}
        body = content
        if content.startswith("---"):
            # Require the closing delimiter at the start of a line so a stray
            # "---" inside a value cannot end the block early.
            end = content.find("\n---", 3)
            if end != -1:
                fm = content[3:end].strip()
                body = content[end + 4:].strip()
                # Simple YAML-light parsing: one "key: value" per line.
                for line in fm.split("\n"):
                    if ":" in line:
                        key, _, val = line.partition(":")
                        meta[key.strip()] = val.strip().strip('"').strip("'")
        return meta, body
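
    # Illustrative example (hypothetical note): a file beginning
    #   ---
    #   tags: project, ai
    #   date: 2024-01-15
    #   ---
    #   # Weekly review ...
    # yields meta == {"tags": "project, ai", "date": "2024-01-15"} and a body
    # starting at "# Weekly review".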

    def _extract_wikilinks(self, text: str) -> List[str]:
        """Extract [[wikilink]] references from the note body."""
        return re.findall(r'\[\[([^\]]+)\]\]', text)
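
    # e.g. _extract_wikilinks("See [[Project Plan]] and [[2024-01-15|alias]]")
    # returns ["Project Plan", "2024-01-15|alias"]; aliases are kept verbatim.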

    # ─── Chunking ────────────────────────────────────────

    def _chunk_text(self, text: str) -> List[str]:
        """Split text into overlapping chunks for vector indexing."""
        chunks = []
        start = 0
        while start < len(text):
            end = start + self.CHUNK_SIZE
            chunk = text[start:end]
            if chunk.strip():
                chunks.append(chunk.strip())
            if end >= len(text):
                break
            # Step back by CHUNK_OVERLAP so adjacent chunks share context;
            # max() guarantees forward progress even if the overlap were ever
            # configured larger than the chunk size.
            start = max(end - self.CHUNK_OVERLAP, start + 1)
        return chunks
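
    # Worked example with the defaults (CHUNK_SIZE=1800, CHUNK_OVERLAP=200):
    # a 4000-character note yields windows [0:1800], [1600:3400], [3200:4000],
    # i.e. an effective stride of 1600 characters with 200 shared at each seam.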

    # ─── Core Sync ───────────────────────────────────────

    def sync_vault(self, vault_path: str, tenant_id: int,
                   force: bool = False) -> Dict[str, Any]:
        """
        Recursively scan vault_path for .md files.
        Only re-indexes files that have changed (hash-based delta sync).

        Args:
            vault_path: Path to Obsidian vault or any markdown folder
            tenant_id: Agent tenant ID
            force: Force re-index of all files even if unchanged
        """
        if not os.path.isdir(vault_path):
            return {"success": False, "error": f"Vault path not found: {vault_path}"}

        indexed = []
        skipped = []
        errors = []
        total_chunks = 0

        md_files = []
        for root, dirs, files in os.walk(vault_path):
            # Skip hidden dirs and the .obsidian config folder
            dirs[:] = [d for d in dirs if not d.startswith('.')]
            for f in files:
                if f.endswith(".md"):
                    md_files.append(os.path.join(root, f))
        logger.info(f"📚 Found {len(md_files)} markdown files in vault.")

        for filepath in md_files:
            try:
                current_hash = self._file_hash(filepath)
                cached_hash = self._index_cache.get(filepath, "")
                if not force and current_hash == cached_hash:
                    skipped.append(os.path.basename(filepath))
                    continue
                result = self._index_note(filepath, tenant_id)
                if result["success"]:
                    indexed.append(os.path.basename(filepath))
                    total_chunks += result.get("chunks", 0)
                    self._index_cache[filepath] = current_hash
                else:
                    errors.append(f"{os.path.basename(filepath)}: {result.get('error')}")
            except Exception as e:
                errors.append(f"{os.path.basename(filepath)}: {e}")

        self._save_index_cache()
        summary = {
            "success": True,
            "vault": vault_path,
            "total_files": len(md_files),
            "indexed": len(indexed),
            "skipped_unchanged": len(skipped),
            "errors": len(errors),
            "total_chunks": total_chunks,
            "error_list": errors[:5],
        }
        logger.info(f"✅ Vault sync complete: {len(indexed)} indexed, "
                    f"{len(skipped)} unchanged, {len(errors)} errors")
        return summary
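
    # Illustrative summary (hypothetical numbers):
    # {"success": True, "vault": "/home/me/vault", "total_files": 120,
    #  "indexed": 7, "skipped_unchanged": 113, "errors": 0,
    #  "total_chunks": 23, "error_list": []}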

    def _index_note(self, filepath: str, tenant_id: int) -> Dict[str, Any]:
        """Index a single markdown note into vector memory."""
        try:
            with open(filepath, "r", encoding="utf-8", errors="ignore") as f:
                content = f.read()
        except OSError as e:
            return {"success": False, "error": str(e)}
        if not content.strip():
            return {"success": False, "error": "Empty file"}

        filename = os.path.basename(filepath)
        note_name = os.path.splitext(filename)[0]

        # Parse frontmatter; strip list brackets so both "a, b" and "[a, b]"
        # tag styles split cleanly.
        meta, body = self._parse_frontmatter(content)
        tags = meta.get("tags", "").strip("[]").replace(",", " ").split()
        links = self._extract_wikilinks(body)

        # Build metadata dict for the vector store
        vmeta = {
            "category": "vault_note",
            "type": "obsidian",
            "filename": filename,
            "note_name": note_name,
            "source": filepath,
            "tags": " ".join(tags[:10]),
            "links": " ".join(links[:10]),
            "date": meta.get("date", meta.get("created", "")),
        }

        # Chunk and store
        chunks = self._chunk_text(body)
        if not chunks:
            chunks = [body[:self.CHUNK_SIZE]]
        file_hash = self._file_hash(filepath)
        for i, chunk in enumerate(chunks):
            # doc_id embeds the content hash, so a re-indexed note gets fresh
            # IDs; chunks from the previous version are not deleted here.
            doc_id = f"obsidian_{file_hash}_{i}"
            header = f"[Note: {note_name}"
            if tags:
                header += f" | Tags: {', '.join(tags[:5])}"
            header += "]\n"
            self.vmem.add(
                tenant_id=tenant_id,
                text=header + chunk,
                metadata={**vmeta, "chunk": str(i)},
                doc_id=doc_id
            )
        return {"success": True, "note": note_name, "chunks": len(chunks),
                "tags": tags, "links": links}

    # ─── Querying ────────────────────────────────────────

    def query_vault(self, tenant_id: int, question: str, n: int = 5) -> str:
        """Semantic search over indexed vault notes."""
        results = self.vmem.search(
            tenant_id, question, n_results=n,
            # category="vault_note"  # filter if your vmem supports it
        )
        if not results:
            return "No relevant notes found in your vault."

        lines = [f"🗂️ From your knowledge vault ({len(results)} relevant notes):"]
        seen = set()
        for r in results:
            text = r.get("text", "")[:500]
            if text in seen:
                continue
            seen.add(text)
            note = r.get("metadata", {}).get("note_name", "Unknown note")
            lines.append(f"\n📄 **{note}**\n{text}")
        return "\n".join(lines)

    def get_status(self) -> Dict[str, Any]:
        return {
            "indexed_files": len(self._index_cache),
            "cache_path": self._cache_path(),
        }
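

# Minimal usage sketch. The vector-memory and database objects below are
# hypothetical stand-ins for whatever the host agent normally injects; the
# stub only implements the add()/search() calls used above.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    class _StubVectorMemory:
        """Tiny in-memory stand-in so the sync can be exercised end to end."""
        def __init__(self):
            self.docs = {}

        def add(self, tenant_id, text, metadata, doc_id):
            self.docs[doc_id] = {"text": text, "metadata": metadata}

        def search(self, tenant_id, query, n_results=5):
            # Naive substring match; a real backend would do vector similarity.
            hits = [d for d in self.docs.values()
                    if query.lower() in d["text"].lower()]
            return hits[:n_results]

    sync = ObsidianSync(vector_memory=_StubVectorMemory(), database=None)
    print(sync.sync_vault("/path/to/vault", tenant_id=1))  # use a real vault path
    print(sync.query_vault(tenant_id=1, question="weekly review"))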