Skip to content

Commit

Permalink
[FAB-2688]: Verify blocks on delivery and during p2p
Browse files Browse the repository at this point in the history
https://jira.hyperledger.org/browse/FAB-2688

Add block verification for delivery service client for blocks received
from ordering service and verification for blocks transfered via state
transfer mechanism.

Change-Id: Icbfbd290a3488332fd9113df6cf31ff2c7598d67
Signed-off-by: Artem Barger <[email protected]>
  • Loading branch information
C0rWin committed Mar 9, 2017
1 parent ba1275c commit 312d7e1
Show file tree
Hide file tree
Showing 10 changed files with 99 additions and 18 deletions.
21 changes: 17 additions & 4 deletions core/deliverservice/blocksprovider/blocksprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/hyperledger/fabric/gossip/discovery"

"github.com/hyperledger/fabric/common/localmsp"
"github.com/hyperledger/fabric/gossip/api"
"github.com/hyperledger/fabric/protos/common"
gossip_proto "github.com/hyperledger/fabric/protos/gossip"
"github.com/hyperledger/fabric/protos/orderer"
Expand Down Expand Up @@ -86,6 +87,8 @@ type blocksProviderImpl struct {

gossip GossipServiceAdapter

mcs api.MessageCryptoService

done int32
}

Expand All @@ -96,11 +99,12 @@ func init() {
}

