Skip to content

Commit

Permalink
logs: allow disabling log collection in jobspec
Browse files Browse the repository at this point in the history
Some Nomad users ship application logs out-of-band via syslog. For these users,
having `logmon` (and `docker_logger`) running is unnecessary overhead. Allow
disabling logmon and pointing the task's stdout/stderr to /dev/null instead.

This changeset is the first of several incremental improvements to log
collection short of full-on logging plugins. The next step will likely be to
extend the internal-only task driver configuration so that cluster
administrators can turn off log collection for the entire driver.

---

Fixes: #11175
Co-authored-By: Thomas Weber <[email protected]>
  • Loading branch information
tgross and towe75 committed Apr 24, 2023
1 parent b4e6a70 commit 7cc983b
Show file tree
Hide file tree
Showing 22 changed files with 261 additions and 27 deletions.
3 changes: 3 additions & 0 deletions .changelog/16962.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
jobspec: Added option for disabling task log collection in the `logs` block
```
9 changes: 7 additions & 2 deletions api/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,14 +639,16 @@ func (g *TaskGroup) AddSpread(s *Spread) *TaskGroup {

// LogConfig provides configuration for log rotation
type LogConfig struct {
MaxFiles *int `mapstructure:"max_files" hcl:"max_files,optional"`
MaxFileSizeMB *int `mapstructure:"max_file_size" hcl:"max_file_size,optional"`
MaxFiles *int `mapstructure:"max_files" hcl:"max_files,optional"`
MaxFileSizeMB *int `mapstructure:"max_file_size" hcl:"max_file_size,optional"`
Enabled *bool `mapstructure:"enabled" hcl:"enabled,optional"`
}

// DefaultLogConfig returns a new LogConfig holding the default values:
// MaxFiles of 10, MaxFileSizeMB of 10, and log collection enabled.
func DefaultLogConfig() *LogConfig {
	defaults := new(LogConfig)
	defaults.MaxFiles = pointerOf(10)
	defaults.MaxFileSizeMB = pointerOf(10)
	defaults.Enabled = pointerOf(true)
	return defaults
}

Expand All @@ -657,6 +659,9 @@ func (l *LogConfig) Canonicalize() {
if l.MaxFileSizeMB == nil {
l.MaxFileSizeMB = pointerOf(10)
}
if l.Enabled == nil {
l.Enabled = pointerOf(true)
}
}

// DispatchPayloadConfig configures how a task gets its input from a job dispatch
Expand Down
41 changes: 33 additions & 8 deletions client/allocrunner/taskrunner/logmon_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"runtime"
"time"
Expand Down Expand Up @@ -45,6 +46,7 @@ type logmonHook struct {

// logmonHookConfig holds the settings the logmon hook is built from: the log
// directory, whether log collection is enabled for the task, and the paths
// used for the task's stdout and stderr.
type logmonHookConfig struct {
// logDir is the log directory handed to newLogMonHookConfig.
logDir string
// enabled mirrors the task's LogConfig.Enabled value.
enabled bool
// stdoutFifo and stderrFifo are the task's stdout/stderr paths. They are set
// to os.DevNull when log collection is disabled; otherwise they name the
// pipes created for the task.
stdoutFifo string
stderrFifo string
}
Expand All @@ -59,10 +61,19 @@ func newLogMonHook(tr *TaskRunner, logger hclog.Logger) *logmonHook {
return hook
}

func newLogMonHookConfig(taskName, logDir string) *logmonHookConfig {
func newLogMonHookConfig(taskName string, logCfg *structs.LogConfig, logDir string) *logmonHookConfig {
cfg := &logmonHookConfig{
logDir: logDir,
logDir: logDir,
enabled: logCfg.Enabled,
}

// If logging is disabled configure task's stdout/err to point to devnull
if !logCfg.Enabled {
cfg.stdoutFifo = os.DevNull
cfg.stderrFifo = os.DevNull
return cfg
}

if runtime.GOOS == "windows" {
id := uuid.Generate()[:8]
cfg.stdoutFifo = fmt.Sprintf("//./pipe/%s-%s.stdout", taskName, id)
Expand Down Expand Up @@ -105,9 +116,7 @@ func reattachConfigFromHookData(data map[string]string) (*plugin.ReattachConfig,

func (h *logmonHook) Prestart(ctx context.Context,
req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error {

if h.isLoggingDisabled() {
h.logger.Debug("logging is disabled by driver")
if !h.isLoggingEnabled() {
return nil
}

Expand Down Expand Up @@ -142,14 +151,27 @@ func (h *logmonHook) Prestart(ctx context.Context,
}
}

func (h *logmonHook) isLoggingDisabled() bool {
func (h *logmonHook) isLoggingEnabled() bool {
if !h.config.enabled {
h.logger.Debug("log collection is disabled by task")
return false
}

// internal plugins have access to a capability to disable logging and
// metrics via a private interface; this is an "experimental" interface and
// currently only the docker driver exposes this to users.
ic, ok := h.runner.driver.(drivers.InternalCapabilitiesDriver)
if !ok {
return false
return true
}

caps := ic.InternalCapabilities()
return caps.DisableLogCollection
if caps.DisableLogCollection {
h.logger.Debug("log collection is disabled by driver")
return false
}

return true
}

func (h *logmonHook) prestartOneLoop(ctx context.Context, req *interfaces.TaskPrestartRequest) error {
Expand Down Expand Up @@ -197,6 +219,9 @@ func (h *logmonHook) prestartOneLoop(ctx context.Context, req *interfaces.TaskPr
}

func (h *logmonHook) Stop(_ context.Context, req *interfaces.TaskStopRequest, _ *interfaces.TaskStopResponse) error {
if !h.isLoggingEnabled() {
return nil
}

// It's possible that Stop was called without calling Prestart on agent
// restarts. Attempt to reattach to an existing logmon.
Expand Down
52 changes: 51 additions & 1 deletion client/allocrunner/taskrunner/logmon_hook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
pstructs "github.com/hashicorp/nomad/plugins/shared/structs"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require"
"golang.org/x/exp/maps"
)
Expand Down Expand Up @@ -69,7 +70,7 @@ func TestTaskRunner_LogmonHook_StartStop(t *testing.T) {

dir := t.TempDir()

hookConf := newLogMonHookConfig(task.Name, dir)
hookConf := newLogMonHookConfig(task.Name, task.LogConfig, dir)
runner := &TaskRunner{logmonHookConfig: hookConf}
hook := newLogMonHook(runner, testlog.HCLogger(t))

Expand Down Expand Up @@ -103,3 +104,52 @@ func TestTaskRunner_LogmonHook_StartStop(t *testing.T) {
}
require.NoError(t, hook.Stop(context.Background(), &stopReq, nil))
}

// TestTaskRunner_LogmonHook_Disabled asserts that, when log collection is
// disabled for the task, no logmon is running or expected by any of the
// lifecycle hooks.
func TestTaskRunner_LogmonHook_Disabled(t *testing.T) {
ci.Parallel(t)

alloc := mock.BatchAlloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
// Disable log collection for this task; newLogMonHookConfig then points the
// task's stdout/stderr at os.DevNull instead of setting up pipes.
task.LogConfig.Enabled = false

dir := t.TempDir()

hookConf := newLogMonHookConfig(task.Name, task.LogConfig, dir)
runner := &TaskRunner{logmonHookConfig: hookConf}
hook := newLogMonHook(runner, testlog.HCLogger(t))

req := interfaces.TaskPrestartRequest{Task: task}
resp := interfaces.TaskPrestartResponse{}

// First prestart should not set reattach key and never be Done.
must.NoError(t, hook.Prestart(context.Background(), &req, &resp))
// Best-effort cleanup: Stop is invoked with a nil request and response, which
// relies on Stop returning early when log collection is disabled. The
// returned error is deliberately ignored here.
t.Cleanup(func() { hook.Stop(context.Background(), nil, nil) })

must.False(t, resp.Done)
hookData, ok := resp.State[logmonReattachKey]
must.False(t, ok)
must.Eq(t, "", hookData)

// Running prestart again should still be a noop
req.PreviousState = map[string]string{}
must.NoError(t, hook.Prestart(context.Background(), &req, &resp))

must.False(t, resp.Done)
hookData, ok = resp.State[logmonReattachKey]
must.False(t, ok)
must.Eq(t, "", hookData)

// PreviousState should always be initialized by the caller, but just
// belt-and-suspenders for this test to ensure we can't panic on this code
// path
req.PreviousState = nil
must.NoError(t, hook.Prestart(context.Background(), &req, &resp))

// Running stop should not error even with no running logmon
stopReq := interfaces.TaskStopRequest{
ExistingState: maps.Clone(resp.State),
}
must.NoError(t, hook.Stop(context.Background(), &stopReq, nil))
}
4 changes: 2 additions & 2 deletions client/allocrunner/taskrunner/logmon_hook_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestTaskRunner_LogmonHook_StartCrashStop(t *testing.T) {

dir := t.TempDir()

hookConf := newLogMonHookConfig(task.Name, dir)
hookConf := newLogMonHookConfig(task.Name, task.LogConfig, dir)
runner := &TaskRunner{logmonHookConfig: hookConf}
hook := newLogMonHook(runner, testlog.HCLogger(t))

Expand Down Expand Up @@ -100,7 +100,7 @@ func TestTaskRunner_LogmonHook_ShutdownMidStart(t *testing.T) {

dir := t.TempDir()

hookConf := newLogMonHookConfig(task.Name, dir)
hookConf := newLogMonHookConfig(task.Name, task.LogConfig, dir)
runner := &TaskRunner{logmonHookConfig: hookConf}
hook := newLogMonHook(runner, testlog.HCLogger(t))

Expand Down
2 changes: 1 addition & 1 deletion client/allocrunner/taskrunner/task_runner_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (tr *TaskRunner) initHooks() {
hookLogger := tr.logger.Named("task_hook")
task := tr.Task()

tr.logmonHookConfig = newLogMonHookConfig(task.Name, tr.taskDir.LogDir)
tr.logmonHookConfig = newLogMonHookConfig(task.Name, task.LogConfig, tr.taskDir.LogDir)

// Add the hook resources
tr.hookResources = &hookResources{}
Expand Down
2 changes: 2 additions & 0 deletions command/agent/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1245,6 +1245,7 @@ func ApiTaskToStructsTask(job *structs.Job, group *structs.TaskGroup,
structsTask.LogConfig = &structs.LogConfig{
MaxFiles: *apiTask.LogConfig.MaxFiles,
MaxFileSizeMB: *apiTask.LogConfig.MaxFileSizeMB,
Enabled: *apiTask.LogConfig.Enabled,
}

if len(apiTask.Artifacts) > 0 {
Expand Down Expand Up @@ -1809,6 +1810,7 @@ func apiLogConfigToStructs(in *api.LogConfig) *structs.LogConfig {
return nil
}
return &structs.LogConfig{
Enabled: *in.Enabled,
MaxFiles: dereferenceInt(in.MaxFiles),
MaxFileSizeMB: dereferenceInt(in.MaxFileSizeMB),
}
Expand Down
8 changes: 8 additions & 0 deletions command/agent/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2767,6 +2767,7 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
KillTimeout: pointer.Of(10 * time.Second),
KillSignal: "SIGQUIT",
LogConfig: &api.LogConfig{
Enabled: pointer.Of(true),
MaxFiles: pointer.Of(10),
MaxFileSizeMB: pointer.Of(100),
},
Expand Down Expand Up @@ -3184,6 +3185,7 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
KillTimeout: 10 * time.Second,
KillSignal: "SIGQUIT",
LogConfig: &structs.LogConfig{
Enabled: true,
MaxFiles: 10,
MaxFileSizeMB: 100,
},
Expand Down Expand Up @@ -3340,6 +3342,7 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
KillTimeout: pointer.Of(10 * time.Second),
KillSignal: "SIGQUIT",
LogConfig: &api.LogConfig{
Enabled: pointer.Of(true),
MaxFiles: pointer.Of(10),
MaxFileSizeMB: pointer.Of(100),
},
Expand Down Expand Up @@ -3465,6 +3468,7 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
KillTimeout: 10 * time.Second,
KillSignal: "SIGQUIT",
LogConfig: &structs.LogConfig{
Enabled: true,
MaxFiles: 10,
MaxFileSizeMB: 100,
},
Expand Down Expand Up @@ -3637,9 +3641,11 @@ func TestConversion_apiLogConfigToStructs(t *testing.T) {
ci.Parallel(t)
require.Nil(t, apiLogConfigToStructs(nil))
require.Equal(t, &structs.LogConfig{
Enabled: true,
MaxFiles: 2,
MaxFileSizeMB: 8,
}, apiLogConfigToStructs(&api.LogConfig{
Enabled: pointer.Of(true),
MaxFiles: pointer.Of(2),
MaxFileSizeMB: pointer.Of(8),
}))
Expand Down Expand Up @@ -3737,6 +3743,7 @@ func TestConversion_apiConnectSidecarTaskToStructs(t *testing.T) {
Meta: meta,
KillTimeout: &timeout,
LogConfig: &structs.LogConfig{
Enabled: true,
MaxFiles: 2,
MaxFileSizeMB: 8,
},
Expand All @@ -3755,6 +3762,7 @@ func TestConversion_apiConnectSidecarTaskToStructs(t *testing.T) {
Meta: meta,
KillTimeout: &timeout,
LogConfig: &api.LogConfig{
Enabled: pointer.Of(true),
MaxFiles: pointer.Of(2),
MaxFileSizeMB: pointer.Of(8),
},
Expand Down
14 changes: 12 additions & 2 deletions drivers/docker/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
net: handleState.DriverNetwork,
}

if !d.config.DisableLogCollection {
if loggingIsEnabled(d.config, handle.Config) {
h.dlogger, h.dloggerPluginClient, err = d.reattachToDockerLogger(handleState.ReattachConfig)
if err != nil {
d.logger.Warn("failed to reattach to docker logger process", "error", err)
Expand Down Expand Up @@ -284,6 +284,16 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
return nil
}

// loggingIsEnabled reports whether the driver should collect logs for a task.
// Collection is off when the driver configuration sets DisableLogCollection,
// or when both the task's stdout and stderr paths are os.DevNull; in every
// other case it is on.
func loggingIsEnabled(driverCfg *DriverConfig, taskCfg *drivers.TaskConfig) bool {
	switch {
	case driverCfg.DisableLogCollection:
		// Disabled for the whole driver.
		return false
	case taskCfg.StdoutPath == os.DevNull && taskCfg.StderrPath == os.DevNull:
		// Both task streams are discarded, so there is nothing to collect.
		return false
	default:
		return true
	}
}

func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drivers.DriverNetwork, error) {
if _, ok := d.tasks.Get(cfg.ID); ok {
return nil, nil, fmt.Errorf("task with ID %q already started", cfg.ID)
Expand Down Expand Up @@ -399,7 +409,7 @@ CREATE:
}
}

collectingLogs := !d.config.DisableLogCollection
collectingLogs := loggingIsEnabled(d.config, cfg)

var dlogger docklog.DockerLogger
var pluginClient *plugin.Client
Expand Down
33 changes: 32 additions & 1 deletion drivers/docker/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func dockerTask(t *testing.T) (*drivers.TaskConfig, *TaskConfig, []int) {
func dockerSetup(t *testing.T, task *drivers.TaskConfig, driverCfg map[string]interface{}) (*docker.Client, *dtestutil.DriverHarness, *taskHandle, func()) {
client := newTestDockerClient(t)
driver := dockerDriverHarness(t, driverCfg)
cleanup := driver.MkAllocDir(task, true)
cleanup := driver.MkAllocDir(task, loggingIsEnabled(&DriverConfig{}, task))

copyImage(t, task.TaskDir(), "busybox.tar")
_, _, err := driver.StartTask(task)
Expand Down Expand Up @@ -841,6 +841,37 @@ func TestDockerDriver_LoggingConfiguration(t *testing.T) {
require.Equal(t, loggerConfig, container.HostConfig.LogConfig.Config)
}

// TestDockerDriver_LogCollectionDisabled ensures that no docker logger is
// attached to the task handle when log collection is disabled (the task's
// stdout/stderr paths are os.DevNull), but that the out-of-band Docker logging
// configuration is still applied to the container as expected.
func TestDockerDriver_LogCollectionDisabled(t *testing.T) {
ci.Parallel(t)
testutil.DockerCompatible(t)

task, cfg, _ := dockerTask(t)
// Pointing both streams at os.DevNull is what makes loggingIsEnabled report
// false for this task.
task.StdoutPath = os.DevNull
task.StderrPath = os.DevNull

must.NoError(t, task.EncodeConcreteDriverConfig(cfg))

dockerClientConfig := make(map[string]interface{})
loggerConfig := map[string]string{"gelf-address": "udp://1.2.3.4:12201", "tag": "gelf"}

// Configure a gelf logging driver so the test can check it is still passed
// through to the container.
dockerClientConfig["logging"] = LoggingConfig{
Type: "gelf",
Config: loggerConfig,
}
client, d, handle, cleanup := dockerSetup(t, task, dockerClientConfig)
t.Cleanup(cleanup)
must.NoError(t, d.WaitUntilStarted(task.ID, 5*time.Second))
container, err := client.InspectContainer(handle.containerID)
must.NoError(t, err)
// No docker logger should have been created for the task.
must.Nil(t, handle.dlogger)

// The container itself must still carry the configured logging driver.
must.Eq(t, "gelf", container.HostConfig.LogConfig.Type)
must.Eq(t, loggerConfig, container.HostConfig.LogConfig.Config)
}

func TestDockerDriver_HealthchecksDisable(t *testing.T) {
ci.Parallel(t)
testutil.DockerCompatible(t)
Expand Down
1 change: 1 addition & 0 deletions jobspec/parse_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ func parseTask(item *ast.ObjectItem, keys []string) (*api.Task, error) {
valid := []string{
"max_files",
"max_file_size",
"enabled",
}
if err := checkHCLKeys(logsBlock.Val, valid); err != nil {
return nil, multierror.Prefix(err, "logs ->")
Expand Down
1 change: 1 addition & 0 deletions jobspec/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,7 @@ func TestParse(t *testing.T) {
LogConfig: &api.LogConfig{
MaxFiles: intToPtr(14),
MaxFileSizeMB: intToPtr(101),
Enabled: boolToPtr(true),
},
Artifacts: []*api.TaskArtifact{
{
Expand Down
Loading

0 comments on commit 7cc983b

Please sign in to comment.