diff --git a/components/contrib/kubeflow/pytorch-launcher/Dockerfile b/components/contrib/kubeflow/pytorch-launcher/Dockerfile
deleted file mode 100644
index 521fee988fd..00000000000
--- a/components/contrib/kubeflow/pytorch-launcher/Dockerfile
+++ /dev/null
@@ -1,23 +0,0 @@
-# Copyright 2021 The Kubeflow Authors
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-FROM python:3.6
-
-ADD requirements.txt requirements.txt
-
-RUN pip install --no-cache-dir -r requirements.txt
-
-ADD build /ml
-
-ENTRYPOINT ["python", "/ml/launch_pytorchjob.py"]
diff --git a/components/contrib/kubeflow/pytorch-launcher/build_image.sh b/components/contrib/kubeflow/pytorch-launcher/build_image.sh
deleted file mode 100755
index b036d79a972..00000000000
--- a/components/contrib/kubeflow/pytorch-launcher/build_image.sh
+++ /dev/null
@@ -1,46 +0,0 @@
-#!/bin/bash
-LAUNCHER_IMAGE_NAME_DEFAULT=kubeflow-pytorchjob-launcher
-
-while getopts ":hr:t:i:" opt; do
-  case "${opt}" in
-    h) echo "-r: repo name (including gcr.io/, etc., if not in Docker Hub)"
-       echo "-i: image name (default is $LAUNCHER_IMAGE_NAME_DEFAULT)"
-       echo "-t: image tag (default is inferred from date/git)"
-       exit
-      ;;
-    r) REPO_NAME=${OPTARG}
-      ;;
-    t) TAG_NAME=${OPTARG}
-      ;;
-    i) LAUNCHER_IMAGE_NAME=${OPTARG}
-      ;;
-    \? ) echo "Usage: cmd [-p] project [-t] tag [-i] image"
-      exit
-      ;;
-  esac
-done
-
-# Apply defaults/interpret inputs
-LAUNCHER_IMAGE_NAME=${LAUNCHER_IMAGE_NAME:-$LAUNCHER_IMAGE_NAME_DEFAULT}
-TAG_NAME=${TAG_NAME:-$(date +v%Y%m%d)-$(git describe --tags --always --dirty)-$(git diff | shasum -a256 | cut -c -6)}
-
-if [ -n "${REPO_NAME}" ]; then
-  # Ensure ends with /
-  if [[ "$REPO_NAME" != */ ]]; then
-    REPO_NAME+=/
-  fi
-fi
-
-FULL_NAME=${REPO_NAME}${LAUNCHER_IMAGE_NAME}:${TAG_NAME}
-
-mkdir -p ./build
-cp -R ./src/ ./build/
-cp -R ../common/ ./build/
-
-echo "Building image $FULL_NAME"
-docker build -t ${FULL_NAME} .
- -echo "Pushing image $FULL_NAME" -docker push ${FULL_NAME} - -rm -rf ./build diff --git a/components/contrib/kubeflow/pytorch-launcher/component.yaml b/components/contrib/kubeflow/pytorch-launcher/component.yaml deleted file mode 100644 index b8f8e820347..00000000000 --- a/components/contrib/kubeflow/pytorch-launcher/component.yaml +++ /dev/null @@ -1,49 +0,0 @@ -description: Kubeflow PyTorchJob launcher -inputs: -- {name: name, type: String, description: 'PyTorchJob name.'} -- {name: namespace, type: String, default: kubeflow, description: 'PyTorchJob namespace (likely your current namespace).'} -- {name: version, type: String, default: v1, description: 'PyTorchJob version.'} -- {name: master_spec, type: JsonObject, default: '{}', description: 'PyTorchJob Master replicaSpecs.'} -- {name: worker_spec, type: JsonObject, default: '{}', description: 'PyTorchJob Worker replicaSpecs.'} -- {name: job_timeout_minutes, type: Integer, default: 1440, description: 'Time in minutes to wait for the job to complete.'} -- {name: delete_after_done, type: Boolean, default: 'True' , description: 'Whether to delete the job after it is finished.'} -- {name: clean_pod_policy, type: String, default: Running, description: 'Defines the policy for cleaning up pods after the PyTorchJob completes.'} -- {name: active_deadline_seconds, type: Integer, optional: true, description: 'Specifies the duration (in seconds) since startTime during which the job can remain active before it is terminated. Must be a positive integer. This setting applies only to pods where restartPolicy is OnFailure or Always.'} -- {name: backoff_limit, type: Integer, optional: true, description: 'Number of retries before marking this job as failed.'} -- {name: ttl_seconds_after_finished, type: Integer, optional: true, description: 'Defines the TTL for cleaning up finished PyTorchJobs.'} -implementation: - container: - image: cascribner/kubeflow-pytorchjob-launcher:v1 - command: [python, /ml/launch_pytorchjob.py] - args: - - --name - - {inputValue: name} - - --namespace - - {inputValue: namespace} - - --version - - {inputValue: version} - - --masterSpec - - {inputValue: master_spec} - - --workerSpec - - {inputValue: worker_spec} - - --jobTimeoutMinutes - - {inputValue: job_timeout_minutes} - - --deleteAfterDone - - {inputValue: delete_after_done} - - --cleanPodPolicy - - {inputValue: clean_pod_policy} - - if: - cond: {isPresent: active_deadline_seconds} - then: - - --activeDeadlineSeconds - - {inputValue: active_deadline_seconds} - - if: - cond: {isPresent: backoff_limit} - then: - - --backoffLimit - - {inputValue: backoff_limit} - - if: - cond: {isPresent: ttl_seconds_after_finished} - then: - - --ttlSecondsAfterFinished - - {inputValue: ttl_seconds_after_finished} diff --git a/components/contrib/kubeflow/pytorch-launcher/requirements.txt b/components/contrib/kubeflow/pytorch-launcher/requirements.txt deleted file mode 100644 index 2fc0c670117..00000000000 --- a/components/contrib/kubeflow/pytorch-launcher/requirements.txt +++ /dev/null @@ -1,4 +0,0 @@ -pyyaml -kubernetes -kubeflow-pytorchjob -retrying diff --git a/components/contrib/kubeflow/pytorch-launcher/sample.py b/components/contrib/kubeflow/pytorch-launcher/sample.py deleted file mode 100644 index 580d22eb82c..00000000000 --- a/components/contrib/kubeflow/pytorch-launcher/sample.py +++ /dev/null @@ -1,189 +0,0 @@ -import json -from typing import NamedTuple -from collections import namedtuple -import kfp -import kfp.dsl as dsl -from kfp import components -from kfp.dsl.types import 
-
-
-def get_current_namespace():
-    """Returns current namespace if available, else kubeflow"""
-    try:
-        current_namespace = open(
-            "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
-        ).read()
-    except:
-        current_namespace = "kubeflow"
-    return current_namespace
-
-
-def create_worker_spec(
-    worker_num: int = 0
-) -> NamedTuple(
-    "CreatWorkerSpec", [("worker_spec", dict)]
-):
-    """
-    Creates pytorch-job worker spec
-    """
-    worker = {}
-    if worker_num > 0:
-        worker = {
-            "replicas": worker_num,
-            "restartPolicy": "OnFailure",
-            "template": {
-                "metadata": {
-                    "annotations": {
-                        "sidecar.istio.io/inject": "false"
-                    }
-                },
-                "spec": {
-                    "containers": [
-                        {
-                            "args": [
-                                "--backend",
-                                "gloo",
-                            ],
-                            "image": "public.ecr.aws/pytorch-samples/pytorch_dist_mnist:latest",
-                            "name": "pytorch",
-                            "resources": {
-                                "requests": {
-                                    "memory": "4Gi",
-                                    "cpu": "2000m",
-                                    # Uncomment for GPU
-                                    # "nvidia.com/gpu": 1,
-                                },
-                                "limits": {
-                                    "memory": "4Gi",
-                                    "cpu": "2000m",
-                                    # Uncomment for GPU
-                                    # "nvidia.com/gpu": 1,
-                                },
-                            },
-                        }
-                    ]
-                },
-            },
-        }
-
-    worker_spec_output = namedtuple(
-        "MyWorkerOutput", ["worker_spec"]
-    )
-    return worker_spec_output(worker)
-
-
-worker_spec_op = components.func_to_container_op(
-    create_worker_spec,
-    base_image="python:slim",
-)
-
-
-@dsl.pipeline(
-    name="launch-kubeflow-pytorchjob",
-    description="An example to launch pytorch.",
-)
-def mnist_train(
-    namespace: str = get_current_namespace(),
-    worker_replicas: int = 1,
-    ttl_seconds_after_finished: int = -1,
-    job_timeout_minutes: int = 600,
-    delete_after_done: bool = False,
-):
-    pytorchjob_launcher_op = components.load_component_from_file(
-        "./component.yaml"
-    )
-
-    master = {
-        "replicas": 1,
-        "restartPolicy": "OnFailure",
-        "template": {
-            "metadata": {
-                "annotations": {
-                    # See https://github.com/kubeflow/website/issues/2011
-                    "sidecar.istio.io/inject": "false"
-                }
-            },
-            "spec": {
-                "containers": [
-                    {
-                        # To override default command
-                        # "command": [
-                        #     "python",
-                        #     "/opt/mnist/src/mnist.py"
-                        # ],
-                        "args": [
-                            "--backend",
-                            "gloo",
-                        ],
-                        # Or, create your own image from
-                        # https://github.com/kubeflow/pytorch-operator/tree/master/examples/mnist
-                        "image": "public.ecr.aws/pytorch-samples/pytorch_dist_mnist:latest",
-                        "name": "pytorch",
-                        "resources": {
-                            "requests": {
-                                "memory": "4Gi",
-                                "cpu": "2000m",
-                                # Uncomment for GPU
-                                # "nvidia.com/gpu": 1,
-                            },
-                            "limits": {
-                                "memory": "4Gi",
-                                "cpu": "2000m",
-                                # Uncomment for GPU
-                                # "nvidia.com/gpu": 1,
-                            },
-                        },
-                    }
-                ],
-                # If imagePullSecrets required
-                # "imagePullSecrets": [
-                #     {"name": "image-pull-secret"},
-                # ],
-            },
-        },
-    }
-
-    worker_spec_create = worker_spec_op(
-        worker_replicas
-    )
-
-    # Launch and monitor the job with the launcher
-    pytorchjob_launcher_op(
-        # Note: name needs to be a unique pytorchjob name in the namespace.
-        # Using RUN_ID_PLACEHOLDER is one way of getting something unique.
-        name=f"name-{kfp.dsl.RUN_ID_PLACEHOLDER}",
-        namespace=namespace,
-        master_spec=master,
-        # pass worker_spec as a string because the JSON serializer will convert
-        # the placeholder for worker_replicas (which it sees as a string) into
-        # a quoted variable (eg a string) instead of an unquoted variable
-        # (number). If worker_replicas is quoted in the spec, it will break in
-        # k8s. See https://github.com/kubeflow/pipelines/issues/4776
-        worker_spec=worker_spec_create.outputs[
-            "worker_spec"
-        ],
-        ttl_seconds_after_finished=ttl_seconds_after_finished,
-        job_timeout_minutes=job_timeout_minutes,
-        delete_after_done=delete_after_done,
-    )
-
-
-if __name__ == "__main__":
-    import kfp.compiler as compiler
-
-    pipeline_file = "test.tar.gz"
-    print(
-        f"Compiling pipeline as {pipeline_file}"
-    )
-    compiler.Compiler().compile(
-        mnist_train, pipeline_file
-    )
-
-# # To run:
-# client = kfp.Client()
-# run = client.create_run_from_pipeline_package(
-#     pipeline_file,
-#     arguments={},
-#     run_name="test pytorchjob run"
-# )
-# print(f"Created run {run}")
diff --git a/components/contrib/kubeflow/pytorch-launcher/src/__init__.py b/components/contrib/kubeflow/pytorch-launcher/src/__init__.py
deleted file mode 100644
index b4447dd5838..00000000000
--- a/components/contrib/kubeflow/pytorch-launcher/src/__init__.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Copyright 2021 The Kubeflow Authors
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
diff --git a/components/contrib/kubeflow/pytorch-launcher/src/launch_pytorchjob.py b/components/contrib/kubeflow/pytorch-launcher/src/launch_pytorchjob.py
deleted file mode 100644
index c2c24d82fe5..00000000000
--- a/components/contrib/kubeflow/pytorch-launcher/src/launch_pytorchjob.py
+++ /dev/null
@@ -1,151 +0,0 @@
-import argparse
-import datetime
-from distutils.util import strtobool
-import logging
-import yaml
-
-from kubernetes import client as k8s_client
-from kubernetes import config
-
-import launch_crd
-from kubeflow.pytorchjob import V1PyTorchJob as V1PyTorchJob_original
-from kubeflow.pytorchjob import V1PyTorchJobSpec as V1PyTorchJobSpec_original
-
-
-def yamlOrJsonStr(string):
-    if string == "" or string is None:
-        return None
-    return yaml.safe_load(string)
-
-
-def get_current_namespace():
-    """Returns current namespace if available, else kubeflow"""
-    try:
-        namespace = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
-        current_namespace = open(namespace).read()
-    except FileNotFoundError:
-        current_namespace = "kubeflow"
-    return current_namespace
-
-
-# Patch PyTorchJob APIs to align with k8s usage
-class V1PyTorchJob(V1PyTorchJob_original):
-    def __init__(self, *args, **kwargs):
-        super().__init__(*args, **kwargs)
-        self.openapi_types = self.swagger_types
-
-
-class V1PyTorchJobSpec(V1PyTorchJobSpec_original):
-    def __init__(self, *args, **kwargs):
-        super().__init__(*args, **kwargs)
-        self.openapi_types = self.swagger_types
-
-
-def get_arg_parser():
-    parser = argparse.ArgumentParser(description='Kubeflow Job launcher')
-    parser.add_argument('--name', type=str,
-                        default="pytorchjob",
-                        help='Job name.')
-    parser.add_argument('--namespace', type=str,
-                        default=get_current_namespace(),
-                        help='Job namespace.')
-    parser.add_argument('--version', type=str,
-                        default='v1',
-                        help='Job version.')
-    parser.add_argument('--activeDeadlineSeconds', type=int,
-                        default=None,
-                        help='Specifies the duration (in seconds) since startTime during which the job can remain active before it is terminated. Must be a positive integer. This setting applies only to pods where restartPolicy is OnFailure or Always.')
-    parser.add_argument('--backoffLimit', type=int,
-                        default=None,
-                        help='Number of retries before marking this job as failed.')
-    parser.add_argument('--cleanPodPolicy', type=str,
-                        default="Running",
-                        help='Defines the policy for cleaning up pods after the Job completes.')
-    parser.add_argument('--ttlSecondsAfterFinished', type=int,
-                        default=None,
-                        help='Defines the TTL for cleaning up finished Jobs.')
-    parser.add_argument('--masterSpec', type=yamlOrJsonStr,
-                        default={},
-                        help='Job master replicaSpecs.')
-    parser.add_argument('--workerSpec', type=yamlOrJsonStr,
-                        default={},
-                        help='Job worker replicaSpecs.')
-    parser.add_argument('--deleteAfterDone', type=strtobool,
-                        default=True,
-                        help='When Job done, delete the Job automatically if it is True.')
-    parser.add_argument('--jobTimeoutMinutes', type=int,
-                        default=60*24,
-                        help='Time in minutes to wait for the Job to reach end')
-
-    # Options that likely wont be used, but left here for future use
-    parser.add_argument('--jobGroup', type=str,
-                        default="kubeflow.org",
-                        help='Group for the CRD, ex: kubeflow.org')
-    parser.add_argument('--jobPlural', type=str,
-                        default="pytorchjobs",  # We could select a launcher here and populate these automatically
-                        help='Plural name for the CRD, ex: pytorchjobs')
-    parser.add_argument('--kind', type=str,
-                        default='PyTorchJob',
-                        help='CRD kind.')
-    return parser
-
-
-def main(args):
-    logging.getLogger(__name__).setLevel(logging.INFO)
-    logging.info('Generating job template.')
-
-    jobSpec = V1PyTorchJobSpec(
-        pytorch_replica_specs={
-            'Master': args.masterSpec,
-            'Worker': args.workerSpec,
-        },
-        active_deadline_seconds=args.activeDeadlineSeconds,
-        backoff_limit=args.backoffLimit,
-        clean_pod_policy=args.cleanPodPolicy,
-        ttl_seconds_after_finished=args.ttlSecondsAfterFinished,
-    )
-
-    api_version = f"{args.jobGroup}/{args.version}"
-
-    job = V1PyTorchJob(
-        api_version=api_version,
-        kind=args.kind,
-        metadata=k8s_client.V1ObjectMeta(
-            name=args.name,
-            namespace=args.namespace,
-        ),
-        spec=jobSpec,
-    )
-
-    serialized_job = k8s_client.ApiClient().sanitize_for_serialization(job)
-
-    logging.info('Creating launcher client.')
-
-    config.load_incluster_config()
-    api_client = k8s_client.ApiClient()
-    launcher_client = launch_crd.K8sCR(
-        group=args.jobGroup,
-        plural=args.jobPlural,
-        version=args.version,
-        client=api_client
-    )
-
-    logging.info('Submitting CR.')
-    create_response = launcher_client.create(serialized_job)
-
-    expected_conditions = ["Succeeded", "Failed"]
-    logging.info(
-        f'Monitoring job until status is any of {expected_conditions}.'
-    )
-    launcher_client.wait_for_condition(
-        args.namespace, args.name, expected_conditions,
-        timeout=datetime.timedelta(minutes=args.jobTimeoutMinutes))
-    if args.deleteAfterDone:
-        logging.info('Deleting job.')
-        launcher_client.delete(args.name, args.namespace)
-
-
-if __name__ == "__main__":
-    parser = get_arg_parser()
-    args = parser.parse_args()
-    main(args)
diff --git a/components/kubeflow/pytorch-launcher/sample.py b/components/kubeflow/pytorch-launcher/sample.py
new file mode 100644
index 00000000000..5cd39fd8299
--- /dev/null
+++ b/components/kubeflow/pytorch-launcher/sample.py
@@ -0,0 +1,109 @@
+import kfp
+from kfp import dsl
+import uuid
+import datetime
+import logging
+from kubernetes import config
+from kubeflow.training import TrainingClient
+from kubeflow.training.utils import utils
+
+
+def get_current_namespace():
+    """Returns current namespace if available, else kubeflow"""
+    try:
+        current_namespace = open(
+            "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
+        ).read()
+    except FileNotFoundError:
+        current_namespace = "kubeflow"
+    return current_namespace
+
+
+@dsl.component(
+    packages_to_install=['kubernetes<31,>=8.0.0', 'kubeflow-training>=1.8.0', 'retrying>=1.3.3'],
+    base_image="python:3.11",
+)
+def pytorch_job_launcher(
+    name: str,
+    kind: str = "PyTorchJob",
+    namespace: str = "kubeflow",
+    worker_replicas: int = 1,
+    job_timeout_minutes: int = 1440,
+    delete_after_done: bool = True,
+):
+    # Lightweight components are executed in isolation, so the imports used
+    # at runtime must live inside the function body.
+    import datetime
+    import logging
+
+    from kubernetes import config
+    from kubeflow.training import TrainingClient
+    from kubeflow.training.utils import utils
+
+    logging.getLogger(__name__).setLevel(logging.INFO)
+
+    args = ["--backend", "gloo"]
+    resources = {"cpu": "2000m", "memory": "4Gi"}  # add "nvidia.com/gpu": 1 for GPU
+    base_image = "public.ecr.aws/pytorch-samples/pytorch_dist_mnist:latest"
+
+    logging.info('Generating job template.')
+    container_spec = utils.get_container_spec(
+        base_image=base_image, name="pytorch", resources=resources, args=args
+    )
+    spec = utils.get_pod_template_spec(containers=[container_spec])
+    job_template = utils.get_pytorchjob_template(
+        name=name,
+        namespace=namespace,
+        num_workers=worker_replicas,
+        worker_pod_template_spec=spec,
+        master_pod_template_spec=spec,
+    )
+
+    logging.info('Creating TrainingClient.')
+    # The component runs inside the cluster; switch to load_kube_config()
+    # when running this function locally.
+    config.load_incluster_config()
+    # config.load_kube_config()
+
+    training_client = TrainingClient()
+
+    logging.info(f"Creating PyTorchJob in namespace: {namespace}")
+    training_client.create_job(job_template, namespace=namespace)
+
+    expected_conditions = ["Succeeded", "Failed"]
+    logging.info(
+        f'Monitoring job until status is any of {expected_conditions}.'
+    )
+    training_client.wait_for_job_conditions(
+        name=name,
+        namespace=namespace,
+        job_kind=kind,
+        expected_conditions=set(expected_conditions),
+        timeout=int(datetime.timedelta(minutes=job_timeout_minutes).total_seconds()),
+    )
+    if delete_after_done:
+        logging.info('Deleting job after completion.')
+        training_client.delete_job(name, namespace)
+
+
+@dsl.pipeline(
+    name="launch-kubeflow-pytorchjob",
+    description="An example pipeline that launches a PyTorchJob.",
+)
+def pytorch_job_pipeline(
+    kind: str = "PyTorchJob",
+    worker_replicas: int = 1,
+):
+    namespace = get_current_namespace()
+
+    pytorch_job_launcher(
+        name=f"mnist-train-{uuid.uuid4().hex[:8]}",
+        kind=kind,
+        namespace=namespace,
+        worker_replicas=worker_replicas,
+    )
+
+
+if __name__ == "__main__":
+    import kfp.compiler as compiler
+
+    pipeline_file = "test.yaml"
+    print(f"Compiling pipeline as {pipeline_file}")
+    compiler.Compiler().compile(pytorch_job_pipeline, pipeline_file)
+
+    # To run:
+    host = "http://localhost:8080"
+    client = kfp.Client(host=host)
+    run = client.create_run_from_pipeline_package(
+        pipeline_file,
+        arguments={},
+        run_name="test pytorchjob run",
+    )
+    print(f"Created run {run}")
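
Not part of the diff above: a minimal sketch of how the same PyTorchJob spec could be smoke-tested directly with the Training SDK before wiring it into the KFP component. It reuses only the helpers already shown in sample.py; the script name, namespace, and 30-minute timeout are illustrative assumptions, and it expects a local kubeconfig pointing at a cluster with the training operator installed.

# smoke_test_pytorchjob.py -- hypothetical helper, not included in this PR.
import datetime
import logging
import uuid

from kubernetes import config
from kubeflow.training import TrainingClient
from kubeflow.training.utils import utils

logging.basicConfig(level=logging.INFO)

# Running from a workstation rather than in-cluster.
config.load_kube_config()

name = f"mnist-smoke-{uuid.uuid4().hex[:8]}"
namespace = "kubeflow"  # adjust to your profile namespace

# Same container/pod/job construction as the launcher component above.
container_spec = utils.get_container_spec(
    base_image="public.ecr.aws/pytorch-samples/pytorch_dist_mnist:latest",
    name="pytorch",
    resources={"cpu": "2000m", "memory": "4Gi"},
    args=["--backend", "gloo"],
)
pod_spec = utils.get_pod_template_spec(containers=[container_spec])
job_template = utils.get_pytorchjob_template(
    name=name,
    namespace=namespace,
    num_workers=1,
    worker_pod_template_spec=pod_spec,
    master_pod_template_spec=pod_spec,
)

client = TrainingClient()
client.create_job(job_template, namespace=namespace)
logging.info("Submitted PyTorchJob %s, waiting for completion.", name)

client.wait_for_job_conditions(
    name=name,
    namespace=namespace,
    job_kind="PyTorchJob",
    expected_conditions={"Succeeded", "Failed"},
    timeout=int(datetime.timedelta(minutes=30).total_seconds()),
)
client.delete_job(name, namespace)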