Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upload folder implementation. #618

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
136 changes: 135 additions & 1 deletion src/tiledb/cloud/files/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,22 @@
import urllib.parse
import warnings
from fnmatch import fnmatch
from typing import Mapping, Optional, Tuple, Union
from typing import Dict, List, Mapping, Optional, Tuple, Union

import tiledb
import tiledb.cloud
import tiledb.cloud.tiledb_cloud_error as tce
from tiledb.cloud import array
from tiledb.cloud import client
from tiledb.cloud import config
from tiledb.cloud import groups
from tiledb.cloud import rest_api
from tiledb.cloud import tiledb_cloud_error
from tiledb.cloud._common import utils
from tiledb.cloud.rest_api import ApiException as GenApiException
from tiledb.cloud.rest_api import configuration
from tiledb.cloud.rest_api import models
from tiledb.cloud.utilities import get_logger_wrapper


def sanitize_filename(fname: str) -> str:
Expand Down Expand Up @@ -264,3 +267,134 @@ def _auth_headers(cfg: configuration.Configuration) -> Mapping[str, object]:
return {"authorization": basic}
return {}
# No authentication has been provided. Do nothing.


def upload_folder(
    input_uri: str,
    output_uri: str,
    *,
    group_uri: Optional[str] = None,
    exclude_files: Optional[List[str]] = None,
    flatten: bool = False,
    access_credentials_name: Optional[str] = None,
    config: Optional[dict] = None,
    verbose: bool = False,
) -> Dict[str, Union[str, Dict[str, str]]]:
    """
    Uploads a folder to TileDB Cloud.
    By default respects the initial folder structure in the destination.

    :param input_uri: The URI or path of the input folder. May be an ordinary
        path or any URI accessible via TileDB VFS.
    :param output_uri: The TileDB URI to write the folder to.
    :param group_uri: A TileDB Group URI to ingest folder into, defaults to None.
        When omitted, one is derived from ``output_uri`` and the folder name.
    :param exclude_files: A list of file paths to exclude from uploading,
        defaults to None. Entries are compared verbatim against the paths
        returned by the VFS listing of ``input_uri``.
    :param flatten: Flag. If set to True, the upload will flatten the folder
        structure instead of recreating it: no sub-folders are created and
        every file is written directly under the destination folder.
    :param access_credentials_name: If present, the name of the credentials
        to use when writing the uploaded file to backend storage instead of
        the defaults.
    :param config: Config dictionary, defaults to None
    :param verbose: Verbose logging, defaults to False
    :return: A dictionary containing a report message (``"msg"``)
        and an upload errors dictionary (``"errors"``), mapping each entry
        that failed to the string form of its error (empty if none).
    """
    logger = get_logger_wrapper(verbose)

    # Prepare and sanitize arguments
    output_uri = output_uri.strip("/")
    exclude_files = exclude_files or []
    # Set for O(1) membership tests; the list is kept for the log report.
    excluded = set(exclude_files)

    # URIs always use "/" regardless of the platform's path separator.
    sep = "/" if "://" in input_uri else os.sep
    input_uri = input_uri if input_uri.endswith(sep) else input_uri + sep
    base_dir = os.path.basename(os.path.dirname(input_uri))

    # Prefix used to compute each entry's path relative to the input folder.
    # Local paths are made absolute because the listing may return entries
    # in absolute form even for a relative input (TODO confirm against VFS).
    if "://" in input_uri:
        input_root = input_uri
    else:
        input_root = os.path.join(os.path.abspath(input_uri), "")

    namespace, name = utils.split_uri(output_uri)
    _, sp, acn = groups._default_ns_path_cred(namespace=namespace)

    # NOTE(review): a reviewer asked why Amazon ("s3://") URIs were treated
    # specially here; the author's follow-up was to check for any "://"
    # scheme instead, which is what this does.
    name_is_storage_uri = "://" in name
    storage_path = name if name_is_storage_uri else sp
    storage_path = f"{storage_path.strip('/')}/{base_dir}"
    logger.debug("Output storage path: %s", storage_path)

    access_credentials_name = access_credentials_name or acn

    # Group check and/or creation
    if not group_uri:
        logger.debug("No group_uri provided. Choosing one...")
        if name_is_storage_uri:
            group_uri = output_uri
        else:
            group_uri = f"tiledb://{namespace}/{base_dir}"

    group_created = False
    if tiledb.object_type(group_uri, ctx=tiledb.cloud.Ctx()) != "group":
        group_namespace, group_name = utils.split_uri(group_uri)
        groups.create(
            name=group_name,
            namespace=group_namespace,
            storage_uri=storage_path,
            credentials_name=access_credentials_name,
        )
        group_created = True
        logger.debug("Group URI: '%s' created", group_uri)

    logger.info(
        """
        ----------------------------------------------------
        Folder Upload Stats:
          - Input URI: %s
          - Output URI: %s
          - Group URI: %s
            - Created: %s
          - Excluded Files: %s
          - Flatten: %s
        ----------------------------------------------------
        """,
        input_uri,
        output_uri,
        group_uri,
        group_created,
        exclude_files,
        flatten,
    )

    vfs = tiledb.VFS(config=config)
    # Create the base dir in the destination, if it does not exist.
    if not vfs.is_dir(storage_path):
        vfs.create_dir(storage_path)

    uploaded = 0
    dir_count = 0
    upload_errors: Dict[str, str] = {}
    input_ls: List[str] = vfs.ls(input_uri, recursive=True)
    for fname in input_ls:
        # Skip manually excluded files/folders
        if fname in excluded:
            logger.debug("- '%s' in Excluded Files. Skipping...", fname)
            continue

        is_dir = vfs.is_dir(fname)
        if is_dir:
            dir_count += 1
            # Do not create nested folder if "flatten"
            if flatten:
                continue

        fpath = _relative_entry_path(fname, input_root, base_dir + sep)
        if fpath is None:
            # Record the problem instead of aborting the whole upload.
            upload_errors[fname] = (
                f"Could not determine the path of '{fname}' "
                f"relative to '{input_uri}'"
            )
            continue

        if is_dir:
            # Create any nested dir in the destination.
            dir_path = f"{storage_path}/{fpath}"
            if not vfs.is_dir(dir_path):
                logger.debug("Creating sub-folder '%s'", dir_path)
                vfs.create_dir(dir_path)
            continue

        out_path, filename = os.path.split(fpath)
        if flatten:
            # Flattened uploads land directly in the destination folder,
            # exactly like top-level files. Files sharing a name in
            # different sub-folders will target the same destination.
            out_path = ""
        try:
            upload_file(
                input_uri=fname,
                output_uri=f"{storage_path}/{out_path}",
                filename=filename,
                access_credentials_name=access_credentials_name,
            )
        except Exception as exc:
            # Best effort: keep uploading the remaining files.
            # NOTE(review): a reviewer pointed out that str(exc) drops
            # exception detail; the author keeps strings on purpose so the
            # returned report stays serializable.
            logger.debug("Failed to upload '%s': %s", fname, exc)
            upload_errors[fname] = str(exc)
        else:
            uploaded += 1

    return {
        "msg": f"Uploaded {uploaded}/{len(input_ls) - dir_count} files",
        "errors": upload_errors,
    }


def _relative_entry_path(
    entry: str, input_root: str, fallback_marker: str
) -> Optional[str]:
    """
    Returns the part of a listed entry that lies below the uploaded folder.

    :param entry: A path/URI as returned by the VFS listing.
    :param input_root: The input folder, ending with a separator.
    :param fallback_marker: The folder name followed by a separator, used
        when ``input_root`` does not occur verbatim in ``entry``.
    :return: The relative path, or None if it cannot be determined.
    """
    start = entry.find(input_root)
    if start != -1:
        return entry[start + len(input_root):]
    # Keep everything after the FIRST marker so that nested folders which
    # reuse the base folder's name are not truncated.
    _, marker, remainder = entry.partition(fallback_marker)
    return remainder if marker else None
JohnMoutafis marked this conversation as resolved.
Show resolved Hide resolved
Loading