Skip to content

Commit

Permalink
[FAB-2205]Make gossip comm configuable
Browse files Browse the repository at this point in the history
https://jira.hyperledger.org/browse/FAB-2205

Change-Id: Ifb12f5c9f269bd2bed5ee74d9aa84dee16b4ed4a
Signed-off-by: grapebaba <[email protected]>
  • Loading branch information
GrapeBaBa committed Feb 15, 2017
1 parent 9f561b8 commit d633d6f
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 47 deletions.
27 changes: 14 additions & 13 deletions gossip/comm/comm_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package comm
import (
"bytes"
"crypto/tls"
"errors"
"fmt"
"math/rand"
"net"
Expand All @@ -33,6 +34,7 @@ import (
"github.com/hyperledger/fabric/gossip/util"
proto "github.com/hyperledger/fabric/protos/gossip"
"github.com/op/go-logging"
"github.com/spf13/viper"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
Expand All @@ -47,16 +49,15 @@ const (
sendOverflowErr = "Send buffer overflow"
)

var errSendOverflow = fmt.Errorf(sendOverflowErr)
var dialTimeout = defDialTimeout
var errSendOverflow = errors.New(sendOverflowErr)

func init() {
rand.Seed(42)
}

// SetDialTimeout sets the dial timeout
func SetDialTimeout(timeout time.Duration) {
dialTimeout = timeout
viper.Set("peer.gossip.dialTimeout", timeout)
}

func (c *commImpl) SetDialOpts(opts ...grpc.DialOption) {
Expand All @@ -75,7 +76,7 @@ func NewCommInstanceWithServer(port int, idMapper identity.Mapper, peerIdentity
var certHash []byte

if len(dialOpts) == 0 {
dialOpts = []grpc.DialOption{grpc.WithTimeout(dialTimeout)}
dialOpts = []grpc.DialOption{grpc.WithTimeout(util.GetDurationOrDefault("peer.gossip.dialTimeout", defDialTimeout))}
}

if port > 0 {
Expand Down Expand Up @@ -168,7 +169,7 @@ func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidT
defer c.logger.Debug("Exiting")

if c.isStopping() {
return nil, fmt.Errorf("Stopping")
return nil, errors.New("Stopping")
}
cc, err = grpc.Dial(endpoint, append(c.opts, grpc.WithBlock())...)
if err != nil {
Expand All @@ -188,7 +189,7 @@ func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidT
if expectedPKIID != nil && !bytes.Equal(pkiID, expectedPKIID) {
// PKIID is nil when we don't know the remote PKI id's
c.logger.Warning("Remote endpoint claims to be a different peer, expected", expectedPKIID, "but got", pkiID)
return nil, fmt.Errorf("Authentication failure")
return nil, errors.New("Authentication failure")
}
conn := newConnection(cl, cc, stream, nil)
conn.pkiID = pkiID
Expand Down Expand Up @@ -275,7 +276,7 @@ func (c *commImpl) Probe(remotePeer *RemotePeer) error {
endpoint := remotePeer.Endpoint
pkiID := remotePeer.PKIID
if c.isStopping() {
return fmt.Errorf("Stopping")
return errors.New("Stopping")
}
c.logger.Debug("Entering, endpoint:", endpoint, "PKIID:", pkiID)
cc, err := grpc.Dial(remotePeer.Endpoint, append(c.opts, grpc.WithBlock())...)
Expand Down Expand Up @@ -407,15 +408,15 @@ func (c *commImpl) authenticateRemotePeer(stream stream) (common.PKIidType, erro

c.logger.Debug("Sending", cMsg, "to", remoteAddress)
stream.Send(cMsg)
m := readWithTimeout(stream, defConnTimeout)
m := readWithTimeout(stream, util.GetDurationOrDefault("peer.gossip.connTimeout", defConnTimeout))
if m == nil {
c.logger.Warning("Timed out waiting for connection message from", remoteAddress)
return nil, fmt.Errorf("Timed out")
return nil, errors.New("Timed out")
}
receivedMsg := m.GetConn()
if receivedMsg == nil {
c.logger.Warning("Expected connection message but got", receivedMsg)
return nil, fmt.Errorf("Wrong type")
return nil, errors.New("Wrong type")
}

if receivedMsg.PkiID == nil {
Expand All @@ -425,7 +426,7 @@ func (c *commImpl) authenticateRemotePeer(stream stream) (common.PKIidType, erro

if c.isPKIblackListed(receivedMsg.PkiID) {
c.logger.Warning("Connection attempt from", remoteAddress, "but it is black-listed")
return nil, fmt.Errorf("Black-listed")
return nil, errors.New("Black-listed")
}
c.logger.Debug("Received", receivedMsg, "from", remoteAddress)
err = c.idMapper.Put(receivedMsg.PkiID, receivedMsg.Cert)
Expand Down Expand Up @@ -456,7 +457,7 @@ func (c *commImpl) authenticateRemotePeer(stream stream) (common.PKIidType, erro

func (c *commImpl) GossipStream(stream proto.Gossip_GossipStreamServer) error {
if c.isStopping() {
return fmt.Errorf("Shutting down")
return errors.New("Shutting down")
}
PKIID, err := c.authenticateRemotePeer(stream)
if err != nil {
Expand Down Expand Up @@ -573,7 +574,7 @@ func createGRPCLayer(port int) (*grpc.Server, net.Listener, grpc.DialOption, []b
}

if len(cert.Certificate) == 0 {
panic(fmt.Errorf("Certificate chain is nil"))
panic(errors.New("Certificate chain is nil"))
}

returnedCertHash = certHashFromRawCert(cert.Certificate[0])
Expand Down
36 changes: 31 additions & 5 deletions gossip/comm/comm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,17 @@ import (
"fmt"
"math/rand"
"os"
"strings"
"sync"
"testing"
"time"

"github.com/hyperledger/fabric/gossip/api"
"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/identity"
"github.com/hyperledger/fabric/gossip/util"
proto "github.com/hyperledger/fabric/protos/gossip"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"golang.org/x/net/context"
"google.golang.org/grpc"
Expand All @@ -38,7 +41,6 @@ import (

func init() {
rand.Seed(42)
SetDialTimeout(time.Duration(300) * time.Millisecond)
}

func acceptAll(msg interface{}) bool {
Expand Down Expand Up @@ -149,6 +151,23 @@ func handshaker(endpoint string, comm Comm, t *testing.T, sigMutator func([]byte
return acceptChan
}

func TestViperConfig(t *testing.T) {
viper.SetConfigName("core")
viper.SetEnvPrefix("CORE")
viper.AddConfigPath("./../../peer")
viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
viper.AutomaticEnv()
err := viper.ReadInConfig()
if err != nil { // Handle errors reading the config file
panic(fmt.Errorf("Fatal error config file: %s \n", err))
}

assert.Equal(t, time.Duration(2)*time.Second, util.GetDurationOrDefault("peer.gossip.connTimeout", 0))
assert.Equal(t, time.Duration(300)*time.Millisecond, util.GetDurationOrDefault("peer.gossip.dialTimeout", 0))
assert.Equal(t, 20, util.GetIntOrDefault("peer.gossip.recvBuffSize", 0))
assert.Equal(t, 20, util.GetIntOrDefault("peer.gossip.sendBuffSize", 0))
}

func TestHandshake(t *testing.T) {
t.Parallel()
comm, _ := newCommInstance(9611, naiveSec)
Expand Down Expand Up @@ -285,7 +304,7 @@ func TestParallelSend(t *testing.T) {
defer comm1.Stop()
defer comm2.Stop()

messages2Send := defRecvBuffSize
messages2Send := util.GetIntOrDefault("peer.gossip.recvBuffSize", defRecvBuffSize)

wg := sync.WaitGroup{}
go func() {
Expand Down Expand Up @@ -378,7 +397,7 @@ func TestAccept(t *testing.T) {
var evenResults []uint64
var oddResults []uint64

out := make(chan uint64, defRecvBuffSize)
out := make(chan uint64, util.GetIntOrDefault("peer.gossip.recvBuffSize", defRecvBuffSize))
sem := make(chan struct{}, 0)

readIntoSlice := func(a *[]uint64, ch <-chan proto.ReceivedMessage) {
Expand All @@ -392,11 +411,11 @@ func TestAccept(t *testing.T) {
go readIntoSlice(&evenResults, evenNONCES)
go readIntoSlice(&oddResults, oddNONCES)

for i := 0; i < defRecvBuffSize; i++ {
for i := 0; i < util.GetIntOrDefault("peer.gossip.recvBuffSize", defRecvBuffSize); i++ {
comm2.Send(createGossipMsg(), remotePeer(7611))
}

waitForMessages(t, out, defRecvBuffSize, "Didn't receive all messages sent")
waitForMessages(t, out, util.GetIntOrDefault("peer.gossip.recvBuffSize", defRecvBuffSize), "Didn't receive all messages sent")

comm1.Stop()
comm2.Stop()
Expand Down Expand Up @@ -532,3 +551,10 @@ func waitForMessages(t *testing.T, msgChan chan uint64, count int, errMsg string
}
assert.Equal(t, count, c, errMsg)
}

func TestMain(m *testing.M) {
SetDialTimeout(time.Duration(300) * time.Millisecond)

ret := m.Run()
os.Exit(ret)
}
7 changes: 4 additions & 3 deletions gossip/comm/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"sync/atomic"

"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/util"
proto "github.com/hyperledger/fabric/protos/gossip"
"github.com/op/go-logging"
"google.golang.org/grpc"
Expand Down Expand Up @@ -183,7 +184,7 @@ func (cs *connectionStore) closeByPKIid(pkiID common.PKIidType) {

func newConnection(cl proto.GossipClient, c *grpc.ClientConn, cs proto.Gossip_GossipStreamClient, ss proto.Gossip_GossipStreamServer) *connection {
connection := &connection{
outBuff: make(chan *msgSending, defSendBuffSize),
outBuff: make(chan *msgSending, util.GetIntOrDefault("peer.gossip.sendBuffSize", defSendBuffSize)),
cl: cl,
conn: c,
clientStream: cs,
Expand Down Expand Up @@ -242,7 +243,7 @@ func (conn *connection) send(msg *proto.GossipMessage, onErr func(error)) {
conn.Lock()
defer conn.Unlock()

if len(conn.outBuff) == defSendBuffSize {
if len(conn.outBuff) == util.GetIntOrDefault("peer.gossip.sendBuffSize", defSendBuffSize) {
go onErr(errSendOverflow)
return
}
Expand All @@ -257,7 +258,7 @@ func (conn *connection) send(msg *proto.GossipMessage, onErr func(error)) {

func (conn *connection) serviceConnection() error {
errChan := make(chan error, 1)
msgChan := make(chan *proto.GossipMessage, defRecvBuffSize)
msgChan := make(chan *proto.GossipMessage, util.GetIntOrDefault("peer.gossip.recvBuffSize", defRecvBuffSize))
defer close(msgChan)

// Call stream.Recv() asynchronously in readFromStream(),
Expand Down
37 changes: 11 additions & 26 deletions gossip/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,28 +25,13 @@ import (
"github.com/hyperledger/fabric/gossip/api"
"github.com/hyperledger/fabric/gossip/gossip"
"github.com/hyperledger/fabric/gossip/identity"
"github.com/hyperledger/fabric/gossip/util"
"github.com/spf13/viper"
"google.golang.org/grpc"
)

// This file is used to bootstrap a gossip instance and/or leader election service instance

func getIntOrDefault(key string, defVal int) int {
if viper.GetInt(key) == 0 {
return defVal
} else {
return viper.GetInt(key)
}
}

func getDurationOrDefault(key string, defVal time.Duration) time.Duration {
if viper.GetDuration(key) == 0 {
return defVal
} else {
return viper.GetDuration(key)
}
}

func newConfig(selfEndpoint string, externalEndpoint string, bootPeers ...string) *gossip.Config {
port, err := strconv.ParseInt(strings.Split(selfEndpoint, ":")[1], 10, 64)
if err != nil {
Expand All @@ -65,18 +50,18 @@ func newConfig(selfEndpoint string, externalEndpoint string, bootPeers ...string
BindPort: int(port),
BootstrapPeers: bootPeers,
ID: selfEndpoint,
MaxBlockCountToStore: getIntOrDefault("peer.gossip.maxBlockCountToStore", 100),
MaxPropagationBurstLatency: getDurationOrDefault("peer.gossip.maxPropagationBurstLatency", 10*time.Millisecond),
MaxPropagationBurstSize: getIntOrDefault("peer.gossip.maxPropagationBurstSize", 10),
PropagateIterations: getIntOrDefault("peer.gossip.propagateIterations", 1),
PropagatePeerNum: getIntOrDefault("peer.gossip.propagatePeerNum", 3),
PullInterval: getDurationOrDefault("peer.gossip.pullInterval", 4*time.Second),
PullPeerNum: getIntOrDefault("peer.gossip.pullPeerNum", 3),
MaxBlockCountToStore: util.GetIntOrDefault("peer.gossip.maxBlockCountToStore", 100),
MaxPropagationBurstLatency: util.GetDurationOrDefault("peer.gossip.maxPropagationBurstLatency", 10*time.Millisecond),
MaxPropagationBurstSize: util.GetIntOrDefault("peer.gossip.maxPropagationBurstSize", 10),
PropagateIterations: util.GetIntOrDefault("peer.gossip.propagateIterations", 1),
PropagatePeerNum: util.GetIntOrDefault("peer.gossip.propagatePeerNum", 3),
PullInterval: util.GetDurationOrDefault("peer.gossip.pullInterval", 4*time.Second),
PullPeerNum: util.GetIntOrDefault("peer.gossip.pullPeerNum", 3),
InternalEndpoint: selfEndpoint,
ExternalEndpoint: externalEndpoint,
PublishCertPeriod: getDurationOrDefault("peer.gossip.publishCertPeriod", 10*time.Second),
RequestStateInfoInterval: getDurationOrDefault("peer.gossip.requestStateInfoInterval", 4*time.Second),
PublishStateInfoInterval: getDurationOrDefault("peer.gossip.publishStateInfoInterval", 4*time.Second),
PublishCertPeriod: util.GetDurationOrDefault("peer.gossip.publishCertPeriod", 10*time.Second),
RequestStateInfoInterval: util.GetDurationOrDefault("peer.gossip.requestStateInfoInterval", 4*time.Second),
PublishStateInfoInterval: util.GetDurationOrDefault("peer.gossip.publishStateInfoInterval", 4*time.Second),
SkipBlockVerification: viper.GetBool("peer.gossip.skipBlockVerification"),
TLSServerCert: cert,
}
Expand Down
21 changes: 21 additions & 0 deletions gossip/util/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ import (
"reflect"
"runtime"
"sync"
"time"

"github.com/spf13/viper"
)

// Equals returns whether a and b are the same
Expand Down Expand Up @@ -146,3 +149,21 @@ func PrintStackTrace() {
runtime.Stack(buf, true)
fmt.Printf("%s", buf)
}

// GetIntOrDefault returns the int value from config if present otherwise default value
func GetIntOrDefault(key string, defVal int) int {
if val := viper.GetInt(key); val != 0 {
return val
}

return defVal
}

// GetIntOrDefault returns the Duration value from config if present otherwise default value
func GetDurationOrDefault(key string, defVal time.Duration) time.Duration {
if val := viper.GetDuration(key); val != 0 {
return val
}

return defVal
}
8 changes: 8 additions & 0 deletions peer/core.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,14 @@ peer:
skipBlockVerification: false
# Should we ignore security or not
ignoreSecurity: false
# Dial timeout(unit: second)
dialTimeout: 3s
# Connection timeout(unit: second)
connTimeout: 2s
# Buffer size of received messages
recvBuffSize: 20
# Buffer size of sending messages
sendBuffSize: 20

# This is an endpoint that is published to peers outside of the organization.
# If this isn't set, the peer will not be known to other organizations.
Expand Down

0 comments on commit d633d6f

Please sign in to comment.