feat: ewma use p2c to improve performance #3300

Merged · 30 commits · Jan 29, 2021

158 changes: 105 additions & 53 deletions apisix/balancer/ewma.lua
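
The core of the change is to replace the old full scan over all peers (`pick_and_score`) with a power-of-two-choices (p2c) pick: draw two distinct peers at random and keep the one with the lower EWMA score. Below is a minimal standalone sketch of that selection step, mirroring the logic added in the diff; `peers` and `score` are placeholders, not names from this PR.

```lua
-- Minimal sketch of the p2c pick used in this PR. `peers` is an array of
-- candidate peers and `score(peer)` returns the peer's current EWMA score
-- (placeholder names; see the actual diff for the real implementation).
local function p2c_pick(peers, score)
    if #peers == 1 then
        return peers[1]
    end

    -- draw two distinct random indexes: a and b with a ~= b
    local a = math.random(1, #peers)
    local b = math.random(1, #peers - 1)
    if b >= a then
        b = b + 1
    end

    -- keep whichever of the two candidates currently scores lower
    if score(peers[a]) <= score(peers[b]) then
        return peers[a]
    end

    return peers[b]
end
```

Compared with scoring every peer on every request, this reads the shared dict for only two peers per pick, which is the main saving behind the PR title.
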
@@ -3,28 +3,50 @@
-- Inspiration drawn from:
-- https://github.com/twitter/finagle/blob/1bc837c4feafc0096e43c0e98516a8e1c50c4421
-- /finagle-core/src/main/scala/com/twitter/finagle/loadbalancer/PeakEwma.scala

local core = require("apisix.core")
local resty_lock = require("resty.lock")

local nkeys = core.table.nkeys
local table_insert = core.table.insert
local ngx = ngx
local ngx_shared = ngx.shared
local ngx_now = ngx.now
local math = math
local pairs = pairs
local ipairs = ipairs
local next = next
local error = error

local _M = {}
local DECAY_TIME = 10 -- this value is in seconds
local LOCK_KEY = ":ewma_key"

local shm_ewma = ngx_shared.balancer_ewma
local shm_last_touched_at= ngx_shared.balancer_ewma_last_touched_at
local shm_last_touched_at = ngx_shared.balancer_ewma_last_touched_at

local lrucache_addr = core.lrucache.new({ttl = 300, count = 1024})
local lrucache_trans_format = core.lrucache.new({ttl = 300, count = 256})

local ewma_lock, ewma_lock_err = resty_lock:new("balancer_ewma_locks", {timeout = 0, exptime = 0.1})

local lrucache_addr = core.lrucache.new({
ttl = 300, count = 1024
})
local lrucache_trans_format = core.lrucache.new({
ttl = 300, count = 256
})
local _M = {name = "ewma"}

local function lock(upstream)
local _, err = ewma_lock:lock(upstream .. LOCK_KEY)
if err and err ~= "timeout" then
core.log.error("EWMA Balancer failed to lock: ", err)
end

return err
end

local function unlock()
local ok, err = ewma_lock:unlock()
if not ok then
core.log.error("EWMA Balancer failed to unlock: ", err)
end

return err
end

local function decay_ewma(ewma, last_touched_at, rtt, now)
local td = now - last_touched_at
@@ -35,73 +57,68 @@ local function decay_ewma(ewma, last_touched_at, rtt, now)
return ewma
end


local function store_stats(upstream, ewma, now)
local success, err, forcible = shm_last_touched_at:set(upstream, now)
if not success then
core.log.error("balancer_ewma_last_touched_at:set failed ", err)
core.log.error("shm_last_touched_at:set failed: ", err)
end
if forcible then
core.log.warn("balancer_ewma_last_touched_at:set valid items forcibly overwritten")
core.log.warn("shm_last_touched_at:set valid items forcibly overwritten")
end

success, err, forcible = shm_ewma:set(upstream, ewma)
if not success then
core.log.error("balancer_ewma:set failed ", err)
core.log.error("shm_ewma:set failed: ", err)
end
if forcible then
core.log.warn("balancer_ewma:set valid items forcibly overwritten")
core.log.warn("shm_ewma:set valid items forcibly overwritten")
end
end


local function get_or_update_ewma(upstream, rtt, update)
if update then

Reviewer:

Not locking for get without update will result in potentially incorrect behaviour. Imagine at line 86 you fetch the current ewma value, then another worker updates it and its last_touched_at value before you retrieve last_touched_at at line 92. You would then end up treating the old ewma as a new one. I'm not sure it would make a big difference in practice, but it is definitely not correct behaviour.

I think the only way of getting rid of locking for "get" operations is to combine the ewma and the timestamp into the same value and store them under a single key. But then you would need to encode and decode them every time you set and fetch the value. It could be interesting to try that and see how it performs.
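
For illustration only, here is a minimal sketch of that single-key idea, assuming the pair is packed into a plain "ewma|timestamp" string in one shared-dict entry; the helper names and encoding are hypothetical, not something this PR implements.

```lua
-- Hypothetical single-key variant (not part of this PR): pack the ewma and
-- its timestamp into one value so a plain read can never see a mismatched
-- pair. Assumes the module-level `shm_ewma` and `core` locals from this file.
local function encode_stat(ewma, last_touched_at)
    return string.format("%.6f|%.6f", ewma, last_touched_at)
end

local function decode_stat(value)
    if not value then
        return 0, 0
    end

    local ewma, last_touched_at = value:match("^([%d%.]+)|([%d%.]+)$")
    return tonumber(ewma) or 0, tonumber(last_touched_at) or 0
end

local function get_stat(upstream)
    return decode_stat(shm_ewma:get(upstream))
end

local function store_stat(upstream, ewma, now)
    -- one set() call keeps the pair consistent for concurrent readers
    local ok, err = shm_ewma:set(upstream, encode_stat(ewma, now))
    if not ok then
        core.log.error("shm_ewma:set failed: ", err)
    end
end
```

Updates would presumably still take the lock so two workers don't decay from the same snapshot, but plain reads could then skip it entirely.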

Member:

ping @hnlq715

Contributor Author (@sysulq, Jan 19, 2021):

> Not locking for get without update will result in potentially incorrect behaviour. [...] it is definitely not correct behaviour.

@ElvinEfendi You're right, this is not correct behaviour, but it may still be a practical solution. In the situation you mention, the last_touched_at values differ very little (almost the same time), and we do not need to lock every get operation, which is quite heavy.

> I think the only way of getting rid of locking for "get" operations is to combine the ewma and the timestamp into the same value and store them under a single key. [...]

My first implementation used a per-worker process cache to store this state, which avoids locking entirely, without frequent encoding and decoding, and therefore performs better (a rough sketch of that idea follows the benchmark output below). But there were still other concerns; details can be found in #3211.

With shared memory, we have to trade off between performance and correctness.

I did some benchmarks comparing the two approaches:

lock get and update

+ wrk -d 5 -c 16 http://127.0.0.1:9080/hello
Running 5s test @ http://127.0.0.1:9080/hello
  2 threads and 16 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     1.62ms  282.29us   7.64ms   90.45%
    Req/Sec     4.98k   511.87     9.62k    97.03%
  50013 requests in 5.10s, 199.42MB read
Requests/sec:   9806.24
Transfer/sec:     39.10MB
+ sleep 1
+ wrk -d 5 -c 16 http://127.0.0.1:9080/hello
Running 5s test @ http://127.0.0.1:9080/hello
  2 threads and 16 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     1.69ms  288.94us   7.25ms   89.40%
    Req/Sec     4.77k   285.81     5.28k    63.73%
  48370 requests in 5.10s, 192.87MB read
Requests/sec:   9484.82
Transfer/sec:     37.82MB

lock update

+ wrk -d 5 -c 16 http://127.0.0.1:9080/hello
Running 5s test @ http://127.0.0.1:9080/hello
  2 threads and 16 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     1.57ms  289.91us   7.23ms   89.61%
    Req/Sec     5.14k   584.14    10.43k    96.04%
  51652 requests in 5.10s, 205.95MB read
Requests/sec:  10128.09
Transfer/sec:     40.38MB
+ sleep 1
+ wrk -d 5 -c 16 http://127.0.0.1:9080/hello
Running 5s test @ http://127.0.0.1:9080/hello
  2 threads and 16 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     1.55ms  255.99us   6.55ms   89.96%
    Req/Sec     5.18k   539.62     9.77k    95.05%
  52008 requests in 5.10s, 207.37MB read
Requests/sec:  10198.48
Transfer/sec:     40.66MB
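
For comparison, the runs above give roughly 3–8% more throughput when only updates are locked (about 10,100–10,200 req/s versus roughly 9,500–9,800 req/s with gets locked as well). The per-worker-cache approach mentioned earlier would avoid locking altogether; a rough sketch of that idea, reusing this file's `decay_ewma` and with hypothetical table names, might look like this:

```lua
-- Hypothetical per-worker variant (not what this PR does): keep the EWMA
-- state in plain Lua tables that are private to each nginx worker, so no
-- shared dict and no resty.lock are needed at all.
local ewma_per_worker = {}          -- upstream -> ewma
local touched_per_worker = {}       -- upstream -> last_touched_at

local function get_or_update_ewma_local(upstream, rtt, update)
    local ewma = ewma_per_worker[upstream] or 0
    local last_touched_at = touched_per_worker[upstream] or 0
    local now = ngx.now()

    -- reuse the same decay function as the shared-memory implementation
    ewma = decay_ewma(ewma, last_touched_at, rtt, now)

    if update then
        ewma_per_worker[upstream] = ewma
        touched_per_worker[upstream] = now
    end

    return ewma
end
```

The obvious trade-off is that each worker only learns from the requests it handled itself, so the scores are no longer shared across workers.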

local lock_err = lock(upstream)
if lock_err ~= nil then
return 0, lock_err
end
end

local ewma = shm_ewma:get(upstream) or 0

local now = ngx_now()
local last_touched_at = shm_last_touched_at:get(upstream) or 0
ewma = decay_ewma(ewma, last_touched_at, rtt, now)

if not update then
return ewma
return ewma, nil
end

store_stats(upstream, ewma, now)

return ewma
unlock()
Member:

The unlock()'s err is not checked?

Contributor Author:

We record the error when unlock() fails.


return ewma, nil
end

local function get_upstream_name(upstream)
return upstream.host .. ":" .. upstream.port
end

local function score(upstream)
-- Original implementation used names
-- Endpoints don't have names, so passing in host:Port as key instead
local upstream_name = upstream.host .. ":" .. upstream.port
-- Endpoints don't have names, so passing in IP:Port as key instead
local upstream_name = get_upstream_name(upstream)
return get_or_update_ewma(upstream_name, 0, false)
end


local function pick_and_score(peers)
local lowest_score_index = 1
local lowest_score = score(peers[lowest_score_index])
for i = 2, #peers do
local new_score = score(peers[i])
if new_score < lowest_score then
lowest_score_index, lowest_score = i, new_score
end
end

return peers[lowest_score_index], lowest_score
end


local function parse_addr(addr)
local host, port, err = core.utils.parse_addr(addr)
return {host = host, port = port}, err
end


local function _trans_format(up_nodes)
-- trans
--{"1.2.3.4:80":100,"5.6.7.8:8080":100}
-- {"1.2.3.4:80":100,"5.6.7.8:8080":100}
-- into
-- [{"host":"1.2.3.4","port":"80"},{"host":"5.6.7.8","port":"8080"}]
local peers = {}
@@ -119,38 +136,72 @@ local function _trans_format(up_nodes)
return next(peers) and peers or nil
end


local function _ewma_find(ctx, up_nodes)
local peers
local endpoint

if not up_nodes
or core.table.nkeys(up_nodes) == 0 then
if not up_nodes or nkeys(up_nodes) == 0 then
return nil, 'up_nodes empty'
end

peers = lrucache_trans_format(ctx.upstream_key, ctx.upstream_version,
_trans_format, up_nodes)
if ctx.balancer_tried_servers and ctx.balancer_tried_servers_count == nkeys(up_nodes) then
return nil, "all upstream servers tried"
end

peers = lrucache_trans_format(ctx.upstream_key, ctx.upstream_version, _trans_format, up_nodes)
if not peers then
return nil, 'up_nodes trans error'
end

if #peers > 1 then
endpoint = pick_and_score(peers)
local filtered_peers
if ctx.balancer_tried_servers then
for _, peer in ipairs(peers) do
if not ctx.balancer_tried_servers[get_upstream_name(peer)] then
if not filtered_peers then
filtered_peers = {}
end

table_insert(filtered_peers, peer)
end
end
else
endpoint = peers[1]
filtered_peers = peers
end

return endpoint.host .. ":" .. endpoint.port
end
local endpoint = filtered_peers[1]

if #filtered_peers > 1 then
local a, b = math.random(1, #filtered_peers), math.random(1, #filtered_peers - 1)
if b >= a then
b = b + 1
end

local backendpoint
endpoint, backendpoint = filtered_peers[a], filtered_peers[b]
if score(endpoint) > score(backendpoint) then
endpoint = backendpoint
end
Member:

We need to sync the tried_endpoint check from https://github.com/kubernetes/ingress-nginx/blob/a2e77185cc2e91278962f4f1267246c8fefc6e73/rootfs/etc/nginx/lua/balancer/ewma.lua#L180 to our new implementation.
You can take a look at:

if not before_retry then
if ctx.balancer_tried_servers then
core.tablepool.release("balancer_tried_servers", ctx.balancer_tried_servers)
ctx.balancer_tried_servers = nil
end
return nil
end
if not ctx.balancer_tried_servers then
ctx.balancer_tried_servers = core.tablepool.fetch("balancer_tried_servers", 0, 2)
end
ctx.balancer_tried_servers[ctx.balancer_server] = true
ctx.balancer_tried_servers_count = (ctx.balancer_tried_servers_count or 0) + 1
end
}
end
return _M

Contributor Author:

Maybe this is another topic where we could improve stability, such as passive/active health checks, tried records, and try counts?

Member:

It would be better to fix this known issue in this PR.

Contributor Author:

Got it.

Member:

It seems this problem is still not addressed?

Contributor Author:

Sorry, the filter logic has now been added; I was focusing on other things these past few days.

end

return get_upstream_name(endpoint)
end

local function _ewma_after_balance(ctx, before_retry)
if before_retry then
-- don't count tries which fail to complete
if not ctx.balancer_tried_servers then
ctx.balancer_tried_servers = core.tablepool.fetch("balancer_tried_servers", 0, 2)
end

ctx.balancer_tried_servers[ctx.balancer_server] = true
ctx.balancer_tried_servers_count = (ctx.balancer_tried_servers_count or 0) + 1

return nil
end

if ctx.balancer_tried_servers then
core.tablepool.release("balancer_tried_servers", ctx.balancer_tried_servers)
ctx.balancer_tried_servers = nil
end

local response_time = ctx.var.upstream_response_time or 0
local connect_time = ctx.var.upstream_connect_time or 0
local rtt = connect_time + response_time
@@ -163,21 +214,22 @@ local function _ewma_after_balance(ctx, before_retry)
return get_or_update_ewma(upstream, rtt, true)
end


function _M.new(up_nodes, upstream)
if not shm_ewma
or not shm_last_touched_at then
if not shm_ewma or not shm_last_touched_at then
return nil, "dictionary not find"
end

if not ewma_lock then
error(ewma_lock_err)
end

return {
upstream = upstream,
get = function (ctx)
get = function(ctx)
return _ewma_find(ctx, up_nodes)
end,
after_balance = _ewma_after_balance
}
end


return _M
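
For orientation, here is a rough sketch of how a caller might use the interface visible in this diff (`_M.new(up_nodes, upstream)` returning a table with `get` and `after_balance`). The node addresses, `ctx`, and `upstream` values below are illustrative, and in APISIX the balancer object is normally created once per upstream version rather than on every request.

```lua
-- Hypothetical caller, sketched from the interface above.
local core = require("apisix.core")
local ewma = require("apisix.balancer.ewma")

local function pick_server(ctx, upstream)
    -- "host:port" -> weight, the same shape _trans_format expects
    local up_nodes = {
        ["127.0.0.1:1980"] = 100,
        ["127.0.0.1:1981"] = 100,
    }

    local handler, err = ewma.new(up_nodes, upstream)
    if not handler then
        core.log.error("failed to create ewma balancer: ", err)
        return nil, err
    end

    -- pick a "host:port" string for this request
    local server, pick_err = handler.get(ctx)

    -- later, after the response (or before a retry), report back so the
    -- EWMA for the chosen server decays and updates:
    --   handler.after_balance(ctx, before_retry)

    return server, pick_err
end
```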