Skip to content

Commit

Permalink
bulky operate remove and diss by node
Browse files Browse the repository at this point in the history
  • Loading branch information
jschwinger233 authored and CMGS committed Mar 11, 2021
1 parent 8754425 commit 21bc787
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 62 deletions.
67 changes: 49 additions & 18 deletions cluster/calcium/dissociate.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,61 @@ import (
// DissociateWorkload dissociate workload from eru, return it resource but not modity it
func (c *Calcium) DissociateWorkload(ctx context.Context, ids []string) (chan *types.DissociateWorkloadMessage, error) {
logger := log.WithField("Caliucm", "Dissociate").WithField("ids", ids)

nodeWorkloadGroup, err := c.groupWorkloadsByNode(ctx, ids)
if err != nil {
logger.Errorf("failed to group workloads by node: %+v", err)
return nil, errors.WithStack(err)
}

ch := make(chan *types.DissociateWorkloadMessage)
go func() {
defer close(ch)
// TODO@zc: group ids by node

for nodename, workloadIDs := range nodeWorkloadGroup {
if err := c.withNodeLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
for _, workloadID := range workloadIDs {
if err := c.withWorkloadLocked(ctx, workloadID, func(ctx context.Context, workload *types.Workload) error {
msg := &types.DissociateWorkloadMessage{WorkloadID: workloadID}
if err := utils.Txn(
ctx,
// if
func(ctx context.Context) error {
log.Infof("[DissociateWorkload] Workload %s dissociated", workload.ID)
return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, store.ActionIncr))
},
// then
func(ctx context.Context) error {
return errors.WithStack(c.store.RemoveWorkload(ctx, workload))
},
// rollback
func(ctx context.Context, failedByCond bool) error {
if failedByCond {
return nil
}
return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, store.ActionDecr))
},
c.config.GlobalTimeout,
); err != nil {
msg.Error = err
logger.WithField("id", workloadID).Errorf("failed to diss workload: %+v", err)
}
ch <- msg
return nil
}); err != nil {
logger.WithField("id", workloadID).Errorf("failed to lock workload: %+v", err)
}
}
c.doRemapResourceAndLog(ctx, logger, node)
return nil
}); err != nil {
logger.WithField("nodename", nodename).Errorf("failed to lock node: %+v", err)
}
}

