Skip to content

Commit

Permalink
[FAB-12816] cluster.RPC mapping per destination.
Browse files Browse the repository at this point in the history
The egress wrapper for the cluster communication caches the last
Submit stream and only reacquires it if a Send() or Recv() was
attempted and failed.

This is problematic because a leader may change while there is no call
to Send() or Recv() in a follower node, and then the stream will be
mapped to a node that is no longer the leader.

This change set makes the mapping be per destination and not global,
by adding a map that maps destinations to streams.

Change-Id: I028bde6fb2248cb8a56ad622be4e73827834caf4
Signed-off-by: yacovm <[email protected]>
  • Loading branch information
yacovm committed Nov 19, 2018
1 parent c1ff9e6 commit d2f2029
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 14 deletions.
39 changes: 30 additions & 9 deletions orderer/common/cluster/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package cluster

import (
"context"
"sync"

"github.com/hyperledger/fabric/protos/orderer"
"github.com/pkg/errors"
Expand Down Expand Up @@ -36,9 +37,10 @@ type Client interface {

// RPC performs remote procedure calls to remote cluster nodes.
type RPC struct {
stream orderer.Cluster_SubmitClient
Channel string
Comm Communicator
Channel string
Comm Communicator
lock sync.RWMutex
DestinationToStream map[uint64]orderer.Cluster_SubmitClient
}

// Step sends a StepRequest to the given destination node and returns the response
Expand All @@ -58,7 +60,7 @@ func (s *RPC) SendSubmit(destination uint64, request *orderer.SubmitRequest) err
}
err = stream.Send(request)
if err != nil {
s.stream = nil
s.unMapStream(destination)
}
return err
}
Expand All @@ -71,24 +73,43 @@ func (s *RPC) ReceiveSubmitResponse(destination uint64) (*orderer.SubmitResponse
}
msg, err := stream.Recv()
if err != nil {
s.stream = nil
s.unMapStream(destination)
}
return msg, err
}

// getProposeStream obtains a Submit stream for the given destination node
func (s *RPC) getProposeStream(destination uint64) (orderer.Cluster_SubmitClient, error) {
if s.stream != nil {
return s.stream, nil
stream := s.getStream(destination)
if stream != nil {
return stream, nil
}
stub, err := s.Comm.Remote(s.Channel, destination)
if err != nil {
return nil, errors.WithStack(err)
}
stream, err := stub.SubmitStream()
stream, err = stub.SubmitStream()
if err != nil {
return nil, errors.WithStack(err)
}
s.stream = stream
s.mapStream(destination, stream)
return stream, nil
}

func (s *RPC) getStream(destination uint64) orderer.Cluster_SubmitClient {
s.lock.RLock()
defer s.lock.RUnlock()
return s.DestinationToStream[destination]
}

func (s *RPC) mapStream(destination uint64, stream orderer.Cluster_SubmitClient) {
s.lock.Lock()
defer s.lock.Unlock()
s.DestinationToStream[destination] = stream
}

func (s *RPC) unMapStream(destination uint64) {
s.lock.Lock()
defer s.lock.Unlock()
delete(s.DestinationToStream, destination)
}
48 changes: 44 additions & 4 deletions orderer/common/cluster/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ func TestRPCStep(t *testing.T) {
}, testcase.remoteErr)

rpc := &cluster.RPC{
Channel: "mychannel",
Comm: comm,
DestinationToStream: make(map[uint64]orderer.Cluster_SubmitClient),
Channel: "mychannel",
Comm: comm,
}

response, err := rpc.Step(1, &orderer.StepRequest{
Expand All @@ -74,6 +75,44 @@ func TestRPCStep(t *testing.T) {
}
}

func TestRPCChangeDestination(t *testing.T) {
t.Parallel()
// We send a Submit() to 2 different nodes - 1 and 2.
// The first invocation of Submit() establishes a stream with node 1
// and the second establishes a stream with node 2.
// We define a mock behavior for only a single invocation of Send() on each
// of the streams (to node 1 and to node 2), therefore we test that invocation
// of rpc.SendSubmit to node 2 doesn't send the message to node 1.
comm := &mocks.Communicator{}

client1 := &mocks.ClusterClient{}
client2 := &mocks.ClusterClient{}

comm.On("Remote", "mychannel", uint64(1)).Return(&cluster.RemoteContext{Client: client1}, nil)
comm.On("Remote", "mychannel", uint64(2)).Return(&cluster.RemoteContext{Client: client2}, nil)

streamToNode1 := &mocks.SubmitClient{}
streamToNode2 := &mocks.SubmitClient{}

client1.On("Submit", mock.Anything).Return(streamToNode1, nil).Once()
client2.On("Submit", mock.Anything).Return(streamToNode2, nil).Once()

rpc := &cluster.RPC{
DestinationToStream: make(map[uint64]orderer.Cluster_SubmitClient),
Channel: "mychannel",
Comm: comm,
}

streamToNode1.On("Send", mock.Anything).Return(nil).Once()
streamToNode2.On("Send", mock.Anything).Return(nil).Once()

rpc.SendSubmit(1, &orderer.SubmitRequest{Channel: "mychannel"})
rpc.SendSubmit(2, &orderer.SubmitRequest{Channel: "mychannel"})

streamToNode1.AssertNumberOfCalls(t, "Send", 1)
streamToNode2.AssertNumberOfCalls(t, "Send", 1)
}

func TestRPCSubmitSend(t *testing.T) {
t.Parallel()
submitRequest := &orderer.SubmitRequest{Channel: "mychannel"}
Expand Down Expand Up @@ -149,8 +188,9 @@ func TestRPCSubmitSend(t *testing.T) {
}, testCase.remoteError)

rpc := &cluster.RPC{
Channel: "mychannel",
Comm: comm,
DestinationToStream: make(map[uint64]orderer.Cluster_SubmitClient),
Channel: "mychannel",
Comm: comm,
}

var msg *orderer.SubmitResponse
Expand Down
6 changes: 5 additions & 1 deletion orderer/consensus/etcdraft/consenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,11 @@ func (c *Consenter) HandleChain(support consensus.ConsenterSupport, metadata *co
SnapDir: path.Join(c.EtcdRaftConfig.SnapDir, support.ChainID()),
}

rpc := &cluster.RPC{Channel: support.ChainID(), Comm: c.Communication}
rpc := &cluster.RPC{
Channel: support.ChainID(),
Comm: c.Communication,
DestinationToStream: make(map[uint64]orderer.Cluster_SubmitClient),
}
return NewChain(support, opts, c.Communication, rpc, bp, nil)
}

Expand Down

0 comments on commit d2f2029

Please sign in to comment.