Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use a unix socket instead udp for reception of metrics #2652

Merged
merged 1 commit into from
Jun 17, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions cmd/nginx/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,12 @@ func main() {
err = collector.InitNGINXStatusCollector(conf.Namespace, class.IngressClass, conf.ListenPorts.Status)

if err != nil {
glog.Fatalf("Error generating metric collector: %v", err)
glog.Fatalf("Error creating metric collector: %v", err)
}

err = collector.InitUDPCollector(conf.Namespace, class.IngressClass, 8000)

err = collector.NewInstance(conf.Namespace, class.IngressClass)
if err != nil {
glog.Fatalf("Error generating UDP collector: %v", err)
glog.Fatalf("Error creating unix socket server: %v", err)
}

ngx.Start()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
)

type udpData struct {
type socketData struct {
Host string `json:"host"` // Label
Status string `json:"status"` // Label

Expand All @@ -53,37 +53,34 @@ type udpData struct {
Service string `json:"service"` // Label
}

// UDPCollector stores prometheus metrics and ingress meta-data
type UDPCollector struct {
// SocketCollector stores prometheus metrics and ingress meta-data
type SocketCollector struct {
upstreamResponseTime *prometheus.HistogramVec
requestTime *prometheus.HistogramVec
requestLength *prometheus.HistogramVec
bytesSent *prometheus.HistogramVec
collectorSuccess *prometheus.GaugeVec
collectorSuccessTime *prometheus.GaugeVec
requests *prometheus.CounterVec
listener *net.UDPConn
listener net.Listener
ns string
ingressClass string
port int
}

// InitUDPCollector creates a new UDPCollector instance
func InitUDPCollector(ns string, class string, port int) error {
sc := UDPCollector{}
// NewInstance creates a new SocketCollector instance
func NewInstance(ns string, class string) error {
sc := SocketCollector{}

ns = strings.Replace(ns, "-", "_", -1)

listener, err := newUDPListener(port)

listener, err := net.Listen("unix", "/tmp/prometheus-nginx.socket")
if err != nil {
return err
}

sc.listener = listener
sc.ns = ns
sc.ingressClass = class
sc.port = port

requestTags := []string{"host", "status", "remote_address", "real_ip_address", "remote_user", "protocol", "method", "uri", "upstream_name", "upstream_ip", "upstream_status", "namespace", "ingress", "service"}
collectorTags := []string{"namespace", "ingress_class"}
Expand Down Expand Up @@ -166,13 +163,13 @@ func InitUDPCollector(ns string, class string, port int) error {
return nil
}

func (sc *UDPCollector) handleMessage(msg []byte) {
func (sc *SocketCollector) handleMessage(msg []byte) {
glog.V(5).Infof("msg: %v", string(msg))

collectorSuccess := true

// Unmarshall bytes
var stats udpData
var stats socketData
err := json.Unmarshal(msg, &stats)
if err != nil {
glog.Errorf("Unexpected error deserializing JSON paylod: %v", err)
Expand Down Expand Up @@ -271,7 +268,29 @@ func (sc *UDPCollector) handleMessage(msg []byte) {
}
}

// Run adds a message handler to a UDP listener
func (sc *UDPCollector) Run() {
handleMessages(sc.listener, sc.handleMessage)
// Run listen for connections in the unix socket and spawns a goroutine to process the content
func (sc *SocketCollector) Run() {
for {
conn, err := sc.listener.Accept()
if err != nil {
continue
}

go handleMessages(conn, sc.handleMessage)
}
}

const packetSize = 1024 * 65

// handleMessages process the content received in a network connection
func handleMessages(conn net.Conn, fn func([]byte)) {
defer conn.Close()

msg := make([]byte, packetSize)
s, err := conn.Read(msg[0:])
if err != nil {
return
}

fn(msg[0:s])
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,28 +25,37 @@ import (
)

func TestNewUDPLogListener(t *testing.T) {
port := freeUDPPort()

var count uint64

fn := func(message []byte) {
t.Logf("message: %v", string(message))
atomic.AddUint64(&count, 1)
}

t.Logf("UDP Port: %v", port)
tmpFile := fmt.Sprintf("/tmp/test-socket-%v", time.Now().Nanosecond())

l, err := newUDPListener(port)
l, err := net.Listen("unix", tmpFile)
if err != nil {
t.Errorf("unexpected error creating UDP listener: %v", err)
t.Fatalf("unexpected error creating unix socket: %v", err)
}
if l == nil {
t.Errorf("expected a listener but none returned")
t.Fatalf("expected a listener but none returned")
}

go handleMessages(l, fn)
defer l.Close()

go func() {
for {
conn, err := l.Accept()
if err != nil {
continue
}

go handleMessages(conn, fn)
}
}()

conn, _ := net.Dial("udp", fmt.Sprintf(":%v", port))
conn, _ := net.Dial("unix", tmpFile)
conn.Write([]byte("message"))
conn.Close()

Expand All @@ -55,16 +64,3 @@ func TestNewUDPLogListener(t *testing.T) {
t.Errorf("expected only one message from the UDP listern but %v returned", count)
}
}

func freeUDPPort() int {
l, err := net.ListenUDP("udp", &net.UDPAddr{})
if err != nil {
return 0
}

if err := l.Close(); err != nil {
return 0
}

return l.LocalAddr().(*net.UDPAddr).Port
}
51 changes: 0 additions & 51 deletions internal/ingress/metric/collector/listener.go

This file was deleted.

4 changes: 2 additions & 2 deletions rootfs/etc/nginx/lua/monitor.lua
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
local socket = ngx.socket.udp
local socket = ngx.socket.tcp
local cjson = require('cjson')
local defer = require('defer')
local assert = assert
Expand All @@ -7,7 +7,7 @@ local _M = {}

local function send_data(jsonData)
local s = assert(socket())
assert(s:setpeername("127.0.0.1", 8000))
assert(s:connect('unix:/tmp/prometheus-nginx.socket'))
assert(s:send(jsonData))
assert(s:close())
end
Expand Down