diff --git a/orderer/common/cluster/deliver.go b/orderer/common/cluster/deliver.go index e66ddb3abd5..10a9c74b48e 100644 --- a/orderer/common/cluster/deliver.go +++ b/orderer/common/cluster/deliver.go @@ -39,7 +39,7 @@ type BlockPuller struct { VerifyBlockSequence BlockSequenceVerifier Endpoints []string // Internal state - stream *impatientStream + stream *ImpatientStream blockBuff []*common.Block latestSeq uint64 endpoint string @@ -182,8 +182,8 @@ func (p *BlockPuller) pullBlocks(seq uint64, reConnected bool) error { return nil } -func (p *BlockPuller) obtainStream(reConnected bool, env *common.Envelope, seq uint64) (*impatientStream, error) { - var stream *impatientStream +func (p *BlockPuller) obtainStream(reConnected bool, env *common.Envelope, seq uint64) (*ImpatientStream, error) { + var stream *ImpatientStream var err error if reConnected { p.Logger.Infof("Sending request for block %d to %s", seq, p.endpoint) @@ -347,10 +347,10 @@ func (p *BlockPuller) fetchLastBlockSeq(minRequestedSequence uint64, endpoint st return block.Header.Number, nil } -// requestBlocks starts requesting blocks from the given endpoint, using the given streamCreator by sending +// requestBlocks starts requesting blocks from the given endpoint, using the given ImpatientStreamCreator by sending // the given envelope. // It returns a stream that is used to pull blocks, or error if something goes wrong. -func (p *BlockPuller) requestBlocks(endpoint string, newStream streamCreator, env *common.Envelope) (*impatientStream, error) { +func (p *BlockPuller) requestBlocks(endpoint string, newStream ImpatientStreamCreator, env *common.Envelope) (*ImpatientStream, error) { stream, err := newStream() if err != nil { p.Logger.Warningf("Failed establishing deliver stream with %s", endpoint) @@ -455,22 +455,23 @@ func (eib endpointInfoBucket) byEndpoints() map[string]*endpointInfo { return infoByEndpoints } -type streamCreator func() (*impatientStream, error) +// ImpatientStreamCreator creates an ImpatientStream +type ImpatientStreamCreator func() (*ImpatientStream, error) -// impatientStream aborts the stream if it waits for too long for a message. -type impatientStream struct { +// ImpatientStream aborts the stream if it waits for too long for a message. +type ImpatientStream struct { waitTimeout time.Duration orderer.AtomicBroadcast_DeliverClient cancelFunc func() } -func (stream *impatientStream) abort() { +func (stream *ImpatientStream) abort() { stream.cancelFunc() } // Recv blocks until a response is received from the stream or the // timeout expires. -func (stream *impatientStream) Recv() (*orderer.DeliverResponse, error) { +func (stream *ImpatientStream) Recv() (*orderer.DeliverResponse, error) { // Initialize a timeout to cancel the stream when it expires timeout := time.NewTimer(stream.waitTimeout) defer timeout.Stop() @@ -498,9 +499,9 @@ func (stream *impatientStream) Recv() (*orderer.DeliverResponse, error) { } } -// NewImpatientStream returns a streamCreator that creates impatientStreams. -func NewImpatientStream(conn *grpc.ClientConn, waitTimeout time.Duration) streamCreator { - return func() (*impatientStream, error) { +// NewImpatientStream returns a ImpatientStreamCreator that creates impatientStreams. +func NewImpatientStream(conn *grpc.ClientConn, waitTimeout time.Duration) ImpatientStreamCreator { + return func() (*ImpatientStream, error) { abc := orderer.NewAtomicBroadcastClient(conn) ctx, cancel := context.WithCancel(context.Background()) @@ -511,7 +512,7 @@ func NewImpatientStream(conn *grpc.ClientConn, waitTimeout time.Duration) stream } once := &sync.Once{} - return &impatientStream{ + return &ImpatientStream{ waitTimeout: waitTimeout, // The stream might be canceled while Close() is being called, but also // while a timeout expires, so ensure it's only called once. diff --git a/orderer/common/cluster/replication.go b/orderer/common/cluster/replication.go index b1693b3fe29..a55cc95697e 100644 --- a/orderer/common/cluster/replication.go +++ b/orderer/common/cluster/replication.go @@ -209,7 +209,7 @@ func (r *Replicator) channelsToPull(channels []string) []string { puller.Close() // Restore the previous buffer size puller.MaxTotalBufferBytes = bufferSize - if err == NotInChannelError { + if err == ErrNotInChannel { r.Logger.Info("I do not belong to channel", channel, ", skipping chain retrieval") continue } @@ -305,8 +305,8 @@ type ChainInspector struct { LastConfigBlock *common.Block } -// NotInChannelError denotes that an ordering node is not in the channel -var NotInChannelError = errors.New("not in the channel") +// ErrNotInChannel denotes that an ordering node is not in the channel +var ErrNotInChannel = errors.New("not in the channel") // selfMembershipPredicate determines whether the caller is found in the given config block type selfMembershipPredicate func(configBlock *common.Block) error diff --git a/orderer/common/cluster/replication_test.go b/orderer/common/cluster/replication_test.go index b44968178f3..b6e15de430b 100644 --- a/orderer/common/cluster/replication_test.go +++ b/orderer/common/cluster/replication_test.go @@ -303,7 +303,7 @@ func TestReplicateChainsGreenPath(t *testing.T) { // For channel A amIPartOfChannelMock.On("func2").Return(nil).Once() // For channel B - amIPartOfChannelMock.On("func2").Return(cluster.NotInChannelError).Once() + amIPartOfChannelMock.On("func2").Return(cluster.ErrNotInChannel).Once() // 22 is for the system channel, and 31 is for channel A blocksCommittedToLedger := make(chan *common.Block, 22+31) @@ -512,7 +512,7 @@ func TestParticipant(t *testing.T) { }, latestConfigBlockSeq: 42, latestConfigBlock: &common.Block{Header: &common.BlockHeader{Number: 42}}, - predicateReturns: cluster.NotInChannelError, + predicateReturns: cluster.ErrNotInChannel, }, } { t.Run(testCase.name, func(t *testing.T) { diff --git a/orderer/common/cluster/util.go b/orderer/common/cluster/util.go index f418b6a7c3d..fc4edaf9dba 100644 --- a/orderer/common/cluster/util.go +++ b/orderer/common/cluster/util.go @@ -115,23 +115,23 @@ func (dialer *PredicateDialer) ClientConfig() (comm.ClientConfig, error) { if val == nil { return comm.ClientConfig{}, errors.New("client config not initialized") } - if cc, isClientConfig := val.(comm.ClientConfig); !isClientConfig { + cc, isClientConfig := val.(comm.ClientConfig) + if !isClientConfig { err := errors.Errorf("value stored is %v, not comm.ClientConfig", reflect.TypeOf(val)) return comm.ClientConfig{}, err - } else { - if cc.SecOpts == nil { - return comm.ClientConfig{}, errors.New("SecOpts is nil") - } - // Copy by value the secure options - secOpts := *cc.SecOpts - return comm.ClientConfig{ - AsyncConnect: cc.AsyncConnect, - Timeout: cc.Timeout, - SecOpts: &secOpts, - KaOpts: cc.KaOpts, - }, nil } + if cc.SecOpts == nil { + return comm.ClientConfig{}, errors.New("SecOpts is nil") + } + // Copy by value the secure options + secOpts := *cc.SecOpts + return comm.ClientConfig{ + AsyncConnect: cc.AsyncConnect, + Timeout: cc.Timeout, + SecOpts: &secOpts, + KaOpts: cc.KaOpts, + }, nil } // SetConfig sets the configuration of the PredicateDialer @@ -182,6 +182,7 @@ type StandardDialer struct { Dialer *PredicateDialer } +// Dial dials to the given address func (bdp *StandardDialer) Dial(address string) (*grpc.ClientConn, error) { return bdp.Dialer.Dial(address, nil) } @@ -229,7 +230,7 @@ func VerifyBlocks(blockBuff []*common.Block, signatureVerifier BlockVerifier) er // during iteration over the block batch. for _, block := range blockBuff { configFromBlock, err := ConfigFromBlock(block) - if err == notAConfig { + if err == errNotAConfig { continue } if err != nil { @@ -247,7 +248,7 @@ func VerifyBlocks(blockBuff []*common.Block, signatureVerifier BlockVerifier) er return VerifyBlockSignature(lastBlock, signatureVerifier, config) } -var notAConfig = errors.New("not a config block") +var errNotAConfig = errors.New("not a config block") // ConfigFromBlock returns a ConfigEnvelope if exists, or a *NotAConfigBlock error. // It may also return some other error in case parsing failed. @@ -272,7 +273,7 @@ func ConfigFromBlock(block *common.Block) (*common.ConfigEnvelope, error) { return nil, errors.WithStack(err) } if common.HeaderType(chdr.Type) != common.HeaderType_CONFIG { - return nil, notAConfig + return nil, errNotAConfig } configEnvelope, err := configtx.UnmarshalConfigEnvelope(payload.Data) if err != nil { diff --git a/orderer/consensus/etcdraft/util.go b/orderer/consensus/etcdraft/util.go index 23c1c13a48f..120a0b81931 100644 --- a/orderer/consensus/etcdraft/util.go +++ b/orderer/consensus/etcdraft/util.go @@ -371,5 +371,5 @@ func (conCert ConsenterCertificate) IsConsenterOfChannel(configBlock *common.Blo return nil } } - return cluster.NotInChannelError + return cluster.ErrNotInChannel } diff --git a/orderer/consensus/etcdraft/util_test.go b/orderer/consensus/etcdraft/util_test.go index 98d87e9fad3..d700b911fd8 100644 --- a/orderer/consensus/etcdraft/util_test.go +++ b/orderer/consensus/etcdraft/util_test.go @@ -79,7 +79,7 @@ func TestIsConsenterOfChannel(t *testing.T) { name: "valid config block with cert mismatch", configBlock: validBlock(), certificate: certInsideConfigBlock[2:], - expectedError: cluster.NotInChannelError.Error(), + expectedError: cluster.ErrNotInChannel.Error(), }, { name: "valid config block with matching cert",