Skip to content

Commit

Permalink
[FAB-5264] Move interfaces from multichannel
Browse files Browse the repository at this point in the history
The multichannel (formerly multichain) package became a bit of a dumping
ground for all of the interface definitions used in the orderer system.
this makes the multichannel package clunky, difficult to mock, and
encourages poor separation between components.

This CR extracts the interface definitions and moves them to more
sensible locations based on the natural separation of function within
the orderer codebase.

It is also fixed the multichannel package to generally return pointers
to structs, rather than to interfaces, following the golang best
practice of accepting interfaces and returning structs.

Change-Id: Iaf004e1dadf7bf92d106bd7c90f244e0089b9924
Signed-off-by: Jason Yellick <[email protected]>
  • Loading branch information
Jason Yellick committed Jul 26, 2017
1 parent d6b54c8 commit 9b1490e
Show file tree
Hide file tree
Showing 17 changed files with 283 additions and 243 deletions.
42 changes: 42 additions & 0 deletions orderer/common/msgprocessor/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
Copyright IBM Corp. 2017 All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

// Package msgprocessor provides the implementations for processing of the assorted message
// types which may arrive in the system through Broadcast.
package msgprocessor

import (
cb "github.com/hyperledger/fabric/protos/common"
)

// Classification represents the possible message types for the system.
type Classification int

const (
// NormalMsg is the class of standard (endorser or otherwise non-config) messages.
// Messages of this type should be processed by ProcessNormalMsg.
NormalMsg Classification = iota

// ConfigUpdateMsg is the class of configuration related messages.
// Messages of this type should be processed by ProcessConfigUpdateMsg.
ConfigUpdateMsg
)

// Processor provides the methods necessary to classify and process any message which
// arrives through the Broadcast interface.
type Processor interface {
// ClassifyMsg inspects the message to determine which type of processing is necessary
ClassifyMsg(env *cb.Envelope) (Classification, error)

// ProcessNormalMsg will check the validity of a message based on the current configuration. It returns the current
// configuration sequence number and nil on success, or an error if the message is not valid
ProcessNormalMsg(env *cb.Envelope) (configSeq uint64, err error)

// ProcessConfigUpdateMsg will attempt to apply the config update to the current configuration, and if successful
// return the resulting config message and the configSeq the config was computed from. If the config update message
// is invalid, an error is returned.
ProcessConfigUpdateMsg(env *cb.Envelope) (config *cb.Envelope, configSeq uint64, err error)
}
159 changes: 42 additions & 117 deletions orderer/common/multichannel/chainsupport.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,111 +19,25 @@ package multichannel
import (
"fmt"

"github.com/hyperledger/fabric/common/config"
"github.com/hyperledger/fabric/common/crypto"
"github.com/hyperledger/fabric/common/policies"
"github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/orderer/common/blockcutter"
"github.com/hyperledger/fabric/orderer/common/broadcast"
"github.com/hyperledger/fabric/orderer/common/configtxfilter"
"github.com/hyperledger/fabric/orderer/common/deliver"
"github.com/hyperledger/fabric/orderer/common/filter"
"github.com/hyperledger/fabric/orderer/common/ledger"
"github.com/hyperledger/fabric/orderer/common/msgprocessor"
"github.com/hyperledger/fabric/orderer/common/sigfilter"
"github.com/hyperledger/fabric/orderer/common/sizefilter"
"github.com/hyperledger/fabric/orderer/consensus"
cb "github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/utils"
)

// MsgClassification represents the types of possible messages.
type MsgClassification int

const (
// NormalMsg is the class of standard (endorser or otherwise non-config) messages.
// Messages of this type should be processed by ProcessNormalMsg.
NormalMsg MsgClassification = iota

// ConfigUpdateMsg is the class of configuration related messages.
// Messages of this type should be processed by ProcessConfigUpdateMsg.
ConfigUpdateMsg
)

