Skip to content

Commit

Permalink
refactor wal stage 1 (#561)
Browse files Browse the repository at this point in the history
* refactor wal2

* remove useless step in remove

* minor refactor createworkloadhandler
  • Loading branch information
CMGS authored Mar 24, 2022
1 parent ae9e4e5 commit 43d7381
Show file tree
Hide file tree
Showing 33 changed files with 1,204 additions and 1,285 deletions.
2 changes: 1 addition & 1 deletion 3rdmocks/ServerStream.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ mock: deps
mockery --dir store --output store/mocks --name Store
mockery --dir engine --output engine/mocks --name API
mockery --dir cluster --output cluster/mocks --name Cluster
mockery --dir wal --output wal/mocks --name WAL
mockery --dir lock --output lock/mocks --name DistributedLock
mockery --dir store/etcdv3/meta --output store/etcdv3/meta/mocks --all
mockery --dir vendor/go.etcd.io/etcd/client/v3 --output store/etcdv3/meta/mocks --name Txn
Expand Down
2 changes: 1 addition & 1 deletion cluster/calcium/calcium.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func New(config types.Config, t *testing.T) (*Calcium, error) {
watcher := helium.New(config.GRPCConfig, store)

cal := &Calcium{store: store, config: config, scheduler: potassium, source: scm, watcher: watcher}
cal.wal, err = newCalciumWAL(cal)
cal.wal, err = newWAL(config, cal)
cal.identifier = config.Identifier()

return cal, logger.Err(nil, errors.WithStack(err)) //nolint
Expand Down
7 changes: 1 addition & 6 deletions cluster/calcium/lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/projecteru2/core/strategy"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
"github.com/projecteru2/core/wal"

"github.com/pkg/errors"
)
Expand Down Expand Up @@ -68,7 +67,7 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
}
}

