Skip to content

Commit

Permalink
[FAB-5764] Errors handling - 1
Browse files Browse the repository at this point in the history
All packages under gossip except gossip/gossip.
New errors created using github.com/pkg/errors, stack trace added to
returned errors using WithStack.
In case of returned error was incorporated in new one, now it is
wrapped using Wrap.
%+v added to each logger call that output error.

Change-Id: I4c7a3517da6c34bedffd68fe336e940e68ebcfe3
Signed-off-by: Gennady Laventman <[email protected]>
  • Loading branch information
gennadylaventman authored and yacovm committed Aug 26, 2017
1 parent 01adda9 commit f257f3d
Show file tree
Hide file tree
Showing 13 changed files with 110 additions and 103 deletions.
48 changes: 25 additions & 23 deletions gossip/comm/comm_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ package comm
import (
"bytes"
"crypto/tls"
"errors"
"fmt"
"net"
"reflect"
Expand All @@ -23,6 +22,7 @@ import (
"github.com/hyperledger/fabric/gossip/util"
proto "github.com/hyperledger/fabric/protos/gossip"
"github.com/op/go-logging"
"github.com/pkg/errors"
"github.com/spf13/viper"
"golang.org/x/net/context"
"google.golang.org/grpc"
Expand Down Expand Up @@ -106,7 +106,7 @@ func NewCommInstance(s *grpc.Server, cert *tls.Certificate, idStore identity.Map
dialOpts = append(dialOpts, grpc.WithTimeout(util.GetDurationOrDefault("peer.gossip.dialTimeout", defDialTimeout)))
commInst, err := NewCommInstanceWithServer(-1, idStore, peerIdentity, secureDialOpts, dialOpts...)
if err != nil {
return nil, err
return nil, errors.WithStack(err)
}

if cert != nil {
Expand Down Expand Up @@ -163,7 +163,7 @@ func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidT
dialOpts = append(dialOpts, c.opts...)
cc, err = grpc.Dial(endpoint, dialOpts...)
if err != nil {
return nil, err
return nil, errors.WithStack(err)
}

cl := proto.NewGossipClient(cc)
Expand All @@ -172,7 +172,7 @@ func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidT
defer cancel()
if _, err = cl.Ping(ctx, &proto.Empty{}); err != nil {
cc.Close()
return nil, err
return nil, errors.WithStack(err)
}

ctx, cf := context.WithCancel(context.Background())
Expand Down Expand Up @@ -204,10 +204,10 @@ func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidT
conn.handler = h
return conn, nil
}
c.logger.Warning("Authentication failed:", err)
c.logger.Warningf("Authentication failed: %+v", err)
}
cc.Close()
return nil, err
return nil, errors.WithStack(err)
}

func (c *commImpl) Send(msg *proto.SignedGossipMessage, peers ...*RemotePeer) {
Expand Down Expand Up @@ -235,13 +235,15 @@ func (c *commImpl) sendToEndpoint(peer *RemotePeer, msg *proto.SignedGossipMessa
conn, err := c.connStore.getConnection(peer)
if err == nil {
disConnectOnErr := func(err error) {
c.logger.Warning(peer, "isn't responsive:", err)
err = errors.WithStack(err)
c.logger.Warningf("%v isn't responsive: %+v", peer, err)
c.disconnect(peer.PKIID)
}
conn.send(msg, disConnectOnErr)
return
}
c.logger.Warning("Failed obtaining connection for", peer, "reason:", err)
err = errors.WithStack(err)
c.logger.Warningf("Failed obtaining connection for %v reason: %+v", peer, err)
c.disconnect(peer.PKIID)
}

Expand All @@ -263,15 +265,15 @@ func (c *commImpl) Probe(remotePeer *RemotePeer) error {

cc, err := grpc.Dial(remotePeer.Endpoint, dialOpts...)
if err != nil {
c.logger.Debug("Returning", err)
c.logger.Debugf("Returning %v", err)
return err
}
defer cc.Close()
cl := proto.NewGossipClient(cc)
ctx, cancel := context.WithTimeout(context.Background(), defConnTimeout)
defer cancel()
_, err = cl.Ping(ctx, &proto.Empty{})
c.logger.Debug("Returning", err)
c.logger.Debugf("Returning %v", err)
return err
}

Expand Down Expand Up @@ -300,11 +302,11 @@ func (c *commImpl) Handshake(remotePeer *RemotePeer) (api.PeerIdentityType, erro
}
connInfo, err := c.authenticateRemotePeer(stream)
if err != nil {
c.logger.Warning("Authentication failed:", err)
c.logger.Warningf("Authentication failed: %v", err)
return nil, err
}
if len(remotePeer.PKIID) > 0 && !bytes.Equal(connInfo.ID, remotePeer.PKIID) {
return nil, errors.New("PKI-ID of remote peer doesn't match expected PKI-ID")
return nil, fmt.Errorf("PKI-ID of remote peer doesn't match expected PKI-ID")
}
return connInfo.Identity, nil
}
Expand Down Expand Up @@ -424,7 +426,7 @@ func (c *commImpl) authenticateRemotePeer(stream stream) (*proto.ConnectionInfo,
// TLS enabled but not detected on other side
if useTLS && len(remoteCertHash) == 0 {
c.logger.Warningf("%s didn't send TLS certificate", remoteAddress)
return nil, errors.New("No TLS certificate")
return nil, fmt.Errorf("No TLS certificate")
}

cMsg, err = c.createConnectionMsg(c.PKIID, c.selfCertHash, c.peerIdentity, signer)
Expand All @@ -442,18 +444,18 @@ func (c *commImpl) authenticateRemotePeer(stream stream) (*proto.ConnectionInfo,
receivedMsg := m.GetConn()
if receivedMsg == nil {
c.logger.Warning("Expected connection message from", remoteAddress, "but got", receivedMsg)
return nil, errors.New("Wrong type")
return nil, fmt.Errorf("Wrong type")
}

if receivedMsg.PkiId == nil {
c.logger.Warning("%s didn't send a pkiID", remoteAddress)
return nil, errors.New("No PKI-ID")
return nil, fmt.Errorf("No PKI-ID")
}

c.logger.Debug("Received", receivedMsg, "from", remoteAddress)
err = c.idMapper.Put(receivedMsg.PkiId, receivedMsg.Identity)
if err != nil {
c.logger.Warning("Identity store rejected", remoteAddress, ":", err)
c.logger.Warningf("Identity store rejected %s : %v", remoteAddress, err)
return nil, err
}

Expand All @@ -468,15 +470,15 @@ func (c *commImpl) authenticateRemotePeer(stream stream) (*proto.ConnectionInfo,
// If the remote peer sent its TLS certificate, make sure it actually matches the TLS cert
// that the peer used.
if !bytes.Equal(remoteCertHash, receivedMsg.TlsCertHash) {
return nil, fmt.Errorf("Expected %v in remote hash of TLS cert, but got %v", remoteCertHash, receivedMsg.TlsCertHash)
return nil, errors.Errorf("Expected %v in remote hash of TLS cert, but got %v", remoteCertHash, receivedMsg.TlsCertHash)
}
verifier := func(peerIdentity []byte, signature, message []byte) error {
pkiID := c.idMapper.GetPKIidOfCert(api.PeerIdentityType(peerIdentity))
return c.idMapper.Verify(pkiID, signature, message)
}
err = m.Verify(receivedMsg.Identity, verifier)
if err != nil {
c.logger.Error("Failed verifying signature from", remoteAddress, ":", err)
c.logger.Error("Failed verifying signature from %s : %v", remoteAddress, err)
return nil, err
}
connInfo.Auth = &proto.AuthInfo{
Expand All @@ -496,7 +498,7 @@ func (c *commImpl) GossipStream(stream proto.Gossip_GossipStreamServer) error {
}
connInfo, err := c.authenticateRemotePeer(stream)
if err != nil {
c.logger.Error("Authentication failed:", err)
c.logger.Errorf("Authentication failed: %v", err)
return err
}
c.logger.Debug("Servicing", extractRemoteAddress(stream))
Expand Down Expand Up @@ -564,16 +566,16 @@ func readWithTimeout(stream interface{}, timeout time.Duration, address string)
incChan <- msg
}
} else {
panic(fmt.Errorf("Stream isn't a GossipStreamServer or a GossipStreamClient, but %v. Aborting", reflect.TypeOf(stream)))
panic(errors.Errorf("Stream isn't a GossipStreamServer or a GossipStreamClient, but %v. Aborting", reflect.TypeOf(stream)))
}
}()
select {
case <-time.NewTicker(timeout).C:
return nil, fmt.Errorf("Timed out waiting for connection message from %s", address)
return nil, errors.Errorf("Timed out waiting for connection message from %s", address)
case m := <-incChan:
return m, nil
case err := <-errChan:
return nil, err
return nil, errors.WithStack(err)
}
}

Expand All @@ -593,7 +595,7 @@ func (c *commImpl) createConnectionMsg(pkiID common.PKIidType, certHash []byte,
GossipMessage: m,
}
_, err := sMsg.Sign(signer)
return sMsg, err
return sMsg, errors.WithStack(err)
}

