Skip to content

Commit

Permalink
wal for processing
Browse files Browse the repository at this point in the history
  • Loading branch information
jschwinger233 committed Jan 12, 2022
1 parent 6b5aedc commit d6da4e9
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 4 deletions.
16 changes: 12 additions & 4 deletions cluster/calcium/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio
defer func() {
cctx, cancel := context.WithTimeout(utils.InheritTracingInfo(ctx, context.TODO()), c.config.GlobalTimeout)
for nodename := range deployMap {
if e := c.store.DeleteProcessing(cctx, opts.GetProcessing(nodename)); e != nil {
processing := opts.GetProcessing(nodename)
if e := c.store.DeleteProcessing(cctx, processing); e != nil {
logger.Errorf(ctx, "[Calcium.doCreateWorkloads] delete processing failed for %s: %+v", nodename, e)
}
}
Expand All @@ -72,6 +73,8 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio
}
}()

var processingCommits map[string]wal.Commit

_ = utils.Txn(
ctx,

Expand All @@ -90,17 +93,22 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio

// commit changes
nodes := []*types.Node{}
processingCommits = make(map[string]wal.Commit)
for nodename, deploy := range deployMap {
for _, plan := range plans {
plan.ApplyChangesOnNode(nodeMap[nodename], utils.Range(deploy)...)
}
nodes = append(nodes, nodeMap[nodename])
if err = c.store.CreateProcessing(ctx, opts.GetProcessing(nodename), deploy); err != nil {
processing := opts.GetProcessing(nodename)
if processingCommits[nodename], err = c.wal.Log(eventProcessingCreated, processing); err != nil {
return errors.WithStack(err)
}
if err = c.store.CreateProcessing(ctx, processing, deploy); err != nil {
return errors.WithStack(err)
}
}
if resourceCommit, err = c.wal.Log(eventWorkloadResourceAllocated, nodes); err != nil {
return
return errors.WithStack(err)
}
return errors.WithStack(c.store.UpdateNodes(ctx, nodes...))
})
Expand Down Expand Up @@ -309,7 +317,7 @@ func (c *Calcium) doDeployOneWorkload(
ID: workload.ID,
Nodename: workload.Nodename,
})
return nil
return errors.WithStack(err)
},

func(ctx context.Context) (err error) {
Expand Down
55 changes: 55 additions & 0 deletions cluster/calcium/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const (
eventCreateLambda = "create-lambda"
eventWorkloadCreated = "create-workload" // created but yet to start
eventWorkloadResourceAllocated = "allocate-workload" // resource updated in node meta but yet to create all workloads
eventProcessingCreated = "create-processing" // processing created but yet to delete
)

// WAL for calcium.
Expand Down Expand Up @@ -297,3 +298,57 @@ func (h *WorkloadResourceAllocatedHandler) Handle(ctx context.Context, raw inter

return nil
}

type ProcessingCreatedHandler struct {
event string
calcium *Calcium
}

func newProcessingCreatedHandler(cal *Calcium) *ProcessingCreatedHandler {
return &ProcessingCreatedHandler{
event: eventProcessingCreated,
calcium: cal,
}
}

// Event .
func (h *ProcessingCreatedHandler) Event() string {
return h.event
}

// Check .
func (h ProcessingCreatedHandler) Check(ctx context.Context, raw interface{}) (bool, error) {
if _, ok := raw.(*types.Processing); !ok {
return false, types.NewDetailedErr(types.ErrInvalidType, raw)
}
return true, nil
}

// Encode .
func (h *ProcessingCreatedHandler) Encode(raw interface{}) ([]byte, error) {
processing, ok := raw.(*types.Processing)
if !ok {
return nil, types.NewDetailedErr(types.ErrInvalidType, raw)
}
return json.Marshal(processing)
}

// Decode .
func (h *ProcessingCreatedHandler) Decode(bs []byte) (interface{}, error) {
processing := &types.Processing{}
return processing, json.Unmarshal(bs, processing)
}

// Handle .
func (h *ProcessingCreatedHandler) Handle(ctx context.Context, raw interface{}) (err error) {
processing, _ := raw.(*types.Processing)
logger := log.WithField("WAL", "Handle").WithField("event", eventProcessingCreated)

ctx, cancel := getReplayContext(ctx)
defer cancel()

if err = h.calcium.store.DeleteProcessing(ctx, processing); err != nil {
logger.Errorf(ctx, "faild to delete processing %s", processing.Ident)
}
return
}

0 comments on commit d6da4e9

Please sign in to comment.