Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Regression because of DVC 3.51 version: made changes in pull.py file #179

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 71 additions & 15 deletions cmflib/commands/artifact/pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,40 +31,87 @@

class CmdArtifactPull(CmdBase):

def split_url_pipeline(self, url: str, pipeline_name: str):
if pipeline_name in url:
def split_url_pipeline(self, url: str, pipeline_name: str):
# This function takes url and pipeline_name as input parameters
# and returns a string containing the artifact repo path of the artifact
# url = Test-env:/home/user/local-storage/files/md5/23/6d9502e0283d91f689d7038b8508a2
# pipeline_name = Test-env

# checking whether pipeline name exists inside url
if pipeline_name in url:
# if multiple pipelines log the same artifact, then splitting them using ',' delimiter
if "," in url:
urls = url.split(",")
# iterate over each urls
for u in urls:
# assign u to url if pipeline name exists
if pipeline_name in u:
url = u
# splitting url using ':' delimiter token = ["Test-env","home/user/local-storage/files/md5/23/6d9502e508a2"]
token = url.split(":")
# removing 1st element from token i.e pipeline name
# output token will be ["home/user/local-storage/files/md5/23/6d9502e508a2"]
token.pop(0)
if len(token) > 1:
# in case of metrics we have multiple ':' in its url
# concatenating remaining tokens after removing pipeline_name using ':' delimiter
token = ":".join(token)
return token
return "".join(token)

def extract_repo_args(self, type: str, name: str, url: str, current_directory: str):
#Extracting the repository URL, current path, bucket name, and other relevant
#information from the user-supplied arguments.
#url = 'Test-env:/home/user/local-storage/06/d100ff3e04e2c87bf20f0feacc9034,Second-env:/home/user/local-storage/06/d100ff3e04e2c>
# Extracting the repository URL, current path, bucket name, and other relevant
# information from the user-supplied arguments.
# url = Test-env:/home/user/local-storage/files/md5/06/d100ff3e04e2c87bf20f0feacc9034,
# Second-env:/home/user/local-storage/files/md5/06/d100ff3e04e2c"

# s_url = Url without pipeline name
s_url = self.split_url_pipeline(url, self.args.pipeline_name)

# got url in the form of /home/user/local-storage/files/md5/06/d100ff3e04e2c
# splitting url using '/' delimiter
token = s_url.split("/")

# name = artifacts/model/model.pkl
name = name.split(":")[0]
if type == "minio":
token_length = len(token)

# assigned 2nd position element to bucket_name
bucket_name = token[2]
object_name = token[3] + "/" + token[4]

# The folder structure of artifact data has been updated due to a change in the DVC 3.0 version.
# Previously, the structure was dvc-art/23/69v2uu3jeejjeiw,
# but now it includes additional directories and has become dvc-art/files/md5/23/69v2uu3jeejjeiw.
# Consequently, the previous logic took only the last 2 elements from the list of tokens,
# but with the new structure, it needs to take the last 4 elements.

# get last 4 element inside token
token = token[(token_length-4):]

# join last 4 token using '/' delimiter
object_name = "/".join(token)
# output = files/md5/23/69v2uu3jeejjeiw

path_name = current_directory + "/" + name
return bucket_name, object_name, path_name

elif type == "local":
token_length = len(token)
download_loc = current_directory + "/" + name
current_dvc_loc = (token[(token_length - 2)] + "/" + token[(token_length - 1)])

# local artifact repo path = local-storage/files/md5/23/69v2uu3jeejjeiw.

# get last 4 element inside token
token = token[(token_length-4):]

# join last 4 token using '/' delimiter
current_dvc_loc = "/".join(token)

return current_dvc_loc, download_loc

