From 40ddca32909e8015440871fdcdb60f6ac8d2f46c Mon Sep 17 00:00:00 2001 From: biubiue Date: Wed, 24 Aug 2022 20:52:11 +0800 Subject: [PATCH 01/44] feat(kafka-logger): support sasl conf --- apisix/plugins/kafka-logger.lua | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua index cb43ae3db24b..cf27fd4196b8 100644 --- a/apisix/plugins/kafka-logger.lua +++ b/apisix/plugins/kafka-logger.lua @@ -51,6 +51,16 @@ local schema = { }, }, }, + sasl_config = { + type = "object", + description = "sasl config", + properties = { + mechanism = { type = "string", description = "mechanism" }, + password = { type = "string", description = "password" }, + user = { type = "string", description = "user" }, + strategy = { type = "string", description = "strategy" } + } + }, kafka_topic = {type = "string"}, producer_type = { type = "string", @@ -205,7 +215,8 @@ function _M.log(conf, ctx) for host, port in pairs(conf.broker_list) do local broker = { host = host, - port = port + port = port, + sasl_config = conf.sasl_config or nil } core.table.insert(broker_list, broker) end From 21be75746d2b867073e21712991519e93ade8a08 Mon Sep 17 00:00:00 2001 From: biubiue Date: Thu, 25 Aug 2022 15:01:45 +0800 Subject: [PATCH 02/44] feat(kafka-logger): doc add sasl conf --- docs/en/latest/plugins/kafka-logger.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/en/latest/plugins/kafka-logger.md b/docs/en/latest/plugins/kafka-logger.md index d492857e0282..890c537aa974 100644 --- a/docs/en/latest/plugins/kafka-logger.md +++ b/docs/en/latest/plugins/kafka-logger.md @@ -38,6 +38,11 @@ It might take some time to receive the log data. It will be automatically sent a | Name | Type | Required | Default | Valid values | Description | | ---------------------- | ------- | -------- | -------------- | --------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | | broker_list | object | True | | | List of Kafka brokers (nodes). | +| sasl_config | object | False | | | Kafka sasl conf. | +| sasl_config.mechanism | string | False | | | mechanism. | +| sasl_config.password | string | False | | | password. | +| sasl_config.user | string | False | | | user. | +| sasl_config.strategy | string | False | | | strategy. | | kafka_topic | string | True | | | Target topic to push the logs for organisation. | | producer_type | string | False | async | ["async", "sync"] | Message sending mode of the producer. | | required_acks | integer | False | 1 | [0, 1, -1] | Number of acknowledgements the leader needs to receive for the producer to consider the request complete. This controls the durability of the sent records. The attribute follows the same configuration as the Kafka `acks` attribute. See [Apache Kafka documentation](https://kafka.apache.org/documentation/#producerconfigs_acks) for more. | From fad3df6a1598382f9a7700ab9e695be60558e721 Mon Sep 17 00:00:00 2001 From: biubiue Date: Thu, 25 Aug 2022 16:03:06 +0800 Subject: [PATCH 03/44] fix(kafka-logger): doc --- docs/en/latest/plugins/kafka-logger.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/en/latest/plugins/kafka-logger.md b/docs/en/latest/plugins/kafka-logger.md index 890c537aa974..569650d23c26 100644 --- a/docs/en/latest/plugins/kafka-logger.md +++ b/docs/en/latest/plugins/kafka-logger.md @@ -39,10 +39,10 @@ It might take some time to receive the log data. It will be automatically sent a | ---------------------- | ------- | -------- | -------------- | --------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | | broker_list | object | True | | | List of Kafka brokers (nodes). | | sasl_config | object | False | | | Kafka sasl conf. | -| sasl_config.mechanism | string | False | | | mechanism. | -| sasl_config.password | string | False | | | password. | -| sasl_config.user | string | False | | | user. | -| sasl_config.strategy | string | False | | | strategy. | +| sasl_config.mechanism | string | False | | | Kafka mechanism. | +| sasl_config.password | string | False | | | Kafka password. | +| sasl_config.user | string | False | | | Kafka user. | +| sasl_config.strategy | string | False | | | Kafka strategy. | | kafka_topic | string | True | | | Target topic to push the logs for organisation. | | producer_type | string | False | async | ["async", "sync"] | Message sending mode of the producer. | | required_acks | integer | False | 1 | [0, 1, -1] | Number of acknowledgements the leader needs to receive for the producer to consider the request complete. This controls the durability of the sent records. The attribute follows the same configuration as the Kafka `acks` attribute. See [Apache Kafka documentation](https://kafka.apache.org/documentation/#producerconfigs_acks) for more. | From de82ec3ce2c5b92d5fedb1236b5d47972e6330ee Mon Sep 17 00:00:00 2001 From: biubiue Date: Sun, 28 Aug 2022 11:22:07 +0800 Subject: [PATCH 04/44] feat(kafka-logger): test case --- t/plugin/kafka-logger.t | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t index 5b894abce0bc..0dca1cc66eb4 100644 --- a/t/plugin/kafka-logger.t +++ b/t/plugin/kafka-logger.t @@ -592,3 +592,33 @@ qr/partition_id: 2/] [qr/partition_id: 1/, qr/partition_id: 0/, qr/partition_id: 2/] + + + + +=== TEST 20: sasl simple send +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua ' + local producer = require "resty.kafka.producer" + local broker_list = { + { host = "127.0.0.1", port = 9092 , + sasl_config = { mechanism="PLAIN", user="admin", password = "admin-secret" },}, + } + local message = "halo world" + local p = producer:new(broker_list) + local offset, err = p:send("test", nil, message) + if not offset then + ngx.say("send err:", err) + return + end + ngx.say("offset: ", tostring(offset)) + '; + } +--- request +GET /t +--- response_body_like +.*offset.* +--- no_error_log +[error] From 33cd4669ca895bded75ca0b87c2be336c12fe3d6 Mon Sep 17 00:00:00 2001 From: biubiue Date: Sun, 28 Aug 2022 11:27:07 +0800 Subject: [PATCH 05/44] feat(kafka-logger): modify test case code style --- t/plugin/kafka-logger.t | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t index 0dca1cc66eb4..805f17c2d88e 100644 --- a/t/plugin/kafka-logger.t +++ b/t/plugin/kafka-logger.t @@ -600,7 +600,7 @@ qr/partition_id: 2/] --- http_config eval: $::HttpConfig --- config location /t { - content_by_lua ' + content_by_lua_block { local producer = require "resty.kafka.producer" local broker_list = { { host = "127.0.0.1", port = 9092 , @@ -614,7 +614,7 @@ qr/partition_id: 2/] return end ngx.say("offset: ", tostring(offset)) - '; + } } --- request GET /t From 7e1db99179ccc3c0de63477af056e0dbfcf83b82 Mon Sep 17 00:00:00 2001 From: biubiue Date: Sun, 28 Aug 2022 11:33:02 +0800 Subject: [PATCH 06/44] fix(kafka-logger): test case code style --- t/plugin/kafka-logger.t | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t index 805f17c2d88e..fc602e4302ed 100644 --- a/t/plugin/kafka-logger.t +++ b/t/plugin/kafka-logger.t @@ -620,5 +620,5 @@ qr/partition_id: 2/] GET /t --- response_body_like .*offset.* ---- no_error_log +--- error_log [error] From ebeebc86dae35d73280d837367d7120859d301fd Mon Sep 17 00:00:00 2001 From: biubiue Date: Sun, 28 Aug 2022 11:39:39 +0800 Subject: [PATCH 07/44] fix(kafka-logger): test case code style --- t/plugin/kafka-logger.t | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t index fc602e4302ed..8dd5fe283a20 100644 --- a/t/plugin/kafka-logger.t +++ b/t/plugin/kafka-logger.t @@ -603,8 +603,16 @@ qr/partition_id: 2/] content_by_lua_block { local producer = require "resty.kafka.producer" local broker_list = { - { host = "127.0.0.1", port = 9092 , - sasl_config = { mechanism="PLAIN", user="admin", password = "admin-secret" },}, + { + host = "127.0.0.1", + port = 9092, + sasl_config = { + mechanism = "PLAIN", + strategy = "sasl" + user = "admin", + password = "admin-secret" + } + } } local message = "halo world" local p = producer:new(broker_list) From cd68da20ea2785a03f5a1520d2b926075b8a760a Mon Sep 17 00:00:00 2001 From: biubiue Date: Sun, 28 Aug 2022 11:48:51 +0800 Subject: [PATCH 08/44] fix(kafka-logger): test case code style --- t/plugin/kafka-logger.t | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t index 8dd5fe283a20..c8706d9cd511 100644 --- a/t/plugin/kafka-logger.t +++ b/t/plugin/kafka-logger.t @@ -595,7 +595,6 @@ qr/partition_id: 2/] - === TEST 20: sasl simple send --- http_config eval: $::HttpConfig --- config @@ -608,7 +607,7 @@ qr/partition_id: 2/] port = 9092, sasl_config = { mechanism = "PLAIN", - strategy = "sasl" + strategy = "sasl", user = "admin", password = "admin-secret" } @@ -628,5 +627,5 @@ qr/partition_id: 2/] GET /t --- response_body_like .*offset.* ---- error_log +--- error_log_like eval [error] From 0104c5a9daa15dda401f88f9e71026768efb156d Mon Sep 17 00:00:00 2001 From: biubiue Date: Sun, 28 Aug 2022 11:51:43 +0800 Subject: [PATCH 09/44] fix(kafka-logger): test case code style --- t/plugin/kafka-logger.t | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t index c8706d9cd511..5858eae91725 100644 --- a/t/plugin/kafka-logger.t +++ b/t/plugin/kafka-logger.t @@ -602,14 +602,14 @@ qr/partition_id: 2/] content_by_lua_block { local producer = require "resty.kafka.producer" local broker_list = { - { + { host = "127.0.0.1", port = 9092, - sasl_config = { - mechanism = "PLAIN", + sasl_config = { + mechanism = "PLAIN", strategy = "sasl", - user = "admin", - password = "admin-secret" + user = "admin", + password = "admin-secret" } } } From 503011cf48310647986a233cc4e5472bc87e5d31 Mon Sep 17 00:00:00 2001 From: biubiue Date: Sun, 28 Aug 2022 14:45:59 +0800 Subject: [PATCH 10/44] Update kafka-logger.t --- t/plugin/kafka-logger.t | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t index 5858eae91725..b1cde5bb0946 100644 --- a/t/plugin/kafka-logger.t +++ b/t/plugin/kafka-logger.t @@ -615,7 +615,7 @@ qr/partition_id: 2/] } local message = "halo world" local p = producer:new(broker_list) - local offset, err = p:send("test", nil, message) + local offset, err = p:send("test2", nil, message) if not offset then ngx.say("send err:", err) return From 18358aa07b62d6466f51f10de0a847646eada340 Mon Sep 17 00:00:00 2001 From: biubiue Date: Thu, 1 Sep 2022 10:20:56 +0800 Subject: [PATCH 11/44] fix(kafka-logger): test case --- t/plugin/kafka-logger.t | 31 +++++++++++++++++++++---------- 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t index b1cde5bb0946..3ed9f1f7a44e 100644 --- a/t/plugin/kafka-logger.t +++ b/t/plugin/kafka-logger.t @@ -22,7 +22,7 @@ no_root_location(); add_block_preprocessor(sub { my ($block) = @_; - + if (!$block->request) { $block->set_value("request", "GET /t"); } @@ -30,6 +30,19 @@ add_block_preprocessor(sub { if ((!defined $block->error_log) && (!defined $block->no_error_log)) { $block->set_value("no_error_log", "[error]"); } + my $extra_init_by_lua = <<_EOC_; + local producer = require("resty.kafka.producer") + local inject = function(mod, name) + local old_f = mod[name] + mod[name] = function (...) + ngx.say("success") + return old_f(...) + end + end + inject(producer, "new") +_EOC_ + + $block->set_value("extra_init_by_lua", $extra_init_by_lua); }); run_tests; @@ -596,7 +609,6 @@ qr/partition_id: 2/] === TEST 20: sasl simple send ---- http_config eval: $::HttpConfig --- config location /t { content_by_lua_block { @@ -613,19 +625,18 @@ qr/partition_id: 2/] } } } - local message = "halo world" - local p = producer:new(broker_list) - local offset, err = p:send("test2", nil, message) - if not offset then - ngx.say("send err:", err) +--- extra_init_by_lua + local producer = producer:new(broker_list) + if not producer then + ngx.say("err: producer not created ") return end - ngx.say("offset: ", tostring(offset)) + ngx.say("success") } } --- request GET /t --- response_body_like -.*offset.* +success --- error_log_like eval -[error] +producer not created From 3812c9cb15f72594843df3deb089d91fe77d6fcd Mon Sep 17 00:00:00 2001 From: biubiue Date: Thu, 1 Sep 2022 10:50:00 +0800 Subject: [PATCH 12/44] fix(kafka-logger): test case --- t/plugin/kafka-logger.t | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t index 3ed9f1f7a44e..7f4d0fe6d670 100644 --- a/t/plugin/kafka-logger.t +++ b/t/plugin/kafka-logger.t @@ -34,7 +34,7 @@ add_block_preprocessor(sub { local producer = require("resty.kafka.producer") local inject = function(mod, name) local old_f = mod[name] - mod[name] = function (...) + mod[name] = function(...) ngx.say("success") return old_f(...) end @@ -608,7 +608,7 @@ qr/partition_id: 2/] -=== TEST 20: sasl simple send +=== TEST 20: user sasl create producer --- config location /t { content_by_lua_block { From c8baf35ef278b8db094f1bf85dd8a729acf1124b Mon Sep 17 00:00:00 2001 From: biubiue Date: Thu, 1 Sep 2022 13:05:39 +0800 Subject: [PATCH 13/44] Update kafka-logger.t --- t/plugin/kafka-logger.t | 51 ++--------------------------------------- 1 file changed, 2 insertions(+), 49 deletions(-) diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t index 7f4d0fe6d670..18b38c82f31b 100644 --- a/t/plugin/kafka-logger.t +++ b/t/plugin/kafka-logger.t @@ -22,7 +22,7 @@ no_root_location(); add_block_preprocessor(sub { my ($block) = @_; - + if (!$block->request) { $block->set_value("request", "GET /t"); } @@ -30,19 +30,7 @@ add_block_preprocessor(sub { if ((!defined $block->error_log) && (!defined $block->no_error_log)) { $block->set_value("no_error_log", "[error]"); } - my $extra_init_by_lua = <<_EOC_; - local producer = require("resty.kafka.producer") - local inject = function(mod, name) - local old_f = mod[name] - mod[name] = function(...) - ngx.say("success") - return old_f(...) - end - end - inject(producer, "new") -_EOC_ - - $block->set_value("extra_init_by_lua", $extra_init_by_lua); + }); run_tests; @@ -605,38 +593,3 @@ qr/partition_id: 2/] [qr/partition_id: 1/, qr/partition_id: 0/, qr/partition_id: 2/] - - - -=== TEST 20: user sasl create producer ---- config - location /t { - content_by_lua_block { - local producer = require "resty.kafka.producer" - local broker_list = { - { - host = "127.0.0.1", - port = 9092, - sasl_config = { - mechanism = "PLAIN", - strategy = "sasl", - user = "admin", - password = "admin-secret" - } - } - } ---- extra_init_by_lua - local producer = producer:new(broker_list) - if not producer then - ngx.say("err: producer not created ") - return - end - ngx.say("success") - } - } ---- request -GET /t ---- response_body_like -success ---- error_log_like eval -producer not created From d3489871763afd826a0eb9dcc430553aa0e8be27 Mon Sep 17 00:00:00 2001 From: biubiue Date: Thu, 1 Sep 2022 15:01:08 +0800 Subject: [PATCH 14/44] Update kafka-logger.t --- t/plugin/kafka-logger.t | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t index 18b38c82f31b..7f72b13434b0 100644 --- a/t/plugin/kafka-logger.t +++ b/t/plugin/kafka-logger.t @@ -593,3 +593,38 @@ qr/partition_id: 2/] [qr/partition_id: 1/, qr/partition_id: 0/, qr/partition_id: 2/] + + + +=== TEST 20: user sasl create producer +--- config + location /t { + content_by_lua_block { + local producer = require "resty.kafka.producer" + local broker_list = { + { + host = "127.0.0.1", + port = 9092, + sasl_config = { + mechanism = "PLAIN", + strategy = "sasl", + user = "admin", + password = "admin-secret" + } + } + } +--- extra_init_by_lua + local producer = producer:new(broker_list) + if not producer then + ngx.say("err: producer not created ") + return + end + ngx.say("success") + } + } +--- request +GET /t +--- response_body_like +success +--- error_log_like eval +producer not created From 72f0d0649cabc6d2a5a9918882a0af193b057ef8 Mon Sep 17 00:00:00 2001 From: biubiue Date: Fri, 2 Sep 2022 10:22:32 +0800 Subject: [PATCH 15/44] Update kafka-logger.t --- t/plugin/kafka-logger.t | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t index 7f72b13434b0..dfa711f1e0cd 100644 --- a/t/plugin/kafka-logger.t +++ b/t/plugin/kafka-logger.t @@ -30,7 +30,25 @@ add_block_preprocessor(sub { if ((!defined $block->error_log) && (!defined $block->no_error_log)) { $block->set_value("no_error_log", "[error]"); } - + if (!$block->extra_init_by_lua) { + my $extra_init_by_lua = <<_EOC_; + local producer = require("resty.kafka.producer") + local inject = function(mod, name) + local old_f = mod[name] + mod[name] = function(...) + ngx.say("passed") + return old_f(...) + end + end + inject(producer, "new") +-- mock exporter producer +producer.send = function() + ngx.say("passed") +end +_EOC_ + + $block->set_value("extra_init_by_lua", $extra_init_by_lua); + } }); run_tests; @@ -625,6 +643,6 @@ qr/partition_id: 2/] --- request GET /t --- response_body_like -success +passed --- error_log_like eval producer not created From 5b7dd8c1cccbd607d135674854f43c3b0139abaa Mon Sep 17 00:00:00 2001 From: biubiue Date: Tue, 6 Sep 2022 16:33:14 +0800 Subject: [PATCH 16/44] fix(kafka-logger): test case --- t/plugin/kafka-logger.t | 37 ++++++++++++------------------------- 1 file changed, 12 insertions(+), 25 deletions(-) diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t index dfa711f1e0cd..52319c2f32b2 100644 --- a/t/plugin/kafka-logger.t +++ b/t/plugin/kafka-logger.t @@ -30,25 +30,6 @@ add_block_preprocessor(sub { if ((!defined $block->error_log) && (!defined $block->no_error_log)) { $block->set_value("no_error_log", "[error]"); } - if (!$block->extra_init_by_lua) { - my $extra_init_by_lua = <<_EOC_; - local producer = require("resty.kafka.producer") - local inject = function(mod, name) - local old_f = mod[name] - mod[name] = function(...) - ngx.say("passed") - return old_f(...) - end - end - inject(producer, "new") --- mock exporter producer -producer.send = function() - ngx.say("passed") -end -_EOC_ - - $block->set_value("extra_init_by_lua", $extra_init_by_lua); - } }); run_tests; @@ -615,6 +596,11 @@ qr/partition_id: 2/] === TEST 20: user sasl create producer +--- extra_init_by_lua +local producer = require("resty.kafka.producer") +producer.send = function() + return true +end --- config location /t { content_by_lua_block { @@ -631,10 +617,11 @@ qr/partition_id: 2/] } } } ---- extra_init_by_lua - local producer = producer:new(broker_list) - if not producer then - ngx.say("err: producer not created ") + + local p = producer:new(broker_list) + local ok,err = p.send("test",nil,"hello world") + if not ok then + ngx.say("send to kafka err ") return end ngx.say("success") @@ -643,6 +630,6 @@ qr/partition_id: 2/] --- request GET /t --- response_body_like -passed +success --- error_log_like eval -producer not created +end to kafka err From 9f1817ac463d89e48806f722a658821094881b4f Mon Sep 17 00:00:00 2001 From: biubiue Date: Tue, 6 Sep 2022 18:04:47 +0800 Subject: [PATCH 17/44] fix(kafka-logger): try other actions --- t/plugin/kafka-logger.t | 1 + 1 file changed, 1 insertion(+) diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t index 52319c2f32b2..8f1daf9dabe4 100644 --- a/t/plugin/kafka-logger.t +++ b/t/plugin/kafka-logger.t @@ -633,3 +633,4 @@ GET /t success --- error_log_like eval end to kafka err + From 588bb0fe31ca35e6db08373a3dab96ffeaa0ff90 Mon Sep 17 00:00:00 2001 From: biubiue Date: Tue, 6 Sep 2022 18:13:54 +0800 Subject: [PATCH 18/44] Update kafka-logger.t --- t/plugin/kafka-logger.t | 1 - 1 file changed, 1 deletion(-) diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t index 8f1daf9dabe4..52319c2f32b2 100644 --- a/t/plugin/kafka-logger.t +++ b/t/plugin/kafka-logger.t @@ -633,4 +633,3 @@ GET /t success --- error_log_like eval end to kafka err - From 3a90b5d4c3476f837c4d33f43b3b4c9b11ba38f3 Mon Sep 17 00:00:00 2001 From: biubiue Date: Wed, 7 Sep 2022 08:55:44 +0800 Subject: [PATCH 19/44] Update kafka-logger.t --- t/plugin/kafka-logger.t | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t index 52319c2f32b2..4b78ff13badc 100644 --- a/t/plugin/kafka-logger.t +++ b/t/plugin/kafka-logger.t @@ -621,7 +621,7 @@ end local p = producer:new(broker_list) local ok,err = p.send("test",nil,"hello world") if not ok then - ngx.say("send to kafka err ") + ngx.say("send to kafka err") return end ngx.say("success") @@ -632,4 +632,4 @@ GET /t --- response_body_like success --- error_log_like eval -end to kafka err +send to kafka err From d10ea12c9dda91457fe8412557cf1d3a248d1b80 Mon Sep 17 00:00:00 2001 From: biubiue Date: Wed, 7 Sep 2022 09:10:43 +0800 Subject: [PATCH 20/44] Update kafka-logger.t --- t/plugin/kafka-logger.t | 41 ----------------------------------------- 1 file changed, 41 deletions(-) diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t index 4b78ff13badc..5b894abce0bc 100644 --- a/t/plugin/kafka-logger.t +++ b/t/plugin/kafka-logger.t @@ -592,44 +592,3 @@ qr/partition_id: 2/] [qr/partition_id: 1/, qr/partition_id: 0/, qr/partition_id: 2/] - - - -=== TEST 20: user sasl create producer ---- extra_init_by_lua -local producer = require("resty.kafka.producer") -producer.send = function() - return true -end ---- config - location /t { - content_by_lua_block { - local producer = require "resty.kafka.producer" - local broker_list = { - { - host = "127.0.0.1", - port = 9092, - sasl_config = { - mechanism = "PLAIN", - strategy = "sasl", - user = "admin", - password = "admin-secret" - } - } - } - - local p = producer:new(broker_list) - local ok,err = p.send("test",nil,"hello world") - if not ok then - ngx.say("send to kafka err") - return - end - ngx.say("success") - } - } ---- request -GET /t ---- response_body_like -success ---- error_log_like eval -send to kafka err From 70054aa99843e457459caea3cbbb5197aa6397f4 Mon Sep 17 00:00:00 2001 From: biubiue Date: Wed, 7 Sep 2022 09:18:07 +0800 Subject: [PATCH 21/44] Update kafka-logger.t --- t/plugin/kafka-logger.t | 41 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t index 5b894abce0bc..4b78ff13badc 100644 --- a/t/plugin/kafka-logger.t +++ b/t/plugin/kafka-logger.t @@ -592,3 +592,44 @@ qr/partition_id: 2/] [qr/partition_id: 1/, qr/partition_id: 0/, qr/partition_id: 2/] + + + +=== TEST 20: user sasl create producer +--- extra_init_by_lua +local producer = require("resty.kafka.producer") +producer.send = function() + return true +end +--- config + location /t { + content_by_lua_block { + local producer = require "resty.kafka.producer" + local broker_list = { + { + host = "127.0.0.1", + port = 9092, + sasl_config = { + mechanism = "PLAIN", + strategy = "sasl", + user = "admin", + password = "admin-secret" + } + } + } + + local p = producer:new(broker_list) + local ok,err = p.send("test",nil,"hello world") + if not ok then + ngx.say("send to kafka err") + return + end + ngx.say("success") + } + } +--- request +GET /t +--- response_body_like +success +--- error_log_like eval +send to kafka err From 35a581b21bf5f5285a886cd6ee4cc08752e94ed7 Mon Sep 17 00:00:00 2001 From: biubiue Date: Wed, 7 Sep 2022 14:28:41 +0800 Subject: [PATCH 22/44] Update kafka-logger.t --- t/plugin/kafka-logger.t | 129 ++++++++++++++++++++++++++++++---------- 1 file changed, 98 insertions(+), 31 deletions(-) diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t index 4b78ff13badc..0e7f95e26e4f 100644 --- a/t/plugin/kafka-logger.t +++ b/t/plugin/kafka-logger.t @@ -595,41 +595,108 @@ qr/partition_id: 2/] -=== TEST 20: user sasl create producer ---- extra_init_by_lua -local producer = require("resty.kafka.producer") -producer.send = function() - return true -end +=== TEST 20: set route(id: 1) --- config location /t { content_by_lua_block { - local producer = require "resty.kafka.producer" - local broker_list = { - { - host = "127.0.0.1", - port = 9092, - sasl_config = { - mechanism = "PLAIN", - strategy = "sasl", - user = "admin", - password = "admin-secret" - } - } - } - - local p = producer:new(broker_list) - local ok,err = p.send("test",nil,"hello world") - if not ok then - ngx.say("send to kafka err") - return + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "kafka-logger": { + "broker_list" : + { + "127.0.0.1":9092 + }, + "kafka_topic" : "test2", + "key" : "key1", + "timeout" : 1, + "batch_max_size": 1, + "sasl_config": { + "mechanism": "PLAIN", + "strategy": "sasl", + "user": "admin", + "password": "admin-secret" + } + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]] + ) + if code >= 300 then + ngx.status = code end - ngx.say("success") + ngx.say(body) } } +--- response_body +passed + + + +=== TEST 21: access --- request -GET /t ---- response_body_like -success ---- error_log_like eval -send to kafka err +GET /hello +--- response_body +hello world +--- wait: 2 + + + +=== TEST 22: error log +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "kafka-logger": { + "broker_list" : + { + "127.0.0.1":9092, + "127.0.0.1":9093 + }, + "kafka_topic" : "test2", + "key" : "key1", + "timeout" : 1, + "batch_max_size": 1, + "sasl_config": { + "mechanism": "PLAIN", + "strategy": "sasl", + "user": "admin", + "password": "admin-secret" + } + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + local http = require "resty.http" + local httpc = http.new() + local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello" + local res, err = httpc:request_uri(uri, {method = "GET"}) + } + } +--- error_log +failed to send data to Kafka topic +[error] +--- wait: 1 From 9d9f005ba781a0d3db71f1527d92d41550482574 Mon Sep 17 00:00:00 2001 From: biubiue Date: Thu, 8 Sep 2022 09:44:22 +0800 Subject: [PATCH 23/44] fix(kafka-logger): modify sasl_config properties --- apisix/plugins/kafka-logger.lua | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua index cf27fd4196b8..6f9e1c3b81f3 100644 --- a/apisix/plugins/kafka-logger.lua +++ b/apisix/plugins/kafka-logger.lua @@ -55,11 +55,11 @@ local schema = { type = "object", description = "sasl config", properties = { - mechanism = { type = "string", description = "mechanism" }, + mechanism = { type = "string", description = "mechanism", default = "PLAIN" }, password = { type = "string", description = "password" }, - user = { type = "string", description = "user" }, - strategy = { type = "string", description = "strategy" } - } + user = { type = "string", description = "user" } + }, + required = {"password", "user"}, }, kafka_topic = {type = "string"}, producer_type = { From 8c414739d5cca8bf159820cd45fbc864b6c9961f Mon Sep 17 00:00:00 2001 From: biubiue Date: Thu, 8 Sep 2022 10:59:18 +0800 Subject: [PATCH 24/44] Update kafka-logger.md --- docs/en/latest/plugins/kafka-logger.md | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/docs/en/latest/plugins/kafka-logger.md b/docs/en/latest/plugins/kafka-logger.md index 569650d23c26..c4040a0df41b 100644 --- a/docs/en/latest/plugins/kafka-logger.md +++ b/docs/en/latest/plugins/kafka-logger.md @@ -39,10 +39,9 @@ It might take some time to receive the log data. It will be automatically sent a | ---------------------- | ------- | -------- | -------------- | --------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | | broker_list | object | True | | | List of Kafka brokers (nodes). | | sasl_config | object | False | | | Kafka sasl conf. | -| sasl_config.mechanism | string | False | | | Kafka mechanism. | -| sasl_config.password | string | False | | | Kafka password. | -| sasl_config.user | string | False | | | Kafka user. | -| sasl_config.strategy | string | False | | | Kafka strategy. | +| sasl_config.mechanism | string | False | PLAIN | ["PLAIN"] | Kafka mechanism. | +| sasl_config.password | string | True | | | Kafka password. If sasl_config exists, it's required. | +| sasl_config.user | string | True | | | Kafka user. If sasl_config exists, it's required. | | kafka_topic | string | True | | | Target topic to push the logs for organisation. | | producer_type | string | False | async | ["async", "sync"] | Message sending mode of the producer. | | required_acks | integer | False | 1 | [0, 1, -1] | Number of acknowledgements the leader needs to receive for the producer to consider the request complete. This controls the durability of the sent records. The attribute follows the same configuration as the Kafka `acks` attribute. See [Apache Kafka documentation](https://kafka.apache.org/documentation/#producerconfigs_acks) for more. | From 4e1cc3485a8de0b2fee5f78a65055c1b39511db7 Mon Sep 17 00:00:00 2001 From: biubiue Date: Fri, 9 Sep 2022 17:43:42 +0800 Subject: [PATCH 25/44] Update kafka-logger.t --- t/plugin/kafka-logger.t | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t index 0e7f95e26e4f..8dd641070a23 100644 --- a/t/plugin/kafka-logger.t +++ b/t/plugin/kafka-logger.t @@ -641,7 +641,19 @@ passed -=== TEST 21: access +=== TEST 21: create producer with sasl_config +--- extra_init_by_lua +    local producer = require("resty.kafka.producer") +    local klogger = require("apisix.plugins.kafka-logger") +    producer.new = function(klogger) +        if (!klogger.sasl_config) then +            ngx.say("create producer without sasl_config") +            return producer +        end +        producer.sasl_config = klogger.sasl_config +        ngx.say("create producer with sasl_config") +        return producer +    end --- request GET /hello --- response_body From ddde43355cf1e2b9cf44ed82dc02549efbda3b63 Mon Sep 17 00:00:00 2001 From: biubiue Date: Fri, 9 Sep 2022 18:21:15 +0800 Subject: [PATCH 26/44] Update kafka-logger.lua --- apisix/plugins/kafka-logger.lua | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua index 6f9e1c3b81f3..fa772ab9da97 100644 --- a/apisix/plugins/kafka-logger.lua +++ b/apisix/plugins/kafka-logger.lua @@ -97,7 +97,12 @@ local schema = { producer_batch_num = {type = "integer", minimum = 1, default = 200}, producer_batch_size = {type = "integer", minimum = 0, default = 1048576}, producer_max_buffering = {type = "integer", minimum = 1, default = 50000}, - producer_time_linger = {type = "integer", minimum = 1, default = 1} + producer_time_linger = {type = "integer", minimum = 1, default = 1}, + client_ssl = {type = "boolean", default = false}, + client_ssl_verify = {type = "boolean", default = false}, + client_socket_timeout = {type = "integer", default = 3000}, + client_keepalive_timeout = {type = "integer", default = 600}, + client_keepalive_size = {type = "integer", default = 2} }, required = {"broker_list", "kafka_topic"} } @@ -228,7 +233,12 @@ function _M.log(conf, ctx) broker_config["batch_size"] = conf.producer_batch_size broker_config["max_buffering"] = conf.producer_max_buffering broker_config["flush_time"] = conf.producer_time_linger * 1000 - + broker_config["ssl"] = conf.client_ssl + broker_config["ssl_verify"] = conf.client_ssl_verify + broker_config["socket_timeout"] = conf.client_socket_timeout + broker_config["keepalive_timeout"] = conf.client_keepalive_timeout * 1000 + broker_config["keepalive_size"] = conf.client_keepalive_size + local prod, err = core.lrucache.plugin_ctx(lrucache, ctx, nil, create_producer, broker_list, broker_config, conf.cluster_name) core.log.info("kafka cluster name ", conf.cluster_name, ", broker_list[1] port ", From 0444c000bb839aa72bdab7c8d80a604a7ab7d947 Mon Sep 17 00:00:00 2001 From: biubiue Date: Fri, 9 Sep 2022 18:44:21 +0800 Subject: [PATCH 27/44] Update kafka-logger.md --- docs/en/latest/plugins/kafka-logger.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/en/latest/plugins/kafka-logger.md b/docs/en/latest/plugins/kafka-logger.md index c4040a0df41b..0b67bb084a5c 100644 --- a/docs/en/latest/plugins/kafka-logger.md +++ b/docs/en/latest/plugins/kafka-logger.md @@ -58,6 +58,11 @@ It might take some time to receive the log data. It will be automatically sent a | producer_batch_size | integer | optional | 1048576 | [0,...] | `batch_size` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) in bytes. | | producer_max_buffering | integer | optional | 50000 | [1,...] | `max_buffering` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) representing maximum buffer size. Unit is message count. | | producer_time_linger | integer | optional | 1 | [1,...] | `flush_time` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) in seconds. | +| client_ssl | boolean | optional | False | [True,False] | `ssl` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) | +| client_ssl_verify | boolean | optional | False | [True,False] | `ssl_verify` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) . | +| client_socket_timeout | integer | optional | 3000 | [1,...] | `socket_timeout` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka),Unit is seconds. | +| client_keepalive_timeout | integer | optional | 600 | [1,...] | `keepalive_timeout` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) ,Unit is millisecond. | +| client_keepalive_size | integer | optional | 2 | [1,...] | `keepalive_size` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) . | This Plugin supports using batch processors to aggregate and process entries (logs/data) in a batch. This avoids the need for frequently submitting the data. The batch processor submits data every `5` seconds or when the data in the queue reaches `1000`. See [Batch Processor](../batch-processor.md#configuration) for more information or setting your custom configuration. From cc90a472735d0ec28dddbfa6f78bf16f8d7e4896 Mon Sep 17 00:00:00 2001 From: biubiue Date: Fri, 9 Sep 2022 18:58:52 +0800 Subject: [PATCH 28/44] Update kafka-logger.lua --- apisix/plugins/kafka-logger.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua index fa772ab9da97..517cbaac9b96 100644 --- a/apisix/plugins/kafka-logger.lua +++ b/apisix/plugins/kafka-logger.lua @@ -238,7 +238,7 @@ function _M.log(conf, ctx) broker_config["socket_timeout"] = conf.client_socket_timeout broker_config["keepalive_timeout"] = conf.client_keepalive_timeout * 1000 broker_config["keepalive_size"] = conf.client_keepalive_size - + local prod, err = core.lrucache.plugin_ctx(lrucache, ctx, nil, create_producer, broker_list, broker_config, conf.cluster_name) core.log.info("kafka cluster name ", conf.cluster_name, ", broker_list[1] port ", From 290070c475bd0310f334475aa0eafcd7e6843428 Mon Sep 17 00:00:00 2001 From: biubiue Date: Tue, 13 Sep 2022 16:18:43 +0800 Subject: [PATCH 29/44] fix(kafka-logger): test case --- docs/en/latest/plugins/kafka-logger.md | 4 ++-- t/plugin/kafka-logger.t | 15 +++++---------- 2 files changed, 7 insertions(+), 12 deletions(-) diff --git a/docs/en/latest/plugins/kafka-logger.md b/docs/en/latest/plugins/kafka-logger.md index 0b67bb084a5c..db9a710af9fd 100644 --- a/docs/en/latest/plugins/kafka-logger.md +++ b/docs/en/latest/plugins/kafka-logger.md @@ -58,8 +58,8 @@ It might take some time to receive the log data. It will be automatically sent a | producer_batch_size | integer | optional | 1048576 | [0,...] | `batch_size` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) in bytes. | | producer_max_buffering | integer | optional | 50000 | [1,...] | `max_buffering` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) representing maximum buffer size. Unit is message count. | | producer_time_linger | integer | optional | 1 | [1,...] | `flush_time` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) in seconds. | -| client_ssl | boolean | optional | False | [True,False] | `ssl` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) | -| client_ssl_verify | boolean | optional | False | [True,False] | `ssl_verify` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) . | +| client_ssl | boolean | optional | false | [true,false] | `ssl` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) | +| client_ssl_verify | boolean | optional | false | [true,false] | `ssl_verify` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) . | | client_socket_timeout | integer | optional | 3000 | [1,...] | `socket_timeout` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka),Unit is seconds. | | client_keepalive_timeout | integer | optional | 600 | [1,...] | `keepalive_timeout` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) ,Unit is millisecond. | | client_keepalive_size | integer | optional | 2 | [1,...] | `keepalive_size` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) . | diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t index 8dd641070a23..f0949611ac96 100644 --- a/t/plugin/kafka-logger.t +++ b/t/plugin/kafka-logger.t @@ -615,7 +615,6 @@ qr/partition_id: 2/] "batch_max_size": 1, "sasl_config": { "mechanism": "PLAIN", - "strategy": "sasl", "user": "admin", "password": "admin-secret" } @@ -641,17 +640,14 @@ passed -=== TEST 21: create producer with sasl_config +=== TEST 21: inject create producer --- extra_init_by_lua     local producer = require("resty.kafka.producer") -    local klogger = require("apisix.plugins.kafka-logger") -    producer.new = function(klogger) -        if (!klogger.sasl_config) then -            ngx.say("create producer without sasl_config") -            return producer +    producer.new = function(self, broker_list, producer_config, cluster_name) +        if (#broker_list) then +            ngx.log(ngx.ERR, "unexpected broker_list length: ", #broker_list) +            return nil         end -        producer.sasl_config = klogger.sasl_config -        ngx.say("create producer with sasl_config")         return producer     end --- request @@ -683,7 +679,6 @@ hello world "batch_max_size": 1, "sasl_config": { "mechanism": "PLAIN", - "strategy": "sasl", "user": "admin", "password": "admin-secret" } From 1481240ed6b18cf91438cc014c3ebc3c52b12cc8 Mon Sep 17 00:00:00 2001 From: biubiue Date: Mon, 26 Sep 2022 16:23:14 +0800 Subject: [PATCH 30/44] fix(kafka-logger): test case --- t/plugin/kafka-logger.t | 46 ++++++++++++++++++++++++++++++++++++++--- 1 file changed, 43 insertions(+), 3 deletions(-) diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t index f0949611ac96..3894cbeac9a4 100644 --- a/t/plugin/kafka-logger.t +++ b/t/plugin/kafka-logger.t @@ -642,13 +642,53 @@ passed === TEST 21: inject create producer --- extra_init_by_lua + local setmetatable_p = setmetatable + local setmetatable_c = setmetatable     local producer = require("resty.kafka.producer") + local client = require("resty.kafka.client") + local sendbuffer = require("resty.kafka.sendbuffer") + local ringbuffer = require("resty.kafka.ringbuffer")     producer.new = function(self, broker_list, producer_config, cluster_name) -        if (#broker_list) then -            ngx.log(ngx.ERR, "unexpected broker_list length: ", #broker_list) +        if #broker_list=0 then +            ngx.log(ngx.ERR, "broker_list length must not zero")             return nil         end -        return producer + local name = cluster_name or 1 + local opts = producer_config or {} + local cli = setmetatable_c({ + broker_list = broker_list, + topic_partitions = {}, + brokers = {}, + api_versions = {}, -- support APIs version on broker + client_id = "worker" .. pid(), + socket_config = { + socket_timeout = opts.socket_timeout or 3000, + keepalive_timeout = opts.keepalive_timeout or (600 * 1000), -- 10 min + keepalive_size = opts.keepalive_size or 2, + ssl = opts.ssl or false, + ssl_verify = opts.ssl_verify or false, + resolver = opts.resolver -- or nil + } + }, { __index = { _VERSION = "0.20" } }) + return setmetatable_p({ + client = cli, + correlation_id = 1, + request_timeout = opts.request_timeout or 2000, + retry_backoff = opts.retry_backoff or 100, -- ms + max_retry = opts.max_retry or 3, + required_acks = opts.required_acks or 1, + partitioner = opts.partitioner, + error_handle = opts.error_handle, + api_version = opts.api_version or 0, + async = opts.producer_type == "async", + socket_config = cli.socket_config, + _timer_flushing_buffer = false, + ringbuffer = ringbuffer:new(opts.batch_num or 200, opts.max_buffering or 50000), -- 200, 50K + sendbuffer = sendbuffer:new(opts.batch_num or 200, opts.batch_size or 1048576) + -- default: 1K, 1M + -- batch_size should less than (MaxRequestSize / 2 - 10KiB) + -- config in the kafka server, default 100M + }, { __index = { _VERSION = "0.20" } })     end --- request GET /hello From 6968d487c1bb65c8574bfc4378c8f4591a69d1e5 Mon Sep 17 00:00:00 2001 From: biubiue Date: Mon, 26 Sep 2022 17:59:34 +0800 Subject: [PATCH 31/44] test case --- t/plugin/kafka-logger.t | 24 ++++++++---------------- 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t index 3894cbeac9a4..1285ee4c9784 100644 --- a/t/plugin/kafka-logger.t +++ b/t/plugin/kafka-logger.t @@ -645,36 +645,28 @@ passed local setmetatable_p = setmetatable local setmetatable_c = setmetatable     local producer = require("resty.kafka.producer") - local client = require("resty.kafka.client") - local sendbuffer = require("resty.kafka.sendbuffer") - local ringbuffer = require("resty.kafka.ringbuffer")     producer.new = function(self, broker_list, producer_config, cluster_name) -        if #broker_list=0 then -            ngx.log(ngx.ERR, "broker_list length must not zero") -            return nil -        end - local name = cluster_name or 1 local opts = producer_config or {} local cli = setmetatable_c({ broker_list = broker_list, topic_partitions = {}, brokers = {}, - api_versions = {}, -- support APIs version on broker + api_versions = {}, client_id = "worker" .. pid(), socket_config = { socket_timeout = opts.socket_timeout or 3000, - keepalive_timeout = opts.keepalive_timeout or (600 * 1000), -- 10 min + keepalive_timeout = opts.keepalive_timeout or (600 * 1000), keepalive_size = opts.keepalive_size or 2, ssl = opts.ssl or false, ssl_verify = opts.ssl_verify or false, - resolver = opts.resolver -- or nil + resolver = opts.resolver or nil } }, { __index = { _VERSION = "0.20" } }) return setmetatable_p({ client = cli, correlation_id = 1, request_timeout = opts.request_timeout or 2000, - retry_backoff = opts.retry_backoff or 100, -- ms + retry_backoff = opts.retry_backoff or 100, max_retry = opts.max_retry or 3, required_acks = opts.required_acks or 1, partitioner = opts.partitioner, @@ -683,13 +675,13 @@ passed async = opts.producer_type == "async", socket_config = cli.socket_config, _timer_flushing_buffer = false, - ringbuffer = ringbuffer:new(opts.batch_num or 200, opts.max_buffering or 50000), -- 200, 50K + ringbuffer = ringbuffer:new(opts.batch_num or 200, opts.max_buffering or 50000), sendbuffer = sendbuffer:new(opts.batch_num or 200, opts.batch_size or 1048576) - -- default: 1K, 1M - -- batch_size should less than (MaxRequestSize / 2 - 10KiB) - -- config in the kafka server, default 100M }, { __index = { _VERSION = "0.20" } })     end + producer.send = function(self, topic, key, message) + return 1 + end --- request GET /hello --- response_body From 11d8dc653217f95ffe7c71fffdb772f7c4112cf2 Mon Sep 17 00:00:00 2001 From: biubiue Date: Tue, 27 Sep 2022 09:06:35 +0800 Subject: [PATCH 32/44] test case --- t/plugin/kafka-logger.t | 3 --- 1 file changed, 3 deletions(-) diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t index 1285ee4c9784..20070656ba82 100644 --- a/t/plugin/kafka-logger.t +++ b/t/plugin/kafka-logger.t @@ -679,9 +679,6 @@ passed sendbuffer = sendbuffer:new(opts.batch_num or 200, opts.batch_size or 1048576) }, { __index = { _VERSION = "0.20" } })     end - producer.send = function(self, topic, key, message) - return 1 - end --- request GET /hello --- response_body From 8ca60db16084ccbddad6b83ada34deb394345da4 Mon Sep 17 00:00:00 2001 From: biubiue Date: Tue, 27 Sep 2022 10:12:27 +0800 Subject: [PATCH 33/44] test case --- t/plugin/kafka-logger.t | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t index 20070656ba82..30c790b1e374 100644 --- a/t/plugin/kafka-logger.t +++ b/t/plugin/kafka-logger.t @@ -642,12 +642,10 @@ passed === TEST 21: inject create producer --- extra_init_by_lua - local setmetatable_p = setmetatable - local setmetatable_c = setmetatable     local producer = require("resty.kafka.producer")     producer.new = function(self, broker_list, producer_config, cluster_name) local opts = producer_config or {} - local cli = setmetatable_c({ + local cli = { broker_list = broker_list, topic_partitions = {}, brokers = {}, @@ -661,8 +659,8 @@ passed ssl_verify = opts.ssl_verify or false, resolver = opts.resolver or nil } - }, { __index = { _VERSION = "0.20" } }) - return setmetatable_p({ + } + return { client = cli, correlation_id = 1, request_timeout = opts.request_timeout or 2000, @@ -677,7 +675,7 @@ passed _timer_flushing_buffer = false, ringbuffer = ringbuffer:new(opts.batch_num or 200, opts.max_buffering or 50000), sendbuffer = sendbuffer:new(opts.batch_num or 200, opts.batch_size or 1048576) - }, { __index = { _VERSION = "0.20" } }) + }     end --- request GET /hello From 82e81e6a31a7c3b0bb71776b8f9fd9442ba544ad Mon Sep 17 00:00:00 2001 From: starsz Date: Sun, 9 Oct 2022 17:31:28 +0800 Subject: [PATCH 34/44] fix: ci --- apisix/plugins/kafka-logger.lua | 25 +--- ci/pod/docker-compose.plugin.yml | 36 +---- ci/pod/kafka/kafka-server/env/common2.env | 8 ++ docs/en/latest/plugins/kafka-logger.md | 11 +- t/plugin/kafka-logger.t | 164 +++++++++------------- 5 files changed, 87 insertions(+), 157 deletions(-) create mode 100644 ci/pod/kafka/kafka-server/env/common2.env diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua index 95187bfd1927..972a5f5ffa3b 100644 --- a/apisix/plugins/kafka-logger.lua +++ b/apisix/plugins/kafka-logger.lua @@ -73,10 +73,11 @@ local schema = { description = "sasl config", properties = { mechanism = { type = "string", description = "mechanism", default = "PLAIN" }, + user = { type = "string", description = "user" }, password = { type = "string", description = "password" }, - user = { type = "string", description = "user" } }, - required = {"password", "user"}, + required = {"user", "password"}, + }, }, required = {"host", "port"}, }, @@ -119,11 +120,6 @@ local schema = { producer_batch_size = {type = "integer", minimum = 0, default = 1048576}, producer_max_buffering = {type = "integer", minimum = 1, default = 50000}, producer_time_linger = {type = "integer", minimum = 1, default = 1}, - client_ssl = {type = "boolean", default = false}, - client_ssl_verify = {type = "boolean", default = false}, - client_socket_timeout = {type = "integer", default = 3000}, - client_keepalive_timeout = {type = "integer", default = 600}, - client_keepalive_size = {type = "integer", default = 2} }, oneOf = { { required = {"broker_list", "kafka_topic"},}, @@ -241,15 +237,6 @@ function _M.log(conf, ctx) local broker_list = core.table.clone(conf.brokers or {}) local broker_config = {} -<<<<<<< HEAD - for host, port in pairs(conf.broker_list) do - local broker = { - host = host, - port = port, - sasl_config = conf.sasl_config or nil - } - core.table.insert(broker_list, broker) -======= if conf.broker_list then for host, port in pairs(conf.broker_list) do local broker = { @@ -258,7 +245,6 @@ function _M.log(conf, ctx) } core.table.insert(broker_list, broker) end ->>>>>>> master end broker_config["request_timeout"] = conf.timeout * 1000 @@ -268,11 +254,6 @@ function _M.log(conf, ctx) broker_config["batch_size"] = conf.producer_batch_size broker_config["max_buffering"] = conf.producer_max_buffering broker_config["flush_time"] = conf.producer_time_linger * 1000 - broker_config["ssl"] = conf.client_ssl - broker_config["ssl_verify"] = conf.client_ssl_verify - broker_config["socket_timeout"] = conf.client_socket_timeout - broker_config["keepalive_timeout"] = conf.client_keepalive_timeout * 1000 - broker_config["keepalive_size"] = conf.client_keepalive_size local prod, err = core.lrucache.plugin_ctx(lrucache, ctx, nil, create_producer, broker_list, broker_config, conf.cluster_name) diff --git a/ci/pod/docker-compose.plugin.yml b/ci/pod/docker-compose.plugin.yml index 4c0c4cb7e8e6..f16fa269304b 100644 --- a/ci/pod/docker-compose.plugin.yml +++ b/ci/pod/docker-compose.plugin.yml @@ -104,17 +104,22 @@ services: kafka-server2: image: bitnami/kafka:2.8.1 env_file: - - ci/pod/kafka/kafka-server/env/common.env + - ci/pod/kafka/kafka-server/env/common2.env environment: KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper-server2:2181 restart: unless-stopped ports: - "19092:9092" + - "19094:9094" depends_on: - zookeeper-server1 - zookeeper-server2 networks: kafka_net: + volumes: + - ./ci/pod/kafka/kafka-server/kafka_jaas.conf:/opt/bitnami/kafka/config/kafka_jaas.conf:ro + - ./ci/pod/kafka/kafka-server/selfsigned.jks:/opt/bitnami/kafka/config/certs/kafka.keystore.jks:ro + - ./ci/pod/kafka/kafka-server/selfsigned.jks:/opt/bitnami/kafka/config/certs/kafka.truststore.jks:ro ## SkyWalking skywalking: @@ -247,35 +252,6 @@ services: xpack.security.enabled: 'true' - # The function services of OpenFunction - test-header: - image: test-header-image:latest - restart: unless-stopped - ports: - - "30583:8080" - environment: - CONTEXT_MODE: "self-host" - FUNC_CONTEXT: "{\"name\":\"HelloWorld\",\"version\":\"v1.0.0\",\"port\":\"8080\",\"runtime\":\"Knative\"}" - - test-uri: - image: test-uri-image:latest - restart: unless-stopped - ports: - - "30584:8080" - environment: - CONTEXT_MODE: "self-host" - FUNC_CONTEXT: "{\"name\":\"HelloWorld\",\"version\":\"v1.0.0\",\"port\":\"8080\",\"runtime\":\"Knative\"}" - - test-body: - image: test-body-image:latest - restart: unless-stopped - ports: - - "30585:8080" - environment: - CONTEXT_MODE: "self-host" - FUNC_CONTEXT: "{\"name\":\"HelloWorld\",\"version\":\"v1.0.0\",\"port\":\"8080\",\"runtime\":\"Knative\"}" - - networks: apisix_net: kafka_net: diff --git a/ci/pod/kafka/kafka-server/env/common2.env b/ci/pod/kafka/kafka-server/env/common2.env new file mode 100644 index 000000000000..d07bf6d1ad58 --- /dev/null +++ b/ci/pod/kafka/kafka-server/env/common2.env @@ -0,0 +1,8 @@ +ALLOW_PLAINTEXT_LISTENER=yes +KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false +KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,SASL_PLAINTEXT://0.0.0.0:9094 +KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092,SASL_PLAINTEXT://127.0.0.1:9094 +KAFKA_CFG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM= +KAFKA_CFG_SSL_KEYSTORE_LOCATION=/opt/bitnami/kafka/config/certs/kafka.keystore.jks +KAFKA_CFG_SSL_KEYSTORE_PASSWORD=changeit +KAFKA_CFG_SSL_KEY_PASSWORD=changeit diff --git a/docs/en/latest/plugins/kafka-logger.md b/docs/en/latest/plugins/kafka-logger.md index 3d9904522aee..714cc6d780ba 100644 --- a/docs/en/latest/plugins/kafka-logger.md +++ b/docs/en/latest/plugins/kafka-logger.md @@ -37,16 +37,12 @@ It might take some time to receive the log data. It will be automatically sent a | Name | Type | Required | Default | Valid values | Description | | ---------------------- | ------- | -------- | -------------- | --------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | -<<<<<<< HEAD -| broker_list | object | True | | | List of Kafka brokers (nodes). | - -======= | broker_list | object | True | | | Deprecated, use `brokers` instead. List of Kafka brokers. (nodes). | | brokers | array | True | | | List of Kafka brokers (nodes). | | brokers.host | string | True | | | The host of Kafka broker, e.g, `192.168.1.1`. | | brokers.port | integer | True | | [0, 65535] | The port of Kafka broker | | brokers.sasl_config | object | False | | | The sasl config of Kafka broker | -| brokers.sasl_config.mechanism | string | False | PLAIN | "PLAIN" | The mechaism of sasl config | +| brokers.sasl_config.mechanism | string | False | "PLAIN" | ["PLAIN"] | The mechaism of sasl config | | brokers.sasl_config.user | string | True | | | The user of sasl_config.If sasl_config exists, it's required. | | brokers.sasl_config.password | string | True | | | The password of sasl_config.If sasl_config exists, it's required. | | kafka_topic | string | True | | | Target topic to push the logs for organisation. | @@ -65,11 +61,6 @@ It might take some time to receive the log data. It will be automatically sent a | producer_batch_size | integer | optional | 1048576 | [0,...] | `batch_size` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) in bytes. | | producer_max_buffering | integer | optional | 50000 | [1,...] | `max_buffering` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) representing maximum buffer size. Unit is message count. | | producer_time_linger | integer | optional | 1 | [1,...] | `flush_time` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) in seconds. | -| client_ssl | boolean | optional | false | [true,false] | `ssl` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) | -| client_ssl_verify | boolean | optional | false | [true,false] | `ssl_verify` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) . | -| client_socket_timeout | integer | optional | 3000 | [1,...] | `socket_timeout` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka),Unit is seconds. | -| client_keepalive_timeout | integer | optional | 600 | [1,...] | `keepalive_timeout` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) ,Unit is millisecond. | -| client_keepalive_size | integer | optional | 2 | [1,...] | `keepalive_size` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) . | This Plugin supports using batch processors to aggregate and process entries (logs/data) in a batch. This avoids the need for frequently submitting the data. The batch processor submits data every `5` seconds or when the data in the queue reaches `1000`. See [Batch Processor](../batch-processor.md#configuration) for more information or setting your custom configuration. diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t index ec3b9344ff6d..4064c10b43eb 100644 --- a/t/plugin/kafka-logger.t +++ b/t/plugin/kafka-logger.t @@ -595,7 +595,7 @@ qr/partition_id: 2/] -=== TEST 20: set route(id: 1) +=== TEST 20: set route with incorrect sasl_config --- config location /t { content_by_lua_block { @@ -603,32 +603,33 @@ qr/partition_id: 2/] local code, body = t('/apisix/admin/routes/1', ngx.HTTP_PUT, [[{ - "plugins": { - "kafka-logger": { - "broker_list" : - { - "127.0.0.1":9092 - }, - "kafka_topic" : "test2", - "key" : "key1", - "timeout" : 1, - "batch_max_size": 1, - "sasl_config": { - "mechanism": "PLAIN", - "user": "admin", - "password": "admin-secret" - } + "plugins":{ + "kafka-logger":{ + "brokers":[ + { + "host":"127.0.0.1", + "port":19094, + "sasl_config":{ + "mechanism":"PLAIN", + "user":"admin", + "password":"admin-secret2233" } + }], + "kafka_topic":"test2", + "key":"key1", + "timeout":1, + "batch_max_size":1 + } + }, + "upstream":{ + "nodes":{ + "127.0.0.1:1980":1 }, - "upstream": { - "nodes": { - "127.0.0.1:1980": 1 - }, - "type": "roundrobin" - }, - "uri": "/hello" + "type":"roundrobin" + }, + "uri":"/hello" }]] - ) + ) if code >= 300 then ngx.status = code end @@ -640,52 +641,18 @@ passed -=== TEST 21: inject create producer ---- extra_init_by_lua -    local producer = require("resty.kafka.producer") -    producer.new = function(self, broker_list, producer_config, cluster_name) - local opts = producer_config or {} - local cli = { - broker_list = broker_list, - topic_partitions = {}, - brokers = {}, - api_versions = {}, - client_id = "worker" .. pid(), - socket_config = { - socket_timeout = opts.socket_timeout or 3000, - keepalive_timeout = opts.keepalive_timeout or (600 * 1000), - keepalive_size = opts.keepalive_size or 2, - ssl = opts.ssl or false, - ssl_verify = opts.ssl_verify or false, - resolver = opts.resolver or nil - } - } - return { - client = cli, - correlation_id = 1, - request_timeout = opts.request_timeout or 2000, - retry_backoff = opts.retry_backoff or 100, - max_retry = opts.max_retry or 3, - required_acks = opts.required_acks or 1, - partitioner = opts.partitioner, - error_handle = opts.error_handle, - api_version = opts.api_version or 0, - async = opts.producer_type == "async", - socket_config = cli.socket_config, - _timer_flushing_buffer = false, - ringbuffer = ringbuffer:new(opts.batch_num or 200, opts.max_buffering or 50000), - sendbuffer = sendbuffer:new(opts.batch_num or 200, opts.batch_size or 1048576) - } -    end +=== TEST 21: hit route, failed to send data to kafka --- request GET /hello --- response_body hello world +--- error_log +failed to do PLAIN auth with 127.0.0.1:19094: Authentication failed: Invalid username or password --- wait: 2 -=== TEST 22: error log +=== TEST 22: set route with correct sasl_config --- config location /t { content_by_lua_block { @@ -693,44 +660,51 @@ hello world local code, body = t('/apisix/admin/routes/1', ngx.HTTP_PUT, [[{ - "plugins": { - "kafka-logger": { - "broker_list" : - { - "127.0.0.1":9092, - "127.0.0.1":9093 - }, - "kafka_topic" : "test2", - "key" : "key1", - "timeout" : 1, - "batch_max_size": 1, - "sasl_config": { - "mechanism": "PLAIN", - "user": "admin", - "password": "admin-secret" - } - } - }, - "upstream": { - "nodes": { - "127.0.0.1:1980": 1 - }, - "type": "roundrobin" + "plugins":{ + "kafka-logger":{ + "brokers":[ + { + "host":"127.0.0.1", + "port":19094, + "sasl_config":{ + "mechanism":"PLAIN", + "user":"admin", + "password":"admin-secret" + } + }], + "kafka_topic":"test2", + "key":"key1", + "timeout":1, + "batch_max_size":1, + "include_req_body": true + } + }, + "upstream":{ + "nodes":{ + "127.0.0.1:1980":1 }, - "uri": "/hello" + "type":"roundrobin" + }, + "uri":"/hello" }]] - ) + ) if code >= 300 then ngx.status = code end ngx.say(body) - local http = require "resty.http" - local httpc = http.new() - local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello" - local res, err = httpc:request_uri(uri, {method = "GET"}) } } ---- error_log -failed to send data to Kafka topic -[error] ---- wait: 1 +--- response_body +passed + + + +=== TEST 23: hit route, send data to kafka successfully +--- request +POST /hello?name=qwerty +abcdef +--- response_body +hello world +--- error_log eval +qr/send data to kafka: \{.*"body":"abcdef"/ +--- wait: 2 From 113184221b512a33373db2f42de31e6733ea8ba7 Mon Sep 17 00:00:00 2001 From: starsz Date: Sun, 9 Oct 2022 17:38:07 +0800 Subject: [PATCH 35/44] fix: lint --- apisix/plugins/kafka-logger.lua | 5 ++++- docs/en/latest/plugins/kafka-logger.md | 6 +++--- t/plugin/kafka-logger.t | 2 +- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua index 972a5f5ffa3b..5f8972b39bac 100644 --- a/apisix/plugins/kafka-logger.lua +++ b/apisix/plugins/kafka-logger.lua @@ -72,7 +72,10 @@ local schema = { type = "object", description = "sasl config", properties = { - mechanism = { type = "string", description = "mechanism", default = "PLAIN" }, + mechanism = { + type = "string", + default = "PLAIN" + }, user = { type = "string", description = "user" }, password = { type = "string", description = "password" }, }, diff --git a/docs/en/latest/plugins/kafka-logger.md b/docs/en/latest/plugins/kafka-logger.md index 714cc6d780ba..b065dc4e38f3 100644 --- a/docs/en/latest/plugins/kafka-logger.md +++ b/docs/en/latest/plugins/kafka-logger.md @@ -42,9 +42,9 @@ It might take some time to receive the log data. It will be automatically sent a | brokers.host | string | True | | | The host of Kafka broker, e.g, `192.168.1.1`. | | brokers.port | integer | True | | [0, 65535] | The port of Kafka broker | | brokers.sasl_config | object | False | | | The sasl config of Kafka broker | -| brokers.sasl_config.mechanism | string | False | "PLAIN" | ["PLAIN"] | The mechaism of sasl config | -| brokers.sasl_config.user | string | True | | | The user of sasl_config.If sasl_config exists, it's required. | -| brokers.sasl_config.password | string | True | | | The password of sasl_config.If sasl_config exists, it's required. | +| brokers.sasl_config.mechanism | string | False | "PLAIN" | ["PLAIN"] | The mechaism of sasl config | +| brokers.sasl_config.user | string | True | | | The user of sasl_config.If sasl_config exists, it's required. | +| brokers.sasl_config.password | string | True | | | The password of sasl_config.If sasl_config exists, it's required. | | kafka_topic | string | True | | | Target topic to push the logs for organisation. | | producer_type | string | False | async | ["async", "sync"] | Message sending mode of the producer. | | required_acks | integer | False | 1 | [0, 1, -1] | Number of acknowledgements the leader needs to receive for the producer to consider the request complete. This controls the durability of the sent records. The attribute follows the same configuration as the Kafka `acks` attribute. See [Apache Kafka documentation](https://kafka.apache.org/documentation/#producerconfigs_acks) for more. | diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t index 4064c10b43eb..777e503292d2 100644 --- a/t/plugin/kafka-logger.t +++ b/t/plugin/kafka-logger.t @@ -641,7 +641,7 @@ passed -=== TEST 21: hit route, failed to send data to kafka +=== TEST 21: hit route, failed to send data to kafka --- request GET /hello --- response_body From 8612380b7bc311b895fab049a2555d87297b7cb2 Mon Sep 17 00:00:00 2001 From: starsz Date: Sun, 9 Oct 2022 17:40:30 +0800 Subject: [PATCH 36/44] fix: docker-compose --- ci/pod/docker-compose.plugin.yml | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/ci/pod/docker-compose.plugin.yml b/ci/pod/docker-compose.plugin.yml index f16fa269304b..3dbcd1c5efb9 100644 --- a/ci/pod/docker-compose.plugin.yml +++ b/ci/pod/docker-compose.plugin.yml @@ -251,6 +251,34 @@ services: transport.tcp.port: 9301 xpack.security.enabled: 'true' + # The function services of OpenFunction + test-header: + image: test-header-image:latest + restart: unless-stopped + ports: + - "30583:8080" + environment: + CONTEXT_MODE: "self-host" + FUNC_CONTEXT: "{\"name\":\"HelloWorld\",\"version\":\"v1.0.0\",\"port\":\"8080\",\"runtime\":\"Knative\"}" + + test-uri: + image: test-uri-image:latest + restart: unless-stopped + ports: + - "30584:8080" + environment: + CONTEXT_MODE: "self-host" + FUNC_CONTEXT: "{\"name\":\"HelloWorld\",\"version\":\"v1.0.0\",\"port\":\"8080\",\"runtime\":\"Knative\"}" + + test-body: + image: test-body-image:latest + restart: unless-stopped + ports: + - "30585:8080" + environment: + CONTEXT_MODE: "self-host" + FUNC_CONTEXT: "{\"name\":\"HelloWorld\",\"version\":\"v1.0.0\",\"port\":\"8080\",\"runtime\":\"Knative\"}" + networks: apisix_net: From d8b59c2c4f124af83f8753093dd8d78e2ebdc4d3 Mon Sep 17 00:00:00 2001 From: starsz Date: Mon, 10 Oct 2022 10:20:07 +0800 Subject: [PATCH 37/44] doc: chinese doc --- ci/pod/docker-compose.plugin.yml | 1 + docs/zh/latest/plugins/kafka-logger.md | 4 ++++ t/plugin/kafka-logger2.t | 32 ++++++++++++++++++++++++++ 3 files changed, 37 insertions(+) diff --git a/ci/pod/docker-compose.plugin.yml b/ci/pod/docker-compose.plugin.yml index 3dbcd1c5efb9..c2c053253707 100644 --- a/ci/pod/docker-compose.plugin.yml +++ b/ci/pod/docker-compose.plugin.yml @@ -251,6 +251,7 @@ services: transport.tcp.port: 9301 xpack.security.enabled: 'true' + # The function services of OpenFunction test-header: image: test-header-image:latest diff --git a/docs/zh/latest/plugins/kafka-logger.md b/docs/zh/latest/plugins/kafka-logger.md index e14e1ed626e2..5fe782441877 100644 --- a/docs/zh/latest/plugins/kafka-logger.md +++ b/docs/zh/latest/plugins/kafka-logger.md @@ -39,6 +39,10 @@ description: API 网关 Apache APISIX 的 kafka-logger 插件用于将日志作 | brokers | array | 是 | | | 需要推送的 Kafka 的 broker 列表。 | | brokers.host | string | 是 | | | Kafka broker 的节点 host 配置,例如 `192.168.1.1` | | brokers.port | string | 是 | | | Kafka broker 的节点端口配置 | +| brokers.sasl_config | object | 否 | | | Kafka broker 中的 sasl_config | +| brokers.sasl_config.mechanism | string | 否 | "PLAIN" | ["PLAIN"] | Kafka broker 中的 sasl 认证机制 | +| brokers.sasl_config.user | string | 是 | | | Kafka broker 中 sasl 配置中的 user, 如果 sasl_config 存在,则必须填写 | +| brokers.sasl_config.password | string | 是 | | | Kafka broker 中 sasl 配置中的 password, 如果 sasl_config 存在,则必须填写 | | kafka_topic | string | 是 | | | 需要推送的 topic。 | | producer_type | string | 否 | async | ["async", "sync"] | 生产者发送消息的模式。 | | required_acks | integer | 否 | 1 | [0, 1, -1] | 生产者在确认一个请求发送完成之前需要收到的反馈信息的数量。该参数是为了保证发送请求的可靠性。该属性的配置与 Kafka `acks` 属性相同,具体配置请参考 [Apache Kafka 文档](https://kafka.apache.org/documentation/#producerconfigs_acks)。 | diff --git a/t/plugin/kafka-logger2.t b/t/plugin/kafka-logger2.t index 98179734b62e..c2d99af7381c 100644 --- a/t/plugin/kafka-logger2.t +++ b/t/plugin/kafka-logger2.t @@ -406,6 +406,36 @@ qr/not found topic, retryable: true, topic: undefined_topic, partition_id: -1/ key = "key1", }, }, + { + input = { + brokers = { + { + host = "127.0.0.1", + port = 9093, + sasl_config = { + user = "admin", + }, + }, + }, + kafka_topic = "test", + key = "key1", + }, + }, + { + input = { + brokers = { + { + host = "127.0.0.1", + port = 9093, + sasl_config = { + password = "admin-secret", + }, + }, + }, + kafka_topic = "test", + key = "key1", + }, + }, } local plugin = require("apisix.plugins.kafka-logger") @@ -433,6 +463,8 @@ property "brokers" validation failed: failed to validate item 1: property "host" property "brokers" validation failed: failed to validate item 1: property "port" validation failed: wrong type: expected integer, got string property "brokers" validation failed: failed to validate item 1: property "port" validation failed: expected 0 to be at least 1 property "brokers" validation failed: failed to validate item 1: property "port" validation failed: expected 65536 to be at most 65535 +property "brokers" validation failed: failed to validate item 1: property "sasl_config" validation failed: property "password" is required +property "brokers" validation failed: failed to validate item 1: property "sasl_config" validation failed: property "user" is required From b6c4c6a496ba691055b7cd19ccdc522bb474c08d Mon Sep 17 00:00:00 2001 From: starsz Date: Mon, 10 Oct 2022 10:22:03 +0800 Subject: [PATCH 38/44] doc: fix chinese doc --- docs/zh/latest/plugins/kafka-logger.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/zh/latest/plugins/kafka-logger.md b/docs/zh/latest/plugins/kafka-logger.md index 5fe782441877..c6f052bac1ad 100644 --- a/docs/zh/latest/plugins/kafka-logger.md +++ b/docs/zh/latest/plugins/kafka-logger.md @@ -41,8 +41,8 @@ description: API 网关 Apache APISIX 的 kafka-logger 插件用于将日志作 | brokers.port | string | 是 | | | Kafka broker 的节点端口配置 | | brokers.sasl_config | object | 否 | | | Kafka broker 中的 sasl_config | | brokers.sasl_config.mechanism | string | 否 | "PLAIN" | ["PLAIN"] | Kafka broker 中的 sasl 认证机制 | -| brokers.sasl_config.user | string | 是 | | | Kafka broker 中 sasl 配置中的 user, 如果 sasl_config 存在,则必须填写 | -| brokers.sasl_config.password | string | 是 | | | Kafka broker 中 sasl 配置中的 password, 如果 sasl_config 存在,则必须填写 | +| brokers.sasl_config.user | string | 是 | | | Kafka broker 中 sasl 配置中的 user, 如果 sasl_config 存在,则必须填写 | +| brokers.sasl_config.password | string | 是 | | | Kafka broker 中 sasl 配置中的 password, 如果 sasl_config 存在,则必须填写 | | kafka_topic | string | 是 | | | 需要推送的 topic。 | | producer_type | string | 否 | async | ["async", "sync"] | 生产者发送消息的模式。 | | required_acks | integer | 否 | 1 | [0, 1, -1] | 生产者在确认一个请求发送完成之前需要收到的反馈信息的数量。该参数是为了保证发送请求的可靠性。该属性的配置与 Kafka `acks` 属性相同,具体配置请参考 [Apache Kafka 文档](https://kafka.apache.org/documentation/#producerconfigs_acks)。 | From 23dc1b1957cd8551112f7e2c4d6380324cea70e0 Mon Sep 17 00:00:00 2001 From: Peter Zhu Date: Mon, 10 Oct 2022 14:33:50 +0800 Subject: [PATCH 39/44] Update docs/en/latest/plugins/kafka-logger.md MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: 罗泽轩 --- docs/en/latest/plugins/kafka-logger.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/latest/plugins/kafka-logger.md b/docs/en/latest/plugins/kafka-logger.md index b065dc4e38f3..fcbef00448fd 100644 --- a/docs/en/latest/plugins/kafka-logger.md +++ b/docs/en/latest/plugins/kafka-logger.md @@ -43,7 +43,7 @@ It might take some time to receive the log data. It will be automatically sent a | brokers.port | integer | True | | [0, 65535] | The port of Kafka broker | | brokers.sasl_config | object | False | | | The sasl config of Kafka broker | | brokers.sasl_config.mechanism | string | False | "PLAIN" | ["PLAIN"] | The mechaism of sasl config | -| brokers.sasl_config.user | string | True | | | The user of sasl_config.If sasl_config exists, it's required. | +| brokers.sasl_config.user | string | True | | | The user of sasl_config. If sasl_config exists, it's required. | | brokers.sasl_config.password | string | True | | | The password of sasl_config.If sasl_config exists, it's required. | | kafka_topic | string | True | | | Target topic to push the logs for organisation. | | producer_type | string | False | async | ["async", "sync"] | Message sending mode of the producer. | From 1825dc8250792189ada480f417365c2de4f2f602 Mon Sep 17 00:00:00 2001 From: Peter Zhu Date: Mon, 10 Oct 2022 14:34:04 +0800 Subject: [PATCH 40/44] Update docs/en/latest/plugins/kafka-logger.md MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: 罗泽轩 --- docs/en/latest/plugins/kafka-logger.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/latest/plugins/kafka-logger.md b/docs/en/latest/plugins/kafka-logger.md index fcbef00448fd..2f49108a7051 100644 --- a/docs/en/latest/plugins/kafka-logger.md +++ b/docs/en/latest/plugins/kafka-logger.md @@ -44,7 +44,7 @@ It might take some time to receive the log data. It will be automatically sent a | brokers.sasl_config | object | False | | | The sasl config of Kafka broker | | brokers.sasl_config.mechanism | string | False | "PLAIN" | ["PLAIN"] | The mechaism of sasl config | | brokers.sasl_config.user | string | True | | | The user of sasl_config. If sasl_config exists, it's required. | -| brokers.sasl_config.password | string | True | | | The password of sasl_config.If sasl_config exists, it's required. | +| brokers.sasl_config.password | string | True | | | The password of sasl_config. If sasl_config exists, it's required. | | kafka_topic | string | True | | | Target topic to push the logs for organisation. | | producer_type | string | False | async | ["async", "sync"] | Message sending mode of the producer. | | required_acks | integer | False | 1 | [0, 1, -1] | Number of acknowledgements the leader needs to receive for the producer to consider the request complete. This controls the durability of the sent records. The attribute follows the same configuration as the Kafka `acks` attribute. See [Apache Kafka documentation](https://kafka.apache.org/documentation/#producerconfigs_acks) for more. | From d0e53fd169e257b074a2080194776b1999e6a292 Mon Sep 17 00:00:00 2001 From: Peter Zhu Date: Mon, 10 Oct 2022 14:34:13 +0800 Subject: [PATCH 41/44] Update docs/zh/latest/plugins/kafka-logger.md MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: 罗泽轩 --- docs/zh/latest/plugins/kafka-logger.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh/latest/plugins/kafka-logger.md b/docs/zh/latest/plugins/kafka-logger.md index c6f052bac1ad..f3b94330ba77 100644 --- a/docs/zh/latest/plugins/kafka-logger.md +++ b/docs/zh/latest/plugins/kafka-logger.md @@ -41,7 +41,7 @@ description: API 网关 Apache APISIX 的 kafka-logger 插件用于将日志作 | brokers.port | string | 是 | | | Kafka broker 的节点端口配置 | | brokers.sasl_config | object | 否 | | | Kafka broker 中的 sasl_config | | brokers.sasl_config.mechanism | string | 否 | "PLAIN" | ["PLAIN"] | Kafka broker 中的 sasl 认证机制 | -| brokers.sasl_config.user | string | 是 | | | Kafka broker 中 sasl 配置中的 user, 如果 sasl_config 存在,则必须填写 | +| brokers.sasl_config.user | string | 是 | | | Kafka broker 中 sasl 配置中的 user,如果 sasl_config 存在,则必须填写 | | brokers.sasl_config.password | string | 是 | | | Kafka broker 中 sasl 配置中的 password, 如果 sasl_config 存在,则必须填写 | | kafka_topic | string | 是 | | | 需要推送的 topic。 | | producer_type | string | 否 | async | ["async", "sync"] | 生产者发送消息的模式。 | From 7cec61ac581e79782291a0cf416de76ade0b1344 Mon Sep 17 00:00:00 2001 From: Peter Zhu Date: Mon, 10 Oct 2022 14:34:21 +0800 Subject: [PATCH 42/44] Update docs/zh/latest/plugins/kafka-logger.md MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: 罗泽轩 --- docs/zh/latest/plugins/kafka-logger.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh/latest/plugins/kafka-logger.md b/docs/zh/latest/plugins/kafka-logger.md index f3b94330ba77..5a6fd987bea5 100644 --- a/docs/zh/latest/plugins/kafka-logger.md +++ b/docs/zh/latest/plugins/kafka-logger.md @@ -42,7 +42,7 @@ description: API 网关 Apache APISIX 的 kafka-logger 插件用于将日志作 | brokers.sasl_config | object | 否 | | | Kafka broker 中的 sasl_config | | brokers.sasl_config.mechanism | string | 否 | "PLAIN" | ["PLAIN"] | Kafka broker 中的 sasl 认证机制 | | brokers.sasl_config.user | string | 是 | | | Kafka broker 中 sasl 配置中的 user,如果 sasl_config 存在,则必须填写 | -| brokers.sasl_config.password | string | 是 | | | Kafka broker 中 sasl 配置中的 password, 如果 sasl_config 存在,则必须填写 | +| brokers.sasl_config.password | string | 是 | | | Kafka broker 中 sasl 配置中的 password,如果 sasl_config 存在,则必须填写 | | kafka_topic | string | 是 | | | 需要推送的 topic。 | | producer_type | string | 否 | async | ["async", "sync"] | 生产者发送消息的模式。 | | required_acks | integer | 否 | 1 | [0, 1, -1] | 生产者在确认一个请求发送完成之前需要收到的反馈信息的数量。该参数是为了保证发送请求的可靠性。该属性的配置与 Kafka `acks` 属性相同,具体配置请参考 [Apache Kafka 文档](https://kafka.apache.org/documentation/#producerconfigs_acks)。 | From 8f08476b8f37cce93c633a0ffd9e36bf1d076988 Mon Sep 17 00:00:00 2001 From: starsz Date: Mon, 10 Oct 2022 14:41:51 +0800 Subject: [PATCH 43/44] chore: add enum for mechanism --- apisix/plugins/kafka-logger.lua | 3 ++- t/plugin/kafka-logger2.t | 18 ++++++++++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua index 5f8972b39bac..ee8453e2610d 100644 --- a/apisix/plugins/kafka-logger.lua +++ b/apisix/plugins/kafka-logger.lua @@ -74,7 +74,8 @@ local schema = { properties = { mechanism = { type = "string", - default = "PLAIN" + default = "PLAIN", + enum = {"PLAIN"}, }, user = { type = "string", description = "user" }, password = { type = "string", description = "password" }, diff --git a/t/plugin/kafka-logger2.t b/t/plugin/kafka-logger2.t index c2d99af7381c..6f670bbaf46c 100644 --- a/t/plugin/kafka-logger2.t +++ b/t/plugin/kafka-logger2.t @@ -406,6 +406,23 @@ qr/not found topic, retryable: true, topic: undefined_topic, partition_id: -1/ key = "key1", }, }, + { + input = { + brokers = { + { + host = "127.0.0.1", + port = 9093, + sasl_config = { + mechanism = "INVALID", + user = "admin", + password = "admin-secret", + }, + }, + }, + kafka_topic = "test", + key = "key1", + }, + }, { input = { brokers = { @@ -463,6 +480,7 @@ property "brokers" validation failed: failed to validate item 1: property "host" property "brokers" validation failed: failed to validate item 1: property "port" validation failed: wrong type: expected integer, got string property "brokers" validation failed: failed to validate item 1: property "port" validation failed: expected 0 to be at least 1 property "brokers" validation failed: failed to validate item 1: property "port" validation failed: expected 65536 to be at most 65535 +property "brokers" validation failed: failed to validate item 1: property "sasl_config" validation failed: property "mechanism" validation failed: matches none of the enum values property "brokers" validation failed: failed to validate item 1: property "sasl_config" validation failed: property "password" is required property "brokers" validation failed: failed to validate item 1: property "sasl_config" validation failed: property "user" is required From 5d00e9a21a54b0c4e8750c37993613f972915c98 Mon Sep 17 00:00:00 2001 From: starsz Date: Tue, 11 Oct 2022 09:57:53 +0800 Subject: [PATCH 44/44] fix: docker compose --- ci/pod/docker-compose.plugin.yml | 2 -- 1 file changed, 2 deletions(-) diff --git a/ci/pod/docker-compose.plugin.yml b/ci/pod/docker-compose.plugin.yml index c2c053253707..6715c7a33433 100644 --- a/ci/pod/docker-compose.plugin.yml +++ b/ci/pod/docker-compose.plugin.yml @@ -118,8 +118,6 @@ services: kafka_net: volumes: - ./ci/pod/kafka/kafka-server/kafka_jaas.conf:/opt/bitnami/kafka/config/kafka_jaas.conf:ro - - ./ci/pod/kafka/kafka-server/selfsigned.jks:/opt/bitnami/kafka/config/certs/kafka.keystore.jks:ro - - ./ci/pod/kafka/kafka-server/selfsigned.jks:/opt/bitnami/kafka/config/certs/kafka.truststore.jks:ro ## SkyWalking skywalking: