feat(rm): convert Kubernetes submissions from pods to jobs (#9438)
This change updates the Kubernetes resource manager to submit one Kubernetes job per Determined allocation instead of many pods (see the sketch after this list). The change is complicated, but we think it is worth it because:
- Jobs play nicely with resource quotas and other Kubernetes features out of the box.
- Eventually we can delegate restarts, TTL, pause/resume (using suspend), and more to jobs.
- They allow us to better integrate with Kueue and other tools in the ML ecosystem.
- Supporting VolcanoJobs (or similar alternatives) alongside Jobs is realistic.
- The refactor is net positive w.r.t. test coverage (20% to 80%) and code quality.
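
For orientation, the sketch below shows the one-job-per-allocation shape using the official `kubernetes` Python client. The real resource manager is implemented in Go; the names, labels, and parameter choices here are illustrative assumptions, not the commit's code.

```python
# Illustrative sketch only -- not the resource manager's actual (Go) implementation.
# Assumes the official `kubernetes` Python client; names and labels are hypothetical.
from kubernetes import client, config


def submit_allocation_job(namespace: str, allocation_id: str, num_pods: int) -> None:
    """Submit a single Kubernetes job sized to one Determined allocation."""
    config.load_kube_config()  # use load_incluster_config() when running in-cluster
    job = client.V1Job(
        api_version="batch/v1",
        kind="Job",
        metadata=client.V1ObjectMeta(
            generate_name=f"det-{allocation_id}-",
            labels={"determined": ""},  # hypothetical label scheme
        ),
        spec=client.V1JobSpec(
            parallelism=num_pods,  # one pod per container in the allocation
            completions=num_pods,
            backoff_limit=0,  # let Determined, not Kubernetes, handle retries
            template=client.V1PodTemplateSpec(
                spec=client.V1PodSpec(
                    restart_policy="Never",
                    containers=[
                        client.V1Container(name="determined-container", image="busybox"),
                    ],
                ),
            ),
        ),
    )
    client.BatchV1Api().create_namespaced_job(namespace=namespace, body=job)
```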

This commit is the result of several PRs, enumerated here for easier discovery.
- #9296 contains most of the code changes.
- #9443 
- #9447 
- #9450 
- #9451

Co-authored-by: Carolina Calderon <[email protected]>
stoksc and carolinaecalderon authored May 31, 2024
1 parent f54fb7c commit 0fdb822
Showing 42 changed files with 4,391 additions and 3,810 deletions.
10 changes: 10 additions & 0 deletions docs/release-notes/pods-to-jobs.rst
@@ -0,0 +1,10 @@
:orphan:

**New Features**

- Kubernetes: The system now launches Kubernetes jobs on behalf of users when they submit workloads
to Determined, instead of launching Kubernetes pods. This change allows Determined to work
properly with other Kubernetes features like resource quotas.

  As a result, the Determined master now requires permissions to create, get, list, delete, and
  watch Kubernetes job resources.
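
To confirm the new permissions are in place, a check along these lines runs the equivalent of ``kubectl auth can-i`` for each verb. A minimal sketch, assuming the official `kubernetes` Python client and a kubeconfig for the account the master uses:

```python
# Check the RBAC verbs the master now needs on batch/v1 jobs.
# Assumes the official `kubernetes` Python client and a configured kubeconfig.
from kubernetes import client, config

config.load_kube_config()
authz = client.AuthorizationV1Api()
for verb in ("create", "get", "list", "delete", "watch"):
    review = client.V1SelfSubjectAccessReview(
        spec=client.V1SelfSubjectAccessReviewSpec(
            resource_attributes=client.V1ResourceAttributes(
                group="batch", resource="jobs", verb=verb
            )
        )
    )
    resp = authz.create_self_subject_access_review(review)
    print(f"{verb} jobs.batch: {'allowed' if resp.status.allowed else 'DENIED'}")
```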
19 changes: 3 additions & 16 deletions docs/setup-cluster/k8s/_index.rst
@@ -23,9 +23,10 @@ Determined master and a Postgres database in the Kubernetes cluster. Once the ma
running, you can launch :ref:`experiments <experiments>`, :ref:`notebooks <notebooks>`,
:ref:`TensorBoards <tensorboards>`, :ref:`commands <commands-and-shells>`, and :ref:`shells
<commands-and-shells>`. When new workloads are submitted to the Determined master, the master
launches pods and configMaps on the Kubernetes cluster to execute those workloads. Users of
launches jobs and config maps on the Kubernetes cluster to execute those workloads. Users of
Determined shouldn't need to interact with Kubernetes directly after installation, as Determined
handles all the necessary interaction with the Kubernetes cluster.
handles all the necessary interaction with the Kubernetes cluster. Kubernetes creates and cleans up
pods for all jobs that Determined may request.

It is also important to note that when running Determined on Kubernetes, a higher priority value
means a higher priority (e.g. a priority 50 task will run before a priority 40 task). This is
@@ -138,20 +139,6 @@ for diagnosing any issues that arise during installation.
# Get logs for the pod running the Determined master.
kubectl logs <determined-master-pod-name>
Get All Running Task Pods
=========================

These ``kubectl`` commands list and delete pods which are running Determined tasks:

.. code:: bash
# Get all pods that are running Determined tasks.
kubectl get pods -l=determined
# Delete all Determined task pods. Users should never have to run this,
# unless they are removing a deployment of Determined.
kubectl get pods --no-headers=true -l=determined | awk '{print $1}' | xargs kubectl delete pod
.. toctree::
:maxdepth: 1
:hidden:
4 changes: 2 additions & 2 deletions docs/setup-cluster/k8s/custom-pod-specs.rst
@@ -5,8 +5,8 @@
#################

In a :ref:`Determined cluster running on Kubernetes <determined-on-kubernetes>`, tasks (e.g.,
experiments, notebooks) are executed by launching one or more Kubernetes pods. You can customize
these pods by providing custom `pod specs
experiments, notebooks) are executed by launching a Kubernetes job. These jobs launch one or more
Kubernetes pods. You can customize these pods by providing custom `pod specs
<https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.23/#pod-v1-core>`__. Common use
cases include assigning pods to specific nodes, specifying additional volume mounts, and attaching
permissions. Configuring pod specs is not required to use Determined on Kubernetes.
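As a rough illustration of what such a customization can contain, the sketch below assembles a pod spec with a node selector and an extra volume mount using the `kubernetes` client models. The container name and field placement are assumptions; in practice, the equivalent YAML goes into the task configuration rather than Python.

```python
# Illustrative pod-spec fragment built with `kubernetes` client models.
# The container name is an assumption; Determined consumes the equivalent
# YAML from the task configuration rather than this Python object.
from kubernetes import client

pod_spec = client.V1Pod(
    metadata=client.V1ObjectMeta(labels={"team": "ml-research"}),
    spec=client.V1PodSpec(
        node_selector={"gpu-type": "a100"},  # assign pods to specific nodes
        containers=[
            client.V1Container(
                name="determined-container",  # assumed container name
                volume_mounts=[
                    client.V1VolumeMount(name="datasets", mount_path="/data"),
                ],
            ),
        ],
        volumes=[
            client.V1Volume(
                name="datasets",
                host_path=client.V1HostPathVolumeSource(path="/mnt/datasets"),
            ),
        ],
    ),
)
```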
27 changes: 19 additions & 8 deletions docs/setup-cluster/k8s/helm-commands.rst
@@ -76,20 +76,31 @@ for diagnosing any issues that arise during installation.
# Get logs for the pod running the Determined master.
kubectl logs <determined-master-pod-name>
***************************
Get All Running Task Pods
***************************
*********************************************
Get All Determined-launched Kubernetes Jobs
*********************************************

These ``kubectl`` commands list and delete pods which are running Determined tasks:
On Kubernetes, Determined starts a job for each task, and each job owns one or more pods. These
``kubectl`` commands list and delete the jobs and pods that are running Determined tasks:

.. code:: bash
# Get all pods that are running Determined tasks.
kubectl get pods -l=determined
# Get all jobs that are running Determined tasks.
kubectl get jobs -l=determined
# Get all pods associated with a given job.
kubectl get pods -l="batch.kubernetes.io/job-name=<determined-job-name>"
# Delete all Determined task pods. Users should never have to run this,
# Delete all Determined jobs for all tasks for ALL clusters. Users should never have to run this,
# unless they are removing a deployment of Determined.
kubectl get pods --no-headers=true -l=determined | awk '{print $1}' | xargs kubectl delete pod
kubectl get jobs --no-headers=true -l=determined | awk '{print $1}' | xargs kubectl delete jobs
# Get logs for a Determined task that made it to STDOUT or STDERR. Most logs are shipped to the
# Determined API server but logs that can't be shipped still go here. This is useful for debugging
# log shipping failures.
# For Determined tasks that require multiple pods, this will return logs for only one pod. It is
# recommended that you search the logs for each pod individually.
kubectl logs jobs/<determined-job-name>
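
The same inventory is available programmatically. A short sketch, assuming the official `kubernetes` Python client, with the label names taken from the commands above:

```python
# List Determined-launched jobs and the pods belonging to each.
# Assumes the official `kubernetes` Python client and cluster access.
from kubernetes import client, config

config.load_kube_config()
batch, core = client.BatchV1Api(), client.CoreV1Api()

for job in batch.list_job_for_all_namespaces(label_selector="determined").items:
    pods = core.list_namespaced_pod(
        namespace=job.metadata.namespace,
        label_selector=f"batch.kubernetes.io/job-name={job.metadata.name}",
    )
    print(job.metadata.name, [p.metadata.name for p in pods.items])
```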
***************************
Useful Debugging Commands
77 changes: 74 additions & 3 deletions harness/determined/exec/prep_container.py
@@ -111,6 +111,60 @@ def do_rendezvous_slurm(
)


def do_rendezvous_kubernetes(
    sess: api.Session,
    allocation_id: str,
    resources_id: str,
) -> "det.RendezvousInfo":
    job_parallelism_str = os.environ.get("DET_KUBERNETES_JOB_PARALLELISM")
    assert job_parallelism_str, "Unable to rendezvous without DET_KUBERNETES_JOB_PARALLELISM"
    job_parallelism = int(job_parallelism_str)

    pod_ip_str = os.environ.get("DET_KUBERNETES_POD_IP")
    assert pod_ip_str, "Unable to rendezvous without DET_KUBERNETES_POD_IP"

    num_slots_str = os.environ.get("DET_SLOT_IDS")
    assert num_slots_str, "Unable to rendezvous without DET_SLOT_IDS"
    num_slots = len(json.loads(num_slots_str))

    request_uuid = str(uuid.uuid4())
    resp = bindings.post_AllocationAllGather(
        sess,
        allocationId=allocation_id,
        body=bindings.v1AllocationAllGatherRequest(
            allocationId=allocation_id,
            requestUuid=request_uuid,
            numPeers=job_parallelism,
            data={
                # We use the lexicographic order of request IDs to
                # agree on ranks among peers, so they all need it.
                "request_uuid": request_uuid,
                "rendezvous_ip": pod_ip_str,
                "slots": num_slots,
            },
        ),
    )

    # TODO(RM-306): Use indexed completions and JOB_COMPLETION_INDEX to get pod rank.
    data_by_rank = []
    our_rank = None
    for i, d in enumerate(sorted(resp.data, key=lambda d: str(d["request_uuid"]))):
        if d["request_uuid"] == request_uuid:
            our_rank = i
        data_by_rank.append(d)
    assert our_rank is not None, "rendezvous was missing our own information"
    assert len(data_by_rank) == job_parallelism, "didn't receive enough peers from rendezvous"

    addrs = [d["rendezvous_ip"] for d in data_by_rank]
    slots = [d["slots"] for d in data_by_rank]

    return det.RendezvousInfo(
        container_addrs=addrs,
        container_rank=our_rank,
        container_slot_counts=slots,
    )


# On HPC, the "launcher" tells the Determined Master that the job is "Running"
# as soon as the workload manager (e.g., Slurm, PBS, etc) starts running the job.
# However, if the container is not already cached on the compute node, it will
@@ -194,7 +248,7 @@ def get_eth_interface_name() -> Optional[str]:


# The canonical definitions of these consts live in Go code.
RESOURCES_TYPE_K8S_POD = "k8s-pod"
RESOURCES_TYPE_K8S_JOB = "k8s-job"
RESOURCES_TYPE_DOCKER_CONTAINER = "docker-container"
RESOURCES_TYPE_SLURM_JOB = "slurm-job"

@@ -207,10 +261,12 @@ def do_rendezvous(sess: api.Session, allocation_id: str) -> None:
    assert r_type, "Unable to complete rendezvous info without DET_RESOURCES_TYPE"

    rendezvous_info = None
    if r_type == RESOURCES_TYPE_DOCKER_CONTAINER or r_type == RESOURCES_TYPE_K8S_POD:
    if r_type == RESOURCES_TYPE_DOCKER_CONTAINER:
        rendezvous_info = do_rendezvous_rm_provided(sess, allocation_id, r_id)
    elif r_type == RESOURCES_TYPE_SLURM_JOB:
        rendezvous_info = do_rendezvous_slurm(sess, allocation_id, r_id)
    elif r_type == RESOURCES_TYPE_K8S_JOB:
        rendezvous_info = do_rendezvous_kubernetes(sess, allocation_id, r_id)
    else:
        raise ValueError(f"unsupported resources type: {r_type}")

@@ -251,14 +307,29 @@ def set_proxy_address(sess: api.Session, allocation_id: str) -> None:
)


def set_proxy_address_kubernetes(sess: api.Session, allocation_id: str) -> None:
    pod_ip_str = os.environ.get("DET_KUBERNETES_POD_IP")
    assert pod_ip_str, "Unable to set the proxy address without DET_KUBERNETES_POD_IP"

    bindings.post_PostAllocationProxyAddress(
        sess,
        allocationId=allocation_id,
        body=bindings.v1PostAllocationProxyAddressRequest(
            proxyAddress=pod_ip_str,
        ),
    )


def do_proxy(sess: api.Session, allocation_id: str) -> None:
    r_type = os.environ.get("DET_RESOURCES_TYPE")
    assert r_type, "Unable to complete rendezvous info without DET_RESOURCES_TYPE"

    if r_type == RESOURCES_TYPE_DOCKER_CONTAINER or r_type == RESOURCES_TYPE_K8S_POD:
    if r_type == RESOURCES_TYPE_DOCKER_CONTAINER:
        return
    elif r_type == RESOURCES_TYPE_SLURM_JOB:
        set_proxy_address(sess, allocation_id)
    elif r_type == RESOURCES_TYPE_K8S_JOB:
        set_proxy_address_kubernetes(sess, allocation_id)
    else:
        raise ValueError(f"unsupported resources type: {r_type}")

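The rank-agreement trick in ``do_rendezvous_kubernetes`` above is easy to see in isolation: every peer contributes a random UUID to the all-gather, every peer sorts the gathered entries the same way, and each peer's rank is the index of its own UUID. A standalone sketch, not part of the commit:

```python
# Standalone illustration of deterministic rank agreement via sorted UUIDs.
import uuid

# Simulate the data each of three peers contributes to the all-gather.
peers = [
    {"request_uuid": str(uuid.uuid4()), "rendezvous_ip": f"10.0.0.{i}"}
    for i in range(3)
]

# Every peer sorts the same gathered data, so all peers agree on the
# ordering and therefore on who holds which rank.
ordered = sorted(peers, key=lambda d: d["request_uuid"])
for rank, peer in enumerate(ordered):
    print(rank, peer["rendezvous_ip"])
```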
3 changes: 3 additions & 0 deletions helm/charts/determined/templates/master-permissions.yaml
@@ -35,6 +35,9 @@ rules:
  - apiGroups: [""]
    resources: ["nodes"]
    verbs: ["list", "watch", "patch"]
  - apiGroups: ["batch"]
    resources: ["jobs"]
    verbs: ["create", "get", "list", "delete", "watch"]


---
2 changes: 1 addition & 1 deletion master/.gitignore
@@ -20,7 +20,7 @@ vendor/
# Test binary, build with `go test -c`
*.test

# Output of the go coverage tool, specifically when used with LiteIDE
# Output of the go coverage tool
*.out

# VS Code Workspace
Expand Down
4 changes: 3 additions & 1 deletion master/Makefile
@@ -5,7 +5,7 @@ SCHEMA_INPUTS = ../schemas/gen.py $(shell find ./pkg/schemas/ -name 'zgen_*.go'
STREAM_INPUTS = $(shell find ./internal/stream/ -name '*_test.go' -prune -o -name '*.go' -print)
STREAM_PYTHON_CLIENT = ../harness/determined/common/streams/wire.py
STREAM_TS_CLIENT = ../webui/react/src/services/stream/wire.ts
MOCK_INPUTS = ./internal/sproto/task.go ./internal/db/database.go ./internal/command/authz_iface.go ../go.mod ../go.sum ./internal/rm/resource_manager_iface.go ./internal/task/allocation_service_iface.go
MOCK_INPUTS = Makefile ./internal/sproto/task.go ./internal/db/database.go ./internal/command/authz_iface.go ../go.mod ../go.sum ./internal/rm/resource_manager_iface.go ./internal/task/allocation_service_iface.go
GORELEASER = goreleaser

export VERSION := $(shell cat ../VERSION)
@@ -99,11 +99,13 @@ build/mock_gen.stamp: $(MOCK_INPUTS)
mockery --quiet --name=PodInterface --srcpkg=k8s.io/client-go/kubernetes/typed/core/v1 --output internal/mocks --filename pod_iface.go
mockery --quiet --name=EventInterface --srcpkg=k8s.io/client-go/kubernetes/typed/core/v1 --output internal/mocks --filename event_iface.go
mockery --quiet --name=NodeInterface --srcpkg=k8s.io/client-go/kubernetes/typed/core/v1 --output internal/mocks --filename node_iface.go
mockery --quiet --name=JobInterface --srcpkg=k8s.io/client-go/kubernetes/typed/batch/v1 --output internal/mocks --filename job_iface.go
mockery --quiet --name=ResourceManager --dir=internal/rm --output internal/mocks --filename rm.go
mockery --quiet --name=AllocationService --dir=internal/task --output internal/mocks/allocationmocks --filename allocation_service.go --outpkg allocationmocks
mockery --quiet --name=ResourceManagerAuthZ --dir=internal/rm --output internal/mocks --filename rm_authz_iface.go
mockery --quiet --name=Interface --output internal/mocks --srcpkg "k8s.io/client-go/kubernetes" --filename k8s_clientset.go --structname K8sClientsetInterface
mockery --quiet --name=CoreV1Interface --output internal/mocks --srcpkg "k8s.io/client-go/kubernetes/typed/core/v1" --filename k8s_corev1_iface.go --structname K8sCoreV1Interface
mockery --quiet --name=BatchV1Interface --output internal/mocks --srcpkg "k8s.io/client-go/kubernetes/typed/batch/v1" --filename k8s_batchv1_iface.go --structname K8sBatchV1Interface

mkdir -p build
touch $@
2 changes: 1 addition & 1 deletion master/internal/db/postgres_agent_intg_test.go
@@ -112,7 +112,7 @@ func TestEndAllAgentStats(t *testing.T) {
setTimesTo(a0, a0Start, nil)

// Cluster heartbeat between these.
// TODO(!!!) make cluster heartbeat a timestamptz.
// TODO(nickb): make cluster heartbeat a timestamptz.
_, err := db.GetOrCreateClusterID("")
require.NoError(t, err)
heartBeatTime := time.Date(2021, 10, 10, 0, 0, 0, 0, time.Local).Truncate(time.Millisecond)
2 changes: 1 addition & 1 deletion master/internal/db/postgres_experiments_intg_test.go
@@ -437,7 +437,7 @@ func TestProjectHyperparameters(t *testing.T) {

require.NoError(t,
RemoveProjectHyperparameters(ctx, nil, []int32{int32(exp1.ID)}))
require.ElementsMatch(t, []string{}, // TODO(!!!) this is a bug in the query.
require.ElementsMatch(t, []string{}, // TODO(nickb): This is a bug in the query.
RequireGetProjectHParams(t, db, projectID))
}

@@ -119,7 +119,7 @@ func TestGetDefaultPoolsForWorkspace(t *testing.T) {
MustMigrateTestPostgres(t, pgDB, MigrationsFromDB)

comp, aux, err := GetDefaultPoolsForWorkspace(ctx, -1)
require.NoError(t, err) // TODO(!!!) we should return errors for these cases.
require.NoError(t, err) // TODO(nickb): We should return errors for these cases.
require.Equal(t, "", comp)
require.Equal(t, "", aux)

2 changes: 1 addition & 1 deletion master/internal/rm/agentrm/resource_pool.go
@@ -155,7 +155,7 @@ func (rp *resourcePool) allocateRequest(msg sproto.AllocateRequest) {
log.WithError(err).Error("error restoring resources")

// Clear out the state / close and terminate the allocation.
rmevents.Publish(msg.AllocationID, &sproto.ResourcesRestoreError{
rmevents.Publish(msg.AllocationID, &sproto.ResourcesFailedError{
FailureType: sproto.RestoreError,
ErrMsg: err.Error(),
ExitCode: nil,
6 changes: 5 additions & 1 deletion master/internal/rm/kubernetesrm/informer.go
@@ -88,7 +88,11 @@ func newEventInformer(
"namespace": namespace,
})
for i := range events.Items {
syslog.Debugf("informer added event: %s", events.Items[i].Name)
syslog.Debugf(
"informer added %s event: %s",
events.Items[i].InvolvedObject.Kind,
events.Items[i].Name,
)
cb(watch.Event{Object: &events.Items[i]})
}

