Skip to content

Commit

Permalink
Do not require reboot when re-adding consenter
Browse files Browse the repository at this point in the history
FAB-13552

Change-Id: I5d77dd613db3ad4d1296f2e8b960859a785f04ec
Signed-off-by: Jay Guo <[email protected]>
Signed-off-by: Jason Yellick <[email protected]>
  • Loading branch information
guoger authored and Jason Yellick committed Nov 11, 2019
1 parent 3257ed6 commit 07db0a8
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 19 deletions.
26 changes: 22 additions & 4 deletions integration/e2e/etcdraft_reconfig_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,10 +687,10 @@ var _ = Describe("EndToEnd reconfiguration and onboarding", func() {
firstEvictedNode := findLeader(ordererRunners) - 1

By("Removing the leader from 3-node channel")
serverCertBytes, err := ioutil.ReadFile(filepath.Join(network.OrdererLocalTLSDir(orderers[firstEvictedNode]), "server.crt"))
server1CertBytes, err := ioutil.ReadFile(filepath.Join(network.OrdererLocalTLSDir(orderers[firstEvictedNode]), "server.crt"))
Expect(err).To(Not(HaveOccurred()))

nwo.RemoveConsenter(network, peer, network.Orderers[(firstEvictedNode+1)%3], "systemchannel", serverCertBytes)
nwo.RemoveConsenter(network, peer, network.Orderers[(firstEvictedNode+1)%3], "systemchannel", server1CertBytes)
fmt.Fprintln(GinkgoWriter, "Ensuring the other orderers detect the eviction of the node on channel", "systemchannel")
Eventually(ordererRunners[(firstEvictedNode+1)%3].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("Deactivated node"))
Eventually(ordererRunners[(firstEvictedNode+2)%3].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("Deactivated node"))
Expand Down Expand Up @@ -722,10 +722,10 @@ var _ = Describe("EndToEnd reconfiguration and onboarding", func() {
}

By("Removing the leader from 2-node channel")
serverCertBytes, err = ioutil.ReadFile(filepath.Join(network.OrdererLocalTLSDir(orderers[secondEvictedNode]), "server.crt"))
server2CertBytes, err := ioutil.ReadFile(filepath.Join(network.OrdererLocalTLSDir(orderers[secondEvictedNode]), "server.crt"))
Expect(err).To(Not(HaveOccurred()))

nwo.RemoveConsenter(network, peer, orderers[surviver], "systemchannel", serverCertBytes)
nwo.RemoveConsenter(network, peer, orderers[surviver], "systemchannel", server2CertBytes)
fmt.Fprintln(GinkgoWriter, "Ensuring the other orderer detect the eviction of the node on channel", "systemchannel")
Eventually(ordererRunners[secondEvictedNode].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say(stopMSg))

Expand All @@ -734,6 +734,24 @@ var _ = Describe("EndToEnd reconfiguration and onboarding", func() {

By("Asserting the only remaining node elects itself")
findLeader([]*ginkgomon.Runner{ordererRunners[surviver]})

By("Re-adding first evicted orderer")
nwo.AddConsenter(network, peer, network.Orderers[surviver], "systemchannel", etcdraft.Consenter{
Host: "127.0.0.1",
Port: uint32(network.OrdererPort(orderers[firstEvictedNode], nwo.ClusterPort)),
ClientTlsCert: server1CertBytes,
ServerTlsCert: server1CertBytes,
})

By("Ensuring re-added orderer starts serving system channel")
assertBlockReception(map[string]int{
"systemchannel": 3,
}, []*nwo.Orderer{orderers[firstEvictedNode]}, peer, network)

env := CreateBroadcastEnvelope(network, orderers[secondEvictedNode], network.SystemChannel.Name, []byte("foo"))
resp, err := Broadcast(network, orderers[surviver], env)
Expect(err).NotTo(HaveOccurred())
Expect(resp.Status).To(Equal(common.Status_SUCCESS))
})

It("notices it even if it is down at the time of its eviction", func() {
Expand Down
9 changes: 9 additions & 0 deletions orderer/common/cluster/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,15 @@ func (r *Replicator) ReplicateChains() []string {
if err != nil {
r.Logger.Panicf("Failed to create a ledger for channel %s: %v", channel.ChannelName, err)
}

if channel.GenesisBlock == nil {
if ledger.Height() == 0 {
r.Logger.Panicf("Expecting channel %s to at least contain genesis block, but it doesn't", channel.ChannelName)
}

continue
}

gb, err := ChannelCreationBlockToGenesisBlock(channel.GenesisBlock)
if err != nil {
r.Logger.Panicf("Failed converting channel creation block for channel %s to genesis block: %v",
Expand Down
4 changes: 1 addition & 3 deletions orderer/common/server/onboarding.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,9 @@ type chainCreation struct {
// TrackChain tracks a chain with the given name, and calls the given callback
// when this chain should be activated.
func (dc *inactiveChainReplicator) TrackChain(chain string, genesisBlock *common.Block, createChainCallback etcdraft.CreateChainCallback) {
if genesisBlock == nil {
dc.logger.Panicf("Called with a nil genesis block")
}
dc.lock.Lock()
defer dc.lock.Unlock()

dc.logger.Infof("Adding %s to the set of chains to track", chain)
dc.chains2CreationCallbacks[chain] = chainCreation{
genesisBlock: genesisBlock,
Expand Down
14 changes: 4 additions & 10 deletions orderer/common/server/onboarding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -846,18 +846,12 @@ func TestInactiveChainReplicatorChannels(t *testing.T) {
chains2CreationCallbacks: make(map[string]chainCreation),
}
icr.TrackChain("foo", &common.Block{}, func() {})
assert.Contains(t, icr.Channels(), cluster.ChannelGenesisBlock{ChannelName: "foo", GenesisBlock: &common.Block{}})

assert.Equal(t, []cluster.ChannelGenesisBlock{{ChannelName: "foo", GenesisBlock: &common.Block{}}}, icr.Channels())
icr.Close()
}
icr.TrackChain("bar", nil, func() {})
assert.Contains(t, icr.Channels(), cluster.ChannelGenesisBlock{ChannelName: "bar", GenesisBlock: nil})

func TestTrackChainNilGenesisBlock(t *testing.T) {
icr := &inactiveChainReplicator{
logger: flogging.MustGetLogger("test"),
}
assert.PanicsWithValue(t, "Called with a nil genesis block", func() {
icr.TrackChain("foo", nil, func() {})
})
icr.Close()
}

func TestLedgerFactory(t *testing.T) {
Expand Down
8 changes: 8 additions & 0 deletions orderer/consensus/etcdraft/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ type Chain struct {
logger *flogging.FabricLogger

periodicChecker *PeriodicCheck

haltCallback func()
}

// NewChain constructs a chain object.
Expand All @@ -200,6 +202,7 @@ func NewChain(
conf Configurator,
rpc RPC,
f CreateBlockPuller,
haltCallback func(),
observeC chan<- raft.SoftState) (*Chain, error) {

lg := opts.Logger.With("channel", support.ChainID(), "node", opts.RaftID)
Expand Down Expand Up @@ -258,6 +261,7 @@ func NewChain(
confState: cc,
createPuller: f,
clock: opts.Clock,
haltCallback: haltCallback,
Metrics: &Metrics{
ClusterSize: opts.Metrics.ClusterSize.With("channel", support.ChainID()),
IsLeader: opts.Metrics.IsLeader.With("channel", support.ChainID()),
Expand Down Expand Up @@ -484,6 +488,10 @@ func (c *Chain) Halt() {
return
}
<-c.doneC

if c.haltCallback != nil {
c.haltCallback()
}
}

func (c *Chain) isRunning() error {
Expand Down
8 changes: 6 additions & 2 deletions orderer/consensus/etcdraft/chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ var _ = Describe("Chain", func() {
}

JustBeforeEach(func() {
chain, err = etcdraft.NewChain(support, opts, configurator, nil, noOpBlockPuller, observeC)
chain, err = etcdraft.NewChain(support, opts, configurator, nil, noOpBlockPuller, nil, observeC)
Expect(err).NotTo(HaveOccurred())

chain.Start()
Expand Down Expand Up @@ -850,7 +850,7 @@ var _ = Describe("Chain", func() {
os.Chmod(path.Join(walDir, f.Name()), 0300)
}

c, err := etcdraft.NewChain(support, opts, configurator, nil, noOpBlockPuller, observeC)
c, err := etcdraft.NewChain(support, opts, configurator, nil, noOpBlockPuller, nil, observeC)
Expect(c).To(BeNil())
Expect(err).To(MatchError(ContainSubstring("permission denied")))
})
Expand Down Expand Up @@ -1266,6 +1266,7 @@ var _ = Describe("Chain", func() {
configurator,
nil,
nil,
nil,
observeC)
Expect(chain).NotTo(BeNil())
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -1298,6 +1299,7 @@ var _ = Describe("Chain", func() {
nil,
nil,
noOpBlockPuller,
nil,
nil)
Expect(chain).NotTo(BeNil())
Expect(err).ToNot(HaveOccurred())
Expand Down Expand Up @@ -1326,6 +1328,7 @@ var _ = Describe("Chain", func() {
nil,
nil,
noOpBlockPuller,
nil,
nil)
Expect(chain).To(BeNil())
Expect(err).To(MatchError(ContainSubstring("failed to initialize WAL: mkdir")))
Expand Down Expand Up @@ -3610,6 +3613,7 @@ func (c *chain) init() {
c.configurator,
c.rpc,
func() (etcdraft.BlockPuller, error) { return c.puller, nil },
nil,
c.observe,
)
Expect(err).NotTo(HaveOccurred())
Expand Down
3 changes: 3 additions & 0 deletions orderer/consensus/etcdraft/consenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,9 @@ func (c *Consenter) HandleChain(support consensus.ConsenterSupport, metadata *co
c.Communication,
rpc,
func() (BlockPuller, error) { return newBlockPuller(support, c.Dialer, c.OrdererConfig.General.Cluster) },
func() {
c.InactiveChainRegistry.TrackChain(support.ChainID(), nil, func() { c.CreateChain(support.ChainID()) })
},
nil,
)
}
Expand Down

0 comments on commit 07db0a8

Please sign in to comment.