make sure locking nodes before locking workloads
jschwinger233 committed Jun 22, 2021
1 parent c90caa7 commit 8316d50
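
Why enforce this order? With two locks and two concurrent callers, taking them in opposite orders can deadlock: each caller holds one lock and waits forever for the other. Fixing a single global order (node first, then workload), which is presumably the point of this commit, makes that interleaving impossible. Below is a minimal, self-contained Go sketch of the rule; the mutexes and function names are hypothetical stand-ins, not ERU's actual distributed lock helpers.

// Toy illustration of the lock-ordering rule this commit enforces.
// All names here are hypothetical, unrelated to ERU's real locks.
package main

import (
	"fmt"
	"sync"
)

var (
	nodeLock     sync.Mutex // coarse resource: the node
	workloadLock sync.Mutex // fine resource: a workload on that node
)

// reallocate follows the rule: lock the node first, then the workload.
func reallocate(id int) {
	nodeLock.Lock() // always take the node lock first...
	defer nodeLock.Unlock()
	workloadLock.Lock() // ...then the workload lock
	defer workloadLock.Unlock()
	fmt.Printf("realloc %d holds node+workload locks\n", id)
}

func main() {
	var wg sync.WaitGroup
	// If one caller locked workload->node while another locked node->workload,
	// each could hold one lock and block forever on the other. With a single
	// global order, concurrent callers serialize instead of deadlocking.
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) { defer wg.Done(); reallocate(i) }(i)
	}
	wg.Wait()
}
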
Showing 1 changed file with 82 additions and 78 deletions.

cluster/calcium/realloc.go
@@ -16,94 +16,98 @@ import (
 // ReallocResource updates workload resource dynamically
 func (c *Calcium) ReallocResource(ctx context.Context, opts *types.ReallocOptions) (err error) {
 	logger := log.WithField("Calcium", "ReallocResource").WithField("opts", opts)
-	return c.withWorkloadLocked(ctx, opts.ID, func(ctx context.Context, workload *types.Workload) error {
-		rrs, err := resources.MakeRequests(
-			types.ResourceOptions{
-				CPUQuotaRequest: utils.Round(workload.CPUQuotaRequest + opts.ResourceOpts.CPUQuotaRequest),
-				CPUQuotaLimit:   utils.Round(workload.CPUQuotaLimit + opts.ResourceOpts.CPUQuotaLimit),
-				CPUBind:         types.ParseTriOption(opts.CPUBindOpts, len(workload.CPU) > 0),
-				CPU:             workload.CPU,
-				MemoryRequest:   workload.MemoryRequest + opts.ResourceOpts.MemoryRequest,
-				MemoryLimit:     workload.MemoryLimit + opts.ResourceOpts.MemoryLimit,
-				StorageRequest:  workload.StorageRequest + opts.ResourceOpts.StorageRequest,
-				StorageLimit:    workload.StorageLimit + opts.ResourceOpts.StorageLimit,
-				VolumeRequest:   types.MergeVolumeBindings(workload.VolumeRequest, opts.ResourceOpts.VolumeRequest),
-				VolumeLimit:     types.MergeVolumeBindings(workload.VolumeLimit, opts.ResourceOpts.VolumeLimit),
-				VolumeExist:     workload.VolumePlanRequest,
-			},
-		)
-		if err != nil {
-			return logger.Err(ctx, err)
-		}
-		return logger.Err(ctx, c.doReallocOnNode(ctx, workload.Nodename, workload, rrs))
+	workload, err := c.GetWorkload(ctx, opts.ID)
+	if err != nil {
+		return
+	}
+	return c.withNodeLocked(ctx, workload.Nodename, func(ctx context.Context, node *types.Node) error {
+		return c.withWorkloadLocked(ctx, opts.ID, func(ctx context.Context, workload *types.Workload) error {
+			rrs, err := resources.MakeRequests(
+				types.ResourceOptions{
+					CPUQuotaRequest: utils.Round(workload.CPUQuotaRequest + opts.ResourceOpts.CPUQuotaRequest),
+					CPUQuotaLimit:   utils.Round(workload.CPUQuotaLimit + opts.ResourceOpts.CPUQuotaLimit),
+					CPUBind:         types.ParseTriOption(opts.CPUBindOpts, len(workload.CPU) > 0),
+					CPU:             workload.CPU,
+					MemoryRequest:   workload.MemoryRequest + opts.ResourceOpts.MemoryRequest,
+					MemoryLimit:     workload.MemoryLimit + opts.ResourceOpts.MemoryLimit,
+					StorageRequest:  workload.StorageRequest + opts.ResourceOpts.StorageRequest,
+					StorageLimit:    workload.StorageLimit + opts.ResourceOpts.StorageLimit,
+					VolumeRequest:   types.MergeVolumeBindings(workload.VolumeRequest, opts.ResourceOpts.VolumeRequest),
+					VolumeLimit:     types.MergeVolumeBindings(workload.VolumeLimit, opts.ResourceOpts.VolumeLimit),
+					VolumeExist:     workload.VolumePlanRequest,
+				},
+			)
+			if err != nil {
+				return logger.Err(ctx, err)
+			}
+			return logger.Err(ctx, c.doReallocOnNode(ctx, node, workload, rrs))
+		})
 	})
 }

 // transaction: node resource
-func (c *Calcium) doReallocOnNode(ctx context.Context, nodename string, workload *types.Workload, rrs resourcetypes.ResourceRequests) error {
-	return c.withNodeLocked(ctx, nodename, func(ctx context.Context, node *types.Node) (err error) {
-		node.RecycleResources(&workload.ResourceMeta)
-		plans, err := resources.SelectNodesByResourceRequests(ctx, rrs, map[string]*types.Node{node.Name: node})
-		if err != nil {
-			return err
-		}
+func (c *Calcium) doReallocOnNode(ctx context.Context, node *types.Node, workload *types.Workload, rrs resourcetypes.ResourceRequests) (err error) {
+	node.RecycleResources(&workload.ResourceMeta)
+	plans, err := resources.SelectNodesByResourceRequests(ctx, rrs, map[string]*types.Node{node.Name: node})
+	if err != nil {
+		return err
+	}
 
-		originalWorkload := *workload
-		resourceMeta := &types.ResourceMeta{}
-		if err = utils.Txn(
-			ctx,
+	originalWorkload := *workload
+	resourceMeta := &types.ResourceMeta{}
+	if err = utils.Txn(
+		ctx,
 
-			// if update workload resources
-			func(ctx context.Context) (err error) {
-				resourceMeta := &types.ResourceMeta{}
-				for _, plan := range plans {
-					if resourceMeta, err = plan.Dispense(resourcetypes.DispenseOptions{
-						Node: node,
-					}, resourceMeta); err != nil {
-						return err
-					}
-				}
-
-				return c.doReallocWorkloadsOnInstance(ctx, node.Engine, resourceMeta, workload)
-			},
-			// then commit changes
-			func(ctx context.Context) error {
-				for _, plan := range plans {
-					plan.ApplyChangesOnNode(node, 0)
-				}
-				return errors.WithStack(c.store.UpdateNodes(ctx, node))
-			},
-			// no need rollback
-			func(ctx context.Context, failureByCond bool) (err error) {
-				if failureByCond {
-					return
-				}
-				r := &types.ResourceMeta{
-					CPUQuotaRequest:   originalWorkload.CPUQuotaRequest,
-					CPUQuotaLimit:     originalWorkload.CPUQuotaLimit,
-					CPU:               originalWorkload.CPU,
-					NUMANode:          originalWorkload.NUMANode,
-					MemoryRequest:     originalWorkload.MemoryRequest,
-					MemoryLimit:       originalWorkload.MemoryLimit,
-					VolumeRequest:     originalWorkload.VolumeRequest,
-					VolumeLimit:       originalWorkload.VolumeLimit,
-					VolumePlanRequest: originalWorkload.VolumePlanRequest,
-					VolumePlanLimit:   originalWorkload.VolumePlanLimit,
-					VolumeChanged:     resourceMeta.VolumeChanged,
-					StorageRequest:    originalWorkload.StorageRequest,
-					StorageLimit:      originalWorkload.StorageLimit,
-				}
-				return c.doReallocWorkloadsOnInstance(ctx, node.Engine, r, workload)
-			},
+		// if update workload resources
+		func(ctx context.Context) (err error) {
+			resourceMeta := &types.ResourceMeta{}
+			for _, plan := range plans {
+				if resourceMeta, err = plan.Dispense(resourcetypes.DispenseOptions{
+					Node: node,
+				}, resourceMeta); err != nil {
+					return err
+				}
+			}
+
+			return c.doReallocWorkloadsOnInstance(ctx, node.Engine, resourceMeta, workload)
+		},
+		// then commit changes
+		func(ctx context.Context) error {
+			for _, plan := range plans {
+				plan.ApplyChangesOnNode(node, 0)
+			}
+			return errors.WithStack(c.store.UpdateNodes(ctx, node))
+		},
+		// no need rollback
+		func(ctx context.Context, failureByCond bool) (err error) {
+			if failureByCond {
+				return
+			}
+			r := &types.ResourceMeta{
+				CPUQuotaRequest:   originalWorkload.CPUQuotaRequest,
+				CPUQuotaLimit:     originalWorkload.CPUQuotaLimit,
+				CPU:               originalWorkload.CPU,
+				NUMANode:          originalWorkload.NUMANode,
+				MemoryRequest:     originalWorkload.MemoryRequest,
+				MemoryLimit:       originalWorkload.MemoryLimit,
+				VolumeRequest:     originalWorkload.VolumeRequest,
+				VolumeLimit:       originalWorkload.VolumeLimit,
+				VolumePlanRequest: originalWorkload.VolumePlanRequest,
+				VolumePlanLimit:   originalWorkload.VolumePlanLimit,
+				VolumeChanged:     resourceMeta.VolumeChanged,
+				StorageRequest:    originalWorkload.StorageRequest,
+				StorageLimit:      originalWorkload.StorageLimit,
+			}
+			return c.doReallocWorkloadsOnInstance(ctx, node.Engine, r, workload)
+		},
 
-			c.config.GlobalTimeout,
-		); err != nil {
-			return
-		}
+		c.config.GlobalTimeout,
+	); err != nil {
+		return
+	}
 
-		c.doRemapResourceAndLog(ctx, log.WithField("Calcium", "doReallocOnNode"), node)
-		return nil
-	})
+	c.doRemapResourceAndLog(ctx, log.WithField("Calcium", "doReallocOnNode"), node)
+	return nil
 }
 
 func (c *Calcium) doReallocWorkloadsOnInstance(ctx context.Context, engine engine.API, resourceMeta *types.ResourceMeta, workload *types.Workload) (err error) {
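
Inside the node lock, doReallocOnNode drives a three-phase transaction: tentatively dispense the new resource plan to the workload, then commit the updated node to the store, and on failure restore the workload's original resource meta (skipping the restore when the first phase itself failed, per failureByCond). The following is an assumption-based sketch of how such a cond/then/rollback helper could be wired, inferred only from the call site above; it is an illustration, not ERU's actual utils.Txn.

// Minimal sketch of a cond/then/rollback transaction helper.
// Hypothetical: inferred from the call site, not the real utils.Txn.
package main

import (
	"context"
	"fmt"
	"time"
)

// txn runs cond, then then; on failure it invokes rollback, telling it
// whether the failure happened in cond (failureByCond) or in then.
func txn(
	ctx context.Context,
	cond func(context.Context) error,
	then func(context.Context) error,
	rollback func(ctx context.Context, failureByCond bool) error,
	timeout time.Duration,
) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	if err := cond(ctx); err != nil {
		_ = rollback(ctx, true) // cond failed: rollback sees failureByCond=true
		return err
	}
	if err := then(ctx); err != nil {
		_ = rollback(ctx, false) // commit failed: undo cond's side effects
		return err
	}
	return nil
}

func main() {
	err := txn(context.Background(),
		func(ctx context.Context) error { fmt.Println("update workload resources"); return nil },
		func(ctx context.Context) error { fmt.Println("commit node changes"); return nil },
		func(ctx context.Context, failureByCond bool) error {
			fmt.Println("restore original resource meta, failureByCond =", failureByCond)
			return nil
		},
		time.Minute,
	)
	fmt.Println("txn err:", err)
}
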