From 7dfa6731974ab77dbe70c3c7616efa2a0794e927 Mon Sep 17 00:00:00 2001 From: anrs Date: Wed, 17 Feb 2021 11:58:15 +0800 Subject: [PATCH] feat: WAL for creating lambda --- cluster/calcium/calcium.go | 9 ++- cluster/calcium/calcium_test.go | 45 +++++++++-- cluster/calcium/create_test.go | 133 +++++++++++++++++--------------- cluster/calcium/lambda.go | 32 +++++++- cluster/calcium/lambda_test.go | 56 ++++++++++++++ cluster/calcium/wal.go | 125 ++++++++++++++++++++++++++++++ cluster/calcium/wal_test.go | 69 +++++++++++++++++ types/errors.go | 1 + wal/hydro.go | 8 +- wal/hydro_test.go | 28 ++++--- wal/mocks/WAL.go | 32 +++++++- wal/wal.go | 52 +++++++++---- wal/wal_test.go | 12 +-- 13 files changed, 498 insertions(+), 104 deletions(-) create mode 100644 cluster/calcium/lambda_test.go create mode 100644 cluster/calcium/wal.go create mode 100644 cluster/calcium/wal_test.go diff --git a/cluster/calcium/calcium.go b/cluster/calcium/calcium.go index 7f5096f37..ca0e4a1d7 100644 --- a/cluster/calcium/calcium.go +++ b/cluster/calcium/calcium.go @@ -24,6 +24,7 @@ type Calcium struct { scheduler scheduler.Scheduler source source.Source watcher discovery.Service + wal *WAL } // New returns a new cluster config @@ -52,11 +53,17 @@ func New(config types.Config, embeddedStorage bool) (*Calcium, error) { default: log.Warn("[Calcium] SCM not set, build API disabled") } + if err != nil { + log.Errorf("[Calcium] SCM failed: %v", err) + return nil, err + } // set watcher watcher := helium.New(config.GRPCConfig, store) - return &Calcium{store: store, config: config, scheduler: potassium, source: scm, watcher: watcher}, err + cal := &Calcium{store: store, config: config, scheduler: potassium, source: scm, watcher: watcher} + cal.wal, err = newCalciumWAL(cal) + return cal, err } // Finalizer use for defer diff --git a/cluster/calcium/calcium_test.go b/cluster/calcium/calcium_test.go index 45f1b19b8..d50e67f31 100644 --- a/cluster/calcium/calcium_test.go +++ b/cluster/calcium/calcium_test.go
@@ -3,6 +3,8 @@ package calcium import ( "context" "io/ioutil" + "os" + "path/filepath" "sync" "testing" "time" @@ -32,6 +34,11 @@ func (d *dummyLock) Unlock(ctx context.Context) error { } func NewTestCluster() *Calcium { + walDir, err := ioutil.TempDir(os.TempDir(), "core.wal.*") + if err != nil { + panic(err) + } + c := &Calcium{} c.config = types.Config{ GlobalTimeout: 30 * time.Second, @@ -42,34 +49,60 @@ func NewTestCluster() *Calcium { MaxShare: -1, ShareBase: 100, }, + WALFile: filepath.Join(walDir, "core.wal.log"), } c.store = &storemocks.Store{} c.scheduler = &schedulermocks.Scheduler{} c.source = &sourcemocks.Source{} + + wal, err := newCalciumWAL(c) + if err != nil { + panic(err) + } + c.wal = wal + return c } func TestNewCluster(t *testing.T) { - _, err := New(types.Config{}, false) + config := types.Config{WALFile: "/tmp/a"} + _, err := New(config, false) assert.Error(t, err) - c, err := New(types.Config{}, true) + + c, err := New(config, true) assert.NoError(t, err) + c.Finalizer() privFile, err := ioutil.TempFile("", "priv") assert.NoError(t, err) _, err = privFile.WriteString("privkey") assert.NoError(t, err) defer privFile.Close() + + config.Git = types.GitConfig{PrivateKey: privFile.Name()} + + var wg sync.WaitGroup + wg.Add(1) go func() { - c, err := New(types.Config{Git: types.GitConfig{SCMType: "gitlab", PrivateKey: privFile.Name()}}, true) - assert.NoError(t, err) + defer wg.Done() + config.Git.SCMType = "gitlab" + config.WALFile = "/tmp/b" + c, err := New(config, true) + assert.NoError(t, err, err) c.Finalizer() }() + + wg.Add(1) go func() { - c, err := New(types.Config{Git: types.GitConfig{SCMType: "github", PrivateKey: privFile.Name()}}, true) - assert.NoError(t, err) + defer wg.Done() + config.WALFile = "/tmp/c" + config.Git.SCMType = "github" + c, err := New(config, true) + assert.NoError(t, err, err) c.Finalizer() }() + + wg.Wait() } func TestFinalizer(t *testing.T) { diff --git a/cluster/calcium/create_test.go 
b/cluster/calcium/create_test.go index 14f3b7a4c..adfc4fee3 100644 --- a/cluster/calcium/create_test.go +++ b/cluster/calcium/create_test.go @@ -81,7 +81,7 @@ func TestCreateWorkload(t *testing.T) { } func TestCreateWorkloadTxn(t *testing.T) { - c := NewTestCluster() + c, nodes := newCreateWorkloadCluster(t) ctx := context.Background() opts := &types.DeployOptions{ Name: "zc:name", @@ -94,69 +94,12 @@ func TestCreateWorkloadTxn(t *testing.T) { Name: "good-entrypoint", }, } - store := &storemocks.Store{} - sche := &schedulermocks.Scheduler{} - scheduler.InitSchedulerV1(sche) - c.store = store - c.scheduler = sche - engine := &enginemocks.API{} - - pod1 := &types.Pod{Name: "p1"} - node1 := &types.Node{ - NodeMeta: types.NodeMeta{ - Name: "n1", - }, - Engine: engine, - } - node2 := &types.Node{ - NodeMeta: types.NodeMeta{ - Name: "n2", - }, - Engine: engine, - } - nodes := []*types.Node{node1, node2} - - store.On("SaveProcessing", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) - store.On("UpdateProcessing", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) - store.On("DeleteProcessing", mock.Anything, mock.Anything, mock.Anything).Return(nil) - // doAllocResource fails: MakeDeployStatus - lock := &lockmocks.DistributedLock{} - lock.On("Lock", mock.Anything).Return(context.Background(), nil) - lock.On("Unlock", mock.Anything).Return(nil) - store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil) - store.On("GetPod", mock.Anything, mock.Anything).Return(pod1, nil) - store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nodes, nil) - store.On("GetNode", - mock.AnythingOfType("*context.emptyCtx"), - mock.AnythingOfType("string"), - ).Return( - func(_ context.Context, name string) (node *types.Node) { - node = node1 - if name == "n2" { - node = node2 - } - return - }, nil) - sche.On("SelectStorageNodes", mock.AnythingOfType("[]resourcetypes.ScheduleInfo"), 
mock.AnythingOfType("int64")).Return(func(scheduleInfos []resourcetypes.ScheduleInfo, _ int64) []resourcetypes.ScheduleInfo { - return scheduleInfos - }, len(nodes), nil) - sche.On("SelectStorageNodes", mock.AnythingOfType("[]types.ScheduleInfo"), mock.AnythingOfType("int64")).Return(func(scheduleInfos []resourcetypes.ScheduleInfo, _ int64) []resourcetypes.ScheduleInfo { - return scheduleInfos - }, len(nodes), nil) - sche.On("SelectVolumeNodes", mock.AnythingOfType("[]types.ScheduleInfo"), mock.AnythingOfType("types.VolumeBindings")).Return(func(scheduleInfos []resourcetypes.ScheduleInfo, _ types.VolumeBindings) []resourcetypes.ScheduleInfo { - return scheduleInfos - }, nil, len(nodes), nil) - sche.On("SelectMemoryNodes", mock.AnythingOfType("[]types.ScheduleInfo"), mock.AnythingOfType("float64"), mock.AnythingOfType("int64")).Return( - func(scheduleInfos []resourcetypes.ScheduleInfo, _ float64, _ int64) []resourcetypes.ScheduleInfo { - for i := range scheduleInfos { - scheduleInfos[i].Capacity = 1 - } - return scheduleInfos - }, len(nodes), nil) + store := c.store.(*storemocks.Store) store.On("MakeDeployStatus", mock.Anything, mock.Anything, mock.Anything).Return( errors.Wrap(context.DeadlineExceeded, "MakeDeployStatus"), ).Once() + ch, err := c.CreateWorkload(ctx, opts) assert.Nil(t, err) cnt := 0 @@ -168,7 +111,6 @@ func TestCreateWorkloadTxn(t *testing.T) { assert.EqualValues(t, 1, cnt) // commit resource changes fails: UpdateNodes - store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nodes, nil) store.On("MakeDeployStatus", mock.Anything, mock.Anything, mock.Anything).Return(nil) old := strategy.Plans[strategy.Auto] strategy.Plans[strategy.Auto] = func(sis []strategy.Info, need, total, _ int) (map[string]int, error) { @@ -191,6 +133,7 @@ func TestCreateWorkloadTxn(t *testing.T) { assert.Error(t, m.Error, "UpdateNodes1") } assert.EqualValues(t, 1, cnt) + node1, node2 := nodes[0], nodes[1] assert.EqualValues(t, 1, 
node1.CPUUsed) assert.EqualValues(t, 1, node2.CPUUsed) node1.CPUUsed = 0 @@ -221,6 +164,7 @@ func TestCreateWorkloadTxn(t *testing.T) { } return }, nil) + engine := node1.Engine.(*enginemocks.API) engine.On("ImageLocalDigests", mock.Anything, mock.Anything).Return(nil, errors.Wrap(context.DeadlineExceeded, "ImageLocalDigest")).Twice() engine.On("ImagePull", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.Wrap(context.DeadlineExceeded, "ImagePull")).Twice() store.On("UpdateProcessing", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) @@ -299,3 +243,70 @@ func TestCreateWorkloadTxn(t *testing.T) { assert.EqualValues(t, 1, node1.CPUUsed+node2.CPUUsed) return } + +func newCreateWorkloadCluster(t *testing.T) (*Calcium, []*types.Node) { + c := NewTestCluster() + c.store = &storemocks.Store{} + c.scheduler = &schedulermocks.Scheduler{} + scheduler.InitSchedulerV1(c.scheduler) + + engine := &enginemocks.API{} + pod1 := &types.Pod{Name: "p1"} + node1 := &types.Node{ + NodeMeta: types.NodeMeta{ + Name: "n1", + }, + Engine: engine, + } + node2 := &types.Node{ + NodeMeta: types.NodeMeta{ + Name: "n2", + }, + Engine: engine, + } + nodes := []*types.Node{node1, node2} + + store := c.store.(*storemocks.Store) + store.On("SaveProcessing", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) + store.On("UpdateProcessing", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) + store.On("DeleteProcessing", mock.Anything, mock.Anything, mock.Anything).Return(nil) + + // doAllocResource fails: MakeDeployStatus + lock := &lockmocks.DistributedLock{} + lock.On("Lock", mock.Anything).Return(context.Background(), nil) + lock.On("Unlock", mock.Anything).Return(nil) + store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil) + store.On("GetPod", mock.Anything, mock.Anything).Return(pod1, nil) + store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nodes, nil) + 
store.On("GetNode", + mock.AnythingOfType("*context.emptyCtx"), + mock.AnythingOfType("string"), + ).Return( + func(_ context.Context, name string) (node *types.Node) { + node = node1 + if name == "n2" { + node = node2 + } + return + }, nil) + + sche := c.scheduler.(*schedulermocks.Scheduler) + sche.On("SelectStorageNodes", mock.AnythingOfType("[]resourcetypes.ScheduleInfo"), mock.AnythingOfType("int64")).Return(func(scheduleInfos []resourcetypes.ScheduleInfo, _ int64) []resourcetypes.ScheduleInfo { + return scheduleInfos + }, len(nodes), nil) + sche.On("SelectStorageNodes", mock.AnythingOfType("[]types.ScheduleInfo"), mock.AnythingOfType("int64")).Return(func(scheduleInfos []resourcetypes.ScheduleInfo, _ int64) []resourcetypes.ScheduleInfo { + return scheduleInfos + }, len(nodes), nil) + sche.On("SelectVolumeNodes", mock.AnythingOfType("[]types.ScheduleInfo"), mock.AnythingOfType("types.VolumeBindings")).Return(func(scheduleInfos []resourcetypes.ScheduleInfo, _ types.VolumeBindings) []resourcetypes.ScheduleInfo { + return scheduleInfos + }, nil, len(nodes), nil) + sche.On("SelectMemoryNodes", mock.AnythingOfType("[]types.ScheduleInfo"), mock.AnythingOfType("float64"), mock.AnythingOfType("int64")).Return( + func(scheduleInfos []resourcetypes.ScheduleInfo, _ float64, _ int64) []resourcetypes.ScheduleInfo { + for i := range scheduleInfos { + scheduleInfos[i].Capacity = 1 + } + return scheduleInfos + }, len(nodes), nil) + + return c, nodes +} diff --git a/cluster/calcium/lambda.go b/cluster/calcium/lambda.go index 41760c85b..f1399549a 100644 --- a/cluster/calcium/lambda.go +++ b/cluster/calcium/lambda.go @@ -7,14 +7,20 @@ import ( "strconv" "sync" + "github.com/google/uuid" + enginetypes "github.com/projecteru2/core/engine/types" "github.com/projecteru2/core/log" "github.com/projecteru2/core/strategy" "github.com/projecteru2/core/types" "github.com/projecteru2/core/utils" + "github.com/projecteru2/core/wal" ) 
-const exitDataPrefix = "[exitcode] " +const ( + exitDataPrefix = "[exitcode] " + labelLambdaID = "LambdaID" +) // RunAndWait implement lambda func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inCh <-chan []byte) (<-chan *types.AttachWorkloadMessage, error) { @@ -28,6 +34,10 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC return nil, types.ErrRunAndWaitCountOneWithStdin } + commit, err := c.walCreateLambda(ctx, opts) + if err != nil { + return nil, err + } createChan, err := c.CreateWorkload(ctx, opts) if err != nil { log.Errorf("[RunAndWait] Create workload error %s", err) @@ -113,8 +123,28 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC go func() { defer close(runMsgCh) wg.Wait() + if err := commit(context.Background()); err != nil { + log.Errorf("[RunAndWait] Commit WAL %s failed", eventCreateLambda) + } log.Info("[RunAndWait] Finish run and wait for workloads") }() return runMsgCh, nil } + +func (c *Calcium) walCreateLambda(ctx context.Context, opts *types.DeployOptions) (wal.Commit, error) { + uid, err := uuid.NewRandom() + if err != nil { + return nil, err + } + + lambdaID := uid.String() + + if opts.Labels != nil { + opts.Labels[labelLambdaID] = lambdaID + } else { + opts.Labels = map[string]string{labelLambdaID: lambdaID} + } + + return c.wal.logCreateLambda(ctx, opts) +} diff --git a/cluster/calcium/lambda_test.go b/cluster/calcium/lambda_test.go new file mode 100644 index 000000000..92de78cd0 --- /dev/null +++ b/cluster/calcium/lambda_test.go @@ -0,0 +1,56 @@ +package calcium + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + storemocks "github.com/projecteru2/core/store/mocks" + "github.com/projecteru2/core/strategy" + "github.com/projecteru2/core/types" + "github.com/projecteru2/core/wal" + walmocks 
"github.com/projecteru2/core/wal/mocks" +) + +func TestRunAndWaitFailedThenWALCommitted(t *testing.T) { + c, _ := newCreateWorkloadCluster(t) + c.wal = &WAL{WAL: &walmocks.WAL{}} + + mwal := c.wal.WAL.(*walmocks.WAL) + defer mwal.AssertExpectations(t) + var walCommitted bool + commit := wal.Commit(func(context.Context) error { + walCommitted = true + return nil + }) + mwal.On("Log", mock.Anything, eventCreateLambda, mock.Anything).Return(commit, nil).Once() + + opts := &types.DeployOptions{ + Name: "zc:name", + Count: 2, + DeployStrategy: strategy.Auto, + Podname: "p1", + ResourceOpts: types.ResourceOptions{CPUQuotaLimit: 1}, + Image: "zc:test", + Entrypoint: &types.Entrypoint{ + Name: "good-entrypoint", + }, + } + + mstore := c.store.(*storemocks.Store) + mstore.On("MakeDeployStatus", mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("err")).Once() + + ch, err := c.RunAndWait(context.Background(), opts, make(chan []byte)) + require.NoError(t, err) + require.NotNil(t, ch) + require.False(t, walCommitted) + require.Nil(t, <-ch) // recv nil due to the ch will be closed. + + lambdaID, exists := opts.Labels[labelLambdaID] + require.True(t, exists) + require.True(t, len(lambdaID) > 1) + require.True(t, walCommitted) +} diff --git a/cluster/calcium/wal.go b/cluster/calcium/wal.go new file mode 100644 index 000000000..fa1672b1d --- /dev/null +++ b/cluster/calcium/wal.go @@ -0,0 +1,125 @@ +package calcium + +import ( + "context" + "encoding/json" + + "github.com/projecteru2/core/log" + "github.com/projecteru2/core/types" + "github.com/projecteru2/core/wal" +) + +const ( + eventCreateLambda = "create-lambda" +) + +// WAL for calcium. 
+type WAL struct { + wal.WAL + config types.Config + calcium *Calcium +} + +func newCalciumWAL(cal *Calcium) (*WAL, error) { + w := &WAL{ + WAL: wal.NewHydro(), + config: cal.config, + calcium: cal, + } + if err := w.WAL.Open(context.Background(), w.config.WALFile, w.config.WALOpenTimeout); err != nil { + return nil, err + } + + w.registerHandlers() + + return w, nil +} + +func (w *WAL) registerHandlers() { + w.Register(newCreateLambdaHandler(w.calcium)) +} + +func (w *WAL) logCreateLambda(ctx context.Context, opts *types.DeployOptions) (wal.Commit, error) { + return w.Log(ctx, eventCreateLambda, &types.ListWorkloadsOptions{ + Appname: opts.Name, + Entrypoint: opts.Entrypoint.Name, + Labels: map[string]string{labelLambdaID: opts.Labels[labelLambdaID]}, + }) +} + +// CreateLambdaHandler indicates event handler for creating lambda. +type CreateLambdaHandler struct { + event string + calcium *Calcium +} + +func newCreateLambdaHandler(cal *Calcium) *CreateLambdaHandler { + return &CreateLambdaHandler{ + event: eventCreateLambda, + calcium: cal, + } +} + +// Event . +func (h *CreateLambdaHandler) Event() string { + return h.event +} + +// Check . +func (h *CreateLambdaHandler) Check(interface{}) (bool, error) { + return true, nil +} + +// Encode . +func (h *CreateLambdaHandler) Encode(raw interface{}) ([]byte, error) { + opts, ok := raw.(*types.ListWorkloadsOptions) + if !ok { + return nil, types.NewDetailedErr(types.ErrInvalidType, raw) + } + return json.Marshal(opts) +} + +// Decode . +func (h *CreateLambdaHandler) Decode(bs []byte) (interface{}, error) { + opts := &types.ListWorkloadsOptions{} + err := json.Unmarshal(bs, opts) + return opts, err +} + +// Handle . 
+func (h *CreateLambdaHandler) Handle(raw interface{}) error { + opts, ok := raw.(*types.ListWorkloadsOptions) + if !ok { + return types.NewDetailedErr(types.ErrInvalidType, raw) + } + + workloadIDs, err := h.getWorkloadIDs(opts) + if err != nil { + log.Errorf("[CreateLambdaHandler.Handle] Get workloads %s/%s/%v failed: %v", + opts.Appname, opts.Entrypoint, opts.Labels, err) + return err + } + + if err := h.calcium.doRemoveWorkloadSync(context.Background(), workloadIDs); err != nil { + log.Errorf("[CreateLambdaHandler.Handle] Remove lambda %v failed: %v", opts, err) + return err + } + + log.Infof("[CreateLambdaHandler.Handle] Lambda %v removed", opts) + + return nil +} + +func (h *CreateLambdaHandler) getWorkloadIDs(opts *types.ListWorkloadsOptions) ([]string, error) { + workloads, err := h.calcium.ListWorkloads(context.Background(), opts) + if err != nil { + return nil, err + } + + workloadIDs := make([]string, len(workloads)) + for i, wrk := range workloads { + workloadIDs[i] = wrk.ID + } + + return workloadIDs, nil +} diff --git a/cluster/calcium/wal_test.go b/cluster/calcium/wal_test.go new file mode 100644 index 000000000..6bc11b2d0 --- /dev/null +++ b/cluster/calcium/wal_test.go @@ -0,0 +1,69 @@ +package calcium + +import ( + "context" + "testing" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + enginemocks "github.com/projecteru2/core/engine/mocks" + lockmocks "github.com/projecteru2/core/lock/mocks" + storemocks "github.com/projecteru2/core/store/mocks" + "github.com/projecteru2/core/types" +) + +func TestHandleCreateLambda(t *testing.T) { + c := NewTestCluster() + deployOpts := &types.DeployOptions{ + Name: "appname", + Entrypoint: &types.Entrypoint{Name: "entry"}, + Labels: map[string]string{labelLambdaID: "lambda"}, + } + _, err := c.wal.logCreateLambda(context.Background(), deployOpts) + require.NoError(t, err) + + node := &types.Node{ + NodeMeta: types.NodeMeta{Name: 
"nodename"}, + Engine: &enginemocks.API{}, + } + wrk := &types.Workload{ + ID: "workloadid", + Nodename: node.Name, + Engine: node.Engine, + } + + store := c.store.(*storemocks.Store) + defer store.AssertExpectations(t) + store.On("ListWorkloads", mock.Anything, deployOpts.Name, deployOpts.Entrypoint.Name, "", int64(0), deployOpts.Labels). + Return([]*types.Workload{wrk}, nil). + Once() + store.On("GetWorkloads", mock.Anything, []string{wrk.ID}). + Return([]*types.Workload{wrk}, nil). + Once() + store.On("GetNode", mock.Anything, wrk.Nodename). + Return(node, nil) + + eng := wrk.Engine.(*enginemocks.API) + defer eng.AssertExpectations(t) + eng.On("VirtualizationRemove", mock.Anything, wrk.ID, true, true). + Return(nil). + Once() + + store.On("RemoveWorkload", mock.Anything, wrk). + Return(nil). + Once() + store.On("UpdateNodeResource", mock.Anything, node, mock.Anything, mock.Anything). + Return(nil). + Once() + + lock := &lockmocks.DistributedLock{} + lock.On("Lock", mock.Anything).Return(context.Background(), nil) + lock.On("Unlock", mock.Anything).Return(nil) + store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil) + + c.wal.Recover(context.Background()) + + // Recovered nothing. + c.wal.Recover(context.Background()) +} diff --git a/types/errors.go b/types/errors.go index 8518001e1..987f24d8d 100644 --- a/types/errors.go +++ b/types/errors.go @@ -84,6 +84,7 @@ var ( ErrUnregisteredWALEventType = errors.New("unregistered WAL event type") ErrInvalidWALBucket = errors.New("invalid WAL bucket") + ErrInvalidType = errors.New("invalid type") ErrLockSessionDone = errors.New("lock session done") ) diff --git a/wal/hydro.go b/wal/hydro.go index 0c4417521..0b6f6b98c 100644 --- a/wal/hydro.go +++ b/wal/hydro.go @@ -38,7 +38,7 @@ func (h *Hydro) Close(ctx context.Context) error { // Register registers a new event handler. 
func (h *Hydro) Register(handler EventHandler) { - h.handlers.Store(handler.Event, handler) + h.handlers.Store(handler.Event(), handler) } // Recover starts a disaster recovery, which will replay all the events. @@ -78,7 +78,11 @@ func (h *Hydro) recover(ctx context.Context, handler EventHandler, event HydroEv return event.Delete(ctx) } - return handler.Handle(item) + if err := handler.Handle(item); err != nil { + return err + } + + return event.Delete(ctx) } // Log records a log item. diff --git a/wal/hydro_test.go b/wal/hydro_test.go index a92e675e3..9f82188d5 100644 --- a/wal/hydro_test.go +++ b/wal/hydro_test.go @@ -21,7 +21,7 @@ func TestLogFailedAsEncodeError(t *testing.T) { var checked, handled, encoded, decoded bool eventype := "create" handler := newTestEventHandler(eventype, &checked, &handled, &encoded, &decoded) - handler.Encode = func(interface{}) ([]byte, error) { return nil, fmt.Errorf("encode error") } + handler.encode = func(interface{}) ([]byte, error) { return nil, fmt.Errorf("encode error") } hydro := NewHydro() hydro.kv = kv.NewMockedKV() @@ -82,7 +82,7 @@ func TestRecoverFailedAsCheckError(t *testing.T) { var checked, handled, encoded, decoded bool eventype := "create" handler := newTestEventHandler(eventype, &checked, &handled, &encoded, &decoded) - handler.Check = func(interface{}) (bool, error) { + handler.check = func(interface{}) (bool, error) { checked = true return false, fmt.Errorf("check error") } @@ -129,7 +129,7 @@ func TestRecoverFailedAsDecodeLogError(t *testing.T) { var checked, handled, encoded, decoded bool eventype := "create" handler := newTestEventHandler(eventype, &checked, &handled, &encoded, &decoded) - handler.Decode = func([]byte) (interface{}, error) { + handler.decode = func([]byte) (interface{}, error) { decoded = true return nil, fmt.Errorf("decode error") } @@ -158,7 +158,7 @@ func TestHydroRecoverDiscardNoNeedEvent(t *testing.T) { eventype := "create" handler := newTestEventHandler(eventype, &checked, &handled, 
&encoded, &decoded) - handler.Check = check + handler.check = check hydro := NewHydro() hydro.kv = kv.NewMockedKV() @@ -193,6 +193,12 @@ func TestHydroRecover(t *testing.T) { require.True(t, decoded) require.True(t, checked) require.True(t, handled) + + // The handled events should be removed. + ch, _ := hydro.kv.Scan(context.Background(), []byte(EventPrefix)) + for range ch { + require.Fail(t, "the events should be deleted") + } } func TestHydroEventKeyMustPadZero(t *testing.T) { @@ -206,7 +212,7 @@ func TestHydroEventParseIDShouldRemovePadding(t *testing.T) { require.Equal(t, uint64(15), id) } -func newTestEventHandler(eventype string, checked, handled, encoded, decoded *bool) EventHandler { +func newTestEventHandler(eventype string, checked, handled, encoded, decoded *bool) SimpleEventHandler { check := func(interface{}) (bool, error) { *checked = true return true, nil @@ -227,11 +233,11 @@ func newTestEventHandler(eventype string, checked, handled, encoded, decoded *bo return } - return EventHandler{ - Event: eventype, - Encode: encode, - Decode: decode, - Check: check, - Handle: handle, + return SimpleEventHandler{ + event: eventype, + encode: encode, + decode: decode, + check: check, + handle: handle, } } diff --git a/wal/mocks/WAL.go b/wal/mocks/WAL.go index 47b1b4f47..e80eb8485 100644 --- a/wal/mocks/WAL.go +++ b/wal/mocks/WAL.go @@ -4,9 +4,11 @@ package mocks import ( context "context" + time "time" - wal "github.com/projecteru2/core/wal" mock "github.com/stretchr/testify/mock" + + wal "github.com/projecteru2/core/wal" ) // WAL is an autogenerated mock type for the WAL type @@ -14,6 +16,20 @@ type WAL struct { mock.Mock } +// Close provides a mock function with given fields: _a0 +func (_m *WAL) Close(_a0 context.Context) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // Log provides a mock function 
with given fields: _a0, _a1, _a2 func (_m *WAL) Log(_a0 context.Context, _a1 string, _a2 interface{}) (wal.Commit, error) { ret := _m.Called(_a0, _a1, _a2) @@ -37,6 +53,20 @@ func (_m *WAL) Log(_a0 context.Context, _a1 string, _a2 interface{}) (wal.Commit return r0, r1 } +// Open provides a mock function with given fields: _a0, _a1, _a2 +func (_m *WAL) Open(_a0 context.Context, _a1 string, _a2 time.Duration) error { + ret := _m.Called(_a0, _a1, _a2) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, time.Duration) error); ok { + r0 = rf(_a0, _a1, _a2) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // Recover provides a mock function with given fields: _a0 func (_m *WAL) Recover(_a0 context.Context) { _m.Called(_a0) diff --git a/wal/wal.go b/wal/wal.go index 8260a6216..4a386576f 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -39,26 +39,48 @@ type OpenCloser interface { Close(context.Context) error } -// EventHandler indicates a handler of a specific event. -type EventHandler struct { - Event string - Check Check - Encode Encode - Decode Decode - Handle Handle +// SimpleEventHandler simply implements the EventHandler. +type SimpleEventHandler struct { + event string + check func(raw interface{}) (bool, error) + encode func(interface{}) ([]byte, error) + decode func([]byte) (interface{}, error) + handle func(interface{}) error } -// Encode is a function to encode a log item -type Encode func(interface{}) ([]byte, error) +// Event . +func (h SimpleEventHandler) Event() string { + return h.event +} + +// Check . +func (h SimpleEventHandler) Check(raw interface{}) (bool, error) { + return h.check(raw) +} + +// Encode . +func (h SimpleEventHandler) Encode(raw interface{}) ([]byte, error) { + return h.encode(raw) +} -// Decode is a function to decode bytes to an interface{} -type Decode func([]byte) (interface{}, error) +// Decode . 
+func (h SimpleEventHandler) Decode(bs []byte) (interface{}, error) { + return h.decode(bs) +} -// Handle is a function to play a log item. -type Handle func(interface{}) error +// Handle . +func (h SimpleEventHandler) Handle(raw interface{}) error { + return h.handle(raw) +} -// Check is a function for checking a log item whether need to be played it. -type Check func(interface{}) (need bool, err error) +// EventHandler is the interface that groups a few methods. +type EventHandler interface { + Event() string + Check(interface{}) (need bool, err error) + Encode(interface{}) ([]byte, error) + Decode([]byte) (interface{}, error) + Handle(interface{}) error +} // Commit is a function for committing an event log. type Commit func(context.Context) error diff --git a/wal/wal_test.go b/wal/wal_test.go index 74d9d70b3..52d05ff2a 100644 --- a/wal/wal_test.go +++ b/wal/wal_test.go @@ -49,12 +49,12 @@ func TestRecover(t *testing.T) { eventype := "create" - Register(EventHandler{ - Event: eventype, - Encode: encode, - Decode: decode, - Check: check, - Handle: handle, + Register(SimpleEventHandler{ + event: eventype, + encode: encode, + decode: decode, + check: check, + handle: handle, }) Log(context.Background(), eventype, struct{}{})