Skip to content

Commit

Permalink
feat(APIAnalytics) config options + polishing
Browse files Browse the repository at this point in the history
- fix: don't mix up different APIs.

Using a shared memory zone for having per-API ALFs instead of one ALF
having all the APIs entries, which was a mistake.
  • Loading branch information
thibaultcha committed May 29, 2015
1 parent c7194bc commit b227424
Show file tree
Hide file tree
Showing 13 changed files with 236 additions and 125 deletions.
2 changes: 1 addition & 1 deletion kong/kong.lua
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ function _M.exec_plugins_certificate()
return
end

-- Calls plugins_access() on every loaded plugin
-- Calls plugins' access() on every loaded plugin
function _M.exec_plugins_access()
-- Setting a property that will be available for every plugin
ngx.ctx.started_at = ngx.req.start_time()
Expand Down
48 changes: 33 additions & 15 deletions kong/plugins/apianalytics/handler.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@ local BasePlugin = require "kong.plugins.base_plugin"
local ALFSerializer = require "kong.plugins.log_serializers.alf"

local APIANALYTICS_SOCKET = {
host = "localhost",
host = "localhost", -- socket.apianalytics.mashape.com
port = 58000,
path = "/alf_1.0.0"
}

local function send_batch(premature, message)
local function send_batch(premature, conf, alf)
local message = alf:to_json_string(conf.service_token)

local client = http:new()
client:set_timeout(1000) -- 1 sec
client:set_timeout(50000) -- 5 sec

local ok, err = client:connect(APIANALYTICS_SOCKET.host, APIANALYTICS_SOCKET.port)
if not ok then
Expand All @@ -21,22 +23,20 @@ local function send_batch(premature, message)
local res, err = client:request({ path = APIANALYTICS_SOCKET.path, body = message })
if not res then
ngx.log(ngx.ERR, "[apianalytics] failed to send batch: "..err)
return
end

-- close connection, or put it into the connection pool
if res.headers["connection"] == "close" then
local ok, err = client:close()
if not ok then
ngx.log(ngx.ERR, "[apianalytics] failed to close: "..err)
return
end
else
client:set_keepalive()
end

if res.status == 200 then
ALFSerializer:flush_entries()
alf:flush_entries()
ngx.log(ngx.DEBUG, "[apianalytics] successfully saved the batch")
else
ngx.log(ngx.ERR, "[apianalytics] socket refused the batch: "..res.body)
Expand All @@ -56,19 +56,26 @@ end
function APIAnalyticsHandler:access(conf)
APIAnalyticsHandler.super.access(self)

ngx.req.read_body()
-- Retrieve and keep in memory the bodies for this request
ngx.ctx.apianalytics = {
req_body = ngx.req.get_body_data(),
req_body = "",
res_body = ""
}

if conf.log_body then
ngx.req.read_body()
ngx.ctx.apianalytics.req_body = ngx.req.get_body_data()
end
end

function APIAnalyticsHandler:body_filter(conf)
APIAnalyticsHandler.super.body_filter(self)

-- concatenate response chunks for ALF's `response.content.text`
local chunk, eof = ngx.arg[1], ngx.arg[2]
ngx.ctx.apianalytics.res_body = ngx.ctx.apianalytics.res_body..chunk
-- concatenate response chunks for ALF's `response.content.text`
if conf.log_body then
ngx.ctx.apianalytics.res_body = ngx.ctx.apianalytics.res_body..chunk
end

if eof then -- latest chunk
ngx.ctx.apianalytics.response_received = ngx.now()
Expand All @@ -78,13 +85,24 @@ end
function APIAnalyticsHandler:log(conf)
APIAnalyticsHandler.super.log(self)

local entries_size = ALFSerializer:add_entry(ngx)
local api_id = ngx.ctx.api.id

-- Shared memory zone for apianalytics ALFs
if not ngx.shared.apianalytics then
ngx.shared.apianalytics = {}
end

-- Create the ALF if not existing for this API
if not ngx.shared.apianalytics[api_id] then
ngx.shared.apianalytics[api_id] = ALFSerializer:new_alf()
end

if entries_size > 2 then
local message = ALFSerializer:to_json_string("54d2b98ee0d5076065fd6f93")
print("MESSAGE: "..message)
-- Simply adding the entry to the ALF
local n_entries = ngx.shared.apianalytics[api_id]:add_entry(ngx)

