Skip to content

Commit

Permalink
Adds open source side of network segments (feature is Enterprise-only).
Browse files Browse the repository at this point in the history
  • Loading branch information
slackpad authored and kyhavlov committed Aug 30, 2017
1 parent 9ef2156 commit b1a15e0
Show file tree
Hide file tree
Showing 44 changed files with 1,089 additions and 452 deletions.
2 changes: 1 addition & 1 deletion GNUmakefile
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ cov:

test: dev-build vet
go test -tags '$(GOTAGS)' -i ./...
go test $(GOTEST_FLAGS) -tags '$(GOTAGS)' -timeout 7m -v ./... 2>&1 >test$(GOTEST_FLAGS).log ; echo $$? > exit-code
go test $(GOTEST_FLAGS) -tags '$(GOTAGS)' -timeout 7m -v ./... 2>&1 >test.log ; echo $$? > exit-code
@echo "Exit code: `cat exit-code`" >> test$(GOTEST_FLAGS).log
@echo "----"
@grep -A5 'DATA RACE' test.log || true
Expand Down
73 changes: 54 additions & 19 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/hashicorp/consul/watch"
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/coordinate"
"github.com/hashicorp/serf/serf"
"github.com/shirou/gopsutil/host"
)
Expand All @@ -56,9 +55,10 @@ const (
// consul.Client and consul.Server.
type delegate interface {
Encrypted() bool
GetLANCoordinate() (*coordinate.Coordinate, error)
GetLANCoordinate() (lib.CoordinateSet, error)
Leave() error
LANMembers() []serf.Member
LANSegmentMembers(name string) ([]serf.Member, error)
LocalMember() serf.Member
JoinLAN(addrs []string) (n int, err error)
RemoveFailedNode(node string) error
Expand Down Expand Up @@ -647,6 +647,32 @@ func (a *Agent) consulConfig() (*consul.Config, error) {
if a.config.AdvertiseAddrs.RPC != nil {
base.RPCAdvertise = a.config.AdvertiseAddrs.RPC
}
base.Segment = a.config.Segment
for _, segment := range a.config.Segments {
config := consul.DefaultConfig().SerfLANConfig

config.MemberlistConfig.AdvertiseAddr = segment.Advertise
config.MemberlistConfig.AdvertisePort = segment.Port
config.MemberlistConfig.BindAddr = segment.Bind
config.MemberlistConfig.BindPort = segment.Port
if a.config.ReconnectTimeoutLan != 0 {
config.ReconnectTimeout = a.config.ReconnectTimeoutLan
}
if a.config.EncryptVerifyIncoming != nil {
config.MemberlistConfig.GossipVerifyIncoming = *a.config.EncryptVerifyIncoming
}
if a.config.EncryptVerifyOutgoing != nil {
config.MemberlistConfig.GossipVerifyOutgoing = *a.config.EncryptVerifyOutgoing
}

base.Segments = append(base.Segments, consul.NetworkSegment{
Name: segment.Name,
Bind: segment.Bind,
Port: segment.Port,
Advertise: segment.Advertise,
SerfConfig: config,
})
}
if a.config.Bootstrap {
base.Bootstrap = true
}
Expand Down Expand Up @@ -1154,15 +1180,16 @@ func (a *Agent) ResumeSync() {
a.state.Resume()
}

// GetLANCoordinate returns the coordinate of this node in the local pool (assumes coordinates
// are enabled, so check that before calling).
func (a *Agent) GetLANCoordinate() (*coordinate.Coordinate, error) {
// GetLANCoordinate returns the coordinates of this node in the local pools
// (assumes coordinates are enabled, so check that before calling).
func (a *Agent) GetLANCoordinate() (lib.CoordinateSet, error) {
return a.delegate.GetLANCoordinate()
}

// sendCoordinate is a long-running loop that periodically sends our coordinate
// to the server. Closing the agent's shutdownChannel will cause this to exit.
func (a *Agent) sendCoordinate() {
OUTER:
for {
rate := a.config.SyncCoordinateRateTarget
min := a.config.SyncCoordinateIntervalMin
Expand All @@ -1182,26 +1209,29 @@ func (a *Agent) sendCoordinate() {
continue
}

c, err := a.GetLANCoordinate()
cs, err := a.GetLANCoordinate()
if err != nil {
a.logger.Printf("[ERR] agent: Failed to get coordinate: %s", err)
continue
}

req := structs.CoordinateUpdateRequest{
Datacenter: a.config.Datacenter,
Node: a.config.NodeName,
Coord: c,
WriteRequest: structs.WriteRequest{Token: a.tokens.AgentToken()},
}
var reply struct{}
if err := a.RPC("Coordinate.Update", &req, &reply); err != nil {
if acl.IsErrPermissionDenied(err) {
a.logger.Printf("[WARN] agent: Coordinate update blocked by ACLs")
} else {
a.logger.Printf("[ERR] agent: Coordinate update error: %v", err)
for segment, coord := range cs {
req := structs.CoordinateUpdateRequest{
Datacenter: a.config.Datacenter,
Node: a.config.NodeName,
Segment: segment,
Coord: coord,
WriteRequest: structs.WriteRequest{Token: a.tokens.AgentToken()},
}
var reply struct{}
if err := a.RPC("Coordinate.Update", &req, &reply); err != nil {
if acl.IsErrPermissionDenied(err) {
a.logger.Printf("[WARN] agent: Coordinate update blocked by ACLs")
} else {
a.logger.Printf("[ERR] agent: Coordinate update error: %v", err)
}
continue OUTER
}
continue
}
case <-a.shutdownCh:
return
Expand Down Expand Up @@ -2105,6 +2135,11 @@ func (a *Agent) loadMetadata(conf *Config) error {
a.state.metadata[key] = value
}

// The segment isn't reloadable so we only add it once.
if _, ok := a.state.metadata[structs.MetaSegmentKey]; !ok {
a.state.metadata[structs.MetaSegmentKey] = conf.Segment
}

a.state.changeMade()

return nil
Expand Down
22 changes: 18 additions & 4 deletions agent/agent_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/ipaddr"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/logger"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/logutils"
Expand All @@ -27,10 +28,10 @@ type Self struct {
}

func (s *HTTPServer) AgentSelf(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var c *coordinate.Coordinate
var cs lib.CoordinateSet
if !s.agent.config.DisableCoordinates {
var err error
if c, err = s.agent.GetLANCoordinate(); err != nil {
if cs, err = s.agent.GetLANCoordinate(); err != nil {
return nil, err
}
}
Expand All @@ -48,7 +49,7 @@ func (s *HTTPServer) AgentSelf(resp http.ResponseWriter, req *http.Request) (int

return Self{
Config: s.agent.config,
Coord: c,
Coord: cs[s.agent.config.Segment],
Member: s.agent.LocalMember(),
Stats: s.agent.Stats(),
Meta: s.agent.state.Metadata(),
Expand Down Expand Up @@ -155,11 +156,24 @@ func (s *HTTPServer) AgentMembers(resp http.ResponseWriter, req *http.Request) (
wan = true
}

segment := req.URL.Query().Get("segment")
if wan && segment != "" {
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprint(resp, "Cannot provide a segment with wan=true")
return nil, nil
}

var members []serf.Member
if wan {
members = s.agent.WANMembers()
} else {
} else if segment == "" {
members = s.agent.LANMembers()
} else {
var err error
members, err = s.agent.delegate.LANSegmentMembers(segment)
if err != nil {
return nil, err
}
}
if err := s.agent.filterMembers(token, &members); err != nil {
return nil, err
Expand Down
5 changes: 3 additions & 2 deletions agent/agent_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,13 +191,14 @@ func TestAgent_Self(t *testing.T) {
t.Fatalf("incorrect port: %v", obj)
}

c, err := a.GetLANCoordinate()
cs, err := a.GetLANCoordinate()
if err != nil {
t.Fatalf("err: %v", err)
}
if !reflect.DeepEqual(c, val.Coord) {
if c := cs[cfg.Segment]; !reflect.DeepEqual(c, val.Coord) {
t.Fatalf("coordinates are not equal: %v != %v", c, val.Coord)
}
delete(val.Meta, structs.MetaSegmentKey) // Added later, not in config.
if !reflect.DeepEqual(cfg.Meta, val.Meta) {
t.Fatalf("meta fields are not equal: %v != %v", cfg.Meta, val.Meta)
}
Expand Down
38 changes: 38 additions & 0 deletions agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,22 @@ type Autopilot struct {
UpgradeVersionTag string `mapstructure:"upgrade_version_tag"`
}

// (Enterprise-only) NetworkSegment is the configuration for a network segment, which is an
// isolated serf group on the LAN.
type NetworkSegment struct {
// Name is the name of the segment.
Name string `mapstructure:"name"`

// Bind is the bind address for this segment.
Bind string `mapstructure:"bind"`

// Port is the port for this segment.
Port int `mapstructure:"port"`

// Advertise is the advertise address of this segment.
Advertise string `mapstructure:"advertise"`
}

// Config is the configuration that can be set for an Agent.
// Some of this is configurable as CLI flags, but most must
// be set using a configuration file.
Expand Down Expand Up @@ -465,6 +481,12 @@ type Config struct {
// Address configurations
Addresses AddressConfig

// (Enterprise-only) NetworkSegment is the network segment for this client to join
Segment string `mapstructure:"segment"`

// Segments
Segments []NetworkSegment `mapstructure:"segments"`

// Tagged addresses. These are used to publish a set of addresses for
// for a node, which can be used by the remote agent. We currently
// populate only the "wan" tag based on the SerfWan advertise address,
Expand Down Expand Up @@ -1426,6 +1448,11 @@ func DecodeConfig(r io.Reader) (*Config, error) {
}
}

// Validate node meta fields
if err := structs.ValidateMetadata(result.Meta); err != nil {
return nil, fmt.Errorf("Failed to parse node metadata: %v", err)
}

return &result, nil
}

Expand Down Expand Up @@ -1861,6 +1888,12 @@ func MergeConfig(a, b *Config) *Config {
if b.Addresses.RPC != "" {
result.Addresses.RPC = b.Addresses.RPC
}
if b.Segment != "" {
result.Segment = b.Segment
}
if len(b.Segments) > 0 {
result.Segments = append(result.Segments, b.Segments...)
}
if b.EnableUI {
result.EnableUI = true
}
Expand Down Expand Up @@ -2204,6 +2237,11 @@ func (c *Config) ResolveTmplAddrs() (err error) {
parse(&c.ClientAddr, true, "Client address")
parse(&c.SerfLanBindAddr, false, "Serf LAN address")
parse(&c.SerfWanBindAddr, false, "Serf WAN address")
for i, segment := range c.Segments {
parse(&c.Segments[i].Bind, false, fmt.Sprintf("Segment %q bind address", segment.Name))
parse(&c.Segments[i].Advertise, false, fmt.Sprintf("Segment %q advertise address", segment.Name))

}

return
}
Expand Down
17 changes: 17 additions & 0 deletions agent/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,14 @@ func TestDecodeConfig(t *testing.T) {
in: `{"retry_max_wan":123}`,
c: &Config{RetryMaxAttemptsWan: 123},
},
{
in: `{"segment":"thing"}`,
c: &Config{Segment: "thing"},
},
{
in: `{"segments":[{"name": "alpha", "bind": "127.0.0.1", "port": 1234, "advertise": "1.1.1.1"}]}`,
c: &Config{Segments: []NetworkSegment{{Name: "alpha", Bind: "127.0.0.1", Port: 1234, Advertise: "1.1.1.1"}}},
},
{
in: `{"serf_lan_bind":"1.2.3.4"}`,
c: &Config{SerfLanBindAddr: "1.2.3.4"},
Expand Down Expand Up @@ -1401,6 +1409,15 @@ func TestMergeConfig(t *testing.T) {
HTTP: "127.0.0.2",
HTTPS: "127.0.0.4",
},
Segment: "alpha",
Segments: []NetworkSegment{
{
Name: "alpha",
Bind: "127.0.0.1",
Port: 1234,
Advertise: "127.0.0.2",
},
},
Server: true,
LeaveOnTerm: Bool(true),
SkipLeaveOnInt: Bool(true),
Expand Down
12 changes: 6 additions & 6 deletions agent/consul/catalog_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -890,9 +890,9 @@ func TestCatalog_ListNodes_DistanceSort(t *testing.T) {

// Set all but one of the nodes to known coordinates.
updates := structs.Coordinates{
{"foo", lib.GenerateCoordinate(2 * time.Millisecond)},
{"bar", lib.GenerateCoordinate(5 * time.Millisecond)},
{"baz", lib.GenerateCoordinate(1 * time.Millisecond)},
{Node: "foo", Coord: lib.GenerateCoordinate(2 * time.Millisecond)},
{Node: "bar", Coord: lib.GenerateCoordinate(5 * time.Millisecond)},
{Node: "baz", Coord: lib.GenerateCoordinate(1 * time.Millisecond)},
}
if err := s1.fsm.State().CoordinateBatchUpdate(5, updates); err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -1495,9 +1495,9 @@ func TestCatalog_ListServiceNodes_DistanceSort(t *testing.T) {

// Set all but one of the nodes to known coordinates.
updates := structs.Coordinates{
{"foo", lib.GenerateCoordinate(2 * time.Millisecond)},
{"bar", lib.GenerateCoordinate(5 * time.Millisecond)},
{"baz", lib.GenerateCoordinate(1 * time.Millisecond)},
{Node: "foo", Coord: lib.GenerateCoordinate(2 * time.Millisecond)},
{Node: "bar", Coord: lib.GenerateCoordinate(5 * time.Millisecond)},
{Node: "baz", Coord: lib.GenerateCoordinate(1 * time.Millisecond)},
}
if err := s1.fsm.State().CoordinateBatchUpdate(9, updates); err != nil {
t.Fatalf("err: %v", err)
Expand Down
Loading

0 comments on commit b1a15e0

Please sign in to comment.