From 208ad7e3c43e7c35d331a381735348281591763c Mon Sep 17 00:00:00 2001 From: Naja Melan Date: Fri, 19 Jul 2019 19:59:02 +0200 Subject: [PATCH 01/22] move futures_01 dependency to dev-dependencies --- Cargo.toml | 8 ++++---- Cargo.yml | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9288ba3..46d59c7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,10 +9,6 @@ pharos = "^0.2" features = ["io-compat", "compat"] version = "^0.3.0-alpha.17" -[dependencies.futures_01] -package = "futures" -version = "^0.1" - [dependencies.js-sys] version = "^0.3" @@ -40,6 +36,10 @@ serde_cbor = "0.9.0" tokio-serde-cbor = "0.3.1" wasm-bindgen-test = "^0.2" +[dev-dependencies.futures_01] +package = "futures" +version = "^0.1" + [dev-dependencies.serde] features = ["derive"] version = "1.0.87" diff --git a/Cargo.yml b/Cargo.yml index 4b3c052..b4aa84d 100644 --- a/Cargo.yml +++ b/Cargo.yml @@ -22,7 +22,6 @@ dependencies: failure : ^0.1 futures-preview : { version: ^0.3.0-alpha.17, features: [io-compat, compat] } log : ^0.4 - futures_01 : { version: ^0.1, package: futures } js-sys : { version: ^0.3 } wasm-bindgen : { version: ^0.2 } @@ -55,3 +54,4 @@ dev-dependencies: flexi_logger : ^0.11 futures_codec : ^0.2 wasm-bindgen-test : ^0.2 + futures_01 : { version: ^0.1, package: futures } From ce5eacced8a90c33264e31a9d5b3a0cd56b87598 Mon Sep 17 00:00:00 2001 From: Naja Melan Date: Sat, 20 Jul 2019 00:47:24 +0200 Subject: [PATCH 02/22] basic chat client layout --- examples/chat_client/Cargo.toml | 25 +++++++ examples/chat_client/Cargo.yml | 36 ++++++++++ examples/chat_client/index.html | 121 ++++++++++++++++++++++++++++++++ 3 files changed, 182 insertions(+) create mode 100644 examples/chat_client/Cargo.toml create mode 100644 examples/chat_client/Cargo.yml create mode 100644 examples/chat_client/index.html diff --git a/examples/chat_client/Cargo.toml b/examples/chat_client/Cargo.toml new file mode 100644 index 0000000..1d9a240 --- /dev/null +++ b/examples/chat_client/Cargo.toml @@ -0,0 +1,25 @@ +# Auto-generated from "Cargo.yml" +[dependencies] +wasm-bindgen = "^0.2" + +[dependencies.async_runtime] +default-features = false +package = "naja_async_runtime" +version = "^0.1" + +[dependencies.web-sys] +features = ["console", "Document", "Element", "HtmlElement", "Node", "Window"] +version = "^0.3" + +[lib] +crate-type = ["cdylib"] +path = "src/entrypoint.rs" + +[package] +authors = ["Naja Melan "] +description = "An example for using async_runtime in wasm." +edition = "2018" +license = "Unlicence" +name = "ws_stream_wasm_chat_client" +repository = "https::/github.com/najamelan/ws_stream_wasm" +version = "0.1.0" diff --git a/examples/chat_client/Cargo.yml b/examples/chat_client/Cargo.yml new file mode 100644 index 0000000..b8dc727 --- /dev/null +++ b/examples/chat_client/Cargo.yml @@ -0,0 +1,36 @@ +package: + + name : ws_stream_wasm_chat_client + version : 0.1.0 + authors : [ Naja Melan ] + edition : '2018' + description: An example for using websockets in rust WASM. + repository : https::/github.com/najamelan/async_runtime + license : Unlicence + + +lib: + + crate-type : [ cdylib ] + path : src/entrypoint.rs + + +dependencies: + + naja_async_runtime: { version: ^0.1, default-features: false } + ws_stream_wasm : ^0.1 + wasm-bindgen : ^0.2 + + web-sys: + + version : ^0.3 + + features : + [ + console , + Document , + Element , + HtmlElement , + Node , + Window , + ] diff --git a/examples/chat_client/index.html b/examples/chat_client/index.html new file mode 100644 index 0000000..ca55426 --- /dev/null +++ b/examples/chat_client/index.html @@ -0,0 +1,121 @@ + + + + ws_stream_wasm Chat Client Example< + + + + + + + + + + +

ws_stream_wasm Chat Client Example

+ + +
 
+
 
+ + +
+ + + + +
+ + + + + From 3b37fe019c314df3fa1a978175e5925fde4a0750 Mon Sep 17 00:00:00 2001 From: Naja Melan Date: Sat, 20 Jul 2019 02:13:13 +0200 Subject: [PATCH 03/22] We are receiving a message from the chat server --- examples/chat_client/Cargo.toml | 20 +++++-- examples/chat_client/Cargo.yml | 7 ++- examples/chat_client/index.html | 8 ++- examples/chat_client/src/entrypoint.rs | 79 ++++++++++++++++++++++++++ 4 files changed, 105 insertions(+), 9 deletions(-) create mode 100644 examples/chat_client/src/entrypoint.rs diff --git a/examples/chat_client/Cargo.toml b/examples/chat_client/Cargo.toml index 1d9a240..b29e8e4 100644 --- a/examples/chat_client/Cargo.toml +++ b/examples/chat_client/Cargo.toml @@ -1,11 +1,20 @@ # Auto-generated from "Cargo.yml" [dependencies] +console_log = "^0.1" +log = "^0.4" wasm-bindgen = "^0.2" +ws_stream_wasm = "^0.1" -[dependencies.async_runtime] +[dependencies.futures-preview] +features = ["io-compat", "compat"] +version = "^0.3.0-alpha.17" + +[dependencies.futures_codec] +git = "https://github.com/najamelan/futures-codec" + +[dependencies.naja_async_runtime] default-features = false -package = "naja_async_runtime" -version = "^0.1" +version = "^0.2" [dependencies.web-sys] features = ["console", "Document", "Element", "HtmlElement", "Node", "Window"] @@ -17,9 +26,8 @@ path = "src/entrypoint.rs" [package] authors = ["Naja Melan "] -description = "An example for using async_runtime in wasm." +description = "An example for using websockets in rust WASM." edition = "2018" -license = "Unlicence" name = "ws_stream_wasm_chat_client" -repository = "https::/github.com/najamelan/ws_stream_wasm" +repository = "https::/github.com/najamelan/async_runtime" version = "0.1.0" diff --git a/examples/chat_client/Cargo.yml b/examples/chat_client/Cargo.yml index b8dc727..5fad93e 100644 --- a/examples/chat_client/Cargo.yml +++ b/examples/chat_client/Cargo.yml @@ -6,7 +6,6 @@ package: edition : '2018' description: An example for using websockets in rust WASM. repository : https::/github.com/najamelan/async_runtime - license : Unlicence lib: @@ -17,9 +16,13 @@ lib: dependencies: - naja_async_runtime: { version: ^0.1, default-features: false } + naja_async_runtime: { version: ^0.2, default-features: false } ws_stream_wasm : ^0.1 wasm-bindgen : ^0.2 + futures_codec : { git: "https://github.com/najamelan/futures-codec" } + futures-preview : { version: ^0.3.0-alpha.17, features: [io-compat, compat] } + console_log : ^0.1 + log : ^0.4 web-sys: diff --git a/examples/chat_client/index.html b/examples/chat_client/index.html index ca55426..0c16e65 100644 --- a/examples/chat_client/index.html +++ b/examples/chat_client/index.html @@ -5,6 +5,12 @@ @@ -116,7 +163,7 @@

