Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Execute exec/java script checks in containers #2585

Merged
merged 9 commits into from
May 5, 2017
Merged
10 changes: 8 additions & 2 deletions client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,13 +186,19 @@ func (r *AllocRunner) RestoreState() error {
continue
}

if err := tr.RestoreState(); err != nil {
r.logger.Printf("[ERR] client: failed to restore state for alloc %s task '%s': %v", r.alloc.ID, name, err)
if restartReason, err := tr.RestoreState(); err != nil {
r.logger.Printf("[ERR] client: failed to restore state for alloc %s task %q: %v", r.alloc.ID, name, err)
mErr.Errors = append(mErr.Errors, err)
} else if !r.alloc.TerminalStatus() {
// Only start if the alloc isn't in a terminal status.
go tr.Run()

// Restart task runner if RestoreState gave a reason
if restartReason != "" {
tr.Restart("upgrade", restartReason)
}
}

}

return mErr.ErrorOrNil()
Expand Down
79 changes: 79 additions & 0 deletions client/alloc_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"io/ioutil"
"os"
"path/filepath"
"strings"
"testing"
"text/template"
"time"
Expand Down Expand Up @@ -476,6 +477,84 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {
})
}

// TestAllocRunner_SaveRestoreState_Upgrade asserts that pre-0.6 exec tasks are
// restarted on upgrade.
//
// The test runs an alloc runner whose config claims version 0.5.6, saves its
// state, then restores that state into a second alloc runner built from the
// original config. It passes once the restored runner records a restart event
// whose reason ends in pre06ScriptCheckReason, and the runner subsequently
// reaches the complete client status after being destroyed.
func TestAllocRunner_SaveRestoreState_Upgrade(t *testing.T) {
	alloc := mock.Alloc()
	task := alloc.Job.TaskGroups[0].Tasks[0]
	task.Driver = "mock_driver"
	task.Config = map[string]interface{}{
		"exit_code": "0",
		"run_for":   "10s",
	}

	upd, ar := testAllocRunnerFromAlloc(alloc, false)
	// Hack in old version to cause an upgrade on RestoreState
	origConfig := ar.config.Copy()
	ar.config.Version = "0.5.6"
	go ar.Run()

	// Snapshot state
	testutil.WaitForResult(func() (bool, error) {
		// Return a descriptive error so a timeout doesn't report "<nil>".
		if n := len(ar.tasks); n != 1 {
			return false, fmt.Errorf("expected 1 task but found %d", n)
		}
		return true, nil
	}, func(err error) {
		t.Fatalf("task never started: %v", err)
	})

	err := ar.SaveState()
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Create a new alloc runner
	l2 := prefixedTestLogger("----- ar2: ")
	ar2 := NewAllocRunner(l2, origConfig, upd.Update,
		&structs.Allocation{ID: ar.alloc.ID}, ar.vaultClient,
		ar.consulClient)
	err = ar2.RestoreState()
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	go ar2.Run()

	testutil.WaitForResult(func() (bool, error) {
		if len(ar2.tasks) != 1 {
			return false, fmt.Errorf("Incorrect number of tasks")
		}

		if upd.Count < 3 {
			return false, fmt.Errorf("expected at least 3 updates but found %v", upd.Count)
		}

		// Guard the lookup: a missing entry should cause a retry, not a
		// nil dereference that aborts the whole test binary.
		webState, ok := ar2.alloc.TaskStates["web"]
		if !ok {
			return false, fmt.Errorf("no task state for task %q", "web")
		}

		for _, ev := range webState.Events {
			if strings.HasSuffix(ev.RestartReason, pre06ScriptCheckReason) {
				return true, nil
			}
		}
		return false, fmt.Errorf("no restart with proper reason found")
	}, func(err error) {
		t.Fatalf("err: %v\nAllocs: %#v\nWeb State: %#v", err, upd.Allocs, ar2.alloc.TaskStates["web"])
	})

	// Destroy and wait
	ar2.Destroy()
	start := time.Now()

	testutil.WaitForResult(func() (bool, error) {
		alloc := ar2.Alloc()
		if alloc.ClientStatus != structs.AllocClientStatusComplete {
			return false, fmt.Errorf("Bad client status; got %v; want %v", alloc.ClientStatus, structs.AllocClientStatusComplete)
		}
		return true, nil
	}, func(err error) {
		// Report the state of the runner under test (ar2), not the
		// original runner.
		t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar2.alloc.TaskStates)
	})

	if time.Since(start) > time.Duration(testutil.TestMultiplier()*5)*time.Second {
		t.Fatalf("took too long to terminate")
	}
}

