diff --git a/ome_zarr/format.py b/ome_zarr/format.py
index ca1ec9ea..866c6c7c 100644
--- a/ome_zarr/format.py
+++ b/ome_zarr/format.py
@@ -2,7 +2,7 @@
 
 import logging
 from abc import ABC, abstractmethod
-from typing import Iterator, Optional
+from typing import Dict, Iterator, List, Optional
 
 from zarr.storage import FSStore
 
@@ -44,11 +44,11 @@ def detect_format(metadata: dict) -> "Format":
 class Format(ABC):
     @property
     @abstractmethod
-    def version(self) -> str:
+    def version(self) -> str:  # pragma: no cover
         raise NotImplementedError()
 
     @abstractmethod
-    def matches(self, metadata: dict) -> bool:
+    def matches(self, metadata: dict) -> bool:  # pragma: no cover
         raise NotImplementedError()
 
     @abstractmethod
@@ -56,7 +56,7 @@ def init_store(self, path: str, mode: str = "r") -> FSStore:
         raise NotImplementedError()
 
     # @abstractmethod
-    def init_channels(self) -> None:
+    def init_channels(self) -> None:  # pragma: no cover
         raise NotImplementedError()
 
     def _get_multiscale_version(self, metadata: dict) -> Optional[str]:
@@ -72,12 +72,26 @@ def __repr__(self) -> str:
     def __eq__(self, other: object) -> bool:
         return self.__class__ == other.__class__
 
+    @abstractmethod
+    def generate_well_dict(
+        self, well: str, rows: List[str], columns: List[str]
+    ) -> dict:  # pragma: no cover
+        raise NotImplementedError()
+
+    @abstractmethod
+    def validate_well_dict(
+        self, well: dict, rows: List[str], columns: List[str]
+    ) -> None:  # pragma: no cover
+        raise NotImplementedError()
+
 
 class FormatV01(Format):
     """
     Initial format. (2020)
     """
 
+    REQUIRED_PLATE_WELL_KEYS: Dict[str, type] = {"path": str}
+
     @property
     def version(self) -> str:
         return "0.1"
@@ -92,8 +106,24 @@ def init_store(self, path: str, mode: str = "r") -> FSStore:
         LOGGER.debug(f"Created legacy flat FSStore({path}, {mode})")
         return store
 
+    def generate_well_dict(
+        self, well: str, rows: List[str], columns: List[str]
+    ) -> dict:
+        return {"path": str(well)}
+
+    def validate_well_dict(
+        self, well: dict, rows: List[str], columns: List[str]
+    ) -> None:
+        if any(e not in self.REQUIRED_PLATE_WELL_KEYS for e in well.keys()):
+            LOGGER.debug(f"{well} contains unspecified keys")
+        for key, key_type in self.REQUIRED_PLATE_WELL_KEYS.items():
+            if key not in well:
+                raise ValueError(f"{well} must contain a {key} key of type {key_type}")
+            if not isinstance(well[key], key_type):
+                raise ValueError(f"{well} {key} must be of {key_type} type")
+
 
-class FormatV02(Format):
+class FormatV02(FormatV01):
     """
     Changelog: move to nested storage (April 2021)
     """
