Skip to content

Commit

Permalink
merge clean pod to remove images
Browse files Browse the repository at this point in the history
  • Loading branch information
CMGS committed Aug 24, 2018
1 parent 642f23f commit 8abbd6f
Show file tree
Hide file tree
Showing 9 changed files with 438 additions and 624 deletions.
34 changes: 0 additions & 34 deletions cluster/calcium/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@ package calcium

import (
"context"
"sync"

enginetypes "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters"
"github.com/projecteru2/core/types"
log "github.com/sirupsen/logrus"
)
Expand All @@ -28,37 +25,6 @@ func (c *Calcium) RemovePod(ctx context.Context, podname string) error {
return c.store.RemovePod(ctx, podname)
}

// CleanPod clean pod images
func (c *Calcium) CleanPod(ctx context.Context, podname string, prune bool, images []string) error {
nodes, err := c.store.GetNodesByPod(ctx, podname)
if err != nil {
log.Debugf("[CleanPod] Error during GetNodesByPod %s %v", podname, err)
return err
}
wg := sync.WaitGroup{}
for _, node := range nodes {
wg.Add(1)
go func(node *types.Node) {
defer wg.Done()
for _, image := range images {
if _, err := node.Engine.ImageRemove(ctx, image, enginetypes.ImageRemoveOptions{PruneChildren: true}); err != nil {
log.Infof("[CleanPod] Clean %s pod %s node %s image", podname, node.Name, image)
} else {
log.Errorf("[CleanPod] Clean %s pod %s node %s image failed: %v", podname, node.Name, image, err)
}
}
if prune {
_, err := node.Engine.ImagesPrune(ctx, filters.NewArgs())
if err != nil {
log.Errorf("[CleanPod] Prune %s pod %s node failed: %v", podname, node.Name, err)
}
}
}(node)
}
wg.Wait()
return nil
}

// RemoveNode remove a node
func (c *Calcium) RemoveNode(ctx context.Context, nodename, podname string) (*types.Pod, error) {
n, err := c.GetNode(ctx, podname, nodename)
Expand Down
76 changes: 47 additions & 29 deletions cluster/calcium/remove_image.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,28 @@ import (
"sync"

enginetypes "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters"
"github.com/projecteru2/core/types"
log "github.com/sirupsen/logrus"
)

//RemoveImage remove images
func (c *Calcium) RemoveImage(ctx context.Context, podname, nodename string, images []string) (chan *types.RemoveImageMessage, error) {
// RemoveImage remove images
func (c *Calcium) RemoveImage(ctx context.Context, podname, nodename string, images []string, prune bool) (chan *types.RemoveImageMessage, error) {
ch := make(chan *types.RemoveImageMessage)

node, err := c.GetNode(ctx, podname, nodename)
if err != nil {
return ch, err
var err error
nodes := []*types.Node{}
if nodename != "" {
n, err := c.GetNode(ctx, podname, nodename)
if err != nil {
return ch, err
}
nodes = append(nodes, n)
} else {
nodes, err = c.store.GetNodesByPod(ctx, podname)
if err != nil {
return ch, err
}
}

opts := enginetypes.ImageRemoveOptions{
Expand All @@ -26,36 +38,42 @@ func (c *Calcium) RemoveImage(ctx context.Context, podname, nodename string, ima
go func() {
defer close(ch)
wg := sync.WaitGroup{}
wg.Add(len(images))
defer wg.Wait()

for _, image := range images {
go func(image string) {
for _, node := range nodes {
wg.Add(1)
go func(node *types.Node) {
defer wg.Done()

messages := []string{}
success := true
ms, err := node.Engine.ImageRemove(ctx, image, opts)
if err != nil {
success = false
messages = append(messages, err.Error())
} else {
for _, m := range ms {
if m.Untagged != "" {
messages = append(messages, fmt.Sprintf("Untagged: %s", m.Untagged))
}
if m.Deleted != "" {
messages = append(messages, fmt.Sprintf("Deleted: %s", m.Deleted))
for _, image := range images {
m := &types.RemoveImageMessage{
Success: false,
Image: image,
Messages: []string{},
}
if removeItems, err := node.Engine.ImageRemove(ctx, image, opts); err != nil {
m.Messages = append(m.Messages, err.Error())
} else {
m.Success = true
for _, item := range removeItems {
if item.Untagged != "" {
m.Messages = append(m.Messages, fmt.Sprintf("Untagged: %s", item.Untagged))
}
if item.Deleted != "" {
m.Messages = append(m.Messages, fmt.Sprintf("Deleted: %s", item.Deleted))
}
}
}
ch <- m
}
ch <- &types.RemoveImageMessage{
Image: image,
Success: success,
Messages: messages,
if prune {
_, err := node.Engine.ImagesPrune(ctx, filters.NewArgs())
if err != nil {
log.Errorf("[CleanPod] Prune %s pod %s node failed: %v", podname, node.Name, err)
} else {
log.Infof("[CleanPod] Prune %s pod %s node", podname, node.Name)
}
}
}(image)
}(node)
}
wg.Wait()
}()

return ch, nil
Expand Down
2 changes: 1 addition & 1 deletion cluster/calcium/remove_image_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ func TestRemoveImage(t *testing.T) {
initMockConfig()

images := []string{image}
ch, err := mockc.RemoveImage(context.Background(), podname, nodename, images)
ch, err := mockc.RemoveImage(context.Background(), podname, nodename, images, false)
assert.NoError(t, err)

for c := range ch {
Expand Down
3 changes: 1 addition & 2 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ type Cluster interface {
AddPod(ctx context.Context, podname, favor, desc string) (*types.Pod, error)
AddNode(ctx context.Context, nodename, endpoint, podname, ca, cert, key string, cpu, share int, memory int64, labels map[string]string) (*types.Node, error)
RemovePod(ctx context.Context, podname string) error
CleanPod(ctx context.Context, podname string, prune bool, images []string) error
RemoveNode(ctx context.Context, nodename, podname string) (*types.Pod, error)
ListPods(ctx context.Context) ([]*types.Pod, error)
ListPodNodes(ctx context.Context, podname string, all bool) ([]*types.Node, error)
Expand All @@ -48,7 +47,7 @@ type Cluster interface {
// cluster methods
Copy(ctx context.Context, opts *types.CopyOptions) (chan *types.CopyMessage, error)
BuildImage(ctx context.Context, opts *types.BuildOptions) (chan *types.BuildImageMessage, error)
RemoveImage(ctx context.Context, podname, nodename string, images []string) (chan *types.RemoveImageMessage, error)
RemoveImage(ctx context.Context, podname, nodename string, images []string, prune bool) (chan *types.RemoveImageMessage, error)
DeployStatusStream(ctx context.Context, appname, entrypoint, nodename string) chan *types.DeployStatus
RunAndWait(ctx context.Context, opts *types.DeployOptions, stdin io.ReadCloser) (chan *types.RunAndWaitMessage, error)
// this methods will not interrupt by client
Expand Down
Loading

0 comments on commit 8abbd6f

Please sign in to comment.