Skip to content

Commit

Permalink
Merge pull request #1898 from FedML-AI/merge-swap-2
Browse files Browse the repository at this point in the history
Dev/v0.7.0
  • Loading branch information
fedml-alex authored Feb 4, 2024
2 parents 29a2971 + bfceb24 commit 7a4d4e1
Show file tree
Hide file tree
Showing 21 changed files with 158 additions and 51 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -275,3 +275,5 @@ python/tests/test_model_cli/llm_deploy/src/constants/prompt_template.py
python/examples/launch/hello_world/bootstrap.bat
python/examples/launch/hello_world/fedml_job_entry_pack.bat
**mpi_host_file
/python/fedml/workflow/driver_example/customized_job_example/train_job/bootstrap.bat
/python/fedml/workflow/driver_example/customized_job_example/train_job/fedml_job_entry_pack.bat
2 changes: 1 addition & 1 deletion python/fedml/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
_global_training_type = None
_global_comm_backend = None

__version__ = "0.8.18a14"
__version__ = "0.8.19a1"


# This is the deployment environment used for different roles (RD/PM/BD/Public Developers). Potential VALUE: local, dev, test, release
Expand Down
10 changes: 0 additions & 10 deletions python/fedml/computing/scheduler/comm_utils/job_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -760,15 +760,5 @@ def monitor_endpoint_logs(self):
with open(log_file_path, "a") as f:
f.write(endpoint_logs)

if is_job_container_running and not MLOpsRuntimeLogDaemon.get_instance(fedml_args). \
is_log_processor_running(job.job_id, job.edge_id):
setattr(fedml_args, "log_file_dir", os.path.dirname(log_file_path))
MLOpsRuntimeLogDaemon.get_instance(fedml_args).log_file_dir = os.path.dirname(log_file_path)
MLOpsRuntimeLogDaemon.get_instance(fedml_args).start_log_processor(
job.job_id, job.edge_id,
log_source=device_client_constants.ClientConstants.FEDML_LOG_SOURCE_TYPE_MODEL_END_POINT,
log_file_prefix=JobMonitor.ENDPOINT_CONTAINER_LOG_PREFIX
)

except Exception as e:
print(f"Exception when syncing endpoint log to MLOps {traceback.format_exc()}.")
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,12 @@ def run(self, process_event, completed_event):
logging.info(f"[endpoint/device][{run_id}/{self.edge_id}] Release gpu resource when the worker deployment stopped.")
self.release_gpu_ids(run_id)
self.reset_devices_status(self.edge_id, ClientConstants.MSG_MLOPS_CLIENT_STATUS_KILLED)
MLOpsRuntimeLogDaemon.get_instance(self.args).stop_log_processor(run_id, self.edge_id)
except RunnerCompletedError:
logging.info(f"[endpoint/device][{run_id}/{self.edge_id}] Release gpu resource when the worker deployment completed.")
self.release_gpu_ids(run_id)
logging.info("Runner completed.")
MLOpsRuntimeLogDaemon.get_instance(self.args).stop_log_processor(run_id, self.edge_id)
except Exception as e:
logging.error("Runner exits with exceptions. {}".format(traceback.format_exc()))
self.cleanup_run_when_starting_failed()
Expand All @@ -284,7 +286,6 @@ def run(self, process_event, completed_event):
sys.exit(1)
finally:
logging.info("Release resources.")
MLOpsRuntimeLogDaemon.get_instance(self.args).stop_log_processor(run_id, self.edge_id)
if self.mlops_metrics is not None:
self.mlops_metrics.stop_sys_perf()
time.sleep(3)
Expand Down Expand Up @@ -932,7 +933,8 @@ def callback_runner_id_status(self, topic, payload):
status_process.join(15)

# Stop log processor for current run
MLOpsRuntimeLogDaemon.get_instance(self.args).stop_log_processor(run_id, edge_id)
if status != ClientConstants.MSG_MLOPS_CLIENT_STATUS_FINISHED:
MLOpsRuntimeLogDaemon.get_instance(self.args).stop_log_processor(run_id, edge_id)

def callback_report_current_status(self, topic, payload):
self.send_agent_active_msg()
Expand Down
17 changes: 11 additions & 6 deletions python/fedml/core/mlops/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,10 @@ def event(event_name, event_started=True, event_value=None, event_edge_id=None):
MLOpsStore.mlops_event.log_event_ended(event_name, event_value, event_edge_id)


