Replace storage prefixes with tables
Add rocksdb column families
aalda committed Mar 27, 2019
1 parent c80a4a5 commit 248a2d7
Showing 26 changed files with 545 additions and 530 deletions.
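
The storage package changes that introduce the new Table type belong to this commit but fall outside the hunks shown below. Inferred from how the identifiers are used in the changed files, here is a minimal sketch of what the type and the Mutation helper presumably look like; the underlying integer type, the constant ordering, and the exact column-family mapping are assumptions, not taken from the diff:

package storage

// Table identifies a logical keyspace. With the RocksDB backend each table
// is expected to map to its own column family, replacing the former
// one-byte key prefixes.
type Table uint32

const (
    IndexTable Table = iota
    HyperCacheTable
    HistoryCacheTable
    FSMStateTable
)

// Mutation couples a key-value write with the table it targets.
type Mutation struct {
    Table Table
    Key   []byte
    Value []byte
}

// NewMutation builds a mutation for the given table.
func NewMutation(table Table, key, value []byte) *Mutation {
    return &Mutation{Table: table, Key: key, Value: value}
}
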
4 changes: 2 additions & 2 deletions balloon/balloon.go
@@ -169,7 +169,7 @@ func (b Balloon) Version() uint64 {

func (b *Balloon) RefreshVersion() error {
// get last stored version
- kv, err := b.store.GetLast(storage.HistoryCachePrefix)
+ kv, err := b.store.GetLast(storage.HistoryCacheTable)
if err != nil {
if err != storage.ErrKeyNotFound {
return err
@@ -263,7 +263,7 @@ func (b Balloon) QueryDigestMembership(keyDigest hashing.Digest, version uint64)
version = proof.CurrentVersion
}

- leaf, err = b.store.Get(storage.IndexPrefix, proof.KeyDigest)
+ leaf, err = b.store.Get(storage.IndexTable, proof.KeyDigest)
switch {
case err != nil && err != storage.ErrKeyNotFound:
return nil, fmt.Errorf("error reading leaf %v data: %v", proof.KeyDigest, err)
6 changes: 3 additions & 3 deletions balloon/balloon_test.go
@@ -217,18 +217,18 @@ func TestTamperAndVerify(t *testing.T) {
assert.NoError(t, err)
assert.True(t, memProof.Verify(event, snapshot), "The proof should verify correctly")

- original, err := store.Get(storage.IndexPrefix, eventDigest)
+ original, err := store.Get(storage.IndexTable, eventDigest)
assert.NoError(t, err)

tpBytes := util.Uint64AsBytes(^uint64(0))

assert.NoError(t, store.Mutate(
[]*storage.Mutation{
- {storage.IndexPrefix, eventDigest, tpBytes},
+ {storage.IndexTable, eventDigest, tpBytes},
},
), "store add returned non nil value")

- tampered, _ := store.Get(storage.IndexPrefix, eventDigest)
+ tampered, _ := store.Get(storage.IndexTable, eventDigest)
assert.Equal(t, tpBytes, tampered.Value, "Tamper unsuccessful")
assert.NotEqual(t, original.Value, tampered.Value, "Tamper unsuccessful")

8 changes: 4 additions & 4 deletions balloon/cache/lru.go
@@ -14,16 +14,16 @@ type entry struct {
}

type LruReadThroughCache struct {
- prefix byte
+ table storage.Table
store storage.Store
size int
items map[[lruKeySize]byte]*list.Element
evictList *list.List
}

- func NewLruReadThroughCache(prefix byte, store storage.Store, cacheSize uint16) *LruReadThroughCache {
+ func NewLruReadThroughCache(table storage.Table, store storage.Store, cacheSize uint16) *LruReadThroughCache {
return &LruReadThroughCache{
- prefix: prefix,
+ table: table,
store: store,
size: int(cacheSize),
items: make(map[[lruKeySize]byte]*list.Element),
@@ -36,7 +36,7 @@ func (c LruReadThroughCache) Get(key []byte) ([]byte, bool) {
copy(k[:], key)
e, ok := c.items[k]
if !ok {
- pair, err := c.store.Get(c.prefix, key)
+ pair, err := c.store.Get(c.table, key)
if err != nil {
return nil, false
}
12 changes: 6 additions & 6 deletions balloon/cache/passthrough.go
@@ -21,19 +21,19 @@ import (
)

type PassThroughCache struct {
- prefix byte
- store storage.Store
+ table storage.Table
+ store storage.Store
}

- func NewPassThroughCache(prefix byte, store storage.Store) *PassThroughCache {
+ func NewPassThroughCache(table storage.Table, store storage.Store) *PassThroughCache {
return &PassThroughCache{
- prefix: prefix,
- store: store,
+ table: table,
+ store: store,
}
}

func (c PassThroughCache) Get(key []byte) ([]byte, bool) {
- pair, err := c.store.Get(c.prefix, key)
+ pair, err := c.store.Get(c.table, key)
if err != nil {
return nil, false
}
6 changes: 3 additions & 3 deletions balloon/cache/passthrough_test.go
@@ -39,13 +39,13 @@ func TestPassThroughCache(t *testing.T) {

store, closeF := storage_utils.OpenBPlusTreeStore()
defer closeF()
- prefix := byte(0x0)
- cache := NewPassThroughCache(prefix, store)
+ table := storage.IndexTable
+ cache := NewPassThroughCache(table, store)

for i, c := range testCases {
if c.cached {
err := store.Mutate([]*storage.Mutation{
- {prefix, c.key, c.value},
+ {table, c.key, c.value},
})
require.NoError(t, err)
}
18 changes: 9 additions & 9 deletions balloon/history/insert_visitor.go
@@ -25,19 +25,19 @@ import (
)

type insertVisitor struct {
- hasher hashing.Hasher
- cache cache.ModifiableCache
- storagePrefix byte // TODO shall i remove this?
+ hasher hashing.Hasher
+ cache cache.ModifiableCache
+ storageTable storage.Table // TODO shall i remove this?

mutations []*storage.Mutation
}

- func newInsertVisitor(hasher hashing.Hasher, cache cache.ModifiableCache, storagePrefix byte) *insertVisitor {
+ func newInsertVisitor(hasher hashing.Hasher, cache cache.ModifiableCache, storageTable storage.Table) *insertVisitor {
return &insertVisitor{
- hasher: hasher,
- cache: cache,
- storagePrefix: storagePrefix,
- mutations: make([]*storage.Mutation, 0),
+ hasher: hasher,
+ cache: cache,
+ storageTable: storageTable,
+ mutations: make([]*storage.Mutation, 0),
}
}

@@ -76,7 +76,7 @@ func (v *insertVisitor) VisitPutCacheOp(op putCacheOp) hashing.Digest {

func (v *insertVisitor) VisitMutateOp(op mutateOp) hashing.Digest {
hash := op.operation.Accept(v)
- v.mutations = append(v.mutations, storage.NewMutation(v.storagePrefix, op.Position().Bytes(), hash))
+ v.mutations = append(v.mutations, storage.NewMutation(v.storageTable, op.Position().Bytes(), hash))
return hash
}

26 changes: 13 additions & 13 deletions balloon/history/insert_visitor_test.go
@@ -44,24 +44,24 @@ func TestInsertVisitor(t *testing.T) {
))),
expectedMutations: []*storage.Mutation{
{
- Prefix: storage.HistoryCachePrefix,
- Key: pos(7, 0).Bytes(),
- Value: []byte{7},
+ Table: storage.HistoryCacheTable,
+ Key: pos(7, 0).Bytes(),
+ Value: []byte{7},
},
{
- Prefix: storage.HistoryCachePrefix,
- Key: pos(6, 1).Bytes(),
- Value: []byte{7},
+ Table: storage.HistoryCacheTable,
+ Key: pos(6, 1).Bytes(),
+ Value: []byte{7},
},
{
- Prefix: storage.HistoryCachePrefix,
- Key: pos(4, 2).Bytes(),
- Value: []byte{7},
+ Table: storage.HistoryCacheTable,
+ Key: pos(4, 2).Bytes(),
+ Value: []byte{7},
},
{
- Prefix: storage.HistoryCachePrefix,
- Key: pos(0, 3).Bytes(),
- Value: []byte{7},
+ Table: storage.HistoryCacheTable,
+ Key: pos(0, 3).Bytes(),
+ Value: []byte{7},
},
},
expectedElements: []*cachedElement{
@@ -75,7 +75,7 @@

for i, c := range testCases {
cache := cache.NewFakeCache([]byte{0x0})
- visitor := newInsertVisitor(hashing.NewFakeXorHasher(), cache, storage.HistoryCachePrefix)
+ visitor := newInsertVisitor(hashing.NewFakeXorHasher(), cache, storage.HistoryCacheTable)

c.op.Accept(visitor)

6 changes: 3 additions & 3 deletions balloon/history/tree.go
@@ -33,10 +33,10 @@ type HistoryTree struct {
func NewHistoryTree(hasherF func() hashing.Hasher, store storage.Store, cacheSize uint16) *HistoryTree {

// create cache for Adding
- writeCache := cache.NewLruReadThroughCache(storage.HistoryCachePrefix, store, cacheSize)
+ writeCache := cache.NewLruReadThroughCache(storage.HistoryCacheTable, store, cacheSize)

// create cache for Membership and Incremental
- readCache := cache.NewPassThroughCache(storage.HistoryCachePrefix, store)
+ readCache := cache.NewPassThroughCache(storage.HistoryCacheTable, store)

return &HistoryTree{
hasherF: hasherF,
@@ -51,7 +51,7 @@ func (t *HistoryTree) Add(eventDigest hashing.Digest, version uint64) (hashing.D
log.Debugf("Adding new event digest %x with version %d", eventDigest, version)

// build a visitable pruned tree and then visit it to generate the root hash
- visitor := newInsertVisitor(t.hasher, t.writeCache, storage.HistoryCachePrefix)
+ visitor := newInsertVisitor(t.hasher, t.writeCache, storage.HistoryCacheTable)
rh := pruneToInsert(version, eventDigest).Accept(visitor)

return rh, visitor.Result(), nil
32 changes: 16 additions & 16 deletions balloon/hyper/insert_test.go
@@ -340,8 +340,8 @@ func TestInsertInterpretation(t *testing.T) {
storedBatches: map[string][]byte{},
expectedMutations: []*storage.Mutation{
{
- Prefix: storage.HyperCachePrefix,
- Key: pos(0, 4).Bytes(),
+ Table: storage.HyperCacheTable,
+ Key: pos(0, 4).Bytes(),
Value: []byte{
0xe0, 0x00, 0x00, 0x00, // bitmap: 11100000 00000000 00000000 00000000
0x00, 0x01, // iBatch 0 -> hash=0x00 (shortcut index=0)
@@ -388,8 +388,8 @@ func TestInsertInterpretation(t *testing.T) {
},
expectedMutations: []*storage.Mutation{
{
- Prefix: storage.HyperCachePrefix,
- Key: pos(0, 4).Bytes(),
+ Table: storage.HyperCacheTable,
+ Key: pos(0, 4).Bytes(),
Value: []byte{
0xe0, 0x00, 0x00, 0x00, // bitmap: 11100000 00000000 00000000 00000000
0x00, 0x01, // iBatch 0 -> hash=0x00 (shortcut index=0)
@@ -437,8 +437,8 @@ func TestInsertInterpretation(t *testing.T) {
},
expectedMutations: []*storage.Mutation{
{
- Prefix: storage.HyperCachePrefix,
- Key: pos(1, 0).Bytes(),
+ Table: storage.HyperCacheTable,
+ Key: pos(1, 0).Bytes(),
Value: []byte{
0xe0, 0x00, 0x00, 0x00, // bitmap: 11100000 00000000 00000000 00000000
0x01, 0x01, // iBatch 0 -> hash=0x01 (shortcut index=0)
@@ -447,8 +447,8 @@ func TestInsertInterpretation(t *testing.T) {
},
},
{
- Prefix: storage.HyperCachePrefix,
- Key: pos(0, 0).Bytes(),
+ Table: storage.HyperCacheTable,
+ Key: pos(0, 0).Bytes(),
Value: []byte{
0xe0, 0x00, 0x00, 0x00, // bitmap: 11100000 00000000 00000000 00000000
0x00, 0x01, // iBatch 0 -> hash=0x00 (shortcut index=0)
@@ -457,8 +457,8 @@ func TestInsertInterpretation(t *testing.T) {
},
},
{
- Prefix: storage.HyperCachePrefix,
- Key: pos(0, 4).Bytes(),
+ Table: storage.HyperCacheTable,
+ Key: pos(0, 4).Bytes(),
Value: []byte{
0xd1, 0x01, 0x80, 0x00, // bitmap: 11010001 00000001 10000000 00000000
0x01, 0x00, // iBatch 0 -> hash=0x01
@@ -509,8 +509,8 @@ func TestInsertInterpretation(t *testing.T) {
},
expectedMutations: []*storage.Mutation{
{
- Prefix: storage.HyperCachePrefix,
- Key: pos(0, 4).Bytes(),
+ Table: storage.HyperCacheTable,
+ Key: pos(0, 4).Bytes(),
Value: []byte{
0xfe, 0x00, 0x00, 0x00, // bitmap: 11111110 00000000 00000000 00000000
0x08, 0x00, // iBatch 0 -> hash=0x08
@@ -566,8 +566,8 @@ func TestInsertInterpretation(t *testing.T) {
},
expectedMutations: []*storage.Mutation{
{
- Prefix: storage.HyperCachePrefix,
- Key: pos(0, 4).Bytes(),
+ Table: storage.HyperCacheTable,
+ Key: pos(0, 4).Bytes(),
Value: []byte{
0xfe, 0x1e, 0x00, 0x00, // bitmap: 11111110 00011110 00000000 00000000
0x04, 0x00, // iBatch 0 -> hash=0x08
@@ -622,8 +622,8 @@ func TestInsertInterpretation(t *testing.T) {
},
expectedMutations: []*storage.Mutation{
{
- Prefix: storage.HyperCachePrefix,
- Key: pos(128, 4).Bytes(),
+ Table: storage.HyperCacheTable,
+ Key: pos(128, 4).Bytes(),
Value: []byte{
0xe0, 0x00, 0x00, 0x00, // bitmap: 11100000 00000000 00000000 00000000
0x80, 0x01, // iBatch 0 -> hash=0x80 (shortcut index=128)
2 changes: 1 addition & 1 deletion balloon/hyper/loader.go
@@ -59,7 +59,7 @@ func (l defaultBatchLoader) loadBatchFromStore(pos position) *batchNode {
}

func (l defaultBatchLoader) loadBatchFromStore(pos position) *batchNode {
- kv, err := l.store.Get(storage.HyperCachePrefix, pos.Bytes())
+ kv, err := l.store.Get(storage.HyperCacheTable, pos.Bytes())
if err != nil {
if err == storage.ErrKeyNotFound {
return newEmptyBatchNode(len(pos.Index))
2 changes: 1 addition & 1 deletion balloon/hyper/operation.go
@@ -140,7 +140,7 @@ func mutateBatch(pos position, batch *batchNode) *operation {
Pos: pos,
Interpret: func(ops *operationsStack, c *pruningContext) hashing.Digest {
hash := ops.Pop().Interpret(ops, c)
- c.Mutations = append(c.Mutations, storage.NewMutation(storage.HyperCachePrefix, pos.Bytes(), batch.Serialize()))
+ c.Mutations = append(c.Mutations, storage.NewMutation(storage.HyperCacheTable, pos.Bytes(), batch.Serialize()))
return hash
},
}
4 changes: 2 additions & 2 deletions balloon/hyper/tree.go
@@ -94,7 +94,7 @@ func (t *HyperTree) Add(eventDigest hashing.Digest, version uint64) (hashing.Dig
rh := ops.Pop().Interpret(ops, ctx)

// create a mutation for the new leaf
- leafMutation := storage.NewMutation(storage.IndexPrefix, eventDigest, versionAsBytes)
+ leafMutation := storage.NewMutation(storage.IndexTable, eventDigest, versionAsBytes)

// collect mutations
mutations := append(ctx.Mutations, leafMutation)
@@ -134,7 +134,7 @@ func (t *HyperTree) RebuildCache() {
end := make([]byte, 2+t.hasher.Len()/8)
start[1] = byte(t.cacheHeightLimit)
end[1] = byte(t.cacheHeightLimit + 1)
- nodes, err := t.store.GetRange(storage.HyperCachePrefix, start, end)
+ nodes, err := t.store.GetRange(storage.HyperCacheTable, start, end)
if err != nil {
log.Fatalf("Oops, something went wrong: %v", err)
}
12 changes: 6 additions & 6 deletions balloon/hyper/tree_test.go
@@ -122,7 +122,7 @@ func TestProveMembership(t *testing.T) {
require.NoErrorf(t, err, "This should not fail for index %d", i)
}

- leaf, err := store.Get(storage.IndexPrefix, searchedDigest)
+ leaf, err := store.Get(storage.IndexTable, searchedDigest)
require.NoErrorf(t, err, "No leaf with digest %v", err)

proof, err := tree.QueryMembership(leaf.Key, leaf.Value)
@@ -161,7 +161,7 @@ func TestAddAndVerify(t *testing.T) {
require.NoErrorf(t, err, "Add operation should not fail for index %d", i)
tree.store.Mutate(mutations)

- leaf, err := store.Get(storage.IndexPrefix, key)
+ leaf, err := store.Get(storage.IndexTable, key)
require.NoErrorf(t, err, "No leaf with key %d: %v", key, err)

proof, err := tree.QueryMembership(leaf.Key, leaf.Value)
@@ -202,8 +202,8 @@ func TestDeterministicAdd(t *testing.T) {
}

// check index store equality
- reader11 := store1.GetAll(storage.IndexPrefix)
- reader21 := store2.GetAll(storage.IndexPrefix)
+ reader11 := store1.GetAll(storage.IndexTable)
+ reader21 := store2.GetAll(storage.IndexTable)
defer reader11.Close()
defer reader21.Close()
buff11 := make([]*storage.KVPair, 0)
@@ -227,8 +227,8 @@ func TestDeterministicAdd(t *testing.T) {
require.Equalf(t, buff11, buff21, "The stored indexes should be equal")

// check cache store equality
- reader12 := store1.GetAll(storage.HyperCachePrefix)
- reader22 := store2.GetAll(storage.HyperCachePrefix)
+ reader12 := store1.GetAll(storage.HyperCacheTable)
+ reader22 := store2.GetAll(storage.HyperCacheTable)
defer reader12.Close()
defer reader22.Close()
buff12 := make([]*storage.KVPair, 0)
4 changes: 2 additions & 2 deletions raftwal/fsm.go
@@ -61,7 +61,7 @@ type BalloonFSM struct {

func loadState(s storage.ManagedStore) (*fsmState, error) {
var state fsmState
- kvstate, err := s.Get(storage.FSMStatePrefix, []byte{0xab})
+ kvstate, err := s.Get(storage.FSMStateTable, []byte{0xab})
if err == storage.ErrKeyNotFound {
log.Infof("Unable to find previous state: assuming a clean instance")
return &fsmState{0, 0, 0}, nil
@@ -255,7 +255,7 @@ func (fsm *BalloonFSM) applyAdd(event []byte, state *fsmState) *fsmAddResponse {
return &fsmAddResponse{error: err}
}

- mutations = append(mutations, storage.NewMutation(storage.FSMStatePrefix, []byte{0xab}, stateBuff.Bytes()))
+ mutations = append(mutations, storage.NewMutation(storage.FSMStateTable, []byte{0xab}, stateBuff.Bytes()))
err = fsm.store.Mutate(mutations)
if err != nil {
return &fsmAddResponse{error: err}