-
Notifications
You must be signed in to change notification settings - Fork 3.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Error running 1000s of tasks: "etcdserver: request is too large" #1186 #1264
Changes from all commits
cb0ab97
b365648
0af9f58
862307a
be7603f
9bedb33
1195457
c7b3d1c
835373f
5f51681
465a729
bac7d30
64f8e0c
ff7ddca
ec3cc51
fd8d8f7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -477,6 +477,9 @@ type WorkflowStatus struct { | |
// A human readable message indicating details about why the workflow is in this condition. | ||
Message string `json:"message,omitempty"` | ||
|
||
// Compressed and base64 encoded Nodes map | ||
CompressedNodes string `json:"compressedNodes,omitempty"` | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add comment There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add spaces after all |
||
|
||
// Nodes is a mapping between a node ID and the node's status. | ||
Nodes map[string]NodeStatus `json:"nodes,omitempty"` | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
package file | ||
|
||
import ( | ||
"archive/tar" | ||
"bytes" | ||
"compress/gzip" | ||
"encoding/base64" | ||
"io" | ||
"io/ioutil" | ||
"os" | ||
"strings" | ||
|
||
log "github.com/sirupsen/logrus" | ||
) | ||
|
||
// IsFileOrDirExistInGZip return true if file or directory exists in GZip file | ||
func IsFileOrDirExistInGZip(sourcePath string, gzipFilePath string) bool { | ||
|
||
fi, err := os.Open(gzipFilePath) | ||
|
||
if os.IsNotExist(err) { | ||
return false | ||
} | ||
defer close(fi) | ||
|
||
fz, err := gzip.NewReader(fi) | ||
if err != nil { | ||
return false | ||
} | ||
tr := tar.NewReader(fz) | ||
for { | ||
hdr, err := tr.Next() | ||
if err == io.EOF { | ||
break | ||
} | ||
if err != nil { | ||
|
||
return false | ||
} | ||
if hdr.FileInfo().IsDir() && strings.Contains(strings.Trim(hdr.Name, "/"), strings.Trim(sourcePath, "/")) { | ||
return true | ||
} | ||
if strings.Contains(sourcePath, hdr.Name) && hdr.Size > 0 { | ||
return true | ||
} | ||
} | ||
return false | ||
} | ||
|
||
//Close the file | ||
func close(f io.Closer) { | ||
err := f.Close() | ||
if err != nil { | ||
log.Warnf("Failed to close the file/writer/reader. %v", err) | ||
} | ||
} | ||
|
||
// CompressEncodeString will return the compressed string with base64 encoded | ||
func CompressEncodeString(content string) string { | ||
return base64.StdEncoding.EncodeToString(CompressContent([]byte(content))) | ||
} | ||
|
||
// DecodeDecompressString will return decode and decompress the | ||
func DecodeDecompressString(content string) (string, error) { | ||
|
||
buf, err := base64.StdEncoding.DecodeString(content) | ||
if err != nil { | ||
return "", err | ||
} | ||
dBuf, err := DecompressContent(buf) | ||
if err != nil { | ||
return "", err | ||
} | ||
return string(dBuf), nil | ||
} | ||
|
||
// CompressContent will compress the byte array using zip writer | ||
func CompressContent(content []byte) []byte { | ||
var buf bytes.Buffer | ||
zipWriter := gzip.NewWriter(&buf) | ||
|
||
_, err := zipWriter.Write(content) | ||
if err != nil { | ||
log.Warnf("Error in compressing: %v", err) | ||
} | ||
close(zipWriter) | ||
return buf.Bytes() | ||
} | ||
|
||
// DecompressContent will return the uncompressed content | ||
func DecompressContent(content []byte) ([]byte, error) { | ||
|
||
buf := bytes.NewReader(content) | ||
gZipReader, _ := gzip.NewReader(buf) | ||
defer close(gZipReader) | ||
return ioutil.ReadAll(gZipReader) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
package file | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
// TestResubmitWorkflowWithOnExit ensures we do not carry over the onExit node even if successful | ||
func TestCompressContentString(t *testing.T) { | ||
content := "{\"pod-limits-rrdm8-591645159\":{\"id\":\"pod-limits-rrdm8-591645159\",\"name\":\"pod-limits-rrdm8[0]." + | ||
"run-pod(0:0)\",\"displayName\":\"run-pod(0:0)\",\"type\":\"Pod\",\"templateName\":\"run-pod\",\"phase\":" + | ||
"\"Succeeded\",\"boundaryID\":\"pod-limits-rrdm8\",\"startedAt\":\"2019-03-07T19:14:50Z\",\"finishedAt\":" + | ||
"\"2019-03-07T19:14:55Z\"}}" | ||
|
||
compString := CompressEncodeString(content) | ||
|
||
resultString, _ := DecodeDecompressString(compString) | ||
|
||
assert.Equal(t, content, resultString) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,6 +24,7 @@ import ( | |
"github.com/argoproj/argo/errors" | ||
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" | ||
"github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1" | ||
"github.com/argoproj/argo/util/file" | ||
"github.com/argoproj/argo/util/retry" | ||
"github.com/argoproj/argo/workflow/common" | ||
"github.com/argoproj/argo/workflow/util" | ||
|
@@ -72,6 +73,9 @@ var ( | |
// for before requeuing the workflow onto the workqueue. | ||
const maxOperationTime time.Duration = 10 * time.Second | ||
|
||
//maxWorkflowSize is the maximum size for workflow.yaml | ||
const maxWorkflowSize int = 1024 * 1024 | ||
|
||
// newWorkflowOperationCtx creates and initializes a new wfOperationCtx object. | ||
func newWorkflowOperationCtx(wf *wfv1.Workflow, wfc *WorkflowController) *wfOperationCtx { | ||
// NEVER modify objects from the store. It's a read-only, local cache. | ||
|
@@ -275,9 +279,17 @@ func (woc *wfOperationCtx) persistUpdates() { | |
return | ||
} | ||
wfClient := woc.controller.wfclientset.ArgoprojV1alpha1().Workflows(woc.wf.ObjectMeta.Namespace) | ||
_, err := wfClient.Update(woc.wf) | ||
err := woc.checkAndCompress() | ||
if err != nil { | ||
woc.log.Warnf("Error updating workflow: %v", err) | ||
woc.log.Warnf("Error compressing workflow: %v", err) | ||
} | ||
if woc.wf.Status.CompressedNodes != "" { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe we should remove this line like this
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks updated |
||
woc.wf.Status.Nodes = nil | ||
} | ||
|
||
_, err = wfClient.Update(woc.wf) | ||
if err != nil { | ||
woc.log.Warnf("Error updating workflow: %v %s", err, apierr.ReasonForError(err)) | ||
if argokubeerr.IsRequestEntityTooLargeErr(err) { | ||
woc.persistWorkflowSizeLimitErr(wfClient, err) | ||
return | ||
|
@@ -450,11 +462,24 @@ func (woc *wfOperationCtx) podReconciliation() error { | |
} | ||
|
||
for _, pod := range podList.Items { | ||
origNodeStatus := *woc.wf.Status.DeepCopy() | ||
performAssessment(&pod) | ||
err = woc.applyExecutionControl(&pod) | ||
if err != nil { | ||
woc.log.Warnf("Failed to apply execution control to pod %s", pod.Name) | ||
} | ||
err = woc.checkAndCompress() | ||
if err != nil { | ||
woc.wf.Status = origNodeStatus | ||
nodeNameForPod := pod.Annotations[common.AnnotationKeyNodeName] | ||
woc.log.Warnf("%v", err) | ||
woc.markNodeErrorClearOuput(nodeNameForPod, err) | ||
err = woc.checkAndCompress() | ||
if err != nil { | ||
woc.markWorkflowError(err, true) | ||
} | ||
} | ||
|
||
} | ||
|
||
// Now check for deleted pods. Iterate our nodes. If any one of our nodes does not show up in | ||
|
@@ -1138,6 +1163,14 @@ func (woc *wfOperationCtx) markNodePhase(nodeName string, phase wfv1.NodePhase, | |
return node | ||
} | ||
|
||
// markNodeErrorClearOuput is a convenience method to mark a node with an error and clear the output.
// Presumably clearing Outputs shrinks the node status so the workflow can fit
// under the etcd request size limit — TODO confirm with callers.
// NOTE(review): "Ouput" in the name is a typo, kept because callers elsewhere
// in this file reference it.
func (woc *wfOperationCtx) markNodeErrorClearOuput(nodeName string, err error) *wfv1.NodeStatus {
	nodeStatus := woc.markNodeError(nodeName, err)
	nodeStatus.Outputs = nil
	// Write the mutated status back into the Nodes map (map values are copies).
	woc.wf.Status.Nodes[nodeStatus.ID] = *nodeStatus
	return nodeStatus
}
|
||
// markNodeError is a convenience method to mark a node with an error and set the message from the error | ||
func (woc *wfOperationCtx) markNodeError(nodeName string, err error) *wfv1.NodeStatus { | ||
return woc.markNodePhase(nodeName, wfv1.NodeError, err.Error()) | ||
|
@@ -1576,3 +1609,61 @@ func expandSequence(seq *wfv1.Sequence) ([]wfv1.Item, error) { | |
} | ||
return items, nil | ||
} | ||
|
||
// getSize return the entire workflow json string size, or -1 when marshalling
// fails. When node status has already been compressed (CompressedNodes is
// non-empty), the uncompressed Nodes map may still be present on the object,
// so its marshalled size is subtracted to estimate the size that would
// actually be persisted.
func (woc *wfOperationCtx) getSize() int {
	nodeContent, err := json.Marshal(woc.wf)
	if err != nil {
		return -1
	}

	compressNodeSize := len(woc.wf.Status.CompressedNodes)

	if compressNodeSize > 0 {
		nodeStatus, err := json.Marshal(woc.wf.Status.Nodes)
		if err != nil {
			return -1
		}
		// Estimated size with the duplicate, uncompressed Nodes map removed.
		return len(nodeContent) - len(nodeStatus)
	}
	return len(nodeContent)
}
|
||
// checkAndCompress will check the workflow size and compress node status if total workflow size is more than maxWorkflowSize. | ||
// The compressed content will be assign to compressedNodes element and clear the nodestatus map. | ||
func (woc *wfOperationCtx) checkAndCompress() error { | ||
|
||
if woc.wf.Status.CompressedNodes != "" || (woc.wf.Status.CompressedNodes == "" && woc.getSize() >= maxWorkflowSize) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we simplify this logic a little bit? As I understand Instead please change logic to ensure these two fields never set at the same time: checkAndDecompress should set Nodes field and immediately remove CompressedNodes; checkAndCompress should immediately remove Nodes after compressing There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This function gets called in two places. one is the final workflow save in persistupdate(). Another place is in podReconciliation during the workflow execution to check the output of each node fit into the size. In this scenario Nodes and CompressedNodes both will co-exist. |
||
|
||
nodeContent, err := json.Marshal(woc.wf.Status.Nodes) | ||
if err != nil { | ||
return errors.InternalWrapError(err) | ||
} | ||
buff := string(nodeContent) | ||
woc.wf.Status.CompressedNodes = file.CompressEncodeString(buff) | ||
|
||
} | ||
if woc.wf.Status.CompressedNodes != "" && woc.getSize() >= maxWorkflowSize { | ||
return errors.InternalError(fmt.Sprintf("Workflow is longer than maximum allowed size. Size=%d", woc.getSize())) | ||
} | ||
return nil | ||
} | ||
|
||
// checkAndDecompress will decompress the compressednode and assign to workflow.status.nodes map.
// It is a no-op when CompressedNodes is empty.
func (woc *wfOperationCtx) checkAndDecompress() error {
	if woc.wf.Status.CompressedNodes != "" {
		nodeContent, err := file.DecodeDecompressString(woc.wf.Status.CompressedNodes)
		if err != nil {
			return errors.InternalWrapError(err)
		}
		var tempNodes map[string]wfv1.NodeStatus

		err = json.Unmarshal([]byte(nodeContent), &tempNodes)
		if err != nil {
			// Logged here and also returned; the caller decides how to react.
			woc.log.Warn(err)
			return err
		}
		woc.wf.Status.Nodes = tempNodes
	}
	return nil
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The reason will be displayed to describe this comment to others. Learn more.
The `status.nodes` field is used if the user runs `argo list -o=wide`: https://github.com/argoproj/argo/blob/master/cmd/argo/commands/list.go#L137
The nodes field is also accessed in `argo logs`: https://github.com/argoproj/argo/blob/master/cmd/argo/commands/logs.go#L139
I would suggest adding a method `GetNodes()` to the `WorkflowStatus` structure which does the same as `checkAndDecompress` and returns the nodes, and using it everywhere in the CLI instead of `Status.Nodes`.