diff --git a/command/agent/consul/client.go b/command/agent/consul/client.go index 6b99147bb3a..578c1bfc9d5 100644 --- a/command/agent/consul/client.go +++ b/command/agent/consul/client.go @@ -109,7 +109,17 @@ type ACLsAPI interface { // agentServiceUpdateRequired checks if any critical fields in Nomad's version // of a service definition are different from the existing service definition as // known by Consul. -func agentServiceUpdateRequired(reason syncReason, wanted *api.AgentServiceRegistration, existing *api.AgentService) bool { +// +// reason - The syncReason that triggered this synchronization with the consul +// agent API. +// wanted - Nomad's view of what the service definition is intended to be. +// Not nil. +// existing - Consul's view (agent, not catalog) of the actual service definition. +// Not nil. +// sidecar - Consul's view (agent, not catalog) of the service definition of the sidecar +// associated with existing that may or may not exist. +// May be nil. +func agentServiceUpdateRequired(reason syncReason, wanted *api.AgentServiceRegistration, existing *api.AgentService, sidecar *api.AgentService) bool { switch reason { case syncPeriodic: // In a periodic sync with Consul, we need to respect the value of @@ -123,31 +133,39 @@ func agentServiceUpdateRequired(reason syncReason, wanted *api.AgentServiceRegis // // We do so by over-writing the nomad service registration by the value // of the tags that Consul contains, if enable_tag_override = true. - maybeTweakTags(wanted, existing) - return different(wanted, existing) + maybeTweakTags(wanted, existing, sidecar) + return different(wanted, existing, sidecar) default: // A non-periodic sync with Consul indicates an operation has been set // on the queue. This happens when service has been added / removed / modified // and implies the Consul agent should be sync'd with nomad, because // nomad is the ultimate source of truth for the service definition. - return different(wanted, existing) + return different(wanted, existing, sidecar) } } // maybeTweakTags will override wanted.Tags with a copy of existing.Tags only if // EnableTagOverride is true. Otherwise the wanted service registration is left // unchanged. -func maybeTweakTags(wanted *api.AgentServiceRegistration, existing *api.AgentService) { +func maybeTweakTags(wanted *api.AgentServiceRegistration, existing *api.AgentService, sidecar *api.AgentService) { if wanted.EnableTagOverride { wanted.Tags = helper.CopySliceString(existing.Tags) + // If the service registration also defines a sidecar service, use the ETO + // setting for the parent service to also apply to the sidecar. + if wanted.Connect != nil && wanted.Connect.SidecarService != nil { + if sidecar != nil { + wanted.Connect.SidecarService.Tags = helper.CopySliceString(sidecar.Tags) + } + } } } // different compares the wanted state of the service registration with the actual // (cached) state of the service registration reported by Consul. If any of the // critical fields are not deeply equal, they considered different. -func different(wanted *api.AgentServiceRegistration, existing *api.AgentService) bool { +func different(wanted *api.AgentServiceRegistration, existing *api.AgentService, sidecar *api.AgentService) bool { + return !(wanted.Kind == existing.Kind && wanted.ID == existing.ID && wanted.Port == existing.Port && @@ -155,7 +173,25 @@ func different(wanted *api.AgentServiceRegistration, existing *api.AgentService) wanted.Name == existing.Service && wanted.EnableTagOverride == existing.EnableTagOverride && reflect.DeepEqual(wanted.Meta, existing.Meta) && - reflect.DeepEqual(wanted.Tags, existing.Tags)) + reflect.DeepEqual(wanted.Tags, existing.Tags) && + !connectSidecarDifferent(wanted, sidecar)) +} + +func connectSidecarDifferent(wanted *api.AgentServiceRegistration, sidecar *api.AgentService) bool { + if wanted.Connect != nil && wanted.Connect.SidecarService != nil { + if sidecar == nil { + // consul lost our sidecar (?) + return true + } + if !reflect.DeepEqual(wanted.Connect.SidecarService.Tags, sidecar.Tags) { + // tags on the nomad definition have been modified + return true + } + } + + // There is no connect sidecar the nomad side; let consul anti-entropy worry + // about any registration on the consul side. + return false } // operations are submitted to the main loop via commit() for synchronizing @@ -365,7 +401,8 @@ func (c *ServiceClient) hasSeen() bool { // syncReason indicates why a sync operation with consul is about to happen. // // The trigger for a sync may have implications on the behavior of the sync itself. -// In particular, if a service is defined with enable_tag_override=true +// In particular if a service is defined with enable_tag_override=true, the sync +// should ignore changes to the service's Tags field. type syncReason byte const ( @@ -579,25 +616,20 @@ func (c *ServiceClient) sync(reason syncReason) error { } // Add Nomad services missing from Consul, or where the service has been updated. - for id, local := range c.services { - existingSvc, ok := consulServices[id] - - if ok { - // There is an existing registration of this service in Consul, so here - // we validate to see if the service has been invalidated to see if it - // should be updated. - if !agentServiceUpdateRequired(reason, local, existingSvc) { - // No Need to update services that have not changed - continue + for id, serviceInNomad := range c.services { + + serviceInConsul, exists := consulServices[id] + sidecarInConsul := getNomadSidecar(id, consulServices) + + if !exists || agentServiceUpdateRequired(reason, serviceInNomad, serviceInConsul, sidecarInConsul) { + if err = c.client.ServiceRegister(serviceInNomad); err != nil { + metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1) + return err } + sreg++ + metrics.IncrCounter([]string{"client", "consul", "service_registrations"}, 1) } - if err = c.client.ServiceRegister(local); err != nil { - metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1) - return err - } - sreg++ - metrics.IncrCounter([]string{"client", "consul", "service_registrations"}, 1) } // Remove Nomad checks in Consul but unknown locally @@ -1318,6 +1350,10 @@ func isOldNomadService(id string) bool { return strings.HasPrefix(id, prefix) } +const ( + sidecarSuffix = "-sidecar-proxy" +) + // isNomadSidecar returns true if the ID matches a sidecar proxy for a Nomad // managed service. // @@ -1330,16 +1366,29 @@ func isOldNomadService(id string) bool { // _nomad-task-5229c7f8-376b-3ccc-edd9-981e238f7033-cache-redis-cache-db-sidecar-proxy // func isNomadSidecar(id string, services map[string]*api.AgentServiceRegistration) bool { - const suffix = "-sidecar-proxy" - if !strings.HasSuffix(id, suffix) { + if !strings.HasSuffix(id, sidecarSuffix) { return false } // Make sure the Nomad managed service for this proxy still exists. - _, ok := services[id[:len(id)-len(suffix)]] + _, ok := services[id[:len(id)-len(sidecarSuffix)]] return ok } +// getNomadSidecar returns the service registration of the sidecar for the managed +// service with the specified id. +// +// If the managed service of the specified id does not exist, or the service does +// not have a sidecar proxy, nil is returned. +func getNomadSidecar(id string, services map[string]*api.AgentService) *api.AgentService { + if _, exists := services[id]; !exists { + return nil + } + + sidecarID := id + sidecarSuffix + return services[sidecarID] +} + // getAddress returns the IP and port to use for a service or check. If no port // label is specified (an empty value), zero values are returned because no // address could be resolved. diff --git a/command/agent/consul/client_test.go b/command/agent/consul/client_test.go index fc96eb00c8d..68bde388c76 100644 --- a/command/agent/consul/client_test.go +++ b/command/agent/consul/client_test.go @@ -1,30 +1,40 @@ package consul import ( + "reflect" "testing" "github.com/hashicorp/consul/api" "github.com/stretchr/testify/require" ) -func TestSyncLogic_agentServiceUpdateRequired(t *testing.T) { - t.Parallel() - - wanted := api.AgentServiceRegistration{ - Kind: "service", - ID: "_id", - Name: "name", +var ( + // the service as known by nomad + wanted = api.AgentServiceRegistration{ + Kind: "", + ID: "aca4c175-1778-5ef4-0220-2ab434147d35", + Name: "myservice", Tags: []string{"a", "b"}, Port: 9000, Address: "1.1.1.1", EnableTagOverride: true, Meta: map[string]string{"foo": "1"}, + Connect: &api.AgentServiceConnect{ + Native: false, + SidecarService: &api.AgentServiceRegistration{ + Kind: "connect-proxy", + ID: "_nomad-task-8e8413af-b5bb-aa67-2c24-c146c45f1ec9-group-mygroup-myservice-9001-sidecar-proxy", + Name: "name-sidecar-proxy", + Tags: []string{"x", "y", "z"}, + }, + }, } - existing := &api.AgentService{ - Kind: "service", - ID: "_id", - Service: "name", + // the service (and + connect proxy) as known by consul + existing = &api.AgentService{ + Kind: "", + ID: "aca4c175-1778-5ef4-0220-2ab434147d35", + Service: "myservice", Tags: []string{"a", "b"}, Port: 9000, Address: "1.1.1.1", @@ -32,6 +42,17 @@ func TestSyncLogic_agentServiceUpdateRequired(t *testing.T) { Meta: map[string]string{"foo": "1"}, } + sidecar = &api.AgentService{ + Kind: "connect-proxy", + ID: "_nomad-task-8e8413af-b5bb-aa67-2c24-c146c45f1ec9-group-mygroup-myservice-9001-sidecar-proxy", + Service: "myservice-sidecar-proxy", + Tags: []string{"x", "y", "z"}, + } +) + +func TestSyncLogic_agentServiceUpdateRequired(t *testing.T) { + t.Parallel() + // By default wanted and existing match. Each test should modify wanted in // 1 way, and / or configure the type of sync operation that is being // considered, then evaluate the result of the update-required algebra. @@ -44,7 +65,7 @@ func TestSyncLogic_agentServiceUpdateRequired(t *testing.T) { exp bool, reason syncReason, tweak tweaker) { - result := agentServiceUpdateRequired(reason, tweak(wanted), existing) + result := agentServiceUpdateRequired(reason, tweak(wanted), existing, sidecar) require.Equal(t, exp, result) } @@ -103,7 +124,7 @@ func TestSyncLogic_agentServiceUpdateRequired(t *testing.T) { }) }) - t.Run("different tags syncNewOps eto->true", func(t *testing.T) { + t.Run("different tags syncNewOps eto=true", func(t *testing.T) { // sync is required even though eto=true, because NewOps indicates the // service definition in nomad has changed (e.g. job run a modified job) try(t, true, syncNewOps, func(w asr) *asr { @@ -112,7 +133,7 @@ func TestSyncLogic_agentServiceUpdateRequired(t *testing.T) { }) }) - t.Run("different tags syncPeriodic eto->true", func(t *testing.T) { + t.Run("different tags syncPeriodic eto=true", func(t *testing.T) { // sync is not required since eto=true and this is a periodic sync // with consul - in which case we keep Consul's definition of the tags try(t, false, syncPeriodic, func(w asr) *asr { @@ -121,11 +142,29 @@ func TestSyncLogic_agentServiceUpdateRequired(t *testing.T) { }) }) + t.Run("different sidecar tags on syncPeriodic eto=true", func(t *testing.T) { + try(t, false, syncPeriodic, func(w asr) *asr { + // like the parent service, the sidecar's tags do not get enforced + // if ETO is true and this is a periodic sync + w.Connect.SidecarService.Tags = []string{"other", "tags"} + return &w + }) + }) + + t.Run("different sidecar tags on syncNewOps eto=true", func(t *testing.T) { + try(t, true, syncNewOps, func(w asr) *asr { + // like the parent service, the sidecar's tags always get enforced + // regardless of ETO if this is a sync due to applied operations + w.Connect.SidecarService.Tags = []string{"other", "tags"} + return &w + }) + }) + // for remaining tests, EnableTagOverride = false wanted.EnableTagOverride = false existing.EnableTagOverride = false - t.Run("different tags : syncPeriodic : eto->false", func(t *testing.T) { + t.Run("different tags syncPeriodic eto=false", func(t *testing.T) { // sync is required because eto=false and the tags do not match try(t, true, syncPeriodic, func(w asr) *asr { w.Tags = []string{"other", "tags"} @@ -133,28 +172,100 @@ func TestSyncLogic_agentServiceUpdateRequired(t *testing.T) { }) }) - t.Run("different tags : syncNewOps : eto->false", func(t *testing.T) { - // sync is required because it was triggered by NewOps and the tags - // do not match + t.Run("different tags syncNewOps eto=false", func(t *testing.T) { + // sync is required because eto=false and the tags do not match try(t, true, syncNewOps, func(w asr) *asr { w.Tags = []string{"other", "tags"} return &w }) }) + + t.Run("different sidecar tags on syncPeriodic eto=false", func(t *testing.T) { + // like the parent service, sync is required because eto=false and the + // sidecar's tags do not match + try(t, true, syncPeriodic, func(w asr) *asr { + w.Connect.SidecarService.Tags = []string{"other", "tags"} + return &w + }) + }) + + t.Run("different sidecar tags syncNewOps eto=false", func(t *testing.T) { + // like the parent service, sync is required because eto=false and the + // sidecar's tags do not match + try(t, true, syncNewOps, func(w asr) *asr { + w.Connect.SidecarService.Tags = []string{"other", "tags"} + return &w + }) + }) } func TestSyncLogic_maybeTweakTags(t *testing.T) { t.Parallel() - r := require.New(t) - - wanted := &api.AgentServiceRegistration{Tags: []string{"original"}} - existing := &api.AgentService{Tags: []string{"other"}} - maybeTweakTags(wanted, existing) - r.Equal([]string{"original"}, wanted.Tags) - - wantedETO := &api.AgentServiceRegistration{Tags: []string{"original"}, EnableTagOverride: true} - existingETO := &api.AgentService{Tags: []string{"other"}, EnableTagOverride: true} - maybeTweakTags(wantedETO, existingETO) - r.Equal(existingETO.Tags, wantedETO.Tags) - r.False(&(existingETO.Tags) == &(wantedETO.Tags)) + + differentPointers := func(a, b []string) bool { + return &(a) != &(b) + } + + try := func(inConsul, inConsulSC []string, eto bool) { + wanted := &api.AgentServiceRegistration{ + Tags: []string{"original"}, + Connect: &api.AgentServiceConnect{ + SidecarService: &api.AgentServiceRegistration{ + Tags: []string{"original-sidecar"}, + }, + }, + EnableTagOverride: eto, + } + + existing := &api.AgentService{Tags: inConsul} + sidecar := &api.AgentService{Tags: inConsulSC} + + maybeTweakTags(wanted, existing, sidecar) + + switch eto { + case false: + require.Equal(t, []string{"original"}, wanted.Tags) + require.Equal(t, []string{"original-sidecar"}, wanted.Connect.SidecarService.Tags) + require.True(t, differentPointers(wanted.Tags, wanted.Connect.SidecarService.Tags)) + case true: + require.Equal(t, inConsul, wanted.Tags) + require.Equal(t, inConsulSC, wanted.Connect.SidecarService.Tags) + require.True(t, differentPointers(wanted.Tags, wanted.Connect.SidecarService.Tags)) + } + } + + try([]string{"original"}, []string{"original-sidecar"}, true) + try([]string{"original"}, []string{"original-sidecar"}, false) + try([]string{"modified"}, []string{"original-sidecar"}, true) + try([]string{"modified"}, []string{"original-sidecar"}, false) + try([]string{"original"}, []string{"modified-sidecar"}, true) + try([]string{"original"}, []string{"modified-sidecar"}, false) + try([]string{"modified"}, []string{"modified-sidecar"}, true) + try([]string{"modified"}, []string{"modified-sidecar"}, false) +} + +func TestSyncLogic_maybeTweakTags_emptySC(t *testing.T) { + t.Parallel() + + // Check the edge cases where the connect service is deleted on the nomad + // side (i.e. are we checking multiple nil pointers). + + try := func(asr *api.AgentServiceRegistration) { + maybeTweakTags(asr, existing, sidecar) + require.False(t, reflect.DeepEqual([]string{"original"}, asr.Tags)) + } + + try(&api.AgentServiceRegistration{ + Tags: []string{"original"}, + EnableTagOverride: true, + Connect: nil, // ooh danger! + }) + + try(&api.AgentServiceRegistration{ + Tags: []string{"original"}, + EnableTagOverride: true, + Connect: &api.AgentServiceConnect{ + SidecarService: nil, // ooh danger! + }, + }) } diff --git a/nomad/structs/services.go b/nomad/structs/services.go index becb143c47b..dbafdfd1164 100644 --- a/nomad/structs/services.go +++ b/nomad/structs/services.go @@ -3,11 +3,13 @@ package structs import ( "crypto/sha1" "fmt" + "hash" "io" "net/url" "reflect" "regexp" "sort" + "strconv" "strings" "time" @@ -462,29 +464,18 @@ func (s *Service) ValidateName(name string) error { // as they're hashed independently. func (s *Service) Hash(allocID, taskName string, canary bool) string { h := sha1.New() - io.WriteString(h, allocID) - io.WriteString(h, taskName) - io.WriteString(h, s.Name) - io.WriteString(h, s.PortLabel) - io.WriteString(h, s.AddressMode) - for _, tag := range s.Tags { - io.WriteString(h, tag) - } - for _, tag := range s.CanaryTags { - io.WriteString(h, tag) - } - if len(s.Meta) > 0 { - fmt.Fprintf(h, "%v", s.Meta) - } - if len(s.CanaryMeta) > 0 { - fmt.Fprintf(h, "%v", s.CanaryMeta) - } - fmt.Fprintf(h, "%t", s.EnableTagOverride) - - // Vary ID on whether or not CanaryTags will be used - if canary { - h.Write([]byte("Canary")) - } + hashString(h, allocID) + hashString(h, taskName) + hashString(h, s.Name) + hashString(h, s.PortLabel) + hashString(h, s.AddressMode) + hashTags(h, s.Tags) + hashTags(h, s.CanaryTags) + hashBool(h, canary, "Canary") + hashBool(h, s.EnableTagOverride, "ETO") + hashMeta(h, s.Meta) + hashMeta(h, s.CanaryMeta) + hashConnect(h, s.Connect) // Base32 is used for encoding the hash as sha1 hashes can always be // encoded without padding, only 4 bytes larger than base64, and saves @@ -493,6 +484,46 @@ func (s *Service) Hash(allocID, taskName string, canary bool) string { return b32.EncodeToString(h.Sum(nil)) } +func hashConnect(h hash.Hash, connect *ConsulConnect) { + if connect != nil && connect.SidecarService != nil { + hashString(h, connect.SidecarService.Port) + hashTags(h, connect.SidecarService.Tags) + if p := connect.SidecarService.Proxy; p != nil { + hashString(h, p.LocalServiceAddress) + hashString(h, strconv.Itoa(p.LocalServicePort)) + hashConfig(h, p.Config) + for _, upstream := range p.Upstreams { + hashString(h, upstream.DestinationName) + hashString(h, strconv.Itoa(upstream.LocalBindPort)) + } + } + } +} + +func hashString(h hash.Hash, s string) { + _, _ = io.WriteString(h, s) +} + +func hashBool(h hash.Hash, b bool, name string) { + if b { + hashString(h, name) + } +} + +func hashTags(h hash.Hash, tags []string) { + for _, tag := range tags { + hashString(h, tag) + } +} + +func hashMeta(h hash.Hash, m map[string]string) { + _, _ = fmt.Fprintf(h, "%v", m) +} + +func hashConfig(h hash.Hash, c map[string]interface{}) { + _, _ = fmt.Fprintf(h, "%v", c) +} + // Equals returns true if the structs are recursively equal. func (s *Service) Equals(o *Service) bool { if s == nil || o == nil { diff --git a/nomad/structs/services_test.go b/nomad/structs/services_test.go index 9e92a7d5d4f..f92773e3011 100644 --- a/nomad/structs/services_test.go +++ b/nomad/structs/services_test.go @@ -8,6 +8,113 @@ import ( "github.com/stretchr/testify/require" ) +func TestService_Hash(t *testing.T) { + t.Parallel() + + original := &Service{ + Name: "myService", + PortLabel: "portLabel", + // AddressMode: "bridge", // not hashed + Tags: []string{"original", "tags"}, + CanaryTags: []string{"canary", "tags"}, + // Checks: nil, // not hashed (managed independently) + Connect: &ConsulConnect{ + // Native: false, // not hashed + SidecarService: &ConsulSidecarService{ + Tags: []string{"original", "sidecar", "tags"}, + Port: "9000", + Proxy: &ConsulProxy{ + LocalServiceAddress: "127.0.0.1", + LocalServicePort: 24000, + Config: map[string]interface{}{"foo": "bar"}, + Upstreams: []ConsulUpstream{{ + DestinationName: "upstream1", + LocalBindPort: 29000, + }}, + }, + }, + // SidecarTask: nil // not hashed + }} + + type svc = Service + type tweaker = func(service *svc) + + hash := func(s *svc, canary bool) string { + return s.Hash("AllocID", "TaskName", canary) + } + + t.Run("matching and is canary", func(t *testing.T) { + require.Equal(t, hash(original, true), hash(original, true)) + }) + + t.Run("matching and is not canary", func(t *testing.T) { + require.Equal(t, hash(original, false), hash(original, false)) + }) + + t.Run("matching mod canary", func(t *testing.T) { + require.NotEqual(t, hash(original, true), hash(original, false)) + }) + + try := func(t *testing.T, tweak tweaker) { + originalHash := hash(original, true) + modifiable := original.Copy() + tweak(modifiable) + tweakedHash := hash(modifiable, true) + require.NotEqual(t, originalHash, tweakedHash) + } + + // these tests use tweaker to modify 1 field and make the false assertion + // on comparing the resulting hash output + + t.Run("mod name", func(t *testing.T) { + try(t, func(s *svc) { s.Name = "newName" }) + }) + + t.Run("mod port label", func(t *testing.T) { + try(t, func(s *svc) { s.PortLabel = "newPortLabel" }) + }) + + t.Run("mod tags", func(t *testing.T) { + try(t, func(s *svc) { s.Tags = []string{"new", "tags"} }) + }) + + t.Run("mod canary tags", func(t *testing.T) { + try(t, func(s *svc) { s.CanaryTags = []string{"new", "tags"} }) + }) + + t.Run("mod enable tag override", func(t *testing.T) { + try(t, func(s *svc) { s.EnableTagOverride = true }) + }) + + t.Run("mod connect sidecar tags", func(t *testing.T) { + try(t, func(s *svc) { s.Connect.SidecarService.Tags = []string{"new", "tags"} }) + }) + + t.Run("mod connect sidecar port", func(t *testing.T) { + try(t, func(s *svc) { s.Connect.SidecarService.Port = "9090" }) + }) + + t.Run("mod connect sidecar proxy local service address", func(t *testing.T) { + try(t, func(s *svc) { s.Connect.SidecarService.Proxy.LocalServiceAddress = "1.1.1.1" }) + }) + + t.Run("mod connect sidecar proxy local service port", func(t *testing.T) { + try(t, func(s *svc) { s.Connect.SidecarService.Proxy.LocalServicePort = 9999 }) + }) + + t.Run("mod connect sidecar proxy config", func(t *testing.T) { + try(t, func(s *svc) { s.Connect.SidecarService.Proxy.Config = map[string]interface{}{"foo": "baz"} }) + }) + + t.Run("mod connect sidecar proxy upstream dest name", func(t *testing.T) { + try(t, func(s *svc) { s.Connect.SidecarService.Proxy.Upstreams[0].DestinationName = "dest2" }) + }) + + t.Run("mod connect sidecar proxy upstream dest local bind port", func(t *testing.T) { + try(t, func(s *svc) { s.Connect.SidecarService.Proxy.Upstreams[0].LocalBindPort = 29999 }) + }) +} + func TestConsulConnect_Validate(t *testing.T) { t.Parallel()