diff --git a/container.go b/container.go index 1e95fb09d4..d114a5988a 100644 --- a/container.go +++ b/container.go @@ -37,17 +37,17 @@ type DeprecatedContainer interface { // Container allows getting info about and controlling a single container instance type Container interface { - GetContainerID() string // get the container id from the provider - Endpoint(context.Context, string) (string, error) // get proto://ip:port string for the lowest exposed port - PortEndpoint(context.Context, nat.Port, string) (string, error) // get proto://ip:port string for the given exposed port - Host(context.Context) (string, error) // get host where the container port is exposed - Inspect(context.Context) (*types.ContainerJSON, error) // get container info - MappedPort(context.Context, nat.Port) (nat.Port, error) // get externally mapped port for a container port - Ports(context.Context) (nat.PortMap, error) // Deprecated: Use c.Inspect(ctx).NetworkSettings.Ports instead - SessionID() string // get session id - IsRunning() bool // IsRunning returns true if the container is running, false otherwise. - Start(context.Context) error // start the container - Stop(context.Context, *time.Duration) error // stop the container + GetContainerID() string // get the container id from the provider + Endpoint(context.Context, string) (string, error) // get proto://ip:port string for the lowest exposed port + PortEndpoint(ctx context.Context, port nat.Port, proto string) (string, error) // get proto://ip:port string for the given exposed port + Host(context.Context) (string, error) // get host where the container port is exposed + Inspect(context.Context) (*types.ContainerJSON, error) // get container info + MappedPort(context.Context, nat.Port) (nat.Port, error) // get externally mapped port for a container port + Ports(context.Context) (nat.PortMap, error) // Deprecated: Use c.Inspect(ctx).NetworkSettings.Ports instead + SessionID() string // get session id + IsRunning() bool // IsRunning returns true if the container is running, false otherwise. + Start(context.Context) error // start the container + Stop(context.Context, *time.Duration) error // stop the container // Terminate stops and removes the container and its image if it was built and not flagged as kept. Terminate(ctx context.Context) error diff --git a/docker.go b/docker.go index 9319c630dd..2ef8c6973a 100644 --- a/docker.go +++ b/docker.go @@ -183,7 +183,7 @@ func (c *DockerContainer) Inspect(ctx context.Context) (*types.ContainerJSON, er func (c *DockerContainer) MappedPort(ctx context.Context, port nat.Port) (nat.Port, error) { inspect, err := c.Inspect(ctx) if err != nil { - return "", err + return "", fmt.Errorf("inspect: %w", err) } if inspect.ContainerJSONBase.HostConfig.NetworkMode == "host" { return port, nil @@ -204,7 +204,7 @@ func (c *DockerContainer) MappedPort(ctx context.Context, port nat.Port) (nat.Po return nat.NewPort(k.Proto(), p[0].HostPort) } - return "", errors.New("port not found") + return "", errdefs.NotFound(fmt.Errorf("port %q not found", port)) } // Deprecated: use c.Inspect(ctx).NetworkSettings.Ports instead. @@ -980,9 +980,7 @@ func (p *DockerProvider) BuildImage(ctx context.Context, img ImageBuildInfo) (st } // CreateContainer fulfils a request for a container without starting it -func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerRequest) (Container, error) { - var err error - +func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerRequest) (con Container, err error) { //nolint:nonamedreturns // Needed for error checking. // defer the close of the Docker client connection the soonest defer p.Close() @@ -1027,22 +1025,23 @@ func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerReque // the reaper does not need to start a reaper for itself isReaperContainer := strings.HasSuffix(imageName, config.ReaperDefaultImage) if !p.config.RyukDisabled && !isReaperContainer { - r, err := reuseOrCreateReaper(context.WithValue(ctx, core.DockerHostContextKey, p.host), core.SessionID(), p) + r, err := spawner.reaper(context.WithValue(ctx, core.DockerHostContextKey, p.host), core.SessionID(), p) if err != nil { - return nil, fmt.Errorf("%w: creating reaper failed", err) + return nil, fmt.Errorf("reaper: %w", err) } - termSignal, err = r.Connect() + + termSignal, err := r.Connect() if err != nil { - return nil, fmt.Errorf("%w: connecting to reaper failed", err) + return nil, fmt.Errorf("reaper connect: %w", err) } - } - // Cleanup on error, otherwise set termSignal to nil before successful return. - defer func() { - if termSignal != nil { - termSignal <- true - } - }() + // Cleanup on error. + defer func() { + if err != nil { + termSignal <- true + } + }() + } if err = req.Validate(); err != nil { return nil, err @@ -1108,10 +1107,9 @@ func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerReque } if !isReaperContainer { - // add the labels that the reaper will use to terminate the container to the request - for k, v := range core.DefaultLabels(core.SessionID()) { - req.Labels[k] = v - } + // Add the labels that identify this as a testcontainers container and + // allow the reaper to terminate it if requested. + AddGenericLabels(req.Labels) } dockerInput := &container.Config{ @@ -1205,9 +1203,6 @@ func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerReque return nil, err } - // Disable cleanup on success - termSignal = nil - return c, nil } @@ -1256,7 +1251,7 @@ func (p *DockerProvider) waitContainerCreation(ctx context.Context, name string) ) } -func (p *DockerProvider) ReuseOrCreateContainer(ctx context.Context, req ContainerRequest) (Container, error) { +func (p *DockerProvider) ReuseOrCreateContainer(ctx context.Context, req ContainerRequest) (con Container, err error) { //nolint:nonamedreturns // Needed for error check. c, err := p.findContainerByName(ctx, req.Name) if err != nil { return nil, err @@ -1279,14 +1274,22 @@ func (p *DockerProvider) ReuseOrCreateContainer(ctx context.Context, req Contain var termSignal chan bool if !p.config.RyukDisabled { - r, err := reuseOrCreateReaper(context.WithValue(ctx, core.DockerHostContextKey, p.host), sessionID, p) + r, err := spawner.reaper(context.WithValue(ctx, core.DockerHostContextKey, p.host), sessionID, p) if err != nil { return nil, fmt.Errorf("reaper: %w", err) } - termSignal, err = r.Connect() + + termSignal, err := r.Connect() if err != nil { - return nil, fmt.Errorf("%w: connecting to reaper failed", err) + return nil, fmt.Errorf("reaper connect: %w", err) } + + // Cleanup on error. + defer func() { + if err != nil { + termSignal <- true + } + }() } // default hooks include logger hook and pre-create hook @@ -1454,9 +1457,7 @@ func daemonHost(ctx context.Context, p *DockerProvider) (string, error) { // Deprecated: use network.New instead // CreateNetwork returns the object representing a new network identified by its name -func (p *DockerProvider) CreateNetwork(ctx context.Context, req NetworkRequest) (Network, error) { - var err error - +func (p *DockerProvider) CreateNetwork(ctx context.Context, req NetworkRequest) (net Network, err error) { //nolint:nonamedreturns // Needed for error check. // defer the close of the Docker client connection the soonest defer p.Close() @@ -1485,31 +1486,30 @@ func (p *DockerProvider) CreateNetwork(ctx context.Context, req NetworkRequest) var termSignal chan bool if !p.config.RyukDisabled { - r, err := reuseOrCreateReaper(context.WithValue(ctx, core.DockerHostContextKey, p.host), sessionID, p) + r, err := spawner.reaper(context.WithValue(ctx, core.DockerHostContextKey, p.host), sessionID, p) if err != nil { - return nil, fmt.Errorf("%w: creating network reaper failed", err) + return nil, fmt.Errorf("reaper: %w", err) } - termSignal, err = r.Connect() + + termSignal, err := r.Connect() if err != nil { - return nil, fmt.Errorf("%w: connecting to network reaper failed", err) + return nil, fmt.Errorf("reaper connect: %w", err) } - } - // add the labels that the reaper will use to terminate the network to the request - for k, v := range core.DefaultLabels(sessionID) { - req.Labels[k] = v + // Cleanup on error. + defer func() { + if err != nil { + termSignal <- true + } + }() } - // Cleanup on error, otherwise set termSignal to nil before successful return. - defer func() { - if termSignal != nil { - termSignal <- true - } - }() + // add the labels that the reaper will use to terminate the network to the request + core.AddDefaultLabels(sessionID, req.Labels) response, err := p.client.NetworkCreate(ctx, req.Name, nc) if err != nil { - return &DockerNetwork{}, err + return &DockerNetwork{}, fmt.Errorf("create network: %w", err) } n := &DockerNetwork{ @@ -1520,9 +1520,6 @@ func (p *DockerProvider) CreateNetwork(ctx context.Context, req NetworkRequest) provider: p, } - // Disable cleanup on success - termSignal = nil - return n, nil } @@ -1592,9 +1589,12 @@ func (p *DockerProvider) getDefaultNetwork(ctx context.Context, cli client.APICl _, err = cli.NetworkCreate(ctx, reaperNetwork, network.CreateOptions{ Driver: Bridge, Attachable: true, - Labels: core.DefaultLabels(core.SessionID()), + Labels: GenericLabels(), }) - if err != nil { + // If the network already exists, we can ignore the error as that can + // happen if we are running multiple tests in parallel and we only + // need to ensure that the network exists. + if err != nil && !errdefs.IsConflict(err) { return "", err } } @@ -1632,7 +1632,7 @@ func containerFromDockerResponse(ctx context.Context, response types.Container) // populate the raw representation of the container jsonRaw, err := ctr.inspectRawContainer(ctx) if err != nil { - return nil, err + return nil, fmt.Errorf("inspect raw container: %w", err) } // the health status of the container, if any diff --git a/docker_mounts.go b/docker_mounts.go index aed3010361..d8af3fae3e 100644 --- a/docker_mounts.go +++ b/docker_mounts.go @@ -126,9 +126,7 @@ func mapToDockerMounts(containerMounts ContainerMounts) []mount.Mount { Labels: make(map[string]string), } } - for k, v := range GenericLabels() { - containerMount.VolumeOptions.Labels[k] = v - } + AddGenericLabels(containerMount.VolumeOptions.Labels) } mounts = append(mounts, containerMount) diff --git a/generic.go b/generic.go index 9052287b51..fd13a607de 100644 --- a/generic.go +++ b/generic.go @@ -101,7 +101,17 @@ type GenericProvider interface { ImageProvider } -// GenericLabels returns a map of labels that can be used to identify containers created by this library +// GenericLabels returns a map of labels that can be used to identify resources +// created by this library. This includes the standard LabelSessionID if the +// reaper is enabled, otherwise this is excluded to prevent resources being +// incorrectly reaped. func GenericLabels() map[string]string { return core.DefaultLabels(core.SessionID()) } + +// AddGenericLabels adds the generic labels to target. +func AddGenericLabels(target map[string]string) { + for k, v := range GenericLabels() { + target[k] = v + } +} diff --git a/internal/core/labels.go b/internal/core/labels.go index b5da2fb29d..0814924234 100644 --- a/internal/core/labels.go +++ b/internal/core/labels.go @@ -6,23 +6,53 @@ import ( "strings" "github.com/testcontainers/testcontainers-go/internal" + "github.com/testcontainers/testcontainers-go/internal/config" ) const ( - LabelBase = "org.testcontainers" - LabelLang = LabelBase + ".lang" - LabelReaper = LabelBase + ".reaper" - LabelRyuk = LabelBase + ".ryuk" + // LabelBase is the base label for all testcontainers labels. + LabelBase = "org.testcontainers" + + // LabelLang specifies the language which created the test container. + LabelLang = LabelBase + ".lang" + + // LabelReaper identifies the container as a reaper. + LabelReaper = LabelBase + ".reaper" + + // LabelRyuk identifies the container as a ryuk. + LabelRyuk = LabelBase + ".ryuk" + + // LabelSessionID specifies the session ID of the container. LabelSessionID = LabelBase + ".sessionId" - LabelVersion = LabelBase + ".version" + + // LabelVersion specifies the version of testcontainers which created the container. + LabelVersion = LabelBase + ".version" + + // LabelReap specifies the container should be reaped by the reaper. + LabelReap = LabelBase + ".reap" ) +// DefaultLabels returns the standard set of labels which +// includes LabelSessionID if the reaper is enabled. func DefaultLabels(sessionID string) map[string]string { - return map[string]string{ + labels := map[string]string{ LabelBase: "true", LabelLang: "go", - LabelSessionID: sessionID, LabelVersion: internal.Version, + LabelSessionID: sessionID, + } + + if !config.Read().RyukDisabled { + labels[LabelReap] = "true" + } + + return labels +} + +// AddDefaultLabels adds the default labels for sessionID to target. +func AddDefaultLabels(sessionID string, target map[string]string) { + for k, v := range DefaultLabels(sessionID) { + target[k] = v } } diff --git a/lifecycle.go b/lifecycle.go index ff1472d043..57833dafc1 100644 --- a/lifecycle.go +++ b/lifecycle.go @@ -33,7 +33,7 @@ type ContainerRequestHook func(ctx context.Context, req ContainerRequest) error // - Terminating // - Terminated // For that, it will receive a Container, modify it and return an error if needed. -type ContainerHook func(ctx context.Context, container Container) error +type ContainerHook func(ctx context.Context, ctr Container) error // ContainerLifecycleHooks is a struct that contains all the hooks that can be used // to modify the container lifecycle. All the container lifecycle hooks except the PreCreates hooks diff --git a/modules/compose/compose.go b/modules/compose/compose.go index c63eb73bb1..fa02cde077 100644 --- a/modules/compose/compose.go +++ b/modules/compose/compose.go @@ -153,23 +153,6 @@ func NewDockerComposeWith(opts ...ComposeStackOption) (*dockerCompose, error) { return nil, fmt.Errorf("initialize docker client: %w", err) } - reaperProvider, err := testcontainers.NewDockerProvider() - if err != nil { - return nil, fmt.Errorf("failed to create reaper provider for compose: %w", err) - } - - var composeReaper *testcontainers.Reaper - if !reaperProvider.Config().Config.RyukDisabled { - // NewReaper is deprecated: we need to find a way to create the reaper for compose - // bypassing the deprecation. - r, err := testcontainers.NewReaper(context.Background(), testcontainers.SessionID(), reaperProvider, "") - if err != nil { - return nil, fmt.Errorf("failed to create reaper for compose: %w", err) - } - - composeReaper = r - } - composeAPI := &dockerCompose{ name: composeOptions.Identifier, configs: composeOptions.Paths, @@ -182,7 +165,6 @@ func NewDockerComposeWith(opts ...ComposeStackOption) (*dockerCompose, error) { containers: make(map[string]*testcontainers.DockerContainer), networks: make(map[string]*testcontainers.DockerNetwork), sessionID: testcontainers.SessionID(), - reaper: composeReaper, } return composeAPI, nil diff --git a/modules/compose/compose_api.go b/modules/compose/compose_api.go index 9f21d09e87..2c852bdaff 100644 --- a/modules/compose/compose_api.go +++ b/modules/compose/compose_api.go @@ -2,6 +2,7 @@ package compose import ( "context" + "errors" "fmt" "io" "os" @@ -228,9 +229,6 @@ type dockerCompose struct { // sessionID is used to identify the reaper session sessionID string - - // reaper is used to clean up containers after the stack is stopped - reaper *testcontainers.Reaper } func (d *dockerCompose) ServiceContainer(ctx context.Context, svcName string) (*testcontainers.DockerContainer, error) { @@ -269,12 +267,10 @@ func (d *dockerCompose) Down(ctx context.Context, opts ...StackDownOption) error return d.composeService.Down(ctx, d.name, options.DownOptions) } -func (d *dockerCompose) Up(ctx context.Context, opts ...StackUpOption) error { +func (d *dockerCompose) Up(ctx context.Context, opts ...StackUpOption) (err error) { d.lock.Lock() defer d.lock.Unlock() - var err error - d.project, err = d.compileProject(ctx) if err != nil { return err @@ -329,27 +325,61 @@ func (d *dockerCompose) Up(ctx context.Context, opts ...StackUpOption) error { return err } - if d.reaper != nil { + provider, err := testcontainers.NewDockerProvider(testcontainers.WithLogger(d.logger)) + if err != nil { + return fmt.Errorf("new docker provider: %w", err) + } + + var termSignals []chan bool + var reaper *testcontainers.Reaper + if !provider.Config().Config.RyukDisabled { + // NewReaper is deprecated: we need to find a way to create the reaper for compose + // bypassing the deprecation. + reaper, err = testcontainers.NewReaper(ctx, testcontainers.SessionID(), provider, "") + if err != nil { + return fmt.Errorf("create reaper: %w", err) + } + + // Cleanup on error, otherwise set termSignal to nil before successful return. + defer func() { + if len(termSignals) == 0 { + // Need to call Connect at least once to ensure the initial + // connection is cleaned up. + termSignal, errc := reaper.Connect() + if errc != nil { + err = errors.Join(err, fmt.Errorf("reaper connect: %w", errc)) + } else { + termSignal <- true + } + } + + if err == nil { + // No need to cleanup. + return + } + + for _, ts := range termSignals { + ts <- true + } + }() + + // Connect to the reaper and set the termination signal for each network. for _, n := range d.networks { - termSignal, err := d.reaper.Connect() + termSignal, err := reaper.Connect() if err != nil { - return fmt.Errorf("failed to connect to reaper: %w", err) + return fmt.Errorf("reaper connect: %w", err) } - n.SetTerminationSignal(termSignal) - // Cleanup on error, otherwise set termSignal to nil before successful return. - defer func() { - if termSignal != nil { - termSignal <- true - } - }() + n.SetTerminationSignal(termSignal) + termSignals = append(termSignals, termSignal) } } errGrpContainers, errGrpCtx := errgroup.WithContext(ctx) + // Lookup the containers for each service and connect them + // to the reaper if needed. for _, srv := range d.project.Services { - // we are going to connect each container to the reaper srv := srv errGrpContainers.Go(func() error { dc, err := d.lookupContainer(errGrpCtx, srv.Name) @@ -357,19 +387,14 @@ func (d *dockerCompose) Up(ctx context.Context, opts ...StackUpOption) error { return err } - if d.reaper != nil { - termSignal, err := d.reaper.Connect() + if reaper != nil { + termSignal, err := reaper.Connect() if err != nil { - return fmt.Errorf("failed to connect to reaper: %w", err) + return fmt.Errorf("reaper connect: %w", err) } - dc.SetTerminationSignal(termSignal) - // Cleanup on error, otherwise set termSignal to nil before successful return. - defer func() { - if termSignal != nil { - termSignal <- true - } - }() + dc.SetTerminationSignal(termSignal) + termSignals = append(termSignals, termSignal) } return nil @@ -401,7 +426,11 @@ func (d *dockerCompose) Up(ctx context.Context, opts ...StackUpOption) error { }) } - return errGrpWait.Wait() + if err := errGrpWait.Wait(); err != nil { + return fmt.Errorf("wait for services: %w", err) + } + + return nil } func (d *dockerCompose) WaitForService(s string, strategy wait.Strategy) ComposeStack { @@ -486,6 +515,9 @@ func (d *dockerCompose) lookupContainer(ctx context.Context, svcName string) (*t return ctr, nil } +// lookupNetworks is used to retrieve the networks that are part of the compose stack. +// +// Safe for concurrent calls. func (d *dockerCompose) lookupNetworks(ctx context.Context) error { networks, err := d.dockerClient.NetworkList(ctx, dockernetwork.ListOptions{ Filters: filters.NewArgs( @@ -543,9 +575,7 @@ func (d *dockerCompose) compileProject(ctx context.Context) (*types.Project, err api.OneoffLabel: "False", // default, will be overridden by `run` command } - for k, label := range testcontainers.GenericLabels() { - s.CustomLabels[k] = label - } + testcontainers.AddGenericLabels(s.CustomLabels) for i, envFile := range compiledOptions.EnvFiles { // add a label for each env file, indexed by its position @@ -562,9 +592,7 @@ func (d *dockerCompose) compileProject(ctx context.Context) (*types.Project, err api.VersionLabel: api.ComposeVersion, } - for k, label := range testcontainers.GenericLabels() { - n.Labels[k] = label - } + testcontainers.AddGenericLabels(n.Labels) proj.Networks[key] = n } diff --git a/modules/compose/compose_api_test.go b/modules/compose/compose_api_test.go index 9a0a4841b1..e5f30a5257 100644 --- a/modules/compose/compose_api_test.go +++ b/modules/compose/compose_api_test.go @@ -48,8 +48,7 @@ func TestDockerComposeAPIStrategyForInvalidService(t *testing.T) { WaitForService("non-existent-srv-1", wait.NewLogStrategy("started").WithStartupTimeout(10*time.Second).WithOccurrence(1)). Up(ctx, Wait(true)) cleanup(t, compose) - require.Error(t, err, "Expected error to be thrown because service with wait strategy is not running") - require.Equal(t, "no container found for service name non-existent-srv-1", err.Error()) + require.EqualError(t, err, "wait for services: no container found for service name non-existent-srv-1") serviceNames := compose.Services() diff --git a/reaper.go b/reaper.go index c41520b5b7..8f2bde8ab6 100644 --- a/reaper.go +++ b/reaper.go @@ -1,13 +1,16 @@ package testcontainers import ( - "bufio" + "bytes" "context" + "errors" "fmt" - "math/rand" + "io" "net" + "os" "strings" "sync" + "syscall" "time" "github.com/cenkalti/backoff/v4" @@ -34,9 +37,23 @@ const ( var ( // Deprecated: it has been replaced by an internal value ReaperDefaultImage = config.ReaperDefaultImage - reaperInstance *Reaper // We would like to create reaper only once - reaperMutex sync.Mutex - reaperOnce sync.Once + + // defaultReaperPort is the default port that the reaper listens on if not + // overridden by the RYUK_PORT environment variable. + defaultReaperPort = nat.Port("8080/tcp") + + // errReaperNotFound is returned when no reaper container is found. + errReaperNotFound = errors.New("reaper not found") + + // errReaperDisabled is returned if a reaper is requested but the + // config has it disabled. + errReaperDisabled = errors.New("reaper disabled") + + // spawner is the singleton instance of reaperSpawner. + spawner = &reaperSpawner{} + + // reaperAck is the expected response from the reaper container. + reaperAck = []byte("ACK\n") ) // ReaperProvider represents a provider for the reaper to run itself with @@ -47,10 +64,18 @@ type ReaperProvider interface { } // NewReaper creates a Reaper with a sessionID to identify containers and a provider to use -// Deprecated: it's not possible to create a reaper anymore. Compose module uses this method +// Deprecated: it's not possible to create a reaper any more. Compose module uses this method // to create a reaper for the compose stack. +// +// The caller must call Connect at least once on the returned Reaper and use the returned +// result otherwise the reaper will be kept open until the process exits. func NewReaper(ctx context.Context, sessionID string, provider ReaperProvider, reaperImageName string) (*Reaper, error) { - return reuseOrCreateReaper(ctx, sessionID, provider) + reaper, err := spawner.reaper(ctx, sessionID, provider) + if err != nil { + return nil, fmt.Errorf("reaper: %w", err) + } + + return reaper, nil } // reaperContainerNameFromSessionID returns the container name that uniquely @@ -61,31 +86,80 @@ func reaperContainerNameFromSessionID(sessionID string) string { return fmt.Sprintf("reaper_%s", sessionID) } -// lookUpReaperContainer returns a DockerContainer type with the reaper container in the case -// it's found in the running state, and including the labels for sessionID, reaper, and ryuk. -// It will perform a retry with exponential backoff to allow for the container to be started and -// avoid potential false negatives. -func lookUpReaperContainer(ctx context.Context, sessionID string) (*DockerContainer, error) { - dockerClient, err := NewDockerClientWithOpts(ctx) - if err != nil { - return nil, err +// reaperSpawner is a singleton that manages the reaper container. +type reaperSpawner struct { + instance *Reaper + mtx sync.Mutex +} + +// port returns the port that a new reaper should listens on. +func (r *reaperSpawner) port() nat.Port { + if port := os.Getenv("RYUK_PORT"); port != "" { + natPort, err := nat.NewPort("tcp", port) + if err != nil { + panic(fmt.Sprintf("invalid RYUK_PORT value %q: %s", port, err)) + } + return natPort } - defer dockerClient.Close() - // the backoff will take at most 5 seconds to find the reaper container - // doing each attempt every 100ms - exp := backoff.NewExponentialBackOff() + return defaultReaperPort +} - // we want random intervals between 100ms and 500ms for concurrent executions +// backoff returns a backoff policy for the reaper spawner. +// It will take at most 20 seconds, doing each attempt every 100ms - 250ms. +func (r *reaperSpawner) backoff() *backoff.ExponentialBackOff { + // We want random intervals between 100ms and 250ms for concurrent executions // to not be synchronized: it could be the case that multiple executions of this // function happen at the same time (specifically when called from a different test // process execution), and we want to avoid that they all try to find the reaper // container at the same time. - exp.InitialInterval = time.Duration(rand.Intn(5)*100) * time.Millisecond - exp.RandomizationFactor = rand.Float64() * 0.5 - exp.Multiplier = rand.Float64() * 2.0 - exp.MaxInterval = 5.0 * time.Second // max interval between attempts - exp.MaxElapsedTime = 1 * time.Minute // max time to keep trying + b := &backoff.ExponentialBackOff{ + InitialInterval: time.Millisecond * 100, + RandomizationFactor: backoff.DefaultRandomizationFactor, + Multiplier: backoff.DefaultMultiplier, + // Adjust MaxInterval to compensate for randomization factor which can be added to + // returned interval so we have a maximum of 250ms. + MaxInterval: time.Duration(float64(time.Millisecond*250) * backoff.DefaultRandomizationFactor), + MaxElapsedTime: time.Second * 20, + Stop: backoff.Stop, + Clock: backoff.SystemClock, + } + b.Reset() + + return b +} + +// cleanup terminates the reaper container if set. +func (r *reaperSpawner) cleanup() error { + r.mtx.Lock() + defer r.mtx.Unlock() + + return r.cleanupLocked() +} + +// cleanupLocked terminates the reaper container if set. +// It must be called with the lock held. +func (r *reaperSpawner) cleanupLocked() error { + if r.instance == nil { + return nil + } + + err := TerminateContainer(r.instance.container) + r.instance = nil + + return err +} + +// lookupContainer returns a DockerContainer type with the reaper container in the case +// it's found in the running state, and including the labels for sessionID, reaper, and ryuk. +// It will perform a retry with exponential backoff to allow for the container to be started and +// avoid potential false negatives. +func (r *reaperSpawner) lookupContainer(ctx context.Context, sessionID string) (*DockerContainer, error) { + dockerClient, err := NewDockerClientWithOpts(ctx) + if err != nil { + return nil, fmt.Errorf("new client: %w", err) + } + defer dockerClient.Close() opts := container.ListOptions{ All: true, @@ -97,159 +171,212 @@ func lookUpReaperContainer(ctx context.Context, sessionID string) (*DockerContai ), } - return backoff.RetryNotifyWithData( + return backoff.RetryWithData( func() (*DockerContainer, error) { resp, err := dockerClient.ContainerList(ctx, opts) if err != nil { - return nil, err + return nil, fmt.Errorf("container list: %w", err) } if len(resp) == 0 { - // reaper container not found in the running state: do not look for it again - return nil, nil + // No reaper container not found. + return nil, backoff.Permanent(errReaperNotFound) } if len(resp) > 1 { - return nil, fmt.Errorf("not possible to have multiple reaper containers found for session ID %s", sessionID) + return nil, fmt.Errorf("multiple reaper containers found for session ID %s", sessionID) } - r, err := containerFromDockerResponse(ctx, resp[0]) + container := resp[0] + r, err := containerFromDockerResponse(ctx, container) if err != nil { - return nil, err + return nil, fmt.Errorf("from docker: %w", err) } - if r.healthStatus == types.Healthy || r.healthStatus == types.NoHealthcheck { + switch { + case r.healthStatus == types.Healthy, + r.healthStatus == types.NoHealthcheck: return r, nil - } - - // if a health status is present on the container, and the container is healthy, error - if r.healthStatus != "" { - return nil, fmt.Errorf("container %s is not healthy, wanted status=%s, got status=%s", resp[0].ID[:8], types.Healthy, r.healthStatus) + case r.healthStatus != "": + return nil, fmt.Errorf("container not healthy: %s", r.healthStatus) } return r, nil }, - backoff.WithContext(exp, ctx), - func(err error, duration time.Duration) { - Logger.Printf("Error looking up reaper container, will retry: %v", err) - }, + backoff.WithContext(r.backoff(), ctx), ) } -// reuseOrCreateReaper returns an existing Reaper instance if it exists and is running. Otherwise, a new Reaper instance -// will be created with a sessionID to identify containers in the same test session/program. -func reuseOrCreateReaper(ctx context.Context, sessionID string, provider ReaperProvider) (*Reaper, error) { - reaperMutex.Lock() - defer reaperMutex.Unlock() - - // 1. if the reaper instance has been already created, return it - if reaperInstance != nil { - // Verify this instance is still running by checking state. - // Can't use Container.IsRunning because the bool is not updated when Reaper is terminated - state, err := reaperInstance.container.State(ctx) - if err != nil { - if !errdefs.IsNotFound(err) { - return nil, err +// isRunning returns an error if the container is not running. +func (r *reaperSpawner) isRunning(ctx context.Context, ctr Container) error { + state, err := ctr.State(ctx) + if err != nil { + return fmt.Errorf("container state: %w", err) + } + + if !state.Running { + // Use NotFound error to indicate the container is not running + // and should be recreated. + return errdefs.NotFound(fmt.Errorf("container state: %s", state.Status)) + } + + return nil +} + +// retryError returns a permanent error if the error is not considered retryable. +func (r *reaperSpawner) retryError(err error) error { + var timeout interface { + Timeout() bool + } + switch { + case isCleanupSafe(err), + createContainerFailDueToNameConflictRegex.MatchString(err.Error()), + errors.Is(err, syscall.ECONNREFUSED), + errors.Is(err, syscall.ECONNRESET), + errors.Is(err, syscall.ECONNABORTED), + errors.Is(err, syscall.ETIMEDOUT), + errors.Is(err, os.ErrDeadlineExceeded), + errors.As(err, &timeout) && timeout.Timeout(), + errors.Is(err, context.DeadlineExceeded), + errors.Is(err, context.Canceled): + // Retryable error. + return err + default: + return backoff.Permanent(err) + } +} + +// reaper returns an existing Reaper instance if it exists and is running, otherwise +// a new Reaper instance will be created with a sessionID to identify containers in +// the same test session/program. If connect is true, the reaper will be connected +// to the reaper container. +// Returns an error if config.RyukDisabled is true. +// +// Safe for concurrent calls. +func (r *reaperSpawner) reaper(ctx context.Context, sessionID string, provider ReaperProvider) (*Reaper, error) { + if config.Read().RyukDisabled { + return nil, errReaperDisabled + } + + r.mtx.Lock() + defer r.mtx.Unlock() + + return backoff.RetryWithData( + r.retryLocked(ctx, sessionID, provider), + backoff.WithContext(r.backoff(), ctx), + ) +} + +// retryLocked returns a function that can be used to create or reuse a reaper container. +// If connect is true, the reaper will be connected to the reaper container. +// It must be called with the lock held. +func (r *reaperSpawner) retryLocked(ctx context.Context, sessionID string, provider ReaperProvider) func() (*Reaper, error) { + return func() (reaper *Reaper, err error) { //nolint:nonamedreturns // Needed for deferred error check. + reaper, err = r.reuseOrCreate(ctx, sessionID, provider) + // Ensure that the reaper is terminated if an error occurred. + defer func() { + if err != nil { + if reaper != nil { + err = errors.Join(err, TerminateContainer(reaper.container)) + } + err = r.retryError(errors.Join(err, r.cleanupLocked())) } - } else if state.Running { - return reaperInstance, nil - } - // else: the reaper instance has been terminated, so we need to create a new one - reaperOnce = sync.Once{} - } - - // 2. because the reaper instance has not been created yet, look for it in the Docker daemon, which - // will happen if the reaper container has been created in the same test session but in a different - // test process execution (e.g. when running tests in parallel), not having initialized the reaper - // instance yet. - reaperContainer, err := lookUpReaperContainer(context.Background(), sessionID) - if err == nil && reaperContainer != nil { - // The reaper container exists as a Docker container: re-use it - Logger.Printf("🔥 Reaper obtained from Docker for this test session %s", reaperContainer.ID) - reaperInstance, err = reuseReaperContainer(ctx, sessionID, provider, reaperContainer) + }() if err != nil { return nil, err } - return reaperInstance, nil - } + if err = r.isRunning(ctx, reaper.container); err != nil { + return nil, err + } - // 3. the reaper container does not exist in the Docker daemon: create it, and do it using the - // synchronization primitive to avoid multiple executions of this function to create the reaper - var reaperErr error - reaperOnce.Do(func() { - r, err := newReaper(ctx, sessionID, provider) + // Check we can still connect. + termSignal, err := reaper.connect(ctx) if err != nil { - reaperErr = err - return + return nil, fmt.Errorf("connect: %w", err) } - reaperInstance, reaperErr = r, nil - }) - if reaperErr != nil { - reaperOnce = sync.Once{} - return nil, reaperErr - } + reaper.setOrSignal(termSignal) + + r.instance = reaper - return reaperInstance, nil + return reaper, nil + } } -// reuseReaperContainer constructs a Reaper from an already running reaper -// DockerContainer. -func reuseReaperContainer(ctx context.Context, sessionID string, provider ReaperProvider, reaperContainer *DockerContainer) (*Reaper, error) { - endpoint, err := reaperContainer.PortEndpoint(ctx, "8080", "") +// reuseOrCreate returns an existing Reaper instance if it exists, otherwise a new Reaper instance. +func (r *reaperSpawner) reuseOrCreate(ctx context.Context, sessionID string, provider ReaperProvider) (*Reaper, error) { + if r.instance != nil { + // We already have an associated reaper. + return r.instance, nil + } + + // Look for an existing reaper created in the same test session but in a + // different test process execution e.g. when running tests in parallel. + container, err := r.lookupContainer(context.Background(), sessionID) if err != nil { - return nil, err + if !errors.Is(err, errReaperNotFound) { + return nil, fmt.Errorf("look up container: %w", err) + } + + // The reaper container was not found, continue to create a new one. + reaper, err := r.newReaper(ctx, sessionID, provider) + if err != nil { + return nil, fmt.Errorf("new reaper: %w", err) + } + + return reaper, nil } - Logger.Printf("⏳ Waiting for Reaper port to be ready") + // A reaper container exists re-use it. + reaper, err := r.fromContainer(ctx, sessionID, provider, container) + if err != nil { + return nil, fmt.Errorf("from container %q: %w", container.ID[:8], err) + } + + return reaper, nil +} - var containerJson *types.ContainerJSON +// fromContainer constructs a Reaper from an already running reaper DockerContainer. +func (r *reaperSpawner) fromContainer(ctx context.Context, sessionID string, provider ReaperProvider, dockerContainer *DockerContainer) (*Reaper, error) { + Logger.Printf("⏳ Waiting for Reaper %q to be ready", dockerContainer.ID[:8]) - if containerJson, err = reaperContainer.Inspect(ctx); err != nil { - return nil, fmt.Errorf("failed to inspect reaper container %s: %w", reaperContainer.ID[:8], err) + // Reusing an existing container so we determine the port from the container's exposed ports. + if err := wait.ForExposedPort(). + WithPollInterval(100*time.Millisecond). + SkipInternalCheck(). + WaitUntilReady(ctx, dockerContainer); err != nil { + return nil, fmt.Errorf("wait for reaper %s: %w", dockerContainer.ID[:8], err) } - if containerJson != nil && containerJson.NetworkSettings != nil { - for port := range containerJson.NetworkSettings.Ports { - err := wait.ForListeningPort(port). - WithPollInterval(100*time.Millisecond). - WaitUntilReady(ctx, reaperContainer) - if err != nil { - return nil, fmt.Errorf("failed waiting for reaper container %s port %s/%s to be ready: %w", - reaperContainer.ID[:8], port.Proto(), port.Port(), err) - } - } + endpoint, err := dockerContainer.Endpoint(ctx, "") + if err != nil { + return nil, fmt.Errorf("port endpoint: %w", err) } + Logger.Printf("🔥 Reaper obtained from Docker for this test session %s", dockerContainer.ID[:8]) + return &Reaper{ Provider: provider, SessionID: sessionID, Endpoint: endpoint, - container: reaperContainer, + container: dockerContainer, }, nil } -// newReaper creates a Reaper with a sessionID to identify containers and a -// provider to use. Do not call this directly, use reuseOrCreateReaper instead. -func newReaper(ctx context.Context, sessionID string, provider ReaperProvider) (*Reaper, error) { +// newReaper creates a connected Reaper with a sessionID to identify containers +// and a provider to use. +func (r *reaperSpawner) newReaper(ctx context.Context, sessionID string, provider ReaperProvider) (reaper *Reaper, err error) { //nolint:nonamedreturns // Needed for deferred error check. dockerHostMount := core.MustExtractDockerSocket(ctx) - reaper := &Reaper{ - Provider: provider, - SessionID: sessionID, - } - - listeningPort := nat.Port("8080/tcp") - + port := r.port() tcConfig := provider.Config().Config - req := ContainerRequest{ Image: config.ReaperDefaultImage, - ExposedPorts: []string{string(listeningPort)}, + ExposedPorts: []string{string(port)}, Labels: core.DefaultLabels(sessionID), Privileged: tcConfig.RyukPrivileged, - WaitingFor: wait.ForListeningPort(listeningPort), + WaitingFor: wait.ForListeningPort(port), Name: reaperContainerNameFromSessionID(sessionID), HostConfigModifier: func(hc *container.HostConfig) { hc.AutoRemove = true @@ -268,9 +395,10 @@ func newReaper(ctx context.Context, sessionID string, provider ReaperProvider) ( req.Env["RYUK_VERBOSE"] = "true" } - // include reaper-specific labels to the reaper container + // Setup reaper-specific labels for the reaper container. req.Labels[core.LabelReaper] = "true" req.Labels[core.LabelRyuk] = "true" + delete(req.Labels, core.LabelReap) // Attach reaper container to a requested network if it is specified if p, ok := provider.(*DockerProvider); ok { @@ -278,123 +406,158 @@ func newReaper(ctx context.Context, sessionID string, provider ReaperProvider) ( } c, err := provider.RunContainer(ctx, req) - if err != nil { - // We need to check whether the error is caused by a container with the same name - // already existing due to race conditions. We manually match the error message - // as we do not have any error types to check against. - if createContainerFailDueToNameConflictRegex.MatchString(err.Error()) { - // Manually retrieve the already running reaper container. However, we need to - // use retries here as there are two possible race conditions that might lead to - // errors: In most cases, there is a small delay between container creation and - // actually being visible in list-requests. This means that creation might fail - // due to name conflicts, but when we list containers with this name, we do not - // get any results. In another case, the container might have simply died in the - // meantime and therefore cannot be found. - const timeout = 5 * time.Second - const cooldown = 100 * time.Millisecond - start := time.Now() - var reaperContainer *DockerContainer - for time.Since(start) < timeout { - reaperContainer, err = lookUpReaperContainer(ctx, sessionID) - if err == nil && reaperContainer != nil { - break - } - select { - case <-ctx.Done(): - case <-time.After(cooldown): - } - } - if err != nil { - return nil, fmt.Errorf("look up reaper container due to name conflict failed: %w", err) - } - // If the reaper container was not found, it is most likely to have died in - // between as we can exclude any client errors because of the previous error - // check. Because the reaper should only die if it performed clean-ups, we can - // fail here as the reaper timeout needs to be increased, anyway. - if reaperContainer == nil { - return nil, fmt.Errorf("look up reaper container returned nil although creation failed due to name conflict") - } - Logger.Printf("🔥 Reaper obtained from Docker for this test session %s", reaperContainer.ID) - reaper, err := reuseReaperContainer(ctx, sessionID, provider, reaperContainer) - if err != nil { - return nil, err - } - return reaper, nil + defer func() { + if err != nil { + err = errors.Join(err, TerminateContainer(c)) } - return nil, err + }() + if err != nil { + return nil, fmt.Errorf("run container: %w", err) } - reaper.container = c - endpoint, err := c.PortEndpoint(ctx, "8080", "") + endpoint, err := c.PortEndpoint(ctx, port, "") if err != nil { - return nil, err + return nil, fmt.Errorf("port endpoint: %w", err) } - reaper.Endpoint = endpoint - return reaper, nil + return &Reaper{ + Provider: provider, + SessionID: sessionID, + Endpoint: endpoint, + container: c, + }, nil } // Reaper is used to start a sidecar container that cleans up resources type Reaper struct { - Provider ReaperProvider - SessionID string - Endpoint string - container Container + Provider ReaperProvider + SessionID string + Endpoint string + container Container + mtx sync.Mutex // Protects termSignal. + termSignal chan bool } -// Connect runs a goroutine which can be terminated by sending true into the returned channel +// Connect connects to the reaper container and sends the labels to it +// so that it can clean up the containers with the same labels. +// +// It returns a channel that can be closed to terminate the connection. +// Returns an error if config.RyukDisabled is true. func (r *Reaper) Connect() (chan bool, error) { - conn, err := net.DialTimeout("tcp", r.Endpoint, 10*time.Second) - if err != nil { - return nil, fmt.Errorf("%w: Connecting to Ryuk on %s failed", err, r.Endpoint) + if config.Read().RyukDisabled { + return nil, errReaperDisabled } - terminationSignal := make(chan bool) - go func(conn net.Conn) { - sock := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) - defer conn.Close() + if termSignal := r.useTermSignal(); termSignal != nil { + return termSignal, nil + } - labelFilters := []string{} - for l, v := range core.DefaultLabels(r.SessionID) { - labelFilters = append(labelFilters, fmt.Sprintf("label=%s=%s", l, v)) - } + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() - retryLimit := 3 - for retryLimit > 0 { - retryLimit-- + return r.connect(ctx) +} - if _, err := sock.WriteString(strings.Join(labelFilters, "&")); err != nil { - continue - } +// close signals the connection to close if needed. +// Safe for concurrent calls. +func (r *Reaper) close() { + r.mtx.Lock() + defer r.mtx.Unlock() - if _, err := sock.WriteString("\n"); err != nil { - continue - } + if r.termSignal != nil { + r.termSignal <- true + r.termSignal = nil + } +} - if err := sock.Flush(); err != nil { - continue - } +// setOrSignal sets the reapers termSignal field if nil +// otherwise consumes by sending true to it. +// Safe for concurrent calls. +func (r *Reaper) setOrSignal(termSignal chan bool) { + r.mtx.Lock() + defer r.mtx.Unlock() + + if r.termSignal != nil { + // Already have an existing connection, close the new one. + termSignal <- true + return + } - resp, err := sock.ReadString('\n') - if err != nil { - continue - } + // First or new unused termSignal, assign for caller to reuse. + r.termSignal = termSignal +} - if resp == "ACK\n" { - break - } - } +// useTermSignal if termSignal is not nil returns it +// and sets it to nil, otherwise returns nil. +// +// Safe for concurrent calls. +func (r *Reaper) useTermSignal() chan bool { + r.mtx.Lock() + defer r.mtx.Unlock() + + if r.termSignal == nil { + return nil + } + + // Use existing connection. + term := r.termSignal + r.termSignal = nil + + return term +} +// connect connects to the reaper container and sends the labels to it +// so that it can clean up the containers with the same labels. +// +// It returns a channel that can be sent true to terminate the connection. +// Returns an error if config.RyukDisabled is true. +func (r *Reaper) connect(ctx context.Context) (chan bool, error) { + var d net.Dialer + conn, err := d.DialContext(ctx, "tcp", r.Endpoint) + if err != nil { + return nil, fmt.Errorf("dial reaper %s: %w", r.Endpoint, err) + } + + terminationSignal := make(chan bool) + go func() { + defer conn.Close() + if err := r.handshake(conn); err != nil { + Logger.Printf("Reaper handshake failed: %s", err) + } <-terminationSignal - }(conn) + }() return terminationSignal, nil } +// handshake sends the labels to the reaper container and reads the ACK. +func (r *Reaper) handshake(conn net.Conn) error { + labels := core.DefaultLabels(r.SessionID) + labelFilters := make([]string, 0, len(labels)) + for l, v := range labels { + labelFilters = append(labelFilters, fmt.Sprintf("label=%s=%s", l, v)) + } + + filters := []byte(strings.Join(labelFilters, "&") + "\n") + buf := make([]byte, 4) + if _, err := conn.Write(filters); err != nil { + return fmt.Errorf("writing filters: %w", err) + } + + n, err := io.ReadFull(conn, buf) + if err != nil { + return fmt.Errorf("read ack: %w", err) + } + + if !bytes.Equal(reaperAck, buf[:n]) { + // We have received the ACK so all done. + return fmt.Errorf("unexpected reaper response: %s", buf[:n]) + } + + return nil +} + // Labels returns the container labels to use so that this Reaper cleans them up // Deprecated: internally replaced by core.DefaultLabels(sessionID) func (r *Reaper) Labels() map[string]string { - return map[string]string{ - core.LabelLang: "go", - core.LabelSessionID: r.SessionID, - } + return GenericLabels() } diff --git a/reaper_test.go b/reaper_test.go index f421c2686d..2cef02de6e 100644 --- a/reaper_test.go +++ b/reaper_test.go @@ -4,14 +4,15 @@ import ( "context" "errors" "os" + "strconv" "sync" "testing" "time" "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/network" + "github.com/docker/docker/errdefs" "github.com/docker/go-connections/nat" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/testcontainers/testcontainers-go/internal/config" @@ -23,48 +24,29 @@ import ( const testSessionID = "this-is-a-different-session-id" type mockReaperProvider struct { - req ContainerRequest - hostConfig *container.HostConfig - enpointSettings map[string]*network.EndpointSettings - config TestcontainersConfig - initialReaper *Reaper - initialReaperOnce sync.Once - t *testing.T + req ContainerRequest + hostConfig *container.HostConfig + endpointSettings map[string]*network.EndpointSettings + config TestcontainersConfig } -func newMockReaperProvider(t *testing.T) *mockReaperProvider { +func newMockReaperProvider(cfg config.Config) *mockReaperProvider { m := &mockReaperProvider{ config: TestcontainersConfig{ - Config: config.Config{}, + Config: cfg, }, - t: t, - initialReaper: reaperInstance, - //nolint:govet - initialReaperOnce: reaperOnce, } - // explicitly reset the reaperInstance to nil to start from a fresh state - reaperInstance = nil - reaperOnce = sync.Once{} - return m } var errExpected = errors.New("expected") -func (m *mockReaperProvider) RestoreReaperState() { - m.t.Cleanup(func() { - reaperInstance = m.initialReaper - //nolint:govet - reaperOnce = m.initialReaperOnce - }) -} - func (m *mockReaperProvider) RunContainer(ctx context.Context, req ContainerRequest) (Container, error) { m.req = req m.hostConfig = &container.HostConfig{} - m.enpointSettings = map[string]*network.EndpointSettings{} + m.endpointSettings = map[string]*network.EndpointSettings{} if req.HostConfigModifier == nil { req.HostConfigModifier = defaultHostConfigModifier(req) @@ -72,7 +54,7 @@ func (m *mockReaperProvider) RunContainer(ctx context.Context, req ContainerRequ req.HostConfigModifier(m.hostConfig) if req.EnpointSettingsModifier != nil { - req.EnpointSettingsModifier(m.enpointSettings) + req.EnpointSettingsModifier(m.endpointSettings) } // we're only interested in the request, so instead of mocking the Docker client @@ -84,8 +66,8 @@ func (m *mockReaperProvider) Config() TestcontainersConfig { return m.config } -// createContainerRequest creates the expected request and allows for customization -func createContainerRequest(customize func(ContainerRequest) ContainerRequest) ContainerRequest { +// expectedReaperRequest creates the expected reaper container request with the given customizations. +func expectedReaperRequest(customize ...func(*ContainerRequest)) ContainerRequest { req := ContainerRequest{ Image: config.ReaperDefaultImage, ExposedPorts: []string{"8080/tcp"}, @@ -102,21 +84,26 @@ func createContainerRequest(customize func(ContainerRequest) ContainerRequest) C req.Labels[core.LabelReaper] = "true" req.Labels[core.LabelRyuk] = "true" + delete(req.Labels, core.LabelReap) - if customize == nil { - return req + for _, customize := range customize { + customize(&req) } - return customize(req) + return req } -func TestContainerStartsWithoutTheReaper(t *testing.T) { - config.Reset() // reset the config using the internal method to avoid the sync.Once - tcConfig := config.Read() - if !tcConfig.RyukDisabled { - t.Skip("Ryuk is enabled, skipping test") - } +// reaperDisable disables / enables the reaper for the duration of the test. +func reaperDisable(t *testing.T, disabled bool) { + t.Helper() + + config.Reset() + t.Setenv("TESTCONTAINERS_RYUK_DISABLED", strconv.FormatBool(disabled)) + t.Cleanup(config.Reset) +} +func testContainerStart(t *testing.T) { + t.Helper() ctx := context.Background() ctr, err := GenericContainer(ctx, GenericContainerRequest{ @@ -131,59 +118,55 @@ func TestContainerStartsWithoutTheReaper(t *testing.T) { }) CleanupContainer(t, ctr) require.NoError(t, err) +} - sessionID := core.SessionID() +// testReaperRunning validates that a reaper is running. +func testReaperRunning(t *testing.T) { + t.Helper() - reaperContainer, err := lookUpReaperContainer(ctx, sessionID) - if err != nil { - t.Fatal(err, "expected reaper container not found.") - } - if reaperContainer != nil { - t.Fatal("expected zero reaper running.") - } + ctx := context.Background() + sessionID := core.SessionID() + reaperContainer, err := spawner.lookupContainer(ctx, sessionID) + require.NoError(t, err) + require.NotNil(t, reaperContainer) } -func TestContainerStartsWithTheReaper(t *testing.T) { - config.Reset() // reset the config using the internal method to avoid the sync.Once - tcConfig := config.Read() - if tcConfig.RyukDisabled { - t.Skip("Ryuk is disabled, skipping test") - } +func TestContainer(t *testing.T) { + reaperDisable(t, false) - ctx := context.Background() + t.Run("start/reaper-enabled", func(t *testing.T) { + testContainerStart(t) + testReaperRunning(t) + }) - c, err := GenericContainer(ctx, GenericContainerRequest{ - ProviderType: providerType, - ContainerRequest: ContainerRequest{ - Image: nginxAlpineImage, - ExposedPorts: []string{ - nginxDefaultPort, - }, - }, - Started: true, + t.Run("stop/reaper-enabled", func(t *testing.T) { + testContainerStop(t) + testReaperRunning(t) }) - CleanupContainer(t, c) - if err != nil { - t.Fatal(err) - } - sessionID := core.SessionID() + t.Run("terminate/reaper-enabled", func(t *testing.T) { + testContainerTerminate(t) + testReaperRunning(t) + }) - reaperContainer, err := lookUpReaperContainer(ctx, sessionID) - if err != nil { - t.Fatal(err, "expected reaper container running.") - } - if reaperContainer == nil { - t.Fatal("expected one reaper to be running.") - } + reaperDisable(t, true) + + t.Run("start/reaper-disabled", func(t *testing.T) { + testContainerStart(t) + }) + + t.Run("stop/reaper-disabled", func(t *testing.T) { + testContainerStop(t) + }) + + t.Run("terminate/reaper-disabled", func(t *testing.T) { + testContainerTerminate(t) + }) } -func TestContainerStopWithReaper(t *testing.T) { - config.Reset() // reset the config using the internal method to avoid the sync.Once - tcConfig := config.Read() - if tcConfig.RyukDisabled { - t.Skip("Ryuk is disabled, skipping test") - } +// testContainerStop tests stopping a container. +func testContainerStop(t *testing.T) { + t.Helper() ctx := context.Background() @@ -201,37 +184,21 @@ func TestContainerStopWithReaper(t *testing.T) { require.NoError(t, err) state, err := nginxA.State(ctx) - if err != nil { - t.Fatal(err) - } - if state.Running != true { - t.Fatal("The container shoud be in running state") - } + require.NoError(t, err) + require.True(t, state.Running) + stopTimeout := 10 * time.Second err = nginxA.Stop(ctx, &stopTimeout) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) state, err = nginxA.State(ctx) - if err != nil { - t.Fatal(err) - } - if state.Running != false { - t.Fatal("The container shoud not be running") - } - if state.Status != "exited" { - t.Fatal("The container shoud be in exited state") - } + require.NoError(t, err) + require.False(t, state.Running) + require.Equal(t, "exited", state.Status) } -func TestContainerTerminationWithReaper(t *testing.T) { - config.Reset() // reset the config using the internal method to avoid the sync.Once - tcConfig := config.Read() - if tcConfig.RyukDisabled { - t.Skip("Ryuk is disabled, skipping test") - } - +// testContainerTerminate tests terminating a container. +func testContainerTerminate(t *testing.T) { ctx := context.Background() nginxA, err := GenericContainer(ctx, GenericContainerRequest{ @@ -258,324 +225,274 @@ func TestContainerTerminationWithReaper(t *testing.T) { require.Error(t, err) } -func TestContainerTerminationWithoutReaper(t *testing.T) { - config.Reset() // reset the config using the internal method to avoid the sync.Once - tcConfig := config.Read() - if !tcConfig.RyukDisabled { - t.Skip("Ryuk is enabled, skipping test") - } +func Test_NewReaper(t *testing.T) { + reaperDisable(t, false) ctx := context.Background() - nginxA, err := GenericContainer(ctx, GenericContainerRequest{ - ProviderType: providerType, - ContainerRequest: ContainerRequest{ - Image: nginxAlpineImage, - ExposedPorts: []string{ - nginxDefaultPort, + t.Run("non-privileged", func(t *testing.T) { + testNewReaper(ctx, t, + config.Config{ + RyukConnectionTimeout: time.Minute, + RyukReconnectionTimeout: 10 * time.Second, }, - }, - Started: true, + expectedReaperRequest(), + ) }) - CleanupContainer(t, nginxA) - if err != nil { - t.Fatal(err) - } - - state, err := nginxA.State(ctx) - if err != nil { - t.Fatal(err) - } - if state.Running != true { - t.Fatal("The container shoud be in running state") - } - err = nginxA.Terminate(ctx) - if err != nil { - t.Fatal(err) - } - - _, err = nginxA.State(ctx) - if err == nil { - t.Fatal("expected error from container inspect.") - } -} - -func Test_NewReaper(t *testing.T) { - config.Reset() // reset the config using the internal method to avoid the sync.Once - tcConfig := config.Read() - if tcConfig.RyukDisabled { - t.Skip("Ryuk is disabled, skipping test") - } - - type cases struct { - name string - req ContainerRequest - config TestcontainersConfig - ctx context.Context - env map[string]string - } - tests := []cases{ - { - name: "non-privileged", - req: createContainerRequest(nil), - config: TestcontainersConfig{Config: config.Config{ - RyukConnectionTimeout: time.Minute, - RyukReconnectionTimeout: 10 * time.Second, - }}, - }, - { - name: "privileged", - req: createContainerRequest(func(req ContainerRequest) ContainerRequest { - req.Privileged = true - return req - }), - config: TestcontainersConfig{Config: config.Config{ + t.Run("privileged", func(t *testing.T) { + testNewReaper(ctx, t, + config.Config{ RyukPrivileged: true, RyukConnectionTimeout: time.Minute, RyukReconnectionTimeout: 10 * time.Second, - }}, - }, - { - name: "configured non-default timeouts", - req: createContainerRequest(func(req ContainerRequest) ContainerRequest { - req.Env = map[string]string{ - "RYUK_CONNECTION_TIMEOUT": "1m0s", - "RYUK_RECONNECTION_TIMEOUT": "10m0s", - } - return req - }), - config: TestcontainersConfig{Config: config.Config{ + }, + expectedReaperRequest(), + ) + }) + + t.Run("custom-timeouts", func(t *testing.T) { + testNewReaper(ctx, t, + config.Config{ RyukPrivileged: true, - RyukConnectionTimeout: time.Minute, - RyukReconnectionTimeout: 10 * time.Minute, - }}, - }, - { - name: "configured verbose mode", - req: createContainerRequest(func(req ContainerRequest) ContainerRequest { + RyukConnectionTimeout: 2 * time.Minute, + RyukReconnectionTimeout: 20 * time.Second, + }, + expectedReaperRequest(func(req *ContainerRequest) { req.Env = map[string]string{ - "RYUK_VERBOSE": "true", + "RYUK_CONNECTION_TIMEOUT": "2m0s", + "RYUK_RECONNECTION_TIMEOUT": "20s", } - return req }), - config: TestcontainersConfig{Config: config.Config{ + ) + }) + + t.Run("verbose", func(t *testing.T) { + testNewReaper(ctx, t, + config.Config{ RyukPrivileged: true, RyukVerbose: true, - }}, - }, - { - name: "docker-host in context", - req: createContainerRequest(func(req ContainerRequest) ContainerRequest { - req.HostConfigModifier = func(hostConfig *container.HostConfig) { - hostConfig.Binds = []string{core.MustExtractDockerSocket(context.Background()) + ":/var/run/docker.sock"} + }, + expectedReaperRequest(func(req *ContainerRequest) { + req.Env = map[string]string{ + "RYUK_VERBOSE": "true", } - return req }), - config: TestcontainersConfig{Config: config.Config{ + ) + }) + + t.Run("docker-host", func(t *testing.T) { + testNewReaper(context.WithValue(ctx, core.DockerHostContextKey, core.DockerSocketPathWithSchema), t, + config.Config{ RyukConnectionTimeout: time.Minute, RyukReconnectionTimeout: 10 * time.Second, - }}, - ctx: context.WithValue(context.TODO(), core.DockerHostContextKey, core.DockerSocketPathWithSchema), - }, - { - name: "Reaper including custom Hub prefix", - req: createContainerRequest(func(req ContainerRequest) ContainerRequest { - req.Image = config.ReaperDefaultImage - req.Privileged = true - return req + }, + expectedReaperRequest(func(req *ContainerRequest) { + req.HostConfigModifier = func(hostConfig *container.HostConfig) { + hostConfig.Binds = []string{core.MustExtractDockerSocket(ctx) + ":/var/run/docker.sock"} + } }), - config: TestcontainersConfig{Config: config.Config{ + ) + }) + + t.Run("hub-prefix", func(t *testing.T) { + testNewReaper(context.WithValue(ctx, core.DockerHostContextKey, core.DockerSocketPathWithSchema), t, + config.Config{ HubImageNamePrefix: "registry.mycompany.com/mirror", RyukPrivileged: true, RyukConnectionTimeout: time.Minute, RyukReconnectionTimeout: 10 * time.Second, - }}, - }, - { - name: "Reaper including custom Hub prefix as env var", - req: createContainerRequest(func(req ContainerRequest) ContainerRequest { + }, + expectedReaperRequest(func(req *ContainerRequest) { req.Image = config.ReaperDefaultImage req.Privileged = true - return req }), - config: TestcontainersConfig{Config: config.Config{ + ) + }) + + t.Run("hub-prefix-env", func(t *testing.T) { + config.Reset() + t.Cleanup(config.Reset) + + t.Setenv("TESTCONTAINERS_HUB_IMAGE_NAME_PREFIX", "registry.mycompany.com/mirror") + testNewReaper(context.WithValue(ctx, core.DockerHostContextKey, core.DockerSocketPathWithSchema), t, + config.Config{ RyukPrivileged: true, RyukConnectionTimeout: time.Minute, RyukReconnectionTimeout: 10 * time.Second, - }}, - env: map[string]string{ - "TESTCONTAINERS_HUB_IMAGE_NAME_PREFIX": "registry.mycompany.com/mirror", }, - }, - } + expectedReaperRequest(func(req *ContainerRequest) { + req.Image = config.ReaperDefaultImage + req.Privileged = true + }), + ) + }) +} - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - if test.env != nil { - config.Reset() // reset the config using the internal method to avoid the sync.Once - for k, v := range test.env { - t.Setenv(k, v) - } - } +func testNewReaper(ctx context.Context, t *testing.T, cfg config.Config, expected ContainerRequest) { + t.Helper() - if prefix := os.Getenv("TESTCONTAINERS_HUB_IMAGE_NAME_PREFIX"); prefix != "" { - test.config.Config.HubImageNamePrefix = prefix - } - - provider := newMockReaperProvider(t) - provider.config = test.config - t.Cleanup(provider.RestoreReaperState) + if prefix := os.Getenv("TESTCONTAINERS_HUB_IMAGE_NAME_PREFIX"); prefix != "" { + cfg.HubImageNamePrefix = prefix + } - if test.ctx == nil { - test.ctx = context.TODO() - } + provider := newMockReaperProvider(cfg) - _, err := reuseOrCreateReaper(test.ctx, testSessionID, provider) - // we should have errored out see mockReaperProvider.RunContainer - require.EqualError(t, err, "expected") + // We need a new reaperSpawner for each test case to avoid reusing + // an existing reaper instance. + spawner := &reaperSpawner{} + reaper, err := spawner.reaper(ctx, testSessionID, provider) + cleanupReaper(t, reaper, spawner) + // We should have errored out see mockReaperProvider.RunContainer. + require.ErrorIs(t, err, errExpected) - assert.Equal(t, test.req.Image, provider.req.Image, "expected image doesn't match the submitted request") - assert.Equal(t, test.req.ExposedPorts, provider.req.ExposedPorts, "expected exposed ports don't match the submitted request") - assert.Equal(t, test.req.Labels, provider.req.Labels, "expected labels don't match the submitted request") - assert.Equal(t, test.req.Mounts, provider.req.Mounts, "expected mounts don't match the submitted request") - assert.Equal(t, test.req.WaitingFor, provider.req.WaitingFor, "expected waitingFor don't match the submitted request") - assert.Equal(t, test.req.Env, provider.req.Env, "expected env doesn't match the submitted request") + require.Equal(t, expected.Image, provider.req.Image, "expected image doesn't match the submitted request") + require.Equal(t, expected.ExposedPorts, provider.req.ExposedPorts, "expected exposed ports don't match the submitted request") + require.Equal(t, expected.Labels, provider.req.Labels, "expected labels don't match the submitted request") + require.Equal(t, expected.Mounts, provider.req.Mounts, "expected mounts don't match the submitted request") + require.Equal(t, expected.WaitingFor, provider.req.WaitingFor, "expected waitingFor don't match the submitted request") + require.Equal(t, expected.Env, provider.req.Env, "expected env doesn't match the submitted request") - // checks for reaper's preCreationCallback fields - assert.Equal(t, container.NetworkMode(Bridge), provider.hostConfig.NetworkMode, "expected networkMode doesn't match the submitted request") - assert.True(t, provider.hostConfig.AutoRemove, "expected networkMode doesn't match the submitted request") - }) - } + // checks for reaper's preCreationCallback fields + require.Equal(t, container.NetworkMode(Bridge), provider.hostConfig.NetworkMode, "expected networkMode doesn't match the submitted request") + require.True(t, provider.hostConfig.AutoRemove, "expected networkMode doesn't match the submitted request") } func Test_ReaperReusedIfHealthy(t *testing.T) { - config.Reset() // reset the config using the internal method to avoid the sync.Once - tcConfig := config.Read() - if tcConfig.RyukDisabled { - t.Skip("Ryuk is disabled, skipping test") - } - - testProvider := newMockReaperProvider(t) - t.Cleanup(testProvider.RestoreReaperState) + reaperDisable(t, false) SkipIfProviderIsNotHealthy(t) ctx := context.Background() // As other integration tests run with the (shared) Reaper as well, re-use the instance to not interrupt other tests - wasReaperRunning := reaperInstance != nil + if spawner.instance != nil { + t.Cleanup(func() { + require.NoError(t, spawner.cleanup()) + }) + } + + provider, err := ProviderDocker.GetProvider() + require.NoError(t, err) - provider, _ := ProviderDocker.GetProvider() - reaper, err := reuseOrCreateReaper(context.WithValue(ctx, core.DockerHostContextKey, provider.(*DockerProvider).host), testSessionID, provider) + reaper, err := spawner.reaper(context.WithValue(ctx, core.DockerHostContextKey, provider.(*DockerProvider).host), testSessionID, provider) + cleanupReaper(t, reaper, spawner) require.NoError(t, err, "creating the Reaper should not error") - reaperReused, err := reuseOrCreateReaper(context.WithValue(ctx, core.DockerHostContextKey, provider.(*DockerProvider).host), testSessionID, provider) + reaperReused, err := spawner.reaper(context.WithValue(ctx, core.DockerHostContextKey, provider.(*DockerProvider).host), testSessionID, provider) + cleanupReaper(t, reaper, spawner) require.NoError(t, err, "reusing the Reaper should not error") - // assert that the internal state of both reaper instances is the same - assert.Equal(t, reaper.SessionID, reaperReused.SessionID, "expecting the same SessionID") - assert.Equal(t, reaper.Endpoint, reaperReused.Endpoint, "expecting the same reaper endpoint") - assert.Equal(t, reaper.Provider, reaperReused.Provider, "expecting the same container provider") - assert.Equal(t, reaper.container.GetContainerID(), reaperReused.container.GetContainerID(), "expecting the same container ID") - assert.Equal(t, reaper.container.SessionID(), reaperReused.container.SessionID(), "expecting the same session ID") - - terminate, err := reaper.Connect() - defer func(term chan bool) { - term <- true - }(terminate) - require.NoError(t, err, "connecting to Reaper should be successful") - if !wasReaperRunning { - CleanupContainer(t, reaper.container) - } + // Ensure the internal state of both reaper instances is the same + require.Equal(t, reaper.SessionID, reaperReused.SessionID, "expecting the same SessionID") + require.Equal(t, reaper.Endpoint, reaperReused.Endpoint, "expecting the same reaper endpoint") + require.Equal(t, reaper.Provider, reaperReused.Provider, "expecting the same container provider") + require.Equal(t, reaper.container.GetContainerID(), reaperReused.container.GetContainerID(), "expecting the same container ID") + require.Equal(t, reaper.container.SessionID(), reaperReused.container.SessionID(), "expecting the same session ID") + + termSignal, err := reaper.Connect() + cleanupTermSignal(t, termSignal) + require.NoError(t, err, "connecting to Reaper should be successful") } func Test_RecreateReaperIfTerminated(t *testing.T) { - config.Reset() // reset the config using the internal method to avoid the sync.Once - tcConfig := config.Read() - if tcConfig.RyukDisabled { - t.Skip("Ryuk is disabled, skipping test") - } - - mockProvider := newMockReaperProvider(t) - t.Cleanup(mockProvider.RestoreReaperState) + reaperDisable(t, false) SkipIfProviderIsNotHealthy(t) - provider, _ := ProviderDocker.GetProvider() + provider, err := ProviderDocker.GetProvider() + require.NoError(t, err) + ctx := context.Background() - reaper, err := reuseOrCreateReaper(context.WithValue(ctx, core.DockerHostContextKey, provider.(*DockerProvider).host), testSessionID, provider) + reaper, err := spawner.reaper(context.WithValue(ctx, core.DockerHostContextKey, provider.(*DockerProvider).host), testSessionID, provider) + cleanupReaper(t, reaper, spawner) require.NoError(t, err, "creating the Reaper should not error") - terminate, err := reaper.Connect() - require.NoError(t, err, "connecting to Reaper should be successful") - terminate <- true + termSignal, err := reaper.Connect() + if termSignal != nil { + termSignal <- true + } + require.NoError(t, err) - // Wait for ryuk's default timeout (10s) + 1s to allow for a graceful shutdown/cleanup of the container. - time.Sleep(11 * time.Second) + // Wait for up to ryuk's default reconnect timeout + 1s to allow for a graceful shutdown/cleanup of the container. + timeout := time.NewTimer(time.Second * 11) + t.Cleanup(func() { + timeout.Stop() + }) + for { + state, err := reaper.container.State(ctx) + if err != nil { + if errdefs.IsNotFound(err) { + break + } + require.NoError(t, err) + } + + if !state.Running { + break + } + + select { + case <-timeout.C: + t.Fatal("reaper container should have been terminated") + default: + } - recreatedReaper, err := reuseOrCreateReaper(context.WithValue(ctx, core.DockerHostContextKey, provider.(*DockerProvider).host), testSessionID, provider) + time.Sleep(time.Millisecond * 100) + } + + recreatedReaper, err := spawner.reaper(context.WithValue(ctx, core.DockerHostContextKey, provider.(*DockerProvider).host), testSessionID, provider) + cleanupReaper(t, recreatedReaper, spawner) require.NoError(t, err, "creating the Reaper should not error") - assert.NotEqual(t, reaper.container.GetContainerID(), recreatedReaper.container.GetContainerID(), "expected different container ID") + require.NotEqual(t, reaper.container.GetContainerID(), recreatedReaper.container.GetContainerID(), "expected different container ID") - terminate, err = recreatedReaper.Connect() - defer func(term chan bool) { - term <- true - }(terminate) + recreatedTermSignal, err := recreatedReaper.Connect() + cleanupTermSignal(t, recreatedTermSignal) require.NoError(t, err, "connecting to Reaper should be successful") - CleanupContainer(t, recreatedReaper.container) } func TestReaper_reuseItFromOtherTestProgramUsingDocker(t *testing.T) { - config.Reset() // reset the config using the internal method to avoid the sync.Once - tcConfig := config.Read() - if tcConfig.RyukDisabled { - t.Skip("Ryuk is disabled, skipping test") - } + reaperDisable(t, false) - mockProvider := &mockReaperProvider{ - initialReaper: reaperInstance, - //nolint:govet - initialReaperOnce: reaperOnce, - t: t, - } - t.Cleanup(mockProvider.RestoreReaperState) - - // explicitly set the reaperInstance to nil to simulate another test program in the same session accessing the same reaper - reaperInstance = nil - reaperOnce = sync.Once{} + // Explicitly set the reaper instance to nil to simulate another test + // program in the same session accessing the same reaper. + spawner.instance = nil SkipIfProviderIsNotHealthy(t) ctx := context.Background() - // As other integration tests run with the (shared) Reaper as well, re-use the instance to not interrupt other tests - wasReaperRunning := reaperInstance != nil + // As other integration tests run with the (shared) Reaper as well, + // re-use the instance to not interrupt other tests. + if spawner.instance != nil { + t.Cleanup(func() { + require.NoError(t, spawner.cleanup()) + }) + } - provider, _ := ProviderDocker.GetProvider() - reaper, err := reuseOrCreateReaper(context.WithValue(ctx, core.DockerHostContextKey, provider.(*DockerProvider).host), testSessionID, provider) + provider, err := ProviderDocker.GetProvider() + require.NoError(t, err) + + reaper, err := spawner.reaper(context.WithValue(ctx, core.DockerHostContextKey, provider.(*DockerProvider).host), testSessionID, provider) + cleanupReaper(t, reaper, spawner) require.NoError(t, err, "creating the Reaper should not error") - // explicitly reset the reaperInstance to nil to simulate another test program in the same session accessing the same reaper - reaperInstance = nil - reaperOnce = sync.Once{} + // Explicitly reset the reaper instance to nil to simulate another test + // program in the same session accessing the same reaper. + spawner.instance = nil - reaperReused, err := reuseOrCreateReaper(context.WithValue(ctx, core.DockerHostContextKey, provider.(*DockerProvider).host), testSessionID, provider) + reaperReused, err := spawner.reaper(context.WithValue(ctx, core.DockerHostContextKey, provider.(*DockerProvider).host), testSessionID, provider) + cleanupReaper(t, reaper, spawner) require.NoError(t, err, "reusing the Reaper should not error") - // assert that the internal state of both reaper instances is the same - assert.Equal(t, reaper.SessionID, reaperReused.SessionID, "expecting the same SessionID") - assert.Equal(t, reaper.Endpoint, reaperReused.Endpoint, "expecting the same reaper endpoint") - assert.Equal(t, reaper.Provider, reaperReused.Provider, "expecting the same container provider") - assert.Equal(t, reaper.container.GetContainerID(), reaperReused.container.GetContainerID(), "expecting the same container ID") - assert.Equal(t, reaper.container.SessionID(), reaperReused.container.SessionID(), "expecting the same session ID") - - terminate, err := reaper.Connect() - defer func(term chan bool) { - term <- true - }(terminate) - require.NoError(t, err, "connecting to Reaper should be successful") - if !wasReaperRunning { - CleanupContainer(t, reaper.container) - } + // Ensure that the internal state of both reaper instances is the same. + require.Equal(t, reaper.SessionID, reaperReused.SessionID, "expecting the same SessionID") + require.Equal(t, reaper.Endpoint, reaperReused.Endpoint, "expecting the same reaper endpoint") + require.Equal(t, reaper.Provider, reaperReused.Provider, "expecting the same container provider") + require.Equal(t, reaper.container.GetContainerID(), reaperReused.container.GetContainerID(), "expecting the same container ID") + require.Equal(t, reaper.container.SessionID(), reaperReused.container.SessionID(), "expecting the same session ID") + + termSignal, err := reaper.Connect() + cleanupTermSignal(t, termSignal) + require.NoError(t, err, "connecting to Reaper should be successful") } // TestReaper_ReuseRunning tests whether reusing the reaper if using @@ -586,15 +503,11 @@ func TestReaper_reuseItFromOtherTestProgramUsingDocker(t *testing.T) { // already running for the same session id by returning its container instance // instead. func TestReaper_ReuseRunning(t *testing.T) { - config.Reset() // reset the config using the internal method to avoid the sync.Once - tcConfig := config.Read() - if tcConfig.RyukDisabled { - t.Skip("Ryuk is disabled, skipping test") - } + reaperDisable(t, false) const concurrency = 64 - timeout, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + timeout, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() sessionID := SessionID() @@ -605,27 +518,54 @@ func TestReaper_ReuseRunning(t *testing.T) { obtainedReaperContainerIDs := make([]string, concurrency) var wg sync.WaitGroup for i := 0; i < concurrency; i++ { - i := i wg.Add(1) - go func() { + go func(i int) { defer wg.Done() - reaperContainer, err := lookUpReaperContainer(timeout, sessionID) - if err == nil && reaperContainer != nil { - // Found. - obtainedReaperContainerIDs[i] = reaperContainer.GetContainerID() - return - } - // Not found -> create. - createdReaper, err := newReaper(timeout, sessionID, dockerProvider) - require.NoError(t, err, "new reaper should not fail") - obtainedReaperContainerIDs[i] = createdReaper.container.GetContainerID() - }() + spawner := &reaperSpawner{} + reaper, err := spawner.reaper(timeout, sessionID, dockerProvider) + cleanupReaper(t, reaper, spawner) + require.NoError(t, err) + + obtainedReaperContainerIDs[i] = reaper.container.GetContainerID() + }(i) } wg.Wait() // Assure that all calls returned the same container. firstContainerID := obtainedReaperContainerIDs[0] for i, containerID := range obtainedReaperContainerIDs { - assert.Equal(t, firstContainerID, containerID, "call %d should have returned same container id", i) + require.Equal(t, firstContainerID, containerID, "call %d should have returned same container id", i) + } +} + +func TestSpawnerBackoff(t *testing.T) { + b := spawner.backoff() + for i := 0; i < 100; i++ { + require.LessOrEqual(t, b.NextBackOff(), time.Millisecond*250, "backoff should not exceed max interval") + } +} + +// cleanupReaper schedules reaper for cleanup if it's not nil. +func cleanupReaper(t *testing.T, reaper *Reaper, spawner *reaperSpawner) { + t.Helper() + + if reaper == nil { + return } + + t.Cleanup(func() { + reaper.close() + require.NoError(t, spawner.cleanup()) + }) +} + +// cleanupTermSignal ensures that termSignal +func cleanupTermSignal(t *testing.T, termSignal chan bool) { + t.Helper() + + t.Cleanup(func() { + if termSignal != nil { + termSignal <- true + } + }) } diff --git a/testing.go b/testing.go index 41391519de..cafd4fe920 100644 --- a/testing.go +++ b/testing.go @@ -68,11 +68,11 @@ func (lc *StdoutLogConsumer) Accept(l Log) { // container is stopped when the function ends. // // before any error check. If container is nil, its a no-op. -func CleanupContainer(tb testing.TB, container Container, options ...TerminateOption) { +func CleanupContainer(tb testing.TB, ctr Container, options ...TerminateOption) { tb.Helper() tb.Cleanup(func() { - noErrorOrIgnored(tb, TerminateContainer(container, options...)) + noErrorOrIgnored(tb, TerminateContainer(ctr, options...)) }) }