[FAB-6520] Extract endorsers from block for pull
This change set makes the coordinator extract the endorsers
of each transaction and pass them to the pull mechanism,
which prioritizes pulling from endorsers.

Change-Id: I35b317c73435dc625d14699f3fa02faea431eab0
Signed-off-by: yacovm <[email protected]>
yacovm committed Oct 13, 2017
1 parent 8a52d63 commit ff714cd
Showing 8 changed files with 313 additions and 126 deletions.
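
The essence of the change, as a minimal self-contained sketch (stand-in types and names, not the actual Fabric APIs): the coordinator records which peers endorsed each missing private RW set, and the pull side tries those endorsers before falling back to other peers.

package main

import "fmt"

// digest identifies one private RW set to pull (stand-in for a
// gossip PvtDataDigest).
type digest struct {
	txID, namespace, collection string
}

// endorsement names a peer that endorsed the transaction (stand-in
// for a peer.Endorsement).
type endorsement struct {
	peerID string
}

// pullOrder returns the peers to contact, endorsers first.
func pullOrder(endorsers []endorsement, allPeers []string) []string {
	seen := make(map[string]bool)
	var order []string
	for _, e := range endorsers {
		if !seen[e.peerID] {
			seen[e.peerID] = true
			order = append(order, e.peerID)
		}
	}
	for _, p := range allPeers {
		if !seen[p] {
			order = append(order, p)
		}
	}
	return order
}

func main() {
	dig2src := map[digest][]endorsement{
		{"tx1", "mycc", "secrets"}: {{"peer0.org1"}, {"peer0.org2"}},
	}
	for dig, endorsers := range dig2src {
		fmt.Println(dig.txID, "->", pullOrder(endorsers, []string{"peer1.org1", "peer0.org1"}))
	}
}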
4 changes: 4 additions & 0 deletions core/common/privdata/collection.go
@@ -40,6 +40,10 @@ type CollectionAccessPolicy interface {
// RequiredInternalPeerCount returns the minimum number of internal peers
// required to send private data to
RequiredInternalPeerCount() int

// MemberOrgs returns the collection's members as MSP IDs. This serves as
// a human-readable way of quickly identifying who is part of a collection.
MemberOrgs() []string
}

// Filter defines a rule that filters peers according to data signed by them.
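
For illustration (hypothetical, not part of this change), a fixed-membership policy could satisfy the extended interface as follows; the remaining CollectionAccessPolicy methods, such as AccessFilter, are omitted:

// staticCollection is a hypothetical fixed-membership policy used only
// to illustrate MemberOrgs.
type staticCollection struct {
	internalPeerCount int
	memberOrgs        []string // MSP IDs, e.g. "Org1MSP", "Org2MSP"
}

func (s *staticCollection) RequiredInternalPeerCount() int {
	return s.internalPeerCount
}

// MemberOrgs lists the collection's member organizations by MSP ID.
func (s *staticCollection) MemberOrgs() []string {
	return s.memberOrgs
}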
231 changes: 162 additions & 69 deletions gossip/privdata/coordinator.go
@@ -12,6 +12,7 @@ import (
"fmt"
"time"

"github.com/golang/protobuf/proto"
util2 "github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/core/committer"
"github.com/hyperledger/fabric/core/committer/txvalidator"
@@ -23,6 +24,7 @@ import (
"github.com/hyperledger/fabric/protos/common"
gossip2 "github.com/hyperledger/fabric/protos/gossip"
"github.com/hyperledger/fabric/protos/ledger/rwset"
msp "github.com/hyperledger/fabric/protos/msp"
"github.com/hyperledger/fabric/protos/peer"
"github.com/hyperledger/fabric/protos/utils"
"github.com/op/go-logging"
@@ -77,8 +79,18 @@ type Coordinator interface {
Close()
}

type dig2sources map[*gossip2.PvtDataDigest][]*peer.Endorsement

func (d2s dig2sources) keys() []*gossip2.PvtDataDigest {
var res []*gossip2.PvtDataDigest
for dig := range d2s {
res = append(res, dig)
}
return res
}

type Fetcher interface {
fetch(req *gossip2.RemotePvtDataRequest) ([]*gossip2.PvtDataElement, error)
fetch(dig2src dig2sources) ([]*gossip2.PvtDataElement, error)
}
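
// Illustrative only, not part of this change: a trivial stub satisfying the
// new Fetcher signature (e.g. for tests). It echoes one empty element per
// requested digest; a real puller would use the endorsements in dig2src to
// rank which peers to ask first.
type stubFetcher struct{}

func (f *stubFetcher) fetch(dig2src dig2sources) ([]*gossip2.PvtDataElement, error) {
	var res []*gossip2.PvtDataElement
	for _, dig := range dig2src.keys() {
		res = append(res, &gossip2.PvtDataElement{Digest: dig})
	}
	return res, nil
}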

type Support struct {
@@ -132,35 +144,24 @@ func (c *coordinator) StoreBlock(block *common.Block, privateDataSets util.PvtDa
}
logger.Info("Got block", block.Header.Number, "with", len(privateDataSets), "rwsets")

missing, txList, err := c.listMissingPrivateData(block, ownedRWsets)
privateInfo, err := c.listMissingPrivateData(block, ownedRWsets)
if err != nil {
logger.Warning(err)
return err
}
logger.Debug("Missing", len(missing), "rwsets")

// Put into ownedRWsets RW sets that are missing and found in the transient store
c.fetchMissingFromTransientStore(missing, ownedRWsets)

missingKeys := missing.flatten()
// Remove all keys we already own
missingKeys.exclude(func(key rwSetKey) bool {
_, exists := ownedRWsets[key]
return exists
})

retryThresh := viper.GetDuration("peer.gossip.pvtData.pullRetryThreshold")
logger.Debug("Fetching", len(missingKeys), "rwsets from peers for a maximum duration of", retryThresh)
logger.Debug("Fetching", len(privateInfo.missingKeys), "rwsets from peers for a maximum duration of", retryThresh)
start := time.Now()
limit := start.Add(retryThresh)
for len(missingKeys) > 0 && time.Now().Before(limit) {
c.fetchFromPeers(block.Header.Number, missingKeys, ownedRWsets)
for len(privateInfo.missingKeys) > 0 && time.Now().Before(limit) {
c.fetchFromPeers(block.Header.Number, ownedRWsets, privateInfo)
time.Sleep(pullRetrySleepInterval)
}
if len(missingKeys) == 0 {
if len(privateInfo.missingKeys) == 0 {
logger.Debug("Fetched all missing rwsets from peers")
} else {
logger.Warning("Missing", missingKeys)
logger.Warning("Missing", privateInfo.missingKeys)
}
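
// Note (illustrative, not in the source): pullRetryThreshold is read via
// viper from the peer's core.yaml, under a key path such as
//
//   peer:
//     gossip:
//       pvtData:
//         pullRetryThreshold: 60s
//
// The retry loop above keeps pulling until all missing RW sets are fetched
// or this duration elapses, sleeping pullRetrySleepInterval between rounds.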

// populate the private RWSets passed to the ledger
@@ -174,7 +175,7 @@ func (c *coordinator) StoreBlock(block *common.Block, privateDataSets util.PvtDa
}

// populate missing RWSets to be passed to the ledger
for missingRWS := range missingKeys {
for missingRWS := range privateInfo.missingKeys {
blockAndPvtData.Missing = append(blockAndPvtData.Missing, ledger.MissingPrivateData{
TxId: missingRWS.txID,
Namespace: missingRWS.namespace,
@@ -191,28 +192,30 @@ func (c *coordinator) StoreBlock(block *common.Block, privateDataSets util.PvtDa

if len(blockAndPvtData.BlockPvtData) > 0 {
// Finally, purge all transactions in block - valid or not valid.
if err := c.PurgeByTxids(txList); err != nil {
logger.Error("Purging transactions", txList, "failed:", err)
if err := c.PurgeByTxids(privateInfo.txns); err != nil {
logger.Error("Purging transactions", privateInfo.txns, "failed:", err)
}
}

return nil
}

func (c *coordinator) fetchFromPeers(blockSeq uint64, missingKeys rwsetKeys, ownedRWsets map[rwSetKey][]byte) {
req := &gossip2.RemotePvtDataRequest{}
missingKeys.foreach(func(k rwSetKey) {
req.Digests = append(req.Digests, &gossip2.PvtDataDigest{
type dataSources map[rwSetKey][]*peer.Endorsement

func (c *coordinator) fetchFromPeers(blockSeq uint64, ownedRWsets map[rwSetKey][]byte, privateInfo *privateDataInfo) {
dig2src := make(map[*gossip2.PvtDataDigest][]*peer.Endorsement)
privateInfo.missingKeys.foreach(func(k rwSetKey) {
logger.Debug("Fetching", k, "from peers")
dig := &gossip2.PvtDataDigest{
TxId: k.txID,
SeqInBlock: k.seqInBlock,
Collection: k.collection,
Namespace: k.namespace,
BlockSeq: blockSeq,
})
}
dig2src[dig] = privateInfo.sources[k]
})

logger.Debug("Fetching", req.Digests, "from peers")
fetchedData, err := c.fetch(req)
fetchedData, err := c.fetch(dig2src)
if err != nil {
logger.Warning("Failed fetching private data for block", blockSeq, "from peers:", err)
return
@@ -230,12 +233,12 @@ func (c *coordinator) fetchFromPeers(blockSeq uint64, missingKeys rwsetKeys, own
seqInBlock: dig.SeqInBlock,
hash: hash,
}
if _, isMissing := missingKeys[key]; !isMissing {
if _, isMissing := privateInfo.missingKeys[key]; !isMissing {
logger.Debug("Ignoring", key, "because it wasn't found in the block")
continue
}
ownedRWsets[key] = rws
delete(missingKeys, key)
delete(privateInfo.missingKeys, key)
// TODO Pass received at block height instead of 0
c.TransientStore.Persist(dig.TxId, 0, key.toTxPvtReadWriteSet(rws))
logger.Debug("Fetched", key)
@@ -461,8 +464,9 @@ func (k *rwSetKey) toTxPvtReadWriteSet(rws []byte) *rwset.TxPvtReadWriteSet {

type txns []string
type blockData [][]byte
type blockConsumer func(seqInBlock uint64, chdr *common.ChannelHeader, txRWSet *rwsetutil.TxRwSet, endorsers []*peer.Endorsement)

func (data blockData) forEachTxn(txsFilter txValidationFlags, consumer func(seqInBlock uint64, chdr *common.ChannelHeader, txRWSet *rwsetutil.TxRwSet)) (txns, error) {
func (data blockData) forEachTxn(txsFilter txValidationFlags, consumer blockConsumer) (txns, error) {
var txList []string
for seqInBlock, envBytes := range data {
env, err := utils.GetEnvelopeFromBlock(envBytes)
@@ -500,54 +504,81 @@ func (data blockData) forEachTxn(txsFilter txValidationFlags, consumer func(seqI
continue
}

tx, err := utils.GetTransaction(payload.Data)
if err != nil {
logger.Warning("Invalid transaction in payload data for tx ", chdr.TxId, ":", err)
continue
}

ccActionPayload, err := utils.GetChaincodeActionPayload(tx.Actions[0].Payload)
if err != nil {
logger.Warning("Invalid chaincode action in payload for tx", chdr.TxId, ":", err)
continue
}

if ccActionPayload.Action == nil {
logger.Warning("Action in ChaincodeActionPayload for", chdr.TxId, "is nil")
continue
}

txRWSet := &rwsetutil.TxRwSet{}
if err = txRWSet.FromProtoBytes(respPayload.Results); err != nil {
logger.Warning("Failed obtaining TxRwSet from ChaincodeAction's results", err)
continue
}
consumer(uint64(seqInBlock), chdr, txRWSet)
consumer(uint64(seqInBlock), chdr, txRWSet, ccActionPayload.Action.Endorsements)
}
return txList, nil
}

func (c *coordinator) listMissingPrivateData(block *common.Block, ownedRWsets map[rwSetKey][]byte) (rwSetKeysByTxIDs, txns, error) {
func endorsersFromOrgs(ns string, col string, endorsers []*peer.Endorsement, orgs []string) []*peer.Endorsement {
var res []*peer.Endorsement
for _, e := range endorsers {
sId := &msp.SerializedIdentity{}
err := proto.Unmarshal(e.Endorser, sId)
if err != nil {
logger.Warning("Failed unmarshalling endorser:", err)
continue
}
if !util.Contains(sId.Mspid, orgs) {
logger.Debug(sId.Mspid, "isn't among the collection's orgs:", orgs, "for namespace", ns, ",collection", col)
continue
}
res = append(res, e)
}
return res
}
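
// Hypothetical test sketch (not part of this change; assumes the usual
// "testing" import in a _test.go file of this package). It shows what
// endorsersFromOrgs keeps: only endorsements whose serialized identity
// carries an MSP ID listed among the collection's member orgs.
func TestEndorsersFromOrgs(t *testing.T) {
	endorsement := func(mspID string) *peer.Endorsement {
		serializedID, err := proto.Marshal(&msp.SerializedIdentity{Mspid: mspID})
		if err != nil {
			t.Fatal(err)
		}
		return &peer.Endorsement{Endorser: serializedID}
	}
	endorsers := []*peer.Endorsement{endorsement("Org1MSP"), endorsement("Org3MSP")}
	filtered := endorsersFromOrgs("mycc", "col1", endorsers, []string{"Org1MSP", "Org2MSP"})
	// Only the Org1MSP endorsement should survive the filter.
	if len(filtered) != 1 {
		t.Fatalf("expected 1 endorsement, got %d", len(filtered))
	}
}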

type privateDataInfo struct {
sources map[rwSetKey][]*peer.Endorsement
missingKeysByTxIDs rwSetKeysByTxIDs
missingKeys rwsetKeys
txns txns
}

func (c *coordinator) listMissingPrivateData(block *common.Block, ownedRWsets map[rwSetKey][]byte) (*privateDataInfo, error) {
if block.Metadata == nil || len(block.Metadata.Metadata) <= int(common.BlockMetadataIndex_TRANSACTIONS_FILTER) {
return nil, nil, errors.New("Block.Metadata is nil or Block.Metadata lacks a Tx filter bitmap")
return nil, errors.New("Block.Metadata is nil or Block.Metadata lacks a Tx filter bitmap")
}
txsFilter := txValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
if len(txsFilter) != len(block.Data.Data) {
return nil, nil, errors.Errorf("Block data size(%d) is different from Tx filter size(%d)", len(block.Data.Data), len(txsFilter))
return nil, errors.Errorf("Block data size(%d) is different from Tx filter size(%d)", len(block.Data.Data), len(txsFilter))
}

sources := make(map[rwSetKey][]*peer.Endorsement)
privateRWsetsInBlock := make(map[rwSetKey]struct{})
missing := make(rwSetKeysByTxIDs)
data := blockData(block.Data.Data)
txList, err := data.forEachTxn(txsFilter, func(seqInBlock uint64, chdr *common.ChannelHeader, txRWSet *rwsetutil.TxRwSet) {
for _, ns := range txRWSet.NsRwSets {
for _, hashed := range ns.CollHashedRwSets {
if !c.isEligible(chdr, ns.NameSpace, hashed.CollectionName) {
continue
}
key := rwSetKey{
txID: chdr.TxId,
seqInBlock: seqInBlock,
hash: hex.EncodeToString(hashed.PvtRwSetHash),
namespace: ns.NameSpace,
collection: hashed.CollectionName,
}
privateRWsetsInBlock[key] = struct{}{}
if _, exists := ownedRWsets[key]; !exists {
txAndSeq := txAndSeqInBlock{
txID: chdr.TxId,
seqInBlock: seqInBlock,
}
missing[txAndSeq] = append(missing[txAndSeq], key)
}
} // for all hashed RW sets
} // for all RW sets
})
bi := &transactionInspector{
sources: sources,
missingKeys: missing,
ownedRWsets: ownedRWsets,
privateRWsetsInBlock: privateRWsetsInBlock,
coordinator: c,
}
txList, err := data.forEachTxn(txsFilter, bi.inspectTransaction)
if err != nil {
return nil, nil, errors.WithStack(err)
return nil, errors.WithStack(err)
}
// In the end, iterate over the ownedRWsets, and if the key doesn't exist in
// the privateRWsetsInBlock - delete it from the ownedRWsets
@@ -558,11 +589,68 @@ func (c *coordinator) listMissingPrivateData(block *common.Block, ownedRWsets ma
}
}

return missing, txList, nil
privateInfo := &privateDataInfo{
sources: sources,
missingKeysByTxIDs: missing,
txns: txList,
}

logger.Debug("Missing", len(privateInfo.missingKeysByTxIDs), "rwsets")

// Put into ownedRWsets RW sets that are missing and found in the transient store
c.fetchMissingFromTransientStore(privateInfo.missingKeysByTxIDs, ownedRWsets)

privateInfo.missingKeys = privateInfo.missingKeysByTxIDs.flatten()
// Remove all keys we already own
privateInfo.missingKeys.exclude(func(key rwSetKey) bool {
_, exists := ownedRWsets[key]
return exists
})

return privateInfo, nil
}

type transactionInspector struct {
*coordinator
privateRWsetsInBlock map[rwSetKey]struct{}
missingKeys rwSetKeysByTxIDs
sources map[rwSetKey][]*peer.Endorsement
ownedRWsets map[rwSetKey][]byte
}

func (bi *transactionInspector) inspectTransaction(seqInBlock uint64, chdr *common.ChannelHeader, txRWSet *rwsetutil.TxRwSet, endorsers []*peer.Endorsement) {
for _, ns := range txRWSet.NsRwSets {
for _, hashed := range ns.CollHashedRwSets {
policy := bi.accessPolicyForCollection(chdr, ns.NameSpace, hashed.CollectionName)
if policy == nil {
continue
}
if !bi.isEligible(policy, ns.NameSpace, hashed.CollectionName) {
continue
}
key := rwSetKey{
txID: chdr.TxId,
seqInBlock: seqInBlock,
hash: hex.EncodeToString(hashed.PvtRwSetHash),
namespace: ns.NameSpace,
collection: hashed.CollectionName,
}
bi.privateRWsetsInBlock[key] = struct{}{}
if _, exists := bi.ownedRWsets[key]; !exists {
txAndSeq := txAndSeqInBlock{
txID: chdr.TxId,
seqInBlock: seqInBlock,
}
bi.missingKeys[txAndSeq] = append(bi.missingKeys[txAndSeq], key)
bi.sources[key] = endorsersFromOrgs(ns.NameSpace, hashed.CollectionName, endorsers, policy.MemberOrgs())
}
} // for all hashed RW sets
} // for all RW sets
}

// isEligible checks if this peer is eligible for a collection in a given namespace
func (c *coordinator) isEligible(chdr *common.ChannelHeader, namespace string, col string) bool {
// accessPolicyForCollection retrieves a CollectionAccessPolicy for a given namespace, collection name
// that corresponds to a given ChannelHeader
func (c *coordinator) accessPolicyForCollection(chdr *common.ChannelHeader, namespace string, col string) privdata.CollectionAccessPolicy {
cp := common.CollectionCriteria{
Channel: chdr.ChannelId,
Namespace: namespace,
@@ -572,16 +660,21 @@ func (c *coordinator) isEligible(chdr *common.ChannelHeader, namespace string, c
sp := c.CollectionStore.RetrieveCollectionAccessPolicy(cp)
if sp == nil {
logger.Warning("Failed obtaining policy for", cp, "skipping collection")
return false
return nil
}
filt := sp.AccessFilter()
return sp
}

// isEligible checks if this peer is eligible for a given CollectionAccessPolicy
func (c *coordinator) isEligible(ap privdata.CollectionAccessPolicy, namespace string, col string) bool {
filt := ap.AccessFilter()
if filt == nil {
logger.Warning("Failed parsing policy for", cp, "skipping collection")
logger.Warning("Failed parsing policy for namespace", namespace, "collection", col, "skipping collection")
return false
}
eligible := filt(c.selfSignedData)
if !eligible {
logger.Debug("Skipping", cp, "because we're not eligible for the private data")
logger.Debug("Skipping namespace", namespace, "collection", col, "because we're not eligible for the private data")
}
return eligible
}
@@ -638,7 +731,7 @@ func (c *coordinator) GetPvtDataAndBlockByNum(seqNum uint64, peerAuthInfo common

seqs2Namespaces := aggregatedCollections(make(map[seqAndDataModel]map[string][]*rwset.CollectionPvtReadWriteSet))
data := blockData(blockAndPvtData.Block.Data.Data)
_, err = data.forEachTxn(make(txValidationFlags, len(data)), func(seqInBlock uint64, chdr *common.ChannelHeader, txRWSet *rwsetutil.TxRwSet) {
_, err = data.forEachTxn(make(txValidationFlags, len(data)), func(seqInBlock uint64, chdr *common.ChannelHeader, txRWSet *rwsetutil.TxRwSet, _ []*peer.Endorsement) {
item, exists := blockAndPvtData.BlockPvtData[seqInBlock]
if !exists {
return
Expand Down
Loading
