Enable concurrent processing of messages across topics #103

Open
tschuehly wants to merge 1 commit into RichardAtCT:main from tschuehly:fix/concurrent-topic-processing

Conversation

@tschuehly

Summary

  • Store per-update thread state in a dict on the orchestrator instance instead of setting attributes on telegram.Update (which uses __slots__ and raises AttributeError)
  • Add projects and tasks to _CLAUDE_INTERNAL_SUBDIRS whitelist so Claude Code can read its own compressed tool results and task data
  • Allow /tmp/claude-<uid>/ paths for Task agent output files
  • Add "Viewing Logs" section to README
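A minimal sketch of the first point above, using hypothetical stand-ins rather than the real `telegram.Update` or the project's orchestrator: a class that declares `__slots__` rejects ad-hoc attributes, so the per-update state is kept in a dict keyed by `update_id` instead. The class and method names here (`SlottedUpdate`, `remember`, `forget`) are illustrative assumptions.

```python
from typing import Dict, Optional


class SlottedUpdate:
    """Hypothetical stand-in for an update object that declares __slots__."""

    __slots__ = ("update_id",)

    def __init__(self, update_id: int) -> None:
        self.update_id = update_id


class Orchestrator:
    """Hypothetical orchestrator that owns the per-update state."""

    def __init__(self) -> None:
        # One entry per in-flight update, keyed by its update_id.
        self._update_thread_states: Dict[int, Dict[str, str]] = {}

    def remember(self, update: SlottedUpdate, state: Dict[str, str]) -> None:
        self._update_thread_states[update.update_id] = state

    def forget(self, update: SlottedUpdate) -> Optional[Dict[str, str]]:
        # pop() with a default never raises, even if the entry is already gone.
        return self._update_thread_states.pop(update.update_id, None)


update = SlottedUpdate(update_id=1)

# Setting an undeclared attribute on a slotted instance fails.
try:
    update.thread_state = {"topic": "A"}
    slot_error = False
except AttributeError:
    slot_error = True

# Keeping the state on the orchestrator works and can be cleaned up explicitly.
orch = Orchestrator()
orch.remember(update, {"topic": "A"})
stored = orch.forget(update)
```

The dict lives as long as the orchestrator, so every insert needs a matching removal once the update has been handled.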

Closes #102

Test plan

  • Send messages to multiple project topics concurrently and verify they are processed in parallel
  • Verify plan mode works (writes to ~/.claude/plans/)
  • Verify long conversations work (Claude re-reads compressed tool results from ~/.claude/projects/)
  • Verify Task agent subprocesses work (reads from /tmp/claude-<uid>/)

🤖 Generated with Claude Code

Store per-update thread state in a dict on the orchestrator instance
instead of setting attributes on telegram.Update (which uses __slots__).

Also whitelist Claude Code internal paths ("projects", "tasks") in
_CLAUDE_INTERNAL_SUBDIRS and allow /tmp/claude-<uid>/ for Task agent
output files to prevent "path outside approved directory" errors.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Owner

@RichardAtCT RichardAtCT left a comment

Great approach — using _update_thread_states keyed by update_id is the right pattern for asyncio concurrency isolation. A few things to address:

Blocking:

  1. Rebase needed — conflicts with PRs #96 (monitor.py refactored) and #110 (orchestrator.py). Move import os to top of monitor.py during rebase.

  2. No automated tests — This PR's entire purpose is concurrency safety, so tests are essential. Please add tests for: (a) _update_thread_states populated and cleaned up correctly, (b) two concurrent updates for the same user get isolated state, (c) cleanup still happens if _apply_thread_routing_context raises after inserting into the dict.

  3. _persist_thread_state signature change — Now takes update as first arg. Existing test test_thread_mode_loads_and_persists_thread_state may break after rebase — verify.

Non-blocking concerns:

  • force_new_session is still read from shared user_data — a /new in topic A could leak to a concurrent handler in topic B. Pre-existing issue but worth noting.
  • context.user_data["claude_session_id"] is still written back to shared state after each run. The per-update dict shadows it correctly during the handler, but a third concurrent handler could read a stale value from the fallback.
  • Potential memory leak if exception escapes _apply_thread_routing_context after dict insert but before the try/finally in _inject_deps.
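One way the last concern could be addressed, sketched under assumptions: the routing call is placed inside the same `try` block as the handler, so an exception raised after the dict insert still reaches the `finally` cleanup. The method names `_apply_thread_routing_context` and `_inject_deps` come from this review, but their signatures and bodies below are hypothetical.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

Handler = Callable[[int], Awaitable[Any]]


class Orchestrator:
    """Hypothetical orchestrator; only the cleanup pattern is the point."""

    def __init__(self) -> None:
        self._update_thread_states: Dict[int, Dict[str, Any]] = {}

    def _apply_thread_routing_context(self, update_id: int, fail: bool = False) -> None:
        # Insert first, then (optionally) fail, to mimic the leak scenario.
        self._update_thread_states[update_id] = {"session_id": None}
        if fail:
            raise RuntimeError("routing failed after insert")

    def _inject_deps(self, handler: Handler) -> Callable[..., Awaitable[Any]]:
        async def wrapped(update_id: int, fail_routing: bool = False) -> Any:
            try:
                # Routing is inside the try block, so its failures are covered too.
                self._apply_thread_routing_context(update_id, fail=fail_routing)
                return await handler(update_id)
            finally:
                # Always drop the entry, whether the handler returned or raised.
                self._update_thread_states.pop(update_id, None)

        return wrapped


async def main() -> tuple:
    orch = Orchestrator()

    async def handler(update_id: int) -> str:
        return "ok"

    wrapped = orch._inject_deps(handler)
    result = await wrapped(1)
    try:
        await wrapped(2, fail_routing=True)
    except RuntimeError:
        pass
    return result, dict(orch._update_thread_states)


result, leftover = asyncio.run(main())
```

Using `pop(update_id, None)` keeps the cleanup idempotent, so it does not matter if another code path has already removed the entry.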

The core design is sound — these are fixable issues, not fundamental problems.

@FridayOpenClawBot

PR Review
Reviewed head: d452fa28c7dfe4eb650a2b53249df9794682322e

Summary

  • Fixes concurrent topic processing: stores per-update thread state in a dict on the orchestrator (_update_thread_states) instead of setting attributes on telegram.Update (which uses __slots__)
  • Adds projects and tasks to _CLAUDE_INTERNAL_SUBDIRS whitelist; allows /tmp/claude-<uid>/ paths for Task agent output
  • Enables concurrent_updates(True) on the PTB Application
  • Adds "Viewing Logs" section to README

What looks good

  • Keying _update_thread_states by update_id is the right isolation primitive — update IDs are unique within a bot session
  • Snapshotting state before the first await in agentic_text is the correct fix for the race condition
  • _update_thread_states.pop(update_id) in _persist_thread_state cleans up correctly
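The snapshot point can be illustrated with a small asyncio sketch. This is not the project's `agentic_text`; the shared dict and handler names are hypothetical. It shows that a value read from shared state after an `await` can already have been overwritten by another handler, while a value copied into a local variable before the first `await` cannot.

```python
import asyncio
from typing import Awaitable, Callable, List

# Hypothetical shared state, standing in for per-user data shared across topics.
shared_user_data = {"current_directory": "/projects/a"}


async def handler_without_snapshot(seen: List[str]) -> None:
    await asyncio.sleep(0)  # yields control; another handler may run here
    seen.append(shared_user_data["current_directory"])  # read after the await


async def handler_with_snapshot(seen: List[str]) -> None:
    directory = shared_user_data["current_directory"]  # snapshot before the await
    await asyncio.sleep(0)
    seen.append(directory)


async def other_topic() -> None:
    # A concurrent handler for another topic overwrites the shared value.
    shared_user_data["current_directory"] = "/projects/b"


async def run(handler: Callable[[List[str]], Awaitable[None]]) -> str:
    shared_user_data["current_directory"] = "/projects/a"
    seen: List[str] = []
    await asyncio.gather(handler(seen), other_topic())
    return seen[0]


racy = asyncio.run(run(handler_without_snapshot))
safe = asyncio.run(run(handler_with_snapshot))
```

In this sketch `racy` ends up holding the other topic's directory, while `safe` keeps the value that was current when the handler started.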

Issues / questions

  1. [Important] src/bot/orchestrator.py: _update_thread_states is an instance dict that grows with each update and is only cleaned up in _persist_thread_state. If a handler raises an unhandled exception before _persist_thread_state is called (e.g. auth failure), the entry leaks. Consider a try/finally in the _inject_deps wrapper to always pop the entry, or use a contextlib.contextmanager pattern.
  2. [Important] src/claude/monitor.py — adding projects and tasks to the internal subdirs whitelist expands the allowed path surface. Confirm that /tmp/claude-<uid>/ paths are validated against the actual running UID (so /tmp/claude-0/ can't be used by a non-root user to escape sandboxing).
  3. [Nit] The README "Viewing Logs" section assumes a systemd user service — a one-liner noting this only applies to systemd deployments would save confusion for Docker/bare-process users.
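For the UID question in item 2, a hedged sketch of what such a check might look like. The function name `is_own_claude_tmp_path` and the `tmp_root` parameter are hypothetical and not taken from `monitor.py`; the sketch assumes a POSIX system, since `os.getuid()` is not available on Windows.

```python
import os
from pathlib import Path


def is_own_claude_tmp_path(candidate: str, tmp_root: str = "/tmp") -> bool:
    """Return True only for paths inside /tmp/claude-<uid>/ of the running user."""
    # Build the expected directory from the real UID of this process.
    own_dir = (Path(tmp_root) / f"claude-{os.getuid()}").resolve()
    # resolve() normalises ".." segments and follows symlinks before comparing.
    resolved = Path(candidate).resolve()
    return resolved == own_dir or own_dir in resolved.parents


uid = os.getuid()
own_file = f"/tmp/claude-{uid}/task/output.txt"
other_user = f"/tmp/claude-{uid + 1}/output.txt"
traversal = f"/tmp/claude-{uid}/../claude-{uid + 1}/output.txt"
lookalike = f"/tmp/claude-{uid}-evil/output.txt"
```

Comparing resolved path components, rather than using a string prefix match, is what rejects the `lookalike` case, and resolving before comparing is what rejects the `traversal` case.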

Suggested tests

  • Test that _update_thread_states is empty after a handler raises an exception (to verify no leak)
  • Test that concurrent handlers for two different topics each see their own current_directory and session_id
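The two suggested tests could take roughly the following shape. `FakeOrchestrator` and its `handle` method are hypothetical stand-ins written for this sketch; real tests would exercise the project's orchestrator and its existing fixtures instead.

```python
import asyncio
from typing import Any, Dict


class FakeOrchestrator:
    """Hypothetical stand-in that mimics per-update state handling."""

    def __init__(self) -> None:
        self._update_thread_states: Dict[int, Dict[str, Any]] = {}

    async def handle(self, update_id: int, directory: str, session_id: str,
                     fail: bool = False) -> Dict[str, Any]:
        self._update_thread_states[update_id] = {
            "current_directory": directory,
            "session_id": session_id,
        }
        try:
            await asyncio.sleep(0)  # let the other handler interleave
            if fail:
                raise RuntimeError("handler failed")
            return dict(self._update_thread_states[update_id])
        finally:
            self._update_thread_states.pop(update_id, None)


async def check_isolation() -> tuple:
    orch = FakeOrchestrator()
    state_a, state_b = await asyncio.gather(
        orch.handle(1, "/projects/a", "session-a"),
        orch.handle(2, "/projects/b", "session-b"),
    )
    return state_a, state_b, dict(orch._update_thread_states)


async def check_no_leak_on_error() -> Dict[int, Dict[str, Any]]:
    orch = FakeOrchestrator()
    try:
        await orch.handle(3, "/projects/c", "session-c", fail=True)
    except RuntimeError:
        pass
    return dict(orch._update_thread_states)


state_a, state_b, after_success = asyncio.run(check_isolation())
after_error = asyncio.run(check_no_leak_on_error())
```

Each check returns the observed state so it can be asserted on directly, for example from a pytest test function.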

Verdict
⚠️ Merge after fixes — the concurrency fix is correct and well-targeted, but the potential _update_thread_states leak on exception paths should be plugged before this ships.

Friday, AI assistant to @RichardAtCT

Owner

@RichardAtCT RichardAtCT left a comment

Thanks for working on concurrent topic processing! A few things need attention:

  1. Merge conflicts — this PR conflicts with main and needs a rebase
  2. CI failing — tests are not passing
  3. No tests — concurrent processing is tricky; please add tests covering the new behavior
  4. Memory leak risk: the _update_thread_states dict appears to accumulate state without cleanup. Please add bounds/cleanup logic.
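As one possible reading of "bounds" in item 4, the sketch below caps the number of stored entries and evicts the oldest one when the cap is exceeded. `BoundedStateStore` and `max_entries` are hypothetical names, and this is a safety net layered on top of explicit cleanup, not a replacement for it: evicting an entry whose handler is still running would lose that handler's state.

```python
from collections import OrderedDict
from typing import Any, Dict, Optional


class BoundedStateStore:
    """Hypothetical per-update state store with a hard size limit."""

    def __init__(self, max_entries: int = 1024) -> None:
        if max_entries < 1:
            raise ValueError("max_entries must be at least 1")
        self._max_entries = max_entries
        self._states: "OrderedDict[int, Dict[str, Any]]" = OrderedDict()

    def put(self, update_id: int, state: Dict[str, Any]) -> None:
        self._states[update_id] = state
        self._states.move_to_end(update_id)  # treat re-inserts as newest
        while len(self._states) > self._max_entries:
            self._states.popitem(last=False)  # drop the oldest entry

    def pop(self, update_id: int) -> Optional[Dict[str, Any]]:
        return self._states.pop(update_id, None)

    def __len__(self) -> int:
        return len(self._states)


store = BoundedStateStore(max_entries=2)
store.put(1, {"topic": "A"})
store.put(2, {"topic": "B"})
store.put(3, {"topic": "C"})  # exceeds the cap, so update 1 is evicted

evicted = store.pop(1)
kept = store.pop(3)
remaining = len(store)
```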

Please rebase against current main and address these items. Happy to re-review!

