Skip to content

Commit

Permalink
refactor wal2
Browse files Browse the repository at this point in the history
  • Loading branch information
CMGS committed Mar 11, 2022
1 parent 1b765fa commit a9af172
Show file tree
Hide file tree
Showing 13 changed files with 1,276 additions and 0 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ mock: deps
mockery --dir store --output store/mocks --name Store
mockery --dir engine --output engine/mocks --name API
mockery --dir cluster --output cluster/mocks --name Cluster
mockery --dir wal2 --output wal2/mocks --name WAL
mockery --dir lock --output lock/mocks --name DistributedLock
mockery --dir store/etcdv3/meta --output store/etcdv3/meta/mocks --all
mockery --dir vendor/go.etcd.io/etcd/client/v3 --output store/etcdv3/meta/mocks --name Txn
Expand Down
1 change: 1 addition & 0 deletions types/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ var (
ErrEntityNotExists = errors.New("entity not exists")

ErrUnregisteredWALEventType = errors.New("unregistered WAL event type")
ErrBadWALEvent = errors.New("bad WAL event type")
ErrInvalidWALBucket = errors.New("invalid WAL bucket")
ErrInvalidType = errors.New("invalid type")
ErrLockSessionDone = errors.New("lock session done")
Expand Down
42 changes: 42 additions & 0 deletions wal2/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package wal2

import (
"encoding/json"
"fmt"
"path/filepath"
"strconv"
"strings"
)

// HydroEvent indicates a log event.
type HydroEvent struct {
// A global unique identifier.
ID uint64 `json:"id"`

// Registered event type name.
Type string `json:"type"`

// The encoded log item.
Item []byte `json:"item"`
}

// NewHydroEvent initializes a new HydroEvent instance.
func NewHydroEvent(ID uint64, typ string, item []byte) *HydroEvent {
return &HydroEvent{ID: ID, Type: typ, Item: item}
}

// Encode this event
func (e HydroEvent) Encode() ([]byte, error) {
return json.MarshalIndent(e, "", "\t")
}

// Key returns this event's key path.
func (e HydroEvent) Key() []byte {
return []byte(filepath.Join(eventPrefix, fmt.Sprintf("%016x", e.ID)))
}

func parseHydroEventID(key []byte) (uint64, error) {
// Trims the EventPrefix, then trims the padding 0.
id := strings.TrimLeft(strings.TrimPrefix(string(key), eventPrefix), "0")
return strconv.ParseUint(id, 16, 64)
}
142 changes: 142 additions & 0 deletions wal2/hydro.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package wal2

import (
"context"
"encoding/json"
"time"

"github.com/cornelk/hashmap"
"github.com/projecteru2/core/log"
coretypes "github.com/projecteru2/core/types"
"github.com/projecteru2/core/wal2/kv"
)

const (
fileMode = 0600
)

// Hydro is the simplest wal implementation.
type Hydro struct {
hashmap.HashMap
stor kv.KV
}

// NewHydro initailizes a new Hydro instance.
func NewHydro(path string, timeout time.Duration) (*Hydro, error) {
stor := kv.NewLithium()
if err := stor.Open(path, fileMode, timeout); err != nil {
return nil, err
}
return &Hydro{HashMap: hashmap.HashMap{}, stor: stor}, nil
}

// Close disconnects the kvdb.
func (h *Hydro) Close() error {
return h.stor.Close()
}

// Register registers a new event handler.
func (h *Hydro) Register(handler EventHandler) {
h.Set(handler.Typ(), handler)
}

// Recover starts a disaster recovery, which will replay all the events.
func (h *Hydro) Recover(ctx context.Context) {
ch, _ := h.stor.Scan([]byte(eventPrefix))

for scanEntry := range ch {
event, err := h.decodeEvent(scanEntry)
if err != nil {
log.Errorf(nil, "[Recover] decode event error: %v", err) // nolint
continue
}

handler, ok := h.getEventHandler(event.Type)
if !ok {
log.Errorf(nil, "[Recover] no such event handler for %s", event.Type) // nolint
continue
}

if err := h.recover(ctx, handler, event); err != nil {
log.Errorf(nil, "[Recover] handle event %d (%s) failed: %v", event.ID, event.Type, err) // nolint
continue
}
}
}

// Log records a log item.
func (h *Hydro) Log(eventyp string, item interface{}) (Commit, error) {
handler, ok := h.getEventHandler(eventyp)
if !ok {
return nil, coretypes.NewDetailedErr(coretypes.ErrUnregisteredWALEventType, eventyp)
}

bs, err := handler.Encode(item) // TODO 2 times encode is necessary?
if err != nil {
return nil, err
}

var id uint64
if id, err = h.stor.NextSequence(); err != nil {
return nil, err
}

event := NewHydroEvent(id, eventyp, bs)
if bs, err = event.Encode(); err != nil {
return nil, coretypes.ErrBadWALEvent
}

if err = h.stor.Put(event.Key(), bs); err != nil {
return nil, err
}

return func() error {
return h.stor.Delete(event.Key())
}, nil
}

func (h *Hydro) recover(ctx context.Context, handler EventHandler, event HydroEvent) error {
item, err := handler.Decode(event.Item)
if err != nil {
return err
}

delete := func() error {
return h.stor.Delete(event.Key())
}

switch handle, err := handler.Check(ctx, item); {
case err != nil:
return err
case !handle:
return delete()
default:
if err := handler.Handle(ctx, item); err != nil {
return err
}
}
return delete()
}

func (h *Hydro) getEventHandler(eventyp string) (EventHandler, bool) {
v, ok := h.GetStringKey(eventyp)
if !ok {
return nil, ok
}
handler, ok := v.(EventHandler)
return handler, ok
}

func (h *Hydro) decodeEvent(scanEntry kv.ScanEntry) (event HydroEvent, err error) {
if err = scanEntry.Error(); err != nil {
return
}

key, value := scanEntry.Pair()
if err = json.Unmarshal(value, &event); err != nil {
return
}

event.ID, err = parseHydroEventID(key)
return
}
Loading

0 comments on commit a9af172

Please sign in to comment.