Skip to content

Commit

Permalink
Configure graceful shutdown in gossip node
Browse files Browse the repository at this point in the history
  • Loading branch information
aalda committed Nov 15, 2018
1 parent ab9c89d commit 2b236ef
Show file tree
Hide file tree
Showing 4 changed files with 162 additions and 17 deletions.
3 changes: 2 additions & 1 deletion cmd/agent_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ func newAgentStartCommand() *cobra.Command {
}
log.Debugf("Number of nodes contacted: %d", contacted)

awaitTermSignal(agent.Shutdown)
defer agent.Shutdown()
awaitTermSignal(agent.Leave)

},
}
Expand Down
24 changes: 20 additions & 4 deletions gossip/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ package gossip

import (
"net"
"time"

"github.com/hashicorp/memberlist"
)
Expand Down Expand Up @@ -62,10 +63,12 @@ const DefaultBindPort int = 7946
// DefaultConfig contains the defaults for configurations.
func DefaultConfig() *Config {
return &Config{
BindAddr: "0.0.0.0",
AdvertiseAddr: "",
LeaveOnTerm: true,
EnableCompression: false,
BindAddr: "0.0.0.0",
AdvertiseAddr: "",
LeaveOnTerm: true,
EnableCompression: false,
BroadcastTimeout: 5 * time.Second,
LeavePropagateDelay: 0,
}
}

