Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(zipkin): support b3 req header #3551

Merged
merged 2 commits into from
Feb 22, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 67 additions & 12 deletions apisix/plugins/zipkin.lua
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ local zipkin_codec = require("apisix.plugins.zipkin.codec")
local new_random_sampler = require("apisix.plugins.zipkin.random_sampler").new
local new_reporter = require("apisix.plugins.zipkin.reporter").new
local ngx = ngx
local ngx_re = require("ngx.re")
local pairs = pairs
local tonumber = tonumber

Expand Down Expand Up @@ -73,6 +74,27 @@ local function create_tracer(conf,ctx)
return tracer
end


local function parse_b3(b3)
-- See https://github.com/openzipkin/b3-propagation#single-header
if b3 == "0" then
return nil, nil, nil, "0", nil
end

local pieces, err = ngx_re.split(b3, "-", nil, nil, 4)
if not pieces then
return err
end
if not pieces[1] then
return "missing trace_id"
end
if not pieces[2] then
return "missing span_id"
end
return nil, pieces[1], pieces[2], pieces[3], pieces[4]
end


function _M.rewrite(plugin_conf, ctx)
local conf = core.table.clone(plugin_conf)
-- once the server started, server_addr and server_port won't change, so we can cache it.
Expand All @@ -88,29 +110,58 @@ function _M.rewrite(plugin_conf, ctx)
local headers = core.request.headers(ctx)
local per_req_sample_ratio

-- X-B3-Sampled: if the client decided to sample this request, we do too.
local sample = headers["x-b3-sampled"]
if sample == "1" or sample == "true" then
per_req_sample_ratio = 1
elseif sample == "0" or sample == "false" then
per_req_sample_ratio = 0
end

-- X-B3-Flags: if it equals '1' then it overrides sampling policy
-- We still want to warn on invalid sample header, so do this after the above
-- We still want to warn on invalid sampled header, so do this after the above
local debug = headers["x-b3-flags"]
if debug == "1" then
per_req_sample_ratio = 1
end

local trace_id, request_span_id, sampled, parent_span_id
local b3 = headers["b3"]
if b3 then
-- don't pass b3 header by default
core.request.set_header(ctx, "b3", nil)

local err
err, trace_id, request_span_id, sampled, parent_span_id = parse_b3(b3)

