Skip to content

Commit

Permalink
[FAB-6107] unit test for bulk loading & a bug fix
Browse files Browse the repository at this point in the history
As discussed in https://gerrit.hyperledger.org/r/#/c/13149/, need a unit
test to ensure cache is loaded correctly during validation phase when
using private data.

This CR introduces GetCachedVersion() in BulkOptimizable interface so
that we can test the cache loading logic. GetCachedVersion() retrieves
key's version present in the cache (and if not present, it returns false)
whereas GetVersion() retrieves version from state db if required key is
not available in the cache.

This CR fixes a bug in LoadCommittedVersions() implemented in
privacyenabledstate.DB.

This CR also renames ClearCommittedVersions() to ClearCachedVersions()
as it is more relevant.

Change-Id: Ib04ea26db025c82c4e34be48a7d875a5363ef3b7
Signed-off-by: senthil <[email protected]>
  • Loading branch information
cendhu committed Sep 28, 2017
1 parent f66e4e6 commit edcaa8e
Show file tree
Hide file tree
Showing 6 changed files with 177 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (s *CommonStorageDB) LoadCommittedVersionsOfPubAndHashedKeys(pubKeys []*sta

// Here, hashedKeys are merged into pubKeys to get a combined set of keys for combined loading
for _, key := range hashedKeys {
ns := derivePvtDataNs(key.Namespace, key.CollectionName)
ns := deriveHashedDataNs(key.Namespace, key.CollectionName)
// No need to check for duplicates as hashedKeys are in separate namespace
var keyHashStr string
if !s.BytesKeySuppoted() {
Expand All @@ -104,8 +104,8 @@ func (s *CommonStorageDB) LoadCommittedVersionsOfPubAndHashedKeys(pubKeys []*sta
bulkOptimizable.LoadCommittedVersions(pubKeys)
}

// ClearCommittedVersions implements corresponding function in interface DB
func (s *CommonStorageDB) ClearCommittedVersions() {
// ClearCachedVersions implements corresponding function in interface DB
func (s *CommonStorageDB) ClearCachedVersions() {
bulkOptimizable, ok := s.VersionedDB.(statedb.BulkOptimizable)
if ok {
bulkOptimizable.ClearCachedVersions()
Expand Down Expand Up @@ -135,6 +135,19 @@ func (s *CommonStorageDB) GetKeyHashVersion(namespace, collection string, keyHas
return s.GetVersion(deriveHashedDataNs(namespace, collection), keyHashStr)
}

func (s *CommonStorageDB) GetCachedKeyHashVersion(namespace, collection string, keyHash []byte) (*version.Height, bool) {
bulkOptimizable, ok := s.VersionedDB.(statedb.BulkOptimizable)
if !ok {
return nil, false
}

keyHashStr := string(keyHash)
if !s.BytesKeySuppoted() {
keyHashStr = base64.StdEncoding.EncodeToString(keyHash)
}
return bulkOptimizable.GetCachedVersion(deriveHashedDataNs(namespace, collection), keyHashStr)
}

// GetPrivateDataMultipleKeys implements corresponding function in interface DB
func (s *CommonStorageDB) GetPrivateDataMultipleKeys(namespace, collection string, keys []string) ([]*statedb.VersionedValue, error) {
return s.GetStateMultipleKeys(derivePvtDataNs(namespace, collection), keys)
Expand Down
3 changes: 2 additions & 1 deletion core/ledger/kvledger/txmgmt/privacyenabledstate/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ type DB interface {
statedb.VersionedDB
IsBulkOptimizable() bool
LoadCommittedVersionsOfPubAndHashedKeys(pubKeys []*statedb.CompositeKey, hashedKeys []*HashedCompositeKey)
ClearCommittedVersions()
GetCachedKeyHashVersion(namespace, collection string, keyHash []byte) (*version.Height, bool)
ClearCachedVersions()
GetPrivateData(namespace, collection, key string) (*statedb.VersionedValue, error)
GetValueHash(namespace, collection string, keyHash []byte) (*statedb.VersionedValue, error)
GetKeyHashVersion(namespace, collection string, keyHash []byte) (*version.Height, error)
Expand Down
19 changes: 17 additions & 2 deletions core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,14 +159,27 @@ func (vdb *VersionedDB) GetState(namespace string, key string) (*statedb.Version
}

// GetVersion implements method in VersionedDB interface
func (vdb *VersionedDB) GetVersion(namespace string, key string) (*version.Height, error) {
func (vdb *VersionedDB) GetCachedVersion(namespace string, key string) (*version.Height, bool) {

logger.Debugf("Retrieving cached version: %s~%s", key, namespace)

compositeKey := statedb.CompositeKey{Namespace: namespace, Key: key}

// Retrieve the version from committed data cache.
// Since the cache was populated based on block readsets,
// checks during validation should find the version here
returnVersion, keyFound := vdb.committedDataCache.committedVersions[compositeKey]
version, keyFound := vdb.committedDataCache.committedVersions[compositeKey]

if !keyFound {
return nil, false
}
return version, true
}

// GetVersion implements method in VersionedDB interface
func (vdb *VersionedDB) GetVersion(namespace string, key string) (*version.Height, error) {

returnVersion, keyFound := vdb.GetCachedVersion(namespace, key)

// If the version was not found in the committed data cache, retrieve it from statedb.
if !keyFound {
Expand Down Expand Up @@ -540,6 +553,8 @@ func (vdb *VersionedDB) LoadCommittedVersions(keys []*statedb.CompositeKey) {
keysToRetrieve := []string{}
for _, key := range keys {

logger.Debugf("Load into version cache: %s~%s", key.Key, key.Namespace)

// create composite key for couchdb
compositeDBKey := constructCompositeKey(key.Namespace, key.Key)

Expand Down
1 change: 1 addition & 0 deletions core/ledger/kvledger/txmgmt/statedb/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type VersionedDB interface {
//databases capable of batch operations
type BulkOptimizable interface {
LoadCommittedVersions(keys []*CompositeKey)
GetCachedVersion(namespace, key string) (*version.Height, bool)
ClearCachedVersions()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (txmgr *LockBasedTxMgr) Rollback() {
// clearCache empty the cache maintained by the statedb implementation
func (txmgr *LockBasedTxMgr) clearCache() {
if txmgr.db.IsBulkOptimizable() {
txmgr.db.ClearCommittedVersions()
txmgr.db.ClearCachedVersions()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,159 @@ import (
"github.com/hyperledger/fabric/common/ledger/testutil"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/privacyenabledstate"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/rwsetutil"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/validator/valinternal"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
"github.com/hyperledger/fabric/core/ledger/util"
"github.com/hyperledger/fabric/protos/ledger/rwset/kvrwset"
"github.com/hyperledger/fabric/protos/peer"
"github.com/spf13/viper"
)

type keyValue struct {
namespace string
collection string
key string
keyHash []byte
value []byte
version *version.Height
}

func TestMain(m *testing.M) {
flogging.SetModuleLevel("statevalidator", "debug")
flogging.SetModuleLevel("statebasedval", "debug")
flogging.SetModuleLevel("statecouchdb", "debug")
viper.Set("peer.fileSystemPath", "/tmp/fabric/ledgertests/kvledger/txmgmt/validator/statebasedval")
os.Exit(m.Run())
}

func TestValidatorBulkLoadingOfCache(t *testing.T) {
testDBEnv := privacyenabledstate.CouchDBCommonStorageTestEnv{}
testDBEnv.Init(t)
defer testDBEnv.Cleanup()
db := testDBEnv.GetDBHandle("testdb")

validator := NewValidator(db)

//populate db with initial data
batch := privacyenabledstate.NewUpdateBatch()

// Create two public KV pairs
pubKV1 := keyValue{namespace: "ns1", key: "key1", value: []byte("value1"), version: version.NewHeight(1, 0)}
pubKV2 := keyValue{namespace: "ns1", key: "key2", value: []byte("value2"), version: version.NewHeight(1, 1)}

// Create two hashed KV pairs
hashedKV1 := keyValue{namespace: "ns2", collection: "col1", key: "hashedPvtKey1",
keyHash: util.ComputeStringHash("hashedPvtKey1"), value: []byte("value1"),
version: version.NewHeight(1, 2)}
hashedKV2 := keyValue{namespace: "ns2", collection: "col2", key: "hashedPvtKey2",
keyHash: util.ComputeStringHash("hashedPvtKey2"), value: []byte("value2"),
version: version.NewHeight(1, 3)}

// Store the public and hashed KV pairs to DB
batch.PubUpdates.Put(pubKV1.namespace, pubKV1.key, pubKV1.value, pubKV1.version)
batch.PubUpdates.Put(pubKV2.namespace, pubKV2.key, pubKV2.value, pubKV2.version)
batch.HashUpdates.Put(hashedKV1.namespace, hashedKV1.collection, hashedKV1.keyHash, hashedKV1.value, hashedKV1.version)
batch.HashUpdates.Put(hashedKV2.namespace, hashedKV2.collection, hashedKV2.keyHash, hashedKV2.value, hashedKV2.version)

db.ApplyPrivacyAwareUpdates(batch, version.NewHeight(1, 4))

// Construct read set for transaction 1. It contains two public KV pairs (pubKV1, pubKV2) and two
// hashed KV pairs (hashedKV1, hashedKV2).
rwsetBuilder1 := rwsetutil.NewRWSetBuilder()
rwsetBuilder1.AddToReadSet(pubKV1.namespace, pubKV1.key, pubKV1.version)
rwsetBuilder1.AddToReadSet(pubKV2.namespace, pubKV2.key, pubKV2.version)
rwsetBuilder1.AddToHashedReadSet(hashedKV1.namespace, hashedKV1.collection, hashedKV1.key, hashedKV1.version)
rwsetBuilder1.AddToHashedReadSet(hashedKV2.namespace, hashedKV2.collection, hashedKV2.key, hashedKV2.version)

// Construct read set for transaction 1. It contains KV pairs which are not in the state db.
rwsetBuilder2 := rwsetutil.NewRWSetBuilder()
rwsetBuilder2.AddToReadSet("ns3", "key1", nil)
rwsetBuilder2.AddToHashedReadSet("ns3", "col1", "hashedPvtKey1", nil)

// Construct internal block
transRWSets := getTestPubSimulationRWSet(t, rwsetBuilder1, rwsetBuilder2)
var trans []*valinternal.Transaction
for i, tranRWSet := range transRWSets {
tx := &valinternal.Transaction{
ID: fmt.Sprintf("txid-%d", i),
IndexInBlock: i,
ValidationCode: peer.TxValidationCode_VALID,
RWSet: tranRWSet,
}
trans = append(trans, tx)
}
block := &valinternal.Block{Num: 1, Txs: trans}

if validator.db.IsBulkOptimizable() {

commonStorageDB := validator.db.(*privacyenabledstate.CommonStorageDB)
bulkOptimizable, _ := commonStorageDB.VersionedDB.(statedb.BulkOptimizable)

// Clear cache loaded during ApplyPrivacyAwareUpdates()
validator.db.ClearCachedVersions()

validator.preLoadCommittedVersionOfRSet(block)

// pubKV1 should be found in cache
version, keyFound := bulkOptimizable.GetCachedVersion(pubKV1.namespace, pubKV1.key)
testutil.AssertEquals(t, keyFound, true)
testutil.AssertEquals(t, version, pubKV1.version)

// pubKV2 should be found in cache
version, keyFound = bulkOptimizable.GetCachedVersion(pubKV2.namespace, pubKV2.key)
testutil.AssertEquals(t, keyFound, true)
testutil.AssertEquals(t, version, pubKV2.version)

// [ns3, key1] should be found in cache as it was in the readset of transaction 1 though it is
// not in the state db but the version would be nil
version, keyFound = bulkOptimizable.GetCachedVersion("ns3", "key1")
testutil.AssertEquals(t, keyFound, true)
testutil.AssertEquals(t, version, nil)

// [ns4, key1] should not be found in cache as it was not loaded
version, keyFound = bulkOptimizable.GetCachedVersion("ns4", "key1")
testutil.AssertEquals(t, keyFound, false)
testutil.AssertEquals(t, version, nil)

// hashedKV1 should be found in cache
version, keyFound = validator.db.GetCachedKeyHashVersion(hashedKV1.namespace,
hashedKV1.collection, hashedKV1.keyHash)
testutil.AssertEquals(t, keyFound, true)
testutil.AssertEquals(t, version, hashedKV1.version)

// hashedKV2 should be found in cache
version, keyFound = validator.db.GetCachedKeyHashVersion(hashedKV2.namespace,
hashedKV2.collection, hashedKV2.keyHash)
testutil.AssertEquals(t, keyFound, true)
testutil.AssertEquals(t, version, hashedKV2.version)

// [ns3, col1, hashedPvtKey1] should be found in cache as it was in the readset of transaction 2 though it is
// not in the state db
version, keyFound = validator.db.GetCachedKeyHashVersion("ns3", "col1", util.ComputeStringHash("hashedPvtKey1"))
testutil.AssertEquals(t, keyFound, true)
testutil.AssertEquals(t, version, nil)

// [ns4, col, key1] should not be found in cache as it was not loaded
version, keyFound = validator.db.GetCachedKeyHashVersion("ns4", "col1", util.ComputeStringHash("key1"))
testutil.AssertEquals(t, keyFound, false)
testutil.AssertEquals(t, version, nil)

// Clear cache
validator.db.ClearCachedVersions()

// pubKV1 should not be found in cache as cahce got emptied
version, keyFound = bulkOptimizable.GetCachedVersion(pubKV1.namespace, pubKV1.key)
testutil.AssertEquals(t, keyFound, false)
testutil.AssertEquals(t, version, nil)

// [ns3, col1, key1] should not be found in cache as cahce got emptied
version, keyFound = validator.db.GetCachedKeyHashVersion("ns3", "col1", util.ComputeStringHash("hashedPvtKey1"))
testutil.AssertEquals(t, keyFound, false)
testutil.AssertEquals(t, version, nil)
}
}

func TestValidator(t *testing.T) {
testDBEnv := privacyenabledstate.LevelDBCommonStorageTestEnv{}
testDBEnv.Init(t)
Expand Down

0 comments on commit edcaa8e

Please sign in to comment.