-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmanager.py
More file actions
115 lines (89 loc) · 3.59 KB
/
manager.py
File metadata and controls
115 lines (89 loc) · 3.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
"""
High-level memory manager (production-stable).
Connects schema (Pydantic) <-> store (JSON persistence).
"""
import datetime
from typing import Optional, Dict, Any
from .schema import UserMemory, EventMemory, ChatContext, ChatMessage, OAuthMetadata
from .store import get_user_memory, save_user_memory, delete_user_memory
# --- Internal helpers ---
async def _load_user(user_id: str) -> UserMemory:
    """Load the stored memory for ``user_id`` or initialise a fresh one.

    Args:
        user_id: Key passed to ``get_user_memory`` to fetch the raw record.

    Returns:
        A ``UserMemory`` built from the stored record. When the store
        returns nothing (or an empty record) a new ``UserMemory`` holding
        only ``user_id`` is returned. When the stored record fails to
        build a ``UserMemory``, a best-effort salvage keeps whichever of
        ``timezone``, ``recent_events`` and ``chat_context`` still load.

    Raises:
        Exception: Whatever ``UserMemory(user_id=user_id)`` raises when even
            the minimal record cannot be constructed.
    """
    data = await get_user_memory(user_id)
    if not data:
        return UserMemory(user_id=user_id)
    try:
        return UserMemory(**data)
    except Exception:
        # Function-scope import keeps this block self-contained.
        import logging

        log = logging.getLogger(__name__)
        # Record the failure: the salvage below discards every field that is
        # not explicitly listed, which would otherwise happen without a trace.
        log.warning(
            "Stored memory for user %r could not be loaded; salvaging known fields",
            user_id,
            exc_info=True,
        )

        salvaged = {"user_id": user_id}
        user = UserMemory(**salvaged)
        if isinstance(data, dict):
            for key in ("timezone", "recent_events", "chat_context"):
                if key not in data:
                    continue
                # Add one field at a time so a single corrupt field does not
                # prevent the remaining ones from being recovered.
                candidate = {**salvaged, key: data[key]}
                try:
                    user = UserMemory(**candidate)
                except Exception:
                    log.warning(
                        "Dropping unsalvageable field %r for user %r",
                        key,
                        user_id,
                        exc_info=True,
                    )
                    continue
                salvaged = candidate
        return user
async def _persist_user(user: UserMemory):
    """Write ``user`` back to the store.

    The record is keyed by ``user.user_id`` and the payload is whatever
    ``user.to_serializable()`` produces; both are handed to
    ``save_user_memory``.
    """
    key = user.user_id
    payload = user.to_serializable()
    await save_user_memory(key, payload)
# -------------------------------
# Core APIs for agent orchestration
# -------------------------------
async def add_event(user_id: str, event_data: Dict[str, Any]):
    """Append a new event to the user's recent events and persist it.

    Args:
        user_id: Identifier of the user whose memory is updated.
        event_data: Keyword arguments used to construct an ``EventMemory``.

    Returns:
        The ``EventMemory`` instance that was appended.
    """
    user = await _load_user(user_id)
    evt = EventMemory(**event_data)
    user.recent_events.append(evt)
    # datetime.utcnow() is deprecated since Python 3.12. tzinfo is stripped so
    # the stored string keeps the same naive ISO-8601 format as before.
    now_utc = datetime.datetime.now(datetime.timezone.utc)
    user.last_synced = now_utc.replace(tzinfo=None).isoformat()
    await _persist_user(user)
    return evt
async def get_last_event(user_id: str) -> Optional[EventMemory]:
    """Return the most recently appended event for ``user_id``.

    Returns:
        The last element of the user's ``recent_events``, or ``None`` when
        that list is empty.
    """
    memory = await _load_user(user_id)
    events = memory.recent_events
    if not events:
        return None
    return events[-1]
