Skip to content

Commit

Permalink
[FAB-1774] Use metadata field for orderer info
Browse files Browse the repository at this point in the history
https://jira.hyperledger.org/browse/FAB-1774

A consensus implementation may need to read the ledger and extract
metadata from it during operation (most likely when booting up).

As an example, the Kafka-based orderer, should read the offset of the
last envelope it placed into a block and wrote to the local ledger, and
should use that offset to resume consumption of chain's partition.

This changeset follows up on the work of FAB-1773 [1].

Specifically, it modifies:

1. The common components `Consenter` interface so that the
newly-introduced metadata field is passed on to the consensus
implementations via the `HandleChain` method.
2. The `WriteBlock` method of the `ConsenterSupport` interface so that
this metadata can be persisted to blocks.

It also adds relevant unit tests.

This is a precursor to FAB-1623 [2], which will add restart support to
the Kafka-based orderer.

Review starting point: fabric/orderer/multichain/chainsupport.go

[1] https://jira.hyperledger.org/browse/FAB-1773
[2] https://jira.hyperledger.org/browse/FAB-1623

Change-Id: I3d1c932eb30537f6a1aa8056b9d38550b17dee6d
Signed-off-by: Kostas Christidis <[email protected]>
  • Loading branch information
kchristidis committed Jan 21, 2017
1 parent d5a70d1 commit 4b0176a
Show file tree
Hide file tree
Showing 9 changed files with 109 additions and 45 deletions.
6 changes: 3 additions & 3 deletions orderer/kafka/orderer.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type consenterImpl struct {
// HandleChain creates/returns a reference to a Chain for the given set of support resources.
// Implements the multichain.Consenter interface. Called by multichain.newChainSupport(), which
// is itself called by multichain.NewManagerImpl() when ranging over the ledgerFactory's existingChains.
func (co *consenterImpl) HandleChain(cs multichain.ConsenterSupport) (multichain.Chain, error) {
func (co *consenterImpl) HandleChain(cs multichain.ConsenterSupport, metadata *cb.Metadata) (multichain.Chain, error) {
return newChain(co, cs), nil
}

Expand Down Expand Up @@ -237,7 +237,7 @@ func (ch *chainImpl) loop() {
return
}
block := ch.support.CreateNextBlock(batch)
ch.support.WriteBlock(block, committers)
ch.support.WriteBlock(block, committers, nil)
ch.lastCutBlock++
logger.Debug("Proper time-to-cut received, just cut block", ch.lastCutBlock)
continue
Expand All @@ -264,7 +264,7 @@ func (ch *chainImpl) loop() {
// If !ok, batches == nil, so this will be skipped
for i, batch := range batches {
block := ch.support.CreateNextBlock(batch)
ch.support.WriteBlock(block, committers[i])
ch.support.WriteBlock(block, committers[i], nil)
ch.lastCutBlock++
logger.Debug("Batch filled, just cut block", ch.lastCutBlock)
}
Expand Down
3 changes: 2 additions & 1 deletion orderer/mocks/multichain/multichain.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,14 @@ func (mcs *ConsenterSupport) CreateNextBlock(data []*cb.Envelope) *cb.Block {

// WriteBlock writes data to the Batches channel
// Note that _committers is ignored by this mock implementation
func (mcs *ConsenterSupport) WriteBlock(block *cb.Block, _committers []filter.Committer) *cb.Block {
func (mcs *ConsenterSupport) WriteBlock(block *cb.Block, _committers []filter.Committer, encodedMetadataValue []byte) *cb.Block {
logger.Debugf("mockWriter: attempting to write batch")
umtxs := make([]*cb.Envelope, len(block.Data.Data))
for i := range block.Data.Data {
umtxs[i] = utils.UnmarshalEnvelopeOrPanic(block.Data.Data[i])
}
mcs.Batches <- umtxs
block.Metadata.Metadata[cb.BlockMetadataIndex_ORDERER] = utils.MarshalOrPanic(&cb.Metadata{Value: encodedMetadataValue})
return block
}

Expand Down
25 changes: 21 additions & 4 deletions orderer/multichain/chainsupport.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ type Consenter interface {
// HandleChain should create a return a reference to a Chain for the given set of resources
// It will only be invoked for a given chain once per process. In general, errors will be treated
// as irrecoverable and cause system shutdown. See the description of Chain for more details
HandleChain(support ConsenterSupport) (Chain, error)
// The second argument to HandleChain is a pointer to the metadata stored on the `ORDERER` slot of
// the last block committed to the ledger of this Chain. For a new chain, this metadata will be
// nil, as this field is not set on the genesis block
HandleChain(support ConsenterSupport, metadata *cb.Metadata) (Chain, error)
}

// Chain defines a way to inject messages for ordering
Expand Down Expand Up @@ -65,7 +68,7 @@ type ConsenterSupport interface {
BlockCutter() blockcutter.Receiver
SharedConfig() sharedconfig.Manager
CreateNextBlock(messages []*cb.Envelope) *cb.Block
WriteBlock(block *cb.Block, committers []filter.Committer) *cb.Block
WriteBlock(block *cb.Block, committers []filter.Committer, encodedMetadataValue []byte) *cb.Block
ChainID() string // ChainID returns the chain ID this specific consenter instance is associated with
}

Expand Down Expand Up @@ -128,7 +131,16 @@ func newChainSupport(
}

var err error
cs.chain, err = consenter.HandleChain(cs)

lastBlock := ordererledger.GetBlock(cs.Reader(), cs.Reader().Height()-1)
metadata, err := utils.GetMetadataFromBlock(lastBlock, cb.BlockMetadataIndex_ORDERER)
// Assuming a block created with cb.NewBlock(), this should not
// error even if the orderer metadata is an empty byte slice
if err != nil {
logger.Fatalf("Error extracting orderer metadata for chain %x: %s", configManager.ChainID(), err)
}

cs.chain, err = consenter.HandleChain(cs, metadata)
if err != nil {
logger.Fatalf("Error creating consenter for chain %x: %s", configManager.ChainID(), err)
}
Expand Down Expand Up @@ -253,11 +265,16 @@ func (cs *chainSupport) addLastConfigSignature(block *cb.Block) {
})
}

func (cs *chainSupport) WriteBlock(block *cb.Block, committers []filter.Committer) *cb.Block {
func (cs *chainSupport) WriteBlock(block *cb.Block, committers []filter.Committer, encodedMetadataValue []byte) *cb.Block {
for _, committer := range committers {
committer.Commit()
}

// Set the orderer-related metadata field
if encodedMetadataValue != nil {
block.Metadata.Metadata[cb.BlockMetadataIndex_ORDERER] = utils.MarshalOrPanic(&cb.Metadata{Value: encodedMetadataValue})
}

cs.addBlockSignature(block)
cs.addLastConfigSignature(block)

Expand Down
48 changes: 26 additions & 22 deletions orderer/multichain/chainsupport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"reflect"
"testing"

"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/orderer/common/filter"
ordererledger "github.com/hyperledger/fabric/orderer/ledger"
mockconfigtx "github.com/hyperledger/fabric/orderer/mocks/configtx"
Expand Down Expand Up @@ -68,7 +69,7 @@ func TestCommitConfig(t *testing.T) {
txs := []*cb.Envelope{makeNormalTx("foo", 0), makeNormalTx("bar", 1)}
committers := []filter.Committer{&mockCommitter{}, &mockCommitter{}}
block := cs.CreateNextBlock(txs)
cs.WriteBlock(block, committers)
cs.WriteBlock(block, committers, nil)

blockTXs := make([]*cb.Envelope, len(ml.data))
for i := range ml.data {
Expand All @@ -91,48 +92,51 @@ func TestWriteBlockSignatures(t *testing.T) {
cm := &mockconfigtx.Manager{}
cs := &chainSupport{ledger: ml, configManager: cm, signer: &xxxCryptoHelper{}}

blockMetadata := func(block *cb.Block) *cb.Metadata {
metadata, err := utils.GetMetadataFromBlock(block, cb.BlockMetadataIndex_SIGNATURES)
if err != nil {
panic(err)
}
return metadata
}

if blockMetadata(cs.WriteBlock(cb.NewBlock(0, nil), nil)) == nil {
if utils.GetMetadataFromBlockOrPanic(cs.WriteBlock(cb.NewBlock(0, nil), nil, nil), cb.BlockMetadataIndex_SIGNATURES) == nil {
t.Fatalf("Block should have block signature")
}
}

func TestWriteLastConfiguration(t *testing.T) {
func TestWriteBlockOrdererMetadata(t *testing.T) {
ml := &mockLedgerReadWriter{}
cm := &mockconfigtx.Manager{}
cs := &chainSupport{ledger: ml, configManager: cm, signer: &xxxCryptoHelper{}}

lastConfig := func(block *cb.Block) uint64 {
index, err := utils.GetLastConfigurationIndexFromBlock(block)
if err != nil {
panic(err)
}
return index
value := []byte("foo")
expected := &cb.Metadata{Value: value}
actual := utils.GetMetadataFromBlockOrPanic(cs.WriteBlock(cb.NewBlock(0, nil), nil, value), cb.BlockMetadataIndex_ORDERER)

if actual == nil {
t.Fatalf("Block should have orderer metadata written")
}
if !proto.Equal(expected, actual) {
t.Fatalf("Orderer metadata not written to block correctly")
}

}

func TestWriteLastConfiguration(t *testing.T) {
ml := &mockLedgerReadWriter{}
cm := &mockconfigtx.Manager{}
cs := &chainSupport{ledger: ml, configManager: cm, signer: &xxxCryptoHelper{}}

expected := uint64(0)
if lc := lastConfig(cs.WriteBlock(cb.NewBlock(0, nil), nil)); lc != expected {

if lc := utils.GetLastConfigurationIndexFromBlockOrPanic(cs.WriteBlock(cb.NewBlock(0, nil), nil, nil)); lc != expected {
t.Fatalf("First block should have config block index of %d, but got %d", expected, lc)
}

if lc := lastConfig(cs.WriteBlock(cb.NewBlock(1, nil), nil)); lc != expected {
if lc := utils.GetLastConfigurationIndexFromBlockOrPanic(cs.WriteBlock(cb.NewBlock(1, nil), nil, nil)); lc != expected {
t.Fatalf("Second block should have config block index of %d, but got %d", expected, lc)
}

cm.SequenceVal = 1
expected = uint64(2)
if lc := lastConfig(cs.WriteBlock(cb.NewBlock(2, nil), nil)); lc != expected {

if lc := utils.GetLastConfigurationIndexFromBlockOrPanic(cs.WriteBlock(cb.NewBlock(2, nil), nil, nil)); lc != expected {
t.Fatalf("Second block should have config block index of %d, but got %d", expected, lc)
}

if lc := lastConfig(cs.WriteBlock(cb.NewBlock(3, nil), nil)); lc != expected {
if lc := utils.GetLastConfigurationIndexFromBlockOrPanic(cs.WriteBlock(cb.NewBlock(3, nil), nil, nil)); lc != expected {
t.Fatalf("Second block should have config block index of %d, but got %d", expected, lc)
}

Expand Down
22 changes: 12 additions & 10 deletions orderer/multichain/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,22 @@ import (
type mockConsenter struct {
}

func (mc *mockConsenter) HandleChain(support ConsenterSupport) (Chain, error) {
func (mc *mockConsenter) HandleChain(support ConsenterSupport, metadata *cb.Metadata) (Chain, error) {
return &mockChain{
queue: make(chan *cb.Envelope),
cutter: support.BlockCutter(),
support: support,
done: make(chan struct{}),
queue: make(chan *cb.Envelope),
cutter: support.BlockCutter(),
support: support,
metadata: metadata,
done: make(chan struct{}),
}, nil
}

type mockChain struct {
queue chan *cb.Envelope
support ConsenterSupport
cutter blockcutter.Receiver
done chan struct{}
queue chan *cb.Envelope
cutter blockcutter.Receiver
support ConsenterSupport
metadata *cb.Metadata
done chan struct{}
}

func (mch *mockChain) Enqueue(env *cb.Envelope) bool {
Expand All @@ -59,7 +61,7 @@ func (mch *mockChain) Start() {
batches, committers, _ := mch.cutter.Ordered(msg)
for i, batch := range batches {
block := mch.support.CreateNextBlock(batch)
mch.support.WriteBlock(block, committers[i])
mch.support.WriteBlock(block, committers[i], nil)
}
}
}()
Expand Down
2 changes: 1 addition & 1 deletion orderer/sbft/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func New() multichain.Consenter {
}

// HandleChain creates/returns a reference to a Chain for the given set of support resources.
func (solo *consenter) HandleChain(support multichain.ConsenterSupport) (multichain.Chain, error) {
func (solo *consenter) HandleChain(support multichain.ConsenterSupport, metadata *cb.Metadata) (multichain.Chain, error) {
return newChain(support), nil
}

Expand Down
6 changes: 3 additions & 3 deletions orderer/solo/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func New() multichain.Consenter {
return &consenter{}
}

func (solo *consenter) HandleChain(support multichain.ConsenterSupport) (multichain.Chain, error) {
func (solo *consenter) HandleChain(support multichain.ConsenterSupport, metadata *cb.Metadata) (multichain.Chain, error) {
return newChain(support), nil
}

Expand Down Expand Up @@ -92,7 +92,7 @@ func (ch *chain) main() {
}
for i, batch := range batches {
block := ch.support.CreateNextBlock(batch)
ch.support.WriteBlock(block, committers[i])
ch.support.WriteBlock(block, committers[i], nil)
}
if len(batches) > 0 {
timer = nil
Expand All @@ -108,7 +108,7 @@ func (ch *chain) main() {
}
logger.Debugf("Batch timer expired, creating block")
block := ch.support.CreateNextBlock(batch)
ch.support.WriteBlock(block, committers)
ch.support.WriteBlock(block, committers, nil)
case <-ch.exitChan:
logger.Debugf("Exiting")
return
Expand Down
26 changes: 25 additions & 1 deletion protos/utils/blockutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func GetChainIDFromBlock(block *cb.Block) (string, error) {
return payload.Header.ChainHeader.ChainID, nil
}

// GetMetadataFromBlock retrieves metadata at the specified index
// GetMetadataFromBlock retrieves metadata at the specified index.
func GetMetadataFromBlock(block *cb.Block, index cb.BlockMetadataIndex) (*cb.Metadata, error) {
md := &cb.Metadata{}
err := proto.Unmarshal(block.Metadata.Metadata[index], md)
Expand All @@ -55,6 +55,16 @@ func GetMetadataFromBlock(block *cb.Block, index cb.BlockMetadataIndex) (*cb.Met
return md, nil
}

// GetMetadataFromBlockOrPanic retrieves metadata at the specified index, or panics on error.
func GetMetadataFromBlockOrPanic(block *cb.Block, index cb.BlockMetadataIndex) *cb.Metadata {
md := &cb.Metadata{}
err := proto.Unmarshal(block.Metadata.Metadata[index], md)
if err != nil {
panic(err)
}
return md
}

// GetLastConfigurationIndexFromBlock retrieves the index of the last configuration block as encoded in the block metadata
func GetLastConfigurationIndexFromBlock(block *cb.Block) (uint64, error) {
md, err := GetMetadataFromBlock(block, cb.BlockMetadataIndex_LAST_CONFIGURATION)
Expand All @@ -69,6 +79,20 @@ func GetLastConfigurationIndexFromBlock(block *cb.Block) (uint64, error) {
return lc.Index, nil
}

// GetLastConfigurationIndexFromBlockOrPanic retrieves the index of the last configuration block as encoded in the block metadata, or panics on error.
func GetLastConfigurationIndexFromBlockOrPanic(block *cb.Block) uint64 {
md, err := GetMetadataFromBlock(block, cb.BlockMetadataIndex_LAST_CONFIGURATION)
if err != nil {
panic(err)
}
lc := &cb.LastConfiguration{}
err = proto.Unmarshal(md.Value, lc)
if err != nil {
panic(err)
}
return lc.Index
}

// GetBlockFromBlockBytes marshals the bytes into Block
func GetBlockFromBlockBytes(blockBytes []byte) (*cb.Block, error) {
block := &cb.Block{}
Expand Down
16 changes: 16 additions & 0 deletions protos/utils/blockutils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

configtxtest "github.com/hyperledger/fabric/common/configtx/test"
"github.com/hyperledger/fabric/protos/common"
cb "github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/utils"
)

Expand Down Expand Up @@ -68,3 +69,18 @@ func TestGetBlockFromBlockBytes(t *testing.T) {
t.Fatalf("failed to get block from block bytes: %s", err)
}
}

func TestGetMetadataFromNewBlock(t *testing.T) {
block := common.NewBlock(0, nil)
md, err := utils.GetMetadataFromBlock(block, cb.BlockMetadataIndex_ORDERER)
if err != nil {
t.Fatal("Expected no error when extracting metadata from new block")
}
if md.Value != nil {
t.Fatal("Expected metadata field value to be nil, got", md.Value)
}
if len(md.Value) > 0 {
t.Fatal("Expected length of metadata field value to be 0, got", len(md.Value))
}

}

0 comments on commit 4b0176a

Please sign in to comment.