Skip to content

Commit

Permalink
fix(backend): MLMD pagination on getting executions of DAG (kubeflow#…
Browse files Browse the repository at this point in the history
  • Loading branch information
hellojunha authored and petethegreat committed Mar 29, 2024
1 parent b4ddfab commit b98296d
Showing 1 changed file with 33 additions and 20 deletions.
53 changes: 33 additions & 20 deletions backend/src/v2/metadata/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,29 +700,42 @@ func (c *Client) GetExecutionsInDAG(ctx context.Context, dag *DAG, pipeline *Pip
// Note: because MLMD does not have an index on custom properties right now, we
// take a pipeline run context to limit the number of executions the DB needs to
// iterate through to find sub-executions.
res, err := c.svc.GetExecutionsByContext(ctx, &pb.GetExecutionsByContextRequest{
ContextId: pipeline.pipelineRunCtx.Id,
Options: &pb.ListOperationOptions{
FilterQuery: &parentDAGFilter,
},
})
if err != nil {
return nil, err
}
execs := res.GetExecutions()
for _, e := range execs {
execution := &Execution{execution: e}
taskName := execution.TaskName()
if taskName == "" {
return nil, fmt.Errorf("empty task name for execution ID: %v", execution.GetID())

nextPageToken := ""
for {
res, err := c.svc.GetExecutionsByContext(ctx, &pb.GetExecutionsByContextRequest{
ContextId: pipeline.pipelineRunCtx.Id,
Options: &pb.ListOperationOptions{
FilterQuery: &parentDAGFilter,
NextPageToken: &nextPageToken,
},
})
if err != nil {
return nil, err
}

execs := res.GetExecutions()
for _, e := range execs {
execution := &Execution{execution: e}
taskName := execution.TaskName()
if taskName == "" {
return nil, fmt.Errorf("empty task name for execution ID: %v", execution.GetID())
}
existing, ok := executionsMap[taskName]
if ok {
// TODO(Bobgy): to support retry, we need to handle multiple tasks with the same task name.
return nil, fmt.Errorf("two tasks have the same task name %q, id1=%v id2=%v", taskName, existing.GetID(), execution.GetID())
}
executionsMap[taskName] = execution
}
existing, ok := executionsMap[taskName]
if ok {
// TODO(Bobgy): to support retry, we need to handle multiple tasks with the same task name.
return nil, fmt.Errorf("two tasks have the same task name %q, id1=%v id2=%v", taskName, existing.GetID(), execution.GetID())

nextPageToken = res.GetNextPageToken()

if nextPageToken == "" {
break
}
executionsMap[taskName] = execution
}

return executionsMap, nil
}

Expand Down

0 comments on commit b98296d

Please sign in to comment.