From 2f319ab36d5b23730621718e855608b746a96142 Mon Sep 17 00:00:00 2001 From: wrfly Date: Mon, 7 Aug 2017 14:52:24 +0800 Subject: [PATCH] add timeout to context of engine actions fix test [error logs] timeout test --- cluster/calcium/build_image.go | 24 +++-- cluster/calcium/build_image_test.go | 4 +- cluster/calcium/cluster.go | 46 +++++++++- cluster/calcium/create_container.go | 62 ++++++++----- cluster/calcium/create_container_test.go | 111 +++++++++++++---------- cluster/calcium/helper.go | 12 ++- cluster/calcium/image.go | 15 ++- cluster/calcium/meta_test.go | 6 +- cluster/calcium/mock_test.go | 78 ++++++++++++---- cluster/calcium/remove_container.go | 12 ++- cluster/calcium/remove_image.go | 4 +- cluster/calcium/run_and_wait.go | 8 +- cluster/calcium/run_and_wait_test.go | 1 - core.yaml.sample | 10 +- main.go | 4 +- network/calico/plugin.go | 23 +++-- rpc/rpc_test.go | 19 ++-- source/github/manganese.go | 7 +- source/gitlab/cesium.go | 7 +- source/source_test.go | 24 +++-- types/config.go | 44 +++++---- 21 files changed, 345 insertions(+), 176 deletions(-) diff --git a/cluster/calcium/build_image.go b/cluster/calcium/build_image.go index 86cde6352..290f9261f 100644 --- a/cluster/calcium/build_image.go +++ b/cluster/calcium/build_image.go @@ -174,7 +174,10 @@ func (c *calcium) BuildImage(repository, version, uid, artifact string) (chan *t } log.Infof("Building image %v with artifact %v at %v:%v", tag, artifact, buildPodname, node.Name) - resp, err := node.Engine.ImageBuild(context.Background(), buildContext, buildOptions) + + ctx, cancel := context.WithTimeout(context.Background(), c.config.Timeout.BuildImage) + defer cancel() + resp, err := node.Engine.ImageBuild(ctx, buildContext, buildOptions) if err != nil { return ch, err } @@ -196,9 +199,11 @@ func (c *calcium) BuildImage(repository, version, uid, artifact string) (chan *t ch <- message } + ctx, cancel := context.WithTimeout(context.Background(), c.config.Timeout.BuildImage) + defer cancel() // About this "Khadgar", https://github.com/docker/docker/issues/10983#issuecomment-85892396 // Just because Ben Schnetzer's cute Khadgar... - rc, err := node.Engine.ImagePush(context.Background(), tag, enginetypes.ImagePushOptions{RegistryAuth: "Khadgar"}) + rc, err := node.Engine.ImagePush(ctx, tag, enginetypes.ImagePushOptions{RegistryAuth: "Khadgar"}) if err != nil { ch <- makeErrorBuildImageMessage(err) return @@ -222,10 +227,17 @@ func (c *calcium) BuildImage(repository, version, uid, artifact string) (chan *t // 无论如何都删掉build机器的 // 事实上他不会跟cached pod一样 // 一样就砍死 - go node.Engine.ImageRemove(context.Background(), tag, enginetypes.ImageRemoveOptions{ - Force: false, - PruneChildren: true, - }) + go func() { + ctx, cancel := context.WithTimeout(context.Background(), c.config.Timeout.BuildImage) + defer cancel() + _, err := node.Engine.ImageRemove(ctx, tag, enginetypes.ImageRemoveOptions{ + Force: false, + PruneChildren: true, + }) + if err != nil { + log.Errorf("Remove image error: %s", err) + } + }() ch <- &types.BuildImageMessage{Status: "finished", Progress: tag} }() diff --git a/cluster/calcium/build_image_test.go b/cluster/calcium/build_image_test.go index 79a6e6e20..539aba6ca 100644 --- a/cluster/calcium/build_image_test.go +++ b/cluster/calcium/build_image_test.go @@ -17,7 +17,7 @@ import ( func TestGetRandomNode(t *testing.T) { store := &mockstore.MockStore{} config := types.Config{} - c := &calcium{store: store, config: config, scheduler: simplescheduler.New(), network: calico.New(), source: gitlab.New(config.Git)} + c := &calcium{store: store, config: config, scheduler: simplescheduler.New(), network: calico.New(config), source: gitlab.New(config)} n1 := &types.Node{Name: "node1", Podname: "podname", Endpoint: "tcp://10.0.0.1:2376", CPU: types.CPUMap{"0": 10, "1": 10}, Available: true} n2 := &types.Node{Name: "node2", Podname: "podname", Endpoint: "tcp://10.0.0.2:2376", CPU: types.CPUMap{"0": 10, "1": 10}, Available: true} @@ -34,7 +34,7 @@ func TestGetRandomNode(t *testing.T) { func TestGetRandomNodeFail(t *testing.T) { store := &mockstore.MockStore{} config := types.Config{} - c := &calcium{store: store, config: config, scheduler: simplescheduler.New(), network: calico.New(), source: gitlab.New(config.Git)} + c := &calcium{store: store, config: config, scheduler: simplescheduler.New(), network: calico.New(config), source: gitlab.New(config)} n1 := &types.Node{Name: "node1", Podname: "podname", Endpoint: "tcp://10.0.0.1:2376", CPU: types.CPUMap{"0": 10, "1": 10}, Available: false} n2 := &types.Node{Name: "node2", Podname: "podname", Endpoint: "tcp://10.0.0.2:2376", CPU: types.CPUMap{"0": 10, "1": 10}, Available: false} diff --git a/cluster/calcium/cluster.go b/cluster/calcium/cluster.go index ad1b0f2e7..27ab53a97 100644 --- a/cluster/calcium/cluster.go +++ b/cluster/calcium/cluster.go @@ -3,6 +3,7 @@ package calcium import ( "fmt" "strings" + "time" "gitlab.ricebook.net/platform/core/network" "gitlab.ricebook.net/platform/core/network/calico" @@ -33,26 +34,32 @@ const ( // New returns a new cluster config func New(config types.Config) (*calcium, error) { - var err error + // set timeout config + config = setTimeout(config) + + // set store store, err := etcdstore.New(config) if err != nil { return nil, err } + // set scheduler scheduler, err := complexscheduler.New(config) if err != nil { return nil, err } - titanium := calico.New() + // set network + titanium := calico.New(config) + + // set scm var scm source.Source scmtype := strings.ToLower(config.Git.SCMType) - switch scmtype { case GITLAB: - scm = gitlab.New(config.Git) + scm = gitlab.New(config) case GITHUB: - scm = github.New(config.Git) + scm = github.New(config) default: return nil, fmt.Errorf("Unkonwn SCM type: %s", config.Git.SCMType) } @@ -60,6 +67,35 @@ func New(config types.Config) (*calcium, error) { return &calcium{store: store, config: config, scheduler: scheduler, network: titanium, source: scm}, nil } +// SetStore 用于在单元测试中更改etcd store为mock store func (c *calcium) SetStore(s store.Store) { c.store = s } + +func setTimeout(config types.Config) types.Config { + ct := config.Timeout + + ct.Common *= time.Second + c := ct.Common + if ct.Backup *= time.Second; ct.Backup == 0 { + ct.Backup = c + } + if ct.BuildImage *= time.Second; ct.BuildImage == 0 { + ct.BuildImage = c + } + if ct.CreateContainer *= time.Second; ct.CreateContainer == 0 { + ct.CreateContainer = c + } + if ct.RemoveContainer *= time.Second; ct.RemoveContainer == 0 { + ct.RemoveContainer = c + } + if ct.RemoveImage *= time.Second; ct.RemoveImage == 0 { + ct.RemoveImage = c + } + if ct.RunAndWait *= time.Second; ct.RunAndWait == 0 { + ct.RunAndWait = c + } + + config.Timeout = ct + return config +} diff --git a/cluster/calcium/create_container.go b/cluster/calcium/create_container.go index 4a7f777df..662bee059 100644 --- a/cluster/calcium/create_container.go +++ b/cluster/calcium/create_container.go @@ -7,6 +7,7 @@ import ( "strconv" "strings" "sync" + "time" log "github.com/Sirupsen/logrus" enginetypes "github.com/docker/docker/api/types" @@ -81,7 +82,9 @@ func (c *calcium) createContainerWithMemoryPrior(specs types.Specs, opts *types. func (c *calcium) removeMemoryPodFailedContainer(id string, node *types.Node, nodeInfo types.NodeInfo, opts *types.DeployOptions) { defer c.store.UpdateNodeMem(opts.Podname, nodeInfo.Name, opts.Memory, "+") - if err := node.Engine.ContainerRemove(context.Background(), id, enginetypes.ContainerRemoveOptions{}); err != nil { + ctx, cancel := context.WithTimeout(context.Background(), c.config.Timeout.CreateContainer) + defer cancel() + if err := node.Engine.ContainerRemove(ctx, id, enginetypes.ContainerRemoveOptions{}); err != nil { log.Errorf("[RemoveMemoryPodFailedContainer] Error during remove failed container %v", err) } } @@ -98,7 +101,10 @@ func (c *calcium) doCreateContainerWithMemoryPrior(nodeInfo types.NodeInfo, spec return ms } - if err := pullImage(node, opts.Image); err != nil { + if err := pullImage(node, opts.Image, c.config.Timeout.CreateContainer); err != nil { + for i := 0; i < nodeInfo.Deploy; i++ { + ms[i].Error = err.Error() + } return ms } @@ -115,7 +121,9 @@ func (c *calcium) doCreateContainerWithMemoryPrior(nodeInfo types.NodeInfo, spec } //create container - container, err := node.Engine.ContainerCreate(context.Background(), config, hostConfig, networkConfig, containerName) + ctx, cancel := context.WithTimeout(context.Background(), c.config.Timeout.CreateContainer) + defer cancel() + container, err := node.Engine.ContainerCreate(ctx, config, hostConfig, networkConfig, containerName) if err != nil { log.Errorf("[CreateContainerWithMemoryPrior] Error during ContainerCreate, %v", err) ms[i].Error = err.Error() @@ -151,8 +159,9 @@ func (c *calcium) doCreateContainerWithMemoryPrior(nodeInfo types.NodeInfo, spec continue } } - - err = node.Engine.ContainerStart(context.Background(), container.ID, enginetypes.ContainerStartOptions{}) + ctxStart, cancelStart := context.WithTimeout(context.Background(), c.config.Timeout.CreateContainer) + defer cancelStart() + err = node.Engine.ContainerStart(ctxStart, container.ID, enginetypes.ContainerStartOptions{}) if err != nil { log.Errorf("[CreateContainerWithMemoryPrior] Error during ContainerStart, %v", err) ms[i].Error = err.Error() @@ -163,8 +172,9 @@ func (c *calcium) doCreateContainerWithMemoryPrior(nodeInfo types.NodeInfo, spec // TODO // if network manager uses our own, then connect must be called after container starts // here - - info, err := node.Engine.ContainerInspect(context.Background(), container.ID) + ctxInspect, cancelInspect := context.WithTimeout(context.Background(), c.config.Timeout.CreateContainer) + defer cancelInspect() + info, err := node.Engine.ContainerInspect(ctxInspect, container.ID) if err != nil { log.Errorf("[CreateContainerWithMemoryPrior] Error during ContainerInspect, %v", err) ms[i].Error = err.Error() @@ -174,7 +184,7 @@ func (c *calcium) doCreateContainerWithMemoryPrior(nodeInfo types.NodeInfo, spec ms[i].ContainerID = info.ID // after start - if err := runExec(node.Engine, info, AFTER_START); err != nil { + if err := runExec(node.Engine, info, AFTER_START, c.config.Timeout.CreateContainer); err != nil { log.Errorf("[CreateContainerWithMemoryPrior] Run exec at %s error: %v", AFTER_START, err) } @@ -242,7 +252,9 @@ func (c *calcium) createContainerWithCPUPrior(specs types.Specs, opts *types.Dep func (c *calcium) removeCPUPodFailedContainer(id string, node *types.Node, quota types.CPUMap) { defer c.releaseQuota(node, quota) - if err := node.Engine.ContainerRemove(context.Background(), id, enginetypes.ContainerRemoveOptions{}); err != nil { + ctx, cancel := context.WithTimeout(context.Background(), c.config.Timeout.CreateContainer) + defer cancel() + if err := node.Engine.ContainerRemove(ctx, id, enginetypes.ContainerRemoveOptions{}); err != nil { log.Errorf("[RemoveCPUPodFailedContainer] Error during remove failed container %v", err) } } @@ -259,7 +271,10 @@ func (c *calcium) doCreateContainerWithCPUPrior(nodeName string, cpuMap []types. return ms } - if err := pullImage(node, opts.Image); err != nil { + if err := pullImage(node, opts.Image, c.config.Timeout.CreateContainer); err != nil { + for i := 0; i < len(ms); i++ { + ms[i].Error = err.Error() + } return ms } @@ -277,7 +292,9 @@ func (c *calcium) doCreateContainerWithCPUPrior(nodeName string, cpuMap []types. } // create container - container, err := node.Engine.ContainerCreate(context.Background(), config, hostConfig, networkConfig, containerName) + ctxCreate, cancelCreate := context.WithTimeout(context.Background(), c.config.Timeout.CreateContainer) + defer cancelCreate() + container, err := node.Engine.ContainerCreate(ctxCreate, config, hostConfig, networkConfig, containerName) if err != nil { log.Errorf("[CreateContainerWithCPUPrior] Error when creating container, %v", err) ms[i].Error = err.Error() @@ -315,8 +332,9 @@ func (c *calcium) doCreateContainerWithCPUPrior(nodeName string, cpuMap []types. continue } } - - err = node.Engine.ContainerStart(context.Background(), container.ID, enginetypes.ContainerStartOptions{}) + ctxStart, cancelStart := context.WithTimeout(context.Background(), c.config.Timeout.CreateContainer) + defer cancelStart() + err = node.Engine.ContainerStart(ctxStart, container.ID, enginetypes.ContainerStartOptions{}) if err != nil { log.Errorf("[CreateContainerWithCPUPrior] Error when starting container, %v", err) ms[i].Error = err.Error() @@ -327,8 +345,9 @@ func (c *calcium) doCreateContainerWithCPUPrior(nodeName string, cpuMap []types. // TODO // if network manager uses our own, then connect must be called after container starts // here - - info, err := node.Engine.ContainerInspect(context.Background(), container.ID) + ctxInspect, cancelInspect := context.WithTimeout(context.Background(), c.config.Timeout.CreateContainer) + defer cancelInspect() + info, err := node.Engine.ContainerInspect(ctxInspect, container.ID) if err != nil { log.Errorf("[CreateContainerWithCPUPrior] Error when inspecting container, %v", err) ms[i].Error = err.Error() @@ -338,7 +357,7 @@ func (c *calcium) doCreateContainerWithCPUPrior(nodeName string, cpuMap []types. ms[i].ContainerID = info.ID // after start - if err := runExec(node.Engine, info, AFTER_START); err != nil { + if err := runExec(node.Engine, info, AFTER_START, c.config.Timeout.CreateContainer); err != nil { log.Errorf("[CreateContainerWithCPUPrior] Run exec at %s error: %v", AFTER_START, err) } @@ -564,18 +583,19 @@ func (c *calcium) makeContainerOptions(index int, quota map[string]int, specs ty // Pull an image // Blocks until it finishes. -func pullImage(node *types.Node, image string) error { +func pullImage(node *types.Node, image string, timeout time.Duration) error { log.Debugf("Pulling image %s", image) if image == "" { return fmt.Errorf("Goddamn empty image, WTF?") } - - resp, err := node.Engine.ImagePull(context.Background(), image, enginetypes.ImagePullOptions{}) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + outStream, err := node.Engine.ImagePull(ctx, image, enginetypes.ImagePullOptions{}) if err != nil { log.Errorf("Error during pulling image %s: %v", image, err) return err } - ensureReaderClosed(resp) + ensureReaderClosed(outStream) log.Debugf("Done pulling image %s", image) - return nil + return ctx.Err() } diff --git a/cluster/calcium/create_container_test.go b/cluster/calcium/create_container_test.go index bd9bc4359..cbad10ece 100644 --- a/cluster/calcium/create_container_test.go +++ b/cluster/calcium/create_container_test.go @@ -3,31 +3,16 @@ package calcium import ( "fmt" "testing" - - "github.com/stretchr/testify/mock" + "time" "github.com/docker/docker/client" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "gitlab.ricebook.net/platform/core/types" ) -func TestPullImage(t *testing.T) { - initMockConfig() - - nodes, err := mockc.store.GetAllNodes() - if err != nil || len(nodes) == 0 { - t.Fatal(err) - } - - if err := pullImage(nodes[0], image); err != nil { - t.Fatal(err) - } -} - -func TestCreateContainerWithMemPrior(t *testing.T) { - initMockConfig() - - specs := types.Specs{ +var ( + specs = types.Specs{ Appname: "root", Entrypoints: map[string]types.Entrypoint{ "test": types.Entrypoint{ @@ -41,7 +26,7 @@ func TestCreateContainerWithMemPrior(t *testing.T) { Build: []string{""}, Base: image, } - opts := &types.DeployOptions{ + opts = &types.DeployOptions{ Appname: "root", Image: image, Podname: podname, @@ -50,9 +35,26 @@ func TestCreateContainerWithMemPrior(t *testing.T) { Memory: 268435456, CPUQuota: 1, } +) + +func TestPullImage(t *testing.T) { + initMockConfig() + + nodes, err := mockc.store.GetAllNodes() + if err != nil || len(nodes) == 0 { + t.Fatal(err) + } + + if err := pullImage(nodes[0], image, 5*time.Second); err != nil { + t.Fatal(err) + } +} + +func TestCreateContainerWithMemPrior(t *testing.T) { + initMockConfig() // Create Container with memory prior - t.Log("Create containers with memory prior") + testlogF("Create containers with memory prior") createCh, err := mockc.createContainerWithMemoryPrior(specs, opts) assert.NoError(t, err) ids := []string{} @@ -76,7 +78,7 @@ func TestCreateContainerWithMemPrior(t *testing.T) { mockStore.On("GetContainers", ids).Return(&cs, nil) // Remove Container - t.Log("Remove containers") + testlogF("Remove containers") removeCh, err := mockc.RemoveContainer(ids) assert.NoError(t, err) for msg := range removeCh { @@ -102,37 +104,13 @@ func TestClean(t *testing.T) { func TestCreateContainerWithCPUPrior(t *testing.T) { initMockConfig() - specs := types.Specs{ - Appname: "root", - Entrypoints: map[string]types.Entrypoint{ - "test": types.Entrypoint{ - Command: "sleep 9999", - Ports: []types.Port{"6006/tcp"}, - HealthCheckPort: 6006, - HealthCheckUrl: "", - HealthCheckExpectedCode: 200, - }, - }, - Build: []string{""}, - Base: image, - } - opts := &types.DeployOptions{ - Appname: "root", - Image: image, - Podname: podname, - Entrypoint: "test", - Count: 3, - Memory: 268435456, - CPUQuota: 1, - } - // update node mockStore.On("UpdateNode", mock.MatchedBy(func(input *types.Node) bool { return true })).Return(nil) // Create Container with memory prior - t.Log("Create containers with memory prior") + testlogF("Create containers with memory prior") createCh, err := mockc.createContainerWithCPUPrior(specs, opts) assert.NoError(t, err) ids := []string{} @@ -142,3 +120,42 @@ func TestCreateContainerWithCPUPrior(t *testing.T) { fmt.Printf("Get Container ID: %s\n", msg.ContainerID) } } + +func TestCreateContainerMemTimeoutError(t *testing.T) { + initMockConfig() + mockTimeoutError = true + + // update node + mockStore.On("UpdateNode", mock.MatchedBy(func(input *types.Node) bool { + return true + })).Return(nil) + + // Create Container with memory prior + testlogF("Create containers with memory prior") + createCh, err := mockc.createContainerWithMemoryPrior(specs, opts) + assert.NoError(t, err) + for msg := range createCh { + assert.False(t, msg.Success) + assert.Contains(t, msg.Error, "context deadline exceeded") + } + +} + +func TestCreateContainerCPUTimeoutError(t *testing.T) { + initMockConfig() + mockTimeoutError = true + + // update node + mockStore.On("UpdateNode", mock.MatchedBy(func(input *types.Node) bool { + return true + })).Return(nil) + + // Create Container with cpu prior + testlogF("Create containers with cpu prior") + createCh, err := mockc.createContainerWithCPUPrior(specs, opts) + assert.NoError(t, err) + for msg := range createCh { + assert.False(t, msg.Success) + assert.Contains(t, msg.Error, "context deadline exceeded") + } +} diff --git a/cluster/calcium/helper.go b/cluster/calcium/helper.go index 1d500f89c..535c03918 100644 --- a/cluster/calcium/helper.go +++ b/cluster/calcium/helper.go @@ -8,6 +8,7 @@ import ( "os" "path/filepath" "strings" + "time" log "github.com/Sirupsen/logrus" enginetypes "github.com/docker/docker/api/types" @@ -144,7 +145,7 @@ func makeMountPaths(specs types.Specs, config types.Config) ([]string, map[strin // 跑存在labels里的exec // 为什么要存labels呢, 因为下线容器的时候根本不知道entrypoint是啥 -func runExec(client *engineapi.Client, container enginetypes.ContainerJSON, label string) error { +func runExec(client *engineapi.Client, container enginetypes.ContainerJSON, label string, timeout time.Duration) error { cmd, ok := container.Config.Labels[label] if !ok || cmd == "" { log.Debugf("No %s found in container %s", label, container.ID) @@ -153,11 +154,14 @@ func runExec(client *engineapi.Client, container enginetypes.ContainerJSON, labe cmds := utils.MakeCommandLineArgs(cmd) execConfig := enginetypes.ExecConfig{User: container.Config.User, Cmd: cmds} - resp, err := client.ContainerExecCreate(context.Background(), container.ID, execConfig) + ctxExec, cancelCreate := context.WithTimeout(context.Background(), timeout) + defer cancelCreate() + resp, err := client.ContainerExecCreate(ctxExec, container.ID, execConfig) if err != nil { log.Errorf("Error during runExec: %v", err) return err } - - return client.ContainerExecStart(context.Background(), resp.ID, enginetypes.ExecStartCheck{}) + ctxStart, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + return client.ContainerExecStart(ctxStart, resp.ID, enginetypes.ExecStartCheck{}) } diff --git a/cluster/calcium/image.go b/cluster/calcium/image.go index a5178a68c..1ded6ad08 100644 --- a/cluster/calcium/image.go +++ b/cluster/calcium/image.go @@ -4,6 +4,7 @@ import ( "context" "strings" "sync" + "time" log "github.com/Sirupsen/logrus" enginetypes "github.com/docker/docker/api/types" @@ -35,7 +36,7 @@ func (c *calcium) cacheImage(podname, image string) error { wg.Add(1) go func(node *types.Node) { defer wg.Done() - pullImage(node, image) + pullImage(node, image, c.config.Timeout.CreateContainer) }(node) } @@ -62,7 +63,7 @@ func (c *calcium) cleanImage(podname, image string) error { wg.Add(1) go func(node *types.Node) { defer wg.Done() - if err := cleanImageOnNode(node, image, c.config.ImageCache); err != nil { + if err := cleanImageOnNode(node, image, c.config.ImageCache, c.config.Timeout.RemoveImage); err != nil { log.Errorf("cleanImageOnNode error: %s", err) } }(node) @@ -89,12 +90,14 @@ func (x imageList) Less(i, j int) bool { return x[i].Created > x[j].Created } // 清理一个node上的这个image // 只清理同名字不同tag的 // 并且保留最新的两个 -func cleanImageOnNode(node *types.Node, image string, count int) error { +func cleanImageOnNode(node *types.Node, image string, count int, timeout time.Duration) error { log.Debugf("[cleanImageOnNode] node: %s, image: %s", node.Name, strings.Split(image, ":")[0]) imgListFilter := filters.NewArgs() image = normalizeImage(image) imgListFilter.Add("reference", image) // 相同repo的image - images, err := node.Engine.ImageList(context.Background(), enginetypes.ImageListOptions{Filters: imgListFilter}) + ctxList, cancelList := context.WithTimeout(context.Background(), timeout) + defer cancelList() + images, err := node.Engine.ImageList(ctxList, enginetypes.ImageListOptions{Filters: imgListFilter}) if err != nil { return err } @@ -107,13 +110,15 @@ func cleanImageOnNode(node *types.Node, image string, count int) error { log.Debugf("Delete Images: %v", images) for _, image := range images { - _, err := node.Engine.ImageRemove(context.Background(), image.ID, enginetypes.ImageRemoveOptions{ + ctx, cancel := context.WithTimeout(context.Background(), timeout) + _, err := node.Engine.ImageRemove(ctx, image.ID, enginetypes.ImageRemoveOptions{ Force: false, PruneChildren: true, }) if err != nil { log.Errorf("[cleanImageOnNode] Node %s ImageRemove error: %s, imageID: %s", node.Name, err, image.ID) } + cancel() } return nil } diff --git a/cluster/calcium/meta_test.go b/cluster/calcium/meta_test.go index 57fd802a2..762d7bc1a 100644 --- a/cluster/calcium/meta_test.go +++ b/cluster/calcium/meta_test.go @@ -15,7 +15,7 @@ import ( func TestListPods(t *testing.T) { store := &mockstore.MockStore{} config := types.Config{} - c := &calcium{store: store, config: config, scheduler: simplescheduler.New(), network: calico.New(), source: gitlab.New(config.Git)} + c := &calcium{store: store, config: config, scheduler: simplescheduler.New(), network: calico.New(config), source: gitlab.New(config)} store.On("GetAllPods").Return([]*types.Pod{ &types.Pod{Name: "pod1", Desc: "desc1"}, @@ -36,7 +36,7 @@ func TestListPods(t *testing.T) { func TestAddPod(t *testing.T) { store := &mockstore.MockStore{} config := types.Config{} - c := &calcium{store: store, config: config, scheduler: simplescheduler.New(), network: calico.New(), source: gitlab.New(config.Git)} + c := &calcium{store: store, config: config, scheduler: simplescheduler.New(), network: calico.New(config), source: gitlab.New(config)} store.On("AddPod", "pod1", "desc1").Return(&types.Pod{Name: "pod1", Desc: "desc1"}, nil) store.On("AddPod", "pod2", "desc2").Return(nil, fmt.Errorf("Etcd Error")) @@ -54,7 +54,7 @@ func TestAddPod(t *testing.T) { func TestGetPods(t *testing.T) { store := &mockstore.MockStore{} config := types.Config{} - c := &calcium{store: store, config: config, scheduler: simplescheduler.New(), network: calico.New(), source: gitlab.New(config.Git)} + c := &calcium{store: store, config: config, scheduler: simplescheduler.New(), network: calico.New(config), source: gitlab.New(config)} store.On("GetPod", "pod1").Return(&types.Pod{Name: "pod1", Desc: "desc1"}, nil).Once() store.On("GetPod", "pod2").Return(nil, fmt.Errorf("Not found")).Once() diff --git a/cluster/calcium/mock_test.go b/cluster/calcium/mock_test.go index aa315ca10..1f385e2d7 100644 --- a/cluster/calcium/mock_test.go +++ b/cluster/calcium/mock_test.go @@ -2,19 +2,23 @@ package calcium import ( "bytes" + "context" "encoding/base64" "encoding/json" "fmt" + "io" "io/ioutil" "net/http" "runtime" "strings" + "time" "github.com/stretchr/testify/mock" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" "github.com/docker/docker/client" + "github.com/docker/docker/pkg/ioutils" "gitlab.ricebook.net/platform/core/store/mock" coretypes "gitlab.ricebook.net/platform/core/types" "gitlab.ricebook.net/platform/core/utils" @@ -39,10 +43,14 @@ var ( Hub: "hub.testhub.com", HubPrefix: "apps", }, + Timeout: coretypes.TimeoutConfig{ + Common: 3, + }, } - mockc *calcium - mockStore *mockstore.MockStore - err error + mockc *calcium + mockStore *mockstore.MockStore + err error + mockTimeoutError bool ) func testlogF(format interface{}, a ...interface{}) { @@ -68,6 +76,35 @@ func mockContainerID() string { return utils.RandomString(64) } +func mockReadCloser(ctx context.Context) io.ReadCloser { + pr, pw := io.Pipe() + w := ioutils.NewWriteFlusher(pw) + msgChan := make(chan string) + + go func() { + for { + msgChan <- "pulling image...\n" + time.Sleep(1000 * time.Millisecond) + } + }() + go func() { + for { + select { + case <-ctx.Done(): + testlogF("Error!! Context canceld!!!") + w.Close() + pw.Close() + pr.Close() + return + case msg := <-msgChan: + w.Write([]byte(msg)) + } + } + }() + + return pr +} + func mockDockerDoer(r *http.Request) (*http.Response, error) { var b []byte prefix := fmt.Sprintf("/%s", APIVersion) @@ -105,6 +142,13 @@ func mockDockerDoer(r *http.Request) (*http.Response, error) { tag := query.Get("tag") testlogF("mock docker create image: %s:%s", fromImage, tag) b = []byte("body") + if mockTimeoutError { + ctx := r.Context() + return &http.Response{ + StatusCode: http.StatusOK, + Body: mockReadCloser(ctx), + }, nil + } case "/images/json": // docker images testlogF("mock docker list images") b, _ = json.Marshal([]types.ImageSummary{ @@ -114,6 +158,16 @@ func mockDockerDoer(r *http.Request) (*http.Response, error) { }) case fmt.Sprintf("/images/%s", image): // docker images testlogF("mock docker remove image") + b, _ = json.Marshal([]types.ImageDeleteResponseItem{ + { + Untagged: image, + }, + { + Deleted: image, + }, + }) + case "/images/image_id": // docker images + testlogF("mock docker remove image image_id") b, _ = json.Marshal([]types.ImageDeleteResponseItem{ { Untagged: "image_id1", @@ -288,22 +342,6 @@ func initMockConfig() { mockStore.On("DeletePod", mockStringType, mock.AnythingOfType("bool")).Return(nil) - nodeinfo := []coretypes.NodeInfo{ - coretypes.NodeInfo{ - CPUAndMem: coretypes.CPUAndMem{CpuMap: coretypes.CPUMap{"0": 10, "1": 10, "2": 10, "3": 10}, MemCap: 8589934592}, - Name: "node1", - CPURate: 400000, - Capacity: 0, - Count: 0, - Deploy: 0}, - coretypes.NodeInfo{ - CPUAndMem: coretypes.CPUAndMem{CpuMap: coretypes.CPUMap{"0": 10, "1": 10, "2": 10, "3": 10}, MemCap: 8589934592}, - Name: "node2", - CPURate: 400000, - Capacity: 0, - Count: 0, - Deploy: 0}, - } deployNodeInfo := []coretypes.NodeInfo{ coretypes.NodeInfo{ Name: "node1", @@ -331,7 +369,7 @@ func initMockConfig() { } // make plan - mockStore.On("MakeDeployStatus", opts, nodeinfo).Return(deployNodeInfo, nil) + mockStore.On("MakeDeployStatus", opts, mock.Anything).Return(deployNodeInfo, nil) // GetContainer rContainer := &coretypes.Container{ diff --git a/cluster/calcium/remove_container.go b/cluster/calcium/remove_container.go index 3b615152d..e9b8d8a5c 100644 --- a/cluster/calcium/remove_container.go +++ b/cluster/calcium/remove_container.go @@ -143,12 +143,14 @@ func (c *calcium) removeOneContainer(container *types.Container, info enginetype }() // before stop - if err := runExec(container.Engine, info, BEFORE_STOP); err != nil { + if err := runExec(container.Engine, info, BEFORE_STOP, c.config.Timeout.RemoveContainer); err != nil { log.Errorf("Run exec at %s error: %s", BEFORE_STOP, err.Error()) } - timeout := 5 * time.Second - err = container.Engine.ContainerStop(context.Background(), info.ID, &timeout) + stopTimeout := 5 * time.Second + ctxStop, cancelStop := context.WithTimeout(context.Background(), stopTimeout) + defer cancelStop() + err = container.Engine.ContainerStop(ctxStop, info.ID, &stopTimeout) if err != nil { log.Errorf("Error during ContainerStop: %s", err.Error()) return err @@ -158,7 +160,9 @@ func (c *calcium) removeOneContainer(container *types.Container, info enginetype RemoveVolumes: true, Force: true, } - err = container.Engine.ContainerRemove(context.Background(), info.ID, rmOpts) + ctxRemove, cancelRemove := context.WithTimeout(context.Background(), c.config.Timeout.RemoveContainer) + defer cancelRemove() + err = container.Engine.ContainerRemove(ctxRemove, info.ID, rmOpts) if err != nil { log.Errorf("Error during ContainerRemove: %s", err.Error()) return err diff --git a/cluster/calcium/remove_image.go b/cluster/calcium/remove_image.go index d1ddd33e6..d9f211aea 100644 --- a/cluster/calcium/remove_image.go +++ b/cluster/calcium/remove_image.go @@ -35,7 +35,8 @@ func (c *calcium) RemoveImage(podname, nodename string, images []string) (chan * messages := []string{} success := true - ms, err := node.Engine.ImageRemove(context.Background(), image, opts) + ctx, cancel := context.WithTimeout(context.Background(), c.config.Timeout.RemoveImage) + ms, err := node.Engine.ImageRemove(ctx, image, opts) if err != nil { success = false messages = append(messages, err.Error()) @@ -49,6 +50,7 @@ func (c *calcium) RemoveImage(podname, nodename string, images []string) (chan * } } } + cancel() ch <- &types.RemoveImageMessage{ Image: image, Success: success, diff --git a/cluster/calcium/run_and_wait.go b/cluster/calcium/run_and_wait.go index 11e708af7..83a9c4545 100644 --- a/cluster/calcium/run_and_wait.go +++ b/cluster/calcium/run_and_wait.go @@ -24,9 +24,9 @@ func (c *calcium) RunAndWait(specs types.Specs, opts *types.DeployOptions, stdin // 默认给出1200秒的超时时间吧 // 没别的地方好传了, 不如放这里好了, 不需要用的就默认0或者不传 - waitTimeout := entry.RunAndWaitTimeout - if waitTimeout == 0 { - waitTimeout = c.config.RunAndWaitTimeout + waitTimeout := c.config.Timeout.RunAndWait + if entry.RunAndWaitTimeout != 0 { + waitTimeout = time.Duration(entry.RunAndWaitTimeout) * time.Second } // count = 1 && OpenStdin @@ -77,7 +77,7 @@ func (c *calcium) RunAndWait(specs types.Specs, opts *types.DeployOptions, stdin defer log.Infof("[RunAndWait] Container %s finished and removed", containerID[:12]) defer c.removeContainerSync([]string{containerID}) - ctx, cancel := context.WithTimeout(context.Background(), time.Duration(waitTimeout)*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), waitTimeout) defer cancel() resp, err := node.Engine.ContainerLogs(ctx, containerID, logsOpts) if err != nil { diff --git a/cluster/calcium/run_and_wait_test.go b/cluster/calcium/run_and_wait_test.go index b58f5a91b..c4561425d 100644 --- a/cluster/calcium/run_and_wait_test.go +++ b/cluster/calcium/run_and_wait_test.go @@ -23,5 +23,4 @@ func TestRunAndWait(t *testing.T) { _, err = mockc.RunAndWait(specs, &opts, nil) assert.Error(t, err) assert.Contains(t, err.Error(), "`Count` must be 1 if `OpenStdin` is true") - } diff --git a/core.yaml.sample b/core.yaml.sample index 35da75d32..0287a2eef 100644 --- a/core.yaml.sample +++ b/core.yaml.sample @@ -11,7 +11,6 @@ statsd: "statsd2.ricebook.net:8125" zone: "c1" -run_and_wait_timeout: 1200 image_cache: 2 git: @@ -37,3 +36,12 @@ syslog: address: "udp://localhost:5111" facility: "daemon" format: "rfc5424" + +timeout: + run_and_wait: 1200 + build_image: 600 + create_container: 600 + remove_container: 600 + remove_image: 600 + backup: 600 + common: 600 \ No newline at end of file diff --git a/main.go b/main.go index 361391cb1..9ebe5fa2c 100644 --- a/main.go +++ b/main.go @@ -54,8 +54,8 @@ func initConfig(configPath string) (types.Config, error) { return config, err } - if config.RunAndWaitTimeout == 0 { - config.RunAndWaitTimeout = 1200 + if config.Timeout.Common == 0 { + log.Fatal("Common timeout not set, exit") } if config.Docker.APIVersion == "" { diff --git a/network/calico/plugin.go b/network/calico/plugin.go index 5d0db0812..767e87a54 100644 --- a/network/calico/plugin.go +++ b/network/calico/plugin.go @@ -13,7 +13,9 @@ import ( "gitlab.ricebook.net/platform/core/utils" ) -type titanium struct{} +type titanium struct { + Timeout types.TimeoutConfig +} // type of the network manager // if set to "plugin", then it will act like a plugin @@ -54,7 +56,9 @@ func (t *titanium) ConnectToNetwork(ctx context.Context, containerID, networkID, } log.Debugf("Connect %q to %q with IP %q", containerID, networkID, ipv4) - return engine.NetworkConnect(context.Background(), networkID, containerID, config) + ctx, cancel := context.WithTimeout(context.Background(), t.Timeout.Common) + defer cancel() + return engine.NetworkConnect(ctx, networkID, containerID, config) } // disconnect from network @@ -69,7 +73,9 @@ func (t *titanium) DisconnectFromNetwork(ctx context.Context, containerID, netwo } log.Debugf("Disconnect %q from %q", containerID, networkID) - return engine.NetworkDisconnect(context.Background(), networkID, containerID, false) + ctx, cancel := context.WithTimeout(context.Background(), t.Timeout.Common) + defer cancel() + return engine.NetworkDisconnect(ctx, networkID, containerID, false) } // list networks from context @@ -80,11 +86,12 @@ func (t *titanium) ListNetworks(ctx context.Context) ([]*types.Network, error) { return networks, fmt.Errorf("Not actually a `engineapi.Client` for value engine in context") } + ctx, cancel := context.WithTimeout(context.Background(), t.Timeout.Common) + defer cancel() + filters := enginefilters.NewArgs() filters.Add("driver", t.Name()) - ns, err := engine.NetworkList( - context.Background(), - enginetypes.NetworkListOptions{Filters: filters}) + ns, err := engine.NetworkList(ctx, enginetypes.NetworkListOptions{Filters: filters}) if err != nil { return networks, err } @@ -99,6 +106,6 @@ func (t *titanium) ListNetworks(ctx context.Context) ([]*types.Network, error) { return networks, nil } -func New() *titanium { - return &titanium{} +func New(config types.Config) *titanium { + return &titanium{Timeout: config.Timeout} } diff --git a/rpc/rpc_test.go b/rpc/rpc_test.go index 2449b5e90..3d5732b51 100644 --- a/rpc/rpc_test.go +++ b/rpc/rpc_test.go @@ -187,16 +187,15 @@ func TestNodes(t *testing.T) { func initConfig(mStore *mockstore.MockStore) (types.Config, *vibranium) { config := types.Config{ - Bind: ":5001", // HTTP API address - AppDir: "/tmp", // App directory inside container - PermDir: "/tmp", // Permanent dir on host - BackupDir: "/tmp", // Backup dir on host - EtcdMachines: []string{"MOCK"}, // etcd cluster addresses - EtcdLockPrefix: "/eru-core/_lock", // etcd lock prefix, all locks will be created under this dir - ResourceAlloc: "cpu-period", // scheduler or cpu-period TODO give it a good name - Statsd: "localhost:1080", // Statsd host and port - Zone: "c1", // zone for core, e.g. C1, C2 - RunAndWaitTimeout: 1200, // timeout for run and wait + Bind: ":5001", // HTTP API address + AppDir: "/tmp", // App directory inside container + PermDir: "/tmp", // Permanent dir on host + BackupDir: "/tmp", // Backup dir on host + EtcdMachines: []string{"MOCK"}, // etcd cluster addresses + EtcdLockPrefix: "/eru-core/_lock", // etcd lock prefix, all locks will be created under this dir + ResourceAlloc: "cpu-period", // scheduler or cpu-period TODO give it a good name + Statsd: "localhost:1080", // Statsd host and port + Zone: "c1", // zone for core, e.g. C1, C2 Git: types.GitConfig{ SCMType: "gitlab", diff --git a/source/github/manganese.go b/source/github/manganese.go index 131b53afa..31bf9cacd 100644 --- a/source/github/manganese.go +++ b/source/github/manganese.go @@ -7,9 +7,10 @@ import ( "gitlab.ricebook.net/platform/core/types" ) -func New(config types.GitConfig) *common.GitScm { - token := fmt.Sprintf("token %s", config.Token) +func New(config types.Config) *common.GitScm { + gitConfig := config.Git + token := fmt.Sprintf("token %s", gitConfig.Token) authheaders := map[string]string{} authheaders["Authorization"] = token - return &common.GitScm{Config: config, AuthHeaders: authheaders} + return &common.GitScm{Config: gitConfig, AuthHeaders: authheaders} } diff --git a/source/gitlab/cesium.go b/source/gitlab/cesium.go index fc7935ae7..838e6fde0 100644 --- a/source/gitlab/cesium.go +++ b/source/gitlab/cesium.go @@ -5,8 +5,9 @@ import ( "gitlab.ricebook.net/platform/core/types" ) -func New(config types.GitConfig) *common.GitScm { +func New(config types.Config) *common.GitScm { + gitConfig := config.Git authheaders := map[string]string{} - authheaders["PRIVATE-TOKEN"] = config.Token - return &common.GitScm{Config: config, AuthHeaders: authheaders} + authheaders["PRIVATE-TOKEN"] = gitConfig.Token + return &common.GitScm{Config: gitConfig, AuthHeaders: authheaders} } diff --git a/source/source_test.go b/source/source_test.go index ab4a55d69..92ac3899d 100644 --- a/source/source_test.go +++ b/source/source_test.go @@ -19,7 +19,8 @@ import ( var ( repo = "git@github.com:noexist/noexist.git" revision = "6" - config = types.GitConfig{} + config = types.Config{} + gitConfig = types.GitConfig{} artifactURL = "http://127.0.0.1:8088/" ) @@ -50,19 +51,22 @@ func initTest() { prikeyPath.WriteString(prikeyString) defer prikeyPath.Close() - config = types.GitConfig{ - PublicKey: pubkeyPath.Name(), - PrivateKey: prikeyPath.Name(), - Token: "x", + config = types.Config{ + Git: types.GitConfig{ + PublicKey: pubkeyPath.Name(), + PrivateKey: prikeyPath.Name(), + Token: "x", + }, } + gitConfig = config.Git } func TestGitSourceCode(t *testing.T) { initTest() - source := common.GitScm{Config: config} + source := common.GitScm{Config: gitConfig} - defer os.RemoveAll(config.PublicKey) - defer os.RemoveAll(config.PrivateKey) + defer os.RemoveAll(gitConfig.PublicKey) + defer os.RemoveAll(gitConfig.PrivateKey) path, err := ioutil.TempDir(os.TempDir(), "sourcecode-") if err != nil { @@ -111,8 +115,8 @@ func TestGitLabArtifact(t *testing.T) { initTest() source := gitlab.New(config) - defer os.Remove(config.PublicKey) - defer os.Remove(config.PrivateKey) + defer os.Remove(gitConfig.PublicKey) + defer os.Remove(gitConfig.PrivateKey) path, err := ioutil.TempDir(os.TempDir(), "sourcecode-") assert.NoError(t, err) diff --git a/types/config.go b/types/config.go index dd8dcf398..fd410118e 100644 --- a/types/config.go +++ b/types/config.go @@ -1,23 +1,25 @@ package types +import "time" + // Config holds eru-core config type Config struct { - Bind string `yaml:"bind"` // HTTP API address - AppDir string `yaml:"appdir"` // App directory inside container - PermDir string `yaml:"permdir"` // Permanent dir on host - BackupDir string `yaml:"backupdir"` // Backup dir on host - EtcdMachines []string `yaml:"etcd"` // etcd cluster addresses - EtcdLockPrefix string `yaml:"etcd_lock_prefix"` // etcd lock prefix, all locks will be created under this dir - ResourceAlloc string `yaml:"resource_alloc"` // scheduler or cpu-period TODO give it a good name - Statsd string `yaml:"statsd"` // Statsd host and port - Zone string `yaml:"zone"` // zone for core, e.g. C1, C2 - RunAndWaitTimeout int `yaml:"run_and_wait_timeout"` // timeout for run and wait - ImageCache int `yaml:"image_cache"` // cache image count - - Git GitConfig `yaml:"git"` - Docker DockerConfig `yaml:"docker"` - Scheduler SchedConfig `yaml:"scheduler"` - Syslog SyslogConfig `yaml:"syslog"` + Bind string `yaml:"bind"` // HTTP API address + AppDir string `yaml:"appdir"` // App directory inside container + PermDir string `yaml:"permdir"` // Permanent dir on host + BackupDir string `yaml:"backupdir"` // Backup dir on host + EtcdMachines []string `yaml:"etcd"` // etcd cluster addresses + EtcdLockPrefix string `yaml:"etcd_lock_prefix"` // etcd lock prefix, all locks will be created under this dir + ResourceAlloc string `yaml:"resource_alloc"` // scheduler or cpu-period TODO give it a good name + Statsd string `yaml:"statsd"` // Statsd host and port + Zone string `yaml:"zone"` // zone for core, e.g. C1, C2 + ImageCache int `yaml:"image_cache"` // cache image count + + Git GitConfig `yaml:"git"` + Docker DockerConfig `yaml:"docker"` + Scheduler SchedConfig `yaml:"scheduler"` + Syslog SyslogConfig `yaml:"syslog"` + Timeout TimeoutConfig `yaml:"timeout"` } // GitConfig holds eru-core git config @@ -53,3 +55,13 @@ type SyslogConfig struct { Facility string `yaml:"facility"` Format string `yaml:"format"` } + +type TimeoutConfig struct { + RunAndWait time.Duration `yaml:"run_and_wait"` + BuildImage time.Duration `yaml:"build_image"` + CreateContainer time.Duration `yaml:"create_container"` + RemoveContainer time.Duration `yaml:"remove_container"` + RemoveImage time.Duration `yaml:"remove_image"` + Backup time.Duration `yaml:"backup"` + Common time.Duration `yaml:"common"` +}