// Consenter defines the backing ordering mechanism
type Consenter interface {
// HandleChain should create and return a reference to a Chain for the given set of resources
// It will only be invoked for a given chain once per process. In general, errors will be treated
// as irrecoverable and cause system shutdown. See the description of Chain for more details
// The second argument to HandleChain is a pointer to the metadata stored on the `ORDERER` slot of
// the last block committed to the ledger of this Chain. For a new chain, this metadata will be
// nil, as this field is not set on the genesis block
HandleChain(support ConsenterSupport, metadata *cb.Metadata) (Chain, error)
}

// Chain defines a way to inject messages for ordering
// Note, that in order to allow flexibility in the implementation, it is the responsibility of the implementer
// to take the ordered messages, send them through the blockcutter.Receiver supplied via HandleChain to cut blocks,
// and ultimately write the ledger also supplied via HandleChain. This flow allows for two primary flows
// 1. Messages are ordered into a stream, the stream is cut into blocks, the blocks are committed (solo, kafka)
// 2. Messages are cut into blocks, the blocks are ordered, then the blocks are committed (sbft)
type Chain interface {
// Enqueue accepts a message and returns true on acceptance, or false on failure
Enqueue(env *cb.Envelope) bool

// Errored returns a channel which will close when an error has occurred
// This is especially useful for the Deliver client, who must terminate waiting
// clients when the consenter is not up to date
Errored() <-chan struct{}

// Start should allocate whatever resources are needed for staying up to date with the chain
// Typically, this involves creating a thread which reads from the ordering source, passes those
// messages to a block cutter, and writes the resulting blocks to the ledger
Start()

// Halt frees the resources which were allocated for this Chain
Halt()
}

// ConsenterSupport provides the resources available to a Consenter implementation
type ConsenterSupport interface {
crypto.LocalSigner
MsgProcessor
BlockCutter() blockcutter.Receiver
SharedConfig() config.Orderer
CreateNextBlock(messages []*cb.Envelope) *cb.Block
WriteBlock(block *cb.Block, encodedMetadataValue []byte) *cb.Block
WriteConfigBlock(block *cb.Block, encodedMetadataValue []byte) *cb.Block
ChainID() string // ChainID returns the chain ID this specific consenter instance is associated with
Height() uint64 // Returns the number of blocks on the chain this specific consenter instance is associated with
}

// MsgProcessor defines the methods necessary to interact with Broadcast messages.
type MsgProcessor interface {
// ClassifyMsg inspects the message to determine which type of processing is necessary.
ClassifyMsg(env *cb.Envelope) (MsgClassification, error)

// ProcessNormalMsg will check the validity of a message based on the current configuration. It returns the current
// configuration sequence number and nil on success, or an error if the message is not valid
ProcessNormalMsg(env *cb.Envelope) (configSeq uint64, err error)

// ProcessConfigUpdateMsg will attempt to apply the config impetus msg to the current configuration, and if successful
// return the resulting config message and the configSeq the config was computed from. If the config impetus message
// is invalid, an error is returned.
ProcessConfigUpdateMsg(env *cb.Envelope) (config *cb.Envelope, configSeq uint64, err error)
}

// ChainSupport provides a wrapper for the resources backing a chain
type ChainSupport interface {
broadcast.Support
deliver.Support
ConsenterSupport

// ProposeConfigUpdate applies a CONFIG_UPDATE to an existing config to produce a *cb.ConfigEnvelope
ProposeConfigUpdate(env *cb.Envelope) (*cb.ConfigEnvelope, error)
}

