Skip to content

Commit

Permalink
[FAB-5270] Remove filters from multichannel
Browse files Browse the repository at this point in the history
The only consumer of the filters framework is now the msgprocessor
package.

This CR removes all of the filter references from multichannel, and
moves the filter code from fabric/orderer/common to
fabric/orderer/common/msgprocessor.

Change-Id: I73a562887776086c62212167c71bd72c8894b286
Signed-off-by: Jason Yellick <[email protected]>
Jason Yellick committed Jul 27, 2017
1 parent 9018aea commit 204f0f4
Showing 20 changed files with 170 additions and 126 deletions.
4 changes: 2 additions & 2 deletions orderer/common/deliver/deliver.go
Original file line number Diff line number Diff line change
@@ -20,9 +20,9 @@ import (
"io"

"github.com/hyperledger/fabric/common/policies"
"github.com/hyperledger/fabric/orderer/common/filter"
"github.com/hyperledger/fabric/orderer/common/ledger"
"github.com/hyperledger/fabric/orderer/common/sigfilter"
"github.com/hyperledger/fabric/orderer/common/msgprocessor/filter"
"github.com/hyperledger/fabric/orderer/common/msgprocessor/sigfilter"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
"github.com/op/go-logging"
Original file line number Diff line number Diff line change
@@ -19,7 +19,7 @@ package configtxfilter
import (
"github.com/hyperledger/fabric/common/configtx"
"github.com/hyperledger/fabric/common/configtx/api"
"github.com/hyperledger/fabric/orderer/common/filter"
"github.com/hyperledger/fabric/orderer/common/msgprocessor/filter"
cb "github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/utils"
)
Original file line number Diff line number Diff line change
@@ -21,7 +21,7 @@ import (
"testing"

mockconfigtx "github.com/hyperledger/fabric/common/mocks/configtx"
"github.com/hyperledger/fabric/orderer/common/filter"
"github.com/hyperledger/fabric/orderer/common/msgprocessor/filter"
cb "github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/utils"

File renamed without changes.
File renamed without changes.
3 changes: 3 additions & 0 deletions orderer/common/msgprocessor/msgprocessor.go
Original file line number Diff line number Diff line change
@@ -11,9 +11,12 @@ package msgprocessor
import (
"errors"

"github.com/hyperledger/fabric/common/flogging"
cb "github.com/hyperledger/fabric/protos/common"
)

var logger = flogging.MustGetLogger("common/msgprocessor")

const (
// These should eventually be derived from the channel support once enabled
msgVersion = int32(0)
Original file line number Diff line number Diff line change
@@ -18,7 +18,7 @@ package sigfilter

import (
"github.com/hyperledger/fabric/common/policies"
"github.com/hyperledger/fabric/orderer/common/filter"
"github.com/hyperledger/fabric/orderer/common/msgprocessor/filter"
cb "github.com/hyperledger/fabric/protos/common"

"github.com/op/go-logging"
Original file line number Diff line number Diff line change
@@ -21,7 +21,7 @@ import (
"testing"

mockpolicies "github.com/hyperledger/fabric/common/mocks/policies"
"github.com/hyperledger/fabric/orderer/common/filter"
"github.com/hyperledger/fabric/orderer/common/msgprocessor/filter"
cb "github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/utils"

Original file line number Diff line number Diff line change
@@ -17,7 +17,7 @@ limitations under the License.
package sizefilter

import (
"github.com/hyperledger/fabric/orderer/common/filter"
"github.com/hyperledger/fabric/orderer/common/msgprocessor/filter"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
logging "github.com/op/go-logging"
Original file line number Diff line number Diff line change
@@ -21,7 +21,7 @@ import (

"github.com/golang/protobuf/proto"
mockconfig "github.com/hyperledger/fabric/common/mocks/config"
"github.com/hyperledger/fabric/orderer/common/filter"
"github.com/hyperledger/fabric/orderer/common/msgprocessor/filter"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
)
37 changes: 29 additions & 8 deletions orderer/common/msgprocessor/standardchannel.go
Original file line number Diff line number Diff line change
@@ -7,8 +7,13 @@ SPDX-License-Identifier: Apache-2.0
package msgprocessor

import (
configtxapi "github.com/hyperledger/fabric/common/configtx/api"
"github.com/hyperledger/fabric/common/crypto"
"github.com/hyperledger/fabric/orderer/common/filter"
"github.com/hyperledger/fabric/common/policies"
"github.com/hyperledger/fabric/orderer/common/msgprocessor/configtxfilter"
"github.com/hyperledger/fabric/orderer/common/msgprocessor/filter"
"github.com/hyperledger/fabric/orderer/common/msgprocessor/sigfilter"
"github.com/hyperledger/fabric/orderer/common/msgprocessor/sizefilter"
cb "github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/utils"
)
@@ -21,9 +26,6 @@ type StandardChannelSupport interface {
// ChainID returns the ChannelID.
ChainID() string

// Filters returns the set of filters for the channel.
Filters() *filter.RuleSet

// Signer returns the signer for this orderer.
Signer() crypto.LocalSigner

@@ -35,16 +37,33 @@ type StandardChannelSupport interface {
// StandardChannel implements the Processor interface for standard extant channels.
type StandardChannel struct {
support StandardChannelSupport
filters *filter.RuleSet
}

// NewStandardChannel creates a new message processor for a standard channel.
func NewStandardChannel(support StandardChannelSupport) *StandardChannel {
func NewStandardChannel(support StandardChannelSupport, filters *filter.RuleSet) *StandardChannel {
return &StandardChannel{
filters: filters,
support: support,
}
}

// ClassifyMsg inspects the message to determine which type of processing is necessary.
// CreateStandardFilters creates the set of filters for a normal (non-system) chain
func CreateStandardFilters(filterSupport configtxapi.Manager) *filter.RuleSet {
ordererConfig, ok := filterSupport.OrdererConfig()
if !ok {
logger.Panicf("Missing orderer config")
}
return filter.NewRuleSet([]filter.Rule{
filter.EmptyRejectRule,
sizefilter.MaxBytesRule(ordererConfig),
sigfilter.New(policies.ChannelWriters, filterSupport.PolicyManager()),
configtxfilter.NewFilter(filterSupport),
filter.AcceptRule,
})
}

// ClassifyMsg inspects the message to determine which type of processing is necessary
func (s *StandardChannel) ClassifyMsg(chdr *cb.ChannelHeader) (Classification, error) {
switch chdr.Type {
case int32(cb.HeaderType_CONFIG_UPDATE):
@@ -66,18 +85,20 @@ func (s *StandardChannel) ClassifyMsg(chdr *cb.ChannelHeader) (Classification, e
// configuration sequence number and nil on success, or an error if the message is not valid.
func (s *StandardChannel) ProcessNormalMsg(env *cb.Envelope) (configSeq uint64, err error) {
configSeq = s.support.Sequence()
err = s.support.Filters().Apply(env)
err = s.filters.Apply(env)
return
}

// ProcessConfigUpdateMsg will attempt to apply the config impetus msg to the current configuration, and if successful
// return the resulting config message and the configSeq the config was computed from. If the config impetus message
// is invalid, an error is returned.
func (s *StandardChannel) ProcessConfigUpdateMsg(env *cb.Envelope) (config *cb.Envelope, configSeq uint64, err error) {
logger.Debugf("Processing config update message for channel %s", s.support.ChainID())

// Call Sequence first. If seq advances between proposal and acceptance, this is okay, and will cause reprocessing
// however, if Sequence is called last, then a success could be falsely attributed to a newer configSeq.
seq := s.support.Sequence()
err = s.support.Filters().Apply(env)
err = s.filters.Apply(env)
if err != nil {
return nil, 0, err
}
22 changes: 6 additions & 16 deletions orderer/common/msgprocessor/standardchannel_test.go
Original file line number Diff line number Diff line change
@@ -11,7 +11,7 @@ import (
"testing"

"github.com/hyperledger/fabric/common/crypto"
"github.com/hyperledger/fabric/orderer/common/filter"
"github.com/hyperledger/fabric/orderer/common/msgprocessor/filter"
cb "github.com/hyperledger/fabric/protos/common"

"github.com/stretchr/testify/assert"
@@ -20,16 +20,11 @@ import (
const testChannelID = "foo"

type mockSupport struct {
filters *filter.RuleSet
ProposeConfigUpdateVal *cb.ConfigEnvelope
ProposeConfigUpdateErr error
SequenceVal uint64
}

func (ms *mockSupport) Filters() *filter.RuleSet {
return ms.filters
}

func (ms *mockSupport) ProposeConfigUpdate(env *cb.Envelope) (*cb.ConfigEnvelope, error) {
return ms.ProposeConfigUpdateVal, ms.ProposeConfigUpdateErr
}
@@ -72,9 +67,8 @@ func TestClassifyMsg(t *testing.T) {
func TestProcessNormalMsg(t *testing.T) {
ms := &mockSupport{
SequenceVal: 7,
filters: filter.NewRuleSet([]filter.Rule{filter.AcceptRule}),
}
cs, err := NewStandardChannel(ms).ProcessNormalMsg(nil)
cs, err := NewStandardChannel(ms, filter.NewRuleSet([]filter.Rule{filter.AcceptRule})).ProcessNormalMsg(nil)
assert.Equal(t, cs, ms.SequenceVal)
assert.Nil(t, err)
}
@@ -84,18 +78,15 @@ func TestConfigUpdateMsg(t *testing.T) {
ms := &mockSupport{
ProposeConfigUpdateVal: &cb.ConfigEnvelope{},
ProposeConfigUpdateErr: fmt.Errorf("An error"),
filters: filter.NewRuleSet([]filter.Rule{filter.EmptyRejectRule}),
}
config, cs, err := NewStandardChannel(ms).ProcessConfigUpdateMsg(&cb.Envelope{})
config, cs, err := NewStandardChannel(ms, filter.NewRuleSet([]filter.Rule{filter.EmptyRejectRule})).ProcessConfigUpdateMsg(&cb.Envelope{})
assert.Nil(t, config)
assert.Equal(t, uint64(0), cs)
assert.NotNil(t, err)
})
t.Run("SignedEnvelopeFailure", func(t *testing.T) {
ms := &mockSupport{
filters: filter.NewRuleSet([]filter.Rule{filter.AcceptRule}),
}
config, cs, err := NewStandardChannel(ms).ProcessConfigUpdateMsg(nil)
ms := &mockSupport{}
config, cs, err := NewStandardChannel(ms, filter.NewRuleSet([]filter.Rule{filter.AcceptRule})).ProcessConfigUpdateMsg(nil)
assert.Nil(t, config)
assert.Equal(t, uint64(0), cs)
assert.NotNil(t, err)
@@ -105,9 +96,8 @@ func TestConfigUpdateMsg(t *testing.T) {
ms := &mockSupport{
SequenceVal: 7,
ProposeConfigUpdateVal: &cb.ConfigEnvelope{},
filters: filter.NewRuleSet([]filter.Rule{filter.AcceptRule}),
}
config, cs, err := NewStandardChannel(ms).ProcessConfigUpdateMsg(nil)
config, cs, err := NewStandardChannel(ms, filter.NewRuleSet([]filter.Rule{filter.AcceptRule})).ProcessConfigUpdateMsg(nil)
assert.NotNil(t, config)
assert.Equal(t, cs, ms.SequenceVal)
assert.Nil(t, err)
37 changes: 35 additions & 2 deletions orderer/common/msgprocessor/systemchannel.go
Original file line number Diff line number Diff line change
@@ -8,6 +8,12 @@ package msgprocessor

import (
configtxapi "github.com/hyperledger/fabric/common/configtx/api"
"github.com/hyperledger/fabric/common/policies"
"github.com/hyperledger/fabric/orderer/common/msgprocessor/configtxfilter"
"github.com/hyperledger/fabric/orderer/common/msgprocessor/filter"
"github.com/hyperledger/fabric/orderer/common/msgprocessor/sigfilter"
"github.com/hyperledger/fabric/orderer/common/msgprocessor/sizefilter"
"github.com/hyperledger/fabric/orderer/common/msgprocessor/systemchannelfilter"
cb "github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/utils"
)
@@ -25,13 +31,36 @@ type SystemChannel struct {
}

// NewSystemChannel creates a new system channel message processor
func NewSystemChannel(support StandardChannelSupport, systemChannelSupport SystemChannelSupport) *SystemChannel {
func NewSystemChannel(support StandardChannelSupport, systemChannelSupport SystemChannelSupport, filters *filter.RuleSet) *SystemChannel {
logger.Debugf("Creating system channel msg processor for channel %s", support.ChainID())
return &SystemChannel{
StandardChannel: NewStandardChannel(support),
StandardChannel: NewStandardChannel(support, filters),
systemChannelSupport: systemChannelSupport,
}
}

// SystemChannelFilterSupport specifies the subset of the full channel support required to create the filter.
type SystemChannelFilterSupport interface {
SystemChannelSupport
configtxapi.Manager
}

// CreateSystemChannelFilters creates the set of filters for the ordering system chain
func CreateSystemChannelFilters(chainCreator systemchannelfilter.ChainCreator, ledgerResources configtxapi.Manager) *filter.RuleSet {
ordererConfig, ok := ledgerResources.OrdererConfig()
if !ok {
logger.Panicf("Cannot create system channel filters without orderer config")
}
return filter.NewRuleSet([]filter.Rule{
filter.EmptyRejectRule,
sizefilter.MaxBytesRule(ordererConfig),
sigfilter.New(policies.ChannelWriters, ledgerResources.PolicyManager()),
systemchannelfilter.New(ledgerResources, chainCreator),
configtxfilter.NewFilter(ledgerResources),
filter.AcceptRule,
})
}

// ProcessNormalMsg handles normal messages, rejecting them if they are not bound for the system channel ID
// with ErrChannelDoesNotExist.
func (s *SystemChannel) ProcessNormalMsg(msg *cb.Envelope) (configSeq uint64, err error) {
@@ -60,12 +89,16 @@ func (s *SystemChannel) ProcessConfigUpdateMsg(envConfigUpdate *cb.Envelope) (co
return nil, 0, err
}

logger.Debugf("Processing config update tx with system channel message processor for channel ID %s", channelID)

if channelID == s.support.ChainID() {
return s.StandardChannel.ProcessConfigUpdateMsg(envConfigUpdate)
}

// XXX we should check that the signature on the outer envelope is at least valid for some MSP in the system channel

logger.Debugf("Processing channel create tx for channel %s on system channel %s", channelID, s.support.ChainID())

// If the channel ID does not match the system channel, then this must be a channel creation transaction

ctxm, err := s.systemChannelSupport.NewChannelConfig(envConfigUpdate)
22 changes: 10 additions & 12 deletions orderer/common/msgprocessor/systemchannel_test.go
Original file line number Diff line number Diff line change
@@ -12,7 +12,7 @@ import (

configtxapi "github.com/hyperledger/fabric/common/configtx/api"
mockconfigtx "github.com/hyperledger/fabric/common/mocks/configtx"
"github.com/hyperledger/fabric/orderer/common/filter"
"github.com/hyperledger/fabric/orderer/common/msgprocessor/filter"
cb "github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/utils"

@@ -32,14 +32,14 @@ func TestProcessSystemChannelNormalMsg(t *testing.T) {
t.Run("Missing header", func(t *testing.T) {
mscs := &mockSystemChannelSupport{}
ms := &mockSupport{}
_, err := NewSystemChannel(ms, mscs).ProcessNormalMsg(&cb.Envelope{})
_, err := NewSystemChannel(ms, mscs, nil).ProcessNormalMsg(&cb.Envelope{})
assert.NotNil(t, err)
assert.Regexp(t, "no header was set", err.Error())
})
t.Run("Mismatched channel ID", func(t *testing.T) {
mscs := &mockSystemChannelSupport{}
ms := &mockSupport{}
_, err := NewSystemChannel(ms, mscs).ProcessNormalMsg(&cb.Envelope{
_, err := NewSystemChannel(ms, mscs, nil).ProcessNormalMsg(&cb.Envelope{
Payload: utils.MarshalOrPanic(&cb.Payload{
Header: &cb.Header{
ChannelHeader: utils.MarshalOrPanic(&cb.ChannelHeader{
@@ -54,9 +54,8 @@ func TestProcessSystemChannelNormalMsg(t *testing.T) {
mscs := &mockSystemChannelSupport{}
ms := &mockSupport{
SequenceVal: 7,
filters: filter.NewRuleSet([]filter.Rule{filter.AcceptRule}),
}
cs, err := NewSystemChannel(ms, mscs).ProcessNormalMsg(&cb.Envelope{
cs, err := NewSystemChannel(ms, mscs, filter.NewRuleSet([]filter.Rule{filter.AcceptRule})).ProcessNormalMsg(&cb.Envelope{
Payload: utils.MarshalOrPanic(&cb.Payload{
Header: &cb.Header{
ChannelHeader: utils.MarshalOrPanic(&cb.ChannelHeader{
@@ -75,7 +74,7 @@ func TestSystemChannelConfigUpdateMsg(t *testing.T) {
t.Run("Missing header", func(t *testing.T) {
mscs := &mockSystemChannelSupport{}
ms := &mockSupport{}
_, _, err := NewSystemChannel(ms, mscs).ProcessConfigUpdateMsg(&cb.Envelope{})
_, _, err := NewSystemChannel(ms, mscs, nil).ProcessConfigUpdateMsg(&cb.Envelope{})
assert.NotNil(t, err)
assert.Regexp(t, "no header was set", err.Error())
})
@@ -84,9 +83,8 @@ func TestSystemChannelConfigUpdateMsg(t *testing.T) {
ms := &mockSupport{
SequenceVal: 7,
ProposeConfigUpdateVal: &cb.ConfigEnvelope{},
filters: filter.NewRuleSet([]filter.Rule{filter.AcceptRule}),
}
config, cs, err := NewSystemChannel(ms, mscs).ProcessConfigUpdateMsg(&cb.Envelope{
config, cs, err := NewSystemChannel(ms, mscs, filter.NewRuleSet([]filter.Rule{filter.AcceptRule})).ProcessConfigUpdateMsg(&cb.Envelope{
Payload: utils.MarshalOrPanic(&cb.Payload{
Header: &cb.Header{
ChannelHeader: utils.MarshalOrPanic(&cb.ChannelHeader{
@@ -106,7 +104,7 @@ func TestSystemChannelConfigUpdateMsg(t *testing.T) {
ms := &mockSupport{
ProposeConfigUpdateVal: &cb.ConfigEnvelope{},
}
_, _, err := NewSystemChannel(ms, mscs).ProcessConfigUpdateMsg(&cb.Envelope{
_, _, err := NewSystemChannel(ms, mscs, nil).ProcessConfigUpdateMsg(&cb.Envelope{
Payload: utils.MarshalOrPanic(&cb.Payload{
Header: &cb.Header{
ChannelHeader: utils.MarshalOrPanic(&cb.ChannelHeader{
@@ -126,7 +124,7 @@ func TestSystemChannelConfigUpdateMsg(t *testing.T) {
ms := &mockSupport{
ProposeConfigUpdateVal: &cb.ConfigEnvelope{},
}
_, _, err := NewSystemChannel(ms, mscs).ProcessConfigUpdateMsg(&cb.Envelope{
_, _, err := NewSystemChannel(ms, mscs, nil).ProcessConfigUpdateMsg(&cb.Envelope{
Payload: utils.MarshalOrPanic(&cb.Payload{
Header: &cb.Header{
ChannelHeader: utils.MarshalOrPanic(&cb.ChannelHeader{
@@ -144,7 +142,7 @@ func TestSystemChannelConfigUpdateMsg(t *testing.T) {
ms := &mockSupport{
ProposeConfigUpdateVal: &cb.ConfigEnvelope{},
}
_, _, err := NewSystemChannel(ms, mscs).ProcessConfigUpdateMsg(&cb.Envelope{
_, _, err := NewSystemChannel(ms, mscs, nil).ProcessConfigUpdateMsg(&cb.Envelope{
Payload: utils.MarshalOrPanic(&cb.Payload{
Header: &cb.Header{
ChannelHeader: utils.MarshalOrPanic(&cb.ChannelHeader{
@@ -165,7 +163,7 @@ func TestSystemChannelConfigUpdateMsg(t *testing.T) {
SequenceVal: 7,
ProposeConfigUpdateVal: &cb.ConfigEnvelope{},
}
config, cs, err := NewSystemChannel(ms, mscs).ProcessConfigUpdateMsg(&cb.Envelope{
config, cs, err := NewSystemChannel(ms, mscs, nil).ProcessConfigUpdateMsg(&cb.Envelope{
Payload: utils.MarshalOrPanic(&cb.Payload{
Header: &cb.Header{
ChannelHeader: utils.MarshalOrPanic(&cb.ChannelHeader{
Loading

0 comments on commit 204f0f4

Please sign in to comment.