diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index c90a8e90082d..9b0b4b5da8a9 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -136,7 +136,7 @@ jobs: [[ ${{ steps.test_env.outputs.type }} != first ]] && sudo ./ci/init-${{ steps.test_env.outputs.type }}-test-service.sh after echo "Linux launch services, done." - name: Start Dubbo Backend - if: matrix.os_name == 'linux_openresty' && steps.test_env.outputs.type == 'plugin' + if: matrix.os_name == 'linux_openresty' && (steps.test_env.outputs.type == 'plugin' || steps.test_env.outputs.type == 'last') run: | sudo apt install -y maven cd t/lib/dubbo-backend diff --git a/Makefile b/Makefile index 52dd4b8264fa..d01cb6f6608e 100644 --- a/Makefile +++ b/Makefile @@ -382,6 +382,9 @@ install: runtime $(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/stream/xrpc/protocols/redis $(ENV_INSTALL) apisix/stream/xrpc/protocols/redis/*.lua $(ENV_INST_LUADIR)/apisix/stream/xrpc/protocols/redis/ + $(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/stream/xrpc/protocols/dubbo + $(ENV_INSTALL) apisix/stream/xrpc/protocols/dubbo/*.lua $(ENV_INST_LUADIR)/apisix/stream/xrpc/protocols/dubbo/ + $(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/utils $(ENV_INSTALL) apisix/utils/*.lua $(ENV_INST_LUADIR)/apisix/utils/ diff --git a/apisix/stream/xrpc/protocols/dubbo/init.lua b/apisix/stream/xrpc/protocols/dubbo/init.lua new file mode 100644 index 000000000000..19160d6c544e --- /dev/null +++ b/apisix/stream/xrpc/protocols/dubbo/init.lua @@ -0,0 +1,231 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +local core = require("apisix.core") +local sdk = require("apisix.stream.xrpc.sdk") +local xrpc_socket = require("resty.apisix.stream.xrpc.socket") +local math_random = math.random +local ngx = ngx +local OK = ngx.OK +local str_format = string.format +local DECLINED = ngx.DECLINED +local DONE = ngx.DONE +local bit = require("bit") +local ffi = require("ffi") +local ffi_str = ffi.string + + +-- dubbo protocol spec: https://cn.dubbo.apache.org/zh-cn/overview/reference/protocols/tcp/ +local header_len = 16 +local _M = {} + + +function _M.init_downstream(session) + session.req_id_seq = 0 + session.resp_id_seq = 0 + session.cmd_labels = { session.route.id, "" } + return xrpc_socket.downstream.socket() +end + + +local function parse_dubbo_header(header) + for i = 1, header_len do + local currentByte = header:byte(i) + if not currentByte then + return nil + end + end + + local magic_number = str_format("%04x", header:byte(1) * 256 + header:byte(2)) + local message_flag = header:byte(3) + local status = header:byte(4) + local request_id = 0 + for i = 5, 12 do + request_id = request_id * 256 + header:byte(i) + end + + local byte13Val = header:byte(13) * 256 * 256 * 256 + local byte14Val = header:byte(14) * 256 * 256 + local data_length = byte13Val + byte14Val + header:byte(15) * 256 + header:byte(16) + + local is_request = bit.band(bit.rshift(message_flag, 7), 0x01) == 1 and 1 or 0 + local is_two_way = bit.band(bit.rshift(message_flag, 6), 0x01) == 1 and 1 or 0 + local is_event = bit.band(bit.rshift(message_flag, 5), 0x01) == 1 and 1 or 0 + + return { + magic_number = magic_number, + message_flag = message_flag, + is_request = is_request, + is_two_way = is_two_way, + is_event = is_event, + status = status, + request_id = request_id, + data_length = data_length + } +end + + +local function read_data(sk, is_req) + local header_data, err = sk:read(header_len) + if not header_data then + return nil, err, false + end + + local header_str = ffi_str(header_data, header_len) + local header_info = parse_dubbo_header(header_str) + if not header_info then + return nil, "header insufficient", false + end + + local is_valid_magic_number = header_info.magic_number == "dabb" + if not is_valid_magic_number then + return nil, str_format("unknown magic number: \"%s\"", header_info.magic_number), false + end + + local body_data, err = sk:read(header_info.data_length) + if not body_data then + core.log.error("failed to read dubbo request body") + return nil, err, false + end + + local ctx = ngx.ctx + ctx.dubbo_serialization_id = bit.band(header_info.message_flag, 0x1F) + + if is_req then + ctx.dubbo_req_body_data = body_data + else + ctx.dubbo_rsp_body_data = body_data + end + + return true, nil, false +end + + +local function read_req(sk) + return read_data(sk, true) +end + + +local function read_reply(sk) + return read_data(sk, false) +end + + +local function handle_reply(session, sk) + local ok, err = read_reply(sk) + if not ok then + return nil, err + end + + local ctx = sdk.get_req_ctx(session, 10) + + return ctx +end + + +function _M.from_downstream(session, downstream) + local read_pipeline = false + session.req_id_seq = session.req_id_seq + 1 + local ctx = sdk.get_req_ctx(session, session.req_id_seq) + session._downstream_ctx = ctx + while true do + local ok, err, pipelined = read_req(downstream) + if not ok then + if err ~= "timeout" and err ~= "closed" then + core.log.error("failed to read request: ", err) + end + + if read_pipeline and err == "timeout" then + break + end + + return DECLINED + end + + if not pipelined then + break + end + + if not read_pipeline then + read_pipeline = true + -- set minimal read timeout to read pipelined data + downstream:settimeouts(0, 0, 1) + end + end + + if read_pipeline then + -- set timeout back + downstream:settimeouts(0, 0, 0) + end + + return OK, ctx +end + + +function _M.connect_upstream(session, ctx) + local conf = session.upstream_conf + local nodes = conf.nodes + if #nodes == 0 then + core.log.error("failed to connect: no nodes") + return DECLINED + end + + local node = nodes[math_random(#nodes)] + local sk = sdk.connect_upstream(node, conf) + if not sk then + return DECLINED + end + + core.log.debug("dubbo_connect_upstream end") + + return OK, sk +end + +function _M.disconnect_upstream(session, upstream) + sdk.disconnect_upstream(upstream, session.upstream_conf) +end + + +function _M.to_upstream(session, ctx, downstream, upstream) + local ok, _ = upstream:move(downstream) + if not ok then + return DECLINED + end + + return OK +end + + +function _M.from_upstream(session, downstream, upstream) + local ctx,err = handle_reply(session, upstream) + if err then + return DECLINED + end + + local ok, _ = downstream:move(upstream) + if not ok then + return DECLINED + end + + return DONE, ctx +end + + +function _M.log(_, _) +end + + +return _M diff --git a/apisix/stream/xrpc/protocols/dubbo/schema.lua b/apisix/stream/xrpc/protocols/dubbo/schema.lua new file mode 100644 index 000000000000..3a9d73325498 --- /dev/null +++ b/apisix/stream/xrpc/protocols/dubbo/schema.lua @@ -0,0 +1,32 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +local core = require("apisix.core") + + +local schema = { + type = "object", +} + +local _M = {} + + +function _M.check_schema(conf) + return core.schema.check(schema, conf) +end + + +return _M diff --git a/t/xrpc/dubbo.t b/t/xrpc/dubbo.t new file mode 100644 index 000000000000..290eadb3c6fa --- /dev/null +++ b/t/xrpc/dubbo.t @@ -0,0 +1,168 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +use t::APISIX; + +my $nginx_binary = $ENV{'TEST_NGINX_BINARY'} || 'nginx'; +my $version = eval { `$nginx_binary -V 2>&1` }; + +if ($version !~ m/\/apisix-nginx-module/) { + plan(skip_all => "apisix-nginx-module not installed"); +} else { + plan('no_plan'); +} +add_block_preprocessor(sub { + my ($block) = @_; + + if (!$block->extra_yaml_config) { + my $extra_yaml_config = <<_EOC_; +xrpc: + protocols: + - name: dubbo +_EOC_ + $block->set_value("extra_yaml_config", $extra_yaml_config); + } + + my $config = $block->config // <<_EOC_; + location /t { + content_by_lua_block { + ngx.req.read_body() + local sock = ngx.socket.tcp() + sock:settimeout(1000) + local ok, err = sock:connect("127.0.0.1", 1985) + if not ok then + ngx.log(ngx.ERR, "failed to connect: ", err) + return ngx.exit(503) + end + + local bytes, err = sock:send(ngx.req.get_body_data()) + if not bytes then + ngx.log(ngx.ERR, "send stream request error: ", err) + return ngx.exit(503) + end + while true do + local data, err = sock:receiveany(4096) + if not data then + sock:close() + break + end + ngx.print(data) + end + } + } +_EOC_ + + $block->set_value("config", $config); + + if ((!defined $block->error_log) && (!defined $block->no_error_log)) { + $block->set_value("no_error_log", "[error]\nRPC is not finished"); + } + + $block; +}); + +worker_connections(1024); +run_tests; + +__DATA__ + +=== TEST 1: init +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/stream_routes/1', + ngx.HTTP_PUT, + { + protocol = { + name = "dubbo" + }, + upstream = { + nodes = { + ["127.0.0.1:20880"] = 1 + }, + type = "roundrobin" + } + } + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed + + + +=== TEST 2: use dubbo_backend_provider server. request=org.apache.dubbo.backend.DemoService,service_version:1.0.1#hello,response=dubbo success & 200 +--- request eval +"GET /t +\xda\xbb\xc2\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xef\x05\x32\x2e\x30\x2e\x32\x30\x24\x6f\x72\x67\x2e\x61\x70\x61\x63\x68\x65\x2e\x64\x75\x62\x62\x6f\x2e\x62\x61\x63\x6b\x65\x6e\x64\x2e\x44\x65\x6d\x6f\x53\x65\x72\x76\x69\x63\x65\x05\x31\x2e\x30\x2e\x30\x05\x68\x65\x6c\x6c\x6f\x0f\x4c\x6a\x61\x76\x61\x2f\x75\x74\x69\x6c\x2f\x4d\x61\x70\x3b\x48\x04\x6e\x61\x6d\x65\x08\x7a\x68\x61\x6e\x67\x73\x61\x6e\x5a\x48\x04\x70\x61\x74\x68\x30\x24\x6f\x72\x67\x2e\x61\x70\x61\x63\x68\x65\x2e\x64\x75\x62\x62\x6f\x2e\x62\x61\x63\x6b\x65\x6e\x64\x2e\x44\x65\x6d\x6f\x53\x65\x72\x76\x69\x63\x65\x12\x72\x65\x6d\x6f\x74\x65\x2e\x61\x70\x70\x6c\x69\x63\x61\x74\x69\x6f\x6e\x0b\x73\x70\x2d\x63\x6f\x6e\x73\x75\x6d\x65\x72\x09\x69\x6e\x74\x65\x72\x66\x61\x63\x65\x30\x24\x6f\x72\x67\x2e\x61\x70\x61\x63\x68\x65\x2e\x64\x75\x62\x62\x6f\x2e\x62\x61\x63\x6b\x65\x6e\x64\x2e\x44\x65\x6d\x6f\x53\x65\x72\x76\x69\x63\x65\x07\x76\x65\x72\x73\x69\x6f\x6e\x05\x31\x2e\x30\x2e\x30\x07\x74\x69\x6d\x65\x6f\x75\x74\x04\x31\x30\x30\x30\x5a" +--- response_body eval +"\xda\xbb\x02\x14\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x30\x94\x48\x04\x62\x6f\x64\x79\x0e\x64\x75\x62\x62\x6f\x20\x73\x75\x63\x63\x65\x73\x73\x0a\x06\x73\x74\x61\x74\x75\x73\x03\x32\x30\x30\x5a\x48\x05\x64\x75\x62\x62\x6f\x05\x32\x2e\x30\x2e\x32\x5a" +--- stream_conf_enable +--- log_level: debug +--- no_error_log + + + +=== TEST 3: heart beat. request=\xe2|11..,response=\x22|00... +--- request eval +"GET /t +\xda\xbb\xe2\x00\x00\x00\x00\x00\x00\x00\x00\x34\x00\x00\x00\x01\x4e" +--- response_body eval +"\xda\xbb\x22\x14\x00\x00\x00\x00\x00\x00\x00\x34\x00\x00\x00\x01\x4e" +--- stream_conf_enable +--- log_level: debug +--- no_error_log + + + +=== TEST 4: no response. Different from test2 \x82=10000010, the second bit=0 of the third byte means no need to return +--- request eval +"GET /t +\xda\xbb\x82\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xef\x05\x32\x2e\x30\x2e\x32\x30\x24\x6f\x72\x67\x2e\x61\x70\x61\x63\x68\x65\x2e\x64\x75\x62\x62\x6f\x2e\x62\x61\x63\x6b\x65\x6e\x64\x2e\x44\x65\x6d\x6f\x53\x65\x72\x76\x69\x63\x65\x05\x31\x2e\x30\x2e\x30\x05\x68\x65\x6c\x6c\x6f\x0f\x4c\x6a\x61\x76\x61\x2f\x75\x74\x69\x6c\x2f\x4d\x61\x70\x3b\x48\x04\x6e\x61\x6d\x65\x08\x7a\x68\x61\x6e\x67\x73\x61\x6e\x5a\x48\x04\x70\x61\x74\x68\x30\x24\x6f\x72\x67\x2e\x61\x70\x61\x63\x68\x65\x2e\x64\x75\x62\x62\x6f\x2e\x62\x61\x63\x6b\x65\x6e\x64\x2e\x44\x65\x6d\x6f\x53\x65\x72\x76\x69\x63\x65\x12\x72\x65\x6d\x6f\x74\x65\x2e\x61\x70\x70\x6c\x69\x63\x61\x74\x69\x6f\x6e\x0b\x73\x70\x2d\x63\x6f\x6e\x73\x75\x6d\x65\x72\x09\x69\x6e\x74\x65\x72\x66\x61\x63\x65\x30\x24\x6f\x72\x67\x2e\x61\x70\x61\x63\x68\x65\x2e\x64\x75\x62\x62\x6f\x2e\x62\x61\x63\x6b\x65\x6e\x64\x2e\x44\x65\x6d\x6f\x53\x65\x72\x76\x69\x63\x65\x07\x76\x65\x72\x73\x69\x6f\x6e\x05\x31\x2e\x30\x2e\x30\x07\x74\x69\x6d\x65\x6f\x75\x74\x04\x31\x30\x30\x30\x5a" +--- response_body eval +"" +--- stream_conf_enable +--- log_level: debug +--- no_error_log + + + +=== TEST 5: failed response. request=org.apache.dubbo.backend.DemoService,service_version:1.0.1#fail,response=503 +--- request eval +"GET /t +\xda\xbb\xc2\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xee\x05\x32\x2e\x30\x2e\x32\x30\x24\x6f\x72\x67\x2e\x61\x70\x61\x63\x68\x65\x2e\x64\x75\x62\x62\x6f\x2e\x62\x61\x63\x6b\x65\x6e\x64\x2e\x44\x65\x6d\x6f\x53\x65\x72\x76\x69\x63\x65\x05\x31\x2e\x30\x2e\x30\x04\x66\x61\x69\x6c\x0f\x4c\x6a\x61\x76\x61\x2f\x75\x74\x69\x6c\x2f\x4d\x61\x70\x3b\x48\x04\x6e\x61\x6d\x65\x08\x7a\x68\x61\x6e\x67\x73\x61\x6e\x5a\x48\x04\x70\x61\x74\x68\x30\x24\x6f\x72\x67\x2e\x61\x70\x61\x63\x68\x65\x2e\x64\x75\x62\x62\x6f\x2e\x62\x61\x63\x6b\x65\x6e\x64\x2e\x44\x65\x6d\x6f\x53\x65\x72\x76\x69\x63\x65\x12\x72\x65\x6d\x6f\x74\x65\x2e\x61\x70\x70\x6c\x69\x63\x61\x74\x69\x6f\x6e\x0b\x73\x70\x2d\x63\x6f\x6e\x73\x75\x6d\x65\x72\x09\x69\x6e\x74\x65\x72\x66\x61\x63\x65\x30\x24\x6f\x72\x67\x2e\x61\x70\x61\x63\x68\x65\x2e\x64\x75\x62\x62\x6f\x2e\x62\x61\x63\x6b\x65\x6e\x64\x2e\x44\x65\x6d\x6f\x53\x65\x72\x76\x69\x63\x65\x07\x76\x65\x72\x73\x69\x6f\x6e\x05\x31\x2e\x30\x2e\x30\x07\x74\x69\x6d\x65\x6f\x75\x74\x04\x31\x30\x30\x30\x5a" +--- response_body eval +"\xda\xbb\x02\x14\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x2d\x94\x48\x04\x62\x6f\x64\x79\x0b\x64\x75\x62\x62\x6f\x20\x66\x61\x69\x6c\x0a\x06\x73\x74\x61\x74\x75\x73\x03\x35\x30\x33\x5a\x48\x05\x64\x75\x62\x62\x6f\x05\x32\x2e\x30\x2e\x32\x5a" +--- stream_conf_enable +--- log_level: debug +--- no_error_log + + + +=== TEST 6: invalid magic(dabc<>dabb) for heart beat. +--- request eval +"GET /t +\xda\xbc\xe2\x00\x00\x00\x00\x00\x00\x00\x00\x34\x00\x00\x00\x01\x4e" +--- error_log +unknown magic number +--- stream_conf_enable