simplify realloc: accept single container ID
jschwinger233 committed Nov 9, 2020
1 parent 7f41a7a commit 0f7c423
Showing 9 changed files with 5,997 additions and 3,705 deletions.
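Not part of the commit itself — a hedged sketch of what the API change below means for callers. Before this commit, ReallocResource accepted opts.IDs and streamed per-container results over a channel; after it, each call handles exactly one opts.ID and returns a plain error, so any multi-container fan-out moves to the call site. The names containerIDs, c, and log are placeholders; the field names come from the diff below, the numeric values are made up:

	// Illustrative only: one ReallocResource call per container ID.
	for _, id := range containerIDs {
		opts := &types.ReallocOptions{
			ID: id,
			ResourceOpts: types.ResourceOptions{
				CPUQuotaRequest: 0.5, // deltas: added onto the container's current quota
				CPUQuotaLimit:   0.5,
			},
		}
		if err := c.ReallocResource(ctx, opts); err != nil {
			log.Errorf("[realloc] %s failed: %+v", id, err)
		}
	}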
309 changes: 88 additions & 221 deletions cluster/calcium/realloc.go
@@ -2,265 +2,132 @@ package calcium

import (
	"context"
-	"sync"

	"github.com/pkg/errors"
	enginetypes "github.com/projecteru2/core/engine/types"
	"github.com/projecteru2/core/resources"
	resourcetypes "github.com/projecteru2/core/resources/types"
	"github.com/projecteru2/core/types"
	"github.com/projecteru2/core/utils"
-	log "github.com/sirupsen/logrus"
)

-// nodename -> container list
-type nodeContainers map[string][]*types.Container
-
// ReallocResource allow realloc container resource
-func (c *Calcium) ReallocResource(ctx context.Context, opts *types.ReallocOptions) (chan *types.ReallocResourceMessage, error) {
-	ch := make(chan *types.ReallocResourceMessage)
-	go func() {
-		defer close(ch)
-		if err := c.withContainersLocked(ctx, opts.IDs, func(containers map[string]*types.Container) error {
-			// Pod-Node-Containers
-			containersInfo := map[*types.Pod]nodeContainers{}
-			// Pod cache
-			podCache := map[string]*types.Pod{}
-			var err error
-			for _, container := range containers {
-				pod, ok := podCache[container.Podname]
-				if !ok {
-					pod, err = c.store.GetPod(ctx, container.Podname)
-					if err != nil {
-						ch <- &types.ReallocResourceMessage{
-							ContainerID: container.ID,
-							Error:       err,
-						}
-						continue
-					}
-					podCache[container.Podname] = pod
-					containersInfo[pod] = nodeContainers{}
-				}
-				if _, ok = containersInfo[pod][container.Nodename]; !ok {
-					containersInfo[pod][container.Nodename] = []*types.Container{}
-				}
-				containersInfo[pod][container.Nodename] = append(containersInfo[pod][container.Nodename], container)
-			}
-
-			wg := sync.WaitGroup{}
-			wg.Add(len(containersInfo))
-			// deal with normal container
-			for _, nodeContainersInfo := range containersInfo {
-				go func(nodeContainersInfo nodeContainers) {
-					defer wg.Done()
-					c.doReallocContainersOnPod(ctx, ch, nodeContainersInfo, opts)
-				}(nodeContainersInfo)
-			}
-			wg.Wait()
-			return nil
-		}); err != nil {
-			log.Errorf("[ReallocResource] Realloc failed %+v", err)
-			for _, ID := range opts.IDs {
-				ch <- &types.ReallocResourceMessage{
-					ContainerID: ID,
-					Error:       err,
-				}
-			}
+func (c *Calcium) ReallocResource(ctx context.Context, opts *types.ReallocOptions) (err error) {
+	return c.withContainerLocked(ctx, opts.ID, func(container *types.Container) error {
+		rrs, err := resources.MakeRequests(
+			types.ResourceOptions{
+				CPUQuotaRequest: container.CPUQuotaRequest + opts.ResourceOpts.CPUQuotaRequest,
+				CPUQuotaLimit:   container.CPUQuotaLimit + opts.ResourceOpts.CPUQuotaLimit,
+				CPUBind:         types.ParseTriOption(opts.CPUBindOpts, len(container.CPU) > 0),
+				MemoryRequest:   container.MemoryRequest + opts.ResourceOpts.MemoryRequest,
+				MemoryLimit:     container.MemoryLimit + opts.ResourceOpts.MemoryLimit,
+				StorageRequest:  container.StorageRequest + opts.ResourceOpts.StorageRequest,
+				StorageLimit:    container.StorageLimit + opts.ResourceOpts.StorageLimit,
+				VolumeRequest:   types.MergeVolumeBindings(container.VolumeRequest, opts.ResourceOpts.VolumeRequest),
+				VolumeLimit:     types.MergeVolumeBindings(container.VolumeLimit, opts.ResourceOpts.VolumeLimit),
+			},
+		)
+		if err != nil {
+			return errors.WithStack(err)
+		}
-	}()
-	return ch, nil
+		return c.doReallocOnNode(ctx, container.Nodename, container, rrs)
+	})
}

-// group containers by node and requests
-func (c *Calcium) doReallocContainersOnPod(ctx context.Context, ch chan *types.ReallocResourceMessage, nodeContainersInfo nodeContainers, opts *types.ReallocOptions) {
-	hardVbsMap := map[string]types.VolumeBindings{}
-	containerGroups := map[string]map[resourcetypes.ResourceRequests][]*types.Container{}
-	for nodename, containers := range nodeContainersInfo {
-		containerGroups[nodename] = map[resourcetypes.ResourceRequests][]*types.Container{}
-		for _, container := range containers {
-			if err := func() (err error) {
-				var (
-					autoVbsRequest, autoVbsLimit types.VolumeBindings
-					rrs                          resourcetypes.ResourceRequests
-				)
-				autoVbsRequest, hardVbsMap[container.ID] = types.MergeVolumeBindings(container.VolumeRequest, opts.ResourceOpts.VolumeRequest, opts.ResourceOpts.Volumes).Divide()
-				autoVbsLimit, _ = types.MergeVolumeBindings(container.VolumeLimit, opts.ResourceOpts.VolumeLimit, opts.Volumes).Divide()
-
-				rrs, err = resources.MakeRequests(
-					types.ResourceOptions{
-						CPUQuotaRequest: container.CPUQuotaRequest + opts.ResourceOpts.CPUQuotaRequest + opts.CPU,
-						CPUQuotaLimit:   container.CPUQuotaLimit + opts.CPULimit + opts.CPU,
-						CPUBind:         types.ParseTriOption(opts.BindCPUOpt, len(container.CPURequest) > 0),
-						MemoryRequest:   container.MemoryRequest + opts.MemoryRequest + opts.Memory,
-						MemoryLimit:     container.MemoryLimit + opts.MemoryLimit + opts.Memory,
-						MemorySoft:      types.ParseTriOption(opts.MemoryLimitOpt, container.SoftLimit),
-						VolumeRequest:   autoVbsRequest,
-						VolumeLimit:     autoVbsLimit,
-						StorageRequest:  container.StorageRequest + opts.StorageRequest + opts.Storage,
-						StorageLimit:    container.StorageLimit + opts.StorageLimit + opts.Storage,
-					})
-
-				containerGroups[nodename][rrs] = append(containerGroups[nodename][rrs], container)
-				return
-
-			}(); err != nil {
-				log.Errorf("[ReallocResource.doReallocContainersOnPod] Realloc failed: %+v", err)
-				ch <- &types.ReallocResourceMessage{Error: err}
-				return
-			}
+// transaction: node resource
+func (c *Calcium) doReallocOnNode(ctx context.Context, nodename string, container *types.Container, rrs resourcetypes.ResourceRequests) error {
+	return c.withNodeLocked(ctx, nodename, func(node *types.Node) error {
+		node.RecycleResources(&container.Resource1)
+		_, total, planMap, err := resources.SelectNodesByResourceRequests(rrs, map[string]*types.Node{node.Name: node})
+		if err != nil {
+			return errors.WithStack(err)
+		}
-		}

-	for nodename, containerByApps := range containerGroups {
-		for rrs, containers := range containerByApps {
-			if err := c.doReallocContainersOnNode(ctx, ch, nodename, containers, rrs, hardVbsMap); err != nil {
-
-				log.Errorf("[ReallocResource.doReallocContainersOnPod] Realloc failed: %+v", err)
-				ch <- &types.ReallocResourceMessage{Error: err}
-			}
+		if total != 1 {
+			return errors.WithStack(types.ErrInsufficientRes)
+		}
-		}
-	}

-// transaction: node meta
-func (c *Calcium) doReallocContainersOnNode(ctx context.Context, ch chan *types.ReallocResourceMessage, nodename string, containers []*types.Container, rrs resourcetypes.ResourceRequirements, hardVbsMap map[string]types.VolumeBindings) (err error) {
-	{
-	return c.withNodeLocked(ctx, nodename, func(node *types.Node) error {
+		originalContainer := *container
+		return utils.Txn(
+			ctx,

-		for _, container := range containers {
-			recycleResources(node, container)
-		}
-		planMap, total, _, err := resources.SelectNodes(rrs, map[string]*types.Node{node.Name: node})
-		if err != nil {
-			return errors.WithStack(err)
-		}
-		if total < len(containers) {
-			return errors.WithStack(types.ErrInsufficientRes)
-		}

-		var (
-			rollbacks          []int
-			originalContainers []types.Container
-		)

-		return utils.Txn(
-			ctx,

-			// if: commit changes of realloc resources
-			func(ctx context.Context) (err error) {
-				for _, plan := range planMap {
-					plan.ApplyChangesOnNode(node, utils.Range(len(containers))...)
-				}
-				rollbacks = utils.Range(len(containers))
-				for _, container := range containers {
-					originalContainers = append(originalContainers, *container)
-				}
-				return c.store.UpdateNodes(ctx, node)
-			},

-			// then: update instances' resources
-			func(ctx context.Context) error {
-				rollbacks, err = c.doUpdateResourceOnInstances(ctx, ch, node, planMap, containers, hardVbsMap)
-				return err
-			},

-			// rollback: back to origin
-			func(ctx context.Context) error {
-				for _, plan := range planMap {
-					plan.RollbackChangesOnNode(node, rollbacks...)
-				}
-				for _, idx := range rollbacks {
-					preserveResources(node, &originalContainers[idx])
-				}
-				return c.store.UpdateNodes(ctx, node)
-			},
-			c.config.GlobalTimeout,
-		)
-	})
-	}
-}

-// boundary: chan *types.ReallocResourceMessage
-func (c *Calcium) doUpdateResourceOnInstances(ctx context.Context, ch chan *types.ReallocResourceMessage, node *types.Node, planMap map[types.ResourceType]resourcetypes.ResourcePlans, containers []*types.Container, hardVbsMap map[string]types.VolumeBindings) (rollbacks []int, err error) {
-	wg := sync.WaitGroup{}
-	wg.Add(len(containers))

-	for idx, container := range containers {
-		go func(container *types.Container, idx int) {
-			var e error
-			msg := &types.ReallocResourceMessage{ContainerID: container.ID}
-			defer func() {
-				if e != nil {
-					err = e
-					msg.Error = e
-					rollbacks = append(rollbacks, idx)
+			// if commit changes
+			func(ctx context.Context) (err error) {
+				for _, plan := range planMap {
+					plan.ApplyChangesOnNode(node, 1)
+				}
-				ch <- msg
-				wg.Done()
-			}()

-			rsc := &types.Resources{}
-			for _, plan := range planMap {
-				if e = plan.Dispense(resourcetypes.DispenseOptions{
-					Node:               node,
-					Index:              idx,
-					ExistingInstances:  containers,
-					HardVolumeBindings: hardVbsMap[container.ID],
-				}, rsc); e != nil {
-					return
+				return c.store.UpdateNodes(ctx, node)
+			},

+			// then update workload resources
+			func(ctx context.Context) error {
+				return c.doReallocContainersOnInstance(ctx, node, planMap, container)
+			},

+			// rollback to origin
+			func(ctx context.Context) error {
+				for _, plan := range planMap {
+					plan.RollbackChangesOnNode(node, 1)
+				}
-				}
+				node.PreserveResources(&originalContainer.Resource1)
+				return c.store.UpdateNodes(ctx, node)
+			},

-			e = c.doUpdateResourceOnInstance(ctx, node, container, *rsc)
-		}(container, idx)
-	}

-	wg.Wait()
-	return rollbacks, errors.WithStack(err)
+			c.config.GlobalTimeout,
+		)
+	})
}

// transaction: container meta
-func (c *Calcium) doUpdateResourceOnInstance(ctx context.Context, node *types.Node, container *types.Container, rsc types.Resources) error {
-	originContainer := *container
+func (c *Calcium) doReallocContainersOnInstance(ctx context.Context, node *types.Node, planMap map[types.ResourceType]resourcetypes.ResourcePlans, container *types.Container) (err error) {
+	r := &types.Resource1{}
+	for _, plan := range planMap {
+		// TODO@zc: single existing instance
+		// TODO@zc: no HardVolumeBindings
+		if r, err = plan.Dispense(resourcetypes.DispenseOptions{
+			Node:              node,
+			Index:             1,
+			ExistingInstances: []*types.Container{container},
+		}, r); err != nil {
+			return
+		}
+	}

+	originalContainer := *container
	return utils.Txn(
		ctx,

		// if: update container meta
		func(ctx context.Context) error {
-			container.CPURequest = rsc.CPURequest
-			container.QuotaRequest = rsc.CPUQuotaRequest
-			container.QuotaLimit = rsc.CPUQuotaLimit
-			container.MemoryRequest = rsc.MemoryRequest
-			container.MemoryLimit = rsc.MemoryLimit
-			container.SoftLimit = rsc.MemorySoftLimit
-			container.VolumeRequest = rsc.VolumeRequest
-			container.VolumePlanRequest = rsc.VolumePlanRequest
-			container.VolumeLimit = rsc.VolumeLimit
-			container.VolumePlanLimit = rsc.VolumePlanLimit
-			container.StorageRequest = rsc.StorageRequest
-			container.StorageLimit = rsc.StorageLimit
+			container.CPUQuotaRequest = r.CPUQuotaRequest
+			container.CPUQuotaLimit = r.CPUQuotaLimit
+			container.CPU = r.CPU
+			container.MemoryRequest = r.MemoryRequest
+			container.MemoryLimit = r.MemoryLimit
+			container.VolumeRequest = r.VolumeRequest
+			container.VolumePlanRequest = r.VolumePlanRequest
+			container.VolumeLimit = r.VolumeLimit
+			container.VolumePlanLimit = r.VolumePlanLimit
+			container.StorageRequest = r.StorageRequest
+			container.StorageLimit = r.StorageLimit
			return errors.WithStack(c.store.UpdateContainer(ctx, container))
		},

		// then: update container resources
		func(ctx context.Context) error {
			r := &enginetypes.VirtualizationResource{
-				CPU:           rsc.CPURequest,
-				Quota:         rsc.CPUQuotaLimit,
-				NUMANode:      rsc.NUMANode,
-				Memory:        rsc.MemoryLimit,
-				SoftLimit:     rsc.MemorySoftLimit,
-				Volumes:       rsc.VolumeLimit.ToStringSlice(false, false),
-				VolumePlan:    rsc.VolumePlanLimit.ToLiteral(),
-				VolumeChanged: rsc.VolumeChanged,
-				Storage:       rsc.StorageLimit,
+				CPU:           r.CPU,
+				Quota:         r.CPUQuotaLimit,
+				NUMANode:      r.NUMANode,
+				Memory:        r.MemoryLimit,
+				Volumes:       r.VolumeLimit.ToStringSlice(false, false),
+				VolumePlan:    r.VolumePlanLimit.ToLiteral(),
+				VolumeChanged: r.VolumeChanged,
+				Storage:       r.StorageLimit,
			}
			return errors.WithStack(node.Engine.VirtualizationUpdateResource(ctx, container.ID, r))
		},

		// rollback: container meta
		func(ctx context.Context) error {
-			return errors.WithStack(c.store.UpdateContainer(ctx, &originContainer))
+			return errors.WithStack(c.store.UpdateContainer(ctx, &originalContainer))
		},

		c.config.GlobalTimeout,
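Both levels of the new path reuse the same if/then/rollback transaction shape seen above: commit metadata first, then touch the next layer, and roll the metadata back if that step fails, all bounded by c.config.GlobalTimeout. The sketch below only mirrors the call shape visible in this diff (utils.Txn taking a context, three closures, and a timeout); the helper names are placeholders, not code from the repository:

	// Placeholder helpers; only the argument order mirrors the call sites above.
	err := utils.Txn(
		ctx,
		func(ctx context.Context) error { return commitNodeMeta(ctx) },  // "if": persist the new allocation
		func(ctx context.Context) error { return updateWorkload(ctx) },  // "then": apply it to the running container
		func(ctx context.Context) error { return restoreNodeMeta(ctx) }, // rollback: undo the metadata change on failure
		c.config.GlobalTimeout,
	)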
2 changes: 1 addition & 1 deletion cluster/cluster.go
@@ -79,7 +79,7 @@ type Cluster interface {
	DissociateContainer(ctx context.Context, IDs []string) (chan *types.DissociateContainerMessage, error)
	ControlContainer(ctx context.Context, IDs []string, t string, force bool) (chan *types.ControlContainerMessage, error)
	ExecuteContainer(ctx context.Context, opts *types.ExecuteContainerOptions, inCh <-chan []byte) chan *types.AttachContainerMessage
-	ReallocResource(ctx context.Context, opts *types.ReallocOptions) (chan *types.ReallocResourceMessage, error)
+	ReallocResource(ctx context.Context, opts *types.ReallocOptions) error
	LogStream(ctx context.Context, opts *types.LogStreamOptions) (chan *types.LogStreamMessage, error)
	RunAndWait(ctx context.Context, opts *types.DeployOptions, inCh <-chan []byte) (<-chan *types.AttachContainerMessage, error)
	// finalizer
4 changes: 2 additions & 2 deletions resources/cpumem/cpumem.go
@@ -1,8 +1,6 @@
package cpumem

import (
-	"strconv"
-
	"github.com/pkg/errors"
	resourcetypes "github.com/projecteru2/core/resources/types"
	"github.com/projecteru2/core/scheduler"
@@ -168,12 +166,14 @@ func (rp ResourcePlans) Dispense(opts resourcetypes.DispenseOptions, r *types.Re
		r.NUMANode = opts.Node.GetNUMANode(r.CPU)
	}

+	/* TODO@zc: check this
	// special handle when converting from cpu-binding to cpu-unbinding
	if len(opts.ExistingInstances) > opts.Index && len(opts.ExistingInstances[opts.Index].CPU) > 0 && len(rp.CPUPlans) == 0 {
		r.CPU = types.CPUMap{}
		for i := 0; i < len(opts.Node.InitCPU); i++ {
			r.CPU[strconv.Itoa(i)] = 0
		}
	}
+	*/
	return r, nil
}
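The cpumem change above only disables (rather than deletes) the special case for converting a CPU-bound workload back to unbound scheduling; the TODO@zc marker flags it for later review. For reference, what the now-commented-out loop used to produce — reconstructed from that block, illustrative only — is a CPUMap covering every init CPU at weight 0, e.g. on a node with four cores:

	// Reconstructed from the disabled block: every core present, each at weight 0,
	// which the old code used to mark the workload as no longer pinned to specific cores.
	r.CPU = types.CPUMap{"0": 0, "1": 0, "2": 0, "3": 0}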