Skip to content


feat(executor): Remove need for watch
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Collins <[email protected]>
  • Loading branch information
alexec committed Feb 12, 2021
1 parent 2f7c908 commit bbbae71
Show file tree
Hide file tree
Showing 24 changed files with 764 additions and 55 deletions.
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ endif

# for local we have a faster target that prints to stdout, does not use json, and can cache because it has no coverage
.PHONY: test
test: server/static/files.go
test: server/static/files.go dist/argosay
env KUBECONFIG=/dev/null $(GOTEST) ./...

.PHONY: install
Expand Down Expand Up @@ -407,6 +407,9 @@ endif
test/e2e/images/argosay/v2/argosay: test/e2e/images/argosay/v2/main/argosay.go
cd test/e2e/images/argosay/v2 && GOOS=linux CGO_ENABLED=0 go build -ldflags '-w -s' -o argosay ./main

dist/argosay: test/e2e/images/argosay/v2/main/argosay.go
go build -ldflags '-w -s' -o dist/argosay ./test/e2e/images/argosay/v2/main

.PHONY: test-images
$(call docker_pull,argoproj/argosay:v1)
Expand Down
2 changes: 1 addition & 1 deletion cmd/argo/commands/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func waitWatchOrLog(ctx context.Context, serviceClient workflowpkg.WorkflowServi
if cliSubmitOpts.log {
for _, workflow := range workflowNames {
logWorkflow(ctx, serviceClient, namespace, workflow, "", &corev1.PodLogOptions{
Container: "main",
Container: common.MainContainerName,
Follow: true,
Previous: false,
Expand Down
216 changes: 216 additions & 0 deletions cmd/argoexec/commands/emissary.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
package commands

import (


wfv1 ""

var (
varArgo = "/var/run/argo"
containerName = os.Getenv("ARGO_CONTAINER_NAME")
template = &wfv1.Template{}

func NewEmissaryCommand() *cobra.Command {
return &cobra.Command{
Use: "emissary",
SilenceUsage: true, // this prevents confusing usage message being printed when we SIGTERM
RunE: func(cmd *cobra.Command, args []string) error {
exitCode := 64

defer func() {
err := ioutil.WriteFile(varArgo+"/ctr/"+containerName+"/exitcode", []byte(strconv.Itoa(exitCode)), 0600)
if err != nil {
println(fmt.Errorf("failed to write exit code: %w", err))

// this also indicates we've started
if err := os.MkdirAll(varArgo+"/ctr/"+containerName, 0700); err != nil {
return fmt.Errorf("failed to create ctr directory: %w", err)

name, args := args[0], args[1:]

signals := make(chan os.Signal, 1)
defer close(signals)
defer signal.Reset()
go func() {
for s := range signals {
if s != syscall.SIGCHLD {
_ = syscall.Kill(-os.Getpid(), s.(syscall.Signal))

data, err := ioutil.ReadFile(varArgo + "/template")
if err != nil {
return fmt.Errorf("failed to read template: %w", err)

if err := json.Unmarshal(data, template); err != nil {
return fmt.Errorf("failed to unmarshal template: %w", err)

name, err = path.Search(name)
if err != nil {
return fmt.Errorf("failed to find name in PATH: %w", err)

command := exec.Command(name, args...)
command.Env = os.Environ()
command.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}

stdout, err := os.Create(varArgo + "/ctr/" + containerName + "/stdout")
if err != nil {
return fmt.Errorf("failed to open stdout: %w", err)
defer func() { _ = stdout.Close() }()
command.Stdout = io.MultiWriter(os.Stdout, stdout)

stderr, err := os.Create(varArgo + "/ctr/" + containerName + "/stderr")
if err != nil {
return fmt.Errorf("failed to open stderr: %w", err)
defer func() { _ = stderr.Close() }()
command.Stderr = io.MultiWriter(os.Stderr, stderr)

if err := command.Start(); err != nil {
return err

go func() {
for {
data, _ := ioutil.ReadFile(varArgo + "/ctr/" + containerName + "/signal")
_ = os.Remove(varArgo + "/ctr/" + containerName + "/signal")
s, _ := strconv.Atoi(string(data))
if s > 0 {
_ = syscall.Kill(command.Process.Pid, syscall.Signal(s))
time.Sleep(2 * time.Second)

cmdErr := command.Wait()

if cmdErr == nil {
exitCode = 0
} else if exitError, ok := cmdErr.(*exec.ExitError); ok {
if exitError.ExitCode() >= 0 {
exitCode = exitError.ExitCode()
} else {
exitCode = 137 // SIGTERM

if err := stderr.Close(); err != nil {
return fmt.Errorf("failed to close stderr: %w", err)
if err := stdout.Close(); err != nil {
return fmt.Errorf("failed to close stdout: %w", err)

if containerName == common.MainContainerName {
for _, x := range template.Outputs.Parameters {
if x.ValueFrom != nil && x.ValueFrom.Path != "" {
if err := saveParameter(x.ValueFrom.Path); err != nil {
return err
for _, x := range template.Outputs.Artifacts {
if x.Path != "" {
if err := saveArtifact(x.Path); err != nil {
return err
} else {
println("not saving outputs - not main container")

return cmdErr // this is the error returned from cmd.Wait(), which maybe an exitError

func saveArtifact(srcPath string) error {
if common.FindOverlappingVolume(template, srcPath) != nil {
println("no need to save artifact - on overlapping volume", srcPath)
return nil
if _, err := os.Stat(srcPath); os.IsNotExist(err) { // might be optional, so we ignore
println("cannot save artifact", srcPath, err)
return nil
dstPath := varArgo + "/outputs/artifacts/" + srcPath + ".tgz"
println(srcPath, "->", dstPath)
z := filepath.Dir(dstPath)
if err := os.MkdirAll(z, 0700); err != nil { // chmod rwx------
return fmt.Errorf("failed to create directory %s: %w", z, err)
dst, err := os.Create(dstPath)
if err != nil {
return fmt.Errorf("failed to create destination %s: %w", dstPath, err)
defer func() { _ = dst.Close() }()
if err = archive.TarGzToWriter(srcPath, gzip.DefaultCompression, dst); err != nil {
return fmt.Errorf("failed to tarball the output %s to %s: %w", srcPath, dstPath, err)
if err = dst.Close(); err != nil {
return fmt.Errorf("failed to close %s: %w", dstPath, err)
return nil

func saveParameter(srcPath string) error {
if common.FindOverlappingVolume(template, srcPath) != nil {
println("no need to save parameter - on overlapping volume", srcPath)
return nil
src, err := os.Open(srcPath)
if os.IsNotExist(err) { // might be optional, so we ignore
println("cannot save parameter", srcPath, err)
return nil
if err != nil {
return fmt.Errorf("failed to open %s: %w", srcPath, err)
defer func() { _ = src.Close() }()
dstPath := varArgo + "/outputs/parameters/" + srcPath
println(srcPath, "->", dstPath)
z := filepath.Dir(dstPath)
if err := os.MkdirAll(z, 0700); err != nil { // chmod rwx------
return fmt.Errorf("failed to create directory %s: %w", z, err)
dst, err := os.Create(dstPath)
if err != nil {
return fmt.Errorf("failed to create %s: %w", srcPath, err)
defer func() { _ = dst.Close() }()
if _, err = io.Copy(dst, src); err != nil {
return fmt.Errorf("failed to copy %s to %s: %w", srcPath, dstPath, err)
if err = dst.Close(); err != nil {
return fmt.Errorf("failed to close %s: %w", dstPath, err)
return nil
118 changes: 118 additions & 0 deletions cmd/argoexec/commands/emissary_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package commands

import (


func TestEmissary(t *testing.T) {
tmp, err := ioutil.TempDir("", "")
assert.NoError(t, err)

varArgo = tmp

wd, err := os.Getwd()
assert.NoError(t, err)

x := filepath.Join(wd, "../../../dist/argosay")

err = ioutil.WriteFile(varArgo+"/template", []byte(`{}`), 0600)
assert.NoError(t, err)

t.Run("Exit0", func(t *testing.T) {
err := run(x, []string{"exit"})
assert.NoError(t, err)
data, err := ioutil.ReadFile(varArgo + "/ctr/main/exitcode")
assert.NoError(t, err)
assert.Equal(t, "0", string(data))
t.Run("Exit1", func(t *testing.T) {
err := run(x, []string{"exit", "1"})
assert.Equal(t, 1, err.(*exec.ExitError).ExitCode())
data, err := ioutil.ReadFile(varArgo + "/ctr/main/exitcode")
assert.NoError(t, err)
assert.Equal(t, "1", string(data))
t.Run("Stdout", func(t *testing.T) {
err := run(x, []string{"echo", "hello", "/dev/stdout"})
assert.NoError(t, err)
data, err := ioutil.ReadFile(varArgo + "/ctr/main/stdout")
assert.NoError(t, err)
assert.Equal(t, "hello", string(data))
t.Run("Stderr", func(t *testing.T) {
err := run(x, []string{"echo", "hello", "/dev/stderr"})
assert.NoError(t, err)
data, err := ioutil.ReadFile(varArgo + "/ctr/main/stderr")
assert.NoError(t, err)
assert.Equal(t, "hello", string(data))
t.Run("Signal", func(t *testing.T) {
for signal, message := range map[syscall.Signal]string{
syscall.SIGTERM: "terminated",
syscall.SIGKILL: "killed",
} {
err := ioutil.WriteFile(varArgo+"/ctr/main/signal", []byte(strconv.Itoa(int(signal))), 0600)
assert.NoError(t, err)
var wg sync.WaitGroup
go func() {
defer wg.Done()
err := run(x, []string{"sleep", "5s"})
assert.EqualError(t, err, "signal: "+message)
t.Run("Artifact", func(t *testing.T) {
err = ioutil.WriteFile(varArgo+"/template", []byte(`
"outputs": {
"artifacts": [
{"path": "/tmp/artifact"}
`), 0600)
assert.NoError(t, err)
err := run(x, []string{"echo", "hello", "/tmp/artifact"})
assert.NoError(t, err)
data, err := ioutil.ReadFile(varArgo + "/outputs/artifacts/tmp/artifact.tgz")
assert.NoError(t, err)
assert.NotEmpty(t, string(data)) // data is tgz format
t.Run("Parameter", func(t *testing.T) {
err = ioutil.WriteFile(varArgo+"/template", []byte(`
"outputs": {
"parameters": [
"valueFrom": {"path": "/tmp/parameter"}
`), 0600)
assert.NoError(t, err)
err := run(x, []string{"echo", "hello", "/tmp/parameter"})
assert.NoError(t, err)
data, err := ioutil.ReadFile(varArgo + "/outputs/parameters/tmp/parameter")
assert.NoError(t, err)
assert.Equal(t, "hello", string(data))

func run(name string, args []string) error {
cmd := NewEmissaryCommand()
containerName = "main"
return cmd.RunE(cmd, append([]string{name}, args...))
4 changes: 4 additions & 0 deletions cmd/argoexec/commands/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ func loadArtifacts(ctx context.Context) error {
defer wfExecutor.HandleError(ctx)
defer stats.LogStats()

if err := wfExecutor.Init(); err != nil {
return err
// Download input artifacts
err := wfExecutor.StageFiles()
if err != nil {
Expand Down

0 comments on commit bbbae71

Please sign in to comment.