Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Register ingested groups using the correct URI #679

Merged
merged 3 commits into from
Nov 13, 2024
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 53 additions & 94 deletions src/tiledb/cloud/soma/ingest.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
import os
import os.path
import pathlib
import re
import warnings
from typing import Any, ContextManager, Dict, Mapping, Optional
Expand Down Expand Up @@ -34,9 +35,7 @@ def register_dataset_udf(
:param config: config dictionary, defaults to None
:param verbose: verbose logging, defaults to False
"""

logger = get_logger_wrapper(verbose)

namespace = namespace or tiledb.cloud.user_profile().default_namespace_charged
tiledb_uri = f"tiledb://{namespace}/{register_name}"

Expand Down Expand Up @@ -103,87 +102,54 @@ def run_ingest_workflow_udf(

logger = get_logger_wrapper(level=logging_level)

h5ad_ingest = None
collector = None
sgillies marked this conversation as resolved.
Show resolved Hide resolved

input_files = []
sgillies marked this conversation as resolved.
Show resolved Hide resolved
vfs = tiledb.VFS(config=extra_tiledb_config)

if vfs.is_dir(input_uri):
if dry_run:
name = "dry-run-h5ad-files"
else:
name = "ingest-h5ad-files"

grf = dag.DAG(
name=name,
mode=dag.Mode.BATCH,
namespace=carry_along.get("namespace", namespace),
)

collector = grf.submit(
lambda output_uri: output_uri,
output_uri=output_uri,
)

for entry_input_uri in vfs.ls(input_uri):
logger.debug("Processing folder item: entry_input_uri=%r", entry_input_uri)
for input_item in vfs.ls(input_uri):
logger.debug(
"Filtering directory items: input_uri=%r, input_item=%r, pattern=%r",
input_uri,
input_item,
pattern,
)

# Subdirectories/subfolders can't be ingested.
if vfs.is_dir(entry_input_uri):
logger.info(
"Skipping sub-directory: entry_input_uri=%r", entry_input_uri
)
continue

base = os.path.basename(entry_input_uri)
base, _ = os.path.splitext(base)

entry_output_uri = output_uri + "/" + base
if not output_uri.endswith("/"):
entry_output_uri += "/"
entry_output_uri += base
Copy link
Collaborator Author

@sgillies sgillies Nov 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why we appended base to the output path twice. Now we only do it once. If twice is a requirement, I can restore that behavior.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's not a requirement


if pattern is not None and not re.search(pattern, entry_input_uri):
logger.info(
"Skipping non-matching input: pattern=%r, entry_input_uri=%r",
pattern,
entry_input_uri,
)
continue

logger.debug("Submitting h5ad file: entry_input_uri=%r", entry_input_uri)

node = grf.submit(
ingest_h5ad,
output_uri=entry_output_uri,
input_uri=entry_input_uri,
measurement_name=measurement_name,
extra_tiledb_config=extra_tiledb_config,
ingest_mode=ingest_mode,
platform_config=platform_config,
resources=ingest_resources, # Apply propagated resources here.
access_credentials_name=carry_along.get("access_credentials_name", acn),
logging_level=logging_level,
dry_run=dry_run,
)
collector.depends_on(node)
# Use the pattern ".h5ad" to select only .h5ad files.
if not vfs.is_dir(input_item) and (
not pattern or re.search(pattern, input_item)
):
logger.debug("Identified input file: input_item=%r", input_item)
input_files.append(input_item)

elif vfs.is_file(input_uri):
name = ("dry-run" if dry_run else "ingest") + "-h5ad-file"
input_files.append(input_item)
else:
raise ValueError("input_uri %r is neither a file nor a directory", input_uri)

grf = dag.DAG(
name=name,
mode=dag.Mode.BATCH,
namespace=carry_along.get("namespace", namespace),
)
logger.info("Building DAG for SOMA ingestion: input_files=%r", input_files)
sgillies marked this conversation as resolved.
Show resolved Hide resolved
grf = dag.DAG(
name=f"{'dry-run' if dry_run else 'ingest'}-h5ad-files",
mode=dag.Mode.BATCH,
namespace=carry_along.get("namespace", namespace),
)
collector = grf.submit(
lambda output_uri: output_uri,
output_uri=output_uri,
)

# We've propagated ingest_resources to this function so that
# we can use it as the resources argument of this method
# call.
h5ad_ingest = grf.submit(
for input_file in input_files:
stem = pathlib.Path(input_file).stem
output_group_uri = os.path.join(output_uri, stem)
sgillies marked this conversation as resolved.
Show resolved Hide resolved
logger.info(
"Building task for h5ad file: input_file=%r, output_group_uri=%r",
input_file,
output_group_uri,
)
node = grf.submit(
ingest_h5ad,
output_uri=output_uri,
input_uri=input_uri,
output_uri=output_group_uri,
input_uri=input_file,
measurement_name=measurement_name,
extra_tiledb_config=extra_tiledb_config,
ingest_mode=ingest_mode,
Expand All @@ -193,29 +159,22 @@ def run_ingest_workflow_udf(
logging_level=logging_level,
dry_run=dry_run,
)
collector.depends_on(node)

else:
raise ValueError("input_uri %r is neither file nor directory", input_uri)

# Register the SOMA result if not DRY-RUN
if not dry_run:
register_soma = grf.submit(
register_dataset_udf,
output_uri,
namespace=namespace,
register_name=register_name,
config=extra_tiledb_config,
verbose=logging_level == logging.DEBUG,
acn=acn,
)

if h5ad_ingest is not None:
register_soma.depends_on(h5ad_ingest)
else:
if not dry_run:
register_soma = grf.submit(
register_dataset_udf,
output_group_uri,
namespace=namespace,
register_name=register_name,
config=extra_tiledb_config,
verbose=logging_level == logging.DEBUG,
acn=acn,
)
register_soma.depends_on(collector)

logger.info("Computing DAG: grf=%r", grf)
grf.compute()

return grf.server_graph_uuid


Expand Down Expand Up @@ -408,7 +367,7 @@ def run_ingest_workflow(
pass

if not register_name:
register_name = os.path.splitext(os.path.basename(output_uri))[0]
register_name = pathlib.Path(output_uri).stem

# Graph init
grf = dag.DAG(
Expand Down
Loading