Skip to content

Commit

Permalink
[FAB-17479] Migrated Kafka cluster can be safely expanded later (#642)
Browse files Browse the repository at this point in the history
* [FAB-17479] Migrated Kafka cluster can be safely expanded later

When new orderer nodes are added to a migrated Kafka cluster,
but are not added to all channels:

- The genesis block if written to the ledger to track future addition.
- However, the genesis block is a Kafka one, and therefore the orderer
  tries (wrongly) to launch a Kafka chain which fails.
- In addition, the orderer never tries to track future addition
  of the said channel, because this mechanism is only activated
  for Raft orderers.

This change set:
- Makes the Kafka chain activation be skipped in case
  that the orderer was migrated.
- Starts tracking future addition of the orderer to the channel.

An integration test is included.

Change-Id: Ib3598998073e884747b5966df72b70cac572b1c5
Signed-off-by: yacovm <[email protected]>

* Address code review comments

Change-Id: I9ae363c164e4221fe9095f587bb32fc329528f2a
Signed-off-by: yacovm <[email protected]>
  • Loading branch information
yacovm authored Feb 13, 2020
1 parent 5eaae3a commit e75bc71
Show file tree
Hide file tree
Showing 8 changed files with 130 additions and 33 deletions.
71 changes: 71 additions & 0 deletions integration/e2e/migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/onsi/gomega/gexec"
"github.com/tedsuo/ifrit"
"github.com/tedsuo/ifrit/ginkgomon"
"github.com/tedsuo/ifrit/grouper"
)

