Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,22 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [1.2.2] - 2026-03-30

### Security

- Added image validation on upload: uploaded bytes are verified with Pillow's
`verify()` and `load()` to reject corrupt, truncated, or malicious files
before they reach storage or processing. Decompression bombs are blocked via
a 100-megapixel limit, and only JPEG/PNG/WEBP/TIFF formats are accepted.
- Explicitly set `ImageFile.LOAD_TRUNCATED_IMAGES = False` in the Pillow
processor to prevent partial parsing of corrupt images that could trigger
Pillow CVEs.
- Added filename sanitization on upload to prevent path-traversal attacks
(`../../../etc/passwd`), null-byte injection, and hidden-file creation.
`LocalImageStorage.store()` also validates the resolved path stays inside
the base directory as defence-in-depth.

## [1.2.1] - 2026-03-29

### Fixed
Expand Down Expand Up @@ -146,7 +162,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Fix `type: ignore` comment on `rowcount` to use correct mypy error code `attr-defined`.
- Add proper type annotation for `settings` parameter in retention sweep endpoint.

[unreleased]: https://github.com/vlantonov/ImageProcessingServiceDemo/compare/v1.2.1...HEAD
[unreleased]: https://github.com/vlantonov/ImageProcessingServiceDemo/compare/v1.2.2...HEAD
[1.2.2]: https://github.com/vlantonov/ImageProcessingServiceDemo/compare/v1.2.1...v1.2.2
[1.2.1]: https://github.com/vlantonov/ImageProcessingServiceDemo/compare/v1.2.0...v1.2.1
[1.2.0]: https://github.com/vlantonov/ImageProcessingServiceDemo/compare/v1.1.0...v1.2.0
[1.1.0]: https://github.com/vlantonov/ImageProcessingServiceDemo/compare/v1.0.1...v1.1.0
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "image-processing-service"
version = "1.2.1"
version = "1.2.2"
description = "High-performance image processing microservice with Clean Architecture"
requires-python = ">=3.11"
dependencies = [
Expand Down
72 changes: 72 additions & 0 deletions src/infrastructure/processing/image_validator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
"""Image validation to prevent malicious file exploits against Pillow.

Validates that uploaded bytes are legitimate images before storage or
processing, mitigating decompression bombs, truncated-file attacks,
and other CVEs targeting image parsers.
"""

from __future__ import annotations

import io
import logging

from PIL import Image as PILImage
from PIL import ImageFile

# Reject truncated images — never set to True, as it allows partial
# parsing of corrupt/malicious files that can trigger Pillow CVEs.
ImageFile.LOAD_TRUNCATED_IMAGES = False

logger = logging.getLogger(__name__)

ALLOWED_FORMATS = {"JPEG", "PNG", "WEBP", "TIFF"}

# 100 megapixels — generous for legitimate photos, blocks decompression bombs.
MAX_IMAGE_PIXELS = 100_000_000


class InvalidImageError(Exception):
"""Raised when uploaded bytes fail image validation."""


def validate_image_bytes(data: bytes, allowed_formats: set[str] | None = None) -> None:
"""Validate that *data* is a genuine, safe image file.

Raises ``InvalidImageError`` if the data is not a valid image, uses a
disallowed format, or exceeds pixel-count limits (decompression bomb).
"""
if allowed_formats is None:
allowed_formats = ALLOWED_FORMATS

if not data:
raise InvalidImageError("Empty file")

old_max = PILImage.MAX_IMAGE_PIXELS
try:
PILImage.MAX_IMAGE_PIXELS = MAX_IMAGE_PIXELS

img = PILImage.open(io.BytesIO(data))

# Detect format before full decode — fast rejection of unsupported types.
fmt = (img.format or "").upper()
if fmt not in allowed_formats:
raise InvalidImageError(f"Unsupported image format: {fmt or 'unknown'}")

# Full decode + integrity check. This calls Pillow's verify() which
# reads the full file and validates checksums / structure without
# loading pixel data into memory.
img.verify()

# After verify() the image object is unusable, so re-open and load
# to ensure pixel data is decodable (catches truncated payloads).
img = PILImage.open(io.BytesIO(data))
img.load()

except InvalidImageError:
raise
except PILImage.DecompressionBombError as exc:
raise InvalidImageError(f"Image exceeds pixel limit: {exc}") from exc
except Exception as exc:
raise InvalidImageError(f"Invalid or corrupt image file: {exc}") from exc
finally:
PILImage.MAX_IMAGE_PIXELS = old_max
4 changes: 4 additions & 0 deletions src/infrastructure/processing/pillow_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,13 @@
from concurrent.futures import ProcessPoolExecutor

from PIL import Image as PILImage
from PIL import ImageFile

from src.domain.interfaces.image_processor import ImageProcessor, ProcessingResult

# Security: never load truncated/corrupt images — prevents CVE exploitation.
ImageFile.LOAD_TRUNCATED_IMAGES = False

# Module-level executor shared across requests.
_executor: ProcessPoolExecutor | None = None

Expand Down
9 changes: 7 additions & 2 deletions src/infrastructure/storage/local_image_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,13 @@ def __init__(self, base_dir: str) -> None:

async def store(self, filename: str, data: bytes) -> str:
content_hash = hashlib.sha256(data).hexdigest()[:12]
safe_name = f"{content_hash}_{filename}"
dest = self._base / safe_name
# Use only the basename to strip any residual path components.
basename = Path(filename).name or "unnamed"
safe_name = f"{content_hash}_{basename}"
dest = (self._base / safe_name).resolve()
# Defence-in-depth: ensure the resolved path stays inside the base dir.
if not str(dest).startswith(str(self._base.resolve())):
raise ValueError("Path traversal detected in filename")
await asyncio.to_thread(dest.write_bytes, data)
return str(dest)

Expand Down
12 changes: 11 additions & 1 deletion src/presentation/api/routes/images.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@
from src.application.use_cases.list_images import ListImagesUseCase
from src.application.use_cases.process_image import ProcessImageUseCase
from src.application.use_cases.upload_image import UploadImageUseCase
from src.infrastructure.processing.image_validator import InvalidImageError, validate_image_bytes
from src.infrastructure.processing.pipeline import process_batch
from src.presentation.api.dependencies import (
get_get_image_use_case,
get_list_use_case,
get_process_use_case,
get_upload_use_case,
)
from src.presentation.sanitize import sanitize_filename
from src.presentation.schemas.image_schemas import (
BatchProcessRequest,
BatchProcessResponse,
Expand Down Expand Up @@ -57,8 +59,16 @@ async def upload_image(
status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
detail="File exceeds 50 MB limit",
)
try:
validate_image_bytes(data)
except InvalidImageError as exc:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"Image validation failed: {exc}",
) from exc
safe_filename = sanitize_filename(file.filename or "unnamed")
result = await use_case.execute(
filename=file.filename or "unnamed",
filename=safe_filename,
data=data,
tags=tags,
ttl_hours=ttl_hours,
Expand Down
59 changes: 59 additions & 0 deletions src/presentation/sanitize.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
"""Filename sanitization for uploaded files.

Strips path traversal components, null bytes, and unsafe characters to
prevent directory-traversal attacks and filesystem issues.
"""

from __future__ import annotations

import re
import unicodedata
from pathlib import PurePosixPath

# Characters allowed in sanitized filenames: alphanumeric, hyphen, underscore, dot.
_SAFE_CHARS = re.compile(r"[^\w.\-]", re.ASCII)

MAX_FILENAME_LENGTH = 255


def sanitize_filename(filename: str) -> str:
"""Return a safe, flat filename from an untrusted user-supplied string.

- Strips directory components (path traversal like ``../../etc/passwd``)
- Removes null bytes and control characters
- Normalises Unicode to ASCII-safe form
- Replaces unsafe characters with underscores
- Collapses consecutive dots/underscores
- Falls back to ``"unnamed"`` if the result is empty
- Truncates to 255 characters
"""
# Remove null bytes first — they can bypass downstream checks.
filename = filename.replace("\x00", "")

# Normalise Unicode to NFKD and strip non-ASCII to avoid homoglyph attacks.
filename = unicodedata.normalize("NFKD", filename).encode("ascii", "ignore").decode("ascii")

# Extract only the final path component — defeats ../ traversal.
filename = PurePosixPath(filename).name

# Also handle Windows-style backslash separators.
if "\\" in filename:
filename = filename.rsplit("\\", 1)[-1]

# Strip leading dots to prevent hidden files (e.g. .htaccess, .env).
filename = filename.lstrip(".")

# Replace unsafe characters with underscores.
filename = _SAFE_CHARS.sub("_", filename)

# Collapse runs of underscores and dots.
filename = re.sub(r"[_.]{2,}", "_", filename)

# Strip leading/trailing underscores and dots.
filename = filename.strip("_.")

# Truncate to filesystem limit.
if len(filename) > MAX_FILENAME_LENGTH:
filename = filename[:MAX_FILENAME_LENGTH]

return filename or "unnamed"
101 changes: 101 additions & 0 deletions tests/infrastructure/test_image_validator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
"""Tests for image validation security checks."""

from __future__ import annotations

import io
import struct

import pytest
from PIL import Image as PILImage

from src.infrastructure.processing.image_validator import (
InvalidImageError,
validate_image_bytes,
)


@pytest.fixture
def valid_png() -> bytes:
img = PILImage.new("RGB", (100, 80), color=(255, 0, 0))
buf = io.BytesIO()
img.save(buf, format="PNG")
return buf.getvalue()


@pytest.fixture
def valid_jpeg() -> bytes:
img = PILImage.new("RGB", (100, 80), color=(0, 255, 0))
buf = io.BytesIO()
img.save(buf, format="JPEG")
return buf.getvalue()


def test_valid_png_passes(valid_png: bytes) -> None:
validate_image_bytes(valid_png)


def test_valid_jpeg_passes(valid_jpeg: bytes) -> None:
validate_image_bytes(valid_jpeg)


def test_empty_data_rejected() -> None:
with pytest.raises(InvalidImageError, match="Empty file"):
validate_image_bytes(b"")


def test_random_bytes_rejected() -> None:
with pytest.raises(InvalidImageError, match="Invalid or corrupt"):
validate_image_bytes(b"this is not an image at all")


def test_truncated_png_rejected(valid_png: bytes) -> None:
truncated = valid_png[: len(valid_png) // 2]
with pytest.raises(InvalidImageError):
validate_image_bytes(truncated)


def test_disallowed_format_rejected() -> None:
img = PILImage.new("RGB", (10, 10))
buf = io.BytesIO()
img.save(buf, format="BMP")
with pytest.raises(InvalidImageError, match="Unsupported image format"):
validate_image_bytes(buf.getvalue())


def test_custom_allowed_formats(valid_png: bytes) -> None:
with pytest.raises(InvalidImageError, match="Unsupported image format"):
validate_image_bytes(valid_png, allowed_formats={"JPEG"})


def test_webp_passes() -> None:
img = PILImage.new("RGB", (50, 50))
buf = io.BytesIO()
img.save(buf, format="WEBP")
validate_image_bytes(buf.getvalue())


def test_crafted_header_with_bad_body_rejected() -> None:
# PNG magic bytes followed by garbage
png_header = b"\x89PNG\r\n\x1a\n"
bad_data = png_header + b"\x00" * 100
with pytest.raises(InvalidImageError):
validate_image_bytes(bad_data)


def test_decompression_bomb_rejected() -> None:
"""Create a PNG that claims huge dimensions via header manipulation."""
# Create a minimal valid PNG, then patch the IHDR to claim enormous dimensions.
img = PILImage.new("L", (1, 1))
buf = io.BytesIO()
img.save(buf, format="PNG")
raw = bytearray(buf.getvalue())

# IHDR chunk starts at offset 8 (after 8-byte signature), chunk data at +16.
# Width (4 bytes) at offset 16, Height (4 bytes) at offset 20.
width_offset = 16
height_offset = 20
struct.pack_into(">I", raw, width_offset, 100_000)
struct.pack_into(">I", raw, height_offset, 100_000)

with pytest.raises(InvalidImageError):
validate_image_bytes(bytes(raw))
Loading
Loading