Fix vfs.ls with access_credentials_name
johnkerl committed Mar 18, 2024
1 parent 5a52b42 commit afa2f06
Showing 1 changed file with 94 additions and 14 deletions.
108 changes: 94 additions & 14 deletions src/tiledb/cloud/soma/ingest.py
@@ -76,12 +76,13 @@ def run_ingest_workflow(
dry_run=dry_run,
)
grf.compute()
the_node = next(iter(grf.nodes.values()))
real_graph_uuid = the_node.result()
return {
"status": "started",
"graph_id": str(grf.server_graph_uuid),
"graph_id": str(real_graph_uuid),
}


def build_ingest_workflow_graph(
*,
output_uri: str,
@@ -101,13 +102,74 @@ def build_ingest_workflow_graph(
directly.
"""

grf = dag.DAG(
name="ingest-h5ad-launcher",
mode=dag.Mode.BATCH,
namespace=namespace,
)
grf.submit(
_run_ingest_workflow_udf_byval,
output_uri=output_uri,
input_uri=input_uri,
measurement_name=measurement_name,
pattern=pattern,
extra_tiledb_config=extra_tiledb_config,
platform_config=platform_config,
ingest_mode=ingest_mode,
resources=resources,
namespace=namespace,
access_credentials_name=access_credentials_name,
carry_along={
"resources": _DEFAULT_RESOURCES if resources is None else resources,
"namespace": namespace,
"access_credentials_name": access_credentials_name,
},
dry_run=dry_run,
)
return grf


def run_ingest_workflow_udf(
*,
output_uri: str,
input_uri: str,
measurement_name: str,
carry_along: Dict[str, Optional[str]], # TODO: COMMENT ME
pattern: Optional[str] = None,
extra_tiledb_config: Optional[Dict[str, object]] = None,
platform_config: Optional[Dict[str, object]] = None,
ingest_mode: str = "write",
resources: Optional[Dict[str, object]] = None,
namespace: Optional[str] = None,
access_credentials_name: Optional[str] = None,
dry_run: bool = False,
) -> Dict[str, str]:
"""
This is the highest-level ingestor component that runs on-node. Only here
can we do VFS with access_credentials_name -- that does not work correctly
on the client.
"""

logging.basicConfig(level=logging.INFO)
logging.info("ENUMERATOR ENTER")
logging.info(f"ENUMERATOR INPUT_URI {input_uri}")
logging.info(f"ENUMERATOR OUTPUT_URI {output_uri}")
logging.info(f"ENUMERATOR DRY_RUN {dry_run}")

vfs = tiledb.VFS(config=extra_tiledb_config)

if vfs.is_file(input_uri):
logging.info("ENUMERATOR VFS.IS_FILE")

if dry_run:
name = "dry-run-h5ad-file"
else:
name = "ingest-h5ad-file"

grf = dag.DAG(
name="ingest-h5ad-file",
name=name,
mode=dag.Mode.BATCH,
namespace=namespace,
namespace=carry_along["namespace"],
)
grf.submit(
_ingest_h5ad_byval,
Expand All @@ -117,15 +179,21 @@ def build_ingest_workflow_graph(
extra_tiledb_config=extra_tiledb_config,
ingest_mode=ingest_mode,
platform_config=platform_config,
resources=_DEFAULT_RESOURCES if resources is None else resources,
access_credentials_name=access_credentials_name,
resources=carry_along["resources"],
access_credentials_name=carry_along["access_credentials_name"],
dry_run=dry_run,
)
return grf

if vfs.is_dir(input_uri):
elif vfs.is_dir(input_uri):
logging.info("ENUMERATOR VFS.IS_DIR")

if dry_run:
name = "dry-run-h5ad-files"
else:
name = "ingest-h5ad-files"

grf = dag.DAG(
name="ingest-h5ad-files",
name=name,
mode=dag.Mode.BATCH,
namespace=namespace,
)
@@ -136,15 +204,18 @@
)

for entry_input_uri in vfs.ls(input_uri):
logging.info(f"ENUMERATOR ENTRY_INPUT_URI={entry_input_uri}")
base = os.path.basename(entry_input_uri)
base, _ = os.path.splitext(base)

entry_output_uri = output_uri + "/" + base
if not output_uri.endswith("/"):
entry_output_uri += "/"
entry_output_uri += base
logging.info(f"ENUMERATOR ENTRY_OUTPUT_URI={entry_output_uri}")

if pattern is not None and not re.match(pattern, entry_input_uri):
logging.info(f"ENUMERATOR SKIP NO MATCH ON <<{pattern}>>")
continue

node = grf.submit(
@@ -155,15 +226,20 @@
extra_tiledb_config=extra_tiledb_config,
ingest_mode=ingest_mode,
platform_config=platform_config,
resources=_DEFAULT_RESOURCES if resources is None else resources,
access_credentials_name=access_credentials_name,
resources=carry_along["resources"],
access_credentials_name=carry_along["access_credentials_name"],
dry_run=dry_run,
)
collector.depends_on(node)

return grf
else:
raise ValueError(f"input_uri {input_uri!r} is neither file nor directory")

grf.compute()
grf.wait()

raise ValueError(f"input_uri {input_uri!r} is neither file nor directory")
logging.info(f"ENUMERATOR EXIT {grf.server_graph_uuid}")
return grf.server_graph_uuid


def _hack_patch_anndata() -> ContextManager[object]:
@@ -208,8 +284,11 @@ def ingest_h5ad(
import tiledbsoma.logging as somalog
from tiledbsoma import io

# XXX parameterize
logging.basicConfig(level=logging.INFO)
somalog.info()
# logging.basicConfig(level=logging.DEBUG)
# somalog.debug()

# While h5ad supports any file-like object, anndata specifically
# wants only an `os.PathLike` object. The only thing it does with
@@ -264,5 +343,6 @@ def __getattr__(self, name: str) -> object:
# checked into tiledb-cloud-py and deployed server-side. _All_ dev work _must_
# use this idiom.
_ingest_h5ad_byval = functions.to_register_by_value(ingest_h5ad)
_run_ingest_workflow = functions.to_register_by_value(run_ingest_workflow)
_run_ingest_workflow_byval = functions.to_register_by_value(run_ingest_workflow)
_run_ingest_workflow_udf_byval = functions.to_register_by_value(run_ingest_workflow_udf)
_hack_patch_anndata_byval = functions.to_register_by_value(_hack_patch_anndata)
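
For context, a minimal usage sketch of the entry point this commit touches. It assumes the module resolves as tiledb.cloud.soma.ingest and that run_ingest_workflow accepts the same keyword arguments it forwards to the launcher graph; the URIs, namespace, and credential name below are placeholders, not values from the commit.

from tiledb.cloud.soma import ingest

# Launch the two-stage ingest: a small BATCH launcher graph runs
# run_ingest_workflow_udf on-node, where VFS with access_credentials_name
# works; that UDF enumerates the input and submits per-file ingest nodes.
result = ingest.run_ingest_workflow(
    output_uri="tiledb://my-namespace/s3://my-bucket/somas",  # placeholder
    input_uri="s3://my-bucket/h5ads/",       # placeholder; a single .h5ad file also works
    measurement_name="RNA",
    pattern=r".*\.h5ad$",                    # optional; applied on-node with re.match against each entry URI
    namespace="my-namespace",                # placeholder
    access_credentials_name="my-creds",      # placeholder; carried along to the on-node UDF
    dry_run=True,                            # threaded through to the per-file ingest nodes
)
print(result["status"], result["graph_id"])

The returned dict matches the shape shown in the first hunk above: "status" is "started" and "graph_id" is the real graph UUID retrieved from the launcher node.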
