diff --git a/nomad/structs/services.go b/nomad/structs/services.go index 504cc8dce4c..0558e52aa43 100644 --- a/nomad/structs/services.go +++ b/nomad/structs/services.go @@ -696,7 +696,7 @@ func (c *ConsulConnect) Copy() *ConsulConnect { } } -// Equals returns true if the structs are recursively equal. +// Equals returns true if the connect blocks are deeply equal. func (c *ConsulConnect) Equals(o *ConsulConnect) bool { if c == nil || o == nil { return c == o @@ -710,7 +710,9 @@ func (c *ConsulConnect) Equals(o *ConsulConnect) bool { return false } - // todo(shoenig) task has never been compared, should it be? + if !c.SidecarTask.Equals(o.SidecarTask) { + return false + } if !c.Gateway.Equals(o.Gateway) { return false @@ -864,6 +866,59 @@ type SidecarTask struct { KillSignal string } +func (t *SidecarTask) Equals(o *SidecarTask) bool { + if t == nil || o == nil { + return t == o + } + + if t.Name != o.Name { + return false + } + + if t.Driver != o.Driver { + return false + } + + if t.User != o.User { + return false + } + + // config compare + if !opaqueMapsEqual(t.Config, o.Config) { + return false + } + + if !helper.CompareMapStringString(t.Env, o.Env) { + return false + } + + if !t.Resources.Equals(o.Resources) { + return false + } + + if !helper.CompareMapStringString(t.Meta, o.Meta) { + return false + } + + if !helper.CompareTimePtrs(t.KillTimeout, o.KillTimeout) { + return false + } + + if !t.LogConfig.Equals(o.LogConfig) { + return false + } + + if !helper.CompareTimePtrs(t.ShutdownDelay, o.ShutdownDelay) { + return false + } + + if t.KillSignal != o.KillSignal { + return false + } + + return true +} + func (t *SidecarTask) Copy() *SidecarTask { if t == nil { return nil diff --git a/nomad/structs/services_test.go b/nomad/structs/services_test.go index 45b29b54fa9..12b3b5b55c4 100644 --- a/nomad/structs/services_test.go +++ b/nomad/structs/services_test.go @@ -349,6 +349,85 @@ func TestSidecarTask_MergeIntoTask(t *testing.T) { require.Exactly(t, expected, task) } +func TestSidecarTask_Equals(t *testing.T) { + t.Parallel() + + original := &SidecarTask{ + Name: "sidecar-task-1", + Driver: "docker", + User: "nobody", + Config: map[string]interface{}{"foo": 1}, + Env: map[string]string{"color": "blue"}, + Resources: &Resources{MemoryMB: 300}, + Meta: map[string]string{"index": "1"}, + KillTimeout: helper.TimeToPtr(2 * time.Second), + LogConfig: &LogConfig{ + MaxFiles: 2, + MaxFileSizeMB: 300, + }, + ShutdownDelay: helper.TimeToPtr(10 * time.Second), + KillSignal: "SIGTERM", + } + + t.Run("unmodified", func(t *testing.T) { + duplicate := original.Copy() + require.True(t, duplicate.Equals(original)) + }) + + type st = SidecarTask + type tweaker = func(task *st) + + try := func(t *testing.T, tweak tweaker) { + modified := original.Copy() + tweak(modified) + require.NotEqual(t, original, modified) + } + + t.Run("mod name", func(t *testing.T) { + try(t, func(s *st) { s.Name = "sidecar-task-2" }) + }) + + t.Run("mod driver", func(t *testing.T) { + try(t, func(s *st) { s.Driver = "exec" }) + }) + + t.Run("mod user", func(t *testing.T) { + try(t, func(s *st) { s.User = "root" }) + }) + + t.Run("mod config", func(t *testing.T) { + try(t, func(s *st) { s.Config = map[string]interface{}{"foo": 2} }) + }) + + t.Run("mod env", func(t *testing.T) { + try(t, func(s *st) { s.Env = map[string]string{"color": "red"} }) + }) + + t.Run("mod resources", func(t *testing.T) { + try(t, func(s *st) { s.Resources = &Resources{MemoryMB: 200} }) + }) + + t.Run("mod meta", func(t *testing.T) { + try(t, func(s *st) { s.Meta = map[string]string{"index": "2"} }) + }) + + t.Run("mod kill timeout", func(t *testing.T) { + try(t, func(s *st) { s.KillTimeout = helper.TimeToPtr(3 * time.Second) }) + }) + + t.Run("mod log config", func(t *testing.T) { + try(t, func(s *st) { s.LogConfig = &LogConfig{MaxFiles: 3} }) + }) + + t.Run("mod shutdown delay", func(t *testing.T) { + try(t, func(s *st) { s.ShutdownDelay = helper.TimeToPtr(20 * time.Second) }) + }) + + t.Run("mod kill signal", func(t *testing.T) { + try(t, func(s *st) { s.KillSignal = "SIGHUP" }) + }) +} + func TestConsulUpstream_upstreamEquals(t *testing.T) { t.Parallel() diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 7d23ab22257..1f6bb71d277 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -6169,6 +6169,22 @@ type LogConfig struct { Config map[string]interface{} } +func (l *LogConfig) Equals(o *LogConfig) bool { + if l == nil || o == nil { + return l == o + } + + if l.MaxFiles != o.MaxFiles { + return false + } + + if l.MaxFileSizeMB != o.MaxFileSizeMB { + return false + } + + return true +} + func (l *LogConfig) Copy() *LogConfig { if l == nil { return nil diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index 02ff3b9425f..a13b24de010 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -2039,6 +2039,38 @@ func TestTask_Validate_LogConfig(t *testing.T) { } } +func TestLogConfig_Equals(t *testing.T) { + t.Run("both nil", func(t *testing.T) { + a := (*LogConfig)(nil) + b := (*LogConfig)(nil) + require.True(t, a.Equals(b)) + }) + + t.Run("one nil", func(t *testing.T) { + a := new(LogConfig) + b := (*LogConfig)(nil) + require.False(t, a.Equals(b)) + }) + + t.Run("max files", func(t *testing.T) { + a := &LogConfig{MaxFiles: 1, MaxFileSizeMB: 200} + b := &LogConfig{MaxFiles: 2, MaxFileSizeMB: 200} + require.False(t, a.Equals(b)) + }) + + t.Run("max file size", func(t *testing.T) { + a := &LogConfig{MaxFiles: 1, MaxFileSizeMB: 100} + b := &LogConfig{MaxFiles: 1, MaxFileSizeMB: 200} + require.False(t, a.Equals(b)) + }) + + t.Run("same", func(t *testing.T) { + a := &LogConfig{MaxFiles: 1, MaxFileSizeMB: 200} + b := &LogConfig{MaxFiles: 1, MaxFileSizeMB: 200} + require.True(t, a.Equals(b)) + }) +} + func TestTask_Validate_CSIPluginConfig(t *testing.T) { table := []struct { name string diff --git a/scheduler/util.go b/scheduler/util.go index 1799ede53e3..7261f67deb8 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -377,6 +377,11 @@ func tasksUpdated(jobA, jobB *structs.Job, taskGroup string) bool { return true } + // Check connect service(s) updated + if connectServiceUpdated(a.Services, b.Services) { + return true + } + // Check each task for _, at := range a.Tasks { bt := b.LookupTask(at.Name) @@ -429,6 +434,78 @@ func tasksUpdated(jobA, jobB *structs.Job, taskGroup string) bool { return false } +// connectServiceUpdated returns true if any services with a connect stanza have +// been changed in such a way that requires a destructive update. +// +// Ordinary services can be updated in-place by updating the service definition +// in Consul. Connect service changes mostly require destroying the task. +func connectServiceUpdated(servicesA, servicesB []*structs.Service) bool { + for _, serviceA := range servicesA { + if serviceA.Connect != nil { + for _, serviceB := range servicesB { + if serviceA.Name == serviceB.Name { + if connectUpdated(serviceA.Connect, serviceB.Connect) { + return true + } + // Part of the Connect plumbing is derived from port label, + // if that changes we need to destroy the task. + if serviceA.PortLabel != serviceB.PortLabel { + return true + } + break + } + } + } + } + return false +} + +// connectUpdated returns true if the connect block has been updated in a manner +// that will require a destructive update. +// +// Fields that can be updated through consul-sync do not need a destructive +// update. +func connectUpdated(connectA, connectB *structs.ConsulConnect) bool { + if connectA == nil || connectB == nil { + return connectA == connectB + } + + if connectA.Native != connectB.Native { + return true + } + + if !connectA.Gateway.Equals(connectB.Gateway) { + return true + } + + if !connectA.SidecarTask.Equals(connectB.SidecarTask) { + return true + } + + // not everything in sidecar_service needs task destruction + if connectSidecarServiceUpdated(connectA.SidecarService, connectB.SidecarService) { + return true + } + + return false +} + +func connectSidecarServiceUpdated(ssA, ssB *structs.ConsulSidecarService) bool { + if ssA == nil || ssB == nil { + return ssA == ssB + } + + if ssA.Port != ssB.Port { + return true + } + + // sidecar_service.tags handled in-place (registration) + + // sidecar_service.proxy handled in-place (registration + xDS) + + return false +} + func networkUpdated(netA, netB []*structs.NetworkResource) bool { if len(netA) != len(netB) { return true diff --git a/scheduler/util_test.go b/scheduler/util_test.go index 56d2d2e6d4d..295587bcd49 100644 --- a/scheduler/util_test.go +++ b/scheduler/util_test.go @@ -691,6 +691,85 @@ func TestTasksUpdated(t *testing.T) { require.True(t, tasksUpdated(j19, j20, name)) } +func TestTasksUpdated_connectServiceUpdated(t *testing.T) { + servicesA := []*structs.Service{{ + Name: "service1", + PortLabel: "1111", + Connect: &structs.ConsulConnect{ + SidecarService: &structs.ConsulSidecarService{ + Tags: []string{"a"}, + }, + }, + }} + + t.Run("service not updated", func(t *testing.T) { + servicesB := []*structs.Service{{ + Name: "service0", + }, { + Name: "service1", + PortLabel: "1111", + Connect: &structs.ConsulConnect{ + SidecarService: &structs.ConsulSidecarService{ + Tags: []string{"a"}, + }, + }, + }, { + Name: "service2", + }} + updated := connectServiceUpdated(servicesA, servicesB) + require.False(t, updated) + }) + + t.Run("service connect tags updated", func(t *testing.T) { + servicesB := []*structs.Service{{ + Name: "service0", + }, { + Name: "service1", + PortLabel: "1111", + Connect: &structs.ConsulConnect{ + SidecarService: &structs.ConsulSidecarService{ + Tags: []string{"b"}, // in-place update + }, + }, + }} + updated := connectServiceUpdated(servicesA, servicesB) + require.False(t, updated) + }) + + t.Run("service connect port updated", func(t *testing.T) { + servicesB := []*structs.Service{{ + Name: "service0", + }, { + Name: "service1", + PortLabel: "1111", + Connect: &structs.ConsulConnect{ + SidecarService: &structs.ConsulSidecarService{ + Tags: []string{"a"}, + Port: "2222", // destructive update + }, + }, + }} + updated := connectServiceUpdated(servicesA, servicesB) + require.True(t, updated) + }) + + t.Run("service port label updated", func(t *testing.T) { + servicesB := []*structs.Service{{ + Name: "service0", + }, { + Name: "service1", + PortLabel: "1112", // destructive update + Connect: &structs.ConsulConnect{ + SidecarService: &structs.ConsulSidecarService{ + Tags: []string{"1"}, + }, + }, + }} + updated := connectServiceUpdated(servicesA, servicesB) + require.True(t, updated) + }) +} + func TestEvictAndPlace_LimitLessThanAllocs(t *testing.T) { _, ctx := testContext(t) allocs := []allocTuple{