Kafka Producer & Consumer - Topic based routing based on criteria specified in an expression #4170
Comments
On the Kafka output, have you tried using the For the Kafka input, you can use the Take a look at these and let me know what cases they are not handling. |
@danielnelson Topic suffix tags do not meet our needs for the Kafka output plugin. We have tried it and we are looking to have more flexibility as we
The suggestion for the Kafka input may be workable. I will look into it. |
Here is a good example of tag based routing, it is not as direct as what is suggested in that issue, but it ends up being very flexible. As a potential workaround for topic selection until we can add something more elegant, you could try using the measurement filtering and multiple kafka outputs:
[[outputs.kafka]]
brokers = ["localhost:9092"]
topic = "telegraf"
[outputs.kafka.topic_suffix]
method = "tags"
keys = ["foo", "bar"]
separator = "_"
# must have foo and bar tags
[outputs.kafka.tagpass]
foo = ["*"]
bar = ["*"]
[[outputs.kafka]]
brokers = ["localhost:9092"]
topic = "telegraf"
# must not have foo and bar tags
[outputs.kafka.tagdrop]
foo = ["*"]
bar = ["*"] |
@danielnelson your example is invalid (maybe it wasn't at the time?) because tagpass/tagdrop is OR and not AND according to the docs at https://docs.influxdata.com/telegraf/v1.9/administration/configuration/#aggregator-configuration-examples |
Yes that's a good point, the comments should read "must have foo or bar tags" |
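To make the OR behaviour concrete, here is a minimal Python sketch of how tagpass and tagdrop are described as working in the comments above. It is a simplified model, not Telegraf's actual code: the helper names `passes_tagpass` and `passes_tagdrop` are hypothetical, and Python's `fnmatch` stands in for Telegraf's glob matcher.

```python
from fnmatch import fnmatchcase


def any_tag_matches(rules, tags):
    """True if ANY configured tag key is present and its value matches a glob."""
    return any(
        key in tags and any(fnmatchcase(tags[key], p) for p in patterns)
        for key, patterns in rules.items()
    )


def passes_tagpass(rules, tags):
    # tagpass keeps the metric when at least one rule matches (OR, not AND).
    return any_tag_matches(rules, tags)


def passes_tagdrop(rules, tags):
    # tagdrop discards the metric when at least one rule matches.
    return not any_tag_matches(rules, tags)


RULES = {"foo": ["*"], "bar": ["*"]}

# A metric with only the foo tag passes tagpass and is rejected by tagdrop.
only_foo = {"foo": "1", "host": "a"}
neither = {"host": "a"}
```

Under this model, a metric carrying only `foo` goes to the first output, and only metrics with neither tag reach the second one.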
I was the originator of the requested use case. We are going to be posting a pull request at some point soon that has a somewhat generic implementation where we use measurement name regexp as the criteria to drive the mapping to topic name. @scottgrover Will be working on it for us. |
Sounds good, update us with a sample of what the configuration design could be before getting too deep into implementation. |
Hi there, After reviewing how the kafka output plugin works, I was thinking about adding the logic Jim and I discussed in the GetTopicName function. We want to add something similar to the topic_suffix logic, possibly named topic_routing to avoid any naming confusion. It's possible this could grow to multiple types of routing, but we'd like to start out with regex based routing. Here's an example of what the configuration could look like. Let me know your thoughts.
[kafka.output.topic_routing]
method = "regex"
[[route]]
# Route measurement names ending in 0-9 to topic1
routeregex = 'measurement_\d{1}\b'
topicname = "topic1"
[[route]]
# Route measurement name ending in double digits to topic2
routeregex = 'measurement_\d{2}\b'
topicname = "topic2"
[[route]]
# Route measurement name starting with 3 a's to topic3
routeregex = "[a]{3}.*"
topicname = "topic3"
Scott |
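The proposed routing can be sketched in a few lines of Python. This is only an illustration of first-match regex routing under stated assumptions: `ROUTES`, `DEFAULT_TOPIC`, and `route_topic` are hypothetical names, the patterns are copied from the proposal, and the fallback to the plugin's configured topic is an assumption rather than something the proposal specifies.

```python
import re

# Hypothetical route table mirroring the proposed config. Order matters:
# the first pattern that matches the measurement name wins.
ROUTES = [
    (re.compile(r"measurement_\d{1}\b"), "topic1"),
    (re.compile(r"measurement_\d{2}\b"), "topic2"),
    (re.compile(r"[a]{3}.*"), "topic3"),
]

# Assumed fallback: the topic already configured on the output.
DEFAULT_TOPIC = "telegraf"


def route_topic(measurement):
    """Return the topic of the first matching route, else the default."""
    for pattern, topic in ROUTES:
        # Unanchored search, so a pattern may match anywhere in the name.
        if pattern.search(measurement):
            return topic
    return DEFAULT_TOPIC
```

Because the search is unanchored, the third pattern would also match a name such as "xaaa"; anchoring with `^` would be needed if "starting with" is meant literally.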
This type of functionality, routing based on the metric data, is useful in many or even most of the outputs. This makes me think that whatever solution we do should be pulled back into the filtering options and/or processors. I feel the same way about the topic_suffix work from before as well. In these examples, I notice that the topic is static, so it should be possible to do something today like:
[[outputs.kafka]]
topic = "topic1"
namepass = ["measurement_[0-9]"]
[[outputs.kafka]]
topic = "topic2"
namepass = ["measurement_[0-9][0-9]"]
[[outputs.kafka]]
topic = "topic3"
namepass = ["aaa*"]
Of course these are just examples, and obviously the glob syntax is not as advanced as regular expressions. Can you describe some cases where you are routing off of the measurement but globs are not powerful enough? Generally, I think of the measurement name as similar to the name of a class or struct, so it wouldn't have a ton of variation. Could you give me some more information on how they are being assigned? |
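As a rough illustration of how the three namepass globs above would split traffic, here is a small Python sketch. It assumes Python's `fnmatch` behaves like Telegraf's glob matcher for these simple patterns (whole-name matching, `[0-9]` character classes, `*` wildcards); `OUTPUTS` and `topics_for` are hypothetical names.

```python
from fnmatch import fnmatchcase

# One entry per [[outputs.kafka]] block: (topic, namepass globs).
OUTPUTS = [
    ("topic1", ["measurement_[0-9]"]),
    ("topic2", ["measurement_[0-9][0-9]"]),
    ("topic3", ["aaa*"]),
]


def topics_for(measurement):
    """Return every topic whose output would accept this measurement."""
    return [
        topic
        for topic, globs in OUTPUTS
        if any(fnmatchcase(measurement, g) for g in globs)
    ]
```

Unlike a single routing table, each output filters independently, so a metric matching several namepass lists would be written to several topics, and a metric matching none is written nowhere.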
I agree with what you say about the measurement name. We don't have too much variation. Most of our measurement names are straightforward, containing strings separated by anything allowed by influx. Examples: Some example topics are: Some questions:
We're under the impression that multiple Kafka output plugins, with namepass forwarding metrics to each of them, will have a larger overhead than a single output plugin with the discussed routing logic embedded in it, especially if we move towards pulling 50+ metrics out of our default topic and routing them to their own Kafka topics. At the same time, when I look at the code, my understanding is that when the namepass evaluation passes and the topic gets set, then, if no topic suffix is configured, it would hit the default case in GetTopicName and be routed to the topic configured alongside that namepass with minimal overhead. The only reason I doubt this is that I have yet to dig deeper into how telegraf initializes these output plugins, especially when multiple output plugins are defined. If there is large overhead with multiple output pipelines, we'll move forward with developing routing within the Kafka output plugin, as we're looking to move a large number of metrics (50+) into their own topics. |
This does define multiple independent outputs, and there are some consequences from moving to multiple plugins. The biggest one is that each output plugin has its own metric buffer. Separate metric buffers mean you can set the buffer limit and batch size separately for each plugin if needed. The memory use would potentially be higher, as I assume that the sum of the buffer limits would probably be larger than a single shared buffer. When used with the kafka_consumer though, the number of metrics read on the input would stay the same, limited by
If you have output plugins that handle very few messages, they could potentially not fill their batch as quickly, which would cause them to wait until their next flush interval. Recall that output plugins write after they receive metric_batch_size new metrics or the flush interval is reached, whichever comes first. For the best throughput, it may be required to increase the
You would also have connections for each output plugin to Kafka; in this example it would have 3 times the connections.
Each output does create an extra goroutine, but the extra overhead of this is very minor, as is the extra memory used by the plugins. These are not worth considering compared to the overall cost of the system. Namepass also uses globs, which are faster than regular expressions in Go, but I think this would also be a minor difference.
If nothing above sounds too problematic, I suggest trying out this method first on a few topics since support for it already exists. If it doesn't work well for some reason and we can't think of an easy resolution, I think it would make sense for us to design a processor that adds a tag to metric based on a pattern, and then add |
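The "batch size or flush interval, whichever comes first" rule can be illustrated with a toy Python simulation. This is a deliberately simplified model and not how Telegraf is implemented: `simulate_writes` is a hypothetical helper, time is in whole seconds, and the flush timer is assumed to tick independently of batch writes.

```python
def simulate_writes(arrivals, batch_size, flush_interval):
    """Return a list of (time, metric_count) writes for one output.

    arrivals: times (seconds) at which single metrics reach the output.
    A write happens when batch_size metrics are pending, or when the
    periodic flush timer fires with at least one metric pending.
    """
    writes = []
    pending = 0
    next_flush = flush_interval
    for t in sorted(arrivals):
        # Fire every timer tick that is due before this metric arrives.
        while next_flush <= t:
            if pending:
                writes.append((next_flush, pending))
                pending = 0
            next_flush += flush_interval
        pending += 1
        if pending >= batch_size:
            # Batch is full: write immediately without waiting for the timer.
            writes.append((t, pending))
            pending = 0
    if pending:
        # Leftover metrics wait for the next timer tick.
        writes.append((next_flush, pending))
    return writes


busy = simulate_writes([1, 2, 3, 4, 5, 6, 7], batch_size=3, flush_interval=10)
quiet = simulate_writes([1, 12], batch_size=3, flush_interval=10)
```

In this model the busy output writes as soon as each batch fills, while the quiet output only ever writes on the timer, which is the latency effect described above for low-volume topics.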
I think we can deal with the memory increase, and the metric buffer issue you mentioned. The metrics we're going to be pushing into separate topics are the ones that send at a ridiculously high rate, not something where we'll be waiting on the metric buffer to get filled. I don't believe adding more processor plugins into our pipeline will work for our use case. We have a huge influx pipeline. @JimHagan keep me honest, but I don't think it'd be much of a stretch to say it's one of the largest. I've forked telegraf, so I can push my code publicly, and pushed a small change to the kafka output plugin that allows the functionality we're looking for. It's in its infancy, as we're still testing it on our pipeline and looking to add to its current design. But @danielnelson I'd love your perspective on how we're approaching it. I still need to write the unit test for my addition, but should be able to get that pushed tomorrow or early Thursday. Let me know your thoughts: https://github.com/scottgrover/telegraf |
Code looks good, my only suggestion is that you will want to use |
Feature Request
Proposal:
Wayfair is requesting this capability to enable smarter routing of Kafka traffic to various Topics.
Telegraf Kafka Consumer could then direct the data to different InfluxDB instances based on the Topic.
Below is an example of what this would look like:
We are looking to have a flexible Kafka routing engine for both the Telegraf Kafka consumer and producer.
For Kafka Producer…
We are looking for something that works sort of like a map data structure (or something like a case statement). Imagine the following…
TOPIC_ROUTES:
CRITERIA_SET_1:
TOPIC(S)
CRITERIA_SET_2:
TOPIC(S)
CRITERIA_SET_3:
TOPIC(S)
DEFAULT:
TOPIC(S)
NOTE: we want the option of using measurement name or tag based criteria sets. Note we may want the flexibility of replicating data to more than one topic.
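One way to read the TOPIC_ROUTES structure above is as an ordered case statement whose branches each return one or more topics. The Python sketch below is only an illustration of that reading: the criteria format, the example globs and tags, and the names `TOPIC_ROUTES`, `DEFAULT_TOPICS`, `matches`, and `topics_for` are all assumptions, not part of the request or of Telegraf.

```python
from fnmatch import fnmatchcase

# Ordered (criteria, topics) pairs. Criteria may test the measurement
# name, tags, or both; all values here are made-up examples.
TOPIC_ROUTES = [
    ({"name": "cpu*"}, ["topic_cpu"]),
    ({"tags": {"dc": "us-east"}}, ["topic_east", "topic_audit"]),
]
DEFAULT_TOPICS = ["telegraf"]


def matches(criteria, name, tags):
    """True if the metric satisfies every condition in one criteria set."""
    if "name" in criteria and not fnmatchcase(name, criteria["name"]):
        return False
    for key, pattern in criteria.get("tags", {}).items():
        if key not in tags or not fnmatchcase(tags[key], pattern):
            return False
    return True


def topics_for(name, tags):
    """First matching criteria set wins; otherwise use the default topics."""
    for criteria, topics in TOPIC_ROUTES:
        if matches(criteria, name, tags):
            return topics
    return DEFAULT_TOPICS
```

Returning a list per branch covers the replication case: the second route sends each matching metric to two topics.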
For the Kafka Consumer we are looking for a routing scheme that will enable us to specify a specific cluster and DB destination per topic set (not necessarily a v1 feature).
Note we may want the flexibility of replicating data to more than one db (not necessarily a v1 feature).
It would allow Telegraf to direct data to the appropriate InfluxDB cluster or instance, a very powerful feature which will greatly simplify scaling up or segmenting workloads.