diff --git a/GNUmakefile b/GNUmakefile index 37aa0f29feee..a419e6c73018 100644 --- a/GNUmakefile +++ b/GNUmakefile @@ -58,7 +58,7 @@ cov: test: dev-build vet go test -tags '$(GOTAGS)' -i ./... - go test $(GOTEST_FLAGS) -tags '$(GOTAGS)' -timeout 7m -v ./... 2>&1 >test$(GOTEST_FLAGS).log ; echo $$? > exit-code + go test $(GOTEST_FLAGS) -tags '$(GOTAGS)' -timeout 7m -v ./... 2>&1 >test.log ; echo $$? > exit-code @echo "Exit code: `cat exit-code`" >> test$(GOTEST_FLAGS).log @echo "----" @grep -A5 'DATA RACE' test.log || true diff --git a/agent/agent.go b/agent/agent.go index ee121dd16f09..b0a59022dbb0 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -32,7 +32,6 @@ import ( "github.com/hashicorp/consul/watch" "github.com/hashicorp/go-uuid" "github.com/hashicorp/raft" - "github.com/hashicorp/serf/coordinate" "github.com/hashicorp/serf/serf" "github.com/shirou/gopsutil/host" ) @@ -56,9 +55,10 @@ const ( // consul.Client and consul.Server. type delegate interface { Encrypted() bool - GetLANCoordinate() (*coordinate.Coordinate, error) + GetLANCoordinate() (lib.CoordinateSet, error) Leave() error LANMembers() []serf.Member + LANSegmentMembers(name string) ([]serf.Member, error) LocalMember() serf.Member JoinLAN(addrs []string) (n int, err error) RemoveFailedNode(node string) error @@ -647,6 +647,32 @@ func (a *Agent) consulConfig() (*consul.Config, error) { if a.config.AdvertiseAddrs.RPC != nil { base.RPCAdvertise = a.config.AdvertiseAddrs.RPC } + base.Segment = a.config.Segment + for _, segment := range a.config.Segments { + config := consul.DefaultConfig().SerfLANConfig + + config.MemberlistConfig.AdvertiseAddr = segment.Advertise + config.MemberlistConfig.AdvertisePort = segment.Port + config.MemberlistConfig.BindAddr = segment.Bind + config.MemberlistConfig.BindPort = segment.Port + if a.config.ReconnectTimeoutLan != 0 { + config.ReconnectTimeout = a.config.ReconnectTimeoutLan + } + if a.config.EncryptVerifyIncoming != nil { + config.MemberlistConfig.GossipVerifyIncoming = *a.config.EncryptVerifyIncoming + } + if a.config.EncryptVerifyOutgoing != nil { + config.MemberlistConfig.GossipVerifyOutgoing = *a.config.EncryptVerifyOutgoing + } + + base.Segments = append(base.Segments, consul.NetworkSegment{ + Name: segment.Name, + Bind: segment.Bind, + Port: segment.Port, + Advertise: segment.Advertise, + SerfConfig: config, + }) + } if a.config.Bootstrap { base.Bootstrap = true } @@ -1154,15 +1180,16 @@ func (a *Agent) ResumeSync() { a.state.Resume() } -// GetLANCoordinate returns the coordinate of this node in the local pool (assumes coordinates -// are enabled, so check that before calling). -func (a *Agent) GetLANCoordinate() (*coordinate.Coordinate, error) { +// GetLANCoordinate returns the coordinates of this node in the local pools +// (assumes coordinates are enabled, so check that before calling). +func (a *Agent) GetLANCoordinate() (lib.CoordinateSet, error) { return a.delegate.GetLANCoordinate() } // sendCoordinate is a long-running loop that periodically sends our coordinate // to the server. Closing the agent's shutdownChannel will cause this to exit. func (a *Agent) sendCoordinate() { +OUTER: for { rate := a.config.SyncCoordinateRateTarget min := a.config.SyncCoordinateIntervalMin @@ -1182,26 +1209,29 @@ func (a *Agent) sendCoordinate() { continue } - c, err := a.GetLANCoordinate() + cs, err := a.GetLANCoordinate() if err != nil { a.logger.Printf("[ERR] agent: Failed to get coordinate: %s", err) continue } - req := structs.CoordinateUpdateRequest{ - Datacenter: a.config.Datacenter, - Node: a.config.NodeName, - Coord: c, - WriteRequest: structs.WriteRequest{Token: a.tokens.AgentToken()}, - } - var reply struct{} - if err := a.RPC("Coordinate.Update", &req, &reply); err != nil { - if acl.IsErrPermissionDenied(err) { - a.logger.Printf("[WARN] agent: Coordinate update blocked by ACLs") - } else { - a.logger.Printf("[ERR] agent: Coordinate update error: %v", err) + for segment, coord := range cs { + req := structs.CoordinateUpdateRequest{ + Datacenter: a.config.Datacenter, + Node: a.config.NodeName, + Segment: segment, + Coord: coord, + WriteRequest: structs.WriteRequest{Token: a.tokens.AgentToken()}, + } + var reply struct{} + if err := a.RPC("Coordinate.Update", &req, &reply); err != nil { + if acl.IsErrPermissionDenied(err) { + a.logger.Printf("[WARN] agent: Coordinate update blocked by ACLs") + } else { + a.logger.Printf("[ERR] agent: Coordinate update error: %v", err) + } + continue OUTER } - continue } case <-a.shutdownCh: return @@ -2105,6 +2135,11 @@ func (a *Agent) loadMetadata(conf *Config) error { a.state.metadata[key] = value } + // The segment isn't reloadable so we only add it once. + if _, ok := a.state.metadata[structs.MetaSegmentKey]; !ok { + a.state.metadata[structs.MetaSegmentKey] = conf.Segment + } + a.state.changeMade() return nil diff --git a/agent/agent_endpoint.go b/agent/agent_endpoint.go index e14b71321ab9..7438df9e5605 100644 --- a/agent/agent_endpoint.go +++ b/agent/agent_endpoint.go @@ -11,6 +11,7 @@ import ( "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/ipaddr" + "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/logger" "github.com/hashicorp/consul/types" "github.com/hashicorp/logutils" @@ -27,10 +28,10 @@ type Self struct { } func (s *HTTPServer) AgentSelf(resp http.ResponseWriter, req *http.Request) (interface{}, error) { - var c *coordinate.Coordinate + var cs lib.CoordinateSet if !s.agent.config.DisableCoordinates { var err error - if c, err = s.agent.GetLANCoordinate(); err != nil { + if cs, err = s.agent.GetLANCoordinate(); err != nil { return nil, err } } @@ -48,7 +49,7 @@ func (s *HTTPServer) AgentSelf(resp http.ResponseWriter, req *http.Request) (int return Self{ Config: s.agent.config, - Coord: c, + Coord: cs[s.agent.config.Segment], Member: s.agent.LocalMember(), Stats: s.agent.Stats(), Meta: s.agent.state.Metadata(), @@ -155,11 +156,24 @@ func (s *HTTPServer) AgentMembers(resp http.ResponseWriter, req *http.Request) ( wan = true } + segment := req.URL.Query().Get("segment") + if wan && segment != "" { + resp.WriteHeader(http.StatusBadRequest) + fmt.Fprint(resp, "Cannot provide a segment with wan=true") + return nil, nil + } + var members []serf.Member if wan { members = s.agent.WANMembers() - } else { + } else if segment == "" { members = s.agent.LANMembers() + } else { + var err error + members, err = s.agent.delegate.LANSegmentMembers(segment) + if err != nil { + return nil, err + } } if err := s.agent.filterMembers(token, &members); err != nil { return nil, err diff --git a/agent/agent_endpoint_test.go b/agent/agent_endpoint_test.go index 9538b811b93f..ff77b5597535 100644 --- a/agent/agent_endpoint_test.go +++ b/agent/agent_endpoint_test.go @@ -191,13 +191,14 @@ func TestAgent_Self(t *testing.T) { t.Fatalf("incorrect port: %v", obj) } - c, err := a.GetLANCoordinate() + cs, err := a.GetLANCoordinate() if err != nil { t.Fatalf("err: %v", err) } - if !reflect.DeepEqual(c, val.Coord) { + if c := cs[cfg.Segment]; !reflect.DeepEqual(c, val.Coord) { t.Fatalf("coordinates are not equal: %v != %v", c, val.Coord) } + delete(val.Meta, structs.MetaSegmentKey) // Added later, not in config. if !reflect.DeepEqual(cfg.Meta, val.Meta) { t.Fatalf("meta fields are not equal: %v != %v", cfg.Meta, val.Meta) } diff --git a/agent/config.go b/agent/config.go index 51db1cc7ac70..5e0d6e86b684 100644 --- a/agent/config.go +++ b/agent/config.go @@ -342,6 +342,22 @@ type Autopilot struct { UpgradeVersionTag string `mapstructure:"upgrade_version_tag"` } +// (Enterprise-only) NetworkSegment is the configuration for a network segment, which is an +// isolated serf group on the LAN. +type NetworkSegment struct { + // Name is the name of the segment. + Name string `mapstructure:"name"` + + // Bind is the bind address for this segment. + Bind string `mapstructure:"bind"` + + // Port is the port for this segment. + Port int `mapstructure:"port"` + + // Advertise is the advertise address of this segment. + Advertise string `mapstructure:"advertise"` +} + // Config is the configuration that can be set for an Agent. // Some of this is configurable as CLI flags, but most must // be set using a configuration file. @@ -465,6 +481,12 @@ type Config struct { // Address configurations Addresses AddressConfig + // (Enterprise-only) NetworkSegment is the network segment for this client to join + Segment string `mapstructure:"segment"` + + // Segments + Segments []NetworkSegment `mapstructure:"segments"` + // Tagged addresses. These are used to publish a set of addresses for // for a node, which can be used by the remote agent. We currently // populate only the "wan" tag based on the SerfWan advertise address, @@ -1426,6 +1448,11 @@ func DecodeConfig(r io.Reader) (*Config, error) { } } + // Validate node meta fields + if err := structs.ValidateMetadata(result.Meta); err != nil { + return nil, fmt.Errorf("Failed to parse node metadata: %v", err) + } + return &result, nil } @@ -1861,6 +1888,12 @@ func MergeConfig(a, b *Config) *Config { if b.Addresses.RPC != "" { result.Addresses.RPC = b.Addresses.RPC } + if b.Segment != "" { + result.Segment = b.Segment + } + if len(b.Segments) > 0 { + result.Segments = append(result.Segments, b.Segments...) + } if b.EnableUI { result.EnableUI = true } @@ -2204,6 +2237,11 @@ func (c *Config) ResolveTmplAddrs() (err error) { parse(&c.ClientAddr, true, "Client address") parse(&c.SerfLanBindAddr, false, "Serf LAN address") parse(&c.SerfWanBindAddr, false, "Serf WAN address") + for i, segment := range c.Segments { + parse(&c.Segments[i].Bind, false, fmt.Sprintf("Segment %q bind address", segment.Name)) + parse(&c.Segments[i].Advertise, false, fmt.Sprintf("Segment %q advertise address", segment.Name)) + + } return } diff --git a/agent/config_test.go b/agent/config_test.go index 6f4c5e2fb8f1..3494dcd891b1 100644 --- a/agent/config_test.go +++ b/agent/config_test.go @@ -592,6 +592,14 @@ func TestDecodeConfig(t *testing.T) { in: `{"retry_max_wan":123}`, c: &Config{RetryMaxAttemptsWan: 123}, }, + { + in: `{"segment":"thing"}`, + c: &Config{Segment: "thing"}, + }, + { + in: `{"segments":[{"name": "alpha", "bind": "127.0.0.1", "port": 1234, "advertise": "1.1.1.1"}]}`, + c: &Config{Segments: []NetworkSegment{{Name: "alpha", Bind: "127.0.0.1", Port: 1234, Advertise: "1.1.1.1"}}}, + }, { in: `{"serf_lan_bind":"1.2.3.4"}`, c: &Config{SerfLanBindAddr: "1.2.3.4"}, @@ -1401,6 +1409,15 @@ func TestMergeConfig(t *testing.T) { HTTP: "127.0.0.2", HTTPS: "127.0.0.4", }, + Segment: "alpha", + Segments: []NetworkSegment{ + { + Name: "alpha", + Bind: "127.0.0.1", + Port: 1234, + Advertise: "127.0.0.2", + }, + }, Server: true, LeaveOnTerm: Bool(true), SkipLeaveOnInt: Bool(true), diff --git a/agent/consul/catalog_endpoint_test.go b/agent/consul/catalog_endpoint_test.go index 3391fff98dc3..b2adffe04f13 100644 --- a/agent/consul/catalog_endpoint_test.go +++ b/agent/consul/catalog_endpoint_test.go @@ -890,9 +890,9 @@ func TestCatalog_ListNodes_DistanceSort(t *testing.T) { // Set all but one of the nodes to known coordinates. updates := structs.Coordinates{ - {"foo", lib.GenerateCoordinate(2 * time.Millisecond)}, - {"bar", lib.GenerateCoordinate(5 * time.Millisecond)}, - {"baz", lib.GenerateCoordinate(1 * time.Millisecond)}, + {Node: "foo", Coord: lib.GenerateCoordinate(2 * time.Millisecond)}, + {Node: "bar", Coord: lib.GenerateCoordinate(5 * time.Millisecond)}, + {Node: "baz", Coord: lib.GenerateCoordinate(1 * time.Millisecond)}, } if err := s1.fsm.State().CoordinateBatchUpdate(5, updates); err != nil { t.Fatalf("err: %v", err) @@ -1495,9 +1495,9 @@ func TestCatalog_ListServiceNodes_DistanceSort(t *testing.T) { // Set all but one of the nodes to known coordinates. updates := structs.Coordinates{ - {"foo", lib.GenerateCoordinate(2 * time.Millisecond)}, - {"bar", lib.GenerateCoordinate(5 * time.Millisecond)}, - {"baz", lib.GenerateCoordinate(1 * time.Millisecond)}, + {Node: "foo", Coord: lib.GenerateCoordinate(2 * time.Millisecond)}, + {Node: "bar", Coord: lib.GenerateCoordinate(5 * time.Millisecond)}, + {Node: "baz", Coord: lib.GenerateCoordinate(1 * time.Millisecond)}, } if err := s1.fsm.State().CoordinateBatchUpdate(9, updates); err != nil { t.Fatalf("err: %v", err) diff --git a/agent/consul/client.go b/agent/consul/client.go index 00f339d6d46c..69552d87c1f6 100644 --- a/agent/consul/client.go +++ b/agent/consul/client.go @@ -5,18 +5,14 @@ import ( "io" "log" "os" - "path/filepath" "strconv" - "strings" "sync" "time" - "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/router" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/lib" - "github.com/hashicorp/serf/coordinate" "github.com/hashicorp/serf/serf" ) @@ -146,35 +142,6 @@ func NewClientLogger(config *Config, logger *log.Logger) (*Client, error) { return c, nil } -// setupSerf is used to setup and initialize a Serf -func (c *Client) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (*serf.Serf, error) { - conf.Init() - conf.NodeName = c.config.NodeName - conf.Tags["role"] = "node" - conf.Tags["dc"] = c.config.Datacenter - conf.Tags["id"] = string(c.config.NodeID) - conf.Tags["vsn"] = fmt.Sprintf("%d", c.config.ProtocolVersion) - conf.Tags["vsn_min"] = fmt.Sprintf("%d", ProtocolVersionMin) - conf.Tags["vsn_max"] = fmt.Sprintf("%d", ProtocolVersionMax) - conf.Tags["build"] = c.config.Build - conf.MemberlistConfig.LogOutput = c.config.LogOutput - conf.LogOutput = c.config.LogOutput - conf.Logger = c.logger - conf.EventCh = ch - conf.SnapshotPath = filepath.Join(c.config.DataDir, path) - conf.ProtocolVersion = protocolVersionMap[c.config.ProtocolVersion] - conf.RejoinAfterLeave = c.config.RejoinAfterLeave - conf.Merge = &lanMergeDelegate{ - dc: c.config.Datacenter, - nodeID: c.config.NodeID, - nodeName: c.config.NodeName, - } - if err := lib.EnsurePath(conf.SnapshotPath, false); err != nil { - return nil, err - } - return serf.Create(conf) -} - // Shutdown is used to shutdown the client func (c *Client) Shutdown() error { c.logger.Printf("[INFO] consul: shutting down client") @@ -227,6 +194,16 @@ func (c *Client) LANMembers() []serf.Member { return c.serf.Members() } +// LANSegmentMembers only returns our own segment's members, because clients +// can't be in multiple segments. +func (c *Client) LANSegmentMembers(name string) ([]serf.Member, error) { + if name == c.config.Segment { + return c.LANMembers(), nil + } + + return nil, fmt.Errorf("segment %q not found", name) +} + // RemoveFailedNode is used to remove a failed node from the cluster func (c *Client) RemoveFailedNode(node string) error { return c.serf.RemoveFailedNode(node) @@ -242,98 +219,6 @@ func (c *Client) Encrypted() bool { return c.serf.EncryptionEnabled() } -// lanEventHandler is used to handle events from the lan Serf cluster -func (c *Client) lanEventHandler() { - var numQueuedEvents int - for { - numQueuedEvents = len(c.eventCh) - if numQueuedEvents > serfEventBacklogWarning { - c.logger.Printf("[WARN] consul: number of queued serf events above warning threshold: %d/%d", numQueuedEvents, serfEventBacklogWarning) - } - - select { - case e := <-c.eventCh: - switch e.EventType() { - case serf.EventMemberJoin: - c.nodeJoin(e.(serf.MemberEvent)) - case serf.EventMemberLeave, serf.EventMemberFailed: - c.nodeFail(e.(serf.MemberEvent)) - case serf.EventUser: - c.localEvent(e.(serf.UserEvent)) - case serf.EventMemberUpdate: // Ignore - case serf.EventMemberReap: // Ignore - case serf.EventQuery: // Ignore - default: - c.logger.Printf("[WARN] consul: unhandled LAN Serf Event: %#v", e) - } - case <-c.shutdownCh: - return - } - } -} - -// nodeJoin is used to handle join events on the serf cluster -func (c *Client) nodeJoin(me serf.MemberEvent) { - for _, m := range me.Members { - ok, parts := metadata.IsConsulServer(m) - if !ok { - continue - } - if parts.Datacenter != c.config.Datacenter { - c.logger.Printf("[WARN] consul: server %s for datacenter %s has joined wrong cluster", - m.Name, parts.Datacenter) - continue - } - c.logger.Printf("[INFO] consul: adding server %s", parts) - c.routers.AddServer(parts) - - // Trigger the callback - if c.config.ServerUp != nil { - c.config.ServerUp() - } - } -} - -// nodeFail is used to handle fail events on the serf cluster -func (c *Client) nodeFail(me serf.MemberEvent) { - for _, m := range me.Members { - ok, parts := metadata.IsConsulServer(m) - if !ok { - continue - } - c.logger.Printf("[INFO] consul: removing server %s", parts) - c.routers.RemoveServer(parts) - } -} - -// localEvent is called when we receive an event on the local Serf -func (c *Client) localEvent(event serf.UserEvent) { - // Handle only consul events - if !strings.HasPrefix(event.Name, "consul:") { - return - } - - switch name := event.Name; { - case name == newLeaderEvent: - c.logger.Printf("[INFO] consul: New leader elected: %s", event.Payload) - - // Trigger the callback - if c.config.ServerUp != nil { - c.config.ServerUp() - } - case isUserEvent(name): - event.Name = rawUserEventName(name) - c.logger.Printf("[DEBUG] consul: user event: %s", event.Name) - - // Trigger the callback - if c.config.UserEventHandler != nil { - c.config.UserEventHandler(event) - } - default: - c.logger.Printf("[WARN] consul: Unhandled local event: %v", event) - } -} - // RPC is used to forward an RPC call to a consul server, or fail if no servers func (c *Client) RPC(method string, args interface{}, reply interface{}) error { server := c.routers.FindServer() @@ -413,6 +298,12 @@ func (c *Client) Stats() map[string]map[string]string { // GetLANCoordinate returns the network coordinate of the current node, as // maintained by Serf. -func (c *Client) GetLANCoordinate() (*coordinate.Coordinate, error) { - return c.serf.GetCoordinate() +func (c *Client) GetLANCoordinate() (lib.CoordinateSet, error) { + lan, err := c.serf.GetCoordinate() + if err != nil { + return nil, err + } + + cs := lib.CoordinateSet{c.config.Segment: lan} + return cs, nil } diff --git a/agent/consul/client_serf.go b/agent/consul/client_serf.go new file mode 100644 index 000000000000..0b64e41145a7 --- /dev/null +++ b/agent/consul/client_serf.go @@ -0,0 +1,137 @@ +package consul + +import ( + "fmt" + "path/filepath" + "strings" + + "github.com/hashicorp/consul/agent/metadata" + "github.com/hashicorp/consul/lib" + "github.com/hashicorp/serf/serf" +) + +// setupSerf is used to setup and initialize a Serf +func (c *Client) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (*serf.Serf, error) { + conf.Init() + + conf.NodeName = c.config.NodeName + conf.Tags["role"] = "node" + conf.Tags["dc"] = c.config.Datacenter + conf.Tags["segment"] = c.config.Segment + conf.Tags["id"] = string(c.config.NodeID) + conf.Tags["vsn"] = fmt.Sprintf("%d", c.config.ProtocolVersion) + conf.Tags["vsn_min"] = fmt.Sprintf("%d", ProtocolVersionMin) + conf.Tags["vsn_max"] = fmt.Sprintf("%d", ProtocolVersionMax) + conf.Tags["build"] = c.config.Build + conf.MemberlistConfig.LogOutput = c.config.LogOutput + conf.LogOutput = c.config.LogOutput + conf.Logger = c.logger + conf.EventCh = ch + conf.ProtocolVersion = protocolVersionMap[c.config.ProtocolVersion] + conf.RejoinAfterLeave = c.config.RejoinAfterLeave + conf.Merge = &lanMergeDelegate{ + dc: c.config.Datacenter, + nodeID: c.config.NodeID, + nodeName: c.config.NodeName, + segment: c.config.Segment, + } + + conf.SnapshotPath = filepath.Join(c.config.DataDir, path) + if err := lib.EnsurePath(conf.SnapshotPath, false); err != nil { + return nil, err + } + + return serf.Create(conf) +} + +// lanEventHandler is used to handle events from the lan Serf cluster +func (c *Client) lanEventHandler() { + var numQueuedEvents int + for { + numQueuedEvents = len(c.eventCh) + if numQueuedEvents > serfEventBacklogWarning { + c.logger.Printf("[WARN] consul: number of queued serf events above warning threshold: %d/%d", numQueuedEvents, serfEventBacklogWarning) + } + + select { + case e := <-c.eventCh: + switch e.EventType() { + case serf.EventMemberJoin: + c.nodeJoin(e.(serf.MemberEvent)) + case serf.EventMemberLeave, serf.EventMemberFailed: + c.nodeFail(e.(serf.MemberEvent)) + case serf.EventUser: + c.localEvent(e.(serf.UserEvent)) + case serf.EventMemberUpdate: // Ignore + case serf.EventMemberReap: // Ignore + case serf.EventQuery: // Ignore + default: + c.logger.Printf("[WARN] consul: unhandled LAN Serf Event: %#v", e) + } + case <-c.shutdownCh: + return + } + } +} + +// nodeJoin is used to handle join events on the serf cluster +func (c *Client) nodeJoin(me serf.MemberEvent) { + for _, m := range me.Members { + ok, parts := metadata.IsConsulServer(m) + if !ok { + continue + } + if parts.Datacenter != c.config.Datacenter { + c.logger.Printf("[WARN] consul: server %s for datacenter %s has joined wrong cluster", + m.Name, parts.Datacenter) + continue + } + c.logger.Printf("[INFO] consul: adding server %s", parts) + c.routers.AddServer(parts) + + // Trigger the callback + if c.config.ServerUp != nil { + c.config.ServerUp() + } + } +} + +// nodeFail is used to handle fail events on the serf cluster +func (c *Client) nodeFail(me serf.MemberEvent) { + for _, m := range me.Members { + ok, parts := metadata.IsConsulServer(m) + if !ok { + continue + } + c.logger.Printf("[INFO] consul: removing server %s", parts) + c.routers.RemoveServer(parts) + } +} + +// localEvent is called when we receive an event on the local Serf +func (c *Client) localEvent(event serf.UserEvent) { + // Handle only consul events + if !strings.HasPrefix(event.Name, "consul:") { + return + } + + switch name := event.Name; { + case name == newLeaderEvent: + c.logger.Printf("[INFO] consul: New leader elected: %s", event.Payload) + + // Trigger the callback + if c.config.ServerUp != nil { + c.config.ServerUp() + } + case isUserEvent(name): + event.Name = rawUserEventName(name) + c.logger.Printf("[DEBUG] consul: user event: %s", event.Name) + + // Trigger the callback + if c.config.UserEventHandler != nil { + c.config.UserEventHandler(event) + } + default: + c.logger.Printf("[WARN] consul: Unhandled local event: %v", event) + } +} diff --git a/agent/consul/config.go b/agent/consul/config.go index 2361161cf3f3..c5d4d4501c57 100644 --- a/agent/consul/config.go +++ b/agent/consul/config.go @@ -49,6 +49,15 @@ func init() { } } +// (Enterprise-only) +type NetworkSegment struct { + Name string + Bind string + Port int + Advertise string + SerfConfig *serf.Config +} + // Config is used to configure the server type Config struct { // Bootstrap mode is used to bring up the first Consul server. @@ -105,6 +114,13 @@ type Config struct { // RPCSrcAddr is the source address for outgoing RPC connections. RPCSrcAddr *net.TCPAddr + // (Enterprise-only) The network segment this agent is part of. + Segment string + + // (Enterprise-only) Segments is a list of network segments for a server to + // bind on. + Segments []NetworkSegment + // SerfLANConfig is the configuration for the intra-dc serf SerfLANConfig *serf.Config diff --git a/agent/consul/coordinate_endpoint.go b/agent/consul/coordinate_endpoint.go index c279afd4a9b5..3ba1dcdc52fc 100644 --- a/agent/consul/coordinate_endpoint.go +++ b/agent/consul/coordinate_endpoint.go @@ -10,7 +10,6 @@ import ( "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/go-memdb" - "github.com/hashicorp/serf/coordinate" ) // Coordinate manages queries and updates for network coordinates. @@ -18,8 +17,10 @@ type Coordinate struct { // srv is a pointer back to the server. srv *Server - // updates holds pending coordinate updates for the given nodes. - updates map[string]*coordinate.Coordinate + // updates holds pending coordinate updates for the given nodes. This is + // keyed by node:segment so we can get a coordinate for each segment for + // servers, and we only track the latest update per node:segment. + updates map[string]*structs.CoordinateUpdateRequest // updatesLock synchronizes access to the updates map. updatesLock sync.Mutex @@ -29,7 +30,7 @@ type Coordinate struct { func NewCoordinate(srv *Server) *Coordinate { c := &Coordinate{ srv: srv, - updates: make(map[string]*coordinate.Coordinate), + updates: make(map[string]*structs.CoordinateUpdateRequest), } go c.batchUpdate() @@ -58,7 +59,7 @@ func (c *Coordinate) batchApplyUpdates() error { // incoming messages. c.updatesLock.Lock() pending := c.updates - c.updates = make(map[string]*coordinate.Coordinate) + c.updates = make(map[string]*structs.CoordinateUpdateRequest) c.updatesLock.Unlock() // Enforce the rate limit. @@ -73,12 +74,16 @@ func (c *Coordinate) batchApplyUpdates() error { // batches. i := 0 updates := make(structs.Coordinates, size) - for node, coord := range pending { + for _, update := range pending { if !(i < size) { break } - updates[i] = &structs.Coordinate{Node: node, Coord: coord} + updates[i] = &structs.Coordinate{ + Node: update.Node, + Segment: update.Segment, + Coord: update.Coord, + } i++ } @@ -140,8 +145,9 @@ func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct } // Add the coordinate to the map of pending updates. + key := fmt.Sprintf("%s:%s", args.Node, args.Segment) c.updatesLock.Lock() - c.updates[args.Node] = args.Coord + c.updates[key] = args c.updatesLock.Unlock() return nil } @@ -187,6 +193,7 @@ func (c *Coordinate) ListNodes(args *structs.DCSpecificRequest, reply *structs.I if err := c.srv.filterACL(args.Token, reply); err != nil { return err } + return nil }) } diff --git a/agent/consul/coordinate_endpoint_test.go b/agent/consul/coordinate_endpoint_test.go index 34fb3712a713..ba9ded9a7ff4 100644 --- a/agent/consul/coordinate_endpoint_test.go +++ b/agent/consul/coordinate_endpoint_test.go @@ -5,17 +5,18 @@ import ( "math" "math/rand" "os" - "reflect" "strings" "testing" "time" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/testrpc" "github.com/hashicorp/consul/testutil/retry" "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/serf/coordinate" + "github.com/pascaldekloe/goe/verify" ) // generateRandomCoordinate creates a random coordinate. This mucks with the @@ -33,15 +34,6 @@ func generateRandomCoordinate() *coordinate.Coordinate { return coord } -// verifyCoordinatesEqual will compare a and b and fail if they are not exactly -// equal (no floating point fuzz is considered since we are trying to make sure -// we are getting exactly the coordinates we expect, without math on them). -func verifyCoordinatesEqual(t *testing.T, a, b *coordinate.Coordinate) { - if !reflect.DeepEqual(a, b) { - t.Fatalf("coordinates are not equal: %v != %v", a, b) - } -} - func TestCoordinate_Update(t *testing.T) { t.Parallel() dir1, s1 := testServerWithConfig(t, func(c *Config) { @@ -94,20 +86,17 @@ func TestCoordinate_Update(t *testing.T) { // Make sure the updates did not yet apply because the update period // hasn't expired. state := s1.fsm.State() - c, err := state.CoordinateGetRaw("node1") + c, err := state.Coordinate("node1") if err != nil { t.Fatalf("err: %v", err) } - if c != nil { - t.Fatalf("should be nil because the update should be batched") - } - c, err = state.CoordinateGetRaw("node2") + verify.Values(t, "", c, lib.CoordinateSet{}) + + c, err = state.Coordinate("node2") if err != nil { t.Fatalf("err: %v", err) } - if c != nil { - t.Fatalf("should be nil because the update should be batched") - } + verify.Values(t, "", c, lib.CoordinateSet{}) // Send another update for the second node. It should take precedence // since there will be two updates in the same batch. @@ -118,22 +107,23 @@ func TestCoordinate_Update(t *testing.T) { // Wait a while and the updates should get picked up. time.Sleep(3 * s1.config.CoordinateUpdatePeriod) - c, err = state.CoordinateGetRaw("node1") + c, err = state.Coordinate("node1") if err != nil { t.Fatalf("err: %v", err) } - if c == nil { - t.Fatalf("should return a coordinate but it's nil") + expected := lib.CoordinateSet{ + "": arg1.Coord, } - verifyCoordinatesEqual(t, c, arg1.Coord) - c, err = state.CoordinateGetRaw("node2") + verify.Values(t, "", c, expected) + + c, err = state.Coordinate("node2") if err != nil { t.Fatalf("err: %v", err) } - if c == nil { - t.Fatalf("should return a coordinate but it's nil") + expected = lib.CoordinateSet{ + "": arg2.Coord, } - verifyCoordinatesEqual(t, c, arg2.Coord) + verify.Values(t, "", c, expected) // Register a bunch of additional nodes. spamLen := s1.config.CoordinateUpdateBatchSize*s1.config.CoordinateUpdateMaxBatches + 1 @@ -165,11 +155,11 @@ func TestCoordinate_Update(t *testing.T) { time.Sleep(3 * s1.config.CoordinateUpdatePeriod) numDropped := 0 for i := 0; i < spamLen; i++ { - c, err = state.CoordinateGetRaw(fmt.Sprintf("bogusnode%d", i)) + c, err = state.Coordinate(fmt.Sprintf("bogusnode%d", i)) if err != nil { t.Fatalf("err: %v", err) } - if c == nil { + if len(c) == 0 { numDropped++ } } @@ -304,7 +294,7 @@ func TestCoordinate_ListDatacenters(t *testing.T) { if err != nil { t.Fatalf("bad: %v", err) } - verifyCoordinatesEqual(t, c, out[0].Coordinates[0].Coord) + verify.Values(t, "", c, out[0].Coordinates[0].Coord) } func TestCoordinate_ListNodes(t *testing.T) { @@ -374,9 +364,9 @@ func TestCoordinate_ListNodes(t *testing.T) { resp.Coordinates[2].Node != "foo" { r.Fatalf("bad: %v", resp.Coordinates) } - verifyCoordinatesEqual(t, resp.Coordinates[0].Coord, arg2.Coord) // bar - verifyCoordinatesEqual(t, resp.Coordinates[1].Coord, arg3.Coord) // baz - verifyCoordinatesEqual(t, resp.Coordinates[2].Coord, arg1.Coord) // foo + verify.Values(t, "", resp.Coordinates[0].Coord, arg2.Coord) // bar + verify.Values(t, "", resp.Coordinates[1].Coord, arg3.Coord) // baz + verify.Values(t, "", resp.Coordinates[2].Coord, arg1.Coord) // foo }) } diff --git a/agent/consul/health_endpoint_test.go b/agent/consul/health_endpoint_test.go index 3f44e78c9db2..c9581e3a7514 100644 --- a/agent/consul/health_endpoint_test.go +++ b/agent/consul/health_endpoint_test.go @@ -171,8 +171,8 @@ func TestHealth_ChecksInState_DistanceSort(t *testing.T) { t.Fatalf("err: %v", err) } updates := structs.Coordinates{ - {"foo", lib.GenerateCoordinate(1 * time.Millisecond)}, - {"bar", lib.GenerateCoordinate(2 * time.Millisecond)}, + {Node: "foo", Coord: lib.GenerateCoordinate(1 * time.Millisecond)}, + {Node: "bar", Coord: lib.GenerateCoordinate(2 * time.Millisecond)}, } if err := s1.fsm.State().CoordinateBatchUpdate(3, updates); err != nil { t.Fatalf("err: %v", err) @@ -444,8 +444,8 @@ func TestHealth_ServiceChecks_DistanceSort(t *testing.T) { t.Fatalf("err: %v", err) } updates := structs.Coordinates{ - {"foo", lib.GenerateCoordinate(1 * time.Millisecond)}, - {"bar", lib.GenerateCoordinate(2 * time.Millisecond)}, + {Node: "foo", Coord: lib.GenerateCoordinate(1 * time.Millisecond)}, + {Node: "bar", Coord: lib.GenerateCoordinate(2 * time.Millisecond)}, } if err := s1.fsm.State().CoordinateBatchUpdate(3, updates); err != nil { t.Fatalf("err: %v", err) @@ -748,8 +748,8 @@ func TestHealth_ServiceNodes_DistanceSort(t *testing.T) { t.Fatalf("err: %v", err) } updates := structs.Coordinates{ - {"foo", lib.GenerateCoordinate(1 * time.Millisecond)}, - {"bar", lib.GenerateCoordinate(2 * time.Millisecond)}, + {Node: "foo", Coord: lib.GenerateCoordinate(1 * time.Millisecond)}, + {Node: "bar", Coord: lib.GenerateCoordinate(2 * time.Millisecond)}, } if err := s1.fsm.State().CoordinateBatchUpdate(3, updates); err != nil { t.Fatalf("err: %v", err) diff --git a/agent/consul/internal_endpoint.go b/agent/consul/internal_endpoint.go index d1d7080dc445..8168be03188e 100644 --- a/agent/consul/internal_endpoint.go +++ b/agent/consul/internal_endpoint.go @@ -7,6 +7,7 @@ import ( "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/go-memdb" + "github.com/hashicorp/go-multierror" "github.com/hashicorp/serf/serf" ) @@ -85,8 +86,22 @@ func (m *Internal) EventFire(args *structs.EventFireRequest, // Add the consul prefix to the event name eventName := userEventName(args.Name) - // Fire the event - return m.srv.serfLAN.UserEvent(eventName, args.Payload, false) + // Fire the event on all LAN segments + segments := m.srv.LANSegments() + var errs error + err = m.srv.serfLAN.UserEvent(eventName, args.Payload, false) + if err != nil { + err = fmt.Errorf("error broadcasting event to default segment: %v", err) + errs = multierror.Append(errs, err) + } + for name, segment := range segments { + err := segment.UserEvent(eventName, args.Payload, false) + if err != nil { + err = fmt.Errorf("error broadcasting event to segment %q: %v", name, err) + errs = multierror.Append(errs, err) + } + } + return errs } // KeyringOperation will query the WAN and LAN gossip keyrings of all nodes. @@ -130,23 +145,36 @@ func (m *Internal) KeyringOperation( return nil } -// executeKeyringOp executes the appropriate keyring-related function based on -// the type of keyring operation in the request. It takes the KeyManager as an -// argument, so it can handle any operation for either LAN or WAN pools. +// executeKeyringOp executes the keyring-related operation in the request +// on either the WAN or LAN pools. func (m *Internal) executeKeyringOp( args *structs.KeyringRequest, reply *structs.KeyringResponses, wan bool) { - var serfResp *serf.KeyResponse - var err error - var mgr *serf.KeyManager - if wan { - mgr = m.srv.KeyManagerWAN() + mgr := m.srv.KeyManagerWAN() + m.executeKeyringOpMgr(mgr, args, reply, wan) } else { - mgr = m.srv.KeyManagerLAN() + segments := m.srv.LANSegments() + m.executeKeyringOpMgr(m.srv.KeyManagerLAN(), args, reply, wan) + for _, segment := range segments { + mgr := segment.KeyManager() + m.executeKeyringOpMgr(mgr, args, reply, wan) + } } +} + +// executeKeyringOpMgr executes the appropriate keyring-related function based on +// the type of keyring operation in the request. It takes the KeyManager as an +// argument, so it can handle any operation for either LAN or WAN pools. +func (m *Internal) executeKeyringOpMgr( + mgr *serf.KeyManager, + args *structs.KeyringRequest, + reply *structs.KeyringResponses, + wan bool) { + var serfResp *serf.KeyResponse + var err error opts := &serf.KeyRequestOptions{RelayFactor: args.RelayFactor} switch args.Operation { diff --git a/agent/consul/leader.go b/agent/consul/leader.go index 43e48fa843f0..a213c6d39a59 100644 --- a/agent/consul/leader.go +++ b/agent/consul/leader.go @@ -64,7 +64,12 @@ func (s *Server) leaderLoop(stopCh chan struct{}) { // Fire a user event indicating a new leader payload := []byte(s.config.NodeName) if err := s.serfLAN.UserEvent(newLeaderEvent, payload, false); err != nil { - s.logger.Printf("[WARN] consul: failed to broadcast new leader event: %v", err) + s.logger.Printf("[WARN] consul: failed to broadcast new leader event on default segment: %v", err) + } + for name, segment := range s.LANSegments() { + if err := segment.UserEvent(newLeaderEvent, payload, false); err != nil { + s.logger.Printf("[WARN] consul: failed to broadcast new leader event on segment %q: %v", name, err) + } } // Reconcile channel is only used once initial reconcile @@ -439,7 +444,9 @@ func (s *Server) shouldHandleMember(member serf.Member) bool { if valid, dc := isConsulNode(member); valid && dc == s.config.Datacenter { return true } - if valid, parts := metadata.IsConsulServer(member); valid && parts.Datacenter == s.config.Datacenter { + if valid, parts := metadata.IsConsulServer(member); valid && + parts.Segment == "" && + parts.Datacenter == s.config.Datacenter { return true } return false diff --git a/agent/consul/merge.go b/agent/consul/merge.go index f1bc2956761a..9092725426a7 100644 --- a/agent/consul/merge.go +++ b/agent/consul/merge.go @@ -15,6 +15,7 @@ type lanMergeDelegate struct { dc string nodeID types.NodeID nodeName string + segment string } func (md *lanMergeDelegate) NotifyMerge(members []*serf.Member) error { @@ -53,6 +54,10 @@ func (md *lanMergeDelegate) NotifyMerge(members []*serf.Member) error { return fmt.Errorf("Member '%s' part of wrong datacenter '%s'", m.Name, parts.Datacenter) } + + if segment := m.Tags["segment"]; segment != md.segment { + return fmt.Errorf("Member '%s' part of wrong segment '%s' (expected '%s')", m.Name, segment, md.segment) + } } return nil } diff --git a/agent/consul/merge_test.go b/agent/consul/merge_test.go index cce99eaf8cc0..04d39223b3ff 100644 --- a/agent/consul/merge_test.go +++ b/agent/consul/merge_test.go @@ -101,6 +101,7 @@ func TestMerge_LAN(t *testing.T) { dc: "dc1", nodeID: types.NodeID("ee954a2f-80de-4b34-8780-97b942a50a99"), nodeName: "node0", + segment: "", } for i, c := range cases { if err := delegate.NotifyMerge(c.members); c.expect == "" { diff --git a/agent/consul/rtt.go b/agent/consul/rtt.go index 50802d568979..84a39f34ddb5 100644 --- a/agent/consul/rtt.go +++ b/agent/consul/rtt.go @@ -6,7 +6,6 @@ import ( "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/lib" - "github.com/hashicorp/serf/coordinate" ) // nodeSorter takes a list of nodes and a parallel vector of distances and @@ -19,15 +18,16 @@ type nodeSorter struct { // newNodeSorter returns a new sorter for the given source coordinate and set of // nodes. -func (s *Server) newNodeSorter(c *coordinate.Coordinate, nodes structs.Nodes) (sort.Interface, error) { +func (s *Server) newNodeSorter(cs lib.CoordinateSet, nodes structs.Nodes) (sort.Interface, error) { state := s.fsm.State() vec := make([]float64, len(nodes)) for i, node := range nodes { - coord, err := state.CoordinateGetRaw(node.Node) + other, err := state.Coordinate(node.Node) if err != nil { return nil, err } - vec[i] = lib.ComputeDistance(c, coord) + c1, c2 := cs.Intersect(other) + vec[i] = lib.ComputeDistance(c1, c2) } return &nodeSorter{nodes, vec}, nil } @@ -58,15 +58,16 @@ type serviceNodeSorter struct { // newServiceNodeSorter returns a new sorter for the given source coordinate and // set of service nodes. -func (s *Server) newServiceNodeSorter(c *coordinate.Coordinate, nodes structs.ServiceNodes) (sort.Interface, error) { +func (s *Server) newServiceNodeSorter(cs lib.CoordinateSet, nodes structs.ServiceNodes) (sort.Interface, error) { state := s.fsm.State() vec := make([]float64, len(nodes)) for i, node := range nodes { - coord, err := state.CoordinateGetRaw(node.Node) + other, err := state.Coordinate(node.Node) if err != nil { return nil, err } - vec[i] = lib.ComputeDistance(c, coord) + c1, c2 := cs.Intersect(other) + vec[i] = lib.ComputeDistance(c1, c2) } return &serviceNodeSorter{nodes, vec}, nil } @@ -97,15 +98,16 @@ type healthCheckSorter struct { // newHealthCheckSorter returns a new sorter for the given source coordinate and // set of health checks with nodes. -func (s *Server) newHealthCheckSorter(c *coordinate.Coordinate, checks structs.HealthChecks) (sort.Interface, error) { +func (s *Server) newHealthCheckSorter(cs lib.CoordinateSet, checks structs.HealthChecks) (sort.Interface, error) { state := s.fsm.State() vec := make([]float64, len(checks)) for i, check := range checks { - coord, err := state.CoordinateGetRaw(check.Node) + other, err := state.Coordinate(check.Node) if err != nil { return nil, err } - vec[i] = lib.ComputeDistance(c, coord) + c1, c2 := cs.Intersect(other) + vec[i] = lib.ComputeDistance(c1, c2) } return &healthCheckSorter{checks, vec}, nil } @@ -136,15 +138,16 @@ type checkServiceNodeSorter struct { // newCheckServiceNodeSorter returns a new sorter for the given source coordinate // and set of nodes with health checks. -func (s *Server) newCheckServiceNodeSorter(c *coordinate.Coordinate, nodes structs.CheckServiceNodes) (sort.Interface, error) { +func (s *Server) newCheckServiceNodeSorter(cs lib.CoordinateSet, nodes structs.CheckServiceNodes) (sort.Interface, error) { state := s.fsm.State() vec := make([]float64, len(nodes)) for i, node := range nodes { - coord, err := state.CoordinateGetRaw(node.Node.Node) + other, err := state.Coordinate(node.Node.Node) if err != nil { return nil, err } - vec[i] = lib.ComputeDistance(c, coord) + c1, c2 := cs.Intersect(other) + vec[i] = lib.ComputeDistance(c1, c2) } return &checkServiceNodeSorter{nodes, vec}, nil } @@ -166,16 +169,16 @@ func (n *checkServiceNodeSorter) Less(i, j int) bool { } // newSorterByDistanceFrom returns a sorter for the given type. -func (s *Server) newSorterByDistanceFrom(c *coordinate.Coordinate, subj interface{}) (sort.Interface, error) { +func (s *Server) newSorterByDistanceFrom(cs lib.CoordinateSet, subj interface{}) (sort.Interface, error) { switch v := subj.(type) { case structs.Nodes: - return s.newNodeSorter(c, v) + return s.newNodeSorter(cs, v) case structs.ServiceNodes: - return s.newServiceNodeSorter(c, v) + return s.newServiceNodeSorter(cs, v) case structs.HealthChecks: - return s.newHealthCheckSorter(c, v) + return s.newHealthCheckSorter(cs, v) case structs.CheckServiceNodes: - return s.newCheckServiceNodeSorter(c, v) + return s.newCheckServiceNodeSorter(cs, v) default: panic(fmt.Errorf("Unhandled type passed to newSorterByDistanceFrom: %#v", subj)) } @@ -197,19 +200,19 @@ func (s *Server) sortNodesByDistanceFrom(source structs.QuerySource, subj interf return nil } - // There won't always be a coordinate for the source node. If there's not - // one then we can bail out because there's no meaning for the sort. + // There won't always be coordinates for the source node. If there are + // none then we can bail out because there's no meaning for the sort. state := s.fsm.State() - coord, err := state.CoordinateGetRaw(source.Node) + cs, err := state.Coordinate(source.Node) if err != nil { return err } - if coord == nil { + if len(cs) == 0 { return nil } // Do the sort! - sorter, err := s.newSorterByDistanceFrom(coord, subj) + sorter, err := s.newSorterByDistanceFrom(cs, subj) if err != nil { return err } diff --git a/agent/consul/segment_stub.go b/agent/consul/segment_stub.go new file mode 100644 index 000000000000..1140741ee1d4 --- /dev/null +++ b/agent/consul/segment_stub.go @@ -0,0 +1,45 @@ +// +build !ent + +package consul + +import ( + "errors" + + "github.com/hashicorp/serf/serf" +) + +const ( + errSegmentsNotSupported = "network segments are not supported in this version of Consul" +) + +var ( + ErrSegmentsNotSupported = errors.New(errSegmentsNotSupported) +) + +// LANSegmentMembers is used to return the members of the given LAN segment. +func (s *Server) LANSegmentMembers(name string) ([]serf.Member, error) { + if name == "" { + return s.LANMembers(), nil + } + + return nil, ErrSegmentsNotSupported +} + +// LANSegmentAddr is used to return the address used for the given LAN segment. +func (s *Server) LANSegmentAddr(name string) string { + return "" +} + +// setupSegments returns an error if any segments are defined since the OSS +// version of Consul doens't support them. +func (s *Server) setupSegments(config *Config, port int) error { + if len(config.Segments) > 0 { + return ErrSegmentsNotSupported + } + + return nil +} + +// floodSegments is a NOP in the OSS version of Consul. +func (s *Server) floodSegments(config *Config) { +} diff --git a/agent/consul/server.go b/agent/consul/server.go index e0015bb9f34c..b5b8e9beac63 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -50,10 +50,11 @@ const ( ) const ( - serfLANSnapshot = "serf/local.snapshot" - serfWANSnapshot = "serf/remote.snapshot" - raftState = "raft/" - snapshotsRetained = 2 + serfLANSnapshot = "serf/local.snapshot" + serfLANSegmentSnapshot = "serf/local-segment-%s.snapshot" + serfWANSnapshot = "serf/remote.snapshot" + raftState = "raft/" + snapshotsRetained = 2 // serverRPCCache controls how long we keep an idle connection // open to a server @@ -162,6 +163,10 @@ type Server struct { // which contains all the DC nodes serfLAN *serf.Serf + // segmentLAN maps segment names to their Serf cluster + segmentLAN map[string]*serf.Serf + segmentLock sync.RWMutex + // serfWAN is the Serf cluster maintained between DC's // which SHOULD only consist of Consul servers serfWAN *serf.Serf @@ -300,6 +305,7 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (* rpcServer: rpc.NewServer(), rpcTLS: incomingTLS, reassertLeaderCh: make(chan chan error), + segmentLAN: make(map[string]*serf.Serf, len(config.Segments)), sessionTimers: NewSessionTimers(), tombstoneGC: gc, serverLookup: NewServerLookup(), @@ -353,7 +359,7 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (* // Initialize the WAN Serf. serfBindPortWAN := config.SerfWANConfig.MemberlistConfig.BindPort - s.serfWAN, err = s.setupSerf(config.SerfWANConfig, s.eventChWAN, serfWANSnapshot, true, serfBindPortWAN) + s.serfWAN, err = s.setupSerf(config.SerfWANConfig, s.eventChWAN, serfWANSnapshot, true, serfBindPortWAN, "") if err != nil { s.Shutdown() return nil, fmt.Errorf("Failed to start WAN Serf: %v", err) @@ -368,14 +374,24 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (* s.logger.Printf("[INFO] agent: Serf WAN TCP bound to port %d", serfBindPortWAN) } - // Initialize the LAN Serf. - s.serfLAN, err = s.setupSerf(config.SerfLANConfig, s.eventChLAN, serfLANSnapshot, false, serfBindPortWAN) + // Initialize the LAN segments before the default LAN Serf so we have + // updated port information to publish there. + if err := s.setupSegments(config, serfBindPortWAN); err != nil { + s.Shutdown() + return nil, fmt.Errorf("Failed to setup network segments: %v", err) + } + + // Initialize the LAN Serf for the default network segment. + s.serfLAN, err = s.setupSerf(config.SerfLANConfig, s.eventChLAN, serfLANSnapshot, false, serfBindPortWAN, "") if err != nil { s.Shutdown() return nil, fmt.Errorf("Failed to start LAN Serf: %v", err) } go s.lanEventHandler() + // Start the flooders after the LAN event handler is wired up. + s.floodSegments(config) + // Add a "static route" to the WAN Serf and hook it up to Serf events. if err := s.router.AddArea(types.AreaWAN, s.serfWAN, s.connPool, s.config.VerifyOutgoing); err != nil { s.Shutdown() @@ -413,67 +429,6 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (* return s, nil } -// setupSerf is used to setup and initialize a Serf -func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, wan bool, wanPort int) (*serf.Serf, error) { - addr := s.Listener.Addr().(*net.TCPAddr) - conf.Init() - if wan { - conf.NodeName = fmt.Sprintf("%s.%s", s.config.NodeName, s.config.Datacenter) - } else { - conf.NodeName = s.config.NodeName - conf.Tags["wan_join_port"] = fmt.Sprintf("%d", wanPort) - } - conf.Tags["role"] = "consul" - conf.Tags["dc"] = s.config.Datacenter - conf.Tags["id"] = string(s.config.NodeID) - conf.Tags["vsn"] = fmt.Sprintf("%d", s.config.ProtocolVersion) - conf.Tags["vsn_min"] = fmt.Sprintf("%d", ProtocolVersionMin) - conf.Tags["vsn_max"] = fmt.Sprintf("%d", ProtocolVersionMax) - conf.Tags["raft_vsn"] = fmt.Sprintf("%d", s.config.RaftConfig.ProtocolVersion) - conf.Tags["build"] = s.config.Build - conf.Tags["port"] = fmt.Sprintf("%d", addr.Port) - if s.config.Bootstrap { - conf.Tags["bootstrap"] = "1" - } - if s.config.BootstrapExpect != 0 { - conf.Tags["expect"] = fmt.Sprintf("%d", s.config.BootstrapExpect) - } - if s.config.NonVoter { - conf.Tags["nonvoter"] = "1" - } - if s.config.UseTLS { - conf.Tags["use_tls"] = "1" - } - conf.MemberlistConfig.LogOutput = s.config.LogOutput - conf.LogOutput = s.config.LogOutput - conf.Logger = s.logger - conf.EventCh = ch - if !s.config.DevMode { - conf.SnapshotPath = filepath.Join(s.config.DataDir, path) - } - conf.ProtocolVersion = protocolVersionMap[s.config.ProtocolVersion] - conf.RejoinAfterLeave = s.config.RejoinAfterLeave - if wan { - conf.Merge = &wanMergeDelegate{} - } else { - conf.Merge = &lanMergeDelegate{ - dc: s.config.Datacenter, - nodeID: s.config.NodeID, - nodeName: s.config.NodeName, - } - } - - // Until Consul supports this fully, we disable automatic resolution. - // When enabled, the Serf gossip may just turn off if we are the minority - // node which is rather unexpected. - conf.EnableNameConflictResolution = false - if err := lib.EnsurePath(conf.SnapshotPath, false); err != nil { - return nil, err - } - - return serf.Create(conf) -} - // setupRaft is used to setup and initialize Raft func (s *Server) setupRaft() error { // If we have an unclean exit then attempt to close the Raft store. @@ -931,6 +886,19 @@ func (s *Server) Encrypted() bool { return s.serfLAN.EncryptionEnabled() && s.serfWAN.EncryptionEnabled() } +// LANSegments returns a map of LAN segments by name +func (s *Server) LANSegments() map[string]*serf.Serf { + s.segmentLock.RLock() + defer s.segmentLock.RUnlock() + + segments := make(map[string]*serf.Serf, len(s.segmentLAN)) + for name, segment := range s.segmentLAN { + segments[name] = segment + } + + return segments +} + // inmemCodec is used to do an RPC call without going over a network type inmemCodec struct { method string @@ -1042,8 +1010,21 @@ func (s *Server) Stats() map[string]map[string]string { } // GetLANCoordinate returns the coordinate of the server in the LAN gossip pool. -func (s *Server) GetLANCoordinate() (*coordinate.Coordinate, error) { - return s.serfLAN.GetCoordinate() +func (s *Server) GetLANCoordinate() (lib.CoordinateSet, error) { + lan, err := s.serfLAN.GetCoordinate() + if err != nil { + return nil, err + } + + cs := lib.CoordinateSet{"": lan} + for name, segment := range s.segmentLAN { + c, err := segment.GetCoordinate() + if err != nil { + return nil, err + } + cs[name] = c + } + return cs, nil } // GetWANCoordinate returns the coordinate of the server in the WAN gossip pool. diff --git a/agent/consul/serf.go b/agent/consul/server_serf.go similarity index 77% rename from agent/consul/serf.go rename to agent/consul/server_serf.go index 779ab0154b1c..1debd33c182a 100644 --- a/agent/consul/serf.go +++ b/agent/consul/server_serf.go @@ -1,10 +1,14 @@ package consul import ( + "fmt" + "net" + "path/filepath" "strings" "time" "github.com/hashicorp/consul/agent/metadata" + "github.com/hashicorp/consul/lib" "github.com/hashicorp/raft" "github.com/hashicorp/serf/serf" ) @@ -24,6 +28,76 @@ const ( peerRetryBase = 1 * time.Second ) +// setupSerf is used to setup and initialize a Serf +func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, wan bool, wanPort int, segment string) (*serf.Serf, error) { + conf.Init() + + if wan { + conf.NodeName = fmt.Sprintf("%s.%s", s.config.NodeName, s.config.Datacenter) + } else { + conf.NodeName = s.config.NodeName + conf.Tags["wan_join_port"] = fmt.Sprintf("%d", wanPort) + } + conf.Tags["role"] = "consul" + conf.Tags["dc"] = s.config.Datacenter + conf.Tags["segment"] = segment + if segment == "" { + for _, s := range s.config.Segments { + conf.Tags["segment_port_"+s.Name] = fmt.Sprintf("%d", s.Port) + } + } + conf.Tags["id"] = string(s.config.NodeID) + conf.Tags["vsn"] = fmt.Sprintf("%d", s.config.ProtocolVersion) + conf.Tags["vsn_min"] = fmt.Sprintf("%d", ProtocolVersionMin) + conf.Tags["vsn_max"] = fmt.Sprintf("%d", ProtocolVersionMax) + conf.Tags["raft_vsn"] = fmt.Sprintf("%d", s.config.RaftConfig.ProtocolVersion) + conf.Tags["build"] = s.config.Build + addr := s.Listener.Addr().(*net.TCPAddr) + conf.Tags["port"] = fmt.Sprintf("%d", addr.Port) + if s.config.Bootstrap { + conf.Tags["bootstrap"] = "1" + } + if s.config.BootstrapExpect != 0 { + conf.Tags["expect"] = fmt.Sprintf("%d", s.config.BootstrapExpect) + } + if s.config.NonVoter { + conf.Tags["nonvoter"] = "1" + } + if s.config.UseTLS { + conf.Tags["use_tls"] = "1" + } + conf.MemberlistConfig.LogOutput = s.config.LogOutput + conf.LogOutput = s.config.LogOutput + conf.Logger = s.logger + conf.EventCh = ch + conf.ProtocolVersion = protocolVersionMap[s.config.ProtocolVersion] + conf.RejoinAfterLeave = s.config.RejoinAfterLeave + if wan { + conf.Merge = &wanMergeDelegate{} + } else { + conf.Merge = &lanMergeDelegate{ + dc: s.config.Datacenter, + nodeID: s.config.NodeID, + nodeName: s.config.NodeName, + segment: segment, + } + } + + // Until Consul supports this fully, we disable automatic resolution. + // When enabled, the Serf gossip may just turn off if we are the minority + // node which is rather unexpected. + conf.EnableNameConflictResolution = false + + if !s.config.DevMode { + conf.SnapshotPath = filepath.Join(s.config.DataDir, path) + } + if err := lib.EnsurePath(conf.SnapshotPath, false); err != nil { + return nil, err + } + + return serf.Create(conf) +} + // userEventName computes the name of a user event func userEventName(name string) string { return userEventPrefix + name @@ -126,7 +200,7 @@ func (s *Server) localEvent(event serf.UserEvent) { func (s *Server) lanNodeJoin(me serf.MemberEvent) { for _, m := range me.Members { ok, serverMeta := metadata.IsConsulServer(m) - if !ok { + if !ok || serverMeta.Segment != "" { continue } s.logger.Printf("[INFO] consul: Adding LAN server %s", serverMeta) @@ -262,7 +336,7 @@ func (s *Server) maybeBootstrap() { func (s *Server) lanNodeFailed(me serf.MemberEvent) { for _, m := range me.Members { ok, serverMeta := metadata.IsConsulServer(m) - if !ok { + if !ok || serverMeta.Segment != "" { continue } s.logger.Printf("[INFO] consul: Removing LAN server %s", serverMeta) diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index e9249e26d9df..73a1c75da7a6 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -370,12 +370,12 @@ func (s *Store) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeName string) error } } - // Delete any coordinate associated with this node. - coord, err := tx.First("coordinates", "id", nodeName) + // Delete any coordinates associated with this node. + coords, err := tx.Get("coordinates", "node", nodeName) if err != nil { return fmt.Errorf("failed coordinate lookup: %s", err) } - if coord != nil { + for coord := coords.Next(); coord != nil; coord = coords.Next() { if err := tx.Delete("coordinates", coord); err != nil { return fmt.Errorf("failed deleting coordinate: %s", err) } diff --git a/agent/consul/state/coordinate.go b/agent/consul/state/coordinate.go index 9d05e05d1cb1..83db264553ad 100644 --- a/agent/consul/state/coordinate.go +++ b/agent/consul/state/coordinate.go @@ -4,8 +4,8 @@ import ( "fmt" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/lib" "github.com/hashicorp/go-memdb" - "github.com/hashicorp/serf/coordinate" ) // Coordinates is used to pull all the coordinates from the snapshot. @@ -40,26 +40,23 @@ func (s *Restore) Coordinates(idx uint64, updates structs.Coordinates) error { return nil } -// CoordinateGetRaw queries for the coordinate of the given node. This is an -// unusual state store method because it just returns the raw coordinate or -// nil, none of the Raft or node information is returned. This hits the 90% -// internal-to-Consul use case for this data, and this isn't exposed via an -// endpoint, so it doesn't matter that the Raft info isn't available. -func (s *Store) CoordinateGetRaw(node string) (*coordinate.Coordinate, error) { +// Coordinate returns a map of coordinates for the given node, indexed by +// network segment. +func (s *Store) Coordinate(node string) (lib.CoordinateSet, error) { tx := s.db.Txn(false) defer tx.Abort() - // Pull the full coordinate entry. - coord, err := tx.First("coordinates", "id", node) + iter, err := tx.Get("coordinates", "node", node) if err != nil { return nil, fmt.Errorf("failed coordinate lookup: %s", err) } - // Pick out just the raw coordinate. - if coord != nil { - return coord.(*structs.Coordinate).Coord, nil + results := make(lib.CoordinateSet) + for raw := iter.Next(); raw != nil; raw = iter.Next() { + coord := raw.(*structs.Coordinate) + results[coord.Segment] = coord.Coord } - return nil, nil + return results, nil } // Coordinates queries for all nodes with coordinates. diff --git a/agent/consul/state/coordinate_test.go b/agent/consul/state/coordinate_test.go index 70e84a6bdce9..b126e4478718 100644 --- a/agent/consul/state/coordinate_test.go +++ b/agent/consul/state/coordinate_test.go @@ -3,12 +3,13 @@ package state import ( "math" "math/rand" - "reflect" "testing" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/lib" "github.com/hashicorp/go-memdb" "github.com/hashicorp/serf/coordinate" + "github.com/pascaldekloe/goe/verify" ) // generateRandomCoordinate creates a random coordinate. This mucks with the @@ -30,25 +31,22 @@ func TestStateStore_Coordinate_Updates(t *testing.T) { s := testStateStore(t) // Make sure the coordinates list starts out empty, and that a query for - // a raw coordinate for a nonexistent node doesn't do anything bad. + // a per-node coordinate for a nonexistent node doesn't do anything bad. ws := memdb.NewWatchSet() - idx, coords, err := s.Coordinates(ws) + idx, all, err := s.Coordinates(ws) if err != nil { t.Fatalf("err: %s", err) } if idx != 0 { t.Fatalf("bad index: %d", idx) } - if coords != nil { - t.Fatalf("bad: %#v", coords) - } - coord, err := s.CoordinateGetRaw("nope") + verify.Values(t, "", all, structs.Coordinates{}) + + coords, err := s.Coordinate("nope") if err != nil { t.Fatalf("err: %s", err) } - if coord != nil { - t.Fatalf("bad: %#v", coord) - } + verify.Values(t, "", coords, lib.CoordinateSet{}) // Make an update for nodes that don't exist and make sure they get // ignored. @@ -72,16 +70,14 @@ func TestStateStore_Coordinate_Updates(t *testing.T) { // Should still be empty, though applying an empty batch does bump // the table index. ws = memdb.NewWatchSet() - idx, coords, err = s.Coordinates(ws) + idx, all, err = s.Coordinates(ws) if err != nil { t.Fatalf("err: %s", err) } if idx != 1 { t.Fatalf("bad index: %d", idx) } - if coords != nil { - t.Fatalf("bad: %#v", coords) - } + verify.Values(t, "", all, structs.Coordinates{}) // Register the nodes then do the update again. testRegisterNode(t, s, 1, "node1") @@ -95,26 +91,25 @@ func TestStateStore_Coordinate_Updates(t *testing.T) { // Should go through now. ws = memdb.NewWatchSet() - idx, coords, err = s.Coordinates(ws) + idx, all, err = s.Coordinates(ws) if err != nil { t.Fatalf("err: %s", err) } if idx != 3 { t.Fatalf("bad index: %d", idx) } - if !reflect.DeepEqual(coords, updates) { - t.Fatalf("bad: %#v", coords) - } + verify.Values(t, "", all, updates) - // Also verify the raw coordinate interface. + // Also verify the per-node coordinate interface. for _, update := range updates { - coord, err := s.CoordinateGetRaw(update.Node) + coords, err := s.Coordinate(update.Node) if err != nil { t.Fatalf("err: %s", err) } - if !reflect.DeepEqual(coord, update.Coord) { - t.Fatalf("bad: %#v", coord) + expected := lib.CoordinateSet{ + "": update.Coord, } + verify.Values(t, "", coords, expected) } // Update the coordinate for one of the nodes. @@ -127,26 +122,25 @@ func TestStateStore_Coordinate_Updates(t *testing.T) { } // Verify it got applied. - idx, coords, err = s.Coordinates(nil) + idx, all, err = s.Coordinates(nil) if err != nil { t.Fatalf("err: %s", err) } if idx != 4 { t.Fatalf("bad index: %d", idx) } - if !reflect.DeepEqual(coords, updates) { - t.Fatalf("bad: %#v", coords) - } + verify.Values(t, "", all, updates) - // And check the raw coordinate version of the same thing. + // And check the per-node coordinate version of the same thing. for _, update := range updates { - coord, err := s.CoordinateGetRaw(update.Node) + coords, err := s.Coordinate(update.Node) if err != nil { t.Fatalf("err: %s", err) } - if !reflect.DeepEqual(coord, update.Coord) { - t.Fatalf("bad: %#v", coord) + expected := lib.CoordinateSet{ + "": update.Coord, } + verify.Values(t, "", coords, expected) } // Apply an invalid update and make sure it gets ignored. @@ -162,16 +156,14 @@ func TestStateStore_Coordinate_Updates(t *testing.T) { // Verify we are at the previous state, though the empty batch does bump // the table index. - idx, coords, err = s.Coordinates(nil) + idx, all, err = s.Coordinates(nil) if err != nil { t.Fatalf("err: %s", err) } if idx != 5 { t.Fatalf("bad index: %d", idx) } - if !reflect.DeepEqual(coords, updates) { - t.Fatalf("bad: %#v", coords) - } + verify.Values(t, "", all, updates) } func TestStateStore_Coordinate_Cleanup(t *testing.T) { @@ -181,8 +173,14 @@ func TestStateStore_Coordinate_Cleanup(t *testing.T) { testRegisterNode(t, s, 1, "node1") updates := structs.Coordinates{ &structs.Coordinate{ - Node: "node1", - Coord: generateRandomCoordinate(), + Node: "node1", + Segment: "alpha", + Coord: generateRandomCoordinate(), + }, + &structs.Coordinate{ + Node: "node1", + Segment: "beta", + Coord: generateRandomCoordinate(), }, } if err := s.CoordinateBatchUpdate(2, updates); err != nil { @@ -190,13 +188,15 @@ func TestStateStore_Coordinate_Cleanup(t *testing.T) { } // Make sure it's in there. - coord, err := s.CoordinateGetRaw("node1") + coords, err := s.Coordinate("node1") if err != nil { t.Fatalf("err: %s", err) } - if !reflect.DeepEqual(coord, updates[0].Coord) { - t.Fatalf("bad: %#v", coord) + expected := lib.CoordinateSet{ + "alpha": updates[0].Coord, + "beta": updates[1].Coord, } + verify.Values(t, "", coords, expected) // Now delete the node. if err := s.DeleteNode(3, "node1"); err != nil { @@ -204,25 +204,21 @@ func TestStateStore_Coordinate_Cleanup(t *testing.T) { } // Make sure the coordinate is gone. - coord, err = s.CoordinateGetRaw("node1") + coords, err = s.Coordinate("node1") if err != nil { t.Fatalf("err: %s", err) } - if coord != nil { - t.Fatalf("bad: %#v", coord) - } + verify.Values(t, "", coords, lib.CoordinateSet{}) // Make sure the index got updated. - idx, coords, err := s.Coordinates(nil) + idx, all, err := s.Coordinates(nil) if err != nil { t.Fatalf("err: %s", err) } if idx != 3 { t.Fatalf("bad index: %d", idx) } - if coords != nil { - t.Fatalf("bad: %#v", coords) - } + verify.Values(t, "", all, structs.Coordinates{}) } func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) { @@ -291,9 +287,7 @@ func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) { // The snapshot will have the bad update in it, since we don't filter on // the read side. - if !reflect.DeepEqual(dump, append(updates, badUpdate)) { - t.Fatalf("bad: %#v", dump) - } + verify.Values(t, "", dump, append(updates, badUpdate)) // Restore the values into a new state store. func() { @@ -312,9 +306,7 @@ func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) { if idx != 6 { t.Fatalf("bad index: %d", idx) } - if !reflect.DeepEqual(res, updates) { - t.Fatalf("bad: %#v", res) - } + verify.Values(t, "", res, updates) // Check that the index was updated (note that it got passed // in during the restore). diff --git a/agent/consul/state/schema.go b/agent/consul/state/schema.go index 6abc78f3a55c..d7fa449e88ed 100644 --- a/agent/consul/state/schema.go +++ b/agent/consul/state/schema.go @@ -374,6 +374,26 @@ func coordinatesTableSchema() *memdb.TableSchema { Name: "id", AllowMissing: false, Unique: true, + Indexer: &memdb.CompoundIndex{ + // AllowMissing is required since we allow + // Segment to be an empty string. + AllowMissing: true, + Indexes: []memdb.Indexer{ + &memdb.StringFieldIndex{ + Field: "Node", + Lowercase: true, + }, + &memdb.StringFieldIndex{ + Field: "Segment", + Lowercase: true, + }, + }, + }, + }, + "node": &memdb.IndexSchema{ + Name: "node", + AllowMissing: false, + Unique: false, Indexer: &memdb.StringFieldIndex{ Field: "Node", Lowercase: true, diff --git a/agent/coordinate_endpoint.go b/agent/coordinate_endpoint.go index 790bc058ed6c..e602dacbed41 100644 --- a/agent/coordinate_endpoint.go +++ b/agent/coordinate_endpoint.go @@ -81,5 +81,18 @@ func (s *HTTPServer) CoordinateNodes(resp http.ResponseWriter, req *http.Request if out.Coordinates == nil { out.Coordinates = make(structs.Coordinates, 0) } + + // Filter by segment if applicable + if v, ok := req.URL.Query()["segment"]; ok && len(v) > 0 { + segment := v[0] + filtered := make(structs.Coordinates, 0) + for _, coord := range out.Coordinates { + if coord.Segment == segment { + filtered = append(filtered, coord) + } + } + out.Coordinates = filtered + } + return out.Coordinates, nil } diff --git a/agent/coordinate_endpoint_test.go b/agent/coordinate_endpoint_test.go index 010550c6b5e9..414d9e6fda6e 100644 --- a/agent/coordinate_endpoint_test.go +++ b/agent/coordinate_endpoint_test.go @@ -68,6 +68,7 @@ func TestCoordinate_Nodes(t *testing.T) { arg1 := structs.CoordinateUpdateRequest{ Datacenter: "dc1", Node: "foo", + Segment: "alpha", Coord: coordinate.NewCoordinate(coordinate.DefaultConfig()), } var out struct{} @@ -99,4 +100,43 @@ func TestCoordinate_Nodes(t *testing.T) { coordinates[1].Node != "foo" { t.Fatalf("bad: %v", coordinates) } + + // Filter on a nonexistant node segment + req, _ = http.NewRequest("GET", "/v1/coordinate/nodes?segment=nope", nil) + resp = httptest.NewRecorder() + obj, err = a.srv.CoordinateNodes(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + coordinates = obj.(structs.Coordinates) + if len(coordinates) != 0 { + t.Fatalf("bad: %v", coordinates) + } + + // Filter on a real node segment + req, _ = http.NewRequest("GET", "/v1/coordinate/nodes?segment=alpha", nil) + resp = httptest.NewRecorder() + obj, err = a.srv.CoordinateNodes(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + coordinates = obj.(structs.Coordinates) + if len(coordinates) != 1 || coordinates[0].Node != "foo" { + t.Fatalf("bad: %v", coordinates) + } + + // Make sure the empty filter works + req, _ = http.NewRequest("GET", "/v1/coordinate/nodes?segment=", nil) + resp = httptest.NewRecorder() + obj, err = a.srv.CoordinateNodes(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + coordinates = obj.(structs.Coordinates) + if len(coordinates) != 1 || coordinates[0].Node != "bar" { + t.Fatalf("bad: %v", coordinates) + } } diff --git a/agent/local_test.go b/agent/local_test.go index 5ff8b1c6110e..993224395b35 100644 --- a/agent/local_test.go +++ b/agent/local_test.go @@ -127,6 +127,7 @@ func TestAgentAntiEntropy_Services(t *testing.T) { id := services.NodeServices.Node.ID addrs := services.NodeServices.Node.TaggedAddresses meta := services.NodeServices.Node.Meta + delete(meta, structs.MetaSegmentKey) // Added later, not in config. if id != a.Config.NodeID || !reflect.DeepEqual(addrs, a.Config.TaggedAddresses) || !reflect.DeepEqual(meta, a.Config.Meta) { @@ -828,6 +829,7 @@ func TestAgentAntiEntropy_Checks(t *testing.T) { id := services.NodeServices.Node.ID addrs := services.NodeServices.Node.TaggedAddresses meta := services.NodeServices.Node.Meta + delete(meta, structs.MetaSegmentKey) // Added later, not in config. if id != a.Config.NodeID || !reflect.DeepEqual(addrs, a.Config.TaggedAddresses) || !reflect.DeepEqual(meta, a.Config.Meta) { @@ -1364,6 +1366,7 @@ func TestAgentAntiEntropy_NodeInfo(t *testing.T) { id := services.NodeServices.Node.ID addrs := services.NodeServices.Node.TaggedAddresses meta := services.NodeServices.Node.Meta + delete(meta, structs.MetaSegmentKey) // Added later, not in config. if id != cfg.NodeID || !reflect.DeepEqual(addrs, cfg.TaggedAddresses) || !reflect.DeepEqual(meta, cfg.Meta) { @@ -1387,6 +1390,7 @@ func TestAgentAntiEntropy_NodeInfo(t *testing.T) { id := services.NodeServices.Node.ID addrs := services.NodeServices.Node.TaggedAddresses meta := services.NodeServices.Node.Meta + delete(meta, structs.MetaSegmentKey) // Added later, not in config. if id != cfg.NodeID || !reflect.DeepEqual(addrs, cfg.TaggedAddresses) || !reflect.DeepEqual(meta, cfg.Meta) { diff --git a/agent/metadata/server.go b/agent/metadata/server.go index 97beb0315894..7ff6c69ce2a3 100644 --- a/agent/metadata/server.go +++ b/agent/metadata/server.go @@ -10,6 +10,7 @@ import ( "net" "regexp" "strconv" + "strings" "github.com/hashicorp/go-version" "github.com/hashicorp/serf/serf" @@ -27,19 +28,21 @@ func (k *Key) Equal(x *Key) bool { // Server is used to return details of a consul server type Server struct { - Name string - ID string - Datacenter string - Port int - WanJoinPort int - Bootstrap bool - Expect int - Build version.Version - Version int - RaftVersion int - NonVoter bool - Addr net.Addr - Status serf.MemberStatus + Name string + ID string + Datacenter string + Segment string + Port int + SegmentPorts map[string]int + WanJoinPort int + Bootstrap bool + Expect int + Build version.Version + Version int + RaftVersion int + NonVoter bool + Addr net.Addr + Status serf.MemberStatus // If true, use TLS when connecting to this server UseTLS bool @@ -73,8 +76,8 @@ func IsConsulServer(m serf.Member) (bool, *Server) { } datacenter := m.Tags["dc"] + segment := m.Tags["segment"] _, bootstrap := m.Tags["bootstrap"] - _, useTLS := m.Tags["use_tls"] expect := 0 @@ -93,6 +96,17 @@ func IsConsulServer(m serf.Member) (bool, *Server) { return false, nil } + segment_ports := make(map[string]int) + for name, value := range m.Tags { + if strings.HasPrefix(name, "segment_port_") { + segment_port, err := strconv.Atoi(value) + if err != nil { + return false, nil + } + segment_ports[strings.TrimPrefix(name, "segment_port_")] = segment_port + } + } + build_version, err := version.NewVersion(versionFormat.FindString(m.Tags["build"])) if err != nil { return false, nil @@ -127,20 +141,22 @@ func IsConsulServer(m serf.Member) (bool, *Server) { addr := &net.TCPAddr{IP: m.Addr, Port: port} parts := &Server{ - Name: m.Name, - ID: m.Tags["id"], - Datacenter: datacenter, - Port: port, - WanJoinPort: wan_join_port, - Bootstrap: bootstrap, - Expect: expect, - Addr: addr, - Build: *build_version, - Version: vsn, - RaftVersion: raft_vsn, - Status: m.Status, - NonVoter: nonVoter, - UseTLS: useTLS, + Name: m.Name, + ID: m.Tags["id"], + Datacenter: datacenter, + Segment: segment, + Port: port, + SegmentPorts: segment_ports, + WanJoinPort: wan_join_port, + Bootstrap: bootstrap, + Expect: expect, + Addr: addr, + Build: *build_version, + Version: vsn, + RaftVersion: raft_vsn, + Status: m.Status, + NonVoter: nonVoter, + UseTLS: useTLS, } return true, parts } diff --git a/agent/structs/structs.go b/agent/structs/structs.go index f83060a3f1fc..b86f869697d4 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -74,6 +74,9 @@ const ( // metaValueMaxLength is the maximum allowed length of a metadata value metaValueMaxLength = 512 + // MetaSegmentKey is the node metadata key used to store the node's network segment + MetaSegmentKey = "consul-network-segment" + // MaxLockDelay provides a maximum LockDelay value for // a session. Any value above this will not be respected. MaxLockDelay = 60 * time.Second @@ -747,8 +750,9 @@ type IndexedSessions struct { // Coordinate stores a node name with its associated network coordinate. type Coordinate struct { - Node string - Coord *coordinate.Coordinate + Node string + Segment string + Coord *coordinate.Coordinate } type Coordinates []*Coordinate @@ -781,6 +785,7 @@ type DatacenterMap struct { type CoordinateUpdateRequest struct { Datacenter string Node string + Segment string Coord *coordinate.Coordinate WriteRequest } diff --git a/agent/testagent.go b/agent/testagent.go index de2306fa961b..e02efb339301 100644 --- a/agent/testagent.go +++ b/agent/testagent.go @@ -235,6 +235,13 @@ func (a *TestAgent) HTTPAddr() string { return a.srv.Addr } +func (a *TestAgent) SegmentAddr(name string) string { + if server, ok := a.Agent.delegate.(*consul.Server); ok { + return server.LANSegmentAddr(name) + } + return "" +} + func (a *TestAgent) Client() *api.Client { conf := api.DefaultConfig() conf.Address = a.HTTPAddr() diff --git a/api/agent.go b/api/agent.go index be4e00ff6b3b..9f39a33c8ccf 100644 --- a/api/agent.go +++ b/api/agent.go @@ -44,6 +44,15 @@ type AgentMember struct { DelegateCur uint8 } +// MemberOpts is used for querying member information. +type MemberOpts struct { + // Wan is whether to show members from the LAN. + Wan bool + + // Segment is the LAN segment to show members + Segment string +} + // AgentServiceRegistration is used to register a new service type AgentServiceRegistration struct { ID string `json:",omitempty"` @@ -256,6 +265,28 @@ func (a *Agent) Members(wan bool) ([]*AgentMember, error) { return out, nil } +// Members returns the known gossip members. The WAN +// flag can be used to query a server for WAN members. +func (a *Agent) MembersOpts(wan bool, segment string) ([]*AgentMember, error) { + r := a.c.newRequest("GET", "/v1/agent/members") + r.params.Set("segment", segment) + if wan { + r.params.Set("wan", "1") + } + + _, resp, err := requireOK(a.c.doRequest(r)) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + var out []*AgentMember + if err := decodeBody(resp, &out); err != nil { + return nil, err + } + return out, nil +} + // ServiceRegister is used to register a new service with // the local agent func (a *Agent) ServiceRegister(service *AgentServiceRegistration) error { diff --git a/api/catalog_test.go b/api/catalog_test.go index 2f49969bae78..57cf5acdc95c 100644 --- a/api/catalog_test.go +++ b/api/catalog_test.go @@ -48,7 +48,9 @@ func TestAPI_CatalogNodes(t *testing.T) { "lan": "127.0.0.1", "wan": "127.0.0.1", }, - Meta: map[string]string{}, + Meta: map[string]string{ + "consul-network-segment": "", + }, CreateIndex: meta.LastIndex - 1, ModifyIndex: meta.LastIndex, }, diff --git a/api/coordinate.go b/api/coordinate.go index ae8d16ee6851..90214e392ce7 100644 --- a/api/coordinate.go +++ b/api/coordinate.go @@ -6,8 +6,9 @@ import ( // CoordinateEntry represents a node and its associated network coordinate. type CoordinateEntry struct { - Node string - Coord *coordinate.Coordinate + Node string + Segment string + Coord *coordinate.Coordinate } // CoordinateDatacenterMap has the coordinates for servers in a given datacenter diff --git a/api/operator_segment.go b/api/operator_segment.go new file mode 100644 index 000000000000..432d55badd4b --- /dev/null +++ b/api/operator_segment.go @@ -0,0 +1,11 @@ +package api + +// SegmentList returns all the available LAN segments. +func (op *Operator) SegmentList(q *QueryOptions) ([]string, *QueryMeta, error) { + var out []string + qm, err := op.c.query("/v1/operator/segment/list", &out, q) + if err != nil { + return nil, nil, err + } + return out, qm, nil +} diff --git a/command/agent.go b/command/agent.go index 36e1ccd26bb6..d41534baff79 100644 --- a/command/agent.go +++ b/command/agent.go @@ -117,6 +117,7 @@ func (cmd *AgentCommand) readConfig() *agent.Config { f.StringVar(&cmdCfg.AdvertiseAddr, "advertise", "", "Sets the advertise address to use.") f.StringVar(&cmdCfg.AdvertiseAddrWan, "advertise-wan", "", "Sets address to advertise on WAN instead of -advertise address.") + f.StringVar(&cmdCfg.Segment, "segment", "", "(Enterprise-only) Sets the network segment to join.") f.IntVar(&cmdCfg.Protocol, "protocol", -1, "Sets the protocol version. Defaults to latest.") @@ -224,6 +225,10 @@ func (cmd *AgentCommand) readConfig() *agent.Config { key, value := agent.ParseMetaPair(entry) cmdCfg.Meta[key] = value } + if err := structs.ValidateMetadata(cmdCfg.Meta); err != nil { + cmd.UI.Error(fmt.Sprintf("Failed to parse node metadata: %v", err)) + return nil + } } cfg := agent.DefaultConfig() @@ -508,11 +513,6 @@ func (cmd *AgentCommand) readConfig() *agent.Config { cmd.UI.Error("WARNING: Bootstrap mode enabled! Do not enable unless necessary") } - // Verify the node metadata entries are valid - if err := structs.ValidateMetadata(cfg.Meta); err != nil { - cmd.UI.Error(fmt.Sprintf("Failed to parse node metadata: %v", err)) - } - // It doesn't make sense to include both UI options. if cfg.EnableUI == true && cfg.UIDir != "" { cmd.UI.Error("Both the ui and ui-dir flags were specified, please provide only one") @@ -804,17 +804,22 @@ func (cmd *AgentCommand) run(args []string) int { // Let the agent know we've finished registration agent.StartSync() + segment := config.Segment + if config.Server { + segment = "" + } + cmd.UI.Output("Consul agent running!") cmd.UI.Info(fmt.Sprintf(" Version: '%s'", cmd.HumanVersion)) cmd.UI.Info(fmt.Sprintf(" Node ID: '%s'", config.NodeID)) cmd.UI.Info(fmt.Sprintf(" Node name: '%s'", config.NodeName)) - cmd.UI.Info(fmt.Sprintf(" Datacenter: '%s'", config.Datacenter)) - cmd.UI.Info(fmt.Sprintf(" Server: %v (bootstrap: %v)", config.Server, config.Bootstrap)) + cmd.UI.Info(fmt.Sprintf(" Datacenter: '%s' (Segment: '%s')", config.Datacenter, segment)) + cmd.UI.Info(fmt.Sprintf(" Server: %v (Bootstrap: %v)", config.Server, config.Bootstrap)) cmd.UI.Info(fmt.Sprintf(" Client Addr: %v (HTTP: %d, HTTPS: %d, DNS: %d)", config.ClientAddr, config.Ports.HTTP, config.Ports.HTTPS, config.Ports.DNS)) cmd.UI.Info(fmt.Sprintf(" Cluster Addr: %v (LAN: %d, WAN: %d)", config.AdvertiseAddr, config.Ports.SerfLan, config.Ports.SerfWan)) - cmd.UI.Info(fmt.Sprintf("Gossip encrypt: %v, RPC-TLS: %v, TLS-Incoming: %v", + cmd.UI.Info(fmt.Sprintf(" Encrypt: Gossip: %v, TLS-Outgoing: %v, TLS-Incoming: %v", agent.GossipEncrypted(), config.VerifyOutgoing, config.VerifyIncoming)) // Enable log streaming diff --git a/command/agent_test.go b/command/agent_test.go index b8a98508e4d5..414fef968985 100644 --- a/command/agent_test.go +++ b/command/agent_test.go @@ -196,8 +196,11 @@ func TestReadCliConfig(t *testing.T) { if config.SerfLanBindAddr != "4.3.2.2" { t.Fatalf("expected -serf-lan-bind 4.3.2.2 got %s", config.SerfLanBindAddr) } - if len(config.Meta) != 1 || config.Meta["somekey"] != "somevalue" { - t.Fatalf("expected somekey=somevalue, got %v", config.Meta) + expected := map[string]string{ + "somekey": "somevalue", + } + if !reflect.DeepEqual(config.Meta, expected) { + t.Fatalf("bad: %v %v", config.Meta, expected) } } @@ -213,11 +216,11 @@ func TestReadCliConfig(t *testing.T) { ShutdownCh: shutdownCh, BaseCommand: baseCommand(cli.NewMockUi()), } + config := cmd.readConfig() expected := map[string]string{ "somekey": "somevalue", "otherkey": "othervalue", } - config := cmd.readConfig() if !reflect.DeepEqual(config.Meta, expected) { t.Fatalf("bad: %v %v", config.Meta, expected) } diff --git a/command/members.go b/command/members.go index 631d1c08b3dc..d6ea606163df 100644 --- a/command/members.go +++ b/command/members.go @@ -33,6 +33,7 @@ func (c *MembersCommand) Run(args []string) int { var detailed bool var wan bool var statusFilter string + var segment string f := c.BaseCommand.NewFlagSet(c) f.BoolVar(&detailed, "detailed", false, @@ -43,6 +44,9 @@ func (c *MembersCommand) Run(args []string) int { f.StringVar(&statusFilter, "status", ".*", "If provided, output is filtered to only nodes matching the regular "+ "expression for status.") + f.StringVar(&segment, "segment", "", + "(Enterprise-only) If provided, output is filtered to only nodes in"+ + "the given segment.") if err := c.BaseCommand.Parse(args); err != nil { return 1 @@ -61,16 +65,39 @@ func (c *MembersCommand) Run(args []string) int { return 1 } - members, err := client.Agent().Members(wan) + members, err := client.Agent().MembersOpts(wan, segment) if err != nil { c.UI.Error(fmt.Sprintf("Error retrieving members: %s", err)) return 1 } + // Check if we queried a server and need to query for members in all segments. + if !wan && segment == "" { + self, err := client.Agent().Self() + if err != nil { + c.UI.Error(fmt.Sprintf("Error retrieving agent info: %s", err)) + return 1 + } + if self["Config"]["Server"].(bool) { + segmentMembers, err := getSegmentMembers(client) + if err != nil { + c.UI.Error(fmt.Sprintf("Error retrieving members in segments: %s", err)) + return 1 + } + members = append(members, segmentMembers...) + } + } + // Filter the results n := len(members) for i := 0; i < n; i++ { member := members[i] + if member.Tags["segment"] == "" { + member.Tags["segment"] = "" + if member.Tags["role"] == "consul" { + member.Tags["segment"] = "" + } + } statusString := serf.MemberStatus(member.Status).String() if !statusRe.MatchString(statusString) { members[i], members[n-1] = members[n-1], members[i] @@ -86,7 +113,7 @@ func (c *MembersCommand) Run(args []string) int { return 2 } - sort.Sort(ByMemberName(members)) + sort.Sort(ByMemberNameAndSegment(members)) // Generate the output var result []string @@ -104,17 +131,26 @@ func (c *MembersCommand) Run(args []string) int { } // so we can sort members by name -type ByMemberName []*consulapi.AgentMember - -func (m ByMemberName) Len() int { return len(m) } -func (m ByMemberName) Swap(i, j int) { m[i], m[j] = m[j], m[i] } -func (m ByMemberName) Less(i, j int) bool { return m[i].Name < m[j].Name } +type ByMemberNameAndSegment []*consulapi.AgentMember + +func (m ByMemberNameAndSegment) Len() int { return len(m) } +func (m ByMemberNameAndSegment) Swap(i, j int) { m[i], m[j] = m[j], m[i] } +func (m ByMemberNameAndSegment) Less(i, j int) bool { + switch { + case m[i].Tags["segment"] < m[j].Tags["segment"]: + return true + case m[i].Tags["segment"] > m[j].Tags["segment"]: + return false + default: + return m[i].Name < m[j].Name + } +} // standardOutput is used to dump the most useful information about nodes // in a more human-friendly format func (c *MembersCommand) standardOutput(members []*consulapi.AgentMember) []string { result := make([]string, 0, len(members)) - header := "Node|Address|Status|Type|Build|Protocol|DC" + header := "Node|Address|Status|Type|Build|Protocol|DC|Segment" result = append(result, header) for _, member := range members { addr := net.TCPAddr{IP: net.ParseIP(member.Addr), Port: int(member.Port)} @@ -126,19 +162,20 @@ func (c *MembersCommand) standardOutput(members []*consulapi.AgentMember) []stri build = build[:idx] } dc := member.Tags["dc"] + segment := member.Tags["segment"] statusString := serf.MemberStatus(member.Status).String() switch member.Tags["role"] { case "node": - line := fmt.Sprintf("%s|%s|%s|client|%s|%s|%s", - member.Name, addr.String(), statusString, build, protocol, dc) + line := fmt.Sprintf("%s|%s|%s|client|%s|%s|%s|%s", + member.Name, addr.String(), statusString, build, protocol, dc, segment) result = append(result, line) case "consul": - line := fmt.Sprintf("%s|%s|%s|server|%s|%s|%s", - member.Name, addr.String(), statusString, build, protocol, dc) + line := fmt.Sprintf("%s|%s|%s|server|%s|%s|%s|%s", + member.Name, addr.String(), statusString, build, protocol, dc, segment) result = append(result, line) default: - line := fmt.Sprintf("%s|%s|%s|unknown|||", + line := fmt.Sprintf("%s|%s|%s|unknown||||", member.Name, addr.String(), statusString) result = append(result, line) } diff --git a/command/rtt.go b/command/rtt.go index 591677c67ece..7fdf337c644f 100644 --- a/command/rtt.go +++ b/command/rtt.go @@ -4,6 +4,7 @@ import ( "fmt" "strings" + "github.com/hashicorp/consul/lib" "github.com/hashicorp/serf/coordinate" ) @@ -145,18 +146,21 @@ func (c *RTTCommand) Run(args []string) int { return 1 } - // See if the requested nodes are in there. + // Index all the coordinates by segment. + cs1, cs2 := make(lib.CoordinateSet), make(lib.CoordinateSet) for _, entry := range entries { if entry.Node == nodes[0] { - coord1 = entry.Coord + cs1[entry.Segment] = entry.Coord } if entry.Node == nodes[1] { - coord2 = entry.Coord + cs2[entry.Segment] = entry.Coord } + } - if coord1 != nil && coord2 != nil { - goto SHOW_RTT - } + // See if there's a compatible set of coordinates. + coord1, coord2 = cs1.Intersect(cs2) + if coord1 != nil && coord2 != nil { + goto SHOW_RTT } } diff --git a/command/segment_stub.go b/command/segment_stub.go new file mode 100644 index 000000000000..1d4a2efb8c23 --- /dev/null +++ b/command/segment_stub.go @@ -0,0 +1,13 @@ +// +build !ent + +package command + +import ( + consulapi "github.com/hashicorp/consul/api" +) + +// getSegmentMembers returns an empty list since network segments are not +// supported in OSS Consul. +func getSegmentMembers(client *consulapi.Client) ([]*consulapi.AgentMember, error) { + return nil, nil +} diff --git a/lib/rtt.go b/lib/rtt.go index e53a52707ad4..a417f533c915 100644 --- a/lib/rtt.go +++ b/lib/rtt.go @@ -18,6 +18,39 @@ func ComputeDistance(a *coordinate.Coordinate, b *coordinate.Coordinate) float64 return a.DistanceTo(b).Seconds() } +// CoordinateSet holds all the coordinates for a given node, indexed by network +// segment name. +type CoordinateSet map[string]*coordinate.Coordinate + +// Intersect tries to return a pair of coordinates which are compatible with the +// current set and a given set. We employ some special knowledge about network +// segments to avoid doing a full intersection, since this is in several hot +// paths. This might return nil for either coordinate in the output pair if an +// intersection cannot be found. The ComputeDistance function above is designed +// to deal with that. +func (cs CoordinateSet) Intersect(other CoordinateSet) (*coordinate.Coordinate, *coordinate.Coordinate) { + // Use the empty segment by default. + segment := "" + + // If we have a single segment, then let our segment take priority since + // we are possibly a client. Any node with more than one segment can only + // be a server, which means it should be in all segments. + if len(cs) == 1 { + for s, _ := range cs { + segment = s + } + } + + // Likewise for the other set. + if len(other) == 1 { + for s, _ := range other { + segment = s + } + } + + return cs[segment], other[segment] +} + // GenerateCoordinate creates a new coordinate with the given distance from the // origin. This should only be used for tests. func GenerateCoordinate(rtt time.Duration) *coordinate.Coordinate { diff --git a/lib/rtt_test.go b/lib/rtt_test.go index 2d6fe16f75dc..22004494dd2c 100644 --- a/lib/rtt_test.go +++ b/lib/rtt_test.go @@ -6,49 +6,148 @@ import ( "time" "github.com/hashicorp/serf/coordinate" + "github.com/pascaldekloe/goe/verify" ) -func TestRTT(t *testing.T) { - cases := []struct { +func TestRTT_ComputeDistance(t *testing.T) { + tests := []struct { + desc string a *coordinate.Coordinate b *coordinate.Coordinate dist float64 }{ { + "10 ms", GenerateCoordinate(0), GenerateCoordinate(10 * time.Millisecond), 0.010, }, { + "0 ms", GenerateCoordinate(10 * time.Millisecond), GenerateCoordinate(10 * time.Millisecond), 0.0, }, { + "2 ms", GenerateCoordinate(8 * time.Millisecond), GenerateCoordinate(10 * time.Millisecond), 0.002, }, { + "2 ms reversed", GenerateCoordinate(10 * time.Millisecond), GenerateCoordinate(8 * time.Millisecond), 0.002, }, { + "a nil", nil, GenerateCoordinate(8 * time.Millisecond), math.Inf(1.0), }, { + "b nil", GenerateCoordinate(8 * time.Millisecond), nil, math.Inf(1.0), }, + { + "both nil", + nil, + nil, + math.Inf(1.0), + }, + } + for _, tt := range tests { + t.Run(tt.desc, func(t *testing.T) { + dist := ComputeDistance(tt.a, tt.b) + verify.Values(t, "", dist, tt.dist) + }) + } +} + +func TestRTT_Intersect(t *testing.T) { + // The numbers here don't matter, we just want a unique coordinate for + // each one. + server_1 := CoordinateSet{ + "": GenerateCoordinate(1 * time.Millisecond), + "alpha": GenerateCoordinate(2 * time.Millisecond), + "beta": GenerateCoordinate(3 * time.Millisecond), + } + server_2 := CoordinateSet{ + "": GenerateCoordinate(4 * time.Millisecond), + "alpha": GenerateCoordinate(5 * time.Millisecond), + "beta": GenerateCoordinate(6 * time.Millisecond), + } + client_alpha := CoordinateSet{ + "alpha": GenerateCoordinate(7 * time.Millisecond), + } + client_beta_1 := CoordinateSet{ + "beta": GenerateCoordinate(8 * time.Millisecond), + } + client_beta_2 := CoordinateSet{ + "beta": GenerateCoordinate(9 * time.Millisecond), + } + + tests := []struct { + desc string + a CoordinateSet + b CoordinateSet + c1 *coordinate.Coordinate + c2 *coordinate.Coordinate + }{ + { + "nil maps", + nil, nil, + nil, nil, + }, + { + "two servers", + server_1, server_2, + server_1[""], server_2[""], + }, + { + "two clients", + client_beta_1, client_beta_2, + client_beta_1["beta"], client_beta_2["beta"], + }, + { + "server_1 and client alpha", + server_1, client_alpha, + server_1["alpha"], client_alpha["alpha"], + }, + { + "server_1 and client beta 1", + server_1, client_beta_1, + server_1["beta"], client_beta_1["beta"], + }, + { + "server_1 and client alpha reversed", + client_alpha, server_1, + client_alpha["alpha"], server_1["alpha"], + }, + { + "server_1 and client beta 1 reversed", + client_beta_1, server_1, + client_beta_1["beta"], server_1["beta"], + }, + { + "nothing in common", + client_alpha, client_beta_1, + nil, client_beta_1["beta"], + }, + { + "nothing in common reversed", + client_beta_1, client_alpha, + nil, client_alpha["alpha"], + }, } - for i, c := range cases { - dist := ComputeDistance(c.a, c.b) - if c.dist != dist { - t.Fatalf("bad (%d): %9.6f != %9.6f", i, c.dist, dist) - } + for _, tt := range tests { + t.Run(tt.desc, func(t *testing.T) { + r1, r2 := tt.a.Intersect(tt.b) + verify.Values(t, "", r1, tt.c1) + verify.Values(t, "", r2, tt.c2) + }) } } diff --git a/testutil/server.go b/testutil/server.go index 969d06a58481..9852c055e9e0 100644 --- a/testutil/server.go +++ b/testutil/server.go @@ -58,6 +58,14 @@ type TestAddressConfig struct { HTTP string `json:"http,omitempty"` } +// TestNetworkSegment contains the configuration for a network segment. +type TestNetworkSegment struct { + Name string `json:"name"` + Bind string `json:"bind"` + Port int `json:"port"` + Advertise string `json:"advertise"` +} + // TestServerConfig is the main server configuration struct. type TestServerConfig struct { NodeName string `json:"node_name"` @@ -68,6 +76,7 @@ type TestServerConfig struct { Server bool `json:"server,omitempty"` DataDir string `json:"data_dir,omitempty"` Datacenter string `json:"datacenter,omitempty"` + Segments []TestNetworkSegment `json:"segments"` DisableCheckpoint bool `json:"disable_update_check"` LogLevel string `json:"log_level,omitempty"` Bind string `json:"bind_addr,omitempty"` diff --git a/website/source/api/coordinate.html.md b/website/source/api/coordinate.html.md index 14800b5c30ca..f92b8601d756 100644 --- a/website/source/api/coordinate.html.md +++ b/website/source/api/coordinate.html.md @@ -108,6 +108,7 @@ $ curl \ [ { "Node": "agent-one", + "Segment": "", "Coord": { "Adjustment": 0, "Error": 1.5, @@ -117,3 +118,7 @@ $ curl \ } ] ``` + +In **Consul Enterprise**, this may include multiple coordinates for the same node, +each marked with a different `Segment`. Coordinates are only compatible within the same +segment.