Skip to content

Commit

Permalink
Handle errors in log producer gracefully
Browse files Browse the repository at this point in the history
If the context is done, we close the log producer.
That is not an error, the context cancellation signals that the consumer
should stop.

If there is a non-context error during the HTTP call or while reading
the response, retry the HTTP request in 1 second again.

Previously, the error handling was inconsistent:
- an error while reading HTTP response headers would retry
  the HTTP request
- but an error while reading the body would just end the log producer

With this commit, the error handling should be more consistent.
  • Loading branch information
martin-sucha committed Aug 23, 2023
1 parent 0b6d915 commit bc8715e
Showing 1 changed file with 21 additions and 14 deletions.
35 changes: 21 additions & 14 deletions docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ const (
ReaperDefault = "reaper_default" // Default network name when bridge is not available
packagePath = "github.com/testcontainers/testcontainers-go"

logStoppedForOutOfSyncMessage = "Stopping log consumer: Headers out of sync"
logRestartedForOutOfSyncMessage = "headers out of sync, will retry"
)

// DockerContainer represents a container started using Docker
Expand Down Expand Up @@ -633,25 +633,27 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error {
r, err := c.provider.client.ContainerLogs(ctx, c.GetContainerID(), options)
if err != nil {
// if we can't get the logs, retry in one second.
c.logger.Printf("cannot get logs for container %q: %v", c.ID, err)
if ctx.Err() != nil {
// context done.
return
}
c.logger.Printf("cannot get logs for container %q: %v", c.ID, err)
time.Sleep(1 * time.Second)
goto BEGIN
}
defer c.provider.Close()

for {
select {
case <-ctx.Done():
if ctx.Err() != nil {
err := r.Close()
if err != nil {
// we can't close the read closer, this should never happen
panic(err)
}
return
}
select {
case <-ctx.Done():
continue
default:
h := make([]byte, 8)
_, err := io.ReadFull(r, h)
Expand All @@ -662,13 +664,15 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error {
since = fmt.Sprintf("%d.%09d", now.Unix(), int64(now.Nanosecond()))
goto BEGIN
}
if errors.Is(err, context.DeadlineExceeded) {
// Probably safe to continue here
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
// If the outer context is done, loop will exit in the next iteration.
continue
}
_, _ = fmt.Fprintf(os.Stderr, "container log error: %+v. %s", err, logStoppedForOutOfSyncMessage)
_, _ = fmt.Fprintf(os.Stderr, "read log header: %+v. %s", err, logRestartedForOutOfSyncMessage)
// if we would continue here, the next header-read will result into random data...
return
// we need to restart the whole request.
time.Sleep(1 * time.Second)
goto BEGIN
}

count := binary.BigEndian.Uint32(h[4:])
Expand All @@ -690,13 +694,16 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error {
if err != nil {
// TODO: add-logger: use logger to log out this error
_, _ = fmt.Fprintf(os.Stderr, "error occurred reading log with known length %s", err.Error())
if errors.Is(err, context.DeadlineExceeded) {
// Probably safe to continue here

if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
// If the outer context is done, loop will exit in the next iteration.
continue
}
// we can not continue here as the next read most likely will not be the next header
_, _ = fmt.Fprintln(os.Stderr, logStoppedForOutOfSyncMessage)
return
// if we would continue here, the next header-read will result into random data...
// we need to restart the whole request.
_, _ = fmt.Fprintf(os.Stderr, "read log message: %+v. %s", err, logRestartedForOutOfSyncMessage)
time.Sleep(1 * time.Second)
goto BEGIN
}
for _, c := range c.consumers {
c.Accept(Log{
Expand Down

0 comments on commit bc8715e

Please sign in to comment.