Allow docklog to collect logs from stopped containers
Short-lived containers (especially those < 1 second) often do not have
their logs sent to Nomad.

This PR adjusts the Nomad docker driver and docker logging driver to:

1. Enable docklog to run after a container has stopped (up to some
   grace period limit)
2. Collect logs from stopped containers up until the grace period

This fixes the current issues:

1. docklog is killed by the handle as soon as the task finishes, which
   means fast-running containers can never have their logs scraped
2. docklog quits streaming logs in its event loop if the container has
   stopped

In order to do this, we need to know _whether_ we have read logs for
the current container before applying the grace period. We add a copier
to the fifo streams which sets an atomic flag, letting us know whether
we need to keep retrying log reads within the grace period or whether
we can quit early.

Fixes hashicorp#2475, hashicorp#6931.

Always wait to read from logs before exiting

Store number of bytes read vs a simple counter
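
For illustration only (not part of this commit): based on the schema added in config.go below, an operator would tune the new option in the Nomad agent's docker plugin block, e.g. raising it from the default of "10s" (the diff enforces a 2s minimum). The snippet is a hypothetical sketch:

plugin "docker" {
  config {
    # Keep the docker logger alive for up to 30s after a container exits
    # when no logs have been read yet (default "10s", minimum "2s").
    container_log_grace_period = "30s"
  }
}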
tonyhb authored and shoenig committed Jun 9, 2022
1 parent 2697e63 commit e1aabe6
Showing 5 changed files with 347 additions and 75 deletions.
55 changes: 38 additions & 17 deletions drivers/docker/config.go
@@ -293,6 +293,13 @@ var (
// disable_log_collection indicates whether docker driver should collect logs of docker
// task containers. If true, nomad doesn't start docker_logger/logmon processes
"disable_log_collection": hclspec.NewAttr("disable_log_collection", "bool", false),

// container_log_grace_period is the period in which the docker logger remains active if the
// container exits and no logs have been read yet.
"container_log_grace_period": hclspec.NewDefault(
hclspec.NewAttr("container_log_grace_period", "string", false),
hclspec.NewLiteral(`"10s"`),
),
})

// mountBodySpec is the hcl specification for the `mount` block
@@ -611,23 +618,25 @@ type ContainerGCConfig struct {
}

type DriverConfig struct {
Endpoint string `codec:"endpoint"`
Auth AuthConfig `codec:"auth"`
TLS TLSConfig `codec:"tls"`
GC GCConfig `codec:"gc"`
Volumes VolumeConfig `codec:"volumes"`
AllowPrivileged bool `codec:"allow_privileged"`
AllowCaps []string `codec:"allow_caps"`
GPURuntimeName string `codec:"nvidia_runtime"`
InfraImage string `codec:"infra_image"`
InfraImagePullTimeout string `codec:"infra_image_pull_timeout"`
infraImagePullTimeoutDuration time.Duration `codec:"-"`
DisableLogCollection bool `codec:"disable_log_collection"`
PullActivityTimeout string `codec:"pull_activity_timeout"`
PidsLimit int64 `codec:"pids_limit"`
pullActivityTimeoutDuration time.Duration `codec:"-"`
ExtraLabels []string `codec:"extra_labels"`
Logging LoggingConfig `codec:"logging"`
Endpoint string `codec:"endpoint"`
Auth AuthConfig `codec:"auth"`
TLS TLSConfig `codec:"tls"`
GC GCConfig `codec:"gc"`
Volumes VolumeConfig `codec:"volumes"`
AllowPrivileged bool `codec:"allow_privileged"`
AllowCaps []string `codec:"allow_caps"`
GPURuntimeName string `codec:"nvidia_runtime"`
InfraImage string `codec:"infra_image"`
InfraImagePullTimeout string `codec:"infra_image_pull_timeout"`
infraImagePullTimeoutDuration time.Duration `codec:"-"`
DisableLogCollection bool `codec:"disable_log_collection"`
ContainerLogGracePeriod string `codec:"container_log_grace_period"`
containerLogGracePeriodDuration time.Duration `codec:"-"`
PullActivityTimeout string `codec:"pull_activity_timeout"`
PidsLimit int64 `codec:"pids_limit"`
pullActivityTimeoutDuration time.Duration `codec:"-"`
ExtraLabels []string `codec:"extra_labels"`
Logging LoggingConfig `codec:"logging"`

AllowRuntimesList []string `codec:"allow_runtimes"`
allowRuntimes map[string]struct{} `codec:"-"`
@@ -673,6 +682,7 @@ func (d *Driver) ConfigSchema() (*hclspec.Spec, error) {

const danglingContainersCreationGraceMinimum = 1 * time.Minute
const pullActivityTimeoutMinimum = 1 * time.Minute
const containerLogGracePeriodMinimum = 2 * time.Second

func (d *Driver) SetConfig(c *base.Config) error {
var config DriverConfig
@@ -723,6 +733,17 @@ func (d *Driver) SetConfig(c *base.Config) error {
d.config.pullActivityTimeoutDuration = dur
}

if len(d.config.ContainerLogGracePeriod) > 0 {
dur, err := time.ParseDuration(d.config.ContainerLogGracePeriod)
if err != nil {
return fmt.Errorf("failed to parse 'container_log_grace_period' duaration: %v", err)
}
if dur < containerLogGracePeriodMinimum {
return fmt.Errorf("container_log_grace_period is less than minimum, %v", containerLogGracePeriodMinimum)
}
d.config.containerLogGracePeriodDuration = dur
}

if d.config.InfraImagePullTimeout != "" {
dur, err := time.ParseDuration(d.config.InfraImagePullTimeout)
if err != nil {
109 changes: 90 additions & 19 deletions drivers/docker/docklog/docker_logger.go
@@ -6,6 +6,7 @@ import (
"math/rand"
"strings"
"sync"
"sync/atomic"
"time"

docker "github.com/fsouza/go-dockerclient"
@@ -40,6 +41,10 @@ type StartOpts struct {
// from
StartTime int64

// GracePeriod is the time in which we can attempt to collect logs from a stopped
// container, if none have been read yet.
GracePeriod time.Duration

// TLS settings for docker client
TLSCert string
TLSKey string
@@ -56,28 +61,72 @@ func NewDockerLogger(logger hclog.Logger) DockerLogger {

// dockerLogger implements the DockerLogger interface
type dockerLogger struct {
logger hclog.Logger
logger hclog.Logger
containerID string

stdout io.WriteCloser
stderr io.WriteCloser
stdLock sync.Mutex

cancelCtx context.CancelFunc
doneCh chan interface{}
// containerDoneCtx is called when the container dies, indicating that there will be no
// more logs to be read.
containerDoneCtx context.CancelFunc

// read indicates whether we have read anything from the logs. This is manipulated
// using the sync package via multiple goroutines.
read int64

doneCh chan interface{}
// readDelay is used in testing to delay reads, simulating race conditions between
// container exits and reading
readDelay *time.Duration
}

// Start log monitoring
func (d *dockerLogger) Start(opts *StartOpts) error {
d.containerID = opts.ContainerID

client, err := d.getDockerClient(opts)
if err != nil {
return fmt.Errorf("failed to open docker client: %v", err)
}

ctx, cancel := context.WithCancel(context.Background())
d.cancelCtx = cancel
// Set up a ctx which is cancelled when the container quits.
containerDoneCtx, cancel := context.WithCancel(context.Background())
d.containerDoneCtx = cancel

// Set up a ctx which will be cancelled when we stop reading logs. This
// grace period allows us to collect logs from stopped containers if none
// have yet been read.
ctx, cancelStreams := context.WithCancel(context.Background())
go func() {
<-containerDoneCtx.Done()

// Wait until we've read from the logs to exit.
timeout := time.After(opts.GracePeriod)
for {
select {
case <-time.After(time.Second):
if d.read > 0 {
cancelStreams()
return
}
case <-timeout:
cancelStreams()
return
}
}

}()

go func() {
defer close(d.doneCh)
defer d.cleanup()

if d.readDelay != nil {
// Allows us to simulate reading from stopped containers in testing.
<-time.After(*d.readDelay)
}

stdout, stderr, err := d.openStreams(ctx, opts)
if err != nil {
@@ -106,7 +155,11 @@ func (d *dockerLogger) Start(opts *StartOpts) error {
}

err := client.Logs(logOpts)
if ctx.Err() != nil {
// If we've been reading logs and the container is done we can safely break
// the loop
if containerDoneCtx.Err() != nil && d.read != 0 {
return
} else if ctx.Err() != nil {
// If context is terminated then we can safely break the loop
return
} else if err == nil {
@@ -123,16 +176,14 @@ func (d *dockerLogger) Start(opts *StartOpts) error {

sinceTime = time.Now()

container, err := client.InspectContainerWithOptions(docker.InspectContainerOptions{
_, err = client.InspectContainerWithOptions(docker.InspectContainerOptions{
ID: opts.ContainerID,
})
if err != nil {
_, notFoundOk := err.(*docker.NoSuchContainer)
if !notFoundOk {
return
}
} else if !container.State.Running {
return
}
}
}()
@@ -169,25 +220,45 @@ func (d *dockerLogger) openStreams(ctx context.Context, opts *StartOpts) (stdout
}
}

if ctx.Err() != nil {
// Stop was called and don't need files anymore
stdoutF.Close()
stderrF.Close()
return nil, nil, ctx.Err()
}

d.stdLock.Lock()
d.stdout, d.stderr = stdoutF, stderrF
d.stdLock.Unlock()

return stdoutF, stderrF, nil
return d.streamCopier(stdoutF), d.streamCopier(stderrF), nil
}

// streamCopier copies into the given writer and sets a flag indicating that
// we have read some logs.
func (d *dockerLogger) streamCopier(to io.WriteCloser) io.WriteCloser {
return &copier{read: &d.read, writer: to}
}

type copier struct {
read *int64
writer io.WriteCloser
}

func (c *copier) Write(p []byte) (n int, err error) {
if *c.read == 0 {
atomic.AddInt64(c.read, int64(len(p)))
}
return c.writer.Write(p)
}

func (c *copier) Close() error {
return c.writer.Close()
}

// Stop log monitoring
func (d *dockerLogger) Stop() error {
if d.cancelCtx != nil {
d.cancelCtx()
if d.containerDoneCtx != nil {
d.containerDoneCtx()
}
return nil
}

func (d *dockerLogger) cleanup() error {
d.logger.Debug("cleaning up", "container_id", d.containerID)

d.stdLock.Lock()
stdout, stderr := d.stdout, d.stderr
(The remaining 3 changed files are not shown.)
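
For readers skimming the diff, here is a minimal, self-contained sketch (not part of the commit; all names are hypothetical) of the pattern the docker_logger.go changes introduce: a counting writer marks that some logs have been read, and a companion loop cancels the log streams either after the first read or once the grace period expires, whichever comes first. Unlike the diff, the sketch reads the counter with atomic loads throughout.

package main

import (
    "bytes"
    "context"
    "fmt"
    "io"
    "sync/atomic"
    "time"
)

// countingWriter wraps a writer and atomically records how many bytes have
// been written, so another goroutine can tell whether any logs were read yet.
type countingWriter struct {
    read   *int64
    writer io.Writer
}

func (c *countingWriter) Write(p []byte) (int, error) {
    atomic.AddInt64(c.read, int64(len(p)))
    return c.writer.Write(p)
}

// waitForLogsOrGrace cancels the stream context once at least one byte has
// been read, or once the grace period expires, whichever happens first.
func waitForLogsOrGrace(containerDone <-chan struct{}, read *int64, grace time.Duration, cancelStreams context.CancelFunc) {
    <-containerDone
    timeout := time.After(grace)
    for {
        select {
        case <-time.After(100 * time.Millisecond):
            if atomic.LoadInt64(read) > 0 {
                cancelStreams()
                return
            }
        case <-timeout:
            cancelStreams()
            return
        }
    }
}

func main() {
    var read int64
    var buf bytes.Buffer
    w := &countingWriter{read: &read, writer: &buf}

    streamCtx, cancelStreams := context.WithCancel(context.Background())
    containerDone := make(chan struct{})

    go waitForLogsOrGrace(containerDone, &read, 2*time.Second, cancelStreams)

    // The "container" exits immediately; its logs arrive shortly after, as
    // happens with short-lived containers.
    close(containerDone)
    time.Sleep(200 * time.Millisecond)
    w.Write([]byte("hello from a short-lived container\n"))

    <-streamCtx.Done()
    fmt.Printf("captured %d bytes: %q\n", atomic.LoadInt64(&read), buf.String())
}

Running the sketch prints the captured bytes even though the "container" exited before any logs were written, which is exactly the short-lived-container case this commit addresses.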
