Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kafka-logger): support sasl config in brokers #8050

Merged
merged 45 commits into from
Oct 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
40ddca3
feat(kafka-logger): support sasl conf
biubiue Aug 24, 2022
21be757
feat(kafka-logger): doc add sasl conf
biubiue Aug 25, 2022
fad3df6
fix(kafka-logger): doc
biubiue Aug 25, 2022
de82ec3
feat(kafka-logger): test case
biubiue Aug 28, 2022
33cd466
feat(kafka-logger): modify test case code style
biubiue Aug 28, 2022
7e1db99
fix(kafka-logger): test case code style
biubiue Aug 28, 2022
ebeebc8
fix(kafka-logger): test case code style
biubiue Aug 28, 2022
cd68da2
fix(kafka-logger): test case code style
biubiue Aug 28, 2022
0104c5a
fix(kafka-logger): test case code style
biubiue Aug 28, 2022
503011c
Update kafka-logger.t
biubiue Aug 28, 2022
18358aa
fix(kafka-logger): test case
biubiue Sep 1, 2022
3812c9c
fix(kafka-logger): test case
biubiue Sep 1, 2022
c8baf35
Update kafka-logger.t
biubiue Sep 1, 2022
d348987
Update kafka-logger.t
biubiue Sep 1, 2022
72f0d06
Update kafka-logger.t
biubiue Sep 2, 2022
5b7dd8c
fix(kafka-logger): test case
biubiue Sep 6, 2022
9f1817a
fix(kafka-logger): try other actions
biubiue Sep 6, 2022
588bb0f
Update kafka-logger.t
biubiue Sep 6, 2022
3a90b5d
Update kafka-logger.t
biubiue Sep 7, 2022
d10ea12
Update kafka-logger.t
biubiue Sep 7, 2022
70054aa
Update kafka-logger.t
biubiue Sep 7, 2022
35a581b
Update kafka-logger.t
biubiue Sep 7, 2022
9d9f005
fix(kafka-logger): modify sasl_config properties
biubiue Sep 8, 2022
8c41473
Update kafka-logger.md
biubiue Sep 8, 2022
4e1cc34
Update kafka-logger.t
biubiue Sep 9, 2022
ddde433
Update kafka-logger.lua
biubiue Sep 9, 2022
0444c00
Update kafka-logger.md
biubiue Sep 9, 2022
cc90a47
Update kafka-logger.lua
biubiue Sep 9, 2022
290070c
fix(kafka-logger): test case
biubiue Sep 13, 2022
1481240
fix(kafka-logger): test case
biubiue Sep 26, 2022
6968d48
test case
biubiue Sep 26, 2022
11d8dc6
test case
biubiue Sep 27, 2022
8ca60db
test case
biubiue Sep 27, 2022
c68948f
Merge branch 'master' into kafka_sasl
starsz Oct 9, 2022
82e81e6
fix: ci
starsz Oct 9, 2022
1131842
fix: lint
starsz Oct 9, 2022
8612380
fix: docker-compose
starsz Oct 9, 2022
d8b59c2
doc: chinese doc
starsz Oct 10, 2022
b6c4c6a
doc: fix chinese doc
starsz Oct 10, 2022
23dc1b1
Update docs/en/latest/plugins/kafka-logger.md
starsz Oct 10, 2022
1825dc8
Update docs/en/latest/plugins/kafka-logger.md
starsz Oct 10, 2022
d0e53fd
Update docs/zh/latest/plugins/kafka-logger.md
starsz Oct 10, 2022
7cec61a
Update docs/zh/latest/plugins/kafka-logger.md
starsz Oct 10, 2022
8f08476
chore: add enum for mechanism
starsz Oct 10, 2022
5d00e9a
fix: docker compose
starsz Oct 11, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion apisix/plugins/kafka-logger.lua
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,20 @@ local schema = {
maximum = 65535,
description = "the port of kafka broker",
},
sasl_config = {
type = "object",
description = "sasl config",
properties = {
mechanism = {
type = "string",
default = "PLAIN",
enum = {"PLAIN"},
},
user = { type = "string", description = "user" },
password = { type = "string", description = "password" },
},
required = {"user", "password"},
},
},
required = {"host", "port"},
},
Expand Down Expand Up @@ -109,7 +123,7 @@ local schema = {
producer_batch_num = {type = "integer", minimum = 1, default = 200},
producer_batch_size = {type = "integer", minimum = 0, default = 1048576},
producer_max_buffering = {type = "integer", minimum = 1, default = 50000},
producer_time_linger = {type = "integer", minimum = 1, default = 1}
producer_time_linger = {type = "integer", minimum = 1, default = 1},
},
oneOf = {
{ required = {"broker_list", "kafka_topic"},},
Expand Down
5 changes: 4 additions & 1 deletion ci/pod/docker-compose.plugin.yml
Original file line number Diff line number Diff line change
Expand Up @@ -104,17 +104,20 @@ services:
kafka-server2:
image: bitnami/kafka:2.8.1
env_file:
- ci/pod/kafka/kafka-server/env/common.env
- ci/pod/kafka/kafka-server/env/common2.env
environment:
KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper-server2:2181
restart: unless-stopped
ports:
- "19092:9092"
- "19094:9094"
depends_on:
- zookeeper-server1
- zookeeper-server2
networks:
kafka_net:
volumes:
- ./ci/pod/kafka/kafka-server/kafka_jaas.conf:/opt/bitnami/kafka/config/kafka_jaas.conf:ro

## SkyWalking
skywalking:
Expand Down
8 changes: 8 additions & 0 deletions ci/pod/kafka/kafka-server/env/common2.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
ALLOW_PLAINTEXT_LISTENER=yes
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,SASL_PLAINTEXT://0.0.0.0:9094
KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092,SASL_PLAINTEXT://127.0.0.1:9094
KAFKA_CFG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=
KAFKA_CFG_SSL_KEYSTORE_LOCATION=/opt/bitnami/kafka/config/certs/kafka.keystore.jks
KAFKA_CFG_SSL_KEYSTORE_PASSWORD=changeit
KAFKA_CFG_SSL_KEY_PASSWORD=changeit
4 changes: 4 additions & 0 deletions docs/en/latest/plugins/kafka-logger.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ It might take some time to receive the log data. It will be automatically sent a
| brokers | array | True | | | List of Kafka brokers (nodes). |
| brokers.host | string | True | | | The host of Kafka broker, e.g, `192.168.1.1`. |
| brokers.port | integer | True | | [0, 65535] | The port of Kafka broker |
| brokers.sasl_config | object | False | | | The sasl config of Kafka broker |
| brokers.sasl_config.mechanism | string | False | "PLAIN" | ["PLAIN"] | The mechaism of sasl config |
| brokers.sasl_config.user | string | True | | | The user of sasl_config. If sasl_config exists, it's required. |
| brokers.sasl_config.password | string | True | | | The password of sasl_config. If sasl_config exists, it's required. |
| kafka_topic | string | True | | | Target topic to push the logs for organisation. |
| producer_type | string | False | async | ["async", "sync"] | Message sending mode of the producer. |
| required_acks | integer | False | 1 | [0, 1, -1] | Number of acknowledgements the leader needs to receive for the producer to consider the request complete. This controls the durability of the sent records. The attribute follows the same configuration as the Kafka `acks` attribute. See [Apache Kafka documentation](https://kafka.apache.org/documentation/#producerconfigs_acks) for more. |
Expand Down
4 changes: 4 additions & 0 deletions docs/zh/latest/plugins/kafka-logger.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ description: API 网关 Apache APISIX 的 kafka-logger 插件用于将日志作
| brokers | array | 是 | | | 需要推送的 Kafka 的 broker 列表。 |
| brokers.host | string | 是 | | | Kafka broker 的节点 host 配置,例如 `192.168.1.1` |
| brokers.port | string | 是 | | | Kafka broker 的节点端口配置 |
| brokers.sasl_config | object | 否 | | | Kafka broker 中的 sasl_config |
| brokers.sasl_config.mechanism | string | 否 | "PLAIN" | ["PLAIN"] | Kafka broker 中的 sasl 认证机制 |
| brokers.sasl_config.user | string | 是 | | | Kafka broker 中 sasl 配置中的 user,如果 sasl_config 存在,则必须填写 |
| brokers.sasl_config.password | string | 是 | | | Kafka broker 中 sasl 配置中的 password,如果 sasl_config 存在,则必须填写 |
| kafka_topic | string | 是 | | | 需要推送的 topic。 |
| producer_type | string | 否 | async | ["async", "sync"] | 生产者发送消息的模式。 |
| required_acks | integer | 否 | 1 | [0, 1, -1] | 生产者在确认一个请求发送完成之前需要收到的反馈信息的数量。该参数是为了保证发送请求的可靠性。该属性的配置与 Kafka `acks` 属性相同,具体配置请参考 [Apache Kafka 文档](https://kafka.apache.org/documentation/#producerconfigs_acks)。 |
Expand Down
116 changes: 116 additions & 0 deletions t/plugin/kafka-logger.t
Original file line number Diff line number Diff line change
Expand Up @@ -592,3 +592,119 @@ qr/partition_id: 2/]
[qr/partition_id: 1/,
qr/partition_id: 0/,
qr/partition_id: 2/]



=== TEST 20: set route with incorrect sasl_config
--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test
local code, body = t('/apisix/admin/routes/1',
ngx.HTTP_PUT,
[[{
"plugins":{
"kafka-logger":{
"brokers":[
{
"host":"127.0.0.1",
"port":19094,
"sasl_config":{
"mechanism":"PLAIN",
"user":"admin",
"password":"admin-secret2233"
}
}],
"kafka_topic":"test2",
"key":"key1",
"timeout":1,
"batch_max_size":1
}
},
"upstream":{
"nodes":{
"127.0.0.1:1980":1
},
"type":"roundrobin"
},
"uri":"/hello"
}]]
)
if code >= 300 then
ngx.status = code
end
ngx.say(body)
}
}
--- response_body
passed



