Simplified Realloc and Refined Misc (#261)
* redefine resource rpc interface

* redefine transform interface

* redefine cpumem and storage

* redefine a lot

* simplify realloc: accept single container ID

* Txn rollback func can know which part fails (see the sketch below)

* simplify volume dispense: compatible computing

Co-authored-by: CMGS <[email protected]>
jschwinger233 and CMGS authored Nov 11, 2020
1 parent 30f925f commit 6dadb17
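One change worth calling out before the diffs: the "Txn rollback func can know which part fails" bullet shows up below as rollback callbacks gaining a second parameter, `func(ctx context.Context, _ bool) error`. A minimal sketch of a transaction helper wired that way — the signature, the failedByCond name, and the control flow are assumptions for illustration, not code taken from this commit:

package utils

import (
	"context"
	"time"
)

// Txn is an illustrative three-phase helper: cond reserves, then commits,
// rollback undoes. The bool handed to rollback reports whether the failure
// happened in cond (true) or in then (false), so the rollback can skip work
// that never happened. Only that extra bool mirrors this commit; everything
// else here is a simplified assumption.
func Txn(ctx context.Context, cond, then func(context.Context) error,
	rollback func(context.Context, bool) error, ttl time.Duration) error {
	ctx, cancel := context.WithTimeout(ctx, ttl)
	defer cancel()

	condErr := cond(ctx)
	var thenErr error
	if condErr == nil && then != nil {
		thenErr = then(ctx)
	}
	if (condErr != nil || thenErr != nil) && rollback != nil {
		failedByCond := condErr != nil
		_ = rollback(ctx, failedByCond)
	}
	if condErr != nil {
		return condErr
	}
	return thenErr
}

In the create.go diff below, both the resource rollback and the remove-container rollback take that two-argument shape.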
Showing 51 changed files with 7,170 additions and 5,315 deletions.
4 changes: 2 additions & 2 deletions client/servicediscovery/eru_service_discovery.go
@@ -51,7 +51,7 @@ func (w *EruServiceDiscovery) Watch(ctx context.Context) (_ <-chan []string, err

for {
cancelTimer := make(chan struct{})
go func() {
go func(expectedInterval time.Duration) {
timer := time.NewTimer(expectedInterval * time.Second)
defer timer.Stop()
select {
@@ -60,7 +60,7 @@ func (w *EruServiceDiscovery) Watch(ctx context.Context) (_ <-chan []string, err
case <-cancelTimer:
return
}
}()
}(expectedInterval)
status, err := stream.Recv()
close(cancelTimer)
if err != nil {
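The servicediscovery change above passes expectedInterval into the timer goroutine as an argument instead of letting the closure capture it, so each goroutine works with the value as it was at spawn time even if the watch loop later reassigns the variable. A standalone illustration of the difference (not code from this repository):

package main

import (
	"fmt"
	"time"
)

func main() {
	interval := 2 * time.Second

	// Captured by reference: the goroutine reads interval whenever it runs,
	// racing with the reassignment below.
	go func() { fmt.Println("captured:", interval) }()

	// Passed as an argument: the value is copied at spawn time.
	go func(d time.Duration) { fmt.Println("argument:", d) }(interval)

	interval = 5 * time.Second
	time.Sleep(100 * time.Millisecond)
	fmt.Println("final:", interval)
}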
99 changes: 48 additions & 51 deletions cluster/calcium/create.go
@@ -42,7 +42,7 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio

var (
err error
planMap map[types.ResourceType]resourcetypes.ResourcePlans
plans []resourcetypes.ResourcePlans
deployMap map[string]int
rollbackMap map[string][]int
)
@@ -71,14 +71,14 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio
}()

// calculate plans
if planMap, deployMap, err = c.doAllocResource(ctx, nodeMap, opts); err != nil {
if plans, deployMap, err = c.doAllocResource(ctx, nodeMap, opts); err != nil {
return errors.WithStack(err)
}

// commit changes
nodes := []*types.Node{}
for nodename, deploy := range deployMap {
for _, plan := range planMap {
for _, plan := range plans {
plan.ApplyChangesOnNode(nodeMap[nodename], utils.Range(deploy)...)
}
nodes = append(nodes, nodeMap[nodename])
@@ -92,17 +92,16 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio

// then: deploy containers
func(ctx context.Context) error {
rollbackMap, err = c.doDeployWorkloads(ctx, ch, opts, planMap, deployMap)
rollbackMap, err = c.doDeployWorkloads(ctx, ch, opts, plans, deployMap)
return errors.WithStack(err)
},

// rollback: give back resources
func(ctx context.Context) (err error) {
for nodeName, rollbackIndices := range rollbackMap {
indices := rollbackIndices
if e := c.withNodeLocked(ctx, nodeName, func(node *types.Node) error {
for _, plan := range planMap {
plan.RollbackChangesOnNode(node, indices...)
func(ctx context.Context, _ bool) (err error) {
for nodename, rollbackIndices := range rollbackMap {
if e := c.withNodeLocked(ctx, nodename, func(node *types.Node) error {
for _, plan := range plans {
plan.RollbackChangesOnNode(node, rollbackIndices...) // nolint:scopelint
}
return errors.WithStack(c.store.UpdateNodes(ctx, node))
}); e != nil {
@@ -121,7 +120,7 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio
return ch, err
}

func (c *Calcium) doDeployWorkloads(ctx context.Context, ch chan *types.CreateContainerMessage, opts *types.DeployOptions, planMap map[types.ResourceType]resourcetypes.ResourcePlans, deployMap map[string]int) (_ map[string][]int, err error) {
func (c *Calcium) doDeployWorkloads(ctx context.Context, ch chan *types.CreateContainerMessage, opts *types.DeployOptions, plans []resourcetypes.ResourcePlans, deployMap map[string]int) (_ map[string][]int, err error) {
wg := sync.WaitGroup{}
wg.Add(len(deployMap))

@@ -131,7 +130,7 @@ func (c *Calcium) doDeployWorkloads(ctx context.Context, ch chan *types.CreateCo
go metrics.Client.SendDeployCount(deploy)
go func(nodename string, deploy, seq int) {
defer wg.Done()
if indices, e := c.doDeployWorkloadsOnNode(ctx, ch, nodename, opts, deploy, planMap, seq); e != nil {
if indices, e := c.doDeployWorkloadsOnNode(ctx, ch, nodename, opts, deploy, plans, seq); e != nil {
err = e
rollbackMap[nodename] = indices
}
@@ -145,8 +144,8 @@ func (c *Calcium) doDeployWorkloads(ctx context.Context, ch chan *types.CreateCo
}

// deploy scheduled containers on one node
func (c *Calcium) doDeployWorkloadsOnNode(ctx context.Context, ch chan *types.CreateContainerMessage, nodeName string, opts *types.DeployOptions, deploy int, planMap map[types.ResourceType]resourcetypes.ResourcePlans, seq int) (indices []int, err error) {
node, err := c.doGetAndPrepareNode(ctx, nodeName, opts.Image)
func (c *Calcium) doDeployWorkloadsOnNode(ctx context.Context, ch chan *types.CreateContainerMessage, nodename string, opts *types.DeployOptions, deploy int, plans []resourcetypes.ResourcePlans, seq int) (indices []int, err error) {
node, err := c.doGetAndPrepareNode(ctx, nodename, opts.Image)
if err != nil {
for i := 0; i < deploy; i++ {
ch <- &types.CreateContainerMessage{Error: err}
@@ -157,12 +156,11 @@ func (c *Calcium) doDeployWorkloadsOnNode(ctx context.Context, ch chan *types.Cr
for idx := 0; idx < deploy; idx++ {
createMsg := &types.CreateContainerMessage{
Podname: opts.Podname,
Nodename: nodeName,
Nodename: nodename,
Publish: map[string][]string{},
}

func() {
var e error
do := func(idx int) (e error) {
defer func() {
if e != nil {
err = e
@@ -172,20 +170,21 @@ func (c *Calcium) doDeployWorkloadsOnNode(ctx context.Context, ch chan *types.Cr
ch <- createMsg
}()

rsc := &types.Resources{}
r := &types.ResourceMeta{}
o := resourcetypes.DispenseOptions{
Node: node,
Index: idx,
}
for _, plan := range planMap {
if e = plan.Dispense(o, rsc); e != nil {
for _, plan := range plans {
if r, e = plan.Dispense(o, r); e != nil {
return
}
}

createMsg.Resources = *rsc
e = c.doDeployOneWorkload(ctx, node, opts, createMsg, seq+idx, deploy-1-idx)
}()
createMsg.ResourceMeta = *r
return c.doDeployOneWorkload(ctx, node, opts, createMsg, seq+idx, deploy-1-idx)
}
_ = do(idx)
}

return indices, errors.WithStack(err)
@@ -210,30 +209,29 @@ func (c *Calcium) doDeployOneWorkload(
) (err error) {
config := c.doMakeContainerOptions(no, msg, opts, node)
container := &types.Container{
Name: config.Name,
Labels: config.Labels,
Podname: opts.Podname,
Nodename: node.Name,
CPURequest: msg.CPURequest,
CPULimit: msg.CPULimit,
QuotaRequest: msg.CPUQuotaRequest,
QuotaLimit: msg.CPUQuotaLimit,
MemoryRequest: msg.MemoryRequest,
MemoryLimit: msg.MemoryLimit,
StorageRequest: msg.StorageRequest,
StorageLimit: msg.StorageLimit,
VolumeRequest: msg.VolumeRequest,
VolumeLimit: msg.VolumeLimit,
VolumePlanRequest: msg.VolumePlanRequest,
VolumePlanLimit: msg.VolumePlanLimit,
Hook: opts.Entrypoint.Hook,
Privileged: opts.Entrypoint.Privileged,
Engine: node.Engine,
SoftLimit: opts.SoftLimit,
Image: opts.Image,
Env: opts.Env,
User: opts.User,
ResourceSubdivisible: true,
ResourceMeta: types.ResourceMeta{
CPU: msg.CPU,
CPUQuotaRequest: msg.CPUQuotaRequest,
CPUQuotaLimit: msg.CPUQuotaLimit,
MemoryRequest: msg.MemoryRequest,
MemoryLimit: msg.MemoryLimit,
StorageRequest: msg.StorageRequest,
StorageLimit: msg.StorageLimit,
VolumeRequest: msg.VolumeRequest,
VolumeLimit: msg.VolumeLimit,
VolumePlanRequest: msg.VolumePlanRequest,
VolumePlanLimit: msg.VolumePlanLimit,
},
Name: config.Name,
Labels: config.Labels,
Podname: opts.Podname,
Nodename: node.Name,
Hook: opts.Entrypoint.Hook,
Privileged: opts.Entrypoint.Privileged,
Engine: node.Engine,
Image: opts.Image,
Env: opts.Env,
User: opts.User,
}
return utils.Txn(
ctx,
@@ -308,7 +306,7 @@ func (c *Calcium) doDeployOneWorkload(
},

// remove container
func(ctx context.Context) error {
func(ctx context.Context, _ bool) error {
return errors.WithStack(c.doRemoveContainer(ctx, container, true))
},
c.config.GlobalTimeout,
@@ -318,12 +316,11 @@ func (c *Calcium) doMakeContainerOptions(no int, msg *types.CreateContainerMessage, opts *types.DeployOptions, node *types.Node) *enginetypes.VirtualizationCreateOptions {
func (c *Calcium) doMakeContainerOptions(no int, msg *types.CreateContainerMessage, opts *types.DeployOptions, node *types.Node) *enginetypes.VirtualizationCreateOptions {
config := &enginetypes.VirtualizationCreateOptions{}
// general
config.CPU = msg.CPULimit
config.CPU = msg.CPU
config.Quota = msg.CPUQuotaLimit
config.Memory = msg.MemoryLimit
config.Storage = msg.StorageLimit
config.NUMANode = node.GetNUMANode(msg.CPULimit)
config.SoftLimit = opts.SoftLimit
config.NUMANode = msg.NUMANode
config.RawArgs = opts.RawArgs
config.Lambda = opts.Lambda
config.User = opts.User
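Two patterns from create.go are worth noting: the per-resource-type map of plans becomes an ordered []resourcetypes.ResourcePlans, and Dispense now threads a single result through every plan (r, e = plan.Dispense(o, r)). A trimmed, standalone sketch of that accumulation; the real ResourcePlans, ResourceMeta and DispenseOptions live in the repo's resourcetypes and types packages and carry more fields and methods than shown here:

package main

import "fmt"

// Stand-ins for types.ResourceMeta and resourcetypes.DispenseOptions.
type ResourceMeta struct {
	CPUQuotaLimit float64
	MemoryLimit   int64
	StorageLimit  int64
}

type DispenseOptions struct{ Index int }

// ResourcePlans is reduced to the one method this sketch needs.
type ResourcePlans interface {
	Dispense(opts DispenseOptions, r *ResourceMeta) (*ResourceMeta, error)
}

type cpumemPlan struct {
	quota  float64
	memory int64
}

func (p cpumemPlan) Dispense(_ DispenseOptions, r *ResourceMeta) (*ResourceMeta, error) {
	r.CPUQuotaLimit, r.MemoryLimit = p.quota, p.memory
	return r, nil
}

type storagePlan struct{ storage int64 }

func (p storagePlan) Dispense(_ DispenseOptions, r *ResourceMeta) (*ResourceMeta, error) {
	r.StorageLimit = p.storage
	return r, nil
}

func main() {
	plans := []ResourcePlans{cpumemPlan{quota: 1.5, memory: 512 << 20}, storagePlan{storage: 1 << 30}}

	// Same shape as the loop in doDeployWorkloadsOnNode: every plan augments
	// the one ResourceMeta that ends up on the CreateContainerMessage.
	r := &ResourceMeta{}
	for _, plan := range plans {
		var err error
		if r, err = plan.Dispense(DispenseOptions{Index: 0}, r); err != nil {
			return
		}
	}
	fmt.Printf("%+v\n", *r)
}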
16 changes: 8 additions & 8 deletions cluster/calcium/create_test.go
@@ -39,7 +39,7 @@ func TestCreateContainer(t *testing.T) {
opts.Count = 1

// failed by memory check
opts.RawResourceOptions = types.RawResourceOptions{MemoryLimit: -1}
opts.ResourceOpts = types.ResourceOptions{MemoryLimit: -1}
store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil)
ch, err := c.CreateContainer(ctx, opts)
assert.Nil(t, err)
@@ -48,7 +48,7 @@ func TestCreateContainer(t *testing.T) {
}

// failed by CPUQuota
opts.RawResourceOptions = types.RawResourceOptions{CPULimit: -1, MemoryLimit: 1}
opts.ResourceOpts = types.ResourceOptions{CPUQuotaLimit: -1, MemoryLimit: 1}
ch, err = c.CreateContainer(ctx, opts)
assert.Nil(t, err)
for m := range ch {
@@ -60,12 +60,12 @@ func TestCreateContainerTxn(t *testing.T) {
c := NewTestCluster()
ctx := context.Background()
opts := &types.DeployOptions{
Count: 2,
DeployStrategy: strategy.Auto,
Podname: "p1",
RawResourceOptions: types.RawResourceOptions{CPULimit: 1},
Image: "zc:test",
Entrypoint: &types.Entrypoint{},
Count: 2,
DeployStrategy: strategy.Auto,
Podname: "p1",
ResourceOpts: types.ResourceOptions{CPUQuotaLimit: 1},
Image: "zc:test",
Entrypoint: &types.Entrypoint{},
}
store := &storemocks.Store{}
sche := &schedulermocks.Scheduler{}
2 changes: 1 addition & 1 deletion cluster/calcium/dissociate.go
@@ -26,7 +26,7 @@ func (c *Calcium) DissociateContainer(ctx context.Context, IDs []string) (chan *
// then
func(ctx context.Context) error {
log.Infof("[DissociateContainer] Container %s dissociated", container.ID)
return c.store.UpdateNodeResource(ctx, node, container.CPURequest, container.QuotaRequest, container.MemoryRequest, container.StorageRequest, container.VolumePlanRequest.IntoVolumeMap(), store.ActionIncr)
return c.store.UpdateNodeResource(ctx, node, &container.ResourceMeta, store.ActionIncr)
},
// rollback
nil,
20 changes: 11 additions & 9 deletions cluster/calcium/dissociate_test.go
@@ -23,14 +23,16 @@ func TestDissociateContainer(t *testing.T) {
lock.On("Unlock", mock.Anything).Return(nil)

c1 := &types.Container{
ID: "c1",
Podname: "p1",
MemoryLimit: 5 * int64(units.MiB),
MemoryRequest: 5 * int64(units.MiB),
QuotaLimit: 0.9,
QuotaRequest: 0.9,
CPURequest: types.CPUMap{"2": 90},
Nodename: "node1",
ResourceMeta: types.ResourceMeta{
MemoryLimit: 5 * int64(units.MiB),
MemoryRequest: 5 * int64(units.MiB),
CPUQuotaLimit: 0.9,
CPUQuotaRequest: 0.9,
CPU: types.CPUMap{"2": 90},
},
ID: "c1",
Podname: "p1",
Nodename: "node1",
}

node1 := &types.Node{
@@ -59,7 +61,7 @@ func TestDissociateContainer(t *testing.T) {
}
store.On("RemoveContainer", mock.Anything, mock.Anything).Return(nil)
// success
store.On("UpdateNodeResource", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
store.On("UpdateNodeResource", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
ch, err = c.DissociateContainer(ctx, []string{"c1"})
assert.NoError(t, err)
for r := range ch {
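The dissociate changes show the other half of the consolidation: Container now embeds types.ResourceMeta, so resource bookkeeping is grouped in one struct and &container.ResourceMeta is handed to store.UpdateNodeResource together with an action constant instead of five separate arguments. A trimmed, standalone illustration of that embedding; field names follow the diff, but the real structs carry many more fields:

package main

import "fmt"

// Trimmed stand-ins for types.ResourceMeta and types.Container.
type ResourceMeta struct {
	CPUQuotaRequest float64
	CPUQuotaLimit   float64
	MemoryRequest   int64
	MemoryLimit     int64
}

type Container struct {
	ResourceMeta // embedded: all resource bookkeeping lives here
	ID           string
	Podname      string
	Nodename     string
}

// updateNodeResource stands in for the store call, which now takes the whole
// meta plus an action ("incr"/"decr") rather than individual resources.
func updateNodeResource(nodename string, meta *ResourceMeta, action string) {
	fmt.Printf("%s %s %+v\n", action, nodename, *meta)
}

func main() {
	c := Container{
		ResourceMeta: ResourceMeta{CPUQuotaRequest: 0.9, MemoryRequest: 5 << 20},
		ID:           "c1",
		Podname:      "p1",
		Nodename:     "node1",
	}
	fmt.Println(c.MemoryRequest)                            // promoted field still reads naturally
	updateNodeResource(c.Nodename, &c.ResourceMeta, "incr") // the whole meta travels as one value
}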