Expand Down Expand Up @@ -96,6 +99,19 @@ type Config struct {
// by `github.com/hashicorp/memberlist` when broadcasting events.
EnableCompression bool

// BroadcastTimeout is the amount of time to wait for a broadcast
// message to be sent to the cluster. Broadcast messages are used for
// things like leave messages and force remove messages. If this is not
// set, a timeout of 5 seconds will be set.
BroadcastTimeout time.Duration

// LeavePropagateDelay is how long to wait for our leave (node dead)
// message to propagate through the cluster. In particular, we want to
// stay up long enough to service any probes from other nodes before
// they learn about us leaving and stop probing. Otherwise, we risk
// getting node failures as we leave.
LeavePropagateDelay time.Duration

// MemberlistConfig is the memberlist configuration that Auditor will
// use to do the underlying membership management and gossip. Some
// fields in the MemberlistConfig will be overwritten by Auditor no
Expand Down
9 changes: 8 additions & 1 deletion gossip/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,26 @@ import (
"github.com/hashicorp/memberlist"
)

// NodeTypeDelegate is an AliveDelegate...
// eventDelegate is a simple delegate that is used only to receive
// notifications about members joining, leaving and being updated, and to
// hand them over to the Node it wraps. The methods in this delegate may be
// called by multiple goroutines, but never concurrently, which allows you
// to reason about ordering.
type eventDelegate struct {
// node is the Node whose handlers receive every notification.
node *Node
}

// NotifyJoin is invoked when a node is detected to have joined. The event
// is forwarded unchanged to the wrapped Node's handleNodeJoin.
func (e *eventDelegate) NotifyJoin(n *memberlist.Node) {
e.node.handleNodeJoin(n)
}

// NotifyLeave is invoked when a node is detected to have left. The event
// is forwarded unchanged to the wrapped Node's handleNodeLeave.
func (e *eventDelegate) NotifyLeave(n *memberlist.Node) {
e.node.handleNodeLeave(n)
}

// NotifyUpdate is invoked when a node is detected to have been updated,
// usually because its metadata changed. The event is forwarded unchanged
// to the wrapped Node's handleNodeUpdate.
func (e *eventDelegate) NotifyUpdate(n *memberlist.Node) {
e.node.handleNodeUpdate(n)
}
Expand Down
143 changes: 132 additions & 11 deletions gossip/node.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2018 Banco Bilbao Vizcaya Argentaria, S.A.
Copyright 2018 Banco Bilbao Vizcaya Argentaria, S.A.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
Expand All @@ -17,6 +17,7 @@ import (
"fmt"
"net"
"sync"
"time"

"github.com/bbva/qed/log"
"github.com/hashicorp/go-msgpack/codec"
Expand All @@ -35,6 +36,34 @@ type Node struct {

topology *Topology
topologyLock sync.RWMutex

stateLock sync.Mutex
state NodeState
}

// NodeState describes where a Node instance is in its lifecycle.
type NodeState int

// The possible values of NodeState. NodeAlive is the zero value.
const (
	// NodeAlive marks a node that has neither left nor been shut down.
	NodeAlive NodeState = iota
	// NodeLeaving marks a node whose Leave is in progress.
	NodeLeaving
	// NodeLeft marks a node whose Leave has completed.
	NodeLeft
	// NodeShutdown marks a node that has been shut down.
	NodeShutdown
)

// String returns the lowercase name of the state, or "unknown" for any
// value that is not one of the declared NodeState constants.
func (s NodeState) String() string {
	names := [...]string{
		NodeAlive:    "alive",
		NodeLeaving:  "leaving",
		NodeLeft:     "left",
		NodeShutdown: "shutdown",
	}
	// Anything outside the declared range has no name.
	if s < 0 || int(s) >= len(names) {
		return "unknown"
	}
	return names[s]
}

type Topology struct {
Expand All @@ -59,6 +88,13 @@ func (t *Topology) Update(m *Member) error {
return nil
}

// Delete removes the member registered under m.Name from the group of
// members that share m.Role. The topology lock is held for the duration of
// the call. Removing a member that is not present is a no-op, and the
// returned error is always nil.
func (t *Topology) Delete(m *Member) error {
	t.Lock()
	defer t.Unlock()

	sameRole := t.members[m.Role]
	delete(sameRole, m.Name)
	return nil
}

func (t *Topology) Get(kind NodeType) []*Member {
t.Lock()
defer t.Unlock()
Expand Down Expand Up @@ -118,6 +154,7 @@ func Create(conf *Config, delegate DelegateBuilder) (node *Node, err error) {
config: conf,
meta: meta,
topology: NewTopology(),
state: NodeAlive,
}

bindIP, bindPort, err := conf.AddrParts(conf.BindAddr)
Expand All @@ -140,6 +177,7 @@ func Create(conf *Config, delegate DelegateBuilder) (node *Node, err error) {
conf.MemberlistConfig.AdvertiseAddr = advertiseIP
conf.MemberlistConfig.AdvertisePort = advertisePort
conf.MemberlistConfig.Name = conf.NodeName
conf.MemberlistConfig.Logger = log.GetLogger()

// Configure delegates
conf.MemberlistConfig.Delegate = delegate(node)
Expand All @@ -159,51 +197,134 @@ func Create(conf *Config, delegate DelegateBuilder) (node *Node, err error) {
NumNodes: func() int {
return node.memberlist.NumMembers()
},
RetransmitMult: 0,
RetransmitMult: 2,
}

return node, nil
}

// Join asks the Node instance to join the cluster through the given member
// addresses. It returns the value reported by the underlying memberlist
// Join call, or 0 when addrs is empty. Joining is refused with an error
// unless the node is still in the NodeAlive state.
func (n *Node) Join(addrs []string) (int, error) {
	if current := n.State(); current != NodeAlive {
		return 0, fmt.Errorf("Node can't join after Leave or Shutdown")
	}

	// Nothing to contact: succeed without touching memberlist.
	if len(addrs) == 0 {
		return 0, nil
	}

	log.Debugf("Trying to join the cluster using members: %v", addrs)
	return n.memberlist.Join(addrs)
}

// Leave gracefully takes the node out of the cluster. It moves the node to
// NodeLeaving, broadcasts the leave through memberlist (bounded by
// config.BroadcastTimeout), sleeps for config.LeavePropagateDelay and then
// marks the node as NodeLeft.
//
// Calling Leave on a node that has already left returns nil. Calling it
// while another Leave is in progress, or after Shutdown, returns an error.
// If the memberlist leave fails, its error is returned and the state stays
// at NodeLeaving.
func (n *Node) Leave() error {
	// Claim the Leaving state, rejecting states that cannot start a leave.
	n.stateLock.Lock()
	switch n.state {
	case NodeLeft:
		n.stateLock.Unlock()
		return nil
	case NodeLeaving:
		n.stateLock.Unlock()
		return fmt.Errorf("Leave already in progress")
	case NodeShutdown:
		n.stateLock.Unlock()
		return fmt.Errorf("Leave called after Shutdown")
	}
	n.state = NodeLeaving
	n.stateLock.Unlock()

	// The lock is released during the slow part so that State and Shutdown
	// are not blocked while the leave is broadcast.
	if err := n.memberlist.Leave(n.config.BroadcastTimeout); err != nil {
		return err
	}

	// The broadcast timeout only covers the message leaving our own queue.
	// This extra wait lets it spread through the cluster, so we stay up
	// long enough to answer probes from nodes that have not yet learned we
	// are leaving.
	time.Sleep(n.config.LeavePropagateDelay)

	// Move to Left unless a concurrent Shutdown has already happened.
	n.stateLock.Lock()
	defer n.stateLock.Unlock()
	if n.state != NodeShutdown {
		n.state = NodeLeft
	}
	return nil
}

// Shutdown forcefully stops the Node instance, halting all network
// activity and background maintenance associated with it.
//
// This is not a graceful exit: call Leave first, otherwise the other nodes
// in the cluster will detect this node's disappearance as a failure. A
// Shutdown that was not preceded by a completed Leave is logged.
//
// It is safe to call this method multiple times; calls after the first
// return nil without doing anything. The state is switched to NodeShutdown
// before memberlist is stopped, so this holds even if stopping memberlist
// returned an error.
func (n *Node) Shutdown() error {
	n.stateLock.Lock()
	defer n.stateLock.Unlock()

	switch n.state {
	case NodeShutdown:
		// Already shut down: nothing left to do.
		return nil
	case NodeLeft:
		// Graceful path: Leave completed beforehand.
	default:
		log.Info("node: Shutdown without a Leave")
	}

	n.state = NodeShutdown
	return n.memberlist.Shutdown()
}

func (n *Node) handleNodeJoin(peer *memberlist.Node) {
// Memberlist returns the underlying memberlist instance used by this Node.
// The pointer is returned as-is, without taking any lock.
func (n *Node) Memberlist() *memberlist.Memberlist {
return n.memberlist
}

// State returns the current lifecycle state of the Node. The value is read
// while holding stateLock, so it is consistent with concurrent Leave and
// Shutdown calls.
func (n *Node) State() NodeState {
	n.stateLock.Lock()
	current := n.state
	n.stateLock.Unlock()
	return current
}

func (n *Node) getMember(peer *memberlist.Node) *Member {
meta, err := n.decodeMetadata(peer.Meta)
if err != nil {
panic(err)
}
member := &Member{
Name: peer.Name,
Addr: net.IP(peer.Addr),
Port: peer.Port,
Role: meta.Role,
Status: StatusAlive,
return &Member{
Name: peer.Name,
Addr: net.IP(peer.Addr),
Port: peer.Port,
Role: meta.Role,
}
}

// handleNodeJoin records a newly joined peer in the topology. The peer is
// converted into a Member via getMember, marked as StatusAlive and stored
// with topology.Update. The value returned by Update is not inspected.
func (n *Node) handleNodeJoin(peer *memberlist.Node) {
	joined := n.getMember(peer)
	joined.Status = StatusAlive
	n.topology.Update(joined)
	log.Debugf("%s member joined: %s %s:%d", joined.Role, joined.Name, joined.Addr, joined.Port)
}

// handleNodeLeave drops a departed peer from the topology. The peer is
// converted into a Member via getMember and removed with topology.Delete.
// The value returned by Delete is not inspected.
func (n *Node) handleNodeLeave(peer *memberlist.Node) {
	departed := n.getMember(peer)
	n.topology.Delete(departed)
	log.Debugf("%s member left: %s %s:%d", departed.Role, departed.Name, departed.Addr, departed.Port)
}

// handleNodeUpdate is the handler for peer update notifications. It
// currently does nothing: the peer argument is unused and the topology is
// left untouched.
func (n *Node) handleNodeUpdate(peer *memberlist.Node) {

// Intentionally a no-op: node updates are ignored.
}

func (n *Node) encodeMetadata() ([]byte, error) {
Expand Down

0 comments on commit 2b236ef

Please sign in to comment.