-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathprotocols.py
More file actions
155 lines (111 loc) · 4.58 KB
/
protocols.py
File metadata and controls
155 lines (111 loc) · 4.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
"""Protocol definitions for pluggable backends and extensions."""
from __future__ import annotations
from collections.abc import Sequence
from typing import Literal, Protocol
from synaptic.models import (
ConsolidationLevel,
DigestResult,
Edge,
EdgeKind,
Node,
NodeKind,
)
class StorageBackend(Protocol):
"""Storage backend for nodes and edges."""
# Lifecycle
async def connect(self) -> None: ...
async def close(self) -> None: ...
# Node CRUD
async def save_node(self, node: Node) -> None: ...
async def get_node(self, node_id: str) -> Node | None: ...
async def update_node(self, node: Node) -> None: ...
async def delete_node(self, node_id: str) -> None: ...
async def list_nodes(
self,
*,
kind: str | NodeKind | None = None,
level: ConsolidationLevel | None = None,
limit: int = 100,
) -> list[Node]: ...
# Edge CRUD
async def save_edge(self, edge: Edge) -> None: ...
async def get_edges(
self, node_id: str, *, direction: Literal["both", "incoming", "outgoing"] = "both"
) -> list[Edge]: ...
async def update_edge(self, edge: Edge) -> None: ...
async def delete_edge(self, edge_id: str) -> None: ...
# Batch read
async def get_nodes_batch(self, node_ids: list[str]) -> list[Node]: ...
# Count
async def count_nodes(
self,
*,
kind: str | NodeKind | None = None,
category: str | None = None,
year: int | None = None,
) -> int: ...
# Search
async def search_fts(self, query: str, *, limit: int = 20) -> list[Node]: ...
async def search_fuzzy(
self, query: str, *, limit: int = 20, threshold: float = 0.3
) -> list[Node]: ...
async def search_vector(self, embedding: list[float], *, limit: int = 20) -> list[Node]: ...
# Graph traversal
async def get_neighbors(self, node_id: str, *, depth: int = 1) -> list[tuple[Node, Edge]]: ...
# Batch
async def save_nodes_batch(self, nodes: Sequence[Node]) -> None: ...
async def save_edges_batch(self, edges: Sequence[Edge]) -> None: ...
# Maintenance
async def prune_edges(self, *, weight_below: float = 0.1) -> int: ...
async def decay_vitality(self, *, factor: float = 0.95) -> int: ...
class GraphTraversal(Protocol):
"""Extended protocol for graph-native backends (Neo4j, etc.).
Provides advanced traversal operations beyond StorageBackend.get_neighbors().
"""
async def shortest_path(
self, from_id: str, to_id: str, *, max_depth: int = 5
) -> list[tuple[Node, Edge]]: ...
async def pattern_match(self, pattern: str, *, limit: int = 20) -> list[dict[str, object]]: ...
async def find_by_type_hierarchy(self, type_name: str, *, limit: int = 50) -> list[Node]: ...
class Digester(Protocol):
"""Converts structured context into knowledge nodes and edges."""
async def digest(self, context: dict[str, object]) -> DigestResult: ...
class QueryRewriter(Protocol):
"""Rewrites a search query into expanded forms (e.g. via LLM)."""
async def rewrite(self, query: str) -> list[str]: ...
class QueryDecomposer(Protocol):
"""Breaks a compound query into atomic sub-queries.
Distinct from ``QueryRewriter``: a rewriter returns alternative phrasings
of the *same* information need (paraphrases, synonyms), while a
decomposer returns *multiple independent* sub-queries whose union
answers the original (the output is meant to be searched in parallel
and fused via RRF, not substituted).
A non-decomposable query should be returned as a single-element list so
the caller can treat "no decomposition" uniformly.
"""
async def decompose(self, query: str) -> list[str]: ...
class TagExtractor(Protocol):
"""Extracts tags from text content."""
def extract(self, text: str) -> list[str]: ...
class KindClassifier(Protocol):
"""Classifies text into a NodeKind."""
def classify(self, title: str, content: str) -> NodeKind: ...
class RelationDetector(Protocol):
"""Detects potential edges for a newly added node."""
async def detect(
self,
node: Node,
backend: StorageBackend,
) -> list[tuple[str, EdgeKind, float]]: ...
class EntityExtractor(Protocol):
"""Extracts entities from text and links them in the graph.
Drop-in compatible with PhraseExtractor, SpaCyEntityExtractor,
and HybridEntityExtractor.
"""
async def extract_and_link(
self,
graph: object, # SynapticGraph (avoid circular import)
node_id: str,
title: str,
content: str,
) -> list[str]: ...