From 907e0e2b7e763d71e9be1cfa723a00d6441dc8db Mon Sep 17 00:00:00 2001 From: Jonas Teuwen Date: Thu, 26 Sep 2024 13:38:13 +0200 Subject: [PATCH] Use dynamic annotation reader and writers --- dlup/__init__.py | 4 +- dlup/annotations.py | 1195 ----------------- dlup/annotations/__init__.py | 711 ++++++++++ dlup/annotations/exporters/__init__.py | 1 + dlup/annotations/exporters/dlup_xml.py | 86 ++ dlup/annotations/exporters/geojson.py | 104 ++ dlup/annotations/importers/asap_xml.py | 112 ++ dlup/annotations/importers/darwin_json.py | 246 ++++ dlup/annotations/importers/dlup_xml.py | 148 ++ dlup/annotations/importers/geojson.py | 131 ++ dlup/annotations/importers/halo_xml.py | 124 ++ dlup/annotations/tags.py | 12 + dlup/annotations_experimental.py | 1485 --------------------- dlup/background.py | 16 +- dlup/data/dataset.py | 20 +- dlup/data/transforms.py | 269 ---- examples/annotations_to_mask.py | 3 +- src/geometry/box.h | 2 +- 18 files changed, 1699 insertions(+), 2970 deletions(-) delete mode 100644 dlup/annotations.py create mode 100644 dlup/annotations/__init__.py create mode 100644 dlup/annotations/exporters/__init__.py create mode 100644 dlup/annotations/exporters/dlup_xml.py create mode 100644 dlup/annotations/exporters/geojson.py create mode 100644 dlup/annotations/importers/asap_xml.py create mode 100644 dlup/annotations/importers/darwin_json.py create mode 100644 dlup/annotations/importers/dlup_xml.py create mode 100644 dlup/annotations/importers/geojson.py create mode 100644 dlup/annotations/importers/halo_xml.py create mode 100644 dlup/annotations/tags.py delete mode 100644 dlup/annotations_experimental.py delete mode 100644 dlup/data/transforms.py diff --git a/dlup/__init__.py b/dlup/__init__.py index e0d3fb0a..e3a5130f 100644 --- a/dlup/__init__.py +++ b/dlup/__init__.py @@ -4,7 +4,7 @@ from ._image import SlideImage from ._region import BoundaryMode, RegionView -from .annotations import AnnotationClass, AnnotationType, WsiAnnotations +from .annotations import SlideAnnotations pyvips_logger = logging.getLogger("pyvips") pyvips_logger.setLevel(logging.CRITICAL) @@ -13,4 +13,4 @@ __email__ = "j.teuwen@nki.nl" __version__ = "0.7.0" -__all__ = ("SlideImage", "WsiAnnotations", "AnnotationType", "AnnotationClass", "RegionView", "BoundaryMode") +__all__ = ("SlideImage", "SlideAnnotations", "RegionView", "BoundaryMode") diff --git a/dlup/annotations.py b/dlup/annotations.py deleted file mode 100644 index c57e7bb2..00000000 --- a/dlup/annotations.py +++ /dev/null @@ -1,1195 +0,0 @@ -# Copyright (c) dlup contributors -""" -Annotation module for dlup. - -There are three types of annotations, in the `AnnotationType` variable: -- points -- boxes (which are internally polygons) -- polygons - -Supported file formats: -- ASAP XML -- Darwin V7 JSON -- GeoJSON -- HaloXML -""" -from __future__ import annotations - -import copy -import errno -import functools -import json -import os -import pathlib -import warnings -import xml.etree.ElementTree as ET -from dataclasses import dataclass, replace -from enum import Enum -from typing import Any, Callable, ClassVar, Iterable, NamedTuple, Optional, Type, TypedDict, TypeVar, Union, cast - -import numpy as np -import numpy.typing as npt -import shapely -import shapely.affinity -import shapely.geometry -import shapely.validation -from shapely import geometry -from shapely import lib as shapely_lib -from shapely.geometry import MultiPolygon as ShapelyMultiPolygon -from shapely.geometry import Point as ShapelyPoint -from shapely.geometry import Polygon as ShapelyPolygon -from shapely.strtree import STRtree -from shapely.validation import make_valid - -from dlup._exceptions import AnnotationError -from dlup._types import GenericNumber, PathLike -from dlup.utils.annotations_utils import _get_geojson_z_index, get_geojson_color, hex_to_rgb -from dlup.utils.imports import DARWIN_SDK_AVAILABLE, PYHALOXML_AVAILABLE - -# TODO: -# Group when exporting to GeoJSON -# Make GeoJSON work, use colors -# Verify ASAP - - -_TWsiAnnotations = TypeVar("_TWsiAnnotations", bound="WsiAnnotations") -ShapelyTypes = Union[ShapelyPoint, ShapelyMultiPolygon, ShapelyPolygon] - - -class AnnotationType(str, Enum): - POINT = "POINT" - BOX = "BOX" - POLYGON = "POLYGON" - TAG = "TAG" - RASTER = "RASTER" - - -class AnnotationTypeToDLUPAnnotationType(Enum): - # Shared annotation types - polygon = AnnotationType.POLYGON - # ASAP annotation types - rectangle = AnnotationType.BOX - dot = AnnotationType.POINT - spline = AnnotationType.POLYGON - pointset = AnnotationType.POINT - # Darwin V7 annotation types - bounding_box = AnnotationType.BOX - complex_polygon = AnnotationType.POLYGON - keypoint = AnnotationType.POINT - tag = AnnotationType.TAG - raster_layer = AnnotationType.RASTER - - @classmethod - def from_string(cls, annotation_type: str) -> AnnotationType: - try: - return cls[annotation_type].value - except KeyError: - raise NotImplementedError(f"annotation_type {annotation_type} is not implemented or not a valid dlup type.") - - -class AnnotationSorting(str, Enum): - """The ways to sort the annotations. This is used in the constructors of the `WsiAnnotations` class, and applied - to the output of `WsiAnnotations.read_region()`. - - - REVERSE: Sort the output in reverse order. - - AREA: Often when the annotation tools do not properly support hierarchical order, one would annotate in a way - that the smaller objects are on top of the larger objects. This option sorts the output by area, so that the - larger objects appear first in the output and then the smaller objects. - - Z_INDEX: Sort the output by the z-index of the annotations. This is useful when the annotations have a z-index - - NONE: Do not apply any sorting and output as is presented in the input file. - """ - - REVERSE = "REVERSE" - AREA = "AREA" - Z_INDEX = "Z_INDEX" - NONE = "NONE" - - -@dataclass(frozen=True) # Frozen makes the class hashable -class AnnotationClass: - """An annotation class. An annotation has two required properties: - - label: The name of the annotation, e.g., "lymphocyte". - - annotation_type: The type of annotation, e.g., AnnotationType.POINT. - - And two optional properties: - - color: The color of the annotation as a tuple of RGB values. - - z_index: The z-index of the annotation. This is useful when the annotations have a z-index. - - Parameters - ---------- - label : str - The name of the annotation. - annotation_type : AnnotationType - The type of annotation. - color : Optional[tuple[int, int, int]] - The color of the annotation as a tuple of RGB values. - z_index : Optional[int] - The z-index of the annotation. - """ - - label: str - annotation_type: AnnotationType | str - color: Optional[tuple[int, int, int]] = None - z_index: Optional[int] = None - - def __post_init__(self) -> None: - if isinstance(self.annotation_type, str): - if self.annotation_type in AnnotationType.__members__: - object.__setattr__(self, "annotation_type", AnnotationType[self.annotation_type]) - else: - raise ValueError(f"Unsupported annotation type {self.annotation_type}") - - if self.annotation_type in (AnnotationType.POINT, AnnotationType.TAG) and self.z_index is not None: - raise ValueError("z_index is not supported for point annotations or tags.") - - -class DarwinV7Metadata(NamedTuple): - label: str - color: tuple[int, int, int] - annotation_type: AnnotationType - - -@functools.lru_cache(maxsize=None) -def get_v7_metadata(filename: pathlib.Path) -> Optional[dict[tuple[str, str], DarwinV7Metadata]]: - if not DARWIN_SDK_AVAILABLE: - raise RuntimeError("`darwin` is not available. Install using `python -m pip install darwin-py`.") - import darwin.path_utils - - if not filename.is_dir(): - raise RuntimeError("Provide the path to the root folder of the Darwin V7 annotations") - - v7_metadata_fn = filename / ".v7" / "metadata.json" - if not v7_metadata_fn.exists(): - return None - v7_metadata = darwin.path_utils.parse_metadata(v7_metadata_fn) - output = {} - for sample in v7_metadata["classes"]: - annotation_type = AnnotationTypeToDLUPAnnotationType.from_string(sample["type"]) - # This is not implemented and can be skipped. The main function will raise a NonImplementedError - if annotation_type == AnnotationType.RASTER: - continue - - label = sample["name"] - color = sample["color"][5:-1].split(",") - if color[-1] != "1.0": - raise RuntimeError("Expected A-channel of color to be 1.0") - rgb_colors = (int(color[0]), int(color[1]), int(color[2])) - - output[(label, annotation_type.value)] = DarwinV7Metadata( - label=label, color=rgb_colors, annotation_type=annotation_type - ) - return output - - -def _is_rectangle(polygon: DlupShapelyPolygon | ShapelyPolygon) -> bool: - if not polygon.is_valid or len(polygon.exterior.coords) != 5 or len(polygon.interiors) != 0: - return False - return bool(np.isclose(polygon.area, polygon.minimum_rotated_rectangle.area)) - - -def _is_alligned_rectangle(polygon: DlupShapelyPolygon | ShapelyPolygon) -> bool: - if not _is_rectangle(polygon): - return False - min_rotated_rect = polygon.minimum_rotated_rectangle - aligned_rect = min_rotated_rect.minimum_rotated_rectangle - return bool(min_rotated_rect == aligned_rect) - - -def transform( - geometry: DlupShapelyPoint | DlupShapelyPolygon, - transformation: Callable[[npt.NDArray[np.float_]], npt.NDArray[np.float_]], -) -> DlupShapelyPoint | DlupShapelyPolygon: - """ - Transform a geometry. Function taken from Shapely 2.0.1 under the BSD 3-Clause "New" or "Revised" License. - Parameters - ---------- - geometry : Point or Polygon - transformation : Callable - Function mapping a numpy array of coordinates to a new numpy array of coordinates. - Returns - ------- - Point or Polygon - The transformed point - """ - original_class = geometry.annotation_class - geometry_arr = np.array(geometry, dtype=np.object_) # makes a copy - coordinates = shapely_lib.get_coordinates(geometry_arr, False, False) - new_coordinates = transformation(coordinates) - # check the array to yield understandable error messages - if not isinstance(new_coordinates, np.ndarray): - raise ValueError("The provided transformation did not return a numpy array") - if new_coordinates.dtype != np.float64: - raise ValueError( - "The provided transformation returned an array with an unexpected dtype ({new_coordinates.dtype})" - ) - if new_coordinates.shape != coordinates.shape: - # if the shape is too small we will get a segfault - raise ValueError( - "The provided transformation returned an array with an unexpected shape ({new_coordinates.shape})" - ) - geometry_arr = shapely_lib.set_coordinates(geometry_arr, new_coordinates) - returned_geometry = geometry_arr.item() - - if original_class.annotation_type != "POINT": - return DlupShapelyPolygon(returned_geometry, a_cls=original_class) - return DlupShapelyPoint(returned_geometry, a_cls=original_class) - - -class GeoJsonDict(TypedDict): - """ - TypedDict for standard GeoJSON output - """ - - id: str | None - type: str - features: list[dict[str, str | dict[str, str]]] - metadata: Optional[dict[str, str | list[str]]] - - -class AnnotatedGeometry(geometry.base.BaseGeometry): # type: ignore[misc] - __slots__ = geometry.base.BaseGeometry.__slots__ - _a_cls: ClassVar[dict[str, Any]] = {} - - def __init__( - self, - *args: Any, - **kwargs: Any, - ) -> None: - # Get annotation_class from args and kwargs. We do this because the __new__ method takes different (kw)args - a_cls = next((arg for arg in args if isinstance(arg, AnnotationClass)), kwargs.get("a_cls", None)) - self._a_cls[str(id(self))] = a_cls - - @property - def annotation_class(self) -> AnnotationClass: - return cast(AnnotationClass, self._a_cls[str(id(self))]) - - @property - def annotation_type(self) -> AnnotationType | str: - return self.annotation_class.annotation_type - - @property - def label(self) -> str: - return self.annotation_class.label - - @property - def color(self) -> Optional[tuple[int, int, int]]: - return self.annotation_class.color - - @property - def z_index(self) -> Optional[int]: - return self.annotation_class.z_index - - def __del__(self) -> None: - if str(id(self)) in self._a_cls: - del self._a_cls[str(id(self))] - - def __str__(self) -> str: - return f"{self.annotation_class}, {self.wkt}" - - def __eq__(self, other: object) -> bool: - geometry_equal = self.equals(other) - if not geometry_equal: - return False - - if not isinstance(other, type(self)): - return False - - if other.annotation_class != self.annotation_class: - return False - return True - - def __iadd__(self, other: Any) -> None: - raise TypeError(f"unsupported operand type(s) for +=: {type(self)} and {type(other)}") - - def __isub__(self, other: Any) -> None: - raise TypeError(f"unsupported operand type(s) for -=: {type(self)} and {type(other)}") - - -class DlupShapelyPoint(ShapelyPoint, AnnotatedGeometry): # type: ignore[misc] - __slots__ = ShapelyPoint.__slots__ - - def __new__( - cls, coord: ShapelyPoint | tuple[float, float], a_cls: Optional[AnnotationClass] = None - ) -> "DlupShapelyPoint": - point = super().__new__(cls, coord) - point.__class__ = cls - return cast("DlupShapelyPoint", point) - - def __reduce__(self) -> tuple[type, tuple[tuple[float, float], Optional[AnnotationClass]]]: - return (self.__class__, ((self.x, self.y), self.annotation_class)) - - -class DlupShapelyPolygon(ShapelyPolygon, AnnotatedGeometry): # type: ignore[misc] - __slots__ = ShapelyPolygon.__slots__ - - def __new__( - cls, - shell: Union[tuple[float, float], ShapelyPolygon], - holes: Optional[list[list[list[float]]] | list[npt.NDArray[np.float_]]] = None, - a_cls: Optional[AnnotationClass] = None, - ) -> "DlupShapelyPolygon": - instance = super().__new__(cls, shell, holes) - instance.__class__ = cls - return cast("DlupShapelyPolygon", instance) - - def intersect_with_box( - self, - other: ShapelyPolygon, - ) -> Optional[list["DlupShapelyPolygon"]]: - result = make_valid(self).intersection(other) - if self.area > 0 and result.area == 0: - return None - - # Verify if this box is still a box. Create annotation_type to polygon if that is not the case - if self.annotation_type == AnnotationType.BOX and not _is_rectangle(result): - annotation_class = replace(self.annotation_class, annotation_type=AnnotationType.POLYGON) - else: - annotation_class = self.annotation_class - - if isinstance(result, ShapelyPolygon): - return [DlupShapelyPolygon(result, a_cls=annotation_class)] - elif isinstance(result, (ShapelyMultiPolygon, shapely.geometry.collection.GeometryCollection)): - return [DlupShapelyPolygon(geom, a_cls=annotation_class) for geom in result.geoms if geom.area > 0] - else: - raise NotImplementedError(f"{type(result)}") - - def __reduce__( - self, - ) -> tuple[type, tuple[list[tuple[float, float]], list[list[tuple[float, float]]], Optional[AnnotationClass]]]: - return ( - self.__class__, - (self.exterior.coords[:], [ring.coords[:] for ring in self.interiors], self.annotation_class), - ) - - -class CoordinatesDict(TypedDict): - type: str - coordinates: list[list[list[float]]] | list[list[tuple[float, float]]] - - -def shape( - coordinates: CoordinatesDict, - label: str, - color: Optional[tuple[int, int, int]] = None, - z_index: Optional[int] = None, -) -> list[DlupShapelyPolygon | DlupShapelyPoint]: - geom_type = coordinates.get("type", "not_found").lower() - if geom_type == "not_found": - raise ValueError("No type found in coordinates.") - elif geom_type in ["point", "multipoint"]: - if z_index is not None: - raise AnnotationError("z_index is not supported for point annotations.") - - annotation_class = AnnotationClass(label=label, annotation_type=AnnotationType.POINT, color=color, z_index=None) - _coordinates = coordinates["coordinates"] - return [ - DlupShapelyPoint(np.asarray(c), a_cls=annotation_class) - for c in (_coordinates if geom_type == "multipoint" else [_coordinates]) - ] - elif geom_type in ["polygon", "multipolygon"]: - _coordinates = coordinates["coordinates"] - # TODO: Give every polygon in multipolygon their own annotation_class / annotation_type - annotation_type = ( - AnnotationType.BOX - if geom_type == "polygon" and _is_rectangle(DlupShapelyPolygon(_coordinates[0])) - else AnnotationType.POLYGON - ) - annotation_class = AnnotationClass(label=label, annotation_type=annotation_type, color=color, z_index=z_index) - return [ - DlupShapelyPolygon( - shell=np.asarray(c[0]), holes=[np.asarray(hole) for hole in c[1:]], a_cls=annotation_class - ) - for c in (_coordinates if geom_type == "multipolygon" else [_coordinates]) - ] - - raise AnnotationError(f"Unsupported geom_type {geom_type}") - - -def _geometry_to_geojson( - geometry: DlupShapelyPolygon | DlupShapelyPoint, label: str, color: tuple[int, int, int] | None, z_index: int | None -) -> dict[str, Any]: - """Function to convert a geometry to a GeoJSON object. - - Parameters - ---------- - geometry : Polygon | Point - A polygon or point object - label : str - The label name - color : tuple[int, int, int] - The color of the object in RGB values - z_index : int - The z-index of the object - - Returns - ------- - dict[str, Any] - Output dictionary representing the data in GeoJSON - - """ - data = { - "type": "Feature", - "properties": { - "classification": { - "name": label, - }, - }, - "geometry": shapely.geometry.mapping(geometry), - } - if color is not None: - data["properties"]["classification"]["color"] = color - - if z_index is not None: - data["properties"]["classification"]["z_index"] = z_index - - return data - - -class WsiAnnotations: - """Class that holds all annotations for a specific image""" - - def __init__( - self, - layers: list[DlupShapelyPoint | DlupShapelyPolygon], - tags: Optional[list[AnnotationClass]] = None, - offset_to_slide_bounds: bool = False, - sorting: AnnotationSorting | str = AnnotationSorting.NONE, - ): - """ - Parameters - ---------- - layers : list[Point | Polygon] - A list of layers for a single label. - tags: Optional[list[AnnotationClass]] - A list of tags for the annotations. These have to be of type `AnnotationType.TAG`. - offset_to_slide_bounds : bool - If true, will set the property `offset_to_slide_bounds` to True. This means that the annotations need - to be offset to the slide bounds. This is useful when the annotations are read from a file format which - requires this, for instance HaloXML. - sorting: AnnotationSorting - The sorting to apply to the annotations. Check the `AnnotationSorting` enum for more information. - By default, the annotations are not sorted. - - """ - self._layers = layers - self._tags = tags - self._offset_to_slide_bounds = offset_to_slide_bounds - self._sorting = sorting - self._sort_layers_in_place() - self._available_classes: set[AnnotationClass] = {layer.annotation_class for layer in self._layers} - self._str_tree = STRtree(self._layers) - warnings.warn( - "WsiAnnotations will be deprecated in the next release. Use SlideAnnotations instead.", DeprecationWarning - ) - - @property - def available_classes(self) -> set[AnnotationClass]: - return self._available_classes - - @property - def tags(self) -> Optional[list[AnnotationClass]]: - return self._tags - - @property - def offset_to_slide_bounds(self) -> bool: - """ - If True, the annotations need to be offset to the slide bounds. This is useful when the annotations are read - from a file format which requires this, for instance HaloXML. - - Returns - ------- - bool - """ - return self._offset_to_slide_bounds - - @property - def sorting(self) -> AnnotationSorting | str: - return self._sorting - - def filter(self, labels: str | list[str] | tuple[str]) -> None: - """ - Filter annotations based on the given label list. If annotations with the same name but a different type are - present, they will all be added irrespective of the type. - - Parameters - ---------- - labels : tuple or list or string - The list or tuple of labels or a single string of a label - - Returns - ------- - None - """ - _labels = [labels] if isinstance(labels, str) else labels - self._layers = [layer for layer in self._layers if layer.label in _labels] - self._available_classes = {layer.annotation_class for layer in self._layers} - self._str_tree = STRtree(self._layers) - - @property - def bounding_box(self) -> tuple[tuple[float, float], tuple[float, float]]: - """ - Return the bounding box of all annotations. - - Returns - ------- - tuple[tuple[float, float], tuple[float, float]] - Bounding box of the form ((x, y), (w, h)). - """ - if not self._layers: - return ((0.0, 0.0), (0.0, 0.0)) - - # Extract the bounds for each annotation - bounds = np.array( - [ - ( - annotation.bounds - if isinstance(annotation, DlupShapelyPolygon) - else (annotation.x, annotation.y, annotation.x, annotation.y) - ) - for annotation in self._layers - ] - ) - - # Calculate the min and max coordinates - min_coords = bounds[:, [0, 1]].min(axis=0) - max_coords = bounds[:, [2, 3]].max(axis=0) - - # Calculate width and height - width, height = max_coords - min_coords - - return (tuple(min_coords), (width, height)) - - def copy(self) -> WsiAnnotations: - """Make a copy of the object.""" - return copy.deepcopy(self) - - @classmethod - def from_geojson( - cls: Type[_TWsiAnnotations], - geojsons: PathLike | Iterable[PathLike], - sorting: AnnotationSorting | str = AnnotationSorting.NONE, - ) -> _TWsiAnnotations: - """ - Constructs an WsiAnnotations object from geojson. - - Parameters - ---------- - geojsons : Iterable, or PathLike - List of geojsons representing objects. The properties object must have the name which is the label of this - object. - sorting: AnnotationSorting - The sorting to apply to the annotations. Check the `AnnotationSorting` enum for more information. - By default, the annotations are sorted by area. - - Returns - ------- - WsiAnnotations - - """ - if isinstance(geojsons, str): - _geojsons: Iterable[Any] = [pathlib.Path(geojsons)] - - _geojsons = [geojsons] if not isinstance(geojsons, (tuple, list)) else geojsons - layers: list[DlupShapelyPolygon | DlupShapelyPoint] = [] - tags = None - for path in _geojsons: - path = pathlib.Path(path) - if not path.exists(): - raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), str(path)) - - with open(path, "r", encoding="utf-8") as annotation_file: - geojson_dict = json.load(annotation_file) - if "metadata" in geojson_dict: - if geojson_dict["metadata"] and geojson_dict["metadata"].get("tags", None) is not None: - _tags = geojson_dict["metadata"]["tags"] - tags = [ - AnnotationClass(label=tag, annotation_type=AnnotationType.TAG, color=None, z_index=None) - for tag in _tags - ] - features = geojson_dict["features"] - for x in features: - properties = x["properties"] - if "classification" in properties: - _label = properties["classification"]["name"] - _color = get_geojson_color(properties["classification"]) - _z_index = _get_geojson_z_index(properties["classification"]) - elif properties.get("objectType", None) == "annotation": - _label = properties["name"] - _color = get_geojson_color(properties) - _z_index = _get_geojson_z_index(properties) - else: - raise ValueError("Could not find label in the GeoJSON properties.") - - _geometry = shape(x["geometry"], label=_label, color=_color, z_index=_z_index) - layers += _geometry - - return cls(layers=layers, tags=tags, sorting=sorting) - - @classmethod - def from_asap_xml( - cls, - asap_xml: PathLike, - scaling: float | None = None, - sorting: AnnotationSorting | str = AnnotationSorting.AREA, - ) -> WsiAnnotations: - """ - Read annotations as an ASAP [1] XML file. ASAP is a tool for viewing and annotating whole slide images. - - Parameters - ---------- - asap_xml : PathLike - Path to ASAP XML annotation file. - scaling : float, optional - Scaling factor. Sometimes required when ASAP annotations are stored in a different resolution than the - original image. - sorting: AnnotationSorting - The sorting to apply to the annotations. Check the `AnnotationSorting` enum for more information. - By default, the annotations are sorted by area. - - References - ---------- - .. [1] https://github.com/computationalpathologygroup/ASAP - - Returns - ------- - WsiAnnotations - """ - tree = ET.parse(asap_xml) - opened_annotation = tree.getroot() - layers: list[DlupShapelyPolygon | DlupShapelyPoint] = [] - opened_annotations = 0 - for parent in opened_annotation: - for child in parent: - if child.tag != "Annotation": - continue - label = child.attrib.get("PartOfGroup").strip() # type: ignore - color = hex_to_rgb(child.attrib.get("Color").strip()) # type: ignore - - _type = child.attrib.get("Type").lower() # type: ignore - annotation_type = AnnotationTypeToDLUPAnnotationType.from_string(_type) - coordinates = _parse_asap_coordinates(child, annotation_type, scaling=scaling) - - if not coordinates.is_valid: - coordinates = shapely.validation.make_valid(coordinates) - - # It is possible there have been linestrings or so added. - if isinstance(coordinates, shapely.geometry.collection.GeometryCollection): - split_up = [_ for _ in coordinates.geoms if _.area > 0] - if len(split_up) != 1: - raise RuntimeError("Got unexpected object.") - coordinates = split_up[0] - - if coordinates.area == 0: - continue - - # Sometimes we have two adjacent polygons which can be split - if isinstance(coordinates, ShapelyMultiPolygon): - coordinates_list = coordinates.geoms - else: - # Explicitly turn into a list - coordinates_list = [coordinates] - - for coordinates in coordinates_list: - _cls = AnnotationClass(label=label, annotation_type=annotation_type, color=color) - if isinstance(coordinates, ShapelyPoint): - layers.append(DlupShapelyPoint(coordinates, a_cls=_cls)) - elif isinstance(coordinates, ShapelyPolygon): - layers.append(DlupShapelyPolygon(coordinates, a_cls=_cls)) - else: - raise NotImplementedError - - opened_annotations += 1 - - return cls(layers=layers, sorting=sorting) - - @classmethod - def from_halo_xml( - cls, - halo_xml: PathLike, - scaling: float | None = None, - sorting: AnnotationSorting | str = AnnotationSorting.NONE, - ) -> WsiAnnotations: - """ - Read annotations as a Halo [1] XML file. - This function requires `pyhaloxml` [2] to be installed. - - Parameters - ---------- - halo_xml : PathLike - Path to the Halo XML file. - scaling : float, optional - The scaling to apply to the annotations. - sorting: AnnotationSorting - The sorting to apply to the annotations. Check the `AnnotationSorting` enum for more information. By default - the annotations are not sorted as HALO supports hierarchical annotations. - - References - ---------- - .. [1] https://indicalab.com/halo/ - .. [2] https://github.com/rharkes/pyhaloxml - - Returns - ------- - WsiAnnotations - """ - if not PYHALOXML_AVAILABLE: - raise RuntimeError("`pyhaloxml` is not available. Install using `python -m pip install pyhaloxml`.") - import pyhaloxml.shapely - - output_layers = [] - with pyhaloxml.HaloXMLFile(halo_xml) as hx: - hx.matchnegative() - for layer in hx.layers: - for region in layer.regions: - curr_geometry = pyhaloxml.shapely.region_to_shapely(region) - if region.type == pyhaloxml.RegionType.Rectangle: - _cls = AnnotationClass(label=layer.name, annotation_type=AnnotationType.BOX) - output_layers.append(DlupShapelyPolygon(curr_geometry, a_cls=_cls)) - if region.type in [pyhaloxml.RegionType.Ellipse, pyhaloxml.RegionType.Polygon]: - _cls = AnnotationClass(label=layer.name, annotation_type=AnnotationType.POLYGON) - output_layers.append(DlupShapelyPolygon(curr_geometry, a_cls=_cls)) - if region.type == pyhaloxml.RegionType.Pin: - _cls = AnnotationClass(label=layer.name, annotation_type=AnnotationType.POINT) - output_layers.append(DlupShapelyPoint(curr_geometry, a_cls=_cls)) - else: - raise NotImplementedError(f"Regiontype {region.type} is not implemented in DLUP") - - return cls(output_layers, offset_to_slide_bounds=True, sorting=sorting) - - @classmethod - def from_darwin_json( - cls, - darwin_json: PathLike, - sorting: AnnotationSorting | str = AnnotationSorting.NONE, - z_indices: Optional[dict[str, int]] = None, - ) -> WsiAnnotations: - """ - Read annotations as a V7 Darwin [1] JSON file. If available will read the `.v7/metadata.json` file to extract - colors from the annotations. - - Parameters - ---------- - darwin_json : PathLike - Path to the Darwin JSON file. - sorting: AnnotationSorting - The sorting to apply to the annotations. Check the `AnnotationSorting` enum for more information. - By default, the annotations are sorted by the z-index which is generated by the order of the saved - annotations. - z_indices: dict[str, int], optional - If set, these z_indices will be used rather than the default order. - - References - ---------- - .. [1] https://darwin.v7labs.com/ - - Returns - ------- - WsiAnnotations - - """ - if not DARWIN_SDK_AVAILABLE: - raise RuntimeError("`darwin` is not available. Install using `python -m pip install darwin-py`.") - import darwin - - darwin_json_fn = pathlib.Path(darwin_json) - darwin_an = darwin.utils.parse_darwin_json(darwin_json_fn, None) - v7_metadata = get_v7_metadata(darwin_json_fn.parent) - - tags = [] - layers = [] - for curr_annotation in darwin_an.annotations: - name = curr_annotation.annotation_class.name - darwin_annotation_type = curr_annotation.annotation_class.annotation_type - annotation_type = AnnotationTypeToDLUPAnnotationType.from_string(darwin_annotation_type) - if annotation_type == AnnotationType.RASTER: - raise NotImplementedError("Raster annotations are not supported.") - - annotation_color = v7_metadata[(name, annotation_type.value)].color if v7_metadata else None - - if annotation_type == AnnotationType.TAG: - tags.append( - AnnotationClass( - label=name, annotation_type=AnnotationType.TAG, color=annotation_color, z_index=None - ) - ) - continue - - z_index = None if annotation_type == AnnotationType.POINT or z_indices is None else z_indices[name] - curr_data = curr_annotation.data - - _cls = AnnotationClass(label=name, annotation_type=annotation_type, color=annotation_color, z_index=z_index) - if annotation_type == AnnotationType.POINT: - curr_point = DlupShapelyPoint((curr_data["x"], curr_data["y"]), a_cls=_cls) - layers.append(curr_point) - continue - - elif annotation_type == AnnotationType.POLYGON: - if "path" in curr_data: # This is a regular polygon - curr_polygon = DlupShapelyPolygon([(_["x"], _["y"]) for _ in curr_data["path"]]) - layers.append(DlupShapelyPolygon(curr_polygon, a_cls=_cls)) - - elif "paths" in curr_data: # This is a complex polygon which needs to be parsed with the even-odd rule - curr_complex_polygon = _parse_darwin_complex_polygon(curr_data) - for polygon in curr_complex_polygon.geoms: - layers.append(DlupShapelyPolygon(polygon, a_cls=_cls)) - else: - raise ValueError(f"Got unexpected data keys: {curr_data.keys()}") - - elif annotation_type == AnnotationType.BOX: - x, y, w, h = list(map(curr_data.get, ["x", "y", "w", "h"])) - curr_polygon = shapely.geometry.box(x, y, x + w, y + h) - layers.append(DlupShapelyPolygon(curr_polygon, a_cls=_cls)) - else: - ValueError(f"Annotation type {annotation_type} is not supported.") - - return cls(layers=layers, tags=tags, sorting=sorting) - - def _sort_layers_in_place(self) -> None: - """ - Sorts a list of layers. Check AnnotationSorting for more information of the sorting types. - Returns - ------- - None - """ - if self._sorting == AnnotationSorting.Z_INDEX: - self._layers.sort(key=lambda x: (x.z_index is None, x.z_index)) - elif self._sorting == AnnotationSorting.REVERSE: - self._layers.reverse() - elif self._sorting == AnnotationSorting.AREA: - self._layers.sort(key=lambda x: x.area, reverse=True) - elif self._sorting == AnnotationSorting.NONE: - pass - else: - raise NotImplementedError(f"Sorting not implemented for {self._sorting}.") - - def as_geojson(self) -> GeoJsonDict: - """ - Output the annotations as proper geojson. These outputs are sorted according to the `AnnotationSorting` selected - for the annotations. This ensures the annotations are correctly sorted in the output. - - The output is not completely GeoJSON compliant as some parts such as the metadata and properties are not part - of the standard. However, these are implemented to ensure the output is compatible with QuPath. - - Returns - ------- - GeoJsonDict - The output as a GeoJSON dictionary. - """ - data: GeoJsonDict = {"type": "FeatureCollection", "metadata": None, "features": [], "id": None} - if self.tags: - data["metadata"] = {"tags": [_.label for _ in self.tags]} - - # # This used to be it. - for idx, curr_annotation in enumerate(self._layers): - json_dict = _geometry_to_geojson( - curr_annotation, - label=curr_annotation.label, - color=curr_annotation.color, - z_index=curr_annotation.z_index if isinstance(curr_annotation, DlupShapelyPolygon) else None, - ) - json_dict["id"] = str(idx) - data["features"].append(json_dict) - - return data - - def simplify(self, tolerance: float, *, preserve_topology: bool = True) -> None: - """Simplify the polygons in the annotation (i.e. reduce points). Other annotations will remain unchanged. - All points in the resulting polygons object will be in the tolerance distance of the original polygon. - - Parameters - ---------- - tolerance : float - preserve_topology : bool - Preserve the topology, if false, this function will be much faster. Internally the `shapely` simplify - algorithm is used. - - Returns - ------- - None - - """ - # TODO: Implement simplify on Polygon - for idx, layer in enumerate(self._layers): - a_cls = layer.annotation_class - if a_cls.annotation_type == AnnotationType.POINT: - continue - layer.simplify(tolerance, preserve_topology=preserve_topology) - self._layers[idx] = DlupShapelyPolygon(self._layers[idx], a_cls=a_cls) - - def read_region( - self, - location: npt.NDArray[np.int_ | np.float_] | tuple[GenericNumber, GenericNumber], - scaling: float, - size: npt.NDArray[np.int_ | np.float_] | tuple[GenericNumber, GenericNumber], - ) -> list[DlupShapelyPolygon | DlupShapelyPoint]: - """Reads the region of the annotations. API is the same as `dlup.SlideImage` so they can be used in conjunction. - - The process is as follows: - - 1. All the annotations which overlap with the requested region of interested are filtered with an STRTree. - 2. The annotations are filtered by name (if the names are the same they are in order of POINT, BOX and POLYGON) - , and subsequently by area from large to small. The reason this is implemented this way is because sometimes - one can annotate a larger region, and the smaller regions should overwrite the previous part. - A function `dlup.data.transforms.convert_annotations` can be used to convert such outputs to a mask. - 3. The annotations are cropped to the region-of-interest, or filtered in case of points. Polygons which - convert into points after intersection are removed. If it's a image-level label, nothing happens. - 4. The annotation is rescaled and shifted to the origin to match the local patch coordinate system. - - The final returned data is a list of `dlup.annotations.Polygon` or `dlup.annotations.Point`. - - Parameters - ---------- - location: np.ndarray or tuple - size : np.ndarray or tuple - scaling : float - - Returns - ------- - list[tuple[str, ShapelyTypes]] - List of tuples denoting the name of the annotation and a shapely object. - - Examples - -------- - 1. To read geojson annotations and convert them into masks: - - >>> from pathlib import Path - >>> from dlup import SlideImage - >>> import numpy as np - >>> from rasterio.features import rasterize - >>> wsi = SlideImage.from_file_path(Path("path/to/image.svs")) - >>> wsi = wsi.get_scaled_view(scaling=0.5) - >>> wsi = wsi.read_region(location=(0,0), size=wsi.size) - >>> annotations = WsiAnnotations.from_geojson([Path("path/to/geojson.json")], labels=["class_name"]) - >>> polygons: list[Polygons] = annotations.read_region(location=(0,0), size=wsi.size, scaling=0.01) - - The polygons can be converted to masks using `dlup.data.transforms.convert_annotations` or - `dlup.data.transforms.ConvertAnnotationsToMask`. - """ - - warnings.warn( - "WsiAnnotations.read_region() will be deprecated in the next release. " - "Use SlideAnnotations.read_region() instead, which has a different return type", - DeprecationWarning, - ) - - box = list(location) + list(np.asarray(location) + np.asarray(size)) - box = (np.asarray(box) / scaling).tolist() - query_box = geometry.box(*box) - - curr_indices = self._str_tree.query(query_box) - # This is needed because the STRTree returns (seemingly) arbitrary order, and this would destroy the order - curr_indices.sort() - filtered_annotations: list[DlupShapelyPoint | DlupShapelyPolygon] = self._str_tree.geometries.take( - curr_indices - ).tolist() - - cropped_annotations: list[DlupShapelyPoint | DlupShapelyPolygon] = [] - for annotation in filtered_annotations: - if annotation.annotation_type in (AnnotationType.BOX, AnnotationType.POLYGON): - _annotations = annotation.intersect_with_box(query_box) - if _annotations is not None: - cropped_annotations += _annotations - else: - cropped_annotations.append(annotation) - - def _affine_coords(coords: npt.NDArray[np.float_]) -> npt.NDArray[np.float_]: - return coords * scaling - np.asarray(location, dtype=np.float_) - - output: list[DlupShapelyPolygon | DlupShapelyPoint] = [] - for annotation in cropped_annotations: - annotation = transform(annotation, _affine_coords) - output.append(annotation) - return output - - def __str__(self) -> str: - return ( - f"{type(self).__name__}(n_layers={len(self._layers)}, " - f"tags={[tag.label for tag in self.tags] if self.tags else None})" - ) - - def __contains__(self, item: Union[str, AnnotationClass, DlupShapelyPoint, DlupShapelyPolygon]) -> bool: - if isinstance(item, str): - return item in [_.label for _ in self.available_classes] - elif isinstance(item, (DlupShapelyPoint, DlupShapelyPolygon)): - return item in self._layers - return item in self.available_classes - - def __getitem__(self, idx: int) -> DlupShapelyPoint | DlupShapelyPolygon: - return self._layers[idx] - - def __iter__(self) -> Iterable[DlupShapelyPoint | DlupShapelyPolygon]: - for layer in self._layers: - yield layer - - def __len__(self) -> int: - return len(self._layers) - - def __add__( - self, - other: WsiAnnotations | DlupShapelyPoint | DlupShapelyPolygon | list[DlupShapelyPoint | DlupShapelyPolygon], - ) -> WsiAnnotations: - if isinstance(other, (DlupShapelyPoint, DlupShapelyPolygon)): - other = [other] - - if isinstance(other, list): - if not all(isinstance(item, (DlupShapelyPoint, DlupShapelyPolygon)) for item in other): - raise TypeError("can only add list purely containing Point and Polygon objects to WsiAnnotations") - new_layers = self._layers + other - new_tags = self.tags - elif isinstance(other, WsiAnnotations): - if self.sorting != other.sorting or self.offset_to_slide_bounds != other.offset_to_slide_bounds: - raise ValueError( - "Both sorting and offset_to_slide_bounds must be the same to add WsiAnnotations together." - ) - new_layers = self._layers + other._layers - new_tags = self.tags if self.tags is not None else [] + other.tags if other.tags is not None else None - else: - return NotImplemented - return WsiAnnotations( - layers=new_layers, tags=new_tags, offset_to_slide_bounds=self.offset_to_slide_bounds, sorting=self.sorting - ) - - def __iadd__( - self, - other: WsiAnnotations | DlupShapelyPoint | DlupShapelyPolygon | list[DlupShapelyPoint | DlupShapelyPolygon], - ) -> WsiAnnotations: - if isinstance(other, (DlupShapelyPoint, DlupShapelyPolygon)): - other = [other] - - if isinstance(other, list): - if not all(isinstance(item, (DlupShapelyPoint, DlupShapelyPolygon)) for item in other): - raise TypeError("can only add list purely containing Point and Polygon objects to WsiAnnotations") - - self._layers += other - for item in other: - self._available_classes.add(item.annotation_class) - elif isinstance(other, WsiAnnotations): - if self.sorting != other.sorting or self.offset_to_slide_bounds != other.offset_to_slide_bounds: - raise ValueError( - "Both sorting and offset_to_slide_bounds must be the same to add WsiAnnotations together." - ) - self._layers += other._layers - - if self._tags is None: - self._tags = other._tags - elif other._tags is not None: - assert self - self._tags += other._tags - - self._available_classes.update(other.available_classes) - else: - return NotImplemented - self._sort_layers_in_place() - self._str_tree = STRtree(self._layers) - return self - - def __radd__( - self, - other: WsiAnnotations | DlupShapelyPoint | DlupShapelyPolygon | list[DlupShapelyPoint | DlupShapelyPolygon], - ) -> WsiAnnotations: - # in-place addition (+=) of Point and Polygon will raise a TypeError - if not isinstance(other, (WsiAnnotations, DlupShapelyPoint, DlupShapelyPolygon, list)): - return NotImplemented - if isinstance(other, list): - if not all(isinstance(item, (DlupShapelyPoint, DlupShapelyPolygon)) for item in other): - raise TypeError("can only add list purely containing Point and Polygon objects to WsiAnnotations") - raise TypeError( - "use the __add__ or __iadd__ operator instead of __radd__ when working with lists to avoid \ - unexpected behavior." - ) - return self + other - - def __sub__(self, other: WsiAnnotations | DlupShapelyPoint | DlupShapelyPolygon) -> WsiAnnotations: - return NotImplemented - - def __isub__(self, other: WsiAnnotations | DlupShapelyPoint | DlupShapelyPolygon) -> WsiAnnotations: - return NotImplemented - - def __rsub__(self, other: WsiAnnotations) -> WsiAnnotations: - return NotImplemented - - -def _parse_darwin_complex_polygon(annotation: dict[str, Any]) -> ShapelyMultiPolygon: - """ - Parse a complex polygon (i.e. polygon with holes) from a Darwin annotation. - Parameters - ---------- - annotation : dict - Returns - ------- - ShapelyMultiPolygon - """ - # Create Polygons and sort by area in descending order - polygons: list[ShapelyPolygon] = sorted( - [ShapelyPolygon([(p["x"], p["y"]) for p in path]) for path in annotation["paths"]], - key=lambda x: x.area, - reverse=True, - ) - outer_polygons: list[tuple[ShapelyPolygon, list[ShapelyPolygon], bool]] = [] - for polygon in polygons: - is_hole = False - # Check if the polygon can be a hole in any of the previously processed polygons - for outer_poly, holes, outer_poly_is_hole in reversed(outer_polygons): - contains = outer_poly.contains(polygon) - # If polygon is contained by a hole, it should be added as new polygon - if contains and outer_poly_is_hole: - break - # Polygon is added as hole if outer polygon is not a hole - elif contains: - holes.append(polygon.exterior.coords) - is_hole = True - break - outer_polygons.append((polygon, [], is_hole)) - - return ShapelyMultiPolygon( - [ - ShapelyPolygon(outer_poly.exterior.coords, holes) - for outer_poly, holes, _is_hole in outer_polygons - if not _is_hole - ] - ) - - -def _parse_asap_coordinates( - annotation_structure: ET.Element, - annotation_type: AnnotationType, - scaling: float | None, -) -> ShapelyTypes: - """ - Parse ASAP XML coordinates into Shapely objects. - - Parameters - ---------- - annotation_structure : list of strings - annotation_type : AnnotationType - The annotation type this structure is representing. - scaling : float - Scaling to apply to the coordinates - - Returns - ------- - Shapely object - - """ - coordinates = [] - coordinate_structure = annotation_structure[0] - - _scaling = 1.0 if not scaling else scaling - for coordinate in coordinate_structure: - coordinates.append( - ( - float(coordinate.get("X").replace(",", ".")) * _scaling, # type: ignore - float(coordinate.get("Y").replace(",", ".")) * _scaling, # type: ignore - ) - ) - - if annotation_type == AnnotationType.POLYGON: - coordinates = ShapelyPolygon(coordinates) - elif annotation_type == AnnotationType.BOX: - raise NotImplementedError - elif annotation_type == AnnotationType.POINT: - coordinates = shapely.geometry.MultiPoint(coordinates) - else: - raise AnnotationError(f"Annotation type not supported. Got {annotation_type}.") - - return coordinates diff --git a/dlup/annotations/__init__.py b/dlup/annotations/__init__.py new file mode 100644 index 00000000..a1de924f --- /dev/null +++ b/dlup/annotations/__init__.py @@ -0,0 +1,711 @@ +# Copyright (c) dlup contributors +""" +Experimental annotations module for dlup. + +""" +from __future__ import annotations + +import copy +import importlib +from enum import Enum +from typing import Any, Callable, Iterable, Optional, TypeVar + +import numpy as np +import numpy.typing as npt + +from dlup._geometry import AnnotationRegion # pylint: disable=no-name-in-module +from dlup._types import GenericNumber +from dlup.annotations.tags import SlideTag +from dlup.geometry import GeometryCollection, Point, Polygon + +_TSlideAnnotations = TypeVar("_TSlideAnnotations", bound="SlideAnnotations") + + +class AnnotationSorting(str, Enum): + """The ways to sort the annotations. This is used in the constructors of the `SlideAnnotations` class, and applied + to the output of `SlideAnnotations.read_region()`. + + - REVERSE: Sort the output in reverse order. + - AREA: Often when the annotation tools do not properly support hierarchical order, one would annotate in a way + that the smaller objects are on top of the larger objects. This option sorts the output by area, so that the + larger objects appear first in the output and then the smaller objects. + - Z_INDEX: Sort the output by the z-index of the annotations. This is useful when the annotations have a z-index + - NONE: Do not apply any sorting and output as is presented in the input file. + """ + + REVERSE = "REVERSE" + AREA = "AREA" + Z_INDEX = "Z_INDEX" + NONE = "NONE" + + def to_sorting_params(self) -> Any: + """Get the sorting parameters for the annotation sorting.""" + if self == AnnotationSorting.REVERSE: + return lambda x: None, True + + if self == AnnotationSorting.AREA: + return lambda x: x.area, False + + if self == AnnotationSorting.Z_INDEX: + return lambda x: x.get_field("z_index"), False + + +class SlideAnnotations: + """Class that holds all annotations for a specific image""" + + def __init__( + self, + layers: GeometryCollection, + tags: Optional[tuple[SlideTag, ...]] = None, + sorting: Optional[AnnotationSorting | str] = None, + **kwargs: Any, + ) -> None: + """ + Parameters + ---------- + layers : GeometryCollection + Geometry collection containing the polygons, boxes and points + tags: Optional[tuple[SlideTag, ...]] + A tuple of image-level tags such as staining quality + sorting: AnnotationSorting + Sorting method, see `AnnotationSorting`. This value is typically passed to the constructor + because of operations layer on (such as `__add__`). Typically the classmethod already sorts the data + **kwargs: Any + Additional keyword arguments. In this class they are used for additional metadata or offset functions. + Currently only HaloXML requires offsets. See `.from_halo_xml` for an example + """ + self._layers = layers + self._tags = tags + self._sorting = sorting + self._offset_function: bool = bool(kwargs.get("offset_function", False)) + self._metadata: Optional[dict[str, list[str] | str | int | float | bool]] = kwargs.get("metadata", None) + + @classmethod + def register_importer(cls, func: Optional[Callable[..., Any]], name: str) -> None: + """Register an importer function dynamically as a class method.""" + method_name = f"from_{name}" + + if hasattr(cls, method_name): + raise ValueError(f"Method `{method_name}` already registered.") + + # If no function is provided, assume it's an internal importer and load it dynamically + if func is None: + module_name = { + "geojson": ".importers.geojson", + "halo_xml": ".importers.halo_xml", + "asap_xml": ".importers.asap_xml", + "dlup_xml": ".importers.dlup_xml", + "darwin_json": ".importers.darwin_json", + }.get(name) + + if not module_name: + raise ValueError(f"No internal importer found for '{name}'") + + module = importlib.import_module(module_name, package=__name__) + func = getattr(module, f"{name}_importer") + + # Register the importer as a class method + setattr(cls, method_name, classmethod(func)) + + @classmethod + def register_exporter(cls, func: Optional[Callable[..., Any]], name: str) -> None: + """Register an exporter function dynamically as an instance method.""" + method_name = f"as_{name}" + + if hasattr(cls, method_name): + raise ValueError(f"Method `{method_name}` already registered.") + + # If no function is provided, assume it's an internal exporter and load it dynamically + if func is None: + module_name = { + "geojson": ".exporters.geojson", + "dlup_xml": ".exporters.dlup_xml", + }.get(name) + + if not module_name: + raise ValueError(f"No internal exporter found for '{name}'") + + module = importlib.import_module(module_name, package=__name__) + func = getattr(module, f"{name}_exporter") + + # Register the exporter as an instance method + setattr(cls, method_name, func) + + @property + def sorting(self) -> Optional[AnnotationSorting | str]: + return self._sorting + + @property + def tags(self) -> Optional[tuple[SlideTag, ...]]: + return self._tags + + @property + def num_polygons(self) -> int: + return len(self.layers.polygons) + + @property + def num_points(self) -> int: + return len(self.layers.points) + + @property + def num_boxes(self) -> int: + return len(self.layers.boxes) + + @property + def metadata(self) -> Optional[dict[str, list[str] | str | int | float | bool]]: + """Additional metadata for the annotations""" + return self._metadata + + @property + def offset_function(self) -> Any: + """ + In some cases a function needs to be applied to the coordinates which cannot be handled in this class as + it might require additional information. This function will be applied to the coordinates of all annotations. + This is useful from a file format which requires this, for instance HaloXML. + + Example + ------- + For HaloXML this is `offset = slide.slide_bounds[0] - slide.slide_bounds[0] % 256`. + >>> slide = Slide.from_file_path("image.svs") + >>> ann = SlideAnnotations.from_halo_xml("annotations.xml") + >>> assert ann.offset_function == lambda slide: slide.slide_bounds[0] - slide.slide_bounds[0] % 256 + >>> ann.set_offset(annotation.offset_function(slide)) + + Returns + ------- + Callable + + """ + return self._offset_function + + @offset_function.setter + def offset_function(self, func: Any) -> None: + self._offset_function = func + + @property + def layers(self) -> GeometryCollection: + """ + Get the layers of the annotations. + This is a GeometryCollection object which contains all the polygons and points + """ + return self._layers + + @staticmethod + def _in_place_sort_and_scale( + collection: GeometryCollection, scaling: Optional[float], sorting: Optional[AnnotationSorting | str] + ) -> None: + if sorting == "REVERSE": + raise NotImplementedError("This doesn't work for now.") + + if scaling != 1.0 and scaling is not None: + collection.scale(scaling) + if sorting == AnnotationSorting.NONE or sorting is None: + return + if isinstance(sorting, str): + key, reverse = AnnotationSorting[sorting].to_sorting_params() + else: + key, reverse = sorting.to_sorting_params() + collection.sort_polygons(key, reverse) + + @property + def bounding_box(self) -> tuple[tuple[float, float], tuple[float, float]]: + """Get the bounding box of the annotations combining points and polygons. + + Returns + ------- + tuple[tuple[float, float], tuple[float, float]] + The bounding box of the annotations. + + """ + return self._layers.bounding_box + + def bounding_box_at_scaling(self, scaling: float) -> tuple[tuple[float, float], tuple[float, float]]: + """Get the bounding box of the annotations at a specific scaling factor. + + Parameters + ---------- + scaling : float + The scaling factor to apply to the annotations. + + Returns + ------- + tuple[tuple[float, float], tuple[float, float]] + The bounding box of the annotations at the specific scaling factor. + + """ + bbox = self.bounding_box + return ((bbox[0][0] * scaling, bbox[0][1] * scaling), (bbox[1][0] * scaling, bbox[1][1] * scaling)) + + def simplify(self, tolerance: float) -> None: + """Simplify the polygons in the annotation (i.e. reduce points). Other annotations will remain unchanged. + All points in the resulting polygons object will be in the tolerance distance of the original polygon. + + Parameters + ---------- + tolerance : float + The tolerance to simplify the polygons with. + Returns + ------- + None + """ + self._layers.simplify(tolerance) + + def __contains__(self, item: str | Point | Polygon) -> bool: + if isinstance(item, str): + return item in self.available_classes + if isinstance(item, Point): + return item in self.layers.points + if isinstance(item, Polygon): + return item in self.layers.polygons + + return False + + def __len__(self) -> int: + return self._layers.size() + + @property + def available_classes(self) -> set[str]: + """Get the available classes in the annotations. + + Returns + ------- + set[str] + The available classes in the annotations. + + """ + available_classes = set() + for polygon in self.layers.polygons: + if polygon.label is not None: + available_classes.add(polygon.label) + for point in self.layers.points: + if point.label is not None: + available_classes.add(point.label) + + return available_classes + + def __iter__(self) -> Iterable[Polygon | Point]: + # First returns all the polygons then all points + for polygon in self.layers.polygons: + yield polygon + + for point in self.layers.points: + yield point + + def __add__(self, other: Any) -> "SlideAnnotations": + """ + Add two annotations together. This will return a new `SlideAnnotations` object with the annotations combined. + The polygons will be added from left to right followed the points from left to right. + + Notes + ----- + - The polygons and points are shared between the objects. This means that if you modify the polygons or points + in the new object, the original objects will also be modified. If you wish to avoid this, you must add two + copies together. + - Note that the sorting is not applied to this object. You can apply this by calling `sort_polygons()` on + the resulting object. + + Parameters + ---------- + other : SlideAnnotations + The other annotations to add. + + """ + if not isinstance(other, (SlideAnnotations, Point, Polygon, list)): + raise TypeError(f"Unsupported type {type(other)}") + + if isinstance(other, SlideAnnotations): + if not self.sorting == other.sorting: + raise TypeError("Cannot add annotations with different sorting.") + if self.offset_function != other.offset_function: + raise TypeError( + "Cannot add annotations with different requirements for offsetting to slide bounds " + "(`offset_function`)." + ) + + tags: tuple[SlideTag, ...] = () + if self.tags is None and other.tags is not None: + tags = other.tags + + if other.tags is None and self.tags is not None: + tags = self.tags + + if self.tags is not None and other.tags is not None: + tags = tuple(set(self.tags + other.tags)) + + # Let's add the annotations + collection = GeometryCollection() + for polygon in self.layers.polygons: + collection.add_polygon(copy.deepcopy(polygon)) + for point in self.layers.points: + collection.add_point(copy.deepcopy(point)) + + for polygon in other.layers.polygons: + collection.add_polygon(copy.deepcopy(polygon)) + for point in other.layers.points: + collection.add_point(copy.deepcopy(point)) + + SlideAnnotations._in_place_sort_and_scale(collection, None, self.sorting) + return self.__class__(layers=collection, tags=tuple(tags) if tags else None, sorting=self.sorting) + + if isinstance(other, (Point, Polygon)): + other = [other] + + if isinstance(other, list): + if not all(isinstance(item, (Point, Polygon)) for item in other): + raise TypeError( + f"can only add list purely containing Point and Polygon objects to {self.__class__.__name__}" + ) + + collection = copy.copy(self._layers) + for item in other: + if isinstance(item, Polygon): + collection.add_polygon(item) + elif isinstance(item, Point): + collection.add_point(item) + SlideAnnotations._in_place_sort_and_scale(collection, None, self.sorting) + return self.__class__(layers=collection, tags=copy.copy(self._tags), sorting=self.sorting) + + raise ValueError(f"Unsupported type {type(other)}") + + def __iadd__(self, other: Any) -> "SlideAnnotations": + if isinstance(other, (Point, Polygon)): + other = [other] + + if isinstance(other, list): + if not all(isinstance(item, (Point, Polygon)) for item in other): + raise TypeError( + f"can only add list purely containing Point and Polygon objects {self.__class__.__name__}" + ) + + for item in other: + if isinstance(item, Polygon): + self._layers.add_polygon(copy.deepcopy(item)) + elif isinstance(item, Point): + self._layers.add_point(copy.deepcopy(item)) + + elif isinstance(other, SlideAnnotations): + if self.sorting != other.sorting or self.offset_function != other.offset_function: + raise ValueError( + f"Both sorting and offset_function must be the same to " f"add {self.__class__.__name__}s together." + ) + + if self._tags is None: + self._tags = other._tags + elif other._tags is not None: + assert self + self._tags += other._tags + + for polygon in other.layers.polygons: + self._layers.add_polygon(copy.deepcopy(polygon)) + for point in other.layers.points: + self._layers.add_point(copy.deepcopy(point)) + else: + return NotImplemented + SlideAnnotations._in_place_sort_and_scale(self._layers, None, self.sorting) + + return self + + def __radd__(self, other: Any) -> "SlideAnnotations": + # in-place addition (+=) of Point and Polygon will raise a TypeError + if not isinstance(other, (SlideAnnotations, Point, Polygon, list)): + raise TypeError(f"Unsupported type {type(other)}") + if isinstance(other, list): + if not all(isinstance(item, (Polygon, Point)) for item in other): + raise TypeError( + f"can only add list purely containing Point and Polygon objects to {self.__class__.__name__}" + ) + raise TypeError( + "use the __add__ or __iadd__ operator instead of __radd__ when working with lists to avoid \ + unexpected behavior." + ) + return self + other + + def __sub__(self, other: Any) -> "SlideAnnotations": + return NotImplemented + + def __isub__(self, other: Any) -> "SlideAnnotations": + return NotImplemented + + def __rsub__(self, other: Any) -> "SlideAnnotations": + return NotImplemented + + def __eq__(self, other: Any) -> bool: + if not isinstance(other, SlideAnnotations): + return False + + our_sorting = self._sorting if self._sorting != AnnotationSorting.NONE else None + other_sorting = other._sorting if other._sorting != AnnotationSorting.NONE else None + + if our_sorting != other_sorting: + return False + + if self._tags != other._tags: + return False + + if self._layers != other._layers: + return False + + if self._offset_function != other._offset_function: + return False + + return True + + def read_region( + self, + coordinates: tuple[GenericNumber, GenericNumber], + scaling: float, + size: tuple[GenericNumber, GenericNumber], + ) -> AnnotationRegion: + """Reads the region of the annotations. Function signature is the same as `dlup.SlideImage` + so they can be used in conjunction. + + The process is as follows: + + 1. All the annotations which overlap with the requested region of interest are filtered + 2. The polygons in the GeometryContainer in `.layers` are cropped. + The boxes and points are only filtered, so it's possible the boxes have negative (x, y) values + 3. The annotation is rescaled and shifted to the origin to match the local patch coordinate system. + + The final returned data is a `dlup.geometry.AnnotationRegion`. + + Parameters + ---------- + location: tuple[GenericNumber, GenericNumber] + Top-left coordinates of the region in the requested scaling + size : tuple[GenericNumber, GenericNumber] + Output size of the region + scaling : float + Scaling to apply compared to the base level + + Returns + ------- + AnnotationRegion + + Examples + -------- + 1. To read geojson annotations and convert them into masks: + + >>> from pathlib import Path + >>> from dlup import SlideImage + >>> import numpy as np + >>> wsi = SlideImage.from_file_path(Path("path/to/image.svs")) + >>> wsi = wsi.get_scaled_view(scaling=0.5) + >>> wsi = wsi.read_region(location=(0,0), size=wsi.size) + >>> annotations = SlideAnnotations.from_geojson("path/to/geojson.json") + >>> region = annotations.read_region((0,0), 0.01, wsi.size) + >>> mask = region.to_mask() + >>> color_mask = annotations.color_lut[mask] + >>> polygons = region.polygons.get_geometries() # This is a list of `dlup.geometry.Polygon` objects + """ + region = self._layers.read_region(coordinates, scaling, size) + return region + + def scale(self, scaling: float) -> None: + """ + Scale the annotations by a multiplication factor. + This operation will be performed in-place. + + Parameters + ---------- + scaling : float + The scaling factor to apply to the annotations. + + Notes + ----- + This invalidates the R-tree. You could rebuild this manually using `.rebuild_rtree()`, or have the function + `read_region()` do it for you on-demand. + + Returns + ------- + None + """ + self._layers.scale(scaling) + + def set_offset(self, offset: tuple[float, float]) -> None: + """Set the offset for the annotations. This operation will be performed in-place. + + For example, if the offset is 1, 1, the annotations will be moved by 1 unit in the x and y direction. + + Parameters + ---------- + offset : tuple[float, float] + The offset to apply to the annotations. + + Notes + ----- + This invalidates the R-tree. You could rebuild this manually using `.rebuild_rtree()`, or have the function + `read_region()` do it for you on-demand. + + Returns + ------- + None + """ + self._layers.set_offset(offset) + + def rebuild_rtree(self) -> None: + """ + Rebuild the R-tree for the annotations. This operation will be performed in-place. + The R-tree is used for fast spatial queries on the annotations and is invalidated when the annotations are + modified. This function will rebuild the R-tree. Strictly speaking, this is not required as the R-tree will be + rebuilt on-demand when you invoke a `read_region()`. You could however do this if you want to avoid + the `read_region()` to do it for you the first time it runs. + """ + + self._layers.rebuild_rtree() + + def reindex_polygons(self, index_map: dict[str, int]) -> None: + """ + Reindex the polygons in the annotations. This operation will be performed in-place. + This is useful if you want to change the index of the polygons in the annotations. + + This requires that the `.label` property on the polygons is set. + + Parameters + ---------- + index_map : dict[str, int] + A dictionary that maps the label to the new index. + + Returns + ------- + None + """ + self._layers.reindex_polygons(index_map) + + def relabel_polygons(self, relabel_map: dict[str, str]) -> None: + """ + Relabel the polygons in the annotations. This operation will be performed in-place. + + Parameters + ---------- + relabel_map : dict[str, str] + A dictionary that maps the label to the new label. + + Returns + ------- + None + """ + # TODO: Implement in C++ + for polygon in self._layers.polygons: + if not polygon.label: + continue + if polygon.label in relabel_map: + polygon.label = relabel_map[polygon.label] + + def filter_polygons(self, label: str) -> None: + """Filter polygons in-place. + + Note + ---- + This will internally invalidate the R-tree. You could rebuild this manually using `.rebuild_rtree()`, or + have the function itself do this on-demand (typically when you invoke a `.read_region()`) + + Parameters + ---------- + label : str + The label to filter. + + """ + for polygon in self._layers.polygons: + if polygon.label == label: + self._layers.remove_polygon(polygon) + + def filter_points(self, label: str) -> None: + """Filter points in-place. + + Note + ---- + This will internally invalidate the R-tree. You could rebuild this manually using `.rebuild_rtree()`, or + have the function itself do this on-demand (typically when you invoke a `.read_region()`) + + Parameters + ---------- + label : str + The label to filter. + + """ + for point in self._layers.points: + if point.label == label: + self._layers.remove_point(point) + + def filter(self, label: str) -> None: + """Filter annotations in-place. + + Note + ---- + This will internally invalidate the R-tree. You could rebuild this manually using `.rebuild_rtree()`, or + have the function itself do this on-demand (typically when you invoke a `.read_region()`) + + Parameters + ---------- + label : str + The label to filter. + + """ + self.filter_polygons(label) + self.filter_points(label) + + def sort_polygons(self, key: Callable[[Polygon], int | float | str], reverse: bool = False) -> None: + """Sort the polygons in-place. + + Parameters + ---------- + key : callable + The key to sort the polygons on, this has to be a lambda function or similar. + For instance `lambda polygon: polygon.area` will sort the polygons on the area, or + `lambda polygon: polygon.get_field(field_name)` will sort the polygons on that field. + reverse : bool + Whether to sort in reverse order. + + Note + ---- + This will internally invalidate the R-tree. You could rebuild this manually using `.rebuild_rtree()`, or + have the function itself do this on-demand (typically when you invoke a `.read_region()`) + + Returns + ------- + None + + """ + self._layers.sort_polygons(key, reverse) + + @property + def color_lut(self) -> npt.NDArray[np.uint8]: + """Get the color lookup table for the annotations. + + Requires that the polygons have an index and color set. Be aware that for the background always + the value 0 is assumed. + So if you are using the `to_mask(default_value=0)` with a default value other than 0, + the LUT will still have this as index 0. + + Example + ------- + >>> color_lut = annotations.color_lut + >>> region = annotations.read_region(region_start, 0.02, region_size).to_mask() + >>> colored_mask = PIL.Image.fromarray(color_lut[mask]) + + Returns + ------- + np.ndarray + The color lookup table. + + """ + return self._layers.color_lut + + def __copy__(self) -> "SlideAnnotations": + return self.__class__(layers=copy.copy(self._layers), tags=copy.copy(self._tags)) + + def __deepcopy__(self, memo: dict[int, Any]) -> "SlideAnnotations": + return self.__class__(layers=copy.deepcopy(self._layers, memo), tags=copy.deepcopy(self._tags, memo)) + + def copy(self) -> "SlideAnnotations": + return self.__copy__() + + +SlideAnnotations.register_importer(None, "geojson") +SlideAnnotations.register_importer(None, "halo_xml") +SlideAnnotations.register_importer(None, "asap_xml") +SlideAnnotations.register_importer(None, "dlup_xml") +SlideAnnotations.register_importer(None, "darwin_json") + +SlideAnnotations.register_exporter(None, "geojson") +SlideAnnotations.register_exporter(None, "dlup_xml") diff --git a/dlup/annotations/exporters/__init__.py b/dlup/annotations/exporters/__init__.py new file mode 100644 index 00000000..8d73ff9f --- /dev/null +++ b/dlup/annotations/exporters/__init__.py @@ -0,0 +1 @@ +# Copyright (c) dlup contributors diff --git a/dlup/annotations/exporters/dlup_xml.py b/dlup/annotations/exporters/dlup_xml.py new file mode 100644 index 00000000..5483c727 --- /dev/null +++ b/dlup/annotations/exporters/dlup_xml.py @@ -0,0 +1,86 @@ +# Copyright (c) dlup contributors +from datetime import datetime +from typing import Optional + +from xsdata.formats.dataclass.serializers import XmlSerializer +from xsdata.formats.dataclass.serializers.config import SerializerConfig +from xsdata.models.datatype import XmlDate + +import dlup +from dlup.utils.annotations_utils import rgb_to_hex +from dlup.utils.geometry_xml import create_xml_geometries, create_xml_rois +from dlup.utils.schemas.generated import DlupAnnotations as XMLDlupAnnotations +from dlup.utils.schemas.generated import Metadata as XMLMetadata +from dlup.utils.schemas.generated import Tag as XMLTag +from dlup.utils.schemas.generated import Tags as XMLTags + + +def dlup_xml_exporter( + cls: "dlup.annotations.SlideAnnotations", + image_id: Optional[str] = None, + description: Optional[str] = None, + version: Optional[str] = None, + authors: Optional[list[str]] = None, + indent: Optional[int] = 2, +) -> str: + """ + Output the annotations as DLUP XML. + This format supports the complete serialization of a SlideAnnotations object. + + Parameters + ---------- + image_id : str, optional + The image ID corresponding to this annotation. + description : str, optional + Description of the annotations. + version : str, optional + Version of the annotations. + authors : list[str], optional + Authors of the annotations. + indent : int, optional + Indent for pretty printing. + + Returns + ------- + str + The output as a DLUP XML string. + """ + + metadata = XMLMetadata( + image_id=image_id if image_id is not None else "", + description=description if description is not None else "", + version=version if version is not None else "", + authors=XMLMetadata.Authors(authors) if authors is not None else None, + date_created=XmlDate.from_string(datetime.now().strftime("%Y-%m-%d")), + software=f"dlup {dlup.__version__}", + ) + xml_tags: list[XMLTag] = [] + if cls.tags: + for tag in cls.tags: + if tag.attributes: + attrs = [ + XMLTag.Attribute(value=_.label, color=rgb_to_hex(*_.color) if _.color else None) + for _ in tag.attributes + ] + xml_tag = XMLTag( + attribute=attrs if tag.attributes else [], + label=tag.label, + color=rgb_to_hex(*tag.color) if tag.color else None, + ) + xml_tags.append(xml_tag) + + tags = XMLTags(tag=xml_tags) if xml_tags else None + + geometries = create_xml_geometries(cls._layers) + rois = create_xml_rois(cls._layers) + + extra_annotation_params: dict[str, XMLTags] = {} + if tags: + extra_annotation_params["tags"] = tags + + dlup_annotations = XMLDlupAnnotations( + metadata=metadata, geometries=geometries, regions_of_interest=rois, **extra_annotation_params + ) + config = SerializerConfig(pretty_print=True) + serializer = XmlSerializer(config=config) + return serializer.render(dlup_annotations) diff --git a/dlup/annotations/exporters/geojson.py b/dlup/annotations/exporters/geojson.py new file mode 100644 index 00000000..1d3a93f0 --- /dev/null +++ b/dlup/annotations/exporters/geojson.py @@ -0,0 +1,104 @@ +# Copyright (c) dlup contributors +import warnings +from typing import Any, Optional, TypedDict + +import dlup +from dlup.geometry import Point, Polygon + + +class GeoJsonDict(TypedDict): + """ + TypedDict for standard GeoJSON output + """ + + id: str | None + type: str + features: list[dict[str, str | dict[str, str]]] + metadata: Optional[dict[str, str | list[str]]] + + +def _geometry_to_geojson( + geometry: Polygon | Point, label: str | None, color: tuple[int, int, int] | None +) -> dict[str, Any]: + """Function to convert a geometry to a GeoJSON object. + + Parameters + ---------- + geometry : DlupPolygon | DlupPoint + A polygon or point object + label : str + The label name + color : tuple[int, int, int] + The color of the object in RGB values + + Returns + ------- + dict[str, Any] + Output dictionary representing the data in GeoJSON + + """ + geojson: dict[str, Any] = { + "type": "Feature", + "properties": { + "classification": { + "name": label, + }, + }, + "geometry": {}, + } + + if isinstance(geometry, Polygon): + # Construct the coordinates for the polygon + exterior = geometry.get_exterior() # Get exterior coordinates + interiors = geometry.get_interiors() # Get interior coordinates (holes) + + # GeoJSON requires [ [x1, y1], [x2, y2], ... ] format + geojson["geometry"] = { + "type": "Polygon", + "coordinates": [[list(coord) for coord in exterior]] # Exterior ring + + [[list(coord) for coord in interior] for interior in interiors], # Interior rings (holes) + } + + elif isinstance(geometry, Point): + # Construct the coordinates for the point + geojson["geometry"] = { + "type": "Point", + "coordinates": [geometry.x, geometry.y], + } + else: + raise ValueError(f"Unsupported geometry type {type(geometry)}") + + if color is not None: + classification: dict[str, Any] = geojson["properties"]["classification"] + classification["color"] = color + + return geojson + + +def geojson_exporter(cls: "dlup.annotations.SlideAnnotations") -> GeoJsonDict: + """ + Output the annotations as proper geojson. These outputs are sorted according to the `AnnotationSorting` selected + for the annotations. This ensures the annotations are correctly sorted in the output. + + The output is not completely GeoJSON compliant as some parts such as the metadata and properties are not part + of the standard. However, these are implemented to ensure the output is compatible with QuPath. + + Returns + ------- + GeoJsonDict + The output as a GeoJSON dictionary. + """ + data: GeoJsonDict = {"type": "FeatureCollection", "metadata": None, "features": [], "id": None} + if cls.tags: + data["metadata"] = {"tags": [_.label for _ in cls.tags]} + + if cls.layers.boxes: + warnings.warn("Bounding boxes are not supported in GeoJSON and will be skipped.", UserWarning) + + all_layers = cls.layers.polygons + cls.layers.points + for idx, curr_annotation in enumerate(all_layers): + json_dict = _geometry_to_geojson(curr_annotation, label=curr_annotation.label, color=curr_annotation.color) + json_dict["id"] = str(idx) + data["features"].append(json_dict) + + return data diff --git a/dlup/annotations/importers/asap_xml.py b/dlup/annotations/importers/asap_xml.py new file mode 100644 index 00000000..2b6f7367 --- /dev/null +++ b/dlup/annotations/importers/asap_xml.py @@ -0,0 +1,112 @@ +# Copyright (c) dlup contributors +""" +Experimental annotations module for dlup. + +""" +from __future__ import annotations + +import errno +import os +import pathlib +import xml.etree.ElementTree as ET +from typing import Optional, Type, TypeVar + +from dlup._types import PathLike +from dlup.annotations import AnnotationSorting, SlideAnnotations +from dlup.geometry import GeometryCollection, Point, Polygon +from dlup.utils.annotations_utils import hex_to_rgb + +_TSlideAnnotations = TypeVar("_TSlideAnnotations", bound="SlideAnnotations") + + +def _parse_asap_coordinates( + annotation_structure: ET.Element, +) -> list[tuple[float, float]]: + """ + Parse ASAP XML coordinates into list. + + Parameters + ---------- + annotation_structure : list of strings + + Returns + ------- + list[tuple[float, float]] + + """ + coordinates = [] + coordinate_structure = annotation_structure[0] + + for coordinate in coordinate_structure: + coordinates.append( + ( + float(coordinate.get("X").replace(",", ".")), # type: ignore + float(coordinate.get("Y").replace(",", ".")), # type: ignore + ) + ) + + return coordinates + + +def asap_xml_importer( + cls: Type[_TSlideAnnotations], + asap_xml: PathLike, + scaling: float | None = None, + sorting: AnnotationSorting | str = AnnotationSorting.AREA, + roi_names: Optional[list[str]] = None, +) -> _TSlideAnnotations: + """ + Read annotations as an ASAP [1] XML file. ASAP is a tool for viewing and annotating whole slide images. + + Parameters + ---------- + asap_xml : PathLike + Path to ASAP XML annotation file. + scaling : float, optional + Scaling factor. Sometimes required when ASAP annotations are stored in a different resolution than the + original image. + sorting: AnnotationSorting + The sorting to apply to the annotations. Check the `AnnotationSorting` enum for more information. + By default, the annotations are sorted by area. + roi_names : list[str], optional + List of names that should be considered as regions of interest. If set, these will be added as ROIs rather + than polygons. + + References + ---------- + .. [1] https://github.com/computationalpathologygroup/ASAP + + Returns + ------- + SlideAnnotations + """ + path = pathlib.Path(asap_xml) + roi_names = [] if roi_names is None else roi_names + if not path.exists(): + raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), str(path)) + + tree = ET.parse(asap_xml) + opened_annotation = tree.getroot() + collection: GeometryCollection = GeometryCollection() + for parent in opened_annotation: + for child in parent: + if child.tag != "Annotation": + continue + label = child.attrib.get("PartOfGroup").strip() # type: ignore + color = hex_to_rgb(child.attrib.get("Color").strip()) # type: ignore + + annotation_type = child.attrib.get("Type").lower() # type: ignore + coordinates = _parse_asap_coordinates(child) + + if annotation_type == "pointset": + for point in coordinates: + collection.add_point(Point(point, label=label, color=color)) + + elif annotation_type == "polygon": + if label in roi_names: + collection.add_roi(Polygon(coordinates, [], label=label, color=color)) + else: + collection.add_polygon(Polygon(coordinates, [], label=label, color=color)) + + SlideAnnotations._in_place_sort_and_scale(collection, scaling, sorting) + return cls(layers=collection) diff --git a/dlup/annotations/importers/darwin_json.py b/dlup/annotations/importers/darwin_json.py new file mode 100644 index 00000000..7dfd3168 --- /dev/null +++ b/dlup/annotations/importers/darwin_json.py @@ -0,0 +1,246 @@ +# Copyright (c) dlup contributors +""" +Experimental annotations module for dlup. + +""" +from __future__ import annotations + +import errno +import functools +import os +import pathlib +import warnings +from enum import Enum +from typing import Any, Iterable, NamedTuple, Optional, Type, TypeVar + +from dlup._types import PathLike +from dlup.annotations import AnnotationSorting, SlideAnnotations +from dlup.annotations.tags import SlideTag, TagAttribute +from dlup.geometry import GeometryCollection, Point, Polygon +from dlup.utils.imports import DARWIN_SDK_AVAILABLE + +_TSlideAnnotations = TypeVar("_TSlideAnnotations", bound="SlideAnnotations") + + +class AnnotationType(str, Enum): + POINT = "POINT" + BOX = "BOX" + POLYGON = "POLYGON" + TAG = "TAG" + RASTER = "RASTER" + + +class DarwinV7Metadata(NamedTuple): + label: str + color: Optional[tuple[int, int, int]] + annotation_type: AnnotationType + + +@functools.lru_cache(maxsize=None) +def get_v7_metadata(filename: pathlib.Path) -> Optional[dict[tuple[str, str], DarwinV7Metadata]]: + if not DARWIN_SDK_AVAILABLE: + raise ImportError("`darwin` is not available. Install using `python -m pip install darwin-py`.") + import darwin.path_utils + + if not filename.is_dir(): + raise ValueError("Provide the path to the root folder of the Darwin V7 annotations") + + v7_metadata_fn = filename / ".v7" / "metadata.json" + if not v7_metadata_fn.exists(): + return None + v7_metadata = darwin.path_utils.parse_metadata(v7_metadata_fn) + output = {} + for sample in v7_metadata["classes"]: + annotation_type = sample["type"] + # This is not implemented and can be skipped. The main function will raise a NonImplementedError + if annotation_type == "raster_layer": + continue + + label = sample["name"] + color = sample["color"][5:-1].split(",") + if color[-1] != "1.0": + raise RuntimeError("Expected A-channel of color to be 1.0") + rgb_colors = (int(color[0]), int(color[1]), int(color[2])) + + output[(label, annotation_type)] = DarwinV7Metadata( + label=label, color=rgb_colors, annotation_type=annotation_type + ) + return output + + +def _parse_darwin_complex_polygon( + annotation: dict[str, Any], label: str, color: Optional[tuple[int, int, int]] +) -> Iterable[Polygon]: + """ + Parse a complex polygon (i.e. polygon with holes) from a Darwin annotation. + + Parameters + ---------- + annotation : dict + The annotation dictionary + label : str + The label of the annotation + color : tuple[int, int, int] + The color of the annotation + + Returns + ------- + Iterable[DlupPolygon] + """ + # Create Polygons and sort by area in descending order + polygons = [Polygon([(p["x"], p["y"]) for p in path], []) for path in annotation["paths"]] + polygons.sort(key=lambda x: x.area, reverse=True) + + outer_polygons: list[tuple[Polygon, list[Any], bool]] = [] + for polygon in polygons: + is_hole = False + # Check if the polygon can be a hole in any of the previously processed polygons + for outer_poly, holes, outer_poly_is_hole in reversed(outer_polygons): + contains = outer_poly.contains(polygon) + # If polygon is contained by a hole, it should be added as new polygon + if contains and outer_poly_is_hole: + break + # Polygon is added as hole if outer polygon is not a hole + elif contains: + holes.append(polygon.get_exterior()) + is_hole = True + break + outer_polygons.append((polygon, [], is_hole)) + + for outer_poly, holes, _is_hole in outer_polygons: + if not _is_hole: + polygon = Polygon(outer_poly.get_exterior(), holes) + polygon.label = label + polygon.color = color + yield polygon + + +def darwin_json_importer( + cls: Type[_TSlideAnnotations], + darwin_json: PathLike, + scaling: float | None = None, + sorting: AnnotationSorting | str = AnnotationSorting.NONE, + z_indices: Optional[dict[str, int]] = None, + roi_names: Optional[list[str]] = None, +) -> _TSlideAnnotations: + """ + Read annotations as a V7 Darwin [1] JSON file. If available will read the `.v7/metadata.json` file to extract + colors from the annotations. + + Parameters + ---------- + darwin_json : PathLike + Path to the Darwin JSON file. + sorting: AnnotationSorting + The sorting to apply to the annotations. Check the `AnnotationSorting` enum for more information. + By default, the annotations are sorted by the z-index which is generated by the order of the saved + annotations. + scaling : float, optional + Scaling factor. Sometimes required when Darwin annotations are stored in a different resolution + than the original image. + z_indices: dict[str, int], optional + If set, these z_indices will be used rather than the default order. + roi_names : list[str], optional + List of names that should be considered as regions of interest. If set, these will be added as ROIs rather + than polygons. + + References + ---------- + .. [1] https://darwin.v7labs.com/ + + Returns + ------- + SlideAnnotations + + """ + if not DARWIN_SDK_AVAILABLE: + raise RuntimeError("`darwin` is not available. Install using `python -m pip install darwin-py`.") + import darwin + + roi_names = [] if roi_names is None else roi_names + + darwin_json_fn = pathlib.Path(darwin_json) + if not darwin_json_fn.exists(): + raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), str(darwin_json_fn)) + + darwin_an = darwin.utils.parse_darwin_json(darwin_json_fn, None) + v7_metadata = get_v7_metadata(darwin_json_fn.parent) + + tags = [] + polygons: list[tuple[Polygon, int]] = [] + + collection = GeometryCollection() + for curr_annotation in darwin_an.annotations: + name = curr_annotation.annotation_class.name + annotation_type = curr_annotation.annotation_class.annotation_type + if annotation_type == "raster_layer": + raise NotImplementedError("Raster annotations are not supported.") + + annotation_color = v7_metadata[(name, annotation_type)].color if v7_metadata else None + + if annotation_type == "tag": + attributes = [] + if curr_annotation.subs: + for subannotation in curr_annotation.subs: + if subannotation.annotation_type == "attributes": + attributes.append(TagAttribute(label=subannotation.data, color=None)) + + tags.append( + SlideTag( + attributes=attributes if attributes != [] else None, + label=name, + color=annotation_color if annotation_color else None, + ) + ) + continue + + z_index = 0 if annotation_type == "keypoint" or z_indices is None else z_indices[name] + curr_data = curr_annotation.data + + if annotation_type == "keypoint": + x, y = curr_data["x"], curr_data["y"] + curr_point = Point(curr_data["x"], curr_data["y"]) + curr_point.label = name + curr_point.color = annotation_color + collection.add_point(curr_point) + + elif annotation_type in ("polygon", "complex_polygon"): + if "path" in curr_data: # This is a regular polygon + curr_polygon = Polygon( + [(_["x"], _["y"]) for _ in curr_data["path"]], [], label=name, color=annotation_color + ) + polygons.append((curr_polygon, z_index)) + + elif "paths" in curr_data: # This is a complex polygon which needs to be parsed with the even-odd rule + for curr_polygon in _parse_darwin_complex_polygon(curr_data, label=name, color=annotation_color): + polygons.append((curr_polygon, z_index)) + else: + raise ValueError(f"Got unexpected data keys: {curr_data.keys()}") + elif annotation_type == "bounding_box": + warnings.warn( + "Bounding box annotations are not fully supported and will be converted to Polygons.", UserWarning + ) + x, y, w, h = curr_data["x"], curr_data["y"], curr_data["w"], curr_data["h"] + curr_polygon = Polygon( + [(x, y), (x + w, y), (x + w, y + h), (x, y + h)], [], label=name, color=annotation_color + ) + polygons.append((curr_polygon, z_index)) + + else: + raise ValueError(f"Annotation type {annotation_type} is not supported.") + + if sorting == "Z_INDEX": + for polygon, _ in sorted(polygons, key=lambda x: x[1]): + if polygon.label in roi_names: + collection.add_roi(polygon) + else: + collection.add_polygon(polygon) + else: + for polygon, _ in polygons: + if polygon.label in roi_names: + collection.add_roi(polygon) + else: + collection.add_polygon(polygon) + + SlideAnnotations._in_place_sort_and_scale(collection, scaling, sorting="NONE" if sorting == "Z_INDEX" else sorting) + return cls(layers=collection, tags=tuple(tags), sorting=sorting) diff --git a/dlup/annotations/importers/dlup_xml.py b/dlup/annotations/importers/dlup_xml.py new file mode 100644 index 00000000..178fa8e6 --- /dev/null +++ b/dlup/annotations/importers/dlup_xml.py @@ -0,0 +1,148 @@ +import errno +import os +import pathlib +import xml.etree.ElementTree as ET +from dataclasses import asdict +from typing import Type, TypeVar + +from xsdata.formats.dataclass.parsers import XmlParser + +import dlup +from dlup._types import PathLike +from dlup.annotations.tags import SlideTag +from dlup.geometry import Box, GeometryCollection, Point, Polygon +from dlup.utils.annotations_utils import hex_to_rgb +from dlup.utils.geometry_xml import parse_dlup_xml_polygon, parse_dlup_xml_roi_box +from dlup.utils.schemas.generated import DlupAnnotations as XMLDlupAnnotations + +_TSlideAnnotations = TypeVar("_TSlideAnnotations", bound="dlup.annotations.SlideAnnotations") + + +def _parse_asap_coordinates( + annotation_structure: ET.Element, +) -> list[tuple[float, float]]: + """ + Parse ASAP XML coordinates into list. + + Parameters + ---------- + annotation_structure : list of strings + + Returns + ------- + list[tuple[float, float]] + + """ + coordinates = [] + coordinate_structure = annotation_structure[0] + + for coordinate in coordinate_structure: + coordinates.append( + ( + float(coordinate.get("X").replace(",", ".")), # type: ignore + float(coordinate.get("Y").replace(",", ".")), # type: ignore + ) + ) + + return coordinates + + +def dlup_xml_importer(cls: Type[_TSlideAnnotations], dlup_xml: PathLike) -> _TSlideAnnotations: + """ + Read annotations as a DLUP XML file. + + Parameters + ---------- + dlup_xml : PathLike + Path to the DLUP XML file. + + Returns + ------- + SlideAnnotations + """ + path = pathlib.Path(dlup_xml) + if not path.exists(): + raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), str(path)) + + parser = XmlParser() + with open(dlup_xml, "rb") as f: + dlup_annotations = parser.from_bytes(f.read(), XMLDlupAnnotations) + + metadata = None if not dlup_annotations.metadata else asdict(dlup_annotations.metadata) + tags: list[SlideTag] = [] + if dlup_annotations.tags: + for tag in dlup_annotations.tags.tag: + if not tag.label: + raise ValueError("Tag does not have a label.") + curr_tag = SlideTag(attributes=[], label=tag.label, color=hex_to_rgb(tag.color) if tag.color else None) + tags.append(curr_tag) + + collection = GeometryCollection() + polygons: list[tuple[Polygon, int]] = [] + if not dlup_annotations.geometries: + return cls(layers=collection, tags=tuple(tags)) + + if dlup_annotations.geometries.polygon: + polygons += parse_dlup_xml_polygon(dlup_annotations.geometries.polygon) + + # Complain if there are multipolygons + if dlup_annotations.geometries.multi_polygon: + for curr_polygons in dlup_annotations.geometries.multi_polygon: + polygons += parse_dlup_xml_polygon( + curr_polygons.polygon, + order=curr_polygons.order, + label=curr_polygons.label, + index=curr_polygons.index, + ) + + # Now we sort the polygons on order + for polygon, _ in sorted(polygons, key=lambda x: x[1]): + collection.add_polygon(polygon) + + for curr_point in dlup_annotations.geometries.point: + point = Point( + curr_point.x, + curr_point.y, + label=curr_point.label, + color=hex_to_rgb(curr_point.color) if curr_point.color else None, + ) + collection.add_point(point) + + # Complain if there are multipoints + if dlup_annotations.geometries.multi_point: + raise NotImplementedError("Multipoints are not supported.") + + for curr_box in dlup_annotations.geometries.box: + # mypy struggles + assert isinstance(curr_box.x_min, float) + assert isinstance(curr_box.y_min, float) + assert isinstance(curr_box.x_max, float) + assert isinstance(curr_box.y_max, float) + box = Box( + (curr_box.x_min, curr_box.y_min), + (curr_box.x_max - curr_box.x_min, curr_box.y_max - curr_box.y_min), + label=curr_box.label, + color=hex_to_rgb(curr_box.color) if curr_box.color else None, + ) + collection.add_box(box) + + rois: list[tuple[Polygon, int]] = [] + if dlup_annotations.regions_of_interest: + for region_of_interest in dlup_annotations.regions_of_interest.multi_polygon: + raise NotImplementedError( + "MultiPolygon regions of interest are not yet supported. " + "If you have a use case for this, " + "please open an issue at https://github.com/NKI-AI/dlup/issues." + ) + + if dlup_annotations.regions_of_interest.polygon: + rois += parse_dlup_xml_polygon(dlup_annotations.regions_of_interest.polygon) + + if dlup_annotations.regions_of_interest.box: + for _curr_box in dlup_annotations.regions_of_interest.box: + box, curr_order = parse_dlup_xml_roi_box(_curr_box) + rois.append((box.as_polygon(), curr_order)) + for roi, _ in sorted(rois, key=lambda x: x[1]): + collection.add_roi(roi) + + return cls(layers=collection, tags=tuple(tags), metadata=metadata) diff --git a/dlup/annotations/importers/geojson.py b/dlup/annotations/importers/geojson.py new file mode 100644 index 00000000..fbad2029 --- /dev/null +++ b/dlup/annotations/importers/geojson.py @@ -0,0 +1,131 @@ +# Copyright (c) dlup contributors +""" +Experimental annotations module for dlup. + +""" +from __future__ import annotations + +import errno +import json +import os +import pathlib +from typing import Optional, Type, TypedDict, TypeVar + +import numpy as np + +import dlup +from dlup._exceptions import AnnotationError +from dlup._types import PathLike +from dlup.annotations import AnnotationSorting, SlideAnnotations +from dlup.geometry import GeometryCollection, Point, Polygon +from dlup.utils.annotations_utils import get_geojson_color + +_TSlideAnnotations = TypeVar("_TSlideAnnotations", bound="dlup.annotations.SlideAnnotations") + + +class CoordinatesDict(TypedDict): + type: str + coordinates: list[list[list[float]]] + + +def geojson_to_dlup( + coordinates: CoordinatesDict, + label: str, + color: Optional[tuple[int, int, int]] = None, + z_index: Optional[int] = None, +) -> list[Polygon | Point]: + geom_type = coordinates.get("type", None) + if geom_type is None: + raise ValueError("No type found in coordinates.") + geom_type = geom_type.lower() + + if geom_type in ["point", "multipoint"] and z_index is not None: + raise AnnotationError("z_index is not supported for point annotations.") + + if geom_type == "point": + x, y = np.asarray(coordinates["coordinates"]) + return [Point(*(x, y), label=label, color=color)] + + if geom_type == "multipoint": + return [Point(*np.asarray(c).tolist(), label=label, color=color) for c in coordinates["coordinates"]] + + if geom_type == "polygon": + _coordinates = coordinates["coordinates"] + polygon = Polygon( + np.asarray(_coordinates[0]), [np.asarray(hole) for hole in _coordinates[1:]], label=label, color=color + ) + return [polygon] + if geom_type == "multipolygon": + output: list[Polygon | Point] = [] + for polygon_coords in coordinates["coordinates"]: + exterior = np.asarray(polygon_coords[0]) + interiors = [np.asarray(hole) for hole in polygon_coords[1:]] + output.append(Polygon(exterior, interiors, label=label, color=color)) + return output + + raise AnnotationError(f"Unsupported geom_type {geom_type}") + + +def geojson_importer( + cls: Type[_TSlideAnnotations], + geojsons: PathLike, + scaling: float | None = None, + sorting: AnnotationSorting | str = AnnotationSorting.NONE, + roi_names: Optional[list[str]] = None, +) -> _TSlideAnnotations: + """ + Read annotations from a GeoJSON file. + + Parameters + ---------- + geojsons : PathLike + GeoJSON file. In the properties key there must be a name which is the label of this + object. + scaling : float, optional + Scaling factor. Sometimes required when GeoJSON annotations are stored in a different resolution than the + original image. + sorting : AnnotationSorting + The sorting to apply to the annotations. Check the `AnnotationSorting` enum for more information. + By default, the annotations are sorted by area. + roi_names : list[str], optional + List of names that should be considered as regions of interest. If set, these will be added as ROIs rather + than polygons. + + Returns + ------- + SlideAnnotations + """ + roi_names = [] if roi_names is None else roi_names + collection = GeometryCollection() + path = pathlib.Path(geojsons) + if not path.exists(): + raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), str(path)) + + with open(path, "r", encoding="utf-8") as annotation_file: + geojson_dict = json.load(annotation_file) + features = geojson_dict["features"] + for x in features: + properties = x["properties"] + if "classification" in properties: + _label = properties["classification"]["name"] + _color = get_geojson_color(properties["classification"]) + elif properties.get("objectType", None) == "annotation": + _label = properties["name"] + _color = get_geojson_color(properties) + else: + raise ValueError("Could not find label in the GeoJSON properties.") + + _geometries = geojson_to_dlup(x["geometry"], label=_label, color=_color) + for geometry in _geometries: + if isinstance(geometry, Polygon): + if geometry.label in roi_names: + collection.add_roi(geometry) + else: + collection.add_polygon(geometry) + elif isinstance(geometry, Point): + collection.add_point(geometry) + else: + raise ValueError(f"Unsupported geometry type {geometry}") + + SlideAnnotations._in_place_sort_and_scale(collection, scaling, sorting) + return cls(layers=collection) diff --git a/dlup/annotations/importers/halo_xml.py b/dlup/annotations/importers/halo_xml.py new file mode 100644 index 00000000..32b4aeac --- /dev/null +++ b/dlup/annotations/importers/halo_xml.py @@ -0,0 +1,124 @@ +# Copyright (c) dlup contributors +""" +Experimental annotations module for dlup. + +""" +from __future__ import annotations + +import errno +import os +import pathlib +import warnings +from typing import Optional, Type, TypeVar + +from dlup import SlideImage +from dlup._types import PathLike +from dlup.annotations import AnnotationSorting, SlideAnnotations +from dlup.geometry import Box, GeometryCollection, Point, Polygon +from dlup.utils.imports import PYHALOXML_AVAILABLE + +_TSlideAnnotations = TypeVar("_TSlideAnnotations", bound="SlideAnnotations") + + +def halo_xml_importer( + cls: Type[_TSlideAnnotations], + halo_xml: PathLike, + scaling: float | None = None, + sorting: AnnotationSorting | str = AnnotationSorting.NONE, + box_as_polygon: bool = False, + roi_names: Optional[list[str]] = None, +) -> _TSlideAnnotations: + """ + Read annotations as a Halo [1] XML file. + This function requires `pyhaloxml` [2] to be installed. + + Parameters + ---------- + halo_xml : PathLike + Path to the Halo XML file. + scaling : float, optional + The scaling to apply to the annotations. + sorting: AnnotationSorting + The sorting to apply to the annotations. Check the `AnnotationSorting` enum for more information. By default + the annotations are not sorted as HALO supports hierarchical annotations. + box_as_polygon : bool + If True, rectangles are converted to polygons, and added as such. + This is useful when the rectangles are actually implicitly bounding boxes. + roi_names : list[str], optional + List of names that should be considered as regions of interest. If set, these will be added as ROIs rather + than polygons. + + References + ---------- + .. [1] https://indicalab.com/halo/ + .. [2] https://github.com/rharkes/pyhaloxml + + Returns + ------- + SlideAnnotations + """ + path = pathlib.Path(halo_xml) + roi_names = [] if roi_names is None else roi_names + + if not path.exists(): + raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), str(path)) + + if not PYHALOXML_AVAILABLE: + raise RuntimeError("`pyhaloxml` is not available. Install using `python -m pip install pyhaloxml`.") + import pyhaloxml.shapely + + collection = GeometryCollection() + with pyhaloxml.HaloXMLFile(halo_xml) as hx: + hx.matchnegative() + for layer in hx.layers: + _color = layer.linecolor.rgb + color = (_color[0], _color[1], _color[2]) + for region in layer.regions: + if region.type == pyhaloxml.RegionType.Rectangle: + # The data is a CCW polygon, so the first and one to last coordinates are the coordinates + vertices = region.getvertices() + min_x = min(v[0] for v in vertices) + max_x = max(v[0] for v in vertices) + min_y = min(v[1] for v in vertices) + max_y = max(v[1] for v in vertices) + curr_box = Box((min_x, min_y), (max_x - min_x, max_y - min_y)) + + if box_as_polygon: + polygon = curr_box.as_polygon() + if polygon.label in roi_names: + collection.add_roi(polygon) + else: + collection.add_polygon(polygon) + else: + collection.add_box(curr_box) + continue + + elif region.type in [pyhaloxml.RegionType.Ellipse, pyhaloxml.RegionType.Polygon]: + polygon = Polygon( + region.getvertices(), [x.getvertices() for x in region.holes], label=layer.name, color=color + ) + if polygon.label in roi_names: + collection.add_roi(polygon) + else: + collection.add_polygon(polygon) + elif region.type == pyhaloxml.RegionType.Pin: + point = Point(*(region.getvertices()[0]), label=layer.name, color=color) + collection.add_point(point) + elif region.type == pyhaloxml.RegionType.Ruler: + warnings.warn( + f"Ruler annotations are not supported. Annotation {layer.name} will be skipped", + UserWarning, + ) + continue + else: + raise NotImplementedError(f"Regiontype {region.type} is not implemented in dlup") + + SlideAnnotations._in_place_sort_and_scale(collection, scaling, sorting) + + def offset_function(slide: "SlideImage") -> tuple[int, int]: + return ( + slide.slide_bounds[0][0] - slide.slide_bounds[0][0] % 256, + slide.slide_bounds[0][1] - slide.slide_bounds[0][1] % 256, + ) + + return cls(collection, tags=None, sorting=sorting, offset_function=offset_function) diff --git a/dlup/annotations/tags.py b/dlup/annotations/tags.py new file mode 100644 index 00000000..da8b265a --- /dev/null +++ b/dlup/annotations/tags.py @@ -0,0 +1,12 @@ +from typing import NamedTuple, Optional + + +class TagAttribute(NamedTuple): + label: str + color: Optional[tuple[int, int, int]] + + +class SlideTag(NamedTuple): + attributes: Optional[list[TagAttribute]] + label: str + color: Optional[tuple[int, int, int]] diff --git a/dlup/annotations_experimental.py b/dlup/annotations_experimental.py deleted file mode 100644 index 43651a74..00000000 --- a/dlup/annotations_experimental.py +++ /dev/null @@ -1,1485 +0,0 @@ -# Copyright (c) dlup contributors -""" -Experimental annotations module for dlup. - -""" -from __future__ import annotations - -import copy -import errno -import functools -import json -import os -import pathlib -import warnings -import xml.etree.ElementTree as ET -from dataclasses import asdict -from datetime import datetime -from enum import Enum -from typing import Any, Callable, Iterable, NamedTuple, Optional, Type, TypedDict, TypeVar - -import numpy as np -import numpy.typing as npt -from xsdata.formats.dataclass.parsers import XmlParser -from xsdata.formats.dataclass.serializers import XmlSerializer -from xsdata.formats.dataclass.serializers.config import SerializerConfig -from xsdata.models.datatype import XmlDate - -from dlup import SlideImage, __version__ -from dlup._exceptions import AnnotationError -from dlup._geometry import AnnotationRegion # pylint: disable=no-name-in-module -from dlup._types import GenericNumber, PathLike -from dlup.geometry import Box, GeometryCollection, Point, Polygon -from dlup.utils.annotations_utils import get_geojson_color, hex_to_rgb, rgb_to_hex -from dlup.utils.geometry_xml import ( - create_xml_geometries, - create_xml_rois, - parse_dlup_xml_polygon, - parse_dlup_xml_roi_box, -) -from dlup.utils.imports import DARWIN_SDK_AVAILABLE, PYHALOXML_AVAILABLE -from dlup.utils.schemas.generated import DlupAnnotations as XMLDlupAnnotations -from dlup.utils.schemas.generated import Metadata as XMLMetadata -from dlup.utils.schemas.generated import Tag as XMLTag -from dlup.utils.schemas.generated import Tags as XMLTags - -_TSlideAnnotations = TypeVar("_TSlideAnnotations", bound="SlideAnnotations") - - -class AnnotationType(str, Enum): - POINT = "POINT" - BOX = "BOX" - POLYGON = "POLYGON" - TAG = "TAG" - RASTER = "RASTER" - - -class GeoJsonDict(TypedDict): - """ - TypedDict for standard GeoJSON output - """ - - id: str | None - type: str - features: list[dict[str, str | dict[str, str]]] - metadata: Optional[dict[str, str | list[str]]] - - -class CoordinatesDict(TypedDict): - type: str - coordinates: list[list[list[float]]] - - -class DarwinV7Metadata(NamedTuple): - label: str - color: Optional[tuple[int, int, int]] - annotation_type: AnnotationType - - -@functools.lru_cache(maxsize=None) -def get_v7_metadata(filename: pathlib.Path) -> Optional[dict[tuple[str, str], DarwinV7Metadata]]: - if not DARWIN_SDK_AVAILABLE: - raise ImportError("`darwin` is not available. Install using `python -m pip install darwin-py`.") - import darwin.path_utils - - if not filename.is_dir(): - raise ValueError("Provide the path to the root folder of the Darwin V7 annotations") - - v7_metadata_fn = filename / ".v7" / "metadata.json" - if not v7_metadata_fn.exists(): - return None - v7_metadata = darwin.path_utils.parse_metadata(v7_metadata_fn) - output = {} - for sample in v7_metadata["classes"]: - annotation_type = sample["type"] - # This is not implemented and can be skipped. The main function will raise a NonImplementedError - if annotation_type == "raster_layer": - continue - - label = sample["name"] - color = sample["color"][5:-1].split(",") - if color[-1] != "1.0": - raise RuntimeError("Expected A-channel of color to be 1.0") - rgb_colors = (int(color[0]), int(color[1]), int(color[2])) - - output[(label, annotation_type)] = DarwinV7Metadata( - label=label, color=rgb_colors, annotation_type=annotation_type - ) - return output - - -class AnnotationSorting(str, Enum): - """The ways to sort the annotations. This is used in the constructors of the `SlideAnnotations` class, and applied - to the output of `SlideAnnotations.read_region()`. - - - REVERSE: Sort the output in reverse order. - - AREA: Often when the annotation tools do not properly support hierarchical order, one would annotate in a way - that the smaller objects are on top of the larger objects. This option sorts the output by area, so that the - larger objects appear first in the output and then the smaller objects. - - Z_INDEX: Sort the output by the z-index of the annotations. This is useful when the annotations have a z-index - - NONE: Do not apply any sorting and output as is presented in the input file. - """ - - REVERSE = "REVERSE" - AREA = "AREA" - Z_INDEX = "Z_INDEX" - NONE = "NONE" - - def to_sorting_params(self) -> Any: - """Get the sorting parameters for the annotation sorting.""" - if self == AnnotationSorting.REVERSE: - return lambda x: None, True - - if self == AnnotationSorting.AREA: - return lambda x: x.area, False - - if self == AnnotationSorting.Z_INDEX: - return lambda x: x.get_field("z_index"), False - - -def _geometry_to_geojson( - geometry: Polygon | Point, label: str | None, color: tuple[int, int, int] | None -) -> dict[str, Any]: - """Function to convert a geometry to a GeoJSON object. - - Parameters - ---------- - geometry : DlupPolygon | DlupPoint - A polygon or point object - label : str - The label name - color : tuple[int, int, int] - The color of the object in RGB values - - Returns - ------- - dict[str, Any] - Output dictionary representing the data in GeoJSON - - """ - geojson: dict[str, Any] = { - "type": "Feature", - "properties": { - "classification": { - "name": label, - }, - }, - "geometry": {}, - } - - if isinstance(geometry, Polygon): - # Construct the coordinates for the polygon - exterior = geometry.get_exterior() # Get exterior coordinates - interiors = geometry.get_interiors() # Get interior coordinates (holes) - - # GeoJSON requires [ [x1, y1], [x2, y2], ... ] format - geojson["geometry"] = { - "type": "Polygon", - "coordinates": [[list(coord) for coord in exterior]] # Exterior ring - + [[list(coord) for coord in interior] for interior in interiors], # Interior rings (holes) - } - - elif isinstance(geometry, Point): - # Construct the coordinates for the point - geojson["geometry"] = { - "type": "Point", - "coordinates": [geometry.x, geometry.y], - } - else: - raise ValueError(f"Unsupported geometry type {type(geometry)}") - - if color is not None: - classification: dict[str, Any] = geojson["properties"]["classification"] - classification["color"] = color - - return geojson - - -def geojson_to_dlup( - coordinates: CoordinatesDict, - label: str, - color: Optional[tuple[int, int, int]] = None, - z_index: Optional[int] = None, -) -> list[Polygon | Point]: - geom_type = coordinates.get("type", None) - if geom_type is None: - raise ValueError("No type found in coordinates.") - geom_type = geom_type.lower() - - if geom_type in ["point", "multipoint"] and z_index is not None: - raise AnnotationError("z_index is not supported for point annotations.") - - if geom_type == "point": - x, y = np.asarray(coordinates["coordinates"]) - return [Point(*(x, y), label=label, color=color)] - - if geom_type == "multipoint": - return [Point(*np.asarray(c).tolist(), label=label, color=color) for c in coordinates["coordinates"]] - - if geom_type == "polygon": - _coordinates = coordinates["coordinates"] - polygon = Polygon( - np.asarray(_coordinates[0]), [np.asarray(hole) for hole in _coordinates[1:]], label=label, color=color - ) - return [polygon] - if geom_type == "multipolygon": - output: list[Polygon | Point] = [] - for polygon_coords in coordinates["coordinates"]: - exterior = np.asarray(polygon_coords[0]) - interiors = [np.asarray(hole) for hole in polygon_coords[1:]] - output.append(Polygon(exterior, interiors, label=label, color=color)) - return output - - raise AnnotationError(f"Unsupported geom_type {geom_type}") - - -class TagAttribute(NamedTuple): - label: str - color: Optional[tuple[int, int, int]] - - -class SlideTag(NamedTuple): - attributes: Optional[list[TagAttribute]] - label: str - color: Optional[tuple[int, int, int]] - - -class SlideAnnotations: - """Class that holds all annotations for a specific image""" - - def __init__( - self, - layers: GeometryCollection, - tags: Optional[tuple[SlideTag, ...]] = None, - sorting: Optional[AnnotationSorting | str] = None, - **kwargs: Any, - ) -> None: - """ - Parameters - ---------- - layers : GeometryCollection - Geometry collection containing the polygons, boxes and points - tags: Optional[tuple[SlideTag, ...]] - A tuple of image-level tags such as staining quality - sorting: AnnotationSorting - Sorting method, see `AnnotationSorting`. This value is typically passed to the constructor - because of operations layer on (such as `__add__`). Typically the classmethod already sorts the data - **kwargs: Any - Additional keyword arguments. In this class they are used for additional metadata or offset functions. - Currently only HaloXML requires offsets. See `.from_halo_xml` for an example - """ - self._layers = layers - self._tags = tags - self._sorting = sorting - self._offset_function: bool = bool(kwargs.get("offset_function", False)) - self._metadata: Optional[dict[str, list[str] | str | int | float | bool]] = kwargs.get("metadata", None) - - @property - def sorting(self) -> Optional[AnnotationSorting | str]: - return self._sorting - - @property - def tags(self) -> Optional[tuple[SlideTag, ...]]: - return self._tags - - @property - def num_polygons(self) -> int: - return len(self.layers.polygons) - - @property - def num_points(self) -> int: - return len(self.layers.points) - - @property - def num_boxes(self) -> int: - return len(self.layers.boxes) - - @property - def metadata(self) -> Optional[dict[str, list[str] | str | int | float | bool]]: - """Additional metadata for the annotations""" - return self._metadata - - @property - def offset_function(self) -> Any: - """ - In some cases a function needs to be applied to the coordinates which cannot be handled in this class as - it might require additional information. This function will be applied to the coordinates of all annotations. - This is useful from a file format which requires this, for instance HaloXML. - - Example - ------- - For HaloXML this is `offset = slide.slide_bounds[0] - slide.slide_bounds[0] % 256`. - >>> slide = Slide.from_file_path("image.svs") - >>> ann = SlideAnnotations.from_halo_xml("annotations.xml") - >>> assert ann.offset_function == lambda slide: slide.slide_bounds[0] - slide.slide_bounds[0] % 256 - >>> ann.set_offset(annotation.offset_function(slide)) - - Returns - ------- - Callable - - """ - return self._offset_function - - @offset_function.setter - def offset_function(self, func: Any) -> None: - self._offset_function = func - - @property - def layers(self) -> GeometryCollection: - """ - Get the layers of the annotations. - This is a GeometryCollection object which contains all the polygons and points - """ - return self._layers - - @classmethod - def from_geojson( - cls: Type[_TSlideAnnotations], - geojsons: PathLike, - scaling: float | None = None, - sorting: AnnotationSorting | str = AnnotationSorting.NONE, - roi_names: Optional[list[str]] = None, - ) -> _TSlideAnnotations: - """ - Read annotations from a GeoJSON file. - - Parameters - ---------- - geojsons : PathLike - GeoJSON file. In the properties key there must be a name which is the label of this - object. - scaling : float, optional - Scaling factor. Sometimes required when GeoJSON annotations are stored in a different resolution than the - original image. - sorting : AnnotationSorting - The sorting to apply to the annotations. Check the `AnnotationSorting` enum for more information. - By default, the annotations are sorted by area. - roi_names : list[str], optional - List of names that should be considered as regions of interest. If set, these will be added as ROIs rather - than polygons. - - Returns - ------- - SlideAnnotations - """ - roi_names = [] if roi_names is None else roi_names - collection = GeometryCollection() - path = pathlib.Path(geojsons) - if not path.exists(): - raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), str(path)) - - with open(path, "r", encoding="utf-8") as annotation_file: - geojson_dict = json.load(annotation_file) - features = geojson_dict["features"] - for x in features: - properties = x["properties"] - if "classification" in properties: - _label = properties["classification"]["name"] - _color = get_geojson_color(properties["classification"]) - elif properties.get("objectType", None) == "annotation": - _label = properties["name"] - _color = get_geojson_color(properties) - else: - raise ValueError("Could not find label in the GeoJSON properties.") - - _geometries = geojson_to_dlup(x["geometry"], label=_label, color=_color) - for geometry in _geometries: - if isinstance(geometry, Polygon): - if geometry.label in roi_names: - collection.add_roi(geometry) - else: - collection.add_polygon(geometry) - elif isinstance(geometry, Point): - collection.add_point(geometry) - else: - raise ValueError(f"Unsupported geometry type {geometry}") - - SlideAnnotations._in_place_sort_and_scale(collection, scaling, sorting) - return cls(layers=collection) - - @classmethod - def from_asap_xml( - cls: Type[_TSlideAnnotations], - asap_xml: PathLike, - scaling: float | None = None, - sorting: AnnotationSorting | str = AnnotationSorting.AREA, - roi_names: Optional[list[str]] = None, - ) -> _TSlideAnnotations: - """ - Read annotations as an ASAP [1] XML file. ASAP is a tool for viewing and annotating whole slide images. - - Parameters - ---------- - asap_xml : PathLike - Path to ASAP XML annotation file. - scaling : float, optional - Scaling factor. Sometimes required when ASAP annotations are stored in a different resolution than the - original image. - sorting: AnnotationSorting - The sorting to apply to the annotations. Check the `AnnotationSorting` enum for more information. - By default, the annotations are sorted by area. - roi_names : list[str], optional - List of names that should be considered as regions of interest. If set, these will be added as ROIs rather - than polygons. - - References - ---------- - .. [1] https://github.com/computationalpathologygroup/ASAP - - Returns - ------- - SlideAnnotations - """ - path = pathlib.Path(asap_xml) - roi_names = [] if roi_names is None else roi_names - if not path.exists(): - raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), str(path)) - - tree = ET.parse(asap_xml) - opened_annotation = tree.getroot() - collection: GeometryCollection = GeometryCollection() - for parent in opened_annotation: - for child in parent: - if child.tag != "Annotation": - continue - label = child.attrib.get("PartOfGroup").strip() # type: ignore - color = hex_to_rgb(child.attrib.get("Color").strip()) # type: ignore - - annotation_type = child.attrib.get("Type").lower() # type: ignore - coordinates = _parse_asap_coordinates(child) - - if annotation_type == "pointset": - for point in coordinates: - collection.add_point(Point(point, label=label, color=color)) - - elif annotation_type == "polygon": - if label in roi_names: - collection.add_roi(Polygon(coordinates, [], label=label, color=color)) - else: - collection.add_polygon(Polygon(coordinates, [], label=label, color=color)) - - SlideAnnotations._in_place_sort_and_scale(collection, scaling, sorting) - return cls(layers=collection) - - @classmethod - def from_darwin_json( - cls: Type[_TSlideAnnotations], - darwin_json: PathLike, - scaling: float | None = None, - sorting: AnnotationSorting | str = AnnotationSorting.NONE, - z_indices: Optional[dict[str, int]] = None, - roi_names: Optional[list[str]] = None, - ) -> _TSlideAnnotations: - """ - Read annotations as a V7 Darwin [1] JSON file. If available will read the `.v7/metadata.json` file to extract - colors from the annotations. - - Parameters - ---------- - darwin_json : PathLike - Path to the Darwin JSON file. - sorting: AnnotationSorting - The sorting to apply to the annotations. Check the `AnnotationSorting` enum for more information. - By default, the annotations are sorted by the z-index which is generated by the order of the saved - annotations. - scaling : float, optional - Scaling factor. Sometimes required when Darwin annotations are stored in a different resolution - than the original image. - z_indices: dict[str, int], optional - If set, these z_indices will be used rather than the default order. - roi_names : list[str], optional - List of names that should be considered as regions of interest. If set, these will be added as ROIs rather - than polygons. - - References - ---------- - .. [1] https://darwin.v7labs.com/ - - Returns - ------- - SlideAnnotations - - """ - if not DARWIN_SDK_AVAILABLE: - raise RuntimeError("`darwin` is not available. Install using `python -m pip install darwin-py`.") - import darwin - - roi_names = [] if roi_names is None else roi_names - - darwin_json_fn = pathlib.Path(darwin_json) - if not darwin_json_fn.exists(): - raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), str(darwin_json_fn)) - - darwin_an = darwin.utils.parse_darwin_json(darwin_json_fn, None) - v7_metadata = get_v7_metadata(darwin_json_fn.parent) - - tags = [] - polygons: list[tuple[Polygon, int]] = [] - - collection = GeometryCollection() - for curr_annotation in darwin_an.annotations: - name = curr_annotation.annotation_class.name - annotation_type = curr_annotation.annotation_class.annotation_type - if annotation_type == "raster_layer": - raise NotImplementedError("Raster annotations are not supported.") - - annotation_color = v7_metadata[(name, annotation_type)].color if v7_metadata else None - - if annotation_type == "tag": - attributes = [] - if curr_annotation.subs: - for subannotation in curr_annotation.subs: - if subannotation.annotation_type == "attributes": - attributes.append(TagAttribute(label=subannotation.data, color=None)) - - tags.append( - SlideTag( - attributes=attributes if attributes != [] else None, - label=name, - color=annotation_color if annotation_color else None, - ) - ) - continue - - z_index = 0 if annotation_type == "keypoint" or z_indices is None else z_indices[name] - curr_data = curr_annotation.data - - if annotation_type == "keypoint": - x, y = curr_data["x"], curr_data["y"] - curr_point = Point(curr_data["x"], curr_data["y"]) - curr_point.label = name - curr_point.color = annotation_color - collection.add_point(curr_point) - - elif annotation_type in ("polygon", "complex_polygon"): - if "path" in curr_data: # This is a regular polygon - curr_polygon = Polygon( - [(_["x"], _["y"]) for _ in curr_data["path"]], [], label=name, color=annotation_color - ) - polygons.append((curr_polygon, z_index)) - - elif "paths" in curr_data: # This is a complex polygon which needs to be parsed with the even-odd rule - for curr_polygon in _parse_darwin_complex_polygon(curr_data, label=name, color=annotation_color): - polygons.append((curr_polygon, z_index)) - else: - raise ValueError(f"Got unexpected data keys: {curr_data.keys()}") - elif annotation_type == "bounding_box": - warnings.warn( - "Bounding box annotations are not fully supported and will be converted to Polygons.", UserWarning - ) - x, y, w, h = curr_data["x"], curr_data["y"], curr_data["w"], curr_data["h"] - curr_polygon = Polygon( - [(x, y), (x + w, y), (x + w, y + h), (x, y + h)], [], label=name, color=annotation_color - ) - polygons.append((curr_polygon, z_index)) - - else: - raise ValueError(f"Annotation type {annotation_type} is not supported.") - - if sorting == "Z_INDEX": - for polygon, _ in sorted(polygons, key=lambda x: x[1]): - if polygon.label in roi_names: - collection.add_roi(polygon) - else: - collection.add_polygon(polygon) - else: - for polygon, _ in polygons: - if polygon.label in roi_names: - collection.add_roi(polygon) - else: - collection.add_polygon(polygon) - - SlideAnnotations._in_place_sort_and_scale( - collection, scaling, sorting="NONE" if sorting == "Z_INDEX" else sorting - ) - return cls(layers=collection, tags=tuple(tags), sorting=sorting) - - @classmethod - def from_dlup_xml(cls: Type[_TSlideAnnotations], dlup_xml: PathLike) -> _TSlideAnnotations: - """ - Read annotations as a DLUP XML file. - - Parameters - ---------- - dlup_xml : PathLike - Path to the DLUP XML file. - - Returns - ------- - SlideAnnotations - """ - path = pathlib.Path(dlup_xml) - if not path.exists(): - raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), str(path)) - - parser = XmlParser() - with open(dlup_xml, "rb") as f: - dlup_annotations = parser.from_bytes(f.read(), XMLDlupAnnotations) - - metadata = None if not dlup_annotations.metadata else asdict(dlup_annotations.metadata) - tags: list[SlideTag] = [] - if dlup_annotations.tags: - for tag in dlup_annotations.tags.tag: - if not tag.label: - raise ValueError("Tag does not have a label.") - curr_tag = SlideTag(attributes=[], label=tag.label, color=hex_to_rgb(tag.color) if tag.color else None) - tags.append(curr_tag) - - collection = GeometryCollection() - polygons: list[tuple[Polygon, int]] = [] - if not dlup_annotations.geometries: - return cls(layers=collection, tags=tuple(tags)) - - if dlup_annotations.geometries.polygon: - polygons += parse_dlup_xml_polygon(dlup_annotations.geometries.polygon) - - # Complain if there are multipolygons - if dlup_annotations.geometries.multi_polygon: - for curr_polygons in dlup_annotations.geometries.multi_polygon: - polygons += parse_dlup_xml_polygon( - curr_polygons.polygon, - order=curr_polygons.order, - label=curr_polygons.label, - index=curr_polygons.index, - ) - - # Now we sort the polygons on order - for polygon, _ in sorted(polygons, key=lambda x: x[1]): - collection.add_polygon(polygon) - - for curr_point in dlup_annotations.geometries.point: - point = Point( - curr_point.x, - curr_point.y, - label=curr_point.label, - color=hex_to_rgb(curr_point.color) if curr_point.color else None, - ) - collection.add_point(point) - - # Complain if there are multipoints - if dlup_annotations.geometries.multi_point: - raise NotImplementedError("Multipoints are not supported.") - - for curr_box in dlup_annotations.geometries.box: - # mypy struggles - assert isinstance(curr_box.x_min, float) - assert isinstance(curr_box.y_min, float) - assert isinstance(curr_box.x_max, float) - assert isinstance(curr_box.y_max, float) - box = Box( - (curr_box.x_min, curr_box.y_min), - (curr_box.x_max - curr_box.x_min, curr_box.y_max - curr_box.y_min), - label=curr_box.label, - color=hex_to_rgb(curr_box.color) if curr_box.color else None, - ) - collection.add_box(box) - - rois: list[tuple[Polygon, int]] = [] - if dlup_annotations.regions_of_interest: - for region_of_interest in dlup_annotations.regions_of_interest.multi_polygon: - raise NotImplementedError( - "MultiPolygon regions of interest are not yet supported. " - "If you have a use case for this, " - "please open an issue at https://github.com/NKI-AI/dlup/issues." - ) - - if dlup_annotations.regions_of_interest.polygon: - rois += parse_dlup_xml_polygon(dlup_annotations.regions_of_interest.polygon) - - if dlup_annotations.regions_of_interest.box: - for _curr_box in dlup_annotations.regions_of_interest.box: - box, curr_order = parse_dlup_xml_roi_box(_curr_box) - rois.append((box.as_polygon(), curr_order)) - for roi, _ in sorted(rois, key=lambda x: x[1]): - collection.add_roi(roi) - - return cls(layers=collection, tags=tuple(tags), metadata=metadata) - - @classmethod - def from_halo_xml( - cls: Type[_TSlideAnnotations], - halo_xml: PathLike, - scaling: float | None = None, - sorting: AnnotationSorting | str = AnnotationSorting.NONE, - box_as_polygon: bool = False, - roi_names: Optional[list[str]] = None, - ) -> _TSlideAnnotations: - """ - Read annotations as a Halo [1] XML file. - This function requires `pyhaloxml` [2] to be installed. - - Parameters - ---------- - halo_xml : PathLike - Path to the Halo XML file. - scaling : float, optional - The scaling to apply to the annotations. - sorting: AnnotationSorting - The sorting to apply to the annotations. Check the `AnnotationSorting` enum for more information. By default - the annotations are not sorted as HALO supports hierarchical annotations. - box_as_polygon : bool - If True, rectangles are converted to polygons, and added as such. - This is useful when the rectangles are actually implicitly bounding boxes. - roi_names : list[str], optional - List of names that should be considered as regions of interest. If set, these will be added as ROIs rather - than polygons. - - References - ---------- - .. [1] https://indicalab.com/halo/ - .. [2] https://github.com/rharkes/pyhaloxml - - Returns - ------- - SlideAnnotations - """ - path = pathlib.Path(halo_xml) - roi_names = [] if roi_names is None else roi_names - - if not path.exists(): - raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), str(path)) - - if not PYHALOXML_AVAILABLE: - raise RuntimeError("`pyhaloxml` is not available. Install using `python -m pip install pyhaloxml`.") - import pyhaloxml.shapely - - collection = GeometryCollection() - with pyhaloxml.HaloXMLFile(halo_xml) as hx: - hx.matchnegative() - for layer in hx.layers: - _color = layer.linecolor.rgb - color = (_color[0], _color[1], _color[2]) - for region in layer.regions: - if region.type == pyhaloxml.RegionType.Rectangle: - # The data is a CCW polygon, so the first and one to last coordinates are the coordinates - vertices = region.getvertices() - min_x = min(v[0] for v in vertices) - max_x = max(v[0] for v in vertices) - min_y = min(v[1] for v in vertices) - max_y = max(v[1] for v in vertices) - curr_box = Box((min_x, min_y), (max_x - min_x, max_y - min_y)) - - if box_as_polygon: - polygon = curr_box.as_polygon() - if polygon.label in roi_names: - collection.add_roi(polygon) - else: - collection.add_polygon(polygon) - else: - collection.add_box(curr_box) - continue - - elif region.type in [pyhaloxml.RegionType.Ellipse, pyhaloxml.RegionType.Polygon]: - polygon = Polygon( - region.getvertices(), [x.getvertices() for x in region.holes], label=layer.name, color=color - ) - if polygon.label in roi_names: - collection.add_roi(polygon) - else: - collection.add_polygon(polygon) - elif region.type == pyhaloxml.RegionType.Pin: - point = Point(*(region.getvertices()[0]), label=layer.name, color=color) - collection.add_point(point) - elif region.type == pyhaloxml.RegionType.Ruler: - warnings.warn( - f"Ruler annotations are not supported. Annotation {layer.name} will be skipped", - UserWarning, - ) - continue - else: - raise NotImplementedError(f"Regiontype {region.type} is not implemented in dlup") - - SlideAnnotations._in_place_sort_and_scale(collection, scaling, sorting) - - def offset_function(slide: "SlideImage") -> tuple[int, int]: - return ( - slide.slide_bounds[0][0] - slide.slide_bounds[0][0] % 256, - slide.slide_bounds[0][1] - slide.slide_bounds[0][1] % 256, - ) - - return cls(collection, tags=None, sorting=sorting, offset_function=offset_function) - - @staticmethod - def _in_place_sort_and_scale( - collection: GeometryCollection, scaling: Optional[float], sorting: Optional[AnnotationSorting | str] - ) -> None: - if sorting == "REVERSE": - raise NotImplementedError("This doesn't work for now.") - - if scaling != 1.0 and scaling is not None: - collection.scale(scaling) - if sorting == AnnotationSorting.NONE or sorting is None: - return - if isinstance(sorting, str): - key, reverse = AnnotationSorting[sorting].to_sorting_params() - else: - key, reverse = sorting.to_sorting_params() - collection.sort_polygons(key, reverse) - - def as_geojson(self) -> GeoJsonDict: - """ - Output the annotations as proper geojson. These outputs are sorted according to the `AnnotationSorting` selected - for the annotations. This ensures the annotations are correctly sorted in the output. - - The output is not completely GeoJSON compliant as some parts such as the metadata and properties are not part - of the standard. However, these are implemented to ensure the output is compatible with QuPath. - - Returns - ------- - GeoJsonDict - The output as a GeoJSON dictionary. - """ - data: GeoJsonDict = {"type": "FeatureCollection", "metadata": None, "features": [], "id": None} - if self.tags: - data["metadata"] = {"tags": [_.label for _ in self.tags]} - - if self._layers.boxes: - warnings.warn("Bounding boxes are not supported in GeoJSON and will be skipped.", UserWarning) - - all_layers = self.layers.polygons + self.layers.points - for idx, curr_annotation in enumerate(all_layers): - json_dict = _geometry_to_geojson(curr_annotation, label=curr_annotation.label, color=curr_annotation.color) - json_dict["id"] = str(idx) - data["features"].append(json_dict) - - return data - - def as_dlup_xml( - self, - image_id: Optional[str] = None, - description: Optional[str] = None, - version: Optional[str] = None, - authors: Optional[list[str]] = None, - indent: Optional[int] = 2, - ) -> str: - """ - Output the annotations as DLUP XML. - This format supports the complete serialization of a SlideAnnotations object. - - Parameters - ---------- - image_id : str, optional - The image ID corresponding to this annotation. - description : str, optional - Description of the annotations. - version : str, optional - Version of the annotations. - authors : list[str], optional - Authors of the annotations. - indent : int, optional - Indent for pretty printing. - - Returns - ------- - str - The output as a DLUP XML string. - """ - - metadata = XMLMetadata( - image_id=image_id if image_id is not None else "", - description=description if description is not None else "", - version=version if version is not None else "", - authors=XMLMetadata.Authors(authors) if authors is not None else None, - date_created=XmlDate.from_string(datetime.now().strftime("%Y-%m-%d")), - software=f"dlup {__version__}", - ) - xml_tags: list[XMLTag] = [] - if self.tags: - for tag in self.tags: - if tag.attributes: - attrs = [ - XMLTag.Attribute(value=_.label, color=rgb_to_hex(*_.color) if _.color else None) - for _ in tag.attributes - ] - xml_tag = XMLTag( - attribute=attrs if tag.attributes else [], - label=tag.label, - color=rgb_to_hex(*tag.color) if tag.color else None, - ) - xml_tags.append(xml_tag) - - tags = XMLTags(tag=xml_tags) if xml_tags else None - - geometries = create_xml_geometries(self._layers) - rois = create_xml_rois(self._layers) - - extra_annotation_params: dict[str, XMLTags] = {} - if tags: - extra_annotation_params["tags"] = tags - - dlup_annotations = XMLDlupAnnotations( - metadata=metadata, geometries=geometries, regions_of_interest=rois, **extra_annotation_params - ) - config = SerializerConfig(pretty_print=True) - serializer = XmlSerializer(config=config) - return serializer.render(dlup_annotations) - - @property - def bounding_box(self) -> tuple[tuple[float, float], tuple[float, float]]: - """Get the bounding box of the annotations combining points and polygons. - - Returns - ------- - tuple[tuple[float, float], tuple[float, float]] - The bounding box of the annotations. - - """ - return self._layers.bounding_box - - def bounding_box_at_scaling(self, scaling: float) -> tuple[tuple[float, float], tuple[float, float]]: - """Get the bounding box of the annotations at a specific scaling factor. - - Parameters - ---------- - scaling : float - The scaling factor to apply to the annotations. - - Returns - ------- - tuple[tuple[float, float], tuple[float, float]] - The bounding box of the annotations at the specific scaling factor. - - """ - bbox = self.bounding_box - return ((bbox[0][0] * scaling, bbox[0][1] * scaling), (bbox[1][0] * scaling, bbox[1][1] * scaling)) - - def simplify(self, tolerance: float) -> None: - """Simplify the polygons in the annotation (i.e. reduce points). Other annotations will remain unchanged. - All points in the resulting polygons object will be in the tolerance distance of the original polygon. - - Parameters - ---------- - tolerance : float - The tolerance to simplify the polygons with. - Returns - ------- - None - """ - self._layers.simplify(tolerance) - - def __contains__(self, item: str | Point | Polygon) -> bool: - if isinstance(item, str): - return item in self.available_classes - if isinstance(item, Point): - return item in self.layers.points - if isinstance(item, Polygon): - return item in self.layers.polygons - - return False - - def __len__(self) -> int: - return self._layers.size() - - @property - def available_classes(self) -> set[str]: - """Get the available classes in the annotations. - - Returns - ------- - set[str] - The available classes in the annotations. - - """ - available_classes = set() - for polygon in self.layers.polygons: - if polygon.label is not None: - available_classes.add(polygon.label) - for point in self.layers.points: - if point.label is not None: - available_classes.add(point.label) - - return available_classes - - def __iter__(self) -> Iterable[Polygon | Point]: - # First returns all the polygons then all points - for polygon in self.layers.polygons: - yield polygon - - for point in self.layers.points: - yield point - - def __add__(self, other: Any) -> "SlideAnnotations": - """ - Add two annotations together. This will return a new `SlideAnnotations` object with the annotations combined. - The polygons will be added from left to right followed the points from left to right. - - Notes - ----- - - The polygons and points are shared between the objects. This means that if you modify the polygons or points - in the new object, the original objects will also be modified. If you wish to avoid this, you must add two - copies together. - - Note that the sorting is not applied to this object. You can apply this by calling `sort_polygons()` on - the resulting object. - - Parameters - ---------- - other : SlideAnnotations - The other annotations to add. - - """ - if not isinstance(other, (SlideAnnotations, Point, Polygon, list)): - raise TypeError(f"Unsupported type {type(other)}") - - if isinstance(other, SlideAnnotations): - if not self.sorting == other.sorting: - raise TypeError("Cannot add annotations with different sorting.") - if self.offset_function != other.offset_function: - raise TypeError( - "Cannot add annotations with different requirements for offsetting to slide bounds " - "(`offset_function`)." - ) - - tags: tuple[SlideTag, ...] = () - if self.tags is None and other.tags is not None: - tags = other.tags - - if other.tags is None and self.tags is not None: - tags = self.tags - - if self.tags is not None and other.tags is not None: - tags = tuple(set(self.tags + other.tags)) - - # Let's add the annotations - collection = GeometryCollection() - for polygon in self.layers.polygons: - collection.add_polygon(copy.deepcopy(polygon)) - for point in self.layers.points: - collection.add_point(copy.deepcopy(point)) - - for polygon in other.layers.polygons: - collection.add_polygon(copy.deepcopy(polygon)) - for point in other.layers.points: - collection.add_point(copy.deepcopy(point)) - - SlideAnnotations._in_place_sort_and_scale(collection, None, self.sorting) - return self.__class__(layers=collection, tags=tuple(tags) if tags else None, sorting=self.sorting) - - if isinstance(other, (Point, Polygon)): - other = [other] - - if isinstance(other, list): - if not all(isinstance(item, (Point, Polygon)) for item in other): - raise TypeError( - f"can only add list purely containing Point and Polygon objects to {self.__class__.__name__}" - ) - - collection = copy.copy(self._layers) - for item in other: - if isinstance(item, Polygon): - collection.add_polygon(item) - elif isinstance(item, Point): - collection.add_point(item) - SlideAnnotations._in_place_sort_and_scale(collection, None, self.sorting) - return self.__class__(layers=collection, tags=copy.copy(self._tags), sorting=self.sorting) - - raise ValueError(f"Unsupported type {type(other)}") - - def __iadd__(self, other: Any) -> "SlideAnnotations": - if isinstance(other, (Point, Polygon)): - other = [other] - - if isinstance(other, list): - if not all(isinstance(item, (Point, Polygon)) for item in other): - raise TypeError( - f"can only add list purely containing Point and Polygon objects {self.__class__.__name__}" - ) - - for item in other: - if isinstance(item, Polygon): - self._layers.add_polygon(copy.deepcopy(item)) - elif isinstance(item, Point): - self._layers.add_point(copy.deepcopy(item)) - - elif isinstance(other, SlideAnnotations): - if self.sorting != other.sorting or self.offset_function != other.offset_function: - raise ValueError( - f"Both sorting and offset_function must be the same to " f"add {self.__class__.__name__}s together." - ) - - if self._tags is None: - self._tags = other._tags - elif other._tags is not None: - assert self - self._tags += other._tags - - for polygon in other.layers.polygons: - self._layers.add_polygon(copy.deepcopy(polygon)) - for point in other.layers.points: - self._layers.add_point(copy.deepcopy(point)) - else: - return NotImplemented - SlideAnnotations._in_place_sort_and_scale(self._layers, None, self.sorting) - - return self - - def __radd__(self, other: Any) -> "SlideAnnotations": - # in-place addition (+=) of Point and Polygon will raise a TypeError - if not isinstance(other, (SlideAnnotations, Point, Polygon, list)): - raise TypeError(f"Unsupported type {type(other)}") - if isinstance(other, list): - if not all(isinstance(item, (Polygon, Point)) for item in other): - raise TypeError( - f"can only add list purely containing Point and Polygon objects to {self.__class__.__name__}" - ) - raise TypeError( - "use the __add__ or __iadd__ operator instead of __radd__ when working with lists to avoid \ - unexpected behavior." - ) - return self + other - - def __sub__(self, other: Any) -> "SlideAnnotations": - return NotImplemented - - def __isub__(self, other: Any) -> "SlideAnnotations": - return NotImplemented - - def __rsub__(self, other: Any) -> "SlideAnnotations": - return NotImplemented - - def __eq__(self, other: Any) -> bool: - if not isinstance(other, SlideAnnotations): - return False - - our_sorting = self._sorting if self._sorting != AnnotationSorting.NONE else None - other_sorting = other._sorting if other._sorting != AnnotationSorting.NONE else None - - if our_sorting != other_sorting: - return False - - if self._tags != other._tags: - return False - - if self._layers != other._layers: - return False - - if self._offset_function != other._offset_function: - return False - - return True - - def read_region( - self, - coordinates: tuple[GenericNumber, GenericNumber], - scaling: float, - size: tuple[GenericNumber, GenericNumber], - ) -> AnnotationRegion: - """Reads the region of the annotations. Function signature is the same as `dlup.SlideImage` - so they can be used in conjunction. - - The process is as follows: - - 1. All the annotations which overlap with the requested region of interest are filtered - 2. The polygons in the GeometryContainer in `.layers` are cropped. - The boxes and points are only filtered, so it's possible the boxes have negative (x, y) values - 3. The annotation is rescaled and shifted to the origin to match the local patch coordinate system. - - The final returned data is a `dlup.geometry.AnnotationRegion`. - - Parameters - ---------- - location: tuple[GenericNumber, GenericNumber] - Top-left coordinates of the region in the requested scaling - size : tuple[GenericNumber, GenericNumber] - Output size of the region - scaling : float - Scaling to apply compared to the base level - - Returns - ------- - AnnotationRegion - - Examples - -------- - 1. To read geojson annotations and convert them into masks: - - >>> from pathlib import Path - >>> from dlup import SlideImage - >>> import numpy as np - >>> wsi = SlideImage.from_file_path(Path("path/to/image.svs")) - >>> wsi = wsi.get_scaled_view(scaling=0.5) - >>> wsi = wsi.read_region(location=(0,0), size=wsi.size) - >>> annotations = SlideAnnotations.from_geojson("path/to/geojson.json") - >>> region = annotations.read_region((0,0), 0.01, wsi.size) - >>> mask = region.to_mask() - >>> color_mask = annotations.color_lut[mask] - >>> polygons = region.polygons.get_geometries() # This is a list of `dlup.geometry.Polygon` objects - """ - region = self._layers.read_region(coordinates, scaling, size) - return region - - def scale(self, scaling: float) -> None: - """ - Scale the annotations by a multiplication factor. - This operation will be performed in-place. - - Parameters - ---------- - scaling : float - The scaling factor to apply to the annotations. - - Notes - ----- - This invalidates the R-tree. You could rebuild this manually using `.rebuild_rtree()`, or have the function - `read_region()` do it for you on-demand. - - Returns - ------- - None - """ - self._layers.scale(scaling) - - def set_offset(self, offset: tuple[float, float]) -> None: - """Set the offset for the annotations. This operation will be performed in-place. - - For example, if the offset is 1, 1, the annotations will be moved by 1 unit in the x and y direction. - - Parameters - ---------- - offset : tuple[float, float] - The offset to apply to the annotations. - - Notes - ----- - This invalidates the R-tree. You could rebuild this manually using `.rebuild_rtree()`, or have the function - `read_region()` do it for you on-demand. - - Returns - ------- - None - """ - self._layers.set_offset(offset) - - def rebuild_rtree(self) -> None: - """ - Rebuild the R-tree for the annotations. This operation will be performed in-place. - The R-tree is used for fast spatial queries on the annotations and is invalidated when the annotations are - modified. This function will rebuild the R-tree. Strictly speaking, this is not required as the R-tree will be - rebuilt on-demand when you invoke a `read_region()`. You could however do this if you want to avoid - the `read_region()` to do it for you the first time it runs. - """ - - self._layers.rebuild_rtree() - - def reindex_polygons(self, index_map: dict[str, int]) -> None: - """ - Reindex the polygons in the annotations. This operation will be performed in-place. - This is useful if you want to change the index of the polygons in the annotations. - - This requires that the `.label` property on the polygons is set. - - Parameters - ---------- - index_map : dict[str, int] - A dictionary that maps the label to the new index. - - Returns - ------- - None - """ - self._layers.reindex_polygons(index_map) - - def relabel_polygons(self, relabel_map: dict[str, str]) -> None: - """ - Relabel the polygons in the annotations. This operation will be performed in-place. - - Parameters - ---------- - relabel_map : dict[str, str] - A dictionary that maps the label to the new label. - - Returns - ------- - None - """ - # TODO: Implement in C++ - for polygon in self._layers.polygons: - if not polygon.label: - continue - if polygon.label in relabel_map: - polygon.label = relabel_map[polygon.label] - - def filter_polygons(self, label: str) -> None: - """Filter polygons in-place. - - Note - ---- - This will internally invalidate the R-tree. You could rebuild this manually using `.rebuild_rtree()`, or - have the function itself do this on-demand (typically when you invoke a `.read_region()`) - - Parameters - ---------- - label : str - The label to filter. - - """ - for polygon in self._layers.polygons: - if polygon.label == label: - self._layers.remove_polygon(polygon) - - def filter_points(self, label: str) -> None: - """Filter points in-place. - - Note - ---- - This will internally invalidate the R-tree. You could rebuild this manually using `.rebuild_rtree()`, or - have the function itself do this on-demand (typically when you invoke a `.read_region()`) - - Parameters - ---------- - label : str - The label to filter. - - """ - for point in self._layers.points: - if point.label == label: - self._layers.remove_point(point) - - def filter(self, label: str) -> None: - """Filter annotations in-place. - - Note - ---- - This will internally invalidate the R-tree. You could rebuild this manually using `.rebuild_rtree()`, or - have the function itself do this on-demand (typically when you invoke a `.read_region()`) - - Parameters - ---------- - label : str - The label to filter. - - """ - self.filter_polygons(label) - self.filter_points(label) - - def sort_polygons(self, key: Callable[[Polygon], int | float | str], reverse: bool = False) -> None: - """Sort the polygons in-place. - - Parameters - ---------- - key : callable - The key to sort the polygons on, this has to be a lambda function or similar. - For instance `lambda polygon: polygon.area` will sort the polygons on the area, or - `lambda polygon: polygon.get_field(field_name)` will sort the polygons on that field. - reverse : bool - Whether to sort in reverse order. - - Note - ---- - This will internally invalidate the R-tree. You could rebuild this manually using `.rebuild_rtree()`, or - have the function itself do this on-demand (typically when you invoke a `.read_region()`) - - Returns - ------- - None - - """ - self._layers.sort_polygons(key, reverse) - - @property - def color_lut(self) -> npt.NDArray[np.uint8]: - """Get the color lookup table for the annotations. - - Requires that the polygons have an index and color set. Be aware that for the background always - the value 0 is assumed. - So if you are using the `to_mask(default_value=0)` with a default value other than 0, - the LUT will still have this as index 0. - - Example - ------- - >>> color_lut = annotations.color_lut - >>> region = annotations.read_region(region_start, 0.02, region_size).to_mask() - >>> colored_mask = PIL.Image.fromarray(color_lut[mask]) - - Returns - ------- - np.ndarray - The color lookup table. - - """ - return self._layers.color_lut - - def __copy__(self) -> "SlideAnnotations": - return self.__class__(layers=copy.copy(self._layers), tags=copy.copy(self._tags)) - - def __deepcopy__(self, memo: dict[int, Any]) -> "SlideAnnotations": - return self.__class__(layers=copy.deepcopy(self._layers, memo), tags=copy.deepcopy(self._tags, memo)) - - def copy(self) -> "SlideAnnotations": - return self.__copy__() - - -def _parse_asap_coordinates( - annotation_structure: ET.Element, -) -> list[tuple[float, float]]: - """ - Parse ASAP XML coordinates into list. - - Parameters - ---------- - annotation_structure : list of strings - - Returns - ------- - list[tuple[float, float]] - - """ - coordinates = [] - coordinate_structure = annotation_structure[0] - - for coordinate in coordinate_structure: - coordinates.append( - ( - float(coordinate.get("X").replace(",", ".")), # type: ignore - float(coordinate.get("Y").replace(",", ".")), # type: ignore - ) - ) - - return coordinates - - -def _parse_darwin_complex_polygon( - annotation: dict[str, Any], label: str, color: Optional[tuple[int, int, int]] -) -> Iterable[Polygon]: - """ - Parse a complex polygon (i.e. polygon with holes) from a Darwin annotation. - - Parameters - ---------- - annotation : dict - The annotation dictionary - label : str - The label of the annotation - color : tuple[int, int, int] - The color of the annotation - - Returns - ------- - Iterable[DlupPolygon] - """ - # Create Polygons and sort by area in descending order - polygons = [Polygon([(p["x"], p["y"]) for p in path], []) for path in annotation["paths"]] - polygons.sort(key=lambda x: x.area, reverse=True) - - outer_polygons: list[tuple[Polygon, list[Any], bool]] = [] - for polygon in polygons: - is_hole = False - # Check if the polygon can be a hole in any of the previously processed polygons - for outer_poly, holes, outer_poly_is_hole in reversed(outer_polygons): - contains = outer_poly.contains(polygon) - # If polygon is contained by a hole, it should be added as new polygon - if contains and outer_poly_is_hole: - break - # Polygon is added as hole if outer polygon is not a hole - elif contains: - holes.append(polygon.get_exterior()) - is_hole = True - break - outer_polygons.append((polygon, [], is_hole)) - - for outer_poly, holes, _is_hole in outer_polygons: - if not _is_hole: - polygon = Polygon(outer_poly.get_exterior(), holes) - polygon.label = label - polygon.color = color - yield polygon diff --git a/dlup/background.py b/dlup/background.py index 4e696069..6b748e1c 100644 --- a/dlup/background.py +++ b/dlup/background.py @@ -15,7 +15,7 @@ # pylint: disable=import-error, no-name-in-module from dlup._background import _get_foreground_indices_numpy from dlup._exceptions import DlupError -from dlup.annotations import WsiAnnotations +from dlup.annotations import SlideAnnotations if TYPE_CHECKING: from dlup.data.dataset import MaskTypes @@ -27,14 +27,14 @@ def compute_masked_indices( regions: collections.abc.Sequence[tuple[float, float, int, int, float]], threshold: float | None = 1.0, ) -> npt.NDArray[np.int64]: - """Filter the regions to foreground data. This can be either an `np.ndarray` or `SlideImage` or `WsiAnnotations`. + """Filter the regions to foreground data. This can be either an `np.ndarray` or `SlideImage` or `SlideAnnotations`. Using numpy arrays is the fastest way to filter the background. Parameters ---------- slide_image : SlideImage Slide image to check - background_mask : np.ndarray or SlideImage or WsiAnnotations + background_mask : np.ndarray or SlideImage or SlideAnnotations Background mask to check against regions : collections.abc.Sequence[tuple[float, float, int, int, float]] Regions to check @@ -66,10 +66,10 @@ def compute_masked_indices( elif isinstance(background_mask, SlideImage): slide_image_boolean_mask: npt.NDArray[np.bool_] = np.zeros(len(regions), dtype=bool) for idx, region in enumerate(regions): - slide_image_boolean_mask[idx] = _is_foreground_wsiannotations(background_mask, region, threshold) + slide_image_boolean_mask[idx] = _is_foreground_slideannotations(background_mask, region, threshold) masked_indices = np.argwhere(slide_image_boolean_mask).flatten() - elif isinstance(background_mask, WsiAnnotations): + elif isinstance(background_mask, SlideAnnotations): wsi_annotations_boolean_mask: npt.NDArray[np.bool_] = np.zeros(len(regions), dtype=bool) for idx, region in enumerate(regions): wsi_annotations_boolean_mask[idx] = _is_foreground_polygon(slide_image, background_mask, region, threshold) @@ -83,7 +83,7 @@ def compute_masked_indices( def _is_foreground_polygon( slide_image: SlideImage, - background_mask: WsiAnnotations, + background_mask: SlideAnnotations, region: tuple[float, float, int, int, float], threshold: float = 1.0, ) -> bool: @@ -92,7 +92,7 @@ def _is_foreground_polygon( scaling = slide_image.get_scaling(mpp) - polygon_region = background_mask.read_region((x, y), scaling, (w, h)) + polygon_region = background_mask.read_region((x, y), scaling, (w, h)).polygons.get_geometries() total_area = sum(_.area for _ in polygon_region) if threshold == 1.0 and total_area == w * h: @@ -104,7 +104,7 @@ def _is_foreground_polygon( return False -def _is_foreground_wsiannotations( +def _is_foreground_slideannotations( background_mask: SlideImage, region: tuple[float, float, int, int, float], threshold: float = 1.0, diff --git a/dlup/data/dataset.py b/dlup/data/dataset.py index 80cae0f4..3c35ccc5 100644 --- a/dlup/data/dataset.py +++ b/dlup/data/dataset.py @@ -32,21 +32,23 @@ from numpy.typing import NDArray from dlup import BoundaryMode, SlideImage +from dlup._geometry import AnnotationRegion # pylint: disable=no-name-in-module from dlup._types import PathLike, ROIType -from dlup.annotations import DlupShapelyPoint, DlupShapelyPolygon, WsiAnnotations +from dlup.annotations import SlideAnnotations from dlup.backends.common import AbstractSlideBackend from dlup.background import compute_masked_indices +from dlup.geometry import Box, Point, Polygon from dlup.tiling import Grid, GridOrder, TilingMode from dlup.tools import ConcatSequences, MapSequence from dlup.utils.backends import ImageBackend -MaskTypes = Union["SlideImage", npt.NDArray[np.int_], "WsiAnnotations"] +MaskTypes = Union["SlideImage", npt.NDArray[np.int_], "SlideAnnotations"] -_AnnotationsTypes = DlupShapelyPoint | DlupShapelyPolygon +_AnnotationsTypes = Point | Polygon | Box T_co = TypeVar("T_co", covariant=True) T = TypeVar("T") -_BaseAnnotationTypes = Union[SlideImage, WsiAnnotations] +_BaseAnnotationTypes = Union[SlideImage, SlideAnnotations] _AnnotationTypes = Union[list[tuple[str, _BaseAnnotationTypes]], _BaseAnnotationTypes] _LabelTypes = Union[str, bool, int, float] @@ -62,7 +64,7 @@ class TileSample(TypedDict): path: PathLike region_index: int labels: dict[str, Any] | None - annotations: Optional[Iterable[_AnnotationsTypes]] + annotations: Optional[AnnotationRegion] PointType = tuple[float, float] @@ -244,7 +246,7 @@ def __init__( Sequence of rectangular regions as (x, y, h, w, mpp) crop : bool Whether to crop overflowing tiles. - mask : np.ndarray or SlideImage or WsiAnnotations + mask : np.ndarray or SlideImage or SlideAnnotations Binary mask used to filter each region together with a threshold. mask_threshold : float, optional Threshold to check against. The foreground percentage should be strictly larger than threshold. @@ -253,7 +255,7 @@ def __init__( output_tile_size: tuple[int, int], optional If this value is set, this value will be used as the tile size of the output tiles. If this value is different from the underlying grid, this tile will be extracted around the center of the region. - annotations : WsiAnnotations + annotations : SlideAnnotations Annotation classes. labels : list Image-level labels. Will be added to each individual tile. @@ -356,8 +358,8 @@ def __getitem__(self, index: Union[int, slice]) -> Union[TileSample, Sequence[Ti # TODO: This needs to move to TiledWsiDataset (v1.0) if self.annotations is not None: - if not isinstance(self.annotations, WsiAnnotations): - raise NotImplementedError("Only WsiAnnotations are supported at the moment.") + if not isinstance(self.annotations, SlideAnnotations): + raise NotImplementedError("Only SlideAnnotations are supported at the moment.") _requires_offset = getattr(self.annotations, "offset_to_slide_bounds", False) if _requires_offset: _scaled_offset = slide_image.get_scaled_slide_bounds(scaling)[0] diff --git a/dlup/data/transforms.py b/dlup/data/transforms.py deleted file mode 100644 index ca969acb..00000000 --- a/dlup/data/transforms.py +++ /dev/null @@ -1,269 +0,0 @@ -# Copyright (c) dlup contributors -"""This module contains the transforms which can be applied to the output of a Dataset class""" -from __future__ import annotations - -from collections import defaultdict -from typing import Iterable, cast - -import cv2 -import numpy as np -import numpy.typing as npt - -import dlup.annotations -from dlup._exceptions import AnnotationError -from dlup.annotations import AnnotationClass, AnnotationType -from dlup.data.dataset import PointType, TileSample, TileSampleWithAnnotationData - -_AnnotationsTypes = dlup.annotations.DlupShapelyPoint | dlup.annotations.DlupShapelyPolygon - - -def convert_annotations( - annotations: Iterable[_AnnotationsTypes], - region_size: tuple[int, int], - index_map: dict[str, int], - roi_name: str | None = None, - default_value: int = 0, - multiclass: bool = False, -) -> tuple[dict[str, list[PointType]], npt.NDArray[np.int_], npt.NDArray[np.int_] | None]: - """ - Convert the polygon and point annotations as output of a dlup dataset class, where: - - In case of points the output is dictionary mapping the annotation name to a list of locations. - - In case of bounding boxes the output is a dictionary mapping the annotation name to a list of bounding boxes. - Note that the internal representation of a bounding box is a polygon (`AnnotationType is AnnotationType.BOX`), - so the bounding box of that polygon is computed to convert. - - In case of polygons these are converted into a mask according to `index_map`, or the z_index + 1. - - *BE AWARE*: the polygon annotations are processed sequentially and later annotations can overwrite earlier ones. - This is for instance useful when you would annotate "tumor associated stroma" on top of "stroma". - The dlup Annotation classes return the polygons with area from large to small. - - When the polygon has holes, the previous written annotation is used to fill the holes. - - *BE AWARE*: This function will silently ignore annotations which are written out of bounds. - - TODO - ---- - - Convert segmentation index map to an Enum - - Parameters - ---------- - annotations : Iterable[_AnnotationsTypes] - The annotations as a list, e.g., as output from `dlup.annotations.WsiAnnotations.read_region()`. - region_size : tuple[int, int] - index_map : dict[str, int] - Map mapping annotation name to index number in the output. - roi_name : str, optional - Name of the region-of-interest key. - default_value : int - The mask will be initialized with this value. - multiclass : bool - Output a multiclass mask, the first axis will be the index. This requires that an index_map is available. - - Returns - ------- - dict, dict, np.ndarray, np.ndarray or None - Dictionary of points, dictionary of boxes, mask and roi_mask. - - """ - if multiclass and index_map is None: - raise ValueError("index_map must be provided when multiclass is True.") - - # Initialize the base mask, which may be multi-channel if output_class_axis is True - if multiclass: - assert isinstance(index_map, dict) - num_classes = max(index_map.values()) + 1 # Assuming index_map starts from 0 - mask = np.zeros((num_classes, *region_size), dtype=np.int32) - else: - mask = np.empty(region_size, dtype=np.int32) - mask[:] = default_value - - points: dict[str, list[PointType]] = defaultdict(list) - roi_mask = np.zeros(region_size, dtype=np.int32) - - has_roi = False - for curr_annotation in annotations: - holes_mask = None - if isinstance(curr_annotation, dlup.annotations.DlupShapelyPoint): - coords = tuple(curr_annotation.coords) - points[curr_annotation.label] += tuple(coords) - continue - - index_value: int - if curr_annotation.label != roi_name: - if curr_annotation.label not in index_map: - raise ValueError(f"Label {curr_annotation.label} is not in the index map {index_map}") - elif roi_name is not None: - cv2.fillPoly( - roi_mask, - [np.asarray(curr_annotation.exterior.coords).round().astype(np.int32)], - [1], - ) - has_roi = True - continue - - index_value = index_map[curr_annotation.label] - - original_values = None - interiors = [np.asarray(pi.coords).round().astype(np.int32) for pi in curr_annotation.interiors] - if interiors != []: - original_values = mask.copy() - holes_mask = np.zeros(region_size, dtype=np.int32) - # Get a mask where the holes are - cv2.fillPoly(holes_mask, interiors, [1]) - - if not multiclass: - cv2.fillPoly( - mask, - [np.asarray(curr_annotation.exterior.coords).round().astype(np.int32)], - [index_value], - ) - else: - cv2.fillPoly( - mask[index_value], - [np.asarray(curr_annotation.exterior.coords).round().astype(np.int32)], - [1], - ) - if interiors != []: - # TODO: This is a bit hacky to ignore mypy here, but I don't know how to fix it. - mask = np.where(holes_mask == 1, original_values, mask) # type: ignore - - # This is a hard to find bug, so better give an explicit error. - if not has_roi and roi_name is not None: - raise AnnotationError(f"ROI mask {roi_name} not found, please add a ROI mask to the annotations.") - - return dict(points), mask, roi_mask if roi_name else None - - -class ConvertAnnotationsToMask: - """Transform which converts polygons to masks. Will overwrite the annotations key""" - - def __init__( - self, *, roi_name: str | None, index_map: dict[str, int], multiclass: bool = False, default_value: int = 0 - ): - """ - Converts annotations given my `dlup.annotations.Polygon` or `dlup.annotations.Point` to a mask and a dictionary - of points. The mask is initialized with `default_value`, (i.e., background). The values in the mask are - subsequently determined by `index_map`, where each value is written to the mask according to this map, in the - order of the elements in the annotations. This means that if you have overlapping polygons, the last polygon - will overwrite the previous one. The sorting can be handled in the `dlup.annotations.WsiAnnotation` class. - - In case there are no annotations present (i.e. the "annotations" key is None) a `ValueError` is - raised. - - Parameters - ---------- - roi_name : str, optional - Name of the ROI key. - index_map : dict - Dictionary mapping the label to the integer in the output. - multiclass: bool - Output a multiclass mask, the first axis will be the index. This requires that an index_map is available. - default_value : int - The mask will be initialized with this value. - """ - self._roi_name = roi_name - self._index_map = index_map - self._default_value = default_value - self._multiclass = multiclass - - def __call__(self, sample: TileSample) -> TileSampleWithAnnotationData: - """ - Convert the annotations to a mask. - - Parameters - ---------- - sample : TileSample - The input sample. - - Raises - ------ - ValueError - If no annotations are found. - - Returns - ------- - TileSampleWithAnnotationData - The input sample with the annotation data added. - - """ - - _annotations = sample["annotations"] - if _annotations is None: - raise ValueError("No annotations found to convert to mask.") - - image = sample["image"] - points, mask, roi = convert_annotations( - _annotations, - (image.height, image.width), - roi_name=self._roi_name, - index_map=self._index_map, - default_value=self._default_value, - ) - - output: TileSampleWithAnnotationData = cast(TileSampleWithAnnotationData, sample) - output["annotation_data"] = { - "points": points, - "mask": mask, - "roi": roi, - } - - return output - - -def rename_labels(annotations: Iterable[_AnnotationsTypes], remap_labels: dict[str, str]) -> list[_AnnotationsTypes]: - """ - Rename the labels in the annotations. - - Parameters - ---------- - annotations: Iterable[_AnnotationsTypes] - The annotations - remap_labels: dict[str, str] - The renaming table - - Returns - ------- - list[_AnnotationsTypes] - """ - output_annotations = [] - for annotation in annotations: - label = annotation.label - if label not in remap_labels: - output_annotations.append(annotation) - continue - - if annotation.annotation_class.annotation_type == AnnotationType.BOX: - a_cls = AnnotationClass(label=remap_labels[label], annotation_type=AnnotationType.BOX) - output_annotations.append(dlup.annotations.DlupShapelyPolygon(annotation, a_cls=a_cls)) - elif annotation.annotation_class.annotation_type == AnnotationType.POLYGON: - a_cls = AnnotationClass(label=remap_labels[label], annotation_type=AnnotationType.POLYGON) - output_annotations.append(dlup.annotations.DlupShapelyPolygon(annotation, a_cls=a_cls)) - elif annotation.annotation_class.annotation_type == AnnotationType.POINT: - a_cls = AnnotationClass(label=remap_labels[label], annotation_type=AnnotationType.POINT) - output_annotations.append(dlup.annotations.DlupShapelyPoint(annotation, a_cls=a_cls)) - else: - raise AnnotationError(f"Unsupported annotation type {annotation.annotation_class.annotation_type}") - - return output_annotations - - -class RenameLabels: - """Remap the label names""" - - def __init__(self, remap_labels: dict[str, str]): - """ - - Parameters - ---------- - remap_labels : dict - Dictionary mapping old name to new name. - """ - self._remap_labels = remap_labels - - def __call__(self, sample: TileSample) -> TileSample: - _annotations = sample["annotations"] - if _annotations is None: - raise ValueError("No annotations found to rename.") - - sample["annotations"] = rename_labels(_annotations, self._remap_labels) - return sample diff --git a/examples/annotations_to_mask.py b/examples/annotations_to_mask.py index c8c8f35d..06af112e 100644 --- a/examples/annotations_to_mask.py +++ b/examples/annotations_to_mask.py @@ -1,10 +1,11 @@ # Copyright (c) dlup contributors +# pylint: disable=no-member """This code provides an example of how to convert annotations to a mask.""" from pathlib import Path import PIL.Image -from dlup.annotations_experimental import SlideAnnotations +from dlup.annotations import SlideAnnotations def convert_annotations_to_mask() -> None: diff --git a/src/geometry/box.h b/src/geometry/box.h index 889afa9d..647d0827 100644 --- a/src/geometry/box.h +++ b/src/geometry/box.h @@ -102,4 +102,4 @@ inline void declare_box(py::module &m) { .def_property_readonly("wkt", &Box::toWkt, "Get the WKT representation of the box"); } -#endif // DLUP_GEOMETRY_BOX_H \ No newline at end of file +#endif // DLUP_GEOMETRY_BOX_H