From d1119445490093d4a38d6ca1db7d1c440348b8e4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=A4=A7=E5=8F=AF?= <hnlq.sysu@gmail.com>
Date: Fri, 15 Jan 2021 14:55:13 +0800
Subject: [PATCH 01/30] ewma use p2c

---
 apisix/balancer/ewma.lua | 147 +++++++++++++++++++++++----------------
 1 file changed, 88 insertions(+), 59 deletions(-)

diff --git a/apisix/balancer/ewma.lua b/apisix/balancer/ewma.lua
index b66b07a28701..5546cc778206 100644
--- a/apisix/balancer/ewma.lua
+++ b/apisix/balancer/ewma.lua
@@ -5,15 +5,18 @@
 --   /finagle-core/src/main/scala/com/twitter/finagle/loadbalancer/PeakEwma.scala
 
 local core = require("apisix.core")
+local resty_lock = require("resty.lock")
+
 local ngx = ngx
 local ngx_shared = ngx.shared
 local ngx_now = ngx.now
 local math = math
 local pairs = pairs
-local next = next
+local tostring = tostring
+local string = string
 
-local _M = {}
 local DECAY_TIME = 10 -- this value is in seconds
+local LOCK_KEY = ":ewma_key"
 
 local shm_ewma = ngx_shared.balancer_ewma
 local shm_last_touched_at= ngx_shared.balancer_ewma_last_touched_at
@@ -25,80 +28,102 @@ local lrucache_trans_format = core.lrucache.new({
     ttl = 300, count = 256
 })
 
-
-local function decay_ewma(ewma, last_touched_at, rtt, now)
-    local td = now - last_touched_at
-    td = math.max(td, 0)
-    local weight = math.exp(-td / DECAY_TIME)
-
-    ewma = ewma * weight + rtt * (1.0 - weight)
-    return ewma
+local ewma_lock, ewma_lock_err = resty_lock:new("balancer_ewma_locks", {timeout = 0, exptime = 0.1})
+if not ewma_lock then
+  error(ewma_lock_err)
 end
 
+local _M = { name = "ewma" }
 
-local function store_stats(upstream, ewma, now)
-    local success, err, forcible = shm_last_touched_at:set(upstream, now)
-    if not success then
-        core.log.error("balancer_ewma_last_touched_at:set failed ", err)
-    end
-    if forcible then
-        core.log.warn("balancer_ewma_last_touched_at:set valid items forcibly overwritten")
+local function lock(upstream)
+  local _, err = ewma_lock:lock(upstream .. LOCK_KEY)
+  if err then
+    if err ~= "timeout" then
+      ngx.log(ngx.ERR, string.format("EWMA Balancer failed to lock: %s", tostring(err)))
     end
+  end
 
-    success, err, forcible = shm_ewma:set(upstream, ewma)
-    if not success then
-        core.log.error("balancer_ewma:set failed ", err)
-    end
-    if forcible then
-        core.log.warn("balancer_ewma:set valid items forcibly overwritten")
-    end
+  return err
 end
 
+local function unlock()
+  local ok, err = ewma_lock:unlock()
+  if not ok then
+    ngx.log(ngx.ERR, string.format("EWMA Balancer failed to unlock: %s", tostring(err)))
+  end
 
-local function get_or_update_ewma(upstream, rtt, update)
-    local ewma = shm_ewma:get(upstream) or 0
-    local now = ngx_now()
-    local last_touched_at = shm_last_touched_at:get(upstream) or 0
-    ewma = decay_ewma(ewma, last_touched_at, rtt, now)
+  return err
+end
 
-    if not update then
-        return ewma
-    end
+local function decay_ewma(ewma, last_touched_at, rtt, now)
+  local td = now - last_touched_at
+  td = (td > 0) and td or 0
+  local weight = math.exp(-td/DECAY_TIME)
 
-    store_stats(upstream, ewma, now)
+  ewma = ewma * weight + rtt * (1.0 - weight)
+  return ewma
+end
 
-    return ewma
+local function store_stats(upstream, ewma, now)
+  local success, err, forcible = shm_last_touched_at:set(upstream, now)
+  if not success then
+    ngx.log(ngx.WARN, "shm_last_touched_at:set failed " .. err)
+  end
+  if forcible then
+    ngx.log(ngx.WARN, "shm_last_touched_at:set valid items forcibly overwritten")
+  end
+
+  success, err, forcible = shm_ewma:set(upstream, ewma)
+  if not success then
+    ngx.log(ngx.WARN, "shm_ewma:set failed " .. err)
+  end
+  if forcible then
+    ngx.log(ngx.WARN, "shm_ewma:set valid items forcibly overwritten")
+  end
 end
 
+local function get_or_update_ewma(upstream, rtt, update)
+  local lock_err = nil
+  if update then
+    lock_err = lock(upstream)
+  end
 
-local function score(upstream)
-    -- Original implementation used names
-    -- Endpoints don't have names, so passing in host:Port as key instead
-    local upstream_name = upstream.host .. ":" .. upstream.port
-    return get_or_update_ewma(upstream_name, 0, false)
-end
+  local ewma = ngx.shared.balancer_ewma:get(upstream) or 0
+  if lock_err ~= nil then
+    return ewma, lock_err
+  end
 
+  local now = ngx_now()
+  local last_touched_at = ngx.shared.balancer_ewma_last_touched_at:get(upstream) or 0
+  ewma = decay_ewma(ewma, last_touched_at, rtt, now)
 
-local function pick_and_score(peers)
-    local lowest_score_index = 1
-    local lowest_score = score(peers[lowest_score_index])
-    for i = 2, #peers do
-        local new_score = score(peers[i])
-        if new_score < lowest_score then
-            lowest_score_index, lowest_score = i, new_score
-        end
-    end
+  if not update then
+    return ewma, nil
+  end
+
+  store_stats(upstream, ewma, now)
+
+  unlock()
 
-    return peers[lowest_score_index], lowest_score
+  return ewma, nil
 end
 
+local function get_upstream_name(upstream)
+   return upstream.host .. ":" .. upstream.port
+end
+
+local function score(upstream)
+  -- Original implementation used names
+  -- Endpoints don't have names, so passing in IP:Port as key instead
+  local upstream_name = get_upstream_name(upstream)
+  return get_or_update_ewma(upstream_name, 0, false)
+end
 
 local function parse_addr(addr)
     local host, port, err = core.utils.parse_addr(addr)
     return {host = host, port = port}, err
 end
 
-
 local function _trans_format(up_nodes)
     -- trans
     --{"1.2.3.4:80":100,"5.6.7.8:8080":100}
@@ -119,10 +144,8 @@ local function _trans_format(up_nodes)
     return next(peers) and peers or nil
 end
 
-
 local function _ewma_find(ctx, up_nodes)
     local peers
-    local endpoint
 
     if not up_nodes
        or core.table.nkeys(up_nodes) == 0 then
@@ -135,13 +158,21 @@ local function _ewma_find(ctx, up_nodes)
         return nil, 'up_nodes trans error'
     end
 
+    local endpoint, backendpoint = peers[1], nil
+
     if #peers > 1 then
-        endpoint = pick_and_score(peers)
-    else
-        endpoint = peers[1]
+        local a, b = math.random(1, #peers), math.random(1, #peers-1)
+        if b >= a then
+            b = b+1
+        end
+
+        endpoint, backendpoint = peers[a], peers[b]
+        if score(endpoint) > score(backendpoint) then
+            endpoint, backendpoint = backendpoint, endpoint
+        end
     end
 
-    return endpoint.host .. ":" .. endpoint.port
+    return get_upstream_name(endpoint)
 end
 
 
@@ -163,7 +194,6 @@ local function _ewma_after_balance(ctx, before_retry)
     return get_or_update_ewma(upstream, rtt, true)
 end
 
-
 function _M.new(up_nodes, upstream)
     if not shm_ewma
        or not shm_last_touched_at then
@@ -179,5 +209,4 @@ function _M.new(up_nodes, upstream)
     }
 end
 
