Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement fetch operations #4

Merged
merged 8 commits into from
May 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ install_requires =
importlib-metadata; python_version<"3.8"
requests
filelock
jsonschema


[options.packages.find]
Expand Down
9 changes: 9 additions & 0 deletions src/gypsum_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,14 @@
del version, PackageNotFoundError

from .auth import access_token, set_access_token
from .fetch_assets import (
fetch_latest,
fetch_manifest,
fetch_permissions,
fetch_quota,
fetch_summary,
fetch_usage,
)
from .fetch_metadata_schema import fetch_metadata_schema
from .list_assets import list_assets, list_files, list_projects, list_versions
from .s3_config import public_s3_config
116 changes: 111 additions & 5 deletions src/gypsum_client/_utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
import json
import os
import shutil
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
from urllib.parse import quote_plus

import requests
from filelock import FileLock

__author__ = "Jayaram Kancherla"
__copyright__ = "Jayaram Kancherla"
Expand Down Expand Up @@ -35,12 +41,10 @@ def _cache_directory(dir: Optional[str] = None):
if dir is None:
return current
else:
if not os.path.exists(current):
raise FileNotFoundError(
f"Path {current} does not exist or is not accessible."
)
if not os.path.exists(dir):
raise FileNotFoundError(f"Path {dir} does not exist or is not accessible.")

return current
return dir


def _remove_slash_url(url: str):
Expand Down Expand Up @@ -77,3 +81,105 @@ def _list_for_prefix(
resp = [_remove_slash_url(val) for val in resp if not val.startswith("..")]

return resp


def _fetch_json(path: str, url: str):
full_url = f"{url}/file/{quote_plus(path)}"

req = requests.get(full_url)
req.raise_for_status()

return req.json()


BUCKET_CACHE_NAME = "bucket"


def _fetch_cacheable_json(
project: str,
asset: str,
version: str,
path: str,
cache: str,
url: str,
overwrite: bool,
):
bucket_path = f"{project}/{asset}/{version}/{path}"

if cache is None:
return _fetch_json(bucket_path, url=url)
else:
_out_path = os.path.join(
cache, BUCKET_CACHE_NAME, project, asset, version, path
)

_save_file(bucket_path, destination=_out_path, overwrite=overwrite, url=url)

with open(_out_path, "r") as jf:
return json.load(jf)


def _save_file(
path: str, destination: str, overwrite: bool, url: str, error: bool = True
):
if overwrite is True or not os.path.exists(destination):
os.makedirs(os.path.dirname(destination), exist_ok=True)

_lock = FileLock(destination)
with _lock:
with tempfile.NamedTemporaryFile(
dir=os.path.dirname(destination), delete=False
) as tmp_file:
try:
full_url = f"{url}/file/{quote_plus(path)}"

req = requests.get(full_url, stream=True)
req.raise_for_status()

for chunk in req.iter_content(chunk_size=None):
tmp_file.write(chunk)
except Exception as e:
if error:
raise Exception(f"Failed to save '{path}'; {str(e)}.")
else:
return False

# Rename the temporary file to the destination
shutil.move(tmp_file.name, destination)

return True


def _cast_datetime(x):
if x.endswith("Z"):
x = x[:-1]

# Remove fractional seconds.
x = x.split(".")[0]

return datetime.strptime(x, "%Y-%m-%dT%H:%M:%S").astimezone(tz=timezone.utc)


def _rename_file(src: str, dest: str):
try:
os.rename(src, dest)
except OSError:
try:
# If renaming fails, try copying
shutil.copy(src, dest)
os.remove(src) # Remove the original file after copying
except Exception as e:
raise RuntimeError(
f"Cannot move temporary file for '{src}' to its destination '{dest}': {e}."
)


def _download_and_rename_file(url: str, dest: str):
tmp = tempfile.NamedTemporaryFile(dir=os.path.dirname(dest), delete=False).name
req = requests.get(url, stream=True)

with open(tmp, "wb") as f:
for chunk in req.iter_content():
f.write(chunk)

_rename_file(tmp, dest)
4 changes: 2 additions & 2 deletions src/gypsum_client/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def _token_cache_path(cache_dir):

def access_token(
full: bool = False, request: bool = True, cache_dir: Optional[str] = None
):
) -> Optional[str]:
"""Get GitHub access token for authentication to the gypsum API's.

Args:
Expand Down Expand Up @@ -86,7 +86,7 @@ def set_access_token(
github_url: str = "https://api.github.com",
user_agent: Optional[str] = None,
cache_dir: Optional[str] = None,
):
) -> dict:
"""Set GitHub access token for authentication to the gypsum API's.

Args:
Expand Down
220 changes: 220 additions & 0 deletions src/gypsum_client/fetch_assets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
import os

from ._utils import (
BUCKET_CACHE_NAME,
_cache_directory,
_cast_datetime,
_fetch_cacheable_json,
_fetch_json,
_rest_url,
)