// NewBlocksProvider constructor function to creare blocks deliverer instance
func NewBlocksProvider(chainID string, client BlocksDeliverer, gossip GossipServiceAdapter) BlocksProvider {
func NewBlocksProvider(chainID string, client BlocksDeliverer, gossip GossipServiceAdapter, mcs api.MessageCryptoService) BlocksProvider {
return &blocksProviderImpl{
chainID: chainID,
client: client,
gossip: gossip,
mcs: mcs,
}
}

Expand All @@ -123,9 +127,19 @@ func (b *blocksProviderImpl) DeliverBlocks() {
case *orderer.DeliverResponse_Block:
seqNum := t.Block.Header.Number

marshaledBlock, err := proto.Marshal(t.Block)
if err != nil {
logger.Errorf("Error serializing block with sequence number %d, due to %s", seqNum, err)
continue
}
if err := b.mcs.VerifyBlock(gossipcommon.ChainID(b.chainID), marshaledBlock); err != nil {
logger.Errorf("Error verifying block with sequnce number %d, due to %s", seqNum, err)
continue
}

numberOfPeers := len(b.gossip.PeersOfChannel(gossipcommon.ChainID(b.chainID)))
// Create payload with a block received
payload := createPayload(seqNum, t.Block)
payload := createPayload(seqNum, marshaledBlock)
// Use payload to create gossip message
gossipMsg := createGossipMsg(b.chainID, payload)

Expand Down Expand Up @@ -223,8 +237,7 @@ func createGossipMsg(chainID string, payload *gossip_proto.Payload) *gossip_prot
return gossipMsg
}

func createPayload(seqNum uint64, block *common.Block) *gossip_proto.Payload {
marshaledBlock, _ := proto.Marshal(block)
func createPayload(seqNum uint64, marshaledBlock []byte) *gossip_proto.Payload {
return &gossip_proto.Payload{
Data: marshaledBlock,
SeqNum: seqNum,
Expand Down
30 changes: 30 additions & 0 deletions core/deliverservice/blocksprovider/blocksprovider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,40 @@ import (
"time"

"github.com/hyperledger/fabric/core/deliverservice/mocks"
"github.com/hyperledger/fabric/gossip/api"
common2 "github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/orderer"
"github.com/stretchr/testify/assert"
)

type mockMCS struct {
}

func (*mockMCS) GetPKIidOfCert(peerIdentity api.PeerIdentityType) common2.PKIidType {
return common2.PKIidType("pkiID")
}

func (*mockMCS) VerifyBlock(chainID common2.ChainID, signedBlock []byte) error {
return nil
}

func (*mockMCS) Sign(msg []byte) ([]byte, error) {
return msg, nil
}

func (*mockMCS) Verify(peerIdentity api.PeerIdentityType, signature, message []byte) error {
return nil
}

func (*mockMCS) VerifyByChannel(chainID common2.ChainID, peerIdentity api.PeerIdentityType, signature, message []byte) error {
return nil
}

func (*mockMCS) ValidateIdentity(peerIdentity api.PeerIdentityType) error {
return nil
}

// Used to generate a simple test case to initialize delivery
// from given block sequence number.
func makeTestCase(ledgerHeight uint64) func(*testing.T) {
Expand All @@ -38,6 +67,7 @@ func makeTestCase(ledgerHeight uint64) func(*testing.T) {
chainID: "***TEST_CHAINID***",
gossip: gossipServiceAdapter,
client: deliverer,
mcs: &mockMCS{},
}

provider.RequestBlocks(&mocks.MockLedgerInfo{ledgerHeight})
Expand Down
12 changes: 8 additions & 4 deletions core/deliverservice/deliveryclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/hyperledger/fabric/core/comm"
"github.com/hyperledger/fabric/core/deliverservice/blocksprovider"
"github.com/hyperledger/fabric/gossip/api"
"github.com/hyperledger/fabric/protos/orderer"
"github.com/op/go-logging"
"golang.org/x/net/context"
Expand Down Expand Up @@ -95,13 +96,15 @@ type deliverServiceImpl struct {
stopping bool

conn *grpc.ClientConn

mcs api.MessageCryptoService
}

// NewDeliverService construction function to create and initialize
// delivery service instance. It tries to establish connection to
// the specified in the configuration ordering service, in case it
// fails to dial to it, return nil
func NewDeliverService(gossip blocksprovider.GossipServiceAdapter, endpoints []string) (DeliverService, error) {
func NewDeliverService(gossip blocksprovider.GossipServiceAdapter, endpoints []string, mcs api.MessageCryptoService) (DeliverService, error) {
indices := rand.Perm(len(endpoints))
for _, idx := range indices {
logger.Infof("Creating delivery service to get blocks from the ordering service, %s", endpoints[idx])
Expand All @@ -119,20 +122,21 @@ func NewDeliverService(gossip blocksprovider.GossipServiceAdapter, endpoints []s
logger.Errorf("Cannot dial to %s, because of %s", endpoints[idx], err)
continue
}
return NewFactoryDeliverService(gossip, &blocksDelivererFactoryImpl{conn}, conn), nil
return NewFactoryDeliverService(gossip, &blocksDelivererFactoryImpl{conn}, conn, mcs), nil
}
return nil, fmt.Errorf("Wasn't able to connect to any of ordering service endpoints %s", endpoints)
}

// NewFactoryDeliverService construction function to create and initialize
// delivery service instance, with gossip service adapter and customized
// factory to create blocks deliverers.
func NewFactoryDeliverService(gossip blocksprovider.GossipServiceAdapter, factory BlocksDelivererFactory, conn *grpc.ClientConn) DeliverService {
func NewFactoryDeliverService(gossip blocksprovider.GossipServiceAdapter, factory BlocksDelivererFactory, conn *grpc.ClientConn, mcs api.MessageCryptoService) DeliverService {
return &deliverServiceImpl{
clientsFactory: factory,
gossip: gossip,
clients: make(map[string]blocksprovider.BlocksProvider),
conn: conn,
mcs: mcs,
}
}

Expand All @@ -159,7 +163,7 @@ func (d *deliverServiceImpl) StartDeliverForChannel(chainID string, ledgerInfo b
return err
}
logger.Debug("This peer will pass blocks from orderer service to other peers")
d.clients[chainID] = blocksprovider.NewBlocksProvider(chainID, abc, d.gossip)
d.clients[chainID] = blocksprovider.NewBlocksProvider(chainID, abc, d.gossip, d.mcs)

if err := d.clients[chainID].RequestBlocks(ledgerInfo); err == nil {
// Start reading blocks from ordering service in case this peer is a leader for specified chain
Expand Down
31 changes: 30 additions & 1 deletion core/deliverservice/deliveryclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"github.com/docker/docker/pkg/testutil/assert"
"github.com/hyperledger/fabric/core/deliverservice/blocksprovider"
"github.com/hyperledger/fabric/core/deliverservice/mocks"
"github.com/hyperledger/fabric/gossip/api"
"github.com/hyperledger/fabric/gossip/common"
"github.com/spf13/viper"
)

Expand All @@ -35,6 +37,33 @@ func (mock *mockBlocksDelivererFactory) Create() (blocksprovider.BlocksDeliverer
return mock.mockCreate()
}

type mockMCS struct {
}

func (*mockMCS) GetPKIidOfCert(peerIdentity api.PeerIdentityType) common.PKIidType {
return common.PKIidType("pkiID")
}

func (*mockMCS) VerifyBlock(chainID common.ChainID, signedBlock []byte) error {
return nil
}

func (*mockMCS) Sign(msg []byte) ([]byte, error) {
return msg, nil
}

func (*mockMCS) Verify(peerIdentity api.PeerIdentityType, signature, message []byte) error {
return nil
}

func (*mockMCS) VerifyByChannel(chainID common.ChainID, peerIdentity api.PeerIdentityType, signature, message []byte) error {
return nil
}

func (*mockMCS) ValidateIdentity(peerIdentity api.PeerIdentityType) error {
return nil
}

func TestNewDeliverService(t *testing.T) {
viper.Set("peer.gossip.orgLeader", true)

Expand All @@ -48,7 +77,7 @@ func TestNewDeliverService(t *testing.T) {
return blocksDeliverer, nil
}

service := NewFactoryDeliverService(gossipServiceAdapter, factory, nil)
service := NewFactoryDeliverService(gossipServiceAdapter, factory, nil, &mockMCS{})
assert.NilError(t, service.StartDeliverForChannel("TEST_CHAINID", &mocks.MockLedgerInfo{0}))

// Lets start deliver twice
Expand Down
3 changes: 2 additions & 1 deletion core/peer/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/hyperledger/fabric/core/deliverservice"
"github.com/hyperledger/fabric/core/deliverservice/blocksprovider"
"github.com/hyperledger/fabric/core/mocks/ccprovider"
"github.com/hyperledger/fabric/gossip/api"
"github.com/hyperledger/fabric/gossip/service"
"github.com/hyperledger/fabric/msp/mgmt"
"github.com/hyperledger/fabric/msp/mgmt/testtools"
Expand Down Expand Up @@ -60,7 +61,7 @@ func (*mockDeliveryClient) Stop() {
type mockDeliveryClientFactory struct {
}

func (*mockDeliveryClientFactory) Service(g service.GossipService, endpoints []string) (deliverclient.DeliverService, error) {
func (*mockDeliveryClientFactory) Service(g service.GossipService, endpoints []string, mcs api.MessageCryptoService) (deliverclient.DeliverService, error) {
return &mockDeliveryClient{}, nil
}

Expand Down
3 changes: 2 additions & 1 deletion core/scc/cscc/configure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/hyperledger/fabric/core/deliverservice/blocksprovider"
"github.com/hyperledger/fabric/core/ledger/ledgermgmt"
"github.com/hyperledger/fabric/core/peer"
"github.com/hyperledger/fabric/gossip/api"
"github.com/hyperledger/fabric/gossip/service"
"github.com/hyperledger/fabric/msp/mgmt"
"github.com/hyperledger/fabric/msp/mgmt/testtools"
Expand Down Expand Up @@ -66,7 +67,7 @@ func (*mockDeliveryClient) Stop() {
type mockDeliveryClientFactory struct {
}

func (*mockDeliveryClientFactory) Service(g service.GossipService, endpoints []string) (deliverclient.DeliverService, error) {
func (*mockDeliveryClientFactory) Service(g service.GossipService, endpoints []string, mcs api.MessageCryptoService) (deliverclient.DeliverService, error) {
return &mockDeliveryClient{}, nil
}

Expand Down
3 changes: 1 addition & 2 deletions gossip/election/election_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@ package election

import (
"fmt"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

"strings"

"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
Expand Down
8 changes: 4 additions & 4 deletions gossip/service/gossip_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,15 @@ type GossipService interface {
// DeliveryServiceFactory factory to create and initialize delivery service instance
type DeliveryServiceFactory interface {
// Returns an instance of delivery client
Service(g GossipService, endpoints []string) (deliverclient.DeliverService, error)
Service(g GossipService, endpoints []string, msc api.MessageCryptoService) (deliverclient.DeliverService, error)
}

type deliveryFactoryImpl struct {
}

// Returns an instance of delivery client
func (*deliveryFactoryImpl) Service(g GossipService, endpoints []string) (deliverclient.DeliverService, error) {
return deliverclient.NewDeliverService(g, endpoints)
func (*deliveryFactoryImpl) Service(g GossipService, endpoints []string, mcs api.MessageCryptoService) (deliverclient.DeliverService, error) {
return deliverclient.NewDeliverService(g, endpoints, mcs)
}

type gossipServiceImpl struct {
Expand Down Expand Up @@ -168,7 +168,7 @@ func (g *gossipServiceImpl) InitializeChannel(chainID string, committer committe
g.chains[chainID] = state.NewGossipStateProvider(chainID, g, committer, g.mcs)
if g.deliveryService == nil {
var err error
g.deliveryService, err = g.deliveryFactory.Service(gossipServiceInstance, endpoints)
g.deliveryService, err = g.deliveryFactory.Service(gossipServiceInstance, endpoints, g.mcs)
if err != nil {
logger.Warning("Cannot create delivery client, due to", err)
}
Expand Down
2 changes: 1 addition & 1 deletion gossip/service/gossip_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ type mockDeliverServiceFactory struct {
service *mockDeliverService
}

func (mf *mockDeliverServiceFactory) Service(g GossipService, endpoints []string) (deliverclient.DeliverService, error) {
func (mf *mockDeliverServiceFactory) Service(g GossipService, endpoints []string, mcs api.MessageCryptoService) (deliverclient.DeliverService, error) {
return mf.service, nil
}

Expand Down
4 changes: 4 additions & 0 deletions gossip/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,10 @@ func (s *GossipStateProviderImpl) handleStateResponse(msg proto.ReceivedMessage)
response := msg.GetGossipMessage().GetStateResponse()
for _, payload := range response.GetPayloads() {
s.logger.Debugf("Received payload with sequence number %d.", payload.SeqNum)
if err := s.mcs.VerifyBlock(common2.ChainID(s.chainID), payload.Data); err != nil {
s.logger.Warningf("Error verifying block with sequence number %d, due to %s", payload.SeqNum, err)
return
}
err := s.payloads.Push(payload)
if err != nil {
s.logger.Warningf("Payload with sequence number %d was received earlier", payload.SeqNum)
Expand Down

0 comments on commit 312d7e1

Please sign in to comment.