Skip to content

Commit

Permalink
[FAB-15389] Fix private data dissemination
Browse files Browse the repository at this point in the history
Endorsing peer was not honoring maxPeerCount for private data
dissemination due to the addition of the spraying logic for balancing
selected peers for disseminiation. There was a chance that peers chosen
from the remaining count up to maxPeerCount could overlap with the
already selected peers for dissemination from the spray, causing the
dissemination plan to be incomplete. This CR ensures that once a peer
has been selected to disseminate private data to, it can not be selected
again when computing the dissemination plan for a particular tx. Also
added logging around dissemination for debugging purposes.

Change-Id: Ib31c37b036363787718dba74e7e90a7764afe7fb
Signed-off-by: Danny Cao <[email protected]>
  • Loading branch information
caod123 committed Nov 8, 2019
1 parent 872fffe commit 96028fd
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 25 deletions.
86 changes: 70 additions & 16 deletions gossip/privdata/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ func (d *distributorImpl) computeDisseminationPlan(txID string,
return nil, errors.WithStack(err)
}

logger.Debugf("Computing dissemination plan for collection [%s]", collectionName)
dPlan, err := d.disseminationPlanForMsg(colAP, colFilter, pvtDataMsg)
if err != nil {
return nil, errors.WithStack(err)
Expand Down Expand Up @@ -205,20 +206,37 @@ func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAcces
return nil, err
}

m := pvtDataMsg.GetPrivateData().Payload

eligiblePeers := d.eligiblePeersOfChannel(routingFilter)
identitySets := d.identitiesOfEligiblePeers(eligiblePeers, colAP)

// Select one representative from each org
peerEndpoints := map[string]string{}
for _, peer := range eligiblePeers {
epToAdd := peer.Endpoint
if epToAdd == "" {
epToAdd = peer.InternalEndpoint
}
peerEndpoints[string(peer.PKIid)] = epToAdd
}

maximumPeerCount := colAP.MaximumPeerCount()
requiredPeerCount := colAP.RequiredPeerCount()

remainingPeers := []api.PeerIdentityInfo{}
selectedPeerEndpoints := []string{}

rand.Seed(time.Now().Unix())
// Select one representative from each org
if maximumPeerCount > 0 {
for _, selectionPeers := range identitySets {
required := 1
if requiredPeerCount == 0 {
required = 0
}
peer2SendPerOrg := selectionPeers[rand.Intn(len(selectionPeers))]
selectedPeerIndex := rand.Intn(len(selectionPeers))
peer2SendPerOrg := selectionPeers[selectedPeerIndex]
selectedPeerEndpoints = append(selectedPeerEndpoints, peerEndpoints[string(peer2SendPerOrg.PKIId)])
sc := gossip2.SendCriteria{
Timeout: d.pushAckTimeout,
Channel: gossipCommon.ChainID(d.chainID),
Expand All @@ -236,34 +254,70 @@ func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAcces
},
})

// Add unselected peers to remainingPeers
for i, peer := range selectionPeers {
if i != selectedPeerIndex {
remainingPeers = append(remainingPeers, peer)
}
}

if requiredPeerCount > 0 {
requiredPeerCount--
}

maximumPeerCount--
if maximumPeerCount == 0 {
logger.Debug("MaximumPeerCount satisfied")
logger.Debugf("Disseminating private RWSet for TxID [%s] namespace [%s] collection [%s] to peers: %v", m.TxId, m.Namespace, m.CollectionName, selectedPeerEndpoints)
return disseminationPlan, nil
}
}
}

