Skip to content

Commit

Permalink
[FAB-3213] Gossip identity revocation support
Browse files Browse the repository at this point in the history
When a peer gets a new config block and it contains CRLs,
the gossip layer needs to be notified in order to close existing
connections to peers that their certificates have been expired.

If this is not done, then these peers are still forwarded data
like peer membership and channel membership, because the connection
is already open.

This commit adds an ability to revoke identities by receiving a predicate
function that: given an identity, it returns whether it is suspected of being
revoked (i.e, the SN is found within some CRL of some MSP).
Then- the gossip layer calls ValidateIdentity on the stored identity, and if
it is found to be invalid- it:
- deletes the identity from memory
- closes an active connection to the peer, if such exists.

Currently the implementation of that predicate is the naive/obvious one,
that suspects all identities.
In a future commit I'll (hopefully) add code that uses the CRLs themselves.

Change-Id: I56d995a3720a736b1242b13a193f9a7933299345
Signed-off-by: Yacov Manevich <[email protected]>
  • Loading branch information
yacovm committed Apr 18, 2017
1 parent 077126e commit 9d12166
Show file tree
Hide file tree
Showing 12 changed files with 234 additions and 126 deletions.
8 changes: 8 additions & 0 deletions core/peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/hyperledger/fabric/core/committer/txvalidator"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/ledgermgmt"
"github.com/hyperledger/fabric/gossip/api"
"github.com/hyperledger/fabric/gossip/service"
"github.com/hyperledger/fabric/msp"
mspmgmt "github.com/hyperledger/fabric/msp/mgmt"
Expand Down Expand Up @@ -187,6 +188,13 @@ func createChain(cid string, ledger ledger.PeerLedger, cb *common.Block) error {
Manager: cm,
Application: configtxInitializer.ApplicationConfig(),
})
service.GetGossipService().SuspectPeers(func(identity api.PeerIdentityType) bool {
// TODO: this is a place-holder that would somehow make the MSP layer suspect
// that a given certificate is revoked, or its intermediate CA is revoked.
// In the meantime, before we have such an ability, we return true in order
// to suspect ALL identities in order to validate all of them.
return true
})
}

