Skip to content

Commit

Permalink
[FAB-1288]: Expose gossip API for cscc.
Browse files Browse the repository at this point in the history
Add gossip service entity to encapsulate gossip + state instance.
Make service instance a singelton and add JoinChannel method so
cscc will be able to leverage it.

Change-Id: I38233781276d538f861e472a427a1df12887c887
Signed-off-by: Artem Barger <[email protected]>
Signed-off-by: Yacov Manevich <[email protected]>
  • Loading branch information
C0rWin authored and yacovm committed Dec 12, 2016
1 parent ce296d2 commit 8417c0e
Show file tree
Hide file tree
Showing 5 changed files with 241 additions and 49 deletions.
65 changes: 22 additions & 43 deletions core/committer/noopssinglechain/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ limitations under the License.
package noopssinglechain

import (
"fmt"
"time"

"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/core/chaincode"
"github.com/hyperledger/fabric/core/committer"
"github.com/hyperledger/fabric/core/ledger/kvledger"
"github.com/hyperledger/fabric/core/util"
"github.com/hyperledger/fabric/events/producer"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/orderer"
Expand All @@ -33,14 +33,10 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc"

"fmt"

"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/peer"
"github.com/hyperledger/fabric/gossip/gossip"
"github.com/hyperledger/fabric/gossip/integration"
gossip_proto "github.com/hyperledger/fabric/gossip/proto"
"github.com/hyperledger/fabric/gossip/state"
"github.com/hyperledger/fabric/gossip/service"
pb "github.com/hyperledger/fabric/protos/peer"
)

Expand All @@ -57,11 +53,9 @@ type DeliverService struct {
client orderer.AtomicBroadcast_DeliverClient
windowSize uint64
unAcknowledged uint64
committer *committer.LedgerCommitter

stateProvider state.GossipStateProvider
gossip gossip.Gossip
conn *grpc.ClientConn
chainID string
conn *grpc.ClientConn
}

// StopDeliveryService sends stop to the delivery service reference
Expand All @@ -73,25 +67,22 @@ func StopDeliveryService(service *DeliverService) {

// NewDeliverService construction function to create and initilize
// delivery service instance
func NewDeliverService(chainID string, address string, grpcServer *grpc.Server) *DeliverService {
func NewDeliverService(chainID string) *DeliverService {
if viper.GetBool("peer.committer.enabled") {
logger.Infof("Creating committer for single noops endorser")

deliverService := &DeliverService{
// Instance of RawLedger
committer: committer.NewLedgerCommitter(kvledger.GetLedger(chainID)),
chainID: chainID,
windowSize: 10,
}

deliverService.initStateProvider(address, grpcServer)

return deliverService
}
logger.Infof("Committer disabled")
return nil
}

func (d *DeliverService) startDeliver() error {
func (d *DeliverService) startDeliver(committer committer.Committer) error {
logger.Info("Starting deliver service client")
err := d.initDeliver()

Expand All @@ -100,7 +91,7 @@ func (d *DeliverService) startDeliver() error {
return err
}

height, err := d.committer.LedgerHeight()
height, err := committer.LedgerHeight()
if err != nil {
logger.Errorf("Can't get legder height from committer [%s]", err)
return err
Expand Down Expand Up @@ -153,36 +144,24 @@ func (d *DeliverService) stopDeliver() {
}
}

func (d *DeliverService) initStateProvider(address string, grpcServer *grpc.Server) error {
bootstrap := viper.GetStringSlice("peer.gossip.bootstrap")
logger.Debug("Initializing state provideer, endpoint = ", address, " bootstrap set = ", bootstrap)

gossip, gossipComm := integration.NewGossipComponent(address, grpcServer, bootstrap...)

d.gossip = gossip
d.stateProvider = state.NewGossipStateProvider(gossip, gossipComm, d.committer)
return nil
}

// Start the delivery service to read the block via delivery
// protocol from the orderers
func (d *DeliverService) Start() {
go d.checkLeaderAndRunDeliver()
}

// Stop all service and release resources
func (d *DeliverService) Stop() {
d.stopDeliver()
d.stateProvider.Stop()
d.gossip.Stop()
}

func (d *DeliverService) checkLeaderAndRunDeliver() {
func (d *DeliverService) JoinChannel(committer committer.Committer, configBlock *common.Block) {
if err := service.GetGossipService().JoinChannel(committer, configBlock); err != nil {
panic("Cannot join channel, exiting")
}

go d.checkLeaderAndRunDeliver(committer)
}

func (d *DeliverService) checkLeaderAndRunDeliver(committer committer.Committer) {
isLeader := viper.GetBool("peer.gossip.orgLeader")

if isLeader {
d.startDeliver()
d.startDeliver(committer)
}
}

Expand All @@ -192,7 +171,7 @@ func (d *DeliverService) seekOldest() error {
Seek: &orderer.SeekInfo{
Start: orderer.SeekInfo_OLDEST,
WindowSize: d.windowSize,
ChainID: util.GetTestChainID(),
ChainID: d.chainID,
},
},
})
Expand All @@ -205,7 +184,7 @@ func (d *DeliverService) seekLatestFromCommitter(height uint64) error {
Start: orderer.SeekInfo_SPECIFIED,
WindowSize: d.windowSize,
SpecifiedNumber: height,
ChainID: util.GetTestChainID(),
ChainID: d.chainID,
},
},
})
Expand Down Expand Up @@ -317,17 +296,17 @@ func (d *DeliverService) readUntilClose() {
}
}

