Skip to content

Commit

Permalink
fix bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
tonicmuroq committed Jun 21, 2016
1 parent b3679da commit 6252b73
Show file tree
Hide file tree
Showing 9 changed files with 95 additions and 21 deletions.
4 changes: 0 additions & 4 deletions cluster/calcium/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,3 @@ func New(config types.Config) (*Calcium, error) {

return &Calcium{store: store, config: config, scheduler: scheduler}, nil
}

func (c *Calcium) Run() {

}
47 changes: 39 additions & 8 deletions cluster/calcium/create_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,32 +77,53 @@ func (c *Calcium) prepareNodes(podname string, quota float64, num int) (map[stri
c.Lock()
defer c.Unlock()

q := int(quota)
r := make(map[string][]types.CPUMap)

nodes, err := c.ListPodNodes(podname)
if err != nil {
return r, err
}

// if public, use only public nodes
if q == 0 {
nodes = filterPublicNodes(nodes)
}

cpumap := makeCPUMap(nodes)
// TODO too be simple, just use int type as quota
r, err = c.scheduler.SelectNodes(cpumap, int(quota), num)
r, err = c.scheduler.SelectNodes(cpumap, q, num)
if err != nil {
return r, err
}

// cpus remained
// update data to etcd
// `SelectNodes` reduces count in cpumap
for _, node := range nodes {
node.CPU = cpumap[node.Name]
// ignore error
c.store.UpdateNode(node)
// if quota is set to 0
// then no cpu is required
if q > 0 {
// cpus remained
// update data to etcd
// `SelectNodes` reduces count in cpumap
for _, node := range nodes {
node.CPU = cpumap[node.Name]
// ignore error
c.store.UpdateNode(node)
}
}

return r, err
}

// filter only public nodes
func filterPublicNodes(nodes []*types.Node) []*types.Node {
rs := []*types.Node{}
for _, node := range nodes {
if node.Public {
rs = append(rs, node)
}
}
return rs
}

// Pull an image
// Blocks until it finishes.
func doPullImage(node *types.Node, image string) error {
Expand All @@ -120,6 +141,10 @@ func doPullImage(node *types.Node, image string) error {

func (c *Calcium) doCreateContainer(nodename string, cpumap []types.CPUMap, specs types.Specs, opts *types.DeployOptions) []*types.CreateContainerMessage {
ms := make([]*types.CreateContainerMessage, len(cpumap))
for i := 0; i < len(ms); i++ {
ms[i] = &types.CreateContainerMessage{}
}

node, err := c.GetNode(opts.Podname, nodename)
if err != nil {
return ms
Expand Down Expand Up @@ -178,7 +203,13 @@ func (c *Calcium) doCreateContainer(nodename string, cpumap []types.CPUMap, spec
return ms
}

// When deploy on a public host
// quota is set to 0
// no need to update this to etcd (save 1 time write on etcd)
func (c *Calcium) releaseQuota(node *types.Node, quota types.CPUMap) {
if quota.Total() == 0 {
return
}
node.CPU.Add(quota)
c.store.UpdateNode(node)
}
Expand Down
4 changes: 3 additions & 1 deletion cluster/calcium/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package calcium
import "io"
import "io/ioutil"

// As the name says,
// blocks until the stream is empty, until we meet EOF
func ensureReaderClosed(stream io.ReadCloser) {
if stream == nil {
return
}
io.CopyN(ioutil.Discard, stream, 512)
io.Copy(ioutil.Discard, stream)
stream.Close()
}
10 changes: 7 additions & 3 deletions cluster/calcium/remove_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,13 @@ func (c *Calcium) removeOneContainer(container *types.Container) error {
}
go container.Engine.ImageRemove(context.Background(), info.Image, rmiOpts)

node.CPU.Add(container.CPU)
if err := c.store.UpdateNode(node); err != nil {
return err
// if total cpu of container > 0, then we need to release these core resource
// but if it's 0, just ignore to save 1 time write on etcd.
if container.CPU.Total() > 0 {
node.CPU.Add(container.CPU)
if err := c.store.UpdateNode(node); err != nil {
return err
}
}
return c.store.RemoveContainer(info.ID)
}
2 changes: 1 addition & 1 deletion devtools/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ def create_container(ctx):
image='hub.ricebook.net/test-ci:966fd83',
podname='dev',
entrypoint='log',
cpu_quota=1,
cpu_quota=0,
count=1,
env=['ENV_A=1', 'ENV_B=2'])

Expand Down
3 changes: 2 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,9 @@ func main() {
EnvVar: "ERU_LOG_LEVEL",
},
}
app.Action = func(c *cli.Context) {
app.Action = func(c *cli.Context) error {
serve()
return nil
}

app.Run(os.Args)
Expand Down
5 changes: 5 additions & 0 deletions scheduler/simple/cpu.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@ func addCPUMap(c, b types.CPUMap) {

// Get quota from CPUMap c
// Returns the corresponding CPUMap
// If quota is 0, just return empty CPUMap
func getQuota(c types.CPUMap, quota int) (types.CPUMap, error) {
r := types.CPUMap{}
if quota == 0 {
return r, nil
}

if cpuCount(c) < quota {
return r, fmt.Errorf("Can't get quota, not enough resources")
}
Expand Down
8 changes: 5 additions & 3 deletions scheduler/simple/magnesium.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,11 @@ func (m *Magnesium) SelectNodes(nodes map[string]types.CPUMap, quota int, num in
return result, fmt.Errorf("No nodes provide to choose some")
}

total := totalQuota(nodes)
if total < num*quota {
return result, fmt.Errorf("Not enough CPUs, total: %d, require: %d", total, num)
if quota > 0 {
total := totalQuota(nodes)
if total < num*quota {
return result, fmt.Errorf("Not enough CPUs, total: %d, require: %d", total, num)
}
}

done:
Expand Down
33 changes: 33 additions & 0 deletions scheduler/simple/magnesium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,36 @@ func TestTotalQuota(t *testing.T) {
}
assert.Equal(t, totalQuota(nodes), 3)
}

func TestSelectPublicNodes(t *testing.T) {
m := &Magnesium{}
_, err := m.SelectNodes(map[string]types.CPUMap{}, 1, 1)
assert.Error(t, err)
assert.Equal(t, err.Error(), "No nodes provide to choose some")

nodes := map[string]types.CPUMap{
"node1": types.CPUMap{
"0": 10,
"1": 10,
},
"node2": types.CPUMap{
"0": 10,
"1": 10,
},
}

r, err := m.SelectNodes(nodes, 0, 10)
assert.NoError(t, err)
assert.Equal(t, resultLength(r), 10)
for nodename, cpus := range r {
assert.Contains(t, []string{"node1", "node2"}, nodename)
for _, cpu := range cpus {
assert.Equal(t, cpu.Total(), 0)
}
}

for nodename, cpu := range nodes {
assert.Contains(t, []string{"node1", "node2"}, nodename)
assert.Equal(t, cpu.Total(), 20)
}
}

0 comments on commit 6252b73

Please sign in to comment.