Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support send error-log to kafka brokers #8693

Merged
merged 8 commits into from
Jan 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 114 additions & 1 deletion apisix/plugins/error-log-logger.lua
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ local batch_processor = require("apisix.utils.batch-processor")
local plugin = require("apisix.plugin")
local timers = require("apisix.timers")
local http = require("resty.http")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's group the resty module together.

local producer = require("resty.kafka.producer")
local plugin_name = "error-log-logger"
local table = core.table
local schema_def = core.schema
Expand All @@ -32,6 +33,9 @@ local string = require("string")
local lrucache = core.lrucache.new({
ttl = 300, count = 32
})
local kafka_prod_lrucache = core.lrucache.new({
ttl = 300, count = 32
})


local metadata_schema = {
Expand Down Expand Up @@ -66,6 +70,62 @@ local metadata_schema = {
},
required = {"endpoint_addr", "user", "password", "database", "logtable"}
},
kafka = {
type = "object",
properties = {
brokers = {
type = "array",
minItems = 1,
items = {
type = "object",
properties = {
host = {
type = "string",
description = "the host of kafka broker",
},
port = {
type = "integer",
minimum = 1,
maximum = 65535,
description = "the port of kafka broker",
},
sasl_config = {
type = "object",
description = "sasl config",
properties = {
mechanism = {
type = "string",
default = "PLAIN",
enum = {"PLAIN"},
},
user = { type = "string", description = "user" },
password = { type = "string", description = "password" },
},
required = {"user", "password"},
},
},
required = {"host", "port"},
},
uniqueItems = true,
},
kafka_topic = {type = "string"},
producer_type = {
type = "string",
default = "async",
enum = {"async", "sync"},
},
required_acks = {
type = "integer",
default = 1,
enum = { 0, 1, -1 },
},
key = {type = "string"},
-- in lua-resty-kafka, cluster_name is defined as number
-- see https://github.com/doujiang24/lua-resty-kafka#new-1
cluster_name = {type = "integer", minimum = 1, default = 1},
},
required = {"brokers", "kafka_topic"},
},
name = {type = "string", default = plugin_name},
level = {type = "string", default = "WARN", enum = {"STDERR", "EMERG", "ALERT", "CRIT",
"ERR", "ERROR", "WARN", "NOTICE", "INFO", "DEBUG"}},
Expand All @@ -81,6 +141,7 @@ local metadata_schema = {
{required = {"skywalking"}},
{required = {"tcp"}},
{required = {"clickhouse"}},
{required = {"kafka"}},
-- for compatible with old schema
{required = {"host", "port"}}
},
Expand Down Expand Up @@ -285,11 +346,63 @@ local function update_filter(value)
end


local function create_producer(broker_list, broker_config, cluster_name)
core.log.info("create new kafka producer instance")
return producer:new(broker_list, broker_config, cluster_name)
end


local function send_to_kafka(log_message)
-- avoid race of the global config
local metadata = plugin.plugin_metadata(plugin_name)
if not (metadata and metadata.value and metadata.modifiedIndex) then
return false, "please set the correct plugin_metadata for " .. plugin_name
end
local config, err = lrucache(plugin_name, metadata.modifiedIndex, update_filter, metadata.value)
if not config then
return false, "get config failed: " .. err
end

core.log.info("sending a batch logs to kafka brokers: ",
core.json.delay_encode(config.kafka.brokers))

local broker_config = {}
broker_config["request_timeout"] = config.timeout * 1000
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is the config from? This function doesn't have an argument called config.

Copy link
Contributor Author

@ronething ronething Jan 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it comes from

timeout = {type = "integer", minimum = 1, default = 3},

local config = {}

and set by
config, err = lrucache(plugin_name, metadata.modifiedIndex, update_filter, metadata.value)

broker_config["producer_type"] = config.kafka.producer_type
broker_config["required_acks"] = config.kafka.required_acks

-- reuse producer via kafka_prod_lrucache to avoid unbalanced partitions of messages in kafka
local prod, err = kafka_prod_lrucache(plugin_name, metadata.modifiedIndex,
create_producer, config.kafka.brokers, broker_config,
config.kafka.cluster_name)
if not prod then
return false, "get kafka producer failed: " .. err
end
core.log.info("kafka cluster name ", config.kafka.cluster_name, ", broker_list[1] port ",
prod.client.broker_list[1].port)

local ok
for i = 1, #log_message, 2 do
ok, err = prod:send(config.kafka.kafka_topic,
config.kafka.key, core.json.encode(log_message[i]))
if not ok then
return false, "failed to send data to Kafka topic: " .. err ..
", brokers: " .. core.json.encode(config.kafka.brokers)
end
core.log.info("send data to kafka: ", core.json.delay_encode(log_message[i]))
end

