Skip to content

Commit

Permalink
Merge pull request #360 from Mashape/feature/mashape-analytics
Browse files Browse the repository at this point in the history
[Feature/plugin] Mashape Analytics
  • Loading branch information
thibaultcha committed Jun 26, 2015
2 parents ebcf8a0 + 249c26c commit e3bfcd3
Show file tree
Hide file tree
Showing 25 changed files with 697 additions and 76 deletions.
1 change: 0 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ before_install:
- bash .travis/setup_cassandra.sh

install:
- sudo make install
- sudo make dev

script:
Expand Down
7 changes: 6 additions & 1 deletion kong-0.3.2-1.rockspec
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ build = {
["kong.plugins.keyauth.api"] = "kong/plugins/keyauth/api.lua",
["kong.plugins.keyauth.daos"] = "kong/plugins/keyauth/daos.lua",

["kong.plugins.log_serializers.basic"] = "kong/plugins/log_serializers/basic.lua",
["kong.plugins.log_serializers.alf"] = "kong/plugins/log_serializers/alf.lua",

["kong.plugins.tcplog.handler"] = "kong/plugins/tcplog/handler.lua",
["kong.plugins.tcplog.log"] = "kong/plugins/tcplog/log.lua",
["kong.plugins.tcplog.schema"] = "kong/plugins/tcplog/schema.lua",
Expand All @@ -114,10 +117,12 @@ build = {
["kong.plugins.httplog.schema"] = "kong/plugins/httplog/schema.lua",

["kong.plugins.filelog.handler"] = "kong/plugins/filelog/handler.lua",
["kong.plugins.filelog.log"] = "kong/plugins/filelog/log.lua",
["kong.plugins.filelog.schema"] = "kong/plugins/filelog/schema.lua",
["kong.plugins.filelog.fd_util"] = "kong/plugins/filelog/fd_util.lua",

["kong.plugins.analytics.handler"] = "kong/plugins/analytics/handler.lua",
["kong.plugins.analytics.schema"] = "kong/plugins/analytics/schema.lua",

["kong.plugins.ratelimiting.handler"] = "kong/plugins/ratelimiting/handler.lua",
["kong.plugins.ratelimiting.access"] = "kong/plugins/ratelimiting/access.lua",
["kong.plugins.ratelimiting.schema"] = "kong/plugins/ratelimiting/schema.lua",
Expand Down
1 change: 1 addition & 0 deletions kong.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ plugins_available:
- request_transformer
- response_transformer
- requestsizelimiting
- analytics

## The Kong working directory
## (Make sure you have read and write permissions)
Expand Down
71 changes: 24 additions & 47 deletions kong/kong.lua
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,9 @@ function _M.exec_plugins_certificate()
ngx.ctx.plugin_conf[plugin.name] = load_plugin_conf(ngx.ctx.api.id, nil, plugin.name)
end

local conf = ngx.ctx.plugin_conf[plugin.name]
if not ngx.ctx.stop_phases and (plugin.resolver or conf) then
plugin.handler:certificate(conf and conf.value or nil)
local plugin_conf = ngx.ctx.plugin_conf[plugin.name]
if not ngx.ctx.stop_phases and (plugin.resolver or plugin_conf) then
plugin.handler:certificate(plugin_conf and plugin_conf.value or nil)
end
end

Expand All @@ -178,7 +178,6 @@ end
-- Calls `access()` on every loaded plugin
function _M.exec_plugins_access()
local start = get_now()

ngx.ctx.plugin_conf = {}

-- Iterate over all the plugins
Expand All @@ -193,9 +192,10 @@ function _M.exec_plugins_access()
end
end
end
local conf = ngx.ctx.plugin_conf[plugin.name]
if not ngx.ctx.stop_phases and (plugin.resolver or conf) then
plugin.handler:access(conf and conf.value or nil)

local plugin_conf = ngx.ctx.plugin_conf[plugin.name]
if not ngx.ctx.stop_phases and (plugin.resolver or plugin_conf) then
plugin.handler:access(plugin_conf and plugin_conf.value or nil)
end
end
-- Append any modified querystring parameters
Expand All @@ -205,19 +205,26 @@ function _M.exec_plugins_access()
final_url = final_url.."?"..ngx.encode_args(ngx.req.get_uri_args())
end
ngx.var.backend_url = final_url
ngx.ctx.kong_processing_access = get_now() - start

local t_end = get_now()
ngx.ctx.kong_processing_access = t_end - start
-- Setting a property that will be available for every plugin
ngx.ctx.proxy_started_at = t_end
end

-- Calls `header_filter()` on every loaded plugin
function _M.exec_plugins_header_filter()
local start = get_now()
-- Setting a property that will be available for every plugin
ngx.ctx.proxy_ended_at = start

if not ngx.ctx.stop_phases then
ngx.header["Via"] = constants.NAME.."/"..constants.VERSION

for _, plugin in ipairs(plugins) do
local conf = ngx.ctx.plugin_conf[plugin.name]
if conf then
plugin.handler:header_filter(conf.value)
local plugin_conf = ngx.ctx.plugin_conf[plugin.name]
if plugin_conf then
plugin.handler:header_filter(plugin_conf and plugin_conf.value or nil)
end
end
end
Expand All @@ -229,9 +236,9 @@ function _M.exec_plugins_body_filter()
local start = get_now()
if not ngx.ctx.stop_phases then
for _, plugin in ipairs(plugins) do
local conf = ngx.ctx.plugin_conf[plugin.name]
if conf then
plugin.handler:body_filter(conf.value)
local plugin_conf = ngx.ctx.plugin_conf[plugin.name]
if plugin_conf then
plugin.handler:body_filter(plugin_conf and plugin_conf.value or nil)
end
end
end
Expand All @@ -241,40 +248,10 @@ end
-- Calls `log()` on every loaded plugin
function _M.exec_plugins_log()
if not ngx.ctx.stop_phases then
-- Creating the log variable that will be serialized
local message = {
request = {
uri = ngx.var.request_uri,
request_uri = ngx.var.scheme.."://"..ngx.var.host..":"..ngx.var.server_port..ngx.var.request_uri,
querystring = ngx.req.get_uri_args(), -- parameters, as a table
method = ngx.req.get_method(), -- http method
headers = ngx.req.get_headers(),
size = ngx.var.request_length
},
response = {
status = ngx.status,
headers = ngx.resp.get_headers(),
size = ngx.var.bytes_sent
},
latencies = {
kong = (ngx.ctx.kong_processing_access or 0) +
(ngx.ctx.kong_processing_header_filter or 0) +
(ngx.ctx.kong_processing_body_filter or 0),
proxy = ngx.var.upstream_response_time * 1000,
request = ngx.var.request_time * 1000
},
authenticated_entity = ngx.ctx.authenticated_entity,
api = ngx.ctx.api,
client_ip = ngx.var.remote_addr,
started_at = ngx.req.start_time() * 1000
}

ngx.ctx.log_message = message

for _, plugin in ipairs(plugins) do
local conf = ngx.ctx.plugin_conf[plugin.name]
if conf or plugin.reports then
plugin.handler:log(conf and conf.value or nil)
local plugin_conf = ngx.ctx.plugin_conf[plugin.name]
if plugin_conf then
plugin.handler:log(plugin_conf and plugin_conf.value or nil)
end
end
end
Expand Down
163 changes: 163 additions & 0 deletions kong/plugins/analytics/handler.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
-- Analytics plugin handler.
--
-- How it works:
-- Keep track of calls made to configured APIs on a per-worker basis, using the ALF format
-- (alf_serializer.lua). `:access()` and `:body_filter()` are implemented to record some properties
-- required for the ALF entry.
--
-- When the buffer is full (it reaches the `batch_size` configuration value), send the batch to the server.
-- If the server doesn't accept it, don't flush the data and it'll try again at the next call.
-- If the server accepted the batch, flush the buffer.
--
-- In order to keep Analytics as real-time as possible, we also start a 'delayed timer' running in background.
-- If no requests are made during a certain period of time (the `delay` configuration value), the
-- delayed timer will fire and send the batch + flush the data, not waiting for the buffer to be full.

local http = require "resty_http"
local BasePlugin = require "kong.plugins.base_plugin"
local ALFSerializer = require "kong.plugins.log_serializers.alf"

local ALF_BUFFER = {}
local DELAYED_LOCK = false -- careful: this will only work when lua_code_cache is on
local LATEST_CALL

local ANALYTICS_SOCKET = {
host = "socket.analytics.mashape.com",
port = 80,
path = "/1.0.0/single"
}

local function send_batch(premature, conf, alf)
-- Abort the sending if the entries are empty, maybe it was triggered from the delayed
-- timer, but already sent because we reached the limit in a request later.
if table.getn(alf.har.log.entries) < 1 then
return
end

local message = alf:to_json_string(conf.service_token, conf.environment)

local ok, err
local client = http:new()
client:set_timeout(50000) -- 5 sec

ok, err = client:connect(ANALYTICS_SOCKET.host, ANALYTICS_SOCKET.port)
if not ok then
ngx.log(ngx.ERR, "[analytics] failed to connect to the socket: "..err)
return
end

local res, err = client:request({ path = ANALYTICS_SOCKET.path, body = message })
if not res then
ngx.log(ngx.ERR, "[analytics] failed to send batch: "..err)
end

-- close connection, or put it into the connection pool
if res.headers["connection"] == "close" then
ok, err = client:close()
if not ok then
ngx.log(ngx.ERR, "[analytics] failed to close: "..err)
end
else
client:set_keepalive()
end

if res.status == 200 then
alf:flush_entries()
ngx.log(ngx.DEBUG, "[analytics] successfully saved the batch")
else
ngx.log(ngx.ERR, "[analytics] socket refused the batch: "..res.body)
end
end

-- A handler for delayed batch sending. When no call have been made for X seconds
-- (X being conf.delay), we send the batch to keep analytics as close to real-time
-- as possible.
local delayed_send_handler
delayed_send_handler = function(premature, conf, alf)
-- If the latest call was received during the wait delay, abort the delayed send and
-- report it for X more seconds.
if ngx.now() - LATEST_CALL < conf.delay then
local ok, err = ngx.timer.at(conf.delay, delayed_send_handler, conf, alf)
if not ok then
ngx.log(ngx.ERR, "[analytics] failed to create delayed batch sending timer: ", err)
end
else
DELAYED_LOCK = false -- re-enable creation of a delayed-timer
send_batch(premature, conf, alf)
end
end

--
--
--

local AnalyticsHandler = BasePlugin:extend()

function AnalyticsHandler:new()
AnalyticsHandler.super.new(self, "analytics")
end

function AnalyticsHandler:access(conf)
AnalyticsHandler.super.access(self)

-- Retrieve and keep in memory the bodies for this request
ngx.ctx.analytics = {
req_body = "",
res_body = ""
}

if conf.log_body then
ngx.req.read_body()
ngx.ctx.analytics.req_body = ngx.req.get_body_data()
end
end

function AnalyticsHandler:body_filter(conf)
AnalyticsHandler.super.body_filter(self)

local chunk, eof = ngx.arg[1], ngx.arg[2]
-- concatenate response chunks for ALF's `response.content.text`
if conf.log_body then
ngx.ctx.analytics.res_body = ngx.ctx.analytics.res_body..chunk
end

if eof then -- latest chunk
ngx.ctx.analytics.response_received = ngx.now() * 1000
end
end

function AnalyticsHandler:log(conf)
AnalyticsHandler.super.log(self)

local api_id = ngx.ctx.api.id

-- Create the ALF if not existing for this API
if not ALF_BUFFER[api_id] then
ALF_BUFFER[api_id] = ALFSerializer:new_alf()
end

-- Simply adding the entry to the ALF
local n_entries = ALF_BUFFER[api_id]:add_entry(ngx)

-- Keep track of the latest call for the delayed timer
LATEST_CALL = ngx.now()

if n_entries >= conf.batch_size then
-- Batch size reached, let's send the data
local ok, err = ngx.timer.at(0, send_batch, conf, ALF_BUFFER[api_id])
if not ok then
ngx.log(ngx.ERR, "[analytics] failed to create batch sending timer: ", err)
end
elseif not DELAYED_LOCK then
DELAYED_LOCK = true -- Make sure only one delayed timer is ever pending
-- Batch size not yet reached.
-- Set a timer sending the data only in case nothing happens for awhile or if the batch_size is taking
-- too much time to reach the limit and trigger the flush.
local ok, err = ngx.timer.at(conf.delay, delayed_send_handler, conf, ALF_BUFFER[api_id])
if not ok then
ngx.log(ngx.ERR, "[analytics] failed to create delayed batch sending timer: ", err)
end
end
end

return AnalyticsHandler
9 changes: 9 additions & 0 deletions kong/plugins/analytics/schema.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
return {
fields = {
service_token = { type = "string", required = true },
environment = { type = "string" },
batch_size = { type = "number", default = 100 },
log_body = { type = "boolean", default = false },
delay = { type = "number", default = 10 }
}
}
4 changes: 1 addition & 3 deletions kong/plugins/filelog/handler.lua
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
-- Copyright (C) Mashape, Inc.

local BasePlugin = require "kong.plugins.base_plugin"
local log = require "kong.plugins.filelog.log"
local BasePlugin = require "kong.plugins.base_plugin"

local FileLogHandler = BasePlugin:extend()

Expand Down
10 changes: 7 additions & 3 deletions kong/plugins/filelog/log.lua
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
-- Copyright (C) Mashape, Inc.
local cjson = require "cjson"

local ffi = require "ffi"
local cjson = require "cjson"
local fd_util = require "kong.plugins.filelog.fd_util"
local basic_serializer = require "kong.plugins.log_serializers.basic"

ffi.cdef[[
typedef struct {
Expand All @@ -24,7 +26,7 @@ int fprintf(FILE *stream, const char *format, ...);
-- @param `conf` Configuration table, holds http endpoint details
-- @param `message` Message to be logged
local function log(premature, conf, message)
local message = cjson.encode(message).."\n"
message = cjson.encode(message).."\n"

local f = fd_util.get_fd(conf.path)
if not f then
Expand All @@ -39,7 +41,9 @@ end
local _M = {}

function _M.execute(conf)
local ok, err = ngx.timer.at(0, log, conf, ngx.ctx.log_message)
local message = basic_serializer.serialize(ngx)

local ok, err = ngx.timer.at(0, log, conf, message)
if not ok then
ngx.log(ngx.ERR, "[filelog] failed to create timer: ", err)
end
Expand Down
5 changes: 4 additions & 1 deletion kong/plugins/httplog/handler.lua
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
local basic_serializer = require "kong.plugins.log_serializers.basic"
local BasePlugin = require "kong.plugins.base_plugin"
local log = require "kong.plugins.httplog.log"

Expand All @@ -9,7 +10,9 @@ end

function HttpLogHandler:log(conf)
HttpLogHandler.super.log(self)
log.execute(conf)

local message = basic_serializer.serialize(ngx)
log.execute(conf, message)
end

return HttpLogHandler
Loading

0 comments on commit e3bfcd3

Please sign in to comment.