Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Overhaul wal for create workload #534

Merged
merged 6 commits into from
Jan 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 42 additions & 10 deletions cluster/calcium/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,36 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio
defer func() {
cctx, cancel := context.WithTimeout(utils.InheritTracingInfo(ctx, context.TODO()), c.config.GlobalTimeout)
for nodename := range deployMap {
if e := c.store.DeleteProcessing(cctx, opts.GetProcessing(nodename)); e != nil {
logger.Errorf(ctx, "[Calcium.doCreateWorkloads] delete processing failed for %s: %+v", nodename, e)
processing := opts.GetProcessing(nodename)
if err := c.store.DeleteProcessing(cctx, processing); err != nil {
logger.Errorf(ctx, "[Calcium.doCreateWorkloads] delete processing failed for %s: %+v", nodename, err)
}
}
close(ch)
cancel()
}()

var resourceCommit wal.Commit
defer func() {
if resourceCommit != nil {
if err := resourceCommit(); err != nil {
logger.Errorf(ctx, "commit wal failed: %s, %+v", eventWorkloadResourceAllocated, err)
}
}
}()

var processingCommits map[string]wal.Commit
defer func() {
for nodename := range processingCommits {
if processingCommits[nodename] == nil {
continue
}
if err := processingCommits[nodename](); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we make sure the processingCommits[nodename]is callable or nil while c.wal.Log returns an error?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

加了一个判断

logger.Errorf(ctx, "commit wal failed: %s, %s, %+v", eventProcessingCreated, nodename, err)
}
}
}()

