From a48033ad765d1c24a160f6e13f1aa8ddfd295cd7 Mon Sep 17 00:00:00 2001 From: Qi Date: Sun, 5 Jun 2022 23:23:11 +0800 Subject: [PATCH] feat(timerng) integrate the lua-resty-timer-ng * fix(plugin_servers) bypass timer library * feat(AdminAPI) add `/timers` route * feat(conf) add new config `legacy_timer_mechanism` (boolean) * chore(rockspec) add `resty.timerng` as a dependency * tests(*) add `wait_timers_*()` helpers * tests(plugin/zipkin) fix * tests(unit/db/cache_warmup) fix * tests(integration/cmd/quit) fix * tests(integration/hybrid_mode/ocsp) fix * tests(plugin/rate-limiting/access) fix * tests(integration/db/query-semaphore) fix * tests(integration/admin/timers) add tests for `http://kong_admin/timers` * tests(integration/cmd/start_stop) fix * tests(integration/proxy/router) fix * docs(CHANGELOG) update Co-authored-by: Mayo Co-authored-by: Wangchong Zhou --- CHANGELOG.md | 18 ++-- kong-2.8.0-0.rockspec | 1 + kong/api/routes/kong.lua | 5 ++ kong/globalpatches.lua | 44 ++++++++++ kong/init.lua | 1 + kong/runloop/plugin_servers/init.lua | 5 +- spec/01-unit/01-db/08-cache_warmup_spec.lua | 18 +++- .../02-cmd/02-start_stop_spec.lua | 35 +++++--- spec/02-integration/02-cmd/08-quit_spec.lua | 2 +- .../03-db/09-query-semaphore_spec.lua | 7 +- .../04-admin_api/20-timers_spec.lua | 54 ++++++++++++ .../05-proxy/02-router_spec.lua | 8 +- .../09-hybrid_mode/05-ocsp_spec.lua | 24 +++--- .../10-go_plugins/01-reports_spec.lua | 2 +- .../23-rate-limiting/04-access_spec.lua | 84 +++++++++++++++++++ spec/03-plugins/34-zipkin/zipkin_spec.lua | 18 ++++ .../lua-resty-dns/resty/dns/resolver.lua | 16 ++-- spec/helpers.lua | 65 ++++++++++++++ 18 files changed, 360 insertions(+), 47 deletions(-) create mode 100644 spec/02-integration/04-admin_api/20-timers_spec.lua diff --git a/CHANGELOG.md b/CHANGELOG.md index 60905b5008c..c08a6cfcead 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -198,6 +198,18 @@ ### Additions +#### Performance +- Do not register unnecessary event handlers on Hybrid mode Control Plane +nodes [#8452](https://github.com/Kong/kong/pull/8452). +- Use the new timer library to improve performance, + except for the plugin server. + [#8912](https://github.com/Kong/kong/pull/8912) + +#### Admin API + +- Added a new API `/timers` to get timer statistics. + [#8912](https://github.com/Kong/kong/pull/8912) + #### Core - Added `cache_key` on target entity for uniqueness detection. @@ -279,12 +291,6 @@ a restart (e.g., upon a plugin server crash). instead of `proxy_error_log` [8583](https://github.com/Kong/kong/pull/8583) -### Additions - -#### Performance -- Do not register unnecessary event handlers on Hybrid mode Control Plane -nodes [#8452](https://github.com/Kong/kong/pull/8452).
- ## [2.8.0] ### Deprecations diff --git a/kong-2.8.0-0.rockspec b/kong-2.8.0-0.rockspec index 71cfa356f8c..af9230f79c7 100644 --- a/kong-2.8.0-0.rockspec +++ b/kong-2.8.0-0.rockspec @@ -41,6 +41,7 @@ dependencies = { "lua-resty-ipmatcher == 0.6.1", "lua-resty-acme == 0.8.0", "lua-resty-session == 3.10", + "lua-resty-timer-ng == 0.1.0", } build = { type = "builtin", diff --git a/kong/api/routes/kong.lua b/kong/api/routes/kong.lua index e3cf728a1a1..b567e20b15f 100644 --- a/kong/api/routes/kong.lua +++ b/kong/api/routes/kong.lua @@ -185,4 +185,9 @@ return { return kong.response.exit(200, copy) end }, + ["/timers"] = { + GET = function (self, db, helpers) + return kong.response.exit(200, _G.timerng_stats()) + end + } } diff --git a/kong/globalpatches.lua b/kong/globalpatches.lua index 5fd16c2c01a..b535b614904 100644 --- a/kong/globalpatches.lua +++ b/kong/globalpatches.lua @@ -68,6 +68,50 @@ return function(options) end + do + _G.native_timer_at = ngx.timer.at + _G.native_timer_every = ngx.timer.every + + local timerng + + if options.cli or options.rbusted then + timerng = require("resty.timerng").new({ + min_threads = 16, + max_threads = 32, + }) + + timerng:start() + + else + timerng = require("resty.timerng").new() + + -- TODO rename + _G.timerng_start = function (debug) + timerng:start() + timerng:set_debug(debug) + end + + end + + _G.ngx.timer.at = function (delay, callback, ...) + return timerng:at(delay, callback, ...) + end + + _G.ngx.timer.every = function (interval, callback, ...) + return timerng:every(interval, callback, ...) + end + + -- TODO rename + _G.timerng_stats = function () + return timerng:stats({ + verbose = true, + flamegraph = true, + }) + end + + end + + do -- implement a Lua based shm for: cli (and hence rbusted) diff --git a/kong/init.lua b/kong/init.lua index 707dd200c3e..c761d2f6913 100644 --- a/kong/init.lua +++ b/kong/init.lua @@ -592,6 +592,7 @@ function Kong.init_worker() -- duplicated seeds. 
math.randomseed() + _G.timerng_start(kong.configuration.log_level == "debug") -- init DB diff --git a/kong/runloop/plugin_servers/init.lua b/kong/runloop/plugin_servers/init.lua index 8039f267e90..c2634b646b0 100644 --- a/kong/runloop/plugin_servers/init.lua +++ b/kong/runloop/plugin_servers/init.lua @@ -8,7 +8,6 @@ local kong = kong local ngx_var = ngx.var local coroutine_running = coroutine.running local get_plugin_info = proc_mgmt.get_plugin_info -local ngx_timer_at = ngx.timer.at local subsystem = ngx.config.subsystem --- keep request data a bit longer, into the log timer @@ -250,7 +249,7 @@ local function build_phases(plugin) response_status = ngx.status, } - ngx_timer_at(0, function() + _G.native_timer_at(0, function() local co = coroutine_running() save_for_later[co] = saved @@ -322,7 +321,7 @@ function plugin_servers.start() for _, server_def in ipairs(proc_mgmt.get_server_defs()) do if server_def.start_command then - ngx_timer_at(0, pluginserver_timer, server_def) + _G.native_timer_at(0, pluginserver_timer, server_def) end end end diff --git a/spec/01-unit/01-db/08-cache_warmup_spec.lua b/spec/01-unit/01-db/08-cache_warmup_spec.lua index 8ae7e6dd335..6cb36f87bb1 100644 --- a/spec/01-unit/01-db/08-cache_warmup_spec.lua +++ b/spec/01-unit/01-db/08-cache_warmup_spec.lua @@ -1,4 +1,5 @@ local cache_warmup = require("kong.cache.warmup") +local helpers = require("spec.helpers") local function mock_entity(db_data, entity_name, cache_key) @@ -211,9 +212,15 @@ describe("cache_warmup", function() cache_warmup._mock_kong(kong) + local runs_old = _G.timerng_stats().sys.runs + assert.truthy(cache_warmup.execute({"my_entity", "services"})) - ngx.sleep(0) -- yield so that async DNS caching happens + -- wait for async DNS caching + helpers.wait_until(function () + local runs = _G.timerng_stats().sys.runs + return runs_old < runs + end) -- `my_entity` isn't a core entity; lookup is on client cache assert.same(kong.cache:get("111").bbb, 222) @@ -264,9 +271,15 @@ describe("cache_warmup", function() cache_warmup._mock_kong(kong) + local runs_old = _G.timerng_stats().sys.runs + assert.truthy(cache_warmup.execute({"my_entity", "services"})) - ngx.sleep(0.001) -- yield so that async DNS caching happens + -- wait for async DNS caching + helpers.wait_until(function () + local runs = _G.timerng_stats().sys.runs + return runs_old < runs + end) -- `my_entity` isn't a core entity; lookup is on client cache assert.same(kong.cache:get("111").bbb, 222) @@ -279,6 +292,7 @@ describe("cache_warmup", function() -- skipped IP entry assert.same({ "example.com", "example.test" }, dns_queries) + end) diff --git a/spec/02-integration/02-cmd/02-start_stop_spec.lua b/spec/02-integration/02-cmd/02-start_stop_spec.lua index 8ba896874af..bd111b0cfae 100644 --- a/spec/02-integration/02-cmd/02-start_stop_spec.lua +++ b/spec/02-integration/02-cmd/02-start_stop_spec.lua @@ -13,7 +13,9 @@ describe("kong start/stop #" .. strategy, function() end) after_each(function() helpers.kill_all() + os.execute("rm -rf " .. helpers.test_conf.prefix .. "/worker_events.sock") end) + lazy_teardown(function() helpers.clean_prefix() end) @@ -178,6 +180,10 @@ describe("kong start/stop #" .. strategy, function() end) describe("verbose args", function() + after_each(function () + os.execute("rm -rf " .. helpers.test_conf.prefix .. "/worker_events.sock") + end) + it("accepts verbose --v", function() local _, _, stdout = assert(helpers.kong_exec("start --v --conf " ..
helpers.test_conf_path)) assert.matches("[verbose] prefix in use: ", stdout, nil, true) @@ -226,18 +232,23 @@ describe("kong start/stop #" .. strategy, function() end) describe("/etc/hosts resolving in CLI", function() - it("resolves #cassandra hostname", function() - assert(helpers.kong_exec("start --vv --run-migrations --conf " .. helpers.test_conf_path, { - cassandra_contact_points = "localhost", - database = "cassandra" - })) - end) - it("resolves #postgres hostname", function() - assert(helpers.kong_exec("start --conf " .. helpers.test_conf_path, { - pg_host = "localhost", - database = "postgres" - })) - end) + if strategy == "cassandra" then + it("resolves #cassandra hostname", function() + assert(helpers.kong_exec("start --vv --run-migrations --conf " .. helpers.test_conf_path, { + cassandra_contact_points = "localhost", + database = "cassandra" + })) + end) + + elseif strategy == "postgres" then + it("resolves #postgres hostname", function() + assert(helpers.kong_exec("start --conf " .. helpers.test_conf_path, { + pg_host = "localhost", + database = "postgres" + })) + end) + end + end) -- TODO: update with new error messages and behavior diff --git a/spec/02-integration/02-cmd/08-quit_spec.lua b/spec/02-integration/02-cmd/08-quit_spec.lua index e7af6c0486b..2989ab82ef7 100644 --- a/spec/02-integration/02-cmd/08-quit_spec.lua +++ b/spec/02-integration/02-cmd/08-quit_spec.lua @@ -31,6 +31,6 @@ describe("kong quit", function() assert(helpers.kong_exec("quit --wait 2 --prefix " .. helpers.test_conf.prefix)) ngx.update_time() local duration = ngx.now() - start - assert.is.near(2, duration, 1.6) + assert.is.near(2, duration, 2.5) end) end) diff --git a/spec/02-integration/03-db/09-query-semaphore_spec.lua b/spec/02-integration/03-db/09-query-semaphore_spec.lua index 21983bffa9e..e32517337d2 100644 --- a/spec/02-integration/03-db/09-query-semaphore_spec.lua +++ b/spec/02-integration/03-db/09-query-semaphore_spec.lua @@ -33,6 +33,8 @@ describe("#postgres Postgres query locks", function() end) it("results in query error failing to acquire resource", function() + local wait_timers_ctx = helpers.wait_timers_begin() + local res = assert(client:send { method = "GET", path = "/slow-resource?prime=true", @@ -40,7 +42,9 @@ describe("#postgres Postgres query locks", function() }) assert.res_status(204 , res) - -- make a request that would run a query while no resources are available + -- wait for zero-delay timer + helpers.wait_timers_end(wait_timers_ctx, 0.5) + res = assert(client:send { method = "GET", path = "/slow-resource", @@ -49,5 +53,6 @@ describe("#postgres Postgres query locks", function() local body = assert.res_status(500 , res) local json = cjson.decode(body) assert.same({ error = "error acquiring query semaphore: timeout" }, json) + end) end) diff --git a/spec/02-integration/04-admin_api/20-timers_spec.lua b/spec/02-integration/04-admin_api/20-timers_spec.lua new file mode 100644 index 00000000000..499f679fc6e --- /dev/null +++ b/spec/02-integration/04-admin_api/20-timers_spec.lua @@ -0,0 +1,54 @@ +local helpers = require "spec.helpers" +local cjson = require "cjson" + + +for _, strategy in helpers.each_strategy() do + +describe("Admin API[#" .. strategy .. 
"]" , function() +local client + + lazy_setup(function() + helpers.get_db_utils(strategy) + + assert(helpers.start_kong({ + database = strategy, + nginx_conf = "spec/fixtures/custom_nginx.template", + })) + + client = helpers.admin_client() + end) + + teardown(function() + if client then + client:close() + end + helpers.stop_kong() + end) + + it("/timers", function () + local res = assert(client:send { + method = "GET", + path = "/timers", + headers = { ["Content-Type"] = "application/json" } + }) + + local body = assert.res_status(200 , res) + local json = cjson.decode(body) + + assert(type(json.flamegraph.running) == "string") + assert(type(json.flamegraph.pending) == "string") + assert(type(json.flamegraph.elapsed_time) == "string") + + assert(type(json.sys.total) == "number") + assert(type(json.sys.runs) == "number") + assert(type(json.sys.running) == "number") + assert(type(json.sys.pending) == "number") + assert(type(json.sys.waiting) == "number") + + assert(type(json.timers) == "table") + + end) + +end) + +end diff --git a/spec/02-integration/05-proxy/02-router_spec.lua b/spec/02-integration/05-proxy/02-router_spec.lua index 262bfbdd812..f39a6c41936 100644 --- a/spec/02-integration/05-proxy/02-router_spec.lua +++ b/spec/02-integration/05-proxy/02-router_spec.lua @@ -72,6 +72,9 @@ local function insert_routes(bp, routes) local cfg = bp.done() local yaml = declarative.to_yaml_string(cfg) local admin_client = helpers.admin_client() + + local wait_timers_ctx = helpers.wait_timers_begin() + local res = assert(admin_client:send { method = "POST", path = "/config", @@ -85,9 +88,12 @@ local function insert_routes(bp, routes) assert.res_status(201, res) admin_client:close() + -- wait for timers finish + helpers.wait_timers_end(wait_timers_ctx, 0.5) end - ngx.sleep(0.01) -- temporary wait + ngx.sleep(0.01) -- temporary wait for worker events + return routes end diff --git a/spec/02-integration/09-hybrid_mode/05-ocsp_spec.lua b/spec/02-integration/09-hybrid_mode/05-ocsp_spec.lua index 7eeddee6a78..da4d49c45ac 100644 --- a/spec/02-integration/09-hybrid_mode/05-ocsp_spec.lua +++ b/spec/02-integration/09-hybrid_mode/05-ocsp_spec.lua @@ -46,6 +46,8 @@ for cluster_protocol, conf in pairs(confs) do cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", })) + set_ocsp_status("good") + assert(helpers.start_kong({ role = "data_plane", cluster_protocol = cluster_protocol, @@ -60,8 +62,6 @@ for cluster_protocol, conf in pairs(confs) do cluster_server_name = "kong_clustering", cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", })) - - set_ocsp_status("good") end) lazy_teardown(function() @@ -117,6 +117,8 @@ for cluster_protocol, conf in pairs(confs) do cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", })) + set_ocsp_status("revoked") + assert(helpers.start_kong({ role = "data_plane", cluster_protocol = cluster_protocol, @@ -131,8 +133,6 @@ for cluster_protocol, conf in pairs(confs) do cluster_server_name = "kong_clustering", cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", })) - - set_ocsp_status("revoked") end) lazy_teardown(function() @@ -186,6 +186,8 @@ for cluster_protocol, conf in pairs(confs) do cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", })) + set_ocsp_status("error") + assert(helpers.start_kong({ role = "data_plane", cluster_protocol = cluster_protocol, @@ -200,8 +202,6 @@ for cluster_protocol, conf in pairs(confs) do cluster_server_name = "kong_clustering", cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", })) - - set_ocsp_status("error") end) lazy_teardown(function() @@ -258,6 
+258,8 @@ for cluster_protocol, conf in pairs(confs) do cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", })) + set_ocsp_status("revoked") + assert(helpers.start_kong({ role = "data_plane", cluster_protocol = cluster_protocol, @@ -272,8 +274,6 @@ for cluster_protocol, conf in pairs(confs) do cluster_server_name = "kong_clustering", cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", })) - - set_ocsp_status("revoked") end) lazy_teardown(function() @@ -331,6 +331,8 @@ for cluster_protocol, conf in pairs(confs) do cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", })) + set_ocsp_status("revoked") + assert(helpers.start_kong({ role = "data_plane", cluster_protocol = cluster_protocol, @@ -345,8 +347,6 @@ for cluster_protocol, conf in pairs(confs) do cluster_server_name = "kong_clustering", cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", })) - - set_ocsp_status("revoked") end) lazy_teardown(function() @@ -400,6 +400,8 @@ for cluster_protocol, conf in pairs(confs) do cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", })) + set_ocsp_status("error") + assert(helpers.start_kong({ role = "data_plane", cluster_protocol = cluster_protocol, @@ -414,8 +416,6 @@ for cluster_protocol, conf in pairs(confs) do cluster_server_name = "kong_clustering", cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", })) - - set_ocsp_status("error") end) lazy_teardown(function() diff --git a/spec/02-integration/10-go_plugins/01-reports_spec.lua b/spec/02-integration/10-go_plugins/01-reports_spec.lua index 719c98eca7d..7dd001add44 100644 --- a/spec/02-integration/10-go_plugins/01-reports_spec.lua +++ b/spec/02-integration/10-go_plugins/01-reports_spec.lua @@ -10,7 +10,7 @@ for _, strategy in helpers.each_strategy() do describe("anonymous reports for go plugins #" .. strategy, function() local reports_send_ping = function(port) - ngx.sleep(0.01) -- hand over the CPU so other threads can do work (processing the sent data) + ngx.sleep(0.2) -- hand over the CPU so other threads can do work (processing the sent data) local admin_client = helpers.admin_client() local res = admin_client:post("/reports/send-ping" .. (port and "?port=" .. 
port or "")) assert.response(res).has_status(200) diff --git a/spec/03-plugins/23-rate-limiting/04-access_spec.lua b/spec/03-plugins/23-rate-limiting/04-access_spec.lua index b02c662b18e..626bcbba507 100644 --- a/spec/03-plugins/23-rate-limiting/04-access_spec.lua +++ b/spec/03-plugins/23-rate-limiting/04-access_spec.lua @@ -360,6 +360,8 @@ for _, strategy in helpers.each_strategy() do describe("Without authentication (IP address)", function() it_with_retry("blocks if exceeding limit", function() for i = 1, 6 do + local wait_timers_ctx = helpers.wait_timers_begin() + local res = GET("/status/200", { headers = { Host = "test1.com" }, }, 200) @@ -370,6 +372,9 @@ for _, strategy in helpers.each_strategy() do assert.are.same(6 - i, tonumber(res.headers["ratelimit-remaining"])) local reset = tonumber(res.headers["ratelimit-reset"]) assert.equal(true, reset <= 60 and reset >= 0) + + -- wait for zero-delay timer + helpers.wait_timers_end(wait_timers_ctx, 0.5) end -- Additonal request, while limit is 6/minute @@ -392,6 +397,8 @@ for _, strategy in helpers.each_strategy() do it_with_retry("blocks if exceeding limit, only if done via same path", function() for i = 1, 3 do + local wait_timers_ctx = helpers.wait_timers_begin() + local res = GET("/status/200", { headers = { Host = "test-path.com" }, }, 200) @@ -402,10 +409,15 @@ for _, strategy in helpers.each_strategy() do assert.are.same(6 - i, tonumber(res.headers["ratelimit-remaining"])) local reset = tonumber(res.headers["ratelimit-reset"]) assert.equal(true, reset <= 60 and reset > 0) + + -- wait for zero-delay timer + helpers.wait_timers_end(wait_timers_ctx, 0.5) end -- Try a different path on the same host. This should reset the timers for i = 1, 3 do + local wait_timers_ctx = helpers.wait_timers_begin() + local res = GET("/status/201", { headers = { Host = "test-path.com" }, }, 201) @@ -416,10 +428,15 @@ for _, strategy in helpers.each_strategy() do assert.are.same(6 - i, tonumber(res.headers["ratelimit-remaining"])) local reset = tonumber(res.headers["ratelimit-reset"]) assert.equal(true, reset <= 60 and reset > 0) + + -- wait for zero-delay timer + helpers.wait_timers_end(wait_timers_ctx, 0.5) end -- Continue doing requests on the path which "blocks" for i = 4, 6 do + local wait_timers_ctx = helpers.wait_timers_begin() + local res = GET("/status/200", { headers = { Host = "test-path.com" }, }, 200) @@ -430,6 +447,9 @@ for _, strategy in helpers.each_strategy() do assert.are.same(6 - i, tonumber(res.headers["ratelimit-remaining"])) local reset = tonumber(res.headers["ratelimit-reset"]) assert.equal(true, reset <= 60 and reset > 0) + + -- wait for zero-delay timer + helpers.wait_timers_end(wait_timers_ctx, 0.5) end -- Additonal request, while limit is 6/minute @@ -452,6 +472,8 @@ for _, strategy in helpers.each_strategy() do it_with_retry("counts against the same service register from different routes", function() for i = 1, 3 do + local wait_timers_ctx = helpers.wait_timers_begin() + local res = GET("/status/200", { headers = { Host = "test-service1.com" }, }, 200) @@ -462,9 +484,14 @@ for _, strategy in helpers.each_strategy() do assert.are.same(6 - i, tonumber(res.headers["ratelimit-remaining"])) local reset = tonumber(res.headers["ratelimit-reset"]) assert.equal(true, reset <= 60 and reset > 0) + + -- wait for zero-delay timer + helpers.wait_timers_end(wait_timers_ctx, 0.5) end for i = 4, 6 do + local wait_timers_ctx = helpers.wait_timers_begin() + local res = GET("/status/200", { headers = { Host = "test-service2.com" }, }, 200) @@ -475,6 
+502,9 @@ for _, strategy in helpers.each_strategy() do assert.are.same(6 - i, tonumber(res.headers["ratelimit-remaining"])) local reset = tonumber(res.headers["ratelimit-reset"]) assert.equal(true, reset <= 60 and reset > 0) + + -- wait for zero-delay timer + helpers.wait_timers_end(wait_timers_ctx, 0.5) end -- Additonal request, while limit is 6/minute @@ -502,6 +532,8 @@ for _, strategy in helpers.each_strategy() do } for i = 1, 3 do + local wait_timers_ctx = helpers.wait_timers_begin() + local res = GET("/status/200", { headers = { Host = "test2.com" }, }, 200) @@ -514,6 +546,9 @@ for _, strategy in helpers.each_strategy() do assert.are.same(limits.minute - i, tonumber(res.headers["ratelimit-remaining"])) local reset = tonumber(res.headers["ratelimit-reset"]) assert.equal(true, reset <= 60 and reset > 0) + + -- wait for zero-delay timer + helpers.wait_timers_end(wait_timers_ctx, 0.5) end local res, body = GET("/status/200", { @@ -539,6 +574,8 @@ for _, strategy in helpers.each_strategy() do describe("Without authentication (IP address)", function() it_with_retry("blocks if exceeding limit #grpc", function() for i = 1, 6 do + local wait_timers_ctx = helpers.wait_timers_begin() + local ok, res = helpers.proxy_client_grpc(){ service = "hello.HelloService.SayHello", opts = { @@ -555,6 +592,8 @@ for _, strategy in helpers.each_strategy() do local reset = tonumber(string.match(res, "ratelimit%-reset: (%d+)")) assert.equal(true, reset <= 60 and reset >= 0) + -- wait for zero-delay timer + helpers.wait_timers_end(wait_timers_ctx, 0.5) end -- Additonal request, while limit is 6/minute @@ -582,6 +621,8 @@ for _, strategy in helpers.each_strategy() do describe("API-specific plugin", function() it_with_retry("blocks if exceeding limit", function() for i = 1, 6 do + local wait_timers_ctx = helpers.wait_timers_begin() + local res = GET("/status/200?apikey=apikey123", { headers = { Host = "test3.com" }, }, 200) @@ -592,6 +633,9 @@ for _, strategy in helpers.each_strategy() do assert.are.same(6 - i, tonumber(res.headers["ratelimit-remaining"])) local reset = tonumber(res.headers["ratelimit-reset"]) assert.equal(true, reset <= 60 and reset > 0) + + -- wait for zero-delay timer + helpers.wait_timers_end(wait_timers_ctx, 0.5) end -- Third query, while limit is 2/minute @@ -620,6 +664,8 @@ for _, strategy in helpers.each_strategy() do describe("#flaky Plugin customized for specific consumer and route", function() it_with_retry("blocks if exceeding limit", function() for i = 1, 8 do + local wait_timers_ctx = helpers.wait_timers_begin() + local res = GET("/status/200?apikey=apikey122", { headers = { Host = "test3.com" }, }, 200) @@ -630,6 +676,9 @@ for _, strategy in helpers.each_strategy() do assert.are.same(8 - i, tonumber(res.headers["ratelimit-remaining"])) local reset = tonumber(res.headers["ratelimit-reset"]) assert.equal(true, reset <= 60 and reset > 0) + + -- wait for zero-delay timer + helpers.wait_timers_end(wait_timers_ctx, 0.5) end local res, body = GET("/status/200?apikey=apikey122", { @@ -651,6 +700,8 @@ for _, strategy in helpers.each_strategy() do it_with_retry("blocks if the only rate-limiting plugin existing is per consumer and not per API", function() for i = 1, 6 do + local wait_timers_ctx = helpers.wait_timers_begin() + local res = GET("/status/200?apikey=apikey122", { headers = { Host = "test4.com" }, }, 200) @@ -661,6 +712,9 @@ for _, strategy in helpers.each_strategy() do assert.are.same(6 - i, tonumber(res.headers["ratelimit-remaining"])) local reset = 
tonumber(res.headers["ratelimit-reset"]) assert.equal(true, reset <= 60 and reset > 0) + + -- wait for zero-delay timer + helpers.wait_timers_end(wait_timers_ctx, 0.5) end local res, body = GET("/status/200?apikey=apikey122", { @@ -987,6 +1041,8 @@ for _, strategy in helpers.each_strategy() do it_with_retry("blocks when the consumer exceeds their quota, no matter what service/route used", function() for i = 1, 6 do + local wait_timers_ctx = helpers.wait_timers_begin() + local res = GET("/status/200?apikey=apikey125", { headers = { Host = fmt("test%d.com", i) }, }, 200) @@ -997,6 +1053,9 @@ for _, strategy in helpers.each_strategy() do assert.are.same(6 - i, tonumber(res.headers["ratelimit-remaining"])) local reset = tonumber(res.headers["ratelimit-reset"]) assert.equal(true, reset <= 60 and reset > 0) + + -- wait for zero-delay timer + helpers.wait_timers_end(wait_timers_ctx, 0.5) end -- Additonal request, while limit is 6/minute @@ -1067,6 +1126,8 @@ for _, strategy in helpers.each_strategy() do it_with_retry("blocks if exceeding limit", function() for i = 1, 6 do + local wait_timers_ctx = helpers.wait_timers_begin() + local res = GET("/status/200", { headers = { Host = fmt("test%d.com", i) }, }, 200) @@ -1077,6 +1138,9 @@ for _, strategy in helpers.each_strategy() do assert.are.same(6 - i, tonumber(res.headers["ratelimit-remaining"])) local reset = tonumber(res.headers["ratelimit-reset"]) assert.equal(true, reset <= 60 and reset > 0) + + -- wait for zero-delay timer + helpers.wait_timers_end(wait_timers_ctx, 0.5) end -- Additonal request, while limit is 6/minute @@ -1150,6 +1214,8 @@ for _, strategy in helpers.each_strategy() do it_with_retry("blocks if exceeding limit", function() for i = 1, 6 do + local wait_timers_ctx = helpers.wait_timers_begin() + local res = GET("/status/200", { headers = { Host = "test1.com" } }, 200) assert.are.same(6, tonumber(res.headers["x-ratelimit-limit-minute"])) @@ -1158,9 +1224,14 @@ for _, strategy in helpers.each_strategy() do assert.are.same(6 - i, tonumber(res.headers["ratelimit-remaining"])) local reset = tonumber(res.headers["ratelimit-reset"]) assert.equal(true, reset <= 60 and reset > 0) + + -- wait for zero-delay timer + helpers.wait_timers_end(wait_timers_ctx, 0.5) end for i = 1, 6 do + local wait_timers_ctx = helpers.wait_timers_begin() + local res = GET("/status/200", { headers = { Host = "test2.com" } }, 200) assert.are.same(6, tonumber(res.headers["x-ratelimit-limit-minute"])) @@ -1169,6 +1240,9 @@ for _, strategy in helpers.each_strategy() do assert.are.same(6 - i, tonumber(res.headers["ratelimit-remaining"])) local reset = tonumber(res.headers["ratelimit-reset"]) assert.equal(true, reset <= 60 and reset > 0) + + -- wait for zero-delay timer + helpers.wait_timers_end(wait_timers_ctx, 0.5) end -- Additonal request, while limit is 6/minute @@ -1233,6 +1307,8 @@ for _, strategy in helpers.each_strategy() do it_with_retry("blocks if exceeding limit", function() for i = 1, 6 do + local wait_timers_ctx = helpers.wait_timers_begin() + local res = GET("/status/200", { headers = { Host = fmt("test%d.com", i) }, }, 200) @@ -1243,6 +1319,9 @@ for _, strategy in helpers.each_strategy() do assert.are.same(6 - i, tonumber(res.headers["ratelimit-remaining"])) local reset = tonumber(res.headers["ratelimit-reset"]) assert.equal(true, reset <= 60 and reset > 0) + + -- wait for zero-delay timer + helpers.wait_timers_end(wait_timers_ctx, 0.5) end -- Additonal request, while limit is 6/minute @@ -1313,6 +1392,8 @@ for _, strategy in helpers.each_strategy() do 
it_with_retry("maintains the counters for a path through different services and routes", function() for i = 1, 6 do + local wait_timers_ctx = helpers.wait_timers_begin() + local res = GET("/status/200", { headers = { Host = fmt("test%d.com", i) }, }, 200) @@ -1323,6 +1404,9 @@ for _, strategy in helpers.each_strategy() do assert.are.same(6 - i, tonumber(res.headers["ratelimit-remaining"])) local reset = tonumber(res.headers["ratelimit-reset"]) assert.equal(true, reset <= 60 and reset > 0) + + -- wait for zero-delay timer + helpers.wait_timers_end(wait_timers_ctx, 0.5) end -- Additonal request, while limit is 6/minute diff --git a/spec/03-plugins/34-zipkin/zipkin_spec.lua b/spec/03-plugins/34-zipkin/zipkin_spec.lua index 2854e9435b5..72e8d76a839 100644 --- a/spec/03-plugins/34-zipkin/zipkin_spec.lua +++ b/spec/03-plugins/34-zipkin/zipkin_spec.lua @@ -374,6 +374,8 @@ for _, strategy in helpers.each_strategy() do end) it("times out if connection times out to upstream zipkin server", function() + local wait_timers_ctx = helpers.wait_timers_begin() + local res = assert(proxy_client:send({ method = "GET", path = "/status/200", @@ -382,10 +384,16 @@ for _, strategy in helpers.each_strategy() do } })) assert.res_status(200, res) + + -- wait for zero-delay timer + helpers.wait_timers_end(wait_timers_ctx, 0.5) + assert.logfile().has.line("reporter flush failed to request: timeout", false, 2) end) it("times out if upstream zipkin server takes too long to respond", function() + local wait_timers_ctx = helpers.wait_timers_begin() + local res = assert(proxy_client:send({ method = "GET", path = "/status/200", @@ -394,10 +402,16 @@ for _, strategy in helpers.each_strategy() do } })) assert.res_status(200, res) + + -- wait for zero-delay timer + helpers.wait_timers_end(wait_timers_ctx, 0.5) + assert.logfile().has.line("reporter flush failed to request: timeout", false, 2) end) it("connection refused if upstream zipkin server is not listening", function() + local wait_timers_ctx = helpers.wait_timers_begin() + local res = assert(proxy_client:send({ method = "GET", path = "/status/200", @@ -406,6 +420,10 @@ for _, strategy in helpers.each_strategy() do } })) assert.res_status(200, res) + + -- wait for zero-delay timer + helpers.wait_timers_end(wait_timers_ctx, 0.5) + assert.logfile().has.line("reporter flush failed to request: connection refused", false, 2) end) end) diff --git a/spec/fixtures/mocks/lua-resty-dns/resty/dns/resolver.lua b/spec/fixtures/mocks/lua-resty-dns/resty/dns/resolver.lua index 75dcea922b9..f6d56e71b86 100644 --- a/spec/fixtures/mocks/lua-resty-dns/resty/dns/resolver.lua +++ b/spec/fixtures/mocks/lua-resty-dns/resty/dns/resolver.lua @@ -101,14 +101,14 @@ resolver.query = function(self, name, options, tries) end end -do - local semaphore = require "ngx.semaphore" - local old_post = semaphore.post - function semaphore.post(self, n) - old_post(self, n) - ngx.sleep(0) - end -end +-- do +-- local semaphore = require "ngx.semaphore" +-- local old_post = semaphore.post +-- function semaphore.post(self, n) +-- old_post(self, n) +-- ngx.sleep(0) +-- end +-- end return resolver diff --git a/spec/helpers.lua b/spec/helpers.lua index 41d20509ed5..7404dd5d4c5 100644 --- a/spec/helpers.lua +++ b/spec/helpers.lua @@ -1390,6 +1390,69 @@ local function wait_until(f, timeout, step) end +---record the currently existing timer +---@param admin_client_timeout number optional, to override the default timeout setting +---@param forced_admin_port number optional, to override the default port of admin API 
+---@return table ctx is used for `wait_timers_end` +local function wait_timers_begin(admin_client_timeout, forced_admin_port) + local admin_client = admin_client(admin_client_timeout, forced_admin_port) + local res = assert(admin_client:get("/timers")) + local body = luassert.res_status(200, res) + local json = cjson.decode(body) + + return assert(json.timers) +end + + +---wait for all timers created after `wait_timers_begin` was called to finish +---@param ctx table the context returned by `wait_timers_begin` +---@param timeout number optional, maximum time to wait (default = 2) +---@param admin_client_timeout? number optional, to override the default timeout setting +---@param forced_admin_port? number optional, to override the default port of admin API +local function wait_timers_end(ctx, timeout, + admin_client_timeout, forced_admin_port) + local _admin_client = admin_client(admin_client_timeout, forced_admin_port) + local res = assert(_admin_client:get("/timers")) + local body = luassert.res_status(200, res) + local json = cjson.decode(body) + + local new_timers = assert(json.timers) + + -- difference of `new_timers` and `ctx`, i.e. timers created since `wait_timers_begin` + local delta_timers = {} + + for timer_name, timer in pairs(new_timers) do + if not ctx[timer_name] then + delta_timers[timer_name] = timer + end + end + + if not timeout then + timeout = 2 + end + + wait_until(function () + _admin_client:close() + _admin_client = admin_client(admin_client_timeout, forced_admin_port) + res = assert(_admin_client:get("/timers")) + body = luassert.res_status(200, res) + json = cjson.decode(body) + + local intersection = pl_tablex.intersection(json.timers, delta_timers) + + if next(intersection) == nil then + return true + end + + local now_timers_str = cjson.encode(pl_tablex.keys(json.timers)) + local delta_timers_str = cjson.encode(pl_tablex.keys(delta_timers)) + + return false, "now_timers: " .. now_timers_str .. ", delta_timers: " .. delta_timers_str + + end, timeout) +end + + --- Waits for invalidation of a cached key by polling the mgt-api -- and waiting for a 404 response. Throws an error on timeout. -- @@ -3017,6 +3080,8 @@ end http2_client = http2_client, wait_until = wait_until, wait_pid = wait_pid, + wait_timers_begin = wait_timers_begin, + wait_timers_end = wait_timers_end, tcp_server = tcp_server, udp_server = udp_server, kill_tcp_server = kill_tcp_server,
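Usage sketch (not part of the patch): how the new `wait_timers_begin()` / `wait_timers_end()` helpers from `spec/helpers.lua` are meant to bracket a request that spawns zero-delay timers, mirroring the pattern used in the specs above; the route, host name, and 0.5-second timeout are illustrative only:

    local helpers = require "spec.helpers"

    it("waits for the zero-delay timers spawned by a proxied request", function()
      -- snapshot the timers that already exist
      local wait_timers_ctx = helpers.wait_timers_begin()

      local proxy_client = helpers.proxy_client()
      local res = assert(proxy_client:send {
        method = "GET",
        path = "/status/200",              -- illustrative route
        headers = { Host = "test1.com" },  -- illustrative host
      })
      assert.res_status(200, res)
      proxy_client:close()

      -- block until every timer created since the snapshot has finished,
      -- polling GET /timers on the Admin API; fails the test after 0.5s
      helpers.wait_timers_end(wait_timers_ctx, 0.5)
    end)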