diff --git a/drivers/docker/config.go b/drivers/docker/config.go index 6194d60f1e4..5e47f9a2214 100644 --- a/drivers/docker/config.go +++ b/drivers/docker/config.go @@ -293,6 +293,13 @@ var ( // disable_log_collection indicates whether docker driver should collect logs of docker // task containers. If true, nomad doesn't start docker_logger/logmon processes "disable_log_collection": hclspec.NewAttr("disable_log_collection", "bool", false), + + // container_log_grace_period is the period in which the docker logger remains active if the + // container exits and no logs have been read yet. + "container_log_grace_period": hclspec.NewDefault( + hclspec.NewAttr("container_log_grace_period", "string", false), + hclspec.NewLiteral(`"10s"`), + ), }) // mountBodySpec is the hcl specification for the `mount` block @@ -611,23 +618,25 @@ type ContainerGCConfig struct { } type DriverConfig struct { - Endpoint string `codec:"endpoint"` - Auth AuthConfig `codec:"auth"` - TLS TLSConfig `codec:"tls"` - GC GCConfig `codec:"gc"` - Volumes VolumeConfig `codec:"volumes"` - AllowPrivileged bool `codec:"allow_privileged"` - AllowCaps []string `codec:"allow_caps"` - GPURuntimeName string `codec:"nvidia_runtime"` - InfraImage string `codec:"infra_image"` - InfraImagePullTimeout string `codec:"infra_image_pull_timeout"` - infraImagePullTimeoutDuration time.Duration `codec:"-"` - DisableLogCollection bool `codec:"disable_log_collection"` - PullActivityTimeout string `codec:"pull_activity_timeout"` - PidsLimit int64 `codec:"pids_limit"` - pullActivityTimeoutDuration time.Duration `codec:"-"` - ExtraLabels []string `codec:"extra_labels"` - Logging LoggingConfig `codec:"logging"` + Endpoint string `codec:"endpoint"` + Auth AuthConfig `codec:"auth"` + TLS TLSConfig `codec:"tls"` + GC GCConfig `codec:"gc"` + Volumes VolumeConfig `codec:"volumes"` + AllowPrivileged bool `codec:"allow_privileged"` + AllowCaps []string `codec:"allow_caps"` + GPURuntimeName string `codec:"nvidia_runtime"` + InfraImage string `codec:"infra_image"` + InfraImagePullTimeout string `codec:"infra_image_pull_timeout"` + infraImagePullTimeoutDuration time.Duration `codec:"-"` + DisableLogCollection bool `codec:"disable_log_collection"` + ContainerLogGracePeriod string `codec:"container_log_grace_period"` + containerLogGracePeriodDuration time.Duration `codec:"-"` + PullActivityTimeout string `codec:"pull_activity_timeout"` + PidsLimit int64 `codec:"pids_limit"` + pullActivityTimeoutDuration time.Duration `codec:"-"` + ExtraLabels []string `codec:"extra_labels"` + Logging LoggingConfig `codec:"logging"` AllowRuntimesList []string `codec:"allow_runtimes"` allowRuntimes map[string]struct{} `codec:"-"` @@ -673,6 +682,7 @@ func (d *Driver) ConfigSchema() (*hclspec.Spec, error) { const danglingContainersCreationGraceMinimum = 1 * time.Minute const pullActivityTimeoutMinimum = 1 * time.Minute +const containerLogGracePeriodMinimum = 2 * time.Second func (d *Driver) SetConfig(c *base.Config) error { var config DriverConfig @@ -723,6 +733,17 @@ func (d *Driver) SetConfig(c *base.Config) error { d.config.pullActivityTimeoutDuration = dur } + if len(d.config.ContainerLogGracePeriod) > 0 { + dur, err := time.ParseDuration(d.config.ContainerLogGracePeriod) + if err != nil { + return fmt.Errorf("failed to parse 'container_log_grace_period' duaration: %v", err) + } + if dur < containerLogGracePeriodMinimum { + return fmt.Errorf("container_log_grace_period is less than minimum, %v", containerLogGracePeriodMinimum) + } + d.config.containerLogGracePeriodDuration = dur + } + if d.config.InfraImagePullTimeout != "" { dur, err := time.ParseDuration(d.config.InfraImagePullTimeout) if err != nil { diff --git a/drivers/docker/docklog/docker_logger.go b/drivers/docker/docklog/docker_logger.go index 29668b664a3..595a0ed6583 100644 --- a/drivers/docker/docklog/docker_logger.go +++ b/drivers/docker/docklog/docker_logger.go @@ -6,6 +6,7 @@ import ( "math/rand" "strings" "sync" + "sync/atomic" "time" docker "github.com/fsouza/go-dockerclient" @@ -40,6 +41,10 @@ type StartOpts struct { // from StartTime int64 + // GracePeriod is the time in which we can attempt to collect logs from a stopped + // container, if none have been read yet. + GracePeriod time.Duration + // TLS settings for docker client TLSCert string TLSKey string @@ -56,28 +61,72 @@ func NewDockerLogger(logger hclog.Logger) DockerLogger { // dockerLogger implements the DockerLogger interface type dockerLogger struct { - logger hclog.Logger + logger hclog.Logger + containerID string stdout io.WriteCloser stderr io.WriteCloser stdLock sync.Mutex - cancelCtx context.CancelFunc - doneCh chan interface{} + // containerDoneCtx is called when the container dies, indicating that there will be no + // more logs to be read. + containerDoneCtx context.CancelFunc + + // read indicates whether we have read anything from the logs. This is manipulated + // using the sync package via multiple goroutines. + read int64 + + doneCh chan interface{} + // readDelay is used in testing to delay reads, simulating race conditions between + // container exits and reading + readDelay *time.Duration } // Start log monitoring func (d *dockerLogger) Start(opts *StartOpts) error { + d.containerID = opts.ContainerID + client, err := d.getDockerClient(opts) if err != nil { return fmt.Errorf("failed to open docker client: %v", err) } - ctx, cancel := context.WithCancel(context.Background()) - d.cancelCtx = cancel + // Set up a ctx which is called when the container quits. + containerDoneCtx, cancel := context.WithCancel(context.Background()) + d.containerDoneCtx = cancel + + // Set up a ctx which will be cancelled when we stop reading logs. This + // grace period allows us to collect logs from stopped containers if none + // have yet been read. + ctx, cancelStreams := context.WithCancel(context.Background()) + go func() { + <-containerDoneCtx.Done() + + // Wait until we've read from the logs to exit. + timeout := time.After(opts.GracePeriod) + for { + select { + case <-time.After(time.Second): + if d.read > 0 { + cancelStreams() + return + } + case <-timeout: + cancelStreams() + return + } + } + + }() go func() { defer close(d.doneCh) + defer d.cleanup() + + if d.readDelay != nil { + // Allows us to simulate reading from stopped containers in testing. + <-time.After(*d.readDelay) + } stdout, stderr, err := d.openStreams(ctx, opts) if err != nil { @@ -106,7 +155,11 @@ func (d *dockerLogger) Start(opts *StartOpts) error { } err := client.Logs(logOpts) - if ctx.Err() != nil { + // If we've been reading logs and the container is done we can safely break + // the loop + if containerDoneCtx.Err() != nil && d.read != 0 { + return + } else if ctx.Err() != nil { // If context is terminated then we can safely break the loop return } else if err == nil { @@ -123,7 +176,7 @@ func (d *dockerLogger) Start(opts *StartOpts) error { sinceTime = time.Now() - container, err := client.InspectContainerWithOptions(docker.InspectContainerOptions{ + _, err = client.InspectContainerWithOptions(docker.InspectContainerOptions{ ID: opts.ContainerID, }) if err != nil { @@ -131,8 +184,6 @@ func (d *dockerLogger) Start(opts *StartOpts) error { if !notFoundOk { return } - } else if !container.State.Running { - return } } }() @@ -169,25 +220,45 @@ func (d *dockerLogger) openStreams(ctx context.Context, opts *StartOpts) (stdout } } - if ctx.Err() != nil { - // Stop was called and don't need files anymore - stdoutF.Close() - stderrF.Close() - return nil, nil, ctx.Err() - } - d.stdLock.Lock() d.stdout, d.stderr = stdoutF, stderrF d.stdLock.Unlock() - return stdoutF, stderrF, nil + return d.streamCopier(stdoutF), d.streamCopier(stderrF), nil +} + +// streamCopier copies into the given writer and sets a flag indicating that +// we have read some logs. +func (d *dockerLogger) streamCopier(to io.WriteCloser) io.WriteCloser { + return &copier{read: &d.read, writer: to} +} + +type copier struct { + read *int64 + writer io.WriteCloser +} + +func (c *copier) Write(p []byte) (n int, err error) { + if *c.read == 0 { + atomic.AddInt64(c.read, int64(len(p))) + } + return c.writer.Write(p) +} + +func (c *copier) Close() error { + return c.writer.Close() } // Stop log monitoring func (d *dockerLogger) Stop() error { - if d.cancelCtx != nil { - d.cancelCtx() + if d.containerDoneCtx != nil { + d.containerDoneCtx() } + return nil +} + +func (d *dockerLogger) cleanup() error { + d.logger.Debug("cleaning up", "container_id", d.containerID) d.stdLock.Lock() stdout, stderr := d.stdout, d.stderr diff --git a/drivers/docker/docklog/docker_logger_test.go b/drivers/docker/docklog/docker_logger_test.go index 52af6d70b6b..efca07a2820 100644 --- a/drivers/docker/docklog/docker_logger_test.go +++ b/drivers/docker/docklog/docker_logger_test.go @@ -115,6 +115,178 @@ func TestDockerLogger_Success(t *testing.T) { }) } +func TestDockerLogger_OutOfOrderSuccess(t *testing.T) { + ctu.DockerCompatible(t) + + t.Parallel() + require := require.New(t) + + containerImage, containerImageName, containerImageTag := testContainerDetails() + + client, err := docker.NewClientFromEnv() + if err != nil { + t.Skip("docker unavailable:", err) + } + + if img, err := client.InspectImage(containerImage); err != nil || img == nil { + t.Log("image not found locally, downloading...") + err = client.PullImage(docker.PullImageOptions{ + Repository: containerImageName, + Tag: containerImageTag, + }, docker.AuthConfiguration{}) + require.NoError(err, "failed to pull image") + } + + containerConf := docker.CreateContainerOptions{ + Config: &docker.Config{ + Cmd: []string{ + "sh", "-c", "touch ~/docklog; tail -f ~/docklog", + }, + Image: containerImage, + }, + Context: context.Background(), + } + + container, err := client.CreateContainer(containerConf) + require.NoError(err) + + defer client.RemoveContainer(docker.RemoveContainerOptions{ + ID: container.ID, + Force: true, + }) + + err = client.StartContainer(container.ID, nil) + require.NoError(err) + + testutil.WaitForResult(func() (bool, error) { + container, err = client.InspectContainer(container.ID) + if err != nil { + return false, err + } + if !container.State.Running { + return false, fmt.Errorf("container not running") + } + return true, nil + }, func(err error) { + require.NoError(err) + }) + + stdout := &noopCloser{bytes.NewBuffer(nil)} + stderr := &noopCloser{bytes.NewBuffer(nil)} + + echoToContainer(t, client, container.ID, "abc") + echoToContainer(t, client, container.ID, "123") + + dl := NewDockerLogger(testlog.HCLogger(t)).(*dockerLogger) + dl.stdout = stdout + dl.stderr = stderr + require.NoError(dl.Start(&StartOpts{ + ContainerID: container.ID, + })) + + testutil.WaitForResult(func() (bool, error) { + act := stdout.String() + if "abc\n123\n" != act { + return false, fmt.Errorf("expected abc\\n123\\n for stdout but got %s", act) + } + + return true, nil + }, func(err error) { + require.NoError(err) + }) +} + +func TestDockerLogger_WaitSuccess(t *testing.T) { + ctu.DockerCompatible(t) + + t.Parallel() + require := require.New(t) + + containerImage, containerImageName, containerImageTag := testContainerDetails() + + client, err := docker.NewClientFromEnv() + if err != nil { + t.Skip("docker unavailable:", err) + } + + if img, err := client.InspectImage(containerImage); err != nil || img == nil { + t.Log("image not found locally, downloading...") + err = client.PullImage(docker.PullImageOptions{ + Repository: containerImageName, + Tag: containerImageTag, + }, docker.AuthConfiguration{}) + require.NoError(err, "failed to pull image") + } + + containerConf := docker.CreateContainerOptions{ + Config: &docker.Config{ + Cmd: []string{ + "sh", "-c", "touch ~/docklog; tail -f ~/docklog", + }, + Image: containerImage, + }, + Context: context.Background(), + } + + container, err := client.CreateContainer(containerConf) + require.NoError(err) + + defer client.RemoveContainer(docker.RemoveContainerOptions{ + ID: container.ID, + Force: true, + }) + + err = client.StartContainer(container.ID, nil) + require.NoError(err) + + testutil.WaitForResult(func() (bool, error) { + container, err = client.InspectContainer(container.ID) + if err != nil { + return false, err + } + if !container.State.Running { + return false, fmt.Errorf("container not running") + } + return true, nil + }, func(err error) { + require.NoError(err) + }) + + stdout := &noopCloser{bytes.NewBuffer(nil)} + stderr := &noopCloser{bytes.NewBuffer(nil)} + + echoToContainer(t, client, container.ID, "abc") + echoToContainer(t, client, container.ID, "123") + + <-time.After(time.Second) + + err = client.StopContainer(container.ID, 0) + require.NoError(err) + + <-time.After(time.Second) + + dl := NewDockerLogger(testlog.HCLogger(t)).(*dockerLogger) + dl.stdout = stdout + dl.stderr = stderr + delay := time.Second + dl.readDelay = &delay + require.NoError(dl.Start(&StartOpts{ + ContainerID: container.ID, + })) + dl.containerDoneCtx() + + testutil.WaitForResult(func() (bool, error) { + act := stdout.String() + if "abc\n123\n" != act { + return false, fmt.Errorf("expected abc\\n123\\n for stdout but got %s", act) + } + + return true, nil + }, func(err error) { + require.NoError(err) + }) +} + func TestDockerLogger_Success_TTY(t *testing.T) { ci.Parallel(t) ctu.DockerCompatible(t) diff --git a/drivers/docker/driver.go b/drivers/docker/driver.go index ec2455515df..146e92ed469 100644 --- a/drivers/docker/driver.go +++ b/drivers/docker/driver.go @@ -172,6 +172,7 @@ func (d *Driver) setupNewDockerLogger(container *docker.Container, cfg *drivers. TLSKey: d.config.TLS.Key, TLSCA: d.config.TLS.CA, StartTime: startTime.Unix(), + GracePeriod: d.config.containerLogGracePeriodDuration, }); err != nil { pluginClient.Kill() return nil, nil, fmt.Errorf("failed to launch docker logger process %s: %v", container.ID, err) @@ -203,16 +204,17 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error { } h := &taskHandle{ - client: client, - waitClient: waitClient, - logger: d.logger.With("container_id", container.ID), - task: handle.Config, - containerID: container.ID, - containerImage: container.Image, - doneCh: make(chan bool), - waitCh: make(chan struct{}), - removeContainerOnExit: d.config.GC.Container, - net: handleState.DriverNetwork, + client: client, + waitClient: waitClient, + logger: d.logger.With("container_id", container.ID), + task: handle.Config, + containerID: container.ID, + containerImage: container.Image, + doneCh: make(chan bool), + waitCh: make(chan struct{}), + removeContainerOnExit: d.config.GC.Container, + net: handleState.DriverNetwork, + containerLogGracePeriod: d.config.containerLogGracePeriodDuration, } if !d.config.DisableLogCollection { @@ -383,18 +385,19 @@ CREATE: // Return a driver handle h := &taskHandle{ - client: client, - waitClient: waitClient, - dlogger: dlogger, - dloggerPluginClient: pluginClient, - logger: d.logger.With("container_id", container.ID), - task: cfg, - containerID: container.ID, - containerImage: container.Image, - doneCh: make(chan bool), - waitCh: make(chan struct{}), - removeContainerOnExit: d.config.GC.Container, - net: net, + client: client, + waitClient: waitClient, + dlogger: dlogger, + dloggerPluginClient: pluginClient, + logger: d.logger.With("container_id", container.ID), + task: cfg, + containerID: container.ID, + containerImage: container.Image, + doneCh: make(chan bool), + waitCh: make(chan struct{}), + removeContainerOnExit: d.config.GC.Container, + net: net, + containerLogGracePeriod: d.config.containerLogGracePeriodDuration, } if err := handle.SetDriverState(h.buildState()); err != nil { diff --git a/drivers/docker/handle.go b/drivers/docker/handle.go index 59515f2f97f..f4eec732ac8 100644 --- a/drivers/docker/handle.go +++ b/drivers/docker/handle.go @@ -21,18 +21,19 @@ import ( ) type taskHandle struct { - client *docker.Client - waitClient *docker.Client - logger hclog.Logger - dlogger docklog.DockerLogger - dloggerPluginClient *plugin.Client - task *drivers.TaskConfig - containerID string - containerImage string - doneCh chan bool - waitCh chan struct{} - removeContainerOnExit bool - net *drivers.DriverNetwork + client *docker.Client + waitClient *docker.Client + logger hclog.Logger + dlogger docklog.DockerLogger + dloggerPluginClient *plugin.Client + task *drivers.TaskConfig + containerID string + containerImage string + doneCh chan bool + waitCh chan struct{} + removeContainerOnExit bool + containerLogGracePeriod time.Duration + net *drivers.DriverNetwork exitResult *drivers.ExitResult exitResultLock sync.Mutex @@ -216,11 +217,15 @@ func (h *taskHandle) shutdownLogger() { return } - if err := h.dlogger.Stop(); err != nil { - h.logger.Error("failed to stop docker logger process during StopTask", - "error", err, "logger_pid", h.dloggerPluginClient.ReattachConfig().Pid) - } - h.dloggerPluginClient.Kill() + go func() { + h.logger.Info("sending stop signal to logger", "job_id", h.task.JobID, "container_id", h.containerID) + if err := h.dlogger.Stop(); err != nil { + h.logger.Error("failed to stop docker logger process during StopTask", + "error", err, "logger_pid", h.dloggerPluginClient.ReattachConfig().Pid) + } + <-time.After(h.containerLogGracePeriod) + h.dloggerPluginClient.Kill() + }() } func (h *taskHandle) run() {