Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: container logging deadlocks #2791

Merged
merged 1 commit into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
261 changes: 137 additions & 124 deletions docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"bufio"
"context"
"encoding/base64"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
Expand All @@ -17,7 +16,6 @@ import (
"path/filepath"
"regexp"
"strings"
"sync"
"time"

"github.com/cenkalti/backoff/v4"
Expand All @@ -30,6 +28,7 @@ import (
"github.com/docker/docker/client"
"github.com/docker/docker/errdefs"
"github.com/docker/docker/pkg/jsonmessage"
"github.com/docker/docker/pkg/stdcopy"
"github.com/docker/go-connections/nat"
"github.com/moby/term"
specs "github.com/opencontainers/image-spec/specs-go/v1"
Expand All @@ -48,11 +47,21 @@ const (
Podman = "podman"
ReaperDefault = "reaper_default" // Default network name when bridge is not available
packagePath = "github.com/testcontainers/testcontainers-go"

logStoppedForOutOfSyncMessage = "Stopping log consumer: Headers out of sync"
)

var createContainerFailDueToNameConflictRegex = regexp.MustCompile("Conflict. The container name .* is already in use by container .*")
var (
// createContainerFailDueToNameConflictRegex is a regular expression that matches the error returned when a container name is already in use.
createContainerFailDueToNameConflictRegex = regexp.MustCompile("Conflict. The container name .* is already in use by container .*")

// minLogProductionTimeout is the minimum log production timeout.
minLogProductionTimeout = time.Duration(5 * time.Second)

// maxLogProductionTimeout is the maximum log production timeout.
maxLogProductionTimeout = time.Duration(60 * time.Second)

// errLogProductionStop is the cause for stopping log production.
errLogProductionStop = errors.New("log production stopped")
)

// DockerContainer represents a container started using Docker
type DockerContainer struct {
Expand All @@ -65,23 +74,19 @@ type DockerContainer struct {
isRunning bool
imageWasBuilt bool
// keepBuiltImage makes Terminate not remove the image if imageWasBuilt.
keepBuiltImage bool
provider *DockerProvider
sessionID string
terminationSignal chan bool
consumers []LogConsumer
logProductionError chan error
keepBuiltImage bool
provider *DockerProvider
sessionID string
terminationSignal chan bool
consumers []LogConsumer

// TODO: Remove locking and wait group once the deprecated StartLogProducer and
// StopLogProducer have been removed and hence logging can only be started and
// stopped once.

// logProductionWaitGroup is used to signal when the log production has stopped.
// This allows stopLogProduction to safely set logProductionStop to nil.
// See simplification in https://go.dev/play/p/x0pOElF2Vjf
logProductionWaitGroup sync.WaitGroup

logProductionStop chan struct{}
// logProductionCancel is used to signal the log production to stop.
logProductionCancel context.CancelCauseFunc
logProductionCtx context.Context

logProductionTimeout *time.Duration
logger Logging
Expand Down Expand Up @@ -263,7 +268,6 @@ func (c *DockerContainer) Stop(ctx context.Context, timeout *time.Duration) erro
// without exposing the ability to fully initialize the container state.
// See: https://github.com/testcontainers/testcontainers-go/issues/2667
// TODO: Add a check for isRunning when the above issue is resolved.

err := c.stoppingHook(ctx)
if err != nil {
return fmt.Errorf("stopping hook: %w", err)
Expand Down Expand Up @@ -310,7 +314,7 @@ func (c *DockerContainer) Terminate(ctx context.Context) error {
}

select {
// close reaper if it was created
// Close reaper connection if it was attached.
case c.terminationSignal <- true:
default:
}
Expand Down Expand Up @@ -690,6 +694,29 @@ func (c *DockerContainer) copyToContainer(ctx context.Context, fileContent func(
return nil
}

// logConsumerWriter is a writer that writes to a LogConsumer.
type logConsumerWriter struct {
log Log
consumers []LogConsumer
stevenh marked this conversation as resolved.
Show resolved Hide resolved
}

// newLogConsumerWriter returns a logConsumerWriter that tags every entry with
// logType and delivers it to each of the given consumers.
func newLogConsumerWriter(logType string, consumers []LogConsumer) *logConsumerWriter {
	w := new(logConsumerWriter)
	w.log.LogType = logType
	w.consumers = consumers
	return w
}

// Write delivers p to every consumer, in order, as the Content of a Log entry
// carrying this writer's log type. It always reports len(p) bytes written and
// a nil error.
//
// The bytes are copied once per call before delivery, so consumers may keep
// the received Content after Accept returns.
func (lw logConsumerWriter) Write(p []byte) (int, error) {
	// An io.Writer must not retain p, and the caller is free to reuse its
	// backing array as soon as Write returns. Hand consumers a private copy
	// so a retained Content is never overwritten by a later write.
	content := make([]byte, len(p))
	copy(content, p)

	// lw is a value receiver, so this only changes the local copy of the
	// template entry.
	lw.log.Content = content
	for _, consumer := range lw.consumers {
		consumer.Accept(lw.log)
	}
	return len(p), nil
}

// LogProductionOption is a functional option that adjusts a DockerContainer's
// log production settings. Options are applied to the container, in the order
// given, when log production is started.
type LogProductionOption func(*DockerContainer)

// WithLogProductionTimeout is a functional option that sets the timeout for the log production.
Expand All @@ -707,124 +734,94 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context, opts ...LogProdu

// startLogProduction will start a concurrent process that will continuously read logs
// from the container and will send them to each added LogConsumer.
//
// Default log production timeout is 5s. It is used to set the context timeout
// which means that each log-reading loop will last at least the specified timeout
// and that it cannot be cancelled earlier.
// which means that each log-reading loop will last up to the specified timeout.
//
// Use functional option WithLogProductionTimeout() to override default timeout. If it's
// lower than 5s or greater than 60s it will be set to 5s or 60s respectively.
func (c *DockerContainer) startLogProduction(ctx context.Context, opts ...LogProductionOption) error {
c.logProductionStop = make(chan struct{}, 1) // buffered channel to avoid blocking
c.logProductionWaitGroup.Add(1)

for _, opt := range opts {
opt(c)
}

minLogProductionTimeout := time.Duration(5 * time.Second)
maxLogProductionTimeout := time.Duration(60 * time.Second)

if c.logProductionTimeout == nil {
// Validate the log production timeout.
switch {
case c.logProductionTimeout == nil:
c.logProductionTimeout = &minLogProductionTimeout
}

if *c.logProductionTimeout < minLogProductionTimeout {
case *c.logProductionTimeout < minLogProductionTimeout:
c.logProductionTimeout = &minLogProductionTimeout
}

if *c.logProductionTimeout > maxLogProductionTimeout {
case *c.logProductionTimeout > maxLogProductionTimeout:
c.logProductionTimeout = &maxLogProductionTimeout
}

c.logProductionError = make(chan error, 1)
// Setup the log writers.
stdout := newLogConsumerWriter(StdoutLog, c.consumers)
stderr := newLogConsumerWriter(StderrLog, c.consumers)

// Setup the log production context which will be used to stop the log production.
c.logProductionCtx, c.logProductionCancel = context.WithCancelCause(ctx)

go func() {
defer func() {
close(c.logProductionError)
c.logProductionWaitGroup.Done()
}()

since := ""
// if the socket is closed we will make additional logs request with updated Since timestamp
BEGIN:
options := container.LogsOptions{
ShowStdout: true,
ShowStderr: true,
Follow: true,
Since: since,
}
err := c.logProducer(stdout, stderr)
// Set context cancel cause, if not already set.
c.logProductionCancel(err)
}()

ctx, cancel := context.WithTimeout(ctx, *c.logProductionTimeout)
return nil
}

// logProducer reads logs from the container and writes them to stdout, stderr until either:
// - logProductionCtx is done
// - A fatal error occurs
// - No more logs are available
func (c *DockerContainer) logProducer(stdout, stderr io.Writer) error {
// Clean up idle client connections.
defer c.provider.Close()

// Setup the log options, start from the beginning.
options := container.LogsOptions{
ShowStdout: true,
ShowStderr: true,
Follow: true,
}

for {
timeoutCtx, cancel := context.WithTimeout(c.logProductionCtx, *c.logProductionTimeout)
defer cancel()

r, err := c.provider.client.ContainerLogs(ctx, c.GetContainerID(), options)
if err != nil {
c.logProductionError <- err
return
err := c.copyLogs(timeoutCtx, stdout, stderr, options)
switch {
case err == nil:
// No more logs available.
return nil
case c.logProductionCtx.Err() != nil:
// Log production was stopped or caller context is done.
return nil
case timeoutCtx.Err() != nil, errors.Is(err, net.ErrClosed):
// Timeout or client connection closed, retry.
default:
// Unexpected error, retry.
Logger.Printf("Unexpected error reading logs: %v", err)
}
defer c.provider.Close()

for {
select {
case <-c.logProductionStop:
c.logProductionError <- r.Close()
return
default:
}
h := make([]byte, 8)
_, err := io.ReadFull(r, h)
if err != nil {
switch {
case err == io.EOF:
// No more logs coming
case errors.Is(err, net.ErrClosed):
now := time.Now()
since = fmt.Sprintf("%d.%09d", now.Unix(), int64(now.Nanosecond()))
goto BEGIN
case errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled):
// Probably safe to continue here
continue
default:
_, _ = fmt.Fprintf(os.Stderr, "container log error: %+v. %s", err, logStoppedForOutOfSyncMessage)
// if we would continue here, the next header-read will result into random data...
}
return
}

count := binary.BigEndian.Uint32(h[4:])
if count == 0 {
continue
}
logType := h[0]
if logType > 2 {
_, _ = fmt.Fprintf(os.Stderr, "received invalid log type: %d", logType)
// sometimes docker returns logType = 3 which is an undocumented log type, so treat it as stdout
logType = 1
}
// Retry from the last log received.
now := time.Now()
options.Since = fmt.Sprintf("%d.%09d", now.Unix(), int64(now.Nanosecond()))
}
}

// a map of the log type --> int representation in the header, notice the first is blank, this is stdin, but the go docker client doesn't allow following that in logs
logTypes := []string{"", StdoutLog, StderrLog}
// copyLogs copies logs from the container to stdout and stderr.
func (c *DockerContainer) copyLogs(ctx context.Context, stdout, stderr io.Writer, options container.LogsOptions) error {
rc, err := c.provider.client.ContainerLogs(ctx, c.GetContainerID(), options)
stevenh marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return fmt.Errorf("container logs: %w", err)
}
defer rc.Close()

b := make([]byte, count)
_, err = io.ReadFull(r, b)
if err != nil {
// TODO: add-logger: use logger to log out this error
_, _ = fmt.Fprintf(os.Stderr, "error occurred reading log with known length %s", err.Error())
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
// Probably safe to continue here
continue
}
// we can not continue here as the next read most likely will not be the next header
_, _ = fmt.Fprintln(os.Stderr, logStoppedForOutOfSyncMessage)
return
}
for _, c := range c.consumers {
c.Accept(Log{
LogType: logTypes[logType],
Content: b,
})
}
}
}()
if _, err = stdcopy.StdCopy(stdout, stderr, rc); err != nil {
return fmt.Errorf("stdcopy: %w", err)
}

return nil
}
Expand All @@ -837,18 +834,25 @@ func (c *DockerContainer) StopLogProducer() error {
// stopLogProduction will stop the concurrent process that is reading logs
// and sending them to each added LogConsumer
func (c *DockerContainer) stopLogProduction() error {
// signal the log production to stop
c.logProductionStop <- struct{}{}
if c.logProductionCancel == nil {
return nil
}

c.logProductionWaitGroup.Wait()
// Signal the log production to stop.
c.logProductionCancel(errLogProductionStop)

if err := <-c.logProductionError; err != nil {
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
// Returning context errors is not useful for the consumer.
if err := context.Cause(c.logProductionCtx); err != nil {
switch {
case errors.Is(err, errLogProductionStop):
// Log production was stopped.
return nil
case errors.Is(err, context.DeadlineExceeded),
errors.Is(err, context.Canceled):
// Parent context is done.
return nil
default:
return err
}

return err
}

return nil
Expand All @@ -857,7 +861,16 @@ func (c *DockerContainer) stopLogProduction() error {
// GetLogProductionErrorChannel exposes the only way for the consumer
// to be able to listen to errors and react to them.
func (c *DockerContainer) GetLogProductionErrorChannel() <-chan error {
return c.logProductionError
if c.logProductionCtx == nil {
return nil
}

errCh := make(chan error, 1)
go func() {
<-c.logProductionCtx.Done()
errCh <- context.Cause(c.logProductionCtx)
}()
return errCh
}

// DockerNetwork represents a network started using Docker
Expand Down
Loading