Skip to content

Commit

Permalink
Overhaul wal for create workload (#534)
Browse files Browse the repository at this point in the history
* wal for createworkload will clean resources

* wal for resource consistency during creating

* wal for processing

* handle created WAL by distinguishing status

* revise unittest set

* engine VirtualizationRemove return ErrWorkloadNotExists
  • Loading branch information
jschwinger233 authored Jan 29, 2022
1 parent 80fcdb5 commit 10b0708
Show file tree
Hide file tree
Showing 10 changed files with 259 additions and 109 deletions.
52 changes: 42 additions & 10 deletions cluster/calcium/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,36 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio
defer func() {
cctx, cancel := context.WithTimeout(utils.InheritTracingInfo(ctx, context.TODO()), c.config.GlobalTimeout)
for nodename := range deployMap {
if e := c.store.DeleteProcessing(cctx, opts.GetProcessing(nodename)); e != nil {
logger.Errorf(ctx, "[Calcium.doCreateWorkloads] delete processing failed for %s: %+v", nodename, e)
processing := opts.GetProcessing(nodename)
if err := c.store.DeleteProcessing(cctx, processing); err != nil {
logger.Errorf(ctx, "[Calcium.doCreateWorkloads] delete processing failed for %s: %+v", nodename, err)
}
}
close(ch)
cancel()
}()

var resourceCommit wal.Commit
defer func() {
if resourceCommit != nil {
if err := resourceCommit(); err != nil {
logger.Errorf(ctx, "commit wal failed: %s, %+v", eventWorkloadResourceAllocated, err)
}
}
}()

var processingCommits map[string]wal.Commit
defer func() {
for nodename := range processingCommits {
if processingCommits[nodename] == nil {
continue
}
if err := processingCommits[nodename](); err != nil {
logger.Errorf(ctx, "commit wal failed: %s, %s, %+v", eventProcessingCreated, nodename, err)
}
}
}()

_ = utils.Txn(
ctx,

Expand All @@ -81,15 +103,23 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio

// commit changes
nodes := []*types.Node{}
processingCommits = make(map[string]wal.Commit)
for nodename, deploy := range deployMap {
for _, plan := range plans {
plan.ApplyChangesOnNode(nodeMap[nodename], utils.Range(deploy)...)
}
nodes = append(nodes, nodeMap[nodename])
if err = c.store.CreateProcessing(ctx, opts.GetProcessing(nodename), deploy); err != nil {
processing := opts.GetProcessing(nodename)
if processingCommits[nodename], err = c.wal.Log(eventProcessingCreated, processing); err != nil {
return errors.WithStack(err)
}
if err = c.store.CreateProcessing(ctx, processing, deploy); err != nil {
return errors.WithStack(err)
}
}
if resourceCommit, err = c.wal.Log(eventWorkloadResourceAllocated, nodes); err != nil {
return errors.WithStack(err)
}
return errors.WithStack(c.store.UpdateNodes(ctx, nodes...))
})
},
Expand Down Expand Up @@ -246,6 +276,7 @@ func (c *Calcium) doDeployOneWorkload(
config *enginetypes.VirtualizationCreateOptions,
decrProcessing bool,
) (err error) {
logger := log.WithField("Calcium", "doDeployWorkload").WithField("nodename", node.Name).WithField("opts", opts).WithField("msg", msg)
workload := &types.Workload{
ResourceMeta: types.ResourceMeta{
CPU: msg.CPU,
Expand Down Expand Up @@ -276,7 +307,7 @@ func (c *Calcium) doDeployOneWorkload(
defer func() {
if commit != nil {
if err := commit(); err != nil {
log.Errorf(ctx, "[doDeployOneWorkload] Commit WAL %s failed: %v", eventCreateWorkload, err)
logger.Errorf(ctx, "Commit WAL %s failed: %+v", eventWorkloadCreated, err)
}
}
}()
Expand All @@ -292,10 +323,11 @@ func (c *Calcium) doDeployOneWorkload(
// We couldn't WAL the workload ID above VirtualizationCreate temporarily,
// so there's a time gap window, once the core process crashes between
// VirtualizationCreate and logCreateWorkload then the workload is leaked.
if commit, err = c.wal.logCreateWorkload(workload.ID, node.Name); err != nil {
return err
}
return nil
commit, err = c.wal.Log(eventWorkloadCreated, &types.Workload{
ID: workload.ID,
Nodename: workload.Nodename,
})
return errors.WithStack(err)
},

func(ctx context.Context) (err error) {
Expand Down Expand Up @@ -375,13 +407,13 @@ func (c *Calcium) doDeployOneWorkload(

// remove workload
func(ctx context.Context, _ bool) error {
log.Errorf(ctx, "[doDeployOneWorkload] failed to deploy workload %s, rollback", workload.ID)
logger.Errorf(ctx, "[doDeployOneWorkload] failed to deploy workload %s, rollback", workload.ID)
if workload.ID == "" {
return nil
}

if err := c.store.RemoveWorkload(ctx, workload); err != nil {
log.Errorf(ctx, "[doDeployOneWorkload] failed to remove workload %s")
logger.Errorf(ctx, "[doDeployOneWorkload] failed to remove workload %s", workload.ID)
}

return workload.Remove(ctx, true)
Expand Down
20 changes: 9 additions & 11 deletions cluster/calcium/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"testing"

"github.com/pkg/errors"
enginemocks "github.com/projecteru2/core/engine/mocks"
enginetypes "github.com/projecteru2/core/engine/types"
lockmocks "github.com/projecteru2/core/lock/mocks"
Expand All @@ -15,8 +16,6 @@ import (
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/wal"
walmocks "github.com/projecteru2/core/wal/mocks"

"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
Expand Down Expand Up @@ -107,8 +106,12 @@ func TestCreateWorkloadTxn(t *testing.T) {

c.wal = &WAL{WAL: &walmocks.WAL{}}
mwal := c.wal.WAL.(*walmocks.WAL)
defer mwal.AssertExpectations(t)
var walCommitted bool
commit := wal.Commit(func() error {
walCommitted = true
return nil
})
mwal.On("Log", mock.Anything, mock.Anything).Return(commit, nil)

store.On("CreateProcessing", mock.Anything, mock.Anything, mock.Anything).Return(nil)
store.On("DeleteProcessing", mock.Anything, mock.Anything, mock.Anything).Return(nil)
Expand Down Expand Up @@ -191,7 +194,7 @@ func TestCreateWorkloadTxn(t *testing.T) {
assert.EqualValues(t, 1, node2.CPUUsed)
node1.CPUUsed = 0
node2.CPUUsed = 0
assert.False(t, walCommitted)
assert.True(t, walCommitted)

// doCreateWorkloadOnNode fails: doGetAndPrepareNode
store.On("UpdateNodes", mock.Anything, mock.Anything, mock.Anything).Return(nil)
Expand Down Expand Up @@ -224,7 +227,7 @@ func TestCreateWorkloadTxn(t *testing.T) {
assert.EqualValues(t, 2, cnt)
assert.EqualValues(t, 0, node1.CPUUsed)
assert.EqualValues(t, 0, node2.CPUUsed)
assert.False(t, walCommitted)
assert.True(t, walCommitted)

// doDeployOneWorkload fails: VirtualizationCreate
engine.On("ImageLocalDigests", mock.Anything, mock.Anything).Return([]string{""}, nil)
Expand All @@ -245,18 +248,13 @@ func TestCreateWorkloadTxn(t *testing.T) {
assert.EqualValues(t, 2, cnt)
assert.EqualValues(t, 0, node1.CPUUsed)
assert.EqualValues(t, 0, node2.CPUUsed)
assert.False(t, walCommitted)
assert.True(t, walCommitted)

// doCreateAndStartWorkload fails: AddWorkload
engine.On("VirtualizationCreate", mock.Anything, mock.Anything).Return(&enginetypes.VirtualizationCreated{ID: "c1"}, nil)
engine.On("VirtualizationStart", mock.Anything, mock.Anything).Return(nil)
engine.On("VirtualizationInspect", mock.Anything, mock.Anything).Return(&enginetypes.VirtualizationInfo{}, nil)
store.On("AddWorkload", mock.Anything, mock.Anything, mock.Anything).Return(errors.Wrap(context.DeadlineExceeded, "AddWorkload")).Twice()
commit := wal.Commit(func() error {
walCommitted = true
return nil
})
mwal.On("Log", eventCreateWorkload, mock.Anything).Return(commit, nil)
walCommitted = false
ch, err = c.CreateWorkload(ctx, opts)
assert.Nil(t, err)
Expand Down
3 changes: 2 additions & 1 deletion cluster/calcium/lambda_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ func TestRunAndWaitFailedThenWALCommitted(t *testing.T) {
c.wal = &WAL{WAL: &walmocks.WAL{}}

mwal := c.wal.WAL.(*walmocks.WAL)
defer mwal.AssertExpectations(t)
defer mwal.AssertNotCalled(t, "Log")
mwal.On("Log", mock.Anything, mock.Anything).Return(nil, nil)

opts := &types.DeployOptions{
Name: "zc:name",
Expand Down
3 changes: 0 additions & 3 deletions cluster/calcium/replace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,6 @@ func TestReplaceWorkload(t *testing.T) {
// failed by VirtualizationCreate
engine.On("VirtualizationCreate", mock.Anything, mock.Anything).Return(nil, types.ErrCannotGetEngine).Once()
engine.On("VirtualizationStart", mock.Anything, mock.Anything).Return(types.ErrCannotGetEngine).Once()
//store.On("UpdateNodeResource", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
//engine.On("VirtualizationRemove", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
//store.On("RemoveWorkload", mock.Anything, mock.Anything).Return(nil).Once()
ch, err = c.ReplaceWorkload(ctx, opts)
assert.NoError(t, err)
for r := range ch {
Expand Down
Loading

0 comments on commit 10b0708

Please sign in to comment.