Skip to content

Commit

Permalink
Merge pull request #878 from hashicorp/b-kill-timeout-update
Browse files Browse the repository at this point in the history
client: Updating kill timeout adheres to operator specified maximum
  • Loading branch information
diptanu committed Mar 10, 2016
2 parents 4478238 + 3040c4d commit 0240334
Show file tree
Hide file tree
Showing 11 changed files with 228 additions and 182 deletions.
30 changes: 18 additions & 12 deletions client/driver/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,12 @@ func (c *DockerDriverConfig) Validate() error {
}

type dockerPID struct {
Version string
ImageID string
ContainerID string
KillTimeout time.Duration
PluginConfig *PluginReattachConfig
Version string
ImageID string
ContainerID string
KillTimeout time.Duration
MaxKillTimeout time.Duration
PluginConfig *PluginReattachConfig
}

type DockerHandle struct {
Expand All @@ -109,6 +110,7 @@ type DockerHandle struct {
containerID string
version string
killTimeout time.Duration
maxKillTimeout time.Duration
waitCh chan *cstructs.WaitResult
doneCh chan struct{}
}
Expand Down Expand Up @@ -624,6 +626,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
d.logger.Printf("[INFO] driver.docker: started container %s", container.ID)

// Return a driver handle
maxKill := d.DriverContext.config.MaxKillTimeout
h := &DockerHandle{
client: client,
logCollector: logCollector,
Expand All @@ -634,7 +637,8 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
imageID: dockerImage.ID,
containerID: container.ID,
version: d.config.Version,
killTimeout: d.DriverContext.KillTimeout(task),
killTimeout: GetKillTimeout(task.KillTimeout, maxKill),
maxKillTimeout: maxKill,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
}
Expand Down Expand Up @@ -703,6 +707,7 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er
containerID: pid.ContainerID,
version: pid.Version,
killTimeout: pid.KillTimeout,
maxKillTimeout: pid.MaxKillTimeout,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
}
Expand All @@ -713,11 +718,12 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er
func (h *DockerHandle) ID() string {
// Return a handle to the PID
pid := dockerPID{
Version: h.version,
ImageID: h.imageID,
ContainerID: h.containerID,
KillTimeout: h.killTimeout,
PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()),
Version: h.version,
ImageID: h.imageID,
ContainerID: h.containerID,
KillTimeout: h.killTimeout,
MaxKillTimeout: h.maxKillTimeout,
PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()),
}
data, err := json.Marshal(pid)
if err != nil {
Expand All @@ -736,7 +742,7 @@ func (h *DockerHandle) WaitCh() chan *cstructs.WaitResult {

func (h *DockerHandle) Update(task *structs.Task) error {
// Store the updated kill timeout.
h.killTimeout = task.KillTimeout
h.killTimeout = GetKillTimeout(task.KillTimeout, h.maxKillTimeout)
if err := h.logCollector.UpdateLogConfig(task.LogConfig); err != nil {
h.logger.Printf("[DEBUG] driver.docker: failed to update log config: %v", err)
}
Expand Down
19 changes: 10 additions & 9 deletions client/driver/docker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,18 +152,19 @@ func TestDockerDriver_Handle(t *testing.T) {
defer pluginClient.Kill()

h := &DockerHandle{
version: "version",
imageID: "imageid",
logCollector: logCollector,
pluginClient: pluginClient,
containerID: "containerid",
killTimeout: 5 * time.Nanosecond,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
version: "version",
imageID: "imageid",
logCollector: logCollector,
pluginClient: pluginClient,
containerID: "containerid",
killTimeout: 5 * time.Nanosecond,
maxKillTimeout: 15 * time.Nanosecond,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
}

actual := h.ID()
expected := fmt.Sprintf("DOCKER:{\"Version\":\"version\",\"ImageID\":\"imageid\",\"ContainerID\":\"containerid\",\"KillTimeout\":5,\"PluginConfig\":{\"Pid\":%d,\"AddrNet\":\"unix\",\"AddrName\":\"%s\"}}",
expected := fmt.Sprintf("DOCKER:{\"Version\":\"version\",\"ImageID\":\"imageid\",\"ContainerID\":\"containerid\",\"KillTimeout\":5,\"MaxKillTimeout\":15,\"PluginConfig\":{\"Pid\":%d,\"AddrNet\":\"unix\",\"AddrName\":\"%s\"}}",
pluginClient.ReattachConfig().Pid, pluginClient.ReattachConfig().Addr.String())
if actual != expected {
t.Errorf("Expected `%s`, found `%s`", expected, actual)
Expand Down
19 changes: 0 additions & 19 deletions client/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"log"
"path/filepath"
"sync"
"time"

"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
Expand Down Expand Up @@ -84,24 +83,6 @@ func NewDriverContext(taskName string, config *config.Config, node *structs.Node
}
}

// KillTimeout returns the timeout that should be used for the task between
// signaling and killing the task.
func (d *DriverContext) KillTimeout(task *structs.Task) time.Duration {
max := d.config.MaxKillTimeout.Nanoseconds()
desired := task.KillTimeout.Nanoseconds()

// Make the minimum time between signal and kill, 1 second.
if desired == 0 {
desired = (1 * time.Second).Nanoseconds()
}

if desired < max {
return time.Duration(desired)
}

return d.config.MaxKillTimeout
}

// DriverHandle is an opaque handle into a driver used for task
// manipulation
type DriverHandle interface {
Expand Down
18 changes: 0 additions & 18 deletions client/driver/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,24 +66,6 @@ func testDriverContexts(task *structs.Task) (*DriverContext, *ExecContext) {
return driverCtx, execCtx
}

func TestDriver_KillTimeout(t *testing.T) {
expected := 1 * time.Second
task := &structs.Task{Name: "foo", KillTimeout: expected}
ctx, _ := testDriverContexts(task)
ctx.config.MaxKillTimeout = 10 * time.Second

if actual := ctx.KillTimeout(task); expected != actual {
t.Fatalf("KillTimeout(%v) returned %v; want %v", task, actual, expected)
}

expected = 10 * time.Second
task = &structs.Task{KillTimeout: 11 * time.Second}

if actual := ctx.KillTimeout(task); expected != actual {
t.Fatalf("KillTimeout(%v) returned %v; want %v", task, actual, expected)
}
}

func TestDriver_GetTaskEnv(t *testing.T) {
t.Parallel()
task := &structs.Task{
Expand Down
10 changes: 8 additions & 2 deletions client/driver/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type execHandle struct {
userPid int
allocDir *allocdir.AllocDir
killTimeout time.Duration
maxKillTimeout time.Duration
logger *log.Logger
waitCh chan *cstructs.WaitResult
doneCh chan struct{}
Expand Down Expand Up @@ -134,13 +135,15 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
d.logger.Printf("[DEBUG] driver.exec: started process via plugin with pid: %v", ps.Pid)

// Return a driver handle
maxKill := d.DriverContext.config.MaxKillTimeout
h := &execHandle{
pluginClient: pluginClient,
userPid: ps.Pid,
executor: exec,
allocDir: ctx.AllocDir,
isolationConfig: ps.IsolationConfig,
killTimeout: d.DriverContext.KillTimeout(task),
killTimeout: GetKillTimeout(task.KillTimeout, maxKill),
maxKillTimeout: maxKill,
logger: d.logger,
version: d.config.Version,
doneCh: make(chan struct{}),
Expand All @@ -153,6 +156,7 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
type execId struct {
Version string
KillTimeout time.Duration
MaxKillTimeout time.Duration
UserPid int
TaskDir string
AllocDir *allocdir.AllocDir
Expand Down Expand Up @@ -198,6 +202,7 @@ func (d *ExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro
logger: d.logger,
version: id.Version,
killTimeout: id.KillTimeout,
maxKillTimeout: id.MaxKillTimeout,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
}
Expand All @@ -209,6 +214,7 @@ func (h *execHandle) ID() string {
id := execId{
Version: h.version,
KillTimeout: h.killTimeout,
MaxKillTimeout: h.maxKillTimeout,
PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()),
UserPid: h.userPid,
AllocDir: h.allocDir,
Expand All @@ -228,7 +234,7 @@ func (h *execHandle) WaitCh() chan *cstructs.WaitResult {

func (h *execHandle) Update(task *structs.Task) error {
// Store the updated kill timeout.
h.killTimeout = task.KillTimeout
h.killTimeout = GetKillTimeout(task.KillTimeout, h.maxKillTimeout)
h.executor.UpdateLogConfig(task.LogConfig)

// Update is not possible
Expand Down
24 changes: 15 additions & 9 deletions client/driver/java.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,14 @@ type javaHandle struct {
executor executor.Executor
isolationConfig *cstructs.IsolationConfig

taskDir string
allocDir *allocdir.AllocDir
killTimeout time.Duration
version string
logger *log.Logger
waitCh chan *cstructs.WaitResult
doneCh chan struct{}
taskDir string
allocDir *allocdir.AllocDir
killTimeout time.Duration
maxKillTimeout time.Duration
version string
logger *log.Logger
waitCh chan *cstructs.WaitResult
doneCh chan struct{}
}

// NewJavaDriver is used to create a new exec driver
Expand Down Expand Up @@ -182,14 +183,16 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
d.logger.Printf("[DEBUG] driver.java: started process with pid: %v", ps.Pid)

// Return a driver handle
maxKill := d.DriverContext.config.MaxKillTimeout
h := &javaHandle{
pluginClient: pluginClient,
executor: exec,
userPid: ps.Pid,
isolationConfig: ps.IsolationConfig,
taskDir: taskDir,
allocDir: ctx.AllocDir,
killTimeout: d.DriverContext.KillTimeout(task),
killTimeout: GetKillTimeout(task.KillTimeout, maxKill),
maxKillTimeout: maxKill,
version: d.config.Version,
logger: d.logger,
doneCh: make(chan struct{}),
Expand All @@ -210,6 +213,7 @@ func (d *JavaDriver) cgroupsMounted(node *structs.Node) bool {
type javaId struct {
Version string
KillTimeout time.Duration
MaxKillTimeout time.Duration
PluginConfig *PluginReattachConfig
IsolationConfig *cstructs.IsolationConfig
TaskDir string
Expand Down Expand Up @@ -257,6 +261,7 @@ func (d *JavaDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro
logger: d.logger,
version: id.Version,
killTimeout: id.KillTimeout,
maxKillTimeout: id.MaxKillTimeout,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
}
Expand All @@ -269,6 +274,7 @@ func (h *javaHandle) ID() string {
id := javaId{
Version: h.version,
KillTimeout: h.killTimeout,
MaxKillTimeout: h.maxKillTimeout,
PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()),
UserPid: h.userPid,
TaskDir: h.taskDir,
Expand All @@ -289,7 +295,7 @@ func (h *javaHandle) WaitCh() chan *cstructs.WaitResult {

func (h *javaHandle) Update(task *structs.Task) error {
// Store the updated kill timeout.
h.killTimeout = task.KillTimeout
h.killTimeout = GetKillTimeout(task.KillTimeout, h.maxKillTimeout)
h.executor.UpdateLogConfig(task.LogConfig)

// Update is not possible
Expand Down
Loading

0 comments on commit 0240334

Please sign in to comment.