Skip to content

Commit

Permalink
Merge pull request #97 from aalda/remove_index
Browse files Browse the repository at this point in the history
Remove index table.
  • Loading branch information
iknite authored Apr 2, 2019
2 parents e563f63 + 0593929 commit 21a7e39
Show file tree
Hide file tree
Showing 16 changed files with 135 additions and 289 deletions.
57 changes: 21 additions & 36 deletions balloon/balloon.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,13 +247,7 @@ func (b Balloon) QueryDigestMembership(keyDigest hashing.Digest, version uint64)
stats.AddFloat("QueryMembership", 1)

var proof MembershipProof
var wg sync.WaitGroup
var hyperErr, historyErr error
var hyperProof *hyper.QueryProof
var historyProof *history.MembershipProof
var leaf *storage.KVPair
var err error

proof.Hasher = b.hasherF()
proof.KeyDigest = keyDigest
proof.QueryVersion = version
Expand All @@ -263,45 +257,36 @@ func (b Balloon) QueryDigestMembership(keyDigest hashing.Digest, version uint64)
version = proof.CurrentVersion
}

leaf, err = b.store.Get(storage.IndexTable, proof.KeyDigest)
switch {
case err != nil && err != storage.ErrKeyNotFound:
return nil, fmt.Errorf("error reading leaf %v data: %v", proof.KeyDigest, err)
proof.HyperProof, err = b.hyperTree.QueryMembership(keyDigest)
if err != nil {
return nil, fmt.Errorf("unable to get proof from hyper tree: %v", err)
}

case err != nil && err == storage.ErrKeyNotFound:
if len(proof.HyperProof.Value) == 0 {
proof.Exists = false
proof.ActualVersion = version
leaf = &storage.KVPair{Key: keyDigest, Value: util.Uint64AsBytes(version)}

case err == nil:
proof.Exists = true
proof.ActualVersion = util.BytesAsUint64(leaf.Value)

if proof.ActualVersion <= version {
wg.Add(1)
go func() {
defer wg.Done()
historyProof, historyErr = b.historyTree.ProveMembership(proof.ActualVersion, version)
}()
} else {
return nil, fmt.Errorf("query version %d is greater than the actual version which is %d", version, proof.ActualVersion)
}

return &proof, nil
}

hyperProof, hyperErr = b.hyperTree.QueryMembership(leaf.Key, leaf.Value)

wg.Wait()
if hyperErr != nil {
return nil, fmt.Errorf("unable to get proof from hyper tree: %v", err)
proof.Exists = true
if versionLen := len(proof.HyperProof.Value); versionLen < 8 { // TODO GET RID OF THIS: used only to pass tests
// the version is stored in the hyper tree with the length of the event digest
// if the length of the value is less than the length of a uint64 in bytes, we have to add padding
proof.ActualVersion = util.BytesAsUint64(util.AddPaddingToBytes(proof.HyperProof.Value, 8-versionLen))
} else {
// if the length of the value is greater or equal than the length of a uint64 in bytes, we have to truncate
proof.ActualVersion = util.BytesAsUint64(proof.HyperProof.Value[versionLen-8:])
}

if historyErr != nil {
return nil, fmt.Errorf("unable to get proof from history tree: %v", err)
if proof.ActualVersion <= version {
proof.HistoryProof, err = b.historyTree.ProveMembership(proof.ActualVersion, version)
if err != nil {
return nil, fmt.Errorf("unable to get proof from history tree: %v", err)
}
} else {
return nil, fmt.Errorf("query version %d is greater than the actual version which is %d", version, proof.ActualVersion)
}

proof.HyperProof = hyperProof
proof.HistoryProof = historyProof
return &proof, nil
}

Expand Down
79 changes: 37 additions & 42 deletions balloon/balloon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ package balloon

import (
"fmt"
"sync/atomic"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/bbva/qed/hashing"
"github.com/bbva/qed/log"
"github.com/bbva/qed/storage"
"github.com/bbva/qed/testutils/rand"
storage_utils "github.com/bbva/qed/testutils/storage"
"github.com/bbva/qed/util"
Expand Down Expand Up @@ -198,44 +198,6 @@ func TestCacheWarmingUp(t *testing.T) {
}
}

func TestTamperAndVerify(t *testing.T) {
log.SetLogger("TestTamperAndVerify", log.SILENT)

store, closeF := storage_utils.OpenRocksDBStore(t, "/var/tmp/balloon.test.2")
defer closeF()

b, err := NewBalloon(store, hashing.NewSha256Hasher)
assert.NoError(t, err)

event := hashing.Digest("Never knows best")
eventDigest := b.hasher.Do(event)

snapshot, mutations, err := b.Add(event)
store.Mutate(mutations)

memProof, err := b.QueryMembership(event, snapshot.Version)
assert.NoError(t, err)
assert.True(t, memProof.Verify(event, snapshot), "The proof should verify correctly")

original, err := store.Get(storage.IndexTable, eventDigest)
assert.NoError(t, err)

tpBytes := util.Uint64AsBytes(^uint64(0))

assert.NoError(t, store.Mutate(
[]*storage.Mutation{
{storage.IndexTable, eventDigest, tpBytes},
},
), "store add returned non nil value")

tampered, _ := store.Get(storage.IndexTable, eventDigest)
assert.Equal(t, tpBytes, tampered.Value, "Tamper unsuccessful")
assert.NotEqual(t, original.Value, tampered.Value, "Tamper unsuccessful")

_, err = b.QueryMembership(event, snapshot.Version)
require.Error(t, err)
}

func TestGenIncrementalAndVerify(t *testing.T) {
log.SetLogger("TestGenIncrementalAndVerify", log.SILENT)

Expand Down Expand Up @@ -318,7 +280,7 @@ func BenchmarkAddRocksDB(b *testing.B) {
require.NoError(b, err)

b.ResetTimer()
b.N = 100000
b.N = 1000000
for i := 0; i < b.N; i++ {
event := rand.Bytes(128)
_, mutations, _ := balloon.Add(event)
Expand All @@ -337,7 +299,7 @@ func BenchmarkQueryRocksDB(b *testing.B) {
balloon, err := NewBalloon(store, hashing.NewSha256Hasher)
require.NoError(b, err)

b.N = 100000
b.N = 1000000
for i := 0; i < b.N; i++ {
event := rand.Bytes(128)
events = append(events, event)
Expand All @@ -347,7 +309,40 @@ func BenchmarkQueryRocksDB(b *testing.B) {

b.ResetTimer()
for i, e := range events {
balloon.QueryMembership(e, uint64(i))
_, err := balloon.QueryMembership(e, uint64(i))
require.NoError(b, err)
}

}

func BenchmarkQueryRocksDBParallel(b *testing.B) {
var events [][]byte
log.SetLogger("BenchmarkQueryRocksDB", log.SILENT)

store, closeF := storage_utils.OpenRocksDBStore(b, "/var/tmp/ballon_bench.db")
defer closeF()

balloon, err := NewBalloon(store, hashing.NewSha256Hasher)
require.NoError(b, err)

b.N = 1000000
for i := 0; i < b.N; i++ {
event := rand.Bytes(128)
events = append(events, event)
_, mutations, _ := balloon.Add(event)
store.Mutate(mutations)
}

b.ResetTimer()
n := int64(-1)
b.SetParallelism(100)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
i := atomic.AddInt64(&n, 1)
event := events[i]
_, err := balloon.QueryMembership(event, uint64(i))
require.NoError(b, err)
}
})

}
2 changes: 1 addition & 1 deletion balloon/cache/passthrough_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestPassThroughCache(t *testing.T) {

store, closeF := storage_utils.OpenBPlusTreeStore()
defer closeF()
table := storage.IndexTable
table := storage.HistoryCacheTable
cache := NewPassThroughCache(table, store)

for i, c := range testCases {
Expand Down
7 changes: 3 additions & 4 deletions balloon/history/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package history
import (
"github.com/bbva/qed/balloon/cache"
"github.com/bbva/qed/hashing"
"github.com/bbva/qed/log"
"github.com/bbva/qed/storage"
)

Expand Down Expand Up @@ -48,7 +47,7 @@ func NewHistoryTree(hasherF func() hashing.Hasher, store storage.Store, cacheSiz

func (t *HistoryTree) Add(eventDigest hashing.Digest, version uint64) (hashing.Digest, []*storage.Mutation, error) {

log.Debugf("Adding new event digest %x with version %d", eventDigest, version)
// log.Debugf("Adding new event digest %x with version %d", eventDigest, version)

// build a visitable pruned tree and then visit it to generate the root hash
visitor := newInsertVisitor(t.hasher, t.writeCache, storage.HistoryCacheTable)
Expand All @@ -59,7 +58,7 @@ func (t *HistoryTree) Add(eventDigest hashing.Digest, version uint64) (hashing.D

func (t *HistoryTree) ProveMembership(index, version uint64) (*MembershipProof, error) {

log.Debugf("Proving membership for index %d with version %d", index, version)
//log.Debugf("Proving membership for index %d with version %d", index, version)

// build a visitable pruned tree and then visit it to collect the audit path
visitor := newAuditPathVisitor(t.hasherF(), t.readCache)
Expand All @@ -75,7 +74,7 @@ func (t *HistoryTree) ProveMembership(index, version uint64) (*MembershipProof,

func (t *HistoryTree) ProveConsistency(start, end uint64) (*IncrementalProof, error) {

log.Debugf("Proving consistency between versions %d and %d", start, end)
//log.Debugf("Proving consistency between versions %d and %d", start, end)

// build a visitable pruned tree and then visit it to collect the audit path
visitor := newAuditPathVisitor(t.hasherF(), t.readCache)
Expand Down
14 changes: 14 additions & 0 deletions balloon/hyper/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type pruningContext struct {
DefaultHashes []hashing.Digest
Mutations []*storage.Mutation
AuditPath AuditPath
Value []byte
}

type operationCode int
Expand All @@ -42,6 +43,7 @@ const (
getProvidedHashCode
putInCacheCode
mutateBatchCode
collectValueCode
collectHashCode
getFromPathCode
useHashCode
Expand Down Expand Up @@ -146,6 +148,18 @@ func mutateBatch(pos position, batch *batchNode) *operation {
}
}

func collectValue(pos position, value []byte) *operation {
return &operation{
Code: collectValueCode,
Pos: pos,
Interpret: func(ops *operationsStack, c *pruningContext) hashing.Digest {
hash := ops.Pop().Interpret(ops, c)
c.Value = value
return hash
},
}
}

func collectHash(pos position) *operation {
return &operation{
Code: collectHashCode,
Expand Down
2 changes: 1 addition & 1 deletion balloon/hyper/proof.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func NewQueryProof(key, value []byte, auditPath AuditPath, hasher hashing.Hasher
// false otherwise.
func (p QueryProof) Verify(key []byte, expectedRootHash hashing.Digest) (valid bool) {

log.Debugf("Verifying query proof for key %d", p.Key)
log.Debugf("Verifying query proof for key %x", p.Key)

if len(p.AuditPath) == 0 {
// an empty audit path (empty tree) shows non-membersip for any key
Expand Down
4 changes: 4 additions & 0 deletions balloon/hyper/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ func pruneToFind(index []byte, batches batchLoader) *operationsStack {
// regardless if the key of the shortcut matches the searched index
// we must stop traversing because there are no more leaves below
ops.Push(getProvidedHash(pos, iBatch, batch)) // not collected
k, v := batch.GetLeafKVAt(iBatch)
if bytes.Equal(k, index) {
ops.Push(collectValue(pos, v)) // collect value if the key matches the queried index
}
return
}

Expand Down
3 changes: 3 additions & 0 deletions balloon/hyper/search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func TestPruneToFind(t *testing.T) {
{innerHashCode, pos(0, 5)},
{collectHashCode, pos(16, 4)},
{getDefaultHashCode, pos(16, 4)},
{collectValueCode, pos(0, 4)},
{getProvidedHashCode, pos(0, 4)}, // we stop traversing at the shortcut (index=0)
},
},
Expand Down Expand Up @@ -177,6 +178,7 @@ func TestPruneToFind(t *testing.T) {
{collectHashCode, pos(2, 1)},
{getDefaultHashCode, pos(2, 1)},
{innerHashCode, pos(0, 1)},
{collectValueCode, pos(1, 0)},
{getProvidedHashCode, pos(1, 0)}, // shortcut found but not collected
{collectHashCode, pos(0, 0)},
{getProvidedHashCode, pos(0, 0)}, // we take the hash of the index=0 position from the batch
Expand Down Expand Up @@ -264,6 +266,7 @@ func TestPruneToFind(t *testing.T) {
{getDefaultHashCode, pos(16, 4)},
{innerHashCode, pos(0, 4)},
{innerHashCode, pos(8, 3)},
{collectValueCode, pos(12, 2)},
{getProvidedHashCode, pos(12, 2)}, // found shortcut index=12
{collectHashCode, pos(8, 2)},
{getProvidedHashCode, pos(8, 2)}, // shortcut index=8
Expand Down
15 changes: 5 additions & 10 deletions balloon/hyper/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,20 +93,14 @@ func (t *HyperTree) Add(eventDigest hashing.Digest, version uint64) (hashing.Dig

rh := ops.Pop().Interpret(ops, ctx)

// create a mutation for the new leaf
leafMutation := storage.NewMutation(storage.IndexTable, eventDigest, versionAsBytes)

// collect mutations
mutations := append(ctx.Mutations, leafMutation)

return rh, mutations, nil
return rh, ctx.Mutations, nil
}

func (t *HyperTree) QueryMembership(eventDigest hashing.Digest, version []byte) (proof *QueryProof, err error) {
func (t *HyperTree) QueryMembership(eventDigest hashing.Digest) (proof *QueryProof, err error) {
t.Lock()
defer t.Unlock()

//log.Debugf("Proving membership for index %d with version %d", eventDigest, version)
//log.Debugf("Proving membership for index %d", eventDigest)

// build a stack of operations and then interpret it to generate the audit path
ops := pruneToFind(eventDigest, t.batchLoader)
Expand All @@ -119,7 +113,8 @@ func (t *HyperTree) QueryMembership(eventDigest hashing.Digest, version []byte)

ops.Pop().Interpret(ops, ctx)

return NewQueryProof(eventDigest, version, ctx.AuditPath, t.hasherF()), nil
// ctx.Value is nil if the digest does not exist
return NewQueryProof(eventDigest, ctx.Value, ctx.AuditPath, t.hasherF()), nil
}

func (t *HyperTree) RebuildCache() {
Expand Down
Loading

0 comments on commit 21a7e39

Please sign in to comment.