kfp_client: swap _create_job_config <-> run_pipeline
faweis authored and michaelbeaumont committed Feb 11, 2020
1 parent ce72872 commit e2a42e5
Showing 1 changed file with 62 additions and 62 deletions.
124 changes: 62 additions & 62 deletions sdk/python/kfp/_client.py
@@ -296,51 +296,39 @@ def list_pipelines(self, page_token='', page_size=10, sort_by=''):
"""
return self._pipelines_api.list_pipelines(page_token=page_token, page_size=page_size, sort_by=sort_by)

# TODO: provide default namespace, similar to kubectl default namespaces.
def _create_job_config(self, experiment_id, params, pipeline_package_path, pipeline_id, namespace):
"""Create a JobConfig with spec and resource_references.
def run_pipeline(self, experiment_id, job_name, pipeline_package_path=None, params={}, pipeline_id=None, namespace=None):
"""Run a specified pipeline.
Args:
experiment_id: The string id of an experiment.
pipeline_package_path: Local path of the pipeline package (the filename should end with one of the following: .tar.gz, .tgz, .zip, .yaml, .yml).
params: A dictionary with key (string) as param name and value (string) as param value.
pipeline_id: The string ID of a pipeline.
namespace: Kubernetes namespace where the pipeline runs are created.
job_name: name of the job.
pipeline_package_path: local path of the pipeline package (the filename should end with one of the following: .tar.gz, .tgz, .zip, .yaml, .yml).
params: a dictionary with key (string) as param name and value (string) as param value.
pipeline_id: the string ID of a pipeline.
namespace: kubernetes namespace where the pipeline runs are created.
For single-user deployments, leave it as None;
For multi-user deployments, provide a namespace where the user is authorized.
Returns:
A JobConfig object with attributes spec and resource_references.
A run object. The most important field is id.
"""
class JobConfig:
def __init__(self, spec, resource_references):
self.spec = spec
self.resource_references = resource_references
job_config = self._create_job_config(
experiment_id=experiment_id,
params=params,
pipeline_package_path=pipeline_package_path,
pipeline_id=pipeline_id,
namespace=namespace)
run_body = kfp_server_api.models.ApiRun(
pipeline_spec=job_config.spec, resource_references=job_config.resource_references, name=job_name)

pipeline_json_string = None
if pipeline_package_path:
pipeline_obj = self._extract_pipeline_yaml(pipeline_package_path)
pipeline_json_string = json.dumps(pipeline_obj)
api_params = [kfp_server_api.ApiParameter(
name=sanitize_k8s_name(name=k, allow_capital_underscore=True),
value=str(v)) for k,v in params.items()]
resource_references = []
response = self._run_api.create_run(body=run_body)

key = kfp_server_api.models.ApiResourceKey(id=experiment_id,
type=kfp_server_api.models.ApiResourceType.EXPERIMENT)
reference = kfp_server_api.models.ApiResourceReference(key=key,
relationship=kfp_server_api.models.ApiRelationship.OWNER)
resource_references.append(reference)
if namespace is not None:
key = kfp_server_api.models.ApiResourceKey(id=namespace,
type=kfp_server_api.models.ApiResourceType.NAMESPACE)
reference = kfp_server_api.models.ApiResourceReference(key=key,
name=namespace,
relationship=kfp_server_api.models.ApiRelationship.OWNER)
resource_references.append(reference)
spec = kfp_server_api.models.ApiPipelineSpec(
pipeline_id=pipeline_id,
workflow_manifest=pipeline_json_string,
parameters=api_params)
return JobConfig(spec=spec, resource_references=resource_references)
if self._is_ipython():
import IPython
html = ('Run link <a href="%s/#/runs/details/%s" target="_blank" >here</a>'
% (self._get_url_prefix(), response.run.id))
IPython.display.display(IPython.display.HTML(html))
return response.run
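For reference (not part of the diff), here is a minimal usage sketch of run_pipeline as it reads after this change. The host URL, experiment name, package path, and parameter names are hypothetical placeholders.

```python
import kfp

# Hypothetical host, experiment name, package path, and params -- adjust to your deployment.
client = kfp.Client(host='http://localhost:8080')
experiment = client.create_experiment(name='demo-experiment')

run = client.run_pipeline(
    experiment_id=experiment.id,
    job_name='demo-run',
    pipeline_package_path='pipeline.tar.gz',  # a compiled pipeline package
    params={'learning_rate': '0.01'},
)
print(run.id)  # the run id is the field most callers need
```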

def run_pipeline_recurring(self, experiment_id, job_name, start_time=None, end_time=None, interval_second=600 , max_concurrency=1, params={}, pipeline_package_path=None, pipeline_id=None, namespace=None, enabled=True):
"""Create a recurring run.
@@ -379,39 +367,51 @@ def run_pipeline_recurring(self, experiment_id, job_name, start_time=None, end_t
max_concurrency=max_concurrency)
return self._job_api.create_job(body=job_body)
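A similarly hedged sketch of a recurring run against the signature shown above; it reuses the client and experiment from the previous sketch, and the interval, package path, and params are placeholders.

```python
# Assumes `client` and `experiment` from the previous sketch; all values are illustrative.
recurring_job = client.run_pipeline_recurring(
    experiment_id=experiment.id,
    job_name='nightly-training',
    interval_second=24 * 60 * 60,   # run once per day
    max_concurrency=1,
    pipeline_package_path='pipeline.tar.gz',
    params={'learning_rate': '0.01'},
    enabled=True,
)
```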

def run_pipeline(self, experiment_id, job_name, pipeline_package_path=None, params={}, pipeline_id=None, namespace=None):
"""Run a specified pipeline.
# TODO: provide default namespace, similar to kubectl default namespaces.
def _create_job_config(self, experiment_id, params, pipeline_package_path, pipeline_id, namespace):
"""Create a JobConfig with spec and resource_references.
Args:
experiment_id: The string id of an experiment.
job_name: name of the job.
pipeline_package_path: local path of the pipeline package (the filename should end with one of the following: .tar.gz, .tgz, .zip, .yaml, .yml).
params: a dictionary with key (string) as param name and value (string) as param value.
pipeline_id: the string ID of a pipeline.
namespace: kubernetes namespace where the pipeline runs are created.
pipeline_package_path: Local path of the pipeline package (the filename should end with one of the following: .tar.gz, .tgz, .zip, .yaml, .yml).
params: A dictionary with key (string) as param name and value (string) as param value.
pipeline_id: The string ID of a pipeline.
namespace: Kubernetes namespace where the pipeline runs are created.
For single-user deployments, leave it as None;
For multi-user deployments, provide a namespace where the user is authorized.
Returns:
A run object. The most important field is id.
A JobConfig object with attributes spec and resource_references.
"""
job_config = self._create_job_config(
experiment_id=experiment_id,
params=params,
pipeline_package_path=pipeline_package_path,
pipeline_id=pipeline_id,
namespace=namespace)
run_body = kfp_server_api.models.ApiRun(
pipeline_spec=job_config.spec, resource_references=job_config.resource_references, name=job_name)
class JobConfig:
def __init__(self, spec, resource_references):
self.spec = spec
self.resource_references = resource_references

response = self._run_api.create_run(body=run_body)
pipeline_json_string = None
if pipeline_package_path:
pipeline_obj = self._extract_pipeline_yaml(pipeline_package_path)
pipeline_json_string = json.dumps(pipeline_obj)
api_params = [kfp_server_api.ApiParameter(
name=sanitize_k8s_name(name=k, allow_capital_underscore=True),
value=str(v)) for k,v in params.items()]
resource_references = []

if self._is_ipython():
import IPython
html = ('Run link <a href="%s/#/runs/details/%s" target="_blank" >here</a>'
% (self._get_url_prefix(), response.run.id))
IPython.display.display(IPython.display.HTML(html))
return response.run
key = kfp_server_api.models.ApiResourceKey(id=experiment_id,
type=kfp_server_api.models.ApiResourceType.EXPERIMENT)
reference = kfp_server_api.models.ApiResourceReference(key=key,
relationship=kfp_server_api.models.ApiRelationship.OWNER)
resource_references.append(reference)
if namespace is not None:
key = kfp_server_api.models.ApiResourceKey(id=namespace,
type=kfp_server_api.models.ApiResourceType.NAMESPACE)
reference = kfp_server_api.models.ApiResourceReference(key=key,
name=namespace,
relationship=kfp_server_api.models.ApiRelationship.OWNER)
resource_references.append(reference)
spec = kfp_server_api.models.ApiPipelineSpec(
pipeline_id=pipeline_id,
workflow_manifest=pipeline_json_string,
parameters=api_params)
return JobConfig(spec=spec, resource_references=resource_references)
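For multi-user deployments, the namespace argument documented above becomes the extra NAMESPACE resource reference built here. Below is a hedged sketch of what a caller might pass; the pipeline id and namespace are hypothetical, and `client`/`experiment` come from the earlier sketch.

```python
# Hypothetical pipeline id and namespace; requires a multi-user KFP deployment
# where the caller is authorized in that namespace.
run = client.run_pipeline(
    experiment_id=experiment.id,
    job_name='namespaced-run',
    pipeline_id='a1b2c3d4-0000-0000-0000-000000000000',  # an already-uploaded pipeline (placeholder id)
    params={'epochs': '5'},
    namespace='alice',  # becomes a resource reference with an OWNER relationship
)
```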

def create_run_from_pipeline_func(self, pipeline_func: Callable, arguments: Mapping[str, str], run_name=None, experiment_name=None, pipeline_conf: kfp.dsl.PipelineConf = None, namespace=None):
'''Runs pipeline on KFP-enabled Kubernetes cluster.