-
Notifications
You must be signed in to change notification settings - Fork 42
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
13 changed files
with
1,280 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
package wal2 | ||
|
||
import ( | ||
"encoding/json" | ||
"fmt" | ||
"path/filepath" | ||
"strconv" | ||
"strings" | ||
) | ||
|
||
// HydroEvent indicates a log event. | ||
type HydroEvent struct { | ||
// A global unique identifier. | ||
ID uint64 `json:"id"` | ||
|
||
// Registered event type name. | ||
Type string `json:"type"` | ||
|
||
// The encoded log item. | ||
Item []byte `json:"item"` | ||
} | ||
|
||
// NewHydroEvent initializes a new HydroEvent instance. | ||
func NewHydroEvent(ID uint64, typ string, item []byte) *HydroEvent { | ||
return &HydroEvent{ID: ID, Type: typ, Item: item} | ||
} | ||
|
||
// Encode this event | ||
func (e HydroEvent) Encode() ([]byte, error) { | ||
return json.MarshalIndent(e, "", "\t") | ||
} | ||
|
||
// Key returns this event's key path. | ||
func (e HydroEvent) Key() []byte { | ||
return []byte(filepath.Join(eventPrefix, fmt.Sprintf("%016x", e.ID))) | ||
} | ||
|
||
func parseHydroEventID(key []byte) (uint64, error) { | ||
// Trims the EventPrefix, then trims the padding 0. | ||
id := strings.TrimLeft(strings.TrimPrefix(string(key), eventPrefix), "0") | ||
return strconv.ParseUint(id, 16, 64) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,146 @@ | ||
package wal2 | ||
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
"time" | ||
|
||
"github.com/cornelk/hashmap" | ||
"github.com/projecteru2/core/log" | ||
coretypes "github.com/projecteru2/core/types" | ||
"github.com/projecteru2/core/wal2/kv" | ||
) | ||
|
||
const ( | ||
fileMode = 0600 | ||
) | ||
|
||
// Hydro is the simplest wal implementation. | ||
type Hydro struct { | ||
hashmap.HashMap | ||
stor kv.KV | ||
} | ||
|
||
// NewHydro initailizes a new Hydro instance. | ||
func NewHydro(path string, timeout time.Duration) (*Hydro, error) { | ||
stor := kv.NewLithium() | ||
if err := stor.Open(path, fileMode, timeout); err != nil { | ||
return nil, err | ||
} | ||
return &Hydro{HashMap: hashmap.HashMap{}, stor: stor}, nil | ||
} | ||
|
||
// Close disconnects the kvdb. | ||
func (h *Hydro) Close() error { | ||
return h.stor.Close() | ||
} | ||
|
||
// Register registers a new event handler. | ||
func (h *Hydro) Register(handler EventHandler) { | ||
h.Set(handler.Typ(), handler) | ||
} | ||
|
||
// Recover starts a disaster recovery, which will replay all the events. | ||
func (h *Hydro) Recover(ctx context.Context) { | ||
ch, _ := h.stor.Scan([]byte(eventPrefix)) | ||
|
||
events := []HydroEvent{} | ||
for scanEntry := range ch { | ||
event, err := h.decodeEvent(scanEntry) | ||
if err != nil { | ||
log.Errorf(nil, "[Recover] decode event error: %v", err) // nolint | ||
continue | ||
} | ||
events = append(events, event) | ||
} | ||
|
||
for _, event := range events { | ||
handler, ok := h.getEventHandler(event.Type) | ||
if !ok { | ||
log.Errorf(nil, "[Recover] no such event handler for %s", event.Type) // nolint | ||
continue | ||
} | ||
|
||
if err := h.recover(ctx, handler, event); err != nil { | ||
log.Errorf(nil, "[Recover] handle event %d (%s) failed: %v", event.ID, event.Type, err) // nolint | ||
continue | ||
} | ||
} | ||
} | ||
|
||
// Log records a log item. | ||
func (h *Hydro) Log(eventyp string, item interface{}) (Commit, error) { | ||
handler, ok := h.getEventHandler(eventyp) | ||
if !ok { | ||
return nil, coretypes.NewDetailedErr(coretypes.ErrUnregisteredWALEventType, eventyp) | ||
} | ||
|
||
bs, err := handler.Encode(item) // TODO 2 times encode is necessary? | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
var id uint64 | ||
if id, err = h.stor.NextSequence(); err != nil { | ||
return nil, err | ||
} | ||
|
||
event := NewHydroEvent(id, eventyp, bs) | ||
if bs, err = event.Encode(); err != nil { | ||
return nil, coretypes.ErrBadWALEvent | ||
} | ||
|
||
if err = h.stor.Put(event.Key(), bs); err != nil { | ||
return nil, err | ||
} | ||
|
||
return func() error { | ||
return h.stor.Delete(event.Key()) | ||
}, nil | ||
} | ||
|
||
func (h *Hydro) recover(ctx context.Context, handler EventHandler, event HydroEvent) error { | ||
item, err := handler.Decode(event.Item) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
delete := func() error { | ||
return h.stor.Delete(event.Key()) | ||
} | ||
|
||
switch handle, err := handler.Check(ctx, item); { | ||
case err != nil: | ||
return err | ||
case !handle: | ||
return delete() | ||
default: | ||
if err := handler.Handle(ctx, item); err != nil { | ||
return err | ||
} | ||
} | ||
return delete() | ||
} | ||
|
||
func (h *Hydro) getEventHandler(eventyp string) (EventHandler, bool) { | ||
v, ok := h.GetStringKey(eventyp) | ||
if !ok { | ||
return nil, ok | ||
} | ||
handler, ok := v.(EventHandler) | ||
return handler, ok | ||
} | ||
|
||
func (h *Hydro) decodeEvent(scanEntry kv.ScanEntry) (event HydroEvent, err error) { | ||
if err = scanEntry.Error(); err != nil { | ||
return | ||
} | ||
|
||
key, value := scanEntry.Pair() | ||
if err = json.Unmarshal(value, &event); err != nil { | ||
return | ||
} | ||
|
||
event.ID, err = parseHydroEventID(key) | ||
return | ||
} |
Oops, something went wrong.