Skip to content

Commit

Permalink
remap without lock
Browse files Browse the repository at this point in the history
  • Loading branch information
DuodenumL committed Mar 3, 2022
1 parent 2263dcf commit 3f0e424
Show file tree
Hide file tree
Showing 15 changed files with 128 additions and 86 deletions.
8 changes: 7 additions & 1 deletion cluster/calcium/calcium.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Calcium struct {
scheduler scheduler.Scheduler
source source.Source
watcher discovery.Service
remapChan chan remapEntry
wal *WAL
identifier string
}
Expand Down Expand Up @@ -66,10 +67,12 @@ func New(config types.Config, t *testing.T) (*Calcium, error) {

// set watcher
watcher := helium.New(config.GRPCConfig, store)
remapChan := make(chan remapEntry)

cal := &Calcium{store: store, config: config, scheduler: potassium, source: scm, watcher: watcher}
cal := &Calcium{store: store, config: config, scheduler: potassium, source: scm, watcher: watcher, remapChan: remapChan}
cal.wal, err = newCalciumWAL(cal)
cal.identifier = config.Identifier()
go cal.watchRemapChan()

return cal, logger.Err(nil, errors.WithStack(err)) //nolint
}
Expand All @@ -82,6 +85,9 @@ func (c *Calcium) DisasterRecover(ctx context.Context) {
// Finalizer releases resources held by Calcium; it is meant to be deferred by
// the owner of the instance.
//
// Currently it only closes the remap queue, which makes watchRemapChan return
// once it observes the close. The channel is never reset to nil here, so a
// second call would close an already-closed channel and panic.
func (c *Calcium) Finalizer() {
	// TODO some resource recycle
	if c.remapChan == nil {
		// Nothing to close for an instance built without a remap queue.
		return
	}
	close(c.remapChan)
}

// GetIdentifier returns the identifier of calcium
Expand Down
7 changes: 1 addition & 6 deletions cluster/calcium/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,12 +249,7 @@ func (c *Calcium) doDeployWorkloadsOnNode(ctx context.Context, ch chan *types.Cr
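// remap is deliberately kept out of the transaction: rolling it back would be too expensive.
// The consequence of letting a remap fail is that the share pool is not updated, which is considered tolerable for now.
// remap is also idempotent, so even if this remap fails, the next remap will still converge to the correct state.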
if err := c.withNodeLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
c.doRemapResourceAndLog(ctx, logger, node)
return nil
}); err != nil {
logger.Errorf(ctx, "failed to lock node to remap: %v", err)
}
go c.pushNodeToRemapQueue(ctx, logger, node.Name)
return indices, err
}

Expand Down
6 changes: 3 additions & 3 deletions cluster/calcium/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"testing"

"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

enginemocks "github.com/projecteru2/core/engine/mocks"
enginetypes "github.com/projecteru2/core/engine/types"
lockmocks "github.com/projecteru2/core/lock/mocks"
Expand All @@ -16,8 +19,6 @@ import (
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/wal"
walmocks "github.com/projecteru2/core/wal/mocks"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)

func TestCreateWorkload(t *testing.T) {
Expand Down Expand Up @@ -234,7 +235,6 @@ func TestCreateWorkloadTxn(t *testing.T) {
engine.On("ImageRemoteDigest", mock.Anything, mock.Anything).Return("", nil)
engine.On("VirtualizationCreate", mock.Anything, mock.Anything).Return(nil, errors.Wrap(context.DeadlineExceeded, "VirtualizationCreate")).Twice()
engine.On("VirtualizationRemove", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
store.On("ListNodeWorkloads", mock.Anything, mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD)
walCommitted = false
ch, err = c.CreateWorkload(ctx, opts)
assert.Nil(t, err)
Expand Down
2 changes: 1 addition & 1 deletion cluster/calcium/dissociate.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (c *Calcium) DissociateWorkload(ctx context.Context, ids []string) (chan *t
}
ch <- msg
}
c.doRemapResourceAndLog(ctx, logger, node)
go c.pushNodeToRemapQueue(ctx, logger, node.Name)
return nil
}); err != nil {
logger.WithField("nodename", nodename).Errorf(ctx, "failed to lock node: %+v", err)
Expand Down
1 change: 0 additions & 1 deletion cluster/calcium/dissociate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ func TestDissociateWorkload(t *testing.T) {
// failed by RemoveWorkload
store.On("UpdateNodeResource", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
store.On("RemoveWorkload", mock.Anything, mock.Anything).Return(types.ErrNoETCD).Once()
store.On("ListNodeWorkloads", mock.Anything, mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD)
ch, err = c.DissociateWorkload(ctx, []string{"c1"})
assert.NoError(t, err)
for r := range ch {
Expand Down
2 changes: 1 addition & 1 deletion cluster/calcium/realloc.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (c *Calcium) doReallocOnNode(ctx context.Context, node *types.Node, workloa
return
}

c.doRemapResourceAndLog(ctx, log.WithField("Calcium", "doReallocOnNode"), node)
go c.pushNodeToRemapQueue(ctx, log.WithField("Calcium", "doReallocOnNode"), node.Name)
return nil
}

Expand Down
1 change: 0 additions & 1 deletion cluster/calcium/realloc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,6 @@ func TestReallocBindCpu(t *testing.T) {
assert.Error(t, err)
store.AssertExpectations(t)

store.On("ListNodeWorkloads", mock.Anything, mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD)
store.On("UpdateNodes", mock.Anything, mock.Anything).Return(nil)
err = c.ReallocResource(ctx, newReallocOptions("c5", 0.1, 2*int64(units.MiB), nil, types.TriFalse, types.TriKeep))
assert.NoError(t, err)
Expand Down
73 changes: 73 additions & 0 deletions cluster/calcium/remap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package calcium

import (
"context"

"github.com/pkg/errors"

enginetypes "github.com/projecteru2/core/engine/types"
"github.com/projecteru2/core/log"
"github.com/projecteru2/core/utils"
)

// remapEntry is a single queued remap request. It is sent over
// Calcium.remapChan by pushNodeToRemapQueue and consumed by watchRemapChan,
// which forwards all three fields to doRemapResourceAndLog.
type remapEntry struct {
	ctx      context.Context // context supplied by the caller that requested the remap
	nodeName string          // name of the node whose resources should be remapped
	logger   log.Fields      // logger supplied by the caller, used to report remap errors
}

// pushNodeToRemapQueue enqueues a remap request for nodeName on c.remapChan,
// where it is picked up by watchRemapChan.
//
// The send blocks until the consumer receives the entry, which is why callers
// start this method with `go`. Two edge cases are handled so that such a
// detached goroutine can neither hang forever nor take the process down:
//   - a nil queue (a Calcium built without remapChan) would block the send
//     forever, so the request is dropped and logged instead;
//   - Finalizer closes the queue and sending on a closed channel panics, so
//     that panic is recovered and logged.
//
// ctx and logger travel with the entry and are used when the remap actually
// runs.
func (c *Calcium) pushNodeToRemapQueue(ctx context.Context, logger log.Fields, nodeName string) {
	if c.remapChan == nil {
		logger.Errorf(ctx, "[pushNodeToRemapQueue] remap queue is not initialized, skip remapping node %s", nodeName)
		return
	}

	// The send below is the only statement that can panic here, and it does so
	// only when the queue has already been closed.
	defer func() {
		if r := recover(); r != nil {
			logger.Errorf(ctx, "[pushNodeToRemapQueue] remap queue is closed, skip remapping node %s: %v", nodeName, r)
		}
	}()

	c.remapChan <- remapEntry{ctx: ctx, nodeName: nodeName, logger: logger}
}

// watchRemapChan drains the remap queue, running one remap at a time in the
// order entries are received. It returns once c.remapChan has been closed and
// emptied, so it is intended to run in its own goroutine for the lifetime of
// the Calcium instance.
func (c *Calcium) watchRemapChan() {
	// Ranging over the channel ends exactly when the channel is closed, which
	// is the same condition as a receive reporting ok == false.
	for entry := range c.remapChan {
		c.doRemapResourceAndLog(entry.ctx, entry.logger, entry.nodeName)
	}
}

// remapResource is called on changes of resource binding, such as cpu binding.
//
// It loads the node and the workloads on it from the store, collects each
// workload's CPU binding, CPU quota limit and NUMA node, and asks the node's
// engine to remap resources accordingly. Store errors are returned as they
// are; the engine error is wrapped with a stack trace. The returned channel
// carries the per-workload remap messages produced by the engine.
func (c *Calcium) remapResource(ctx context.Context, nodeName string) (ch <-chan enginetypes.VirtualizationRemapMessage, err error) {
	node, err := c.store.GetNode(ctx, nodeName)
	if err != nil {
		return nil, err
	}

	workloads, err := c.store.ListNodeWorkloads(ctx, nodeName, nil)
	if err != nil {
		return nil, err
	}

	// Collect the resource view of every workload, keyed by workload ID.
	resources := make(map[string]enginetypes.VirtualizationResource)
	for _, w := range workloads {
		resources[w.ID] = enginetypes.VirtualizationResource{
			CPU:      w.CPU,
			Quota:    w.CPUQuotaLimit,
			NUMANode: w.NUMANode,
		}
	}

	ch, err = node.Engine.VirtualizationResourceRemap(ctx, &enginetypes.VirtualizationRemapOptions{
		CPUAvailable:      node.CPU,
		CPUInit:           node.InitCPU,
		CPUShareBase:      int64(c.config.Scheduler.ShareBase),
		WorkloadResources: resources,
	})
	return ch, errors.WithStack(err)
}

// doRemapResourceAndLog remaps the resources of the named node and logs the
// outcome; it never returns an error to the caller.
//
// The remap runs under a fresh context bounded by c.config.GlobalTimeout that
// is derived from context.TODO() via utils.InheritTracingInfo rather than
// directly from ctx. A failure to start the remap is logged through logger;
// otherwise every message from the engine is logged, tagged with its workload ID.
func (c *Calcium) doRemapResourceAndLog(ctx context.Context, logger log.Fields, nodeName string) {
	log.Debugf(ctx, "[doRemapResourceAndLog] remap node %s", nodeName)

	remapCtx, cancel := context.WithTimeout(utils.InheritTracingInfo(ctx, context.TODO()), c.config.GlobalTimeout)
	defer cancel()

	logger = logger.WithField("Calcium", "doRemapResourceAndLog").WithField("nodename", nodeName)

	ch, err := c.remapResource(remapCtx, nodeName)
	if logger.Err(remapCtx, err) != nil {
		// The error has been logged by logger.Err; there is no channel to drain.
		return
	}

	for msg := range ch {
		log.Debugf(remapCtx, "[doRemapResourceAndLog] remap node %s, msg: %+v", nodeName, msg)
		logger.WithField("id", msg.ID).Err(remapCtx, msg.Error) // nolint:errcheck
	}
}
40 changes: 40 additions & 0 deletions cluster/calcium/remap_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package calcium

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

enginemocks "github.com/projecteru2/core/engine/mocks"
enginetypes "github.com/projecteru2/core/engine/types"
"github.com/projecteru2/core/log"
storemocks "github.com/projecteru2/core/store/mocks"
"github.com/projecteru2/core/types"
)

// TestRemapResource checks that remapResource succeeds against a mocked store
// and engine, and that doRemapResourceAndLog can drain the resulting channel.
func TestRemapResource(t *testing.T) {
	cluster := NewTestCluster()

	mockStore := &storemocks.Store{}
	cluster.store = mockStore

	mockEngine := &enginemocks.API{}
	node := &types.Node{Engine: mockEngine}
	node.Name = "node1"

	workloads := []*types.Workload{
		{
			ResourceMeta: types.ResourceMeta{
				CPUQuotaLimit: 1,
			},
		},
	}
	mockStore.On("GetNode", mock.Anything, mock.Anything).Return(node, nil)
	mockStore.On("ListNodeWorkloads", mock.Anything, mock.Anything, mock.Anything).Return(workloads, nil)

	// A pre-filled, already closed channel: the engine reports a single empty
	// message and then ends the stream.
	msgs := make(chan enginetypes.VirtualizationRemapMessage, 1)
	msgs <- enginetypes.VirtualizationRemapMessage{}
	close(msgs)
	var remapCh <-chan enginetypes.VirtualizationRemapMessage = msgs
	mockEngine.On("VirtualizationResourceRemap", mock.Anything, mock.Anything).Return(remapCh, nil)

	_, err := cluster.remapResource(context.Background(), node.Name)
	assert.Nil(t, err)

	cluster.doRemapResourceAndLog(context.TODO(), log.WithField("test", "zc"), node.Name)
}
2 changes: 1 addition & 1 deletion cluster/calcium/remove.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (c *Calcium) RemoveWorkload(ctx context.Context, ids []string, force bool,
}
ch <- ret
}
c.doRemapResourceAndLog(ctx, logger, node)
go c.pushNodeToRemapQueue(ctx, logger, node.Name)
return nil
}); err != nil {
logger.WithField("nodename", nodename).Errorf(ctx, "failed to lock node: %+v", err)
Expand Down
1 change: 0 additions & 1 deletion cluster/calcium/remove_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ func TestRemoveWorkload(t *testing.T) {
store.On("UpdateNodeResource", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
store.On("GetNode", mock.Anything, mock.Anything).Return(node, nil)
store.On("RemoveWorkload", mock.Anything, mock.Anything).Return(types.ErrNoETCD).Twice()
store.On("ListNodeWorkloads", mock.Anything, mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD)
ch, err = c.RemoveWorkload(ctx, []string{"xx"}, false, 0)
assert.NoError(t, err)
for r := range ch {
Expand Down
7 changes: 1 addition & 6 deletions cluster/calcium/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,12 +206,7 @@ func (c *Calcium) doReplaceWorkload(
return createMessage, removeMessage, err
}

if err := c.withNodeLocked(ctx, node.Name, func(_ context.Context, node *types.Node) error {
c.doRemapResourceAndLog(ctx, log.WithField("Calcium", "doReplaceWorkload"), node)
return nil
}); err != nil {
log.Errorf(ctx, "[replaceAndRemove] failed to lock node to remap: %v", err)
}
go c.pushNodeToRemapQueue(ctx, log.WithField("Calcium", "doReplaceWorkload"), node.Name)

return createMessage, removeMessage, err
}
Expand Down
37 changes: 0 additions & 37 deletions cluster/calcium/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"

enginetypes "github.com/projecteru2/core/engine/types"
"github.com/projecteru2/core/log"
resourcetypes "github.com/projecteru2/core/resources/types"
"github.com/projecteru2/core/strategy"
Expand Down Expand Up @@ -208,39 +207,3 @@ func (c *Calcium) doAllocResource(ctx context.Context, nodeMap map[string]*types
log.Infof(ctx, "[Calium.doAllocResource] deployMap: %+v", deployMap)
return plans, deployMap, nil
}

// remapResource is called on changes of resource binding, such as cpu binding.
// As an internal API it does not lock the node; taking that lock is the
// caller's responsibility.
//
// It lists the workloads on the node, collects each workload's CPU binding,
// CPU quota limit and NUMA node, and asks the node's engine to remap resources
// accordingly. A store error is returned as it is; the engine error is wrapped
// with a stack trace.
func (c *Calcium) remapResource(ctx context.Context, node *types.Node) (ch <-chan enginetypes.VirtualizationRemapMessage, err error) {
	workloads, err := c.store.ListNodeWorkloads(ctx, node.Name, nil)
	if err != nil {
		return nil, err
	}

	// Collect the resource view of every workload, keyed by workload ID.
	resources := make(map[string]enginetypes.VirtualizationResource)
	for _, w := range workloads {
		resources[w.ID] = enginetypes.VirtualizationResource{
			CPU:      w.CPU,
			Quota:    w.CPUQuotaLimit,
			NUMANode: w.NUMANode,
		}
	}

	ch, err = node.Engine.VirtualizationResourceRemap(ctx, &enginetypes.VirtualizationRemapOptions{
		CPUAvailable:      node.CPU,
		CPUInit:           node.InitCPU,
		CPUShareBase:      int64(c.config.Scheduler.ShareBase),
		WorkloadResources: resources,
	})
	return ch, errors.WithStack(err)
}

// doRemapResourceAndLog remaps the resources of the given node and logs the
// outcome; it never returns an error to the caller.
//
// The remap runs under a fresh context bounded by c.config.GlobalTimeout that
// is derived from context.TODO() via utils.InheritTracingInfo rather than
// directly from ctx. A failure to start the remap is logged through logger;
// otherwise every message from the engine is logged, tagged with its workload ID.
func (c *Calcium) doRemapResourceAndLog(ctx context.Context, logger log.Fields, node *types.Node) {
	log.Debugf(ctx, "[doRemapResourceAndLog] remap node %s", node.Name)

	remapCtx, cancel := context.WithTimeout(utils.InheritTracingInfo(ctx, context.TODO()), c.config.GlobalTimeout)
	defer cancel()

	logger = logger.WithField("Calcium", "doRemapResourceAndLog").WithField("nodename", node.Name)

	ch, err := c.remapResource(remapCtx, node)
	if logger.Err(remapCtx, err) != nil {
		// The error has been logged by logger.Err; there is no channel to drain.
		return
	}

	for msg := range ch {
		logger.WithField("id", msg.ID).Err(remapCtx, msg.Error) // nolint:errcheck
	}
}
25 changes: 0 additions & 25 deletions cluster/calcium/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ import (
"time"

enginemocks "github.com/projecteru2/core/engine/mocks"
enginetypes "github.com/projecteru2/core/engine/types"
lockmocks "github.com/projecteru2/core/lock/mocks"
"github.com/projecteru2/core/log"
resourcetypes "github.com/projecteru2/core/resources/types"
"github.com/projecteru2/core/scheduler"
schedulermocks "github.com/projecteru2/core/scheduler/mocks"
Expand Down Expand Up @@ -291,26 +289,3 @@ func testAllocFailedAsCommonDivisionError(t *testing.T, c *Calcium, opts *types.
_, _, err := c.doAllocResource(context.Background(), nodeMap, opts)
assert.Error(t, err)
}

// TestRemapResource checks that remapResource succeeds against a mocked store
// and engine, and that doRemapResourceAndLog can drain the resulting channel.
func TestRemapResource(t *testing.T) {
	cluster := NewTestCluster()

	mockStore := &storemocks.Store{}
	cluster.store = mockStore

	mockEngine := &enginemocks.API{}
	node := &types.Node{Engine: mockEngine}

	workloads := []*types.Workload{
		{
			ResourceMeta: types.ResourceMeta{
				CPUQuotaLimit: 1,
			},
		},
	}
	mockStore.On("ListNodeWorkloads", mock.Anything, mock.Anything, mock.Anything).Return(workloads, nil)

	// A pre-filled, already closed channel: the engine reports a single empty
	// message and then ends the stream.
	msgs := make(chan enginetypes.VirtualizationRemapMessage, 1)
	msgs <- enginetypes.VirtualizationRemapMessage{}
	close(msgs)
	var remapCh <-chan enginetypes.VirtualizationRemapMessage = msgs
	mockEngine.On("VirtualizationResourceRemap", mock.Anything, mock.Anything).Return(remapCh, nil)

	_, err := cluster.remapResource(context.Background(), node)
	assert.Nil(t, err)

	cluster.doRemapResourceAndLog(context.TODO(), log.WithField("test", "zc"), node)
}
2 changes: 0 additions & 2 deletions cluster/calcium/wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,6 @@ func TestHandleCreateLambda(t *testing.T) {
eng.On("VirtualizationRemove", mock.Anything, wrk.ID, true, true).
Return(nil).
Once()
eng.On("VirtualizationResourceRemap", mock.Anything, mock.Anything).Return(nil, nil).Once()
store.On("GetWorkloads", mock.Anything, []string{wrk.ID}).
Return([]*types.Workload{wrk}, nil).
Twice()
Expand All @@ -175,7 +174,6 @@ func TestHandleCreateLambda(t *testing.T) {
store.On("UpdateNodeResource", mock.Anything, node, mock.Anything, mock.Anything).
Return(nil).
Once()
store.On("ListNodeWorkloads", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil).Once()
lock := &lockmocks.DistributedLock{}
lock.On("Lock", mock.Anything).Return(context.TODO(), nil)
lock.On("Unlock", mock.Anything).Return(nil)
Expand Down

0 comments on commit 3f0e424

Please sign in to comment.