return true
end


local function send(data)
if config.skywalking then
return send_to_skywalking(data)
elseif config.clickhouse then
return send_to_clickhouse(data)
elseif config.kafka then
return send_to_kafka(data)
end
return send_to_tcp_server(data)
end
Expand All @@ -307,7 +420,7 @@ local function process()
core.log.warn("set log filter failed for ", err)
return
end
if not (config.tcp or config.skywalking or config.clickhouse) then
if not (config.tcp or config.skywalking or config.clickhouse or config.kafka) then
config.tcp = {
host = config.host,
port = config.port,
Expand Down
36 changes: 35 additions & 1 deletion docs/en/latest/plugins/error-log-logger.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ description: This document contains information about the Apache APISIX error-lo

## Description

The `error-log-logger` Plugin is used to push APISIX's error logs (`error.log`) to TCP, [Apache SkyWalking](https://skywalking.apache.org/), or ClickHouse servers. You can also set the error log level to send the logs to server.
The `error-log-logger` Plugin is used to push APISIX's error logs (`error.log`) to TCP, [Apache SkyWalking](https://skywalking.apache.org/), Apache Kafka or ClickHouse servers. You can also set the error log level to send the logs to server.

It might take some time to receive the log data. It will be automatically sent after the timer function in the [batch processor](../batch-processor.md) expires.

Expand All @@ -48,6 +48,18 @@ It might take some time to receive the log data. It will be automatically sent a
| clickhouse.password | String | False | | | ClickHouse password. |
| clickhouse.database | String | False | | | Name of the database to store the logs. |
| clickhouse.logtable | String | False | | | Table name to store the logs. |
| kafka.brokers | array | True | | | List of Kafka brokers (nodes). |
| kafka.brokers.host | string | True | | | The host of Kafka broker, e.g, `192.168.1.1`. |
| kafka.brokers.port | integer | True | | [0, 65535] | The port of Kafka broker |
| kafka.brokers.sasl_config | object | False | | | The sasl config of Kafka broker |
| kafka.brokers.sasl_config.mechanism | string | False | "PLAIN" | ["PLAIN"] | The mechaism of sasl config |
| kafka.brokers.sasl_config.user | string | True | | | The user of sasl_config. If sasl_config exists, it's required. |
| kafka.brokers.sasl_config.password | string | True | | | The password of sasl_config. If sasl_config exists, it's required. |
| kafka.kafka_topic | string | True | | | Target topic to push the logs for organisation. |
| kafka.producer_type | string | False | async | ["async", "sync"] | Message sending mode of the producer. |
| kafka.required_acks | integer | False | 1 | [0, 1, -1] | Number of acknowledgements the leader needs to receive for the producer to consider the request complete. This controls the durability of the sent records. The attribute follows the same configuration as the Kafka `acks` attribute. See [Apache Kafka documentation](https://kafka.apache.org/documentation/#producerconfigs_acks) for more. |
| kafka.key | string | False | | | Key used for allocating partitions for messages. |
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cluster_name is not documented?

| kafka.cluster_name | integer | False | 1 | [0,...] | Name of the cluster. Used when there are two or more Kafka clusters. Only works if the `producer_type` attribute is set to `async`. |
| timeout | integer | False | 3 | [1,...] | Timeout (in seconds) for the upstream to connect and send data. |
| keepalive | integer | False | 30 | [1,...] | Time in seconds to keep the connection alive after sending data. |
| level | string | False | WARN | ["STDERR", "EMERG", "ALERT", "CRIT", "ERR", "ERROR", "WARN", "NOTICE", "INFO", "DEBUG"] | Log level to filter the error logs. `ERR` is same as `ERROR`. |
Expand Down Expand Up @@ -118,6 +130,28 @@ curl http://127.0.0.1:9180/apisix/admin/plugin_metadata/error-log-logger -H 'X-A
}'
```

### Configuring Kafka server

The Plugin sends the error log to Kafka, you can configure it as shown below:

```shell
curl http://127.0.0.1:9180/apisix/admin/plugin_metadata/error-log-logger \
-H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
"kafka":{
"brokers":[
{
"host":"127.0.0.1",
"port":9092
}
],
"kafka_topic":"test2"
},
"level":"ERROR",
"inactive_timeout":1
}'
```

## Disable Plugin

To disable the Plugin, you can remove it from your configuration file (`conf/config.yaml`):
Expand Down
Loading