for _, id := range ids {
err := c.withWorkloadLocked(ctx, id, func(ctx context.Context, workload *types.Workload) error {
return c.withNodeLocked(ctx, workload.Nodename, func(ctx context.Context, node *types.Node) (err error) {
err = utils.Txn(
ctx,
// if
func(ctx context.Context) error {
log.Infof("[DissociateWorkload] Workload %s dissociated", workload.ID)
return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, store.ActionIncr))
},
// then
func(ctx context.Context) error {
return errors.WithStack(c.store.RemoveWorkload(ctx, workload))
},
// rollback
func(ctx context.Context, _ bool) error {
return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, store.ActionDecr))
},
c.config.GlobalTimeout,
)

c.doRemapResourceAndLog(ctx, logger, node)
return err
Expand Down
105 changes: 61 additions & 44 deletions cluster/calcium/remove.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,61 +17,66 @@ import (
// returns a channel that contains removing responses
func (c *Calcium) RemoveWorkload(ctx context.Context, ids []string, force bool, step int) (chan *types.RemoveWorkloadMessage, error) {
logger := log.WithField("Calcium", "RemoveWorkload").WithField("ids", ids).WithField("force", force).WithField("step", step)
ch := make(chan *types.RemoveWorkloadMessage)
if step < 1 {
step = 1

nodeWorkloadGroup, err := c.groupWorkloadsByNode(ctx, ids)
if err != nil {
logger.Errorf("failed to group workloads by node: %+v", err)
return nil, errors.WithStack(err)
}

ch := make(chan *types.RemoveWorkloadMessage)
go func() {
defer close(ch)
wg := sync.WaitGroup{}
defer wg.Wait()
for i, id := range ids {
for nodename, workloadIDs := range nodeWorkloadGroup {
wg.Add(1)
go func(id string) {
go func(nodename string, workloadIDs []string) {
defer wg.Done()
ret := &types.RemoveWorkloadMessage{WorkloadID: id, Success: false, Hook: []*bytes.Buffer{}}
if err := c.withWorkloadLocked(ctx, id, func(ctx context.Context, workload *types.Workload) error {
return c.withNodeLocked(ctx, workload.Nodename, func(ctx context.Context, node *types.Node) (err error) {
if err = utils.Txn(
ctx,
// if
func(ctx context.Context) error {
return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, store.ActionIncr))
},
// then
func(ctx context.Context) error {
err := errors.WithStack(c.doRemoveWorkload(ctx, workload, force))
if err != nil {
log.Infof("[RemoveWorkload] Workload %s removed", workload.ID)
}
return err
},
// rollback
func(ctx context.Context, _ bool) error {
return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, store.ActionDecr))
},
c.config.GlobalTimeout,
); err != nil {
return
}
if err := c.withNodeLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
for _, workloadID := range workloadIDs {
if err := c.withWorkloadLocked(ctx, workloadID, func(ctx context.Context, workload *types.Workload) error {
ret := &types.RemoveWorkloadMessage{WorkloadID: workloadID, Success: true, Hook: []*bytes.Buffer{}}
if err := utils.Txn(
ctx,
// if
func(ctx context.Context) error {
return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, store.ActionIncr))
},
// then
func(ctx context.Context) error {
err := errors.WithStack(c.doRemoveWorkload(ctx, workload, force))
if err != nil {
log.Infof("[RemoveWorkload] Workload %s removed", workload.ID)
}
return err
},
// rollback
func(ctx context.Context, failedByCond bool) error {
if failedByCond {
return nil
}
return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, store.ActionDecr))
},
c.config.GlobalTimeout,
); err != nil {
logger.WithField("id", workloadID).Errorf("[RemoveWorkload] Remove workload failed: %+v", err)
ret.Hook = append(ret.Hook, bytes.NewBufferString(err.Error()))
ret.Success = false
}

// TODO@zc: 优化一下, 先按照 node 聚合 ids
c.doRemapResourceAndLog(ctx, logger, node)
return
})
ch <- ret
return nil
}); err != nil {
logger.WithField("id", workloadID).Errorf("failed to lock workload: %+v", err)
}
}
c.doRemapResourceAndLog(ctx, logger, node)
return nil
}); err != nil {
logger.Errorf("[RemoveWorkload] Remove workload %s failed, err: %+v", id, err)
ret.Hook = append(ret.Hook, bytes.NewBufferString(err.Error()))
} else {
ret.Success = true
logger.WithField("nodename", nodename).Errorf("failed to lock node: %+v", err)
}
ch <- ret
}(id)
if (i+1)%step == 0 {
log.Info("[RemoveWorkload] Wait for previous tasks done")
wg.Wait()
}
}(nodename, workloadIDs)
}
}()
return ch, nil
Expand Down Expand Up @@ -110,3 +115,15 @@ func (c *Calcium) doRemoveWorkloadSync(ctx context.Context, ids []string) error
}
return nil
}

func (c *Calcium) groupWorkloadsByNode(ctx context.Context, ids []string) (map[string][]string, error) {
workloads, err := c.store.GetWorkloads(ctx, ids)
if err != nil {
return nil, errors.WithStack(err)
}
nodeWorkloadGroup := map[string][]string{}
for _, workload := range workloads {
nodeWorkloadGroup[workload.Nodename] = append(nodeWorkloadGroup[workload.Nodename], workload.ID)
}
return nodeWorkloadGroup, nil
}

0 comments on commit 21bc787

Please sign in to comment.