Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,24 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [1.2.1] - 2026-03-29

### Fixed

- `ProcessPoolExecutor` in `PillowImageProcessor` is now shut down during app
lifespan teardown, preventing leaked worker processes on exit.
- Race condition in retention sweeps: concurrent calls to `get_expired()` could
fetch the same rows, causing double-delete attempts on storage files. Added
`delete_expired_batch()` which uses `SELECT … FOR UPDATE SKIP LOCKED` to
atomically claim and delete expired rows in a single transaction.
- `ProcessImageUseCase` now cleans up the stored thumbnail if the final
metadata save fails, preventing orphaned files and an image stuck in
`PROCESSING` state.
- Narrowed overly broad `except Exception` handlers: `apply_retention.py` and
`process_image.py` now catch `OSError` for storage operations;
`pipeline.py` uses `asyncio.gather(return_exceptions=True)` instead of
swallowing exceptions inside coroutines.

## [1.2.0] - 2026-03-29

### Added
Expand Down Expand Up @@ -128,6 +146,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Fix `type: ignore` comment on `rowcount` to use correct mypy error code `attr-defined`.
- Add proper type annotation for `settings` parameter in retention sweep endpoint.

[unreleased]: https://github.com/vlantonov/ImageProcessingServiceDemo/compare/v1.0.1...HEAD
[unreleased]: https://github.com/vlantonov/ImageProcessingServiceDemo/compare/v1.2.1...HEAD
[1.2.1]: https://github.com/vlantonov/ImageProcessingServiceDemo/compare/v1.2.0...v1.2.1
[1.2.0]: https://github.com/vlantonov/ImageProcessingServiceDemo/compare/v1.1.0...v1.2.0
[1.1.0]: https://github.com/vlantonov/ImageProcessingServiceDemo/compare/v1.0.1...v1.1.0
[1.0.1]: https://github.com/vlantonov/ImageProcessingServiceDemo/compare/v1.0.0...v1.0.1
[1.0.0]: https://github.com/vlantonov/ImageProcessingServiceDemo/releases/tag/v1.0.0
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "image-processing-service"
version = "1.2.0"
version = "1.2.1"
description = "High-performance image processing microservice with Clean Architecture"
requires-python = ">=3.11"
dependencies = [
Expand Down
17 changes: 8 additions & 9 deletions src/application/use_cases/apply_retention.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,19 @@ def __init__(self, repository: ImageRepository, storage: ImageStorage) -> None:
self._storage = storage

async def execute(self, batch_size: int = 100) -> RetentionResult:
    """Run one retention sweep: purge expired rows, then remove their files.

    The repository atomically claims and deletes up to ``batch_size``
    expired rows first; this method then removes the files that belonged
    to those rows from storage.

    Args:
        batch_size: Maximum number of expired images to purge in this sweep.

    Returns:
        A ``RetentionResult`` whose ``deleted_count`` is the number of rows
        removed from the repository and whose ``errors`` is the number of
        images for which at least one storage file could not be removed.
    """
    deleted_images = await self._repository.delete_expired_batch(
        batch_size=batch_size,
    )
    errors = 0

    for image in deleted_images:
        paths = [image.original_path]
        if image.thumbnail_path:
            paths.append(image.thumbnail_path)

        # The rows are already gone from the repository, so nothing will
        # retry this image later. Attempt every path independently: a
        # failure on the original must not leave the thumbnail orphaned.
        cleanup_failed = False
        for path in paths:
            try:
                await self._storage.delete(path)
            except OSError:
                logger.exception("Failed to clean up storage for expired image %s", image.id)
                cleanup_failed = True

        if cleanup_failed:
            # Count one error per image, regardless of how many files failed.
            errors += 1

    logger.info("Retention sweep: deleted=%d errors=%d", len(deleted_images), errors)
    return RetentionResult(deleted_count=len(deleted_images), errors=errors)
8 changes: 7 additions & 1 deletion src/application/use_cases/process_image.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ async def execute(self, image_id: uuid.UUID) -> bool:
image.mark_processing()
await self._repository.save(image)

thumb_path: str | None = None
try:
raw_data = await self._storage.retrieve(image.original_path)
result = await self._processor.generate_thumbnail(raw_data)
Expand All @@ -50,11 +51,16 @@ async def execute(self, image_id: uuid.UUID) -> bool:
channels=result.channels,
)
image.mark_completed(thumb_path, metadata)
await self._repository.save(image)
except Exception:
logger.exception("Failed to process image %s", image_id)
if thumb_path is not None:
try:
await self._storage.delete(thumb_path)
except OSError:
logger.warning("Failed to clean up thumbnail %s", thumb_path)
image.mark_failed()
await self._repository.save(image)
raise

await self._repository.save(image)
return True
10 changes: 10 additions & 0 deletions src/domain/interfaces/image_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,15 @@ async def delete(self, image_id: uuid.UUID) -> bool: ...
@abstractmethod
async def get_expired(self, batch_size: int = 100) -> list[Image]: ...

