Skip to content

Commit

Permalink
Merge branch 'context_with_timeout' into 'master'
Browse files Browse the repository at this point in the history
context with timeout

See merge request !128
  • Loading branch information
CMGS committed Aug 9, 2017
2 parents 273a3f5 + 2f319ab commit ea6d3e6
Show file tree
Hide file tree
Showing 21 changed files with 345 additions and 176 deletions.
24 changes: 18 additions & 6 deletions cluster/calcium/build_image.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,10 @@ func (c *calcium) BuildImage(repository, version, uid, artifact string) (chan *t
}

log.Infof("Building image %v with artifact %v at %v:%v", tag, artifact, buildPodname, node.Name)
resp, err := node.Engine.ImageBuild(context.Background(), buildContext, buildOptions)

ctx, cancel := context.WithTimeout(context.Background(), c.config.Timeout.BuildImage)
defer cancel()
resp, err := node.Engine.ImageBuild(ctx, buildContext, buildOptions)
if err != nil {
return ch, err
}
Expand All @@ -196,9 +199,11 @@ func (c *calcium) BuildImage(repository, version, uid, artifact string) (chan *t
ch <- message
}

ctx, cancel := context.WithTimeout(context.Background(), c.config.Timeout.BuildImage)
defer cancel()
// About this "Khadgar", https://github.com/docker/docker/issues/10983#issuecomment-85892396
// Just because Ben Schnetzer's cute Khadgar...
rc, err := node.Engine.ImagePush(context.Background(), tag, enginetypes.ImagePushOptions{RegistryAuth: "Khadgar"})
rc, err := node.Engine.ImagePush(ctx, tag, enginetypes.ImagePushOptions{RegistryAuth: "Khadgar"})
if err != nil {
ch <- makeErrorBuildImageMessage(err)
return
Expand All @@ -222,10 +227,17 @@ func (c *calcium) BuildImage(repository, version, uid, artifact string) (chan *t
// 无论如何都删掉build机器的
// 事实上他不会跟cached pod一样
// 一样就砍死
go node.Engine.ImageRemove(context.Background(), tag, enginetypes.ImageRemoveOptions{
Force: false,
PruneChildren: true,
})
go func() {
ctx, cancel := context.WithTimeout(context.Background(), c.config.Timeout.BuildImage)
defer cancel()
_, err := node.Engine.ImageRemove(ctx, tag, enginetypes.ImageRemoveOptions{
Force: false,
PruneChildren: true,
})
if err != nil {
log.Errorf("Remove image error: %s", err)
}
}()

ch <- &types.BuildImageMessage{Status: "finished", Progress: tag}
}()
Expand Down
4 changes: 2 additions & 2 deletions cluster/calcium/build_image_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
func TestGetRandomNode(t *testing.T) {
store := &mockstore.MockStore{}
config := types.Config{}
c := &calcium{store: store, config: config, scheduler: simplescheduler.New(), network: calico.New(), source: gitlab.New(config.Git)}
c := &calcium{store: store, config: config, scheduler: simplescheduler.New(), network: calico.New(config), source: gitlab.New(config)}

n1 := &types.Node{Name: "node1", Podname: "podname", Endpoint: "tcp://10.0.0.1:2376", CPU: types.CPUMap{"0": 10, "1": 10}, Available: true}
n2 := &types.Node{Name: "node2", Podname: "podname", Endpoint: "tcp://10.0.0.2:2376", CPU: types.CPUMap{"0": 10, "1": 10}, Available: true}
Expand All @@ -34,7 +34,7 @@ func TestGetRandomNode(t *testing.T) {
func TestGetRandomNodeFail(t *testing.T) {
store := &mockstore.MockStore{}
config := types.Config{}
c := &calcium{store: store, config: config, scheduler: simplescheduler.New(), network: calico.New(), source: gitlab.New(config.Git)}
c := &calcium{store: store, config: config, scheduler: simplescheduler.New(), network: calico.New(config), source: gitlab.New(config)}

n1 := &types.Node{Name: "node1", Podname: "podname", Endpoint: "tcp://10.0.0.1:2376", CPU: types.CPUMap{"0": 10, "1": 10}, Available: false}
n2 := &types.Node{Name: "node2", Podname: "podname", Endpoint: "tcp://10.0.0.2:2376", CPU: types.CPUMap{"0": 10, "1": 10}, Available: false}
Expand Down
46 changes: 41 additions & 5 deletions cluster/calcium/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package calcium
import (
"fmt"
"strings"
"time"

"gitlab.ricebook.net/platform/core/network"
"gitlab.ricebook.net/platform/core/network/calico"
Expand Down Expand Up @@ -33,33 +34,68 @@ const (

// New returns a new cluster config
func New(config types.Config) (*calcium, error) {
var err error
// set timeout config
config = setTimeout(config)

// set store
store, err := etcdstore.New(config)
if err != nil {
return nil, err
}

// set scheduler
scheduler, err := complexscheduler.New(config)
if err != nil {
return nil, err
}
titanium := calico.New()

// set network
titanium := calico.New(config)

// set scm
var scm source.Source
scmtype := strings.ToLower(config.Git.SCMType)

switch scmtype {
case GITLAB:
scm = gitlab.New(config.Git)
scm = gitlab.New(config)
case GITHUB:
scm = github.New(config.Git)
scm = github.New(config)
default:
return nil, fmt.Errorf("Unkonwn SCM type: %s", config.Git.SCMType)
}

return &calcium{store: store, config: config, scheduler: scheduler, network: titanium, source: scm}, nil
}

// SetStore 用于在单元测试中更改etcd store为mock store
func (c *calcium) SetStore(s store.Store) {
c.store = s
}

func setTimeout(config types.Config) types.Config {
ct := config.Timeout

ct.Common *= time.Second
c := ct.Common
if ct.Backup *= time.Second; ct.Backup == 0 {
ct.Backup = c
}
if ct.BuildImage *= time.Second; ct.BuildImage == 0 {
ct.BuildImage = c
}
if ct.CreateContainer *= time.Second; ct.CreateContainer == 0 {
ct.CreateContainer = c
}
if ct.RemoveContainer *= time.Second; ct.RemoveContainer == 0 {
ct.RemoveContainer = c
}
if ct.RemoveImage *= time.Second; ct.RemoveImage == 0 {
ct.RemoveImage = c
}
if ct.RunAndWait *= time.Second; ct.RunAndWait == 0 {
ct.RunAndWait = c
}

config.Timeout = ct
return config
}
62 changes: 41 additions & 21 deletions cluster/calcium/create_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strconv"
"strings"
"sync"
"time"

log "github.com/Sirupsen/logrus"
enginetypes "github.com/docker/docker/api/types"
Expand Down Expand Up @@ -81,7 +82,9 @@ func (c *calcium) createContainerWithMemoryPrior(specs types.Specs, opts *types.

func (c *calcium) removeMemoryPodFailedContainer(id string, node *types.Node, nodeInfo types.NodeInfo, opts *types.DeployOptions) {
defer c.store.UpdateNodeMem(opts.Podname, nodeInfo.Name, opts.Memory, "+")
if err := node.Engine.ContainerRemove(context.Background(), id, enginetypes.ContainerRemoveOptions{}); err != nil {
ctx, cancel := context.WithTimeout(context.Background(), c.config.Timeout.CreateContainer)
defer cancel()
if err := node.Engine.ContainerRemove(ctx, id, enginetypes.ContainerRemoveOptions{}); err != nil {
log.Errorf("[RemoveMemoryPodFailedContainer] Error during remove failed container %v", err)
}
}
Expand All @@ -98,7 +101,10 @@ func (c *calcium) doCreateContainerWithMemoryPrior(nodeInfo types.NodeInfo, spec
return ms
}

if err := pullImage(node, opts.Image); err != nil {
if err := pullImage(node, opts.Image, c.config.Timeout.CreateContainer); err != nil {
for i := 0; i < nodeInfo.Deploy; i++ {
ms[i].Error = err.Error()
}
return ms
}

Expand All @@ -115,7 +121,9 @@ func (c *calcium) doCreateContainerWithMemoryPrior(nodeInfo types.NodeInfo, spec
}

//create container
container, err := node.Engine.ContainerCreate(context.Background(), config, hostConfig, networkConfig, containerName)
ctx, cancel := context.WithTimeout(context.Background(), c.config.Timeout.CreateContainer)
defer cancel()
container, err := node.Engine.ContainerCreate(ctx, config, hostConfig, networkConfig, containerName)
if err != nil {
log.Errorf("[CreateContainerWithMemoryPrior] Error during ContainerCreate, %v", err)
ms[i].Error = err.Error()
Expand Down Expand Up @@ -151,8 +159,9 @@ func (c *calcium) doCreateContainerWithMemoryPrior(nodeInfo types.NodeInfo, spec
continue
}
}

err = node.Engine.ContainerStart(context.Background(), container.ID, enginetypes.ContainerStartOptions{})
ctxStart, cancelStart := context.WithTimeout(context.Background(), c.config.Timeout.CreateContainer)
defer cancelStart()
err = node.Engine.ContainerStart(ctxStart, container.ID, enginetypes.ContainerStartOptions{})
if err != nil {
log.Errorf("[CreateContainerWithMemoryPrior] Error during ContainerStart, %v", err)
ms[i].Error = err.Error()
Expand All @@ -163,8 +172,9 @@ func (c *calcium) doCreateContainerWithMemoryPrior(nodeInfo types.NodeInfo, spec
// TODO
// if network manager uses our own, then connect must be called after container starts
// here

info, err := node.Engine.ContainerInspect(context.Background(), container.ID)
ctxInspect, cancelInspect := context.WithTimeout(context.Background(), c.config.Timeout.CreateContainer)
defer cancelInspect()
info, err := node.Engine.ContainerInspect(ctxInspect, container.ID)
if err != nil {
log.Errorf("[CreateContainerWithMemoryPrior] Error during ContainerInspect, %v", err)
ms[i].Error = err.Error()
Expand All @@ -174,7 +184,7 @@ func (c *calcium) doCreateContainerWithMemoryPrior(nodeInfo types.NodeInfo, spec
ms[i].ContainerID = info.ID

// after start
if err := runExec(node.Engine, info, AFTER_START); err != nil {
if err := runExec(node.Engine, info, AFTER_START, c.config.Timeout.CreateContainer); err != nil {
log.Errorf("[CreateContainerWithMemoryPrior] Run exec at %s error: %v", AFTER_START, err)
}

Expand Down Expand Up @@ -242,7 +252,9 @@ func (c *calcium) createContainerWithCPUPrior(specs types.Specs, opts *types.Dep

func (c *calcium) removeCPUPodFailedContainer(id string, node *types.Node, quota types.CPUMap) {
defer c.releaseQuota(node, quota)
if err := node.Engine.ContainerRemove(context.Background(), id, enginetypes.ContainerRemoveOptions{}); err != nil {
ctx, cancel := context.WithTimeout(context.Background(), c.config.Timeout.CreateContainer)
defer cancel()
if err := node.Engine.ContainerRemove(ctx, id, enginetypes.ContainerRemoveOptions{}); err != nil {
log.Errorf("[RemoveCPUPodFailedContainer] Error during remove failed container %v", err)
}
}
Expand All @@ -259,7 +271,10 @@ func (c *calcium) doCreateContainerWithCPUPrior(nodeName string, cpuMap []types.
return ms
}

if err := pullImage(node, opts.Image); err != nil {
if err := pullImage(node, opts.Image, c.config.Timeout.CreateContainer); err != nil {
for i := 0; i < len(ms); i++ {
ms[i].Error = err.Error()
}
return ms
}

Expand All @@ -277,7 +292,9 @@ func (c *calcium) doCreateContainerWithCPUPrior(nodeName string, cpuMap []types.
}

// create container
container, err := node.Engine.ContainerCreate(context.Background(), config, hostConfig, networkConfig, containerName)
ctxCreate, cancelCreate := context.WithTimeout(context.Background(), c.config.Timeout.CreateContainer)
defer cancelCreate()
container, err := node.Engine.ContainerCreate(ctxCreate, config, hostConfig, networkConfig, containerName)
if err != nil {
log.Errorf("[CreateContainerWithCPUPrior] Error when creating container, %v", err)
ms[i].Error = err.Error()
Expand Down Expand Up @@ -315,8 +332,9 @@ func (c *calcium) doCreateContainerWithCPUPrior(nodeName string, cpuMap []types.
continue
}
}

err = node.Engine.ContainerStart(context.Background(), container.ID, enginetypes.ContainerStartOptions{})
ctxStart, cancelStart := context.WithTimeout(context.Background(), c.config.Timeout.CreateContainer)
defer cancelStart()
err = node.Engine.ContainerStart(ctxStart, container.ID, enginetypes.ContainerStartOptions{})
if err != nil {
log.Errorf("[CreateContainerWithCPUPrior] Error when starting container, %v", err)
ms[i].Error = err.Error()
Expand All @@ -327,8 +345,9 @@ func (c *calcium) doCreateContainerWithCPUPrior(nodeName string, cpuMap []types.
// TODO
// if network manager uses our own, then connect must be called after container starts
// here

info, err := node.Engine.ContainerInspect(context.Background(), container.ID)
ctxInspect, cancelInspect := context.WithTimeout(context.Background(), c.config.Timeout.CreateContainer)
defer cancelInspect()
info, err := node.Engine.ContainerInspect(ctxInspect, container.ID)
if err != nil {
log.Errorf("[CreateContainerWithCPUPrior] Error when inspecting container, %v", err)
ms[i].Error = err.Error()
Expand All @@ -338,7 +357,7 @@ func (c *calcium) doCreateContainerWithCPUPrior(nodeName string, cpuMap []types.
ms[i].ContainerID = info.ID

// after start
if err := runExec(node.Engine, info, AFTER_START); err != nil {
if err := runExec(node.Engine, info, AFTER_START, c.config.Timeout.CreateContainer); err != nil {
log.Errorf("[CreateContainerWithCPUPrior] Run exec at %s error: %v", AFTER_START, err)
}

Expand Down Expand Up @@ -564,18 +583,19 @@ func (c *calcium) makeContainerOptions(index int, quota map[string]int, specs ty

// Pull an image
// Blocks until it finishes.
func pullImage(node *types.Node, image string) error {
func pullImage(node *types.Node, image string, timeout time.Duration) error {
log.Debugf("Pulling image %s", image)
if image == "" {
return fmt.Errorf("Goddamn empty image, WTF?")
}

resp, err := node.Engine.ImagePull(context.Background(), image, enginetypes.ImagePullOptions{})
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
outStream, err := node.Engine.ImagePull(ctx, image, enginetypes.ImagePullOptions{})
if err != nil {
log.Errorf("Error during pulling image %s: %v", image, err)
return err
}
ensureReaderClosed(resp)
ensureReaderClosed(outStream)
log.Debugf("Done pulling image %s", image)
return nil
return ctx.Err()
}
Loading

0 comments on commit ea6d3e6

Please sign in to comment.