// criteria to select remaining peers to satisfy colAP.MaximumPeerCount()
// collection policy parameters
sc := gossip2.SendCriteria{
Timeout: d.pushAckTimeout,
Channel: gossipCommon.ChainID(d.chainID),
MaxPeers: maximumPeerCount,
MinAck: requiredPeerCount,
IsEligible: func(member discovery.NetworkMember) bool {
return routingFilter(member)
},
// criteria to select remaining peers to satisfy colAP.MaximumPeerCount() if there are still
// unselected peers remaining for dissemination
numPeersToSelect := maximumPeerCount
if len(remainingPeers) < maximumPeerCount {
numPeersToSelect = len(remainingPeers)
}
if numPeersToSelect > 0 {
logger.Debugf("MaximumPeerCount not satisfied, selecting %d more peer(s) for dissemination", numPeersToSelect)
}
for maximumPeerCount > 0 && len(remainingPeers) > 0 {
required := 1
if requiredPeerCount == 0 {
required = 0
}
selectedPeerIndex := rand.Intn(len(remainingPeers))
peer2Send := remainingPeers[selectedPeerIndex]
selectedPeerEndpoints = append(selectedPeerEndpoints, peerEndpoints[string(peer2Send.PKIId)])
sc := gossip2.SendCriteria{
Timeout: d.pushAckTimeout,
Channel: gossipCommon.ChainID(d.chainID),
MaxPeers: 1,
MinAck: required,
IsEligible: func(member discovery.NetworkMember) bool {
return bytes.Equal(member.PKIid, peer2Send.PKIId)
},
}
disseminationPlan = append(disseminationPlan, &dissemination{
criteria: sc,
msg: &proto.SignedGossipMessage{
Envelope: proto2.Clone(pvtDataMsg.Envelope).(*proto.Envelope),
GossipMessage: proto2.Clone(pvtDataMsg.GossipMessage).(*proto.GossipMessage),
},
})
if requiredPeerCount > 0 {
requiredPeerCount--
}

disseminationPlan = append(disseminationPlan, &dissemination{
criteria: sc,
msg: pvtDataMsg,
})
maximumPeerCount--

// remove the selected peer from remaining peers
remainingPeers = append(remainingPeers[:selectedPeerIndex], remainingPeers[selectedPeerIndex+1:]...)
}

logger.Debugf("Disseminating private RWSet for TxID [%s] namespace [%s] collection [%s] to peers: %v", m.TxId, m.Namespace, m.CollectionName, selectedPeerEndpoints)
return disseminationPlan, nil
}

Expand Down
2 changes: 1 addition & 1 deletion gossip/privdata/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func TestDistributor(t *testing.T) {
},
}, 0)
assert.Error(t, err)
assert.Contains(t, err.Error(), "Failed disseminating 4 out of 4 private dissemination plans")
assert.Contains(t, err.Error(), "Failed disseminating 2 out of 2 private dissemination plans")

