Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Web Sockets for netty-loom #3675

Merged
merged 27 commits into from
Apr 19, 2024
Merged

Web Sockets for netty-loom #3675

merged 27 commits into from
Apr 19, 2024

Conversation

kciesielski
Copy link
Member

@kciesielski kciesielski commented Apr 12, 2024

Closes #3666

This PR adds support for Web Sockects in the tapir-netty-loom backend, using Ox for efficient and safe concurrency.
From the user's perspective, creating a WS endpoint requires defining a pipe function of type Ox ?=> Source[REQ] => Source[RESP], where Source is ox.channels.Source.
For example:

import ox.*
import ox.channels.*
import sttp.tapir.*
import sttp.tapir.server.netty.loom.Id
import sttp.tapir.server.netty.loom.OxStreams
import sttp.tapir.server.netty.loom.OxStreams.Pipe // alias for Ox ?=> Source[A] => Source[B]
import sttp.tapir.server.netty.loom.NettySyncServer
import sttp.ws.WebSocketFrame

import scala.concurrent.duration.*

object WebSocketsNettySyncServer:
  // Web socket endpoint
  val wsEndpoint =
    endpoint.get
      .in("ws") 
      .out(
        webSocketBody[String, CodecFormat.TextPlain, String, CodecFormat.TextPlain](OxStreams)
          .concatenateFragmentedFrames(false) // All these options are supported by tapir-netty
          .ignorePong(true)
          .autoPongOnPing(true)
          .decodeCloseRequests(false)
          .decodeCloseResponses(false)
          .autoPing(Some((10.seconds, WebSocketFrame.Ping("ping-content".getBytes))))
      )

  // Your processor transforming a stream of requests into a stream of responses
  val wsPipe: Pipe[String, String] = requestStream => requestStream.map(_.toUpperCase)
  // Alternatively, requests and responses can be treated separately, for example to emit frames to the client from another source: 
  val wsPipe2: Pipe[String, String] = { in =>
    fork {
      in.drain() // read and ignore requests
    }
    // emit periodic responses
    Source.tick(1.second).map(_ => System.currentTimeMillis()).map(_.toString)
  }

  // The WebSocket endpoint, builds the pipeline in serverLogicSuccess
  val wsServerEndpoint = wsEndpoint.serverLogicSuccess[Id](_ => wsPipe)

  // A regular /GET endpoint
  val helloWorldEndpoint: PublicEndpoint[String, Unit, String, Any] =
    endpoint.get.in("hello").in(query[String]("name")).out(stringBody)

  val helloWorldServerEndpoint = helloWorldEndpoint
    .serverLogicSuccess(name => s"Hello, $name!")

  def main(args: Array[String]): Unit = 
    NettySyncServer()
      .host("0.0.0.0")
      .port(8080)
      .addEndpoints(List(wsServerEndpoint, helloWorldServerEndpoint))
      .startAndWait()

The implementation has been tested with our performance tests and it's latency is really good comparing to other fast WS-supporting backends. The tests were run with all bells and whistles enabled (frame concatenation, very frequent auto ping - 40ms, auto pong on ping, etc).

image

@kciesielski
Copy link
Member Author

@adamw Can I get a pre-review? The implementation passes all our WS tests, has great latency in perf tests (a few ms on p99.999 on my machine, comparing to 10-20 for other backends). There's no documentation and little comments, probably some missing error handling? Would be great to get some comments 🙇

@kciesielski kciesielski requested a review from adamw April 16, 2024 17:46
@kciesielski kciesielski marked this pull request as ready for review April 18, 2024 12:06
@kciesielski kciesielski requested a review from adamw April 18, 2024 12:09
@kciesielski kciesielski changed the title [WIP] Web Sockets for netty-loom Web Sockets for netty-loom Apr 18, 2024
@kciesielski
Copy link
Member Author

@adamw PR no longer in draft mode, re-requesting a review :)

Copy link
Member

@adamw adamw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nicely done :)

Btw. I presume the commented out tests pass as well?

@kciesielski
Copy link
Member Author

kciesielski commented Apr 18, 2024

I checked one of them separately already (failing pipe), and the other tests should be rather related to shared netty features (ping/pong stuff). However, I will merge them today (#3676) because I finally found the one causing flakiness, so this will give a final confirmation.\

Update: test merged, and pass

@kciesielski kciesielski added enhancement New feature or request Netty labels Apr 18, 2024
@kciesielski kciesielski merged commit 1cf911d into master Apr 19, 2024
28 checks passed
@kciesielski kciesielski deleted the 3666-netty-loom-ws branch April 19, 2024 08:25
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request Netty
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Web Sockets for netty-server-loom
2 participants