@abstractmethod
async def delete_expired_batch(self, batch_size: int = 100) -> list[Image]:
    """Atomically select and delete expired images.

    Uses row-level locking so concurrent sweeps never process the
    same rows. Returns the deleted entities (caller should clean
    up storage files afterward).

    Args:
        batch_size: Upper bound on the number of expired images claimed
            and deleted by a single call.

    Returns:
        The entities that were deleted by this call; an empty list when
        nothing was claimed.
    """
    ...

@abstractmethod
async def count(self, *, status: str | None = None) -> int: ...
6 changes: 6 additions & 0 deletions src/infrastructure/cache/cached_image_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,11 @@ async def delete(self, image_id: uuid.UUID) -> bool:
async def get_expired(self, batch_size: int = 100) -> list[Image]:
    """Return expired images by delegating straight to the wrapped repository.

    The cache is neither consulted nor modified by this call.
    """
    return await self._inner.get_expired(batch_size=batch_size)

async def delete_expired_batch(self, batch_size: int = 100) -> list[Image]:
    """Delete an expired batch via the wrapped repository and evict it from cache.

    Args:
        batch_size: Forwarded unchanged to the wrapped repository.

    Returns:
        The entities the wrapped repository reports as deleted.
    """
    removed = await self._inner.delete_expired_batch(batch_size=batch_size)
    # Evict every deleted entity so a later lookup cannot be served a
    # cached copy of a row that no longer exists.
    evict = self._cache.invalidate
    for entity in removed:
        evict(entity.id)
    return removed

async def count(self, *, status: str | None = None) -> int:
    """Return the image count by delegating to the wrapped repository.

    ``status`` is forwarded unchanged; the cache is not involved.
    """
    return await self._inner.count(status=status)
19 changes: 19 additions & 0 deletions src/infrastructure/database/postgres_image_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,25 @@ async def get_expired(self, batch_size: int = 100) -> list[Image]:
result = await session.stream_scalars(stmt)
return [_model_to_entity(row) async for row in result]

async def delete_expired_batch(self, batch_size: int = 100) -> list[Image]:
    """Claim and delete up to ``batch_size`` expired rows in one transaction.

    Rows whose ``expires_at`` is set and not later than the current UTC
    time are selected with ``FOR UPDATE SKIP LOCKED``, converted to domain
    entities, and then deleted by id inside the same transaction.

    Returns:
        The entities built from the rows this call deleted (empty when no
        row was claimed).
    """
    cutoff = datetime.now(UTC)
    claim_stmt = (
        select(ImageModel)
        .where(ImageModel.expires_at.isnot(None))
        .where(ImageModel.expires_at <= cutoff)
        .limit(batch_size)
        .with_for_update(skip_locked=True)
    )

    async with self._session_factory() as session, session.begin():
        stream = await session.stream_scalars(claim_stmt)
        claimed = [model async for model in stream]
        if not claimed:
            return []

        # Build the entities before issuing the DELETE, while the ORM
        # objects still reflect the freshly loaded rows.
        entities = [_model_to_entity(model) for model in claimed]
        claimed_ids = [model.id for model in claimed]
        purge_stmt = delete(ImageModel).where(ImageModel.id.in_(claimed_ids))
        await session.execute(purge_stmt)

    return entities

async def count(self, *, status: str | None = None) -> int:
async with self._session_factory() as session:
stmt = select(func.count()).select_from(ImageModel)
Expand Down
8 changes: 8 additions & 0 deletions src/infrastructure/processing/pillow_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ def get_executor(max_workers: int = 4) -> ProcessPoolExecutor:
return _executor


def shutdown_executor() -> None:
    """Shut down the module-level executor, releasing worker processes.

    Safe to call when no executor exists and safe to call repeatedly.
    The module-level reference is cleared *before* the pool is shut down,
    so it is reset even if ``shutdown()`` raises and the module never
    keeps pointing at a pool that is shutting down or already dead.

    Raises:
        Whatever ``executor.shutdown(wait=True)`` raises; the module-level
        reference is already cleared in that case.
    """
    global _executor
    executor, _executor = _executor, None
    if executor is not None:
        # wait=True blocks until running work finishes and workers exit.
        executor.shutdown(wait=True)


# ── Free functions executed in worker processes ──────────────────────────────


Expand Down
20 changes: 11 additions & 9 deletions src/infrastructure/processing/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,17 @@ async def process_batch(

async def _process_one(image_id: uuid.UUID) -> bool:
async with semaphore:
try:
return await use_case.execute(image_id)
except Exception:
logger.exception("Pipeline error for image %s", image_id)
return False

results = await asyncio.gather(*[_process_one(iid) for iid in image_ids])
for ok in results:
if ok:
return await use_case.execute(image_id)

results = await asyncio.gather(
*[_process_one(iid) for iid in image_ids],
return_exceptions=True,
)
for result in results:
if isinstance(result, BaseException):
logger.error("Pipeline error: %s", result, exc_info=result)
failed += 1
elif result:
success += 1
else:
failed += 1
Expand Down
3 changes: 3 additions & 0 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ async def lifespan(app: FastAPI):
await conn.run_sync(Base.metadata.create_all)
logger.info("Database tables ready")
yield
from src.infrastructure.processing.pillow_processor import shutdown_executor

shutdown_executor()
await engine.dispose()


Expand Down
6 changes: 3 additions & 3 deletions tests/application/test_apply_retention.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ async def test_retention_deletes_expired(mock_repository, mock_storage):
status=ProcessingStatus.COMPLETED,
expires_at=datetime.now(UTC) - timedelta(hours=1),
)
mock_repository.get_expired.return_value = [expired_img]
mock_repository.delete_expired_batch.return_value = [expired_img]

