fix missing err check in the commit path #1543

Merged: 1 commit, Jul 16, 2020
34 changes: 32 additions & 2 deletions core/ledger/kvledger/hashcheck_pvtdata_test.go
@@ -16,15 +16,45 @@ import (
"github.com/hyperledger/fabric/common/ledger/testutil"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/rwsetutil"
"github.com/hyperledger/fabric/core/ledger/mock"
"github.com/hyperledger/fabric/protoutil"
"github.com/stretchr/testify/require"
)

func TestConstructValidInvalidBlocksPvtData(t *testing.T) {
conf, cleanup := testConfig(t)
defer cleanup()
provider := testutilNewProvider(conf, t, &mock.DeployedChaincodeInfoProvider{})
nsCollBtlConfs := []*nsCollBtlConfig{
{
namespace: "ns-1",
btlConfig: map[string]uint64{
"coll-1": 0,
"coll-2": 0,
},
},
{
namespace: "ns-2",
btlConfig: map[string]uint64{
"coll-2": 0,
},
},
{
namespace: "ns-4",
btlConfig: map[string]uint64{
"coll-2": 0,
},
},
{
namespace: "ns-6",
btlConfig: map[string]uint64{
"coll-2": 0,
},
},
}
provider := testutilNewProviderWithCollectionConfig(
t,
nsCollBtlConfs,
conf,
)
defer provider.Close()

_, gb := testutil.NewBlockGenerator(t, "testLedger", false)
4 changes: 1 addition & 3 deletions core/ledger/kvledger/kv_ledger.go
@@ -314,9 +314,7 @@ func (l *kvLedger) syncStateDBWithOldBlkPvtdata() error {
return err
}

l.pvtdataStore.ResetLastUpdatedOldBlocksList()

return nil
return l.pvtdataStore.ResetLastUpdatedOldBlocksList()
Comment from the PR author:
An error can occur here due to a leveldb internal error. Further, this is part of the peer recovery code and executes during peer startup. Hence, it is safe to process this error; it would not create any fork or cause difficulties in joining a new peer (which would fetch and process all blocks). A leveldb error during peer startup results in a peer panic, which is the expected behavior.

}

func (l *kvLedger) filterYetToCommitBlocks(blocksPvtData map[uint64][]*ledger.TxPvtData) error {
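As an illustration of the startup behavior described in the comment above, here is a minimal sketch with hypothetical type and method names, not the actual Fabric call chain: the recovery step now returns the store error, and a startup caller that treats any recovery error as fatal turns it into a panic instead of continuing silently.

package main

import (
	"errors"
	"log"
)

// pvtdataStore stands in for the real private-data store;
// ResetLastUpdatedOldBlocksList can fail on an internal leveldb error.
type pvtdataStore struct{ failReset bool }

func (s *pvtdataStore) ResetLastUpdatedOldBlocksList() error {
	if s.failReset {
		return errors.New("leveldb: internal error")
	}
	return nil
}

type kvLedger struct{ store *pvtdataStore }

// syncStateDBWithOldBlkPvtdata mirrors the shape of the patched method:
// the last store call's error is returned instead of being dropped.
func (l *kvLedger) syncStateDBWithOldBlkPvtdata() error {
	// ... state DB sync elided ...
	return l.store.ResetLastUpdatedOldBlocksList()
}

func main() {
	l := &kvLedger{store: &pvtdataStore{failReset: true}}
	// During peer startup, a recovery error is treated as fatal.
	if err := l.syncStateDBWithOldBlkPvtdata(); err != nil {
		log.Panicf("ledger recovery failed: %s", err)
	}
}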
53 changes: 35 additions & 18 deletions core/ledger/kvledger/kv_ledger_provider_test.go
@@ -623,43 +623,60 @@ func testutilNewProvider(conf *lgr.Config, t *testing.T, ccInfoProvider *mock.De
return provider
}

type nsCollBtlConfig struct {
namespace string
btlConfig map[string]uint64
}

func testutilNewProviderWithCollectionConfig(
t *testing.T,
namespace string,
btlConfigs map[string]uint64,
nsCollBtlConfigs []*nsCollBtlConfig,
conf *lgr.Config,
) *Provider {
provider := testutilNewProvider(conf, t, &mock.DeployedChaincodeInfoProvider{})
mockCCInfoProvider := provider.initializer.DeployedChaincodeInfoProvider.(*mock.DeployedChaincodeInfoProvider)
collMap := map[string]*peer.StaticCollectionConfig{}
var collConf []*peer.CollectionConfig
for collName, btl := range btlConfigs {
staticConf := &peer.StaticCollectionConfig{Name: collName, BlockToLive: btl}
collMap[collName] = staticConf
collectionConf := &peer.CollectionConfig{}
collectionConf.Payload = &peer.CollectionConfig_StaticCollectionConfig{StaticCollectionConfig: staticConf}
collConf = append(collConf, collectionConf)
collectionConfPkgs := []*peer.CollectionConfigPackage{}

nsCollMap := map[string]map[string]*peer.StaticCollectionConfig{}
for _, nsCollBtlConf := range nsCollBtlConfigs {
collMap := map[string]*peer.StaticCollectionConfig{}
var collConf []*peer.CollectionConfig
for collName, btl := range nsCollBtlConf.btlConfig {
staticConf := &peer.StaticCollectionConfig{Name: collName, BlockToLive: btl}
collMap[collName] = staticConf
collectionConf := &peer.CollectionConfig{}
collectionConf.Payload = &peer.CollectionConfig_StaticCollectionConfig{StaticCollectionConfig: staticConf}
collConf = append(collConf, collectionConf)
}
collectionConfPkgs = append(collectionConfPkgs, &peer.CollectionConfigPackage{Config: collConf})
nsCollMap[nsCollBtlConf.namespace] = collMap
}
collectionConfPkg := &peer.CollectionConfigPackage{Config: collConf}

mockCCInfoProvider.ChaincodeInfoStub = func(channelName, ccName string, qe lgr.SimpleQueryExecutor) (*lgr.DeployedChaincodeInfo, error) {
if ccName == namespace {
return &lgr.DeployedChaincodeInfo{
Name: namespace, ExplicitCollectionConfigPkg: collectionConfPkg}, nil
for i, nsCollBtlConf := range nsCollBtlConfigs {
if ccName == nsCollBtlConf.namespace {
return &lgr.DeployedChaincodeInfo{
Name: nsCollBtlConf.namespace, ExplicitCollectionConfigPkg: collectionConfPkgs[i]}, nil
}
}
return nil, nil
}

mockCCInfoProvider.AllCollectionsConfigPkgStub = func(channelName, ccName string, qe lgr.SimpleQueryExecutor) (*peer.CollectionConfigPackage, error) {
if ccName == namespace {
return collectionConfPkg, nil
for i, nsCollBtlConf := range nsCollBtlConfigs {
if ccName == nsCollBtlConf.namespace {
return collectionConfPkgs[i], nil
}
}
return nil, nil

}

mockCCInfoProvider.CollectionInfoStub = func(channelName, ccName, collName string, qe lgr.SimpleQueryExecutor) (*peer.StaticCollectionConfig, error) {
if ccName == namespace {
return collMap[collName], nil
for _, nsCollBtlConf := range nsCollBtlConfigs {
if ccName == nsCollBtlConf.namespace {
return nsCollMap[nsCollBtlConf.namespace][collName], nil
}
}
return nil, nil
}
18 changes: 10 additions & 8 deletions core/ledger/kvledger/kv_ledger_test.go
@@ -258,10 +258,15 @@ func TestKVLedgerBlockStorageWithPvtdata(t *testing.T) {
func TestKVLedgerDBRecovery(t *testing.T) {
conf, cleanup := testConfig(t)
defer cleanup()
nsCollBtlConfs := []*nsCollBtlConfig{
{
namespace: "ns",
btlConfig: map[string]uint64{"coll": 0},
},
}
provider1 := testutilNewProviderWithCollectionConfig(
t,
"ns",
map[string]uint64{"coll": 0},
nsCollBtlConfs,
conf,
)
defer provider1.Close()
@@ -328,8 +333,7 @@ func TestKVLedgerDBRecovery(t *testing.T) {
// StateDB and HistoryDB should be recovered before returning from NewKVLedger call
provider2 := testutilNewProviderWithCollectionConfig(
t,
"ns",
map[string]uint64{"coll": 0},
nsCollBtlConfs,
conf,
)
defer provider2.Close()
@@ -385,8 +389,7 @@ func TestKVLedgerDBRecovery(t *testing.T) {
// history DB should be recovered before returning from NewKVLedger call
provider3 := testutilNewProviderWithCollectionConfig(
t,
"ns",
map[string]uint64{"coll": 0},
nsCollBtlConfs,
conf,
)
defer provider3.Close()
@@ -443,8 +446,7 @@ func TestKVLedgerDBRecovery(t *testing.T) {
// state DB should be recovered before returning from NewKVLedger call
provider4 := testutilNewProviderWithCollectionConfig(
t,
"ns",
map[string]uint64{"coll": 0},
nsCollBtlConfs,
conf,
)
defer provider4.Close()
9 changes: 7 additions & 2 deletions core/ledger/kvledger/snapshot_test.go
@@ -31,10 +31,15 @@ func TestGenerateSnapshot(t *testing.T) {
conf, cleanup := testConfig(t)
defer cleanup()
snapshotRootDir := conf.SnapshotsConfig.RootDir
nsCollBtlConfs := []*nsCollBtlConfig{
{
namespace: "ns",
btlConfig: map[string]uint64{"coll": 0},
},
}
provider := testutilNewProviderWithCollectionConfig(
t,
"ns",
map[string]uint64{"coll": 0},
nsCollBtlConfs,
conf,
)
defer provider.Close()
15 changes: 10 additions & 5 deletions core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/purge_mgr.go
@@ -78,7 +78,9 @@ func (p *PurgeMgr) UpdateExpiryInfoOfPvtDataOfOldBlocks(pvtUpdates *privacyenabl
builder := newExpiryScheduleBuilder(p.btlPolicy)
pvtUpdateCompositeKeyMap := pvtUpdates.ToCompositeKeyMap()
for k, vv := range pvtUpdateCompositeKeyMap {
builder.add(k.Namespace, k.CollectionName, k.Key, util.ComputeStringHash(k.Key), vv)
if err := builder.add(k.Namespace, k.CollectionName, k.Key, util.ComputeStringHash(k.Key), vv); err != nil {
Comment from the PR author:
An error can occur here while retrieving collection info (a NoSuchCollectionError or a DB read error). At this point, a NoSuchCollectionError would not occur, because the private data coordinator returns an error (which eventually results in a peer panic) if a collection does not exist. Hence, an error can happen here only due to DB access. For a DB access error, a peer panic would occur, so there is no possibility of a fork.

Comment from a reviewer:
I guess that you mistakenly linked this function to the regular commit path. This function is for old missing pvt data, which is in the reconciliation path and does not cause a panic but rather retries.

Reply from the PR author:
Ah. Correct. My mistake.

return err
}
}

var expiryInfoUpdates []*expiryInfo
@@ -212,7 +214,10 @@ func (p *PurgeMgr) prepareWorkingsetFor(expiringAtBlk uint64) *workingset {
// Transform the keys into the form such that for each hashed key that is eligible for purge appears in 'toPurge'
toPurge := transformToExpiryInfoMap(expiryInfo)
// Load the latest versions of the hashed keys
p.preloadCommittedVersionsInCache(toPurge)
if err = p.preloadCommittedVersionsInCache(toPurge); err != nil {
workingset.err = err
return workingset
}
var expiryInfoKeysToClear []*expiryInfoKey

if len(toPurge) == 0 {
@@ -266,15 +271,15 @@ func (p *PurgeMgr) prepareWorkingsetFor(expiringAtBlk uint64) *workingset {
return workingset
}

func (p *PurgeMgr) preloadCommittedVersionsInCache(expInfoMap expiryInfoMap) {
func (p *PurgeMgr) preloadCommittedVersionsInCache(expInfoMap expiryInfoMap) error {
if !p.db.IsBulkOptimizable() {
return
return nil
}
var hashedKeys []*privacyenabledstate.HashedCompositeKey
for k := range expInfoMap {
hashedKeys = append(hashedKeys, &k)
}
p.db.LoadCommittedVersionsOfPubAndHashedKeys(nil, hashedKeys)
return p.db.LoadCommittedVersionsOfPubAndHashedKeys(nil, hashedKeys)
}

func transformToExpiryInfoMap(expiryInfo []*expiryInfo) expiryInfoMap {
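A rough sketch of the error-propagation pattern used in prepareWorkingsetFor above, simplified and with hypothetical names: instead of adding a second return value, a failure from the preload step is recorded on the returned workingset, which the caller already checks.

package main

import (
	"errors"
	"fmt"
)

// workingset carries its own err field so the preparation function can keep a
// single return value, as in the real code.
type workingset struct {
	hashedKeys []string
	err        error
}

// preloadCommittedVersions stands in for preloadCommittedVersionsInCache and
// simply fails here to exercise the error path.
func preloadCommittedVersions(keys []string) error {
	return errors.New("leveldb: read failure")
}

func prepareWorkingset(keys []string) *workingset {
	ws := &workingset{hashedKeys: keys}
	if err := preloadCommittedVersions(keys); err != nil {
		ws.err = err
		return ws
	}
	// ... compute the keys to purge and the expiry keys to clear ...
	return ws
}

func main() {
	ws := prepareWorkingset([]string{"ns.coll.key1"})
	if ws.err != nil {
		fmt.Println("purge preparation failed:", ws.err)
	}
}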
4 changes: 3 additions & 1 deletion core/ledger/kvledger/txmgmt/queryutil/iterator_combiner.go
@@ -67,7 +67,9 @@ func (combiner *itrCombiner) Next() (commonledger.QueryResult, error) {
}
}
kv := combiner.kvAt(smallestHolderIndex)
combiner.moveItrAndRemoveIfExhausted(smallestHolderIndex)
if _, err := combiner.moveItrAndRemoveIfExhausted(smallestHolderIndex); err != nil {
Comment from the PR author:
An error can happen here due to a leveldb internal error (i.e., from iter.Next()). As this error is related to system resources and does not change depending on the block content, there is no possibility of a fork.

return nil, err
}
if kv.IsDelete() {
return combiner.Next()
}
7 changes: 4 additions & 3 deletions core/ledger/kvledger/txmgmt/rwsetutil/query_results_helper.go
@@ -106,7 +106,9 @@ func (helper *RangeQueryResultsHelper) Done() ([]*kvrwset.KVRead, *kvrwset.Query
return helper.pendingResults, nil, err
}
}
helper.mt.done()
if err := helper.mt.done(); err != nil {
Comment from the PR author:
An error can happen here only when the hash computation returns an error, i.e., from either GetHash() or hash.Write(). Given that this error would not change with respect to the block content, there is no possibility of a fork.

return nil, nil, err
}
return helper.pendingResults, helper.mt.getSummery(), nil
}

@@ -132,8 +134,7 @@ func (helper *RangeQueryResultsHelper) processPendingResults() error {
if err != nil {
return err
}
helper.mt.update(hash)
return nil
return helper.mt.update(hash)
Comment from the PR author:
Same as for helper.mt.done().

}

func serializeKVReads(kvReads []*kvrwset.KVRead) ([]byte, error) {
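A small sketch of the pattern behind the done()/update() changes above, using a toy Merkle-summary type with hypothetical names rather than the real rwsetutil tree: the only failure source is the underlying hash writer, and that error is now returned to the caller instead of being discarded.

package main

import (
	"crypto/sha256"
	"fmt"
	"hash"
)

// merkleSummary is a toy stand-in for the results helper's Merkle tree.
type merkleSummary struct {
	h hash.Hash
}

// update hashes one serialized batch of results; the stdlib hashes never fail
// on Write, but an HSM-backed hasher could, so the error is propagated.
func (m *merkleSummary) update(serialized []byte) error {
	_, err := m.h.Write(serialized)
	return err
}

// done finalizes the summary and returns any error instead of dropping it.
func (m *merkleSummary) done() ([]byte, error) {
	if m.h == nil {
		return nil, fmt.Errorf("summary not initialized")
	}
	return m.h.Sum(nil), nil
}

func main() {
	m := &merkleSummary{h: sha256.New()}
	for _, batch := range [][]byte{[]byte("kvreads-batch-1"), []byte("kvreads-batch-2")} {
		if err := m.update(batch); err != nil { // mirrors: return helper.mt.update(hash)
			fmt.Println("update failed:", err)
			return
		}
	}
	sum, err := m.done() // mirrors: if err := helper.mt.done(); err != nil { ... }
	fmt.Printf("summary=%x err=%v\n", sum, err)
}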
4 changes: 3 additions & 1 deletion core/ledger/kvledger/txmgmt/txmgr/lockbased_txmgr.go
@@ -106,7 +106,9 @@ func NewLockBasedTxMgr(initializer *Initializer) (*LockBasedTxMgr, error) {
return nil, errors.New("create new lock based TxMgr failed: passed in nil ledger hasher")
}

initializer.DB.Open()
if err := initializer.DB.Open(); err != nil {
Comment from the PR author:
Open() always returns nil and is a no-op. We can remove this API in a separate PR.

return nil, err
}
txmgr := &LockBasedTxMgr{
ledgerid: initializer.LedgerID,
db: initializer.DB,
@@ -119,7 +119,9 @@ func (v *rangeQueryHashValidator) validate() (bool, error) {
return equals, nil
}
versionedKV := result.(*statedb.VersionedKV)
v.resultsHelper.AddResult(rwsetutil.NewKVRead(versionedKV.Key, versionedKV.Version))
if err := v.resultsHelper.AddResult(rwsetutil.NewKVRead(versionedKV.Key, versionedKV.Version)); err != nil {
Comment from the PR author:
An error can happen here only if either the proto marshal fails or the hash function returns an error. As the validator reads the value from the DB, the content of the block should not change the result. Hence, there is no possibility of a fork here.

Comment from a reviewer:
I'll summarize it a bit differently... Though practically it's a remote chance, theoretically this missed error could already cause a fork under the existing code: the peer that misses this error would continue evaluating the results, eventually find the query results different, and hence mark the transaction invalid. Now, this change, instead of marking the transaction invalid, will cause a panic upstream - so this should be an acceptable change without an additional capability check.

On a side note, having said the above, I see the possibility of this error being generated only if someone is using an HSM for hash computation, because golang's crypto hash library does not return an error. Also, as you mentioned, the other possibility is the proto marshal downstream, which is only possible if there were a programming error in our code, potentially passing nil (which is not happening here).

return false, err
}
merkle := v.resultsHelper.GetMerkleSummary()

if merkle.MaxLevel < inMerkle.MaxLevel {
15 changes: 10 additions & 5 deletions core/ledger/kvledger/txmgmt/validation/tx_ops.go
@@ -18,8 +18,9 @@ import (
func prepareTxOps(rwset *rwsetutil.TxRwSet, txht *version.Height,
precedingUpdates *publicAndHashUpdates, db *privacyenabledstate.DB) (txOps, error) {
txops := txOps{}
txops.applyTxRwset(rwset)
//logger.Debugf("prepareTxOps() txops after applying raw rwset=%#v", spew.Sdump(txops))
if err := txops.applyTxRwset(rwset); err != nil {
Comment from the PR author:
An error can happen here only due to proto marshaling. Given that the block is already unmarshaled into an internal block structure for validation purposes, a tx with invalid content in the block would have been invalidated, and such a tx's rwset would never reach here. Hence, there is no possibility of a fork here.

Comment from a reviewer:
Rather, in the current code, the missed error could have caused a state fork (because the metadata would not have been applied). Now, instead, this causes a panic in the upstream code rather than committing the transaction without metadata - so this is a safe and desired change as such.

However, this error should never happen, as this is mainly a programming error (e.g., passing nil to proto.Marshal).

return nil, err
}
for ck, keyop := range txops {
// check if the final state of the key, value and metadata, is already present in the transaction, then skip
// otherwise we need to retrieve latest state and merge in the current value or metadata update
@@ -62,7 +63,9 @@ func (txops txOps) applyTxRwset(rwset *rwsetutil.TxRwSet) error {
txops.applyKVWrite(ns, "", kvWrite)
}
for _, kvMetadataWrite := range nsRWSet.KvRwSet.MetadataWrites {
txops.applyMetadata(ns, "", kvMetadataWrite)
if err := txops.applyMetadata(ns, "", kvMetadataWrite); err != nil {
return err
}
}

// apply collection level kvwrite and kvMetadataWrite
@@ -79,12 +82,14 @@
}

for _, metadataWrite := range collHashRWset.HashedRwSet.MetadataWrites {
txops.applyMetadata(ns, coll,
if err := txops.applyMetadata(ns, coll,
&kvrwset.KVMetadataWrite{
Key: string(metadataWrite.KeyHash),
Entries: metadataWrite.Entries,
},
)
); err != nil {
return err
}
}
}
}
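A simplified sketch of the behavioral point raised in the review comments on applyTxRwset, with hypothetical types rather than the real rwset protos: once applyMetadata's error is surfaced, a failing metadata write aborts the apply step instead of leaving the key applied without its metadata.

package main

import (
	"errors"
	"fmt"
)

// kvWrite and kvMetadataWrite are simplified stand-ins for the rwset messages.
type kvWrite struct{ key, value string }

type kvMetadataWrite struct {
	key     string
	entries map[string][]byte
}

type txOps map[string]string

func (ops txOps) applyKVWrite(w kvWrite) { ops[w.key] = w.value }

// applyMetadata can fail (in Fabric, on proto marshaling); here it fails on
// empty entries just to exercise the error path.
func (ops txOps) applyMetadata(w kvMetadataWrite) error {
	if len(w.entries) == 0 {
		return errors.New("marshal metadata: no entries")
	}
	ops[w.key+"#metadata"] = fmt.Sprint(w.entries)
	return nil
}

// applyTxRwset surfaces the first metadata error rather than silently
// applying the writes without their metadata.
func applyTxRwset(ops txOps, writes []kvWrite, metaWrites []kvMetadataWrite) error {
	for _, w := range writes {
		ops.applyKVWrite(w)
	}
	for _, mw := range metaWrites {
		if err := ops.applyMetadata(mw); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	ops := txOps{}
	err := applyTxRwset(ops,
		[]kvWrite{{key: "k1", value: "v1"}},
		[]kvMetadataWrite{{key: "k1"}}, // no entries -> error in this sketch
	)
	fmt.Println("apply error:", err)
}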
8 changes: 6 additions & 2 deletions core/ledger/kvledger/txmgmt/validation/validator.go
@@ -102,7 +102,9 @@ func (v *validator) validateAndPrepareBatch(blk *block, doMVCCValidation bool) (
if validationCode == peer.TxValidationCode_VALID {
logger.Debugf("Block [%d] Transaction index [%d] TxId [%s] marked as valid by state validator. ContainsPostOrderWrites [%t]", blk.num, tx.indexInBlock, tx.id, tx.containsPostOrderWrites)
committingTxHeight := version.NewHeight(blk.num, uint64(tx.indexInBlock))
updates.applyWriteSet(tx.rwset, committingTxHeight, v.db, tx.containsPostOrderWrites)
if err := updates.applyWriteSet(tx.rwset, committingTxHeight, v.db, tx.containsPostOrderWrites); err != nil {
return nil, err
}
} else {
logger.Warningf("Block [%d] Transaction index [%d] TxId [%s] marked as invalid by state validator. Reason code [%s]",
blk.num, tx.indexInBlock, tx.id, validationCode.String())
@@ -228,7 +230,9 @@ func (v *validator) validateRangeQuery(ns string, rangeQueryInfo *kvrwset.RangeQ
logger.Debug(`Hashing results are not present in the range query info hence, initiating raw KVReads based validation`)
qv = &rangeQueryResultsValidator{}
}
qv.init(rangeQueryInfo, combinedItr)
if err := qv.init(rangeQueryInfo, combinedItr); err != nil {
Comment from the PR author:
Even if init() returns an error (due to a Merkle tree degree < 2) in the existing code, qv.resultsHelper would have become nil. As a result, a panic would occur when we call qv.validate(). Hence, there is no possibility of a fork here.

Comment from a reviewer:
To summarize it in simple terms, this does not change the current behavior but rather returns the error early on.

return false, err
}
return qv.validate()
}

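A compact sketch of the validateRangeQuery change above, with a hypothetical stand-in type: the init error is returned to the caller up front rather than leaving the validator half-initialized and failing later.

package main

import (
	"errors"
	"fmt"
)

// rangeQueryValidatorStub stands in for the range-query validators; init fails
// when the requested Merkle tree degree is below 2.
type rangeQueryValidatorStub struct {
	initialized bool
}

func (v *rangeQueryValidatorStub) init(maxDegree uint32) error {
	if maxDegree < 2 {
		return errors.New("incorrect degree: should be > 1")
	}
	v.initialized = true
	return nil
}

func (v *rangeQueryValidatorStub) validate() (bool, error) {
	// The early init error check in validateRangeQuery keeps a half-initialized
	// validator from ever reaching this point.
	return v.initialized, nil
}

// validateRangeQuery mirrors the patched call site: qv.init's error is checked.
func validateRangeQuery(maxDegree uint32) (bool, error) {
	qv := &rangeQueryValidatorStub{}
	if err := qv.init(maxDegree); err != nil {
		return false, err
	}
	return qv.validate()
}

func main() {
	ok, err := validateRangeQuery(1)
	fmt.Println(ok, err)
}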
8 changes: 6 additions & 2 deletions core/ledger/pvtdatastorage/helper.go
@@ -78,12 +78,16 @@ func prepareExpiryEntries(committingBlk uint64, dataEntries []*dataEntry, missin

// 1. prepare expiryData for non-missing data
for _, dataEntry := range dataEntries {
prepareExpiryEntriesForPresentData(mapByExpiringBlk, dataEntry.key, btlPolicy)
if err := prepareExpiryEntriesForPresentData(mapByExpiringBlk, dataEntry.key, btlPolicy); err != nil {
Comment from the PR author:
An error can occur here only if the collection does not exist or if DB access results in an error. As a NoSuchCollectionError is not possible here, it is safe to process the DB access error, and it would not cause a fork.

return nil, err
}
}

// 2. prepare expiryData for missing data
for missingDataKey := range missingDataEntries {
prepareExpiryEntriesForMissingData(mapByExpiringBlk, &missingDataKey, btlPolicy)
if err := prepareExpiryEntriesForMissingData(mapByExpiringBlk, &missingDataKey, btlPolicy); err != nil {
Comment from the PR author:
Same as above.

return nil, err
}
}

for expiryBlk, expiryData := range mapByExpiringBlk {
4 changes: 3 additions & 1 deletion core/ledger/pvtdatastorage/store.go
@@ -776,7 +776,9 @@ func (s *Store) purgeExpiredData(minBlkNum, maxBlkNum uint64) error {
for _, missingDataKey := range missingDataKeys {
batch.Delete(encodeMissingDataKey(missingDataKey))
}
s.db.WriteBatch(batch, false)
if err := s.db.WriteBatch(batch, false); err != nil {
return err
Comment from the PR author:
Only results in a warning message.

}
}
logger.Infof("[%s] - [%d] Entries purged from private data storage till block number [%d]", s.ledgerid, len(expiryEntries), maxBlkNum)
return nil