diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml new file mode 100644 index 0000000..3a6e949 --- /dev/null +++ b/.github/workflows/ci.yaml @@ -0,0 +1,11 @@ +name: CI + +on: [push, pull_request] + +jobs: + tests: + uses: ./.github/workflows/tests.yaml + lint: + uses: ./.github/workflows/lint.yaml + format: + uses: ./.github/workflows/format.yaml diff --git a/.github/workflows/format.yaml b/.github/workflows/format.yaml new file mode 100644 index 0000000..ed0bdb2 --- /dev/null +++ b/.github/workflows/format.yaml @@ -0,0 +1,27 @@ +name: format + +on: [workflow_call] + +jobs: + format: + strategy: + fail-fast: false + matrix: + python-version: ["3.10"] + os: [ubuntu-22.04] + runs-on: ${{ matrix.os }} + + steps: + - uses: actions/checkout@v3 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + - name: Install project + run: pip install .[formatting] + - name: Run black + run: black . --check --diff --color + - name: Run codespell + run: codespell --enable-colors + - name: Run isort + run: isort looptrace_napari tests --check --diff --color diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml new file mode 100644 index 0000000..1b04ad5 --- /dev/null +++ b/.github/workflows/lint.yaml @@ -0,0 +1,25 @@ +name: lint + +on: [workflow_call] + +jobs: + lint: + strategy: + fail-fast: false + matrix: + python-version: ["3.10"] + os: [ubuntu-22.04] + runs-on: ${{ matrix.os }} + + steps: + - uses: actions/checkout@v3 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + - name: Install project + run: pip install .[linting] + - name: Run mypy on Python ${{ matrix.python-version }} on ${{ matrix.os }} + run: mypy + - name: Run pylint on Python ${{ matrix.python-version }} on ${{ matrix.os }} + run: pylint looptrace_napari diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml new file mode 100644 index 0000000..52a1825 --- /dev/null +++ b/.github/workflows/tests.yaml @@ -0,0 +1,23 @@ +name: Tests + +on: [workflow_call] + +jobs: + tests: + strategy: + fail-fast: false + matrix: + python-version: [ "3.10", "3.11", "3.12" ] + os: [ ubuntu-latest, macos-latest, windows-latest, ubuntu-20.04 ] + runs-on: ${{ matrix.os }} + + steps: + - uses: actions/checkout@v3 + - name: Set up Python ${{ matrix.python-version }} on ${{ matrix.os }} + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + - name: Install project + run: python -m pip install .[test] + - name: Run unit tests on Python ${{ matrix.python-version }} on ${{ matrix.os }} + run: pytest tests -vv --cov=./ --cov-report=xml diff --git a/.gitignore b/.gitignore index 8b95a8a..a550073 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,6 @@ build/ *.pyc .venv/ __pycache__/ + +# test-related +.coverage* diff --git a/docs/development.md b/docs/development.md index e91fba8..84c71a5 100644 --- a/docs/development.md +++ b/docs/development.md @@ -6,3 +6,30 @@ By default, with `nix-shell`, you should have all the dependencies you'll need n In other words, dependencies to do things like run tests, run linter(s), and run type checking should all be provided. If you try and that's not the case, please check the [Issue Tracker](https://github.com/gerlichlab/looptrace-napari/issues). If an issue's open for what you're experiencing, please upvote the initial description of the issue and/or comment on the ticket. + +### Testing, formatting, and linting +Here's what corresponds approximately to what's run for CI through the project's [GitHub actions workflows](../.github/workflows/). + +NB: Each of the following commands is to be run _from the project [root folder](../)_. + +__Run test suite__ with coverage statistics +```console +pytest tests -vv --cov=. +``` + +__Run formatters__ +```console +black . +codespell +isort looptrace_napari tests +``` + +__Run type checker__ +```console +mypy looptrace_napari +``` + +__Run linter__ +```console +pylint looptrace_napari +``` \ No newline at end of file diff --git a/docs/user_docs/locus-spots.md b/docs/user_docs/locus-spots.md index c73e66e..4dfd321 100644 --- a/docs/user_docs/locus-spots.md +++ b/docs/user_docs/locus-spots.md @@ -20,6 +20,6 @@ __Scrolling__ In version 0.2 of this plugin, there's an image plane for every z-slice, for every timepoint, for every ROI in the particular field of view. This can makes scrolling time-consuming and will be simplified in a future release. -__Finding points (centers of Guassian fits to pixel volume)__ +__Finding points (centers of Gaussian fits to pixel volume)__ In version 0.2 of this plugin, a point (blue or red indicator) can only be visible only in the z-slice closest to the z-coordinate of the the center of the Gaussian fit to a particular pixel volume. Generally, these will be more toward the middle of the z-stack rather than at either end, so focus scrolling mainly through the midrange of the z-axis. diff --git a/looptrace_napari/_docs.py b/looptrace_napari/_docs.py index 7bdc513..a6f8773 100644 --- a/looptrace_napari/_docs.py +++ b/looptrace_napari/_docs.py @@ -1,12 +1,17 @@ """Documentation-related helpers""" from dataclasses import dataclass + from typing_extensions import Annotated + from .types import PathOrPaths @dataclass(frozen=True, kw_only=True) class CommonReaderDoc: - path_type = Annotated[PathOrPaths, "The file(s) or folder(s) for which to attempt to infer reader function"] + path_type = Annotated[ + PathOrPaths, + "The file(s) or folder(s) for which to attempt to infer reader function", + ] returns = "A None if we can't read the given path(s), else a reader function" notes = "https://napari.org/stable/plugins/guides.html#layer-data-tuples" diff --git a/looptrace_napari/geometry.py b/looptrace_napari/geometry.py index b8b2244..1f9e8dc 100644 --- a/looptrace_napari/geometry.py +++ b/looptrace_napari/geometry.py @@ -3,15 +3,17 @@ from abc import abstractmethod from dataclasses import dataclass from typing import Protocol + +from numpydoc_decorator import doc # type: ignore[import-untyped] + # TODO: need Python >= 3.12 # See: https://github.com/gerlichlab/looptrace-napari/issues/6 -#from typing import override - -from numpydoc_decorator import doc +# from typing import override class LocatableXY(Protocol): - + """Something that admits x- and y-coordinate.""" + @abstractmethod def get_x_coordinate(self) -> float: raise NotImplementedError @@ -21,6 +23,13 @@ def get_y_coordinate(self) -> float: raise NotImplementedError +@doc( + summary="Bundle x and y position to create point in 2D space.", + parameters=dict( + x="Position in x", + y="Position in y", + ), +) @dataclass(kw_only=True, frozen=True) class ImagePoint2D(LocatableXY): x: float @@ -31,14 +40,14 @@ def __post_init__(self) -> None: raise TypeError(f"At least one coordinate isn't floating-point! {self}") if any(c < 0 for c in [self.x, self.y]): raise ValueError(f"At least one coordinate is negative! {self}") - + # TODO: adopt @override once on Python >= 3.12 # See: https://github.com/gerlichlab/looptrace-napari/issues/6 def get_x_coordinate(self) -> float: return self.x - + # TODO: adopt @override once on Python >= 3.12 # See: https://github.com/gerlichlab/looptrace-napari/issues/6 - #@override + # @override def get_y_coordinate(self) -> float: return self.y diff --git a/looptrace_napari/locus_specific_points_reader.py b/looptrace_napari/locus_specific_points_reader.py index c113498..f8c843c 100644 --- a/looptrace_napari/locus_specific_points_reader.py +++ b/looptrace_napari/locus_specific_points_reader.py @@ -4,24 +4,19 @@ import logging import os from pathlib import Path -from typing import ( - Callable, - Literal, - Optional, - Tuple, - Union, -) -from numpydoc_decorator import doc +from typing import Callable, Literal, Optional, Tuple, Union + +from numpydoc_decorator import doc # type: ignore[import-untyped] from ._docs import CommonReaderDoc -from .types import ( - CsvRow, +from .types import ( # pylint: disable=unused-import + CsvRow, LayerParams, - PathLike, + PathLike, PointRecord, Timepoint, TraceId, - ) +) __author__ = "Vince Reuter" __credits__ = ["Vince Reuter"] @@ -35,56 +30,71 @@ @doc( - summary="This is the main hook required by napari / napari plugins to provide a Reader plugin.", + summary=( + "This is the main hook required by napari / napari plugins to provide a Reader" + " plugin." + ), returns=CommonReaderDoc.returns, notes=CommonReaderDoc.notes, ) -def get_reader(path: CommonReaderDoc.path_type) -> Optional[Callable[[PathLike], list[FullLayerData]]]: - static_params = {"size": 0.5, "edge_width": 0.05, "edge_width_is_relative": True, "n_dimensional": False} +def get_reader( + path: CommonReaderDoc.path_type, +) -> Optional[Callable[[PathLike], list[FullLayerData]]]: + static_params = { + "size": 0.5, + "edge_width": 0.05, + "edge_width_is_relative": True, + "n_dimensional": False, + } - def do_not_parse(why: str, *, level: int = logging.DEBUG): - logging.log(level=level, msg=f"{why}, cannot be read as looptrace locus-specific points: {path}") - return None + def do_not_parse(why: str, *, level: int = logging.DEBUG) -> None: + return logging.log( + level=level, + msg=f"{why}, cannot be read as looptrace locus-specific points: {path}", + ) # Input should be a single extant filepath. if not isinstance(path, (str, Path)): - return do_not_parse("Not a path-like") + return do_not_parse("Not a path-like") # type: ignore[func-returns-value] if not os.path.isfile(path): - return do_not_parse("Not an extant file") - + return do_not_parse("Not an extant file") # type: ignore[func-returns-value] + # Input should also be a CSV. base, ext = os.path.splitext(os.path.basename(path)) if ext != ".csv": - return do_not_parse("Not a CSV") - + return do_not_parse("Not a CSV") # type: ignore[func-returns-value] + # Determine how to read and display the points layer to be parsed. status_name = base.lower().split(".")[-1].lstrip("qc_").lstrip("qc") if status_name in {"pass", "passed"}: - logging.debug(f"Parsing as QC-pass: {path}") + logging.debug("Parsing as QC-pass: %s", path) color = "red" symbol = "*" read_rows = parse_passed elif status_name in {"fail", "failed"}: - logging.debug(f"Parsing as QC-fail: {path}") + logging.debug("Parsing as QC-fail: %s", path) color = "blue" symbol = "o" read_rows = parse_failed else: - return do_not_parse(f"Could not infer QC status from '{status_name}'", level=logging.WARNING) - + return do_not_parse( # type: ignore[func-returns-value] + f"Could not infer QC status from '{status_name}'", level=logging.WARNING + ) + # Build the actual parser function. base_meta = {"edge_color": color, "face_color": color, "symbol": symbol} + def parse(p): - with open(p, mode='r', newline='') as fh: + with open(p, mode="r", newline="") as fh: rows = list(csv.reader(fh)) data, extra_meta = read_rows(rows) if not data: logging.warning("No data rows parsed!") params = {**static_params, **base_meta, **extra_meta} return data, params, "points" - + return lambda p: [parse(p)] - + @doc( summary="Parse records from points which passed QC.", @@ -113,7 +123,9 @@ def parse_failed(records: list[CsvRow]) -> Tuple[RawLocusPointsLike, LayerParams data = [] codes = [] else: - data_codes_pairs: list[Tuple[PointRecord, QCFailReasons]] = [(parse_simple_record(r, exp_num_fields=6), r[5]) for r in records] + data_codes_pairs: list[Tuple[PointRecord, QCFailReasons]] = [ + (parse_simple_record(r, exp_num_fields=6), r[5]) for r in records + ] data, codes = map(list, zip(*data_codes_pairs)) return data, {"text": QC_FAIL_CODES_KEY, "properties": {QC_FAIL_CODES_KEY: codes}} @@ -122,19 +134,23 @@ def parse_failed(records: list[CsvRow]) -> Tuple[RawLocusPointsLike, LayerParams summary="Parse single-point from a single record (e.g., row from a CSV file).", parameters=dict( r="Record (e.g. CSV row) to parse", - exp_num_fields="The expected number of data fields (e.g., columns) in the record", + exp_num_fields=( + "The expected number of data fields (e.g., columns) in the record" ), + ), returns=""" A pair of values in which the first element represents a locus spot's trace ID and timepoint, and the second element represents the (z, y, x) coordinates of the centroid of the spot fit. """, ) -def parse_simple_record(r: CsvRow, *, exp_num_fields: int) -> PointRecord: +def parse_simple_record(r: CsvRow, *, exp_num_fields: int) -> RawLocusPointsLike: """Parse a single line from an input CSV file.""" if not isinstance(r, list): raise TypeError(f"Record to parse must be list, not {type(r).__name__}") if len(r) != exp_num_fields: - raise ValueError(f"Expected record of length {exp_num_fields} but got {len(r)}: {r}") + raise ValueError( + f"Expected record of length {exp_num_fields} but got {len(r)}: {r}" + ) trace = int(r[0]) timepoint = int(r[1]) z = float(r[2]) diff --git a/looptrace_napari/nuclei_reader.py b/looptrace_napari/nuclei_reader.py index c588900..ee93fef 100644 --- a/looptrace_napari/nuclei_reader.py +++ b/looptrace_napari/nuclei_reader.py @@ -1,24 +1,28 @@ """Reading looptrace-written, ZARR-stored data""" +import logging +import os +import warnings from dataclasses import dataclass from enum import Enum -import logging from operator import itemgetter -import os from pathlib import Path from typing import Dict, Literal, Mapping, Optional, Tuple, Union -import warnings import dask.array as da +import numpy as np import numpy.typing as npt -from numpydoc_decorator import doc import pandas as pd -import zarr +import zarr # type: ignore[import-untyped] +from numpydoc_decorator import doc # type: ignore[import-untyped] -from .geometry import ImagePoint2D from ._docs import CommonReaderDoc +from .geometry import ImagePoint2D from .types import FieldOfViewFrom1, LayerParams, NucleusNumber, PathLike +PixelValue = Union[np.uint8, np.uint16] +PixelArray = Union[npt.NDArray[PixelValue], da.Array] + # Generic ArrayLike since element type differs depending on kind of layer. FullLayerData = Tuple[npt.ArrayLike, LayerParams, "LayerTypeName"] # We'll produce one of each of these types of layers. @@ -34,27 +38,33 @@ @doc( - summary="This is the main hook required by napari / napari plugins to provide a Reader plugin.", + summary=( + "This is the main hook required by napari / napari plugins to provide a Reader" + " plugin." + ), returns=CommonReaderDoc.returns, notes=CommonReaderDoc.returns, ) def get_reader(path: CommonReaderDoc.path_type): - def do_not_parse(why: str, *, level: int = logging.DEBUG): - logging.log(level=level, msg=f"{why}, cannot read looptrace nuclei visualisation data: {path}") - return None - + def do_not_parse(why: str, *, level: int = logging.DEBUG) -> None: + return logging.log( + level=level, + msg=f"{why}, cannot read looptrace nuclei visualisation data: {path}", + ) + # Input should be a single extant folder. if not isinstance(path, (str, Path)): return do_not_parse(f"Not a path-like: {path}") if not os.path.isdir(path): return do_not_parse(f"Not an extant directory: {path}") - path: Path = Path(path) - + path: Path = Path(path) # type: ignore[no-redef] + # Each of the subpaths to parse must be extant folder. if not NucleiDataSubfolders.all_present_within(path): return do_not_parse( - f"At least one subpath to parse isn't a folder! {NucleiDataSubfolders.relpaths(path)}." - ) + "At least one subpath to parse isn't a folder!" + f" {NucleiDataSubfolders.relpaths(path)}." + ) return lambda root: build_layers(NucleiDataSubfolders.read_all_from_root(root)) @@ -66,42 +76,52 @@ class NucleiDataSubfolders(Enum): @classmethod def all_present_within(cls, p: PathLike) -> bool: - return all(m._is_present_within(p) for m in cls) + return all(m.is_present_within(p) for m in cls) @classmethod - def read_all_from_root(cls, p: PathLike) -> Dict[FieldOfViewFrom1, "NucleiVisualisationData"]: + def read_all_from_root( + cls, p: PathLike + ) -> Dict[FieldOfViewFrom1, "NucleiVisualisationData"]: image_paths = find_paths_by_fov(cls.IMAGES.relpath(p), extension=".zarr") masks_paths = find_paths_by_fov(cls.MASKS.relpath(p), extension=".zarr") - centers_paths = find_paths_by_fov(cls.CENTERS.relpath(p), extension=".nuclear_masks.csv") - fields_of_view = set(image_paths.keys()) & set(masks_paths.keys()) & set(centers_paths.keys()) - logging.debug(f"Image paths count: {len(image_paths)}") - logging.debug(f"Masks paths count: {len(masks_paths)}") - logging.debug(f"Centers paths count: {len(centers_paths)}") + centers_paths = find_paths_by_fov( + cls.CENTERS.relpath(p), extension=".nuclear_masks.csv" + ) + fields_of_view = ( + set(image_paths.keys()) + & set(masks_paths.keys()) + & set(centers_paths.keys()) + ) + logging.debug("Image paths count: %d", len(image_paths)) + logging.debug("Masks paths count: %d", len(masks_paths)) + logging.debug("Centers paths count: %d", len(centers_paths)) bundles: Dict[FieldOfViewFrom1, "NucleiVisualisationData"] = {} for fov in sorted(fields_of_view): - logging.debug(f"Reading data for FOV: {fov}") + logging.debug("Reading data for FOV: %d", fov.get) image_fp: Path = image_paths[fov] - logging.debug(f"Reading nuclei image: {image_fp}") + logging.debug("Reading nuclei image: %s", image_fp) image = _read_zarr(image_fp) masks_fp: Path = masks_paths[fov] - logging.debug(f"Reading nuclei masks: {masks_fp}") + logging.debug("Reading nuclei masks: %s", masks_fp) masks = _read_zarr(masks_fp) centers_fp: Path = centers_paths[fov] - logging.debug(f"Reading nuclei centers: {centers_fp}") + logging.debug("Reading nuclei centers: %s", centers_fp) centers = _read_csv(centers_fp) - bundles[fov] = NucleiVisualisationData(image=image, masks=masks, centers=centers) + bundles[fov] = NucleiVisualisationData( + image=image, masks=masks, centers=centers + ) return bundles @classmethod def relpaths(cls, p: PathLike) -> Dict[str, Path]: return {m.value: m.relpath(p) for m in cls} - def _is_present_within(self, p: PathLike) -> bool: + def is_present_within(self, p: PathLike) -> bool: return self.relpath(p).is_dir() def relpath(self, p: PathLike) -> Path: return Path(p) / self.value - + @doc( summary="Bundle the data needed to visualise nuclei.", @@ -117,8 +137,8 @@ def relpath(self, p: PathLike) -> Path: ) @dataclass(frozen=True, kw_only=True) class NucleiVisualisationData: - image: npt.ArrayLike - masks: npt.ArrayLike + image: PixelArray + masks: PixelArray centers: list[Tuple[NucleusNumber, ImagePoint2D]] def __post_init__(self) -> None: @@ -126,29 +146,31 @@ def __post_init__(self) -> None: if len(self.image.shape) == 5: if self.image.shape[0] != 1: raise ValueError( - f"5D image for nuclei visualisation must have trivial first dimension; got {self.image.shape[0]} (not 1)" - ) + "5D image for nuclei visualisation must have trivial first" + f" dimension; got {self.image.shape[0]} (not 1)" + ) object.__setattr__(self, "image", self.image[0]) if len(self.image.shape) == 4: if self.image.shape[0] == 1: logging.debug("Collapsing trivial channel axis for nuclei image") object.__setattr__(self, "image", self.image[0]) else: - logging.debug("Multiple channels in nuclei image; attempting to determine which to use") - nuc_channel = os.getenv(LOOPTRACE_NAPARI_NUCLEI_CHANNEL_ENV_VAR, "") - if nuc_channel == "": - raise Exception(f"When using nuclei images with multiple channels, nuclei channel must be specified through environment variable {LOOPTRACE_NAPARI_NUCLEI_CHANNEL_ENV_VAR}.") - try: - nuc_channel = int(nuc_channel) - except TypeError as e: - raise ValueError( - f"Illegal nuclei channel value (from {LOOPTRACE_NAPARI_NUCLEI_CHANNEL_ENV_VAR}): {nuc_channel}" - ) from e + logging.debug( + "Multiple channels in nuclei image; attempting to determine which" + " to use" + ) + nuc_channel: int = determine_nuclei_channel() if nuc_channel >= self.image.shape[0]: raise ValueError( - f"Illegal nuclei channel value (from {LOOPTRACE_NAPARI_NUCLEI_CHANNEL_ENV_VAR}), {nuc_channel}, for channel axis of length {self.image.shape[0]}" - ) - logging.debug(f"Using nuclei channel (from {LOOPTRACE_NAPARI_NUCLEI_CHANNEL_ENV_VAR}): {nuc_channel}") + "Illegal nuclei channel value (from" + f" {LOOPTRACE_NAPARI_NUCLEI_CHANNEL_ENV_VAR}), {nuc_channel}," + f" for channel axis of length {self.image.shape[0]}" + ) + logging.debug( + "Using nuclei channel (from %s): %d", + LOOPTRACE_NAPARI_NUCLEI_CHANNEL_ENV_VAR, + nuc_channel, + ) object.__setattr__(self, "image", self.image[nuc_channel]) if len(self.image.shape) == 3: if self.image.shape[0] == 1: @@ -156,25 +178,34 @@ def __post_init__(self) -> None: object.__setattr__(self, "image", self.image[0]) else: logging.debug("Max projecting along z for nuclei image") - object.__setattr__(self, "image", max_project_z(self.img)) + object.__setattr__(self, "image", max_project_z(self.image)) if len(self.image.shape) == 2: # All good pass else: - raise ValueError(f"Cannot use image with {len(self.image.shape)} dimension(s) for nuclei visualisation") - + raise ValueError( + f"Cannot use image with {len(self.image.shape)} dimension(s) for nuclei" + " visualisation" + ) + # Then, handle the masks image. if len(self.masks.shape) == 5: if any(d != 1 for d in self.masks.shape[:3]): - raise ValueError(f"5D nuclear masks image with at least 1 nontrivial (t, c, z) axis! {self.masks.shape}") + raise ValueError( + "5D nuclear masks image with at least 1 nontrivial (t, c, z) axis!" + f" {self.masks.shape}" + ) logging.debug("Reducing 5D nuclear masks to 2D") object.__setattr__(self, "masks", self.masks[0, 0, 0]) if len(self.masks.shape) != 2: - raise ValueError(f"Need 2D image for nuclear masks! Got {len(self.masks.shape)}: {self.masks.shape}") + raise ValueError( + f"Need 2D image for nuclear masks! Got {len(self.masks.shape)}:" + f" {self.masks.shape}" + ) def build_layers( - bundles: Mapping[FieldOfViewFrom1, NucleiVisualisationData] + bundles: Mapping[FieldOfViewFrom1, NucleiVisualisationData], ) -> Tuple[ImageLayer, MasksLayer, CentroidsLayer]: images = [] masks = [] @@ -186,26 +217,74 @@ def build_layers( for nuc, pt in visdata.centers: nuclei_points.append([i, pt.get_y_coordinate(), pt.get_x_coordinate()]) nuclei_labels.append(nuc.get) - + # Prep the data for presentation as layers. - images = da.stack(images) - logging.debug(f"Image layer data shape: {images.shape}") - masks = da.stack(masks) - logging.debug(f"Masks layer data shape: {masks.shape}") - + images: da.Array = da.stack(images) # type: ignore[no-redef] + logging.debug("Image layer data shape: %s", images.shape) # type: ignore[attr-defined] + masks: da.Array = da.stack(masks) # type: ignore[no-redef] + logging.debug("Masks layer data shape: %s", masks.shape) # type: ignore[attr-defined] + labs_text = { "string": "{nucleus}", - "size": 10, # tested on FOVs of 2048 (x) x 2044 (y), with ~15-20 nuclei per FOV + "size": 10, # tested on FOVs of 2048 (x) x 2044 (y), with ~15-20 nuclei per FOV "color": "black", } - points_params = {"name": "labels", "text": labs_text, "properties": {"nucleus": nuclei_labels}} + points_params = { + "name": "labels", + "text": labs_text, + "properties": {"nucleus": nuclei_labels}, + } images_layer = (images, {"name": "max_proj_z"}, "image") masks_layer = (masks, {"name": "masks"}, "labels") points_layer = (nuclei_points, points_params, "points") - return images_layer, masks_layer, points_layer + return images_layer, masks_layer, points_layer # type: ignore[return-value] + + +@doc( + summary=( + "Read (from environment variable) the image channel in which to find nuclei" + " signal." + ), + raises=dict( + ValueError="When the environment variable value's set to a non-integer-like", + ), + returns="The integer value of image channel in which to find nuclei signal", +) +def determine_nuclei_channel(): + nuc_channel = os.getenv(LOOPTRACE_NAPARI_NUCLEI_CHANNEL_ENV_VAR, "") + if nuc_channel == "": + raise ValueError( + "When using nuclei images with multiple channels, nuclei channel must be" + " specified through environment variable" + f" {LOOPTRACE_NAPARI_NUCLEI_CHANNEL_ENV_VAR}." + ) + try: + nuc_channel: int = int(nuc_channel) # type: ignore[no-redef] + except TypeError as e: + raise ValueError( + "Illegal nuclei channel value (from" + f" {LOOPTRACE_NAPARI_NUCLEI_CHANNEL_ENV_VAR}): {nuc_channel}" + ) from e +@doc( + summary=( + "Directly in given folder, find all filepaths with given extension and a field" + " of view embedded in filename" + ), + parameters=dict( + folder="Path to folder in which to find files", + extension="The extension of files to find", + ), + raises=dict( + Exception="If the same FOV is found to correspond to more than one path" + ), + returns="Mapping from field of view to filepath", + see_also=dict( + get_fov_sort_key="The function used to try to parse FOV from filename" + ), +) def find_paths_by_fov(folder: Path, *, extension: str) -> Dict[FieldOfViewFrom1, Path]: image_paths = {} for fn in os.listdir(folder): @@ -213,32 +292,59 @@ def find_paths_by_fov(folder: Path, *, extension: str) -> Dict[FieldOfViewFrom1, fov = get_fov_sort_key(fp, extension=extension) if fov is not None: if fov in image_paths: - raise Exception(f"FOV {fov} already seen in folder! {folder}") + raise RuntimeError(f"FOV {fov} already seen in folder! {folder}") image_paths[fov] = fp return image_paths +@doc( + summary="Get the sort key (by FOV) for the given filename or filepath.", + parameters=dict( + path="The path for which to get the FOV (to be used as sort key)", + extension="The expected extension of the file name or path", + ), + raises=dict( + TypeError="If the given value is neither a path nor a string", + ), + returns="Field of view parsed from filename, if parse succeeded; otherwise, None", +) def get_fov_sort_key(path: PathLike, *, extension: str) -> Optional[FieldOfViewFrom1]: if not isinstance(path, (str, Path)): - raise TypeError(f"Cannot parse sort-by-FOV key for {extension} stack from alleged path: {path} (type {type(path).__name__})") + raise TypeError( + f"Cannot parse sort-by-FOV key for {extension} stack from alleged path:" + f" {path} (type {type(path).__name__})" + ) _, fn = os.path.split(path) if not (fn.startswith("P") and fn.endswith(extension)): return None - rawval = fn.lstrip("P").rstrip(extension) + rawval: str = fn.lstrip("P").rstrip(extension) if rawval.endswith(extension): # Support older looptrace-emitted data. - warnings.warn(f"Stripping second '{extension}' extension; use data from newer software", DeprecationWarning) + warnings.warn( + f"Stripping second '{extension}' extension; use data from newer software", + DeprecationWarning, + ) rawval = rawval.rstrip(extension) try: - rawval = int(rawval) + rawval: int = int(rawval) # type: ignore[no-redef] except ValueError: return None - return FieldOfViewFrom1(rawval) + return FieldOfViewFrom1(rawval) # type: ignore[arg-type] -def max_project_z(img: npt.ArrayLike) -> npt.ArrayLike: +@doc( + summary=( + "Max project images z-stack along the z-axis, assumed to be axis 0 (first" + " axis)." + ), + parameters=dict(img="The image stack to max-project along z"), + raises=dict(ValueError="If the given image isn't 3D (z-stack of 2D images)"), +) +def max_project_z(img: PixelArray) -> PixelArray: if len(img.shape) != 3: - raise ValueError(f"Image to max-z-project must have 3 dimensions! Got {len(img.shape)}") + raise ValueError( + f"Image to max-z-project must have 3 dimensions! Got {len(img.shape)}" + ) return da.max(img, axis=0) @@ -249,20 +355,26 @@ def __init__(self, *, path: Path, msg: str) -> None: def _read_csv(fp: Path) -> list[Tuple[NucleusNumber, ImagePoint2D]]: - logging.debug(f"Reading CSV: {fp}") + logging.debug("Reading CSV: %s", fp) df = pd.read_csv(fp, index_col=0) - nuclei = df["label"] # preserves data type of this column / field, .iterrows() seems to lose it. + nuclei = df[ + "label" + ] # preserves data type of this column / field, .iterrows() seems to lose it. ys = df["yc"] xs = df["xc"] - return [(NucleusNumber(n), ImagePoint2D(y=y, x=x)) for n, y, x in zip(nuclei, ys, xs)] + return [ + (NucleusNumber(n), ImagePoint2D(y=y, x=x)) for n, y, x in zip(nuclei, ys, xs) + ] -def _read_zarr(root: Path) -> npt.ArrayLike: - logging.debug(f"Reading ZARR: {root}") +def _read_zarr(root: Path) -> PixelArray: + logging.debug("Reading ZARR: %s", root) if (root / ".zarray").is_file(): data_root = root elif (root / "0" / ".zarray").is_file(): data_root = root / "0" else: - raise ZarrParseException(path=root, msg="Failed to find .zarray to indicate data folder") + raise ZarrParseException( + path=root, msg="Failed to find .zarray to indicate data folder" + ) return zarr.open(data_root)[:] diff --git a/looptrace_napari/types.py b/looptrace_napari/types.py index 8edea31..e61f501 100644 --- a/looptrace_napari/types.py +++ b/looptrace_napari/types.py @@ -4,7 +4,7 @@ from pathlib import Path from typing import Dict, Tuple, Union -from numpydoc_decorator import doc +from numpydoc_decorator import doc # type: ignore[import-untyped] CsvRow = list[str] FailCodesText = str @@ -21,12 +21,12 @@ @doc( - summary="Wrap an int as a 1-based field of view (FOV).", - parameters=dict(get="The value to wrape"), + summary="Wrap an int as a 1-based field of view (FOV).", + parameters=dict(get="The value to wrape"), raises=dict( TypeError="If the given value to wrap isn't an integer", ValueError="If the given value to wrap isn't positive", - ) + ), ) @dataclass(frozen=True, order=True) class FieldOfViewFrom1: @@ -37,12 +37,12 @@ def __post_init__(self) -> None: @doc( - summary="Wrap an int as a 1-based nucleus number.", - parameters=dict(get="The value to wrap"), + summary="Wrap an int as a 1-based nucleus number.", + parameters=dict(get="The value to wrap"), raises=dict( TypeError="If the given value to wrap isn't an integer", ValueError="If the given value to wrap isn't positive", - ) + ), ) @dataclass(frozen=True, order=True) class NucleusNumber: @@ -56,4 +56,4 @@ def _refine_as_pos_int(x: object, *, context: str) -> None: if not isinstance(x, int): raise TypeError(f"Non-integer as {context}! {x} (type {type(x).__name__})") if x < 1: - raise ValueError(f"1-based {context} must be positive int, not {x}") \ No newline at end of file + raise ValueError(f"1-based {context} must be positive int, not {x}") diff --git a/pyproject.toml b/pyproject.toml index 49dab5b..21e01c8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,6 +15,7 @@ classifiers = [ "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "Topic :: Scientific/Engineering :: Bio-Informatics", + "Typing :: Typed", ] dependencies = [ "napari >= 0.4.19; sys_platform == 'darwin'", @@ -27,6 +28,15 @@ dependencies = [ "zarr >= 2.4.12", ] +[project.urls] +Homepage = "https://github.com/gerlichlab/looptrace-napari" +Repository = "https://github.com/gerlichlab/looptrace-napari.git" +Issues = "https://github.com/gerlichlab/looptrace-napari/issues" + +[build-system] +requires = ["setuptools", "wheel"] +build-backend = "setuptools.build_meta" + [tool.setuptools.package-data] looptrace_napari = [ "napari.yaml" ] @@ -38,14 +48,33 @@ looptrace-napari = "looptrace_napari:napari.yaml" [project.optional-dependencies] test = [ - "pytest", + "black[colorama] >= 24.1", + "codespell >= 2.2.4", + "isort >= 5.10", + "mypy >= 1.0.1", + "pandas-stubs", + "pylint >= 3", + "pytest >= 7.1.0", + "pytest-cov", ] -[project.urls] -Homepage = "https://github.com/gerlichlab/looptrace-napari" -Repository = "https://github.com/gerlichlab/looptrace-napari.git" -Issues = "https://github.com/gerlichlab/looptrace-napari/issues" +[tool.black] +enable-unstable-feature = [ + "string_processing", # split long formatted strings +] +preview = true + +[tool.codespell] +skip = ".git,.mypy_cache,.nox,.vscode,__pycache__,poetry.lock" +builtin = "clear,rare,informal,usage,code,names" +ignore-words-list = "jupyter,iff" # prevent jupyter -> jupiter, iff -> if +check-filenames = true +uri-ignore-words-list = "*" + +[tool.isort] +profile = "black" + +[tool.pylint] +# Disable missing docstrings since in general they're handled by numpydoc_decorator's @doc. +"messages control".disable = "fixme,missing-class-docstring,missing-function-docstring,too-many-branches,unspecified-encoding,use-dict-literal" -[build-system] -requires = ["setuptools", "wheel"] -build-backend = "setuptools.build_meta" diff --git a/shell.nix b/shell.nix index 29804ae..14c52cd 100644 --- a/shell.nix +++ b/shell.nix @@ -7,7 +7,7 @@ }: let pyenv = pkgs.python311.withPackages (pp: with pp; [ pip wheel ]); - pipInstallExtras = if dev then "\"[dev]\"" else ""; + pipInstallExtras = if dev then "\"[test]\"" else ""; in pkgs.mkShell { name = "looptrace-napari-env"; diff --git a/tests/test_get_fov_sort_key.py b/tests/test_get_fov_sort_key.py index 2c0f4a3..6e87920 100644 --- a/tests/test_get_fov_sort_key.py +++ b/tests/test_get_fov_sort_key.py @@ -7,12 +7,13 @@ @pytest.mark.parametrize( - ["folder", "extension", "expected"], [ - ("P0001.zarr", ".zarr", FieldOfViewFrom1(1)), + ["folder", "extension", "expected"], + [ + ("P0001.zarr", ".zarr", FieldOfViewFrom1(1)), ("P0001.zarr", "zarr", None), ("P0002.zarr", ".zarr", FieldOfViewFrom1(2)), - ("P0001.zarr", ".csv", None) - ] + ("P0001.zarr", ".csv", None), + ], ) def test_simple_zarr_folder_sort_keys(tmp_path, folder, extension, expected): arg = tmp_path / folder