From ed9517ea1c8dc495cc6c1cd9e0593ee5192468b9 Mon Sep 17 00:00:00 2001 From: Jason Yellick Date: Fri, 7 Jul 2017 12:42:06 -0400 Subject: [PATCH] [FAB-5265] Rm blockcutter message validation The primary goal of the series in FAB-5258 is to prevent all OSNs from having to validate all messages for all channels. Since all messages pass through the block cutter, the block cutter cannot be involved in message validation as it currently is. This CR pulls the message validation out of the blockcutter and pushes it into the msgprocessor definitions. Ultimately, the msgprocessor interfaces will only be called if necessary, eliminating the performance bottleneck. Change-Id: I3c0d41e47873aa6e764c70fd176722306f00655c Signed-off-by: Jason Yellick --- orderer/common/blockcutter/blockcutter.go | 16 +-- .../common/blockcutter/blockcutter_test.go | 121 +----------------- orderer/common/multichannel/chainsupport.go | 2 +- orderer/consensus/kafka/chain.go | 18 ++- orderer/consensus/solo/consensus.go | 17 ++- 5 files changed, 30 insertions(+), 144 deletions(-) diff --git a/orderer/common/blockcutter/blockcutter.go b/orderer/common/blockcutter/blockcutter.go index 3105aa207d0..eb378373187 100644 --- a/orderer/common/blockcutter/blockcutter.go +++ b/orderer/common/blockcutter/blockcutter.go @@ -51,17 +51,15 @@ type Receiver interface { type receiver struct { sharedConfigManager config.Orderer - filters *filter.RuleSet pendingBatch []*cb.Envelope pendingBatchSizeBytes uint32 pendingCommitters []filter.Committer } // NewReceiverImpl creates a Receiver implementation based on the given configtxorderer manager and filters -func NewReceiverImpl(sharedConfigManager config.Orderer, filters *filter.RuleSet) Receiver { +func NewReceiverImpl(sharedConfigManager config.Orderer) Receiver { return &receiver{ sharedConfigManager: sharedConfigManager, - filters: filters, } } @@ -83,17 +81,7 @@ func NewReceiverImpl(sharedConfigManager config.Orderer, filters *filter.RuleSet // - The current message will cause the pending batch size in bytes to exceed BatchSize.PreferredMaxBytes. // - After adding the current message to the pending batch, the message count has reached BatchSize.MaxMessageCount. func (r *receiver) Ordered(msg *cb.Envelope) ([][]*cb.Envelope, bool) { - // The messages must be filtered a second time in case configuration has changed since the message was received - committer, err := r.filters.Apply(msg) - if err != nil { - logger.Debugf("Rejecting message: %s", err) - return nil, false - } - - if committer.Isolated() { - logger.Panicf("The use of isolated committers has been deprecated and should no longer appear in this path") - } - + // The messages are not filtered a second time, this is pushed onto the Consenter messageSizeBytes := messageSizeBytes(msg) if messageSizeBytes > r.sharedConfigManager.BatchSize().PreferredMaxBytes { diff --git a/orderer/common/blockcutter/blockcutter_test.go b/orderer/common/blockcutter/blockcutter_test.go index 53a8a1eac8f..0c17b6b3d35 100644 --- a/orderer/common/blockcutter/blockcutter_test.go +++ b/orderer/common/blockcutter/blockcutter_test.go @@ -17,11 +17,9 @@ limitations under the License. package blockcutter import ( - "bytes" "testing" mockconfig "github.com/hyperledger/fabric/common/mocks/config" - "github.com/hyperledger/fabric/orderer/common/filter" cb "github.com/hyperledger/fabric/protos/common" ab "github.com/hyperledger/fabric/protos/orderer" @@ -33,59 +31,14 @@ func init() { logging.SetLevel(logging.DEBUG, "") } -type isolatedCommitter struct{} - -func (ic isolatedCommitter) Isolated() bool { return true } - -func (ic isolatedCommitter) Commit() {} - -type mockIsolatedFilter struct{} - -func (mif *mockIsolatedFilter) Apply(msg *cb.Envelope) (filter.Action, filter.Committer) { - if bytes.Equal(msg.Payload, isolatedTx.Payload) { - return filter.Accept, isolatedCommitter{} - } - return filter.Forward, nil -} - -type mockRejectFilter struct{} - -func (mrf mockRejectFilter) Apply(message *cb.Envelope) (filter.Action, filter.Committer) { - if bytes.Equal(message.Payload, badTx.Payload) { - return filter.Reject, nil - } - return filter.Forward, nil -} - -type mockAcceptFilter struct{} - -func (mrf mockAcceptFilter) Apply(message *cb.Envelope) (filter.Action, filter.Committer) { - if bytes.Equal(message.Payload, goodTx.Payload) { - return filter.Accept, filter.NoopCommitter - } - return filter.Forward, nil -} - -func getFilters() *filter.RuleSet { - return filter.NewRuleSet([]filter.Rule{ - &mockIsolatedFilter{}, - &mockRejectFilter{}, - &mockAcceptFilter{}, - }) -} - -var badTx = &cb.Envelope{Payload: []byte("BAD")} var goodTx = &cb.Envelope{Payload: []byte("GOOD")} var goodTxLarge = &cb.Envelope{Payload: []byte("GOOD"), Signature: make([]byte, 1000)} -var isolatedTx = &cb.Envelope{Payload: []byte("ISOLATED")} -var unmatchedTx = &cb.Envelope{Payload: []byte("UNMATCHED")} func TestNormalBatch(t *testing.T) { - filters := getFilters() maxMessageCount := uint32(2) absoluteMaxBytes := uint32(1000) preferredMaxBytes := uint32(100) - r := NewReceiverImpl(&mockconfig.Orderer{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes, PreferredMaxBytes: preferredMaxBytes}}, filters) + r := NewReceiverImpl(&mockconfig.Orderer{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes, PreferredMaxBytes: preferredMaxBytes}}) batches, ok := r.Ordered(goodTx) assert.Nil(t, batches, "Should not have created batch") @@ -96,73 +49,7 @@ func TestNormalBatch(t *testing.T) { assert.True(t, ok, "Should have enqueued second message into batch") } -func TestBadMessageInBatch(t *testing.T) { - filters := getFilters() - maxMessageCount := uint32(2) - absoluteMaxBytes := uint32(1000) - preferredMaxBytes := uint32(100) - r := NewReceiverImpl(&mockconfig.Orderer{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes, PreferredMaxBytes: preferredMaxBytes}}, filters) - - batches, ok := r.Ordered(badTx) - assert.Nil(t, batches, "Should not have created batch") - assert.False(t, ok, "Should not have enqueued bad message into batch") - - batches, ok = r.Ordered(goodTx) - assert.Nil(t, batches, "Should not have created batch") - assert.True(t, ok, "Should have enqueued good message into batch") - - batches, ok = r.Ordered(badTx) - assert.Nil(t, batches, "Should not have created batch") - assert.False(t, ok, "Should not have enqueued second bad message into batch") -} - -func TestUnmatchedMessageInBatch(t *testing.T) { - filters := getFilters() - maxMessageCount := uint32(2) - absoluteMaxBytes := uint32(1000) - preferredMaxBytes := uint32(100) - r := NewReceiverImpl(&mockconfig.Orderer{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes, PreferredMaxBytes: preferredMaxBytes}}, filters) - - batches, ok := r.Ordered(unmatchedTx) - assert.Nil(t, batches, "Should not have created batch") - assert.False(t, ok, "Should not have enqueued unmatched message into batch") - - batches, ok = r.Ordered(goodTx) - assert.Nil(t, batches, "Should not have created batch") - assert.True(t, ok, "Should have enqueued good message into batch") - - batches, ok = r.Ordered(unmatchedTx) - assert.Nil(t, batches, "Should not have created batch from unmatched message") - assert.False(t, ok, "Should not have enqueued second bad message into batch") -} - -func TestIsolatedEmptyBatch(t *testing.T) { - filters := getFilters() - maxMessageCount := uint32(2) - absoluteMaxBytes := uint32(1000) - preferredMaxBytes := uint32(100) - r := NewReceiverImpl(&mockconfig.Orderer{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes, PreferredMaxBytes: preferredMaxBytes}}, filters) - - assert.Panics(t, func() { r.Ordered(isolatedTx) }, "Should not have handled an isolated by committer message") -} - -func TestIsolatedPartialBatch(t *testing.T) { - filters := getFilters() - maxMessageCount := uint32(2) - absoluteMaxBytes := uint32(1000) - preferredMaxBytes := uint32(100) - r := NewReceiverImpl(&mockconfig.Orderer{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes, PreferredMaxBytes: preferredMaxBytes}}, filters) - - batches, ok := r.Ordered(goodTx) - assert.Nil(t, batches, "Should not have created batch") - assert.True(t, ok, "Should have enqueued good message into batch") - - assert.Panics(t, func() { r.Ordered(isolatedTx) }, "Should not have handled an isolated by committer message") -} - func TestBatchSizePreferredMaxBytesOverflow(t *testing.T) { - filters := getFilters() - goodTxBytes := messageSizeBytes(goodTx) // set preferred max bytes such that 10 goodTx will not fit @@ -171,7 +58,7 @@ func TestBatchSizePreferredMaxBytesOverflow(t *testing.T) { // set message count > 9 maxMessageCount := uint32(20) - r := NewReceiverImpl(&mockconfig.Orderer{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: preferredMaxBytes * 2, PreferredMaxBytes: preferredMaxBytes}}, filters) + r := NewReceiverImpl(&mockconfig.Orderer{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: preferredMaxBytes * 2, PreferredMaxBytes: preferredMaxBytes}}) // enqueue 9 messages for i := 0; i < 9; i++ { @@ -194,8 +81,6 @@ func TestBatchSizePreferredMaxBytesOverflow(t *testing.T) { } func TestBatchSizePreferredMaxBytesOverflowNoPending(t *testing.T) { - filters := getFilters() - goodTxLargeBytes := messageSizeBytes(goodTxLarge) // set preferred max bytes such that 1 goodTxLarge will not fit @@ -204,7 +89,7 @@ func TestBatchSizePreferredMaxBytesOverflowNoPending(t *testing.T) { // set message count > 1 maxMessageCount := uint32(20) - r := NewReceiverImpl(&mockconfig.Orderer{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: preferredMaxBytes * 3, PreferredMaxBytes: preferredMaxBytes}}, filters) + r := NewReceiverImpl(&mockconfig.Orderer{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: preferredMaxBytes * 3, PreferredMaxBytes: preferredMaxBytes}}) // submit large message batches, ok := r.Ordered(goodTxLarge) diff --git a/orderer/common/multichannel/chainsupport.go b/orderer/common/multichannel/chainsupport.go index 5c134d9d3be..2a4e85d8edc 100644 --- a/orderer/common/multichannel/chainsupport.go +++ b/orderer/common/multichannel/chainsupport.go @@ -52,7 +52,7 @@ func newChainSupport( signer crypto.LocalSigner, ) *ChainSupport { - cutter := blockcutter.NewReceiverImpl(ledgerResources.SharedConfig(), filters) + cutter := blockcutter.NewReceiverImpl(ledgerResources.SharedConfig()) consenterType := ledgerResources.SharedConfig().ConsensusType() consenter, ok := consenters[consenterType] if !ok { diff --git a/orderer/consensus/kafka/chain.go b/orderer/consensus/kafka/chain.go index 0bae1fda615..80ca854f422 100644 --- a/orderer/consensus/kafka/chain.go +++ b/orderer/consensus/kafka/chain.go @@ -379,21 +379,27 @@ func processRegular(regularMessage *ab.KafkaMessageRegular, support consensus.Co } switch class { case msgprocessor.ConfigUpdateMsg: - batch := support.BlockCutter().Cut() - if batch != nil { - block := support.CreateNextBlock(batch) - support.WriteBlock(block, nil) - } - _, err := support.ProcessNormalMsg(env) if err != nil { logger.Warningf("[channel: %s] Discarding bad config message: %s", support.ChainID(), err) break } + + batch := support.BlockCutter().Cut() + if batch != nil { + block := support.CreateNextBlock(batch) + support.WriteBlock(block, nil) + } block := support.CreateNextBlock([]*cb.Envelope{env}) support.WriteConfigBlock(block, nil) *timer = nil case msgprocessor.NormalMsg: + _, err := support.ProcessNormalMsg(env) + if err != nil { + logger.Warningf("Discarding bad normal message: %s", err) + break + } + batches, ok := support.BlockCutter().Ordered(env) logger.Debugf("[channel: %s] Ordering results: items in batch = %d, ok = %v", support.ChainID(), len(batches), ok) if ok && len(batches) == 0 && *timer == nil { diff --git a/orderer/consensus/solo/consensus.go b/orderer/consensus/solo/consensus.go index 3adc70d50d7..a9bae3b2c0d 100644 --- a/orderer/consensus/solo/consensus.go +++ b/orderer/consensus/solo/consensus.go @@ -95,21 +95,28 @@ func (ch *chain) main() { } switch class { case msgprocessor.ConfigUpdateMsg: + _, err := ch.support.ProcessNormalMsg(msg) + if err != nil { + logger.Warningf("Discarding bad config message: %s", err) + continue + } + batch := ch.support.BlockCutter().Cut() if batch != nil { block := ch.support.CreateNextBlock(batch) ch.support.WriteBlock(block, nil) } - _, err := ch.support.ProcessNormalMsg(msg) - if err != nil { - logger.Warningf("Discarding bad config message: %s", err) - continue - } block := ch.support.CreateNextBlock([]*cb.Envelope{msg}) ch.support.WriteConfigBlock(block, nil) timer = nil case msgprocessor.NormalMsg: + _, err := ch.support.ProcessNormalMsg(msg) + if err != nil { + logger.Warningf("Discarding bad normal message: %s", err) + continue + } + batches, ok := ch.support.BlockCutter().Ordered(msg) if ok && len(batches) == 0 && timer == nil { timer = time.After(ch.support.SharedConfig().BatchTimeout())