Skip to content

Commit

Permalink
[FAB-10302] Don't use protobuf as map keys
Browse files Browse the repository at this point in the history
PvtDataDigest is a protobuf message that is used in the gossip private data
implementation.

The latest version of the protobuf compiler adds generated methods and
internal fields to the message structs, which makes protobuf messages
unsuitable for use as map keys.

This change set moves the code to an equivalent plain struct
that carries the same fields but none of the generated methods.

Change-Id: Icdfdffb98625cb7ace37d6c0227906971e05fcc7
Signed-off-by: yacovm <[email protected]>
  • Loading branch information
yacovm committed Jul 6, 2018
1 parent 719efe2 commit 9c77fe5
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 52 deletions.
10 changes: 5 additions & 5 deletions gossip/privdata/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,10 @@ type Coordinator interface {
Close()
}

type dig2sources map[*gossip2.PvtDataDigest][]*peer.Endorsement
type dig2sources map[DigKey][]*peer.Endorsement

func (d2s dig2sources) keys() []*gossip2.PvtDataDigest {
var res []*gossip2.PvtDataDigest
func (d2s dig2sources) keys() []DigKey {
var res []DigKey
for dig := range d2s {
res = append(res, dig)
}
Expand Down Expand Up @@ -257,10 +257,10 @@ func (c *coordinator) StoreBlock(block *common.Block, privateDataSets util.PvtDa
}

func (c *coordinator) fetchFromPeers(blockSeq uint64, ownedRWsets map[rwSetKey][]byte, privateInfo *privateDataInfo) {
dig2src := make(map[*gossip2.PvtDataDigest][]*peer.Endorsement)
dig2src := make(map[DigKey][]*peer.Endorsement)
privateInfo.missingKeys.foreach(func(k rwSetKey) {
logger.Debug("Fetching", k, "from peers")
dig := &gossip2.PvtDataDigest{
dig := DigKey{
TxId: k.txID,
SeqInBlock: k.seqInBlock,
Collection: k.collection,
Expand Down
56 changes: 37 additions & 19 deletions gossip/privdata/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,24 @@ func init() {
factory.InitFactories(nil)
}

// CollectionCriteria aggregates the criteria that identify a
// collection: its channel, the transaction that references it, the
// collection name, and the chaincode namespace. It mirrors
// common.CollectionCriteria field-for-field but, being a plain struct
// with no generated protobuf methods, it is comparable and therefore
// safe to use as a map key (see the collectionStore maps below).
type CollectionCriteria struct {
	Channel    string // channel the collection belongs to
	TxId       string // ID of the transaction that references the collection
	Collection string // collection name
	Namespace  string // chaincode namespace the collection is defined in
}

// fromCollectionCriteria converts a protobuf-backed
// common.CollectionCriteria into the local comparable
// CollectionCriteria value, copying every field one-to-one.
func fromCollectionCriteria(criteria common.CollectionCriteria) CollectionCriteria {
	var cc CollectionCriteria
	cc.Channel = criteria.Channel
	cc.TxId = criteria.TxId
	cc.Collection = criteria.Collection
	cc.Namespace = criteria.Namespace
	return cc
}

type persistCall struct {
*mock.Call
store *mockTransientStore
Expand Down Expand Up @@ -240,13 +258,13 @@ func (v *validatorMock) Validate(block *common.Block) error {
return nil
}

type digests []*proto.PvtDataDigest
type digests []DigKey

func (d digests) Equal(other digests) bool {
flatten := func(d digests) map[proto.PvtDataDigest]struct{} {
m := map[proto.PvtDataDigest]struct{}{}
flatten := func(d digests) map[DigKey]struct{} {
m := map[DigKey]struct{}{}
for _, dig := range d {
m[*dig] = struct{}{}
m[dig] = struct{}{}
}
return m
}
Expand All @@ -271,8 +289,8 @@ func (fc *fetchCall) expectingEndorsers(orgs ...string) *fetchCall {
return fc
}

func (fc *fetchCall) expectingDigests(dig []*proto.PvtDataDigest) *fetchCall {
fc.fetcher.expectedDigests = dig
func (fc *fetchCall) expectingDigests(digests []DigKey) *fetchCall {
fc.fetcher.expectedDigests = digests
return fc
}

Expand All @@ -284,7 +302,7 @@ func (fc *fetchCall) Return(returnArguments ...interface{}) *mock.Call {
type fetcherMock struct {
t *testing.T
mock.Mock
expectedDigests []*proto.PvtDataDigest
expectedDigests []DigKey
expectedEndorsers map[string]struct{}
}

Expand Down Expand Up @@ -318,17 +336,17 @@ func (f *fetcherMock) fetch(dig2src dig2sources, _ uint64) (*FetchedPvtDataConta
func createcollectionStore(expectedSignedData common.SignedData) *collectionStore {
return &collectionStore{
expectedSignedData: expectedSignedData,
policies: make(map[collectionAccessPolicy]common.CollectionCriteria),
store: make(map[common.CollectionCriteria]collectionAccessPolicy),
policies: make(map[collectionAccessPolicy]CollectionCriteria),
store: make(map[CollectionCriteria]collectionAccessPolicy),
}
}

type collectionStore struct {
expectedSignedData common.SignedData
acceptsAll bool
lenient bool
store map[common.CollectionCriteria]collectionAccessPolicy
policies map[collectionAccessPolicy]common.CollectionCriteria
store map[CollectionCriteria]collectionAccessPolicy
policies map[collectionAccessPolicy]CollectionCriteria
}

func (cs *collectionStore) thatAcceptsAll() *collectionStore {
Expand All @@ -341,7 +359,7 @@ func (cs *collectionStore) andIsLenient() *collectionStore {
return cs
}

func (cs *collectionStore) thatAccepts(cc common.CollectionCriteria) *collectionStore {
func (cs *collectionStore) thatAccepts(cc CollectionCriteria) *collectionStore {
sp := collectionAccessPolicy{
cs: cs,
n: util.RandomUInt64(),
Expand All @@ -352,7 +370,7 @@ func (cs *collectionStore) thatAccepts(cc common.CollectionCriteria) *collection
}

func (cs *collectionStore) RetrieveCollectionAccessPolicy(cc common.CollectionCriteria) (privdata.CollectionAccessPolicy, error) {
if sp, exists := cs.store[cc]; exists {
if sp, exists := cs.store[fromCollectionCriteria(cc)]; exists {
return &sp, nil
}
if cs.acceptsAll || cs.lenient {
Expand Down Expand Up @@ -818,7 +836,7 @@ func TestCoordinatorToFilterOutPvtRWSetsWithWrongHash(t *testing.T) {
Validator: &validatorMock{},
}, peerSelfSignedData)

fetcher.On("fetch", mock.Anything).expectingDigests([]*proto.PvtDataDigest{
fetcher.On("fetch", mock.Anything).expectingDigests([]DigKey{
{
TxId: "tx1", Namespace: "ns1", Collection: "c1", BlockSeq: 1,
},
Expand Down Expand Up @@ -941,7 +959,7 @@ func TestCoordinatorStoreBlock(t *testing.T) {
// but it is also missing ns2: c1, and that data doesn't exist in the transient store - but in a peer.
// Additionally, the coordinator should pass an endorser identity of org1, but not of org2, since
// the MemberOrgs() call doesn't return org2 but only org0 and org1.
fetcher.On("fetch", mock.Anything).expectingDigests([]*proto.PvtDataDigest{
fetcher.On("fetch", mock.Anything).expectingDigests([]DigKey{
{
TxId: "tx1", Namespace: "ns1", Collection: "c2", BlockSeq: 1,
},
Expand Down Expand Up @@ -995,7 +1013,7 @@ func TestCoordinatorStoreBlock(t *testing.T) {
// In this case, we should try to fetch data from peers.
block = bf.AddTxn("tx3", "ns3", hash, "c3").create()
fetcher = &fetcherMock{t: t}
fetcher.On("fetch", mock.Anything).expectingDigests([]*proto.PvtDataDigest{
fetcher.On("fetch", mock.Anything).expectingDigests([]DigKey{
{
TxId: "tx3", Namespace: "ns3", Collection: "c3", BlockSeq: 1,
},
Expand Down Expand Up @@ -1044,7 +1062,7 @@ func TestCoordinatorStoreBlock(t *testing.T) {
// private data from the transient store or peers, and in fact- if it attempts to fetch the data it's not eligible
// for from the transient store or from peers - the test would fail because the Mock wasn't initialized.
block = bf.AddTxn("tx3", "ns3", hash, "c3", "c2", "c1").AddTxn("tx1", "ns1", hash, "c1").create()
cs = createcollectionStore(peerSelfSignedData).thatAccepts(common.CollectionCriteria{
cs = createcollectionStore(peerSelfSignedData).thatAccepts(CollectionCriteria{
TxId: "tx3",
Collection: "c3",
Namespace: "ns3",
Expand Down Expand Up @@ -1126,7 +1144,7 @@ func TestProceedWithoutPrivateData(t *testing.T) {

fetcher := &fetcherMock{t: t}
// Have the peer return in response to the pull, a private data with a non matching hash
fetcher.On("fetch", mock.Anything).expectingDigests([]*proto.PvtDataDigest{
fetcher.On("fetch", mock.Anything).expectingDigests([]DigKey{
{
TxId: "tx1", Namespace: "ns3", Collection: "c2", BlockSeq: 1,
},
Expand Down Expand Up @@ -1191,7 +1209,7 @@ func TestCoordinatorGetBlocks(t *testing.T) {

// Green path - block and private data is returned, but the requester isn't eligible for all the private data,
// but only to a subset of it.
cs = createcollectionStore(sd).thatAccepts(common.CollectionCriteria{
cs = createcollectionStore(sd).thatAccepts(CollectionCriteria{
Namespace: "ns1",
Collection: "c2",
TxId: "tx1",
Expand Down
58 changes: 45 additions & 13 deletions gossip/privdata/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ const (
btlPullMarginDefault = 10
)

// DigKey
// DigKey defines a digest that
// specifies a specific hashed RWSet
type DigKey struct {
TxId string
Namespace string
Expand Down Expand Up @@ -244,9 +245,15 @@ func (p *puller) fetch(dig2src dig2sources, blockSeq uint64) (*FetchedPvtDataCon
purgedPvt := p.getPurgedCollections(members, dig2Filter, blockSeq)
// Need to remove purged digest from mapping
for _, dig := range purgedPvt {
res.PurgedElements = append(res.PurgedElements, dig)
res.PurgedElements = append(res.PurgedElements, &proto.PvtDataDigest{
TxId: dig.TxId,
BlockSeq: dig.BlockSeq,
SeqInBlock: dig.SeqInBlock,
Namespace: dig.Namespace,
Collection: dig.Collection,
})
// remove digest so we won't even try to pull purged data
delete(dig2Filter, *dig)
delete(dig2Filter, dig)
itemsLeftToCollect--
}

Expand All @@ -271,7 +278,13 @@ func (p *puller) fetch(dig2src dig2sources, blockSeq uint64) (*FetchedPvtDataCon
logger.Debug("Got empty response for", resp.Digest)
continue
}
delete(dig2Filter, *resp.Digest)
delete(dig2Filter, DigKey{
TxId: resp.Digest.TxId,
BlockSeq: resp.Digest.BlockSeq,
SeqInBlock: resp.Digest.SeqInBlock,
Namespace: resp.Digest.Namespace,
Collection: resp.Digest.Collection,
})
itemsLeftToCollect--
}
res.AvailableElemenets = append(res.AvailableElemenets, responses...)
Expand Down Expand Up @@ -360,7 +373,13 @@ func (p *puller) assignDigestsToPeers(members []discovery.NetworkMember, dig2Fil
}
// Add the peer to the mapping from peer to digest slice
peer := remotePeer{pkiID: string(selectedPeer.PKIID), endpoint: selectedPeer.Endpoint}
res[peer] = append(res[peer], dig)
res[peer] = append(res[peer], proto.PvtDataDigest{
TxId: dig.TxId,
BlockSeq: dig.BlockSeq,
SeqInBlock: dig.SeqInBlock,
Namespace: dig.Namespace,
Collection: dig.Collection,
})
}

var noneSelectedPeers []discovery.NetworkMember
Expand All @@ -379,7 +398,7 @@ type collectionRoutingFilter struct {
endorser filter.RoutingFilter
}

type digestToFilterMapping map[proto.PvtDataDigest]collectionRoutingFilter
type digestToFilterMapping map[DigKey]collectionRoutingFilter

func (dig2f digestToFilterMapping) flattenFilterValues() []filter.RoutingFilter {
var filters []filter.RoutingFilter
Expand All @@ -393,7 +412,13 @@ func (dig2f digestToFilterMapping) flattenFilterValues() []filter.RoutingFilter
func (dig2f digestToFilterMapping) digests() []proto.PvtDataDigest {
var digs []proto.PvtDataDigest
for d := range dig2f {
digs = append(digs, d)
digs = append(digs, proto.PvtDataDigest{
TxId: d.TxId,
BlockSeq: d.BlockSeq,
SeqInBlock: d.SeqInBlock,
Namespace: d.Namespace,
Collection: d.Collection,
})
}
return digs
}
Expand All @@ -412,8 +437,15 @@ func (dig2f digestToFilterMapping) String() string {
}

func (p *puller) computeFilters(dig2src dig2sources) (digestToFilterMapping, error) {
filters := make(map[proto.PvtDataDigest]collectionRoutingFilter)
filters := make(map[DigKey]collectionRoutingFilter)
for digest, sources := range dig2src {
digKey := DigKey{
TxId: digest.TxId,
BlockSeq: digest.BlockSeq,
SeqInBlock: digest.SeqInBlock,
Namespace: digest.Namespace,
Collection: digest.Collection,
}
cc := fcommon.CollectionCriteria{
Channel: p.channel,
TxId: digest.TxId,
Expand Down Expand Up @@ -453,7 +485,7 @@ func (p *puller) computeFilters(dig2src dig2sources) (digestToFilterMapping, err
return nil, errors.WithStack(err)
}

filters[*digest] = collectionRoutingFilter{
filters[digKey] = collectionRoutingFilter{
anyPeer: anyPeerInCollection,
endorser: endorserPeer,
}
Expand All @@ -462,9 +494,9 @@ func (p *puller) computeFilters(dig2src dig2sources) (digestToFilterMapping, err
}

func (p *puller) getPurgedCollections(members []discovery.NetworkMember,
dig2Filter digestToFilterMapping, blockSeq uint64) []*proto.PvtDataDigest {
dig2Filter digestToFilterMapping, blockSeq uint64) []DigKey {

var res []*proto.PvtDataDigest
var res []DigKey
for dig := range dig2Filter {
dig := dig

Expand All @@ -480,13 +512,13 @@ func (p *puller) getPurgedCollections(members []discovery.NetworkMember,
logger.Debugf("Private data on channel [%s], chaincode [%s], collection name [%s] for txID = [%s],"+
"has been purged at peers [%v]", p.channel, dig.Namespace,
dig.Collection, dig.TxId, membersWithPurgedData)
res = append(res, &dig)
res = append(res, dig)
}
}
return res
}

func (p *puller) purgedFilter(dig proto.PvtDataDigest, blockSeq uint64) (filter.RoutingFilter, error) {
func (p *puller) purgedFilter(dig DigKey, blockSeq uint64) (filter.RoutingFilter, error) {
cc := fcommon.CollectionCriteria{
Channel: p.channel,
TxId: dig.TxId,
Expand Down
Loading

0 comments on commit 9c77fe5

Please sign in to comment.