type stream interface {
Expand Down
16 changes: 8 additions & 8 deletions gossip/comm/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ SPDX-License-Identifier: Apache-2.0
package comm

import (
"errors"
"sync"
"sync/atomic"

"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/util"
proto "github.com/hyperledger/fabric/protos/gossip"
"github.com/op/go-logging"
"github.com/pkg/errors"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -103,7 +103,7 @@ func (cs *connectionStore) getConnection(peer *RemotePeer) (*connection, error)

// no one connected to us AND we failed connecting!
if err != nil {
return nil, err
return nil, errors.WithStack(err)
}

// at this point in the code, we created a connection to a remote peer
Expand Down Expand Up @@ -285,7 +285,7 @@ func (conn *connection) serviceConnection() error {
conn.stopChan <- stop
return nil
case err := <-errChan:
return err
return errors.WithStack(err)
case msg := <-msgChan:
conn.handler(msg)
}
Expand All @@ -304,7 +304,7 @@ func (conn *connection) writeToStream() {
case m := <-conn.outBuff:
err := stream.Send(m.envelope)
if err != nil {
go m.onErr(err)
go m.onErr(errors.WithStack(err))
return
}
case stop := <-conn.stopChan:
Expand Down Expand Up @@ -333,13 +333,13 @@ func (conn *connection) readFromStream(errChan chan error, msgChan chan *proto.S
}
if err != nil {
errChan <- err
conn.logger.Debug(conn.pkiID, "Got error, aborting:", err)
conn.logger.Debugf("%v Got error, aborting: %+v", conn.pkiID, errors.WithStack(err))
return
}
msg, err := envelope.ToGossipMessage()
if err != nil {
errChan <- err
conn.logger.Warning(conn.pkiID, "Got error, aborting:", err)
conn.logger.Warning("%v Got error, aborting: %+v", conn.pkiID, errors.WithStack(err))
}
msgChan <- msg
}
Expand All @@ -350,8 +350,8 @@ func (conn *connection) getStream() stream {
defer conn.Unlock()

if conn.clientStream != nil && conn.serverStream != nil {
e := "Both client and server stream are not nil, something went wrong"
conn.logger.Error(e)
e := errors.New("Both client and server stream are not nil, something went wrong")
conn.logger.Errorf("%+v", e)
}

if conn.clientStream != nil {
Expand Down
3 changes: 2 additions & 1 deletion gossip/comm/crypto.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ import (
"crypto/tls"
"crypto/x509"
"encoding/pem"
"errors"
"fmt"
"math/big"
"os"

"errors"

"github.com/hyperledger/fabric/common/util"
gutil "github.com/hyperledger/fabric/gossip/util"
"golang.org/x/net/context"
Expand Down
4 changes: 3 additions & 1 deletion gossip/comm/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sync"

proto "github.com/hyperledger/fabric/protos/gossip"
"github.com/pkg/errors"
)

// ReceivedMessageImpl is an implementation of ReceivedMessage
Expand All @@ -30,7 +31,8 @@ func (m *ReceivedMessageImpl) GetSourceEnvelope() *proto.Envelope {
func (m *ReceivedMessageImpl) Respond(msg *proto.GossipMessage) {
sMsg, err := msg.NoopSign()
if err != nil {
m.conn.logger.Error("Failed creating SignedGossipMessage:", err)
err = errors.WithStack(err)
m.conn.logger.Errorf("Failed creating SignedGossipMessage: %+v", err)
return
}
m.conn.send(sMsg, func(e error) {})
Expand Down
6 changes: 4 additions & 2 deletions gossip/common/metastate.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ package common
import (
"bytes"
"encoding/binary"

"github.com/pkg/errors"
)

// NodeMetastate information to store the information about current
Expand All @@ -32,7 +34,7 @@ func (n *NodeMetastate) Bytes() ([]byte, error) {
// with FromBytes function
err := binary.Write(buffer, binary.BigEndian, *n)
if err != nil {
return nil, err
return nil, errors.WithStack(err)
}
return buffer.Bytes(), nil
}
Expand All @@ -56,7 +58,7 @@ func FromBytes(buf []byte) (*NodeMetastate, error) {
// done using same order
err := binary.Read(reader, binary.BigEndian, &state)
if err != nil {
return nil, err
return nil, errors.WithStack(err)
}
return &state, nil
}
Loading

0 comments on commit f257f3d

Please sign in to comment.