Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

driver: allow disabling log collection #6820

Merged
merged 2 commits into from
Dec 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions client/allocrunner/taskrunner/logmon_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
bstructs "github.com/hashicorp/nomad/plugins/base/structs"
"github.com/hashicorp/nomad/plugins/drivers"
pstructs "github.com/hashicorp/nomad/plugins/shared/structs"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand All @@ -28,6 +29,8 @@ const (

// logmonHook launches logmon and manages task logging
type logmonHook struct {
runner *TaskRunner

// logmon is the handle to the log monitor process for the task.
logmon logmon.LogMon
logmonPluginClient *plugin.Client
Expand All @@ -43,9 +46,10 @@ type logmonHookConfig struct {
stderrFifo string
}

func newLogMonHook(cfg *logmonHookConfig, logger hclog.Logger) *logmonHook {
func newLogMonHook(tr *TaskRunner, logger hclog.Logger) *logmonHook {
hook := &logmonHook{
config: cfg,
runner: tr,
config: tr.logmonHookConfig,
logger: logger,
}

Expand Down Expand Up @@ -99,6 +103,11 @@ func reattachConfigFromHookData(data map[string]string) (*plugin.ReattachConfig,
func (h *logmonHook) Prestart(ctx context.Context,
req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error {

if h.isLoggingDisabled() {
h.logger.Debug("logging is disabled by driver")
return nil
}

attempts := 0
for {
err := h.prestartOneLoop(ctx, req)
Expand Down Expand Up @@ -130,6 +139,16 @@ func (h *logmonHook) Prestart(ctx context.Context,
}
}

// isLoggingDisabled reports whether the task's driver has opted out of log
// collection. Drivers that do not implement
// drivers.InternalCapabilitiesDriver are treated as collecting logs.
func (h *logmonHook) isLoggingDisabled() bool {
	if capDriver, implemented := h.runner.driver.(drivers.InternalCapabilitiesDriver); implemented {
		return capDriver.InternalCapabilities().DisableLogCollection
	}

	// Driver does not expose internal capabilities: logging stays enabled.
	return false
}

func (h *logmonHook) prestartOneLoop(ctx context.Context, req *interfaces.TaskPrestartRequest) error {
// attach to a running logmon if state indicates one
if h.logmonPluginClient == nil {
Expand Down
3 changes: 2 additions & 1 deletion client/allocrunner/taskrunner/logmon_hook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ func TestTaskRunner_LogmonHook_StartStop(t *testing.T) {
}()

hookConf := newLogMonHookConfig(task.Name, dir)
hook := newLogMonHook(hookConf, testlog.HCLogger(t))
runner := &TaskRunner{logmonHookConfig: hookConf}
hook := newLogMonHook(runner, testlog.HCLogger(t))

req := interfaces.TaskPrestartRequest{
Task: task,
Expand Down
6 changes: 4 additions & 2 deletions client/allocrunner/taskrunner/logmon_hook_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ func TestTaskRunner_LogmonHook_StartCrashStop(t *testing.T) {
}()

hookConf := newLogMonHookConfig(task.Name, dir)
hook := newLogMonHook(hookConf, testlog.HCLogger(t))
runner := &TaskRunner{logmonHookConfig: hookConf}
hook := newLogMonHook(runner, testlog.HCLogger(t))

req := interfaces.TaskPrestartRequest{
Task: task,
Expand Down Expand Up @@ -104,7 +105,8 @@ func TestTaskRunner_LogmonHook_ShutdownMidStart(t *testing.T) {
}()

hookConf := newLogMonHookConfig(task.Name, dir)
hook := newLogMonHook(hookConf, testlog.HCLogger(t))
runner := &TaskRunner{logmonHookConfig: hookConf}
hook := newLogMonHook(runner, testlog.HCLogger(t))

req := interfaces.TaskPrestartRequest{
Task: task,
Expand Down
2 changes: 1 addition & 1 deletion client/allocrunner/taskrunner/task_runner_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (tr *TaskRunner) initHooks() {
tr.runnerHooks = []interfaces.TaskHook{
newValidateHook(tr.clientConfig, hookLogger),
newTaskDirHook(tr, hookLogger),
newLogMonHook(tr.logmonHookConfig, hookLogger),
newLogMonHook(tr, hookLogger),
newDispatchHook(alloc, hookLogger),
newVolumeHook(tr, hookLogger),
newArtifactHook(tr, hookLogger),
Expand Down
31 changes: 22 additions & 9 deletions drivers/docker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,10 @@ var (
hclspec.NewAttr("infra_image", "string", false),
hclspec.NewLiteral(`"gcr.io/google_containers/pause-amd64:3.0"`),
),

// disable_log_collection indicates whether the docker driver should skip collecting
// logs of docker task containers. If true, nomad doesn't start docker_logger/logmon processes
"disable_log_collection": hclspec.NewAttr("disable_log_collection", "bool", false),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would maybe document in code that this may change, unless we're committing to this being a driver option rather than eventually a client one.

})

// taskConfigSpec is the hcl specification for the driver config section of
Expand Down Expand Up @@ -549,15 +553,16 @@ type ContainerGCConfig struct {
}

type DriverConfig struct {
Endpoint string `codec:"endpoint"`
Auth AuthConfig `codec:"auth"`
TLS TLSConfig `codec:"tls"`
GC GCConfig `codec:"gc"`
Volumes VolumeConfig `codec:"volumes"`
AllowPrivileged bool `codec:"allow_privileged"`
AllowCaps []string `codec:"allow_caps"`
GPURuntimeName string `codec:"nvidia_runtime"`
InfraImage string `codec:"infra_image"`
Endpoint string `codec:"endpoint"`
Auth AuthConfig `codec:"auth"`
TLS TLSConfig `codec:"tls"`
GC GCConfig `codec:"gc"`
Volumes VolumeConfig `codec:"volumes"`
AllowPrivileged bool `codec:"allow_privileged"`
AllowCaps []string `codec:"allow_caps"`
GPURuntimeName string `codec:"nvidia_runtime"`
InfraImage string `codec:"infra_image"`
DisableLogCollection bool `codec:"disable_log_collection"`
}

type AuthConfig struct {
Expand Down Expand Up @@ -660,3 +665,11 @@ func (d *Driver) TaskConfigSchema() (*hclspec.Spec, error) {
func (d *Driver) Capabilities() (*drivers.Capabilities, error) {
return capabilities, nil
}

var _ drivers.InternalCapabilitiesDriver = (*Driver)(nil)

// InternalCapabilities reports the nomad functionality this driver has
// disabled. Log collection is reported as disabled only when a driver config
// is present and has DisableLogCollection set; with no config the zero value
// is returned.
func (d *Driver) InternalCapabilities() drivers.InternalCapabilities {
	var caps drivers.InternalCapabilities
	if d.config != nil {
		caps.DisableLogCollection = d.config.DisableLogCollection
	}
	return caps
}
36 changes: 36 additions & 0 deletions drivers/docker/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"testing"

"github.com/hashicorp/nomad/helper/pluginutils/hclutils"
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -488,3 +489,38 @@ func TestConfig_DriverConfig_DanglingContainers(t *testing.T) {
})
}
}

// TestConfig_InternalCapabilities checks that the disable_log_collection
// plugin option, once parsed into a DriverConfig, is reflected in the value
// returned by Driver.InternalCapabilities.
func TestConfig_InternalCapabilities(t *testing.T) {
	type capsCase struct {
		name     string
		config   string
		expected drivers.InternalCapabilities
	}

	cases := []capsCase{
		{
			name:     "pure default",
			config:   `{}`,
			expected: drivers.InternalCapabilities{},
		},
		{
			name:     "disabled",
			config:   `{ disable_log_collection = true }`,
			expected: drivers.InternalCapabilities{DisableLogCollection: true},
		},
		{
			name:     "enabled explicitly",
			config:   `{ disable_log_collection = false }`,
			expected: drivers.InternalCapabilities{},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// Parse the HCL snippet the same way the plugin config is parsed.
			var parsed DriverConfig
			parser := hclutils.NewConfigParser(configSpec)
			parser.ParseHCL(t, "config "+tc.config, &parsed)

			driver := &Driver{config: &parsed}
			got := driver.InternalCapabilities()
			require.Equal(t, tc.expected, got)
		})
	}
}
51 changes: 31 additions & 20 deletions drivers/docker/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,23 +209,25 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
net: handleState.DriverNetwork,
}

h.dlogger, h.dloggerPluginClient, err = d.reattachToDockerLogger(handleState.ReattachConfig)
if err != nil {
d.logger.Warn("failed to reattach to docker logger process", "error", err)

h.dlogger, h.dloggerPluginClient, err = d.setupNewDockerLogger(container, handle.Config, time.Now())
if !d.config.DisableLogCollection {
h.dlogger, h.dloggerPluginClient, err = d.reattachToDockerLogger(handleState.ReattachConfig)
if err != nil {
if err := client.StopContainer(handleState.ContainerID, 0); err != nil {
d.logger.Warn("failed to stop container during cleanup", "container_id", handleState.ContainerID, "error", err)
d.logger.Warn("failed to reattach to docker logger process", "error", err)

h.dlogger, h.dloggerPluginClient, err = d.setupNewDockerLogger(container, handle.Config, time.Now())
if err != nil {
if err := client.StopContainer(handleState.ContainerID, 0); err != nil {
d.logger.Warn("failed to stop container during cleanup", "container_id", handleState.ContainerID, "error", err)
}
return fmt.Errorf("failed to setup replacement docker logger: %v", err)
}
return fmt.Errorf("failed to setup replacement docker logger: %v", err)
}

if err := handle.SetDriverState(h.buildState()); err != nil {
if err := client.StopContainer(handleState.ContainerID, 0); err != nil {
d.logger.Warn("failed to stop container during cleanup", "container_id", handleState.ContainerID, "error", err)
if err := handle.SetDriverState(h.buildState()); err != nil {
if err := client.StopContainer(handleState.ContainerID, 0); err != nil {
d.logger.Warn("failed to stop container during cleanup", "container_id", handleState.ContainerID, "error", err)
}
return fmt.Errorf("failed to store driver state: %v", err)
}
return fmt.Errorf("failed to store driver state: %v", err)
}
}

Expand Down Expand Up @@ -334,11 +336,18 @@ CREATE:
container.ID, "container_state", container.State.String())
}

dlogger, pluginClient, err := d.setupNewDockerLogger(container, cfg, time.Unix(0, 0))
if err != nil {
d.logger.Error("an error occurred after container startup, terminating container", "container_id", container.ID)
client.RemoveContainer(docker.RemoveContainerOptions{ID: container.ID, Force: true})
return nil, nil, err
collectingLogs := !d.config.DisableLogCollection

var dlogger docklog.DockerLogger
var pluginClient *plugin.Client

if collectingLogs {
dlogger, pluginClient, err = d.setupNewDockerLogger(container, cfg, time.Unix(0, 0))
if err != nil {
d.logger.Error("an error occurred after container startup, terminating container", "container_id", container.ID)
client.RemoveContainer(docker.RemoveContainerOptions{ID: container.ID, Force: true})
return nil, nil, err
}
}

// Detect container address
Expand Down Expand Up @@ -368,8 +377,10 @@ CREATE:

if err := handle.SetDriverState(h.buildState()); err != nil {
d.logger.Error("error encoding container occurred after startup, terminating container", "container_id", container.ID, "error", err)
dlogger.Stop()
pluginClient.Kill()
if collectingLogs {
dlogger.Stop()
pluginClient.Kill()
}
client.RemoveContainer(docker.RemoveContainerOptions{ID: container.ID, Force: true})
return nil, nil, err
}
Expand Down
15 changes: 11 additions & 4 deletions drivers/docker/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,14 @@ type taskHandleState struct {
}

func (h *taskHandle) buildState() *taskHandleState {
return &taskHandleState{
ReattachConfig: pstructs.ReattachConfigFromGoPlugin(h.dloggerPluginClient.ReattachConfig()),
ContainerID: h.containerID,
DriverNetwork: h.net,
s := &taskHandleState{
ContainerID: h.containerID,
DriverNetwork: h.net,
}
if h.dloggerPluginClient != nil {
s.ReattachConfig = pstructs.ReattachConfigFromGoPlugin(h.dloggerPluginClient.ReattachConfig())
}
return s
}

func (h *taskHandle) Exec(ctx context.Context, cmd string, args []string) (*drivers.ExecTaskResult, error) {
Expand Down Expand Up @@ -171,6 +174,10 @@ func (h *taskHandle) Kill(killTimeout time.Duration, signal os.Signal) error {
}

func (h *taskHandle) shutdownLogger() {
if h.dlogger == nil {
return
}

if err := h.dlogger.Stop(); err != nil {
h.logger.Error("failed to stop docker logger process during StopTask",
"error", err, "logger_pid", h.dloggerPluginClient.ReattachConfig().Pid)
Expand Down
15 changes: 15 additions & 0 deletions plugins/drivers/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,3 +527,18 @@ type ExecTaskStream interface {

type ExecTaskStreamingRequestMsg = proto.ExecTaskStreamingRequest
type ExecTaskStreamingResponseMsg = proto.ExecTaskStreamingResponse

// InternalCapabilitiesDriver is an experimental interface enabling a driver
// to disable some nomad functionality (e.g. logs or metrics).
//
// Intended for internal drivers only while the interface is stabilized.
type InternalCapabilitiesDriver interface {
// InternalCapabilities returns the set of nomad functionality the driver
// has disabled.
InternalCapabilities() InternalCapabilities
}

// InternalCapabilities flags disabled functionality.
// Zero value means all is supported.
type InternalCapabilities struct {
// DisableLogCollection, when true, makes the task runner's logmon hook
// skip its Prestart work for tasks of this driver.
DisableLogCollection bool
// DisableMetricsCollection presumably disables metrics collection for the
// driver's tasks; no consumer is visible here — TODO confirm.
DisableMetricsCollection bool
}