diff --git a/apisix/schema_def.lua b/apisix/schema_def.lua index 96cc11eaa715..fbeef89bb17c 100644 --- a/apisix/schema_def.lua +++ b/apisix/schema_def.lua @@ -807,6 +807,28 @@ local xrpc_protocol_schema = { description = "protocol-specific configuration", type = "object", }, + logger = { + type = "array", + items = { + properties = { + name = { + type = "string", + }, + filter = { + description = "logger filter rules", + type = "array", + }, + conf = { + description = "logger plugin configuration", + type = "object", + }, + }, + dependencies = { + name = {"conf"}, + }, + }, + }, + }, required = {"name"} } diff --git a/apisix/stream/xrpc/runner.lua b/apisix/stream/xrpc/runner.lua index ea807f60a181..22cb62d03d02 100644 --- a/apisix/stream/xrpc/runner.lua +++ b/apisix/stream/xrpc/runner.lua @@ -14,14 +14,22 @@ -- See the License for the specific language governing permissions and -- limitations under the License. -- +local require = require local core = require("apisix.core") +local expr = require("resty.expr.v1") local pairs = pairs local ngx = ngx local ngx_now = ngx.now local OK = ngx.OK local DECLINED = ngx.DECLINED local DONE = ngx.DONE +local pcall = pcall +local ipairs = ipairs +local tostring = tostring +local logger_expr_cache = core.lrucache.new({ + ttl = 300, count = 1024 +}) local _M = {} @@ -71,9 +79,56 @@ local function put_req_ctx(session, ctx) end +local function filter_logger(ctx, logger) + if not logger then + return false + end + + if not logger.filter or #logger.filter == 0 then + -- no valid filter, default execution plugin + return true + end + + local version = tostring(logger.filter) + local filter_expr, err = logger_expr_cache(ctx.conf_id, version, expr.new, logger.filter) + if not filter_expr or err then + core.log.error("failed to validate the 'filter' expression: ", err) + return false + end + return filter_expr:eval(ctx) +end + + +local function run_log_plugin(ctx, logger) + local pkg_name = "apisix.stream.plugins." .. logger.name + local ok, plugin = pcall(require, pkg_name) + if not ok then + core.log.error("failed to load plugin [", logger.name, "] err: ", plugin) + return + end + + local log_func = plugin.log + if log_func then + log_func(logger.conf, ctx) + end +end + + local function finish_req(protocol, session, ctx) ctx._rpc_end_time = ngx_now() + local loggers = session.route.protocol.logger + if loggers and #loggers > 0 then + for _, logger in ipairs(loggers) do + ctx.conf_id = tostring(logger.conf) + local matched = filter_logger(ctx, logger) + core.log.info("log filter: ", logger.name, " filter result: ", matched) + if matched then + run_log_plugin(ctx, logger) + end + end + end + protocol.log(session, ctx) put_req_ctx(session, ctx) end diff --git a/apisix/stream/xrpc/sdk.lua b/apisix/stream/xrpc/sdk.lua index 3e3f4557ad03..eb70a4018420 100644 --- a/apisix/stream/xrpc/sdk.lua +++ b/apisix/stream/xrpc/sdk.lua @@ -108,6 +108,9 @@ function _M.get_req_ctx(session, id) local ctx = core.tablepool.fetch("xrpc_ctxs", 4, 4) -- fields start with '_' should not be accessed by the protocol implementation ctx._id = id + core.ctx.set_vars_meta(ctx) + ctx.conf_type = "xrpc-" .. session.route.protocol.name .. "-logger" + session._ctxs[id] = ctx ctx._rpc_start_time = ngx_now() diff --git a/t/xds-library/config_xds_2.t b/t/xds-library/config_xds_2.t index 85f9e0de3e75..67629d4bcbed 100644 --- a/t/xds-library/config_xds_2.t +++ b/t/xds-library/config_xds_2.t @@ -205,8 +205,6 @@ decode the conf of [/routes/3] failed, err: Expected object key string but found } } local data_str = core.json.encode(data) - ngx.log(ngx.WARN, "data_str : ", require("inspect")(data_str)) - ngx.shared["xds-config"]:set("/routes/3", data_str) ngx.update_time() ngx.shared["xds-config-version"]:set("version", ngx.now()) diff --git a/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua b/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua index 1487044ff5af..212cd6302bef 100644 --- a/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua +++ b/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua @@ -160,6 +160,7 @@ function _M.from_downstream(session, downstream) if typ == TYPE_UNARY_DYN_UP then ctx.len = ctx.len + 4 end + return OK, ctx end diff --git a/t/xrpc/pingpong2.t b/t/xrpc/pingpong2.t index 3c7152c5b6c0..93c57bd27576 100644 --- a/t/xrpc/pingpong2.t +++ b/t/xrpc/pingpong2.t @@ -86,6 +86,18 @@ _EOC_ $block->set_value("no_error_log", "[error]\nRPC is not finished"); } + if (!defined $block->extra_stream_config) { + my $stream_config = <<_EOC_; + server { + listen 8125 udp; + content_by_lua_block { + require("lib.mock_layer4").dogstatsd() + } + } +_EOC_ + $block->set_value("extra_stream_config", $stream_config); + } + $block; }); @@ -139,3 +151,604 @@ lua tcp socket send timeout: 60000 stream lua tcp socket read timeout: 60000 --- log_level: debug --- stream_conf_enable + + + +=== TEST 3: bad loggger filter +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/stream_routes/1', + ngx.HTTP_PUT, + { + protocol = { + name = "pingpong", + logger = { + { + name = "syslog", + filter = { + {} + }, + conf = {} + } + } + }, + upstream = { + nodes = { + ["127.0.0.1:1995"] = 1 + }, + type = "roundrobin" + } + } + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed + + + +=== TEST 4: failed to validate the 'filter' expression +--- request eval +"POST /t +" . +"pp\x02\x00\x00\x00\x00\x00\x00\x03ABC" +--- stream_conf_enable +--- error_log +failed to validate the 'filter' expression: rule too short + + + +=== TEST 5: set loggger filter(single rule) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/stream_routes/1', + ngx.HTTP_PUT, + { + protocol = { + name = "pingpong", + logger = { + { + name = "syslog", + filter = { + {"len", ">", 10} + }, + conf = {} + } + } + }, + upstream = { + nodes = { + ["127.0.0.1:1995"] = 1 + }, + type = "roundrobin" + } + } + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed + + + +=== TEST 6: log filter matched successful +--- request eval +"POST /t +" . +"pp\x02\x00\x00\x00\x00\x00\x00\x03ABC" +--- stream_conf_enable +--- error_log +log filter: syslog filter result: true + + + +=== TEST 7: update loggger filter +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/stream_routes/1', + ngx.HTTP_PUT, + { + protocol = { + name = "pingpong", + logger = { + { + name = "syslog", + filter = { + {"len", "<", 10} + }, + conf = {} + } + } + }, + upstream = { + nodes = { + ["127.0.0.1:1995"] = 1 + }, + type = "roundrobin" + } + } + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed + + + +=== TEST 8: failed to match log filter +--- request eval +"POST /t +" . +"pp\x02\x00\x00\x00\x00\x00\x00\x03ABC" +--- stream_conf_enable +--- error_log +log filter: syslog filter result: false + + + +=== TEST 9: set loggger filter(multiple rules) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/stream_routes/1', + ngx.HTTP_PUT, + { + protocol = { + name = "pingpong", + logger = { + { + name = "syslog", + filter = { + {"len", ">", 12}, + {"len", "<", 14} + }, + conf = {} + } + } + }, + upstream = { + nodes = { + ["127.0.0.1:1995"] = 1 + }, + type = "roundrobin" + } + } + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed + + + +=== TEST 10: log filter matched successful +--- request eval +"POST /t +" . +"pp\x02\x00\x00\x00\x00\x00\x00\x03ABC" +--- stream_conf_enable +--- error_log +log filter: syslog filter result: true + + + +=== TEST 11: update loggger filter +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/stream_routes/1', + ngx.HTTP_PUT, + { + protocol = { + name = "pingpong", + logger = { + { + name = "syslog", + filter = { + {"len", "<", 10}, + {"len", ">", 12} + }, + conf = {} + } + } + }, + upstream = { + nodes = { + ["127.0.0.1:1995"] = 1 + }, + type = "roundrobin" + } + } + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed + + + +=== TEST 12: failed to match log filter +--- request eval +"POST /t +" . +"pp\x02\x00\x00\x00\x00\x00\x00\x03ABC" +--- stream_conf_enable +--- error_log +log filter: syslog filter result: false + + + +=== TEST 13: set custom log format +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/plugin_metadata/syslog', + ngx.HTTP_PUT, + [[{ + "log_format": { + "client_ip": "$remote_addr" + } + }]] + ) + + if code >= 300 then + ngx.status = code + ngx.say(body) + return + end + + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed + + + +=== TEST 14: no loggger filter, defaulte executed logger plugin +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/stream_routes/1', + ngx.HTTP_PUT, + { + protocol = { + name = "pingpong", + logger = { + { + name = "syslog", + conf = { + host = "127.0.0.1", + port = 8125, + sock_type = "udp", + batch_max_size = 1, + flush_limit = 1 + } + } + } + }, + upstream = { + nodes = { + ["127.0.0.1:1995"] = 1 + }, + type = "roundrobin" + } + } + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed + + + +=== TEST 15: verify the data received by the log server +--- request eval +"POST /t +" . +"pp\x02\x00\x00\x00\x00\x00\x00\x03ABC" +--- stream_conf_enable +--- wait: 0.5 +--- error_log eval +qr/message received:.*\"client_ip\\"\:\\"127.0.0.1\\"/ + + + +=== TEST 16: set loggger filter +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/stream_routes/1', + ngx.HTTP_PUT, + { + protocol = { + name = "pingpong", + logger = { + { + name = "syslog", + filter = { + {"len", ">", 10} + }, + conf = { + host = "127.0.0.1", + port = 8125, + sock_type = "udp", + batch_max_size = 1, + flush_limit = 1 + } + } + } + }, + upstream = { + nodes = { + ["127.0.0.1:1995"] = 1 + }, + type = "roundrobin" + } + } + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed + + + +=== TEST 17: verify the data received by the log server +--- request eval +"POST /t +" . +"pp\x02\x00\x00\x00\x00\x00\x00\x03ABC" +--- stream_conf_enable +--- wait: 0.5 +--- error_log eval +qr/message received:.*\"client_ip\\"\:\\"127.0.0.1\\"/ + + + +=== TEST 18: small flush_limit, instant flush +--- stream_conf_enable +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/stream_routes/1', + ngx.HTTP_PUT, + { + protocol = { + name = "pingpong", + logger = { + { + name = "syslog", + filter = { + {"len", ">", 10} + }, + conf = { + host = "127.0.0.1", + port = 5044, + batch_max_size = 1, + flush_limit = 1 + } + } + } + }, + upstream = { + nodes = { + ["127.0.0.1:1995"] = 1 + }, + type = "roundrobin" + } + } + ) + if code >= 300 then + ngx.status = code + end + + -- wait etcd sync + ngx.sleep(0.5) + + local sock = ngx.socket.tcp() + sock:settimeout(1000) + local ok, err = sock:connect("127.0.0.1", 1985) + if not ok then + ngx.log(ngx.ERR, "failed to connect: ", err) + return ngx.exit(503) + end + + assert(sock:send("pp\x02\x00\x00\x00\x00\x00\x00\x03ABC")) + + while true do + local data, err = sock:receiveany(4096) + if not data then + sock:close() + break + end + ngx.print(data) + end + -- wait flush log + ngx.sleep(2.5) + } + } +--- request +GET /t +--- response_body eval +"pp\x02\x00\x00\x00\x00\x00\x00\x03ABC" +--- timeout: 5 +--- error_log +try to lock with key xrpc-pingpong-logger#table +unlock with key xrpc-pingpong-logger#table + + + +=== TEST 19: check plugin configuration updating +--- stream_conf_enable +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/stream_routes/1', + ngx.HTTP_PUT, + { + protocol = { + name = "pingpong", + logger = { + { + name = "syslog", + filter = { + {"len", ">", 10} + }, + conf = { + host = "127.0.0.1", + port = 5044, + batch_max_size = 1 + } + } + } + }, + upstream = { + nodes = { + ["127.0.0.1:1995"] = 1 + }, + type = "roundrobin" + } + } + ) + if code >= 300 then + ngx.status = code + ngx.say("fail") + return + end + local sock = ngx.socket.tcp() + local ok, err = sock:connect("127.0.0.1", 1985) + if not ok then + ngx.status = code + ngx.say("fail") + return + end + assert(sock:send("pp\x02\x00\x00\x00\x00\x00\x00\x03ABC")) + local body1, err + while true do + body1, err = sock:receiveany(4096) + if not data then + sock:close() + break + end + end + local code, body = t('/apisix/admin/stream_routes/1', + ngx.HTTP_PUT, + { + protocol = { + name = "pingpong", + logger = { + { + name = "syslog", + filter = { + {"len", ">", 10} + }, + conf = { + host = "127.0.0.1", + port = 5045, + batch_max_size = 1 + } + } + } + }, + upstream = { + nodes = { + ["127.0.0.1:1995"] = 1 + }, + type = "roundrobin" + } + } + ) + if code >= 300 then + ngx.status = code + ngx.say("fail") + return + end + local sock = ngx.socket.tcp() + local ok, err = sock:connect("127.0.0.1", 1985) + if not ok then + ngx.status = code + ngx.say("fail") + return + end + assert(sock:send("pp\x02\x00\x00\x00\x00\x00\x00\x03ABC")) + local body2, err + while true do + body2, err = sock:receiveany(4096) + if not data then + sock:close() + break + end + end + ngx.print(body1) + ngx.print(body2) + } + } +--- request +GET /t +--- wait: 0.5 +--- response_body eval +"pp\x02\x00\x00\x00\x00\x00\x00\x03ABC" . +"pp\x02\x00\x00\x00\x00\x00\x00\x03ABC" +--- grep_error_log eval +qr/sending a batch logs to 127.0.0.1:(\d+)/ +--- grep_error_log_out +sending a batch logs to 127.0.0.1:5044 +sending a batch logs to 127.0.0.1:5045