type chainSupport struct {
// ChainSupport holds the resources for a particular channel.
type ChainSupport struct {
*ledgerResources
chain Chain
chain consensus.Chain
cutter blockcutter.Receiver
filters *filter.RuleSet
signer crypto.LocalSigner
Expand All @@ -134,9 +48,9 @@ type chainSupport struct {
func newChainSupport(
filters *filter.RuleSet,
ledgerResources *ledgerResources,
consenters map[string]Consenter,
consenters map[string]consensus.Consenter,
signer crypto.LocalSigner,
) *chainSupport {
) *ChainSupport {

cutter := blockcutter.NewReceiverImpl(ledgerResources.SharedConfig(), filters)
consenterType := ledgerResources.SharedConfig().ConsensusType()
Expand All @@ -145,7 +59,7 @@ func newChainSupport(
logger.Fatalf("Error retrieving consenter of type: %s", consenterType)
}

cs := &chainSupport{
cs := &ChainSupport{
ledgerResources: ledgerResources,
cutter: cutter,
filters: filters,
Expand Down Expand Up @@ -196,39 +110,43 @@ func createStandardFilters(ledgerResources *ledgerResources) *filter.RuleSet {
}

// createSystemChainFilters creates the set of filters for the ordering system chain
func createSystemChainFilters(ml *multiLedger, ledgerResources *ledgerResources) *filter.RuleSet {
func createSystemChainFilters(r *Registrar, ledgerResources *ledgerResources) *filter.RuleSet {
return filter.NewRuleSet([]filter.Rule{
filter.EmptyRejectRule,
sizefilter.MaxBytesRule(ledgerResources.SharedConfig()),
sigfilter.New(policies.ChannelWriters, ledgerResources.PolicyManager()),
newSystemChainFilter(ledgerResources, ml),
newSystemChainFilter(ledgerResources, r),
configtxfilter.NewFilter(ledgerResources),
filter.AcceptRule,
})
}

func (cs *chainSupport) start() {
func (cs *ChainSupport) start() {
cs.chain.Start()
}

func (cs *chainSupport) NewSignatureHeader() (*cb.SignatureHeader, error) {
// NewSignatureHeader passes through to the signer NewSignatureHeader method.
func (cs *ChainSupport) NewSignatureHeader() (*cb.SignatureHeader, error) {
return cs.signer.NewSignatureHeader()
}

func (cs *chainSupport) Sign(message []byte) ([]byte, error) {
// Sign passes through to the signer Sign method.
func (cs *ChainSupport) Sign(message []byte) ([]byte, error) {
return cs.signer.Sign(message)
}

func (cs *chainSupport) Filters() *filter.RuleSet {
// Filters returns the set of filters created for this channel.
func (cs *ChainSupport) Filters() *filter.RuleSet {
return cs.filters
}

func (cs *chainSupport) BlockCutter() blockcutter.Receiver {
// BlockCutter returns the blockcutter.Receiver instance for this channel.
func (cs *ChainSupport) BlockCutter() blockcutter.Receiver {
return cs.cutter
}

// ClassifyMsg inspects the message to determine which type of processing is necessary
func (cs *chainSupport) ClassifyMsg(env *cb.Envelope) (MsgClassification, error) {
func (cs *ChainSupport) ClassifyMsg(env *cb.Envelope) (msgprocessor.Classification, error) {
payload, err := utils.UnmarshalPayload(env.Payload)
if err != nil {
return 0, fmt.Errorf("bad payload: %s", err)
Expand All @@ -245,23 +163,23 @@ func (cs *chainSupport) ClassifyMsg(env *cb.Envelope) (MsgClassification, error)

switch chdr.Type {
case int32(cb.HeaderType_CONFIG_UPDATE):
return ConfigUpdateMsg, nil
return msgprocessor.ConfigUpdateMsg, nil
case int32(cb.HeaderType_ORDERER_TRANSACTION):
return ConfigUpdateMsg, nil
return msgprocessor.ConfigUpdateMsg, nil
// XXX Eventually, these types cannot be allowed to be submitted directly
// return 0, fmt.Errorf("Transactions of type ORDERER_TRANSACTION cannot be Broadcast")
case int32(cb.HeaderType_CONFIG):
return ConfigUpdateMsg, nil
return msgprocessor.ConfigUpdateMsg, nil
// XXX Eventually, these types cannot be allowed to be submitted directly
// return 0, fmt.Errorf("Transactions of type CONFIG cannot be Broadcast")
default:
return NormalMsg, nil
return msgprocessor.NormalMsg, nil
}
}

// ProcessNormalMsg will check the validity of a message based on the current configuration. It returns the current
// configuration sequence number and nil on success, or an error if the message is not valid
func (cs *chainSupport) ProcessNormalMsg(env *cb.Envelope) (configSeq uint64, err error) {
func (cs *ChainSupport) ProcessNormalMsg(env *cb.Envelope) (configSeq uint64, err error) {
configSeq = cs.Sequence()
_, err = cs.filters.Apply(env)
return
Expand All @@ -270,27 +188,31 @@ func (cs *chainSupport) ProcessNormalMsg(env *cb.Envelope) (configSeq uint64, er
// ProcessConfigUpdateMsg will attempt to apply the config update msg to the current configuration, and if successful
// return the resulting config message and the configSeq the config was computed from. If the config update message
// is invalid, an error is returned.
func (cs *chainSupport) ProcessConfigUpdateMsg(env *cb.Envelope) (config *cb.Envelope, configSeq uint64, err error) {
func (cs *ChainSupport) ProcessConfigUpdateMsg(env *cb.Envelope) (config *cb.Envelope, configSeq uint64, err error) {
return nil, cs.Sequence(), fmt.Errorf("Config update message not yet implemented")
}

func (cs *chainSupport) Reader() ledger.Reader {
// Reader returns a reader for the underlying ledger.
func (cs *ChainSupport) Reader() ledger.Reader {
return cs.ledger
}

func (cs *chainSupport) Enqueue(env *cb.Envelope) bool {
// Enqueue takes a message and sends it to the consenter for ordering.
func (cs *ChainSupport) Enqueue(env *cb.Envelope) bool {
return cs.chain.Enqueue(env)
}

func (cs *chainSupport) Errored() <-chan struct{} {
// Errored returns whether the backing consenter has errored
func (cs *ChainSupport) Errored() <-chan struct{} {
return cs.chain.Errored()
}

func (cs *chainSupport) CreateNextBlock(messages []*cb.Envelope) *cb.Block {
// CreateNextBlock creates a new block with the next block number, and the given contents.
func (cs *ChainSupport) CreateNextBlock(messages []*cb.Envelope) *cb.Block {
return ledger.CreateNextBlock(cs.ledger, messages)
}

func (cs *chainSupport) addBlockSignature(block *cb.Block) {
func (cs *ChainSupport) addBlockSignature(block *cb.Block) {
logger.Debugf("%+v", cs)
logger.Debugf("%+v", cs.signer)

Expand All @@ -312,7 +234,7 @@ func (cs *chainSupport) addBlockSignature(block *cb.Block) {
})
}

func (cs *chainSupport) addLastConfigSignature(block *cb.Block) {
func (cs *ChainSupport) addLastConfigSignature(block *cb.Block) {
configSeq := cs.Sequence()
if configSeq > cs.lastConfigSeq {
logger.Debugf("[channel: %s] Detected lastConfigSeq transitioning from %d to %d, setting lastConfig from %d to %d", cs.ChainID(), cs.lastConfigSeq, configSeq, cs.lastConfig, block.Header.Number)
Expand All @@ -337,7 +259,8 @@ func (cs *chainSupport) addLastConfigSignature(block *cb.Block) {
})
}

func (cs *chainSupport) WriteConfigBlock(block *cb.Block, encodedMetadataValue []byte) *cb.Block {
// WriteConfigBlock should be invoked for blocks which contain a config transaction.
func (cs *ChainSupport) WriteConfigBlock(block *cb.Block, encodedMetadataValue []byte) *cb.Block {
// XXX This hacky path is temporary and will be removed by the end of this change series
// The panics here are just fine
committer, err := cs.filters.Apply(utils.UnmarshalEnvelopeOrPanic(block.Data.Data[0]))
Expand All @@ -349,7 +272,8 @@ func (cs *chainSupport) WriteConfigBlock(block *cb.Block, encodedMetadataValue [
return cs.WriteBlock(block, encodedMetadataValue)
}

func (cs *chainSupport) WriteBlock(block *cb.Block, encodedMetadataValue []byte) *cb.Block {
// WriteBlock should be invoked for blocks which contain normal transactions.
func (cs *ChainSupport) WriteBlock(block *cb.Block, encodedMetadataValue []byte) *cb.Block {
// Set the orderer-related metadata field
if encodedMetadataValue != nil {
block.Metadata.Metadata[cb.BlockMetadataIndex_ORDERER] = utils.MarshalOrPanic(&cb.Metadata{Value: encodedMetadataValue})
Expand All @@ -366,6 +290,7 @@ func (cs *chainSupport) WriteBlock(block *cb.Block, encodedMetadataValue []byte)
return block
}

func (cs *chainSupport) Height() uint64 {
// Height passes through to the underlying ledger's Height.
func (cs *ChainSupport) Height() uint64 {
return cs.Reader().Height()
}
12 changes: 6 additions & 6 deletions orderer/common/multichannel/chainsupport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (mc *mockCommitter) Commit() {
func TestCommitConfig(t *testing.T) {
ml := &mockLedgerReadWriter{}
cm := &mockconfigtx.Manager{}
cs := &chainSupport{
cs := &ChainSupport{
ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml},
filters: filter.NewRuleSet([]filter.Rule{filter.AcceptRule}),
signer: mockCrypto(),
Expand All @@ -89,7 +89,7 @@ func TestCommitConfig(t *testing.T) {
func TestWriteBlockSignatures(t *testing.T) {
ml := &mockLedgerReadWriter{}
cm := &mockconfigtx.Manager{}
cs := &chainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: mockCrypto()}
cs := &ChainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: mockCrypto()}

actual := utils.GetMetadataFromBlockOrPanic(cs.WriteBlock(cb.NewBlock(0, nil), nil), cb.BlockMetadataIndex_SIGNATURES)
assert.NotNil(t, actual, "Block should have block signature")
Expand All @@ -98,7 +98,7 @@ func TestWriteBlockSignatures(t *testing.T) {
func TestWriteBlockOrdererMetadata(t *testing.T) {
ml := &mockLedgerReadWriter{}
cm := &mockconfigtx.Manager{}
cs := &chainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: mockCrypto()}
cs := &ChainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: mockCrypto()}

value := []byte("foo")
expected := &cb.Metadata{Value: value}
Expand All @@ -110,7 +110,7 @@ func TestWriteBlockOrdererMetadata(t *testing.T) {
func TestSignature(t *testing.T) {
ml := &mockLedgerReadWriter{}
cm := &mockconfigtx.Manager{}
cs := &chainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: mockCrypto()}
cs := &ChainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: mockCrypto()}

message := []byte("Darth Vader")
signed, _ := cs.Sign(message)
Expand All @@ -124,7 +124,7 @@ func TestSignature(t *testing.T) {
func TestWriteLastConfig(t *testing.T) {
ml := &mockLedgerReadWriter{}
cm := &mockconfigtx.Manager{}
cs := &chainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: mockCrypto()}
cs := &ChainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: mockCrypto()}

expected := uint64(0)
lc := utils.GetLastConfigIndexFromBlockOrPanic(cs.WriteBlock(cb.NewBlock(0, nil), nil))
Expand All @@ -144,7 +144,7 @@ func TestWriteLastConfig(t *testing.T) {
cm.SequenceVal = 2
expected = uint64(4)

cs = &chainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: mockCrypto()}
cs = &ChainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: mockCrypto()}
lc := utils.GetLastConfigIndexFromBlockOrPanic(cs.WriteBlock(cb.NewBlock(4, nil), nil))
assert.Equal(t, expected, lc, "Second block should have config block index of %d, but got %d", expected, lc)

Expand Down
Loading

0 comments on commit 9b1490e

Please sign in to comment.