Skip to content

Commit

Permalink
[FAB-6218] Validate block before pulling private data
Browse files Browse the repository at this point in the history
This commit makes block validation happen before the private data is fetched
from the transient store or from peers, and also removes the validation
from the preCommit() step of the committer.

Change-Id: Ic6ad105293a942e43c0697b4573a2b652b2c933d
Signed-off-by: yacovm <[email protected]>
  • Loading branch information
yacovm committed Sep 24, 2017
1 parent 900850f commit 71847ce
Show file tree
Hide file tree
Showing 8 changed files with 183 additions and 31 deletions.
20 changes: 6 additions & 14 deletions core/committer/committer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"fmt"

"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/core/committer/txvalidator"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/events/producer"
"github.com/hyperledger/fabric/protos/common"
Expand All @@ -43,9 +42,8 @@ func init() {
// it keeps the reference to the ledger to commit blocks and retrieve
// chain information
type LedgerCommitter struct {
ledger ledger.PeerLedger
validator txvalidator.Validator
eventer ConfigBlockEventer
ledger ledger.PeerLedger
eventer ConfigBlockEventer
}

// ConfigBlockEventer callback function proto type to define action
Expand All @@ -54,15 +52,15 @@ type ConfigBlockEventer func(block *common.Block) error

// NewLedgerCommitter is a factory function to create an instance of the committer
// which passes incoming blocks via validation and commits them into the ledger.
func NewLedgerCommitter(ledger ledger.PeerLedger, validator txvalidator.Validator) *LedgerCommitter {
return NewLedgerCommitterReactive(ledger, validator, func(_ *common.Block) error { return nil })
func NewLedgerCommitter(ledger ledger.PeerLedger) *LedgerCommitter {
return NewLedgerCommitterReactive(ledger, func(_ *common.Block) error { return nil })
}

// NewLedgerCommitterReactive is a factory function to create an instance of the committer
// same as way as NewLedgerCommitter, while also provides an option to specify callback to
// be called upon new configuration block arrival and commit event
func NewLedgerCommitterReactive(ledger ledger.PeerLedger, validator txvalidator.Validator, eventer ConfigBlockEventer) *LedgerCommitter {
return &LedgerCommitter{ledger: ledger, validator: validator, eventer: eventer}
func NewLedgerCommitterReactive(ledger ledger.PeerLedger, eventer ConfigBlockEventer) *LedgerCommitter {
return &LedgerCommitter{ledger: ledger, eventer: eventer}
}

// Commit commits block to into the ledger
Expand All @@ -88,12 +86,6 @@ func (lc *LedgerCommitter) Commit(block *common.Block) error {
// preCommit takes care to validate the block and update based on its
// content
func (lc *LedgerCommitter) preCommit(block *common.Block) error {
// Validate and mark invalid transactions
logger.Debug("Validating block")
if err := lc.validator.Validate(block); err != nil {
return err
}

// Updating CSCC with new configuration block
if utils.IsConfigBlock(block) {
logger.Debug("Received configuration update, calling CSCC ConfigUpdate")
Expand Down
5 changes: 2 additions & 3 deletions core/committer/committer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/hyperledger/fabric/common/tools/configtxgen/provisional"
"github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/core/ledger/ledgermgmt"
"github.com/hyperledger/fabric/core/mocks/validator"
"github.com/hyperledger/fabric/protos/common"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
Expand All @@ -42,7 +41,7 @@ func TestKVLedgerBlockStorage(t *testing.T) {
assert.NoError(t, err, "Error while creating ledger: %s", err)
defer ledger.Close()

committer := NewLedgerCommitter(ledger, &validator.MockValidator{})
committer := NewLedgerCommitter(ledger)
height, err := committer.LedgerHeight()
assert.Equal(t, uint64(1), height)
assert.NoError(t, err)
Expand Down Expand Up @@ -92,7 +91,7 @@ func TestNewLedgerCommitterReactive(t *testing.T) {
defer ledger.Close()

var configArrived int32
committer := NewLedgerCommitterReactive(ledger, &validator.MockValidator{}, func(_ *common.Block) error {
committer := NewLedgerCommitterReactive(ledger, func(_ *common.Block) error {
atomic.AddInt32(&configArrived, 1)
return nil
})
Expand Down
4 changes: 3 additions & 1 deletion core/peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,8 @@ func createChain(cid string, ledger ledger.PeerLedger, cb *common.Block) error {
cs.Resources = bundleSource
cs.bundleSource = bundleSource

c := committer.NewLedgerCommitterReactive(ledger, txvalidator.NewTxValidator(cs), func(block *common.Block) error {
validator := txvalidator.NewTxValidator(cs)
c := committer.NewLedgerCommitterReactive(ledger, func(block *common.Block) error {
chainID, err := utils.GetChainIDFromBlock(block)
if err != nil {
return err
Expand All @@ -300,6 +301,7 @@ func createChain(cid string, ledger ledger.PeerLedger, cb *common.Block) error {
return errors.Wrapf(err, "Failed opening transient store for %s", bundle.ConfigtxManager().ChainID())
}
service.GetGossipService().InitializeChannel(bundle.ConfigtxManager().ChainID(), ordererAddresses, service.Support{
Validator: validator,
Committer: c,
Store: store,
Pp: &noopPolicyParser{},
Expand Down
27 changes: 24 additions & 3 deletions gossip/privdata/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ import (

util2 "github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/core/committer"
"github.com/hyperledger/fabric/core/committer/txvalidator"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/rwsetutil"
"github.com/hyperledger/fabric/core/transientstore"
"github.com/hyperledger/fabric/gossip/util"
"github.com/hyperledger/fabric/protos/common"
gossip2 "github.com/hyperledger/fabric/protos/gossip"
"github.com/hyperledger/fabric/protos/ledger/rwset"
"github.com/hyperledger/fabric/protos/peer"
"github.com/hyperledger/fabric/protos/utils"
"github.com/op/go-logging"
"github.com/pkg/errors"
Expand Down Expand Up @@ -71,14 +73,15 @@ type fetcher interface {
}

type coordinator struct {
txvalidator.Validator
committer.Committer
TransientStore
gossipFetcher fetcher
}

// NewCoordinator creates a new instance of coordinator
func NewCoordinator(committer committer.Committer, store TransientStore, gossipFetcher fetcher) Coordinator {
return &coordinator{Committer: committer, TransientStore: store, gossipFetcher: gossipFetcher}
func NewCoordinator(committer committer.Committer, store TransientStore, gossipFetcher fetcher, validator txvalidator.Validator) Coordinator {
return &coordinator{Committer: committer, TransientStore: store, gossipFetcher: gossipFetcher, Validator: validator}
}

// StorePvtData used to persist private date into transient store
Expand All @@ -94,6 +97,13 @@ func (c *coordinator) StoreBlock(block *common.Block, privateDataSets util.PvtDa
if block.Header == nil {
return errors.New("Block header is nil")
}

logger.Debug("Validating block", block.Header.Number)
err := c.Validator.Validate(block)
if err != nil {
return errors.WithMessage(err, "Validation failed")
}

blockAndPvtData := &ledger.BlockAndPvtData{
Block: block,
BlockPvtData: make(map[uint64]*ledger.TxPvtData),
Expand Down Expand Up @@ -387,10 +397,21 @@ func (k *rwSetKey) toTxPvtReadWriteSet(rws []byte) *rwset.TxPvtReadWriteSet {
}

func (c *coordinator) listMissingPrivateData(block *common.Block, ownedRWsets map[rwSetKey][]byte) (rwSetKeysByTxIDs, error) {
if block.Metadata == nil || len(block.Metadata.Metadata) <= int(common.BlockMetadataIndex_TRANSACTIONS_FILTER) {
return nil, errors.New("Block.Metadata is nil or Block.Metadata lacks a Tx filter bitmap")
}
txsFilter := txValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
if len(txsFilter) != len(block.Data.Data) {
return nil, errors.Errorf("Block data size(%d) is different from Tx filter size(%d)", len(block.Data.Data), len(txsFilter))
}

privateRWsetsInBlock := make(map[rwSetKey]struct{})
missing := make(rwSetKeysByTxIDs)

for seqInBlock, envBytes := range block.Data.Data {
if txsFilter[seqInBlock] != uint8(peer.TxValidationCode_VALID) {
logger.Debug("Skipping Tx", seqInBlock, "because it's invalid. Status is", txsFilter[seqInBlock])
continue
}
env, err := utils.GetEnvelopeFromBlock(envBytes)
if err != nil {
return nil, err
Expand Down
95 changes: 92 additions & 3 deletions gossip/privdata/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,17 @@ func (mock *committerMock) Close() {
mock.Called()
}

type validatorMock struct {
err error
}

func (v *validatorMock) Validate(block *common.Block) error {
if v.err != nil {
return v.err
}
return nil
}

type digests []*proto.PvtDataDigest

func (d digests) Equal(other digests) bool {
Expand Down Expand Up @@ -425,6 +436,84 @@ var expectedCommittedPrivateData2 = map[uint64]*ledger.TxPvtData{
}},
}

func TestCoordinatorStoreInvalidBlock(t *testing.T) {
hash := util2.ComputeSHA256([]byte("rws-pre-image"))
committer := &committerMock{}
committer.On("CommitWithPvtData", mock.Anything).Run(func(args mock.Arguments) {
t.Fatal("Shouldn't have committed")
}).Return(nil)
store := &mockTransientStore{t: t}
fetcher := &fetcherMock{t: t}
pdFactory := &pvtDataFactory{}
bf := &blockFactory{
channelID: "test",
}

block := bf.withoutMetadata().create()
// Scenario I: Block we got doesn't have any metadata with it
pvtData := pdFactory.create()
coordinator := NewCoordinator(committer, store, fetcher, &validatorMock{})
err := coordinator.StoreBlock(block, pvtData)
assert.Error(t, err)
assert.Contains(t, err.Error(), "Block.Metadata is nil or Block.Metadata lacks a Tx filter bitmap")

// Scenario II: Validator has an error while validating the block
block = bf.create()
pvtData = pdFactory.create()
coordinator = NewCoordinator(committer, store, fetcher, &validatorMock{fmt.Errorf("failed validating block")})
err = coordinator.StoreBlock(block, pvtData)
assert.Error(t, err)
assert.Contains(t, err.Error(), "failed validating block")

// Scenario III: Block we got contains an inadequate length of Tx filter in the metadata
block = bf.withMetadataSize(100).create()
pvtData = pdFactory.create()
coordinator = NewCoordinator(committer, store, fetcher, &validatorMock{})
err = coordinator.StoreBlock(block, pvtData)
assert.Error(t, err)
assert.Contains(t, err.Error(), "Block data size")
assert.Contains(t, err.Error(), "is different from Tx filter size")

// Scenario IV: The second transaction in the block we got is invalid, and we have no private data for that.
// If the coordinator would try to fetch private data, the test would fall because we haven't defined the
// mock operations for the transientstore (or for gossip) in this test.
var commitHappened bool
assertCommitHappened := func() {
assert.True(t, commitHappened)
commitHappened = false
}
committer = &committerMock{}
committer.On("CommitWithPvtData", mock.Anything).Run(func(args mock.Arguments) {
var privateDataPassed2Ledger privateData = args.Get(0).(*ledger.BlockAndPvtData).BlockPvtData
commitHappened = true
// Only the first transaction's private data is passed to the ledger
assert.Len(t, privateDataPassed2Ledger, 1)
assert.Equal(t, 0, int(privateDataPassed2Ledger[0].SeqInBlock))
// The private data passed to the ledger contains "ns1" and has 2 collections in it
assert.Len(t, privateDataPassed2Ledger[0].WriteSet.NsPvtRwset, 1)
assert.Equal(t, "ns1", privateDataPassed2Ledger[0].WriteSet.NsPvtRwset[0].Namespace)
assert.Len(t, privateDataPassed2Ledger[0].WriteSet.NsPvtRwset[0].CollectionPvtRwset, 2)
}).Return(nil)
block = bf.withInvalidTxns(1).AddTxn("tx1", "ns1", hash, "c1", "c2").AddTxn("tx2", "ns2", hash, "c1").create()
pvtData = pdFactory.addRWSet().addNSRWSet("ns1", "c1", "c2").create()
coordinator = NewCoordinator(committer, store, fetcher, &validatorMock{})
err = coordinator.StoreBlock(block, pvtData)
assert.NoError(t, err)
assertCommitHappened()

// Scenario V: Block doesn't contain a header
block.Header = nil
err = coordinator.StoreBlock(block, pvtData)
assert.Error(t, err)
assert.Contains(t, err.Error(), "Block header is nil")

// Scenario V: Block doesn't contain Data
block.Data = nil
err = coordinator.StoreBlock(block, pvtData)
assert.Error(t, err)
assert.Contains(t, err.Error(), "Block data is empty")
}

func TestCoordinatorStoreBlock(t *testing.T) {
// Green path test, all private data should be obtained successfully
var commitHappened bool
Expand Down Expand Up @@ -452,7 +541,7 @@ func TestCoordinatorStoreBlock(t *testing.T) {
// If the coordinator tries fetching from the transientstore, or peers it would result in panic,
// because we didn't define yet the "On(...)" invocation of the transient store or other peers.
pvtData := pdFactory.addRWSet().addNSRWSet("ns1", "c1", "c2").addRWSet().addNSRWSet("ns2", "c1").create()
coordinator := NewCoordinator(committer, store, fetcher)
coordinator := NewCoordinator(committer, store, fetcher, &validatorMock{})
err := coordinator.StoreBlock(block, pvtData)
assert.NoError(t, err)
assertCommitHappened()
Expand Down Expand Up @@ -551,7 +640,7 @@ func TestCoordinatorStoreBlock(t *testing.T) {
assert.True(t, privateDataPassed2Ledger.Equal(expectedCommittedPrivateData2))
commitHappened = true
}).Return(nil)
coordinator = NewCoordinator(committer, store, fetcher)
coordinator = NewCoordinator(committer, store, fetcher, &validatorMock{})
err = coordinator.StoreBlock(block, nil)
assert.NoError(t, err)
assertCommitHappened()
Expand All @@ -561,7 +650,7 @@ func TestCoordinatorGetBlocks(t *testing.T) {
committer := &committerMock{}
store := &mockTransientStore{t: t}
fetcher := &fetcherMock{t: t}
coordinator := NewCoordinator(committer, store, fetcher)
coordinator := NewCoordinator(committer, store, fetcher, &validatorMock{})

// Bad path: block is not returned
committer.On("GetBlocks", mock.Anything).Return([]*common.Block{})
Expand Down
49 changes: 45 additions & 4 deletions gossip/privdata/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,14 @@ import (
"github.com/hyperledger/fabric/protos/peer"
)

type txValidationFlags []uint8

type blockFactory struct {
channelID string
transactions [][]byte
channelID string
transactions [][]byte
metadataSize int
lacksMetadata bool
invalidTxns map[int]struct{}
}

func (bf *blockFactory) AddTxn(txID string, nsName string, hash []byte, collections ...string) *blockFactory {
Expand Down Expand Up @@ -91,16 +96,52 @@ func (bf *blockFactory) AddTxn(txID string, nsName string, hash []byte, collecti

func (bf *blockFactory) create() *common.Block {
defer func() {
bf.transactions = nil
*bf = blockFactory{}
}()
return &common.Block{
block := &common.Block{
Header: &common.BlockHeader{
Number: 1,
},
Data: &common.BlockData{
Data: bf.transactions,
},
}

if bf.lacksMetadata {
return block
}
block.Metadata = &common.BlockMetadata{
Metadata: make([][]byte, common.BlockMetadataIndex_TRANSACTIONS_FILTER+1),
}
if bf.metadataSize > 0 {
block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = make([]uint8, bf.metadataSize)
} else {
block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = make([]uint8, len(block.Data.Data))
}

for txSeqInBlock := range bf.invalidTxns {
block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER][txSeqInBlock] = uint8(peer.TxValidationCode_INVALID_ENDORSER_TRANSACTION)
}

return block
}

func (bf *blockFactory) withoutMetadata() *blockFactory {
bf.lacksMetadata = true
return bf
}

func (bf *blockFactory) withMetadataSize(mdSize int) *blockFactory {
bf.metadataSize = mdSize
return bf
}

func (bf *blockFactory) withInvalidTxns(sequences ...int) *blockFactory {
bf.invalidTxns = make(map[int]struct{})
for _, seq := range sequences {
bf.invalidTxns[seq] = struct{}{}
}
return bf
}

func sampleNsRwSet(ns string, hash []byte, collections ...string) *rwsetutil.NsRwSet {
Expand Down
4 changes: 3 additions & 1 deletion gossip/service/gossip_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/hyperledger/fabric/protos/ledger/rwset"

"github.com/hyperledger/fabric/core/committer"
"github.com/hyperledger/fabric/core/committer/txvalidator"
"github.com/hyperledger/fabric/core/common/privdata"
"github.com/hyperledger/fabric/core/deliverservice"
"github.com/hyperledger/fabric/core/deliverservice/blocksprovider"
Expand Down Expand Up @@ -201,6 +202,7 @@ func (g *gossipServiceImpl) NewConfigEventer() ConfigProcessor {
}

type Support struct {
Validator txvalidator.Validator
Committer committer.Committer
Store privdata2.TransientStore
Pp privdata.PolicyParser
Expand All @@ -215,7 +217,7 @@ func (g *gossipServiceImpl) InitializeChannel(chainID string, endpoints []string
logger.Debug("Creating state provider for chainID", chainID)
servicesAdapter := &state.ServicesMediator{GossipAdapter: g, MCSAdapter: g.mcs}
fetcher := privdata2.NewPuller(support.Ps, support.Pp, g.gossipSvc, NewDataRetriever(support.Store), chainID)
coordinator := privdata2.NewCoordinator(support.Committer, support.Store, fetcher)
coordinator := privdata2.NewCoordinator(support.Committer, support.Store, fetcher, support.Validator)
g.privateHandlers[chainID] = privateHandler{
support: support,
coordinator: coordinator,
Expand Down
Loading

0 comments on commit 71847ce

Please sign in to comment.