def log(metrics: dict, step: int = None, customized_step_key: str = None, commit: bool = True):
def log(metrics: dict, step: int = None, customized_step_key: str = None, commit: bool = True, is_endpoint_metric=False):
if MLOpsStore.mlops_args is None or fedml._global_training_type == constants.FEDML_TRAINING_PLATFORM_CROSS_CLOUD:
log_metric(metrics, step=step, customized_step_key=customized_step_key, commit=commit)
log_metric(metrics, step=step, customized_step_key=customized_step_key, commit=commit,
is_endpoint_metric=is_endpoint_metric)
return

if not mlops_enabled(MLOpsStore.mlops_args):
Expand All @@ -186,12 +187,13 @@ def log(metrics: dict, step: int = None, customized_step_key: str = None, commit
return

log_metric(metrics, step=step, customized_step_key=customized_step_key, commit=commit,
run_id=MLOpsStore.mlops_run_id, edge_id=MLOpsStore.mlops_edge_id)
run_id=MLOpsStore.mlops_run_id, edge_id=MLOpsStore.mlops_edge_id,
is_endpoint_metric=is_endpoint_metric)


def log_endpoint(metrics: dict, step: int = None, customized_step_key: str = None, commit: bool = True):
if MLOpsStore.mlops_args is None or fedml._global_training_type == constants.FEDML_TRAINING_PLATFORM_CROSS_CLOUD:
log_metric(metrics, step=step, customized_step_key=customized_step_key, commit=commit)
log_metric(metrics, step=step, customized_step_key=customized_step_key, commit=commit, is_endpoint_metric=True)
return