trustedRootsCallbackWrapper := func(cm configtxapi.Manager) {
Expand Down
4 changes: 4 additions & 0 deletions gossip/api/crypto.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,7 @@ type MessageCryptoService interface {

// PeerIdentityType is the peer's certificate
type PeerIdentityType []byte

// PeerSuspector returns whether a peer with a given identity is suspected
// as being revoked, or its CA is revoked
type PeerSuspector func(identity PeerIdentityType) bool
3 changes: 0 additions & 3 deletions gossip/comm/comm.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,6 @@ type Comm interface {

// Stop stops the module
Stop()

// BlackListPKIid prohibits the module communicating with the given PKIid
BlackListPKIid(PKIid common.PKIidType)
}

// RemotePeer defines a peer's endpoint and its PKIid
Expand Down
94 changes: 33 additions & 61 deletions gossip/comm/comm_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,22 +86,21 @@ func NewCommInstanceWithServer(port int, idMapper identity.Mapper, peerIdentity
}

commInst := &commImpl{
selfCertHash: certHash,
PKIID: idMapper.GetPKIidOfCert(peerIdentity),
idMapper: idMapper,
logger: util.GetLogger(util.LoggingCommModule, fmt.Sprintf("%d", port)),
peerIdentity: peerIdentity,
opts: dialOpts,
port: port,
lsnr: ll,
gSrv: s,
msgPublisher: NewChannelDemultiplexer(),
lock: &sync.RWMutex{},
deadEndpoints: make(chan common.PKIidType, 100),
stopping: int32(0),
exitChan: make(chan struct{}, 1),
subscriptions: make([]chan proto.ReceivedMessage, 0),
blackListedPKIIDs: make([]common.PKIidType, 0),
selfCertHash: certHash,
PKIID: idMapper.GetPKIidOfCert(peerIdentity),
idMapper: idMapper,
logger: util.GetLogger(util.LoggingCommModule, fmt.Sprintf("%d", port)),
peerIdentity: peerIdentity,
opts: dialOpts,
port: port,
lsnr: ll,
gSrv: s,
msgPublisher: NewChannelDemultiplexer(),
lock: &sync.RWMutex{},
deadEndpoints: make(chan common.PKIidType, 100),
stopping: int32(0),
exitChan: make(chan struct{}, 1),
subscriptions: make([]chan proto.ReceivedMessage, 0),
}
commInst.connStore = newConnStore(commInst, commInst.logger)
commInst.idMapper.Put(idMapper.GetPKIidOfCert(peerIdentity), peerIdentity)
Expand Down Expand Up @@ -145,25 +144,24 @@ func NewCommInstance(s *grpc.Server, cert *tls.Certificate, idStore identity.Map
}

type commImpl struct {
skipHandshake bool
selfCertHash []byte
peerIdentity api.PeerIdentityType
idMapper identity.Mapper
logger *logging.Logger
opts []grpc.DialOption
connStore *connectionStore
PKIID []byte
port int
deadEndpoints chan common.PKIidType
msgPublisher *ChannelDeMultiplexer
lock *sync.RWMutex
lsnr net.Listener
gSrv *grpc.Server
exitChan chan struct{}
stopping int32
stopWG sync.WaitGroup
subscriptions []chan proto.ReceivedMessage
blackListedPKIIDs []common.PKIidType
skipHandshake bool
selfCertHash []byte
peerIdentity api.PeerIdentityType
idMapper identity.Mapper
logger *logging.Logger
opts []grpc.DialOption
connStore *connectionStore
PKIID []byte
port int
deadEndpoints chan common.PKIidType
msgPublisher *ChannelDeMultiplexer
lock *sync.RWMutex
lsnr net.Listener
gSrv *grpc.Server
exitChan chan struct{}
stopping int32
stopWG sync.WaitGroup
subscriptions []chan proto.ReceivedMessage
}

func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidType) (*connection, error) {
Expand Down Expand Up @@ -238,28 +236,6 @@ func (c *commImpl) Send(msg *proto.SignedGossipMessage, peers ...*RemotePeer) {
}
}

func (c *commImpl) BlackListPKIid(PKIID common.PKIidType) {
c.logger.Info("Entering", PKIID)
defer c.logger.Info("Exiting")
c.lock.Lock()
defer c.lock.Unlock()
c.connStore.closeByPKIid(PKIID)
c.blackListedPKIIDs = append(c.blackListedPKIIDs, PKIID)
}

func (c *commImpl) isPKIblackListed(p common.PKIidType) bool {
c.lock.RLock()
defer c.lock.RUnlock()
for _, pki := range c.blackListedPKIIDs {
if bytes.Equal(pki, p) {
c.logger.Debug(p, ":", true)
return true
}
}
c.logger.Debug(p, ":", false)
return false
}

func (c *commImpl) sendToEndpoint(peer *RemotePeer, msg *proto.SignedGossipMessage) {
if c.isStopping() {
return
Expand Down Expand Up @@ -464,10 +440,6 @@ func (c *commImpl) authenticateRemotePeer(stream stream) (*proto.ConnectionInfo,
return nil, fmt.Errorf("%s didn't send a pkiID", remoteAddress)
}

if c.isPKIblackListed(receivedMsg.PkiId) {
c.logger.Warning("Connection attempt from", remoteAddress, "but it is black-listed")
return nil, errors.New("Black-listed")
}
c.logger.Debug("Received", receivedMsg, "from", remoteAddress)
err = c.idMapper.Put(receivedMsg.PkiId, receivedMsg.Cert)
if err != nil {
Expand Down
85 changes: 32 additions & 53 deletions gossip/comm/comm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,65 +295,44 @@ func TestGetConnectionInfo(t *testing.T) {
}
}

func TestBlackListPKIid(t *testing.T) {
func TestCloseConn(t *testing.T) {
t.Parallel()
comm1, _ := newCommInstance(1611, naiveSec)
comm2, _ := newCommInstance(1612, naiveSec)
comm3, _ := newCommInstance(1613, naiveSec)
comm4, _ := newCommInstance(1614, naiveSec)
defer comm1.Stop()
defer comm2.Stop()
defer comm3.Stop()
defer comm4.Stop()
acceptChan := comm1.Accept(acceptAll)

reader := func(instance string, out chan uint64, in <-chan proto.ReceivedMessage) {
for {
msg := <-in
if msg == nil {
return
}
out <- msg.GetGossipMessage().Nonce
}
err := generateCertificates("key.pem", "cert.pem")
defer os.Remove("cert.pem")
defer os.Remove("key.pem")
cert, err := tls.LoadX509KeyPair("cert.pem", "key.pem")
tlsCfg := &tls.Config{
InsecureSkipVerify: true,
Certificates: []tls.Certificate{cert},
}
ta := credentials.NewTLS(tlsCfg)

out1 := make(chan uint64, 4)
out2 := make(chan uint64, 4)
out3 := make(chan uint64, 4)
out4 := make(chan uint64, 4)

go reader("comm1", out1, comm1.Accept(acceptAll))
go reader("comm2", out2, comm2.Accept(acceptAll))
go reader("comm3", out3, comm3.Accept(acceptAll))
go reader("comm4", out4, comm4.Accept(acceptAll))

// have comm1 BL comm3
comm1.BlackListPKIid([]byte("localhost:1613"))

// make comm3 send to 1 and 2
comm3.Send(createGossipMsg(), remotePeer(1612)) // out2++
comm3.Send(createGossipMsg(), remotePeer(1611))

waitForMessages(t, out2, 1, "comm2 should have received 1 message")

// make comm1 and comm2 send to comm3
comm1.Send(createGossipMsg(), remotePeer(1613))
comm2.Send(createGossipMsg(), remotePeer(1613)) // out3++
waitForMessages(t, out3, 1, "comm3 should have received 1 message")

// make comm1 and comm2 send to comm4 which is not blacklisted // out4 += 4
comm1.Send(createGossipMsg(), remotePeer(1614))
comm2.Send(createGossipMsg(), remotePeer(1614))
comm1.Send(createGossipMsg(), remotePeer(1614))
comm2.Send(createGossipMsg(), remotePeer(1614))

// blacklist comm3 by comm2
comm2.BlackListPKIid([]byte("localhost:1613"))

// send from comm1 and comm2 to comm3 again
comm1.Send(createGossipMsg(), remotePeer(1613)) // shouldn't have an effect
comm2.Send(createGossipMsg(), remotePeer(1613)) // shouldn't have an effect

waitForMessages(t, out4, 4, "comm1 should have received 4 messages")
conn, err := grpc.Dial("localhost:1611", grpc.WithTransportCredentials(&authCreds{tlsCreds: ta}), grpc.WithBlock(), grpc.WithTimeout(time.Second))
assert.NoError(t, err, "%v", err)
cl := proto.NewGossipClient(conn)
stream, err := cl.GossipStream(context.Background())
assert.NoError(t, err, "%v", err)
c := &commImpl{}
hash := certHashFromRawCert(tlsCfg.Certificates[0].Certificate[0])
connMsg := c.createConnectionMsg(common.PKIidType("pkiID"), hash, api.PeerIdentityType("pkiID"), func(msg []byte) ([]byte, error) {
mac := hmac.New(sha256.New, hmacKey)
mac.Write(msg)
return mac.Sum(nil), nil
})
assert.NoError(t, stream.Send(connMsg.Envelope))
stream.Send(createGossipMsg().Envelope)
select {
case <-acceptChan:
case <-time.After(time.Second):
assert.Fail(t, "Didn't receive a message within a timely period")
}
comm1.CloseConn(&RemotePeer{PKIID: common.PKIidType("pkiID")})
time.Sleep(time.Second * 10)
assert.Error(t, stream.Send(createGossipMsg().Envelope), "Should have failed because connection is closed")
}

func TestParallelSend(t *testing.T) {
Expand Down
4 changes: 4 additions & 0 deletions gossip/gossip/certstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ func (cs *certStore) createIdentityMessage() *proto.SignedGossipMessage {
return sMsg
}

func (cs *certStore) listRevokedPeers(isSuspected api.PeerSuspector) []common.PKIidType {
return cs.idMapper.ListRevokedPeers(isSuspected)
}

func (cs *certStore) stop() {
cs.pull.Stop()
}
4 changes: 4 additions & 0 deletions gossip/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ type Gossip interface {
// JoinChan makes the Gossip instance join a channel
JoinChan(joinMsg api.JoinChannelMessage, chainID common.ChainID)

// SuspectPeers makes the gossip instance validate identities of suspected peers, and close
// any connections to peers with identities that are found invalid
SuspectPeers(s api.PeerSuspector)

// Stop stops the gossip component
Stop()
}
Expand Down
8 changes: 8 additions & 0 deletions gossip/gossip/gossip_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,14 @@ func (g *gossipServiceImpl) JoinChan(joinMsg api.JoinChannelMessage, chainID com
}
}

// SuspectPeers makes the gossip instance validate identities of suspected peers, and close
// any connections to peers with identities that are found invalid
func (g *gossipServiceImpl) SuspectPeers(isSuspected api.PeerSuspector) {
for _, pkiID := range g.certStore.listRevokedPeers(isSuspected) {
g.comm.CloseConn(&comm.RemotePeer{PKIID: pkiID})
}
}

func (g *gossipServiceImpl) learnAnchorPeers(orgOfAnchorPeers api.OrgIdentityType, anchorPeers []api.AnchorPeer) {
for _, ap := range anchorPeers {
if ap.Host == "" {
Expand Down
Loading

0 comments on commit 9d12166

Please sign in to comment.