Skip to content

Commit

Permalink
feat(APIAnalytics) Implement timer to flush data
Browse files Browse the repository at this point in the history
If the API has low traffic, this timer flushes the data to API
Analytics after a configurable delay; otherwise the batch size might
never be reached and API Analytics would no longer be real-time.
  • Loading branch information
thibaultcha committed Jun 2, 2015
1 parent b4f001d commit 6cf77f6
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 16 deletions.
23 changes: 19 additions & 4 deletions kong/plugins/apianalytics/handler.lua
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,19 @@ local APIANALYTICS_SOCKET = {
}

local function send_batch(premature, conf, alf)
-- Abort if there are no entries to send: the delayed timer may fire
-- after the batch was already flushed by a later request that reached the batch size limit.
if table.getn(alf.har.log.entries) < 1 then
return
end

local message = alf:to_json_string(conf.service_token)

local ok, err
local client = http:new()
client:set_timeout(50000) -- 50 sec (NOTE(review): comment previously said "5 sec", but set_timeout takes milliseconds, so 50000 = 50 s — confirm whether 5000 was intended)

local ok, err = client:connect(APIANALYTICS_SOCKET.host, APIANALYTICS_SOCKET.port)
ok, err = client:connect(APIANALYTICS_SOCKET.host, APIANALYTICS_SOCKET.port)
if not ok then
ngx.log(ngx.ERR, "[apianalytics] failed to connect to the socket: "..err)
return
Expand All @@ -27,7 +34,7 @@ local function send_batch(premature, conf, alf)

-- close connection, or put it into the connection pool
if res.headers["connection"] == "close" then
local ok, err = client:close()
ok, err = client:close()
if not ok then
ngx.log(ngx.ERR, "[apianalytics] failed to close: "..err)
end
Expand Down Expand Up @@ -100,11 +107,19 @@ function APIAnalyticsHandler:log(conf)
-- Simply adding the entry to the ALF
local n_entries = ngx.shared.apianalytics[api_id]:add_entry(ngx)

-- Batch size reached, let's send the data
if n_entries >= conf.batch_size then
-- Batch size reached, let's send the data
local ok, err = ngx.timer.at(0, send_batch, conf, ngx.shared.apianalytics[api_id])
if not ok then
ngx.log(ngx.ERR, "[apianalytics] failed to create timer: ", err)
ngx.log(ngx.ERR, "[apianalytics] failed to create batch sending timer: ", err)
end
else
-- Batch size not yet reached.
-- Schedule a delayed flush so the data is still sent if traffic stays low
-- and the batch_size limit would otherwise take too long to reach.
local ok, err = ngx.timer.at(conf.delay, send_batch, conf, ngx.shared.apianalytics[api_id])
if not ok then
ngx.log(ngx.ERR, "[apianalytics] failed to create delayed batch sending timer: ", err)
end
end
end
Expand Down
3 changes: 2 additions & 1 deletion kong/plugins/apianalytics/schema.lua
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
return {
service_token = { type = "string", required = true },
batch_size = { type = "number", default = 100 },
log_body = { type = "boolean", default = false }
log_body = { type = "boolean", default = false },
delay = { type = "number", default = 10 }
}
7 changes: 2 additions & 5 deletions kong/plugins/filelog/handler.lua
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
local json = require "cjson"
local log = require "kong.plugins.filelog.log"
local BasePlugin = require "kong.plugins.base_plugin"
local basic_serializer = require "kong.plugins.log_serializers.basic"

local FileLogHandler = BasePlugin:extend()

Expand All @@ -10,9 +9,7 @@ end

function FileLogHandler:log(conf)
FileLogHandler.super.log(self)

local message = basic_serializer.serialize(ngx)
ngx.log(ngx.INFO, json.encode(message))
log.execute(conf)
end

return FileLogHandler
12 changes: 6 additions & 6 deletions spec/plugins/apianalytics/alf_serializer_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ local function sameEntry(state, arguments)
fixture_entry.time = nil
fixture_entry.timings = nil

assert.same(fixture_entry, entry)
assert.are.same(fixture_entry, entry)
return true
end

Expand Down Expand Up @@ -69,7 +69,7 @@ describe("ALF serializer", function()

it("should serialize an ngx GET request/response", function()
local entry = alf:serialize_entry(fixtures.GET.NGX_STUB)
assert.are.same(fixtures.GET.ENTRY, entry)
assert.are.sameEntry(fixtures.GET.ENTRY, entry)
end)

end)
Expand All @@ -78,11 +78,11 @@ describe("ALF serializer", function()

it("should add the entry to the serializer entries property", function()
alf:add_entry(fixtures.GET.NGX_STUB)
assert.are.same(1, table.getn(alf.har.log.entries))
assert.equal(1, table.getn(alf.har.log.entries))
assert.are.sameEntry(fixtures.GET.ENTRY, alf.har.log.entries[1])

alf:add_entry(fixtures.GET.NGX_STUB)
assert.are.same(2, table.getn(alf.har.log.entries))
assert.equal(2, table.getn(alf.har.log.entries))
assert.are.sameEntry(fixtures.GET.ENTRY, alf.har.log.entries[2])
end)

Expand All @@ -102,7 +102,7 @@ describe("ALF serializer", function()

it("should return a JSON string", function()
local json_str = alf:to_json_string("stub_service_token")
assert.are.same("string", type(json_str))
assert.equal("string", type(json_str))
end)

end)
Expand All @@ -111,7 +111,7 @@ describe("ALF serializer", function()

it("should remove any existing entry", function()
alf:flush_entries()
assert.are.same(0, table.getn(alf.har.log.entries))
assert.equal(0, table.getn(alf.har.log.entries))
end)

end)
Expand Down

0 comments on commit 6cf77f6

Please sign in to comment.