-
Notifications
You must be signed in to change notification settings - Fork 42
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Co-authored-by: anrs <[email protected]>
- Loading branch information
Showing
14 changed files
with
1,065 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
package wal | ||
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
"strconv" | ||
"strings" | ||
"sync" | ||
"time" | ||
|
||
log "github.com/sirupsen/logrus" | ||
|
||
coretypes "github.com/projecteru2/core/types" | ||
"github.com/projecteru2/core/wal/kv" | ||
) | ||
|
||
// Hydro is the simplest wal implementation. | ||
type Hydro struct { | ||
handlers sync.Map | ||
kv kv.KV | ||
} | ||
|
||
// NewHydro initailizes a new Hydro instance. | ||
func NewHydro() *Hydro { | ||
return &Hydro{ | ||
kv: kv.NewLithium(), | ||
} | ||
} | ||
|
||
// Open connects a kvdb. | ||
func (h *Hydro) Open(ctx context.Context, path string, timeout time.Duration) (err error) { | ||
err = h.kv.Open(ctx, path, 0600, timeout) | ||
return | ||
} | ||
|
||
// Close disconnects the kvdb. | ||
func (h *Hydro) Close(ctx context.Context) error { | ||
return h.kv.Close(ctx) | ||
} | ||
|
||
// Register registers a new event handler. | ||
func (h *Hydro) Register(handler EventHandler) { | ||
h.handlers.Store(handler.Event, handler) | ||
} | ||
|
||
// Recover starts a disaster recovery, which will replay all the events. | ||
func (h *Hydro) Recover(ctx context.Context) { | ||
for ent := range h.kv.Scan(ctx, []byte(EventPrefix)) { | ||
event, err := h.decodeEvent(ent) | ||
if err != nil { | ||
log.Errorf("[Recover] decode event error: %v", err) | ||
continue | ||
} | ||
|
||
handler, ok := h.getEventHandler(event.Type) | ||
if !ok { | ||
log.Errorf("[Recover] no such event handler for %s", event.Type) | ||
continue | ||
} | ||
|
||
if err := h.recover(ctx, handler, event); err != nil { | ||
log.Errorf("[Recover] handle event %d (%s) failed: %v", event.ID, event.Type, err) | ||
continue | ||
} | ||
} | ||
} | ||
|
||
func (h *Hydro) recover(ctx context.Context, handler EventHandler, event HydroEvent) error { | ||
item, err := handler.Decode(event.Item) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
switch handle, err := handler.Check(item); { | ||
case err != nil: | ||
return err | ||
case !handle: | ||
return event.Delete(ctx) | ||
} | ||
|
||
return handler.Handle(item) | ||
} | ||
|
||
// Log records a log item. | ||
func (h *Hydro) Log(ctx context.Context, eventype string, item interface{}) (Commit, error) { | ||
handler, ok := h.getEventHandler(eventype) | ||
if !ok { | ||
return nil, coretypes.NewDetailedErr(coretypes.ErrUnregisteredWALEventType, eventype) | ||
} | ||
|
||
bs, err := handler.Encode(item) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
event := NewHydroEvent(h.kv) | ||
event.Type = eventype | ||
event.Item = bs | ||
|
||
if err = event.Create(ctx); err != nil { | ||
return nil, err | ||
} | ||
|
||
commit := func(context.Context) error { | ||
return event.Delete(ctx) | ||
} | ||
|
||
return commit, nil | ||
} | ||
|
||
func (h *Hydro) getEventHandler(event string) (handler EventHandler, ok bool) { | ||
var raw interface{} | ||
if raw, ok = h.handlers.Load(event); !ok { | ||
return | ||
} | ||
|
||
handler, ok = raw.(EventHandler) | ||
|
||
return | ||
} | ||
|
||
func (h *Hydro) decodeEvent(ent kv.ScanEntry) (event HydroEvent, err error) { | ||
if err = ent.Error(); err != nil { | ||
return | ||
} | ||
|
||
key, value := ent.Pair() | ||
if err = json.Unmarshal(value, &event); err != nil { | ||
return | ||
} | ||
|
||
event.kv = h.kv | ||
|
||
event.ID, err = strconv.ParseUint(strings.TrimPrefix(string(key), EventPrefix), 10, 64) | ||
|
||
return | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
package wal | ||
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
"path/filepath" | ||
"strconv" | ||
|
||
"github.com/projecteru2/core/wal/kv" | ||
) | ||
|
||
// HydroEvent indicates a log event. | ||
type HydroEvent struct { | ||
// A global unique identifier. | ||
ID uint64 `json:"id"` | ||
|
||
// Registered event type name. | ||
Type string `json:"type"` | ||
|
||
// The encoded log item. | ||
Item []byte `json:"item"` | ||
|
||
kv kv.KV | ||
} | ||
|
||
// NewHydroEvent initializes a new HydroEvent instance. | ||
func NewHydroEvent(kv kv.KV) (e *HydroEvent) { | ||
e = &HydroEvent{} | ||
e.kv = kv | ||
return | ||
} | ||
|
||
// Create persists this event. | ||
func (e *HydroEvent) Create(ctx context.Context) (err error) { | ||
if e.ID, err = e.kv.NextSequence(ctx); err != nil { | ||
return | ||
} | ||
|
||
var value []byte | ||
if value, err = json.MarshalIndent(e, "", "\t"); err != nil { | ||
return err | ||
} | ||
|
||
return e.kv.Put(ctx, e.Key(), value) | ||
} | ||
|
||
// Delete removes this event from persistence. | ||
func (e HydroEvent) Delete(ctx context.Context) error { | ||
return e.kv.Delete(ctx, e.Key()) | ||
} | ||
|
||
// Key returns this event's key path. | ||
func (e HydroEvent) Key() []byte { | ||
return []byte(filepath.Join(EventPrefix, strconv.FormatUint(e.ID, 10))) | ||
} |
Oops, something went wrong.