Skip to content

Commit

Permalink
Merge pull request #10209 from ndeloof/wait_containers
Browse files Browse the repository at this point in the history
use containers we expect to start for wait condition
  • Loading branch information
glours authored Feb 7, 2023
2 parents e908f41 + 52478f0 commit 9a4e74c
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 65 deletions.
73 changes: 24 additions & 49 deletions pkg/compose/convergence.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/containerd/containerd/platforms"
moby "github.com/docker/docker/api/types"
containerType "github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/api/types/network"
specs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
Expand Down Expand Up @@ -281,7 +280,7 @@ func containerEvents(containers Containers, eventFunc func(string) progress.Even
// ServiceConditionRunningOrHealthy is a service condition on status running or healthy
const ServiceConditionRunningOrHealthy = "running_or_healthy"

func (s *composeService) waitDependencies(ctx context.Context, project *types.Project, dependencies types.DependsOnConfig) error {
func (s *composeService) waitDependencies(ctx context.Context, project *types.Project, dependencies types.DependsOnConfig, containers Containers) error {
eg, _ := errgroup.WithContext(ctx)
w := progress.ContextWriter(ctx)
for dep, config := range dependencies {
Expand All @@ -291,11 +290,8 @@ func (s *composeService) waitDependencies(ctx context.Context, project *types.Pr
continue
}

containers, err := s.getContainers(ctx, project.Name, oneOffExclude, false, dep)
if err != nil {
return err
}
w.Events(containerEvents(containers, progress.Waiting))
waitingFor := containers.filter(isService(dep))
w.Events(containerEvents(waitingFor, progress.Waiting))

dep, config := dep, config
eg.Go(func() error {
Expand All @@ -305,31 +301,31 @@ func (s *composeService) waitDependencies(ctx context.Context, project *types.Pr
<-ticker.C
switch config.Condition {
case ServiceConditionRunningOrHealthy:
healthy, err := s.isServiceHealthy(ctx, project, dep, true)
healthy, err := s.isServiceHealthy(ctx, waitingFor, true)
if err != nil {
return err
}
if healthy {
w.Events(containerEvents(containers, progress.Healthy))
w.Events(containerEvents(waitingFor, progress.Healthy))
return nil
}
case types.ServiceConditionHealthy:
healthy, err := s.isServiceHealthy(ctx, project, dep, false)
healthy, err := s.isServiceHealthy(ctx, waitingFor, false)
if err != nil {
w.Events(containerEvents(containers, progress.ErrorEvent))
w.Events(containerEvents(waitingFor, progress.ErrorEvent))
return errors.Wrap(err, "dependency failed to start")
}
if healthy {
w.Events(containerEvents(containers, progress.Healthy))
w.Events(containerEvents(waitingFor, progress.Healthy))
return nil
}
case types.ServiceConditionCompletedSuccessfully:
exited, code, err := s.isServiceCompleted(ctx, project, dep)
exited, code, err := s.isServiceCompleted(ctx, waitingFor)
if err != nil {
return err
}
if exited {
w.Events(containerEvents(containers, progress.Exited))
w.Events(containerEvents(waitingFor, progress.Exited))
if code != 0 {
return fmt.Errorf("service %q didn't complete successfully: exit %d", dep, code)
}
Expand Down Expand Up @@ -650,51 +646,41 @@ func (s *composeService) connectContainerToNetwork(ctx context.Context, id strin
return nil
}

func (s *composeService) isServiceHealthy(ctx context.Context, project *types.Project, service string, fallbackRunning bool) (bool, error) {
containers, err := s.getContainers(ctx, project.Name, oneOffExclude, true, service)
if err != nil {
return false, err
}

if len(containers) == 0 {
return false, nil
}
func (s *composeService) isServiceHealthy(ctx context.Context, containers Containers, fallbackRunning bool) (bool, error) {
for _, c := range containers {
container, err := s.apiClient().ContainerInspect(ctx, c.ID)
if err != nil {
return false, err
}
name := container.Name[1:]

if container.State.Status == "exited" {
return false, fmt.Errorf("container %s exited (%d)", name, container.State.ExitCode)
}

if container.Config.Healthcheck == nil && fallbackRunning {
// Container does not define a health check, but we can fall back to "running" state
return container.State != nil && container.State.Status == "running", nil
}

if container.State.Status == "exited" {
return false, fmt.Errorf("container for service %q exited (%d)", service, container.State.ExitCode)
}

if container.State == nil || container.State.Health == nil {
return false, fmt.Errorf("container for service %q has no healthcheck configured", service)
return false, fmt.Errorf("container %s has no healthcheck configured", name)
}
switch container.State.Health.Status {
case moby.Healthy:
// Continue by checking the next container.
case moby.Unhealthy:
return false, fmt.Errorf("container for service %q is unhealthy", service)
return false, fmt.Errorf("container %s is unhealthy", name)
case moby.Starting:
return false, nil
default:
return false, fmt.Errorf("container for service %q had unexpected health status %q", service, container.State.Health.Status)
return false, fmt.Errorf("container %s had unexpected health status %q", name, container.State.Health.Status)
}
}
return true, nil
}

func (s *composeService) isServiceCompleted(ctx context.Context, project *types.Project, dep string) (bool, int, error) {
containers, err := s.getContainers(ctx, project.Name, oneOffExclude, true, dep)
if err != nil {
return false, 0, err
}
func (s *composeService) isServiceCompleted(ctx context.Context, containers Containers) (bool, int, error) {
for _, c := range containers {
container, err := s.apiClient().ContainerInspect(ctx, c.ID)
if err != nil {
Expand All @@ -707,23 +693,12 @@ func (s *composeService) isServiceCompleted(ctx context.Context, project *types.
return false, 0, nil
}

func (s *composeService) startService(ctx context.Context, project *types.Project, service types.ServiceConfig) error {
func (s *composeService) startService(ctx context.Context, project *types.Project, service types.ServiceConfig, containers Containers) error {
if service.Deploy != nil && service.Deploy.Replicas != nil && *service.Deploy.Replicas == 0 {
return nil
}

err := s.waitDependencies(ctx, project, service.DependsOn)
if err != nil {
return err
}
containers, err := s.apiClient().ContainerList(ctx, moby.ContainerListOptions{
Filters: filters.NewArgs(
projectFilter(project.Name),
serviceFilter(service.Name),
oneOffFilter(false),
),
All: true,
})
err := s.waitDependencies(ctx, project, service.DependsOn, containers)
if err != nil {
return err
}
Expand All @@ -736,7 +711,7 @@ func (s *composeService) startService(ctx context.Context, project *types.Projec
}

w := progress.ContextWriter(ctx)
for _, container := range containers {
for _, container := range containers.filter(isService(service.Name)) {
if container.State == ContainerRunning {
continue
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/compose/convergence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func TestWaitDependencies(t *testing.T) {
"db": {Condition: ServiceConditionRunningOrHealthy},
"redis": {Condition: ServiceConditionRunningOrHealthy},
}
assert.NilError(t, tested.waitDependencies(context.Background(), &project, dependencies))
assert.NilError(t, tested.waitDependencies(context.Background(), &project, dependencies, nil))
})
t.Run("should skip dependencies with condition service_started", func(t *testing.T) {
dbService := types.ServiceConfig{Name: "db", Scale: 1}
Expand All @@ -239,6 +239,6 @@ func TestWaitDependencies(t *testing.T) {
"db": {Condition: types.ServiceConditionStarted},
"redis": {Condition: types.ServiceConditionStarted},
}
assert.NilError(t, tested.waitDependencies(context.Background(), &project, dependencies))
assert.NilError(t, tested.waitDependencies(context.Background(), &project, dependencies, nil))
})
}
10 changes: 5 additions & 5 deletions pkg/compose/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,18 +76,18 @@ func (s *composeService) prepareRun(ctx context.Context, project *types.Project,
if err := s.ensureImagesExists(ctx, project, opts.QuietPull); err != nil { // all dependencies already checked, but might miss service img
return "", err
}
if !opts.NoDeps {
if err := s.waitDependencies(ctx, project, service.DependsOn); err != nil {
return "", err
}
}

observedState, err := s.getContainers(ctx, project.Name, oneOffInclude, true)
if err != nil {
return "", err
}
updateServices(&service, observedState)

if !opts.NoDeps {
if err := s.waitDependencies(ctx, project, service.DependsOn, observedState); err != nil {
return "", err
}
}
created, err := s.createContainer(ctx, project, service, service.ContainerName, 1,
opts.AutoRemove, opts.UseNetworkAliases, opts.Interactive)
if err != nil {
Expand Down
27 changes: 20 additions & 7 deletions pkg/compose/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ import (
"strings"
"time"

"github.com/compose-spec/compose-go/types"
"github.com/docker/compose/v2/pkg/api"
"github.com/docker/compose/v2/pkg/progress"
"github.com/docker/compose/v2/pkg/utils"

"github.com/compose-spec/compose-go/types"
moby "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"

"github.com/docker/compose/v2/pkg/api"
"github.com/docker/compose/v2/pkg/progress"
)

func (s *composeService) Start(ctx context.Context, projectName string, options api.StartOptions) error {
Expand Down Expand Up @@ -75,13 +76,25 @@ func (s *composeService) start(ctx context.Context, projectName string, options
})
}

err := InDependencyOrder(ctx, project, func(c context.Context, name string) error {
var containers Containers
containers, err := s.apiClient().ContainerList(ctx, moby.ContainerListOptions{
Filters: filters.NewArgs(
projectFilter(project.Name),
oneOffFilter(false),
),
All: true,
})
if err != nil {
return err
}

err = InDependencyOrder(ctx, project, func(c context.Context, name string) error {
service, err := project.GetService(name)
if err != nil {
return err
}

return s.startService(ctx, project, service)
return s.startService(ctx, project, service, containers)
})
if err != nil {
return err
Expand All @@ -94,7 +107,7 @@ func (s *composeService) start(ctx context.Context, projectName string, options
Condition: getDependencyCondition(s, project),
}
}
err = s.waitDependencies(ctx, project, depends)
err = s.waitDependencies(ctx, project, depends, containers)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/e2e/up_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestUpServiceUnhealthy(t *testing.T) {
const projectName = "e2e-start-fail"

res := c.RunDockerComposeCmdNoCheck(t, "-f", "fixtures/start-fail/compose.yaml", "--project-name", projectName, "up", "-d")
res.Assert(t, icmd.Expected{ExitCode: 1, Err: `container for service "fail" is unhealthy`})
res.Assert(t, icmd.Expected{ExitCode: 1, Err: `container e2e-start-fail-fail-1 is unhealthy`})

c.RunDockerComposeCmd(t, "--project-name", projectName, "down")
}
Expand Down Expand Up @@ -135,6 +135,6 @@ func TestUpWithDependencyExit(t *testing.T) {
c.RunDockerComposeCmd(t, "--project-name", "dependencies", "down")
})

res.Assert(t, icmd.Expected{ExitCode: 1, Err: "dependency failed to start: container for service \"db\" exited (1)"})
res.Assert(t, icmd.Expected{ExitCode: 1, Err: "dependency failed to start: container dependencies-db-1 exited (1)"})
})
}

0 comments on commit 9a4e74c

Please sign in to comment.