diff --git a/orderer/multichain/chainsupport.go b/orderer/multichain/chainsupport.go new file mode 100644 index 00000000000..c2216bd75f4 --- /dev/null +++ b/orderer/multichain/chainsupport.go @@ -0,0 +1,152 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multichain + +import ( + "github.com/hyperledger/fabric/orderer/common/blockcutter" + "github.com/hyperledger/fabric/orderer/common/broadcastfilter" + "github.com/hyperledger/fabric/orderer/common/configtx" + "github.com/hyperledger/fabric/orderer/common/policies" + "github.com/hyperledger/fabric/orderer/rawledger" + cb "github.com/hyperledger/fabric/protos/common" +) + +const XXXBatchSize = 10 // XXX + +// Consenter defines the backing ordering mechanism +type Consenter interface { + // HandleChain should create a return a reference to a Chain for the given set of resources + // It will only be invoked for a given chain once per process. See the description of Chain + // for more details + HandleChain(configManager configtx.Manager, cutter blockcutter.Receiver, rl rawledger.Writer, metadata []byte) Chain +} + +// Chain defines a way to inject messages for ordering +// Note, that in order to allow flexibility in the implementation, it is the responsibility of the implementer +// to take the ordered messages, send them through the blockcutter.Receiver supplied via HandleChain to cut blocks, +// and ultimately write the ledger also supplied via HandleChain. This flow allows for two primary flows +// 1. Messages are ordered into a stream, the stream is cut into blocks, the blocks are committed (solo, kafka) +// 2. Messages are cut into blocks, the blocks are ordered, then the blocks are committed (sbft) +type Chain interface { + // Enqueue accepts a message and returns true on acceptance, or false on shutdown + Enqueue(env *cb.Envelope) bool + + // Start should allocate whatever resources are needed for staying up to date with the chain + // Typically, this involves creating a thread which reads from the ordering source, passes those + // messages to a block cutter, and writes the resulting blocks to the ledger + Start() + + // Halt frees the resources which were allocated for this Chain + Halt() +} + +// ChainSupport provides a wrapper for the resources backing a chain +type ChainSupport interface { + // ConfigManager returns the current config for the chain + ConfigManager() configtx.Manager + + // PolicyManager returns the current policy manager as specified by the chain configuration + PolicyManager() policies.Manager + + // Filters returns the set of broadcast filters for this chain + Filters() *broadcastfilter.RuleSet + + // Reader returns the chain Reader for the chain + Reader() rawledger.Reader + + // Chain returns the consenter backed chain + Chain() Chain +} + +type chainSupport struct { + chain Chain + configManager configtx.Manager + policyManager policies.Manager + reader rawledger.Reader + writer rawledger.Writer + filters *broadcastfilter.RuleSet +} + +func newChainSupport(configManager configtx.Manager, policyManager policies.Manager, backing rawledger.ReadWriter, consenters map[string]Consenter) *chainSupport { + batchSize := XXXBatchSize // XXX Pull this from chain config + filters := createBroadcastRuleset(configManager) + cutter := blockcutter.NewReceiverImpl(batchSize, filters, configManager) + consenterType := "solo" // XXX retrieve this from the chain config + consenter, ok := consenters[consenterType] + if !ok { + logger.Fatalf("Error retrieving consenter of type: %s", consenterType) + } + + cs := &chainSupport{ + configManager: configManager, + policyManager: policyManager, + filters: filters, + reader: backing, + writer: newWriteInterceptor(configManager, backing), + } + + cs.chain = consenter.HandleChain(configManager, cutter, cs.writer, nil) + + return cs +} + +func createBroadcastRuleset(configManager configtx.Manager) *broadcastfilter.RuleSet { + return broadcastfilter.NewRuleSet([]broadcastfilter.Rule{ + broadcastfilter.EmptyRejectRule, + // configfilter.New(configManager), + broadcastfilter.AcceptRule, + }) +} + +func (cs *chainSupport) start() { + cs.chain.Start() +} + +func (cs *chainSupport) ConfigManager() configtx.Manager { + return cs.configManager +} + +func (cs *chainSupport) PolicyManager() policies.Manager { + return cs.policyManager +} + +func (cs *chainSupport) Filters() *broadcastfilter.RuleSet { + return cs.filters +} + +func (cs *chainSupport) Reader() rawledger.Reader { + return cs.reader +} + +func (cs *chainSupport) Chain() Chain { + return cs.chain +} + +type writeInterceptor struct { + backing rawledger.Writer +} + +// TODO ultimately set write interception policy by config +func newWriteInterceptor(configManager configtx.Manager, backing rawledger.Writer) *writeInterceptor { + return &writeInterceptor{ + backing: backing, + } +} + +func (wi *writeInterceptor) Append(blockContents []*cb.Envelope, metadata [][]byte) *cb.Block { + return wi.backing.Append(blockContents, metadata) +} diff --git a/orderer/multichain/chainsupport_mock_test.go b/orderer/multichain/chainsupport_mock_test.go new file mode 100644 index 00000000000..1be04b4f792 --- /dev/null +++ b/orderer/multichain/chainsupport_mock_test.go @@ -0,0 +1,65 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multichain + +import ( + "github.com/hyperledger/fabric/orderer/common/blockcutter" + "github.com/hyperledger/fabric/orderer/common/configtx" + "github.com/hyperledger/fabric/orderer/rawledger" + cb "github.com/hyperledger/fabric/protos/common" +) + +type mockConsenter struct { +} + +func (mc *mockConsenter) HandleChain(configManager configtx.Manager, cutter blockcutter.Receiver, rl rawledger.Writer, metadata []byte) Chain { + return &mockChain{ + queue: make(chan *cb.Envelope), + ledger: rl, + cutter: cutter, + } +} + +type mockChain struct { + queue chan *cb.Envelope + ledger rawledger.Writer + cutter blockcutter.Receiver +} + +func (mch *mockChain) Enqueue(env *cb.Envelope) bool { + mch.queue <- env + return true +} + +func (mch *mockChain) Start() { + go func() { + for { + msg, ok := <-mch.queue + if !ok { + return + } + batches, _ := mch.cutter.Ordered(msg) + for _, batch := range batches { + mch.ledger.Append(batch, nil) + } + } + }() +} + +func (mch *mockChain) Halt() { + close(mch.queue) +} diff --git a/orderer/multichain/manager.go b/orderer/multichain/manager.go new file mode 100644 index 00000000000..d23f48afe37 --- /dev/null +++ b/orderer/multichain/manager.go @@ -0,0 +1,174 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multichain + +import ( + "sync" + + "github.com/hyperledger/fabric/orderer/common/configtx" + "github.com/hyperledger/fabric/orderer/common/policies" + "github.com/hyperledger/fabric/orderer/rawledger" + cb "github.com/hyperledger/fabric/protos/common" + ab "github.com/hyperledger/fabric/protos/orderer" + + "github.com/golang/protobuf/proto" + "github.com/op/go-logging" +) + +var logger = logging.MustGetLogger("orderer/multichain") + +// XXX This crypto helper is a stand in until we have a real crypto handler +// it considers all signatures to be valid +type xxxCryptoHelper struct{} + +func (xxx xxxCryptoHelper) VerifySignature(msg []byte, ids []byte, sigs []byte) bool { + return true +} + +func init() { + logging.SetLevel(logging.DEBUG, "") +} + +// Manager coordinates the creation and access of chains +type Manager interface { + // GetChain retrieves the chain support for a chain (and whether it exists) + GetChain(chainID []byte) (ChainSupport, bool) +} + +type multiLedger struct { + chains map[string]*chainSupport + consenters map[string]Consenter + ledgerFactory rawledger.Factory + mutex sync.Mutex +} + +// getConfigTx, this should ultimately be done more intelligently, but for now, we search the whole chain for txs and pick the last config one +func getConfigTx(reader rawledger.Reader) *cb.Envelope { + var lastConfigTx *cb.Envelope + + it, _ := reader.Iterator(ab.SeekInfo_OLDEST, 0) + // Iterate over the blockchain, looking for config transactions, track the most recent one encountered + // this will be the transaction which is returned + for { + select { + case <-it.ReadyChan(): + block, status := it.Next() + if status != cb.Status_SUCCESS { + logger.Fatalf("Error parsing blockchain at startup: %v", status) + } + // ConfigTxs should always be by themselves + if len(block.Data.Data) != 1 { + continue + } + + maybeConfigTx := &cb.Envelope{} + + err := proto.Unmarshal(block.Data.Data[0], maybeConfigTx) + + if err != nil { + logger.Fatalf("Found data which was not an envelope: %s", err) + } + + payload := &cb.Payload{} + err = proto.Unmarshal(maybeConfigTx.Payload, payload) + + if payload.Header.ChainHeader.Type != int32(cb.HeaderType_CONFIGURATION_TRANSACTION) { + continue + } + + logger.Debugf("Found configuration transaction for chain %x at block %d", payload.Header.ChainHeader.ChainID, block.Header.Number) + lastConfigTx = maybeConfigTx + default: + return lastConfigTx + } + } +} + +// NewManagerImpl produces an instance of a Manager +func NewManagerImpl(ledgerFactory rawledger.Factory, consenters map[string]Consenter) Manager { + ml := &multiLedger{ + chains: make(map[string]*chainSupport), + ledgerFactory: ledgerFactory, + } + + existingChains := ledgerFactory.ChainIDs() + for _, chainID := range existingChains { + rl, err := ledgerFactory.GetOrCreate(chainID) + if err != nil { + logger.Fatalf("Ledger factory reported chainID %x but could not retrieve it: %s", chainID, err) + } + configTx := getConfigTx(rl) + if configTx == nil { + logger.Fatalf("Could not find configuration transaction for chain %x", chainID) + } + configManager, policyManager, backingLedger := ml.newResources(configTx) + chainID := configManager.ChainID() + ml.chains[string(chainID)] = newChainSupport(configManager, policyManager, backingLedger, consenters) + } + + for _, cs := range ml.chains { + cs.start() + } + + return ml +} + +// GetChain retrieves the chain support for a chain (and whether it exists) +func (ml *multiLedger) GetChain(chainID []byte) (ChainSupport, bool) { + cs, ok := ml.chains[string(chainID)] + return cs, ok +} + +func (ml *multiLedger) newResources(configTx *cb.Envelope) (configtx.Manager, policies.Manager, rawledger.ReadWriter) { + policyManager := policies.NewManagerImpl(xxxCryptoHelper{}) + configHandlerMap := make(map[cb.ConfigurationItem_ConfigurationType]configtx.Handler) + for ctype := range cb.ConfigurationItem_ConfigurationType_name { + rtype := cb.ConfigurationItem_ConfigurationType(ctype) + switch rtype { + case cb.ConfigurationItem_Policy: + configHandlerMap[rtype] = policyManager + default: + configHandlerMap[rtype] = configtx.NewBytesHandler() + } + } + + payload := &cb.Payload{} + err := proto.Unmarshal(configTx.Payload, payload) + if err != nil { + logger.Fatalf("Error unmarshaling a config transaction payload: %s", err) + } + + configEnvelope := &cb.ConfigurationEnvelope{} + err = proto.Unmarshal(payload.Data, configEnvelope) + if err != nil { + logger.Fatalf("Error unmarshaling a config transaction to config envelope: %s", err) + } + + configManager, err := configtx.NewConfigurationManager(configEnvelope, policyManager, configHandlerMap) + if err != nil { + logger.Fatalf("Error unpacking configuration transaction: %s", err) + } + + chainID := configManager.ChainID() + + ledger, err := ml.ledgerFactory.GetOrCreate(chainID) + if err != nil { + logger.Fatalf("Error getting ledger for %x", chainID) + } + + return configManager, policyManager, ledger +} diff --git a/orderer/multichain/manager_test.go b/orderer/multichain/manager_test.go new file mode 100644 index 00000000000..f7e398eed26 --- /dev/null +++ b/orderer/multichain/manager_test.go @@ -0,0 +1,157 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multichain + +import ( + "fmt" + "reflect" + "testing" + "time" + + "github.com/hyperledger/fabric/orderer/common/bootstrap/static" + "github.com/hyperledger/fabric/orderer/common/util" + "github.com/hyperledger/fabric/orderer/rawledger/ramledger" + cb "github.com/hyperledger/fabric/protos/common" + ab "github.com/hyperledger/fabric/protos/orderer" +) + +var genesisBlock *cb.Block + +func init() { + var err error + genesisBlock, err = static.New().GenesisBlock() + if err != nil { + panic(err) + } +} + +func makeNormalTx(chainID []byte, i int) *cb.Envelope { + payload := &cb.Payload{ + Header: &cb.Header{ + ChainHeader: &cb.ChainHeader{ + Type: int32(cb.HeaderType_ENDORSER_TRANSACTION), + ChainID: chainID, + }, + }, + Data: []byte(fmt.Sprintf("%d", i)), + } + return &cb.Envelope{ + Payload: util.MarshalOrPanic(payload), + } +} + +func makeConfigTx(chainID []byte, i int) *cb.Envelope { + payload := &cb.Payload{ + Header: &cb.Header{ + ChainHeader: &cb.ChainHeader{ + Type: int32(cb.HeaderType_CONFIGURATION_TRANSACTION), + ChainID: chainID, + }, + }, + Data: util.MarshalOrPanic(&cb.ConfigurationEnvelope{ + Items: []*cb.SignedConfigurationItem{&cb.SignedConfigurationItem{ + ConfigurationItem: util.MarshalOrPanic(&cb.ConfigurationItem{ + Value: []byte(fmt.Sprintf("%d", i)), + }), + }}, + }), + } + return &cb.Envelope{ + Payload: util.MarshalOrPanic(payload), + } +} + +// Tests for a normal chain which contains 3 config transactions and other normal transactions to make sure the right one returned +func TestGetConfigTx(t *testing.T) { + _, rl := ramledger.New(10, genesisBlock) + for i := 0; i < 5; i++ { + rl.Append([]*cb.Envelope{makeNormalTx(static.TestChainID, i)}, nil) + } + rl.Append([]*cb.Envelope{makeConfigTx(static.TestChainID, 5)}, nil) + ctx := makeConfigTx(static.TestChainID, 6) + rl.Append([]*cb.Envelope{ctx}, nil) + rl.Append([]*cb.Envelope{makeNormalTx(static.TestChainID, 7)}, nil) + + pctx := getConfigTx(rl) + + if !reflect.DeepEqual(ctx, pctx) { + t.Fatalf("Did not select most recent config transaction") + } +} + +// Tests a chain which contains blocks with multi-transactions mixed with config txs, and a single tx which is not a config tx, none count as config blocks so nil should return +func TestGetConfigTxFailure(t *testing.T) { + _, rl := ramledger.New(10, genesisBlock) + for i := 0; i < 10; i++ { + rl.Append([]*cb.Envelope{ + makeNormalTx(static.TestChainID, i), + makeConfigTx(static.TestChainID, i), + }, nil) + } + rl.Append([]*cb.Envelope{makeNormalTx(static.TestChainID, 11)}, nil) + pctx := getConfigTx(rl) + + if pctx != nil { + t.Fatalf("Should not have found a configuration tx") + } +} + +// This test essentially brings the entire system up and is ultimately what main.go will replicate +func TestManagerImpl(t *testing.T) { + lf, rl := ramledger.New(10, genesisBlock) + + consenters := make(map[string]Consenter) + consenters["solo"] = &mockConsenter{} + + manager := NewManagerImpl(lf, consenters) + + _, ok := manager.GetChain([]byte("Fake")) + if ok { + t.Errorf("Should not have found a chain that was not created") + } + + chainSupport, ok := manager.GetChain(static.TestChainID) + + if !ok { + t.Fatalf("Should have gotten chain which was initialized by ramledger") + } + + messages := make([]*cb.Envelope, XXXBatchSize) + for i := 0; i < XXXBatchSize; i++ { + messages[i] = makeNormalTx(static.TestChainID, i) + } + + for _, message := range messages { + chainSupport.Chain().Enqueue(message) + } + + it, _ := rl.Iterator(ab.SeekInfo_SPECIFIED, 1) + select { + case <-it.ReadyChan(): + block, status := it.Next() + if status != cb.Status_SUCCESS { + t.Fatalf("Could not retrieve block") + } + for i := 0; i < XXXBatchSize; i++ { + if !reflect.DeepEqual(util.ExtractEnvelopeOrPanic(block, i), messages[i]) { + t.Errorf("Block contents wrong at index %d", i) + } + } + case <-time.After(time.Second): + t.Fatalf("Block 1 not produced after timeout") + } +}