__author__ = "Jayaram Kancherla"
__copyright__ = "Jayaram Kancherla"
__license__ = "MIT"


def fetch_latest(project: str, asset: str, url: str = _rest_url()) -> str:
"""Fetch the latest version of a project's asset.

Args:
project:
Project name.

asset:
Asset name.

url:
URL to the gypsum compatible API.

Returns:
Latest version of the project.
"""
resp = _fetch_json(f"{project}/{asset}/..latest", url=url)
return resp["version"]


def fetch_manifest(
project: str,
asset: str,
version: str,
cache_dir: str = _cache_directory(),
overwrite: bool = False,
url: str = _rest_url(),
) -> dict:
"""Fetch the manifest for a version of an asset of a project.

Args:
project:
Project name.

asset:
Asset name.

version:
Version name.

cache_dir:
Path to the cache directory.

overwrite:
Whether to overwrite existing file in cache.

url:
URL to the gypsum compatible API.

Returns:
Dictionary containing the manifest for this version.
Each element is named after the relative path of a file in this version.
The value of each element is another list with the following fields:
- ``size``, an integer specifying the size of the file in bytes.
- ``md5sum``, a string containing the hex-encoded MD5 checksum of the file.
- Optional ``link``, a list specifying the link destination for a file.

This contains the strings ``project``, ``asset``, ``version`` and ``path``.
If the link destination is itself a link, an ``ancestor`` list will be
present that specifies the final location of the file after resolving all intermediate links.
"""
return _fetch_cacheable_json(
project,
asset,
version,
"..manifest",
url=url,
cache=cache_dir,
overwrite=overwrite,
)


def fetch_permissions(project: str, url: str = _rest_url()) -> list:
"""Fetch the permissions for a project.

Args:
project:
Project name.

url:
URL to the gypsum compatible API.

Returns:
Dictionary containing the permissions for this project:
- ``owners``, a character vector containing the GitHub users or
organizations that are owners of this project.
- ``uploaders``, a list of lists specifying the users or organizations
who are authorzied to upload to this project.
Each entry is a list with the following fields:
- ``id``, a string containing the GitHub user or organization
that is authorized to upload.
- Optional ``asset``, a string containing the name of the asset
that the uploader is allowed to upload to. If not provided, there is no
restriction on the uploaded asset name.
- Optional ``version``, a string containing the name of the version
that the uploader is allowed to upload to.If not provided, there is
no restriction on the uploaded version name.
- Optional ``until``a POSIXct object containing the expiry date of this
authorization. If not provided, the authorization does not expire.
- Optional ``trusted``, whether the uploader is trusted.
If not provided, defaults to False.
"""
perms = _fetch_json(f"{project}/..permissions", url=url)

for i, val in enumerate(perms["uploaders"]):
if "until" in val:
perms["uploaders"][i]["until"] = _cast_datetime(val["until"])

return perms


def fetch_quota(project: str, url: str = _rest_url()):
"""Fetch the quota details for a project.

Args:
project:
Project name.

url:
URL to the gypsum compatible API.

Returns:
Dictionary containing ``baseline``, the baseline quota at time zero in bytes;
``growth_rate``, the annual growth rate for the quota in bytes;
``year``, the creation year (i.e., time zero) for this project.
"""
return _fetch_json(f"{project}/..quota", url=url)


def fetch_summary(
project: str,
asset: str,
version: str,
cache_dir: str = _cache_directory(),
overwrite: bool = False,
url: str = _rest_url(),
):
"""Fetch the summary for a version of an asset of a project.

Args:
project:
Project name.

asset:
Asset name.

version:
Version name.

cache_dir:
Path to the cache directory.

overwrite:
Whether to overwrite existing file in cache.

url:
URL to the gypsum compatible API.

Returns:
Dictionary containing the summary for this version, with the following fields:
- ``upload_user_id``, string containing the identity of the uploader.
- ``upload_start``, a POSIXct object containing the upload start time.
- ``upload_finish``, a POSIXct object containing the upload finish time.
- ``on_probation`` (optional), a logical scalar indicating whether the upload is probational.
If missing, this can be assumed to be False.
"""
_out = _fetch_cacheable_json(
project,
asset,
version,
"..summary",
cache=cache_dir,
overwrite=overwrite,
url=url,
)

_out["upload_start"] = _cast_datetime(_out["upload_start"])
_out["upload_finish"] = _cast_datetime(_out["upload_finish"])

if "on_probation" in _out:
if _out["on_probation"] is True and cache_dir is not None:
_out_path = os.path.join(
cache_dir, BUCKET_CACHE_NAME, project, asset, version, "..summary"
)
os.unlink(_out_path)

return _out


def fetch_usage(project: str, url: str = _rest_url()):
"""Fetch the quota usage for a project.

Args:
project:
Project name.

url:
URL to the gypsum compatible API.

Returns:
Numeric scalar specifying the quota usage for the project, in bytes.
"""
_usage = _fetch_json(f"{project}/..usage", url=url)
return _usage["total"]
Loading
Loading