Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support send error-log to kafka brokers #8693

Merged
merged 8 commits into from
Jan 31, 2023

Conversation

ronething
Copy link
Contributor

@ronething ronething commented Jan 16, 2023

Description

Fixes #8678

support send error-log to kafka brokers.

  • for producer via lrucache, i use plugin_name .. "#kafka" as key, and metadata.modifiedIndex as version.
        -- reuse producer via lrucache to avoid unbalanced partitions of messages in kafka
        prod, err = lrucache(plugin_name .. "#kafka", metadata.modifiedIndex,
                             create_producer, config.kafka.brokers, broker_config, config.kafka.cluster_name)

Checklist

  • I have explained the need for this PR and the problem it solves
  • I have explained the changes or the new features added to this PR
  • I have added tests corresponding to this change
  • I have updated the documentation to reflect this change
  • I have verified that this change is backward compatible (If not, please discuss on the APISIX mailing list first)

@ronething ronething marked this pull request as ready for review January 17, 2023 03:59
@@ -19,6 +19,7 @@ local core = require("apisix.core")
local errlog = require("ngx.errlog")
local batch_processor = require("apisix.utils.batch-processor")
local plugin = require("apisix.plugin")
local producer = require ("resty.kafka.producer")
local timers = require("apisix.timers")
local http = require("resty.http")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's group the resty module together.



local function send_to_kafka(log_message)
core.log.info("sending a batch logs to kafka brokers: ", core.json.encode(config.kafka.brokers))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use delay_encode?

if not (metadata and metadata.value and metadata.modifiedIndex) then
core.log.info("please set the correct plugin_metadata for ", plugin_name)
return
else
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use end here, no need to nest the code

create_producer, config.kafka.brokers, broker_config,
config.kafka.cluster_name)
if not prod then
return false, "get kafka producer failed " .. err
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return false, "get kafka producer failed " .. err
return false, "get kafka producer failed: " .. err

| kafka.kafka_topic | string | True | | | Target topic to push the logs for organisation. |
| kafka.producer_type | string | False | async | ["async", "sync"] | Message sending mode of the producer. |
| kafka.required_acks | integer | False | 1 | [0, 1, -1] | Number of acknowledgements the leader needs to receive for the producer to consider the request complete. This controls the durability of the sent records. The attribute follows the same configuration as the Kafka `acks` attribute. See [Apache Kafka documentation](https://kafka.apache.org/documentation/#producerconfigs_acks) for more. |
| kafka.key | string | False | | | Key used for allocating partitions for messages. |
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cluster_name is not documented?

return false, "failed to send data to Kafka topic: " .. err ..
", brokers: " .. core.json.encode(config.kafka.brokers)
end
core.log.info("send data to kafka: ", core.json.encode(log_message[i]))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use delay_encode?

core.log.info("sending a batch logs to kafka brokers: ", core.json.encode(config.kafka.brokers))

local broker_config = {}
broker_config["request_timeout"] = config.timeout * 1000
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is the config from? This function doesn't have an argument called config.

Copy link
Contributor Author

@ronething ronething Jan 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it comes from

timeout = {type = "integer", minimum = 1, default = 3},

local config = {}

and set by
config, err = lrucache(plugin_name, metadata.modifiedIndex, update_filter, metadata.value)

return
else
-- reuse producer via lrucache to avoid unbalanced partitions of messages in kafka
prod, err = lrucache(plugin_name .. "#kafka", metadata.modifiedIndex,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to use a separate lrucache to cache different data.

core.log.error("this is a error message for test2.")
}
}
--- response_body
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The response_body in these tests is unnecessary as we don't provide responses at all.

core.log.error("this is a error message for test3.")
}
}
--- response_body
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto

@ronething
Copy link
Contributor Author

i will change code and update PR later.

@ronething ronething requested a review from spacewander January 17, 2023 08:47
local metadata = plugin.plugin_metadata(plugin_name)
if not (metadata and metadata.value and metadata.modifiedIndex) then
core.log.info("please set the correct plugin_metadata for ", plugin_name)
return
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should return a boolean here?


local function send_to_kafka(log_message)
core.log.info("sending a batch logs to kafka brokers: ",
core.json.delay_encode(config.kafka.brokers))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
core.json.delay_encode(config.kafka.brokers))
core.json.delay_encode(config.kafka.brokers))

end

-- reuse producer via kafka_prod_lrucache to avoid unbalanced partitions of messages in kafka
local prod, err = kafka_prod_lrucache(plugin_name .. "#kafka", metadata.modifiedIndex,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to add "#kafka" suffix as this cache is individual

config.kafka.key, core.json.encode(log_message[i]))
if not ok then
return false, "failed to send data to Kafka topic: " .. err ..
", brokers: " .. core.json.delay_encode(config.kafka.brokers)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delay_encode is only for log

return false, "get kafka producer failed: " .. err
end
core.log.info("kafka cluster name ", config.kafka.cluster_name, ", broker_list[1] port ",
prod.client.broker_list[1].port)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
prod.client.broker_list[1].port)
prod.client.broker_list[1].port)

broker_config["producer_type"] = config.kafka.producer_type
broker_config["required_acks"] = config.kafka.required_acks

local metadata = plugin.plugin_metadata(plugin_name)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems using the config passed from process here might create a race:

Consider we have c1(config) and m1 (modifiedIndex) in process, and c2/m2 in send. It looks like we might use m2 as key and c1 as value in the cache below.

Copy link
Contributor Author

@ronething ronething Jan 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@spacewander

It looks like we might use m2 as key and c1 as value in the cache below.

Do you mean that we need to clone config before we use it like below? then we can use m2 and c2 in send.

local config = core.table.clone(config)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A clone of c1 still has a c1's value.
Maybe we can get the c2 in send like what we have done in process.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, i will change the code and update the PR later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@spacewander Could you please take a look, thanks.

@ronething ronething requested a review from spacewander January 28, 2023 09:58
@ronething
Copy link
Contributor Author

@soulbird Could you please take a look, thanks.

@spacewander spacewander merged commit a5dc4c3 into apache:master Jan 31, 2023
@ronething ronething deleted the feat/error_logger_support_kafka branch January 31, 2023 01:37
hongbinhsu added a commit to fitphp/apix that referenced this pull request Feb 3, 2023
* upstream/master:
  feat(elasticsearch-logger): support multi elasticsearch endpoints (apache#8604)
  chore: use operator # instead of string.len (apache#8751)
  chore: hi 2023 (apache#8748)
  refactor(admin): stream_routes/upstreams/protos/services/global_rules/consumer_groups/plugin_configs (apache#8661)
  feat: support send error-log to kafka brokers (apache#8693)
  chore: upgrade `casbin` to `1.41.5` (apache#8744)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

feat: As a user, I want to support send error-log to kafka, so that we can get error logs from kafka
3 participants