Skip to content

Commit

Permalink
Merge pull request #5616 from hashicorp/b-retry-logmon-start-errs-2
Browse files Browse the repository at this point in the history
logmon: recover from shutting down call locally
  • Loading branch information
Mahmood Ali authored Apr 26, 2019
2 parents 81c7c46 + a321901 commit 1497b8e
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 11 deletions.
47 changes: 39 additions & 8 deletions client/allocrunner/taskrunner/logmon_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,18 @@ import (
"fmt"
"path/filepath"
"runtime"
"time"

hclog "github.com/hashicorp/go-hclog"
plugin "github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/logmon"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
bstructs "github.com/hashicorp/nomad/plugins/base/structs"
pstructs "github.com/hashicorp/nomad/plugins/shared/structs"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)

const (
Expand Down Expand Up @@ -95,7 +99,39 @@ func reattachConfigFromHookData(data map[string]string) (*plugin.ReattachConfig,
func (h *logmonHook) Prestart(ctx context.Context,
req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error {

// Attempt to reattach to logmon
attempts := 0
for {
err := h.prestartOneLoop(ctx, req)
if err == bstructs.ErrPluginShutdown || grpc.Code(err) == codes.Unavailable {
h.logger.Warn("logmon shutdown while making request", "error", err)

if attempts > 3 {
h.logger.Warn("logmon shutdown while making request; giving up", "attempts", attempts, "error", err)
return err
}

// retry after killing process and ensure we start a new logmon process
attempts++
h.logger.Warn("logmon shutdown while making request; retrying", "attempts", attempts, "error", err)
h.logmonPluginClient.Kill()
time.Sleep(1 * time.Second)
continue
} else if err != nil {
return err
}

rCfg := pstructs.ReattachConfigFromGoPlugin(h.logmonPluginClient.ReattachConfig())
jsonCfg, err := json.Marshal(rCfg)
if err != nil {
return err
}
resp.State = map[string]string{logmonReattachKey: string(jsonCfg)}
return nil
}
}

func (h *logmonHook) prestartOneLoop(ctx context.Context, req *interfaces.TaskPrestartRequest) error {
// attach to a running logmon if state indicates one
if h.logmonPluginClient == nil {
reattachConfig, err := reattachConfigFromHookData(req.PreviousState)
if err != nil {
Expand All @@ -105,12 +141,13 @@ func (h *logmonHook) Prestart(ctx context.Context,
if reattachConfig != nil {
if err := h.launchLogMon(reattachConfig); err != nil {
h.logger.Warn("failed to reattach to logmon process", "error", err)
// if we failed to launch logmon, try again below
}
}

}

// We did not reattach to a plugin and one is still not running.
// create a new client in initial starts, failed reattachment, or if we detect exits
if h.logmonPluginClient == nil || h.logmonPluginClient.Exited() {
if err := h.launchLogMon(nil); err != nil {
// Retry errors launching logmon as logmon may have crashed on start and
Expand All @@ -134,12 +171,6 @@ func (h *logmonHook) Prestart(ctx context.Context,
return err
}

rCfg := pstructs.ReattachConfigFromGoPlugin(h.logmonPluginClient.ReattachConfig())
jsonCfg, err := json.Marshal(rCfg)
if err != nil {
return err
}
resp.State = map[string]string{logmonReattachKey: string(jsonCfg)}
return nil
}

Expand Down
88 changes: 88 additions & 0 deletions client/allocrunner/taskrunner/logmon_hook_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ import (
"os"
"syscall"
"testing"
"time"

"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/testutil"
"github.com/shirou/gopsutil/process"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -86,3 +88,89 @@ func TestTaskRunner_LogmonHook_StartCrashStop(t *testing.T) {
// Running stop should shutdown logmon
require.NoError(t, hook.Stop(context.Background(), nil, nil))
}

// TestTaskRunner_LogmonHook_ShutdownMidStart simulates logmon crashing while the
// Nomad client is calling Start() and asserts that we recover and spawn a new logmon.
func TestTaskRunner_LogmonHook_ShutdownMidStart(t *testing.T) {
t.Parallel()

alloc := mock.BatchAlloc()
task := alloc.Job.TaskGroups[0].Tasks[0]

dir, err := ioutil.TempDir("", "nomadtest")
require.NoError(t, err)
defer func() {
require.NoError(t, os.RemoveAll(dir))
}()

hookConf := newLogMonHookConfig(task.Name, dir)
hook := newLogMonHook(hookConf, testlog.HCLogger(t))

req := interfaces.TaskPrestartRequest{
Task: task,
}
resp := interfaces.TaskPrestartResponse{}

// First start
require.NoError(t, hook.Prestart(context.Background(), &req, &resp))
defer hook.Stop(context.Background(), nil, nil)

origState := resp.State
origHookData := resp.State[logmonReattachKey]
require.NotEmpty(t, origHookData)

// Pluck PID out of reattach synthesize a crash
reattach := struct {
Pid int
}{}
require.NoError(t, json.Unmarshal([]byte(origHookData), &reattach))
pid := reattach.Pid
require.NotZero(t, pid)

proc, err := process.NewProcess(int32(pid))
require.NoError(t, err)

// Assert logmon is running
require.NoError(t, proc.SendSignal(syscall.Signal(0)))

// SIGSTOP would freeze process without it being considered
// exited; so this causes process to be non-exited at beginning of call
// then we kill process while Start call is running
require.NoError(t, proc.SendSignal(syscall.SIGSTOP))
testutil.WaitForResult(func() (bool, error) {
status, err := proc.Status()
if err != nil {
return false, err
}

if status != "T" && status != "T+" {
return false, fmt.Errorf("process is not asleep yet: %v", status)
}

return true, nil
}, func(err error) {
require.NoError(t, err)
})

go func() {
time.Sleep(2 * time.Second)

proc.SendSignal(syscall.SIGCONT)
proc.Kill()
}()

req.PreviousState = map[string]string{
logmonReattachKey: origHookData,
}

initLogmon, initClient := hook.logmon, hook.logmonPluginClient

resp = interfaces.TaskPrestartResponse{}
err = hook.Prestart(context.Background(), &req, &resp)
require.NoError(t, err)
require.NotEqual(t, origState, resp.State)

// assert that we got a new client and logmon
require.True(t, initLogmon != hook.logmon)
require.True(t, initClient != hook.logmonPluginClient)
}
8 changes: 6 additions & 2 deletions client/logmon/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@ import (
"context"

"github.com/hashicorp/nomad/client/logmon/proto"
"github.com/hashicorp/nomad/helper/pluginutils/grpcutils"
)

type logmonClient struct {
client proto.LogMonClient

// doneCtx is closed when the plugin exits
doneCtx context.Context
}

func (c *logmonClient) Start(cfg *LogConfig) error {
Expand All @@ -21,11 +25,11 @@ func (c *logmonClient) Start(cfg *LogConfig) error {
StderrFifo: cfg.StderrFifo,
}
_, err := c.client.Start(context.Background(), req)
return err
return grpcutils.HandleGrpcErr(err, c.doneCtx)
}

func (c *logmonClient) Stop() error {
req := &proto.StopRequest{}
_, err := c.client.Stop(context.Background(), req)
return err
return grpcutils.HandleGrpcErr(err, c.doneCtx)
}
5 changes: 4 additions & 1 deletion client/logmon/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,5 +73,8 @@ func (p *Plugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server) error {
}

func (p *Plugin) GRPCClient(ctx context.Context, broker *plugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error) {
return &logmonClient{client: proto.NewLogMonClient(c)}, nil
return &logmonClient{
doneCtx: ctx,
client: proto.NewLogMonClient(c),
}, nil
}

0 comments on commit 1497b8e

Please sign in to comment.