forked from vectordotdev/vector
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbuffering.rs
139 lines (119 loc) · 4.88 KB
/
buffering.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
#![cfg(feature = "leveldb")]
use futures::{SinkExt, StreamExt};
use shared::assert_event_data_eq;
use tempfile::tempdir;
use tokio::runtime::Runtime;
use tracing::trace;
use vector::{
buffers::BufferConfig,
config,
test_util::{
random_events_with_stream, runtime, start_topology, trace_init, wait_for_atomic_usize,
CountReceiver,
},
topology,
};
mod support;
// Simulate an abrupt crash: tear down the runtime *before* the topology so
// in-flight tasks are killed without any orderly shutdown of the topology.
// The drop order here is the whole point of the function — do not swap it.
fn terminate_abruptly(rt: Runtime, topology: topology::RunningTopology) {
    std::mem::drop(rt);
    std::mem::drop(topology);
}
// End-to-end check of the disk buffer: events accepted by a source but never
// delivered (dead sink) must survive an abrupt crash and be delivered on the
// next run that reuses the same `data_dir`.
#[test]
fn test_buffering() {
    trace_init();

    // Shared on-disk location for both topology runs below; the second run
    // must see the buffer files the first run left behind.
    let data_dir = tempdir().unwrap();
    let data_dir = data_dir.path().to_path_buf();
    trace!(message = "Test data dir", ?data_dir);

    let num_events: usize = 10;
    let line_length = 100;
    let max_size = 10_000;
    // Two batches of `num_events` (one per run) are expected at the end.
    let expected_events_count = num_events * 2;

    // Guard against misconfigured parameters: the whole test assumes nothing
    // is ever evicted from the buffer, so everything must fit in `max_size`.
    assert!(
        line_length * expected_events_count <= max_size,
        "Test parameters are invalid, this test implies that all lines will fit
        into the buffer, but the buffer is not big enough"
    );

    // Run vector with a dead sink, and then shut it down without sink ever
    // accepting any data.
    // NOTE(review): `sink_dead()` presumably never acknowledges events — confirm
    // against the `support` module.
    let (in_tx, source_config, source_event_counter) = support::source_with_event_counter();
    let sink_config = support::sink_dead();
    let config = {
        let mut config = config::Config::builder();
        config.add_source("in", source_config);
        config.add_sink("out", &["in"], sink_config);
        // Disk-backed buffer is the subject under test.
        config.sinks["out"].buffer = BufferConfig::Disk {
            max_size,
            when_full: Default::default(),
        };
        config.global.data_dir = Some(data_dir.clone());
        config.build().unwrap()
    };

    let rt = runtime();
    let (topology, input_events) = rt.block_on(async move {
        let (topology, _crash) = start_topology(config, false).await;
        let (input_events, input_events_stream) =
            random_events_with_stream(line_length, num_events, None);
        // Wrap items in Ok so the stream satisfies the sink's TryStream contract.
        let mut input_events_stream = input_events_stream.map(Ok);
        let _ = in_tx
            .sink_map_err(|error| panic!("{}", error))
            .send_all(&mut input_events_stream)
            .await
            .unwrap();

        // We need to wait for at least the source to process events.
        // This is to avoid a race after we send all events, at that point two things
        // can happen in any order, we reaching `terminate_abruptly` and source processing
        // all of the events. We need for the source to process events before `terminate_abruptly`
        // so we wait for that here.
        wait_for_atomic_usize(source_event_counter, |x| x == num_events).await;

        // Hand the topology back out so it can be dropped *outside* the runtime.
        (topology, input_events)
    });

    // Now we give it some time for the events to propagate to File.
    // This is to avoid a race after the source processes all events, at that point two things
    // can happen in any order, we reaching `terminate_abruptly` and events being written
    // to file. We need for the events to be written to the file before `terminate_abruptly`.
    // We can't know when exactly all of the events have been written, so we have to guess.
    // But it should be shortly after source processing all of the events.
    // NOTE(review): a fixed sleep is a potential flakiness source on slow CI;
    // a buffer-side flush signal would be deterministic if one is available.
    std::thread::sleep(std::time::Duration::from_secs(1));

    // Simulate a crash.
    terminate_abruptly(rt, topology);

    // Then run vector again with a sink that accepts events now. It should
    // send all of the events from the first run.
    let (in_tx, source_config) = support::source();
    let (out_rx, sink_config) = support::sink(10);
    let config = {
        let mut config = config::Config::builder();
        config.add_source("in", source_config);
        config.add_sink("out", &["in"], sink_config);
        config.sinks["out"].buffer = BufferConfig::Disk {
            max_size,
            when_full: Default::default(),
        };
        // Same data_dir as the first run — this is what lets the disk buffer
        // recover the undelivered events.
        config.global.data_dir = Some(data_dir);
        config.build().unwrap()
    };

    let rt = runtime();
    rt.block_on(async move {
        let (topology, _crash) = start_topology(config, false).await;
        // Second batch of fresh events, sent on top of the recovered ones.
        let (input_events2, input_events_stream) =
            random_events_with_stream(line_length, num_events, None);
        let mut input_events_stream = input_events_stream.map(Ok);
        let _ = in_tx
            .sink_map_err(|error| panic!("{}", error))
            .send_all(&mut input_events_stream)
            .await
            .unwrap();

        // Collect sink output, then stop the topology cleanly so the receiver
        // sees end-of-stream before we await the collected events.
        let output_events = CountReceiver::receive_events(out_rx);
        topology.stop().await;
        let output_events = output_events.await;

        // Both batches must arrive: recovered first-run events, then the
        // second-run events, in order.
        assert_eq!(
            expected_events_count,
            output_events.len(),
            "Expected: {:?}{:?}, got: {:?}",
            input_events,
            input_events2,
            output_events
        );
        assert_event_data_eq!(&input_events[..], &output_events[..num_events]);
        assert_event_data_eq!(&input_events2[..], &output_events[num_events..]);
    });
}