diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index d6fd0381..0358b23a 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -8,6 +8,6 @@ repos: hooks: - id: ruff - repo: https://github.com/pre-commit/mirrors-mypy - rev: v0.991 + rev: v1.4.1 hooks: - id: mypy diff --git a/tests/data/ome-metadata/hcs.ome.xml b/tests/data/ome-metadata/hcs.ome.xml new file mode 100644 index 00000000..8dca4836 --- /dev/null +++ b/tests/data/ome-metadata/hcs.ome.xml @@ -0,0 +1,37 @@ + + + + + + + + + + + + + + + + + 2008-02-06T13:43:19 + An example OME compliant file, based on Olympus.oib + + + + + + + + \ No newline at end of file diff --git a/tests/data/ome-metadata/one-screen-one-plate-four-wells.ome.xml b/tests/data/ome-metadata/one-screen-one-plate-four-wells.ome.xml new file mode 100644 index 00000000..caf573e0 --- /dev/null +++ b/tests/data/ome-metadata/one-screen-one-plate-four-wells.ome.xml @@ -0,0 +1,161 @@ + + + Plate 1 description. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + 2010-02-23T12:51:30 + + + /wCrzur//wB5oMPi/wBIbJO3AP8ePGCF + + + + 2010-02-23T12:51:30 + + + /wCrzur//wB5oMPi/wBIbJO3AP8ePGCF + + + + 2010-02-23T12:51:30 + + + /wCrzur//wB5oMPi/wBIbJO3AP8ePGCF + + + + 2010-02-23T12:51:30 + + + /wCrzur//wB5oMPi/wBIbJO3AP8ePGCF + + + + 2010-02-23T12:51:30 + + + /wCrzur//wB5oMPi/wBIbJO3AP8ePGCF + + + + 2010-02-23T12:51:30 + + + /wCrzur//wB5oMPi/wBIbJO3AP8ePGCF + + + + 2010-02-23T12:51:30 + + + /wCrzur//wB5oMPi/wBIbJO3AP8ePGCF + + + + 2010-02-23T12:51:30 + + + /wCrzur//wB5oMPi/wBIbJO3AP8ePGCF + + + + 2010-02-23T12:51:30 + + + /wCrzur//wB5oMPi/wBIbJO3AP8ePGCF + + + + 2010-02-23T12:51:30 + + + /wCrzur//wB5oMPi/wBIbJO3AP8ePGCF + + + + 2010-02-23T12:51:30 + + + /wCrzur//wB5oMPi/wBIbJO3AP8ePGCF + + + \ No newline at end of file diff --git a/tests/data/ome-metadata/two-screens-two-plates-four-wells.ome.xml b/tests/data/ome-metadata/two-screens-two-plates-four-wells.ome.xml new file mode 100644 index 00000000..513f58a8 --- /dev/null +++ b/tests/data/ome-metadata/two-screens-two-plates-four-wells.ome.xml @@ -0,0 +1,313 @@ + + + Plate 1 description. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Plate 2 description. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + twoScreen + + + + + + + 2010-02-23T12:51:30 + + + /wCrzur//wB5oMPi/wBIbJO3AP8ePGCF + + + + 2010-02-23T12:51:30 + + + /wCrzur//wB5oMPi/wBIbJO3AP8ePGCF + + + + 2010-02-23T12:51:30 + + + /wCrzur//wB5oMPi/wBIbJO3AP8ePGCF + + + + 2010-02-23T12:51:30 + + + /wCrzur//wB5oMPi/wBIbJO3AP8ePGCF + + + + 2010-02-23T12:51:30 + + + /wCrzur//wB5oMPi/wBIbJO3AP8ePGCF + + + + 2010-02-23T12:51:30 + + + /wCrzur//wB5oMPi/wBIbJO3AP8ePGCF + + + + 2010-02-23T12:51:30 + + + /wCrzur//wB5oMPi/wBIbJO3AP8ePGCF + + + + 2010-02-23T12:51:30 + + + /wCrzur//wB5oMPi/wBIbJO3AP8ePGCF + + + + 2010-02-23T12:51:30 + + + /wCrzur//wB5oMPi/wBIbJO3AP8ePGCF + + + + 2010-02-23T12:51:30 + + + /wCrzur//wB5oMPi/wBIbJO3AP8ePGCF + + + + 2010-02-23T12:51:30 + + + /wCrzur//wB5oMPi/wBIbJO3AP8ePGCF + + + + 2010-02-23T12:51:30 + + + /wCrzur//wB5oMPi/wBIbJO3AP8ePGCF + + + + 2010-02-23T12:51:30 + + + /wCrzur//wB5oMPi/wBIbJO3AP8ePGCF + + + + 2010-02-23T12:51:30 + + + /wCrzur//wB5oMPi/wBIbJO3AP8ePGCF + + + + 2010-02-23T12:51:30 + + + /wCrzur//wB5oMPi/wBIbJO3AP8ePGCF + + + + 2010-02-23T12:51:30 + + + /wCrzur//wB5oMPi/wBIbJO3AP8ePGCF + + + + 2010-02-23T12:51:30 + + + /wCrzur//wB5oMPi/wBIbJO3AP8ePGCF + + + + 2010-02-23T12:51:30 + + + /wCrzur//wB5oMPi/wBIbJO3AP8ePGCF + + + + 2010-02-23T12:51:30 + + + /wCrzur//wB5oMPi/wBIbJO3AP8ePGCF + + + + 2010-02-23T12:51:30 + + + /wCrzur//wB5oMPi/wBIbJO3AP8ePGCF + + + + 2010-02-23T12:51:30 + + + /wCrzur//wB5oMPi/wBIbJO3AP8ePGCF + + + + 2010-02-23T12:51:30 + + + /wCrzur//wB5oMPi/wBIbJO3AP8ePGCF + + + \ No newline at end of file diff --git a/tests/unit/test_ngff.py b/tests/unit/test_ngff.py new file mode 100644 index 00000000..767639f3 --- /dev/null +++ b/tests/unit/test_ngff.py @@ -0,0 +1,58 @@ +import pytest +from tifffile import tifffile + +from tests import get_path +from tiledb.bioimg.metadata import NGFFPlate, NGFFWell + + +@pytest.mark.parametrize( + "filename, expected", + [ + [ + "one-screen-one-plate-four-wells.ome.xml", + { + "plates": 1, + "acquisition": {"Plate:1": 2}, + "wells": {"Plate:1": {(1, 1): 2, (1, 2): 2, (2, 1): 5, (2, 2): 2}}, + }, + ], + [ + "two-screens-two-plates-four-wells.ome.xml", + { + "plates": 2, + "acquisition": {"Plate:1": 2, "Plate:2": 1}, + "wells": { + "Plate:1": {(1, 1): 2, (1, 2): 2, (2, 1): 5, (2, 2): 2}, + "Plate:2": {(1, 1): 2, (1, 2): 2, (2, 1): 5, (2, 2): 2}, + }, + }, + ], + [ + "hcs.ome.xml", + { + "plates": 1, + "acquisition": {"Plate:1": 0}, + "wells": {"Plate:1": {(0, 0): 1}}, + }, + ], + ], +) +def test_plate_ome_to_ngff(filename, expected): + input_path = get_path(f"ome-metadata/{filename}") + with open(input_path) as f: + omexml = f.read() + + plates = NGFFPlate.from_ome_tiff(tifffile.xml2dict(omexml)) + wells = NGFFWell.from_ome_tiff(tifffile.xml2dict(omexml)) + + assert len(plates) == expected.get("plates") + + for key, plate in plates.items(): + assert len(plate.wells) == len(expected.get("wells").get(key)) + assert ( + expected.get("acquisition").get(key) == 0 and plate.acquisitions is None + ) or len(plate.acquisitions) == expected.get("acquisition").get(key) + for well in plate.wells: + assert expected.get("wells").get(key).get( + (well.rowIndex, well.columnIndex) + ) == len(wells.get(key).get((well.rowIndex, well.columnIndex)).images) diff --git a/tiledb/bioimg/converters/base.py b/tiledb/bioimg/converters/base.py index 19b613a3..7fc8a1bd 100644 --- a/tiledb/bioimg/converters/base.py +++ b/tiledb/bioimg/converters/base.py @@ -22,6 +22,7 @@ import numpy as np from tqdm import tqdm +from ..metadata import NGFFMetadata from .scale import Scaler try: @@ -129,6 +130,11 @@ def image_metadata(self) -> Dict[str, Any]: def original_metadata(self) -> Dict[str, Any]: """Return the metadata of the original file.""" + @property + @abstractmethod + def ngff_metadata(self) -> Union[NGFFMetadata, None]: + """Return the NGFF compliant metadata of the original file.""" + class ImageWriter(ABC): @abstractmethod diff --git a/tiledb/bioimg/converters/ome_tiff.py b/tiledb/bioimg/converters/ome_tiff.py index fd44f5f7..f7cdf8af 100644 --- a/tiledb/bioimg/converters/ome_tiff.py +++ b/tiledb/bioimg/converters/ome_tiff.py @@ -10,6 +10,7 @@ from .. import ATTR_NAME, EXPORT_TILE_SIZE, WHITE_RGBA from ..helpers import get_decimal_from_rgba, get_rgba, iter_color +from ..metadata import NGFFMetadata from .axes import Axes from .base import ImageConverter, ImageReader, ImageWriter from .metadata import qpi_image_meta, qpi_original_meta @@ -280,6 +281,10 @@ def original_metadata(self) -> Dict[str, Any]: return metadata + @property + def ngff_metadata(self) -> Union[NGFFMetadata, None]: + return NGFFMetadata.from_ome_tiff(self._tiff) + class OMETiffWriter(ImageWriter): def __init__(self, output_path: str, ome: bool = True): diff --git a/tiledb/bioimg/converters/ome_zarr.py b/tiledb/bioimg/converters/ome_zarr.py index dd7b8449..cab6c0de 100644 --- a/tiledb/bioimg/converters/ome_zarr.py +++ b/tiledb/bioimg/converters/ome_zarr.py @@ -1,7 +1,7 @@ from __future__ import annotations import json -from typing import Any, Dict, List, Mapping, Optional, Sequence, Tuple, cast +from typing import Any, Dict, List, Mapping, Optional, Sequence, Tuple, Union, cast import numpy import numpy as np @@ -14,6 +14,7 @@ from .. import WHITE_RGB from ..helpers import get_rgba +from ..metadata import NGFFMetadata from .axes import Axes from .base import ImageConverter, ImageReader, ImageWriter @@ -132,6 +133,10 @@ def original_metadata(self) -> Dict[str, Any]: return metadata + @property + def ngff_metadata(self) -> Union[NGFFMetadata, None]: + return None + class OMEZarrWriter(ImageWriter): def __init__(self, output_path: str): diff --git a/tiledb/bioimg/converters/openslide.py b/tiledb/bioimg/converters/openslide.py index 83f3a355..d079e8fb 100644 --- a/tiledb/bioimg/converters/openslide.py +++ b/tiledb/bioimg/converters/openslide.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, Optional, Sequence, Tuple, cast +from typing import Any, Dict, Optional, Sequence, Tuple, Union, cast import numpy as np import openslide as osd @@ -6,6 +6,7 @@ from tiledb.cc import WebpInputFormat from ..helpers import iter_color +from ..metadata import NGFFMetadata from .axes import Axes from .base import ImageConverter, ImageReader @@ -100,6 +101,10 @@ def image_metadata(self) -> Dict[str, Any]: def original_metadata(self) -> Dict[str, Any]: return {"SVS": list(self._osd.properties.items())} + @property + def ngff_metadata(self) -> Union[NGFFMetadata, None]: + return None + class OpenSlideConverter(ImageConverter): """Converter of OpenSlide-supported images to TileDB Groups of Arrays""" diff --git a/tiledb/bioimg/metadata.py b/tiledb/bioimg/metadata.py new file mode 100644 index 00000000..203f6b07 --- /dev/null +++ b/tiledb/bioimg/metadata.py @@ -0,0 +1,741 @@ +import json +from dataclasses import dataclass +from datetime import datetime +from typing import ( + Any, + Mapping, + MutableMapping, + MutableSequence, + Optional, + Sequence, + Tuple, + Union, +) + +import tifffile +from tifffile import TiffFile +from typing_extensions import Literal, Self + +SpaceUnit = Literal[ + "angstrom", + "attometer", + "centimeter", + "decimeter", + "exameter", + "femtometer", + "foot", + "gigameter", + "hectometer", + "inch", + "kilometer", + "megameter", + "meter", + "micrometer", + "mile", + "millimeter", + "nanometer", + "parsec", + "petameter", + "picometer", + "terameter", + "yard", + "yoctometer", + "yottameter", + "zeptometer", + "zettameter", +] +TimeUnit = Literal[ + "attosecond", + "centisecond", + "day", + "decisecond", + "exasecond", + "femtosecond", + "gigasecond", + "hectosecond", + "hour", + "kilosecond", + "megasecond", + "microsecond", + "millisecond", + "minute", + "nanosecond", + "petasecond", + "picosecond", + "second", + "terasecond", + "yoctosecond", + "yottasecond", + "zeptosecond", + "zettasecond", +] + +spaceUnitSymbolMap: Mapping[str, SpaceUnit] = { + "Å": "angstrom", + "am": "attometer", + "cm": "centimeter", + "dm": "decimeter", + "Em": "exameter", + "fm": "femtometer", + "ft": "foot", + "Gm": "gigameter", + "hm": "hectometer", + "in": "inch", + "km": "kilometer", + "Mm": "megameter", + "m": "meter", + "µm": "micrometer", + "mi.": "mile", + "mm": "millimeter", + "nm": "nanometer", + "pc": "parsec", + "Pm": "petameter", + "pm": "picometer", + "Tm": "terameter", + "yd": "yard", + "ym": "yoctometer", + "Ym": "yottameter", + "zm": "zeptometer", + "Zm": "zettameter", +} + +timeUnitSymbolMap: Mapping[str, TimeUnit] = { + "as": "attosecond", + "cs": "centisecond", + "d": "day", + "ds": "decisecond", + "Es": "exasecond", + "fs": "femtosecond", + "Gs": "gigasecond", + "hs": "hectosecond", + "h": "hour", + "ks": "kilosecond", + "Ms": "megasecond", + "µs": "microsecond", + "ms": "millisecond", + "min": "minute", + "ns": "nanosecond", + "Ps": "petasecond", + "ps": "picosecond", + "s": "second", + "Ts": "terasecond", + "ys": "yoctosecond", + "Ys": "yottasecond", + "zs": "zeptosecond", + "Zs": "zettasecond", +} + + +class JSONEncoder(json.JSONEncoder): + def default(self, obj: Any) -> Any: + if isinstance(obj, NGFFLabelProperty): + return { + key: val + for key, val in { + **obj.__dict__, + **(obj.additionalMetadata if obj.additionalMetadata else {}), + }.items() + if val is not None + } + return {key: val for key, val in obj.__dict__ if val is not None} + + +class NGFFAxes: + def __init__( + self, + name: str, + type: Optional[Union[Literal["space", "time", "channel"], str]] = None, + unit: Optional[Union[SpaceUnit, TimeUnit]] = None, + ): + self.name = name + self.type = type + self.unit = unit + + name: str + type: Optional[Union[Literal["space", "time", "channel"], str]] + unit: Optional[Union[SpaceUnit, TimeUnit]] + + +class NGFFCoordinateTransformation: + def __init__( + self, + type: Literal["identity", "translation", "scale"], + translation: Optional[Sequence[float]] = None, + scale: Optional[Sequence[float]] = None, + ): + self.type = type + self.translation = translation + self.scale = scale + + type: Literal["identity", "translation", "scale"] + translation: Optional[Sequence[float]] + scale: Optional[Sequence[float]] + + +class NGFFDataset: + def __init__( + self, + path: str, + coordinateTransformations: Sequence[NGFFCoordinateTransformation], + ): + self.path = path + self.coordinateTransformations = coordinateTransformations + + path: str + coordinateTransformations: Sequence[NGFFCoordinateTransformation] + + +class NGFFMultiscale: + def __init__( + self, + version: str, + axes: Sequence[NGFFAxes], + datasets: Sequence[NGFFDataset], + name: Optional[str] = None, + type: Optional[str] = None, + metadata: Optional[Mapping[str, Any]] = None, + coordinateTransformations: Optional[ + Sequence[NGFFCoordinateTransformation] + ] = None, + ): + self.version = version + self.name = name + self.type = type + self.metadata = metadata + self.axes = axes + self.datasets = datasets + self.coordinateTransformations = coordinateTransformations + + version: str + name: Optional[str] + type: Optional[str] + metadata: Optional[Mapping[str, Any]] + axes: Sequence[NGFFAxes] + datasets: Sequence[NGFFDataset] + coordinateTransformations: Optional[Sequence[NGFFCoordinateTransformation]] + + +@dataclass +class NGFFLabelColor: + labelValue: int + rgba: Tuple[int, int, int, int] + + +class NGFFLabelProperty: + def __init__( + self, labelValue: int, additionalMetadata: Optional[Mapping[str, Any]] = None + ): + self.labelValue = labelValue + self.additionalMetadata = additionalMetadata + + labelValue: int + additionalMetadata: Optional[Mapping[str, Any]] + + +@dataclass +class NGFFLabelSource: + image: str + + +class NGFFImageLabel: + def __init__( + self, + version: str, + colors: Optional[Sequence[NGFFLabelColor]] = None, + properties: Optional[Sequence[NGFFLabelProperty]] = None, + source: Optional[NGFFLabelSource] = None, + ): + self.version = version + self.colors = colors + self.properties = properties + self.source = source + + version: str + colors: Optional[Sequence[NGFFLabelColor]] + properties: Optional[Sequence[NGFFLabelProperty]] + source: Optional[NGFFLabelSource] + + +class NGFFAcquisition: + def __init__( + self, + id: int, + name: Optional[str] = None, + maximumFieldCount: Optional[int] = None, + description: Optional[str] = None, + startTime: Optional[int] = None, + endTime: Optional[int] = None, + ): + self.id = id + self.name = name + self.maximumFieldCount = maximumFieldCount + self.description = description + self.startTime = startTime + self.endTime = endTime + + id: int + name: Optional[str] + maximumFieldCount: Optional[int] + description: Optional[str] + startTime: Optional[int] + endTime: Optional[int] + + +@dataclass +class NGFFColumn: + name: str + + +@dataclass +class NGFFRow: + name: str + + +@dataclass +class NGFFPlateWell: + path: str + rowIndex: int + columnIndex: int + + +class NGFFPlate: + def __init__( + self, + version: str, + columns: Sequence[NGFFColumn], + rows: Sequence[NGFFRow], + wells: Sequence[NGFFPlateWell], + fieldCount: Optional[int] = None, + name: Optional[str] = None, + acquisitions: Optional[Sequence[NGFFAcquisition]] = None, + ): + self.version = version + self.columns = columns + self.rows = rows + self.wells = wells + self.fieldCount = fieldCount + self.name = name + self.acquisitions = acquisitions + + version: str + columns: Sequence[NGFFColumn] + rows: Sequence[NGFFRow] + wells: Sequence[NGFFPlateWell] + fieldCount: Optional[int] + name: Optional[str] + acquisitions: Optional[Sequence[NGFFAcquisition]] + + @classmethod + def from_ome_tiff( + cls, ome_metadata: Mapping[str, Any] + ) -> Union[Mapping[str, Self], None]: + ome_plates = ome_metadata.get("OME", {}).get("Plate", []) + ome_plates = [ome_plates] if not isinstance(ome_plates, list) else ome_plates + plates: MutableMapping[str, Self] = {} + + if not len(ome_plates): + return None + + for ome_plate in ome_plates: + wells: MutableSequence[NGFFPlateWell] = [] + acquisitions: MutableSequence[NGFFAcquisition] = [] + + row_naming: Literal["number", "letter"] = ome_plate.get( + "RowNamingConvention", "number" + ) + column_naming: Literal["number", "letter"] = ome_plate.get( + "ColumnNamingConvention", "number" + ) + + ome_acquisitions = ome_plate.get("PlateAcquisition", []) + ome_acquisitions = ( + [ome_acquisitions] + if not isinstance(ome_acquisitions, list) + else ome_acquisitions + ) + for ome_acquisition in ome_acquisitions: + start_time = ( + int( + datetime.fromisoformat( + ome_acquisition.get("StartTime") + ).timestamp() + ) + if "StartTime" in ome_acquisition + else None + ) + end_time = ( + int( + datetime.fromisoformat( + ome_acquisition.get("EndTime") + ).timestamp() + ) + if "EndTime" in ome_acquisition + else None + ) + acquisitions.append( + NGFFAcquisition( + id=ome_acquisition.get("ID"), + name=ome_acquisition.get("Name"), + description=ome_acquisition.get("Description"), + maximumFieldCount=ome_acquisition.get("MaximumFieldCount"), + startTime=start_time, + endTime=end_time, + ) + ) + + number_of_rows = 1 + number_of_columns = 1 + ome_wells = ome_plate.get("Well", []) + ome_wells = [ome_wells] if not isinstance(ome_wells, list) else ome_wells + + for ome_well in ome_wells: + number_of_rows = max(ome_well.get("Row") + 1, number_of_rows) + number_of_columns = max(ome_well.get("Column") + 1, number_of_columns) + wells.append( + NGFFPlateWell( + path=f'{format_number(ome_well.get("Row"), row_naming)}/{format_number(ome_well.get("Column"), column_naming)}', + rowIndex=ome_well.get("Row"), + columnIndex=ome_well.get("Column"), + ) + ) + plates.setdefault( + ome_plate.get("ID"), + cls( + version="0.5-dev", + columns=[ + NGFFColumn(format_number(idx, column_naming)) + for idx in range(number_of_columns) + ], + rows=[ + NGFFRow(format_number(idx, row_naming)) + for idx in range(number_of_rows) + ], + wells=wells, + acquisitions=acquisitions if len(acquisitions) else None, + name=ome_plate.get("Name"), + ), + ) + + return plates + + +class NGFFWellImage: + def __init__(self, path: str, acquisition: Optional[int] = None): + self.path = path + self.acquisition = acquisition + + path: str + acquisition: Optional[int] + + +class NGFFWell: + def __init__(self, images: Sequence[NGFFWellImage], version: Optional[str] = None): + self.version = version + self.images = images + + version: Optional[str] + images: Sequence[NGFFWellImage] + + @classmethod + def from_ome_tiff( + cls, ome_metadata: Mapping[str, Any] + ) -> Optional[Mapping[str, Mapping[Tuple[int, int], Self]]]: + ome_images = ome_metadata.get("OME", {}).get("Image", []) + ome_images = [ome_images] if not isinstance(ome_images, list) else ome_images + ome_plates = ome_metadata.get("OME", {}).get("Plate", []) + ome_plates = [ome_plates] if not isinstance(ome_plates, list) else ome_plates + + wells: MutableMapping[str, MutableMapping[Tuple[int, int], Self]] = {} + + if not len(ome_plates) or not len(ome_images): + return None + + for ome_plate in ome_plates: + ome_acquisitions = ome_plate.get("PlateAcquisition", []) + ome_acquisitions = ( + [ome_acquisitions] + if not isinstance(ome_acquisitions, list) + else ome_acquisitions + ) + ome_wells = ome_plate.get("Well", []) + ome_wells = [ome_wells] if not isinstance(ome_wells, list) else ome_wells + + if not len(ome_plate) or not len(ome_wells): + continue + + image_name_map: MutableMapping[str, str] = {} + for image in ome_images: + image_name_map.setdefault( + image.get("ID"), image.get("Name", image.get("ID")) + ) + + sample_acquisition_map: MutableMapping[str, int] = {} + for idx, acquisition in enumerate(ome_acquisitions): + for sample in acquisition.get("WellSampleRef", []): + sample_acquisition_map.setdefault(sample.get("ID"), idx) + + wells.setdefault(ome_plate.get("ID"), {}) + + for well in ome_wells: + images: MutableSequence[NGFFWellImage] = [] + ome_samples = well.get("WellSample", []) + ome_samples = ( + [ome_samples] if not isinstance(ome_samples, list) else ome_samples + ) + for sample in ome_samples: + images.append( + NGFFWellImage( + path=image_name_map.get( + sample.get("ImageRef", {}).get("ID"), "" + ), + acquisition=sample_acquisition_map.get(sample.get("ID")), + ) + ) + wells.get(ome_plate.get("ID"), {}).setdefault( + (int(well.get("Row")), int(well.get("Column"))), + cls(images=images, version="0.5-dev"), + ) + + return wells + + +class NGFFMetadata: + def __init__( + self, + axes: Sequence[NGFFAxes], + coordinateTransformations: Optional[ + Sequence[NGFFCoordinateTransformation] + ] = None, + multiscales: Optional[Sequence[NGFFMultiscale]] = None, + plate: Optional[Mapping[str, NGFFPlate]] = None, + wells: Optional[Mapping[str, Mapping[Tuple[int, int], NGFFWell]]] = None, + ): + self.axes = axes + self.coordinateTransformations = coordinateTransformations + self.multiscales = multiscales + self.plate = plate + self.wells = wells + + axes: Sequence[NGFFAxes] + coordinateTransformations: Optional[Sequence[NGFFCoordinateTransformation]] + multiscales: Optional[Sequence[NGFFMultiscale]] + labels: Optional[Sequence[str]] + # Image Labels are stored at the label image level + imageLabels: Optional[Sequence[NGFFImageLabel]] + + # Plate metadata should be written at the group level of each plate + plate: Optional[Mapping[str, NGFFPlate]] + + # Wells metadata should be written at the group level of each well. + # Each well is identified by a tuple (row, column) + wells: Optional[Mapping[str, Mapping[Tuple[int, int], NGFFWell]]] + + @classmethod + def from_ome_tiff(cls, tiff: TiffFile) -> Union[Self, None]: + multiscales: MutableSequence[NGFFMultiscale] = [] + ome_metadata = tifffile.xml2dict(tiff.ome_metadata) if tiff.ome_metadata else {} + + if "OME" not in ome_metadata: + return None + + ome_images = ome_metadata.get("OME", {}).get("Image", []) + ome_images = [ome_images] if not isinstance(ome_images, list) else ome_images + + if not len(ome_images): + return None + + # Step 1: Indentify all axes of the image. Special care must be taken for modulo datasets + # where multiple axes are squashed in TCZ dimensions. + xml_annotations = ( + ome_metadata.get("OME", {}) + .get("StructuredAnnotations", {}) + .get("XMLAnnotation", {}) + ) + + if not isinstance(xml_annotations, list): + xml_annotations = [xml_annotations] + + ome_modulo = {} + for annotation in ( + raw_annotation.get("Value", {}) for raw_annotation in xml_annotations + ): + if "Modulo" in annotation: + ome_modulo = annotation.get("Modulo", {}) + + additional_axes = dict() + for modulo_key in ["ModuloAlongZ", "ModuloAlongT", "ModuloAlongC"]: + if modulo_key not in ome_modulo: + continue + + modulo = ome_modulo.get(modulo_key, {}) + axis = NGFFAxes( + name=modulo_key, + type=modulo.get("Type", None), + unit=modulo.get("Unit", None), + ) + axis_size = ( + len(modulo.get("Label", [])) + if "Label" in modulo + else (modulo.get("End") - modulo.get("Start")) / modulo.get("Step", 1) + + 1 + ) + additional_axes[modulo_key] = (axis, axis_size) + + ome_pixels = ome_images[0].get("Pixels", {}) + canonical_axes = [ + "T", + "ModuloAlongT", + "ModuloAlongC", + "ModuloAlongZ", + "C", + "Z", + "Y", + "X", + ] + # Create 'axes' metadata field + axes = [] + for canonical_axis in canonical_axes: + if canonical_axis in ["X", "Y", "Z"]: + _, modulo_size = additional_axes.get( + f"ModuloAlong{canonical_axis}", (None, 1) + ) + if ome_pixels.get(f"Size{canonical_axis}") > modulo_size: + axes.append( + NGFFAxes( + name=canonical_axis, + type="space", + unit=spaceUnitSymbolMap.get( + ome_pixels.get( + f"PhysicalSize{canonical_axis}Unit", "µm" + ) + ), + ) + ) + elif canonical_axis == "C": + axes.append(NGFFAxes(name=canonical_axis, type="channel")) + elif canonical_axis == "T": + _, modulo_size = additional_axes.get("ModuloAlongT", (None, 1)) + if ome_pixels.get("SizeT") > modulo_size: + axes.append( + NGFFAxes( + name=canonical_axis, + type="time", + unit=timeUnitSymbolMap.get( + ome_pixels.get("TimeIncrementUnit", "s") + ), + ) + ) + elif canonical_axis in additional_axes: + axes.append(additional_axes.get(canonical_axis, [])[0]) + + # Create 'multiscales' metadata field + for idx, series in enumerate(tiff.series): + ome_pixels = ome_images[idx].get("Pixels", {}) + datasets: MutableSequence[NGFFDataset] = [] + x_index, y_index = series.levels[0].axes.index("X"), series.levels[ + 0 + ].axes.index("Y") + base_size = { + "X": series.levels[0].shape[x_index], + "Y": series.levels[0].shape[y_index], + } + + # Calculate axis using the base image + level_shape = list(series.levels[0].shape) + + # We need to map each modulo axis to its axis symbol + # Step 1: Iterate the dimension order + axes_order = [] + for dim in reversed(ome_pixels.get("DimensionOrder", "")): + size = ome_pixels.get(f"Size{dim}", 1) + + # If dimension size is 1 then the axis is skipped + if size == 1: + continue + + if dim in series.levels[0].axes: + # If the axis appear in the level axes then we add the axis + axes_order.append(dim) + + # If the length of the axis does not match its size then there must be a modulo axis + if size != level_shape[0]: + axes_order.append(f"ModuloAlong{dim}") + level_shape.pop(0) + level_shape.pop(0) + else: + axes_order.append(f"ModuloAlong{dim}") + level_shape.pop(0) + + if "C" not in axes_order: + axes_order.append("C") + + for idx, level in enumerate(series.levels): + if len(axes_order) != len(level.shape): + level_shape = list(level.shape) + [1] + else: + level_shape = list(level.shape) + + # Step 2: Calculate scale information for each axis after transpose + scale = [] + for axis in axes: + size = level_shape[axes_order.index(axis.name)] + + if axis.name in ["X", "Y"]: + scale.append( + ome_pixels.get(f"PhysicalSize{axis.name}", 1) + * base_size.get(axis.name, size) + / size + ) + else: + scale.append(1) + + datasets.append( + NGFFDataset( + level.name, [NGFFCoordinateTransformation("scale", scale)] + ) + ) + scale = [] + for axis in axes: + if axis.name == "T": + scale.append(ome_pixels.get("TimeIncrement", 1)) + elif axis.name == "Z": + scale.append(ome_pixels.get("PhysicalSizeZ", 1)) + else: + scale.append(1) + coordinateTransformation = ( + [NGFFCoordinateTransformation(type="scale", scale=scale)] + if not all(factor == 1 for factor in scale) + else None + ) + multiscales.append( + NGFFMultiscale( + version="0.5-dev", + name=series.name, + type=None, + metadata=None, + axes=axes, + datasets=datasets, + coordinateTransformations=coordinateTransformation, + ) + ) + + return cls( + axes=axes, + multiscales=multiscales, + plate=NGFFPlate.from_ome_tiff(ome_metadata), + wells=NGFFWell.from_ome_tiff(ome_metadata), + ) + + +def format_number(value: int, naming_convention: Literal["number", "letter"]) -> str: + if naming_convention == "number": + return str(value) + + value += 1 + + result = "" + while value > 0: + result = chr(ord("A") + (value - 1) % 26) + result + value = int((value - (value - 1) % 26) / 26) + + return result