=== TEST 21: hit route, failed to send data to kafka
--- request
GET /hello
--- response_body
hello world
--- error_log
failed to do PLAIN auth with 127.0.0.1:19094: Authentication failed: Invalid username or password
--- wait: 2



=== TEST 22: set route with correct sasl_config
--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test
local code, body = t('/apisix/admin/routes/1',
ngx.HTTP_PUT,
[[{
"plugins":{
"kafka-logger":{
"brokers":[
{
"host":"127.0.0.1",
"port":19094,
"sasl_config":{
"mechanism":"PLAIN",
"user":"admin",
"password":"admin-secret"
}
}],
"kafka_topic":"test2",
"key":"key1",
"timeout":1,
"batch_max_size":1,
"include_req_body": true
}
},
"upstream":{
"nodes":{
"127.0.0.1:1980":1
},
"type":"roundrobin"
},
"uri":"/hello"
}]]
)
if code >= 300 then
ngx.status = code
end
ngx.say(body)
}
}
--- response_body
passed



=== TEST 23: hit route, send data to kafka successfully
--- request
POST /hello?name=qwerty
abcdef
--- response_body
hello world
--- error_log eval
qr/send data to kafka: \{.*"body":"abcdef"/
--- wait: 2
50 changes: 50 additions & 0 deletions t/plugin/kafka-logger2.t
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,53 @@ qr/not found topic, retryable: true, topic: undefined_topic, partition_id: -1/
key = "key1",
},
},
{
input = {
brokers = {
{
host = "127.0.0.1",
port = 9093,
sasl_config = {
mechanism = "INVALID",
user = "admin",
password = "admin-secret",
},
},
},
kafka_topic = "test",
key = "key1",
},
},
{
input = {
brokers = {
{
host = "127.0.0.1",
port = 9093,
sasl_config = {
user = "admin",
},
},
},
kafka_topic = "test",
key = "key1",
},
},
{
input = {
brokers = {
{
host = "127.0.0.1",
port = 9093,
sasl_config = {
password = "admin-secret",
},
},
},
kafka_topic = "test",
key = "key1",
},
},
}

local plugin = require("apisix.plugins.kafka-logger")
Expand Down Expand Up @@ -433,6 +480,9 @@ property "brokers" validation failed: failed to validate item 1: property "host"
property "brokers" validation failed: failed to validate item 1: property "port" validation failed: wrong type: expected integer, got string
property "brokers" validation failed: failed to validate item 1: property "port" validation failed: expected 0 to be at least 1
property "brokers" validation failed: failed to validate item 1: property "port" validation failed: expected 65536 to be at most 65535
property "brokers" validation failed: failed to validate item 1: property "sasl_config" validation failed: property "mechanism" validation failed: matches none of the enum values
property "brokers" validation failed: failed to validate item 1: property "sasl_config" validation failed: property "password" is required
property "brokers" validation failed: failed to validate item 1: property "sasl_config" validation failed: property "user" is required



Expand Down