Skip to content

Commit

Permalink
fix realloc transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
jschwinger233 committed Nov 16, 2020
1 parent 2817b76 commit 0ca65ad
Showing 1 changed file with 59 additions and 35 deletions.
94 changes: 59 additions & 35 deletions cluster/calcium/realloc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/pkg/errors"
"github.com/projecteru2/core/engine"
enginetypes "github.com/projecteru2/core/engine/types"
"github.com/projecteru2/core/resources"
resourcetypes "github.com/projecteru2/core/resources/types"
Expand Down Expand Up @@ -46,38 +47,60 @@ func (c *Calcium) doReallocOnNode(ctx context.Context, nodename string, containe
return errors.WithStack(types.ErrInsufficientRes)
}

originalContainer := *container
resourceMeta := &types.ResourceMeta{}
return utils.Txn(
ctx,

// if update workload resources
func(ctx context.Context) (err error) {
return c.doReallocContainersOnInstance(ctx, node, plans, container)
resourceMeta := &types.ResourceMeta{}
for _, plan := range plans {
if resourceMeta, err = plan.Dispense(resourcetypes.DispenseOptions{
Node: node,
ExistingInstance: container,
}, resourceMeta); err != nil {
return
}
}
return errors.WithStack(c.doReallocContainersOnInstance(ctx, node.Engine, resourceMeta, container))
},
// then commit changes
func(ctx context.Context) error {
for _, plan := range plans {
plan.ApplyChangesOnNode(node, 1)
}
return c.store.UpdateNodes(ctx, node)
return errors.WithStack(c.store.UpdateNodes(ctx, node))
},
// no need rollback
nil,
func(ctx context.Context, failureByCond bool) (err error) {
if failureByCond {
return
}
r := &types.ResourceMeta{
CPUQuotaRequest: originalContainer.CPUQuotaRequest,
CPUQuotaLimit: originalContainer.CPUQuotaLimit,
CPU: originalContainer.CPU,
NUMANode: originalContainer.NUMANode,
MemoryRequest: originalContainer.MemoryRequest,
MemoryLimit: originalContainer.MemoryLimit,
VolumeRequest: originalContainer.VolumeRequest,
VolumeLimit: originalContainer.VolumeLimit,
VolumePlanRequest: originalContainer.VolumePlanRequest,
VolumePlanLimit: originalContainer.VolumePlanLimit,
VolumeChanged: resourceMeta.VolumeChanged,
StorageRequest: originalContainer.StorageRequest,
StorageLimit: originalContainer.StorageLimit,
}
return errors.WithStack(c.doReallocContainersOnInstance(ctx, node.Engine, r, container))
},

c.config.GlobalTimeout,
)
})
}

func (c *Calcium) doReallocContainersOnInstance(ctx context.Context, node *types.Node, plans []resourcetypes.ResourcePlans, container *types.Container) (err error) {
r := &types.ResourceMeta{}
for _, plan := range plans {
if r, err = plan.Dispense(resourcetypes.DispenseOptions{
Node: node,
ExistingInstance: container,
}, r); err != nil {
return
}
}
func (c *Calcium) doReallocContainersOnInstance(ctx context.Context, engine engine.API, resourceMeta *types.ResourceMeta, container *types.Container) (err error) {

originalContainer := *container
return utils.Txn(
Expand All @@ -86,31 +109,32 @@ func (c *Calcium) doReallocContainersOnInstance(ctx context.Context, node *types
// if: update container resources
func(ctx context.Context) error {
r := &enginetypes.VirtualizationResource{
CPU: r.CPU,
Quota: r.CPUQuotaLimit,
NUMANode: r.NUMANode,
Memory: r.MemoryLimit,
Volumes: r.VolumeLimit.ToStringSlice(false, false),
VolumePlan: r.VolumePlanLimit.ToLiteral(),
VolumeChanged: r.VolumeChanged,
Storage: r.StorageLimit,
CPU: resourceMeta.CPU,
Quota: resourceMeta.CPUQuotaLimit,
NUMANode: resourceMeta.NUMANode,
Memory: resourceMeta.MemoryLimit,
Volumes: resourceMeta.VolumeLimit.ToStringSlice(false, false),
VolumePlan: resourceMeta.VolumePlanLimit.ToLiteral(),
VolumeChanged: resourceMeta.VolumeChanged,
Storage: resourceMeta.StorageLimit,
}
return errors.WithStack(node.Engine.VirtualizationUpdateResource(ctx, container.ID, r))
return errors.WithStack(engine.VirtualizationUpdateResource(ctx, container.ID, r))
},

// then: update container meta
func(ctx context.Context) error {
container.CPUQuotaRequest = r.CPUQuotaRequest
container.CPUQuotaLimit = r.CPUQuotaLimit
container.CPU = r.CPU
container.MemoryRequest = r.MemoryRequest
container.MemoryLimit = r.MemoryLimit
container.VolumeRequest = r.VolumeRequest
container.VolumePlanRequest = r.VolumePlanRequest
container.VolumeLimit = r.VolumeLimit
container.VolumePlanLimit = r.VolumePlanLimit
container.StorageRequest = r.StorageRequest
container.StorageLimit = r.StorageLimit
container.CPUQuotaRequest = resourceMeta.CPUQuotaRequest
container.CPUQuotaLimit = resourceMeta.CPUQuotaLimit
container.CPU = resourceMeta.CPU
container.NUMANode = resourceMeta.NUMANode
container.MemoryRequest = resourceMeta.MemoryRequest
container.MemoryLimit = resourceMeta.MemoryLimit
container.VolumeRequest = resourceMeta.VolumeRequest
container.VolumePlanRequest = resourceMeta.VolumePlanRequest
container.VolumeLimit = resourceMeta.VolumeLimit
container.VolumePlanLimit = resourceMeta.VolumePlanLimit
container.StorageRequest = resourceMeta.StorageRequest
container.StorageLimit = resourceMeta.StorageLimit
return errors.WithStack(c.store.UpdateContainer(ctx, container))
},

Expand All @@ -126,10 +150,10 @@ func (c *Calcium) doReallocContainersOnInstance(ctx context.Context, node *types
Memory: originalContainer.MemoryLimit,
Volumes: originalContainer.VolumeLimit.ToStringSlice(false, false),
VolumePlan: originalContainer.VolumePlanLimit.ToLiteral(),
VolumeChanged: r.VolumeChanged,
VolumeChanged: resourceMeta.VolumeChanged,
Storage: originalContainer.StorageLimit,
}
return errors.WithStack(node.Engine.VirtualizationUpdateResource(ctx, container.ID, r))
return errors.WithStack(engine.VirtualizationUpdateResource(ctx, container.ID, r))
},

c.config.GlobalTimeout,
Expand Down

0 comments on commit 0ca65ad

Please sign in to comment.