Skip to content

Commit

Permalink
fix: save and restore pb state (apache#9606)
Browse files Browse the repository at this point in the history
  • Loading branch information
kingluo authored and AlinsRan committed Jun 25, 2023
1 parent f815a11 commit 7ae3031
Show file tree
Hide file tree
Showing 6 changed files with 204 additions and 7 deletions.
3 changes: 2 additions & 1 deletion apisix/core/pubsub.lua
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,10 @@ function _M.wait(self)
end

-- recovery of stored pb_store
pb.state(pb_state)
local pb_old_state = pb.state(pb_state)

local data, err = pb.decode("PubSubReq", raw_data)
pb.state(pb_old_state)
if not data then
log.error("pubsub server receives undecodable data, err: ", err)
send_error(ws, 0, "wrong command")
Expand Down
4 changes: 2 additions & 2 deletions apisix/plugins/grpc-transcode/proto.lua
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ end

local function compile_proto(content)
-- clear pb state
pb.state(nil)
local old_pb_state = pb.state(nil)

local compiled, err = compile_proto_text(content)
if not compiled then
Expand All @@ -110,7 +110,7 @@ local function compile_proto(content)
end

-- fetch pb state
compiled.pb_state = pb.state(nil)
compiled.pb_state = pb.state(old_pb_state)
return compiled
end

Expand Down
2 changes: 2 additions & 0 deletions apisix/plugins/grpc-transcode/request.lua
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,12 @@ return function (proto, service, method, pb_option, deadline, default_values)

req_read_body()

local pb_old_state = pb.state(proto.pb_state)
util.set_options(proto, pb_option)

local map_message = util.map_message(m.input_type, default_values or {})
local ok, encoded = pcall(pb.encode, m.input_type, map_message)
pb.state(pb_old_state)

if not ok or not encoded then
return false, "failed to encode request data to protobuf", 400
Expand Down
8 changes: 6 additions & 2 deletions apisix/plugins/grpc-transcode/response.lua
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ local ipairs = ipairs
local pcall = pcall


local function handle_error_response(status_detail_type)
local function handle_error_response(status_detail_type, proto)
local err_msg

local grpc_status = ngx.header["grpc-status-details-bin"]
Expand Down Expand Up @@ -58,7 +58,9 @@ local function handle_error_response(status_detail_type)
if status_detail_type and details then
local decoded_details = {}
for _, detail in ipairs(details) do
local pb_old_state = pb.state(proto.pb_state)
local ok, err_or_value = pcall(pb.decode, status_detail_type, detail.value)
pb.state(pb_old_state)
if not ok then
err_msg = "failed to call pb.decode to decode details in "
.. "grpc-status-details-bin"
Expand Down Expand Up @@ -99,7 +101,7 @@ return function(ctx, proto, service, method, pb_option, show_status_in_body, sta

-- handle error response after the last response chunk
if ngx.status >= 300 and show_status_in_body then
return handle_error_response(status_detail_type)
return handle_error_response(status_detail_type, proto)
end

-- when body has already been read by other plugin
Expand All @@ -118,10 +120,12 @@ return function(ctx, proto, service, method, pb_option, show_status_in_body, sta
buffer = string.sub(buffer, 6)
end

local pb_old_state = pb.state(proto.pb_state)
util.set_options(proto, pb_option)

local err_msg
local decoded = pb.decode(m.output_type, buffer)
pb.state(pb_old_state)
if not decoded then
err_msg = "failed to decode response data by protobuf"
ngx.arg[1] = err_msg
Expand Down
2 changes: 0 additions & 2 deletions apisix/plugins/grpc-transcode/util.lua
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ function _M.find_method(proto, service, method)
return nil
end

-- restore pb state
pb.state(proto.pb_state)
return res
end

Expand Down
192 changes: 192 additions & 0 deletions t/plugin/opentelemetry-bugfix-pb-state.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

use t::APISIX 'no_plan';

add_block_preprocessor(sub {
my ($block) = @_;

if (!$block->extra_yaml_config) {
my $extra_yaml_config = <<_EOC_;
plugins:
- example-plugin
- key-auth
- opentelemetry
plugin_attr:
opentelemetry:
batch_span_processor:
max_export_batch_size: 1
inactive_timeout: 0.5
_EOC_
$block->set_value("extra_yaml_config", $extra_yaml_config);
}


if (!$block->extra_init_by_lua) {
my $extra_init_by_lua = <<_EOC_;
-- mock exporter http client
local client = require("opentelemetry.trace.exporter.http_client")
client.do_request = function()
ngx.log(ngx.INFO, "opentelemetry export span")
return "ok"
end
local ctx_new = require("opentelemetry.context").new
require("opentelemetry.context").new = function (...)
local ctx = ctx_new(...)
local current = ctx.current
ctx.current = function (...)
ngx.log(ngx.INFO, "opentelemetry context current")
return current(...)
end
return ctx
end
_EOC_

$block->set_value("extra_init_by_lua", $extra_init_by_lua);
}

if (!$block->request) {
$block->set_value("request", "GET /t");
}

$block;
});

run_tests;

__DATA__
=== TEST 1: set additional_attributes with match
--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test
local code, body = t('/apisix/admin/routes/1',
ngx.HTTP_PUT,
[[{
"name": "route_name",
"plugins": {
"opentelemetry": {
"sampler": {
"name": "always_on"
},
"additional_header_prefix_attributes": [
"x-my-header-*"
]
}
},
"upstream": {
"nodes": {
"127.0.0.1:1980": 1
},
"type": "roundrobin"
},
"uri": "/hello"
}]]
)
if code >= 300 then
ngx.status = code
end
ngx.say(body)
}
}
--- response_body
passed
=== TEST 2: opentelemetry expands headers
--- extra_init_by_lua
local otlp = require("opentelemetry.trace.exporter.otlp")
local orig_export_spans = otlp.export_spans
otlp.export_spans = function(self, spans)
if (#spans ~= 1) then
ngx.log(ngx.ERR, "unexpected spans length: ", #spans)
return
end
local attributes_names = {}
local attributes = {}
local span = spans[1]
for _, attribute in ipairs(span.attributes) do
if attribute.key == "hostname" then
-- remove any randomness
goto skip
end
table.insert(attributes_names, attribute.key)
attributes[attribute.key] = attribute.value.string_value or ""
::skip::
end
table.sort(attributes_names)
for _, attribute in ipairs(attributes_names) do
ngx.log(ngx.INFO, "attribute " .. attribute .. ": \"" .. attributes[attribute] .. "\"")
end
ngx.log(ngx.INFO, "opentelemetry export span")
return orig_export_spans(self, spans)
end
--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test
local code, body = t('/apisix/admin/protos/1',
ngx.HTTP_PUT,
[[{
"content" : "syntax = \"proto3\";
package helloworld;
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
message HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
}"
}]]
)
if code >= 300 then
ngx.status = code
end
local http = require "resty.http"
local httpc = http.new()
local uri1 = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello"
local headers = {
["x-my-header-name"] = "william",
["x-my-header-nick"] = "bill",
}
local res, err = httpc:request_uri(uri1, {method = "GET", headers = headers})
if not res then
ngx.say(err)
return
end
ngx.status = res.status
}
}
--- wait: 1
--- error_code: 200
--- no_error_log
type 'opentelemetry.proto.trace.v1.TracesData' does not exists
--- grep_error_log eval
qr/attribute .+?:.[^,]*/
--- grep_error_log_out
attribute route: "route_name"
attribute service: ""
attribute x-my-header-name: "william"
attribute x-my-header-nick: "bill"

0 comments on commit 7ae3031

Please sign in to comment.