Skip to content

Commit

Permalink
simplify realloc: accept single container ID
Browse files Browse the repository at this point in the history
  • Loading branch information
jschwinger233 committed Nov 9, 2020
1 parent 7f41a7a commit b6461c9
Show file tree
Hide file tree
Showing 29 changed files with 6,050 additions and 3,758 deletions.
24 changes: 12 additions & 12 deletions cluster/calcium/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio

var (
err error
planMap map[types.ResourceType]resourcetypes.ResourcePlans
plans []resourcetypes.ResourcePlans
deployMap map[string]int
rollbackMap map[string][]int
)
Expand Down Expand Up @@ -71,14 +71,14 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio
}()

// calculate plans
if planMap, deployMap, err = c.doAllocResource(ctx, nodeMap, opts); err != nil {
if plans, deployMap, err = c.doAllocResource(ctx, nodeMap, opts); err != nil {
return errors.WithStack(err)
}

// commit changes
nodes := []*types.Node{}
for nodename, deploy := range deployMap {
for _, plan := range planMap {
for _, plan := range plans {
plan.ApplyChangesOnNode(nodeMap[nodename], utils.Range(deploy)...)
}
nodes = append(nodes, nodeMap[nodename])
Expand All @@ -92,15 +92,15 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio

// then: deploy containers
func(ctx context.Context) error {
rollbackMap, err = c.doDeployWorkloads(ctx, ch, opts, planMap, deployMap)
rollbackMap, err = c.doDeployWorkloads(ctx, ch, opts, plans, deployMap)
return errors.WithStack(err)
},

// rollback: give back resources
func(ctx context.Context) (err error) {
for nodename, rollbackIndices := range rollbackMap {
if e := c.withNodeLocked(ctx, nodename, func(node *types.Node) error {
for _, plan := range planMap {
for _, plan := range plans {
plan.RollbackChangesOnNode(node, rollbackIndices...)
}
return errors.WithStack(c.store.UpdateNodes(ctx, node))
Expand All @@ -120,7 +120,7 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio
return ch, err
}

func (c *Calcium) doDeployWorkloads(ctx context.Context, ch chan *types.CreateContainerMessage, opts *types.DeployOptions, planMap map[types.ResourceType]resourcetypes.ResourcePlans, deployMap map[string]int) (_ map[string][]int, err error) {
func (c *Calcium) doDeployWorkloads(ctx context.Context, ch chan *types.CreateContainerMessage, opts *types.DeployOptions, plans []resourcetypes.ResourcePlans, deployMap map[string]int) (_ map[string][]int, err error) {
wg := sync.WaitGroup{}
wg.Add(len(deployMap))

Expand All @@ -130,7 +130,7 @@ func (c *Calcium) doDeployWorkloads(ctx context.Context, ch chan *types.CreateCo
go metrics.Client.SendDeployCount(deploy)
go func(nodename string, deploy, seq int) {
defer wg.Done()
if indices, e := c.doDeployWorkloadsOnNode(ctx, ch, nodename, opts, deploy, planMap, seq); e != nil {
if indices, e := c.doDeployWorkloadsOnNode(ctx, ch, nodename, opts, deploy, plans, seq); e != nil {
err = e
rollbackMap[nodename] = indices
}
Expand All @@ -144,7 +144,7 @@ func (c *Calcium) doDeployWorkloads(ctx context.Context, ch chan *types.CreateCo
}

// deploy scheduled containers on one node
func (c *Calcium) doDeployWorkloadsOnNode(ctx context.Context, ch chan *types.CreateContainerMessage, nodename string, opts *types.DeployOptions, deploy int, planMap map[types.ResourceType]resourcetypes.ResourcePlans, seq int) (indices []int, err error) {
func (c *Calcium) doDeployWorkloadsOnNode(ctx context.Context, ch chan *types.CreateContainerMessage, nodename string, opts *types.DeployOptions, deploy int, plans []resourcetypes.ResourcePlans, seq int) (indices []int, err error) {
node, err := c.doGetAndPrepareNode(ctx, nodename, opts.Image)
if err != nil {
for i := 0; i < deploy; i++ {
Expand All @@ -170,18 +170,18 @@ func (c *Calcium) doDeployWorkloadsOnNode(ctx context.Context, ch chan *types.Cr
ch <- createMsg
}()

var r *types.Resource1
var r *types.ResourceMeta
o := resourcetypes.DispenseOptions{
Node: node,
Index: idx,
}
for _, plan := range planMap {
for _, plan := range plans {
if r, e = plan.Dispense(o, r); e != nil {
return
}
}

createMsg.Resource1 = *r
createMsg.ResourceMeta = *r
e = c.doDeployOneWorkload(ctx, node, opts, createMsg, seq+idx, deploy-1-idx)
return e
}
Expand Down Expand Up @@ -210,7 +210,7 @@ func (c *Calcium) doDeployOneWorkload(
) (err error) {
config := c.doMakeContainerOptions(no, msg, opts, node)
container := &types.Container{
Resource1: types.Resource1{
ResourceMeta: types.ResourceMeta{
CPU: msg.CPU,
CPUQuotaRequest: msg.CPUQuotaRequest,
CPUQuotaLimit: msg.CPUQuotaLimit,
Expand Down
2 changes: 1 addition & 1 deletion cluster/calcium/dissociate.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (c *Calcium) DissociateContainer(ctx context.Context, IDs []string) (chan *
// then
func(ctx context.Context) error {
log.Infof("[DissociateContainer] Container %s dissociated", container.ID)
return c.store.UpdateNodeResource(ctx, node, &container.Resource1, store.ActionIncr)
return c.store.UpdateNodeResource(ctx, node, &container.ResourceMeta, store.ActionIncr)
},
// rollback
nil,
Expand Down
2 changes: 1 addition & 1 deletion cluster/calcium/dissociate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestDissociateContainer(t *testing.T) {
lock.On("Unlock", mock.Anything).Return(nil)

c1 := &types.Container{
Resource1: types.Resource1{
ResourceMeta: types.ResourceMeta{
MemoryLimit: 5 * int64(units.MiB),
MemoryRequest: 5 * int64(units.MiB),
CPUQuotaLimit: 0.9,
Expand Down
Loading

0 comments on commit b6461c9

Please sign in to comment.