Skip to content

Commit

Permalink
WAL for resource consistency during creation
Browse files Browse the repository at this point in the history
  • Loading branch information
jschwinger233 committed Jan 12, 2022
1 parent 910dd96 commit 6b5aedc
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 8 deletions.
21 changes: 17 additions & 4 deletions cluster/calcium/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,15 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio
cancel()
}()

var resourceCommit wal.Commit
defer func() {
if resourceCommit != nil {
if err := resourceCommit(); err != nil {
logger.Errorf(ctx, "commit wal failed: %s, %+v", eventWorkloadResourceAllocated, err)
}
}
}()

_ = utils.Txn(
ctx,

Expand Down Expand Up @@ -90,6 +99,9 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio
return errors.WithStack(err)
}
}
if resourceCommit, err = c.wal.Log(eventWorkloadResourceAllocated, nodes); err != nil {
return
}
return errors.WithStack(c.store.UpdateNodes(ctx, nodes...))
})
},
Expand Down Expand Up @@ -246,6 +258,7 @@ func (c *Calcium) doDeployOneWorkload(
config *enginetypes.VirtualizationCreateOptions,
decrProcessing bool,
) (err error) {
logger := log.WithField("Calcium", "doDeployWorkload").WithField("nodename", node.Name).WithField("opts", opts).WithField("msg", msg)
workload := &types.Workload{
ResourceMeta: types.ResourceMeta{
CPU: msg.CPU,
Expand Down Expand Up @@ -276,7 +289,7 @@ func (c *Calcium) doDeployOneWorkload(
defer func() {
if commit != nil {
if err := commit(); err != nil {
log.Errorf(ctx, "[doDeployOneWorkload] Commit WAL %s failed: %v", eventCreateWorkload, err)
logger.Errorf(ctx, "Commit WAL %s failed: %+v", eventWorkloadCreated, err)
}
}
}()
Expand All @@ -292,7 +305,7 @@ func (c *Calcium) doDeployOneWorkload(
// We couldn't WAL the workload ID above VirtualizationCreate temporarily,
// so there's a time gap window, once the core process crashes between
// VirtualizationCreate and logCreateWorkload then the workload is leaky.
commit, err = c.wal.Log(eventCreateWorkload, &types.Workload{
commit, err = c.wal.Log(eventWorkloadCreated, &types.Workload{
ID: workload.ID,
Nodename: workload.Nodename,
})
Expand Down Expand Up @@ -376,13 +389,13 @@ func (c *Calcium) doDeployOneWorkload(

// remove workload
func(ctx context.Context, _ bool) error {
log.Errorf(ctx, "[doDeployOneWorkload] failed to deploy workload %s, rollback", workload.ID)
logger.Errorf(ctx, "[doDeployOneWorkload] failed to deploy workload %s, rollback", workload.ID)
if workload.ID == "" {
return nil
}

if err := c.store.RemoveWorkload(ctx, workload); err != nil {
log.Errorf(ctx, "[doDeployOneWorkload] failed to remove workload %s")
logger.Errorf(ctx, "[doDeployOneWorkload] failed to remove workload %s", workload.ID)
}

return workload.Remove(ctx, true)
Expand Down
2 changes: 1 addition & 1 deletion cluster/calcium/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func TestCreateWorkloadTxn(t *testing.T) {
walCommitted = true
return nil
})
mwal.On("Log", eventCreateWorkload, mock.Anything).Return(commit, nil)
mwal.On("Log", eventWorkloadCreated, mock.Anything).Return(commit, nil)
walCommitted = false
ch, err = c.CreateWorkload(ctx, opts)
assert.Nil(t, err)
Expand Down
76 changes: 73 additions & 3 deletions cluster/calcium/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ import (

"github.com/projecteru2/core/log"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
"github.com/projecteru2/core/wal"
)

const (
eventCreateLambda = "create-lambda"
eventCreateWorkload = "create-workload" // created but yet to start
eventCreateLambda = "create-lambda"
eventWorkloadCreated = "create-workload" // created but yet to start
eventWorkloadResourceAllocated = "allocate-workload" // resource updated in node meta but yet to create all workloads
)

// WAL for calcium.
Expand Down Expand Up @@ -42,6 +44,7 @@ func newCalciumWAL(cal *Calcium) (*WAL, error) {
// registerHandlers registers every calcium WAL event handler on w: the
// create-lambda handler, the create-workload handler and the
// workload-resource-allocated handler, in that order. Each handler is
// constructed with the Calcium instance held by w.
func (w *WAL) registerHandlers() {
	cal := w.calcium

	lambdaHandler := newCreateLambdaHandler(cal)
	w.Register(lambdaHandler)

	workloadHandler := newCreateWorkloadHandler(cal)
	w.Register(workloadHandler)

	resourceHandler := newWorkloadResourceAllocatedHandler(cal)
	w.Register(resourceHandler)
}

func (w *WAL) logCreateLambda(opts *types.DeployOptions) (wal.Commit, error) {
Expand All @@ -60,7 +63,7 @@ type CreateWorkloadHandler struct {

func newCreateWorkloadHandler(cal *Calcium) *CreateWorkloadHandler {
return &CreateWorkloadHandler{
event: eventCreateWorkload,
event: eventWorkloadCreated,
calcium: cal,
}
}
Expand Down Expand Up @@ -227,3 +230,70 @@ func (h *CreateLambdaHandler) getWorkloadIDs(ctx context.Context, opts *types.Li
// getReplayContext derives a child context from ctx that is cancelled
// automatically after 32 seconds. The caller must invoke the returned
// CancelFunc to release the timer.
func getReplayContext(ctx context.Context) (context.Context, context.CancelFunc) {
	const replayTimeout = 32 * time.Second
	return context.WithTimeout(ctx, replayTimeout)
}

// WorkloadResourceAllocatedHandler is the WAL event handler for the
// workload-resource-allocated event.
type WorkloadResourceAllocatedHandler struct {
	// event is the event name reported by Event.
	event string
	// calcium is the Calcium instance the handler operates on in Handle.
	calcium *Calcium
}

// newWorkloadResourceAllocatedHandler builds a handler bound to cal whose
// event name is eventWorkloadResourceAllocated.
func newWorkloadResourceAllocatedHandler(cal *Calcium) *WorkloadResourceAllocatedHandler {
	handler := new(WorkloadResourceAllocatedHandler)
	handler.event = eventWorkloadResourceAllocated
	handler.calcium = cal
	return handler
}

// Event returns the event name this handler was constructed with.
func (h *WorkloadResourceAllocatedHandler) Event() string {
	return h.event
}

// Check reports whether raw is a []*types.Node. It returns (true, nil) when
// it is, and (false, ErrInvalidType detail error) for any other dynamic type.
// ctx is not used.
func (h *WorkloadResourceAllocatedHandler) Check(ctx context.Context, raw interface{}) (bool, error) {
	switch raw.(type) {
	case []*types.Node:
		return true, nil
	default:
		return false, types.NewDetailedErr(types.ErrInvalidType, raw)
	}
}

// Encode serialises raw, which must be a []*types.Node, to JSON.
// Any other dynamic type yields a nil slice and an ErrInvalidType detail error.
func (h *WorkloadResourceAllocatedHandler) Encode(raw interface{}) ([]byte, error) {
	if nodes, isNodes := raw.([]*types.Node); isNodes {
		return json.Marshal(nodes)
	}
	return nil, types.NewDetailedErr(types.ErrInvalidType, raw)
}

// Decode parses the JSON payload produced by Encode back into a
// []*types.Node.
//
// The unmarshal is performed before the result is read: the Go spec leaves
// the evaluation order between a plain variable operand and a function call
// in the same return statement unspecified, so returning
// `nodes, json.Unmarshal(bytes, &nodes)` could legally yield the slice as it
// was before decoding. On failure a nil value is returned together with the
// error, so callers never see a partially decoded slice.
func (h *WorkloadResourceAllocatedHandler) Decode(bytes []byte) (interface{}, error) {
	nodes := []*types.Node{}
	if err := json.Unmarshal(bytes, &nodes); err != nil {
		return nil, err
	}
	return nodes, nil
}

// Handle replays a workload-resource-allocated WAL entry. raw must be the
// []*types.Node that was logged; for every node in it, NodeResource is called
// with its last argument set to true (presumably the "fix" flag, judging by
// the log messages; confirm against NodeResource).
//
// Calls are dispatched through a goroutine pool created with
// utils.NewGoroutinePool(20) under the replay timeout from getReplayContext.
// A failure for one node is logged and does not stop the others, and Handle
// returns nil after pool.Wait returns. The only error returned is
// ErrInvalidType, when raw is not a []*types.Node.
func (h *WorkloadResourceAllocatedHandler) Handle(ctx context.Context, raw interface{}) error {
	nodes, ok := raw.([]*types.Node)
	if !ok {
		return types.NewDetailedErr(types.ErrInvalidType, raw)
	}
	logger := log.WithField("WAL", "Handle").WithField("event", eventWorkloadResourceAllocated)

	ctx, cancel := getReplayContext(ctx)
	defer cancel()

	pool := utils.NewGoroutinePool(20)
	for _, node := range nodes {
		// A decoded payload such as `[null]` yields nil entries; skip them
		// instead of panicking during recovery.
		if node == nil {
			continue
		}
		// The node name is passed by value into the closure factory so each
		// task works on its own copy rather than on the shared loop variable.
		pool.Go(ctx, func(nodename string) func() {
			return func() {
				// err is local to the task: sharing one error variable
				// across the pool's goroutines would be a data race.
				if _, err := h.calcium.NodeResource(ctx, nodename, true); err != nil {
					logger.Errorf(ctx, "failed to fix node resource: %s, %+v", nodename, err)
					return
				}
				logger.Infof(ctx, "fixed node resource: %s", nodename)
			}
		}(node.Name))
	}
	pool.Wait(ctx)

	return nil
}

0 comments on commit 6b5aedc

Please sign in to comment.