Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor kubernetes providers: each block in kube refers to a pod #1073

Merged
merged 9 commits into from
Jun 19, 2019
146 changes: 116 additions & 30 deletions parsl/providers/kubernetes/kube.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def __init__(self,
max_blocks=10,
parallelism=1,
worker_init="",
deployment_name=None,
pod_name=None,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needs docstring

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

user_id=None,
group_id=None,
run_as_non_root=False,
Expand All @@ -86,13 +86,13 @@ def __init__(self,
self.parallelism = parallelism
self.worker_init = worker_init
self.secret = secret
self.deployment_name = deployment_name
self.pod_name = pod_name
self.user_id = user_id
self.group_id = group_id
self.run_as_non_root = run_as_non_root
self.persistent_volumes = persistent_volumes

self.kube_client = client.ExtensionsV1beta1Api()
self.kube_client = client.CoreV1Api()

# Dictionary that keeps track of jobs, keyed on job_id
self.resources = {}
Expand All @@ -110,31 +110,29 @@ def submit(self, cmd_string, blocksize, tasks_per_node, job_name="parsl"):
- None: At capacity, cannot provision more
- job_id: (string) Identifier for the job
"""
if not self.resources:
cur_timestamp = str(time.time() * 1000).split(".")[0]
job_name = "{0}-{1}".format(job_name, cur_timestamp)

if not self.deployment_name:
deployment_name = '{}-deployment'.format(job_name)
else:
deployment_name = '{}-{}-deployment'.format(self.deployment_name,
cur_timestamp)

formatted_cmd = template_string.format(command=cmd_string,
worker_init=self.worker_init)

self.deployment_obj = self._create_deployment_object(job_name,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_create_deployment and _create_deployment_object are dead code that should be removed maybe now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

self.image,
deployment_name,
cmd_string=formatted_cmd,
replicas=self.init_blocks,
volumes=self.persistent_volumes)
logger.debug("Deployment name :{}".format(deployment_name))
self._create_deployment(self.deployment_obj)
self.resources[deployment_name] = {'status': 'RUNNING',
'pods': self.init_blocks}

return deployment_name

cur_timestamp = str(time.time() * 1000).split(".")[0]
job_name = "{0}-{1}".format(job_name, cur_timestamp)

if not self.pod_name:
pod_name = '{}'.format(job_name)
else:
pod_name = '{}-{}'.format(self.pod_name,
cur_timestamp)

formatted_cmd = template_string.format(command=cmd_string,
worker_init=self.worker_init)

logger.debug("Pod name :{}".format(pod_name))
self._create_pod(image=self.image,
pod_name=pod_name,
job_name=job_name,
cmd_string=formatted_cmd,
volumes=self.persistent_volumes)
self.resources[pod_name] = {'status': 'RUNNING',
'pods': blocksize}

return pod_name

def status(self, job_ids):
""" Get the status of a list of jobs identified by the job identifiers
Expand All @@ -161,9 +159,10 @@ def cancel(self, job_ids):
for job in job_ids:
logger.debug("Terminating job/proc_id: {0}".format(job))
# Here we are assuming that for local, the job_ids are the process id's
self._delete_deployment(job)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe remove _delete_deployment from the source file?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

self._delete_pod(job)

self.resources[job]['status'] = 'CANCELLED'
del self.resources[job]
rets = [True for i in job_ids]

return rets
Expand All @@ -181,6 +180,93 @@ def _status(self):
return jobs_ids
# do something to get the deployment's status

def _create_pod(self,
image,
pod_name,
job_name,
port=80,
cmd_string=None,
volumes=[]):
""" Create a kubernetes non-deployment pod for the job.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refer to this as this a "pod", not a "non-deployment pod".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

Args:
- image (string) : Docker image to launch
- pod_name (string) : Name of the pod
- job_name (string) : App label
KWargs:
- port (integer) : Container port
Returns:
- None
"""

security_context = None
if self.user_id and self.group_id:
security_context = client.V1SecurityContext(run_as_group=self.group_id,
run_as_user=self.user_id,
run_as_non_root=self.run_as_non_root)

# Create the enviornment variables and command to initiate IPP
environment_vars = client.V1EnvVar(name="TEST", value="SOME DATA")

launch_args = ["-c", "{0};".format(cmd_string)]

volume_mounts = []
# Create mount paths for the volumes
for volume in volumes:
volume_mounts.append(client.V1VolumeMount(mount_path=volume[1],
name=volume[0]))
# Configureate Pod template container
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"configure"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

container = None
if security_context:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the only purpose of this is to supply or not supply the security_context parameter. It looks (from the generated python code of V1Container in https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_container.py) that it is fine to pass in None for security_context and so this if-statement could collapse into a single code path.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

collapsed

container = client.V1Container(
name=pod_name,
image=image,
ports=[client.V1ContainerPort(container_port=port)],
volume_mounts=volume_mounts,
command=['/bin/bash'],
args=launch_args,
env=[environment_vars],
security_context=security_context)
else:
container = client.V1Container(
name=pod_name,
image=image,
ports=[client.V1ContainerPort(container_port=port)],
volume_mounts=volume_mounts,
command=['/bin/bash'],
args=launch_args,
env=[environment_vars])
# Create a secret to enable pulling images from secure repositories
secret = None
if self.secret:
secret = client.V1LocalObjectReference(name=self.secret)

# Create list of volumes from (pvc, mount) tuples
volume_defs = []
for volume in volumes:
volume_defs.append(client.V1Volume(name=volume[0],
persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource(
claim_name=volume[0])))

metadata = client.V1ObjectMeta(name=pod_name,
labels={"app": job_name})
spec = client.V1PodSpec(containers=[container],
image_pull_secrets=[secret],
volumes=volume_defs
)

pod = client.V1Pod(spec=spec, metadata=metadata)
api_response = self.kube_client.create_namespaced_pod(namespace=self.namespace,
body=pod)
logger.debug("Pod created. status='{0}'".format(str(api_response.status)))

def _delete_pod(self, pod_name):
"""Delete a pod"""

api_response = self.kube_client.delete_namespaced_pod(name=pod_name,
namespace=self.namespace,
body=client.V1DeleteOptions())
logger.debug("Pod deleted. status='{0}'".format(str(api_response.status)))

def _create_deployment_object(self, job_name, job_image,
deployment_name, port=80,
replicas=1,
Expand Down Expand Up @@ -295,7 +381,7 @@ def _delete_deployment(self, deployment_name):

@property
def scaling_enabled(self):
return False
return True

@property
def channels_required(self):
Expand Down