Skip to content

Commit

Permalink
fix(trigger): add component intermediate data in the trigger stream/r…
Browse files Browse the repository at this point in the history
…esponse
  • Loading branch information
donch1989 committed Dec 11, 2024
1 parent ab29d4d commit 9503130
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 0 deletions.
11 changes: 11 additions & 0 deletions pkg/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,17 @@ func (wfm *workflowMemory) SetComponentData(ctx context.Context, batchIdx int, c
}
wfm.Data[batchIdx].(data.Map)[componentID].(data.Map)[string(t)] = value

// TODO: For binary data fields, we should return a URL to access the blob instead of the raw data
if t == ComponentDataInput {
if err := wfm.sendComponentEvent(ctx, batchIdx, componentID, ComponentInputUpdated); err != nil {
return err
}
} else if t == ComponentDataOutput {
if err := wfm.sendComponentEvent(ctx, batchIdx, componentID, ComponentOutputUpdated); err != nil {
return err
}
}

return nil
}
func (wfm *workflowMemory) GetComponentData(ctx context.Context, batchIdx int, componentID string, t ComponentDataType) (value format.Value, err error) {
Expand Down
23 changes: 23 additions & 0 deletions pkg/recipe/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,8 @@ func GenerateTraces(ctx context.Context, wfm memory.WorkflowMemory, full bool) (

for compID := range wfm.GetRecipe().Component {

inputs := make([]*structpb.Struct, batchSize)
outputs := make([]*structpb.Struct, batchSize)
errors := make([]*structpb.Struct, batchSize)
traceStatuses := make([]pb.Trace_Status, batchSize)

Expand Down Expand Up @@ -657,10 +659,31 @@ func GenerateTraces(ctx context.Context, wfm memory.WorkflowMemory, full bool) (
errors[dataIdx] = structVal.GetStructValue()
}

// TODO: For binary data fields, we should return a URL to access the blob instead of the raw data
if full {
if input, err := wfm.GetComponentData(ctx, dataIdx, compID, memory.ComponentDataInput); err == nil {
structVal, err := input.ToStructValue()
if err != nil {
return nil, err
}
inputs[dataIdx] = structVal.GetStructValue()
}

if output, err := wfm.GetComponentData(ctx, dataIdx, compID, memory.ComponentDataOutput); err == nil {
structVal, err := output.ToStructValue()
if err != nil {
return nil, err
}
outputs[dataIdx] = structVal.GetStructValue()
}
}
}

trace[compID] = &pb.Trace{
Statuses: traceStatuses,
Inputs: inputs,
Outputs: outputs,

// Note: Currently, all errors in a batch are the same.
Error: errors[0],
}
Expand Down

0 comments on commit 9503130

Please sign in to comment.