@@ -151,9 +181,39 @@ class FormatV04(FormatV03):
     introduce transformations in multiscales (Nov 2021)
     """
 
+    REQUIRED_PLATE_WELL_KEYS = {"path": str, "rowIndex": int, "columnIndex": int}
+
     @property
     def version(self) -> str:
         return "0.4"
 
+    def generate_well_dict(
+        self, well: str, rows: List[str], columns: List[str]
+    ) -> dict:
+        row, column = well.split("/")
+        if row not in rows:
+            raise ValueError(f"{row} is not defined in the list of rows")
+        rowIndex = rows.index(row)
+        if column not in columns:
+            raise ValueError(f"{column} is not defined in the list of columns")
+        columnIndex = columns.index(column)
+        return {"path": str(well), "rowIndex": rowIndex, "columnIndex": columnIndex}
+
+    def validate_well_dict(
+        self, well: dict, rows: List[str], columns: List[str]
+    ) -> None:
+        super().validate_well_dict(well, rows, columns)
+        if len(well["path"].split("/")) != 2:
+            raise ValueError(f"{well} path must exactly be composed of 2 groups")
+        row, column = well["path"].split("/")
+        if row not in rows:
+            raise ValueError(f"{row} is not defined in the plate rows")
+        if well["rowIndex"] != rows.index(row):
+            raise ValueError(f"Mismatching row index for {well}")
+        if column not in columns:
+            raise ValueError(f"{column} is not defined in the plate columns")
+        if well["columnIndex"] != columns.index(column):
+            raise ValueError(f"Mismatching column index for {well}")
+
 
 CurrentFormat = FormatV04
diff --git a/ome_zarr/writer.py b/ome_zarr/writer.py
index 0d1a357c..17d796fd 100644
--- a/ome_zarr/writer.py
+++ b/ome_zarr/writer.py
@@ -107,26 +107,38 @@ def _validate_plate_acquisitions(
     return acquisitions
 
 
+def _validate_plate_rows_columns(
+    rows_or_columns: List[str],
+    fmt: Format = CurrentFormat(),
+) -> List[dict]:
+
+    if len(set(rows_or_columns)) != len(rows_or_columns):
+        raise ValueError(f"{rows_or_columns} must contain unique elements")
+    validated_list = []
+    for element in rows_or_columns:
+        if not element.isalnum():
+            raise ValueError(f"{element} must contain alphanumeric characters")
+        validated_list.append({"name": str(element)})
+    return validated_list
+
+
 def _validate_plate_wells(
-    wells: List[Union[str, dict]], fmt: Format = CurrentFormat()
+    wells: List[Union[str, dict]],
+    rows: List[str],
+    columns: List[str],
+    fmt: Format = CurrentFormat(),
 ) -> List[dict]:
 
-    VALID_KEYS = [
-        "path",
-    ]
     validated_wells = []
     if wells is None or len(wells) == 0:
         raise ValueError("Empty wells list")
     for well in wells:
         if isinstance(well, str):
-            validated_wells.append({"path": str(well)})
+            well_dict = fmt.generate_well_dict(well, rows, columns)
+            fmt.validate_well_dict(well_dict, rows, columns)
+            validated_wells.append(well_dict)
         elif isinstance(well, dict):
-            if any(e not in VALID_KEYS for e in well.keys()):
-                LOGGER.debug("f{well} contains unspecified keys")
-            if "path" not in well:
-                raise ValueError(f"{well} must contain a path key")
-            if not isinstance(well["path"], str):
-                raise ValueError(f"{well} path must be of str type")
+            fmt.validate_well_dict(well, rows, columns)
             validated_wells.append(well)
         else:
             raise ValueError(f"Unrecognized type for {well}")
@@ -259,9 +271,9 @@ def write_plate_metadata(
     """
 
     plate: Dict[str, Union[str, int, List[Dict]]] = {
-        "columns": [{"name": str(c)} for c in columns],
-        "rows": [{"name": str(r)} for r in rows],
-        "wells": _validate_plate_wells(wells),
+        "columns": _validate_plate_rows_columns(columns),
+        "rows": _validate_plate_rows_columns(rows),
+        "wells": _validate_plate_wells(wells, rows, columns, fmt=fmt),
         "version": fmt.version,
     }
     if name is not None:
diff --git a/tests/test_writer.py b/tests/test_writer.py
index 0e0e11ed..a952dd44 100644
--- a/tests/test_writer.py
+++ b/tests/test_writer.py
@@ -298,7 +298,9 @@ def test_minimal_plate(self):
         assert self.root.attrs["plate"]["columns"] == [{"name": "1"}]
         assert self.root.attrs["plate"]["rows"] == [{"name": "A"}]
         assert self.root.attrs["plate"]["version"] == CurrentFormat().version