-
 return _M

From 85ae7ed5c0d99125b53f53cf97442c9892e5190d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=A4=A7=E5=8F=AF?= <hnlq.sysu@gmail.com>
Date: Fri, 15 Jan 2021 15:30:34 +0800
Subject: [PATCH 02/30] better style

---
 apisix/balancer/ewma.lua | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/apisix/balancer/ewma.lua b/apisix/balancer/ewma.lua
index 5546cc778206..f727c6f818e6 100644
--- a/apisix/balancer/ewma.lua
+++ b/apisix/balancer/ewma.lua
@@ -39,7 +39,7 @@ local function lock(upstream)
   local _, err = ewma_lock:lock(upstream .. LOCK_KEY)
   if err then
     if err ~= "timeout" then
-      ngx.log(ngx.ERR, string.format("EWMA Balancer failed to lock: %s", tostring(err)))
+      core.log.error("EWMA Balancer failed to lock: ", err)
     end
   end
 
@@ -49,7 +49,7 @@ end
 local function unlock()
   local ok, err = ewma_lock:unlock()
   if not ok then
-    ngx.log(ngx.ERR, string.format("EWMA Balancer failed to unlock: %s", tostring(err)))
+    core.log.error("EWMA Balancer failed to unlock: ", err)
   end
 
   return err
@@ -67,18 +67,18 @@ end
 local function store_stats(upstream, ewma, now)
   local success, err, forcible = shm_last_touched_at:set(upstream, now)
   if not success then
-    ngx.log(ngx.WARN, "shm_last_touched_at:set failed " .. err)
+    core.log.warn("shm_last_touched_at:set failed: ", err)
   end
   if forcible then
-    ngx.log(ngx.WARN, "shm_last_touched_at:set valid items forcibly overwritten")
+    core.log.warn("shm_last_touched_at:set valid items forcibly overwritten")
   end
 
   success, err, forcible = shm_ewma:set(upstream, ewma)
   if not success then
-    ngx.log(ngx.WARN, "shm_ewma:set failed " .. err)
+    core.log.warn("shm_ewma:set failed: ", err)
   end
   if forcible then
-    ngx.log(ngx.WARN, "shm_ewma:set valid items forcibly overwritten")
+    core.log.warn("shm_ewma:set valid items forcibly overwritten")
   end
 end
 
@@ -88,7 +88,7 @@ local function get_or_update_ewma(upstream, rtt, update)
     lock_err = lock(upstream)
   end
 
-  local ewma = ngx.shared.balancer_ewma:get(upstream) or 0
+  local ewma = shm_ewma:get(upstream) or 0
   if lock_err ~= nil then
     return ewma, lock_err
   end

From 08ad76c9c20cb4438bec2a8fd9d27842271e2601 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=A4=A7=E5=8F=AF?= <hnlq.sysu@gmail.com>
Date: Fri, 15 Jan 2021 15:51:32 +0800
Subject: [PATCH 03/30] format code using lua-format

---
 apisix/balancer/ewma.lua | 143 +++++++++++++++++++--------------------
 1 file changed, 68 insertions(+), 75 deletions(-)

diff --git a/apisix/balancer/ewma.lua b/apisix/balancer/ewma.lua
index f727c6f818e6..dd0c3f661687 100644
--- a/apisix/balancer/ewma.lua
+++ b/apisix/balancer/ewma.lua
@@ -3,7 +3,6 @@
 -- Inspiration drawn from:
 -- https://github.com/twitter/finagle/blob/1bc837c4feafc0096e43c0e98516a8e1c50c4421
 --   /finagle-core/src/main/scala/com/twitter/finagle/loadbalancer/PeakEwma.scala
-
 local core = require("apisix.core")
 local resty_lock = require("resty.lock")
 
@@ -12,111 +11,108 @@ local ngx_shared = ngx.shared
 local ngx_now = ngx.now
 local math = math
 local pairs = pairs
-local tostring = tostring
-local string = string
 
 local DECAY_TIME = 10 -- this value is in seconds
 local LOCK_KEY = ":ewma_key"
 
 local shm_ewma = ngx_shared.balancer_ewma
-local shm_last_touched_at= ngx_shared.balancer_ewma_last_touched_at
+local shm_last_touched_at = ngx_shared.balancer_ewma_last_touched_at
 
-local lrucache_addr = core.lrucache.new({
-    ttl = 300, count = 1024
-})
-local lrucache_trans_format = core.lrucache.new({
-    ttl = 300, count = 256
-})
+local lrucache_addr = core.lrucache.new({ttl = 300, count = 1024})
+local lrucache_trans_format = core.lrucache.new({ttl = 300, count = 256})
 
-local ewma_lock, ewma_lock_err = resty_lock:new("balancer_ewma_locks", {timeout = 0, exptime = 0.1})
+local ewma_lock, ewma_lock_err = resty_lock:new("balancer_ewma_locks",
+                                                {timeout = 0, exptime = 0.1})
 if not ewma_lock then
-  error(ewma_lock_err)
+    error(ewma_lock_err)
 end
 
-local _M = { name = "ewma" }
+local _M = {name = "ewma"}
 
 local function lock(upstream)
-  local _, err = ewma_lock:lock(upstream .. LOCK_KEY)
-  if err then
-    if err ~= "timeout" then
-      core.log.error("EWMA Balancer failed to lock: ", err)
+    local _, err = ewma_lock:lock(upstream .. LOCK_KEY)
+    if err then
+        if err ~= "timeout" then
+            core.log.error("EWMA Balancer failed to lock: ", err)
+        end
     end
-  end
 
-  return err
+    return err
 end
 
 local function unlock()
-  local ok, err = ewma_lock:unlock()
-  if not ok then
-    core.log.error("EWMA Balancer failed to unlock: ", err)
-  end
+    local ok, err = ewma_lock:unlock()
+    if not ok then
+        core.log.error("EWMA Balancer failed to unlock: ", err)
+    end
 
-  return err
+    return err
 end
 
 local function decay_ewma(ewma, last_touched_at, rtt, now)
-  local td = now - last_touched_at
-  td = (td > 0) and td or 0
-  local weight = math.exp(-td/DECAY_TIME)
+    local td = now - last_touched_at
+    td = (td > 0) and td or 0
+    local weight = math.exp(-td / DECAY_TIME)
 
-  ewma = ewma * weight + rtt * (1.0 - weight)
-  return ewma
+    ewma = ewma * weight + rtt * (1.0 - weight)
+    return ewma
 end
 
 local function store_stats(upstream, ewma, now)
-  local success, err, forcible = shm_last_touched_at:set(upstream, now)
-  if not success then
-    core.log.warn("shm_last_touched_at:set failed: ", err)
-  end
-  if forcible then
-    core.log.warn("shm_last_touched_at:set valid items forcibly overwritten")
-  end
-
-  success, err, forcible = shm_ewma:set(upstream, ewma)
-  if not success then
-    core.log.warn("shm_ewma:set failed: ", err)
-  end
-  if forcible then
-    core.log.warn("shm_ewma:set valid items forcibly overwritten")
-  end
+    local success, err, forcible = shm_last_touched_at:set(upstream, now)
+    if not success then
+        core.log.warn("shm_last_touched_at:set failed: ", err)
+    end
+    if forcible then
+        core.log
+            .warn("shm_last_touched_at:set valid items forcibly overwritten")
+    end
+
+    success, err, forcible = shm_ewma:set(upstream, ewma)
+    if not success then
+        core.log.warn("shm_ewma:set failed: ", err)
+    end
+    if forcible then
+        core.log.warn("shm_ewma:set valid items forcibly overwritten")
+    end
 end
 
 local function get_or_update_ewma(upstream, rtt, update)
