Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

User specifiable kill timeout and operator configurable max #624

Merged
merged 3 commits into from
Jan 4, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ type Task struct {
Services []Service
Resources *Resources
Meta map[string]string
KillTimeout time.Duration
}

// NewTask creates and initializes a new Task.
Expand Down
6 changes: 6 additions & 0 deletions client/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"io"
"strconv"
"strings"
"time"

"github.com/hashicorp/nomad/nomad/structs"
)
Expand Down Expand Up @@ -41,6 +42,11 @@ type Config struct {
// be determined dynamically.
NetworkSpeed int

// MaxKillTimeout allows capping the user-specifiable KillTimeout. If the
// task's KillTimeout is greater than the MaxKillTimeout, MaxKillTimeout is
// used.
MaxKillTimeout time.Duration

// Servers is a list of known server addresses. These are as "host:port"
Servers []string

Expand Down
10 changes: 8 additions & 2 deletions client/driver/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strconv"
"strings"
"sync"
"time"

docker "github.com/fsouza/go-dockerclient"

Expand Down Expand Up @@ -67,6 +68,7 @@ func (c *DockerDriverConfig) Validate() error {
type dockerPID struct {
ImageID string
ContainerID string
KillTimeout time.Duration
}

type DockerHandle struct {
Expand All @@ -76,6 +78,7 @@ type DockerHandle struct {
cleanupImage bool
imageID string
containerID string
killTimeout time.Duration
waitCh chan *cstructs.WaitResult
doneCh chan struct{}
}
Expand Down Expand Up @@ -502,6 +505,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
logger: d.logger,
imageID: dockerImage.ID,
containerID: container.ID,
killTimeout: d.DriverContext.KillTimeout(task),
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
}
Expand Down Expand Up @@ -555,6 +559,7 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er
logger: d.logger,
imageID: pid.ImageID,
containerID: pid.ContainerID,
killTimeout: pid.KillTimeout,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
}
Expand All @@ -567,6 +572,7 @@ func (h *DockerHandle) ID() string {
pid := dockerPID{
ImageID: h.imageID,
ContainerID: h.containerID,
KillTimeout: h.killTimeout,
}
data, err := json.Marshal(pid)
if err != nil {
Expand All @@ -588,10 +594,10 @@ func (h *DockerHandle) Update(task *structs.Task) error {
return nil
}

// Kill is used to terminate the task. This uses docker stop -t 5
// Kill is used to terminate the task. This uses `docker stop -t killTimeout`
func (h *DockerHandle) Kill() error {
// Stop the container
err := h.client.StopContainer(h.containerID, 5)
err := h.client.StopContainer(h.containerID, uint(h.killTimeout.Seconds()))
if err != nil {
h.logger.Printf("[ERR] driver.docker: failed to stop container %s", h.containerID)
return fmt.Errorf("Failed to stop container %s: %s", h.containerID, err)
Expand Down
3 changes: 2 additions & 1 deletion client/driver/docker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,13 @@ func TestDockerDriver_Handle(t *testing.T) {
h := &DockerHandle{
imageID: "imageid",
containerID: "containerid",
killTimeout: 5 * time.Nanosecond,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
}

actual := h.ID()
expected := `DOCKER:{"ImageID":"imageid","ContainerID":"containerid"}`
expected := `DOCKER:{"ImageID":"imageid","ContainerID":"containerid","KillTimeout":5}`
if actual != expected {
t.Errorf("Expected `%s`, found `%s`", expected, actual)
}
Expand Down
13 changes: 13 additions & 0 deletions client/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"log"
"path/filepath"
"sync"
"time"

"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
Expand Down Expand Up @@ -80,6 +81,18 @@ func NewDriverContext(taskName string, config *config.Config, node *structs.Node
}
}

// KillTimeout returns the timeout that should be used for the task between
// signaling and killing the task.
func (d *DriverContext) KillTimeout(task *structs.Task) time.Duration {
max := d.config.MaxKillTimeout.Nanoseconds()
desired := task.KillTimeout.Nanoseconds()
if desired < max {
return task.KillTimeout
}

return d.config.MaxKillTimeout
}

// DriverHandle is an opaque handle into a driver used for task
// manipulation
type DriverHandle interface {
Expand Down
19 changes: 19 additions & 0 deletions client/driver/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"path/filepath"
"reflect"
"testing"
"time"

"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
Expand Down Expand Up @@ -60,6 +61,24 @@ func testDriverExecContext(task *structs.Task, driverCtx *DriverContext) *ExecCo
return ctx
}

func TestDriver_KillTimeout(t *testing.T) {
ctx := testDriverContext("foo")
ctx.config.MaxKillTimeout = 10 * time.Second
expected := 1 * time.Second
task := &structs.Task{KillTimeout: expected}

if actual := ctx.KillTimeout(task); expected != actual {
t.Fatalf("KillTimeout(%v) returned %v; want %v", task, actual, expected)
}

expected = 10 * time.Second
task = &structs.Task{KillTimeout: 11 * time.Second}

if actual := ctx.KillTimeout(task); expected != actual {
t.Fatalf("KillTimeout(%v) returned %v; want %v", task, actual, expected)
}
}

func TestDriver_TaskEnvironmentVariables(t *testing.T) {
t.Parallel()
ctx := &ExecContext{}
Expand Down
55 changes: 41 additions & 14 deletions client/driver/exec.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package driver

import (
"encoding/json"
"fmt"
"log"
"path/filepath"
"runtime"
"syscall"
Expand Down Expand Up @@ -32,9 +34,11 @@ type ExecDriverConfig struct {

// execHandle is returned from Start/Open as a handle to the PID
type execHandle struct {
cmd executor.Executor
waitCh chan *cstructs.WaitResult
doneCh chan struct{}
cmd executor.Executor
killTimeout time.Duration
logger *log.Logger
waitCh chan *cstructs.WaitResult
doneCh chan struct{}
}

// NewExecDriver is used to create a new exec driver
Expand Down Expand Up @@ -110,34 +114,57 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,

// Return a driver handle
h := &execHandle{
cmd: cmd,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
cmd: cmd,
killTimeout: d.DriverContext.KillTimeout(task),
logger: d.logger,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
}
go h.run()
return h, nil
}

type execId struct {
ExecutorId string
KillTimeout time.Duration
}

func (d *ExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) {
id := &execId{}
if err := json.Unmarshal([]byte(handleID), id); err != nil {
return nil, fmt.Errorf("Failed to parse handle '%s': %v", handleID, err)
}

// Find the process
cmd, err := executor.OpenId(handleID)
cmd, err := executor.OpenId(id.ExecutorId)
if err != nil {
return nil, fmt.Errorf("failed to open ID %v: %v", handleID, err)
return nil, fmt.Errorf("failed to open ID %v: %v", id.ExecutorId, err)
}

// Return a driver handle
h := &execHandle{
cmd: cmd,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
cmd: cmd,
logger: d.logger,
killTimeout: id.KillTimeout,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
}
go h.run()
return h, nil
}

func (h *execHandle) ID() string {
id, _ := h.cmd.ID()
return id
executorId, _ := h.cmd.ID()
id := execId{
ExecutorId: executorId,
KillTimeout: h.killTimeout,
}

data, err := json.Marshal(id)
if err != nil {
h.logger.Printf("[ERR] driver.exec: failed to marshal ID to JSON: %s", err)
}
return string(data)
}

func (h *execHandle) WaitCh() chan *cstructs.WaitResult {
Expand All @@ -154,7 +181,7 @@ func (h *execHandle) Kill() error {
select {
case <-h.doneCh:
return nil
case <-time.After(5 * time.Second):
case <-time.After(h.killTimeout):
return h.cmd.ForceStop()
}
}
Expand Down
55 changes: 41 additions & 14 deletions client/driver/java.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package driver

import (
"bytes"
"encoding/json"
"fmt"
"log"
"os/exec"
"path/filepath"
"runtime"
Expand Down Expand Up @@ -36,9 +38,11 @@ type JavaDriverConfig struct {

// javaHandle is returned from Start/Open as a handle to the PID
type javaHandle struct {
cmd executor.Executor
waitCh chan *cstructs.WaitResult
doneCh chan struct{}
cmd executor.Executor
killTimeout time.Duration
logger *log.Logger
waitCh chan *cstructs.WaitResult
doneCh chan struct{}
}

// NewJavaDriver is used to create a new exec driver
Expand Down Expand Up @@ -158,36 +162,59 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,

// Return a driver handle
h := &javaHandle{
cmd: cmd,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
cmd: cmd,
killTimeout: d.DriverContext.KillTimeout(task),
logger: d.logger,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
}

go h.run()
return h, nil
}

type javaId struct {
ExecutorId string
KillTimeout time.Duration
}

func (d *JavaDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) {
id := &javaId{}
if err := json.Unmarshal([]byte(handleID), id); err != nil {
return nil, fmt.Errorf("Failed to parse handle '%s': %v", handleID, err)
}

// Find the process
cmd, err := executor.OpenId(handleID)
cmd, err := executor.OpenId(id.ExecutorId)
if err != nil {
return nil, fmt.Errorf("failed to open ID %v: %v", handleID, err)
return nil, fmt.Errorf("failed to open ID %v: %v", id.ExecutorId, err)
}

// Return a driver handle
h := &javaHandle{
cmd: cmd,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
cmd: cmd,
logger: d.logger,
killTimeout: id.KillTimeout,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
}

go h.run()
return h, nil
}

func (h *javaHandle) ID() string {
id, _ := h.cmd.ID()
return id
executorId, _ := h.cmd.ID()
id := javaId{
ExecutorId: executorId,
KillTimeout: h.killTimeout,
}

data, err := json.Marshal(id)
if err != nil {
h.logger.Printf("[ERR] driver.java: failed to marshal ID to JSON: %s", err)
}
return string(data)
}

func (h *javaHandle) WaitCh() chan *cstructs.WaitResult {
Expand All @@ -204,7 +231,7 @@ func (h *javaHandle) Kill() error {
select {
case <-h.doneCh:
return nil
case <-time.After(5 * time.Second):
case <-time.After(h.killTimeout):
return h.cmd.ForceStop()
}
}
Expand Down
Loading