Skip to content

Commit

Permalink
feat(zipkin): use queues to send data to zipkin server
Browse files Browse the repository at this point in the history
Previously, the zipkin plugin performed some internal buffering to
group spans from one request together before sending them to the
zipkin server.  This buffer has been replaced by a plugin queue.  The
normal queueing parameters can be used to control the batching
behavior.
  • Loading branch information
hanshuebner authored and Hans Hübner committed Apr 26, 2023
1 parent 8de9db2 commit 9baecad
Show file tree
Hide file tree
Showing 9 changed files with 157 additions and 123 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
[#10417](https://github.com/Kong/kong/pull/10417)
- **Opentelemetry**: plugin version has been updated to match Kong's version
[#10646](https://github.com/Kong/kong/pull/10646)

- **Zipkin**: The zipkin plugin now uses queues for internal
buffering. The standard queue parameter set is available to
control queuing behavior.
[#10753](https://github.com/Kong/kong/pull/10753)

### Additions

Expand Down
3 changes: 3 additions & 0 deletions kong/clustering/compat/removed_fields.lua
Original file line number Diff line number Diff line change
Expand Up @@ -81,5 +81,8 @@ return {
datadog = {
"queue",
},
zipkin = {
"queue",
},
},
}
10 changes: 10 additions & 0 deletions kong/plugins/zipkin/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Testing the zipkin plugin:

Run cassandra and postgres locally.

docker run -it -p 15002:9000 -p 15003:9001 moul/grpcbin
docker run -p 9411:9411 -it openzipkin/zipkin:2.19

KONG_SPEC_TEST_GRPCBIN_PORT=15002 \
KONG_SPEC_TEST_GRPCBIN_SSL_PORT=15003 \
bin/busted -o gtest spec/03-plugins/34-zipkin/
27 changes: 1 addition & 26 deletions kong/plugins/zipkin/handler.lua
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,7 @@ end

local function get_reporter(conf)
if reporter_cache[conf] == nil then
reporter_cache[conf] = new_zipkin_reporter(conf.http_endpoint,
conf.default_service_name,
conf.local_service_name,
conf.connect_timeout,
conf.send_timeout,
conf.read_timeout,
kong.log)
reporter_cache[conf] = new_zipkin_reporter(conf)
end
return reporter_cache[conf]
end
Expand Down Expand Up @@ -91,20 +85,6 @@ local function get_or_add_proxy_span(zipkin, timestamp)
end


local function timer_log(premature, reporter)
if premature then
return
end

local ok, err = reporter:flush()
if not ok then
reporter.logger.err("reporter flush ", err)
return
end
end



local initialize_request


Expand Down Expand Up @@ -408,11 +388,6 @@ function ZipkinLogHandler:log(conf) -- luacheck: ignore 212
reporter:report(proxy_span)
request_span:finish(now_mu)
reporter:report(request_span)

local ok, err = ngx.timer.at(0, timer_log, reporter)
if not ok then
kong.log.err("failed to create timer: ", err)
end
end


Expand Down
77 changes: 33 additions & 44 deletions kong/plugins/zipkin/reporter.lua
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
local resty_http = require "resty.http"
local to_hex = require "resty.string".to_hex
local cjson = require "cjson".new()
local Queue = require "kong.tools.queue"

cjson.encode_number_precision(16)

local zipkin_reporter_methods = {}
Expand All @@ -21,17 +23,35 @@ local function ip_kind(addr)
end


local function new(http_endpoint, default_service_name, local_service_name, connect_timeout, send_timeout, read_timeout, logger)
--- Queue handler: POST one batch of spans to the Zipkin HTTP endpoint.
-- @param conf plugin configuration; reads `http_endpoint`,
--   `connect_timeout`, `send_timeout` and `read_timeout`
-- @param entries array of span tables; sent JSON-encoded as the request body
-- @return true when the batch was accepted (2xx), or when no endpoint is
--   configured (nil / ngx.null), in which case nothing is sent
-- @return nil, err when the request fails or the response is not 2xx
local function send_entries_to_zipkin(conf, entries)
  local endpoint = conf.http_endpoint
  if endpoint == nil or endpoint == ngx.null then
    return true
  end

  kong.log.debug("zipkin batch size: ", #entries)

  local client = resty_http.new()
  client:set_timeouts(conf.connect_timeout, conf.send_timeout, conf.read_timeout)

  local request_opts = {
    method = "POST",
    headers = {
      ["content-type"] = "application/json",
    },
    body = cjson.encode(entries),
  }

  local response, err = client:request_uri(endpoint, request_opts)
  if not response then
    return nil, "zipkin request failed: " .. err
  end

  local status = response.status
  if status < 200 or status >= 300 then
    return nil, "zipkin server responded unexpectedly: " .. tostring(status) .. " " .. tostring(response.reason)
  end

  return true
end


local function new(conf)
return setmetatable({
default_service_name = default_service_name,
local_service_name = local_service_name,
http_endpoint = http_endpoint,
connect_timeout = connect_timeout,
send_timeout = send_timeout,
read_timeout = read_timeout,
pending_spans = {},
pending_spans_n = 0,
logger = logger,
conf = conf,
default_service_name = conf.default_service_name,
local_service_name = conf.local_service_name,
}, zipkin_reporter_mt)
end

Expand Down Expand Up @@ -88,41 +108,10 @@ function zipkin_reporter_methods:report(span)
annotations = span.annotations,
}

local i = self.pending_spans_n + 1
self.pending_spans[i] = zipkin_span
self.pending_spans_n = i
end


function zipkin_reporter_methods:flush()
if self.pending_spans_n == 0 then
return true
end

local pending_spans = cjson.encode(self.pending_spans)
self.pending_spans = {}
self.pending_spans_n = 0

if self.http_endpoint == nil or self.http_endpoint == ngx.null then
return true
end

local httpc = resty_http.new()
httpc:set_timeouts(self.connect_timeout, self.send_timeout, self.read_timeout)
local res, err = httpc:request_uri(self.http_endpoint, {
method = "POST",
headers = {
["content-type"] = "application/json",
},
body = pending_spans,
})
-- TODO: on failure, retry?
if not res then
return nil, "failed to request: " .. err
elseif res.status < 200 or res.status >= 300 then
return nil, "failed: " .. res.status .. " " .. res.reason
local ok, err = Queue.enqueue(Queue.get_params(self.conf), send_entries_to_zipkin, self.conf, zipkin_span)
if not ok then
kong.log.err("failed to enqueue span: ", err)
end
return true
end


Expand Down
1 change: 1 addition & 0 deletions kong/plugins/zipkin/schema.lua
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ return {
{ http_response_header_for_traceid = { type = "string", default = nil }},
{ phase_duration_flavor = { type = "string", required = true, default = "annotations",
one_of = { "annotations", "tags" } } },
{ queue = typedefs.queue },
},
}, },
},
Expand Down
49 changes: 0 additions & 49 deletions spec/03-plugins/34-zipkin/reporter_spec.lua

