Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Bhargav191098/naming conflict #2083

Closed
wants to merge 7 commits into from
5 changes: 2 additions & 3 deletions python/fedml/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,13 +179,12 @@ def cluster_killall(api_key=None) -> bool:
return cluster.kill(cluster_names=(), api_key=api_key)


def upload(data_path, api_key=None, service="R2", name=None, description=None, metadata=None, show_progress=False,
def upload(data_path, api_key=None, tag_list=[], service="R2", name=None, description=None, metadata=None, show_progress=False,
out_progress_to_err=True, progress_desc=None) -> FedMLResponse:
return storage.upload(data_path=data_path, api_key=api_key, name=name, description=description,
return storage.upload(data_path=data_path, api_key=api_key, name=name, description=description, tag_list =tag_list,
service=service, progress_desc=progress_desc, show_progress=show_progress,
out_progress_to_err=out_progress_to_err, metadata=metadata)


def get_storage_user_defined_metadata(data_name, api_key=None) -> FedMLResponse:
return storage.get_user_metadata(data_name=data_name, api_key=api_key)

Expand Down
151 changes: 131 additions & 20 deletions python/fedml/api/modules/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
from fedml.computing.scheduler.master.server_constants import ServerConstants
from fedml.api.fedml_response import FedMLResponse, ResponseCode

DUMMY_NEW_DATA = "new_data"
DUMMY_V1_EXISTS = "v1_exists"
DUMMY_MAIN_EXISTS = "main_exists"

class StorageMetadata(object):
def __init__(self, data: dict):
Expand All @@ -18,12 +21,14 @@ def __init__(self, data: dict):
self.createdAt = data.get("createTime", None)
self.updatedAt = data.get("updateTime", None)
self.size = _get_size(data.get("fileSize",None))
self.tag_list = data.get("tags", None)
self.num_versions = 3


# Todo (alaydshah): Store service name in metadata
# Todo (alaydshah): If data already exists, don't upload again. Instead suggest to use update command

def upload(data_path, api_key, name, description, service, show_progress, out_progress_to_err, progress_desc,
def upload(data_path, api_key, name, description, tag_list, service, show_progress, out_progress_to_err, progress_desc,
metadata) -> FedMLResponse:
api_key = authenticate(api_key)

Expand All @@ -38,11 +43,45 @@ def upload(data_path, api_key, name, description, service, show_progress, out_pr
archive_path, message = _archive_data(data_path)
if not archive_path:
return FedMLResponse(code=ResponseCode.FAILURE, message=message)


response_version = _dummy_get_version(DUMMY_V1_EXISTS)
data_already_exists = False
new_version = False
if response_version.data != None:
data_already_exists = True
choice = input("Do you want to overwrite the existing data? (y/n): ")
if choice.lower() == 'n':
print("A new version of the dataset will be created!\n")
commit_message = input("Enter commit message or Description for version : \n")
new_version = True
elif choice.lower() == 'y':
print("Data will be overwritten!\n")
else:
return FedMLResponse(code=ResponseCode.FAILURE, message="Invalid choice.")
else:
new_version = False
version_to_write = ""
if(data_already_exists):
#means that the data exists already - overwriting or creating a new version. So the version list will exist.
latest_version = response_version.data
if(new_version):
if(latest_version=="main"):
version_to_write = "v_1"
else:
latest_version_number = int(latest_version.split("_")[1])
new_version_number = latest_version_number + 1
version_to_write = f"v_{new_version_number}"
else:
version_to_write = "main"
else:
#This is the first time data is being uploaded.
version_to_write = "main"

store = _get_storage_service(service)
data_path = name
name = os.path.splitext(os.path.basename(archive_path))[0] if name is None else name
file_name = name + ".zip"
dest_path = os.path.join(user_id, file_name)
dest_path = os.path.join(user_id,name,version_to_write,file_name)
file_size = os.path.getsize(archive_path)

file_uploaded_url = store.upload_file_with_progress(src_local_path=archive_path, dest_s3_path=dest_path,
Expand All @@ -52,24 +91,58 @@ def upload(data_path, api_key, name, description, service, show_progress, out_pr
os.remove(archive_path)
if not file_uploaded_url:
return FedMLResponse(code=ResponseCode.FAILURE, message=f"Failed to upload file: {archive_path}")

description = ""
tag_list = []
if version_to_write == "main":
tag_list = tag_list
description = description
else:
tag_list = []
description = commit_message

if new_version:
json_data ={
"datasetName": name,
"description":description,
"fileSize": file_size,
"fileUrl": file_uploaded_url,
"tagNameList": tag_list,
"version_id":version_to_write
}
try:
response = _create_dataset_version(api_key=api_key, json_data=json_data)
code, message, data = _dummy_get_data_from_response(response)
#code, message, data = _get_data_from_response(message="Failed to upload data", response=response)
except Exception as e:
print("")
#return FedMLResponse(code=ResponseCode.FAILURE, message=f"Failed to create dataset: {e}")

json_data = {
"datasetName": name,
"description": description,
"fileSize": file_size,
"fileUrl": file_uploaded_url,
"tagNameList": [],
}
if data:
return FedMLResponse(code=code, message=message, data=file_uploaded_url)
return FedMLResponse(code=code, message=message)

try:
response = _create_dataset(api_key=api_key, json_data=json_data)
code, message, data = _get_data_from_response(message="Failed to upload data", response=response)
except Exception as e:
return FedMLResponse(code=ResponseCode.FAILURE, message=f"Failed to create dataset: {e}")
else:
# this part will update.
json_data = {
"datasetName": name,
"description": description,
"fileSize": file_size,
"fileUrl": file_uploaded_url,
"tagNameList": tag_list,
"version_name":"main"
}

if data:
return FedMLResponse(code=code, message=message, data=file_uploaded_url)
return FedMLResponse(code=code, message=message)
try:
response = _update_dataset(api_key=api_key, json_data=json_data)
#code, message, data = _get_data_from_response(message="Failed to update data", response=response)
code, message, data = _dummy_get_data_from_response(response)
except Exception as e:
return FedMLResponse(code=ResponseCode.FAILURE, message=f"Failed to update dataset: {e}")

if data:
return FedMLResponse(code=code, message=message, data=file_uploaded_url)
return FedMLResponse(code=code, message=message)


# Todo(alaydshah): Query service from object metadata
Expand Down Expand Up @@ -274,6 +347,22 @@ def _create_dataset(api_key: str, json_data: dict) -> requests.Response:
)
return response

def _update_dataset(api_key: str, json_data: dict) -> FedMLResponse:
    """Stub for updating an existing dataset record in the backend.

    NOTE(review): Doesn't make the correct backend call as of now — it only
    echoes the payload back in a successful response. Replace with a real
    HTTP request (cf. _create_dataset) before release.

    Args:
        api_key: API key for authentication (unused by this stub).
        json_data: Dataset payload that would be sent to the backend.

    Returns:
        FedMLResponse with SUCCESS code and ``json_data`` echoed as data.
    """
    print("Update dataset function!")
    return FedMLResponse(code=ResponseCode.SUCCESS, message="Data updated successfully", data=json_data)

def _create_dataset_version(api_key: str, json_data: dict) -> FedMLResponse:
    """Stub for creating a new version of an existing dataset.

    NOTE(review): No backend call is made yet; the composite key in the
    backend can be datasetName + version_id.

    Args:
        api_key: API key for authentication (unused by this stub).
        json_data: Version payload (datasetName, fileUrl, version_id, ...).

    Returns:
        FedMLResponse with SUCCESS code and ``json_data`` echoed as data.
    """
    print("Create version of the dataset")
    print("The composite key in the backend can be datasetName + version_id")
    return FedMLResponse(code=ResponseCode.SUCCESS, message="Data version created successfully", data=json_data)


def _list_dataset(api_key: str) -> requests.Response:
list_dataset_url = ServerConstants.list_dataset_url()
Expand Down Expand Up @@ -317,7 +406,6 @@ def _get_dataset_metadata(api_key: str, data_name: str) -> requests.Response:
)
return response


def _delete_dataset(api_key: str, data_name: str) -> requests.Response:
dataset_url = ServerConstants.get_dataset_url()
cert_path = MLOpsConfigs.get_cert_path_with_version()
Expand Down Expand Up @@ -374,4 +462,27 @@ def _get_size(size_in_bytes:str)->str:
size_str = f"{size_in_kb:.2f} KB"
else:
size_str = f"{size} B"
return size_str
return size_str

def _dummy_get_version(test_scenario: str = DUMMY_NEW_DATA) -> FedMLResponse:
    """Stub that simulates the backend "latest version" lookup for a dataset.

    Args:
        test_scenario: One of the DUMMY_* module constants selecting the
            simulated backend state (no data, "main" only, or "v_1").

    Returns:
        FedMLResponse whose ``data`` is the latest version name ("main" or
        "v_1") when the dataset exists, or None when it does not.
    """
    # Removed unused local VERSION; only message/data are returned.
    message = ""
    data = None
    if test_scenario == DUMMY_NEW_DATA:
        message = "Data doesn't exist"
    elif test_scenario == DUMMY_MAIN_EXISTS:
        message = "Data exists"
        data = "main"
    elif test_scenario == DUMMY_V1_EXISTS:
        message = "Data exists"
        data = "v_1"
    return FedMLResponse(code=ResponseCode.SUCCESS, message=message, data=data)

def _dummy_get_num_versions():
NUM_VERSIONS = 1
return NUM_VERSIONS

def _dummy_get_data_from_response(response: FedMLResponse):
    """Stub unpacking of a FedMLResponse into (code, message, data).

    Always reports SUCCESS; the data is taken from the given response.
    """
    placeholder_message = "Will get updated once re-routed to new create_dataset_with_version"
    return ResponseCode.SUCCESS, placeholder_message, response.data
20 changes: 14 additions & 6 deletions python/fedml/cli/modules/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def validate_argument(ctx, param, value):
@click.option("--user_metadata", "-um", type=str, help="User-defined metadata in the form of a dictionary, for instance, "
" {'name':'value'} within double quotes. "" "
"Defaults to None.")
@click.option("--tags", "-t", type=str, help="Add tags to your data to store. Give tags in comma separated form like 'cv,unet,segmentation' If not provided, the tags will be empty.")
@click.option('--service', "-s", type=click.Choice(['R2']), default="R2", help="Storage service for object storage. "
"Only R2 is supported as of now")
@click.option(
Expand All @@ -65,10 +66,11 @@ def validate_argument(ctx, param, value):
default="release",
help=version_help,
)
def upload(data_path: str, name: str, user_metadata: str, description: str, version: str, api_key: str, service):
def upload(data_path: str, name: str, user_metadata: str, description: str, version: str, api_key: str, tags:str, service):
metadata = _parse_metadata(user_metadata)
tag_list = _parse_tags(tags)
fedml.set_env_version(version)
response = fedml.api.upload(data_path=data_path, api_key=api_key, name=name, service=service, show_progress=True,
response = fedml.api.upload(data_path=data_path, api_key=api_key, name=name, tag_list = tag_list, service=service, show_progress=True,
description=description, metadata=metadata)
if response.code == ResponseCode.SUCCESS:
click.echo(f"Data uploaded successfully. | url: {response.data}")
Expand Down Expand Up @@ -96,10 +98,10 @@ def list_data(version, api_key):
if not response.data:
click.echo(f"No stored objects found for account linked with apikey: {api_key}")
return
object_list_table = PrettyTable(["Data Name", "Data Size", "Description", "Created At", "Updated At"])
object_list_table = PrettyTable(["Data Name", "Data Size", "Description", "Data Tags","Created At", "Updated At"])
for stored_object in response.data:
object_list_table.add_row(
[stored_object.dataName, stored_object.size, stored_object.description, stored_object.createdAt, stored_object.updatedAt])
[stored_object.dataName, stored_object.size, stored_object.description, stored_object.tag_list,stored_object.createdAt, stored_object.updatedAt])
click.echo(object_list_table)
else:
click.echo(f"Failed to list stored objects for account linked with apikey {api_key}. "
Expand Down Expand Up @@ -157,8 +159,8 @@ def get_metadata(data_name, version, api_key):
return
click.echo(f"Successfully fetched metadata for object {data_name}:")
# Todo (alaydshah): Add file size and tags
metadata_table = PrettyTable(["Data Name","Data Size","Description", "Created At", "Updated At"])
metadata_table.add_row([metadata.dataName,metadata.size,metadata.description, metadata.createdAt, metadata.updatedAt])
metadata_table = PrettyTable(["Data Name","Data Size","Description","Data Tags","Created At", "Updated At"])
metadata_table.add_row([metadata.dataName,metadata.size,metadata.description,metadata.tag_list,metadata.createdAt, metadata.updatedAt])
click.echo(metadata_table)
click.echo("")
else:
Expand Down Expand Up @@ -238,3 +240,9 @@ def _parse_metadata(metadata: str):
click.echo(
f"Input metadata cannot be evaluated. Please make sure metadata is in the correct format. Error: {e}.")
exit()

def _parse_tags(tags:str):
if not tags:
return []
tag_list = tags.split(",")
return tag_list
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,11 @@ def upload_file_with_progress(self, src_local_path, dest_s3_path,
if metadata is None:
metadata = {}
file_uploaded_url = ""
'''
Uncomment for debugging purpose - remove in the final commit.
'''
# print("Bucket Name: ", self.bucket_name)
# print("Dest S3 Path: ", dest_s3_path)
progress_desc_text = "Uploading Package to Remote Storage"
if progress_desc is not None:
progress_desc_text = progress_desc
Expand Down
Loading