From 0e85e44c6180b494c0dda4f3a10c1129c165aaa1 Mon Sep 17 00:00:00 2001 From: jgmoutafis Date: Fri, 26 Jul 2024 13:32:39 +0300 Subject: [PATCH 1/5] Upload folder implementation. --- src/tiledb/cloud/files/utils.py | 136 +++++++++++++++++++++++++++++++- 1 file changed, 135 insertions(+), 1 deletion(-) diff --git a/src/tiledb/cloud/files/utils.py b/src/tiledb/cloud/files/utils.py index 84ab5be6e..e8a9a83e5 100644 --- a/src/tiledb/cloud/files/utils.py +++ b/src/tiledb/cloud/files/utils.py @@ -4,19 +4,22 @@ import urllib.parse import warnings from fnmatch import fnmatch -from typing import Mapping, Optional, Tuple, Union +from typing import Dict, List, Mapping, Optional, Tuple, Union import tiledb +import tiledb.cloud import tiledb.cloud.tiledb_cloud_error as tce from tiledb.cloud import array from tiledb.cloud import client from tiledb.cloud import config +from tiledb.cloud import groups from tiledb.cloud import rest_api from tiledb.cloud import tiledb_cloud_error from tiledb.cloud._common import utils from tiledb.cloud.rest_api import ApiException as GenApiException from tiledb.cloud.rest_api import configuration from tiledb.cloud.rest_api import models +from tiledb.cloud.utilities import get_logger_wrapper def sanitize_filename(fname: str) -> str: @@ -264,3 +267,134 @@ def _auth_headers(cfg: configuration.Configuration) -> Mapping[str, object]: return {"authorization": basic} return {} # No authentication has been provided. Do nothing. + + +def upload_folder( + input_uri: str, + output_uri: str, + *, + group_uri: Optional[str] = None, + exclude_files: Optional[List[str]] = None, + flatten: bool = False, + access_credentials_name: Optional[str] = None, + config: Optional[dict] = None, + verbose: bool = False, +) -> Dict[str, Union[str, Dict[str, str]]]: + """ + Uploads a folder to TileDB Cloud. + By default respects the initial folder structure in the destination. + + :param input_uri: The URI or path of the input file. 
May be an ordinary path + or any URI accessible via TileDB VFS. + :param output_uri: The TileDB URI to write the file to. + :param group_uri: A TileDB Group URI to ingest folder into, defaults to None + :param exclude_files: A list of file paths to exclude from uploading, + defaults to None. + :param flatten: Flag. If set to True, the upload will flatten the folder + structure instead of recreating it. + :param access_credentials_name: If present, the name of the credentials + to use when writing the uploaded file to backend storage instead of + the defaults. + :param config: Config dictionary, defaults to None + :param verbose: Verbose logging, defaults to None + :return: A dictionary containing a report message + and an upload errors dictionary (if any) + """ + logger = get_logger_wrapper(verbose) + + # Prepare and sanitize arguments + output_uri = output_uri.strip("/") + exclude_files = exclude_files or [] + + input_uri = input_uri if input_uri.endswith(os.sep) else input_uri + os.sep + base_dir = os.path.dirname(input_uri) + base_dir = os.path.basename(base_dir) + + namespace, name = utils.split_uri(output_uri) + _, sp, acn = groups._default_ns_path_cred(namespace=namespace) + + storage_path = name if name.startswith("s3://") else sp + storage_path = f"{storage_path.strip('/')}/{base_dir}" + logger.debug("Output storage path: %s" % storage_path) + + access_credentials_name = access_credentials_name or acn + + # Group check and/or creation + if not group_uri: + logger.debug("No group_uri provided. 
Choosing one...") + if not name.startswith("s3://"): + group_uri = f"tiledb://{namespace}/{base_dir}" + else: + group_uri = output_uri + + group_created = False + if not tiledb.object_type(group_uri, ctx=tiledb.cloud.Ctx()) == "group": + group_namespace, group_name = utils.split_uri(group_uri) + groups.create( + name=group_name, + namespace=group_namespace, + storage_uri=storage_path, + credentials_name=access_credentials_name, + ) + group_created = True + logger.debug("Group URI: '%s' created" % group_uri) + + logger.info( + """ + ---------------------------------------------------- + Folder Upload Stats: + - Input URI: %s + - Output URI: %s + - Group URI: %s + - Created: %s + - Excluded Files: %s + - Flatten: %s + ---------------------------------------------------- + """ + % (input_uri, output_uri, group_uri, group_created, exclude_files, flatten) + ) + + vfs = tiledb.VFS(config=config) + # Create the base dir in the destination, if it does not exist. + if not vfs.is_dir(storage_path): + vfs.create_dir(storage_path) + + uploaded = 0 + dir_count = 0 + upload_errors: Dict[str, str] = {} + input_ls: List[str] = vfs.ls(input_uri, recursive=True) + for fname in input_ls: + # Skip manually excluded files/folders + if fname in exclude_files: + logger.debug("- '%s' in Excluded Files. Skipping..." % fname) + continue + + fpath = fname.split(base_dir + os.sep)[1] + if vfs.is_dir(fname): + dir_count += 1 + # Do not create nested folder if "flatten" + if flatten: + continue + + # Create any nested dir in the destination. 
+ dir_path = f"{storage_path}/{fpath}" + if not vfs.is_dir(dir_path): + logger.debug("Creating sub-folder '%s'" % dir_path) + vfs.create_dir(dir_path) + else: + out_path, filename = os.path.split(fpath) + try: + upload_file( + input_uri=fname, + output_uri=f"{storage_path}/{out_path}", + filename=filename, + access_credentials_name=access_credentials_name, + ) + uploaded += 1 + except Exception as exc: + upload_errors[fname] = str(exc) + + return { + "msg": f"Uploaded {uploaded}/{len(input_ls) - dir_count} files", + "errors": upload_errors, + } From d7ef842bf9291f6c31c61db93ce402bbe2458e64 Mon Sep 17 00:00:00 2001 From: jgmoutafis Date: Wed, 31 Jul 2024 10:48:11 +0300 Subject: [PATCH 2/5] Upload Folder: Logging and argument improvements. --- src/tiledb/cloud/files/utils.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/tiledb/cloud/files/utils.py b/src/tiledb/cloud/files/utils.py index e8a9a83e5..1723899e9 100644 --- a/src/tiledb/cloud/files/utils.py +++ b/src/tiledb/cloud/files/utils.py @@ -4,7 +4,7 @@ import urllib.parse import warnings from fnmatch import fnmatch -from typing import Dict, List, Mapping, Optional, Tuple, Union +from typing import Dict, List, Mapping, Optional, Sequence, Tuple, Union import tiledb import tiledb.cloud @@ -274,7 +274,7 @@ def upload_folder( output_uri: str, *, group_uri: Optional[str] = None, - exclude_files: Optional[List[str]] = None, + exclude_files: Optional[Sequence[str]] = None, flatten: bool = False, access_credentials_name: Optional[str] = None, config: Optional[dict] = None, @@ -313,9 +313,10 @@ def upload_folder( namespace, name = utils.split_uri(output_uri) _, sp, acn = groups._default_ns_path_cred(namespace=namespace) - storage_path = name if name.startswith("s3://") else sp + # If `name` is a URL, assume it points to a cloud storage + storage_path = name if "://" in name else sp storage_path = f"{storage_path.strip('/')}/{base_dir}" - logger.debug("Output storage path: %s" % 
storage_path) + logger.debug("Output storage path: %s", storage_path) access_credentials_name = access_credentials_name or acn @@ -337,7 +338,7 @@ def upload_folder( credentials_name=access_credentials_name, ) group_created = True - logger.debug("Group URI: '%s' created" % group_uri) + logger.debug("Group URI: '%r' created", group_uri) logger.info( """ @@ -392,6 +393,10 @@ def upload_folder( ) uploaded += 1 except Exception as exc: + logger.exception( + "File '%s' while uploading to '%s' raised an exception" + % (filename, out_path) + ) upload_errors[fname] = str(exc) return { From 0c868cb0c8257e4a6ae0da4e7565caa3ed61532d Mon Sep 17 00:00:00 2001 From: jgmoutafis Date: Wed, 31 Jul 2024 10:54:44 +0300 Subject: [PATCH 3/5] Upload Folder: Missed `://` in `name` check. --- src/tiledb/cloud/files/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tiledb/cloud/files/utils.py b/src/tiledb/cloud/files/utils.py index 1723899e9..d27794f7e 100644 --- a/src/tiledb/cloud/files/utils.py +++ b/src/tiledb/cloud/files/utils.py @@ -323,7 +323,7 @@ def upload_folder( # Group check and/or creation if not group_uri: logger.debug("No group_uri provided. Choosing one...") - if not name.startswith("s3://"): + if "://" not in name: group_uri = f"tiledb://{namespace}/{base_dir}" else: group_uri = output_uri From 7d04428039eb4af35de2a4262e042472c56fa2a4 Mon Sep 17 00:00:00 2001 From: jgmoutafis Date: Wed, 14 Aug 2024 11:40:47 +0300 Subject: [PATCH 4/5] Upload Folder: Refactor according to design document and comments. 
--- src/tiledb/cloud/files/utils.py | 192 +++++++++++------- .../nested_in_1_test_file_1.txt | 0 .../nested_in_1_test_file_2.txt | 0 .../double_nested_test_file_1.txt | 0 .../double_nested_test_file_2.csv | 0 .../nested_in_2_test_file_1.txt | 0 .../nested_in_2_test_file_2.txt | 0 tests/data/upload_folder_test/test_file_1.txt | 0 tests/data/upload_folder_test/test_file_2.txt | 0 tests/test_file.py | 87 ++++++++ 10 files changed, 200 insertions(+), 79 deletions(-) create mode 100644 tests/data/upload_folder_test/nested_folder_1/nested_in_1_test_file_1.txt create mode 100644 tests/data/upload_folder_test/nested_folder_1/nested_in_1_test_file_2.txt create mode 100644 tests/data/upload_folder_test/nested_folder_2/double_nested_folder/double_nested_test_file_1.txt create mode 100644 tests/data/upload_folder_test/nested_folder_2/double_nested_folder/double_nested_test_file_2.csv create mode 100644 tests/data/upload_folder_test/nested_folder_2/nested_in_2_test_file_1.txt create mode 100644 tests/data/upload_folder_test/nested_folder_2/nested_in_2_test_file_2.txt create mode 100644 tests/data/upload_folder_test/test_file_1.txt create mode 100644 tests/data/upload_folder_test/test_file_2.txt diff --git a/src/tiledb/cloud/files/utils.py b/src/tiledb/cloud/files/utils.py index d27794f7e..82f2e2b18 100644 --- a/src/tiledb/cloud/files/utils.py +++ b/src/tiledb/cloud/files/utils.py @@ -1,10 +1,13 @@ import base64 +import logging import os import re import urllib.parse import warnings from fnmatch import fnmatch -from typing import Dict, List, Mapping, Optional, Sequence, Tuple, Union +from typing import Dict, List, Mapping, Optional, Tuple, Union + +import attrs import tiledb import tiledb.cloud @@ -269,42 +272,59 @@ def _auth_headers(cfg: configuration.Configuration) -> Mapping[str, object]: # No authentication has been provided. Do nothing. 
+@attrs.define +class UploadFoldersResults: + directory: str + report: str + errors: dict + sub_folders: list["UploadFoldersResults"] + + def upload_folder( input_uri: str, output_uri: str, *, - group_uri: Optional[str] = None, - exclude_files: Optional[Sequence[str]] = None, - flatten: bool = False, + parent_group_uri: Optional[str] = None, access_credentials_name: Optional[str] = None, config: Optional[dict] = None, + flatten: bool = False, + serializable: bool = True, + logger: Optional[logging.Logger] = None, verbose: bool = False, -) -> Dict[str, Union[str, Dict[str, str]]]: +) -> Union[dict, UploadFoldersResults]: """ Uploads a folder to TileDB Cloud. By default respects the initial folder structure in the destination. :param input_uri: The URI or path of the input file. May be an ordinary path or any URI accessible via TileDB VFS. - :param output_uri: The TileDB URI to write the file to. - :param group_uri: A TileDB Group URI to ingest folder into, defaults to None - :param exclude_files: A list of file paths to exclude from uploading, - defaults to None. - :param flatten: Flag. If set to True, the upload will flatten the folder - structure instead of recreating it. + :param output_uri: The TileDB URI to write the folder into. + :param parent_group_uri: A TileDB Group URI to add folder under, + defaults to None :param access_credentials_name: If present, the name of the credentials to use when writing the uploaded file to backend storage instead of the defaults. :param config: Config dictionary, defaults to None + :param flatten: Flag. If set to True, the upload will flatten the folder + structure instead of recreating it. (Not Implemented yet) + :param serializable: Flag. If set to True, the function returns a dictionary + report. Otherwise it returns an UploadFoldersResults attrs class. + Defaults to True. + :param logger: A logging.Logger instance, defaults to None. 
:param verbose: Verbose logging, defaults to None :return: A dictionary containing a report message and an upload errors dictionary (if any) """ - logger = get_logger_wrapper(verbose) + logger = logger or get_logger_wrapper(verbose) + logger.info("=====") + + if flatten: + raise NotImplementedError( + "The option to flatten a folder structure is not yet implemented" + ) # Prepare and sanitize arguments output_uri = output_uri.strip("/") - exclude_files = exclude_files or [] input_uri = input_uri if input_uri.endswith(os.sep) else input_uri + os.sep base_dir = os.path.dirname(input_uri) @@ -316,17 +336,20 @@ def upload_folder( # If `name` is a URL, assume it points to a cloud storage storage_path = name if "://" in name else sp storage_path = f"{storage_path.strip('/')}/{base_dir}" + tb_storage_uri = f"tiledb://{namespace}/{storage_path}" logger.debug("Output storage path: %s", storage_path) + logger.debug("TileDB Storage URI: %r", tb_storage_uri) access_credentials_name = access_credentials_name or acn + logger.debug("Storage path: %r", storage_path) + logger.debug("ACN: %s", access_credentials_name) # Group check and/or creation - if not group_uri: - logger.debug("No group_uri provided. 
Choosing one...") - if "://" not in name: - group_uri = f"tiledb://{namespace}/{base_dir}" - else: - group_uri = output_uri + if "://" not in name: + group_uri = output_uri + else: + group_uri = f"tiledb://{namespace}/{base_dir}" + logger.debug("Group URI: %r", group_uri) group_created = False if not tiledb.object_type(group_uri, ctx=tiledb.cloud.Ctx()) == "group": @@ -335,71 +358,82 @@ def upload_folder( name=group_name, namespace=group_namespace, storage_uri=storage_path, + parent_uri=parent_group_uri, credentials_name=access_credentials_name, ) group_created = True - logger.debug("Group URI: '%r' created", group_uri) - - logger.info( - """ - ---------------------------------------------------- - Folder Upload Stats: - - Input URI: %s - - Output URI: %s - - Group URI: %s - - Created: %s - - Excluded Files: %s - - Flatten: %s - ---------------------------------------------------- - """ - % (input_uri, output_uri, group_uri, group_created, exclude_files, flatten) - ) - - vfs = tiledb.VFS(config=config) - # Create the base dir in the destination, if it does not exist. - if not vfs.is_dir(storage_path): - vfs.create_dir(storage_path) - + logger.debug("Group URI: %r created", group_uri) + + # Upload Stats + logger.info("-----") + if parent_group_uri: + logger.info("Sub-Folder %r Upload Stats", base_dir) + else: + logger.info("Folder %r Upload Stats", base_dir) + logger.info("-----") + logger.info("- Input URI: %r", input_uri) + logger.info("- Output URI: %r", output_uri) + logger.info("-- Storage Path: %r", storage_path) + logger.info("- Group URI: %r", group_uri) + logger.info("-- Group Created: %s", group_created) + + # Report results uploaded = 0 dir_count = 0 upload_errors: Dict[str, str] = {} - input_ls: List[str] = vfs.ls(input_uri, recursive=True) - for fname in input_ls: - # Skip manually excluded files/folders - if fname in exclude_files: - logger.debug("- '%s' in Excluded Files. Skipping..." 
% fname) - continue - - fpath = fname.split(base_dir + os.sep)[1] - if vfs.is_dir(fname): - dir_count += 1 - # Do not create nested folder if "flatten" - if flatten: - continue - - # Create any nested dir in the destination. - dir_path = f"{storage_path}/{fpath}" - if not vfs.is_dir(dir_path): - logger.debug("Creating sub-folder '%s'" % dir_path) - vfs.create_dir(dir_path) - else: - out_path, filename = os.path.split(fpath) - try: - upload_file( - input_uri=fname, - output_uri=f"{storage_path}/{out_path}", - filename=filename, - access_credentials_name=access_credentials_name, - ) - uploaded += 1 - except Exception as exc: - logger.exception( - "File '%s' while uploading to '%s' raised an exception" - % (filename, out_path) + results = UploadFoldersResults( + directory=base_dir, report="", errors={}, sub_folders=[] + ) + + vfs = tiledb.VFS(config=config) + # List local folder + input_ls: List[str] = vfs.ls(input_uri) + with tiledb.Group( + group_uri, mode="w", config=config, ctx=tiledb.cloud.Ctx() + ) as grp: + for fname in input_ls: + if vfs.is_dir(fname): + dir_count += 1 + + results.sub_folders.append( + upload_folder( + input_uri=fname, + output_uri=tb_storage_uri, + parent_group_uri=group_uri, + access_credentials_name=access_credentials_name, + config=config, + # Serialization concerns the final results. 
+ serializable=False, + logger=logger, + verbose=verbose, + ) ) - upload_errors[fname] = str(exc) + else: + logger.info("Uploading %r to Group %r" % (fname, group_uri)) + filename = fname.split(base_dir + os.sep)[1] + tb_output_uri = f"{tb_storage_uri}/{filename}" + try: + uploaded_uri = upload_file( + input_uri=fname, + output_uri=tb_output_uri, + filename=filename, + access_credentials_name=access_credentials_name, + ) + grp.add(uploaded_uri) + uploaded += 1 + except Exception as exc: + logger.exception( + "File %r while uploading to %r raised an exception" + % (filename, tb_output_uri) + ) + upload_errors[fname] = str(exc) + continue + + results.report = f"Uploaded {uploaded}/{len(input_ls) - dir_count} files" + results.errors = upload_errors + logger.info(results.report) + logger.info("=====") - return { - "msg": f"Uploaded {uploaded}/{len(input_ls) - dir_count} files", - "errors": upload_errors, - } + if serializable: + return attrs.asdict(results) + return results diff --git a/tests/data/upload_folder_test/nested_folder_1/nested_in_1_test_file_1.txt b/tests/data/upload_folder_test/nested_folder_1/nested_in_1_test_file_1.txt new file mode 100644 index 000000000..e69de29bb diff --git a/tests/data/upload_folder_test/nested_folder_1/nested_in_1_test_file_2.txt b/tests/data/upload_folder_test/nested_folder_1/nested_in_1_test_file_2.txt new file mode 100644 index 000000000..e69de29bb diff --git a/tests/data/upload_folder_test/nested_folder_2/double_nested_folder/double_nested_test_file_1.txt b/tests/data/upload_folder_test/nested_folder_2/double_nested_folder/double_nested_test_file_1.txt new file mode 100644 index 000000000..e69de29bb diff --git a/tests/data/upload_folder_test/nested_folder_2/double_nested_folder/double_nested_test_file_2.csv b/tests/data/upload_folder_test/nested_folder_2/double_nested_folder/double_nested_test_file_2.csv new file mode 100644 index 000000000..e69de29bb diff --git 
a/tests/data/upload_folder_test/nested_folder_2/nested_in_2_test_file_1.txt b/tests/data/upload_folder_test/nested_folder_2/nested_in_2_test_file_1.txt new file mode 100644 index 000000000..e69de29bb diff --git a/tests/data/upload_folder_test/nested_folder_2/nested_in_2_test_file_2.txt b/tests/data/upload_folder_test/nested_folder_2/nested_in_2_test_file_2.txt new file mode 100644 index 000000000..e69de29bb diff --git a/tests/data/upload_folder_test/test_file_1.txt b/tests/data/upload_folder_test/test_file_1.txt new file mode 100644 index 000000000..e69de29bb diff --git a/tests/data/upload_folder_test/test_file_2.txt b/tests/data/upload_folder_test/test_file_2.txt new file mode 100644 index 000000000..e69de29bb diff --git a/tests/test_file.py b/tests/test_file.py index 7567e1e06..64cacf2f7 100644 --- a/tests/test_file.py +++ b/tests/test_file.py @@ -2,10 +2,12 @@ import os import pathlib import tempfile +import time import unittest from typing import List import tiledb +import tiledb.cloud from tiledb.cloud import array from tiledb.cloud import client from tiledb.cloud import groups @@ -190,6 +192,91 @@ def test_round_trip(self): array.delete_array(uri) +class UploadFolderTest(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.ASSERT_EXIST_TRIES = 5 + cls.folder_basename = "upload_folder_test" + cls.test_folder = os.path.join(CURRENT_DIR, "data", cls.folder_basename) + cls.namespace, cls.storage_path, cls.acn = groups._default_ns_path_cred() + return super().setUpClass() + + def setUp(self): + self.group_uri = ( + f"tiledb://{self.namespace}/{testonly.random_name('upload_folder')}" + ) + return super().setUp() + + def tearDown(self): + tiledb.cloud.groups.delete(self.group_uri, recursive=True) + return super().tearDown() + + def group_info(self, uri: str): + for _ in range(self.ASSERT_EXIST_TRIES): + try: + return groups.info(uri) + except Exception: + pass + time.sleep(1) + self.fail(f"Group '{uri}' does not exist") + + def test_upload_folder(self): + 
report = file_utils.upload_folder( + input_uri=self.test_folder, + output_uri=self.group_uri, + access_credentials_name=self.acn, + serializable=True, + ) + + info = self.group_info(self.group_uri) + self.assertEqual(info.size, 4) # 2 sub-folders, 2 files + + base_storage_uri = f"{self.storage_path}/{self.folder_basename}" + info = self.group_info(f"tiledb://{self.namespace}/nested_folder_1") + self.assertEqual(info.size, 2) # 2 files + self.assertEqual(info.uri, f"{base_storage_uri}/nested_folder_1") + + info = self.group_info(f"tiledb://{self.namespace}/nested_folder_2") + self.assertEqual(info.size, 3) # 1 sub-folder, 2 files + self.assertEqual(info.uri, f"{base_storage_uri}/nested_folder_2") + + info = self.group_info(f"tiledb://{self.namespace}/double_nested_folder") + self.assertEqual(info.size, 2) # 2 files + self.assertEqual( + info.uri, f"{base_storage_uri}/nested_folder_2/double_nested_folder" + ) + + self.assertDictEqual( + report, + { + "directory": self.folder_basename, + "report": "Uploaded 2/2 files", + "errors": {}, + "sub_folders": [ + { + "directory": "nested_folder_1", + "report": "Uploaded 2/2 files", + "errors": {}, + "sub_folders": [], + }, + { + "directory": "nested_folder_2", + "report": "Uploaded 2/2 files", + "errors": {}, + "sub_folders": [ + { + "directory": "double_nested_folder", + "report": "Uploaded 2/2 files", + "errors": {}, + "sub_folders": [], + } + ], + }, + ], + }, + ) + + class TestFileIngestion(unittest.TestCase): @classmethod def setUpClass(cls) -> None: From 1f9ccf2494665f488ae8fabd6e998e010489e392 Mon Sep 17 00:00:00 2001 From: jgmoutafis Date: Wed, 21 Aug 2024 16:23:21 +0300 Subject: [PATCH 5/5] Fix minor docstring issue. 
--- src/tiledb/cloud/files/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tiledb/cloud/files/utils.py b/src/tiledb/cloud/files/utils.py index 469fa7697..5e3e092c9 100644 --- a/src/tiledb/cloud/files/utils.py +++ b/src/tiledb/cloud/files/utils.py @@ -296,7 +296,7 @@ def upload_folder( Uploads a folder to TileDB Cloud. By default respects the initial folder structure in the destination. - :param input_uri: The URI or path of the input file. May be an ordinary path + :param input_uri: The URI or path of the input folder. May be an ordinary path or any URI accessible via TileDB VFS. :param output_uri: The TileDB URI to write the folder into. :param parent_group_uri: A TileDB Group URI to add folder under,