From 6d5077897ed4b6f9a37bd0cb48da0fb861edcbaf Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 30 Jan 2017 16:34:13 -0800 Subject: [PATCH 1/2] Update serf --- .../hashicorp/serf/serf/coalesce_user.go | 2 +- .../github.com/hashicorp/serf/serf/config.go | 14 +- vendor/github.com/hashicorp/serf/serf/serf.go | 133 ++++++++++-------- vendor/vendor.json | 10 +- 4 files changed, 85 insertions(+), 74 deletions(-) diff --git a/vendor/github.com/hashicorp/serf/serf/coalesce_user.go b/vendor/github.com/hashicorp/serf/serf/coalesce_user.go index 58131e50f2e..1551b6c52cb 100644 --- a/vendor/github.com/hashicorp/serf/serf/coalesce_user.go +++ b/vendor/github.com/hashicorp/serf/serf/coalesce_user.go @@ -36,7 +36,7 @@ func (c *userEventCoalescer) Coalesce(e Event) { return } - // If the same age, save it + // If the the same age, save it if latest.LTime == user.LTime { latest.Events = append(latest.Events, e) } diff --git a/vendor/github.com/hashicorp/serf/serf/config.go b/vendor/github.com/hashicorp/serf/serf/config.go index e8edd6902ca..dfe878bbcde 100644 --- a/vendor/github.com/hashicorp/serf/serf/config.go +++ b/vendor/github.com/hashicorp/serf/serf/config.go @@ -121,12 +121,12 @@ type Config struct { // prevent an unbounded growth of memory utilization MaxQueueDepth int - // RecentIntentBuffer is used to set the size of recent join and leave intent - // messages that will be buffered. This is used to guard against - // the case where Serf broadcasts an intent that arrives before the - // Memberlist event. It is important that this not be too small to avoid - // continuous rebroadcasting of dead events. - RecentIntentBuffer int + // RecentIntentTimeout is used to determine how long we store recent + // join and leave intents. This is used to guard against the case where + // Serf broadcasts an intent that arrives before the Memberlist event. + // It is important that this not be too short to avoid continuous + // rebroadcasting of dead events. + RecentIntentTimeout time.Duration // EventBuffer is used to control how many events are buffered. // This is used to prevent re-delivery of events to a client. The buffer @@ -242,7 +242,7 @@ func DefaultConfig() *Config { LogOutput: os.Stderr, ProtocolVersion: ProtocolVersionMax, ReapInterval: 15 * time.Second, - RecentIntentBuffer: 128, + RecentIntentTimeout: 5 * time.Minute, ReconnectInterval: 30 * time.Second, ReconnectTimeout: 24 * time.Hour, QueueDepthWarning: 128, diff --git a/vendor/github.com/hashicorp/serf/serf/serf.go b/vendor/github.com/hashicorp/serf/serf/serf.go index 04b300a9b5d..424da01953c 100644 --- a/vendor/github.com/hashicorp/serf/serf/serf.go +++ b/vendor/github.com/hashicorp/serf/serf/serf.go @@ -65,12 +65,11 @@ type Serf struct { memberLock sync.RWMutex members map[string]*memberState - // Circular buffers for recent intents, used - // in case we get the intent before the relevant event - recentLeave []nodeIntent - recentLeaveIndex int - recentJoin []nodeIntent - recentJoinIndex int + // recentIntents the lamport time and type of intent for a given node in + // case we get an intent before the relevant memberlist event. This is + // indexed by node, and always store the latest lamport time / intent + // we've seen. The memberLock protects this structure. + recentIntents map[string]nodeIntent eventBroadcasts *memberlist.TransmitLimitedQueue eventBuffer []*userEvents @@ -179,10 +178,18 @@ type memberState struct { leaveTime time.Time // wall clock time of leave } -// nodeIntent is used to buffer intents for out-of-order deliveries +// nodeIntent is used to buffer intents for out-of-order deliveries. type nodeIntent struct { + // Type is the intent being tracked. Only messageJoinType and + // messageLeaveType are tracked. + Type messageType + + // WallTime is the wall clock time we saw this intent in order to + // expire it from the buffer. + WallTime time.Time + + // LTime is the Lamport time, used for cluster-wide ordering of events. LTime LamportTime - Node string } // userEvent is used to buffer events to prevent re-delivery @@ -340,8 +347,7 @@ func Create(conf *Config) (*Serf, error) { } // Create the buffer for recent intents - serf.recentJoin = make([]nodeIntent, conf.RecentIntentBuffer) - serf.recentLeave = make([]nodeIntent, conf.RecentIntentBuffer) + serf.recentIntents = make(map[string]nodeIntent) // Create a buffer for events and queries serf.eventBuffer = make([]*userEvents, conf.EventBuffer) @@ -855,17 +861,15 @@ func (s *Serf) handleNodeJoin(n *memberlist.Node) { }, } - // Check if we have a join intent and use the LTime - if join := recentIntent(s.recentJoin, n.Name); join != nil { - member.statusLTime = join.LTime + // Check if we have a join or leave intent. The intent buffer + // will only hold one event for this node, so the more recent + // one will take effect. + if join, ok := recentIntent(s.recentIntents, n.Name, messageJoinType); ok { + member.statusLTime = join } - - // Check if we have a leave intent - if leave := recentIntent(s.recentLeave, n.Name); leave != nil { - if leave.LTime > member.statusLTime { - member.Status = StatusLeaving - member.statusLTime = leave.LTime - } + if leave, ok := recentIntent(s.recentIntents, n.Name, messageLeaveType); ok { + member.Status = StatusLeaving + member.statusLTime = leave } s.members[n.Name] = member @@ -1016,18 +1020,8 @@ func (s *Serf) handleNodeLeaveIntent(leaveMsg *messageLeave) bool { member, ok := s.members[leaveMsg.Node] if !ok { - // If we've already seen this message don't rebroadcast - if recentIntent(s.recentLeave, leaveMsg.Node) != nil { - return false - } - - // We don't know this member so store it in a buffer for now - s.recentLeave[s.recentLeaveIndex] = nodeIntent{ - LTime: leaveMsg.LTime, - Node: leaveMsg.Node, - } - s.recentLeaveIndex = (s.recentLeaveIndex + 1) % len(s.recentLeave) - return true + // Rebroadcast only if this was an update we hadn't seen before. + return upsertIntent(s.recentIntents, leaveMsg.Node, messageLeaveType, leaveMsg.LTime, time.Now) } // If the message is old, then it is irrelevant and we can skip it @@ -1087,15 +1081,8 @@ func (s *Serf) handleNodeJoinIntent(joinMsg *messageJoin) bool { member, ok := s.members[joinMsg.Node] if !ok { - // If we've already seen this message don't rebroadcast - if recentIntent(s.recentJoin, joinMsg.Node) != nil { - return false - } - - // We don't know this member so store it in a buffer for now - s.recentJoin[s.recentJoinIndex] = nodeIntent{LTime: joinMsg.LTime, Node: joinMsg.Node} - s.recentJoinIndex = (s.recentJoinIndex + 1) % len(s.recentJoin) - return true + // Rebroadcast only if this was an update we hadn't seen before. + return upsertIntent(s.recentIntents, joinMsg.Node, messageJoinType, joinMsg.LTime, time.Now) } // Check if this time is newer than what we have @@ -1387,14 +1374,17 @@ func (s *Serf) resolveNodeConflict() { } } -// handleReap periodically reaps the list of failed and left members. +// handleReap periodically reaps the list of failed and left members, as well +// as old buffered intents. func (s *Serf) handleReap() { for { select { case <-time.After(s.config.ReapInterval): s.memberLock.Lock() - s.failedMembers = s.reap(s.failedMembers, s.config.ReconnectTimeout) - s.leftMembers = s.reap(s.leftMembers, s.config.TombstoneTimeout) + now := time.Now() + s.failedMembers = s.reap(s.failedMembers, now, s.config.ReconnectTimeout) + s.leftMembers = s.reap(s.leftMembers, now, s.config.TombstoneTimeout) + reapIntents(s.recentIntents, now, s.config.RecentIntentTimeout) s.memberLock.Unlock() case <-s.shutdownCh: return @@ -1418,8 +1408,7 @@ func (s *Serf) handleReconnect() { // reap is called with a list of old members and a timeout, and removes // members that have exceeded the timeout. The members are removed from // both the old list and the members itself. Locking is left to the caller. -func (s *Serf) reap(old []*memberState, timeout time.Duration) []*memberState { - now := time.Now() +func (s *Serf) reap(old []*memberState, now time.Time, timeout time.Duration) []*memberState { n := len(old) for i := 0; i < n; i++ { m := old[i] @@ -1490,7 +1479,7 @@ func (s *Serf) reconnect() { } // Select a random member to try and join - idx := int(rand.Uint32() % uint32(n)) + idx := rand.Int31n(int32(n)) mem := s.failedMembers[idx] s.memberLock.RUnlock() @@ -1538,24 +1527,46 @@ func removeOldMember(old []*memberState, name string) []*memberState { return old } -// recentIntent checks the recent intent buffer for a matching -// entry for a given node, and either returns the message or nil -func recentIntent(recent []nodeIntent, node string) (intent *nodeIntent) { - for i := 0; i < len(recent); i++ { - // Break fast if we hit a zero entry - if recent[i].LTime == 0 { - break +// reapIntents clears out any intents that are older than the timeout. Make sure +// the memberLock is held when passing in the Serf instance's recentIntents +// member. +func reapIntents(intents map[string]nodeIntent, now time.Time, timeout time.Duration) { + for node, intent := range intents { + if now.Sub(intent.WallTime) > timeout { + delete(intents, node) } + } +} - // Check for a node match - if recent[i].Node == node { - // Take the most recent entry - if intent == nil || recent[i].LTime > intent.LTime { - intent = &recent[i] - } +// upsertIntent will update an existing intent with the supplied Lamport time, +// or create a new entry. This will return true if a new entry was added. The +// stamper is used to capture the wall clock time for expiring these buffered +// intents. Make sure the memberLock is held when passing in the Serf instance's +// recentIntents member. +func upsertIntent(intents map[string]nodeIntent, node string, itype messageType, + ltime LamportTime, stamper func() time.Time) bool { + if intent, ok := intents[node]; !ok || ltime > intent.LTime { + intents[node] = nodeIntent{ + Type: itype, + WallTime: stamper(), + LTime: ltime, } + return true } - return + + return false +} + +// recentIntent checks the recent intent buffer for a matching entry for a given +// node, and returns the Lamport time, if an intent is present, indicated by the +// returned boolean. Make sure the memberLock is held for read when passing in +// the Serf instance's recentIntents member. +func recentIntent(intents map[string]nodeIntent, node string, itype messageType) (LamportTime, bool) { + if intent, ok := intents[node]; ok && intent.Type == itype { + return intent.LTime, true + } + + return LamportTime(0), false } // handleRejoin attempts to reconnect to previously known alive nodes diff --git a/vendor/vendor.json b/vendor/vendor.json index 00815673206..f751926a2c6 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -811,15 +811,15 @@ "checksumSHA1": "E3Xcanc9ouQwL+CZGOUyA/+giLg=", "comment": "v0.7.0-18-gc4c55f1", "path": "github.com/hashicorp/serf/coordinate", - "revision": "6c4672d66fc6312ddde18399262943e21175d831", - "revisionTime": "2016-06-09T00:18:40Z" + "revision": "b9642a47e6139e50548b6f14588a1a3c0839660a", + "revisionTime": "2016-09-14T16:26:25Z" }, { - "checksumSHA1": "jgQHuXL6QZLht/1dIYfSytPKWr4=", + "checksumSHA1": "vLyudzMEdik8IpRY1H2vRa2PeLU=", "comment": "v0.7.0-18-gc4c55f1", "path": "github.com/hashicorp/serf/serf", - "revision": "6c4672d66fc6312ddde18399262943e21175d831", - "revisionTime": "2016-06-09T00:18:40Z" + "revision": "b9642a47e6139e50548b6f14588a1a3c0839660a", + "revisionTime": "2016-09-14T16:26:25Z" }, { "checksumSHA1": "eGzvBRMFD6ZB3A6uO750np7Om/E=", From 45c78635a2b98e2574b846f69da27b08fec7cfa9 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 30 Jan 2017 16:39:00 -0800 Subject: [PATCH 2/2] Vendor memberlist --- .../github.com/hashicorp/memberlist/config.go | 15 ++++-- .../hashicorp/memberlist/memberlist.go | 4 +- vendor/github.com/hashicorp/memberlist/net.go | 51 ++++++++++++++++--- .../github.com/hashicorp/memberlist/state.go | 37 ++++++++++---- .../github.com/hashicorp/memberlist/util.go | 37 +++++++------- vendor/vendor.json | 6 +-- 6 files changed, 108 insertions(+), 42 deletions(-) diff --git a/vendor/github.com/hashicorp/memberlist/config.go b/vendor/github.com/hashicorp/memberlist/config.go index 27a52ea5612..498a62c548d 100644 --- a/vendor/github.com/hashicorp/memberlist/config.go +++ b/vendor/github.com/hashicorp/memberlist/config.go @@ -126,8 +126,12 @@ type Config struct { // per GossipInterval. Increasing this number causes the gossip messages // to propagate across the cluster more quickly at the expense of // increased bandwidth. - GossipInterval time.Duration - GossipNodes int + // + // GossipToTheDeadTime is the interval after which a node has died that + // we will still try to gossip to it. This gives it a chance to refute. + GossipInterval time.Duration + GossipNodes int + GossipToTheDeadTime time.Duration // EnableCompression is used to control message compression. This can // be used to reduce bandwidth usage at the cost of slightly more CPU @@ -212,8 +216,9 @@ func DefaultLANConfig() *Config { DisableTcpPings: false, // TCP pings are safe, even with mixed versions AwarenessMaxMultiplier: 8, // Probe interval backs off to 8 seconds - GossipNodes: 3, // Gossip to 3 nodes - GossipInterval: 200 * time.Millisecond, // Gossip more rapidly + GossipNodes: 3, // Gossip to 3 nodes + GossipInterval: 200 * time.Millisecond, // Gossip more rapidly + GossipToTheDeadTime: 30 * time.Second, // Same as push/pull EnableCompression: true, // Enable compression by default @@ -238,6 +243,7 @@ func DefaultWANConfig() *Config { conf.ProbeInterval = 5 * time.Second conf.GossipNodes = 4 // Gossip less frequently, but to an additional node conf.GossipInterval = 500 * time.Millisecond + conf.GossipToTheDeadTime = 60 * time.Second return conf } @@ -254,6 +260,7 @@ func DefaultLocalConfig() *Config { conf.ProbeTimeout = 200 * time.Millisecond conf.ProbeInterval = time.Second conf.GossipInterval = 100 * time.Millisecond + conf.GossipToTheDeadTime = 15 * time.Second return conf } diff --git a/vendor/github.com/hashicorp/memberlist/memberlist.go b/vendor/github.com/hashicorp/memberlist/memberlist.go index 6c8c93ba90b..7e696762599 100644 --- a/vendor/github.com/hashicorp/memberlist/memberlist.go +++ b/vendor/github.com/hashicorp/memberlist/memberlist.go @@ -496,7 +496,7 @@ func (m *Memberlist) SendTo(to net.Addr, msg []byte) error { buf = append(buf, msg...) // Send the message - return m.rawSendMsgUDP(to, buf) + return m.rawSendMsgUDP(to, nil, buf) } // SendToUDP is used to directly send a message to another node, without @@ -513,7 +513,7 @@ func (m *Memberlist) SendToUDP(to *Node, msg []byte) error { // Send the message destAddr := &net.UDPAddr{IP: to.Addr, Port: int(to.Port)} - return m.rawSendMsgUDP(destAddr, buf) + return m.rawSendMsgUDP(destAddr, to, buf) } // SendToTCP is used to directly send a message to another node, without diff --git a/vendor/github.com/hashicorp/memberlist/net.go b/vendor/github.com/hashicorp/memberlist/net.go index 7f66f5d0a49..378adba78c2 100644 --- a/vendor/github.com/hashicorp/memberlist/net.go +++ b/vendor/github.com/hashicorp/memberlist/net.go @@ -5,6 +5,7 @@ import ( "bytes" "encoding/binary" "fmt" + "hash/crc32" "io" "net" "time" @@ -32,7 +33,7 @@ const ( // understand version 4 or greater. ProtocolVersion2Compatible = 2 - ProtocolVersionMax = 4 + ProtocolVersionMax = 5 ) // messageType is an integer ID of a type of message that can be received @@ -53,6 +54,7 @@ const ( compressMsg encryptMsg nackRespMsg + hasCrcMsg ) // compressionType is used to specify the compression algorithm @@ -338,8 +340,18 @@ func (m *Memberlist) ingestPacket(buf []byte, from net.Addr, timestamp time.Time buf = plain } - // Handle the command - m.handleCommand(buf, from, timestamp) + // See if there's a checksum included to verify the contents of the message + if len(buf) >= 5 && messageType(buf[0]) == hasCrcMsg { + crc := crc32.ChecksumIEEE(buf[5:]) + expected := binary.BigEndian.Uint32(buf[1:5]) + if crc != expected { + m.logger.Printf("[WARN] memberlist: Got invalid checksum for UDP packet: %x, %x", crc, expected) + return + } + m.handleCommand(buf[5:], from, timestamp) + } else { + m.handleCommand(buf, from, timestamp) + } } func (m *Memberlist) handleCommand(buf []byte, from net.Addr, timestamp time.Time) { @@ -601,7 +613,7 @@ func (m *Memberlist) sendMsg(to net.Addr, msg []byte) error { // Fast path if nothing to piggypack if len(extra) == 0 { - return m.rawSendMsgUDP(to, msg) + return m.rawSendMsgUDP(to, nil, msg) } // Join all the messages @@ -613,11 +625,11 @@ func (m *Memberlist) sendMsg(to net.Addr, msg []byte) error { compound := makeCompoundMessage(msgs) // Send the message - return m.rawSendMsgUDP(to, compound.Bytes()) + return m.rawSendMsgUDP(to, nil, compound.Bytes()) } // rawSendMsgUDP is used to send a UDP message to another host without modification -func (m *Memberlist) rawSendMsgUDP(to net.Addr, msg []byte) error { +func (m *Memberlist) rawSendMsgUDP(addr net.Addr, node *Node, msg []byte) error { // Check if we have compression enabled if m.config.EnableCompression { buf, err := compressPayload(msg) @@ -631,6 +643,31 @@ func (m *Memberlist) rawSendMsgUDP(to net.Addr, msg []byte) error { } } + // Try to look up the destination node + if node == nil { + toAddr, _, err := net.SplitHostPort(addr.String()) + if err != nil { + m.logger.Printf("[ERR] memberlist: Failed to parse address %q: %v", addr.String(), err) + return err + } + m.nodeLock.RLock() + nodeState, ok := m.nodeMap[toAddr] + m.nodeLock.RUnlock() + if ok { + node = &nodeState.Node + } + } + + // Add a CRC to the end of the payload if the recipient understands + // ProtocolVersion >= 5 + if node != nil && node.PMax >= 5 { + crc := crc32.ChecksumIEEE(msg) + header := make([]byte, 5, 5+len(msg)) + header[0] = byte(hasCrcMsg) + binary.BigEndian.PutUint32(header[1:], crc) + msg = append(header, msg...) + } + // Check if we have encryption enabled if m.config.EncryptionEnabled() { // Encrypt the payload @@ -645,7 +682,7 @@ func (m *Memberlist) rawSendMsgUDP(to net.Addr, msg []byte) error { } metrics.IncrCounter([]string{"memberlist", "udp", "sent"}, float32(len(msg))) - _, err := m.udpListener.WriteTo(msg, to) + _, err := m.udpListener.WriteTo(msg, addr) return err } diff --git a/vendor/github.com/hashicorp/memberlist/state.go b/vendor/github.com/hashicorp/memberlist/state.go index 77871299d75..cc422bc1a3c 100644 --- a/vendor/github.com/hashicorp/memberlist/state.go +++ b/vendor/github.com/hashicorp/memberlist/state.go @@ -261,7 +261,7 @@ func (m *Memberlist) probeNode(node *nodeState) { } compound := makeCompoundMessage(msgs) - if err := m.rawSendMsgUDP(destAddr, compound.Bytes()); err != nil { + if err := m.rawSendMsgUDP(destAddr, &node.Node, compound.Bytes()); err != nil { m.logger.Printf("[ERR] memberlist: Failed to send compound ping and suspect message to %s: %s", destAddr, err) return } @@ -310,8 +310,11 @@ func (m *Memberlist) probeNode(node *nodeState) { // Get some random live nodes. m.nodeLock.RLock() - excludes := []string{m.config.Name, node.Name} - kNodes := kRandomNodes(m.config.IndirectChecks, excludes, m.nodes) + kNodes := kRandomNodes(m.config.IndirectChecks, m.nodes, func(n *nodeState) bool { + return n.Name == m.config.Name || + n.Name == node.Name || + n.State != stateAlive + }) m.nodeLock.RUnlock() // Attempt an indirect ping. @@ -460,10 +463,24 @@ func (m *Memberlist) resetNodes() { func (m *Memberlist) gossip() { defer metrics.MeasureSince([]string{"memberlist", "gossip"}, time.Now()) - // Get some random live nodes + // Get some random live, suspect, or recently dead nodes m.nodeLock.RLock() - excludes := []string{m.config.Name} - kNodes := kRandomNodes(m.config.GossipNodes, excludes, m.nodes) + kNodes := kRandomNodes(m.config.GossipNodes, m.nodes, func(n *nodeState) bool { + if n.Name == m.config.Name { + return true + } + + switch n.State { + case stateAlive, stateSuspect: + return false + + case stateDead: + return time.Since(n.StateChange) > m.config.GossipToTheDeadTime + + default: + return true + } + }) m.nodeLock.RUnlock() // Compute the bytes available @@ -484,7 +501,7 @@ func (m *Memberlist) gossip() { // Send the compound message destAddr := &net.UDPAddr{IP: node.Addr, Port: int(node.Port)} - if err := m.rawSendMsgUDP(destAddr, compound.Bytes()); err != nil { + if err := m.rawSendMsgUDP(destAddr, &node.Node, compound.Bytes()); err != nil { m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", destAddr, err) } } @@ -497,8 +514,10 @@ func (m *Memberlist) gossip() { func (m *Memberlist) pushPull() { // Get a random live node m.nodeLock.RLock() - excludes := []string{m.config.Name} - nodes := kRandomNodes(1, excludes, m.nodes) + nodes := kRandomNodes(1, m.nodes, func(n *nodeState) bool { + return n.Name == m.config.Name || + n.State != stateAlive + }) m.nodeLock.RUnlock() // If no nodes, bail diff --git a/vendor/github.com/hashicorp/memberlist/util.go b/vendor/github.com/hashicorp/memberlist/util.go index 7a59e3b3705..a294c3034d7 100644 --- a/vendor/github.com/hashicorp/memberlist/util.go +++ b/vendor/github.com/hashicorp/memberlist/util.go @@ -155,8 +155,9 @@ func randomOffset(n int) int { // suspicionTimeout computes the timeout that should be used when // a node is suspected func suspicionTimeout(suspicionMult, n int, interval time.Duration) time.Duration { - nodeScale := math.Ceil(math.Log10(float64(n + 1))) - timeout := time.Duration(suspicionMult) * time.Duration(nodeScale) * interval + nodeScale := math.Max(1.0, math.Log10(math.Max(1.0, float64(n)))) + // multiply by 1000 to keep some precision because time.Duration is an int64 type + timeout := time.Duration(suspicionMult) * time.Duration(nodeScale*1000) * interval / 1000 return timeout } @@ -207,9 +208,10 @@ func moveDeadNodes(nodes []*nodeState) int { return n - numDead } -// kRandomNodes is used to select up to k random nodes, excluding a given -// node and any non-alive nodes. It is possible that less than k nodes are returned. -func kRandomNodes(k int, excludes []string, nodes []*nodeState) []*nodeState { +// kRandomNodes is used to select up to k random nodes, excluding any nodes where +// the filter function returns true. It is possible that less than k nodes are +// returned. +func kRandomNodes(k int, nodes []*nodeState, filterFn func(*nodeState) bool) []*nodeState { n := len(nodes) kNodes := make([]*nodeState, 0, k) OUTER: @@ -221,16 +223,9 @@ OUTER: idx := randomOffset(n) node := nodes[idx] - // Exclude node if match - for _, exclude := range excludes { - if node.Name == exclude { - continue OUTER - } - } - - // Exclude if not alive - if node.State != stateAlive { - continue + // Give the filter a shot at it. + if filterFn != nil && filterFn(node) { + continue OUTER } // Check if we have this node already @@ -327,10 +322,18 @@ func isLoopbackIP(ip_str string) bool { return loopbackBlock.Contains(ip) } -// Given a string of the form "host", "host:port", or "[ipv6::address]:port", +// Given a string of the form "host", "host:port", +// "ipv6::addr" or "[ipv6::address]:port", // return true if the string includes a port. func hasPort(s string) bool { - return strings.LastIndex(s, ":") > strings.LastIndex(s, "]") + last := strings.LastIndex(s, ":") + if last == -1 { + return false + } + if s[0] == '[' { + return s[last-1] == ']' + } + return strings.Index(s, ":") == last } // compressPayload takes an opaque input buffer, compresses it diff --git a/vendor/vendor.json b/vendor/vendor.json index f751926a2c6..394408c1c45 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -774,10 +774,10 @@ "revision": "0dc08b1671f34c4250ce212759ebd880f743d883" }, { - "checksumSHA1": "Ozk/S4U1x/OllNP2SsMYJjCl/gs=", + "checksumSHA1": "hSoH77pX3FyU6kkYqOOYmf3r55Y=", "path": "github.com/hashicorp/memberlist", - "revision": "7ad712f5f34ec40aebe6ca47756d07898486a8d2", - "revisionTime": "2016-09-15T13:02:55Z" + "revision": "9800c50ab79c002353852a9b1095e9591b161513", + "revisionTime": "2016-12-13T23:44:46Z" }, { "path": "github.com/hashicorp/net-rpc-msgpackrpc",