Skip to content

Commit

Permalink
handle created WAL by distinguishing status
Browse files Browse the repository at this point in the history
  • Loading branch information
jschwinger233 committed Jan 20, 2022
1 parent 607336a commit 1f31358
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 58 deletions.
67 changes: 24 additions & 43 deletions cluster/calcium/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package calcium
import (
"context"
"encoding/json"
"strings"
"time"

"github.com/projecteru2/core/log"
Expand Down Expand Up @@ -49,13 +48,6 @@ func (w *WAL) registerHandlers() {
w.Register(newProcessingCreatedHandler(w.calcium))
}

func (w *WAL) logCreateWorkload(workloadID, nodename string) (wal.Commit, error) {
return w.Log(eventCreateWorkload, &types.Workload{
ID: workloadID,
Nodename: nodename,
})
}

func (w *WAL) logCreateLambda(opts *types.CreateWorkloadMessage) (wal.Commit, error) {
return w.Log(eventCreateLambda, opts.WorkloadID)
}
Expand All @@ -79,30 +71,12 @@ func (h *CreateWorkloadHandler) Event() string {
}

// Check .
func (h *CreateWorkloadHandler) Check(ctx context.Context, raw interface{}) (bool, error) {
wrk, ok := raw.(*types.Workload)
func (h *CreateWorkloadHandler) Check(ctx context.Context, raw interface{}) (handle bool, err error) {
_, ok := raw.(*types.Workload)
if !ok {
return false, types.NewDetailedErr(types.ErrInvalidType, raw)
}
logger := log.WithField("WAL.Check", "CreateWorkload").WithField("ID", wrk.ID)

ctx, cancel := getReplayContext(ctx)
defer cancel()

_, err := h.calcium.GetWorkload(ctx, wrk.ID)
switch {
// there has been an exact workload metadata.
case err == nil:
return false, nil

case strings.HasPrefix(err.Error(), types.ErrBadCount.Error()):
logger.Errorf(ctx, "No such workload")
return true, nil

default:
logger.Errorf(ctx, "Unexpected error: %v", err)
return false, err
}
return true, nil
}

// Encode .
Expand All @@ -121,31 +95,38 @@ func (h *CreateWorkloadHandler) Decode(bs []byte) (interface{}, error) {
return wrk, err
}

// Handle .
// Handle: remove instance, remove meta, restore resource
func (h *CreateWorkloadHandler) Handle(ctx context.Context, raw interface{}) (err error) {
wrk, ok := raw.(*types.Workload)
if !ok {
return types.NewDetailedErr(types.ErrInvalidType, raw)
}
wrk, _ := raw.(*types.Workload)
logger := log.WithField("WAL.Handle", "CreateWorkload").WithField("ID", wrk.ID).WithField("nodename", wrk.Nodename)

ctx, cancel := getReplayContext(ctx)
defer cancel()

ch, err := h.calcium.RemoveWorkload(ctx, []string{wrk.ID}, true, 0)
if _, err = h.calcium.GetWorkload(ctx, wrk.ID); err == nil {
// workload meta exists
ch, err := h.calcium.RemoveWorkload(ctx, []string{wrk.ID}, true, 0)
if err != nil {
return logger.Err(ctx, err)
}
for msg := range ch {
if !msg.Success {
logger.Errorf(ctx, "failed to remove workload")
}
}
return nil
}

// workload meta doesn't exist
node, err := h.calcium.GetNode(ctx, wrk.Nodename)
if err != nil {
logger.Errorf(ctx, "failed to remove workload")
return
return logger.Err(ctx, err)
}
for msg := range ch {
if !msg.Success {
logger.Errorf(ctx, "failed to remove workload")
return nil
}
if _, err = node.Engine.VirtualizationRemove(ctx, wrk.ID, true, true); err != nil {
return logger.Err(ctx, err)
}

logger.Infof(ctx, "workload removed")

return nil
}

Expand Down
10 changes: 8 additions & 2 deletions engine/docker/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,8 +346,14 @@ func (e *Engine) VirtualizationStop(ctx context.Context, ID string, gracefulTime
}

// VirtualizationRemove remove virtualization
func (e *Engine) VirtualizationRemove(ctx context.Context, ID string, removeVolumes, force bool) error {
return e.client.ContainerRemove(ctx, ID, dockertypes.ContainerRemoveOptions{RemoveVolumes: removeVolumes, Force: force})
func (e *Engine) VirtualizationRemove(ctx context.Context, ID string, removeVolumes, force bool) (removed int, err error) {
if err = e.client.ContainerRemove(ctx, ID, dockertypes.ContainerRemoveOptions{RemoveVolumes: removeVolumes, Force: force}); err == nil {
return 1, nil
}
if strings.Contains(err.Error(), "No such container") {
return 0, nil
}
return
}

// VirtualizationInspect get virtualization info
Expand Down
2 changes: 1 addition & 1 deletion engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type API interface {
VirtualizationCopyTo(ctx context.Context, ID, target string, content []byte, uid, gid int, mode int64) error
VirtualizationStart(ctx context.Context, ID string) error
VirtualizationStop(ctx context.Context, ID string, gracefulTimeout time.Duration) error
VirtualizationRemove(ctx context.Context, ID string, volumes, force bool) error
VirtualizationRemove(ctx context.Context, ID string, volumes, force bool) (int, error)
VirtualizationInspect(ctx context.Context, ID string) (*enginetypes.VirtualizationInfo, error)
VirtualizationLogs(ctx context.Context, opts *enginetypes.VirtualizationLogStreamOptions) (stdout, stderr io.ReadCloser, err error)
VirtualizationAttach(ctx context.Context, ID string, stream, openStdin bool) (stdout, stderr io.ReadCloser, stdin io.WriteCloser, err error)
Expand Down
4 changes: 2 additions & 2 deletions engine/fake/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ func (f *Engine) VirtualizationStop(ctx context.Context, ID string, gracefulTime
}

// VirtualizationRemove .
func (f *Engine) VirtualizationRemove(ctx context.Context, ID string, volumes, force bool) error {
return types.ErrNilEngine
func (f *Engine) VirtualizationRemove(ctx context.Context, ID string, volumes, force bool) (int, error) {
return 0, types.ErrNilEngine
}

// VirtualizationInspect .
Expand Down
19 changes: 13 additions & 6 deletions engine/mocks/API.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion engine/mocks/fakeengine/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func MakeClient(ctx context.Context, config coretypes.Config, nodename, endpoint
e.On("VirtualizationCopyTo", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
e.On("VirtualizationStart", mock.Anything, mock.Anything).Return(nil)
e.On("VirtualizationStop", mock.Anything, mock.Anything, mock.Anything).Return(nil)
e.On("VirtualizationRemove", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
e.On("VirtualizationRemove", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(1, nil)
vcJSON := &enginetypes.VirtualizationInfo{ID: ID, Image: "mock-image", Running: true, Networks: map[string]string{"mock-network": "1.1.1.1"}}
e.On("VirtualizationInspect", mock.Anything, mock.Anything).Return(vcJSON, nil)
logs := ioutil.NopCloser(bytes.NewBufferString("logs1...\nlogs2...\n"))
Expand Down
9 changes: 7 additions & 2 deletions engine/virt/virt.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,13 @@ func (v *Virt) VirtualizationStop(ctx context.Context, ID string, gracefulTimeou
}

// VirtualizationRemove removes a guest.
func (v *Virt) VirtualizationRemove(ctx context.Context, ID string, volumes, force bool) (err error) {
_, err = v.client.DestroyGuest(ctx, ID, force)
func (v *Virt) VirtualizationRemove(ctx context.Context, ID string, volumes, force bool) (removed int, err error) {
if _, err = v.client.DestroyGuest(ctx, ID, force); err == nil {
return 1, nil
}
if strings.Contains(err.Error(), "key not exists") {
return 0, nil
}
return
}

Expand Down
3 changes: 2 additions & 1 deletion types/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ func (c *Workload) Remove(ctx context.Context, force bool) error {
if c.Engine == nil {
return errors.WithStack(ErrNilEngine)
}
return errors.WithStack(c.Engine.VirtualizationRemove(ctx, c.ID, true, force))
_, err := c.Engine.VirtualizationRemove(ctx, c.ID, true, force)
return errors.WithStack(err)
}

// WorkloadStatus store deploy status
Expand Down

0 comments on commit 1f31358

Please sign in to comment.