Update the MIRIX code base to be async-native #118
Closed
LiaoJianhe wants to merge 11 commits into Mirix-AI:re-org from
* feat: multi scope clients
* fix: format and passing tests
* fix: fix langfuse tests
* fix: fix local client tests
* fix: fix tests
* chore: format
* feat: scoped core memory
* fix: fix some test bugs
* chore: tests
* Apply suggestion from @L-u-k-e
* Apply suggestion from @L-u-k-e
---------
Co-authored-by: Jianhe Liao <jianhe_liao@intuit.com>
feat: Allow clients to add `filter_tags` to blocks and use them for cross user searches
* feat: support multiple filter operators in tag search
* chore: remove integration test
* fix: messageToDict
* feat: block filter tag updates - always-apply on save plus new update mode options (#58)
* gfeat: update block filter tags
* fix: fix bugs
Made-with: Cursor
Wrong PR target branch
MIRIX Async-Native Rewrite
1. Why Async-Native
MIRIX is a multi-agent system where every user request fans out into
database queries, Redis lookups, LLM API calls, embedding computations,
and Kafka messages -- all I/O-bound. The previous sync codebase serialized
these operations: each blocked thread sat idle waiting for a network
response, and concurrency was limited to the thread-pool size.
Rewriting the stack to be async-native delivers several concrete benefits:
Higher throughput on the same hardware.
A single event loop multiplexes thousands of in-flight I/O operations
without dedicating a thread to each one. Connection pools (asyncpg, Redis,
httpx) are shared across all coroutines, so the server handles more
concurrent users with fewer file descriptors, less memory, and less
context-switching overhead.
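As a rough illustration of the multiplexing described above, the sketch below overlaps three simulated I/O waits on one thread with `asyncio.gather`. The `fake_io` helper and its delays are hypothetical stand-ins for a database query, a Redis lookup, and an LLM call; they are not MIRIX code.

```python
import asyncio
import time


async def fake_io(name: str, delay: float) -> str:
    # Hypothetical stand-in for a DB query, Redis lookup, or LLM call.
    await asyncio.sleep(delay)
    return name


async def handle_request() -> list[str]:
    # The three waits overlap on a single thread, so the total time is
    # roughly the longest single wait rather than the sum of all three.
    return await asyncio.gather(
        fake_io("db", 0.1),
        fake_io("redis", 0.1),
        fake_io("llm", 0.1),
    )


def timed_run() -> tuple[list[str], float]:
    start = time.perf_counter()
    results = asyncio.run(handle_request())
    return results, time.perf_counter() - start
```

Calling `timed_run()` should return the three names in call order, with an elapsed time close to one delay rather than three.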
End-to-end consistency with FastAPI / Uvicorn.
FastAPI is async-first. When route handlers are `async def` and directly `await` the server/manager/agent/LLM chain, there is no implicit offload to a thread-pool executor. This removes an entire class of subtle bugs
(thread-safety of shared state, session leaks across threads) and makes
the call stack easy to reason about.
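A minimal sketch of that call chain, using plain `asyncio` rather than FastAPI so it stays self-contained. The function names (`route_handler`, `server_load_user`, `manager_get_user`) are hypothetical; the point is only that every layer runs on the same thread when each one is awaited directly.

```python
import asyncio
import threading


async def manager_get_user(user_id: str) -> dict:
    # Placeholder for an awaited database call in a service manager.
    await asyncio.sleep(0)
    return {"id": user_id, "manager_thread": threading.get_ident()}


async def server_load_user(user_id: str) -> dict:
    # The server layer simply awaits the manager; no executor is involved.
    return await manager_get_user(user_id)


async def route_handler(user_id: str) -> dict:
    # In FastAPI this would be an `async def` endpoint function.
    user = await server_load_user(user_id)
    user["handler_thread"] = threading.get_ident()
    return user
```

Because nothing is handed to an executor, the handler and the manager report the same thread identifier, which is why shared state needs no thread-level locking in this style.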
Natural streaming and SSE.
LLM token streaming and Server-Sent Events map directly to async
generators. No background thread is needed to feed an SSE response; the
generator yields tokens as they arrive from the LLM provider.
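The mapping from token streams to SSE frames can be sketched with two async generators. Here `fake_llm_stream` is a hypothetical substitute for a provider's streaming response, and the closing `[DONE]` frame is an assumed convention, not something the original text specifies.

```python
import asyncio
from collections.abc import AsyncIterator


async def fake_llm_stream(text: str) -> AsyncIterator[str]:
    # Hypothetical provider stream: yields one token per whitespace-separated word.
    for token in text.split():
        await asyncio.sleep(0)  # yield control, as a real network read would
        yield token


async def sse_events(prompt: str) -> AsyncIterator[str]:
    # Wrap each token in the SSE wire format as soon as it arrives.
    async for token in fake_llm_stream(prompt):
        yield f"data: {token}\n\n"
    yield "data: [DONE]\n\n"  # assumed end-of-stream marker


async def collect(prompt: str) -> list[str]:
    return [frame async for frame in sse_events(prompt)]
```

An async web framework can iterate `sse_events(...)` directly as a response body; `collect` exists only to make the output easy to inspect.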
In-process background workers.
Queue consumers (memory extraction, cleanup) run as `asyncio.Task`s in the same process. This simplifies deployment (one container, one process)
while keeping workers non-blocking.
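One way such an in-process worker could look is sketched below. The `worker` coroutine and its upper-casing step are hypothetical placeholders for real queue consumers such as memory extraction.

```python
import asyncio


async def worker(queue: asyncio.Queue, processed: list[str]) -> None:
    # Runs forever inside the main event loop; blocks only on `queue.get()`.
    while True:
        item = await queue.get()
        try:
            processed.append(item.upper())  # placeholder for real work
        finally:
            queue.task_done()


async def main() -> list[str]:
    queue: asyncio.Queue = asyncio.Queue()
    processed: list[str] = []
    task = asyncio.create_task(worker(queue, processed))

    for message in ("a", "b", "c"):
        await queue.put(message)

    await queue.join()  # wait until every queued item has been handled
    task.cancel()       # stop the worker on shutdown
    await asyncio.gather(task, return_exceptions=True)
    return processed
```

The worker shares the event loop with request handling, so no extra process or thread is needed; cancelling the task is the shutdown path.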
Lower tail latency.
`asyncio.sleep`-based retries and exponential back-off do not occupy a thread during the wait, freeing the loop to serve other requests.
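A minimal sketch of such a retry loop follows. The helper name `retry_async`, its parameters, and the choice of `ConnectionError` as the retryable exception are assumptions made for illustration; MIRIX's own retry helper may differ.

```python
import asyncio
import random


async def retry_async(func, *, attempts: int = 4, base_delay: float = 0.01,
                      max_delay: float = 1.0):
    # Retry an async callable, doubling the wait after each failure.
    for attempt in range(attempts):
        try:
            return await func()
        except ConnectionError:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            delay = min(max_delay, base_delay * 2 ** attempt)
            # The sleep suspends only this coroutine; the loop keeps serving others.
            await asyncio.sleep(delay + random.uniform(0, delay / 10))


def make_flaky(failures: int):
    # Build a coroutine function that fails `failures` times, then succeeds.
    calls = {"n": 0}

    async def flaky() -> int:
        calls["n"] += 1
        if calls["n"] <= failures:
            raise ConnectionError("transient")
        return calls["n"]

    return flaky
```

With `make_flaky(2)` the third call succeeds, so `retry_async` returns after two back-off waits; if the attempt budget runs out, the last exception propagates to the caller.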
2. High-Level Changes
2.1 External Library Migrations
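| Before (sync) | After (async) |
| --- | --- |
| `pg8000` / `psycopg2-binary` | `asyncpg` |
| `create_engine`, `sessionmaker`, `Session` | `create_async_engine`, `async_sessionmaker`, `AsyncSession` |
| `redis.Redis` | `hiredis` |
| `requests` | `httpx` |
| `openai.OpenAI` | `openai.AsyncOpenAI` |
| `anthropic.Anthropic` | `anthropic.AsyncAnthropic` |
| `AzureOpenAI` | `AsyncAzureOpenAI` |
| `genai` calls | `genai` + `httpx.AsyncClient` |
| | `aiokafka` (`AIOKafkaProducer`, `AIOKafkaConsumer`) |
| `duckduckgo_search` | |
| `google-api-python-client` | |
| | `pytest-asyncio` (`asyncio_mode = "auto"`) |
2.2 Application-Layer Changes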
ORM base (`mirix/orm/sqlalchemy_base.py`)
All CRUD methods (`create`, `read`, `update`, `delete`, `list`) are `async def`. Sessions are used via `async with session`. Retry decorators use `asyncio.sleep()`.
Service managers (`mirix/services/`)
All 16 managers are async: `user_manager.py`, `client_manager.py`, `tool_manager.py`, `admin_user_manager.py`, `organization_manager.py`, `block_manager.py`, `message_manager.py`, `cloud_file_mapping_manager.py`, `step_manager.py`, `agent_manager.py`, `raw_memory_manager.py`, `episodic_memory_manager.py`, `semantic_memory_manager.py`, `procedural_memory_manager.py`, `resource_memory_manager.py`, `knowledge_vault_manager.py`.
Every manager method uses `async with self.session_maker()` and `await` for all database operations.
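The manager pattern above can be sketched without a database by substituting a stub session. `FakeAsyncSession` and the `UserManager.get_user_name` method are hypothetical; a real manager would receive an SQLAlchemy `async_sessionmaker` and run queries on an `AsyncSession`.

```python
import asyncio


class FakeAsyncSession:
    """Hypothetical stand-in for an async DB session; rows live in a dict."""

    def __init__(self, store: dict):
        self._store = store
        self.closed = False

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.closed = True  # a real session would release its connection here
        return False        # do not swallow exceptions

    async def get(self, key: str):
        await asyncio.sleep(0)  # placeholder for the awaited round trip
        return self._store.get(key)


class UserManager:
    def __init__(self, session_maker):
        self.session_maker = session_maker

    async def get_user_name(self, user_id: str):
        # Same shape as the managers described above: open a session with
        # `async with`, then await every database operation inside it.
        async with self.session_maker() as session:
            row = await session.get(user_id)
            return None if row is None else row["name"]
```

The `async with` block guarantees the session is closed even if the awaited query raises, which is the property the managers rely on.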
LLM API layer (`mirix/llm_api/`)
`LLMClientBase.send_llm_request()` and `request()` are async. All provider clients (OpenAI, Anthropic, Azure, Google, Cohere, Mistral, AWS Bedrock) use their respective async SDK classes. Streaming responses are `AsyncGenerator`s. `retry_with_exponential_backoff()` uses `asyncio.sleep()`.
Agent execution (`mirix/agent/agent.py`)
`step()`, `inner_step()`, `_get_ai_reply()`, `_handle_ai_response()`, `execute_tool_and_persist_state()`, and `save_agent()` are all async. Built-in tools (core, memory, extras) are async. User-defined tools execute in `ToolExecutionSandbox` via `asyncio.create_subprocess_exec()` (no thread pool).
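To show what running a user-defined tool through `asyncio.create_subprocess_exec()` might look like, here is a small sketch. The `run_tool` function, its timeout, and the use of `sys.executable -c` are assumptions for illustration; this is not the `ToolExecutionSandbox` implementation and provides no real isolation.

```python
import asyncio
import sys


async def run_tool(source: str, timeout: float = 10.0) -> tuple[int, str]:
    # Launch the tool in a child Python process without blocking the event loop.
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", source,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, _stderr = await asyncio.wait_for(proc.communicate(), timeout)
    except asyncio.TimeoutError:
        proc.kill()        # do not leave a runaway tool behind
        await proc.wait()
        raise
    return proc.returncode, stdout.decode().strip()
```

While the child process runs, the awaiting coroutine is suspended and the loop stays free, so no worker thread is tied up for the duration of the tool call.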
MetaAgent (`mirix/agent/meta_agent.py`)
`MetaAgent.step()`, `initialize()`, and sub-agent orchestration are async. `MessageQueue` uses `asyncio.Lock` instead of `threading.Lock`.
Queue system (`mirix/queue/`)
`MemoryQueue` wraps `asyncio.Queue`. `KafkaQueue` uses `aiokafka` (fully async producer/consumer). `QueueWorker` runs as an `asyncio.Task` in the main event loop.
Server (`mirix/server/server.py`)
`AsyncServer` (renamed from the former `SyncServer`) exposes async methods: `send_messages()`, `_step()`, `load_agent()`, `create_agent()`. A backward-compatible alias `SyncServer = AsyncServer` is retained for external callers that have not yet updated.
REST API (`mirix/server/rest_api.py`)
All route handlers are `async def` and directly `await` server methods. There are zero `asyncio.to_thread` wrappers on the request path. SSE streaming uses `sse_async_generator()`.
Client SDK (`mirix/client/remote_client.py`)
`MirixClient` uses `httpx.AsyncClient` with `RetryTransport`. All public methods (`add`, `send_message`, `create_agent`, etc.) are async. `MirixClient.create()` is an async factory for initialization.
Observability (`mirix/observability/langfuse_client.py`)
Singleton initialization uses `asyncio.Lock` for coroutine-safe double-checked locking. The sync LangFuse SDK is called via `asyncio.to_thread` (see Section 3.1).
Tests (`tests/`, `pyproject.toml`)
`pytest-asyncio` with `asyncio_mode = "auto"`. Fixtures in `conftest.py` are async. `asyncio_default_fixture_loop_scope = "session"`.
3. Remaining Synchronous Code
The request-serving hot path is fully async. The items below are the only
remaining synchronous touch-points. Each is intentional.
3.1 LangFuse SDK
`mirix/observability/langfuse_client.py`
`Langfuse()` init, `.flush()`, and `.shutdown()` are sync SDK calls, wrapped with `await asyncio.to_thread(...)`.
`to_thread` borrows a thread from the default executor briefly; it does not block the event loop or limit request concurrency.
3.2 Gmail OAuth
`mirix/functions/mcp_client/gmail_client.py`
`authenticate_gmail_local()` blocks waiting for a browser OAuth redirect. It is called via `await asyncio.to_thread(...)`.
3.3 SQLAlchemy DDL at Startup
`mirix/server/server.py`, `ensure_tables_created()`
`await conn.run_sync(Base.metadata.create_all)`
`run_sync` is the documented pattern for async engines.
3.4 Cleanup Job Entry Point
`mirix/jobs/cleanup_raw_memories.py`
`asyncio.run(delete_stale_raw_memories_async(threshold))` in `__main__`.
3.5 Pure CPU Helpers -- Intentionally Sync
`mirix/utils.py`, `mirix/services/utils.py`, and private helpers in memory managers (`_clean_text_for_search`, `_parse_embedding_field`, `_count_word_matches`, `_preprocess_text_for_bm25`).
Adding `async def` to a function that never `await`s provides no concurrency benefit. The event loop only yields at `await` points, so an `async def` body with no awaits runs identically to a plain `def` -- but with extra coroutine-object overhead. A function should be `async def` if and only if it performs I/O. (Note: `mirix/services/utils.py::build_query` is correctly `async def` because it awaits `embedding_model()`.)
For CPU-heavy work, the right tool is `asyncio.to_thread` (offload to a thread), not `async def`.
3.6 Server Class Naming (Resolved)
The class formerly named `SyncServer` has been renamed to `AsyncServer` as part of this change set. All imports, type hints, docstrings, and tests have been updated. A backward-compatible alias `SyncServer = AsyncServer` is retained in `mirix/server/server.py`.
4. Summary
The MIRIX application is async-native from the HTTP boundary through the
server, agents, service managers, ORM, database, Redis, Kafka, and
LLM/embedding clients. The only remaining sync touch-points are:
* LangFuse SDK: `asyncio.to_thread`; low impact.
* Gmail OAuth: `to_thread`; rare.
* SQLAlchemy DDL at startup: `run_sync`; no runtime impact.
* Cleanup job entry point: `asyncio.run()` in `__main__`; separate process.
* Pure CPU helpers: `async def` would add overhead, not benefit.
* Server class naming: `SyncServer` renamed to `AsyncServer`; alias kept.
None of these limit MIRIX's ability to scale request throughput or concurrent users. The critical path is fully async.
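The `asyncio.to_thread` pattern used for the LangFuse and Gmail touch-points can be sketched as follows (Python 3.9 or later). `blocking_flush` is a hypothetical stand-in for a synchronous SDK call, and `heartbeat` exists only to show that other coroutines keep running while the blocking call is off-loaded.

```python
import asyncio
import time


def blocking_flush() -> str:
    # Hypothetical synchronous SDK call that blocks its thread for a while.
    time.sleep(0.1)
    return "flushed"


async def heartbeat(ticks: list[int]) -> None:
    # Keeps ticking on the event loop while the blocking call runs elsewhere.
    for i in range(5):
        ticks.append(i)
        await asyncio.sleep(0.01)


async def main() -> tuple[str, list[int]]:
    ticks: list[int] = []
    result, _ = await asyncio.gather(
        asyncio.to_thread(blocking_flush),  # runs in the default executor
        heartbeat(ticks),
    )
    return result, ticks
```

Calling `blocking_flush()` directly inside a coroutine would freeze the loop for the full 0.1 seconds; off-loading it costs one executor thread for the duration of the call instead.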