elif type == "ssh":
# comments remaining
token_var = token[2].split(":")
host = token_var[0]
token.pop(0)
Expand All @@ -73,25 +120,30 @@ def extract_repo_args(self, type: str, name: str, url: str, current_directory: s
current_loc_1 = "/".join(token)
current_loc = f"/{current_loc_1}"
return host, current_loc, name

else:
# sometimes s_url is empty - this shouldn't happen technically
# sometimes s_url is not starting with s3:// - technically this shouldn't happen
if s_url and s_url.startswith("s3://"):
url_with_bucket = s_url.split("s3://")[1]
# url_with_bucket = varkha-test/23/6d9502e0283d91f689d7038b8508a2
# Splitting the string using '/' as the delimiter
# url_with_bucket = mybucket/user/files/md5/23/6d9502e0283d91f689d7038b8508a2
# splitting the string using '/' as the delimiter
# bucket_name = mybucket
# object_name = user/files/md5/23/6d9502e0283d91f689d7038b8508a2
bucket_name, object_name = url_with_bucket.split('/', 1)
download_loc = current_directory + "/" + name if current_directory != "" else name
print(download_loc)
#print(download_loc)
return bucket_name, object_name, download_loc
else:
# returning bucket_name, object_name and download_loc as empty strings
return "", "", ""

def search_artifact(self, input_dict):
# This function takes input_dict as input artifact
for name, url in input_dict.items():
if not isinstance(url, str):
continue
# splitting name with ':' as the delimiter and store first argument inside name
name = name.split(":")[0]
file_name = name.split('/')[-1]
if file_name == self.args.artifact_name:
Expand All @@ -100,7 +152,7 @@ def search_artifact(self, input_dict):
pass

def run(self):
# Put a check to see whether pipline exists or not
# check whether the mlmd file exists or not in the current directory
pipeline_name = self.args.pipeline_name
current_directory = os.getcwd()
mlmd_file_name = "./mlmd"
Expand All @@ -113,6 +165,7 @@ def run(self):
return f"ERROR: {mlmd_file_name} doesn't exists in {current_directory} directory."
query = cmfquery.CmfQuery(mlmd_file_name)

# getting all pipeline stages [i.e. Prepare, Featurize, Train and Evaluate]
stages = query.get_pipeline_stages(self.args.pipeline_name)
executions = []
identifiers = []
Expand All @@ -124,21 +177,23 @@ def run(self):
if len(executions) > 0:
# converting it to dictionary
dict_executions = executions.to_dict("dict")
# append ids of executions inside identifiers
for id in dict_executions["id"].values():
identifiers.append(id)
else:
print("No Executions found for " + stage + " stage.")

# created dictionary
name_url_dict = {}
if len(identifiers) == 0: # check if there are no executions
return "No executions found."
for identifier in identifiers:
get_artifacts = query.get_all_artifacts_for_execution(
identifier
) # getting all artifacts with id
temp_dict = dict(zip(get_artifacts['name'], get_artifacts['url']))
name_url_dict.update(temp_dict)
#print(name_url_dict)
temp_dict = dict(zip(get_artifacts['name'], get_artifacts['url'])) # getting dictionary of name and url pair
name_url_dict.update(temp_dict) # updating name_url_dict with temp_dict
# print(name_url_dict)
# name_url_dict = ('artifacts/parsed/test.tsv:6f597d341ceb7d8fbbe88859a892ef81', 'Test-env:/home/sharvark/local-storage/6f/597d341ceb7d8fbbe88859a892ef81'
# name_url_dict = ('artifacts/parsed/test.tsv:6f597d341ceb7d8fbbe88859a892ef81', 'Test-env:/home/sharvark/local-storage/6f/597d341ceb7d8fbbe88859a892ef81,Second-env:/home/sharvark/local-storage/6f/597d341ceb7d8fbbe88859a892ef81')

Expand Down Expand Up @@ -241,6 +296,7 @@ def run(self):
return "Done"
elif dvc_config_op["core.remote"] == "amazons3":
amazonS3_class_obj = amazonS3_artifacts.AmazonS3Artifacts()
#print(self.args.artifact_name,"artifact name")
if self.args.artifact_name:
output = self.search_artifact(name_url_dict)
# output[0] = name
Expand All @@ -260,7 +316,6 @@ def run(self):
print(stmt)
else:
for name, url in name_url_dict.items():
#print(name, url)
if not isinstance(url, str):
continue
args = self.extract_repo_args("amazons3", name, url, current_directory)
Expand Down Expand Up @@ -310,3 +365,4 @@ def add_parser(subparsers, parent_parser):
)

parser.set_defaults(func=CmdArtifactPull)

2 changes: 1 addition & 1 deletion docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ models and performance metrics) recorded by the framework are versioned and iden
## Installation

#### 1. Pre-Requisites:
* 3.9>= Python <=3.11
* 3.9>= Python <=3.10
* Git latest version

#### 2. Set up Python Virtual Environment:
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ authors = [
]
description = "Track metadata for AI pipeline"
readme = "README.md"
requires-python = ">=3.9,<=3.11"
requires-python = ">=3.9,<=3.10"
classifiers = [
"Programming Language :: Python :: 3",
"Operating System :: POSIX :: Linux",
Expand Down