feat(executor): Minimize the number of Kubernetes API requests made by executors #4954

Merged · 60 commits · Feb 12, 2021
Changes from all commits
Commits (60)
f84fb1a
feat(executor): Remove need for watch
alexec Jan 31, 2021
9a6a1de
feat(executor): Remove need for watch
alexec Jan 31, 2021
64862b4
test: More tests
alexec Feb 1, 2021
c04090d
test: More tests
alexec Feb 1, 2021
e614705
Merge branch 'more-tests' into now
alexec Feb 1, 2021
146a599
test: More tests
alexec Feb 1, 2021
964db59
Merge branch 'more-tests' into now
alexec Feb 1, 2021
b7785e0
feat(executor): Remove need for watch
alexec Feb 1, 2021
4e8736f
Merge branch 'master' into now
alexec Feb 2, 2021
e1ea2e9
feat(executor): Remove need for watch
alexec Feb 2, 2021
0cd51e1
feat(executor): Log summary of Kubernetes API requests
alexec Feb 2, 2021
d2400a6
remove TODO
alexec Feb 2, 2021
b34ce98
feat(executor): Remove need for watch
alexec Feb 2, 2021
a786ffa
feat(executor): Log summary of Kubernetes API requests
alexec Feb 2, 2021
80826b0
Merge branch 'log-k8s' into now
alexec Feb 2, 2021
a445964
must be 1 not zero
alexec Feb 2, 2021
fa0b467
test: Fix TestDeletingRunningPod
alexec Feb 2, 2021
fa36044
test: Fix TestDeletingRunningPod
alexec Feb 2, 2021
6b55b95
Merge branch 'master' into log-k8s
alexec Feb 2, 2021
e3ddc51
Merge branch 'fix-test' into log-k8s
alexec Feb 2, 2021
a019630
Merge branch 'log-k8s' into now
alexec Feb 2, 2021
c879e13
Merge branch 'master' into now
alexec Feb 2, 2021
a18332f
feat(executor): Log summary of Kubernetes API requests
alexec Feb 2, 2021
ef05ccd
Merge branch 'log-k8s' into now
alexec Feb 2, 2021
14842ab
there might not be any sidecars
alexec Feb 2, 2021
50d063e
also update stress test argoexec
alexec Feb 2, 2021
9668016
feat(executor): Remove need for watch
alexec Feb 2, 2021
37fa9ab
feat(executor): Remove need for watch
alexec Feb 2, 2021
8e54372
Merge branch 'master' into now
alexec Feb 2, 2021
42a0181
feat(executor): Remove need for watch
alexec Feb 2, 2021
51ff45e
feat(executor): Remove need for watch
alexec Feb 2, 2021
69baf76
feat(executor): Remove need for watch
alexec Feb 3, 2021
17c53a3
Merge branch 'master' into log-k8s
alexec Feb 3, 2021
625e4e0
feat(executor): Log summary of Kubernetes API requests
alexec Feb 3, 2021
284e9a4
feat(executor): Log summary of Kubernetes API requests
alexec Feb 3, 2021
cfeb9a5
Merge branch 'master' into log-k8s
alexec Feb 3, 2021
8657824
Merge branch 'master' into now
alexec Feb 3, 2021
95dd6c4
Merge branch 'log-k8s' into now
alexec Feb 3, 2021
a3a60fc
Merge branch 'master' into now
alexec Feb 5, 2021
643d0bc
ignore build images
alexec Feb 5, 2021
5cb9570
feat(executor): Remove need for watch
alexec Feb 5, 2021
c214f8e
Merge branch 'master' into now
alexec Feb 5, 2021
1da3726
feat(executor): Remove need for watch
alexec Feb 5, 2021
ec5246e
feat(executor): Remove need for watch
alexec Feb 8, 2021
6a56fe2
feat(executor): Remove need for watch
alexec Feb 8, 2021
4b62841
feat(executor): Remove need for watch
alexec Feb 8, 2021
ea6f832
Merge branch 'master' into now
alexec Feb 8, 2021
a5ae663
feat(executor): Remove need for watch
alexec Feb 9, 2021
42d86ac
feat(executor): Remove need for watch
alexec Feb 9, 2021
fefed4c
feat(executor): Remove need for watch
alexec Feb 9, 2021
f1ac596
feat(executor): Remove need for watch
alexec Feb 9, 2021
560347d
ignore test
alexec Feb 9, 2021
394a721
ignore test
alexec Feb 9, 2021
94a778b
Merge branch 'master' into now
alexec Feb 10, 2021
2c87588
feat(executor): Remove need for watch
alexec Feb 10, 2021
ca47612
feat(executor): Remove need for watch
alexec Feb 10, 2021
dbb20c4
Merge branch 'master' into now
alexec Feb 10, 2021
b6edcc8
Merge branch 'master' into now
alexec Feb 12, 2021
a98f414
feat(executor): Remove need for watch
alexec Feb 12, 2021
48d86c3
Merge branch 'master' into now
alexec Feb 12, 2021
2 changes: 1 addition & 1 deletion .github/workflows/ci-build.yaml
@@ -77,7 +77,7 @@ jobs:
- name: Pre-pull images
env:
GOPATH: /home/runner/go
run: make pull-build-images test-images &
run: make test-images &
- name: Create Kubeconfig
run: |
mkdir -p ~/.kube
6 changes: 1 addition & 5 deletions Makefile
@@ -383,7 +383,7 @@ install: $(MANIFESTS) $(E2E_MANIFESTS) dist/kustomize
kubectl get ns $(KUBE_NAMESPACE) || kubectl create ns $(KUBE_NAMESPACE)
kubectl config set-context --current --namespace=$(KUBE_NAMESPACE)
@echo "installing PROFILE=$(PROFILE) VERSION=$(VERSION), E2E_EXECUTOR=$(E2E_EXECUTOR)"
dist/kustomize build --load_restrictor=none test/e2e/manifests/$(PROFILE) | sed 's/image: argoproj/image: $(IMAGE_NAMESPACE)/' | sed 's/:latest/:$(VERSION)/' | sed 's/pns/$(E2E_EXECUTOR)/' | kubectl -n $(KUBE_NAMESPACE) apply -f -
dist/kustomize build --load_restrictor=none test/e2e/manifests/$(PROFILE) | sed 's/argoproj\//$(IMAGE_NAMESPACE)\//' | sed 's/:latest/:$(VERSION)/' | sed 's/pns/$(E2E_EXECUTOR)/' | kubectl -n $(KUBE_NAMESPACE) apply -f -
kubectl -n $(KUBE_NAMESPACE) apply -f test/stress/massive-workflow.yaml
kubectl -n $(KUBE_NAMESPACE) rollout restart deploy workflow-controller
kubectl -n $(KUBE_NAMESPACE) rollout restart deploy argo-server
@@ -394,10 +394,6 @@ ifeq ($(RUN_MODE),kubernetes)
kubectl -n $(KUBE_NAMESPACE) scale deploy/argo-server --replicas 1
endif

.PHONY: pull-build-images
pull-build-images:
./hack/pull-build-images.sh

.PHONY: argosay
argosay: test/e2e/images/argosay/v2/argosay
cd test/e2e/images/argosay/v2 && docker build . -t argoproj/argosay:v2
6 changes: 3 additions & 3 deletions cmd/argoexec/commands/root.go
@@ -101,11 +101,11 @@ func initExecutor() *executor.WorkflowExecutor {
case common.ContainerRuntimeExecutorK8sAPI:
cre, err = k8sapi.NewK8sAPIExecutor(clientset, config, podName, namespace)
case common.ContainerRuntimeExecutorKubelet:
cre, err = kubelet.NewKubeletExecutor()
cre, err = kubelet.NewKubeletExecutor(namespace, podName)
case common.ContainerRuntimeExecutorPNS:
cre, err = pns.NewPNSExecutor(clientset, podName, namespace, tmpl.Outputs.HasOutputs())
cre, err = pns.NewPNSExecutor(clientset, podName, namespace)
default:
cre, err = docker.NewDockerExecutor()
cre, err = docker.NewDockerExecutor(namespace, podName)
}
checkErr(err)

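Each executor constructor above now receives the pod's namespace and name. A minimal sketch, under assumptions, of why that matters for this PR: with those two values an executor can fetch its own pod from the API on demand instead of holding a watch open. The kubeconfig setup, namespace, and pod name below are placeholders, not the PR's code.

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Build a client from the local kubeconfig (placeholder setup).
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	clientset := kubernetes.NewForConfigOrDie(config)

	// With namespace and pod name known up front, the executor can GET its own
	// pod when it needs status, rather than watching for every change.
	namespace, podName := "argo", "my-workflow-pod" // placeholders
	pod, err := clientset.CoreV1().Pods(namespace).Get(context.Background(), podName, metav1.GetOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Println("pod phase:", pod.Status.Phase)
}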
9 changes: 9 additions & 0 deletions pkg/apis/workflow/v1alpha1/workflow_types.go
@@ -621,6 +621,15 @@ func (tmpl *Template) HasPodSpecPatch() bool {
return tmpl.PodSpecPatch != ""
}

func (tmpl *Template) GetSidecarNames() []string {
var containerNames []string
for _, s := range tmpl.Sidecars {
containerNames = append(containerNames, s.Name)
}
return containerNames

}

type Artifacts []Artifact

func (a Artifacts) GetArtifactByName(name string) *Artifact {
9 changes: 9 additions & 0 deletions pkg/apis/workflow/v1alpha1/workflow_types_test.go
@@ -610,3 +610,12 @@ func TestWorkflow_GetSemaphoreKeys(t *testing.T) {
assert.Contains(keys, "test/template")
assert.Contains(keys, "test/template1")
}

func TestTemplate_GetSidecarNames(t *testing.T) {
m := &Template{
Sidecars: []UserContainer{
{Container: corev1.Container{Name: "sidecar-0"}},
},
}
assert.ElementsMatch(t, []string{"sidecar-0"}, m.GetSidecarNames())
}
13 changes: 8 additions & 5 deletions test/e2e/signals_test.go
@@ -14,6 +14,9 @@ import (
"github.com/argoproj/argo-workflows/v3/test/e2e/fixtures"
)

// Tests the use of signals to kill containers.
// argoproj/argosay:v2 does not contain sh, so you must use argoproj/argosay:v1.
// Killing often requires SIGKILL, which is issued 30s after SIGTERM, so tests need a longer (>30s) timeout.
type SignalsSuite struct {
fixtures.E2ESuite
}
@@ -37,10 +40,10 @@ func (s *SignalsSuite) TestStopBehavior() {
WaitForWorkflow(1 * time.Minute).
Then().
ExpectWorkflow(func(t *testing.T, m *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.WorkflowFailed, status.Phase)
assert.Contains(t, []wfv1.WorkflowPhase{wfv1.WorkflowFailed, wfv1.WorkflowError}, status.Phase)
nodeStatus := status.Nodes.FindByDisplayName("A")
if assert.NotNil(t, nodeStatus) {
assert.Equal(t, wfv1.NodeFailed, nodeStatus.Phase)
assert.Contains(t, []wfv1.NodePhase{wfv1.NodeFailed, wfv1.NodeError}, nodeStatus.Phase)
}
nodeStatus = status.Nodes.FindByDisplayName("A.onExit")
if assert.NotNil(t, nodeStatus) {
@@ -66,10 +69,10 @@ func (s *SignalsSuite) TestTerminateBehavior() {
WaitForWorkflow(1 * time.Minute).
Then().
ExpectWorkflow(func(t *testing.T, m *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.WorkflowFailed, status.Phase)
assert.Contains(t, []wfv1.WorkflowPhase{wfv1.WorkflowFailed, wfv1.WorkflowError}, status.Phase)
nodeStatus := status.Nodes.FindByDisplayName("A")
if assert.NotNil(t, nodeStatus) {
assert.Equal(t, wfv1.NodeFailed, nodeStatus.Phase)
assert.Contains(t, []wfv1.NodePhase{wfv1.NodeFailed, wfv1.NodeError}, nodeStatus.Phase)
}
nodeStatus = status.Nodes.FindByDisplayName("A.onExit")
assert.Nil(t, nodeStatus)
@@ -133,7 +136,7 @@ spec:
WaitForWorkflow(1 * time.Minute).
Then().
ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.WorkflowFailed, status.Phase)
assert.Contains(t, []wfv1.WorkflowPhase{wfv1.WorkflowFailed, wfv1.WorkflowError}, status.Phase)
assert.Len(t, status.Nodes, 3)
node := status.Nodes.FindByDisplayName("retry-backoff-2(1)")
if assert.NotNil(t, node) {
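The comment added at the top of this suite describes the termination flow the relaxed assertions allow for: a container first receives SIGTERM and, if it is still running roughly 30s later, SIGKILL. A small stand-alone sketch of that escalation, for illustration only (it is an assumption about the general pattern, not the executor's actual implementation):

package signals

import (
	"os"
	"syscall"
	"time"
)

// terminate asks a process to stop with SIGTERM and escalates to SIGKILL after
// the grace period (the test suite above allows roughly 30s for this).
func terminate(proc *os.Process, grace time.Duration) error {
	if err := proc.Signal(syscall.SIGTERM); err != nil {
		return err
	}
	done := make(chan error, 1)
	go func() {
		_, err := proc.Wait()
		done <- err
	}()
	select {
	case err := <-done:
		return err
	case <-time.After(grace):
		return proc.Kill() // SIGKILL
	}
}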
10 changes: 7 additions & 3 deletions test/e2e/testdata/sidecar-workflow.yaml
@@ -9,8 +9,12 @@ spec:
templates:
- name: main
container:
image: argoproj/argosay:v2
image: argoproj/argosay:v1
args: [ sleep, "5s" ]
sidecars:
- name: sidecar-0
image: argoproj/argosay:v2
argso: [ sleep, "999" ]
image: argoproj/argosay:v1
args: [ sleep, "999s" ]
- name: sidecar-1
image: argoproj/argosay:v1
args: [ sleep, "999s" ]
3 changes: 2 additions & 1 deletion test/stress/tool/main.go
@@ -4,6 +4,7 @@ import (
"context"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/tools/clientcmd"
"sigs.k8s.io/yaml"

@@ -45,7 +46,7 @@ spec:
}

ctx := context.Background()
for i := 0; i < 100; i++ {
for i := 0; i < 500; i++ {
_, err := w.Create(ctx, wf, metav1.CreateOptions{})
if err != nil {
panic(err)
2 changes: 2 additions & 0 deletions workflow/common/common.go
@@ -105,6 +105,8 @@ const (

// EnvVarPodName contains the name of the pod (currently unused)
EnvVarPodName = "ARGO_POD_NAME"
// EnvVarContainerName contains the container's name for the current pod
EnvVarContainerName = "ARGO_CONTAINER_NAME"
// EnvVarContainerRuntimeExecutor contains the name of the container runtime executor to use, empty is equal to "docker"
EnvVarContainerRuntimeExecutor = "ARGO_CONTAINER_RUNTIME_EXECUTOR"
// EnvVarDownwardAPINodeIP is the envvar used to get the `status.hostIP`
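The new ARGO_CONTAINER_NAME variable is injected into every container by the controller (see the workflowpod.go change further down), so a process inside the pod can tell which container it is running in without an API call. A hedged sketch of how a consumer might read it; only the variable name comes from this diff, the helper name and the "main" fallback are assumptions:

package main

import (
	"fmt"
	"os"
)

// containerName returns the value injected via ARGO_CONTAINER_NAME, falling
// back to "main" when it is unset (the fallback is an assumption, not Argo's).
func containerName() string {
	if v := os.Getenv("ARGO_CONTAINER_NAME"); v != "" {
		return v
	}
	return "main"
}

func main() {
	fmt.Println("running in container:", containerName())
}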
120 changes: 37 additions & 83 deletions workflow/controller/operator.go
@@ -1272,110 +1272,64 @@ func inferFailedReason(pod *apiv1.Pod) (wfv1.NodePhase, string) {
// We only get one message to set for the overall node status.
// If multiple containers failed, in order of preference:
// init, main (annotated), main (exit code), wait, sidecars
for _, ctr := range pod.Status.InitContainerStatuses {
// Virtual Kubelet environment will not set the terminate on waiting container
// https://github.com/argoproj/argo-workflows/issues/3879
// https://github.com/virtual-kubelet/virtual-kubelet/blob/7f2a02291530d2df14905702e6d51500dd57640a/node/sync.go#L195-L208
if ctr.State.Waiting != nil {
return wfv1.NodeError, fmt.Sprintf("Pod failed before %s container starts", ctr.Name)
order := func(n string) int {
order, ok := map[string]int{
common.InitContainerName: 0,
common.MainContainerName: 1,
common.WaitContainerName: 2,
}[n]
if ok {
return order
}
if ctr.State.Terminated == nil {
// We should never get here
log.Warnf("Pod %s phase was Failed but %s did not have terminated state", pod.ObjectMeta.Name, ctr.Name)
continue
}
if ctr.State.Terminated.ExitCode == 0 {
continue
}
errMsg := "failed to load artifacts"
for _, msg := range []string{annotatedMsg, ctr.State.Terminated.Message} {
if msg != "" {
errMsg += ": " + msg
break
}
}
// NOTE: we consider artifact load issues as Error instead of Failed
return wfv1.NodeError, errMsg
return 3
}
failMessages := make(map[string]string)
for _, ctr := range pod.Status.ContainerStatuses {

ctrs := append(pod.Status.InitContainerStatuses, pod.Status.ContainerStatuses...)
sort.Slice(ctrs, func(i, j int) bool { return order(ctrs[i].Name) < order(ctrs[j].Name) })

for _, ctr := range ctrs {

// Virtual Kubelet environment will not set the terminate on waiting container
// https://github.com/argoproj/argo-workflows/issues/3879
// https://github.com/virtual-kubelet/virtual-kubelet/blob/7f2a02291530d2df14905702e6d51500dd57640a/node/sync.go#L195-L208

if ctr.State.Waiting != nil {
return wfv1.NodeError, fmt.Sprintf("Pod failed before %s container starts", ctr.Name)
}
if ctr.State.Terminated == nil {
t := ctr.State.Terminated
if t == nil {
// We should never get here
log.Warnf("Pod %s phase was Failed but %s did not have terminated state", pod.ObjectMeta.Name, ctr.Name)
continue
}
if ctr.State.Terminated.ExitCode == 0 {
continue
}
if ctr.State.Terminated.Message == "" && ctr.State.Terminated.Reason == "OOMKilled" {
failMessages[ctr.Name] = ctr.State.Terminated.Reason
continue
}
if ctr.Name == common.WaitContainerName {
errDetails := ""
for _, msg := range []string{annotatedMsg, ctr.State.Terminated.Message} {
if msg != "" {
errDetails = msg
break
}
}
if errDetails == "" {
// executor is expected to annotate a message to the pod upon any errors.
// If we failed to see the annotated message, it is likely the pod ran with
// insufficient privileges. Give a hint to that effect.
errDetails = fmt.Sprintf("verify serviceaccount %s:%s has necessary privileges", pod.ObjectMeta.Namespace, pod.Spec.ServiceAccountName)
}
errMsg := fmt.Sprintf("failed to save outputs: %s", errDetails)
failMessages[ctr.Name] = errMsg
log.Warnf("Pod %s phase was Failed but %s did not have terminated state", pod.Name, ctr.Name)
continue
}
if ctr.State.Terminated.Message != "" {
errMsg := ctr.State.Terminated.Message
if ctr.Name != common.MainContainerName {
errMsg = fmt.Sprintf("sidecar '%s' %s", ctr.Name, errMsg)
}
failMessages[ctr.Name] = errMsg
if t.ExitCode == 0 {
continue
}
errMsg := fmt.Sprintf("failed with exit code %d", ctr.State.Terminated.ExitCode)
if ctr.Name != common.MainContainerName {
if ctr.State.Terminated.ExitCode == 137 || ctr.State.Terminated.ExitCode == 143 {

msg := fmt.Sprintf("exit code %d: %s; %s; %s", t.ExitCode, t.Reason, t.Message, annotatedMsg)

switch ctr.Name {
case common.InitContainerName:
return wfv1.NodeError, msg
case common.MainContainerName:
return wfv1.NodeFailed, msg
case common.WaitContainerName:
// executor is expected to annotate a message to the pod upon any errors.
// If we failed to see the annotated message, it is likely the pod ran with
// insufficient privileges. Give a hint to that effect.
return wfv1.NodeError, fmt.Sprintf("%s; verify serviceaccount %s:%s has necessary privileges", msg, pod.Namespace, pod.Spec.ServiceAccountName)
default:
if t.ExitCode == 137 || t.ExitCode == 143 {
// if the sidecar was SIGKILL'd (exit code 137) assume it was because argoexec
// forcibly killed the container, which we ignore the error for.
// Java code 143 is a normal exit 128 + 15 https://github.com/elastic/elasticsearch/issues/31847
log.Infof("Ignoring %d exit code of sidecar '%s'", ctr.State.Terminated.ExitCode, ctr.Name)
continue
log.Infof("Ignoring %d exit code of container '%s'", t.ExitCode, ctr.Name)
} else {
return wfv1.NodeFailed, msg
Review comment (Member): this code is much cleaner now
}
errMsg = fmt.Sprintf("sidecar '%s' %s", ctr.Name, errMsg)
}
failMessages[ctr.Name] = errMsg
}
if failMsg, ok := failMessages[common.MainContainerName]; ok {
_, ok = failMessages[common.WaitContainerName]
isResourceTemplate := !ok
if isResourceTemplate && annotatedMsg != "" {
// For resource templates, we prefer the annotated message
// over the vanilla exit code 1 error
return wfv1.NodeFailed, annotatedMsg
}
return wfv1.NodeFailed, failMsg
}
if failMsg, ok := failMessages[common.WaitContainerName]; ok {
return wfv1.NodeError, failMsg
}

// If we get here, both the main and wait container succeeded. Iterate the fail messages to
// identify the sidecar which failed and return the message.
for _, failMsg := range failMessages {
return wfv1.NodeFailed, failMsg
}
// If we get here, we have detected that the main/wait containers succeed but the sidecar(s)
// were SIGKILL'd. The executor may have had to forcefully terminate the sidecar (kill -9),
// resulting in a 137 exit code (which we had ignored earlier). If failMessages is empty, it
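To make the new control flow in inferFailedReason concrete: init and regular container statuses are merged, sorted by the fixed preference init, main, wait, then everything else, and the first terminated container with a non-zero exit code decides the node phase (Error for init/wait, Failed for main, with sidecar exit codes 137/143 ignored). A small self-contained sketch of just the ordering step, using plain strings instead of the real ContainerStatus type:

package main

import (
	"fmt"
	"sort"
)

// order mirrors the fixed preference used above: init, then main, then wait,
// then everything else (sidecars).
func order(name string) int {
	if n, ok := map[string]int{"init": 0, "main": 1, "wait": 2}[name]; ok {
		return n
	}
	return 3
}

func main() {
	ctrs := []string{"wait", "sidecar-0", "main", "init"}
	sort.Slice(ctrs, func(i, j int) bool { return order(ctrs[i]) < order(ctrs[j]) })
	fmt.Println(ctrs) // [init main wait sidecar-0]
}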
2 changes: 1 addition & 1 deletion workflow/controller/operator_test.go
@@ -5439,7 +5439,7 @@ func TestPodFailureWithContainerOOM(t *testing.T) {
assert.NotNil(t, pod)
nodeStatus, msg := inferFailedReason(&pod)
assert.Equal(t, tt.phase, nodeStatus)
assert.Equal(t, msg, "OOMKilled")
assert.Contains(t, msg, "OOMKilled")
}
}

13 changes: 11 additions & 2 deletions workflow/controller/workflowpod.go
@@ -291,6 +291,11 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin
}
addOutputArtifactsVolumes(pod, tmpl)

for i, c := range pod.Spec.Containers {
c.Env = append(c.Env, apiv1.EnvVar{Name: common.EnvVarContainerName, Value: c.Name}) // used to identify the container name of the process
pod.Spec.Containers[i] = c
}

// Set the container template JSON in pod annotations, which executor examines for things like
// artifact location/path.
tmplBytes, err := json.Marshal(tmpl)
@@ -428,13 +433,13 @@ func substitutePodParams(pod *apiv1.Pod, globalParams common.Parameters, tmpl *w

func (woc *wfOperationCtx) newInitContainer(tmpl *wfv1.Template) apiv1.Container {
ctr := woc.newExecContainer(common.InitContainerName, tmpl)
ctr.Command = []string{"argoexec", "init"}
ctr.Command = []string{"argoexec", "init", "--loglevel", getExecutorLogLevel()}
return *ctr
}

func (woc *wfOperationCtx) newWaitContainer(tmpl *wfv1.Template) (*apiv1.Container, error) {
ctr := woc.newExecContainer(common.WaitContainerName, tmpl)
ctr.Command = []string{"argoexec", "wait"}
ctr.Command = []string{"argoexec", "wait", "--loglevel", getExecutorLogLevel()}
Review comment (Member): nice
switch woc.controller.GetContainerRuntimeExecutor() {
case common.ContainerRuntimeExecutorPNS:
ctr.SecurityContext = &apiv1.SecurityContext{
@@ -459,6 +464,10 @@ func (woc *wfOperationCtx) newWaitContainer(tmpl *wfv1.Template) (*apiv1.Contain
return ctr, nil
}

func getExecutorLogLevel() string {
return log.GetLevel().String()
}

// hasPrivilegedContainers tests if the main container or sidecars is privileged
func hasPrivilegedContainers(tmpl *wfv1.Template) bool {
if containerIsPrivileged(tmpl.Container) {
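Taken together, the workflowpod.go changes mean every exec container the controller creates now carries its own name in ARGO_CONTAINER_NAME and runs argoexec at the controller's log level. Roughly, the resulting wait container looks like the sketch below, assembled by hand from the hunks above rather than taken from real controller output; the "info" level is a placeholder for whatever level the controller is running at.

package main

import (
	"fmt"

	apiv1 "k8s.io/api/core/v1"
)

func main() {
	// Illustrative wait-container spec reflecting the changes above.
	wait := apiv1.Container{
		Name:    "wait",
		Command: []string{"argoexec", "wait", "--loglevel", "info"},
		Env: []apiv1.EnvVar{
			{Name: "ARGO_CONTAINER_NAME", Value: "wait"}, // injected by createWorkflowPod
		},
	}
	fmt.Printf("%+v\n", wait)
}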