Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Remove the need for pod annotations to be mounted as a volume #6022

Merged
merged 14 commits into from
Jun 22, 2021
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 14 additions & 11 deletions cmd/argoexec/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"encoding/json"
"fmt"
"os"
"time"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"

"github.com/argoproj/pkg/cli"
kubecli "github.com/argoproj/pkg/kube/cli"
Expand Down Expand Up @@ -32,11 +35,10 @@ const (
)

var (
clientConfig clientcmd.ClientConfig
logLevel string // --loglevel
glogLevel int // --gloglevel
logFormat string // --log-format
podAnnotationsPath string // --pod-annotations
clientConfig clientcmd.ClientConfig
logLevel string // --loglevel
glogLevel int // --gloglevel
logFormat string // --log-format
)

func init() {
Expand Down Expand Up @@ -66,7 +68,6 @@ func NewRootCommand() *cobra.Command {
command.AddCommand(cmd.NewVersionCmd(CLIName))

clientConfig = kubecli.AddKubectlFlagsToCmd(&command)
command.PersistentFlags().StringVar(&podAnnotationsPath, "pod-annotations", common.PodMetadataAnnotationsPath, "Pod annotations file from k8s downward API")
command.PersistentFlags().StringVar(&logLevel, "loglevel", "info", "Set the logging level. One of: debug|info|warn|error")
command.PersistentFlags().IntVar(&glogLevel, "gloglevel", 0, "Set the glog logging level")
command.PersistentFlags().StringVar(&logFormat, "log-format", "text", "The formatter to use for logs. One of: text|json")
Expand Down Expand Up @@ -97,10 +98,12 @@ func initExecutor() *executor.WorkflowExecutor {
log.Fatalf("Unable to determine pod name from environment variable %s", common.EnvVarPodName)
}

tmpl, err := executor.LoadTemplate(podAnnotationsPath)
checkErr(err)
tmpl := &wfv1.Template{}
checkErr(json.Unmarshal([]byte(os.Getenv(common.EnvVarTemplate)), tmpl))

includeScriptOutput := os.Getenv(common.EnvVarIncludeScriptOutput) == "true"
deadline, err := time.Parse(time.RFC3339, os.Getenv(common.EnvVarDeadline))
checkErr(err)

var cre executor.ContainerRuntimeExecutor
switch executorType {
Expand All @@ -117,14 +120,14 @@ func initExecutor() *executor.WorkflowExecutor {
}
checkErr(err)

wfExecutor := executor.NewExecutor(clientset, restClient, podName, namespace, podAnnotationsPath, cre, *tmpl, includeScriptOutput)
yamlBytes, _ := json.Marshal(&wfExecutor.Template)
wfExecutor := executor.NewExecutor(clientset, restClient, podName, namespace, cre, *tmpl, includeScriptOutput, deadline)
log.
WithField("version", version.String()).
WithField("namespace", namespace).
WithField("podName", podName).
WithField("template", string(yamlBytes)).
WithField("template", wfv1.MustMarshallJSON(&wfExecutor.Template)).
WithField("includeScriptOutput", includeScriptOutput).
WithField("deadline", deadline).
Info("Executor initialized")
return &wfExecutor
}
Expand Down
30 changes: 4 additions & 26 deletions workflow/common/common.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package common

import (
"time"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow"
Expand All @@ -17,16 +15,6 @@ const (
InitContainerName = "init"
WaitContainerName = "wait"

// PodMetadataVolumeName is the volume name defined in a workflow pod spec to expose pod metadata via downward API
PodMetadataVolumeName = "podmetadata"

// PodMetadataAnnotationsVolumePath is volume path for metadata.annotations in the downward API
PodMetadataAnnotationsVolumePath = "annotations"
// PodMetadataMountPath is the directory mount location for DownwardAPI volume containing pod metadata
PodMetadataMountPath = "/argo/" + PodMetadataVolumeName
// PodMetadataAnnotationsPath is the file path containing pod metadata annotations. Examined by executor
PodMetadataAnnotationsPath = PodMetadataMountPath + "/" + PodMetadataAnnotationsVolumePath

// DockerSockVolumeName is the volume name for the /var/run/docker.sock host path volume
DockerSockVolumeName = "docker-sock"

Expand All @@ -39,14 +27,8 @@ const (
AnnotationKeyRBACRule = workflow.WorkflowFullName + "/rbac-rule"
AnnotationKeyRBACRulePrecedence = workflow.WorkflowFullName + "/rbac-rule-precedence"

// AnnotationKeyTemplate is the pod metadata annotation key containing the container template as JSON
AnnotationKeyTemplate = workflow.WorkflowFullName + "/template"
// AnnotationKeyOutputs is the pod metadata annotation key containing the container outputs
AnnotationKeyOutputs = workflow.WorkflowFullName + "/outputs"
// AnnotationKeyExecutionControl is the pod metadata annotation key containing execution control parameters
// set by the controller and obeyed by the executor. For example, the controller will use this annotation to
// signal the executors of daemoned containers that it should terminate.
AnnotationKeyExecutionControl = workflow.WorkflowFullName + "/execution"
// AnnotationKeyCronWfScheduledTime is the workflow metadata annotation key containing the time when the workflow
// was scheduled to run by CronWorkflow.
AnnotationKeyCronWfScheduledTime = workflow.WorkflowFullName + "/scheduled-time"
Expand Down Expand Up @@ -107,8 +89,12 @@ const (
EnvVarPodName = "ARGO_POD_NAME"
// EnvVarContainerName container the container's name for the current pod
EnvVarContainerName = "ARGO_CONTAINER_NAME"
// EnvVarDeadline is the deadline for the pod
EnvVarDeadline = "ARGO_DEADLINE"
// EnvVarIncludeScriptOutput capture the stdout and stderr
EnvVarIncludeScriptOutput = "ARGO_INCLUDE_SCRIPT_OUTPUT"
// EnvVarTemplate is the template
EnvVarTemplate = "ARGO_TEMPLATE"
// EnvVarContainerRuntimeExecutor contains the name of the container runtime executor to use, empty is equal to "docker"
EnvVarContainerRuntimeExecutor = "ARGO_CONTAINER_RUNTIME_EXECUTOR"
// EnvVarDownwardAPINodeIP is the envvar used to get the `status.hostIP`
Expand Down Expand Up @@ -186,14 +172,6 @@ var AnnotationKeyKillCmd = func(containerName string) string { return workflow.W
// GlobalVarWorkflowRootTags is a list of root tags in workflow which could be used for variable reference
var GlobalVarValidWorkflowVariablePrefix = []string{"item.", "steps.", "inputs.", "outputs.", "pod.", "workflow.", "tasks."}

// ExecutionControl contains execution control parameters for executor to decide how to execute the container
type ExecutionControl struct {
// Deadline is a max timestamp in which an executor can run the container before terminating it
// It is used to signal the executor to terminate a daemoned container. In the future it will be
// used to support workflow or steps/dag level timeouts.
Deadline *time.Time `json:"deadline,omitempty"`
}

func UnstructuredHasCompletedLabel(obj interface{}) bool {
if wf, ok := obj.(*unstructured.Unstructured); ok {
return wf.GetLabels()[LabelKeyCompleted] == "true"
Expand Down
31 changes: 0 additions & 31 deletions workflow/controller/container_set_template_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,6 @@ spec:

socket := corev1.HostPathSocket
assert.ElementsMatch(t, []corev1.Volume{
{
Name: "podmetadata",
VolumeSource: corev1.VolumeSource{DownwardAPI: &corev1.DownwardAPIVolumeSource{
Items: []corev1.DownwardAPIVolumeFile{{
Path: "annotations",
FieldRef: &corev1.ObjectFieldSelector{APIVersion: "v1", FieldPath: "metadata.annotations"},
}},
}},
},
{Name: "docker-sock", VolumeSource: corev1.VolumeSource{HostPath: &corev1.HostPathVolumeSource{Path: "/var/run/docker.sock", Type: &socket}}},
{Name: "workspace", VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}}},
}, pod.Spec.Volumes)
Expand All @@ -64,7 +55,6 @@ spec:
switch c.Name {
case common.WaitContainerName:
assert.ElementsMatch(t, []corev1.VolumeMount{
{Name: "podmetadata", MountPath: "/argo/podmetadata"},
{Name: "docker-sock", MountPath: "/var/run/docker.sock", ReadOnly: true},
}, c.VolumeMounts)
case "ctr-0":
Expand Down Expand Up @@ -120,15 +110,6 @@ spec:

socket := corev1.HostPathSocket
assert.ElementsMatch(t, []corev1.Volume{
{
Name: "podmetadata",
VolumeSource: corev1.VolumeSource{DownwardAPI: &corev1.DownwardAPIVolumeSource{
Items: []corev1.DownwardAPIVolumeFile{{
Path: "annotations",
FieldRef: &corev1.ObjectFieldSelector{APIVersion: "v1", FieldPath: "metadata.annotations"},
}},
}},
},
{Name: "docker-sock", VolumeSource: corev1.VolumeSource{HostPath: &corev1.HostPathVolumeSource{Path: "/var/run/docker.sock", Type: &socket}}},
{Name: "workspace", VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}}},
{Name: "input-artifacts", VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}}},
Expand All @@ -137,7 +118,6 @@ spec:
if assert.Len(t, pod.Spec.InitContainers, 1) {
c := pod.Spec.InitContainers[0]
assert.ElementsMatch(t, []corev1.VolumeMount{
{Name: "podmetadata", MountPath: "/argo/podmetadata"},
{Name: "input-artifacts", MountPath: "/argo/inputs/artifacts"},
{Name: "workspace", MountPath: "/mainctrfs/workspace"},
}, c.VolumeMounts)
Expand All @@ -148,7 +128,6 @@ spec:
switch c.Name {
case common.WaitContainerName:
assert.ElementsMatch(t, []corev1.VolumeMount{
{Name: "podmetadata", MountPath: "/argo/podmetadata"},
{Name: "docker-sock", MountPath: "/var/run/docker.sock", ReadOnly: true},
{Name: "workspace", MountPath: "/mainctrfs/workspace"},
{Name: "input-artifacts", MountPath: "/mainctrfs/in/in-0", SubPath: "in-0"},
Expand Down Expand Up @@ -207,15 +186,6 @@ spec:

socket := corev1.HostPathSocket
assert.ElementsMatch(t, []corev1.Volume{
{
Name: "podmetadata",
VolumeSource: corev1.VolumeSource{DownwardAPI: &corev1.DownwardAPIVolumeSource{
Items: []corev1.DownwardAPIVolumeFile{{
Path: "annotations",
FieldRef: &corev1.ObjectFieldSelector{APIVersion: "v1", FieldPath: "metadata.annotations"},
}},
}},
},
{Name: "docker-sock", VolumeSource: corev1.VolumeSource{HostPath: &corev1.HostPathVolumeSource{Path: "/var/run/docker.sock", Type: &socket}}},
{Name: "workspace", VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}}},
}, pod.Spec.Volumes)
Expand All @@ -227,7 +197,6 @@ spec:
switch c.Name {
case common.WaitContainerName:
assert.ElementsMatch(t, []corev1.VolumeMount{
{Name: "podmetadata", MountPath: "/argo/podmetadata"},
{Name: "docker-sock", MountPath: "/var/run/docker.sock", ReadOnly: true},
{Name: "workspace", MountPath: "/mainctrfs/workspace"},
}, c.VolumeMounts)
Expand Down
35 changes: 31 additions & 4 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,24 @@ func (wfc *WorkflowController) processNextPodCleanupItem(ctx context.Context) bo
err := func() error {
pods := wfc.kubeclientset.CoreV1().Pods(namespace)
switch action {
case shutdownPod:
// to shutdown a pod, we signal the wait container to terminate, the wait container in turn will
// kill the main container (using whatever mechanism the executor uses), and will then exit itself
// once the main container exited
pod, err := wfc.getPod(namespace, podName)
if pod == nil || err != nil {
return err
}
for _, c := range pod.Spec.Containers {
if c.Name == common.WaitContainerName {
if err := signal.SignalContainer(wfc.restConfig, pod, common.WaitContainerName, syscall.SIGTERM); err != nil {
return err
}
return nil // done
}
}
// no wait container found
fallthrough
case terminateContainers:
if terminationGracePeriod, err := wfc.signalContainers(namespace, podName, syscall.SIGTERM); err != nil {
return err
Expand Down Expand Up @@ -508,18 +526,27 @@ func (wfc *WorkflowController) processNextPodCleanupItem(ctx context.Context) bo
return true
}

func (wfc *WorkflowController) signalContainers(namespace string, podName string, sig syscall.Signal) (time.Duration, error) {
func (wfc *WorkflowController) getPod(namespace string, podName string) (*apiv1.Pod, error) {
obj, exists, err := wfc.podInformer.GetStore().GetByKey(namespace + "/" + podName)
if err != nil {
return 0, err
return nil, err
}
if !exists {
return 0, nil
return nil, nil
}
pod, ok := obj.(*apiv1.Pod)
if !ok {
return 0, fmt.Errorf("object is not a pod")
return nil, fmt.Errorf("object is not a pod")
}
return pod, nil
}

func (wfc *WorkflowController) signalContainers(namespace string, podName string, sig syscall.Signal) (time.Duration, error) {
pod, err := wfc.getPod(namespace, podName)
if pod == nil || err != nil {
return 0, err
}

for _, c := range pod.Status.ContainerStatuses {
if c.Name == common.WaitContainerName || c.State.Terminated != nil {
continue
Expand Down
2 changes: 1 addition & 1 deletion workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func (woc *wfOperationCtx) executeDAG(ctx context.Context, nodeName string, tmpl

defer func() {
if woc.wf.Status.Nodes[node.ID].Fulfilled() {
_ = woc.killDaemonedChildren(ctx, node.ID)
woc.killDaemonedChildren(node.ID)
}
}()

Expand Down
Loading