Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions src/tiledb/cloud/bioimg/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,19 @@
from tiledb.cloud.bioimg.exportation import export
from tiledb.cloud.bioimg.ingestion import ingest
from .exportation import export
from .helpers import batch
from .helpers import get_embeddings_uris
from .helpers import scale_calc
from .helpers import serialize_filter
Comment on lines +2 to +5
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these helpers don’t seem like they would be public-facing.

from .ingestion import ingest
from .types import EMBEDDINGS
from .types import SupportedExtensions

__all__ = ("ingest", "export")
# Public API of tiledb.cloud.bioimg. All entries must be strings: the
# original tuple listed SupportedExtensions and EMBEDDINGS as bare names,
# which both raises NameError unless they are imported first and violates
# the __all__ contract (a sequence of attribute-name strings).
__all__ = (
    "ingest",
    "export",
    "get_embeddings_uris",
    "serialize_filter",
    "batch",
    "scale_calc",
    "SupportedExtensions",
    "EMBEDDINGS",
)
12 changes: 12 additions & 0 deletions src/tiledb/cloud/bioimg/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,18 @@ def get_logger_wrapper(

_SUPPORTED_EXTENSIONS = (".tiff", ".tif", ".svs", ".tdb")

from .types import SupportedExtensions


def get_embeddings_uris(output_file_uri: str) -> Tuple[str, str]:
    """Derive the vector-search index URIs for an ingested image.

    :param output_file_uri: URI of the ingested image (e.g. ``s3://.../img.tiff``)
    :return: tuple of (flat index URI, IVF-flat index URI), both siblings of
        ``output_file_uri`` named ``<stem>_embeddings_flat`` and
        ``<stem>_embeddings_ivf_flat``
    """
    destination = os.path.dirname(output_file_uri)
    # os.path.splitext strips only the final extension, so dotted basenames
    # (e.g. "a.b.tiff" -> "a.b") are preserved; the previous
    # .split(".") call was missing a [0] index and returned a list.
    filename = os.path.splitext(os.path.basename(output_file_uri))[0]
    embeddings_flat_uri = os.path.join(destination, f"{filename}_embeddings_flat")
    embeddings_ivf_flat_uri = os.path.join(
        destination, f"{filename}_embeddings_ivf_flat"
    )
    return embeddings_flat_uri, embeddings_ivf_flat_uri


def get_uris(
source: Sequence[str], output_dir: str, config: Mapping[str, Any], output_ext: str
Expand Down
229 changes: 189 additions & 40 deletions src/tiledb/cloud/bioimg/ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,185 @@
from tiledb.cloud.bioimg.helpers import serialize_filter
from tiledb.cloud.utilities._common import run_dag

from .types import EMBEDDINGS

DEFAULT_RESOURCES = {"cpu": "8", "memory": "4Gi"}
DEFAULT_IMG_NAME = "3.9-imaging-dev"
DEFAULT_DAG_NAME = "bioimg-ingestion"

# --------------------------------------------------------------------
# UDFs
# --------------------------------------------------------------------


def ingest_tiff_udf(
    io_uris: Sequence[Tuple],
    config: Mapping[str, Any],
    threads,
    embedding_model,
    embedding_level,
    embedding_grid,
    *args: Any,
    **kwargs,
):
    """Internal UDF that ingests a server-side batch of bioimaging files
    into TileDB arrays using the tiledb-bioimg API, optionally computing
    per-patch image embeddings and building vector-search indexes over them.

    :param io_uris: Pairs of tiff input - output tdb uris
    :param config: dict configuration to pass on tiledb.VFS
    :param threads: currently unused; kept for call-signature compatibility
        NOTE(review): the caller in ``ingest`` also passes ``verbose`` before
        ``threads`` — the positional argument order should be confirmed.
    :param embedding_model: The model to be used for creating embedding.
        Supported values are of type class EMBEDDINGS; falsy skips embeddings.
    :param embedding_level: The resolution level to be used for the embedding.
        This could be different from the ingestion level selected with `level`
    :param embedding_grid: A tuple (num_of_rows, num_of_cols) describing the
        grid of patches the image is split into for embedding creation.
    """

    import os

    import numpy as np

    import tiledb.vector_search as vs
    from tiledb import filter
    from tiledb.bioimg.converters.ome_tiff import OMETiffConverter
    from tiledb.bioimg.openslide import TileDBOpenSlide
    from tiledb.cloud.bioimg.types import EMBEDDINGS

    # Rehydrate the serialized compressor filter dict (see serialize_filter)
    # into a real tiledb.filter instance; "_name" carries the class name.
    compressor = kwargs.get("compressor", None)
    if compressor:
        compressor_args = dict(compressor)
        compressor_name = compressor_args.pop("_name")
        if compressor_name:
            compressor_args = {
                k: None if not v else v for k, v in compressor_args.items()
            }
            kwargs["compressor"] = vars(filter).get(compressor_name)(**compressor_args)
        else:
            raise ValueError

    conf = tiledb.Config(params=config)
    vfs = tiledb.VFS(config=conf)

    if embedding_model:

        def calculate_model(
            images_array: np.ndarray, model_id: EMBEDDINGS
        ) -> np.ndarray:
            """Embed a stack of image patches with the selected model."""
            import tensorflow as tf
            from tensorflow.keras.applications.resnet_v2 import preprocess_input

            if model_id is EMBEDDINGS.RESNET:
                model = tf.keras.applications.ResNet50V2(include_top=False)
                maps = model.predict(preprocess_input(images_array))
                # If the feature maps are already 1x1 spatially, drop the
                # singleton axes; otherwise average-pool over both spatial axes.
                if np.prod(maps.shape) == maps.shape[-1] * len(images_array):
                    return np.squeeze(maps)
                else:
                    return maps.mean(axis=1).mean(axis=1)

        def get_embeddings_uris(output_file_uri: str) -> Tuple[str, str]:
            # The embeddings URIs point inside the image group.
            filename = os.path.basename(output_file_uri).split(".")[0]
            embeddings_flat_uri = os.path.join(
                output_file_uri, f"{filename}_embeddings_flat"
            )
            embeddings_ivf_flat_uri = os.path.join(
                output_file_uri, f"{filename}_embeddings_ivf_flat"
            )
            return embeddings_flat_uri, embeddings_ivf_flat_uri

        def split_in_patches(
            image,
            embedding_level: int,
            embedding_grid: Tuple[int, int],
        ) -> Sequence[np.ndarray]:
            """Split the image at `embedding_level` into a grid of patches."""
            level_shape_w, level_shape_h = image.level_dimensions[embedding_level]
            # Region size derived from the requested grid of rows/columns.
            grid_row_num, grid_col_num = embedding_grid
            region_height = level_shape_h // grid_row_num
            region_width = level_shape_w // grid_col_num
            # NOTE(review): location is the grid cell (i, j); openslide-style
            # read_region APIs usually expect pixel coordinates — confirm.
            return [
                image.read_region(
                    (i, j),
                    embedding_level,
                    (region_height, region_width),
                )
                for i in range(grid_row_num)
                for j in range(grid_col_num)
            ]

    with tiledb.scope_ctx(ctx_or_config=conf):
        for input, output in io_uris:
            with vfs.open(input) as src:
                OMETiffConverter.to_tiledb(src, output, **kwargs)

            if embedding_model:
                # Output paths for the two vector indexes, inside the group.
                embeddings_flat_uri, embeddings_ivf_flat_uri = get_embeddings_uris(
                    output
                )

                with TileDBOpenSlide(output) as image:
                    patches = split_in_patches(image, embedding_level, embedding_grid)

                    # Stack all patches at once: the previous repeated
                    # np.concatenate loop was O(n^2), and its
                    # `patches_array.any()` seed test silently overwrote an
                    # all-zero first patch instead of keeping it.
                    patches_array = np.stack(patches, axis=0)

                    # Create embedding of the image given its patches.
                    embeddings = calculate_model(patches_array, embedding_model)

                    # Dump embeddings in F32BIN layout: uint32 shape header
                    # followed by float32 data.
                    tmp_features_file = "/tmp/features"
                    with open(tmp_features_file, "wb") as f:
                        np.array(embeddings.shape, dtype="uint32").tofile(f)
                        np.array(embeddings).astype("float32").tofile(f)

                    # Build flat and IVF-flat indexes for faster vector search.
                    # source_uri must be the dumped features file; the previous
                    # literal "features" pointed at a nonexistent path.
                    vs.ingest(
                        index_type="FLAT",
                        index_uri=embeddings_flat_uri,
                        source_uri=tmp_features_file,
                        source_type="F32BIN",
                    )
                    vs.ingest(
                        index_type="IVF_FLAT",
                        index_uri=embeddings_ivf_flat_uri,
                        source_uri=tmp_features_file,
                        source_type="F32BIN",
                    )

                    # Store embedding indexes inside the image data model.
                    grp = tiledb.Group(output, "w")
                    grp.add(embeddings_flat_uri)
                    grp.add(embeddings_ivf_flat_uri)
                    grp.meta["Embeddings Model"] = EMBEDDINGS.RESNET.name
                    grp.meta["fmt_version"] = 3
                    grp.close()

                    # Best-effort cleanup of the temporary features file.
                    try:
                        os.remove(tmp_features_file)
                    except OSError as e:
                        print(f"Error: {tmp_features_file} could not be deleted. {e}")


# --------------------------------------------------------------------
# User functions
# --------------------------------------------------------------------


def ingest(
source: Union[Sequence[str], str],
Expand All @@ -24,6 +199,9 @@ def ingest(
resources: Optional[Mapping[str, Any]] = None,
compute: bool = True,
namespace: Optional[str],
embedding_model: Optional[EMBEDDINGS] = None,
embedding_level: int = 0,
embedding_grid: Tuple[int, int] = (4, 4),
verbose: bool = False,
**kwargs,
) -> tiledb.cloud.dag.DAG:
Expand All @@ -41,52 +219,20 @@ def ingest(
:param compute: When True the DAG returned will be computed inside the function
otherwise DAG will only be returned.
:param namespace: The namespace where the DAG will run
:param embedding_model: The model to be used for creating embedding.
Supported values are of type class EMBEDDINGS
:param embedding_level: The resolution level to be used for the embedding.
This could be different from the ingestion level selected with parameter `level`
:param embedding_grid: A tuple that represents the (num_of_rows, num_of_cols)
in which the image will be splitted in patches for the embedding creation.
According to this grid internally the image is being splitted
to fit this requirement.
:param verbose: verbose logging, defaults to False
"""

logger = get_logger_wrapper(verbose)
logger.debug("tiledbioimg=%s", tiledb.bioimg.version.version)

def ingest_tiff_udf(
io_uris: Sequence[Tuple],
config: Mapping[str, Any],
verbose: bool = False,
*args: Any,
**kwargs,
):
"""Internal udf that ingests server side batch of bioimaging files
into tiledb arrays using tiledb-bioimg API

:param io_uris: Pairs of tiff input - output tdb uris
:param config: dict configuration to pass on tiledb.VFS
"""

from tiledb import filter
from tiledb.bioimg import Converters
from tiledb.bioimg import from_bioimg

compressor = kwargs.get("compressor", None)
if compressor:
compressor_args = dict(compressor)
compressor_name = compressor_args.pop("_name")
if compressor_name:
compressor_args = {
k: None if not v else v for k, v in compressor_args.items()
}
kwargs["compressor"] = vars(filter).get(compressor_name)(
**compressor_args
)
else:
raise ValueError

conf = tiledb.Config(params=config)
vfs = tiledb.VFS(config=conf)

with tiledb.scope_ctx(ctx_or_config=conf):
for input, output in io_uris:
with vfs.open(input) as src:
from_bioimg(src, output, converter=Converters.OMETIFF, **kwargs)

if isinstance(source, str):
# Handle only lists
source = [source]
Expand Down Expand Up @@ -125,6 +271,9 @@ def ingest_tiff_udf(
config,
verbose,
threads,
embedding_model,
embedding_level,
embedding_grid,
*args,
name=f"{task_prefix} - {i}",
mode=tiledb.cloud.dag.Mode.BATCH,
Expand Down
12 changes: 12 additions & 0 deletions src/tiledb/cloud/bioimg/types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import enum


class EMBEDDINGS(enum.Enum):
    """Embedding models supported by the bioimage ingestion pipeline."""

    # Explicit value 1 matches what enum.auto() assigned to the first member.
    RESNET = 1


class SupportedExtensions(enum.Enum):
    """File extensions accepted by the bioimage ingestion pipeline.

    Member values are the literal extensions (including the leading dot).
    The previous ``TIFF: str = ".tiff"`` annotations were dropped: Enum
    members are ``SupportedExtensions`` instances, not ``str``, so the
    annotations were misleading.
    """

    TIFF = ".tiff"
    TIF = ".tif"
    SVS = ".svs"
    TDB = ".tdb"