From a9af1723bc5d4fe55f36cef3625bfc1e6201ef02 Mon Sep 17 00:00:00 2001 From: CMGS Date: Fri, 11 Mar 2022 13:49:29 +0800 Subject: [PATCH] refactor wal2 --- Makefile | 1 + types/errors.go | 1 + wal2/event.go | 42 +++++++ wal2/hydro.go | 142 +++++++++++++++++++++ wal2/hydro_test.go | 271 ++++++++++++++++++++++++++++++++++++++++ wal2/kv/kv.go | 43 +++++++ wal2/kv/lithium.go | 201 +++++++++++++++++++++++++++++ wal2/kv/lithium_test.go | 160 ++++++++++++++++++++++++ wal2/kv/mocked.go | 137 ++++++++++++++++++++ wal2/kv/mocked_test.go | 47 +++++++ wal2/mocks/WAL.go | 78 ++++++++++++ wal2/wal.go | 49 ++++++++ wal2/wal_test.go | 104 +++++++++++++++ 13 files changed, 1276 insertions(+) create mode 100644 wal2/event.go create mode 100644 wal2/hydro.go create mode 100644 wal2/hydro_test.go create mode 100644 wal2/kv/kv.go create mode 100644 wal2/kv/lithium.go create mode 100644 wal2/kv/lithium_test.go create mode 100644 wal2/kv/mocked.go create mode 100644 wal2/kv/mocked_test.go create mode 100644 wal2/mocks/WAL.go create mode 100644 wal2/wal.go create mode 100644 wal2/wal_test.go diff --git a/Makefile b/Makefile index b9a620496..329083140 100644 --- a/Makefile +++ b/Makefile @@ -36,6 +36,7 @@ mock: deps mockery --dir store --output store/mocks --name Store mockery --dir engine --output engine/mocks --name API mockery --dir cluster --output cluster/mocks --name Cluster + mockery --dir wal2 --output wal2/mocks --name WAL mockery --dir lock --output lock/mocks --name DistributedLock mockery --dir store/etcdv3/meta --output store/etcdv3/meta/mocks --all mockery --dir vendor/go.etcd.io/etcd/client/v3 --output store/etcdv3/meta/mocks --name Txn diff --git a/types/errors.go b/types/errors.go index ef180d205..b0e4294bb 100644 --- a/types/errors.go +++ b/types/errors.go @@ -87,6 +87,7 @@ var ( ErrEntityNotExists = errors.New("entity not exists") ErrUnregisteredWALEventType = errors.New("unregistered WAL event type") + ErrBadWALEvent = errors.New("bad WAL event type") ErrInvalidWALBucket = errors.New("invalid WAL bucket") ErrInvalidType = errors.New("invalid type") ErrLockSessionDone = errors.New("lock session done") diff --git a/wal2/event.go b/wal2/event.go new file mode 100644 index 000000000..f09dda67a --- /dev/null +++ b/wal2/event.go @@ -0,0 +1,42 @@ +package wal2 + +import ( + "encoding/json" + "fmt" + "path/filepath" + "strconv" + "strings" +) + +// HydroEvent indicates a log event. +type HydroEvent struct { + // A global unique identifier. + ID uint64 `json:"id"` + + // Registered event type name. + Type string `json:"type"` + + // The encoded log item. + Item []byte `json:"item"` +} + +// NewHydroEvent initializes a new HydroEvent instance. +func NewHydroEvent(ID uint64, typ string, item []byte) *HydroEvent { + return &HydroEvent{ID: ID, Type: typ, Item: item} +} + +// Encode this event +func (e HydroEvent) Encode() ([]byte, error) { + return json.MarshalIndent(e, "", "\t") +} + +// Key returns this event's key path. +func (e HydroEvent) Key() []byte { + return []byte(filepath.Join(eventPrefix, fmt.Sprintf("%016x", e.ID))) +} + +func parseHydroEventID(key []byte) (uint64, error) { + // Trims the EventPrefix, then trims the padding 0. + id := strings.TrimLeft(strings.TrimPrefix(string(key), eventPrefix), "0") + return strconv.ParseUint(id, 16, 64) +} diff --git a/wal2/hydro.go b/wal2/hydro.go new file mode 100644 index 000000000..2c3c07edd --- /dev/null +++ b/wal2/hydro.go @@ -0,0 +1,142 @@ +package wal2 + +import ( + "context" + "encoding/json" + "time" + + "github.com/cornelk/hashmap" + "github.com/projecteru2/core/log" + coretypes "github.com/projecteru2/core/types" + "github.com/projecteru2/core/wal2/kv" +) + +const ( + fileMode = 0600 +) + +// Hydro is the simplest wal implementation. +type Hydro struct { + hashmap.HashMap + stor kv.KV +} + +// NewHydro initailizes a new Hydro instance. +func NewHydro(path string, timeout time.Duration) (*Hydro, error) { + stor := kv.NewLithium() + if err := stor.Open(path, fileMode, timeout); err != nil { + return nil, err + } + return &Hydro{HashMap: hashmap.HashMap{}, stor: stor}, nil +} + +// Close disconnects the kvdb. +func (h *Hydro) Close() error { + return h.stor.Close() +} + +// Register registers a new event handler. +func (h *Hydro) Register(handler EventHandler) { + h.Set(handler.Typ(), handler) +} + +// Recover starts a disaster recovery, which will replay all the events. +func (h *Hydro) Recover(ctx context.Context) { + ch, _ := h.stor.Scan([]byte(eventPrefix)) + + for scanEntry := range ch { + event, err := h.decodeEvent(scanEntry) + if err != nil { + log.Errorf(nil, "[Recover] decode event error: %v", err) // nolint + continue + } + + handler, ok := h.getEventHandler(event.Type) + if !ok { + log.Errorf(nil, "[Recover] no such event handler for %s", event.Type) // nolint + continue + } + + if err := h.recover(ctx, handler, event); err != nil { + log.Errorf(nil, "[Recover] handle event %d (%s) failed: %v", event.ID, event.Type, err) // nolint + continue + } + } +} + +// Log records a log item. +func (h *Hydro) Log(eventyp string, item interface{}) (Commit, error) { + handler, ok := h.getEventHandler(eventyp) + if !ok { + return nil, coretypes.NewDetailedErr(coretypes.ErrUnregisteredWALEventType, eventyp) + } + + bs, err := handler.Encode(item) // TODO 2 times encode is necessary? + if err != nil { + return nil, err + } + + var id uint64 + if id, err = h.stor.NextSequence(); err != nil { + return nil, err + } + + event := NewHydroEvent(id, eventyp, bs) + if bs, err = event.Encode(); err != nil { + return nil, coretypes.ErrBadWALEvent + } + + if err = h.stor.Put(event.Key(), bs); err != nil { + return nil, err + } + + return func() error { + return h.stor.Delete(event.Key()) + }, nil +} + +func (h *Hydro) recover(ctx context.Context, handler EventHandler, event HydroEvent) error { + item, err := handler.Decode(event.Item) + if err != nil { + return err + } + + delete := func() error { + return h.stor.Delete(event.Key()) + } + + switch handle, err := handler.Check(ctx, item); { + case err != nil: + return err + case !handle: + return delete() + default: + if err := handler.Handle(ctx, item); err != nil { + return err + } + } + return delete() +} + +func (h *Hydro) getEventHandler(eventyp string) (EventHandler, bool) { + v, ok := h.GetStringKey(eventyp) + if !ok { + return nil, ok + } + handler, ok := v.(EventHandler) + return handler, ok +} + +func (h *Hydro) decodeEvent(scanEntry kv.ScanEntry) (event HydroEvent, err error) { + if err = scanEntry.Error(); err != nil { + return + } + + key, value := scanEntry.Pair() + if err = json.Unmarshal(value, &event); err != nil { + return + } + + event.ID, err = parseHydroEventID(key) + return +} diff --git a/wal2/hydro_test.go b/wal2/hydro_test.go new file mode 100644 index 000000000..3ff6fd2b9 --- /dev/null +++ b/wal2/hydro_test.go @@ -0,0 +1,271 @@ +package wal2 + +import ( + "context" + "fmt" + "path" + "testing" + "time" + + "github.com/projecteru2/core/wal2/kv" + + "github.com/stretchr/testify/assert" +) + +func TestLogFailedAsNoSuchHandler(t *testing.T) { + hydro, _ := NewHydro(path.Join(t.TempDir(), "1"), time.Second) + commit, err := hydro.Log("create", struct{}{}) + assert.Error(t, err) + assert.Nil(t, commit) +} + +func TestLogFailedAsEncodeError(t *testing.T) { + var checked, handled, encoded, decoded bool + eventype := "create" + handler := newTestEventHandler(eventype, &checked, &handled, &encoded, &decoded) + handler.encode = func(interface{}) ([]byte, error) { return nil, fmt.Errorf("encode error") } + + hydro, _ := NewHydro(path.Join(t.TempDir(), "1"), time.Second) + hydro.stor = kv.NewMockedKV() + hydro.Register(handler) + + commit, err := hydro.Log(eventype, struct{}{}) + assert.Error(t, err) + assert.Nil(t, commit) + assert.False(t, encoded) + assert.False(t, checked) + assert.False(t, decoded) + assert.False(t, handled) +} + +func TestLogWithCommitEvent(t *testing.T) { + var checked, handled, encoded, decoded bool + eventype := "create" + handler := newTestEventHandler(eventype, &checked, &handled, &encoded, &decoded) + + hydro, _ := NewHydro(path.Join(t.TempDir(), "1"), time.Second) + hydro.stor = kv.NewMockedKV() + hydro.Register(handler) + + commit, err := hydro.Log(eventype, struct{}{}) + assert.NoError(t, err) + assert.NotNil(t, commit) + + assert.NoError(t, commit()) + assert.True(t, encoded) + assert.False(t, decoded) + assert.False(t, checked) + assert.False(t, handled) +} + +func TestRecoverFailedAsNoSuchHandler(t *testing.T) { + var checked, handled, encoded, decoded bool + eventype := "create" + handler := newTestEventHandler(eventype, &checked, &handled, &encoded, &decoded) + + hydro, _ := NewHydro(path.Join(t.TempDir(), "1"), time.Second) + hydro.stor = kv.NewMockedKV() + hydro.Register(handler) + + commit, err := hydro.Log(eventype, struct{}{}) + assert.NoError(t, err) + assert.NotNil(t, commit) + + hydro.Del(eventype) + + hydro.Recover(context.TODO()) + assert.True(t, encoded) + assert.False(t, decoded) + assert.False(t, checked) + assert.False(t, handled) +} + +func TestRecoverFailedAsCheckError(t *testing.T) { + var checked, handled, encoded, decoded bool + eventype := "create" + handler := newTestEventHandler(eventype, &checked, &handled, &encoded, &decoded) + handler.check = func(interface{}) (bool, error) { + checked = true + return false, fmt.Errorf("check error") + } + + hydro, _ := NewHydro(path.Join(t.TempDir(), "1"), time.Second) + hydro.stor = kv.NewMockedKV() + hydro.Register(handler) + + commit, err := hydro.Log(eventype, struct{}{}) + assert.NoError(t, err) + assert.NotNil(t, commit) + + hydro.Recover(context.TODO()) + assert.True(t, encoded) + assert.True(t, decoded) + assert.True(t, checked) + assert.False(t, handled) +} + +func TestDecodeEventFailedAsDecodeEntryError(t *testing.T) { + hydro, _ := NewHydro(path.Join(t.TempDir(), "1"), time.Second) + ent := kv.MockedScanEntry{Value: []byte("x")} + _, err := hydro.decodeEvent(ent) + assert.Error(t, err) +} + +func TestDecodeEventFailedAsInvalidEventID(t *testing.T) { + hydro, _ := NewHydro(path.Join(t.TempDir(), "1"), time.Second) + ent := kv.MockedScanEntry{Key: "/events/x", Value: []byte("{}")} + _, err := hydro.decodeEvent(ent) + assert.Error(t, err) +} + +func TestDecodeEventFailedAsEntryError(t *testing.T) { + hydro, _ := NewHydro(path.Join(t.TempDir(), "1"), time.Second) + expErr := fmt.Errorf("expects an error") + ent := kv.MockedScanEntry{Err: expErr} + _, err := hydro.decodeEvent(ent) + assert.Error(t, err) + assert.Equal(t, expErr.Error(), err.Error()) +} + +func TestRecoverFailedAsDecodeLogError(t *testing.T) { + var checked, handled, encoded, decoded bool + eventype := "create" + handler := newTestEventHandler(eventype, &checked, &handled, &encoded, &decoded) + handler.decode = func([]byte) (interface{}, error) { + decoded = true + return nil, fmt.Errorf("decode error") + } + + hydro, _ := NewHydro(path.Join(t.TempDir(), "1"), time.Second) + hydro.stor = kv.NewMockedKV() + hydro.Register(handler) + + commit, err := hydro.Log(eventype, struct{}{}) + assert.NoError(t, err) + assert.NotNil(t, commit) + + hydro.Recover(context.TODO()) + assert.True(t, encoded) + assert.True(t, decoded) + assert.False(t, checked) + assert.False(t, handled) +} + +func TestHydroRecoverDiscardNoNeedEvent(t *testing.T) { + var checked, handled, encoded, decoded bool + check := func(interface{}) (need bool, err error) { + checked = true + return + } + + eventype := "create" + handler := newTestEventHandler(eventype, &checked, &handled, &encoded, &decoded) + handler.check = check + + hydro, _ := NewHydro(path.Join(t.TempDir(), "1"), time.Second) + hydro.stor = kv.NewMockedKV() + hydro.Register(handler) + + commit, err := hydro.Log(eventype, struct{}{}) + assert.NoError(t, err) + assert.NotNil(t, commit) + + hydro.Recover(context.TODO()) + assert.True(t, encoded) + assert.True(t, decoded) + assert.True(t, checked) + assert.False(t, handled) +} + +func TestHydroRecover(t *testing.T) { + var checked, handled, encoded, decoded bool + eventype := "create" + handler := newTestEventHandler(eventype, &checked, &handled, &encoded, &decoded) + + hydro, _ := NewHydro(path.Join(t.TempDir(), "1"), time.Second) + hydro.stor = kv.NewMockedKV() + hydro.Register(handler) + + commit, err := hydro.Log(eventype, struct{}{}) + assert.NoError(t, err) + assert.NotNil(t, commit) + + hydro.Recover(context.TODO()) + assert.True(t, encoded) + assert.True(t, decoded) + assert.True(t, checked) + assert.True(t, handled) + + // The handled events should be removed. + ch, _ := hydro.stor.Scan([]byte(eventPrefix)) + for range ch { + assert.Fail(t, "the events should be deleted") + } +} + +func TestHydroEventKeyMustPadZero(t *testing.T) { + event := HydroEvent{ID: 15} + assert.Equal(t, "/events/000000000000000f", string(event.Key())) +} + +func TestHydroEventParseIDShouldRemovePadding(t *testing.T) { + id, err := parseHydroEventID([]byte("/events/00000000000000000000000000f")) + assert.NoError(t, err) + assert.Equal(t, uint64(15), id) +} + +func TestHydroRecoverWithRealLithium(t *testing.T) { + p := path.Join(t.TempDir(), "temp.wal") + hydro, err := NewHydro(p, time.Second) + assert.NoError(t, err) + + handler := simpleEventHandler{ + event: "create", + encode: func(interface{}) ([]byte, error) { return []byte("{}"), nil }, + decode: func([]byte) (interface{}, error) { return struct{}{}, nil }, + check: func(interface{}) (bool, error) { return true, nil }, + handle: func(interface{}) error { return nil }, + } + hydro.Register(handler) + + hydro.Log(handler.event, struct{}{}) + hydro.Log(handler.event, struct{}{}) + hydro.Log(handler.event, struct{}{}) + + hydro.Recover(context.TODO()) + + ch, _ := hydro.stor.Scan([]byte(eventPrefix)) + for range ch { + assert.FailNow(t, "expects no data") + } +} + +func newTestEventHandler(eventype string, checked, handled, encoded, decoded *bool) simpleEventHandler { + check := func(interface{}) (bool, error) { + *checked = true + return true, nil + } + + handle := func(interface{}) (err error) { + *handled = true + return + } + + encode := func(interface{}) (bs []byte, err error) { + *encoded = true + return + } + + decode := func([]byte) (item interface{}, err error) { + *decoded = true + return + } + + return simpleEventHandler{ + event: eventype, + encode: encode, + decode: decode, + check: check, + handle: handle, + } +} diff --git a/wal2/kv/kv.go b/wal2/kv/kv.go new file mode 100644 index 000000000..f3a76b049 --- /dev/null +++ b/wal2/kv/kv.go @@ -0,0 +1,43 @@ +package kv + +import ( + "os" + "time" +) + +// KV is the interface that groups the Simpler and Scanner interfaces. +type KV interface { + OpenCloser + Simpler + Scanner + Sequencer +} + +// Simpler is the interface that groups the basic Put, Get and Delete methods. +type Simpler interface { + Put([]byte, []byte) error + Get([]byte) ([]byte, error) + Delete([]byte) error +} + +// Scanner is the interface that wraps the basic Scan method. +type Scanner interface { + Scan([]byte) (<-chan ScanEntry, func()) +} + +// Sequencer is the interface that wraps the basic NextSequence method. +type Sequencer interface { + NextSequence() (id uint64, err error) +} + +// OpenCloser is the interface that groups the basic Open and Close methods. +type OpenCloser interface { + Open(path string, mode os.FileMode, timeout time.Duration) error + Close() error +} + +// ScanEntry is the interface that groups the basic Pair and Error methods. +type ScanEntry interface { + Pair() (key []byte, value []byte) + Error() error +} diff --git a/wal2/kv/lithium.go b/wal2/kv/lithium.go new file mode 100644 index 000000000..98dc22f39 --- /dev/null +++ b/wal2/kv/lithium.go @@ -0,0 +1,201 @@ +package kv + +import ( + "bytes" + "os" + "sync" + "time" + + "github.com/projecteru2/core/types" + + "go.etcd.io/bbolt" +) + +// Lithium . +type Lithium struct { + sync.Mutex + + // Name of the root bucket. + RootBucketKey []byte + + bolt *bbolt.DB + path string + mode os.FileMode + timeout time.Duration +} + +// NewLithium initializes a new Lithium instance. +func NewLithium() *Lithium { + return &Lithium{ + RootBucketKey: []byte("root"), + } +} + +// Reopen re-open a kvdb file. +func (l *Lithium) Reopen() error { + l.Lock() + defer l.Unlock() + + if err := l.close(); err != nil { + return err + } + + return l.open() +} + +// Open opens a kvdb file. +func (l *Lithium) Open(path string, mode os.FileMode, timeout time.Duration) (err error) { + l.Lock() + defer l.Unlock() + + l.path = path + l.mode = mode + l.timeout = timeout + + return l.open() +} + +// Close closes the kvdb file. +func (l *Lithium) Close() error { + l.Lock() + defer l.Unlock() + return l.close() +} + +// Put creates/updates a key/value pair. +func (l *Lithium) Put(key []byte, value []byte) (err error) { + return l.update(func(bkt *bbolt.Bucket) error { + return bkt.Put(key, value) + }) +} + +// Get read a key's value. +func (l *Lithium) Get(key []byte) (dst []byte, err error) { + err = l.view(func(bkt *bbolt.Bucket) error { + src := bkt.Get(key) + dst = make([]byte, len(src)) + + for n := 0; n < len(dst); { + n += copy(dst, src) + } + + return nil + }) + + return +} + +// Delete deletes a key. +func (l *Lithium) Delete(key []byte) error { + return l.update(func(bkt *bbolt.Bucket) error { + return bkt.Delete(key) + }) +} + +// Scan scans all the key/value pairs. +func (l *Lithium) Scan(prefix []byte) (<-chan ScanEntry, func()) { + ch := make(chan ScanEntry) + + exit := make(chan struct{}) + abort := func() { + close(exit) + } + + go func() { + defer close(ch) + + scan := func(bkt *bbolt.Bucket) error { + c := bkt.Cursor() + for key, value := c.First(); key != nil && bytes.HasPrefix(key, prefix); key, value = c.Next() { + select { + case <-exit: + return nil + case ch <- LithiumScanEntry{key: key, value: value}: + } + } + return nil + } + + if err := l.view(scan); err != nil { + select { + case <-exit: + case ch <- LithiumScanEntry{err: err}: + } + } + }() + + return ch, abort +} + +// NextSequence generates a new sequence. +func (l *Lithium) NextSequence() (uint64, error) { + var seq uint64 + err := l.update(func(bkt *bbolt.Bucket) (ue error) { + seq, ue = bkt.NextSequence() + return + }) + + return seq, err +} + +func (l *Lithium) open() (err error) { + if l.bolt, err = bbolt.Open(l.path, l.mode, &bbolt.Options{Timeout: l.timeout}); err != nil { + return + } + + err = l.bolt.Update(func(tx *bbolt.Tx) error { + _, ce := tx.CreateBucketIfNotExists(l.RootBucketKey) + return ce + }) + + return +} + +func (l *Lithium) close() error { + return l.bolt.Close() +} + +func (l *Lithium) view(fn func(*bbolt.Bucket) error) error { + return l.bolt.Update(func(tx *bbolt.Tx) error { + bkt, err := l.getBucket(tx, l.RootBucketKey) + if err != nil { + return err + } + return fn(bkt) + }) +} + +func (l *Lithium) update(fn func(*bbolt.Bucket) error) error { + return l.bolt.Update(func(tx *bbolt.Tx) error { + bkt, err := l.getBucket(tx, l.RootBucketKey) + if err != nil { + return err + } + return fn(bkt) + }) +} + +func (l *Lithium) getBucket(tx *bbolt.Tx, key []byte) (bkt *bbolt.Bucket, err error) { + bkt = tx.Bucket(l.RootBucketKey) + if bkt == nil { + err = types.NewDetailedErr(types.ErrInvalidWALBucket, key) + } + return +} + +// LithiumScanEntry indicates an entry of scanning. +type LithiumScanEntry struct { + err error + key []byte + value []byte +} + +// Pair means a pair of key/value. +func (e LithiumScanEntry) Pair() ([]byte, []byte) { + return e.key, e.value +} + +// Error . +func (e LithiumScanEntry) Error() error { + return e.err +} diff --git a/wal2/kv/lithium_test.go b/wal2/kv/lithium_test.go new file mode 100644 index 000000000..a7632c1fd --- /dev/null +++ b/wal2/kv/lithium_test.go @@ -0,0 +1,160 @@ +package kv + +import ( + "context" + "fmt" + "os" + "strconv" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestSet(t *testing.T) { + lit, cancel := newTestLithium(t) + defer cancel() + require.NoError(t, lit.Put([]byte("key"), []byte("value"))) +} + +func TestGet(t *testing.T) { + lit, cancel := newTestLithium(t) + defer cancel() + + key := []byte("key") + value := []byte("value") + require.NoError(t, lit.Put(key, value)) + + act, err := lit.Get(key) + require.NoError(t, err) + require.Equal(t, value, act) +} + +func TestDelete(t *testing.T) { + lit, cancel := newTestLithium(t) + defer cancel() + + key := []byte("key") + value := []byte("value") + require.NoError(t, lit.Put(key, value)) + + act, err := lit.Get(key) + require.NoError(t, err) + require.Equal(t, value, act) + + // deletes the key + require.NoError(t, lit.Delete(key)) + + act, err = lit.Get(key) + require.NoError(t, err) + require.Equal(t, []byte{}, act) +} + +func TestScan(t *testing.T) { + lit, cancel := newTestLithium(t) + defer cancel() + + key := []byte("/p1/key") + value := []byte("value") + require.NoError(t, lit.Put(key, value)) + require.NoError(t, lit.Put([]byte("/p2/key"), value)) + + ch, _ := lit.Scan([]byte("/p1/")) + require.Equal(t, LithiumScanEntry{key: key, value: value}, <-ch) + require.Nil(t, <-ch) +} + +func TestScanAbort(t *testing.T) { + lit, cancel := newTestLithium(t) + defer cancel() + + for i := 0; i < 10; i++ { + key := []byte(fmt.Sprintf("p%d", i)) + require.NoError(t, lit.Put(key, []byte("v"))) + } + + ch, abort := lit.Scan([]byte("p")) + abort() + + // before the above abort() has been finished, the scanned key/value pair + // had sent to ch already, then the code tries to recv again to make sure the + // ch had been closed. + if real := <-ch; real != nil { + require.Nil(t, <-ch) + } +} + +func TestNextSequence(t *testing.T) { + lit, cancel := newTestLithium(t) + defer cancel() + + seq0, err := lit.NextSequence() + require.NoError(t, err) + require.True(t, seq0 > 0) + + seq1, err := lit.NextSequence() + require.NoError(t, err) + require.True(t, seq1 > seq0) + + // Closes and Reopens + require.NoError(t, lit.Reopen()) + + seq2, err := lit.NextSequence() + require.NoError(t, err) + require.True(t, seq2 > seq1) +} + +func TestScanOrderedByKeys(t *testing.T) { + lit, cancel := newTestLithium(t) + defer cancel() + + // put by descending order. + for i := 0xf; i > 0; i-- { + key := []byte(fmt.Sprintf("/events/%016x", i)) + require.NoError(t, lit.Put(key, []byte("v"))) + } + + var last uint64 + // asserts read by ascending order. + ch, _ := lit.Scan([]byte("/events/")) + for ent := range ch { + require.NoError(t, ent.Error()) + + key, _ := ent.Pair() + raw := strings.TrimLeft(strings.TrimPrefix(string(key), "/events/"), "0") + + id, err := strconv.ParseUint(raw, 16, 64) + require.NoError(t, err) + require.True(t, id > last) + + last = id + } +} + +func newTestLithium(t *testing.T) (lit *Lithium, cancel func()) { + path := "/tmp/lithium.unitest.wal" + os.Remove(path) + + lit = NewLithium() + require.NoError(t, lit.Open(path, 0666, time.Second)) + + cancel = func() { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + closed := make(chan struct{}) + go func() { + defer close(closed) + require.NoError(t, lit.Close()) + }() + + select { + case <-ctx.Done(): + require.FailNow(t, "close error: %s", ctx.Err()) + case <-closed: + } + } + + return +} diff --git a/wal2/kv/mocked.go b/wal2/kv/mocked.go new file mode 100644 index 000000000..1b3b912e4 --- /dev/null +++ b/wal2/kv/mocked.go @@ -0,0 +1,137 @@ +package kv + +import ( + "fmt" + "os" + "strings" + "sync" + "time" + + "github.com/cornelk/hashmap" +) + +// MockedKV . +type MockedKV struct { + sync.Mutex + pool hashmap.HashMap + nextSeq uint64 +} + +// NewMockedKV . +func NewMockedKV() *MockedKV { + return &MockedKV{ + nextSeq: 1, + } +} + +// Open . +func (m *MockedKV) Open(path string, mode os.FileMode, timeout time.Duration) error { + return nil +} + +// Close . +func (m *MockedKV) Close() error { + for kv := range m.pool.Iter() { + m.pool.Del(kv.Key) + } + return nil +} + +// NextSequence . +func (m *MockedKV) NextSequence() (nextSeq uint64, err error) { + m.Lock() + defer m.Unlock() + nextSeq = m.nextSeq + m.nextSeq++ + return +} + +// Put . +func (m *MockedKV) Put(key, value []byte) (err error) { + m.pool.Set(string(key), value) + return +} + +// Get . +func (m *MockedKV) Get(key []byte) (value []byte, err error) { + raw, ok := m.pool.GetStringKey(string(key)) + if !ok { + err = fmt.Errorf("no such key: %s", key) + return + } + + if value, ok = raw.([]byte); !ok { + err = fmt.Errorf("value must be a []byte, but %v", raw) + } + + return +} + +// Delete . +func (m *MockedKV) Delete(key []byte) (err error) { + m.pool.Del(string(key)) + return +} + +// Scan . +func (m *MockedKV) Scan(prefix []byte) (<-chan ScanEntry, func()) { + ch := make(chan ScanEntry) + + exit := make(chan struct{}) + abort := func() { + close(exit) + } + + go func() { + defer close(ch) + dataCh := m.pool.Iter() + + for { + select { + case <-exit: + return + case kv := <-dataCh: + var entry MockedScanEntry + var ok bool + + if kv.Key == nil { + return + } + + if entry.Key, ok = kv.Key.(string); !ok { + entry.Err = fmt.Errorf("key must be a string, but %v", kv.Key) + continue + } + + if !strings.HasPrefix(entry.Key, string(prefix)) { + continue + } + + if entry.Value, ok = kv.Value.([]byte); !ok { + entry.Err = fmt.Errorf("value must be a []byte, but %v", kv.Value) + continue + } + ch <- entry + } + } + }() + + return ch, abort +} + +// MockedScanEntry . +type MockedScanEntry struct { + Err error + Key string + Value []byte +} + +// Pair . +func (e MockedScanEntry) Pair() ([]byte, []byte) { + return []byte(e.Key), e.Value +} + +// Error . +func (e MockedScanEntry) Error() error { + return e.Err +} diff --git a/wal2/kv/mocked_test.go b/wal2/kv/mocked_test.go new file mode 100644 index 000000000..8e7b26a70 --- /dev/null +++ b/wal2/kv/mocked_test.go @@ -0,0 +1,47 @@ +package kv + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestMockedKV(t *testing.T) { + m := &MockedKV{} + require.NoError(t, m.Open("/tmp/wal", 0777, time.Second)) + + a := []byte("/a") + b := []byte("/b") + expValue := []byte("v") + require.NoError(t, m.Put(a, expValue)) + require.NoError(t, m.Put(b, expValue)) + require.NoError(t, m.Put([]byte("out-of-scan"), expValue)) + + ch, abort := m.Scan([]byte("/")) + elem := <-ch + ent, ok := elem.(MockedScanEntry) + require.True(t, ok) + require.NoError(t, ent.Err) + abort() + + _, abort = m.Scan([]byte("/")) + abort() + + realValue, err := m.Get(a) + require.NoError(t, err) + require.Equal(t, expValue, realValue) + + realValue, err = m.Get(b) + require.NoError(t, err) + require.Equal(t, expValue, realValue) + + require.NoError(t, m.Delete(b)) + realValue, err = m.Get(b) + require.Error(t, err) + + require.NoError(t, m.Close()) + + realValue, err = m.Get(a) + require.Error(t, err) +} diff --git a/wal2/mocks/WAL.go b/wal2/mocks/WAL.go new file mode 100644 index 000000000..ddb349e83 --- /dev/null +++ b/wal2/mocks/WAL.go @@ -0,0 +1,78 @@ +// Code generated by mockery v2.10.0. DO NOT EDIT. + +package mocks + +import ( + context "context" + time "time" + + mock "github.com/stretchr/testify/mock" + + wal2 "github.com/projecteru2/core/wal2" +) + +// WAL is an autogenerated mock type for the WAL type +type WAL struct { + mock.Mock +} + +// Close provides a mock function with given fields: +func (_m *WAL) Close() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Log provides a mock function with given fields: _a0, _a1 +func (_m *WAL) Log(_a0 string, _a1 interface{}) (wal2.Commit, error) { + ret := _m.Called(_a0, _a1) + + var r0 wal2.Commit + if rf, ok := ret.Get(0).(func(string, interface{}) wal2.Commit); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(wal2.Commit) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(string, interface{}) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Open provides a mock function with given fields: _a0, _a1 +func (_m *WAL) Open(_a0 string, _a1 time.Duration) error { + ret := _m.Called(_a0, _a1) + + var r0 error + if rf, ok := ret.Get(0).(func(string, time.Duration) error); ok { + r0 = rf(_a0, _a1) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Recover provides a mock function with given fields: _a0 +func (_m *WAL) Recover(_a0 context.Context) { + _m.Called(_a0) +} + +// Register provides a mock function with given fields: _a0 +func (_m *WAL) Register(_a0 wal2.EventHandler) { + _m.Called(_a0) +} diff --git a/wal2/wal.go b/wal2/wal.go new file mode 100644 index 000000000..18cf4ac83 --- /dev/null +++ b/wal2/wal.go @@ -0,0 +1,49 @@ +package wal2 + +import ( + "context" +) + +const ( + eventPrefix = "/events/" +) + +// WAL is the interface that groups the Register and Recover interfaces. +type WAL interface { + Registry + Recoverer + Logger + OpenCloser +} + +// Recoverer is the interface that wraps the basic Recover method. +type Recoverer interface { + Recover(context.Context) +} + +// Registry is the interface that wraps the basic Register method. +type Registry interface { + Register(EventHandler) +} + +// Logger is the interface that wraps the basic Log method. +type Logger interface { + Log(string, interface{}) (Commit, error) +} + +// Closer is the interface that groups the Close methods. +type OpenCloser interface { + Close() error +} + +// EventHandler is the interface that groups a few methods. +type EventHandler interface { + Typ() string + Check(context.Context, interface{}) (need bool, err error) + Encode(interface{}) ([]byte, error) + Decode([]byte) (interface{}, error) + Handle(context.Context, interface{}) error +} + +// Commit is a function for committing an event log. +type Commit func() error diff --git a/wal2/wal_test.go b/wal2/wal_test.go new file mode 100644 index 000000000..aa46a4aa0 --- /dev/null +++ b/wal2/wal_test.go @@ -0,0 +1,104 @@ +package wal2 + +import ( + "context" + "os" + "testing" + "time" + + "github.com/projecteru2/core/wal2/kv" + + "github.com/stretchr/testify/assert" +) + +func TestRecover(t *testing.T) { + var checked bool + check := func(interface{}) (bool, error) { + checked = true + return true, nil + } + + var handled bool + handle := func(interface{}) (err error) { + handled = true + return + } + + var encoded bool + encode := func(interface{}) (bs []byte, err error) { + encoded = true + return + } + + var decoded bool + decode := func([]byte) (item interface{}, err error) { + decoded = true + return + } + + path := "/tmp/wal.unitest.wal" + os.Remove(path) + + var wal WAL + var err error + wal, err = NewHydro(path, time.Second) + assert.NoError(t, err) + defer wal.Close() + + hydro, ok := wal.(*Hydro) + assert.True(t, ok) + assert.NotNil(t, hydro) + hydro.stor = kv.NewMockedKV() + + eventype := "create" + + wal.Register(simpleEventHandler{ + event: eventype, + encode: encode, + decode: decode, + check: check, + handle: handle, + }) + + wal.Log(eventype, struct{}{}) + + wal.Recover(context.TODO()) + assert.True(t, checked) + assert.True(t, handled) + assert.True(t, encoded) + assert.True(t, decoded) +} + +// simpleEventHandler simply implements the EventHandler. +type simpleEventHandler struct { + event string + check func(raw interface{}) (bool, error) + encode func(interface{}) ([]byte, error) + decode func([]byte) (interface{}, error) + handle func(interface{}) error +} + +// Event . +func (h simpleEventHandler) Typ() string { + return h.event +} + +// Check . +func (h simpleEventHandler) Check(ctx context.Context, raw interface{}) (bool, error) { + return h.check(raw) +} + +// Encode . +func (h simpleEventHandler) Encode(raw interface{}) ([]byte, error) { + return h.encode(raw) +} + +// Decode . +func (h simpleEventHandler) Decode(bs []byte) (interface{}, error) { + return h.decode(bs) +} + +// Handle . +func (h simpleEventHandler) Handle(ctx context.Context, raw interface{}) error { + return h.handle(raw) +}