local ok, err = ngx.timer.at(0, send_batch, message)
-- Batch size reached, let's send the data
if n_entries >= conf.batch_size then
local ok, err = ngx.timer.at(0, send_batch, conf, ngx.shared.apianalytics[api_id])
if not ok then
ngx.log(ngx.ERR, "[apianalytics] failed to create timer: ", err)
end
Expand Down
5 changes: 3 additions & 2 deletions kong/plugins/apianalytics/schema.lua
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
return {
serviceToken = { type = "string", required = true },
log_body = { type = "boolean", default = true }
service_token = { type = "string", required = true },
batch_size = { type = "number", default = 100 },
log_body = { type = "boolean", default = false }
}
10 changes: 5 additions & 5 deletions kong/plugins/httplog/log.lua
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ local _M = {}
-- @param `message` Message to be logged
-- @return `payload` http payload
local function generate_post_payload(method, parsed_url, message)
local body = cjson.encode(message);
local body = cjson.encode(message)
local payload = string.format(
"%s %s HTTP/1.1\r\nHost: %s\r\nConnection: Keep-Alive\r\nContent-Type: application/json\r\nContent-Length: %s\r\n\r\n%s",
"%s %s HTTP/1.1\r\nHost: %s\r\nConnection: Keep-Alive\r\nContent-Type: application/json\r\nContent-Length: %s\r\n\r\n%s",
method:upper(), parsed_url.path, parsed_url.host, string.len(body), body)
return payload
end
Expand Down Expand Up @@ -65,11 +65,11 @@ local function log(premature, conf, message)
end
end

function _M.execute(conf)
local ok, err = ngx.timer.at(0, log, conf, ngx.ctx.log_message)
function _M.execute(conf, message)
local ok, err = ngx.timer.at(0, log, conf, message)
if not ok then
ngx.log(ngx.ERR, "[httplog] failed to create timer: ", err)
end
end