commit, err := c.walCreateLambda(message)
commit, err := c.wal.Log(eventCreateLambda, message.WorkloadID)
if err != nil {
return &types.AttachWorkloadMessage{
WorkloadID: message.WorkloadID,
Expand Down Expand Up @@ -197,7 +196,3 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC

return workloadIDs, runMsgCh, nil
}

func (c *Calcium) walCreateLambda(opts *types.CreateWorkloadMessage) (wal.Commit, error) {
return c.wal.logCreateLambda(opts)
}
9 changes: 5 additions & 4 deletions cluster/calcium/remove.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (

// RemoveWorkload remove workloads
// returns a channel that contains removing responses
func (c *Calcium) RemoveWorkload(ctx context.Context, ids []string, force bool, step int) (chan *types.RemoveWorkloadMessage, error) {
logger := log.WithField("Calcium", "RemoveWorkload").WithField("ids", ids).WithField("force", force).WithField("step", step)
func (c *Calcium) RemoveWorkload(ctx context.Context, ids []string, force bool) (chan *types.RemoveWorkloadMessage, error) {
logger := log.WithField("Calcium", "RemoveWorkload").WithField("ids", ids).WithField("force", force)

nodeWorkloadGroup, err := c.groupWorkloadsByNode(ctx, ids)
if err != nil {
Expand Down Expand Up @@ -45,7 +45,7 @@ func (c *Calcium) RemoveWorkload(ctx context.Context, ids []string, force bool,
},
// then
func(ctx context.Context) (err error) {
if err = c.doRemoveWorkload(ctx, workload, force); err != nil {
if err = c.doRemoveWorkload(ctx, workload, force); err == nil {
log.Infof(ctx, "[RemoveWorkload] Workload %s removed", workload.ID)
}
return err
Expand Down Expand Up @@ -104,12 +104,13 @@ func (c *Calcium) doRemoveWorkload(ctx context.Context, workload *types.Workload

// 同步地删除容器, 在某些需要等待的场合异常有用!
func (c *Calcium) doRemoveWorkloadSync(ctx context.Context, ids []string) error {
ch, err := c.RemoveWorkload(ctx, ids, true, 1)
ch, err := c.RemoveWorkload(ctx, ids, true)
if err != nil {
return err
}

for m := range ch {
// TODO deal with failed
log.Debugf(ctx, "[doRemoveWorkloadSync] Removed %s", m.WorkloadID)
}
return nil
Expand Down
8 changes: 4 additions & 4 deletions cluster/calcium/remove_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestRemoveWorkload(t *testing.T) {

// failed by GetWorkload
store.On("GetWorkloads", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
ch, err := c.RemoveWorkload(ctx, []string{"xx"}, false, 0)
ch, err := c.RemoveWorkload(ctx, []string{"xx"}, false)
assert.True(t, errors.Is(err, types.ErrNoETCD))
store.AssertExpectations(t)

Expand All @@ -36,7 +36,7 @@ func TestRemoveWorkload(t *testing.T) {
}
store.On("GetWorkloads", mock.Anything, mock.Anything).Return([]*types.Workload{workload}, nil)
store.On("GetNode", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
ch, err = c.RemoveWorkload(ctx, []string{"xx"}, false, 0)
ch, err = c.RemoveWorkload(ctx, []string{"xx"}, false)
assert.NoError(t, err)
for r := range ch {
assert.False(t, r.Success)
Expand All @@ -54,7 +54,7 @@ func TestRemoveWorkload(t *testing.T) {
store.On("GetNode", mock.Anything, mock.Anything).Return(node, nil)
store.On("RemoveWorkload", mock.Anything, mock.Anything).Return(types.ErrNoETCD).Twice()
store.On("ListNodeWorkloads", mock.Anything, mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD)
ch, err = c.RemoveWorkload(ctx, []string{"xx"}, false, 0)
ch, err = c.RemoveWorkload(ctx, []string{"xx"}, false)
assert.NoError(t, err)
for r := range ch {
assert.False(t, r.Success)
Expand All @@ -69,7 +69,7 @@ func TestRemoveWorkload(t *testing.T) {
store.On("GetWorkloads", mock.Anything, mock.Anything).Return([]*types.Workload{workload}, nil)
store.On("RemoveWorkload", mock.Anything, mock.Anything).Return(nil)
store.On("UpdateNodeResource", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
ch, err = c.RemoveWorkload(ctx, []string{"xx"}, false, 0)
ch, err = c.RemoveWorkload(ctx, []string{"xx"}, false)
assert.NoError(t, err)
for r := range ch {
assert.True(t, r.Success)
Expand Down
207 changes: 95 additions & 112 deletions cluster/calcium/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,21 @@ const (
// WAL for calcium.
type WAL struct {
wal.WAL
config types.Config
calcium *Calcium
}

func newCalciumWAL(cal *Calcium) (*WAL, error) {
w := &WAL{
WAL: wal.NewHydro(),
config: cal.config,
calcium: cal,
func newWAL(config types.Config, calcium *Calcium) (*WAL, error) {
hydro, err := wal.NewHydro(config.WALFile, config.WALOpenTimeout)
if err != nil {
return nil, err
}

if err := w.WAL.Open(w.config.WALFile, w.config.WALOpenTimeout); err != nil {
return nil, err
w := &WAL{
WAL: hydro,
calcium: calcium,
}

w.registerHandlers()

return w, nil
}

Expand All @@ -49,105 +47,22 @@ func (w *WAL) registerHandlers() {
w.Register(newProcessingCreatedHandler(w.calcium))
}

func (w *WAL) logCreateLambda(opts *types.CreateWorkloadMessage) (wal.Commit, error) {
return w.Log(eventCreateLambda, opts.WorkloadID)
}

// CreateWorkloadHandler indicates event handler for creating workload.
type CreateWorkloadHandler struct {
event string
calcium *Calcium
}

func newCreateWorkloadHandler(cal *Calcium) *CreateWorkloadHandler {
return &CreateWorkloadHandler{
event: eventWorkloadCreated,
calcium: cal,
}
}

// Event .
func (h *CreateWorkloadHandler) Event() string {
return h.event
}

// Check .
func (h *CreateWorkloadHandler) Check(ctx context.Context, raw interface{}) (handle bool, err error) {
_, ok := raw.(*types.Workload)
if !ok {
return false, types.NewDetailedErr(types.ErrInvalidType, raw)
}
return true, nil
}

// Encode .
func (h *CreateWorkloadHandler) Encode(raw interface{}) ([]byte, error) {
wrk, ok := raw.(*types.Workload)
if !ok {
return nil, types.NewDetailedErr(types.ErrInvalidType, raw)
}
return json.Marshal(wrk)
}

// Decode .
func (h *CreateWorkloadHandler) Decode(bs []byte) (interface{}, error) {
wrk := &types.Workload{}
err := json.Unmarshal(bs, wrk)
return wrk, err
}

// Handle will remove instance, remove meta, restore resource
func (h *CreateWorkloadHandler) Handle(ctx context.Context, raw interface{}) (err error) {
wrk, _ := raw.(*types.Workload)
logger := log.WithField("WAL.Handle", "CreateWorkload").WithField("ID", wrk.ID).WithField("nodename", wrk.Nodename)

ctx, cancel := getReplayContext(ctx)
defer cancel()

if _, err = h.calcium.GetWorkload(ctx, wrk.ID); err == nil {
// workload meta exists
ch, err := h.calcium.RemoveWorkload(ctx, []string{wrk.ID}, true, 0)
if err != nil {
return logger.Err(ctx, err)
}
for msg := range ch {
if !msg.Success {
logger.Errorf(ctx, "failed to remove workload")
}
}
logger.Infof(ctx, "workload with meta removed")
return nil
}

// workload meta doesn't exist
node, err := h.calcium.GetNode(ctx, wrk.Nodename)
if err != nil {
return logger.Err(ctx, err)
}
if err = node.Engine.VirtualizationRemove(ctx, wrk.ID, true, true); err != nil && !errors.Is(err, types.ErrWorkloadNotExists) {
return logger.Err(ctx, err)
}

logger.Infof(ctx, "workload removed")
return nil
}

// CreateLambdaHandler indicates event handler for creating lambda.
type CreateLambdaHandler struct {
event string
typ string
calcium *Calcium
}

func newCreateLambdaHandler(cal *Calcium) *CreateLambdaHandler {
func newCreateLambdaHandler(calcium *Calcium) *CreateLambdaHandler {
return &CreateLambdaHandler{
event: eventCreateLambda,
calcium: cal,
typ: eventCreateLambda,
calcium: calcium,
}
}

// Event .
func (h *CreateLambdaHandler) Event() string {
return h.event
func (h *CreateLambdaHandler) Typ() string {
return h.typ
}

// Check .
Expand Down Expand Up @@ -202,26 +117,90 @@ func (h *CreateLambdaHandler) Handle(ctx context.Context, raw interface{}) error
return nil
}

func getReplayContext(ctx context.Context) (context.Context, context.CancelFunc) {
return context.WithTimeout(ctx, time.Second*32)
// CreateWorkloadHandler indicates event handler for creating workload.
type CreateWorkloadHandler struct {
typ string
calcium *Calcium
}

func newCreateWorkloadHandler(calcium *Calcium) *CreateWorkloadHandler {
return &CreateWorkloadHandler{
typ: eventWorkloadCreated,
calcium: calcium,
}
}

// Event .
func (h *CreateWorkloadHandler) Typ() string {
return h.typ
}

// Check .
func (h *CreateWorkloadHandler) Check(ctx context.Context, raw interface{}) (handle bool, err error) {
_, ok := raw.(*types.Workload)
if !ok {
return false, types.NewDetailedErr(types.ErrInvalidType, raw)
}
return true, nil
}

// Encode .
func (h *CreateWorkloadHandler) Encode(raw interface{}) ([]byte, error) {
wrk, ok := raw.(*types.Workload)
if !ok {
return nil, types.NewDetailedErr(types.ErrInvalidType, raw)
}
return json.Marshal(wrk)
}

// Decode .
func (h *CreateWorkloadHandler) Decode(bs []byte) (interface{}, error) {
wrk := &types.Workload{}
err := json.Unmarshal(bs, wrk)
return wrk, err
}

// Handle will remove instance, remove meta, restore resource
func (h *CreateWorkloadHandler) Handle(ctx context.Context, raw interface{}) (err error) {
wrk, _ := raw.(*types.Workload)
logger := log.WithField("WAL.Handle", "CreateWorkload").WithField("ID", wrk.ID).WithField("nodename", wrk.Nodename)

ctx, cancel := getReplayContext(ctx)
defer cancel()

if _, err = h.calcium.GetWorkload(ctx, wrk.ID); err == nil {
return h.calcium.doRemoveWorkloadSync(ctx, []string{wrk.ID})
}

// workload meta doesn't exist
node, err := h.calcium.GetNode(ctx, wrk.Nodename)
if err != nil {
return logger.Err(ctx, err)
}
if err = node.Engine.VirtualizationRemove(ctx, wrk.ID, true, true); err != nil && !errors.Is(err, types.ErrWorkloadNotExists) {
return logger.Err(ctx, err)
}

logger.Infof(ctx, "workload removed")
return nil
}

// WorkloadResourceAllocatedHandler .
type WorkloadResourceAllocatedHandler struct {
event string
typ string
calcium *Calcium
}

func newWorkloadResourceAllocatedHandler(cal *Calcium) *WorkloadResourceAllocatedHandler {
func newWorkloadResourceAllocatedHandler(calcium *Calcium) *WorkloadResourceAllocatedHandler {
return &WorkloadResourceAllocatedHandler{
event: eventWorkloadResourceAllocated,
calcium: cal,
typ: eventWorkloadResourceAllocated,
calcium: calcium,
}
}

// Event .
func (h *WorkloadResourceAllocatedHandler) Event() string {
return h.event
func (h *WorkloadResourceAllocatedHandler) Typ() string {
return h.typ
}

// Check .
Expand Down Expand Up @@ -276,20 +255,20 @@ func (h *WorkloadResourceAllocatedHandler) Handle(ctx context.Context, raw inter

// ProcessingCreatedHandler .
type ProcessingCreatedHandler struct {
event string
typ string
calcium *Calcium
}

func newProcessingCreatedHandler(cal *Calcium) *ProcessingCreatedHandler {
func newProcessingCreatedHandler(calcium *Calcium) *ProcessingCreatedHandler {
return &ProcessingCreatedHandler{
event: eventProcessingCreated,
calcium: cal,
typ: eventProcessingCreated,
calcium: calcium,
}
}

// Event .
func (h *ProcessingCreatedHandler) Event() string {
return h.event
func (h *ProcessingCreatedHandler) Typ() string {
return h.typ
}

// Check .
Expand Down Expand Up @@ -329,3 +308,7 @@ func (h *ProcessingCreatedHandler) Handle(ctx context.Context, raw interface{})
logger.Infof(ctx, "obsolete processing deleted")
return
}

func getReplayContext(ctx context.Context) (context.Context, context.CancelFunc) {
return context.WithTimeout(ctx, time.Second*32) // TODO why 32?
}
Loading

0 comments on commit 43d7381

Please sign in to comment.