-  local lock_err = nil
-  if update then
-    lock_err = lock(upstream)
-  end
+    local lock_err = nil
+    if update then
+        lock_err = lock(upstream)
+    end
 
-  local ewma = shm_ewma:get(upstream) or 0
-  if lock_err ~= nil then
-    return ewma, lock_err
-  end
+    local ewma = shm_ewma:get(upstream) or 0
+    if lock_err ~= nil then
+        return ewma, lock_err
+    end
 
-  local now = ngx_now()
-  local last_touched_at = ngx.shared.balancer_ewma_last_touched_at:get(upstream) or 0
-  ewma = decay_ewma(ewma, last_touched_at, rtt, now)
+    local now = ngx_now()
+    local last_touched_at = ngx.shared.balancer_ewma_last_touched_at:get(
+                                upstream) or 0
+    ewma = decay_ewma(ewma, last_touched_at, rtt, now)
 
-  if not update then
-    return ewma, nil
-  end
+    if not update then
+        return ewma, nil
+    end
 
-  store_stats(upstream, ewma, now)
+    store_stats(upstream, ewma, now)
 
-  unlock()
+    unlock()
 
-  return ewma, nil
+    return ewma, nil
 end
 
 local function get_upstream_name(upstream)
-   return upstream.host .. ":" .. upstream.port
+    return upstream.host .. ":" .. upstream.port
 end
 
 local function score(upstream)
-  -- Original implementation used names
-  -- Endpoints don't have names, so passing in IP:Port as key instead
-  local upstream_name = get_upstream_name(upstream)
-  return get_or_update_ewma(upstream_name, 0, false)
+    -- Original implementation used names
+    -- Endpoints don't have names, so passing in IP:Port as key instead
+    local upstream_name = get_upstream_name(upstream)
+    return get_or_update_ewma(upstream_name, 0, false)
 end
 
 local function parse_addr(addr)
@@ -126,7 +122,7 @@ end
 
 local function _trans_format(up_nodes)
     -- trans
-    --{"1.2.3.4:80":100,"5.6.7.8:8080":100}
+    -- {"1.2.3.4:80":100,"5.6.7.8:8080":100}
     -- into
     -- [{"host":"1.2.3.4","port":"80"},{"host":"5.6.7.8","port":"8080"}]
     local peers = {}
@@ -147,8 +143,7 @@ end
 local function _ewma_find(ctx, up_nodes)
     local peers
 
-    if not up_nodes
-       or core.table.nkeys(up_nodes) == 0 then
+    if not up_nodes or core.table.nkeys(up_nodes) == 0 then
         return nil, 'up_nodes empty'
     end
 
@@ -161,9 +156,9 @@ local function _ewma_find(ctx, up_nodes)
     local endpoint, backendpoint = peers[1], nil
 
     if #peers > 1 then
