From e1aabe6193366d9fe57b40013621274f3d6c1c79 Mon Sep 17 00:00:00 2001
From: Tony Holdstock-Brown
Date: Mon, 19 Jul 2021 16:13:58 -0700
Subject: [PATCH] Allow docklog to collect logs from stopped containers

Short-lived containers (especially those that run for less than a second)
often do not have their logs sent to Nomad. This PR adjusts the Nomad docker
driver and docker logging driver to:

1. Enable docklog to run after a container has stopped (up to a grace
   period limit)
2. Collect logs from stopped containers up until the grace period

This fixes the current issues:

1. docklog is killed by the handle as soon as the task finishes, which
   means fast-running containers can never have their logs scraped
2. docklog quits streaming logs in its event loop if the container has
   stopped

In order to do this, we need to know _whether_ we have read logs for the
current container in order to apply a grace period. We add a copier to the
fifo streams which sets an atomic flag, letting us know whether we need to
retry reading the logs with a grace period or whether we can quit early.

Fixes #2475, #6931.

Always wait to read from logs before exiting

Store number of bytes read vs a simple counter
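Reviewer note: the core of the approach is an `io.Writer` wrapper that atomically records how many bytes have flowed through the log streams, so a watcher goroutine can tell whether any logs were collected. A minimal, runnable sketch of that pattern (the names mirror the `copier` type added in `docker_logger.go` below; the `main` function is illustrative only):

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"sync/atomic"
)

// copier wraps a log stream and atomically records the number of bytes
// written through it, so another goroutine can tell whether any logs
// have been collected yet.
type copier struct {
	read   *int64
	writer io.Writer
}

func (c *copier) Write(p []byte) (int, error) {
	atomic.AddInt64(c.read, int64(len(p)))
	return c.writer.Write(p)
}

func main() {
	var read int64
	var buf bytes.Buffer

	w := &copier{read: &read, writer: &buf}
	fmt.Fprint(w, "some container output")

	// A watcher can now check the flag without racing the writer.
	fmt.Println("bytes read:", atomic.LoadInt64(&read))
}
```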
+ "container_log_grace_period": hclspec.NewDefault( + hclspec.NewAttr("container_log_grace_period", "string", false), + hclspec.NewLiteral(`"10s"`), + ), }) // mountBodySpec is the hcl specification for the `mount` block @@ -611,23 +618,25 @@ type ContainerGCConfig struct { } type DriverConfig struct { - Endpoint string `codec:"endpoint"` - Auth AuthConfig `codec:"auth"` - TLS TLSConfig `codec:"tls"` - GC GCConfig `codec:"gc"` - Volumes VolumeConfig `codec:"volumes"` - AllowPrivileged bool `codec:"allow_privileged"` - AllowCaps []string `codec:"allow_caps"` - GPURuntimeName string `codec:"nvidia_runtime"` - InfraImage string `codec:"infra_image"` - InfraImagePullTimeout string `codec:"infra_image_pull_timeout"` - infraImagePullTimeoutDuration time.Duration `codec:"-"` - DisableLogCollection bool `codec:"disable_log_collection"` - PullActivityTimeout string `codec:"pull_activity_timeout"` - PidsLimit int64 `codec:"pids_limit"` - pullActivityTimeoutDuration time.Duration `codec:"-"` - ExtraLabels []string `codec:"extra_labels"` - Logging LoggingConfig `codec:"logging"` + Endpoint string `codec:"endpoint"` + Auth AuthConfig `codec:"auth"` + TLS TLSConfig `codec:"tls"` + GC GCConfig `codec:"gc"` + Volumes VolumeConfig `codec:"volumes"` + AllowPrivileged bool `codec:"allow_privileged"` + AllowCaps []string `codec:"allow_caps"` + GPURuntimeName string `codec:"nvidia_runtime"` + InfraImage string `codec:"infra_image"` + InfraImagePullTimeout string `codec:"infra_image_pull_timeout"` + infraImagePullTimeoutDuration time.Duration `codec:"-"` + DisableLogCollection bool `codec:"disable_log_collection"` + ContainerLogGracePeriod string `codec:"container_log_grace_period"` + containerLogGracePeriodDuration time.Duration `codec:"-"` + PullActivityTimeout string `codec:"pull_activity_timeout"` + PidsLimit int64 `codec:"pids_limit"` + pullActivityTimeoutDuration time.Duration `codec:"-"` + ExtraLabels []string `codec:"extra_labels"` + Logging LoggingConfig `codec:"logging"` AllowRuntimesList []string `codec:"allow_runtimes"` allowRuntimes map[string]struct{} `codec:"-"` @@ -673,6 +682,7 @@ func (d *Driver) ConfigSchema() (*hclspec.Spec, error) { const danglingContainersCreationGraceMinimum = 1 * time.Minute const pullActivityTimeoutMinimum = 1 * time.Minute +const containerLogGracePeriodMinimum = 2 * time.Second func (d *Driver) SetConfig(c *base.Config) error { var config DriverConfig @@ -723,6 +733,17 @@ func (d *Driver) SetConfig(c *base.Config) error { d.config.pullActivityTimeoutDuration = dur } + if len(d.config.ContainerLogGracePeriod) > 0 { + dur, err := time.ParseDuration(d.config.ContainerLogGracePeriod) + if err != nil { + return fmt.Errorf("failed to parse 'container_log_grace_period' duaration: %v", err) + } + if dur < containerLogGracePeriodMinimum { + return fmt.Errorf("container_log_grace_period is less than minimum, %v", containerLogGracePeriodMinimum) + } + d.config.containerLogGracePeriodDuration = dur + } + if d.config.InfraImagePullTimeout != "" { dur, err := time.ParseDuration(d.config.InfraImagePullTimeout) if err != nil { diff --git a/drivers/docker/docklog/docker_logger.go b/drivers/docker/docklog/docker_logger.go index 29668b664a3..595a0ed6583 100644 --- a/drivers/docker/docklog/docker_logger.go +++ b/drivers/docker/docklog/docker_logger.go @@ -6,6 +6,7 @@ import ( "math/rand" "strings" "sync" + "sync/atomic" "time" docker "github.com/fsouza/go-dockerclient" @@ -40,6 +41,10 @@ type StartOpts struct { // from StartTime int64 + // GracePeriod is the time in which we can 
diff --git a/drivers/docker/docklog/docker_logger.go b/drivers/docker/docklog/docker_logger.go
index 29668b664a3..595a0ed6583 100644
--- a/drivers/docker/docklog/docker_logger.go
+++ b/drivers/docker/docklog/docker_logger.go
@@ -6,6 +6,7 @@ import (
 	"math/rand"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"

 	docker "github.com/fsouza/go-dockerclient"
@@ -40,6 +41,10 @@ type StartOpts struct {
 	// from
 	StartTime int64

+	// GracePeriod is the time in which we can attempt to collect logs from a stopped
+	// container, if none have been read yet.
+	GracePeriod time.Duration
+
 	// TLS settings for docker client
 	TLSCert string
 	TLSKey  string
@@ -56,28 +61,72 @@ func NewDockerLogger(logger hclog.Logger) DockerLogger {

 // dockerLogger implements the DockerLogger interface
 type dockerLogger struct {
-	logger hclog.Logger
+	logger      hclog.Logger
+	containerID string

 	stdout  io.WriteCloser
 	stderr  io.WriteCloser
 	stdLock sync.Mutex

-	cancelCtx context.CancelFunc
-	doneCh    chan interface{}
+	// containerDoneCtx is called when the container dies, indicating that there will be no
+	// more logs to be read.
+	containerDoneCtx context.CancelFunc
+
+	// read indicates whether we have read anything from the logs. It is accessed
+	// atomically (via sync/atomic) from multiple goroutines.
+	read int64
+
+	doneCh chan interface{}
+
+	// readDelay is used in testing to delay reads, simulating race conditions between
+	// container exits and reading
+	readDelay *time.Duration
 }

 // Start log monitoring
 func (d *dockerLogger) Start(opts *StartOpts) error {
+	d.containerID = opts.ContainerID
+
 	client, err := d.getDockerClient(opts)
 	if err != nil {
 		return fmt.Errorf("failed to open docker client: %v", err)
 	}

-	ctx, cancel := context.WithCancel(context.Background())
-	d.cancelCtx = cancel
+	// Set up a context whose cancel function is called when the container exits.
+	containerDoneCtx, cancel := context.WithCancel(context.Background())
+	d.containerDoneCtx = cancel
+
+	// Set up a ctx which will be cancelled when we stop reading logs. This
+	// grace period allows us to collect logs from stopped containers if none
+	// have yet been read.
+	ctx, cancelStreams := context.WithCancel(context.Background())
+	go func() {
+		<-containerDoneCtx.Done()
+
+		// Wait until we've read from the logs to exit.
+		timeout := time.After(opts.GracePeriod)
+		for {
+			select {
+			case <-time.After(time.Second):
+				if atomic.LoadInt64(&d.read) > 0 {
+					cancelStreams()
+					return
+				}
+			case <-timeout:
+				cancelStreams()
+				return
+			}
+		}
+	}()

 	go func() {
 		defer close(d.doneCh)
+		defer d.cleanup()
+
+		if d.readDelay != nil {
+			// Allows us to simulate reading from stopped containers in testing.
+			<-time.After(*d.readDelay)
+		}

 		stdout, stderr, err := d.openStreams(ctx, opts)
 		if err != nil {
@@ -106,7 +155,11 @@ func (d *dockerLogger) Start(opts *StartOpts) error {
 			}

 			err := client.Logs(logOpts)
-			if ctx.Err() != nil {
+			// If we've been reading logs and the container is done we can safely break
+			// the loop
+			if containerDoneCtx.Err() != nil && atomic.LoadInt64(&d.read) != 0 {
+				return
+			} else if ctx.Err() != nil {
 				// If context is terminated then we can safely break the loop
 				return
 			} else if err == nil {
@@ -123,7 +176,7 @@ func (d *dockerLogger) Start(opts *StartOpts) error {

 			sinceTime = time.Now()

-			container, err := client.InspectContainerWithOptions(docker.InspectContainerOptions{
+			_, err = client.InspectContainerWithOptions(docker.InspectContainerOptions{
 				ID: opts.ContainerID,
 			})
 			if err != nil {
@@ -131,8 +184,6 @@ func (d *dockerLogger) Start(opts *StartOpts) error {
 				if !notFoundOk {
 					return
 				}
-			} else if !container.State.Running {
-				return
 			}
 		}
 	}()
@@ -169,25 +220,45 @@ func (d *dockerLogger) openStreams(ctx context.Context, opts *StartOpts) (stdout
 		}
 	}

-	if ctx.Err() != nil {
-		// Stop was called and don't need files anymore
-		stdoutF.Close()
-		stderrF.Close()
-		return nil, nil, ctx.Err()
-	}
-
 	d.stdLock.Lock()
 	d.stdout, d.stderr = stdoutF, stderrF
 	d.stdLock.Unlock()

-	return stdoutF, stderrF, nil
+	return d.streamCopier(stdoutF), d.streamCopier(stderrF), nil
+}
+
+// streamCopier copies into the given writer and sets a flag indicating that
+// we have read some logs.
+func (d *dockerLogger) streamCopier(to io.WriteCloser) io.WriteCloser {
+	return &copier{read: &d.read, writer: to}
+}
+
+type copier struct {
+	read   *int64
+	writer io.WriteCloser
+}
+
+func (c *copier) Write(p []byte) (n int, err error) {
+	if atomic.LoadInt64(c.read) == 0 {
+		atomic.AddInt64(c.read, int64(len(p)))
+	}
+	return c.writer.Write(p)
+}
+
+func (c *copier) Close() error {
+	return c.writer.Close()
+}

 // Stop log monitoring
 func (d *dockerLogger) Stop() error {
-	if d.cancelCtx != nil {
-		d.cancelCtx()
+	if d.containerDoneCtx != nil {
+		d.containerDoneCtx()
 	}
+	return nil
+}
+
+func (d *dockerLogger) cleanup() error {
+	d.logger.Debug("cleaning up", "container_id", d.containerID)

 	d.stdLock.Lock()
 	stdout, stderr := d.stdout, d.stderr
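Reviewer note: the watcher goroutine added above is the heart of the change; it only cancels the log streams once something has been read or the grace period lapses. A standalone, runnable sketch of that pattern under simplified assumptions (a plain channel stands in for `containerDoneCtx`, and the poll interval is shortened for demonstration):

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// watchStreams cancels the stream context once logs have been observed
// after the container is done, or once the grace period has elapsed.
func watchStreams(containerDone <-chan struct{}, read *int64, grace time.Duration, cancelStreams context.CancelFunc) {
	<-containerDone

	timeout := time.After(grace)
	for {
		select {
		case <-time.After(100 * time.Millisecond): // poll interval
			if atomic.LoadInt64(read) > 0 {
				cancelStreams() // logs arrived; safe to stop streaming
				return
			}
		case <-timeout:
			cancelStreams() // grace period exhausted; give up
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	containerDone := make(chan struct{})
	var read int64

	go watchStreams(containerDone, &read, 2*time.Second, cancel)

	close(containerDone)      // container exits immediately...
	atomic.AddInt64(&read, 1) // ...but its logs still arrive

	<-ctx.Done()
	fmt.Println("streams cancelled only after logs were read")
}
```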
+ <-time.After(*d.readDelay) + } stdout, stderr, err := d.openStreams(ctx, opts) if err != nil { @@ -106,7 +155,11 @@ func (d *dockerLogger) Start(opts *StartOpts) error { } err := client.Logs(logOpts) - if ctx.Err() != nil { + // If we've been reading logs and the container is done we can safely break + // the loop + if containerDoneCtx.Err() != nil && d.read != 0 { + return + } else if ctx.Err() != nil { // If context is terminated then we can safely break the loop return } else if err == nil { @@ -123,7 +176,7 @@ func (d *dockerLogger) Start(opts *StartOpts) error { sinceTime = time.Now() - container, err := client.InspectContainerWithOptions(docker.InspectContainerOptions{ + _, err = client.InspectContainerWithOptions(docker.InspectContainerOptions{ ID: opts.ContainerID, }) if err != nil { @@ -131,8 +184,6 @@ func (d *dockerLogger) Start(opts *StartOpts) error { if !notFoundOk { return } - } else if !container.State.Running { - return } } }() @@ -169,25 +220,45 @@ func (d *dockerLogger) openStreams(ctx context.Context, opts *StartOpts) (stdout } } - if ctx.Err() != nil { - // Stop was called and don't need files anymore - stdoutF.Close() - stderrF.Close() - return nil, nil, ctx.Err() - } - d.stdLock.Lock() d.stdout, d.stderr = stdoutF, stderrF d.stdLock.Unlock() - return stdoutF, stderrF, nil + return d.streamCopier(stdoutF), d.streamCopier(stderrF), nil +} + +// streamCopier copies into the given writer and sets a flag indicating that +// we have read some logs. +func (d *dockerLogger) streamCopier(to io.WriteCloser) io.WriteCloser { + return &copier{read: &d.read, writer: to} +} + +type copier struct { + read *int64 + writer io.WriteCloser +} + +func (c *copier) Write(p []byte) (n int, err error) { + if *c.read == 0 { + atomic.AddInt64(c.read, int64(len(p))) + } + return c.writer.Write(p) +} + +func (c *copier) Close() error { + return c.writer.Close() } // Stop log monitoring func (d *dockerLogger) Stop() error { - if d.cancelCtx != nil { - d.cancelCtx() + if d.containerDoneCtx != nil { + d.containerDoneCtx() } + return nil +} + +func (d *dockerLogger) cleanup() error { + d.logger.Debug("cleaning up", "container_id", d.containerID) d.stdLock.Lock() stdout, stderr := d.stdout, d.stderr diff --git a/drivers/docker/docklog/docker_logger_test.go b/drivers/docker/docklog/docker_logger_test.go index 52af6d70b6b..efca07a2820 100644 --- a/drivers/docker/docklog/docker_logger_test.go +++ b/drivers/docker/docklog/docker_logger_test.go @@ -115,6 +115,178 @@ func TestDockerLogger_Success(t *testing.T) { }) } +func TestDockerLogger_OutOfOrderSuccess(t *testing.T) { + ctu.DockerCompatible(t) + + t.Parallel() + require := require.New(t) + + containerImage, containerImageName, containerImageTag := testContainerDetails() + + client, err := docker.NewClientFromEnv() + if err != nil { + t.Skip("docker unavailable:", err) + } + + if img, err := client.InspectImage(containerImage); err != nil || img == nil { + t.Log("image not found locally, downloading...") + err = client.PullImage(docker.PullImageOptions{ + Repository: containerImageName, + Tag: containerImageTag, + }, docker.AuthConfiguration{}) + require.NoError(err, "failed to pull image") + } + + containerConf := docker.CreateContainerOptions{ + Config: &docker.Config{ + Cmd: []string{ + "sh", "-c", "touch ~/docklog; tail -f ~/docklog", + }, + Image: containerImage, + }, + Context: context.Background(), + } + + container, err := client.CreateContainer(containerConf) + require.NoError(err) + + defer 
+	defer client.RemoveContainer(docker.RemoveContainerOptions{
+		ID:    container.ID,
+		Force: true,
+	})
+
+	err = client.StartContainer(container.ID, nil)
+	require.NoError(err)
+
+	testutil.WaitForResult(func() (bool, error) {
+		container, err = client.InspectContainer(container.ID)
+		if err != nil {
+			return false, err
+		}
+		if !container.State.Running {
+			return false, fmt.Errorf("container not running")
+		}
+		return true, nil
+	}, func(err error) {
+		require.NoError(err)
+	})
+
+	stdout := &noopCloser{bytes.NewBuffer(nil)}
+	stderr := &noopCloser{bytes.NewBuffer(nil)}
+
+	echoToContainer(t, client, container.ID, "abc")
+	echoToContainer(t, client, container.ID, "123")
+
+	dl := NewDockerLogger(testlog.HCLogger(t)).(*dockerLogger)
+	dl.stdout = stdout
+	dl.stderr = stderr
+	require.NoError(dl.Start(&StartOpts{
+		ContainerID: container.ID,
+	}))
+
+	testutil.WaitForResult(func() (bool, error) {
+		act := stdout.String()
+		if "abc\n123\n" != act {
+			return false, fmt.Errorf("expected abc\\n123\\n for stdout but got %s", act)
+		}
+
+		return true, nil
+	}, func(err error) {
+		require.NoError(err)
+	})
+}
+
+func TestDockerLogger_WaitSuccess(t *testing.T) {
+	ctu.DockerCompatible(t)
+
+	t.Parallel()
+	require := require.New(t)
+
+	containerImage, containerImageName, containerImageTag := testContainerDetails()
+
+	client, err := docker.NewClientFromEnv()
+	if err != nil {
+		t.Skip("docker unavailable:", err)
+	}
+
+	if img, err := client.InspectImage(containerImage); err != nil || img == nil {
+		t.Log("image not found locally, downloading...")
+		err = client.PullImage(docker.PullImageOptions{
+			Repository: containerImageName,
+			Tag:        containerImageTag,
+		}, docker.AuthConfiguration{})
+		require.NoError(err, "failed to pull image")
+	}
+
+	containerConf := docker.CreateContainerOptions{
+		Config: &docker.Config{
+			Cmd: []string{
+				"sh", "-c", "touch ~/docklog; tail -f ~/docklog",
+			},
+			Image: containerImage,
+		},
+		Context: context.Background(),
+	}
+
+	container, err := client.CreateContainer(containerConf)
+	require.NoError(err)
+
+	defer client.RemoveContainer(docker.RemoveContainerOptions{
+		ID:    container.ID,
+		Force: true,
+	})
+
+	err = client.StartContainer(container.ID, nil)
+	require.NoError(err)
+
+	testutil.WaitForResult(func() (bool, error) {
+		container, err = client.InspectContainer(container.ID)
+		if err != nil {
+			return false, err
+		}
+		if !container.State.Running {
+			return false, fmt.Errorf("container not running")
+		}
+		return true, nil
+	}, func(err error) {
+		require.NoError(err)
+	})
+
+	stdout := &noopCloser{bytes.NewBuffer(nil)}
+	stderr := &noopCloser{bytes.NewBuffer(nil)}
+
+	echoToContainer(t, client, container.ID, "abc")
+	echoToContainer(t, client, container.ID, "123")
+
+	<-time.After(time.Second)
+
+	err = client.StopContainer(container.ID, 0)
+	require.NoError(err)
+
+	<-time.After(time.Second)
+
+	dl := NewDockerLogger(testlog.HCLogger(t)).(*dockerLogger)
+	dl.stdout = stdout
+	dl.stderr = stderr
+	delay := time.Second
+	dl.readDelay = &delay
+	require.NoError(dl.Start(&StartOpts{
+		ContainerID: container.ID,
+	}))
+	dl.containerDoneCtx()
+
+	testutil.WaitForResult(func() (bool, error) {
+		act := stdout.String()
+		if "abc\n123\n" != act {
+			return false, fmt.Errorf("expected abc\\n123\\n for stdout but got %s", act)
+		}
+
+		return true, nil
+	}, func(err error) {
+		require.NoError(err)
+	})
+}
+
 func TestDockerLogger_Success_TTY(t *testing.T) {
 	ci.Parallel(t)
 	ctu.DockerCompatible(t)
diff --git a/drivers/docker/driver.go b/drivers/docker/driver.go
index ec2455515df..146e92ed469 100644
--- a/drivers/docker/driver.go
+++ b/drivers/docker/driver.go
@@ -172,6 +172,7 @@ func (d *Driver) setupNewDockerLogger(container *docker.Container, cfg *drivers.
 		TLSKey:      d.config.TLS.Key,
 		TLSCA:       d.config.TLS.CA,
 		StartTime:   startTime.Unix(),
+		GracePeriod: d.config.containerLogGracePeriodDuration,
 	}); err != nil {
 		pluginClient.Kill()
 		return nil, nil, fmt.Errorf("failed to launch docker logger process %s: %v", container.ID, err)
@@ -203,16 +204,17 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
 	}

 	h := &taskHandle{
-		client:                client,
-		waitClient:            waitClient,
-		logger:                d.logger.With("container_id", container.ID),
-		task:                  handle.Config,
-		containerID:           container.ID,
-		containerImage:        container.Image,
-		doneCh:                make(chan bool),
-		waitCh:                make(chan struct{}),
-		removeContainerOnExit: d.config.GC.Container,
-		net:                   handleState.DriverNetwork,
+		client:                  client,
+		waitClient:              waitClient,
+		logger:                  d.logger.With("container_id", container.ID),
+		task:                    handle.Config,
+		containerID:             container.ID,
+		containerImage:          container.Image,
+		doneCh:                  make(chan bool),
+		waitCh:                  make(chan struct{}),
+		removeContainerOnExit:   d.config.GC.Container,
+		net:                     handleState.DriverNetwork,
+		containerLogGracePeriod: d.config.containerLogGracePeriodDuration,
 	}

 	if !d.config.DisableLogCollection {
@@ -383,18 +385,19 @@ CREATE:

 	// Return a driver handle
 	h := &taskHandle{
-		client:                client,
-		waitClient:            waitClient,
-		dlogger:               dlogger,
-		dloggerPluginClient:   pluginClient,
-		logger:                d.logger.With("container_id", container.ID),
-		task:                  cfg,
-		containerID:           container.ID,
-		containerImage:        container.Image,
-		doneCh:                make(chan bool),
-		waitCh:                make(chan struct{}),
-		removeContainerOnExit: d.config.GC.Container,
-		net:                   net,
+		client:                  client,
+		waitClient:              waitClient,
+		dlogger:                 dlogger,
+		dloggerPluginClient:     pluginClient,
+		logger:                  d.logger.With("container_id", container.ID),
+		task:                    cfg,
+		containerID:             container.ID,
+		containerImage:          container.Image,
+		doneCh:                  make(chan bool),
+		waitCh:                  make(chan struct{}),
+		removeContainerOnExit:   d.config.GC.Container,
+		net:                     net,
+		containerLogGracePeriod: d.config.containerLogGracePeriodDuration,
 	}

 	if err := handle.SetDriverState(h.buildState()); err != nil {
diff --git a/drivers/docker/handle.go b/drivers/docker/handle.go
index 59515f2f97f..f4eec732ac8 100644
--- a/drivers/docker/handle.go
+++ b/drivers/docker/handle.go
@@ -21,18 +21,19 @@ import (
 )

 type taskHandle struct {
-	client                *docker.Client
-	waitClient            *docker.Client
-	logger                hclog.Logger
-	dlogger               docklog.DockerLogger
-	dloggerPluginClient   *plugin.Client
-	task                  *drivers.TaskConfig
-	containerID           string
-	containerImage        string
-	doneCh                chan bool
-	waitCh                chan struct{}
-	removeContainerOnExit bool
-	net                   *drivers.DriverNetwork
+	client                  *docker.Client
+	waitClient              *docker.Client
+	logger                  hclog.Logger
+	dlogger                 docklog.DockerLogger
+	dloggerPluginClient     *plugin.Client
+	task                    *drivers.TaskConfig
+	containerID             string
+	containerImage          string
+	doneCh                  chan bool
+	waitCh                  chan struct{}
+	removeContainerOnExit   bool
+	containerLogGracePeriod time.Duration
+	net                     *drivers.DriverNetwork

 	exitResult     *drivers.ExitResult
 	exitResultLock sync.Mutex
@@ -216,11 +217,15 @@ func (h *taskHandle) shutdownLogger() {
 		return
 	}

-	if err := h.dlogger.Stop(); err != nil {
-		h.logger.Error("failed to stop docker logger process during StopTask",
-			"error", err, "logger_pid", h.dloggerPluginClient.ReattachConfig().Pid)
-	}
-	h.dloggerPluginClient.Kill()
+	go func() {
+		h.logger.Info("sending stop signal to logger", "job_id", h.task.JobID, "container_id", h.containerID)
+		if err := h.dlogger.Stop(); err != nil {
+			h.logger.Error("failed to stop docker logger process during StopTask",
+				"error", err, "logger_pid", h.dloggerPluginClient.ReattachConfig().Pid)
+		}
+		<-time.After(h.containerLogGracePeriod)
+		h.dloggerPluginClient.Kill()
+	}()
 }

 func (h *taskHandle) run() {
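Reviewer note: with the handle.go change, task shutdown asks the logger to stop and only hard-kills the plugin after the grace period has passed, instead of killing it immediately. A rough, runnable sketch of that sequencing (the `stop` and `kill` callbacks are stand-ins for the real `dlogger.Stop` and `dloggerPluginClient.Kill` calls):

```go
package main

import (
	"fmt"
	"time"
)

// shutdownLogger asks the logger to stop, then waits out the grace
// period before killing the plugin process, giving the logger time to
// drain logs from an already-stopped container.
func shutdownLogger(stop func() error, kill func(), grace time.Duration) {
	go func() {
		if err := stop(); err != nil {
			fmt.Println("failed to stop logger:", err)
		}
		<-time.After(grace) // let the logger drain any unread logs
		kill()
	}()
}

func main() {
	done := make(chan struct{})
	shutdownLogger(
		func() error { fmt.Println("stop requested"); return nil },
		func() { fmt.Println("plugin killed"); close(done) },
		500*time.Millisecond,
	)
	<-done
}
```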