Skip to content

Commit

Permalink
Stream launched container's logs to the CLI's logger
Browse files Browse the repository at this point in the history
  • Loading branch information
edigaryev committed Feb 16, 2021
1 parent a55d0ca commit 2f7aa4f
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 0 deletions.
19 changes: 19 additions & 0 deletions internal/executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cirruslabs/echelon"
"github.com/cirruslabs/echelon/renderers"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"io"
"io/ioutil"
"os"
Expand Down Expand Up @@ -412,3 +413,21 @@ func TestLoggingNoExtraNewlines(t *testing.T) {
assert.Contains(t, buf.String(), "no newline in the output\n\x1b[32m'no_newline' script succeeded")
assert.Contains(t, buf.String(), "double newline in the output\n\n\x1b[32m'double' script succeeded")
}

// TestContainerLogs ensures that we receive logs from the agent running inside a container.
func TestContainerLogs(t *testing.T) {
// Create os.Stderr writer that duplicates it's output to buf
buf := bytes.NewBufferString("")
writer := io.MultiWriter(os.Stderr, buf)

// Create a logger and attach it to writer
renderer := renderers.NewSimpleRenderer(writer, nil)
logger := echelon.NewLogger(echelon.TraceLevel, renderer)

dir := testutil.TempDirPopulatedWith(t, "testdata/container-logs")
err := testutil.ExecuteWithOptions(t, dir, executor.WithLogger(logger))
require.NoError(t, err)
assert.Contains(t, buf.String(), "Getting initial commands...")
assert.Regexp(t, "container: [0-9]{4}/[0-9]{2}/[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}"+
" Background commands to clean up after: [0-9]+", buf.String())
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type ContainerBackend interface {
ContainerCreate(ctx context.Context, input *ContainerCreateInput, name string) (*ContainerCreateOutput, error)
ContainerStart(ctx context.Context, id string) error
ContainerWait(ctx context.Context, id string) (<-chan ContainerWaitResult, <-chan error)
ContainerLogs(ctx context.Context, id string) (<-chan string, error)
ContainerDelete(ctx context.Context, id string) error

SystemInfo(ctx context.Context) (*SystemInfo, error)
Expand Down
34 changes: 34 additions & 0 deletions internal/executor/instance/containerbackend/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/docker/docker/api/types/mount"
"github.com/docker/docker/api/types/volume"
"github.com/docker/docker/client"
"github.com/docker/docker/pkg/stdcopy"
"io"
"io/ioutil"
)
Expand Down Expand Up @@ -266,6 +267,39 @@ func (backend *Docker) ContainerWait(ctx context.Context, id string) (<-chan Con
return waitChan, errChan
}

func (backend *Docker) ContainerLogs(ctx context.Context, id string) (<-chan string, error) {
logChan := make(chan string)

multiplexedStream, err := backend.cli.ContainerLogs(ctx, id, types.ContainerLogsOptions{
ShowStdout: true,
ShowStderr: true,
Follow: true,
})
if err != nil {
return nil, err
}

pipeReader, pipeWriter := io.Pipe()

go func() {
_, _ = stdcopy.StdCopy(pipeWriter, pipeWriter, multiplexedStream)
_ = pipeWriter.Close()
_ = multiplexedStream.Close()
}()

go func() {
scanner := bufio.NewScanner(pipeReader)

for scanner.Scan() {
logChan <- scanner.Text()
}

close(logChan)
}()

return logChan, nil
}

func (backend *Docker) ContainerDelete(ctx context.Context, id string) error {
return backend.cli.ContainerRemove(ctx, id, types.ContainerRemoveOptions{
RemoveVolumes: true,
Expand Down
51 changes: 51 additions & 0 deletions internal/executor/instance/containerbackend/podman_linux.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package containerbackend

import (
"bufio"
"context"
"encoding/json"
"errors"
Expand All @@ -9,6 +10,7 @@ import (
"github.com/avast/retry-go"
"github.com/cirruslabs/cirrus-cli/internal/executor/instance/containerbackend/podman"
"github.com/cirruslabs/podmanapi/pkg/swagger"
"github.com/docker/docker/pkg/stdcopy"
"github.com/google/uuid"
"io"
"net"
Expand Down Expand Up @@ -443,6 +445,55 @@ func (backend *Podman) ContainerWait(ctx context.Context, id string) (<-chan Con
return waitChan, errChan
}

func (backend *Podman) ContainerLogs(ctx context.Context, id string) (<-chan string, error) {
logChan := make(chan string)

buildURL, err := url.Parse(backend.basePath + "/containers/" + id + "/logs")
if err != nil {
return nil, err
}

q := buildURL.Query()
q.Add("stdout", "true")
q.Add("stderr", "true")
q.Add("follow", "true")
buildURL.RawQuery = q.Encode()

req, err := http.NewRequestWithContext(ctx, "GET", buildURL.String(), nil)
if err != nil {
return nil, err
}

resp, err := backend.httpClient.Do(req)
if err != nil {
return nil, err
}

if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("%w: container logs endpoint returned HTTP %d", ErrPodman, resp.StatusCode)
}

pipeReader, pipeWriter := io.Pipe()

go func() {
_, _ = stdcopy.StdCopy(pipeWriter, pipeWriter, resp.Body)
_ = pipeWriter.Close()
_ = resp.Body.Close()
}()

go func() {
scanner := bufio.NewScanner(pipeReader)

for scanner.Scan() {
logChan <- scanner.Text()
}

close(logChan)
}()

return logChan, nil
}

func (backend *Podman) ContainerDelete(ctx context.Context, id string) error {
// nolint:bodyclose // already closed by Swagger-generated code
_, err := backend.cli.ContainersApi.LibpodRemoveContainer(ctx, id, &swagger.ContainersApiLibpodRemoveContainerOpts{
Expand Down
4 changes: 4 additions & 0 deletions internal/executor/instance/containerbackend/unimplemented.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ func (*Unimplemented) ContainerWait(ctx context.Context, id string) (<-chan Cont
return waitChan, errChan
}

func (*Unimplemented) ContainerLogs(ctx context.Context, id string) (<-chan string, error) {
return nil, ErrNotImplemented
}

func (*Unimplemented) ContainerDelete(ctx context.Context, id string) error { return ErrNotImplemented }

func (*Unimplemented) SystemInfo(ctx context.Context) (*SystemInfo, error) {
Expand Down
16 changes: 16 additions & 0 deletions internal/executor/instance/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,9 @@ func RunContainerizedAgent(ctx context.Context, config *runconfig.RunConfig, par
var additionalContainersWG sync.WaitGroup
additionalContainersCtx, additionalContainersCancel := context.WithCancel(context.Background())

var logReaderWg sync.WaitGroup
logReaderWg.Add(1)

// Schedule all containers for removal
defer func() {
// We need to remove additional containers first in order to avoid Podman's
Expand All @@ -259,6 +262,8 @@ func RunContainerizedAgent(ctx context.Context, config *runconfig.RunConfig, par
logger.Warnf("error while removing container: %v", err)
}
}

logReaderWg.Wait()
}()

// Start additional containers (if any)
Expand Down Expand Up @@ -287,6 +292,17 @@ func RunContainerizedAgent(ctx context.Context, config *runconfig.RunConfig, par
return err
}

logChan, err := backend.ContainerLogs(ctx, cont.ID)
if err != nil {
return err
}
go func() {
for logLine := range logChan {
logger.Debugf("container: %s", logLine)
}
logReaderWg.Done()
}()

logger.Debugf("waiting for container %s to finish", cont.ID)
waitChan, errChan := backend.ContainerWait(ctx, cont.ID)
select {
Expand Down
5 changes: 5 additions & 0 deletions internal/executor/testdata/container-logs/.cirrus.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
container:
image: debian:latest

task:
script: true

0 comments on commit 2f7aa4f

Please sign in to comment.