From bebe57f58d7735e36e83c5181b0eacd4d550988a Mon Sep 17 00:00:00 2001 From: "Chang, Hui-Tang" Date: Thu, 16 Jan 2025 12:49:33 +0800 Subject: [PATCH] feat(pipeline): return the error from a component inside an iterator (#955) --- pkg/worker/io.go | 92 ++++++++++++++++++++++++++++++------------ pkg/worker/workflow.go | 58 ++++++++++++++++++-------- 2 files changed, 108 insertions(+), 42 deletions(-) diff --git a/pkg/worker/io.go b/pkg/worker/io.go index 2fc7a6099..e60896ffa 100644 --- a/pkg/worker/io.go +++ b/pkg/worker/io.go @@ -14,26 +14,32 @@ import ( ) type setupReader struct { + memoryStore memory.MemoryStore + workflowID string compID string - wfm memory.WorkflowMemory conditionMap map[int]int } -func NewSetupReader(wfm memory.WorkflowMemory, compID string, conditionMap map[int]int) *setupReader { +func NewSetupReader(memoryStore memory.MemoryStore, workflowID string, compID string, conditionMap map[int]int) *setupReader { return &setupReader{ + memoryStore: memoryStore, + workflowID: workflowID, compID: compID, - wfm: wfm, conditionMap: conditionMap, } } func (i *setupReader) Read(ctx context.Context) (setups []*structpb.Struct, err error) { + wfm, err := i.memoryStore.GetWorkflowMemory(ctx, i.workflowID) + if err != nil { + return nil, err + } for idx := range len(i.conditionMap) { - setupTemplate, err := i.wfm.GetComponentData(ctx, i.conditionMap[idx], i.compID, memory.ComponentDataSetupTemplate) + setupTemplate, err := wfm.GetComponentData(ctx, i.conditionMap[idx], i.compID, memory.ComponentDataSetupTemplate) if err != nil { return nil, err } - setupVal, err := recipe.Render(ctx, setupTemplate, i.conditionMap[idx], i.wfm, false) + setupVal, err := recipe.Render(ctx, setupTemplate, i.conditionMap[idx], wfm, false) if err != nil { return nil, err } @@ -48,16 +54,18 @@ func (i *setupReader) Read(ctx context.Context) (setups []*structpb.Struct, err } type inputReader struct { + memoryStore memory.MemoryStore + workflowID string compID string - wfm memory.WorkflowMemory originalIdx int binaryFetcher external.BinaryFetcher } -func NewInputReader(wfm memory.WorkflowMemory, compID string, originalIdx int, binaryFetcher external.BinaryFetcher) *inputReader { +func NewInputReader(memoryStore memory.MemoryStore, workflowID string, compID string, originalIdx int, binaryFetcher external.BinaryFetcher) *inputReader { return &inputReader{ + memoryStore: memoryStore, + workflowID: workflowID, compID: compID, - wfm: wfm, originalIdx: originalIdx, binaryFetcher: binaryFetcher, } @@ -67,18 +75,22 @@ func NewInputReader(wfm memory.WorkflowMemory, compID string, originalIdx int, b // ReadData() instead. // structpb is not suitable for handling binary data and will be phased out gradually. func (i *inputReader) read(ctx context.Context) (inputVal format.Value, err error) { + wfm, err := i.memoryStore.GetWorkflowMemory(ctx, i.workflowID) + if err != nil { + return nil, err + } - inputTemplate, err := i.wfm.GetComponentData(ctx, i.originalIdx, i.compID, memory.ComponentDataInputTemplate) + inputTemplate, err := wfm.GetComponentData(ctx, i.originalIdx, i.compID, memory.ComponentDataInputTemplate) if err != nil { return nil, err } - inputVal, err = recipe.Render(ctx, inputTemplate, i.originalIdx, i.wfm, false) + inputVal, err = recipe.Render(ctx, inputTemplate, i.originalIdx, wfm, false) if err != nil { return nil, err } - if err = i.wfm.SetComponentData(ctx, i.originalIdx, i.compID, memory.ComponentDataInput, inputVal); err != nil { + if err = wfm.SetComponentData(ctx, i.originalIdx, i.compID, memory.ComponentDataInput, inputVal); err != nil { return nil, err } return inputVal, nil @@ -116,16 +128,18 @@ func (i *inputReader) ReadData(ctx context.Context, input any) (err error) { } type outputWriter struct { + memoryStore memory.MemoryStore + workflowID string compID string - wfm memory.WorkflowMemory originalIdx int streaming bool } -func NewOutputWriter(wfm memory.WorkflowMemory, compID string, originalIdx int, streaming bool) *outputWriter { +func NewOutputWriter(memoryStore memory.MemoryStore, workflowID string, compID string, originalIdx int, streaming bool) *outputWriter { return &outputWriter{ + memoryStore: memoryStore, + workflowID: workflowID, compID: compID, - wfm: wfm, originalIdx: originalIdx, streaming: streaming, } @@ -157,22 +171,26 @@ func (o *outputWriter) Write(ctx context.Context, output *structpb.Struct) (err // Use WriteData() instead. structpb is not suitable for handling binary data // and will be phased out gradually. func (o *outputWriter) write(ctx context.Context, val format.Value) (err error) { + wfm, err := o.memoryStore.GetWorkflowMemory(ctx, o.workflowID) + if err != nil { + return err + } - if err := o.wfm.SetComponentData(ctx, o.originalIdx, o.compID, memory.ComponentDataOutput, val); err != nil { + if err := wfm.SetComponentData(ctx, o.originalIdx, o.compID, memory.ComponentDataOutput, val); err != nil { return err } if o.streaming { - outputTemplate, err := o.wfm.Get(ctx, o.originalIdx, string(memory.PipelineOutputTemplate)) + outputTemplate, err := wfm.Get(ctx, o.originalIdx, string(memory.PipelineOutputTemplate)) if err != nil { return err } - output, err := recipe.Render(ctx, outputTemplate, o.originalIdx, o.wfm, true) + output, err := recipe.Render(ctx, outputTemplate, o.originalIdx, wfm, true) if err != nil { return err } - err = o.wfm.SetPipelineData(ctx, o.originalIdx, memory.PipelineOutput, output) + err = wfm.SetPipelineData(ctx, o.originalIdx, memory.PipelineOutput, output) if err != nil { return err } @@ -182,20 +200,44 @@ func (o *outputWriter) write(ctx context.Context, val format.Value) (err error) } type errorHandler struct { + memoryStore memory.MemoryStore + workflowID string compID string - wfm memory.WorkflowMemory originalIdx int + + parentWorkflowID *string + parentCompID *string + parentOriginalIdx *int } -func NewErrorHandler(wfm memory.WorkflowMemory, compID string, originalIdx int) *errorHandler { +func NewErrorHandler(memoryStore memory.MemoryStore, workflowID string, compID string, originalIdx int, parentWorkflowID *string, parentCompID *string, parentOriginalIdx *int) *errorHandler { return &errorHandler{ - compID: compID, - wfm: wfm, - originalIdx: originalIdx, + memoryStore: memoryStore, + workflowID: workflowID, + compID: compID, + originalIdx: originalIdx, + parentWorkflowID: parentWorkflowID, + parentCompID: parentCompID, + parentOriginalIdx: parentOriginalIdx, } } func (e *errorHandler) Error(ctx context.Context, err error) { - _ = e.wfm.SetComponentStatus(ctx, e.originalIdx, e.compID, memory.ComponentStatusErrored, true) - _ = e.wfm.SetComponentErrorMessage(ctx, e.originalIdx, e.compID, errmsg.MessageOrErr(err)) + + wfm, wfmErr := e.memoryStore.GetWorkflowMemory(ctx, e.workflowID) + if wfmErr != nil { + return + } + + _ = wfm.SetComponentStatus(ctx, e.originalIdx, e.compID, memory.ComponentStatusErrored, true) + _ = wfm.SetComponentErrorMessage(ctx, e.originalIdx, e.compID, errmsg.MessageOrErr(err)) + + if e.parentWorkflowID != nil { + iterWfm, iterWfmErr := e.memoryStore.GetWorkflowMemory(ctx, *e.parentWorkflowID) + if iterWfmErr != nil { + return + } + _ = iterWfm.SetComponentStatus(ctx, *e.parentOriginalIdx, *e.parentCompID, memory.ComponentStatusErrored, true) + _ = iterWfm.SetComponentErrorMessage(ctx, *e.parentOriginalIdx, *e.parentCompID, errmsg.MessageOrErr(err)) + } } diff --git a/pkg/worker/workflow.go b/pkg/worker/workflow.go index e918d4b78..f70dc7da0 100644 --- a/pkg/worker/workflow.go +++ b/pkg/worker/workflow.go @@ -46,6 +46,11 @@ type TriggerPipelineWorkflowParam struct { Mode mgmtpb.Mode TriggerFromAPI bool WorkerUID uuid.UUID + + // If the pipeline trigger is from an iterator, these fields will be set + ParentWorkflowID *string + ParentCompID *string + ParentOriginalIdx *int } type SchedulePipelineLoaderActivityParam struct { @@ -69,6 +74,11 @@ type ComponentActivityParam struct { Task string SystemVariables recipe.SystemVariables // TODO: we should store vars directly in trigger memory. Streaming bool + + // If the component belongs to an iterator, these fields will be set + ParentWorkflowID *string + ParentCompID *string + ParentOriginalIdx *int } type PreIteratorActivityParam struct { @@ -301,13 +311,16 @@ func (w *worker) TriggerPipelineWorkflow(ctx workflow.Context, param *TriggerPip }).Get(ctx, nil) args := &ComponentActivityParam{ - WorkflowID: workflowID, - ID: compID, - UpstreamIDs: upstreamIDs, - Type: comp.Type, - Task: comp.Task, - Condition: comp.Condition, - SystemVariables: param.SystemVariables, + WorkflowID: workflowID, + ID: compID, + UpstreamIDs: upstreamIDs, + Type: comp.Type, + Task: comp.Task, + Condition: comp.Condition, + SystemVariables: param.SystemVariables, + ParentWorkflowID: param.ParentWorkflowID, + ParentCompID: param.ParentCompID, + ParentOriginalIdx: param.ParentOriginalIdx, } futures = append(futures, workflow.ExecuteActivity(ctx, w.ComponentActivity, args)) @@ -351,12 +364,13 @@ func (w *worker) TriggerPipelineWorkflow(ctx workflow.Context, param *TriggerPip workflow.WithChildOptions(ctx, childWorkflowOptions), "TriggerPipelineWorkflow", &TriggerPipelineWorkflowParam{ - TriggerFromAPI: false, - SystemVariables: param.SystemVariables, - Mode: mgmtpb.Mode_MODE_SYNC, - WorkerUID: param.WorkerUID, - // TODO: support streaming inside iterator. - // IsStreaming: param.IsStreaming, + TriggerFromAPI: false, + SystemVariables: param.SystemVariables, + Mode: mgmtpb.Mode_MODE_SYNC, + WorkerUID: param.WorkerUID, + ParentWorkflowID: &workflowID, + ParentCompID: &compID, + ParentOriginalIdx: &iter, })) } for iter := 0; iter < len(itFutures); iter++ { @@ -538,7 +552,7 @@ func (w *worker) ComponentActivity(ctx context.Context, param *ComponentActivity return nil } - setups, err := NewSetupReader(wfm, param.ID, conditionMap).Read(ctx) + setups, err := NewSetupReader(w.memoryStore, param.WorkflowID, param.ID, conditionMap).Read(ctx) if err != nil { return componentActivityError(ctx, wfm, err, componentActivityErrorType, param.ID) } @@ -563,10 +577,11 @@ func (w *worker) ComponentActivity(ctx context.Context, param *ComponentActivity jobs := make([]*componentbase.Job, len(conditionMap)) for idx, originalIdx := range conditionMap { + jobs[idx] = &componentbase.Job{ - Input: NewInputReader(wfm, param.ID, originalIdx, w.binaryFetcher), - Output: NewOutputWriter(wfm, param.ID, originalIdx, wfm.IsStreaming()), - Error: NewErrorHandler(wfm, param.ID, originalIdx), + Input: NewInputReader(w.memoryStore, param.WorkflowID, param.ID, originalIdx, w.binaryFetcher), + Output: NewOutputWriter(w.memoryStore, param.WorkflowID, param.ID, originalIdx, wfm.IsStreaming()), + Error: NewErrorHandler(w.memoryStore, param.WorkflowID, param.ID, originalIdx, param.ParentWorkflowID, param.ParentCompID, param.ParentOriginalIdx), } } err = execution.Execute( @@ -905,6 +920,15 @@ func (w *worker) PostIteratorActivity(ctx context.Context, param *PostIteratorAc return componentActivityError(ctx, wfm, err, postIteratorActivityErrorType, param.ID) } + errored, err := wfm.GetComponentStatus(ctx, originalIdx, param.ID, memory.ComponentStatusErrored) + + if err != nil { + return componentActivityError(ctx, wfm, err, postIteratorActivityErrorType, param.ID) + } + if errored { + return nil + } + output := data.Map{} for k, v := range param.OutputElements { elemVals := data.Array{}