diff --git a/apisix/schema_def.lua b/apisix/schema_def.lua index f7b117af93da..280680729d50 100644 --- a/apisix/schema_def.lua +++ b/apisix/schema_def.lua @@ -508,7 +508,7 @@ local upstream_schema = { _M.upstream_hash_vars_schema = { type = "string", pattern = [[^((uri|server_name|server_addr|request_uri|remote_port]] - .. [[|remote_addr|query_string|host|hostname)]] + .. [[|remote_addr|query_string|host|hostname|mqtt_client_id)]] .. [[|arg_[0-9a-zA-z_-]+)$]], } diff --git a/t/admin/upstream3.t b/t/admin/upstream3.t index e40e24e99b4a..335bbfae36db 100644 --- a/t/admin/upstream3.t +++ b/t/admin/upstream3.t @@ -442,7 +442,7 @@ passed } --- error_code: 400 --- response_body -{"error_msg":"invalid configuration: failed to match pattern \"^((uri|server_name|server_addr|request_uri|remote_port|remote_addr|query_string|host|hostname)|arg_[0-9a-zA-z_-]+)$\" with \"not_support\""} +{"error_msg":"invalid configuration: failed to match pattern \"^((uri|server_name|server_addr|request_uri|remote_port|remote_addr|query_string|host|hostname|mqtt_client_id)|arg_[0-9a-zA-z_-]+)$\" with \"not_support\""} @@ -521,7 +521,7 @@ passed } --- error_code: 400 --- response_body -{"error_msg":"invalid configuration: failed to match pattern \"^((uri|server_name|server_addr|request_uri|remote_port|remote_addr|query_string|host|hostname)|arg_[0-9a-zA-z_-]+)$\" with \"not_support\""} +{"error_msg":"invalid configuration: failed to match pattern \"^((uri|server_name|server_addr|request_uri|remote_port|remote_addr|query_string|host|hostname|mqtt_client_id)|arg_[0-9a-zA-z_-]+)$\" with \"not_support\""} diff --git a/t/stream-plugin/mqtt-proxy2.t b/t/stream-plugin/mqtt-proxy2.t index 8c797a353d77..35211878e330 100644 --- a/t/stream-plugin/mqtt-proxy2.t +++ b/t/stream-plugin/mqtt-proxy2.t @@ -75,3 +75,110 @@ passed --- error_log failed to parse domain: loc, error: --- timeout: 10 + + + +=== TEST 3: set upstream(id: 1) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/upstreams/1', + ngx.HTTP_PUT, + [[{ + "type": "chash", + "key": "mqtt_client_id", + "nodes": [ + { + "host": "0.0.0.0", + "port": 1995, + "weight": 1 + }, + { + "host": "127.0.0.1", + "port": 1995, + "weight": 1 + } + ] + }]] + ) + + ngx.status = code + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed + + + +=== TEST 4: balance with mqtt_client_id +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/stream_routes/1', + ngx.HTTP_PUT, + [[{ + "remote_addr": "127.0.0.1", + "server_port": 1985, + "plugins": { + "mqtt-proxy": { + "protocol_name": "MQTT", + "protocol_level": 5 + } + }, + "upstream_id": 1 + }]] + ) + + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed + + + +=== TEST 5: hit route with empty id +--- stream_request eval +"\x10\x0d\x00\x04\x4d\x51\x54\x54\x05\x02\x00\x3c\x00\x00\x00" +--- stream_response +hello world +--- grep_error_log eval +qr/(mqtt client id: \w+|proxy request to \S+)/ +--- grep_error_log_out +proxy request to 127.0.0.1:1995 + + + +=== TEST 6: hit route with different client id, part 1 +--- stream_request eval +"\x10\x0e\x00\x04\x4d\x51\x54\x54\x05\x02\x00\x3c\x00\x00\x01\x66" +--- stream_response +hello world +--- grep_error_log eval +qr/(mqtt client id: \w+|proxy request to \S+)/ +--- grep_error_log_out +mqtt client id: f +proxy request to 0.0.0.0:1995 + + + +=== TEST 7: hit route with different client id, part 2 +--- stream_request eval +"\x10\x0e\x00\x04\x4d\x51\x54\x54\x05\x02\x00\x3c\x00\x00\x01\x67" +--- stream_response +hello world +--- grep_error_log eval +qr/(mqtt client id: \w+|proxy request to \S+)/ +--- grep_error_log_out +mqtt client id: g +proxy request to 127.0.0.1:1995