-
Notifications
You must be signed in to change notification settings - Fork 10
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
proxy-lifecycle: add HTTP Server with endpoints for proxy lifecycle shutdown #115
Changes from all commits
937d893
722f263
2af6219
a1c21c9
68f206d
892392d
bb0f87a
cde897a
471a087
5b54f12
2852040
bbb3785
c7e8f86
ae041fc
52e5fd5
095aaf0
2b0f0ee
bf9acdb
9833553
f0dfd78
7f9b0f0
8c8141c
f98ce24
91a5b81
bfea751
aadfeed
496d196
4340c2f
52b4557
21595f0
b5e3aea
bf8f0c8
790881e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
```release-note:feature | ||
Add HTTP server with configurable port and endpoint path for initiating graceful shutdown. | ||
``` |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,6 +7,7 @@ import ( | |
"context" | ||
"errors" | ||
"fmt" | ||
"io" | ||
"net" | ||
"net/http" | ||
"strings" | ||
|
@@ -31,8 +32,9 @@ type xdsServer struct { | |
exitedCh chan struct{} | ||
} | ||
|
||
type httpGetter interface { | ||
type httpClient interface { | ||
Get(string) (*http.Response, error) | ||
Post(string, string, io.Reader) (*http.Response, error) | ||
} | ||
|
||
// ConsulDataplane represents the consul-dataplane process | ||
|
@@ -44,6 +46,7 @@ type ConsulDataplane struct { | |
xdsServer *xdsServer | ||
aclToken string | ||
metricsConfig *metricsConfig | ||
lifecycleConfig *lifecycleConfig | ||
} | ||
|
||
// NewConsulDP creates a new instance of ConsulDataplane | ||
|
@@ -209,6 +212,12 @@ func (cdp *ConsulDataplane) Run(ctx context.Context) error { | |
return err | ||
} | ||
|
||
cdp.lifecycleConfig = NewLifecycleConfig(cdp.cfg, proxy) | ||
err = cdp.lifecycleConfig.startLifecycleManager(ctx) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
doneCh := make(chan error) | ||
go func() { | ||
select { | ||
|
@@ -217,12 +226,25 @@ func (cdp *ConsulDataplane) Run(ctx context.Context) error { | |
case <-proxy.Exited(): | ||
doneCh <- errors.New("envoy proxy exited unexpectedly") | ||
case <-cdp.xdsServerExited(): | ||
if err := proxy.Stop(); err != nil { | ||
cdp.logger.Error("failed to stop proxy", "error", err) | ||
// Initiate graceful shutdown of Envoy, kill if error | ||
if err := proxy.Quit(); err != nil { | ||
cdp.logger.Error("failed to stop proxy, will attempt to kill", "error", err) | ||
if err := proxy.Kill(); err != nil { | ||
cdp.logger.Error("failed to kill proxy", "error", err) | ||
} | ||
} | ||
doneCh <- errors.New("xDS server exited unexpectedly") | ||
case <-cdp.metricsConfig.metricsServerExited(): | ||
doneCh <- errors.New("metrics server exited unexpectedly") | ||
case <-cdp.lifecycleConfig.lifecycleServerExited(): | ||
// Initiate graceful shutdown of Envoy, kill if error | ||
if err := proxy.Quit(); err != nil { | ||
cdp.logger.Error("failed to stop proxy", "error", err) | ||
if err := proxy.Kill(); err != nil { | ||
cdp.logger.Error("failed to kill proxy", "error", err) | ||
} | ||
} | ||
doneCh <- errors.New("proxy lifecycle management server exited unexpectedly") | ||
} | ||
}() | ||
return <-doneCh | ||
|
@@ -250,20 +272,33 @@ func (cdp *ConsulDataplane) startDNSProxy(ctx context.Context) error { | |
} | ||
|
||
func (cdp *ConsulDataplane) envoyProxyConfig(cfg []byte) envoy.ProxyConfig { | ||
setConcurrency := true | ||
extraArgs := cdp.cfg.Envoy.ExtraArgs | ||
// Users could set the concurrency as an extra args. Take that as priority for best ux | ||
// experience. | ||
for _, v := range extraArgs { | ||
if v == "--concurrency" { | ||
setConcurrency = false | ||
} | ||
|
||
envoyArgs := map[string]interface{}{ | ||
"--concurrency": cdp.cfg.Envoy.EnvoyConcurrency, | ||
"--drain-time-s": cdp.cfg.Envoy.EnvoyDrainTimeSeconds, | ||
"--drain-strategy": cdp.cfg.Envoy.EnvoyDrainStrategy, | ||
} | ||
if setConcurrency { | ||
extraArgs = append(extraArgs, fmt.Sprintf("--concurrency %v", cdp.cfg.Envoy.EnvoyConcurrency)) | ||
|
||
// Users could set the Envoy concurrency, drain time, or drain strategy as | ||
// extra args. Prioritize values set in that way over passthrough or defaults | ||
// from consul-dataplane. | ||
for envoyArg, cdpEnvoyValue := range envoyArgs { | ||
for _, v := range extraArgs { | ||
// If found in extraArgs, skip setting value from consul-dataplane Envoy | ||
// config | ||
if v == envoyArg { | ||
break | ||
} | ||
} | ||
|
||
// If not found, append value from consul-dataplane Envoy config to extraArgs | ||
extraArgs = append(extraArgs, fmt.Sprintf("%s %v", envoyArg, cdpEnvoyValue)) | ||
} | ||
|
||
return envoy.ProxyConfig{ | ||
AdminAddr: cdp.cfg.Envoy.AdminBindAddress, | ||
AdminBindPort: cdp.cfg.Envoy.AdminBindPort, | ||
Comment on lines
+300
to
+301
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Curious why these are showing up in this PR There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It felt reasonable to pull these out of the consul-dataplane Envoy config at this point when creating the config to pass as the only argument into This was not needed previously, as the Envoy process was just terminated with a process kill signal. |
||
Logger: cdp.logger, | ||
LogJSON: cdp.cfg.Logging.LogJSON, | ||
BootstrapConfig: cfg, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,195 @@ | ||
// Copyright (c) HashiCorp, Inc. | ||
// SPDX-License-Identifier: MPL-2.0 | ||
|
||
package consuldp | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"net/http" | ||
"strconv" | ||
"sync" | ||
"time" | ||
|
||
"github.com/hashicorp/go-hclog" | ||
|
||
"github.com/hashicorp/consul-dataplane/pkg/envoy" | ||
) | ||
|
||
const (
	// cdpLifecycleBindAddr is the loopback address the proxy lifecycle HTTP
	// server listens on.
	cdpLifecycleBindAddr = "127.0.0.1"

	// cdpLifecycleUrl is the scheme and host (without a port) of the proxy
	// lifecycle HTTP server.
	cdpLifecycleUrl = "http://" + cdpLifecycleBindAddr

	// defaultLifecycleBindPort is the port the proxy lifecycle HTTP server
	// listens on when no port has been configured.
	defaultLifecycleBindPort = "20300"

	// defaultLifecycleShutdownPath is the endpoint path that triggers a
	// graceful shutdown when no path has been configured.
	defaultLifecycleShutdownPath = "/graceful_shutdown"
)
|
||
// lifecycleConfig handles all configuration related to managing the Envoy proxy | ||
// lifecycle, including exposing management controls via an HTTP server. | ||
type lifecycleConfig struct { | ||
logger hclog.Logger | ||
|
||
// consuldp proxy lifecycle management config | ||
shutdownDrainListenersEnabled bool | ||
shutdownGracePeriodSeconds int | ||
gracefulPort int | ||
gracefulShutdownPath string | ||
|
||
// manager for controlling the Envoy proxy process | ||
proxy envoy.ProxyManager | ||
|
||
// consuldp proxy lifecycle management server | ||
lifecycleServer *http.Server | ||
|
||
// consuldp proxy lifecycle server control | ||
errorExitCh chan struct{} | ||
running bool | ||
mu sync.Mutex | ||
} | ||
|
||
func NewLifecycleConfig(cfg *Config, proxy envoy.ProxyManager) *lifecycleConfig { | ||
return &lifecycleConfig{ | ||
shutdownDrainListenersEnabled: cfg.Envoy.ShutdownDrainListenersEnabled, | ||
shutdownGracePeriodSeconds: cfg.Envoy.ShutdownGracePeriodSeconds, | ||
gracefulPort: cfg.Envoy.GracefulPort, | ||
gracefulShutdownPath: cfg.Envoy.GracefulShutdownPath, | ||
|
||
proxy: proxy, | ||
|
||
errorExitCh: make(chan struct{}, 1), | ||
mikemorris marked this conversation as resolved.
Show resolved
Hide resolved
|
||
mu: sync.Mutex{}, | ||
} | ||
} | ||
|
||
func (m *lifecycleConfig) startLifecycleManager(ctx context.Context) error { | ||
m.mu.Lock() | ||
defer m.mu.Unlock() | ||
if m.running { | ||
return nil | ||
} | ||
|
||
m.logger = hclog.FromContext(ctx).Named("lifecycle") | ||
m.running = true | ||
go func() { | ||
<-ctx.Done() | ||
m.stopLifecycleServer() | ||
}() | ||
|
||
// Start the server which will expose HTTP endpoints for proxy lifecycle | ||
// management control | ||
mux := http.NewServeMux() | ||
|
||
// Determine what HTTP endpoint paths to configure for the proxy lifecycle | ||
// management server. These can be set as flags. | ||
cdpLifecycleShutdownPath := defaultLifecycleShutdownPath | ||
if m.gracefulShutdownPath != "" { | ||
cdpLifecycleShutdownPath = m.gracefulShutdownPath | ||
} | ||
|
||
// Set config to allow introspection of default path for testing | ||
m.gracefulShutdownPath = cdpLifecycleShutdownPath | ||
|
||
m.logger.Info(fmt.Sprintf("setting graceful shutdown path: %s\n", cdpLifecycleShutdownPath)) | ||
mux.HandleFunc(cdpLifecycleShutdownPath, m.gracefulShutdown) | ||
|
||
// Determine what the proxy lifecycle management server bind port is. It can be | ||
// set as a flag. | ||
cdpLifecycleBindPort := defaultLifecycleBindPort | ||
if m.gracefulPort != 0 { | ||
cdpLifecycleBindPort = strconv.Itoa(m.gracefulPort) | ||
} | ||
m.lifecycleServer = &http.Server{ | ||
Addr: fmt.Sprintf("%s:%s", cdpLifecycleBindAddr, cdpLifecycleBindPort), | ||
Handler: mux, | ||
} | ||
|
||
// Start the proxy lifecycle management server | ||
go m.startLifecycleServer() | ||
|
||
return nil | ||
} | ||
|
||
// startLifecycleServer starts the main proxy lifecycle management server that | ||
// exposes HTTP endpoints for proxy lifecycle control. | ||
func (m *lifecycleConfig) startLifecycleServer() { | ||
m.logger.Info("starting proxy lifecycle management server", "address", m.lifecycleServer.Addr) | ||
err := m.lifecycleServer.ListenAndServe() | ||
if err != nil && err != http.ErrServerClosed { | ||
m.logger.Error("failed to serve proxy lifecycle management requests", "error", err) | ||
close(m.errorExitCh) | ||
} | ||
} | ||
|
||
// stopLifecycleServer stops the consul dataplane proxy lifecycle server | ||
func (m *lifecycleConfig) stopLifecycleServer() { | ||
m.mu.Lock() | ||
defer m.mu.Unlock() | ||
m.running = false | ||
|
||
if m.lifecycleServer != nil { | ||
m.logger.Info("stopping the lifecycle management server") | ||
err := m.lifecycleServer.Close() | ||
if err != nil { | ||
m.logger.Warn("error while closing lifecycle server", "error", err) | ||
close(m.errorExitCh) | ||
} | ||
} | ||
} | ||
|
||
// lifecycleServerExited is used to signal that the lifecycle server | ||
// recieved a signal to initiate shutdown. | ||
func (m *lifecycleConfig) lifecycleServerExited() <-chan struct{} { | ||
return m.errorExitCh | ||
} | ||
|
||
// gracefulShutdown blocks until shutdownGracePeriodSeconds seconds have elapsed, and, if | ||
// configured, will drain inbound connections to Envoy listeners during that time. | ||
func (m *lifecycleConfig) gracefulShutdown(rw http.ResponseWriter, _ *http.Request) { | ||
pglass marked this conversation as resolved.
Show resolved
Hide resolved
|
||
m.logger.Info("initiating shutdown") | ||
|
||
// Create a context that will signal a cancel at the specified duration. | ||
// TODO: should this use lifecycleManager ctx instead of context.Background? | ||
timeout := time.Duration(m.shutdownGracePeriodSeconds) * time.Second | ||
ctx, cancel := context.WithTimeout(context.Background(), timeout) | ||
defer cancel() | ||
|
||
m.logger.Info(fmt.Sprintf("waiting %d seconds before terminating dataplane proxy", m.shutdownGracePeriodSeconds)) | ||
|
||
var wg sync.WaitGroup | ||
wg.Add(1) | ||
|
||
go func() { | ||
defer wg.Done() | ||
|
||
// If shutdownDrainListenersEnabled, initiatie graceful shutdown of Envoy. | ||
// We want to start draining connections from inbound listeners if | ||
// configured, but still allow outbound traffic until gracefulShutdownPeriod | ||
// has elapsed to facilitate a graceful application shutdown. | ||
if m.shutdownDrainListenersEnabled { | ||
err := m.proxy.Drain() | ||
if err != nil { | ||
m.logger.Warn("error while draining Envoy listeners", "error", err) | ||
close(m.errorExitCh) | ||
} | ||
} | ||
|
||
// Block until context timeout has elapsed | ||
<-ctx.Done() | ||
|
||
// Finish graceful shutdown, quit Envoy proxy | ||
m.logger.Info("shutdown grace period timeout reached") | ||
err := m.proxy.Quit() | ||
if err != nil { | ||
m.logger.Warn("error while shutting down Envoy", "error", err) | ||
close(m.errorExitCh) | ||
} | ||
}() | ||
|
||
// Wait for context timeout to elapse | ||
wg.Wait() | ||
pglass marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// Return HTTP 200 Success | ||
rw.WriteHeader(http.StatusOK) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What kind of impact does this have on the runtime for these tests?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd like to clean this up, but skipped for now in the interest of expediency. It didn't feel substantial enough to warrant the effort at this time, as the full suite still completes in under a minute.