-
Notifications
You must be signed in to change notification settings - Fork 3.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(controller) Emissary executor. #4925
Merged
Merged
Changes from 10 commits
Commits
Show all changes
21 commits
Select commit
Hold shift + click to select a range
4c3a274
feat(executor): Remove need for watch
alexec 1e35dec
feat(executor): Remove need for watch
alexec f04aff6
feat(controller): Logs Kubernetes API requests (#5084)
alexec e0c5aef
feat: Add checker to ensure that env variable doc is up to date (#5091)
simster7 660e412
chore: More opinionated linting (#5072)
alexec ab867ac
Update workflow-executors.md
alexec 4fb48a7
Update docs/workflow-executors.md
alexec 7166f11
feat(executor): Remove need for watch
alexec 0905b51
feat(executor): Remove need for watch
alexec 8f259f7
Merge branch 'master' into emissary
alexec c837dd3
Merge branch 'master' into emissary
alexec 2e527ed
feat(executor): Remove need for watch
alexec d00eb93
Merge branch 'master' into emissary
alexec 0d67321
feat(executor): Remove need for watch
alexec daed6fd
feat(executor): Remove need for watch
alexec 5e347e4
feat(executor): Remove need for watch
alexec 4284449
feat(executor): Remove need for watch
alexec 7c609de
feat(executor): Remove need for watch
alexec fe4c3a2
Merge branch 'master' into emissary
alexec de3ebf4
feat(executor): Remove need for watch
alexec 00fb957
feat(executor): Remove need for watch
alexec File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,216 @@ | ||
package commands | ||
|
||
import ( | ||
"compress/gzip" | ||
"encoding/json" | ||
"fmt" | ||
"io" | ||
"io/ioutil" | ||
"os" | ||
"os/exec" | ||
"os/signal" | ||
"path/filepath" | ||
"strconv" | ||
"syscall" | ||
"time" | ||
|
||
"github.com/spf13/cobra" | ||
|
||
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" | ||
"github.com/argoproj/argo-workflows/v3/util/archive" | ||
"github.com/argoproj/argo-workflows/v3/workflow/common" | ||
"github.com/argoproj/argo-workflows/v3/workflow/util/path" | ||
) | ||
|
||
var ( | ||
varArgo = "/var/run/argo" | ||
containerName = os.Getenv("ARGO_CONTAINER_NAME") | ||
template = &wfv1.Template{} | ||
) | ||
|
||
func NewEmissaryCommand() *cobra.Command { | ||
return &cobra.Command{ | ||
Use: "emissary", | ||
SilenceUsage: true, // this prevents confusing usage message being printed when we SIGTERM | ||
RunE: func(cmd *cobra.Command, args []string) error { | ||
exitCode := 64 | ||
|
||
defer func() { | ||
err := ioutil.WriteFile(varArgo+"/ctr/"+containerName+"/exitcode", []byte(strconv.Itoa(exitCode)), 0600) | ||
if err != nil { | ||
println(fmt.Errorf("failed to write exit code: %w", err)) | ||
} | ||
}() | ||
|
||
// this also indicates we've started | ||
if err := os.MkdirAll(varArgo+"/ctr/"+containerName, 0700); err != nil { | ||
return fmt.Errorf("failed to create ctr directory: %w", err) | ||
} | ||
|
||
name, args := args[0], args[1:] | ||
|
||
signals := make(chan os.Signal, 1) | ||
defer close(signals) | ||
signal.Notify(signals) | ||
defer signal.Reset() | ||
go func() { | ||
for s := range signals { | ||
if s != syscall.SIGCHLD { | ||
_ = syscall.Kill(-os.Getpid(), s.(syscall.Signal)) | ||
} | ||
} | ||
}() | ||
|
||
data, err := ioutil.ReadFile(varArgo + "/template") | ||
if err != nil { | ||
return fmt.Errorf("failed to read template: %w", err) | ||
} | ||
|
||
if err := json.Unmarshal(data, template); err != nil { | ||
return fmt.Errorf("failed to unmarshal template: %w", err) | ||
} | ||
|
||
name, err = path.Search(name) | ||
if err != nil { | ||
return fmt.Errorf("failed to find name in PATH: %w", err) | ||
} | ||
|
||
command := exec.Command(name, args...) | ||
command.Env = os.Environ() | ||
command.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} | ||
|
||
stdout, err := os.Create(varArgo + "/ctr/" + containerName + "/stdout") | ||
if err != nil { | ||
return fmt.Errorf("failed to open stdout: %w", err) | ||
} | ||
defer func() { _ = stdout.Close() }() | ||
command.Stdout = io.MultiWriter(os.Stdout, stdout) | ||
|
||
stderr, err := os.Create(varArgo + "/ctr/" + containerName + "/stderr") | ||
if err != nil { | ||
return fmt.Errorf("failed to open stderr: %w", err) | ||
} | ||
defer func() { _ = stderr.Close() }() | ||
command.Stderr = io.MultiWriter(os.Stderr, stderr) | ||
|
||
if err := command.Start(); err != nil { | ||
return err | ||
} | ||
|
||
go func() { | ||
for { | ||
data, _ := ioutil.ReadFile(varArgo + "/ctr/" + containerName + "/signal") | ||
_ = os.Remove(varArgo + "/ctr/" + containerName + "/signal") | ||
s, _ := strconv.Atoi(string(data)) | ||
if s > 0 { | ||
_ = syscall.Kill(command.Process.Pid, syscall.Signal(s)) | ||
} | ||
time.Sleep(2 * time.Second) | ||
} | ||
}() | ||
|
||
cmdErr := command.Wait() | ||
|
||
if cmdErr == nil { | ||
exitCode = 0 | ||
} else if exitError, ok := cmdErr.(*exec.ExitError); ok { | ||
if exitError.ExitCode() >= 0 { | ||
exitCode = exitError.ExitCode() | ||
} else { | ||
exitCode = 137 // SIGTERM | ||
} | ||
} | ||
|
||
if err := stderr.Close(); err != nil { | ||
return fmt.Errorf("failed to close stderr: %w", err) | ||
} | ||
if err := stdout.Close(); err != nil { | ||
return fmt.Errorf("failed to close stdout: %w", err) | ||
} | ||
|
||
if containerName == common.MainContainerName { | ||
for _, x := range template.Outputs.Parameters { | ||
if x.ValueFrom != nil && x.ValueFrom.Path != "" { | ||
if err := saveParameter(x.ValueFrom.Path); err != nil { | ||
return err | ||
} | ||
} | ||
} | ||
for _, x := range template.Outputs.Artifacts { | ||
if x.Path != "" { | ||
if err := saveArtifact(x.Path); err != nil { | ||
return err | ||
} | ||
} | ||
} | ||
} else { | ||
println("not saving outputs - not main container") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this will be mingled with the containers logs, it might be good to make it conventional, e.g.
|
||
} | ||
|
||
return cmdErr // this is the error returned from cmd.Wait(), which maybe an exitError | ||
}, | ||
} | ||
} | ||
|
||
func saveArtifact(srcPath string) error { | ||
if common.FindOverlappingVolume(template, srcPath) != nil { | ||
println("no need to save artifact - on overlapping volume", srcPath) | ||
return nil | ||
} | ||
if _, err := os.Stat(srcPath); os.IsNotExist(err) { // might be optional, so we ignore | ||
println("cannot save artifact", srcPath, err) | ||
return nil | ||
} | ||
dstPath := varArgo + "/outputs/artifacts/" + srcPath + ".tgz" | ||
println(srcPath, "->", dstPath) | ||
z := filepath.Dir(dstPath) | ||
if err := os.MkdirAll(z, 0700); err != nil { // chmod rwx------ | ||
return fmt.Errorf("failed to create directory %s: %w", z, err) | ||
} | ||
dst, err := os.Create(dstPath) | ||
if err != nil { | ||
return fmt.Errorf("failed to create destination %s: %w", dstPath, err) | ||
} | ||
defer func() { _ = dst.Close() }() | ||
if err = archive.TarGzToWriter(srcPath, gzip.DefaultCompression, dst); err != nil { | ||
return fmt.Errorf("failed to tarball the output %s to %s: %w", srcPath, dstPath, err) | ||
} | ||
if err = dst.Close(); err != nil { | ||
return fmt.Errorf("failed to close %s: %w", dstPath, err) | ||
} | ||
return nil | ||
} | ||
|
||
func saveParameter(srcPath string) error { | ||
if common.FindOverlappingVolume(template, srcPath) != nil { | ||
println("no need to save parameter - on overlapping volume", srcPath) | ||
return nil | ||
} | ||
src, err := os.Open(srcPath) | ||
if os.IsNotExist(err) { // might be optional, so we ignore | ||
println("cannot save parameter", srcPath, err) | ||
return nil | ||
} | ||
if err != nil { | ||
return fmt.Errorf("failed to open %s: %w", srcPath, err) | ||
} | ||
defer func() { _ = src.Close() }() | ||
dstPath := varArgo + "/outputs/parameters/" + srcPath | ||
println(srcPath, "->", dstPath) | ||
z := filepath.Dir(dstPath) | ||
if err := os.MkdirAll(z, 0700); err != nil { // chmod rwx------ | ||
return fmt.Errorf("failed to create directory %s: %w", z, err) | ||
} | ||
dst, err := os.Create(dstPath) | ||
if err != nil { | ||
return fmt.Errorf("failed to create %s: %w", srcPath, err) | ||
} | ||
defer func() { _ = dst.Close() }() | ||
if _, err = io.Copy(dst, src); err != nil { | ||
return fmt.Errorf("failed to copy %s to %s: %w", srcPath, dstPath, err) | ||
} | ||
if err = dst.Close(); err != nil { | ||
return fmt.Errorf("failed to close %s: %w", dstPath, err) | ||
} | ||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
package commands | ||
|
||
import ( | ||
"io/ioutil" | ||
"os" | ||
"os/exec" | ||
"path/filepath" | ||
"strconv" | ||
"sync" | ||
"syscall" | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
func TestEmissary(t *testing.T) { | ||
tmp, err := ioutil.TempDir("", "") | ||
assert.NoError(t, err) | ||
|
||
varArgo = tmp | ||
|
||
wd, err := os.Getwd() | ||
assert.NoError(t, err) | ||
|
||
x := filepath.Join(wd, "../../../dist/argosay") | ||
|
||
err = ioutil.WriteFile(varArgo+"/template", []byte(`{}`), 0600) | ||
assert.NoError(t, err) | ||
|
||
t.Run("Exit0", func(t *testing.T) { | ||
err := run(x, []string{"exit"}) | ||
assert.NoError(t, err) | ||
data, err := ioutil.ReadFile(varArgo + "/ctr/main/exitcode") | ||
assert.NoError(t, err) | ||
assert.Equal(t, "0", string(data)) | ||
}) | ||
t.Run("Exit1", func(t *testing.T) { | ||
err := run(x, []string{"exit", "1"}) | ||
assert.Equal(t, 1, err.(*exec.ExitError).ExitCode()) | ||
data, err := ioutil.ReadFile(varArgo + "/ctr/main/exitcode") | ||
assert.NoError(t, err) | ||
assert.Equal(t, "1", string(data)) | ||
}) | ||
t.Run("Stdout", func(t *testing.T) { | ||
err := run(x, []string{"echo", "hello", "/dev/stdout"}) | ||
assert.NoError(t, err) | ||
data, err := ioutil.ReadFile(varArgo + "/ctr/main/stdout") | ||
assert.NoError(t, err) | ||
assert.Equal(t, "hello", string(data)) | ||
}) | ||
t.Run("Stderr", func(t *testing.T) { | ||
err := run(x, []string{"echo", "hello", "/dev/stderr"}) | ||
assert.NoError(t, err) | ||
data, err := ioutil.ReadFile(varArgo + "/ctr/main/stderr") | ||
assert.NoError(t, err) | ||
assert.Equal(t, "hello", string(data)) | ||
}) | ||
t.Run("Signal", func(t *testing.T) { | ||
for signal, message := range map[syscall.Signal]string{ | ||
syscall.SIGTERM: "terminated", | ||
syscall.SIGKILL: "killed", | ||
} { | ||
err := ioutil.WriteFile(varArgo+"/ctr/main/signal", []byte(strconv.Itoa(int(signal))), 0600) | ||
assert.NoError(t, err) | ||
var wg sync.WaitGroup | ||
wg.Add(1) | ||
go func() { | ||
defer wg.Done() | ||
err := run(x, []string{"sleep", "5s"}) | ||
assert.EqualError(t, err, "signal: "+message) | ||
}() | ||
time.Sleep(time.Second) | ||
} | ||
}) | ||
t.Run("Artifact", func(t *testing.T) { | ||
err = ioutil.WriteFile(varArgo+"/template", []byte(` | ||
{ | ||
"outputs": { | ||
"artifacts": [ | ||
{"path": "/tmp/artifact"} | ||
] | ||
} | ||
} | ||
`), 0600) | ||
assert.NoError(t, err) | ||
err := run(x, []string{"echo", "hello", "/tmp/artifact"}) | ||
assert.NoError(t, err) | ||
data, err := ioutil.ReadFile(varArgo + "/outputs/artifacts/tmp/artifact.tgz") | ||
assert.NoError(t, err) | ||
assert.NotEmpty(t, string(data)) // data is tgz format | ||
}) | ||
t.Run("Parameter", func(t *testing.T) { | ||
err = ioutil.WriteFile(varArgo+"/template", []byte(` | ||
{ | ||
"outputs": { | ||
"parameters": [ | ||
{ | ||
"valueFrom": {"path": "/tmp/parameter"} | ||
} | ||
] | ||
} | ||
} | ||
`), 0600) | ||
assert.NoError(t, err) | ||
err := run(x, []string{"echo", "hello", "/tmp/parameter"}) | ||
assert.NoError(t, err) | ||
data, err := ioutil.ReadFile(varArgo + "/outputs/parameters/tmp/parameter") | ||
assert.NoError(t, err) | ||
assert.Equal(t, "hello", string(data)) | ||
}) | ||
} | ||
|
||
func run(name string, args []string) error { | ||
cmd := NewEmissaryCommand() | ||
containerName = "main" | ||
return cmd.RunE(cmd, append([]string{name}, args...)) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm aware we only need to capture stdout for (a) main containers and (b) if there our outputs - v1.1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could use
ioutil.Discard
for that