Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add dubbo protocols support for xrpc #9660

Merged
merged 35 commits into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
028e2e6
init dubbo protocols
wxbty Jun 14, 2023
21fbb30
add testcase
wxbty Jul 5, 2023
9c9861d
add testcase
wxbty Jul 5, 2023
e6852f5
remove unused ffi.string
wxbty Jul 10, 2023
b63d9f1
Update apisix/stream/xrpc/protocols/dubbo/init.lua
wxbty Jul 10, 2023
599207e
not need log
wxbty Jul 10, 2023
1b4d17b
Update apisix/stream/xrpc/protocols/dubbo/init.lua
wxbty Jul 10, 2023
aeb5242
add comment
wxbty Jul 10, 2023
5af4f14
Merge branch 'feature_dubbo_protocol' of https://github.com/wxbty/api…
wxbty Jul 10, 2023
554ed8d
add dubbo in makefile
wxbty Jul 14, 2023
440ab5d
add heartbeat testcase
wxbty Jul 14, 2023
051322a
add no respnse testcase
wxbty Jul 14, 2023
e241d73
add status 503 testcase
wxbty Jul 14, 2023
5fb8d3c
remove unuse
wxbty Jul 17, 2023
654f32c
fix code style
wxbty Jul 18, 2023
4a3fe80
Update apisix/stream/xrpc/protocols/dubbo/init.lua
wxbty Jul 18, 2023
926a39f
Update apisix/stream/xrpc/protocols/dubbo/init.lua
wxbty Jul 18, 2023
a6cfa48
remove unuse
wxbty Jul 18, 2023
a9cfd34
use local var
wxbty Jul 19, 2023
66e7f24
fix build
wxbty Jul 21, 2023
d3d8d4e
add invalid request
wxbty Jul 27, 2023
050048a
revert some
wxbty Jul 27, 2023
810f865
flush
wxbty Aug 1, 2023
ea3aff4
add xrpc in centos-ci&redhad-ci
wxbty Aug 2, 2023
ff47382
rename
wxbty Aug 3, 2023
a51825c
blank num
wxbty Aug 8, 2023
1d55b58
use var port
wxbty Aug 8, 2023
0e03ff4
use last output
wxbty Aug 8, 2023
bd8b253
Merge remote-tracking branch 'upstream/master' into feature_dubbo_pro…
wxbty Aug 8, 2023
ac07506
fix blank
wxbty Aug 8, 2023
eedf856
fix []
wxbty Aug 8, 2023
d8f6b11
fix port 1985
wxbty Sep 5, 2023
61fff04
Merge branch 'master' into feature_dubbo_protocol
wxbty Sep 5, 2023
fa90dae
code ipt
wxbty Sep 5, 2023
d58cc2b
ci: fix ci problem
monkeyDluffy6017 Sep 13, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ jobs:
[[ ${{ steps.test_env.outputs.type }} != first ]] && sudo ./ci/init-${{ steps.test_env.outputs.type }}-test-service.sh after
echo "Linux launch services, done."
- name: Start Dubbo Backend
if: matrix.os_name == 'linux_openresty' && steps.test_env.outputs.type == 'plugin'
if: matrix.os_name == 'linux_openresty' && [[steps.test_env.outputs.type == 'plugin' || steps.test_env.outputs.type == 'last']]
run: |
sudo apt install -y maven
cd t/lib/dubbo-backend
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,9 @@ install: runtime
$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/stream/xrpc/protocols/redis
$(ENV_INSTALL) apisix/stream/xrpc/protocols/redis/*.lua $(ENV_INST_LUADIR)/apisix/stream/xrpc/protocols/redis/

$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/stream/xrpc/protocols/dubbo
$(ENV_INSTALL) apisix/stream/xrpc/protocols/dubbo/*.lua $(ENV_INST_LUADIR)/apisix/stream/xrpc/protocols/dubbo/

$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/utils
$(ENV_INSTALL) apisix/utils/*.lua $(ENV_INST_LUADIR)/apisix/utils/

Expand Down
231 changes: 231 additions & 0 deletions apisix/stream/xrpc/protocols/dubbo/init.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local core = require("apisix.core")
local sdk = require("apisix.stream.xrpc.sdk")
local xrpc_socket = require("resty.apisix.stream.xrpc.socket")
local math_random = math.random
local ngx = ngx
local OK = ngx.OK
local str_format = string.format
local DECLINED = ngx.DECLINED
local DONE = ngx.DONE
local bit = require("bit")
local ffi = require("ffi")
local ffi_str = ffi.string


-- dubbo protocol spec: https://cn.dubbo.apache.org/zh-cn/overview/reference/protocols/tcp/
local header_len = 16
local _M = {}


function _M.init_downstream(session)
session.req_id_seq = 0
session.resp_id_seq = 0
session.cmd_labels = { session.route.id, "" }
return xrpc_socket.downstream.socket()
end

wxbty marked this conversation as resolved.
Show resolved Hide resolved

local function parse_dubbo_header(header)
for i = 1, header_len do
local currentByte = header:byte(i)
if not currentByte then
return nil
end
end

local magic_number = str_format("%04x", header:byte(1) * 256 + header:byte(2))
local message_flag = header:byte(3)
wxbty marked this conversation as resolved.
Show resolved Hide resolved
local status = header:byte(4)
local request_id = 0
for i = 5, 12 do
request_id = request_id * 256 + header:byte(i)
end

local byte13Val = header:byte(13) * 256 * 256 * 256
wxbty marked this conversation as resolved.
Show resolved Hide resolved
local byte14Val = header:byte(14) * 256 * 256
local data_length = byte13Val + byte14Val + header:byte(15) * 256 + header:byte(16)

local is_request = bit.band(bit.rshift(message_flag, 7), 0x01) == 1 and 1 or 0
wxbty marked this conversation as resolved.
Show resolved Hide resolved
local is_two_way = bit.band(bit.rshift(message_flag, 6), 0x01) == 1 and 1 or 0
local is_event = bit.band(bit.rshift(message_flag, 5), 0x01) == 1 and 1 or 0

return {
magic_number = magic_number,
message_flag = message_flag,
is_request = is_request,
is_two_way = is_two_way,
is_event = is_event,
status = status,
request_id = request_id,
data_length = data_length
}
end


local function read_data(sk, is_req)
local header_data, err = sk:read(header_len)
if not header_data then
core.log.error("failed to read dubbo request header: ", err)
return
end

local header_str = ffi_str(header_data, header_len)
wxbty marked this conversation as resolved.
Show resolved Hide resolved
local header_info = parse_dubbo_header(header_str)
if not header_info then
return nil, "header insufficient", false
end

local is_valid_magic_number = header_info.magic_number == "dabb"
if not is_valid_magic_number then
return nil, str_format("unknown magic number: \"%s\"", header_info.magic_number), false
end

local body_data, err = sk:read(header_info.data_length)
if not body_data then
core.log.error("failed to read dubbo request body")
return nil, err, false
end

local ctx = ngx.ctx
ctx.dubbo_serialization_id = bit.band(header_info.message_flag, 0x1F)

if is_req then
ctx.dubbo_req_body_data = body_data
else
ctx.dubbo_rsp_body_data = body_data
end

return true, nil, false
end


local function read_req(sk)
return read_data(sk, true)
end


local function read_reply(sk)
return read_data(sk, false)
end


local function handle_reply(session, sk)
local ok, err = read_reply(sk)
if not ok then
return nil, err
end

local ctx = sdk.get_req_ctx(session, 10)

return ctx
end


function _M.from_downstream(session, downstream)
local read_pipeline = false
session.req_id_seq = session.req_id_seq + 1
local ctx = sdk.get_req_ctx(session, session.req_id_seq)
session._downstream_ctx = ctx
while true do
local ok, err, pipelined = read_req(downstream)
if not ok then
if err ~= "timeout" and err ~= "closed" then
core.log.error("failed to read request: ", err)
end

if read_pipeline and err == "timeout" then
break
end

return DECLINED
end

if not pipelined then
break
end

if not read_pipeline then
read_pipeline = true
-- set minimal read timeout to read pipelined data
downstream:settimeouts(0, 0, 1)
end
end

if read_pipeline then
-- set timeout back
downstream:settimeouts(0, 0, 0)
end

return OK, ctx
end


function _M.connect_upstream(session, ctx)
local conf = session.upstream_conf
local nodes = conf.nodes
if #nodes == 0 then
core.log.error("failed to connect: no nodes")
return DECLINED
end

local node = nodes[math_random(#nodes)]
local sk = sdk.connect_upstream(node, conf)
if not sk then
return DECLINED
end

core.log.debug("dubbo_connect_upstream end")

return OK, sk
end

function _M.disconnect_upstream(session, upstream)
sdk.disconnect_upstream(upstream, session.upstream_conf)
end


function _M.to_upstream(session, ctx, downstream, upstream)
local ok, err = upstream:move(downstream)
if not ok then
core.log.error("failed to send to upstream: ", err)
return DECLINED
end

return OK
end


function _M.from_upstream(session, downstream, upstream)
local ctx = handle_reply(session, upstream)

local ok, err = downstream:move(upstream)
if not ok then
core.log.error("failed to handle upstream: ", err)
return DECLINED
end

return DONE, ctx
end


function _M.log(_, _)
end


return _M
32 changes: 32 additions & 0 deletions apisix/stream/xrpc/protocols/dubbo/schema.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local core = require("apisix.core")


local schema = {
type = "object",
}

local _M = {}


function _M.check_schema(conf)
return core.schema.check(schema, conf)
end


return _M
Loading