From 0ca65ad011374b9b20bbcf9e45735e7117e9e3bd Mon Sep 17 00:00:00 2001 From: zc Date: Fri, 13 Nov 2020 17:33:29 +0800 Subject: [PATCH] fix realloc transaction --- cluster/calcium/realloc.go | 94 ++++++++++++++++++++++++-------------- 1 file changed, 59 insertions(+), 35 deletions(-) diff --git a/cluster/calcium/realloc.go b/cluster/calcium/realloc.go index 743378753..2cdfc1998 100644 --- a/cluster/calcium/realloc.go +++ b/cluster/calcium/realloc.go @@ -4,6 +4,7 @@ import ( "context" "github.com/pkg/errors" + "github.com/projecteru2/core/engine" enginetypes "github.com/projecteru2/core/engine/types" "github.com/projecteru2/core/resources" resourcetypes "github.com/projecteru2/core/resources/types" @@ -46,38 +47,60 @@ func (c *Calcium) doReallocOnNode(ctx context.Context, nodename string, containe return errors.WithStack(types.ErrInsufficientRes) } + originalContainer := *container + resourceMeta := &types.ResourceMeta{} return utils.Txn( ctx, // if update workload resources func(ctx context.Context) (err error) { - return c.doReallocContainersOnInstance(ctx, node, plans, container) + resourceMeta := &types.ResourceMeta{} + for _, plan := range plans { + if resourceMeta, err = plan.Dispense(resourcetypes.DispenseOptions{ + Node: node, + ExistingInstance: container, + }, resourceMeta); err != nil { + return + } + } + return errors.WithStack(c.doReallocContainersOnInstance(ctx, node.Engine, resourceMeta, container)) }, // then commit changes func(ctx context.Context) error { for _, plan := range plans { plan.ApplyChangesOnNode(node, 1) } - return c.store.UpdateNodes(ctx, node) + return errors.WithStack(c.store.UpdateNodes(ctx, node)) }, // no need rollback - nil, + func(ctx context.Context, failureByCond bool) (err error) { + if failureByCond { + return + } + r := &types.ResourceMeta{ + CPUQuotaRequest: originalContainer.CPUQuotaRequest, + CPUQuotaLimit: originalContainer.CPUQuotaLimit, + CPU: originalContainer.CPU, + NUMANode: originalContainer.NUMANode, + MemoryRequest: originalContainer.MemoryRequest, + MemoryLimit: originalContainer.MemoryLimit, + VolumeRequest: originalContainer.VolumeRequest, + VolumeLimit: originalContainer.VolumeLimit, + VolumePlanRequest: originalContainer.VolumePlanRequest, + VolumePlanLimit: originalContainer.VolumePlanLimit, + VolumeChanged: resourceMeta.VolumeChanged, + StorageRequest: originalContainer.StorageRequest, + StorageLimit: originalContainer.StorageLimit, + } + return errors.WithStack(c.doReallocContainersOnInstance(ctx, node.Engine, r, container)) + }, c.config.GlobalTimeout, ) }) } -func (c *Calcium) doReallocContainersOnInstance(ctx context.Context, node *types.Node, plans []resourcetypes.ResourcePlans, container *types.Container) (err error) { - r := &types.ResourceMeta{} - for _, plan := range plans { - if r, err = plan.Dispense(resourcetypes.DispenseOptions{ - Node: node, - ExistingInstance: container, - }, r); err != nil { - return - } - } +func (c *Calcium) doReallocContainersOnInstance(ctx context.Context, engine engine.API, resourceMeta *types.ResourceMeta, container *types.Container) (err error) { originalContainer := *container return utils.Txn( @@ -86,31 +109,32 @@ func (c *Calcium) doReallocContainersOnInstance(ctx context.Context, node *types // if: update container resources func(ctx context.Context) error { r := &enginetypes.VirtualizationResource{ - CPU: r.CPU, - Quota: r.CPUQuotaLimit, - NUMANode: r.NUMANode, - Memory: r.MemoryLimit, - Volumes: r.VolumeLimit.ToStringSlice(false, false), - VolumePlan: r.VolumePlanLimit.ToLiteral(), - VolumeChanged: r.VolumeChanged, - Storage: r.StorageLimit, + CPU: resourceMeta.CPU, + Quota: resourceMeta.CPUQuotaLimit, + NUMANode: resourceMeta.NUMANode, + Memory: resourceMeta.MemoryLimit, + Volumes: resourceMeta.VolumeLimit.ToStringSlice(false, false), + VolumePlan: resourceMeta.VolumePlanLimit.ToLiteral(), + VolumeChanged: resourceMeta.VolumeChanged, + Storage: resourceMeta.StorageLimit, } - return errors.WithStack(node.Engine.VirtualizationUpdateResource(ctx, container.ID, r)) + return errors.WithStack(engine.VirtualizationUpdateResource(ctx, container.ID, r)) }, // then: update container meta func(ctx context.Context) error { - container.CPUQuotaRequest = r.CPUQuotaRequest - container.CPUQuotaLimit = r.CPUQuotaLimit - container.CPU = r.CPU - container.MemoryRequest = r.MemoryRequest - container.MemoryLimit = r.MemoryLimit - container.VolumeRequest = r.VolumeRequest - container.VolumePlanRequest = r.VolumePlanRequest - container.VolumeLimit = r.VolumeLimit - container.VolumePlanLimit = r.VolumePlanLimit - container.StorageRequest = r.StorageRequest - container.StorageLimit = r.StorageLimit + container.CPUQuotaRequest = resourceMeta.CPUQuotaRequest + container.CPUQuotaLimit = resourceMeta.CPUQuotaLimit + container.CPU = resourceMeta.CPU + container.NUMANode = resourceMeta.NUMANode + container.MemoryRequest = resourceMeta.MemoryRequest + container.MemoryLimit = resourceMeta.MemoryLimit + container.VolumeRequest = resourceMeta.VolumeRequest + container.VolumePlanRequest = resourceMeta.VolumePlanRequest + container.VolumeLimit = resourceMeta.VolumeLimit + container.VolumePlanLimit = resourceMeta.VolumePlanLimit + container.StorageRequest = resourceMeta.StorageRequest + container.StorageLimit = resourceMeta.StorageLimit return errors.WithStack(c.store.UpdateContainer(ctx, container)) }, @@ -126,10 +150,10 @@ func (c *Calcium) doReallocContainersOnInstance(ctx context.Context, node *types Memory: originalContainer.MemoryLimit, Volumes: originalContainer.VolumeLimit.ToStringSlice(false, false), VolumePlan: originalContainer.VolumePlanLimit.ToLiteral(), - VolumeChanged: r.VolumeChanged, + VolumeChanged: resourceMeta.VolumeChanged, Storage: originalContainer.StorageLimit, } - return errors.WithStack(node.Engine.VirtualizationUpdateResource(ctx, container.ID, r)) + return errors.WithStack(engine.VirtualizationUpdateResource(ctx, container.ID, r)) }, c.config.GlobalTimeout,