Skip to content

Commit

Permalink
Add message handler and default delegate
Browse files Browse the repository at this point in the history
  • Loading branch information
aalda committed Nov 19, 2018
1 parent da60ddd commit 529c14a
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 26 deletions.
13 changes: 12 additions & 1 deletion cmd/agent_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"syscall"

"github.com/bbva/qed/gossip"
"github.com/bbva/qed/gossip/auditor"
"github.com/bbva/qed/log"
"github.com/spf13/cobra"
)
Expand All @@ -44,7 +45,17 @@ func newAgentStartCommand() *cobra.Command {
config.AdvertiseAddr = advertiseAddr
config.Role = gossip.NewNodeType(role)

agent, err := gossip.Create(config, gossip.NewFakeDelegate())
var agent *gossip.Agent
var err error
switch config.Role {
case gossip.PublisherType:
case gossip.MonitorType:
case gossip.AuditorType:
conf := auditor.DefaultConfig()
agent, err = gossip.Create(config, auditor.NewAuditorHandlerBuilder(conf))
default:
log.Fatalf("Failed to start the QED agent: unknown role")
}
if err != nil {
log.Fatalf("Failed to start the QED agent: %v", err)
}
Expand Down
57 changes: 44 additions & 13 deletions gossip/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,16 @@ type AgentMeta struct {
Role AgentType
}

func (m *AgentMeta) Encode() ([]byte, error) {
var buf bytes.Buffer
encoder := codec.NewEncoder(&buf, &codec.MsgpackHandle{})
if err := encoder.Encode(m); err != nil {
log.Errorf("Failed to encode agent metadata: %v", err)
return nil, err
}
return buf.Bytes(), nil
}

type Agent struct {
config *Config
meta *AgentMeta
Expand All @@ -41,7 +51,7 @@ type Agent struct {
state AgentState
}

// AgentState is the state of the Node instance.
// AgentState is the state of the Agent instance.
type AgentState int

const (
Expand Down Expand Up @@ -143,9 +153,22 @@ func (s MemberStatus) String() string {
}
}

type DelegateBuilder func(*Agent) memberlist.Delegate
type MessageHandler interface {
HandleMsg([]byte)
}

type MessageHandlerBuilder func(*Agent) MessageHandler

func Create(conf *Config, delegate DelegateBuilder) (agent *Agent, err error) {
type NopMessageHanler struct {
}

func (h *NopMessageHanler) HandleMsg([]byte) {}

func NewNopMessageHandler(*Agent) MessageHandler {
return nil // &NopMessageHanler{}
}

func Create(conf *Config, handler MessageHandlerBuilder) (agent *Agent, err error) {

meta := &AgentMeta{
Role: conf.Role,
Expand Down Expand Up @@ -181,7 +204,7 @@ func Create(conf *Config, delegate DelegateBuilder) (agent *Agent, err error) {
conf.MemberlistConfig.Logger = log.GetLogger()

// Configure delegates
conf.MemberlistConfig.Delegate = delegate(agent)
conf.MemberlistConfig.Delegate = newAgentDelegate(agent, handler(agent))
conf.MemberlistConfig.Events = &eventDelegate{agent}

agent.memberlist, err = memberlist.Create(conf.MemberlistConfig)
Expand All @@ -204,11 +227,11 @@ func Create(conf *Config, delegate DelegateBuilder) (agent *Agent, err error) {
return agent, nil
}

// Join asks the Node instance to join.
// Join asks the Agent instance to join.
func (a *Agent) Join(addrs []string) (int, error) {

if a.State() != AgentAlive {
return 0, fmt.Errorf("Node can't join after Leave or Shutdown")
return 0, fmt.Errorf("Agent can't join after Leave or Shutdown")
}

if len(addrs) > 0 {
Expand Down Expand Up @@ -245,7 +268,7 @@ func (a *Agent) Leave() error {
// timeout is how long we wait for the message to go out from our own
// queue, but this wait is for that message to propagate through the
// cluster. In particular, we want to stay up long enough to service
// any probes from other nodes before they learn about us leaving.
// any probes from other agents before they learn about us leaving.
time.Sleep(a.config.LeavePropagateDelay)

// Transition to Left only if we not already shutdown
Expand All @@ -258,12 +281,12 @@ func (a *Agent) Leave() error {

}

// Shutdown forcefully shuts down the Node instance, stopping all network
// Shutdown forcefully shuts down the Agent instance, stopping all network
// activity and background maintenance associated with the instance.
//
// This is not a graceful shutdown, and should be preceded by a call
// to Leave. Otherwise, other nodes in the cluster will detect this node's
// exit as a node failure.
// to Leave. Otherwise, other agents in the cluster will detect this agent's
// exit as a agent failure.
//
// It is safe to call this method multiple times.
func (a *Agent) Shutdown() error {
Expand All @@ -276,7 +299,7 @@ func (a *Agent) Shutdown() error {
}

if a.state != AgentLeft {
log.Info("node: Shutdown without a Leave")
log.Info("agent: Shutdown without a Leave")
}

a.state = AgentShutdown
Expand All @@ -292,6 +315,14 @@ func (a *Agent) Memberlist() *memberlist.Memberlist {
return a.memberlist
}

func (a *Agent) Metadata() *AgentMeta {
return a.meta
}

func (a *Agent) Broadcasts() *memberlist.TransmitLimitedQueue {
return a.broadcasts
}

func (a *Agent) State() AgentState {
a.stateLock.Lock()
defer a.stateLock.Unlock()
Expand Down Expand Up @@ -333,7 +364,7 @@ func (a *Agent) encodeMetadata() ([]byte, error) {
var buf bytes.Buffer
encoder := codec.NewEncoder(&buf, &codec.MsgpackHandle{})
if err := encoder.Encode(a.meta); err != nil {
log.Errorf("Failed to encode node metadata: %v", err)
log.Errorf("Failed to encode agent metadata: %v", err)
return nil, err
}
return buf.Bytes(), nil
Expand All @@ -344,7 +375,7 @@ func (a *Agent) decodeMetadata(buf []byte) (*AgentMeta, error) {
reader := bytes.NewReader(buf)
decoder := codec.NewDecoder(reader, &codec.MsgpackHandle{})
if err := decoder.Decode(meta); err != nil {
log.Errorf("Failed to decode node metadata: %v", err)
log.Errorf("Failed to decode agent metadata: %v", err)
return nil, err
}
return meta, nil
Expand Down
33 changes: 33 additions & 0 deletions gossip/auditor/auditor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package auditor

import (
"github.com/bbva/qed/gossip"
)

type Auditor struct {
Agent *gossip.Agent
Config *Config
quit chan bool
}

type Config struct {
}

func DefaultConfig() *Config {
return &Config{}
}

func NewAuditorHandlerBuilder(c *Config) gossip.MessageHandlerBuilder {
auditor := &Auditor{
Config: c,
quit: make(chan bool),
}
return func(a *gossip.Agent) gossip.MessageHandler {
auditor.Agent = a
return auditor
}
}

func (a *Auditor) HandleMsg(msg []byte) {

}
26 changes: 15 additions & 11 deletions gossip/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,22 @@ func (e *eventDelegate) NotifyUpdate(n *memberlist.Node) {
e.agent.handleNodeUpdate(n)
}

func NewFakeDelegate() DelegateBuilder {
return func(n *Agent) memberlist.Delegate {
return &fakeDelegate{n}
}
type agentDelegate struct {
agent *Agent
handler MessageHandler
}

type fakeDelegate struct {
agent *Agent
func newAgentDelegate(agent *Agent, handler MessageHandler) *agentDelegate {
return &agentDelegate{
agent: agent,
handler: handler,
}
}

// NodeMeta is used to retrieve meta-data about the current node
// when broadcasting an alive message. It's length is limited to
// the given byte size. This metadata is available in the Node structure.
func (d *fakeDelegate) NodeMeta(limit int) []byte {
func (d *agentDelegate) NodeMeta(limit int) []byte {
meta, err := d.agent.encodeMetadata()
if err != nil {
log.Fatalf("Unable to encode node metadata: %v", err)
Expand All @@ -67,28 +69,30 @@ func (d *fakeDelegate) NodeMeta(limit int) []byte {
// Care should be taken that this method does not block, since doing
// so would block the entire UDP packet receive loop. Additionally, the byte
// slice may be modified after the call returns, so it should be copied if needed
func (d *fakeDelegate) NotifyMsg([]byte) {}
func (d *agentDelegate) NotifyMsg(msg []byte) {
d.handler.HandleMsg(msg)
}

// GetBroadcasts is called when user data messages can be broadcast.
// It can return a list of buffers to send. Each buffer should assume an
// overhead as provided with a limit on the total byte size allowed.
// The total byte size of the resulting data to send must not exceed
// the limit. Care should be taken that this method does not block,
// since doing so would block the entire UDP packet receive loop.
func (d *fakeDelegate) GetBroadcasts(overhead, limit int) [][]byte {
func (d *agentDelegate) GetBroadcasts(overhead, limit int) [][]byte {
return d.agent.broadcasts.GetBroadcasts(overhead, limit)
}

// LocalState is used for a TCP Push/Pull. This is sent to
// the remote side in addition to the membership information. Any
// data can be sent here. See MergeRemoteState as well. The `join`
// boolean indicates this is for a join instead of a push/pull.
func (d *fakeDelegate) LocalState(join bool) []byte {
func (d *agentDelegate) LocalState(join bool) []byte {
return []byte{}
}

// MergeRemoteState is invoked after a TCP Push/Pull. This is the
// state received from the remote side and is the result of the
// remote side's LocalState call. The 'join'
// boolean indicates this is for a join instead of a push/pull.
func (d *fakeDelegate) MergeRemoteState(buf []byte, join bool) {}
func (d *agentDelegate) MergeRemoteState(buf []byte, join bool) {}
2 changes: 1 addition & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func NewServer(
// Create gossip agent
config := gossip.DefaultConfig()
config.BindAddr = gossipAddr
server.agent, err = gossip.Create(config, gossip.NewFakeDelegate())
server.agent, err = gossip.Create(config, gossip.NewNopMessageHandler)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 529c14a

Please sign in to comment.