uc = ApplyRetentionUseCase(mock_repository, mock_storage)
result = await uc.execute(batch_size=10)
Expand All @@ -30,12 +30,12 @@ async def test_retention_deletes_expired(mock_repository, mock_storage):
assert result.errors == 0
mock_storage.delete.assert_any_await("/data/old.png")
mock_storage.delete.assert_any_await("/data/thumb_old.png")
mock_repository.delete.assert_awaited_once_with(expired_img.id)
mock_repository.delete_expired_batch.assert_awaited_once_with(batch_size=10)


@pytest.mark.asyncio
async def test_retention_no_expired(mock_repository, mock_storage):
mock_repository.get_expired.return_value = []
mock_repository.delete_expired_batch.return_value = []

uc = ApplyRetentionUseCase(mock_repository, mock_storage)
result = await uc.execute()
Expand Down
35 changes: 34 additions & 1 deletion tests/application/test_process_image.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from __future__ import annotations

import uuid

import pytest

from src.application.use_cases.process_image import ProcessImageUseCase
Expand Down Expand Up @@ -29,7 +31,6 @@ async def test_process_image_success(

@pytest.mark.asyncio
async def test_process_image_not_found(mock_repository, mock_storage, mock_processor):
import uuid

mock_repository.get_by_id.return_value = None
uc = ProcessImageUseCase(mock_repository, mock_storage, mock_processor)
Expand All @@ -53,3 +54,35 @@ async def test_process_image_marks_failed_on_error(
await uc.execute(entity.id)

assert entity.status == ProcessingStatus.FAILED


@pytest.mark.asyncio
async def test_process_image_cleans_up_thumbnail_on_save_failure(
    sample_image_entity, mock_repository, mock_storage, mock_processor
):
    """A thumbnail already written to storage is removed when the final save raises."""
    entity = sample_image_entity
    thumbnail_location = "/data/images/thumb_test.png"
    save_calls: list[object] = []

    async def _save_side_effect(img):
        # 1st save (mark_processing) succeeds, 2nd save (mark_completed)
        # fails, any later save (mark_failed) succeeds again.
        save_calls.append(img)
        if len(save_calls) == 2:
            raise RuntimeError("DB write failed")
        return img

    mock_repository.get_by_id.return_value = entity
    mock_repository.save.side_effect = _save_side_effect
    mock_storage.retrieve.return_value = b"raw-pixels"
    mock_storage.store.return_value = thumbnail_location

    uc = ProcessImageUseCase(mock_repository, mock_storage, mock_processor)

    with pytest.raises(RuntimeError, match="DB write failed"):
        await uc.execute(entity.id)

    assert entity.status == ProcessingStatus.FAILED
    mock_storage.delete.assert_awaited_once_with(thumbnail_location)
1 change: 1 addition & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ def mock_repository() -> ImageRepository:
repo.list_images = AsyncMock(return_value=[])
repo.delete = AsyncMock(return_value=True)
repo.get_expired = AsyncMock(return_value=[])
repo.delete_expired_batch = AsyncMock(return_value=[])
repo.count = AsyncMock(return_value=0)
return repo

Expand Down
12 changes: 12 additions & 0 deletions tests/infrastructure/test_cached_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ def inner(self) -> ImageRepository:
repo.list_images = AsyncMock(return_value=[])
repo.delete = AsyncMock(return_value=True)
repo.get_expired = AsyncMock(return_value=[])
repo.delete_expired_batch = AsyncMock(return_value=[])
repo.count = AsyncMock(return_value=0)
return repo

Expand Down Expand Up @@ -165,6 +166,17 @@ async def test_get_expired_delegates(self, cached_repo, inner):
await cached_repo.get_expired(batch_size=50)
inner.get_expired.assert_awaited_once_with(batch_size=50)

async def test_delete_expired_batch_delegates_and_invalidates(self, cached_repo, inner, cache):
    """Batch deletion is forwarded to the inner repository and evicts cached entries."""
    expired = self._make_image()
    cache.set(expired)
    inner.delete_expired_batch.return_value = [expired]

    returned = await cached_repo.delete_expired_batch(batch_size=50)

    # Delegation: the inner repository saw exactly one call with our batch size.
    inner.delete_expired_batch.assert_awaited_once_with(batch_size=50)
    assert returned == [expired]
    # Invalidation: the previously cached entity can no longer be read back.
    assert cache.get(expired.id) is None

async def test_count_delegates(self, cached_repo, inner):
await cached_repo.count(status="completed")
inner.count.assert_awaited_once_with(status="completed")
Expand Down
Loading