Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP PR for supporting versioning in FedML storage #2084

Open
wants to merge 1 commit into
base: dev/v0.7.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 129 additions & 19 deletions python/fedml/api/modules/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
from fedml.computing.scheduler.master.server_constants import ServerConstants
from fedml.api.fedml_response import FedMLResponse, ResponseCode

DUMMY_NEW_DATA = "new_data"
DUMMY_V1_EXISTS = "v1_exists"
DUMMY_MAIN_EXISTS = "main_exists"

class StorageMetadata(object):
def __init__(self, data: dict):
Expand All @@ -19,6 +22,7 @@ def __init__(self, data: dict):
self.updatedAt = data.get("updateTime", None)
self.size = _get_size(data.get("fileSize",None))
self.tag_list = data.get("tags", None)
self.num_versions = 3


# Todo (alaydshah): Store service name in metadata
Expand All @@ -39,11 +43,45 @@ def upload(data_path, api_key, name, description, tag_list, service, show_progre
archive_path, message = _archive_data(data_path)
if not archive_path:
return FedMLResponse(code=ResponseCode.FAILURE, message=message)


response_version = _dummy_get_version(DUMMY_V1_EXISTS)
data_already_exists = False
new_version = False
if response_version.data != None:
data_already_exists = True
choice = input("Do you want to overwrite the existing data? (y/n): ")
if choice.lower() == 'n':
print("A new version of the dataset will be created!\n")
commit_message = input("Enter commit message or Description for version : \n")
new_version = True
elif choice.lower() == 'y':
print("Data will be overwritten!\n")
else:
return FedMLResponse(code=ResponseCode.FAILURE, message="Invalid choice.")
else:
new_version = False
version_to_write = ""
if(data_already_exists):
#means that the data exists already - overwriting or creating a new version. So the version list will exist.
latest_version = response_version.data
if(new_version):
if(latest_version=="main"):
version_to_write = "v_1"
else:
latest_version_number = int(latest_version.split("_")[1])
new_version_number = latest_version_number + 1
version_to_write = f"v_{new_version_number}"
else:
version_to_write = "main"
else:
#This is the first time data is being uploaded.
version_to_write = "main"

store = _get_storage_service(service)
data_path = name
name = os.path.splitext(os.path.basename(archive_path))[0] if name is None else name
file_name = name + ".zip"
dest_path = os.path.join(user_id, file_name)
dest_path = os.path.join(user_id,name,version_to_write,file_name)
file_size = os.path.getsize(archive_path)

file_uploaded_url = store.upload_file_with_progress(src_local_path=archive_path, dest_s3_path=dest_path,
Expand All @@ -53,24 +91,58 @@ def upload(data_path, api_key, name, description, tag_list, service, show_progre
os.remove(archive_path)
if not file_uploaded_url:
return FedMLResponse(code=ResponseCode.FAILURE, message=f"Failed to upload file: {archive_path}")

description = ""
tag_list = []
if version_to_write == "main":
tag_list = tag_list
description = description
else:
tag_list = []
description = commit_message

if new_version:
json_data ={
"datasetName": name,
"description":description,
"fileSize": file_size,
"fileUrl": file_uploaded_url,
"tagNameList": tag_list,
"version_id":version_to_write
}
try:
response = _create_dataset_version(api_key=api_key, json_data=json_data)
code, message, data = _dummy_get_data_from_response(response)
#code, message, data = _get_data_from_response(message="Failed to upload data", response=response)
except Exception as e:
print("")
#return FedMLResponse(code=ResponseCode.FAILURE, message=f"Failed to create dataset: {e}")

json_data = {
"datasetName": name,
"description": description,
"fileSize": file_size,
"fileUrl": file_uploaded_url,
"tagNameList": tag_list,
}
if data:
return FedMLResponse(code=code, message=message, data=file_uploaded_url)
return FedMLResponse(code=code, message=message)

try:
response = _create_dataset(api_key=api_key, json_data=json_data)
code, message, data = _get_data_from_response(message="Failed to upload data", response=response)
except Exception as e:
return FedMLResponse(code=ResponseCode.FAILURE, message=f"Failed to create dataset: {e}")
else:
# this part will update.
json_data = {
"datasetName": name,
"description": description,
"fileSize": file_size,
"fileUrl": file_uploaded_url,
"tagNameList": tag_list,
"version_name":"main"
}

if data:
return FedMLResponse(code=code, message=message, data=file_uploaded_url)
return FedMLResponse(code=code, message=message)
try:
response = _update_dataset(api_key=api_key, json_data=json_data)
#code, message, data = _get_data_from_response(message="Failed to update data", response=response)
code, message, data = _dummy_get_data_from_response(response)
except Exception as e:
return FedMLResponse(code=ResponseCode.FAILURE, message=f"Failed to update dataset: {e}")

if data:
return FedMLResponse(code=code, message=message, data=file_uploaded_url)
return FedMLResponse(code=code, message=message)


# Todo(alaydshah): Query service from object metadata
Expand Down Expand Up @@ -275,6 +347,22 @@ def _create_dataset(api_key: str, json_data: dict) -> requests.Response:
)
return response

'''
Doesn't make the correct call as of now.
'''

def _update_dataset(api_key: str, json_data: dict) -> requests.Response:
print("Update dataset function!")
return FedMLResponse(code=ResponseCode.SUCCESS,message="Data updated successfully",data=json_data)

'''
Create version of the dataset
'''
def _create_dataset_version(api_key: str, json_data: dict) -> FedMLResponse:
print("Create version of the dataset")
print("The composite key in the backend can be datasetName + version_id")
return FedMLResponse(code=ResponseCode.SUCCESS,message="Data version created successfully",data=json_data)


def _list_dataset(api_key: str) -> requests.Response:
list_dataset_url = ServerConstants.list_dataset_url()
Expand Down Expand Up @@ -318,7 +406,6 @@ def _get_dataset_metadata(api_key: str, data_name: str) -> requests.Response:
)
return response


def _delete_dataset(api_key: str, data_name: str) -> requests.Response:
dataset_url = ServerConstants.get_dataset_url()
cert_path = MLOpsConfigs.get_cert_path_with_version()
Expand Down Expand Up @@ -375,4 +462,27 @@ def _get_size(size_in_bytes:str)->str:
size_str = f"{size_in_kb:.2f} KB"
else:
size_str = f"{size} B"
return size_str
return size_str

def _dummy_get_version(test_scenario:str=DUMMY_NEW_DATA):
VERSION = None
message = ""
data = None
if(test_scenario == DUMMY_NEW_DATA):
message = "Data doesn't exist"
data = None
elif(test_scenario == DUMMY_MAIN_EXISTS):
message = "Data exists"
data = "main"
elif(test_scenario == DUMMY_V1_EXISTS):
message = "Data exists"
data = "v_1"
return FedMLResponse(code=ResponseCode.SUCCESS,message=message,data=data)

def _dummy_get_num_versions():
NUM_VERSIONS = 1
return NUM_VERSIONS

def _dummy_get_data_from_response(response:FedMLResponse):
code, message, data = ResponseCode.SUCCESS, "Will get updated once re-routed to new create_dataset_with_version" , response.data
return code, message, data
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,11 @@ def upload_file_with_progress(self, src_local_path, dest_s3_path,
if metadata is None:
metadata = {}
file_uploaded_url = ""
'''
Uncomment for debugging purpose - remove in the final commit.
'''
# print("Bucket Name: ", self.bucket_name)
# print("Dest S3 Path: ", dest_s3_path)
progress_desc_text = "Uploading Package to Remote Storage"
if progress_desc is not None:
progress_desc_text = progress_desc
Expand Down
Loading