if not mlops_enabled(MLOpsStore.mlops_args):
Expand Down Expand Up @@ -860,8 +862,11 @@ def log_metric(metrics: dict, step: int = None, customized_step_key: str = None,
return
MLOpsStore.mlops_log_metrics = log_metrics_obj.copy()
setup_log_mqtt_mgr()
MLOpsStore.mlops_metrics.report_fedml_train_metric(
MLOpsStore.mlops_log_metrics, run_id=run_id, is_endpoint=is_endpoint_metric)
if is_endpoint_metric:
MLOpsStore.mlops_metrics.report_endpoint_metric(MLOpsStore.mlops_log_metrics)
else:
MLOpsStore.mlops_metrics.report_fedml_train_metric(
MLOpsStore.mlops_log_metrics, run_id=run_id, is_endpoint=is_endpoint_metric)
MLOpsStore.mlops_log_metrics.clear()
if step is None:
MLOpsStore.mlops_log_metrics_steps = current_step + 1
Expand Down
7 changes: 6 additions & 1 deletion python/fedml/core/mlops/mlops_device_perfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,15 @@ def report_gpu_device_info(edge_id, mqtt_mgr=None):
gpu_cores_available, sent_bytes, recv_bytes, gpu_available_ids = sys_utils.get_sys_realtime_stats(edge_id)

topic_name = "ml_client/mlops/gpu_device_info"
deploy_worker_id_list = list()
try:
deploy_worker_id_list = json.loads(os.environ.get("FEDML_DEPLOY_WORKER_IDS", "[]")),
except Exception as e:
pass
device_info_json = {
"edgeId": edge_id,
"deployMasterId": os.environ.get("FEDML_DEPLOY_MASTER_ID", ""),
"deployWorkerIds": os.environ.get("FEDML_DEPLOY_WORKER_IDS", "[]"),
"deployWorkerIds": deploy_worker_id_list,
"memoryTotal": round(total_mem * MLOpsUtils.BYTES_TO_GB, 2),
"memoryAvailable": round(free_mem * MLOpsUtils.BYTES_TO_GB, 2),
"diskSpaceTotal": round(total_disk_size * MLOpsUtils.BYTES_TO_GB, 2),
Expand Down
5 changes: 3 additions & 2 deletions python/fedml/mlops/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ def event(event_name, event_started=True, event_value=None, event_edge_id=None):
mlops.event(event_name, event_started, event_value, event_edge_id)


def log(metrics: dict, step: int = None, customized_step_key: str = None, commit: bool = True):
mlops.log(metrics, step=step, customized_step_key=customized_step_key, commit=commit)
def log(metrics: dict, step: int = None, customized_step_key: str = None, commit: bool = True, is_endpoint_metric=False):
mlops.log(metrics, step=step, customized_step_key=customized_step_key, commit=commit,
is_endpoint_metric=is_endpoint_metric)


def log_endpoint(metrics: dict, step: int = None, customized_step_key: str = None, commit: bool = True):
Expand Down
5 changes: 2 additions & 3 deletions python/fedml/workflow/customized_jobs/deploy_job.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from fedml.workflow.jobs import JobStatus

from fedml.workflow.customized_jobs.customized_base_job import CustomizedBaseJob
import fedml


class DeployJob(CustomizedBaseJob):
Expand All @@ -14,7 +13,7 @@ def run(self):
super().run()

def status(self):
super().status()
return super().status()

def kill(self):
super().kill()
4 changes: 1 addition & 3 deletions python/fedml/workflow/customized_jobs/train_job.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@

from fedml.workflow.jobs import JobStatus
from fedml.workflow.customized_jobs.customized_base_job import CustomizedBaseJob
import fedml


class TrainJob(CustomizedBaseJob):
Expand All @@ -15,7 +13,7 @@ def run(self):
super().run()

def status(self):
super().status()
return super().status()

def kill(self):
super().kill()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@

# Make your own workflow with multiple jobs
## Define the job yaml
```
working_directory = os.path.dirname(os.path.abspath(__file__))
deploy_image_job_yaml = os.path.join(working_directory, "deploy_image_job.yaml")
deploy_3d_job_yaml = os.path.join(working_directory, "deploy_3d_job.yaml")
train_job_yaml = os.path.join(working_directory, "train_job.yaml")
```

## If needed, load the job yaml and change some config items
```
deploy_image_job_yaml_obj = DeployImageJob.load_yaml_config(deploy_image_job_yaml)
deploy_3d_job_yaml_obj = DeployImageJob.load_yaml_config(deploy_3d_job_yaml)
train_job_yaml_obj = DeployImageJob.load_yaml_config(train_job_yaml)
# deploy_image_job_yaml_obj["computing"]["resource_type"] = "A100-80GB-SXM"
# deploy_image_job_yaml_obj["computing"]["device_type"] = "GPU"
# DeployImageJob.generate_yaml_doc(deploy_image_job_yaml_obj, deploy_image_job_yaml)
```

## Generate the job object
```
deploy_image_job = DeployImageJob(name="deploy_image_job", job_yaml_absolute_path=deploy_image_job_yaml)
deploy_3d_job = Deploy3DJob(name="deploy_3d_job", job_yaml_absolute_path=deploy_3d_job_yaml)
train_job = TrainJob(name="train_job", job_yaml_absolute_path=train_job_yaml)
```

## Define the workflow
```
workflow = Workflow(name="workflow_with_multi_jobs", loop=False)
```

## Add the job object to workflow and set the dependency (DAG based).
```
workflow.add_job(deploy_image_job)
#workflow.add_job(deploy_3d_job, dependencies=[deploy_image_job])
workflow.add_job(train_job, dependencies=[deploy_image_job])
```

## Run workflow
```
workflow.run()
```

## After the workflow has finished, print the graph, nodes, and topological order
```
print("graph", workflow.metadata.graph)
print("nodes", workflow.metadata.nodes)
print("topological_order", workflow.metadata.topological_order)
print("loop", workflow.loop)
```
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import os

from fedml.workflow.workflow import Workflow
from fedml.workflow.workflow import JobStatus, Workflow
from fedml.workflow.customized_jobs.deploy_job import DeployJob
from fedml.workflow.customized_jobs.train_job import TrainJob

CURRENT_CONFIG_VERSION = "release"
CURRENT_ON_PREM_LOCAL_HOST = "localhost"
CURRENT_ON_PREM_LOCAL_PORT = 18080
MY_API_KEY = "1316b93c82da40ce90113a2ed12f0b14"
MY_API_KEY = "" # Here you need to set your API key from nexus.fedml.ai


class DeployImageJob(DeployJob):
def __init__(self, name, job_yaml_absolute_path=None):
Expand All @@ -20,7 +21,17 @@ def run(self):
super().run()

def status(self):
super().status()
current_status = super().status()
if current_status == JobStatus.FINISHED:
pass
elif current_status == JobStatus.FAILED:
pass
elif current_status == JobStatus.RUNNING:
pass
elif current_status == JobStatus.PROVISIONING:
pass

return current_status

def kill(self):
super().kill()
Expand All @@ -38,7 +49,17 @@ def run(self):
super().run()

def status(self):
super().status()
current_status = super().status()
if current_status == JobStatus.FINISHED:
pass
elif current_status == JobStatus.FAILED:
pass
elif current_status == JobStatus.RUNNING:
pass
elif current_status == JobStatus.PROVISIONING:
pass

return current_status

def kill(self):
super().kill()
Expand All @@ -56,7 +77,17 @@ def run(self):
super().run()

def status(self):
super().status()
current_status = super().status()
if current_status == JobStatus.FINISHED:
pass
elif current_status == JobStatus.FAILED:
pass
elif current_status == JobStatus.RUNNING:
pass
elif current_status == JobStatus.PROVISIONING:
pass

return current_status

def kill(self):
super().kill()
Expand Down Expand Up @@ -87,8 +118,8 @@ def kill(self):

# Add the job object to workflow and set the dependency (DAG based).
workflow.add_job(deploy_image_job)
workflow.add_job(deploy_3d_job, dependencies=[deploy_image_job])
workflow.add_job(train_job, dependencies=[deploy_3d_job])
#workflow.add_job(deploy_3d_job, dependencies=[deploy_image_job])
workflow.add_job(train_job, dependencies=[deploy_image_job])

# Run workflow
workflow.run()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ workspace: deploy_3d_job
job: |
echo "current job id: $FEDML_CURRENT_RUN_ID"
echo "current edge id: $FEDML_CURRENT_EDGE_ID"
echo "Hello, Here is the launch platform."
echo "Hello, Here is the FedML Nexus AI platform."
echo "Current directory is as follows."
pwd
sleep 3
Expand All @@ -22,7 +22,7 @@ bootstrap: |
echo "Bootstrap finished."
computing:
resource_type: A100-80GB-SXM # e.g., A100-80G, please check the resource type list by "fedml show-resource-type" or visiting URL: https://open.fedml.ai/accelerator_resource_type
resource_type: RTX-3090 # e.g., A100-80G, please check the resource type list by "fedml show-resource-type" or visiting URL: https://open.fedml.ai/accelerator_resource_type
minimum_num_gpus: 1 # minimum # of GPUs to provision
maximum_cost_per_hour: $10 # max cost per hour of all machines for your job
# device_type: GPU # GPU or CPU
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ server_external_port: 20202
server_internal_port: 2203

environment_variables:
NEXUS_API_KEY: ""
NEXUS_API_KEY: "" # Here you need to set your API key from nexus.fedml.ai

bootstrap: |
echo "Bootstrap start..."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,15 @@ def predict(self, request: dict):
response_text = self.chatbot.predict(instruction=question)

try:
unique_id = str(uuid.uuid4())
unique_id = "3D_MODEL_KEY"
with open(f"{unique_id}.txt", "w") as f:
f.write(question)
f.write("\n\n")
f.write(response_text)
f.write("\n\n")
fedml.api.upload(data_path=f"{unique_id}.txt", name=unique_id,
response = fedml.api.upload(data_path=f"{unique_id}.txt", name=unique_id,
api_key=os.environ.get("NEXUS_API_KEY", None), metadata={"type": "chatbot"})
print(f"upload response {response}")
except Exception as e:
pass

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ workspace: deploy_image_job
job: |
echo "current job id: $FEDML_CURRENT_RUN_ID"
echo "current edge id: $FEDML_CURRENT_EDGE_ID"
echo "Hello, Here is the launch platform."
echo "Hello, Here is the FedML Nexus AI platform."
echo "Current directory is as follows."
pwd
sleep 3
Expand All @@ -22,7 +22,7 @@ bootstrap: |
echo "Bootstrap finished."
computing:
resource_type: A100-80GB-SXM # e.g., A100-80G, please check the resource type list by "fedml show-resource-type" or visiting URL: https://open.fedml.ai/accelerator_resource_type
resource_type: RTX-3090 # e.g., A100-80G, please check the resource type list by "fedml show-resource-type" or visiting URL: https://open.fedml.ai/accelerator_resource_type
minimum_num_gpus: 1 # minimum # of GPUs to provision
maximum_cost_per_hour: $10 # max cost per hour of all machines for your job
# device_type: GPU # GPU or CPU
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ server_external_port: 20202
server_internal_port: 2203

environment_variables:
NEXUS_API_KEY: ""
NEXUS_API_KEY: "" # Here you need to set your API key from nexus.fedml.ai

bootstrap: |
echo "Bootstrap start..."
Expand Down
Loading

0 comments on commit 7a4d4e1

Please sign in to comment.