Skip to content

Commit

Permalink
fix(reaper): refactor to allow retries and fix races (testcontainers#…
Browse files Browse the repository at this point in the history
…2728)

Refactor how the reaper is created to allow for proper retries of
temporary errors such as container not found issues during startup or
shutdown races.

This eliminates the use of sync.Once which wasn't solving the problem at
hand and replaces it with a singleton spawner with locked access.

Wrap reaper errors so we can determine the cause of failures more
easily.

Fix race condition in port wait when loading from container by always
waiting for the port first.

Remove unnecessary use of buffering and invalid retry logic in reaper
connection handling which could never recover correctly from a partial
read.

Move the reaper creation just before connections are established in
compose to ensure its still running when the Connect calls are made.

Previously the reaper was requested in NewDockerComposeWith which
means it could have already shutdown before connections are made
during the later sections of Up if the startup took over 1 minute.

This was causing consistent failures for:
TestDockerComposeAPIWithWaitLogStrategy

Ensure that resource labels are correct so that resources aren't reaped
when the reaper is disabled by excluding session id when reaper is
disabled.

Error when creating a reaper when the config says it's disabled so that
we avoid hard to debug issues because a reaper is running when it
shouldn't be.

Set org.testcontainers.reap label for containers which should be reaped
by the reaper, to prevent containers which disable the reaper from being
incorrectly reaped.
  • Loading branch information
stevenh authored Oct 18, 2024
1 parent 56bb501 commit 5e988ff
Show file tree
Hide file tree
Showing 12 changed files with 857 additions and 707 deletions.
22 changes: 11 additions & 11 deletions container.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,17 @@ type DeprecatedContainer interface {

// Container allows getting info about and controlling a single container instance
type Container interface {
GetContainerID() string // get the container id from the provider
Endpoint(context.Context, string) (string, error) // get proto://ip:port string for the lowest exposed port
PortEndpoint(context.Context, nat.Port, string) (string, error) // get proto://ip:port string for the given exposed port
Host(context.Context) (string, error) // get host where the container port is exposed
Inspect(context.Context) (*types.ContainerJSON, error) // get container info
MappedPort(context.Context, nat.Port) (nat.Port, error) // get externally mapped port for a container port
Ports(context.Context) (nat.PortMap, error) // Deprecated: Use c.Inspect(ctx).NetworkSettings.Ports instead
SessionID() string // get session id
IsRunning() bool // IsRunning returns true if the container is running, false otherwise.
Start(context.Context) error // start the container
Stop(context.Context, *time.Duration) error // stop the container
GetContainerID() string // get the container id from the provider
Endpoint(context.Context, string) (string, error) // get proto://ip:port string for the lowest exposed port
PortEndpoint(ctx context.Context, port nat.Port, proto string) (string, error) // get proto://ip:port string for the given exposed port
Host(context.Context) (string, error) // get host where the container port is exposed
Inspect(context.Context) (*types.ContainerJSON, error) // get container info
MappedPort(context.Context, nat.Port) (nat.Port, error) // get externally mapped port for a container port
Ports(context.Context) (nat.PortMap, error) // Deprecated: Use c.Inspect(ctx).NetworkSettings.Ports instead
SessionID() string // get session id
IsRunning() bool // IsRunning returns true if the container is running, false otherwise.
Start(context.Context) error // start the container
Stop(context.Context, *time.Duration) error // stop the container

// Terminate stops and removes the container and its image if it was built and not flagged as kept.
Terminate(ctx context.Context) error
Expand Down
102 changes: 51 additions & 51 deletions docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (c *DockerContainer) Inspect(ctx context.Context) (*types.ContainerJSON, er
func (c *DockerContainer) MappedPort(ctx context.Context, port nat.Port) (nat.Port, error) {
inspect, err := c.Inspect(ctx)
if err != nil {
return "", err
return "", fmt.Errorf("inspect: %w", err)
}
if inspect.ContainerJSONBase.HostConfig.NetworkMode == "host" {
return port, nil
Expand All @@ -204,7 +204,7 @@ func (c *DockerContainer) MappedPort(ctx context.Context, port nat.Port) (nat.Po
return nat.NewPort(k.Proto(), p[0].HostPort)
}

return "", errors.New("port not found")
return "", errdefs.NotFound(fmt.Errorf("port %q not found", port))
}

// Deprecated: use c.Inspect(ctx).NetworkSettings.Ports instead.
Expand Down Expand Up @@ -980,9 +980,7 @@ func (p *DockerProvider) BuildImage(ctx context.Context, img ImageBuildInfo) (st
}

// CreateContainer fulfils a request for a container without starting it
func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerRequest) (Container, error) {
var err error

func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerRequest) (con Container, err error) { //nolint:nonamedreturns // Needed for error checking.
// defer the close of the Docker client connection the soonest
defer p.Close()

Expand Down Expand Up @@ -1027,22 +1025,23 @@ func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerReque
// the reaper does not need to start a reaper for itself
isReaperContainer := strings.HasSuffix(imageName, config.ReaperDefaultImage)
if !p.config.RyukDisabled && !isReaperContainer {
r, err := reuseOrCreateReaper(context.WithValue(ctx, core.DockerHostContextKey, p.host), core.SessionID(), p)
r, err := spawner.reaper(context.WithValue(ctx, core.DockerHostContextKey, p.host), core.SessionID(), p)
if err != nil {
return nil, fmt.Errorf("%w: creating reaper failed", err)
return nil, fmt.Errorf("reaper: %w", err)
}
termSignal, err = r.Connect()

termSignal, err := r.Connect()
if err != nil {
return nil, fmt.Errorf("%w: connecting to reaper failed", err)
return nil, fmt.Errorf("reaper connect: %w", err)
}
}

// Cleanup on error, otherwise set termSignal to nil before successful return.
defer func() {
if termSignal != nil {
termSignal <- true
}
}()
// Cleanup on error.
defer func() {
if err != nil {
termSignal <- true
}
}()
}

if err = req.Validate(); err != nil {
return nil, err
Expand Down Expand Up @@ -1108,10 +1107,9 @@ func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerReque
}

if !isReaperContainer {
// add the labels that the reaper will use to terminate the container to the request
for k, v := range core.DefaultLabels(core.SessionID()) {
req.Labels[k] = v
}
// Add the labels that identify this as a testcontainers container and
// allow the reaper to terminate it if requested.
AddGenericLabels(req.Labels)
}

dockerInput := &container.Config{
Expand Down Expand Up @@ -1205,9 +1203,6 @@ func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerReque
return nil, err
}

// Disable cleanup on success
termSignal = nil

return c, nil
}

Expand Down Expand Up @@ -1256,7 +1251,7 @@ func (p *DockerProvider) waitContainerCreation(ctx context.Context, name string)
)
}