-        local a, b = math.random(1, #peers), math.random(1, #peers-1)
+        local a, b = math.random(1, #peers), math.random(1, #peers - 1)
         if b >= a then
-            b = b+1
+            b = b + 1
         end
 
         endpoint, backendpoint = peers[a], peers[b]
@@ -175,7 +170,6 @@ local function _ewma_find(ctx, up_nodes)
     return get_upstream_name(endpoint)
 end
 
-
 local function _ewma_after_balance(ctx, before_retry)
     if before_retry then
         -- don't count tries which fail to complete
@@ -195,14 +189,13 @@ local function _ewma_after_balance(ctx, before_retry)
 end
 
 function _M.new(up_nodes, upstream)
-    if not shm_ewma
-       or not shm_last_touched_at then
+    if not shm_ewma or not shm_last_touched_at then
         return nil, "dictionary not find"
     end
 
     return {
         upstream = upstream,
-        get = function (ctx)
+        get = function(ctx)
             return _ewma_find(ctx, up_nodes)
         end,
         after_balance = _ewma_after_balance

From e57de312817fa593d8c94969494ae8e0a5299cf7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=A4=A7=E5=8F=AF?= <hnlq.sysu@gmail.com>
Date: Fri, 15 Jan 2021 17:22:48 +0800
Subject: [PATCH 04/30] fix lint error

---
 apisix/balancer/ewma.lua | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/apisix/balancer/ewma.lua b/apisix/balancer/ewma.lua
index dd0c3f661687..a0a1d2c28582 100644
--- a/apisix/balancer/ewma.lua
+++ b/apisix/balancer/ewma.lua
@@ -153,7 +153,7 @@ local function _ewma_find(ctx, up_nodes)
         return nil, 'up_nodes trans error'
     end
 
-    local endpoint, backendpoint = peers[1], nil
+    local endpoint = peers[1]
 
     if #peers > 1 then
         local a, b = math.random(1, #peers), math.random(1, #peers - 1)
@@ -161,9 +161,10 @@ local function _ewma_find(ctx, up_nodes)
             b = b + 1
         end
 
+        local backendpoint
         endpoint, backendpoint = peers[a], peers[b]
         if score(endpoint) > score(backendpoint) then
-            endpoint, backendpoint = backendpoint, endpoint
+            endpoint = backendpoint
         end
     end
 

From 9e85605536f87f92998aa4d12179629df0779e30 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=A4=A7=E5=8F=AF?= <hnlq.sysu@gmail.com>
Date: Fri, 15 Jan 2021 18:43:24 +0800
Subject: [PATCH 05/30] fix lint error

---
 apisix/balancer/ewma.lua | 1 +
 1 file changed, 1 insertion(+)

diff --git a/apisix/balancer/ewma.lua b/apisix/balancer/ewma.lua
index a0a1d2c28582..051d1e7c9997 100644
--- a/apisix/balancer/ewma.lua
+++ b/apisix/balancer/ewma.lua
@@ -11,6 +11,7 @@ local ngx_shared = ngx.shared
 local ngx_now = ngx.now
 local math = math
 local pairs = pairs
+local next = next
 
 local DECAY_TIME = 10 -- this value is in seconds
 local LOCK_KEY = ":ewma_key"

From 89c71ce91ebe2ddd26e4135e672ebf7d40d372a7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=A4=A7=E5=8F=AF?= <hnlq.sysu@gmail.com>
Date: Fri, 15 Jan 2021 20:06:51 +0800
Subject: [PATCH 06/30] fix test case

---
 apisix/balancer/ewma.lua | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/apisix/balancer/ewma.lua b/apisix/balancer/ewma.lua
index 051d1e7c9997..11f4b0eb0b97 100644
--- a/apisix/balancer/ewma.lua
+++ b/apisix/balancer/ewma.lua
@@ -4,6 +4,7 @@
 -- https://github.com/twitter/finagle/blob/1bc837c4feafc0096e43c0e98516a8e1c50c4421
 --   /finagle-core/src/main/scala/com/twitter/finagle/loadbalancer/PeakEwma.scala
 local core = require("apisix.core")
+local nkeys = require("core.table.nkeys")
 local resty_lock = require("resty.lock")
 
 local ngx = ngx
@@ -24,9 +25,6 @@ local lrucache_trans_format = core.lrucache.new({ttl = 300, count = 256})
 
 local ewma_lock, ewma_lock_err = resty_lock:new("balancer_ewma_locks",
                                                 {timeout = 0, exptime = 0.1})
-if not ewma_lock then
-    error(ewma_lock_err)
-end
 
 local _M = {name = "ewma"}
 
@@ -90,8 +88,7 @@ local function get_or_update_ewma(upstream, rtt, update)
     end
 
     local now = ngx_now()
-    local last_touched_at = ngx.shared.balancer_ewma_last_touched_at:get(
-                                upstream) or 0
+    local last_touched_at = shm_last_touched_at:get(upstream) or 0
     ewma = decay_ewma(ewma, last_touched_at, rtt, now)
 
     if not update then
@@ -144,7 +141,7 @@ end
 local function _ewma_find(ctx, up_nodes)
     local peers
 
-    if not up_nodes or core.table.nkeys(up_nodes) == 0 then
+    if not up_nodes or nkeys(up_nodes) == 0 then
         return nil, 'up_nodes empty'
     end
 
@@ -195,6 +192,10 @@ function _M.new(up_nodes, upstream)
         return nil, "dictionary not find"
     end
 
+    if not ewma_lock then
+        error(ewma_lock_err)
+    end
+
     return {
         upstream = upstream,
         get = function(ctx)

From 53473f64269d1e7e642d2b27f242090d5e51251b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=A4=A7=E5=8F=AF?= <hnlq.sysu@gmail.com>
Date: Fri, 15 Jan 2021 20:13:50 +0800
Subject: [PATCH 07/30] fix lint error

---
 apisix/balancer/ewma.lua | 1 +
 1 file changed, 1 insertion(+)

diff --git a/apisix/balancer/ewma.lua b/apisix/balancer/ewma.lua
index 11f4b0eb0b97..56468d3c07b4 100644
--- a/apisix/balancer/ewma.lua
+++ b/apisix/balancer/ewma.lua
@@ -13,6 +13,7 @@ local ngx_now = ngx.now
 local math = math
 local pairs = pairs
 local next = next
+local error = error
 
 local DECAY_TIME = 10 -- this value is in seconds
 local LOCK_KEY = ":ewma_key"

From 35862990df79d63abedf2e846d1ade7f52721eab Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=A4=A7=E5=8F=AF?= <hnlq.sysu@gmail.com>
Date: Mon, 18 Jan 2021 08:14:57 +0800
Subject: [PATCH 08/30] fix require error

---
 apisix/balancer/ewma.lua | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/apisix/balancer/ewma.lua b/apisix/balancer/ewma.lua
index 56468d3c07b4..8ba813c54f99 100644
--- a/apisix/balancer/ewma.lua
+++ b/apisix/balancer/ewma.lua
@@ -4,7 +4,7 @@
 -- https://github.com/twitter/finagle/blob/1bc837c4feafc0096e43c0e98516a8e1c50c4421
 --   /finagle-core/src/main/scala/com/twitter/finagle/loadbalancer/PeakEwma.scala
 local core = require("apisix.core")
-local nkeys = require("core.table.nkeys")
+local nkeys = require("core.table").nkeys
 local resty_lock = require("resty.lock")
 
 local ngx = ngx

From 99845b18e6601c8e8dcb88902293223d4a7f02dc Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=A4=A7=E5=8F=AF?= <hnlq.sysu@gmail.com>
Date: Mon, 18 Jan 2021 08:35:16 +0800
Subject: [PATCH 09/30] fix test case

---
 apisix/balancer/ewma.lua | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/apisix/balancer/ewma.lua b/apisix/balancer/ewma.lua
index 8ba813c54f99..ca1549ddb9f7 100644
--- a/apisix/balancer/ewma.lua
+++ b/apisix/balancer/ewma.lua
@@ -4,9 +4,9 @@
 -- https://github.com/twitter/finagle/blob/1bc837c4feafc0096e43c0e98516a8e1c50c4421
 --   /finagle-core/src/main/scala/com/twitter/finagle/loadbalancer/PeakEwma.scala
 local core = require("apisix.core")
-local nkeys = require("core.table").nkeys
 local resty_lock = require("resty.lock")
 
+local nkeys = core.table.nkeys
 local ngx = ngx
 local ngx_shared = ngx.shared
 local ngx_now = ngx.now

From 6bb3e1d666837f8c3691e65819907f17b1e0eb7c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=A4=A7=E5=8F=AF?= <hnlq.sysu@gmail.com>
Date: Mon, 18 Jan 2021 09:51:26 +0800
Subject: [PATCH 10/30] simplify decay_ewma and fix log call formatting

---
 apisix/balancer/ewma.lua | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/apisix/balancer/ewma.lua b/apisix/balancer/ewma.lua
index ca1549ddb9f7..d372670ffd87 100644
--- a/apisix/balancer/ewma.lua
+++ b/apisix/balancer/ewma.lua
@@ -51,7 +51,7 @@ end
 
 local function decay_ewma(ewma, last_touched_at, rtt, now)
     local td = now - last_touched_at
-    td = (td > 0) and td or 0
+    td = math.max(td, 0)
     local weight = math.exp(-td / DECAY_TIME)
 
     ewma = ewma * weight + rtt * (1.0 - weight)
@@ -64,8 +64,7 @@ local function store_stats(upstream, ewma, now)
         core.log.warn("shm_last_touched_at:set failed: ", err)
     end
     if forcible then
-        core.log
-            .warn("shm_last_touched_at:set valid items forcibly overwritten")
+        core.log.warn("shm_last_touched_at:set valid items forcibly overwritten")
     end
 
     success, err, forcible = shm_ewma:set(upstream, ewma)

From 46ad8ea1d514f794c3d8aa096490adc73fa7baa9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=A4=A7=E5=8F=AF?= <hnlq.sysu@gmail.com>
Date: Mon, 18 Jan 2021 10:08:16 +0800
Subject: [PATCH 11/30] improve style

---
 apisix/balancer/ewma.lua | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/apisix/balancer/ewma.lua b/apisix/balancer/ewma.lua
index d372670ffd87..f448344b3176 100644
--- a/apisix/balancer/ewma.lua
+++ b/apisix/balancer/ewma.lua
@@ -31,10 +31,8 @@ local _M = {name = "ewma"}
 
 local function lock(upstream)
     local _, err = ewma_lock:lock(upstream .. LOCK_KEY)
-    if err then
-        if err ~= "timeout" then
-            core.log.error("EWMA Balancer failed to lock: ", err)
-        end
+    if err and err ~= "timeout" then
+        core.log.error("EWMA Balancer failed to lock: ", err)
     end
 
     return err

From c3a90093145daaa6b09e2e26a56a24b97399a558 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=A4=A7=E5=8F=AF?= <hnlq.sysu@gmail.com>
Date: Tue, 19 Jan 2021 10:26:55 +0800
Subject: [PATCH 12/30] fix log level

---
 apisix/balancer/ewma.lua | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/apisix/balancer/ewma.lua b/apisix/balancer/ewma.lua
index f448344b3176..3ff774925648 100644
--- a/apisix/balancer/ewma.lua
+++ b/apisix/balancer/ewma.lua
@@ -59,7 +59,7 @@ end
 local function store_stats(upstream, ewma, now)
     local success, err, forcible = shm_last_touched_at:set(upstream, now)
     if not success then
-        core.log.warn("shm_last_touched_at:set failed: ", err)
+        core.log.error("shm_last_touched_at:set failed: ", err)
     end
     if forcible then
         core.log.warn("shm_last_touched_at:set valid items forcibly overwritten")
@@ -67,7 +67,7 @@ local function store_stats(upstream, ewma, now)
 
     success, err, forcible = shm_ewma:set(upstream, ewma)
     if not success then
-        core.log.warn("shm_ewma:set failed: ", err)
+        core.log.error("shm_ewma:set failed: ", err)
     end
     if forcible then
         core.log.warn("shm_ewma:set valid items forcibly overwritten")

From 581f6f5cd65c802de2150b2a56666b6fbcedf3c2 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=A4=A7=E5=8F=AF?= <hnlq.sysu@gmail.com>
Date: Thu, 28 Jan 2021 11:44:47 +0800
Subject: [PATCH 13/30] filter tried servers

---
 apisix/balancer/ewma.lua | 52 +++++++++++++++++++++++++++++++++-------
 1 file changed, 43 insertions(+), 9 deletions(-)

diff --git a/apisix/balancer/ewma.lua b/apisix/balancer/ewma.lua
index 3ff774925648..fcafccbbf2d6 100644
--- a/apisix/balancer/ewma.lua
+++ b/apisix/balancer/ewma.lua
@@ -7,6 +7,8 @@ local core = require("apisix.core")
 local resty_lock = require("resty.lock")
 
 local nkeys = core.table.nkeys
+local table_insert = core.table.insert
+local table_deepcopy = core.table.deepcopy
 local ngx = ngx
 local ngx_shared = ngx.shared
 local ngx_now = ngx.now
@@ -24,8 +26,7 @@ local shm_last_touched_at = ngx_shared.balancer_ewma_last_touched_at
 local lrucache_addr = core.lrucache.new({ttl = 300, count = 1024})
 local lrucache_trans_format = core.lrucache.new({ttl = 300, count = 256})
 
-local ewma_lock, ewma_lock_err = resty_lock:new("balancer_ewma_locks",
-                                                {timeout = 0, exptime = 0.1})
+local ewma_lock, ewma_lock_err = resty_lock:new("balancer_ewma_locks", {timeout = 0, exptime = 0.1})
 
 local _M = {name = "ewma"}
 
@@ -143,22 +144,44 @@ local function _ewma_find(ctx, up_nodes)
         return nil, 'up_nodes empty'
     end
 
-    peers = lrucache_trans_format(ctx.upstream_key, ctx.upstream_version,
-                                  _trans_format, up_nodes)
+    peers = lrucache_trans_format(ctx.upstream_key, ctx.upstream_version, _trans_format, up_nodes)
     if not peers then
         return nil, 'up_nodes trans error'
     end
 
-    local endpoint = peers[1]
+    local tried_endpoints
+    if not ctx.balancer_tried_servers then
+        tried_endpoints = {}
+        ctx.balancer_tried_servers = tried_endpoints
+    else
+        tried_endpoints = ctx.balancer_tried_servers
+    end
+
+    local filtered_peers
+    for _, peer in ipairs(peers) do
+        if not tried_endpoints[get_upstream_name(peer)] then
+            if not filtered_peers then
+                filtered_peers = {}
+            end
+            table_insert(filtered_peers, peer)
+        end
+    end
+
+    if not filtered_peers then
+        core.log.warn("all endpoints have been retried")
+        filtered_peers = table_deepcopy(peers)
+    end
 
-    if #peers > 1 then
-        local a, b = math.random(1, #peers), math.random(1, #peers - 1)
+    local endpoint = filtered_peers[1]
+
+    if #filtered_peers > 1 then
+        local a, b = math.random(1, #filtered_peers), math.random(1, #filtered_peers - 1)
         if b >= a then
             b = b + 1
         end
 
         local backendpoint
-        endpoint, backendpoint = peers[a], peers[b]
+        endpoint, backendpoint = filtered_peers[a], filtered_peers[b]
         if score(endpoint) > score(backendpoint) then
             endpoint = backendpoint
         end
@@ -169,10 +192,21 @@ end
 
 local function _ewma_after_balance(ctx, before_retry)
     if before_retry then
-        -- don't count tries which fail to complete
+        if not ctx.balancer_tried_servers then
+            ctx.balancer_tried_servers = core.tablepool.fetch("balancer_tried_servers", 0, 2)
+        end
+
+        ctx.balancer_tried_servers[ctx.balancer_server] = true
+        ctx.balancer_tried_servers_count = (ctx.balancer_tried_servers_count or 0) + 1
+
         return nil
     end
 
+    if ctx.balancer_tried_servers then
+        core.tablepool.release("balancer_tried_servers", ctx.balancer_tried_servers)
+        ctx.balancer_tried_servers = nil
+    end
+
     local response_time = ctx.var.upstream_response_time or 0
     local connect_time = ctx.var.upstream_connect_time or 0
     local rtt = connect_time + response_time

From 7c3bfcf2b42d8a1917c02ff71377ab2527787595 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=A4=A7=E5=8F=AF?= <hnlq.sysu@gmail.com>
Date: Thu, 28 Jan 2021 12:04:20 +0800
Subject: [PATCH 14/30] add local ipairs

---
 apisix/balancer/ewma.lua | 1 +
 1 file changed, 1 insertion(+)

diff --git a/apisix/balancer/ewma.lua b/apisix/balancer/ewma.lua
index fcafccbbf2d6..6b90c2d0e342 100644
--- a/apisix/balancer/ewma.lua
+++ b/apisix/balancer/ewma.lua
@@ -14,6 +14,7 @@ local ngx_shared = ngx.shared
 local ngx_now = ngx.now
 local math = math
 local pairs = pairs
+local ipairs = ipairs
 local next = next
 local error = error
 

From ea3e53eb17ff0c1d5e1484f4aa271607db564379 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=A4=A7=E5=8F=AF?= <hnlq.sysu@gmail.com>
Date: Thu, 28 Jan 2021 13:42:37 +0800
Subject: [PATCH 15/30] add test case for filter tried servers

---
 t/node/ewma.t | 71 +++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 71 insertions(+)

diff --git a/t/node/ewma.t b/t/node/ewma.t
index 955a6b785fe6..4bd95cceb49a 100644
--- a/t/node/ewma.t
+++ b/t/node/ewma.t
@@ -215,3 +215,74 @@ GET /t
 --- error_code: 200
 --- no_error_log
 [error]
+
+
+=== TEST 4: about filter tried servers
+--- timeout: 5
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+
+            --remove the 1981 node,
+            --add the 1984 node (invalid node)
+            --keep two nodes for triggering ewma logic in server_picker function of balancer phase
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 100,
+                                "127.0.0.1:1984": 100
+                            },
+                            "type": "ewma"
+                        },
+                        "uri": "/ewma"
+                }]]
+                )
+
+            if code ~= 200 then
+                ngx.say("update route failed")
+                return
+            end
+
+            local http = require "resty.http"
+            local uri = "http://127.0.0.1:" .. ngx.var.server_port
+                        .. "/ewma"
+
+            local ports_count = {}
+            for i = 1, 12 do
+                local httpc = http.new()
+                httpc:set_timeout(1000)
+                local res, err = httpc:request_uri(uri, {method = "GET", keepalive = false})
+                if not res then
+                    ngx.say(err)
+                    return
+                end
+
+                ports_count[res.body] = (ports_count[res.body] or 0) + 1
+            end
+
+            local ports_arr = {}
+            for port, count in pairs(ports_count) do
+                table.insert(ports_arr, {port = port, count = count})
+            end
+
+            local function cmd(a, b)
+                return a.port > b.port
+            end
+            table.sort(ports_arr, cmd)
+
+            ngx.say(require("toolkit.json").encode(ports_arr))
+            ngx.exit(200)
+        }
+    }
+--- request
+GET /t
+--- response_body
+[{"count":12,"port":"1980"}]
+--- error_code: 200
+--- no_error_log
+[error]
+
+

