Skip to content

Commit

Permalink
backport of commit 7cc983b (#16970)
Browse files Browse the repository at this point in the history
This pull request was automerged via backport-assistant
  • Loading branch information
hc-github-team-nomad-core authored Apr 24, 2023
1 parent 213bc9e commit f7e5455
Show file tree
Hide file tree
Showing 22 changed files with 261 additions and 27 deletions.
3 changes: 3 additions & 0 deletions .changelog/16962.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
jobspec: Added option for disabling task log collection in the `logs` block
```
9 changes: 7 additions & 2 deletions api/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,14 +636,16 @@ func (g *TaskGroup) AddSpread(s *Spread) *TaskGroup {

// LogConfig provides configuration for log rotation
type LogConfig struct {
MaxFiles *int `mapstructure:"max_files" hcl:"max_files,optional"`
MaxFileSizeMB *int `mapstructure:"max_file_size" hcl:"max_file_size,optional"`
MaxFiles *int `mapstructure:"max_files" hcl:"max_files,optional"`
MaxFileSizeMB *int `mapstructure:"max_file_size" hcl:"max_file_size,optional"`
Enabled *bool `mapstructure:"enabled" hcl:"enabled,optional"`
}

func DefaultLogConfig() *LogConfig {
return &LogConfig{
MaxFiles: pointerOf(10),
MaxFileSizeMB: pointerOf(10),
Enabled: pointerOf(true),
}
}

Expand All @@ -654,6 +656,9 @@ func (l *LogConfig) Canonicalize() {
if l.MaxFileSizeMB == nil {
l.MaxFileSizeMB = pointerOf(10)
}
if l.Enabled == nil {
l.Enabled = pointerOf(true)
}
}

// DispatchPayloadConfig configures how a task gets its input from a job dispatch
Expand Down
41 changes: 33 additions & 8 deletions client/allocrunner/taskrunner/logmon_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"runtime"
"time"
Expand Down Expand Up @@ -42,6 +43,7 @@ type logmonHook struct {

type logmonHookConfig struct {
logDir string
enabled bool
stdoutFifo string
stderrFifo string
}
Expand All @@ -56,10 +58,19 @@ func newLogMonHook(tr *TaskRunner, logger hclog.Logger) *logmonHook {
return hook
}

func newLogMonHookConfig(taskName, logDir string) *logmonHookConfig {
func newLogMonHookConfig(taskName string, logCfg *structs.LogConfig, logDir string) *logmonHookConfig {
cfg := &logmonHookConfig{
logDir: logDir,
logDir: logDir,
enabled: logCfg.Enabled,
}

// If logging is disabled configure task's stdout/err to point to devnull
if !logCfg.Enabled {
cfg.stdoutFifo = os.DevNull
cfg.stderrFifo = os.DevNull
return cfg
}

if runtime.GOOS == "windows" {
id := uuid.Generate()[:8]
cfg.stdoutFifo = fmt.Sprintf("//./pipe/%s-%s.stdout", taskName, id)
Expand Down Expand Up @@ -102,9 +113,7 @@ func reattachConfigFromHookData(data map[string]string) (*plugin.ReattachConfig,

func (h *logmonHook) Prestart(ctx context.Context,
req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error {

if h.isLoggingDisabled() {
h.logger.Debug("logging is disabled by driver")
if !h.isLoggingEnabled() {
return nil
}

Expand Down Expand Up @@ -139,14 +148,27 @@ func (h *logmonHook) Prestart(ctx context.Context,
}
}

func (h *logmonHook) isLoggingDisabled() bool {
func (h *logmonHook) isLoggingEnabled() bool {
if !h.config.enabled {
h.logger.Debug("log collection is disabled by task")
return false
}

// internal plugins have access to a capability to disable logging and
// metrics via a private interface; this is an "experimental" interface and
// currently only the docker driver exposes this to users.
ic, ok := h.runner.driver.(drivers.InternalCapabilitiesDriver)
if !ok {
return false
return true
}

caps := ic.InternalCapabilities()
return caps.DisableLogCollection
if caps.DisableLogCollection {
h.logger.Debug("log collection is disabled by driver")
return false
}

return true
}

func (h *logmonHook) prestartOneLoop(ctx context.Context, req *interfaces.TaskPrestartRequest) error {
Expand Down Expand Up @@ -194,6 +216,9 @@ func (h *logmonHook) prestartOneLoop(ctx context.Context, req *interfaces.TaskPr
}

func (h *logmonHook) Stop(_ context.Context, req *interfaces.TaskStopRequest, _ *interfaces.TaskStopResponse) error {
if !h.isLoggingEnabled() {
return nil
}

// It's possible that Stop was called without calling Prestart on agent
// restarts. Attempt to reattach to an existing logmon.
Expand Down
52 changes: 51 additions & 1 deletion client/allocrunner/taskrunner/logmon_hook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
pstructs "github.com/hashicorp/nomad/plugins/shared/structs"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require"
"golang.org/x/exp/maps"
)
Expand Down Expand Up @@ -66,7 +67,7 @@ func TestTaskRunner_LogmonHook_StartStop(t *testing.T) {

dir := t.TempDir()

hookConf := newLogMonHookConfig(task.Name, dir)
hookConf := newLogMonHookConfig(task.Name, task.LogConfig, dir)
runner := &TaskRunner{logmonHookConfig: hookConf}
hook := newLogMonHook(runner, testlog.HCLogger(t))

Expand Down Expand Up @@ -100,3 +101,52 @@ func TestTaskRunner_LogmonHook_StartStop(t *testing.T) {
}
require.NoError(t, hook.Stop(context.Background(), &stopReq, nil))
}

// TestTaskRunner_LogmonHook_Disabled asserts that no logmon running or expected
// by any of the lifecycle hooks.
func TestTaskRunner_LogmonHook_Disabled(t *testing.T) {
ci.Parallel(t)

alloc := mock.BatchAlloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.LogConfig.Enabled = false

dir := t.TempDir()

hookConf := newLogMonHookConfig(task.Name, task.LogConfig, dir)
runner := &TaskRunner{logmonHookConfig: hookConf}
hook := newLogMonHook(runner, testlog.HCLogger(t))

req := interfaces.TaskPrestartRequest{Task: task}
resp := interfaces.TaskPrestartResponse{}

// First prestart should not set reattach key and never be Done.
must.NoError(t, hook.Prestart(context.Background(), &req, &resp))
t.Cleanup(func() { hook.Stop(context.Background(), nil, nil) })

must.False(t, resp.Done)
hookData, ok := resp.State[logmonReattachKey]
must.False(t, ok)
must.Eq(t, "", hookData)

// Running prestart again should still be a noop
req.PreviousState = map[string]string{}
must.NoError(t, hook.Prestart(context.Background(), &req, &resp))

must.False(t, resp.Done)
hookData, ok = resp.State[logmonReattachKey]
must.False(t, ok)
must.Eq(t, "", hookData)

// PreviousState should always be initialized by the caller, but just
// belt-and-suspenders for this test to ensure we can't panic on this code
// path
req.PreviousState = nil
must.NoError(t, hook.Prestart(context.Background(), &req, &resp))

// Running stop should not error even with no running logmon
stopReq := interfaces.TaskStopRequest{
ExistingState: maps.Clone(resp.State),
}
must.NoError(t, hook.Stop(context.Background(), &stopReq, nil))
}
4 changes: 2 additions & 2 deletions client/allocrunner/taskrunner/logmon_hook_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestTaskRunner_LogmonHook_StartCrashStop(t *testing.T) {

dir := t.TempDir()

hookConf := newLogMonHookConfig(task.Name, dir)
hookConf := newLogMonHookConfig(task.Name, task.LogConfig, dir)
runner := &TaskRunner{logmonHookConfig: hookConf}
hook := newLogMonHook(runner, testlog.HCLogger(t))

Expand Down Expand Up @@ -97,7 +97,7 @@ func TestTaskRunner_LogmonHook_ShutdownMidStart(t *testing.T) {

dir := t.TempDir()

hookConf := newLogMonHookConfig(task.Name, dir)
hookConf := newLogMonHookConfig(task.Name, task.LogConfig, dir)
runner := &TaskRunner{logmonHookConfig: hookConf}
hook := newLogMonHook(runner, testlog.HCLogger(t))

Expand Down
2 changes: 1 addition & 1 deletion client/allocrunner/taskrunner/task_runner_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (tr *TaskRunner) initHooks() {
hookLogger := tr.logger.Named("task_hook")
task := tr.Task()

tr.logmonHookConfig = newLogMonHookConfig(task.Name, tr.taskDir.LogDir)
tr.logmonHookConfig = newLogMonHookConfig(task.Name, task.LogConfig, tr.taskDir.LogDir)

// Add the hook resources
tr.hookResources = &hookResources{}
Expand Down
2 changes: 2 additions & 0 deletions command/agent/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1183,6 +1183,7 @@ func ApiTaskToStructsTask(job *structs.Job, group *structs.TaskGroup,
structsTask.LogConfig = &structs.LogConfig{
MaxFiles: *apiTask.LogConfig.MaxFiles,
MaxFileSizeMB: *apiTask.LogConfig.MaxFileSizeMB,
Enabled: *apiTask.LogConfig.Enabled,
}

if len(apiTask.Artifacts) > 0 {
Expand Down Expand Up @@ -1747,6 +1748,7 @@ func apiLogConfigToStructs(in *api.LogConfig) *structs.LogConfig {
return nil
}
return &structs.LogConfig{
Enabled: *in.Enabled,
MaxFiles: dereferenceInt(in.MaxFiles),
MaxFileSizeMB: dereferenceInt(in.MaxFileSizeMB),
}
Expand Down
8 changes: 8 additions & 0 deletions command/agent/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2707,6 +2707,7 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
KillTimeout: pointer.Of(10 * time.Second),
KillSignal: "SIGQUIT",
LogConfig: &api.LogConfig{
Enabled: pointer.Of(true),
MaxFiles: pointer.Of(10),
MaxFileSizeMB: pointer.Of(100),
},
Expand Down Expand Up @@ -3124,6 +3125,7 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
KillTimeout: 10 * time.Second,
KillSignal: "SIGQUIT",
LogConfig: &structs.LogConfig{
Enabled: true,
MaxFiles: 10,
MaxFileSizeMB: 100,
},
Expand Down Expand Up @@ -3280,6 +3282,7 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
KillTimeout: pointer.Of(10 * time.Second),
KillSignal: "SIGQUIT",
LogConfig: &api.LogConfig{
Enabled: pointer.Of(true),
MaxFiles: pointer.Of(10),
MaxFileSizeMB: pointer.Of(100),
},
Expand Down Expand Up @@ -3405,6 +3408,7 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
KillTimeout: 10 * time.Second,
KillSignal: "SIGQUIT",
LogConfig: &structs.LogConfig{
Enabled: true,
MaxFiles: 10,
MaxFileSizeMB: 100,
},
Expand Down Expand Up @@ -3577,9 +3581,11 @@ func TestConversion_apiLogConfigToStructs(t *testing.T) {
ci.Parallel(t)
require.Nil(t, apiLogConfigToStructs(nil))
require.Equal(t, &structs.LogConfig{
Enabled: true,
MaxFiles: 2,
MaxFileSizeMB: 8,
}, apiLogConfigToStructs(&api.LogConfig{
Enabled: pointer.Of(true),
MaxFiles: pointer.Of(2),
MaxFileSizeMB: pointer.Of(8),
}))
Expand Down Expand Up @@ -3653,6 +3659,7 @@ func TestConversion_apiConnectSidecarTaskToStructs(t *testing.T) {
Meta: meta,
KillTimeout: &timeout,
LogConfig: &structs.LogConfig{
Enabled: true,
MaxFiles: 2,
MaxFileSizeMB: 8,
},
Expand All @@ -3671,6 +3678,7 @@ func TestConversion_apiConnectSidecarTaskToStructs(t *testing.T) {
Meta: meta,
KillTimeout: &timeout,
LogConfig: &api.LogConfig{
Enabled: pointer.Of(true),
MaxFiles: pointer.Of(2),
MaxFileSizeMB: pointer.Of(8),
},
Expand Down
14 changes: 12 additions & 2 deletions drivers/docker/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
net: handleState.DriverNetwork,
}

if !d.config.DisableLogCollection {
if loggingIsEnabled(d.config, handle.Config) {
h.dlogger, h.dloggerPluginClient, err = d.reattachToDockerLogger(handleState.ReattachConfig)
if err != nil {
d.logger.Warn("failed to reattach to docker logger process", "error", err)
Expand Down Expand Up @@ -282,6 +282,16 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
return nil
}

func loggingIsEnabled(driverCfg *DriverConfig, taskCfg *drivers.TaskConfig) bool {
if driverCfg.DisableLogCollection {
return false
}
if taskCfg.StderrPath == os.DevNull && taskCfg.StdoutPath == os.DevNull {
return false
}
return true
}

func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drivers.DriverNetwork, error) {
if _, ok := d.tasks.Get(cfg.ID); ok {
return nil, nil, fmt.Errorf("task with ID %q already started", cfg.ID)
Expand Down Expand Up @@ -397,7 +407,7 @@ CREATE:
}
}

collectingLogs := !d.config.DisableLogCollection
collectingLogs := loggingIsEnabled(d.config, cfg)

var dlogger docklog.DockerLogger
var pluginClient *plugin.Client
Expand Down
33 changes: 32 additions & 1 deletion drivers/docker/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func dockerTask(t *testing.T) (*drivers.TaskConfig, *TaskConfig, []int) {
func dockerSetup(t *testing.T, task *drivers.TaskConfig, driverCfg map[string]interface{}) (*docker.Client, *dtestutil.DriverHarness, *taskHandle, func()) {
client := newTestDockerClient(t)
driver := dockerDriverHarness(t, driverCfg)
cleanup := driver.MkAllocDir(task, true)
cleanup := driver.MkAllocDir(task, loggingIsEnabled(&DriverConfig{}, task))

copyImage(t, task.TaskDir(), "busybox.tar")
_, _, err := driver.StartTask(task)
Expand Down Expand Up @@ -838,6 +838,37 @@ func TestDockerDriver_LoggingConfiguration(t *testing.T) {
require.Equal(t, loggerConfig, container.HostConfig.LogConfig.Config)
}

// TestDockerDriver_LogCollectionDisabled ensures that logmon isn't configured
// when log collection is disable, but out-of-band Docker log shipping still
// works as expected
func TestDockerDriver_LogCollectionDisabled(t *testing.T) {
ci.Parallel(t)
testutil.DockerCompatible(t)

task, cfg, _ := dockerTask(t)
task.StdoutPath = os.DevNull
task.StderrPath = os.DevNull

must.NoError(t, task.EncodeConcreteDriverConfig(cfg))

dockerClientConfig := make(map[string]interface{})
loggerConfig := map[string]string{"gelf-address": "udp://1.2.3.4:12201", "tag": "gelf"}

dockerClientConfig["logging"] = LoggingConfig{
Type: "gelf",
Config: loggerConfig,
}
client, d, handle, cleanup := dockerSetup(t, task, dockerClientConfig)
t.Cleanup(cleanup)
must.NoError(t, d.WaitUntilStarted(task.ID, 5*time.Second))
container, err := client.InspectContainer(handle.containerID)
must.NoError(t, err)
must.Nil(t, handle.dlogger)

must.Eq(t, "gelf", container.HostConfig.LogConfig.Type)
must.Eq(t, loggerConfig, container.HostConfig.LogConfig.Config)
}

func TestDockerDriver_HealthchecksDisable(t *testing.T) {
ci.Parallel(t)
testutil.DockerCompatible(t)
Expand Down
1 change: 1 addition & 0 deletions jobspec/parse_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ func parseTask(item *ast.ObjectItem, keys []string) (*api.Task, error) {
valid := []string{
"max_files",
"max_file_size",
"enabled",
}
if err := checkHCLKeys(logsBlock.Val, valid); err != nil {
return nil, multierror.Prefix(err, "logs ->")
Expand Down
1 change: 1 addition & 0 deletions jobspec/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ func TestParse(t *testing.T) {
LogConfig: &api.LogConfig{
MaxFiles: intToPtr(14),
MaxFileSizeMB: intToPtr(101),
Enabled: boolToPtr(true),
},
Artifacts: []*api.TaskArtifact{
{
Expand Down
Loading

0 comments on commit f7e5455

Please sign in to comment.