Skip to content

Commit

Permalink
Add agent's local store for snapshots
Browse files Browse the repository at this point in the history
  • Loading branch information
aalda committed Dec 3, 2018
1 parent e650307 commit e2859d1
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 0 deletions.
70 changes: 70 additions & 0 deletions gossip/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package gossip

import (
"bytes"

"github.com/bbva/qed/protocol"

"github.com/bbva/qed/util"
"github.com/google/btree"
)

type LocalStore interface {
Put(version uint64, snapshot protocol.Snapshot) error
GetRange(start, end uint64) ([]protocol.Snapshot, error)
DeleteRange(start, end uint64) error
}

type BPlusTreeStore struct {
db *btree.BTree
}

type StoreItem struct {
Key, Value []byte
}

func (p StoreItem) Less(b btree.Item) bool {
return bytes.Compare(p.Key, b.(StoreItem).Key) < 0
}

func (s *BPlusTreeStore) Put(version uint64, snapshot protocol.Snapshot) error {
encoded, err := snapshot.Encode()
if err != nil {
return err
}
s.db.ReplaceOrInsert(StoreItem{util.Uint64AsBytes(version), encoded})
return nil
}

func (s BPlusTreeStore) GetRange(start, end uint64) ([]protocol.Snapshot, error) {
result := make([]protocol.Snapshot, 0)
startKey := util.Uint64AsBytes(start)
endKey := util.Uint64AsBytes(end)
s.db.AscendGreaterOrEqual(StoreItem{startKey, nil}, func(i btree.Item) bool {
key := i.(StoreItem).Key
if bytes.Compare(key, endKey) > 0 {
return false
}
var snapshot protocol.Snapshot
if err := snapshot.Decode(i.(StoreItem).Value); err != nil {
return false
}
result = append(result, snapshot)
return true
})
return result, nil
}

func (s *BPlusTreeStore) DeleteRange(start, end uint64) error {
startKey := util.Uint64AsBytes(start)
endKey := util.Uint64AsBytes(end)
s.db.AscendGreaterOrEqual(StoreItem{startKey, nil}, func(i btree.Item) bool {
key := i.(StoreItem).Key
if bytes.Compare(key, endKey) > 0 {
return false
}
s.db.Delete(i)
return true
})
return nil
}
20 changes: 20 additions & 0 deletions protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,26 @@ type Source struct {
Role string
}

func (b *Snapshot) Encode() ([]byte, error) {
var buf bytes.Buffer
encoder := codec.NewEncoder(&buf, &codec.MsgpackHandle{})
if err := encoder.Encode(b); err != nil {
log.Errorf("Failed to encode message: %v", err)
return nil, err
}
return buf.Bytes(), nil
}

func (b *Snapshot) Decode(msg []byte) error {
reader := bytes.NewReader(msg)
decoder := codec.NewDecoder(reader, &codec.MsgpackHandle{})
if err := decoder.Decode(b); err != nil {
log.Errorf("Failed to decode snapshot batch: %v", err)
return err
}
return nil
}

func (b *BatchSnapshots) Encode() ([]byte, error) {
var buf bytes.Buffer
encoder := codec.NewEncoder(&buf, &codec.MsgpackHandle{})
Expand Down

0 comments on commit e2859d1

Please sign in to comment.