From 1181f8476f9f9e7861c48852af5604f4f7fa3334 Mon Sep 17 00:00:00 2001 From: Harpo Roeder Date: Mon, 20 Jul 2020 09:53:27 -0700 Subject: [PATCH 01/17] add split chunk test --- src/decode.rs | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/src/decode.rs b/src/decode.rs index 4e06155..aa13759 100644 --- a/src/decode.rs +++ b/src/decode.rs @@ -365,6 +365,27 @@ mod tests { ); assert_eq!(decoded.poll(), Ok(Ready(None))); + let mut decoded = Decoded::new(one_chunk(b"message:hell") + .chain(delay_one_then(chunk(b"o\n\nmessage:"))) + .chain(delay_one_then(chunk(b"world\n\n")))); + + assert_eq!(decoded.poll(), Ok(NotReady)); + assert_eq!( + decoded.poll(), + Ok(Ready(Some(event( + "", + &btreemap! {"message" => &b"hello"[..]} + )))) + ); + assert_eq!( + decoded.poll(), + Ok(Ready(Some(event( + "", + &btreemap! {"message" => &b"world"[..]} + )))) + ); + assert_eq!(decoded.poll(), Ok(Ready(None))); + let interrupted_after_event = one_chunk(b"message: hello\n\n") .chain(stream::poll_fn(|| Err(dummy_stream_error("read error")))); let mut decoded = Decoded::new(interrupted_after_event); From c9a1d32818efec11a9209a589683d1dd560afb94 Mon Sep 17 00:00:00 2001 From: Harpo Roeder Date: Mon, 20 Jul 2020 09:56:59 -0700 Subject: [PATCH 02/17] add multiple data lines test --- src/decode.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/decode.rs b/src/decode.rs index aa13759..186f028 100644 --- a/src/decode.rs +++ b/src/decode.rs @@ -386,6 +386,19 @@ mod tests { ); assert_eq!(decoded.poll(), Ok(Ready(None))); + let mut decoded = Decoded::new(one_chunk(b"data:hello\n") + .chain(delay_one_then(chunk(b"data:world\n\n")))); + + assert_eq!(decoded.poll(), Ok(NotReady)); + assert_eq!( + decoded.poll(), + Ok(Ready(Some(event( + "", + &btreemap! {"data" => &b"hello\nworld"[..]} + )))) + ); + assert_eq!(decoded.poll(), Ok(Ready(None))); + let interrupted_after_event = one_chunk(b"message: hello\n\n") .chain(stream::poll_fn(|| Err(dummy_stream_error("read error")))); let mut decoded = Decoded::new(interrupted_after_event); From 5b7bd275408844b3b80f2b5725249dc9dc74d237 Mon Sep 17 00:00:00 2001 From: Sam Stokes Date: Mon, 20 Jul 2020 12:06:35 -0700 Subject: [PATCH 03/17] fix formatting and add a NotReady so errors show up in CI --- src/decode.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/decode.rs b/src/decode.rs index 186f028..0c94afc 100644 --- a/src/decode.rs +++ b/src/decode.rs @@ -365,10 +365,13 @@ mod tests { ); assert_eq!(decoded.poll(), Ok(Ready(None))); - let mut decoded = Decoded::new(one_chunk(b"message:hell") - .chain(delay_one_then(chunk(b"o\n\nmessage:"))) - .chain(delay_one_then(chunk(b"world\n\n")))); + let mut decoded = Decoded::new( + one_chunk(b"message:hell") + .chain(delay_one_then(chunk(b"o\n\nmessage:"))) + .chain(delay_one_then(chunk(b"world\n\n"))), + ); + assert_eq!(decoded.poll(), Ok(NotReady)); assert_eq!(decoded.poll(), Ok(NotReady)); assert_eq!( decoded.poll(), @@ -386,8 +389,9 @@ mod tests { ); assert_eq!(decoded.poll(), Ok(Ready(None))); - let mut decoded = Decoded::new(one_chunk(b"data:hello\n") - .chain(delay_one_then(chunk(b"data:world\n\n")))); + let mut decoded = Decoded::new( + one_chunk(b"data:hello\n").chain(delay_one_then(chunk(b"data:world\n\n"))), + ); assert_eq!(decoded.poll(), Ok(NotReady)); assert_eq!( From a853cc07232ff3ae4c72ed2c4524fec6a57a265f Mon Sep 17 00:00:00 2001 From: Sam Stokes Date: Tue, 11 Aug 2020 22:58:28 -0700 Subject: [PATCH 04/17] Rewrite chunk-to-line decoding to handle line terminators _within_ chunk --- src/decode.rs | 82 ++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 64 insertions(+), 18 deletions(-) diff --git a/src/decode.rs b/src/decode.rs index 0c94afc..2c48add 100644 --- a/src/decode.rs +++ b/src/decode.rs @@ -117,28 +117,74 @@ where trace!("decoder got a chunk: {:?}", logify(&chunk)); - match self.incomplete_line.as_mut() { - // TODO can we avoid these copies? - None => self.incomplete_line = Some(chunk.to_vec()), - Some(incomplete_line) => incomplete_line.extend(chunk.into_iter()), + // Decoding a chunk has two phases: decode the chunk into lines, and decode the lines + // into events. + + // Phase 1: decode the chunk into lines. + + let mut complete_lines: Vec> = Vec::with_capacity(10); + let mut maybe_incomplete_line: Option<&[u8]> = None; + + // TODO also handle lines ending in \r, \r\n (and EOF?) + let mut lines = chunk.split(|&b| b == b'\n'); + // The first and last elements in this split are special. The spec requires lines to be + // terminated. But lines may span chunks, so: + // * the last line, if non-empty (i.e. if chunk didn't end with a line terminator), + // should be buffered as an incomplete line + // * the first line should be appended to the incomplete line, if any + + for line in lines { + trace!("decoder got a line: {:?}", logify(line)); + + if self.incomplete_line.is_some() { + trace!( + "completing line: {:?}", + logify(self.incomplete_line.as_ref().unwrap()) + ); + + // only the first line can hit this case, since it clears self.incomplete_line + // and we don't fill it again until the end of the loop + let mut incomplete_line = + std::mem::replace(&mut self.incomplete_line, None).unwrap(); + incomplete_line.extend_from_slice(line); + complete_lines.push(incomplete_line); + continue; + } + + if maybe_incomplete_line.is_some() { + // we saw the next line, so the previous one must have been complete after all + trace!( + "previous line was complete: {:?}", + logify(maybe_incomplete_line.as_ref().unwrap()) + ); + let actually_complete_line = + std::mem::replace(&mut maybe_incomplete_line, Some(line)).unwrap(); + complete_lines.push(actually_complete_line.to_vec()); + } else { + trace!("potentially incomplete line: {:?}", logify(line)); + maybe_incomplete_line = Some(line); + } } - let incomplete_line = self.incomplete_line.as_ref().unwrap(); - let chunk = if incomplete_line.ends_with(b"\n") { - std::mem::replace(&mut self.incomplete_line, None).unwrap() - } else { - debug!("Chunk does not end with newline!"); - continue; - }; - // strip off final newline so that .split below doesn't yield a - // bogus empty string as the last "line" - let chunk = &chunk[..chunk.len() - 1]; + match maybe_incomplete_line { + Some(b"") => trace!("last line was empty"), + Some(incomplete_line) => { + trace!("buffering incomplete line: {:?}", logify(incomplete_line)); + self.incomplete_line = Some(incomplete_line.to_vec()); + } + None => trace!("no last line?"), // TODO + } + + for line in &complete_lines { + trace!("complete line: {:?}", logify(line)); + } + + // Phase 2: decode the lines into events. - let lines = chunk.split(|&b| b'\n' == b); let mut seen_empty_line = false; - for line in lines { - trace!("Decoder got a line: {}", logify(line)); + for line in complete_lines { + trace!("Decoder got a line: {:?}", logify(&line)); if line.is_empty() { trace!("empty line"); @@ -146,7 +192,7 @@ where continue; } - if let Some((key, value)) = parse_field(line)? { + if let Some((key, value)) = parse_field(&line)? { if self.event.is_none() { self.event = Some(Event::new()); } From c9fdd78ecced5843f84c779436b0b69ca6ba530e Mon Sep 17 00:00:00 2001 From: Sam Stokes Date: Tue, 11 Aug 2020 23:02:24 -0700 Subject: [PATCH 05/17] tweak test delay handling, add another test case --- src/decode.rs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/decode.rs b/src/decode.rs index 2c48add..6b6b10d 100644 --- a/src/decode.rs +++ b/src/decode.rs @@ -411,13 +411,24 @@ mod tests { ); assert_eq!(decoded.poll(), Ok(Ready(None))); + let mut decoded = + Decoded::new(one_chunk(b"message:hell").chain(delay_one_then(chunk(b"o\n\n")))); + assert_eq!(decoded.poll(), Ok(NotReady)); + assert_eq!( + decoded.poll(), + Ok(Ready(Some(event( + "", + &btreemap! {"message" => &b"hello"[..]} + )))) + ); + assert_eq!(decoded.poll(), Ok(Ready(None))); + let mut decoded = Decoded::new( one_chunk(b"message:hell") .chain(delay_one_then(chunk(b"o\n\nmessage:"))) .chain(delay_one_then(chunk(b"world\n\n"))), ); - assert_eq!(decoded.poll(), Ok(NotReady)); assert_eq!(decoded.poll(), Ok(NotReady)); assert_eq!( decoded.poll(), @@ -426,6 +437,7 @@ mod tests { &btreemap! {"message" => &b"hello"[..]} )))) ); + assert_eq!(decoded.poll(), Ok(NotReady)); assert_eq!( decoded.poll(), Ok(Ready(Some(event( From bb4d2022d01fe14208b6c2abd1ccb931999d7a61 Mon Sep 17 00:00:00 2001 From: Sam Stokes Date: Wed, 12 Aug 2020 14:27:05 -0700 Subject: [PATCH 06/17] Append subsequent values for the same field instead of replacing --- src/decode.rs | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/src/decode.rs b/src/decode.rs index 6b6b10d..477b72f 100644 --- a/src/decode.rs +++ b/src/decode.rs @@ -28,8 +28,23 @@ impl Event { self.fields.get(name).map(|buf| buf.as_slice()) } - fn set_field(&mut self, name: &str, value: &[u8]) { - self.fields.insert(name.into(), value.to_owned()); + // Set the named field to the given value, or append to any existing value, as required by the + // spec. Will append a newline each time. + fn append_field(&mut self, name: &str, value: &[u8]) { + let existing = match self.fields.get_mut(name) { + None => { + let empty = Vec::with_capacity(value.len() + 1); + self.fields.insert(name.into(), empty); + self.fields.get_mut(name).unwrap() + } + Some(nonempty) => { + nonempty.reserve(value.len() + 1); + nonempty + } + }; + + existing.extend(value); + existing.push(b'\n'); } } @@ -204,7 +219,7 @@ where .map_err(Error::InvalidEventType)? .to_string(); } else { - event.set_field(key, value); + event.append_field(key, value); } } } @@ -297,7 +312,7 @@ mod tests { let mut evt = Event::new(); evt.event_type = typ.to_string(); for (k, v) in fields { - evt.set_field(k, v); + evt.append_field(k, v); } evt } From 0908167f7588a9d6b85068f5b85cf27ccc6365a3 Mon Sep 17 00:00:00 2001 From: Sam Stokes Date: Wed, 12 Aug 2020 14:28:31 -0700 Subject: [PATCH 07/17] remove unnecessary mut --- src/decode.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/decode.rs b/src/decode.rs index 477b72f..ad5cb24 100644 --- a/src/decode.rs +++ b/src/decode.rs @@ -141,7 +141,7 @@ where let mut maybe_incomplete_line: Option<&[u8]> = None; // TODO also handle lines ending in \r, \r\n (and EOF?) - let mut lines = chunk.split(|&b| b == b'\n'); + let lines = chunk.split(|&b| b == b'\n'); // The first and last elements in this split are special. The spec requires lines to be // terminated. But lines may span chunks, so: // * the last line, if non-empty (i.e. if chunk didn't end with a line terminator), From 1cc8cae13929b2c8615a7c93cba89b49dd12e9e9 Mon Sep 17 00:00:00 2001 From: Sam Stokes Date: Wed, 12 Aug 2020 15:03:58 -0700 Subject: [PATCH 08/17] clean up trace logging a bit --- src/decode.rs | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/src/decode.rs b/src/decode.rs index ad5cb24..b61dd4a 100644 --- a/src/decode.rs +++ b/src/decode.rs @@ -149,16 +149,16 @@ where // * the first line should be appended to the incomplete line, if any for line in lines { - trace!("decoder got a line: {:?}", logify(line)); - if self.incomplete_line.is_some() { + // only the first line can hit this case, since it clears self.incomplete_line + // and we don't fill it again until the end of the loop + trace!( - "completing line: {:?}", - logify(self.incomplete_line.as_ref().unwrap()) + "completing line from previous chunk: {:?}+{:?}", + logify(self.incomplete_line.as_ref().unwrap()), + logify(line) ); - // only the first line can hit this case, since it clears self.incomplete_line - // and we don't fill it again until the end of the loop let mut incomplete_line = std::mem::replace(&mut self.incomplete_line, None).unwrap(); incomplete_line.extend_from_slice(line); @@ -182,7 +182,7 @@ where } match maybe_incomplete_line { - Some(b"") => trace!("last line was empty"), + Some(b"") => trace!("chunk ended with a line terminator"), Some(incomplete_line) => { trace!("buffering incomplete line: {:?}", logify(incomplete_line)); self.incomplete_line = Some(incomplete_line.to_vec()); @@ -199,8 +199,6 @@ where let mut seen_empty_line = false; for line in complete_lines { - trace!("Decoder got a line: {:?}", logify(&line)); - if line.is_empty() { trace!("empty line"); seen_empty_line = true; From 685146b8a13f0019f547d91edecd39d7a655956d Mon Sep 17 00:00:00 2001 From: Sam Stokes Date: Wed, 12 Aug 2020 16:18:02 -0700 Subject: [PATCH 09/17] handle incomplete chunks in middle of stream --- src/decode.rs | 49 ++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 40 insertions(+), 9 deletions(-) diff --git a/src/decode.rs b/src/decode.rs index b61dd4a..e9ed62f 100644 --- a/src/decode.rs +++ b/src/decode.rs @@ -138,7 +138,7 @@ where // Phase 1: decode the chunk into lines. let mut complete_lines: Vec> = Vec::with_capacity(10); - let mut maybe_incomplete_line: Option<&[u8]> = None; + let mut maybe_incomplete_line: Option> = None; // TODO also handle lines ending in \r, \r\n (and EOF?) let lines = chunk.split(|&b| b == b'\n'); @@ -162,7 +162,8 @@ where let mut incomplete_line = std::mem::replace(&mut self.incomplete_line, None).unwrap(); incomplete_line.extend_from_slice(line); - complete_lines.push(incomplete_line); + + maybe_incomplete_line = Some(incomplete_line); // safe to clobber since this is the first line continue; } @@ -173,21 +174,21 @@ where logify(maybe_incomplete_line.as_ref().unwrap()) ); let actually_complete_line = - std::mem::replace(&mut maybe_incomplete_line, Some(line)).unwrap(); - complete_lines.push(actually_complete_line.to_vec()); + std::mem::replace(&mut maybe_incomplete_line, Some(line.to_vec())).unwrap(); + complete_lines.push(actually_complete_line); } else { trace!("potentially incomplete line: {:?}", logify(line)); - maybe_incomplete_line = Some(line); + maybe_incomplete_line = Some(line.to_vec()); } } match maybe_incomplete_line { - Some(b"") => trace!("chunk ended with a line terminator"), + Some(l) if l.is_empty() => trace!("chunk ended with a line terminator"), Some(incomplete_line) => { - trace!("buffering incomplete line: {:?}", logify(incomplete_line)); - self.incomplete_line = Some(incomplete_line.to_vec()); + trace!("buffering incomplete line: {:?}", logify(&incomplete_line)); + self.incomplete_line = Some(incomplete_line); } - None => trace!("no last line?"), // TODO + None => unreachable!(), // we always set it after processing a line, and we always have at least one line } for line in &complete_lines { @@ -339,6 +340,36 @@ mod tests { delay_one_poll().chain(stream::once(Ok(t))) } + #[test] + fn test_decode_incomplete_chunks() { + let one_empty = one_chunk(b""); + assert_eq!(Decoded::new(one_empty).poll(), Ok(Ready(None))); + + let empty_after_incomplete = one_chunk(b"message:foo") + .chain(one_chunk(b"")) + .chain(one_chunk(b"baz\n\n")); + let mut decoded = Decoded::new(empty_after_incomplete); + assert_eq!( + decoded.poll(), + Ok(Ready(Some(event( + "", + &btreemap! {"message" => &b"foobaz"[..]} + )))) + ); + + let incomplete_after_incomplete = one_chunk(b"message:foo") + .chain(one_chunk(b"bar")) + .chain(one_chunk(b"baz\n\n")); + let mut decoded = Decoded::new(incomplete_after_incomplete); + assert_eq!( + decoded.poll(), + Ok(Ready(Some(event( + "", + &btreemap! {"message" => &b"foobarbaz"[..]} + )))) + ); + } + #[test] fn test_decod() { let empty = stream::empty::(); From 6f30d2f3d8924e78d8eadcd6f669e6a2312aec75 Mon Sep 17 00:00:00 2001 From: Sam Stokes Date: Thu, 13 Aug 2020 16:18:21 -0700 Subject: [PATCH 10/17] add env_logger in example script and mention logging in DEVELOPING.md --- Cargo.toml | 1 + DEVELOPING.md | 9 +++++++++ examples/tail.rs | 2 ++ 3 files changed, 12 insertions(+) create mode 100644 DEVELOPING.md diff --git a/Cargo.toml b/Cargo.toml index f30b5c6..3d75339 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ reqwest = "0.9.11" tokio-timer = "0.2.11" [dev-dependencies] +env_logger = "0.7.1" maplit = "1.0.1" simplelog = "0.5.3" tokio = "0.1.16" diff --git a/DEVELOPING.md b/DEVELOPING.md new file mode 100644 index 0000000..c3abde8 --- /dev/null +++ b/DEVELOPING.md @@ -0,0 +1,9 @@ +# Guide to developing rust-eventsource-client + +Incomplete. + +## Get detailed logging + +eventsource-client uses the standard [log crate](https://crates.io/crates/log) for logging. It will log additional detail about the protocol implementation at `trace` level. + +e.g. if using [env_logger](https://crates.io/crates/env_logger) (as the example script does), set `RUST_LOG=eventsource_client=trace`. diff --git a/examples/tail.rs b/examples/tail.rs index 00c6e88..92566d5 100644 --- a/examples/tail.rs +++ b/examples/tail.rs @@ -5,6 +5,8 @@ use futures::{future::Future, lazy, stream::Stream}; use eventsource_client as es; fn main() -> Result<(), es::Error> { + env_logger::init(); + let args: Vec = env::args().collect(); if args.len() != 3 { From 75fb7afd660067acd3aaa7ce25f6d4cf399474b3 Mon Sep 17 00:00:00 2001 From: Sam Stokes Date: Thu, 13 Aug 2020 16:25:46 -0700 Subject: [PATCH 11/17] get rid of some unwrap() for readability --- src/decode.rs | 31 +++++++++++++++++-------------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/src/decode.rs b/src/decode.rs index e9ed62f..a2a40dc 100644 --- a/src/decode.rs +++ b/src/decode.rs @@ -149,13 +149,13 @@ where // * the first line should be appended to the incomplete line, if any for line in lines { - if self.incomplete_line.is_some() { + if let Some(incomplete_line) = &mut self.incomplete_line { // only the first line can hit this case, since it clears self.incomplete_line // and we don't fill it again until the end of the loop trace!( "completing line from previous chunk: {:?}+{:?}", - logify(self.incomplete_line.as_ref().unwrap()), + logify(&incomplete_line), logify(line) ); @@ -167,18 +167,21 @@ where continue; } - if maybe_incomplete_line.is_some() { - // we saw the next line, so the previous one must have been complete after all - trace!( - "previous line was complete: {:?}", - logify(maybe_incomplete_line.as_ref().unwrap()) - ); - let actually_complete_line = - std::mem::replace(&mut maybe_incomplete_line, Some(line.to_vec())).unwrap(); - complete_lines.push(actually_complete_line); - } else { - trace!("potentially incomplete line: {:?}", logify(line)); - maybe_incomplete_line = Some(line.to_vec()); + match &mut maybe_incomplete_line { + None => { + trace!("potentially incomplete line: {:?}", logify(line)); + maybe_incomplete_line = Some(line.to_vec()); + } + Some(actually_complete_line) => { + // we saw the next line, so the previous one must have been complete after all + trace!( + "previous line was complete: {:?}", + logify(actually_complete_line) + ); + let actually_complete_line = + std::mem::replace(actually_complete_line, line.to_vec()); + complete_lines.push(actually_complete_line); + } } } From 0a932c398f87ac9f2e2e138e309db2a536028066 Mon Sep 17 00:00:00 2001 From: Sam Stokes Date: Thu, 13 Aug 2020 16:34:52 -0700 Subject: [PATCH 12/17] remove placeholder test --- src/lib.rs | 8 -------- 1 file changed, 8 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 858b7c6..f54177e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -43,11 +43,3 @@ pub use client::*; pub use config::*; pub use decode::{Event, EventStream}; pub use error::*; - -#[cfg(test)] -mod tests { - #[test] - fn it_works() { - assert_eq!(2 + 2, 4); - } -} From cee673e6f68b7f95c128afc813260ed3af3a2c30 Mon Sep 17 00:00:00 2001 From: Sam Stokes Date: Thu, 13 Aug 2020 16:47:26 -0700 Subject: [PATCH 13/17] less confusing ordering --- src/decode.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/decode.rs b/src/decode.rs index a2a40dc..f1d8a2b 100644 --- a/src/decode.rs +++ b/src/decode.rs @@ -137,9 +137,6 @@ where // Phase 1: decode the chunk into lines. - let mut complete_lines: Vec> = Vec::with_capacity(10); - let mut maybe_incomplete_line: Option> = None; - // TODO also handle lines ending in \r, \r\n (and EOF?) let lines = chunk.split(|&b| b == b'\n'); // The first and last elements in this split are special. The spec requires lines to be @@ -148,6 +145,9 @@ where // should be buffered as an incomplete line // * the first line should be appended to the incomplete line, if any + let mut complete_lines: Vec> = Vec::with_capacity(10); + let mut maybe_incomplete_line: Option> = None; + for line in lines { if let Some(incomplete_line) = &mut self.incomplete_line { // only the first line can hit this case, since it clears self.incomplete_line From 98044c4b4b598787e1db51b9fb11f9d98a20baf9 Mon Sep 17 00:00:00 2001 From: Sam Stokes Date: Thu, 13 Aug 2020 16:48:20 -0700 Subject: [PATCH 14/17] only strip a single initial space --- src/decode.rs | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/src/decode.rs b/src/decode.rs index f1d8a2b..c4b39aa 100644 --- a/src/decode.rs +++ b/src/decode.rs @@ -79,11 +79,12 @@ fn parse_field(line: &[u8]) -> Result> { let key = &line[0..colon_pos]; let key = from_utf8(key) .map_err(|e| Error::InvalidLine(format!("malformed key: {:?}", e)))?; - let value = &line[colon_pos + 1..]; - let value = match value.iter().position(|&b| !b.is_ascii_whitespace()) { - Some(start) => &value[start..], - None => b"", - }; + + let mut value = &line[colon_pos + 1..]; + // remove the first initial space character if any (but remove no other whitespace) + if value.starts_with(b" ") { + value = &value[1..]; + } debug!("key: {}, value: {}", key, logify(value)); @@ -279,10 +280,15 @@ mod tests { fn test_parse_field_valid() { assert_eq!(parse_field(b"event:foo"), field("event", b"foo")); assert_eq!(parse_field(b"event: foo"), field("event", b"foo")); - assert_eq!(parse_field(b"event: foo"), field("event", b"foo")); - assert_eq!(parse_field(b"event:\tfoo"), field("event", b"foo")); + assert_eq!(parse_field(b"event: foo"), field("event", b" foo")); + assert_eq!(parse_field(b"event:\tfoo"), field("event", b"\tfoo")); assert_eq!(parse_field(b"event: foo "), field("event", b"foo ")); + assert_eq!(parse_field(b"disconnect:"), field("disconnect", b"")); + assert_eq!(parse_field(b"disconnect: "), field("disconnect", b"")); + assert_eq!(parse_field(b"disconnect: "), field("disconnect", b" ")); + assert_eq!(parse_field(b"disconnect:\t"), field("disconnect", b"\t")); + assert_eq!(parse_field(b" : foo"), field(" ", b"foo")); assert_eq!(parse_field(b"\xe2\x98\x83: foo"), field("☃", b"foo")); } From 8b2dd4233ab9e3ec23197be40203243e139c8639 Mon Sep 17 00:00:00 2001 From: Sam Stokes Date: Thu, 13 Aug 2020 17:05:13 -0700 Subject: [PATCH 15/17] handle lines with no colons correctly --- src/decode.rs | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/src/decode.rs b/src/decode.rs index c4b39aa..ae8484f 100644 --- a/src/decode.rs +++ b/src/decode.rs @@ -67,6 +67,12 @@ fn logify(bytes: &[u8]) -> &str { } fn parse_field(line: &[u8]) -> Result> { + if line.is_empty() { + return Err(Error::InvalidLine( + "should never try to parse an empty line (probably a bug)".into(), + )); + } + match line.iter().position(|&b| b':' == b) { Some(0) => { debug!( @@ -77,8 +83,7 @@ fn parse_field(line: &[u8]) -> Result> { } Some(colon_pos) => { let key = &line[0..colon_pos]; - let key = from_utf8(key) - .map_err(|e| Error::InvalidLine(format!("malformed key: {:?}", e)))?; + let key = parse_key(key)?; let mut value = &line[colon_pos + 1..]; // remove the first initial space character if any (but remove no other whitespace) @@ -90,10 +95,14 @@ fn parse_field(line: &[u8]) -> Result> { Ok(Some((key, value))) } - None => Err(Error::InvalidLine("line missing ':' byte".to_string())), + None => Ok(Some((parse_key(line)?, b""))), } } +fn parse_key(key: &[u8]) -> Result<&str> { + from_utf8(key).map_err(|e| Error::InvalidLine(format!("malformed key: {:?}", e))) +} + #[must_use = "streams do nothing unless polled"] pub struct Decoded { chunk_stream: Fuse, @@ -249,19 +258,13 @@ where mod tests { use super::{Error::*, *}; - fn invalid(msg: &str) -> Error { - InvalidLine(msg.to_string()) - } - fn field<'a>(key: &'a str, value: &'a [u8]) -> Result> { Ok(Some((key, value))) } #[test] fn test_parse_field_invalid() { - assert_eq!(parse_field(b""), Err(invalid("line missing ':' byte"))); - - assert_eq!(parse_field(b"event"), Err(invalid("line missing ':' byte"))); + assert!(parse_field(b"").is_err()); match parse_field(b"\x80: invalid UTF-8") { Err(InvalidLine(msg)) => assert!(msg.contains("Utf8Error")), @@ -289,6 +292,8 @@ mod tests { assert_eq!(parse_field(b"disconnect: "), field("disconnect", b" ")); assert_eq!(parse_field(b"disconnect:\t"), field("disconnect", b"\t")); + assert_eq!(parse_field(b"disconnect"), field("disconnect", b"")); + assert_eq!(parse_field(b" : foo"), field(" ", b"foo")); assert_eq!(parse_field(b"\xe2\x98\x83: foo"), field("☃", b"foo")); } From 2a1a2a806095891976a006521582d30253582c78 Mon Sep 17 00:00:00 2001 From: Sam Stokes Date: Thu, 13 Aug 2020 17:47:07 -0700 Subject: [PATCH 16/17] jiggle around test cases instead of having one gigantic test function --- src/decode.rs | 160 ++++++++++++++++++++++++++------------------------ 1 file changed, 84 insertions(+), 76 deletions(-) diff --git a/src/decode.rs b/src/decode.rs index ae8484f..a6e4f51 100644 --- a/src/decode.rs +++ b/src/decode.rs @@ -355,74 +355,7 @@ mod tests { } #[test] - fn test_decode_incomplete_chunks() { - let one_empty = one_chunk(b""); - assert_eq!(Decoded::new(one_empty).poll(), Ok(Ready(None))); - - let empty_after_incomplete = one_chunk(b"message:foo") - .chain(one_chunk(b"")) - .chain(one_chunk(b"baz\n\n")); - let mut decoded = Decoded::new(empty_after_incomplete); - assert_eq!( - decoded.poll(), - Ok(Ready(Some(event( - "", - &btreemap! {"message" => &b"foobaz"[..]} - )))) - ); - - let incomplete_after_incomplete = one_chunk(b"message:foo") - .chain(one_chunk(b"bar")) - .chain(one_chunk(b"baz\n\n")); - let mut decoded = Decoded::new(incomplete_after_incomplete); - assert_eq!( - decoded.poll(), - Ok(Ready(Some(event( - "", - &btreemap! {"message" => &b"foobarbaz"[..]} - )))) - ); - } - - #[test] - fn test_decod() { - let empty = stream::empty::(); - assert_eq!(Decoded::new(empty).poll(), Ok(Ready(None))); - - assert_eq!(Decoded::new(one_chunk(b":hello\n")).poll(), Ok(Ready(None))); - - let one_comment_unterminated = - futures::stream::once::(Ok(chunk(b":hello"))); - let mut decoded = Decoded::new(one_comment_unterminated); - assert_eq!(decoded.poll(), Err(UnexpectedEof)); - - assert_eq!( - Decoded::new(one_chunk(b"message: hello\n")).poll(), - Ok(Ready(None)) - ); - - let interrupted_after_comment = - one_chunk(b":hello\n").chain(stream::poll_fn(|| Err(dummy_stream_error("read error")))); - match Decoded::new(interrupted_after_comment).poll() { - Err(err) => { - assert!(err.is_http_stream_error()); - let description = format!("{}", err.source().unwrap()); - assert!(description.contains("read error"), description); - } - res => panic!("expected HttpStream error, got {:?}", res), - } - - let interrupted_after_field = one_chunk(b"message: hello\n") - .chain(stream::poll_fn(|| Err(dummy_stream_error("read error")))); - match Decoded::new(interrupted_after_field).poll() { - Err(err) => { - assert!(err.is_http_stream_error()); - let description = format!("{}", err.source().unwrap()); - assert!(description.contains("read error"), description); - } - res => panic!("expected HttpStream error, got {:?}", res), - } - + fn test_decode_chunks_simple() { let mut decoded = Decoded::new(one_chunk(b"message: hello\n\n")); assert_eq!( decoded.poll(), @@ -447,6 +380,15 @@ mod tests { ); assert_eq!(decoded.poll(), Ok(Ready(None))); + assert_eq!( + Decoded::new(one_chunk(b":hello\n")).poll(), + Ok(Ready(None)), + "comments are ignored" + ); + } + + #[test] + fn test_decode_message_split_across_chunks() { let mut decoded = Decoded::new(one_chunk(b"message:").chain(one_chunk(b"hello\n\n"))); assert_eq!( decoded.poll(), @@ -457,9 +399,7 @@ mod tests { ); assert_eq!(decoded.poll(), Ok(Ready(None))); - let mut decoded = - Decoded::new(one_chunk(b"message:").chain(delay_one_then(chunk(b"hello\n\n")))); - assert_eq!(decoded.poll(), Ok(NotReady)); + let mut decoded = Decoded::new(one_chunk(b"message:hell").chain(one_chunk(b"o\n\n"))); assert_eq!( decoded.poll(), Ok(Ready(Some(event( @@ -470,7 +410,7 @@ mod tests { assert_eq!(decoded.poll(), Ok(Ready(None))); let mut decoded = - Decoded::new(one_chunk(b"message:hell").chain(delay_one_then(chunk(b"o\n\n")))); + Decoded::new(one_chunk(b"message:").chain(delay_one_then(chunk(b"hello\n\n")))); assert_eq!(decoded.poll(), Ok(NotReady)); assert_eq!( decoded.poll(), @@ -483,11 +423,10 @@ mod tests { let mut decoded = Decoded::new( one_chunk(b"message:hell") - .chain(delay_one_then(chunk(b"o\n\nmessage:"))) - .chain(delay_one_then(chunk(b"world\n\n"))), + .chain(one_chunk(b"o\n\nmessage:")) + .chain(one_chunk(b"world\n\n")), ); - assert_eq!(decoded.poll(), Ok(NotReady)); assert_eq!( decoded.poll(), Ok(Ready(Some(event( @@ -495,7 +434,6 @@ mod tests { &btreemap! {"message" => &b"hello"[..]} )))) ); - assert_eq!(decoded.poll(), Ok(NotReady)); assert_eq!( decoded.poll(), Ok(Ready(Some(event( @@ -504,7 +442,37 @@ mod tests { )))) ); assert_eq!(decoded.poll(), Ok(Ready(None))); + } + + #[test] + fn test_decode_line_split_across_chunks() { + let empty_after_incomplete = one_chunk(b"message:foo") + .chain(one_chunk(b"")) + .chain(one_chunk(b"baz\n\n")); + let mut decoded = Decoded::new(empty_after_incomplete); + assert_eq!( + decoded.poll(), + Ok(Ready(Some(event( + "", + &btreemap! {"message" => &b"foobaz"[..]} + )))) + ); + let incomplete_after_incomplete = one_chunk(b"message:foo") + .chain(one_chunk(b"bar")) + .chain(one_chunk(b"baz\n\n")); + let mut decoded = Decoded::new(incomplete_after_incomplete); + assert_eq!( + decoded.poll(), + Ok(Ready(Some(event( + "", + &btreemap! {"message" => &b"foobarbaz"[..]} + )))) + ); + } + + #[test] + fn test_decode_concatenates_multiple_values_for_same_field() { let mut decoded = Decoded::new( one_chunk(b"data:hello\n").chain(delay_one_then(chunk(b"data:world\n\n"))), ); @@ -518,6 +486,46 @@ mod tests { )))) ); assert_eq!(decoded.poll(), Ok(Ready(None))); + } + + #[test] + fn test_decode_edge_cases() { + let empty = stream::empty::(); + assert_eq!(Decoded::new(empty).poll(), Ok(Ready(None))); + + let one_empty = one_chunk(b""); + assert_eq!(Decoded::new(one_empty).poll(), Ok(Ready(None))); + + let one_comment_unterminated = one_chunk(b":hello"); + let mut decoded = Decoded::new(one_comment_unterminated); + assert_eq!(decoded.poll(), Err(UnexpectedEof)); + + assert_eq!( + Decoded::new(one_chunk(b"message: hello\n")).poll(), + Ok(Ready(None)) + ); + + let interrupted_after_comment = + one_chunk(b":hello\n").chain(stream::poll_fn(|| Err(dummy_stream_error("read error")))); + match Decoded::new(interrupted_after_comment).poll() { + Err(err) => { + assert!(err.is_http_stream_error()); + let description = format!("{}", err.source().unwrap()); + assert!(description.contains("read error"), description); + } + res => panic!("expected HttpStream error, got {:?}", res), + } + + let interrupted_after_field = one_chunk(b"message: hello\n") + .chain(stream::poll_fn(|| Err(dummy_stream_error("read error")))); + match Decoded::new(interrupted_after_field).poll() { + Err(err) => { + assert!(err.is_http_stream_error()); + let description = format!("{}", err.source().unwrap()); + assert!(description.contains("read error"), description); + } + res => panic!("expected HttpStream error, got {:?}", res), + } let interrupted_after_event = one_chunk(b"message: hello\n\n") .chain(stream::poll_fn(|| Err(dummy_stream_error("read error")))); From 0883ae7d73ecd584d3e5626ccd8f946bbd3c7753 Mon Sep 17 00:00:00 2001 From: Sam Stokes Date: Thu, 13 Aug 2020 18:01:20 -0700 Subject: [PATCH 17/17] add CH link for line terminator TODO --- src/decode.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/decode.rs b/src/decode.rs index a6e4f51..8b38edb 100644 --- a/src/decode.rs +++ b/src/decode.rs @@ -147,7 +147,7 @@ where // Phase 1: decode the chunk into lines. - // TODO also handle lines ending in \r, \r\n (and EOF?) + // TODO(ch86257) also handle lines ending in \r, \r\n (and EOF?) let lines = chunk.split(|&b| b == b'\n'); // The first and last elements in this split are special. The spec requires lines to be // terminated. But lines may span chunks, so: