Skip to content

Commit

Permalink
cluster layer calls remap on resource changes
Browse files Browse the repository at this point in the history
  • Loading branch information
jschwinger233 authored and CMGS committed Mar 11, 2021
1 parent 2ec7b86 commit 6d2d3ef
Show file tree
Hide file tree
Showing 8 changed files with 99 additions and 21 deletions.
2 changes: 2 additions & 0 deletions cluster/calcium/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,9 @@ func (c *Calcium) pushImage(ctx context.Context, resp io.ReadCloser, node *types
break
}
if err == context.Canceled || err == context.DeadlineExceeded {
log.Errorf("[BuildImage] context timeout")
lastMessage.ErrorDetail.Code = -1
lastMessage.ErrorDetail.Message = err.Error()
lastMessage.Error = err.Error()
break
}
Expand Down
8 changes: 8 additions & 0 deletions cluster/calcium/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,14 @@ func (c *Calcium) doDeployWorkloadsOnNode(ctx context.Context, ch chan *types.Cr
err = e
indices = utils.Range(deploy)
}

// We deliberately keep remap out of the transaction: the rollback cost would be too high.
// The tolerated consequence of a failed remap is that the share pool is not updated,
// which we consider acceptable. Moreover remap is idempotent: even if this remap
// fails, the next remap will converge to the correct state.
c.withNodeLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
c.doRemapResourceAndLog(ctx, logger, node)
return nil
})
return indices, errors.WithStack(err)
}

Expand Down
16 changes: 11 additions & 5 deletions cluster/calcium/dissociate.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,30 @@ func (c *Calcium) DissociateWorkload(ctx context.Context, ids []string) (chan *t
ch := make(chan *types.DissociateWorkloadMessage)
go func() {
defer close(ch)
// TODO@zc: group ids by node
for _, id := range ids {
err := c.withWorkloadLocked(ctx, id, func(ctx context.Context, workload *types.Workload) error {
return c.withNodeLocked(ctx, workload.Nodename, func(ctx context.Context, node *types.Node) (err error) {
return utils.Txn(
err = utils.Txn(
ctx,
// if
func(ctx context.Context) error {
return errors.WithStack(c.store.RemoveWorkload(ctx, workload))
log.Infof("[DissociateWorkload] Workload %s dissociated", workload.ID)
return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, store.ActionIncr))
},
// then
func(ctx context.Context) error {
log.Infof("[DissociateWorkload] Workload %s dissociated", workload.ID)
return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, store.ActionIncr))
return errors.WithStack(c.store.RemoveWorkload(ctx, workload))
},
// rollback
nil,
func(ctx context.Context, _ bool) error {
return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, store.ActionDecr))
},
c.config.GlobalTimeout,
)

c.doRemapResourceAndLog(ctx, logger, node)
return err
})
})
if err != nil {
Expand Down
11 changes: 8 additions & 3 deletions cluster/calcium/realloc.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (c *Calcium) ReallocResource(ctx context.Context, opts *types.ReallocOption

// transaction: node resource
func (c *Calcium) doReallocOnNode(ctx context.Context, nodename string, workload *types.Workload, rrs resourcetypes.ResourceRequests) error {
return c.withNodeLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
return c.withNodeLocked(ctx, nodename, func(ctx context.Context, node *types.Node) (err error) {
node.RecycleResources(&workload.ResourceMeta)
plans, err := resources.SelectNodesByResourceRequests(rrs, map[string]*types.Node{node.Name: node})
if err != nil {
Expand All @@ -49,7 +49,7 @@ func (c *Calcium) doReallocOnNode(ctx context.Context, nodename string, workload

originalWorkload := *workload
resourceMeta := &types.ResourceMeta{}
return utils.Txn(
if err = utils.Txn(
ctx,

// if update workload resources
Expand Down Expand Up @@ -96,7 +96,12 @@ func (c *Calcium) doReallocOnNode(ctx context.Context, nodename string, workload
},

c.config.GlobalTimeout,
)
); err != nil {
return
}

c.doRemapResourceAndLog(ctx, log.WithField("Calcium", "doReallocOnNode"), node)
return nil
})
}

Expand Down
32 changes: 23 additions & 9 deletions cluster/calcium/remove.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,32 @@ func (c *Calcium) RemoveWorkload(ctx context.Context, ids []string, force bool,
ret := &types.RemoveWorkloadMessage{WorkloadID: id, Success: false, Hook: []*bytes.Buffer{}}
if err := c.withWorkloadLocked(ctx, id, func(ctx context.Context, workload *types.Workload) error {
return c.withNodeLocked(ctx, workload.Nodename, func(ctx context.Context, node *types.Node) (err error) {
return utils.Txn(
if err = utils.Txn(
ctx,
// if
func(ctx context.Context) error {
return errors.WithStack(c.doRemoveWorkload(ctx, workload, force))
return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, store.ActionIncr))
},
// then
func(ctx context.Context) error {
log.Infof("[RemoveWorkload] Workload %s removed", workload.ID)
return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, store.ActionIncr))
err := errors.WithStack(c.doRemoveWorkload(ctx, workload, force))
if err != nil {
log.Infof("[RemoveWorkload] Workload %s removed", workload.ID)
}
return err
},
// rollback
nil,
func(ctx context.Context, _ bool) error {
return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, store.ActionDecr))
},
c.config.GlobalTimeout,
)
); err != nil {
return
}

// TODO@zc: 优化一下, 先按照 node 聚合 ids
c.doRemapResourceAndLog(ctx, logger, node)
return
})
}); err != nil {
logger.Errorf("[RemoveWorkload] Remove workload %s failed, err: %+v", id, err)
Expand All @@ -66,19 +77,22 @@ func (c *Calcium) RemoveWorkload(ctx context.Context, ids []string, force bool,
return ch, nil
}

// semantic: instance removed on err == nil, instance remained on err != nil
func (c *Calcium) doRemoveWorkload(ctx context.Context, workload *types.Workload, force bool) error {
return utils.Txn(
ctx,
// if
func(ctx context.Context) error {
return errors.WithStack(workload.Remove(ctx, force))
return errors.WithStack(c.store.RemoveWorkload(ctx, workload))
},
// then
func(ctx context.Context) error {
return errors.WithStack(c.store.RemoveWorkload(ctx, workload))
return errors.WithStack(workload.Remove(ctx, force))
},
// rollback
nil,
func(ctx context.Context, _ bool) error {
return errors.WithStack(c.store.AddWorkload(ctx, workload))
},
c.config.GlobalTimeout,
)

Expand Down
13 changes: 11 additions & 2 deletions cluster/calcium/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (c *Calcium) doReplaceWorkload(
VolumePlanLimit: workload.VolumePlanLimit,
},
}
return createMessage, removeMessage, utils.Txn(
if err = utils.Txn(
ctx,
// if
func(ctx context.Context) (err error) {
Expand Down Expand Up @@ -194,7 +194,16 @@ func (c *Calcium) doReplaceWorkload(
return errors.WithStack(err)
},
c.config.GlobalTimeout,
)
); err != nil {
return createMessage, removeMessage, err
}

c.withNodeLocked(ctx, node.Name, func(ctx context.Context, node *types.Node) error {
c.doRemapResourceAndLog(ctx, log.WithField("Calcium", "doReplaceWorkload"), node)
return nil
})

return createMessage, removeMessage, err
}

func (c *Calcium) doMakeReplaceWorkloadOptions(no int, msg *types.CreateWorkloadMessage, opts *types.DeployOptions, node *types.Node, ancestorWorkloadID string) *enginetypes.VirtualizationCreateOptions {
Expand Down
34 changes: 34 additions & 0 deletions cluster/calcium/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/pkg/errors"
"github.com/projecteru2/core/log"

enginetypes "github.com/projecteru2/core/engine/types"
resourcetypes "github.com/projecteru2/core/resources/types"
"github.com/projecteru2/core/strategy"
"github.com/projecteru2/core/types"
Expand Down Expand Up @@ -160,3 +161,36 @@ func (c *Calcium) doAllocResource(ctx context.Context, nodeMap map[string]*types
log.Infof("[Calium.doAllocResource] deployMap: %+v", deployMap)
return plans, deployMap, nil
}

// called on changes of resource binding, such as cpu binding
// as an internal api, remap doesn't lock node, the responsibility of that should be taken on by caller
// remapResource asks the node engine to rebuild the CPU share mapping for
// every workload currently scheduled on the node. It is invoked on changes of
// resource binding (such as cpu binding). As an internal API it does NOT lock
// the node; holding the node lock is the caller's responsibility.
func (c *Calcium) remapResource(ctx context.Context, node *types.Node) (<-chan enginetypes.VirtualizationRemapMessage, error) {
	workloads, err := c.store.ListNodeWorkloads(ctx, node.Name, nil)
	if err != nil {
		return nil, err
	}

	opts := &enginetypes.VirtualizationRemapOptions{
		CPUAvailable:      node.CPU,
		CPUInit:           node.InitCPU,
		CPUShareBase:      int64(c.config.Scheduler.ShareBase),
		WorkloadResources: map[string]enginetypes.VirtualizationResource{},
	}
	for _, wrk := range workloads {
		opts.WorkloadResources[wrk.ID] = enginetypes.VirtualizationResource{
			CPU:      wrk.CPU,
			Quota:    wrk.CPUQuotaLimit,
			NUMANode: wrk.NUMANode,
		}
	}

	msgs, err := node.Engine.VirtualizationResourceRemap(ctx, opts)
	return msgs, errors.WithStack(err)
}

// doRemapResourceAndLog triggers a resource remap on the given node and logs
// the outcome of each per-workload remap message instead of propagating errors.
// Remap failures are tolerated by design: remap is idempotent, so a later remap
// converges to the correct state. The caller is responsible for holding the
// node lock (see remapResource).
func (c *Calcium) doRemapResourceAndLog(ctx context.Context, logger log.Fields, node *types.Node) {
	// Tag log entries with the actual function name (the previous tag
	// "doRemapResourceIrresponsibly" did not match any function and was misleading).
	logger = logger.WithField("Calcium", "doRemapResourceAndLog").WithField("nodename", node.Name)
	if ch, err := c.remapResource(ctx, node); logger.Err(err) == nil {
		// Drain the message channel so every workload's remap result is logged.
		for msg := range ch {
			logger.WithField("id", msg.ID).Err(msg.Error)
		}
	}
}
4 changes: 2 additions & 2 deletions engine/types/virtualization.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,6 @@ type VirtualizationRemapOptions struct {
}

type VirtualizationRemapMessage struct {
ID string
VirtualizationResource
ID string
Error error
}

0 comments on commit 6d2d3ef

Please sign in to comment.