if err then
core.log.warn("invalid b3 header: ", b3, ", ignored: ", err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use error is better here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@membphis
Raise error in a request may be considered a bug of APISIX (as it will provide 500 and backtrack).
Maybe we can return 400 to reject it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think 400 is better, since an invalid header is always caused by client.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think 400 is better, since an invalid header is always caused by client.

agree this too.

Raise error in a request may be considered a bug of APISIX (as it will provide 500 and backtrack).

I means core.log.error(****) ^_^

return
end

if sampled == "d" then
core.request.set_header(ctx, "x-b3-flags", "1")
sampled = "1"
end
else
-- X-B3-Sampled: if the client decided to sample this request, we do too.
sampled = headers["x-b3-sampled"]
trace_id = headers["x-b3-traceid"]
parent_span_id = headers["x-b3-parentspanid"]
request_span_id = headers["x-b3-spanid"]
end

if sampled == "1" or sampled == "true" then
per_req_sample_ratio = 1
elseif sampled == "0" or sampled == "false" then
per_req_sample_ratio = 0
end

ctx.opentracing_sample = tracer.sampler:sample(per_req_sample_ratio or conf.sample_ratio)
if not ctx.opentracing_sample then
core.request.set_header("x-b3-sampled", "0")
core.request.set_header(ctx, "x-b3-sampled", "0")
return
end

local wire_context = tracer:extract("http_headers",
core.request.headers(ctx))
local zipkin_ctx = core.tablepool.fetch("zipkin_ctx", 0, 3)
zipkin_ctx.trace_id = trace_id
zipkin_ctx.parent_span_id = parent_span_id
zipkin_ctx.request_span_id = request_span_id
ctx.zipkin = zipkin_ctx

local wire_context = tracer:extract("http_headers", ctx)

local start_timestamp = ngx.req.start_time()
local request_span = tracer:start_span("apisix.request", {
Expand Down Expand Up @@ -205,6 +256,10 @@ function _M.log(conf, ctx)
end

opentracing.request_span:finish(log_end_time)

if ctx.zipkin_ctx then
core.tablepool.release("zipkin_ctx", ctx.zipkin_ctx)
end
end

return _M
14 changes: 10 additions & 4 deletions apisix/plugins/zipkin/codec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,13 @@ local function from_hex(str)
end

local function new_extractor()
return function(headers)
return function(ctx)
local had_invalid_id = false

local trace_id = headers["x-b3-traceid"]
local parent_span_id = headers["x-b3-parentspanid"]
local request_span_id = headers["x-b3-spanid"]
local zipkin_ctx = ctx.zipkin
local trace_id = zipkin_ctx.trace_id
local parent_span_id = zipkin_ctx.parent_span_id
local request_span_id = zipkin_ctx.request_span_id

-- Validate trace id
if trace_id and
Expand Down Expand Up @@ -68,13 +69,18 @@ local function new_extractor()

-- Process jaegar baggage header
local baggage = {}
local headers = core.request.headers(ctx)
for k, v in pairs(headers) do
local baggage_key = k:match("^uberctx%-(.*)$")
if baggage_key then
baggage[baggage_key] = ngx.unescape_uri(v)
end
end

core.log.info("new span context: trace id: ", trace_id,
", span id: ", request_span_id,
", parent span id: ", parent_span_id)

trace_id = from_hex(trace_id)
parent_span_id = from_hex(parent_span_id)
request_span_id = from_hex(request_span_id)
Expand Down
141 changes: 141 additions & 0 deletions t/plugin/zipkin2.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
use t::APISIX 'no_plan';

repeat_each(1);
no_long_string();
no_root_location();
log_level("info");

add_block_preprocessor(sub {
my ($block) = @_;

if (!$block->request) {
$block->set_value("request", "GET /echo");
}

if (!$block->no_error_log) {
$block->set_value("no_error_log", "[error]\n[alert]");
}
});

run_tests;

__DATA__

=== TEST 1: b3 single header
--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test
local code, body = t('/apisix/admin/routes/1',
ngx.HTTP_PUT,
[[{
"plugins": {
"zipkin": {
"endpoint": "http://127.0.0.1:9999/mock_zipkin",
"sample_ratio": 1
}
},
"upstream": {
"nodes": {
"127.0.0.1:1980": 1
},
"type": "roundrobin"
},
"uri": "/echo"
}]]
)

if code >= 300 then
ngx.status = code
end
ngx.say(body)
}
}
--- request
GET /t
--- response_body
passed



=== TEST 2: sanity
--- more_headers
b3: 80f198ee56343ba864fe8b2a57d3eff7-e457b5a2e4d86bd1-1-05e3ac9a4f6e3b90
--- response_headers
x-b3-sampled: 1
--- raw_response_headers_unlike
b3:
--- error_log
new span context: trace id: 80f198ee56343ba864fe8b2a57d3eff7, span id: e457b5a2e4d86bd1, parent span id: 05e3ac9a4f6e3b90



=== TEST 3: invalid header
--- more_headers
b3: 80f198ee56343ba864fe8b2a57d3eff7
--- response_headers
x-b3-sampled:
--- error_log
invalid b3 header



=== TEST 4: disable via b3
--- more_headers
b3: 80f198ee56343ba864fe8b2a57d3eff7-e457b5a2e4d86bd1-0-05e3ac9a4f6e3b90
--- response_headers
x-b3-sampled: 0



=== TEST 5: disable via b3 (abbr)
--- more_headers
b3: 0
--- response_headers
x-b3-sampled: 0



=== TEST 6: debug via b3
--- more_headers
b3: 80f198ee56343ba864fe8b2a57d3eff7-e457b5a2e4d86bd1-d-05e3ac9a4f6e3b90
--- response_headers
x-b3-sampled: 1
x-b3-flags: 1



=== TEST 7: b3 without parent span id
--- more_headers
b3: 80f198ee56343ba864fe8b2a57d3eff7-e457b5a2e4d86bd1-d
--- response_headers
x-b3-sampled: 1
x-b3-flags: 1
--- error_log
new span context: trace id: 80f198ee56343ba864fe8b2a57d3eff7, span id: e457b5a2e4d86bd1, parent span id: nil



=== TEST 8: b3 without sampled & parent span id
--- more_headers
b3: 80f198ee56343ba864fe8b2a57d3eff7-e457b5a2e4d86bd1
--- response_headers
x-b3-sampled: 1
--- error_log
new span context: trace id: 80f198ee56343ba864fe8b2a57d3eff7, span id: e457b5a2e4d86bd1, parent span id: nil