forked from vectordotdev/vector
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhttp.rs
103 lines (93 loc) · 3.64 KB
/
http.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
use criterion::{criterion_group, BatchSize, BenchmarkId, Criterion, SamplingMode, Throughput};
use futures::TryFutureExt;
use hyper::{
service::{make_service_fn, service_fn},
Body, Response, Server,
};
use std::net::SocketAddr;
use tokio::runtime::Runtime;
use vector::{
config, sinks,
sinks::util::Compression,
sources,
test_util::{next_addr, random_lines, runtime, send_lines, start_topology, wait_for_tcp},
Error,
};
/// Criterion benchmark for a TCP socket source feeding the HTTP sink.
///
/// One benchmark is registered per compression setting (`Compression::None`
/// and `Compression::gzip_default()`) inside the `"http"` group. Every
/// iteration builds and starts a fresh topology during setup; the measured
/// routine sends lines to the source address and then stops the topology.
fn benchmark_http(c: &mut Criterion) {
    let num_lines: usize = 1_000;
    let line_size: usize = 100;
    // Bytes per iteration: reported as throughput and reused as the sink's
    // batch `max_bytes` limit.
    let payload_bytes = num_lines * line_size;

    let in_addr = next_addr();
    let out_addr = next_addr();

    // Keep the value returned by `serve` bound until this function returns.
    let _srv = serve(out_addr);

    let mut group = c.benchmark_group("http");
    group.throughput(Throughput::Bytes(payload_bytes as u64));
    group.sampling_mode(SamplingMode::Flat);

    let variants = [Compression::None, Compression::gzip_default()];
    for compression in &variants {
        group.bench_with_input(
            BenchmarkId::new("compression", compression),
            compression,
            |bencher, compression| {
                // Setup: assemble the config, start the topology and wait for
                // the source's TCP address before handing over to the routine.
                let setup = || {
                    let mut builder = config::Config::builder();
                    builder.add_source(
                        "in",
                        sources::socket::SocketConfig::make_basic_tcp_config(in_addr),
                    );

                    let target: http::Uri = out_addr.to_string().parse().unwrap();
                    let sink = sinks::http::HttpSinkConfig {
                        uri: target.into(),
                        compression: *compression,
                        method: Default::default(),
                        auth: Default::default(),
                        headers: Default::default(),
                        batch: sinks::util::BatchConfig {
                            max_bytes: Some(payload_bytes),
                            ..Default::default()
                        },
                        encoding: sinks::http::Encoding::Text.into(),
                        request: Default::default(),
                        tls: Default::default(),
                    };
                    builder.add_sink("out", &["in"], sink);

                    let mut bench_rt = runtime();
                    let running = bench_rt.block_on(async move {
                        // `_crash` stays bound until the end of this block.
                        let (started, _crash) =
                            start_topology(builder.build().unwrap(), false).await;
                        wait_for_tcp(in_addr).await;
                        started
                    });
                    (bench_rt, running)
                };

                bencher.iter_batched(
                    setup,
                    // Routine: send the lines, then stop the topology.
                    |(mut bench_rt, running)| {
                        bench_rt.block_on(async move {
                            let payload = random_lines(line_size).take(num_lines);
                            send_lines(in_addr, payload).await.unwrap();
                            running.stop().await;
                        })
                    },
                    BatchSize::PerIteration,
                )
            },
        );
    }

    group.finish();
}
/// Starts a stub HTTP server bound to `addr` for the benchmark to send to.
///
/// The server task is spawned onto a new runtime obtained from `runtime()`
/// and answers every request with a `Response` carrying an empty body. The
/// runtime is returned so the caller controls how long it is kept.
///
/// # Panics
///
/// The spawned task panics with the server's error message if serving fails.
fn serve(addr: SocketAddr) -> Runtime {
    let rt = runtime();
    rt.spawn(async move {
        let make_service = make_service_fn(|_| async {
            Ok::<_, Error>(service_fn(|_req| async {
                Ok::<_, Error>(Response::new(Body::empty()))
            }))
        });

        Server::bind(&addr)
            .serve(make_service)
            // Use a format string: `panic!(e)` with a non-literal sole
            // argument is deprecated and rejected by the 2021 edition.
            .map_err(|e| panic!("HTTP benchmark server failed: {}", e))
            .await
    });
    rt
}
// Registers `benchmark_http` in the Criterion benchmark group named `benches`.
// NOTE(review): no `criterion_main!` is visible in this file — presumably the
// group is run from another file; confirm.
criterion_group!(benches, benchmark_http);