Add API to rerun the pipeline #1720
@IronPan can you explain the use-case here a little bit? I'd like to understand this in the context of caching with metadata. When we resubmit a run, is this submitting the same pipeline? How is it different from cloning the run?
@@ -67,6 +67,7 @@ type ClientManager struct {
	swfClient scheduledworkflowclient.ScheduledWorkflowInterface
	time util.TimeInterface
	uuid util.UUIDGeneratorInterface
	randomString util.RandomStringInterface
format
done
@@ -119,6 +120,11 @@ func (c *ClientManager) UUID() util.UUIDGeneratorInterface {
	return c.uuid
}

func (c *ClientManager) RandomString() util.RandomStringInterface{
I strongly recommend getting rid of this ClientManager abstraction. It adds unnecessary complexity: this function is a perfect example of indirection that adds to the reader's cognitive burden. Generating a random string does not need to be a method of ClientManager. We can move it to a function variable in the file where it is used and, in tests, replace that with a mock as needed, i.e.
In the file where it is used:
var randomStringGenFn = func() string { return random() }
and then in tests:
randomStringGenFn = func() string { return "abc" }
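To make the suggestion concrete, here is a minimal, self-contained sketch of the function-variable pattern. Only the name `randomStringGenFn` comes from the comment above; the hex-based generator and the `newWorkflowName` caller are assumptions made up for illustration, not code from this PR.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// randomStringGenFn is a package-level function variable so that tests can
// swap it out. The implementation below is an assumption for illustration.
var randomStringGenFn = func() string {
	buf := make([]byte, 4)
	if _, err := rand.Read(buf); err != nil {
		panic(err)
	}
	return hex.EncodeToString(buf) // 4 random bytes -> 8 hex characters
}

// newWorkflowName is a hypothetical caller that needs a random suffix.
func newWorkflowName(prefix string) string {
	return prefix + "-" + randomStringGenFn()
}

func main() {
	// Production behaviour: a random suffix.
	fmt.Println(newWorkflowName("retry"))

	// Test behaviour: replace the generator, then restore it afterwards.
	orig := randomStringGenFn
	randomStringGenFn = func() string { return "abc" }
	fmt.Println(newWorkflowName("retry")) // prints "retry-abc"
	randomStringGenFn = orig
}
```

The trade-off is that a package-level variable is shared mutable state, so tests that override it should restore the original and avoid running in parallel with each other.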
+100 to getting rid of it. Maybe do it separately?
@@ -64,6 +65,7 @@ type ResourceManager struct {
	scheduledWorkflowClient scheduledworkflowclient.ScheduledWorkflowInterface
	time util.TimeInterface
	uuid util.UUIDGeneratorInterface
	randomString util.RandomStringInterface
format
done
/test kubeflow-pipeline-e2e-test
backend/src/apiserver/client/pod.go
Outdated
	return clientSet.CoreV1().Pods(namespace), nil
}

// creates a new client for the Kubernetes pod.
Nit: CreatePodClientOrFatal creates....
thanks. done
backend/src/apiserver/client/pod.go
Outdated
	"time"
)

func CreatePodClient(namespace string) (v1.PodInterface, error) {
Should this be public?
made it private.
}

// creates a new client for the Kubernetes pod.
func CreatePodClientOrFatal(namespace string, initConnectionTimeout time.Duration) v1.PodInterface{
Format the line?
Also, do we really want to die here? This kills the APIServer. Why not just return an error to the client? This is a blocking call anyway.
This is called during API server startup. A failure here implies something is wrong with the cluster that can't be recovered from. This is what we do for the other clients in the same folder too.
if err = deletePods(podsToDelete, newWorkflow.ObjectMeta.Namespace); err != nil {
	return util.NewInternalServerError(err, "Retry run failed. Failed to clean up the failed pods from previous run.")
}
//
Commented code?
Sorry, it was not ready yet. It's ready now.
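For context on what `podsToDelete` in the snippet above might contain, here is a rough sketch of the selection step: pods that failed are collected for cleanup, and succeeded pods are left in place so their results are reused. The `nodeStatus` type and the `podsToDeleteForRetry` helper are simplified, hypothetical stand-ins, not the types or logic used in this PR.

```go
package main

import (
	"fmt"
	"sort"
)

// nodeStatus is a simplified, hypothetical stand-in for a workflow node's status.
type nodeStatus struct {
	Type  string // e.g. "Pod" or "DAG"
	Phase string // e.g. "Succeeded", "Failed", "Error"
}

// podsToDeleteForRetry returns the names of pod nodes that failed or errored,
// sorted for stable output. Succeeded nodes are not touched.
func podsToDeleteForRetry(nodes map[string]nodeStatus) []string {
	var pods []string
	for name, n := range nodes {
		if n.Type != "Pod" {
			continue // only pods need cleaning up
		}
		if n.Phase == "Failed" || n.Phase == "Error" {
			pods = append(pods, name)
		}
	}
	sort.Strings(pods)
	return pods
}

func main() {
	nodes := map[string]nodeStatus{
		"wf-1":       {Type: "DAG", Phase: "Failed"},
		"wf-1-prep":  {Type: "Pod", Phase: "Succeeded"},
		"wf-1-train": {Type: "Pod", Phase: "Failed"},
	}
	fmt.Println(podsToDeleteForRetry(nodes)) // prints [wf-1-train]
}
```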
@@ -0,0 +1,79 @@
package resource
Why do we need this? Client side mocks are really difficult to maintain.
This follows the same pattern as the other fakes in the same folder. It's used for testing the Retry logic.
@@ -49,7 +49,11 @@ func (r MetricsReporter) ReportMetrics(workflow *util.Workflow) error {
	if workflow.Status.Nodes == nil {
		return nil
	}
	runID := string(workflow.UID)
	if _, ok := workflow.ObjectMeta.Labels[util.LabelKeyWorkflowRunId]; !ok {
		// Skip reporting if the workflow doesn't have the run id label
When does this happen?
It's unlikely to happen. It only happens if someone deliberately calls this API with a workflow that was not created by KFP.
/assign @neuromage @Ark-kun
@hongye-sun @neuromage @Ark-kun This PR is ready for another round of review.
/lgtm
Thanks @IronPan!
/approve
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: IronPan The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
/test kubeflow-pipeline-sample-test
/lgtm
This change adds a new API for rerunning a failed pipeline without repeating the steps that already succeeded.
Instead of using the Argo workflow UID as the run ID, generate the run ID in the API server and attach it to the workflow as a label. Change the persistence agent to look up and sync runs keyed by that ID label. This is needed because a recreated Argo workflow gets a different UID, so the workflow UID is no longer a unique identifier for a run.
Replace {{workflow.uid}} with the run ID. {{workflow.uid}} is Argo-specific and should be an implementation detail of KFP. For related work on hiding this placeholder, see: SDK - Hiding Argo's workflow.uid placeholder behind DSL #1683
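As a rough illustration of why the label is a more stable key than the UID, the sketch below recreates a workflow object and shows that the UID changes while the run ID label carries over. The `workflow` struct, the `recreate` helper, and the label value `pipeline/runid` are assumptions for illustration; the PR itself only refers to the key as `util.LabelKeyWorkflowRunId`.

```go
package main

import "fmt"

// runIDLabel is an assumed label key, used here only for illustration.
const runIDLabel = "pipeline/runid"

// workflow is a simplified, hypothetical stand-in for an Argo workflow object.
type workflow struct {
	UID    string
	Labels map[string]string
}

// recreate models a retry: the cluster assigns a new UID to the new object,
// while the labels (including the run ID) are copied from the old one.
func recreate(old workflow, newUID string) workflow {
	labels := make(map[string]string, len(old.Labels))
	for k, v := range old.Labels {
		labels[k] = v
	}
	return workflow{UID: newUID, Labels: labels}
}

// runID returns the run ID label and whether it was present, so callers can
// skip workflows that were not created through the API server.
func runID(wf workflow) (string, bool) {
	id, ok := wf.Labels[runIDLabel]
	return id, ok
}

func main() {
	first := workflow{UID: "uid-1", Labels: map[string]string{runIDLabel: "run-42"}}
	second := recreate(first, "uid-2")

	id1, _ := runID(first)
	id2, _ := runID(second)
	fmt.Println(first.UID != second.UID, id1 == id2) // prints "true true"
}
```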
/assign @hongye-sun