Enable renaming in bioimg exporter (#542)
* Enable renaming in bioimg exporter

* Address PR review comments (pfish)

* Small fix

* Move check block higher in function
ktsitsi authored Mar 22, 2024
1 parent 200bdfa commit 3256d0f
Showing 1 changed file with 87 additions and 45 deletions.
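
In practical terms, the change lets `output` be a single rename target, a directory, or a sequence of destinations paired with `source`: a URI ending in `/` is treated as an output directory that keeps each source's basename, while anything else is taken as an explicit file name. A minimal usage sketch against the new signature (the URIs and the ACN value are hypothetical):

```python
from tiledb.cloud.bioimg.exportation import export

SRC = "tiledb://my-namespace/slide_1"  # hypothetical TileDB group URI

# Trailing '/': treated as a directory, so the source basename is kept.
export(SRC, "s3://my-bucket/exports/", access_credentials_name="my-acn")

# No trailing '/': treated as an explicit rename target.
export(SRC, "s3://my-bucket/exports/renamed.tiff", access_credentials_name="my-acn")
```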
132 changes: 87 additions & 45 deletions src/tiledb/cloud/bioimg/exportation.py
@@ -1,26 +1,31 @@
from typing import Any, Dict, Iterator, Mapping, Optional, Sequence, Tuple, Union
import logging
from typing import Any, Iterator, Mapping, Optional, Sequence, Tuple, Union

import tiledb
from tiledb.cloud import dag
from tiledb.cloud.bioimg.helpers import get_logger_wrapper
from tiledb.cloud.bioimg.helpers import validate_io_paths
from tiledb.cloud.dag.mode import Mode
from tiledb.cloud.rest_api.models import RetryStrategy
from tiledb.cloud.utilities._common import as_batch
from tiledb.cloud.utilities._common import run_dag

DEFAULT_RESOURCES = {"cpu": "8", "memory": "4Gi"}
DEFAULT_IMG_NAME = "3.9-imaging-dev"
DEFAULT_DAG_NAME = "bioimg-exportation"
_RUNNING_PROFILES = ("client", "server")


def export(
source: Union[Sequence[str], str],
output: str,
output: Union[Sequence[str], str],
*args: Any,
access_credentials_name: str,
config: Optional[Mapping[str, Any]] = None,
taskgraph_name: Optional[str] = None,
num_batches: Optional[int] = None,
resources: Optional[Mapping[str, Any]] = None,
compute: bool = True,
run_on: Optional[str] = None,
mode: Optional[Mode] = Mode.BATCH,
namespace: Optional[str] = None,
verbose: bool = False,
output_ext: str = "tiff",
@@ -29,7 +34,9 @@ def export(
"""The function exports microscopy images from TileDB arrays
:param source: uri / iterable of uris of input files
:param output: output dir for the exported tiledb arrays
If the uri points to a directory of files make sure it ends with a trailing '/'
:param output: uri / iterable of uris of output destinations.
If the uri points to a directory of files make sure it ends with a trailing '/'
:param config: dict configuration to pass credentials of the destination
:param taskgraph_name: Optional name for taskgraph, defaults to None
:param num_batches: Number of graph nodes to spawn.
@@ -39,22 +46,30 @@ def export(
defaults to None
:param compute: When True the DAG returned will be computed inside the function
otherwise DAG will only be returned.
:param mode: By default runs in Mode.BATCH
:param run_on: By default runs on server if value is "client" runs client side.
:param namespace: The namespace where the DAG will run
:param verbose: verbose logging, defaults to False
:output_ext: extension for the output images in tiledb
:param access_credentials_name: Access Credentials Name (ACN) registered
in TileDB Cloud (ARN type)
:param output_ext: extension for the output images in tiledb
"""

if not access_credentials_name:
raise ValueError(
"Ingestion graph requires `access_credentials_name` to be set."
)
logger = get_logger_wrapper(verbose)
logger.debug("Exporting files: %s", source)
max_workers = None if num_batches else 20 # Default picked heuristically.

def build_io_uris_exportation(
source: Sequence[str], output_dir: str, output_ext: str
source: Sequence[str],
output: Sequence[str],
output_ext: str,
logger: logging.Logger,
):
"""Match input uri/s with output destinations
:param source: A sequence of paths or path to input
:param output_dir: A path to the output directory
"""
@@ -64,36 +79,65 @@ def build_io_uris_exportation(

vfs = tiledb.VFS()

def create_output_path(input_file, output_dir) -> str:
def create_output_path(input_file: str, output: str) -> str:
# Check if output is dir
if not output.endswith("/"):
# The output is considered a target file
return output
filename = os.path.splitext(os.path.basename(input_file))[0]
output_filename = filename + f".{output_ext}" if output_ext else filename
return os.path.join(output_dir, output_filename)

def iter_paths(source: Sequence) -> Iterator[Tuple]:
for uri in source:
if vfs.is_dir(uri) and tiledb.object_type(uri) != "group":
# Folder for exploration
contents = vfs.ls(uri)
yield from iter_paths(contents)
elif tiledb.object_type(uri) == "group":
# For exportation we require the source path to be a tiledb group
yield uri, create_output_path(uri, output_dir)

if len(source) == 0:
raise ValueError("The source files cannot be empty")
return tuple(iter_paths(source))
output_filename = f"{filename}.{output_ext}" if output_ext else filename
return os.path.join(output, output_filename)
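
The trailing-slash check above is the heart of the renaming feature: it decides per pair whether the destination is an exact file name or a directory to join against. A self-contained sketch of the same rule (helper name hypothetical):

```python
import os

def _resolve_output(input_file: str, output: str, output_ext: str = "tiff") -> str:
    if not output.endswith("/"):
        # Explicit target file: the caller is renaming the export.
        return output
    # Directory: reuse the source basename, swapping in the output extension.
    stem = os.path.splitext(os.path.basename(input_file))[0]
    return os.path.join(output, f"{stem}.{output_ext}" if output_ext else stem)

assert _resolve_output("s3://in/slide_1", "s3://out/new_name.tiff") == "s3://out/new_name.tiff"
assert _resolve_output("s3://in/slide_1", "s3://out/") == "s3://out/slide_1.tiff"
```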

def iter_paths(source: Sequence[str], output: Sequence[str]) -> Iterator[Tuple]:
if len(output) != 1:
for s, o in zip(source, output):
if tiledb.object_type(s) == "group":
logger.debug("Pair %s and %s", s, o)
yield s, create_output_path(s, o)
else:
logger.debug("Input %s is not a tiledb asset", s)
continue
else:
logger.debug("Traverse source: %s", source)
for s in source:
if tiledb.object_type(s) != "group":
# Folder for exploration
contents = vfs.ls(s)
# Explore folders only at depth 1
filtered_contents = [
c for c in contents if tiledb.object_type(c) == "group"
]
yield from iter_paths(filtered_contents, output)
else:
logger.debug("Pair %s and %s", s, output[0])
yield s, create_output_path(s, output[0])

logger.debug("Create pairs between %s and %s", source, output)
return tuple(iter_paths(source, output))
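
The dispatch above encodes the pairing rule: with exactly one output, every TileDB group discovered under `source` (including groups one folder level down) maps onto that single destination, while with multiple outputs, sources and outputs are matched positionally via `zip`, so ordering matters. A simplified sketch of just the dispatch, without the VFS traversal (names hypothetical):

```python
from typing import Iterator, List, Tuple

def _pair(sources: List[str], outputs: List[str]) -> Iterator[Tuple[str, str]]:
    if len(outputs) == 1:
        # Broadcast: every source is exported into the single destination.
        for s in sources:
            yield s, outputs[0]
    else:
        # Positional: zip silently stops at the shorter sequence.
        yield from zip(sources, outputs)

print(list(_pair(["a", "b"], ["out/"])))              # [('a', 'out/'), ('b', 'out/')]
print(list(_pair(["a", "b"], ["x.tiff", "y.tiff"])))  # [('a', 'x.tiff'), ('b', 'y.tiff')]
```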

def build_input_batches(
source: Sequence[str], output: str, num_batches: int, out_ext: str
source: Sequence[str],
output: Sequence[str],
num_batches: int,
out_ext: str,
*,
verbose: bool,
):
logger = get_logger_wrapper(verbose)

"""Groups input URIs into batches."""
uri_pairs = build_io_uris_exportation(source, output, out_ext)
uri_pairs = build_io_uris_exportation(source, output, out_ext, logger)
# If the user didn't specify a number of batches, run every import
# as its own task.
logger.debug("Input batches: %s", uri_pairs)
logger.debug("The io pairs for ingestion: %s:", uri_pairs)
my_num_batches = num_batches or len(uri_pairs)
# If they specified too many batches, don't create empty tasks.
my_num_batches = min(len(uri_pairs), my_num_batches)
return [uri_pairs[n::my_num_batches] for n in range(my_num_batches)]
logger.debug("Number of batches: %r", my_num_batches)
split_batches = [uri_pairs[n::my_num_batches] for n in range(my_num_batches)]
logger.debug("Split batches: %r", split_batches)
return split_batches
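
The `uri_pairs[n::my_num_batches]` slice deals the pairs out round-robin, so batch sizes never differ by more than one. The striping is easy to verify in isolation:

```python
pairs = ["p0", "p1", "p2", "p3", "p4"]
num_batches = 2
batches = [pairs[n::num_batches] for n in range(num_batches)]
print(batches)  # [['p0', 'p2', 'p4'], ['p1', 'p3']]
```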

def export_tiff_udf(
io_uris: Sequence[Tuple],
@@ -127,25 +171,24 @@ def export_tiff_udf(
**kwargs,
)

if isinstance(source, str):
# Handle only lists
source = [source]
logger.debug("Exporting files: %s", source)
source = [source] if isinstance(source, str) else source
output = [output] if isinstance(output, str) else output
validate_io_paths(source, output, for_registration=False)

# Build the task graph
dag_name = taskgraph_name or DEFAULT_DAG_NAME

logger.debug("Building graph")

run_mode = run_on or "server"
if run_mode not in _RUNNING_PROFILES:
raise ValueError("Invalid value for argument 'run_on'")

graph = dag.DAG(
name=dag_name,
mode=dag.Mode.REALTIME if run_mode == "client" else dag.Mode.BATCH,
mode=mode,
max_workers=max_workers,
namespace=namespace,
retry_strategy=RetryStrategy(
limit=3,
retry_policy="Always",
),
)

# The lister doesn't need many resources.
@@ -155,13 +198,13 @@ def export_tiff_udf(
output,
num_batches,
output_ext,
access_credentials_name=kwargs.get("access_credentials_name"),
*args,
verbose=verbose,
access_credentials_name=access_credentials_name,
name=f"{dag_name} input collector",
result_format="json",
)

logger.debug("Batched Input-Output pairs: %s", input_list_node)

graph.submit(
export_tiff_udf,
input_list_node,
@@ -172,6 +215,7 @@ def export_tiff_udf(
expand_node_output=input_list_node,
resources=DEFAULT_RESOURCES if resources is None else resources,
image_name=DEFAULT_IMG_NAME,
access_credentials_name=access_credentials_name,
**kwargs,
)

@@ -180,7 +224,5 @@ def export_tiff_udf(
return graph


def export_udf(*args: Any, **kwargs: Any) -> Dict[str, str]:
"""Exporter wrapper function that can be used as a UDF."""
grf = export(*args, **kwargs)
return {"status": "started", "graph_id": str(grf.server_graph_uuid)}
# Wrapper function for batch bioimg exportation
export_batch = as_batch(export)
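
`as_batch` wraps `export` so the whole graph submission itself runs server-side; a rough usage sketch (URIs and the ACN are hypothetical, and the exact return payload is an assumption here):

```python
from tiledb.cloud.bioimg.exportation import export_batch

# Assumed return: a small dict describing the submitted task graph.
result = export_batch(
    source=["tiledb://ns/slide_1", "tiledb://ns/slide_2"],
    output=["s3://bucket/out/a.tiff", "s3://bucket/out/b.tiff"],
    access_credentials_name="my-acn",
)
print(result)
```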
