From a6e6f0a3671b36cdb5661cb72032d0dbe003a783 Mon Sep 17 00:00:00 2001 From: Mike Morris Date: Thu, 18 May 2023 15:05:02 -0400 Subject: [PATCH] wip: lifecycle method cleanup, rename httpGetter to httpClient, add Post method, start graceful shutdown impl --- pkg/consuldp/consul_dataplane.go | 4 ++- pkg/consuldp/lifecycle.go | 62 ++++++++++++++++++++++++++------ pkg/consuldp/lifecycle_test.go | 7 ++-- pkg/consuldp/metrics.go | 2 +- pkg/consuldp/metrics_test.go | 6 ++++ 5 files changed, 65 insertions(+), 16 deletions(-) diff --git a/pkg/consuldp/consul_dataplane.go b/pkg/consuldp/consul_dataplane.go index d1e88516..596e9309 100644 --- a/pkg/consuldp/consul_dataplane.go +++ b/pkg/consuldp/consul_dataplane.go @@ -7,6 +7,7 @@ import ( "context" "errors" "fmt" + "io" "net" "net/http" "strings" @@ -31,8 +32,9 @@ type xdsServer struct { exitedCh chan struct{} } -type httpGetter interface { +type httpClient interface { Get(string) (*http.Response, error) + Post(string, string, io.Reader) (*http.Response, error) } // ConsulDataplane represents the consul-dataplane process diff --git a/pkg/consuldp/lifecycle.go b/pkg/consuldp/lifecycle.go index bba7a9ab..303c3fbe 100644 --- a/pkg/consuldp/lifecycle.go +++ b/pkg/consuldp/lifecycle.go @@ -6,12 +6,11 @@ package consuldp import ( "context" "fmt" - "io" "net/http" // "net/url" "strconv" "sync" - // "time" + "time" // "github.com/hashicorp/consul-server-connection-manager/discovery" "github.com/hashicorp/go-hclog" @@ -39,7 +38,7 @@ type lifecycleConfig struct { // consuldp proxy lifecycle management config gracefulPort int gracefulShutdownPath string - client httpGetter // client that will dial the managed Envoy proxy + client httpClient // client that will dial the managed Envoy proxy // consuldp proxy lifecycle management server lifecycleServer *http.Server @@ -103,8 +102,7 @@ func (m *lifecycleConfig) startLifecycleServer() { } } -// stopLifecycleServer stops the main merged metrics server and the consul -// dataplane metrics 
server +// stopLifecycleServer stops the consul dataplane proxy lifecycle server func (m *lifecycleConfig) stopLifecycleServer() { m.mu.Lock() defer m.mu.Unlock() @@ -115,7 +113,7 @@ func (m *lifecycleConfig) stopLifecycleServer() { m.logger.Info("stopping the merged server") err := m.lifecycleServer.Close() if err != nil { - m.logger.Warn("error while closing metrics server", "error", err) + m.logger.Warn("error while closing lifecycle server", "error", err) errs = multierror.Append(err, errs) } } @@ -123,7 +121,7 @@ func (m *lifecycleConfig) stopLifecycleServer() { m.logger.Info("stopping consul dp promtheus server") err := m.lifecycleServer.Close() if err != nil { - m.logger.Warn("error while closing metrics server", "error", err) + m.logger.Warn("error while closing lifecycle server", "error", err) errs = multierror.Append(err, errs) } } @@ -143,9 +141,53 @@ func (m *lifecycleConfig) stopLifecycleServer() { // or, if configured, until all open connections to Envoy listeners have been // drained. func (m *lifecycleConfig) gracefulShutdown(rw http.ResponseWriter, _ *http.Request) { - // envoyShutdownUrl := fmt.Sprintf("http://%s:%v/quitquitquit", m.envoyAdminAddr, m.envoyAdminBindPort) - m.logger.Debug("initiating graceful shutdown") - // TODO: implement + // Create a context that is both manually cancellable and will signal + // a cancel at the specified duration. + // TODO: calculate timeout from m.shutdownGracePeriod + // TODO: should this use lifecycleManager ctx instead of context.Background? + timeout := 15 * time.Second + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + // Create a channel to receive a signal that work is done. + // TODO: should this be a buffered channel instead? + shutdownCh := make(chan int) + + // Ask the goroutine to do some work for us. 
+ // If shutdownDrainListeners is enabled, initiate graceful shutdown of Envoy + // and wait until all open connections have closed or shutdownGracePeriod + // seconds have elapsed. + go func() { + // envoyDrainListenersUrl := fmt.Sprintf("http://%s:%v/drain_listeners?inboundonly", m.envoyAdminAddr, m.envoyAdminBindPort) + // envoyShutdownUrl := fmt.Sprintf("http://%s:%v/quitquitquit", m.envoyAdminAddr, m.envoyAdminBindPort) + + // TODO: actually initiate Envoy shutdown and loop checking for open + // connections + // By default, the Envoy server will close listeners immediately on server + // shutdown. To drain listeners for some duration of time prior to server + // shutdown, use drain_listeners before shutting down the server. + // We want to start draining connections from inbound listeners if + // configured, but still allow outbound traffic until gracefulShutdownPeriod + // has elapsed to facilitate a graceful application shutdown. + // resp, err := m.client.Post(envoyDrainListenersUrl) + + time.Sleep(5 * time.Second) + + // Report the work is done. + // TODO: is there actually any point to sending this signal if we always just + // want to wait until the shutdownGracePeriod has elapsed? 
+ shutdownCh <- 0 + }() + + for { + select { + case _ = <-shutdownCh: + m.logger.Info("shutting down, all open Envoy connections have been drained") + case <-ctx.Done(): + m.logger.Info("shutdown grace period timeout reached") + // resp, err := m.client.Post(envoyShutdownUrl) + } + } } diff --git a/pkg/consuldp/lifecycle_test.go b/pkg/consuldp/lifecycle_test.go index 4b82a288..ad0ee3f9 100644 --- a/pkg/consuldp/lifecycle_test.go +++ b/pkg/consuldp/lifecycle_test.go @@ -7,7 +7,7 @@ import ( // "bytes" "context" // "errors" - "fmt" + // "fmt" // "io" "log" // "net" @@ -22,9 +22,8 @@ import ( ) var ( - envoyAdminPort = 19000 - envoyAdminAddr = "127.0.0.1" - envoyShutdownUrl = fmt.Sprintf("http://%s:%v/quitquitquit", envoyAdminAddr, envoyAdminPort) + envoyAdminPort = 19000 + envoyAdminAddr = "127.0.0.1" ) func TestLifecycleServerClosed(t *testing.T) { diff --git a/pkg/consuldp/metrics.go b/pkg/consuldp/metrics.go index d11568e6..313a182b 100644 --- a/pkg/consuldp/metrics.go +++ b/pkg/consuldp/metrics.go @@ -79,7 +79,7 @@ type metricsConfig struct { // merged metrics config promScrapeServer *http.Server // the server that will serve all the merged metrics - client httpGetter // the client that will scrape the urls + client httpClient // the client that will scrape the urls urls []string // the urls that will be scraped // consuldp metrics server diff --git a/pkg/consuldp/metrics_test.go b/pkg/consuldp/metrics_test.go index a2f31d37..3f443b69 100644 --- a/pkg/consuldp/metrics_test.go +++ b/pkg/consuldp/metrics_test.go @@ -223,6 +223,12 @@ func (c *mockClient) Get(url string) (*http.Response, error) { }, nil } +func (c *mockClient) Post(url string, contentType string, body io.Reader) (*http.Response, error) { + return &http.Response{ + StatusCode: http.StatusOK, + }, nil +} + func makeFakeMetric(url string) string { return fmt.Sprintf(`fake_metric{url="%s"} 1\n`, url) }