Skip to content

Commit

Permalink
test and fix transaction during creation (#215)
Browse files Browse the repository at this point in the history
  • Loading branch information
zc authored and CMGS committed Jun 14, 2020
1 parent 64d3c5d commit 33c735d
Show file tree
Hide file tree
Showing 5 changed files with 311 additions and 10 deletions.
270 changes: 270 additions & 0 deletions cluster/calcium/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ import (
"context"
"testing"

"github.com/pkg/errors"
enginemocks "github.com/projecteru2/core/engine/mocks"
enginetypes "github.com/projecteru2/core/engine/types"
lockmocks "github.com/projecteru2/core/lock/mocks"
schedulermocks "github.com/projecteru2/core/scheduler/mocks"
st "github.com/projecteru2/core/store"
storemocks "github.com/projecteru2/core/store/mocks"
"github.com/projecteru2/core/types"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -38,3 +44,267 @@ func TestCreateContainer(t *testing.T) {
_, err = c.CreateContainer(ctx, opts)
assert.Error(t, err)
}

// TestCreateContainerTxn drives CreateContainer through every failure point of
// its create transaction and asserts two invariants after each one: the root
// cause (context.DeadlineExceeded) is still reachable via errors.Is through the
// wrap chain, and the rollback restored node resources (CPUUsed back to 0 on
// both nodes).
//
// Mock expectations accumulate across the sub-scenarios: each scenario layers a
// new expectation (scoped with Once()/Twice()/Times(3)) on top of the earlier
// open-ended ones, so the order of the scenarios below is significant.
func TestCreateContainerTxn(t *testing.T) {
	c := NewTestCluster()
	ctx := context.Background()
	// Count=2 with "auto" deploy spreads one container onto each of the two
	// nodes below (CommonDivision mock sets Deploy=1 per node).
	opts := &types.DeployOptions{
		Count:        2,
		DeployMethod: "auto",
		CPUQuota:     1,
		Image:        "zc:test",
		Entrypoint:   &types.Entrypoint{},
	}
	store := &storemocks.Store{}
	scheduler := &schedulermocks.Scheduler{}
	c.store = store
	c.scheduler = scheduler
	// One engine mock shared by both nodes: engine-level expectations
	// (ImagePull, VirtualizationCreate, ...) apply to either node.
	engine := &enginemocks.API{}

	pod1 := &types.Pod{Name: "p1"}
	node1 := &types.Node{
		Name:   "n1",
		Engine: engine,
	}
	node2 := &types.Node{
		Name:   "n2",
		Engine: engine,
	}
	nodes := []*types.Node{node1, node2}

	// GetPod fails: the very first store call errors, before any resource is
	// touched, so nothing needs rolling back.
	// NOTE(review): in `assert.Error(t, err, "GetPod")` the trailing string is
	// testify's msgAndArgs (a message printed on failure), NOT a check of the
	// error text; assert.Contains(t, err.Error(), "GetPod") would actually
	// verify the wrap message. The same pattern recurs throughout this test.
	store.On("GetPod", mock.Anything, mock.Anything).Return(
		nil,
		errors.Wrap(context.DeadlineExceeded, "GetPod"),
	).Once()
	_, err := c.CreateContainer(ctx, opts)
	assert.Error(t, err)
	assert.True(t, errors.Is(err, context.DeadlineExceeded))
	assert.Error(t, err, "GetPod")

	// doAllocResource fails: MakeDeployStatus
	// Baseline happy-path mocks for locking and node lookup; these stay
	// registered (no Once()) and back all later scenarios too.
	lock := &lockmocks.DistributedLock{}
	lock.On("Lock", mock.Anything).Return(nil)
	lock.On("Unlock", mock.Anything).Return(nil)
	store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)
	store.On("GetPod", mock.Anything, mock.Anything).Return(pod1, nil)
	store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nodes, nil)
	// NOTE(review): matching on "*context.emptyCtx" couples the test to an
	// unexported stdlib context implementation type — fragile across Go
	// versions; confirm it still matches before bumping the toolchain.
	store.On("GetNode",
		mock.AnythingOfType("*context.emptyCtx"),
		mock.AnythingOfType("string"),
	).Return(
		func(_ context.Context, name string) (node *types.Node) {
			node = node1
			if name == "n2" {
				node = node2
			}
			return
		}, nil)
	store.On("MakeDeployStatus", mock.Anything, mock.Anything, mock.Anything).Return(
		nil,
		errors.Wrap(context.DeadlineExceeded, "MakeDeployStatus"),
	).Once()
	_, err = c.CreateContainer(ctx, opts)
	assert.Error(t, err)
	assert.True(t, errors.Is(err, context.DeadlineExceeded))
	assert.Error(t, err, "MakeDeployStatus")

	// doAllocResource fails: UpdateNodeResource for 1st node
	// Scheduler mocks pass the node list through unchanged and let
	// CommonDivision assign Deploy=1 to every node.
	store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nodes, nil)
	store.On("MakeDeployStatus", ctx, opts, mock.AnythingOfType("[]types.NodeInfo")).Return(
		func(_ context.Context, _ *types.DeployOptions, nodesInfo []types.NodeInfo) []types.NodeInfo {
			return nodesInfo
		}, nil)
	scheduler.On("SelectMemoryNodes", mock.AnythingOfType("[]types.NodeInfo"), mock.AnythingOfType("float64"), mock.AnythingOfType("int64")).Return(
		func(nodesInfo []types.NodeInfo, _ float64, _ int64) []types.NodeInfo {
			return nodesInfo
		}, len(nodes), nil)
	scheduler.On("SelectStorageNodes", mock.AnythingOfType("[]types.NodeInfo"), mock.AnythingOfType("int64")).Return(
		func(nodesInfo []types.NodeInfo, _ int64) []types.NodeInfo {
			return nodesInfo
		},
		len(nodes), nil,
	)
	scheduler.On("SelectVolumeNodes", mock.AnythingOfType("[]types.NodeInfo"), mock.AnythingOfType("types.VolumeBindings")).Return(
		func(nodesInfo []types.NodeInfo, _ types.VolumeBindings) []types.NodeInfo {
			return nodesInfo
		},
		nil, len(nodes), nil,
	)
	scheduler.On("CommonDivision", mock.AnythingOfType("[]types.NodeInfo"), mock.AnythingOfType("int"), mock.AnythingOfType("int"), mock.AnythingOfType("types.ResourceType")).Return(
		func(nodesInfo []types.NodeInfo, _, _ int, _ types.ResourceType) []types.NodeInfo {
			for i := range nodesInfo {
				nodesInfo[i].Deploy = 1
			}
			return nodesInfo
		},
		nil,
	)
	// The UpdateNodeResource stub tracks allocation via node.CPUUsed:
	// ActionDecr (consume node resource) adds quota, ActionIncr (rollback /
	// return resource) subtracts it — so CPUUsed==0 means fully rolled back.
	// Here the very first Decr fails, so nothing was allocated at all.
	store.On("UpdateNodeResource",
		mock.AnythingOfType("*context.timerCtx"),
		mock.AnythingOfType("*types.Node"),
		mock.AnythingOfType("types.ResourceMap"),
		mock.AnythingOfType("float64"),
		mock.AnythingOfType("int64"),
		mock.AnythingOfType("int64"),
		mock.AnythingOfType("types.ResourceMap"),
		mock.AnythingOfType("string")).Return(
		func(ctx context.Context, node *types.Node, _ types.CPUMap, quota float64, _, _ int64, _ types.VolumeMap, action string) error {
			if action == st.ActionDecr {
				return errors.Wrap(context.DeadlineExceeded, "UpdateNodeResource")
			}
			if action == st.ActionIncr {
				quota = -quota
			}
			node.CPUUsed += quota
			return nil
		},
	).Once()
	_, err = c.CreateContainer(ctx, opts)
	assert.Error(t, err)
	assert.True(t, errors.Is(err, context.DeadlineExceeded))
	assert.Error(t, err, "UpdateNodeResource")
	assert.EqualValues(t, 0, node1.CPUUsed)
	assert.EqualValues(t, 0, node2.CPUUsed)

	// doAllocResource fails: UpdateNodeResource for 2nd node
	// Times(3) = Decr n1 (ok) + Decr n2 (fails on cnt==2) + Incr n1 (rollback
	// of the one successful allocation); cnt==2 afterwards confirms exactly
	// two Decr calls happened.
	cnt := 0
	store.On("UpdateNodeResource",
		mock.AnythingOfType("*context.timerCtx"),
		mock.AnythingOfType("*types.Node"),
		mock.AnythingOfType("types.ResourceMap"),
		mock.AnythingOfType("float64"),
		mock.AnythingOfType("int64"),
		mock.AnythingOfType("int64"),
		mock.AnythingOfType("types.ResourceMap"),
		mock.AnythingOfType("string")).Return(
		func(ctx context.Context, node *types.Node, _ types.CPUMap, quota float64, _, _ int64, _ types.VolumeMap, action string) error {
			if action == st.ActionDecr {
				cnt++
				if cnt == 2 {
					return errors.Wrap(context.DeadlineExceeded, "UpdateNodeResource2")
				}
			}
			if action == st.ActionIncr {
				quota = -quota
			}
			node.CPUUsed += quota
			return nil
		},
	).Times(3)
	_, err = c.CreateContainer(ctx, opts)
	assert.Error(t, err)
	assert.True(t, errors.Is(err, context.DeadlineExceeded))
	assert.Error(t, err, "UpdateNodeResource2")
	assert.EqualValues(t, 0, node1.CPUUsed)
	assert.EqualValues(t, 0, node2.CPUUsed)
	assert.EqualValues(t, 2, cnt)

	// doAllocResource fails: SaveProcessing
	// From here on UpdateNodeResource always succeeds (open-ended mock), so
	// rollbacks must bring CPUUsed back to 0 on their own.
	store.On("UpdateNodeResource",
		mock.AnythingOfType("*context.timerCtx"),
		mock.AnythingOfType("*types.Node"),
		mock.AnythingOfType("types.ResourceMap"),
		mock.AnythingOfType("float64"),
		mock.AnythingOfType("int64"),
		mock.AnythingOfType("int64"),
		mock.AnythingOfType("types.ResourceMap"),
		mock.AnythingOfType("string")).Return(
		func(ctx context.Context, node *types.Node, _ types.CPUMap, quota float64, _, _ int64, _ types.VolumeMap, action string) error {
			if action == st.ActionIncr {
				quota = -quota
			}
			node.CPUUsed += quota
			return nil
		},
	)
	store.On("SaveProcessing", mock.Anything, mock.Anything, mock.Anything).Return(errors.Wrap(context.DeadlineExceeded, "SaveProcessing")).Once()
	_, err = c.CreateContainer(ctx, opts)
	assert.Error(t, err)
	assert.True(t, errors.Is(err, context.DeadlineExceeded))
	assert.Error(t, err, "SaveProcessing")
	assert.EqualValues(t, 0, node1.CPUUsed)
	assert.EqualValues(t, 0, node2.CPUUsed)

	// doCreateContainerOnNode fails: doGetAndPrepareNode
	// Extra GetNode registrations so lookups made under derived contexts
	// (timer/cancel) also resolve; both fall back to the same name→node map.
	store.On("GetNode",
		mock.AnythingOfType("*context.timerCtx"),
		mock.AnythingOfType("string"),
	).Return(
		func(_ context.Context, name string) (node *types.Node) {
			node = node1
			if name == "n2" {
				node = node2
			}
			return
		}, nil)
	store.On("GetNode",
		mock.AnythingOfType("*context.cancelCtx"),
		mock.AnythingOfType("string"),
	).Return(
		func(_ context.Context, name string) (node *types.Node) {
			node = node1
			if name == "n2" {
				node = node2
			}
			return
		}, nil)
	store.On("SaveProcessing", mock.Anything, mock.Anything, mock.Anything).Return(nil)
	// Both the local-digest check and the fallback pull fail (Twice = once per
	// node); errors now surface per-container on the returned channel instead
	// of failing CreateContainer itself.
	// NOTE(review): mock message says "ImageLocalDigest" but the assertion
	// message below says "ImagePull" — harmless today because the trailing
	// string is not actually checked (see note at the top of the test).
	engine.On("ImageLocalDigests", mock.Anything, mock.Anything).Return(nil, errors.Wrap(context.DeadlineExceeded, "ImageLocalDigest")).Twice()
	engine.On("ImagePull", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.Wrap(context.DeadlineExceeded, "ImagePull")).Twice()
	store.On("UpdateProcessing", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
	store.On("DeleteProcessing", mock.Anything, mock.Anything, mock.Anything).Return(nil)
	ch, err := c.CreateContainer(ctx, opts)
	assert.Nil(t, err)
	for m := range ch {
		assert.Error(t, m.Error)
		assert.True(t, errors.Is(m.Error, context.DeadlineExceeded))
		assert.Error(t, m.Error, "ImagePull")
	}
	assert.EqualValues(t, 0, node1.CPUUsed)
	assert.EqualValues(t, 0, node2.CPUUsed)

	// doCreateAndStartContainer fails: VirtualizationCreate
	// Image is now "present" (digests match), so creation reaches the engine
	// and fails there once per node.
	engine.On("ImageLocalDigests", mock.Anything, mock.Anything).Return([]string{""}, nil)
	engine.On("ImageRemoteDigest", mock.Anything, mock.Anything).Return("", nil)
	engine.On("VirtualizationCreate", mock.Anything, mock.Anything).Return(nil, errors.Wrap(context.DeadlineExceeded, "VirtualizationCreate")).Twice()
	ch, err = c.CreateContainer(ctx, opts)
	assert.Nil(t, err)
	for m := range ch {
		assert.Error(t, m.Error)
		assert.True(t, errors.Is(m.Error, context.DeadlineExceeded))
		assert.Error(t, m.Error, "VirtualizationCreate")
	}
	assert.EqualValues(t, 0, node1.CPUUsed)
	assert.EqualValues(t, 0, node2.CPUUsed)

	// doCreateAndStartContainer fails: AddContainer
	// Engine-side creation now succeeds; persisting the container fails, so
	// the rollback removes the just-created virtualization and store record.
	engine.On("VirtualizationCreate", mock.Anything, mock.Anything).Return(&enginetypes.VirtualizationCreated{ID: "c1"}, nil)
	engine.On("VirtualizationStart", mock.Anything, mock.Anything).Return(nil)
	engine.On("VirtualizationInspect", mock.Anything, mock.Anything).Return(&enginetypes.VirtualizationInfo{}, nil)
	store.On("AddContainer", mock.Anything, mock.Anything).Return(errors.Wrap(context.DeadlineExceeded, "AddContainer")).Twice()
	engine.On("VirtualizationRemove", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
	store.On("RemoveContainer", mock.Anything, mock.Anything).Return(nil).Twice()
	ch, err = c.CreateContainer(ctx, opts)
	assert.Nil(t, err)
	for m := range ch {
		assert.Error(t, m.Error)
		assert.True(t, errors.Is(m.Error, context.DeadlineExceeded))
		assert.Error(t, m.Error, "AddContainer")
	}
	assert.EqualValues(t, 0, node1.CPUUsed)
	assert.EqualValues(t, 0, node2.CPUUsed)

	// doCreateAndStartContainer fails: RemoveContainer
	// Even the rollback's RemoveContainer fails; resource accounting on the
	// nodes must still be restored to zero.
	store.On("AddContainer", mock.Anything, mock.Anything).Return(errors.Wrap(context.DeadlineExceeded, "AddContainer")).Twice()
	store.On("RemoveContainer", mock.Anything, mock.Anything).Return(errors.Wrap(context.DeadlineExceeded, "RemoveContainer")).Twice()
	ch, err = c.CreateContainer(ctx, opts)
	assert.Nil(t, err)
	for m := range ch {
		assert.Error(t, m.Error)
		assert.True(t, errors.Is(m.Error, context.DeadlineExceeded))
		assert.Error(t, m.Error, "AddContainer")
	}
	assert.EqualValues(t, 0, node1.CPUUsed)
	assert.EqualValues(t, 0, node2.CPUUsed)
}
12 changes: 4 additions & 8 deletions cluster/calcium/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (c *Calcium) doAllocResource(ctx context.Context, opts *types.DeployOptions
var nodesInfo []types.NodeInfo
var nodeCPUPlans map[string][]types.CPUMap
var nodeVolumePlans map[string][]types.VolumePlan
if err = c.withNodesLocked(ctx, opts.Podname, opts.Nodename, opts.NodeLabels, false, func(nodes map[string]*types.Node) error {
return nodesInfo, c.withNodesLocked(ctx, opts.Podname, opts.Nodename, opts.NodeLabels, false, func(nodes map[string]*types.Node) error {
if len(nodes) == 0 {
return types.ErrInsufficientNodes
}
Expand Down Expand Up @@ -196,7 +196,7 @@ func (c *Calcium) doAllocResource(ctx context.Context, opts *types.DeployOptions
return types.ErrInsufficientRes
}
nodesInfo = nodesInfo[p:]
var track int
track := -1
return utils.Txn(
ctx,
func(ctx context.Context) error {
Expand All @@ -223,7 +223,7 @@ func (c *Calcium) doAllocResource(ctx context.Context, opts *types.DeployOptions
log.Infof("[allocResource] deploy %d to %s", nodeInfo.Deploy, nodeInfo.Name)
}
}()
return nil
return c.doBindProcessStatus(ctx, opts, nodesInfo)
},
func(ctx context.Context) error {
for i := 0; i < track+1; i++ {
Expand All @@ -238,11 +238,7 @@ func (c *Calcium) doAllocResource(ctx context.Context, opts *types.DeployOptions
},
c.config.GlobalTimeout,
)
}); err != nil {
return nil, err
}

return nodesInfo, c.doBindProcessStatus(ctx, opts, nodesInfo)
})
}

func (c *Calcium) doBindProcessStatus(ctx context.Context, opts *types.DeployOptions, nodesInfo []types.NodeInfo) error {
Expand Down
2 changes: 1 addition & 1 deletion cluster/calcium/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ func testAllocFailedAsUpdateNodeResourceError(t *testing.T, c *Calcium, opts *ty
store.On("UpdateNodeResource",
mock.Anything, mock.Anything, mock.Anything, mock.Anything,
mock.Anything, mock.Anything, mock.Anything, mock.Anything,
).Return(types.ErrNoETCD).Twice()
).Return(types.ErrNoETCD).Once()
_, err := c.doAllocResource(context.Background(), opts)
assert.Error(t, err)
}
Expand Down
8 changes: 7 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,24 @@ require (
github.com/sanity-io/litter v1.1.0
github.com/sirupsen/logrus v1.4.2
github.com/soheilhy/cmux v0.1.3 // indirect
github.com/stretchr/testify v1.4.0
github.com/stretchr/objx v0.2.0 // indirect
github.com/stretchr/testify v1.6.1
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8 // indirect
github.com/urfave/cli/v2 v2.0.0-alpha.2
github.com/vektra/mockery v1.1.2 // indirect
github.com/xiang90/probing v0.0.0-20160813154853-07dd2e8dfe18 // indirect
go.uber.org/atomic v1.3.2 // indirect
go.uber.org/automaxprocs v1.3.0
go.uber.org/multierr v1.1.0 // indirect
go.uber.org/zap v1.7.1 // indirect
golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d
golang.org/x/mod v0.3.0 // indirect
golang.org/x/net v0.0.0-20200319234117-63522dbf7eec
golang.org/x/time v0.0.0-20191024005414-555d28b269f0 // indirect
golang.org/x/tools v0.0.0-20200612022331-742c5eb664c2 // indirect
google.golang.org/grpc v1.28.0
gopkg.in/alexcesaro/statsd.v2 v2.0.0 // indirect
gopkg.in/yaml.v2 v2.3.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c // indirect
gotest.tools v2.2.0+incompatible // indirect
)
Loading

0 comments on commit 33c735d

Please sign in to comment.