From e6e5a23547ffa0bdb5456918041259bbe9722b18 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=A4=A7=E5=8F=AF?= <hnlq.sysu@gmail.com>
Date: Thu, 28 Jan 2021 13:59:12 +0800
Subject: [PATCH 16/30] remove blank line

---
 t/node/ewma.t | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/t/node/ewma.t b/t/node/ewma.t
index 4bd95cceb49a..3f3646bbf548 100644
--- a/t/node/ewma.t
+++ b/t/node/ewma.t
@@ -217,6 +217,7 @@ GET /t
 [error]
 
 
+
 === TEST 4: about filter tried servers
 --- timeout: 5
 --- config
@@ -284,5 +285,3 @@ GET /t
 --- error_code: 200
 --- no_error_log
 [error]
-
-

From b45a560c98f1b5ed18941ff267a2e228730f6024 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=A4=A7=E5=8F=AF?= <hnlq.sysu@gmail.com>
Date: Thu, 28 Jan 2021 15:14:41 +0800
Subject: [PATCH 17/30] add comment for test

---
 t/node/ewma.t | 1 +
 1 file changed, 1 insertion(+)

diff --git a/t/node/ewma.t b/t/node/ewma.t
index 3f3646bbf548..c41c7ee95120 100644
--- a/t/node/ewma.t
+++ b/t/node/ewma.t
@@ -251,6 +251,7 @@ GET /t
             local uri = "http://127.0.0.1:" .. ngx.var.server_port
                         .. "/ewma"
 
