Skip to content

Commit

Permalink
split remap
Browse files Browse the repository at this point in the history
  • Loading branch information
CMGS committed Jul 12, 2022
1 parent c280e15 commit f963a86
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 94 deletions.
2 changes: 1 addition & 1 deletion cluster/calcium/image_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestRemoveImage(t *testing.T) {
assert.False(t, c.Success)
}
engine.On("ImageRemove", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{"xx"}, nil)
// sucess remove but prune fail
// success remove but prune fail
engine.On("ImagesPrune", mock.Anything).Return(types.ErrBadStorage).Once()
ch, err = c.RemoveImage(ctx, &types.ImageOptions{Podname: "podname", Images: []string{"xx"}, Prune: true})
for c := range ch {
Expand Down
68 changes: 68 additions & 0 deletions cluster/calcium/remap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package calcium

import (
"context"

enginetypes "github.com/projecteru2/core/engine/types"
"github.com/projecteru2/core/log"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
)

type remapMsg struct {
id string
err error
}

func (c *Calcium) doRemapResourceAndLog(ctx context.Context, logger log.Fields, node *types.Node) {
log.Debugf(ctx, "[doRemapResourceAndLog] remap node %s", node.Name)
ctx, cancel := context.WithTimeout(utils.InheritTracingInfo(ctx, context.TODO()), c.config.GlobalTimeout)
defer cancel()

err := c.withNodeOperationLocked(ctx, node.Name, func(ctx context.Context, node *types.Node) error {
logger = logger.WithField("Calcium", "doRemapResourceAndLog").WithField("nodename", node.Name)
if ch, err := c.remapResource(ctx, node); logger.ErrWithTracing(ctx, err) == nil {
for msg := range ch {
log.Infof(ctx, "[doRemapResourceAndLog] id %v", msg.id)
logger.WithField("id", msg.id).ErrWithTracing(ctx, msg.err) // nolint:errcheck
}
}
return nil
})

if err != nil {
log.Errorf(ctx, "[doRemapResourceAndLog] remap node %s failed, err: %v", node.Name, err)
}
}

// called on changes of resource binding, such as cpu binding
// as an internal api, remap doesn't lock node, the responsibility of that should be taken on by caller
func (c *Calcium) remapResource(ctx context.Context, node *types.Node) (ch chan *remapMsg, err error) {
workloads, err := c.store.ListNodeWorkloads(ctx, node.Name, nil)
if err != nil {
return
}

workloadMap := map[string]*types.Workload{}
for _, workload := range workloads {
workloadMap[workload.ID] = workload
}

engineArgsMap, err := c.rmgr.GetRemapArgs(ctx, node.Name, workloadMap)
if err != nil {
return nil, err
}

ch = make(chan *remapMsg, len(engineArgsMap))
go func() {
defer close(ch)
for workloadID, engineArgs := range engineArgsMap {
ch <- &remapMsg{
id: workloadID,
err: node.Engine.VirtualizationUpdateResource(ctx, workloadID, &enginetypes.VirtualizationResource{EngineArgs: engineArgs}),
}
}
}()

return ch, nil
}
45 changes: 45 additions & 0 deletions cluster/calcium/remap_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package calcium

import (
"context"
"testing"

enginemocks "github.com/projecteru2/core/engine/mocks"
enginetypes "github.com/projecteru2/core/engine/types"
"github.com/projecteru2/core/log"
resourcemocks "github.com/projecteru2/core/resources/mocks"
storemocks "github.com/projecteru2/core/store/mocks"
"github.com/projecteru2/core/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)

func TestRemapResource(t *testing.T) {
c := NewTestCluster()
store := c.store.(*storemocks.Store)
rmgr := c.rmgr.(*resourcemocks.Manager)
rmgr.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
map[string]types.NodeResourceArgs{"test": map[string]interface{}{"abc": 123}},
map[string]types.NodeResourceArgs{"test": map[string]interface{}{"abc": 123}},
[]string{types.ErrNoETCD.Error()},
nil)
rmgr.On("GetRemapArgs", mock.Anything, mock.Anything, mock.Anything).Return(
map[string]types.EngineArgs{},
nil,
)
engine := &enginemocks.API{}
node := &types.Node{Engine: engine}

workload := &types.Workload{
ResourceArgs: map[string]types.WorkloadResourceArgs{},
}
store.On("ListNodeWorkloads", mock.Anything, mock.Anything, mock.Anything).Return([]*types.Workload{workload}, nil)
ch := make(chan enginetypes.VirtualizationRemapMessage, 1)
ch <- enginetypes.VirtualizationRemapMessage{}
close(ch)
engine.On("VirtualizationResourceRemap", mock.Anything, mock.Anything).Return((<-chan enginetypes.VirtualizationRemapMessage)(ch), nil)
_, err := c.remapResource(context.Background(), node)
assert.Nil(t, err)

c.doRemapResourceAndLog(context.TODO(), log.WithField("test", "zc"), node)
}
59 changes: 0 additions & 59 deletions cluster/calcium/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"

enginetypes "github.com/projecteru2/core/engine/types"
"github.com/projecteru2/core/log"
"github.com/projecteru2/core/strategy"
"github.com/projecteru2/core/types"
Expand Down Expand Up @@ -120,61 +119,3 @@ func (c *Calcium) doGetDeployMap(ctx context.Context, nodes []string, opts *type

return deployMap, nil
}

type remapMsg struct {
id string
err error
}

// called on changes of resource binding, such as cpu binding
// as an internal api, remap doesn't lock node, the responsibility of that should be taken on by caller
func (c *Calcium) remapResource(ctx context.Context, node *types.Node) (ch chan *remapMsg, err error) {
workloads, err := c.store.ListNodeWorkloads(ctx, node.Name, nil)
if err != nil {
return
}

workloadMap := map[string]*types.Workload{}
for _, workload := range workloads {
workloadMap[workload.ID] = workload
}

engineArgsMap, err := c.rmgr.GetRemapArgs(ctx, node.Name, workloadMap)
if err != nil {
return nil, err
}

ch = make(chan *remapMsg, len(engineArgsMap))
go func() {
defer close(ch)
for workloadID, engineArgs := range engineArgsMap {
ch <- &remapMsg{
id: workloadID,
err: node.Engine.VirtualizationUpdateResource(ctx, workloadID, &enginetypes.VirtualizationResource{EngineArgs: engineArgs}),
}
}
}()

return ch, nil
}

func (c *Calcium) doRemapResourceAndLog(ctx context.Context, logger log.Fields, node *types.Node) {
log.Debugf(ctx, "[doRemapResourceAndLog] remap node %s", node.Name)
ctx, cancel := context.WithTimeout(utils.InheritTracingInfo(ctx, context.TODO()), c.config.GlobalTimeout)
defer cancel()

err := c.withNodeOperationLocked(ctx, node.Name, func(ctx context.Context, node *types.Node) error {
logger = logger.WithField("Calcium", "doRemapResourceAndLog").WithField("nodename", node.Name)
if ch, err := c.remapResource(ctx, node); logger.ErrWithTracing(ctx, err) == nil {
for msg := range ch {
log.Infof(ctx, "[doRemapResourceAndLog] id %v", msg.id)
logger.WithField("id", msg.id).ErrWithTracing(ctx, msg.err) // nolint:errcheck
}
}
return nil
})

if err != nil {
log.Errorf(ctx, "[doRemapResourceAndLog] remap node %s failed, err: %v", node.Name, err)
}
}
32 changes: 0 additions & 32 deletions cluster/calcium/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ import (
"time"

enginemocks "github.com/projecteru2/core/engine/mocks"
enginetypes "github.com/projecteru2/core/engine/types"
lockmocks "github.com/projecteru2/core/lock/mocks"
"github.com/projecteru2/core/log"
resourcetypes "github.com/projecteru2/core/resources"
resourcemocks "github.com/projecteru2/core/resources/mocks"
storemocks "github.com/projecteru2/core/store/mocks"
Expand Down Expand Up @@ -132,33 +130,3 @@ func TestNodeResource(t *testing.T) {
details := strings.Join(nr.Diffs, ",")
assert.Contains(t, details, "inspect failed")
}

func TestRemapResource(t *testing.T) {
c := NewTestCluster()
store := c.store.(*storemocks.Store)
rmgr := c.rmgr.(*resourcemocks.Manager)
rmgr.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
map[string]types.NodeResourceArgs{"test": map[string]interface{}{"abc": 123}},
map[string]types.NodeResourceArgs{"test": map[string]interface{}{"abc": 123}},
[]string{types.ErrNoETCD.Error()},
nil)
rmgr.On("GetRemapArgs", mock.Anything, mock.Anything, mock.Anything).Return(
map[string]types.EngineArgs{},
nil,
)
engine := &enginemocks.API{}
node := &types.Node{Engine: engine}

workload := &types.Workload{
ResourceArgs: map[string]types.WorkloadResourceArgs{},
}
store.On("ListNodeWorkloads", mock.Anything, mock.Anything, mock.Anything).Return([]*types.Workload{workload}, nil)
ch := make(chan enginetypes.VirtualizationRemapMessage, 1)
ch <- enginetypes.VirtualizationRemapMessage{}
close(ch)
engine.On("VirtualizationResourceRemap", mock.Anything, mock.Anything).Return((<-chan enginetypes.VirtualizationRemapMessage)(ch), nil)
_, err := c.remapResource(context.Background(), node)
assert.Nil(t, err)

c.doRemapResourceAndLog(context.TODO(), log.WithField("test", "zc"), node)
}
4 changes: 2 additions & 2 deletions lock/etcdlock/mutex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestMutex(t *testing.T) {
assert.NoError(t, err)
_, err = m5.Lock(context.Background())
assert.NoError(t, err)
// then after embeded ETCD close, m5 will be unlocked from passive branch
// then after embedded ETCD close, m5 will be unlocked from passive branch
}

func TestTryLock(t *testing.T) {
Expand Down Expand Up @@ -109,5 +109,5 @@ func TestTryLock(t *testing.T) {
assert.NoError(t, err)
_, err = m6.TryLock(context.Background())
assert.NoError(t, err)
// then after embeded ETCD close, m5 will be unlocked from passive branch
// then after embedded ETCD close, m5 will be unlocked from passive branch
}

0 comments on commit f963a86

Please sign in to comment.