diff --git a/cluster/calcium/create_container.go b/cluster/calcium/create_container.go index 09f58f0bf..83c65f93f 100644 --- a/cluster/calcium/create_container.go +++ b/cluster/calcium/create_container.go @@ -19,6 +19,11 @@ import ( "golang.org/x/net/context" ) +var ( + MEMORY_PRIOR = "cpuperiod" + CPU_PRIOR = "scheduler" +) + // Create Container // Use specs and options to create // TODO what about networks? @@ -28,12 +33,12 @@ func (c *calcium) CreateContainer(specs types.Specs, opts *types.DeployOptions) return nil, err } if pod.Scheduler == "CPU" { - return c.createContainerWithScheduler(specs, opts) + return c.createContainerWithCPUPrior(specs, opts) } - return c.createContainerWithCPUPeriod(specs, opts) + return c.createContainerWithMemoryPrior(specs, opts) } -func (c *calcium) createContainerWithCPUPeriod(specs types.Specs, opts *types.DeployOptions) (chan *types.CreateContainerMessage, error) { +func (c *calcium) createContainerWithMemoryPrior(specs types.Specs, opts *types.DeployOptions) (chan *types.CreateContainerMessage, error) { ch := make(chan *types.CreateContainerMessage) if opts.Memory < 4194304 { // 4194304 Byte = 4 MB, docker 创建容器的内存最低标准 return ch, fmt.Errorf("Minimum memory limit allowed is 4MB") @@ -42,52 +47,51 @@ func (c *calcium) createContainerWithCPUPeriod(specs types.Specs, opts *types.De log.Debugf("Deploy options: %v", opts) log.Debugf("Deploy specs: %v", specs) - // 计算当前 app 部署情况的时候需要保证同一时间只有这个 app 的这个 entrypoint 在跑 + // TODO RFC 计算当前 app 部署情况的时候需要保证同一时间只有这个 app 的这个 entrypoint 在跑 // 因此需要在这里加个全局锁,直到部署完毕才释放 - lock, err := c.Lock(fmt.Sprintf("%s_%s", opts.Appname, opts.Entrypoint), 3600) - if err != nil { - return ch, err - } - plan, err := c.AllocMemoryPodResource(opts) + nodesInfo, err := c.allocMemoryPodResource(opts) if err != nil { return ch, err } - go func(specs types.Specs, plan map[string]int, opts *types.DeployOptions) { + go func() { defer close(ch) - // TODO 这个 LOCK 得有点长了…… - defer lock.Unlock() - wg := sync.WaitGroup{} - wg.Add(len(plan)) - count := 0 - for nodename, num := range plan { - //log.Debugf("Outside doCreateContainerWithCPUPeriod: nodename %s, num %d, specs %v, opts %v", nodename, num, specs, opts) - go func(nodename string, count, num int, opts *types.DeployOptions) { + wg.Add(len(nodesInfo)) + var count int64 = 0 + for _, nodeInfo := range nodesInfo { + go func(nodeInfo types.NodeInfo, count int64) { defer wg.Done() - //log.Debugf("Inside doCreateContainerWithCPUPeriod: nodename %s, num %d, specs %v, opts %v", nodename, num, specs, opts) - for _, m := range c.doCreateContainerWithCPUPeriod(nodename, count, num, opts.CPUQuota, specs, opts) { + for _, m := range c.doCreateContainerWithMemoryPrior(nodeInfo, specs, opts, count) { ch <- m } - }(nodename, count, num, opts) - count = count + num + }(nodeInfo, count) + count += nodeInfo.Deploy } wg.Wait() // 第一次部署的时候就去cache下镜像吧 go c.cacheImage(opts.Podname, opts.Image) - }(specs, plan, opts) + }() return ch, nil } -func (c *calcium) doCreateContainerWithCPUPeriod(nodename string, count, connum int, quota float64, specs types.Specs, opts *types.DeployOptions) []*types.CreateContainerMessage { - ms := make([]*types.CreateContainerMessage, connum) - for i := 0; i < len(ms); i++ { +func (c *calcium) removeMemoryPodFailedContainer(id string, node *types.Node, nodeInfo types.NodeInfo, opts *types.DeployOptions) { + defer c.store.UpdateNodeMem(opts.Podname, nodeInfo.Name, opts.Memory, "+") + if err := node.Engine.ContainerRemove(context.Background(), id, enginetypes.ContainerRemoveOptions{}); err != 
nil { + log.Errorf("[RemoveMemoryPodFailedContainer] Error during remove failed container %v", err) + } +} + +func (c *calcium) doCreateContainerWithMemoryPrior(nodeInfo types.NodeInfo, specs types.Specs, opts *types.DeployOptions, count int64) []*types.CreateContainerMessage { + ms := make([]*types.CreateContainerMessage, nodeInfo.Deploy) + var i int64 + for i = 0; i < nodeInfo.Deploy; i++ { ms[i] = &types.CreateContainerMessage{} } - node, err := c.GetNode(opts.Podname, nodename) + node, err := c.GetNode(opts.Podname, nodeInfo.Name) if err != nil { return ms } @@ -96,24 +100,24 @@ func (c *calcium) doCreateContainerWithCPUPeriod(nodename string, count, connum return ms } - for i := 0; i < connum; i++ { - config, hostConfig, networkConfig, containerName, err := c.makeContainerOptions(i+count, nil, specs, opts, "cpuperiod", node) + for i = 0; i < nodeInfo.Deploy; i++ { + config, hostConfig, networkConfig, containerName, err := c.makeContainerOptions(i+count, nil, specs, opts, node, MEMORY_PRIOR) ms[i].ContainerName = containerName ms[i].Podname = opts.Podname ms[i].Nodename = node.Name ms[i].Memory = opts.Memory if err != nil { - c.store.UpdateNodeMem(opts.Podname, nodename, opts.Memory, "+") // 创建容器失败就要把资源还回去对不对? ms[i].Error = err.Error() + c.store.UpdateNodeMem(opts.Podname, nodeInfo.Name, opts.Memory, "+") // 创建容器失败就要把资源还回去对不对? continue } //create container container, err := node.Engine.ContainerCreate(context.Background(), config, hostConfig, networkConfig, containerName) if err != nil { - log.Errorf("Error during ContainerCreate, %v", err) + log.Errorf("[CreateContainerWithMemoryPrior] Error during ContainerCreate, %v", err) ms[i].Error = err.Error() - c.store.UpdateNodeMem(opts.Podname, nodename, opts.Memory, "+") + c.store.UpdateNodeMem(opts.Podname, nodeInfo.Name, opts.Memory, "+") continue } @@ -126,33 +130,31 @@ func (c *calcium) doCreateContainerWithCPUPeriod(nodename string, count, connum // need to ensure all networks are correctly connected for networkID, ipv4 := range opts.Networks { if err = c.network.ConnectToNetwork(ctx, container.ID, networkID, ipv4); err != nil { - c.store.UpdateNodeMem(opts.Podname, nodename, opts.Memory, "+") - log.Errorf("Error during connecting container %q to network %q, %v", container.ID, networkID, err) + log.Errorf("[CreateContainerWithMemoryPrior] Error during connecting container %q to network %q, %v", container.ID, networkID, err) breaked = true + c.store.UpdateNodeMem(opts.Podname, nodeInfo.Name, opts.Memory, "+") break } } // remove bridge network if err := c.network.DisconnectFromNetwork(ctx, container.ID, "bridge"); err != nil { - log.Errorf("Error during disconnecting container %q from network %q, %v", container.ID, "bridge", err) + log.Errorf("[CreateContainerWithMemoryPrior] Error during disconnecting container %q from network %q, %v", container.ID, "bridge", err) } // if any break occurs, then this container needs to be removed if breaked { ms[i].Error = err.Error() - c.store.UpdateNodeMem(opts.Podname, nodename, opts.Memory, "+") - go node.Engine.ContainerRemove(context.Background(), container.ID, enginetypes.ContainerRemoveOptions{}) + go c.removeMemoryPodFailedContainer(container.ID, node, nodeInfo, opts) continue } } err = node.Engine.ContainerStart(context.Background(), container.ID, enginetypes.ContainerStartOptions{}) if err != nil { - log.Errorf("Error during ContainerStart, %v", err) + log.Errorf("[CreateContainerWithMemoryPrior] Error during ContainerStart, %v", err) ms[i].Error = err.Error() - 
c.store.UpdateNodeMem(opts.Podname, nodename, opts.Memory, "+") - go node.Engine.ContainerRemove(context.Background(), container.ID, enginetypes.ContainerRemoveOptions{}) + go c.removeMemoryPodFailedContainer(container.ID, node, nodeInfo, opts) continue } @@ -162,22 +164,24 @@ func (c *calcium) doCreateContainerWithCPUPeriod(nodename string, count, connum info, err := node.Engine.ContainerInspect(context.Background(), container.ID) if err != nil { - log.Errorf("Error during ContainerInspect, %v", err) + log.Errorf("[CreateContainerWithMemoryPrior] Error during ContainerInspect, %v", err) ms[i].Error = err.Error() - c.store.UpdateNodeMem(opts.Podname, nodename, opts.Memory, "+") + c.store.UpdateNodeMem(opts.Podname, nodeInfo.Name, opts.Memory, "+") continue } ms[i].ContainerID = info.ID // after start if err := runExec(node.Engine, info, AFTER_START); err != nil { - log.Errorf("Run exec at %s error: %v", AFTER_START, err) + log.Errorf("[CreateContainerWithMemoryPrior] Run exec at %s error: %v", AFTER_START, err) } _, err = c.store.AddContainer(info.ID, opts.Podname, node.Name, containerName, nil, opts.Memory) if err != nil { + log.Errorf("[CreateContainerWithMemoryPrior] Error during store etcd data %v", err) ms[i].Error = err.Error() - c.store.UpdateNodeMem(opts.Podname, nodename, opts.Memory, "+") + // 既然要回收资源就要干掉容器啊 + go c.removeMemoryPodFailedContainer(container.ID, node, nodeInfo, opts) continue } @@ -187,55 +191,59 @@ func (c *calcium) doCreateContainerWithCPUPeriod(nodename string, count, connum return ms } -func (c *calcium) createContainerWithScheduler(specs types.Specs, opts *types.DeployOptions) (chan *types.CreateContainerMessage, error) { +func (c *calcium) createContainerWithCPUPrior(specs types.Specs, opts *types.DeployOptions) (chan *types.CreateContainerMessage, error) { ch := make(chan *types.CreateContainerMessage) - result, err := c.AllocCPUPodResource(opts) + result, err := c.allocCPUPodResource(opts) if err != nil { return ch, err } + if len(result) == 0 { - return ch, fmt.Errorf("Not enough resource to create container") + return ch, fmt.Errorf("[CreateContainerWithCPUPrior] Not enough resource to create container") } - // check total count in case scheduler error - totalCount := 0 - for _, cores := range result { - totalCount = totalCount + len(cores) - } - if totalCount != opts.Count { - return ch, fmt.Errorf("Count mismatch (opt.Count %q, total %q), maybe scheduler error?", opts.Count, totalCount) - } + // FIXME check total count in case scheduler error + // FIXME ??? 
why - go func(plan map[string][]types.CPUMap, opts *types.DeployOptions) { + go func() { wg := sync.WaitGroup{} wg.Add(len(result)) + var count int64 // do deployment - for nodename, cpumap := range plan { - go func(nodename string, cpumap []types.CPUMap, opts *types.DeployOptions) { + for nodeName, cpuMap := range result { + go func(nodeName string, cpuMap []types.CPUMap, count int64) { defer wg.Done() - for _, m := range c.doCreateContainerWithScheduler(nodename, cpumap, specs, opts) { + for _, m := range c.doCreateContainerWithCPUPrior(nodeName, cpuMap, specs, opts, count) { ch <- m } - }(nodename, cpumap, opts) + }(nodeName, cpuMap, count) + count += int64(len(cpuMap)) } wg.Wait() close(ch) - }(result, opts) + }() return ch, nil } -func (c *calcium) doCreateContainerWithScheduler(nodename string, cpumap []types.CPUMap, specs types.Specs, opts *types.DeployOptions) []*types.CreateContainerMessage { - ms := make([]*types.CreateContainerMessage, len(cpumap)) +func (c *calcium) removeCPUPodFailedContainer(id string, node *types.Node, quota types.CPUMap) { + defer c.releaseQuota(node, quota) + if err := node.Engine.ContainerRemove(context.Background(), id, enginetypes.ContainerRemoveOptions{}); err != nil { + log.Errorf("[RemoveCPUPodFailedContainer] Error during remove failed container %v", err) + } +} + +func (c *calcium) doCreateContainerWithCPUPrior(nodeName string, cpuMap []types.CPUMap, specs types.Specs, opts *types.DeployOptions, count int64) []*types.CreateContainerMessage { + ms := make([]*types.CreateContainerMessage, len(cpuMap)) for i := 0; i < len(ms); i++ { ms[i] = &types.CreateContainerMessage{} } - node, err := c.GetNode(opts.Podname, nodename) + node, err := c.GetNode(opts.Podname, nodeName) if err != nil { - log.Errorf("Get node error %v", err) + log.Errorf("[CreateContainerWithCPUPrior] Get node error %v", err) return ms } @@ -243,10 +251,9 @@ func (c *calcium) doCreateContainerWithScheduler(nodename string, cpumap []types return ms } - for i, quota := range cpumap { + for i, quota := range cpuMap { // create options - //TODO ERU_CONTAINER_NO not support CPU pod now !!! 
- config, hostConfig, networkConfig, containerName, err := c.makeContainerOptions(i, quota, specs, opts, "scheduler", node) + config, hostConfig, networkConfig, containerName, err := c.makeContainerOptions(int64(i)+count, nil, specs, opts, node, CPU_PRIOR) ms[i].ContainerName = containerName ms[i].Podname = opts.Podname ms[i].Nodename = node.Name @@ -260,7 +267,7 @@ func (c *calcium) doCreateContainerWithScheduler(nodename string, cpumap []types // create container container, err := node.Engine.ContainerCreate(context.Background(), config, hostConfig, networkConfig, containerName) if err != nil { - log.Errorf("Error when creating container, %v", err) + log.Errorf("[CreateContainerWithCPUPrior] Error when creating container, %v", err) ms[i].Error = err.Error() c.releaseQuota(node, quota) continue @@ -275,7 +282,7 @@ func (c *calcium) doCreateContainerWithScheduler(nodename string, cpumap []types // need to ensure all networks are correctly connected for networkID, ipv4 := range opts.Networks { if err = c.network.ConnectToNetwork(ctx, container.ID, networkID, ipv4); err != nil { - log.Errorf("Error when connecting container %q to network %q, %v", container.ID, networkID, err) + log.Errorf("[CreateContainerWithCPUPrior] Error when connecting container %q to network %q, %v", container.ID, networkID, err) breaked = true break } @@ -285,25 +292,23 @@ func (c *calcium) doCreateContainerWithScheduler(nodename string, cpumap []types // only when user defined networks is given if len(opts.Networks) != 0 { if err := c.network.DisconnectFromNetwork(ctx, container.ID, "bridge"); err != nil { - log.Errorf("Error when disconnecting container %q from network %q, %v", container.ID, "bridge", err) + log.Errorf("[CreateContainerWithCPUPrior] Error when disconnecting container %q from network %q, %v", container.ID, "bridge", err) } } // if any break occurs, then this container needs to be removed if breaked { ms[i].Error = err.Error() - c.releaseQuota(node, quota) - go node.Engine.ContainerRemove(context.Background(), container.ID, enginetypes.ContainerRemoveOptions{}) + go c.removeCPUPodFailedContainer(container.ID, node, quota) continue } } err = node.Engine.ContainerStart(context.Background(), container.ID, enginetypes.ContainerStartOptions{}) if err != nil { - log.Errorf("Error when starting container, %v", err) + log.Errorf("[CreateContainerWithCPUPrior] Error when starting container, %v", err) ms[i].Error = err.Error() - c.releaseQuota(node, quota) - go node.Engine.ContainerRemove(context.Background(), container.ID, enginetypes.ContainerRemoveOptions{}) + go c.removeCPUPodFailedContainer(container.ID, node, quota) continue } @@ -313,7 +318,7 @@ func (c *calcium) doCreateContainerWithScheduler(nodename string, cpumap []types info, err := node.Engine.ContainerInspect(context.Background(), container.ID) if err != nil { - log.Errorf("Error when inspecting container, %v", err) + log.Errorf("[CreateContainerWithCPUPrior] Error when inspecting container, %v", err) ms[i].Error = err.Error() c.releaseQuota(node, quota) continue @@ -322,14 +327,14 @@ func (c *calcium) doCreateContainerWithScheduler(nodename string, cpumap []types // after start if err := runExec(node.Engine, info, AFTER_START); err != nil { - log.Errorf("Run exec at %s error: %v", AFTER_START, err) + log.Errorf("[CreateContainerWithCPUPrior] Run exec at %s error: %v", AFTER_START, err) } _, err = c.store.AddContainer(info.ID, opts.Podname, node.Name, containerName, quota, opts.Memory) if err != nil { + log.Errorf("[CreateContainerWithCPUPrior] 
Error during store etcd data %v", err) ms[i].Error = err.Error() - c.releaseQuota(node, quota) - continue + go c.removeCPUPodFailedContainer(container.ID, node, quota) } ms[i].Success = true } @@ -348,7 +353,7 @@ func (c *calcium) releaseQuota(node *types.Node, quota types.CPUMap) { c.store.UpdateNodeCPU(node.Podname, node.Name, quota, "+") } -func (c *calcium) makeContainerOptions(index int, quota map[string]int, specs types.Specs, opts *types.DeployOptions, optionMode string, node *types.Node) ( +func (c *calcium) makeContainerOptions(index int64, quota map[string]int, specs types.Specs, opts *types.DeployOptions, node *types.Node, optionMode string) ( *enginecontainer.Config, *enginecontainer.HostConfig, *enginenetwork.NetworkingConfig, diff --git a/cluster/calcium/helper.go b/cluster/calcium/helper.go index 8e265eb6d..a44dcdc39 100644 --- a/cluster/calcium/helper.go +++ b/cluster/calcium/helper.go @@ -38,14 +38,6 @@ func makeCPUAndMem(nodes []*types.Node) map[string]types.CPUAndMem { return r } -func makeCPUMap(nodes map[string]types.CPUAndMem) map[string]types.CPUMap { - r := make(map[string]types.CPUMap) - for key, node := range nodes { - r[key] = node.CpuMap - } - return r -} - // filter nodes // public is the flag func filterNodes(nodes []*types.Node, public bool) []*types.Node { diff --git a/cluster/calcium/resource.go b/cluster/calcium/resource.go index a3dc2b14a..1fc9f1001 100644 --- a/cluster/calcium/resource.go +++ b/cluster/calcium/resource.go @@ -11,7 +11,7 @@ import ( "gitlab.ricebook.net/platform/core/utils" ) -func (c *calcium) AllocMemoryPodResource(opts *types.DeployOptions) (map[string]int, error) { +func (c *calcium) allocMemoryPodResource(opts *types.DeployOptions) ([]types.NodeInfo, error) { lock, err := c.Lock(opts.Podname, 30) if err != nil { return nil, err @@ -22,36 +22,37 @@ func (c *calcium) AllocMemoryPodResource(opts *types.DeployOptions) (map[string] if err != nil { return nil, err } - nodesInfo := getNodesInfo(cpuandmem) - log.Debugf("Input opts.CPUQuota: %f", opts.CPUQuota) - cpuQuota := int(opts.CPUQuota * float64(utils.CpuPeriodBase)) - log.Debugf("Tranfered cpuQuota: %d", cpuQuota) + + // Load deploy status nodesInfo, err = c.store.MakeDeployStatus(opts, nodesInfo) if err != nil { return nil, err } - plan, err := c.scheduler.SelectMemoryNodes(nodesInfo, cpuQuota, opts.Memory, opts.Count) // 还是以 Bytes 作单位, 不转换了 + cpuRate := int64(opts.CPUQuota * float64(utils.CpuPeriodBase)) + log.Debugf("Input opts.CPUQuota: %f, equal CPURate %d", opts.CPUQuota, cpuRate) + sort.Slice(nodesInfo, func(i, j int) bool { return nodesInfo[i].MemCap < nodesInfo[j].MemCap }) + nodesInfo, err = c.scheduler.SelectMemoryNodes(nodesInfo, cpuRate, opts.Memory, opts.Count) // 还是以 Bytes 作单位, 不转换了 if err != nil { return nil, err } // 并发扣除所需资源 wg := sync.WaitGroup{} - wg.Add(len(plan)) - for nodename, connum := range plan { - go func(nodename string, connum int) { - wg.Done() - memoryTotal := opts.Memory * int64(connum) - c.store.UpdateNodeMem(opts.Podname, nodename, memoryTotal, "-") - }(nodename, connum) + wg.Add(len(nodesInfo)) + for _, nodeInfo := range nodesInfo { + go func(nodeInfo types.NodeInfo) { + defer wg.Done() + memoryTotal := opts.Memory * nodeInfo.Deploy + c.store.UpdateNodeMem(opts.Podname, nodeInfo.Name, memoryTotal, "-") + }(nodeInfo) } wg.Wait() - return plan, nil + return nodesInfo, nil } -func (c *calcium) AllocCPUPodResource(opts *types.DeployOptions) (map[string][]types.CPUMap, error) { +func (c *calcium) allocCPUPodResource(opts *types.DeployOptions) 
(map[string][]types.CPUMap, error) { lock, err := c.Lock(opts.Podname, 30) if err != nil { return nil, err @@ -62,10 +63,15 @@ func (c *calcium) AllocCPUPodResource(opts *types.DeployOptions) (map[string][]t if err != nil { return nil, err } + nodesInfo := getNodesInfo(cpuandmem) + + // Load deploy status + nodesInfo, err = c.store.MakeDeployStatus(opts, nodesInfo) + if err != nil { + return nil, err + } - cpumap := makeCPUMap(cpuandmem) // 做这个转换,免得改太多 - log.Debugf("Cpumap: %v", cpumap) - result, changed, err := c.scheduler.SelectCPUNodes(cpumap, opts.CPUQuota, opts.Count) + result, changed, err := c.scheduler.SelectCPUNodes(nodesInfo, opts.CPUQuota, opts.Count) log.Debugf("Result: %v, Changed: %v", result, changed) if err != nil { return result, err @@ -123,11 +129,11 @@ func (c *calcium) getCPUAndMem(podname, nodename string, quota float64) (map[str return result, nodes, nil } -func getNodesInfo(cpumemmap map[string]types.CPUAndMem) []types.NodeInfo { +func getNodesInfo(cpuAndMemData map[string]types.CPUAndMem) []types.NodeInfo { result := []types.NodeInfo{} - for node, cpuandmem := range cpumemmap { - result = append(result, types.NodeInfo{node, len(cpuandmem.CpuMap) * utils.CpuPeriodBase, cpuandmem.MemCap, 0, 0, 0}) + for nodeName, cpuAndMem := range cpuAndMemData { + cpuRate := int64(len(cpuAndMem.CpuMap)) * utils.CpuPeriodBase + result = append(result, types.NodeInfo{cpuAndMem, nodeName, cpuRate, 0, 0, 0}) } - sort.Slice(result, func(i, j int) bool { return result[i].Memory < result[j].Memory }) return result } diff --git a/rpc/gen/core.pb.go b/rpc/gen/core.pb.go index 1ad6ec44c..ca78f3bb3 100644 --- a/rpc/gen/core.pb.go +++ b/rpc/gen/core.pb.go @@ -595,7 +595,7 @@ type DeployOptions struct { Entrypoint string `protobuf:"bytes,6,opt,name=entrypoint" json:"entrypoint,omitempty"` ExtraArgs string `protobuf:"bytes,7,opt,name=extra_args,json=extraArgs" json:"extra_args,omitempty"` CpuQuota float64 `protobuf:"fixed64,8,opt,name=cpu_quota,json=cpuQuota" json:"cpu_quota,omitempty"` - Count int32 `protobuf:"varint,9,opt,name=count" json:"count,omitempty"` + Count int64 `protobuf:"varint,9,opt,name=count" json:"count,omitempty"` Memory int64 `protobuf:"varint,10,opt,name=memory" json:"memory,omitempty"` Env []string `protobuf:"bytes,11,rep,name=env" json:"env,omitempty"` Networks map[string]string `protobuf:"bytes,12,rep,name=networks" json:"networks,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` @@ -664,7 +664,7 @@ func (m *DeployOptions) GetCpuQuota() float64 { return 0 } -func (m *DeployOptions) GetCount() int32 { +func (m *DeployOptions) GetCount() int64 { if m != nil { return m.Count } diff --git a/rpc/gen/core.proto b/rpc/gen/core.proto index 3e2c3221e..7035318a5 100644 --- a/rpc/gen/core.proto +++ b/rpc/gen/core.proto @@ -139,7 +139,7 @@ message DeployOptions { string entrypoint = 6; string extra_args = 7; double cpu_quota = 8; - int32 count = 9; + int64 count = 9; int64 memory = 10; repeated string env = 11; map networks = 12; diff --git a/rpc/transform.go b/rpc/transform.go index 4bef57064..90dd0aaba 100644 --- a/rpc/transform.go +++ b/rpc/transform.go @@ -67,7 +67,7 @@ func toCoreDeployOptions(d *pb.DeployOptions) *types.DeployOptions { Entrypoint: d.Entrypoint, ExtraArgs: d.ExtraArgs, CPUQuota: d.CpuQuota, - Count: int(d.Count), + Count: d.Count, Memory: d.Memory, Env: d.Env, Networks: d.Networks, diff --git a/scheduler/complex/mem.go b/scheduler/complex/communism.go similarity index 71% rename from scheduler/complex/mem.go rename to 
scheduler/complex/communism.go index 13bc14019..66be0687e 100644 --- a/scheduler/complex/mem.go +++ b/scheduler/complex/communism.go @@ -1,16 +1,22 @@ package complexscheduler import ( + "sort" + "gitlab.ricebook.net/platform/core/types" ) -func equalDivisionPlan(arg []types.NodeInfo, need, volTotal int) ([]types.NodeInfo, error) { +// 吃我一记共产主义大锅饭 +func CommunismDivisionPlan(arg []types.NodeInfo, need, volTotal int64) ([]types.NodeInfo, error) { + sort.Slice(arg, func(i, j int) bool { return arg[i].Count < arg[j].Count }) length := len(arg) i := 0 + + var deploy, differ int64 for need > 0 && volTotal > 0 { p := i - deploy := 0 - differ := 1 + deploy = 0 + differ = 1 if i < length-1 { differ = arg[i+1].Count - arg[i].Count i++ diff --git a/scheduler/complex/cpu.go b/scheduler/complex/cpu.go index c46212add..9343ab469 100644 --- a/scheduler/complex/cpu.go +++ b/scheduler/complex/cpu.go @@ -11,21 +11,27 @@ import ( "gitlab.ricebook.net/platform/core/types" ) -func volumn(nodeInfo []types.NodeInfo) int { - val := 0 - for _, v := range nodeInfo { - val += v.Capacity +func min(a, b int64) int64 { + if a < b { + return a } - return val + return b +} + +func abs(a int64) int64 { + if a < 0 { + return -a + } + return a } type host struct { full types.CPUMap fragment types.CPUMap - share int + share int64 } -func newHost(cpuInfo types.CPUMap, share int) *host { +func newHost(cpuInfo types.CPUMap, share int64) *host { result := &host{ share: share, full: types.CPUMap{}, @@ -42,18 +48,19 @@ func newHost(cpuInfo types.CPUMap, share int) *host { return result } -func (self *host) calcuatePiecesCores(full int, fragment int, maxShareCore int) { - var fullResultNum, fragmentResultNum, canDeployNum, baseLine int - var fragmentBaseResult, count, num, flag, b, baseContainers int +func (self *host) calcuatePiecesCores(full, fragment, maxShareCore int64) { + var fullResultNum, fragmentResultNum, canDeployNum, baseLine int64 + var fragmentBaseResult, lenFull, count, num, flag, b, baseContainers int64 - count = len(self.fragment) + count = int64(len(self.fragment)) + lenFull = int64(len(self.full)) if maxShareCore == -1 { - maxShareCore = len(self.full) - count - full // 减枝,M == N 的情况下预留至少一个 full 量的核数 + maxShareCore = lenFull - count - full // 减枝,M == N 的情况下预留至少一个 full 量的核数 } else { maxShareCore -= count } - fullResultNum = len(self.full) / full + fullResultNum = lenFull / full fragmentBaseResult = 0 for _, pieces := range self.fragment { fragmentBaseResult += pieces / fragment @@ -63,8 +70,9 @@ func (self *host) calcuatePiecesCores(full int, fragment int, maxShareCore int) num = 0 flag = math.MaxInt64 baseContainers = self.share / fragment - for i := 1; i < maxShareCore+1; i++ { - fullResultNum = (len(self.full) - i) / full + var i int64 + for i = 1; i < maxShareCore+1; i++ { + fullResultNum = (lenFull - i) / full fragmentResultNum = fragmentBaseResult + i*baseContainers // 剪枝,2者结果相近的时候最优 b = abs(fullResultNum - fragmentResultNum) @@ -91,24 +99,24 @@ func (self *host) calcuatePiecesCores(full int, fragment int, maxShareCore int) } } -func (self *host) getContainerCores(num float64, maxShareCore int) []types.CPUMap { +func (self *host) getContainerCores(num float64, maxShareCore int64) []types.CPUMap { num = num * float64(self.share) - var full, fragment int + var full, fragment, i int64 var result = []types.CPUMap{} var fullResult = types.CPUMap{} var fragmentResult = []string{} - full = int(num) / self.share - fragment = int(num) % self.share + full = int64(num) / self.share + fragment = int64(num) % self.share 
if full == 0 { if maxShareCore == -1 { // 这个时候就把所有的核都当成碎片核 - maxShareCore = len(self.full) + len(self.fragment) + maxShareCore = int64(len(self.full)) + int64(len(self.fragment)) } for no, pieces := range self.full { - if len(self.fragment) >= maxShareCore { + if int64(len(self.fragment)) >= maxShareCore { break } self.fragment[no] = pieces @@ -122,8 +130,8 @@ func (self *host) getContainerCores(num float64, maxShareCore int) []types.CPUMa } if fragment == 0 { - n := len(self.full) / full - for i := 0; i < n; i++ { + n := int64(len(self.full)) / full + for i = 0; i < n; i++ { fullResult = self.getFullResult(full) result = append(result, fullResult) } @@ -135,7 +143,7 @@ func (self *host) getContainerCores(num float64, maxShareCore int) []types.CPUMa fragmentResult = self.getFragmentResult(fragment) for _, no := range fragmentResult { fullResult = self.getFullResult(full) - if len(fullResult) != full { // 可能整数核不够用了结果并不一定可靠必须再判断一次 + if int64(len(fullResult)) != full { // 可能整数核不够用了结果并不一定可靠必须再判断一次 return result // 减枝这时候整数核一定不够用了,直接退出,这样碎片核和整数核的计算就完成了 } fullResult[no] = fragment @@ -144,130 +152,63 @@ func (self *host) getContainerCores(num float64, maxShareCore int) []types.CPUMa return result } -func (self *host) getFragmentResult(fragment int) []string { +func (self *host) getFragmentResult(fragment int64) []string { var result = []string{} + var i int64 for no, pieces := range self.fragment { - for i := 0; i < pieces/fragment; i++ { + for i = 0; i < pieces/fragment; i++ { result = append(result, no) } } return result } -func (self *host) getFullResult(full int) types.CPUMap { +func (self *host) getFullResult(full int64) types.CPUMap { var result = types.CPUMap{} for no, pieces := range self.full { result[no] = pieces // 分配一整个核 delete(self.full, no) // 干掉这个可用资源 - if len(result) == full { + if int64(len(result)) == full { break } } return result } -func min(a, b int) int { - if a < b { - return a - } - return b -} - -func abs(a int) int { - if a < 0 { - return -a - } - return a -} - -func averagePlan(cpu float64, nodes map[string]types.CPUMap, need, maxShareCore, coreShare int) map[string][]types.CPUMap { +func cpuPriorPlan(cpu float64, nodesInfo []types.NodeInfo, need, maxShareCore, coreShare int64) ( + int64, []types.NodeInfo, map[string][]types.CPUMap) { - var nodecontainer = map[string][]types.CPUMap{} - var result = map[string][]types.CPUMap{} + var nodeContainer = map[string][]types.CPUMap{} var host *host var plan []types.CPUMap - var n int - var nodeInfo []types.NodeInfo - var nodeName string + var n int64 - if cpu < 0.01 { - resultLength := 0 - r: - for { - for nodeName, _ := range nodes { - result[nodeName] = append(result[nodeName], nil) - resultLength++ - } - if resultLength == need { - break r - } - } - return result - } + // TODO weird check cpu < 0.01 - for node, cpuInfo := range nodes { - host = newHost(cpuInfo, coreShare) + var volTotal int64 = 0 + for p, nodeInfo := range nodesInfo { + host = newHost(nodeInfo.CpuMap, coreShare) plan = host.getContainerCores(cpu, maxShareCore) - n = len(plan) // 每个node可以放的容器数 + n = int64(len(plan)) // 每个node可以放的容器数 if n > 0 { - nodeInfo = append(nodeInfo, types.NodeInfo{Name: node, Capacity: n}) - nodecontainer[node] = plan + nodesInfo[p].Capacity = n + nodeContainer[nodeInfo.Name] = plan + volTotal += n } } - if volumn(nodeInfo) < need { - return nil + if volTotal < need { + return -1, nil, nil } - // 排序 - sort.Slice(nodeInfo, func(i, j int) bool { - return nodeInfo[i].Capacity < nodeInfo[j].Capacity - }) - - // 决定分配方案 - allocplan := 
allocPlan(nodeInfo, need) - for node, ncon := range allocplan { - if ncon > 0 { - nodeName = nodeInfo[node].Name - result[nodeName] = nodecontainer[nodeName][:ncon] - } - } - - return result -} - -func allocPlan(info []types.NodeInfo, need int) map[int]int { - result := make(map[int]int) - NNode := len(info) - - var nodeToUse, more int - for i := 0; i < NNode; i++ { - nodeToUse = NNode - i - ave := need / nodeToUse - if ave > info[i].Capacity { - ave = 1 - } - for ; ave < info[i].Capacity && ave*nodeToUse < need; ave++ { - } - more = ave*nodeToUse - need - for j := i; nodeToUse != 0; nodeToUse-- { - if _, ok := result[j]; !ok { - result[j] = ave - } else { - result[j] += ave - } - if more > 0 { - more-- - result[j]-- - } else if more < 0 { - info[j].Capacity -= ave - } - j++ - } - if more == 0 { + // 裁剪掉不能部署的 + sort.Slice(nodesInfo, func(i, j int) bool { return nodesInfo[i].Capacity < nodesInfo[j].Capacity }) + var p int + for p = 0; p < len(nodesInfo); p++ { + if nodesInfo[p].Capacity > 0 { break } - need = -more } - return result + + return volTotal, nodesInfo[p:], nodeContainer } diff --git a/scheduler/complex/potassium.go b/scheduler/complex/potassium.go index 988593bb3..c088f81fd 100644 --- a/scheduler/complex/potassium.go +++ b/scheduler/complex/potassium.go @@ -2,7 +2,6 @@ package complexscheduler import ( "fmt" - "sort" log "github.com/Sirupsen/logrus" "gitlab.ricebook.net/platform/core/types" @@ -20,7 +19,7 @@ func (m *potassium) RandomNode(nodes map[string]types.CPUMap) (string, error) { if len(nodes) == 0 { return nodename, fmt.Errorf("No nodes provide to choose one") } - max := 0 + var max int64 = 0 for name, cpumap := range nodes { total := cpumap.Total() if total > max { @@ -34,28 +33,27 @@ func (m *potassium) RandomNode(nodes map[string]types.CPUMap) (string, error) { return nodename, nil } -func (m *potassium) SelectMemoryNodes(nodesInfo []types.NodeInfo, quota int, memory int64, need int) (map[string]int, error) { - log.Debugf("[AllocContainerPlan]: nodesInfo: %v, quota: %d, memory: %d, need: %d", nodesInfo, quota, memory, need) +func (m *potassium) SelectMemoryNodes(nodesInfo []types.NodeInfo, rate, memory, need int64) ([]types.NodeInfo, error) { + log.Debugf("[SelectMemoryNodes]: nodesInfo: %v, rate: %d, memory: %d, need: %d", nodesInfo, rate, memory, need) - result := map[string]int{} var p int = -1 for i, nodeInfo := range nodesInfo { - if nodeInfo.CorePer >= quota { + if nodeInfo.CPURate >= rate { p = i break } } if p == -1 { - return result, fmt.Errorf("[AllocContainerPlan] Cannot alloc a plan, not enough cpu quota") + return nil, fmt.Errorf("[SelectMemoryNodes] Cannot alloc a plan, not enough cpu rate") } - log.Debugf("[AllocContainerPlan] the %d th node has enough cpu quota.", p) + log.Debugf("[SelectMemoryNodes] the %d th node has enough cpu rate.", p) // 计算是否有足够的内存满足需求 nodesInfo = nodesInfo[p:] - volTotal := 0 + var volTotal int64 = 0 p = -1 for i, nodeInfo := range nodesInfo { - capacity := int(nodeInfo.Memory / memory) + capacity := int64(nodeInfo.MemCap / memory) if capacity <= 0 { continue } @@ -63,55 +61,59 @@ func (m *potassium) SelectMemoryNodes(nodesInfo []types.NodeInfo, quota int, mem p = i } volTotal += capacity - nodeInfo.Capacity = capacity + nodesInfo[i].Capacity = capacity } if volTotal < need { - return result, fmt.Errorf("[AllocContainerPlan] Cannot alloc a plan, not enough memory, volume %d, need %d", volTotal, need) + return nil, fmt.Errorf("[SelectMemoryNodes] Cannot alloc a plan, not enough memory, volume %d, need %d", volTotal, need) 
} // 继续裁可用节点池子 nodesInfo = nodesInfo[p:] - sort.Slice(nodesInfo, func(i, j int) bool { return nodesInfo[i].Count < nodesInfo[j].Count }) - log.Debugf("[AllocContainerPlan] volumn of each node: %v", nodesInfo) - nodesInfo, err := equalDivisionPlan(nodesInfo, need, volTotal) + log.Debugf("[SelectMemoryNodes] volumn of each node: %v", nodesInfo) + nodesInfo, err := CommunismDivisionPlan(nodesInfo, need, volTotal) if err != nil { - return result, err + return nil, err } - sort.Slice(nodesInfo, func(i, j int) bool { return nodesInfo[i].Memory < nodesInfo[j].Memory }) - for _, nodeInfo := range nodesInfo { - result[nodeInfo.Name] = nodeInfo.Deploy - } - log.Debugf("[AllocContainerPlan] allocAlgorithm result: %v", result) - return result, nil + // 这里并不需要再次排序了,理论上的排序是基于 Count 得到的 Deploy 最终方案 + log.Debugf("[SelectMemoryNodes] CommunismDivisionPlan: %v", nodesInfo) + return nodesInfo, nil } -func (m *potassium) SelectCPUNodes(nodes map[string]types.CPUMap, quota float64, need int) (map[string][]types.CPUMap, map[string]types.CPUMap, error) { +//TODO 这里要处理下输入 +func (m *potassium) SelectCPUNodes(nodesInfo []types.NodeInfo, quota float64, need int64) (map[string][]types.CPUMap, map[string]types.CPUMap, error) { result := make(map[string][]types.CPUMap) changed := make(map[string]types.CPUMap) - if len(nodes) == 0 { - return result, nil, fmt.Errorf("No nodes provide to choose some") + if len(nodesInfo) == 0 { + return result, nil, fmt.Errorf("[SelectCPUNodes] No nodes provide to choose some") } // all core could be shared // suppose each core has 10 coreShare // TODO: change it to be control by parameters - result = averagePlan(quota, nodes, need, -1, 10) - if result == nil { + volTotal, selectedNodesInfo, selectedNodesPool := cpuPriorPlan(quota, nodesInfo, need, -1, 10) + if volTotal == -1 { return nil, nil, fmt.Errorf("Not enough resource") } + selectedNodesInfo, err := CommunismDivisionPlan(selectedNodesInfo, need, volTotal) + if err != nil { + return nil, nil, err + } + // 只返回有修改的就可以了, 返回有修改的还剩下多少 - for nodename, cpuList := range result { - node, ok := nodes[nodename] - if !ok { + for _, selectedNode := range selectedNodesInfo { + if selectedNode.Deploy <= 0 { continue } + cpuList := selectedNodesPool[selectedNode.Name][:selectedNode.Deploy] + result[selectedNode.Name] = cpuList for _, cpu := range cpuList { - node.Sub(cpu) + selectedNode.CpuMap.Sub(cpu) } - changed[nodename] = node + changed[selectedNode.Name] = selectedNode.CpuMap } + return result, changed, nil } diff --git a/scheduler/complex/potassium_test.go b/scheduler/complex/potassium_test.go index 476b2b3fa..cbc2d0889 100644 --- a/scheduler/complex/potassium_test.go +++ b/scheduler/complex/potassium_test.go @@ -34,18 +34,30 @@ func TestSelectCPUNodes(t *testing.T) { t.Fatalf("Create Potassim error: %v", merr) } - _, _, err := k.SelectCPUNodes(map[string]types.CPUMap{}, 1, 1) + _, _, err := k.SelectCPUNodes([]types.NodeInfo{}, 1, 1) assert.Error(t, err) - assert.Equal(t, err.Error(), "No nodes provide to choose some") - - nodes := map[string]types.CPUMap{ - "node1": types.CPUMap{ - "0": 10, - "1": 10, - }, - "node2": types.CPUMap{ - "0": 10, - "1": 10, + assert.Equal(t, err.Error(), "[SelectCPUNodes] No nodes provide to choose some") + + nodes := []types.NodeInfo{ + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{ + "0": 10, + "1": 10, + }, + 12400000, + }, + "node1", 0.0, 0, 0, 0, + }, + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{ + "0": 10, + "1": 10, + }, + 12400000, + }, + "node2", 0.0, 0, 0, 0, }, } @@ -65,6 +77,29 @@ func 
TestSelectCPUNodes(t *testing.T) { assert.Error(t, err) assert.Contains(t, err.Error(), "Not enough") + nodes = []types.NodeInfo{ + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{ + "0": 10, + "1": 10, + }, + 12400000, + }, + "node1", 0.0, 0, 0, 0, + }, + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{ + "0": 10, + "1": 10, + }, + 12400000, + }, + "node2", 0.0, 0, 0, 0, + }, + } + r, re, err := k.SelectCPUNodes(nodes, 1, 2) assert.NoError(t, err) assert.Equal(t, 2, len(r)) @@ -75,19 +110,25 @@ func TestSelectCPUNodes(t *testing.T) { // assert.Equal(t, len(cpus), 1) cpu := cpus[0] - assert.Equal(t, cpu.Total(), 10) + assert.Equal(t, cpu.Total(), int64(10)) } // SelectCPUNodes 里有一些副作用, 粗暴地拿一个新的来测试吧 // 下面也是因为这个 - nodes = map[string]types.CPUMap{ - "node1": types.CPUMap{ - "0": 10, - "1": 10, + nodes = []types.NodeInfo{ + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{"0": 10, "1": 10}, + 12400000, + }, + "node1", 0.0, 0, 0, 0, }, - "node2": types.CPUMap{ - "0": 10, - "1": 10, + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{"0": 10, "1": 10}, + 12400000, + }, + "node2", 0.0, 0, 0, 0, }, } @@ -99,7 +140,7 @@ func TestSelectCPUNodes(t *testing.T) { assert.Equal(t, len(cpus), 1) cpu := cpus[0] - assert.Equal(t, cpu.Total(), 13) + assert.Equal(t, cpu.Total(), int64(13)) } } @@ -143,26 +184,44 @@ func TestRecurrence(t *testing.T) { k, _ := New(coreCfg) - nodes := map[string]types.CPUMap{ - "c2-docker-26": types.CPUMap{ - "0": 0, "10": 0, "7": 0, "8": 10, "9": 10, "13": 0, "14": 0, "15": 10, "2": 10, "5": 10, "11": 0, "12": 0, "4": 0, "1": 0, "3": 10, "6": 0, - }, - "c2-docker-28": types.CPUMap{ - "13": 0, "14": 0, "15": 0, "4": 10, "9": 0, "1": 0, "10": 0, "12": 10, "5": 10, "6": 10, "8": 10, "0": 0, "11": 0, "2": 10, "3": 0, "7": 0, - }, - "c2-docker-27": types.CPUMap{ - "6": 10, "10": 0, "13": 0, "14": 10, "2": 0, "7": 0, "1": 0, "11": 0, "15": 0, "8": 10, "0": 0, "3": 0, "4": 0, "5": 0, "9": 10, "12": 0, - }, - "c2-docker-29": types.CPUMap{ - "15": 0, "3": 10, "0": 0, "10": 0, "13": 0, "7": 10, "8": 0, "9": 10, "12": 10, "2": 10, "4": 10, "1": 0, "11": 0, "14": 10, "5": 10, "6": 10, + nodes := []types.NodeInfo{ + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{"0": 0, "10": 0, "7": 0, "8": 10, "9": 10, "13": 0, "14": 0, "15": 10, "2": 10, "5": 10, "11": 0, "12": 0, "4": 0, "1": 0, "3": 10, "6": 0}, + 12400000, + }, + "c2-docker-26", 0.0, 0, 0, 0, + }, + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{"6": 10, "10": 0, "13": 0, "14": 10, "2": 0, "7": 0, "1": 0, "11": 0, "15": 0, "8": 10, "0": 0, "3": 0, "4": 0, "5": 0, "9": 10, "12": 0}, + 12400000, + }, + "c2-docker-27", 0.0, 0, 0, 0, + }, + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{"13": 0, "14": 0, "15": 0, "4": 10, "9": 0, "1": 0, "10": 0, "12": 10, "5": 10, "6": 10, "8": 10, "0": 0, "11": 0, "2": 10, "3": 0, "7": 0}, + 12400000, + }, + "c2-docker-28", 0.0, 0, 0, 0, + }, + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{"15": 0, "3": 10, "0": 0, "10": 0, "13": 0, "7": 10, "8": 0, "9": 10, "12": 10, "2": 10, "4": 10, "1": 0, "11": 0, "14": 10, "5": 10, "6": 10}, + 12400000, + }, + "c2-docker-29", 0.0, 0, 0, 0, }, } + result, _, err := k.SelectCPUNodes(nodes, 0.5, 3) log.Infof("result: %v", result) if err != nil { t.Fatalf("err: %v", err) } } + func TestComplexNodes(t *testing.T) { coreCfg := types.Config{ @@ -180,32 +239,61 @@ func TestComplexNodes(t *testing.T) { t.Fatalf("Create Potassim error: %v", merr) } - // nodes can offer 28 containers. 
- nodes := map[string]types.CPUMap{ - "n1": types.CPUMap{ // 2 containers - "0": 10, "1": 10, "2": 10, "3": 10, - }, - "n2": types.CPUMap{ // 7 containers - "0": 10, "1": 10, "2": 10, "3": 10, - "4": 10, "5": 10, "6": 10, "7": 10, - "8": 10, "9": 10, "10": 10, "11": 10, - "12": 10, "13": 10, - }, - "n3": types.CPUMap{ // 6 containers - "0": 10, "1": 10, "2": 10, "3": 10, - "4": 10, "5": 10, "6": 10, "7": 10, - "8": 10, "9": 10, "10": 10, "11": 10, - }, - "n4": types.CPUMap{ // 9 containers - "0": 10, "1": 10, "2": 10, "3": 10, - "4": 10, "5": 10, "6": 10, "7": 10, - "8": 10, "9": 10, "10": 10, "11": 10, - "12": 10, "13": 10, "14": 10, "15": 10, - "16": 10, "17": 10, - }, - "n5": types.CPUMap{ // 4 containers - "0": 10, "1": 10, "2": 10, "3": 10, - "4": 10, "5": 10, "6": 10, "7": 10, + nodes := []types.NodeInfo{ + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{ // 2 containers + "0": 10, "1": 10, "2": 10, "3": 10, + }, + 12400000, + }, + "n1", 0.0, 0, 0, 0, + }, + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{ // 7 containers + "0": 10, "1": 10, "2": 10, "3": 10, + "4": 10, "5": 10, "6": 10, "7": 10, + "8": 10, "9": 10, "10": 10, "11": 10, + "12": 10, "13": 10, + }, + 12400000, + }, + "n2", 0.0, 0, 0, 0, + }, + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{ // 6 containers + "0": 10, "1": 10, "2": 10, "3": 10, + "4": 10, "5": 10, "6": 10, "7": 10, + "8": 10, "9": 10, "10": 10, "11": 10, + }, + 12400000, + }, + "n3", 0.0, 0, 0, 0, + }, + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{ // 9 containers + "0": 10, "1": 10, "2": 10, "3": 10, + "4": 10, "5": 10, "6": 10, "7": 10, + "8": 10, "9": 10, "10": 10, "11": 10, + "12": 10, "13": 10, "14": 10, "15": 10, + "16": 10, "17": 10, + }, + 12400000, + }, + "n4", 0.0, 0, 0, 0, + }, + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{ // 4 containers + "0": 10, "1": 10, "2": 10, "3": 10, + "4": 10, "5": 10, "6": 10, "7": 10, + }, + 12400000, + }, + "n5", 0.0, 0, 0, 0, }, } @@ -223,33 +311,64 @@ func TestComplexNodes(t *testing.T) { // test2 // SelectCPUNodes 里有一些副作用, 粗暴地拿一个新的来测试吧 // 下面也是因为这个 - nodes = map[string]types.CPUMap{ - "n1": types.CPUMap{ // 2 containers - "0": 10, "1": 10, "2": 10, "3": 10, - }, - "n2": types.CPUMap{ // 7 containers - "0": 10, "1": 10, "2": 10, "3": 10, - "4": 10, "5": 10, "6": 10, "7": 10, - "8": 10, "9": 10, "10": 10, "11": 10, - "12": 10, "13": 10, - }, - "n3": types.CPUMap{ // 6 containers - "0": 10, "1": 10, "2": 10, "3": 10, - "4": 10, "5": 10, "6": 10, "7": 10, - "8": 10, "9": 10, "10": 10, "11": 10, - }, - "n4": types.CPUMap{ // 9 containers - "0": 10, "1": 10, "2": 10, "3": 10, - "4": 10, "5": 10, "6": 10, "7": 10, - "8": 10, "9": 10, "10": 10, "11": 10, - "12": 10, "13": 10, "14": 10, "15": 10, - "16": 10, "17": 10, - }, - "n5": types.CPUMap{ // 4 containers - "0": 10, "1": 10, "2": 10, "3": 10, - "4": 10, "5": 10, "6": 10, "7": 10, + nodes = []types.NodeInfo{ + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{ // 2 containers + "0": 10, "1": 10, "2": 10, "3": 10, + }, + 12400000, + }, + "n1", 0.0, 0, 0, 0, + }, + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{ // 7 containers + "0": 10, "1": 10, "2": 10, "3": 10, + "4": 10, "5": 10, "6": 10, "7": 10, + "8": 10, "9": 10, "10": 10, "11": 10, + "12": 10, "13": 10, + }, + 12400000, + }, + "n2", 0.0, 0, 0, 0, + }, + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{ // 6 containers + "0": 10, "1": 10, "2": 10, "3": 10, + "4": 10, "5": 10, "6": 10, "7": 10, + "8": 10, "9": 10, "10": 10, "11": 10, + }, + 12400000, + }, + "n3", 0.0, 0, 0, 0, + }, + 
types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{ // 9 containers + "0": 10, "1": 10, "2": 10, "3": 10, + "4": 10, "5": 10, "6": 10, "7": 10, + "8": 10, "9": 10, "10": 10, "11": 10, + "12": 10, "13": 10, "14": 10, "15": 10, + "16": 10, "17": 10, + }, + 12400000, + }, + "n4", 0.0, 0, 0, 0, + }, + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{ // 4 containers + "0": 10, "1": 10, "2": 10, "3": 10, + "4": 10, "5": 10, "6": 10, "7": 10, + }, + 12400000, + }, + "n5", 0.0, 0, 0, 0, }, } + res2, changed2, err := k.SelectCPUNodes(nodes, 1.7, 11) if err != nil { t.Fatalf("something went wrong") @@ -260,33 +379,64 @@ func TestComplexNodes(t *testing.T) { assert.Equal(t, len(changed2), len(res2)) // test3 - nodes = map[string]types.CPUMap{ - "n1": types.CPUMap{ // 2 containers - "0": 10, "1": 10, "2": 10, "3": 10, - }, - "n2": types.CPUMap{ // 7 containers - "0": 10, "1": 10, "2": 10, "3": 10, - "4": 10, "5": 10, "6": 10, "7": 10, - "8": 10, "9": 10, "10": 10, "11": 10, - "12": 10, "13": 10, - }, - "n3": types.CPUMap{ // 6 containers - "0": 10, "1": 10, "2": 10, "3": 10, - "4": 10, "5": 10, "6": 10, "7": 10, - "8": 10, "9": 10, "10": 10, "11": 10, - }, - "n4": types.CPUMap{ // 9 containers - "0": 10, "1": 10, "2": 10, "3": 10, - "4": 10, "5": 10, "6": 10, "7": 10, - "8": 10, "9": 10, "10": 10, "11": 10, - "12": 10, "13": 10, "14": 10, "15": 10, - "16": 10, "17": 10, - }, - "n5": types.CPUMap{ // 4 containers - "0": 10, "1": 10, "2": 10, "3": 10, - "4": 10, "5": 10, "6": 10, "7": 10, + nodes = []types.NodeInfo{ + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{ // 2 containers + "0": 10, "1": 10, "2": 10, "3": 10, + }, + 12400000, + }, + "n1", 0.0, 0, 0, 0, + }, + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{ // 7 containers + "0": 10, "1": 10, "2": 10, "3": 10, + "4": 10, "5": 10, "6": 10, "7": 10, + "8": 10, "9": 10, "10": 10, "11": 10, + "12": 10, "13": 10, + }, + 12400000, + }, + "n2", 0.0, 0, 0, 0, + }, + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{ // 6 containers + "0": 10, "1": 10, "2": 10, "3": 10, + "4": 10, "5": 10, "6": 10, "7": 10, + "8": 10, "9": 10, "10": 10, "11": 10, + }, + 12400000, + }, + "n3", 0.0, 0, 0, 0, + }, + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{ // 9 containers + "0": 10, "1": 10, "2": 10, "3": 10, + "4": 10, "5": 10, "6": 10, "7": 10, + "8": 10, "9": 10, "10": 10, "11": 10, + "12": 10, "13": 10, "14": 10, "15": 10, + "16": 10, "17": 10, + }, + 12400000, + }, + "n4", 0.0, 0, 0, 0, + }, + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{ // 4 containers + "0": 10, "1": 10, "2": 10, "3": 10, + "4": 10, "5": 10, "6": 10, "7": 10, + }, + 12400000, + }, + "n5", 0.0, 0, 0, 0, }, } + res3, changed3, err := k.SelectCPUNodes(nodes, 1.7, 23) if err != nil { fmt.Println("May be we dont have plan") @@ -298,33 +448,64 @@ func TestComplexNodes(t *testing.T) { assert.Equal(t, len(changed3), len(res3)) // test4 - nodes = map[string]types.CPUMap{ - "n1": types.CPUMap{ // 2 containers - "0": 10, "1": 10, "2": 10, "3": 10, - }, - "n2": types.CPUMap{ // 7 containers - "0": 10, "1": 10, "2": 10, "3": 10, - "4": 10, "5": 10, "6": 10, "7": 10, - "8": 10, "9": 10, "10": 10, "11": 10, - "12": 10, "13": 10, - }, - "n3": types.CPUMap{ // 6 containers - "0": 10, "1": 10, "2": 10, "3": 10, - "4": 10, "5": 10, "6": 10, "7": 10, - "8": 10, "9": 10, "10": 10, "11": 10, - }, - "n4": types.CPUMap{ // 9 containers - "0": 10, "1": 10, "2": 10, "3": 10, - "4": 10, "5": 10, "6": 10, "7": 10, - "8": 10, "9": 10, "10": 10, "11": 10, - "12": 10, "13": 10, "14": 10, "15": 10, - "16": 
10, "17": 10, - }, - "n5": types.CPUMap{ // 4 containers - "0": 10, "1": 10, "2": 10, "3": 10, - "4": 10, "5": 10, "6": 10, "7": 10, + nodes = []types.NodeInfo{ + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{ // 2 containers + "0": 10, "1": 10, "2": 10, "3": 10, + }, + 12400000, + }, + "n1", 0.0, 0, 0, 0, + }, + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{ // 7 containers + "0": 10, "1": 10, "2": 10, "3": 10, + "4": 10, "5": 10, "6": 10, "7": 10, + "8": 10, "9": 10, "10": 10, "11": 10, + "12": 10, "13": 10, + }, + 12400000, + }, + "n2", 0.0, 0, 0, 0, + }, + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{ // 6 containers + "0": 10, "1": 10, "2": 10, "3": 10, + "4": 10, "5": 10, "6": 10, "7": 10, + "8": 10, "9": 10, "10": 10, "11": 10, + }, + 12400000, + }, + "n3", 0.0, 0, 0, 0, + }, + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{ // 9 containers + "0": 10, "1": 10, "2": 10, "3": 10, + "4": 10, "5": 10, "6": 10, "7": 10, + "8": 10, "9": 10, "10": 10, "11": 10, + "12": 10, "13": 10, "14": 10, "15": 10, + "16": 10, "17": 10, + }, + 12400000, + }, + "n4", 0.0, 0, 0, 0, + }, + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{ // 4 containers + "0": 10, "1": 10, "2": 10, "3": 10, + "4": 10, "5": 10, "6": 10, "7": 10, + }, + 12400000, + }, + "n5", 0.0, 0, 0, 0, }, } + _, _, newErr := k.SelectCPUNodes(nodes, 1.6, 29) if newErr == nil { t.Fatalf("how to alloc 29 containers when you only have 28?") @@ -348,12 +529,24 @@ func TestEvenPlan(t *testing.T) { } // nodes -- n1: 2, n2: 2 - pod1 := map[string]types.CPUMap{ - "n1": types.CPUMap{ // 2 containers - "0": 10, "1": 10, "2": 10, "3": 10, - }, - "n2": types.CPUMap{ // 2 containers - "0": 10, "1": 10, "2": 10, "3": 10, + pod1 := []types.NodeInfo{ + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{ + "0": 10, "1": 10, "2": 10, "3": 10, + }, + 12400000, + }, + "node1", 0.0, 0, 0, 0, + }, + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{ + "0": 10, "1": 10, "2": 10, "3": 10, + }, + 12400000, + }, + "node2", 0.0, 0, 0, 0, }, } @@ -367,25 +560,49 @@ func TestEvenPlan(t *testing.T) { assert.Equal(t, len(rem1), 2) // nodes -- n1: 4, n2: 5, n3:6, n4: 5 - pod2 := map[string]types.CPUMap{ - "n1": types.CPUMap{ // 4 containers - "0": 10, "1": 10, "2": 10, "3": 10, - "4": 10, "5": 10, "6": 10, "7": 10, - }, - "n2": types.CPUMap{ // 5 containers - "0": 10, "1": 10, "2": 10, "3": 10, - "4": 10, "5": 10, "6": 10, "7": 10, - "8": 10, "9": 10, - }, - "n3": types.CPUMap{ // 6 containers - "0": 10, "1": 10, "2": 10, "3": 10, - "4": 10, "5": 10, "6": 10, "7": 10, - "8": 10, "9": 10, "10": 10, "11": 10, - }, - "n4": types.CPUMap{ // 5 containers - "0": 10, "1": 10, "2": 10, "3": 10, - "4": 10, "5": 10, "6": 10, "7": 10, - "8": 10, "9": 10, + pod2 := []types.NodeInfo{ + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{ // 4 containers + "0": 10, "1": 10, "2": 10, "3": 10, + "4": 10, "5": 10, "6": 10, "7": 10, + }, + 12400000, + }, + "n1", 0.0, 0, 0, 0, + }, + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{ // 5 containers + "0": 10, "1": 10, "2": 10, "3": 10, + "4": 10, "5": 10, "6": 10, "7": 10, + "8": 10, "9": 10, + }, + 12400000, + }, + "n2", 0.0, 0, 0, 0, + }, + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{ // 6 containers + "0": 10, "1": 10, "2": 10, "3": 10, + "4": 10, "5": 10, "6": 10, "7": 10, + "8": 10, "9": 10, "10": 10, "11": 10, + }, + 12400000, + }, + "n3", 0.0, 0, 0, 0, + }, + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{ // 5 containers + "0": 10, "1": 10, "2": 10, "3": 10, + "4": 10, "5": 10, "6": 10, "7": 10, + "8": 
10, "9": 10, + }, + 12400000, + }, + "n4", 0.0, 0, 0, 0, }, } @@ -395,52 +612,101 @@ func TestEvenPlan(t *testing.T) { } assert.Equal(t, len(rem2), 3) - pod3 := map[string]types.CPUMap{ - "n1": types.CPUMap{ // 4 containers - "0": 10, "1": 10, "2": 10, "3": 10, - "4": 10, "5": 10, "6": 10, "7": 10, - }, - "n2": types.CPUMap{ // 5 containers - "0": 10, "1": 10, "2": 10, "3": 10, - "4": 10, "5": 10, "6": 10, "7": 10, - "8": 10, "9": 10, - }, - "n3": types.CPUMap{ // 6 containers - "0": 10, "1": 10, "2": 10, "3": 10, - "4": 10, "5": 10, "6": 10, "7": 10, - "8": 10, "9": 10, "10": 10, "11": 10, - }, - "n4": types.CPUMap{ // 5 containers - "0": 10, "1": 10, "2": 10, "3": 10, - "4": 10, "5": 10, "6": 10, "7": 10, - "8": 10, "9": 10, + pod3 := []types.NodeInfo{ + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{ // 4 containers + "0": 10, "1": 10, "2": 10, "3": 10, + "4": 10, "5": 10, "6": 10, "7": 10, + }, + 12400000, + }, + "n1", 0.0, 0, 0, 0, + }, + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{ // 5 containers + "0": 10, "1": 10, "2": 10, "3": 10, + "4": 10, "5": 10, "6": 10, "7": 10, + "8": 10, "9": 10, + }, + 12400000, + }, + "n2", 0.0, 0, 0, 0, + }, + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{ // 6 containers + "0": 10, "1": 10, "2": 10, "3": 10, + "4": 10, "5": 10, "6": 10, "7": 10, + "8": 10, "9": 10, "10": 10, "11": 10, + }, + 12400000, + }, + "n3", 0.0, 0, 0, 0, + }, + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{ // 5 containers + "0": 10, "1": 10, "2": 10, "3": 10, + "4": 10, "5": 10, "6": 10, "7": 10, + "8": 10, "9": 10, + }, + 12400000, + }, + "n4", 0.0, 0, 0, 0, }, } + res3, rem3, err := k.SelectCPUNodes(pod3, 1.7, 8) if check := checkAvgPlan(res3, 2, 2, "res3"); check != nil { t.Fatalf("something went wront") } assert.Equal(t, len(rem3), 4) - pod4 := map[string]types.CPUMap{ - "n1": types.CPUMap{ // 4 containers - "0": 10, "1": 10, "2": 10, "3": 10, - "4": 10, "5": 10, "6": 10, "7": 10, - }, - "n2": types.CPUMap{ // 5 containers - "0": 10, "1": 10, "2": 10, "3": 10, - "4": 10, "5": 10, "6": 10, "7": 10, - "8": 10, "9": 10, - }, - "n3": types.CPUMap{ // 6 containers - "0": 10, "1": 10, "2": 10, "3": 10, - "4": 10, "5": 10, "6": 10, "7": 10, - "8": 10, "9": 10, "10": 10, "11": 10, - }, - "n4": types.CPUMap{ // 5 containers - "0": 10, "1": 10, "2": 10, "3": 10, - "4": 10, "5": 10, "6": 10, "7": 10, - "8": 10, "9": 10, + pod4 := []types.NodeInfo{ + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{ // 4 containers + "0": 10, "1": 10, "2": 10, "3": 10, + "4": 10, "5": 10, "6": 10, "7": 10, + }, + 12400000, + }, + "n1", 0.0, 0, 0, 0, + }, + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{ // 5 containers + "0": 10, "1": 10, "2": 10, "3": 10, + "4": 10, "5": 10, "6": 10, "7": 10, + "8": 10, "9": 10, + }, + 12400000, + }, + "n2", 0.0, 0, 0, 0, + }, + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{ // 6 containers + "0": 10, "1": 10, "2": 10, "3": 10, + "4": 10, "5": 10, "6": 10, "7": 10, + "8": 10, "9": 10, "10": 10, "11": 10, + }, + 12400000, + }, + "n3", 0.0, 0, 0, 0, + }, + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{ // 5 containers + "0": 10, "1": 10, "2": 10, "3": 10, + "4": 10, "5": 10, "6": 10, "7": 10, + "8": 10, "9": 10, + }, + 12400000, + }, + "n4", 0.0, 0, 0, 0, }, } @@ -452,17 +718,35 @@ func TestEvenPlan(t *testing.T) { } func TestSpecialCase(t *testing.T) { - pod := map[string]types.CPUMap{ - "n1": types.CPUMap{ // 1 containers - "0": 10, "1": 10, - }, - "n2": types.CPUMap{ // 3 containers - "0": 10, "1": 10, "2": 10, "3": 10, - "4": 10, "5": 
10, - }, - "n3": types.CPUMap{ // 4 containers - "0": 10, "1": 10, "2": 10, "3": 10, - "4": 10, "5": 10, "6": 10, "7": 10, + pod := []types.NodeInfo{ + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{ // 4 containers + "0": 10, "1": 10, + }, + 12400000, + }, + "n1", 0.0, 0, 0, 0, + }, + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{ // 5 containers + "0": 10, "1": 10, "2": 10, "3": 10, + "4": 10, "5": 10, + }, + 12400000, + }, + "n2", 0.0, 0, 0, 0, + }, + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{ // 6 containers + "0": 10, "1": 10, "2": 10, "3": 10, + "4": 10, "5": 10, "6": 10, "7": 10, + }, + 12400000, + }, + "n3", 0.0, 0, 0, 0, }, } @@ -483,14 +767,26 @@ func TestSpecialCase(t *testing.T) { } checkAvgPlan(res1, 1, 3, "new test 2") - newpod := map[string]types.CPUMap{ - "n1": types.CPUMap{ // 3 containers - "0": 10, "1": 10, "2": 10, "3": 10, - "4": 10, "5": 10, - }, - "n2": types.CPUMap{ // 4 containers - "0": 10, "1": 10, "2": 10, "3": 10, - "4": 10, "5": 10, "6": 10, "7": 10, + newpod := []types.NodeInfo{ + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{ // 4 containers + "0": 10, "1": 10, "2": 10, "3": 10, + "4": 10, "5": 10, + }, + 12400000, + }, + "n1", 0.0, 0, 0, 0, + }, + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{ // 4 containers + "0": 10, "1": 10, "2": 10, "3": 10, + "4": 10, "5": 10, "6": 10, "7": 10, + }, + 12400000, + }, + "n2", 0.0, 0, 0, 0, }, } @@ -499,10 +795,10 @@ func TestSpecialCase(t *testing.T) { checkAvgPlan(res2, 2, 2, "new test 2") } -func generateNodes(nums, maxCores, seed int) *map[string]types.CPUMap { +func generateNodes(nums, maxCores, seed int) []types.NodeInfo { var name string var cores int - pod := make(map[string]types.CPUMap) + pod := []types.NodeInfo{} s := rand.NewSource(int64(seed)) r1 := rand.New(s) @@ -510,43 +806,51 @@ func generateNodes(nums, maxCores, seed int) *map[string]types.CPUMap { for i := 0; i < nums; i++ { name = fmt.Sprintf("n%d", i) cores = r1.Intn(maxCores + 1) - pod[name] = make(types.CPUMap) + + cpumap := types.CPUMap{} for j := 0; j < cores; j++ { coreName := fmt.Sprintf("%d", j) - pod[name][coreName] = 10 + cpumap[coreName] = 10 } + cpuandmem := types.CPUAndMem{cpumap, 12040000} + nodeInfo := types.NodeInfo{cpuandmem, name, 0, 0, 0, 0} + pod = append(pod, nodeInfo) } - return &pod + return pod } var hugePod = generateNodes(10000, 24, 10086) -func getPodVol(nodes map[string]types.CPUMap, cpu float64) int { - var res int +func getPodVol(nodes []types.NodeInfo, cpu float64) int64 { + var res int64 var host *host var plan []types.CPUMap - for _, cpuInfo := range nodes { - host = newHost(cpuInfo, 10) + for _, nodeInfo := range nodes { + host = newHost(nodeInfo.CPUAndMem.CpuMap, 10) plan = host.getContainerCores(cpu, -1) - res += len(plan) + res += int64(len(plan)) } return res } func TestGetPodVol(t *testing.T) { - nodes := map[string]types.CPUMap{ - "c2-docker-26": types.CPUMap{ - "15": 0, "3": 10, "0": 0, "10": 0, "13": 0, "7": 10, "8": 0, "9": 10, "12": 10, "2": 10, "4": 10, "1": 0, "11": 0, "14": 10, "5": 10, "6": 10, + nodes := []types.NodeInfo{ + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{"15": 0, "3": 10, "0": 0, "10": 0, "13": 0, "7": 10, "8": 0, "9": 10, "12": 10, "2": 10, "4": 10, "1": 0, "11": 0, "14": 10, "5": 10, "6": 10}, + 12400000, + }, + "c2-docker-26", 0.0, 0, 0, 0, }, } res := getPodVol(nodes, 0.5) - assert.Equal(t, res, 18) + assert.Equal(t, res, int64(18)) res = getPodVol(nodes, 0.3) - assert.Equal(t, res, 27) + assert.Equal(t, res, int64(27)) res = getPodVol(nodes, 1.1) - 
assert.Equal(t, res, 8) + assert.Equal(t, res, int64(8)) } func Benchmark_ExtreamAlloc(b *testing.B) { @@ -564,8 +868,8 @@ func Benchmark_ExtreamAlloc(b *testing.B) { b.StopTimer() b.StartTimer() - vol := getPodVol(*hugePod, 1.3) - result, changed, err := k.SelectCPUNodes(*hugePod, 1.3, vol) + vol := getPodVol(hugePod, 1.3) + result, changed, err := k.SelectCPUNodes(hugePod, 1.3, vol) if err != nil { b.Fatalf("something went wrong") } @@ -587,7 +891,7 @@ func Benchmark_AveAlloc(b *testing.B) { k, _ := New(coreCfg) b.StartTimer() - result, changed, err := k.SelectCPUNodes(*hugePod, 1.7, 12000) + result, changed, err := k.SelectCPUNodes(hugePod, 1.7, 12000) if err != nil { b.Fatalf("something went wrong") } diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 4ce662155..3ca0a1170 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -9,8 +9,8 @@ type Scheduler interface { // select one node from nodes, returns nodename // typically used to build image RandomNode(nodes map[string]types.CPUMap) (string, error) - SelectMemoryNodes(nodesInfo []types.NodeInfo, quota int, memory int64, need int) (map[string]int, error) + SelectMemoryNodes(nodesInfo []types.NodeInfo, quota, memory, need int64) ([]types.NodeInfo, error) // select nodes from nodes, return a list of nodenames and the corresponding cpumap, and also the changed nodes with remaining cpumap // quota and number must be given, typically used to determine where to deploy - SelectCPUNodes(nodes map[string]types.CPUMap, quota float64, need int) (map[string][]types.CPUMap, map[string]types.CPUMap, error) + SelectCPUNodes(nodesInfo []types.NodeInfo, quota float64, need int64) (map[string][]types.CPUMap, map[string]types.CPUMap, error) } diff --git a/scheduler/scheduler_test.go b/scheduler/scheduler_test.go index a198147b0..96d88a2dc 100644 --- a/scheduler/scheduler_test.go +++ b/scheduler/scheduler_test.go @@ -9,7 +9,6 @@ import ( ) func TestSchedulerInvoke(t *testing.T) { - // scheduler := complexscheduler.New() coreCfg := types.Config{ EtcdMachines: []string{"http://127.0.0.1:2379"}, EtcdLockPrefix: "/eru-core/_lock", @@ -21,17 +20,28 @@ func TestSchedulerInvoke(t *testing.T) { } scheduler, _ := complexscheduler.New(coreCfg) - nodes := map[string]types.CPUMap{ - "node1": types.CPUMap{ - "0": 10, - "1": 10, + nodes := []types.NodeInfo{ + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{ + "0": 10, + "1": 10, + }, + 12400000, + }, + "node1", 0.0, 0, 0, 0, }, - "node2": types.CPUMap{ - "0": 10, - "1": 10, + types.NodeInfo{ + types.CPUAndMem{ + types.CPUMap{ + "0": 10, + "1": 10, + }, + 12400000, + }, + "node2", 0.0, 0, 0, 0, }, } - _, _, err := scheduler.SelectCPUNodes(nodes, 1, 2) assert.NoError(t, err) } diff --git a/scheduler/simple/magnesium.go b/scheduler/simple/magnesium.go index c32ce972a..24a2eeafc 100644 --- a/scheduler/simple/magnesium.go +++ b/scheduler/simple/magnesium.go @@ -44,7 +44,7 @@ func (m *magnesium) SelectMemoryNodes(nodesInfo []types.NodeInfo, quota int, mem // Select nodes for deploying. // Use round robin method to select, in order to make scheduler average. 
-func (m *magnesium) SelectCPUNodes(nodes map[string]types.CPUMap, quota float64, num int) (map[string][]types.CPUMap, map[string]types.CPUMap, error) { +func (m *magnesium) SelectCPUNodes(nodes map[string]types.NodeInfo, quota float64, num int) (map[string][]types.CPUMap, map[string]types.CPUMap, error) { m.Lock() defer m.Unlock() @@ -65,7 +65,7 @@ func (m *magnesium) SelectCPUNodes(nodes map[string]types.CPUMap, quota float64, done: for { for nodename, cpumap := range nodes { - r, err := getQuota(cpumap, q) + r, err := getQuota(cpumap.CPU, q) if err != nil { continue } @@ -79,7 +79,7 @@ done: // 只把有修改的返回回去就可以了, 返回还剩下多少 for nodename, cpumap := range nodes { if _, ok := result[nodename]; ok { - changed[nodename] = cpumap + changed[nodename] = cpumap.CPU } } return result, changed, nil @@ -97,10 +97,10 @@ func resultLength(result map[string][]types.CPUMap) int { // count total CPU number // in fact is the number of all labels // don't care about the value -func totalQuota(nodes map[string]types.CPUMap) int { +func totalQuota(nodes map[string]types.NodeInfo) int { value := 0 for _, cmap := range nodes { - value += cpuCount(cmap) + value += cpuCount(cmap.CPU) } return value } diff --git a/scheduler/simple/magnesium_test.go b/scheduler/simple/magnesium_test.go index acb25ff86..b62561f49 100644 --- a/scheduler/simple/magnesium_test.go +++ b/scheduler/simple/magnesium_test.go @@ -42,18 +42,24 @@ func TestRandomNode(t *testing.T) { func TestSelectCPUNodes(t *testing.T) { m := &magnesium{} - _, _, err := m.SelectCPUNodes(map[string]types.CPUMap{}, 1, 1) + _, _, err := m.SelectCPUNodes(map[string]types.NodeInfo{}, 1, 1) assert.Error(t, err) assert.Equal(t, err.Error(), "No nodes provide to choose some") - nodes := map[string]types.CPUMap{ - "node1": types.CPUMap{ - "0": 10, - "1": 10, + nodes := map[string]types.NodeInfo{ + "node1": types.NodeInfo{ + Name: "node1", + CPU: types.CPUMap{ + "0": 10, + "1": 10, + }, }, - "node2": types.CPUMap{ - "0": 10, - "1": 10, + "node2": types.NodeInfo{ + Name: "node2", + CPU: types.CPUMap{ + "0": 10, + "1": 10, + }, }, } @@ -98,11 +104,11 @@ func TestSelectCPUNodes(t *testing.T) { assert.Error(t, err) assert.Contains(t, err.Error(), "Not enough") - for _, cpus := range nodes { - assert.Equal(t, cpus.Total(), 0) - assert.Equal(t, len(cpus), 2) - assert.Equal(t, cpus["node1"], 0) - assert.Equal(t, cpus["node2"], 0) + for _, node := range nodes { + assert.Equal(t, node.CPU.Total(), 0) + assert.Equal(t, len(node.CPU), 2) + assert.Equal(t, node.CPU["node1"], 0) + assert.Equal(t, node.CPU["node2"], 0) } } @@ -120,14 +126,20 @@ func TestResultLength(t *testing.T) { } func TestTotalQuota(t *testing.T) { - nodes := map[string]types.CPUMap{ - "node1": types.CPUMap{ - "0": 10, - "1": 0, + nodes := map[string]types.NodeInfo{ + "node1": types.NodeInfo{ + Name: "node1", + CPU: types.CPUMap{ + "0": 10, + "1": 0, + }, }, - "node2": types.CPUMap{ - "0": 5, - "1": 10, + "node2": types.NodeInfo{ + Name: "node2", + CPU: types.CPUMap{ + "0": 5, + "1": 10, + }, }, } assert.Equal(t, totalQuota(nodes), 3) @@ -135,18 +147,24 @@ func TestTotalQuota(t *testing.T) { func TestSelectPublicNodes(t *testing.T) { m := &magnesium{} - _, _, err := m.SelectCPUNodes(map[string]types.CPUMap{}, 1, 1) + _, _, err := m.SelectCPUNodes(map[string]types.NodeInfo{}, 1, 1) assert.Error(t, err) assert.Equal(t, err.Error(), "No nodes provide to choose some") - nodes := map[string]types.CPUMap{ - "node1": types.CPUMap{ - "0": 10, - "1": 10, + nodes := map[string]types.NodeInfo{ + "node1": types.NodeInfo{ + 
Name: "node1", + CPU: types.CPUMap{ + "0": 10, + "1": 10, + }, }, - "node2": types.CPUMap{ - "0": 10, - "1": 10, + "node2": types.NodeInfo{ + Name: "node2", + CPU: types.CPUMap{ + "0": 10, + "1": 10, + }, }, } @@ -161,8 +179,8 @@ func TestSelectPublicNodes(t *testing.T) { } } - for nodename, cpu := range nodes { + for nodename, node := range nodes { assert.Contains(t, []string{"node1", "node2"}, nodename) - assert.Equal(t, cpu.Total(), 20) + assert.Equal(t, node.CPU.Total(), 20) } } diff --git a/store/etcd/deploy.go b/store/etcd/deploy.go index f5b555362..f9b6522a9 100644 --- a/store/etcd/deploy.go +++ b/store/etcd/deploy.go @@ -21,7 +21,7 @@ func (k *krypton) MakeDeployStatus(opts *types.DeployOptions, nodesInfo []types. prefix := fmt.Sprintf("%s_%s", opts.Appname, opts.Entrypoint) m := map[string]string{} - nodesCount := map[string]int{} + nodesCount := map[string]int64{} for _, node := range resp.Node.Nodes { json.Unmarshal([]byte(node.Value), &m) if m["podname"] != opts.Podname { @@ -37,9 +37,9 @@ func (k *krypton) MakeDeployStatus(opts *types.DeployOptions, nodesInfo []types. nodesCount[m["nodename"]] += 1 } - for _, nodeInfo := range nodesInfo { + for p, nodeInfo := range nodesInfo { if v, ok := nodesCount[nodeInfo.Name]; ok { - nodeInfo.Count = v + nodesInfo[p].Count = v } } diff --git a/types/node.go b/types/node.go index 3deb03f94..4d0bc8345 100644 --- a/types/node.go +++ b/types/node.go @@ -11,16 +11,16 @@ import ( "golang.org/x/net/context" ) -type CPUMap map[string]int - type CPUAndMem struct { CpuMap CPUMap MemCap int64 } +type CPUMap map[string]int64 + // total quotas -func (c CPUMap) Total() int { - count := 0 +func (c CPUMap) Total() int64 { + var count int64 = 0 for _, value := range c { count += value } @@ -82,14 +82,15 @@ func (n *Node) GetIP() string { } type NodeInfo struct { + CPUAndMem Name string - CorePer int - Memory int64 + CPURate int64 - Capacity int + // 可以部署几个 + Capacity int64 // 上面有几个了 - Count int + Count int64 // 最终部署几个 - Deploy int + Deploy int64 // 其他需要 filter 的字段 } diff --git a/types/options.go b/types/options.go index 706d19bfa..62dbb42e9 100644 --- a/types/options.go +++ b/types/options.go @@ -11,7 +11,7 @@ type DeployOptions struct { Entrypoint string // Entrypoint to deploy ExtraArgs string // Extra arguments to append to command CPUQuota float64 // How many cores needed, e.g. 1.5 - Count int // How many containers needed, e.g. 4 + Count int64 // How many containers needed, e.g. 4 Memory int64 // Memory for container, in bytes Env []string // Env for container Networks map[string]string // Network names and specified IPs diff --git a/utils/utils_test.go b/utils/utils_test.go index 75e534a5d..02080a5e7 100644 --- a/utils/utils_test.go +++ b/utils/utils_test.go @@ -5,7 +5,6 @@ import ( "testing" "github.com/stretchr/testify/assert" - "gitlab.ricebook.net/platform/core/types" ) func TestRandomString(t *testing.T) { @@ -46,16 +45,6 @@ func TestGetGitRepoName(t *testing.T) { assert.Equal(t, r1, "core") } -func updateNodeInfo(input []types.NodeInfo, plan map[string]int, memory int64) []types.NodeInfo { - result := []types.NodeInfo{} - for _, node := range input { - memoryChange := int64(plan[node.Name]) * memory - node.Memory -= memoryChange - result = append(result, node) - } - return result -} - //TODO disable deploy test first //func TestContinuousAddingContainer(t *testing.T) { // // 测试分配算法随机性