Skip to content

Commit

Permalink
feat(kafka-logger): support sasl config in brokers (#8050)
Browse files Browse the repository at this point in the history
Co-authored-by: 罗泽轩 <[email protected]>
Co-authored-by: biubiue <[email protected]>
  • Loading branch information
3 people authored Oct 11, 2022
1 parent 682e72a commit c201d72
Show file tree
Hide file tree
Showing 7 changed files with 201 additions and 2 deletions.
16 changes: 15 additions & 1 deletion apisix/plugins/kafka-logger.lua
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,20 @@ local schema = {
maximum = 65535,
description = "the port of kafka broker",
},
sasl_config = {
type = "object",
description = "sasl config",
properties = {
mechanism = {
type = "string",
default = "PLAIN",
enum = {"PLAIN"},
},
user = { type = "string", description = "user" },
password = { type = "string", description = "password" },
},
required = {"user", "password"},
},
},
required = {"host", "port"},
},
Expand Down Expand Up @@ -109,7 +123,7 @@ local schema = {
producer_batch_num = {type = "integer", minimum = 1, default = 200},
producer_batch_size = {type = "integer", minimum = 0, default = 1048576},
producer_max_buffering = {type = "integer", minimum = 1, default = 50000},
producer_time_linger = {type = "integer", minimum = 1, default = 1}
producer_time_linger = {type = "integer", minimum = 1, default = 1},
},
oneOf = {
{ required = {"broker_list", "kafka_topic"},},
Expand Down
5 changes: 4 additions & 1 deletion ci/pod/docker-compose.plugin.yml
Original file line number Diff line number Diff line change
Expand Up @@ -104,17 +104,20 @@ services:
kafka-server2:
image: bitnami/kafka:2.8.1
env_file:
- ci/pod/kafka/kafka-server/env/common.env
- ci/pod/kafka/kafka-server/env/common2.env
environment:
KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper-server2:2181
restart: unless-stopped
ports:
- "19092:9092"
- "19094:9094"
depends_on:
- zookeeper-server1
- zookeeper-server2
networks:
kafka_net:
volumes:
- ./ci/pod/kafka/kafka-server/kafka_jaas.conf:/opt/bitnami/kafka/config/kafka_jaas.conf:ro

## SkyWalking
skywalking:
Expand Down
8 changes: 8 additions & 0 deletions ci/pod/kafka/kafka-server/env/common2.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
ALLOW_PLAINTEXT_LISTENER=yes
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,SASL_PLAINTEXT://0.0.0.0:9094
KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092,SASL_PLAINTEXT://127.0.0.1:9094
KAFKA_CFG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=
KAFKA_CFG_SSL_KEYSTORE_LOCATION=/opt/bitnami/kafka/config/certs/kafka.keystore.jks
KAFKA_CFG_SSL_KEYSTORE_PASSWORD=changeit
KAFKA_CFG_SSL_KEY_PASSWORD=changeit
4 changes: 4 additions & 0 deletions docs/en/latest/plugins/kafka-logger.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ It might take some time to receive the log data. It will be automatically sent a
| brokers | array | True | | | List of Kafka brokers (nodes). |
| brokers.host | string | True | | | The host of Kafka broker, e.g, `192.168.1.1`. |
| brokers.port | integer | True | | [0, 65535] | The port of Kafka broker |
| brokers.sasl_config | object | False | | | The sasl config of Kafka broker |
| brokers.sasl_config.mechanism | string | False | "PLAIN" | ["PLAIN"] | The mechaism of sasl config |
| brokers.sasl_config.user | string | True | | | The user of sasl_config. If sasl_config exists, it's required. |
| brokers.sasl_config.password | string | True | | | The password of sasl_config. If sasl_config exists, it's required. |
| kafka_topic | string | True | | | Target topic to push the logs for organisation. |
| producer_type | string | False | async | ["async", "sync"] | Message sending mode of the producer. |
| required_acks | integer | False | 1 | [0, 1, -1] | Number of acknowledgements the leader needs to receive for the producer to consider the request complete. This controls the durability of the sent records. The attribute follows the same configuration as the Kafka `acks` attribute. See [Apache Kafka documentation](https://kafka.apache.org/documentation/#producerconfigs_acks) for more. |
Expand Down
4 changes: 4 additions & 0 deletions docs/zh/latest/plugins/kafka-logger.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ description: API 网关 Apache APISIX 的 kafka-logger 插件用于将日志作
| brokers | array || | | 需要推送的 Kafka 的 broker 列表。 |
| brokers.host | string || | | Kafka broker 的节点 host 配置,例如 `192.168.1.1` |
| brokers.port | string || | | Kafka broker 的节点端口配置 |
| brokers.sasl_config | object || | | Kafka broker 中的 sasl_config |
| brokers.sasl_config.mechanism | string || "PLAIN" | ["PLAIN"] | Kafka broker 中的 sasl 认证机制 |
| brokers.sasl_config.user | string || | | Kafka broker 中 sasl 配置中的 user,如果 sasl_config 存在,则必须填写 |
| brokers.sasl_config.password | string || | | Kafka broker 中 sasl 配置中的 password,如果 sasl_config 存在,则必须填写 |
| kafka_topic | string || | | 需要推送的 topic。 |
| producer_type | string || async | ["async", "sync"] | 生产者发送消息的模式。 |
| required_acks | integer || 1 | [0, 1, -1] | 生产者在确认一个请求发送完成之前需要收到的反馈信息的数量。该参数是为了保证发送请求的可靠性。该属性的配置与 Kafka `acks` 属性相同,具体配置请参考 [Apache Kafka 文档](https://kafka.apache.org/documentation/#producerconfigs_acks)|
Expand Down
116 changes: 116 additions & 0 deletions t/plugin/kafka-logger.t
Original file line number Diff line number Diff line change
Expand Up @@ -592,3 +592,119 @@ qr/partition_id: 2/]
[qr/partition_id: 1/,
qr/partition_id: 0/,
qr/partition_id: 2/]
=== TEST 20: set route with incorrect sasl_config
--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test
local code, body = t('/apisix/admin/routes/1',
ngx.HTTP_PUT,
[[{
"plugins":{
"kafka-logger":{
"brokers":[
{
"host":"127.0.0.1",
"port":19094,
"sasl_config":{
"mechanism":"PLAIN",
"user":"admin",
"password":"admin-secret2233"
}
}],
"kafka_topic":"test2",
"key":"key1",
"timeout":1,
"batch_max_size":1
}
},
"upstream":{
"nodes":{
"127.0.0.1:1980":1
},
"type":"roundrobin"
},
"uri":"/hello"
}]]
)
if code >= 300 then
ngx.status = code
end
ngx.say(body)
}
}
--- response_body
passed
=== TEST 21: hit route, failed to send data to kafka
--- request
GET /hello
--- response_body
hello world
--- error_log
failed to do PLAIN auth with 127.0.0.1:19094: Authentication failed: Invalid username or password
--- wait: 2
=== TEST 22: set route with correct sasl_config
--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test
local code, body = t('/apisix/admin/routes/1',
ngx.HTTP_PUT,
[[{
"plugins":{
"kafka-logger":{
"brokers":[
{
"host":"127.0.0.1",
"port":19094,
"sasl_config":{
"mechanism":"PLAIN",
"user":"admin",
"password":"admin-secret"
}
}],
"kafka_topic":"test2",
"key":"key1",
"timeout":1,
"batch_max_size":1,
"include_req_body": true
}
},
"upstream":{
"nodes":{
"127.0.0.1:1980":1
},
"type":"roundrobin"
},
"uri":"/hello"
}]]
)
if code >= 300 then
ngx.status = code
end
ngx.say(body)
}
}
--- response_body
passed
=== TEST 23: hit route, send data to kafka successfully
--- request
POST /hello?name=qwerty
abcdef
--- response_body
hello world
--- error_log eval
qr/send data to kafka: \{.*"body":"abcdef"/
--- wait: 2
50 changes: 50 additions & 0 deletions t/plugin/kafka-logger2.t
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,53 @@ qr/not found topic, retryable: true, topic: undefined_topic, partition_id: -1/
key = "key1",
},
},
{
input = {
brokers = {
{
host = "127.0.0.1",
port = 9093,
sasl_config = {
mechanism = "INVALID",
user = "admin",
password = "admin-secret",
},
},
},
kafka_topic = "test",
key = "key1",
},
},
{
input = {
brokers = {
{
host = "127.0.0.1",
port = 9093,
sasl_config = {
user = "admin",
},
},
},
kafka_topic = "test",
key = "key1",
},
},
{
input = {
brokers = {
{
host = "127.0.0.1",
port = 9093,
sasl_config = {
password = "admin-secret",
},
},
},
kafka_topic = "test",
key = "key1",
},
},
}
local plugin = require("apisix.plugins.kafka-logger")
Expand Down Expand Up @@ -433,6 +480,9 @@ property "brokers" validation failed: failed to validate item 1: property "host"
property "brokers" validation failed: failed to validate item 1: property "port" validation failed: wrong type: expected integer, got string
property "brokers" validation failed: failed to validate item 1: property "port" validation failed: expected 0 to be at least 1
property "brokers" validation failed: failed to validate item 1: property "port" validation failed: expected 65536 to be at most 65535
property "brokers" validation failed: failed to validate item 1: property "sasl_config" validation failed: property "mechanism" validation failed: matches none of the enum values
property "brokers" validation failed: failed to validate item 1: property "sasl_config" validation failed: property "password" is required
property "brokers" validation failed: failed to validate item 1: property "sasl_config" validation failed: property "user" is required
Expand Down

0 comments on commit c201d72

Please sign in to comment.