var _ = Describe("Kafka2RaftMigration", func() {
Expand Down Expand Up @@ -580,6 +581,9 @@ var _ = Describe("Kafka2RaftMigration", func() {
// with a three orderers, a system channel and two standard channels.
// It then restarts the orderers onto a Raft-based system, and verifies that the
// newly restarted orderers perform as expected.
// Afterwards, it adds a new orderer to the system channel and ensures
// that it can successfully autonomously join application channels
// if added to them later on.
It("executes bootstrap to raft - multi node", func() {
//=== Step 1: Config update on system channel, MAINTENANCE ===
By("1) Config update on system channel, State=MAINTENANCE")
Expand Down Expand Up @@ -712,6 +716,73 @@ var _ = Describe("Kafka2RaftMigration", func() {
RunExpectQueryRetry(network, peer, channel2, 100)
RunExpectQueryInvokeQuery(network, o1, peer, channel2, 100)
RunExpectQueryInvokeQuery(network, o1, peer, channel2, 90)

By("Extending the network configuration to add a new orderer")
o4 := &nwo.Orderer{
Name: "orderer4",
Organization: "OrdererOrg",
}
ports := nwo.Ports{}
for _, portName := range nwo.OrdererPortNames() {
ports[portName] = network.ReservePort()
}
network.PortsByOrdererID[o4.ID()] = ports
network.Orderers = append(network.Orderers, o4)
network.GenerateOrdererConfig(o4)
extendNetwork(network)

fourthOrdererCertificatePath := filepath.Join(network.OrdererLocalTLSDir(o4), "server.crt")
fourthOrdererCertificate, err := ioutil.ReadFile(fourthOrdererCertificatePath)
Expect(err).NotTo(HaveOccurred())

By("Adding the fourth orderer to the system channel")
nwo.AddConsenter(network, peer, o1, "systemchannel", protosraft.Consenter{
ServerTlsCert: fourthOrdererCertificate,
ClientTlsCert: fourthOrdererCertificate,
Host: "127.0.0.1",
Port: uint32(network.OrdererPort(o4, nwo.ClusterPort)),
})

By("Obtaining the last config block from an orderer")
// Get the last config block of the system channel
configBlock := nwo.GetConfigBlock(network, peer, o1, "systemchannel")
// Plant it in the file system of orderer2, the new node to be onboarded.
err = ioutil.WriteFile(filepath.Join(testDir, "systemchannel_block.pb"), utils.MarshalOrPanic(configBlock), 0644)
Expect(err).NotTo(HaveOccurred())

By("Launching the fourth orderer")
o4Runner := network.OrdererRunner(o4)
o4Process := ifrit.Invoke(grouper.Member{Name: o4.ID(), Runner: o4Runner})

defer func() {
o4Process.Signal(syscall.SIGTERM)
Eventually(o4Process.Wait(), network.EventuallyTimeout).Should(Receive())
}()

Eventually(o4Process.Ready()).Should(BeClosed())

By("Waiting for the orderer to figure out it was migrated")
Eventually(o4Runner.Err(), time.Minute, time.Second).Should(gbytes.Say("This node was migrated from Kafka to Raft, skipping activation of Kafka chain"))

By("Adding the fourth orderer to the application channel")
nwo.AddConsenter(network, peer, o1, channel1, protosraft.Consenter{
ServerTlsCert: fourthOrdererCertificate,
ClientTlsCert: fourthOrdererCertificate,
Host: "127.0.0.1",
Port: uint32(network.OrdererPort(o4, nwo.ClusterPort)),
})

chan1BlockNum = nwo.CurrentConfigBlockNumber(network, peer, o1, channel1)

By("Ensuring the added orderer has synced the application channel")
assertBlockReception(
map[string]int{
channel1: int(chan1BlockNum),
},
[]*nwo.Orderer{o4},
peer,
network,
)
})
})

Expand Down
15 changes: 10 additions & 5 deletions orderer/common/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -665,15 +665,19 @@ func initializeMultichannelRegistrar(

registrar := multichannel.NewRegistrar(*conf, lf, signer, metricsProvider, callbacks...)

var icr etcdraft.InactiveChainRegistry
if isClusterType(bootstrapBlock) {
etcdConsenter := initializeEtcdraftConsenter(consenters, conf, lf, clusterDialer, bootstrapBlock, ri, srvConf, srv, registrar, metricsProvider)
icr = etcdConsenter.InactiveChainRegistry
}

consenters["solo"] = solo.New()
var kafkaMetrics *kafka.Metrics
consenters["kafka"], kafkaMetrics = kafka.New(conf.Kafka, metricsProvider, healthChecker)
consenters["kafka"], kafkaMetrics = kafka.New(conf.Kafka, metricsProvider, healthChecker, icr, registrar.CreateChain)
// Note, we pass a 'nil' channel here, we could pass a channel that
// closes if we wished to cleanup this routine on exit.
go kafkaMetrics.PollGoMetricsUntilStop(time.Minute, nil)
if isClusterType(bootstrapBlock) {
initializeEtcdraftConsenter(consenters, conf, lf, clusterDialer, bootstrapBlock, ri, srvConf, srv, registrar, metricsProvider)
}

registrar.Initialize(consenters)
return registrar
}
Expand All @@ -689,7 +693,7 @@ func initializeEtcdraftConsenter(
srv *comm.GRPCServer,
registrar *multichannel.Registrar,
metricsProvider metrics.Provider,
) {
) *etcdraft.Consenter {
replicationRefreshInterval := conf.General.Cluster.ReplicationBackgroundRefreshInterval
if replicationRefreshInterval == 0 {
replicationRefreshInterval = defaultReplicationBackgroundRefreshInterval
Expand Down Expand Up @@ -729,6 +733,7 @@ func initializeEtcdraftConsenter(
go icr.run()
raftConsenter := etcdraft.New(clusterDialer, conf, srvConf, srv, registrar, icr, metricsProvider)
consenters["etcdraft"] = raftConsenter
return raftConsenter
}

func newOperationsSystem(ops localconfig.Operations, metrics localconfig.Metrics) *operations.System {
Expand Down
2 changes: 1 addition & 1 deletion orderer/common/server/onboarding.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ type chainCreation struct {

// TrackChain tracks a chain with the given name, and calls the given callback
// when this chain should be activated.
func (dc *inactiveChainReplicator) TrackChain(chain string, genesisBlock *common.Block, createChainCallback etcdraft.CreateChainCallback) {
func (dc *inactiveChainReplicator) TrackChain(chain string, genesisBlock *common.Block, createChainCallback func()) {
dc.lock.Lock()
defer dc.lock.Unlock()

Expand Down
5 changes: 1 addition & 4 deletions orderer/consensus/etcdraft/consenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,13 @@ import (
"go.etcd.io/etcd/raft"
)

// CreateChainCallback creates a new chain
type CreateChainCallback func()

//go:generate mockery -dir . -name InactiveChainRegistry -case underscore -output mocks

// InactiveChainRegistry registers chains that are inactive
type InactiveChainRegistry interface {
// TrackChain tracks a chain with the given name, and calls the given callback
// when this chain should be created.
TrackChain(chainName string, genesisBlock *common.Block, createChain CreateChainCallback)
TrackChain(chainName string, genesisBlock *common.Block, createChain func())
}

//go:generate mockery -dir . -name ChainGetter -case underscore -output mocks
Expand Down
10 changes: 6 additions & 4 deletions orderer/consensus/etcdraft/mocks/inactive_chain_registry.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions orderer/consensus/kafka/chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3485,7 +3485,7 @@ func TestDeliverSession(t *testing.T) {
defer env.broker2.Close()

// initialize consenter
consenter, _ := New(mockLocalConfig.Kafka, &disabled.Provider{}, &mockkafka.HealthChecker{})
consenter, _ := New(mockLocalConfig.Kafka, &disabled.Provider{}, &mockkafka.HealthChecker{}, nil, func(string) {})

// initialize chain
metadata := &cb.Metadata{Value: utils.MarshalOrPanic(&ab.KafkaMetadata{LastOffsetPersisted: env.height})}
Expand Down Expand Up @@ -3574,7 +3574,7 @@ func TestDeliverSession(t *testing.T) {
defer env.broker0.Close()

// initialize consenter
consenter, _ := New(mockLocalConfig.Kafka, &disabled.Provider{}, &mockkafka.HealthChecker{})
consenter, _ := New(mockLocalConfig.Kafka, &disabled.Provider{}, &mockkafka.HealthChecker{}, nil, func(string) {})

// initialize chain
metadata := &cb.Metadata{Value: utils.MarshalOrPanic(&ab.KafkaMetadata{LastOffsetPersisted: env.height})}
Expand Down Expand Up @@ -3636,7 +3636,7 @@ func TestDeliverSession(t *testing.T) {
defer env.broker0.Close()

// initialize consenter
consenter, _ := New(mockLocalConfig.Kafka, &disabled.Provider{}, &mockkafka.HealthChecker{})
consenter, _ := New(mockLocalConfig.Kafka, &disabled.Provider{}, &mockkafka.HealthChecker{}, nil, func(string) {})

// initialize chain
metadata := &cb.Metadata{Value: utils.MarshalOrPanic(&ab.KafkaMetadata{LastOffsetPersisted: env.height})}
Expand Down
50 changes: 36 additions & 14 deletions orderer/consensus/kafka/consenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ import (
"github.com/hyperledger/fabric/common/metrics"
"github.com/hyperledger/fabric/orderer/common/localconfig"
"github.com/hyperledger/fabric/orderer/consensus"
"github.com/hyperledger/fabric/orderer/consensus/inactive"
cb "github.com/hyperledger/fabric/protos/common"
"github.com/op/go-logging"
"github.com/pkg/errors"
)

//go:generate counterfeiter -o mock/health_checker.go -fake-name HealthChecker . healthChecker
Expand All @@ -24,7 +26,7 @@ type healthChecker interface {
}

// New creates a Kafka-based consenter. Called by orderer's main.go.
func New(config localconfig.Kafka, metricsProvider metrics.Provider, healthChecker healthChecker) (consensus.Consenter, *Metrics) {
func New(config localconfig.Kafka, mp metrics.Provider, healthChecker healthChecker, icr InactiveChainRegistry, mkChain func(string)) (consensus.Consenter, *Metrics) {
if config.Verbose {
logging.SetLevel(logging.DEBUG, "orderer.consensus.kafka.sarama")
}
Expand All @@ -36,13 +38,15 @@ func New(config localconfig.Kafka, metricsProvider metrics.Provider, healthCheck
config.Version,
defaultPartition)

metrics := NewMetrics(metricsProvider, brokerConfig.MetricRegistry)
metrics := NewMetrics(mp, brokerConfig.MetricRegistry)

return &consenterImpl{
brokerConfigVal: brokerConfig,
tlsConfigVal: config.TLS,
retryOptionsVal: config.Retry,
kafkaVersionVal: config.Version,
mkChain: mkChain,
inactiveChainRegistry: icr,
brokerConfigVal: brokerConfig,
tlsConfigVal: config.TLS,
retryOptionsVal: config.Retry,
kafkaVersionVal: config.Version,
topicDetailVal: &sarama.TopicDetail{
NumPartitions: 1,
ReplicationFactor: config.Topic.ReplicationFactor,
Expand All @@ -52,18 +56,26 @@ func New(config localconfig.Kafka, metricsProvider metrics.Provider, healthCheck
}, metrics
}

// InactiveChainRegistry registers chains that are inactive
type InactiveChainRegistry interface {
// TrackChain tracks a chain with the given name, and calls the given callback
// when this chain should be created.
TrackChain(chainName string, genesisBlock *cb.Block, createChain func())
}

// consenterImpl holds the implementation of type that satisfies the
// consensus.Consenter interface --as the HandleChain contract requires-- and
// the commonConsenter one.
type consenterImpl struct {
brokerConfigVal *sarama.Config
tlsConfigVal localconfig.TLS
retryOptionsVal localconfig.Retry
kafkaVersionVal sarama.KafkaVersion
topicDetailVal *sarama.TopicDetail
metricsProvider metrics.Provider
healthChecker healthChecker
metrics *Metrics
mkChain func(string)
brokerConfigVal *sarama.Config
tlsConfigVal localconfig.TLS
retryOptionsVal localconfig.Retry
kafkaVersionVal sarama.KafkaVersion
topicDetailVal *sarama.TopicDetail
healthChecker healthChecker
metrics *Metrics
inactiveChainRegistry InactiveChainRegistry
}

// HandleChain creates/returns a reference to a consensus.Chain object for the
Expand All @@ -72,6 +84,16 @@ type consenterImpl struct {
// multichannel.NewManagerImpl() when ranging over the ledgerFactory's
// existingChains.
func (consenter *consenterImpl) HandleChain(support consensus.ConsenterSupport, metadata *cb.Metadata) (consensus.Chain, error) {

// Check if this node was migrated from Raft
if consenter.inactiveChainRegistry != nil {
logger.Infof("This node was migrated from Kafka to Raft, skipping activation of Kafka chain")
consenter.inactiveChainRegistry.TrackChain(support.ChainID(), support.Block(0), func() {
consenter.mkChain(support.ChainID())
})
return &inactive.Chain{Err: errors.Errorf("channel %s is not serviced by me", support.ChainID())}, nil
}

lastOffsetPersisted, lastOriginalOffsetProcessed, lastResubmittedConfigOffset := getOffsets(metadata.Value, support.ChainID())
ch, err := newChain(consenter, support, lastOffsetPersisted, lastOriginalOffsetProcessed, lastResubmittedConfigOffset)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions orderer/consensus/kafka/consenter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,12 @@ func init() {
}

func TestNew(t *testing.T) {
c, _ := New(mockLocalConfig.Kafka, &mock.MetricsProvider{}, &mock.HealthChecker{})
c, _ := New(mockLocalConfig.Kafka, &mock.MetricsProvider{}, &mock.HealthChecker{}, nil, func(string) {})
_ = consensus.Consenter(c)
}

func TestHandleChain(t *testing.T) {
consenter, _ := New(mockLocalConfig.Kafka, &disabled.Provider{}, &mock.HealthChecker{})
consenter, _ := New(mockLocalConfig.Kafka, &disabled.Provider{}, &mock.HealthChecker{}, nil, func(string) {})

oldestOffset := int64(0)
newestOffset := int64(5)
Expand Down

0 comments on commit e75bc71

Please sign in to comment.