From 2f7aa4fdded05c30869b2a14ca7241505827f3c5 Mon Sep 17 00:00:00 2001 From: Nikolay Edigaryev Date: Tue, 16 Feb 2021 04:13:44 +0300 Subject: [PATCH] Stream launched container's logs to the CLI's logger --- internal/executor/executor_test.go | 19 +++++++ .../containerbackend/containerbackend.go | 1 + .../instance/containerbackend/docker.go | 34 +++++++++++++ .../instance/containerbackend/podman_linux.go | 51 +++++++++++++++++++ .../containerbackend/unimplemented.go | 4 ++ internal/executor/instance/instance.go | 16 ++++++ .../testdata/container-logs/.cirrus.yml | 5 ++ 7 files changed, 130 insertions(+) create mode 100644 internal/executor/testdata/container-logs/.cirrus.yml diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go index e4f3e14f..3f2996c8 100644 --- a/internal/executor/executor_test.go +++ b/internal/executor/executor_test.go @@ -17,6 +17,7 @@ import ( "github.com/cirruslabs/echelon" "github.com/cirruslabs/echelon/renderers" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "io" "io/ioutil" "os" @@ -412,3 +413,21 @@ func TestLoggingNoExtraNewlines(t *testing.T) { assert.Contains(t, buf.String(), "no newline in the output\n\x1b[32m'no_newline' script succeeded") assert.Contains(t, buf.String(), "double newline in the output\n\n\x1b[32m'double' script succeeded") } + +// TestContainerLogs ensures that we receive logs from the agent running inside a container. +func TestContainerLogs(t *testing.T) { + // Create os.Stderr writer that duplicates it's output to buf + buf := bytes.NewBufferString("") + writer := io.MultiWriter(os.Stderr, buf) + + // Create a logger and attach it to writer + renderer := renderers.NewSimpleRenderer(writer, nil) + logger := echelon.NewLogger(echelon.TraceLevel, renderer) + + dir := testutil.TempDirPopulatedWith(t, "testdata/container-logs") + err := testutil.ExecuteWithOptions(t, dir, executor.WithLogger(logger)) + require.NoError(t, err) + assert.Contains(t, buf.String(), "Getting initial commands...") + assert.Regexp(t, "container: [0-9]{4}/[0-9]{2}/[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}"+ + " Background commands to clean up after: [0-9]+", buf.String()) +} diff --git a/internal/executor/instance/containerbackend/containerbackend.go b/internal/executor/instance/containerbackend/containerbackend.go index bbcd51a4..86ac3e28 100644 --- a/internal/executor/instance/containerbackend/containerbackend.go +++ b/internal/executor/instance/containerbackend/containerbackend.go @@ -33,6 +33,7 @@ type ContainerBackend interface { ContainerCreate(ctx context.Context, input *ContainerCreateInput, name string) (*ContainerCreateOutput, error) ContainerStart(ctx context.Context, id string) error ContainerWait(ctx context.Context, id string) (<-chan ContainerWaitResult, <-chan error) + ContainerLogs(ctx context.Context, id string) (<-chan string, error) ContainerDelete(ctx context.Context, id string) error SystemInfo(ctx context.Context) (*SystemInfo, error) diff --git a/internal/executor/instance/containerbackend/docker.go b/internal/executor/instance/containerbackend/docker.go index 0e4fc354..e83d4647 100644 --- a/internal/executor/instance/containerbackend/docker.go +++ b/internal/executor/instance/containerbackend/docker.go @@ -14,6 +14,7 @@ import ( "github.com/docker/docker/api/types/mount" "github.com/docker/docker/api/types/volume" "github.com/docker/docker/client" + "github.com/docker/docker/pkg/stdcopy" "io" "io/ioutil" ) @@ -266,6 +267,39 @@ func (backend *Docker) ContainerWait(ctx context.Context, id string) (<-chan Con return waitChan, errChan } +func (backend *Docker) ContainerLogs(ctx context.Context, id string) (<-chan string, error) { + logChan := make(chan string) + + multiplexedStream, err := backend.cli.ContainerLogs(ctx, id, types.ContainerLogsOptions{ + ShowStdout: true, + ShowStderr: true, + Follow: true, + }) + if err != nil { + return nil, err + } + + pipeReader, pipeWriter := io.Pipe() + + go func() { + _, _ = stdcopy.StdCopy(pipeWriter, pipeWriter, multiplexedStream) + _ = pipeWriter.Close() + _ = multiplexedStream.Close() + }() + + go func() { + scanner := bufio.NewScanner(pipeReader) + + for scanner.Scan() { + logChan <- scanner.Text() + } + + close(logChan) + }() + + return logChan, nil +} + func (backend *Docker) ContainerDelete(ctx context.Context, id string) error { return backend.cli.ContainerRemove(ctx, id, types.ContainerRemoveOptions{ RemoveVolumes: true, diff --git a/internal/executor/instance/containerbackend/podman_linux.go b/internal/executor/instance/containerbackend/podman_linux.go index fa87096a..b5925689 100644 --- a/internal/executor/instance/containerbackend/podman_linux.go +++ b/internal/executor/instance/containerbackend/podman_linux.go @@ -1,6 +1,7 @@ package containerbackend import ( + "bufio" "context" "encoding/json" "errors" @@ -9,6 +10,7 @@ import ( "github.com/avast/retry-go" "github.com/cirruslabs/cirrus-cli/internal/executor/instance/containerbackend/podman" "github.com/cirruslabs/podmanapi/pkg/swagger" + "github.com/docker/docker/pkg/stdcopy" "github.com/google/uuid" "io" "net" @@ -443,6 +445,55 @@ func (backend *Podman) ContainerWait(ctx context.Context, id string) (<-chan Con return waitChan, errChan } +func (backend *Podman) ContainerLogs(ctx context.Context, id string) (<-chan string, error) { + logChan := make(chan string) + + buildURL, err := url.Parse(backend.basePath + "/containers/" + id + "/logs") + if err != nil { + return nil, err + } + + q := buildURL.Query() + q.Add("stdout", "true") + q.Add("stderr", "true") + q.Add("follow", "true") + buildURL.RawQuery = q.Encode() + + req, err := http.NewRequestWithContext(ctx, "GET", buildURL.String(), nil) + if err != nil { + return nil, err + } + + resp, err := backend.httpClient.Do(req) + if err != nil { + return nil, err + } + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("%w: container logs endpoint returned HTTP %d", ErrPodman, resp.StatusCode) + } + + pipeReader, pipeWriter := io.Pipe() + + go func() { + _, _ = stdcopy.StdCopy(pipeWriter, pipeWriter, resp.Body) + _ = pipeWriter.Close() + _ = resp.Body.Close() + }() + + go func() { + scanner := bufio.NewScanner(pipeReader) + + for scanner.Scan() { + logChan <- scanner.Text() + } + + close(logChan) + }() + + return logChan, nil +} + func (backend *Podman) ContainerDelete(ctx context.Context, id string) error { // nolint:bodyclose // already closed by Swagger-generated code _, err := backend.cli.ContainersApi.LibpodRemoveContainer(ctx, id, &swagger.ContainersApiLibpodRemoveContainerOpts{ diff --git a/internal/executor/instance/containerbackend/unimplemented.go b/internal/executor/instance/containerbackend/unimplemented.go index 0878b0c7..24c6d992 100644 --- a/internal/executor/instance/containerbackend/unimplemented.go +++ b/internal/executor/instance/containerbackend/unimplemented.go @@ -67,6 +67,10 @@ func (*Unimplemented) ContainerWait(ctx context.Context, id string) (<-chan Cont return waitChan, errChan } +func (*Unimplemented) ContainerLogs(ctx context.Context, id string) (<-chan string, error) { + return nil, ErrNotImplemented +} + func (*Unimplemented) ContainerDelete(ctx context.Context, id string) error { return ErrNotImplemented } func (*Unimplemented) SystemInfo(ctx context.Context) (*SystemInfo, error) { diff --git a/internal/executor/instance/instance.go b/internal/executor/instance/instance.go index 91f0bd4e..8776467d 100644 --- a/internal/executor/instance/instance.go +++ b/internal/executor/instance/instance.go @@ -241,6 +241,9 @@ func RunContainerizedAgent(ctx context.Context, config *runconfig.RunConfig, par var additionalContainersWG sync.WaitGroup additionalContainersCtx, additionalContainersCancel := context.WithCancel(context.Background()) + var logReaderWg sync.WaitGroup + logReaderWg.Add(1) + // Schedule all containers for removal defer func() { // We need to remove additional containers first in order to avoid Podman's @@ -259,6 +262,8 @@ func RunContainerizedAgent(ctx context.Context, config *runconfig.RunConfig, par logger.Warnf("error while removing container: %v", err) } } + + logReaderWg.Wait() }() // Start additional containers (if any) @@ -287,6 +292,17 @@ func RunContainerizedAgent(ctx context.Context, config *runconfig.RunConfig, par return err } + logChan, err := backend.ContainerLogs(ctx, cont.ID) + if err != nil { + return err + } + go func() { + for logLine := range logChan { + logger.Debugf("container: %s", logLine) + } + logReaderWg.Done() + }() + logger.Debugf("waiting for container %s to finish", cont.ID) waitChan, errChan := backend.ContainerWait(ctx, cont.ID) select { diff --git a/internal/executor/testdata/container-logs/.cirrus.yml b/internal/executor/testdata/container-logs/.cirrus.yml new file mode 100644 index 00000000..b67eaf92 --- /dev/null +++ b/internal/executor/testdata/container-logs/.cirrus.yml @@ -0,0 +1,5 @@ +container: + image: debian:latest + +task: + script: true