diff --git a/src/sources/fluent.rs b/src/sources/fluent.rs index 5f8f6fb5ccbf3b..85314fcd78247e 100644 --- a/src/sources/fluent.rs +++ b/src/sources/fluent.rs @@ -88,18 +88,9 @@ impl TcpSource for FluentSource { } fn build_event(&self, frame: FluentFrame, host: Bytes) -> Option { - let FluentFrame { - tag, - timestamp, - record, - } = frame; - - let mut log = LogEvent::default(); - log.insert(log_schema().host_key(), host.clone()); - log.insert(log_schema().timestamp_key(), timestamp); - log.insert("tag", tag); - for (key, value) in record.into_iter() { - log.insert_flat(key, value) + let mut log = LogEvent::from(frame); + if log.get(log_schema().host_key()).is_none() { + log.insert(log_schema().host_key(), host); } Some(Event::from(log)) } @@ -201,16 +192,16 @@ impl FluentDecoder { Ok(()) } FluentMessage::PackedForwardWithOptions(tag, bin, options) => { - let buf = match options.compressed.as_str() { - "gzip" => { + let buf = match options.compressed.as_deref() { + Some("gzip") => { let mut buf = Vec::new(); MultiGzDecoder::new(io::Cursor::new(bin.into_vec())) .read_to_end(&mut buf) .map(|_| buf) .map_err(Into::into) } - "text" => Ok(bin.into_vec()), - s => Err(DecodeError::UnknownCompression(s.to_owned())), + Some("text") | None => Ok(bin.into_vec()), + Some(s) => Err(DecodeError::UnknownCompression(s.to_owned())), }?; let mut buf = BytesMut::from(&buf[..]); @@ -325,13 +316,31 @@ impl Decoder for FluentEntryStreamDecoder { } /// Normalized fluent message. 
-#[derive(Debug)] +#[derive(Debug, PartialEq)] struct FluentFrame { tag: FluentTag, timestamp: FluentTimestamp, record: FluentRecord, } +impl From for LogEvent { + fn from(frame: FluentFrame) -> LogEvent { + let FluentFrame { + tag, + timestamp, + record, + } = frame; + + let mut log = LogEvent::default(); + log.insert(log_schema().timestamp_key(), timestamp); + log.insert("tag", tag); + for (key, value) in record.into_iter() { + log.insert_flat(key, value) + } + log + } +} + /// Fluent msgpack messages can be encoded in one of three ways, each with and without /// options, all using arrays to encode the top-level fields. /// @@ -358,17 +367,20 @@ enum FluentMessage { ForwardWithOptions(FluentTag, Vec, FluentMessageOptions), PackedForward(FluentTag, serde_bytes::ByteBuf), PackedForwardWithOptions(FluentTag, serde_bytes::ByteBuf, FluentMessageOptions), + + // should be last as it'll match any other message Heartbeat(rmpv::Value), // should be Nil if heartbeat } /// Server options sent by client. /// /// https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#option -#[derive(Debug, Deserialize)] +#[derive(Default, Debug, Deserialize)] +#[serde(default)] struct FluentMessageOptions { - size: Option, // client provided hint for the number of entries - chunk: Option, // unused right now, would be used for acks - compressed: String, // this one is required if present + size: Option, // client provided hint for the number of entries + chunk: Option, // unused right now, would be used for acks + compressed: Option, // optional; when absent the payload is handled like "text" (uncompressed) } /// Fluent entry consisting of timestamp and record. @@ -386,7 +398,7 @@ type FluentTag = String; /// Value for fluent record key. /// /// Used mostly just to implement value conversion. -#[derive(Debug, Deserialize)] +#[derive(Debug, Deserialize, PartialEq)] struct FluentValue(rmpv::Value); impl From for Value { @@ -431,7 +443,7 @@ impl From for Value { /// Fluent message timestamp. 
/// /// Message timestamps can be a unix timestamp or EventTime messagepack ext. -#[derive(Clone, Debug, Deserialize)] +#[derive(Clone, Debug, Deserialize, PartialEq)] #[serde(untagged)] enum FluentTimestamp { #[serde(with = "ts_seconds")] @@ -452,7 +464,7 @@ impl From for Value { /// Custom decoder for Fluent's EventTime msgpack extension. /// /// https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#eventtime-ext-format -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq)] struct FluentEventTime(DateTime); impl<'de> serde::de::Deserialize<'de> for FluentEventTime { @@ -518,11 +530,246 @@ impl<'de> serde::de::Deserialize<'de> for FluentEventTime { #[cfg(test)] mod tests { use super::*; + use shared::{assert_event_data_eq, btreemap}; #[test] fn generate_config() { crate::test_util::test_generate_config::(); } + + // useful references for msgpack: + // Spec: https://github.com/msgpack/msgpack/blob/master/spec.md + // Encode to array of bytes: https://kawanet.github.io/msgpack-lite/ + // Decode base64: https://toolslick.com/conversion/data/messagepack-to-json + + #[test] + fn decode_message_mode() { + //[ + // "tag.name", + // 1441588984, + // {"message": "bar"}, + //] + let message: Vec = vec![ + 147, 168, 116, 97, 103, 46, 110, 97, 109, 101, 206, 85, 236, 230, 248, 129, 167, 109, + 101, 115, 115, 97, 103, 101, 163, 98, 97, 114, + ]; + + let expected = LogEvent::from(btreemap! 
{ + "message" => "bar", + "tag" => "tag.name", + "timestamp" => Value::Timestamp(DateTime::parse_from_rfc3339("2015-09-07T01:23:04Z").unwrap().into()), + }); + assert_event_data_eq!(decode_all(message).unwrap()[0], expected); + } + + #[test] + fn decode_message_mode_with_options() { + //[ + // "tag.name", + // 1441588984, + // { "message": "bar" }, + // { "size": 1 } + //] + let message: Vec = vec![ + 148, 168, 116, 97, 103, 46, 110, 97, 109, 101, 206, 85, 236, 230, 248, 129, 167, 109, + 101, 115, 115, 97, 103, 101, 163, 98, 97, 114, 129, 164, 115, 105, 122, 101, 1, + ]; + + let expected = LogEvent::from(btreemap! { + "message" => "bar", + "tag" => "tag.name", + "timestamp" => Value::Timestamp(DateTime::parse_from_rfc3339("2015-09-07T01:23:04Z").unwrap().into()), + }); + assert_event_data_eq!(decode_all(message).unwrap()[0], expected); + } + + #[test] + fn decode_forward_mode() { + //[ + // "tag.name", + // [ + // [1441588984, {"message": "foo"}], + // [1441588985, {"message": "bar"}], + // [1441588986, {"message": "baz"}] + // ] + //] + let message: Vec = vec![ + 146, 168, 116, 97, 103, 46, 110, 97, 109, 101, 147, 146, 206, 85, 236, 230, 248, 129, + 167, 109, 101, 115, 115, 97, 103, 101, 163, 102, 111, 111, 146, 206, 85, 236, 230, 249, + 129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 98, 97, 114, 146, 206, 85, 236, 230, + 250, 129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 98, 97, 122, + ]; + + let expected = vec![ + LogEvent::from(btreemap! { + "message" => "foo", + "tag" => "tag.name", + "timestamp" => Value::Timestamp(DateTime::parse_from_rfc3339("2015-09-07T01:23:04Z").unwrap().into()), + }), + LogEvent::from(btreemap! { + "message" => "bar", + "tag" => "tag.name", + "timestamp" => Value::Timestamp(DateTime::parse_from_rfc3339("2015-09-07T01:23:05Z").unwrap().into()), + }), + LogEvent::from(btreemap! 
{ + "message" => "baz", + "tag" => "tag.name", + "timestamp" => Value::Timestamp(DateTime::parse_from_rfc3339("2015-09-07T01:23:06Z").unwrap().into()), + }), + ]; + + let got = decode_all(message).unwrap(); + + assert_event_data_eq!(got[0], expected[0]); + assert_event_data_eq!(got[1], expected[1]); + assert_event_data_eq!(got[2], expected[2]); + } + + #[test] + fn decode_forward_mode_with_options() { + //[ + // "tag.name", + // [ + // [1441588984, {"message": "foo"}], + // [1441588985, {"message": "bar"}], + // [1441588986, {"message": "baz"}] + // ], + // {"size": 3} + //] + let message: Vec = vec![ + 147, 168, 116, 97, 103, 46, 110, 97, 109, 101, 147, 146, 206, 85, 236, 230, 248, 129, + 167, 109, 101, 115, 115, 97, 103, 101, 163, 102, 111, 111, 146, 206, 85, 236, 230, 249, + 129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 98, 97, 114, 146, 206, 85, 236, 230, + 250, 129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 98, 97, 122, 129, 164, 115, 105, + 122, 101, 3, + ]; + + let expected = vec![ + LogEvent::from(btreemap! { + "message" => "foo", + "tag" => "tag.name", + "timestamp" => Value::Timestamp(DateTime::parse_from_rfc3339("2015-09-07T01:23:04Z").unwrap().into()), + }), + LogEvent::from(btreemap! { + "message" => "bar", + "tag" => "tag.name", + "timestamp" => Value::Timestamp(DateTime::parse_from_rfc3339("2015-09-07T01:23:05Z").unwrap().into()), + }), + LogEvent::from(btreemap! 
{ + "message" => "baz", + "tag" => "tag.name", + "timestamp" => Value::Timestamp(DateTime::parse_from_rfc3339("2015-09-07T01:23:06Z").unwrap().into()), + }), + ]; + + let got = decode_all(message).unwrap(); + + assert_event_data_eq!(got[0], expected[0]); + assert_event_data_eq!(got[1], expected[1]); + assert_event_data_eq!(got[2], expected[2]); + } + + #[test] + fn decode_packed_forward_mode() { + //[ + // "tag.name", + // + //] + // + //With packed messages as bin: + // [1441588984, {"message": "foo"}] + // [1441588985, {"message": "bar"}] + // [1441588986, {"message": "baz"}] + // 146 = fixarray(2): tag + bin only, no trailing options map, so this exercises PackedForward + let message: Vec = vec![ + 146, 168, 116, 97, 103, 46, 110, 97, 109, 101, 196, 57, 146, 206, 85, 236, 230, 248, + 129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 102, 111, 111, 146, 206, 85, 236, 230, + 249, 129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 98, 97, 114, 146, 206, 85, 236, + 230, 250, 129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 98, 97, 122, + ]; + + let expected = vec![ + LogEvent::from(btreemap! { + "message" => "foo", + "tag" => "tag.name", + "timestamp" => Value::Timestamp(DateTime::parse_from_rfc3339("2015-09-07T01:23:04Z").unwrap().into()), + }), + LogEvent::from(btreemap! { + "message" => "bar", + "tag" => "tag.name", + "timestamp" => Value::Timestamp(DateTime::parse_from_rfc3339("2015-09-07T01:23:05Z").unwrap().into()), + }), + LogEvent::from(btreemap! 
{ + "message" => "baz", + "tag" => "tag.name", + "timestamp" => Value::Timestamp(DateTime::parse_from_rfc3339("2015-09-07T01:23:06Z").unwrap().into()), + }), + ]; + + let got = decode_all(message).unwrap(); + + assert_event_data_eq!(got[0], expected[0]); + assert_event_data_eq!(got[1], expected[1]); + assert_event_data_eq!(got[2], expected[2]); + } + + // TODO + #[test] + fn decode_compressed_packed_forward_mode() { + //[ + // "tag.name", + // , + // {"compressed": "gzip"} + //] + // + //With gzip'd packed messages as bin: + // [1441588984, {"message": "foo"}] + // [1441588985, {"message": "bar"}] + // [1441588986, {"message": "baz"}] + let message: Vec = vec![ + 147, 168, 116, 97, 103, 46, 110, 97, 109, 101, 196, 55, 31, 139, 8, 0, 245, 10, 168, + 96, 0, 3, 155, 116, 46, 244, 205, 179, 31, 141, 203, 115, 83, 139, 139, 19, 211, 83, + 23, 167, 229, 231, 79, 2, 9, 253, 68, 8, 37, 37, 22, 129, 133, 126, 33, 11, 85, 1, 0, + 53, 3, 158, 28, 57, 0, 0, 0, 129, 170, 99, 111, 109, 112, 114, 101, 115, 115, 101, 100, + 164, 103, 122, 105, 112, + ]; + + let expected = vec![ + LogEvent::from(btreemap! { + "message" => "foo", + "tag" => "tag.name", + "timestamp" => Value::Timestamp(DateTime::parse_from_rfc3339("2015-09-07T01:23:04Z").unwrap().into()), + }), + LogEvent::from(btreemap! { + "message" => "bar", + "tag" => "tag.name", + "timestamp" => Value::Timestamp(DateTime::parse_from_rfc3339("2015-09-07T01:23:05Z").unwrap().into()), + }), + LogEvent::from(btreemap! 
{ + "message" => "baz", + "tag" => "tag.name", + "timestamp" => Value::Timestamp(DateTime::parse_from_rfc3339("2015-09-07T01:23:06Z").unwrap().into()), + }), + ]; + + let got = decode_all(message).unwrap(); + + assert_event_data_eq!(got[0], expected[0]); + assert_event_data_eq!(got[1], expected[1]); + assert_event_data_eq!(got[2], expected[2]); + } + + fn decode_all(message: Vec) -> Result, DecodeError> { + let mut buf = BytesMut::from(&message[..]); + + let mut decoder = FluentDecoder::new(); + + let mut frames = vec![]; + while let Some(frame) = decoder.decode(&mut buf)? { + frames.push(LogEvent::from(frame)) + } + Ok(frames) + } } #[cfg(all(test, feature = "fluent-integration-tests"))] @@ -711,7 +958,6 @@ mod integration_tests { sleep(Duration::from_secs(5)).await; let events = collect_ready(out).await; - dbg!(&events); remove_container(&docker, &container.id).await;