Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ dependencies = [
"dimo-python-sdk @ git+https://github.com/openminddev/dimo-python-sdk.git@6b47fcd28654a4145cedee649a0999a8eb08a2f6",
"nest-asyncio==1.6.0",
"tf-keras==2.18.0",
"sentence-transformers>=3.0.0",
"chromadb>=0.5.0",
"onnxruntime<1.20",
"faiss-cpu>=1.7.4",
]

Expand Down
17 changes: 15 additions & 2 deletions src/fuser/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
from collections.abc import Sequence

from actions import describe_action
from fuser.knowledge_base.retriever import KnowledgeBase
from inputs.base import Sensor
from providers.io_provider import IOProvider
from providers.semantic_memory_provider import SemanticMemoryProvider
from runtime.config import RuntimeConfig


Expand Down Expand Up @@ -40,6 +40,8 @@ def __init__(self, config: RuntimeConfig):
self.knowledge_base = None
if config.knowledge_base:
try:
from fuser.knowledge_base.retriever import KnowledgeBase

self.knowledge_base = KnowledgeBase(**config.knowledge_base)
logging.info(
f"KnowledgeBase enabled with config: {config.knowledge_base}"
Expand Down Expand Up @@ -114,6 +116,17 @@ async def fuse(
if kb_context:
inputs_fused += f"\n\nKNOWLEDGE BASE:\n{kb_context}"

# Retrieve relevant memories from semantic memory
memories_section = ""
sem_memory = SemanticMemoryProvider()
if sem_memory.enabled and inputs_fused.strip():
mode = self.config.mode or "default"
memories = sem_memory.retrieve(query=inputs_fused, mode=mode)
if memories:
memories_section = "\nRELEVANT MEMORIES:\n" + "".join(
f"{i}. {mem}\n" for i, mem in enumerate(memories, 1)
)

# if we provide laws from blockchain, these override the locally stored rules
# the rules are not provided in the system prompt, but as a separate INPUT,
# since they are flowing from the outside world
Expand All @@ -140,7 +153,7 @@ async def fuse(
# (2) all the inputs (vision, sound, etc.)
# (3) a (typically) fixed list of available actions
# (4) a (typically) fixed system prompt requesting commands to be generated
fused_prompt = f"{system_prompt}\n\nAVAILABLE INPUTS:\n{inputs_fused}\nAVAILABLE ACTIONS:\n\n{actions_fused}\n\n{question_prompt}"
fused_prompt = f"{system_prompt}\n\nAVAILABLE INPUTS:\n{inputs_fused}{memories_section}\nAVAILABLE ACTIONS:\n\n{actions_fused}\n\n{question_prompt}"

logging.debug(f"FINAL PROMPT: {fused_prompt}")

Expand Down
9 changes: 9 additions & 0 deletions src/llm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ class LLMConfig(BaseModel):
history_length: T.Optional[int] = Field(
default=0, description="Number of past interactions to keep in context"
)
semantic_memory_enabled: T.Optional[bool] = Field(
default=False, description="Enable semantic memory for long-term recall"
)
semantic_memory_top_k: T.Optional[int] = Field(
default=3, description="Number of top memories to retrieve per query"
)
semantic_memory_threshold: T.Optional[float] = Field(
default=0.3, description="Minimum cosine similarity for memory retrieval"
)
extra_params: T.Dict[str, T.Any] = Field(default_factory=dict)

def __getitem__(self, item: str) -> T.Any:
Expand Down
12 changes: 12 additions & 0 deletions src/providers/llm_history_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from llm import LLMConfig

from .io_provider import IOProvider
from .semantic_memory_provider import SemanticMemoryProvider

R = TypeVar("R")

Expand Down Expand Up @@ -360,6 +361,17 @@ async def wrapper(self: Any, prompt: str, *args: Any, **kwargs: Any) -> R:
ChatMessage(role="assistant", content=action_message)
)

# Store to semantic memory
sem_memory = SemanticMemoryProvider()
if sem_memory.enabled:
mode = getattr(self._config, "mode", None) or "default"
sem_memory.store(
formatted_inputs,
action_message,
mode,
current_tick,
)

if (
self.history_manager.config.history_length > 0
and len(self.history_manager.history)
Expand Down
255 changes: 255 additions & 0 deletions src/providers/semantic_memory_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
"""Semantic memory provider using local embeddings and ChromaDB vector search."""

import logging
import os
import time
from typing import List, Optional

from providers.singleton import singleton

logger = logging.getLogger(__name__)

# Maximum characters per memory document; both halves of a stored
# "Input: ... | Response: ..." document and retrieval queries are
# truncated to this length before embedding.
MAX_TEXT_LENGTH: int = 1000

# Default storage path relative to project root: three directory levels up
# from this file (src/providers/ -> src/ -> repo root), then
# config/memory/embeddings. Used as the ChromaDB persistence directory.
DEFAULT_PERSIST_DIR: str = os.path.join(
    os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
    "config",
    "memory",
    "embeddings",
)


@singleton
class SemanticMemoryProvider:
    """Provides semantic memory storage and retrieval using embeddings.

    Uses sentence-transformers for local embedding generation and ChromaDB
    for persistent vector storage. Each robot mode gets its own collection
    (named ``om1_<mode>``).

    Parameters
    ----------
    None
        Configuration is done via the configure() method after instantiation.
    """

    def __init__(self) -> None:
        # Retrieval behavior; overwritten by configure().
        self.enabled: bool = False
        self.top_k: int = 3
        self.similarity_threshold: float = 0.3

        # Heavy resources are lazily created in _ensure_initialized() so
        # constructing the (singleton) provider never pays the model-load cost
        # while the feature is disabled.
        self._model = None
        self._chroma_client = None
        # Cache of collection name -> chromadb.Collection.
        self._collections: dict = {}
        self._persist_dir: str = DEFAULT_PERSIST_DIR

    def configure(
        self,
        enabled: bool = False,
        top_k: int = 3,
        similarity_threshold: float = 0.3,
    ) -> None:
        """Configure the semantic memory provider.

        Parameters
        ----------
        enabled : bool
            Whether semantic memory is active.
        top_k : int
            Number of top results to return from retrieval.
        similarity_threshold : float
            Minimum cosine similarity score to include a result.
        """
        self.enabled = enabled
        self.top_k = top_k
        self.similarity_threshold = similarity_threshold

        # Eagerly initialize when enabled; on failure _ensure_initialized
        # flips self.enabled back to False.
        if self.enabled:
            self._ensure_initialized()

    def _ensure_initialized(self) -> None:
        """Lazy-load the embedding model and ChromaDB client.

        On any failure the provider disables itself rather than raising, so
        the rest of the runtime keeps working without semantic memory.
        """
        if self._model is not None and self._chroma_client is not None:
            return

        try:
            from sentence_transformers import SentenceTransformer

            logger.info("Loading embedding model: all-MiniLM-L6-v2")
            self._model = SentenceTransformer("all-MiniLM-L6-v2", device="cpu")
            logger.info("Embedding model loaded successfully")
        except Exception as e:
            logger.error(f"Failed to load embedding model: {e}")
            self.enabled = False
            return

        try:
            import chromadb

            os.makedirs(self._persist_dir, exist_ok=True)
            self._chroma_client = chromadb.PersistentClient(path=self._persist_dir)
            logger.info(f"ChromaDB initialized at {self._persist_dir}")
        except Exception as e:
            logger.error(f"Failed to initialize ChromaDB: {e}")
            self.enabled = False
            return

    def _get_collection(self, mode: str):
        """Get or create a ChromaDB collection for the given mode.

        Parameters
        ----------
        mode : str
            The robot operating mode name.

        Returns
        -------
        chromadb.Collection or None
            The ChromaDB collection, or None if not initialized.
        """
        if self._chroma_client is None:
            return None

        collection_name = f"om1_{mode}"
        if collection_name not in self._collections:
            self._collections[collection_name] = (
                self._chroma_client.get_or_create_collection(
                    name=collection_name,
                    # Cosine space so query distances are 1 - cosine_similarity.
                    metadata={"hnsw:space": "cosine"},
                )
            )
        return self._collections[collection_name]

    def store(
        self,
        sensory_input: str,
        action_response: str,
        mode: str,
        tick: int,
    ) -> None:
        """Store a sensory-action pair as a memory document.

        Parameters
        ----------
        sensory_input : str
            The formatted sensory input text.
        action_response : str
            The action response text from the LLM.
        mode : str
            The current robot operating mode.
        tick : int
            The current tick number.
        """
        if not self.enabled or self._model is None:
            return

        collection = self._get_collection(mode)
        if collection is None:
            return

        try:
            # Both halves are truncated so one oversized input cannot bloat
            # the collection or slow down embedding.
            document = f"Input: {sensory_input[:MAX_TEXT_LENGTH]} | Response: {action_response[:MAX_TEXT_LENGTH]}"
            embedding = self._model.encode(document, normalize_embeddings=True).tolist()

            # Millisecond timestamp suffix keeps ids unique even if the same
            # tick number recurs (e.g. after a restart).
            doc_id = f"tick_{tick}_{int(time.time() * 1000)}"

            collection.add(
                ids=[doc_id],
                embeddings=[embedding],
                documents=[document],
                metadatas=[{"tick": tick, "timestamp": time.time(), "mode": mode}],
            )

            logger.debug(f"Stored memory: {doc_id} (mode={mode})")

        except Exception as e:
            logger.error(f"Failed to store memory: {e}")

    def retrieve(
        self,
        query: str,
        mode: str,
        top_k: Optional[int] = None,
    ) -> List[str]:
        """Retrieve relevant memories for a given query.

        Parameters
        ----------
        query : str
            The query text to find relevant memories for.
        mode : str
            The current robot operating mode.
        top_k : int, optional
            Override the default number of results. Defaults to self.top_k.

        Returns
        -------
        List[str]
            List of relevant memory document strings, ordered by similarity
            (most similar first). Results whose cosine similarity falls below
            similarity_threshold are dropped.
        """
        if not self.enabled or self._model is None:
            return []

        collection = self._get_collection(mode)
        if collection is None:
            return []

        try:
            # Query the count once — it can involve a storage round-trip.
            total = collection.count()
            if total == 0:
                return []

            k = min(top_k if top_k is not None else self.top_k, total)
            if k <= 0:
                return []

            query_embedding = self._model.encode(
                query[:MAX_TEXT_LENGTH], normalize_embeddings=True
            ).tolist()

            results = collection.query(
                query_embeddings=[query_embedding],
                n_results=k,
                include=["documents", "distances"],
            )

            if not results or not results.get("documents") or not results["documents"][0]:
                return []

            documents = results["documents"][0]
            distances = results["distances"][0] if results.get("distances") else []

            if distances:
                # ChromaDB cosine distance = 1 - cosine_similarity. Pairing
                # with zip also fixes a prior bug where documents lacking a
                # distance score bypassed the similarity threshold entirely.
                memories = [
                    doc
                    for doc, dist in zip(documents, distances)
                    if (1.0 - dist) >= self.similarity_threshold
                ]
            else:
                # No distances returned at all: we cannot filter, keep everything.
                memories = list(documents)

            logger.debug(f"Retrieved {len(memories)} memories for mode={mode}")
            return memories

        except Exception as e:
            logger.error(f"Failed to retrieve memories: {e}")
            return []

    def clear_mode(self, mode: str) -> None:
        """Clear all memories for a specific mode.

        Parameters
        ----------
        mode : str
            The robot operating mode to clear memories for.
        """
        if self._chroma_client is None:
            return

        collection_name = f"om1_{mode}"
        try:
            self._chroma_client.delete_collection(name=collection_name)
            # Drop the cached handle so a later store() recreates the collection.
            self._collections.pop(collection_name, None)
            logger.info(f"Cleared memories for mode: {mode}")
        except Exception as e:
            logger.error(f"Failed to clear memories for mode {mode}: {e}")
14 changes: 14 additions & 0 deletions src/runtime/cortex.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from inputs.orchestrator import InputOrchestrator
from providers.config_provider import ConfigProvider
from providers.io_provider import IOProvider
from providers.semantic_memory_provider import SemanticMemoryProvider
from providers.sleep_ticker_provider import SleepTickerProvider
from runtime.config import (
LifecycleHookType,
Expand Down Expand Up @@ -139,6 +140,19 @@ async def _initialize_mode(self, mode_name: str):
self.simulator_orchestrator = SimulatorOrchestrator(self.current_config)
self.background_orchestrator = BackgroundOrchestrator(self.current_config)

# Configure semantic memory provider
memory = SemanticMemoryProvider()
llm_config = getattr(self.current_config.cortex_llm, "_config", None)
if llm_config is not None:
memory.configure(
enabled=getattr(llm_config, "semantic_memory_enabled", False) or False,
top_k=getattr(llm_config, "semantic_memory_top_k", 3) or 3,
similarity_threshold=getattr(
llm_config, "semantic_memory_threshold", 0.3
)
or 0.3,
)

logging.info(f"Mode '{mode_name}' initialized successfully")

async def _handle_mode_transitions(self):
Expand Down
Loading
Loading