Add E2E test for gang-scheduling #1736

Merged
merged 3 commits into from
Jan 24, 2023
15 changes: 13 additions & 2 deletions .github/workflows/integration-tests.yaml
@@ -2,17 +2,24 @@ name: integration test
on:
  - pull_request

concurrency:
  group: ${{ github.workflow }}-${{ github.ref }}
  cancel-in-progress: true
Comment on lines +5 to +7
@tenzen-y (Member Author), Jan 22, 2023:

With these parameters, pushing new commits to the same PR forcefully stops the Jobs that are already running and starts new ones.

This reduces CI run times.
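
The cancellation behavior described in this comment can be modeled in a few lines. This is a minimal Python sketch, not GitHub's implementation: the `Scheduler` class and its method are hypothetical, and the ref strings are assumed examples of what `github.ref` looks like for pull requests. It only illustrates that runs sharing the same `group` key replace each other when `cancel-in-progress` is true, while runs in other groups are left alone.

```python
from dataclasses import dataclass, field


@dataclass
class Scheduler:
    """Toy model of a CI scheduler with concurrency groups (hypothetical)."""

    cancel_in_progress: bool = True
    running: dict = field(default_factory=dict)    # group key -> run id
    cancelled: list = field(default_factory=list)  # run ids that were stopped

    def start(self, workflow: str, ref: str, run_id: int) -> None:
        # Mirrors `group: ${{ github.workflow }}-${{ github.ref }}`.
        group = f"{workflow}-{ref}"
        previous = self.running.get(group)
        if previous is not None and self.cancel_in_progress:
            # An in-progress run in the same group is stopped.
            self.cancelled.append(previous)
        self.running[group] = run_id


scheduler = Scheduler()
scheduler.start("integration test", "refs/pull/1736/merge", run_id=1)
# A new push to the same PR lands in the same group and cancels run 1.
scheduler.start("integration test", "refs/pull/1736/merge", run_id=2)
# A different (made-up) PR has its own group and is unaffected.
scheduler.start("integration test", "refs/pull/1700/merge", run_id=3)

print(scheduler.cancelled)
print(scheduler.running)
```

Because the key includes the ref, only runs for the same PR compete with each other.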


jobs:
  integration-test:
    runs-on: ubuntu-latest
    strategy:
      fail-fast: false
      matrix:
        kubernetes-version: ["v1.23.12", "v1.24.6", "v1.25.2"]
        # TODO (tenzen-y): Add volcano.
        gang-scheduler-name: ["none", "scheduler-plugins"]
Member:

Good idea to extend this to volcano in the future.

Member Author:

Yes, we should run E2E with volcano, although we can follow up in other PRs.
I will create an issue to track this.

Member Author:

I created #1738.
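
To make the size of this matrix concrete, here is a short Python sketch that enumerates the combinations the workflow would run. The variable names are illustrative only; the values are copied from the `matrix` block above.

```python
from itertools import product

kubernetes_versions = ["v1.23.12", "v1.24.6", "v1.25.2"]
gang_scheduler_names = ["none", "scheduler-plugins"]

# Each matrix job is one (kubernetes-version, gang-scheduler-name) pair.
jobs = [
    {"kubernetes-version": version, "gang-scheduler-name": scheduler}
    for version, scheduler in product(kubernetes_versions, gang_scheduler_names)
]

for job in jobs:
    print(job)
print(len(jobs))
```

Three versions times two scheduler settings gives six jobs per run; adding a third scheduler such as volcano would raise that to nine. With `fail-fast: false`, a failure in one combination does not cancel the others.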


    steps:
      - name: Checkout
        uses: actions/checkout@v3

      - name: Create k8s Kind Cluster
        uses: helm/[email protected]
        with:
@@ -25,15 +32,19 @@ jobs:
          ./scripts/gha/build-image.sh
        env:
          TRAINING_CI_IMAGE: kubeflowtraining/training-operator:test

      - name: Deploy training operator
        run: |
          ./scripts/gha/setup-training-operator.sh
        env:
          KIND_CLUSTER: training-operator-cluster
          TRAINING_CI_IMAGE: kubeflowtraining/training-operator:test
          GANG_SCHEDULER_NAME: ${{ matrix.gang-scheduler-name }}
          KUBERNETES_VERSION: ${{ matrix.kubernetes-version }}

      - name: Run tests
        run: |
          pip install pytest
          python3 -m pip install -e sdk/python; pytest sdk/python/test --log-cli-level=info
        env:
          GANG_SCHEDULER_NAME: ${{ matrix.gang-scheduler-name }}
2 changes: 1 addition & 1 deletion pkg/controller.v1/mxnet/mxjob_controller.go
@@ -372,7 +372,7 @@ func (r *MXJobReconciler) UpdateJobStatus(job interface{}, replicas map[commonv1
 		}
 	}

-	//check whether mxnet singleHost training
+	// check whether mxnet singleHost training
 	singleTraining := r.isSingleWorker(replicas)

 	for rtype, spec := range replicas {
29 changes: 28 additions & 1 deletion scripts/gha/setup-training-operator.sh
@@ -22,7 +22,6 @@ set -o errexit
set -o nounset
set -o pipefail


echo "Kind load newly locally built image"
# use cluster name which is used in github actions kind create
kind load docker-image ${TRAINING_CI_IMAGE} --name ${KIND_CLUSTER}
@@ -34,11 +33,39 @@ kustomize edit set image kubeflow/training-operator=${TRAINING_CI_IMAGE}
echo "Installing training operator manifests"
kustomize build . | kubectl apply -f -

if [ "${GANG_SCHEDULER_NAME}" = "scheduler-plugins" ]; then
  echo "Installing Scheduler Plugins..."
  # We need to use latest helm chart since older helm chart has bugs in RBAC.
  git clone https://github.com/kubernetes-sigs/scheduler-plugins.git
  pushd scheduler-plugins/manifests/install/charts

  # We need to use a values.yaml for v1.23 if K8S cluster version is v1.23.x since latest helm chart does not have compatible with v1.23.
  # TODO (tenzen-y): Once we stop supporting v1.23, we must remove the below:
  K8S_MINOR=$(echo "${KUBERNETES_VERSION}" | cut -d . -f 2)
  if [ "$K8S_MINOR" = "23" ]; then \
    helm install \
      -f https://raw.githubusercontent.com/kubernetes-sigs/scheduler-plugins/release-1.23/manifests/install/charts/as-a-second-scheduler/values.yaml \
      scheduler-plugins as-a-second-scheduler/
  else
    helm install scheduler-plugins as-a-second-scheduler/
  fi
  popd
  rm -rf scheduler-plugins

  echo "Configure gang-scheduling using scheduler-plugins to training-operator"
  kubectl patch -n kubeflow deployments training-operator --type='json' \
    -p='[{"op": "add", "path": "/spec/template/spec/containers/0/command/1", "value": "--gang-scheduler-name=scheduler-plugins"}]'
fi

TIMEOUT=30
until kubectl get pods -n kubeflow | grep training-operator | grep 1/1 || [[ $TIMEOUT -eq 1 ]]; do
  sleep 10
  TIMEOUT=$(( TIMEOUT - 1 ))
done
if [ "${GANG_SCHEDULER_NAME}" = "scheduler-plugins" ]; then
  kubectl wait pods --for=condition=ready -n scheduler-plugins --timeout "${TIMEOUT}s" --all
fi

kubectl version
kubectl cluster-info
kubectl get nodes
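
Two steps of this script are easy to misread: the minor-version extraction with `cut`, and the JSON patch that adds the scheduler flag. The Python sketch below imitates both under stated assumptions. The helper names are hypothetical, the `deployment` fragment and its `/manager` command are made up for illustration (the real manifest may differ), and `json_patch_add` handles only the small subset of RFC 6902 needed here (no `~0`/`~1` escapes, no `-` index).

```python
import copy


def k8s_minor(version: str) -> str:
    """Rough equivalent of `cut -d . -f 2` for versions shaped like vX.Y.Z."""
    return version.split(".")[1]


def json_patch_add(document: dict, path: str, value) -> dict:
    """Apply one JSON Patch "add" operation and return a patched copy."""
    patched = copy.deepcopy(document)
    *parents, last = path.lstrip("/").split("/")
    target = patched
    for token in parents:
        target = target[int(token)] if isinstance(target, list) else target[token]
    if isinstance(target, list):
        # "add" at an array index inserts before that index; it does not overwrite.
        target.insert(int(last), value)
    else:
        target[last] = value
    return patched


# Hypothetical Deployment fragment with a single-element command list.
deployment = {
    "spec": {"template": {"spec": {"containers": [{"command": ["/manager"]}]}}}
}
patched = json_patch_add(
    deployment,
    "/spec/template/spec/containers/0/command/1",
    "--gang-scheduler-name=scheduler-plugins",
)

print(k8s_minor("v1.23.12"))
print(patched["spec"]["template"]["spec"]["containers"][0]["command"])
```

The point of the `/command/1` path is that the flag is inserted as the second element of the container command, leaving the existing first element in place.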
174 changes: 170 additions & 4 deletions sdk/python/kubeflow/training/api/training_client.py
@@ -752,12 +752,12 @@ def list_tfjobs(

         Args:
             namespace: Namespace to list the TFJobs.
+            timeout: Optional, Kubernetes API server timeout in seconds
+                to execute the request.

         Returns:
             list[KubeflowOrgV1TFJob]: List of TFJobs objects. It returns
             empty list if TFJobs cannot be found.
-            timeout: Optional, Kubernetes API server timeout in seconds
-                to execute the request.

         Raises:
             TimeoutError: Timeout to list TFJobs.
@@ -802,6 +802,33 @@ def delete_tfjob(
            delete_options=delete_options,
        )

    def patch_tfjob(
        self,
        tfjob: models.KubeflowOrgV1TFJob,
        name: str,
        namespace: str = utils.get_default_target_namespace(),
    ):
        """Patch the TFJob.

        Args:
            tfjob: TFJob object of type KubeflowOrgV1TFJob to patch.
            name: Name for the TFJob.
            namespace: Namespace for the TFJob.

        Raises:
            TimeoutError: Timeout to patch TFJob.
            RuntimeError: Failed to patch TFJob.
        """

        return utils.patch_job(
            custom_api=self.custom_api,
            job=tfjob,
            name=name,
            namespace=namespace,
            job_kind=constants.TFJOB_KIND,
            job_plural=constants.TFJOB_PLURAL,
        )

    # ------------------------------------------------------------------------ #
    # PyTorchJob Training Client APIs.
    # ------------------------------------------------------------------------ #
@@ -1000,6 +1027,33 @@ def delete_pytorchjob(
            delete_options=delete_options,
        )

    def patch_pytorchjob(
        self,
        pytorchjob: models.KubeflowOrgV1PyTorchJob,
        name: str,
        namespace: str = utils.get_default_target_namespace(),
    ):
        """Patch the PyTorchJob.

        Args:
            pytorchjob: PyTorchJob object of type KubeflowOrgV1PyTorchJob.
            name: Name for the PyTorchJob.
            namespace: Namespace for the PyTorchJob.

        Raises:
            TimeoutError: Timeout to patch PyTorchJob.
            RuntimeError: Failed to patch PyTorchJob.
        """

        return utils.patch_job(
            custom_api=self.custom_api,
            job=pytorchjob,
            name=name,
            namespace=namespace,
            job_kind=constants.PYTORCHJOB_KIND,
            job_plural=constants.PYTORCHJOB_PLURAL,
        )

    # ------------------------------------------------------------------------ #
    # MXJob Training Client APIs.
    # ------------------------------------------------------------------------ #
@@ -1044,6 +1098,8 @@ def get_mxjob(
        Args:
            name: Name for the MXJob.
            namespace: Namespace for the MXJob.
            timeout: Optional, Kubernetes API server timeout in seconds
                to execute the request.

        Returns:
            KubeflowOrgV1MXJob: MXJob object.
@@ -1123,6 +1179,33 @@ def delete_mxjob(
            delete_options=delete_options,
        )

    def patch_mxjob(
        self,
        mxjob: models.KubeflowOrgV1MXJob,
        name: str,
        namespace: str = utils.get_default_target_namespace(),
    ):
        """Patch the MXJob.

        Args:
            mxjob: MXJob object of type KubeflowOrgV1MXJob.
            name: Name for the MXJob.
            namespace: Namespace for the MXJob.

        Raises:
            TimeoutError: Timeout to patch MXJob.
            RuntimeError: Failed to patch MXJob.
        """

        return utils.patch_job(
            custom_api=self.custom_api,
            job=mxjob,
            name=name,
            namespace=namespace,
            job_kind=constants.MXJOB_KIND,
            job_plural=constants.MXJOB_PLURAL,
        )

    # ------------------------------------------------------------------------ #
    # XGBoostJob Training Client APIs.
    # ------------------------------------------------------------------------ #
@@ -1248,6 +1331,33 @@ def delete_xgboostjob(
            delete_options=delete_options,
        )

    def patch_xgboostjob(
        self,
        xgboostjob: models.KubeflowOrgV1XGBoostJob,
        name: str,
        namespace: str = utils.get_default_target_namespace(),
    ):
        """Patch the XGBoostJob.

        Args:
            xgboostjob: XGBoostJob object of type KubeflowOrgV1XGBoostJob.
            name: Name for the XGBoostJob.
            namespace: Namespace for the XGBoostJob.

        Raises:
            TimeoutError: Timeout to patch XGBoostJob.
            RuntimeError: Failed to patch XGBoostJob.
        """

        return utils.patch_job(
            custom_api=self.custom_api,
            job=xgboostjob,
            name=name,
            namespace=namespace,
            job_kind=constants.XGBOOSTJOB_KIND,
            job_plural=constants.XGBOOSTJOB_PLURAL,
        )

    # ------------------------------------------------------------------------ #
    # MPIJob Training Client APIs.
    # ------------------------------------------------------------------------ #
@@ -1323,12 +1433,12 @@ def list_mpijobs(

         Args:
             namespace: Namespace to list the MPIJobs.
+            timeout: Optional, Kubernetes API server timeout in seconds
+                to execute the request.

         Returns:
             list[KubeflowOrgV1MPIJob]: List of MPIJobs objects. It returns
             empty list if MPIJobs cannot be found.
-            timeout: Optional, Kubernetes API server timeout in seconds
-                to execute the request.
Comment on lines 1434 to -1331
Member Author:

The timeout seems to belong under Args, not Returns.

Member:

Nice catch!


         Raises:
             TimeoutError: Timeout to list MPIJobs.
@@ -1373,6 +1483,33 @@ def delete_mpijob(
            delete_options=delete_options,
        )

    def patch_mpijob(
        self,
        mpijob: models.KubeflowOrgV1MPIJob,
        name: str,
        namespace: str = utils.get_default_target_namespace(),
    ):
        """Patch the MPIJob.

        Args:
            mpijob: MPIJob object of type KubeflowOrgV1MPIJob.
            name: Name for the MPIJob.
            namespace: Namespace for the MPIJob.

        Raises:
            TimeoutError: Timeout to patch MPIJob.
            RuntimeError: Failed to patch MPIJob.
        """

        return utils.patch_job(
            custom_api=self.custom_api,
            job=mpijob,
            name=name,
            namespace=namespace,
            job_kind=constants.MPIJOB_KIND,
            job_plural=constants.MPIJOB_PLURAL,
        )

    # ------------------------------------------------------------------------ #
    # PaddleJob Training Client APIs.
    # ------------------------------------------------------------------------ #
@@ -1417,6 +1554,8 @@ def get_paddlejob(
        Args:
            name: Name for the PaddleJob.
            namespace: Namespace for the PaddleJob.
            timeout: Optional, Kubernetes API server timeout in seconds
                to execute the request.

        Returns:
            KubeflowOrgV1PaddleJob: PaddleJob object.
@@ -1495,3 +1634,30 @@ def delete_paddlejob(
            job_plural=constants.PADDLEJOB_PLURAL,
            delete_options=delete_options,
        )

    def patch_paddlejob(
        self,
        paddlejob: models.KubeflowOrgV1PaddleJob,
        name: str,
        namespace: str = utils.get_default_target_namespace(),
    ):
        """Patch the PaddleJob.

        Args:
            paddlejob: PaddleJob object of type KubeflowOrgV1PaddleJob.
            name: Name for the PaddleJob.
            namespace: Namespace for the PaddleJob.

        Raises:
            TimeoutError: Timeout to patch PaddleJob.
            RuntimeError: Failed to patch PaddleJob.
        """

        return utils.patch_job(
            custom_api=self.custom_api,
            job=paddlejob,
            name=name,
            namespace=namespace,
            job_kind=constants.PADDLEJOB_KIND,
            job_plural=constants.PADDLEJOB_PLURAL,
        )
1 change: 0 additions & 1 deletion sdk/python/kubeflow/training/constants/constants.py
@@ -91,7 +91,6 @@
    "docker.io/paddlepaddle/paddle:2.4.0rc0-gpu-cuda11.2-cudnn8.1-trt8.0"
)


# Dictionary to get plural and model for each Job kind.
JOB_KINDS = {
    TFJOB_KIND: {"plural": TFJOB_PLURAL, "model": models.KubeflowOrgV1TFJob},
31 changes: 31 additions & 0 deletions sdk/python/kubeflow/training/utils/utils.py
@@ -187,6 +187,37 @@ def delete_job(
    logging.info(f"{job_kind} {namespace}/{name} has been deleted")


def patch_job(
    custom_api: client.CustomObjectsApi,
    job: object,
    name: str,
    namespace: str,
    job_kind: str,
    job_plural: str,
):
    """Patch the Training Job."""

    try:
        custom_api.patch_namespaced_custom_object(
            constants.KUBEFLOW_GROUP,
            constants.OPERATOR_VERSION,
            namespace,
            job_plural,
            name,
            job,
        )
    except multiprocessing.TimeoutError:
        raise TimeoutError(
            f"Timeout to patch {job_kind}: {namespace}/{job.metadata.name}"
        )
    except Exception:
        raise RuntimeError(
            f"Failed to patch {job_kind}: {namespace}/{job.metadata.name}"
        )

    logging.info(f"{job_kind} {namespace}/{job.metadata.name} has been patched")
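
To show how this helper maps failures onto the `TimeoutError` and `RuntimeError` documented by the `patch_*job` methods, here is a self-contained Python sketch that runs without a cluster. Everything outside the `patch_job` body is an assumption made for illustration: `FakeCustomObjectsApi` is a hypothetical stand-in for the Kubernetes client, the group and version strings are assumed values for `constants.KUBEFLOW_GROUP` and `constants.OPERATOR_VERSION`, and `"TFJob"` / `"tfjobs"` are assumed values for the kind and plural constants.

```python
import logging
import multiprocessing
from types import SimpleNamespace


class FakeCustomObjectsApi:
    """Hypothetical stand-in for kubernetes.client.CustomObjectsApi."""

    def __init__(self, error=None):
        self.error = error
        self.calls = []

    def patch_namespaced_custom_object(self, group, version, namespace, plural, name, body):
        if self.error is not None:
            raise self.error
        self.calls.append((group, version, namespace, plural, name))
        return body


def patch_job(custom_api, job, name, namespace, job_kind, job_plural):
    """Condensed copy of the helper in the diff, with assumed constants inlined."""
    try:
        custom_api.patch_namespaced_custom_object(
            "kubeflow.org", "v1", namespace, job_plural, name, job
        )
    except multiprocessing.TimeoutError:
        raise TimeoutError(
            f"Timeout to patch {job_kind}: {namespace}/{job.metadata.name}"
        )
    except Exception:
        raise RuntimeError(
            f"Failed to patch {job_kind}: {namespace}/{job.metadata.name}"
        )
    logging.info(f"{job_kind} {namespace}/{job.metadata.name} has been patched")


# Minimal object exposing only `metadata.name`, which the messages read.
job = SimpleNamespace(metadata=SimpleNamespace(name="demo"))

api = FakeCustomObjectsApi()
patch_job(api, job, "demo", "default", "TFJob", "tfjobs")

# Any non-timeout failure from the API surfaces as RuntimeError.
failing = FakeCustomObjectsApi(error=ValueError("boom"))
try:
    patch_job(failing, job, "demo", "default", "TFJob", "tfjobs")
except RuntimeError as error:
    message = str(error)
    print(message)
```

Note that the log and error messages take the job name from `job.metadata.name` rather than from the `name` argument, so the two are expected to agree.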


def wrap_log_stream(q, stream):
    while True:
        try: