refactor: Remove the need for pod annotations to be mounted as a volume !!POC!! #5950

Closed · wants to merge 4 commits · Changes from 1 commit
ok
Signed-off-by: Alex Collins <[email protected]>
alexec committed May 19, 2021

commit 2ca715f198ffabf9d2661c13598a37fd40a68b6c
19 changes: 15 additions & 4 deletions cmd/argoexec/commands/root.go
@@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"os"
"time"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"

@@ -98,8 +99,11 @@ func initExecutor() *executor.WorkflowExecutor {
}

tmpl := &wfv1.Template{}
tmplString := os.Getenv(common.EnvVarTemplate)
checkErr(json.Unmarshal([]byte(tmplString), tmpl))
checkErr(json.Unmarshal([]byte(os.Getenv(common.EnvVarTemplate)), tmpl))

includeScriptOutput := os.Getenv(common.EnvVarIncludeScriptOutput) == "true"
deadline, err := time.Parse(time.RFC3339, os.Getenv(common.EnvVarDeadline))
checkErr(err)

var cre executor.ContainerRuntimeExecutor
switch executorType {
@@ -116,8 +120,15 @@ func initExecutor() *executor.WorkflowExecutor {
}
checkErr(err)

wfExecutor := executor.NewExecutor(clientset, restClient, podName, namespace, cre, *tmpl)
log.Infof("Executor (version: %s, build_date: %s) initialized (pod: %s/%s) with template:\n%s", version.Version, version.BuildDate, namespace, podName, tmplString)
wfExecutor := executor.NewExecutor(clientset, restClient, podName, namespace, cre, *tmpl, includeScriptOutput, deadline)
log.
WithField("version", version.String()).
WithField("namespace", namespace).
WithField("podName", podName).
WithField("template", wfv1.MustMarshallJSON(template)).
WithField("includeScriptOutput", includeScriptOutput).
WithField("deadline", deadline).
Info("Executor initialized")
return &wfExecutor
}
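
With this change the executor's entire configuration arrives through environment variables set by the controller (ARGO_TEMPLATE, ARGO_DEADLINE, ARGO_INCLUDE_SCRIPT_OUTPUT), so the annotations volume is no longer needed at startup. A minimal, self-contained sketch of that contract, assuming the controller has set all three variables (the Template struct and loadConfig helper are illustrative, not part of the PR):

package main

import (
	"encoding/json"
	"fmt"
	"os"
	"time"
)

// Template stands in for wfv1.Template to keep the sketch self-contained.
type Template struct {
	Name string `json:"name,omitempty"`
}

type executorConfig struct {
	tmpl                Template
	includeScriptOutput bool
	deadline            time.Time
}

func loadConfig() (*executorConfig, error) {
	cfg := &executorConfig{}
	// ARGO_TEMPLATE replaces the template annotation mounted via the Downward API.
	if err := json.Unmarshal([]byte(os.Getenv("ARGO_TEMPLATE")), &cfg.tmpl); err != nil {
		return nil, err
	}
	// ARGO_INCLUDE_SCRIPT_OUTPUT already existed as an env var; it is now parsed once at startup.
	cfg.includeScriptOutput = os.Getenv("ARGO_INCLUDE_SCRIPT_OUTPUT") == "true"
	// ARGO_DEADLINE replaces the deadline carried in the execution-control annotation.
	deadline, err := time.Parse(time.RFC3339, os.Getenv("ARGO_DEADLINE"))
	if err != nil {
		return nil, err
	}
	cfg.deadline = deadline
	return cfg, nil
}

func main() {
	cfg, err := loadConfig()
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Printf("template=%s deadline=%s includeScriptOutput=%v\n",
		cfg.tmpl.Name, cfg.deadline.Format(time.RFC3339), cfg.includeScriptOutput)
}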

6 changes: 5 additions & 1 deletion workflow/common/common.go
@@ -30,13 +30,14 @@ const (
AnnotationKeyRBACRulePrecedence = workflow.WorkflowFullName + "/rbac-rule-precedence"

// AnnotationKeyTemplate is the pod metadata annotation key containing the container template as JSON
// DEPRECATED: in v3.2
// DEPRECATED: only for legacy pods
AnnotationKeyTemplate = workflow.WorkflowFullName + "/template"
// AnnotationKeyOutputs is the pod metadata annotation key containing the container outputs
AnnotationKeyOutputs = workflow.WorkflowFullName + "/outputs"
// AnnotationKeyExecutionControl is the pod metadata annotation key containing execution control parameters
// set by the controller and obeyed by the executor. For example, the controller will use this annotation to
// signal the executors of daemoned containers that they should terminate.
// DEPRECATED: only for legacy pods
AnnotationKeyExecutionControl = workflow.WorkflowFullName + "/execution"
// AnnotationKeyCronWfScheduledTime is the workflow metadata annotation key containing the time when the workflow
// was scheduled to run by CronWorkflow.
@@ -98,6 +99,8 @@ const (
EnvVarPodName = "ARGO_POD_NAME"
// EnvVarContainerName contains the container's name for the current pod
EnvVarContainerName = "ARGO_CONTAINER_NAME"
// EnvVarDeadline is the deadline for the pod
EnvVarDeadline = "ARGO_DEADLINE"
// EnvVarIncludeScriptOutput captures the stdout and stderr
EnvVarIncludeScriptOutput = "ARGO_INCLUDE_SCRIPT_OUTPUT"
// EnvVarTemplate is the template
@@ -177,6 +180,7 @@ const (
var GlobalVarValidWorkflowVariablePrefix = []string{"item.", "steps.", "inputs.", "outputs.", "pod.", "workflow.", "tasks."}

// ExecutionControl contains execution control parameters for executor to decide how to execute the container
// DEPRECATED: only for legacy pods
type ExecutionControl struct {
// Deadline is a max timestamp in which an executor can run the container before terminating it
// It is used to signal the executor to terminate a daemoned container. In the future it will be
17 changes: 16 additions & 1 deletion workflow/controller/controller.go
@@ -457,10 +457,25 @@ func (wfc *WorkflowController) processNextPodCleanupItem(ctx context.Context) bo
err := func() error {
pods := wfc.kubeclientset.CoreV1().Pods(namespace)
switch action {
case deadlineExceeded:
case shutdownPod:
// to shut down a pod, we signal the wait container to terminate; the wait container in turn will
// kill the main container (using whatever mechanism the executor uses), and will then exit itself
// once the main container has exited
if err := signal.SignalContainer(wfc.restConfig, namespace, podName, common.WaitContainerName, syscall.SIGTERM); err != nil {
return err
}
// legacy pods must have their annotations updated; it can take several minutes for this to propagate,
// but that delay must be traded against the fact that these are legacy pods, and this code should be removed at some point
if err := common.AddPodAnnotation(
ctx,
wfc.kubeclientset,
podName,
namespace,
common.AnnotationKeyExecutionControl,
wfv1.MustMarshallJSON(common.ExecutionControl{Deadline: &time.Time{}}),
); err != nil {
return err
}
case terminateContainers:
if terminationGracePeriod, err := wfc.signalContainers(namespace, podName, syscall.SIGTERM); err != nil {
return err
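
For legacy pods the controller still writes the execution-control annotation, and the value it writes is an ExecutionControl with a zero-value deadline, which a legacy executor interprets as "terminate now". A sketch of the resulting payload (the json tag on the mirrored struct is an assumption about the wire format, not taken from the PR):

package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// executionControl mirrors common.ExecutionControl for illustration only.
type executionControl struct {
	Deadline *time.Time `json:"deadline,omitempty"`
}

func main() {
	b, _ := json.Marshal(executionControl{Deadline: &time.Time{}})
	// The zero-value deadline tells a legacy executor to terminate immediately:
	// {"deadline":"0001-01-01T00:00:00Z"}
	fmt.Println(string(b))
}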
2 changes: 1 addition & 1 deletion workflow/controller/dag.go
@@ -215,7 +215,7 @@ func (woc *wfOperationCtx) executeDAG(ctx context.Context, nodeName string, tmpl

defer func() {
if woc.wf.Status.Nodes[node.ID].Fulfilled() {
_ = woc.killDaemonedChildren(ctx, node.ID)
woc.killDaemonedChildren(node.ID)
}
}()

103 changes: 10 additions & 93 deletions workflow/controller/exec_control.go
@@ -2,29 +2,27 @@ package controller

import (
"context"
"encoding/json"
"fmt"
"sync"
"time"

apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/argoproj/argo-workflows/v3/errors"
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/workflow/common"
)

// applyExecutionControl will ensure a pod's execution control annotation is up-to-date
// kills any pending pods when the workflow has reached its deadline
func (woc *wfOperationCtx) applyExecutionControl(ctx context.Context, pod *apiv1.Pod, wfNodesLock *sync.RWMutex) error {
func (woc *wfOperationCtx) applyExecutionControl(ctx context.Context, pod *apiv1.Pod, wfNodesLock *sync.RWMutex) {
if pod == nil {
return nil
return
}
switch pod.Status.Phase {
case apiv1.PodSucceeded, apiv1.PodFailed:
// Skip any pods that are already completed
return nil
return
case apiv1.PodPending:
// Check if we are currently shutting down
if woc.GetShutdownStrategy().Enabled() {
@@ -39,7 +37,7 @@ func (woc *wfOperationCtx) applyExecutionControl(ctx context.Context, pod *apiv1
defer wfNodesLock.Unlock()
node := woc.wf.Status.Nodes[pod.Name]
woc.markNodePhase(node.Name, wfv1.NodeFailed, fmt.Sprintf("workflow shutdown with strategy: %s", woc.GetShutdownStrategy()))
return nil
return
}
// If we fail to delete the pod, fall back to setting the annotation
woc.log.Warnf("Failed to delete %s/%s: %v", pod.Namespace, pod.Name, err)
@@ -58,115 +56,34 @@ func (woc *wfOperationCtx) applyExecutionControl(ctx context.Context, pod *apiv1
defer wfNodesLock.Unlock()
node := woc.wf.Status.Nodes[pod.Name]
woc.markNodePhase(node.Name, wfv1.NodeFailed, "Step exceeded its deadline")
return nil
return
}
// If we fail to delete the pod, fall back to setting the annotation
woc.log.Warnf("Failed to delete %s/%s: %v", pod.Namespace, pod.Name, err)
}
}
}

var podExecCtl common.ExecutionControl
if execCtlStr, ok := pod.Annotations[common.AnnotationKeyExecutionControl]; ok && execCtlStr != "" {
err := json.Unmarshal([]byte(execCtlStr), &podExecCtl)
if err != nil {
woc.log.Warnf("Failed to unmarshal execution control from pod %s", pod.Name)
if woc.GetShutdownStrategy().Enabled() {
if _, onExitPod := pod.Labels[common.LabelKeyOnExit]; !woc.GetShutdownStrategy().ShouldExecute(onExitPod) {
woc.queuePodForCleanup(pod.Name, shutdownPod)
}
}

for _, c := range woc.findTemplate(pod).GetMainContainerNames() {
if woc.GetShutdownStrategy().Enabled() {
if _, onExitPod := pod.Labels[common.LabelKeyOnExit]; !woc.GetShutdownStrategy().ShouldExecute(onExitPod) {
podExecCtl.Deadline = &time.Time{}
woc.log.Infof("Applying shutdown deadline for pod %s", pod.Name)
return woc.updateExecutionControl(ctx, pod.Name, podExecCtl, c)
}
}

if woc.workflowDeadline != nil {
if podExecCtl.Deadline == nil || woc.workflowDeadline.Before(*podExecCtl.Deadline) {
podExecCtl.Deadline = woc.workflowDeadline
woc.log.Infof("Applying sooner Workflow Deadline for pod %s at: %v", pod.Name, woc.workflowDeadline)
return woc.updateExecutionControl(ctx, pod.Name, podExecCtl, c)
}
}
}

return nil
}

// killDaemonedChildren kill any daemoned pods of a steps or DAG template node.
func (woc *wfOperationCtx) killDaemonedChildren(ctx context.Context, nodeID string) error {
func (woc *wfOperationCtx) killDaemonedChildren(nodeID string) {
woc.log.Infof("Checking daemoned children of %s", nodeID)
var firstErr error
execCtl := common.ExecutionControl{
Deadline: &time.Time{},
}
for _, childNode := range woc.wf.Status.Nodes {
if childNode.BoundaryID != nodeID {
continue
}
if childNode.Daemoned == nil || !*childNode.Daemoned {
continue
}
err := woc.updateExecutionControl(ctx, childNode.ID, execCtl, common.WaitContainerName)
if err != nil {
woc.log.Errorf("Failed to update execution control of node %s: %+v", childNode.ID, err)
if firstErr == nil {
firstErr = err
}
}
woc.queuePodForCleanup(childNode.ID, shutdownPod)
childNode.Phase = wfv1.NodeSucceeded
childNode.Daemoned = nil
woc.wf.Status.Nodes[childNode.ID] = childNode
woc.updated = true
}
return firstErr
}

// updateExecutionControl updates the execution control parameters
func (woc *wfOperationCtx) updateExecutionControl(ctx context.Context, podName string, execCtl common.ExecutionControl, containerName string) error {
execCtlBytes, err := json.Marshal(execCtl)
if err != nil {
return errors.InternalWrapError(err)
}

woc.log.Infof("Updating execution control of %s: %s", podName, execCtlBytes)
err = common.AddPodAnnotation(
ctx,
woc.controller.kubeclientset,
podName,
woc.wf.ObjectMeta.Namespace,
common.AnnotationKeyExecutionControl,
string(execCtlBytes),
)
if err != nil {
return err
}

// Ideally we would simply annotate the pod with the updates and be done with it, allowing
// the executor to notice the updates naturally via the Downward API annotations volume
// mounted file. However, updates to the Downward API volumes take a very long time to
// propagate (minutes). The following code fast-tracks this by signaling the executor
// using SIGUSR2 that something changed.
woc.log.Infof("Signalling %s of updates", podName)
exec, err := common.ExecPodContainer(
woc.controller.restConfig, woc.wf.ObjectMeta.Namespace, podName,
containerName, true, true, "sh", "-c", "kill -s USR2 $(pidof argoexec)",
)
if err != nil {
return err
}
go func() {
// This call is necessary to actually send the exec. Since signalling is best effort,
// it is launched as a goroutine and the error is discarded
_, _, err = common.GetExecutorOutput(exec)
if err != nil {
woc.log.Warnf("Signal command failed: %v", err)
return
}
woc.log.Infof("Signal of %s (%s) successfully issued", podName, common.WaitContainerName)
}()

return nil
}
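
updateExecutionControl (annotate the pod, then exec `kill -s USR2 $(pidof argoexec)` into it) is gone; killDaemonedChildren now simply queues each daemoned pod for the shutdownPod cleanup action and marks its node succeeded, with no error to propagate. A channel-based sketch of that producer/worker shape — the real controller uses its existing pod-cleanup workqueue, so the names and queue type here are illustrative:

package main

import "fmt"

type podCleanupAction string

const shutdownPod podCleanupAction = "shutdownPod"

type cleanupItem struct {
	namespace, podName string
	action             podCleanupAction
}

func main() {
	queue := make(chan cleanupItem, 16)

	// Producer side: killDaemonedChildren just queues the pod for shutdown.
	queue <- cleanupItem{"argo", "daemoned-pod-1", shutdownPod}
	close(queue)

	// Worker side: processNextPodCleanupItem pops items and, for shutdownPod,
	// signals the wait container with SIGTERM (and updates legacy annotations).
	for item := range queue {
		fmt.Printf("cleanup %s/%s: %s\n", item.namespace, item.podName, item.action)
	}
}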
3 changes: 1 addition & 2 deletions workflow/controller/exec_control_test.go
@@ -1,7 +1,6 @@
package controller

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
@@ -28,6 +27,6 @@ func TestKillDaemonChildrenUnmarkPod(t *testing.T) {

assert.NotNil(t, woc.wf.Status.Nodes["a"].Daemoned)
// Error will be that it cannot find the pod, but we only care about the node status for this test
_ = woc.killDaemonedChildren(context.Background(), "a")
woc.killDaemonedChildren("a")
assert.Nil(t, woc.wf.Status.Nodes["a"].Daemoned)
}
18 changes: 13 additions & 5 deletions workflow/controller/operator.go
@@ -173,7 +173,7 @@ func newWorkflowOperationCtx(wf *wfv1.Workflow, wfc *WorkflowController) *wfOper
func (woc *wfOperationCtx) operate(ctx context.Context) {
defer func() {
if woc.wf.Status.Fulfilled() {
_ = woc.killDaemonedChildren(ctx, "")
woc.killDaemonedChildren("")
}
woc.persistUpdates(ctx)
}()
@@ -933,10 +933,7 @@ func (woc *wfOperationCtx) podReconciliation(ctx context.Context) error {
go func(pod *apiv1.Pod) {
defer wg.Done()
performAssessment(pod)
err = woc.applyExecutionControl(ctx, pod, wfNodesLock)
if err != nil {
woc.log.Warnf("Failed to apply execution control to pod %s", pod.Name)
}
woc.applyExecutionControl(ctx, pod, wfNodesLock)
<-parallelPodNum
}(pod)
}
@@ -1206,6 +1203,17 @@ func getPodTemplate(pod *apiv1.Pod) (*wfv1.Template, error) {
return nil, fmt.Errorf("not found")
}

func getPodDeadline(pod *apiv1.Pod) (time.Time, error) {
for _, c := range pod.Spec.Containers {
for _, e := range c.Env {
if e.Name == common.EnvVarDeadline {
return time.Parse(time.RFC3339, e.Value)
}
}
}
return time.Time{}, fmt.Errorf("not found")
}

func getExitCode(pod *apiv1.Pod) *int32 {
for _, c := range pod.Status.ContainerStatuses {
if c.Name == common.MainContainerName && c.State.Terminated != nil {
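
getPodDeadline lets the controller read back the deadline it injected, by scanning container env vars for ARGO_DEADLINE. A usage sketch against a hand-built pod fixture (the local copy of the function exists only to keep the sketch self-contained):

package main

import (
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
)

func getPodDeadline(pod *corev1.Pod) (time.Time, error) {
	for _, c := range pod.Spec.Containers {
		for _, e := range c.Env {
			if e.Name == "ARGO_DEADLINE" {
				return time.Parse(time.RFC3339, e.Value)
			}
		}
	}
	return time.Time{}, fmt.Errorf("not found")
}

func main() {
	pod := &corev1.Pod{
		Spec: corev1.PodSpec{
			Containers: []corev1.Container{{
				Name: "wait",
				Env:  []corev1.EnvVar{{Name: "ARGO_DEADLINE", Value: "2021-05-19T12:00:00Z"}},
			}},
		},
	}
	deadline, err := getPodDeadline(pod)
	fmt.Println(deadline, err) // 2021-05-19 12:00:00 +0000 UTC <nil>
}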
4 changes: 2 additions & 2 deletions workflow/controller/operator_test.go
@@ -5660,8 +5660,8 @@ func TestWFWithRetryAndWithParam(t *testing.T) {
ctrs := pods.Items[0].Spec.Containers
assert.Len(t, ctrs, 2)
envs := ctrs[1].Env
assert.Len(t, envs, 3)
assert.Equal(t, apiv1.EnvVar{Name: "ARGO_INCLUDE_SCRIPT_OUTPUT", Value: "true"}, envs[2])
assert.Len(t, envs, 4)
assert.Equal(t, apiv1.EnvVar{Name: "ARGO_INCLUDE_SCRIPT_OUTPUT", Value: "true"}, envs[3])
}
})
}
2 changes: 1 addition & 1 deletion workflow/controller/pod_cleanup_key.go
@@ -16,7 +16,7 @@ type (

const (
deletePod podCleanupAction = "deletePod"
deadlineExceeded podCleanupAction = "deadlineExceeded"
shutdownPod podCleanupAction = "shutdownPod"
labelPodCompleted podCleanupAction = "labelPodCompleted"
terminateContainers podCleanupAction = "terminateContainers"
killContainers podCleanupAction = "killContainers"
2 changes: 1 addition & 1 deletion workflow/controller/steps.go
@@ -40,7 +40,7 @@ func (woc *wfOperationCtx) executeSteps(ctx context.Context, nodeName string, tm

defer func() {
if woc.wf.Status.Nodes[node.ID].Fulfilled() {
_ = woc.killDaemonedChildren(ctx, node.ID)
woc.killDaemonedChildren(node.ID)
}
}()

52 changes: 21 additions & 31 deletions workflow/controller/workflowpod.go
@@ -250,7 +250,7 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin
}

addSchedulingConstraints(pod, wfSpec, tmpl)
woc.addMetadata(pod, tmpl, opts)
woc.addMetadata(pod, tmpl)

err = addVolumeReferences(pod, woc.volumes, tmpl, woc.wf.Status.PersistentVolumeClaims)
if err != nil {
@@ -297,18 +297,29 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin
}
}

deadline := time.Time{}
if woc.workflowDeadline != nil {
deadline = *woc.workflowDeadline
}
if !opts.executionDeadline.IsZero() && (deadline.IsZero() || opts.executionDeadline.Before(deadline)) {
deadline = opts.executionDeadline
}

// add standard environment variables, making pod spec larger
envVars := []apiv1.EnvVar{
{Name: common.EnvVarTemplate, Value: wfv1.MustMarshallJSON(tmpl)},
{Name: common.EnvVarDeadline, Value: deadline.Format(time.RFC3339)},
{Name: common.EnvVarIncludeScriptOutput, Value: strconv.FormatBool(opts.includeScriptOutput)},
}

for i, c := range pod.Spec.InitContainers {
c.Env = append(c.Env,
apiv1.EnvVar{Name: common.EnvVarTemplate, Value: wfv1.MustMarshallJSON(tmpl)},
)
c.Env = append(c.Env, apiv1.EnvVar{Name: common.EnvVarContainerName, Value: c.Name})
c.Env = append(c.Env, envVars...)
pod.Spec.InitContainers[i] = c
}
for i, c := range pod.Spec.Containers {
c.Env = append(c.Env,
apiv1.EnvVar{Name: common.EnvVarContainerName, Value: c.Name},
apiv1.EnvVar{Name: common.EnvVarTemplate, Value: wfv1.MustMarshallJSON(tmpl)},
apiv1.EnvVar{Name: common.EnvVarIncludeScriptOutput, Value: strconv.FormatBool(opts.includeScriptOutput)},
)
c.Env = append(c.Env, apiv1.EnvVar{Name: common.EnvVarContainerName, Value: c.Name})
c.Env = append(c.Env, envVars...)
pod.Spec.Containers[i] = c
}
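
The controller now computes a single effective deadline and serializes it into ARGO_DEADLINE: the sooner of the workflow deadline and the step's execution deadline, with the zero time meaning "no deadline" (note the guard above keeps a zero execution deadline from clobbering a real workflow deadline, matching the check in the old addMetadata code). A minimal standalone sketch of that selection, with an illustrative function name:

package main

import (
	"fmt"
	"time"
)

// effectiveDeadline mirrors the selection in createWorkflowPod: start from the
// workflow deadline (if any) and let a sooner, non-zero execution deadline win.
func effectiveDeadline(workflowDeadline *time.Time, executionDeadline time.Time) time.Time {
	deadline := time.Time{}
	if workflowDeadline != nil {
		deadline = *workflowDeadline
	}
	// A zero execution deadline means "none"; it must not override the workflow deadline.
	if !executionDeadline.IsZero() && (deadline.IsZero() || executionDeadline.Before(deadline)) {
		deadline = executionDeadline
	}
	return deadline
}

func main() {
	wf := time.Date(2021, 5, 19, 12, 0, 0, 0, time.UTC)
	step := wf.Add(-time.Hour) // sooner than the workflow deadline, so it wins
	fmt.Println(effectiveDeadline(&wf, step).Format(time.RFC3339)) // 2021-05-19T11:00:00Z
}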

@@ -617,7 +628,7 @@ func isResourcesSpecified(ctr *apiv1.Container) bool {
}

// addMetadata applies metadata specified in the template
func (woc *wfOperationCtx) addMetadata(pod *apiv1.Pod, tmpl *wfv1.Template, opts *createWorkflowPodOpts) {
func (woc *wfOperationCtx) addMetadata(pod *apiv1.Pod, tmpl *wfv1.Template) {
if woc.wf.Spec.PodMetadata != nil {
// add workflow-level pod annotations and labels
for k, v := range woc.wf.Spec.PodMetadata.Annotations {
@@ -634,27 +645,6 @@ func (woc *wfOperationCtx) addMetadata(pod *apiv1.Pod, tmpl *wfv1.Template, opts
for k, v := range tmpl.Metadata.Labels {
pod.ObjectMeta.Labels[k] = v
}

execCtl := common.ExecutionControl{}

if woc.workflowDeadline != nil {
execCtl.Deadline = woc.workflowDeadline
}

// If we're passed down an executionDeadline, only set it if there isn't one set already, or if it's earlier than
// the one already set.
if !opts.executionDeadline.IsZero() && (execCtl.Deadline == nil || opts.executionDeadline.Before(*execCtl.Deadline)) {
execCtl.Deadline = &opts.executionDeadline
}

if execCtl.Deadline != nil || opts.includeScriptOutput {
execCtlBytes, err := json.Marshal(execCtl)
if err != nil {
panic(err)
}

pod.ObjectMeta.Annotations[common.AnnotationKeyExecutionControl] = string(execCtlBytes)
}
}

// addSchedulingConstraints applies any node selectors or affinity rules to the pod, either set in the workflow or the template
10 changes: 4 additions & 6 deletions workflow/controller/workflowpod_test.go
@@ -2,7 +2,6 @@ package controller

import (
"context"
"encoding/json"
"fmt"
"path/filepath"
"testing"
@@ -1350,14 +1349,13 @@ func TestPropagateMaxDuration(t *testing.T) {
// Ensure that volume mount is added when artifact is provided
tmpl := unmarshalTemplate(propagateMaxDuration)
woc := newWoc()
deadline := time.Now()
deadline := time.Time{}.Add(time.Second)
ctx := context.Background()
pod, err := woc.createWorkflowPod(ctx, tmpl.Name, []apiv1.Container{*tmpl.Container}, tmpl, &createWorkflowPodOpts{executionDeadline: deadline})
assert.NoError(t, err)
out, err := json.Marshal(map[string]time.Time{"deadline": deadline})
if assert.NoError(t, err) {
assert.Equal(t, string(out), pod.Annotations[common.AnnotationKeyExecutionControl])
}
v, err := getPodDeadline(pod)
assert.NoError(t, err)
assert.Equal(t, v, deadline)
}

var wfWithPodMetadata = `
74 changes: 46 additions & 28 deletions workflow/executor/executor.go
@@ -16,10 +16,9 @@ import (
"path/filepath"
"runtime/debug"
"strings"
"syscall"
"time"

os_specific "github.com/argoproj/argo-workflows/v3/workflow/executor/os-specific"

argofile "github.com/argoproj/pkg/file"
log "github.com/sirupsen/logrus"
apiv1 "k8s.io/api/core/v1"
@@ -62,12 +61,14 @@ const (

// WorkflowExecutor is the program that runs as the init/wait container
type WorkflowExecutor struct {
PodName string
Template wfv1.Template
ClientSet kubernetes.Interface
RESTClient rest.Interface
Namespace string
RuntimeExecutor ContainerRuntimeExecutor
PodName string
Template wfv1.Template
includeScriptOutput bool
deadline time.Time
ClientSet kubernetes.Interface
RESTClient rest.Interface
Namespace string
RuntimeExecutor ContainerRuntimeExecutor

// memoized configmaps
memoizedConfigMaps map[string]string
@@ -107,17 +108,19 @@ type ContainerRuntimeExecutor interface {
}

// NewExecutor instantiates a new workflow executor
func NewExecutor(clientset kubernetes.Interface, restClient rest.Interface, podName, namespace string, cre ContainerRuntimeExecutor, template wfv1.Template) WorkflowExecutor {
func NewExecutor(clientset kubernetes.Interface, restClient rest.Interface, podName, namespace string, cre ContainerRuntimeExecutor, template wfv1.Template, includeScriptOutput bool, deadline time.Time) WorkflowExecutor {
return WorkflowExecutor{
PodName: podName,
ClientSet: clientset,
RESTClient: restClient,
Namespace: namespace,
RuntimeExecutor: cre,
Template: template,
memoizedConfigMaps: map[string]string{},
memoizedSecrets: map[string][]byte{},
errors: []error{},
PodName: podName,
ClientSet: clientset,
RESTClient: restClient,
Namespace: namespace,
RuntimeExecutor: cre,
Template: template,
includeScriptOutput: includeScriptOutput,
deadline: deadline,
memoizedConfigMaps: map[string]string{},
memoizedSecrets: map[string][]byte{},
errors: []error{},
}
}

@@ -671,7 +674,7 @@ func (we *WorkflowExecutor) GetTerminationGracePeriodDuration(ctx context.Contex

// CaptureScriptResult will add the stdout of a script template as output result
func (we *WorkflowExecutor) CaptureScriptResult(ctx context.Context) error {
if os.Getenv(common.EnvVarIncludeScriptOutput) == "false" {
if !we.includeScriptOutput {
log.Infof("No Script output reference in workflow. Capturing script output ignored")
return nil
}
@@ -927,27 +930,42 @@ func (we *WorkflowExecutor) Wait(ctx context.Context) error {
// monitorDeadline checks to see if we exceeded the deadline for the step and
// terminates the main container if we did
func (we *WorkflowExecutor) monitorDeadline(ctx context.Context, containerNames []string) {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, os_specific.GetOsSignal())
terminate := make(chan os.Signal, 1)
signal.Notify(terminate, syscall.SIGTERM)

deadlineExceeded := make(chan bool, 1)
if !we.deadline.IsZero() {
t := time.AfterFunc(time.Until(we.deadline), func() {
deadlineExceeded <- true
})
defer t.Stop()
}

log.Infof("Starting deadline monitor")
for {
select {
case <-ctx.Done():
log.Info("Deadline monitor stopped")
return
case <-sigs:
util.WriteTeriminateMessage("terminated or step exceeded its deadline")
log.Infof("Killing main container")
terminationGracePeriodDuration, _ := we.GetTerminationGracePeriodDuration(ctx)
if err := we.RuntimeExecutor.Kill(ctx, containerNames, terminationGracePeriodDuration); err != nil {
log.Warnf("Failed to kill main container: %v", err)
}
case <-deadlineExceeded:
we.killMainContainer(ctx, containerNames, "Step exceeded its deadline")
return
case <-terminate:
we.killMainContainer(ctx, containerNames, "terminated")
return
}
}
}

func (we *WorkflowExecutor) killMainContainer(ctx context.Context, containerNames []string, message string) {
log.Infof("Killing main container: %s", message)
util.WriteTeriminateMessage(message)
terminationGracePeriodDuration, _ := we.GetTerminationGracePeriodDuration(ctx)
if err := we.RuntimeExecutor.Kill(ctx, containerNames, terminationGracePeriodDuration); err != nil {
log.Warnf("Failed to kill main container: %v", err)
}
}

// KillSidecars kills any sidecars to the main container
func (we *WorkflowExecutor) KillSidecars(ctx context.Context) error {
containerNames, err := we.RuntimeExecutor.ListContainerNames(ctx)
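
monitorDeadline no longer listens for SIGUSR2 and re-reads an annotations file; it arms a timer for the deadline passed in at startup and treats SIGTERM as the controller-initiated shutdown signal. A runnable sketch of the same select loop, with a one-second deadline standing in for ARGO_DEADLINE and prints standing in for killing the main container:

package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

func monitor(ctx context.Context, deadline time.Time) {
	terminate := make(chan os.Signal, 1)
	signal.Notify(terminate, syscall.SIGTERM)

	deadlineExceeded := make(chan bool, 1)
	if !deadline.IsZero() {
		// Arm a local timer instead of waiting for a signal from the controller.
		t := time.AfterFunc(time.Until(deadline), func() { deadlineExceeded <- true })
		defer t.Stop()
	}

	select {
	case <-ctx.Done():
		fmt.Println("monitor stopped")
	case <-deadlineExceeded:
		fmt.Println("kill main container: step exceeded its deadline")
	case <-terminate:
		fmt.Println("kill main container: terminated")
	}
}

func main() {
	monitor(context.Background(), time.Now().Add(time.Second))
}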
4 changes: 0 additions & 4 deletions workflow/executor/os-specific/signal_darwin.go
@@ -5,10 +5,6 @@ import (
"syscall"
)

func GetOsSignal() os.Signal {
return syscall.SIGUSR2
}

func IsSIGCHLD(s os.Signal) bool { return s == syscall.SIGCHLD }

func Kill(pid int, s syscall.Signal) error {
4 changes: 0 additions & 4 deletions workflow/executor/os-specific/signal_linux.go
@@ -5,10 +5,6 @@ import (
"syscall"
)

func GetOsSignal() os.Signal {
return syscall.SIGUSR2
}

func IsSIGCHLD(s os.Signal) bool { return s == syscall.SIGCHLD }

func Kill(pid int, s syscall.Signal) error {
4 changes: 0 additions & 4 deletions workflow/executor/os-specific/signal_windows.go
@@ -5,10 +5,6 @@ import (
"syscall"
)

func GetOsSignal() os.Signal {
return syscall.SIGINT
}

func IsSIGCHLD(s os.Signal) bool {
return false // this does not exist on windows
}
10 changes: 7 additions & 3 deletions workflow/signal/signal.go
@@ -2,16 +2,20 @@ package signal

import (
"fmt"
"syscall"
"os"

log "github.com/sirupsen/logrus"
"k8s.io/client-go/rest"

"github.com/argoproj/argo-workflows/v3/workflow/common"
)

func SignalContainer(restConfig *rest.Config, namespace string, pod string, container string, s syscall.Signal) error {
return ExecPodContainerAndGetOutput(restConfig, namespace, pod, container, "/bin/sh", "-c", fmt.Sprintf("kill -s%d -- -1", s))
func SignalContainer(restConfig *rest.Config, namespace string, pod string, container string, s os.Signal) error {
command := fmt.Sprintf("kill -s%d -- -1", s)
if container == "wait" {
command = fmt.Sprintf("kill -s %d $(pidof argoexec)", s)
}
return ExecPodContainerAndGetOutput(restConfig, namespace, pod, container, "sh", "-c", command)
}

func ExecPodContainerAndGetOutput(restConfig *rest.Config, namespace string, pod string, container string, command ...string) error {
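
SignalContainer now builds one of two shell commands: a process-group-wide kill for ordinary containers, or a targeted kill of the argoexec process when signalling the hard-coded "wait" container (whose executor treats SIGTERM as its shutdown trigger). A sketch printing both forms for SIGTERM:

package main

import (
	"fmt"
	"syscall"
)

func main() {
	s := syscall.SIGTERM
	// Ordinary containers: broadcast to the whole process group.
	fmt.Printf("kill -s%d -- -1\n", s) // kill -s15 -- -1
	// The wait container: signal argoexec directly so it runs its shutdown path.
	fmt.Printf("kill -s %d $(pidof argoexec)\n", s) // kill -s 15 $(pidof argoexec)
}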