From 2cf9841d341ddef86bfa02ebc14c296dc81f54c5 Mon Sep 17 00:00:00 2001 From: Zhuozhao Li Date: Wed, 24 Apr 2019 22:48:42 +0000 Subject: [PATCH 1/6] each block in kube refers to a pod --- parsl/providers/kubernetes/kube.py | 146 +++++++++++++++++++++++------ 1 file changed, 116 insertions(+), 30 deletions(-) diff --git a/parsl/providers/kubernetes/kube.py b/parsl/providers/kubernetes/kube.py index 1b97b88011..46e830fc2a 100644 --- a/parsl/providers/kubernetes/kube.py +++ b/parsl/providers/kubernetes/kube.py @@ -65,7 +65,7 @@ def __init__(self, max_blocks=10, parallelism=1, worker_init="", - deployment_name=None, + pod_name=None, user_id=None, group_id=None, run_as_non_root=False, @@ -86,13 +86,13 @@ def __init__(self, self.parallelism = parallelism self.worker_init = worker_init self.secret = secret - self.deployment_name = deployment_name + self.pod_name = pod_name self.user_id = user_id self.group_id = group_id self.run_as_non_root = run_as_non_root self.persistent_volumes = persistent_volumes - self.kube_client = client.ExtensionsV1beta1Api() + self.kube_client = client.CoreV1Api() # Dictionary that keeps track of jobs, keyed on job_id self.resources = {} @@ -110,31 +110,29 @@ def submit(self, cmd_string, blocksize, tasks_per_node, job_name="parsl"): - None: At capacity, cannot provision more - job_id: (string) Identifier for the job """ - if not self.resources: - cur_timestamp = str(time.time() * 1000).split(".")[0] - job_name = "{0}-{1}".format(job_name, cur_timestamp) - - if not self.deployment_name: - deployment_name = '{}-deployment'.format(job_name) - else: - deployment_name = '{}-{}-deployment'.format(self.deployment_name, - cur_timestamp) - - formatted_cmd = template_string.format(command=cmd_string, - worker_init=self.worker_init) - - self.deployment_obj = self._create_deployment_object(job_name, - self.image, - deployment_name, - cmd_string=formatted_cmd, - replicas=self.init_blocks, - volumes=self.persistent_volumes) - logger.debug("Deployment name :{}".format(deployment_name)) - self._create_deployment(self.deployment_obj) - self.resources[deployment_name] = {'status': 'RUNNING', - 'pods': self.init_blocks} - - return deployment_name + + cur_timestamp = str(time.time() * 1000).split(".")[0] + job_name = "{0}-{1}".format(job_name, cur_timestamp) + + if not self.pod_name: + pod_name = '{}'.format(job_name) + else: + pod_name = '{}-{}'.format(self.pod_name, + cur_timestamp) + + formatted_cmd = template_string.format(command=cmd_string, + worker_init=self.worker_init) + + logger.debug("Pod name :{}".format(pod_name)) + self._create_pod(image=self.image, + pod_name=pod_name, + job_name=job_name, + cmd_string=formatted_cmd, + volumes=self.persistent_volumes) + self.resources[pod_name] = {'status': 'RUNNING', + 'pods': blocksize} + + return pod_name def status(self, job_ids): """ Get the status of a list of jobs identified by the job identifiers @@ -161,9 +159,10 @@ def cancel(self, job_ids): for job in job_ids: logger.debug("Terminating job/proc_id: {0}".format(job)) # Here we are assuming that for local, the job_ids are the process id's - self._delete_deployment(job) + self._delete_pod(job) self.resources[job]['status'] = 'CANCELLED' + del self.resources[job] rets = [True for i in job_ids] return rets @@ -181,6 +180,93 @@ def _status(self): return jobs_ids # do something to get the deployment's status + def _create_pod(self, + image, + pod_name, + job_name, + port=80, + cmd_string=None, + volumes=[]): + """ Create a kubernetes non-deployment pod for the job. + Args: + - image (string) : Docker image to launch + - pod_name (string) : Name of the pod + - job_name (string) : App label + KWargs: + - port (integer) : Container port + Returns: + - None + """ + + security_context = None + if self.user_id and self.group_id: + security_context = client.V1SecurityContext(run_as_group=self.group_id, + run_as_user=self.user_id, + run_as_non_root=self.run_as_non_root) + + # Create the enviornment variables and command to initiate IPP + environment_vars = client.V1EnvVar(name="TEST", value="SOME DATA") + + launch_args = ["-c", "{0};".format(cmd_string)] + + volume_mounts = [] + # Create mount paths for the volumes + for volume in volumes: + volume_mounts.append(client.V1VolumeMount(mount_path=volume[1], + name=volume[0])) + # Configureate Pod template container + container = None + if security_context: + container = client.V1Container( + name=pod_name, + image=image, + ports=[client.V1ContainerPort(container_port=port)], + volume_mounts=volume_mounts, + command=['/bin/bash'], + args=launch_args, + env=[environment_vars], + security_context=security_context) + else: + container = client.V1Container( + name=pod_name, + image=image, + ports=[client.V1ContainerPort(container_port=port)], + volume_mounts=volume_mounts, + command=['/bin/bash'], + args=launch_args, + env=[environment_vars]) + # Create a secret to enable pulling images from secure repositories + secret = None + if self.secret: + secret = client.V1LocalObjectReference(name=self.secret) + + # Create list of volumes from (pvc, mount) tuples + volume_defs = [] + for volume in volumes: + volume_defs.append(client.V1Volume(name=volume[0], + persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource( + claim_name=volume[0]))) + + metadata = client.V1ObjectMeta(name=pod_name, + labels={"app": job_name}) + spec = client.V1PodSpec(containers=[container], + image_pull_secrets=[secret], + volumes=volume_defs + ) + + pod = client.V1Pod(spec=spec, metadata=metadata) + api_response = self.kube_client.create_namespaced_pod(namespace=self.namespace, + body=pod) + logger.debug("Pod created. status='{0}'".format(str(api_response.status))) + + def _delete_pod(self, pod_name): + """Delete a pod""" + + api_response = self.kube_client.delete_namespaced_pod(name=pod_name, + namespace=self.namespace, + body=client.V1DeleteOptions()) + logger.debug("Pod deleted. status='{0}'".format(str(api_response.status))) + def _create_deployment_object(self, job_name, job_image, deployment_name, port=80, replicas=1, @@ -295,7 +381,7 @@ def _delete_deployment(self, deployment_name): @property def scaling_enabled(self): - return False + return True @property def channels_required(self): From d0930c3b8a4195c50154f1d904d1dc905d43c392 Mon Sep 17 00:00:00 2001 From: Zhuozhao Li Date: Tue, 18 Jun 2019 17:53:31 -0500 Subject: [PATCH 2/6] add docstring for pod_name --- parsl/providers/kubernetes/kube.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/parsl/providers/kubernetes/kube.py b/parsl/providers/kubernetes/kube.py index 46e830fc2a..e9ac9e28ea 100644 --- a/parsl/providers/kubernetes/kube.py +++ b/parsl/providers/kubernetes/kube.py @@ -44,6 +44,9 @@ class KubernetesProvider(ExecutionProvider, RepresentationMixin): Command to be run first for the workers, such as `python start.py`. secret : str Docker secret to use to pull images + pod_name : str + The name for the pod, will be appended with a timestamp. + Default is None, meaning parsl automatically names the pod. user_id : str Unix user id to run the container as. group_id : str From 4beeb15fe2938d76bca0fa433afac000d04b41a7 Mon Sep 17 00:00:00 2001 From: Zhuozhao Li Date: Tue, 18 Jun 2019 17:56:12 -0500 Subject: [PATCH 3/6] remove dead code about create deployment --- parsl/providers/kubernetes/kube.py | 112 ----------------------------- 1 file changed, 112 deletions(-) diff --git a/parsl/providers/kubernetes/kube.py b/parsl/providers/kubernetes/kube.py index e9ac9e28ea..615f03baa5 100644 --- a/parsl/providers/kubernetes/kube.py +++ b/parsl/providers/kubernetes/kube.py @@ -270,118 +270,6 @@ def _delete_pod(self, pod_name): body=client.V1DeleteOptions()) logger.debug("Pod deleted. status='{0}'".format(str(api_response.status))) - def _create_deployment_object(self, job_name, job_image, - deployment_name, port=80, - replicas=1, - cmd_string=None, - engine_json_file='~/.ipython/profile_default/security/ipcontroller-engine.json', - engine_dir='.', - volumes=[]): - """ Create a kubernetes deployment for the job. - Args: - - job_name (string) : Name of the job and deployment - - job_image (string) : Docker image to launch - KWargs: - - port (integer) : Container port - - replicas : Number of replica containers to maintain - Returns: - - True: The deployment object to launch - """ - - # sorry, quick hack that doesn't pass this stuff through to test it works. - # TODO it also doesn't only add what is set :( - security_context = None - if self.user_id and self.group_id: - security_context = client.V1SecurityContext(run_as_group=self.group_id, - run_as_user=self.user_id, - run_as_non_root=self.run_as_non_root) - - # Create the enviornment variables and command to initiate IPP - environment_vars = client.V1EnvVar(name="TEST", value="SOME DATA") - - launch_args = ["-c", "{0}; /app/deploy.sh;".format(cmd_string)] - - volume_mounts = [] - # Create mount paths for the volumes - for volume in volumes: - volume_mounts.append(client.V1VolumeMount(mount_path=volume[1], - name=volume[0])) - # Configureate Pod template container - container = None - if security_context: - container = client.V1Container( - name=job_name, - image=job_image, - ports=[client.V1ContainerPort(container_port=port)], - volume_mounts=volume_mounts, - command=['/bin/bash'], - args=launch_args, - env=[environment_vars], - security_context=security_context) - else: - container = client.V1Container( - name=job_name, - image=job_image, - ports=[client.V1ContainerPort(container_port=port)], - volume_mounts=volume_mounts, - command=['/bin/bash'], - args=launch_args, - env=[environment_vars]) - # Create a secret to enable pulling images from secure repositories - secret = None - if self.secret: - secret = client.V1LocalObjectReference(name=self.secret) - - # Create list of volumes from (pvc, mount) tuples - volume_defs = [] - for volume in volumes: - volume_defs.append(client.V1Volume(name=volume[0], - persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource( - claim_name=volume[0]))) - - # Create and configurate a spec section - template = client.V1PodTemplateSpec( - metadata=client.V1ObjectMeta(labels={"app": job_name}), - spec=client.V1PodSpec(containers=[container], - image_pull_secrets=[secret], - volumes=volume_defs - )) - - # Create the specification of deployment - spec = client.ExtensionsV1beta1DeploymentSpec(replicas=replicas, - template=template) - - # Instantiate the deployment object - deployment = client.ExtensionsV1beta1Deployment( - api_version="extensions/v1beta1", - kind="Deployment", - metadata=client.V1ObjectMeta(name=deployment_name), - spec=spec) - - return deployment - - def _create_deployment(self, deployment): - """ Create the kubernetes deployment """ - - api_response = self.kube_client.create_namespaced_deployment( - body=deployment, - namespace=self.namespace) - - logger.debug("Deployment created. status='{0}'".format(str(api_response.status))) - - def _delete_deployment(self, deployment_name): - """ Delete deployment """ - - api_response = self.kube_client.delete_namespaced_deployment( - name=deployment_name, - namespace=self.namespace, - body=client.V1DeleteOptions( - propagation_policy='Foreground', - grace_period_seconds=5)) - - logger.debug("Deployment deleted. status='{0}'".format( - str(api_response.status))) - @property def scaling_enabled(self): return True From 0b429544425c197419371db54a8652ded70993af Mon Sep 17 00:00:00 2001 From: Zhuozhao Li Date: Tue, 18 Jun 2019 17:58:00 -0500 Subject: [PATCH 4/6] docstring - non-deployment pod to pod --- parsl/providers/kubernetes/kube.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parsl/providers/kubernetes/kube.py b/parsl/providers/kubernetes/kube.py index 615f03baa5..a8caec0f3d 100644 --- a/parsl/providers/kubernetes/kube.py +++ b/parsl/providers/kubernetes/kube.py @@ -190,7 +190,7 @@ def _create_pod(self, port=80, cmd_string=None, volumes=[]): - """ Create a kubernetes non-deployment pod for the job. + """ Create a kubernetes pod for the job. Args: - image (string) : Docker image to launch - pod_name (string) : Name of the pod From 0ce7f93b7c3b832e23bc0616d56793edadc73bc5 Mon Sep 17 00:00:00 2001 From: Zhuozhao Li Date: Tue, 18 Jun 2019 17:59:44 -0500 Subject: [PATCH 5/6] typo in comment --- parsl/providers/kubernetes/kube.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parsl/providers/kubernetes/kube.py b/parsl/providers/kubernetes/kube.py index a8caec0f3d..a3d99be303 100644 --- a/parsl/providers/kubernetes/kube.py +++ b/parsl/providers/kubernetes/kube.py @@ -217,7 +217,7 @@ def _create_pod(self, for volume in volumes: volume_mounts.append(client.V1VolumeMount(mount_path=volume[1], name=volume[0])) - # Configureate Pod template container + # Configure Pod template container container = None if security_context: container = client.V1Container( From e80792db36598e601f9c0a201fc78134e8b71c07 Mon Sep 17 00:00:00 2001 From: Zhuozhao Li Date: Wed, 19 Jun 2019 03:19:26 +0000 Subject: [PATCH 6/6] simplify container creation code --- parsl/providers/kubernetes/kube.py | 30 ++++++++++-------------------- 1 file changed, 10 insertions(+), 20 deletions(-) diff --git a/parsl/providers/kubernetes/kube.py b/parsl/providers/kubernetes/kube.py index a3d99be303..d2136384ec 100644 --- a/parsl/providers/kubernetes/kube.py +++ b/parsl/providers/kubernetes/kube.py @@ -218,26 +218,16 @@ def _create_pod(self, volume_mounts.append(client.V1VolumeMount(mount_path=volume[1], name=volume[0])) # Configure Pod template container - container = None - if security_context: - container = client.V1Container( - name=pod_name, - image=image, - ports=[client.V1ContainerPort(container_port=port)], - volume_mounts=volume_mounts, - command=['/bin/bash'], - args=launch_args, - env=[environment_vars], - security_context=security_context) - else: - container = client.V1Container( - name=pod_name, - image=image, - ports=[client.V1ContainerPort(container_port=port)], - volume_mounts=volume_mounts, - command=['/bin/bash'], - args=launch_args, - env=[environment_vars]) + container = client.V1Container( + name=pod_name, + image=image, + ports=[client.V1ContainerPort(container_port=port)], + volume_mounts=volume_mounts, + command=['/bin/bash'], + args=launch_args, + env=[environment_vars], + security_context=security_context) + # Create a secret to enable pulling images from secure repositories secret = None if self.secret: