Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport of logs: allow disabling log collection in jobspec into release/1.5.x #16970

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/16962.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
jobspec: Added option for disabling task log collection in the `logs` block
```
9 changes: 7 additions & 2 deletions api/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,14 +636,16 @@ func (g *TaskGroup) AddSpread(s *Spread) *TaskGroup {

// LogConfig provides configuration for log rotation and, via Enabled, for
// whether a task's logs are collected at all. Every field is a pointer so
// that an unset value (nil) can be told apart from an explicit one;
// Canonicalize fills in the defaults for nil fields.
type LogConfig struct {
MaxFiles *int `mapstructure:"max_files" hcl:"max_files,optional"`
MaxFileSizeMB *int `mapstructure:"max_file_size" hcl:"max_file_size,optional"`
MaxFiles *int `mapstructure:"max_files" hcl:"max_files,optional"`
MaxFileSizeMB *int `mapstructure:"max_file_size" hcl:"max_file_size,optional"`
// Enabled is the `enabled` option of the jobspec `logs` block; a nil value
// is defaulted to true by Canonicalize.
Enabled *bool `mapstructure:"enabled" hcl:"enabled,optional"`
}

// DefaultLogConfig returns a freshly allocated LogConfig carrying the default
// settings: MaxFiles of 10, MaxFileSizeMB of 10, and log collection enabled.
func DefaultLogConfig() *LogConfig {
	cfg := new(LogConfig)
	cfg.MaxFiles = pointerOf(10)
	cfg.MaxFileSizeMB = pointerOf(10)
	cfg.Enabled = pointerOf(true)
	return cfg
}

Expand All @@ -654,6 +656,9 @@ func (l *LogConfig) Canonicalize() {
if l.MaxFileSizeMB == nil {
l.MaxFileSizeMB = pointerOf(10)
}
if l.Enabled == nil {
l.Enabled = pointerOf(true)
}
}

// DispatchPayloadConfig configures how a task gets its input from a job dispatch
Expand Down
41 changes: 33 additions & 8 deletions client/allocrunner/taskrunner/logmon_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"runtime"
"time"
Expand Down Expand Up @@ -42,6 +43,7 @@ type logmonHook struct {

// logmonHookConfig holds the per-task settings built by newLogMonHookConfig
// for the logmon hook.
type logmonHookConfig struct {
// logDir is the log directory handed to newLogMonHookConfig.
logDir string
// enabled is copied from the task's LogConfig.Enabled; when it is false
// newLogMonHookConfig points both fifo paths at os.DevNull.
enabled bool
// stdoutFifo and stderrFifo are the paths used for the task's stdout and
// stderr: os.DevNull when logging is disabled, otherwise a per-task path
// (e.g. a named pipe on Windows).
stdoutFifo string
stderrFifo string
}
Expand All @@ -56,10 +58,19 @@ func newLogMonHook(tr *TaskRunner, logger hclog.Logger) *logmonHook {
return hook
}

func newLogMonHookConfig(taskName, logDir string) *logmonHookConfig {
func newLogMonHookConfig(taskName string, logCfg *structs.LogConfig, logDir string) *logmonHookConfig {
cfg := &logmonHookConfig{
logDir: logDir,
logDir: logDir,
enabled: logCfg.Enabled,
}

// If logging is disabled configure task's stdout/err to point to devnull
if !logCfg.Enabled {
cfg.stdoutFifo = os.DevNull
cfg.stderrFifo = os.DevNull
return cfg
}

if runtime.GOOS == "windows" {
id := uuid.Generate()[:8]
cfg.stdoutFifo = fmt.Sprintf("//./pipe/%s-%s.stdout", taskName, id)
Expand Down Expand Up @@ -102,9 +113,7 @@ func reattachConfigFromHookData(data map[string]string) (*plugin.ReattachConfig,

func (h *logmonHook) Prestart(ctx context.Context,
req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error {

if h.isLoggingDisabled() {
h.logger.Debug("logging is disabled by driver")
if !h.isLoggingEnabled() {
return nil
}

Expand Down Expand Up @@ -139,14 +148,27 @@ func (h *logmonHook) Prestart(ctx context.Context,
}
}

func (h *logmonHook) isLoggingDisabled() bool {
func (h *logmonHook) isLoggingEnabled() bool {
if !h.config.enabled {
h.logger.Debug("log collection is disabled by task")
return false
}

// internal plugins have access to a capability to disable logging and
// metrics via a private interface; this is an "experimental" interface and
// currently only the docker driver exposes this to users.
ic, ok := h.runner.driver.(drivers.InternalCapabilitiesDriver)
if !ok {
return false
return true
}

caps := ic.InternalCapabilities()
return caps.DisableLogCollection
if caps.DisableLogCollection {
h.logger.Debug("log collection is disabled by driver")
return false
}

return true
}

func (h *logmonHook) prestartOneLoop(ctx context.Context, req *interfaces.TaskPrestartRequest) error {
Expand Down Expand Up @@ -194,6 +216,9 @@ func (h *logmonHook) prestartOneLoop(ctx context.Context, req *interfaces.TaskPr
}

func (h *logmonHook) Stop(_ context.Context, req *interfaces.TaskStopRequest, _ *interfaces.TaskStopResponse) error {
if !h.isLoggingEnabled() {
return nil
}

// It's possible that Stop was called without calling Prestart on agent
// restarts. Attempt to reattach to an existing logmon.
Expand Down
52 changes: 51 additions & 1 deletion client/allocrunner/taskrunner/logmon_hook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
pstructs "github.com/hashicorp/nomad/plugins/shared/structs"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require"
"golang.org/x/exp/maps"
)
Expand Down Expand Up @@ -66,7 +67,7 @@ func TestTaskRunner_LogmonHook_StartStop(t *testing.T) {

dir := t.TempDir()

hookConf := newLogMonHookConfig(task.Name, dir)
hookConf := newLogMonHookConfig(task.Name, task.LogConfig, dir)
runner := &TaskRunner{logmonHookConfig: hookConf}
hook := newLogMonHook(runner, testlog.HCLogger(t))

Expand Down Expand Up @@ -100,3 +101,52 @@ func TestTaskRunner_LogmonHook_StartStop(t *testing.T) {
}
require.NoError(t, hook.Stop(context.Background(), &stopReq, nil))
}

// TestTaskRunner_LogmonHook_Disabled asserts that no logmon is started, and
// none is expected by any of the lifecycle hooks, when the task disables log
// collection.
func TestTaskRunner_LogmonHook_Disabled(t *testing.T) {
	ci.Parallel(t)

	alloc := mock.BatchAlloc()
	task := alloc.Job.TaskGroups[0].Tasks[0]
	task.LogConfig.Enabled = false

	hookConf := newLogMonHookConfig(task.Name, task.LogConfig, t.TempDir())
	hook := newLogMonHook(&TaskRunner{logmonHookConfig: hookConf}, testlog.HCLogger(t))

	ctx := context.Background()
	req := interfaces.TaskPrestartRequest{Task: task}
	resp := interfaces.TaskPrestartResponse{}

	// requireNoop verifies that Prestart left the response untouched: the hook
	// is not marked Done and no reattach key was written to the state.
	requireNoop := func() {
		t.Helper()
		must.False(t, resp.Done)
		reattach, found := resp.State[logmonReattachKey]
		must.False(t, found)
		must.Eq(t, "", reattach)
	}

	// First prestart should not set reattach key and never be Done.
	must.NoError(t, hook.Prestart(ctx, &req, &resp))
	t.Cleanup(func() { hook.Stop(ctx, nil, nil) })
	requireNoop()

	// Running prestart again should still be a noop
	req.PreviousState = map[string]string{}
	must.NoError(t, hook.Prestart(ctx, &req, &resp))
	requireNoop()

	// PreviousState should always be initialized by the caller, but as a
	// belt-and-suspenders check make sure a nil map cannot panic on this code
	// path
	req.PreviousState = nil
	must.NoError(t, hook.Prestart(ctx, &req, &resp))

	// Running stop should not error even with no running logmon
	stopReq := interfaces.TaskStopRequest{ExistingState: maps.Clone(resp.State)}
	must.NoError(t, hook.Stop(ctx, &stopReq, nil))
}
4 changes: 2 additions & 2 deletions client/allocrunner/taskrunner/logmon_hook_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestTaskRunner_LogmonHook_StartCrashStop(t *testing.T) {

dir := t.TempDir()

hookConf := newLogMonHookConfig(task.Name, dir)
hookConf := newLogMonHookConfig(task.Name, task.LogConfig, dir)
runner := &TaskRunner{logmonHookConfig: hookConf}
hook := newLogMonHook(runner, testlog.HCLogger(t))

Expand Down Expand Up @@ -97,7 +97,7 @@ func TestTaskRunner_LogmonHook_ShutdownMidStart(t *testing.T) {

dir := t.TempDir()

hookConf := newLogMonHookConfig(task.Name, dir)
hookConf := newLogMonHookConfig(task.Name, task.LogConfig, dir)
runner := &TaskRunner{logmonHookConfig: hookConf}
hook := newLogMonHook(runner, testlog.HCLogger(t))

Expand Down
2 changes: 1 addition & 1 deletion client/allocrunner/taskrunner/task_runner_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (tr *TaskRunner) initHooks() {
hookLogger := tr.logger.Named("task_hook")
task := tr.Task()

tr.logmonHookConfig = newLogMonHookConfig(task.Name, tr.taskDir.LogDir)
tr.logmonHookConfig = newLogMonHookConfig(task.Name, task.LogConfig, tr.taskDir.LogDir)

// Add the hook resources
tr.hookResources = &hookResources{}
Expand Down
2 changes: 2 additions & 0 deletions command/agent/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1183,6 +1183,7 @@ func ApiTaskToStructsTask(job *structs.Job, group *structs.TaskGroup,
structsTask.LogConfig = &structs.LogConfig{
MaxFiles: *apiTask.LogConfig.MaxFiles,
MaxFileSizeMB: *apiTask.LogConfig.MaxFileSizeMB,
Enabled: *apiTask.LogConfig.Enabled,
}

if len(apiTask.Artifacts) > 0 {
Expand Down Expand Up @@ -1747,6 +1748,7 @@ func apiLogConfigToStructs(in *api.LogConfig) *structs.LogConfig {
return nil
}
return &structs.LogConfig{
Enabled: *in.Enabled,
MaxFiles: dereferenceInt(in.MaxFiles),
MaxFileSizeMB: dereferenceInt(in.MaxFileSizeMB),
}
Expand Down
8 changes: 8 additions & 0 deletions command/agent/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2707,6 +2707,7 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
KillTimeout: pointer.Of(10 * time.Second),
KillSignal: "SIGQUIT",
LogConfig: &api.LogConfig{
Enabled: pointer.Of(true),
MaxFiles: pointer.Of(10),
MaxFileSizeMB: pointer.Of(100),
},
Expand Down Expand Up @@ -3124,6 +3125,7 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
KillTimeout: 10 * time.Second,
KillSignal: "SIGQUIT",
LogConfig: &structs.LogConfig{
Enabled: true,
MaxFiles: 10,
MaxFileSizeMB: 100,
},
Expand Down Expand Up @@ -3280,6 +3282,7 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
KillTimeout: pointer.Of(10 * time.Second),
KillSignal: "SIGQUIT",
LogConfig: &api.LogConfig{
Enabled: pointer.Of(true),
MaxFiles: pointer.Of(10),
MaxFileSizeMB: pointer.Of(100),
},
Expand Down Expand Up @@ -3405,6 +3408,7 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
KillTimeout: 10 * time.Second,
KillSignal: "SIGQUIT",
LogConfig: &structs.LogConfig{
Enabled: true,
MaxFiles: 10,
MaxFileSizeMB: 100,
},
Expand Down Expand Up @@ -3577,9 +3581,11 @@ func TestConversion_apiLogConfigToStructs(t *testing.T) {
ci.Parallel(t)
require.Nil(t, apiLogConfigToStructs(nil))
require.Equal(t, &structs.LogConfig{
Enabled: true,
MaxFiles: 2,
MaxFileSizeMB: 8,
}, apiLogConfigToStructs(&api.LogConfig{
Enabled: pointer.Of(true),
MaxFiles: pointer.Of(2),
MaxFileSizeMB: pointer.Of(8),
}))
Expand Down Expand Up @@ -3653,6 +3659,7 @@ func TestConversion_apiConnectSidecarTaskToStructs(t *testing.T) {
Meta: meta,
KillTimeout: &timeout,
LogConfig: &structs.LogConfig{
Enabled: true,
MaxFiles: 2,
MaxFileSizeMB: 8,
},
Expand All @@ -3671,6 +3678,7 @@ func TestConversion_apiConnectSidecarTaskToStructs(t *testing.T) {
Meta: meta,
KillTimeout: &timeout,
LogConfig: &api.LogConfig{
Enabled: pointer.Of(true),
MaxFiles: pointer.Of(2),
MaxFileSizeMB: pointer.Of(8),
},
Expand Down
14 changes: 12 additions & 2 deletions drivers/docker/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
net: handleState.DriverNetwork,
}

if !d.config.DisableLogCollection {
if loggingIsEnabled(d.config, handle.Config) {
h.dlogger, h.dloggerPluginClient, err = d.reattachToDockerLogger(handleState.ReattachConfig)
if err != nil {
d.logger.Warn("failed to reattach to docker logger process", "error", err)
Expand Down Expand Up @@ -282,6 +282,16 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
return nil
}

// loggingIsEnabled reports whether the docker driver should collect logs for
// the given task. Collection is off when the driver configuration sets
// DisableLogCollection, or when both the task's stdout and stderr paths are
// os.DevNull (presumably how the client signals that the task itself disabled
// log collection — confirm against the logmon hook).
func loggingIsEnabled(driverCfg *DriverConfig, taskCfg *drivers.TaskConfig) bool {
	switch {
	case driverCfg.DisableLogCollection:
		// Disabled for every task by the driver configuration.
		return false
	case taskCfg.StderrPath == os.DevNull && taskCfg.StdoutPath == os.DevNull:
		// Both streams are discarded, so there is nothing to collect.
		return false
	default:
		return true
	}
}

func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drivers.DriverNetwork, error) {
if _, ok := d.tasks.Get(cfg.ID); ok {
return nil, nil, fmt.Errorf("task with ID %q already started", cfg.ID)
Expand Down Expand Up @@ -397,7 +407,7 @@ CREATE:
}
}

collectingLogs := !d.config.DisableLogCollection
collectingLogs := loggingIsEnabled(d.config, cfg)

var dlogger docklog.DockerLogger
var pluginClient *plugin.Client
Expand Down
33 changes: 32 additions & 1 deletion drivers/docker/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func dockerTask(t *testing.T) (*drivers.TaskConfig, *TaskConfig, []int) {
func dockerSetup(t *testing.T, task *drivers.TaskConfig, driverCfg map[string]interface{}) (*docker.Client, *dtestutil.DriverHarness, *taskHandle, func()) {
client := newTestDockerClient(t)
driver := dockerDriverHarness(t, driverCfg)
cleanup := driver.MkAllocDir(task, true)
cleanup := driver.MkAllocDir(task, loggingIsEnabled(&DriverConfig{}, task))

copyImage(t, task.TaskDir(), "busybox.tar")
_, _, err := driver.StartTask(task)
Expand Down Expand Up @@ -838,6 +838,37 @@ func TestDockerDriver_LoggingConfiguration(t *testing.T) {
require.Equal(t, loggerConfig, container.HostConfig.LogConfig.Config)
}

// TestDockerDriver_LogCollectionDisabled ensures that logmon isn't configured
// when log collection is disabled, but out-of-band Docker log shipping still
// works as expected.
func TestDockerDriver_LogCollectionDisabled(t *testing.T) {
	ci.Parallel(t)
	testutil.DockerCompatible(t)

	task, cfg, _ := dockerTask(t)
	// Pointing both streams at os.DevNull is what loggingIsEnabled treats as
	// "log collection disabled" for the task.
	task.StdoutPath, task.StderrPath = os.DevNull, os.DevNull

	must.NoError(t, task.EncodeConcreteDriverConfig(cfg))

	// Docker's own logging driver settings must still reach the container.
	wantLogOpts := map[string]string{"gelf-address": "udp://1.2.3.4:12201", "tag": "gelf"}
	driverCfg := map[string]interface{}{
		"logging": LoggingConfig{
			Type:   "gelf",
			Config: wantLogOpts,
		},
	}

	client, d, handle, cleanup := dockerSetup(t, task, driverCfg)
	t.Cleanup(cleanup)
	must.NoError(t, d.WaitUntilStarted(task.ID, 5*time.Second))

	container, err := client.InspectContainer(handle.containerID)
	must.NoError(t, err)

	// No docker logger may be attached to the handle when collection is off.
	must.Nil(t, handle.dlogger)

	must.Eq(t, "gelf", container.HostConfig.LogConfig.Type)
	must.Eq(t, wantLogOpts, container.HostConfig.LogConfig.Config)
}

func TestDockerDriver_HealthchecksDisable(t *testing.T) {
ci.Parallel(t)
testutil.DockerCompatible(t)
Expand Down
1 change: 1 addition & 0 deletions jobspec/parse_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ func parseTask(item *ast.ObjectItem, keys []string) (*api.Task, error) {
valid := []string{
"max_files",
"max_file_size",
"enabled",
}
if err := checkHCLKeys(logsBlock.Val, valid); err != nil {
return nil, multierror.Prefix(err, "logs ->")
Expand Down
1 change: 1 addition & 0 deletions jobspec/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ func TestParse(t *testing.T) {
LogConfig: &api.LogConfig{
MaxFiles: intToPtr(14),
MaxFileSizeMB: intToPtr(101),
Enabled: boolToPtr(true),
},
Artifacts: []*api.TaskArtifact{
{
Expand Down
Loading