-
Notifications
You must be signed in to change notification settings - Fork 287
/
mocking.rs
84 lines (74 loc) · 2.57 KB
/
mocking.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
//! This example is similar to the roundtrip one but uses the mock API.
use std::convert::TryInto;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use hdrhistogram::Histogram;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::message::Message;
use rdkafka::mocking::MockCluster;
use rdkafka::producer::{FutureProducer, FutureRecord};
#[tokio::main]
async fn main() {
const TOPIC: &str = "test_topic";
let mock_cluster = MockCluster::new(3).unwrap();
mock_cluster
.create_topic(TOPIC, 32, 3)
.expect("Failed to create topic");
let producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", mock_cluster.bootstrap_servers())
.create()
.expect("Producer creation error");
let consumer: StreamConsumer = ClientConfig::new()
.set("bootstrap.servers", mock_cluster.bootstrap_servers())
.set("group.id", "rust-rdkafka-mock-example")
.create()
.expect("Consumer creation failed");
consumer.subscribe(&[TOPIC]).unwrap();
tokio::spawn(async move {
let mut i = 0_usize;
loop {
producer
.send_result(
FutureRecord::to(TOPIC)
.key(&i.to_string())
.payload("dummy")
.timestamp(now()),
)
.unwrap()
.await
.unwrap()
.unwrap();
i += 1;
}
});
let start = Instant::now();
let mut latencies = Histogram::<u64>::new(5).unwrap();
println!("Warming up for 10s...");
loop {
let message = consumer.recv().await.unwrap();
let then = message.timestamp().to_millis().unwrap();
if start.elapsed() < Duration::from_secs(10) {
// Warming up.
} else if start.elapsed() < Duration::from_secs(20) {
if latencies.is_empty() {
println!("Recording for 10s...");
}
latencies += (now() - then) as u64;
} else {
break;
}
}
println!("measurements: {}", latencies.len());
println!("mean latency: {}ms", latencies.mean());
println!("p50 latency: {}ms", latencies.value_at_quantile(0.50));
println!("p90 latency: {}ms", latencies.value_at_quantile(0.90));
println!("p99 latency: {}ms", latencies.value_at_quantile(0.99));
}
fn now() -> i64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis()
.try_into()
.unwrap()
}