feat: support send error-log to kafka brokers #8693
@@ -21,6 +21,7 @@ local batch_processor = require("apisix.utils.batch-processor")
 local plugin = require("apisix.plugin")
 local timers = require("apisix.timers")
 local http = require("resty.http")
+local producer = require("resty.kafka.producer")
 local plugin_name = "error-log-logger"
 local table = core.table
 local schema_def = core.schema

@@ -32,6 +33,9 @@ local string = require("string")
 local lrucache = core.lrucache.new({
     ttl = 300, count = 32
 })
+local kafka_prod_lrucache = core.lrucache.new({
+    ttl = 300, count = 32
+})
+
 
 local metadata_schema = {
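
For context, `core.lrucache.new` returns a callable cache, and later hunks in this diff call it with a version key. A minimal sketch of that convention follows, with simplified, assumed semantics; `build_obj` and its arguments are hypothetical placeholders:

```lua
local core = require("apisix.core")

-- cache(key, version, create_fn, ...) returns the value cached under `key`
-- while `version` is unchanged; when the version differs (or the TTL
-- expires), create_fn(...) is called and its result cached.
local cache = core.lrucache.new({ttl = 300, count = 32})

local function build_obj(a, b)    -- hypothetical factory function
    return {a = a, b = b}
end

-- Keying on a version (e.g. plugin_metadata's modifiedIndex) means a
-- metadata update rebuilds the cached object instead of reusing a stale one.
local version = 1
local obj, err = cache("some-key", version, build_obj, "arg1", "arg2")
```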
@@ -66,6 +70,62 @@ local metadata_schema = {
         },
         required = {"endpoint_addr", "user", "password", "database", "logtable"}
     },
+    kafka = {
+        type = "object",
+        properties = {
+            brokers = {
+                type = "array",
+                minItems = 1,
+                items = {
+                    type = "object",
+                    properties = {
+                        host = {
+                            type = "string",
+                            description = "the host of kafka broker",
+                        },
+                        port = {
+                            type = "integer",
+                            minimum = 1,
+                            maximum = 65535,
+                            description = "the port of kafka broker",
+                        },
+                        sasl_config = {
+                            type = "object",
+                            description = "sasl config",
+                            properties = {
+                                mechanism = {
+                                    type = "string",
+                                    default = "PLAIN",
+                                    enum = {"PLAIN"},
+                                },
+                                user = { type = "string", description = "user" },
+                                password = { type = "string", description = "password" },
+                            },
+                            required = {"user", "password"},
+                        },
+                    },
+                    required = {"host", "port"},
+                },
+                uniqueItems = true,
+            },
+            kafka_topic = {type = "string"},
+            producer_type = {
+                type = "string",
+                default = "async",
+                enum = {"async", "sync"},
+            },
+            required_acks = {
+                type = "integer",
+                default = 1,
+                enum = { 0, 1, -1 },
+            },
+            key = {type = "string"},
+            -- in lua-resty-kafka, cluster_name is defined as number
+            -- see https://github.com/doujiang24/lua-resty-kafka#new-1
+            cluster_name = {type = "integer", minimum = 1, default = 1},
+        },
+        required = {"brokers", "kafka_topic"},
+    },
     name = {type = "string", default = plugin_name},
     level = {type = "string", default = "WARN", enum = {"STDERR", "EMERG", "ALERT", "CRIT",
             "ERR", "ERROR", "WARN", "NOTICE", "INFO", "DEBUG"}},

@@ -81,6 +141,7 @@ local metadata_schema = {
         {required = {"skywalking"}},
         {required = {"tcp"}},
         {required = {"clickhouse"}},
+        {required = {"kafka"}},
         -- for compatible with old schema
         {required = {"host", "port"}}
     },

@@ -285,11 +346,63 @@ local function update_filter(value)
 end
 
 
+local function create_producer(broker_list, broker_config, cluster_name)
+    core.log.info("create new kafka producer instance")
+    return producer:new(broker_list, broker_config, cluster_name)
+end
+
+
+local function send_to_kafka(log_message)
+    -- avoid race of the global config
+    local metadata = plugin.plugin_metadata(plugin_name)
+    if not (metadata and metadata.value and metadata.modifiedIndex) then
+        return false, "please set the correct plugin_metadata for " .. plugin_name
+    end
+    local config, err = lrucache(plugin_name, metadata.modifiedIndex, update_filter, metadata.value)
+    if not config then
+        return false, "get config failed: " .. err
+    end
+
+    core.log.info("sending a batch logs to kafka brokers: ",
+                  core.json.delay_encode(config.kafka.brokers))
+
+    local broker_config = {}
+    broker_config["request_timeout"] = config.timeout * 1000

Review comment: Where is the `config` from? This function doesn't have an argument called `config`.

Reply: It comes from `apisix/apisix/plugins/error-log-logger.lua` Line 110 in d852953 and is set by Line 305 in d852953.
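
To make the reply concrete, here is a minimal sketch of the pattern it describes; everything except the two cited lines is abridged and illustrative (the line numbers refer to commit d852953):

```lua
-- Abridged sketch: `config` is a module-level local (around Line 110),
-- visible to every function in this file without being passed as an argument.
local config = {}

local function update_filter(value)
    -- ... rebuild the error-log filter from the metadata `value` (omitted) ...
    config = value    -- around Line 305: the shared config is refreshed here
    return config
end
```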

+    broker_config["producer_type"] = config.kafka.producer_type
+    broker_config["required_acks"] = config.kafka.required_acks
+
+    -- reuse producer via kafka_prod_lrucache to avoid unbalanced partitions of messages in kafka
+    local prod, err = kafka_prod_lrucache(plugin_name, metadata.modifiedIndex,
+                                          create_producer, config.kafka.brokers, broker_config,
+                                          config.kafka.cluster_name)
+    if not prod then
+        return false, "get kafka producer failed: " .. err
+    end
+    core.log.info("kafka cluster name ", config.kafka.cluster_name, ", broker_list[1] port ",
+                  prod.client.broker_list[1].port)
+
+    local ok
+    for i = 1, #log_message, 2 do
+        ok, err = prod:send(config.kafka.kafka_topic,
+                            config.kafka.key, core.json.encode(log_message[i]))
+        if not ok then
+            return false, "failed to send data to Kafka topic: " .. err ..
+                          ", brokers: " .. core.json.encode(config.kafka.brokers)
+        end
+        core.log.info("send data to kafka: ", core.json.delay_encode(log_message[i]))
+    end
+
+    return true
+end
+
+
 local function send(data)
     if config.skywalking then
         return send_to_skywalking(data)
     elseif config.clickhouse then
         return send_to_clickhouse(data)
+    elseif config.kafka then
+        return send_to_kafka(data)
     end
     return send_to_tcp_server(data)
 end

@@ -307,7 +420,7 @@ local function process()
         core.log.warn("set log filter failed for ", err)
         return
     end
-    if not (config.tcp or config.skywalking or config.clickhouse) then
+    if not (config.tcp or config.skywalking or config.clickhouse or config.kafka) then
         config.tcp = {
             host = config.host,
             port = config.port,
@@ -28,7 +28,7 @@ description: This document contains information about the Apache APISIX error-lo
 
 ## Description
 
-The `error-log-logger` Plugin is used to push APISIX's error logs (`error.log`) to TCP, [Apache SkyWalking](https://skywalking.apache.org/), or ClickHouse servers. You can also set the error log level to send the logs to server.
+The `error-log-logger` Plugin is used to push APISIX's error logs (`error.log`) to TCP, [Apache SkyWalking](https://skywalking.apache.org/), Apache Kafka, or ClickHouse servers. You can also set the error log level to send the logs to server.
 
 It might take some time to receive the log data. It will be automatically sent after the timer function in the [batch processor](../batch-processor.md) expires.

@@ -48,6 +48,18 @@ It might take some time to receive the log data. It will be automatically sent a
 | clickhouse.password | String | False | | | ClickHouse password. |
 | clickhouse.database | String | False | | | Name of the database to store the logs. |
 | clickhouse.logtable | String | False | | | Table name to store the logs. |
+| kafka.brokers | array | True | | | List of Kafka brokers (nodes). |
+| kafka.brokers.host | string | True | | | The host of the Kafka broker, e.g. `192.168.1.1`. |
+| kafka.brokers.port | integer | True | | [1, 65535] | The port of the Kafka broker. |
+| kafka.brokers.sasl_config | object | False | | | The SASL config of the Kafka broker. |
+| kafka.brokers.sasl_config.mechanism | string | False | "PLAIN" | ["PLAIN"] | The mechanism of the SASL config. |
+| kafka.brokers.sasl_config.user | string | True | | | The user of sasl_config. If sasl_config exists, it's required. |
+| kafka.brokers.sasl_config.password | string | True | | | The password of sasl_config. If sasl_config exists, it's required. |
+| kafka.kafka_topic | string | True | | | Target topic to push the logs to. |
+| kafka.producer_type | string | False | async | ["async", "sync"] | Message sending mode of the producer. |
+| kafka.required_acks | integer | False | 1 | [0, 1, -1] | Number of acknowledgements the leader needs to receive for the producer to consider the request complete. This controls the durability of the sent records. The attribute follows the same configuration as the Kafka `acks` attribute. See [Apache Kafka documentation](https://kafka.apache.org/documentation/#producerconfigs_acks) for more. |
+| kafka.cluster_name | integer | False | 1 | [1,...] | Name of the cluster. Used when there are two or more Kafka clusters. Only works if the `producer_type` attribute is set to `async`. |
 | timeout | integer | False | 3 | [1,...] | Timeout (in seconds) for the upstream to connect and send data. |
 | keepalive | integer | False | 30 | [1,...] | Time in seconds to keep the connection alive after sending data. |
 | level | string | False | WARN | ["STDERR", "EMERG", "ALERT", "CRIT", "ERR", "ERROR", "WARN", "NOTICE", "INFO", "DEBUG"] | Log level to filter the error logs. `ERR` is same as `ERROR`. |

Review comment: cluster_name is not documented?

@@ -118,6 +130,28 @@ curl http://127.0.0.1:9180/apisix/admin/plugin_metadata/error-log-logger -H 'X-A
 }'
 ```
 
+### Configuring Kafka server
+
+To send the error logs to Kafka, configure the Plugin as shown below:
+
+```shell
+curl http://127.0.0.1:9180/apisix/admin/plugin_metadata/error-log-logger \
+-H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
+{
+    "kafka":{
+        "brokers":[
+            {
+                "host":"127.0.0.1",
+                "port":9092
+            }
+        ],
+        "kafka_topic":"test2"
+    },
+    "level":"ERROR",
+    "inactive_timeout":1
+}'
+```
 
 ## Disable Plugin
 
 To disable the Plugin, you can remove it from your configuration file (`conf/config.yaml`):
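
Beyond the basic example in the diff above, the schema added in this PR also accepts a per-broker `sasl_config`. A hypothetical payload exercising it could look like the following (broker address, credentials, and topic are placeholders):

```shell
curl http://127.0.0.1:9180/apisix/admin/plugin_metadata/error-log-logger \
-H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
    "kafka":{
        "brokers":[
            {
                "host":"127.0.0.1",
                "port":9092,
                "sasl_config":{
                    "mechanism":"PLAIN",
                    "user":"example-user",
                    "password":"example-password"
                }
            }
        ],
        "kafka_topic":"test2",
        "producer_type":"async",
        "required_acks":1
    },
    "level":"ERROR"
}'
```

Per the schema, `mechanism` currently only allows `PLAIN`, and `user` and `password` are required whenever `sasl_config` is present.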

Review comment: Let's group the resty module together.
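
A sketch of what that grouping might look like (the exact ordering is an assumption; the point is simply to keep the `resty.*` requires adjacent):

```lua
-- resty.* modules grouped together, followed by the apisix.* modules
local http = require("resty.http")
local producer = require("resty.kafka.producer")
local batch_processor = require("apisix.utils.batch-processor")
local plugin = require("apisix.plugin")
local timers = require("apisix.timers")
```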