Move pod jobs to parallel execution #7382

Merged
merged 2 commits into from
Oct 7, 2020
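The diff below converts the per-container loops in Pod.StopWithTimeout, Cleanup, Pause, Unpause, and Kill from serial, lock-and-sync iteration into jobs submitted to the pkg/parallel executor, then collects each container's result from a returned error channel. The executor itself is not part of this diff; the following is only a minimal sketch, assuming a semaphore-bounded worker model, of an Enqueue helper with the signature the new code relies on (the names maxJobs and slots are illustrative, not podman's):

```go
package parallel

import "context"

// maxJobs caps how many enqueued jobs run concurrently.
// Illustrative only; the real pkg/parallel presumably derives its
// limit from a configurable value.
const maxJobs = 4

var slots = make(chan struct{}, maxJobs)

// Enqueue runs fn on the shared executor and returns a channel that
// receives fn's error (or nil) exactly once, matching the
// parallel.Enqueue(ctx, func() error) <-chan error usage in the diff.
func Enqueue(ctx context.Context, fn func() error) <-chan error {
	retChan := make(chan error, 1)
	go func() {
		defer close(retChan)
		select {
		case slots <- struct{}{}: // acquire a worker slot
			defer func() { <-slots }() // release it when fn returns
			retChan <- fn()
		case <-ctx.Done(): // give up if the caller's context is canceled first
			retChan <- ctx.Err()
		}
	}()
	return retChan
}
```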
241 changes: 108 additions & 133 deletions libpod/pod_api.go
@@ -6,6 +6,7 @@ import (
"github.com/containers/podman/v2/libpod/define"
"github.com/containers/podman/v2/libpod/events"
"github.com/containers/podman/v2/pkg/cgroups"
"github.com/containers/podman/v2/pkg/parallel"
"github.com/containers/podman/v2/pkg/rootless"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
@@ -99,47 +100,52 @@ func (p *Pod) StopWithTimeout(ctx context.Context, cleanup bool, timeout int) (m
return nil, err
}

ctrErrors := make(map[string]error)

// TODO: There may be cases where it makes sense to order stops based on
// dependencies. Should we bother with this?

// Stop to all containers
for _, ctr := range allCtrs {
ctr.lock.Lock()
ctrErrChan := make(map[string]<-chan error)

if err := ctr.syncContainer(); err != nil {
ctr.lock.Unlock()
ctrErrors[ctr.ID()] = err
continue
}

// Ignore containers that are not running
if ctr.state.State != define.ContainerStateRunning {
ctr.lock.Unlock()
continue
}
stopTimeout := ctr.config.StopTimeout
if timeout > -1 {
stopTimeout = uint(timeout)
}
if err := ctr.stop(stopTimeout); err != nil {
ctr.lock.Unlock()
ctrErrors[ctr.ID()] = err
continue
}
// Enqueue a function for each container with the parallel executor.
for _, ctr := range allCtrs {
c := ctr
logrus.Debugf("Adding parallel job to stop container %s", c.ID())
retChan := parallel.Enqueue(ctx, func() error {
// TODO: Might be better to batch stop and cleanup
// together?
if timeout > -1 {
if err := c.StopWithTimeout(uint(timeout)); err != nil {
return err
}
} else {
if err := c.Stop(); err != nil {
return err
}
}

if cleanup {
if err := ctr.cleanup(ctx); err != nil {
ctrErrors[ctr.ID()] = err
if cleanup {
return c.Cleanup(ctx)
}
}

ctr.lock.Unlock()
return nil
})

ctrErrChan[c.ID()] = retChan
}

p.newPodEvent(events.Stop)

ctrErrors := make(map[string]error)

// Get returned error for every container we worked on
for id, channel := range ctrErrChan {
if err := <-channel; err != nil {
if errors.Cause(err) == define.ErrCtrStateInvalid || errors.Cause(err) == define.ErrCtrStopped {
continue
}
ctrErrors[id] = err
}
}

if len(ctrErrors) > 0 {
return ctrErrors, errors.Wrapf(define.ErrPodPartialFail, "error stopping some containers")
}
Expand Down Expand Up @@ -169,45 +175,29 @@ func (p *Pod) Cleanup(ctx context.Context) (map[string]error, error) {
return nil, err
}

ctrErrors := make(map[string]error)
ctrErrChan := make(map[string]<-chan error)

// Clean up all containers
// Enqueue a function for each container with the parallel executor.
for _, ctr := range allCtrs {
ctr.lock.Lock()

if err := ctr.syncContainer(); err != nil {
ctr.lock.Unlock()
ctrErrors[ctr.ID()] = err
continue
}

// Ignore containers that are running/paused
if !ctr.ensureState(define.ContainerStateConfigured, define.ContainerStateCreated, define.ContainerStateStopped, define.ContainerStateExited) {
ctr.lock.Unlock()
continue
}

// Check for running exec sessions, ignore containers with them.
sessions, err := ctr.getActiveExecSessions()
if err != nil {
ctr.lock.Unlock()
ctrErrors[ctr.ID()] = err
continue
}
if len(sessions) > 0 {
ctr.lock.Unlock()
continue
}
c := ctr
logrus.Debugf("Adding parallel job to clean up container %s", c.ID())
retChan := parallel.Enqueue(ctx, func() error {
return c.Cleanup(ctx)
})

// TODO: Should we handle restart policy here?
ctrErrChan[c.ID()] = retChan
}

ctr.newContainerEvent(events.Cleanup)
ctrErrors := make(map[string]error)

if err := ctr.cleanup(ctx); err != nil {
ctrErrors[ctr.ID()] = err
// Get returned error for every container we worked on
for id, channel := range ctrErrChan {
if err := <-channel; err != nil {
if errors.Cause(err) == define.ErrCtrStateInvalid || errors.Cause(err) == define.ErrCtrStopped {
continue
}
ctrErrors[id] = err
}

ctr.lock.Unlock()
}

if len(ctrErrors) > 0 {
@@ -229,7 +219,7 @@ func (p *Pod) Cleanup(ctx context.Context) (map[string]error, error) {
// containers. The container ID is mapped to the error encountered. The error is
// set to ErrPodPartialFail.
// If both error and the map are nil, all containers were paused without error
func (p *Pod) Pause() (map[string]error, error) {
func (p *Pod) Pause(ctx context.Context) (map[string]error, error) {
p.lock.Lock()
defer p.lock.Unlock()

@@ -252,37 +242,34 @@ func (p *Pod) Pause() (map[string]error, error) {
return nil, err
}

ctrErrors := make(map[string]error)
ctrErrChan := make(map[string]<-chan error)

// Pause to all containers
// Enqueue a function for each container with the parallel executor.
for _, ctr := range allCtrs {
ctr.lock.Lock()
c := ctr
logrus.Debugf("Adding parallel job to pause container %s", c.ID())
retChan := parallel.Enqueue(ctx, c.Pause)

if err := ctr.syncContainer(); err != nil {
ctr.lock.Unlock()
ctrErrors[ctr.ID()] = err
continue
}
ctrErrChan[c.ID()] = retChan
}

// Ignore containers that are not running
if ctr.state.State != define.ContainerStateRunning {
ctr.lock.Unlock()
continue
}
p.newPodEvent(events.Pause)

if err := ctr.pause(); err != nil {
ctr.lock.Unlock()
ctrErrors[ctr.ID()] = err
continue
}
ctrErrors := make(map[string]error)

ctr.lock.Unlock()
// Get returned error for every container we worked on
for id, channel := range ctrErrChan {
if err := <-channel; err != nil {
if errors.Cause(err) == define.ErrCtrStateInvalid || errors.Cause(err) == define.ErrCtrStopped {
continue
}
ctrErrors[id] = err
}
}

if len(ctrErrors) > 0 {
return ctrErrors, errors.Wrapf(define.ErrPodPartialFail, "error pausing some containers")
}
defer p.newPodEvent(events.Pause)
return nil, nil
}

@@ -298,7 +285,7 @@ func (p *Pod) Pause() (map[string]error, error) {
// containers. The container ID is mapped to the error encountered. The error is
// set to ErrPodPartialFail.
// If both error and the map are nil, all containers were unpaused without error.
func (p *Pod) Unpause() (map[string]error, error) {
func (p *Pod) Unpause(ctx context.Context) (map[string]error, error) {
p.lock.Lock()
defer p.lock.Unlock()

@@ -311,38 +298,34 @@ func (p *Pod) Unpause() (map[string]error, error) {
return nil, err
}

ctrErrors := make(map[string]error)
ctrErrChan := make(map[string]<-chan error)

// Pause to all containers
// Enqueue a function for each container with the parallel executor.
for _, ctr := range allCtrs {
ctr.lock.Lock()
c := ctr
logrus.Debugf("Adding parallel job to unpause container %s", c.ID())
retChan := parallel.Enqueue(ctx, c.Unpause)

if err := ctr.syncContainer(); err != nil {
ctr.lock.Unlock()
ctrErrors[ctr.ID()] = err
continue
}
ctrErrChan[c.ID()] = retChan
}

// Ignore containers that are not paused
if ctr.state.State != define.ContainerStatePaused {
ctr.lock.Unlock()
continue
}
p.newPodEvent(events.Unpause)

if err := ctr.unpause(); err != nil {
ctr.lock.Unlock()
ctrErrors[ctr.ID()] = err
continue
}
ctrErrors := make(map[string]error)

ctr.lock.Unlock()
// Get returned error for every container we worked on
for id, channel := range ctrErrChan {
if err := <-channel; err != nil {
if errors.Cause(err) == define.ErrCtrStateInvalid || errors.Cause(err) == define.ErrCtrStopped {
continue
}
ctrErrors[id] = err
}
}

if len(ctrErrors) > 0 {
return ctrErrors, errors.Wrapf(define.ErrPodPartialFail, "error unpausing some containers")
}

defer p.newPodEvent(events.Unpause)
return nil, nil
}

@@ -411,7 +394,7 @@ func (p *Pod) Restart(ctx context.Context) (map[string]error, error) {
// containers. The container ID is mapped to the error encountered. The error is
// set to ErrPodPartialFail.
// If both error and the map are nil, all containers were signalled successfully.
func (p *Pod) Kill(signal uint) (map[string]error, error) {
func (p *Pod) Kill(ctx context.Context, signal uint) (map[string]error, error) {
p.lock.Lock()
defer p.lock.Unlock()

@@ -424,44 +407,36 @@ func (p *Pod) Kill(signal uint) (map[string]error, error) {
return nil, err
}

ctrErrors := make(map[string]error)
ctrErrChan := make(map[string]<-chan error)

// Send a signal to all containers
// Enqueue a function for each container with the parallel executor.
for _, ctr := range allCtrs {
ctr.lock.Lock()

if err := ctr.syncContainer(); err != nil {
ctr.lock.Unlock()
ctrErrors[ctr.ID()] = err
continue
}
c := ctr
logrus.Debugf("Adding parallel job to kill container %s", c.ID())
retChan := parallel.Enqueue(ctx, func() error {
return c.Kill(signal)
})

// Ignore containers that are not running
if ctr.state.State != define.ContainerStateRunning {
ctr.lock.Unlock()
continue
}
ctrErrChan[c.ID()] = retChan
}

if err := ctr.ociRuntime.KillContainer(ctr, signal, false); err != nil {
ctr.lock.Unlock()
ctrErrors[ctr.ID()] = err
continue
}
p.newPodEvent(events.Kill)

logrus.Debugf("Killed container %s with signal %d", ctr.ID(), signal)
ctrErrors := make(map[string]error)

ctr.state.StoppedByUser = true
if err := ctr.save(); err != nil {
ctrErrors[ctr.ID()] = err
// Get returned error for every container we worked on
for id, channel := range ctrErrChan {
if err := <-channel; err != nil {
if errors.Cause(err) == define.ErrCtrStateInvalid || errors.Cause(err) == define.ErrCtrStopped {
continue
}
ctrErrors[id] = err
}

ctr.lock.Unlock()
}

if len(ctrErrors) > 0 {
return ctrErrors, errors.Wrapf(define.ErrPodPartialFail, "error killing some containers")
}
defer p.newPodEvent(events.Kill)
return nil, nil
}

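All five pod operations keep the same return contract after the conversion: a per-container error map plus a wrapping define.ErrPodPartialFail when any job failed. A hypothetical caller (not part of this PR; stopPod and the cleanup/timeout arguments are illustrative) would consume that contract roughly like this:

```go
package main

import (
	"context"

	"github.com/containers/podman/v2/libpod"
	"github.com/containers/podman/v2/libpod/define"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
)

// stopPod is a hypothetical helper: it stops (and cleans up) every
// container in the pod, logging per-container failures when the pod
// operation reports a partial failure.
func stopPod(ctx context.Context, pod *libpod.Pod, timeout int) error {
	ctrErrs, err := pod.StopWithTimeout(ctx, true, timeout)
	if err == nil {
		return nil
	}
	if errors.Cause(err) == define.ErrPodPartialFail {
		for id, ctrErr := range ctrErrs {
			logrus.Errorf("stopping container %s: %v", id, ctrErr)
		}
	}
	return err
}
```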
6 changes: 3 additions & 3 deletions pkg/api/handlers/libpod/pods.go
@@ -270,7 +270,7 @@ func PodPause(w http.ResponseWriter, r *http.Request) {
utils.PodNotFound(w, name, err)
return
}
responses, err := pod.Pause()
responses, err := pod.Pause(r.Context())
if err != nil && errors.Cause(err) != define.ErrPodPartialFail {
utils.Error(w, "Something went wrong", http.StatusInternalServerError, err)
return
@@ -294,7 +294,7 @@ func PodUnpause(w http.ResponseWriter, r *http.Request) {
utils.PodNotFound(w, name, err)
return
}
responses, err := pod.Unpause()
responses, err := pod.Unpause(r.Context())
if err != nil && errors.Cause(err) != define.ErrPodPartialFail {
utils.Error(w, "failed to pause pod", http.StatusInternalServerError, err)
return
@@ -402,7 +402,7 @@ func PodKill(w http.ResponseWriter, r *http.Request) {
return
}

responses, err := pod.Kill(uint(sig))
responses, err := pod.Kill(r.Context(), uint(sig))
if err != nil && errors.Cause(err) != define.ErrPodPartialFail {
utils.Error(w, "failed to kill pod", http.StatusInternalServerError, err)
return
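Because the handlers now thread r.Context() into the pod APIs, cancellation of the HTTP request can reach jobs that are still queued (assuming the executor checks the context, as in the Enqueue sketch above). A hypothetical variant of such a handler, with the helper name and 30-second deadline purely illustrative:

```go
package main

import (
	"context"
	"net/http"
	"time"

	"github.com/containers/podman/v2/libpod"
	"github.com/containers/podman/v2/libpod/define"
	"github.com/pkg/errors"
)

// killPodHandler is a hypothetical handler variant: it bounds the pod
// kill with a deadline derived from the request context, so jobs still
// waiting on the executor can be abandoned if the client disconnects.
func killPodHandler(w http.ResponseWriter, r *http.Request, pod *libpod.Pod, sig uint) {
	ctx, cancel := context.WithTimeout(r.Context(), 30*time.Second)
	defer cancel()

	if _, err := pod.Kill(ctx, sig); err != nil && errors.Cause(err) != define.ErrPodPartialFail {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.WriteHeader(http.StatusNoContent)
}
```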