From ff714cdecbfe6407313453500eca30f2f67dfccd Mon Sep 17 00:00:00 2001 From: yacovm Date: Mon, 2 Oct 2017 16:07:12 +0300 Subject: [PATCH] [FAB-6520] Extract endorsers from block for pull This change set makes the coordinator extract the endorsers of the transaction in order to pass that to the pull mechanism which would prioritize pulling from endorsers. Change-Id: I35b317c73435dc625d14699f3fa02faea431eab0 Signed-off-by: yacovm --- core/common/privdata/collection.go | 4 + gossip/privdata/coordinator.go | 231 +++++++++++++++++++--------- gossip/privdata/coordinator_test.go | 83 ++++++---- gossip/privdata/pull.go | 8 +- gossip/privdata/pull_test.go | 47 +++--- gossip/privdata/util.go | 51 ++++++ gossip/util/misc.go | 10 ++ gossip/util/misc_test.go | 5 + 8 files changed, 313 insertions(+), 126 deletions(-) diff --git a/core/common/privdata/collection.go b/core/common/privdata/collection.go index 6dca1299bdc..5f22d1d82f9 100644 --- a/core/common/privdata/collection.go +++ b/core/common/privdata/collection.go @@ -40,6 +40,10 @@ type CollectionAccessPolicy interface { // RequiredExternalPeerCount returns the minimum number of internal peers // required to send private data to RequiredInternalPeerCount() int + + // MemberOrgs returns the collection's members as MSP IDs. This serves as + // a human-readable way of quickly identifying who is part of a collection. + MemberOrgs() []string } // Filter defines a rule that filters peers according to data signed by them. diff --git a/gossip/privdata/coordinator.go b/gossip/privdata/coordinator.go index d5bb459ae6a..ec43b2166c1 100644 --- a/gossip/privdata/coordinator.go +++ b/gossip/privdata/coordinator.go @@ -12,6 +12,7 @@ import ( "fmt" "time" + "github.com/golang/protobuf/proto" util2 "github.com/hyperledger/fabric/common/util" "github.com/hyperledger/fabric/core/committer" "github.com/hyperledger/fabric/core/committer/txvalidator" @@ -23,6 +24,7 @@ import ( "github.com/hyperledger/fabric/protos/common" gossip2 "github.com/hyperledger/fabric/protos/gossip" "github.com/hyperledger/fabric/protos/ledger/rwset" + msp "github.com/hyperledger/fabric/protos/msp" "github.com/hyperledger/fabric/protos/peer" "github.com/hyperledger/fabric/protos/utils" "github.com/op/go-logging" @@ -77,8 +79,18 @@ type Coordinator interface { Close() } +type dig2sources map[*gossip2.PvtDataDigest][]*peer.Endorsement + +func (d2s dig2sources) keys() []*gossip2.PvtDataDigest { + var res []*gossip2.PvtDataDigest + for dig := range d2s { + res = append(res, dig) + } + return res +} + type Fetcher interface { - fetch(req *gossip2.RemotePvtDataRequest) ([]*gossip2.PvtDataElement, error) + fetch(dig2src dig2sources) ([]*gossip2.PvtDataElement, error) } type Support struct { @@ -132,35 +144,24 @@ func (c *coordinator) StoreBlock(block *common.Block, privateDataSets util.PvtDa } logger.Info("Got block", block.Header.Number, "with", len(privateDataSets), "rwsets") - missing, txList, err := c.listMissingPrivateData(block, ownedRWsets) + privateInfo, err := c.listMissingPrivateData(block, ownedRWsets) if err != nil { logger.Warning(err) return err } - logger.Debug("Missing", len(missing), "rwsets") - - // Put into ownedRWsets RW sets that are missing and found in the transient store - c.fetchMissingFromTransientStore(missing, ownedRWsets) - - missingKeys := missing.flatten() - // Remove all keys we already own - missingKeys.exclude(func(key rwSetKey) bool { - _, exists := ownedRWsets[key] - return exists - }) retryThresh := viper.GetDuration("peer.gossip.pvtData.pullRetryThreshold") - logger.Debug("Fetching", len(missingKeys), "rwsets from peers for a maximum duration of", retryThresh) + logger.Debug("Fetching", len(privateInfo.missingKeys), "rwsets from peers for a maximum duration of", retryThresh) start := time.Now() limit := start.Add(retryThresh) - for len(missingKeys) > 0 && time.Now().Before(limit) { - c.fetchFromPeers(block.Header.Number, missingKeys, ownedRWsets) + for len(privateInfo.missingKeys) > 0 && time.Now().Before(limit) { + c.fetchFromPeers(block.Header.Number, ownedRWsets, privateInfo) time.Sleep(pullRetrySleepInterval) } - if len(missingKeys) == 0 { + if len(privateInfo.missingKeys) == 0 { logger.Debug("Fetched all missing rwsets from peers") } else { - logger.Warning("Missing", missingKeys) + logger.Warning("Missing", privateInfo.missingKeys) } // populate the private RWSets passed to the ledger @@ -174,7 +175,7 @@ func (c *coordinator) StoreBlock(block *common.Block, privateDataSets util.PvtDa } // populate missing RWSets to be passed to the ledger - for missingRWS := range missingKeys { + for missingRWS := range privateInfo.missingKeys { blockAndPvtData.Missing = append(blockAndPvtData.Missing, ledger.MissingPrivateData{ TxId: missingRWS.txID, Namespace: missingRWS.namespace, @@ -191,28 +192,30 @@ func (c *coordinator) StoreBlock(block *common.Block, privateDataSets util.PvtDa if len(blockAndPvtData.BlockPvtData) > 0 { // Finally, purge all transactions in block - valid or not valid. - if err := c.PurgeByTxids(txList); err != nil { - logger.Error("Purging transactions", txList, "failed:", err) + if err := c.PurgeByTxids(privateInfo.txns); err != nil { + logger.Error("Purging transactions", privateInfo.txns, "failed:", err) } } return nil } -func (c *coordinator) fetchFromPeers(blockSeq uint64, missingKeys rwsetKeys, ownedRWsets map[rwSetKey][]byte) { - req := &gossip2.RemotePvtDataRequest{} - missingKeys.foreach(func(k rwSetKey) { - req.Digests = append(req.Digests, &gossip2.PvtDataDigest{ +type dataSources map[rwSetKey][]*peer.Endorsement + +func (c *coordinator) fetchFromPeers(blockSeq uint64, ownedRWsets map[rwSetKey][]byte, privateInfo *privateDataInfo) { + dig2src := make(map[*gossip2.PvtDataDigest][]*peer.Endorsement) + privateInfo.missingKeys.foreach(func(k rwSetKey) { + logger.Debug("Fetching", k, "from peers") + dig := &gossip2.PvtDataDigest{ TxId: k.txID, SeqInBlock: k.seqInBlock, Collection: k.collection, Namespace: k.namespace, BlockSeq: blockSeq, - }) + } + dig2src[dig] = privateInfo.sources[k] }) - - logger.Debug("Fetching", req.Digests, "from peers") - fetchedData, err := c.fetch(req) + fetchedData, err := c.fetch(dig2src) if err != nil { logger.Warning("Failed fetching private data for block", blockSeq, "from peers:", err) return @@ -230,12 +233,12 @@ func (c *coordinator) fetchFromPeers(blockSeq uint64, missingKeys rwsetKeys, own seqInBlock: dig.SeqInBlock, hash: hash, } - if _, isMissing := missingKeys[key]; !isMissing { + if _, isMissing := privateInfo.missingKeys[key]; !isMissing { logger.Debug("Ignoring", key, "because it wasn't found in the block") continue } ownedRWsets[key] = rws - delete(missingKeys, key) + delete(privateInfo.missingKeys, key) // TODO Pass received at block height instead of 0 c.TransientStore.Persist(dig.TxId, 0, key.toTxPvtReadWriteSet(rws)) logger.Debug("Fetched", key) @@ -461,8 +464,9 @@ func (k *rwSetKey) toTxPvtReadWriteSet(rws []byte) *rwset.TxPvtReadWriteSet { type txns []string type blockData [][]byte +type blockConsumer func(seqInBlock uint64, chdr *common.ChannelHeader, txRWSet *rwsetutil.TxRwSet, endorsers []*peer.Endorsement) -func (data blockData) forEachTxn(txsFilter txValidationFlags, consumer func(seqInBlock uint64, chdr *common.ChannelHeader, txRWSet *rwsetutil.TxRwSet)) (txns, error) { +func (data blockData) forEachTxn(txsFilter txValidationFlags, consumer blockConsumer) (txns, error) { var txList []string for seqInBlock, envBytes := range data { env, err := utils.GetEnvelopeFromBlock(envBytes) @@ -500,54 +504,81 @@ func (data blockData) forEachTxn(txsFilter txValidationFlags, consumer func(seqI continue } + tx, err := utils.GetTransaction(payload.Data) + if err != nil { + logger.Warning("Invalid transaction in payload data for tx ", chdr.TxId, ":", err) + continue + } + + ccActionPayload, err := utils.GetChaincodeActionPayload(tx.Actions[0].Payload) + if err != nil { + logger.Warning("Invalid chaincode action in payload for tx", chdr.TxId, ":", err) + continue + } + + if ccActionPayload.Action == nil { + logger.Warning("Action in ChaincodeActionPayload for", chdr.TxId, "is nil") + continue + } + txRWSet := &rwsetutil.TxRwSet{} if err = txRWSet.FromProtoBytes(respPayload.Results); err != nil { logger.Warning("Failed obtaining TxRwSet from ChaincodeAction's results", err) continue } - consumer(uint64(seqInBlock), chdr, txRWSet) + consumer(uint64(seqInBlock), chdr, txRWSet, ccActionPayload.Action.Endorsements) } return txList, nil } -func (c *coordinator) listMissingPrivateData(block *common.Block, ownedRWsets map[rwSetKey][]byte) (rwSetKeysByTxIDs, txns, error) { +func endorsersFromOrgs(ns string, col string, endorsers []*peer.Endorsement, orgs []string) []*peer.Endorsement { + var res []*peer.Endorsement + for _, e := range endorsers { + sId := &msp.SerializedIdentity{} + err := proto.Unmarshal(e.Endorser, sId) + if err != nil { + logger.Warning("Failed unmarshalling endorser:", err) + continue + } + if !util.Contains(sId.Mspid, orgs) { + logger.Debug(sId.Mspid, "isn't among the collection's orgs:", orgs, "for namespace", ns, ",collection", col) + continue + } + res = append(res, e) + } + return res +} + +type privateDataInfo struct { + sources map[rwSetKey][]*peer.Endorsement + missingKeysByTxIDs rwSetKeysByTxIDs + missingKeys rwsetKeys + txns txns +} + +func (c *coordinator) listMissingPrivateData(block *common.Block, ownedRWsets map[rwSetKey][]byte) (*privateDataInfo, error) { if block.Metadata == nil || len(block.Metadata.Metadata) <= int(common.BlockMetadataIndex_TRANSACTIONS_FILTER) { - return nil, nil, errors.New("Block.Metadata is nil or Block.Metadata lacks a Tx filter bitmap") + return nil, errors.New("Block.Metadata is nil or Block.Metadata lacks a Tx filter bitmap") } txsFilter := txValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER]) if len(txsFilter) != len(block.Data.Data) { - return nil, nil, errors.Errorf("Block data size(%d) is different from Tx filter size(%d)", len(block.Data.Data), len(txsFilter)) + return nil, errors.Errorf("Block data size(%d) is different from Tx filter size(%d)", len(block.Data.Data), len(txsFilter)) } + sources := make(map[rwSetKey][]*peer.Endorsement) privateRWsetsInBlock := make(map[rwSetKey]struct{}) missing := make(rwSetKeysByTxIDs) data := blockData(block.Data.Data) - txList, err := data.forEachTxn(txsFilter, func(seqInBlock uint64, chdr *common.ChannelHeader, txRWSet *rwsetutil.TxRwSet) { - for _, ns := range txRWSet.NsRwSets { - for _, hashed := range ns.CollHashedRwSets { - if !c.isEligible(chdr, ns.NameSpace, hashed.CollectionName) { - continue - } - key := rwSetKey{ - txID: chdr.TxId, - seqInBlock: seqInBlock, - hash: hex.EncodeToString(hashed.PvtRwSetHash), - namespace: ns.NameSpace, - collection: hashed.CollectionName, - } - privateRWsetsInBlock[key] = struct{}{} - if _, exists := ownedRWsets[key]; !exists { - txAndSeq := txAndSeqInBlock{ - txID: chdr.TxId, - seqInBlock: seqInBlock, - } - missing[txAndSeq] = append(missing[txAndSeq], key) - } - } // for all hashed RW sets - } // for all RW sets - }) + bi := &transactionInspector{ + sources: sources, + missingKeys: missing, + ownedRWsets: ownedRWsets, + privateRWsetsInBlock: privateRWsetsInBlock, + coordinator: c, + } + txList, err := data.forEachTxn(txsFilter, bi.inspectTransaction) if err != nil { - return nil, nil, errors.WithStack(err) + return nil, errors.WithStack(err) } // In the end, iterate over the ownedRWsets, and if the key doesn't exist in // the privateRWsetsInBlock - delete it from the ownedRWsets @@ -558,11 +589,68 @@ func (c *coordinator) listMissingPrivateData(block *common.Block, ownedRWsets ma } } - return missing, txList, nil + privateInfo := &privateDataInfo{ + sources: sources, + missingKeysByTxIDs: missing, + txns: txList, + } + + logger.Debug("Missing", len(privateInfo.missingKeysByTxIDs), "rwsets") + + // Put into ownedRWsets RW sets that are missing and found in the transient store + c.fetchMissingFromTransientStore(privateInfo.missingKeysByTxIDs, ownedRWsets) + + privateInfo.missingKeys = privateInfo.missingKeysByTxIDs.flatten() + // Remove all keys we already own + privateInfo.missingKeys.exclude(func(key rwSetKey) bool { + _, exists := ownedRWsets[key] + return exists + }) + + return privateInfo, nil +} + +type transactionInspector struct { + *coordinator + privateRWsetsInBlock map[rwSetKey]struct{} + missingKeys rwSetKeysByTxIDs + sources map[rwSetKey][]*peer.Endorsement + ownedRWsets map[rwSetKey][]byte +} + +func (bi *transactionInspector) inspectTransaction(seqInBlock uint64, chdr *common.ChannelHeader, txRWSet *rwsetutil.TxRwSet, endorsers []*peer.Endorsement) { + for _, ns := range txRWSet.NsRwSets { + for _, hashed := range ns.CollHashedRwSets { + policy := bi.accessPolicyForCollection(chdr, ns.NameSpace, hashed.CollectionName) + if policy == nil { + continue + } + if !bi.isEligible(policy, ns.NameSpace, hashed.CollectionName) { + continue + } + key := rwSetKey{ + txID: chdr.TxId, + seqInBlock: seqInBlock, + hash: hex.EncodeToString(hashed.PvtRwSetHash), + namespace: ns.NameSpace, + collection: hashed.CollectionName, + } + bi.privateRWsetsInBlock[key] = struct{}{} + if _, exists := bi.ownedRWsets[key]; !exists { + txAndSeq := txAndSeqInBlock{ + txID: chdr.TxId, + seqInBlock: seqInBlock, + } + bi.missingKeys[txAndSeq] = append(bi.missingKeys[txAndSeq], key) + bi.sources[key] = endorsersFromOrgs(ns.NameSpace, hashed.CollectionName, endorsers, policy.MemberOrgs()) + } + } // for all hashed RW sets + } // for all RW sets } -// isEligible checks if this peer is eligible for a collection in a given namespace -func (c *coordinator) isEligible(chdr *common.ChannelHeader, namespace string, col string) bool { +// accessPolicyForCollection retrieves a CollectionAccessPolicy for a given namespace, collection name +// that corresponds to a given ChannelHeader +func (c *coordinator) accessPolicyForCollection(chdr *common.ChannelHeader, namespace string, col string) privdata.CollectionAccessPolicy { cp := common.CollectionCriteria{ Channel: chdr.ChannelId, Namespace: namespace, @@ -572,16 +660,21 @@ func (c *coordinator) isEligible(chdr *common.ChannelHeader, namespace string, c sp := c.CollectionStore.RetrieveCollectionAccessPolicy(cp) if sp == nil { logger.Warning("Failed obtaining policy for", cp, "skipping collection") - return false + return nil } - filt := sp.AccessFilter() + return sp +} + +// isEligible checks if this peer is eligible for a given CollectionAccessPolicy +func (c *coordinator) isEligible(ap privdata.CollectionAccessPolicy, namespace string, col string) bool { + filt := ap.AccessFilter() if filt == nil { - logger.Warning("Failed parsing policy for", cp, "skipping collection") + logger.Warning("Failed parsing policy for namespace", namespace, "collection", col, "skipping collection") return false } eligible := filt(c.selfSignedData) if !eligible { - logger.Debug("Skipping", cp, "because we're not eligible for the private data") + logger.Debug("Skipping namespace", namespace, "collection", col, "because we're not eligible for the private data") } return eligible } @@ -638,7 +731,7 @@ func (c *coordinator) GetPvtDataAndBlockByNum(seqNum uint64, peerAuthInfo common seqs2Namespaces := aggregatedCollections(make(map[seqAndDataModel]map[string][]*rwset.CollectionPvtReadWriteSet)) data := blockData(blockAndPvtData.Block.Data.Data) - _, err = data.forEachTxn(make(txValidationFlags, len(data)), func(seqInBlock uint64, chdr *common.ChannelHeader, txRWSet *rwsetutil.TxRwSet) { + _, err = data.forEachTxn(make(txValidationFlags, len(data)), func(seqInBlock uint64, chdr *common.ChannelHeader, txRWSet *rwsetutil.TxRwSet, _ []*peer.Endorsement) { item, exists := blockAndPvtData.BlockPvtData[seqInBlock] if !exists { return diff --git a/gossip/privdata/coordinator_test.go b/gossip/privdata/coordinator_test.go index b06704e45c8..322d72df171 100644 --- a/gossip/privdata/coordinator_test.go +++ b/gossip/privdata/coordinator_test.go @@ -15,6 +15,7 @@ import ( "testing" "time" + pb "github.com/golang/protobuf/proto" util2 "github.com/hyperledger/fabric/common/util" "github.com/hyperledger/fabric/core/common/privdata" "github.com/hyperledger/fabric/core/ledger" @@ -23,6 +24,7 @@ import ( "github.com/hyperledger/fabric/protos/common" proto "github.com/hyperledger/fabric/protos/gossip" "github.com/hyperledger/fabric/protos/ledger/rwset" + "github.com/hyperledger/fabric/protos/msp" "github.com/spf13/viper" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -206,8 +208,21 @@ type fetchCall struct { *mock.Call } -func (fc *fetchCall) expectingReq(req *proto.RemotePvtDataRequest) *fetchCall { - fc.fetcher.expectedReq = req +func (fc *fetchCall) expectingEndorsers(orgs ...string) *fetchCall { + if fc.fetcher.expectedEndorsers == nil { + fc.fetcher.expectedEndorsers = make(map[string]struct{}) + } + for _, org := range orgs { + sId := &msp.SerializedIdentity{Mspid: org, IdBytes: []byte(fmt.Sprintf("p0%s", org))} + b, _ := pb.Marshal(sId) + fc.fetcher.expectedEndorsers[string(b)] = struct{}{} + } + + return fc +} + +func (fc *fetchCall) expectingDigests(dig []*proto.PvtDataDigest) *fetchCall { + fc.fetcher.expectedDigests = dig return fc } @@ -219,7 +234,8 @@ func (fc *fetchCall) Return(returnArguments ...interface{}) *mock.Call { type fetcherMock struct { t *testing.T mock.Mock - expectedReq *proto.RemotePvtDataRequest + expectedDigests []*proto.PvtDataDigest + expectedEndorsers map[string]struct{} } func (f *fetcherMock) On(methodName string, arguments ...interface{}) *fetchCall { @@ -229,9 +245,20 @@ func (f *fetcherMock) On(methodName string, arguments ...interface{}) *fetchCall } } -func (f *fetcherMock) fetch(req *proto.RemotePvtDataRequest) ([]*proto.PvtDataElement, error) { - assert.True(f.t, digests(req.Digests).Equal(digests(f.expectedReq.Digests))) - args := f.Called(req) +func (f *fetcherMock) fetch(dig2src dig2sources) ([]*proto.PvtDataElement, error) { + for _, endorsements := range dig2src { + for _, endorsement := range endorsements { + _, exists := f.expectedEndorsers[string(endorsement.Endorser)] + if !exists { + f.t.Fatalf("Encountered a non-expected endorser: %s", string(endorsement.Endorser)) + } + // Else, it exists, so delete it so we will end up with an empty expected map at the end of the call + delete(f.expectedEndorsers, string(endorsement.Endorser)) + } + } + assert.True(f.t, digests(dig2src.keys()).Equal(digests(f.expectedDigests))) + assert.Empty(f.t, f.expectedEndorsers) + args := f.Called(dig2src) if args.Get(1) == nil { return args.Get(0).([]*proto.PvtDataElement), nil } @@ -287,6 +314,10 @@ type collectionAccessPolicy struct { n uint64 } +func (cap *collectionAccessPolicy) MemberOrgs() []string { + return []string{"org0", "org1"} +} + func (cap *collectionAccessPolicy) RequiredInternalPeerCount() int { return viper.GetInt("peer.gossip.pvtData.minInternalPeers") } @@ -693,7 +724,7 @@ func TestCoordinatorStoreBlock(t *testing.T) { bf := &blockFactory{ channelID: "test", } - block := bf.AddTxn("tx1", "ns1", hash, "c1", "c2").AddTxn("tx2", "ns2", hash, "c1").create() + block := bf.AddTxnWithEndorsement("tx1", "ns1", hash, "org1", "c1", "c2").AddTxnWithEndorsement("tx2", "ns2", hash, "org2", "c1").create() // Scenario I: Block we got has sufficient private data alongside it. // If the coordinator tries fetching from the transientstore, or peers it would result in panic, @@ -729,17 +760,17 @@ func TestCoordinatorStoreBlock(t *testing.T) { // Scenario III: Block doesn't have sufficient private data alongside it, // it is missing ns1: c2, and the data exists in the transient store, - // but it is also missing ns2: c1, and that data doesn't exist in the transient store - but in a peer - fetcher.On("fetch", mock.Anything).expectingReq(&proto.RemotePvtDataRequest{ - Digests: []*proto.PvtDataDigest{ - { - TxId: "tx1", Namespace: "ns1", Collection: "c2", BlockSeq: 1, - }, - { - TxId: "tx2", Namespace: "ns2", Collection: "c1", BlockSeq: 1, SeqInBlock: 1, - }, + // but it is also missing ns2: c1, and that data doesn't exist in the transient store - but in a peer. + // Additionally, the coordinator should pass an endorser identity of org1, but not of org2, since + // the MemberOrgs() call doesn't return org2 but only org0 and org1. + fetcher.On("fetch", mock.Anything).expectingDigests([]*proto.PvtDataDigest{ + { + TxId: "tx1", Namespace: "ns1", Collection: "c2", BlockSeq: 1, }, - }).Return([]*proto.PvtDataElement{ + { + TxId: "tx2", Namespace: "ns2", Collection: "c1", BlockSeq: 1, SeqInBlock: 1, + }, + }).expectingEndorsers("org1").Return([]*proto.PvtDataElement{ { Digest: &proto.PvtDataDigest{ BlockSeq: 1, @@ -782,11 +813,9 @@ func TestCoordinatorStoreBlock(t *testing.T) { // In this case, we should try to fetch data from peers. block = bf.AddTxn("tx3", "ns3", hash, "c3").create() fetcher = &fetcherMock{t: t} - fetcher.On("fetch", mock.Anything).expectingReq(&proto.RemotePvtDataRequest{ - Digests: []*proto.PvtDataDigest{ - { - TxId: "tx3", Namespace: "ns3", Collection: "c3", BlockSeq: 1, - }, + fetcher.On("fetch", mock.Anything).expectingDigests([]*proto.PvtDataDigest{ + { + TxId: "tx3", Namespace: "ns3", Collection: "c3", BlockSeq: 1, }, }).Return([]*proto.PvtDataElement{ { @@ -811,8 +840,6 @@ func TestCoordinatorStoreBlock(t *testing.T) { committer.On("CommitWithPvtData", mock.Anything).Run(func(args mock.Arguments) { var privateDataPassed2Ledger privateData = args.Get(0).(*ledger.BlockAndPvtData).BlockPvtData assert.True(t, privateDataPassed2Ledger.Equal(expectedCommittedPrivateData2)) - fmt.Println(privateDataPassed2Ledger) - fmt.Println(expectedCommittedPrivateData2) commitHappened = true }).Return(nil) coordinator = NewCoordinator(Support{ @@ -914,11 +941,9 @@ func TestProceedWithoutPrivateData(t *testing.T) { fetcher := &fetcherMock{t: t} // Have the peer return in response to the pull, a private data with a non matching hash - fetcher.On("fetch", mock.Anything).expectingReq(&proto.RemotePvtDataRequest{ - Digests: []*proto.PvtDataDigest{ - { - TxId: "tx1", Namespace: "ns3", Collection: "c2", BlockSeq: 1, - }, + fetcher.On("fetch", mock.Anything).expectingDigests([]*proto.PvtDataDigest{ + { + TxId: "tx1", Namespace: "ns3", Collection: "c2", BlockSeq: 1, }, }).Return([]*proto.PvtDataElement{ { diff --git a/gossip/privdata/pull.go b/gossip/privdata/pull.go index d6e0f0bffcf..0e0fd1426c3 100644 --- a/gossip/privdata/pull.go +++ b/gossip/privdata/pull.go @@ -200,9 +200,9 @@ func (p *puller) waitForMembership() []discovery.NetworkMember { return members } -func (p *puller) fetch(req *proto.RemotePvtDataRequest) ([]*proto.PvtDataElement, error) { +func (p *puller) fetch(dig2src dig2sources) ([]*proto.PvtDataElement, error) { // computeFilters returns a map from a digest to a routing filter - dig2Filter, err := p.computeFilters(req) + dig2Filter, err := p.computeFilters(dig2src.keys()) if err != nil { return nil, errors.WithStack(err) } @@ -355,9 +355,9 @@ func (dig2Filter digestToFilterMapping) String() string { return buffer.String() } -func (p *puller) computeFilters(req *proto.RemotePvtDataRequest) (digestToFilterMapping, error) { +func (p *puller) computeFilters(digests []*proto.PvtDataDigest) (digestToFilterMapping, error) { filters := make(map[proto.PvtDataDigest]filter.RoutingFilter) - for _, digest := range req.Digests { + for _, digest := range digests { collection := p.cs.RetrieveCollectionAccessPolicy(fcommon.CollectionCriteria{ Channel: p.channel, TxId: digest.TxId, diff --git a/gossip/privdata/pull_test.go b/gossip/privdata/pull_test.go index 06413ab4d7b..e29d165c787 100644 --- a/gossip/privdata/pull_test.go +++ b/gossip/privdata/pull_test.go @@ -9,9 +9,8 @@ package privdata import ( "bytes" "crypto/rand" - "testing" - "sync" + "testing" "github.com/hyperledger/fabric/core/common/privdata" "github.com/hyperledger/fabric/gossip/api" @@ -78,6 +77,10 @@ func (mc *mockCollectionAccess) thatMapsTo(peers ...string) *mockCollectionStore return mc.cs } +func (mc *mockCollectionAccess) MemberOrgs() []string { + return nil +} + func (mc *mockCollectionAccess) AccessFilter() privdata.Filter { policyLock.Lock() defer policyLock.Unlock() @@ -237,9 +240,9 @@ func TestPullerFromOnly1Peer(t *testing.T) { t.Fatal("p3 shouldn't have been selected for pull") }) - fetchedMessages, err := p1.fetch(&proto.RemotePvtDataRequest{ - Digests: []*proto.PvtDataDigest{dig}, - }) + dasf := &digestsAndSourceFactory{} + + fetchedMessages, err := p1.fetch(dasf.mapDigest(dig).toSources().create()) rws1 := util.PrivateRWSet(fetchedMessages[0].Payload[0]) rws2 := util.PrivateRWSet(fetchedMessages[0].Payload[1]) fetched := []util.PrivateRWSet{rws1, rws2} @@ -270,9 +273,8 @@ func TestPullerDataNotAvailable(t *testing.T) { t.Fatal("p3 shouldn't have been selected for pull") }) - fetchedMessages, err := p1.fetch(&proto.RemotePvtDataRequest{ - Digests: []*proto.PvtDataDigest{dig}, - }) + dasf := &digestsAndSourceFactory{} + fetchedMessages, err := p1.fetch(dasf.mapDigest(dig).toSources().create()) assert.Empty(t, fetchedMessages) assert.NoError(t, err) } @@ -283,9 +285,9 @@ func TestPullerNoPeersKnown(t *testing.T) { gn := &gossipNetwork{} policyStore := newCollectionStore().withPolicy("col1").thatMapsTo("p2").withPolicy("col1").thatMapsTo("p3") p1 := gn.newPuller("p1", policyStore) - fetchedMessages, err := p1.fetch(&proto.RemotePvtDataRequest{ - Digests: []*proto.PvtDataDigest{{Collection: "col1", TxId: "txID1", Namespace: "ns1"}}, - }) + dasf := &digestsAndSourceFactory{} + d2s := dasf.mapDigest(&proto.PvtDataDigest{Collection: "col1", TxId: "txID1", Namespace: "ns1"}).toSources().create() + fetchedMessages, err := p1.fetch(d2s) assert.Empty(t, fetchedMessages) assert.Error(t, err) assert.Contains(t, err.Error(), "Empty membership") @@ -298,9 +300,9 @@ func TestPullPeerFilterError(t *testing.T) { policyStore := newCollectionStore().withPolicy("col1").thatMapsTo("p2") p1 := gn.newPuller("p1", policyStore) gn.peers[0].On("PeerFilter", mock.Anything, mock.Anything).Return(nil, errors.New("Failed obtaining filter")) - fetchedMessages, err := p1.fetch(&proto.RemotePvtDataRequest{ - Digests: []*proto.PvtDataDigest{{Collection: "col1", TxId: "txID1", Namespace: "ns1"}}, - }) + dasf := &digestsAndSourceFactory{} + d2s := dasf.mapDigest(&proto.PvtDataDigest{Collection: "col1", TxId: "txID1", Namespace: "ns1"}).toSources().create() + fetchedMessages, err := p1.fetch(d2s) assert.Error(t, err) assert.Contains(t, err.Error(), "Failed obtaining filter") assert.Empty(t, fetchedMessages) @@ -332,10 +334,9 @@ func TestPullerPeerNotEligible(t *testing.T) { p3.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", dig).Run(func(_ mock.Arguments) { t.Fatal("p3 shouldn't have approved the pull") }) - - fetchedMessages, err := p1.fetch(&proto.RemotePvtDataRequest{ - Digests: []*proto.PvtDataDigest{{Collection: "col1", TxId: "txID1"}}, - }) + dasf := &digestsAndSourceFactory{} + d2s := dasf.mapDigest(&proto.PvtDataDigest{Collection: "col1", TxId: "txID1"}).toSources().create() + fetchedMessages, err := p1.fetch(d2s) assert.Empty(t, fetchedMessages) assert.NoError(t, err) } @@ -371,9 +372,8 @@ func TestPullerDifferentPeersDifferentCollections(t *testing.T) { p3.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", dig2).Return(p3TransientStore) - fetchedMessages, err := p1.fetch(&proto.RemotePvtDataRequest{ - Digests: []*proto.PvtDataDigest{dig1, dig2}, - }) + dasf := &digestsAndSourceFactory{} + fetchedMessages, err := p1.fetch(dasf.mapDigest(dig1).toSources().mapDigest(dig2).toSources().create()) assert.NoError(t, err) rws1 := util.PrivateRWSet(fetchedMessages[0].Payload[0]) rws2 := util.PrivateRWSet(fetchedMessages[0].Payload[1]) @@ -426,9 +426,8 @@ func TestPullerRetries(t *testing.T) { p5.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", dig).Return(transientStore) // Fetch from someone - fetchedMessages, err := p1.fetch(&proto.RemotePvtDataRequest{ - Digests: []*proto.PvtDataDigest{dig}, - }) + dasf := &digestsAndSourceFactory{} + fetchedMessages, err := p1.fetch(dasf.mapDigest(dig).toSources().create()) assert.NoError(t, err) rws1 := util.PrivateRWSet(fetchedMessages[0].Payload[0]) rws2 := util.PrivateRWSet(fetchedMessages[0].Payload[1]) diff --git a/gossip/privdata/util.go b/gossip/privdata/util.go index 454e7104de9..62625dba1d7 100644 --- a/gossip/privdata/util.go +++ b/gossip/privdata/util.go @@ -7,12 +7,16 @@ SPDX-License-Identifier: Apache-2.0 package privdata import ( + "fmt" + "github.com/golang/protobuf/proto" "github.com/hyperledger/fabric/core/ledger" "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/rwsetutil" "github.com/hyperledger/fabric/protos/common" + gossip2 "github.com/hyperledger/fabric/protos/gossip" "github.com/hyperledger/fabric/protos/ledger/rwset" "github.com/hyperledger/fabric/protos/ledger/rwset/kvrwset" + "github.com/hyperledger/fabric/protos/msp" "github.com/hyperledger/fabric/protos/peer" ) @@ -27,6 +31,10 @@ type blockFactory struct { } func (bf *blockFactory) AddTxn(txID string, nsName string, hash []byte, collections ...string) *blockFactory { + return bf.AddTxnWithEndorsement(txID, nsName, hash, "", collections...) +} + +func (bf *blockFactory) AddTxnWithEndorsement(txID string, nsName string, hash []byte, org string, collections ...string) *blockFactory { txn := &peer.Transaction{ Actions: []*peer.TransactionAction{ {}, @@ -63,6 +71,16 @@ func (bf *blockFactory) AddTxn(txID string, nsName string, hash []byte, collecti }, } + if org != "" { + sId := &msp.SerializedIdentity{Mspid: org, IdBytes: []byte(fmt.Sprintf("p0%s", org))} + b, _ := proto.Marshal(sId) + ccPayload.Action.Endorsements = []*peer.Endorsement{ + { + Endorser: b, + }, + } + } + ccPayloadBytes, err := proto.Marshal(ccPayload) if err != nil { panic(err) @@ -221,3 +239,36 @@ func (df *pvtDataFactory) create() []*ledger.TxPvtData { }() return df.data } + +type digestsAndSourceFactory struct { + d2s dig2sources + lastDig *gossip2.PvtDataDigest +} + +func (f *digestsAndSourceFactory) mapDigest(dig *gossip2.PvtDataDigest) *digestsAndSourceFactory { + f.lastDig = dig + return f +} + +func (f *digestsAndSourceFactory) toSources(orgs ...string) *digestsAndSourceFactory { + if f.d2s == nil { + f.d2s = make(dig2sources) + } + var endorsements []*peer.Endorsement + for i, org := range orgs { + sId := &msp.SerializedIdentity{ + Mspid: org, + IdBytes: []byte(fmt.Sprintf("p%d.%s", i, org)), + } + b, _ := proto.Marshal(sId) + endorsements = append(endorsements, &peer.Endorsement{ + Endorser: b, + }) + } + f.d2s[f.lastDig] = endorsements + return f +} + +func (f *digestsAndSourceFactory) create() dig2sources { + return f.d2s +} diff --git a/gossip/util/misc.go b/gossip/util/misc.go index e192b45dd25..791e0bc7899 100644 --- a/gossip/util/misc.go +++ b/gossip/util/misc.go @@ -25,6 +25,16 @@ type Equals func(a interface{}, b interface{}) bool var viperLock sync.RWMutex +// Contains returns whether a given slice a contains a string s +func Contains(s string, a []string) bool { + for _, e := range a { + if e == s { + return true + } + } + return false +} + // IndexInSlice returns the index of given object o in array func IndexInSlice(array interface{}, o interface{}, equals Equals) int { arr := reflect.ValueOf(array) diff --git a/gossip/util/misc_test.go b/gossip/util/misc_test.go index 5026f01eb1a..f9946586b2d 100644 --- a/gossip/util/misc_test.go +++ b/gossip/util/misc_test.go @@ -27,6 +27,11 @@ func testHappyPath(t *testing.T) { assert.NotEqual(t, n3, n4) } +func TestContains(t *testing.T) { + assert.True(t, Contains("foo", []string{"bar", "foo", "baz"})) + assert.False(t, Contains("foo", []string{"bar", "baz"})) +} + func TestGetRandomInt(t *testing.T) { testHappyPath(t) }