Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(controller): dehydrate workflow before deleting offloaded node status #6112

Merged
merged 1 commit into from
Jun 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion persist/sqldb/explosive_offload_node_status_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,6 @@ func (n *explosiveOffloadNodeStatusRepo) Delete(string, string) error {
return OffloadNotSupportedError
}

func (n *explosiveOffloadNodeStatusRepo) ListOldOffloads(string) ([]UUIDVersion, error) {
// ListOldOffloads is not supported by this repo: the argument is ignored and
// the call always yields a nil map together with OffloadNotSupportedError.
func (n *explosiveOffloadNodeStatusRepo) ListOldOffloads(_ string) (map[string][]string, error) {
	var none map[string][]string
	return none, OffloadNotSupportedError
}
8 changes: 4 additions & 4 deletions persist/sqldb/mocks/OffloadNodeStatusRepo.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 7 additions & 3 deletions persist/sqldb/offload_node_status_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type OffloadNodeStatusRepo interface {
Save(uid, namespace string, nodes wfv1.Nodes) (string, error)
Get(uid, version string) (wfv1.Nodes, error)
List(namespace string) (map[UUIDVersion]wfv1.Nodes, error)
ListOldOffloads(namespace string) ([]UUIDVersion, error)
ListOldOffloads(namespace string) (map[string][]string, error)
Delete(uid, version string) error
IsEnabled() bool
}
Expand Down Expand Up @@ -178,7 +178,7 @@ func (wdc *nodeOffloadRepo) List(namespace string) (map[UUIDVersion]wfv1.Nodes,
return res, nil
}

func (wdc *nodeOffloadRepo) ListOldOffloads(namespace string) ([]UUIDVersion, error) {
func (wdc *nodeOffloadRepo) ListOldOffloads(namespace string) (map[string][]string, error) {
log.WithFields(log.Fields{"namespace": namespace}).Debug("Listing old offloaded nodes")
var records []UUIDVersion
err := wdc.session.
Expand All @@ -191,7 +191,11 @@ func (wdc *nodeOffloadRepo) ListOldOffloads(namespace string) ([]UUIDVersion, er
if err != nil {
return nil, err
}
return records, nil
x := make(map[string][]string)
for _, r := range records {
x[r.UID] = append(x[r.UID], r.Version)
}
return x, nil
}

func (wdc *nodeOffloadRepo) Delete(uid, version string) error {
Expand Down
71 changes: 47 additions & 24 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ var indexers = cache.Indexers{
indexes.SemaphoreConfigIndexName: indexes.WorkflowSemaphoreKeysIndexFunc(),
indexes.WorkflowPhaseIndex: indexes.MetaWorkflowPhaseIndexFunc(),
indexes.ConditionsIndex: indexes.ConditionsIndexFunc,
indexes.UIDIndex: indexes.MetaUIDFunc,
}

// Run starts a Workflow resource controller
Expand Down Expand Up @@ -553,37 +554,59 @@ func (wfc *WorkflowController) workflowGarbageCollector(stopCh <-chan struct{})
log.WithField("err", err).Error("Failed to list old offloaded nodes")
continue
}
if len(oldRecords) == 0 {
log.Info("Zero old offloads, nothing to do")
continue
}
// get every live workflow (1000s) into a map
liveOffloadNodeStatusVersions := make(map[types.UID]string)
workflows, err := util.NewWorkflowLister(wfc.wfInformer).List()
if err != nil {
log.WithField("err", err).Error("Failed to list incomplete workflows")
continue
}
for _, wf := range workflows {
// this could be the empty string - as it is no longer offloaded
liveOffloadNodeStatusVersions[wf.UID] = wf.Status.OffloadNodeStatusVersion
}
log.WithFields(log.Fields{"len_wfs": len(liveOffloadNodeStatusVersions), "len_old_offloads": len(oldRecords)}).Info("Deleting old offloads that are not live")
for _, record := range oldRecords {
// this could be empty string
nodeStatusVersion, ok := liveOffloadNodeStatusVersions[types.UID(record.UID)]
if !ok || nodeStatusVersion != record.Version {
err := wfc.offloadNodeStatusRepo.Delete(record.UID, record.Version)
if err != nil {
log.WithField("err", err).Error("Failed to delete offloaded nodes")
}
log.WithField("len_wfs", len(oldRecords)).Info("Deleting old offloads that are not live")
for uid, versions := range oldRecords {
if err := wfc.deleteOffloadedNodesForWorkflow(uid, versions); err != nil {
log.WithError(err).WithField("uid", uid).Error("Failed to delete old offloaded nodes")
}
}
log.Info("Workflow GC finished")
}
}
}
}

// deleteOffloadedNodesForWorkflow deletes the given offloaded node status
// versions stored for the workflow identified by uid, keeping whichever
// version the workflow currently references.
//
// The workflow is looked up through the informer's UID index:
//   - no match: every listed version is deleted;
//   - one match: the workflow's key lock is held until this function returns,
//     the workflow is dehydrated if it is still hydrated, and every version
//     other than wf.Status.OffloadNodeStatusVersion is deleted;
//   - more than one match: an error is returned and nothing is deleted.
//
// The first error encountered is returned (wrapped with context); versions
// that come after a failed delete are left untouched.
func (wfc *WorkflowController) deleteOffloadedNodesForWorkflow(uid string, versions []string) error {
	workflows, err := wfc.wfInformer.GetIndexer().ByIndex(indexes.UIDIndex, uid)
	if err != nil {
		return fmt.Errorf("looking up workflow by uid %q: %w", uid, err)
	}
	var wf *wfv1.Workflow
	switch l := len(workflows); l {
	case 0:
		log.WithField("uid", uid).Info("Workflow missing, probably deleted")
	case 1:
		// Checked assertion: an unexpected object type in the index is
		// reported as an error instead of causing a panic.
		un, ok := workflows[0].(*unstructured.Unstructured)
		if !ok {
			return fmt.Errorf("expected *unstructured.Unstructured for uid %q, got %T", uid, workflows[0])
		}
		wf, err = util.FromUnstructured(un)
		if err != nil {
			return fmt.Errorf("converting workflow with uid %q: %w", uid, err)
		}
		key := wf.ObjectMeta.Namespace + "/" + wf.ObjectMeta.Name
		// Hold the per-workflow lock for the remainder of the call, including
		// the deletes below.
		// NOTE(review): presumably this keeps other holders of the key lock
		// from changing the offload version mid-delete — confirm.
		wfc.workflowKeyLock.Lock(key)
		defer wfc.workflowKeyLock.Unlock(key)
		// workflow might still be hydrated
		// NOTE(review): presumably Dehydrate refreshes
		// wf.Status.OffloadNodeStatusVersion so the live-version check below
		// is accurate — confirm against the hydrator.
		if wfc.hydrator.IsHydrated(wf) {
			log.WithField("uid", wf.UID).Info("Hydrated workflow encountered")
			if err := wfc.hydrator.Dehydrate(wf); err != nil {
				return fmt.Errorf("dehydrating workflow %q: %w", key, err)
			}
		}
	default:
		return fmt.Errorf("expected no more than 1 workflow, got %d", l)
	}
	for _, version := range versions {
		// skip delete if offload is live
		if wf != nil && wf.Status.OffloadNodeStatusVersion == version {
			continue
		}
		if err := wfc.offloadNodeStatusRepo.Delete(uid, version); err != nil {
			return fmt.Errorf("deleting offloaded nodes version %q: %w", version, err)
		}
	}
	return nil
}

func (wfc *WorkflowController) archivedWorkflowGarbageCollector(stopCh <-chan struct{}) {
defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)

Expand Down
1 change: 1 addition & 0 deletions workflow/controller/indexes/indexes.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ const (
PodPhaseIndex = "pod.phase"
ConditionsIndex = "status.conditions"
SemaphoreConfigIndexName = "bySemaphoreConfigMap"
UIDIndex = "uid"
)
14 changes: 14 additions & 0 deletions workflow/controller/indexes/uid_index.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package indexes

import (
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/client-go/tools/cache"
)

// MetaUIDFunc is a cache.IndexFunc that indexes an object under a single key:
// the UID reported by its metadata accessor. When meta.Accessor fails for the
// object, it returns no keys and no error.
var MetaUIDFunc cache.IndexFunc = func(obj interface{}) ([]string, error) {
	accessor, accessErr := meta.Accessor(obj)
	if accessErr != nil {
		// NOTE(review): the accessor error is dropped rather than returned;
		// presumably deliberate so such objects are simply left out of the
		// index — confirm before changing.
		return nil, nil
	}
	uid := accessor.GetUID()
	return []string{string(uid)}, nil
}