From 1b378d623302a5bae4fff25130113753bd4f0839 Mon Sep 17 00:00:00 2001 From: ashing Date: Mon, 16 Jan 2023 21:09:06 +0800 Subject: [PATCH 1/8] feat: support send error-log to kafka brokers --- apisix/plugins/error-log-logger.lua | 110 +++++++++++++++- t/plugin/error-log-logger-kafka.t | 196 ++++++++++++++++++++++++++++ 2 files changed, 305 insertions(+), 1 deletion(-) create mode 100644 t/plugin/error-log-logger-kafka.t diff --git a/apisix/plugins/error-log-logger.lua b/apisix/plugins/error-log-logger.lua index d847ca1ce852..ca705caa8033 100644 --- a/apisix/plugins/error-log-logger.lua +++ b/apisix/plugins/error-log-logger.lua @@ -19,6 +19,7 @@ local core = require("apisix.core") local errlog = require("ngx.errlog") local batch_processor = require("apisix.utils.batch-processor") local plugin = require("apisix.plugin") +local producer = require ("resty.kafka.producer") local timers = require("apisix.timers") local http = require("resty.http") local plugin_name = "error-log-logger" @@ -66,6 +67,62 @@ local metadata_schema = { }, required = {"endpoint_addr", "user", "password", "database", "logtable"} }, + kafka = { + type = "object", + properties = { + brokers = { + type = "array", + minItems = 1, + items = { + type = "object", + properties = { + host = { + type = "string", + description = "the host of kafka broker", + }, + port = { + type = "integer", + minimum = 1, + maximum = 65535, + description = "the port of kafka broker", + }, + sasl_config = { + type = "object", + description = "sasl config", + properties = { + mechanism = { + type = "string", + default = "PLAIN", + enum = {"PLAIN"}, + }, + user = { type = "string", description = "user" }, + password = { type = "string", description = "password" }, + }, + required = {"user", "password"}, + }, + }, + required = {"host", "port"}, + }, + uniqueItems = true, + }, + kafka_topic = {type = "string"}, + producer_type = { + type = "string", + default = "async", + enum = {"async", "sync"}, + }, + required_acks = { + 
type = "integer", + default = 1, + enum = { 0, 1, -1 }, + }, + key = {type = "string"}, + -- in lua-resty-kafka, cluster_name is defined as number + -- see https://github.com/doujiang24/lua-resty-kafka#new-1 + cluster_name = {type = "integer", minimum = 1, default = 1}, + }, + required = {"brokers", "kafka_topic"}, + }, name = {type = "string", default = plugin_name}, level = {type = "string", default = "WARN", enum = {"STDERR", "EMERG", "ALERT", "CRIT", "ERR", "ERROR", "WARN", "NOTICE", "INFO", "DEBUG"}}, @@ -81,6 +138,7 @@ local metadata_schema = { {required = {"skywalking"}}, {required = {"tcp"}}, {required = {"clickhouse"}}, + {required = {"kafka"}}, -- for compatible with old schema {required = {"host", "port"}} }, @@ -272,6 +330,53 @@ local function send_to_clickhouse(log_message) end +local function create_producer(broker_list, broker_config, cluster_name) + core.log.info("create new kafka producer instance") + return producer:new(broker_list, broker_config, cluster_name) +end + + +local function send_to_kafka(log_message) + core.log.info("sending a batch logs to kafka brokers: ", core.json.encode(config.kafka.brokers)) + + local broker_config = {} + broker_config["request_timeout"] = config.timeout * 1000 + broker_config["producer_type"] = config.kafka.producer_type + broker_config["required_acks"] = config.kafka.required_acks + + local prod + local err + + local metadata = plugin.plugin_metadata(plugin_name) + if not (metadata and metadata.value and metadata.modifiedIndex) then + core.log.info("please set the correct plugin_metadata for ", plugin_name) + return + else + -- reuse producer via lrucache to avoid unbalanced partitions of messages in kafka + prod, err = lrucache(plugin_name .. "#kafka", metadata.modifiedIndex, + create_producer, config.kafka.brokers, broker_config, config.kafka.cluster_name) + if not prod then + return false, "get kafka producer failed " .. 
err + end + core.log.info("kafka cluster name ", config.kafka.cluster_name, ", broker_list[1] port ", + prod.client.broker_list[1].port) + end + + + local ok + for i = 1, #log_message, 2 do + ok, err = prod:send(config.kafka.kafka_topic, config.kafka.key, core.json.encode(log_message[i])) + if not ok then + return false, "failed to send data to Kafka topic: " .. err .. + ", brokers: " .. core.json.encode(config.kafka.brokers) + end + core.log.info("send data to kafka: ", core.json.encode(log_message[i])) + end + + return true +end + + local function update_filter(value) local level = log_level[value.level] local status, err = errlog.set_filter_level(level) @@ -286,10 +391,13 @@ end local function send(data) + core.log.info("send data is ", core.json.encode(data,false)) if config.skywalking then return send_to_skywalking(data) elseif config.clickhouse then return send_to_clickhouse(data) + elseif config.kafka then + return send_to_kafka(data) end return send_to_tcp_server(data) end @@ -307,7 +415,7 @@ local function process() core.log.warn("set log filter failed for ", err) return end - if not (config.tcp or config.skywalking or config.clickhouse) then + if not (config.tcp or config.skywalking or config.clickhouse or config.kafka) then config.tcp = { host = config.host, port = config.port, diff --git a/t/plugin/error-log-logger-kafka.t b/t/plugin/error-log-logger-kafka.t new file mode 100644 index 000000000000..527ee9f1be86 --- /dev/null +++ b/t/plugin/error-log-logger-kafka.t @@ -0,0 +1,196 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +use t::APISIX 'no_plan'; + +log_level("info"); +repeat_each(1); +no_long_string(); +no_root_location(); + +add_block_preprocessor(sub { + my ($block) = @_; + + if (!defined $block->request) { + $block->set_value("request", "GET /t"); + } + + if (!defined $block->extra_yaml_config) { + my $extra_yaml_config = <<_EOC_; +plugins: + - error-log-logger +_EOC_ + $block->set_value("extra_yaml_config", $extra_yaml_config); + } +}); + +run_tests(); + +__DATA__ + +=== TEST 1: test schema checker +--- config + location /t { + content_by_lua_block { + local core = require("apisix.core") + local plugin = require("apisix.plugins.error-log-logger") + local ok, err = plugin.check_schema( + { + kafka = { + brokers = { + { + host = "127.0.0.1", + port = 9092 + } + }, + kafka_topic = "test2" + } + }, + core.schema.TYPE_METADATA + ) + if not ok then + ngx.say(err) + end + + ngx.say("done") + } + } +--- response_body +done + + + +=== TEST 2: put plugin metadata and log an error level message - no auth kafka +--- config + location /t { + content_by_lua_block { + local core = require("apisix.core") + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/plugin_metadata/error-log-logger', + ngx.HTTP_PUT, + [[{ + "kafka": { + "brokers": [{ + "host": "127.0.0.1", + "port": 9092 + }], + "kafka_topic": "test2" + }, + "level": "ERROR", + "inactive_timeout": 1 + }]] + ) + ngx.sleep(2) + core.log.error("this is a error message for test2.") + } + } +--- response_body +--- error_log eval +[qr/this is a error message for test2/, +qr/send data to kafka: .*test2/] 
+--- wait: 3 + + + +=== TEST 3: log a error level message +--- config + location /t { + content_by_lua_block { + local core = require("apisix.core") + core.log.error("this is a error message for test3.") + } + } +--- response_body +--- error_log eval +[qr/this is a error message for test3/, +qr/send data to kafka: .*test3/] +--- wait: 5 + + + +=== TEST 4: log an warning level message - will not send to kafka brokers +--- config + location /t { + content_by_lua_block { + local core = require("apisix.core") + core.log.warn("this is an warning message for test4.") + } + } +--- response_body +--- error_log +this is an warning message for test4 +--- no_error_log eval +qr/send data to kafka: .*test4/ +--- wait: 5 + + + +=== TEST 5: put plugin metadata and log an error level message - auth kafka +--- config + location /t { + content_by_lua_block { + local core = require("apisix.core") + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/plugin_metadata/error-log-logger', + ngx.HTTP_PUT, + [[{ + "kafka": { + "brokers": [{ + "host": "127.0.0.1", + "port": 19094, + "sasl_config": { + "mechanism": "PLAIN", + "user": "admin", + "password": "admin-secret" + } + }], + "producer_type": "sync", + "kafka_topic": "test4" + }, + "level": "ERROR", + "inactive_timeout": 1 + }]] + ) + ngx.sleep(2) + core.log.error("this is a error message for test5.") + } + } +--- response_body +--- error_log eval +[qr/this is a error message for test5/, +qr/send data to kafka: .*test5/] +--- wait: 3 + + + +=== TEST 6: delete metadata for the plugin, recover to the default +--- config + location /t { + content_by_lua_block { + local core = require("apisix.core") + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/plugin_metadata/error-log-logger', + ngx.HTTP_DELETE) + + if code >= 300 then + ngx.status = code + end + + ngx.say(body) + } + } +--- response_body +passed From 202a3a236be48832d21dd9e8ce32e5ee53c3820f Mon Sep 17 00:00:00 2001 From: 
ashing Date: Mon, 16 Jan 2023 23:11:13 +0800 Subject: [PATCH 2/8] fix: ci --- apisix/plugins/error-log-logger.lua | 7 ++++--- t/plugin/error-log-logger-kafka.t | 1 - 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/apisix/plugins/error-log-logger.lua b/apisix/plugins/error-log-logger.lua index ca705caa8033..22a664a673ec 100644 --- a/apisix/plugins/error-log-logger.lua +++ b/apisix/plugins/error-log-logger.lua @@ -354,7 +354,8 @@ local function send_to_kafka(log_message) else -- reuse producer via lrucache to avoid unbalanced partitions of messages in kafka prod, err = lrucache(plugin_name .. "#kafka", metadata.modifiedIndex, - create_producer, config.kafka.brokers, broker_config, config.kafka.cluster_name) + create_producer, config.kafka.brokers, broker_config, + config.kafka.cluster_name) if not prod then return false, "get kafka producer failed " .. err end @@ -365,7 +366,8 @@ local function send_to_kafka(log_message) local ok for i = 1, #log_message, 2 do - ok, err = prod:send(config.kafka.kafka_topic, config.kafka.key, core.json.encode(log_message[i])) + ok, err = prod:send(config.kafka.kafka_topic, + config.kafka.key, core.json.encode(log_message[i])) if not ok then return false, "failed to send data to Kafka topic: " .. err .. ", brokers: " .. 
core.json.encode(config.kafka.brokers) @@ -391,7 +393,6 @@ end local function send(data) - core.log.info("send data is ", core.json.encode(data,false)) if config.skywalking then return send_to_skywalking(data) elseif config.clickhouse then diff --git a/t/plugin/error-log-logger-kafka.t b/t/plugin/error-log-logger-kafka.t index 527ee9f1be86..15cf04e6d60a 100644 --- a/t/plugin/error-log-logger-kafka.t +++ b/t/plugin/error-log-logger-kafka.t @@ -157,7 +157,6 @@ qr/send data to kafka: .*test4/ "password": "admin-secret" } }], - "producer_type": "sync", "kafka_topic": "test4" }, "level": "ERROR", From afcdcfaf6637c5ecf296fd7d7237070ea23f09df Mon Sep 17 00:00:00 2001 From: ashing Date: Tue, 17 Jan 2023 11:06:34 +0800 Subject: [PATCH 3/8] docs: add error log to kafka --- docs/en/latest/plugins/error-log-logger.md | 35 +++++++++++++++++++- docs/zh/latest/plugins/error-log-logger.md | 37 ++++++++++++++++++++-- 2 files changed, 69 insertions(+), 3 deletions(-) diff --git a/docs/en/latest/plugins/error-log-logger.md b/docs/en/latest/plugins/error-log-logger.md index 63b1be1c727b..5f79e1119c27 100644 --- a/docs/en/latest/plugins/error-log-logger.md +++ b/docs/en/latest/plugins/error-log-logger.md @@ -28,7 +28,7 @@ description: This document contains information about the Apache APISIX error-lo ## Description -The `error-log-logger` Plugin is used to push APISIX's error logs (`error.log`) to TCP, [Apache SkyWalking](https://skywalking.apache.org/), or ClickHouse servers. You can also set the error log level to send the logs to server. +The `error-log-logger` Plugin is used to push APISIX's error logs (`error.log`) to TCP, [Apache SkyWalking](https://skywalking.apache.org/), Apache Kafka or ClickHouse servers. You can also set the error log level to send the logs to server. It might take some time to receive the log data. It will be automatically sent after the timer function in the [batch processor](../batch-processor.md) expires. 
@@ -48,6 +48,17 @@ It might take some time to receive the log data. It will be automatically sent a | clickhouse.password | String | False | | | ClickHouse password. | | clickhouse.database | String | False | | | Name of the database to store the logs. | | clickhouse.logtable | String | False | | | Table name to store the logs. | +| kafka.brokers | array | True | | | List of Kafka brokers (nodes). | +| kafka.brokers.host | string | True | | | The host of Kafka broker, e.g., `192.168.1.1`. | +| kafka.brokers.port | integer | True | | [0, 65535] | The port of Kafka broker. | +| kafka.brokers.sasl_config | object | False | | | The sasl config of Kafka broker. | +| kafka.brokers.sasl_config.mechanism | string | False | "PLAIN" | ["PLAIN"] | The mechanism of sasl config | +| kafka.brokers.sasl_config.user | string | True | | | The user of sasl_config. If sasl_config exists, it's required. | +| kafka.brokers.sasl_config.password | string | True | | | The password of sasl_config. If sasl_config exists, it's required. | +| kafka.kafka_topic | string | True | | | Target topic to push the logs for organisation. | +| kafka.producer_type | string | False | async | ["async", "sync"] | Message sending mode of the producer. | +| kafka.required_acks | integer | False | 1 | [0, 1, -1] | Number of acknowledgements the leader needs to receive for the producer to consider the request complete. This controls the durability of the sent records. The attribute follows the same configuration as the Kafka `acks` attribute. See [Apache Kafka documentation](https://kafka.apache.org/documentation/#producerconfigs_acks) for more. | +| kafka.key | string | False | | | Key used for allocating partitions for messages. | | timeout | integer | False | 3 | [1,...] | Timeout (in seconds) for the upstream to connect and send data. | | keepalive | integer | False | 30 | [1,...] | Time in seconds to keep the connection alive after sending data. 
| | level | string | False | WARN | ["STDERR", "EMERG", "ALERT", "CRIT", "ERR", "ERROR", "WARN", "NOTICE", "INFO", "DEBUG"] | Log level to filter the error logs. `ERR` is same as `ERROR`. | @@ -118,6 +129,28 @@ curl http://127.0.0.1:9180/apisix/admin/plugin_metadata/error-log-logger -H 'X-A }' ``` +### Configuring Kafka server + +The Plugin sends the error log to Kafka, you can configure it as shown below: + +```shell +curl http://127.0.0.1:9180/apisix/admin/plugin_metadata/error-log-logger \ +-H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d ' +{ + "kafka":{ + "brokers":[ + { + "host":"127.0.0.1", + "port":9092 + } + ], + "kafka_topic":"test2" + }, + "level":"ERROR", + "inactive_timeout":1 +}' +``` + ## Disable Plugin To disable the Plugin, you can remove it from your configuration file (`conf/config.yaml`): diff --git a/docs/zh/latest/plugins/error-log-logger.md b/docs/zh/latest/plugins/error-log-logger.md index daee4c521f5a..4b6b8b5d09d5 100644 --- a/docs/zh/latest/plugins/error-log-logger.md +++ b/docs/zh/latest/plugins/error-log-logger.md @@ -5,7 +5,7 @@ keywords: - API 网关 - 错误日志 - Plugin -description: API 网关 Apache APISIX error-log-logger 插件用于将 APISIX 的错误日志推送到 TCP、Apache SkyWalking 或 ClickHouse 服务器。 +description: API 网关 Apache APISIX error-log-logger 插件用于将 APISIX 的错误日志推送到 TCP、Apache SkyWalking、Apache Kafka 或 ClickHouse 服务器。 ---