feat: add artifact lifecycle retention cleanup for sessions (Issue #1269) #1324
AKSHEXXXX wants to merge 1 commit into SolaceLabs:main from
Conversation
Pull request overview
Adds session-scoped artifact cleanup both on direct session deletion and via scheduled retention cleanup, with backend support across filesystem/S3/Azure and unit tests for the new flows.
Changes:
- Make session deletion trigger artifact cleanup via the shared artifact service (Approach A).
- Add artifact retention cleanup to `DataRetentionService` using an expired-sessions query helper (Approach B).
- Implement `delete_session_artifacts` for filesystem, S3, and Azure artifact services and add unit coverage.
Reviewed changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| src/solace_agent_mesh/gateway/http_sse/services/session_service.py | Makes deletion async and invokes artifact cleanup after DB delete. |
| src/solace_agent_mesh/gateway/http_sse/routers/sessions.py | Updates delete endpoint to use async delete path (currently broken in diff). |
| src/solace_agent_mesh/gateway/http_sse/services/data_retention_service.py | Adds artifact retention config validation and scheduled artifact cleanup orchestration. |
| src/solace_agent_mesh/gateway/http_sse/repository/session_repository.py | Adds helper query find_sessions_older_than used by retention cleanup. |
| src/solace_agent_mesh/agent/adk/artifacts/filesystem_artifact_service.py | Adds session-wide artifact deletion across app scopes on filesystem backend. |
| src/solace_agent_mesh/agent/adk/artifacts/s3_artifact_service.py | Adds session-wide artifact deletion for S3 backend (currently full-bucket scan). |
| src/solace_agent_mesh/agent/adk/artifacts/azure_artifact_service.py | Adds session-wide artifact deletion for Azure backend (currently full-container scan). |
| tests/unit/services/test_session_service_artifact_cleanup.py | Adds unit tests for session deletion artifact cleanup flow. |
| tests/unit/services/test_data_retention_service_artifact_cleanup.py | Adds unit tests for retention-driven artifact cleanup flow. |
| tests/unit/agent/adk/artifacts/test_filesystem_artifact_service.py | Adds filesystem backend tests for delete-session behavior. |
| tests/unit/agent/adk/artifacts/test_artifact_delete_session.py | Adds backend tests for S3/Azure delete-session behavior. |
| examples/gateways/webui_gateway_example.yaml | Documents artifact_retention_days config knob. |
```python
deleted = session_service.delete_session_with_notifications(
    db=db, session_id=session_id, user_id=user_id
deleted = await session_service.delete_session_with_notifications(
)
```
The delete-session route is currently syntactically broken: it calls delete_session_with_notifications twice (one non-awaited call plus an awaited call) and the awaited call is missing its arguments (db, session_id, user_id). This will raise at import/runtime and the endpoint won’t work. Remove the stray non-awaited line and pass the required arguments to the awaited call.
```diff
 def __init__(
-    self, session_factory: Callable[[], DBSession] | None, config: Dict[str, Any]
+    self,
+    session_factory: Callable[[], DBSession] | None,
+    config: Dict[str, Any],
+    artifact_service: Optional["BaseArtifactService"] = None,
 ):
     """
     Initialize the DataRetentionService.

     Args:
         session_factory: Factory function to create database sessions
         config: Configuration dictionary with retention settings
         artifact_service: Optional artifact service for session artifact cleanup
     """
     self.session_factory = session_factory
     self.config = config
     self.artifact_service = artifact_service
```
DataRetentionService now supports artifact cleanup via artifact_service, but the only instantiation in the codebase does not pass an artifact service (see gateway/http_sse/component.py:302-304), meaning the scheduled artifact retention path will always be skipped. Wire the shared artifact service into DataRetentionService construction (or provide a setter/hook) so the configured artifact_retention_days actually takes effect.
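As a sketch, the wiring this comment asks for could look like the following; `DataRetentionService` is reduced to a stand-in mirroring the new constructor, and the component class and config keys are assumptions (only `get_shared_artifact_service` is named in the comment):

```python
from typing import Any, Callable, Dict, Optional

class DataRetentionService:
    # Reduced stand-in mirroring the new constructor from the diff above.
    def __init__(
        self,
        session_factory: Optional[Callable[[], Any]],
        config: Dict[str, Any],
        artifact_service: Optional[Any] = None,
    ):
        self.session_factory = session_factory
        self.config = config
        self.artifact_service = artifact_service

class _FakeComponent:
    # Stand-in for the gateway component's shared-artifact accessor.
    def get_shared_artifact_service(self):
        return object()

component = _FakeComponent()
service = DataRetentionService(
    session_factory=None,
    config={"artifact_retention_days": 30},
    # Pass the shared artifact service so the retention path is not skipped.
    artifact_service=component.get_shared_artifact_service(),
)
print(service.artifact_service is not None)  # True
```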
```python
artifacts_deleted = asyncio.run(
    self._cleanup_expired_session_artifacts(artifact_retention_days)
)
```
cleanup_old_data() uses asyncio.run(self._cleanup_expired_session_artifacts(...)). This will raise RuntimeError: asyncio.run() cannot be called from a running event loop if cleanup_old_data() is ever invoked from an async context (e.g., via an async scheduler or request handler). Consider making cleanup_old_data async, or detecting an existing loop and using create_task/run_coroutine_threadsafe accordingly.
Suggested change:

```diff
-artifacts_deleted = asyncio.run(
-    self._cleanup_expired_session_artifacts(artifact_retention_days)
-)
+try:
+    # If we're already in an event loop, schedule the coroutine instead of using asyncio.run
+    loop = asyncio.get_running_loop()
+except RuntimeError:
+    # No running loop; safe to use asyncio.run and capture the result
+    artifacts_deleted = asyncio.run(
+        self._cleanup_expired_session_artifacts(artifact_retention_days)
+    )
+else:
+    # Running loop detected; schedule cleanup without blocking
+    loop.create_task(
+        self._cleanup_expired_session_artifacts(artifact_retention_days)
+    )
```
```python
db = self.session_factory()
try:
    repo = SessionRepository()
    sessions = repo.find_sessions_older_than(db, cutoff_time_ms)
    for session in sessions:
        try:
            total_deleted += await self.artifact_service.delete_session_artifacts(
                user_id=session.user_id,
                session_id=session.id,
            )
        except Exception as e:
            log.warning(
                "%s Failed deleting artifacts for session %s: %s",
                self.log_identifier,
                session.id,
                e,
            )
    return total_deleted
finally:
    db.close()
```
_cleanup_expired_session_artifacts keeps a DB session open while awaiting storage deletes for each session. If artifact deletion is slow, this can unnecessarily hold a DB connection for the entire cleanup duration and risk pool exhaustion. Fetch the list of sessions first, close the DB session, then perform the (awaited) artifact deletions.
Suggested change:

```diff
-db = self.session_factory()
-try:
-    repo = SessionRepository()
-    sessions = repo.find_sessions_older_than(db, cutoff_time_ms)
-    for session in sessions:
-        try:
-            total_deleted += await self.artifact_service.delete_session_artifacts(
-                user_id=session.user_id,
-                session_id=session.id,
-            )
-        except Exception as e:
-            log.warning(
-                "%s Failed deleting artifacts for session %s: %s",
-                self.log_identifier,
-                session.id,
-                e,
-            )
-    return total_deleted
-finally:
-    db.close()
+# Fetch session identifiers while holding a DB session, then close it
+db = self.session_factory()
+try:
+    repo = SessionRepository()
+    sessions = repo.find_sessions_older_than(db, cutoff_time_ms)
+    session_keys = [
+        (session.user_id, session.id)
+        for session in sessions
+    ]
+finally:
+    db.close()
+for user_id, session_id in session_keys:
+    try:
+        total_deleted += await self.artifact_service.delete_session_artifacts(
+            user_id=user_id,
+            session_id=session_id,
+        )
+    except Exception as e:
+        log.warning(
+            "%s Failed deleting artifacts for session %s: %s",
+            self.log_identifier,
+            session_id,
+            e,
+        )
+return total_deleted
```
```python
db = self.session_factory()
try:
    repo = SessionRepository()
    sessions = repo.find_sessions_older_than(db, cutoff_time_ms)
    for session in sessions:
        try:
            total_deleted += await self.artifact_service.delete_session_artifacts(
                user_id=session.user_id,
                session_id=session.id,
            )
        except Exception as e:
            log.warning(
                "%s Failed deleting artifacts for session %s: %s",
                self.log_identifier,
                session.id,
                e,
            )
```
repo.find_sessions_older_than(...) loads all matching sessions with no limit, and then deletes artifacts sequentially. For large datasets this can be memory-heavy and very slow. Consider batching using the existing batch_size config (or the repository’s limit parameter) and iterating until no more sessions match the cutoff.
Suggested change:

```diff
-db = self.session_factory()
-try:
-    repo = SessionRepository()
-    sessions = repo.find_sessions_older_than(db, cutoff_time_ms)
-    for session in sessions:
-        try:
-            total_deleted += await self.artifact_service.delete_session_artifacts(
-                user_id=session.user_id,
-                session_id=session.id,
-            )
-        except Exception as e:
-            log.warning(
-                "%s Failed deleting artifacts for session %s: %s",
-                self.log_identifier,
-                session.id,
-                e,
-            )
+# Use a reasonable batch size; prefer a configured value if available.
+batch_size = getattr(self, "batch_size", 100)
+db = self.session_factory()
+try:
+    repo = SessionRepository()
+    offset = 0
+    while True:
+        sessions = repo.find_sessions_older_than(
+            db,
+            cutoff_time_ms,
+            limit=batch_size,
+            offset=offset,
+        )
+        if not sessions:
+            break
+        for session in sessions:
+            try:
+                total_deleted += await self.artifact_service.delete_session_artifacts(
+                    user_id=session.user_id,
+                    session_id=session.id,
+                )
+            except Exception as e:
+                log.warning(
+                    "%s Failed deleting artifacts for session %s: %s",
+                    self.log_identifier,
+                    session.id,
+                    e,
+                )
+        # Advance offset; if fewer than batch_size were returned, we've reached the end.
+        processed = len(sessions)
+        offset += processed
+        if processed < batch_size:
+            break
```
```python
paginator = self.s3.get_paginator("list_objects_v2")
keys: list[str] = []
for page in paginator.paginate(Bucket=self.bucket_name, Prefix=""):
    for obj in page.get("Contents", []):
        key = obj.get("Key", "")
        if marker in f"/{key}":
            keys.append(key)
```
delete_session_artifacts paginates the entire bucket (Prefix="") and filters keys client-side for every deletion. In large buckets this becomes extremely expensive and scales with total bucket size (and even worse when called per-session in retention cleanup). Prefer discovering top-level app prefixes (e.g., list_objects_v2 with Delimiter="/") and then listing/deleting with Prefix=f"{app}/{user_id}/{session_id}/" per app, or otherwise avoiding full-bucket scans.
Suggested change:

```diff
-paginator = self.s3.get_paginator("list_objects_v2")
-keys: list[str] = []
-for page in paginator.paginate(Bucket=self.bucket_name, Prefix=""):
-    for obj in page.get("Contents", []):
-        key = obj.get("Key", "")
-        if marker in f"/{key}":
-            keys.append(key)
+keys: list[str] = []
+# First, discover top-level app prefixes (e.g., "app1/", "app2/").
+app_paginator = self.s3.get_paginator("list_objects_v2")
+for page in app_paginator.paginate(
+    Bucket=self.bucket_name, Delimiter="/"
+):
+    for prefix_info in page.get("CommonPrefixes", []):
+        app_prefix = prefix_info.get("Prefix")
+        if not app_prefix:
+            continue
+        # For each app, list only objects under the user/session prefix.
+        session_prefix = f"{app_prefix}{user_id}/{session_id}/"
+        session_paginator = self.s3.get_paginator("list_objects_v2")
+        for session_page in session_paginator.paginate(
+            Bucket=self.bucket_name, Prefix=session_prefix
+        ):
+            for obj in session_page.get("Contents", []):
+                key = obj.get("Key")
+                if key:
+                    keys.append(key)
```
```python
try:
    def _list_blobs() -> list[str]:
        result: list[str] = []
        for blob in self.container_client.list_blobs(name_starts_with=""):
            name = getattr(blob, "name", "")
            if marker in f"/{name}":
                result.append(name)
        return result

    blobs = await asyncio.to_thread(_list_blobs)
    if not blobs:
        return 0
```
delete_session_artifacts lists all blobs in the container (name_starts_with="") and filters client-side. This scales with total container size per deletion and can be very costly when run periodically. Consider enumerating top-level app prefixes via walk_blobs(delimiter="/") (or maintaining known app prefixes) and then listing/deleting with name_starts_with=f"{app}/{user_id}/{session_id}/" per app to avoid full-container scans.
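A hedged sketch of the prefix-scoped listing this comment suggests. The fake container below stands in for `azure.storage.blob.ContainerClient` (whose real `walk_blobs(delimiter="/")` and `list_blobs(name_starts_with=...)` methods the production code would call), so the scoping logic is runnable without the SDK:

```python
class _Blob:
    def __init__(self, name: str):
        self.name = name

class _FakeContainerClient:
    """In-memory stand-in for ContainerClient (walk_blobs/list_blobs only)."""
    def __init__(self, names):
        self._names = names

    def walk_blobs(self, delimiter="/"):
        # Yield top-level "app" prefixes, as walk_blobs(delimiter="/") would.
        prefixes = sorted({n.split(delimiter, 1)[0] + delimiter for n in self._names})
        return [_Blob(p) for p in prefixes]

    def list_blobs(self, name_starts_with=""):
        return [_Blob(n) for n in self._names if n.startswith(name_starts_with)]

def session_blob_names(container_client, user_id: str, session_id: str) -> list[str]:
    names = []
    # Enumerate app prefixes, then list only under app/user/session —
    # never the whole container.
    for prefix in container_client.walk_blobs(delimiter="/"):
        session_prefix = f"{prefix.name}{user_id}/{session_id}/"
        for blob in container_client.list_blobs(name_starts_with=session_prefix):
            names.append(blob.name)
    return names

client = _FakeContainerClient([
    "app1/u1/s1/a.txt",
    "app1/u2/s9/b.txt",
    "app2/u1/s1/c.txt",
])
print(session_blob_names(client, "u1", "s1"))
# ['app1/u1/s1/a.txt', 'app2/u1/s1/c.txt']
```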
```python
@pytest.mark.asyncio
async def test_delete_session_preserves_user_artifacts(self):
    mock_session = Mock()
    mock_session.id = self.session_id
    mock_session.user_id = self.user_id
    mock_session.agent_id = None
    mock_session.can_be_deleted_by_user.return_value = True

    mock_artifact_service = AsyncMock()
    mock_artifact_service.delete_session_artifacts = AsyncMock(return_value=5)
    self.mock_component.get_shared_artifact_service.return_value = mock_artifact_service

    mock_repository = Mock()
    mock_repository.delete.return_value = True
    mock_repository.find_user_session.return_value = mock_session

    with patch.object(self.service, "_get_repositories", return_value=mock_repository):
        result = await self.service.delete_session_with_notifications(
            self.mock_db, self.session_id, self.user_id
        )

    assert result is True
    call_kwargs = mock_artifact_service.delete_session_artifacts.call_args[1]
    assert call_kwargs["session_id"] == self.session_id
```
The test name suggests verifying user-scoped artifacts are preserved, but the assertions only check that delete_session_artifacts was called with the expected session_id. Either extend the test to create a user: artifact and assert it still exists after deletion, or rename the test to match what it actually verifies.
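A hedged illustration of the stricter assertion this comment asks for, using an in-memory fake in place of the real artifact service (all names here are hypothetical; the fake distinguishes scopes by a `session:`/`user:` name prefix):

```python
import asyncio

class FakeArtifactStore:
    # Hypothetical fake: artifact names carry a "session:" or "user:" scope prefix.
    def __init__(self):
        self.artifacts: set[str] = set()

    async def delete_session_artifacts(self, user_id: str, session_id: str) -> int:
        # Only session-scoped artifacts are deleted; user-scoped ones survive.
        doomed = {name for name in self.artifacts if name.startswith("session:")}
        self.artifacts -= doomed
        return len(doomed)

async def check_preserves_user_artifacts() -> None:
    store = FakeArtifactStore()
    store.artifacts = {"session:report.txt", "user:notes.txt"}
    deleted = await store.delete_session_artifacts("u1", "s1")
    assert deleted == 1
    # The assertion the review comment asks for: a user-scoped
    # artifact still exists after session deletion.
    assert "user:notes.txt" in store.artifacts

asyncio.run(check_preserves_user_artifacts())
print("ok")
```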