-        assert self.root.attrs["plate"]["wells"] == [{"path": "A/1"}]
+        assert self.root.attrs["plate"]["wells"] == [
+            {"path": "A/1", "rowIndex": 0, "columnIndex": 0}
+        ]
         assert "name" not in self.root.attrs["plate"]
         assert "field_count" not in self.root.attrs["plate"]
         assert "acquisitions" not in self.root.attrs["plate"]
@@ -335,25 +337,57 @@ def test_12wells_plate(self):
         ]
         assert self.root.attrs["plate"]["version"] == CurrentFormat().version
         assert self.root.attrs["plate"]["wells"] == [
-            {"path": "A/1"},
-            {"path": "A/2"},
-            {"path": "A/3"},
-            {"path": "B/1"},
-            {"path": "B/2"},
-            {"path": "B/3"},
-            {"path": "C/1"},
-            {"path": "C/2"},
-            {"path": "C/3"},
-            {"path": "D/1"},
-            {"path": "D/2"},
-            {"path": "D/3"},
+            {"path": "A/1", "rowIndex": 0, "columnIndex": 0},
+            {"path": "A/2", "rowIndex": 0, "columnIndex": 1},
+            {"path": "A/3", "rowIndex": 0, "columnIndex": 2},
+            {"path": "B/1", "rowIndex": 1, "columnIndex": 0},
+            {"path": "B/2", "rowIndex": 1, "columnIndex": 1},
+            {"path": "B/3", "rowIndex": 1, "columnIndex": 2},
+            {"path": "C/1", "rowIndex": 2, "columnIndex": 0},
+            {"path": "C/2", "rowIndex": 2, "columnIndex": 1},
+            {"path": "C/3", "rowIndex": 2, "columnIndex": 2},
+            {"path": "D/1", "rowIndex": 3, "columnIndex": 0},
+            {"path": "D/2", "rowIndex": 3, "columnIndex": 1},
+            {"path": "D/3", "rowIndex": 3, "columnIndex": 2},
+        ]
+        assert "name" not in self.root.attrs["plate"]
+        assert "field_count" not in self.root.attrs["plate"]
+        assert "acquisitions" not in self.root.attrs["plate"]
+
+    def test_sparse_plate(self):
+        rows = ["A", "B", "C", "D", "E"]
+        cols = ["1", "2", "3", "4", "5"]
+        wells = [
+            "B/2",
+            "E/5",
+        ]
+        write_plate_metadata(self.root, rows, cols, wells)
+        assert "plate" in self.root.attrs
+        assert self.root.attrs["plate"]["columns"] == [
+            {"name": "1"},
+            {"name": "2"},
+            {"name": "3"},
+            {"name": "4"},
+            {"name": "5"},
+        ]
+        assert self.root.attrs["plate"]["rows"] == [
+            {"name": "A"},
+            {"name": "B"},
+            {"name": "C"},
+            {"name": "D"},
+            {"name": "E"},
+        ]
+        assert self.root.attrs["plate"]["version"] == CurrentFormat().version
+        assert self.root.attrs["plate"]["wells"] == [
+            {"path": "B/2", "rowIndex": 1, "columnIndex": 1},
+            {"path": "E/5", "rowIndex": 4, "columnIndex": 4},
         ]
         assert "name" not in self.root.attrs["plate"]
         assert "field_count" not in self.root.attrs["plate"]
         assert "acquisitions" not in self.root.attrs["plate"]
 
     @pytest.mark.parametrize("fmt", (FormatV01(), FormatV02(), FormatV03()))
-    def test_plate_version(self, fmt):
+    def test_legacy_wells(self, fmt):
         write_plate_metadata(self.root, ["A"], ["1"], ["A/1"], fmt=fmt)
         assert "plate" in self.root.attrs
         assert self.root.attrs["plate"]["columns"] == [{"name": "1"}]
