Skip to content

Commit

Permalink
feat: add support for health-check flag (#1271)
Browse files Browse the repository at this point in the history
  • Loading branch information
enocom committed Aug 23, 2022
1 parent d252f3c commit fdc6e06
Show file tree
Hide file tree
Showing 11 changed files with 673 additions and 60 deletions.
101 changes: 62 additions & 39 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"contrib.go.opencensus.io/exporter/prometheus"
"contrib.go.opencensus.io/exporter/stackdriver"
"github.com/GoogleCloudPlatform/cloudsql-proxy/v2/cloudsql"
"github.com/GoogleCloudPlatform/cloudsql-proxy/v2/internal/healthcheck"
"github.com/GoogleCloudPlatform/cloudsql-proxy/v2/internal/log"
"github.com/GoogleCloudPlatform/cloudsql-proxy/v2/internal/proxy"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -76,6 +77,7 @@ type Command struct {
telemetryProject string
telemetryPrefix string
prometheusNamespace string
healthCheck bool
httpPort string
}

Expand Down Expand Up @@ -157,7 +159,6 @@ When this flag is not set, there is no limit.`)
to close after receiving a TERM signal. The proxy will shut
down when the number of open connections reaches 0 or when
the maximum time has passed. Defaults to 0s.`)

cmd.PersistentFlags().StringVar(&c.telemetryProject, "telemetry-project", "",
"Enable Cloud Monitoring and Cloud Trace integration with the provided project ID.")
cmd.PersistentFlags().BoolVar(&c.disableTraces, "disable-traces", false,
Expand All @@ -172,12 +173,16 @@ the maximum time has passed. Defaults to 0s.`)
"Enable Prometheus for metric collection using the provided namespace")
cmd.PersistentFlags().StringVar(&c.httpPort, "http-port", "9090",
"Port for the Prometheus server to use")
cmd.PersistentFlags().BoolVar(&c.healthCheck, "health-check", false,
`Enables HTTP endpoints /startup, /liveness, and /readiness
that report on the proxy's health. Endpoints are available on localhost
only. Uses the port specified by the http-port flag.`)
cmd.PersistentFlags().StringVar(&c.conf.APIEndpointURL, "sqladmin-api-endpoint", "",
"When set, the proxy uses this url as the API endpoint for all Cloud SQL Admin API requests.\nExample: https://sqladmin.googleapis.com")
cmd.PersistentFlags().StringVar(&c.conf.QuotaProject, "quota-project", "",
`Specifies the project to use for Cloud SQL Admin API quota tracking.
The IAM principal must have the "serviceusage.services.use" permission
for the given project. See https://cloud.google.com/service-usage/docs/overview and
for the given project. See https://cloud.google.com/service-usage/docs/overview and
https://cloud.google.com/storage/docs/requester-pays`)

// Global and per instance flags
Expand Down Expand Up @@ -225,18 +230,18 @@ func parseConfig(cmd *Command, conf *proxy.Config, args []string) error {
return newBadCommandError("cannot specify --credentials-file and --gcloud-auth flags at the same time")
}

if userHasSet("http-port") && !userHasSet("prometheus-namespace") {
return newBadCommandError("cannot specify --http-port without --prometheus-namespace")
if userHasSet("http-port") && !userHasSet("prometheus-namespace") && !userHasSet("health-check") {
cmd.logger.Infof("Ignoring --http-port because --prometheus-namespace or --health-check was not set")
}

if !userHasSet("telemetry-project") && userHasSet("telemetry-prefix") {
cmd.logger.Infof("Ignoring telementry-prefix as telemetry-project was not set")
cmd.logger.Infof("Ignoring --telementry-prefix because --telemetry-project was not set")
}
if !userHasSet("telemetry-project") && userHasSet("disable-metrics") {
cmd.logger.Infof("Ignoring disable-metrics as telemetry-project was not set")
cmd.logger.Infof("Ignoring --disable-metrics because --telemetry-project was not set")
}
if !userHasSet("telemetry-project") && userHasSet("disable-traces") {
cmd.logger.Infof("Ignoring disable-traces as telemetry-project was not set")
cmd.logger.Infof("Ignoring --disable-traces because --telemetry-project was not set")
}

if userHasSet("sqladmin-api-endpoint") && conf.APIEndpointURL != "" {
Expand Down Expand Up @@ -364,9 +369,8 @@ func runSignalWrapper(cmd *Command) error {
ctx, cancel := context.WithCancel(cmd.Context())
defer cancel()

// Configure Cloud Trace and/or Cloud Monitoring based on command
// invocation. If a project has not been enabled, no traces or metrics are
// enabled.
// Configure collectors before the proxy has started to ensure we are
// collecting metrics before *ANY* Cloud SQL Admin API calls are made.
enableMetrics := !cmd.disableMetrics
enableTraces := !cmd.disableTraces
if cmd.telemetryProject != "" && (enableMetrics || enableTraces) {
Expand Down Expand Up @@ -394,40 +398,22 @@ func runSignalWrapper(cmd *Command) error {
}()
}

shutdownCh := make(chan error)

var (
needsHTTPServer bool
mux = http.NewServeMux()
)
if cmd.prometheusNamespace != "" {
needsHTTPServer = true
e, err := prometheus.NewExporter(prometheus.Options{
Namespace: cmd.prometheusNamespace,
})
if err != nil {
return err
}
mux := http.NewServeMux()
mux.Handle("/metrics", e)
addr := fmt.Sprintf("localhost:%s", cmd.httpPort)
server := &http.Server{Addr: addr, Handler: mux}
go func() {
select {
case <-ctx.Done():
// Give the HTTP server a second to shutdown cleanly.
ctx2, _ := context.WithTimeout(context.Background(), time.Second)
if err := server.Shutdown(ctx2); err != nil {
cmd.logger.Errorf("failed to shutdown Prometheus HTTP server: %v\n", err)
}
}
}()
go func() {
err := server.ListenAndServe()
if err == http.ErrServerClosed {
return
}
if err != nil {
shutdownCh <- fmt.Errorf("failed to start prometheus HTTP server: %v", err)
}
}()
}

shutdownCh := make(chan error)
// watch for sigterm / sigint signals
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT)
Expand Down Expand Up @@ -465,18 +451,55 @@ func runSignalWrapper(cmd *Command) error {
cmd.logger.Errorf("The proxy has encountered a terminal error: %v", err)
return err
case p = <-startCh:
cmd.logger.Infof("The proxy has started successfully and is ready for new connections!")
}
cmd.logger.Infof("The proxy has started successfully and is ready for new connections!")
defer p.Close()
defer func() {
if cErr := p.Close(); cErr != nil {
cmd.logger.Errorf("error during shutdown: %v", cErr)
}
}()

go func() {
shutdownCh <- p.Serve(ctx)
}()
notify := func() {}
if cmd.healthCheck {
needsHTTPServer = true
hc := healthcheck.NewCheck(p, cmd.logger)
mux.HandleFunc("/startup", hc.HandleStartup)
mux.HandleFunc("/readiness", hc.HandleReadiness)
mux.HandleFunc("/liveness", hc.HandleLiveness)
notify = hc.NotifyStarted
}

// Start the HTTP server if anything requiring HTTP is specified.
if needsHTTPServer {
server := &http.Server{
Addr: fmt.Sprintf("localhost:%s", cmd.httpPort),
Handler: mux,
}
// Start the HTTP server.
go func() {
err := server.ListenAndServe()
if err == http.ErrServerClosed {
return
}
if err != nil {
shutdownCh <- fmt.Errorf("failed to start HTTP server: %v", err)
}
}()
// Handle shutdown of the HTTP server gracefully.
go func() {
select {
case <-ctx.Done():
// Give the HTTP server a second to shutdown cleanly.
ctx2, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
if err := server.Shutdown(ctx2); err != nil {
cmd.logger.Errorf("failed to shutdown Prometheus HTTP server: %v\n", err)
}
}
}()
}

go func() { shutdownCh <- p.Serve(ctx, notify) }()

err := <-shutdownCh
switch {
Expand Down
4 changes: 0 additions & 4 deletions cmd/root_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,10 +516,6 @@ func TestNewCommandWithErrors(t *testing.T) {
desc: "when the iam authn login query param is bogus",
args: []string{"proj:region:inst?auto-iam-authn=nope"},
},
{
desc: "enabling a Prometheus port without a namespace",
args: []string{"--http-port", "1111", "proj:region:inst"},
},
{
desc: "using an invalid url for sqladmin-api-endpoint",
args: []string{"--sqladmin-api-endpoint", "https://user:abc{[email protected]:5432/db?sslmode=require", "proj:region:inst"},
Expand Down
109 changes: 109 additions & 0 deletions internal/healthcheck/healthcheck.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package healthcheck tests and communicates the health of the Cloud SQL Auth proxy.
package healthcheck

import (
"context"
"errors"
"fmt"
"net/http"
"sync"

"github.com/GoogleCloudPlatform/cloudsql-proxy/v2/cloudsql"
"github.com/GoogleCloudPlatform/cloudsql-proxy/v2/internal/proxy"
)

// Check provides HTTP handlers for use as healthchecks typically in a
// Kubernetes context.
type Check struct {
once *sync.Once
started chan struct{}
proxy *proxy.Client
logger cloudsql.Logger
}

// NewCheck is the initializer for Check.
func NewCheck(p *proxy.Client, l cloudsql.Logger) *Check {
return &Check{
once: &sync.Once{},
started: make(chan struct{}),
proxy: p,
logger: l,
}
}

// NotifyStarted notifies the check that the proxy has started up successfully.
func (c *Check) NotifyStarted() {
c.once.Do(func() { close(c.started) })
}

// HandleStartup reports whether the Check has been notified of startup.
func (c *Check) HandleStartup(w http.ResponseWriter, _ *http.Request) {
select {
case <-c.started:
w.WriteHeader(http.StatusOK)
w.Write([]byte("ok"))
default:
w.WriteHeader(http.StatusServiceUnavailable)
w.Write([]byte("error"))
}
}

var errNotStarted = errors.New("proxy is not started")

// HandleReadiness ensures the Check has been notified of successful startup,
// that the proxy has not reached maximum connections, and that all connections
// are healthy.
func (c *Check) HandleReadiness(w http.ResponseWriter, _ *http.Request) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

select {
case <-c.started:
default:
c.logger.Errorf("[Health Check] Readiness failed: %v", errNotStarted)
w.WriteHeader(http.StatusServiceUnavailable)
w.Write([]byte(errNotStarted.Error()))
return
}

if open, max := c.proxy.ConnCount(); max > 0 && open == max {
err := fmt.Errorf("max connections reached (open = %v, max = %v)", open, max)
c.logger.Errorf("[Health Check] Readiness failed: %v", err)
w.WriteHeader(http.StatusServiceUnavailable)
w.Write([]byte(err.Error()))
return
}

err := c.proxy.CheckConnections(ctx)
if err != nil {
c.logger.Errorf("[Health Check] Readiness failed: %v", err)
w.WriteHeader(http.StatusServiceUnavailable)
w.Write([]byte(err.Error()))
return
}

w.WriteHeader(http.StatusOK)
w.Write([]byte("ok"))
}

// HandleLiveness indicates the process is up and responding to HTTP requests.
// If this check fails (because it's not reachable), the process is in a bad
// state and should be restarted.
func (c *Check) HandleLiveness(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("ok"))
}
Loading

0 comments on commit fdc6e06

Please sign in to comment.