diff --git a/apisix/balancer/ewma.lua b/apisix/balancer/ewma.lua index b66b07a28701..e1b276273063 100644 --- a/apisix/balancer/ewma.lua +++ b/apisix/balancer/ewma.lua @@ -3,28 +3,50 @@ -- Inspiration drawn from: -- https://github.com/twitter/finagle/blob/1bc837c4feafc0096e43c0e98516a8e1c50c4421 -- /finagle-core/src/main/scala/com/twitter/finagle/loadbalancer/PeakEwma.scala - local core = require("apisix.core") +local resty_lock = require("resty.lock") + +local nkeys = core.table.nkeys +local table_insert = core.table.insert local ngx = ngx local ngx_shared = ngx.shared local ngx_now = ngx.now local math = math local pairs = pairs +local ipairs = ipairs local next = next +local error = error -local _M = {} local DECAY_TIME = 10 -- this value is in seconds +local LOCK_KEY = ":ewma_key" local shm_ewma = ngx_shared.balancer_ewma -local shm_last_touched_at= ngx_shared.balancer_ewma_last_touched_at +local shm_last_touched_at = ngx_shared.balancer_ewma_last_touched_at + +local lrucache_addr = core.lrucache.new({ttl = 300, count = 1024}) +local lrucache_trans_format = core.lrucache.new({ttl = 300, count = 256}) + +local ewma_lock, ewma_lock_err = resty_lock:new("balancer_ewma_locks", {timeout = 0, exptime = 0.1}) -local lrucache_addr = core.lrucache.new({ - ttl = 300, count = 1024 -}) -local lrucache_trans_format = core.lrucache.new({ - ttl = 300, count = 256 -}) +local _M = {name = "ewma"} +local function lock(upstream) + local _, err = ewma_lock:lock(upstream .. LOCK_KEY) + if err and err ~= "timeout" then + core.log.error("EWMA Balancer failed to lock: ", err) + end + + return err +end + +local function unlock() + local ok, err = ewma_lock:unlock() + if not ok then + core.log.error("EWMA Balancer failed to unlock: ", err) + end + + return err +end local function decay_ewma(ewma, last_touched_at, rtt, now) local td = now - last_touched_at @@ -35,73 +57,68 @@ local function decay_ewma(ewma, last_touched_at, rtt, now) return ewma end - local function store_stats(upstream, ewma, now) local success, err, forcible = shm_last_touched_at:set(upstream, now) if not success then - core.log.error("balancer_ewma_last_touched_at:set failed ", err) + core.log.error("shm_last_touched_at:set failed: ", err) end if forcible then - core.log.warn("balancer_ewma_last_touched_at:set valid items forcibly overwritten") + core.log.warn("shm_last_touched_at:set valid items forcibly overwritten") end success, err, forcible = shm_ewma:set(upstream, ewma) if not success then - core.log.error("balancer_ewma:set failed ", err) + core.log.error("shm_ewma:set failed: ", err) end if forcible then - core.log.warn("balancer_ewma:set valid items forcibly overwritten") + core.log.warn("shm_ewma:set valid items forcibly overwritten") end end - local function get_or_update_ewma(upstream, rtt, update) + if update then + local lock_err = lock(upstream) + if lock_err ~= nil then + return 0, lock_err + end + end + local ewma = shm_ewma:get(upstream) or 0 + local now = ngx_now() local last_touched_at = shm_last_touched_at:get(upstream) or 0 ewma = decay_ewma(ewma, last_touched_at, rtt, now) if not update then - return ewma + return ewma, nil end store_stats(upstream, ewma, now) - return ewma + unlock() + + return ewma, nil end +local function get_upstream_name(upstream) + return upstream.host .. ":" .. upstream.port +end local function score(upstream) -- Original implementation used names - -- Endpoints don't have names, so passing in host:Port as key instead - local upstream_name = upstream.host .. ":" .. upstream.port + -- Endpoints don't have names, so passing in IP:Port as key instead + local upstream_name = get_upstream_name(upstream) return get_or_update_ewma(upstream_name, 0, false) end - -local function pick_and_score(peers) - local lowest_score_index = 1 - local lowest_score = score(peers[lowest_score_index]) - for i = 2, #peers do - local new_score = score(peers[i]) - if new_score < lowest_score then - lowest_score_index, lowest_score = i, new_score - end - end - - return peers[lowest_score_index], lowest_score -end - - local function parse_addr(addr) local host, port, err = core.utils.parse_addr(addr) return {host = host, port = port}, err end - local function _trans_format(up_nodes) -- trans - --{"1.2.3.4:80":100,"5.6.7.8:8080":100} + -- {"1.2.3.4:80":100,"5.6.7.8:8080":100} -- into -- [{"host":"1.2.3.4","port":"80"},{"host":"5.6.7.8","port":"8080"}] local peers = {} @@ -119,38 +136,72 @@ local function _trans_format(up_nodes) return next(peers) and peers or nil end - local function _ewma_find(ctx, up_nodes) local peers - local endpoint - if not up_nodes - or core.table.nkeys(up_nodes) == 0 then + if not up_nodes or nkeys(up_nodes) == 0 then return nil, 'up_nodes empty' end - peers = lrucache_trans_format(ctx.upstream_key, ctx.upstream_version, - _trans_format, up_nodes) + if ctx.balancer_tried_servers and ctx.balancer_tried_servers_count == nkeys(up_nodes) then + return nil, "all upstream servers tried" + end + + peers = lrucache_trans_format(ctx.upstream_key, ctx.upstream_version, _trans_format, up_nodes) if not peers then return nil, 'up_nodes trans error' end - if #peers > 1 then - endpoint = pick_and_score(peers) + local filtered_peers + if ctx.balancer_tried_servers then + for _, peer in ipairs(peers) do + if not ctx.balancer_tried_servers[get_upstream_name(peer)] then + if not filtered_peers then + filtered_peers = {} + end + + table_insert(filtered_peers, peer) + end + end else - endpoint = peers[1] + filtered_peers = peers end - return endpoint.host .. ":" .. endpoint.port -end + local endpoint = filtered_peers[1] + + if #filtered_peers > 1 then + local a, b = math.random(1, #filtered_peers), math.random(1, #filtered_peers - 1) + if b >= a then + b = b + 1 + end + + local backendpoint + endpoint, backendpoint = filtered_peers[a], filtered_peers[b] + if score(endpoint) > score(backendpoint) then + endpoint = backendpoint + end + end + return get_upstream_name(endpoint) +end local function _ewma_after_balance(ctx, before_retry) if before_retry then - -- don't count tries which fail to complete + if not ctx.balancer_tried_servers then + ctx.balancer_tried_servers = core.tablepool.fetch("balancer_tried_servers", 0, 2) + end + + ctx.balancer_tried_servers[ctx.balancer_server] = true + ctx.balancer_tried_servers_count = (ctx.balancer_tried_servers_count or 0) + 1 + return nil end + if ctx.balancer_tried_servers then + core.tablepool.release("balancer_tried_servers", ctx.balancer_tried_servers) + ctx.balancer_tried_servers = nil + end + local response_time = ctx.var.upstream_response_time or 0 local connect_time = ctx.var.upstream_connect_time or 0 local rtt = connect_time + response_time @@ -163,21 +214,22 @@ local function _ewma_after_balance(ctx, before_retry) return get_or_update_ewma(upstream, rtt, true) end - function _M.new(up_nodes, upstream) - if not shm_ewma - or not shm_last_touched_at then + if not shm_ewma or not shm_last_touched_at then return nil, "dictionary not find" end + if not ewma_lock then + error(ewma_lock_err) + end + return { upstream = upstream, - get = function (ctx) + get = function(ctx) return _ewma_find(ctx, up_nodes) end, after_balance = _ewma_after_balance } end - return _M diff --git a/t/node/ewma.t b/t/node/ewma.t index 955a6b785fe6..53bf8485e6f1 100644 --- a/t/node/ewma.t +++ b/t/node/ewma.t @@ -166,7 +166,7 @@ GET /t return end - ngx.sleep(20) + ngx.sleep(11) --keep the node 1980 hot for i = 1, 12 do local httpc = http.new() @@ -215,3 +215,167 @@ GET /t --- error_code: 200 --- no_error_log [error] + + + +=== TEST 4: about filter tried servers +--- timeout: 10 +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + + --remove the 1981 node, + --add the 9527 node (invalid node) + --keep two nodes for triggering ewma logic in server_picker function of balancer phase + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "upstream": { + "nodes": { + "127.0.0.1:1980": 1, + "127.0.0.1:9527": 1 + }, + "type": "ewma", + "timeout": { + "connect": 0.1, + "send": 0.5, + "read": 0.5 + } + }, + "uri": "/ewma" + }]] + ) + + if code ~= 200 then + ngx.say("update route failed") + return + end + + local http = require "resty.http" + local uri = "http://127.0.0.1:" .. ngx.var.server_port + .. "/ewma" + + --should always select the 1980 node, because 0 is invalid + local t = {} + local ports_count = {} + for i = 1, 12 do + local th = assert(ngx.thread.spawn(function(i) + local httpc = http.new() + httpc:set_timeout(2000) + local res, err = httpc:request_uri(uri, {method = "GET", keepalive = false}) + if not res then + ngx.say(err) + return + end + ports_count[res.body] = (ports_count[res.body] or 0) + 1 + end, i)) + table.insert(t, th) + end + for i, th in ipairs(t) do + ngx.thread.wait(th) + end + + local ports_arr = {} + for port, count in pairs(ports_count) do + table.insert(ports_arr, {port = port, count = count}) + end + + local function cmd(a, b) + return a.port > b.port + end + table.sort(ports_arr, cmd) + + ngx.say(require("toolkit.json").encode(ports_arr)) + ngx.exit(200) + } + } +--- request +GET /t +--- response_body +[{"count":12,"port":"1980"}] +--- error_code: 200 +--- error_log +Connection refused) while connecting to upstream + + + +=== TEST 5: about all endpoints have been retried +--- timeout: 10 +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + + --add the 9527 node (invalid node) + --add the 9528 node (invalid node) + --keep two nodes for triggering ewma logic in server_picker function of balancer phase + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "upstream": { + "nodes": { + "127.0.0.1:9527": 1, + "127.0.0.1:9528": 1 + }, + "type": "ewma", + "timeout": { + "connect": 0.1, + "send": 0.5, + "read": 0.5 + } + }, + "uri": "/ewma" + }]] + ) + + if code ~= 200 then + ngx.say("update route failed") + return + end + + local http = require "resty.http" + local uri = "http://127.0.0.1:" .. ngx.var.server_port + .. "/ewma" + + --should always return 502, because both 9527 and 9528 are invalid + local t = {} + local ports_count = {} + for i = 1, 12 do + local th = assert(ngx.thread.spawn(function(i) + local httpc = http.new() + httpc:set_timeout(2000) + local res, err = httpc:request_uri(uri, {method = "GET", keepalive = false}) + if not res then + ngx.say(err) + return + end + ports_count[res.status] = (ports_count[res.status] or 0) + 1 + end, i)) + table.insert(t, th) + end + for i, th in ipairs(t) do + ngx.thread.wait(th) + end + + local ports_arr = {} + for port, count in pairs(ports_count) do + table.insert(ports_arr, {port = port, count = count}) + end + + local function cmd(a, b) + return a.port > b.port + end + table.sort(ports_arr, cmd) + + ngx.say(require("toolkit.json").encode(ports_arr)) + ngx.exit(200) + } + } +--- request +GET /t +--- response_body +[{"count":12,"port":502}] +--- error_code: 200 +--- error_log +Connection refused) while connecting to upstream