Skip to content

Commit

Permalink
Add docs for Envoy shutdown manager
Browse files Browse the repository at this point in the history
Signed-off-by: Steve Sloka <[email protected]>
  • Loading branch information
stevesloka committed Feb 20, 2020
1 parent ab8a1a8 commit 572fdc2
Show file tree
Hide file tree
Showing 13 changed files with 359 additions and 401 deletions.
2 changes: 1 addition & 1 deletion cmd/contour/contour.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func main() {
args := os.Args[1:]
switch kingpin.MustParse(app.Parse(args)) {
case shutdownManager.FullCommand():
check(doShutdownManager(shutdownManagerCtx))
doShutdownManager(shutdownManagerCtx)
case bootstrap.FullCommand():
doBootstrap(bootstrapCtx)
case certgenApp.FullCommand():
Expand Down
150 changes: 105 additions & 45 deletions cmd/contour/shutdownmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,51 +15,92 @@ package main

import (
"fmt"
"io"
"log"
"net/http"
"time"

"github.com/projectcontour/contour/internal/metrics"
"github.com/projectcontour/contour/internal/contour"

"github.com/prometheus/common/expfmt"

"github.com/sirupsen/logrus"

"github.com/projectcontour/contour/internal/workgroup"
"gopkg.in/alecthomas/kingpin.v2"
)

// handler for /healthz
const (
prometheusURL = "http://localhost:9001/stats/prometheus"
healthcheckFailURL = "http://localhost:9001/healthcheck/fail"
prometheusStat = "envoy_http_downstream_cx_active"
)

func prometheusLabels() []string {
return []string{contour.ENVOY_HTTP_LISTENER, contour.ENVOY_HTTPS_LISTENER}
}

type shutdownmanagerContext struct {
// checkInterval defines time delay between polling Envoy for open connections
checkInterval time.Duration

// checkDelay defines time to wait before polling Envoy for open connections
checkDelay time.Duration

// minOpenConnections defines the minimum amount of connections
// that can be open when polling for active connections in Envoy
minOpenConnections int

// httpServePort defines what port the shutdown-manager listens on
httpServePort int

logrus.FieldLogger
}

func newShutdownManagerContext() *shutdownmanagerContext {
// Set defaults for parameters which are then overridden via flags, ENV, or ConfigFile
return &shutdownmanagerContext{
checkInterval: 5 * time.Second,
checkDelay: 60 * time.Second,
minOpenConnections: 0,
httpServePort: 8090,
}
}

// handles the /healthz endpoint which is used for the shutdown-manager's liveness probe
func (s *shutdownmanagerContext) healthzHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
_, err := w.Write([]byte("OK"))
if err != nil {
http.StatusText(http.StatusOK)
if _, err := w.Write([]byte("OK")); err != nil {
s.Error(err)
}
}

// handles /shutdown
// shutdownHandler handles the /shutdown endpoint which should be called from a pod preStop hook,
// where it will block pod shutdown until envoy is able to drain connections to below the min-open threshold
func (s *shutdownmanagerContext) shutdownHandler(w http.ResponseWriter, r *http.Request) {
prometheusURL := fmt.Sprintf("http://%s:%d%s", s.envoyHost, s.envoyPort, s.prometheusPath)
envoyAdminURL := fmt.Sprintf("http://%s:%d/healthcheck/fail", s.envoyHost, s.envoyPort)

// Send shutdown signal to Envoy to start draining connections
err := shutdownEnvoy(envoyAdminURL)
s.Infof("failing envoy healthchecks")
err := shutdownEnvoy(healthcheckFailURL)
if err != nil {
s.Errorf("Error sending envoy healthcheck fail: %v", err)
s.Errorf("error sending envoy healthcheck fail: %v", err)
}

s.Infof("Sent healthcheck fail to Envoy...waiting %s before polling for draining connections", s.checkDelay)
s.Infof("waiting %s before polling for draining connections", s.checkDelay)
time.Sleep(s.checkDelay)

for {
openConnections, err := getOpenConnections(prometheusURL, s.prometheusStat, s.prometheusValues)
openConnections, err := getOpenConnections(prometheusURL)
if err != nil {
s.Error(err)
} else {
if openConnections <= s.minOpenConnections {
s.Infof("Found %d open connections with min number of %d connections. Shutting down...", openConnections, s.minOpenConnections)
s.WithField("open_connections", openConnections).
WithField("min_connections", s.minOpenConnections).
Info("min number of open connections found, shutting down")
return
}
s.Infof("Found %d open connections with min number of %d connections. Polling again...", openConnections, s.minOpenConnections)
s.WithField("open_connections", openConnections).
WithField("min_connections", s.minOpenConnections).
Info("polled open connections")
}
time.Sleep(s.checkInterval)
}
Expand All @@ -69,64 +110,83 @@ func (s *shutdownmanagerContext) shutdownHandler(w http.ResponseWriter, r *http.
func shutdownEnvoy(url string) error {
resp, err := http.Post(url, "", nil)
if err != nil {
return fmt.Errorf("creating POST request for URL %q failed: %s", url, err)
return fmt.Errorf("creating healthcheck fail post request failed: %s", err)
}

defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("POST request for URL %q returned HTTP status %s", url, resp.Status)
return fmt.Errorf("post request for url %q returned http status %s", url, resp.Status)
}
return nil
}

// getOpenConnections parses a http request to a prometheus endpoint returning the sum of values found
func getOpenConnections(url, prometheusStat string, prometheusValues []string) (int, error) {
func getOpenConnections(url string) (int, error) {
// Make request to Envoy Prometheus endpoint
resp, err := http.Get(url)
if err != nil {
return -1, fmt.Errorf("GET request for URL %q failed: %s", url, err)
return -1, fmt.Errorf("get request for metrics failed: %s", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return -1, fmt.Errorf("GET request for URL %q returned HTTP status %s", url, resp.Status)
return -1, fmt.Errorf("get request for metrics failed with http status %s", resp.Status)
}

// Parse Prometheus listener stats for open connections
return metrics.ParseOpenConnections(resp.Body, prometheusStat, prometheusValues)
return parseOpenConnections(resp.Body)
}

func doShutdownManager(config *shutdownmanagerContext) error {
var g workgroup.Group
// parseOpenConnections returns the sum of open connections from a Prometheus HTTP request
func parseOpenConnections(stats io.Reader) (int, error) {
var parser expfmt.TextParser
openConnections := 0

if stats == nil {
return -1, fmt.Errorf("stats input was nil")
}

g.Add(func(stop <-chan struct{}) error {
config.Info("started envoy shutdown manager")
defer config.Info("stopped")
// Parse Prometheus http response
metricFamilies, err := parser.TextToMetricFamilies(stats)
if err != nil {
return -1, fmt.Errorf("parsing Prometheus text format failed: %v", err)
}

// Validate stat exists in output
if _, ok := metricFamilies[prometheusStat]; !ok {
return -1, fmt.Errorf("error finding Prometheus stat %q in the request result", prometheusStat)
}

http.HandleFunc("/healthz", config.healthzHandler)
http.HandleFunc("/shutdown", config.shutdownHandler)
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", config.httpServePort), nil))
// Look up open connections value
for _, metrics := range metricFamilies[prometheusStat].Metric {
for _, labels := range metrics.Label {
for _, item := range prometheusLabels() {
if item == labels.GetValue() {
openConnections += int(metrics.Gauge.GetValue())
}
}
}
}
return openConnections, nil
}

return nil
})
func doShutdownManager(config *shutdownmanagerContext) {
config.Info("started envoy shutdown manager")
defer config.Info("stopped")

return g.Run()
http.HandleFunc("/healthz", config.healthzHandler)
http.HandleFunc("/shutdown", config.shutdownHandler)
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", config.httpServePort), nil))
}

// registerShutdownManager registers the envoy shutdown sub-command and flags
func registerShutdownManager(cmd *kingpin.CmdClause, log logrus.FieldLogger) (*kingpin.CmdClause, *shutdownmanagerContext) {
ctx := &shutdownmanagerContext{
FieldLogger: log,
}
ctx := newShutdownManagerContext()
ctx.FieldLogger = log.WithField("context", "shutdown-manager")

shutdownmgr := cmd.Command("shutdown-manager", "Start envoy shutdown-manager.")
shutdownmgr.Flag("check-interval", "Time to poll Envoy for open connections.").Default("5s").DurationVar(&ctx.checkInterval)
shutdownmgr.Flag("check-interval", "Time to poll Envoy for open connections.").DurationVar(&ctx.checkInterval)
shutdownmgr.Flag("check-delay", "Time wait before polling Envoy for open connections.").Default("60s").DurationVar(&ctx.checkDelay)
shutdownmgr.Flag("min-open-connections", "Min number of open connections when polling Envoy.").Default("0").IntVar(&ctx.minOpenConnections)
shutdownmgr.Flag("serve-port", "Port to serve the http server on.").Default("8090").IntVar(&ctx.httpServePort)
shutdownmgr.Flag("prometheus-path", "The path to query Envoy's Prometheus HTTP Endpoint.").Default("/stats/prometheus").StringVar(&ctx.prometheusPath)
shutdownmgr.Flag("prometheus-stat", "Prometheus stat to query.").Default("envoy_http_downstream_cx_active").StringVar(&ctx.prometheusStat)
shutdownmgr.Flag("prometheus-values", "Prometheus values to look for in prometheus-stat.").Default("ingress_http", "ingress_https").StringsVar(&ctx.prometheusValues)
shutdownmgr.Flag("envoy-host", "HTTP endpoint for Envoy's stats page.").Default("localhost").StringVar(&ctx.envoyHost)
shutdownmgr.Flag("envoy-port", "HTTP port for Envoy's stats page.").Default("9001").IntVar(&ctx.envoyPort)

shutdownmgr.Flag("min-open-connections", "Min number of open connections when polling Envoy.").IntVar(&ctx.minOpenConnections)
shutdownmgr.Flag("serve-port", "Port to serve the http server on.").IntVar(&ctx.httpServePort)
return shutdownmgr, ctx
}
Loading

0 comments on commit 572fdc2

Please sign in to comment.