-
Notifications
You must be signed in to change notification settings - Fork 287
/
asynchronous_processing.rs
191 lines (175 loc) · 7 KB
/
asynchronous_processing.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
use std::thread;
use std::time::Duration;
use clap::{value_t, App, Arg};
use futures::stream::FuturesUnordered;
use futures::{StreamExt, TryStreamExt};
use log::info;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::stream_consumer::StreamConsumer;
use rdkafka::consumer::Consumer;
use rdkafka::message::{BorrowedMessage, OwnedMessage};
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::Message;
use crate::example_utils::setup_logger;
mod example_utils;
/// Per-message bookkeeping that must run in the order messages are
/// received, i.e. before any truly parallel processing begins.
async fn record_borrowed_message_receipt(msg: &BorrowedMessage<'_>) {
    let offset = msg.offset();
    info!("Message received: {}", offset);
}
/// Counterpart of `record_borrowed_message_receipt` operating on an
/// `OwnedMessage`; in a real-world use case an owned message is often the
/// more convenient form once the data leaves the consumer's lifetime.
/// Intentionally a no-op in this example.
async fn record_owned_message_receipt(_msg: &OwnedMessage) {}
/// Emulates an expensive, synchronous (CPU-bound) computation.
///
/// Sleeps for a random duration (up to ~5 seconds) to simulate work, then
/// returns a short summary of the message payload. This function blocks, so
/// it must be invoked via `tokio::task::spawn_blocking` (as the caller in
/// this file does), never directly inside an async task.
///
/// Fix: removed the declared-but-unused lifetime parameter `<'a>`
/// (clippy `extra_unused_lifetimes`); the signature takes `msg` by value,
/// so no lifetime is needed.
fn expensive_computation(msg: OwnedMessage) -> String {
    info!("Starting expensive computation on message {}", msg.offset());
    // Blocking sleep is acceptable here because this runs on the blocking
    // thread pool, not on the async executor.
    thread::sleep(Duration::from_millis(rand::random::<u64>() % 5000));
    info!(
        "Expensive computation completed on message {}",
        msg.offset()
    );
    // `payload_view::<str>` yields Some(Ok(..)) for a valid UTF-8 payload,
    // Some(Err(..)) for a non-UTF-8 payload, and None when there is no payload.
    match msg.payload_view::<str>() {
        Some(Ok(payload)) => format!("Payload len for {} is {}", payload, payload.len()),
        Some(Err(_)) => "Message payload is not a string".to_owned(),
        None => "No payload".to_owned(),
    }
}
// Creates all the resources and runs the event loop. The event loop will:
//   1) receive a stream of messages from the `StreamConsumer`.
//   2) filter out eventual Kafka errors.
//   3) send the message to a thread pool for processing.
//   4) produce the result to the output topic.
// `tokio::spawn` is used to handle IO-bound tasks in parallel (e.g., producing
// the messages), while `tokio::task::spawn_blocking` is used to handle the
// simulated CPU-bound task.
async fn run_async_processor(
    brokers: String,
    group_id: String,
    input_topic: String,
    output_topic: String,
) {
    // Create the `StreamConsumer`, to receive the messages from the topic in
    // form of a `Stream`. Auto-commit is disabled, and this example never
    // commits offsets explicitly either — NOTE(review): confirm that the
    // resulting reprocess-on-restart semantics are intended.
    let consumer: StreamConsumer = ClientConfig::new()
        .set("group.id", &group_id)
        .set("bootstrap.servers", &brokers)
        .set("enable.partition.eof", "false")
        .set("session.timeout.ms", "6000")
        .set("enable.auto.commit", "false")
        .create()
        .expect("Consumer creation failed");
    consumer
        .subscribe(&[&input_topic])
        .expect("Can't subscribe to specified topic");
    // Create the `FutureProducer` to produce asynchronously.
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", &brokers)
        .set("message.timeout.ms", "5000")
        .create()
        .expect("Producer creation error");
    // Create the outer pipeline on the message stream. `try_for_each` stops
    // at the first Kafka error surfaced by the stream.
    let stream_processor = consumer.stream().try_for_each(|borrowed_message| {
        // Clone the handles the spawned task must own. Fix: `output_topic`
        // is already a `String`, so use `clone()` rather than `to_string()`
        // (clippy `string_to_string` — a clone in disguise).
        let producer = producer.clone();
        let output_topic = output_topic.clone();
        async move {
            // Process each message, in receipt order.
            record_borrowed_message_receipt(&borrowed_message).await;
            // Borrowed messages can't outlive the consumer they are received from, so they need to
            // be owned in order to be sent to a separate thread.
            let owned_message = borrowed_message.detach();
            record_owned_message_receipt(&owned_message).await;
            // Detached task: its handle is dropped, so production happens
            // concurrently with consuming the next message.
            tokio::spawn(async move {
                // The body of this block will be executed on the main thread pool,
                // but we perform `expensive_computation` on a separate thread pool
                // for CPU-intensive tasks via `tokio::task::spawn_blocking`.
                let computation_result =
                    tokio::task::spawn_blocking(|| expensive_computation(owned_message))
                        .await
                        .expect("failed to wait for expensive computation");
                let produce_future = producer.send(
                    FutureRecord::to(&output_topic)
                        .key("some key")
                        .payload(&computation_result),
                    Duration::from_secs(0),
                );
                match produce_future.await {
                    Ok(delivery) => println!("Sent: {:?}", delivery),
                    Err((e, _)) => println!("Error: {:?}", e),
                }
            });
            Ok(())
        }
    });
    info!("Starting event loop");
    stream_processor.await.expect("stream processing failed");
    info!("Stream processing terminated");
}
/// Entry point: parses the CLI, configures logging, and spawns
/// `num-workers` independent copies of the async processor, waiting for
/// all of them (they normally run until the process is killed).
#[tokio::main]
async fn main() {
    let matches = App::new("Async example")
        .version(option_env!("CARGO_PKG_VERSION").unwrap_or(""))
        .about("Asynchronous computation example")
        .arg(
            Arg::with_name("brokers")
                .short("b")
                .long("brokers")
                .help("Broker list in kafka format")
                .takes_value(true)
                .default_value("localhost:9092"),
        )
        .arg(
            Arg::with_name("group-id")
                .short("g")
                .long("group-id")
                .help("Consumer group id")
                .takes_value(true)
                .default_value("example_consumer_group_id"),
        )
        .arg(
            Arg::with_name("log-conf")
                .long("log-conf")
                .help("Configure the logging format (example: 'rdkafka=trace')")
                .takes_value(true),
        )
        .arg(
            Arg::with_name("input-topic")
                .long("input-topic")
                .help("Input topic")
                .takes_value(true)
                .required(true),
        )
        .arg(
            Arg::with_name("output-topic")
                .long("output-topic")
                .help("Output topic")
                .takes_value(true)
                .required(true),
        )
        .arg(
            Arg::with_name("num-workers")
                .long("num-workers")
                .help("Number of workers")
                .takes_value(true)
                .default_value("1"),
        )
        .get_matches();
    setup_logger(true, matches.value_of("log-conf"));
    // `unwrap()` is safe on args with a default or marked `required`.
    let brokers = matches.value_of("brokers").unwrap();
    let group_id = matches.value_of("group-id").unwrap();
    let input_topic = matches.value_of("input-topic").unwrap();
    let output_topic = matches.value_of("output-topic").unwrap();
    // Fix: report a bad `--num-workers` value through clap's error path
    // (usage message + clean exit) instead of panicking with a backtrace.
    let num_workers = value_t!(matches, "num-workers", usize).unwrap_or_else(|e| e.exit());
    // Spawn the workers and await them all; order of completion is irrelevant,
    // hence `FuturesUnordered`.
    (0..num_workers)
        .map(|_| {
            tokio::spawn(run_async_processor(
                brokers.to_owned(),
                group_id.to_owned(),
                input_topic.to_owned(),
                output_topic.to_owned(),
            ))
        })
        .collect::<FuturesUnordered<_>>()
        .for_each(|_| async {}) // Fix: drop needless unit expression (clippy `unused_unit`).
        .await
}