diff --git a/CHANGELOG.md b/CHANGELOG.md index ad7d7d5..13095f4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,24 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [1.2.1] - 2026-03-29 + +### Fixed + +- `ProcessPoolExecutor` in `PillowImageProcessor` is now shut down during app + lifespan teardown, preventing leaked worker processes on exit. +- Race condition in retention sweeps: concurrent calls to `get_expired()` could + fetch the same rows, causing double-delete attempts on storage files. Added + `delete_expired_batch()` which uses `SELECT … FOR UPDATE SKIP LOCKED` to + atomically claim and delete expired rows in a single transaction. +- `ProcessImageUseCase` now cleans up the stored thumbnail if the final + metadata save fails, preventing orphaned files and an image stuck in + `PROCESSING` state. +- Narrowed overly broad `except Exception` handlers: `apply_retention.py` and + `process_image.py` now catch `OSError` for storage operations; + `pipeline.py` uses `asyncio.gather(return_exceptions=True)` instead of + swallowing exceptions inside coroutines. + ## [1.2.0] - 2026-03-29 ### Added @@ -128,6 +146,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Fix `type: ignore` comment on `rowcount` to use correct mypy error code `attr-defined`. - Add proper type annotation for `settings` parameter in retention sweep endpoint. -[unreleased]: https://github.com/vlantonov/ImageProcessingServiceDemo/compare/v1.0.1...HEAD +[unreleased]: https://github.com/vlantonov/ImageProcessingServiceDemo/compare/v1.2.1...HEAD +[1.2.1]: https://github.com/vlantonov/ImageProcessingServiceDemo/compare/v1.2.0...v1.2.1 +[1.2.0]: https://github.com/vlantonov/ImageProcessingServiceDemo/compare/v1.1.0...v1.2.0 +[1.1.0]: https://github.com/vlantonov/ImageProcessingServiceDemo/compare/v1.0.1...v1.1.0 [1.0.1]: https://github.com/vlantonov/ImageProcessingServiceDemo/compare/v1.0.0...v1.0.1 [1.0.0]: https://github.com/vlantonov/ImageProcessingServiceDemo/releases/tag/v1.0.0 diff --git a/pyproject.toml b/pyproject.toml index bf7f463..b8f6013 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "image-processing-service" -version = "1.2.0" +version = "1.2.1" description = "High-performance image processing microservice with Clean Architecture" requires-python = ">=3.11" dependencies = [ diff --git a/src/application/use_cases/apply_retention.py b/src/application/use_cases/apply_retention.py index 236262e..7dc0d6a 100644 --- a/src/application/use_cases/apply_retention.py +++ b/src/application/use_cases/apply_retention.py @@ -26,20 +26,19 @@ def __init__(self, repository: ImageRepository, storage: ImageStorage) -> None: self._storage = storage async def execute(self, batch_size: int = 100) -> RetentionResult: - expired = await self._repository.get_expired(batch_size=batch_size) - deleted = 0 + deleted_images = await self._repository.delete_expired_batch( + batch_size=batch_size, + ) errors = 0 - for image in expired: + for image in deleted_images: try: await self._storage.delete(image.original_path) if image.thumbnail_path: await self._storage.delete(image.thumbnail_path) - await self._repository.delete(image.id) - deleted += 1 - except Exception: - logger.exception("Failed to delete expired image %s", image.id) + except OSError: + logger.exception("Failed to clean up storage for expired image %s", image.id) errors += 1 - logger.info("Retention sweep: deleted=%d errors=%d", deleted, errors) - return RetentionResult(deleted_count=deleted, errors=errors) + logger.info("Retention sweep: deleted=%d errors=%d", len(deleted_images), errors) + return RetentionResult(deleted_count=len(deleted_images), errors=errors) diff --git a/src/application/use_cases/process_image.py b/src/application/use_cases/process_image.py index 4e3a7c7..b39fcdf 100644 --- a/src/application/use_cases/process_image.py +++ b/src/application/use_cases/process_image.py @@ -36,6 +36,7 @@ async def execute(self, image_id: uuid.UUID) -> bool: image.mark_processing() await self._repository.save(image) + thumb_path: str | None = None try: raw_data = await self._storage.retrieve(image.original_path) result = await self._processor.generate_thumbnail(raw_data) @@ -50,11 +51,16 @@ async def execute(self, image_id: uuid.UUID) -> bool: channels=result.channels, ) image.mark_completed(thumb_path, metadata) + await self._repository.save(image) except Exception: logger.exception("Failed to process image %s", image_id) + if thumb_path is not None: + try: + await self._storage.delete(thumb_path) + except OSError: + logger.warning("Failed to clean up thumbnail %s", thumb_path) image.mark_failed() await self._repository.save(image) raise - await self._repository.save(image) return True diff --git a/src/domain/interfaces/image_repository.py b/src/domain/interfaces/image_repository.py index 041dce3..0670224 100644 --- a/src/domain/interfaces/image_repository.py +++ b/src/domain/interfaces/image_repository.py @@ -28,5 +28,15 @@ async def delete(self, image_id: uuid.UUID) -> bool: ... @abstractmethod async def get_expired(self, batch_size: int = 100) -> list[Image]: ... + @abstractmethod + async def delete_expired_batch(self, batch_size: int = 100) -> list[Image]: + """Atomically select and delete expired images. + + Uses row-level locking so concurrent sweeps never process the + same rows. Returns the deleted entities (caller should clean + up storage files afterward). + """ + ... + @abstractmethod async def count(self, *, status: str | None = None) -> int: ... diff --git a/src/infrastructure/cache/cached_image_repository.py b/src/infrastructure/cache/cached_image_repository.py index 2c9e4f2..70bfd3f 100644 --- a/src/infrastructure/cache/cached_image_repository.py +++ b/src/infrastructure/cache/cached_image_repository.py @@ -47,5 +47,11 @@ async def delete(self, image_id: uuid.UUID) -> bool: async def get_expired(self, batch_size: int = 100) -> list[Image]: return await self._inner.get_expired(batch_size=batch_size) + async def delete_expired_batch(self, batch_size: int = 100) -> list[Image]: + deleted = await self._inner.delete_expired_batch(batch_size=batch_size) + for image in deleted: + self._cache.invalidate(image.id) + return deleted + async def count(self, *, status: str | None = None) -> int: return await self._inner.count(status=status) diff --git a/src/infrastructure/database/postgres_image_repository.py b/src/infrastructure/database/postgres_image_repository.py index bca65f1..fdcbc35 100644 --- a/src/infrastructure/database/postgres_image_repository.py +++ b/src/infrastructure/database/postgres_image_repository.py @@ -61,6 +61,25 @@ async def get_expired(self, batch_size: int = 100) -> list[Image]: result = await session.stream_scalars(stmt) return [_model_to_entity(row) async for row in result] + async def delete_expired_batch(self, batch_size: int = 100) -> list[Image]: + now = datetime.now(UTC) + async with self._session_factory() as session, session.begin(): + stmt = ( + select(ImageModel) + .where(ImageModel.expires_at.isnot(None)) + .where(ImageModel.expires_at <= now) + .limit(batch_size) + .with_for_update(skip_locked=True) + ) + result = await session.stream_scalars(stmt) + models = [row async for row in result] + entities = [_model_to_entity(m) for m in models] + if models: + ids = [m.id for m in models] + del_stmt = delete(ImageModel).where(ImageModel.id.in_(ids)) + await session.execute(del_stmt) + return entities + async def count(self, *, status: str | None = None) -> int: async with self._session_factory() as session: stmt = select(func.count()).select_from(ImageModel) diff --git a/src/infrastructure/processing/pillow_processor.py b/src/infrastructure/processing/pillow_processor.py index 741de7c..6a8b58f 100644 --- a/src/infrastructure/processing/pillow_processor.py +++ b/src/infrastructure/processing/pillow_processor.py @@ -26,6 +26,14 @@ def get_executor(max_workers: int = 4) -> ProcessPoolExecutor: return _executor +def shutdown_executor() -> None: + """Shut down the module-level executor, releasing worker processes.""" + global _executor + if _executor is not None: + _executor.shutdown(wait=True) + _executor = None + + # ── Free functions executed in worker processes ────────────────────────────── diff --git a/src/infrastructure/processing/pipeline.py b/src/infrastructure/processing/pipeline.py index 29d33ab..f1a0e7b 100644 --- a/src/infrastructure/processing/pipeline.py +++ b/src/infrastructure/processing/pipeline.py @@ -26,15 +26,17 @@ async def process_batch( async def _process_one(image_id: uuid.UUID) -> bool: async with semaphore: - try: - return await use_case.execute(image_id) - except Exception: - logger.exception("Pipeline error for image %s", image_id) - return False - - results = await asyncio.gather(*[_process_one(iid) for iid in image_ids]) - for ok in results: - if ok: + return await use_case.execute(image_id) + + results = await asyncio.gather( + *[_process_one(iid) for iid in image_ids], + return_exceptions=True, + ) + for result in results: + if isinstance(result, BaseException): + logger.error("Pipeline error: %s", result, exc_info=result) + failed += 1 + elif result: success += 1 else: failed += 1 diff --git a/src/main.py b/src/main.py index 0b33600..d3d799a 100644 --- a/src/main.py +++ b/src/main.py @@ -27,6 +27,9 @@ async def lifespan(app: FastAPI): await conn.run_sync(Base.metadata.create_all) logger.info("Database tables ready") yield + from src.infrastructure.processing.pillow_processor import shutdown_executor + + shutdown_executor() await engine.dispose() diff --git a/tests/application/test_apply_retention.py b/tests/application/test_apply_retention.py index 75ef228..00487f2 100644 --- a/tests/application/test_apply_retention.py +++ b/tests/application/test_apply_retention.py @@ -21,7 +21,7 @@ async def test_retention_deletes_expired(mock_repository, mock_storage): status=ProcessingStatus.COMPLETED, expires_at=datetime.now(UTC) - timedelta(hours=1), ) - mock_repository.get_expired.return_value = [expired_img] + mock_repository.delete_expired_batch.return_value = [expired_img] uc = ApplyRetentionUseCase(mock_repository, mock_storage) result = await uc.execute(batch_size=10) @@ -30,12 +30,12 @@ async def test_retention_deletes_expired(mock_repository, mock_storage): assert result.errors == 0 mock_storage.delete.assert_any_await("/data/old.png") mock_storage.delete.assert_any_await("/data/thumb_old.png") - mock_repository.delete.assert_awaited_once_with(expired_img.id) + mock_repository.delete_expired_batch.assert_awaited_once_with(batch_size=10) @pytest.mark.asyncio async def test_retention_no_expired(mock_repository, mock_storage): - mock_repository.get_expired.return_value = [] + mock_repository.delete_expired_batch.return_value = [] uc = ApplyRetentionUseCase(mock_repository, mock_storage) result = await uc.execute() diff --git a/tests/application/test_process_image.py b/tests/application/test_process_image.py index 2938a34..1287469 100644 --- a/tests/application/test_process_image.py +++ b/tests/application/test_process_image.py @@ -2,6 +2,8 @@ from __future__ import annotations +import uuid + import pytest from src.application.use_cases.process_image import ProcessImageUseCase @@ -29,7 +31,6 @@ async def test_process_image_success( @pytest.mark.asyncio async def test_process_image_not_found(mock_repository, mock_storage, mock_processor): - import uuid mock_repository.get_by_id.return_value = None uc = ProcessImageUseCase(mock_repository, mock_storage, mock_processor) @@ -53,3 +54,35 @@ async def test_process_image_marks_failed_on_error( await uc.execute(entity.id) assert entity.status == ProcessingStatus.FAILED + + +@pytest.mark.asyncio +async def test_process_image_cleans_up_thumbnail_on_save_failure( + sample_image_entity, mock_repository, mock_storage, mock_processor +): + """If repository.save fails after thumbnail is stored, the thumbnail must be deleted.""" + entity = sample_image_entity + mock_repository.get_by_id.return_value = entity + + call_count = 0 + + async def _save_side_effect(img): + nonlocal call_count + call_count += 1 + if call_count == 1: + return img # mark_processing save succeeds + if call_count == 2: + raise RuntimeError("DB write failed") # mark_completed save fails + return img # mark_failed save succeeds + + mock_repository.save.side_effect = _save_side_effect + mock_storage.retrieve.return_value = b"raw-pixels" + mock_storage.store.return_value = "/data/images/thumb_test.png" + + uc = ProcessImageUseCase(mock_repository, mock_storage, mock_processor) + + with pytest.raises(RuntimeError, match="DB write failed"): + await uc.execute(entity.id) + + assert entity.status == ProcessingStatus.FAILED + mock_storage.delete.assert_awaited_once_with("/data/images/thumb_test.png") diff --git a/tests/conftest.py b/tests/conftest.py index 697ab1e..738e34a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -61,6 +61,7 @@ def mock_repository() -> ImageRepository: repo.list_images = AsyncMock(return_value=[]) repo.delete = AsyncMock(return_value=True) repo.get_expired = AsyncMock(return_value=[]) + repo.delete_expired_batch = AsyncMock(return_value=[]) repo.count = AsyncMock(return_value=0) return repo diff --git a/tests/infrastructure/test_cached_repository.py b/tests/infrastructure/test_cached_repository.py index b1846cd..cf7fa32 100644 --- a/tests/infrastructure/test_cached_repository.py +++ b/tests/infrastructure/test_cached_repository.py @@ -98,6 +98,7 @@ def inner(self) -> ImageRepository: repo.list_images = AsyncMock(return_value=[]) repo.delete = AsyncMock(return_value=True) repo.get_expired = AsyncMock(return_value=[]) + repo.delete_expired_batch = AsyncMock(return_value=[]) repo.count = AsyncMock(return_value=0) return repo @@ -165,6 +166,17 @@ async def test_get_expired_delegates(self, cached_repo, inner): await cached_repo.get_expired(batch_size=50) inner.get_expired.assert_awaited_once_with(batch_size=50) + async def test_delete_expired_batch_delegates_and_invalidates(self, cached_repo, inner, cache): + image = self._make_image() + cache.set(image) + inner.delete_expired_batch.return_value = [image] + + result = await cached_repo.delete_expired_batch(batch_size=50) + + assert result == [image] + inner.delete_expired_batch.assert_awaited_once_with(batch_size=50) + assert cache.get(image.id) is None + async def test_count_delegates(self, cached_repo, inner): await cached_repo.count(status="completed") inner.count.assert_awaited_once_with(status="completed")