Skip to content

Commit

Permalink
Osdf cache support (#221)
Browse files Browse the repository at this point in the history
* Add functionality for downloading and verifying OSDF artifacts from caches with MD5 hash validation

- Add optional argument `--cache` for specifying the OSDF cache path.
- Implement `download_and_verify_file` function to download a file,
write it to disk, and verify its MD5 hash.
- Identify Hash recorded in MLMD file.
- If Calculated hash and recorded hash match, cmf artifact pull
  is successfull
- If Cache is not specified or hashes don't match, pull from origin and
  assume it is correct

* Updated documentation

- Added --cache as optional argument in `cmf_client.md`
- If cache or redirector URL is not specified, cmf will fetch
pulls from the origin recorded in the MLMD file

* Improved handling of user submitted OSDF cache URL

- Parse user supplied cache to its schema+netloc only. Skip the path
- Assumption is that the path from MLMD will be more accurate
- cached_url is the netlocation of the supplied cache and path from the MLMD
  records
  • Loading branch information
atripathy86 authored Dec 16, 2024
1 parent 239dc51 commit bf9c559
Show file tree
Hide file tree
Showing 6 changed files with 163 additions and 38 deletions.
16 changes: 12 additions & 4 deletions cmflib/cmf.py
Original file line number Diff line number Diff line change
Expand Up @@ -2137,6 +2137,7 @@ def cmf_init(type: str = "",
password: str = "",
port: int = 0,
osdf_path: str = "",
osdf_cache: str = "",
key_id: str = "",
key_path: str = "",
key_issuer: str = "",
Expand Down Expand Up @@ -2169,7 +2170,12 @@ def cmf_init(type: str = "",
session_token: Session token for AmazonS3.
user: SSH remote username.
password: SSH remote password.
port: SSH remote port
port: SSH remote port.
osdf_path: OSDF Origin Path.
osdf_cache: OSDF Cache Path (Optional).
key_id: OSDF Key ID.
key_path: OSDF Private Key Path.
key_issuer: OSDF Key Issuer URL.
Returns:
Output based on the initialized repository type.
"""
Expand All @@ -2196,6 +2202,7 @@ def cmf_init(type: str = "",
'user': user,
'password': password,
'osdf_path': osdf_path,
'osdf_cache': osdf_cache,
'key_id': key_id,
'key_path': key_path,
'key-issuer': key_issuer,
Expand Down Expand Up @@ -2265,10 +2272,11 @@ def cmf_init(type: str = "",

return output

elif type == "osdfremote" and osdf_path != "" and key_id != "" and key_path != 0 and key_issuer != "" and git_remote_url != "":
elif type == "osdfremote" and osdf_path != "" and key_id != "" and key_path != "" and key_issuer != "" and git_remote_url != "":
"""Initialize osdfremote repository"""
output = _init_osdfremote(
osdf_path,
osdf_cache,
key_id,
key_path,
key_issuer,
Expand All @@ -2293,10 +2301,10 @@ def non_related_args(type : str, args : dict):
minioS3=["url", "endpoint_url", "access_key_id", "secret_key", "git_remote_url"]
amazonS3=["url", "access_key_id", "secret_key", "session_token", "git_remote_url"]
sshremote=["path", "user", "port", "password", "git_remote_url"]
osdfremote=["osdf_path", "key_id", "key_path", "key-issuer", "git_remote_url"]
osdfremote=["osdf_path", "osdf_cache", "key_id", "key_path", "key-issuer", "git_remote_url"]


dict_repository_args={"local" : local, "minioS3" : minioS3, "amazonS3" : amazonS3, "sshremote" : sshremote}
dict_repository_args={"local" : local, "minioS3" : minioS3, "amazonS3" : amazonS3, "sshremote" : sshremote, "osdfremote": osdfremote}

for repo,arg in dict_repository_args.items():
if repo ==type:
Expand Down
4 changes: 3 additions & 1 deletion cmflib/cmf_commands_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,13 +252,15 @@ def _init_sshremote(path,user, port, password, git_remote_url, cmf_server_url, n
print(msg)
return msg

def _init_osdfremote(path, key_id, key_path, key_issuer, git_remote_url, cmf_server_url, neo4j_user, neo4j_password, neo4j_uri):
def _init_osdfremote(path, cache, key_id, key_path, key_issuer, git_remote_url, cmf_server_url, neo4j_user, neo4j_password, neo4j_uri):
cli_args = cli.parse_args(
[
"init",
"osdf",
"--path",
path,
"--cache",
cache,
"--key-id",
key_id,
"--key-path",
Expand Down
20 changes: 16 additions & 4 deletions cmflib/commands/artifact/pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,11 @@ def search_artifact(self, input_dict):
continue
# Splitting the 'name' using ':' as the delimiter and storing the first argument in the 'name' variable.
name = name.split(":")[0]
artifact_hash = name = name.split(":")[1]
# Splitting the path on '/' to extract the file name, excluding the directory structure.
file_name = name.split('/')[-1]
if file_name == self.args.artifact_name:
return name, url
return name, url, artifact_hash
else:
pass

Expand Down Expand Up @@ -201,7 +202,8 @@ def run(self):
) # getting all artifacts with id
temp_dict = dict(zip(get_artifacts['name'], get_artifacts['url'])) # getting dictionary of name and url pair
name_url_dict.update(temp_dict) # updating name_url_dict with temp_dict
# print(name_url_dict)

#print(name_url_dict)
# name_url_dict = ('artifacts/parsed/test.tsv:6f597d341ceb7d8fbbe88859a892ef81', 'Test-env:/home/sharvark/local-storage/6f/597d341ceb7d8fbbe88859a892ef81'
# name_url_dict = ('artifacts/parsed/test.tsv:6f597d341ceb7d8fbbe88859a892ef81', 'Test-env:/home/sharvark/local-storage/6f/597d341ceb7d8fbbe88859a892ef81,Second-env:/home/sharvark/local-storage/6f/597d341ceb7d8fbbe88859a892ef81')

Expand All @@ -216,6 +218,7 @@ def run(self):
output = self.search_artifact(name_url_dict)
# output[0] = name
# output[1] = url
# output[2] = hash
if output is None:
print(f"{self.args.artifact_name} doesn't exist.")
else:
Expand Down Expand Up @@ -322,36 +325,45 @@ def run(self):
#Need to write to cmfconfig with new credentials
#CmfConfig.write_config(cmf_config, "osdf", attr_dict, True)
#Now Ready to do dvc pull
cache_path=cmf_config["osdf-cache"]

osdfremote_class_obj = osdf_artifacts.OSDFremoteArtifacts()
if self.args.artifact_name:
output = self.search_artifact(name_url_dict)
# output[0] = name
# output[1] = url
# output[3]=artifact_hash
if output is None:
print(f"{self.args.artifact_name} doesn't exist.")
else:
args = self.extract_repo_args("osdf", output[0], output[1], current_directory)
#print(f"Hash for the artifact {self.args.artifact_name} is {output[3]}")
stmt = osdfremote_class_obj.download_artifacts(
dvc_config_op,
args[0], # s_url of the artifact
cache_path,
current_directory,
args[1], # download_loc of the artifact
args[2] # name of the artifact
args[2], # name of the artifact
output[3] #Artifact Hash
)
print(stmt)
else:
for name, url in name_url_dict.items():
#print(name, url)
if not isinstance(url, str):
continue
artifact_hash = name.split(':')[1] #Extract Hash of the artifact from name
#print(f"Hash for the artifact {name} is {artifact_hash}")
args = self.extract_repo_args("osdf", name, url, current_directory)
stmt = osdfremote_class_obj.download_artifacts(
dvc_config_op,
args[0], # host,
cache_path,
current_directory,
args[1], # remote_loc of the artifact
args[2] # name
args[2], # name
artifact_hash #Artifact Hash
)
print(stmt)
return "Done"
Expand Down
9 changes: 9 additions & 0 deletions cmflib/commands/init/osdfremote.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ def run(self):

attr_dict = {}
attr_dict["path"] = self.args.path
attr_dict["cache"] = self.args.cache
attr_dict["key_id"] = self.args.key_id
attr_dict["key_path"] = self.args.key_path
attr_dict["key_issuer"] = self.args.key_issuer
Expand Down Expand Up @@ -127,6 +128,14 @@ def add_parser(subparsers, parent_parser):
default=argparse.SUPPRESS,
)

parser.add_argument(
"--cache",
help="Specify FQDN for OSDF cache path including port and path. For Ex. https://osdf-director.osg-htc.org/nrp/fdp/",
metavar="<cache>",
#default="https://osdf-director.osg-htc.org/nrp/fdp/",
default="",
)

required_arguments.add_argument(
"--key-id",
required=True,
Expand Down
146 changes: 119 additions & 27 deletions cmflib/storage_backends/osdf_artifacts.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,91 @@
import requests
#import urllib3
#urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
import hashlib
import time
from urllib.parse import urlparse

def generate_cached_url(url, cache):
#This takes host URL as supplied from MLMD records and generates cached URL=cache_path + path
#Example Input: https://sdsc-origin.nationalresearchplatform.org:8443/nrp/fdp/23/6d9502e0283d91f689d7038b8508a2
#Example Output: https://osdf-director.osg-htc.org/nrp/fdp/23/6d9502e0283d91f689d7038b8508a2
#The assumption is that url obtained from MLMD is more accurate. So we use the path from this URL and append it to cache path
#but we clean up the cache path to only its scheme + netloc: https://osdf-director.osg-htc.org
parsed_url = urlparse(url)
parsed_cache_url= urlparse(cache)
cached_url= parsed_cache_url.scheme + "://" + parsed_cache_url.netloc + parsed_url.path
return cached_url

def calculate_md5_from_file(file_path, chunk_size=8192):
md5 = hashlib.md5()
try:
with open(file_path, 'rb') as f:
while chunk := f.read(chunk_size):
md5.update(chunk)
except Exception as e:
print(f"An error occurred while reading the file: {e}")
return None
return md5.hexdigest()

def download_and_verify_file(host, headers, remote_file_path, local_path, artifact_hash, timeout):
print(f"Fetching artifact={local_path}, surl={host} to {remote_file_path}")
data= None
try:
response = requests.get(host, headers=headers, timeout=timeout, verify=True) # This should be made True. otherwise this will produce Insecure SSL Warning
if response.status_code == 200 and response.content:
data = response.content
else:
return False, "No data received from the server."
#pass
except requests.exceptions.Timeout:
return False, "The request timed out."
#pass
except Exception as exception:
return False, str(exception)

if data is not None:
try:
with open(remote_file_path, 'wb') as file:
file.write(data)
if os.path.exists(remote_file_path) and os.path.getsize(remote_file_path) > 0:
# Calculate MD5 hash of the downloaded file
start_time = time.time()
md5_hash = calculate_md5_from_file(remote_file_path)
end_time = time.time()
time_taken = end_time - start_time
if md5_hash:
#print(f"MD5 hash of the downloaded file is: {md5_hash}")
#print(f"Time taken to calculate MD5 hash: {time_taken:.2f} seconds")
if artifact_hash == md5_hash:
#print("MD5 hash of the downloaded file matches the hash in MLMD records.")
stmt = f"object {local_path} downloaded at {remote_file_path} in {time_taken:.2f} seconds and matches MLMD records."
success=True
else:
#print("Error: MD5 hash of the downloaded file does not match the hash in MLMD records.")
stmt = f"object {local_path} downloaded at {remote_file_path} in {time_taken:.2f} seconds and does NOT match MLMD records."
success=False
return success, stmt
else:
print("Failed to calculate MD5 hash of the downloaded file.")
except Exception as e:
print(f"An error occurred while writing to the file: {e}")
return False, f"An error occurred while writing to the file: {e}"

return False, "Data is None."


class OSDFremoteArtifacts:
def download_artifacts(
self,
dvc_config_op,
host: str, #s_url
cache: str, #cache_path from cmfconfig
current_directory: str, #current_directory where cmf artifact pull is executed
remote_file_path: str, # download_loc of the artifact
local_path: str, #name of the artifact
artifact_hash: str, #hash of the artifact from MLMD records
):
#print(f"Configured Host from MLMD record={host}. User configured cache redirector={cache}")
output = ""
remote_repo = dvc_config_op["remote.osdf.url"]
user = "nobody"
Expand All @@ -36,33 +111,50 @@ def download_artifacts(
#print(f"dynamic password from download_artifacts={dynamic_password}")
#print(f"Fetching artifact={local_path}, surl={host} to {remote_file_path} when this has been called at {current_directory}")

try:
headers={dvc_config_op["remote.osdf.custom_auth_header"]: dvc_config_op["remote.osdf.password"]}
temp = local_path.split("/")
temp.pop()
dir_path = "/".join(temp)
dir_to_create = os.path.join(current_directory, dir_path)
os.makedirs(
dir_to_create, mode=0o777, exist_ok=True
) # creates subfolders needed as per artifacts folder structure
local_file_path = os.path.join(current_directory, local_path)
local_file_path = os.path.abspath(local_file_path)

response = requests.get(host, headers=headers, verify=True) #This should be made True. otherwise this will produce Insecure SSL Warning
if response.status_code == 200 and response.content:
data = response.content
# Prepare directories and file paths
headers={dvc_config_op["remote.osdf.custom_auth_header"]: dvc_config_op["remote.osdf.password"]}
temp = local_path.split("/")
temp.pop()
dir_path = "/".join(temp)
dir_to_create = os.path.join(current_directory, dir_path)
os.makedirs(
dir_to_create, mode=0o777, exist_ok=True
) # creates subfolders needed as per artifacts folder structure
local_file_path = os.path.join(current_directory, local_path)
local_file_path = os.path.abspath(local_file_path)

#Cache can be Blank. If so, fetch from Origin
if cache == "":
#Fetch from Origin
success, result = download_and_verify_file(host, headers, remote_file_path, local_file_path, artifact_hash, timeout=10)
if success:
#print(result)
return result
else:
return "No data received from the server."
#print(f"Failed to download and verify file: {result}")
return f"Failed to download and verify file"
else:
#Generate Cached path for artifact
cached_s_url=generate_cached_url(host,cache)
#Try to fetch from cache first
success, cached_result = download_and_verify_file(cached_s_url, headers, remote_file_path, local_path, artifact_hash,timeout=5)
if success:
#print(cached_result)
return cached_result
else:
print(f"Failed to download and verify file from cache: {cached_result}")
print(f"Trying Origin at {host}")
#Fetch from Origin
success, origin_result = download_and_verify_file(host, headers, remote_file_path, local_path, artifact_hash, timeout=10)
if success:
#print(origin_result)
return origin_result
else:
#print(f"Failed to download and verify file: {result}")
return f"Failed to download and verify file"


except Exception as exception:
return exception



try:
with open(remote_file_path, 'wb') as file:
file.write(data)
if os.path.exists(remote_file_path) and os.path.getsize(remote_file_path) > 0:
#print(f"object {local_path} downloaded at {remote_file_path}")
stmt = f"object {local_path} downloaded at {remote_file_path}."
return stmt
except Exception as e:
print(f"An error occurred while writing to the file: {e}")

6 changes: 4 additions & 2 deletions docs/cmf_client/cmf_client.md
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ Optional Arguments
### cmf init osdfremote
```
Usage: cmf init osdfremote [-h] --path [path]
--cache [cache]
--key-id [key_id]
--key-path [key_path]
--key-issuer [key_issuer]
Expand All @@ -196,18 +197,19 @@ Usage: cmf init osdfremote [-h] --path [path]
```
`cmf init osdfremote` configures a OSDF Origin as a cmf artifact repository.
```
cmf init osdfremote --path https://[Some Origin]:8443/nrp/fdp/ --key-id c2a5 --key-path ~/.ssh/fdp.pem --key-issuer https://[Token Issuer]] --git-remote-url https://github.com/user/experiment-repo.git --git-remote-url https://github.com/user/experiment-repo.git --cmf-server-url http://127.0.0.1:80 --neo4j-user neo4j --neo4j-password password --neo4j-uri bolt://localhost:7687
cmf init osdfremote --path https://[Some Origin]:8443/nrp/fdp/ --cache http://[Some Redirector]/nrp/fdp --key-id c2a5 --key-path ~/.ssh/fdp.pem --key-issuer https://[Token Issuer]] --git-remote-url https://github.com/user/experiment-repo.git --git-remote-url https://github.com/user/experiment-repo.git --cmf-server-url http://127.0.0.1:80 --neo4j-user neo4j --neo4j-password password --neo4j-uri bolt://localhost:7687
```
Required Arguments
```
--path [path] Specify FQDN for OSDF origin including including port and directory path
--path [path] Specify FQDN for OSDF origin including including port and directory path if any
--key-id [key_id] Specify key_id for provided private key. eg. b2d3
--key-path [key_path] Specify path for private key on local filesystem. eg. ~/.ssh/XXX.pem
--key-issuer [key_issuer] Specify URL for Key Issuer. eg. https://t.nationalresearchplatform.org/XXX
--git-remote-url [git_remote_url] Specify git repo url. eg: https://github.com/XXX/example.git
```
Optional Arguments
```
--cache [cache] Specify FQDN for OSDF cache including including port and directory path if any
-h, --help show this help message and exit
--cmf-server-url [cmf_server_url] Specify cmf-server url. (default: http://127.0.0.1:80)
--neo4j-user [neo4j_user] Specify neo4j user. (default: None)
Expand Down

0 comments on commit bf9c559

Please sign in to comment.