Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ox WebSockets #2187

Merged
merged 22 commits into from
May 27, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add scaladoc
kciesielski committed May 24, 2024

Verified

This commit was signed with the committer’s verified signature.
kciesielski Krzysztof Ciesielski
commit 95de8c066cc5e2b4ee3ab2a13b5aa6997bf979af
Original file line number Diff line number Diff line change
@@ -7,7 +7,27 @@ import sttp.ws.WebSocketFrame

import scala.util.control.NonFatal

def asSourceAndSink(ws: SyncWebSocket, concatenateFragmented: Boolean = true, pongOnPing: Boolean = true)(using
/** Converts a [[SyncWebSocket]] into a pair of `Source` of server responses and a `Sink` for client requests. The
kciesielski marked this conversation as resolved.
Show resolved Hide resolved
* `Source` starts receiving frames immediately, its internal buffer size can be adjusted with an implicit
* [[ox.channels.StageCapacity]].
* Make sure that the `Source` is contiunually read. This will guarantee that server-side Close signal is received and handled.
* If you don't want to process frames from the server, you can at least handle it with a `fork { source.drain() }`.
*
* You don't need to manually call `ws.close()` when using this approach, this will be
* handled automatically underneath, according to following rules:
* - If the request `Sink` fails with an error, the `Source` is automatically completed, sending a `Close` frame to
* the server if needed.
* - If the request `Sink` completes without an error, a `Close` frame is sent, and the response `Sink` keeps
* receiving responses until the server closes communication.
* - If the response `Source` is completed (either due to completion or an error), the request Sink is completed,
* right after sending all outstanding buffered frames.
*
* @param ws
* a `SyncWebSocket` where the underlying `Sink` will send requests, and where the `Source` will pull responses from.
* @param concatenateFragmented
* whether fragmented frames from the server should be concatenated into a single frame (true by default).
*/
def asSourceAndSink(ws: SyncWebSocket, concatenateFragmented: Boolean = true)(using
Ox,
StageCapacity
): (Source[WebSocketFrame], Sink[WebSocketFrame]) =
@@ -28,7 +48,7 @@ def asSourceAndSink(ws: SyncWebSocket, concatenateFragmented: Boolean = true, po
responsesChannel.doneOrClosed().discard
false
case ping: WebSocketFrame.Ping =>
if pongOnPing then requestsChannel.sendOrClosed(WebSocketFrame.Pong(ping.payload)).discard
requestsChannel.sendOrClosed(WebSocketFrame.Pong(ping.payload)).discard
kciesielski marked this conversation as resolved.
Show resolved Hide resolved
true
case _: WebSocketFrame.Pong =>
// ignore pongs