Async server use needs more documentation #170

Open
anlumo opened this issue Mar 19, 2018 · 3 comments

anlumo commented Mar 19, 2018

Hello,

I'm moderately experienced with Rust and trying to get an async websocket server to run using this crate. I'm really having a hard time wrapping my head around this, and there's no documentation for this other than a single far too simplistic example and the reference documentation that's nearly unreadable due to all of the async stuff using types that go all over the place. The error messages I get from rustc are larger than my screen and remind me of my heavily templated C++ past.

For example, even the most basic thing of sending a text message via the websocket is not really explained anywhere.

What I found out about the example code is this:

stream.take_while(|m| Ok(!m.is_close()))

This is like the beginning of an async while loop.

.filter_map(|m| {

Receive one message from the websocket, and optionally transform the object to something else (why?).

match m {
    OwnedMessage::Ping(p) => Some(OwnedMessage::Pong(p)),
    OwnedMessage::Pong(_) => None,
    _ => Some(m),
}

Convert a ping to a pong, send nothing in response to a pong, and pass along anything else. I don't see anything here actually sending a message, though. This just generates packets.

.forward(sink)

This I assume is the end of the while loop (I'm not 100% sure, that's just a hunch). I have no idea where that OwnedMessage goes to that's generated in the previous async block. It also doesn't explain how to send a message that's not a direct response to a message on the websocket.

.and_then(|(_, sink)| {sink.send(OwnedMessage::Close(None))})

When the take_while/forward-loop is done, close the socket.
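To make the walk-through above concrete, here is a minimal synchronous sketch of the same pipeline. It is only an analogy: std iterators stand in for the futures Stream, a Vec stands in for the websocket sink, and `Msg` and `run_echo` are hypothetical names that are not part of the crate. The point it illustrates is that forward is the step that does the sending: it pulls every item the pipeline produces and pushes it into the sink.

```rust
// Synchronous analogy only: std iterators stand in for the futures Stream,
// and a Vec stands in for the websocket sink. `Msg` is a hypothetical
// stand-in for the crate's OwnedMessage.
#[derive(Debug, Clone, PartialEq)]
enum Msg {
    Text(String),
    Ping(Vec<u8>),
    Pong(Vec<u8>),
    Close,
}

/// Returns everything that would be written to the sink for the given
/// sequence of incoming messages.
fn run_echo(incoming: Vec<Msg>) -> Vec<Msg> {
    let mut sink: Vec<Msg> = Vec::new();

    let replies = incoming
        .into_iter()
        // take_while: stop at the first Close message.
        .take_while(|m| *m != Msg::Close)
        // filter_map: decide what (if anything) to send back per message.
        .filter_map(|m| match m {
            Msg::Ping(p) => Some(Msg::Pong(p)),
            Msg::Pong(_) => None,
            other => Some(other),
        });

    // forward(sink): this is the step that actually sends each reply.
    sink.extend(replies);

    // and_then(...): once the loop has ended, send a Close message.
    sink.push(Msg::Close);
    sink
}
```

Calling `run_echo` with a text message, a ping, a pong and a close yields the echoed text, a pong and a final close. In the real example the same thing happens asynchronously, one message at a time.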

I can't believe I'm the only one who has problems wrapping his head around this undocumented code. I've also been unable to find any source code online that's using this crate to create an async websocket server, which might be related to this issue.

Could you please write up some documentation and maybe even write a more thorough example? A simple chat server for example, where you can send and receive messages in a group chat.

@Aerolivier

Yes, I am having the exact same problems.

I've had to read into Tokio and now have a vague-ish understanding of how it works.

Sending messages in direct response to incoming messages seems simple enough, but sending delayed or spontaneous messages isn't going so well.
I'm trying to piece together the async-server example with this Tokio example: https://github.com/tokio-rs/tokio/blob/master/examples/chat.rs , but I'm still running into type errors around 1400 characters long, which I think are related to the error type. (With type errors that long, it's hard to work out exactly what I need to change.)

The technique seems to be to use a Tokio mpsc channel pair and attach it to each socket, but I'm still scratching my head for now. A documented example would be extremely useful, and much appreciated, if anyone can figure out how to do it.
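As a rough illustration of the "one channel per socket" idea, here is a sketch using only std::sync::mpsc in place of the async channel. `Room`, `join` and `broadcast` are hypothetical names invented for this example. The assumption is that each client's writer task would drain its receiver into that client's socket.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

// Hypothetical registry: one sender per connected client, keyed by an id.
struct Room {
    clients: HashMap<u32, Sender<String>>,
}

impl Room {
    fn new() -> Self {
        Room { clients: HashMap::new() }
    }

    /// Registers a client and returns the receiving half, which that
    /// client's writer task would drain into its socket.
    fn join(&mut self, id: u32) -> Receiver<String> {
        let (tx, rx) = channel();
        self.clients.insert(id, tx);
        rx
    }

    /// Queues `text` for every client except the one it came from.
    fn broadcast(&mut self, from: u32, text: &str) {
        // send() fails once the receiver is gone, so such clients are removed.
        self.clients
            .retain(|&id, tx| id == from || tx.send(text.to_string()).is_ok());
    }
}
```

With this shape, sending a spontaneous message is just a `send` on the stored sender. Nothing has to be "in response" to an incoming message.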

anlumo commented Mar 24, 2018

You sometimes need to get rid of error types (convert them to ()) via map_err.
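A small sketch of that idea, shown on a plain std Result rather than a Future (the combinators on futures have the same names and play the same role). `parse_port`, `run_task` and `check_port` are hypothetical names used only for illustration.

```rust
use std::num::ParseIntError;

// Hypothetical step that has its own, specific error type.
fn parse_port(s: &str) -> Result<u16, ParseIntError> {
    s.trim().parse::<u16>()
}

// Hypothetical consumer that, like a task spawner, only accepts a value
// whose success and error types are both ().
fn run_task(result: Result<(), ()>) -> bool {
    result.is_ok()
}

fn check_port(s: &str) -> bool {
    let outcome = parse_port(s)
        // Report and then discard the error, so the error type becomes ().
        .map_err(|err| eprintln!("bad port: {}", err))
        // Discard the success value too, so the item type becomes ().
        .map(|_| ());
    run_task(outcome)
}
```

Logging inside map_err before throwing the error away keeps the information visible even though the type no longer carries it.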

anlumo commented Mar 24, 2018

I think I've conquered the websocket futures. The trick is to create a channel and link it to the websocket like this:

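// Assumed imports (not in the original post; adjust to your crate versions):
// use futures::{Future, Sink, Stream};
// use futures::sync::mpsc;
// use websocket::{result::WebSocketError, OwnedMessage};
// error! is assumed to come from the log crate.

let f = upgrade.accept().and_then(move |(s, _)| {
  // tx can be cloned and handed to any code that wants to send on this socket.
  let (tx, rx) = mpsc::channel(8);
  let (sink, stream) = s.split();

  // Writer task: forward everything received on the channel to the websocket,
  // then send a Close message once the channel has ended.
  let writer = rx.map_err(|_| { WebSocketError::ProtocolError("Can't happen") }).forward(sink).and_then(|(_, sink)| {
      sink.send(OwnedMessage::Close(None))
  }).map_err(|err| {
      error!("{}", err);
  }).map(|_| {});
  handle_inner.spawn(writer);

  // Reader: handle incoming messages until the peer sends Close.
  stream.take_while(|m| Ok(!m.is_close()))
      .for_each(move |m| {
          // Clone of the sender, available for replies from this handler.
          let tx_inner = tx.clone();
          println!("Incoming message: {:?}", m);
          // handle message here
          Ok(())
      })
});
// spawn_future is a helper defined elsewhere (not shown here).
spawn_future(f, "Websocket", &handle);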

Now, to send a message to that websocket from anywhere, you need a reference to tx and handle (both have a clone() function, so you can pass them around everywhere) and do this:

handle_inner.spawn(tx_inner.send(OwnedMessage::Text("OK!".to_string())).map_err(|err| {
    error!("{}", err);
}).map(|_| {}));

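You have to consume both the error and the regular result (the Sink handed back by send, which is of no further use here), because spawn expects a Future with both Item and Error as (). Note that if tx is a futures mpsc Sender, the error from send is the channel's SendError rather than a WebSocketError.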

Note that for proper socket closing, all tx clones have to go out of scope once the Future created from stream is finished.
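The closing rule can be seen in isolation with a std-only sketch. A thread and std::sync::mpsc stand in for the spawned writer future and the async channel, and the function name is made up for this example. The receiver loop ends only once every sender, including every clone, has been dropped.

```rust
use std::sync::mpsc::channel;
use std::thread;

/// Spawns a writer thread that drains the channel and reports how many
/// messages it saw. The thread can only finish once all senders are dropped.
fn writer_finishes_after_all_senders_drop() -> usize {
    let (tx, rx) = channel::<String>();
    let tx_clone = tx.clone();

    let writer = thread::spawn(move || {
        let mut written = 0;
        // This iterator ends only when every sender has gone away.
        for _msg in rx.iter() {
            written += 1;
        }
        written
    });

    tx.send("first".to_string()).unwrap();
    tx_clone.send("second".to_string()).unwrap();

    drop(tx);
    // If this clone were kept alive, join() below would block forever.
    drop(tx_clone);

    writer.join().unwrap()
}
```

In the websocket code, a forgotten tx clone has the same effect: the writer never reaches the step that sends the Close message.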