numberOfPeers := len(d.gossip.GetPeers())
numberOfPeers := len(service.GetGossipService().GetPeers())
// Create payload with a block received
payload := createPayload(seqNum, block)
// Use payload to create gossip message
gossipMsg := createGossipMsg(payload)
logger.Debugf("Adding payload locally, buffer seqNum = [%d], peers number [%d]", seqNum, numberOfPeers)
// Add payload to local state payloads buffer
d.stateProvider.AddPayload(payload)
service.GetGossipService().AddPayload(d.chainID, payload)
// Gossip messages with other nodes
logger.Debugf("Gossiping block [%d], peers number [%d]", seqNum, numberOfPeers)
d.gossip.Gossip(gossipMsg)
service.GetGossipService().Gossip(gossipMsg)
if err = producer.SendProducerBlockEvent(block); err != nil {
logger.Errorf("Error sending block event %s", err)
}
Expand Down
145 changes: 145 additions & 0 deletions gossip/service/gossip_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package service

import (
"sync"
"fmt"

pb "github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/core/committer"
"github.com/hyperledger/fabric/gossip/comm"
gossipCommon "github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/discovery"
"github.com/hyperledger/fabric/gossip/gossip"
"github.com/hyperledger/fabric/gossip/integration"
"github.com/hyperledger/fabric/gossip/proto"
"github.com/hyperledger/fabric/gossip/state"
"github.com/hyperledger/fabric/protos/common"
"google.golang.org/grpc"
)

var (
gossipServiceInstance *gossipServiceImpl
once sync.Once
)

// GossipService encapsulates gossip and state capabilities into single interface
type GossipService interface {
gossip.Gossip

// JoinChannel joins new chain given the configuration block and initialized committer service
JoinChannel(committer committer.Committer, block *common.Block) error
// GetBlock returns block for given chain
GetBlock(chainID string, index uint64) *common.Block
// AddPayload appends message payload to for given chain
AddPayload(chainID string, payload *proto.Payload) error
}

type gossipServiceImpl struct {
comm comm.Comm
gossip gossip.Gossip
chains map[string]state.GossipStateProvider
lock sync.RWMutex
}

// InitGossipService initialize gossip service
func InitGossipService(endpoint string, s *grpc.Server, bootPeers ...string) {
once.Do(func() {
gossip, communication := integration.NewGossipComponent(endpoint, s, bootPeers...)
gossipServiceInstance = &gossipServiceImpl{
gossip: gossip,
comm: communication,
chains: make(map[string]state.GossipStateProvider),
}
})
}

// GetGossipService returns an instance of gossip service
func GetGossipService() GossipService {
return gossipServiceInstance
}