assert.Equal(t,
[]string{"channel", channelID},
Expand Down
92 changes: 84 additions & 8 deletions integration/pvtdata/pvtdata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ var _ bool = Describe("PrivateData", func() {
// 2. collectionMarblePrivateDetails - Org2 and Org3 have access to this collection
// when calling QueryChaincode with first arg "readMarble", it will query collectionMarbles[1]
// when calling QueryChaincode with first arg "readMarblePrivateDetails", it will query collectionMarblePrivateDetails[2]
Describe("reconciliation", func() {
Describe("dissemination", func() {
var (
testDir string
network *nwo.Network
Expand All @@ -47,8 +47,66 @@ var _ bool = Describe("PrivateData", func() {
)

BeforeEach(func() {
testDir, network, process, orderer, expectedPeers = initThreeOrgsSetup()
testDir, network = initThreeOrgsSetup()

By("disabling pulling by setting pullRetryThreshold to 0")
// set pull retry threshold to 0, this ensures private data can only be transfered via dissemination
peers := []*nwo.Peer{
network.Peer("org1", "peer0"),
network.Peer("org2", "peer0"),
network.Peer("org3", "peer0"),
}
for _, p := range peers {
core := network.ReadPeerConfig(p)
core.Peer.Gossip.PvtData.PullRetryThreshold = 0
network.WritePeerConfig(p, core)
}
})

JustBeforeEach(func() {
process, orderer, expectedPeers = startNetwork(network)
By("installing and instantiating chaincode on all peers")
chaincode := nwo.Chaincode{
Name: "marblesp",
Version: "1.0",
Path: "github.com/hyperledger/fabric/integration/chaincode/marbles_private/cmd",
Ctor: `{"Args":["init"]}`,
Policy: `OR ('Org1MSP.member','Org2MSP.member', 'Org3MSP.member')`,
CollectionsConfig: filepath.Join("testdata", "collection_configs", "collections_config1.json")}
nwo.DeployChaincode(network, "testchannel", orderer, chaincode)

By("invoking initMarble function of the chaincode")
invokeChaincode(network, "org1", "peer0", "marblesp", `{"Args":["initMarble","marble1","blue","35","tom","99"]}`, "testchannel", orderer)

By("waiting for block to propagate")
waitUntilAllPeersSameLedgerHeight(network, expectedPeers, "testchannel", getLedgerHeight(network, network.Peer("org1", "peer0"), "testchannel"))
})

AfterEach(func() {
testCleanup(testDir, network, process)
})

It("verifies private data was disseminated", func() {
By("verify access of initial setup")
verifyAccessInitialSetup(network)
})
})

Describe("reconciliation and pulling", func() {
var (
testDir string
network *nwo.Network
process ifrit.Process
orderer *nwo.Orderer
expectedPeers []*nwo.Peer
)

BeforeEach(func() {
testDir, network = initThreeOrgsSetup()
})

JustBeforeEach(func() {
process, orderer, expectedPeers = startNetwork(network)
By("installing and instantiating chaincode on all peers")
chaincode := nwo.Chaincode{
Name: "marblesp",
Expand Down Expand Up @@ -141,7 +199,7 @@ var _ bool = Describe("PrivateData", func() {
`{"docType":"marble","name":"marble1","color":"blue","size":35,"owner":"tom"}`)
})

It("verify private data reconciliation when joining a new peer in an org that belongs to collection config", func() {
It("verify private data is pulled when joining a new peer in an org that belongs to collection config", func() {
By("verify access of initial setup")
verifyAccessInitialSetup(network)

Expand Down Expand Up @@ -222,7 +280,11 @@ var _ bool = Describe("PrivateData", func() {
)

BeforeEach(func() {
testDir, network, process, orderer, expectedPeers = initThreeOrgsSetup()
testDir, network = initThreeOrgsSetup()
})

JustBeforeEach(func() {
process, orderer, expectedPeers = startNetwork(network)

By("installing and instantiating chaincode on all peers")
chaincode := nwo.Chaincode{
Expand Down Expand Up @@ -282,7 +344,11 @@ var _ bool = Describe("PrivateData", func() {
orderer *nwo.Orderer
)
BeforeEach(func() {
testDir, network, process, orderer, _ = initThreeOrgsSetup()
testDir, network = initThreeOrgsSetup()
})

JustBeforeEach(func() {
process, orderer, _ = startNetwork(network)
})

AfterEach(func() {
Expand Down Expand Up @@ -558,7 +624,11 @@ var _ bool = Describe("PrivateData", func() {
)

BeforeEach(func() {
testDir, network, process, orderer, expectedPeers = initThreeOrgsSetup()
testDir, network = initThreeOrgsSetup()
})

JustBeforeEach(func() {
process, orderer, expectedPeers = startNetwork(network)

By("installing and instantiating chaincode on all peers")
chaincode := nwo.Chaincode{
Expand Down Expand Up @@ -623,7 +693,7 @@ var _ bool = Describe("PrivateData", func() {
})
})

func initThreeOrgsSetup() (string, *nwo.Network, ifrit.Process, *nwo.Orderer, []*nwo.Peer) {
func initThreeOrgsSetup() (string, *nwo.Network) {
var err error
testDir, err := ioutil.TempDir("", "e2e-pvtdata")
Expect(err).NotTo(HaveOccurred())
Expand All @@ -640,6 +710,11 @@ func initThreeOrgsSetup() (string, *nwo.Network, ifrit.Process, *nwo.Orderer, []

n := nwo.New(networkConfig, testDir, client, 35000+1000*GinkgoParallelNode(), components)
n.GenerateConfigTree()

return testDir, n
}

func startNetwork(n *nwo.Network) (ifrit.Process, *nwo.Orderer, []*nwo.Peer) {
n.Bootstrap()

networkRunner := n.NetworkGroupRunner()
Expand All @@ -659,7 +734,8 @@ func initThreeOrgsSetup() (string, *nwo.Network, ifrit.Process, *nwo.Orderer, []
By("verifying membership")
verifyMembership(n, expectedPeers, "testchannel")

return testDir, n, process, orderer, expectedPeers
return process, orderer, expectedPeers

}

func verifyAccessInitialSetup(network *nwo.Network) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[
{
"name": "collectionMarbles",
"policy": "OR('Org1MSP.member', 'Org2MSP.member')",
"requiredPeerCount": 0,
"maxPeerCount": 0,
"blockToLive":1000000,
"memberOnlyRead": false
},
{
"name": "collectionMarblePrivateDetails",
"policy": "OR('Org2MSP.member', 'Org3MSP.member')",
"requiredPeerCount": 0,
"maxPeerCount": 0,
"blockToLive":1000000,
"memberOnlyRead": false
}
]

0 comments on commit 96028fd

Please sign in to comment.