+            --should select the 1984 node, because it is invalid
             local ports_count = {}
             for i = 1, 12 do
                 local httpc = http.new()

From 022cda1a9cfffc790aaa126b1ce5e196a1ea9b1f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=A4=A7=E5=8F=AF?= <hnlq.sysu@gmail.com>
Date: Thu, 28 Jan 2021 17:25:29 +0800
Subject: [PATCH 18/30] improve test case

---
 t/node/ewma.t | 21 +++++++++++++--------
 1 file changed, 13 insertions(+), 8 deletions(-)

diff --git a/t/node/ewma.t b/t/node/ewma.t
index c41c7ee95120..5a90ec4529ab 100644
--- a/t/node/ewma.t
+++ b/t/node/ewma.t
@@ -219,7 +219,7 @@ GET /t
 
 
 === TEST 4: about filter tried servers
---- timeout: 5
+--- timeout: 10
 --- config
     location /t {
         content_by_lua_block {
@@ -233,10 +233,15 @@ GET /t
                  [[{
                         "upstream": {
                             "nodes": {
-                                "127.0.0.1:1980": 100,
-                                "127.0.0.1:1984": 100
+                                "127.0.0.1:1980": 1,
+                                "127.0.0.1:1984": 1
                             },
-                            "type": "ewma"
+                            "type": "ewma",
+                            "timeout": {
+                                "connect": 0.5,
+                                "send": 0.5,
+                                "read": 0.5
+                            }
                         },
                         "uri": "/ewma"
                 }]]
@@ -251,11 +256,11 @@ GET /t
             local uri = "http://127.0.0.1:" .. ngx.var.server_port
                         .. "/ewma"
 
-            --should select the 1984 node, because it is invalid
+            --should select the 1980 node, because 1984 is invalid
             local ports_count = {}
             for i = 1, 12 do
                 local httpc = http.new()
-                httpc:set_timeout(1000)
+                httpc:set_timeout(2000)
                 local res, err = httpc:request_uri(uri, {method = "GET", keepalive = false})
                 if not res then
                     ngx.say(err)
@@ -284,5 +289,5 @@ GET /t
 --- response_body
 [{"count":12,"port":"1980"}]
 --- error_code: 200
---- no_error_log
-[error]
+--- error_log
+timed out) while reading response header from upstream

From 6db0b80d1d763a623e6fda1a99145dd7f078dc5b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=A4=A7=E5=8F=AF?= <hnlq.sysu@gmail.com>
Date: Thu, 28 Jan 2021 18:13:26 +0800
Subject: [PATCH 19/30] improve test case

---
 t/node/ewma.t | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/t/node/ewma.t b/t/node/ewma.t
index 5a90ec4529ab..8116ff4f7f1c 100644
--- a/t/node/ewma.t
+++ b/t/node/ewma.t
@@ -290,4 +290,4 @@ GET /t
 [{"count":12,"port":"1980"}]
 --- error_code: 200
 --- error_log
-timed out) while reading response header from upstream
+upstream timed out

From 1a99bd1a7af9230ee0c3ee6c8066efde2e15c82c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=A4=A7=E5=8F=AF?= <hnlq.sysu@gmail.com>
Date: Thu, 28 Jan 2021 19:14:16 +0800
Subject: [PATCH 20/30] improve test case

---
 t/node/ewma.t | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/t/node/ewma.t b/t/node/ewma.t
index 8116ff4f7f1c..32b16de08849 100644
--- a/t/node/ewma.t
+++ b/t/node/ewma.t
@@ -166,7 +166,7 @@ GET /t
                 return
             end
 
-            ngx.sleep(20)
+            ngx.sleep(11)
             --keep the node 1980 hot
             for i = 1, 12 do
                 local httpc = http.new()
@@ -226,7 +226,7 @@ GET /t
             local t = require("lib.test_admin").test
 
             --remove the 1981 node,
-            --add the 1984 node (invalid node)
+            --add the 8888 node (invalid node)
             --keep two nodes for triggering ewma logic in server_picker function of balancer phase
             local code, body = t('/apisix/admin/routes/1',
                  ngx.HTTP_PUT,
@@ -234,11 +234,11 @@ GET /t
                         "upstream": {
                             "nodes": {
                                 "127.0.0.1:1980": 1,
-                                "127.0.0.1:1984": 1
+                                "127.0.0.1:8888": 1
                             },
                             "type": "ewma",
                             "timeout": {
-                                "connect": 0.5,
+                                "connect": 0.1,
                                 "send": 0.5,
                                 "read": 0.5
                             }
@@ -256,7 +256,7 @@ GET /t
             local uri = "http://127.0.0.1:" .. ngx.var.server_port
                         .. "/ewma"
 
-            --should select the 1980 node, because 1984 is invalid
+            --should always select the 1980 node, because 8888 is invalid
             local ports_count = {}
             for i = 1, 12 do
                 local httpc = http.new()
@@ -290,4 +290,4 @@ GET /t
 [{"count":12,"port":"1980"}]
 --- error_code: 200
 --- error_log
-upstream timed out
+Connection refused) while connecting to upstream

From 0518f39f11d1b1adb2d8036a784f0872610f74c6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=A4=A7=E5=8F=AF?= <hnlq.sysu@gmail.com>
Date: Thu, 28 Jan 2021 19:47:49 +0800
Subject: [PATCH 21/30] improve ewma logic and test case

---
 apisix/balancer/ewma.lua |  24 ++++-----
 t/node/ewma.t            | 106 +++++++++++++++++++++++++++++++++++----
 2 files changed, 107 insertions(+), 23 deletions(-)

diff --git a/apisix/balancer/ewma.lua b/apisix/balancer/ewma.lua
index 6b90c2d0e342..c8b9b10cb4c6 100644
--- a/apisix/balancer/ewma.lua
+++ b/apisix/balancer/ewma.lua
@@ -141,7 +141,8 @@ end
 local function _ewma_find(ctx, up_nodes)
     local peers
 
-    if not up_nodes or nkeys(up_nodes) == 0 then
+    if not up_nodes or nkeys(up_nodes) == 0 or
+        (ctx.balancer_tried_servers and ctx.balancer_tried_servers_count == nkeys(up_nodes)) then
         return nil, 'up_nodes empty'
     end
 
@@ -150,27 +151,22 @@ local function _ewma_find(ctx, up_nodes)
         return nil, 'up_nodes trans error'
     end
 
-    local tried_endpoints
-    if not ctx.balancer_tried_servers then
-        tried_endpoints = {}
-        ctx.balancer_tried_servers = tried_endpoints
-    else
-        tried_endpoints = ctx.balancer_tried_servers
-    end
-
     local filtered_peers
     for _, peer in ipairs(peers) do
-        if not tried_endpoints[get_upstream_name(peer)] then
-            if not filtered_peers then
-                filtered_peers = {}
+        if ctx.balancer_tried_servers then
+            if not ctx.balancer_tried_servers[get_upstream_name(peer)] then
+                if not filtered_peers then
+                    filtered_peers = {}
+                end
+
+                table_insert(filtered_peers, peer)
             end
-            table_insert(filtered_peers, peer)
         end
     end
 
     if not filtered_peers then
         core.log.warn("all endpoints have been retried")
-        filtered_peers = table_deepcopy(peers)
+        filtered_peers = peers
     end
 
     local endpoint = filtered_peers[1]
diff --git a/t/node/ewma.t b/t/node/ewma.t
index 32b16de08849..5cc0acf8c180 100644
--- a/t/node/ewma.t
+++ b/t/node/ewma.t
@@ -257,17 +257,23 @@ GET /t
                         .. "/ewma"
 
             --should always select the 1980 node, because 8888 is invalid
+            local t = {}
             local ports_count = {}
             for i = 1, 12 do
-                local httpc = http.new()
-                httpc:set_timeout(2000)
-                local res, err = httpc:request_uri(uri, {method = "GET", keepalive = false})
-                if not res then
-                    ngx.say(err)
-                    return
-                end
-
-                ports_count[res.body] = (ports_count[res.body] or 0) + 1
+                local th = assert(ngx.thread.spawn(function(i)
+                    local httpc = http.new()
+                    httpc:set_timeout(2000)
+                    local res, err = httpc:request_uri(uri, {method = "GET", keepalive = false})
+                    if not res then
+                        ngx.say(err)
+                        return
+                    end
+                    ports_count[res.body] = (ports_count[res.body] or 0) + 1
+                end, i))
+                table.insert(t, th)
+            end
+            for i, th in ipairs(t) do
+                ngx.thread.wait(th)
             end
 
             local ports_arr = {}
@@ -291,3 +297,85 @@ GET /t
 --- error_code: 200
 --- error_log
 Connection refused) while connecting to upstream
