Skip to content

Commit

Permalink
feat(executor): Minimize the number of Kubernetes API requests made b…
Browse files Browse the repository at this point in the history
…y executors (#4954)

Signed-off-by: Alex Collins <[email protected]>
  • Loading branch information
alexec authored Feb 12, 2021
1 parent 68979f6 commit 2ff4db1
Show file tree
Hide file tree
Showing 27 changed files with 716 additions and 819 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci-build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ jobs:
- name: Pre-pull images
env:
GOPATH: /home/runner/go
run: make pull-build-images test-images &
run: make test-images &
- name: Create Kubeconfig
run: |
mkdir -p ~/.kube
Expand Down
6 changes: 1 addition & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ install: $(MANIFESTS) $(E2E_MANIFESTS) dist/kustomize
kubectl get ns $(KUBE_NAMESPACE) || kubectl create ns $(KUBE_NAMESPACE)
kubectl config set-context --current --namespace=$(KUBE_NAMESPACE)
@echo "installing PROFILE=$(PROFILE) VERSION=$(VERSION), E2E_EXECUTOR=$(E2E_EXECUTOR)"
dist/kustomize build --load_restrictor=none test/e2e/manifests/$(PROFILE) | sed 's/image: argoproj/image: $(IMAGE_NAMESPACE)/' | sed 's/:latest/:$(VERSION)/' | sed 's/pns/$(E2E_EXECUTOR)/' | kubectl -n $(KUBE_NAMESPACE) apply -f -
dist/kustomize build --load_restrictor=none test/e2e/manifests/$(PROFILE) | sed 's/argoproj\//$(IMAGE_NAMESPACE)\//' | sed 's/:latest/:$(VERSION)/' | sed 's/pns/$(E2E_EXECUTOR)/' | kubectl -n $(KUBE_NAMESPACE) apply -f -
kubectl -n $(KUBE_NAMESPACE) apply -f test/stress/massive-workflow.yaml
kubectl -n $(KUBE_NAMESPACE) rollout restart deploy workflow-controller
kubectl -n $(KUBE_NAMESPACE) rollout restart deploy argo-server
Expand All @@ -394,10 +394,6 @@ ifeq ($(RUN_MODE),kubernetes)
kubectl -n $(KUBE_NAMESPACE) scale deploy/argo-server --replicas 1
endif

.PHONY: pull-build-images
pull-build-images:
./hack/pull-build-images.sh

.PHONY: argosay
argosay: test/e2e/images/argosay/v2/argosay
cd test/e2e/images/argosay/v2 && docker build . -t argoproj/argosay:v2
Expand Down
6 changes: 3 additions & 3 deletions cmd/argoexec/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,11 @@ func initExecutor() *executor.WorkflowExecutor {
case common.ContainerRuntimeExecutorK8sAPI:
cre, err = k8sapi.NewK8sAPIExecutor(clientset, config, podName, namespace)
case common.ContainerRuntimeExecutorKubelet:
cre, err = kubelet.NewKubeletExecutor()
cre, err = kubelet.NewKubeletExecutor(namespace, podName)
case common.ContainerRuntimeExecutorPNS:
cre, err = pns.NewPNSExecutor(clientset, podName, namespace, tmpl.Outputs.HasOutputs())
cre, err = pns.NewPNSExecutor(clientset, podName, namespace)
default:
cre, err = docker.NewDockerExecutor()
cre, err = docker.NewDockerExecutor(namespace, podName)
}
checkErr(err)

Expand Down
9 changes: 9 additions & 0 deletions pkg/apis/workflow/v1alpha1/workflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,15 @@ func (tmpl *Template) HasPodSpecPatch() bool {
return tmpl.PodSpecPatch != ""
}

func (tmpl *Template) GetSidecarNames() []string {
var containerNames []string
for _, s := range tmpl.Sidecars {
containerNames = append(containerNames, s.Name)
}
return containerNames

}

type Artifacts []Artifact

func (a Artifacts) GetArtifactByName(name string) *Artifact {
Expand Down
9 changes: 9 additions & 0 deletions pkg/apis/workflow/v1alpha1/workflow_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,3 +610,12 @@ func TestWorkflow_GetSemaphoreKeys(t *testing.T) {
assert.Contains(keys, "test/template")
assert.Contains(keys, "test/template1")
}

func TestTemplate_GetSidecarNames(t *testing.T) {
m := &Template{
Sidecars: []UserContainer{
{Container: corev1.Container{Name: "sidecar-0"}},
},
}
assert.ElementsMatch(t, []string{"sidecar-0"}, m.GetSidecarNames())
}
13 changes: 8 additions & 5 deletions test/e2e/signals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import (
"github.com/argoproj/argo-workflows/v3/test/e2e/fixtures"
)

// Tests the use of signals to kill containers.
// argoproj/argosay:v2 does not contain sh, so you must use argoproj/argosay:v1.
// Killing often requires SIGKILL, which is issued 30s after SIGTERM. So tests need longer (>30s) timeout.
type SignalsSuite struct {
fixtures.E2ESuite
}
Expand All @@ -37,10 +40,10 @@ func (s *SignalsSuite) TestStopBehavior() {
WaitForWorkflow(1 * time.Minute).
Then().
ExpectWorkflow(func(t *testing.T, m *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.WorkflowFailed, status.Phase)
assert.Contains(t, []wfv1.WorkflowPhase{wfv1.WorkflowFailed, wfv1.WorkflowError}, status.Phase)
nodeStatus := status.Nodes.FindByDisplayName("A")
if assert.NotNil(t, nodeStatus) {
assert.Equal(t, wfv1.NodeFailed, nodeStatus.Phase)
assert.Contains(t, []wfv1.NodePhase{wfv1.NodeFailed, wfv1.NodeError}, nodeStatus.Phase)
}
nodeStatus = status.Nodes.FindByDisplayName("A.onExit")
if assert.NotNil(t, nodeStatus) {
Expand All @@ -66,10 +69,10 @@ func (s *SignalsSuite) TestTerminateBehavior() {
WaitForWorkflow(1 * time.Minute).
Then().
ExpectWorkflow(func(t *testing.T, m *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.WorkflowFailed, status.Phase)
assert.Contains(t, []wfv1.WorkflowPhase{wfv1.WorkflowFailed, wfv1.WorkflowError}, status.Phase)
nodeStatus := status.Nodes.FindByDisplayName("A")
if assert.NotNil(t, nodeStatus) {
assert.Equal(t, wfv1.NodeFailed, nodeStatus.Phase)
assert.Contains(t, []wfv1.NodePhase{wfv1.NodeFailed, wfv1.NodeError}, nodeStatus.Phase)
}
nodeStatus = status.Nodes.FindByDisplayName("A.onExit")
assert.Nil(t, nodeStatus)
Expand Down Expand Up @@ -133,7 +136,7 @@ spec:
WaitForWorkflow(1 * time.Minute).
Then().
ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.WorkflowFailed, status.Phase)
assert.Contains(t, []wfv1.WorkflowPhase{wfv1.WorkflowFailed, wfv1.WorkflowError}, status.Phase)
assert.Len(t, status.Nodes, 3)
node := status.Nodes.FindByDisplayName("retry-backoff-2(1)")
if assert.NotNil(t, node) {
Expand Down
10 changes: 7 additions & 3 deletions test/e2e/testdata/sidecar-workflow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,12 @@ spec:
templates:
- name: main
container:
image: argoproj/argosay:v2
image: argoproj/argosay:v1
args: [ sleep, "5s" ]
sidecars:
- name: sidecar-0
image: argoproj/argosay:v2
argso: [ sleep, "999" ]
image: argoproj/argosay:v1
args: [ sleep, "999s" ]
- name: sidecar-1
image: argoproj/argosay:v1
args: [ sleep, "999s" ]
3 changes: 2 additions & 1 deletion test/stress/tool/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/tools/clientcmd"
"sigs.k8s.io/yaml"

Expand Down Expand Up @@ -45,7 +46,7 @@ spec:
}

ctx := context.Background()
for i := 0; i < 100; i++ {
for i := 0; i < 500; i++ {
_, err := w.Create(ctx, wf, metav1.CreateOptions{})
if err != nil {
panic(err)
Expand Down
2 changes: 2 additions & 0 deletions workflow/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ const (

// EnvVarPodName contains the name of the pod (currently unused)
EnvVarPodName = "ARGO_POD_NAME"
// EnvVarContainerName container the container's name for the current pod
EnvVarContainerName = "ARGO_CONTAINER_NAME"
// EnvVarContainerRuntimeExecutor contains the name of the container runtime executor to use, empty is equal to "docker"
EnvVarContainerRuntimeExecutor = "ARGO_CONTAINER_RUNTIME_EXECUTOR"
// EnvVarDownwardAPINodeIP is the envvar used to get the `status.hostIP`
Expand Down
120 changes: 37 additions & 83 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1272,110 +1272,64 @@ func inferFailedReason(pod *apiv1.Pod) (wfv1.NodePhase, string) {
// We only get one message to set for the overall node status.
// If multiple containers failed, in order of preference:
// init, main (annotated), main (exit code), wait, sidecars
for _, ctr := range pod.Status.InitContainerStatuses {
// Virtual Kubelet environment will not set the terminate on waiting container
// https://github.com/argoproj/argo-workflows/issues/3879
// https://github.com/virtual-kubelet/virtual-kubelet/blob/7f2a02291530d2df14905702e6d51500dd57640a/node/sync.go#L195-L208
if ctr.State.Waiting != nil {
return wfv1.NodeError, fmt.Sprintf("Pod failed before %s container starts", ctr.Name)
order := func(n string) int {
order, ok := map[string]int{
common.InitContainerName: 0,
common.MainContainerName: 1,
common.WaitContainerName: 2,
}[n]
if ok {
return order
}
if ctr.State.Terminated == nil {
// We should never get here
log.Warnf("Pod %s phase was Failed but %s did not have terminated state", pod.ObjectMeta.Name, ctr.Name)
continue
}
if ctr.State.Terminated.ExitCode == 0 {
continue
}
errMsg := "failed to load artifacts"
for _, msg := range []string{annotatedMsg, ctr.State.Terminated.Message} {
if msg != "" {
errMsg += ": " + msg
break
}
}
// NOTE: we consider artifact load issues as Error instead of Failed
return wfv1.NodeError, errMsg
return 3
}
failMessages := make(map[string]string)
for _, ctr := range pod.Status.ContainerStatuses {

ctrs := append(pod.Status.InitContainerStatuses, pod.Status.ContainerStatuses...)
sort.Slice(ctrs, func(i, j int) bool { return order(ctrs[i].Name) < order(ctrs[j].Name) })

for _, ctr := range ctrs {

// Virtual Kubelet environment will not set the terminate on waiting container
// https://github.com/argoproj/argo-workflows/issues/3879
// https://github.com/virtual-kubelet/virtual-kubelet/blob/7f2a02291530d2df14905702e6d51500dd57640a/node/sync.go#L195-L208

if ctr.State.Waiting != nil {
return wfv1.NodeError, fmt.Sprintf("Pod failed before %s container starts", ctr.Name)
}
if ctr.State.Terminated == nil {
t := ctr.State.Terminated
if t == nil {
// We should never get here
log.Warnf("Pod %s phase was Failed but %s did not have terminated state", pod.ObjectMeta.Name, ctr.Name)
continue
}
if ctr.State.Terminated.ExitCode == 0 {
continue
}
if ctr.State.Terminated.Message == "" && ctr.State.Terminated.Reason == "OOMKilled" {
failMessages[ctr.Name] = ctr.State.Terminated.Reason
continue
}
if ctr.Name == common.WaitContainerName {
errDetails := ""
for _, msg := range []string{annotatedMsg, ctr.State.Terminated.Message} {
if msg != "" {
errDetails = msg
break
}
}
if errDetails == "" {
// executor is expected to annotate a message to the pod upon any errors.
// If we failed to see the annotated message, it is likely the pod ran with
// insufficient privileges. Give a hint to that effect.
errDetails = fmt.Sprintf("verify serviceaccount %s:%s has necessary privileges", pod.ObjectMeta.Namespace, pod.Spec.ServiceAccountName)
}
errMsg := fmt.Sprintf("failed to save outputs: %s", errDetails)
failMessages[ctr.Name] = errMsg
log.Warnf("Pod %s phase was Failed but %s did not have terminated state", pod.Name, ctr.Name)
continue
}
if ctr.State.Terminated.Message != "" {
errMsg := ctr.State.Terminated.Message
if ctr.Name != common.MainContainerName {
errMsg = fmt.Sprintf("sidecar '%s' %s", ctr.Name, errMsg)
}
failMessages[ctr.Name] = errMsg
if t.ExitCode == 0 {
continue
}
errMsg := fmt.Sprintf("failed with exit code %d", ctr.State.Terminated.ExitCode)
if ctr.Name != common.MainContainerName {
if ctr.State.Terminated.ExitCode == 137 || ctr.State.Terminated.ExitCode == 143 {

msg := fmt.Sprintf("exit code %d: %s; %s; %s", t.ExitCode, t.Reason, t.Message, annotatedMsg)

switch ctr.Name {
case common.InitContainerName:
return wfv1.NodeError, msg
case common.MainContainerName:
return wfv1.NodeFailed, msg
case common.WaitContainerName:
// executor is expected to annotate a message to the pod upon any errors.
// If we failed to see the annotated message, it is likely the pod ran with
// insufficient privileges. Give a hint to that effect.
return wfv1.NodeError, fmt.Sprintf("%s; verify serviceaccount %s:%s has necessary privileges", msg, pod.Namespace, pod.Spec.ServiceAccountName)
default:
if t.ExitCode == 137 || t.ExitCode == 143 {
// if the sidecar was SIGKILL'd (exit code 137) assume it was because argoexec
// forcibly killed the container, which we ignore the error for.
// Java code 143 is a normal exit 128 + 15 https://github.com/elastic/elasticsearch/issues/31847
log.Infof("Ignoring %d exit code of sidecar '%s'", ctr.State.Terminated.ExitCode, ctr.Name)
continue
log.Infof("Ignoring %d exit code of container '%s'", t.ExitCode, ctr.Name)
} else {
return wfv1.NodeFailed, msg
}
errMsg = fmt.Sprintf("sidecar '%s' %s", ctr.Name, errMsg)
}
failMessages[ctr.Name] = errMsg
}
if failMsg, ok := failMessages[common.MainContainerName]; ok {
_, ok = failMessages[common.WaitContainerName]
isResourceTemplate := !ok
if isResourceTemplate && annotatedMsg != "" {
// For resource templates, we prefer the annotated message
// over the vanilla exit code 1 error
return wfv1.NodeFailed, annotatedMsg
}
return wfv1.NodeFailed, failMsg
}
if failMsg, ok := failMessages[common.WaitContainerName]; ok {
return wfv1.NodeError, failMsg
}

// If we get here, both the main and wait container succeeded. Iterate the fail messages to
// identify the sidecar which failed and return the message.
for _, failMsg := range failMessages {
return wfv1.NodeFailed, failMsg
}
// If we get here, we have detected that the main/wait containers succeed but the sidecar(s)
// were SIGKILL'd. The executor may have had to forcefully terminate the sidecar (kill -9),
// resulting in a 137 exit code (which we had ignored earlier). If failMessages is empty, it
Expand Down
2 changes: 1 addition & 1 deletion workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5439,7 +5439,7 @@ func TestPodFailureWithContainerOOM(t *testing.T) {
assert.NotNil(t, pod)
nodeStatus, msg := inferFailedReason(&pod)
assert.Equal(t, tt.phase, nodeStatus)
assert.Equal(t, msg, "OOMKilled")
assert.Contains(t, msg, "OOMKilled")
}
}

Expand Down
13 changes: 11 additions & 2 deletions workflow/controller/workflowpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,11 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin
}
addOutputArtifactsVolumes(pod, tmpl)

for i, c := range pod.Spec.Containers {
c.Env = append(c.Env, apiv1.EnvVar{Name: common.EnvVarContainerName, Value: c.Name}) // used to identify the container name of the process
pod.Spec.Containers[i] = c
}

// Set the container template JSON in pod annotations, which executor examines for things like
// artifact location/path.
tmplBytes, err := json.Marshal(tmpl)
Expand Down Expand Up @@ -428,13 +433,13 @@ func substitutePodParams(pod *apiv1.Pod, globalParams common.Parameters, tmpl *w

func (woc *wfOperationCtx) newInitContainer(tmpl *wfv1.Template) apiv1.Container {
ctr := woc.newExecContainer(common.InitContainerName, tmpl)
ctr.Command = []string{"argoexec", "init"}
ctr.Command = []string{"argoexec", "init", "--loglevel", getExecutorLogLevel()}
return *ctr
}

func (woc *wfOperationCtx) newWaitContainer(tmpl *wfv1.Template) (*apiv1.Container, error) {
ctr := woc.newExecContainer(common.WaitContainerName, tmpl)
ctr.Command = []string{"argoexec", "wait"}
ctr.Command = []string{"argoexec", "wait", "--loglevel", getExecutorLogLevel()}
switch woc.controller.GetContainerRuntimeExecutor() {
case common.ContainerRuntimeExecutorPNS:
ctr.SecurityContext = &apiv1.SecurityContext{
Expand All @@ -459,6 +464,10 @@ func (woc *wfOperationCtx) newWaitContainer(tmpl *wfv1.Template) (*apiv1.Contain
return ctr, nil
}

func getExecutorLogLevel() string {
return log.GetLevel().String()
}

// hasPrivilegedContainers tests if the main container or sidecars is privileged
func hasPrivilegedContainers(tmpl *wfv1.Template) bool {
if containerIsPrivileged(tmpl.Container) {
Expand Down
Loading

0 comments on commit 2ff4db1

Please sign in to comment.