Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kfp_client: swap _create_job_config <-> run_pipeline #6

Merged
merged 1 commit into from
Feb 11, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 62 additions & 62 deletions sdk/python/kfp/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,51 +296,39 @@ def list_pipelines(self, page_token='', page_size=10, sort_by=''):
"""
return self._pipelines_api.list_pipelines(page_token=page_token, page_size=page_size, sort_by=sort_by)

# TODO: provide default namespace, similar to kubectl default namespaces.
def _create_job_config(self, experiment_id, params, pipeline_package_path, pipeline_id, namespace):
"""Create a JobConfig with spec and resource_references.
def run_pipeline(self, experiment_id, job_name, pipeline_package_path=None, params={}, pipeline_id=None, namespace=None):
"""Run a specified pipeline.

Args:
experiment_id: The string id of an experiment.
pipeline_package_path: Local path of the pipeline package (the filename should end with one of the following: .tar.gz, .tgz, .zip, .yaml, .yml).
params: A dictionary with key (string) as param name and value (string) as param value.
pipeline_id: The string ID of a pipeline.
namespace: Kubernetes namespace where the pipeline runs are created.
job_name: name of the job.
pipeline_package_path: local path of the pipeline package (the filename should end with one of the following: .tar.gz, .tgz, .zip, .yaml, .yml).
params: a dictionary with key (string) as param name and value (string) as param value.
pipeline_id: the string ID of a pipeline.
namespace: kubernetes namespace where the pipeline runs are created.
For single user deployment, leave it as None;
For multi user, input a namespace where the user is authorized

Returns:
A JobConfig object with attributes spec and resource_references.
A run object. Most important field is id.
"""
class JobConfig:
def __init__(self, spec, resource_references):
self.spec = spec
self.resource_references = resource_references
job_config = self._create_job_config(
experiment_id=experiment_id,
params=params,
pipeline_package_path=pipeline_package_path,
pipeline_id=pipeline_id,
namespace=namespace)
run_body = kfp_server_api.models.ApiRun(
pipeline_spec=job_config.spec, resource_references=job_config.resource_references, name=job_name)

pipeline_json_string = None
if pipeline_package_path:
pipeline_obj = self._extract_pipeline_yaml(pipeline_package_path)
pipeline_json_string = json.dumps(pipeline_obj)
api_params = [kfp_server_api.ApiParameter(
name=sanitize_k8s_name(name=k, allow_capital_underscore=True),
value=str(v)) for k,v in params.items()]
resource_references = []
response = self._run_api.create_run(body=run_body)

key = kfp_server_api.models.ApiResourceKey(id=experiment_id,
type=kfp_server_api.models.ApiResourceType.EXPERIMENT)
reference = kfp_server_api.models.ApiResourceReference(key=key,
relationship=kfp_server_api.models.ApiRelationship.OWNER)
resource_references.append(reference)
if namespace is not None:
key = kfp_server_api.models.ApiResourceKey(id=namespace,
type=kfp_server_api.models.ApiResourceType.NAMESPACE)
reference = kfp_server_api.models.ApiResourceReference(key=key,
name=namespace,
relationship=kfp_server_api.models.ApiRelationship.OWNER)
resource_references.append(reference)
spec = kfp_server_api.models.ApiPipelineSpec(
pipeline_id=pipeline_id,
workflow_manifest=pipeline_json_string,
parameters=api_params)
return JobConfig(spec=spec, resource_references=resource_references)
if self._is_ipython():
import IPython
html = ('Run link <a href="%s/#/runs/details/%s" target="_blank" >here</a>'
% (self._get_url_prefix(), response.run.id))
IPython.display.display(IPython.display.HTML(html))
return response.run

