-
Notifications
You must be signed in to change notification settings - Fork 287
/
simple_consumer.rs
133 lines (115 loc) · 4.6 KB
/
simple_consumer.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
use clap::{App, Arg};
use log::{info, warn};
use rdkafka::client::ClientContext;
use rdkafka::config::{ClientConfig, RDKafkaLogLevel};
use rdkafka::consumer::stream_consumer::StreamConsumer;
use rdkafka::consumer::{BaseConsumer, CommitMode, Consumer, ConsumerContext, Rebalance};
use rdkafka::error::KafkaResult;
use rdkafka::message::{Headers, Message};
use rdkafka::topic_partition_list::TopicPartitionList;
use rdkafka::util::get_rdkafka_version;
use crate::example_utils::setup_logger;
mod example_utils;
// A context can be used to change the behavior of producers and consumers by adding callbacks
// that will be executed by librdkafka.
// This particular context carries no state (it is a unit struct); the callbacks implemented
// for it in this file log rebalance events and the result of offset commits.
struct CustomContext;
// No `ClientContext` methods are overridden here, so the trait's own defaults apply.
impl ClientContext for CustomContext {}
// Consumer-side hooks for `CustomContext`. Every hook only emits an `info`-level log line;
// none of them touches the consumer or the data it is given.
impl ConsumerContext for CustomContext {
    /// Logs the rebalance event handed to the pre-rebalance hook.
    fn pre_rebalance(&self, _consumer: &BaseConsumer<Self>, event: &Rebalance) {
        info!("Pre rebalance {:?}", event);
    }

    /// Logs the rebalance event handed to the post-rebalance hook.
    fn post_rebalance(&self, _consumer: &BaseConsumer<Self>, event: &Rebalance) {
        info!("Post rebalance {:?}", event);
    }

    /// Logs the outcome of an offset commit. The offsets themselves are not logged.
    fn commit_callback(&self, outcome: KafkaResult<()>, _committed: &TopicPartitionList) {
        info!("Committing offsets: {:?}", outcome);
    }
}
// Convenience alias for a `StreamConsumer` that uses `CustomContext` as its context type,
// so the full generic type does not have to be spelled out at each use site.
type LoggingConsumer = StreamConsumer<CustomContext>;
async fn consume_and_print(brokers: &str, group_id: &str, topics: &[&str]) {
let context = CustomContext;
let consumer: LoggingConsumer = ClientConfig::new()
.set("group.id", group_id)
.set("bootstrap.servers", brokers)
.set("enable.partition.eof", "false")
.set("session.timeout.ms", "6000")
.set("enable.auto.commit", "true")
//.set("statistics.interval.ms", "30000")
//.set("auto.offset.reset", "smallest")
.set_log_level(RDKafkaLogLevel::Debug)
.create_with_context(context)
.expect("Consumer creation failed");
consumer
.subscribe(&topics.to_vec())
.expect("Can't subscribe to specified topics");
loop {
match consumer.recv().await {
Err(e) => warn!("Kafka error: {}", e),
Ok(m) => {
let payload = match m.payload_view::<str>() {
None => "",
Some(Ok(s)) => s,
Some(Err(e)) => {
warn!("Error while deserializing message payload: {:?}", e);
""
}
};
info!("key: '{:?}', payload: '{}', topic: {}, partition: {}, offset: {}, timestamp: {:?}",
m.key(), payload, m.topic(), m.partition(), m.offset(), m.timestamp());
if let Some(headers) = m.headers() {
for header in headers.iter() {
info!(" Header {:#?}: {:?}", header.key, header.value);
}
}
consumer.commit_message(&m, CommitMode::Async).unwrap();
}
};
}
}
/// Program entry point.
///
/// Parses the command line, sets up logging, logs the version reported by
/// `get_rdkafka_version` and then hands control to `consume_and_print`.
#[tokio::main]
async fn main() {
    // Command-line arguments, declared individually and registered below in the same order.
    let brokers_arg = Arg::with_name("brokers")
        .short("b")
        .long("brokers")
        .help("Broker list in kafka format")
        .takes_value(true)
        .default_value("localhost:9092");
    let group_id_arg = Arg::with_name("group-id")
        .short("g")
        .long("group-id")
        .help("Consumer group id")
        .takes_value(true)
        .default_value("example_consumer_group_id");
    let log_conf_arg = Arg::with_name("log-conf")
        .long("log-conf")
        .help("Configure the logging format (example: 'rdkafka=trace')")
        .takes_value(true);
    let topics_arg = Arg::with_name("topics")
        .short("t")
        .long("topics")
        .help("Topic list")
        .takes_value(true)
        .multiple(true)
        .required(true);

    let cli = App::new("consumer example")
        .version(option_env!("CARGO_PKG_VERSION").unwrap_or(""))
        .about("Simple command line consumer")
        .arg(brokers_arg)
        .arg(group_id_arg)
        .arg(log_conf_arg)
        .arg(topics_arg)
        .get_matches();

    // Logging has to be set up before the first `info!` call below.
    setup_logger(true, cli.value_of("log-conf"));

    let (version_n, version_s) = get_rdkafka_version();
    info!("rd_kafka_version: 0x{:08x}, {}", version_n, version_s);

    // "topics" is declared as required and the other two have default values, so these
    // lookups are expected to always yield a value.
    let topics: Vec<&str> = cli.values_of("topics").unwrap().collect();
    let brokers = cli.value_of("brokers").unwrap();
    let group_id = cli.value_of("group-id").unwrap();

    consume_and_print(brokers, group_id, &topics).await
}