From 2fad2f246fd2f2ecce44f8a5fde398f622a3f5ce Mon Sep 17 00:00:00 2001 From: zc Date: Tue, 10 Nov 2020 22:31:18 +0800 Subject: [PATCH] simplify volume dispense: compatible computing --- .../servicediscovery/eru_service_discovery.go | 4 +- cluster/calcium/create.go | 2 +- cluster/calcium/realloc.go | 36 +- cluster/calcium/realloc_test.go | 355 ++++-------------- resources/cpumem/cpumem.go | 10 - resources/scheduler.go | 2 +- resources/types/types.go | 5 +- resources/volume/volume.go | 32 +- rpc/rpc.go | 5 +- types/node_test.go | 32 ++ types/stream_test.go | 16 +- 11 files changed, 160 insertions(+), 339 deletions(-) diff --git a/client/servicediscovery/eru_service_discovery.go b/client/servicediscovery/eru_service_discovery.go index 8204dd373..094473716 100644 --- a/client/servicediscovery/eru_service_discovery.go +++ b/client/servicediscovery/eru_service_discovery.go @@ -51,7 +51,7 @@ func (w *EruServiceDiscovery) Watch(ctx context.Context) (_ <-chan []string, err for { cancelTimer := make(chan struct{}) - go func() { + go func(expectedInterval time.Duration) { timer := time.NewTimer(expectedInterval * time.Second) defer timer.Stop() select { @@ -60,7 +60,7 @@ func (w *EruServiceDiscovery) Watch(ctx context.Context) (_ <-chan []string, err case <-cancelTimer: return } - }() + }(expectedInterval) status, err := stream.Recv() close(cancelTimer) if err != nil { diff --git a/cluster/calcium/create.go b/cluster/calcium/create.go index 5e08c0c00..3b799e789 100644 --- a/cluster/calcium/create.go +++ b/cluster/calcium/create.go @@ -101,7 +101,7 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio for nodename, rollbackIndices := range rollbackMap { if e := c.withNodeLocked(ctx, nodename, func(node *types.Node) error { for _, plan := range plans { - plan.RollbackChangesOnNode(node, rollbackIndices...) + plan.RollbackChangesOnNode(node, rollbackIndices...) // nolint:scopelint } return errors.WithStack(c.store.UpdateNodes(ctx, node)) }); e != nil { diff --git a/cluster/calcium/realloc.go b/cluster/calcium/realloc.go index 225c1fd25..743378753 100644 --- a/cluster/calcium/realloc.go +++ b/cluster/calcium/realloc.go @@ -11,6 +11,7 @@ import ( "github.com/projecteru2/core/utils" ) +// ReallocResource updates workload resource dynamically func (c *Calcium) ReallocResource(ctx context.Context, opts *types.ReallocOptions) (err error) { return c.withContainerLocked(ctx, opts.ID, func(container *types.Container) error { rrs, err := resources.MakeRequests( @@ -41,11 +42,10 @@ func (c *Calcium) doReallocOnNode(ctx context.Context, nodename string, containe if err != nil { return errors.WithStack(err) } - if total != 1 { + if total < 1 { return errors.WithStack(types.ErrInsufficientRes) } - originalContainer := *container return utils.Txn( ctx, @@ -60,17 +60,8 @@ func (c *Calcium) doReallocOnNode(ctx context.Context, nodename string, containe } return c.store.UpdateNodes(ctx, node) }, - // rollback to origin - func(ctx context.Context, failureByCond bool) error { - if failureByCond { - return nil - } - for _, plan := range plans { - plan.RollbackChangesOnNode(node, 1) - } - node.PreserveResources(&originalContainer.ResourceMeta) - return c.store.UpdateNodes(ctx, node) - }, + // no need rollback + nil, c.config.GlobalTimeout, ) @@ -80,12 +71,9 @@ func (c *Calcium) doReallocOnNode(ctx context.Context, nodename string, containe func (c *Calcium) doReallocContainersOnInstance(ctx context.Context, node *types.Node, plans []resourcetypes.ResourcePlans, container *types.Container) (err error) { r := &types.ResourceMeta{} for _, plan := range plans { - // TODO@zc: single existing instance - // TODO@zc: no HardVolumeBindings if r, err = plan.Dispense(resourcetypes.DispenseOptions{ - Node: node, - Index: 1, - ExistingInstances: []*types.Container{container}, + Node: node, + ExistingInstance: container, }, r); err != nil { return } @@ -131,7 +119,17 @@ func (c *Calcium) doReallocContainersOnInstance(ctx context.Context, node *types if failureByCond { return nil } - return errors.WithStack(c.store.UpdateContainer(ctx, &originalContainer)) + r := &enginetypes.VirtualizationResource{ + CPU: originalContainer.CPU, + Quota: originalContainer.CPUQuotaLimit, + NUMANode: originalContainer.NUMANode, + Memory: originalContainer.MemoryLimit, + Volumes: originalContainer.VolumeLimit.ToStringSlice(false, false), + VolumePlan: originalContainer.VolumePlanLimit.ToLiteral(), + VolumeChanged: r.VolumeChanged, + Storage: originalContainer.StorageLimit, + } + return errors.WithStack(node.Engine.VirtualizationUpdateResource(ctx, container.ID, r)) }, c.config.GlobalTimeout, diff --git a/cluster/calcium/realloc_test.go b/cluster/calcium/realloc_test.go index 01e98b174..ade8d3bc3 100644 --- a/cluster/calcium/realloc_test.go +++ b/cluster/calcium/realloc_test.go @@ -1,7 +1,20 @@ package calcium import ( + "context" + "testing" + + "github.com/docker/go-units" + enginemocks "github.com/projecteru2/core/engine/mocks" + enginetypes "github.com/projecteru2/core/engine/types" + lockmocks "github.com/projecteru2/core/lock/mocks" + "github.com/projecteru2/core/scheduler" + complexscheduler "github.com/projecteru2/core/scheduler/complex" + schedulermocks "github.com/projecteru2/core/scheduler/mocks" + storemocks "github.com/projecteru2/core/store/mocks" "github.com/projecteru2/core/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" ) func newReallocOptions(id string, cpu float64, memory int64, vbs types.VolumeBindings, bindCPUOpt, memoryLimitOpt types.TriOptions) *types.ReallocOptions { @@ -16,7 +29,6 @@ func newReallocOptions(id string, cpu float64, memory int64, vbs types.VolumeBin } } -/* func TestRealloc(t *testing.T) { c := NewTestCluster() ctx := context.Background() @@ -31,10 +43,6 @@ func TestRealloc(t *testing.T) { engine := &enginemocks.API{} engine.On("VirtualizationInspect", mock.Anything, mock.Anything).Return(&enginetypes.VirtualizationInfo{}, nil) - pod1 := &types.Pod{ - Name: "p1", - } - node1 := &types.Node{ Name: "node1", MemCap: int64(units.GiB), @@ -47,71 +55,72 @@ func TestRealloc(t *testing.T) { Volume: types.VolumeMap{"/dir0": 100}, } - c1 := &types.Container{ - ID: "c1", - Podname: "p1", - Engine: engine, - ResourceMeta: types.ResourceMeta{ - MemoryLimit: 5 * int64(units.MiB), - MemoryRequest: 5 * int64(units.MiB), - CPUQuotaLimit: 0.9, - CPUQuotaRequest: 0.9, - CPU: types.CPUMap{"2": 90}, - VolumePlanRequest: types.VolumePlan{types.MustToVolumeBinding("AUTO:/data:rw:50"): types.VolumeMap{"/dir0": 50}}, - VolumeRequest: types.MustToVolumeBindings([]string{"AUTO:/data:rw:50"}), - VolumePlanLimit: types.VolumePlan{types.MustToVolumeBinding("AUTO:/data:rw:50"): types.VolumeMap{"/dir0": 50}}, - VolumeLimit: types.MustToVolumeBindings([]string{"AUTO:/data:rw:50"}), - }, - Nodename: "node1", + newC1 := func(context.Context, []string) []*types.Container { + return []*types.Container{ + { + ID: "c1", + Podname: "p1", + Engine: engine, + ResourceMeta: types.ResourceMeta{ + MemoryLimit: 5 * int64(units.MiB), + MemoryRequest: 5 * int64(units.MiB), + CPUQuotaLimit: 0.9, + CPUQuotaRequest: 0.9, + CPU: types.CPUMap{"2": 90}, + VolumePlanRequest: types.VolumePlan{types.MustToVolumeBinding("AUTO:/data:rw:50"): types.VolumeMap{"/dir0": 50}}, + VolumeRequest: types.MustToVolumeBindings([]string{"AUTO:/data:rw:50"}), + VolumePlanLimit: types.VolumePlan{types.MustToVolumeBinding("AUTO:/data:rw:50"): types.VolumeMap{"/dir0": 50}}, + VolumeLimit: types.MustToVolumeBindings([]string{"AUTO:/data:rw:50"}), + }, + Nodename: "node1", + }, + } } - c2 := &types.Container{ - ID: "c2", - Podname: "p1", - Engine: engine, - ResourceMeta: types.ResourceMeta{ - MemoryRequest: 5 * int64(units.MiB), - MemoryLimit: 5 * int64(units.MiB), - CPUQuotaLimit: 0.9, - CPUQuotaRequest: 0.9, - }, - Nodename: "node1", + newC2 := func(context.Context, []string) []*types.Container { + return []*types.Container{ + { + ID: "c2", + Podname: "p1", + Engine: engine, + ResourceMeta: types.ResourceMeta{ + MemoryRequest: 5 * int64(units.MiB), + MemoryLimit: 5 * int64(units.MiB), + CPUQuotaLimit: 0.9, + CPUQuotaRequest: 0.9, + }, + Nodename: "node1", + }, + } } - store.On("GetContainers", mock.Anything, []string{"c1"}).Return([]*types.Container{c1}, nil) + store.On("GetContainers", mock.Anything, []string{"c1"}).Return(newC1, nil) // failed by lock store.On("CreateLock", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once() err := c.ReallocResource(ctx, newReallocOptions("c1", -1, 2*int64(units.GiB), nil, types.TriKeep, types.TriKeep)) - assert.NoError(t, err) + assert.EqualError(t, err, "ETCD must be set") store.AssertExpectations(t) - store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil) - // failed by GetPod - store.On("GetPod", mock.Anything, mock.Anything).Return(pod1, types.ErrNoETCD).Once() - err = c.ReallocResource(ctx, newReallocOptions("c1", -1, 2*int64(units.GiB), nil, types.TriKeep, types.TriKeep)) - assert.NoError(t, err) - store.AssertExpectations(t) - store.On("GetPod", mock.Anything, mock.Anything).Return(pod1, nil) // failed by newCPU < 0 err = c.ReallocResource(ctx, newReallocOptions("c1", -1, 2*int64(units.GiB), nil, types.TriKeep, types.TriKeep)) - assert.NoError(t, err) + assert.EqualError(t, err, "limit or request less than 0: bad `CPU` value") store.AssertExpectations(t) // failed by GetNode store.On("GetNode", mock.Anything, "node1").Return(nil, types.ErrNoETCD).Once() err = c.ReallocResource(ctx, newReallocOptions("c1", 0.1, 2*int64(units.GiB), nil, types.TriKeep, types.TriKeep)) - assert.NoError(t, err) + assert.EqualError(t, err, "ETCD must be set") store.AssertExpectations(t) - // failed by no new CPU Plan + // failed by no enough mem store.On("GetNode", mock.Anything, "node1").Return(node1, nil) simpleMockScheduler := &schedulermocks.Scheduler{} scheduler.InitSchedulerV1(simpleMockScheduler) c.scheduler = simpleMockScheduler simpleMockScheduler.On("SelectCPUNodes", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, 0, types.ErrInsufficientMEM).Once() err = c.ReallocResource(ctx, newReallocOptions("c1", 0.1, 2*int64(units.MiB), nil, types.TriKeep, types.TriKeep)) - assert.NoError(t, err) + assert.EqualError(t, err, "cannot alloc a plan, not enough memory") store.AssertExpectations(t) simpleMockScheduler.AssertExpectations(t) @@ -121,9 +130,9 @@ func TestRealloc(t *testing.T) { nodeVolumePlans := map[string][]types.VolumePlan{ "node1": {{types.MustToVolumeBinding("AUTO:/data:rw:50"): types.VolumeMap{"/dir0": 50}}}, } - simpleMockScheduler.On("SelectVolumeNodes", mock.Anything, types.MustToVolumeBindings([]string{"AUTO:/data:rw:50"})).Return(nil, nodeVolumePlans, 1, nil) + simpleMockScheduler.On("SelectVolumeNodes", mock.Anything, types.MustToVolumeBindings([]string{"AUTO:/data:rw:50"})).Return(nil, nodeVolumePlans, 1, nil).Once() err = c.ReallocResource(ctx, newReallocOptions("c1", 0.1, 2*int64(units.MiB), nil, types.TriKeep, types.TriKeep)) - assert.NoError(t, err) + assert.EqualError(t, err, "not enough resource") simpleMockScheduler.AssertExpectations(t) store.AssertExpectations(t) @@ -134,13 +143,10 @@ func TestRealloc(t *testing.T) { {"2": 100}, }, } - simpleMockScheduler.On("SelectCPUNodes", mock.Anything, mock.Anything, mock.Anything).Return(nil, nodeCPUPlans, 2, nil).Once() simpleMockScheduler.On("SelectMemoryNodes", mock.Anything, mock.Anything, mock.Anything).Return(nil, 2, nil).Once() - simpleMockScheduler.On("SelectVolumeNodes", mock.Anything, types.VolumeBindings{}).Return(nil, nil, 100, nil) + simpleMockScheduler.On("SelectVolumeNodes", mock.Anything, mock.Anything).Return(nil, nil, 100, nil).Once() // failed by apply resource - engine.On("VirtualizationUpdateResource", mock.Anything, mock.Anything, mock.Anything).Return(types.ErrBadContainerID).Twice() - // update node failed - store.On("UpdateNodes", mock.Anything, mock.Anything).Return(types.ErrNoETCD).Times(4) + engine.On("VirtualizationUpdateResource", mock.Anything, mock.Anything, mock.Anything).Return(types.ErrBadContainerID).Once() // reset node node1 = &types.Node{ Name: "node1", @@ -149,25 +155,21 @@ func TestRealloc(t *testing.T) { Engine: engine, Endpoint: "http://1.1.1.1:1", } - store.On("GetContainers", mock.Anything, "c2").Return([]*types.Container{c2}, nil) + store.On("GetContainers", mock.Anything, []string{"c2"}).Return(newC2, nil) err = c.ReallocResource(ctx, newReallocOptions("c2", 0.1, 2*int64(units.MiB), nil, types.TriKeep, types.TriKeep)) - assert.NoError(t, err) - - // check node resource as usual + assert.EqualError(t, err, "container ID must be length of 64") assert.Equal(t, node1.CPU["2"], int64(10)) assert.Equal(t, node1.MemCap, int64(units.GiB)) simpleMockScheduler.AssertExpectations(t) store.AssertExpectations(t) + // failed by update container simpleMockScheduler.On("SelectCPUNodes", mock.Anything, mock.Anything, mock.Anything).Return(nil, nodeCPUPlans, 2, nil).Once() - simpleMockScheduler.On("SelectMemoryNodes", mock.Anything, mock.Anything, mock.Anything).Return(nil, 2, nil).Once() + simpleMockScheduler.On("SelectVolumeNodes", mock.Anything, mock.Anything).Return(nil, nil, 100, nil).Once() engine.On("VirtualizationUpdateResource", mock.Anything, mock.Anything, mock.Anything).Return(nil) - store.On("UpdateNodes", mock.Anything, mock.Anything).Return(nil) - - // failed by update container - store.On("UpdateContainer", mock.Anything, mock.Anything).Return(types.ErrBadContainerID).Times(4) + store.On("UpdateContainer", mock.Anything, mock.Anything).Return(types.ErrBadContainerID).Times(1) err = c.ReallocResource(ctx, newReallocOptions("c1", 0.1, 2*int64(units.MiB), nil, types.TriKeep, types.TriKeep)) - assert.NoError(t, err) + assert.EqualError(t, err, "container ID must be length of 64") simpleMockScheduler.AssertExpectations(t) store.AssertExpectations(t) @@ -181,10 +183,9 @@ func TestRealloc(t *testing.T) { }, } simpleMockScheduler.On("SelectCPUNodes", mock.Anything, mock.Anything, mock.Anything).Return(nil, nodeCPUPlans, 2, nil).Once() - simpleMockScheduler.On("SelectVolumeNodes", mock.Anything, types.MustToVolumeBindings([]string{"AUTO:/data:rw:100"})).Return(nil, nodeVolumePlans, 4, nil).Once() + simpleMockScheduler.On("SelectVolumeNodes", mock.Anything, mock.Anything).Return(nil, nodeVolumePlans, 4, nil).Once() err = c.ReallocResource(ctx, newReallocOptions("c1", 0.1, int64(units.MiB), types.MustToVolumeBindings([]string{"AUTO:/data:rw:50"}), types.TriKeep, types.TriKeep)) - assert.NoError(t, err) - + assert.EqualError(t, err, "incompatible volume plans: cannot alloc a plan, not enough volume") simpleMockScheduler.AssertExpectations(t) store.AssertExpectations(t) @@ -192,17 +193,10 @@ func TestRealloc(t *testing.T) { simpleMockScheduler.On("SelectCPUNodes", mock.Anything, mock.Anything, mock.Anything).Return(nil, nodeCPUPlans, 2, nil).Once() simpleMockScheduler.On("SelectVolumeNodes", mock.Anything, mock.Anything).Return(nil, nil, 0, types.ErrInsufficientVolume).Once() err = c.ReallocResource(ctx, newReallocOptions("c1", 0.1, int64(units.MiB), types.MustToVolumeBindings([]string{"AUTO:/data:rw:1"}), types.TriKeep, types.TriKeep)) - assert.NoError(t, err) + assert.EqualError(t, err, "cannot alloc a plan, not enough volume") simpleMockScheduler.AssertExpectations(t) store.AssertExpectations(t) - // failed due to re-volume plan less then container number - simpleMockScheduler.On("SelectVolumeNodes", mock.Anything, mock.Anything).Return(nil, nodeVolumePlans, 0, nil).Once() - simpleMockScheduler.On("SelectCPUNodes", mock.Anything, mock.Anything, mock.Anything).Return(nil, nodeCPUPlans, 2, nil).Once() - err = c.ReallocResource(ctx, newReallocOptions("c1", 0.1, int64(units.MiB), types.MustToVolumeBindings([]string{"AUTO:/data:rw:1"}), types.TriKeep, types.TriKeep)) - assert.NoError(t, err) - simpleMockScheduler.AssertExpectations(t) - // good to go // rest everything node2 := &types.Node{ @@ -243,7 +237,6 @@ func TestRealloc(t *testing.T) { nodeCPUPlans = map[string][]types.CPUMap{ node2.Name: { {"3": 100}, - {"2": 100}, }, } nodeVolumePlans = map[string][]types.VolumePlan{ @@ -259,210 +252,21 @@ func TestRealloc(t *testing.T) { }, } simpleMockScheduler.On("SelectCPUNodes", mock.Anything, mock.Anything, mock.Anything).Return(nil, nodeCPUPlans, 2, nil) - simpleMockScheduler.On("SelectMemoryNodes", mock.Anything, mock.Anything, mock.Anything).Return(nil, 2, nil).Once() - simpleMockScheduler.On("SelectVolumeNodes", mock.Anything, types.MustToVolumeBindings([]string{"AUTO:/data0:rw:50", "AUTO:/data1:rw:200"})).Return(nil, nodeVolumePlans, 2, nil) + simpleMockScheduler.On("SelectVolumeNodes", mock.Anything, mock.Anything).Return(nil, nodeVolumePlans, 2, nil) store.On("GetNode", mock.Anything, "node2").Return(node2, nil) - store.On("GetContainers", mock.Anything, "c3").Return([]*types.Container{c3}, nil) - store.On("UpdateContainer", mock.Anything, mock.Anything).Return(types.ErrBadContainerID).Times(4) + store.On("GetContainers", mock.Anything, []string{"c3"}).Return([]*types.Container{c3}, nil) + store.On("UpdateContainer", mock.Anything, mock.Anything).Return(types.ErrBadContainerID).Times(1) err = c.ReallocResource(ctx, newReallocOptions("c3", 0.1, 2*int64(units.MiB), types.MustToVolumeBindings([]string{"AUTO:/data0:rw:-50"}), types.TriKeep, types.TriKeep)) - assert.NoError(t, err) + assert.EqualError(t, err, "container ID must be length of 64") assert.Equal(t, node2.CPU["3"], int64(100)) - assert.Equal(t, node2.CPU["2"], int64(10)) - assert.Equal(t, node2.MemCap, int64(units.GiB)) - assert.Equal(t, node2.Volume, types.VolumeMap{"/dir0": 200, "/dir1": 200, "/dir2": 200}) - assert.Equal(t, node2.VolumeUsed, int64(300)) + assert.Equal(t, node2.CPU["2"], int64(100)) + assert.Equal(t, node2.MemCap, int64(units.GiB)+5*int64(units.MiB)) + assert.Equal(t, node2.Volume, types.VolumeMap{"/dir0": 300, "/dir1": 400, "/dir2": 200}) + assert.Equal(t, node2.VolumeUsed, int64(0)) store.AssertExpectations(t) simpleMockScheduler.AssertExpectations(t) } -*/ - -/* -func TestReallocVolume(t *testing.T) { - c := NewTestCluster() - store := &storemocks.Store{} - c.store = store - - simpleMockScheduler := &schedulermocks.Scheduler{} - c.scheduler = simpleMockScheduler - scheduler.InitSchedulerV1(simpleMockScheduler) - engine := &enginemocks.API{} - - node1 := &types.Node{ - Name: "node1", - Volume: types.VolumeMap{"/data": 1000, "/data1": 1000, "/data2": 1000, "/data3": 1000}, - InitVolume: types.VolumeMap{"/data": 2000, "/data1": 2000, "/data2": 2000, "/data3": 2000}, - Engine: engine, - } - - c1 := &types.Container{ - ID: "c1", - Engine: engine, - Podname: "p1", - Nodename: "node1", - ResourceMeta: types.ResourceMeta{ - VolumeRequest: types.MustToVolumeBindings([]string{"AUTO:/data:rw:0", "AUTO:/data1:rw:100", "AUTO:/data2:rw:0", "AUTO:/data3:rw:600"}), - VolumeLimit: types.MustToVolumeBindings([]string{"AUTO:/data:rw:0", "AUTO:/data1:rw:100", "AUTO:/data2:rw:0", "AUTO:/data3:rw:600"}), - VolumePlanRequest: types.VolumePlan{ - types.MustToVolumeBinding("AUTO:/data:rw:0"): types.VolumeMap{"/dir0": 0}, - types.MustToVolumeBinding("AUTO:/data1:rw:100"): types.VolumeMap{"/dir0": 100}, - types.MustToVolumeBinding("AUTO:/data2:rw:0"): types.VolumeMap{"/dir0": 0}, - types.MustToVolumeBinding("AUTO:/data3:rw:600"): types.VolumeMap{"/dir0": 600}, - }, - VolumePlanLimit: types.VolumePlan{ - types.MustToVolumeBinding("AUTO:/data:rw:0"): types.VolumeMap{"/dir0": 0}, - types.MustToVolumeBinding("AUTO:/data1:rw:100"): types.VolumeMap{"/dir0": 100}, - types.MustToVolumeBinding("AUTO:/data2:rw:0"): types.VolumeMap{"/dir0": 0}, - types.MustToVolumeBinding("AUTO:/data3:rw:600"): types.VolumeMap{"/dir0": 600}, - }, - }, - } - - pod1 := &types.Pod{Name: "p1"} - - newVbs := types.MustToVolumeBindings([]string{ - "AUTO:/data:rw:0", - "AUTO:/data1:rw:-100", - "AUTO:/data2:ro:110", - "AUTO:/data3:rw:-580", - }) - - // test 1: incompatible - - newPlans := map[string][]types.VolumePlan{ - "node1": { - { - *newVbs[0]: types.VolumeMap{"/dir1": 0}, - *newVbs[1]: types.VolumeMap{"/dir1": 0}, - *newVbs[2]: types.VolumeMap{"/dir0": 110}, - *newVbs[3]: types.VolumeMap{"/dir1": 20}, - }, - }, - } - - ctx := context.Background() - simpleMockScheduler.On("SelectVolumeNodes", mock.Anything, mock.Anything).Return(nil, newPlans, 1, nil).Once() - simpleMockScheduler.On("SelectMemoryNodes", mock.Anything, mock.Anything, mock.Anything).Return(nil, 100, nil) - simpleMockScheduler.On("SelectStorageNodes", mock.Anything, mock.Anything, mock.Anything).Return(nil, 100, nil) - lock := &lockmocks.DistributedLock{} - lock.On("Lock", mock.Anything).Return(nil) - lock.On("Unlock", mock.Anything).Return(nil) - store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil) - store.On("GetPod", mock.Anything, mock.Anything).Return(pod1, nil) - store.On("GetContainers", mock.Anything, []string{"c1"}).Return([]*types.Container{c1}, nil) - store.On("GetNode", mock.Anything, "node1").Return(node1, nil) - store.On("UpdateNodes", mock.Anything, mock.Anything).Return(nil) - err := c.ReallocResource(ctx, newReallocOptions("c1", 0, 0, newVbs, types.TriKeep, types.TriKeep)) - assert.Nil(t, err) - simpleMockScheduler.AssertExpectations(t) - store.AssertExpectations(t) - - // test 2: modify unlimited volume map for compatible requirement - - newPlans = map[string][]types.VolumePlan{ - "node1": { - { - types.MustToVolumeBinding("AUTO:/data:rw:0"): types.VolumeMap{"/dir1": 0}, - types.MustToVolumeBinding("AUTO:/data1:rw:0"): types.VolumeMap{"/dir1": 0}, - types.MustToVolumeBinding("AUTO:/data2:rw:110"): types.VolumeMap{"/dir1": 110}, - types.MustToVolumeBinding("AUTO:/data3:rw:20"): types.VolumeMap{"/dir1": 20}, - }, - { - types.MustToVolumeBinding("AUTO:/data:rw:0"): types.VolumeMap{"/dir1": 0}, - types.MustToVolumeBinding("AUTO:/data1:rw:0"): types.VolumeMap{"/dir1": 0}, - types.MustToVolumeBinding("AUTO:/data2:rw:110"): types.VolumeMap{"/dir0": 110}, - types.MustToVolumeBinding("AUTO:/data3:rw:20"): types.VolumeMap{"/dir0": 20}, - }, - }, - } - - simpleMockScheduler.On("SelectVolumeNodes", mock.Anything, mock.Anything).Return(nil, newPlans, 1, nil).Once() - store.On("UpdateContainer", mock.Anything, mock.Anything).Return(nil) - engine.On("VirtualizationUpdateResource", mock.Anything, mock.Anything, mock.Anything).Return(nil) - err = c.ReallocResource(ctx, newReallocOptions("c1", 0, 0, newVbs, types.TriKeep, types.TriKeep)) - assert.Nil(t, err) - assert.EqualValues(t, 0, c1.VolumePlanRequest[types.MustToVolumeBinding("AUTO:/data:rw:0")]["/dir0"]) - assert.EqualValues(t, 0, c1.VolumePlanRequest[types.MustToVolumeBinding("AUTO:/data1:rw:0")]["/dir0"]) - assert.EqualValues(t, 110, c1.VolumePlanRequest[types.MustToVolumeBinding("AUTO:/data2:rw:110")]["/dir0"]) - assert.EqualValues(t, 20, c1.VolumePlanRequest[types.MustToVolumeBinding("AUTO:/data3:rw:20")]["/dir0"]) - simpleMockScheduler.AssertExpectations(t) - - // test 3: multiple containers search compatible respective plans - - newPlans = map[string][]types.VolumePlan{ - "node1": { - { - types.MustToVolumeBinding("AUTO:/data:rw:0"): types.VolumeMap{"/dir1": 0}, - types.MustToVolumeBinding("AUTO:/data1:rw:0"): types.VolumeMap{"/dir1": 0}, - types.MustToVolumeBinding("AUTO:/data2:rw:110"): types.VolumeMap{"/dir0": 110}, - types.MustToVolumeBinding("AUTO:/data3:rw:20"): types.VolumeMap{"/dir1": 20}, - }, - { - types.MustToVolumeBinding("AUTO:/data:rw:0"): types.VolumeMap{"/dir1": 0}, - types.MustToVolumeBinding("AUTO:/data1:rw:0"): types.VolumeMap{"/dir1": 0}, - types.MustToVolumeBinding("AUTO:/data2:rw:110"): types.VolumeMap{"/dir0": 110}, - types.MustToVolumeBinding("AUTO:/data3:rw:20"): types.VolumeMap{"/dir0": 20}, - }, - { - types.MustToVolumeBinding("AUTO:/data:rw:0"): types.VolumeMap{"/dir1": 0}, - types.MustToVolumeBinding("AUTO:/data1:rw:0"): types.VolumeMap{"/dir1": 0}, - types.MustToVolumeBinding("AUTO:/data2:rw:110"): types.VolumeMap{"/dir1": 110}, - types.MustToVolumeBinding("AUTO:/data3:rw:20"): types.VolumeMap{"/dir0": 20}, - }, - }, - } - - c1.VolumeRequest = types.MustToVolumeBindings([]string{"AUTO:/data:rw:0", "AUTO:/data1:rw:100", "AUTO:/data2:rw:0", "AUTO:/data3:rw:600"}) - c1.VolumeLimit = types.MustToVolumeBindings([]string{"AUTO:/data:rw:0", "AUTO:/data1:rw:100", "AUTO:/data2:rw:0", "AUTO:/data3:rw:600"}) - c1.VolumePlanLimit = types.VolumePlan{ - types.MustToVolumeBinding("AUTO:/data:rw:0"): types.VolumeMap{"/dir0": 0}, - types.MustToVolumeBinding("AUTO:/data1:rw:100"): types.VolumeMap{"/dir0": 100}, - types.MustToVolumeBinding("AUTO:/data2:rw:0"): types.VolumeMap{"/dir0": 0}, - types.MustToVolumeBinding("AUTO:/data3:rw:600"): types.VolumeMap{"/dir0": 600}, - } - c1.VolumePlanRequest = types.VolumePlan{ - types.MustToVolumeBinding("AUTO:/data:rw:0"): types.VolumeMap{"/dir0": 0}, - types.MustToVolumeBinding("AUTO:/data1:rw:100"): types.VolumeMap{"/dir0": 100}, - types.MustToVolumeBinding("AUTO:/data2:rw:0"): types.VolumeMap{"/dir0": 0}, - types.MustToVolumeBinding("AUTO:/data3:rw:600"): types.VolumeMap{"/dir0": 600}, - } - - c2 := &types.Container{ - ID: "c2", - Engine: engine, - Podname: "p1", - Nodename: "node1", - VolumeRequest: types.MustToVolumeBindings([]string{"AUTO:/data:rw:0", "AUTO:/data1:rw:100", "AUTO:/data2:rw:0", "AUTO:/data3:rw:600"}), - VolumeLimit: types.MustToVolumeBindings([]string{"AUTO:/data:rw:0", "AUTO:/data1:rw:100", "AUTO:/data2:rw:0", "AUTO:/data3:rw:600"}), - VolumePlanRequest: types.VolumePlan{ - types.MustToVolumeBinding("AUTO:/data:rw:0"): types.VolumeMap{"/dir0": 0}, - types.MustToVolumeBinding("AUTO:/data1:rw:100"): types.VolumeMap{"/dir1": 100}, - types.MustToVolumeBinding("AUTO:/data2:rw:0"): types.VolumeMap{"/dir1": 0}, - types.MustToVolumeBinding("AUTO:/data3:rw:600"): types.VolumeMap{"/dir0": 600}, - }, - VolumePlanLimit: types.VolumePlan{ - types.MustToVolumeBinding("AUTO:/data:rw:0"): types.VolumeMap{"/dir0": 0}, - types.MustToVolumeBinding("AUTO:/data1:rw:100"): types.VolumeMap{"/dir1": 100}, - types.MustToVolumeBinding("AUTO:/data2:rw:0"): types.VolumeMap{"/dir1": 0}, - types.MustToVolumeBinding("AUTO:/data3:rw:600"): types.VolumeMap{"/dir0": 600}, - }, - } - - simpleMockScheduler.On("SelectVolumeNodes", mock.Anything, mock.Anything).Return(nil, newPlans, 3, nil).Once() - - err = c.ReallocResource(ctx, newReallocOptions("c1", 0, 0, newVbs, types.TriKeep, types.TriKeep)) - assert.Nil(t, err) - assert.EqualValues(t, 0, c1.VolumePlanRequest[types.MustToVolumeBinding("AUTO:/data:rw:0")]["/dir0"]) - assert.EqualValues(t, 0, c1.VolumePlanRequest[types.MustToVolumeBinding("AUTO:/data1:rw:0")]["/dir0"]) - assert.EqualValues(t, 110, c1.VolumePlanRequest[types.MustToVolumeBinding("AUTO:/data2:rw:110")]["/dir0"]) - assert.EqualValues(t, 20, c1.VolumePlanRequest[types.MustToVolumeBinding("AUTO:/data3:rw:20")]["/dir0"]) - assert.EqualValues(t, 0, c2.VolumePlanRequest[types.MustToVolumeBinding("AUTO:/data:rw:0")]["/dir0"]) - assert.EqualValues(t, 0, c2.VolumePlanRequest[types.MustToVolumeBinding("AUTO:/data1:rw:0")]["/dir1"]) - assert.EqualValues(t, 110, c2.VolumePlanRequest[types.MustToVolumeBinding("AUTO:/data2:rw:110")]["/dir1"]) - assert.EqualValues(t, 20, c2.VolumePlanRequest[types.MustToVolumeBinding("AUTO:/data3:rw:20")]["/dir0"]) -} -*/ -/* func TestReallocBindCpu(t *testing.T) { c := NewTestCluster() c.config.Scheduler.ShareBase = 100 @@ -556,18 +360,15 @@ func TestReallocBindCpu(t *testing.T) { err = c.ReallocResource(ctx, newReallocOptions("c6", 0.1, 2*int64(units.MiB), nil, types.TriTrue, types.TriKeep)) assert.NoError(t, err) - assert.NotEmpty(t, c6.CPURequest) + assert.NotEmpty(t, c6.ResourceMeta.CPU) node3.CPU = types.CPUMap{"0": 10, "1": 70, "2": 100, "3": 100} err = c.ReallocResource(ctx, newReallocOptions("c5", -0.1, 2*int64(units.MiB), nil, types.TriTrue, types.TriKeep)) assert.NoError(t, err) - assert.NotEmpty(t, c6.CPURequest) - assert.NotEmpty(t, c5.CPURequest) + assert.NotEmpty(t, c5.ResourceMeta.CPU) err = c.ReallocResource(ctx, newReallocOptions("c6", -0.1, 2*int64(units.MiB), nil, types.TriFalse, types.TriKeep)) assert.NoError(t, err) - assert.Equal(t, 0, len(c5.CPURequest)) - assert.Equal(t, 0, len(c6.CPURequest)) + assert.Equal(t, 0, len(c6.ResourceMeta.CPU)) } -*/ diff --git a/resources/cpumem/cpumem.go b/resources/cpumem/cpumem.go index 85001ef19..b8d59c1cb 100644 --- a/resources/cpumem/cpumem.go +++ b/resources/cpumem/cpumem.go @@ -165,15 +165,5 @@ func (rp ResourcePlans) Dispense(opts resourcetypes.DispenseOptions, r *types.Re r.CPU = rp.CPUPlans[opts.Node.Name][opts.Index] r.NUMANode = opts.Node.GetNUMANode(r.CPU) } - - /* TODO@zc: check this - // special handle when converting from cpu-binding to cpu-unbinding - if len(opts.ExistingInstances) > opts.Index && len(opts.ExistingInstances[opts.Index].CPU) > 0 && len(rp.CPUPlans) == 0 { - r.CPU = types.CPUMap{} - for i := 0; i < len(opts.Node.InitCPU); i++ { - r.CPU[strconv.Itoa(i)] = 0 - } - } - */ return r, nil } diff --git a/resources/scheduler.go b/resources/scheduler.go index b3632fd55..07821e05f 100644 --- a/resources/scheduler.go +++ b/resources/scheduler.go @@ -51,5 +51,5 @@ func SelectNodesByResourceRequests(resourceRequests resourcetypes.ResourceReques if scheduleType == 0 { scheduleType = types.ResourceMemory } - return + return // nolint:nakedret } diff --git a/resources/types/types.go b/resources/types/types.go index 30934bc23..0827ea8d3 100644 --- a/resources/types/types.go +++ b/resources/types/types.go @@ -23,9 +23,8 @@ type SchedulerV2 func([]types.NodeInfo) (ResourcePlans, int, error) // DispenseOptions . type DispenseOptions struct { *types.Node - ExistingInstances []*types.Container - Index int - HardVolumeBindings types.VolumeBindings + Index int + ExistingInstance *types.Container } // ResourcePlans . diff --git a/resources/volume/volume.go b/resources/volume/volume.go index d44b0ff39..ba7790242 100644 --- a/resources/volume/volume.go +++ b/resources/volume/volume.go @@ -67,7 +67,7 @@ func (v *volumeRequest) Validate() error { return errors.Wrap(types.ErrBadVolume, "request and limit not match") } if req.SizeInBytes > 0 && lim.SizeInBytes > 0 && req.SizeInBytes > lim.SizeInBytes { - return errors.Wrap(types.ErrBadVolume, "request size less than limit size ") + v.limit[i].SizeInBytes = req.SizeInBytes } } return nil @@ -162,26 +162,19 @@ func (rp ResourcePlans) Dispense(opts resourcetypes.DispenseOptions, r *types.Re r.VolumePlanRequest = rp.plan[opts.Node.Name][opts.Index] // if there are existing ones, ensure new volumes are compatible - if len(opts.ExistingInstances) > 0 { - plans := map[*types.Container]types.VolumePlan{} - Searching: + if opts.ExistingInstance != nil { + found := false for _, plan := range rp.plan[opts.Node.Name] { - for _, container := range opts.ExistingInstances { - if _, ok := plans[container]; !ok && plan.Compatible(container.VolumePlanRequest) { - plans[container] = plan - if len(plans) == len(opts.ExistingInstances) { - break Searching - } - break - } + if plan.Compatible(opts.ExistingInstance.VolumePlanRequest) { + r.VolumePlanRequest = plan + found = true + break } } - if len(plans) < len(opts.ExistingInstances) { + if !found { return nil, errors.Wrap(types.ErrInsufficientVolume, "incompatible volume plans") } - - r.VolumePlanRequest = plans[opts.ExistingInstances[opts.Index]] } // fix plans while limit > request @@ -200,17 +193,12 @@ func (rp ResourcePlans) Dispense(opts resourcetypes.DispenseOptions, r *types.Re } } - // append hard vbs - if opts.HardVolumeBindings != nil { - r.VolumeRequest = append(r.VolumeRequest, opts.HardVolumeBindings...) - r.VolumeLimit = append(r.VolumeLimit, opts.HardVolumeBindings...) - } - // judge if volume changed - r.VolumeChanged = len(opts.ExistingInstances) > 0 && !r.VolumeLimit.IsEqual(opts.ExistingInstances[opts.Index].VolumeLimit) + r.VolumeChanged = opts.ExistingInstance != nil && !r.VolumeLimit.IsEqual(opts.ExistingInstance.VolumeLimit) return r, nil } +// GetPlan return volume plans by nodename func (rp ResourcePlans) GetPlan(nodename string) []types.VolumePlan { return rp.plan[nodename] } diff --git a/rpc/rpc.go b/rpc/rpc.go index 87d45e066..a1f977bbe 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -653,10 +653,11 @@ func (v *Vibranium) ExecuteContainer(stream pb.CoreRPC_ExecuteContainerServer) ( // ReallocResource realloc res for containers func (v *Vibranium) ReallocResource(ctx context.Context, opts *pb.ReallocOptions) (msg *pb.ReallocResourceMessage, err error) { defer func() { + errString := "" if err != nil { - msg = &pb.ReallocResourceMessage{Error: err.Error()} - err = nil + errString = err.Error() } + msg = &pb.ReallocResourceMessage{Error: errString} }() v.taskAdd("ReallocResource", true) diff --git a/types/node_test.go b/types/node_test.go index 79fe81b65..f7bb27912 100644 --- a/types/node_test.go +++ b/types/node_test.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "math" + "reflect" "testing" enginemocks "github.com/projecteru2/core/engine/mocks" @@ -202,3 +203,34 @@ func TestAddNodeOptions(t *testing.T) { o.Normalize() assert.EqualValues(t, 3, o.Storage) } + +func TestNodeWithResource(t *testing.T) { + n := Node{ + CPU: CPUMap{"0": 0}, + Volume: VolumeMap{"sda1": 0}, + } + resource := &ResourceMeta{ + CPUQuotaLimit: 0.4, + CPUQuotaRequest: 0.3, + CPU: CPUMap{"0": 30}, + MemoryLimit: 100, + MemoryRequest: 99, + StorageLimit: 88, + StorageRequest: 87, + VolumePlanLimit: MustToVolumePlan(map[string]map[string]int64{"AUTO:/data0:rw:100": {"/sda0": 100}}), + VolumePlanRequest: MustToVolumePlan(map[string]map[string]int64{"AUTO:/data1:rw:101": {"sda1": 101}}), + } + n.RecycleResources(resource) + assert.EqualValues(t, -0.3, n.CPUUsed) + assert.True(t, reflect.DeepEqual(n.CPU, CPUMap{"0": 30})) + assert.EqualValues(t, 99, n.MemCap) + assert.EqualValues(t, 87, n.StorageCap) + assert.EqualValues(t, -101, n.VolumeUsed) + + n.PreserveResources(resource) + assert.EqualValues(t, 0, n.CPUUsed) + assert.True(t, reflect.DeepEqual(n.CPU, CPUMap{"0": 0})) + assert.EqualValues(t, 0, n.MemCap) + assert.EqualValues(t, 0, n.StorageCap) + assert.EqualValues(t, 0, n.VolumeUsed) +} diff --git a/types/stream_test.go b/types/stream_test.go index 41a3210a9..2135d42b6 100644 --- a/types/stream_test.go +++ b/types/stream_test.go @@ -1,7 +1,19 @@ package types -import "testing" +import ( + "io/ioutil" + "strings" + "testing" + + "github.com/stretchr/testify/assert" +) func TestGetReader(t *testing.T) { - // TODO + reader := strings.NewReader("aaa") + rm, err := NewReaderManager(reader) + assert.Nil(t, err) + reader2, err := rm.GetReader() + assert.Nil(t, err) + bs, err := ioutil.ReadAll(reader2) + assert.Equal(t, "aaa", string(bs)) }