diff --git a/gossip/agent.go b/gossip/agent.go
index 09f850d64..7348b5f05 100644
--- a/gossip/agent.go
+++ b/gossip/agent.go
@@ -13,173 +13,50 @@ package gossip
 
 import (
-    "bytes"
     "fmt"
     "net"
     "sync"
     "time"
 
+    "github.com/bbva/qed/gossip/member"
     "github.com/bbva/qed/log"
     "github.com/bbva/qed/protocol"
-    "github.com/hashicorp/go-msgpack/codec"
     "github.com/hashicorp/memberlist"
 )
 
-type AgentMeta struct {
-    Role AgentType
-}
-
-func (m *AgentMeta) Encode() ([]byte, error) {
-    var buf bytes.Buffer
-    encoder := codec.NewEncoder(&buf, &codec.MsgpackHandle{})
-    if err := encoder.Encode(m); err != nil {
-        log.Errorf("Failed to encode agent metadata: %v", err)
-        return nil, err
-    }
-    return buf.Bytes(), nil
+type Alert struct {
+    id  string
+    msg string
 }
 
 type Agent struct {
-    config *Config
-    meta   *AgentMeta
+    config *Config
+    Self   *member.Peer
+
     memberlist *memberlist.Memberlist
     broadcasts *memberlist.TransmitLimitedQueue
-    topology     *Topology
-    topologyLock sync.RWMutex
+    Topology *Topology
 
     stateLock sync.Mutex
-    state     AgentState
-}
-
-// AgentState is the state of the Agent instance.
-type AgentState int
-
-const (
-    AgentAlive AgentState = iota
-    AgentLeaving
-    AgentLeft
-    AgentShutdown
-)
-
-func (s AgentState) String() string {
-    switch s {
-    case AgentAlive:
-        return "alive"
-    case AgentLeaving:
-        return "leaving"
-    case AgentLeft:
-        return "left"
-    case AgentShutdown:
-        return "shutdown"
-    default:
-        return "unknown"
-    }
-}
-
-type Topology struct {
-    members map[AgentType]map[string]*Member
-    sync.Mutex
-}
-
-func NewTopology() *Topology {
-    members := make(map[AgentType]map[string]*Member)
-    for i := 0; i < int(MaxType); i++ {
-        members[AgentType(i)] = make(map[string]*Member)
-    }
-    return &Topology{
-        members: members,
-    }
-}
-
-func (t *Topology) Update(m *Member) error {
-    t.Lock()
-    defer t.Unlock()
-    log.Debugf("Updating topology with member: %+v", m)
-    t.members[m.Role][m.Name] = m
-    return nil
-}
-
-func (t *Topology) Delete(m *Member) error {
-    t.Lock()
-    defer t.Unlock()
-    delete(t.members[m.Role], m.Name)
-    return nil
-}
-
-func (t *Topology) Get(kind AgentType) []*Member {
-    t.Lock()
-    defer t.Unlock()
-    members := make([]*Member, 0)
-    for _, member := range t.members[kind] {
-        members = append(members, member)
-    }
-    return members
-}
-
-// Member is a single member of the gossip cluster.
-type Member struct {
-    Name   string
-    Addr   net.IP
-    Port   uint16
-    Role   AgentType
-    Status MemberStatus
-}
-
-// MemberStatus is the state that a member is in.
-type MemberStatus int
-
-const (
-    StatusNone MemberStatus = iota
-    StatusAlive
-    StatusLeaving
-    StatusLeft
-    StatusFailed
-)
-func (s MemberStatus) String() string {
-    switch s {
-    case StatusNone:
-        return "none"
-    case StatusAlive:
-        return "alive"
-    case StatusLeaving:
-        return "leaving"
-    case StatusLeft:
-        return "left"
-    case StatusFailed:
-        return "failed"
-    default:
-        panic(fmt.Sprintf("unknown MemberStatus: %d", s))
-    }
-}
+    processors []Processor
 
-type MessageHandler interface {
-    HandleMsg([]byte)
+    In     chan *protocol.BatchSnapshots
+    Out    chan *protocol.BatchSnapshots
+    Alerts chan Alert
+    quit   chan bool
 }
 
-type MessageHandlerBuilder func(*Agent) MessageHandler
-
-type NopMessageHanler struct {
-}
-
-func (h *NopMessageHanler) HandleMsg([]byte) {}
-
-func NewNopMessageHandler(*Agent) MessageHandler {
-    return &NopMessageHanler{}
-}
-
-func Create(conf *Config, handler MessageHandlerBuilder) (agent *Agent, err error) {
-
-    meta := &AgentMeta{
-        Role: conf.Role,
-    }
+func NewAgent(conf *Config, p []Processor) (agent *Agent, err error) {
 
     agent = &Agent{
         config: conf,
-        meta:     meta,
-        topology: NewTopology(),
-        state:    AgentAlive,
+        Topology:   NewTopology(),
+        processors: p,
+        In:         make(chan *protocol.BatchSnapshots, 1000),
+        Out:        make(chan *protocol.BatchSnapshots, 1000),
+        Alerts:     make(chan Alert, 100),
+        quit:       make(chan bool),
     }
 
     bindIP, bindPort, err := conf.AddrParts(conf.BindAddr)
@@ -205,7 +82,7 @@ func Create(conf *Config, handler MessageHandlerBuilder) (agent *Agent, err erro
     conf.MemberlistConfig.Logger = log.GetLogger()
 
     // Configure delegates
-    conf.MemberlistConfig.Delegate = newAgentDelegate(agent, handler(agent))
+    conf.MemberlistConfig.Delegate = newAgentDelegate(agent)
     conf.MemberlistConfig.Events = &eventDelegate{agent}
 
     agent.memberlist, err = memberlist.Create(conf.MemberlistConfig)
@@ -214,8 +91,8 @@ func Create(conf *Config, handler MessageHandlerBuilder) (agent *Agent, err erro
     }
 
     // Print local member info
-    localNode := agent.memberlist.LocalNode()
-    log.Infof("Local member %s:%d", localNode.Addr, localNode.Port)
+    agent.Self = member.ParsePeer(agent.memberlist.LocalNode())
+    log.Infof("Local member %+v", agent.Self)
 
     // Set broadcast queue
     agent.broadcasts = &memberlist.TransmitLimitedQueue{
@@ -228,10 +105,66 @@ func Create(conf *Config, handler MessageHandlerBuilder) (agent *Agent, err erro
     return agent, nil
 }
 
+func (a *Agent) Start() {
+
+    outTicker := time.NewTicker(2 * time.Second)
+    alertTicker := time.NewTicker(1 * time.Second)
+
+    for {
+        select {
+        case batch := <-a.In:
+            for _, p := range a.processors {
+                go p.Process(batch)
+            }
+
+            a.Out <- batch
+        case <-outTicker.C:
+            a.sendOutQueue()
+        case <-alertTicker.C:
+            a.processAlertQueue()
+        case <-a.quit:
+            return
+        }
+
+    }
+
+}
+
+func (a *Agent) processAlertQueue() {
+
+}
+
+func (a *Agent) sendOutQueue() {
+    var batch *protocol.BatchSnapshots
+    for {
+        select {
+        case batch = <-a.Out:
+        default:
+            return
+        }
+
+        if batch.TTL <= 0 {
+            continue
+        }
+
+        batch.TTL--
+
+        for _, dst := range a.route(batch.From) {
+            msg, _ := batch.Encode()
+            a.memberlist.SendReliable(dst, msg)
+        }
+    }
+
+}
+
+func (a Agent) route(s *member.Peer) []*memberlist.Node {
+    return nil
+}
+
 // Join asks the Agent instance to join.
 func (a *Agent) Join(addrs []string) (int, error) {
-    if a.State() != AgentAlive {
+    if a.State() != member.Alive {
         return 0, fmt.Errorf("Agent can't join after Leave or Shutdown")
     }
@@ -246,18 +179,20 @@ func (a *Agent) Leave() error {
     // Check the current state
     a.stateLock.Lock()
-    if a.state == AgentLeft {
+    switch a.Self.Status {
+    case member.Left:
         a.stateLock.Unlock()
         return nil
-    } else if a.state == AgentLeaving {
+    case member.Leaving:
         a.stateLock.Unlock()
         return fmt.Errorf("Leave already in progress")
-    } else if a.state == AgentShutdown {
+    case member.Shutdown:
         a.stateLock.Unlock()
         return fmt.Errorf("Leave called after Shutdown")
+    default:
+        a.Self.Status = member.Leaving
+        a.stateLock.Unlock()
     }
-    a.state = AgentLeaving
-    a.stateLock.Unlock()
 
     // Attempt the memberlist leave
     err := a.memberlist.Leave(a.config.BroadcastTimeout)
@@ -274,8 +209,8 @@ func (a *Agent) Leave() error {
 
     // Transition to Left only if we not already shutdown
     a.stateLock.Lock()
-    if a.state != AgentShutdown {
-        a.state = AgentLeft
+    if a.Self.Status != member.Shutdown {
+        a.Self.Status = member.Left
     }
     a.stateLock.Unlock()
     return nil
@@ -295,15 +230,15 @@
     a.stateLock.Lock()
     defer a.stateLock.Unlock()
 
-    if a.state == AgentShutdown {
+    if a.Self.Status == member.Shutdown {
         return nil
     }
 
-    if a.state != AgentLeft {
+    if a.Self.Status != member.Left {
         log.Info("agent: Shutdown without a Leave")
     }
 
-    a.state = AgentShutdown
+    a.Self.Status = member.Shutdown
 
     err := a.memberlist.Shutdown()
     if err != nil {
         return err
@@ -316,10 +251,6 @@ func (a *Agent) Memberlist() *memberlist.Memberlist {
     return a.memberlist
 }
 
-func (a *Agent) Metadata() *AgentMeta {
-    return a.meta
-}
-
 func (a *Agent) Broadcasts() *memberlist.TransmitLimitedQueue {
     return a.broadcasts
 }
@@ -329,107 +260,8 @@ func (a *Agent) GetAddrPort() (net.IP, uint16) {
     return n.Addr, n.Port
 }
 
-func (a *Agent) State() AgentState {
+func (a *Agent) State() member.Status {
     a.stateLock.Lock()
     defer a.stateLock.Unlock()
-    return a.state
-}
-
-func (a *Agent) getMember(peer *memberlist.Node) *Member {
-    meta, err := a.decodeMetadata(peer.Meta)
-    if err != nil {
-        panic(err)
-    }
-    return &Member{
-        Name: peer.Name,
-        Addr: net.IP(peer.Addr),
-        Port: peer.Port,
-        Role: meta.Role,
-    }
-}
-
-func (a *Agent) handleNodeJoin(peer *memberlist.Node) {
-    member := a.getMember(peer)
-    member.Status = StatusAlive
-    a.topology.Update(member)
-    log.Debugf("%s member joined: %s %s:%d", member.Role, member.Name, member.Addr, member.Port)
-}
-
-func (a *Agent) handleNodeLeave(peer *memberlist.Node) {
-    member := a.getMember(peer)
-    a.topology.Delete(member)
-    log.Debugf("%s member left: %s %s:%d", member.Role, member.Name, member.Addr, member.Port)
-}
-
-func (a *Agent) handleNodeUpdate(peer *memberlist.Node) {
-    // ignore
-}
-
-func (a *Agent) encodeMetadata() ([]byte, error) {
-    var buf bytes.Buffer
-    encoder := codec.NewEncoder(&buf, &codec.MsgpackHandle{})
-    if err := encoder.Encode(a.meta); err != nil {
-        log.Errorf("Failed to encode agent metadata: %v", err)
-        return nil, err
-    }
-    return buf.Bytes(), nil
-}
-
-func (a *Agent) decodeMetadata(buf []byte) (*AgentMeta, error) {
-    meta := &AgentMeta{}
-    reader := bytes.NewReader(buf)
-    decoder := codec.NewDecoder(reader, &codec.MsgpackHandle{})
-    if err := decoder.Decode(meta); err != nil {
-        log.Errorf("Failed to decode agent metadata: %v", err)
-        return nil, err
-    }
-    return meta, nil
-}
-
-func memberToNode(members []*Member) []*memberlist.Node {
-    list := make([]*memberlist.Node, 0)
-    for _, m := range members {
-        list = append(list, &memberlist.Node{Addr: m.Addr, Port: m.Port})
-    }
-    return list
-}
-
-func (a *Agent) GetPeers(max int, agentType AgentType, excluded []*protocol.Source) []*memberlist.Node {
-    fullList := a.topology.Get(agentType)
-    fullList = excludePeers(fullList, excluded)
-
-    if len(fullList) <= max {
-        return memberToNode(fullList)
-    }
-
-    var randomPeers []*Member
-    for i := 0; i < max; i++ {
-        randomPeers = append(randomPeers, fullList[i])
-    }
-
-    return memberToNode(randomPeers)
-}
-
-func excludePeers(peers []*Member, excluded []*protocol.Source) []*Member {
-    if excluded == nil {
-        return peers
-    }
-
-    filteredList := make([]*Member, 0)
-    var exclude bool
-
-    for _, p := range peers {
-        exclude = false
-        for _, e := range excluded {
-            if bytes.Equal(p.Addr, e.Addr) && p.Port == e.Port {
-                exclude = true
-            }
-        }
-
-        if !exclude {
-            filteredList = append(filteredList, p)
-        }
-    }
-
-    return filteredList
+    return a.Self.Status
 }
diff --git a/gossip/config.go b/gossip/config.go
index cd5474926..4ee4fae29 100644
--- a/gossip/config.go
+++ b/gossip/config.go
@@ -16,47 +16,10 @@ import (
     "net"
     "time"
 
+    "github.com/bbva/qed/gossip/member"
     "github.com/hashicorp/memberlist"
 )
 
-type AgentType int
-
-func (t AgentType) String() string {
-    switch t {
-    case AuditorType:
-        return "auditor"
-    case MonitorType:
-        return "monitor"
-    case PublisherType:
-        return "publisher"
-    case ServerType:
-        return "server"
-    default:
-        return "unknown"
-    }
-}
-
-func NewNodeType(value string) AgentType {
-    switch value {
-    case "auditor":
-        return AuditorType
-    case "monitor":
-        return MonitorType
-    case "publisher":
-        return PublisherType
-    default:
-        return ServerType
-    }
-}
-
-const (
-    AuditorType AgentType = iota
-    MonitorType
-    PublisherType
-    ServerType
-    MaxType
-)
-
 // This is the default port that we use for the Agent communication
 const DefaultBindPort int = 7946
 
@@ -78,7 +41,7 @@ type Config struct {
     // is not set, Auditor will set it to the hostname of the running machine.
     NodeName string
 
-    Role AgentType
+    Role member.Type
 
     // BindAddr is the address that the Auditor agent's communication ports
     // will bind to. Auditor will use this address to bind to for both TCP
diff --git a/gossip/delegate.go b/gossip/delegate.go
index b0115791a..898b18887 100644
--- a/gossip/delegate.go
+++ b/gossip/delegate.go
@@ -13,8 +13,9 @@ package gossip
 
 import (
-    "log"
-
+    "github.com/bbva/qed/gossip/member"
+    "github.com/bbva/qed/log"
+    "github.com/bbva/qed/protocol"
     "github.com/hashicorp/memberlist"
 )
 
@@ -28,29 +29,32 @@ type eventDelegate struct {
 
 // NotifyJoin is invoked when a node is detected to have joined.
 func (e *eventDelegate) NotifyJoin(n *memberlist.Node) {
-    e.agent.handleNodeJoin(n)
+    peer := member.ParsePeer(n)
+    peer.Status = member.Alive
+    e.agent.Topology.Update(peer)
+    log.Debugf("member joined: %+v", peer)
 }
 
 // NotifyLeave is invoked when a node is detected to have left.
 func (e *eventDelegate) NotifyLeave(n *memberlist.Node) {
-    e.agent.handleNodeLeave(n)
+    peer := member.ParsePeer(n)
+    e.agent.Topology.Delete(peer)
+    log.Debugf("member left: %+v", peer)
 }
 
 // NotifyUpdate is invoked when a node is detected to have
 // updated, usually involving the meta data.
 func (e *eventDelegate) NotifyUpdate(n *memberlist.Node) {
-    e.agent.handleNodeUpdate(n)
+    // ignore
 }
 
 type agentDelegate struct {
-    agent   *Agent
-    handler MessageHandler
+    agent *Agent
 }
 
-func newAgentDelegate(agent *Agent, handler MessageHandler) *agentDelegate {
+func newAgentDelegate(agent *Agent) *agentDelegate {
     return &agentDelegate{
-        agent:   agent,
-        handler: handler,
+        agent: agent,
     }
 }
 
@@ -58,7 +62,7 @@ func newAgentDelegate(agent *Agent, handler MessageHandler) *agentDelegate {
 // when broadcasting an alive message. It's length is limited to
 // the given byte size. This metadata is available in the Node structure.
 func (d *agentDelegate) NodeMeta(limit int) []byte {
-    meta, err := d.agent.encodeMetadata()
+    meta, err := d.agent.Self.Meta.Encode()
     if err != nil {
         log.Fatalf("Unable to encode node metadata: %v", err)
     }
@@ -70,7 +74,15 @@
 // so would block the entire UDP packet receive loop. Additionally, the byte
 // slice may be modified after the call returns, so it should be copied if needed
 func (d *agentDelegate) NotifyMsg(msg []byte) {
-    d.handler.HandleMsg(msg)
+    var batch protocol.BatchSnapshots
+    err := batch.Decode(msg)
+    if err != nil {
+        log.Errorf("Unable to decode message: %v", err)
+        return
+    }
+
+    log.Infof("Batch received, TTL: %d: %v", batch.TTL, batch)
+    d.agent.In <- &batch
 }
 
 // GetBroadcasts is called when user data messages can be broadcast.
diff --git a/gossip/sender/sender.go b/gossip/sender/sender.go
index 3cceda9e0..13ac2b782 100644
--- a/gossip/sender/sender.go
+++ b/gossip/sender/sender.go
@@ -14,7 +14,6 @@ package sender
 
 import (
-    "bytes"
     "fmt"
     "time"
 
@@ -22,7 +21,6 @@ import (
     "github.com/bbva/qed/log"
     "github.com/bbva/qed/protocol"
     "github.com/bbva/qed/sign"
-    "github.com/hashicorp/go-msgpack/codec"
 )
 
 type Sender struct {
@@ -36,13 +34,15 @@ type Config struct {
     BatchSize     uint
     BatchInterval time.Duration
     TTL           int
+    EachN         int
 }
 
 func DefaultConfig() *Config {
     return &Config{
-        100,
-        1 * time.Second,
-        2,
+        BatchSize:     100,
+        BatchInterval: 1 * time.Second,
+        TTL:           2,
+        EachN:         2,
     }
 }
 
@@ -68,15 +68,10 @@ func (s Sender) Start(ch chan *protocol.Snapshot) {
             log.Debugf("Encoding batch: %+v", batch)
             msg, _ := batch.Encode()
 
-            peers := s.Agent.GetPeers(2, gossip.AuditorType, nil)
-            peers = append(peers, s.Agent.GetPeers(2, gossip.MonitorType, nil)...)
-            peers = append(peers, s.Agent.GetPeers(2, gossip.PublisherType, nil)...)
- log.Debugf("Peers selected: %+v", peers) + peers := s.Agent.Topology.Each(s.Config.EachN, nil) - for _, peer := range peers { - log.Debugf("%+v", peer) - log.Debugf("Sending batch to peer: %s:%d", peer.Addr, peer.Port) - err := s.Agent.Memberlist().SendReliable(peer, msg) + for _, peer := range peers.L { + err := s.Agent.Memberlist().SendReliable(peer.Node(), msg) if err != nil { log.Errorf("Failed send message: %v", err) } @@ -91,16 +86,6 @@ func (s Sender) Stop() { s.quit <- true } -func encode(msg *protocol.BatchSnapshots) ([]byte, error) { - var buf bytes.Buffer - encoder := codec.NewEncoder(&buf, &codec.MsgpackHandle{}) - if err := encoder.Encode(msg); err != nil { - log.Errorf("Failed to encode message: %v", err) - return nil, err - } - return buf.Bytes(), nil -} - func (s *Sender) getBatch(ch chan *protocol.Snapshot) *protocol.BatchSnapshots { if len(ch) == 0 { @@ -109,16 +94,13 @@ func (s *Sender) getBatch(ch chan *protocol.Snapshot) *protocol.BatchSnapshots { var snapshot *protocol.Snapshot var batch protocol.BatchSnapshots + var batchSize int = 100 var counter int = 0 + batch.Snapshots = make([]*protocol.SignedSnapshot, 0) batch.TTL = s.Config.TTL - addr, port := s.Agent.GetAddrPort() - batch.From = &protocol.Source{ - Addr: addr, - Port: port, - Role: s.Agent.Metadata().Role.String(), - } + batch.From = s.Agent.Self for { select {