+
+
+
+=== TEST 5: about all endpoints have been retried
+--- timeout: 10
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+
+            --add the 9527 node (invalid node)
+            --remove the 1980 node
+            --keep two nodes for triggering ewma logic in server_picker function of balancer phase
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:9527": 1,
+                                "127.0.0.1:8888": 1
+                            },
+                            "type": "ewma",
+                            "timeout": {
+                                "connect": 0.1,
+                                "send": 0.5,
+                                "read": 0.5
+                            }
+                        },
+                        "uri": "/ewma"
+                }]]
+                )
+
+            if code ~= 200 then
+                ngx.say("update route failed")
+                return
+            end
+
+            local http = require "resty.http"
+            local uri = "http://127.0.0.1:" .. ngx.var.server_port
+                        .. "/ewma"
+
+            --should always select the 1980 node, because 8888 is invalid
+            local t = {}
+            local ports_count = {}
+            for i = 1, 12 do
+                local th = assert(ngx.thread.spawn(function(i)
+                    local httpc = http.new()
+                    httpc:set_timeout(2000)
+                    local res, err = httpc:request_uri(uri, {method = "GET", keepalive = false})
+                    if not res then
+                        ngx.say(err)
+                        return
+                    end
+                    ports_count[res.status] = (ports_count[res.status] or 0) + 1
+                end, i))
+                table.insert(t, th)
+            end
+            for i, th in ipairs(t) do
+                ngx.thread.wait(th)
+            end
+
+            local ports_arr = {}
+            for port, count in pairs(ports_count) do
+                table.insert(ports_arr, {port = port, count = count})
+            end
+
+            local function cmd(a, b)
+                return a.port > b.port
+            end
+            table.sort(ports_arr, cmd)
+
+            ngx.say(require("toolkit.json").encode(ports_arr))
+            ngx.exit(200)
+        }
+    }
+--- request
+GET /t
+--- response_body
+[{"count":12,"port":502}]
+--- error_code: 200
+--- error_log
+Connection refused) while connecting to upstream

From 4fec4508eec9632a1958079a1772d84e10a013be Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=A4=A7=E5=8F=AF?= <hnlq.sysu@gmail.com>
Date: Thu, 28 Jan 2021 19:48:41 +0800
Subject: [PATCH 22/30] remove unused local

---
 apisix/balancer/ewma.lua | 1 -
 1 file changed, 1 deletion(-)

diff --git a/apisix/balancer/ewma.lua b/apisix/balancer/ewma.lua
index c8b9b10cb4c6..f8b77e289869 100644
--- a/apisix/balancer/ewma.lua
+++ b/apisix/balancer/ewma.lua
@@ -8,7 +8,6 @@ local resty_lock = require("resty.lock")
 
 local nkeys = core.table.nkeys
 local table_insert = core.table.insert
-local table_deepcopy = core.table.deepcopy
 local ngx = ngx
 local ngx_shared = ngx.shared
 local ngx_now = ngx.now

From c799d3ed7040bcccdb0d3a2ffafd91deaa56bcce Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=A4=A7=E5=8F=AF?= <hnlq.sysu@gmail.com>
Date: Thu, 28 Jan 2021 19:54:00 +0800
Subject: [PATCH 23/30] improve

---
 apisix/balancer/ewma.lua | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/apisix/balancer/ewma.lua b/apisix/balancer/ewma.lua
index f8b77e289869..bf261985f2f7 100644
--- a/apisix/balancer/ewma.lua
+++ b/apisix/balancer/ewma.lua
@@ -140,11 +140,14 @@ end
 local function _ewma_find(ctx, up_nodes)
     local peers
 
-    if not up_nodes or nkeys(up_nodes) == 0 or
-        (ctx.balancer_tried_servers and ctx.balancer_tried_servers_count == nkeys(up_nodes)) then
+    if not up_nodes or nkeys(up_nodes) == 0 then
         return nil, 'up_nodes empty'
     end
 
+    if ctx.balancer_tried_servers and ctx.balancer_tried_servers_count == nkeys(up_nodes) then
+        return nil, "all upstream servers tried"
+    end
+
     peers = lrucache_trans_format(ctx.upstream_key, ctx.upstream_version, _trans_format, up_nodes)
     if not peers then
         return nil, 'up_nodes trans error'

From b57a2675fadadd5dfa5a3f2896d3ca56bdbfd21d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=A4=A7=E5=8F=AF?= <hnlq.sysu@gmail.com>
Date: Fri, 29 Jan 2021 07:46:36 +0800
Subject: [PATCH 24/30] fix test case

---
 t/node/ewma.t | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/t/node/ewma.t b/t/node/ewma.t
index 5cc0acf8c180..74bfbf7a3c84 100644
--- a/t/node/ewma.t
+++ b/t/node/ewma.t
@@ -226,7 +226,7 @@ GET /t
             local t = require("lib.test_admin").test
 
             --remove the 1981 node,