func (p *DockerProvider) ReuseOrCreateContainer(ctx context.Context, req ContainerRequest) (Container, error) {
func (p *DockerProvider) ReuseOrCreateContainer(ctx context.Context, req ContainerRequest) (con Container, err error) { //nolint:nonamedreturns // Needed for error check.
c, err := p.findContainerByName(ctx, req.Name)
if err != nil {
return nil, err
Expand All @@ -1279,14 +1274,22 @@ func (p *DockerProvider) ReuseOrCreateContainer(ctx context.Context, req Contain

var termSignal chan bool
if !p.config.RyukDisabled {
r, err := reuseOrCreateReaper(context.WithValue(ctx, core.DockerHostContextKey, p.host), sessionID, p)
r, err := spawner.reaper(context.WithValue(ctx, core.DockerHostContextKey, p.host), sessionID, p)
if err != nil {
return nil, fmt.Errorf("reaper: %w", err)
}
termSignal, err = r.Connect()

termSignal, err := r.Connect()
if err != nil {
return nil, fmt.Errorf("%w: connecting to reaper failed", err)
return nil, fmt.Errorf("reaper connect: %w", err)
}

// Cleanup on error.
defer func() {
if err != nil {
termSignal <- true
}
}()
}

// default hooks include logger hook and pre-create hook
Expand Down Expand Up @@ -1454,9 +1457,7 @@ func daemonHost(ctx context.Context, p *DockerProvider) (string, error) {

// Deprecated: use network.New instead
// CreateNetwork returns the object representing a new network identified by its name
func (p *DockerProvider) CreateNetwork(ctx context.Context, req NetworkRequest) (Network, error) {
var err error

func (p *DockerProvider) CreateNetwork(ctx context.Context, req NetworkRequest) (net Network, err error) { //nolint:nonamedreturns // Needed for error check.
// defer the close of the Docker client connection the soonest
defer p.Close()

Expand Down Expand Up @@ -1485,31 +1486,30 @@ func (p *DockerProvider) CreateNetwork(ctx context.Context, req NetworkRequest)

var termSignal chan bool
if !p.config.RyukDisabled {
r, err := reuseOrCreateReaper(context.WithValue(ctx, core.DockerHostContextKey, p.host), sessionID, p)
r, err := spawner.reaper(context.WithValue(ctx, core.DockerHostContextKey, p.host), sessionID, p)
if err != nil {
return nil, fmt.Errorf("%w: creating network reaper failed", err)
return nil, fmt.Errorf("reaper: %w", err)
}
termSignal, err = r.Connect()

termSignal, err := r.Connect()
if err != nil {
return nil, fmt.Errorf("%w: connecting to network reaper failed", err)
return nil, fmt.Errorf("reaper connect: %w", err)
}
}

// add the labels that the reaper will use to terminate the network to the request
for k, v := range core.DefaultLabels(sessionID) {
req.Labels[k] = v
// Cleanup on error.
defer func() {
if err != nil {
termSignal <- true
}
}()
}

// Cleanup on error, otherwise set termSignal to nil before successful return.
defer func() {
if termSignal != nil {
termSignal <- true
}
}()
// add the labels that the reaper will use to terminate the network to the request
core.AddDefaultLabels(sessionID, req.Labels)

response, err := p.client.NetworkCreate(ctx, req.Name, nc)
if err != nil {
return &DockerNetwork{}, err
return &DockerNetwork{}, fmt.Errorf("create network: %w", err)
}

n := &DockerNetwork{
Expand All @@ -1520,9 +1520,6 @@ func (p *DockerProvider) CreateNetwork(ctx context.Context, req NetworkRequest)
provider: p,
}

// Disable cleanup on success
termSignal = nil

return n, nil
}

Expand Down Expand Up @@ -1592,9 +1589,12 @@ func (p *DockerProvider) getDefaultNetwork(ctx context.Context, cli client.APICl
_, err = cli.NetworkCreate(ctx, reaperNetwork, network.CreateOptions{
Driver: Bridge,
Attachable: true,
Labels: core.DefaultLabels(core.SessionID()),
Labels: GenericLabels(),
})
if err != nil {
// If the network already exists, we can ignore the error as that can
// happen if we are running multiple tests in parallel and we only
// need to ensure that the network exists.
if err != nil && !errdefs.IsConflict(err) {
return "", err
}
}
Expand Down Expand Up @@ -1632,7 +1632,7 @@ func containerFromDockerResponse(ctx context.Context, response types.Container)
// populate the raw representation of the container
jsonRaw, err := ctr.inspectRawContainer(ctx)
if err != nil {
return nil, err
return nil, fmt.Errorf("inspect raw container: %w", err)
}

// the health status of the container, if any
Expand Down
4 changes: 1 addition & 3 deletions docker_mounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,7 @@ func mapToDockerMounts(containerMounts ContainerMounts) []mount.Mount {
Labels: make(map[string]string),
}
}
for k, v := range GenericLabels() {
containerMount.VolumeOptions.Labels[k] = v
}
AddGenericLabels(containerMount.VolumeOptions.Labels)
}

mounts = append(mounts, containerMount)
Expand Down
12 changes: 11 additions & 1 deletion generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,17 @@ type GenericProvider interface {
ImageProvider
}

// GenericLabels returns a map of labels that can be used to identify containers created by this library
// GenericLabels returns a map of labels that can be used to identify resources
// created by this library. This includes the standard LabelSessionID if the
// reaper is enabled, otherwise this is excluded to prevent resources being
// incorrectly reaped.
func GenericLabels() map[string]string {
return core.DefaultLabels(core.SessionID())
}

// AddGenericLabels adds the generic labels to target.
func AddGenericLabels(target map[string]string) {
for k, v := range GenericLabels() {
target[k] = v
}
}
44 changes: 37 additions & 7 deletions internal/core/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,53 @@ import (
"strings"

"github.com/testcontainers/testcontainers-go/internal"
"github.com/testcontainers/testcontainers-go/internal/config"
)

const (
LabelBase = "org.testcontainers"
LabelLang = LabelBase + ".lang"
LabelReaper = LabelBase + ".reaper"
LabelRyuk = LabelBase + ".ryuk"
// LabelBase is the base label for all testcontainers labels.
LabelBase = "org.testcontainers"

// LabelLang specifies the language which created the test container.
LabelLang = LabelBase + ".lang"

// LabelReaper identifies the container as a reaper.
LabelReaper = LabelBase + ".reaper"

// LabelRyuk identifies the container as a ryuk.
LabelRyuk = LabelBase + ".ryuk"

// LabelSessionID specifies the session ID of the container.
LabelSessionID = LabelBase + ".sessionId"
LabelVersion = LabelBase + ".version"

// LabelVersion specifies the version of testcontainers which created the container.
LabelVersion = LabelBase + ".version"

// LabelReap specifies the container should be reaped by the reaper.
LabelReap = LabelBase + ".reap"
)

// DefaultLabels returns the standard set of labels which
// includes LabelSessionID if the reaper is enabled.
func DefaultLabels(sessionID string) map[string]string {
return map[string]string{
labels := map[string]string{
LabelBase: "true",
LabelLang: "go",
LabelSessionID: sessionID,
LabelVersion: internal.Version,
LabelSessionID: sessionID,
}

if !config.Read().RyukDisabled {
labels[LabelReap] = "true"
}

return labels
}

// AddDefaultLabels adds the default labels for sessionID to target.
func AddDefaultLabels(sessionID string, target map[string]string) {
for k, v := range DefaultLabels(sessionID) {
target[k] = v
}
}

Expand Down
2 changes: 1 addition & 1 deletion lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type ContainerRequestHook func(ctx context.Context, req ContainerRequest) error
// - Terminating
// - Terminated
// For that, it will receive a Container, modify it and return an error if needed.
type ContainerHook func(ctx context.Context, container Container) error
type ContainerHook func(ctx context.Context, ctr Container) error

// ContainerLifecycleHooks is a struct that contains all the hooks that can be used
// to modify the container lifecycle. All the container lifecycle hooks except the PreCreates hooks
Expand Down
18 changes: 0 additions & 18 deletions modules/compose/compose.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,23 +153,6 @@ func NewDockerComposeWith(opts ...ComposeStackOption) (*dockerCompose, error) {
return nil, fmt.Errorf("initialize docker client: %w", err)
}

reaperProvider, err := testcontainers.NewDockerProvider()
if err != nil {
return nil, fmt.Errorf("failed to create reaper provider for compose: %w", err)
}

var composeReaper *testcontainers.Reaper
if !reaperProvider.Config().Config.RyukDisabled {
// NewReaper is deprecated: we need to find a way to create the reaper for compose
// bypassing the deprecation.
r, err := testcontainers.NewReaper(context.Background(), testcontainers.SessionID(), reaperProvider, "")
if err != nil {
return nil, fmt.Errorf("failed to create reaper for compose: %w", err)
}

composeReaper = r
}

composeAPI := &dockerCompose{
name: composeOptions.Identifier,
configs: composeOptions.Paths,
Expand All @@ -182,7 +165,6 @@ func NewDockerComposeWith(opts ...ComposeStackOption) (*dockerCompose, error) {
containers: make(map[string]*testcontainers.DockerContainer),
networks: make(map[string]*testcontainers.DockerNetwork),
sessionID: testcontainers.SessionID(),
reaper: composeReaper,
}

return composeAPI, nil
Expand Down
Loading

0 comments on commit 5e988ff

Please sign in to comment.