From 34b05802258b21e44de663bfb303719ceebc832f Mon Sep 17 00:00:00 2001 From: Manuel Alejandro de Brito Fontes Date: Mon, 21 Jan 2019 11:29:36 -0300 Subject: [PATCH] Replace Status port using a socket --- cmd/nginx/flags.go | 13 +- cmd/nginx/main.go | 2 +- docs/user-guide/cli-arguments.md | 3 +- internal/ingress/controller/checker.go | 45 ++--- internal/ingress/controller/checker_test.go | 45 +++-- internal/ingress/controller/config/config.go | 6 +- internal/ingress/controller/controller.go | 10 +- internal/ingress/controller/nginx.go | 57 +++--- internal/ingress/controller/nginx_test.go | 189 +++++++++--------- .../ingress/metric/collectors/nginx_status.go | 56 ++---- .../metric/collectors/nginx_status_test.go | 46 ++++- internal/ingress/metric/main.go | 4 +- internal/nginx/main.go | 101 ++++++++++ rootfs/etc/nginx/template/nginx.tmpl | 67 ++----- test/e2e/lua/dynamic_configuration.go | 8 +- 15 files changed, 350 insertions(+), 302 deletions(-) create mode 100644 internal/nginx/main.go diff --git a/cmd/nginx/flags.go b/cmd/nginx/flags.go index df5a8167ba..cdf9afb9e9 100644 --- a/cmd/nginx/flags.go +++ b/cmd/nginx/flags.go @@ -31,6 +31,7 @@ import ( "k8s.io/ingress-nginx/internal/ingress/controller" ngx_config "k8s.io/ingress-nginx/internal/ingress/controller/config" ing_net "k8s.io/ingress-nginx/internal/net" + "k8s.io/ingress-nginx/internal/nginx" ) func parseFlags() (bool, *controller.Configuration, error) { @@ -151,7 +152,7 @@ Feature backed by OpenResty Lua libraries. Requires that OCSP stapling is not en httpPort = flags.Int("http-port", 80, `Port to use for servicing HTTP traffic.`) httpsPort = flags.Int("https-port", 443, `Port to use for servicing HTTPS traffic.`) - statusPort = flags.Int("status-port", 18080, `Port to use for exposing NGINX status pages.`) + _ = flags.Int("status-port", 18080, `Port to use for exposing NGINX status pages.`) sslProxyPort = flags.Int("ssl-passthrough-proxy-port", 442, `Port to use internally for SSL Passthrough.`) defServerPort = flags.Int("default-server-port", 8181, `Port to use for exposing the default server (catch-all).`) healthzPort = flags.Int("healthz-port", 10254, "Port to use for the healthz endpoint.") @@ -160,7 +161,7 @@ Feature backed by OpenResty Lua libraries. Requires that OCSP stapling is not en `Disable support for catch-all Ingresses`) ) - flags.MarkDeprecated("sort-backends", "Feature removed because of the lua load balancer that removed the need of reloads for change in endpoints") + flags.MarkDeprecated("status-port", `The status port is a unix socket now.`) flag.Set("logtostderr", "true") @@ -200,10 +201,6 @@ Feature backed by OpenResty Lua libraries. Requires that OCSP stapling is not en return false, nil, fmt.Errorf("Port %v is already in use. Please check the flag --https-port", *httpsPort) } - if !ing_net.IsPortAvailable(*statusPort) { - return false, nil, fmt.Errorf("Port %v is already in use. Please check the flag --status-port", *statusPort) - } - if !ing_net.IsPortAvailable(*defServerPort) { return false, nil, fmt.Errorf("Port %v is already in use. Please check the flag --default-server-port", *defServerPort) } @@ -224,6 +221,8 @@ Feature backed by OpenResty Lua libraries. Requires that OCSP stapling is not en return false, nil, fmt.Errorf("Flags --publish-service and --publish-status-address are mutually exclusive") } + nginx.HealthPath = *defHealthzURL + config := &controller.Configuration{ APIServerHost: *apiserverHost, KubeConfigFile: *kubeConfigFile, @@ -241,7 +240,6 @@ Feature backed by OpenResty Lua libraries. Requires that OCSP stapling is not en TCPConfigMapName: *tcpConfigMapName, UDPConfigMapName: *udpConfigMapName, DefaultSSLCertificate: *defSSLCertificate, - DefaultHealthzURL: *defHealthzURL, HealthCheckTimeout: *healthCheckTimeout, PublishService: *publishSvc, PublishStatusAddress: *publishStatusAddress, @@ -256,7 +254,6 @@ Feature backed by OpenResty Lua libraries. Requires that OCSP stapling is not en HTTP: *httpPort, HTTPS: *httpsPort, SSLProxy: *sslProxyPort, - Status: *statusPort, }, DisableCatchAll: *disableCatchAll, } diff --git a/cmd/nginx/main.go b/cmd/nginx/main.go index 5f5b4b9cb2..0ccbd6ed2e 100644 --- a/cmd/nginx/main.go +++ b/cmd/nginx/main.go @@ -131,7 +131,7 @@ func main() { mc := metric.NewDummyCollector() if conf.EnableMetrics { - mc, err = metric.NewCollector(conf.ListenPorts.Status, conf.MetricsPerHost, reg) + mc, err = metric.NewCollector(conf.MetricsPerHost, reg) if err != nil { klog.Fatalf("Error creating prometheus collector: %v", err) } diff --git a/docs/user-guide/cli-arguments.md b/docs/user-guide/cli-arguments.md index a593fd4146..4ec5cd3f6e 100644 --- a/docs/user-guide/cli-arguments.md +++ b/docs/user-guide/cli-arguments.md @@ -34,7 +34,6 @@ They are set in the container spec of the `nginx-ingress-controller` Deployment | `--report-node-internal-ip-address` | Set the load-balancer status of Ingress objects to internal Node addresses instead of external. Requires the update-status parameter. | | `--sort-backends` | Sort servers inside NGINX upstreams. | | `--ssl-passthrough-proxy-port int` | Port to use internally for SSL Passthrough. (default 442) | -| `--status-port int` | Port to use for exposing NGINX status pages. (default 18080) | | `--stderrthreshold severity` | logs at or above this threshold go to stderr (default 2) | | `--sync-period duration` | Period at which the controller forces the repopulation of its local object stores. Disabled by default. | | `--sync-rate-limit float32` | Define the sync frequency upper limit (default 0.3) | @@ -46,4 +45,4 @@ They are set in the container spec of the `nginx-ingress-controller` Deployment | `--version` | Show release information about the NGINX Ingress controller and exit. | | `--vmodule moduleSpec` | comma-separated list of pattern=N settings for file-filtered logging | | `--watch-namespace string` | Namespace the controller watches for updates to Kubernetes objects. This includes Ingresses, Services and all configuration resources. All namespaces are watched if this parameter is left empty. | -| `--disable-catch-all` | Disable support for catch-all Ingresses. | +| `--disable-catch-all` | Disable support for catch-all Ingresses. | \ No newline at end of file diff --git a/internal/ingress/controller/checker.go b/internal/ingress/controller/checker.go index 9ab499888f..b2beaa79d0 100644 --- a/internal/ingress/controller/checker.go +++ b/internal/ingress/controller/checker.go @@ -21,13 +21,13 @@ import ( "net/http" "strconv" "strings" - "time" "github.com/ncabatoff/process-exporter/proc" "github.com/pkg/errors" -) + "k8s.io/klog" -const nginxPID = "/tmp/nginx.pid" + "k8s.io/ingress-nginx/internal/nginx" +) // Name returns the healthcheck name func (n NGINXController) Name() string { @@ -36,25 +36,25 @@ func (n NGINXController) Name() string { // Check returns if the nginx healthz endpoint is returning ok (status code 200) func (n *NGINXController) Check(_ *http.Request) error { - - url := fmt.Sprintf("http://127.0.0.1:%v%v", n.cfg.ListenPorts.Status, ngxHealthPath) - timeout := n.cfg.HealthCheckTimeout - statusCode, err := simpleGet(url, timeout) + statusCode, _, err := nginx.NewGetStatusRequest(nginx.HealthPath) if err != nil { + klog.Errorf("healthcheck error: %v", err) return err } if statusCode != 200 { + klog.Errorf("healthcheck error: %v", statusCode) return fmt.Errorf("ingress controller is not healthy") } - url = fmt.Sprintf("http://127.0.0.1:%v/is-dynamic-lb-initialized", n.cfg.ListenPorts.Status) - statusCode, err = simpleGet(url, timeout) + statusCode, _, err = nginx.NewGetStatusRequest("/is-dynamic-lb-initialized") if err != nil { + klog.Errorf("healthcheck error: %v", err) return err } if statusCode != 200 { + klog.Errorf("healthcheck error: %v", statusCode) return fmt.Errorf("dynamic load balancer not started") } @@ -63,35 +63,14 @@ func (n *NGINXController) Check(_ *http.Request) error { if err != nil { return errors.Wrap(err, "unexpected error reading /proc directory") } - f, err := n.fileSystem.ReadFile(nginxPID) + f, err := n.fileSystem.ReadFile(nginx.PID) if err != nil { - return errors.Wrapf(err, "unexpected error reading %v", nginxPID) + return errors.Wrapf(err, "unexpected error reading %v", nginx.PID) } pid, err := strconv.Atoi(strings.TrimRight(string(f), "\r\n")) if err != nil { - return errors.Wrapf(err, "unexpected error reading the nginx PID from %v", nginxPID) + return errors.Wrapf(err, "unexpected error reading the nginx PID from %v", nginx.PID) } _, err = fs.NewProc(pid) - return err } - -func simpleGet(url string, timeout time.Duration) (int, error) { - client := &http.Client{ - Timeout: timeout * time.Second, - Transport: &http.Transport{DisableKeepAlives: true}, - } - - req, err := http.NewRequest("GET", url, nil) - if err != nil { - return -1, err - } - - res, err := client.Do(req) - if err != nil { - return -1, err - } - defer res.Body.Close() - - return res.StatusCode, nil -} diff --git a/internal/ingress/controller/checker_test.go b/internal/ingress/controller/checker_test.go index 4e2385cf4f..290c12ea15 100644 --- a/internal/ingress/controller/checker_test.go +++ b/internal/ingress/controller/checker_test.go @@ -21,6 +21,7 @@ import ( "net" "net/http" "net/http/httptest" + "os" "os/exec" "testing" @@ -29,27 +30,37 @@ import ( "k8s.io/ingress-nginx/internal/file" ngx_config "k8s.io/ingress-nginx/internal/ingress/controller/config" + "k8s.io/ingress-nginx/internal/nginx" ) func TestNginxCheck(t *testing.T) { mux := http.NewServeMux() - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - fmt.Fprintf(w, "ok") - })) + listener, err := net.Listen("unix", nginx.StatusSocket) + if err != nil { + t.Errorf("crating unix listener: %s", err) + } + defer listener.Close() + defer os.Remove(nginx.StatusSocket) + + server := &httptest.Server{ + Listener: listener, + Config: &http.Server{ + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + fmt.Fprintf(w, "ok") + }), + }, + } defer server.Close() - // port to be used in the check - p := server.Listener.Addr().(*net.TCPAddr).Port + server.Start() // mock filesystem - fs := filesystem.NewFakeFs() + fs := filesystem.DefaultFs{} n := &NGINXController{ cfg: &Configuration{ - ListenPorts: &ngx_config.ListenPorts{ - Status: p, - }, + ListenPorts: &ngx_config.ListenPorts{}, }, fileSystem: fs, } @@ -62,7 +73,7 @@ func TestNginxCheck(t *testing.T) { // create pid file fs.MkdirAll("/tmp", file.ReadWriteByUser) - pidFile, err := fs.Create(nginxPID) + pidFile, err := fs.Create(nginx.PID) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -102,20 +113,14 @@ func TestNginxCheck(t *testing.T) { t.Error("expected an error but none returned") } }) - - t.Run("invalid port", func(t *testing.T) { - n.cfg.ListenPorts.Status = 9000 - if err := callHealthz(true, mux); err == nil { - t.Error("expected an error but none returned") - } - }) } func callHealthz(expErr bool, mux *http.ServeMux) error { - req, err := http.NewRequest("GET", "http://localhost:8080/healthz", nil) + req, err := http.NewRequest("GET", "/healthz", nil) if err != nil { - return err + return fmt.Errorf("healthz error: %v", err) } + w := httptest.NewRecorder() mux.ServeHTTP(w, req) diff --git a/internal/ingress/controller/config/config.go b/internal/ingress/controller/config/config.go index ac1551f4bb..14a23d91b6 100644 --- a/internal/ingress/controller/config/config.go +++ b/internal/ingress/controller/config/config.go @@ -726,6 +726,11 @@ type TemplateConfig struct { PublishService *apiv1.Service DynamicCertificatesEnabled bool EnableMetrics bool + + PID string + StatusSocket string + StatusPath string + StreamSocket string } // ListenPorts describe the ports required to run the @@ -733,7 +738,6 @@ type TemplateConfig struct { type ListenPorts struct { HTTP int HTTPS int - Status int Health int Default int SSLProxy int diff --git a/internal/ingress/controller/controller.go b/internal/ingress/controller/controller.go index f180e905e9..47257336ce 100644 --- a/internal/ingress/controller/controller.go +++ b/internal/ingress/controller/controller.go @@ -65,7 +65,6 @@ type Configuration struct { // +optional UDPConfigMapName string - DefaultHealthzURL string HealthCheckTimeout time.Duration DefaultSSLCertificate string @@ -195,10 +194,8 @@ func (n *NGINXController) syncIngress(interface{}) error { isFirstSync := n.runningConfig.Equal(&ingress.Configuration{}) if isFirstSync { - // For the initial sync it always takes some time for NGINX to - // start listening on the configured port (default 18080) - // For large configurations it might take a while so we loop - // and back off + // For the initial sync it always takes some time for NGINX to start listening + // For large configurations it might take a while so we loop and back off klog.Info("Initial sync, sleeping for 1 second.") time.Sleep(1 * time.Second) } @@ -211,7 +208,7 @@ func (n *NGINXController) syncIngress(interface{}) error { } err := wait.ExponentialBackoff(retry, func() (bool, error) { - err := configureDynamically(pcfg, n.cfg.ListenPorts.Status, n.cfg.DynamicCertificatesEnabled) + err := configureDynamically(pcfg, n.cfg.DynamicCertificatesEnabled) if err == nil { klog.V(2).Infof("Dynamic reconfiguration succeeded.") return true, nil @@ -255,7 +252,6 @@ func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Pr n.cfg.ListenPorts.HTTP, n.cfg.ListenPorts.HTTPS, n.cfg.ListenPorts.SSLProxy, - n.cfg.ListenPorts.Status, n.cfg.ListenPorts.Health, n.cfg.ListenPorts.Default, } diff --git a/internal/ingress/controller/nginx.go b/internal/ingress/controller/nginx.go index 4d4c3328cf..ee119a1f16 100644 --- a/internal/ingress/controller/nginx.go +++ b/internal/ingress/controller/nginx.go @@ -60,14 +60,13 @@ import ( ing_net "k8s.io/ingress-nginx/internal/net" "k8s.io/ingress-nginx/internal/net/dns" "k8s.io/ingress-nginx/internal/net/ssl" + "k8s.io/ingress-nginx/internal/nginx" "k8s.io/ingress-nginx/internal/task" "k8s.io/ingress-nginx/internal/watch" ) const ( - ngxHealthPath = "/healthz" - nginxStreamSocket = "/tmp/ingress-stream.sock" - tempNginxPattern = "nginx-cfg" + tempNginxPattern = "nginx-cfg" ) var ( @@ -595,7 +594,6 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error { Servers: ingressCfg.Servers, TCPBackends: ingressCfg.TCPEndpoints, UDPBackends: ingressCfg.UDPEndpoints, - HealthzURI: ngxHealthPath, CustomErrors: len(cfg.CustomHTTPErrors) > 0, Cfg: cfg, IsIPV6Enabled: n.isIPV6Enabled && !cfg.DisableIpv6, @@ -607,6 +605,12 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error { PublishService: n.GetPublishService(), DynamicCertificatesEnabled: n.cfg.DynamicCertificatesEnabled, EnableMetrics: n.cfg.EnableMetrics, + + HealthzURI: nginx.HealthPath, + PID: nginx.PID, + StatusSocket: nginx.StatusSocket, + StatusPath: nginx.StatusPath, + StreamSocket: nginx.StreamSocket, } tc.Cfg.Checksum = ingressCfg.ConfigurationChecksum @@ -772,7 +776,7 @@ func (n *NGINXController) IsDynamicConfigurationEnough(pcfg *ingress.Configurati // configureDynamically encodes new Backends in JSON format and POSTs the // payload to an internal HTTP endpoint handled by Lua. -func configureDynamically(pcfg *ingress.Configuration, port int, isDynamicCertificatesEnabled bool) error { +func configureDynamically(pcfg *ingress.Configuration, isDynamicCertificatesEnabled bool) error { backends := make([]*ingress.Backend, len(pcfg.Backends)) for i, backend := range pcfg.Backends { @@ -805,12 +809,15 @@ func configureDynamically(pcfg *ingress.Configuration, port int, isDynamicCertif backends[i] = luaBackend } - url := fmt.Sprintf("http://localhost:%d/configuration/backends", port) - err := post(url, backends) + statusCode, _, err := nginx.NewPostStatusRequest("/configuration/backends", "application/json", backends) if err != nil { return err } + if statusCode != http.StatusCreated { + return fmt.Errorf("unexpected error code: %d", statusCode) + } + streams := make([]ingress.Backend, 0) for _, ep := range pcfg.TCPEndpoints { var service *apiv1.Service @@ -846,16 +853,19 @@ func configureDynamically(pcfg *ingress.Configuration, port int, isDynamicCertif return err } - url = fmt.Sprintf("http://localhost:%d/configuration/general", port) - err = post(url, &ingress.GeneralConfig{ + statusCode, _, err = nginx.NewPostStatusRequest("/configuration/general", "application/json", ingress.GeneralConfig{ ControllerPodsCount: pcfg.ControllerPodsCount, }) if err != nil { return err } + if statusCode != http.StatusCreated { + return fmt.Errorf("unexpected error code: %d", statusCode) + } + if isDynamicCertificatesEnabled { - err = configureCertificates(pcfg, port) + err = configureCertificates(pcfg) if err != nil { return err } @@ -865,7 +875,7 @@ func configureDynamically(pcfg *ingress.Configuration, port int, isDynamicCertif } func updateStreamConfiguration(streams []ingress.Backend) error { - conn, err := net.Dial("unix", nginxStreamSocket) + conn, err := net.Dial("unix", nginx.StreamSocket) if err != nil { return err } @@ -890,7 +900,7 @@ func updateStreamConfiguration(streams []ingress.Backend) error { // configureCertificates JSON encodes certificates and POSTs it to an internal HTTP endpoint // that is handled by Lua -func configureCertificates(pcfg *ingress.Configuration, port int) error { +func configureCertificates(pcfg *ingress.Configuration) error { var servers []*ingress.Server for _, server := range pcfg.Servers { @@ -902,30 +912,13 @@ func configureCertificates(pcfg *ingress.Configuration, port int) error { }) } - url := fmt.Sprintf("http://localhost:%d/configuration/servers", port) - return post(url, servers) -} - -func post(url string, data interface{}) error { - buf, err := json.Marshal(data) - if err != nil { - return err - } - - klog.V(2).Infof("Posting to %s", url) - resp, err := http.Post(url, "application/json", bytes.NewReader(buf)) + statusCode, _, err := nginx.NewPostStatusRequest("/configuration/servers", "application/json", servers) if err != nil { return err } - defer func() { - if err := resp.Body.Close(); err != nil { - klog.Warningf("Error while closing response body:\n%v", err) - } - }() - - if resp.StatusCode != http.StatusCreated { - return fmt.Errorf("unexpected error code: %d", resp.StatusCode) + if statusCode != http.StatusCreated { + return fmt.Errorf("unexpected error code: %d", statusCode) } return nil diff --git a/internal/ingress/controller/nginx_test.go b/internal/ingress/controller/nginx_test.go index 3d17981a24..1de35f97db 100644 --- a/internal/ingress/controller/nginx_test.go +++ b/internal/ingress/controller/nginx_test.go @@ -32,6 +32,7 @@ import ( apiv1 "k8s.io/api/core/v1" "k8s.io/ingress-nginx/internal/ingress" + "k8s.io/ingress-nginx/internal/nginx" ) func TestIsDynamicConfigurationEnough(t *testing.T) { @@ -149,32 +150,62 @@ func TestIsDynamicConfigurationEnough(t *testing.T) { } } -func mockUnixSocket(t *testing.T) net.Listener { - l, err := net.Listen("unix", nginxStreamSocket) +func TestConfigureDynamically(t *testing.T) { + listener, err := net.Listen("unix", nginx.StatusSocket) if err != nil { - t.Fatalf("unexpected error creating unix socket: %v", err) + t.Errorf("crating unix listener: %s", err) } - if l == nil { - t.Fatalf("expected a listener but none returned") + defer listener.Close() + defer os.Remove(nginx.StatusSocket) + + streamListener, err := net.Listen("unix", nginx.StreamSocket) + if err != nil { + t.Errorf("crating unix listener: %s", err) } + defer streamListener.Close() + defer os.Remove(nginx.StreamSocket) - go func() { - for { - conn, err := l.Accept() - if err != nil { - continue - } + server := &httptest.Server{ + Listener: listener, + Config: &http.Server{ + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusCreated) - time.Sleep(100 * time.Millisecond) - defer conn.Close() - } - }() + if r.Method != "POST" { + t.Errorf("expected a 'POST' request, got '%s'", r.Method) + } - return l -} -func TestConfigureDynamically(t *testing.T) { - l := mockUnixSocket(t) - defer l.Close() + b, err := ioutil.ReadAll(r.Body) + if err != nil && err != io.EOF { + t.Fatal(err) + } + body := string(b) + + switch r.URL.Path { + case "/configuration/backends": + { + if strings.Contains(body, "target") { + t.Errorf("unexpected target reference in JSON content: %v", body) + } + + if !strings.Contains(body, "service") { + t.Errorf("service reference should be present in JSON content: %v", body) + } + } + case "/configuration/general": + { + if !strings.Contains(body, "controllerPodsCount") { + t.Errorf("controllerPodsCount should be present in JSON content: %v", body) + } + } + default: + t.Errorf("unknown request to %s", r.URL.Path) + } + }), + }, + } + defer server.Close() + server.Start() target := &apiv1.ObjectReference{} @@ -212,46 +243,7 @@ func TestConfigureDynamically(t *testing.T) { ControllerPodsCount: 2, } - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusCreated) - - if r.Method != "POST" { - t.Errorf("expected a 'POST' request, got '%s'", r.Method) - } - - b, err := ioutil.ReadAll(r.Body) - if err != nil && err != io.EOF { - t.Fatal(err) - } - body := string(b) - - switch r.URL.Path { - case "/configuration/backends": - { - if strings.Contains(body, "target") { - t.Errorf("unexpected target reference in JSON content: %v", body) - } - - if !strings.Contains(body, "service") { - t.Errorf("service reference should be present in JSON content: %v", body) - } - } - case "/configuration/general": - { - if !strings.Contains(body, "controllerPodsCount") { - t.Errorf("controllerPodsCount should be present in JSON content: %v", body) - } - } - default: - t.Errorf("unknown request to %s", r.URL.Path) - } - - })) - - port := ts.Listener.Addr().(*net.TCPAddr).Port - defer ts.Close() - - err := configureDynamically(commonConfig, port, false) + err = configureDynamically(commonConfig, false) if err != nil { t.Errorf("unexpected error posting dynamic configuration: %v", err) } @@ -262,6 +254,19 @@ func TestConfigureDynamically(t *testing.T) { } func TestConfigureCertificates(t *testing.T) { + listener, err := net.Listen("unix", nginx.StatusSocket) + if err != nil { + t.Errorf("crating unix listener: %s", err) + } + defer listener.Close() + defer os.Remove(nginx.StatusSocket) + + streamListener, err := net.Listen("unix", nginx.StreamSocket) + if err != nil { + t.Errorf("crating unix listener: %s", err) + } + defer streamListener.Close() + defer os.Remove(nginx.StreamSocket) servers := []*ingress.Server{{ Hostname: "myapp.fake", @@ -270,42 +275,46 @@ func TestConfigureCertificates(t *testing.T) { }, }} - commonConfig := &ingress.Configuration{ - Servers: servers, - } - - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusCreated) + server := &httptest.Server{ + Listener: listener, + Config: &http.Server{ + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusCreated) - if r.Method != "POST" { - t.Errorf("expected a 'POST' request, got '%s'", r.Method) - } + if r.Method != "POST" { + t.Errorf("expected a 'POST' request, got '%s'", r.Method) + } - b, err := ioutil.ReadAll(r.Body) - if err != nil && err != io.EOF { - t.Fatal(err) - } - var postedServers []ingress.Server - err = jsoniter.ConfigCompatibleWithStandardLibrary.Unmarshal(b, &postedServers) - if err != nil { - t.Fatal(err) - } + b, err := ioutil.ReadAll(r.Body) + if err != nil && err != io.EOF { + t.Fatal(err) + } + var postedServers []ingress.Server + err = jsoniter.ConfigCompatibleWithStandardLibrary.Unmarshal(b, &postedServers) + if err != nil { + t.Fatal(err) + } - if len(servers) != len(postedServers) { - t.Errorf("Expected servers to be the same length as the posted servers") - } + if len(servers) != len(postedServers) { + t.Errorf("Expected servers to be the same length as the posted servers") + } - for i, server := range servers { - if !server.Equal(&postedServers[i]) { - t.Errorf("Expected servers and posted servers to be equal") - } - } - })) + for i, server := range servers { + if !server.Equal(&postedServers[i]) { + t.Errorf("Expected servers and posted servers to be equal") + } + } + }), + }, + } + defer server.Close() + server.Start() - port := ts.Listener.Addr().(*net.TCPAddr).Port - defer ts.Close() + commonConfig := &ingress.Configuration{ + Servers: servers, + } - err := configureCertificates(commonConfig, port) + err = configureCertificates(commonConfig) if err != nil { t.Errorf("unexpected error posting dynamic certificate configuration: %v", err) } diff --git a/internal/ingress/metric/collectors/nginx_status.go b/internal/ingress/metric/collectors/nginx_status.go index 0df9d8d089..15f45a56f9 100644 --- a/internal/ingress/metric/collectors/nginx_status.go +++ b/internal/ingress/metric/collectors/nginx_status.go @@ -17,13 +17,12 @@ limitations under the License. package collectors import ( - "fmt" - "io/ioutil" - "net/http" + "log" "regexp" "strconv" "github.com/prometheus/client_golang/prometheus" + "k8s.io/ingress-nginx/internal/nginx" "k8s.io/klog" ) @@ -39,9 +38,6 @@ type ( nginxStatusCollector struct { scrapeChan chan scrapeRequest - ngxHealthPort int - ngxStatusPath string - data *nginxStatusData } @@ -78,12 +74,10 @@ type NGINXStatusCollector interface { } // NewNGINXStatus returns a new prometheus collector the default nginx status module -func NewNGINXStatus(podName, namespace, ingressClass string, ngxHealthPort int) (NGINXStatusCollector, error) { +func NewNGINXStatus(podName, namespace, ingressClass string) (NGINXStatusCollector, error) { p := nginxStatusCollector{ - scrapeChan: make(chan scrapeRequest), - ngxHealthPort: ngxHealthPort, - ngxStatusPath: "/nginx_status", + scrapeChan: make(chan scrapeRequest), } constLabels := prometheus.Labels{ @@ -138,24 +132,6 @@ func (p nginxStatusCollector) Stop() { close(p.scrapeChan) } -func httpBody(url string) ([]byte, error) { - resp, err := http.DefaultClient.Get(url) - if err != nil { - return nil, fmt.Errorf("unexpected error scraping nginx : %v", err) - } - - data, err := ioutil.ReadAll(resp.Body) - if err != nil { - return nil, fmt.Errorf("unexpected error scraping nginx (%v)", err) - } - defer resp.Body.Close() - if resp.StatusCode < 200 || resp.StatusCode >= 400 { - return nil, fmt.Errorf("unexpected error scraping nginx (status %v)", resp.StatusCode) - } - - return data, nil -} - func toInt(data []string, pos int) int { if len(data) == 0 { return 0 @@ -187,27 +163,23 @@ func parse(data string) *basicStatus { } } -func getNginxStatus(port int, path string) (*basicStatus, error) { - url := fmt.Sprintf("http://0.0.0.0:%v%v", port, path) - klog.V(3).Infof("start scraping url: %v", url) - - data, err := httpBody(url) - - if err != nil { - return nil, fmt.Errorf("unexpected error scraping nginx status page: %v", err) - } - - return parse(string(data)), nil -} - // nginxStatusCollector scrape the nginx status func (p nginxStatusCollector) scrape(ch chan<- prometheus.Metric) { - s, err := getNginxStatus(p.ngxHealthPort, p.ngxStatusPath) + klog.V(3).Infof("start scraping socket: %v", nginx.StatusPath) + status, data, err := nginx.NewGetStatusRequest(nginx.StatusPath) if err != nil { + log.Printf("%v", err) klog.Warningf("unexpected error obtaining nginx status info: %v", err) return } + if status < 200 || status >= 400 { + klog.Warningf("unexpected error obtaining nginx status info (status %v)", status) + return + } + + s := parse(string(data)) + ch <- prometheus.MustNewConstMetric(p.data.connectionsTotal, prometheus.CounterValue, float64(s.Active), "active") ch <- prometheus.MustNewConstMetric(p.data.connectionsTotal, diff --git a/internal/ingress/metric/collectors/nginx_status_test.go b/internal/ingress/metric/collectors/nginx_status_test.go index 5d6dee0c73..e42620d5a2 100644 --- a/internal/ingress/metric/collectors/nginx_status_test.go +++ b/internal/ingress/metric/collectors/nginx_status_test.go @@ -21,9 +21,12 @@ import ( "net" "net/http" "net/http/httptest" + "os" "testing" + "time" "github.com/prometheus/client_golang/prometheus" + "k8s.io/ingress-nginx/internal/nginx" ) func TestStatusCollector(t *testing.T) { @@ -96,24 +99,39 @@ func TestStatusCollector(t *testing.T) { for _, c := range cases { t.Run(c.name, func(t *testing.T) { - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - fmt.Fprintf(w, c.mock) - })) - p := server.Listener.Addr().(*net.TCPAddr).Port + listener, err := net.Listen("unix", nginx.StatusSocket) + if err != nil { + t.Fatalf("crating unix listener: %s", err) + } + + server := &httptest.Server{ + Listener: listener, + Config: &http.Server{Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) - cm, err := NewNGINXStatus("pod", "default", "nginx", p) + if r.URL.Path == "/nginx_status" { + _, err := fmt.Fprintf(w, c.mock) + if err != nil { + t.Fatal(err) + } + + return + } + + fmt.Fprintf(w, "OK") + })}, + } + server.Start() + + time.Sleep(1 * time.Second) + + cm, err := NewNGINXStatus("pod", "default", "nginx") if err != nil { t.Errorf("unexpected error creating nginx status collector: %v", err) } go cm.Start() - defer func() { - server.Close() - cm.Stop() - }() - reg := prometheus.NewPedanticRegistry() if err := reg.Register(cm); err != nil { t.Errorf("registering collector failed: %s", err) @@ -124,6 +142,12 @@ func TestStatusCollector(t *testing.T) { } reg.Unregister(cm) + + server.Close() + cm.Stop() + + listener.Close() + os.Remove(nginx.StatusSocket) }) } } diff --git a/internal/ingress/metric/main.go b/internal/ingress/metric/main.go index c25343aeaa..4950a24976 100644 --- a/internal/ingress/metric/main.go +++ b/internal/ingress/metric/main.go @@ -59,7 +59,7 @@ type collector struct { } // NewCollector creates a new metric collector the for ingress controller -func NewCollector(statusPort int, metricsPerHost bool, registry *prometheus.Registry) (Collector, error) { +func NewCollector(metricsPerHost bool, registry *prometheus.Registry) (Collector, error) { podNamespace := os.Getenv("POD_NAMESPACE") if podNamespace == "" { podNamespace = "default" @@ -67,7 +67,7 @@ func NewCollector(statusPort int, metricsPerHost bool, registry *prometheus.Regi podName := os.Getenv("POD_NAME") - nc, err := collectors.NewNGINXStatus(podName, podNamespace, class.IngressClass, statusPort) + nc, err := collectors.NewNGINXStatus(podName, podNamespace, class.IngressClass) if err != nil { return nil, err } diff --git a/internal/nginx/main.go b/internal/nginx/main.go new file mode 100644 index 0000000000..474bf9fb5f --- /dev/null +++ b/internal/nginx/main.go @@ -0,0 +1,101 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nginx + +import ( + "bytes" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "time" + + "github.com/tv42/httpunix" +) + +// PID defines the location of the pid file used by NGINX +var PID = "/tmp/nginx.pid" + +// StatusSocket defines the location of the unix socket used by NGINX for the status server +var StatusSocket = "/tmp/nginx-status-server.sock" + +// HealthPath defines the path used to define the health check location in NGINX +var HealthPath = "/healthz" + +// StatusPath defines the path used to expose the NGINX status page +// http://nginx.org/en/docs/http/ngx_http_stub_status_module.html +var StatusPath = "/nginx_status" + +// StreamSocket defines the location of the unix socket used by NGINX for the NGINX stream configuration socket +var StreamSocket = "/tmp/ingress-stream.sock" + +var statusLocation = "nginx-status" + +var socketClient = buildUnixSocketClient() + +// NewGetStatusRequest creates a new GET request to the internal NGINX status server +func NewGetStatusRequest(path string) (int, []byte, error) { + url := fmt.Sprintf("http+unix://%v%v", statusLocation, path) + res, err := socketClient.Get(url) + if err != nil { + return 0, nil, err + } + defer res.Body.Close() + + data, err := ioutil.ReadAll(res.Body) + if err != nil { + return 0, nil, err + } + + return res.StatusCode, data, nil +} + +// NewPostStatusRequest creates a new POST request to the internal NGINX status server +func NewPostStatusRequest(path, contentType string, data interface{}) (int, []byte, error) { + url := fmt.Sprintf("http+unix://%v%v", statusLocation, path) + + buf, err := json.Marshal(data) + if err != nil { + return 0, nil, err + } + + res, err := socketClient.Post(url, contentType, bytes.NewReader(buf)) + if err != nil { + return 0, nil, err + } + defer res.Body.Close() + + body, err := ioutil.ReadAll(res.Body) + if err != nil { + return 0, nil, err + } + + return res.StatusCode, body, nil +} + +func buildUnixSocketClient() *http.Client { + u := &httpunix.Transport{ + DialTimeout: 1 * time.Second, + RequestTimeout: 10 * time.Second, + ResponseHeaderTimeout: 10 * time.Second, + } + u.RegisterLocation(statusLocation, StatusSocket) + + return &http.Client{ + Transport: u, + } +} diff --git a/rootfs/etc/nginx/template/nginx.tmpl b/rootfs/etc/nginx/template/nginx.tmpl index be46290e6b..b82e403956 100644 --- a/rootfs/etc/nginx/template/nginx.tmpl +++ b/rootfs/etc/nginx/template/nginx.tmpl @@ -10,7 +10,7 @@ # Configuration checksum: {{ $all.Cfg.Checksum }} # setup custom paths that do not require root access -pid /tmp/nginx.pid; +pid {{ .PID }}; {{ if $cfg.UseGeoIP2 }} load_module /etc/nginx/modules/ngx_http_geoip2_module.so; @@ -614,7 +614,7 @@ http { server { listen {{ $all.ListenPorts.Default }} default_server {{ if $all.Cfg.ReusePort }}reuseport{{ end }} backlog={{ $all.BacklogSize }}; {{ if $IsIPV6Enabled }}listen [::]:{{ $all.ListenPorts.Default }} default_server {{ if $all.Cfg.ReusePort }}reuseport{{ end }} backlog={{ $all.BacklogSize }};{{ end }} - set $proxy_upstream_name "-"; + set $proxy_upstream_name "internal"; location / { return 404; @@ -623,35 +623,23 @@ http { # default server, used for NGINX healthcheck and access to nginx stats server { - listen {{ $all.ListenPorts.Status }} default_server {{ if $all.Cfg.ReusePort }}reuseport{{ end }} backlog={{ $all.BacklogSize }}; - {{ if $IsIPV6Enabled }}listen [::]:{{ $all.ListenPorts.Status }} default_server {{ if $all.Cfg.ReusePort }}reuseport{{ end }} backlog={{ $all.BacklogSize }};{{ end }} - set $proxy_upstream_name "-"; + listen unix:{{ .StatusSocket }}; + set $proxy_upstream_name "internal"; - {{ if gt (len $cfg.BlockUserAgents) 0 }} - if ($block_ua) { - return 403; - } - {{ end }} - {{ if gt (len $cfg.BlockReferers) 0 }} - if ($block_ref) { - return 403; - } + keepalive_timeout 0; + gzip off; + + access_log off; + + {{ if $cfg.EnableOpentracing }} + opentracing off; {{ end }} location {{ $healthzURI }} { - {{ if $cfg.EnableOpentracing }} - opentracing off; - {{ end }} - access_log off; return 200; } location /is-dynamic-lb-initialized { - {{ if $cfg.EnableOpentracing }} - opentracing off; - {{ end }} - access_log off; - content_by_lua_block { local configuration = require("configuration") local backend_data = configuration.get_backends_data() @@ -665,28 +653,11 @@ http { } } - location /nginx_status { - set $proxy_upstream_name "internal"; - {{ if $cfg.EnableOpentracing }} - opentracing off; - {{ end }} - - access_log off; + location {{ .StatusPath }} { stub_status on; } location /configuration { - access_log off; - {{ if $cfg.EnableOpentracing }} - opentracing off; - {{ end }} - - allow 127.0.0.1; - {{ if $IsIPV6Enabled }} - allow ::1; - {{ end }} - deny all; - # this should be equals to configuration_data dict client_max_body_size 10m; client_body_buffer_size 10m; @@ -698,16 +669,10 @@ http { } location / { - {{ if .CustomErrors }} - proxy_set_header X-Code 404; - {{ end }} - set $proxy_upstream_name "upstream-default-backend"; - proxy_set_header Host $best_http_host; - - proxy_pass http://upstream_balancer; + content_by_lua_block { + ngx.exit(ngx.HTTP_NOT_FOUND) + } } - - {{ template "CUSTOM_ERRORS" (buildCustomErrorDeps $all.ProxySetHeaders $cfg.CustomHTTPErrors $all.EnableMetrics) }} } } @@ -765,7 +730,7 @@ stream { } server { - listen unix:/tmp/ingress-stream.sock; + listen unix:{{ .StreamSocket }}; content_by_lua_block { tcp_udp_configuration.call() diff --git a/test/e2e/lua/dynamic_configuration.go b/test/e2e/lua/dynamic_configuration.go index df6dc704d2..a909e650db 100644 --- a/test/e2e/lua/dynamic_configuration.go +++ b/test/e2e/lua/dynamic_configuration.go @@ -31,6 +31,7 @@ import ( extensions "k8s.io/api/extensions/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/ingress-nginx/internal/nginx" "k8s.io/ingress-nginx/test/e2e/framework" ) @@ -151,7 +152,10 @@ var _ = framework.IngressNginxDescribe("Dynamic Configuration", func() { }) It("sets controllerPodsCount in Lua general configuration", func() { - output, err := f.ExecIngressPod("curl --fail --silent http://127.0.0.1:18080/configuration/general") + // https://github.com/curl/curl/issues/936 + curlCmd := fmt.Sprintf("curl --fail --silent --unix-socket %v http://localhost/configuration/general", nginx.StatusSocket) + + output, err := f.ExecIngressPod(curlCmd) Expect(err).ToNot(HaveOccurred()) Expect(output).Should(Equal(`{"controllerPodsCount":1}`)) @@ -159,7 +163,7 @@ var _ = framework.IngressNginxDescribe("Dynamic Configuration", func() { Expect(err).ToNot(HaveOccurred()) time.Sleep(waitForLuaSync) - output, err = f.ExecIngressPod("curl --fail --silent http://127.0.0.1:18080/configuration/general") + output, err = f.ExecIngressPod(curlCmd) Expect(err).ToNot(HaveOccurred()) Expect(output).Should(Equal(`{"controllerPodsCount":3}`)) })