Production-ready PostgreSQL memory for LangGraph agents. Pool setup, lifecycle management, retry logic, and common operations — no boilerplate.
Note: This library wraps langgraph-checkpoint-postgres and langgraph — it does not reimplement checkpointing or the store. It handles the boilerplate you'd otherwise copy-paste into every agent.
Short-term memory persists conversation state within a single thread. LangGraph's checkpointer saves a snapshot at every graph step automatically — PostgresShortTerm handles the pool, lifecycle, and cleanup around it.
- One-line setup — connection pool, checkpointer, and lifecycle managed via async context manager
- Production pool defaults — TCP keepalives, configurable idle/lifetime/timeout, schema isolation
- Retry with backoff — transient Postgres errors retried automatically via tenacity
- Thread cleanup — single CTE deletes across all 3 checkpoint tables in one round-trip
- Bulk cleanup — delete threads older than N days using UUID v6 timestamp comparison
- Health primitives —
ping()andpool_stats()for application health endpoints - Pydantic config — validated settings, pass however you load them (YAML, env vars, hardcoded)
from langchain_core.messages import HumanMessage
from langgraph.graph import END, START, MessagesState, StateGraph
from langgraph_postgres_memory import PostgresMemoryConfig, PostgresShortTerm
# Define your graph
builder = StateGraph(MessagesState)
builder.add_node("echo", lambda state: {"messages": state["messages"]})
builder.add_edge(START, "echo")
builder.add_edge("echo", END)
# Configure memory
config = PostgresMemoryConfig(
user="myuser",
password="mypass",
host="localhost",
database="mydb",
schema_name="agent_schema", # default: "public"
)
async with PostgresShortTerm(config) as memory:
# Compile your graph with the checkpointer
graph = builder.compile(checkpointer=memory.checkpointer)
# Invoke as usual
result = await graph.ainvoke(
{"messages": [HumanMessage(content="hello")]},
{"configurable": {"thread_id": "thread-123"}},
)
# Read messages back
messages = await memory.get_messages("thread-123")
# Delete a thread
await memory.delete_thread("thread-123")
# Bulk cleanup
await memory.delete_threads_older_than(days=30)
# Health check
alive = await memory.ping()
stats = memory.pool_stats()# ~40 lines you copy-paste into every agent
conn_str = f"postgresql://{user}:{quote_plus(password)}@{host}:{port}/{db}"
conn_str += "?keepalives=1&keepalives_idle=30&..."
pool = AsyncConnectionPool(
conninfo=conn_str, min_size=2, max_size=10,
kwargs={"autocommit": True, "row_factory": dict_row},
configure=..., check=...,
)
await pool.open()
checkpointer = AsyncPostgresSaver(pool)
await checkpointer.setup()
# ... try/finally to close pool
# ... raw SQL to delete threads across 3 tables
# ... dig into checkpoint JSONB to extract messagesLong-term memory persists knowledge across threads and sessions. Unlike the checkpointer (which saves every message automatically within one thread), the store requires explicit reads and writes — your agent decides what to remember. PostgresLongTerm handles pool, store lifecycle, TTL sweeper, retry, and convenience operations.
- One-line setup — connection pool, store, table migrations, and TTL sweeper via async context manager
- Any embedding provider — pass OpenAI, Bedrock, Cohere, or any custom function. No provider lock-in
- Optional semantic search — embeddings are optional. Works as a pure key-value store without them
- TTL auto-lifecycle — sweeper starts/stops automatically with the context manager
- Namespace CRUD with retry —
put,get,search,delete,list_namespacesall retried on transient errors - Bulk operations —
delete_namespaceandcountvia raw SQL (Store API doesn't have these) - Health primitives —
ping()andpool_stats()for application health endpoints - Pydantic config — inherits all pool/retry settings from
PostgresMemoryConfig, adds embedding + TTL fields
from langgraph_postgres_memory import PostgresLongTermConfig, PostgresLongTerm
config = PostgresLongTermConfig(user="u", password="p", database="db")
async with PostgresLongTerm(config) as memory:
# Store user preferences
await memory.put(("users", "u1", "prefs"), "theme", {"value": "dark"})
await memory.put(("users", "u1", "prefs"), "lang", {"value": "python"})
# Retrieve
item = await memory.get(("users", "u1", "prefs"), "theme")
print(item.value) # {"value": "dark"}
# Search with filter
results = await memory.search(("users", "u1", "prefs"), filter={"value": "python"})
# List namespaces
ns = await memory.list_namespaces(prefix=("users", "u1"))
# Count items
n = await memory.count(("users", "u1", "prefs")) # 2
# Bulk delete
deleted = await memory.delete_namespace(("users", "u1", "prefs")) # 2
# Health check
alive = await memory.ping()
stats = memory.pool_stats()from langchain_openai import OpenAIEmbeddings
from langgraph_postgres_memory import PostgresLongTermConfig, PostgresLongTerm
config = PostgresLongTermConfig(
user="u", password="p", database="db",
embedding=OpenAIEmbeddings(model="text-embedding-3-small"),
embedding_dims=1536,
embedding_fields=["text"],
)
async with PostgresLongTerm(config) as memory:
await memory.put(
("users", "u1", "memories"), "m1",
{"text": "User prefers Python for backend work"}
)
await memory.put(
("users", "u1", "memories"), "m2",
{"text": "User's company runs on AWS with Kubernetes"}
)
# Semantic search
results = await memory.search(
("users", "u1", "memories"),
query="what cloud infrastructure does the user have?"
)
# Returns m2 ranked higher (semantic match)import boto3
from langchain_aws import BedrockEmbeddings
from langgraph_postgres_memory import PostgresLongTermConfig, PostgresLongTerm
session = boto3.Session(profile_name="my-profile", region_name="us-east-1")
bedrock_client = session.client("bedrock-runtime")
config = PostgresLongTermConfig(
user="u", password="p", database="db",
embedding=BedrockEmbeddings(
model_id="amazon.titan-embed-text-v2:0",
client=bedrock_client
),
embedding_dims=1024,
)
async with PostgresLongTerm(config) as memory:
# Works exactly the same as OpenAI example
...from langgraph_postgres_memory import (
PostgresMemoryConfig,
PostgresShortTerm,
PostgresLongTermConfig,
PostgresLongTerm,
)
short_config = PostgresMemoryConfig(user="u", password="p", database="db")
long_config = PostgresLongTermConfig(
user="u", password="p", database="db",
embedding=my_embeddings, embedding_dims=1536,
)
async with PostgresShortTerm(short_config) as short, PostgresLongTerm(long_config) as long:
graph = builder.compile(
checkpointer=short.checkpointer, # automatic per-thread state
store=long.store, # explicit cross-thread memory
)config = PostgresLongTermConfig(
user="u", password="p", database="db",
ttl_default_minutes=1440, # 24 hours default
ttl_sweep_interval_minutes=10, # check every 10 minutes
)
async with PostgresLongTerm(config) as memory:
# This item expires in 60 minutes (overrides default)
await memory.put(("cache",), "temp", {"data": "..."}, ttl=60)
# This item uses default TTL (24 hours)
await memory.put(("users", "u1", "session"), "ctx", {"last_topic": "k8s"})
# TTL sweeper runs automatically in background# pip
pip install langgraph-postgres-memory
# uv
uv add langgraph-postgres-memory- Python >= 3.11
- PostgreSQL (tested with 16)
- pgvector extension (only if using semantic search)
All pool and retry settings have sensible defaults. Override what you need:
config = PostgresMemoryConfig(
user="myuser",
password="mypass",
database="mydb",
# Connection (defaults shown)
host="localhost",
port=5432,
schema_name="public",
# Pool tuning
pool_min_size=2,
pool_max_size=20,
pool_max_idle=300, # seconds — tune down to ~30 for serverless (Neon, Supabase)
pool_max_lifetime=1800, # seconds — tune down to ~180 for serverless
pool_timeout=30, # seconds — acquisition timeout
# Retry tuning
retry_max_attempts=3,
retry_max_wait=10, # backoff cap in seconds
)PostgresLongTermConfig inherits all fields above and adds:
long_config = PostgresLongTermConfig(
user="myuser",
password="mypass",
database="mydb",
# Embedding (optional — omit for pure key-value mode)
embedding=my_embeddings_object, # LangChain Embeddings, sync/async function
embedding_dims=1536, # required if embedding is set
embedding_fields=["$"], # JSON paths to embed, default = entire value
distance_type="cosine", # "cosine", "l2", or "inner_product"
# TTL (optional — omit for no expiry)
ttl_default_minutes=1440, # default TTL for new items (minutes)
ttl_sweep_interval_minutes=5, # background cleanup interval (minutes)
)| Method | Description |
|---|---|
PostgresShortTerm(config) |
Constructor, takes PostgresMemoryConfig |
async with PostgresShortTerm(config) |
Opens pool, initializes checkpointer, closes on exit |
.checkpointer |
AsyncPostgresSaver instance for builder.compile(checkpointer=...) |
await .get_messages(thread_id) |
Get messages from latest checkpoint |
await .delete_thread(thread_id) |
Delete all checkpoints, blobs, and writes for a thread |
await .delete_threads_older_than(days) |
Bulk delete threads older than N days |
await .ping() |
Returns True if database is reachable |
.pool_stats() |
Pool size, available connections, waiting requests |
| Method | Description |
|---|---|
PostgresLongTerm(config) |
Constructor, takes PostgresLongTermConfig |
async with PostgresLongTerm(config) |
Opens pool, initializes store, starts TTL sweeper, closes on exit |
.store |
AsyncPostgresStore instance for builder.compile(store=...) |
await .put(namespace, key, value) |
Store or update an item (supports index and ttl kwargs) |
await .get(namespace, key) |
Retrieve an item or None |
await .search(namespace_prefix) |
Search with optional query, filter, limit, offset |
await .delete(namespace, key) |
Delete a single item |
await .list_namespaces() |
List namespaces with optional prefix, suffix, max_depth |
await .delete_namespace(namespace) |
Delete ALL items in a namespace (returns count) |
await .count(namespace_prefix) |
Count items under a namespace prefix |
await .ping() |
Returns True if database is reachable |
.pool_stats() |
Pool size, available connections, waiting requests |
langgraph_postgres_memory/
__init__.py — public exports
_core.py — shared config, pool builder, retry builder, helpers
shortterm.py — PostgresShortTerm (checkpointer wrapper)
longterm.py — PostgresLongTermConfig + PostgresLongTerm (store wrapper)
pyproject.toml
Makefile
tests/
conftest.py — test config fixture + --run-integration flag
docker-compose.yml — Postgres 16 on port 5433
test_shortterm.py — 20 unit + 7 integration tests
test_longterm.py — 17 unit + 21 integration tests
# Unit tests only (no database needed)
make test-unit
# Full test suite (starts Postgres via Docker, runs all tests, stops Postgres)
make test-all
# Or manually
docker compose -f tests/docker-compose.yml up -d --wait
uv run pytest --run-integration -v
docker compose -f tests/docker-compose.yml downThis project wraps langgraph-checkpoint-postgres and langgraph from the LangChain team. The checkpointing engine, store, serialization, and schema management are entirely theirs — this library handles pool lifecycle, retry, and convenience operations on top.
MIT