-            --add the 8888 node (invalid node)
+            --add the 0 node (invalid node)
             --keep two nodes for triggering ewma logic in server_picker function of balancer phase
             local code, body = t('/apisix/admin/routes/1',
                  ngx.HTTP_PUT,
@@ -234,7 +234,7 @@ GET /t
                         "upstream": {
                             "nodes": {
                                 "127.0.0.1:1980": 1,
-                                "127.0.0.1:8888": 1
+                                "127.0.0.1:0": 1
                             },
                             "type": "ewma",
                             "timeout": {
@@ -256,7 +256,7 @@ GET /t
             local uri = "http://127.0.0.1:" .. ngx.var.server_port
                         .. "/ewma"
 
-            --should always select the 1980 node, because 8888 is invalid
+            --should always select the 1980 node, because the other node is invalid
             local t = {}
             local ports_count = {}
             for i = 1, 12 do
@@ -316,7 +316,7 @@ Connection refused) while connecting to upstream
                         "upstream": {
                             "nodes": {
                                 "127.0.0.1:9527": 1,
-                                "127.0.0.1:8888": 1
+                                "127.0.0.1:0": 1
                             },
                             "type": "ewma",
                             "timeout": {
@@ -338,7 +338,7 @@ Connection refused) while connecting to upstream
             local uri = "http://127.0.0.1:" .. ngx.var.server_port
                         .. "/ewma"
 
-            --should always select the 1980 node, because 8888 is invalid
+            --should always select the 1980 node, because 0 is invalid
             local t = {}
             local ports_count = {}
             for i = 1, 12 do

From 6a642b0d6223b3979673e4533921fae8e628fc4a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=A4=A7=E5=8F=AF?= <hnlq.sysu@gmail.com>
Date: Fri, 29 Jan 2021 07:48:42 +0800
Subject: [PATCH 25/30] improve

---
 t/node/ewma.t | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/t/node/ewma.t b/t/node/ewma.t
index 74bfbf7a3c84..2779f9363114 100644
--- a/t/node/ewma.t
+++ b/t/node/ewma.t
@@ -226,7 +226,7 @@ GET /t
             local t = require("lib.test_admin").test
 
             --remove the 1981 node,
-            --add the 0 node (invalid node)
+            --add the 9527 node (invalid node)
             --keep two nodes for triggering ewma logic in server_picker function of balancer phase
             local code, body = t('/apisix/admin/routes/1',
                  ngx.HTTP_PUT,
@@ -234,7 +234,7 @@ GET /t
                         "upstream": {
                             "nodes": {
                                 "127.0.0.1:1980": 1,
-                                "127.0.0.1:0": 1
+                                "127.0.0.1:9527": 1
                             },
                             "type": "ewma",
                             "timeout": {
@@ -308,7 +308,7 @@ Connection refused) while connecting to upstream
             local t = require("lib.test_admin").test
 
             --add the 9527 node (invalid node)
-            --remove the 1980 node
+            --add the 9528 node (invalid node)
             --keep two nodes for triggering ewma logic in server_picker function of balancer phase
             local code, body = t('/apisix/admin/routes/1',
                  ngx.HTTP_PUT,
@@ -316,7 +316,7 @@ Connection refused) while connecting to upstream
                         "upstream": {
                             "nodes": {
                                 "127.0.0.1:9527": 1,
-                                "127.0.0.1:0": 1
+                                "127.0.0.1:9528": 1
                             },
                             "type": "ewma",
                             "timeout": {

From e2f04d3f0dcc706dba07cc489c5c8b050d22a1d5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=A4=A7=E5=8F=AF?= <hnlq.sysu@gmail.com>
Date: Fri, 29 Jan 2021 08:21:57 +0800
Subject: [PATCH 26/30] trigger test

---
 t/node/ewma.t | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/t/node/ewma.t b/t/node/ewma.t
index 2779f9363114..53bf8485e6f1 100644
--- a/t/node/ewma.t
+++ b/t/node/ewma.t
@@ -338,7 +338,7 @@ Connection refused) while connecting to upstream
             local uri = "http://127.0.0.1:" .. ngx.var.server_port
                         .. "/ewma"
 
-            --should always select the 1980 node, because 0 is invalid
+            --should always return 502, because both 9527 and 9528 are invalid
             local t = {}
             local ports_count = {}
             for i = 1, 12 do

From d24d0a16690eba489d05c773a8dd8dcd0d800843 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=A4=A7=E5=8F=AF?= <hnlq.sysu@gmail.com>
Date: Fri, 29 Jan 2021 10:40:08 +0800
Subject: [PATCH 27/30] improve filter logic

---
 apisix/balancer/ewma.lua | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/apisix/balancer/ewma.lua b/apisix/balancer/ewma.lua
index bf261985f2f7..6975937a6cb3 100644
--- a/apisix/balancer/ewma.lua
+++ b/apisix/balancer/ewma.lua
@@ -154,8 +154,8 @@ local function _ewma_find(ctx, up_nodes)
     end
 
     local filtered_peers
-    for _, peer in ipairs(peers) do
-        if ctx.balancer_tried_servers then
+    if ctx.balancer_tried_servers then
+        for _, peer in ipairs(peers) do
             if not ctx.balancer_tried_servers[get_upstream_name(peer)] then
                 if not filtered_peers then
                     filtered_peers = {}
@@ -164,10 +164,7 @@ local function _ewma_find(ctx, up_nodes)
                 table_insert(filtered_peers, peer)
             end
         end
-    end
-
-    if not filtered_peers then
-        core.log.warn("all endpoints have been retried")
+    else
         filtered_peers = peers
     end
 
@@ -190,6 +187,8 @@ local function _ewma_find(ctx, up_nodes)
 end
 
 local function _ewma_after_balance(ctx, before_retry)
+    ngx.log(ngx.WARN,"--------",core.json.encode(ctx.balancer_tried_servers), ctx.balancer_tried_servers_count)
+
     if before_retry then
         if not ctx.balancer_tried_servers then
             ctx.balancer_tried_servers = core.tablepool.fetch("balancer_tried_servers", 0, 2)

From 116d2d45942d352d5cccef372ddfab33fec710ec Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=A4=A7=E5=8F=AF?= <hnlq.sysu@gmail.com>
Date: Fri, 29 Jan 2021 10:42:59 +0800
Subject: [PATCH 28/30] remove log

---
 apisix/balancer/ewma.lua | 2 --
 1 file changed, 2 deletions(-)

diff --git a/apisix/balancer/ewma.lua b/apisix/balancer/ewma.lua
index 6975937a6cb3..048a7258d9ee 100644
--- a/apisix/balancer/ewma.lua
+++ b/apisix/balancer/ewma.lua
@@ -187,8 +187,6 @@ local function _ewma_find(ctx, up_nodes)
 end
 
 local function _ewma_after_balance(ctx, before_retry)
-    ngx.log(ngx.WARN,"--------",core.json.encode(ctx.balancer_tried_servers), ctx.balancer_tried_servers_count)
-
     if before_retry then
         if not ctx.balancer_tried_servers then
             ctx.balancer_tried_servers = core.tablepool.fetch("balancer_tried_servers", 0, 2)

From 6f4eb0d22e905966f1a1cc6ac019f17c8ef9d230 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=A4=A7=E5=8F=AF?= <hnlq.sysu@gmail.com>
Date: Fri, 29 Jan 2021 11:12:32 +0800
Subject: [PATCH 29/30] improve lock logic

---
 apisix/balancer/ewma.lua | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/apisix/balancer/ewma.lua b/apisix/balancer/ewma.lua
index 048a7258d9ee..b6128966a511 100644
--- a/apisix/balancer/ewma.lua
+++ b/apisix/balancer/ewma.lua
@@ -79,12 +79,12 @@ local function get_or_update_ewma(upstream, rtt, update)
     local lock_err = nil
     if update then
         lock_err = lock(upstream)
+        if lock_err ~= nil then
+            return 0, lock_err
+        end
     end
 
     local ewma = shm_ewma:get(upstream) or 0
-    if lock_err ~= nil then
-        return ewma, lock_err
-    end
 
     local now = ngx_now()
     local last_touched_at = shm_last_touched_at:get(upstream) or 0

From c4c9fb6ab97e3b5261ce889d83d91c3b614a911c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=A4=A7=E5=8F=AF?= <hnlq.sysu@gmail.com>
Date: Fri, 29 Jan 2021 11:15:13 +0800
Subject: [PATCH 30/30] fix lint

---
 apisix/balancer/ewma.lua | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/apisix/balancer/ewma.lua b/apisix/balancer/ewma.lua
index b6128966a511..e1b276273063 100644
--- a/apisix/balancer/ewma.lua
+++ b/apisix/balancer/ewma.lua
@@ -76,9 +76,8 @@ local function store_stats(upstream, ewma, now)
 end
 
 local function get_or_update_ewma(upstream, rtt, update)
-    local lock_err = nil
     if update then
-        lock_err = lock(upstream)
+        local lock_err = lock(upstream)
         if lock_err ~= nil then
             return 0, lock_err
         end