diff --git a/internal/ingress/metric/collectors/socket.go b/internal/ingress/metric/collectors/socket.go
index 77cd994d55..ebfa37b07f 100644
--- a/internal/ingress/metric/collectors/socket.go
+++ b/internal/ingress/metric/collectors/socket.go
@@ -20,6 +20,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"io"
+	"io/ioutil"
 	"net"
 	"os"
 
@@ -206,92 +207,94 @@ func (sc *SocketCollector) handleMessage(msg []byte) {
 	glog.V(5).Infof("msg: %v", string(msg))
 
 	// Unmarshall bytes
-	var stats socketData
-	err := json.Unmarshal(msg, &stats)
+	var statsBatch []socketData
+	err := json.Unmarshal(msg, &statsBatch)
 	if err != nil {
-		glog.Errorf("Unexpected error deserializing JSON paylod: %v", err)
+		glog.Errorf("Unexpected error deserializing JSON payload: %v. Payload:\n%v", err, string(msg))
 		return
 	}
 
-	requestLabels := prometheus.Labels{
-		"host":   stats.Host,
-		"status": stats.Status,
-		"method": stats.Method,
-		"path":   stats.Path,
-		//"endpoint": stats.Endpoint,
-		"namespace": stats.Namespace,
-		"ingress":   stats.Ingress,
-		"service":   stats.Service,
-	}
-
-	collectorLabels := prometheus.Labels{
-		"namespace": stats.Namespace,
-		"ingress":   stats.Ingress,
-		"status":    stats.Status,
-	}
+	for _, stats := range statsBatch {
+		requestLabels := prometheus.Labels{
+			"host":   stats.Host,
+			"status": stats.Status,
+			"method": stats.Method,
+			"path":   stats.Path,
+			//"endpoint": stats.Endpoint,
+			"namespace": stats.Namespace,
+			"ingress":   stats.Ingress,
+			"service":   stats.Service,
+		}
 
-	latencyLabels := prometheus.Labels{
-		"namespace": stats.Namespace,
-		"ingress":   stats.Ingress,
-		"service":   stats.Service,
-	}
+		collectorLabels := prometheus.Labels{
+			"namespace": stats.Namespace,
+			"ingress":   stats.Ingress,
+			"status":    stats.Status,
+		}
 
-	requestsMetric, err := sc.requests.GetMetricWith(collectorLabels)
-	if err != nil {
-		glog.Errorf("Error fetching requests metric: %v", err)
-	} else {
-		requestsMetric.Inc()
-	}
+		latencyLabels := prometheus.Labels{
+			"namespace": stats.Namespace,
+			"ingress":   stats.Ingress,
+			"service":   stats.Service,
+		}
 
-	if stats.Latency != -1 {
-		latencyMetric, err := sc.upstreamLatency.GetMetricWith(latencyLabels)
+		requestsMetric, err := sc.requests.GetMetricWith(collectorLabels)
 		if err != nil {
-			glog.Errorf("Error fetching latency metric: %v", err)
+			glog.Errorf("Error fetching requests metric: %v", err)
 		} else {
-			latencyMetric.Observe(stats.Latency)
+			requestsMetric.Inc()
 		}
-	}
 
-	if stats.RequestTime != -1 {
-		requestTimeMetric, err := sc.requestTime.GetMetricWith(requestLabels)
-		if err != nil {
-			glog.Errorf("Error fetching request duration metric: %v", err)
-		} else {
-			requestTimeMetric.Observe(stats.RequestTime)
+		if stats.Latency != -1 {
+			latencyMetric, err := sc.upstreamLatency.GetMetricWith(latencyLabels)
+			if err != nil {
+				glog.Errorf("Error fetching latency metric: %v", err)
+			} else {
+				latencyMetric.Observe(stats.Latency)
+			}
 		}
-	}
 
-	if stats.RequestLength != -1 {
-		requestLengthMetric, err := sc.requestLength.GetMetricWith(requestLabels)
-		if err != nil {
-			glog.Errorf("Error fetching request length metric: %v", err)
-		} else {
-			requestLengthMetric.Observe(stats.RequestLength)
+		if stats.RequestTime != -1 {
+			requestTimeMetric, err := sc.requestTime.GetMetricWith(requestLabels)
+			if err != nil {
+				glog.Errorf("Error fetching request duration metric: %v", err)
+			} else {
+				requestTimeMetric.Observe(stats.RequestTime)
+			}
 		}
-	}
 
-	if stats.ResponseTime != -1 {
-		responseTimeMetric, err := sc.responseTime.GetMetricWith(requestLabels)
-		if err != nil {
-			glog.Errorf("Error fetching upstream response time metric: %v", err)
-		} else {
-			responseTimeMetric.Observe(stats.ResponseTime)
+		if stats.RequestLength != -1 {
+			requestLengthMetric, err := sc.requestLength.GetMetricWith(requestLabels)
+			if err != nil {
+				glog.Errorf("Error fetching request length metric: %v", err)
+			} else {
+				requestLengthMetric.Observe(stats.RequestLength)
+			}
 		}
-	}
 
-	if stats.ResponseLength != -1 {
-		bytesSentMetric, err := sc.bytesSent.GetMetricWith(requestLabels)
-		if err != nil {
-			glog.Errorf("Error fetching bytes sent metric: %v", err)
-		} else {
-			bytesSentMetric.Observe(stats.ResponseLength)
+		if stats.ResponseTime != -1 {
+			responseTimeMetric, err := sc.responseTime.GetMetricWith(requestLabels)
+			if err != nil {
+				glog.Errorf("Error fetching upstream response time metric: %v", err)
+			} else {
+				responseTimeMetric.Observe(stats.ResponseTime)
+			}
 		}
 
-		responseSizeMetric, err := sc.responseLength.GetMetricWith(requestLabels)
-		if err != nil {
-			glog.Errorf("Error fetching bytes sent metric: %v", err)
-		} else {
-			responseSizeMetric.Observe(stats.ResponseLength)
+		if stats.ResponseLength != -1 {
+			bytesSentMetric, err := sc.bytesSent.GetMetricWith(requestLabels)
+			if err != nil {
+				glog.Errorf("Error fetching bytes sent metric: %v", err)
+			} else {
+				bytesSentMetric.Observe(stats.ResponseLength)
+			}
+
+			responseSizeMetric, err := sc.responseLength.GetMetricWith(requestLabels)
+			if err != nil {
+				glog.Errorf("Error fetching bytes sent metric: %v", err)
+			} else {
+				responseSizeMetric.Observe(stats.ResponseLength)
+			}
 		}
 	}
 }
@@ -408,19 +411,15 @@ func (sc SocketCollector) Collect(ch chan<- prometheus.Metric) {
 	sc.bytesSent.Collect(ch)
 }
 
-const packetSize = 1024 * 65
-
 // handleMessages process the content received in a network connection
 func handleMessages(conn io.ReadCloser, fn func([]byte)) {
 	defer conn.Close()
-
-	msg := make([]byte, packetSize)
-	s, err := conn.Read(msg[0:])
+	data, err := ioutil.ReadAll(conn)
 	if err != nil {
 		return
 	}
 
-	fn(msg[0:s])
+	fn(data)
 }
 
 func deleteConstants(labels prometheus.Labels) {
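Reviewer note: the `handleMessages` rewrite above matters because a flushed batch is no longer bounded by a single read. The old code read at most `packetSize` (1024 * 65 bytes, i.e. 65 KiB) in one `conn.Read` call, so anything larger was truncated; `ioutil.ReadAll` keeps reading until the Lua side closes the socket. A minimal, runnable sketch of the same pattern (the oversized payload and in-memory reader below are illustrative stand-ins, not part of the patch):

```go
package main

import (
	"fmt"
	"io"
	"io/ioutil"
	"strings"
)

// Same shape as the patched handleMessages: read until the peer closes
// the connection instead of issuing a single Read into a fixed buffer.
func handleMessages(conn io.ReadCloser, fn func([]byte)) {
	defer conn.Close()
	data, err := ioutil.ReadAll(conn)
	if err != nil {
		return
	}
	fn(data)
}

func main() {
	// A payload larger than the old 65KiB cap (packetSize = 1024 * 65).
	payload := strings.Repeat("x", 1024*100)
	conn := ioutil.NopCloser(strings.NewReader(payload))
	handleMessages(conn, func(b []byte) {
		fmt.Println(len(b)) // prints 102400; nothing is truncated
	})
}
```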
diff --git a/internal/ingress/metric/collectors/socket_test.go b/internal/ingress/metric/collectors/socket_test.go
index 21bd535a5d..931ebb9db2 100644
--- a/internal/ingress/metric/collectors/socket_test.go
+++ b/internal/ingress/metric/collectors/socket_test.go
@@ -100,7 +100,7 @@ func TestCollector(t *testing.T) {
 		},
 		{
 			name: "valid metric object should update prometheus metrics",
-			data: []string{`{
+			data: []string{`[{
 				"host":"testshop.com",
 				"status":"200",
 				"bytesSent":150.0,
@@ -115,7 +115,7 @@ func TestCollector(t *testing.T) {
 				"namespace":"test-app-production",
 				"ingress":"web-yml",
 				"service":"test-app"
-			}`},
+			}]`},
 			metrics: []string{"nginx_ingress_controller_response_duration_seconds"},
 			wantBefore: `
 				# HELP nginx_ingress_controller_response_duration_seconds The time spent on receiving the response from the upstream server
@@ -142,7 +142,7 @@ func TestCollector(t *testing.T) {
 
 		{
 			name: "multiple messages should increase prometheus metric by two",
-			data: []string{`{
+			data: []string{`[{
 				"host":"testshop.com",
 				"status":"200",
 				"bytesSent":150.0,
@@ -157,7 +157,7 @@ func TestCollector(t *testing.T) {
 				"namespace":"test-app-production",
 				"ingress":"web-yml",
 				"service":"test-app"
-			}`, `{
+			}]`, `[{
 				"host":"testshop.com",
 				"status":"200",
 				"bytesSent":150.0,
@@ -172,7 +172,7 @@ func TestCollector(t *testing.T) {
 				"namespace":"test-app-qa",
 				"ingress":"web-yml-qa",
 				"service":"test-app-qa"
-			}`, `{
+			}]`, `[{
 				"host":"testshop.com",
 				"status":"200",
 				"bytesSent":150.0,
@@ -187,7 +187,7 @@ func TestCollector(t *testing.T) {
 				"namespace":"test-app-qa",
 				"ingress":"web-yml-qa",
 				"service":"test-app-qa"
-			}`},
+			}]`},
 			metrics: []string{"nginx_ingress_controller_response_duration_seconds"},
 			wantBefore: `
 				# HELP nginx_ingress_controller_response_duration_seconds The time spent on receiving the response from the upstream server
@@ -222,6 +222,65 @@ func TestCollector(t *testing.T) {
 				nginx_ingress_controller_response_duration_seconds_count{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml-qa",method="GET",namespace="test-app-qa",path="/admin",service="test-app-qa",status="200"} 2
 			`,
 		},
+
+		{
+			name: "collector should be able to handle batched metrics correctly",
+			data: []string{`[
+			{
+				"host":"testshop.com",
+				"status":"200",
+				"bytesSent":150.0,
+				"method":"GET",
+				"path":"/admin",
+				"requestLength":300.0,
+				"requestTime":60.0,
+				"upstreamName":"test-upstream",
+				"upstreamIP":"1.1.1.1:8080",
+				"upstreamResponseTime":200,
+				"upstreamStatus":"220",
+				"namespace":"test-app-production",
+				"ingress":"web-yml",
+				"service":"test-app"
+			},
+			{
+				"host":"testshop.com",
+				"status":"200",
+				"bytesSent":150.0,
+				"method":"GET",
+				"path":"/admin",
+				"requestLength":300.0,
+				"requestTime":60.0,
+				"upstreamName":"test-upstream",
+				"upstreamIP":"1.1.1.1:8080",
+				"upstreamResponseTime":100,
+				"upstreamStatus":"220",
+				"namespace":"test-app-production",
+				"ingress":"web-yml",
+				"service":"test-app"
+			}]`},
+			metrics: []string{"nginx_ingress_controller_response_duration_seconds"},
+			wantBefore: `
+				# HELP nginx_ingress_controller_response_duration_seconds The time spent on receiving the response from the upstream server
+				# TYPE nginx_ingress_controller_response_duration_seconds histogram
+				nginx_ingress_controller_response_duration_seconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="0.005"} 0
+				nginx_ingress_controller_response_duration_seconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="0.01"} 0
+				nginx_ingress_controller_response_duration_seconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="0.025"} 0
+				nginx_ingress_controller_response_duration_seconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="0.05"} 0
+				nginx_ingress_controller_response_duration_seconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="0.1"} 0
+				nginx_ingress_controller_response_duration_seconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="0.25"} 0
+				nginx_ingress_controller_response_duration_seconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="0.5"} 0
+				nginx_ingress_controller_response_duration_seconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="1"} 0
+				nginx_ingress_controller_response_duration_seconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="2.5"} 0
+				nginx_ingress_controller_response_duration_seconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="5"} 0
+				nginx_ingress_controller_response_duration_seconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="10"} 0
+				nginx_ingress_controller_response_duration_seconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="+Inf"} 2
+				nginx_ingress_controller_response_duration_seconds_sum{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200"} 300
+				nginx_ingress_controller_response_duration_seconds_count{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200"} 2
+			`,
+			removeIngresses: []string{"test-app-production/web-yml"},
+			wantAfter: `
+			`,
+		},
 	}
 
 	for _, c := range cases {
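Reviewer note: a runnable sketch of the decode path the tests above exercise; one socket read now carries a JSON array, and `handleMessage` updates the metrics once per element. The `socketData` struct below is a trimmed, hypothetical stand-in for the real one in socket.go, with field names and tags assumed from the test payloads:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Trimmed stand-in for the collector's socketData struct; the real one
// in socket.go carries more fields (latency, request/response lengths, etc).
type socketData struct {
	Host        string  `json:"host"`
	Status      string  `json:"status"`
	Namespace   string  `json:"namespace"`
	Ingress     string  `json:"ingress"`
	Service     string  `json:"service"`
	RequestTime float64 `json:"requestTime"`
}

func main() {
	// One message from the Lua side is now an array of objects.
	msg := []byte(`[
		{"host":"testshop.com","status":"200","namespace":"test-app-production","ingress":"web-yml","service":"test-app","requestTime":60.0},
		{"host":"testshop.com","status":"200","namespace":"test-app-qa","ingress":"web-yml-qa","service":"test-app-qa","requestTime":30.0}
	]`)

	var statsBatch []socketData
	if err := json.Unmarshal(msg, &statsBatch); err != nil {
		fmt.Printf("unexpected error deserializing JSON payload: %v\n", err)
		return
	}

	// One metrics update per batched entry, as in handleMessage.
	for _, stats := range statsBatch {
		fmt.Printf("%s/%s status=%s requestTime=%.0f\n",
			stats.Namespace, stats.Ingress, stats.Status, stats.RequestTime)
	}
}
```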
diff --git a/rootfs/etc/nginx/lua/monitor.lua b/rootfs/etc/nginx/lua/monitor.lua
index d9f7a3fc07..512ef46eb9 100644
--- a/rootfs/etc/nginx/lua/monitor.lua
+++ b/rootfs/etc/nginx/lua/monitor.lua
@@ -1,50 +1,83 @@
 local socket = ngx.socket.tcp
 local cjson = require('cjson')
-local defer = require('util.defer')
 local assert = assert
 
+local metrics_batch = {}
+-- if an Nginx worker processes more than (MAX_BATCH_SIZE/FLUSH_INTERVAL) RPS then it will start dropping metrics
+local MAX_BATCH_SIZE = 10000
+local FLUSH_INTERVAL = 1 -- second
+
 local _M = {}
 
-local function send_data(jsonData)
+local function send(payload)
   local s = assert(socket())
-  assert(s:connect('unix:/tmp/prometheus-nginx.socket'))
-  assert(s:send(jsonData))
+  assert(s:connect("unix:/tmp/prometheus-nginx.socket"))
+  assert(s:send(payload))
   assert(s:close())
 end
 
-function _M.encode_nginx_stats()
-  return cjson.encode({
+local function metrics()
+  return {
     host = ngx.var.host or "-",
-
-    method = ngx.var.request_method or "-",
+    namespace = ngx.var.namespace or "-",
+    ingress = ngx.var.ingress_name or "-",
+    service = ngx.var.service_name or "-",
     path = ngx.var.location_path or "-",
+    method = ngx.var.request_method or "-",
     status = ngx.var.status or "-",
-
     requestLength = tonumber(ngx.var.request_length) or -1,
     requestTime = tonumber(ngx.var.request_time) or -1,
-
     responseLength = tonumber(ngx.var.bytes_sent) or -1,
 
     endpoint = ngx.var.upstream_addr or "-",
-
     upstreamLatency = tonumber(ngx.var.upstream_connect_time) or -1,
     upstreamResponseTime = tonumber(ngx.var.upstream_response_time) or -1,
     upstreamResponseLength = tonumber(ngx.var.upstream_response_length) or -1,
-
     upstreamStatus = ngx.var.upstream_status or "-",
+  }
+end
 
-    namespace = ngx.var.namespace or "-",
-    ingress = ngx.var.ingress_name or "-",
-    service = ngx.var.service_name or "-",
-  })
+local function flush(premature)
+  if premature then
+    return
+  end
+
+  if #metrics_batch == 0 then
+    return
+  end
+
+  local current_metrics_batch = metrics_batch
+  metrics_batch = {}
+
+  local ok, payload = pcall(cjson.encode, current_metrics_batch)
+  if not ok then
+    ngx.log(ngx.ERR, "error while encoding metrics: " .. tostring(payload))
+    return
+  end
+
+  send(payload)
+end
+
+function _M.init_worker()
+  local _, err = ngx.timer.every(FLUSH_INTERVAL, flush)
+  if err then
+    ngx.log(ngx.ERR, string.format("error when setting up timer.every: %s", tostring(err)))
+  end
 end
 
 function _M.call()
-  local ok, err = defer.to_timer_phase(send_data, _M.encode_nginx_stats())
-  if not ok then
-    ngx.log(ngx.ERR, "failed to defer send_data to timer phase: ", err)
+  if #metrics_batch >= MAX_BATCH_SIZE then
+    ngx.log(ngx.WARN, "omitting metrics for the request, current batch is full")
     return
   end
+
+  table.insert(metrics_batch, metrics())
+end
+
+if _TEST then
+  _M.flush = flush
+  _M.get_metrics_batch = function() return metrics_batch end
 end
 
 return _M
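Reviewer note: the constants above put a hard bound on per-worker throughput, as the in-file comment says. With MAX_BATCH_SIZE = 10000 and FLUSH_INTERVAL = 1 second, a worker that logs more than 10000/1 = 10,000 requests per second between flushes hits the cap in `_M.call()` and drops the excess metrics instead of letting the batch grow without bound. Note also that `flush()` swaps in a fresh table before encoding, so requests logged while a flush is in progress land in the next batch rather than racing the one being sent.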
diff --git a/rootfs/etc/nginx/lua/test/defer_test.lua b/rootfs/etc/nginx/lua/test/defer_test.lua
deleted file mode 100644
index 459a7c8bfa..0000000000
--- a/rootfs/etc/nginx/lua/test/defer_test.lua
+++ /dev/null
@@ -1,12 +0,0 @@
-_G._TEST = true
-local defer = require("util.defer")
-
-describe("Defer", function()
-  describe("to_timer_phase", function()
-    it("executes passed callback immediately if called on timer phase", function()
-      defer.counter = 0
-      defer.to_timer_phase(function() defer.counter = defer.counter + 1 end)
-      assert.equal(defer.counter, 1)
-    end)
-  end)
-end)
diff --git a/rootfs/etc/nginx/lua/test/monitor_test.lua b/rootfs/etc/nginx/lua/test/monitor_test.lua
index b67c449a2c..08f7fcd24b 100644
--- a/rootfs/etc/nginx/lua/test/monitor_test.lua
+++ b/rootfs/etc/nginx/lua/test/monitor_test.lua
@@ -1,96 +1,106 @@
 _G._TEST = true
-local cjson = require("cjson")
+
+local original_ngx = ngx
+local function reset_ngx()
+  _G.ngx = original_ngx
+end
+
+local function mock_ngx(mock)
+  local _ngx = mock
+  setmetatable(_ngx, { __index = ngx })
+  _G.ngx = _ngx
+end
+
+local function mock_ngx_socket_tcp()
+  local tcp_mock = {}
+  stub(tcp_mock, "connect", true)
+  stub(tcp_mock, "send", true)
+  stub(tcp_mock, "close", true)
+
+  local socket_mock = {}
+  stub(socket_mock, "tcp", tcp_mock)
+  mock_ngx({ socket = socket_mock })
+
+  return tcp_mock
+end
 
 describe("Monitor", function()
+  after_each(function()
+    reset_ngx()
+    package.loaded["monitor"] = nil
+  end)
+
+  it("batches metrics", function()
     local monitor = require("monitor")
-  describe("encode_nginx_stats()", function()
-    it("successfuly encodes the current stats of nginx to JSON", function()
-      local nginx_environment = {
-        host = "testshop.com",
-        status = "200",
-        bytes_sent = "150",
-        server_protocol = "HTTP",
-        request_method = "GET",
-        location_path = "/admin",
-        request_length = "300",
-        request_time = "210",
-        proxy_upstream_name = "test-upstream",
-        upstream_addr = "2.2.2.2",
-        upstream_response_time = "200",
-        upstream_response_length = "150",
-        upstream_connect_time = "1",
-        upstream_status = "220",
-        namespace = "test-app-production",
-        ingress_name = "web-yml",
-        service_name = "test-app",
-      }
-      ngx.var = nginx_environment
-
-      local encode_nginx_stats = monitor.encode_nginx_stats
-      local encoded_json_stats = encode_nginx_stats()
-      local decoded_json_stats = cjson.decode(encoded_json_stats)
-
-      local expected_json_stats = {
-        host = "testshop.com",
-        status = "200",
-        responseLength = 150.0,
-        method = "GET",
-        path = "/admin",
-        requestLength = 300.0,
-        requestTime = 210.0,
-        endpoint = "2.2.2.2",
-        upstreamResponseTime = 200,
-        upstreamStatus = "220",
-        upstreamLatency = 1.0,
-        upstreamResponseLength = 150.0,
-        namespace = "test-app-production",
-        ingress = "web-yml",
-        service = "test-app",
-      }
-
-      assert.are.same(decoded_json_stats,expected_json_stats)
-    end)
-
-    it("replaces empty numeric keys with -1 and missing string keys with -", function()
-      local nginx_environment = {
-        remote_addr = "10.10.10.10",
-        realip_remote_addr = "5.5.5.5",
-        remote_user = "francisco",
-        server_protocol = "HTTP",
-        request_method = "GET",
-        location_path = "/admin",
-        request_time = "202",
-        proxy_upstream_name = "test-upstream",
-        upstream_addr = "2.2.2.2",
-        upstream_response_time = "201",
-        upstream_status = "220",
-        ingress_name = "web-yml",
-      }
-      ngx.var = nginx_environment
-
-      local encode_nginx_stats = monitor.encode_nginx_stats
-      local encoded_json_stats = encode_nginx_stats()
-      local decoded_json_stats = cjson.decode(encoded_json_stats)
-
-      local expected_json_stats = {
-        host = "-",
-        status = "-",
-        responseLength = -1,
-        method = "GET",
-        path = "/admin",
-        requestLength = -1,
-        requestTime = 202.0,
-        endpoint = "2.2.2.2",
-        upstreamStatus = "220",
-        namespace = "-",
-        ingress = "web-yml",
-        upstreamLatency = -1,
-        upstreamResponseTime = 201,
-        upstreamResponseLength = -1,
-        responseLength = -1,
-        service = "-",
-      }
-      assert.are.same(decoded_json_stats,expected_json_stats)
-    end)
+    mock_ngx({ var = {} })
+
+    for i = 1,10,1 do
+      monitor.call()
+    end
+
+    assert.equal(10, #monitor.get_metrics_batch())
+  end)
+
+  describe("flush", function()
+    it("short circuits when premature is true (when worker is shutting down)", function()
+      local tcp_mock = mock_ngx_socket_tcp()
+      local monitor = require("monitor")
+      mock_ngx({ var = {} })
+
+      for i = 1,10,1 do
+        monitor.call()
+      end
+      monitor.flush(true)
+      assert.stub(tcp_mock.connect).was_not_called()
+    end)
+
+    it("short circuits when there are no metrics batched", function()
+      local tcp_mock = mock_ngx_socket_tcp()
+      local monitor = require("monitor")
+
+      monitor.flush()
+      assert.stub(tcp_mock.connect).was_not_called()
+    end)
+
+    it("JSON encodes and sends the batched metrics", function()
+      local tcp_mock = mock_ngx_socket_tcp()
+      local monitor = require("monitor")
+
+      local ngx_var_mock = {
+        host = "example.com",
+        namespace = "default",
+        ingress_name = "example",
+        service_name = "http-svc",
+        location_path = "/",
+
+        request_method = "GET",
+        status = "200",
+        request_length = "256",
+        request_time = "0.04",
+        bytes_sent = "512",
+
+        upstream_addr = "10.10.0.1",
+        upstream_connect_time = "0.01",
+        upstream_response_time = "0.02",
+        upstream_response_length = "456",
+        upstream_status = "200",
+      }
+      mock_ngx({ var = ngx_var_mock })
+      monitor.call()
+
+      local ngx_var_mock1 = ngx_var_mock
+      ngx_var_mock1.status = "201"
+      ngx_var_mock1.request_method = "POST"
+      mock_ngx({ var = ngx_var_mock })
+      monitor.call()
+
+      monitor.flush()
+
+      local expected_payload = '[{"upstreamStatus":"200","requestLength":256,"ingress":"example","status":"200","service":"http-svc","requestTime":0.04,"namespace":"default","host":"example.com","method":"GET","upstreamResponseTime":0.02,"upstreamResponseLength":456,"endpoint":"10.10.0.1","upstreamLatency":0.01,"path":"\\/","responseLength":512},{"upstreamStatus":"200","requestLength":256,"ingress":"example","status":"201","service":"http-svc","requestTime":0.04,"namespace":"default","host":"example.com","method":"POST","upstreamResponseTime":0.02,"upstreamResponseLength":456,"endpoint":"10.10.0.1","upstreamLatency":0.01,"path":"\\/","responseLength":512}]'
+
+      assert.stub(tcp_mock.connect).was_called_with(tcp_mock, "unix:/tmp/prometheus-nginx.socket")
+      assert.stub(tcp_mock.send).was_called_with(tcp_mock, expected_payload)
+      assert.stub(tcp_mock.close).was_called_with(tcp_mock)
     end)
+  end)
 end)
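Reviewer note: in the last test above, `local ngx_var_mock1 = ngx_var_mock` aliases the table rather than copying it, so the `status` and `request_method` mutations are also visible through the already-mocked `ngx.var`. The test still encodes two distinct entries because `metrics()` copies the variable values out of `ngx.var` at each `monitor.call()`, which is why the two objects in `expected_payload` differ only in `status` and `method`.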
diff --git a/rootfs/etc/nginx/lua/util/defer.lua b/rootfs/etc/nginx/lua/util/defer.lua
deleted file mode 100644
index 3658de4a8f..0000000000
--- a/rootfs/etc/nginx/lua/util/defer.lua
+++ /dev/null
@@ -1,57 +0,0 @@
-local util = require("util")
-
-local timer_started = false
-local queue = {}
-local MAX_QUEUE_SIZE = 10000
-
-local _M = {}
-
-local function flush_queue(premature)
-  -- TODO Investigate if we should actually still flush the queue when we're
-  -- shutting down.
-  if premature then return end
-
-  local current_queue = queue
-  queue = {}
-  timer_started = false
-
-  for _,v in ipairs(current_queue) do
-    v.func(unpack(v.args))
-  end
-end
-
--- `to_timer_phase` will enqueue a function that will be executed in a timer
--- context, at a later point in time. The purpose is that some APIs (such as
--- sockets) are not available during some nginx request phases (such as the
--- logging phase), but are available for use in timers. There are no ordering
--- guarantees for when a function will be executed.
-function _M.to_timer_phase(func, ...)
-  if ngx.get_phase() == "timer" then
-    func(...)
-    return true
-  end
-
-  if #queue >= MAX_QUEUE_SIZE then
-    ngx.log(ngx.ERR, "deferred timer queue full")
-    return nil, "deferred timer queue full"
-  end
-
-  table.insert(queue, { func = func, args = {...} })
-  if not timer_started then
-    local ok, err = ngx.timer.at(0, flush_queue)
-    if ok then
-      -- unfortunately this is to deal with tests - when running unit tests, we
-      -- dont actually run the timer, we call the function inline
-      if util.tablelength(queue) > 0 then
-        timer_started = true
-      end
-    else
-      local msg = "failed to create timer: " .. tostring(err)
-      ngx.log(ngx.ERR, msg)
-      return nil, msg
-    end
-  end
-  return true
-end
-
-return _M
diff --git a/rootfs/etc/nginx/template/nginx.tmpl b/rootfs/etc/nginx/template/nginx.tmpl
index c1bc5f16e1..ba2d2df688 100644
--- a/rootfs/etc/nginx/template/nginx.tmpl
+++ b/rootfs/etc/nginx/template/nginx.tmpl
@@ -90,6 +90,7 @@ http {
     {{ if $all.DynamicConfigurationEnabled }}
     init_worker_by_lua_block {
         balancer.init_worker()
+        monitor.init_worker()
     }
     {{ end }}
 {{ end }}
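Reviewer note: `init_worker_by_lua_block` runs once in every NGINX worker, so each worker gets its own `metrics_batch` and its own `ngx.timer.every` flush timer; the Go collector then sees at most one connection per worker per flush interval, each carrying a single JSON array.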