Skip to content

Commit

Permalink
feat: core WAL (#303)
Browse files Browse the repository at this point in the history
Co-authored-by: anrs <[email protected]>
  • Loading branch information
2 people authored and CMGS committed Dec 18, 2020
1 parent 2788601 commit 7e7907e
Show file tree
Hide file tree
Showing 14 changed files with 1,065 additions and 27 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/CMGS/statsd v0.0.0-20160223095033-48c421b3c1ab
github.com/Microsoft/hcsshim v0.8.11 // indirect
github.com/alexcesaro/statsd v2.0.0+incompatible // indirect
github.com/boltdb/bolt v1.3.1
github.com/cenkalti/backoff/v4 v4.0.2
github.com/containerd/containerd v1.4.3 // indirect
github.com/containerd/continuity v0.0.0-20200710164510-efbc4488d8fe // indirect
Expand Down
29 changes: 2 additions & 27 deletions go.sum

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions types/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ type Config struct {
Auth AuthConfig `yaml:"auth"` // grpc auth
GRPCConfig GRPCConfig `yaml:"grpc"` // grpc config

WALFile string `yaml:"wal_file" required:"true" default:"core.wal"` // WAL file path
WALOpenTimeout time.Duration `yaml:"wal_open_timeout" required:"true" default:"8s"` // timeout for opening a WAL file

Git GitConfig `yaml:"git"`
Etcd EtcdConfig `yaml:"etcd"`
Docker DockerConfig `yaml:"docker"`
Expand Down
3 changes: 3 additions & 0 deletions types/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ var (

ErrNodeNotExists = errors.New("node not exists")
ErrWorkloadNotExists = errors.New("workload not exists")

ErrUnregisteredWALEventType = errors.New("unregistered WAL event type")
ErrInvalidWALBucket = errors.New("invalid WAL bucket")
)

// NewDetailedErr returns an error with details
Expand Down
137 changes: 137 additions & 0 deletions wal/hydro.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package wal

import (
"context"
"encoding/json"
"strconv"
"strings"
"sync"
"time"

log "github.com/sirupsen/logrus"

coretypes "github.com/projecteru2/core/types"
"github.com/projecteru2/core/wal/kv"
)

// Hydro is the simplest wal implementation.
type Hydro struct {
handlers sync.Map
kv kv.KV
}

// NewHydro initailizes a new Hydro instance.
func NewHydro() *Hydro {
return &Hydro{
kv: kv.NewLithium(),
}
}

// Open connects a kvdb.
func (h *Hydro) Open(ctx context.Context, path string, timeout time.Duration) (err error) {
err = h.kv.Open(ctx, path, 0600, timeout)
return
}

// Close disconnects the kvdb.
func (h *Hydro) Close(ctx context.Context) error {
return h.kv.Close(ctx)
}

// Register registers a new event handler.
func (h *Hydro) Register(handler EventHandler) {
h.handlers.Store(handler.Event, handler)
}

// Recover starts a disaster recovery, which will replay all the events.
func (h *Hydro) Recover(ctx context.Context) {
for ent := range h.kv.Scan(ctx, []byte(EventPrefix)) {
event, err := h.decodeEvent(ent)
if err != nil {
log.Errorf("[Recover] decode event error: %v", err)
continue
}

handler, ok := h.getEventHandler(event.Type)
if !ok {
log.Errorf("[Recover] no such event handler for %s", event.Type)
continue
}

if err := h.recover(ctx, handler, event); err != nil {
log.Errorf("[Recover] handle event %d (%s) failed: %v", event.ID, event.Type, err)
continue
}
}
}

func (h *Hydro) recover(ctx context.Context, handler EventHandler, event HydroEvent) error {
item, err := handler.Decode(event.Item)
if err != nil {
return err
}

switch handle, err := handler.Check(item); {
case err != nil:
return err
case !handle:
return event.Delete(ctx)
}

return handler.Handle(item)
}

// Log records a log item.
func (h *Hydro) Log(ctx context.Context, eventype string, item interface{}) (Commit, error) {
handler, ok := h.getEventHandler(eventype)
if !ok {
return nil, coretypes.NewDetailedErr(coretypes.ErrUnregisteredWALEventType, eventype)
}

bs, err := handler.Encode(item)
if err != nil {
return nil, err
}

event := NewHydroEvent(h.kv)
event.Type = eventype
event.Item = bs

if err = event.Create(ctx); err != nil {
return nil, err
}

commit := func(context.Context) error {
return event.Delete(ctx)
}

return commit, nil
}

func (h *Hydro) getEventHandler(event string) (handler EventHandler, ok bool) {
var raw interface{}
if raw, ok = h.handlers.Load(event); !ok {
return
}

handler, ok = raw.(EventHandler)

return
}

func (h *Hydro) decodeEvent(ent kv.ScanEntry) (event HydroEvent, err error) {
if err = ent.Error(); err != nil {
return
}

key, value := ent.Pair()
if err = json.Unmarshal(value, &event); err != nil {
return
}

event.kv = h.kv

event.ID, err = strconv.ParseUint(strings.TrimPrefix(string(key), EventPrefix), 10, 64)

return
}
55 changes: 55 additions & 0 deletions wal/hydro_event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package wal

import (
"context"
"encoding/json"
"path/filepath"
"strconv"

"github.com/projecteru2/core/wal/kv"
)

// HydroEvent indicates a log event.
type HydroEvent struct {
// A global unique identifier.
ID uint64 `json:"id"`

// Registered event type name.
Type string `json:"type"`

// The encoded log item.
Item []byte `json:"item"`

kv kv.KV
}

// NewHydroEvent initializes a new HydroEvent instance.
func NewHydroEvent(kv kv.KV) (e *HydroEvent) {
e = &HydroEvent{}
e.kv = kv
return
}

// Create persists this event.
func (e *HydroEvent) Create(ctx context.Context) (err error) {
if e.ID, err = e.kv.NextSequence(ctx); err != nil {
return
}

var value []byte
if value, err = json.MarshalIndent(e, "", "\t"); err != nil {
return err
}

return e.kv.Put(ctx, e.Key(), value)
}

// Delete removes this event from persistence.
func (e HydroEvent) Delete(ctx context.Context) error {
return e.kv.Delete(ctx, e.Key())
}

// Key returns this event's key path.
func (e HydroEvent) Key() []byte {
return []byte(filepath.Join(EventPrefix, strconv.FormatUint(e.ID, 10)))
}
Loading

0 comments on commit 7e7907e

Please sign in to comment.