This file was deleted.

102 changes: 102 additions & 0 deletions spec/03-plugins/34-zipkin/zipkin_queue_spec.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
local helpers = require "spec.helpers"
local utils = require "kong.tools.utils"
local cjson = require "cjson"


local fmt = string.format

--- Block until Zipkin reports exactly `expected_spans` spans.
-- Repeatedly queries `/api/v2/traces` (filtered by `remoteServiceName`)
-- through `helpers.wait_until`, summing the length of every element of the
-- decoded response, and succeeds once that sum equals `expected_spans`.
-- @param zipkin_client HTTP client used to issue the query
-- @param expected_spans total number of spans to wait for
-- @param service_name value passed as the `remoteServiceName` query filter
local function wait_for_spans(zipkin_client, expected_spans, service_name)
  helpers.wait_until(function()
    local res = zipkin_client:get("/api/v2/traces", {
      query = {
        limit = 1000,
        remoteServiceName = service_name,
      }
    })
    local body = assert.response(res).has.status(200)
    local traces = cjson.decode(body)

    local total = 0
    for _, trace in ipairs(traces) do
      total = total + #trace
    end

    return total == expected_spans
  end)
end


-- Integration test: verifies that the zipkin plugin hands spans to a queue
-- and that spans from several requests end up in one batch of
-- `max_batch_size` entries.
describe("queueing behavior", function()
  local max_batch_size = 10
  local service
  local zipkin_client
  local proxy_client

  lazy_setup(function()
    local bp = helpers.get_db_utils(nil, { "services", "routes", "plugins" })

    -- enable zipkin plugin globally pointing to mock server
    bp.plugins:insert({
      name = "zipkin",
      protocols = { "http" },
      config = {
        sample_ratio = 1,
        http_endpoint = fmt("http://%s:%d/api/v2/spans", helpers.zipkin_host, helpers.zipkin_port),
        static_tags = {
          { name = "static", value = "ok" },
        },
        default_header_type = "b3-single",
        phase_duration_flavor = "tags",
        queue = {
          max_batch_size = max_batch_size,
          -- generous delay so that the batch fills up before it is flushed
          max_coalescing_delay = 10,
        }
      }
    })

    service = bp.services:insert {
      name = string.lower("http-" .. utils.random_string()),
    }

    -- kong (http) mock upstream
    bp.routes:insert({
      name = string.lower("route-" .. utils.random_string()),
      service = service,
      hosts = { "http-route" },
      preserve_host = true,
      paths = { "/" },
    })

    helpers.start_kong({
      nginx_conf = "spec/fixtures/custom_nginx.template",
      stream_listen = helpers.get_proxy_ip(false) .. ":19000",
    })

    proxy_client = helpers.proxy_client()
    zipkin_client = helpers.http_client(helpers.zipkin_host, helpers.zipkin_port)
  end)


  -- Paired with lazy_setup: a strict `teardown` would also run when every
  -- test in this block is filtered out, i.e. when Kong was never started.
  lazy_teardown(function()
    -- release the client connections opened in lazy_setup
    if proxy_client then
      proxy_client:close()
    end
    if zipkin_client then
      zipkin_client:close()
    end

    helpers.stop_kong()
  end)

  before_each(function()
    helpers.clean_logfile() -- prevent log assertions from poisoning each other.
  end)

  it("batches spans from multiple requests", function()
    local count = 10

    for _ = 1, count do
      local r = proxy_client:get("/", {
        headers = {
          ["x-b3-sampled"] = "1",
          host = "http-route",
          ["zipkin-tags"] = "foo=bar; baz=qux"
        },
      })
      assert.response(r).has.status(200)
    end
    -- the test expects three spans per proxied request
    wait_for_spans(zipkin_client, 3 * count, service.name)
    assert.logfile().has.line("zipkin batch size: " .. tostring(max_batch_size), true)
  end)
end)
6 changes: 3 additions & 3 deletions spec/03-plugins/34-zipkin/zipkin_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ for _, strategy in helpers.each_strategy() do
-- wait for zero-delay timer
helpers.wait_timer("zipkin", true, "any-finish")

assert.logfile().has.line("[zipkin] reporter flush failed to request: timeout", true, 2)
assert.logfile().has.line("zipkin request failed: timeout", true, 2)
end)

it("times out if upstream zipkin server takes too long to respond", function()
Expand All @@ -426,7 +426,7 @@ for _, strategy in helpers.each_strategy() do
-- wait for zero-delay timer
helpers.wait_timer("zipkin", true, "any-finish")

assert.logfile().has.line("[zipkin] reporter flush failed to request: timeout", true, 2)
assert.logfile().has.line("zipkin request failed: timeout", true, 2)
end)

it("connection refused if upstream zipkin server is not listening", function()
Expand All @@ -442,7 +442,7 @@ for _, strategy in helpers.each_strategy() do
-- wait for zero-delay timer
helpers.wait_timer("zipkin", true, "any-finish")

assert.logfile().has.line("[zipkin] reporter flush failed to request: connection refused", true, 2)
assert.logfile().has.line("zipkin request failed: connection refused", true, 2)
end)
end)
end
Expand Down

0 comments on commit 9baecad

Please sign in to comment.