diff --git a/cmd/nginx/main.go b/cmd/nginx/main.go index 4d03526cfe..e924b55b65 100644 --- a/cmd/nginx/main.go +++ b/cmd/nginx/main.go @@ -130,13 +130,12 @@ func main() { err = collector.InitNGINXStatusCollector(conf.Namespace, class.IngressClass, conf.ListenPorts.Status) if err != nil { - glog.Fatalf("Error generating metric collector: %v", err) + glog.Fatalf("Error creating metric collector: %v", err) } - err = collector.InitUDPCollector(conf.Namespace, class.IngressClass, 8000) - + err = collector.NewInstance(conf.Namespace, class.IngressClass) if err != nil { - glog.Fatalf("Error generating UDP collector: %v", err) + glog.Fatalf("Error creating unix socket server: %v", err) } ngx.Start() diff --git a/internal/ingress/metric/collector/udp_collector.go b/internal/ingress/metric/collector/collector.go similarity index 88% rename from internal/ingress/metric/collector/udp_collector.go rename to internal/ingress/metric/collector/collector.go index 69af70b33c..697923764c 100644 --- a/internal/ingress/metric/collector/udp_collector.go +++ b/internal/ingress/metric/collector/collector.go @@ -26,7 +26,7 @@ import ( "github.com/prometheus/client_golang/prometheus" ) -type udpData struct { +type socketData struct { Host string `json:"host"` // Label Status string `json:"status"` // Label @@ -53,8 +53,8 @@ type udpData struct { Service string `json:"service"` // Label } -// UDPCollector stores prometheus metrics and ingress meta-data -type UDPCollector struct { +// SocketCollector stores prometheus metrics and ingress meta-data +type SocketCollector struct { upstreamResponseTime *prometheus.HistogramVec requestTime *prometheus.HistogramVec requestLength *prometheus.HistogramVec @@ -62,20 +62,18 @@ type UDPCollector struct { collectorSuccess *prometheus.GaugeVec collectorSuccessTime *prometheus.GaugeVec requests *prometheus.CounterVec - listener *net.UDPConn + listener net.Listener ns string ingressClass string - port int } -// InitUDPCollector creates a new UDPCollector instance -func InitUDPCollector(ns string, class string, port int) error { - sc := UDPCollector{} +// NewInstance creates a new SocketCollector instance +func NewInstance(ns string, class string) error { + sc := SocketCollector{} ns = strings.Replace(ns, "-", "_", -1) - listener, err := newUDPListener(port) - + listener, err := net.Listen("unix", "/tmp/prometheus-nginx.socket") if err != nil { return err } @@ -83,7 +81,6 @@ func InitUDPCollector(ns string, class string, port int) error { sc.listener = listener sc.ns = ns sc.ingressClass = class - sc.port = port requestTags := []string{"host", "status", "remote_address", "real_ip_address", "remote_user", "protocol", "method", "uri", "upstream_name", "upstream_ip", "upstream_status", "namespace", "ingress", "service"} collectorTags := []string{"namespace", "ingress_class"} @@ -166,13 +163,13 @@ func InitUDPCollector(ns string, class string, port int) error { return nil } -func (sc *UDPCollector) handleMessage(msg []byte) { +func (sc *SocketCollector) handleMessage(msg []byte) { glog.V(5).Infof("msg: %v", string(msg)) collectorSuccess := true // Unmarshall bytes - var stats udpData + var stats socketData err := json.Unmarshal(msg, &stats) if err != nil { glog.Errorf("Unexpected error deserializing JSON paylod: %v", err) @@ -271,7 +268,29 @@ func (sc *UDPCollector) handleMessage(msg []byte) { } } -// Run adds a message handler to a UDP listener -func (sc *UDPCollector) Run() { - handleMessages(sc.listener, sc.handleMessage) +// Run listen for connections in the unix socket and spawns a goroutine to process the content +func (sc *SocketCollector) Run() { + for { + conn, err := sc.listener.Accept() + if err != nil { + continue + } + + go handleMessages(conn, sc.handleMessage) + } +} + +const packetSize = 1024 * 65 + +// handleMessages process the content received in a network connection +func handleMessages(conn net.Conn, fn func([]byte)) { + defer conn.Close() + + msg := make([]byte, packetSize) + s, err := conn.Read(msg[0:]) + if err != nil { + return + } + + fn(msg[0:s]) } diff --git a/internal/ingress/metric/collector/listener_test.go b/internal/ingress/metric/collector/collector_test.go similarity index 68% rename from internal/ingress/metric/collector/listener_test.go rename to internal/ingress/metric/collector/collector_test.go index 6c159181c7..d5544924c6 100644 --- a/internal/ingress/metric/collector/listener_test.go +++ b/internal/ingress/metric/collector/collector_test.go @@ -25,8 +25,6 @@ import ( ) func TestNewUDPLogListener(t *testing.T) { - port := freeUDPPort() - var count uint64 fn := func(message []byte) { @@ -34,19 +32,30 @@ func TestNewUDPLogListener(t *testing.T) { atomic.AddUint64(&count, 1) } - t.Logf("UDP Port: %v", port) + tmpFile := fmt.Sprintf("/tmp/test-socket-%v", time.Now().Nanosecond()) - l, err := newUDPListener(port) + l, err := net.Listen("unix", tmpFile) if err != nil { - t.Errorf("unexpected error creating UDP listener: %v", err) + t.Fatalf("unexpected error creating unix socket: %v", err) } if l == nil { - t.Errorf("expected a listener but none returned") + t.Fatalf("expected a listener but none returned") } - go handleMessages(l, fn) + defer l.Close() + + go func() { + for { + conn, err := l.Accept() + if err != nil { + continue + } + + go handleMessages(conn, fn) + } + }() - conn, _ := net.Dial("udp", fmt.Sprintf(":%v", port)) + conn, _ := net.Dial("unix", tmpFile) conn.Write([]byte("message")) conn.Close() @@ -55,16 +64,3 @@ func TestNewUDPLogListener(t *testing.T) { t.Errorf("expected only one message from the UDP listern but %v returned", count) } } - -func freeUDPPort() int { - l, err := net.ListenUDP("udp", &net.UDPAddr{}) - if err != nil { - return 0 - } - - if err := l.Close(); err != nil { - return 0 - } - - return l.LocalAddr().(*net.UDPAddr).Port -} diff --git a/internal/ingress/metric/collector/listener.go b/internal/ingress/metric/collector/listener.go deleted file mode 100644 index ce985db2c1..0000000000 --- a/internal/ingress/metric/collector/listener.go +++ /dev/null @@ -1,51 +0,0 @@ -/* -Copyright 2018 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package collector - -import ( - "fmt" - "net" -) - -const packetSize = 1024 * 65 - -// newUDPListener creates a new UDP listener used to process messages -// from the NGINX log phase containing information about a request -func newUDPListener(port int) (*net.UDPConn, error) { - service := fmt.Sprintf("127.0.0.1:%v", port) - - udpAddr, err := net.ResolveUDPAddr("udp4", service) - if err != nil { - return nil, err - } - - return net.ListenUDP("udp", udpAddr) -} - -// handleMessages process packets received in an UDP connection -func handleMessages(conn *net.UDPConn, fn func([]byte)) { - msg := make([]byte, packetSize) - - for { - s, _, err := conn.ReadFrom(msg[0:]) - if err != nil { - continue - } - - fn(msg[0:s]) - } -} diff --git a/rootfs/etc/nginx/lua/monitor.lua b/rootfs/etc/nginx/lua/monitor.lua index 3cc56609f5..605edc8eb0 100644 --- a/rootfs/etc/nginx/lua/monitor.lua +++ b/rootfs/etc/nginx/lua/monitor.lua @@ -1,4 +1,4 @@ -local socket = ngx.socket.udp +local socket = ngx.socket.tcp local cjson = require('cjson') local defer = require('defer') local assert = assert @@ -7,7 +7,7 @@ local _M = {} local function send_data(jsonData) local s = assert(socket()) - assert(s:setpeername("127.0.0.1", 8000)) + assert(s:connect('unix:/tmp/prometheus-nginx.socket')) assert(s:send(jsonData)) assert(s:close()) end