diff --git a/api/services.go b/api/services.go index 6b1220ea367..d371148a926 100644 --- a/api/services.go +++ b/api/services.go @@ -97,17 +97,18 @@ type ServiceCheck struct { // Service represents a Consul service definition. type Service struct { //FIXME Id is unused. Remove? - Id string - Name string - Tags []string - CanaryTags []string `mapstructure:"canary_tags"` - PortLabel string `mapstructure:"port"` - AddressMode string `mapstructure:"address_mode"` - Checks []ServiceCheck - CheckRestart *CheckRestart `mapstructure:"check_restart"` - Connect *ConsulConnect - Meta map[string]string - CanaryMeta map[string]string + Id string + Name string + Tags []string + CanaryTags []string `mapstructure:"canary_tags"` + EnableTagOverride bool `mapstructure:"enable_tag_override"` + PortLabel string `mapstructure:"port"` + AddressMode string `mapstructure:"address_mode"` + Checks []ServiceCheck + CheckRestart *CheckRestart `mapstructure:"check_restart"` + Connect *ConsulConnect + Meta map[string]string + CanaryMeta map[string]string } // Canonicalize the Service by ensuring its name and address mode are set. Task diff --git a/api/services_test.go b/api/services_test.go index 9dc2eb07cdf..bedfefd11c3 100644 --- a/api/services_test.go +++ b/api/services_test.go @@ -5,11 +5,14 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) // TestService_CheckRestart asserts Service.CheckRestart settings are properly // inherited by Checks. func TestService_CheckRestart(t *testing.T) { + t.Parallel() + job := &Job{Name: stringToPtr("job")} tg := &TaskGroup{Name: stringToPtr("group")} task := &Task{Name: "task"} @@ -58,6 +61,8 @@ func TestService_CheckRestart(t *testing.T) { // TestService_Connect asserts Service.Connect settings are properly // inherited by Checks. func TestService_Connect(t *testing.T) { + t.Parallel() + job := &Job{Name: stringToPtr("job")} tg := &TaskGroup{Name: stringToPtr("group")} task := &Task{Name: "task"} @@ -83,3 +88,23 @@ func TestService_Connect(t *testing.T) { assert.Equal(t, proxy.Upstreams[0].DestinationName, "upstream") assert.Equal(t, proxy.LocalServicePort, 8000) } + +func TestService_Tags(t *testing.T) { + t.Parallel() + r := require.New(t) + + // canonicalize does not modify eto or tags + job := &Job{Name: stringToPtr("job")} + tg := &TaskGroup{Name: stringToPtr("group")} + task := &Task{Name: "task"} + service := &Service{ + Tags: []string{"a", "b"}, + CanaryTags: []string{"c", "d"}, + EnableTagOverride: true, + } + + service.Canonicalize(task, tg, job) + r.True(service.EnableTagOverride) + r.Equal([]string{"a", "b"}, service.Tags) + r.Equal([]string{"c", "d"}, service.CanaryTags) +} diff --git a/client/allocrunner/groupservice_hook.go b/client/allocrunner/groupservice_hook.go index 39a8c7993ae..0325d1bae21 100644 --- a/client/allocrunner/groupservice_hook.go +++ b/client/allocrunner/groupservice_hook.go @@ -97,6 +97,7 @@ func (h *groupServiceHook) Prerun() error { func (h *groupServiceHook) Update(req *interfaces.RunnerUpdateRequest) error { h.mu.Lock() defer h.mu.Unlock() + oldWorkloadServices := h.getWorkloadServices() // Store new updated values out of request diff --git a/command/agent/consul/catalog_testing.go b/command/agent/consul/catalog_testing.go index 5eb777376f2..61e5e3088b7 100644 --- a/command/agent/consul/catalog_testing.go +++ b/command/agent/consul/catalog_testing.go @@ -204,3 +204,17 @@ func (c *MockAgent) UpdateTTL(id string, output string, status string) error { c.checkTTLs[id]++ return nil } + +// a convenience method for looking up a registered service by name +func (c *MockAgent) lookupService(name string) []*api.AgentServiceRegistration { + c.mu.Lock() + defer c.mu.Unlock() + + var services []*api.AgentServiceRegistration + for _, service := range c.services { + if service.Name == name { + services = append(services, service) + } + } + return services +} diff --git a/command/agent/consul/client.go b/command/agent/consul/client.go index 333f510b525..6b99147bb3a 100644 --- a/command/agent/consul/client.go +++ b/command/agent/consul/client.go @@ -106,14 +106,56 @@ type ACLsAPI interface { TokenList(q *api.QueryOptions) ([]*api.ACLTokenListEntry, *api.QueryMeta, error) } -func agentServiceUpdateRequired(reg *api.AgentServiceRegistration, svc *api.AgentService) bool { - return !(reg.Kind == svc.Kind && - reg.ID == svc.ID && - reg.Port == svc.Port && - reg.Address == svc.Address && - reg.Name == svc.Service && - reflect.DeepEqual(reg.Tags, svc.Tags) && - reflect.DeepEqual(reg.Meta, svc.Meta)) +// agentServiceUpdateRequired checks if any critical fields in Nomad's version +// of a service definition are different from the existing service definition as +// known by Consul. +func agentServiceUpdateRequired(reason syncReason, wanted *api.AgentServiceRegistration, existing *api.AgentService) bool { + switch reason { + case syncPeriodic: + // In a periodic sync with Consul, we need to respect the value of + // the enable_tag_override field so that we maintain the illusion that the + // user is in control of the Consul tags, as they may be externally edited + // via the Consul catalog API (e.g. a user manually sets them). + // + // As Consul does by disabling anti-entropy for the tags field, Nomad will + // ignore differences in the tags field during the periodic syncs with + // the Consul agent API. + // + // We do so by over-writing the nomad service registration by the value + // of the tags that Consul contains, if enable_tag_override = true. + maybeTweakTags(wanted, existing) + return different(wanted, existing) + + default: + // A non-periodic sync with Consul indicates an operation has been set + // on the queue. This happens when service has been added / removed / modified + // and implies the Consul agent should be sync'd with nomad, because + // nomad is the ultimate source of truth for the service definition. + return different(wanted, existing) + } +} + +// maybeTweakTags will override wanted.Tags with a copy of existing.Tags only if +// EnableTagOverride is true. Otherwise the wanted service registration is left +// unchanged. +func maybeTweakTags(wanted *api.AgentServiceRegistration, existing *api.AgentService) { + if wanted.EnableTagOverride { + wanted.Tags = helper.CopySliceString(existing.Tags) + } +} + +// different compares the wanted state of the service registration with the actual +// (cached) state of the service registration reported by Consul. If any of the +// critical fields are not deeply equal, they considered different. +func different(wanted *api.AgentServiceRegistration, existing *api.AgentService) bool { + return !(wanted.Kind == existing.Kind && + wanted.ID == existing.ID && + wanted.Port == existing.Port && + wanted.Address == existing.Address && + wanted.Name == existing.Service && + wanted.EnableTagOverride == existing.EnableTagOverride && + reflect.DeepEqual(wanted.Meta, existing.Meta) && + reflect.DeepEqual(wanted.Tags, existing.Tags)) } // operations are submitted to the main loop via commit() for synchronizing @@ -320,6 +362,18 @@ func (c *ServiceClient) hasSeen() bool { return atomic.LoadInt32(&c.seen) == seen } +// syncReason indicates why a sync operation with consul is about to happen. +// +// The trigger for a sync may have implications on the behavior of the sync itself. +// In particular, if a service is defined with enable_tag_override=true +type syncReason byte + +const ( + syncPeriodic = iota + syncShutdown + syncNewOps +) + // Run the Consul main loop which retries operations against Consul. It should // be called exactly once. func (c *ServiceClient) Run() { @@ -357,16 +411,23 @@ INIT: failures := 0 for { + // On every iteration take note of what the trigger for the next sync + // was, so that it may be referenced during the sync itself. + var reasonForSync syncReason + select { case <-retryTimer.C: + reasonForSync = syncPeriodic case <-c.shutdownCh: + reasonForSync = syncShutdown // Cancel check watcher but sync one last time cancel() case ops := <-c.opCh: + reasonForSync = syncNewOps c.merge(ops) } - if err := c.sync(); err != nil { + if err := c.sync(reasonForSync); err != nil { if failures == 0 { // Log on the first failure c.logger.Warn("failed to update services in Consul", "error", err) @@ -460,7 +521,7 @@ func (c *ServiceClient) merge(ops *operations) { } // sync enqueued operations. -func (c *ServiceClient) sync() error { +func (c *ServiceClient) sync(reason syncReason) error { sreg, creg, sdereg, cdereg := 0, 0, 0, 0 consulServices, err := c.client.Services() @@ -518,20 +579,20 @@ func (c *ServiceClient) sync() error { } // Add Nomad services missing from Consul, or where the service has been updated. - for id, locals := range c.services { + for id, local := range c.services { existingSvc, ok := consulServices[id] if ok { // There is an existing registration of this service in Consul, so here // we validate to see if the service has been invalidated to see if it // should be updated. - if !agentServiceUpdateRequired(locals, existingSvc) { + if !agentServiceUpdateRequired(reason, local, existingSvc) { // No Need to update services that have not changed continue } } - if err = c.client.ServiceRegister(locals); err != nil { + if err = c.client.ServiceRegister(local); err != nil { metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1) return err } @@ -746,13 +807,14 @@ func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, w // Build the Consul Service registration request serviceReg := &api.AgentServiceRegistration{ - ID: id, - Name: service.Name, - Tags: tags, - Address: ip, - Port: port, - Meta: meta, - Connect: connect, // will be nil if no Connect stanza + ID: id, + Name: service.Name, + Tags: tags, + EnableTagOverride: service.EnableTagOverride, + Address: ip, + Port: port, + Meta: meta, + Connect: connect, // will be nil if no Connect stanza } ops.regServices = append(ops.regServices, serviceReg) @@ -868,8 +930,7 @@ func (c *ServiceClient) RegisterWorkload(workload *WorkloadServices) error { // // DriverNetwork must not change between invocations for the same allocation. func (c *ServiceClient) UpdateWorkload(old, newWorkload *WorkloadServices) error { - ops := &operations{} - + ops := new(operations) regs := new(ServiceRegistrations) regs.Services = make(map[string]*ServiceRegistration, len(newWorkload.Services)) @@ -984,6 +1045,7 @@ func (c *ServiceClient) UpdateWorkload(old, newWorkload *WorkloadServices) error } } } + return nil } diff --git a/command/agent/consul/client_test.go b/command/agent/consul/client_test.go new file mode 100644 index 00000000000..fc96eb00c8d --- /dev/null +++ b/command/agent/consul/client_test.go @@ -0,0 +1,160 @@ +package consul + +import ( + "testing" + + "github.com/hashicorp/consul/api" + "github.com/stretchr/testify/require" +) + +func TestSyncLogic_agentServiceUpdateRequired(t *testing.T) { + t.Parallel() + + wanted := api.AgentServiceRegistration{ + Kind: "service", + ID: "_id", + Name: "name", + Tags: []string{"a", "b"}, + Port: 9000, + Address: "1.1.1.1", + EnableTagOverride: true, + Meta: map[string]string{"foo": "1"}, + } + + existing := &api.AgentService{ + Kind: "service", + ID: "_id", + Service: "name", + Tags: []string{"a", "b"}, + Port: 9000, + Address: "1.1.1.1", + EnableTagOverride: true, + Meta: map[string]string{"foo": "1"}, + } + + // By default wanted and existing match. Each test should modify wanted in + // 1 way, and / or configure the type of sync operation that is being + // considered, then evaluate the result of the update-required algebra. + + type asr = api.AgentServiceRegistration + type tweaker func(w asr) *asr // create a conveniently modifiable copy + + try := func( + t *testing.T, + exp bool, + reason syncReason, + tweak tweaker) { + result := agentServiceUpdateRequired(reason, tweak(wanted), existing) + require.Equal(t, exp, result) + } + + t.Run("matching", func(t *testing.T) { + try(t, false, syncNewOps, func(w asr) *asr { + return &w + }) + }) + + t.Run("different kind", func(t *testing.T) { + try(t, true, syncNewOps, func(w asr) *asr { + w.Kind = "other" + return &w + }) + }) + + t.Run("different id", func(t *testing.T) { + try(t, true, syncNewOps, func(w asr) *asr { + w.ID = "_other" + return &w + }) + }) + + t.Run("different port", func(t *testing.T) { + try(t, true, syncNewOps, func(w asr) *asr { + w.Port = 9001 + return &w + }) + }) + + t.Run("different address", func(t *testing.T) { + try(t, true, syncNewOps, func(w asr) *asr { + w.Address = "2.2.2.2" + return &w + }) + }) + + t.Run("different name", func(t *testing.T) { + try(t, true, syncNewOps, func(w asr) *asr { + w.Name = "bob" + return &w + }) + }) + + t.Run("different enable_tag_override", func(t *testing.T) { + try(t, true, syncNewOps, func(w asr) *asr { + w.EnableTagOverride = false + return &w + }) + }) + + t.Run("different meta", func(t *testing.T) { + try(t, true, syncNewOps, func(w asr) *asr { + w.Meta = map[string]string{"foo": "2"} + return &w + }) + }) + + t.Run("different tags syncNewOps eto->true", func(t *testing.T) { + // sync is required even though eto=true, because NewOps indicates the + // service definition in nomad has changed (e.g. job run a modified job) + try(t, true, syncNewOps, func(w asr) *asr { + w.Tags = []string{"other", "tags"} + return &w + }) + }) + + t.Run("different tags syncPeriodic eto->true", func(t *testing.T) { + // sync is not required since eto=true and this is a periodic sync + // with consul - in which case we keep Consul's definition of the tags + try(t, false, syncPeriodic, func(w asr) *asr { + w.Tags = []string{"other", "tags"} + return &w + }) + }) + + // for remaining tests, EnableTagOverride = false + wanted.EnableTagOverride = false + existing.EnableTagOverride = false + + t.Run("different tags : syncPeriodic : eto->false", func(t *testing.T) { + // sync is required because eto=false and the tags do not match + try(t, true, syncPeriodic, func(w asr) *asr { + w.Tags = []string{"other", "tags"} + return &w + }) + }) + + t.Run("different tags : syncNewOps : eto->false", func(t *testing.T) { + // sync is required because it was triggered by NewOps and the tags + // do not match + try(t, true, syncNewOps, func(w asr) *asr { + w.Tags = []string{"other", "tags"} + return &w + }) + }) +} + +func TestSyncLogic_maybeTweakTags(t *testing.T) { + t.Parallel() + r := require.New(t) + + wanted := &api.AgentServiceRegistration{Tags: []string{"original"}} + existing := &api.AgentService{Tags: []string{"other"}} + maybeTweakTags(wanted, existing) + r.Equal([]string{"original"}, wanted.Tags) + + wantedETO := &api.AgentServiceRegistration{Tags: []string{"original"}, EnableTagOverride: true} + existingETO := &api.AgentService{Tags: []string{"other"}, EnableTagOverride: true} + maybeTweakTags(wantedETO, existingETO) + r.Equal(existingETO.Tags, wantedETO.Tags) + r.False(&(existingETO.Tags) == &(wantedETO.Tags)) +} diff --git a/command/agent/consul/unit_test.go b/command/agent/consul/unit_test.go index a7c8f5d8162..e1e9a77047e 100644 --- a/command/agent/consul/unit_test.go +++ b/command/agent/consul/unit_test.go @@ -2,6 +2,7 @@ package consul import ( "context" + "errors" "fmt" "reflect" "strings" @@ -71,18 +72,34 @@ var errNoOps = fmt.Errorf("testing error: no pending operations") // syncOps simulates one iteration of the ServiceClient.Run loop and returns // any errors returned by sync() or errNoOps if no pending operations. -func (t *testFakeCtx) syncOnce() error { - select { - case ops := <-t.ServiceClient.opCh: - t.ServiceClient.merge(ops) - err := t.ServiceClient.sync() +func (t *testFakeCtx) syncOnce(reason syncReason) error { + switch reason { + + case syncPeriodic: + err := t.ServiceClient.sync(syncPeriodic) if err == nil { t.ServiceClient.clearExplicitlyDeregistered() } return err - default: - return errNoOps + + case syncNewOps: + select { + case ops := <-t.ServiceClient.opCh: + t.ServiceClient.merge(ops) + err := t.ServiceClient.sync(syncNewOps) + if err == nil { + t.ServiceClient.clearExplicitlyDeregistered() + } + return err + default: + return errNoOps + } + + case syncShutdown: + return errors.New("no test for sync due to shutdown") } + + return errors.New("bad sync reason") } // setupFake creates a testFakeCtx with a ServiceClient backed by a fakeConsul. @@ -103,40 +120,83 @@ func setupFake(t *testing.T) *testFakeCtx { } func TestConsul_ChangeTags(t *testing.T) { + t.Parallel() ctx := setupFake(t) - require := require.New(t) + r := require.New(t) - require.NoError(ctx.ServiceClient.RegisterWorkload(ctx.Workload)) - require.NoError(ctx.syncOnce()) - require.Equal(1, len(ctx.FakeConsul.services), "Expected 1 service to be registered with Consul") + r.NoError(ctx.ServiceClient.RegisterWorkload(ctx.Workload)) + r.NoError(ctx.syncOnce(syncNewOps)) + r.Equal(1, len(ctx.FakeConsul.services), "Expected 1 service to be registered with Consul") // Validate the alloc registration reg1, err := ctx.ServiceClient.AllocRegistrations(ctx.Workload.AllocID) - require.NoError(err) - require.NotNil(reg1, "Unexpected nil alloc registration") - require.Equal(1, reg1.NumServices()) - require.Equal(0, reg1.NumChecks()) + r.NoError(err) + r.NotNil(reg1, "Unexpected nil alloc registration") + r.Equal(1, reg1.NumServices()) + r.Equal(0, reg1.NumChecks()) - for _, v := range ctx.FakeConsul.services { - require.Equal(v.Name, ctx.Workload.Services[0].Name) - require.Equal(v.Tags, ctx.Workload.Services[0].Tags) - } + serviceBefore := ctx.FakeConsul.lookupService("taskname-service")[0] + r.Equal(serviceBefore.Name, ctx.Workload.Services[0].Name) + r.Equal(serviceBefore.Tags, ctx.Workload.Services[0].Tags) // Update the task definition origWorkload := ctx.Workload.Copy() - ctx.Workload.Services[0].Tags[0] = "newtag" + ctx.Workload.Services[0].Tags[0] = "new-tag" // Register and sync the update - require.NoError(ctx.ServiceClient.UpdateWorkload(origWorkload, ctx.Workload)) - require.NoError(ctx.syncOnce()) - require.Equal(1, len(ctx.FakeConsul.services), "Expected 1 service to be registered with Consul") + r.NoError(ctx.ServiceClient.UpdateWorkload(origWorkload, ctx.Workload)) + r.NoError(ctx.syncOnce(syncNewOps)) + r.Equal(1, len(ctx.FakeConsul.services), "Expected 1 service to be registered with Consul") + + // Validate the consul service definition changed + serviceAfter := ctx.FakeConsul.lookupService("taskname-service")[0] + r.Equal(serviceAfter.Name, ctx.Workload.Services[0].Name) + r.Equal(serviceAfter.Tags, ctx.Workload.Services[0].Tags) + r.Equal("new-tag", serviceAfter.Tags[0]) +} - // Validate the metadata changed - for _, v := range ctx.FakeConsul.services { - require.Equal(v.Name, ctx.Workload.Services[0].Name) - require.Equal(v.Tags, ctx.Workload.Services[0].Tags) - require.Equal("newtag", v.Tags[0]) - } +func TestConsul_EnableTagOverride_Syncs(t *testing.T) { + t.Parallel() + ctx := setupFake(t) + r := require.New(t) + + // Configure our test service to set EnableTagOverride = true + ctx.Workload.Services[0].EnableTagOverride = true + + r.NoError(ctx.ServiceClient.RegisterWorkload(ctx.Workload)) + r.NoError(ctx.syncOnce(syncNewOps)) + r.Equal(1, len(ctx.FakeConsul.services)) + + // Validate the alloc registration + reg1, err := ctx.ServiceClient.AllocRegistrations(ctx.Workload.AllocID) + r.NoError(err) + r.NotNil(reg1) + r.Equal(1, reg1.NumServices()) + r.Equal(0, reg1.NumChecks()) + + const service = "taskname-service" + + // sanity check things are what we expect + consulServiceDefBefore := ctx.FakeConsul.lookupService(service)[0] + r.Equal(ctx.Workload.Services[0].Name, consulServiceDefBefore.Name) + r.Equal([]string{"tag1", "tag2"}, consulServiceDefBefore.Tags) + r.True(consulServiceDefBefore.EnableTagOverride) + + // manually set the tags in consul + ctx.FakeConsul.lookupService(service)[0].Tags = []string{"new", "tags"} + + // do a periodic sync (which will respect EnableTagOverride) + r.NoError(ctx.syncOnce(syncPeriodic)) + r.Equal(1, len(ctx.FakeConsul.services)) + consulServiceDefAfter := ctx.FakeConsul.lookupService(service)[0] + r.Equal([]string{"new", "tags"}, consulServiceDefAfter.Tags) // manually set tags should still be there + + // now do a new-ops sync (which will override EnableTagOverride) + r.NoError(ctx.ServiceClient.RegisterWorkload(ctx.Workload)) + r.NoError(ctx.syncOnce(syncNewOps)) + r.Equal(1, len(ctx.FakeConsul.services)) + consulServiceDefUpdated := ctx.FakeConsul.lookupService(service)[0] + r.Equal([]string{"tag1", "tag2"}, consulServiceDefUpdated.Tags) // jobspec tags should be set now } // TestConsul_ChangePorts asserts that changing the ports on a service updates @@ -172,7 +232,7 @@ func TestConsul_ChangePorts(t *testing.T) { } require.NoError(ctx.ServiceClient.RegisterWorkload(ctx.Workload)) - require.NoError(ctx.syncOnce()) + require.NoError(ctx.syncOnce(syncNewOps)) require.Equal(1, len(ctx.FakeConsul.services), "Expected 1 service to be registered with Consul") for _, v := range ctx.FakeConsul.services { @@ -234,7 +294,7 @@ func TestConsul_ChangePorts(t *testing.T) { } require.NoError(ctx.ServiceClient.UpdateWorkload(origWorkload, ctx.Workload)) - require.NoError(ctx.syncOnce()) + require.NoError(ctx.syncOnce(syncNewOps)) require.Equal(1, len(ctx.FakeConsul.services), "Expected 1 service to be registered with Consul") for _, v := range ctx.FakeConsul.services { @@ -284,7 +344,7 @@ func TestConsul_ChangeChecks(t *testing.T) { t.Fatalf("unexpected error registering task: %v", err) } - if err := ctx.syncOnce(); err != nil { + if err := ctx.syncOnce(syncNewOps); err != nil { t.Fatalf("unexpected error syncing task: %v", err) } @@ -380,7 +440,7 @@ func TestConsul_ChangeChecks(t *testing.T) { c1ID, upd.remove, upd.checkID) } - if err := ctx.syncOnce(); err != nil { + if err := ctx.syncOnce(syncNewOps); err != nil { t.Fatalf("unexpected error syncing task: %v", err) } @@ -474,7 +534,7 @@ func TestConsul_ChangeChecks(t *testing.T) { if err := ctx.ServiceClient.UpdateWorkload(origWorkload, ctx.Workload); err != nil { t.Fatalf("unexpected error registering task: %v", err) } - if err := ctx.syncOnce(); err != nil { + if err := ctx.syncOnce(syncNewOps); err != nil { t.Fatalf("unexpected error syncing task: %v", err) } @@ -518,7 +578,7 @@ func TestConsul_RegServices(t *testing.T) { t.Fatalf("unexpected error registering task: %v", err) } - if err := ctx.syncOnce(); err != nil { + if err := ctx.syncOnce(syncNewOps); err != nil { t.Fatalf("unexpected error syncing task: %v", err) } @@ -582,7 +642,7 @@ func TestConsul_RegServices(t *testing.T) { } // Now sync() and re-check for the applied updates - if err := ctx.syncOnce(); err != nil { + if err := ctx.syncOnce(syncNewOps); err != nil { t.Fatalf("unexpected error syncing task: %v", err) } if n := len(ctx.FakeConsul.services); n != 2 { @@ -606,7 +666,7 @@ func TestConsul_RegServices(t *testing.T) { // Remove the new task ctx.ServiceClient.RemoveWorkload(ctx.Workload) - if err := ctx.syncOnce(); err != nil { + if err := ctx.syncOnce(syncNewOps); err != nil { t.Fatalf("unexpected error syncing task: %v", err) } if n := len(ctx.FakeConsul.services); n != 1 { @@ -799,7 +859,7 @@ func TestConsul_DriverNetwork_AutoUse(t *testing.T) { t.Fatalf("unexpected error registering task: %v", err) } - if err := ctx.syncOnce(); err != nil { + if err := ctx.syncOnce(syncNewOps); err != nil { t.Fatalf("unexpected error syncing task: %v", err) } @@ -903,7 +963,7 @@ func TestConsul_DriverNetwork_NoAutoUse(t *testing.T) { t.Fatalf("unexpected error registering task: %v", err) } - if err := ctx.syncOnce(); err != nil { + if err := ctx.syncOnce(syncNewOps); err != nil { t.Fatalf("unexpected error syncing task: %v", err) } @@ -964,7 +1024,7 @@ func TestConsul_DriverNetwork_Change(t *testing.T) { } syncAndAssertPort := func(port int) { - if err := ctx.syncOnce(); err != nil { + if err := ctx.syncOnce(syncNewOps); err != nil { t.Fatalf("unexpected error syncing task: %v", err) } @@ -1024,7 +1084,7 @@ func TestConsul_CanaryTags(t *testing.T) { ctx.Workload.Services[0].CanaryTags = canaryTags require.NoError(ctx.ServiceClient.RegisterWorkload(ctx.Workload)) - require.NoError(ctx.syncOnce()) + require.NoError(ctx.syncOnce(syncNewOps)) require.Len(ctx.FakeConsul.services, 1) for _, service := range ctx.FakeConsul.services { require.Equal(canaryTags, service.Tags) @@ -1034,14 +1094,14 @@ func TestConsul_CanaryTags(t *testing.T) { origWorkload := ctx.Workload.Copy() ctx.Workload.Canary = false require.NoError(ctx.ServiceClient.UpdateWorkload(origWorkload, ctx.Workload)) - require.NoError(ctx.syncOnce()) + require.NoError(ctx.syncOnce(syncNewOps)) require.Len(ctx.FakeConsul.services, 1) for _, service := range ctx.FakeConsul.services { require.NotEqual(canaryTags, service.Tags) } ctx.ServiceClient.RemoveWorkload(ctx.Workload) - require.NoError(ctx.syncOnce()) + require.NoError(ctx.syncOnce(syncNewOps)) require.Len(ctx.FakeConsul.services, 0) } @@ -1057,7 +1117,7 @@ func TestConsul_CanaryTags_NoTags(t *testing.T) { ctx.Workload.Services[0].Tags = tags require.NoError(ctx.ServiceClient.RegisterWorkload(ctx.Workload)) - require.NoError(ctx.syncOnce()) + require.NoError(ctx.syncOnce(syncNewOps)) require.Len(ctx.FakeConsul.services, 1) for _, service := range ctx.FakeConsul.services { require.Equal(tags, service.Tags) @@ -1067,14 +1127,14 @@ func TestConsul_CanaryTags_NoTags(t *testing.T) { origWorkload := ctx.Workload.Copy() ctx.Workload.Canary = false require.NoError(ctx.ServiceClient.UpdateWorkload(origWorkload, ctx.Workload)) - require.NoError(ctx.syncOnce()) + require.NoError(ctx.syncOnce(syncNewOps)) require.Len(ctx.FakeConsul.services, 1) for _, service := range ctx.FakeConsul.services { require.Equal(tags, service.Tags) } ctx.ServiceClient.RemoveWorkload(ctx.Workload) - require.NoError(ctx.syncOnce()) + require.NoError(ctx.syncOnce(syncNewOps)) require.Len(ctx.FakeConsul.services, 0) } @@ -1090,7 +1150,7 @@ func TestConsul_CanaryMeta(t *testing.T) { ctx.Workload.Services[0].CanaryMeta = canaryMeta require.NoError(ctx.ServiceClient.RegisterWorkload(ctx.Workload)) - require.NoError(ctx.syncOnce()) + require.NoError(ctx.syncOnce(syncNewOps)) require.Len(ctx.FakeConsul.services, 1) for _, service := range ctx.FakeConsul.services { require.Equal(canaryMeta, service.Meta) @@ -1100,14 +1160,14 @@ func TestConsul_CanaryMeta(t *testing.T) { origWorkload := ctx.Workload.Copy() ctx.Workload.Canary = false require.NoError(ctx.ServiceClient.UpdateWorkload(origWorkload, ctx.Workload)) - require.NoError(ctx.syncOnce()) + require.NoError(ctx.syncOnce(syncNewOps)) require.Len(ctx.FakeConsul.services, 1) for _, service := range ctx.FakeConsul.services { require.NotEqual(canaryMeta, service.Meta) } ctx.ServiceClient.RemoveWorkload(ctx.Workload) - require.NoError(ctx.syncOnce()) + require.NoError(ctx.syncOnce(syncNewOps)) require.Len(ctx.FakeConsul.services, 0) } @@ -1124,7 +1184,7 @@ func TestConsul_CanaryMeta_NoMeta(t *testing.T) { ctx.Workload.Services[0].Meta = meta require.NoError(ctx.ServiceClient.RegisterWorkload(ctx.Workload)) - require.NoError(ctx.syncOnce()) + require.NoError(ctx.syncOnce(syncNewOps)) require.Len(ctx.FakeConsul.services, 1) for _, service := range ctx.FakeConsul.services { require.Equal(meta, service.Meta) @@ -1134,14 +1194,14 @@ func TestConsul_CanaryMeta_NoMeta(t *testing.T) { origWorkload := ctx.Workload.Copy() ctx.Workload.Canary = false require.NoError(ctx.ServiceClient.UpdateWorkload(origWorkload, ctx.Workload)) - require.NoError(ctx.syncOnce()) + require.NoError(ctx.syncOnce(syncNewOps)) require.Len(ctx.FakeConsul.services, 1) for _, service := range ctx.FakeConsul.services { require.Equal(meta, service.Meta) } ctx.ServiceClient.RemoveWorkload(ctx.Workload) - require.NoError(ctx.syncOnce()) + require.NoError(ctx.syncOnce(syncNewOps)) require.Len(ctx.FakeConsul.services, 0) } @@ -1509,7 +1569,7 @@ func TestConsul_ServiceName_Duplicates(t *testing.T) { require.NoError(ctx.ServiceClient.RegisterWorkload(ctx.Workload)) - require.NoError(ctx.syncOnce()) + require.NoError(ctx.syncOnce(syncNewOps)) require.Len(ctx.FakeConsul.services, 3) @@ -1554,7 +1614,7 @@ func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) { remainingWorkload.Name(), remainingWorkload.Services[0]) require.NoError(ctx.ServiceClient.RegisterWorkload(remainingWorkload)) - require.NoError(ctx.syncOnce()) + require.NoError(ctx.syncOnce(syncNewOps)) require.Len(ctx.FakeConsul.services, 1) require.Len(ctx.FakeConsul.checks, 1) @@ -1578,7 +1638,7 @@ func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) { require.NoError(ctx.ServiceClient.RegisterWorkload(explicitlyRemovedWorkload)) - require.NoError(ctx.syncOnce()) + require.NoError(ctx.syncOnce(syncNewOps)) require.Len(ctx.FakeConsul.services, 2) require.Len(ctx.FakeConsul.checks, 2) @@ -1602,7 +1662,7 @@ func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) { outofbandWorkload.Name(), outofbandWorkload.Services[0]) require.NoError(ctx.ServiceClient.RegisterWorkload(outofbandWorkload)) - require.NoError(ctx.syncOnce()) + require.NoError(ctx.syncOnce(syncNewOps)) require.Len(ctx.FakeConsul.services, 3) @@ -1619,8 +1679,8 @@ func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) { // Sync and ensure that explicitly removed service as well as outofbandWorkload were removed ctx.ServiceClient.RemoveWorkload(explicitlyRemovedWorkload) - require.NoError(ctx.syncOnce()) - require.NoError(ctx.ServiceClient.sync()) + require.NoError(ctx.syncOnce(syncNewOps)) + require.NoError(ctx.ServiceClient.sync(syncNewOps)) require.Len(ctx.FakeConsul.services, 1) require.Len(ctx.FakeConsul.checks, 1) @@ -1663,7 +1723,7 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) { remainingWorkload.Name(), remainingWorkload.Services[0]) require.NoError(ctx.ServiceClient.RegisterWorkload(remainingWorkload)) - require.NoError(ctx.syncOnce()) + require.NoError(ctx.syncOnce(syncNewOps)) require.Len(ctx.FakeConsul.services, 1) require.Len(ctx.FakeConsul.checks, 1) @@ -1687,7 +1747,7 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) { require.NoError(ctx.ServiceClient.RegisterWorkload(explicitlyRemovedWorkload)) - require.NoError(ctx.syncOnce()) + require.NoError(ctx.syncOnce(syncNewOps)) require.Len(ctx.FakeConsul.services, 2) require.Len(ctx.FakeConsul.checks, 2) @@ -1711,7 +1771,7 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) { outofbandWorkload.Name(), outofbandWorkload.Services[0]) require.NoError(ctx.ServiceClient.RegisterWorkload(outofbandWorkload)) - require.NoError(ctx.syncOnce()) + require.NoError(ctx.syncOnce(syncNewOps)) require.Len(ctx.FakeConsul.services, 3) @@ -1728,8 +1788,8 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) { // Sync and ensure that explicitly removed service was removed, but outofbandWorkload remains ctx.ServiceClient.RemoveWorkload(explicitlyRemovedWorkload) - require.NoError(ctx.syncOnce()) - require.NoError(ctx.ServiceClient.sync()) + require.NoError(ctx.syncOnce(syncNewOps)) + require.NoError(ctx.ServiceClient.sync(syncNewOps)) require.Len(ctx.FakeConsul.services, 2) require.Len(ctx.FakeConsul.checks, 2) @@ -1744,7 +1804,7 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) { // after probation, outofband services and checks are removed ctx.ServiceClient.deregisterProbationExpiry = time.Now().Add(-1 * time.Hour) - require.NoError(ctx.ServiceClient.sync()) + require.NoError(ctx.ServiceClient.sync(syncNewOps)) require.Len(ctx.FakeConsul.services, 1) require.Len(ctx.FakeConsul.checks, 1) diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index 56e7d6d3ce0..9f5b4b1ed38 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -829,13 +829,14 @@ func ApiTaskToStructsTask(apiTask *api.Task, structsTask *structs.Task) { structsTask.Services = make([]*structs.Service, l) for i, service := range apiTask.Services { structsTask.Services[i] = &structs.Service{ - Name: service.Name, - PortLabel: service.PortLabel, - Tags: service.Tags, - CanaryTags: service.CanaryTags, - AddressMode: service.AddressMode, - Meta: helper.CopyMapStringString(service.Meta), - CanaryMeta: helper.CopyMapStringString(service.CanaryMeta), + Name: service.Name, + PortLabel: service.PortLabel, + Tags: service.Tags, + CanaryTags: service.CanaryTags, + EnableTagOverride: service.EnableTagOverride, + AddressMode: service.AddressMode, + Meta: helper.CopyMapStringString(service.Meta), + CanaryMeta: helper.CopyMapStringString(service.CanaryMeta), } if l := len(service.Checks); l != 0 { @@ -1008,13 +1009,14 @@ func ApiServicesToStructs(in []*api.Service) []*structs.Service { out := make([]*structs.Service, len(in)) for i, s := range in { out[i] = &structs.Service{ - Name: s.Name, - PortLabel: s.PortLabel, - Tags: s.Tags, - CanaryTags: s.CanaryTags, - AddressMode: s.AddressMode, - Meta: helper.CopyMapStringString(s.Meta), - CanaryMeta: helper.CopyMapStringString(s.CanaryMeta), + Name: s.Name, + PortLabel: s.PortLabel, + Tags: s.Tags, + CanaryTags: s.CanaryTags, + EnableTagOverride: s.EnableTagOverride, + AddressMode: s.AddressMode, + Meta: helper.CopyMapStringString(s.Meta), + CanaryMeta: helper.CopyMapStringString(s.CanaryMeta), } if l := len(s.Checks); l != 0 { diff --git a/command/agent/job_endpoint_test.go b/command/agent/job_endpoint_test.go index 41d08ca0ae4..19587961b67 100644 --- a/command/agent/job_endpoint_test.go +++ b/command/agent/job_endpoint_test.go @@ -1503,10 +1503,11 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) { }, Services: []*api.Service{ { - Name: "groupserviceA", - Tags: []string{"a", "b"}, - CanaryTags: []string{"d", "e"}, - PortLabel: "1234", + Name: "groupserviceA", + Tags: []string{"a", "b"}, + CanaryTags: []string{"d", "e"}, + EnableTagOverride: true, + PortLabel: "1234", Meta: map[string]string{ "servicemeta": "foobar", }, @@ -1576,11 +1577,12 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) { Services: []*api.Service{ { - Id: "id", - Name: "serviceA", - Tags: []string{"1", "2"}, - CanaryTags: []string{"3", "4"}, - PortLabel: "foo", + Id: "id", + Name: "serviceA", + Tags: []string{"1", "2"}, + CanaryTags: []string{"3", "4"}, + EnableTagOverride: true, + PortLabel: "foo", Meta: map[string]string{ "servicemeta": "foobar", }, @@ -1854,11 +1856,12 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) { }, Services: []*structs.Service{ { - Name: "groupserviceA", - Tags: []string{"a", "b"}, - CanaryTags: []string{"d", "e"}, - PortLabel: "1234", - AddressMode: "auto", + Name: "groupserviceA", + Tags: []string{"a", "b"}, + CanaryTags: []string{"d", "e"}, + EnableTagOverride: true, + PortLabel: "1234", + AddressMode: "auto", Meta: map[string]string{ "servicemeta": "foobar", }, @@ -1923,11 +1926,12 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) { }, Services: []*structs.Service{ { - Name: "serviceA", - Tags: []string{"1", "2"}, - CanaryTags: []string{"3", "4"}, - PortLabel: "foo", - AddressMode: "auto", + Name: "serviceA", + Tags: []string{"1", "2"}, + CanaryTags: []string{"3", "4"}, + EnableTagOverride: true, + PortLabel: "foo", + AddressMode: "auto", Meta: map[string]string{ "servicemeta": "foobar", }, diff --git a/jobspec/parse_service.go b/jobspec/parse_service.go index a974527f009..e44754a301a 100644 --- a/jobspec/parse_service.go +++ b/jobspec/parse_service.go @@ -41,6 +41,7 @@ func parseService(o *ast.ObjectItem) (*api.Service, error) { "name", "tags", "canary_tags", + "enable_tag_override", "port", "check", "address_mode", diff --git a/jobspec/parse_test.go b/jobspec/parse_test.go index 91e63a7766a..08072e3e9c6 100644 --- a/jobspec/parse_test.go +++ b/jobspec/parse_test.go @@ -823,6 +823,25 @@ func TestParse(t *testing.T) { }, false, }, + { + "service-enable-tag-override.hcl", + &api.Job{ + ID: helper.StringToPtr("service_eto"), + Name: helper.StringToPtr("service_eto"), + Type: helper.StringToPtr("service"), + TaskGroups: []*api.TaskGroup{{ + Name: helper.StringToPtr("group"), + Tasks: []*api.Task{{ + Name: "task", + Services: []*api.Service{{ + Name: "example", + EnableTagOverride: true, + }}, + }}, + }}, + }, + false, + }, { "reschedule-job.hcl", &api.Job{ @@ -1046,6 +1065,21 @@ func TestParse(t *testing.T) { }, false, }, + { + "tg-service-enable-tag-override.hcl", + &api.Job{ + ID: helper.StringToPtr("group_service_eto"), + Name: helper.StringToPtr("group_service_eto"), + TaskGroups: []*api.TaskGroup{{ + Name: helper.StringToPtr("group"), + Services: []*api.Service{{ + Name: "example", + EnableTagOverride: true, + }}, + }}, + }, + false, + }, } for _, tc := range cases { diff --git a/jobspec/test-fixtures/service-enable-tag-override.hcl b/jobspec/test-fixtures/service-enable-tag-override.hcl new file mode 100644 index 00000000000..9143b8c34cd --- /dev/null +++ b/jobspec/test-fixtures/service-enable-tag-override.hcl @@ -0,0 +1,11 @@ +job "service_eto" { + type = "service" + group "group" { + task "task" { + service { + name = "example" + enable_tag_override = true + } + } + } +} diff --git a/jobspec/test-fixtures/tg-service-enable-tag-override.hcl b/jobspec/test-fixtures/tg-service-enable-tag-override.hcl new file mode 100644 index 00000000000..a78ff66f995 --- /dev/null +++ b/jobspec/test-fixtures/tg-service-enable-tag-override.hcl @@ -0,0 +1,8 @@ +job "group_service_eto" { + group "group" { + service { + name = "example" + enable_tag_override = true + } + } +} diff --git a/nomad/structs/diff_test.go b/nomad/structs/diff_test.go index 0e80a594616..3273fedede0 100644 --- a/nomad/structs/diff_test.go +++ b/nomad/structs/diff_test.go @@ -4,6 +4,8 @@ import ( "reflect" "testing" "time" + + "github.com/stretchr/testify/require" ) func TestJobDiff(t *testing.T) { @@ -1201,20 +1203,22 @@ func TestJobDiff(t *testing.T) { func TestTaskGroupDiff(t *testing.T) { cases := []struct { + TestCase string Old, New *TaskGroup Expected *TaskGroupDiff - Error bool + ExpErr bool Contextual bool }{ { - Old: nil, - New: nil, + TestCase: "Empty", + Old: nil, + New: nil, Expected: &TaskGroupDiff{ Type: DiffTypeNone, }, }, { - // Primitive only that has different names + TestCase: "Primitive only that has different names", Old: &TaskGroup{ Name: "foo", Count: 10, @@ -1229,10 +1233,10 @@ func TestTaskGroupDiff(t *testing.T) { "foo": "bar", }, }, - Error: true, + ExpErr: true, }, { - // Primitive only that is the same + TestCase: "Primitive only that is the same", Old: &TaskGroup{ Name: "foo", Count: 10, @@ -1253,7 +1257,7 @@ func TestTaskGroupDiff(t *testing.T) { }, }, { - // Primitive only that has diffs + TestCase: "Primitive only that has diffs", Old: &TaskGroup{ Name: "foo", Count: 10, @@ -1288,7 +1292,7 @@ func TestTaskGroupDiff(t *testing.T) { }, }, { - // Map diff + TestCase: "Map diff", Old: &TaskGroup{ Meta: map[string]string{ "foo": "foo", @@ -1320,7 +1324,7 @@ func TestTaskGroupDiff(t *testing.T) { }, }, { - // Constraints edited + TestCase: "Constraints edited", Old: &TaskGroup{ Constraints: []*Constraint{ { @@ -1408,7 +1412,7 @@ func TestTaskGroupDiff(t *testing.T) { }, }, { - // Affinities edited + TestCase: "Affinities edited", Old: &TaskGroup{ Affinities: []*Affinity{ { @@ -1512,8 +1516,8 @@ func TestTaskGroupDiff(t *testing.T) { }, }, { - // RestartPolicy added - Old: &TaskGroup{}, + TestCase: "RestartPolicy added", + Old: &TaskGroup{}, New: &TaskGroup{ RestartPolicy: &RestartPolicy{ Attempts: 1, @@ -1559,7 +1563,7 @@ func TestTaskGroupDiff(t *testing.T) { }, }, { - // RestartPolicy deleted + TestCase: "RestartPolicy deleted", Old: &TaskGroup{ RestartPolicy: &RestartPolicy{ Attempts: 1, @@ -1606,7 +1610,7 @@ func TestTaskGroupDiff(t *testing.T) { }, }, { - // RestartPolicy edited + TestCase: "RestartPolicy edited", Old: &TaskGroup{ RestartPolicy: &RestartPolicy{ Attempts: 1, @@ -1660,7 +1664,7 @@ func TestTaskGroupDiff(t *testing.T) { }, }, { - // RestartPolicy edited with context + TestCase: "RestartPolicy edited with context", Contextual: true, Old: &TaskGroup{ RestartPolicy: &RestartPolicy{ @@ -1715,8 +1719,8 @@ func TestTaskGroupDiff(t *testing.T) { }, }, { - // ReschedulePolicy added - Old: &TaskGroup{}, + TestCase: "ReschedulePolicy added", + Old: &TaskGroup{}, New: &TaskGroup{ ReschedulePolicy: &ReschedulePolicy{ Attempts: 1, @@ -1776,7 +1780,7 @@ func TestTaskGroupDiff(t *testing.T) { }, }, { - // ReschedulePolicy deleted + TestCase: "ReschedulePolicy deleted", Old: &TaskGroup{ ReschedulePolicy: &ReschedulePolicy{ Attempts: 1, @@ -1837,7 +1841,7 @@ func TestTaskGroupDiff(t *testing.T) { }, }, { - // ReschedulePolicy edited + TestCase: "ReschedulePolicy edited", Old: &TaskGroup{ ReschedulePolicy: &ReschedulePolicy{ Attempts: 1, @@ -1899,8 +1903,9 @@ func TestTaskGroupDiff(t *testing.T) { }, }, }, - }, { - // ReschedulePolicy edited with context + }, + { + TestCase: "ReschedulePolicy edited with context", Contextual: true, Old: &TaskGroup{ ReschedulePolicy: &ReschedulePolicy{ @@ -1963,7 +1968,7 @@ func TestTaskGroupDiff(t *testing.T) { }, }, { - // Update strategy deleted + TestCase: "Update strategy deleted", Old: &TaskGroup{ Update: &UpdateStrategy{ AutoRevert: true, @@ -2025,8 +2030,8 @@ func TestTaskGroupDiff(t *testing.T) { }, }, { - // Update strategy added - Old: &TaskGroup{}, + TestCase: "Update strategy added", + Old: &TaskGroup{}, New: &TaskGroup{ Update: &UpdateStrategy{ AutoRevert: true, @@ -2087,7 +2092,7 @@ func TestTaskGroupDiff(t *testing.T) { }, }, { - // Update strategy edited + TestCase: "Update strategy edited", Old: &TaskGroup{ Update: &UpdateStrategy{ MaxParallel: 5, @@ -2173,7 +2178,7 @@ func TestTaskGroupDiff(t *testing.T) { }, }, { - // Update strategy edited with context + TestCase: "Update strategy edited with context", Contextual: true, Old: &TaskGroup{ Update: &UpdateStrategy{ @@ -2260,8 +2265,8 @@ func TestTaskGroupDiff(t *testing.T) { }, }, { - // EphemeralDisk added - Old: &TaskGroup{}, + TestCase: "EphemeralDisk added", + Old: &TaskGroup{}, New: &TaskGroup{ EphemeralDisk: &EphemeralDisk{ Migrate: true, @@ -2300,7 +2305,7 @@ func TestTaskGroupDiff(t *testing.T) { }, }, { - // EphemeralDisk deleted + TestCase: "EphemeralDisk deleted", Old: &TaskGroup{ EphemeralDisk: &EphemeralDisk{ Migrate: true, @@ -2340,7 +2345,7 @@ func TestTaskGroupDiff(t *testing.T) { }, }, { - // EphemeralDisk edited + TestCase: "EphemeralDisk edited", Old: &TaskGroup{ EphemeralDisk: &EphemeralDisk{ Migrate: true, @@ -2387,7 +2392,7 @@ func TestTaskGroupDiff(t *testing.T) { }, }, { - // EphemeralDisk edited with context + TestCase: "EphemeralDisk edited with context", Contextual: true, Old: &TaskGroup{ EphemeralDisk: &EphemeralDisk{ @@ -2433,14 +2438,14 @@ func TestTaskGroupDiff(t *testing.T) { }, }, }, - { - // TaskGroup Services edited + TestCase: "TaskGroup Services edited", Contextual: true, Old: &TaskGroup{ Services: []*Service{ { - Name: "foo", + Name: "foo", + EnableTagOverride: false, Checks: []*ServiceCheck{ { Name: "foo", @@ -2472,7 +2477,8 @@ func TestTaskGroupDiff(t *testing.T) { New: &TaskGroup{ Services: []*Service{ { - Name: "foo", + Name: "foo", + EnableTagOverride: true, Checks: []*ServiceCheck{ { Name: "foo", @@ -2522,6 +2528,12 @@ func TestTaskGroupDiff(t *testing.T) { Old: "", New: "", }, + { + Type: DiffTypeEdited, + Name: "EnableTagOverride", + Old: "false", + New: "true", + }, { Type: DiffTypeNone, Name: "Name", @@ -2771,7 +2783,7 @@ func TestTaskGroupDiff(t *testing.T) { }, }, { - // TaskGroup Networks edited + TestCase: "TaskGroup Networks edited", Contextual: true, Old: &TaskGroup{ Networks: Networks{ @@ -2894,7 +2906,7 @@ func TestTaskGroupDiff(t *testing.T) { }, }, { - // Tasks edited + TestCase: "Tasks edited", Old: &TaskGroup{ Tasks: []*Task{ { @@ -3012,21 +3024,19 @@ func TestTaskGroupDiff(t *testing.T) { } for i, c := range cases { - actual, err := c.Old.Diff(c.New, c.Contextual) - if c.Error && err == nil { - t.Fatalf("case %d: expected errored", i+1) - } else if err != nil { - if !c.Error { - t.Fatalf("case %d: errored %#v", i+1, err) - } else { - continue - } - } + require.NotEmpty(t, c.TestCase, "case #%d needs a name", i+1) - if !reflect.DeepEqual(actual, c.Expected) { - t.Fatalf("case %d: got:\n%#v\n want:\n%#v\n", - i+1, actual, c.Expected) - } + t.Run(c.TestCase, func(t *testing.T) { + result, err := c.Old.Diff(c.New, c.Contextual) + switch c.ExpErr { + case true: + require.Error(t, err, "case %q expected error", c.TestCase) + case false: + require.NoError(t, err, "case %q expected no error", c.TestCase) + require.True(t, reflect.DeepEqual(result, c.Expected), + "case %q got\n%#v\nwant:\n%#v\n", c.TestCase, result, c.Expected) + } + }) } } @@ -4441,6 +4451,12 @@ func TestTaskDiff(t *testing.T) { Type: DiffTypeAdded, Name: "Service", Fields: []*FieldDiff{ + { + Type: DiffTypeAdded, + Name: "EnableTagOverride", + Old: "", + New: "false", + }, { Type: DiffTypeAdded, Name: "Name", @@ -4459,6 +4475,12 @@ func TestTaskDiff(t *testing.T) { Type: DiffTypeDeleted, Name: "Service", Fields: []*FieldDiff{ + { + Type: DiffTypeDeleted, + Name: "EnableTagOverride", + Old: "false", + New: "", + }, { Type: DiffTypeDeleted, Name: "Name", @@ -4506,8 +4528,15 @@ func TestTaskDiff(t *testing.T) { { Type: DiffTypeAdded, Name: "AddressMode", + Old: "", New: "driver", }, + { + Type: DiffTypeNone, + Name: "EnableTagOverride", + Old: "false", + New: "false", + }, { Type: DiffTypeNone, Name: "Name", @@ -4525,6 +4554,37 @@ func TestTaskDiff(t *testing.T) { }, }, }, + { + Name: "Service EnableTagOverride edited no context", + Contextual: false, + Old: &Task{ + Services: []*Service{{ + EnableTagOverride: false, + }}, + }, + New: &Task{ + Services: []*Service{{ + EnableTagOverride: true, + }}, + }, + Expected: &TaskDiff{ + Type: DiffTypeEdited, + Objects: []*ObjectDiff{ + { + Type: DiffTypeEdited, + Name: "Service", + Fields: []*FieldDiff{ + { + Type: DiffTypeEdited, + Name: "EnableTagOverride", + Old: "false", + New: "true", + }, + }, + }, + }, + }, + }, { Name: "Services tags edited (no checks) with context", Contextual: true, @@ -4605,6 +4665,12 @@ func TestTaskDiff(t *testing.T) { Type: DiffTypeNone, Name: "AddressMode", }, + { + Type: DiffTypeNone, + Name: "EnableTagOverride", + Old: "false", + New: "false", + }, { Type: DiffTypeNone, Name: "Name", @@ -4990,6 +5056,12 @@ func TestTaskDiff(t *testing.T) { Old: "", New: "", }, + { + Type: DiffTypeNone, + Name: "EnableTagOverride", + Old: "false", + New: "false", + }, { Type: DiffTypeNone, Name: "Name", diff --git a/nomad/structs/services.go b/nomad/structs/services.go index e3b40c27933..becb143c47b 100644 --- a/nomad/structs/services.go +++ b/nomad/structs/services.go @@ -326,6 +326,14 @@ type Service struct { // this service. AddressMode string + // EnableTagOverride will disable Consul's anti-entropy mechanism for the + // tags of this service. External updates to the service definition via + // Consul will not be corrected to match the service definition set in the + // Nomad job specification. + // + // https://www.consul.io/docs/agent/services.html#service-definition + EnableTagOverride bool + Tags []string // List of tags for the service CanaryTags []string // List of tags for the service when it is a canary Checks []*ServiceCheck // List of checks associated with the service @@ -388,7 +396,7 @@ func (s *Service) Canonicalize(job string, taskGroup string, task string) { } } -// Validate checks if the Check definition is valid +// Validate checks if the Service definition is valid func (s *Service) Validate() error { var mErr multierror.Error @@ -436,7 +444,7 @@ func (s *Service) Validate() error { return mErr.ErrorOrNil() } -// ValidateName checks if the services Name is valid and should be called after +// ValidateName checks if the service Name is valid and should be called after // the name has been interpolated func (s *Service) ValidateName(name string) error { // Ensure the service name is valid per RFC-952 ยง1 @@ -471,6 +479,7 @@ func (s *Service) Hash(allocID, taskName string, canary bool) string { if len(s.CanaryMeta) > 0 { fmt.Fprintf(h, "%v", s.CanaryMeta) } + fmt.Fprintf(h, "%t", s.EnableTagOverride) // Vary ID on whether or not CanaryTags will be used if canary { @@ -539,6 +548,10 @@ OUTER: return false } + if s.EnableTagOverride != o.EnableTagOverride { + return false + } + return true } diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index f549adeadf9..ed010f3a753 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -2621,6 +2621,9 @@ func TestService_Equals(t *testing.T) { o.Connect = &ConsulConnect{Native: true} assertDiff() + + o.EnableTagOverride = true + assertDiff() } func TestJob_ExpandServiceNames(t *testing.T) {