From 31f0cc245f14cf08abffbe5990202b57bf487a5a Mon Sep 17 00:00:00 2001 From: Lingqing Gan Date: Thu, 31 Mar 2022 20:20:08 +0000 Subject: [PATCH 01/16] support IR YAML format in API --- backend/src/apiserver/server/util.go | 4 +- backend/src/apiserver/template/template.go | 3 +- .../src/apiserver/template/template_test.go | 68 ++++++++++++++++++- backend/src/apiserver/template/v2_template.go | 4 +- 4 files changed, 73 insertions(+), 6 deletions(-) diff --git a/backend/src/apiserver/server/util.go b/backend/src/apiserver/server/util.go index 48ff655959c..6b6b6fab7ee 100644 --- a/backend/src/apiserver/server/util.go +++ b/backend/src/apiserver/server/util.go @@ -13,7 +13,6 @@ import ( "net/url" "strings" - "github.com/golang/protobuf/jsonpb" "github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec" "github.com/golang/glog" @@ -23,6 +22,7 @@ import ( "github.com/kubeflow/pipelines/backend/src/common/util" "github.com/pkg/errors" authorizationv1 "k8s.io/api/authorization/v1" + "github.com/ghodss/yaml" ) // These are valid conditions of a ScheduledWorkflow. @@ -323,7 +323,7 @@ func validatePipelineManifest(pipelineManifest string) error { if pipelineManifest != "" { // Verify valid IR spec spec := &pipelinespec.PipelineSpec{} - if err := jsonpb.UnmarshalString(pipelineManifest, spec); err != nil { + if err := yaml.Unmarshal([]byte(pipelineManifest), spec); err != nil { return util.NewInvalidInputErrorWithDetails(err, "Invalid IR spec format.") } diff --git a/backend/src/apiserver/template/template.go b/backend/src/apiserver/template/template.go index 5bcd5258cc9..d76f92c6eea 100644 --- a/backend/src/apiserver/template/template.go +++ b/backend/src/apiserver/template/template.go @@ -107,7 +107,8 @@ func isArgoWorkflow(template []byte) bool { // isPipelineSpec returns whether template is in KFP api/v2alpha1/PipelineSpec format. func isPipelineSpec(template []byte) bool { var spec pipelinespec.PipelineSpec - err := protojson.Unmarshal(template, &spec) + templateJson, _ := yaml.YAMLToJSON(template) + err := protojson.Unmarshal(templateJson, &spec) return err == nil && spec.GetPipelineInfo().GetName() != "" && spec.GetRoot() != nil } diff --git a/backend/src/apiserver/template/template_test.go b/backend/src/apiserver/template/template_test.go index b7899922636..0c586a13ab2 100644 --- a/backend/src/apiserver/template/template_test.go +++ b/backend/src/apiserver/template/template_test.go @@ -72,7 +72,7 @@ apiVersion: argoproj.io/v1alpha2 kind: Workflow`, templateType: V1, }, { - template: v2SpecHelloWorld, + template: v2SpecHelloWorldJSON, templateType: V2, }, { template: "", @@ -95,7 +95,11 @@ kind: CronWorkflow`, }, { template: `{"abc": "def", "b": {"key": 3}}`, templateType: Unknown, + }, { + template: v2SpecHelloWorldYAML, + templateType: V2, }} + for _, test := range tt { format := inferTemplateFormat([]byte(test.template)) if format != test.templateType { @@ -145,7 +149,7 @@ spec: container: image: docker/whalesay:latest` -var v2SpecHelloWorld = ` +var v2SpecHelloWorldJSON = ` { "components": { "comp-hello-world": { @@ -217,6 +221,66 @@ var v2SpecHelloWorld = ` } ` +var v2SpecHelloWorldYAML = ` +# this is a comment +components: + comp-hello-world: + executorLabel: exec-hello-world + inputDefinitions: + parameters: + text: + type: STRING +deploymentSpec: + executors: + exec-hello-world: + container: + args: + - "--text" + - "{{$.inputs.parameters['text']}}" + command: + - sh + - "-ec" + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def hello_world(text): + print(text) + return text + + import argparse + _parser = argparse.ArgumentParser(prog='Hello world', description='') + _parser.add_argument("--text", dest="text", type=str, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = hello_world(**_parsed_args) + image: python:3.7 +pipelineInfo: + name: namespace/n1/pipeline/hello-world +root: + dag: + tasks: + hello-world: + cachingOptions: + enableCache: true + componentRef: + name: comp-hello-world + inputs: + parameters: + text: + componentInputParameter: text + taskInfo: + name: hello-world + inputDefinitions: + parameters: + text: + type: STRING +schemaVersion: 2.0.0 +sdkVersion: kfp-1.6.5 +` + + func TestToSwfCRDResourceGeneratedName_SpecialCharsAndSpace(t *testing.T) { name, err := toSWFCRDResourceGeneratedName("! HaVe ä £unky name") assert.Nil(t, err) diff --git a/backend/src/apiserver/template/v2_template.go b/backend/src/apiserver/template/v2_template.go index f24300bb859..b1aa692a160 100644 --- a/backend/src/apiserver/template/v2_template.go +++ b/backend/src/apiserver/template/v2_template.go @@ -13,6 +13,7 @@ import ( "github.com/kubeflow/pipelines/backend/src/v2/compiler/argocompiler" "google.golang.org/protobuf/encoding/protojson" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/ghodss/yaml" ) type V2Spec struct { @@ -68,7 +69,8 @@ func (t *V2Spec) GetTemplateType() TemplateType { func NewV2SpecTemplate(template []byte) (*V2Spec, error) { var spec pipelinespec.PipelineSpec - err := protojson.Unmarshal(template, &spec) + templateJson, _ := yaml.YAMLToJSON(template) + err := protojson.Unmarshal(templateJson, &spec) if err != nil { return nil, util.NewInvalidInputErrorWithDetails(ErrorInvalidPipelineSpec, fmt.Sprintf("invalid v2 pipeline spec: %s", err.Error())) } From bf63df8f40f3ee6fc8add966a393cdde4cbf2838 Mon Sep 17 00:00:00 2001 From: Linchin Date: Thu, 31 Mar 2022 22:34:45 +0000 Subject: [PATCH 02/16] Check the error message and return false if it is not nil --- backend/src/apiserver/template/template.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/backend/src/apiserver/template/template.go b/backend/src/apiserver/template/template.go index d76f92c6eea..881c10cd6fe 100644 --- a/backend/src/apiserver/template/template.go +++ b/backend/src/apiserver/template/template.go @@ -107,8 +107,11 @@ func isArgoWorkflow(template []byte) bool { // isPipelineSpec returns whether template is in KFP api/v2alpha1/PipelineSpec format. func isPipelineSpec(template []byte) bool { var spec pipelinespec.PipelineSpec - templateJson, _ := yaml.YAMLToJSON(template) - err := protojson.Unmarshal(templateJson, &spec) + templateJson, err := yaml.YAMLToJSON(template) + if err != nil { + return false + } + err = protojson.Unmarshal(templateJson, &spec) return err == nil && spec.GetPipelineInfo().GetName() != "" && spec.GetRoot() != nil } From 4d97abfe4f84c85061968d452acad44c2f7cfefd Mon Sep 17 00:00:00 2001 From: Linchin Date: Fri, 1 Apr 2022 00:10:14 +0000 Subject: [PATCH 03/16] update error message --- backend/src/apiserver/template/v2_template.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/backend/src/apiserver/template/v2_template.go b/backend/src/apiserver/template/v2_template.go index b1aa692a160..538f0844da6 100644 --- a/backend/src/apiserver/template/v2_template.go +++ b/backend/src/apiserver/template/v2_template.go @@ -6,6 +6,7 @@ import ( structpb "github.com/golang/protobuf/ptypes/struct" + "github.com/ghodss/yaml" "github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec" api "github.com/kubeflow/pipelines/backend/api/go_client" "github.com/kubeflow/pipelines/backend/src/common/util" @@ -13,7 +14,6 @@ import ( "github.com/kubeflow/pipelines/backend/src/v2/compiler/argocompiler" "google.golang.org/protobuf/encoding/protojson" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "github.com/ghodss/yaml" ) type V2Spec struct { @@ -69,8 +69,11 @@ func (t *V2Spec) GetTemplateType() TemplateType { func NewV2SpecTemplate(template []byte) (*V2Spec, error) { var spec pipelinespec.PipelineSpec - templateJson, _ := yaml.YAMLToJSON(template) - err := protojson.Unmarshal(templateJson, &spec) + templateJson, err := yaml.YAMLToJSON(template) + if err != nil { + return nil, util.NewInvalidInputErrorWithDetails(ErrorInvalidPipelineSpec, fmt.Sprintf("cannot convert v2 pipeline spec to json format: %s", err.Error())) + } + err = protojson.Unmarshal(templateJson, &spec) if err != nil { return nil, util.NewInvalidInputErrorWithDetails(ErrorInvalidPipelineSpec, fmt.Sprintf("invalid v2 pipeline spec: %s", err.Error())) } From 927d0257d6fe4b1793ed124b6b029b739aefc9fa Mon Sep 17 00:00:00 2001 From: Linchin Date: Mon, 18 Apr 2022 21:03:30 +0000 Subject: [PATCH 04/16] fixed simple loop but need cleaning up --- backend/src/v2/compiler/argocompiler/argo.go | 2 +- backend/src/v2/driver/driver.go | 110 ++++++++++++++++++- 2 files changed, 105 insertions(+), 7 deletions(-) diff --git a/backend/src/v2/compiler/argocompiler/argo.go b/backend/src/v2/compiler/argocompiler/argo.go index c4fb4868e6b..a00e24c6f4a 100644 --- a/backend/src/v2/compiler/argocompiler/argo.go +++ b/backend/src/v2/compiler/argocompiler/argo.go @@ -107,7 +107,7 @@ func Compile(jobArg *pipelinespec.PipelineJob, opts *Options) (*wfapi.Workflow, wf: wf, templates: make(map[string]*wfapi.Template), // TODO(chensun): release process and update the images. - driverImage: "gcr.io/ml-pipeline-test/dev/kfp-driver:latest", + driverImage: "gcr.io/ling-kfp/dev/kfp-driver:latest", launcherImage: "gcr.io/ml-pipeline-test/dev/kfp-launcher-v2:latest", job: job, spec: spec, diff --git a/backend/src/v2/driver/driver.go b/backend/src/v2/driver/driver.go index 87eb211892f..f4c2724b059 100644 --- a/backend/src/v2/driver/driver.go +++ b/backend/src/v2/driver/driver.go @@ -407,13 +407,32 @@ func DAG(ctx context.Context, opts Options, mlmd *metadata.Client) (execution *E if err != nil { return nil, err } + glog.Infof("ctx value: %+v\n"+ + "dag value: %+v\n"+ + "interationIndex value: %+v\n"+ + "pipeline value: %+v\n"+ + "opts.Task value: %+v\n"+ + "opts.Component.GetInputDefinitions() value: %#+v\n"+ + "mlmd value: %+v\n"+ + "expr value: %+v\n", + ctx, + dag, + iterationIndex, + pipeline, + opts.Task, + opts.Component.GetInputDefinitions(), + mlmd, + expr, + ) inputs, err := resolveInputs(ctx, dag, iterationIndex, pipeline, opts.Task, opts.Component.GetInputDefinitions(), mlmd, expr) if err != nil { return nil, err } + // unwrap static inputs executorInput := &pipelinespec.ExecutorInput{ Inputs: inputs, } + glog.Infof("executorInput value: %+v", executorInput) execution = &Execution{ExecutorInput: executorInput} condition := opts.Task.GetTriggerPolicy().GetCondition() if condition != "" { @@ -436,15 +455,76 @@ func DAG(ctx context.Context, opts Options, mlmd *metadata.Client) (execution *E return execution, fmt.Errorf("ArtifactIterator is not implemented") } isIterator := opts.Task.GetParameterIterator() != nil && opts.IterationIndex < 0 + // Fan out iterations if execution.WillTrigger() && isIterator { iterator := opts.Task.GetParameterIterator() - value, ok := executorInput.GetInputs().GetParameterValues()[iterator.GetItems().GetInputParameter()] report := func(err error) error { return fmt.Errorf("iterating on item input %q failed: %w", iterator.GetItemInput(), err) } - if !ok { - return execution, report(fmt.Errorf("cannot find input parameter")) + glog.Infof("iterator kind: %+v", iterator.GetItems().GetKind()) + glog.Infof("This log outputs variables used in the iterator.\n"+ + "Val of iterator: %+v\n"+ + "Val of executorInput: %+v\n", + iterator, + executorInput, + ) + glog.Infof("executorInput.GetInputs() value: %+v", executorInput.GetInputs()) + glog.Infof("iterator.GetItems() value: %+v", iterator.GetItems()) + // Check the items type of parameterIterator: + // It can be "inputParameter" or "Raw" + value := structpb.NewNullValue() + var ok bool + switch iterator.GetItems().GetKind().(type) { + case *pipelinespec.ParameterIteratorSpec_ItemsSpec_InputParameter: + glog.Infof("ParameterIterator type: %T", iterator.GetItems().GetKind()) + value, ok = executorInput.GetInputs().GetParameterValues()[iterator.GetItems().GetInputParameter()] + // value, ok := executorInput.GetInputs().GetParameterValues()[iterator.GetItems().GetRaw()] + glog.Infof("iterating on item input %q failed: %w, \n"+ + " (--lingqing print--) map type: %T, map value: %+v, \n"+ + " (--lingqing print--) key type: %T, key val: %q, \n"+ + " (--lingqing print--) inputs type: %T, inputs val: %+v, \n"+ + " (--lingqing print--) pipelineType: %T, pipelineVal: %+v, \n"+ + " (--lingqing print--) pipelineCtx: %+v, \n"+ + " (--lingqing print--) pipelineRunCtx: %+v, ", + iterator.GetItemInput(), + err, + executorInput.GetInputs().GetParameterValues(), + executorInput.GetInputs().GetParameterValues(), + iterator.GetItems().GetInputParameter(), + iterator.GetItems().GetInputParameter(), // this is empty too + inputs, + *inputs, // parameter field is empty + pipeline, + *pipeline, + pipeline.GetRunCtxID(), // run context id + pipeline.GetCtxID(), // context id + ) + if !ok { + return execution, report(fmt.Errorf("cannot find input parameter")) + } + glog.Infof("inputParameter value: %+v", value) + case *pipelinespec.ParameterIteratorSpec_ItemsSpec_Raw: + glog.Infof("ParameterIterator type: %T", iterator.GetItems().GetKind()) + value_raw := iterator.GetItems().GetRaw() + glog.Info("raw_string_value: ", value_raw) + var unmarshalled_raw interface{} + err = json.Unmarshal([]byte(value_raw), &unmarshalled_raw) + if err != nil { + return execution, fmt.Errorf("error unmarshall raw string: %q", err) + } + glog.Infof("unmarshalled_raw value: %+v", unmarshalled_raw) + value, err = structpb.NewValue(unmarshalled_raw) + if err != nil { + return execution, fmt.Errorf("error converting unmarshalled raw string into protobuf Value type: %q", err) + } + // Add the raw input to the executor input + execution.ExecutorInput.Inputs.ParameterValues[iterator.GetItemInput()] = value + // Add the raw input to the task + // opts.Task. + default: + return execution, fmt.Errorf("cannot find parameter iterator") } + glog.Infof("value value: %+v", value) items, err := getItems(value) if err != nil { return execution, report(err) @@ -460,6 +540,7 @@ func DAG(ctx context.Context, opts Options, mlmd *metadata.Client) (execution *E } glog.Infof("Created execution: %s", createdExecution) execution.ID = createdExecution.GetID() + glog.Infof("created execution: %+v\n", execution) return execution, nil } @@ -482,6 +563,8 @@ func getItems(value *structpb.Value) (items []*structpb.Value, err error) { } } +// Get iteratation items from a raw string + func reuseCachedOutputs(ctx context.Context, executorInput *pipelinespec.ExecutorInput, outputDefinitions *pipelinespec.ComponentOutputsSpec, mlmd *metadata.Client, cachedMLMDExecutionID string) (*pipelinespec.ExecutorOutput, []*metadata.OutputArtifact, error) { cachedMLMDExecutionIDInt64, err := strconv.ParseInt(cachedMLMDExecutionID, 10, 64) if err != nil { @@ -594,6 +677,7 @@ func validateNonRoot(opts Options) error { } func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, pipeline *metadata.Pipeline, task *pipelinespec.PipelineTaskSpec, inputsSpec *pipelinespec.ComponentInputsSpec, mlmd *metadata.Client, expr *expression.Expr) (inputs *pipelinespec.ExecutorInput_Inputs, err error) { + glog.Infof("task: %+v", task) // task seems fine defer func() { if err != nil { err = fmt.Errorf("failed to resolve inputs: %w", err) @@ -603,12 +687,12 @@ func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, if err != nil { return nil, err } - glog.Infof("parent DAG input parameters %+v", inputParams) + glog.Infof("parent DAG input parameters %+v", inputParams) // not related to us inputs = &pipelinespec.ExecutorInput_Inputs{ ParameterValues: make(map[string]*structpb.Value), Artifacts: make(map[string]*pipelinespec.ArtifactList), } - isIterationDriver := iterationIndex != nil + isIterationDriver := iterationIndex != nil // false handleParameterExpressionSelector := func() error { for name, paramSpec := range task.GetInputs().GetParameters() { @@ -724,7 +808,17 @@ func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, case task.GetArtifactIterator() != nil: return nil, fmt.Errorf("artifact iterator not implemented yet") case task.GetParameterIterator() != nil: - itemsInput := task.GetParameterIterator().GetItems().GetInputParameter() + var itemsInput string + if task.GetParameterIterator().GetItems().GetInputParameter() != "" { + itemsInput = task.GetParameterIterator().GetItems().GetInputParameter() + } else if task.GetParameterIterator().GetItemInput() != "" { + itemsInput = task.GetParameterIterator().GetItemInput() + } else { + return nil, fmt.Errorf("cannot retrieve parameter iterator.") + } + glog.Infof("task value: %+v", task) + glog.Infof("itemsInput value: %+v", itemsInput) + glog.Infof("inputs value: %+v", inputs) items, err := getItems(inputs.ParameterValues[itemsInput]) if err != nil { return nil, err @@ -737,6 +831,7 @@ func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, default: return nil, fmt.Errorf("bug: iteration_index>=0, but task iterator is empty") } + glog.Infof("inputs value %+v", inputs) return inputs, nil } // get executions in context on demand @@ -752,7 +847,9 @@ func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, tasksCache = tasks return tasks, nil } + glog.Infof("task.GetInputs(): %+v", task.GetInputs()) // this is empty for name, paramSpec := range task.GetInputs().GetParameters() { + glog.Infof("name: %q, paramSpec: %+v", name, paramSpec) paramError := func(err error) error { return fmt.Errorf("resolving input parameter %s with spec %s: %w", name, paramSpec, err) } @@ -853,6 +950,7 @@ func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, } } // TODO(Bobgy): validate executor inputs match component inputs definition + glog.Infof("inputs value: %+v", inputs) return inputs, nil } From 52760475e79f4c07b0f4ef91f13f731815998e13 Mon Sep 17 00:00:00 2001 From: Linchin Date: Mon, 18 Apr 2022 22:30:14 +0000 Subject: [PATCH 05/16] Deleted debug logs --- backend/src/v2/driver/driver.go | 66 ++------------------------------- 1 file changed, 4 insertions(+), 62 deletions(-) diff --git a/backend/src/v2/driver/driver.go b/backend/src/v2/driver/driver.go index f4c2724b059..dff9ffefa84 100644 --- a/backend/src/v2/driver/driver.go +++ b/backend/src/v2/driver/driver.go @@ -407,23 +407,6 @@ func DAG(ctx context.Context, opts Options, mlmd *metadata.Client) (execution *E if err != nil { return nil, err } - glog.Infof("ctx value: %+v\n"+ - "dag value: %+v\n"+ - "interationIndex value: %+v\n"+ - "pipeline value: %+v\n"+ - "opts.Task value: %+v\n"+ - "opts.Component.GetInputDefinitions() value: %#+v\n"+ - "mlmd value: %+v\n"+ - "expr value: %+v\n", - ctx, - dag, - iterationIndex, - pipeline, - opts.Task, - opts.Component.GetInputDefinitions(), - mlmd, - expr, - ) inputs, err := resolveInputs(ctx, dag, iterationIndex, pipeline, opts.Task, opts.Component.GetInputDefinitions(), mlmd, expr) if err != nil { return nil, err @@ -461,15 +444,6 @@ func DAG(ctx context.Context, opts Options, mlmd *metadata.Client) (execution *E report := func(err error) error { return fmt.Errorf("iterating on item input %q failed: %w", iterator.GetItemInput(), err) } - glog.Infof("iterator kind: %+v", iterator.GetItems().GetKind()) - glog.Infof("This log outputs variables used in the iterator.\n"+ - "Val of iterator: %+v\n"+ - "Val of executorInput: %+v\n", - iterator, - executorInput, - ) - glog.Infof("executorInput.GetInputs() value: %+v", executorInput.GetInputs()) - glog.Infof("iterator.GetItems() value: %+v", iterator.GetItems()) // Check the items type of parameterIterator: // It can be "inputParameter" or "Raw" value := structpb.NewNullValue() @@ -478,27 +452,6 @@ func DAG(ctx context.Context, opts Options, mlmd *metadata.Client) (execution *E case *pipelinespec.ParameterIteratorSpec_ItemsSpec_InputParameter: glog.Infof("ParameterIterator type: %T", iterator.GetItems().GetKind()) value, ok = executorInput.GetInputs().GetParameterValues()[iterator.GetItems().GetInputParameter()] - // value, ok := executorInput.GetInputs().GetParameterValues()[iterator.GetItems().GetRaw()] - glog.Infof("iterating on item input %q failed: %w, \n"+ - " (--lingqing print--) map type: %T, map value: %+v, \n"+ - " (--lingqing print--) key type: %T, key val: %q, \n"+ - " (--lingqing print--) inputs type: %T, inputs val: %+v, \n"+ - " (--lingqing print--) pipelineType: %T, pipelineVal: %+v, \n"+ - " (--lingqing print--) pipelineCtx: %+v, \n"+ - " (--lingqing print--) pipelineRunCtx: %+v, ", - iterator.GetItemInput(), - err, - executorInput.GetInputs().GetParameterValues(), - executorInput.GetInputs().GetParameterValues(), - iterator.GetItems().GetInputParameter(), - iterator.GetItems().GetInputParameter(), // this is empty too - inputs, - *inputs, // parameter field is empty - pipeline, - *pipeline, - pipeline.GetRunCtxID(), // run context id - pipeline.GetCtxID(), // context id - ) if !ok { return execution, report(fmt.Errorf("cannot find input parameter")) } @@ -519,12 +472,9 @@ func DAG(ctx context.Context, opts Options, mlmd *metadata.Client) (execution *E } // Add the raw input to the executor input execution.ExecutorInput.Inputs.ParameterValues[iterator.GetItemInput()] = value - // Add the raw input to the task - // opts.Task. default: return execution, fmt.Errorf("cannot find parameter iterator") } - glog.Infof("value value: %+v", value) items, err := getItems(value) if err != nil { return execution, report(err) @@ -540,7 +490,6 @@ func DAG(ctx context.Context, opts Options, mlmd *metadata.Client) (execution *E } glog.Infof("Created execution: %s", createdExecution) execution.ID = createdExecution.GetID() - glog.Infof("created execution: %+v\n", execution) return execution, nil } @@ -563,8 +512,6 @@ func getItems(value *structpb.Value) (items []*structpb.Value, err error) { } } -// Get iteratation items from a raw string - func reuseCachedOutputs(ctx context.Context, executorInput *pipelinespec.ExecutorInput, outputDefinitions *pipelinespec.ComponentOutputsSpec, mlmd *metadata.Client, cachedMLMDExecutionID string) (*pipelinespec.ExecutorOutput, []*metadata.OutputArtifact, error) { cachedMLMDExecutionIDInt64, err := strconv.ParseInt(cachedMLMDExecutionID, 10, 64) if err != nil { @@ -677,7 +624,6 @@ func validateNonRoot(opts Options) error { } func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, pipeline *metadata.Pipeline, task *pipelinespec.PipelineTaskSpec, inputsSpec *pipelinespec.ComponentInputsSpec, mlmd *metadata.Client, expr *expression.Expr) (inputs *pipelinespec.ExecutorInput_Inputs, err error) { - glog.Infof("task: %+v", task) // task seems fine defer func() { if err != nil { err = fmt.Errorf("failed to resolve inputs: %w", err) @@ -687,12 +633,12 @@ func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, if err != nil { return nil, err } - glog.Infof("parent DAG input parameters %+v", inputParams) // not related to us + glog.Infof("parent DAG input parameters %+v", inputParams) inputs = &pipelinespec.ExecutorInput_Inputs{ ParameterValues: make(map[string]*structpb.Value), Artifacts: make(map[string]*pipelinespec.ArtifactList), } - isIterationDriver := iterationIndex != nil // false + isIterationDriver := iterationIndex != nil handleParameterExpressionSelector := func() error { for name, paramSpec := range task.GetInputs().GetParameters() { @@ -810,15 +756,14 @@ func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, case task.GetParameterIterator() != nil: var itemsInput string if task.GetParameterIterator().GetItems().GetInputParameter() != "" { + // input comes from outside the component itemsInput = task.GetParameterIterator().GetItems().GetInputParameter() } else if task.GetParameterIterator().GetItemInput() != "" { + // input comes from static input itemsInput = task.GetParameterIterator().GetItemInput() } else { return nil, fmt.Errorf("cannot retrieve parameter iterator.") } - glog.Infof("task value: %+v", task) - glog.Infof("itemsInput value: %+v", itemsInput) - glog.Infof("inputs value: %+v", inputs) items, err := getItems(inputs.ParameterValues[itemsInput]) if err != nil { return nil, err @@ -847,9 +792,7 @@ func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, tasksCache = tasks return tasks, nil } - glog.Infof("task.GetInputs(): %+v", task.GetInputs()) // this is empty for name, paramSpec := range task.GetInputs().GetParameters() { - glog.Infof("name: %q, paramSpec: %+v", name, paramSpec) paramError := func(err error) error { return fmt.Errorf("resolving input parameter %s with spec %s: %w", name, paramSpec, err) } @@ -950,7 +893,6 @@ func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, } } // TODO(Bobgy): validate executor inputs match component inputs definition - glog.Infof("inputs value: %+v", inputs) return inputs, nil } From 483248f00a7bb92254f560718eb9505c553c48a5 Mon Sep 17 00:00:00 2001 From: Linchin Date: Tue, 19 Apr 2022 23:00:01 +0000 Subject: [PATCH 06/16] remove logs and fix some format --- backend/src/v2/driver/driver.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/backend/src/v2/driver/driver.go b/backend/src/v2/driver/driver.go index dff9ffefa84..34d622073eb 100644 --- a/backend/src/v2/driver/driver.go +++ b/backend/src/v2/driver/driver.go @@ -411,7 +411,6 @@ func DAG(ctx context.Context, opts Options, mlmd *metadata.Client) (execution *E if err != nil { return nil, err } - // unwrap static inputs executorInput := &pipelinespec.ExecutorInput{ Inputs: inputs, } @@ -455,17 +454,14 @@ func DAG(ctx context.Context, opts Options, mlmd *metadata.Client) (execution *E if !ok { return execution, report(fmt.Errorf("cannot find input parameter")) } - glog.Infof("inputParameter value: %+v", value) case *pipelinespec.ParameterIteratorSpec_ItemsSpec_Raw: glog.Infof("ParameterIterator type: %T", iterator.GetItems().GetKind()) value_raw := iterator.GetItems().GetRaw() - glog.Info("raw_string_value: ", value_raw) var unmarshalled_raw interface{} err = json.Unmarshal([]byte(value_raw), &unmarshalled_raw) if err != nil { return execution, fmt.Errorf("error unmarshall raw string: %q", err) } - glog.Infof("unmarshalled_raw value: %+v", unmarshalled_raw) value, err = structpb.NewValue(unmarshalled_raw) if err != nil { return execution, fmt.Errorf("error converting unmarshalled raw string into protobuf Value type: %q", err) From 51c6c8c1b09379719090ef01e516cbb771fecadd Mon Sep 17 00:00:00 2001 From: Linchin Date: Wed, 20 Apr 2022 20:50:17 +0000 Subject: [PATCH 07/16] fix static_loop_arguments --- samples/core/loop_static/loop_static_v2.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/samples/core/loop_static/loop_static_v2.py b/samples/core/loop_static/loop_static_v2.py index a5c1eafe9c0..62f92b6e814 100644 --- a/samples/core/loop_static/loop_static_v2.py +++ b/samples/core/loop_static/loop_static_v2.py @@ -19,15 +19,12 @@ def concat_op(a: str, b: str) -> str: return a + b -_DEFAULT_LOOP_ARGUMENTS = [{'a': '1', 'b': '2'}, {'a': '10', 'b': '20'}] - - @dsl.pipeline(name='pipeline-with-loop-static') def my_pipeline( - static_loop_arguments: List[dict] = _DEFAULT_LOOP_ARGUMENTS, greeting: str = 'this is a test for looping through parameters', ): print_task = print_op(text=greeting) + static_loop_arguments = [{'a': '1', 'b': '2'}, {'a': '10', 'b': '20'}] with dsl.ParallelFor(static_loop_arguments) as item: concat_task = concat_op(a=item.a, b=item.b) From 02eb7b175385012aaba9782838604b1a2bdf01ee Mon Sep 17 00:00:00 2001 From: Lingqing Gan Date: Wed, 20 Apr 2022 14:00:32 -0700 Subject: [PATCH 08/16] change the driver image change the driver image back to the kfp container registry. --- backend/src/v2/compiler/argocompiler/argo.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/src/v2/compiler/argocompiler/argo.go b/backend/src/v2/compiler/argocompiler/argo.go index a00e24c6f4a..c4fb4868e6b 100644 --- a/backend/src/v2/compiler/argocompiler/argo.go +++ b/backend/src/v2/compiler/argocompiler/argo.go @@ -107,7 +107,7 @@ func Compile(jobArg *pipelinespec.PipelineJob, opts *Options) (*wfapi.Workflow, wf: wf, templates: make(map[string]*wfapi.Template), // TODO(chensun): release process and update the images. - driverImage: "gcr.io/ling-kfp/dev/kfp-driver:latest", + driverImage: "gcr.io/ml-pipeline-test/dev/kfp-driver:latest", launcherImage: "gcr.io/ml-pipeline-test/dev/kfp-launcher-v2:latest", job: job, spec: spec, From 04d5b2ce03e57e364352cf6c992ee44353e6298b Mon Sep 17 00:00:00 2001 From: Linchin Date: Wed, 20 Apr 2022 21:08:53 +0000 Subject: [PATCH 09/16] change variable declaration --- backend/src/v2/driver/driver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/src/v2/driver/driver.go b/backend/src/v2/driver/driver.go index 34d622073eb..bb7f7deb799 100644 --- a/backend/src/v2/driver/driver.go +++ b/backend/src/v2/driver/driver.go @@ -445,7 +445,7 @@ func DAG(ctx context.Context, opts Options, mlmd *metadata.Client) (execution *E } // Check the items type of parameterIterator: // It can be "inputParameter" or "Raw" - value := structpb.NewNullValue() + var value *structpb.Value var ok bool switch iterator.GetItems().GetKind().(type) { case *pipelinespec.ParameterIteratorSpec_ItemsSpec_InputParameter: From ca6def8d01c9cfed01c0bfb1b5407d9978e56cf1 Mon Sep 17 00:00:00 2001 From: Linchin Date: Wed, 20 Apr 2022 22:20:08 +0000 Subject: [PATCH 10/16] remove logs --- backend/src/v2/driver/driver.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/backend/src/v2/driver/driver.go b/backend/src/v2/driver/driver.go index bb7f7deb799..33e69631f0d 100644 --- a/backend/src/v2/driver/driver.go +++ b/backend/src/v2/driver/driver.go @@ -449,13 +449,11 @@ func DAG(ctx context.Context, opts Options, mlmd *metadata.Client) (execution *E var ok bool switch iterator.GetItems().GetKind().(type) { case *pipelinespec.ParameterIteratorSpec_ItemsSpec_InputParameter: - glog.Infof("ParameterIterator type: %T", iterator.GetItems().GetKind()) value, ok = executorInput.GetInputs().GetParameterValues()[iterator.GetItems().GetInputParameter()] if !ok { return execution, report(fmt.Errorf("cannot find input parameter")) } case *pipelinespec.ParameterIteratorSpec_ItemsSpec_Raw: - glog.Infof("ParameterIterator type: %T", iterator.GetItems().GetKind()) value_raw := iterator.GetItems().GetRaw() var unmarshalled_raw interface{} err = json.Unmarshal([]byte(value_raw), &unmarshalled_raw) From 4ebab0a8f85e946d144c7ed9fb0791c9312e43ed Mon Sep 17 00:00:00 2001 From: Linchin Date: Wed, 20 Apr 2022 22:22:41 +0000 Subject: [PATCH 11/16] remove log --- backend/src/v2/driver/driver.go | 1 - 1 file changed, 1 deletion(-) diff --git a/backend/src/v2/driver/driver.go b/backend/src/v2/driver/driver.go index 33e69631f0d..1ca2c077e23 100644 --- a/backend/src/v2/driver/driver.go +++ b/backend/src/v2/driver/driver.go @@ -770,7 +770,6 @@ func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, default: return nil, fmt.Errorf("bug: iteration_index>=0, but task iterator is empty") } - glog.Infof("inputs value %+v", inputs) return inputs, nil } // get executions in context on demand From a48e3ba6f7a9b92fc24f0e0a8d1f06943f6adab2 Mon Sep 17 00:00:00 2001 From: Lingqing Gan Date: Fri, 22 Apr 2022 11:33:32 -0700 Subject: [PATCH 12/16] move `ok` definition --- backend/src/v2/driver/driver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/src/v2/driver/driver.go b/backend/src/v2/driver/driver.go index 1ca2c077e23..5916ebcd7a6 100644 --- a/backend/src/v2/driver/driver.go +++ b/backend/src/v2/driver/driver.go @@ -446,9 +446,9 @@ func DAG(ctx context.Context, opts Options, mlmd *metadata.Client) (execution *E // Check the items type of parameterIterator: // It can be "inputParameter" or "Raw" var value *structpb.Value - var ok bool switch iterator.GetItems().GetKind().(type) { case *pipelinespec.ParameterIteratorSpec_ItemsSpec_InputParameter: + var ok bool value, ok = executorInput.GetInputs().GetParameterValues()[iterator.GetItems().GetInputParameter()] if !ok { return execution, report(fmt.Errorf("cannot find input parameter")) From bafad399fac68f4e00d7990bd4a46168dc67ff84 Mon Sep 17 00:00:00 2001 From: Linchin Date: Tue, 3 May 2022 20:35:38 +0000 Subject: [PATCH 13/16] change test file for debug purpose --- samples/core/loop_static/loop_static_test.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/samples/core/loop_static/loop_static_test.py b/samples/core/loop_static/loop_static_test.py index c11ad9312e2..97329666a8b 100644 --- a/samples/core/loop_static/loop_static_test.py +++ b/samples/core/loop_static/loop_static_test.py @@ -18,11 +18,19 @@ import kfp_server_api from .loop_static import my_pipeline from .loop_static_v2 import my_pipeline as my_pipeline_v2 -from kfp.samples.test.utils import KfpTask, debug_verify, run_pipeline_func, TestCase +from kfp.samples.test.utils import KfpTask, run_pipeline_func, TestCase +def obj_to_string(obj, extra=' '): + return str(obj.__class__) + '\n' + '\n'.join( + (extra + (str(item) + ' = ' + + (obj_to_string(obj.__dict__[item], extra + ' ') if hasattr(obj.__dict__[item], '__dict__') else str( + obj.__dict__[item]))) + for item in sorted(obj.__dict__))) + def verify(t: unittest.TestCase, run: kfp_server_api.ApiRun, tasks: dict[str, KfpTask], **kwargs): + print(obj_to_string(tasks)) t.assertEqual(run.status, 'Succeeded') # assert DAG structure t.assertCountEqual(['print-op', 'for-loop-1'], tasks.keys()) From a04c8cfc0ad511cc302b071c5785e451e151f777 Mon Sep 17 00:00:00 2001 From: Linchin Date: Tue, 3 May 2022 22:19:41 +0000 Subject: [PATCH 14/16] change test for debug purpose --- samples/core/loop_static/loop_static_test.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/samples/core/loop_static/loop_static_test.py b/samples/core/loop_static/loop_static_test.py index 97329666a8b..a96e62ac34b 100644 --- a/samples/core/loop_static/loop_static_test.py +++ b/samples/core/loop_static/loop_static_test.py @@ -24,13 +24,14 @@ def obj_to_string(obj, extra=' '): return str(obj.__class__) + '\n' + '\n'.join( (extra + (str(item) + ' = ' + - (obj_to_string(obj.__dict__[item], extra + ' ') if hasattr(obj.__dict__[item], '__dict__') else str( - obj.__dict__[item]))) - for item in sorted(obj.__dict__))) + (obj_to_string(obj[item], extra + ' ') if type(obj[item]) is dict else str( + obj[item]))) + for item in sorted(obj.keys()))) def verify(t: unittest.TestCase, run: kfp_server_api.ApiRun, tasks: dict[str, KfpTask], **kwargs): - print(obj_to_string(tasks)) + print("Line 33: tasks keys: ", tasks.keys()) + print("Line 34: tasks fields: ", obj_to_string(tasks)) t.assertEqual(run.status, 'Succeeded') # assert DAG structure t.assertCountEqual(['print-op', 'for-loop-1'], tasks.keys()) From e5d8c4b34e4781e6e2266e2a57dda00b1cde0e19 Mon Sep 17 00:00:00 2001 From: Linchin Date: Wed, 4 May 2022 00:38:58 +0000 Subject: [PATCH 15/16] update sample test for static loop --- samples/core/loop_static/loop_static_test.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/samples/core/loop_static/loop_static_test.py b/samples/core/loop_static/loop_static_test.py index a96e62ac34b..32972941e72 100644 --- a/samples/core/loop_static/loop_static_test.py +++ b/samples/core/loop_static/loop_static_test.py @@ -32,9 +32,10 @@ def verify(t: unittest.TestCase, run: kfp_server_api.ApiRun, tasks: dict[str, KfpTask], **kwargs): print("Line 33: tasks keys: ", tasks.keys()) print("Line 34: tasks fields: ", obj_to_string(tasks)) + print("Line 35 direct print of tasks: ", tasks) t.assertEqual(run.status, 'Succeeded') # assert DAG structure - t.assertCountEqual(['print-op', 'for-loop-1'], tasks.keys()) + t.assertCountEqual(['print-op', 'for-loop-2'], tasks.keys()) # assert all iteration parameters t.assertCountEqual( [{ @@ -46,14 +47,14 @@ def verify(t: unittest.TestCase, run: kfp_server_api.ApiRun, }], [ x.inputs - .parameters['pipelinechannel--static_loop_arguments-loop-item'] - for x in tasks['for-loop-1'].children.values() + .parameters['pipelinechannel--loop-item-param-1'] + for x in tasks['for-loop-2'].children.values() ], ) # assert all iteration outputs t.assertCountEqual(['12', '1020'], [ x.children['print-op-2'].outputs.parameters['Output'] - for x in tasks['for-loop-1'].children.values() + for x in tasks['for-loop-2'].children.values() ]) From 06e6402de07e3d8ae1f5480f1324a606be5aeca8 Mon Sep 17 00:00:00 2001 From: Linchin Date: Wed, 4 May 2022 03:27:10 +0000 Subject: [PATCH 16/16] update test file, remove code for debug --- samples/core/loop_static/loop_static_test.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/samples/core/loop_static/loop_static_test.py b/samples/core/loop_static/loop_static_test.py index 32972941e72..909d6261f9d 100644 --- a/samples/core/loop_static/loop_static_test.py +++ b/samples/core/loop_static/loop_static_test.py @@ -21,18 +21,8 @@ from kfp.samples.test.utils import KfpTask, run_pipeline_func, TestCase -def obj_to_string(obj, extra=' '): - return str(obj.__class__) + '\n' + '\n'.join( - (extra + (str(item) + ' = ' + - (obj_to_string(obj[item], extra + ' ') if type(obj[item]) is dict else str( - obj[item]))) - for item in sorted(obj.keys()))) - def verify(t: unittest.TestCase, run: kfp_server_api.ApiRun, tasks: dict[str, KfpTask], **kwargs): - print("Line 33: tasks keys: ", tasks.keys()) - print("Line 34: tasks fields: ", obj_to_string(tasks)) - print("Line 35 direct print of tasks: ", tasks) t.assertEqual(run.status, 'Succeeded') # assert DAG structure t.assertCountEqual(['print-op', 'for-loop-2'], tasks.keys())