diff --git a/.changelog/20596.txt b/.changelog/20596.txt new file mode 100644 index 00000000000..e8386f46fbb --- /dev/null +++ b/.changelog/20596.txt @@ -0,0 +1,3 @@ +```release-note:bug +services: Added retry to Nomad service deregistration RPCs during alloc stop +``` diff --git a/client/serviceregistration/nsd/nsd.go b/client/serviceregistration/nsd/nsd.go index 1ec34902caa..45a5aaf14bc 100644 --- a/client/serviceregistration/nsd/nsd.go +++ b/client/serviceregistration/nsd/nsd.go @@ -9,11 +9,13 @@ import ( "fmt" "strings" "sync" + "time" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/client/serviceregistration" "github.com/hashicorp/nomad/nomad/structs" + "oss.indeed.com/go/libtime/decay" ) type ServiceRegistrationHandler struct { @@ -34,6 +36,9 @@ type ServiceRegistrationHandler struct { // shutDownCh coordinates shutting down the handler and any long-running // processes, such as the RPC retry. shutDownCh chan struct{} + + backoffMax time.Duration + backoffInitial time.Duration } // ServiceRegistrationHandlerCfg holds critical information used during the @@ -62,6 +67,15 @@ type ServiceRegistrationHandlerCfg struct { // CheckWatcher watches checks of services in the Nomad service provider, // and restarts associated tasks in accordance with their check_restart block. CheckWatcher serviceregistration.CheckWatcher + + // BackoffMax is the maximum amont of time failed RemoveWorkload RPCs will + // be retried, defaults to 1s + BackoffMax time.Duration + + // BackoffInitial is the initial gap before retrying failed RemoveWorkload + // RPCs, defaults to 100ms. This will double each attempt until BackoffMax + // is reached + BackoffInitial time.Duration } // NewServiceRegistrationHandler returns a ready to use @@ -69,13 +83,23 @@ type ServiceRegistrationHandlerCfg struct { // interface. func NewServiceRegistrationHandler(log hclog.Logger, cfg *ServiceRegistrationHandlerCfg) serviceregistration.Handler { go cfg.CheckWatcher.Run(context.TODO()) - return &ServiceRegistrationHandler{ + + s := &ServiceRegistrationHandler{ cfg: cfg, log: log.Named("service_registration.nomad"), registrationEnabled: cfg.Enabled, checkWatcher: cfg.CheckWatcher, shutDownCh: make(chan struct{}), + backoffMax: cfg.BackoffMax, + backoffInitial: cfg.BackoffInitial, } + if s.backoffInitial == 0 { + s.backoffInitial = 100 * time.Millisecond + } + if s.backoffMax == 0 { + s.backoffMax = time.Second + } + return s } func (s *ServiceRegistrationHandler) RegisterWorkload(workload *serviceregistration.WorkloadServices) error { @@ -183,26 +207,44 @@ func (s *ServiceRegistrationHandler) removeWorkload( var deleteResp structs.ServiceRegistrationDeleteByIDResponse - err := s.cfg.RPCFn(structs.ServiceRegistrationDeleteByIDRPCMethod, &deleteArgs, &deleteResp) - if err == nil { - return + backoffOpts := decay.BackoffOptions{ + MaxSleepTime: s.backoffMax, + InitialGapSize: s.backoffInitial, } + backoffErr := decay.Backoff(func() (bool, error) { - // The Nomad API exposes service registration deletion to handle - // orphaned service registrations. In the event a service is removed - // accidentally that is still running, we will hit this error when we - // eventually want to remove it. We therefore want to handle this, - // while ensuring the operator can see. - if strings.Contains(err.Error(), "service registration not found") { - s.log.Info("attempted to delete non-existent service registration", - "service_id", id, "namespace", workload.ProviderNamespace) - return - } + select { + case <-s.shutDownCh: + return true, nil + default: + } + + err := s.cfg.RPCFn(structs.ServiceRegistrationDeleteByIDRPCMethod, + &deleteArgs, &deleteResp) + if err == nil { + return false, nil + } - // Log the error as there is nothing left to do, so the operator can see it - // and identify any problems. - s.log.Error("failed to delete service registration", - "error", err, "service_id", id, "namespace", workload.ProviderNamespace) + // The Nomad API exposes service registration deletion to handle + // orphaned service registrations. In the event a service is removed + // accidentally that is still running, we will hit this error when we + // eventually want to remove it. We therefore want to handle this, + // while ensuring the operator can see. + if strings.Contains(err.Error(), "service registration not found") { + s.log.Info("attempted to delete non-existent service registration", + "service_id", id, "namespace", workload.ProviderNamespace) + return false, nil + } + + return true, err + }, backoffOpts) + + if backoffErr != nil { + // Log the error as there is nothing left to do, so the operator can see + // it and identify any problems. + s.log.Error("failed to delete service registration", + "error", backoffErr, "service_id", id, "namespace", workload.ProviderNamespace) + } } func (s *ServiceRegistrationHandler) UpdateWorkload(old, new *serviceregistration.WorkloadServices) error { diff --git a/client/serviceregistration/nsd/nsd_test.go b/client/serviceregistration/nsd/nsd_test.go index 7210eb913af..e76d11fb4b9 100644 --- a/client/serviceregistration/nsd/nsd_test.go +++ b/client/serviceregistration/nsd/nsd_test.go @@ -13,8 +13,10 @@ import ( "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/client/serviceregistration" + "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/nomad/structs" "github.com/shoenig/test" + "github.com/shoenig/test/must" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -110,6 +112,7 @@ func TestServiceRegistrationHandler_RemoveWorkload(t *testing.T) { name string inputCfg *ServiceRegistrationHandlerCfg inputWorkload *serviceregistration.WorkloadServices + returnedDeleteErr error expectedRPCs map[string]int expectedError error expWatch, expUnWatch int @@ -138,26 +141,39 @@ func TestServiceRegistrationHandler_RemoveWorkload(t *testing.T) { expWatch: 0, expUnWatch: 2, }, + { + name: "failed deregister", + inputCfg: &ServiceRegistrationHandlerCfg{ + Enabled: true, + CheckWatcher: new(mockCheckWatcher), + BackoffMax: 75 * time.Millisecond, + BackoffInitial: 50 * time.Millisecond, + }, + inputWorkload: mockWorkload(), + returnedDeleteErr: errors.New("unrecoverable error"), + expectedRPCs: map[string]int{structs.ServiceRegistrationDeleteByIDRPCMethod: 4}, + expectedError: nil, + expWatch: 0, + expUnWatch: 2, + }, } - // Create a logger we can use for all tests. - log := hclog.NewNullLogger() - for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { // Add the mock RPC functionality. - mockRPC := mockRPC{callCounts: map[string]int{}} + mockRPC := mockRPC{ + callCounts: map[string]int{}, + deleteResponseErr: tc.returnedDeleteErr, + } tc.inputCfg.RPCFn = mockRPC.RPC // Create the handler and run the tests. - h := NewServiceRegistrationHandler(log, tc.inputCfg) + h := NewServiceRegistrationHandler(testlog.HCLogger(t), tc.inputCfg) h.RemoveWorkload(tc.inputWorkload) - require.Eventually(t, func() bool { - return assert.Equal(t, tc.expectedRPCs, mockRPC.calls()) - }, 100*time.Millisecond, 10*time.Millisecond) + must.Eq(t, tc.expectedRPCs, mockRPC.calls()) tc.inputCfg.CheckWatcher.(*mockCheckWatcher).assert(t, tc.expWatch, tc.expUnWatch) }) } @@ -647,6 +663,9 @@ type mockRPC struct { // lock should be used to access this. callCounts map[string]int l sync.RWMutex + + deleteResponseErr error + upsertResponseErr error } // calls returns the mapping counting the number of calls made to each RPC @@ -659,12 +678,17 @@ func (mr *mockRPC) calls() map[string]int { // RPC mocks the server RPCs, acting as though any request succeeds. func (mr *mockRPC) RPC(method string, _, _ interface{}) error { + mr.l.Lock() + defer mr.l.Unlock() + switch method { - case structs.ServiceRegistrationUpsertRPCMethod, structs.ServiceRegistrationDeleteByIDRPCMethod: - mr.l.Lock() + case structs.ServiceRegistrationUpsertRPCMethod: + mr.callCounts[method]++ + return mr.upsertResponseErr + + case structs.ServiceRegistrationDeleteByIDRPCMethod: mr.callCounts[method]++ - mr.l.Unlock() - return nil + return mr.deleteResponseErr default: return fmt.Errorf("unexpected RPC method: %v", method) }