From 38221ae57873c71fe0f698c617cf8369baaaaf1d Mon Sep 17 00:00:00 2001 From: CMGS Date: Fri, 11 Mar 2022 13:49:29 +0800 Subject: [PATCH] refactor wal2 --- 3rdmocks/ServerStream.go | 2 +- Makefile | 1 + cluster/calcium/calcium.go | 2 +- cluster/calcium/lambda.go | 7 +- cluster/calcium/wal.go | 200 +++++++++-------- cluster/calcium/wal_test.go | 12 +- engine/docker/mocks/APIClient.go | 2 +- engine/mocks/API.go | 2 +- lock/mocks/DistributedLock.go | 2 +- rpc/mocks/CoreRPC_RunAndWaitServer.go | 2 +- scheduler/mocks/Scheduler.go | 2 +- source/mocks/Source.go | 2 +- store/etcdv3/meta/mocks/ETCDClientV3.go | 2 +- store/etcdv3/meta/mocks/KV.go | 2 +- store/etcdv3/meta/mocks/Txn.go | 2 +- store/mocks/Store.go | 2 +- types/errors.go | 1 + wal/hydro.go | 113 +++++----- wal/hydro_event.go | 62 ------ wal/hydro_test.go | 179 ++++++++-------- wal/kv/lithium.go | 6 - wal/kv/mocked.go | 68 +++--- wal/mocks/WAL.go | 20 +- wal/wal.go | 47 +--- wal/wal_test.go | 58 ++++- wal2/event.go | 42 ++++ wal2/hydro.go | 146 +++++++++++++ wal2/hydro_test.go | 271 ++++++++++++++++++++++++ wal2/wal.go | 49 +++++ wal2/wal_test.go | 104 +++++++++ 30 files changed, 960 insertions(+), 450 deletions(-) delete mode 100644 wal/hydro_event.go create mode 100644 wal2/event.go create mode 100644 wal2/hydro.go create mode 100644 wal2/hydro_test.go create mode 100644 wal2/wal.go create mode 100644 wal2/wal_test.go diff --git a/3rdmocks/ServerStream.go b/3rdmocks/ServerStream.go index 3f2cebdfc..66087d461 100644 --- a/3rdmocks/ServerStream.go +++ b/3rdmocks/ServerStream.go @@ -1,4 +1,4 @@ -// Code generated by mockery v0.0.0-dev. DO NOT EDIT. +// Code generated by mockery v2.10.0. DO NOT EDIT. package mocks diff --git a/Makefile b/Makefile index b9a620496..30fc33116 100644 --- a/Makefile +++ b/Makefile @@ -36,6 +36,7 @@ mock: deps mockery --dir store --output store/mocks --name Store mockery --dir engine --output engine/mocks --name API mockery --dir cluster --output cluster/mocks --name Cluster + mockery --dir wal --output wal/mocks --name WAL mockery --dir lock --output lock/mocks --name DistributedLock mockery --dir store/etcdv3/meta --output store/etcdv3/meta/mocks --all mockery --dir vendor/go.etcd.io/etcd/client/v3 --output store/etcdv3/meta/mocks --name Txn diff --git a/cluster/calcium/calcium.go b/cluster/calcium/calcium.go index 51f295e3c..cfb36601c 100644 --- a/cluster/calcium/calcium.go +++ b/cluster/calcium/calcium.go @@ -68,7 +68,7 @@ func New(config types.Config, t *testing.T) (*Calcium, error) { watcher := helium.New(config.GRPCConfig, store) cal := &Calcium{store: store, config: config, scheduler: potassium, source: scm, watcher: watcher} - cal.wal, err = newCalciumWAL(cal) + cal.wal, err = newWAL(config, cal) cal.identifier = config.Identifier() return cal, logger.Err(nil, errors.WithStack(err)) //nolint diff --git a/cluster/calcium/lambda.go b/cluster/calcium/lambda.go index 90449a193..1fa5d4112 100644 --- a/cluster/calcium/lambda.go +++ b/cluster/calcium/lambda.go @@ -13,7 +13,6 @@ import ( "github.com/projecteru2/core/strategy" "github.com/projecteru2/core/types" "github.com/projecteru2/core/utils" - "github.com/projecteru2/core/wal" "github.com/pkg/errors" ) @@ -68,7 +67,7 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC } } - commit, err := c.walCreateLambda(message) + commit, err := c.wal.Log(eventCreateLambda, message.WorkloadID) if err != nil { return &types.AttachWorkloadMessage{ WorkloadID: message.WorkloadID, @@ -197,7 +196,3 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC return workloadIDs, runMsgCh, nil } - -func (c *Calcium) walCreateLambda(opts *types.CreateWorkloadMessage) (wal.Commit, error) { - return c.wal.logCreateLambda(opts) -} diff --git a/cluster/calcium/wal.go b/cluster/calcium/wal.go index e97c5cab0..cba192a46 100644 --- a/cluster/calcium/wal.go +++ b/cluster/calcium/wal.go @@ -22,23 +22,21 @@ const ( // WAL for calcium. type WAL struct { wal.WAL - config types.Config calcium *Calcium } -func newCalciumWAL(cal *Calcium) (*WAL, error) { - w := &WAL{ - WAL: wal.NewHydro(), - config: cal.config, - calcium: cal, +func newWAL(config types.Config, calcium *Calcium) (*WAL, error) { + hydro, err := wal.NewHydro(config.WALFile, config.WALOpenTimeout) + if err != nil { + return nil, err } - if err := w.WAL.Open(w.config.WALFile, w.config.WALOpenTimeout); err != nil { - return nil, err + w := &WAL{ + WAL: hydro, + calcium: calcium, } w.registerHandlers() - return w, nil } @@ -49,26 +47,92 @@ func (w *WAL) registerHandlers() { w.Register(newProcessingCreatedHandler(w.calcium)) } -func (w *WAL) logCreateLambda(opts *types.CreateWorkloadMessage) (wal.Commit, error) { - return w.Log(eventCreateLambda, opts.WorkloadID) +// CreateLambdaHandler indicates event handler for creating lambda. +type CreateLambdaHandler struct { + typ string + calcium *Calcium +} + +func newCreateLambdaHandler(calcium *Calcium) *CreateLambdaHandler { + return &CreateLambdaHandler{ + typ: eventCreateLambda, + calcium: calcium, + } +} + +// Event . +func (h *CreateLambdaHandler) Typ() string { + return h.typ +} + +// Check . +func (h *CreateLambdaHandler) Check(context.Context, interface{}) (bool, error) { + return true, nil +} + +// Encode . +func (h *CreateLambdaHandler) Encode(raw interface{}) ([]byte, error) { + workloadID, ok := raw.(string) + if !ok { + return nil, types.NewDetailedErr(types.ErrInvalidType, raw) + } + return []byte(workloadID), nil +} + +// Decode . +func (h *CreateLambdaHandler) Decode(bs []byte) (interface{}, error) { + return string(bs), nil +} + +// Handle . +func (h *CreateLambdaHandler) Handle(ctx context.Context, raw interface{}) error { + workloadID, ok := raw.(string) + if !ok { + return types.NewDetailedErr(types.ErrInvalidType, raw) + } + + logger := log.WithField("WAL.Handle", "RunAndWait").WithField("ID", workloadID) + go func() { + workload, err := h.calcium.GetWorkload(ctx, workloadID) + if err != nil { + logger.Errorf(ctx, "Get workload failed: %v", err) + return + } + + r, err := workload.Engine.VirtualizationWait(ctx, workloadID, "") + if err != nil { + logger.Errorf(ctx, "Wait failed: %+v", err) + return + } + if r.Code != 0 { + logger.Errorf(ctx, "Run failed: %s", r.Message) + } + + if err := h.calcium.doRemoveWorkloadSync(ctx, []string{workloadID}); err != nil { + logger.Errorf(ctx, "Remove failed: %+v", err) + } + logger.Infof(ctx, "waited and removed") + }() + + return nil } // CreateWorkloadHandler indicates event handler for creating workload. type CreateWorkloadHandler struct { - event string + typ string calcium *Calcium } -func newCreateWorkloadHandler(cal *Calcium) *CreateWorkloadHandler { +func newCreateWorkloadHandler(calcium *Calcium) *CreateWorkloadHandler { return &CreateWorkloadHandler{ - event: eventWorkloadCreated, - calcium: cal, + typ: eventWorkloadCreated, + calcium: calcium, } } // Event . -func (h *CreateWorkloadHandler) Event() string { - return h.event +func (h *CreateWorkloadHandler) Typ() string { + return h.typ } // Check . @@ -132,96 +196,22 @@ func (h *CreateWorkloadHandler) Handle(ctx context.Context, raw interface{}) (er return nil } -// CreateLambdaHandler indicates event handler for creating lambda. -type CreateLambdaHandler struct { - event string - calcium *Calcium -} - -func newCreateLambdaHandler(cal *Calcium) *CreateLambdaHandler { - return &CreateLambdaHandler{ - event: eventCreateLambda, - calcium: cal, - } -} - -// Event . -func (h *CreateLambdaHandler) Event() string { - return h.event -} - -// Check . -func (h *CreateLambdaHandler) Check(context.Context, interface{}) (bool, error) { - return true, nil -} - -// Encode . -func (h *CreateLambdaHandler) Encode(raw interface{}) ([]byte, error) { - workloadID, ok := raw.(string) - if !ok { - return nil, types.NewDetailedErr(types.ErrInvalidType, raw) - } - return []byte(workloadID), nil -} - -// Decode . -func (h *CreateLambdaHandler) Decode(bs []byte) (interface{}, error) { - return string(bs), nil -} - -// Handle . -func (h *CreateLambdaHandler) Handle(ctx context.Context, raw interface{}) error { - workloadID, ok := raw.(string) - if !ok { - return types.NewDetailedErr(types.ErrInvalidType, raw) - } - - logger := log.WithField("WAL.Handle", "RunAndWait").WithField("ID", workloadID) - go func() { - workload, err := h.calcium.GetWorkload(ctx, workloadID) - if err != nil { - logger.Errorf(ctx, "Get workload failed: %v", err) - return - } - - r, err := workload.Engine.VirtualizationWait(ctx, workloadID, "") - if err != nil { - logger.Errorf(ctx, "Wait failed: %+v", err) - return - } - if r.Code != 0 { - logger.Errorf(ctx, "Run failed: %s", r.Message) - } - - if err := h.calcium.doRemoveWorkloadSync(ctx, []string{workloadID}); err != nil { - logger.Errorf(ctx, "Remove failed: %+v", err) - } - logger.Infof(ctx, "waited and removed") - }() - - return nil -} - -func getReplayContext(ctx context.Context) (context.Context, context.CancelFunc) { - return context.WithTimeout(ctx, time.Second*32) -} - // WorkloadResourceAllocatedHandler . type WorkloadResourceAllocatedHandler struct { - event string + typ string calcium *Calcium } -func newWorkloadResourceAllocatedHandler(cal *Calcium) *WorkloadResourceAllocatedHandler { +func newWorkloadResourceAllocatedHandler(calcium *Calcium) *WorkloadResourceAllocatedHandler { return &WorkloadResourceAllocatedHandler{ - event: eventWorkloadResourceAllocated, - calcium: cal, + typ: eventWorkloadResourceAllocated, + calcium: calcium, } } // Event . -func (h *WorkloadResourceAllocatedHandler) Event() string { - return h.event +func (h *WorkloadResourceAllocatedHandler) Typ() string { + return h.typ } // Check . @@ -276,20 +266,20 @@ func (h *WorkloadResourceAllocatedHandler) Handle(ctx context.Context, raw inter // ProcessingCreatedHandler . type ProcessingCreatedHandler struct { - event string + typ string calcium *Calcium } -func newProcessingCreatedHandler(cal *Calcium) *ProcessingCreatedHandler { +func newProcessingCreatedHandler(calcium *Calcium) *ProcessingCreatedHandler { return &ProcessingCreatedHandler{ - event: eventProcessingCreated, - calcium: cal, + typ: eventProcessingCreated, + calcium: calcium, } } // Event . -func (h *ProcessingCreatedHandler) Event() string { - return h.event +func (h *ProcessingCreatedHandler) Typ() string { + return h.typ } // Check . @@ -329,3 +319,7 @@ func (h *ProcessingCreatedHandler) Handle(ctx context.Context, raw interface{}) logger.Infof(ctx, "obsolete processing deleted") return } + +func getReplayContext(ctx context.Context) (context.Context, context.CancelFunc) { + return context.WithTimeout(ctx, time.Second*32) // TODO why 32? +} diff --git a/cluster/calcium/wal_test.go b/cluster/calcium/wal_test.go index 24f8ee952..45f5dafc0 100644 --- a/cluster/calcium/wal_test.go +++ b/cluster/calcium/wal_test.go @@ -18,7 +18,7 @@ import ( func TestHandleCreateWorkloadNoHandle(t *testing.T) { c := NewTestCluster() - wal, err := newCalciumWAL(c) + wal, err := newWAL(c.config, c) require.NoError(t, err) c.wal = wal @@ -43,7 +43,7 @@ func TestHandleCreateWorkloadNoHandle(t *testing.T) { func TestHandleCreateWorkloadError(t *testing.T) { c := NewTestCluster() - wal, err := newCalciumWAL(c) + wal, err := newWAL(c.config, c) require.NoError(t, err) c.wal = wal @@ -90,7 +90,7 @@ func TestHandleCreateWorkloadError(t *testing.T) { func TestHandleCreateWorkloadHandled(t *testing.T) { c := NewTestCluster() - wal, err := newCalciumWAL(c) + wal, err := newWAL(c.config, c) require.NoError(t, err) c.wal = wal @@ -130,11 +130,11 @@ func TestHandleCreateWorkloadHandled(t *testing.T) { func TestHandleCreateLambda(t *testing.T) { c := NewTestCluster() - wal, err := newCalciumWAL(c) + wal, err := newWAL(c.config, c) require.NoError(t, err) c.wal = wal - _, err = c.wal.logCreateLambda(&types.CreateWorkloadMessage{WorkloadID: "workloadid"}) + _, err = c.wal.Log(eventCreateLambda, "workloadid") require.NoError(t, err) node := &types.Node{ @@ -153,7 +153,7 @@ func TestHandleCreateLambda(t *testing.T) { time.Sleep(500 * time.Millisecond) store.AssertExpectations(t) - _, err = c.wal.logCreateLambda(&types.CreateWorkloadMessage{WorkloadID: "workloadid"}) + _, err = c.wal.Log(eventCreateLambda, "workloadid") require.NoError(t, err) store.On("GetWorkload", mock.Anything, mock.Anything). Return(wrk, nil). diff --git a/engine/docker/mocks/APIClient.go b/engine/docker/mocks/APIClient.go index 3783fd42e..1515db221 100644 --- a/engine/docker/mocks/APIClient.go +++ b/engine/docker/mocks/APIClient.go @@ -1,4 +1,4 @@ -// Code generated by mockery v0.0.0-dev. DO NOT EDIT. +// Code generated by mockery v2.10.0. DO NOT EDIT. package mocks diff --git a/engine/mocks/API.go b/engine/mocks/API.go index cb994fb9c..98610ba4c 100644 --- a/engine/mocks/API.go +++ b/engine/mocks/API.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.9.4. DO NOT EDIT. +// Code generated by mockery v2.10.0. DO NOT EDIT. package mocks diff --git a/lock/mocks/DistributedLock.go b/lock/mocks/DistributedLock.go index 4b941e5f1..3415ee31e 100644 --- a/lock/mocks/DistributedLock.go +++ b/lock/mocks/DistributedLock.go @@ -1,4 +1,4 @@ -// Code generated by mockery v0.0.0-dev. DO NOT EDIT. +// Code generated by mockery v2.10.0. DO NOT EDIT. package mocks diff --git a/rpc/mocks/CoreRPC_RunAndWaitServer.go b/rpc/mocks/CoreRPC_RunAndWaitServer.go index 9377f73e7..9bea5e81a 100644 --- a/rpc/mocks/CoreRPC_RunAndWaitServer.go +++ b/rpc/mocks/CoreRPC_RunAndWaitServer.go @@ -1,4 +1,4 @@ -// Code generated by mockery v0.0.0-dev. DO NOT EDIT. +// Code generated by mockery v2.10.0. DO NOT EDIT. package mocks diff --git a/scheduler/mocks/Scheduler.go b/scheduler/mocks/Scheduler.go index 83b4f41ec..9892b71f6 100644 --- a/scheduler/mocks/Scheduler.go +++ b/scheduler/mocks/Scheduler.go @@ -1,4 +1,4 @@ -// Code generated by mockery v0.0.0-dev. DO NOT EDIT. +// Code generated by mockery v2.10.0. DO NOT EDIT. package mocks diff --git a/source/mocks/Source.go b/source/mocks/Source.go index ac84f3a54..bcc485e9a 100644 --- a/source/mocks/Source.go +++ b/source/mocks/Source.go @@ -1,4 +1,4 @@ -// Code generated by mockery v0.0.0-dev. DO NOT EDIT. +// Code generated by mockery v2.10.0. DO NOT EDIT. package mocks diff --git a/store/etcdv3/meta/mocks/ETCDClientV3.go b/store/etcdv3/meta/mocks/ETCDClientV3.go index 28a4221c8..63c94abc4 100644 --- a/store/etcdv3/meta/mocks/ETCDClientV3.go +++ b/store/etcdv3/meta/mocks/ETCDClientV3.go @@ -1,4 +1,4 @@ -// Code generated by mockery v0.0.0-dev. DO NOT EDIT. +// Code generated by mockery v2.10.0. DO NOT EDIT. package mocks diff --git a/store/etcdv3/meta/mocks/KV.go b/store/etcdv3/meta/mocks/KV.go index 6125d4552..d73abf562 100644 --- a/store/etcdv3/meta/mocks/KV.go +++ b/store/etcdv3/meta/mocks/KV.go @@ -1,4 +1,4 @@ -// Code generated by mockery v0.0.0-dev. DO NOT EDIT. +// Code generated by mockery v2.10.0. DO NOT EDIT. package mocks diff --git a/store/etcdv3/meta/mocks/Txn.go b/store/etcdv3/meta/mocks/Txn.go index 15e96cfc3..2c4fbb168 100644 --- a/store/etcdv3/meta/mocks/Txn.go +++ b/store/etcdv3/meta/mocks/Txn.go @@ -1,4 +1,4 @@ -// Code generated by mockery v0.0.0-dev. DO NOT EDIT. +// Code generated by mockery v2.10.0. DO NOT EDIT. package mocks diff --git a/store/mocks/Store.go b/store/mocks/Store.go index e01aa70eb..becfdd1f7 100644 --- a/store/mocks/Store.go +++ b/store/mocks/Store.go @@ -1,4 +1,4 @@ -// Code generated by mockery v0.0.0-dev. DO NOT EDIT. +// Code generated by mockery v2.10.0. DO NOT EDIT. package mocks diff --git a/types/errors.go b/types/errors.go index ef180d205..b0e4294bb 100644 --- a/types/errors.go +++ b/types/errors.go @@ -87,6 +87,7 @@ var ( ErrEntityNotExists = errors.New("entity not exists") ErrUnregisteredWALEventType = errors.New("unregistered WAL event type") + ErrBadWALEvent = errors.New("bad WAL event type") ErrInvalidWALBucket = errors.New("invalid WAL bucket") ErrInvalidType = errors.New("invalid type") ErrLockSessionDone = errors.New("lock session done") diff --git a/wal/hydro.go b/wal/hydro.go index 00fa481f3..a6c73560c 100644 --- a/wal/hydro.go +++ b/wal/hydro.go @@ -3,92 +3,100 @@ package wal import ( "context" "encoding/json" - "sync" "time" + "github.com/cornelk/hashmap" "github.com/projecteru2/core/log" coretypes "github.com/projecteru2/core/types" "github.com/projecteru2/core/wal/kv" ) +const ( + fileMode = 0600 +) + // Hydro is the simplest wal implementation. type Hydro struct { - handlers sync.Map - kv kv.KV + hashmap.HashMap + stor kv.KV } // NewHydro initailizes a new Hydro instance. -func NewHydro() *Hydro { - return &Hydro{ - kv: kv.NewLithium(), +func NewHydro(path string, timeout time.Duration) (*Hydro, error) { + stor := kv.NewLithium() + if err := stor.Open(path, fileMode, timeout); err != nil { + return nil, err } -} - -// Open connects a kvdb. -func (h *Hydro) Open(path string, timeout time.Duration) (err error) { - err = h.kv.Open(path, 0600, timeout) - return + return &Hydro{HashMap: hashmap.HashMap{}, stor: stor}, nil } // Close disconnects the kvdb. func (h *Hydro) Close() error { - return h.kv.Close() + return h.stor.Close() } // Register registers a new event handler. func (h *Hydro) Register(handler EventHandler) { - h.handlers.Store(handler.Event(), handler) + h.Set(handler.Typ(), handler) } // Recover starts a disaster recovery, which will replay all the events. func (h *Hydro) Recover(ctx context.Context) { - ch, _ := h.kv.Scan([]byte(EventPrefix)) + ch, _ := h.stor.Scan([]byte(eventPrefix)) events := []HydroEvent{} - for ent := range ch { - ev, err := h.decodeEvent(ent) + for scanEntry := range ch { + event, err := h.decodeEvent(scanEntry) if err != nil { - log.Errorf(nil, "[Recover] decode event error: %v", err) //nolint + log.Errorf(nil, "[Recover] decode event error: %v", err) // nolint continue } - events = append(events, ev) + events = append(events, event) } - for _, ev := range events { - handler, ok := h.getEventHandler(ev.Type) + for _, event := range events { + handler, ok := h.getEventHandler(event.Type) if !ok { - log.Errorf(nil, "[Recover] no such event handler for %s", ev.Type) //nolint + log.Errorf(nil, "[Recover] no such event handler for %s", event.Type) // nolint continue } - if err := h.recover(ctx, handler, ev); err != nil { - log.Errorf(nil, "[Recover] handle event %d (%s) failed: %v", ev.ID, ev.Type, err) //nolint + if err := h.recover(ctx, handler, event); err != nil { + log.Errorf(nil, "[Recover] handle event %d (%s) failed: %v", event.ID, event.Type, err) // nolint continue } } } // Log records a log item. -func (h *Hydro) Log(eventype string, item interface{}) (Commit, error) { - handler, ok := h.getEventHandler(eventype) +func (h *Hydro) Log(eventyp string, item interface{}) (Commit, error) { + handler, ok := h.getEventHandler(eventyp) if !ok { - return nil, coretypes.NewDetailedErr(coretypes.ErrUnregisteredWALEventType, eventype) + return nil, coretypes.NewDetailedErr(coretypes.ErrUnregisteredWALEventType, eventyp) } - bs, err := handler.Encode(item) + bs, err := handler.Encode(item) // TODO 2 times encode is necessary? if err != nil { return nil, err } - event := NewHydroEvent(h.kv) - event.Type = eventype - event.Item = bs + var id uint64 + if id, err = h.stor.NextSequence(); err != nil { + return nil, err + } + + event := NewHydroEvent(id, eventyp, bs) + if bs, err = event.Encode(); err != nil { + return nil, coretypes.ErrBadWALEvent + } - if err = event.Create(); err != nil { + if err = h.stor.Put(event.Key(), bs); err != nil { return nil, err } - return event.Delete, nil + return func() error { + return h.stor.Delete(event.Key()) + }, nil } func (h *Hydro) recover(ctx context.Context, handler EventHandler, event HydroEvent) error { @@ -97,43 +105,42 @@ func (h *Hydro) recover(ctx context.Context, handler EventHandler, event HydroEv return err } + delete := func() error { + return h.stor.Delete(event.Key()) + } + switch handle, err := handler.Check(ctx, item); { case err != nil: return err case !handle: - return event.Delete() - } - - if err := handler.Handle(ctx, item); err != nil { - return err + return delete() + default: + if err := handler.Handle(ctx, item); err != nil { + return err + } } - - return event.Delete() + return delete() } -func (h *Hydro) getEventHandler(event string) (handler EventHandler, ok bool) { - var raw interface{} - if raw, ok = h.handlers.Load(event); !ok { - return +func (h *Hydro) getEventHandler(eventyp string) (EventHandler, bool) { + v, ok := h.GetStringKey(eventyp) + if !ok { + return nil, ok } - - handler, ok = raw.(EventHandler) - - return + handler, ok := v.(EventHandler) + return handler, ok } -func (h *Hydro) decodeEvent(ent kv.ScanEntry) (event HydroEvent, err error) { - if err = ent.Error(); err != nil { +func (h *Hydro) decodeEvent(scanEntry kv.ScanEntry) (event HydroEvent, err error) { + if err = scanEntry.Error(); err != nil { return } - key, value := ent.Pair() + key, value := scanEntry.Pair() if err = json.Unmarshal(value, &event); err != nil { return } - event.kv = h.kv event.ID, err = parseHydroEventID(key) - return } diff --git a/wal/hydro_event.go b/wal/hydro_event.go deleted file mode 100644 index b4af274fc..000000000 --- a/wal/hydro_event.go +++ /dev/null @@ -1,62 +0,0 @@ -package wal - -import ( - "encoding/json" - "fmt" - "path/filepath" - "strconv" - "strings" - - "github.com/projecteru2/core/wal/kv" -) - -// HydroEvent indicates a log event. -type HydroEvent struct { - // A global unique identifier. - ID uint64 `json:"id"` - - // Registered event type name. - Type string `json:"type"` - - // The encoded log item. - Item []byte `json:"item"` - - kv kv.KV -} - -// NewHydroEvent initializes a new HydroEvent instance. -func NewHydroEvent(kv kv.KV) (e *HydroEvent) { - e = &HydroEvent{} - e.kv = kv - return -} - -// Create persists this event. -func (e *HydroEvent) Create() (err error) { - if e.ID, err = e.kv.NextSequence(); err != nil { - return - } - - var value []byte - if value, err = json.MarshalIndent(e, "", "\t"); err != nil { - return err - } - - return e.kv.Put(e.Key(), value) -} - -// Delete removes this event from persistence. -func (e HydroEvent) Delete() error { - return e.kv.Delete(e.Key()) -} - -// Key returns this event's key path. -func (e HydroEvent) Key() []byte { - return []byte(filepath.Join(EventPrefix, fmt.Sprintf("%016x", e.ID))) -} - -func parseHydroEventID(key []byte) (uint64, error) { - // Trims the EventPrefix, then trims the padding 0. - id := strings.TrimLeft(strings.TrimPrefix(string(key), EventPrefix), "0") - return strconv.ParseUint(id, 16, 64) -} diff --git a/wal/hydro_test.go b/wal/hydro_test.go index c383089b1..27d25894d 100644 --- a/wal/hydro_test.go +++ b/wal/hydro_test.go @@ -3,22 +3,20 @@ package wal import ( "context" "fmt" - "io/ioutil" - "os" - "path/filepath" + "path" "testing" "time" "github.com/projecteru2/core/wal/kv" - "github.com/stretchr/testify/require" + "github.com/stretchr/testify/assert" ) func TestLogFailedAsNoSuchHandler(t *testing.T) { - hydro := NewHydro() + hydro, _ := NewHydro(path.Join(t.TempDir(), "1"), time.Second) commit, err := hydro.Log("create", struct{}{}) - require.Error(t, err) - require.Nil(t, commit) + assert.Error(t, err) + assert.Nil(t, commit) } func TestLogFailedAsEncodeError(t *testing.T) { @@ -27,17 +25,17 @@ func TestLogFailedAsEncodeError(t *testing.T) { handler := newTestEventHandler(eventype, &checked, &handled, &encoded, &decoded) handler.encode = func(interface{}) ([]byte, error) { return nil, fmt.Errorf("encode error") } - hydro := NewHydro() - hydro.kv = kv.NewMockedKV() + hydro, _ := NewHydro(path.Join(t.TempDir(), "1"), time.Second) + hydro.stor = kv.NewMockedKV() hydro.Register(handler) commit, err := hydro.Log(eventype, struct{}{}) - require.Error(t, err) - require.Nil(t, commit) - require.False(t, encoded) - require.False(t, checked) - require.False(t, decoded) - require.False(t, handled) + assert.Error(t, err) + assert.Nil(t, commit) + assert.False(t, encoded) + assert.False(t, checked) + assert.False(t, decoded) + assert.False(t, handled) } func TestLogWithCommitEvent(t *testing.T) { @@ -45,19 +43,19 @@ func TestLogWithCommitEvent(t *testing.T) { eventype := "create" handler := newTestEventHandler(eventype, &checked, &handled, &encoded, &decoded) - hydro := NewHydro() - hydro.kv = kv.NewMockedKV() + hydro, _ := NewHydro(path.Join(t.TempDir(), "1"), time.Second) + hydro.stor = kv.NewMockedKV() hydro.Register(handler) commit, err := hydro.Log(eventype, struct{}{}) - require.NoError(t, err) - require.NotNil(t, commit) - - require.NoError(t, commit()) - require.True(t, encoded) - require.False(t, decoded) - require.False(t, checked) - require.False(t, handled) + assert.NoError(t, err) + assert.NotNil(t, commit) + + assert.NoError(t, commit()) + assert.True(t, encoded) + assert.False(t, decoded) + assert.False(t, checked) + assert.False(t, handled) } func TestRecoverFailedAsNoSuchHandler(t *testing.T) { @@ -65,21 +63,21 @@ func TestRecoverFailedAsNoSuchHandler(t *testing.T) { eventype := "create" handler := newTestEventHandler(eventype, &checked, &handled, &encoded, &decoded) - hydro := NewHydro() - hydro.kv = kv.NewMockedKV() + hydro, _ := NewHydro(path.Join(t.TempDir(), "1"), time.Second) + hydro.stor = kv.NewMockedKV() hydro.Register(handler) commit, err := hydro.Log(eventype, struct{}{}) - require.NoError(t, err) - require.NotNil(t, commit) + assert.NoError(t, err) + assert.NotNil(t, commit) - hydro.handlers.Delete(eventype) + hydro.Del(eventype) hydro.Recover(context.TODO()) - require.True(t, encoded) - require.False(t, decoded) - require.False(t, checked) - require.False(t, handled) + assert.True(t, encoded) + assert.False(t, decoded) + assert.False(t, checked) + assert.False(t, handled) } func TestRecoverFailedAsCheckError(t *testing.T) { @@ -91,42 +89,42 @@ func TestRecoverFailedAsCheckError(t *testing.T) { return false, fmt.Errorf("check error") } - hydro := NewHydro() - hydro.kv = kv.NewMockedKV() + hydro, _ := NewHydro(path.Join(t.TempDir(), "1"), time.Second) + hydro.stor = kv.NewMockedKV() hydro.Register(handler) commit, err := hydro.Log(eventype, struct{}{}) - require.NoError(t, err) - require.NotNil(t, commit) + assert.NoError(t, err) + assert.NotNil(t, commit) hydro.Recover(context.TODO()) - require.True(t, encoded) - require.True(t, decoded) - require.True(t, checked) - require.False(t, handled) + assert.True(t, encoded) + assert.True(t, decoded) + assert.True(t, checked) + assert.False(t, handled) } func TestDecodeEventFailedAsDecodeEntryError(t *testing.T) { - hydro := NewHydro() + hydro, _ := NewHydro(path.Join(t.TempDir(), "1"), time.Second) ent := kv.MockedScanEntry{Value: []byte("x")} _, err := hydro.decodeEvent(ent) - require.Error(t, err) + assert.Error(t, err) } func TestDecodeEventFailedAsInvalidEventID(t *testing.T) { - hydro := NewHydro() + hydro, _ := NewHydro(path.Join(t.TempDir(), "1"), time.Second) ent := kv.MockedScanEntry{Key: "/events/x", Value: []byte("{}")} _, err := hydro.decodeEvent(ent) - require.Error(t, err) + assert.Error(t, err) } func TestDecodeEventFailedAsEntryError(t *testing.T) { - hydro := NewHydro() + hydro, _ := NewHydro(path.Join(t.TempDir(), "1"), time.Second) expErr := fmt.Errorf("expects an error") ent := kv.MockedScanEntry{Err: expErr} _, err := hydro.decodeEvent(ent) - require.Error(t, err) - require.Equal(t, expErr.Error(), err.Error()) + assert.Error(t, err) + assert.Equal(t, expErr.Error(), err.Error()) } func TestRecoverFailedAsDecodeLogError(t *testing.T) { @@ -138,19 +136,19 @@ func TestRecoverFailedAsDecodeLogError(t *testing.T) { return nil, fmt.Errorf("decode error") } - hydro := NewHydro() - hydro.kv = kv.NewMockedKV() + hydro, _ := NewHydro(path.Join(t.TempDir(), "1"), time.Second) + hydro.stor = kv.NewMockedKV() hydro.Register(handler) commit, err := hydro.Log(eventype, struct{}{}) - require.NoError(t, err) - require.NotNil(t, commit) + assert.NoError(t, err) + assert.NotNil(t, commit) hydro.Recover(context.TODO()) - require.True(t, encoded) - require.True(t, decoded) - require.False(t, checked) - require.False(t, handled) + assert.True(t, encoded) + assert.True(t, decoded) + assert.False(t, checked) + assert.False(t, handled) } func TestHydroRecoverDiscardNoNeedEvent(t *testing.T) { @@ -164,19 +162,19 @@ func TestHydroRecoverDiscardNoNeedEvent(t *testing.T) { handler := newTestEventHandler(eventype, &checked, &handled, &encoded, &decoded) handler.check = check - hydro := NewHydro() - hydro.kv = kv.NewMockedKV() + hydro, _ := NewHydro(path.Join(t.TempDir(), "1"), time.Second) + hydro.stor = kv.NewMockedKV() hydro.Register(handler) commit, err := hydro.Log(eventype, struct{}{}) - require.NoError(t, err) - require.NotNil(t, commit) + assert.NoError(t, err) + assert.NotNil(t, commit) hydro.Recover(context.TODO()) - require.True(t, encoded) - require.True(t, decoded) - require.True(t, checked) - require.False(t, handled) + assert.True(t, encoded) + assert.True(t, decoded) + assert.True(t, checked) + assert.False(t, handled) } func TestHydroRecover(t *testing.T) { @@ -184,47 +182,44 @@ func TestHydroRecover(t *testing.T) { eventype := "create" handler := newTestEventHandler(eventype, &checked, &handled, &encoded, &decoded) - hydro := NewHydro() - hydro.kv = kv.NewMockedKV() + hydro, _ := NewHydro(path.Join(t.TempDir(), "1"), time.Second) + hydro.stor = kv.NewMockedKV() hydro.Register(handler) commit, err := hydro.Log(eventype, struct{}{}) - require.NoError(t, err) - require.NotNil(t, commit) + assert.NoError(t, err) + assert.NotNil(t, commit) hydro.Recover(context.TODO()) - require.True(t, encoded) - require.True(t, decoded) - require.True(t, checked) - require.True(t, handled) + assert.True(t, encoded) + assert.True(t, decoded) + assert.True(t, checked) + assert.True(t, handled) // The handled events should be removed. - ch, _ := hydro.kv.Scan([]byte(EventPrefix)) + ch, _ := hydro.stor.Scan([]byte(eventPrefix)) for range ch { - require.Fail(t, "the events should be deleted") + assert.Fail(t, "the events should be deleted") } } func TestHydroEventKeyMustPadZero(t *testing.T) { event := HydroEvent{ID: 15} - require.Equal(t, "/events/000000000000000f", string(event.Key())) + assert.Equal(t, "/events/000000000000000f", string(event.Key())) } func TestHydroEventParseIDShouldRemovePadding(t *testing.T) { id, err := parseHydroEventID([]byte("/events/00000000000000000000000000f")) - require.NoError(t, err) - require.Equal(t, uint64(15), id) + assert.NoError(t, err) + assert.Equal(t, uint64(15), id) } func TestHydroRecoverWithRealLithium(t *testing.T) { - dir, rmdir := tempdir(t) - defer rmdir() + p := path.Join(t.TempDir(), "temp.wal") + hydro, err := NewHydro(p, time.Second) + assert.NoError(t, err) - hydro := NewHydro() - // Uses a real Lithium instance rather than a mocked one. - require.NoError(t, hydro.Open(filepath.Join(dir, "temp.wal"), time.Second)) - - handler := SimpleEventHandler{ + handler := simpleEventHandler{ event: "create", encode: func(interface{}) ([]byte, error) { return []byte("{}"), nil }, decode: func([]byte) (interface{}, error) { return struct{}{}, nil }, @@ -239,19 +234,13 @@ func TestHydroRecoverWithRealLithium(t *testing.T) { hydro.Recover(context.TODO()) - ch, _ := hydro.kv.Scan([]byte(EventPrefix)) + ch, _ := hydro.stor.Scan([]byte(eventPrefix)) for range ch { - require.FailNow(t, "expects no data") + assert.FailNow(t, "expects no data") } } -func tempdir(t *testing.T) (string, func()) { - dir, err := ioutil.TempDir("", "temp.wal") - require.NoError(t, err) - return dir, func() { os.RemoveAll(dir) } -} - -func newTestEventHandler(eventype string, checked, handled, encoded, decoded *bool) SimpleEventHandler { +func newTestEventHandler(eventype string, checked, handled, encoded, decoded *bool) simpleEventHandler { check := func(interface{}) (bool, error) { *checked = true return true, nil @@ -272,7 +261,7 @@ func newTestEventHandler(eventype string, checked, handled, encoded, decoded *bo return } - return SimpleEventHandler{ + return simpleEventHandler{ event: eventype, encode: encode, decode: decode, diff --git a/wal/kv/lithium.go b/wal/kv/lithium.go index f1f368e27..98dc22f39 100644 --- a/wal/kv/lithium.go +++ b/wal/kv/lithium.go @@ -95,7 +95,6 @@ func (l *Lithium) Delete(key []byte) error { // Scan scans all the key/value pairs. func (l *Lithium) Scan(prefix []byte) (<-chan ScanEntry, func()) { ch := make(chan ScanEntry) - locked := make(chan struct{}) exit := make(chan struct{}) abort := func() { @@ -105,8 +104,6 @@ func (l *Lithium) Scan(prefix []byte) (<-chan ScanEntry, func()) { go func() { defer close(ch) - close(locked) - scan := func(bkt *bbolt.Bucket) error { c := bkt.Cursor() for key, value := c.First(); key != nil && bytes.HasPrefix(key, prefix); key, value = c.Next() { @@ -127,9 +124,6 @@ func (l *Lithium) Scan(prefix []byte) (<-chan ScanEntry, func()) { } }() - // Makes sure that the scan goroutine has been locked. - <-locked - return ch, abort } diff --git a/wal/kv/mocked.go b/wal/kv/mocked.go index 54b902025..1b3b912e4 100644 --- a/wal/kv/mocked.go +++ b/wal/kv/mocked.go @@ -6,12 +6,14 @@ import ( "strings" "sync" "time" + + "github.com/cornelk/hashmap" ) // MockedKV . type MockedKV struct { sync.Mutex - pool sync.Map + pool hashmap.HashMap nextSeq uint64 } @@ -29,16 +31,9 @@ func (m *MockedKV) Open(path string, mode os.FileMode, timeout time.Duration) er // Close . func (m *MockedKV) Close() error { - keys := []interface{}{} - m.pool.Range(func(key, _ interface{}) bool { - keys = append(keys, key) - return true - }) - - for _, key := range keys { - m.pool.Delete(key) + for kv := range m.pool.Iter() { + m.pool.Del(kv.Key) } - return nil } @@ -53,13 +48,13 @@ func (m *MockedKV) NextSequence() (nextSeq uint64, err error) { // Put . func (m *MockedKV) Put(key, value []byte) (err error) { - m.pool.Store(string(key), value) + m.pool.Set(string(key), value) return } // Get . func (m *MockedKV) Get(key []byte) (value []byte, err error) { - raw, ok := m.pool.Load(string(key)) + raw, ok := m.pool.GetStringKey(string(key)) if !ok { err = fmt.Errorf("no such key: %s", key) return @@ -74,7 +69,7 @@ func (m *MockedKV) Get(key []byte) (value []byte, err error) { // Delete . func (m *MockedKV) Delete(key []byte) (err error) { - m.pool.Delete(string(key)) + m.pool.Del(string(key)) return } @@ -89,35 +84,36 @@ func (m *MockedKV) Scan(prefix []byte) (<-chan ScanEntry, func()) { go func() { defer close(ch) + dataCh := m.pool.Iter() + + for { + select { + case <-exit: + return + case kv := <-dataCh: + var entry MockedScanEntry + var ok bool - m.pool.Range(func(rkey, rvalue interface{}) (next bool) { - var entry MockedScanEntry - defer func() { - select { - case <-exit: - next = false - case ch <- entry: - next = true + if kv.Key == nil { + return } - }() - var ok bool - if entry.Key, ok = rkey.(string); !ok { - entry.Err = fmt.Errorf("key must be a string, but %v", rkey) - return - } + if entry.Key, ok = kv.Key.(string); !ok { + entry.Err = fmt.Errorf("key must be a string, but %v", kv.Key) + continue + } - if !strings.HasPrefix(entry.Key, string(prefix)) { - return - } + if !strings.HasPrefix(entry.Key, string(prefix)) { + continue + } - if entry.Value, ok = rvalue.([]byte); !ok { - entry.Err = fmt.Errorf("value must be a []byte, but %v", rvalue) - return + if entry.Value, ok = kv.Value.([]byte); !ok { + entry.Err = fmt.Errorf("value must be a []byte, but %v", kv.Value) + continue + } + ch <- entry } - - return - }) + } }() return ch, abort diff --git a/wal/mocks/WAL.go b/wal/mocks/WAL.go index 890384403..b65a7f50e 100644 --- a/wal/mocks/WAL.go +++ b/wal/mocks/WAL.go @@ -1,14 +1,12 @@ -// Code generated by mockery v2.3.0. DO NOT EDIT. +// Code generated by mockery v2.10.0. DO NOT EDIT. package mocks import ( context "context" - time "time" - - mock "github.com/stretchr/testify/mock" wal "github.com/projecteru2/core/wal" + mock "github.com/stretchr/testify/mock" ) // WAL is an autogenerated mock type for the WAL type @@ -53,20 +51,6 @@ func (_m *WAL) Log(_a0 string, _a1 interface{}) (wal.Commit, error) { return r0, r1 } -// Open provides a mock function with given fields: _a0, _a1 -func (_m *WAL) Open(_a0 string, _a1 time.Duration) error { - ret := _m.Called(_a0, _a1) - - var r0 error - if rf, ok := ret.Get(0).(func(string, time.Duration) error); ok { - r0 = rf(_a0, _a1) - } else { - r0 = ret.Error(0) - } - - return r0 -} - // Recover provides a mock function with given fields: _a0 func (_m *WAL) Recover(_a0 context.Context) { _m.Called(_a0) diff --git a/wal/wal.go b/wal/wal.go index 45daec9d9..f9e4129fd 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -2,12 +2,10 @@ package wal import ( "context" - "time" ) const ( - // EventPrefix indicates the key prefix of all events' keys. - EventPrefix = "/events/" + eventPrefix = "/events/" ) // WAL is the interface that groups the Register and Recover interfaces. @@ -15,7 +13,7 @@ type WAL interface { Registry Recoverer Logger - OpenCloser + Closer } // Recoverer is the interface that wraps the basic Recover method. @@ -33,49 +31,14 @@ type Logger interface { Log(string, interface{}) (Commit, error) } -// OpenCloser is the interface that groups the basic Open and Close methods. -type OpenCloser interface { - Open(string, time.Duration) error +// Closer is the interface that groups the Close methods. +type Closer interface { Close() error } -// SimpleEventHandler simply implements the EventHandler. -type SimpleEventHandler struct { - event string - check func(raw interface{}) (bool, error) - encode func(interface{}) ([]byte, error) - decode func([]byte) (interface{}, error) - handle func(interface{}) error -} - -// Event . -func (h SimpleEventHandler) Event() string { - return h.event -} - -// Check . -func (h SimpleEventHandler) Check(ctx context.Context, raw interface{}) (bool, error) { - return h.check(raw) -} - -// Encode . -func (h SimpleEventHandler) Encode(raw interface{}) ([]byte, error) { - return h.encode(raw) -} - -// Decode . -func (h SimpleEventHandler) Decode(bs []byte) (interface{}, error) { - return h.decode(bs) -} - -// Handle . -func (h SimpleEventHandler) Handle(ctx context.Context, raw interface{}) error { - return h.handle(raw) -} - // EventHandler is the interface that groups a few methods. type EventHandler interface { - Event() string + Typ() string Check(context.Context, interface{}) (need bool, err error) Encode(interface{}) ([]byte, error) Decode([]byte) (interface{}, error) diff --git a/wal/wal_test.go b/wal/wal_test.go index 88f8bcacb..fcc96bd4d 100644 --- a/wal/wal_test.go +++ b/wal/wal_test.go @@ -8,7 +8,7 @@ import ( "github.com/projecteru2/core/wal/kv" - "github.com/stretchr/testify/require" + "github.com/stretchr/testify/assert" ) func TestRecover(t *testing.T) { @@ -39,18 +39,20 @@ func TestRecover(t *testing.T) { path := "/tmp/wal.unitest.wal" os.Remove(path) - var wal WAL = NewHydro() - require.NoError(t, wal.Open(path, time.Second)) + var wal WAL + var err error + wal, err = NewHydro(path, time.Second) + assert.NoError(t, err) defer wal.Close() hydro, ok := wal.(*Hydro) - require.True(t, ok) - require.NotNil(t, hydro) - hydro.kv = kv.NewMockedKV() + assert.True(t, ok) + assert.NotNil(t, hydro) + hydro.stor = kv.NewMockedKV() eventype := "create" - wal.Register(SimpleEventHandler{ + wal.Register(simpleEventHandler{ event: eventype, encode: encode, decode: decode, @@ -61,8 +63,42 @@ func TestRecover(t *testing.T) { wal.Log(eventype, struct{}{}) wal.Recover(context.TODO()) - require.True(t, checked) - require.True(t, handled) - require.True(t, encoded) - require.True(t, decoded) + assert.True(t, checked) + assert.True(t, handled) + assert.True(t, encoded) + assert.True(t, decoded) +} + +// simpleEventHandler simply implements the EventHandler. +type simpleEventHandler struct { + event string + check func(raw interface{}) (bool, error) + encode func(interface{}) ([]byte, error) + decode func([]byte) (interface{}, error) + handle func(interface{}) error +} + +// Event . +func (h simpleEventHandler) Typ() string { + return h.event +} + +// Check . +func (h simpleEventHandler) Check(ctx context.Context, raw interface{}) (bool, error) { + return h.check(raw) +} + +// Encode . +func (h simpleEventHandler) Encode(raw interface{}) ([]byte, error) { + return h.encode(raw) +} + +// Decode . +func (h simpleEventHandler) Decode(bs []byte) (interface{}, error) { + return h.decode(bs) +} + +// Handle . +func (h simpleEventHandler) Handle(ctx context.Context, raw interface{}) error { + return h.handle(raw) } diff --git a/wal2/event.go b/wal2/event.go new file mode 100644 index 000000000..6838a1029 --- /dev/null +++ b/wal2/event.go @@ -0,0 +1,42 @@ +package wal + +import ( + "encoding/json" + "fmt" + "path/filepath" + "strconv" + "strings" +) + +// HydroEvent indicates a log event. +type HydroEvent struct { + // A global unique identifier. + ID uint64 `json:"id"` + + // Registered event type name. + Type string `json:"type"` + + // The encoded log item. + Item []byte `json:"item"` +} + +// NewHydroEvent initializes a new HydroEvent instance. +func NewHydroEvent(ID uint64, typ string, item []byte) *HydroEvent { + return &HydroEvent{ID: ID, Type: typ, Item: item} +} + +// Encode this event +func (e HydroEvent) Encode() ([]byte, error) { + return json.MarshalIndent(e, "", "\t") +} + +// Key returns this event's key path. +func (e HydroEvent) Key() []byte { + return []byte(filepath.Join(eventPrefix, fmt.Sprintf("%016x", e.ID))) +} + +func parseHydroEventID(key []byte) (uint64, error) { + // Trims the EventPrefix, then trims the padding 0. + id := strings.TrimLeft(strings.TrimPrefix(string(key), eventPrefix), "0") + return strconv.ParseUint(id, 16, 64) +} diff --git a/wal2/hydro.go b/wal2/hydro.go new file mode 100644 index 000000000..a6c73560c --- /dev/null +++ b/wal2/hydro.go @@ -0,0 +1,146 @@ +package wal + +import ( + "context" + "encoding/json" + "time" + + "github.com/cornelk/hashmap" + "github.com/projecteru2/core/log" + coretypes "github.com/projecteru2/core/types" + "github.com/projecteru2/core/wal/kv" +) + +const ( + fileMode = 0600 +) + +// Hydro is the simplest wal implementation. +type Hydro struct { + hashmap.HashMap + stor kv.KV +} + +// NewHydro initailizes a new Hydro instance. +func NewHydro(path string, timeout time.Duration) (*Hydro, error) { + stor := kv.NewLithium() + if err := stor.Open(path, fileMode, timeout); err != nil { + return nil, err + } + return &Hydro{HashMap: hashmap.HashMap{}, stor: stor}, nil +} + +// Close disconnects the kvdb. +func (h *Hydro) Close() error { + return h.stor.Close() +} + +// Register registers a new event handler. +func (h *Hydro) Register(handler EventHandler) { + h.Set(handler.Typ(), handler) +} + +// Recover starts a disaster recovery, which will replay all the events. +func (h *Hydro) Recover(ctx context.Context) { + ch, _ := h.stor.Scan([]byte(eventPrefix)) + + events := []HydroEvent{} + for scanEntry := range ch { + event, err := h.decodeEvent(scanEntry) + if err != nil { + log.Errorf(nil, "[Recover] decode event error: %v", err) // nolint + continue + } + events = append(events, event) + } + + for _, event := range events { + handler, ok := h.getEventHandler(event.Type) + if !ok { + log.Errorf(nil, "[Recover] no such event handler for %s", event.Type) // nolint + continue + } + + if err := h.recover(ctx, handler, event); err != nil { + log.Errorf(nil, "[Recover] handle event %d (%s) failed: %v", event.ID, event.Type, err) // nolint + continue + } + } +} + +// Log records a log item. +func (h *Hydro) Log(eventyp string, item interface{}) (Commit, error) { + handler, ok := h.getEventHandler(eventyp) + if !ok { + return nil, coretypes.NewDetailedErr(coretypes.ErrUnregisteredWALEventType, eventyp) + } + + bs, err := handler.Encode(item) // TODO 2 times encode is necessary? + if err != nil { + return nil, err + } + + var id uint64 + if id, err = h.stor.NextSequence(); err != nil { + return nil, err + } + + event := NewHydroEvent(id, eventyp, bs) + if bs, err = event.Encode(); err != nil { + return nil, coretypes.ErrBadWALEvent + } + + if err = h.stor.Put(event.Key(), bs); err != nil { + return nil, err + } + + return func() error { + return h.stor.Delete(event.Key()) + }, nil +} + +func (h *Hydro) recover(ctx context.Context, handler EventHandler, event HydroEvent) error { + item, err := handler.Decode(event.Item) + if err != nil { + return err + } + + delete := func() error { + return h.stor.Delete(event.Key()) + } + + switch handle, err := handler.Check(ctx, item); { + case err != nil: + return err + case !handle: + return delete() + default: + if err := handler.Handle(ctx, item); err != nil { + return err + } + } + return delete() +} + +func (h *Hydro) getEventHandler(eventyp string) (EventHandler, bool) { + v, ok := h.GetStringKey(eventyp) + if !ok { + return nil, ok + } + handler, ok := v.(EventHandler) + return handler, ok +} + +func (h *Hydro) decodeEvent(scanEntry kv.ScanEntry) (event HydroEvent, err error) { + if err = scanEntry.Error(); err != nil { + return + } + + key, value := scanEntry.Pair() + if err = json.Unmarshal(value, &event); err != nil { + return + } + + event.ID, err = parseHydroEventID(key) + return +} diff --git a/wal2/hydro_test.go b/wal2/hydro_test.go new file mode 100644 index 000000000..27d25894d --- /dev/null +++ b/wal2/hydro_test.go @@ -0,0 +1,271 @@ +package wal + +import ( + "context" + "fmt" + "path" + "testing" + "time" + + "github.com/projecteru2/core/wal/kv" + + "github.com/stretchr/testify/assert" +) + +func TestLogFailedAsNoSuchHandler(t *testing.T) { + hydro, _ := NewHydro(path.Join(t.TempDir(), "1"), time.Second) + commit, err := hydro.Log("create", struct{}{}) + assert.Error(t, err) + assert.Nil(t, commit) +} + +func TestLogFailedAsEncodeError(t *testing.T) { + var checked, handled, encoded, decoded bool + eventype := "create" + handler := newTestEventHandler(eventype, &checked, &handled, &encoded, &decoded) + handler.encode = func(interface{}) ([]byte, error) { return nil, fmt.Errorf("encode error") } + + hydro, _ := NewHydro(path.Join(t.TempDir(), "1"), time.Second) + hydro.stor = kv.NewMockedKV() + hydro.Register(handler) + + commit, err := hydro.Log(eventype, struct{}{}) + assert.Error(t, err) + assert.Nil(t, commit) + assert.False(t, encoded) + assert.False(t, checked) + assert.False(t, decoded) + assert.False(t, handled) +} + +func TestLogWithCommitEvent(t *testing.T) { + var checked, handled, encoded, decoded bool + eventype := "create" + handler := newTestEventHandler(eventype, &checked, &handled, &encoded, &decoded) + + hydro, _ := NewHydro(path.Join(t.TempDir(), "1"), time.Second) + hydro.stor = kv.NewMockedKV() + hydro.Register(handler) + + commit, err := hydro.Log(eventype, struct{}{}) + assert.NoError(t, err) + assert.NotNil(t, commit) + + assert.NoError(t, commit()) + assert.True(t, encoded) + assert.False(t, decoded) + assert.False(t, checked) + assert.False(t, handled) +} + +func TestRecoverFailedAsNoSuchHandler(t *testing.T) { + var checked, handled, encoded, decoded bool + eventype := "create" + handler := newTestEventHandler(eventype, &checked, &handled, &encoded, &decoded) + + hydro, _ := NewHydro(path.Join(t.TempDir(), "1"), time.Second) + hydro.stor = kv.NewMockedKV() + hydro.Register(handler) + + commit, err := hydro.Log(eventype, struct{}{}) + assert.NoError(t, err) + assert.NotNil(t, commit) + + hydro.Del(eventype) + + hydro.Recover(context.TODO()) + assert.True(t, encoded) + assert.False(t, decoded) + assert.False(t, checked) + assert.False(t, handled) +} + +func TestRecoverFailedAsCheckError(t *testing.T) { + var checked, handled, encoded, decoded bool + eventype := "create" + handler := newTestEventHandler(eventype, &checked, &handled, &encoded, &decoded) + handler.check = func(interface{}) (bool, error) { + checked = true + return false, fmt.Errorf("check error") + } + + hydro, _ := NewHydro(path.Join(t.TempDir(), "1"), time.Second) + hydro.stor = kv.NewMockedKV() + hydro.Register(handler) + + commit, err := hydro.Log(eventype, struct{}{}) + assert.NoError(t, err) + assert.NotNil(t, commit) + + hydro.Recover(context.TODO()) + assert.True(t, encoded) + assert.True(t, decoded) + assert.True(t, checked) + assert.False(t, handled) +} + +func TestDecodeEventFailedAsDecodeEntryError(t *testing.T) { + hydro, _ := NewHydro(path.Join(t.TempDir(), "1"), time.Second) + ent := kv.MockedScanEntry{Value: []byte("x")} + _, err := hydro.decodeEvent(ent) + assert.Error(t, err) +} + +func TestDecodeEventFailedAsInvalidEventID(t *testing.T) { + hydro, _ := NewHydro(path.Join(t.TempDir(), "1"), time.Second) + ent := kv.MockedScanEntry{Key: "/events/x", Value: []byte("{}")} + _, err := hydro.decodeEvent(ent) + assert.Error(t, err) +} + +func TestDecodeEventFailedAsEntryError(t *testing.T) { + hydro, _ := NewHydro(path.Join(t.TempDir(), "1"), time.Second) + expErr := fmt.Errorf("expects an error") + ent := kv.MockedScanEntry{Err: expErr} + _, err := hydro.decodeEvent(ent) + assert.Error(t, err) + assert.Equal(t, expErr.Error(), err.Error()) +} + +func TestRecoverFailedAsDecodeLogError(t *testing.T) { + var checked, handled, encoded, decoded bool + eventype := "create" + handler := newTestEventHandler(eventype, &checked, &handled, &encoded, &decoded) + handler.decode = func([]byte) (interface{}, error) { + decoded = true + return nil, fmt.Errorf("decode error") + } + + hydro, _ := NewHydro(path.Join(t.TempDir(), "1"), time.Second) + hydro.stor = kv.NewMockedKV() + hydro.Register(handler) + + commit, err := hydro.Log(eventype, struct{}{}) + assert.NoError(t, err) + assert.NotNil(t, commit) + + hydro.Recover(context.TODO()) + assert.True(t, encoded) + assert.True(t, decoded) + assert.False(t, checked) + assert.False(t, handled) +} + +func TestHydroRecoverDiscardNoNeedEvent(t *testing.T) { + var checked, handled, encoded, decoded bool + check := func(interface{}) (need bool, err error) { + checked = true + return + } + + eventype := "create" + handler := newTestEventHandler(eventype, &checked, &handled, &encoded, &decoded) + handler.check = check + + hydro, _ := NewHydro(path.Join(t.TempDir(), "1"), time.Second) + hydro.stor = kv.NewMockedKV() + hydro.Register(handler) + + commit, err := hydro.Log(eventype, struct{}{}) + assert.NoError(t, err) + assert.NotNil(t, commit) + + hydro.Recover(context.TODO()) + assert.True(t, encoded) + assert.True(t, decoded) + assert.True(t, checked) + assert.False(t, handled) +} + +func TestHydroRecover(t *testing.T) { + var checked, handled, encoded, decoded bool + eventype := "create" + handler := newTestEventHandler(eventype, &checked, &handled, &encoded, &decoded) + + hydro, _ := NewHydro(path.Join(t.TempDir(), "1"), time.Second) + hydro.stor = kv.NewMockedKV() + hydro.Register(handler) + + commit, err := hydro.Log(eventype, struct{}{}) + assert.NoError(t, err) + assert.NotNil(t, commit) + + hydro.Recover(context.TODO()) + assert.True(t, encoded) + assert.True(t, decoded) + assert.True(t, checked) + assert.True(t, handled) + + // The handled events should be removed. + ch, _ := hydro.stor.Scan([]byte(eventPrefix)) + for range ch { + assert.Fail(t, "the events should be deleted") + } +} + +func TestHydroEventKeyMustPadZero(t *testing.T) { + event := HydroEvent{ID: 15} + assert.Equal(t, "/events/000000000000000f", string(event.Key())) +} + +func TestHydroEventParseIDShouldRemovePadding(t *testing.T) { + id, err := parseHydroEventID([]byte("/events/00000000000000000000000000f")) + assert.NoError(t, err) + assert.Equal(t, uint64(15), id) +} + +func TestHydroRecoverWithRealLithium(t *testing.T) { + p := path.Join(t.TempDir(), "temp.wal") + hydro, err := NewHydro(p, time.Second) + assert.NoError(t, err) + + handler := simpleEventHandler{ + event: "create", + encode: func(interface{}) ([]byte, error) { return []byte("{}"), nil }, + decode: func([]byte) (interface{}, error) { return struct{}{}, nil }, + check: func(interface{}) (bool, error) { return true, nil }, + handle: func(interface{}) error { return nil }, + } + hydro.Register(handler) + + hydro.Log(handler.event, struct{}{}) + hydro.Log(handler.event, struct{}{}) + hydro.Log(handler.event, struct{}{}) + + hydro.Recover(context.TODO()) + + ch, _ := hydro.stor.Scan([]byte(eventPrefix)) + for range ch { + assert.FailNow(t, "expects no data") + } +} + +func newTestEventHandler(eventype string, checked, handled, encoded, decoded *bool) simpleEventHandler { + check := func(interface{}) (bool, error) { + *checked = true + return true, nil + } + + handle := func(interface{}) (err error) { + *handled = true + return + } + + encode := func(interface{}) (bs []byte, err error) { + *encoded = true + return + } + + decode := func([]byte) (item interface{}, err error) { + *decoded = true + return + } + + return simpleEventHandler{ + event: eventype, + encode: encode, + decode: decode, + check: check, + handle: handle, + } +} diff --git a/wal2/wal.go b/wal2/wal.go new file mode 100644 index 000000000..5ee0d358e --- /dev/null +++ b/wal2/wal.go @@ -0,0 +1,49 @@ +package wal + +import ( + "context" +) + +const ( + eventPrefix = "/events/" +) + +// WAL is the interface that groups the Register and Recover interfaces. +type WAL interface { + Registry + Recoverer + Logger + OpenCloser +} + +// Recoverer is the interface that wraps the basic Recover method. +type Recoverer interface { + Recover(context.Context) +} + +// Registry is the interface that wraps the basic Register method. +type Registry interface { + Register(EventHandler) +} + +// Logger is the interface that wraps the basic Log method. +type Logger interface { + Log(string, interface{}) (Commit, error) +} + +// Closer is the interface that groups the Close methods. +type OpenCloser interface { + Close() error +} + +// EventHandler is the interface that groups a few methods. +type EventHandler interface { + Typ() string + Check(context.Context, interface{}) (need bool, err error) + Encode(interface{}) ([]byte, error) + Decode([]byte) (interface{}, error) + Handle(context.Context, interface{}) error +} + +// Commit is a function for committing an event log. +type Commit func() error diff --git a/wal2/wal_test.go b/wal2/wal_test.go new file mode 100644 index 000000000..fcc96bd4d --- /dev/null +++ b/wal2/wal_test.go @@ -0,0 +1,104 @@ +package wal + +import ( + "context" + "os" + "testing" + "time" + + "github.com/projecteru2/core/wal/kv" + + "github.com/stretchr/testify/assert" +) + +func TestRecover(t *testing.T) { + var checked bool + check := func(interface{}) (bool, error) { + checked = true + return true, nil + } + + var handled bool + handle := func(interface{}) (err error) { + handled = true + return + } + + var encoded bool + encode := func(interface{}) (bs []byte, err error) { + encoded = true + return + } + + var decoded bool + decode := func([]byte) (item interface{}, err error) { + decoded = true + return + } + + path := "/tmp/wal.unitest.wal" + os.Remove(path) + + var wal WAL + var err error + wal, err = NewHydro(path, time.Second) + assert.NoError(t, err) + defer wal.Close() + + hydro, ok := wal.(*Hydro) + assert.True(t, ok) + assert.NotNil(t, hydro) + hydro.stor = kv.NewMockedKV() + + eventype := "create" + + wal.Register(simpleEventHandler{ + event: eventype, + encode: encode, + decode: decode, + check: check, + handle: handle, + }) + + wal.Log(eventype, struct{}{}) + + wal.Recover(context.TODO()) + assert.True(t, checked) + assert.True(t, handled) + assert.True(t, encoded) + assert.True(t, decoded) +} + +// simpleEventHandler simply implements the EventHandler. +type simpleEventHandler struct { + event string + check func(raw interface{}) (bool, error) + encode func(interface{}) ([]byte, error) + decode func([]byte) (interface{}, error) + handle func(interface{}) error +} + +// Event . +func (h simpleEventHandler) Typ() string { + return h.event +} + +// Check . +func (h simpleEventHandler) Check(ctx context.Context, raw interface{}) (bool, error) { + return h.check(raw) +} + +// Encode . +func (h simpleEventHandler) Encode(raw interface{}) ([]byte, error) { + return h.encode(raw) +} + +// Decode . +func (h simpleEventHandler) Decode(bs []byte) (interface{}, error) { + return h.decode(bs) +} + +// Handle . +func (h simpleEventHandler) Handle(ctx context.Context, raw interface{}) error { + return h.handle(raw) +}