Skip to content

Commit

Permalink
Update to tungstenite 0.20, deal with max_send_buffer_len
Browse files Browse the repository at this point in the history
Tungstenite 0.20 starts buffering messages and thus has a max buffer size. When sending more data in, it returns an error.
This is a problem for us as this would be fatal. We don't want to send any more data in than the buffer allows.

TODO in follow up commit:
- tests
- documentation
  • Loading branch information
najamelan committed Oct 7, 2023
1 parent 87c1bf8 commit 4fded46
Show file tree
Hide file tree
Showing 20 changed files with 270 additions and 126 deletions.
23 changes: 22 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,27 @@

## [Unreleased]

[Unreleased]: https://github.com/najamelan/ws_stream_tungstenite/compare/0.10.0...dev
[Unreleased]: https://github.com/najamelan/ws_stream_tungstenite/compare/0.11.0...dev


## [0.11.0 - 2023-10-07]

[0.11.0]: https://github.com/najamelan/ws_stream_tungstenite/compare/0.9.0...0.10.0

- **BREAKING_CHANGE**/**SECURITY UPDATE**: update tungstenite to 0.20.1.
See: [RUSTSEC-2023-0065](https://rustsec.org/advisories/RUSTSEC-2023-0065).
Make sure to check how the new version of tungstenite
[handles buffering](https://docs.rs/tungstenite/latest/tungstenite/protocol/struct.WebSocketConfig.html)
messages before sending them. Having `write_buffer_size` to anything but `0` might cause
messages not to be sent until you flush. _ws_stream_tungstenite_ will make sure to respect
`max_write_buffer_size`, so you shouldn't have to deal with the errors, but note that if
you set it to something really small it might lead to performance issues on throughput.
I wanted to roll this version out fast for the security vulnerability, but note that the
implementation of `AsyncWrite::poll_write_vectored` that handles compliance with `max_write_buffer_size`
currently has no tests. If you want to use it, please review the code.
- **BREAKING_CHANGE**: update async-tungstenite to 0.23
- **BREAKING_CHANGE**: switched to tracing for logging (check out the _tracing-log_ crate
if you need to consume the events with a log consumer)


## [0.10.0]
Expand All @@ -13,6 +33,7 @@
- **BREAKING_CHANGE**: update async-tungstenite to 0.22
- **BREAKING_CHANGE**: update tungstenite to 0.19


## [0.9.0]

[0.9.0]: https://github.com/najamelan/ws_stream_tungstenite/compare/0.8.0...0.9.0
Expand Down
16 changes: 10 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,6 @@ version = "^0.3"
default-features = false
version = "^0.3"

[dependencies.log]
default-features = false
version = "^0.4"

[dependencies.pharos]
default-features = false
version = "^0.5"
Expand All @@ -52,6 +48,9 @@ default-features = false
optional = true
version = "^1"

[dependencies.tracing]
version = "^0.1"

[dependencies.tungstenite]
default-features = false
version = "^0.20"
Expand All @@ -60,12 +59,12 @@ version = "^0.20"
assert_matches = "^1"
async_progress = "^0.2"
asynchronous-codec = "^0.6"
flexi_logger = "^0.25"
futures = "^0.3"
futures-test = "^0.3"
futures-timer = "^3"
futures_ringbuf = "^0.3"
pin-utils = "^0.1"
tracing-log = "^0.1"
url = "^2"

[dev-dependencies.async-std]
Expand All @@ -86,6 +85,11 @@ default-features = false
features = ["codec"]
version = "^0.7"

[dev-dependencies.tracing-subscriber]
default-features = false
features = ["ansi", "env-filter", "fmt", "json", "tracing-log"]
version = "^0.3"

[[example]]
name = "tokio_codec"
path = "examples/tokio_codec.rs"
Expand All @@ -108,7 +112,7 @@ license = "Unlicense"
name = "ws_stream_tungstenite"
readme = "README.md"
repository = "https://github.com/najamelan/ws_stream_tungstenite"
version = "0.10.0"
version = "0.11.0"

[package.metadata]
[package.metadata.docs]
Expand Down
8 changes: 5 additions & 3 deletions Cargo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ package:
# - `git tag x.x.x` with version number.
# - `git push && git push --tags`
#
version : 0.10.0
version : 0.11.0
name : ws_stream_tungstenite
edition : '2021'
authors : [ Naja Melan <[email protected]> ]
Expand Down Expand Up @@ -69,11 +69,11 @@ dependencies:
futures-sink : { version: ^0.3 , default-features: false }
futures-io : { version: ^0.3 , default-features: false }
futures-util : { version: ^0.3 , default-features: false }
log : { version: ^0.4 , default-features: false }
tungstenite : { version: ^0.20, default-features: false }
pharos : { version: ^0.5 , default-features: false }
async-tungstenite : { version: ^0.23, default-features: false }
tokio : { version: ^1 , default-features: false, optional: true }
tracing : { version: ^0.1 }

# private deps
#
Expand All @@ -87,7 +87,6 @@ dev-dependencies:
async-tungstenite : { version: ^0.23, features: [ tokio-runtime, async-std-runtime ] }
assert_matches : ^1
async_progress : ^0.2
flexi_logger : ^0.25
futures : ^0.3
futures-test : ^0.3
futures-timer : ^3
Expand All @@ -96,6 +95,9 @@ dev-dependencies:
# pretty_assertions : ^0.6
tokio : { version: ^1, default-features: false, features: [ net, rt, rt-multi-thread, macros ] }
tokio-util : { version: ^0.7, default-features: false, features: [ codec ] }
tracing-subscriber : { version: ^0.3, default-features: false, features: [ ansi, env-filter, fmt, json, tracing-log ] }
tracing-log : ^0.1

# tokio-stream : { version: ^0.1, default-features: false, features: [] }
url : ^2
pin-utils : ^0.1
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ use
{
ws_stream_tungstenite :: { * } ,
futures :: { StreamExt } ,
log :: { * } ,
tracing :: { * } ,
async_tungstenite :: { accept_async } ,
asynchronous_codec :: { LinesCodec, Framed } ,
async_std :: { net::TcpListener } ,
Expand Down
28 changes: 10 additions & 18 deletions examples/close.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,23 @@
//
use
{
ws_stream_tungstenite :: { * } ,
ws_stream_tungstenite :: { * } ,
futures :: { TryFutureExt, StreamExt, SinkExt, join, executor::block_on } ,
asynchronous_codec :: { LinesCodec, Framed } ,
tokio :: { net::{ TcpListener } } ,
futures :: { FutureExt, select, future::{ ok, ready } } ,
async_tungstenite :: { accept_async, tokio::{ TokioAdapter, connect_async } } ,
url :: { Url } ,
log :: { * } ,
std :: { time::Duration } ,
futures_timer :: { Delay } ,
pin_utils :: { pin_mut } ,
asynchronous_codec :: { LinesCodec, Framed } ,
tokio :: { net::{ TcpListener } } ,
futures :: { FutureExt, select, future::{ ok, ready } } ,
async_tungstenite :: { accept_async, tokio::{ TokioAdapter, connect_async } } ,
url :: { Url } ,
tracing :: { * } ,
std :: { time::Duration } ,
futures_timer :: { Delay } ,
pin_utils :: { pin_mut } ,
};



fn main()
{
flexi_logger::Logger

::try_with_str( "close=info, tungstenite=warn, tokio_tungstenite=warn, ws_stream_tungstenite=warn" )
.expect( "flexi_logger")
.start()
.expect( "flexi_logger")
;

block_on( async
{
join!( server(), client() );
Expand Down
2 changes: 1 addition & 1 deletion examples/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use
ws_stream_tungstenite :: { * } ,
futures :: { AsyncReadExt, io::{ BufReader, copy_buf } } ,
std :: { env, net::SocketAddr, io } ,
log :: { * } ,
tracing :: { * } ,
tokio :: { net::{ TcpListener, TcpStream } } ,
async_tungstenite :: { accept_async, tokio::{ TokioAdapter } } ,
};
Expand Down
2 changes: 1 addition & 1 deletion examples/echo_tt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use
futures :: { StreamExt } ,
async_tungstenite :: { accept_async, tokio::TokioAdapter } ,
tokio :: { net::{ TcpListener, TcpStream } } ,
log :: { * } ,
tracing :: { * } ,
std :: { env, net::SocketAddr } ,
};

Expand Down
13 changes: 2 additions & 11 deletions examples/tokio_codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,19 @@
//
use
{
tungstenite :: { protocol::Role } ,
ws_stream_tungstenite :: { * } ,
futures :: { StreamExt, SinkExt, future::join } ,
tokio_util::codec :: { LinesCodec, Framed } ,
futures_ringbuf :: { Endpoint } ,

log :: { * } ,
tungstenite::{ protocol::Role } ,
tracing :: { * } ,
};


#[ tokio::main ]
//
async fn main()
{
flexi_logger::Logger

::try_with_str( "futures_ringbuf=info, tokio_codec=trace, tokio_util=trace, ws_stream_tungstenite=trace, tokio=trace" )
.expect( "flexi_logger")
.start()
.expect( "flexi_logger")
;

let (server_con, client_con) = Endpoint::pair( 64, 64 );

let server = async move
Expand Down
5 changes: 2 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ pub use
};



mod import
{
pub(crate) use
Expand All @@ -46,7 +45,7 @@ mod import
futures_sink :: { Sink } ,
futures_io :: { AsyncRead, AsyncWrite, AsyncBufRead } ,
futures_util :: { FutureExt } ,
log :: { error } ,
tracing :: { error } ,
std :: { io, io::{ IoSlice, IoSliceMut }, pin::Pin, fmt, borrow::Cow } ,
std :: { collections::VecDeque, sync::Arc, task::{ Context, Poll } } ,
async_tungstenite :: { WebSocketStream as ATungSocket } ,
Expand Down Expand Up @@ -77,7 +76,7 @@ mod import
futures_ringbuf :: { Endpoint } ,
futures :: { future::{ join } } ,
tungstenite :: { protocol::{ Role } } ,
log :: { * } ,
tracing :: { * } ,
};
}

15 changes: 8 additions & 7 deletions src/tung_websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ bitflags!


/// A wrapper around a WebSocket provided by tungstenite. This provides Stream/Sink Vec<u8> to
/// simplify implementing AsyncRead/AsyncWrite on top of tokio-tungstenite.
/// simplify implementing AsyncRead/AsyncWrite on top of async-tungstenite.
//
pub(crate) struct TungWebSocket<S> where S: AsyncRead + AsyncWrite + Send + Unpin
{
Expand Down Expand Up @@ -546,25 +546,26 @@ fn to_io_error( err: TungErr ) -> io::Error
TungErr::Capacity(string) => io::Error::new( io::ErrorKind::InvalidData, string ),


// This is dealt with by backpressure in the compat layer over tokio-tungstenite.
// We should never see this error.
// This can happen if we send a message bigger than the tungstenite `max_write_buffer_len`.
// However `WsStream` looks at the size of this buffer and only sends up to `max_write_buffer_len`
// bytes in one message.
//
TungErr::WriteBufferFull(_) |
TungErr::WriteBufferFull(_) => unreachable!( "TungErr::WriteBufferFull" ),

// These are handshake errors
//
TungErr::Url (..) |
TungErr::Url(_) => unreachable!( "TungErr::Url" ),

// This is an error specific to Text Messages that we don't use
//
TungErr::Utf8 |
TungErr::Utf8 => unreachable!( "TungErr::Utf8" ),

// I'd rather have this match exhaustive, but tungstenite has a Tls variant that
// is only there if they have a feature enabled. Since we cannot check whether
// a feature is enabled on a dependency, we have to go for wildcard here.
// As of tungstenite 0.19 Http and HttpFormat are also behind a feature flag.
//
_ => unreachable!() ,
x => unreachable!( "unmatched tungstenite error: {x}" ),
}
}

Expand Down
Loading

0 comments on commit 4fded46

Please sign in to comment.