Skip to content

Commit

Permalink
Use a unix socket instead udp for reception of metrics (#2652)
Browse files Browse the repository at this point in the history
  • Loading branch information
aledbf authored Jun 17, 2018
1 parent cd7625b commit c4ec773
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 94 deletions.
7 changes: 3 additions & 4 deletions cmd/nginx/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,12 @@ func main() {
err = collector.InitNGINXStatusCollector(conf.Namespace, class.IngressClass, conf.ListenPorts.Status)

if err != nil {
glog.Fatalf("Error generating metric collector: %v", err)
glog.Fatalf("Error creating metric collector: %v", err)
}

err = collector.InitUDPCollector(conf.Namespace, class.IngressClass, 8000)

err = collector.NewInstance(conf.Namespace, class.IngressClass)
if err != nil {
glog.Fatalf("Error generating UDP collector: %v", err)
glog.Fatalf("Error creating unix socket server: %v", err)
}

ngx.Start()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
)

type udpData struct {
type socketData struct {
Host string `json:"host"` // Label
Status string `json:"status"` // Label

Expand All @@ -53,37 +53,34 @@ type udpData struct {
Service string `json:"service"` // Label
}

// UDPCollector stores prometheus metrics and ingress meta-data
type UDPCollector struct {
// SocketCollector stores prometheus metrics and ingress meta-data
type SocketCollector struct {
upstreamResponseTime *prometheus.HistogramVec
requestTime *prometheus.HistogramVec
requestLength *prometheus.HistogramVec
bytesSent *prometheus.HistogramVec
collectorSuccess *prometheus.GaugeVec
collectorSuccessTime *prometheus.GaugeVec
requests *prometheus.CounterVec
listener *net.UDPConn
listener net.Listener
ns string
ingressClass string
port int
}

// InitUDPCollector creates a new UDPCollector instance
func InitUDPCollector(ns string, class string, port int) error {
sc := UDPCollector{}
// NewInstance creates a new SocketCollector instance
func NewInstance(ns string, class string) error {
sc := SocketCollector{}

ns = strings.Replace(ns, "-", "_", -1)

listener, err := newUDPListener(port)

listener, err := net.Listen("unix", "/tmp/prometheus-nginx.socket")
if err != nil {
return err
}

sc.listener = listener
sc.ns = ns
sc.ingressClass = class
sc.port = port

requestTags := []string{"host", "status", "remote_address", "real_ip_address", "remote_user", "protocol", "method", "uri", "upstream_name", "upstream_ip", "upstream_status", "namespace", "ingress", "service"}
collectorTags := []string{"namespace", "ingress_class"}
Expand Down Expand Up @@ -166,13 +163,13 @@ func InitUDPCollector(ns string, class string, port int) error {
return nil
}

func (sc *UDPCollector) handleMessage(msg []byte) {
func (sc *SocketCollector) handleMessage(msg []byte) {
glog.V(5).Infof("msg: %v", string(msg))

collectorSuccess := true

// Unmarshall bytes
var stats udpData
var stats socketData
err := json.Unmarshal(msg, &stats)
if err != nil {
glog.Errorf("Unexpected error deserializing JSON paylod: %v", err)
Expand Down Expand Up @@ -271,7 +268,29 @@ func (sc *UDPCollector) handleMessage(msg []byte) {
}
}

// Run adds a message handler to a UDP listener
func (sc *UDPCollector) Run() {
handleMessages(sc.listener, sc.handleMessage)
// Run listen for connections in the unix socket and spawns a goroutine to process the content
func (sc *SocketCollector) Run() {
for {
conn, err := sc.listener.Accept()
if err != nil {
continue
}

go handleMessages(conn, sc.handleMessage)
}
}

const packetSize = 1024 * 65

// handleMessages process the content received in a network connection
func handleMessages(conn net.Conn, fn func([]byte)) {
defer conn.Close()

msg := make([]byte, packetSize)
s, err := conn.Read(msg[0:])
if err != nil {
return
}

fn(msg[0:s])
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,28 +25,37 @@ import (
)

func TestNewUDPLogListener(t *testing.T) {
port := freeUDPPort()

var count uint64

fn := func(message []byte) {
t.Logf("message: %v", string(message))
atomic.AddUint64(&count, 1)
}

t.Logf("UDP Port: %v", port)
tmpFile := fmt.Sprintf("/tmp/test-socket-%v", time.Now().Nanosecond())

l, err := newUDPListener(port)
l, err := net.Listen("unix", tmpFile)
if err != nil {
t.Errorf("unexpected error creating UDP listener: %v", err)
t.Fatalf("unexpected error creating unix socket: %v", err)
}
if l == nil {
t.Errorf("expected a listener but none returned")
t.Fatalf("expected a listener but none returned")
}

go handleMessages(l, fn)
defer l.Close()

go func() {
for {
conn, err := l.Accept()
if err != nil {
continue
}

go handleMessages(conn, fn)
}
}()

conn, _ := net.Dial("udp", fmt.Sprintf(":%v", port))
conn, _ := net.Dial("unix", tmpFile)
conn.Write([]byte("message"))
conn.Close()

Expand All @@ -55,16 +64,3 @@ func TestNewUDPLogListener(t *testing.T) {
t.Errorf("expected only one message from the UDP listern but %v returned", count)
}
}

func freeUDPPort() int {
l, err := net.ListenUDP("udp", &net.UDPAddr{})
if err != nil {
return 0
}

if err := l.Close(); err != nil {
return 0
}

return l.LocalAddr().(*net.UDPAddr).Port
}
51 changes: 0 additions & 51 deletions internal/ingress/metric/collector/listener.go

This file was deleted.

4 changes: 2 additions & 2 deletions rootfs/etc/nginx/lua/monitor.lua
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
local socket = ngx.socket.udp
local socket = ngx.socket.tcp
local cjson = require('cjson')
local defer = require('defer')
local assert = assert
Expand All @@ -7,7 +7,7 @@ local _M = {}

local function send_data(jsonData)
local s = assert(socket())
assert(s:setpeername("127.0.0.1", 8000))
assert(s:connect('unix:/tmp/prometheus-nginx.socket'))
assert(s:send(jsonData))
assert(s:close())
end
Expand Down

0 comments on commit c4ec773

Please sign in to comment.