From ffd7b289bddff9fe2b0282941736c927a7d03f4c Mon Sep 17 00:00:00 2001 From: Danielle Lancashire Date: Wed, 10 Apr 2019 10:39:24 +0200 Subject: [PATCH] consul: Use a stable identifier for services The current implementation of Service Registration uses a hash of the nomad-internal state of a service to register it with Consul, this means that any update to the service invalidates this name and we then deregister, and recreate the service in Consul. While this behaviour slightly simplifies reasoning about service registration, this becomes problematic when we add consul health checks to a service. When the service is re-registered, so are the checks, which default to failing for at least one check period. This commit migrates us to using a stable identifier based on the allocation, task, and service identifiers, and uses the difference between the remote and local state to decide when to push updates. It uses the existing hashing mechanic to decide when UpdateTask should regenerate service registrations for providing to Sync, but this should be removable as part of a future refactor. It additionally introduces the _nomad-check- prefix for check definitions, to allow for future allowing of consul features like maintenance mode. --- command/agent/consul/client.go | 60 +++++--- command/agent/consul/unit_test.go | 221 ++++++++---------------------- 2 files changed, 100 insertions(+), 181 deletions(-) diff --git a/command/agent/consul/client.go b/command/agent/consul/client.go index 141454d1fd8..ce882ab6cab 100644 --- a/command/agent/consul/client.go +++ b/command/agent/consul/client.go @@ -5,6 +5,7 @@ import ( "fmt" "net" "net/url" + "reflect" "strconv" "strings" "sync" @@ -29,6 +30,10 @@ const ( // for tasks. nomadTaskPrefix = nomadServicePrefix + "-task-" + // nomadCheckPrefix is the prefix that scopes Nomad registered checks for + // services. + nomadCheckPrefix = nomadServicePrefix + "-check-" + // defaultRetryInterval is how quickly to retry syncing services and // checks to Consul when an error occurs. Will backoff up to a max. defaultRetryInterval = time.Second @@ -83,6 +88,15 @@ type AgentAPI interface { UpdateTTL(id, output, status string) error } +func agentServiceUpdateRequired(reg *api.AgentServiceRegistration, svc *api.AgentService) bool { + return !(reg.Kind == svc.Kind && + reg.ID == svc.ID && + reg.Port == svc.Port && + reg.Address == svc.Address && + reg.Name == svc.Service && + reflect.DeepEqual(reg.Tags, svc.Tags)) +} + // operations are submitted to the main loop via commit() for synchronizing // with Consul. type operations struct { @@ -466,16 +480,26 @@ func (c *ServiceClient) sync() error { metrics.IncrCounter([]string{"client", "consul", "service_deregistrations"}, 1) } - // Add Nomad services missing from Consul + // Add Nomad services missing from Consul, or where the service has been updated. for id, locals := range c.services { - if _, ok := consulServices[id]; !ok { - if err = c.client.ServiceRegister(locals); err != nil { - metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1) - return err + existingSvc, ok := consulServices[id] + + if ok { + // There is an existing registration of this service in Consul, so here + // we validate to see if the service has been invalidated to see if it + // should be updated. + if !agentServiceUpdateRequired(locals, existingSvc) { + // No Need to update services that have not changed + continue } - sreg++ - metrics.IncrCounter([]string{"client", "consul", "service_registrations"}, 1) } + + if err = c.client.ServiceRegister(locals); err != nil { + metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1) + return err + } + sreg++ + metrics.IncrCounter([]string{"client", "consul", "service_registrations"}, 1) } // Remove Nomad checks in Consul but unknown locally @@ -809,10 +833,10 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error { newIDs[makeTaskServiceID(newTask.AllocID, newTask.Name, s, newTask.Canary)] = s } - // Loop over existing Service IDs to see if they have been removed or - // updated. + // Loop over existing Service IDs to see if they have been removed for existingID, existingSvc := range existingIDs { newSvc, ok := newIDs[existingID] + if !ok { // Existing service entry removed ops.deregServices = append(ops.deregServices, existingID) @@ -828,8 +852,12 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error { continue } - // Service exists and hasn't changed, don't re-add it later - delete(newIDs, existingID) + oldHash := existingSvc.Hash(old.AllocID, old.Name, old.Canary) + newHash := newSvc.Hash(newTask.AllocID, newTask.Name, newTask.Canary) + if oldHash == newHash { + // Service exists and hasn't changed, don't re-add it later + delete(newIDs, existingID) + } // Service still exists so add it to the task's registration sreg := &ServiceRegistration{ @@ -848,7 +876,8 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error { for _, check := range newSvc.Checks { checkID := makeCheckID(existingID, check) if _, exists := existingChecks[checkID]; exists { - // Check exists, so don't remove it + // Check is still required. Remove it from the map so it doesn't get + // deleted later. delete(existingChecks, checkID) sreg.checkIDs[checkID] = struct{}{} } @@ -861,7 +890,6 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error { for _, checkID := range newCheckIDs { sreg.checkIDs[checkID] = struct{}{} - } // Update all watched checks as CheckRestart fields aren't part of ID @@ -1082,14 +1110,14 @@ func makeAgentServiceID(role string, service *structs.Service) string { // Consul. All structs.Service fields are included in the ID's hash except // Checks. This allows updates to merely compare IDs. // -// Example Service ID: _nomad-task-TNM333JKJPM5AK4FAS3VXQLXFDWOF4VH +// Example Service ID: _nomad-task-b4e61df9-b095-d64e-f241-23860da1375f-redis-http func makeTaskServiceID(allocID, taskName string, service *structs.Service, canary bool) string { - return nomadTaskPrefix + service.Hash(allocID, taskName, canary) + return fmt.Sprintf("%s%s-%s-%s", nomadTaskPrefix, allocID, taskName, service.Name) } // makeCheckID creates a unique ID for a check. func makeCheckID(serviceID string, check *structs.ServiceCheck) string { - return check.Hash(serviceID) + return fmt.Sprintf("%s%s", nomadCheckPrefix, check.Hash(serviceID)) } // createCheckReg creates a Check that can be registered with Consul. diff --git a/command/agent/consul/unit_test.go b/command/agent/consul/unit_test.go index 4d2009e5aa8..e555a59eb7e 100644 --- a/command/agent/consul/unit_test.go +++ b/command/agent/consul/unit_test.go @@ -128,97 +128,38 @@ func setupFake(t *testing.T) *testFakeCtx { func TestConsul_ChangeTags(t *testing.T) { ctx := setupFake(t) + require := require.New(t) - if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil { - t.Fatalf("unexpected error registering task: %v", err) - } - - if err := ctx.syncOnce(); err != nil { - t.Fatalf("unexpected error syncing task: %v", err) - } - - if n := len(ctx.FakeConsul.services); n != 1 { - t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services) - } + require.NoError(ctx.ServiceClient.RegisterTask(ctx.Task)) + require.NoError(ctx.syncOnce()) + require.Equal(1, len(ctx.FakeConsul.services), "Expected 1 service to be registered with Consul") - // Query the allocs registrations and then again when we update. The IDs - // should change + // Validate the alloc registration reg1, err := ctx.ServiceClient.AllocRegistrations(ctx.Task.AllocID) - if err != nil { - t.Fatalf("Looking up alloc registration failed: %v", err) - } - if reg1 == nil { - t.Fatalf("Nil alloc registrations: %v", err) - } - if num := reg1.NumServices(); num != 1 { - t.Fatalf("Wrong number of services: got %d; want 1", num) - } - if num := reg1.NumChecks(); num != 0 { - t.Fatalf("Wrong number of checks: got %d; want 0", num) - } + require.NoError(err) + require.NotNil(reg1, "Unexpected nil alloc registration") + require.Equal(1, reg1.NumServices()) + require.Equal(0, reg1.NumChecks()) - origKey := "" - for k, v := range ctx.FakeConsul.services { - origKey = k - if v.Name != ctx.Task.Services[0].Name { - t.Errorf("expected Name=%q != %q", ctx.Task.Services[0].Name, v.Name) - } - if !reflect.DeepEqual(v.Tags, ctx.Task.Services[0].Tags) { - t.Errorf("expected Tags=%v != %v", ctx.Task.Services[0].Tags, v.Tags) - } + for _, v := range ctx.FakeConsul.services { + require.Equal(v.Name, ctx.Task.Services[0].Name) + require.Equal(v.Tags, ctx.Task.Services[0].Tags) } + // Update the task definition origTask := ctx.Task.Copy() ctx.Task.Services[0].Tags[0] = "newtag" - if err := ctx.ServiceClient.UpdateTask(origTask, ctx.Task); err != nil { - t.Fatalf("unexpected error registering task: %v", err) - } - if err := ctx.syncOnce(); err != nil { - t.Fatalf("unexpected error syncing task: %v", err) - } - - if n := len(ctx.FakeConsul.services); n != 1 { - t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services) - } - - for k, v := range ctx.FakeConsul.services { - if k == origKey { - t.Errorf("expected key to change but found %q", k) - } - if v.Name != ctx.Task.Services[0].Name { - t.Errorf("expected Name=%q != %q", ctx.Task.Services[0].Name, v.Name) - } - if !reflect.DeepEqual(v.Tags, ctx.Task.Services[0].Tags) { - t.Errorf("expected Tags=%v != %v", ctx.Task.Services[0].Tags, v.Tags) - } - } - - // Check again and ensure the IDs changed - reg2, err := ctx.ServiceClient.AllocRegistrations(ctx.Task.AllocID) - if err != nil { - t.Fatalf("Looking up alloc registration failed: %v", err) - } - if reg2 == nil { - t.Fatalf("Nil alloc registrations: %v", err) - } - if num := reg2.NumServices(); num != 1 { - t.Fatalf("Wrong number of services: got %d; want 1", num) - } - if num := reg2.NumChecks(); num != 0 { - t.Fatalf("Wrong number of checks: got %d; want 0", num) - } - for task, treg := range reg1.Tasks { - otherTaskReg, ok := reg2.Tasks[task] - if !ok { - t.Fatalf("Task %q not in second reg", task) - } + // Register and sync the update + require.NoError(ctx.ServiceClient.UpdateTask(origTask, ctx.Task)) + require.NoError(ctx.syncOnce()) + require.Equal(1, len(ctx.FakeConsul.services), "Expected 1 service to be registered with Consul") - for sID := range treg.Services { - if _, ok := otherTaskReg.Services[sID]; ok { - t.Fatalf("service ID didn't change") - } - } + // Validate the metadata changed + for _, v := range ctx.FakeConsul.services { + require.Equal(v.Name, ctx.Task.Services[0].Name) + require.Equal(v.Tags, ctx.Task.Services[0].Tags) + require.Equal("newtag", v.Tags[0]) } } @@ -227,6 +168,8 @@ func TestConsul_ChangeTags(t *testing.T) { // slightly different code path than changing tags. func TestConsul_ChangePorts(t *testing.T) { ctx := setupFake(t) + require := require.New(t) + ctx.Task.Services[0].Checks = []*structs.ServiceCheck{ { Name: "c1", @@ -252,35 +195,17 @@ func TestConsul_ChangePorts(t *testing.T) { }, } - if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil { - t.Fatalf("unexpected error registering task: %v", err) - } - - if err := ctx.syncOnce(); err != nil { - t.Fatalf("unexpected error syncing task: %v", err) - } - - if n := len(ctx.FakeConsul.services); n != 1 { - t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services) - } + require.NoError(ctx.ServiceClient.RegisterTask(ctx.Task)) + require.NoError(ctx.syncOnce()) + require.Equal(1, len(ctx.FakeConsul.services), "Expected 1 service to be registered with Consul") - origServiceKey := "" - for k, v := range ctx.FakeConsul.services { - origServiceKey = k - if v.Name != ctx.Task.Services[0].Name { - t.Errorf("expected Name=%q != %q", ctx.Task.Services[0].Name, v.Name) - } - if !reflect.DeepEqual(v.Tags, ctx.Task.Services[0].Tags) { - t.Errorf("expected Tags=%v != %v", ctx.Task.Services[0].Tags, v.Tags) - } - if v.Port != xPort { - t.Errorf("expected Port x=%v but found: %v", xPort, v.Port) - } + for _, v := range ctx.FakeConsul.services { + require.Equal(ctx.Task.Services[0].Name, v.Name) + require.Equal(ctx.Task.Services[0].Tags, v.Tags) + require.Equal(xPort, v.Port) } - if n := len(ctx.FakeConsul.checks); n != 3 { - t.Fatalf("expected 3 checks but found %d:\n%#v", n, ctx.FakeConsul.checks) - } + require.Equal(3, len(ctx.FakeConsul.checks)) origTCPKey := "" origScriptKey := "" @@ -289,29 +214,28 @@ func TestConsul_ChangePorts(t *testing.T) { switch v.Name { case "c1": origTCPKey = k - if expected := fmt.Sprintf(":%d", xPort); v.TCP != expected { - t.Errorf("expected Port x=%v but found: %v", expected, v.TCP) - } + require.Equal(fmt.Sprintf(":%d", xPort), v.TCP) case "c2": origScriptKey = k select { case <-ctx.MockExec.execs: - if n := len(ctx.MockExec.execs); n > 0 { - t.Errorf("expected 1 exec but found: %d", n+1) - } + // Here we validate there is nothing left on the channel + require.Equal(0, len(ctx.MockExec.execs)) case <-time.After(3 * time.Second): - t.Errorf("script not called in time") + t.Fatalf("script not called in time") } case "c3": origHTTPKey = k - if expected := fmt.Sprintf("http://:%d/", yPort); v.HTTP != expected { - t.Errorf("expected Port y=%v but found: %v", expected, v.HTTP) - } + require.Equal(fmt.Sprintf("http://:%d/", yPort), v.HTTP) default: t.Fatalf("unexpected check: %q", v.Name) } } + require.NotEmpty(origTCPKey) + require.NotEmpty(origScriptKey) + require.NotEmpty(origHTTPKey) + // Now update the PortLabel on the Service and Check c3 origTask := ctx.Task.Copy() ctx.Task.Services[0].PortLabel = "y" @@ -339,64 +263,31 @@ func TestConsul_ChangePorts(t *testing.T) { // Removed PortLabel; should default to service's (y) }, } - if err := ctx.ServiceClient.UpdateTask(origTask, ctx.Task); err != nil { - t.Fatalf("unexpected error registering task: %v", err) - } - if err := ctx.syncOnce(); err != nil { - t.Fatalf("unexpected error syncing task: %v", err) - } - if n := len(ctx.FakeConsul.services); n != 1 { - t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services) - } + require.NoError(ctx.ServiceClient.UpdateTask(origTask, ctx.Task)) + require.NoError(ctx.syncOnce()) + require.Equal(1, len(ctx.FakeConsul.services), "Expected 1 service to be registered with Consul") - for k, v := range ctx.FakeConsul.services { - if k == origServiceKey { - t.Errorf("expected key change; still: %q", k) - } - if v.Name != ctx.Task.Services[0].Name { - t.Errorf("expected Name=%q != %q", ctx.Task.Services[0].Name, v.Name) - } - if !reflect.DeepEqual(v.Tags, ctx.Task.Services[0].Tags) { - t.Errorf("expected Tags=%v != %v", ctx.Task.Services[0].Tags, v.Tags) - } - if v.Port != yPort { - t.Errorf("expected Port y=%v but found: %v", yPort, v.Port) - } + for _, v := range ctx.FakeConsul.services { + require.Equal(ctx.Task.Services[0].Name, v.Name) + require.Equal(ctx.Task.Services[0].Tags, v.Tags) + require.Equal(yPort, v.Port) } - if n := len(ctx.FakeConsul.checks); n != 3 { - t.Fatalf("expected 3 check but found %d:\n%#v", n, ctx.FakeConsul.checks) - } + require.Equal(3, len(ctx.FakeConsul.checks)) for k, v := range ctx.FakeConsul.checks { switch v.Name { case "c1": - if k == origTCPKey { - t.Errorf("expected key change for %s from %q", v.Name, origTCPKey) - } - if expected := fmt.Sprintf(":%d", xPort); v.TCP != expected { - t.Errorf("expected Port x=%v but found: %v", expected, v.TCP) - } + // C1 is not changed + require.Equal(origTCPKey, k) + require.Equal(fmt.Sprintf(":%d", xPort), v.TCP) case "c2": - if k == origScriptKey { - t.Errorf("expected key change for %s from %q", v.Name, origScriptKey) - } - select { - case <-ctx.MockExec.execs: - if n := len(ctx.MockExec.execs); n > 0 { - t.Errorf("expected 1 exec but found: %d", n+1) - } - case <-time.After(3 * time.Second): - t.Errorf("script not called in time") - } + // C2 is not changed and should not have been re-registered + require.Equal(origScriptKey, k) case "c3": - if k == origHTTPKey { - t.Errorf("expected %s key to change from %q", v.Name, k) - } - if expected := fmt.Sprintf("http://:%d/", yPort); v.HTTP != expected { - t.Errorf("expected Port y=%v but found: %v", expected, v.HTTP) - } + require.NotEqual(origHTTPKey, k) + require.Equal(fmt.Sprintf("http://:%d/", yPort), v.HTTP) default: t.Errorf("Unknown check: %q", k) }