Minor enhancements and support for web links (#29)
* Remove unneeded custom exceptions
* Get a resource either by rname or rid
* Add metadata CRUD methods
* Update tests
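
The quickest way to see these changes is a short usage sketch. This is illustrative rather than project documentation: it assumes `BiocFileCache` is importable from the package top level, and that `path/to/file.txt` and the resource name exist only for the example.

```python
from pybiocfilecache import BiocFileCache

cache = BiocFileCache("my_cache")

# Add a local file, then fetch it either by its name or by the
# generated resource id (lookup by rid is new in this commit).
res = cache.add("my_resource", "path/to/file.txt")
by_name = cache.get(rname="my_resource")
by_id = cache.get(rid=res.rid)
assert by_name.rpath == by_id.rpath
```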
jkanche authored Jan 8, 2025
1 parent 59e19b6 commit c1fa09d
Showing 6 changed files with 204 additions and 81 deletions.
6 changes: 4 additions & 2 deletions CHANGELOG.md
@@ -1,9 +1,11 @@
# Changelog

## Version 0.6.1
## Version 0.6.1 - 0.6.2

- Generate rid's that match with R's cache.
- remove rname pattern checks.
- Remove rname pattern checks.
- Add functions to access metadata table.
- Add function to add web urls and download them if needed.
- Rename GitHub actions for consistency with the rest of the packages.

## Version 0.6.0
152 changes: 112 additions & 40 deletions src/pybiocfilecache/cache.py
@@ -11,20 +11,14 @@

from .config import CacheConfig
from .const import SCHEMA_VERSION
from .exceptions import (
BiocCacheError,
InvalidRnameError,
NoFpathError,
RnameExistsError,
RpathTimeoutError,
)
from .models import Base, Resource
from .models import Base, Metadata, Resource
from .utils import (
calculate_file_hash,
copy_or_move,
create_tmp_dir,
download_web_file,
generate_id,
validate_rname,
generate_uuid,
)

__author__ = "Jayaram Kancherla"
@@ -65,7 +59,6 @@ def __init__(self, cache_dir: Optional[Union[str, Path]] = None, config: Optiona
db_schema_version = self._setup_database()

if db_schema_version != SCHEMA_VERSION:
print(db_schema_version)
raise RuntimeError(f"Database version is not {SCHEMA_VERSION}.")

self._last_cleanup = datetime.now()
@@ -111,13 +104,15 @@ def _setup_database(self) -> None:

return SCHEMA_VERSION

def _get_detached_resource(self, session: Session, resource: Resource) -> Optional[Resource]:
def _get_detached_resource(
self, session: Session, obj: Union[Resource, Metadata]
) -> Optional[Union[Resource, Metadata]]:
"""Get a detached copy of a resource."""
if resource is None:
if obj is None:
return None
session.refresh(resource)
session.expunge(resource)
return resource
session.refresh(obj)
session.expunge(obj)
return obj

def __enter__(self) -> "BiocFileCache":
return self
@@ -142,10 +137,10 @@ def get_session(self) -> Iterator[Session]:
finally:
session.close()

def _validate_rname(self, rname: str) -> None:
"""Validate resource name format."""
if not validate_rname(rname, self.config.rname_pattern):
raise InvalidRnameError(f"Resource name '{rname}' doesn't match pattern " f"'{self.config.rname_pattern}'")
# def _validate_rname(self, rname: str) -> None:
# """Validate resource name format."""
# if not validate_rname(rname, self.config.rname_pattern):
# raise Exception(f"Resource name '{rname}' doesn't match pattern " f"'{self.config.rname_pattern}'")

def _should_cleanup(self) -> bool:
"""Check if cache cleanup should be performed.
@@ -196,24 +191,33 @@ def cleanup(self) -> int:
self._last_cleanup = datetime.now()
return removed

def get(self, rname: str) -> Optional[Resource]:
def get(self, rname: str = None, rid: str = None) -> Optional[Resource]:
"""Get resource by name from cache.
Args:
rname:
Name to identify the resource in cache.
rid:
Resource id to search by.
"""
if rname is None and rid is None:
raise ValueError("either 'rname' or 'rid' must be provided.")

with self.get_session() as session:
resource = session.query(Resource).filter(Resource.rname == rname).first()
if rname is not None:
resource = session.query(Resource).filter(Resource.rname == rname).first()
elif rid is not None:
resource = session.query(Resource).filter(Resource.rid == rid).first()

if resource is not None:
# Check if path exists with timeout
start = time()
timeout = 30
while not Path(str(resource.rpath)).exists():
if time() - start >= timeout:
raise RpathTimeoutError(
raise TimeoutError(
f"For resource: '{rname}' the rpath does not exist " f"after {timeout} seconds."
)
sleep(0.1)
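
As the hunk above shows, `get()` now expects exactly one of `rname` or `rid` and waits up to 30 seconds for the file behind a matching record to appear before giving up. A hedged sketch of the resulting error behavior, with an invented cache directory and id:

```python
from pybiocfilecache import BiocFileCache

cache = BiocFileCache("my_cache")

try:
    cache.get()  # neither rname nor rid given
except ValueError as err:
    print(err)  # "either 'rname' or 'rid' must be provided."

try:
    resource = cache.get(rid="not-a-real-id")
    if resource is None:
        print("no record with that id")
except TimeoutError:
    # raised when a record exists but its rpath never shows up on disk
    print("resource file missing from the cache directory")
```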
@@ -229,10 +233,11 @@ def add(
self,
rname: str,
fpath: Union[str, Path],
rtype: Literal["local", "web", "relative"] = "local",
rtype: Literal["local", "web", "relative"] = "relative",
action: Literal["copy", "move", "asis"] = "copy",
expires: Optional[datetime] = None,
ext: bool = False,
download: bool = True,
ext: bool = True,
) -> Resource:
"""Add a resource to the cache.
@@ -252,29 +257,41 @@
How to handle the file ("copy", "move", or "asis").
Defaults to ``copy``.
download:
Whether to download the resource.
Only used if 'rtype' is "web".
expires:
Optional expiration datetime.
If None, resource never expires.
ext:
Whether to use filepath extension when storing in cache.
Defaults to `False`.
Defaults to `True`.
Returns:
The `Resource` object added to the cache.
"""
# self._validate_rname(rname)
fpath = Path(fpath)

if not fpath.exists():
raise NoFpathError(f"Resource at '{fpath}' does not exist")
fpath = Path(fpath) if rtype != "web" else fpath

if self.get(rname) is not None:
raise RnameExistsError(f"Resource '{rname}' already exists")
raise FileExistsError(f"Resource '{rname}' already exists")

if rtype == "web":
outpath = download_web_file(fpath, Path(fpath).name, download)
action = "copy"
else:
outpath = Path(fpath)

if action == "asis":
logger.warning("If action='asis', rtype must be 'local'.")
rtype = "local"

# Generate paths and check size
rid = generate_id(size=len(self))
rpath = self.config.cache_dir / f"{rid}{fpath.suffix if ext else ''}" if action != "asis" else fpath
uuid = generate_uuid()
rpath = self.config.cache_dir / f"{uuid}_{outpath.name if ext else outpath.stem}" if action != "asis" else fpath

# Create resource record
resource = Resource(
@@ -292,7 +309,7 @@
session.commit()

try:
copy_or_move(fpath, rpath, rname, action, False)
copy_or_move(outpath, rpath, rname, action, False)

# Calculate and store checksum
resource.etag = calculate_file_hash(rpath, self.config.hash_algorithm)
Expand All @@ -303,7 +320,7 @@ def add(
except Exception as e:
session.delete(resource)
session.commit()
raise BiocCacheError("Failed to add resource") from e
raise Exception("Failed to add resource") from e

def add_batch(self, resources: List[Dict[str, Any]]) -> List[Resource]:
"""Add multiple resources in a single transaction.
@@ -349,7 +366,7 @@ def update(
"""
fpath = Path(fpath)
if not fpath.exists():
raise NoFpathError(f"File '{fpath}' does not exist")
raise FileNotFoundError(f"File '{fpath}' does not exist")

with self.get_session() as session:
resource = session.query(Resource).filter(Resource.rname == rname).first()
Expand All @@ -369,7 +386,7 @@ def update(

except Exception as e:
session.rollback()
raise BiocCacheError("Failed to update resource") from e
raise Exception("Failed to update resource") from e

def remove(self, rname: str) -> None:
"""Remove a resource from cache by name.
@@ -381,7 +398,7 @@ def remove(self, rname: str) -> None:
Name to identify the resource in cache.
Raises:
BiocCacheError: If resource removal fails
Exception: If resource removal fails
"""
with self.get_session() as session:
resource = session.query(Resource).filter(Resource.rname == rname).first()
Expand All @@ -399,7 +416,7 @@ def remove(self, rname: str) -> None:

except Exception as e:
session.rollback()
raise BiocCacheError(f"Failed to remove resource '{rname}'") from e
raise Exception(f"Failed to remove resource '{rname}'") from e

def list_resources(self, rtype: Optional[str] = None, expired: Optional[bool] = None) -> List[Resource]:
"""List resources in the cache with optional filtering.
@@ -564,7 +581,7 @@ def purge(self, force: bool = False) -> bool:
True if purge was successful, False otherwise.
Raises:
BiocCacheError: If purge fails and force=False.
Exception: If purge fails and force=False.
"""
try:
with self.get_session() as session:
@@ -577,7 +594,7 @@
except Exception as e:
if not force:
session.rollback()
raise BiocCacheError(f"Failed to remove file for resource '{resource.rname}'") from e
raise Exception(f"Failed to remove file for resource '{resource.rname}'") from e
logger.warning(f"Failed to remove file for resource '{resource.rname}': {e}")

session.commit()
@@ -598,7 +615,7 @@

except Exception as e:
if not force:
raise BiocCacheError("Failed to purge cache") from e
raise Exception("Failed to purge cache") from e

logger.error("Database cleanup failed, forcing file removal", exc_info=e)
for file in self.config.cache_dir.iterdir():
@@ -616,3 +633,58 @@ def purge(self, force: bool = False) -> bool:
def __len__(self):
with self.get_session() as session:
return session.query(Resource).count()

def check_metadata_key(self, key: str) -> bool:
"""Check if a key exists in the metadata table.
Args:
key:
Key to search.
Returns:
True if the key exists, else False.
"""
with self.get_session() as session:
return session.query(Metadata).filter(Metadata.key == key).count() != 0

def get_metadata(self, key: str):
"""Add a new metadata key"""
with self.get_session() as session:
meta = session.query(Metadata).filter(Metadata.key == key).first()
if meta is not None:
return self._get_detached_resource(session, meta)

return None

def add_metadata(self, key: str, value: str):
"""Add a new metadata key"""
exists = self.get_metadata(key=key)

if exists is None:
meta = Metadata(key=key, value=value)

with self.get_session() as session:
try:
session.add(meta)
session.commit()
return self._get_detached_resource(session, meta)
except Exception as e:
session.delete(meta)
session.commit()
raise Exception("Failed to add metadata") from e
else:
raise Exception(f"'key'={key} already exists in metadata.")

def remove_metadata(self, key: str) -> None:
"""Remove a metadata key."""
with self.get_session() as session:
meta = session.query(Metadata).filter(Metadata.key == key).first()

if meta is not None:
try:
session.delete(meta)
session.commit()

except Exception as e:
session.rollback()
raise Exception(f"Failed to remove key '{key}'") from e
31 changes: 0 additions & 31 deletions src/pybiocfilecache/exceptions.py

This file was deleted.

2 changes: 1 addition & 1 deletion src/pybiocfilecache/models.py
@@ -73,4 +73,4 @@ class Resource(Base):
expires = Column(DateTime, default=None)

def __repr__(self) -> str:
return f"<Resource(rid='{self.rid}', rname='{self.rname}')>"
return f"<Resource(rid='{self.rid}', rname='{self.rname}', rpath='{self.rpath}')>"
16 changes: 13 additions & 3 deletions src/pybiocfilecache/utils.py
@@ -2,14 +2,13 @@
import logging
import re
import tempfile
import urllib.request
import uuid
import zlib
from pathlib import Path
from shutil import copy2, move
from typing import Literal

from .exceptions import BiocCacheError

__author__ = "Jayaram Kancherla"
__copyright__ = "Jayaram Kancherla"
__license__ = "MIT"
@@ -86,4 +85,15 @@ def copy_or_move(
elif action == "asis":
pass
except Exception as e:
raise BiocCacheError(f"Failed to store resource '{rname}' from '{source}' to '{target}'") from e
raise Exception(f"Failed to store resource '{rname}' from '{source}' to '{target}'") from e


def download_web_file(url: str, filename: str, download: bool):
tmp_dir = create_tmp_dir()
outpath = tmp_dir / filename
if download:
urllib.request.urlretrieve(str(url), str(outpath))
else:
open(str(outpath), "a").close()

return outpath
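
`download_web_file` is an internal helper rather than public API; it either fetches the URL into a fresh temporary directory or just touches an empty file of the same name. A hedged illustration of the two modes (the URL is a placeholder):

```python
from pybiocfilecache.utils import download_web_file

# download=True retrieves the file with urllib into a temporary directory.
fetched = download_web_file("https://example.com/data.csv", "data.csv", download=True)

# download=False only creates an empty file, useful as a deferred placeholder.
stub = download_web_file("https://example.com/data.csv", "data.csv", download=False)

print(fetched, stub)
```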