Skip to content

Commit

Permalink
systemtest: fix elastic-agent log copying (#7163) (#7166)
Browse files Browse the repository at this point in the history
Start copying logs as soon as the container
has started, in case container startup fails;
otherwise the container logs are lost and we
cannot tell why the container failed to start.

(cherry picked from commit 9d67ac1)

Co-authored-by: Andrew Wilkins <[email protected]>
  • Loading branch information
mergify[bot] and axw authored Feb 2, 2022
1 parent 238e487 commit c336aca
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 27 deletions.
17 changes: 4 additions & 13 deletions systemtest/cmd/runapm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"strings"
"time"

"github.com/docker/docker/pkg/stdcopy"
"github.com/testcontainers/testcontainers-go/wait"
"gopkg.in/yaml.v3"

Expand Down Expand Up @@ -138,25 +137,17 @@ func Main() error {

agent.ExposedPorts = []string{"8200"}
agent.WaitingFor = wait.ForHTTP("/").WithPort("8200/tcp").WithStartupTimeout(5 * time.Minute)
agent.Stdout = os.Stdout
agent.Stderr = os.Stderr
if err := agent.Start(); err != nil {
return err
}

serverURL := &url.URL{Scheme: "http", Host: agent.Addrs["8200"]}
log.Printf("Elastic Agent container started")
log.Printf(" - APM Server listening on %s", serverURL)

// Send elastic-agent container logs to stdout/stderr.
logs, err := agent.Logs(context.Background())
if err != nil {
return err
}
defer logs.Close()
if _, err := stdcopy.StdCopy(os.Stdout, os.Stderr, logs); err != nil {
return err
}

return nil
_, err = agent.Wait(context.Background())
return err
}

func main() {
Expand Down
81 changes: 73 additions & 8 deletions systemtest/containers.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ func NewUnstartedElasticAgentContainer() (*ElasticAgentContainer, error) {
}
return &ElasticAgentContainer{
request: req,
exited: make(chan struct{}),
Reap: true,
StackVersion: agentImageVersion,
}, nil
Expand All @@ -366,6 +367,7 @@ func NewUnstartedElasticAgentContainer() (*ElasticAgentContainer, error) {
type ElasticAgentContainer struct {
container testcontainers.Container
request testcontainers.ContainerRequest
exited chan struct{}

// Reap entrols whether the container will be automatically reaped if
// the controlling process exits. This is true by default, and may be
Expand All @@ -391,6 +393,14 @@ type ElasticAgentContainer struct {
// use for enrolling the agent with Fleet. The agent will only enroll
// if this is specified.
FleetEnrollmentToken string

// Stdout, if non-nil, holds a writer to which the container's stdout
// will be written.
Stdout io.Writer

// Stderr, if non-nil, holds a writer to which the container's stderr
// will be written.
Stderr io.Writer
}

// Start starts the container.
Expand Down Expand Up @@ -426,9 +436,29 @@ func (c *ElasticAgentContainer) Start() error {
}
c.container = container

// Start a goroutine to read logs, and signal when the container process has exited.
if c.Stdout != nil || c.Stderr != nil {
go func() {
defer close(c.exited)
defer cancel()
stdout, stderr := c.Stdout, c.Stderr
if stdout == nil {
stdout = io.Discard
}
if stderr == nil {
stderr = io.Discard
}
_ = c.copyLogs(stdout, stderr)
}()
}

if err := container.Start(ctx); err != nil {
return err
if err != context.Canceled {
return fmt.Errorf("failed to start container: %w", err)
}
return errors.New("failed to start container")
}

if len(c.request.ExposedPorts) > 0 {
hostIP, err := container.Host(ctx)
if err != nil {
Expand All @@ -448,6 +478,41 @@ func (c *ElasticAgentContainer) Start() error {
return nil
}

func (c *ElasticAgentContainer) copyLogs(stdout, stderr io.Writer) error {
// Wait for the container to be running (or have gone past that),
// or ContainerLogs will return immediately.
ctx := context.Background()
for {
state, err := c.container.State(ctx)
if err != nil {
return err
}
if state.Status != "created" {
break
}
}

docker, err := client.NewClientWithOpts(client.FromEnv)
if err != nil {
return err
}
defer docker.Close()

options := types.ContainerLogsOptions{
ShowStdout: stdout != nil,
ShowStderr: stderr != nil,
Follow: true,
}
rc, err := docker.ContainerLogs(ctx, c.container.GetContainerID(), options)
if err != nil {
return err
}
defer rc.Close()

_, err = stdcopy.StdCopy(stdout, stderr, rc)
return err
}

// Close terminates and removes the container.
func (c *ElasticAgentContainer) Close() error {
if c.container == nil {
Expand All @@ -456,14 +521,14 @@ func (c *ElasticAgentContainer) Close() error {
return c.container.Terminate(context.Background())
}

// Logs returns an io.ReadCloser that can be used for reading the
// container's combined stdout/stderr log. If the container has not
// been created by Start(), Logs will return an error.
func (c *ElasticAgentContainer) Logs(ctx context.Context) (io.ReadCloser, error) {
if c.container == nil {
return nil, errors.New("container not created")
// Wait waits for the container process to exit, and returns its state.
func (c *ElasticAgentContainer) Wait(ctx context.Context) (*types.ContainerState, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-c.exited:
return c.container.State(ctx)
}
return c.container.Logs(ctx)
}

// Exec executes a command in the container, and returns its stdout and stderr.
Expand Down
11 changes: 5 additions & 6 deletions systemtest/fleet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package systemtest_test

import (
"bytes"
"context"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -135,21 +136,19 @@ func newAPMIntegration(t testing.TB, vars map[string]interface{}) apmIntegration
_, enrollmentAPIKey := systemtest.CreateAgentPolicy(t, policyName, "default", vars)

// Enroll an elastic-agent to run the APM integration.
var output bytes.Buffer
agent, err := systemtest.NewUnstartedElasticAgentContainer()
require.NoError(t, err)
agent.Stdout = &output
agent.Stderr = &output
agent.FleetEnrollmentToken = enrollmentAPIKey.APIKey
t.Cleanup(func() { agent.Close() })
t.Cleanup(func() {
// Log the elastic-agent container output if the test fails.
if !t.Failed() {
return
}
if logs, err := agent.Logs(context.Background()); err == nil {
defer logs.Close()
if out, err := ioutil.ReadAll(logs); err == nil {
t.Logf("elastic-agent logs: %s", out)
}
}
t.Logf("elastic-agent logs: %s", output.String())
})

// Start elastic-agent with port 8200 exposed, and wait for the server to service
Expand Down

0 comments on commit c336aca

Please sign in to comment.