Skip to content

Commit

Permalink
Add decoder tests
Browse files Browse the repository at this point in the history
Signed-off-by: Jesse Szwedko <[email protected]>
  • Loading branch information
jszwedko committed May 21, 2021
1 parent d1a8cc0 commit d5e8aab
Showing 1 changed file with 271 additions and 25 deletions.
296 changes: 271 additions & 25 deletions src/sources/fluent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,18 +88,9 @@ impl TcpSource for FluentSource {
}

fn build_event(&self, frame: FluentFrame, host: Bytes) -> Option<Event> {
let FluentFrame {
tag,
timestamp,
record,
} = frame;

let mut log = LogEvent::default();
log.insert(log_schema().host_key(), host.clone());
log.insert(log_schema().timestamp_key(), timestamp);
log.insert("tag", tag);
for (key, value) in record.into_iter() {
log.insert_flat(key, value)
let mut log = LogEvent::from(frame);
if log.get(log_schema().host_key()).is_none() {
log.insert(log_schema().host_key(), host);
}
Some(Event::from(log))
}
Expand Down Expand Up @@ -201,16 +192,16 @@ impl FluentDecoder {
Ok(())
}
FluentMessage::PackedForwardWithOptions(tag, bin, options) => {
let buf = match options.compressed.as_str() {
"gzip" => {
let buf = match options.compressed.as_deref() {
Some("gzip") => {
let mut buf = Vec::new();
MultiGzDecoder::new(io::Cursor::new(bin.into_vec()))
.read_to_end(&mut buf)
.map(|_| buf)
.map_err(Into::into)
}
"text" => Ok(bin.into_vec()),
s => Err(DecodeError::UnknownCompression(s.to_owned())),
Some("text") | None => Ok(bin.into_vec()),
Some(s) => Err(DecodeError::UnknownCompression(s.to_owned())),
}?;

let mut buf = BytesMut::from(&buf[..]);
Expand Down Expand Up @@ -325,13 +316,31 @@ impl Decoder for FluentEntryStreamDecoder {
}

/// Normalized fluent message.
#[derive(Debug)]
#[derive(Debug, PartialEq)]
struct FluentFrame {
tag: FluentTag,
timestamp: FluentTimestamp,
record: FluentRecord,
}

impl From<FluentFrame> for LogEvent {
fn from(frame: FluentFrame) -> LogEvent {
let FluentFrame {
tag,
timestamp,
record,
} = frame;

let mut log = LogEvent::default();
log.insert(log_schema().timestamp_key(), timestamp);
log.insert("tag", tag);
for (key, value) in record.into_iter() {
log.insert_flat(key, value)
}
log
}
}

/// Fluent msgpack messages can be encoded in one of three ways, each with and without
/// options, all using arrays to encode the top-level fields.
///
Expand All @@ -358,17 +367,20 @@ enum FluentMessage {
ForwardWithOptions(FluentTag, Vec<FluentEntry>, FluentMessageOptions),
PackedForward(FluentTag, serde_bytes::ByteBuf),
PackedForwardWithOptions(FluentTag, serde_bytes::ByteBuf, FluentMessageOptions),

// should be last as it'll match any other message
Heartbeat(rmpv::Value), // should be Nil if heartbeat
}

/// Server options sent by client.
///
/// https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#option
#[derive(Debug, Deserialize)]
#[derive(Default, Debug, Deserialize)]
#[serde(default)]
struct FluentMessageOptions {
size: Option<u64>, // client provided hint for the number of entries
chunk: Option<String>, // unused right now, would be used for acks
compressed: String, // this one is required if present
size: Option<u64>, // client provided hint for the number of entries
chunk: Option<String>, // unused right now, would be used for acks
compressed: Option<String>, // this one is required if present
}

/// Fluent entry consisting of timestamp and record.
Expand All @@ -386,7 +398,7 @@ type FluentTag = String;
/// Value for fluent record key.
///
/// Used mostly just to implement value conversion.
#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, PartialEq)]
struct FluentValue(rmpv::Value);

impl From<FluentValue> for Value {
Expand Down Expand Up @@ -431,7 +443,7 @@ impl From<FluentValue> for Value {
/// Fluent message timestamp.
///
/// Message timestamps can be a unix timestamp or EventTime messagepack ext.
#[derive(Clone, Debug, Deserialize)]
#[derive(Clone, Debug, Deserialize, PartialEq)]
#[serde(untagged)]
enum FluentTimestamp {
#[serde(with = "ts_seconds")]
Expand All @@ -452,7 +464,7 @@ impl From<FluentTimestamp> for Value {
/// Custom decoder for Fluent's EventTime msgpack extension.
///
/// https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#eventtime-ext-format
#[derive(Clone, Debug)]
#[derive(Clone, Debug, PartialEq)]
struct FluentEventTime(DateTime<Utc>);

impl<'de> serde::de::Deserialize<'de> for FluentEventTime {
Expand Down Expand Up @@ -518,11 +530,246 @@ impl<'de> serde::de::Deserialize<'de> for FluentEventTime {
#[cfg(test)]
mod tests {
use super::*;
use shared::{assert_event_data_eq, btreemap};

#[test]
fn generate_config() {
crate::test_util::test_generate_config::<FluentConfig>();
}

// useful references for msgpack:
// Spec: https://github.com/msgpack/msgpack/blob/master/spec.md
// Encode to array of bytes: https://kawanet.github.io/msgpack-lite/
// Decode base64: https://toolslick.com/conversion/data/messagepack-to-json

#[test]
fn decode_message_mode() {
//[
// "tag.name",
// 1441588984,
// {"message": "bar"},
//]
let message: Vec<u8> = vec![
147, 168, 116, 97, 103, 46, 110, 97, 109, 101, 206, 85, 236, 230, 248, 129, 167, 109,
101, 115, 115, 97, 103, 101, 163, 98, 97, 114,
];

let expected = LogEvent::from(btreemap! {
"message" => "bar",
"tag" => "tag.name",
"timestamp" => Value::Timestamp(DateTime::parse_from_rfc3339("2015-09-07T01:23:04Z").unwrap().into()),
});
assert_event_data_eq!(decode_all(message).unwrap()[0], expected);
}

#[test]
fn decode_message_mode_with_options() {
//[
// "tag.name",
// 1441588984,
// { "message": "bar" },
// { "size": 1 }
//]
let message: Vec<u8> = vec![
148, 168, 116, 97, 103, 46, 110, 97, 109, 101, 206, 85, 236, 230, 248, 129, 167, 109,
101, 115, 115, 97, 103, 101, 163, 98, 97, 114, 129, 164, 115, 105, 122, 101, 1,
];

let expected = LogEvent::from(btreemap! {
"message" => "bar",
"tag" => "tag.name",
"timestamp" => Value::Timestamp(DateTime::parse_from_rfc3339("2015-09-07T01:23:04Z").unwrap().into()),
});
assert_event_data_eq!(decode_all(message).unwrap()[0], expected);
}

#[test]
fn decode_forward_mode() {
//[
// "tag.name",
// [
// [1441588984, {"message": "foo"}],
// [1441588985, {"message": "bar"}],
// [1441588986, {"message": "baz"}]
// ]
//]
let message: Vec<u8> = vec![
146, 168, 116, 97, 103, 46, 110, 97, 109, 101, 147, 146, 206, 85, 236, 230, 248, 129,
167, 109, 101, 115, 115, 97, 103, 101, 163, 102, 111, 111, 146, 206, 85, 236, 230, 249,
129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 98, 97, 114, 146, 206, 85, 236, 230,
250, 129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 98, 97, 122,
];

let expected = vec![
LogEvent::from(btreemap! {
"message" => "foo",
"tag" => "tag.name",
"timestamp" => Value::Timestamp(DateTime::parse_from_rfc3339("2015-09-07T01:23:04Z").unwrap().into()),
}),
LogEvent::from(btreemap! {
"message" => "bar",
"tag" => "tag.name",
"timestamp" => Value::Timestamp(DateTime::parse_from_rfc3339("2015-09-07T01:23:05Z").unwrap().into()),
}),
LogEvent::from(btreemap! {
"message" => "baz",
"tag" => "tag.name",
"timestamp" => Value::Timestamp(DateTime::parse_from_rfc3339("2015-09-07T01:23:06Z").unwrap().into()),
}),
];

let got = decode_all(message).unwrap();

assert_event_data_eq!(got[0], expected[0]);
assert_event_data_eq!(got[1], expected[1]);
assert_event_data_eq!(got[2], expected[2]);
}

#[test]
fn decode_forward_mode_with_options() {
//[
// "tag.name",
// [
// [1441588984, {"message": "foo"}],
// [1441588985, {"message": "bar"}],
// [1441588986, {"message": "baz"}]
// ],
// {"size": 3}
//]
let message: Vec<u8> = vec![
147, 168, 116, 97, 103, 46, 110, 97, 109, 101, 147, 146, 206, 85, 236, 230, 248, 129,
167, 109, 101, 115, 115, 97, 103, 101, 163, 102, 111, 111, 146, 206, 85, 236, 230, 249,
129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 98, 97, 114, 146, 206, 85, 236, 230,
250, 129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 98, 97, 122, 129, 164, 115, 105,
122, 101, 3,
];

let expected = vec![
LogEvent::from(btreemap! {
"message" => "foo",
"tag" => "tag.name",
"timestamp" => Value::Timestamp(DateTime::parse_from_rfc3339("2015-09-07T01:23:04Z").unwrap().into()),
}),
LogEvent::from(btreemap! {
"message" => "bar",
"tag" => "tag.name",
"timestamp" => Value::Timestamp(DateTime::parse_from_rfc3339("2015-09-07T01:23:05Z").unwrap().into()),
}),
LogEvent::from(btreemap! {
"message" => "baz",
"tag" => "tag.name",
"timestamp" => Value::Timestamp(DateTime::parse_from_rfc3339("2015-09-07T01:23:06Z").unwrap().into()),
}),
];

let got = decode_all(message).unwrap();

assert_event_data_eq!(got[0], expected[0]);
assert_event_data_eq!(got[1], expected[1]);
assert_event_data_eq!(got[2], expected[2]);
}

#[test]
fn decode_packed_forward_mode() {
//[
// "tag.name",
// <packed messages>
//]
//
//With packed messages as bin:
// [1441588984, {"message": "foo"}]
// [1441588985, {"message": "bar"}]
// [1441588986, {"message": "baz"}]
let message: Vec<u8> = vec![
147, 168, 116, 97, 103, 46, 110, 97, 109, 101, 196, 57, 146, 206, 85, 236, 230, 248,
129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 102, 111, 111, 146, 206, 85, 236, 230,
249, 129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 98, 97, 114, 146, 206, 85, 236,
230, 250, 129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 98, 97, 122, 129, 167, 109,
101, 115, 115, 97, 103, 101, 163, 102, 111, 111,
];

let expected = vec![
LogEvent::from(btreemap! {
"message" => "foo",
"tag" => "tag.name",
"timestamp" => Value::Timestamp(DateTime::parse_from_rfc3339("2015-09-07T01:23:04Z").unwrap().into()),
}),
LogEvent::from(btreemap! {
"message" => "bar",
"tag" => "tag.name",
"timestamp" => Value::Timestamp(DateTime::parse_from_rfc3339("2015-09-07T01:23:05Z").unwrap().into()),
}),
LogEvent::from(btreemap! {
"message" => "baz",
"tag" => "tag.name",
"timestamp" => Value::Timestamp(DateTime::parse_from_rfc3339("2015-09-07T01:23:06Z").unwrap().into()),
}),
];

let got = decode_all(message).unwrap();

assert_event_data_eq!(got[0], expected[0]);
assert_event_data_eq!(got[1], expected[1]);
assert_event_data_eq!(got[2], expected[2]);
}

// TODO
#[test]
fn decode_compressed_packed_forward_mode() {
//[
// "tag.name",
// <packed messages>,
// {"compressed": "gzip"}
//]
//
//With gzip'd packed messages as bin:
// [1441588984, {"message": "foo"}]
// [1441588985, {"message": "bar"}]
// [1441588986, {"message": "baz"}]
let message: Vec<u8> = vec![
147, 168, 116, 97, 103, 46, 110, 97, 109, 101, 196, 55, 31, 139, 8, 0, 245, 10, 168,
96, 0, 3, 155, 116, 46, 244, 205, 179, 31, 141, 203, 115, 83, 139, 139, 19, 211, 83,
23, 167, 229, 231, 79, 2, 9, 253, 68, 8, 37, 37, 22, 129, 133, 126, 33, 11, 85, 1, 0,
53, 3, 158, 28, 57, 0, 0, 0, 129, 170, 99, 111, 109, 112, 114, 101, 115, 115, 101, 100,
164, 103, 122, 105, 112,
];

let expected = vec![
LogEvent::from(btreemap! {
"message" => "foo",
"tag" => "tag.name",
"timestamp" => Value::Timestamp(DateTime::parse_from_rfc3339("2015-09-07T01:23:04Z").unwrap().into()),
}),
LogEvent::from(btreemap! {
"message" => "bar",
"tag" => "tag.name",
"timestamp" => Value::Timestamp(DateTime::parse_from_rfc3339("2015-09-07T01:23:05Z").unwrap().into()),
}),
LogEvent::from(btreemap! {
"message" => "baz",
"tag" => "tag.name",
"timestamp" => Value::Timestamp(DateTime::parse_from_rfc3339("2015-09-07T01:23:06Z").unwrap().into()),
}),
];

let got = decode_all(message).unwrap();

assert_event_data_eq!(got[0], expected[0]);
assert_event_data_eq!(got[1], expected[1]);
assert_event_data_eq!(got[2], expected[2]);
}

fn decode_all(message: Vec<u8>) -> Result<Vec<LogEvent>, DecodeError> {
let mut buf = BytesMut::from(&message[..]);

let mut decoder = FluentDecoder::new();

let mut frames = vec![];
while let Some(frame) = decoder.decode(&mut buf)? {
frames.push(LogEvent::from(frame))
}
Ok(frames)
}
}

#[cfg(all(test, feature = "fluent-integration-tests"))]
Expand Down Expand Up @@ -711,7 +958,6 @@ mod integration_tests {
sleep(Duration::from_secs(5)).await;

let events = collect_ready(out).await;
dbg!(&events);

remove_container(&docker, &container.id).await;

Expand Down

0 comments on commit d5e8aab

Please sign in to comment.