Skip to content

Commit

Permalink
Merge pull request containers#7382 from mheon/pod_parallel
Browse files Browse the repository at this point in the history
Move pod jobs to parallel execution
  • Loading branch information
openshift-merge-robot authored Oct 7, 2020
2 parents 9ae873e + 55f5e4a commit 0e1d011
Show file tree
Hide file tree
Showing 8 changed files with 167 additions and 178 deletions.
241 changes: 108 additions & 133 deletions libpod/pod_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/containers/podman/v2/libpod/define"
"github.com/containers/podman/v2/libpod/events"
"github.com/containers/podman/v2/pkg/cgroups"
"github.com/containers/podman/v2/pkg/parallel"
"github.com/containers/podman/v2/pkg/rootless"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -99,47 +100,52 @@ func (p *Pod) StopWithTimeout(ctx context.Context, cleanup bool, timeout int) (m
return nil, err
}

ctrErrors := make(map[string]error)

// TODO: There may be cases where it makes sense to order stops based on
// dependencies. Should we bother with this?

// Stop to all containers
for _, ctr := range allCtrs {
ctr.lock.Lock()
ctrErrChan := make(map[string]<-chan error)

if err := ctr.syncContainer(); err != nil {
ctr.lock.Unlock()
ctrErrors[ctr.ID()] = err
continue
}

// Ignore containers that are not running
if ctr.state.State != define.ContainerStateRunning {
ctr.lock.Unlock()
continue
}
stopTimeout := ctr.config.StopTimeout
if timeout > -1 {
stopTimeout = uint(timeout)
}
if err := ctr.stop(stopTimeout); err != nil {
ctr.lock.Unlock()
ctrErrors[ctr.ID()] = err
continue
}
// Enqueue a function for each container with the parallel executor.
for _, ctr := range allCtrs {
c := ctr
logrus.Debugf("Adding parallel job to stop container %s", c.ID())
retChan := parallel.Enqueue(ctx, func() error {
// TODO: Might be better to batch stop and cleanup
// together?
if timeout > -1 {
if err := c.StopWithTimeout(uint(timeout)); err != nil {
return err
}
} else {
if err := c.Stop(); err != nil {
return err
}
}

if cleanup {
if err := ctr.cleanup(ctx); err != nil {
ctrErrors[ctr.ID()] = err
if cleanup {
return c.Cleanup(ctx)
}
}

ctr.lock.Unlock()
return nil
})

ctrErrChan[c.ID()] = retChan
}

p.newPodEvent(events.Stop)

ctrErrors := make(map[string]error)

// Get returned error for every container we worked on
for id, channel := range ctrErrChan {
if err := <-channel; err != nil {
if errors.Cause(err) == define.ErrCtrStateInvalid || errors.Cause(err) == define.ErrCtrStopped {
continue
}
ctrErrors[id] = err
}
}

if len(ctrErrors) > 0 {
return ctrErrors, errors.Wrapf(define.ErrPodPartialFail, "error stopping some containers")
}
Expand Down Expand Up @@ -169,45 +175,29 @@ func (p *Pod) Cleanup(ctx context.Context) (map[string]error, error) {
return nil, err
}

ctrErrors := make(map[string]error)
ctrErrChan := make(map[string]<-chan error)

// Clean up all containers
// Enqueue a function for each container with the parallel executor.
for _, ctr := range allCtrs {
ctr.lock.Lock()

if err := ctr.syncContainer(); err != nil {
ctr.lock.Unlock()
ctrErrors[ctr.ID()] = err
continue
}

// Ignore containers that are running/paused
if !ctr.ensureState(define.ContainerStateConfigured, define.ContainerStateCreated, define.ContainerStateStopped, define.ContainerStateExited) {
ctr.lock.Unlock()
continue
}

// Check for running exec sessions, ignore containers with them.
sessions, err := ctr.getActiveExecSessions()
if err != nil {
ctr.lock.Unlock()
ctrErrors[ctr.ID()] = err
continue
}
if len(sessions) > 0 {
ctr.lock.Unlock()
continue
}
c := ctr
logrus.Debugf("Adding parallel job to clean up container %s", c.ID())
retChan := parallel.Enqueue(ctx, func() error {
return c.Cleanup(ctx)
})

// TODO: Should we handle restart policy here?
ctrErrChan[c.ID()] = retChan
}

ctr.newContainerEvent(events.Cleanup)
ctrErrors := make(map[string]error)

if err := ctr.cleanup(ctx); err != nil {
ctrErrors[ctr.ID()] = err
// Get returned error for every container we worked on
for id, channel := range ctrErrChan {
if err := <-channel; err != nil {
if errors.Cause(err) == define.ErrCtrStateInvalid || errors.Cause(err) == define.ErrCtrStopped {
continue
}
ctrErrors[id] = err
}

ctr.lock.Unlock()
}

if len(ctrErrors) > 0 {
Expand All @@ -229,7 +219,7 @@ func (p *Pod) Cleanup(ctx context.Context) (map[string]error, error) {
// containers. The container ID is mapped to the error encountered. The error is
// set to ErrPodPartialFail.
// If both error and the map are nil, all containers were paused without error
func (p *Pod) Pause() (map[string]error, error) {
func (p *Pod) Pause(ctx context.Context) (map[string]error, error) {
p.lock.Lock()
defer p.lock.Unlock()

Expand All @@ -252,37 +242,34 @@ func (p *Pod) Pause() (map[string]error, error) {
return nil, err
}

ctrErrors := make(map[string]error)
ctrErrChan := make(map[string]<-chan error)

// Pause to all containers
// Enqueue a function for each container with the parallel executor.
for _, ctr := range allCtrs {
ctr.lock.Lock()
c := ctr
logrus.Debugf("Adding parallel job to pause container %s", c.ID())
retChan := parallel.Enqueue(ctx, c.Pause)

if err := ctr.syncContainer(); err != nil {
ctr.lock.Unlock()
ctrErrors[ctr.ID()] = err
continue
}
ctrErrChan[c.ID()] = retChan
}

// Ignore containers that are not running
if ctr.state.State != define.ContainerStateRunning {
ctr.lock.Unlock()
continue
}
p.newPodEvent(events.Pause)

if err := ctr.pause(); err != nil {
ctr.lock.Unlock()
ctrErrors[ctr.ID()] = err
continue
}
ctrErrors := make(map[string]error)

ctr.lock.Unlock()
// Get returned error for every container we worked on
for id, channel := range ctrErrChan {
if err := <-channel; err != nil {
if errors.Cause(err) == define.ErrCtrStateInvalid || errors.Cause(err) == define.ErrCtrStopped {
continue
}
ctrErrors[id] = err
}
}

if len(ctrErrors) > 0 {
return ctrErrors, errors.Wrapf(define.ErrPodPartialFail, "error pausing some containers")
}
defer p.newPodEvent(events.Pause)
return nil, nil
}

Expand All @@ -298,7 +285,7 @@ func (p *Pod) Pause() (map[string]error, error) {
// containers. The container ID is mapped to the error encountered. The error is
// set to ErrPodPartialFail.
// If both error and the map are nil, all containers were unpaused without error.
func (p *Pod) Unpause() (map[string]error, error) {
func (p *Pod) Unpause(ctx context.Context) (map[string]error, error) {
p.lock.Lock()
defer p.lock.Unlock()

Expand All @@ -311,38 +298,34 @@ func (p *Pod) Unpause() (map[string]error, error) {
return nil, err
}

ctrErrors := make(map[string]error)
ctrErrChan := make(map[string]<-chan error)

// Pause to all containers
// Enqueue a function for each container with the parallel executor.
for _, ctr := range allCtrs {
ctr.lock.Lock()
c := ctr
logrus.Debugf("Adding parallel job to unpause container %s", c.ID())
retChan := parallel.Enqueue(ctx, c.Unpause)

if err := ctr.syncContainer(); err != nil {
ctr.lock.Unlock()
ctrErrors[ctr.ID()] = err
continue
}
ctrErrChan[c.ID()] = retChan
}

// Ignore containers that are not paused
if ctr.state.State != define.ContainerStatePaused {
ctr.lock.Unlock()
continue
}
p.newPodEvent(events.Unpause)

if err := ctr.unpause(); err != nil {
ctr.lock.Unlock()
ctrErrors[ctr.ID()] = err
continue
}
ctrErrors := make(map[string]error)

ctr.lock.Unlock()
// Get returned error for every container we worked on
for id, channel := range ctrErrChan {
if err := <-channel; err != nil {
if errors.Cause(err) == define.ErrCtrStateInvalid || errors.Cause(err) == define.ErrCtrStopped {
continue
}
ctrErrors[id] = err
}
}

if len(ctrErrors) > 0 {
return ctrErrors, errors.Wrapf(define.ErrPodPartialFail, "error unpausing some containers")
}

defer p.newPodEvent(events.Unpause)
return nil, nil
}

Expand Down Expand Up @@ -411,7 +394,7 @@ func (p *Pod) Restart(ctx context.Context) (map[string]error, error) {
// containers. The container ID is mapped to the error encountered. The error is
// set to ErrPodPartialFail.
// If both error and the map are nil, all containers were signalled successfully.
func (p *Pod) Kill(signal uint) (map[string]error, error) {
func (p *Pod) Kill(ctx context.Context, signal uint) (map[string]error, error) {
p.lock.Lock()
defer p.lock.Unlock()

Expand All @@ -424,44 +407,36 @@ func (p *Pod) Kill(signal uint) (map[string]error, error) {
return nil, err
}

ctrErrors := make(map[string]error)
ctrErrChan := make(map[string]<-chan error)

// Send a signal to all containers
// Enqueue a function for each container with the parallel executor.
for _, ctr := range allCtrs {
ctr.lock.Lock()

if err := ctr.syncContainer(); err != nil {
ctr.lock.Unlock()
ctrErrors[ctr.ID()] = err
continue
}
c := ctr
logrus.Debugf("Adding parallel job to kill container %s", c.ID())
retChan := parallel.Enqueue(ctx, func() error {
return c.Kill(signal)
})

// Ignore containers that are not running
if ctr.state.State != define.ContainerStateRunning {
ctr.lock.Unlock()
continue
}
ctrErrChan[c.ID()] = retChan
}

if err := ctr.ociRuntime.KillContainer(ctr, signal, false); err != nil {
ctr.lock.Unlock()
ctrErrors[ctr.ID()] = err
continue
}
p.newPodEvent(events.Kill)

logrus.Debugf("Killed container %s with signal %d", ctr.ID(), signal)
ctrErrors := make(map[string]error)

ctr.state.StoppedByUser = true
if err := ctr.save(); err != nil {
ctrErrors[ctr.ID()] = err
// Get returned error for every container we worked on
for id, channel := range ctrErrChan {
if err := <-channel; err != nil {
if errors.Cause(err) == define.ErrCtrStateInvalid || errors.Cause(err) == define.ErrCtrStopped {
continue
}
ctrErrors[id] = err
}

ctr.lock.Unlock()
}

if len(ctrErrors) > 0 {
return ctrErrors, errors.Wrapf(define.ErrPodPartialFail, "error killing some containers")
}
defer p.newPodEvent(events.Kill)
return nil, nil
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/api/handlers/libpod/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func PodPause(w http.ResponseWriter, r *http.Request) {
utils.PodNotFound(w, name, err)
return
}
responses, err := pod.Pause()
responses, err := pod.Pause(r.Context())
if err != nil && errors.Cause(err) != define.ErrPodPartialFail {
utils.Error(w, "Something went wrong", http.StatusInternalServerError, err)
return
Expand All @@ -294,7 +294,7 @@ func PodUnpause(w http.ResponseWriter, r *http.Request) {
utils.PodNotFound(w, name, err)
return
}
responses, err := pod.Unpause()
responses, err := pod.Unpause(r.Context())
if err != nil && errors.Cause(err) != define.ErrPodPartialFail {
utils.Error(w, "failed to pause pod", http.StatusInternalServerError, err)
return
Expand Down Expand Up @@ -402,7 +402,7 @@ func PodKill(w http.ResponseWriter, r *http.Request) {
return
}

responses, err := pod.Kill(uint(sig))
responses, err := pod.Kill(r.Context(), uint(sig))
if err != nil && errors.Cause(err) != define.ErrPodPartialFail {
utils.Error(w, "failed to kill pod", http.StatusInternalServerError, err)
return
Expand Down
Loading

0 comments on commit 0e1d011

Please sign in to comment.