From 87e6c3102622fb8a2a7f7d8d4f29adba7e820f3e Mon Sep 17 00:00:00 2001 From: anrs Date: Fri, 18 Dec 2020 14:01:15 +0800 Subject: [PATCH] feat: core WAL (#303) Co-authored-by: anrs --- go.mod | 1 + go.sum | 2 + types/config.go | 3 + types/errors.go | 3 + wal/hydro.go | 137 +++++++++++++++++++++++++ wal/hydro_event.go | 55 ++++++++++ wal/hydro_test.go | 226 +++++++++++++++++++++++++++++++++++++++++ wal/kv/kv.go | 44 ++++++++ wal/kv/lithium.go | 173 +++++++++++++++++++++++++++++++ wal/kv/lithium_test.go | 81 +++++++++++++++ wal/kv/mocked.go | 134 ++++++++++++++++++++++++ wal/mocks/WAL.go | 48 +++++++++ wal/wal.go | 91 +++++++++++++++++ wal/wal_test.go | 67 ++++++++++++ 14 files changed, 1065 insertions(+) create mode 100644 wal/hydro.go create mode 100644 wal/hydro_event.go create mode 100644 wal/hydro_test.go create mode 100644 wal/kv/kv.go create mode 100644 wal/kv/lithium.go create mode 100644 wal/kv/lithium_test.go create mode 100644 wal/kv/mocked.go create mode 100644 wal/mocks/WAL.go create mode 100644 wal/wal.go create mode 100644 wal/wal_test.go diff --git a/go.mod b/go.mod index fdf7f2023..e4e713561 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/CMGS/statsd v0.0.0-20160223095033-48c421b3c1ab github.com/Microsoft/hcsshim v0.8.11 // indirect github.com/alexcesaro/statsd v2.0.0+incompatible // indirect + github.com/boltdb/bolt v1.3.1 github.com/cenkalti/backoff/v4 v4.0.2 github.com/containerd/containerd v1.4.3 // indirect github.com/containerd/continuity v0.0.0-20200710164510-efbc4488d8fe // indirect diff --git a/go.sum b/go.sum index e9df1aff6..8d62f1aa7 100644 --- a/go.sum +++ b/go.sum @@ -46,6 +46,8 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= +github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4= +github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps= github.com/cenkalti/backoff/v4 v4.0.2 h1:JIufpQLbh4DkbQoii76ItQIUFzevQSqOLZca4eamEDs= github.com/cenkalti/backoff/v4 v4.0.2/go.mod h1:eEew/i+1Q6OrCDZh3WiXYv3+nJwBASZ8Bog/87DQnVg= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= diff --git a/types/config.go b/types/config.go index 84be4a8c7..eb905ec2b 100644 --- a/types/config.go +++ b/types/config.go @@ -16,6 +16,9 @@ type Config struct { Auth AuthConfig `yaml:"auth"` // grpc auth GRPCConfig GRPCConfig `yaml:"grpc"` // grpc config + WALFile string `yaml:"wal_file" required:"true" default:"core.wal"` // WAL file path + WALOpenTimeout time.Duration `yaml:"wal_open_timeout" required:"true" default:"8s"` // timeout for opening a WAL file + Git GitConfig `yaml:"git"` Etcd EtcdConfig `yaml:"etcd"` Docker DockerConfig `yaml:"docker"` diff --git a/types/errors.go b/types/errors.go index a6d1c156d..47dcf8b3e 100644 --- a/types/errors.go +++ b/types/errors.go @@ -80,6 +80,9 @@ var ( ErrNodeNotExists = errors.New("node not exists") ErrWorkloadNotExists = errors.New("workload not exists") + + ErrUnregisteredWALEventType = errors.New("unregistered WAL event type") + ErrInvalidWALBucket = errors.New("invalid WAL bucket") ) // NewDetailedErr returns an error with details diff --git a/wal/hydro.go b/wal/hydro.go new file mode 100644 index 000000000..3eb7d4bc3 --- /dev/null +++ b/wal/hydro.go @@ -0,0 +1,137 @@ +package wal + +import ( + "context" + "encoding/json" + "strconv" + "strings" + "sync" + "time" + + log "github.com/sirupsen/logrus" + + coretypes "github.com/projecteru2/core/types" + "github.com/projecteru2/core/wal/kv" +) + +// Hydro is the simplest wal implementation. +type Hydro struct { + handlers sync.Map + kv kv.KV +} + +// NewHydro initailizes a new Hydro instance. +func NewHydro() *Hydro { + return &Hydro{ + kv: kv.NewLithium(), + } +} + +// Open connects a kvdb. +func (h *Hydro) Open(ctx context.Context, path string, timeout time.Duration) (err error) { + err = h.kv.Open(ctx, path, 0600, timeout) + return +} + +// Close disconnects the kvdb. +func (h *Hydro) Close(ctx context.Context) error { + return h.kv.Close(ctx) +} + +// Register registers a new event handler. +func (h *Hydro) Register(handler EventHandler) { + h.handlers.Store(handler.Event, handler) +} + +// Recover starts a disaster recovery, which will replay all the events. +func (h *Hydro) Recover(ctx context.Context) { + for ent := range h.kv.Scan(ctx, []byte(EventPrefix)) { + event, err := h.decodeEvent(ent) + if err != nil { + log.Errorf("[Recover] decode event error: %v", err) + continue + } + + handler, ok := h.getEventHandler(event.Type) + if !ok { + log.Errorf("[Recover] no such event handler for %s", event.Type) + continue + } + + if err := h.recover(ctx, handler, event); err != nil { + log.Errorf("[Recover] handle event %d (%s) failed: %v", event.ID, event.Type, err) + continue + } + } +} + +func (h *Hydro) recover(ctx context.Context, handler EventHandler, event HydroEvent) error { + item, err := handler.Decode(event.Item) + if err != nil { + return err + } + + switch handle, err := handler.Check(item); { + case err != nil: + return err + case !handle: + return event.Delete(ctx) + } + + return handler.Handle(item) +} + +// Log records a log item. +func (h *Hydro) Log(ctx context.Context, eventype string, item interface{}) (Commit, error) { + handler, ok := h.getEventHandler(eventype) + if !ok { + return nil, coretypes.NewDetailedErr(coretypes.ErrUnregisteredWALEventType, eventype) + } + + bs, err := handler.Encode(item) + if err != nil { + return nil, err + } + + event := NewHydroEvent(h.kv) + event.Type = eventype + event.Item = bs + + if err = event.Create(ctx); err != nil { + return nil, err + } + + commit := func(context.Context) error { + return event.Delete(ctx) + } + + return commit, nil +} + +func (h *Hydro) getEventHandler(event string) (handler EventHandler, ok bool) { + var raw interface{} + if raw, ok = h.handlers.Load(event); !ok { + return + } + + handler, ok = raw.(EventHandler) + + return +} + +func (h *Hydro) decodeEvent(ent kv.ScanEntry) (event HydroEvent, err error) { + if err = ent.Error(); err != nil { + return + } + + key, value := ent.Pair() + if err = json.Unmarshal(value, &event); err != nil { + return + } + + event.kv = h.kv + + event.ID, err = strconv.ParseUint(strings.TrimPrefix(string(key), EventPrefix), 10, 64) + + return +} diff --git a/wal/hydro_event.go b/wal/hydro_event.go new file mode 100644 index 000000000..72bd5d9bd --- /dev/null +++ b/wal/hydro_event.go @@ -0,0 +1,55 @@ +package wal + +import ( + "context" + "encoding/json" + "path/filepath" + "strconv" + + "github.com/projecteru2/core/wal/kv" +) + +// HydroEvent indicates a log event. +type HydroEvent struct { + // A global unique identifier. + ID uint64 `json:"id"` + + // Registered event type name. + Type string `json:"type"` + + // The encoded log item. + Item []byte `json:"item"` + + kv kv.KV +} + +// NewHydroEvent initializes a new HydroEvent instance. +func NewHydroEvent(kv kv.KV) (e *HydroEvent) { + e = &HydroEvent{} + e.kv = kv + return +} + +// Create persists this event. +func (e *HydroEvent) Create(ctx context.Context) (err error) { + if e.ID, err = e.kv.NextSequence(ctx); err != nil { + return + } + + var value []byte + if value, err = json.MarshalIndent(e, "", "\t"); err != nil { + return err + } + + return e.kv.Put(ctx, e.Key(), value) +} + +// Delete removes this event from persistence. +func (e HydroEvent) Delete(ctx context.Context) error { + return e.kv.Delete(ctx, e.Key()) +} + +// Key returns this event's key path. +func (e HydroEvent) Key() []byte { + return []byte(filepath.Join(EventPrefix, strconv.FormatUint(e.ID, 10))) +} diff --git a/wal/hydro_test.go b/wal/hydro_test.go new file mode 100644 index 000000000..edae9ae3f --- /dev/null +++ b/wal/hydro_test.go @@ -0,0 +1,226 @@ +package wal + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/projecteru2/core/wal/kv" +) + +func TestLogFailedAsNoSuchHandler(t *testing.T) { + hydro := NewHydro() + commit, err := hydro.Log(context.Background(), "create", struct{}{}) + require.Error(t, err) + require.Nil(t, commit) +} + +func TestLogFailedAsEncodeError(t *testing.T) { + var checked, handled, encoded, decoded bool + eventype := "create" + handler := newTestEventHandler(eventype, &checked, &handled, &encoded, &decoded) + handler.Encode = func(interface{}) ([]byte, error) { return nil, fmt.Errorf("encode error") } + + hydro := NewHydro() + hydro.kv = kv.NewMockedKV() + hydro.Register(handler) + + commit, err := hydro.Log(context.Background(), eventype, struct{}{}) + require.Error(t, err) + require.Nil(t, commit) + require.False(t, encoded) + require.False(t, checked) + require.False(t, decoded) + require.False(t, handled) +} + +func TestLogWithCommitEvent(t *testing.T) { + var checked, handled, encoded, decoded bool + eventype := "create" + handler := newTestEventHandler(eventype, &checked, &handled, &encoded, &decoded) + + hydro := NewHydro() + hydro.kv = kv.NewMockedKV() + hydro.Register(handler) + + commit, err := hydro.Log(context.Background(), eventype, struct{}{}) + require.NoError(t, err) + require.NotNil(t, commit) + + require.NoError(t, commit(context.Background())) + require.True(t, encoded) + require.False(t, decoded) + require.False(t, checked) + require.False(t, handled) +} + +func TestRecoverFailedAsNoSuchHandler(t *testing.T) { + var checked, handled, encoded, decoded bool + eventype := "create" + handler := newTestEventHandler(eventype, &checked, &handled, &encoded, &decoded) + + hydro := NewHydro() + hydro.kv = kv.NewMockedKV() + hydro.Register(handler) + + commit, err := hydro.Log(context.Background(), eventype, struct{}{}) + require.NoError(t, err) + require.NotNil(t, commit) + + hydro.handlers.Delete(eventype) + + hydro.Recover(context.Background()) + require.True(t, encoded) + require.False(t, decoded) + require.False(t, checked) + require.False(t, handled) +} + +func TestRecoverFailedAsCheckError(t *testing.T) { + var checked, handled, encoded, decoded bool + eventype := "create" + handler := newTestEventHandler(eventype, &checked, &handled, &encoded, &decoded) + handler.Check = func(interface{}) (bool, error) { + checked = true + return false, fmt.Errorf("check error") + } + + hydro := NewHydro() + hydro.kv = kv.NewMockedKV() + hydro.Register(handler) + + commit, err := hydro.Log(context.Background(), eventype, struct{}{}) + require.NoError(t, err) + require.NotNil(t, commit) + + hydro.Recover(context.Background()) + require.True(t, encoded) + require.True(t, decoded) + require.True(t, checked) + require.False(t, handled) +} + +func TestDecodeEventFailedAsDecodeEntryError(t *testing.T) { + hydro := NewHydro() + ent := kv.MockedScanEntry{Value: []byte("x")} + _, err := hydro.decodeEvent(ent) + require.Error(t, err) +} + +func TestDecodeEventFailedAsInvalidEventID(t *testing.T) { + hydro := NewHydro() + ent := kv.MockedScanEntry{Key: "/events/x", Value: []byte("{}")} + _, err := hydro.decodeEvent(ent) + require.Error(t, err) +} + +func TestDecodeEventFailedAsEntryError(t *testing.T) { + hydro := NewHydro() + expErr := fmt.Errorf("expects an error") + ent := kv.MockedScanEntry{Err: expErr} + _, err := hydro.decodeEvent(ent) + require.Error(t, err) + require.Equal(t, expErr.Error(), err.Error()) +} + +func TestRecoverFailedAsDecodeLogError(t *testing.T) { + var checked, handled, encoded, decoded bool + eventype := "create" + handler := newTestEventHandler(eventype, &checked, &handled, &encoded, &decoded) + handler.Decode = func([]byte) (interface{}, error) { + decoded = true + return nil, fmt.Errorf("decode error") + } + + hydro := NewHydro() + hydro.kv = kv.NewMockedKV() + hydro.Register(handler) + + commit, err := hydro.Log(context.Background(), eventype, struct{}{}) + require.NoError(t, err) + require.NotNil(t, commit) + + hydro.Recover(context.Background()) + require.True(t, encoded) + require.True(t, decoded) + require.False(t, checked) + require.False(t, handled) +} + +func TestHydroRecoverDiscardNoNeedEvent(t *testing.T) { + var checked, handled, encoded, decoded bool + check := func(interface{}) (need bool, err error) { + checked = true + return + } + + eventype := "create" + handler := newTestEventHandler(eventype, &checked, &handled, &encoded, &decoded) + handler.Check = check + + hydro := NewHydro() + hydro.kv = kv.NewMockedKV() + hydro.Register(handler) + + commit, err := hydro.Log(context.Background(), eventype, struct{}{}) + require.NoError(t, err) + require.NotNil(t, commit) + + hydro.Recover(context.Background()) + require.True(t, encoded) + require.True(t, decoded) + require.True(t, checked) + require.False(t, handled) +} + +func TestHydroRecover(t *testing.T) { + var checked, handled, encoded, decoded bool + eventype := "create" + handler := newTestEventHandler(eventype, &checked, &handled, &encoded, &decoded) + + hydro := NewHydro() + hydro.kv = kv.NewMockedKV() + hydro.Register(handler) + + commit, err := hydro.Log(context.Background(), eventype, struct{}{}) + require.NoError(t, err) + require.NotNil(t, commit) + + hydro.Recover(context.Background()) + require.True(t, encoded) + require.True(t, decoded) + require.True(t, checked) + require.True(t, handled) +} + +func newTestEventHandler(eventype string, checked, handled, encoded, decoded *bool) EventHandler { + check := func(interface{}) (bool, error) { + *checked = true + return true, nil + } + + handle := func(interface{}) (err error) { + *handled = true + return + } + + encode := func(interface{}) (bs []byte, err error) { + *encoded = true + return + } + + decode := func([]byte) (item interface{}, err error) { + *decoded = true + return + } + + return EventHandler{ + Event: eventype, + Encode: encode, + Decode: decode, + Check: check, + Handle: handle, + } +} diff --git a/wal/kv/kv.go b/wal/kv/kv.go new file mode 100644 index 000000000..ed309e9e2 --- /dev/null +++ b/wal/kv/kv.go @@ -0,0 +1,44 @@ +package kv + +import ( + "context" + "os" + "time" +) + +// KV is the interface that groups the Simpler and Scanner interfaces. +type KV interface { + OpenCloser + Simpler + Scanner + Sequencer +} + +// Simpler is the interface that groups the basic Put, Get and Delete methods. +type Simpler interface { + Put(context.Context, []byte, []byte) error + Get(context.Context, []byte) ([]byte, error) + Delete(context.Context, []byte) error +} + +// Scanner is the interface that wraps the basic Scan method. +type Scanner interface { + Scan(context.Context, []byte) <-chan ScanEntry +} + +// Sequencer is the interface that wraps the basic NextSequence method. +type Sequencer interface { + NextSequence(context.Context) (id uint64, err error) +} + +// OpenCloser is the interface that groups the basic Open and Close methods. +type OpenCloser interface { + Open(ctx context.Context, path string, mode os.FileMode, timeout time.Duration) error + Close(context.Context) error +} + +// ScanEntry is the interface that groups the basic Pair and Error methods. +type ScanEntry interface { + Pair() (key []byte, value []byte) + Error() error +} diff --git a/wal/kv/lithium.go b/wal/kv/lithium.go new file mode 100644 index 000000000..c9ee4c9da --- /dev/null +++ b/wal/kv/lithium.go @@ -0,0 +1,173 @@ +package kv + +import ( + "bytes" + "context" + "os" + "sync" + "time" + + "github.com/boltdb/bolt" + + "github.com/projecteru2/core/types" +) + +// Lithium . +type Lithium struct { + sync.Mutex + + // Name of the root bucket. + RootBucketKey []byte + + bolt *bolt.DB +} + +// NewLithium initializes a new Lithium instance. +func NewLithium() *Lithium { + return &Lithium{ + RootBucketKey: []byte("root"), + } +} + +// Open opens a kvdb file. +func (l *Lithium) Open(ctx context.Context, path string, mode os.FileMode, timeout time.Duration) (err error) { + l.Lock() + defer l.Unlock() + + if l.bolt, err = bolt.Open(path, mode, &bolt.Options{Timeout: timeout}); err != nil { + return + } + + err = l.bolt.Update(func(tx *bolt.Tx) error { + _, ce := tx.CreateBucketIfNotExists(l.RootBucketKey) + return ce + }) + + return +} + +// Close closes the kvdb file. +func (l *Lithium) Close(ctx context.Context) error { + l.Lock() + defer l.Unlock() + return l.bolt.Close() +} + +// Put creates/updates a key/value pair. +func (l *Lithium) Put(ctx context.Context, key []byte, value []byte) (err error) { + l.Lock() + defer l.Unlock() + + return l.update(func(bkt *bolt.Bucket) error { + return bkt.Put(key, value) + }) +} + +// Get read a key's value. +func (l *Lithium) Get(ctx context.Context, key []byte) (dst []byte, err error) { + l.Lock() + defer l.Unlock() + + err = l.view(func(bkt *bolt.Bucket) error { + src := bkt.Get(key) + dst = make([]byte, len(src)) + + for n := 0; n < len(dst); { + n += copy(dst, src) + } + + return nil + }) + + return +} + +// Delete deletes a key. +func (l *Lithium) Delete(ctx context.Context, key []byte) error { + l.Lock() + defer l.Unlock() + return l.update(func(bkt *bolt.Bucket) error { + return bkt.Delete(key) + }) +} + +// Scan scans all the key/value pairs. +func (l *Lithium) Scan(ctx context.Context, prefix []byte) <-chan ScanEntry { + ch := make(chan ScanEntry) + + go func() { + defer close(ch) + + l.Lock() + defer l.Unlock() + + ent := &LithiumScanEntry{} + + scan := func(bkt *bolt.Bucket) error { + c := bkt.Cursor() + for key, value := c.First(); key != nil && bytes.HasPrefix(key, prefix); key, value = c.Next() { + ent.key = key + ent.value = value + ch <- *ent + } + return nil + } + + if err := l.view(scan); err != nil { + ent.err = err + ch <- *ent + } + }() + + return ch +} + +func (l *Lithium) view(fn func(*bolt.Bucket) error) error { + return l.bolt.Update(func(tx *bolt.Tx) error { + bkt, err := l.getBucket(tx, l.RootBucketKey) + if err != nil { + return err + } + return fn(bkt) + }) +} + +func (l *Lithium) update(fn func(*bolt.Bucket) error) error { + return l.bolt.Update(func(tx *bolt.Tx) error { + bkt, err := l.getBucket(tx, l.RootBucketKey) + if err != nil { + return err + } + return fn(bkt) + }) +} + +func (l *Lithium) getBucket(tx *bolt.Tx, key []byte) (bkt *bolt.Bucket, err error) { + bkt = tx.Bucket(l.RootBucketKey) + if bkt == nil { + err = types.NewDetailedErr(types.ErrInvalidWALBucket, key) + } + return +} + +// NextSequence generates a new sequence. +func (l *Lithium) NextSequence(context.Context) (uint64, error) { + return 0, nil +} + +// LithiumScanEntry indicates an entry of scanning. +type LithiumScanEntry struct { + err error + key []byte + value []byte +} + +// Pair means a pair of key/value. +func (e LithiumScanEntry) Pair() ([]byte, []byte) { + return e.key, e.value +} + +// Error . +func (e LithiumScanEntry) Error() error { + return e.err +} diff --git a/wal/kv/lithium_test.go b/wal/kv/lithium_test.go new file mode 100644 index 000000000..adc855247 --- /dev/null +++ b/wal/kv/lithium_test.go @@ -0,0 +1,81 @@ +package kv + +import ( + "context" + "os" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestSet(t *testing.T) { + lit, cancel := newTestLithium(t) + defer cancel() + require.NoError(t, lit.Put(context.Background(), []byte("key"), []byte("value"))) +} + +func TestGet(t *testing.T) { + lit, cancel := newTestLithium(t) + defer cancel() + + key := []byte("key") + value := []byte("value") + require.NoError(t, lit.Put(context.Background(), key, value)) + + act, err := lit.Get(context.Background(), key) + require.NoError(t, err) + require.Equal(t, value, act) +} + +func TestDelete(t *testing.T) { + lit, cancel := newTestLithium(t) + defer cancel() + + key := []byte("key") + value := []byte("value") + require.NoError(t, lit.Put(context.Background(), key, value)) + + act, err := lit.Get(context.Background(), key) + require.NoError(t, err) + require.Equal(t, value, act) + + // deletes the key + require.NoError(t, lit.Delete(context.Background(), key)) + + act, err = lit.Get(context.Background(), key) + require.NoError(t, err) + require.Equal(t, []byte{}, act) +} + +func TestScan(t *testing.T) { + lit, cancel := newTestLithium(t) + defer cancel() + + key := []byte("/p1/key") + value := []byte("value") + require.NoError(t, lit.Put(context.Background(), key, value)) + require.NoError(t, lit.Put(context.Background(), []byte("/p2/key"), value)) + + ch := lit.Scan(context.Background(), []byte("/p1/")) + require.Equal(t, LithiumScanEntry{key: key, value: value}, <-ch) + require.Equal(t, nil, <-ch) +} + +func TestNextSequence(t *testing.T) { + // TODO +} + +func newTestLithium(t *testing.T) (lit *Lithium, cancel func()) { + path := "/tmp/lithium.unitest.wal" + os.Remove(path) + + lit = NewLithium() + require.NoError(t, lit.Open(context.Background(), path, 0666, time.Second)) + + cancel = func() { + require.NoError(t, lit.Close(context.Background())) + } + + return +} diff --git a/wal/kv/mocked.go b/wal/kv/mocked.go new file mode 100644 index 000000000..31f27df60 --- /dev/null +++ b/wal/kv/mocked.go @@ -0,0 +1,134 @@ +package kv + +import ( + "context" + "fmt" + "os" + "strings" + "sync" + "time" +) + +// MockedKV . +type MockedKV struct { + sync.Mutex + pool sync.Map + nextSeq uint64 +} + +// NewMockedKV . +func NewMockedKV() *MockedKV { + return &MockedKV{ + nextSeq: 1, + } +} + +// Open . +func (m *MockedKV) Open(ctx context.Context, path string, mode os.FileMode, timeout time.Duration) error { + return nil +} + +// Close . +func (m *MockedKV) Close(context.Context) error { + keys := []interface{}{} + m.pool.Range(func(key, _ interface{}) bool { + keys = append(keys, key) + return true + }) + + for _, key := range keys { + m.pool.Delete(key) + } + + return nil +} + +// NextSequence . +func (m *MockedKV) NextSequence(ctx context.Context) (nextSeq uint64, err error) { + m.Lock() + defer m.Unlock() + nextSeq = m.nextSeq + m.nextSeq++ + return +} + +// Put . +func (m *MockedKV) Put(ctx context.Context, key, value []byte) (err error) { + m.pool.Store(string(key), value) + return +} + +// Get . +func (m *MockedKV) Get(ctx context.Context, key []byte) (value []byte, err error) { + raw, ok := m.pool.Load(string(key)) + if !ok { + err = fmt.Errorf("no such key: %s", key) + return + } + + if value, ok = raw.([]byte); !ok { + err = fmt.Errorf("value must be a []byte, but %v", raw) + } + + return +} + +// Delete . +func (m *MockedKV) Delete(ctx context.Context, key []byte) (err error) { + m.pool.Delete(string(key)) + return +} + +// Scan . +func (m *MockedKV) Scan(ctx context.Context, prefix []byte) <-chan ScanEntry { + ch := make(chan ScanEntry) + + go func() { + defer close(ch) + + m.pool.Range(func(rkey, rvalue interface{}) (next bool) { + next = true + + var entry MockedScanEntry + defer func() { + ch <- entry + }() + + var ok bool + if entry.Key, ok = rkey.(string); !ok { + entry.Err = fmt.Errorf("key must be a string, but %v", rkey) + return + } + + if !strings.HasPrefix(entry.Key, string(prefix)) { + return + } + + if entry.Value, ok = rvalue.([]byte); !ok { + entry.Err = fmt.Errorf("value must be a []byte, but %v", rvalue) + return + } + + return + }) + }() + + return ch +} + +// MockedScanEntry . +type MockedScanEntry struct { + Err error + Key string + Value []byte +} + +// Pair . +func (e MockedScanEntry) Pair() ([]byte, []byte) { + return []byte(e.Key), e.Value +} + +// Error . +func (e MockedScanEntry) Error() error { + return e.Err +} diff --git a/wal/mocks/WAL.go b/wal/mocks/WAL.go new file mode 100644 index 000000000..47b1b4f47 --- /dev/null +++ b/wal/mocks/WAL.go @@ -0,0 +1,48 @@ +// Code generated by mockery v2.3.0. DO NOT EDIT. + +package mocks + +import ( + context "context" + + wal "github.com/projecteru2/core/wal" + mock "github.com/stretchr/testify/mock" +) + +// WAL is an autogenerated mock type for the WAL type +type WAL struct { + mock.Mock +} + +// Log provides a mock function with given fields: _a0, _a1, _a2 +func (_m *WAL) Log(_a0 context.Context, _a1 string, _a2 interface{}) (wal.Commit, error) { + ret := _m.Called(_a0, _a1, _a2) + + var r0 wal.Commit + if rf, ok := ret.Get(0).(func(context.Context, string, interface{}) wal.Commit); ok { + r0 = rf(_a0, _a1, _a2) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(wal.Commit) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, string, interface{}) error); ok { + r1 = rf(_a0, _a1, _a2) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Recover provides a mock function with given fields: _a0 +func (_m *WAL) Recover(_a0 context.Context) { + _m.Called(_a0) +} + +// Register provides a mock function with given fields: _a0 +func (_m *WAL) Register(_a0 wal.EventHandler) { + _m.Called(_a0) +} diff --git a/wal/wal.go b/wal/wal.go new file mode 100644 index 000000000..8260a6216 --- /dev/null +++ b/wal/wal.go @@ -0,0 +1,91 @@ +package wal + +import ( + "context" + "time" +) + +const ( + // EventPrefix indicates the key prefix of all events' keys. + EventPrefix = "/events/" +) + +// WAL is the interface that groups the Register and Recover interfaces. +type WAL interface { + Registry + Recoverer + Logger + OpenCloser +} + +// Recoverer is the interface that wraps the basic Recover method. +type Recoverer interface { + Recover(context.Context) +} + +// Registry is the interface that wraps the basic Register method. +type Registry interface { + Register(EventHandler) +} + +// Logger is the interface that wraps the basic Log method. +type Logger interface { + Log(context.Context, string, interface{}) (Commit, error) +} + +// OpenCloser is the interface that groups the basic Open and Close methods. +type OpenCloser interface { + Open(context.Context, string, time.Duration) error + Close(context.Context) error +} + +// EventHandler indicates a handler of a specific event. +type EventHandler struct { + Event string + Check Check + Encode Encode + Decode Decode + Handle Handle +} + +// Encode is a function to encode a log item +type Encode func(interface{}) ([]byte, error) + +// Decode is a function to decode bytes to an interface{} +type Decode func([]byte) (interface{}, error) + +// Handle is a function to play a log item. +type Handle func(interface{}) error + +// Check is a function for checking a log item whether need to be played it. +type Check func(interface{}) (need bool, err error) + +// Commit is a function for committing an event log. +type Commit func(context.Context) error + +// Register registers a new event to doit. +func Register(handler EventHandler) { + wal.Register(handler) +} + +// Log records a log item. +func Log(ctx context.Context, event string, item interface{}) (Commit, error) { + return wal.Log(ctx, event, item) +} + +// Recover makes a disaster recovery. +func Recover(ctx context.Context) { + wal.Recover(ctx) +} + +// Close closes a WAL file. +func Close(ctx context.Context) error { + return wal.Close(ctx) +} + +// Open opens a WAL file. +func Open(ctx context.Context, path string, timeout time.Duration) error { + return wal.Open(ctx, path, timeout) +} + +var wal WAL = NewHydro() diff --git a/wal/wal_test.go b/wal/wal_test.go new file mode 100644 index 000000000..74d9d70b3 --- /dev/null +++ b/wal/wal_test.go @@ -0,0 +1,67 @@ +package wal + +import ( + "context" + "os" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/projecteru2/core/wal/kv" +) + +func TestRecover(t *testing.T) { + var checked bool + check := func(interface{}) (bool, error) { + checked = true + return true, nil + } + + var handled bool + handle := func(interface{}) (err error) { + handled = true + return + } + + var encoded bool + encode := func(interface{}) (bs []byte, err error) { + encoded = true + return + } + + var decoded bool + decode := func([]byte) (item interface{}, err error) { + decoded = true + return + } + + path := "/tmp/wal.unitest.wal" + os.Remove(path) + + require.NoError(t, Open(context.Background(), path, time.Second)) + defer Close(context.Background()) + + hydro, ok := wal.(*Hydro) + require.True(t, ok) + require.NotNil(t, hydro) + hydro.kv = kv.NewMockedKV() + + eventype := "create" + + Register(EventHandler{ + Event: eventype, + Encode: encode, + Decode: decode, + Check: check, + Handle: handle, + }) + + Log(context.Background(), eventype, struct{}{}) + + Recover(context.Background()) + require.True(t, checked) + require.True(t, handled) + require.True(t, encoded) + require.True(t, decoded) +}