Merge pull request #95 from getindata/release-0.5.0
Release 0.5.0
Mariusz Strzelecki authored Jan 27, 2022
2 parents 69b87b2 + c8a4f29 commit c464564
Showing 16 changed files with 202 additions and 83 deletions.
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
@@ -12,3 +12,4 @@ repos:
    rev: v2.3.0
    hooks:
      - id: flake8
        args: ['--ignore=E203,W503'] # see https://github.com/psf/black/issues/315 https://github.com/psf/black/issues/52
10 changes: 9 additions & 1 deletion CHANGELOG.md
@@ -2,6 +2,12 @@

## [Unreleased]

## [0.5.0] - 2022-01-27

- Kedro parameters of complex types (lists and dicts) are now supported
- `run_once` and `schedule` accept Kedro parameter overrides via `--param` (example below)
- Names of the one-off runs and scheduled runs are templated with parameters

## [0.4.8] - 2022-01-10

## [0.4.7] - 2022-01-05
@@ -102,7 +108,9 @@
- Method to schedule runs for most recent version of given pipeline `kedro kubeflow schedule`
- Shortcut to open UI for pipelines using `kedro kubeflow ui`

[Unreleased]: https://github.com/getindata/kedro-kubeflow/compare/0.4.8...HEAD
[Unreleased]: https://github.com/getindata/kedro-kubeflow/compare/0.5.0...HEAD

[0.5.0]: https://github.com/getindata/kedro-kubeflow/compare/0.4.8...0.5.0

[0.4.8]: https://github.com/getindata/kedro-kubeflow/compare/0.4.7...0.4.8

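To make the parameter-override entries above concrete: each `--param` value is a single `key:value` string that the new `format_params` helper in `kedro_kubeflow/cli.py` (shown further down) splits on the first colon. A minimal sketch; the example keys and values are invented:

def format_params(params):
    # Split each "key:value" pair on the first colon only, so the value part
    # may itself contain colons or YAML syntax.
    return dict((p[: p.find(":")], p[p.find(":") + 1 :]) for p in params)

print(format_params(["learning_rate:0.05", "layers:[64, 32]"]))
# {'learning_rate': '0.05', 'layers': '[64, 32]'}
# Values are still plain strings here; lists and dicts are YAML-parsed later,
# inside the container (see generators/utils.py below).
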
2 changes: 1 addition & 1 deletion kedro_kubeflow/__init__.py
@@ -1,3 +1,3 @@
"""kedro_kubeflow."""

version = "0.4.8"
version = "0.5.0"
31 changes: 29 additions & 2 deletions kedro_kubeflow/cli.py
@@ -12,6 +12,10 @@
LOG = logging.getLogger(__name__)


def format_params(params: list):
    return dict((p[: p.find(":")], p[p.find(":") + 1 :]) for p in params)


@click.group("Kubeflow")
def commands():
    """Kedro plugin adding support for Kubeflow Pipelines"""
@@ -67,8 +71,17 @@ def list_pipelines(ctx):
help="Namespace where pipeline experiment run should be deployed to. Not needed "
"if provided experiment name already exists.",
)
@click.option(
"--param",
"params",
type=str,
multiple=True,
help="Parameters override in form of `key=value`",
)
@click.pass_context
def run_once(ctx, image: str, pipeline: str, experiment_namespace: str):
def run_once(
ctx, image: str, pipeline: str, experiment_namespace: str, params: list
):
"""Deploy pipeline as a single run within given experiment.
Config can be specified in kubeflow.yml as well."""
context_helper = ctx.obj["context_helper"]
Expand All @@ -82,6 +95,7 @@ def run_once(ctx, image: str, pipeline: str, experiment_namespace: str):
run_name=config.run_name,
wait=config.wait_for_completion,
image_pull_policy=config.image_pull_policy,
parameters=format_params(params),
)


@@ -189,21 +203,34 @@ def upload_pipeline(ctx, image, pipeline) -> None:
help="Namespace where pipeline experiment run should be deployed to. Not needed "
"if provided experiment name already exists.",
)
@click.option(
"--param",
"params",
type=str,
multiple=True,
help="Parameters override in form of `key=value`",
)
@click.pass_context
def schedule(
ctx,
pipeline: str,
experiment_namespace: str,
experiment_name: str,
cron_expression: str,
params: list,
):
"""Schedules recurring execution of latest version of the pipeline"""
context_helper = ctx.obj["context_helper"]
config = context_helper.config.run_config
experiment = experiment_name if experiment_name else config.experiment_name

context_helper.kfp_client.schedule(
pipeline, experiment, experiment_namespace, cron_expression
pipeline,
experiment,
experiment_namespace,
cron_expression,
run_name=config.scheduled_run_name,
parameters=format_params(params),
)


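A standalone sketch of the `click` pattern both commands above use for the new flag: `multiple=True` lets `--param` repeat, and the collected tuple of strings is what `format_params` receives. The command name and invocation below are invented for illustration:

import click

@click.command()
@click.option(
    "--param",
    "params",
    type=str,
    multiple=True,
    help="Parameters override in form of `key:value`",
)
def demo(params):
    # e.g. `demo --param lr:0.05 --param layers:[64,32]`
    # prints ('lr:0.05', 'layers:[64,32]')
    click.echo(params)

if __name__ == "__main__":
    demo()
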
11 changes: 10 additions & 1 deletion kedro_kubeflow/config.py
@@ -22,9 +22,12 @@
# Name of the kubeflow experiment to be created
experiment_name: {project}
# Name of the run for run-once
# Name of the run for run-once, templated with the run-once parameters
run_name: {run_name}
# Name of the scheduled run, templated with the schedule parameters
scheduled_run_name: {run_name}
# Optional pipeline description
#description: "Very Important Pipeline"
@@ -205,6 +208,12 @@ def experiment_name(self):
    def run_name(self):
        return self._get_or_fail("run_name")

    @property
    def scheduled_run_name(self):
        return self._get_or_default(
            "scheduled_run_name", self._get_or_fail("run_name")
        )

    @property
    def description(self):
        return self._get_or_default("description", None)
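The new `scheduled_run_name` simply falls back to `run_name` when the key is missing from `kubeflow.yml`. A minimal sketch of that fallback, with a plain dict standing in for the real run-config object (keys and values are assumptions):

def scheduled_run_name(run_config: dict) -> str:
    # Mirrors _get_or_default("scheduled_run_name", _get_or_fail("run_name")).
    return run_config.get("scheduled_run_name", run_config["run_name"])

assert scheduled_run_name({"run_name": "exp-run"}) == "exp-run"
assert (
    scheduled_run_name({"run_name": "exp-run", "scheduled_run_name": "nightly-{env}"})
    == "nightly-{env}"
)
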
23 changes: 12 additions & 11 deletions kedro_kubeflow/generators/one_pod_pipeline_generator.py
@@ -5,8 +5,9 @@

from ..utils import clean_name
from .utils import (
    create_arguments_from_parameters,
    create_command_using_params_dumper,
    create_container_environment,
    create_params,
    maybe_add_params,
)

@@ -50,16 +51,16 @@ def _build_kfp_op(
        container_op = dsl.ContainerOp(
            name=clean_name(pipeline),
            image=image,
            command=["kedro"],
            arguments=[
                "run",
                "--env",
                self.context.env,
                "--params",
                create_params(self.context.params.keys()),
                "--pipeline",
                pipeline,
            ],
            command=create_command_using_params_dumper(
                "kedro "
                "run "
                f"--env {self.context.env} "
                f"--pipeline {pipeline} "
                f"--config config.yaml"
            ),
            arguments=create_arguments_from_parameters(
                self.context.params.keys()
            ),
            container_kwargs=kwargs,
            file_outputs={
                output: f"/home/kedro/{self.catalog[output]['filepath']}"
26 changes: 13 additions & 13 deletions kedro_kubeflow/generators/pod_per_node_pipeline_generator.py
@@ -9,8 +9,9 @@

from ..utils import clean_name, is_mlflow_enabled
from .utils import (
    create_arguments_from_parameters,
    create_command_using_params_dumper,
    create_container_environment,
    create_params,
    maybe_add_params,
)

@@ -149,18 +150,17 @@ def _build_kfp_ops(
                dsl.ContainerOp(
                    name=name,
                    image=image,
                    command=["kedro"],
                    arguments=[
                        "run",
                        "--env",
                        self.context.env,
                        "--params",
                        create_params(self.context.params.keys()),
                        "--pipeline",
                        pipeline,
                        "--node",
                        node.name,
                    ],
                    command=create_command_using_params_dumper(
                        "kedro "
                        "run "
                        f"--env {self.context.env} "
                        f"--pipeline {pipeline} "
                        f"--node {node.name} "
                        f"--config config.yaml"
                    ),
                    arguments=create_arguments_from_parameters(
                        self.context.params.keys()
                    ),
                    pvolumes=node_volumes,
                    container_kwargs=kwargs,
                    file_outputs={
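In both generators the container no longer calls `kedro` directly: the command is a `bash -c` wrapper that first dumps the run parameters to `config.yaml` and then runs `kedro run ... --config config.yaml`, while the arguments interleave parameter names with Kubeflow placeholders. Roughly what a single node's `dsl.ContainerOp` receives, assuming one `learning_rate` parameter, the `local` env and a node called `train_model` (all of these names are invented):

# command: one bash -c script; "$@" forwards the op's arguments to the
# embedded python one-liner before the real kedro invocation runs.
command = [
    "bash",
    "-c",
    "python -c '<dump argv key/value pairs to config.yaml>' \"$@\" && "
    "kedro run --env local --pipeline __default__ "
    "--node train_model --config config.yaml",
]

# arguments: the leading "_" becomes $0 of the bash script, followed by
# name/placeholder pairs that Kubeflow Pipelines resolves at run time.
arguments = [
    "_",
    "learning_rate",
    "{{pipelineparam:op=;name=learning_rate}}",
]
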
28 changes: 21 additions & 7 deletions kedro_kubeflow/generators/utils.py
@@ -1,7 +1,7 @@
import itertools
import os
from functools import wraps
from inspect import Parameter, signature
from typing import Iterable

import kubernetes.client as k8s
from kfp import dsl
@@ -26,12 +26,6 @@ def wrapper(*args, **kwargs):
    return decorator


def create_params(param_keys: Iterable[str]) -> str:
    return ",".join(
        [f"{param}:{dsl.PipelineParam(param)}" for param in param_keys]
    )


def create_container_environment():
    env_vars = [
        k8s.V1EnvVar(
@@ -44,3 +38,23 @@ def create_container_environment():
        env_vars.append(k8s.V1EnvVar(name=key, value=os.environ[key]))

    return env_vars


def create_command_using_params_dumper(command):
    return [
        "bash",
        "-c",
        "python -c 'import yaml, sys;"
        "load=lambda e: yaml.load(e, Loader=yaml.FullLoader);"
        "params=dict(zip(sys.argv[1::2], [load(e) for e in sys.argv[2::2]]));"
        'f=open("config.yaml", "w");'
        'yaml.dump({"run": {"params": params}}, f)\' "$@" &&' + command,
    ]


def create_arguments_from_parameters(parameter_names):
    return ["_"] + list(
        itertools.chain(
            *[[param, dsl.PipelineParam(param)] for param in parameter_names]
        )
    )
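The one-liner above reads the interleaved `name value name value ...` arguments (the leading `_` from `create_arguments_from_parameters` is consumed by bash as `$0`), YAML-loads each value so lists and dicts survive the trip, and writes a Kedro run config before the wrapped command starts. A hedged sketch of its effect for two assumed overrides, `learning_rate:0.05` and `layers:[64, 32]`:

import yaml

# Equivalent of the embedded script for
# sys.argv[1:] == ["learning_rate", "0.05", "layers", "[64, 32]"]
load = lambda e: yaml.load(e, Loader=yaml.FullLoader)  # mirrors the one-liner
params = dict(zip(["learning_rate", "layers"], [load("0.05"), load("[64, 32]")]))
with open("config.yaml", "w") as f:
    yaml.dump({"run": {"params": params}}, f)

# config.yaml then contains something like (exact layout depends on PyYAML):
# run:
#   params:
#     layers:
#     - 64
#     - 32
#     learning_rate: 0.05
# which `kedro run --config config.yaml` picks up as parameter overrides.
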
37 changes: 22 additions & 15 deletions kedro_kubeflow/kfpclient.py
@@ -69,15 +69,16 @@ def run_once(
        run_name,
        wait,
        image_pull_policy="IfNotPresent",
        parameters={},
    ) -> None:
        run = self.client.create_run_from_pipeline_func(
            self.generator.generate_pipeline(
                pipeline, image, image_pull_policy
            ),
            arguments={},
            arguments=parameters,
            experiment_name=experiment_name,
            namespace=experiment_namespace,
            run_name=run_name,
            run_name=run_name.format(**parameters),
        )

        if wait:
@@ -175,29 +176,35 @@ def _ensure_experiment_exists(self, experiment_name, experiment_namespace):
        return experiment.id

    def schedule(
        self, pipeline, experiment_name, experiment_namespace, cron_expression
        self,
        pipeline,
        experiment_name,
        experiment_namespace,
        cron_expression,
        run_name,
        parameters={},
    ):
        experiment_id = self._ensure_experiment_exists(
            experiment_name, experiment_namespace
        )
        pipeline_id = self._get_pipeline_id(self.project_name)
        self._disable_runs(experiment_id, pipeline_id)
        formatted_run_name = run_name.format(**parameters)
        self._disable_runs(experiment_id, formatted_run_name)
        self.client.create_recurring_run(
            experiment_id,
            f"{self.project_name} on {cron_expression}",
            formatted_run_name,
            cron_expression=cron_expression,
            pipeline_id=pipeline_id,
            params=parameters,
        )
        self.log.info("Pipeline scheduled to %s", cron_expression)

    def _disable_runs(self, experiment_id, pipeline_id):
    def _disable_runs(self, experiment_id, run_name):
        runs = self.client.list_recurring_runs(experiment_id=experiment_id)
        if runs.jobs is not None:
            my_runs = [
                job
                for job in runs.jobs
                if job.pipeline_spec.pipeline_id == pipeline_id
            ]
            for job in my_runs:
                self.client.jobs.delete_job(job.id)
                self.log.info(f"Previous schedule deleted {job.id}")
        if runs.jobs is None:
            return

        my_runs = [job for job in runs.jobs if job.name == run_name]
        for job in my_runs:
            self.client.jobs.delete_job(job.id)
            self.log.info(f"Previous schedule deleted {job.id}")
2 changes: 1 addition & 1 deletion setup.cfg
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.4.8
current_version = 0.5.0

[bumpversion:file:setup.py]

2 changes: 1 addition & 1 deletion setup.py
@@ -36,7 +36,7 @@

setup(
name="kedro-kubeflow",
version="0.4.8",
version="0.5.0",
description="Kedro plugin with Kubeflow support",
long_description=README,
long_description_content_type="text/markdown",
21 changes: 19 additions & 2 deletions tests/test_cli.py
@@ -65,6 +65,8 @@ def test_run_once(self):
"new_pipe",
"--experiment-namespace",
"my-ns",
"--param",
"key1:some value",
],
obj=config,
)
@@ -78,6 +80,7 @@ def test_run_once(self):
run_name="test run",
wait=True,
experiment_namespace="my-ns",
parameters={"key1": "some value"},
)

@patch("webbrowser.open_new_tab")
@@ -133,13 +136,27 @@ def test_schedule(self):

        result = runner.invoke(
            schedule,
            ["-c", "* * *", "-x", "test_experiment", "-p", "my-pipeline"],
            [
                "-c",
                "* * *",
                "-x",
                "test_experiment",
                "-p",
                "my-pipeline",
                "--param",
                "key1:some value",
            ],
            obj=config,
        )

        assert result.exit_code == 0
        context_helper.kfp_client.schedule.assert_called_with(
            "my-pipeline", "test_experiment", None, "* * *"
            "my-pipeline",
            "test_experiment",
            None,
            "* * *",
            run_name="test run",
            parameters={"key1": "some value"},
        )

    @patch.object(Path, "cwd")