// JoinChannel joins the channel and initialize gossip state with given committer
func (g *gossipServiceImpl) JoinChannel(commiter committer.Committer, block *common.Block) error {
g.lock.Lock()
defer g.lock.Unlock()

if block.Data == nil || block.Data.Data == nil || len(block.Data.Data) == 0 {
return fmt.Errorf("Cannot join channel, configuration block is empty")
}

envelope := &common.Envelope{}
if err := pb.Unmarshal(block.Data.Data[0], envelope); err != nil {
return err
}

payload := &common.Payload{}
if err := pb.Unmarshal(envelope.Payload, payload); err != nil {
return err
}

chainID := payload.Header.ChainHeader.ChainID
if len(chainID) == 0 {
return fmt.Errorf("Cannot join channel, with empty chainID")
}
// Initialize new state provider for given committer
g.chains[chainID] = state.NewGossipStateProvider(g.gossip, g.comm, commiter)
return nil
}

// GetPeers returns a mapping of endpoint --> []discovery.NetworkMember
func (g *gossipServiceImpl) GetPeers() []discovery.NetworkMember {
return g.gossip.GetPeers()
}

// UpdateMetadata updates the self metadata of the discovery layer
func (g *gossipServiceImpl) UpdateMetadata(data []byte) {
g.gossip.UpdateMetadata(data)
}

// Gossip sends a message to other peers to the network
func (g *gossipServiceImpl) Gossip(msg *proto.GossipMessage) {
g.gossip.Gossip(msg)
}

// Accept returns a channel that outputs messages from other peers
func (g *gossipServiceImpl) Accept(acceptor gossipCommon.MessageAcceptor) <-chan *proto.GossipMessage {
return g.gossip.Accept(acceptor)
}

// GetBlock returns block for given chain
func (g *gossipServiceImpl) GetBlock(chainID string, index uint64) *common.Block {
g.lock.RLock()
defer g.lock.RUnlock()
return g.chains[chainID].GetBlock(index)
}

// AddPayload appends message payload to for given chain
func (g *gossipServiceImpl) AddPayload(chainID string, payload *proto.Payload) error {
g.lock.RLock()
defer g.lock.RUnlock()
return g.chains[chainID].AddPayload(payload)
}

// Stop stops the gossip component
func (g *gossipServiceImpl) Stop() {
for _, ch := range g.chains {
ch.Stop()
}
g.gossip.Stop()
}
56 changes: 56 additions & 0 deletions gossip/service/gossip_service_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package service

import (
"fmt"
"net"
"sync"
"testing"

"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
)

func TestInitGossipService(t *testing.T) {
// Test whenever gossip service is indeed singleton
grpcServer := grpc.NewServer()
socket, error := net.Listen("tcp", fmt.Sprintf("%s:%d", "", 5611))
assert.NoError(t, error)

go grpcServer.Serve(socket)
defer grpcServer.Stop()

wg := sync.WaitGroup{}
wg.Add(10)
for i := 0; i < 10; i++ {
go func() {
InitGossipService("localhost:5611", grpcServer)
wg.Done()
}()
}
wg.Wait()

defer GetGossipService().Stop()
gossip := GetGossipService()

for i := 0; i < 10; i++ {
go func(gossipInstance GossipService) {
assert.Equal(t, gossip, GetGossipService())
}(gossip)
}
}
4 changes: 2 additions & 2 deletions peer/common/ordererclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,12 @@ func GetBroadcastClient() (BroadcastClient, error) {

conn, err := grpc.Dial(orderer, opts...)
if err != nil {
return nil, fmt.Errorf("Error connecting: %s", err)
return nil, fmt.Errorf("Error connecting to %s due to %s", orderer, err)
}
client, err := ab.NewAtomicBroadcastClient(conn).Broadcast(context.TODO())
if err != nil {
conn.Close()
return nil, fmt.Errorf("Error connecting: %s", err)
return nil, fmt.Errorf("Error connecting to %s due to %s", orderer, err)
}

return &broadcastClient{conn: conn, client: client}, nil
Expand Down
Loading

0 comments on commit 8417c0e

Please sign in to comment.