@@ -371,7 +405,9 @@ def test_plate_name(self):
         assert self.root.attrs["plate"]["name"] == "test"
         assert self.root.attrs["plate"]["rows"] == [{"name": "A"}]
         assert self.root.attrs["plate"]["version"] == CurrentFormat().version
-        assert self.root.attrs["plate"]["wells"] == [{"path": "A/1"}]
+        assert self.root.attrs["plate"]["wells"] == [
+            {"path": "A/1", "rowIndex": 0, "columnIndex": 0}
+        ]
         assert "field_count" not in self.root.attrs["plate"]
         assert "acquisitions" not in self.root.attrs["plate"]
 
@@ -382,7 +418,9 @@ def test_field_count(self):
         assert self.root.attrs["plate"]["field_count"] == 10
         assert self.root.attrs["plate"]["rows"] == [{"name": "A"}]
         assert self.root.attrs["plate"]["version"] == CurrentFormat().version
-        assert self.root.attrs["plate"]["wells"] == [{"path": "A/1"}]
+        assert self.root.attrs["plate"]["wells"] == [
+            {"path": "A/1", "rowIndex": 0, "columnIndex": 0}
+        ]
         assert "name" not in self.root.attrs["plate"]
         assert "acquisitions" not in self.root.attrs["plate"]
 
@@ -394,7 +432,9 @@ def test_acquisitions_minimal(self):
         assert self.root.attrs["plate"]["columns"] == [{"name": "1"}]
         assert self.root.attrs["plate"]["rows"] == [{"name": "A"}]
         assert self.root.attrs["plate"]["version"] == CurrentFormat().version
-        assert self.root.attrs["plate"]["wells"] == [{"path": "A/1"}]
+        assert self.root.attrs["plate"]["wells"] == [
+            {"path": "A/1", "rowIndex": 0, "columnIndex": 0}
+        ]
         assert "name" not in self.root.attrs["plate"]
         assert "field_count" not in self.root.attrs["plate"]
 
@@ -415,7 +455,9 @@ def test_acquisitions_maximal(self):
         assert self.root.attrs["plate"]["columns"] == [{"name": "1"}]
         assert self.root.attrs["plate"]["rows"] == [{"name": "A"}]
         assert self.root.attrs["plate"]["version"] == CurrentFormat().version
-        assert self.root.attrs["plate"]["wells"] == [{"path": "A/1"}]
+        assert self.root.attrs["plate"]["wells"] == [
+            {"path": "A/1", "rowIndex": 0, "columnIndex": 0}
+        ]
         assert "name" not in self.root.attrs["plate"]
         assert "field_count" not in self.root.attrs["plate"]
 
@@ -450,21 +492,65 @@ def test_invalid_well_list(self, wells):
     @pytest.mark.parametrize(
         "wells",
         (
-            [{"path": 0}],
+            # Missing required keys
             [{"id": "test"}],
-            [{"path": "A/1"}, {"path": None}],
+            [{"path": "A/1"}],
+            [{"path": "A/1", "rowIndex": 0}],
+            [{"path": "A/1", "columnIndex": 0}],
+            [{"rowIndex": 0, "columnIndex": 0}],
+            # Invalid paths
+            [{"path": 0, "rowIndex": 0, "columnIndex": 0}],
+            [{"path": None, "rowIndex": 0, "columnIndex": 0}],
+            [{"path": "plate/A/1", "rowIndex": 0, "columnIndex": 0}],
+            [{"path": "plate/A1", "rowIndex": 0, "columnIndex": 0}],
+            [{"path": "A/1/0", "rowIndex": 0, "columnIndex": 0}],
+            [{"path": "A1", "rowIndex": 0, "columnIndex": 0}],
+            [{"path": "0", "rowIndex": 0, "columnIndex": 0}],
+            # Invalid row/column indices
+            [{"path": "A/1", "rowIndex": "0", "columnIndex": 0}],
+            [{"path": "A/1", "rowIndex": 0, "columnIndex": "0"}],
+            # Undefined rows/columns
+            [{"path": "C/1", "rowIndex": 2, "columnIndex": 0}],
+            [{"path": "A/3", "rowIndex": 0, "columnIndex": 2}],
+            # Mismatching indices
+            [{"path": "A/1", "rowIndex": 0, "columnIndex": 1}],
+            [{"path": "A/1", "rowIndex": 1, "columnIndex": 0}],
         ),
     )
     def test_invalid_well_keys(self, wells):
         with pytest.raises(ValueError):
             write_plate_metadata(self.root, ["A"], ["1"], wells)
 
