Skip to content

Commit

Permalink
Merge pull request #1887 from FedML-AI/test/v0.7.0
Browse files Browse the repository at this point in the history
Test/v0.7.0
  • Loading branch information
fedml-alex authored Feb 2, 2024
2 parents 0118e08 + f680e30 commit 532d452
Show file tree
Hide file tree
Showing 15 changed files with 411 additions and 9 deletions.
2 changes: 1 addition & 1 deletion python/fedml/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
_global_training_type = None
_global_comm_backend = None

__version__ = "0.8.18b10"
__version__ = "0.8.18b11"


# This is the deployment environment used for different roles (RD/PM/BD/Public Developers). Potential VALUE: local, dev, test, release
Expand Down
3 changes: 2 additions & 1 deletion python/fedml/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"""
from typing import List, Tuple

from fedml.api.constants import RunStatus
from fedml.api.fedml_response import FedMLResponse
from fedml.api.modules import launch, utils, build, device, logs, diagnosis, cluster, run, train, federate, storage, \
model as model_module # Since "model" has conflict with one of the input parameters, we need to rename it
Expand Down Expand Up @@ -128,7 +129,7 @@ def run_list(run_name: str = None, run_id: str = None, platform: str = "falcon",


def run_status(run_name: str = None, run_id: str = None, platform: str = "falcon", api_key: str = None) -> (
FedMLRunModelList, str):
FedMLRunModelList, RunStatus):
return run.status(run_name=run_name, run_id=run_id, platform=platform, api_key=api_key)


Expand Down
37 changes: 36 additions & 1 deletion python/fedml/api/constants.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from enum import Enum, unique


class ApiConstants:
RESOURCE_MATCHED_STATUS_MATCHED = "MATCHED"
RESOURCE_MATCHED_STATUS_JOB_URL_ERROR = "ERROR_JOB_URL"
Expand Down Expand Up @@ -71,4 +74,36 @@ class ApiConstants:
LAUNCH_JOB_STATUS_JOB_NOT_EXISTS: 25, LAUNCH_JOB_STATUS_MACHINE_STARTUP_FAILED: 26,
LAUNCH_JOB_STATUS_CREATE_PROJECT_FAILED: 27, LAUNCH_JOB_STATUS_PROJECT_NOT_EXISTS: 28,
LAUNCH_JOB_STATUS_DB_INSERT_ERROR: 29, LAUNCH_JOB_STATUS_OCCUPIED_FAILED: 30,
LAUNCH_JOB_STATUS_JOB_CONFIG_NOT_EXISTS: 31, LAUNCH_JOB_STATUS_GENERAL_ERROR: 32}
LAUNCH_JOB_STATUS_JOB_CONFIG_NOT_EXISTS: 31}


@unique
class RunStatus(Enum):
NOT_STARTED = "NOT_STARTED"
QUEUED = "QUEUED"
STARTING = "STARTING"
RUNNING = "RUNNING"
STOPPING = "STOPPING"
KILLED = "KILLED"
FAILED = "FAILED"
FINISHED = "FINISHED"
ABANDONED = "ABANDONED"
LAUNCHED = "LAUNCHED"
ERROR = "ERROR"
BLOCKED = "BLOCKED"
PRE_QUEUE = "PRE_QUEUE"
INVALID = "INVALID"
CLUSTER_QUEUE = "CLUSTER_QUEUE"
PROVISIONING = "PROVISIONING"
UNDETERMINED = "UNDETERMINED"

def __str__(self):
return self.value

@classmethod
def get_run_enum_from_str(cls, run_status_str: str):
for run_status in cls:
if run_status.value == run_status_str:
return run_status
return cls.UNDETERMINED

5 changes: 3 additions & 2 deletions python/fedml/api/modules/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
FedMLRunStartedModel, FedMLRunModelList, FeatureEntryPoint
from fedml.computing.scheduler.comm_utils.security_utils import get_api_key
from fedml.computing.scheduler.scheduler_entry.launch_manager import FedMLJobConfig
from fedml.api.constants import RunStatus


class RunLogResult(object):
Expand Down Expand Up @@ -74,7 +75,7 @@ def list_run(run_name: str, run_id: str, platform: str, api_key: str) -> FedMLRu
return run_list_obj


def status(run_name: str, run_id: str, platform: str, api_key: str) -> (FedMLRunModelList, str):
def status(run_name: str, run_id: str, platform: str, api_key: str) -> (FedMLRunModelList, RunStatus):
_authenticate_and_validate_platform(api_key, platform)

run_status = None
Expand All @@ -84,7 +85,7 @@ def status(run_name: str, run_id: str, platform: str, api_key: str) -> (FedMLRun
if len(run_list_obj.run_list) > 1:
raise Exception("Found more than one runs for the specified run name or run id.")

run_status = run_list_obj.run_list[0].status
run_status = RunStatus.get_run_enum_from_str(run_list_obj.run_list[0].status)

return run_list_obj, run_status

Expand Down
2 changes: 1 addition & 1 deletion python/fedml/cli/modules/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
"-pf",
type=str,
default="falcon",
help="The platform name at the FedML® Nexus AI Platform (options: octopus, parrot, spider, beehive, falcon, launch, "
help="The platform name at the FedML® Nexus AI Platform (options: octopus, parrot, spider, beehive, falcon, launch,"
"default is falcon).",
)
def fedml_run(api_key, version, platform):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import collections.abc

import fedml
from fedml.computing.scheduler.comm_utils import sys_utils, security_utils
from fedml.computing.scheduler.comm_utils.container_utils import ContainerUtils
from fedml.computing.scheduler.comm_utils.job_utils import JobRunnerUtils
Expand Down Expand Up @@ -440,6 +441,12 @@ def start_deployment(end_point_id, end_point_name, model_id, model_version,
}
environment["MAIN_ENTRY"] = relative_entry
environment["BOOTSTRAP_DIR"] = dst_bootstrap_dir
environment["FEDML_CURRENT_RUN_ID"] = end_point_id
environment["FEDML_CURRENT_EDGE_ID"] = edge_id
environment["FEDML_CURRENT_VERSION"] = fedml.get_env_version()
environment["FEDML_ENV_VERSION"] = fedml.get_env_version()
environment["FEDML_ENV_LOCAL_ON_PREMISE_PLATFORM_HOST"] = fedml.get_local_on_premise_platform_host()
environment["FEDML_ENV_LOCAL_ON_PREMISE_PLATFORM_PORT"] = fedml.get_local_on_premise_platform_port()
logging.info(f"volume: {volumns}, binds: {binds}, environment: {environment}")
logging.info(f"dst_model_serving_dir: {dst_model_serving_dir}")
logging.info(f"relative_entry: {relative_entry}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def __init__(self, response: Response, data: dict, project_name: str = None, fed
application_name: str = None, job_type: str = None, inner_id: str = None, app_job_id: str = None,
app_job_name: str = None):
if data is not None:
self.run_id = data.get("job_id", "0")
self.run_id = data.get("job_id", "-1")
self.run_name = data.get("job_name", None)
self.project_id = data.get("project_id", None)
self.status = data.get("status", None)
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*.pyc
45 changes: 45 additions & 0 deletions python/fedml/workflow/driver_example/hello_world/hello_world.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import os
import time

import fedml

if __name__ == "__main__":
print("Hi everyone, I am an launch job.")

print(f"current config is {fedml.get_env_version()}")

run_id = os.getenv('FEDML_CURRENT_RUN_ID', 0)
edge_id = os.getenv('FEDML_CURRENT_EDGE_ID', 0)

artifact = fedml.mlops.Artifact(name=f"general-file@{run_id}-{edge_id}", type=fedml.mlops.ARTIFACT_TYPE_NAME_GENERAL)
artifact.add_file("./requirements.txt")
artifact.add_dir("./config")
fedml.mlops.log_artifact(artifact)

fedml.mlops.log_model(f"model-file@{run_id}-{edge_id}", "requirements.txt")

artifact = fedml.mlops.Artifact(name=f"log-file@{run_id}-{edge_id}", type=fedml.mlops.ARTIFACT_TYPE_NAME_LOG)
artifact.add_file("./requirements.txt")
artifact.add_dir("./config")
fedml.mlops.log_artifact(artifact)

artifact = fedml.mlops.Artifact(name=f"source-file@{run_id}-{edge_id}", type=fedml.mlops.ARTIFACT_TYPE_NAME_SOURCE)
artifact.add_file("./requirements.txt")
artifact.add_dir("./config")
fedml.mlops.log_artifact(artifact)

artifact = fedml.mlops.Artifact(name=f"dataset-file@{run_id}-{edge_id}", type=fedml.mlops.ARTIFACT_TYPE_NAME_DATASET)
artifact.add_file("./requirements.txt")
artifact.add_dir("./config")
fedml.mlops.log_artifact(artifact)

acc = 0.1
loss = 2.0
for iter_count in range(10):
acc += 0.01
loss -= 0.02
fedml.mlops.log_metric({"acc": acc, "loss": loss})
time.sleep(2)


time.sleep(10)
29 changes: 29 additions & 0 deletions python/fedml/workflow/driver_example/hello_world_job.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Local directory where your source code resides.
# It should be the relative path to this job yaml file or the absolute path.
# If your job doesn't contain any source code, it can be empty.
workspace: hello_world

# Docker image name.
# It should be the full name of the image with tag.
# If you want to use the default image, it can be empty.
docker:
image: fedml/fedml-default-launch:cu12.1-u22.04

# Running entry commands which will be executed as the job entry point.
# Support multiple lines, which can not be empty.
job: |
echo "Hello, Here is the Falcon platform."
echo "Current directory is as follows."
pwd
python3 hello_world.py
# Bootstrap shell commands which will be executed before running entry commands.
# Support multiple lines, which can be empty.
bootstrap: |
pip install -r requirements.txt
echo "Bootstrap finished."
computing:
resource_type: H100-80GB-HBM3 # e.g., A100-80G, please check the resource type list by "fedml show-resource-type" or visiting URL: https://open.fedml.ai/accelerator_resource_type
minimum_num_gpus: 1 # minimum # of GPUs to provision
maximum_cost_per_hour: $0.5 # max cost per hour of all machines for your job
56 changes: 56 additions & 0 deletions python/fedml/workflow/driver_example/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import logging

import fedml
import os
from fedml.workflow.jobs import Job, JobStatus
from fedml.workflow.workflow import Workflow


class HelloWorldJob(Job):
def __init__(self, name):
super().__init__(name)
self.run_id = None

def run(self):
fedml.set_env_version("test")
working_directory = os.path.dirname(os.path.abspath(__file__))
absolute_path = os.path.join(working_directory, "hello_world_job.yaml")
result = fedml.api.launch_job(yaml_file=absolute_path, api_key="30d1bbcae9ec48ffa314caa8e944d187")
if result.run_id and int(result.run_id) > 0:
self.run_id = result.run_id

def status(self):
if self.run_id:
try:
_, run_status = fedml.api.run_status(run_id=self.run_id, api_key="30d1bbcae9ec48ffa314caa8e944d187")
return JobStatus.get_job_status_from_run_status(run_status)
except Exception as e:
logging.error(f"Error while getting status of run {self.run_id}: {e}")
return JobStatus.UNDETERMINED

def kill(self):
if self.run_id:
try:
return fedml.api.run_stop(run_id=self.run_id, api_key="30d1bbcae9ec48ffa314caa8e944d187")
except Exception as e:
logging.error(f"Error while stopping run {self.run_id}: {e}")


if __name__ == "__main__":
job_1 = HelloWorldJob(name="hello_world")
job_2 = HelloWorldJob(name="hello_world_dependent_on_job_1")
workflow = Workflow(name="hello_world_workflow", loop=False)
workflow.add_job(job_1)
workflow.add_job(job_2, dependencies=[job_1])
workflow.run()

job_1 = HelloWorldJob(name="hello_world")
job_2 = HelloWorldJob(name="hello_world_dependent_on_job_1")
workflow = Workflow(name="hello_world_workflow", loop=False)
workflow.add_job(job_1)
workflow.add_job(job_2, dependencies=[job_1])
workflow.run()
print("graph", workflow.metadata.graph)
print("nodes", workflow.metadata.nodes)
print("topological_order", workflow.metadata.topological_order)
print("loop", workflow.loop)
75 changes: 75 additions & 0 deletions python/fedml/workflow/jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
from enum import Enum
from abc import ABC, abstractmethod
from fedml.api.constants import RunStatus


# Define an enum for job status
class JobStatus(Enum):

"""
Enum for job status
"""
PROVISIONING = "PROVISIONING"
RUNNING = "RUNNING"
FINISHED = "FINISHED"
FAILED = "FAILED"
UNDETERMINED = "UNDETERMINED"

@classmethod
def _create_run_status_to_job_status_mapping(cls):
cls._run_status_to_job_status_mapping = {
JobStatus.PROVISIONING: {RunStatus.NOT_STARTED, RunStatus.QUEUED, RunStatus.CLUSTER_QUEUE,
RunStatus.PRE_QUEUE, RunStatus.PROVISIONING},
JobStatus.RUNNING: {RunStatus.STARTING, RunStatus.RUNNING, RunStatus.LAUNCHED},
JobStatus.FINISHED: {RunStatus.FINISHED},
JobStatus.FAILED: {RunStatus.STOPPING, RunStatus.KILLED, RunStatus.FAILED, RunStatus.ABANDONED,
RunStatus.ERROR, RunStatus.BLOCKED, RunStatus.INVALID},
JobStatus.UNDETERMINED: {RunStatus.UNDETERMINED}
}

@classmethod
def get_job_status_from_run_status(cls, run_status: RunStatus):
if not hasattr(cls, "_run_status_to_job_status_mapping"):
cls._create_run_status_to_job_status_mapping()
for job_status, run_status_set in cls._run_status_to_job_status_mapping.items():
if run_status in run_status_set:
return job_status
return JobStatus.UNDETERMINED


class Job(ABC):

def __init__(self, name):
"""
Initialize the Job instance.
Parameters:
- name (str): Name for the job. This is used to identify the job in the workflow so it should be unique.
"""
self.name = name

def __repr__(self):
return "<{klass} @{id:x} {attrs}>".format(
klass=self.__class__.__name__,
id=id(self) & 0xFFFFFF,
attrs=" ".join("{}={!r}".format(k, v) for k, v in self.__dict__.items()),
)

@abstractmethod
def run(self):
"""
Abstract method to run the job. This method should contain the execution logic of the job.
"""

@abstractmethod
def status(self) -> JobStatus:
"""
Abstract method to get the status of the job.
Represents the status of the job, which should be of type JobStatus: Running, Success, or Failed.
"""

@abstractmethod
def kill(self):
"""
Method to kill the job if running on remote server.
"""
Loading

0 comments on commit 532d452

Please sign in to comment.