Skip to content

Commit

Permalink
Merge branch 'refactor/timeout' into 'master'
Browse files Browse the repository at this point in the history
[skip ci] completely remove timeout from config

See merge request !141
  • Loading branch information
CMGS committed Aug 28, 2017
2 parents 74208e1 + 31ebc2e commit bb1e51d
Show file tree
Hide file tree
Showing 16 changed files with 57 additions and 172 deletions.
12 changes: 3 additions & 9 deletions cluster/calcium/build_image.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,7 @@ func (c *calcium) BuildImage(repository, version, uid, artifact string) (chan *t

log.Infof("Building image %v with artifact %v at %v:%v", tag, artifact, buildPodname, node.Name)

ctx, cancel := context.WithTimeout(context.Background(), c.config.Timeout.BuildImage)
defer cancel()
resp, err := node.Engine.ImageBuild(ctx, buildContext, buildOptions)
resp, err := node.Engine.ImageBuild(context.Background(), buildContext, buildOptions)
if err != nil {
return ch, err
}
Expand All @@ -200,11 +198,9 @@ func (c *calcium) BuildImage(repository, version, uid, artifact string) (chan *t
ch <- message
}

ctx, cancel := context.WithTimeout(context.Background(), c.config.Timeout.BuildImage)
defer cancel()
// About this "Khadgar", https://github.com/docker/docker/issues/10983#issuecomment-85892396
// Just because Ben Schnetzer's cute Khadgar...
rc, err := node.Engine.ImagePush(ctx, tag, enginetypes.ImagePushOptions{RegistryAuth: "Khadgar"})
rc, err := node.Engine.ImagePush(context.Background(), tag, enginetypes.ImagePushOptions{RegistryAuth: "Khadgar"})
if err != nil {
ch <- makeErrorBuildImageMessage(err)
return
Expand All @@ -229,9 +225,7 @@ func (c *calcium) BuildImage(repository, version, uid, artifact string) (chan *t
// 事实上他不会跟cached pod一样
// 一样就砍死
go func() {
ctx, cancel := context.WithTimeout(context.Background(), c.config.Timeout.BuildImage)
defer cancel()
_, err := node.Engine.ImageRemove(ctx, tag, enginetypes.ImageRemoveOptions{
_, err := node.Engine.ImageRemove(context.Background(), tag, enginetypes.ImageRemoveOptions{
Force: false,
PruneChildren: true,
})
Expand Down
4 changes: 2 additions & 2 deletions cluster/calcium/build_image_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
func TestGetRandomNode(t *testing.T) {
store := &mockstore.MockStore{}
config := types.Config{}
c := &calcium{store: store, config: config, scheduler: simplescheduler.New(), network: calico.New(config), source: gitlab.New(config)}
c := &calcium{store: store, config: config, scheduler: simplescheduler.New(), network: calico.New(), source: gitlab.New(config)}

n1 := &types.Node{Name: "node1", Podname: "podname", Endpoint: "tcp://10.0.0.1:2376", CPU: types.CPUMap{"0": 10, "1": 10}, Available: true}
n2 := &types.Node{Name: "node2", Podname: "podname", Endpoint: "tcp://10.0.0.2:2376", CPU: types.CPUMap{"0": 10, "1": 10}, Available: true}
Expand All @@ -34,7 +34,7 @@ func TestGetRandomNode(t *testing.T) {
func TestGetRandomNodeFail(t *testing.T) {
store := &mockstore.MockStore{}
config := types.Config{}
c := &calcium{store: store, config: config, scheduler: simplescheduler.New(), network: calico.New(config), source: gitlab.New(config)}
c := &calcium{store: store, config: config, scheduler: simplescheduler.New(), network: calico.New(), source: gitlab.New(config)}

n1 := &types.Node{Name: "node1", Podname: "podname", Endpoint: "tcp://10.0.0.1:2376", CPU: types.CPUMap{"0": 10, "1": 10}, Available: false}
n2 := &types.Node{Name: "node2", Podname: "podname", Endpoint: "tcp://10.0.0.2:2376", CPU: types.CPUMap{"0": 10, "1": 10}, Available: false}
Expand Down
2 changes: 1 addition & 1 deletion cluster/calcium/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func New(config types.Config) (*calcium, error) {
}

// set network
titanium := calico.New(config)
titanium := calico.New()

// set scm
var scm source.Source
Expand Down
36 changes: 10 additions & 26 deletions cluster/calcium/create_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,7 @@ func (c *calcium) createContainerWithMemoryPrior(specs types.Specs, opts *types.

func (c *calcium) removeMemoryPodFailedContainer(id string, node *types.Node, nodeInfo types.NodeInfo, opts *types.DeployOptions) {
defer c.store.UpdateNodeMem(opts.Podname, nodeInfo.Name, opts.Memory, "+")
ctx, cancel := context.WithTimeout(context.Background(), c.config.Timeout.CreateContainer)
defer cancel()
if err := node.Engine.ContainerRemove(ctx, id, enginetypes.ContainerRemoveOptions{}); err != nil {
if err := node.Engine.ContainerRemove(context.Background(), id, enginetypes.ContainerRemoveOptions{}); err != nil {
log.Errorf("[RemoveMemoryPodFailedContainer] Error during remove failed container %v", err)
}
}
Expand Down Expand Up @@ -127,9 +125,7 @@ func (c *calcium) doCreateContainerWithMemoryPrior(nodeInfo types.NodeInfo, spec
}

//create container
ctx, cancel := context.WithTimeout(context.Background(), c.config.Timeout.CreateContainer)
defer cancel()
container, err := node.Engine.ContainerCreate(ctx, config, hostConfig, networkConfig, containerName)
container, err := node.Engine.ContainerCreate(context.Background(), config, hostConfig, networkConfig, containerName)
if err != nil {
log.Errorf("[CreateContainerWithMemoryPrior] Error during ContainerCreate, %v", err)
ms[i].Error = err.Error()
Expand Down Expand Up @@ -165,9 +161,7 @@ func (c *calcium) doCreateContainerWithMemoryPrior(nodeInfo types.NodeInfo, spec
continue
}
}
ctxStart, cancelStart := context.WithTimeout(context.Background(), c.config.Timeout.CreateContainer)
defer cancelStart()
err = node.Engine.ContainerStart(ctxStart, container.ID, enginetypes.ContainerStartOptions{})
err = node.Engine.ContainerStart(context.Background(), container.ID, enginetypes.ContainerStartOptions{})
if err != nil {
log.Errorf("[CreateContainerWithMemoryPrior] Error during ContainerStart, %v", err)
ms[i].Error = err.Error()
Expand All @@ -178,9 +172,7 @@ func (c *calcium) doCreateContainerWithMemoryPrior(nodeInfo types.NodeInfo, spec
// TODO
// if network manager uses our own, then connect must be called after container starts
// here
ctxInspect, cancelInspect := context.WithTimeout(context.Background(), c.config.Timeout.CreateContainer)
defer cancelInspect()
info, err := node.Engine.ContainerInspect(ctxInspect, container.ID)
info, err := node.Engine.ContainerInspect(context.Background(), container.ID)
if err != nil {
log.Errorf("[CreateContainerWithMemoryPrior] Error during ContainerInspect, %v", err)
ms[i].Error = err.Error()
Expand All @@ -190,7 +182,7 @@ func (c *calcium) doCreateContainerWithMemoryPrior(nodeInfo types.NodeInfo, spec
ms[i].ContainerID = info.ID

// after start
if err := runExec(node.Engine, info, AFTER_START, c.config.Timeout.CreateContainer); err != nil {
if err := runExec(node.Engine, info, AFTER_START); err != nil {
log.Errorf("[CreateContainerWithMemoryPrior] Run exec at %s error: %v", AFTER_START, err)
}

Expand Down Expand Up @@ -259,9 +251,7 @@ func (c *calcium) createContainerWithCPUPrior(specs types.Specs, opts *types.Dep

func (c *calcium) removeCPUPodFailedContainer(id string, node *types.Node, quota types.CPUMap) {
defer c.releaseQuota(node, quota)
ctx, cancel := context.WithTimeout(context.Background(), c.config.Timeout.CreateContainer)
defer cancel()
if err := node.Engine.ContainerRemove(ctx, id, enginetypes.ContainerRemoveOptions{}); err != nil {
if err := node.Engine.ContainerRemove(context.Background(), id, enginetypes.ContainerRemoveOptions{}); err != nil {
log.Errorf("[RemoveCPUPodFailedContainer] Error during remove failed container %v", err)
}
}
Expand Down Expand Up @@ -301,9 +291,7 @@ func (c *calcium) doCreateContainerWithCPUPrior(nodeName string, cpuMap []types.
}

// create container
ctxCreate, cancelCreate := context.WithTimeout(context.Background(), c.config.Timeout.CreateContainer)
defer cancelCreate()
container, err := node.Engine.ContainerCreate(ctxCreate, config, hostConfig, networkConfig, containerName)
container, err := node.Engine.ContainerCreate(context.Background(), config, hostConfig, networkConfig, containerName)
if err != nil {
log.Errorf("[CreateContainerWithCPUPrior] Error when creating container, %v", err)
ms[i].Error = err.Error()
Expand Down Expand Up @@ -341,9 +329,7 @@ func (c *calcium) doCreateContainerWithCPUPrior(nodeName string, cpuMap []types.
continue
}
}
ctxStart, cancelStart := context.WithTimeout(context.Background(), c.config.Timeout.CreateContainer)
defer cancelStart()
err = node.Engine.ContainerStart(ctxStart, container.ID, enginetypes.ContainerStartOptions{})
err = node.Engine.ContainerStart(context.Background(), container.ID, enginetypes.ContainerStartOptions{})
if err != nil {
log.Errorf("[CreateContainerWithCPUPrior] Error when starting container, %v", err)
ms[i].Error = err.Error()
Expand All @@ -354,9 +340,7 @@ func (c *calcium) doCreateContainerWithCPUPrior(nodeName string, cpuMap []types.
// TODO
// if network manager uses our own, then connect must be called after container starts
// here
ctxInspect, cancelInspect := context.WithTimeout(context.Background(), c.config.Timeout.CreateContainer)
defer cancelInspect()
info, err := node.Engine.ContainerInspect(ctxInspect, container.ID)
info, err := node.Engine.ContainerInspect(context.Background(), container.ID)
if err != nil {
log.Errorf("[CreateContainerWithCPUPrior] Error when inspecting container, %v", err)
ms[i].Error = err.Error()
Expand All @@ -366,7 +350,7 @@ func (c *calcium) doCreateContainerWithCPUPrior(nodeName string, cpuMap []types.
ms[i].ContainerID = info.ID

// after start
if err := runExec(node.Engine, info, AFTER_START, c.config.Timeout.CreateContainer); err != nil {
if err := runExec(node.Engine, info, AFTER_START); err != nil {
log.Errorf("[CreateContainerWithCPUPrior] Run exec at %s error: %v", AFTER_START, err)
}

Expand Down
11 changes: 3 additions & 8 deletions cluster/calcium/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"os"
"path/filepath"
"strings"
"time"

log "github.com/Sirupsen/logrus"
enginetypes "github.com/docker/docker/api/types"
Expand Down Expand Up @@ -188,7 +187,7 @@ func makeMountPaths(specs types.Specs, config types.Config) ([]string, map[strin

// 跑存在labels里的exec
// 为什么要存labels呢, 因为下线容器的时候根本不知道entrypoint是啥
func runExec(client *engineapi.Client, container enginetypes.ContainerJSON, label string, timeout time.Duration) error {
func runExec(client *engineapi.Client, container enginetypes.ContainerJSON, label string) error {
cmd, ok := container.Config.Labels[label]
if !ok || cmd == "" {
log.Debugf("No %s found in container %s", label, container.ID)
Expand All @@ -197,14 +196,10 @@ func runExec(client *engineapi.Client, container enginetypes.ContainerJSON, labe

cmds := utils.MakeCommandLineArgs(cmd)
execConfig := enginetypes.ExecConfig{User: container.Config.User, Cmd: cmds}
ctxExec, cancelCreate := context.WithTimeout(context.Background(), timeout)
defer cancelCreate()
resp, err := client.ContainerExecCreate(ctxExec, container.ID, execConfig)
resp, err := client.ContainerExecCreate(context.Background(), container.ID, execConfig)
if err != nil {
log.Errorf("Error during runExec: %v", err)
return err
}
ctxStart, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
return client.ContainerExecStart(ctxStart, resp.ID, enginetypes.ExecStartCheck{})
return client.ContainerExecStart(context.Background(), resp.ID, enginetypes.ExecStartCheck{})
}
13 changes: 4 additions & 9 deletions cluster/calcium/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"strings"
"sync"
"time"

log "github.com/Sirupsen/logrus"
enginetypes "github.com/docker/docker/api/types"
Expand Down Expand Up @@ -63,7 +62,7 @@ func (c *calcium) cleanImage(podname, image string) error {
wg.Add(1)
go func(node *types.Node) {
defer wg.Done()
if err := cleanImageOnNode(node, image, c.config.ImageCache, c.config.Timeout.RemoveImage); err != nil {
if err := cleanImageOnNode(node, image, c.config.ImageCache); err != nil {
log.Errorf("cleanImageOnNode error: %s", err)
}
}(node)
Expand All @@ -90,14 +89,12 @@ func (x imageList) Less(i, j int) bool { return x[i].Created > x[j].Created }
// 清理一个node上的这个image
// 只清理同名字不同tag的
// 并且保留最新的两个
func cleanImageOnNode(node *types.Node, image string, count int, timeout time.Duration) error {
func cleanImageOnNode(node *types.Node, image string, count int) error {
log.Debugf("[cleanImageOnNode] node: %s, image: %s", node.Name, strings.Split(image, ":")[0])
imgListFilter := filters.NewArgs()
image = normalizeImage(image)
imgListFilter.Add("reference", image) // 相同repo的image
ctxList, cancelList := context.WithTimeout(context.Background(), timeout)
defer cancelList()
images, err := node.Engine.ImageList(ctxList, enginetypes.ImageListOptions{Filters: imgListFilter})
images, err := node.Engine.ImageList(context.Background(), enginetypes.ImageListOptions{Filters: imgListFilter})
if err != nil {
return err
}
Expand All @@ -110,15 +107,13 @@ func cleanImageOnNode(node *types.Node, image string, count int, timeout time.Du
log.Debugf("Delete Images: %v", images)

for _, image := range images {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
_, err := node.Engine.ImageRemove(ctx, image.ID, enginetypes.ImageRemoveOptions{
_, err := node.Engine.ImageRemove(context.Background(), image.ID, enginetypes.ImageRemoveOptions{
Force: false,
PruneChildren: true,
})
if err != nil {
log.Errorf("[cleanImageOnNode] Node %s ImageRemove error: %s, imageID: %s", node.Name, err, image.ID)
}
cancel()
}
return nil
}
6 changes: 3 additions & 3 deletions cluster/calcium/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
func TestListPods(t *testing.T) {
store := &mockstore.MockStore{}
config := types.Config{}
c := &calcium{store: store, config: config, scheduler: simplescheduler.New(), network: calico.New(config), source: gitlab.New(config)}
c := &calcium{store: store, config: config, scheduler: simplescheduler.New(), network: calico.New(), source: gitlab.New(config)}

store.On("GetAllPods").Return([]*types.Pod{
&types.Pod{Name: "pod1", Desc: "desc1"},
Expand All @@ -36,7 +36,7 @@ func TestListPods(t *testing.T) {
func TestAddPod(t *testing.T) {
store := &mockstore.MockStore{}
config := types.Config{}
c := &calcium{store: store, config: config, scheduler: simplescheduler.New(), network: calico.New(config), source: gitlab.New(config)}
c := &calcium{store: store, config: config, scheduler: simplescheduler.New(), network: calico.New(), source: gitlab.New(config)}

store.On("AddPod", "pod1", "", "desc1").Return(&types.Pod{Name: "pod1", Favor: "MEM", Desc: "desc1"}, nil)
store.On("AddPod", "pod2", "", "desc2").Return(nil, fmt.Errorf("Etcd Error"))
Expand All @@ -55,7 +55,7 @@ func TestAddPod(t *testing.T) {
func TestGetPods(t *testing.T) {
store := &mockstore.MockStore{}
config := types.Config{}
c := &calcium{store: store, config: config, scheduler: simplescheduler.New(), network: calico.New(config), source: gitlab.New(config)}
c := &calcium{store: store, config: config, scheduler: simplescheduler.New(), network: calico.New(), source: gitlab.New(config)}

store.On("GetPod", "pod1").Return(&types.Pod{Name: "pod1", Desc: "desc1"}, nil).Once()
store.On("GetPod", "pod2").Return(nil, fmt.Errorf("Not found")).Once()
Expand Down
14 changes: 5 additions & 9 deletions cluster/calcium/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,15 @@ var (
Hub: "hub.testhub.com",
HubPrefix: "apps",
},
Timeout: coretypes.TimeoutConfig{
Common: 3,
},
Scheduler: coretypes.SchedConfig{
ShareBase: 10,
MaxShare: -1,
},
}
mockc *calcium
mockStore *mockstore.MockStore
err error
mockTimeoutError bool
specs = coretypes.Specs{
mockc *calcium
mockStore *mockstore.MockStore
err error
specs = coretypes.Specs{
Appname: "root",
Entrypoints: map[string]coretypes.Entrypoint{
"test": coretypes.Entrypoint{
Expand Down Expand Up @@ -291,7 +287,7 @@ func (tf transportFunc) RoundTrip(req *http.Request) (*http.Response, error) {
}

func initMockConfig() {
mockc, err = New(utils.SetTimeout(config))
mockc, err = New(config)
if err != nil {
panic(err)
}
Expand Down
11 changes: 4 additions & 7 deletions cluster/calcium/realloc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"sync"
"time"

log "github.com/Sirupsen/logrus"
enginecontainer "github.com/docker/docker/api/types/container"
Expand Down Expand Up @@ -149,7 +148,7 @@ func (c *calcium) doUpdateContainerWithMemoryPrior(
// CPUQuota not cpu
newResource := c.makeMemoryPriorSetting(newMemory, float64(newCPUQuota)/float64(utils.CpuPeriodBase))
updateConfig := enginecontainer.UpdateConfig{Resources: newResource}
if err := reSetContainer(containerJSON.ID, node, updateConfig, c.config.Timeout.Realloc); err != nil {
if err := reSetContainer(containerJSON.ID, node, updateConfig); err != nil {
log.Errorf("[realloc] update container failed %v, %s", err, containerJSON.ID)
ch <- &types.ReallocResourceMessage{ContainerID: containerJSON.ID, Success: false}
// 如果是增加内存,失败的时候应该把内存还回去
Expand Down Expand Up @@ -258,7 +257,7 @@ func (c *calcium) doReallocContainersWithCPUPrior(
for index, container := range containers {
resource := c.makeCPUPriorSetting(cpuset[index])
updateConfig := enginecontainer.UpdateConfig{Resources: resource}
if err := reSetContainer(container.ID, node, updateConfig, c.config.Timeout.Realloc); err != nil {
if err := reSetContainer(container.ID, node, updateConfig); err != nil {
log.Errorf("[realloc] update container failed %v", err)
// TODO 这里理论上是可以恢复 CPU 占用表的,一来我们知道新的占用是怎样,二来我们也晓得老的占用是啥样
ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: false}
Expand All @@ -268,9 +267,7 @@ func (c *calcium) doReallocContainersWithCPUPrior(
}
}

func reSetContainer(ID string, node *types.Node, config enginecontainer.UpdateConfig, timeout time.Duration) error {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
_, err := node.Engine.ContainerUpdate(ctx, ID, config)
func reSetContainer(ID string, node *types.Node, config enginecontainer.UpdateConfig) error {
_, err := node.Engine.ContainerUpdate(context.Background(), ID, config)
return err
}
12 changes: 6 additions & 6 deletions cluster/calcium/remove_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sync"
"time"

log "github.com/Sirupsen/logrus"
enginetypes "github.com/docker/docker/api/types"
Expand Down Expand Up @@ -141,12 +142,13 @@ func (c *calcium) removeOneContainer(container *types.Container, info enginetype
}()

// before stop
if err := runExec(container.Engine, info, BEFORE_STOP, c.config.Timeout.RemoveContainer); err != nil {
if err := runExec(container.Engine, info, BEFORE_STOP); err != nil {
log.Errorf("Run exec at %s error: %s", BEFORE_STOP, err.Error())
}

// FUXK docker
err = container.Engine.ContainerStop(context.Background(), info.ID, &c.config.Timeout.RemoveContainer)
// 一分钟还停不下来的话, 就好自为之吧
timeout := time.Minute * 2
err = container.Engine.ContainerStop(context.Background(), info.ID, &timeout)
if err != nil {
log.Errorf("Error during ContainerStop: %s", err.Error())
return err
Expand All @@ -156,9 +158,7 @@ func (c *calcium) removeOneContainer(container *types.Container, info enginetype
RemoveVolumes: true,
Force: true,
}
ctxRemove, cancelRemove := context.WithTimeout(context.Background(), c.config.Timeout.RemoveContainer)
defer cancelRemove()
err = container.Engine.ContainerRemove(ctxRemove, info.ID, rmOpts)
err = container.Engine.ContainerRemove(context.Background(), info.ID, rmOpts)
if err != nil {
log.Errorf("Error during ContainerRemove: %s", err.Error())
return err
Expand Down
Loading

0 comments on commit bb1e51d

Please sign in to comment.