From 1225afbf0851ce126e101663f567f463411770f1 Mon Sep 17 00:00:00 2001 From: Seth Hoenig Date: Fri, 14 Feb 2020 13:44:34 -0600 Subject: [PATCH] consul/connect: in-place update sidecar service registrations on changes Fix a bug where consul service definitions would not be updated if changes were made to the service in the Nomad job. Currently this only fixes the bug for cases where the fix is a matter of updating consul agent's service registration. There is related bug where destructive changes are required (see #6877) which will be fixed in another PR. The enable_tag_override configuration setting for the parent service is applied to the sidecar service. Fixes #6459 --- command/agent/consul/client.go | 103 ++++++++++++----- command/agent/consul/client_test.go | 171 +++++++++++++++++++++++----- nomad/structs/services.go | 77 +++++++++---- nomad/structs/services_test.go | 107 +++++++++++++++++ 4 files changed, 378 insertions(+), 80 deletions(-) diff --git a/command/agent/consul/client.go b/command/agent/consul/client.go index 6b99147bb3a..578c1bfc9d5 100644 --- a/command/agent/consul/client.go +++ b/command/agent/consul/client.go @@ -109,7 +109,17 @@ type ACLsAPI interface { // agentServiceUpdateRequired checks if any critical fields in Nomad's version // of a service definition are different from the existing service definition as // known by Consul. -func agentServiceUpdateRequired(reason syncReason, wanted *api.AgentServiceRegistration, existing *api.AgentService) bool { +// +// reason - The syncReason that triggered this synchronization with the consul +// agent API. +// wanted - Nomad's view of what the service definition is intended to be. +// Not nil. +// existing - Consul's view (agent, not catalog) of the actual service definition. +// Not nil. +// sidecar - Consul's view (agent, not catalog) of the service definition of the sidecar +// associated with existing that may or may not exist. +// May be nil. +func agentServiceUpdateRequired(reason syncReason, wanted *api.AgentServiceRegistration, existing *api.AgentService, sidecar *api.AgentService) bool { switch reason { case syncPeriodic: // In a periodic sync with Consul, we need to respect the value of @@ -123,31 +133,39 @@ func agentServiceUpdateRequired(reason syncReason, wanted *api.AgentServiceRegis // // We do so by over-writing the nomad service registration by the value // of the tags that Consul contains, if enable_tag_override = true. - maybeTweakTags(wanted, existing) - return different(wanted, existing) + maybeTweakTags(wanted, existing, sidecar) + return different(wanted, existing, sidecar) default: // A non-periodic sync with Consul indicates an operation has been set // on the queue. This happens when service has been added / removed / modified // and implies the Consul agent should be sync'd with nomad, because // nomad is the ultimate source of truth for the service definition. - return different(wanted, existing) + return different(wanted, existing, sidecar) } } // maybeTweakTags will override wanted.Tags with a copy of existing.Tags only if // EnableTagOverride is true. Otherwise the wanted service registration is left // unchanged. -func maybeTweakTags(wanted *api.AgentServiceRegistration, existing *api.AgentService) { +func maybeTweakTags(wanted *api.AgentServiceRegistration, existing *api.AgentService, sidecar *api.AgentService) { if wanted.EnableTagOverride { wanted.Tags = helper.CopySliceString(existing.Tags) + // If the service registration also defines a sidecar service, use the ETO + // setting for the parent service to also apply to the sidecar. + if wanted.Connect != nil && wanted.Connect.SidecarService != nil { + if sidecar != nil { + wanted.Connect.SidecarService.Tags = helper.CopySliceString(sidecar.Tags) + } + } } } // different compares the wanted state of the service registration with the actual // (cached) state of the service registration reported by Consul. If any of the // critical fields are not deeply equal, they considered different. -func different(wanted *api.AgentServiceRegistration, existing *api.AgentService) bool { +func different(wanted *api.AgentServiceRegistration, existing *api.AgentService, sidecar *api.AgentService) bool { + return !(wanted.Kind == existing.Kind && wanted.ID == existing.ID && wanted.Port == existing.Port && @@ -155,7 +173,25 @@ func different(wanted *api.AgentServiceRegistration, existing *api.AgentService) wanted.Name == existing.Service && wanted.EnableTagOverride == existing.EnableTagOverride && reflect.DeepEqual(wanted.Meta, existing.Meta) && - reflect.DeepEqual(wanted.Tags, existing.Tags)) + reflect.DeepEqual(wanted.Tags, existing.Tags) && + !connectSidecarDifferent(wanted, sidecar)) +} + +func connectSidecarDifferent(wanted *api.AgentServiceRegistration, sidecar *api.AgentService) bool { + if wanted.Connect != nil && wanted.Connect.SidecarService != nil { + if sidecar == nil { + // consul lost our sidecar (?) + return true + } + if !reflect.DeepEqual(wanted.Connect.SidecarService.Tags, sidecar.Tags) { + // tags on the nomad definition have been modified + return true + } + } + + // There is no connect sidecar the nomad side; let consul anti-entropy worry + // about any registration on the consul side. + return false } // operations are submitted to the main loop via commit() for synchronizing @@ -365,7 +401,8 @@ func (c *ServiceClient) hasSeen() bool { // syncReason indicates why a sync operation with consul is about to happen. // // The trigger for a sync may have implications on the behavior of the sync itself. -// In particular, if a service is defined with enable_tag_override=true +// In particular if a service is defined with enable_tag_override=true, the sync +// should ignore changes to the service's Tags field. type syncReason byte const ( @@ -579,25 +616,20 @@ func (c *ServiceClient) sync(reason syncReason) error { } // Add Nomad services missing from Consul, or where the service has been updated. - for id, local := range c.services { - existingSvc, ok := consulServices[id] - - if ok { - // There is an existing registration of this service in Consul, so here - // we validate to see if the service has been invalidated to see if it - // should be updated. - if !agentServiceUpdateRequired(reason, local, existingSvc) { - // No Need to update services that have not changed - continue + for id, serviceInNomad := range c.services { + + serviceInConsul, exists := consulServices[id] + sidecarInConsul := getNomadSidecar(id, consulServices) + + if !exists || agentServiceUpdateRequired(reason, serviceInNomad, serviceInConsul, sidecarInConsul) { + if err = c.client.ServiceRegister(serviceInNomad); err != nil { + metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1) + return err } + sreg++ + metrics.IncrCounter([]string{"client", "consul", "service_registrations"}, 1) } - if err = c.client.ServiceRegister(local); err != nil { - metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1) - return err - } - sreg++ - metrics.IncrCounter([]string{"client", "consul", "service_registrations"}, 1) } // Remove Nomad checks in Consul but unknown locally @@ -1318,6 +1350,10 @@ func isOldNomadService(id string) bool { return strings.HasPrefix(id, prefix) } +const ( + sidecarSuffix = "-sidecar-proxy" +) + // isNomadSidecar returns true if the ID matches a sidecar proxy for a Nomad // managed service. // @@ -1330,16 +1366,29 @@ func isOldNomadService(id string) bool { // _nomad-task-5229c7f8-376b-3ccc-edd9-981e238f7033-cache-redis-cache-db-sidecar-proxy // func isNomadSidecar(id string, services map[string]*api.AgentServiceRegistration) bool { - const suffix = "-sidecar-proxy" - if !strings.HasSuffix(id, suffix) { + if !strings.HasSuffix(id, sidecarSuffix) { return false } // Make sure the Nomad managed service for this proxy still exists. - _, ok := services[id[:len(id)-len(suffix)]] + _, ok := services[id[:len(id)-len(sidecarSuffix)]] return ok } +// getNomadSidecar returns the service registration of the sidecar for the managed +// service with the specified id. +// +// If the managed service of the specified id does not exist, or the service does +// not have a sidecar proxy, nil is returned. +func getNomadSidecar(id string, services map[string]*api.AgentService) *api.AgentService { + if _, exists := services[id]; !exists { + return nil + } + + sidecarID := id + sidecarSuffix + return services[sidecarID] +} + // getAddress returns the IP and port to use for a service or check. If no port // label is specified (an empty value), zero values are returned because no // address could be resolved. diff --git a/command/agent/consul/client_test.go b/command/agent/consul/client_test.go index fc96eb00c8d..68bde388c76 100644 --- a/command/agent/consul/client_test.go +++ b/command/agent/consul/client_test.go @@ -1,30 +1,40 @@ package consul import ( + "reflect" "testing" "github.com/hashicorp/consul/api" "github.com/stretchr/testify/require" ) -func TestSyncLogic_agentServiceUpdateRequired(t *testing.T) { - t.Parallel() - - wanted := api.AgentServiceRegistration{ - Kind: "service", - ID: "_id", - Name: "name", +var ( + // the service as known by nomad + wanted = api.AgentServiceRegistration{ + Kind: "", + ID: "aca4c175-1778-5ef4-0220-2ab434147d35", + Name: "myservice", Tags: []string{"a", "b"}, Port: 9000, Address: "1.1.1.1", EnableTagOverride: true, Meta: map[string]string{"foo": "1"}, + Connect: &api.AgentServiceConnect{ + Native: false, + SidecarService: &api.AgentServiceRegistration{ + Kind: "connect-proxy", + ID: "_nomad-task-8e8413af-b5bb-aa67-2c24-c146c45f1ec9-group-mygroup-myservice-9001-sidecar-proxy", + Name: "name-sidecar-proxy", + Tags: []string{"x", "y", "z"}, + }, + }, } - existing := &api.AgentService{ - Kind: "service", - ID: "_id", - Service: "name", + // the service (and + connect proxy) as known by consul + existing = &api.AgentService{ + Kind: "", + ID: "aca4c175-1778-5ef4-0220-2ab434147d35", + Service: "myservice", Tags: []string{"a", "b"}, Port: 9000, Address: "1.1.1.1", @@ -32,6 +42,17 @@ func TestSyncLogic_agentServiceUpdateRequired(t *testing.T) { Meta: map[string]string{"foo": "1"}, } + sidecar = &api.AgentService{ + Kind: "connect-proxy", + ID: "_nomad-task-8e8413af-b5bb-aa67-2c24-c146c45f1ec9-group-mygroup-myservice-9001-sidecar-proxy", + Service: "myservice-sidecar-proxy", + Tags: []string{"x", "y", "z"}, + } +) + +func TestSyncLogic_agentServiceUpdateRequired(t *testing.T) { + t.Parallel() + // By default wanted and existing match. Each test should modify wanted in // 1 way, and / or configure the type of sync operation that is being // considered, then evaluate the result of the update-required algebra. @@ -44,7 +65,7 @@ func TestSyncLogic_agentServiceUpdateRequired(t *testing.T) { exp bool, reason syncReason, tweak tweaker) { - result := agentServiceUpdateRequired(reason, tweak(wanted), existing) + result := agentServiceUpdateRequired(reason, tweak(wanted), existing, sidecar) require.Equal(t, exp, result) } @@ -103,7 +124,7 @@ func TestSyncLogic_agentServiceUpdateRequired(t *testing.T) { }) }) - t.Run("different tags syncNewOps eto->true", func(t *testing.T) { + t.Run("different tags syncNewOps eto=true", func(t *testing.T) { // sync is required even though eto=true, because NewOps indicates the // service definition in nomad has changed (e.g. job run a modified job) try(t, true, syncNewOps, func(w asr) *asr { @@ -112,7 +133,7 @@ func TestSyncLogic_agentServiceUpdateRequired(t *testing.T) { }) }) - t.Run("different tags syncPeriodic eto->true", func(t *testing.T) { + t.Run("different tags syncPeriodic eto=true", func(t *testing.T) { // sync is not required since eto=true and this is a periodic sync // with consul - in which case we keep Consul's definition of the tags try(t, false, syncPeriodic, func(w asr) *asr { @@ -121,11 +142,29 @@ func TestSyncLogic_agentServiceUpdateRequired(t *testing.T) { }) }) + t.Run("different sidecar tags on syncPeriodic eto=true", func(t *testing.T) { + try(t, false, syncPeriodic, func(w asr) *asr { + // like the parent service, the sidecar's tags do not get enforced + // if ETO is true and this is a periodic sync + w.Connect.SidecarService.Tags = []string{"other", "tags"} + return &w + }) + }) + + t.Run("different sidecar tags on syncNewOps eto=true", func(t *testing.T) { + try(t, true, syncNewOps, func(w asr) *asr { + // like the parent service, the sidecar's tags always get enforced + // regardless of ETO if this is a sync due to applied operations + w.Connect.SidecarService.Tags = []string{"other", "tags"} + return &w + }) + }) + // for remaining tests, EnableTagOverride = false wanted.EnableTagOverride = false existing.EnableTagOverride = false - t.Run("different tags : syncPeriodic : eto->false", func(t *testing.T) { + t.Run("different tags syncPeriodic eto=false", func(t *testing.T) { // sync is required because eto=false and the tags do not match try(t, true, syncPeriodic, func(w asr) *asr { w.Tags = []string{"other", "tags"} @@ -133,28 +172,100 @@ func TestSyncLogic_agentServiceUpdateRequired(t *testing.T) { }) }) - t.Run("different tags : syncNewOps : eto->false", func(t *testing.T) { - // sync is required because it was triggered by NewOps and the tags - // do not match + t.Run("different tags syncNewOps eto=false", func(t *testing.T) { + // sync is required because eto=false and the tags do not match try(t, true, syncNewOps, func(w asr) *asr { w.Tags = []string{"other", "tags"} return &w }) }) + + t.Run("different sidecar tags on syncPeriodic eto=false", func(t *testing.T) { + // like the parent service, sync is required because eto=false and the + // sidecar's tags do not match + try(t, true, syncPeriodic, func(w asr) *asr { + w.Connect.SidecarService.Tags = []string{"other", "tags"} + return &w + }) + }) + + t.Run("different sidecar tags syncNewOps eto=false", func(t *testing.T) { + // like the parent service, sync is required because eto=false and the + // sidecar's tags do not match + try(t, true, syncNewOps, func(w asr) *asr { + w.Connect.SidecarService.Tags = []string{"other", "tags"} + return &w + }) + }) } func TestSyncLogic_maybeTweakTags(t *testing.T) { t.Parallel() - r := require.New(t) - - wanted := &api.AgentServiceRegistration{Tags: []string{"original"}} - existing := &api.AgentService{Tags: []string{"other"}} - maybeTweakTags(wanted, existing) - r.Equal([]string{"original"}, wanted.Tags) - - wantedETO := &api.AgentServiceRegistration{Tags: []string{"original"}, EnableTagOverride: true} - existingETO := &api.AgentService{Tags: []string{"other"}, EnableTagOverride: true} - maybeTweakTags(wantedETO, existingETO) - r.Equal(existingETO.Tags, wantedETO.Tags) - r.False(&(existingETO.Tags) == &(wantedETO.Tags)) + + differentPointers := func(a, b []string) bool { + return &(a) != &(b) + } + + try := func(inConsul, inConsulSC []string, eto bool) { + wanted := &api.AgentServiceRegistration{ + Tags: []string{"original"}, + Connect: &api.AgentServiceConnect{ + SidecarService: &api.AgentServiceRegistration{ + Tags: []string{"original-sidecar"}, + }, + }, + EnableTagOverride: eto, + } + + existing := &api.AgentService{Tags: inConsul} + sidecar := &api.AgentService{Tags: inConsulSC} + + maybeTweakTags(wanted, existing, sidecar) + + switch eto { + case false: + require.Equal(t, []string{"original"}, wanted.Tags) + require.Equal(t, []string{"original-sidecar"}, wanted.Connect.SidecarService.Tags) + require.True(t, differentPointers(wanted.Tags, wanted.Connect.SidecarService.Tags)) + case true: + require.Equal(t, inConsul, wanted.Tags) + require.Equal(t, inConsulSC, wanted.Connect.SidecarService.Tags) + require.True(t, differentPointers(wanted.Tags, wanted.Connect.SidecarService.Tags)) + } + } + + try([]string{"original"}, []string{"original-sidecar"}, true) + try([]string{"original"}, []string{"original-sidecar"}, false) + try([]string{"modified"}, []string{"original-sidecar"}, true) + try([]string{"modified"}, []string{"original-sidecar"}, false) + try([]string{"original"}, []string{"modified-sidecar"}, true) + try([]string{"original"}, []string{"modified-sidecar"}, false) + try([]string{"modified"}, []string{"modified-sidecar"}, true) + try([]string{"modified"}, []string{"modified-sidecar"}, false) +} + +func TestSyncLogic_maybeTweakTags_emptySC(t *testing.T) { + t.Parallel() + + // Check the edge cases where the connect service is deleted on the nomad + // side (i.e. are we checking multiple nil pointers). + + try := func(asr *api.AgentServiceRegistration) { + maybeTweakTags(asr, existing, sidecar) + require.False(t, reflect.DeepEqual([]string{"original"}, asr.Tags)) + } + + try(&api.AgentServiceRegistration{ + Tags: []string{"original"}, + EnableTagOverride: true, + Connect: nil, // ooh danger! + }) + + try(&api.AgentServiceRegistration{ + Tags: []string{"original"}, + EnableTagOverride: true, + Connect: &api.AgentServiceConnect{ + SidecarService: nil, // ooh danger! + }, + }) } diff --git a/nomad/structs/services.go b/nomad/structs/services.go index becb143c47b..dbafdfd1164 100644 --- a/nomad/structs/services.go +++ b/nomad/structs/services.go @@ -3,11 +3,13 @@ package structs import ( "crypto/sha1" "fmt" + "hash" "io" "net/url" "reflect" "regexp" "sort" + "strconv" "strings" "time" @@ -462,29 +464,18 @@ func (s *Service) ValidateName(name string) error { // as they're hashed independently. func (s *Service) Hash(allocID, taskName string, canary bool) string { h := sha1.New() - io.WriteString(h, allocID) - io.WriteString(h, taskName) - io.WriteString(h, s.Name) - io.WriteString(h, s.PortLabel) - io.WriteString(h, s.AddressMode) - for _, tag := range s.Tags { - io.WriteString(h, tag) - } - for _, tag := range s.CanaryTags { - io.WriteString(h, tag) - } - if len(s.Meta) > 0 { - fmt.Fprintf(h, "%v", s.Meta) - } - if len(s.CanaryMeta) > 0 { - fmt.Fprintf(h, "%v", s.CanaryMeta) - } - fmt.Fprintf(h, "%t", s.EnableTagOverride) - - // Vary ID on whether or not CanaryTags will be used - if canary { - h.Write([]byte("Canary")) - } + hashString(h, allocID) + hashString(h, taskName) + hashString(h, s.Name) + hashString(h, s.PortLabel) + hashString(h, s.AddressMode) + hashTags(h, s.Tags) + hashTags(h, s.CanaryTags) + hashBool(h, canary, "Canary") + hashBool(h, s.EnableTagOverride, "ETO") + hashMeta(h, s.Meta) + hashMeta(h, s.CanaryMeta) + hashConnect(h, s.Connect) // Base32 is used for encoding the hash as sha1 hashes can always be // encoded without padding, only 4 bytes larger than base64, and saves @@ -493,6 +484,46 @@ func (s *Service) Hash(allocID, taskName string, canary bool) string { return b32.EncodeToString(h.Sum(nil)) } +func hashConnect(h hash.Hash, connect *ConsulConnect) { + if connect != nil && connect.SidecarService != nil { + hashString(h, connect.SidecarService.Port) + hashTags(h, connect.SidecarService.Tags) + if p := connect.SidecarService.Proxy; p != nil { + hashString(h, p.LocalServiceAddress) + hashString(h, strconv.Itoa(p.LocalServicePort)) + hashConfig(h, p.Config) + for _, upstream := range p.Upstreams { + hashString(h, upstream.DestinationName) + hashString(h, strconv.Itoa(upstream.LocalBindPort)) + } + } + } +} + +func hashString(h hash.Hash, s string) { + _, _ = io.WriteString(h, s) +} + +func hashBool(h hash.Hash, b bool, name string) { + if b { + hashString(h, name) + } +} + +func hashTags(h hash.Hash, tags []string) { + for _, tag := range tags { + hashString(h, tag) + } +} + +func hashMeta(h hash.Hash, m map[string]string) { + _, _ = fmt.Fprintf(h, "%v", m) +} + +func hashConfig(h hash.Hash, c map[string]interface{}) { + _, _ = fmt.Fprintf(h, "%v", c) +} + // Equals returns true if the structs are recursively equal. func (s *Service) Equals(o *Service) bool { if s == nil || o == nil { diff --git a/nomad/structs/services_test.go b/nomad/structs/services_test.go index 9e92a7d5d4f..f92773e3011 100644 --- a/nomad/structs/services_test.go +++ b/nomad/structs/services_test.go @@ -8,6 +8,113 @@ import ( "github.com/stretchr/testify/require" ) +func TestService_Hash(t *testing.T) { + t.Parallel() + + original := &Service{ + Name: "myService", + PortLabel: "portLabel", + // AddressMode: "bridge", // not hashed + Tags: []string{"original", "tags"}, + CanaryTags: []string{"canary", "tags"}, + // Checks: nil, // not hashed (managed independently) + Connect: &ConsulConnect{ + // Native: false, // not hashed + SidecarService: &ConsulSidecarService{ + Tags: []string{"original", "sidecar", "tags"}, + Port: "9000", + Proxy: &ConsulProxy{ + LocalServiceAddress: "127.0.0.1", + LocalServicePort: 24000, + Config: map[string]interface{}{"foo": "bar"}, + Upstreams: []ConsulUpstream{{ + DestinationName: "upstream1", + LocalBindPort: 29000, + }}, + }, + }, + // SidecarTask: nil // not hashed + }} + + type svc = Service + type tweaker = func(service *svc) + + hash := func(s *svc, canary bool) string { + return s.Hash("AllocID", "TaskName", canary) + } + + t.Run("matching and is canary", func(t *testing.T) { + require.Equal(t, hash(original, true), hash(original, true)) + }) + + t.Run("matching and is not canary", func(t *testing.T) { + require.Equal(t, hash(original, false), hash(original, false)) + }) + + t.Run("matching mod canary", func(t *testing.T) { + require.NotEqual(t, hash(original, true), hash(original, false)) + }) + + try := func(t *testing.T, tweak tweaker) { + originalHash := hash(original, true) + modifiable := original.Copy() + tweak(modifiable) + tweakedHash := hash(modifiable, true) + require.NotEqual(t, originalHash, tweakedHash) + } + + // these tests use tweaker to modify 1 field and make the false assertion + // on comparing the resulting hash output + + t.Run("mod name", func(t *testing.T) { + try(t, func(s *svc) { s.Name = "newName" }) + }) + + t.Run("mod port label", func(t *testing.T) { + try(t, func(s *svc) { s.PortLabel = "newPortLabel" }) + }) + + t.Run("mod tags", func(t *testing.T) { + try(t, func(s *svc) { s.Tags = []string{"new", "tags"} }) + }) + + t.Run("mod canary tags", func(t *testing.T) { + try(t, func(s *svc) { s.CanaryTags = []string{"new", "tags"} }) + }) + + t.Run("mod enable tag override", func(t *testing.T) { + try(t, func(s *svc) { s.EnableTagOverride = true }) + }) + + t.Run("mod connect sidecar tags", func(t *testing.T) { + try(t, func(s *svc) { s.Connect.SidecarService.Tags = []string{"new", "tags"} }) + }) + + t.Run("mod connect sidecar port", func(t *testing.T) { + try(t, func(s *svc) { s.Connect.SidecarService.Port = "9090" }) + }) + + t.Run("mod connect sidecar proxy local service address", func(t *testing.T) { + try(t, func(s *svc) { s.Connect.SidecarService.Proxy.LocalServiceAddress = "1.1.1.1" }) + }) + + t.Run("mod connect sidecar proxy local service port", func(t *testing.T) { + try(t, func(s *svc) { s.Connect.SidecarService.Proxy.LocalServicePort = 9999 }) + }) + + t.Run("mod connect sidecar proxy config", func(t *testing.T) { + try(t, func(s *svc) { s.Connect.SidecarService.Proxy.Config = map[string]interface{}{"foo": "baz"} }) + }) + + t.Run("mod connect sidecar proxy upstream dest name", func(t *testing.T) { + try(t, func(s *svc) { s.Connect.SidecarService.Proxy.Upstreams[0].DestinationName = "dest2" }) + }) + + t.Run("mod connect sidecar proxy upstream dest local bind port", func(t *testing.T) { + try(t, func(s *svc) { s.Connect.SidecarService.Proxy.Upstreams[0].LocalBindPort = 29999 }) + }) +} + func TestConsulConnect_Validate(t *testing.T) { t.Parallel()