_ = utils.Txn(
ctx,

Expand All @@ -81,15 +103,23 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio

// commit changes
nodes := []*types.Node{}
processingCommits = make(map[string]wal.Commit)
for nodename, deploy := range deployMap {
for _, plan := range plans {
plan.ApplyChangesOnNode(nodeMap[nodename], utils.Range(deploy)...)
}
nodes = append(nodes, nodeMap[nodename])
if err = c.store.CreateProcessing(ctx, opts.GetProcessing(nodename), deploy); err != nil {
processing := opts.GetProcessing(nodename)
if processingCommits[nodename], err = c.wal.Log(eventProcessingCreated, processing); err != nil {
return errors.WithStack(err)
}
if err = c.store.CreateProcessing(ctx, processing, deploy); err != nil {
return errors.WithStack(err)
}
}
if resourceCommit, err = c.wal.Log(eventWorkloadResourceAllocated, nodes); err != nil {
return errors.WithStack(err)
}
return errors.WithStack(c.store.UpdateNodes(ctx, nodes...))
})
},
Expand Down Expand Up @@ -246,6 +276,7 @@ func (c *Calcium) doDeployOneWorkload(
config *enginetypes.VirtualizationCreateOptions,
decrProcessing bool,
) (err error) {
logger := log.WithField("Calcium", "doDeployWorkload").WithField("nodename", node.Name).WithField("opts", opts).WithField("msg", msg)
workload := &types.Workload{
ResourceMeta: types.ResourceMeta{
CPU: msg.CPU,
Expand Down Expand Up @@ -276,7 +307,7 @@ func (c *Calcium) doDeployOneWorkload(
defer func() {
if commit != nil {
if err := commit(); err != nil {
log.Errorf(ctx, "[doDeployOneWorkload] Commit WAL %s failed: %v", eventCreateWorkload, err)
logger.Errorf(ctx, "Commit WAL %s failed: %+v", eventWorkloadCreated, err)
}
}
}()
Expand All @@ -292,10 +323,11 @@ func (c *Calcium) doDeployOneWorkload(
// We couldn't WAL the workload ID above VirtualizationCreate temporarily,
// so there's a time gap window, once the core process crashes between
// VirtualizationCreate and logCreateWorkload then the worload is leaky.
if commit, err = c.wal.logCreateWorkload(workload.ID, node.Name); err != nil {
return err
}
return nil
commit, err = c.wal.Log(eventWorkloadCreated, &types.Workload{
ID: workload.ID,
Nodename: workload.Nodename,
})
return errors.WithStack(err)
},

func(ctx context.Context) (err error) {
Expand Down Expand Up @@ -375,13 +407,13 @@ func (c *Calcium) doDeployOneWorkload(

// remove workload
func(ctx context.Context, _ bool) error {
log.Errorf(ctx, "[doDeployOneWorkload] failed to deploy workload %s, rollback", workload.ID)
logger.Errorf(ctx, "[doDeployOneWorkload] failed to deploy workload %s, rollback", workload.ID)
if workload.ID == "" {
return nil
}

if err := c.store.RemoveWorkload(ctx, workload); err != nil {
log.Errorf(ctx, "[doDeployOneWorkload] failed to remove workload %s")
logger.Errorf(ctx, "[doDeployOneWorkload] failed to remove workload %s", workload.ID)
}

return workload.Remove(ctx, true)
Expand Down
20 changes: 9 additions & 11 deletions cluster/calcium/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"testing"

"github.com/pkg/errors"
enginemocks "github.com/projecteru2/core/engine/mocks"
enginetypes "github.com/projecteru2/core/engine/types"
lockmocks "github.com/projecteru2/core/lock/mocks"
Expand All @@ -15,8 +16,6 @@ import (
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/wal"
walmocks "github.com/projecteru2/core/wal/mocks"

"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
Expand Down Expand Up @@ -107,8 +106,12 @@ func TestCreateWorkloadTxn(t *testing.T) {

c.wal = &WAL{WAL: &walmocks.WAL{}}
mwal := c.wal.WAL.(*walmocks.WAL)
defer mwal.AssertExpectations(t)
var walCommitted bool
commit := wal.Commit(func() error {
walCommitted = true
return nil
})
mwal.On("Log", mock.Anything, mock.Anything).Return(commit, nil)

store.On("CreateProcessing", mock.Anything, mock.Anything, mock.Anything).Return(nil)
store.On("DeleteProcessing", mock.Anything, mock.Anything, mock.Anything).Return(nil)
Expand Down Expand Up @@ -191,7 +194,7 @@ func TestCreateWorkloadTxn(t *testing.T) {
assert.EqualValues(t, 1, node2.CPUUsed)
node1.CPUUsed = 0
node2.CPUUsed = 0
assert.False(t, walCommitted)
assert.True(t, walCommitted)

// doCreateWorkloadOnNode fails: doGetAndPrepareNode
store.On("UpdateNodes", mock.Anything, mock.Anything, mock.Anything).Return(nil)
Expand Down Expand Up @@ -224,7 +227,7 @@ func TestCreateWorkloadTxn(t *testing.T) {
assert.EqualValues(t, 2, cnt)
assert.EqualValues(t, 0, node1.CPUUsed)
assert.EqualValues(t, 0, node2.CPUUsed)
assert.False(t, walCommitted)
assert.True(t, walCommitted)

// doDeployOneWorkload fails: VirtualizationCreate
engine.On("ImageLocalDigests", mock.Anything, mock.Anything).Return([]string{""}, nil)
Expand All @@ -245,18 +248,13 @@ func TestCreateWorkloadTxn(t *testing.T) {
assert.EqualValues(t, 2, cnt)
assert.EqualValues(t, 0, node1.CPUUsed)
assert.EqualValues(t, 0, node2.CPUUsed)
assert.False(t, walCommitted)
assert.True(t, walCommitted)

// doCreateAndStartWorkload fails: AddWorkload
engine.On("VirtualizationCreate", mock.Anything, mock.Anything).Return(&enginetypes.VirtualizationCreated{ID: "c1"}, nil)
engine.On("VirtualizationStart", mock.Anything, mock.Anything).Return(nil)
engine.On("VirtualizationInspect", mock.Anything, mock.Anything).Return(&enginetypes.VirtualizationInfo{}, nil)
store.On("AddWorkload", mock.Anything, mock.Anything, mock.Anything).Return(errors.Wrap(context.DeadlineExceeded, "AddWorkload")).Twice()
commit := wal.Commit(func() error {
walCommitted = true
return nil
})
mwal.On("Log", eventCreateWorkload, mock.Anything).Return(commit, nil)
walCommitted = false
ch, err = c.CreateWorkload(ctx, opts)
assert.Nil(t, err)
Expand Down
3 changes: 2 additions & 1 deletion cluster/calcium/lambda_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ func TestRunAndWaitFailedThenWALCommitted(t *testing.T) {
c.wal = &WAL{WAL: &walmocks.WAL{}}

mwal := c.wal.WAL.(*walmocks.WAL)
defer mwal.AssertExpectations(t)
defer mwal.AssertNotCalled(t, "Log")
mwal.On("Log", mock.Anything, mock.Anything).Return(nil, nil)

opts := &types.DeployOptions{
Name: "zc:name",
Expand Down
3 changes: 0 additions & 3 deletions cluster/calcium/replace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,6 @@ func TestReplaceWorkload(t *testing.T) {
// failed by VirtualizationCreate
engine.On("VirtualizationCreate", mock.Anything, mock.Anything).Return(nil, types.ErrCannotGetEngine).Once()
engine.On("VirtualizationStart", mock.Anything, mock.Anything).Return(types.ErrCannotGetEngine).Once()
//store.On("UpdateNodeResource", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
//engine.On("VirtualizationRemove", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
//store.On("RemoveWorkload", mock.Anything, mock.Anything).Return(nil).Once()
ch, err = c.ReplaceWorkload(ctx, opts)
assert.NoError(t, err)
for r := range ch {
Expand Down
Loading