-    def test_unspecified_well_keys(self):
+    @pytest.mark.parametrize("fmt", (FormatV01(), FormatV02(), FormatV03()))
+    def test_legacy_unspecified_well_keys(self, fmt):
         wells = [
             {"path": "A/1", "unspecified_key": "alpha"},
             {"path": "A/2", "unspecified_key": "beta"},
             {"path": "B/1", "unspecified_key": "gamma"},
         ]
+        write_plate_metadata(self.root, ["A", "B"], ["1", "2"], wells, fmt=fmt)
+        assert "plate" in self.root.attrs
+        assert self.root.attrs["plate"]["columns"] == [{"name": "1"}, {"name": "2"}]
+        assert self.root.attrs["plate"]["rows"] == [{"name": "A"}, {"name": "B"}]
+        assert self.root.attrs["plate"]["version"] == fmt.version
+        assert self.root.attrs["plate"]["wells"] == wells
+
+    def test_unspecified_well_keys(self):
+        wells = [
+            {
+                "path": "A/1",
+                "rowIndex": 0,
+                "columnIndex": 0,
+                "unspecified_key": "alpha",
+            },
+            {"path": "A/2", "rowIndex": 0, "columnIndex": 1, "unspecified_key": "beta"},
+            {
+                "path": "B/1",
+                "rowIndex": 1,
+                "columnIndex": 0,
+                "unspecified_key": "gamma",
+            },
+        ]
         write_plate_metadata(self.root, ["A", "B"], ["1", "2"], wells)
         assert "plate" in self.root.attrs
         assert self.root.attrs["plate"]["columns"] == [{"name": "1"}, {"name": "2"}]
@@ -472,6 +558,35 @@ def test_unspecified_well_keys(self):
         assert self.root.attrs["plate"]["version"] == CurrentFormat().version
         assert self.root.attrs["plate"]["wells"] == wells
 
+    def test_missing_well_keys(self):
+        wells = [
+            {"path": "A/1"},
+            {"path": "A/2"},
+            {"path": "B/1"},
+        ]
+        with pytest.raises(ValueError):
+            write_plate_metadata(self.root, ["A", "B"], ["1", "2"], wells)
+
+    def test_well_not_in_rows(self):
+        wells = ["A/1", "B/1", "C/1"]
+        with pytest.raises(ValueError):
+            write_plate_metadata(self.root, ["A", "B"], ["1", "2"], wells)
+
+    def test_well_not_in_columns(self):
+        wells = ["A/1", "A/2", "A/3"]
+        with pytest.raises(ValueError):
+            write_plate_metadata(self.root, ["A", "B"], ["1", "2"], wells)
+
+    @pytest.mark.parametrize("rows", (["A", "B", "B"], ["A", "&"]))
+    def test_invalid_rows(self, rows):
+        with pytest.raises(ValueError):
+            write_plate_metadata(self.root, rows, ["1"], ["A/1"])
+
+    @pytest.mark.parametrize("columns", (["1", "2", "2"], ["1", "&"]))
+    def test_invalid_columns(self, columns):
+        with pytest.raises(ValueError):
+            write_plate_metadata(self.root, ["A"], columns, ["A/1"])
+
 
 class TestWellMetadata:
     @pytest.fixture(autouse=True)