diff --git a/python/fedml/api/modules/storage.py b/python/fedml/api/modules/storage.py index 51f58539bf..837ec61d77 100644 --- a/python/fedml/api/modules/storage.py +++ b/python/fedml/api/modules/storage.py @@ -9,6 +9,9 @@ from fedml.computing.scheduler.master.server_constants import ServerConstants from fedml.api.fedml_response import FedMLResponse, ResponseCode +DUMMY_NEW_DATA = "new_data" +DUMMY_V1_EXISTS = "v1_exists" +DUMMY_MAIN_EXISTS = "main_exists" class StorageMetadata(object): def __init__(self, data: dict): @@ -19,6 +22,7 @@ def __init__(self, data: dict): self.updatedAt = data.get("updateTime", None) self.size = _get_size(data.get("fileSize",None)) self.tag_list = data.get("tags", None) + self.num_versions = 3 # Todo (alaydshah): Store service name in metadata @@ -39,11 +43,45 @@ def upload(data_path, api_key, name, description, tag_list, service, show_progre archive_path, message = _archive_data(data_path) if not archive_path: return FedMLResponse(code=ResponseCode.FAILURE, message=message) - + + response_version = _dummy_get_version(DUMMY_V1_EXISTS) + data_already_exists = False + new_version = False + if response_version.data != None: + data_already_exists = True + choice = input("Do you want to overwrite the existing data? (y/n): ") + if choice.lower() == 'n': + print("A new version of the dataset will be created!\n") + commit_message = input("Enter commit message or Description for version : \n") + new_version = True + elif choice.lower() == 'y': + print("Data will be overwritten!\n") + else: + return FedMLResponse(code=ResponseCode.FAILURE, message="Invalid choice.") + else: + new_version = False + version_to_write = "" + if(data_already_exists): + #means that the data exists already - overwriting or creating a new version. So the version list will exist. + latest_version = response_version.data + if(new_version): + if(latest_version=="main"): + version_to_write = "v_1" + else: + latest_version_number = int(latest_version.split("_")[1]) + new_version_number = latest_version_number + 1 + version_to_write = f"v_{new_version_number}" + else: + version_to_write = "main" + else: + #This is the first time data is being uploaded. + version_to_write = "main" + store = _get_storage_service(service) + data_path = name name = os.path.splitext(os.path.basename(archive_path))[0] if name is None else name file_name = name + ".zip" - dest_path = os.path.join(user_id, file_name) + dest_path = os.path.join(user_id,name,version_to_write,file_name) file_size = os.path.getsize(archive_path) file_uploaded_url = store.upload_file_with_progress(src_local_path=archive_path, dest_s3_path=dest_path, @@ -53,24 +91,58 @@ def upload(data_path, api_key, name, description, tag_list, service, show_progre os.remove(archive_path) if not file_uploaded_url: return FedMLResponse(code=ResponseCode.FAILURE, message=f"Failed to upload file: {archive_path}") + + description = "" + tag_list = [] + if version_to_write == "main": + tag_list = tag_list + description = description + else: + tag_list = [] + description = commit_message + + if new_version: + json_data ={ + "datasetName": name, + "description":description, + "fileSize": file_size, + "fileUrl": file_uploaded_url, + "tagNameList": tag_list, + "version_id":version_to_write + } + try: + response = _create_dataset_version(api_key=api_key, json_data=json_data) + code, message, data = _dummy_get_data_from_response(response) + #code, message, data = _get_data_from_response(message="Failed to upload data", response=response) + except Exception as e: + print("") + #return FedMLResponse(code=ResponseCode.FAILURE, message=f"Failed to create dataset: {e}") - json_data = { - "datasetName": name, - "description": description, - "fileSize": file_size, - "fileUrl": file_uploaded_url, - "tagNameList": tag_list, - } + if data: + return FedMLResponse(code=code, message=message, data=file_uploaded_url) + return FedMLResponse(code=code, message=message) - try: - response = _create_dataset(api_key=api_key, json_data=json_data) - code, message, data = _get_data_from_response(message="Failed to upload data", response=response) - except Exception as e: - return FedMLResponse(code=ResponseCode.FAILURE, message=f"Failed to create dataset: {e}") + else: + # this part will update. + json_data = { + "datasetName": name, + "description": description, + "fileSize": file_size, + "fileUrl": file_uploaded_url, + "tagNameList": tag_list, + "version_name":"main" + } - if data: - return FedMLResponse(code=code, message=message, data=file_uploaded_url) - return FedMLResponse(code=code, message=message) + try: + response = _update_dataset(api_key=api_key, json_data=json_data) + #code, message, data = _get_data_from_response(message="Failed to update data", response=response) + code, message, data = _dummy_get_data_from_response(response) + except Exception as e: + return FedMLResponse(code=ResponseCode.FAILURE, message=f"Failed to update dataset: {e}") + + if data: + return FedMLResponse(code=code, message=message, data=file_uploaded_url) + return FedMLResponse(code=code, message=message) # Todo(alaydshah): Query service from object metadata @@ -275,6 +347,22 @@ def _create_dataset(api_key: str, json_data: dict) -> requests.Response: ) return response +''' +Doesn't make the correct call as of now. +''' + +def _update_dataset(api_key: str, json_data: dict) -> requests.Response: + print("Update dataset function!") + return FedMLResponse(code=ResponseCode.SUCCESS,message="Data updated successfully",data=json_data) + +''' +Create version of the dataset +''' +def _create_dataset_version(api_key: str, json_data: dict) -> FedMLResponse: + print("Create version of the dataset") + print("The composite key in the backend can be datasetName + version_id") + return FedMLResponse(code=ResponseCode.SUCCESS,message="Data version created successfully",data=json_data) + def _list_dataset(api_key: str) -> requests.Response: list_dataset_url = ServerConstants.list_dataset_url() @@ -318,7 +406,6 @@ def _get_dataset_metadata(api_key: str, data_name: str) -> requests.Response: ) return response - def _delete_dataset(api_key: str, data_name: str) -> requests.Response: dataset_url = ServerConstants.get_dataset_url() cert_path = MLOpsConfigs.get_cert_path_with_version() @@ -375,4 +462,27 @@ def _get_size(size_in_bytes:str)->str: size_str = f"{size_in_kb:.2f} KB" else: size_str = f"{size} B" - return size_str \ No newline at end of file + return size_str + +def _dummy_get_version(test_scenario:str=DUMMY_NEW_DATA): + VERSION = None + message = "" + data = None + if(test_scenario == DUMMY_NEW_DATA): + message = "Data doesn't exist" + data = None + elif(test_scenario == DUMMY_MAIN_EXISTS): + message = "Data exists" + data = "main" + elif(test_scenario == DUMMY_V1_EXISTS): + message = "Data exists" + data = "v_1" + return FedMLResponse(code=ResponseCode.SUCCESS,message=message,data=data) + +def _dummy_get_num_versions(): + NUM_VERSIONS = 1 + return NUM_VERSIONS + +def _dummy_get_data_from_response(response:FedMLResponse): + code, message, data = ResponseCode.SUCCESS, "Will get updated once re-routed to new create_dataset_with_version" , response.data + return code, message, data \ No newline at end of file diff --git a/python/fedml/core/distributed/communication/s3/remote_storage.py b/python/fedml/core/distributed/communication/s3/remote_storage.py index 78bcf31fd7..fd0a813202 100644 --- a/python/fedml/core/distributed/communication/s3/remote_storage.py +++ b/python/fedml/core/distributed/communication/s3/remote_storage.py @@ -460,6 +460,11 @@ def upload_file_with_progress(self, src_local_path, dest_s3_path, if metadata is None: metadata = {} file_uploaded_url = "" + ''' + Uncomment for debugging purpose - remove in the final commit. + ''' + # print("Bucket Name: ", self.bucket_name) + # print("Dest S3 Path: ", dest_s3_path) progress_desc_text = "Uploading Package to Remote Storage" if progress_desc is not None: progress_desc_text = progress_desc