ws_stream_wasm Chat Client Example

-
@@ -127,6 +174,32 @@

ws_stream_wasm Chat Client Example

+
+ +
+ +

+ + + + + + + +
+ +
+ diff --git a/examples/chat_client/src/entrypoint.rs b/examples/chat_client/src/entrypoint.rs index 93296c0..4ad866b 100644 --- a/examples/chat_client/src/entrypoint.rs +++ b/examples/chat_client/src/entrypoint.rs @@ -11,7 +11,7 @@ mod import { pub(crate) use { - chat_format :: { futures_serde_cbor::{ Codec, Error }, ServerMsg, ClientMsg } , + chat_format :: { futures_serde_cbor::{ Codec, Error }, ServerMsg, ClientMsg, ChatErr } , async_runtime :: { * } , web_sys :: { *, console::log_1 as dbg } , ws_stream_wasm :: { * } , @@ -31,9 +31,6 @@ mod import use crate::{ import::*, e_handler::*, color::*, user_list::* }; - -const URL : &str = "ws://127.0.0.1:3412"; - const HELP: &str = "Available commands: /nick NEWNAME # change nick (must be between 1 and 15 word characters) /help # Print available commands"; @@ -61,31 +58,21 @@ pub fn main() -> Result<(), JsValue> let program = async { - let chat = document().get_element_by_id( "chat" ).expect( "find chat" ); - - let (ws, wsio) = match WsStream::connect( URL, None ).await - { - Ok(conn) => conn, - Err(e) => - { - error!( "{}", e ); - return; - } - }; + let chat = get_id( "chat" ); + let cform = get_id( "connect_form" ); + let tarea = get_id( "chat_input" ); - let framed = Framed::new( wsio, Codec::new() ); - let (out, msgs) = framed.split(); + let cnick: HtmlInputElement = get_id( "connect_nick" ).unchecked_into(); + cnick.set_value( random_name() ); - let send = document().get_element_by_id( "chat_submit" ).expect_throw( "find chat_submit" ); - let form = document().get_element_by_id( "chat_form" ).expect_throw( "find chat_form" ); - let tarea = document().get_element_by_id( "chat_input" ).expect_throw( "find chat_input" ); + let on_enter = EHandler::new( &tarea, "keypress", false ); + let on_csubmit = EHandler::new( &cform, "submit" , false ); + let on_creset = EHandler::new( &cform, "reset" , false ); - let on_send = EHandler::new( &form , "submit" , false ); - let on_enter = EHandler::new( &tarea, "keypress", false ); + rt::spawn_local( on_key ( on_enter ) ).expect( "spawn on_key" ); + rt::spawn_local( on_cresets ( on_creset ) ).expect( "spawn on_key" ); - rt::spawn_local( on_msg ( msgs ) ).expect( "spawn on_msg" ); - rt::spawn_local( on_submit( on_send , out ) ).expect( "spawn on_submit" ); - rt::spawn_local( on_key ( on_enter ) ).expect( "spawn on_key" ); + on_connect ( on_csubmit ).await; }; rt::spawn_local( program ).expect( "spawn program" ); @@ -96,10 +83,10 @@ pub fn main() -> Result<(), JsValue> fn append_line( chat: &Element, time: f64, nick: &str, line: &str, color: &Color, color_all: bool ) { - let p: HtmlElement = document().create_element( "p" ).expect( "create p" ).unchecked_into(); - let n: HtmlElement = document().create_element( "span" ).expect( "create span" ).unchecked_into(); - let m: HtmlElement = document().create_element( "span" ).expect( "create span" ).unchecked_into(); - let t: HtmlElement = document().create_element( "span" ).expect( "create span" ).unchecked_into(); + let p: HtmlElement = document().create_element( "p" ).expect_throw( "create p" ).unchecked_into(); + let n: HtmlElement = document().create_element( "span" ).expect_throw( "create span" ).unchecked_into(); + let m: HtmlElement = document().create_element( "span" ).expect_throw( "create span" ).unchecked_into(); + let t: HtmlElement = document().create_element( "span" ).expect_throw( "create span" ).unchecked_into(); debug!( "setting color to: {}", color.to_css() ); @@ -148,7 +135,7 @@ fn append_line( chat: &Element, time: f64, nick: &str, line: &str, color: &Color async fn on_msg( mut stream: impl Stream> + Unpin ) { - let chat = document().get_element_by_id( "chat" ).expect_throw( "find chat" ); + let chat = get_id( "chat" ); let mut u_list = UserList::new(); let mut colors: HashMap = HashMap::new(); @@ -193,7 +180,7 @@ async fn on_msg( mut stream: impl Stream> + Unpin { users.into_iter().for_each( |(s,n)| u_list.insert(s,n) ); - let udiv = document().get_element_by_id( "users" ).expect_throw( "find users elem" ); + let udiv = get_id( "users" ); u_list.render( udiv.unchecked_ref() ); append_line( &chat, time as f64, "Server", &txt, colors.get( &0 ).unwrap(), true ); @@ -243,7 +230,7 @@ async fn on_msg( mut stream: impl Stream> + Unpin append_line( &chat, time as f64, "Server", &format!( "Sadly, {} has left us.", &nick ), colors.get( &0 ).unwrap(), true ); } - // _ => {} + _ => {} } } } @@ -255,7 +242,7 @@ async fn on_submit mut out : impl Sink < ClientMsg, Error=Error > + Unpin , ) { - let chat = document().get_element_by_id( "chat" ).expect_throw( "find chat" ); + let chat = get_id( "chat" ); let nickre = Regex::new( r"^/nick (\w{1,15})" ).unwrap(); @@ -263,7 +250,7 @@ async fn on_submit // let helpre = Regex::new(r"^/help\n$").unwrap(); - let textarea = document().get_element_by_id( "chat_input" ).expect_throw( "find chat_input" ); + let textarea = get_id( "chat_input" ); let textarea: &HtmlTextAreaElement = textarea.unchecked_ref(); @@ -312,7 +299,6 @@ async fn on_submit } - match out.send( msg ).await { Ok(()) => {} @@ -334,7 +320,7 @@ async fn on_key mut keys: impl Stream< Item=Event > + Unpin , ) { - let send: HtmlElement = document().get_element_by_id( "chat_submit" ).expect_throw( "find chat_submit" ).unchecked_into(); + let send: HtmlElement = get_id( "chat_submit" ).unchecked_into(); while let Some( evt ) = keys.next().await @@ -351,6 +337,132 @@ async fn on_key +async fn on_connect( mut evts: impl Stream< Item=Event > + Unpin ) +{ + while let Some(_evt) = evts.next().await + { + // validate form + // + let (nick, url) = match validate_connect_form() + { + Ok(ok) => ok, + + Err( e ) => + { + // report error to the user + // continue loop + // + unreachable!() + } + }; + + let (ws, wsio) = match WsStream::connect( url, None ).await + { + Ok(conn) => conn, + + Err(e) => + { + // report error to the user + // + error!( "{}", e ); + continue; + } + }; + + let framed = Framed::new( wsio, Codec::new() ); + let (mut out, mut msgs) = framed.split(); + + let form = get_id( "chat_form" ); + let on_send = EHandler::new( &form, "submit", false ); + + + // hide the connect form + // + let cform: HtmlElement = get_id( "connect_form" ).unchecked_into(); + + + // Ask the server to join + // + match out.send( ClientMsg::Join( nick ) ).await + { + Ok(()) => {} + Err(e) => { error!( "{}", e ); } + }; + + + // Error handling + // + let cerror: HtmlElement = get_id( "connect_error" ).unchecked_into(); + + + if let Some(response) = msgs.next().await + { + match response + { + Ok( ServerMsg::JoinSuccess ) => + { + cform.style().set_property( "display", "none" ).expect_throw( "set cform display none" ); + + rt::spawn_local( on_msg ( msgs ) ).expect( "spawn on_msg" ); + rt::spawn_local( on_submit( on_send , out ) ).expect( "spawn on_submit" ); + } + + // Show an error message on the connect form and let the user try again + // + Ok( ServerMsg::NickInUse{ .. } ) => + { + cerror.set_inner_text( "The nick name is already in use. Please choose another." ); + cerror.style().set_property( "display", "block" ).expect_throw( "set display block on cerror" ); + + continue; + } + + Ok( ServerMsg::NickInvalid{ .. } ) => + { + cerror.set_inner_text( "The nick name is invalid. It must be between 1 and 15 word characters." ); + cerror.style().set_property( "display", "block" ).expect_throw( "set display block on cerror" ); + + continue; + + } + + // cbor decoding error + // + Err(_) => + { + + } + + _ => { } + } + } + } +} + + + +fn validate_connect_form() -> Result< (String, String), ChatErr > +{ + let nick_field: HtmlInputElement = get_id( "connect_nick" ).unchecked_into(); + let url_field : HtmlInputElement = get_id( "connect_url" ).unchecked_into(); + + let nick = nick_field.value(); + let url = url_field .value(); + + Ok((nick, url)) +} + + + +async fn on_cresets( _evts: impl Stream< Item=Event > + Unpin ) +{ + let cnick: HtmlInputElement = get_id( "connect_nick" ).unchecked_into(); + + cnick.set_value( random_name() ); +} + + + pub fn document() -> Document { let window = web_sys::window().expect_throw( "no global `window` exists"); @@ -359,3 +471,53 @@ pub fn document() -> Document } + +// Return a random name +// +pub fn random_name() -> &'static str +{ + // I wanted to use the crate scottish_names to generate a random username, but + // it uses the rand crate which doesn't support wasm for now, so we're just using + // a small sample. + // + let list = vec! + [ + "Elena" + , "Arya" + , "Nora" + , "Amaya" + , "Noor" + , "Ebony" + , "Inaaya" + , "Nuala" + , "Hailie" + , "Hafsa" + , "Iqra" + , "Aleeza" + , "Emme" + , "Teya" + , "Susheela" + , "Pippa" + , "Kobi" + , "Orin" + , "Azaan" + , "Rhuaridh" + , "Salah" + , "Aoun" + ]; + + // pick one + // + list[ Math::floor( Math::random() * list.len() as f64 ) as usize ] +} + + +fn get_id( id: &str ) -> Element +{ + document().get_element_by_id( id ).expect_throw( &format!( "find {}", id ) ) +} + + + + + From 97b00fb1932b367cfbdbc99cc160ad161f03d188 Mon Sep 17 00:00:00 2001 From: Naja Melan Date: Thu, 1 Aug 2019 17:23:36 +0200 Subject: [PATCH 19/22] chat example: fix reset button --- examples/chat_client/index.html | 4 ++-- examples/chat_client/src/e_handler.rs | 2 +- examples/chat_client/src/entrypoint.rs | 19 ++++++++++--------- 3 files changed, 13 insertions(+), 12 deletions(-) diff --git a/examples/chat_client/index.html b/examples/chat_client/index.html index 1992d4d..c819b5a 100644 --- a/examples/chat_client/index.html +++ b/examples/chat_client/index.html @@ -192,8 +192,8 @@

ws_stream_wasm Chat Client Example

diff --git a/examples/chat_client/src/e_handler.rs b/examples/chat_client/src/e_handler.rs index 505f975..99e3cb5 100644 --- a/examples/chat_client/src/e_handler.rs +++ b/examples/chat_client/src/e_handler.rs @@ -15,7 +15,7 @@ impl EHandler { pub fn new( target: &EventTarget, event: &'static str, passive: bool ) -> Self { - debug!( "set event handler" ); + // debug!( "set event handler" ); let (sender, receiver) = mpsc::unbounded(); let options = match passive diff --git a/examples/chat_client/src/entrypoint.rs b/examples/chat_client/src/entrypoint.rs index 4ad866b..cd644ca 100644 --- a/examples/chat_client/src/entrypoint.rs +++ b/examples/chat_client/src/entrypoint.rs @@ -50,12 +50,6 @@ pub fn main() -> Result<(), JsValue> // wasm_logger::init( wasm_logger::Config::new(Level::Debug).message_on_new_line() ); - // Since there is no threads in wasm for the moment, this is optional if you include async_runtime - // with `default-dependencies = false`, the local pool will be the default. However this might - // change in the future. - // - rt::init( RtConfig::Local ).expect( "Set default executor" ); - let program = async { let chat = get_id( "chat" ); @@ -454,11 +448,18 @@ fn validate_connect_form() -> Result< (String, String), ChatErr > -async fn on_cresets( _evts: impl Stream< Item=Event > + Unpin ) +async fn on_cresets( mut evts: impl Stream< Item=Event > + Unpin ) { - let cnick: HtmlInputElement = get_id( "connect_nick" ).unchecked_into(); + while let Some( evt ) = evts.next().await + { + evt.prevent_default(); - cnick.set_value( random_name() ); + let cnick: HtmlInputElement = get_id( "connect_nick" ).unchecked_into(); + let curl : HtmlInputElement = get_id( "connect_url" ).unchecked_into(); + + cnick.set_value( random_name() ); + curl .set_value( "ws://127.0.0.1:3412" ); + } } From c8103d8346c0cf41bf2c0ebaf02c5e99d15339a9 Mon Sep 17 00:00:00 2001 From: Naja Melan Date: Fri, 2 Aug 2019 20:25:18 +0200 Subject: [PATCH 20/22] Fix: Correctly wake up tasks waiting for a next message if the connection gets closed externally. This prevents these tasks from hanging indefinitely. --- Cargo.toml | 2 +- Cargo.yml | 2 +- TODO.md | 1 - src/lib.rs | 17 +++++ src/ws_io.rs | 117 ++++++++++++++++++++++++----------- src/ws_stream.rs | 151 +++++++++++++++++++++++++++------------------ tests/events.rs | 2 +- tests/ws_io.rs | 115 +++++++++++++++++++++++++++++++++- tests/ws_stream.rs | 29 ++++++++- 9 files changed, 334 insertions(+), 102 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 46d59c7..0f53491 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,7 @@ version = "^0.3" [dependencies.naja_async_runtime] default-features = false -version = "^0.2" +version = "^0.3" [dependencies.wasm-bindgen] version = "^0.2" diff --git a/Cargo.yml b/Cargo.yml index b4aa84d..64f474f 100644 --- a/Cargo.yml +++ b/Cargo.yml @@ -18,7 +18,7 @@ dependencies: bitflags : ^1 pharos : ^0.2 - naja_async_runtime : { version: ^0.2, default-features: false } + naja_async_runtime : { version: ^0.3, default-features: false } failure : ^0.1 futures-preview : { version: ^0.3.0-alpha.17, features: [io-compat, compat] } log : ^0.4 diff --git a/TODO.md b/TODO.md index e4a1466..5aa9f71 100644 --- a/TODO.md +++ b/TODO.md @@ -4,7 +4,6 @@ - reconnect? ## Testing -- verify Cargo.yml + all dependencies ## Documentation - chat client example diff --git a/src/lib.rs b/src/lib.rs index e076442..e322714 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -200,3 +200,20 @@ mod import pharos :: { Pharos, Observable, UnboundedObservable } , }; } + + +use import::*; + +/// Helper function to reduce code bloat +// +pub (crate) fn notify( pharos: Rc>>, evt: WsEvent ) +{ + let notify = async move + { + let mut pharos = pharos.borrow_mut(); + + pharos.notify( &evt ).await; + }; + + rt::spawn_local( notify ).expect_throw( "spawn notify closing" ); +} diff --git a/src/ws_io.rs b/src/ws_io.rs index 14166f7..ef1d71a 100644 --- a/src/ws_io.rs +++ b/src/ws_io.rs @@ -1,6 +1,6 @@ use { - crate :: { import::*, WsErr, WsErrKind, WsMessage, WsState, WsEvent, NextEvent, WsEventType }, + crate :: { import::*, WsErr, WsErrKind, WsMessage, WsState, WsEvent, NextEvent, WsEventType, notify }, }; @@ -13,46 +13,51 @@ use // pub struct WsIo { - ws : Rc< WebSocket > , + ws: Rc< WebSocket >, // The queue of received messages // - queue : Rc >> , + queue: Rc >>, // Last waker of task that wants to read incoming messages // to be woken up on a new message // - waker : Rc >> , + waker: Rc >>, + // Last waker of task that wants to write to the Sink + // + sink_waker: Rc >>, // A pointer to the pharos of WsStream for when we // need to listen to events // - pharos : Rc >> , + pharos: Rc >>, // State information for partially read messages in // AsyncRead // - state : ReadState , + state: ReadState, // The closure that will receive the messages // - _on_mesg: Closure< dyn FnMut( MessageEvent ) > , + _on_mesg: Closure< dyn FnMut( MessageEvent ) >, // This allows us to store a future to poll when // Sink::poll_close is called // - closer : Option< NextEvent > , + closer: Option< NextEvent >, } + impl WsIo { /// Create a new WsIo. // pub fn new( ws: Rc, pharos : Rc >> ) -> Self { - let waker: Rc>> = Rc::new( RefCell::new( None )); + let waker : Rc>> = Rc::new( RefCell::new( None )); + let sink_waker: Rc>> = Rc::new( RefCell::new( None )); let state = ReadState::PendingChunk; let queue = Rc::new( RefCell::new( VecDeque::new() ) ); @@ -82,15 +87,49 @@ impl WsIo ws.set_onmessage ( Some( on_mesg.as_ref().unchecked_ref() ) ); + // When the connection closes, we need to verify if there are any tasks + // waiting on poll_next. We need to wake them up. + // + let ph = pharos.clone(); + let wake = waker.clone(); + let swake = sink_waker.clone(); + + let wake_on_close = async move + { + let rx; + + // Scope to avoid borrowing across await point. + // + { + rx = ph.borrow_mut().observe_unbounded(); + } + + NextEvent::new( rx, WsEventType::CLOSE ).await; + + if let Some(w) = &*wake.borrow() + { + w.wake_by_ref(); + } + + if let Some(w) = &*swake.borrow() + { + w.wake_by_ref(); + } + }; + + rt::spawn_local( wake_on_close ).expect_throw( "spawn wake_on_close" ); + + Self { ws , queue , state , waker , + sink_waker , pharos , - _on_mesg: on_mesg , closer : None , + _on_mesg: on_mesg , } } @@ -139,18 +178,27 @@ impl fmt::Debug for WsIo impl Drop for WsIo { // We don't block here, just tell the browser to close the connection and move on. - // TODO: is this necessary or would it be closed automatically when we drop the WebSocket - // object? Note that there is also the WsStream which holds a clone. // fn drop( &mut self ) { trace!( "Drop WsIo" ); - // This can't fail - // - self.ws.close_with_code( 1000 ).expect( "WsIo::drop - close ws socket" ); + match self.ready_state() + { + WsState::Closing | WsState::Closed => {} + + _ => + { + // This can't fail + // + self.ws.close_with_code( 1000 ).expect( "WsIo::drop - close ws socket" ); + - rt::block_on( self.pharos.borrow_mut().notify( &WsEvent::Closing ) ); + // Notify Observers + // + notify( self.pharos.clone(), WsEvent::Closing ) + } + } self.ws.set_onmessage( None ); } @@ -192,8 +240,6 @@ impl Stream for WsIo - - impl Sink for WsIo { type Error = WsErr; @@ -201,13 +247,19 @@ impl Sink for WsIo // Web API does not really seem to let us check for readiness, other than the connection state. // - fn poll_ready( self: Pin<&mut Self>, _: &mut Context ) -> Poll> + fn poll_ready( mut self: Pin<&mut Self>, cx: &mut Context ) -> Poll> { trace!( "Sink for WsIo: poll_ready" ); match self.ready_state() { - WsState::Connecting => Poll::Pending , + WsState::Connecting => + { + *self.sink_waker.borrow_mut() = Some( cx.waker().clone() ); + + Poll::Pending + } + WsState::Open => Poll::Ready( Ok(()) ), _ => Poll::Ready( Err( WsErrKind::ConnectionNotOpen.into() )), } @@ -267,6 +319,8 @@ impl Sink for WsIo let state = self.ready_state(); + // First close the inner connection + // if state == WsState::Connecting || state == WsState::Open { @@ -274,12 +328,12 @@ impl Sink for WsIo // self.ws.close().unwrap_throw(); - // notify observers - // - rt::block_on( self.pharos.borrow_mut().notify( &WsEvent::Closing ) ); + notify( self.pharos.clone(), WsEvent::Closing ); } + // Check whether it's closed + // match state { WsState::Closed => @@ -310,8 +364,6 @@ impl Sink for WsIo - - impl AsyncWrite for WsIo { fn poll_write( mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8] ) -> Poll> @@ -401,19 +453,11 @@ impl AsyncRead for WsIo let end = cmp::min( *chunk_start + buf.len(), chunk.len() ); let len = end - *chunk_start; - buf[..len].copy_from_slice( &chunk[*chunk_start..end] ); - + buf[..len].copy_from_slice( &chunk[ *chunk_start..end ] ); - if chunk.len() == end - { - self.state = ReadState::PendingChunk; - } - - else - { - *chunk_start = end; - } + if chunk.len() == end { self.state = ReadState::PendingChunk } + else { *chunk_start = end } return Poll::Ready( Ok(len) ); } @@ -446,7 +490,6 @@ impl AsyncRead for WsIo Poll::Pending => { trace!( "poll_read: return Pending" ); - return Poll::Pending; } } diff --git a/src/ws_stream.rs b/src/ws_stream.rs index a71b6b3..3f1784d 100644 --- a/src/ws_stream.rs +++ b/src/ws_stream.rs @@ -1,6 +1,6 @@ use { - crate :: { import::*, WsErr, WsErrKind, WsState, WsIo, WsEvent, CloseEvent, NextEvent, WsEventType }, + crate :: { import::*, WsErr, WsErrKind, WsState, WsIo, WsEvent, CloseEvent, NextEvent, WsEventType, notify }, }; @@ -76,7 +76,7 @@ impl WsStream // let ws = match res { - Ok(ws) => ws, + Ok(ws) => Rc::new( ws ), Err(e) => { @@ -107,7 +107,10 @@ impl WsStream { trace!( "websocket open event" ); - rt::block_on( ph1.borrow_mut().notify( &WsEvent::Open ) ); + // notify observers + // + notify( ph1.clone(), WsEvent::Open ) + }) as Box< dyn FnMut() > ); @@ -116,7 +119,10 @@ impl WsStream { trace!( "websocket error event" ); - rt::block_on( ph2.borrow_mut().notify( &WsEvent::Error ) ); + // notify observers + // + notify( ph2.clone(), WsEvent::Error ) + }) as Box< dyn FnMut() > ); @@ -125,14 +131,15 @@ impl WsStream { trace!( "websocket close event" ); - let e = CloseEvent + let c = WsEvent::Close( CloseEvent { code : evt.code() , reason : evt.reason() , was_clean: evt.was_clean(), - }; + }); + + notify( ph3.clone(), c ) - rt::block_on( ph3.borrow_mut().notify( &WsEvent::Close(e) )); }) as Box< dyn FnMut( JsCloseEvt ) > ); @@ -168,7 +175,6 @@ impl WsStream // ws.set_binary_type( BinaryType::Arraybuffer ); - let ws = Rc::new( ws ); Ok (( @@ -190,18 +196,28 @@ impl WsStream /// Close the socket. The future will resolve once the socket's state has become `WsState::CLOSED`. /// See: [MDN Documentation](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/close) // - pub async fn close( &self ) -> CloseEvent + pub async fn close( &self ) -> Result< CloseEvent, WsErr > { - // This can not throw normally, because the only errors the api - // can return is if we use a code or a reason string, which we don't. - // See [mdn](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/close#Exceptions_thrown). - // - self.ws.close().unwrap_throw(); + match self.ready_state() + { + WsState::Closed => return Err( WsErrKind::ConnectionNotOpen.into() ), + WsState::Closing => {} + _ => + { + // This can not throw normally, because the only errors the api + // can return is if we use a code or a reason string, which we don't. + // See [mdn](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/close#Exceptions_thrown). + // + self.ws.close().unwrap_throw(); + + + // Notify Observers + // + notify( self.pharos.clone(), WsEvent::Closing ) + } + } - // Notify Observers - // - rt::block_on( self.pharos.borrow_mut().notify( &WsEvent::Closing ) ); let evts = NextEvent::new( self.pharos.borrow_mut().observe_unbounded(), WsEventType::CLOSE ); @@ -213,7 +229,7 @@ impl WsStream let ce = evts.await.expect_throw( "receive a close event" ); trace!( "WebSocket connection closed!" ); - if let WsEvent::Close(e) = ce { e } + if let WsEvent::Close(e) = ce { Ok( e ) } else { unreachable!() } } @@ -225,32 +241,40 @@ impl WsStream // pub async fn close_code( &self, code: u16 ) -> Result { - match self.ws.close_with_code( code ) + match self.ready_state() { - Ok(_) => + WsState::Closed => return Err( WsErrKind::ConnectionNotOpen.into() ), + WsState::Closing => {} + + _ => { - // Notify Observers - // - rt::block_on( self.pharos.borrow_mut().notify( &WsEvent::Closing ) ); + match self.ws.close_with_code( code ) + { + // Notify Observers + // + Ok(_) => notify( self.pharos.clone(), WsEvent::Closing ), + - let evts = NextEvent::new( self.pharos.borrow_mut().observe_unbounded(), WsEventType::CLOSE ); + Err(_) => + { + let e = WsErr::from( WsErrKind::InvalidCloseCode( code ) ); - let ce = evts.await.expect_throw( "receive a close event" ); - trace!( "WebSocket connection closed!" ); + error!( "{}", e ); - if let WsEvent::Close(e) = ce { Ok(e) } - else { unreachable!() } + return Err( e ); + } + } } + } - Err(_) => - { - let e = WsErr::from( WsErrKind::InvalidCloseCode( code ) ); - error!( "{}", e ); + let evts = NextEvent::new( self.pharos.borrow_mut().observe_unbounded(), WsEventType::CLOSE ); - Err( e ) - } - } + let ce = evts.await.expect_throw( "receive a close event" ); + trace!( "WebSocket connection closed!" ); + + if let WsEvent::Close(e) = ce { Ok(e) } + else { unreachable!() } } @@ -260,42 +284,49 @@ impl WsStream // pub async fn close_reason( &self, code: u16, reason: impl AsRef ) -> Result { - if reason.as_ref().len() > 123 + match self.ready_state() { - let e = WsErr::from( WsErrKind::ReasonStringToLong ); - - error!( "{}", e ); + WsState::Closed => return Err( WsErrKind::ConnectionNotOpen.into() ), + WsState::Closing => {} - return Err( e ); - } + _ => + { + if reason.as_ref().len() > 123 + { + let e = WsErr::from( WsErrKind::ReasonStringToLong ); + error!( "{}", e ); - match self.ws.close_with_code_and_reason( code, reason.as_ref() ) - { - Ok(_) => - { - // Notify Observers - // - rt::block_on( self.pharos.borrow_mut().notify( &WsEvent::Closing ) ); + return Err( e ); + } - let evts = NextEvent::new( self.pharos.borrow_mut().observe_unbounded(), WsEventType::CLOSE ); - let ce = evts.await.expect_throw( "receive a close event" ); - trace!( "WebSocket connection closed!" ); + match self.ws.close_with_code_and_reason( code, reason.as_ref() ) + { + // Notify Observers + // + Ok(_) => notify( self.pharos.clone(), WsEvent::Closing ), - if let WsEvent::Close(e) = ce { Ok(e) } - else { unreachable!() } - } - Err(_) => - { - let e = WsErr::from( WsErrKind::InvalidCloseCode( code ) ); + Err(_) => + { + let e = WsErr::from( WsErrKind::InvalidCloseCode( code ) ); - error!( "{}", e ); + error!( "{}", e ); - Err( e ) + return Err( e ) + } + } } } + + let evts = NextEvent::new( self.pharos.borrow_mut().observe_unbounded(), WsEventType::CLOSE ); + + let ce = evts.await.expect_throw( "receive a close event" ); + trace!( "WebSocket connection closed!" ); + + if let WsEvent::Close(e) = ce { Ok(e) } + else { unreachable!() } } @@ -415,3 +446,5 @@ impl Drop for WsStream self.ws.set_onerror( None ); } } + + diff --git a/tests/events.rs b/tests/events.rs index 3c85fca..70db980 100644 --- a/tests/events.rs +++ b/tests/events.rs @@ -40,7 +40,7 @@ pub fn close_events() -> impl Future01 let mut evts = ws.observe_unbounded(); - ws.close().await; + ws.close().await.expect_throw( "close ws" ); assert_eq!( WsEventType::CLOSING, evts.next().await.unwrap_throw().ws_type() ); assert_eq!( WsEventType::CLOSE , evts.next().await.unwrap_throw().ws_type() ); diff --git a/tests/ws_io.rs b/tests/ws_io.rs index d9c3292..af83cb2 100644 --- a/tests/ws_io.rs +++ b/tests/ws_io.rs @@ -18,11 +18,15 @@ wasm_bindgen_test_configure!(run_in_browser); use { futures_01 :: Future as Future01, + async_runtime :: * , futures::prelude :: * , + futures::sink :: * , + futures::io :: * , wasm_bindgen::prelude :: * , wasm_bindgen_test :: * , ws_stream_wasm :: * , log :: * , + pharos :: * , }; @@ -131,7 +135,7 @@ fn send_after_close() -> impl Future01 { let (ws, mut wsio) = WsStream::connect( URL, None ).await.expect_throw( "Could not create websocket" ); - ws.close().await; + ws.close().await.expect_throw( "close ws" ); let res = wsio.send( WsMessage::Text("Hello from browser".into() ) ).await; @@ -143,6 +147,115 @@ fn send_after_close() -> impl Future01 } + +// Verify closing that when closing from WsStream, WsIo next() returns none. +// +#[ wasm_bindgen_test(async) ] +// +pub fn close_from_wsstream() -> impl Future01 +{ + let _ = console_log::init_with_level( Level::Trace ); + + info!( "starting test: close_from_wsstream" ); + + async + { + let (ws, mut wsio) = WsStream::connect( URL, None ).await.expect_throw( "Could not create websocket" ); + + ws.close().await.expect_throw( "close ws" ); + + assert!( wsio.next().await.is_none() ); + + Ok(()) + + }.boxed_local().compat() +} + + + +// Verify that closing wakes up a task pending on poll_next() +// +#[ wasm_bindgen_test(async) ] +// +pub fn close_from_wsstream_while_pending() -> impl Future01 +{ + let _ = console_log::init_with_level( Level::Trace ); + + info!( "starting test: close_from_wsstream_while_pending" ); + + async + { + let (ws, mut wsio) = WsStream::connect( URL, None ).await.expect_throw( "Could not create websocket" ); + + rt::spawn_local( async move { ws.close().await.expect_throw( "close ws" ); } ).expect_throw( "spawn close" ); + + // if we don't wake up the task, this will hang + // + assert!( wsio.next().await.is_none() ); + + Ok(()) + + }.boxed_local().compat() +} + + + +// Verify that closing wakes up a task pending on poll_next() +// +#[ wasm_bindgen_test(async) ] +// +pub fn close_event_from_sink() -> impl Future01 +{ + let _ = console_log::init_with_level( Level::Trace ); + + info!( "starting test: close_event_from_sink" ); + + async + { + let (mut ws, mut wsio) = WsStream::connect( URL, None ).await.expect_throw( "Could not create websocket" ); + + let mut evts = ws.observe_unbounded(); + + SinkExt::close( &mut wsio ).await.expect_throw( "close ws" ); + + assert_eq!( WsEventType::CLOSING, evts.next().await.unwrap_throw().ws_type() ); + assert_eq!( WsEventType::CLOSE , evts.next().await.unwrap_throw().ws_type() ); + + Ok(()) + + }.boxed_local().compat() +} + + + +// Verify that closing wakes up a task pending on poll_next() +// +#[ wasm_bindgen_test(async) ] +// +pub fn close_event_from_async_write() -> impl Future01 +{ + let _ = console_log::init_with_level( Level::Trace ); + + info!( "starting test: close_event_from_async_write" ); + + async + { + let (mut ws, mut wsio) = WsStream::connect( URL, None ).await.expect_throw( "Could not create websocket" ); + + let mut evts = ws.observe_unbounded(); + + AsyncWriteExt::close( &mut wsio ).await.expect_throw( "close ws" ); + + assert_eq!( WsEventType::CLOSING, evts.next().await.unwrap_throw().ws_type() ); + assert_eq!( WsEventType::CLOSE , evts.next().await.unwrap_throw().ws_type() ); + + Ok(()) + + }.boxed_local().compat() +} + + + // Verify Debug impl. // #[ wasm_bindgen_test(async) ] diff --git a/tests/ws_stream.rs b/tests/ws_stream.rs index c9783d4..6941500 100644 --- a/tests/ws_stream.rs +++ b/tests/ws_stream.rs @@ -195,7 +195,7 @@ pub fn state() -> impl Future01 assert_eq!( WsState::Closing, ws .ready_state() ); assert_eq!( WsState::Closing, wsio.ready_state() ); - ws.close().await; + ws.close().await.expect_throw( "close ws" ); assert_eq!( WsState::Closed, ws .ready_state() ); assert_eq!( WsState::Closed, wsio.ready_state() ); @@ -313,6 +313,33 @@ pub fn protocols_server_accept_none() -> impl Future01 impl Future01 +{ + let _ = console_log::init_with_level( Level::Trace ); + + info!( "starting test: close_twice" ); + + async + { + let (ws, _wsio) = WsStream::connect( URL, None ).await.expect_throw( "Could not create websocket" ); + + let res = ws.close().await; + + assert!( res.is_ok() ); + + assert_eq!( ws.close () .await.unwrap_err().kind(), &WsErrKind::ConnectionNotOpen ); + assert_eq!( ws.close_code ( 1000 ).await.unwrap_err().kind(), &WsErrKind::ConnectionNotOpen ); + assert_eq!( ws.close_reason( 1000, "Normal shutdown" ).await.unwrap_err().kind(), &WsErrKind::ConnectionNotOpen ); + + Ok(()) + + }.boxed_local().compat() +} + + + #[ wasm_bindgen_test(async) ] // pub fn close_code_valid() -> impl Future01 From 4fda04b614a4b5bb7bec677e5e0f428264e61b54 Mon Sep 17 00:00:00 2001 From: Naja Melan Date: Fri, 2 Aug 2019 21:33:30 +0200 Subject: [PATCH 21/22] Bump version to 0.2.0 --- CHANGELOG.md | 9 +++++++++ Cargo.toml | 2 +- Cargo.yml | 2 +- README.md | 7 ++++++- src/lib.rs | 5 +++++ 5 files changed, 22 insertions(+), 3 deletions(-) create mode 100644 CHANGELOG.md diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..d4153c6 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,9 @@ +# Changelog + +## 0.2 - 2019-08-02 + +- **BREAKING CHANGE**: Fix: Correctly wake up tasks waiting for a next message if the connection gets closed externally. + This prevents these tasks from hanging indefinitely. + As a consequence, `WsStream::close` now returns a `Result`, taking into account that if the connection is already + closed, we don't have the `CloseEvent`. Instead a `WsErr` of kind `WsErrKind::ConnectionNotOpen` is returned. +- update to async_runtime 0.3 diff --git a/Cargo.toml b/Cargo.toml index 0f53491..be07a6d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -61,4 +61,4 @@ license = "Unlicense" name = "ws_stream_wasm" readme = "README.md" repository = "https://github.com/najamelan/ws_stream_wasm" -version = "0.1.0" +version = "0.2.0" diff --git a/Cargo.yml b/Cargo.yml index 64f474f..b8f683f 100644 --- a/Cargo.yml +++ b/Cargo.yml @@ -1,7 +1,7 @@ package: name : ws_stream_wasm - version : 0.1.0 + version : 0.2.0 edition : '2018' authors : [ Naja Melan ] description : A convenience library for using websockets in WASM diff --git a/README.md b/README.md index c5be247..0f49a0f 100644 --- a/README.md +++ b/README.md @@ -27,9 +27,10 @@ WebSockets, check out [ws_stream](https://crates.io/crates/ws_stream). ## Table of Contents - [Install](#install) + - [Upgrade](#upgrade) - [Dependencies](#dependencies) - [Usage](#usage) -- [API](#api) + - [API](#api) - [References](#references) - [Contributing](#contributing) - [Code of Conduct](#code-of-conduct) @@ -54,6 +55,10 @@ With raw Cargo.toml ws_stream_wasm = "^0.1" ``` +### Upgrade + +Please check out the [changelog](https://github.com/najamelan/ws_wasm_stream/blob/master/CHANGELOG.md) when upgrading. + ### Dependencies This crate has few dependiencies. Cargo will automatically handle it's dependencies for you. diff --git a/src/lib.rs b/src/lib.rs index e322714..a6b2430 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,6 +27,7 @@ //! ## Table of Contents //! //! - [Install](#install) +//! - [Upgrade](#upgrade) //! - [Dependencies](#dependencies) //! - [Usage](#usage) //! - [Basic Events Example](basic-events-example) @@ -55,6 +56,10 @@ //! ws_stream_wasm = "^0.1" //! ``` //! +//! ### Upgrade +//! +//! Please check out the [changelog](https://github.com/najamelan/ws_wasm_stream/blob/master/CHANGELOG.md) when upgrading. +//! //! ### Dependencies //! //! This crate has few dependiencies. Cargo will automatically handle it's dependencies for you. From 613532d6455a9bca996eb15596edd0fe82dc0f6f Mon Sep 17 00:00:00 2001 From: Naja Melan Date: Fri, 2 Aug 2019 21:36:32 +0200 Subject: [PATCH 22/22] chat example: Working Disconnect button --- examples/chat_client/Cargo.toml | 6 ++- examples/chat_client/Cargo.yml | 4 +- examples/chat_client/README.md | 1 + examples/chat_client/index.html | 6 +-- examples/chat_client/src/entrypoint.rs | 63 ++++++++++++++++++-------- examples/chat_client/src/user_list.rs | 2 +- 6 files changed, 56 insertions(+), 26 deletions(-) diff --git a/examples/chat_client/Cargo.toml b/examples/chat_client/Cargo.toml index ebcfe31..18858c4 100644 --- a/examples/chat_client/Cargo.toml +++ b/examples/chat_client/Cargo.toml @@ -7,7 +7,6 @@ log = "^0.4" regex = "^1" wasm-bindgen = "^0.2" wasm-logger = "^0.1" -ws_stream_wasm = "^0.1" [dependencies.chat_format] path = "../../../ws_stream/examples/chat_format" @@ -21,12 +20,15 @@ git = "https://github.com/rustwasm/gloo" [dependencies.naja_async_runtime] default-features = false -version = "^0.2" +version = "^0.3" [dependencies.web-sys] features = ["console", "CssStyleDeclaration", "Document", "Element", "HtmlDivElement", "HtmlElement", "HtmlFormElement", "HtmlInputElement", "HtmlParagraphElement", "HtmlTextAreaElement", "KeyboardEvent", "Node", "Window"] version = "^0.3" +[dependencies.ws_stream_wasm] +path = "../../" + [lib] crate-type = ["cdylib"] path = "src/entrypoint.rs" diff --git a/examples/chat_client/Cargo.yml b/examples/chat_client/Cargo.yml index e834b01..200f51f 100644 --- a/examples/chat_client/Cargo.yml +++ b/examples/chat_client/Cargo.yml @@ -18,8 +18,8 @@ dependencies: console_error_panic_hook: ^0.1 chat_format : { path: ../../../ws_stream/examples/chat_format } - naja_async_runtime : { version: ^0.2, default-features: false } - ws_stream_wasm : ^0.1 + naja_async_runtime : { version: ^0.3, default-features: false } + ws_stream_wasm : { path: ../../ } wasm-bindgen : ^0.2 futures_codec : ^0.2 futures-preview : { version: ^0.3.0-alpha.17, features: [io-compat, compat] } diff --git a/examples/chat_client/README.md b/examples/chat_client/README.md index bcb53b6..72c17ec 100644 --- a/examples/chat_client/README.md +++ b/examples/chat_client/README.md @@ -33,6 +33,7 @@ Now you can open the `index.html` from this crate in several web browser tabs an ## TODO - disconnect button +- server side disconnect - reread all code and cleanup - document as example - gui diff --git a/examples/chat_client/index.html b/examples/chat_client/index.html index c819b5a..e1e1d94 100644 --- a/examples/chat_client/index.html +++ b/examples/chat_client/index.html @@ -158,8 +158,8 @@

ws_stream_wasm Chat Client Example

-
 
-
 
+
+
@@ -170,7 +170,7 @@

ws_stream_wasm Chat Client Example

- + diff --git a/examples/chat_client/src/entrypoint.rs b/examples/chat_client/src/entrypoint.rs index cd644ca..0a23aa1 100644 --- a/examples/chat_client/src/entrypoint.rs +++ b/examples/chat_client/src/entrypoint.rs @@ -1,5 +1,5 @@ #![ feature( async_await ) ] -#![ allow( unused_imports, unused_variables ) ] +#![ allow( unused_imports ) ] pub(crate) mod e_handler ; pub(crate) mod color ; @@ -52,21 +52,20 @@ pub fn main() -> Result<(), JsValue> let program = async { - let chat = get_id( "chat" ); let cform = get_id( "connect_form" ); let tarea = get_id( "chat_input" ); let cnick: HtmlInputElement = get_id( "connect_nick" ).unchecked_into(); cnick.set_value( random_name() ); - let on_enter = EHandler::new( &tarea, "keypress", false ); - let on_csubmit = EHandler::new( &cform, "submit" , false ); - let on_creset = EHandler::new( &cform, "reset" , false ); + let enter_evts = EHandler::new( &tarea, "keypress", false ); + let csubmit_evts = EHandler::new( &cform, "submit" , false ); + let creset_evts = EHandler::new( &cform, "reset" , false ); - rt::spawn_local( on_key ( on_enter ) ).expect( "spawn on_key" ); - rt::spawn_local( on_cresets ( on_creset ) ).expect( "spawn on_key" ); + rt::spawn_local( on_key ( enter_evts ) ).expect( "spawn on_key" ); - on_connect ( on_csubmit ).await; + rt::spawn_local( on_cresets ( creset_evts ) ).expect( "spawn on_key" ); + on_connect( csubmit_evts ).await; }; rt::spawn_local( program ).expect( "spawn program" ); @@ -82,8 +81,6 @@ fn append_line( chat: &Element, time: f64, nick: &str, line: &str, color: &Color let m: HtmlElement = document().create_element( "span" ).expect_throw( "create span" ).unchecked_into(); let t: HtmlElement = document().create_element( "span" ).expect_throw( "create span" ).unchecked_into(); - debug!( "setting color to: {}", color.to_css() ); - n.style().set_property( "color", &color.to_css() ).expect_throw( "set color" ); if color_all @@ -112,9 +109,6 @@ fn append_line( chat: &Element, time: f64, nick: &str, line: &str, color: &Color let max_scroll = chat.scroll_height() - chat.client_height(); chat.append_child( &p ).expect( "Coundn't append child" ); - debug!( "max_scroll: {}, scroll_top: {}", max_scroll, chat.scroll_top() ); - - // Check whether we are scolled to the bottom. If so, we autoscroll new messages // into vies. If the user has scrolled up, we don't. // @@ -275,7 +269,7 @@ async fn on_submit // if this is a /help message // - else if let Some( cap ) = helpre.captures( &text ) + else if helpre.is_match( &text ) { debug!( "handle /help: {:#?}", &text ); @@ -299,6 +293,8 @@ async fn on_submit Err(e) => { error!( "{}", e ); } }; }; + + debug!( "leaving on_msg" ); } @@ -341,7 +337,7 @@ async fn on_connect( mut evts: impl Stream< Item=Event > + Unpin ) { Ok(ok) => ok, - Err( e ) => + Err( _ ) => { // report error to the user // continue loop @@ -397,9 +393,17 @@ async fn on_connect( mut evts: impl Stream< Item=Event > + Unpin ) { cform.style().set_property( "display", "none" ).expect_throw( "set cform display none" ); - rt::spawn_local( on_msg ( msgs ) ).expect( "spawn on_msg" ); - rt::spawn_local( on_submit( on_send , out ) ).expect( "spawn on_submit" ); - } + let chat = get_id( "chat_form" ); + let reset_evts = EHandler::new( &chat , "reset" , false ); + + rt::spawn_local( on_msg ( msgs ) ).expect( "spawn on_msg" ); + rt::spawn_local( on_submit ( on_send , out ) ).expect( "spawn on_submit" ); + + on_disconnect( reset_evts ).await; + ws.close().await.expect_throw( "close ws" ); + + debug!( "connection closed by disconnect" ); + } // Show an error message on the connect form and let the user try again // @@ -464,6 +468,29 @@ async fn on_cresets( mut evts: impl Stream< Item=Event > + Unpin ) +async fn on_disconnect( mut evts: impl Stream< Item=Event > + Unpin ) +{ + while evts.next().await.is_some() + { + debug!( "on_disconnect" ); + + // show the connect form + // + let cform: HtmlElement = get_id( "connect_form" ).unchecked_into(); + let chat : HtmlElement = get_id( "chat" ).unchecked_into(); + + cform.style().set_property( "display", "flex" ).expect_throw( "set cform display none" ); + chat.set_inner_html( "" ); + + let udiv = get_id( "users" ); + udiv.set_inner_html( "" ); + + break; + } +} + + + pub fn document() -> Document { let window = web_sys::window().expect_throw( "no global `window` exists"); diff --git a/examples/chat_client/src/user_list.rs b/examples/chat_client/src/user_list.rs index d7eaeb9..1643ebd 100644 --- a/examples/chat_client/src/user_list.rs +++ b/examples/chat_client/src/user_list.rs @@ -91,7 +91,7 @@ impl UserList // TODO: Get rid of clone // existing users know if they are in the dom, so we don't call render on them. // - .and_modify( |e| e.change_nick( nick.clone() ) ) + .and_modify( |usr| usr.change_nick( nick.clone() ) ) .or_insert_with ( || {