Skip to content

Commit

Permalink
Local ingestion and use of run_dag (#473)
Browse files Browse the repository at this point in the history
* Local ingestion and use of run_dag

* Importing dag module - fix reference before assignment graph

* local feature only on realtime dag mode

* Changing naming to follow exporter's convention

* Addressing PR comments
  • Loading branch information
ktsitsi authored Feb 1, 2024
1 parent cdaa3e4 commit 0ef039e
Showing 1 changed file with 6 additions and 3 deletions.
9 changes: 6 additions & 3 deletions src/tiledb/cloud/bioimg/ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from tiledb.cloud import dag
from tiledb.cloud.bioimg.helpers import get_logger_wrapper
from tiledb.cloud.bioimg.helpers import serialize_filter
from tiledb.cloud.dag.mode import Mode
from tiledb.cloud.rest_api.models import RetryStrategy
from tiledb.cloud.utilities._common import run_dag

Expand All @@ -23,6 +24,7 @@ def ingest(
threads: Optional[int] = 8,
resources: Optional[Mapping[str, Any]] = None,
compute: bool = True,
mode: Optional[Mode] = Mode.BATCH,
namespace: Optional[str],
verbose: bool = False,
exclude_metadata: bool = False,
Expand All @@ -42,9 +44,10 @@ def ingest(
defaults to None
:param compute: When True the DAG returned will be computed inside the function
otherwise DAG will only be returned.
:param mode: By default runs Mode.Batch
:param namespace: The namespace where the DAG will run
:param verbose: verbose logging, defaults to False
:output_ext: extension for the output images in tiledb
:param output_ext: extension for the output images in tiledb
"""

logger = get_logger_wrapper(verbose)
Expand Down Expand Up @@ -160,9 +163,10 @@ def ingest_tiff_udf(
dag_name = taskgraph_name or DEFAULT_DAG_NAME

logger.debug("Building graph")

graph = dag.DAG(
name=dag_name,
mode=dag.Mode.BATCH,
mode=mode,
max_workers=max_workers,
namespace=namespace,
retry_strategy=RetryStrategy(
Expand Down Expand Up @@ -203,7 +207,6 @@ def ingest_tiff_udf(
compressor=compressor_serial,
**kwargs,
)

if compute:
run_dag(graph, debug=verbose)
return graph
Expand Down

0 comments on commit 0ef039e

Please sign in to comment.