From e968c959bb5772c84a97da3723a929ed6664432a Mon Sep 17 00:00:00 2001 From: Alex Latchford Date: Mon, 3 Aug 2020 09:33:47 -0700 Subject: [PATCH] chore: Clean up KFP SDK docstrings, make formatting a little more consistent (#4218) * Prepare SDK docs environment so its easier to understand how to build the docs locally so theyre consistent with ReadTheDocs. * Clean up docstrings for kfp.Client * Add in updates to the docs for compiler and components * Update components area to add in code references and make formatting a little more consistent. * Clean up containers, add in custom CSS to ensure we do not overflow on inline code blocks * Clean up containers, add in custom CSS to ensure we do not overflow on inline code blocks * Remove unused kfp.notebook package links * Clean up a few more errant references * Clean up the DSL docs some more * Update SDK docs for KFP extensions to follow Sphinx guidelines * Clean up formatting of docstrings after Ark-Kuns comments --- .readthedocs.yml | 8 + docs/_static/custom.css | 3 + docs/conf.py | 7 +- docs/index.rst | 8 +- docs/requirements.txt | 2 + docs/source/kfp.client.rst | 4 +- docs/source/kfp.components.structures.rst | 1 + docs/source/kfp.notebook.rst | 8 - docs/source/kfp.rst | 3 +- docs/source/modules.rst | 7 - sdk/python/kfp/_client.py | 214 +++++++++------- sdk/python/kfp/aws.py | 8 +- sdk/python/kfp/azure.py | 7 +- sdk/python/kfp/compiler/compiler.py | 52 ++-- sdk/python/kfp/components/_component_store.py | 50 ++-- sdk/python/kfp/components/_components.py | 26 +- sdk/python/kfp/components/_python_op.py | 190 +++++++------- .../components/_python_to_graph_component.py | 7 +- sdk/python/kfp/containers/_build_image_api.py | 25 +- .../kfp/containers/_component_builder.py | 6 +- sdk/python/kfp/dsl/_component.py | 58 ++--- sdk/python/kfp/dsl/_container_op.py | 236 +++++++++--------- sdk/python/kfp/dsl/_metadata.py | 2 + sdk/python/kfp/dsl/_ops_group.py | 71 +++--- sdk/python/kfp/dsl/_pipeline.py | 74 +++--- sdk/python/kfp/dsl/_pipeline_param.py | 59 +++-- sdk/python/kfp/dsl/_pipeline_volume.py | 19 +- sdk/python/kfp/dsl/_resource_op.py | 42 ++-- sdk/python/kfp/dsl/_volume_op.py | 48 ++-- sdk/python/kfp/dsl/_volume_snapshot_op.py | 34 ++- sdk/python/kfp/gcp.py | 11 +- sdk/python/kfp/onprem.py | 10 +- 32 files changed, 683 insertions(+), 617 deletions(-) create mode 100644 .readthedocs.yml create mode 100644 docs/_static/custom.css create mode 100644 docs/requirements.txt delete mode 100644 docs/source/kfp.notebook.rst delete mode 100644 docs/source/modules.rst diff --git a/.readthedocs.yml b/.readthedocs.yml new file mode 100644 index 000000000000..4fe9c7e44501 --- /dev/null +++ b/.readthedocs.yml @@ -0,0 +1,8 @@ +# See https://docs.readthedocs.io/en/stable/config-file/v2.html for details +version: 2 +sphinx: + configuration: docs/conf.py +python: + version: 3.7 + install: + - requirements: sdk/python/requirements.txt \ No newline at end of file diff --git a/docs/_static/custom.css b/docs/_static/custom.css new file mode 100644 index 000000000000..c6f856d077a2 --- /dev/null +++ b/docs/_static/custom.css @@ -0,0 +1,3 @@ +.rst-content code, .rst-content tt, code { + white-space: normal; +} \ No newline at end of file diff --git a/docs/conf.py b/docs/conf.py index 59b0fac1af43..3ccece6ddab5 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -55,7 +55,7 @@ # -- Project information ----------------------------------------------------- project = 'Kubeflow Pipelines' -copyright = '2019, Google' +copyright = '2020, Google' author = 'Google' # The short X.Y 
version @@ -106,7 +106,6 @@ # The name of the Pygments (syntax highlighting) style to use. pygments_style = None - # -- Options for HTML output ------------------------------------------------- # The theme to use for HTML and HTML Help pages. See the documentation for @@ -140,6 +139,10 @@ # # html_sidebars = {} +html_css_files = [ + 'custom.css', +] + # -- Options for HTMLHelp output --------------------------------------------- diff --git a/docs/index.rst b/docs/index.rst index 593759728007..a3944bfcfe2c 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -3,8 +3,8 @@ You can adapt this file completely to your liking, but it should at least contain the root `toctree` directive. -Welcome to Kubeflow Pipelines SDK API reference -================================================ +Kubeflow Pipelines SDK API +========================== Main documentation: https://www.kubeflow.org/docs/pipelines/ @@ -12,11 +12,11 @@ Source code: https://github.com/kubeflow/pipelines/ .. toctree:: :maxdepth: 3 - :caption: Contents: + :caption: Contents + self source/kfp - .. * :ref:`modindex` .. * :ref:`kfp-ref` .. * :ref:`search` diff --git a/docs/requirements.txt b/docs/requirements.txt new file mode 100644 index 000000000000..ce9d389c498f --- /dev/null +++ b/docs/requirements.txt @@ -0,0 +1,2 @@ +sphinx==3.1.2 +sphinx_rtd_theme==0.5.0 \ No newline at end of file diff --git a/docs/source/kfp.client.rst b/docs/source/kfp.client.rst index d979c353c445..a9b5331fa869 100644 --- a/docs/source/kfp.client.rst +++ b/docs/source/kfp.client.rst @@ -1,5 +1,5 @@ kfp.Client class ------------------- +================ .. autoclass:: kfp.Client :members: @@ -7,7 +7,7 @@ kfp.Client class :show-inheritance: Generated APIs ------------------- +-------------- .. toctree:: :maxdepth: 2 diff --git a/docs/source/kfp.components.structures.rst b/docs/source/kfp.components.structures.rst index fb499f4c1fdc..3f075f728822 100644 --- a/docs/source/kfp.components.structures.rst +++ b/docs/source/kfp.components.structures.rst @@ -9,5 +9,6 @@ kfp.components.structures package :imported-members: .. toctree:: + :maxdepth: 2 kfp.components.structures.kubernetes diff --git a/docs/source/kfp.notebook.rst b/docs/source/kfp.notebook.rst deleted file mode 100644 index c662ad49efae..000000000000 --- a/docs/source/kfp.notebook.rst +++ /dev/null @@ -1,8 +0,0 @@ -kfp.notebook package -==================== - -.. automodule:: kfp.notebook - :members: - :undoc-members: - :show-inheritance: - :imported-members: diff --git a/docs/source/kfp.rst b/docs/source/kfp.rst index c0cf566101f8..ef4a94e38773 100644 --- a/docs/source/kfp.rst +++ b/docs/source/kfp.rst @@ -7,12 +7,11 @@ kfp package .. toctree:: :maxdepth: 2 + kfp.client kfp.compiler kfp.components kfp.containers kfp.dsl - kfp.client - kfp.notebook kfp.extensions .. automodule:: kfp diff --git a/docs/source/modules.rst b/docs/source/modules.rst deleted file mode 100644 index e246a1ec59e9..000000000000 --- a/docs/source/modules.rst +++ /dev/null @@ -1,7 +0,0 @@ -kfp -=== - -.. toctree:: - :maxdepth: 4 - - kfp diff --git a/sdk/python/kfp/_client.py b/sdk/python/kfp/_client.py index a6f657ccb9bc..6565a273f22d 100644 --- a/sdk/python/kfp/_client.py +++ b/sdk/python/kfp/_client.py @@ -50,9 +50,9 @@ "LESS_THAN_EQUALS": 7} def _add_generated_apis(target_struct, api_module, api_client): - '''Initializes a hierarchical API object based on the generated API module. + """Initializes a hierarchical API object based on the generated API module. 
PipelineServiceApi.create_pipeline becomes target_struct.pipelines.create_pipeline - ''' + """ Struct = type('Struct', (), {}) def camel_case_to_snake_case(name): @@ -88,7 +88,26 @@ def camel_case_to_snake_case(name): class Client(object): - """ API Client for KubeFlow Pipeline. + """API Client for Kubeflow Pipelines. + + Args: + host: The host name to use to talk to Kubeflow Pipelines. If not set, the in-cluster + service DNS name will be used, which only works if the current environment is a pod + in the same cluster (such as a Jupyter instance spawned by Kubeflow's + JupyterHub). If you have a different connection to the cluster, such as a kubectl + proxy connection, then set it to something like "127.0.0.1:8080/pipeline". + If you connect to an IAP enabled cluster, set it to + https://<your-deployment>.endpoints.<your-project>.cloud.goog/pipeline". + client_id: The client ID used by Identity-Aware Proxy. + namespace: The namespace where the Kubeflow Pipelines system is run. + other_client_id: The client ID used to obtain the auth codes and refresh tokens. + Reference: https://cloud.google.com/iap/docs/authentication-howto#authenticating_from_a_desktop_app. + other_client_secret: The client secret used to obtain the auth codes and refresh tokens. + existing_token: Pass in a token directly. This is useful for cases where it is better to get the token outside of the SDK, e.g. in GCP Cloud Functions, + or when the caller already has a token. + cookies: CookieJar object containing cookies that will be passed to the pipelines API. + proxy: HTTP or HTTPS proxy server. + ssl_ca_cert: Cert for proxy. """ # in-cluster DNS name of the pipeline service @@ -100,25 +119,6 @@ class Client(object): # TODO: Wrap the configurations for different authentication methods. def __init__(self, host=None, client_id=None, namespace='kubeflow', other_client_id=None, other_client_secret=None, existing_token=None, cookies=None, proxy=None, ssl_ca_cert=None): """Create a new instance of kfp client. - - Args: - host: the host name to use to talk to Kubeflow Pipelines. If not set, the in-cluster - service DNS name will be used, which only works if the current environment is a pod - in the same cluster (such as a Jupyter instance spawned by Kubeflow's - JupyterHub). If you have a different connection to cluster, such as a kubectl - proxy connection, then set it to something like "127.0.0.1:8080/pipeline. - If you connect to an IAP enabled cluster, set it to - https://<your-deployment>.endpoints.<your-project>.cloud.goog/pipeline". - client_id: The client ID used by Identity-Aware Proxy. - namespace: the namespace where the kubeflow pipeline system is run. - other_client_id: The client ID used to obtain the auth codes and refresh tokens. - Reference: https://cloud.google.com/iap/docs/authentication-howto#authenticating_from_a_desktop_app. - other_client_secret: The client secret used to obtain the auth codes and refresh tokens. - existing_token: pass in token directly, it's used for cases better get token outside of SDK, e.x. GCP Cloud Functions - or caller already has a token - cookies: CookieJar object containing cookies that will be passed to the pipelines API. - proxy: HTTP or HTTPS proxy server - ssl_ca_cert: cert for proxy """ host = host or os.environ.get(KF_PIPELINES_ENDPOINT_ENV) self._uihost = os.environ.get(KF_PIPELINES_UI_ENDPOINT_ENV, host) @@ -265,7 +265,9 @@ def _refresh_api_client_token(self): def set_user_namespace(self, namespace): """Set user namespace into local context setting file. - This function should only be used when Kubeflow Pipelines is in the multi-user mode.
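A brief connection sketch for the client options described above; the host value is a hypothetical kubectl port-forward address, not a requirement::

    import kfp

    # Inside the cluster the host can be omitted; otherwise point it at your deployment.
    client = kfp.Client(host='http://127.0.0.1:8080/pipeline')
    print(client.list_experiments(page_size=5))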
+ + This function should only be used when Kubeflow Pipelines is in the multi-user mode. + Args: namespace: kubernetes namespace the user has access to. """ @@ -275,6 +277,7 @@ def set_user_namespace(self, namespace): def get_user_namespace(self): """Get user namespace in context config. + Returns: namespace: kubernetes namespace from the local context file or empty if it wasn't set. """ @@ -282,12 +285,14 @@ def get_user_namespace(self): def create_experiment(self, name, description=None, namespace=None): """Create a new experiment. + Args: - name: the name of the experiment. - description: description of the experiment. - namespace: kubernetes namespace where the experiment should be created. + name: The name of the experiment. + description: Description of the experiment. + namespace: Kubernetes namespace where the experiment should be created. For single user deployment, leave it as None; For multi user, input a namespace where the user is authorized. + Returns: An Experiment object. Most important field is id. """ @@ -323,11 +328,13 @@ def create_experiment(self, name, description=None, namespace=None): return experiment def get_pipeline_id(self, name): - """Returns the pipeline id if a pipeline with the name exsists. + """Find the id of a pipeline by name. + Args: - name: pipeline name + name: Pipeline name. + Returns: - A response object including a list of experiments and next page token. + Returns the pipeline id if a pipeline with the name exists. """ pipeline_filter = json.dumps({ "predicates": [ @@ -347,13 +354,15 @@ def get_pipeline_id(self, name): def list_experiments(self, page_token='', page_size=10, sort_by='', namespace=None): """List experiments. + Args: - page_token: token for starting of the page. - page_size: size of the page. - sort_by: can be '[field_name]', '[field_name] des'. For example, 'name desc'. - namespace: kubernetes namespace where the experiment was created. + page_token: Token for starting of the page. + page_size: Size of the page. + sort_by: Can be '[field_name]', '[field_name] desc'. For example, 'name desc'. + namespace: Kubernetes namespace where the experiment was created. For single user deployment, leave it as None; For multi user, input a namespace where the user is authorized. + Returns: A response object including a list of experiments and next page token. """ @@ -368,15 +377,19 @@ def list_experiments(self, page_token='', page_size=10, sort_by='', namespace=No def get_experiment(self, experiment_id=None, experiment_name=None, namespace=None): """Get details of an experiment + Either experiment_id or experiment_name is required + Args: - experiment_id: id of the experiment. (Optional) - experiment_name: name of the experiment. (Optional) - namespace: kubernetes namespace where the experiment was created. + experiment_id: Id of the experiment. (Optional) + experiment_name: Name of the experiment. (Optional) + namespace: Kubernetes namespace where the experiment was created. For single user deployment, leave it as None; For multi user, input the namespace where the user is authorized. + Returns: A response object including details of an experiment. + Throws: Exception if experiment is not found or none of the arguments is provided. """ @@ -426,10 +439,12 @@ def _choose_pipeline_yaml_file(file_list) -> str: def list_pipelines(self, page_token='', page_size=10, sort_by=''): """List pipelines. + Args: - page_token: token for starting of the page. - page_size: size of the page. + page_token: Token for starting of the page.
+ page_size: Size of the page. sort_by: one of 'field_name', 'field_name desc'. For example, 'name desc'. + Returns: A response object including a list of pipelines and next page token. """ @@ -437,13 +452,15 @@ def list_pipelines(self, page_token='', page_size=10, sort_by=''): def list_pipeline_versions(self, pipeline_id: str, page_token='', page_size=10, sort_by=''): """List all versions of a given pipeline. + Args: - pipeline_id: the string ID of a pipeline. - page_token: token for starting of the page. - page_size: size of the page. - sort_by: one of 'field_name', 'field_name desc'. For example, 'name desc'. + pipeline_id: The id of a pipeline. + page_token: Token for starting of the page. + page_size: Size of the page. + sort_by: one of 'field_name', 'field_name desc'. For example, 'name desc'. + Returns: - A response object including a list of pipelines and next page token. + A response object including a list of pipeline versions and next page token. """ return self._pipelines_api.list_pipeline_versions( resource_key_type="PIPELINE", @@ -458,12 +475,12 @@ def run_pipeline(self, experiment_id, job_name, pipeline_package_path=None, para """Run a specified pipeline. Args: - experiment_id: The string id of an experiment. - job_name: name of the job. - pipeline_package_path: local path of the pipeline package(the filename should end with one of the following .tar.gz, .tgz, .zip, .yaml, .yml). - params: a dictionary with key (string) as param name and value (string) as as param value. - pipeline_id: the string ID of a pipeline. - version_id: the string ID of a pipeline version. + experiment_id: The id of an experiment. + job_name: Name of the job. + pipeline_package_path: Local path of the pipeline package (the filename should end with one of the following .tar.gz, .tgz, .zip, .yaml, .yml). + params: A dictionary with key (string) as param name and value (string) as param value. + pipeline_id: The id of a pipeline. + version_id: The id of a pipeline version. If both pipeline_id and version_id are specified, version_id will take precedence. If only pipeline_id is specified, the default version of this pipeline is used to create the run. @@ -490,9 +507,10 @@ def run_pipeline(self, experiment_id, job_name, pipeline_package_path=None, para def create_recurring_run(self, experiment_id, job_name, description=None, start_time=None, end_time=None, interval_second=None, cron_expression=None, max_concurrency=1, no_catchup=None, params={}, pipeline_package_path=None, pipeline_id=None, version_id=None, enabled=True): """Create a recurring run. + Args: experiment_id: The string id of an experiment. - job_name: name of the job. + job_name: Name of the job. description: An optional job description. start_time: The RFC3339 time string of the time when to start the job. end_time: The RFC3339 time string of the time when to end the job. @@ -513,6 +531,7 @@ def create_recurring_run(self, experiment_id, job_name, description=None, start_ If both pipeline_id and version_id are specified, pipeline_id will take precedence. This will change in a future version, so it is recommended to use version_id by itself. enabled: A bool indicating whether the recurring run is enabled or disabled. + Returns: A Job object. Most important field is id. """ @@ -549,14 +568,16 @@ def create_recurring_run(self, experiment_id, job_name, description=None, start_ def _create_job_config(self, experiment_id, params, pipeline_package_path, pipeline_id, version_id): """Create a JobConfig with spec and resource_references.
+ Args: - experiment_id: The string id of an experiment. + experiment_id: The id of an experiment. pipeline_package_path: Local path of the pipeline package(the filename should end with one of the following .tar.gz, .tgz, .zip, .yaml, .yml). params: A dictionary with key (string) as param name and value (string) as param value. - pipeline_id: The string ID of a pipeline. - version_id: The string ID of a pipeline version. + pipeline_id: The id of a pipeline. + version_id: The id of a pipeline version. If both pipeline_id and version_id are specified, pipeline_id will take precendence This will change in a future version, so it is recommended to use version_id by itself. + Returns: A JobConfig object with attributes spec and resource_reference. """ @@ -594,7 +615,8 @@ def __init__(self, spec, resource_references): return JobConfig(spec=spec, resource_references=resource_references) def create_run_from_pipeline_func(self, pipeline_func: Callable, arguments: Mapping[str, str], run_name=None, experiment_name=None, pipeline_conf: kfp.dsl.PipelineConf = None, namespace=None): - '''Runs pipeline on KFP-enabled Kubernetes cluster. + """Runs pipeline on KFP-enabled Kubernetes cluster. + This command compiles the pipeline function, creates or gets an experiment and submits the pipeline for execution. Args: @@ -602,10 +624,10 @@ def create_run_from_pipeline_func(self, pipeline_func: Callable, arguments: Mapp arguments: Arguments to the pipeline function provided as a dict. run_name: Optional. Name of the run to be shown in the UI. experiment_name: Optional. Name of the experiment to add the run to. - namespace: kubernetes namespace where the pipeline runs are created. + namespace: Kubernetes namespace where the pipeline runs are created. For single user deployment, leave it as None; For multi user, input a namespace where the user is authorized - ''' + """ #TODO: Check arguments against the pipeline function pipeline_name = pipeline_func.__name__ run_name = run_name or pipeline_name + ' ' + datetime.datetime.now().strftime('%Y-%m-%d %H-%M-%S') @@ -615,7 +637,8 @@ def create_run_from_pipeline_func(self, pipeline_func: Callable, arguments: Mapp return self.create_run_from_pipeline_package(pipeline_package_path, arguments, run_name, experiment_name, namespace) def create_run_from_pipeline_package(self, pipeline_file: str, arguments: Mapping[str, str], run_name=None, experiment_name=None, namespace=None): - '''Runs pipeline on KFP-enabled Kubernetes cluster. + """Runs pipeline on KFP-enabled Kubernetes cluster. + This command compiles the pipeline function, creates or gets an experiment and submits the pipeline for execution. Args: @@ -623,10 +646,10 @@ def create_run_from_pipeline_package(self, pipeline_file: str, arguments: Mappin arguments: Arguments to the pipeline function provided as a dict. run_name: Optional. Name of the run to be shown in the UI. experiment_name: Optional. Name of the experiment to add the run to. - namespace: kubernetes namespace where the pipeline runs are created. + namespace: Kubernetes namespace where the pipeline runs are created. For single user deployment, leave it as None; For multi user, input a namespace where the user is authorized - ''' + """ class RunPipelineResult: def __init__(self, client, run_info): @@ -657,15 +680,17 @@ def __repr__(self): return RunPipelineResult(self, run_info) def list_runs(self, page_token='', page_size=10, sort_by='', experiment_id=None, namespace=None): - """List runs. + """List runs, optionally can be filtered by experiment or namespace. 
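A minimal sketch of the create_run_from_pipeline_func flow described above; the pipeline, image, and argument values are illustrative only::

    import kfp
    from kfp import dsl

    @dsl.pipeline(name='echo', description='Echoes a message.')
    def echo_pipeline(message: str = 'hello'):
        # A single containerized step; the pipeline parameter is passed to the command.
        dsl.ContainerOp(name='echo', image='alpine:3.12', command=['echo', message])

    # Compiles the pipeline and submits it as a run in one call.
    kfp.Client().create_run_from_pipeline_func(echo_pipeline, arguments={'message': 'hi'})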
+ Args: - page_token: token for starting of the page. - page_size: size of the page. - sort_by: one of 'field_name', 'field_name desc'. For example, 'name desc'. - experiment_id: experiment id to filter upon - namespace: kubernetes namespace to filter upon. + page_token: Token for starting of the page. + page_size: Size of the page. + sort_by: One of 'field_name', 'field_name desc'. For example, 'name desc'. + experiment_id: Experiment id to filter upon + namespace: Kubernetes namespace to filter upon. For single user deployment, leave it as None; For multi user, input a namespace where the user is authorized. + Returns: A response object including a list of experiments and next page token. """ @@ -680,11 +705,13 @@ def list_runs(self, page_token='', page_size=10, sort_by='', experiment_id=None, def list_recurring_runs(self, page_token='', page_size=10, sort_by='', experiment_id=None): """List recurring runs. + Args: - page_token: token for starting of the page. - page_size: size of the page. - sort_by: one of 'field_name', 'field_name desc'. For example, 'name desc'. - experiment_id: experiment id to filter upon + page_token: Token for starting of the page. + page_size: Size of the page. + sort_by: One of 'field_name', 'field_name desc'. For example, 'name desc'. + experiment_id: Experiment id to filter upon. + Returns: A response object including a list of recurring_runs and next page token. """ @@ -696,10 +723,13 @@ def list_recurring_runs(self, page_token='', page_size=10, sort_by='', experimen def get_recurring_run(self, job_id): """Get recurring_run details. + Args: - id of the recurring_run. + job_id: id of the recurring_run. + Returns: A response object including details of a recurring_run. + Throws: Exception if recurring_run is not found. """ @@ -708,10 +738,13 @@ def get_recurring_run(self, job_id): def get_run(self, run_id): """Get run details. + Args: - id of the run. + run_id: id of the run. + Returns: A response object including details of a run. + Throws: Exception if run is not found. """ @@ -719,14 +752,16 @@ def get_run(self, run_id): def wait_for_run_completion(self, run_id, timeout): """Waits for a run to complete. + Args: - run_id: run id, returned from run_pipeline. - timeout: timeout in seconds. + run_id: Run id, returned from run_pipeline. + timeout: Timeout in seconds. + Returns: A run detail object: Most important fields are run and pipeline_runtime. + Raises: - TimeoutError: if the pipeline run failed to finish before the specified - timeout. + TimeoutError: if the pipeline run failed to finish before the specified timeout. """ status = 'Running:' start_time = datetime.datetime.now() @@ -750,10 +785,12 @@ def wait_for_run_completion(self, run_id, timeout): def _get_workflow_json(self, run_id): """Get the workflow json. + Args: run_id: run id, returned from run_pipeline. + Returns: - workflow: json workflow + workflow: Json workflow """ get_run_response = self._run_api.get_run(run_id=run_id) workflow = get_run_response.pipeline_runtime.workflow_manifest @@ -767,10 +804,12 @@ def upload_pipeline( description: str = None, ): """Uploads the pipeline to the Kubeflow Pipelines cluster. + Args: pipeline_package_path: Local path to the pipeline package. pipeline_name: Optional. Name of the pipeline to be shown in the UI. description: Optional. Description of the pipeline to be shown in the UI. + Returns: Server response object containing pipleine id and other information. 
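A short usage sketch for upload_pipeline as documented above; the package path and display name are placeholders::

    import kfp

    client = kfp.Client()
    # Upload a compiled package; the response carries the new pipeline's id.
    pipeline = client.upload_pipeline('pipeline.yaml', pipeline_name='my-pipeline', description='Example pipeline')
    print(pipeline.id)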
""" @@ -822,10 +861,13 @@ def upload_pipeline_version( def get_pipeline(self, pipeline_id): """Get pipeline details. + Args: - id of the pipeline. + pipeline_id: id of the pipeline. + Returns: A response object including details of a pipeline. + Throws: Exception if pipeline is not found. """ @@ -833,11 +875,13 @@ def get_pipeline(self, pipeline_id): def delete_pipeline(self, pipeline_id): """Delete pipeline. + Args: - id of the pipeline. + pipeline_id: id of the pipeline. + Returns: - Object. If the method is called asynchronously, - returns the request thread. + Object. If the method is called asynchronously, returns the request thread. + Throws: Exception if pipeline is not found. """ @@ -845,11 +889,13 @@ def delete_pipeline(self, pipeline_id): def list_pipeline_versions(self, pipeline_id, page_token='', page_size=10, sort_by=''): """Lists pipeline versions. + Args: - pipeline_id: id of the pipeline to list versions - page_token: token for starting of the page. - page_size: size of the page. - sort_by: one of 'field_name', 'field_name des'. For example, 'name des'. + pipeline_id: Id of the pipeline to list versions + page_token: Token for starting of the page. + page_size: Size of the page. + sort_by: One of 'field_name', 'field_name des'. For example, 'name des'. + Returns: A response object including a list of versions and next page token. """ diff --git a/sdk/python/kfp/aws.py b/sdk/python/kfp/aws.py index 338cec288cc6..aad8315eb6b4 100644 --- a/sdk/python/kfp/aws.py +++ b/sdk/python/kfp/aws.py @@ -15,9 +15,11 @@ def use_aws_secret(secret_name='aws-secret', aws_access_key_id_name='AWS_ACCESS_KEY_ID', aws_secret_access_key_name='AWS_SECRET_ACCESS_KEY'): """An operator that configures the container to use AWS credentials. - AWS doesn't create secret along with kubeflow deployment and it requires users - to manually create credential secret with proper permissions. - --- + AWS doesn't create secret along with kubeflow deployment and it requires users + to manually create credential secret with proper permissions. + + :: + apiVersion: v1 kind: Secret metadata: diff --git a/sdk/python/kfp/azure.py b/sdk/python/kfp/azure.py index 27e1bcd5908c..9df2fcb674d6 100644 --- a/sdk/python/kfp/azure.py +++ b/sdk/python/kfp/azure.py @@ -15,11 +15,10 @@ def use_azure_secret(secret_name='azcreds'): """An operator that configures the container to use Azure user credentials. - The azcreds secret is created as part of the kubeflow deployment that - stores the client ID and secrets for the kubeflow azure service principal. + The azcreds secret is created as part of the kubeflow deployment that + stores the client ID and secrets for the kubeflow azure service principal. - With this service principal, the container has a range of Azure APIs to - access to. + With this service principal, the container has a range of Azure APIs to access to. """ def _use_azure_secret(task): diff --git a/sdk/python/kfp/compiler/compiler.py b/sdk/python/kfp/compiler/compiler.py index adf66f346aec..b7ca2c536842 100644 --- a/sdk/python/kfp/compiler/compiler.py +++ b/sdk/python/kfp/compiler/compiler.py @@ -36,19 +36,19 @@ class Compiler(object): - """DSL Compiler. - - It compiles DSL pipeline functions into workflow yaml. Example usage: - ```python - @dsl.pipeline( - name='name', - description='description' - ) - def my_pipeline(a: int = 1, b: str = "default value"): - ... - - Compiler().compile(my_pipeline, 'path/to/workflow.yaml') - ``` + """DSL Compiler that compiles pipeline functions into workflow yaml. 
+ + Example: + How to use the compiler to construct workflow yaml:: + + @dsl.pipeline( + name='name', + description='description' + ) + def my_pipeline(a: int = 1, b: str = "default value"): + ... + + Compiler().compile(my_pipeline, 'path/to/workflow.yaml') """ def _pipelineparam_full_name(self, param): @@ -875,17 +875,20 @@ def create_workflow(self, pipeline_func: Callable, pipeline_name: Text=None, pipeline_description: Text=None, params_list: List[dsl.PipelineParam]=None, pipeline_conf: dsl.PipelineConf = None) -> Dict[Text, Any]: - """ Create workflow spec from pipeline function and specified pipeline + """Create workflow spec from pipeline function and specified pipeline params/metadata. Currently, the pipeline params are either specified in the signature of the pipeline function or by passing a list of dsl.PipelineParam. Conflict will cause ValueError. - :param pipeline_func: pipeline function where ContainerOps are invoked. - :param pipeline_name: - :param pipeline_description: - :param params_list: list of pipeline params to append to the pipeline. - :param pipeline_conf: PipelineConf instance. Can specify op transforms, image pull secrets and other pipeline-level configuration options. Overrides any configuration that may be set by the pipeline. - :return: workflow dict. + Args: + pipeline_func: Pipeline function where ContainerOps are invoked. + pipeline_name: The name of the pipeline to compile. + pipeline_description: The description of the pipeline. + params_list: List of pipeline params to append to the pipeline. + pipeline_conf: PipelineConf instance. Can specify op transforms, image pull secrets and other pipeline-level configuration options. Overrides any configuration that may be set by the pipeline. + + Returns: + The created workflow dictionary. """ return self._create_workflow(pipeline_func, pipeline_name, pipeline_description, params_list, pipeline_conf) @@ -900,9 +903,9 @@ def compile(self, pipeline_func, package_path, type_check=True, pipeline_conf: d """Compile the given pipeline function into workflow yaml. Args: - pipeline_func: pipeline functions with @dsl.pipeline decorator. - package_path: the output workflow tar.gz file path. for example, "~/a.tar.gz" - type_check: whether to enable the type check or not, default: False. + pipeline_func: Pipeline function with the @dsl.pipeline decorator. + package_path: The output workflow tar.gz file path. For example, "~/a.tar.gz". + type_check: Whether to enable the type check or not, default: True. pipeline_conf: PipelineConf instance. Can specify op transforms, image pull secrets and other pipeline-level configuration options. Overrides any configuration that may be set by the pipeline. """ import kfp @@ -922,8 +925,7 @@ def _write_workflow(workflow: Dict[Text, Any], package_path: Text = None): Args: workflow: Workflow spec of the pipeline, dict. - package_path: file path to be written. If not specified, a yaml_text string - will be returned. + package_path: File path to be written. If not specified, a yaml_text string will be returned. """ yaml_text = dump_yaml(workflow) diff --git a/sdk/python/kfp/components/_component_store.py b/sdk/python/kfp/components/_component_store.py index 52d6ba4b1d10..c5175cfebe2a 100644 --- a/sdk/python/kfp/components/_component_store.py +++ b/sdk/python/kfp/components/_component_store.py @@ -33,26 +33,45 @@ def __init__(self, local_search_paths=None, url_search_prefixes=None, auth=None) self._url_to_info_db = KeyValueStore(cache_dir=cache_base_dir / 'url_to_info') def load_component_from_url(self, url): + """Loads a component from a URL.
+ + Args: + url: The url of the component specification. + + Returns: + A factory function with a strongly-typed signature. + """ return comp.load_component_from_url(url=url, auth=self._auth) def load_component_from_file(self, path): + """Loads a component from a path. + + Args: + path: The path of the component specification. + + Returns: + A factory function with a strongly-typed signature. + """ return comp.load_component_from_file(path) def load_component(self, name, digest=None, tag=None): - ''' + """ Loads component local file or URL and creates a task factory function Search locations: - //component.yaml - //component.yaml + + * :code:`//component.yaml` + * :code:`//component.yaml` If the digest is specified, then the search locations are: - //versions/sha256/ - //versions/sha256/ + + * :code:`//versions/sha256/` + * :code:`//versions/sha256/` If the tag is specified, then the search locations are: - //versions/tags/ - //versions/tags/ + + * :code:`//versions/tags/` + * :code:`//versions/tags/` Args: name: Component name used to search and load the component artifact containing the component definition. @@ -63,7 +82,7 @@ def load_component(self, name, digest=None, tag=None): Returns: A factory function with a strongly-typed signature. Once called with the required arguments, the factory constructs a pipeline task instance (ContainerOp). - ''' + """ #This function should be called load_task_factory since it returns a factory function. #The real load_component function should produce an object with component properties (e.g. name, description, inputs/outputs). #TODO: Change this function to return component spec object but it should be callable to construct tasks. @@ -78,10 +97,10 @@ def _load_component_spec_in_component_ref( self, component_ref: ComponentReference, ) -> ComponentReference: - '''Takes component_ref, finds the component spec and returns component_ref with .spec set to the component spec. + """Takes component_ref, finds the component spec and returns component_ref with .spec set to the component spec. See ComponentStore.load_component for the details of the search logic. - ''' + """ if component_ref.spec: return component_ref @@ -144,7 +163,7 @@ def _load_component_from_ref(self, component_ref: ComponentReference) -> Callabl return comp._create_task_factory_from_component_spec(component_spec=component_ref.spec, component_ref=component_ref) def search(self, name: str): - '''Searches for components by name in the configured component store. + """Searches for components by name in the configured component store. Prints the component name and URL for components that match the given name. Only components on GitHub are currently supported. 
@@ -152,10 +171,11 @@ def search(self, name: str): Example:: kfp.components.ComponentStore.default_store.search('xgboost') - - >>> Xgboost train https://raw.githubusercontent.com/.../components/XGBoost/Train/component.yaml - >>> Xgboost predict https://raw.githubusercontent.com/.../components/XGBoost/Predict/component.yaml - ''' + + # Returns results: + # Xgboost train https://raw.githubusercontent.com/.../components/XGBoost/Train/component.yaml + # Xgboost predict https://raw.githubusercontent.com/.../components/XGBoost/Predict/component.yaml + """ self._refresh_component_cache() for url in self._url_to_info_db.keys(): component_info = json.loads(self._url_to_info_db.try_get_value_bytes(url)) diff --git a/sdk/python/kfp/components/_components.py b/sdk/python/kfp/components/_components.py index 50343e86c5c3..b778b85daaeb 100644 --- a/sdk/python/kfp/components/_components.py +++ b/sdk/python/kfp/components/_components.py @@ -33,20 +33,19 @@ def load_component(filename=None, url=None, text=None): - ''' - Loads component from text, file or URL and creates a task factory function + """Loads component from text, file or URL and creates a task factory function Only one argument should be specified. Args: filename: Path of local file containing the component definition. - url: The URL of the component file data + url: The URL of the component file data. text: A string containing the component file data. Returns: A factory function with a strongly-typed signature. Once called with the required arguments, the factory constructs a pipeline task instance (ContainerOp). - ''' + """ #This function should be called load_task_factory since it returns a factory function. #The real load_component function should produce an object with component properties (e.g. name, description, inputs/outputs). #TODO: Change this function to return component spec object but it should be callable to construct tasks. @@ -64,8 +63,7 @@ def load_component(filename=None, url=None, text=None): def load_component_from_url(url: str, auth=None): - ''' - Loads component from URL and creates a task factory function + """Loads component from URL and creates a task factory function Args: url: The URL of the component file data @@ -74,7 +72,7 @@ def load_component_from_url(url: str, auth=None): Returns: A factory function with a strongly-typed signature. Once called with the required arguments, the factory constructs a pipeline task instance (ContainerOp). - ''' + """ component_spec = _load_component_spec_from_url(url, auth) url = _fix_component_uri(url) component_ref = ComponentReference(url=url) @@ -86,8 +84,7 @@ def load_component_from_url(url: str, auth=None): def load_component_from_file(filename): - ''' - Loads component from file and creates a task factory function + """Loads component from file and creates a task factory function Args: filename: Path of local file containing the component definition. @@ -95,7 +92,7 @@ def load_component_from_file(filename): Returns: A factory function with a strongly-typed signature. Once called with the required arguments, the factory constructs a pipeline task instance (ContainerOp). 
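These loaders all return the same kind of task factory; a minimal sketch using the inline-text variant, with a made-up echo component spec::

    import kfp.components as comp

    echo_op = comp.load_component_from_text('''
    name: Echo
    inputs:
    - {name: message, type: String}
    implementation:
      container:
        image: alpine:3.12
        command: [echo, {inputValue: message}]
    ''')

    # Inside a @dsl.pipeline function, calling the factory creates a pipeline task:
    #     echo_task = echo_op(message='hello')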
- ''' + """ component_spec = _load_component_spec_from_file(path=filename) return _create_task_factory_from_component_spec( component_spec=component_spec, @@ -104,8 +101,7 @@ def load_component_from_file(filename): def load_component_from_text(text): - ''' - Loads component from text and creates a task factory function + """Loads component from text and creates a task factory function Args: text: A string containing the component file data. @@ -113,7 +109,7 @@ def load_component_from_text(text): Returns: A factory function with a strongly-typed signature. Once called with the required arguments, the factory constructs a pipeline task instance (ContainerOp). - ''' + """ if text is None: raise TypeError component_spec = _load_component_spec_from_component_text(text) @@ -149,10 +145,10 @@ def _load_component_spec_from_url(url: str, auth=None): def _load_component_spec_from_yaml_or_zip_bytes(data: bytes): - '''Loads component spec from binary data. + """Loads component spec from binary data. The data can be a YAML file or a zip file with a component.yaml file inside. - ''' + """ import zipfile import io stream = io.BytesIO(data) diff --git a/sdk/python/kfp/components/_python_op.py b/sdk/python/kfp/components/_python_op.py index ba68643d70d6..9a89dbff7ae6 100644 --- a/sdk/python/kfp/components/_python_op.py +++ b/sdk/python/kfp/components/_python_op.py @@ -42,40 +42,37 @@ # InputPath(list) or InputPath('JsonObject') class InputPath: - '''When creating component from function, InputPath should be used as function parameter annotation to tell the system to pass the *data file path* to the function instead of passing the actual data.''' + '''When creating component from function, :class:`.InputPath` should be used as function parameter annotation to tell the system to pass the *data file path* to the function instead of passing the actual data.''' def __init__(self, type=None): self.type = type class InputTextFile: - '''When creating component from function, InputTextFile should be used as function parameter annotation to tell the system to pass the *text data stream* object (`io.TextIOWrapper`) to the function instead of passing the actual data.''' + '''When creating component from function, :class:`.InputTextFile` should be used as function parameter annotation to tell the system to pass the *text data stream* object (`io.TextIOWrapper`) to the function instead of passing the actual data.''' def __init__(self, type=None): self.type = type class InputBinaryFile: - '''When creating component from function, InputBinaryFile should be used as function parameter annotation to tell the system to pass the *binary data stream* object (`io.BytesIO`) to the function instead of passing the actual data.''' + '''When creating component from function, :class:`.InputBinaryFile` should be used as function parameter annotation to tell the system to pass the *binary data stream* object (`io.BytesIO`) to the function instead of passing the actual data.''' def __init__(self, type=None): self.type = type -#OutputFile[GcsPath[Gzipped[Text]]] - - class OutputPath: - '''When creating component from function, OutputPath should be used as function parameter annotation to tell the system that the function wants to output data by writing it into a file with the given path instead of returning the data from the function.''' + '''When creating component from function, :class:`.OutputPath` should be used as function parameter annotation to tell the system that the function wants to output data by writing it into a file with 
the given path instead of returning the data from the function.''' def __init__(self, type=None): self.type = type class OutputTextFile: - '''When creating component from function, OutputTextFile should be used as function parameter annotation to tell the system that the function wants to output data by writing it into a given text file stream (`io.TextIOWrapper`) instead of returning the data from the function.''' + '''When creating component from function, :class:`.OutputTextFile` should be used as function parameter annotation to tell the system that the function wants to output data by writing it into a given text file stream (`io.TextIOWrapper`) instead of returning the data from the function.''' def __init__(self, type=None): self.type = type class OutputBinaryFile: - '''When creating component from function, OutputBinaryFile should be used as function parameter annotation to tell the system that the function wants to output data by writing it into a given binary file stream (`io.BytesIO`) instead of returning the data from the function.''' + '''When creating component from function, :class:`.OutputBinaryFile` should be used as function parameter annotation to tell the system that the function wants to output data by writing it into a given binary file stream (:code:`io.BytesIO`) instead of returning the data from the function.''' def __init__(self, type=None): self.type = type @@ -405,7 +402,7 @@ def annotation_to_type_struct(annotation): def _func_to_component_spec(func, extra_code='', base_image : str = None, packages_to_install: List[str] = None, modules_to_capture: List[str] = None, use_code_pickling=False) -> ComponentSpec: - '''Takes a self-contained python function and converts it to component + '''Takes a self-contained python function and converts it to component. Args: func: Required. The function to be converted @@ -413,8 +410,11 @@ def _func_to_component_spec(func, extra_code='', base_image : str = None, packag Note: The image can also be specified by decorating the function with the @python_component decorator. If different base images are explicitly specified in both places, an error is raised. extra_code: Optional. Python source code that gets placed before the function code. Can be used as workaround to define types used in function signature. packages_to_install: Optional. List of [versioned] python packages to pip install before executing the user function. - modules_to_capture: Optional. List of module names that will be captured (instead of just referencing) during the dependency scan. By default the func.__module__ is captured. + modules_to_capture: Optional. List of module names that will be captured (instead of just referencing) during the dependency scan. By default the :code:`func.__module__` is captured. use_code_pickling: Specifies whether the function code should be captured using pickling as opposed to source code manipulation. Pickling has better support for capturing dependencies, but is sensitive to version mismatch between python in component creation environment and runtime image. + + Returns: + A :py:class:`kfp.components.structures.ComponentSpec` instance. 
''' decorator_base_image = getattr(func, '_component_base_image', None) if decorator_base_image is not None: @@ -650,11 +650,10 @@ def _func_to_component_dict(func, extra_code='', base_image: str = None, package def func_to_component_text(func, extra_code='', base_image: str = None, packages_to_install: List[str] = None, modules_to_capture: List[str] = None, use_code_pickling=False): - ''' - Converts a Python function to a component definition and returns its textual representation + '''Converts a Python function to a component definition and returns its textual representation. + + Function docstring is used as component description. Argument and return annotations are used as component input/output types. - Function docstring is used as component description. - Argument and return annotations are used as component input/output types. To declare a function with multiple return values, use the NamedTuple return annotation syntax:: from typing import NamedTuple @@ -667,7 +666,7 @@ def add_multiply_two_numbers(a: float, b: float) -> NamedTuple('DummyName', [('s base_image: Optional. Specify a custom Docker container image to use in the component. For lightweight components, the image needs to have python 3.5+. Default is python:3.7 extra_code: Optional. Extra code to add before the function code. Can be used as workaround to define types used in function signature. packages_to_install: Optional. List of [versioned] python packages to pip install before executing the user function. - modules_to_capture: Optional. List of module names that will be captured (instead of just referencing) during the dependency scan. By default the func.__module__ is captured. The actual algorithm: Starting with the initial function, start traversing dependencies. If the dependecy.__module__ is in the modules_to_capture list then it's captured and it's dependencies are traversed. Otherwise the dependency is only referenced instead of capturing and its dependencies are not traversed. + modules_to_capture: Optional. List of module names that will be captured (instead of just referencing) during the dependency scan. By default the :code:`func.__module__` is captured. The actual algorithm: Starting with the initial function, start traversing dependencies. If the dependency.__module__ is in the modules_to_capture list then it's captured and it's dependencies are traversed. Otherwise the dependency is only referenced instead of capturing and its dependencies are not traversed. use_code_pickling: Specifies whether the function code should be captured using pickling as opposed to source code manipulation. Pickling has better support for capturing dependencies, but is sensitive to version mismatch between python in component creation environment and runtime image. Returns: @@ -685,11 +684,10 @@ def add_multiply_two_numbers(a: float, b: float) -> NamedTuple('DummyName', [('s def func_to_component_file(func, output_component_file, base_image: str = None, extra_code='', packages_to_install: List[str] = None, modules_to_capture: List[str] = None, use_code_pickling=False) -> None: - ''' - Converts a Python function to a component definition and writes it to a file + '''Converts a Python function to a component definition and writes it to a file. + + Function docstring is used as component description. Argument and return annotations are used as component input/output types. - Function docstring is used as component description. - Argument and return annotations are used as component input/output types. 
To declare a function with multiple return values, use the NamedTuple return annotation syntax:: from typing import NamedTuple @@ -703,7 +701,7 @@ def add_multiply_two_numbers(a: float, b: float) -> NamedTuple('DummyName', [('s base_image: Optional. Specify a custom Docker container image to use in the component. For lightweight components, the image needs to have python 3.5+. Default is tensorflow/tensorflow:1.13.2-py3 extra_code: Optional. Extra code to add before the function code. Can be used as workaround to define types used in function signature. packages_to_install: Optional. List of [versioned] python packages to pip install before executing the user function. - modules_to_capture: Optional. List of module names that will be captured (instead of just referencing) during the dependency scan. By default the func.__module__ is captured. The actual algorithm: Starting with the initial function, start traversing dependencies. If the dependecy.__module__ is in the modules_to_capture list then it's captured and it's dependencies are traversed. Otherwise the dependency is only referenced instead of capturing and its dependencies are not traversed. + modules_to_capture: Optional. List of module names that will be captured (instead of just referencing) during the dependency scan. By default the :code:`func.__module__` is captured. The actual algorithm: Starting with the initial function, start traversing dependencies. If the :code:`dependency.__module__` is in the :code:`modules_to_capture` list then it's captured and it's dependencies are traversed. Otherwise the dependency is only referenced instead of capturing and its dependencies are not traversed. use_code_pickling: Specifies whether the function code should be captured using pickling as opposed to source code manipulation. Pickling has better support for capturing dependencies, but is sensitive to version mismatch between python in component creation environment and runtime image. ''' @@ -720,12 +718,11 @@ def add_multiply_two_numbers(a: float, b: float) -> NamedTuple('DummyName', [('s def func_to_container_op(func, output_component_file=None, base_image: str = None, extra_code='', packages_to_install: List[str] = None, modules_to_capture: List[str] = None, use_code_pickling=False): - ''' - Converts a Python function to a component and returns a task (ContainerOp) factory + '''Converts a Python function to a component and returns a task (:class:`kfp.dsl.ContainerOp`) factory. - Function docstring is used as component description. - Argument and return annotations are used as component input/output types. - To declare a function with multiple return values, use the NamedTuple return annotation syntax:: + Function docstring is used as component description. Argument and return annotations are used as component input/output types. + + To declare a function with multiple return values, use the :code:`NamedTuple` return annotation syntax:: from typing import NamedTuple def add_multiply_two_numbers(a: float, b: float) -> NamedTuple('DummyName', [('sum', float), ('product', float)]): @@ -738,12 +735,12 @@ def add_multiply_two_numbers(a: float, b: float) -> NamedTuple('DummyName', [('s output_component_file: Optional. Write a component definition to a local file. Can be used for sharing. extra_code: Optional. Extra code to add before the function code. Can be used as workaround to define types used in function signature. packages_to_install: Optional. List of [versioned] python packages to pip install before executing the user function. 
- modules_to_capture: Optional. List of module names that will be captured (instead of just referencing) during the dependency scan. By default the func.__module__ is captured. The actual algorithm: Starting with the initial function, start traversing dependencies. If the dependecy.__module__ is in the modules_to_capture list then it's captured and it's dependencies are traversed. Otherwise the dependency is only referenced instead of capturing and its dependencies are not traversed. + modules_to_capture: Optional. List of module names that will be captured (instead of just referencing) during the dependency scan. By default the :code:`func.__module__` is captured. The actual algorithm: Starting with the initial function, start traversing dependencies. If the :code:`dependency.__module__` is in the :code:`modules_to_capture` list then it's captured and it's dependencies are traversed. Otherwise the dependency is only referenced instead of capturing and its dependencies are not traversed. use_code_pickling: Specifies whether the function code should be captured using pickling as opposed to source code manipulation. Pickling has better support for capturing dependencies, but is sensitive to version mismatch between python in component creation environment and runtime image. Returns: A factory function with a strongly-typed signature taken from the python function. - Once called with the required arguments, the factory constructs a pipeline task instance (ContainerOp) that can run the original function in a container. + Once called with the required arguments, the factory constructs a pipeline task instance (:class:`kfp.dsl.ContainerOp`) that can run the original function in a container. ''' component_spec = _func_to_component_spec( @@ -769,98 +766,95 @@ def create_component_from_func( base_image: str = None, packages_to_install: List[str] = None, ): - ''' - Converts a Python function to a component and returns a task factory (a function that accepts arguments and returns a task object). - - Function name and docstring are used as component name and description. - Argument and return annotations are used as component input/output types. - Example:: - - def add(a: float, b: float) -> float: - """Returns sum of two arguments""" - return a + b - - # add_op is a task factory function that creates a task object when given arguments - add_op = create_component_from_func( - func=add, - base_image='python:3.7', # Optional - output_component_file='add.component.yaml', # Optional - packages_to_install=['pandas==0.24'], # Optional - ) + '''Converts a Python function to a component and returns a task factory (a function that accepts arguments and returns a task object). + + Args: + func: The python function to convert + base_image: Optional. Specify a custom Docker container image to use in the component. For lightweight components, the image needs to have python 3.5+. Default is the python image corresponding to the current python environment. + output_component_file: Optional. Write a component definition to a local file. The produced component file can be loaded back by calling :code:`load_component_from_file` or :code:`load_component_from_uri`. + packages_to_install: Optional. List of [versioned] python packages to pip install before executing the user function. - # The component spec can be accessed through the .component_spec attribute: - add_op.component_spec.save('add.component.yaml') + Returns: + A factory function with a strongly-typed signature taken from the python function. 
+ Once called with the required arguments, the factory constructs a task instance that can run the original function in a container. - # The component function can be called with arguments to create a task: - add_task = add_op(1, 3) + Examples: + The function name and docstring are used as component name and description. Argument and return annotations are used as component input/output types:: - # The resulting task has output references, corresponding to the component outputs. - # When the function only has a single anonymous return value, the output name is "Output": - sum_output_ref = add_task.outputs['Output'] + def add(a: float, b: float) -> float: + """Returns sum of two arguments""" + return a + b - # These task output references can be passed to other component functions, constructing a computation graph: - task2 = add_op(sum_output_ref, 5) + # add_op is a task factory function that creates a task object when given arguments + add_op = create_component_from_func( + func=add, + base_image='python:3.7', # Optional + output_component_file='add.component.yaml', # Optional + packages_to_install=['pandas==0.24'], # Optional + ) + # The component spec can be accessed through the .component_spec attribute: + add_op.component_spec.save('add.component.yaml') - `create_component_from_func` function can also be used as decorator:: + # The component function can be called with arguments to create a task: + add_task = add_op(1, 3) - @create_component_from_func - def add_op(a: float, b: float) -> float: - """Returns sum of two arguments""" - return a + b + # The resulting task has output references, corresponding to the component outputs. + # When the function only has a single anonymous return value, the output name is "Output": + sum_output_ref = add_task.outputs['Output'] - To declare a function with multiple return values, use the NamedTuple return annotation syntax:: + # These task output references can be passed to other component functions, constructing a computation graph: + task2 = add_op(sum_output_ref, 5) - from typing import NamedTuple - def add_multiply_two_numbers(a: float, b: float) -> NamedTuple('Outputs', [('sum', float), ('product', float)]): - """Returns sum and product of two arguments""" - return (a + b, a * b) + :code:`create_component_from_func` function can also be used as decorator:: - add_multiply_op = create_component_from_func(add_multiply_two_numbers) + @create_component_from_func + def add_op(a: float, b: float) -> float: + """Returns sum of two arguments""" + return a + b - # The component function can be called with arguments to create a task: - add_multiply_task = add_multiply_op(1, 3) + To declare a function with multiple return values, use the :code:`NamedTuple` return annotation syntax:: - # The resulting task has output references, corresponding to the component outputs: - sum_output_ref = add_multiply_task.outputs['sum'] + from typing import NamedTuple - # These task output references can be passed to other component functions, constructing a computation graph: - task2 = add_multiply_op(sum_output_ref, 5) + def add_multiply_two_numbers(a: float, b: float) -> NamedTuple('Outputs', [('sum', float), ('product', float)]): + """Returns sum and product of two arguments""" + return (a + b, a * b) + add_multiply_op = create_component_from_func(add_multiply_two_numbers) - Bigger data should be read from files and written to files. - Use the `InputPath` parameter annotation to tell the system that the function wants to consume the corresponding input data as a file. 
The system will download the data, write it to a local file and then pass the **path** of that file to the function.
- Use the `OutputPath` parameter annotation to tell the system that the function wants to produce the corresponding output data as a file. The system will prepare and pass the **path** of a file where the function should write the output data. After the function exits, the system will upload the data to the storage system so that it can be passed to downstream components.
- You can specify the type of the consumed/produced data by specifying the type argument to `InputPath` and `OutputPath`. The type can be a python type or an arbitrary type name string. `OutputPath('CatBoostModel')` means that the function states that the data it has written to a file has type 'CatBoostModel'. `InputPath('CatBoostModel')` means that the function states that it expect the data it reads from a file to have type 'CatBoostModel'. When the pipeline author connects inputs to outputs the system checks whether the types match.
- Every kind of data can be consumed as a file input. Conversely, bigger data should not be consumed by value as all value inputs pass through the command line.
+ # The component function can be called with arguments to create a task:
+ add_multiply_task = add_multiply_op(1, 3)

- Example of a component function declaring file input and output::
+ # The resulting task has output references, corresponding to the component outputs:

- def catboost_train_classifier(
- training_data_path: InputPath('CSV'), # Path to input data file of type "CSV"
- trained_model_path: OutputPath('CatBoostModel'), # Path to output data file of type "CatBoostModel"
- number_of_trees: int = 100, # Small output of type "Integer"
- ) -> NamedTuple('Outputs', [
- ('Accuracy', float), # Small output of type "Float"
- ('Precision', float), # Small output of type "Float"
- ('JobUri', 'URI'), # Small output of type "URI"
- ]):
- """Trains CatBoost classification model"""
- ...
+ sum_output_ref = add_multiply_task.outputs['sum']

- return (accuracy, precision, recall)
+ # These task output references can be passed to other component functions, constructing a computation graph:
+ task2 = add_multiply_op(sum_output_ref, 5)

+ Bigger data should be read from files and written to files.
+ Use the :py:class:`kfp.components.InputPath` parameter annotation to tell the system that the function wants to consume the corresponding input data as a file. The system will download the data, write it to a local file and then pass the **path** of that file to the function.
+ Use the :py:class:`kfp.components.OutputPath` parameter annotation to tell the system that the function wants to produce the corresponding output data as a file. The system will prepare and pass the **path** of a file where the function should write the output data. After the function exits, the system will upload the data to the storage system so that it can be passed to downstream components.
+ You can specify the type of the consumed/produced data by specifying the type argument to :py:class:`kfp.components.InputPath` and :py:class:`kfp.components.OutputPath`. The type can be a python type or an arbitrary type name string. :code:`OutputPath('CatBoostModel')` means that the function states that the data it has written to a file has type :code:`CatBoostModel`. :code:`InputPath('CatBoostModel')` means that the function states that it expects the data it reads from a file to have type :code:`CatBoostModel`. When the pipeline author connects inputs to outputs the system checks whether the types match.
+ Every kind of data can be consumed as a file input. Conversely, bigger data should not be consumed by value as all value inputs pass through the command line.

- Args:
- func: The python function to convert
- base_image: Optional. Specify a custom Docker container image to use in the component. For lightweight components, the image needs to have python 3.5+. Default is the python image corresponding to the current python environment.
- output_component_file: Optional. Write a component definition to a local file. The produced component file can be loaded back by calling `load_component_from_file` or `load_component_from_uri`.
- packages_to_install: Optional. List of [versioned] python packages to pip install before executing the user function.
+ Example of a component function declaring file input and output::

- Returns:
- A factory function with a strongly-typed signature taken from the python function.
- Once called with the required arguments, the factory constructs a task instance that can run the original function in a container.
+ def catboost_train_classifier(
+ training_data_path: InputPath('CSV'), # Path to input data file of type "CSV"
+ trained_model_path: OutputPath('CatBoostModel'), # Path to output data file of type "CatBoostModel"
+ number_of_trees: int = 100, # Small output of type "Integer"
+ ) -> NamedTuple('Outputs', [
+ ('Accuracy', float), # Small output of type "Float"
+ ('Precision', float), # Small output of type "Float"
+ ('JobUri', 'URI'), # Small output of type "URI"
+ ]):
+ """Trains CatBoost classification model"""
+ ...
+
+ return (accuracy, precision, job_uri)
 '''

 component_spec = _func_to_component_spec(
diff --git a/sdk/python/kfp/components/_python_to_graph_component.py b/sdk/python/kfp/components/_python_to_graph_component.py
index 624e85b65d37..e5ac71fa1100 100644
--- a/sdk/python/kfp/components/_python_to_graph_component.py
+++ b/sdk/python/kfp/components/_python_to_graph_component.py
@@ -33,11 +33,16 @@ def create_graph_component_from_pipeline_func(
 output_component_file: str = None,
 embed_component_specs: bool = False,
 ) -> Callable:
- '''Experimental! Creates graph component definition from a python pipeline function. The component file can be published for sharing.
+ '''Creates a graph component definition from a python pipeline function. The component file can be published for sharing.
+
 Pipeline function is a function that only calls component functions and passes outputs to inputs.
 This feature is experimental and lacks support for some of the DSL features like conditions and loops. Only pipelines consisting of loaded components or python components are currently supported (no manually created ContainerOps or ResourceOps).

+ .. warning::
+
+ Please note this feature is considered experimental!
+
 Args:
 pipeline_func: Python function to convert
 output_component_file: Path of the file where the component definition will be written. The `component.yaml` file can then be published for sharing.
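A minimal usage sketch for :code:`create_graph_component_from_pipeline_func`, assuming it is importable from :code:`kfp.components` alongside :code:`create_component_from_func`; the :code:`add` function, the pipeline function, the :code:`'sum'` output name, and the YAML file name below are illustrative assumptions rather than part of the kfp sources::

    from kfp.components import create_component_from_func
    from kfp.components import create_graph_component_from_pipeline_func

    def add(a: float, b: float) -> float:
        """Returns the sum of two arguments."""
        return a + b

    # A python component; its single anonymous output is named 'Output'.
    add_op = create_component_from_func(add)

    def add_three_numbers_pipeline(a: float, b: float, c: float):
        # Only component calls and output passing, as required for graph components.
        first_sum = add_op(a, b)
        second_sum = add_op(first_sum.outputs['Output'], c)
        return {'sum': second_sum.outputs['Output']}

    # Writes a shareable graph component definition to a local YAML file.
    add_three_op = create_graph_component_from_pipeline_func(
        add_three_numbers_pipeline,
        output_component_file='add_three_numbers.component.yaml',
    )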
diff --git a/sdk/python/kfp/containers/_build_image_api.py b/sdk/python/kfp/containers/_build_image_api.py index 30c959575a35..d893028baedb 100644 --- a/sdk/python/kfp/containers/_build_image_api.py +++ b/sdk/python/kfp/containers/_build_image_api.py @@ -64,11 +64,12 @@ def _generate_dockerfile_text(context_dir: str, dockerfile_path: str, base_image def build_image_from_working_dir(image_name: str = None, working_dir: str = None, file_filter_re: str = r'.*\.py', timeout: int = 1000, base_image: str = None, builder: ContainerBuilder = None) -> str: - '''build_image_from_working_dir builds and pushes a new container image that captures the current python working directory. + '''Builds and pushes a new container image that captures the current python working directory. This function recursively scans the working directory and captures the following files in the container image context: - * requirements.txt files - * all python files (can be overridden by passing a different `file_filter_re` argument) + + * :code:`requirements.txt` files + * All python files (can be overridden by passing a different `file_filter_re` argument) The function generates Dockerfile that starts from a python container image, install packages from requirements.txt (if present) and copies all the captured python files to the container image. The Dockerfile can be overridden by placing a custom Dockerfile in the root of the working directory. @@ -79,17 +80,19 @@ def build_image_from_working_dir(image_name: str = None, working_dir: str = None file_filter_re: Optional. A regular expression that will be used to decide which files to include in the container building context. timeout: Optional. The image building timeout in seconds. base_image: Optional. The container image to use as the base for the new image. If not set, the Google Deep Learning Tensorflow CPU image will be used. - builder: Optional. An instance of ContainerBuilder or compatible class that will be used to build the image. + builder: Optional. An instance of :py:class:`kfp.containers.ContainerBuilder` or compatible class that will be used to build the image. The default builder uses "kubeflow-pipelines-container-builder" service account in "kubeflow" namespace. It works with Kubeflow Pipelines clusters installed in "kubeflow" namespace using Google Cloud Marketplace or Standalone with version > 0.4.0. - If your Kubeflow Pipelines is installed in a different namespace, you should use ContainerBuilder(namespace='', ...). - Depending on how you installed Kubeflow Pipelines, you need to configure your ContainerBuilder instance's namespace and service_account: - For clusters installed with Kubeflow >= 0.7, use ContainerBuidler(namespace='', service_account='default-editor', ...). You can omit the namespace if you use kfp sdk from in-cluster notebook, it uses notebook namespace by default. - For clusters installed with Kubeflow < 0.7, use ContainerBuilder(service_account='default', ...). - For clusters installed using Google Cloud Marketplace or Standalone with version <= 0.4.0, use ContainerBuilder(namespace='' service_account='default') - You may refer to https://www.kubeflow.org/docs/pipelines/installation/overview/ for more details about different installation options. + If your Kubeflow Pipelines is installed in a different namespace, you should use :code:`ContainerBuilder(namespace='', ...)`. 
+
+ Depending on how you installed Kubeflow Pipelines, you need to configure your :code:`ContainerBuilder` instance's namespace and service_account:
+
+ * For clusters installed with Kubeflow >= 0.7, use :code:`ContainerBuilder(namespace='', service_account='default-editor', ...)`. You can omit the namespace if you use the kfp sdk from an in-cluster notebook; it uses the notebook namespace by default.
+ * For clusters installed with Kubeflow < 0.7, use :code:`ContainerBuilder(service_account='default', ...)`.
+ * For clusters installed using Google Cloud Marketplace or Standalone with version <= 0.4.0, use :code:`ContainerBuilder(namespace='', service_account='default')`.
+ You may refer to the `installation guide <https://www.kubeflow.org/docs/pipelines/installation/overview/>`_ for more details about different installation options.

 Returns:
- The full name of the container image including the hash digest. E.g. gcr.io/my-org/my-image@sha256:86c1...793c.
+ The full name of the container image including the hash digest. E.g. :code:`gcr.io/my-org/my-image@sha256:86c1...793c`.
 '''
 current_dir = working_dir or os.getcwd()
 with tempfile.TemporaryDirectory() as context_dir:
diff --git a/sdk/python/kfp/containers/_component_builder.py b/sdk/python/kfp/containers/_component_builder.py
index cd2e473bacfc..ebb0347b5c17 100644
--- a/sdk/python/kfp/containers/_component_builder.py
+++ b/sdk/python/kfp/containers/_component_builder.py
@@ -170,8 +170,7 @@ def _configure_logger(logger):

 @deprecated(version='0.1.32', reason='`build_python_component` is deprecated. Use `kfp.containers.build_image_from_working_dir` + `kfp.components.func_to_container_op` instead.')
 def build_python_component(component_func, target_image, base_image=None, dependency=[], staging_gcs_path=None, timeout=600, namespace=None, target_component_file=None, python_version='python3'):
- """ build_component automatically builds a container image for the component_func
- based on the base_image and pushes to the target_image.
+ """build_python_component automatically builds a container image for the component_func based on the base_image and pushes to the target_image.

 Args:
 component_func (python function): The python function to build components upon
@@ -184,6 +183,7 @@ def build_python_component(component_func, target_image, base_image=None, depend
 job is running on GKE and value is None the underlying functions will use the default namespace from GKE. .
 dependency (list): a list of VersionedDependency, which includes the package name and versions, default is empty
 python_version (str): choose python2 or python3, default is python3
+
 Raises:
 ValueError: The function is not decorated with python_component decorator or the python_version is neither python2 nor python3
 """
@@ -271,7 +271,7 @@ def build_python_component(component_func, target_image, base_image=None, depend

 @deprecated(version='0.1.32', reason='`build_docker_image` is deprecated. Use `kfp.containers.build_image_from_working_dir` instead.')
 def build_docker_image(staging_gcs_path, target_image, dockerfile_path, timeout=600, namespace=None):
- """ build_docker_image automatically builds a container image based on the specification in the dockerfile and
+ """build_docker_image automatically builds a container image based on the specification in the dockerfile and
 pushes to the target_image.
Args: diff --git a/sdk/python/kfp/dsl/_component.py b/sdk/python/kfp/dsl/_component.py index 86f0e07eec6d..ba187ed5ac0c 100644 --- a/sdk/python/kfp/dsl/_component.py +++ b/sdk/python/kfp/dsl/_component.py @@ -37,16 +37,16 @@ def python_component(name, description=None, base_image=None, target_component_f Returns: The same function (with some metadata fields set). - Usage: - ```python - @dsl.python_component( - name='my awesome component', - description='Come, Let's play', - base_image='tensorflow/tensorflow:1.11.0-py3', - ) - def my_component(a: str, b: int) -> str: - ... - ``` + Example: + :: + + @dsl.python_component( + name='my awesome component', + description='Come, Let's play', + base_image='tensorflow/tensorflow:1.11.0-py3', + ) + def my_component(a: str, b: int) -> str: + ... """ def _python_component(func): func._component_human_name = name @@ -64,11 +64,12 @@ def component(func): """Decorator for component functions that returns a ContainerOp. This is useful to enable type checking in the DSL compiler - Usage: - ```python - @dsl.component - def foobar(model: TFModel(), step: MLStep()): - return dsl.ContainerOp() + Example: + :: + + @dsl.component + def foobar(model: TFModel(), step: MLStep()): + return dsl.ContainerOp() """ from functools import wraps @wraps(func) @@ -103,19 +104,20 @@ def graph_component(func): """Decorator for graph component functions. This decorator returns an ops_group. - Usage: - ```python - # Warning: caching is tricky when recursion is involved. Please be careful and - # set proper max_cache_staleness in case of infinite loop. - import kfp.dsl as dsl - @dsl.graph_component - def flip_component(flip_result): - print_flip = PrintOp(flip_result) - flipA = FlipCoinOp().after(print_flip) - flipA.execution_options.caching_strategy.max_cache_staleness = "P0D" - with dsl.Condition(flipA.output == 'heads'): - flip_component(flipA.output) - return {'flip_result': flipA.output} + Example: + :: + + # Warning: caching is tricky when recursion is involved. Please be careful and + # set proper max_cache_staleness in case of infinite loop. + import kfp.dsl as dsl + @dsl.graph_component + def flip_component(flip_result): + print_flip = PrintOp(flip_result) + flipA = FlipCoinOp().after(print_flip) + flipA.execution_options.caching_strategy.max_cache_staleness = "P0D" + with dsl.Condition(flipA.output == 'heads'): + flip_component(flipA.output) + return {'flip_result': flipA.output} """ from functools import wraps @wraps(func) diff --git a/sdk/python/kfp/dsl/_container_op.py b/sdk/python/kfp/dsl/_container_op.py index c22bec13870f..c8881c2e4408 100644 --- a/sdk/python/kfp/dsl/_container_op.py +++ b/sdk/python/kfp/dsl/_container_op.py @@ -106,28 +106,26 @@ class Container(V1Container): required property). 
See: - - https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_container.py - - https://github.com/argoproj/argo/blob/master/api/openapi-spec/swagger.json + * https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_container.py + * https://github.com/argoproj/argo/blob/master/api/openapi-spec/swagger.json - Example: + :: - from kfp.dsl import ContainerOp - from kubernetes.client.models import V1EnvVar - + from kfp.dsl import ContainerOp + from kubernetes.client.models import V1EnvVar + - # creates a operation - op = ContainerOp(name='bash-ops', - image='busybox:latest', - command=['echo'], - arguments=['$MSG']) + # creates a operation + op = ContainerOp(name='bash-ops', + image='busybox:latest', + command=['echo'], + arguments=['$MSG']) - # returns a `Container` object from `ContainerOp` - # and add an environment variable to `Container` - op.container.add_env_variable(V1EnvVar(name='MSG', value='hello world')) + # returns a `Container` object from `ContainerOp` + # and add an environment variable to `Container` + op.container.add_env_variable(V1EnvVar(name='MSG', value='hello world')) - """ - """ Attributes: attribute_map (dict): The key is attribute name and the value is json key in definition. @@ -556,7 +554,23 @@ class UserContainer(Container): See https://github.com/argoproj/argo/blob/master/api/openapi-spec/swagger.json - Example + Args: + name: unique name for the user container + image: image to use for the user container, e.g. redis:alpine + command: entrypoint array. Not executed within a shell. + args: arguments to the entrypoint. + mirror_volume_mounts: MirrorVolumeMounts will mount the same + volumes specified in the main container to the container (including artifacts), + at the same mountPaths. This enables dind daemon to partially see the same + filesystem as the main container in order to use features such as docker + volume binding + **kwargs: keyword arguments available for `Container` + + Attributes: + swagger_types (dict): The key is attribute name + and the value is attribute type. + + Example:: from kfp.dsl import ContainerOp, UserContainer @@ -564,14 +578,6 @@ class UserContainer(Container): op = (ContainerOp(name='foo-op', image='busybox:latest') .add_initContainer( UserContainer(name='redis', image='redis:alpine'))) - - """ - """ - Attributes: - swagger_types (dict): The key is attribute name - and the value is attribute type. - attribute_map (dict): The key is attribute name - and the value is json key in definition. """ # adds `mirror_volume_mounts` to `UserContainer` swagger definition # NOTE inherits definition from `V1Container` rather than `Container` @@ -592,21 +598,6 @@ def __init__(self, args: StringOrStringList = None, mirror_volume_mounts: bool = None, **kwargs): - """Creates a new instance of `UserContainer`. - - Args: - name {str}: unique name for the user container - image {str}: image to use for the user container, e.g. redis:alpine - command {StringOrStringList}: entrypoint array. Not executed within a shell. - args {StringOrStringList}: arguments to the entrypoint. - mirror_volume_mounts {bool}: MirrorVolumeMounts will mount the same - volumes specified in the main container to the container (including artifacts), - at the same mountPaths. 
This enables dind daemon to partially see the same - filesystem as the main container in order to use features such as docker - volume binding - **kwargs: keyword arguments available for `Container` - - """ super().__init__( name=name, image=image, @@ -638,6 +629,20 @@ def inputs(self): class Sidecar(UserContainer): + """Creates a new instance of `Sidecar`. + + Args: + name: unique name for the sidecar container + image: image to use for the sidecar container, e.g. redis:alpine + command: entrypoint array. Not executed within a shell. + args: arguments to the entrypoint. + mirror_volume_mounts: MirrorVolumeMounts will mount the same + volumes specified in the main container to the sidecar (including artifacts), + at the same mountPaths. This enables dind daemon to partially see the same + filesystem as the main container in order to use features such as docker + volume binding + **kwargs: keyword arguments available for `Container` + """ def __init__(self, name: str, @@ -646,21 +651,6 @@ def __init__(self, args: StringOrStringList = None, mirror_volume_mounts: bool = None, **kwargs): - """Creates a new instance of `Sidecar`. - - Args: - name {str}: unique name for the sidecar container - image {str}: image to use for the sidecar container, e.g. redis:alpine - command {StringOrStringList}: entrypoint array. Not executed within a shell. - args {StringOrStringList}: arguments to the entrypoint. - mirror_volume_mounts {bool}: MirrorVolumeMounts will mount the same - volumes specified in the main container to the sidecar (including artifacts), - at the same mountPaths. This enables dind daemon to partially see the same - filesystem as the main container in order to use features such as docker - volume binding - **kwargs: keyword arguments available for `Container` - - """ super().__init__( name=name, image=image, @@ -680,6 +670,17 @@ def _make_hash_based_id_for_op(op): class BaseOp(object): + """Base operator + + Args: + name: the name of the op. It does not have to be unique within a pipeline + because the pipeline will generates a unique new name in case of conflicts. + init_containers: the list of `UserContainer` objects describing the InitContainer + to deploy before the `main` container. + sidecars: the list of `Sidecar` objects describing the sidecar containers to deploy + together with the `main` container. + is_exit_handler: Deprecated. + """ # list of attributes that might have pipeline params - used to generate # the input parameters during compilation. @@ -695,18 +696,6 @@ def __init__(self, init_containers: List[UserContainer] = None, sidecars: List[Sidecar] = None, is_exit_handler: bool = False): - """Create a new instance of BaseOp - - Args: - name: the name of the op. It does not have to be unique within a pipeline - because the pipeline will generates a unique new name in case of conflicts. - init_containers: the list of `UserContainer` objects describing the InitContainer - to deploy before the `main` container. - sidecars: the list of `Sidecar` objects describing the sidecar containers to deploy - together with the `main` container. - is_exit_handler: Deprecated. - """ - valid_name_regex = r'^[A-Za-z][A-Za-z0-9\s_-]*$' if not re.match(valid_name_regex, name): raise ValueError( @@ -772,14 +761,15 @@ def apply(self, mod_func): """Applies a modifier function to self. The function should return the passed object. This is needed to chain "extention methods" to this class. - Example: - from kfp.gcp import use_gcp_secret - task = ( - train_op(...) 
- .set_memory_request('1G')
- .apply(use_gcp_secret('user-gcp-sa'))
- .set_memory_limit('2G')
- )
+ Example::
+
+ from kfp.gcp import use_gcp_secret
+ task = (
+ train_op(...)
+ .set_memory_request('1G')
+ .apply(use_gcp_secret('user-gcp-sa'))
+ .set_memory_limit('2G')
+ )
 """
 return mod_func(self) or self

@@ -813,16 +803,20 @@ def add_toleration(self, tolerations: V1Toleration):

 def add_affinity(self, affinity: V1Affinity):
 """Add K8s Affinity
+
 Args:
 affinity: Kubernetes affinity
 For detailed spec, check affinity definition
 https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_affinity.py
- example: V1Affinity(
- node_affinity=V1NodeAffinity(
- required_during_scheduling_ignored_during_execution=V1NodeSelector(
- node_selector_terms=[V1NodeSelectorTerm(
- match_expressions=[V1NodeSelectorRequirement(
- key='beta.kubernetes.io/instance-type', operator='In', values=['p2.xlarge'])])])))
+
+ Example::
+
+ V1Affinity(
+ node_affinity=V1NodeAffinity(
+ required_during_scheduling_ignored_during_execution=V1NodeSelector(
+ node_selector_terms=[V1NodeSelectorTerm(
+ match_expressions=[V1NodeSelectorRequirement(
+ key='beta.kubernetes.io/instance-type', operator='In', values=['p2.xlarge'])])])))
 """
 self.affinity = affinity
 return self
@@ -921,15 +915,44 @@ def __init__(self, argument, input=None, path=None):


 class ContainerOp(BaseOp):
- """
- Represents an op implemented by a container image.
-
+ """Represents an op implemented by a container image.
+
+ Args:
+ name: the name of the op. It does not have to be unique within a pipeline
+ because the pipeline will generate a unique new name in case of conflicts.
+ image: the container image name, such as 'python:3.5-jessie'
+ command: the command to run in the container.
+ If None, uses the default CMD defined in the container.
+ arguments: the arguments of the command. The command can include "%s" and supply
+ a PipelineParam as the string replacement. For example, ('echo %s' % input_param).
+ At container run time the argument will be 'echo param_value'.
+ init_containers: the list of `UserContainer` objects describing the InitContainer
+ to deploy before the `main` container.
+ sidecars: the list of `Sidecar` objects describing the sidecar containers to deploy
+ together with the `main` container.
+ container_kwargs: the dict of additional keyword arguments to pass to the
+ op's `Container` definition.
+ artifact_argument_paths: Optional. Maps input artifact arguments (values or references) to the local file paths where they'll be placed.
+ At pipeline run time, the value of the artifact argument is saved to a local file with specified path.
+ This parameter is only needed when the input file paths are hard-coded in the program.
+ Otherwise it's better to pass input artifact placement paths by including artifact arguments in the command-line using the InputArgumentPath class instances.
+ file_outputs: Maps output labels to local file paths. At pipeline run time,
+ the value of a PipelineParam is saved to its corresponding local file. It's
+ one way for the outside world to receive outputs of the container.
+ output_artifact_paths: Maps output artifact labels to local artifact file paths.
+ It has the following default artifact paths during compile time.
+ {'mlpipeline-ui-metadata': '/mlpipeline-ui-metadata.json',
+ 'mlpipeline-metrics': '/mlpipeline-metrics.json'}
+ is_exit_handler: Deprecated. This is no longer needed.
+ pvolumes: Dictionary for the user to match a path on the op's fs with a + V1Volume or it inherited type. + E.g {"/my/path": vol, "/mnt": other_op.pvolumes["/output"]}. + Example:: from kfp import dsl from kubernetes.client.models import V1EnvVar, V1SecretKeySelector - @dsl.pipeline( name='foo', description='hello world') @@ -952,7 +975,6 @@ def foo_pipeline(tag: str, pull_image_policy: str): # add sidecar with parameterized image tag # sidecar follows the argo sidecar swagger spec op.add_sidecar(dsl.Sidecar('redis', 'redis:%s' % tag).set_image_pull_policy('Always')) - """ # list of attributes that might have pipeline params - used to generate @@ -977,40 +999,6 @@ def __init__( is_exit_handler=False, pvolumes: Dict[str, V1Volume] = None, ): - """Create a new instance of ContainerOp. - - Args: - name: the name of the op. It does not have to be unique within a pipeline - because the pipeline will generates a unique new name in case of conflicts. - image: the container image name, such as 'python:3.5-jessie' - command: the command to run in the container. - If None, uses default CMD in defined in container. - arguments: the arguments of the command. The command can include "%s" and supply - a PipelineParam as the string replacement. For example, ('echo %s' % input_param). - At container run time the argument will be 'echo param_value'. - init_containers: the list of `UserContainer` objects describing the InitContainer - to deploy before the `main` container. - sidecars: the list of `Sidecar` objects describing the sidecar containers to deploy - together with the `main` container. - container_kwargs: the dict of additional keyword arguments to pass to the - op's `Container` definition. - artifact_argument_paths: Optional. Maps input artifact arguments (values or references) to the local file paths where they'll be placed. - At pipeline run time, the value of the artifact argument is saved to a local file with specified path. - This parameter is only needed when the input file paths are hard-coded in the program. - Otherwise it's better to pass input artifact placement paths by including artifact arguments in the command-line using the InputArgumentPath class instances. - file_outputs: Maps output labels to local file paths. At pipeline run time, - the value of a PipelineParam is saved to its corresponding local file. It's - one way for outside world to receive outputs of the container. - output_artifact_paths: Maps output artifact labels to local artifact file paths. - It has the following default artifact paths during compile time. - {'mlpipeline-ui-metadata': '/mlpipeline-ui-metadata.json', - 'mlpipeline-metrics': '/mlpipeline-metrics.json'} - is_exit_handler: Deprecated. This is no longer needed. - pvolumes: Dictionary for the user to match a path on the op's fs with a - V1Volume or it inherited type. - E.g {"/my/path": vol, "/mnt": other_op.pvolumes["/output"]}. - """ - super().__init__(name=name, init_containers=init_containers, sidecars=sidecars, is_exit_handler=is_exit_handler) if not ContainerOp._DISABLE_REUSABLE_COMPONENT_WARNING and '--component_launcher_class_path' not in (arguments or []): @@ -1149,14 +1137,15 @@ def container(self): `io.argoproj.workflow.v1alpha1.Template`. Can be used to update the container configurations. 
- Example: + Example:: + import kfp.dsl as dsl from kubernetes.client.models import V1EnvVar @dsl.pipeline(name='example_pipeline') def immediate_value_pipeline(): op1 = (dsl.ContainerOp(name='example', image='nginx:alpine') - .container + .container .add_env_variable(V1EnvVar(name='HOST', value='foo.bar')) .add_env_variable(V1EnvVar(name='PORT', value='80')) .parent # return the parent `ContainerOp` @@ -1165,8 +1154,9 @@ def immediate_value_pipeline(): return self._container def _set_metadata(self, metadata): - '''_set_metadata passes the containerop the metadata information + '''Passes the ContainerOp the metadata information and configures the right output + Args: metadata (ComponentSpec): component metadata ''' diff --git a/sdk/python/kfp/dsl/_metadata.py b/sdk/python/kfp/dsl/_metadata.py index 7cffcb0097bf..5ffc30f1804c 100644 --- a/sdk/python/kfp/dsl/_metadata.py +++ b/sdk/python/kfp/dsl/_metadata.py @@ -20,11 +20,13 @@ def _annotation_to_typemeta(annotation): '''_annotation_to_type_meta converts an annotation to a type structure + Args: annotation(BaseType/str/dict): input/output annotations BaseType: registered in kfp.dsl.types str: either a string of a dict serialization or a string of the type name dict: type name and properties. note that the properties values can be dict. + Returns: dict or string representing the type ''' diff --git a/sdk/python/kfp/dsl/_ops_group.py b/sdk/python/kfp/dsl/_ops_group.py index ab173c888044..7adcc635a6e6 100644 --- a/sdk/python/kfp/dsl/_ops_group.py +++ b/sdk/python/kfp/dsl/_ops_group.py @@ -30,6 +30,7 @@ class OpsGroup(object): def __init__(self, group_type: str, name: str=None, parallelism: int=None): """Create a new instance of OpsGroup. + Args: group_type (str): one of 'pipeline', 'exit_handler', 'condition', 'for_loop', and 'graph'. name (str): name of the opsgroup @@ -50,11 +51,13 @@ def __init__(self, group_type: str, name: str=None, parallelism: int=None): @staticmethod def _get_matching_opsgroup_already_in_pipeline(group_type, name): - """retrieves the opsgroup when the pipeline already contains it. + """Retrieves the opsgroup when the pipeline already contains it. the opsgroup might be already in the pipeline in case of recursive calls. + Args: group_type (str): one of 'pipeline', 'exit_handler', 'condition', and 'graph'. - name (str): the name before conversion. """ + name (str): the name before conversion. + """ if not _pipeline.Pipeline.get_default_pipeline(): raise ValueError('Default pipeline not defined.') if name is None: @@ -104,23 +107,22 @@ def remove_op_recursive(self, op): class ExitHandler(OpsGroup): """Represents an exit handler that is invoked upon exiting a group of ops. - Example usage: - ```python - exit_op = ContainerOp(...) - with ExitHandler(exit_op): - op1 = ContainerOp(...) - op2 = ContainerOp(...) - ``` + Args: + exit_op: An operator invoked at exiting a group of ops. + + Raises: + ValueError: Raised if the exit_op is invalid. + + Example: + :: + + exit_op = ContainerOp(...) + with ExitHandler(exit_op): + op1 = ContainerOp(...) + op2 = ContainerOp(...) """ def __init__(self, exit_op: _container_op.ContainerOp): - """Create a new instance of ExitHandler. - Args: - exit_op: an operator invoked at exiting a group of ops. - - Raises: - ValueError is the exit_op is invalid. 
- """ super(ExitHandler, self).__init__('exit_handler') if exit_op.dependent_names: raise ValueError('exit_op cannot depend on any other ops.') @@ -137,20 +139,19 @@ def __init__(self, exit_op: _container_op.ContainerOp): class Condition(OpsGroup): """Represents an condition group with a condition. - Example usage: - ```python - with Condition(param1=='pizza', '[param1 is pizza]'): - op1 = ContainerOp(...) - op2 = ContainerOp(...) - ``` + Args: + condition (ConditionOperator): the condition. + name (str): name of the condition + + Example: + :: + + with Condition(param1=='pizza', '[param1 is pizza]'): + op1 = ContainerOp(...) + op2 = ContainerOp(...) """ def __init__(self, condition, name = None): - """Create a new instance of Condition. - Args: - condition (ConditionOperator): the condition. - name (str): name of the condition - """ super(Condition, self).__init__('condition', name) self.condition = condition @@ -158,6 +159,9 @@ def __init__(self, condition, name = None): class Graph(OpsGroup): """Graph DAG with inputs, recursive_inputs, and outputs. This is not used directly by the users but auto generated when the graph_component decoration exists + + Args: + name: Name of the graph. """ def __init__(self, name): super(Graph, self).__init__(group_type='graph', name=name) @@ -169,13 +173,12 @@ def __init__(self, name): class ParallelFor(OpsGroup): """Represents a parallel for loop over a static set of items. - Example usage: - ```python - with dsl.ParallelFor([{'a': 1, 'b': 10}, {'a': 2, 'b': 20}]) as item: - op1 = ContainerOp(..., args=['echo {}'.format(item.a)]) - op2 = ContainerOp(..., args=['echo {}'.format(item.b]) - ``` - and op1 would be executed twice, once with args=['echo 1'] and once with args=['echo 2'] + Example: + In this case :code:`op1` would be executed twice, once with case :code:`args=['echo 1']` and once with case :code:`args=['echo 2']`:: + + with dsl.ParallelFor([{'a': 1, 'b': 10}, {'a': 2, 'b': 20}]) as item: + op1 = ContainerOp(..., args=['echo {}'.format(item.a)]) + op2 = ContainerOp(..., args=['echo {}'.format(item.b]) """ TYPE_NAME = 'for_loop' diff --git a/sdk/python/kfp/dsl/_pipeline.py b/sdk/python/kfp/dsl/_pipeline.py index 9397fbc74317..176d860b968b 100644 --- a/sdk/python/kfp/dsl/_pipeline.py +++ b/sdk/python/kfp/dsl/_pipeline.py @@ -30,15 +30,15 @@ def pipeline(name : str = None, description : str = None): """Decorator of pipeline functions. - Usage: - ```python - @pipeline( - name='my awesome pipeline', - description='Is it really awesome?' - ) - def my_pipeline(a: PipelineParam, b: PipelineParam): - ... - ``` + Example + :: + + @pipeline( + name='my awesome pipeline', + description='Is it really awesome?' + ) + def my_pipeline(a: PipelineParam, b: PipelineParam): + ... """ def _pipeline(func): if name: @@ -71,8 +71,8 @@ def set_image_pull_secrets(self, image_pull_secrets): Args: image_pull_secrets: a list of Kubernetes V1LocalObjectReference - For detailed description, check Kubernetes V1LocalObjectReference definition - https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1LocalObjectReference.md + For detailed description, check Kubernetes V1LocalObjectReference definition + https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1LocalObjectReference.md """ self.image_pull_secrets = image_pull_secrets return self @@ -90,7 +90,7 @@ def set_parallelism(self, max_num_pods: int): """Configures the max number of total parallel pods that can execute at the same time in a workflow. 
Args: - max_num_pods (int): max number of total parallel pods. + max_num_pods: max number of total parallel pods. """ self.parallelism = max_num_pods return self @@ -129,10 +129,10 @@ def set_image_pull_policy(self, policy: str): def add_op_transformer(self, transformer): """Configures the op_transformers which will be applied to all ops in the pipeline. - The ops can be ResourceOp, VolumenOp, or ContainerOp. + The ops can be ResourceOp, VolumeOp, or ContainerOp. Args: - transformer: a function that takes a kfp Op as input and returns a kfp Op + transformer: A function that takes a kfp Op as input and returns a kfp Op """ self.op_transformers.append(transformer) @@ -142,20 +142,22 @@ def data_passing_method(self): @data_passing_method.setter def data_passing_method(self, value): - '''Sets the object representing the method used for intermediate data passing. - Example:: - - from kfp.dsl import PipelineConf, data_passing_methods - from kubernetes.client.models import V1Volume, V1PersistentVolumeClaim - pipeline_conf = PipelineConf() - pipeline_conf.data_passing_method = data_passing_methods.KubernetesVolume( - volume=V1Volume( - name='data', - persistent_volume_claim=V1PersistentVolumeClaim('data-volume'), - ), - path_prefix='artifact_data/', - ) - ''' + """Sets the object representing the method used for intermediate data passing. + + Example: + :: + + from kfp.dsl import PipelineConf, data_passing_methods + from kubernetes.client.models import V1Volume, V1PersistentVolumeClaim + pipeline_conf = PipelineConf() + pipeline_conf.data_passing_method = data_passing_methods.KubernetesVolume( + volume=V1Volume( + name='data', + persistent_volume_claim=V1PersistentVolumeClaim('data-volume'), + ), + path_prefix='artifact_data/', + ) + """ self._data_passing_method = value def get_pipeline_conf(): @@ -173,12 +175,13 @@ class Pipeline(): is useful for implementing a compiler. For example, the compiler can use the following to get the pipeline object and its ops: - ```python - with Pipeline() as p: - pipeline_func(*args_list) + Example: + :: - traverse(p.ops) - ``` + with Pipeline() as p: + pipeline_func(*args_list) + + traverse(p.ops) """ # _default_pipeline is set when it (usually a compiler) runs "with Pipeline()" @@ -271,8 +274,9 @@ def get_next_group_id(self): return self.group_id def _set_metadata(self, metadata): - '''_set_metadata passes the containerop the metadata information + """_set_metadata passes the containerop the metadata information + Args: metadata (ComponentMeta): component metadata - ''' + """ self._metadata = metadata diff --git a/sdk/python/kfp/dsl/_pipeline_param.py b/sdk/python/kfp/dsl/_pipeline_param.py index 49cb51adf4cd..de809f7f8b91 100644 --- a/sdk/python/kfp/dsl/_pipeline_param.py +++ b/sdk/python/kfp/dsl/_pipeline_param.py @@ -23,16 +23,14 @@ def sanitize_k8s_name(name, allow_capital_underscore=False): - """From _make_kubernetes_name - sanitize_k8s_name cleans and converts the names in the workflow. + """Cleans and converts the names in the workflow. Args: name: original name, - allow_capital_underscore: whether to allow capital letter and underscore - in this name. + allow_capital_underscore: whether to allow capital letter and underscore in this name. Returns: - sanitized name. + A sanitized name. 
""" if allow_capital_underscore: return re.sub('-+', '-', re.sub('[^-_0-9A-Za-z]+', '-', name)).lstrip('-').rstrip('-') @@ -40,13 +38,14 @@ def sanitize_k8s_name(name, allow_capital_underscore=False): return re.sub('-+', '-', re.sub('[^-0-9a-z]+', '-', name.lower())).lstrip('-').rstrip('-') -def match_serialized_pipelineparam(payload: str): - """match_serialized_pipelineparam matches the serialized pipelineparam. +def match_serialized_pipelineparam(payload: str) -> List[PipelineParamTuple]: + """Matches the supplied serialized pipelineparam. + Args: - payloads (str): a string that contains the serialized pipelineparam. + payloads: The search space for the serialized pipelineparams. Returns: - PipelineParamTuple + The matched pipeline params we found in the supplied payload. """ matches = re.findall(r'{{pipelineparam:op=([\w\s_-]*);name=([\w\s_-]+)}}', payload) param_tuples = [] @@ -59,14 +58,15 @@ def match_serialized_pipelineparam(payload: str): return param_tuples -def _extract_pipelineparams(payloads: str or List[str]): - """_extract_pipelineparam extract a list of PipelineParam instances from the payload string. +def _extract_pipelineparams(payloads: Union[str, List[str]]) -> List['PipelineParam']: + """Extracts a list of PipelineParam instances from the payload string. Note: this function removes all duplicate matches. Args: - payload (str or list[str]): a string/a list of strings that contains serialized pipelineparams + payload: a string/a list of strings that contains serialized pipelineparams + Return: - List[PipelineParam] + List[] """ if isinstance(payloads, str): payloads = [payloads] @@ -81,7 +81,7 @@ def _extract_pipelineparams(payloads: str or List[str]): return pipeline_params -def extract_pipelineparams_from_any(payload) -> List['PipelineParam']: +def extract_pipelineparams_from_any(payload: Union['PipelineParam', str, list, tuple, dict]) -> List['PipelineParam']: """Recursively extract PipelineParam instances or serialized string from any object or list of objects. Args: @@ -135,24 +135,23 @@ class PipelineParam(object): A PipelineParam object can be used as a pipeline function argument so that it will be a pipeline parameter that shows up in ML Pipelines system UI. It can also represent an intermediate value passed between components. + + Args: + name: name of the pipeline parameter. + op_name: the name of the operation that produces the PipelineParam. None means + it is not produced by any operator, so if None, either user constructs it + directly (for providing an immediate value), or it is a pipeline function + argument. + value: The actual value of the PipelineParam. If provided, the PipelineParam is + "resolved" immediately. For now, we support string only. + param_type: the type of the PipelineParam. + pattern: the serialized string regex pattern this pipeline parameter created from. + + Raises: ValueError in name or op_name contains invalid characters, or both op_name + and value are set. """ - - def __init__(self, name: str, op_name: str=None, value: str=None, param_type : Union[str, Dict] = None, pattern: str=None): - """Create a new instance of PipelineParam. - Args: - name: name of the pipeline parameter. - op_name: the name of the operation that produces the PipelineParam. None means - it is not produced by any operator, so if None, either user constructs it - directly (for providing an immediate value), or it is a pipeline function - argument. - value: The actual value of the PipelineParam. If provided, the PipelineParam is - "resolved" immediately. 
For now, we support string only. - param_type: the type of the PipelineParam. - pattern: the serialized string regex pattern this pipeline parameter created from. - Raises: ValueError in name or op_name contains invalid characters, or both op_name - and value are set. - """ + def __init__(self, name: str, op_name: str=None, value: str=None, param_type : Union[str, Dict] = None, pattern: str=None): valid_name_regex = r'^[A-Za-z][A-Za-z0-9\s_-]*$' if not re.match(valid_name_regex, name): raise ValueError('Only letters, numbers, spaces, "_", and "-" are allowed in name. Must begin with a letter. ' diff --git a/sdk/python/kfp/dsl/_pipeline_volume.py b/sdk/python/kfp/dsl/_pipeline_volume.py index 9d8af8ee3f32..b16a05061258 100644 --- a/sdk/python/kfp/dsl/_pipeline_volume.py +++ b/sdk/python/kfp/dsl/_pipeline_volume.py @@ -38,21 +38,19 @@ class PipelineVolume(V1Volume): A PipelineVolume object can be used as an extention of the pipeline function's filesystem. It may then be passed between ContainerOps, exposing dependencies. + + Args: + pvc: The name of an existing PVC + volume: Create a deep copy out of a V1Volume or PipelineVolume with no deps + + Raises: + ValueError: If volume is not None and kwargs is not None + If pvc is not None and kwargs.pop("name") is not None """ def __init__(self, pvc: str = None, volume: V1Volume = None, **kwargs): - """Create a new instance of PipelineVolume. - - Args: - pvc: The name of an existing PVC - volume: Create a deep copy out of a V1Volume or PipelineVolume - with no deps - Raises: - ValueError: if volume is not None and kwargs is not None - if pvc is not None and kwargs.pop("name") is not None - """ if volume and kwargs: raise ValueError("You can't pass a volume along with other " "kwargs.") @@ -91,6 +89,7 @@ def __init__(self, def after(self, *ops): """Creates a duplicate of self with the required dependecies excluding the redundant dependenices. + Args: *ops: Pipeline operators to add as dependencies """ diff --git a/sdk/python/kfp/dsl/_resource_op.py b/sdk/python/kfp/dsl/_resource_op.py index 0ba3d75ae205..a95d97ce13aa 100644 --- a/sdk/python/kfp/dsl/_resource_op.py +++ b/sdk/python/kfp/dsl/_resource_op.py @@ -63,7 +63,27 @@ def __init__(self, class ResourceOp(BaseOp): - """Represents an op which will be translated into a resource template""" + """Represents an op which will be translated into a resource template + + Args: + k8s_resource: A k8s resource which will be submitted to the cluster + action: One of "create"/"delete"/"apply"/"patch" + (default is "create") + merge_strategy: The merge strategy for the "apply" action + success_condition: The successCondition of the template + failure_condition: The failureCondition of the template + For more info see: + https://github.com/argoproj/argo/blob/master/examples/k8s-jobs.yaml + attribute_outputs: Maps output labels to resource's json paths, + similarly to file_outputs of ContainerOp + kwargs: name, sidecars. See BaseOp definition + + Raises: + ValueError: if not inside a pipeline + if the name is an invalid string + if no k8s_resource is provided + if merge_strategy is set without "apply" action + """ def __init__(self, k8s_resource=None, @@ -73,26 +93,6 @@ def __init__(self, failure_condition: str = None, attribute_outputs: Dict[str, str] = None, **kwargs): - """Create a new instance of ResourceOp. 
- - Args: - k8s_resource: A k8s resource which will be submitted to the cluster - action: One of "create"/"delete"/"apply"/"patch" - (default is "create") - merge_strategy: The merge strategy for the "apply" action - success_condition: The successCondition of the template - failure_condition: The failureCondition of the template - For more info see: - https://github.com/argoproj/argo/blob/master/examples/k8s-jobs.yaml - attribute_outputs: Maps output labels to resource's json paths, - similarly to file_outputs of ContainerOp - kwargs: name, sidecars. See BaseOp definition - Raises: - ValueError: if not inside a pipeline - if the name is an invalid string - if no k8s_resource is provided - if merge_strategy is set without "apply" action - """ super().__init__(**kwargs) self.attrs_with_pipelineparams = list(self.attrs_with_pipelineparams) diff --git a/sdk/python/kfp/dsl/_volume_op.py b/sdk/python/kfp/dsl/_volume_op.py index 116f64087e3d..818a3fe142ff 100644 --- a/sdk/python/kfp/dsl/_volume_op.py +++ b/sdk/python/kfp/dsl/_volume_op.py @@ -35,6 +35,29 @@ class VolumeOp(ResourceOp): """Represents an op which will be translated into a resource template which will be creating a PVC. + + Args: + resource_name: A desired name for the PVC which will be created + size: The size of the PVC which will be created + storage_class: The storage class to use for the dynamically created PVC + modes: The access modes for the PVC + annotations: Annotations to be patched in the PVC + data_source: May be a V1TypedLocalObjectReference, and then it is + used in the data_source field of the PVC as is. Can also be a + string/PipelineParam, and in that case it will be used as a + VolumeSnapshot name (Alpha feature) + volume_name: VolumeName is the binding reference to the PersistentVolume + backing this claim. + kwargs: See :py:class:`kfp.dsl.ResourceOp` + + Raises: + ValueError: if k8s_resource is provided along with other arguments + if k8s_resource is not a V1PersistentVolumeClaim + if size is None + if size is an invalid memory string (when not a + PipelineParam) + if data_source is not one of (str, PipelineParam, + V1TypedLocalObjectReference) """ def __init__(self, @@ -46,31 +69,6 @@ def __init__(self, data_source=None, volume_name=None, **kwargs): - """Create a new instance of VolumeOp. - - Args: - resource_name: A desired name for the PVC which will be created - size: The size of the PVC which will be created - storage_class: The storage class to use for the dynamically created - PVC - modes: The access modes for the PVC - annotations: Annotations to be patched in the PVC - data_source: May be a V1TypedLocalObjectReference, and then it is - used in the data_source field of the PVC as is. Can also be a - string/PipelineParam, and in that case it will be used as a - VolumeSnapshot name (Alpha feature) - volume_name: VolumeName is the binding reference to the PersistentVolume - backing this claim. 
- kwargs: See ResourceOp definition - Raises: - ValueError: if k8s_resource is provided along with other arguments - if k8s_resource is not a V1PersistentVolumeClaim - if size is None - if size is an invalid memory string (when not a - PipelineParam) - if data_source is not one of (str, PipelineParam, - V1TypedLocalObjectReference) - """ # Add size to attribute outputs self.attribute_outputs = {"size": "{.status.capacity.storage}"} diff --git a/sdk/python/kfp/dsl/_volume_snapshot_op.py b/sdk/python/kfp/dsl/_volume_snapshot_op.py index 694d04cc39ff..bb9b4863677d 100644 --- a/sdk/python/kfp/dsl/_volume_snapshot_op.py +++ b/sdk/python/kfp/dsl/_volume_snapshot_op.py @@ -29,6 +29,22 @@ class VolumeSnapshotOp(ResourceOp): At the time that this feature is written, VolumeSnapshots are an Alpha feature in Kubernetes. You should check with your Kubernetes Cluster admin if they have it enabled. + + + Args: + resource_name: A desired name for the VolumeSnapshot which will be created + pvc: The name of the PVC which will be snapshotted + snapshot_class: The snapshot class to use for the dynamically created VolumeSnapshot + annotations: Annotations to be patched in the VolumeSnapshot + volume: An instance of V1Volume + kwargs: See :py:class:`kfp.dsl.ResourceOp` + + Raises: + ValueError: if k8s_resource is provided along with other arguments + if k8s_resource is not a VolumeSnapshot + if pvc and volume are None + if pvc and volume are not None + if volume does not reference a PVC """ def __init__(self, @@ -38,24 +54,6 @@ def __init__(self, annotations: Dict[str, str] = None, volume: V1Volume = None, **kwargs): - """Create a new instance of VolumeSnapshotOp. - - Args: - resource_name: A desired name for the VolumeSnapshot which will be - created - pvc: The name of the PVC which will be snapshotted - snapshot_class: The snapshot class to use for the dynamically - created VolumeSnapshot - annotations: Annotations to be patched in the VolumeSnapshot - volume: An instance of V1Volume - kwargs: See ResourceOp definition - Raises: - ValueError: if k8s_resource is provided along with other arguments - if k8s_resource is not a VolumeSnapshot - if pvc and volume are None - if pvc and volume are not None - if volume does not reference a PVC - """ # Add size to output params self.attribute_outputs = {"size": "{.status.restoreSize}"} # Add default success_condition if None provided diff --git a/sdk/python/kfp/gcp.py b/sdk/python/kfp/gcp.py index aaca532af52a..8cd8b34bca8c 100644 --- a/sdk/python/kfp/gcp.py +++ b/sdk/python/kfp/gcp.py @@ -17,9 +17,9 @@ def use_gcp_secret(secret_name='user-gcp-sa', secret_file_path_in_volume=None, volume_name=None, secret_volume_mount_path='/secret/gcp-credentials'): """An operator that configures the container to use GCP service account by service account key - stored in a Kubernetes secret. + stored in a Kubernetes secret. - For cluster setup and alternatives to using service account key, check https://www.kubeflow.org/docs/gke/authentication-pipelines/. + For cluster setup and alternatives to using service account key, check https://www.kubeflow.org/docs/gke/authentication-pipelines/. """ # permitted values for secret_name = ['admin-gcp-sa', 'user-gcp-sa'] @@ -95,9 +95,10 @@ def use_preemptible_nodepool(toleration: V1Toleration = V1Toleration(effect='NoS value='true'), hard_constraint: bool = False): """An operator that configures the GKE preemptible in a container op. + Args: - toleration (V1Toleration): toleration to pods, default is the preemptible label. 
- hard_constraint (bool): the constraint of scheduling the pods on preemptible + toleration: toleration to pods, default is the preemptible label. + hard_constraint: the constraint of scheduling the pods on preemptible nodepools is hard. (Default: False) """ @@ -127,7 +128,7 @@ def add_gpu_toleration(toleration: V1Toleration = V1Toleration( """An operator that configures the GKE GPU nodes in a container op. Args: - toleration {V1Toleration} -- toleration to pods, default is the nvidia.com/gpu label. + toleration: toleration to pods, default is the nvidia.com/gpu label. """ def _set_toleration(task): diff --git a/sdk/python/kfp/onprem.py b/sdk/python/kfp/onprem.py index 2df49ba4e20b..0a119287f639 100644 --- a/sdk/python/kfp/onprem.py +++ b/sdk/python/kfp/onprem.py @@ -1,9 +1,11 @@ def mount_pvc(pvc_name='pipeline-claim', volume_name='pipeline', volume_mount_path='/mnt/pipeline'): - """ - Modifier function to apply to a Container Op to simplify volume, volume mount addition and - enable better reuse of volumes, volume claims across container ops. - Usage: + """Modifier function to apply to a Container Op to simplify volume, volume mount addition and + enable better reuse of volumes, volume claims across container ops. + + Example: + :: + train = train_op(...) train.apply(mount_pvc('claim-name', 'pipeline', '/mnt/pipeline')) """
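A minimal sketch of applying the :code:`mount_pvc` modifier documented above to a task; the pipeline name, container image, command, and claim name below are illustrative assumptions::

    import kfp.dsl as dsl
    from kfp.onprem import mount_pvc

    @dsl.pipeline(name='mount-pvc-example', description='Lists the contents of a shared PVC.')
    def mount_pvc_pipeline():
        list_op = dsl.ContainerOp(
            name='list-shared-dir',
            image='busybox:latest',
            command=['ls', '-la', '/mnt/pipeline'],
        )
        # mount_pvc returns a modifier function; apply() runs it against the op,
        # attaching the volume and volume mount for the named claim.
        list_op.apply(mount_pvc(pvc_name='pipeline-claim',
                                volume_name='pipeline',
                                volume_mount_path='/mnt/pipeline'))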