return _M
return _M
60 changes: 38 additions & 22 deletions kong/plugins/log_serializers/alf.lua
Original file line number Diff line number Diff line change
@@ -1,6 +1,18 @@
-- ALF serializer module.
-- ALF is the format supported by API Analytics (http://apianalytics.com)
--
-- This module represents _one_ ALF entry, which can have multiple requests entries.
-- # Usage:
--
-- ## Create the ALF like so:
-- local alf = ALFSerializer:new_alf()
--
-- ## Add entries:
-- local n_entries = alf:add_entry(ngx)
--
-- ## Output the ALF with all its entries as JSON:
-- local json_str = alf:to_json_str(service_token)
--
-- - ALF specifications: https://github.com/Mashape/api-log-format
-- - Nginx lua module documentation: http://wiki.nginx.org/HttpLuaModule
-- - ngx_http_core_module: http://wiki.nginx.org/HttpCoreModule#.24http_HEADER
Expand All @@ -12,20 +24,24 @@ local EMPTY_ARRAY_PLACEHOLDER = "__empty_array_placeholder__"
local alf_mt = {}
alf_mt.__index = alf_mt

local ALF = {
version = "1.0.0",
serviceToken = "", -- will be filled by to_json_string()
har = {
log = {
version = "1.2",
creator = {
name = "kong-api-analytics-plugin",
version = "0.1"
},
entries = {}
function alf_mt:new_alf()
local ALF = {
version = "1.0.0",
serviceToken = "", -- will be filled by to_json_string()
har = {
log = {
version = "1.2",
creator = {
name = "kong-api-analytics-plugin",
version = "0.1"
},
entries = {}
}
}
}
}

return setmetatable(ALF, self)
end

-- Transform a key/value lua table into an array of elements with `name`, `value`.
-- Since Lua won't recognize {} as an empty array but an empty object, we need to force it
Expand Down Expand Up @@ -65,6 +81,7 @@ end
-- Serialize `ngx` into one ALF entry.
-- For performance reasons, it tries to use the NGINX Lua API instead of
-- ngx_http_core_module when possible.
-- Public for unit testing.
function alf_mt:serialize_entry(ngx)
-- Extracted data
local req_headers = ngx.req.get_headers()
Expand All @@ -79,23 +96,22 @@ function alf_mt:serialize_entry(ngx)
-- ALF properties
-- timers
local send_time = round(ngx.ctx.proxy_started_at - started_at)
local wait_time = round(ngx.ctx.proxy_ended_at - ngx.ctx.proxy_started_at)
local receive_time = round(apianalytics_data.response_received - ngx.ctx.proxy_ended_at)
local wait_time = ngx.ctx.proxy_ended_at - ngx.ctx.proxy_started_at
local receive_time = apianalytics_data.response_received - ngx.ctx.proxy_ended_at
-- headers and headers size
local req_headers_str, res_headers_str= "", ""
local req_headers_str, res_headers_str = "", ""
local req_headers_arr = dic_to_array(req_headers, function(k, v) req_headers_str = req_headers_str..k..v end)
local res_headers_arr = dic_to_array(res_headers, function(k, v) res_headers_str = res_headers_str..k..v end)
local req_headers_size = string.len(req_headers_str)
local res_headers_size = string.len(res_headers_str)
-- values extracted from headers
local alf_req_mimeType = req_headers["Content-Type"] and req_headers["Content-Type"] or "application/octet-stream"
local alf_res_mimeType = res_headers["Content-Type"] and res_headers["Content-Type"] or "application/octet-stream"
local alf_req_bodySize = req_headers["Content-Length"] and req_headers["Content-Length"] or -1

return {
startedDateTime = os.date("!%Y-%m-%dT%TZ", started_at),
clientIPAddress = ngx.var.remote_addr,
time = send_time + wait_time + receive_time,
time = round(send_time + wait_time + receive_time),
request = {
method = ngx.req.get_method(),
url = ngx.var.scheme.."://"..ngx.var.host..ngx.var.uri,
Expand All @@ -104,7 +120,7 @@ function alf_mt:serialize_entry(ngx)
headers = req_headers_arr,
headersSize = req_headers_size,
cookies = {EMPTY_ARRAY_PLACEHOLDER},
bodySize = tonumber(alf_req_bodySize),
bodySize = string.len(req_body),
postData = {
mimeType = alf_req_mimeType,
params = dic_to_array(ngx.req.get_post_args()),
Expand All @@ -128,9 +144,9 @@ function alf_mt:serialize_entry(ngx)
},
cache = {},
timings = {
send = send_time,
wait = wait_time,
receive = receive_time,
send = round(send_time),
wait = round(wait_time),
receive = round(receive_time),
blocked = -1,
connect = -1,
dns = -1,
Expand Down Expand Up @@ -160,4 +176,4 @@ function alf_mt:flush_entries()
self.har.log.entries = {}
end

return setmetatable(ALF, alf_mt)
return alf_mt
9 changes: 6 additions & 3 deletions kong/tools/http_client.lua
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@ local function http_call(options)
options.protocol = "tlsv1"
options.mode = "client"
options.options = "all"
end

local _, code, headers = http.request(options)
return resp[1], code, headers
local _, code, headers = https.request(options)
return resp[1], code, headers
else
local _, code, headers = http.request(options)
return resp[1], code, headers
end
end

local function with_body(method)
Expand Down
1 change: 1 addition & 0 deletions kong/tools/ngx_stub.lua
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ _G.ngx = {
exit = function() end,
say = function() end,
log = function() end,
socket = { tcp = {} },
time = function() return os.time() end,
re = {
match = reg.match
Expand Down
6 changes: 4 additions & 2 deletions kong/vendor/resty_http.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ local type = type
local tonumber = tonumber
local tostring = tostring
local setmetatable = setmetatable
local encode_args = ngx.encode_args
local tcp = ngx.socket.tcp
local concat = table.concat
local insert = table.insert
local upper = string.upper
Expand Down Expand Up @@ -46,7 +48,7 @@ local function _req_header(conf, opts)

-- Normalize query string
if type(opts.query) == "table" then
opts.query = ngx.encode_args(opts.query)
opts.query = encode_args(opts.query)
end

-- Append query string
Expand Down Expand Up @@ -216,7 +218,7 @@ end
--------------------------------------

function new(self)
local sock, err = ngx.socket.tcp()
local sock, err = tcp()
if not sock then
return nil, err
end
Expand Down
20 changes: 10 additions & 10 deletions spec/integration/proxy/resolver_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ describe("Resolver", function()
spec_helper.prepare_db()
spec_helper.insert_fixtures {
api = {
{ name = "tests host resolver 1", public_dns = "mocbkin.com", target_url = "http://mockbin.com" },
{ name = "tests host resolver 2", public_dns = "mocbkin-auth.com", target_url = "http://mockbin.com" }
{ name = "tests host resolver 1", public_dns = "mockbin.com", target_url = "http://mockbin.com" },
{ name = "tests host resolver 2", public_dns = "mockbin-auth.com", target_url = "http://mockbin.com" }
},
plugin_configuration = {
{ name = "keyauth", value = {key_names = {"apikey"} }, __api = 2 }
Expand Down Expand Up @@ -55,7 +55,7 @@ describe("Resolver", function()
describe("SSL", function()

it("should work when calling SSL port", function()
local response, status = http_client.get(STUB_GET_SSL_URL, nil, { host = "mocbkin.com" })
local response, status = http_client.get(STUB_GET_SSL_URL, nil, { host = "mockbin.com" })
assert.are.equal(200, status)
assert.truthy(response)
local parsed_response = cjson.decode(response)
Expand Down Expand Up @@ -101,24 +101,24 @@ describe("Resolver", function()
describe("Existing API", function()

it("should proxy when the API is in Kong", function()
local _, status = http_client.get(STUB_GET_URL, nil, { host = "mocbkin.com"})
local _, status = http_client.get(STUB_GET_URL, nil, { host = "mockbin.com"})
assert.are.equal(200, status)
end)

it("should proxy when the Host header is not trimmed", function()
local _, status = http_client.get(STUB_GET_URL, nil, { host = " mocbkin.com "})
local _, status = http_client.get(STUB_GET_URL, nil, { host = " mockbin.com "})
assert.are.equal(200, status)
end)

it("should return the correct Server and Via headers when the request was proxied", function()
local _, status, headers = http_client.get(STUB_GET_URL, nil, { host = "mocbkin.com"})
local _, status, headers = http_client.get(STUB_GET_URL, nil, { host = "mockbin.com"})
assert.are.equal(200, status)
assert.are.equal("cloudflare-nginx", headers.server)
assert.are.equal(constants.NAME.."/"..constants.VERSION, headers.via)
end)

it("should return the correct Server and no Via header when the request was NOT proxied", function()
local _, status, headers = http_client.get(STUB_GET_URL, nil, { host = "mocbkin-auth.com"})
local _, status, headers = http_client.get(STUB_GET_URL, nil, { host = "mockbin-auth.com"})
assert.are.equal(403, status)
assert.are.equal(constants.NAME.."/"..constants.VERSION, headers.server)
assert.falsy(headers.via)
Expand All @@ -131,7 +131,7 @@ describe("Resolver", function()

local tcp = socket.tcp()
tcp:connect(host, port)
tcp:send("GET "..parsed_url.path.." HTTP/1.0\r\nHost: mocbkin.com\r\n\r\n");
tcp:send("GET "..parsed_url.path.." HTTP/1.0\r\nHost: mockbin.com\r\n\r\n");
local response = ""
while true do
local s, status, partial = tcp:receive()
Expand All @@ -150,7 +150,7 @@ describe("Resolver", function()

local tcp = socket.tcp()
tcp:connect(host, port)
tcp:send("GET "..parsed_url.path.." HTTP/1.0\r\nHost: fake.com\r\nHost: mocbkin.com\r\n\r\n");
tcp:send("GET "..parsed_url.path.." HTTP/1.0\r\nHost: fake.com\r\nHost: mockbin.com\r\n\r\n");
local response = ""
while true do
local s, status, partial = tcp:receive()
Expand All @@ -163,7 +163,7 @@ describe("Resolver", function()
end)

it("should proxy when the request has no Host header but the X-Host-Override header", function()
local _, status = http_client.get(STUB_GET_URL, nil, { ["X-Host-Override"] = "mocbkin.com"})
local _, status = http_client.get(STUB_GET_URL, nil, { ["X-Host-Override"] = "mockbin.com"})
assert.are.equal(200, status)
end)

Expand Down
Loading

0 comments on commit b227424

Please sign in to comment.