ENG-29 get_available_jobs_count should take all pods into account #909

Merged 13 commits on May 10, 2024
Changes from 10 commits
4 changes: 0 additions & 4 deletions charts/platform-monitoring/templates/_helpers.tpl
@@ -120,10 +120,6 @@ release: {{ .Release.Name | quote }}
- name: SENTRY_SAMPLE_RATE
value: {{ .Values.sentry.sampleRate | default 0 | quote }}
{{- end }}
{{- if .Values.nodeLabels.job }}
- name: NP_MONITORING_NODE_LABEL_JOB
value: {{ .Values.nodeLabels.job }}
{{- end }}
{{- if .Values.nodeLabels.nodePool }}
- name: NP_MONITORING_NODE_LABEL_NODE_POOL
value: {{ .Values.nodeLabels.nodePool }}
2 changes: 1 addition & 1 deletion charts/platform-monitoring/templates/fluent-bit.yaml
@@ -128,7 +128,7 @@ spec:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
- matchExpressions:
- key: {{ .Values.nodeLabels.job | quote }}
- key: {{ .Values.nodeLabels.nodePool | quote }}
operator: Exists
{{- if .Values.fluentbit.tolerations }}
tolerations: {{ toYaml .Values.fluentbit.tolerations | nindent 8 }}
1 change: 0 additions & 1 deletion charts/platform-monitoring/values.yaml
@@ -49,7 +49,6 @@ sentry:
sampleRate: 0.002

nodeLabels:
job: platform.neuromation.io/job
nodePool: platform.neuromation.io/nodepool

fluentbit:
1 change: 0 additions & 1 deletion minikube.sh
@@ -15,7 +15,6 @@ function minikube::start {
--wait-timeout=5m
kubectl config use-context minikube
kubectl get nodes -o name | xargs -I {} kubectl label {} --overwrite \
platform.neuromation.io/job=true \
platform.neuromation.io/nodepool=minikube
}

1 change: 0 additions & 1 deletion platform_monitoring/api.py
@@ -820,7 +820,6 @@ async def _init_app(app: aiohttp.web.Application) -> AsyncIterator[None]:
kube_client=kube_client,
container_runtime_client_registry=container_runtime_client_registry,
cluster_name=config.cluster_name,
kube_job_label=config.kube.job_label,
kube_node_pool_label=config.kube.node_pool_label,
)
app[MONITORING_APP_KEY][JOBS_SERVICE_KEY] = jobs_service
1 change: 0 additions & 1 deletion platform_monitoring/config.py
@@ -86,7 +86,6 @@ class KubeConfig:
kubelet_node_port: int = 10250
nvidia_dcgm_node_port: int | None = None

job_label: str = "platform.neuromation.io/job"
node_pool_label: str = "platform.neuromation.io/nodepool"


3 changes: 0 additions & 3 deletions platform_monitoring/config_factory.py
@@ -170,9 +170,6 @@ def _create_kube(self) -> KubeConfig:
if "NP_MONITORING_K8S_NVIDIA_DCGM_PORT" in self._environ
else KubeConfig.nvidia_dcgm_node_port
),
job_label=self._environ.get(
"NP_MONITORING_NODE_LABEL_JOB", KubeConfig.job_label
),
node_pool_label=self._environ.get(
"NP_MONITORING_NODE_LABEL_NODE_POOL", KubeConfig.node_pool_label
),
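
Note: the node-pool label that remains is resolved the same way the removed job label used to be, with the environment variable taking precedence over the `KubeConfig` default. A minimal sketch of that fallback, with `KubeConfig` modelled here as a frozen dataclass purely for illustration (the actual factory class is not part of this diff):

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class KubeConfig:
    # Default defined in platform_monitoring/config.py
    node_pool_label: str = "platform.neuromation.io/nodepool"


# Same fallback pattern as _create_kube above: env var first, dataclass default second.
node_pool_label = os.environ.get(
    "NP_MONITORING_NODE_LABEL_NODE_POOL", KubeConfig.node_pool_label
)
```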
119 changes: 75 additions & 44 deletions platform_monitoring/jobs_service.py
@@ -1,8 +1,8 @@
import asyncio
from collections import defaultdict
from collections.abc import AsyncGenerator, AsyncIterator, Mapping, Sequence
from contextlib import AbstractAsyncContextManager, asynccontextmanager
from dataclasses import dataclass
from functools import reduce

import aiohttp
from neuro_config_client import ConfigClient, ResourcePoolType
@@ -14,7 +14,14 @@
ContainerRuntimeClientRegistry,
ContainerRuntimeError,
)
from .kube_client import JobNotFoundException, KubeClient, Pod, Resources
from .kube_client import (
DEFAULT_MAX_PODS_PER_NODE,
ContainerResources,
JobNotFoundException,
KubeClient,
NodeResources,
Pod,
)
from .user import User
from .utils import KubeHelper, asyncgeneratorcontextmanager

@@ -44,12 +51,12 @@ def __init__(self, name: str) -> None:
class JobsService:
def __init__(
self,
*,
config_client: ConfigClient,
jobs_client: JobsClient,
kube_client: KubeClient,
container_runtime_client_registry: ContainerRuntimeClientRegistry,
cluster_name: str,
kube_job_label: str = KubeConfig.job_label,
kube_node_pool_label: str = KubeConfig.node_pool_label,
) -> None:
self._config_client = config_client
@@ -58,7 +65,6 @@ def __init__(
self._kube_helper = KubeHelper()
self._container_runtime_client_registry = container_runtime_client_registry
self._cluster_name = cluster_name
self._kube_job_label = kube_job_label
self._kube_node_pool_label = kube_node_pool_label

async def get(self, job_id: str) -> Job:
@@ -91,7 +97,10 @@ async def save(
cont_id = pod.get_container_id(pod_name)
assert cont_id

runtime_client = await self._container_runtime_client_registry.get(pod.host_ip)
assert pod.status.host_ip
runtime_client = await self._container_runtime_client_registry.get(
pod.status.host_ip
)

try:
async with runtime_client.commit(
@@ -138,7 +147,10 @@ async def attach(
if not pod.tty:
tty = False

runtime_client = await self._container_runtime_client_registry.get(pod.host_ip)
assert pod.status.host_ip
runtime_client = await self._container_runtime_client_registry.get(
pod.status.host_ip
)

async with runtime_client.attach(
cont_id, tty=tty, stdin=stdin, stdout=stdout, stderr=stderr
@@ -161,7 +173,10 @@
cont_id = pod.get_container_id(pod_name)
assert cont_id

runtime_client = await self._container_runtime_client_registry.get(pod.host_ip)
assert pod.status.host_ip
runtime_client = await self._container_runtime_client_registry.get(
pod.status.host_ip
)

async with runtime_client.exec(
cont_id, cmd, tty=tty, stdin=stdin, stdout=stdout, stderr=stderr
@@ -175,7 +190,10 @@ async def kill(self, job: Job) -> None:
cont_id = pod.get_container_id(pod_name)
assert cont_id

runtime_client = await self._container_runtime_client_registry.get(pod.host_ip)
assert pod.status.host_ip
runtime_client = await self._container_runtime_client_registry.get(
pod.status.host_ip
)

await runtime_client.kill(cont_id)

@@ -184,14 +202,14 @@ async def port_forward(
) -> tuple[asyncio.StreamReader, asyncio.StreamWriter]:
pod_name = self._kube_helper.get_job_pod_name(job)
pod = await self._get_running_jobs_pod(pod_name)
reader, writer = await asyncio.open_connection(pod.pod_ip, port)
reader, writer = await asyncio.open_connection(pod.status.pod_ip, port)
return reader, writer

async def _get_running_jobs_pod(self, job_id: str) -> Pod:
pod: Pod | None
try:
pod = await self._kube_client.get_pod(job_id)
if not pod.is_phase_running:
if not pod.status.is_running:
pod = None
except JobNotFoundException:
# job's pod does not exist: it might be already garbage-collected
Expand All @@ -207,39 +225,47 @@ async def get_available_jobs_counts(self) -> Mapping[str, int]:
result: dict[str, int] = {}
cluster = await self._config_client.get_cluster(self._cluster_name)
assert cluster.orchestrator is not None
resource_requests = await self._get_resource_requests_by_node_pool()
available_resources = await self._get_available_resources_by_node_pool()
pool_types = {p.name: p for p in cluster.orchestrator.resource_pool_types}
for preset in cluster.orchestrator.resource_presets:
available_jobs_count = 0
preset_resources = Resources(
preset_resources = ContainerResources(
cpu_m=int(preset.cpu * 1000),
memory=preset.memory,
gpu=preset.gpu or 0,
nvidia_gpu=preset.nvidia_gpu or 0,
amd_gpu=preset.amd_gpu or 0,
)
preset_pool_types = [pool_types[r] for r in preset.resource_affinity]
for node_pool in preset_pool_types:
node_pools = [pool_types[r] for r in preset.available_resource_pool_names]
for node_pool in node_pools:
node_resource_limit = self._get_node_resource_limit(node_pool)
node_resource_requests = resource_requests.get(node_pool.name, [])
running_nodes_count = len(node_resource_requests)
node_pool_available_resources = available_resources.get(
node_pool.name, []
)
running_nodes_count = len(node_pool_available_resources)
free_nodes_count = node_pool.max_size - running_nodes_count
# get number of jobs that can be scheduled on running nodes
# in the current node pool
for request in node_resource_requests:
available_resources = node_resource_limit.available(request)
available_jobs_count += available_resources.count(preset_resources)
for node_available_resources in node_pool_available_resources:
available_jobs_count += min(
node_available_resources // preset_resources,
node_available_resources.pods,
)
# get number of jobs that can be scheduled on free nodes
# in the current node pool
if free_nodes_count > 0:
available_jobs_count += (
free_nodes_count * node_resource_limit.count(preset_resources)
available_jobs_count += free_nodes_count * min(
DEFAULT_MAX_PODS_PER_NODE,
node_resource_limit // preset_resources,
)
result[preset.name] = available_jobs_count
return result

async def _get_resource_requests_by_node_pool(self) -> dict[str, list[Resources]]:
result: dict[str, list[Resources]] = {}
async def _get_available_resources_by_node_pool(
self,
) -> dict[str, list[NodeResources]]:
result: dict[str, list[NodeResources]] = defaultdict(list)
pods = await self._kube_client.get_pods(
label_selector=self._kube_job_label,
all_namespaces=True,
field_selector=",".join(
(
"status.phase!=Failed",
@@ -248,36 +274,41 @@ async def _get_resource_requests_by_node_pool(self) -> dict[str, list[Resources]
),
),
)
nodes = await self._kube_client.get_nodes(label_selector=self._kube_job_label)
for node_name, node_pods in self._group_pods_by_node(pods).items():
if not node_name:
continue
nodes = await self._kube_client.get_nodes(
label_selector=self._kube_node_pool_label
)
for node_name, node_pods in self._get_pods_by_node(pods).items():
for node in nodes:
if node.name == node_name:
if node.metadata.name == node_name:
break
else:
raise NodeNotFoundException(node_name)
node_pool_name = node.get_label(self._kube_node_pool_label)
node_pool_name = node.metadata.labels.get(self._kube_node_pool_label)
if not node_pool_name: # pragma: no coverage
continue
pod_resources = [p.resource_requests for p in node_pods]
node_resources = reduce(Resources.add, pod_resources, Resources())
result.setdefault(node_pool_name, []).append(node_resources)
resource_requests = sum(
(pod.resource_requests for pod in node_pods), ContainerResources()
)
available_resources = node.status.allocatable - resource_requests
available_resources = available_resources.with_pods(
available_resources.pods - len(node_pods)
)
result[node_pool_name].append(available_resources)
return result

def _group_pods_by_node(self, pods: Sequence[Pod]) -> dict[str | None, list[Pod]]:
result: dict[str | None, list[Pod]] = {}
def _get_pods_by_node(self, pods: Sequence[Pod]) -> dict[str, list[Pod]]:
result: dict[str, list[Pod]] = defaultdict(list)
for pod in pods:
group = result.get(pod.node_name)
if not group:
group = []
result[pod.node_name] = group
group.append(pod)
if pod.spec.node_name:
result[pod.spec.node_name].append(pod)
return result

def _get_node_resource_limit(self, node_pool: ResourcePoolType) -> Resources:
return Resources(
def _get_node_resource_limit(
self, node_pool: ResourcePoolType
) -> ContainerResources:
return ContainerResources(
cpu_m=int(node_pool.available_cpu * 1000),
memory=node_pool.available_memory,
gpu=node_pool.gpu or 0,
nvidia_gpu=node_pool.nvidia_gpu or 0,
amd_gpu=node_pool.amd_gpu or 0,
)
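
Note on the new counting logic: `get_available_jobs_counts` now relies on resource arithmetic from `kube_client.py` (`NodeResources` minus `ContainerResources`, floor division to count how many copies of a preset fit on a node, and a per-node pod cap), none of which appears in this diff. The sketch below re-implements that arithmetic with simplified stand-in classes to show how the numbers combine; the field names follow the diff, while the operator implementations, `with_pods`, and the value of `DEFAULT_MAX_PODS_PER_NODE` are assumptions for illustration only.

```python
from __future__ import annotations

from dataclasses import dataclass, replace

# Kubernetes' common default pod capacity per node; the real constant lives
# in kube_client.py and may differ (assumed value, for illustration only).
DEFAULT_MAX_PODS_PER_NODE = 110


@dataclass(frozen=True)
class ContainerResources:
    cpu_m: int = 0
    memory: int = 0
    nvidia_gpu: int = 0
    amd_gpu: int = 0

    def __add__(self, other: ContainerResources) -> ContainerResources:
        # Lets sum(...) accumulate per-pod resource requests.
        return ContainerResources(
            cpu_m=self.cpu_m + other.cpu_m,
            memory=self.memory + other.memory,
            nvidia_gpu=self.nvidia_gpu + other.nvidia_gpu,
            amd_gpu=self.amd_gpu + other.amd_gpu,
        )

    def __floordiv__(self, preset: ContainerResources) -> int:
        # How many copies of `preset` fit into these resources.
        counts = []
        if preset.cpu_m:
            counts.append(self.cpu_m // preset.cpu_m)
        if preset.memory:
            counts.append(self.memory // preset.memory)
        if preset.nvidia_gpu:
            counts.append(self.nvidia_gpu // preset.nvidia_gpu)
        if preset.amd_gpu:
            counts.append(self.amd_gpu // preset.amd_gpu)
        return max(0, min(counts)) if counts else 0


@dataclass(frozen=True)
class NodeResources(ContainerResources):
    pods: int = DEFAULT_MAX_PODS_PER_NODE

    def __sub__(self, other: ContainerResources) -> NodeResources:
        # Mirrors node.status.allocatable - resource_requests in the diff.
        return NodeResources(
            cpu_m=self.cpu_m - other.cpu_m,
            memory=self.memory - other.memory,
            nvidia_gpu=self.nvidia_gpu - other.nvidia_gpu,
            amd_gpu=self.amd_gpu - other.amd_gpu,
            pods=self.pods,
        )

    def with_pods(self, pods: int) -> NodeResources:
        return replace(self, pods=pods)


# Worked example mirroring the loop in get_available_jobs_counts:
preset = ContainerResources(cpu_m=1000, memory=4 * 2**30)

# One running node with 3 CPUs / 12 GiB left and room for 50 more pods.
running = NodeResources(cpu_m=3000, memory=12 * 2**30).with_pods(50)
# The node pool's per-node limit, used for nodes that are not started yet.
node_limit = ContainerResources(cpu_m=4000, memory=16 * 2**30)
free_nodes_count = 1

available = min(running // preset, running.pods)
available += free_nodes_count * min(DEFAULT_MAX_PODS_PER_NODE, node_limit // preset)
print(available)  # 3 + 4 = 7
```

With one partially used node (3 free CPUs, 12 GiB, room for 50 more pods) and one not-yet-started node of 4 CPUs / 16 GiB in the pool, a 1 CPU / 4 GiB preset yields 3 + 4 = 7 available jobs, which matches what the loop in `get_available_jobs_counts` accumulates per preset.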