pass realloc test
jschwinger233 committed Oct 13, 2020
1 parent 07fe7c4 commit 24eae87
Showing 5 changed files with 276 additions and 130 deletions.
29 changes: 0 additions & 29 deletions cluster/calcium/create_test.go
@@ -255,33 +255,4 @@ func TestCreateContainerTxn(t *testing.T) {
assert.EqualValues(t, 1, errCnt)
assert.EqualValues(t, 1, node1.CPUUsed+node2.CPUUsed)
return

/*
// doAllocResource fails: SaveProcessing
store.On("UpdateNodeResource",
mock.AnythingOfType("*context.timerCtx"),
mock.AnythingOfType("*types.Node"),
mock.AnythingOfType("types.ResourceMap"),
mock.AnythingOfType("float64"),
mock.AnythingOfType("int64"),
mock.AnythingOfType("int64"),
mock.AnythingOfType("types.ResourceMap"),
mock.AnythingOfType("string")).Return(
func(ctx context.Context, node *types.Node, _ types.CPUMap, quota float64, _, _ int64, _ types.VolumeMap, action string) error {
if action == st.ActionIncr {
quota = -quota
}
node.CPUUsed += quota
return nil
},
)
store.On("SaveProcessing", mock.Anything, mock.Anything, mock.Anything).Return(errors.Wrap(context.DeadlineExceeded, "SaveProcessing")).Once()
_, err = c.CreateContainer(ctx, opts)
assert.Error(t, err)
assert.True(t, errors.Is(err, context.DeadlineExceeded))
assert.Error(t, err, "SaveProcessing")
assert.EqualValues(t, 0, node1.CPUUsed)
assert.EqualValues(t, 0, node2.CPUUsed)
*/
}
64 changes: 42 additions & 22 deletions cluster/calcium/realloc.go
@@ -101,6 +101,13 @@ func (c *Calcium) doReallocContainersOnPod(ctx context.Context, ch chan *types.R
Quota: container.Storage + opts.Storage,
},
}

for _, req := range reqs {
if err = req.DeployValidate(); err != nil {
ch <- &types.ReallocResourceMessage{Error: err}
return errors.WithStack(err)
}
}
containerGroups[nodename][reqs] = append(containerGroups[nodename][reqs], container)
}
}
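
Review note: the loop added above rejects a realloc batch as soon as any recomputed request fails DeployValidate, reporting the error on the result channel before anything is grouped for scheduling. A minimal sketch of that fail-fast shape, using illustrative stand-in types rather than the real eru-core ones:

```go
package main

import (
	"errors"
	"fmt"
)

// resourceRequest and reallocMessage are illustrative stand-ins for the
// eru-core types; they are not the real definitions.
type resourceRequest struct {
	CPUQuota float64
	Memory   int64
	Storage  int64
}

type reallocMessage struct {
	Error error
}

// validate plays the role of a DeployValidate-style check: reject any request
// whose dimensions end up negative after applying the realloc delta.
func (r resourceRequest) validate() error {
	if r.CPUQuota < 0 || r.Memory < 0 || r.Storage < 0 {
		return errors.New("realloc would make a resource dimension negative")
	}
	return nil
}

// validateAll mirrors the fail-fast loop: report the first invalid request on
// the channel and stop before anything is grouped or scheduled.
func validateAll(reqs []resourceRequest, ch chan<- *reallocMessage) error {
	for _, req := range reqs {
		if err := req.validate(); err != nil {
			ch <- &reallocMessage{Error: err}
			return err
		}
	}
	return nil
}

func main() {
	ch := make(chan *reallocMessage, 1)
	reqs := []resourceRequest{
		{CPUQuota: 1.5, Memory: 512 << 20, Storage: 0},
		{CPUQuota: -0.5, Memory: 256 << 20, Storage: 0}, // invalid delta
	}
	if err := validateAll(reqs, ch); err != nil {
		fmt.Println("realloc rejected:", (<-ch).Error)
	}
}
```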
@@ -121,12 +128,16 @@ func (c *Calcium) doReallocContainersOnNode(ctx context.Context, ch chan *types.
{
return c.withNodeLocked(ctx, nodename, func(node *types.Node) error {
planMap, total, _, err := scheduler.SelectNodes(newReqs, map[string]*types.Node{node.Name: node})
if err != nil {
return errors.WithStack(err)
}
if total < len(containers) {
return errors.WithStack(types.ErrInsufficientRes)
}

var (
rollbacks []int
originalContainers []types.Container
)

return utils.Txn(
@@ -136,7 +147,7 @@ func (c *Calcium) doReallocContainersOnNode(ctx context.Context, ch chan *types.
func(ctx context.Context) (err error) {

for _, plan := range planMap {
plan.ApplyChangesOnNode(node)
plan.ApplyChangesOnNode(node, utils.Range(len(containers))...)
}
for _, container := range containers {
recycleResources(node, container)
@@ -146,6 +157,9 @@ func (c *Calcium) doReallocContainersOnNode(ctx context.Context, ch chan *types.

// then: update instances' resources
func(ctx context.Context) error {
for _, container := range containers {
originalContainers = append(originalContainers, *container)
}
rollbacks, err = c.doUpdateResourceOnInstances(ctx, ch, node, planMap, containers, hardVbsMap)
return err
},
@@ -156,7 +170,7 @@ func (c *Calcium) doReallocContainersOnNode(ctx context.Context, ch chan *types.
plan.RollbackChangesOnNode(node, rollbacks...)
}
for _, idx := range rollbacks {
preserveResources(node, containers[idx])
preserveResources(node, &originalContainers[idx])
}
return c.store.UpdateNodes(ctx, node)
},
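
Review note: the rollback path now restores node resources from originalContainers, copies taken before the update step mutates the live container objects, instead of reading the already-modified containers. A small sketch of that snapshot-then-restore pattern, with simplified stand-in types (the real transaction helper and container fields differ):

```go
package main

import "fmt"

// container is a stand-in with a single resource field; the real type
// carries CPU, memory, volume, and storage bookkeeping.
type container struct {
	ID     string
	Memory int64
}

// update mutates containers in place and reports which indexes failed and
// therefore must be rolled back.
func update(containers []*container) (rollbacks []int) {
	for i, c := range containers {
		c.Memory += 128
		if i == 1 { // pretend the second update failed
			rollbacks = append(rollbacks, i)
		}
	}
	return rollbacks
}

func main() {
	containers := []*container{{ID: "a", Memory: 512}, {ID: "b", Memory: 512}}

	// Snapshot before mutating, mirroring the originalContainers copy.
	originals := make([]container, 0, len(containers))
	for _, c := range containers {
		originals = append(originals, *c)
	}

	rollbacks := update(containers)

	// Restore failed containers from the snapshot, not from the mutated objects.
	for _, idx := range rollbacks {
		*containers[idx] = originals[idx]
	}
	fmt.Println(containers[0].Memory, containers[1].Memory) // 640 512
}
```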
@@ -172,27 +186,33 @@ func (c *Calcium) doUpdateResourceOnInstances(ctx context.Context, ch chan *type
wg.Add(len(containers))

for idx, container := range containers {
resources := &types.Resources{}
for _, plan := range planMap {
plan.Dispense(types.DispenseOptions{
Node: node,
Index: idx,
ExistingInstances: containers,
HardVolumeBindings: hardVbsMap[container.ID],
}, resources)
}

go func(container *types.Container, resource types.Resources, idx int) {
defer wg.Done()

go func(container *types.Container, idx int) {
var e error
msg := &types.ReallocResourceMessage{ContainerID: container.ID}
if e := c.doUpdateResourceOnInstance(ctx, node, container, resource); e != nil {
err = e
msg.Error = e
rollbacks = append(rollbacks, idx)
defer func() {
if e != nil {
err = e
msg.Error = e
rollbacks = append(rollbacks, idx)
}
wg.Done()
ch <- msg
}()

resources := &types.Resources{}
for _, plan := range planMap {
if e = plan.Dispense(types.DispenseOptions{
Node: node,
Index: idx,
ExistingInstances: containers,
HardVolumeBindings: hardVbsMap[container.ID],
}, resources); e != nil {
return
}
}
ch <- msg
}(container, *resources, idx)

e = c.doUpdateResourceOnInstance(ctx, node, container, *resources)
}(container, idx)
}

wg.Wait()
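
Review note: each per-container goroutine now routes every failure, whether from Dispense or from the instance update, through one deferred block that records the rollback index and sends exactly one message. A reduced sketch of that shape with generic names; unlike the committed code shown above, the sketch guards the shared err and rollbacks with a mutex, since nothing in this hunk shows other synchronization:

```go
package main

import (
	"fmt"
	"sync"
)

// work stands in for the per-instance sequence: a Dispense-like preparation
// step followed by the actual resource update, either of which may fail.
func work(idx int) error {
	if idx == 2 {
		return fmt.Errorf("update failed for instance %d", idx)
	}
	return nil
}

func main() {
	const n = 4
	var (
		wg        sync.WaitGroup
		mu        sync.Mutex // guards err and rollbacks across goroutines
		err       error
		rollbacks []int
	)
	ch := make(chan string, n)

	wg.Add(n)
	for idx := 0; idx < n; idx++ {
		go func(idx int) {
			var e error
			// A single deferred block owns the bookkeeping: record the error,
			// remember the index for rollback, and emit exactly one message,
			// no matter which step failed.
			defer func() {
				if e != nil {
					mu.Lock()
					err = e
					rollbacks = append(rollbacks, idx)
					mu.Unlock()
				}
				wg.Done()
				ch <- fmt.Sprintf("instance %d: err=%v", idx, e)
			}()

			e = work(idx)
		}(idx)
	}

	for i := 0; i < n; i++ {
		fmt.Println(<-ch)
	}
	wg.Wait()
	fmt.Println("rollbacks:", rollbacks, "err:", err)
}
```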