def run_pipeline_recurring(self, experiment_id, job_name, start_time=None, end_time=None, interval_second=600 , max_concurrency=1, params={}, pipeline_package_path=None, pipeline_id=None, namespace=None, enabled=True):
"""Create a recurring run.
Expand Down Expand Up @@ -379,39 +367,51 @@ def run_pipeline_recurring(self, experiment_id, job_name, start_time=None, end_t
max_concurrency=max_concurrency)
return self._job_api.create_job(body=job_body)

def run_pipeline(self, experiment_id, job_name, pipeline_package_path=None, params={}, pipeline_id=None, namespace=None):
"""Run a specified pipeline.

# TODO: provide default namespace, similar to kubectl default namespaces.
def _create_job_config(self, experiment_id, params, pipeline_package_path, pipeline_id, namespace):
"""Create a JobConfig with spec and resource_references.
Args:
experiment_id: The string id of an experiment.
job_name: name of the job.
pipeline_package_path: local path of the pipeline package (the filename should end with one of the following: .tar.gz, .tgz, .zip, .yaml, .yml).
params: a dictionary with key (string) as param name and value (string) as param value.
pipeline_id: the string ID of a pipeline.
namespace: kubernetes namespace where the pipeline runs are created.
pipeline_package_path: Local path of the pipeline package (the filename should end with one of the following: .tar.gz, .tgz, .zip, .yaml, .yml).
params: A dictionary with key (string) as param name and value (string) as param value.
pipeline_id: The string ID of a pipeline.
namespace: Kubernetes namespace where the pipeline runs are created.
For single user deployment, leave it as None;
For multi user, input a namespace where the user is authorized

Returns:
A run object. Most important field is id.
A JobConfig object with attributes spec and resource_references.
"""
job_config = self._create_job_config(
experiment_id=experiment_id,
params=params,
pipeline_package_path=pipeline_package_path,
pipeline_id=pipeline_id,
namespace=namespace)
run_body = kfp_server_api.models.ApiRun(
pipeline_spec=job_config.spec, resource_references=job_config.resource_references, name=job_name)
class JobConfig:
def __init__(self, spec, resource_references):
self.spec = spec
self.resource_references = resource_references

response = self._run_api.create_run(body=run_body)
pipeline_json_string = None
if pipeline_package_path:
pipeline_obj = self._extract_pipeline_yaml(pipeline_package_path)
pipeline_json_string = json.dumps(pipeline_obj)
api_params = [kfp_server_api.ApiParameter(
name=sanitize_k8s_name(name=k, allow_capital_underscore=True),
value=str(v)) for k,v in params.items()]
resource_references = []

if self._is_ipython():
import IPython
html = ('Run link <a href="%s/#/runs/details/%s" target="_blank" >here</a>'
% (self._get_url_prefix(), response.run.id))
IPython.display.display(IPython.display.HTML(html))
return response.run
key = kfp_server_api.models.ApiResourceKey(id=experiment_id,
type=kfp_server_api.models.ApiResourceType.EXPERIMENT)
reference = kfp_server_api.models.ApiResourceReference(key=key,
relationship=kfp_server_api.models.ApiRelationship.OWNER)
resource_references.append(reference)
if namespace is not None:
key = kfp_server_api.models.ApiResourceKey(id=namespace,
type=kfp_server_api.models.ApiResourceType.NAMESPACE)
reference = kfp_server_api.models.ApiResourceReference(key=key,
name=namespace,
relationship=kfp_server_api.models.ApiRelationship.OWNER)
resource_references.append(reference)
spec = kfp_server_api.models.ApiPipelineSpec(
pipeline_id=pipeline_id,
workflow_manifest=pipeline_json_string,
parameters=api_params)
return JobConfig(spec=spec, resource_references=resource_references)

def create_run_from_pipeline_func(self, pipeline_func: Callable, arguments: Mapping[str, str], run_name=None, experiment_name=None, pipeline_conf: kfp.dsl.PipelineConf = None, namespace=None):
'''Runs pipeline on KFP-enabled Kubernetes cluster.
Expand Down