async def delete_last_event(user_id: str) -> Optional[EventMemory]:
    """Remove the newest event from the user's memory and persist the change.

    The removed event is also recorded on ``last_deleted_event`` before the
    memory is saved.

    Returns:
        The removed ``EventMemory``, or ``None`` when there was nothing to
        remove (in which case nothing is persisted).
    """
    memory = await _load_user(user_id)
    events = memory.recent_events
    if events:
        removed = events.pop()
        memory.last_deleted_event = removed
        await _persist_user(memory)
        return removed
    return None
async def forget_user(user_id: str):
    """Erase everything stored for ``user_id``.

    Delegates to the store's ``delete_user_memory``; nothing is returned.
    """
    await delete_user_memory(user_id)
    return None
# -------------------------------
# Chat context management
# -------------------------------
async def add_message_context(user_id: str, role: str, text: str):
    """Append one chat message to the user's chat context and persist it.

    Args:
        user_id: Identifier of the user whose chat context is updated.
        role: Passed through as ``ChatMessage.role``.
        text: Passed through as ``ChatMessage.text``.
    """
    user = await _load_user(user_id)
    msg = ChatMessage(role=role, text=text)
    user.chat_context.messages.append(msg)
    # datetime.utcnow() is deprecated since Python 3.12. tzinfo is stripped so
    # the stored string keeps the same naive ISO-8601 format as before.
    now_utc = datetime.datetime.now(datetime.timezone.utc)
    user.chat_context.last_updated = now_utc.replace(tzinfo=None).isoformat()
    await _persist_user(user)
async def get_chat_context(user_id: str) -> list[Dict[str, Any]]:
    """Return the user's chat messages as a list of plain dicts.

    Each stored message is converted with its ``dict()`` method, in the
    order the messages are held in ``chat_context.messages``.
    """
    memory = await _load_user(user_id)
    history = memory.chat_context.messages
    return [message.dict() for message in history]
# -------------------------------
# OAuth metadata helpers (safe)
# -------------------------------
async def save_oauth_tokens(user_id: str, tokens_meta: Dict[str, Any]):
    """Store OAuth metadata on the user's memory and persist it.

    Only the fields read below are kept. Several accept an alternative key
    when the primary one is missing or falsy:

    * ``google_sub`` falls back to ``google_email``
    * ``expires_at`` falls back to ``expiry``
    * ``scope`` falls back to ``scopes``

    Args:
        user_id: Identifier of the user whose memory is updated.
        tokens_meta: Mapping the metadata fields are read from.

    Returns:
        The value of ``user.oauth`` after the update.
    """
    user = await _load_user(user_id)
    meta = OAuthMetadata(
        google_sub=tokens_meta.get("google_sub") or tokens_meta.get("google_email"),
        google_email=tokens_meta.get("google_email"),
        expires_at=tokens_meta.get("expires_at") or tokens_meta.get("expiry"),
        scope=tokens_meta.get("scope") or tokens_meta.get("scopes"),
    )
    user.oauth = meta
    # datetime.utcnow() is deprecated since Python 3.12. tzinfo is stripped so
    # the stored string keeps the same naive ISO-8601 format as before.
    now_utc = datetime.datetime.now(datetime.timezone.utc)
    user.last_synced = now_utc.replace(tzinfo=None).isoformat()
    await _persist_user(user)
    return user.oauth
async def get_oauth_tokens(user_id: str) -> Optional[Dict[str, Any]]:
    """Return the user's stored OAuth metadata as a dict.

    Returns:
        ``user.oauth.dict()`` when OAuth metadata is present, otherwise
        ``None``.
    """
    memory = await _load_user(user_id)
    if memory.oauth:
        return memory.oauth.dict()
    return None
# -------------------------------
# Full user memory access
# -------------------------------
async def get_full_user_data(user_id: str) -> dict:
    """Return the complete memory record for ``user_id``.

    The record is loaded (or initialised) through ``_load_user`` and
    converted with ``to_serializable()``.
    """
    memory = await _load_user(user_id)
    snapshot = memory.to_serializable()
    return snapshot