// Ensure pre-#2132 state files containing the Context struct are properly
// migrated to the new format.
//
Expand Down
7 changes: 6 additions & 1 deletion client/driver/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,12 @@ func (h *execHandle) Update(task *structs.Task) error {
}

func (h *execHandle) Exec(ctx context.Context, cmd string, args []string) ([]byte, int, error) {
return execChroot(ctx, h.taskDir.Dir, cmd, args)
deadline, ok := ctx.Deadline()
if !ok {
// No deadline set on context; default to 1 minute
deadline = time.Now().Add(time.Minute)
}
return h.executor.Exec(deadline, cmd, args)
}

func (h *execHandle) Signal(s os.Signal) error {
Expand Down
53 changes: 47 additions & 6 deletions client/driver/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,8 @@ func TestExecDriverUser(t *testing.T) {
}
}

// TestExecDriver_HandlerExec ensures the exec driver's handle properly executes commands inside the chroot.
// TestExecDriver_HandlerExec ensures the exec driver's handle properly
// executes commands inside the container.
func TestExecDriver_HandlerExec(t *testing.T) {
ctestutils.ExecCompatible(t)
task := &structs.Task{
Expand Down Expand Up @@ -315,20 +316,60 @@ func TestExecDriver_HandlerExec(t *testing.T) {
t.Fatalf("missing handle")
}

// Exec a command that should work
out, code, err := handle.Exec(context.TODO(), "/usr/bin/stat", []string{"/alloc"})
// Exec a command that should work and dump the environment
out, code, err := handle.Exec(context.Background(), "/bin/sh", []string{"-c", "env | grep NOMAD"})
if err != nil {
t.Fatalf("error exec'ing stat: %v", err)
}
if code != 0 {
t.Fatalf("expected `stat /alloc` to succeed but exit code was: %d", code)
}
if expected := 100; len(out) < expected {
t.Fatalf("expected at least %d bytes of output but found %d:\n%s", expected, len(out), out)

// Assert exec'd commands are run in a task-like environment
scriptEnv := make(map[string]string)
for _, line := range strings.Split(string(out), "\n") {
if line == "" {
continue
}
parts := strings.SplitN(string(line), "=", 2)
if len(parts) != 2 {
t.Fatalf("Invalid env var: %q", line)
}
scriptEnv[parts[0]] = parts[1]
}
if v, ok := scriptEnv["NOMAD_SECRETS_DIR"]; !ok || v != "/secrets" {
t.Errorf("Expected NOMAD_SECRETS_DIR=/secrets but found=%t value=%q", ok, v)
}
if v, ok := scriptEnv["NOMAD_ALLOC_ID"]; !ok || v != ctx.DriverCtx.allocID {
t.Errorf("Expected NOMAD_SECRETS_DIR=%q but found=%t value=%q", ok, v)
}

// Assert cgroup membership
out, code, err = handle.Exec(context.Background(), "/bin/cat", []string{"/proc/self/cgroup"})
if err != nil {
t.Fatalf("error exec'ing cat /proc/self/cgroup: %v", err)
}
if code != 0 {
t.Fatalf("expected `cat /proc/self/cgroup` to succeed but exit code was: %d", code)
}
found := false
for _, line := range strings.Split(string(out), "\n") {
// Every cgroup entry should be /nomad/$ALLOC_ID
if line == "" {
continue
}
if !strings.Contains(line, ":/nomad/") {
t.Errorf("Not a member of the alloc's cgroup: expected=...:/nomad/... -- found=%q", line)
continue
}
found = true
}
if !found {
t.Errorf("exec'd command isn't in the task's cgroup")
}

// Exec a command that should fail
out, code, err = handle.Exec(context.TODO(), "/usr/bin/stat", []string{"lkjhdsaflkjshowaisxmcvnlia"})
out, code, err = handle.Exec(context.Background(), "/usr/bin/stat", []string{"lkjhdsaflkjshowaisxmcvnlia"})
if err != nil {
t.Fatalf("error exec'ing stat: %v", err)
}
Expand Down
46 changes: 44 additions & 2 deletions client/driver/executor/executor.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package executor

import (
"context"
"fmt"
"io/ioutil"
"log"
Expand All @@ -15,6 +16,7 @@ import (
"syscall"
"time"

"github.com/armon/circbuf"
"github.com/hashicorp/go-multierror"
"github.com/mitchellh/go-ps"
"github.com/shirou/gopsutil/process"
Expand Down Expand Up @@ -57,6 +59,7 @@ type Executor interface {
Version() (*ExecutorVersion, error)
Stats() (*cstructs.TaskResourceUsage, error)
Signal(s os.Signal) error
Exec(deadline time.Time, cmd string, args []string) ([]byte, int, error)
}

// ExecutorContext holds context to configure the command user
Expand Down Expand Up @@ -203,8 +206,8 @@ func (e *UniversalExecutor) SetContext(ctx *ExecutorContext) error {
return nil
}

// LaunchCmd launches a process and returns it's state. It also configures an
// applies isolation on certain platforms.
// LaunchCmd launches the main process and returns its state. It also
// configures and applies isolation on certain platforms.
func (e *UniversalExecutor) LaunchCmd(command *ExecCommand) (*ProcessState, error) {
e.logger.Printf("[DEBUG] executor: launching command %v %v", command.Cmd, strings.Join(command.Args, " "))

Expand Down Expand Up @@ -283,6 +286,45 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand) (*ProcessState, erro
return &ProcessState{Pid: e.cmd.Process.Pid, ExitCode: -1, IsolationConfig: ic, Time: time.Now()}, nil
}

// Exec runs a command inside the task's container for the exec and java
// drivers.
//
// The command name and arguments are interpolated with the task environment.
// The command reuses the main command's SysProcAttr and working directory and
// is given the task's environment variables. It is bound to a context that
// expires at deadline.
//
// Combined stdout and stderr are captured in a buffer limited to
// dstructs.CheckBufSize bytes and returned along with the exit code. A non-nil
// error is returned only when the output buffer cannot be created or the
// command fails with something other than an *exec.ExitError; a command that
// runs but exits unsuccessfully is reported through the exit code alone.
func (e *UniversalExecutor) Exec(deadline time.Time, name string, args []string) ([]byte, int, error) {
	ctx, cancel := context.WithDeadline(context.Background(), deadline)
	defer cancel()

	name = e.ctx.TaskEnv.ReplaceEnv(name)
	cmd := exec.CommandContext(ctx, name, e.ctx.TaskEnv.ParseAndReplace(args)...)

	// Copy runtime environment from the main command
	cmd.SysProcAttr = e.cmd.SysProcAttr
	cmd.Dir = e.cmd.Dir
	cmd.Env = e.ctx.TaskEnv.EnvList()

	// Capture output. Surface a buffer construction failure instead of
	// continuing with a nil writer.
	buf, err := circbuf.NewBuffer(int64(dstructs.CheckBufSize))
	if err != nil {
		return nil, 0, fmt.Errorf("failed to create output buffer: %v", err)
	}
	cmd.Stdout = buf
	cmd.Stderr = buf

	if err := cmd.Run(); err != nil {
		exitErr, ok := err.(*exec.ExitError)
		if !ok {
			// Non-exit error, return it and let the caller treat
			// it as a critical failure
			return nil, 0, err
		}

		// Some kind of error happened; default to critical
		exitCode := 2
		if status, ok := exitErr.Sys().(syscall.WaitStatus); ok {
			exitCode = status.ExitStatus()
		}

		// Don't return the exitError as the caller only needs the
		// output and code.
		return buf.Bytes(), exitCode, nil
	}
	return buf.Bytes(), 0, nil
}

// configureLoggers sets up the standard out/error file rotators
func (e *UniversalExecutor) configureLoggers() error {
e.rotatorLock.Lock()
Expand Down
36 changes: 36 additions & 0 deletions client/driver/executor_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/rpc"
"os"
"syscall"
"time"

"github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/client/driver/executor"
Expand Down Expand Up @@ -33,6 +34,17 @@ type LaunchCmdArgs struct {
Cmd *executor.ExecCommand
}

// ExecCmdArgs is the request payload that ExecutorRPC.Exec sends with the
// "Plugin.Exec" RPC call and that ExecutorRPCServer.Exec receives.
type ExecCmdArgs struct {
// Deadline is forwarded as the deadline argument of Executor.Exec.
Deadline time.Time
// Name is forwarded as the command argument of Executor.Exec.
Name string
// Args is forwarded as the args argument of Executor.Exec.
Args []string
}

// ExecCmdReturn is the reply payload of the "Plugin.Exec" RPC call, filled in
// by ExecutorRPCServer.Exec and read by ExecutorRPC.Exec.
type ExecCmdReturn struct {
// Output holds the bytes returned by Executor.Exec.
Output []byte
// Code holds the integer code returned by Executor.Exec.
Code int
}

func (e *ExecutorRPC) LaunchCmd(cmd *executor.ExecCommand) (*executor.ProcessState, error) {
var ps *executor.ProcessState
err := e.client.Call("Plugin.LaunchCmd", LaunchCmdArgs{Cmd: cmd}, &ps)
Expand Down Expand Up @@ -91,6 +103,20 @@ func (e *ExecutorRPC) Signal(s os.Signal) error {
return e.client.Call("Plugin.Signal", &s, new(interface{}))
}

// Exec issues the "Plugin.Exec" RPC, packaging deadline, name and args into
// an ExecCmdArgs request. It returns the output and code carried by the reply
// together with the error from the call. If the call left the reply
// unpopulated, nil output and a zero code are returned alongside that error.
func (e *ExecutorRPC) Exec(deadline time.Time, name string, args []string) ([]byte, int, error) {
	var reply *ExecCmdReturn
	callErr := e.client.Call("Plugin.Exec", ExecCmdArgs{
		Deadline: deadline,
		Name:     name,
		Args:     args,
	}, &reply)

	if reply != nil {
		return reply.Output, reply.Code, callErr
	}
	return nil, 0, callErr
}

type ExecutorRPCServer struct {
Impl executor.Executor
logger *log.Logger
Expand Down Expand Up @@ -165,6 +191,16 @@ func (e *ExecutorRPCServer) Signal(args os.Signal, resp *interface{}) error {
return e.Impl.Signal(args)
}

// Exec runs the command described by args on the wrapped executor
// implementation. The returned output and code are always written to result,
// and the implementation's error is returned unchanged.
func (e *ExecutorRPCServer) Exec(args ExecCmdArgs, result *ExecCmdReturn) error {
	output, exitCode, execErr := e.Impl.Exec(args.Deadline, args.Name, args.Args)
	*result = ExecCmdReturn{Output: output, Code: exitCode}
	return execErr
}

type ExecutorPlugin struct {
logger *log.Logger
Impl *ExecutorRPCServer
Expand Down
7 changes: 6 additions & 1 deletion client/driver/java.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,12 @@ func (h *javaHandle) Update(task *structs.Task) error {
}

func (h *javaHandle) Exec(ctx context.Context, cmd string, args []string) ([]byte, int, error) {
return execChroot(ctx, h.taskDir, cmd, args)
deadline, ok := ctx.Deadline()
if !ok {
// No deadline set on context; default to 1 minute
deadline = time.Now().Add(time.Minute)
}
return h.executor.Exec(deadline, cmd, args)
}

func (h *javaHandle) Signal(s os.Signal) error {
Expand Down
Loading