From 21788be3cb9785dfa92c917c66f223f0d4003692 Mon Sep 17 00:00:00 2001 From: kciesielski Date: Wed, 27 Mar 2024 01:36:57 +0100 Subject: [PATCH] Parameterize tests --- .../server/akkahttp/AkkaHttpServerTest.scala | 2 +- .../server/http4s/Http4sServerTest.scala | 2 +- .../http4s/ztapir/ZHttp4sServerTest.scala | 2 +- .../netty/cats/NettyCatsServerTest.scala | 2 +- .../WebSocketControlFrameHandler.scala | 6 +- .../pekkohttp/PekkoHttpServerTest.scala | 2 +- .../tapir/server/play/PlayServerTest.scala | 2 +- .../tapir/server/play/PlayServerTest.scala | 2 +- .../server/tests/ServerWebSocketTests.scala | 175 ++++++++++++------ .../vertx/cats/CatsVertxServerTest.scala | 2 +- .../tapir/server/vertx/VertxServerTest.scala | 4 +- .../server/vertx/zio/ZioVertxServerTest.scala | 2 +- .../server/ziohttp/ZioHttpServerTest.scala | 2 +- 13 files changed, 132 insertions(+), 73 deletions(-) diff --git a/server/akka-http-server/src/test/scala/sttp/tapir/server/akkahttp/AkkaHttpServerTest.scala b/server/akka-http-server/src/test/scala/sttp/tapir/server/akkahttp/AkkaHttpServerTest.scala index 3cb5e63b73..49e0fc2aaa 100644 --- a/server/akka-http-server/src/test/scala/sttp/tapir/server/akkahttp/AkkaHttpServerTest.scala +++ b/server/akka-http-server/src/test/scala/sttp/tapir/server/akkahttp/AkkaHttpServerTest.scala @@ -157,7 +157,7 @@ class AkkaHttpServerTest extends TestSuite with EitherValues { new AllServerTests(createServerTest, interpreter, backend).tests() ++ new ServerStreamingTests(createServerTest).tests(AkkaStreams)(drainAkka) ++ - new ServerWebSocketTests(createServerTest, AkkaStreams) { + new ServerWebSocketTests(createServerTest, AkkaStreams, autoPing = false, failingPipe = true, handlePong = false) { override def functionToPipe[A, B](f: A => B): streams.Pipe[A, B] = Flow.fromFunction(f) override def emptyPipe[A, B]: Flow[A, B, Any] = Flow.fromSinkAndSource(Sink.ignore, Source.empty) }.tests() ++ diff --git a/server/http4s-server/src/test/scala/sttp/tapir/server/http4s/Http4sServerTest.scala b/server/http4s-server/src/test/scala/sttp/tapir/server/http4s/Http4sServerTest.scala index 745ec69f68..530aad401f 100644 --- a/server/http4s-server/src/test/scala/sttp/tapir/server/http4s/Http4sServerTest.scala +++ b/server/http4s-server/src/test/scala/sttp/tapir/server/http4s/Http4sServerTest.scala @@ -139,7 +139,7 @@ class Http4sServerTest[R >: Fs2Streams[IO] with WebSockets] extends TestSuite wi new AllServerTests(createServerTest, interpreter, backend).tests() ++ new ServerStreamingTests(createServerTest).tests(Fs2Streams[IO])(drainFs2) ++ - new ServerWebSocketTests(createServerTest, Fs2Streams[IO]) { + new ServerWebSocketTests(createServerTest, Fs2Streams[IO], autoPing = true, failingPipe = true, handlePong = false) { override def functionToPipe[A, B](f: A => B): streams.Pipe[A, B] = in => in.map(f) override def emptyPipe[A, B]: Pipe[IO, A, B] = _ => fs2.Stream.empty }.tests() ++ diff --git a/server/http4s-server/zio/src/test/scala/sttp/tapir/server/http4s/ztapir/ZHttp4sServerTest.scala b/server/http4s-server/zio/src/test/scala/sttp/tapir/server/http4s/ztapir/ZHttp4sServerTest.scala index 99ff7c2d4d..b2cfc5ae58 100644 --- a/server/http4s-server/zio/src/test/scala/sttp/tapir/server/http4s/ztapir/ZHttp4sServerTest.scala +++ b/server/http4s-server/zio/src/test/scala/sttp/tapir/server/http4s/ztapir/ZHttp4sServerTest.scala @@ -55,7 +55,7 @@ class ZHttp4sServerTest extends TestSuite with OptionValues { new AllServerTests(createServerTest, interpreter, backend).tests() ++ new ServerStreamingTests(createServerTest).tests(ZioStreams)(drainZStream) ++ - new ServerWebSocketTests(createServerTest, ZioStreams) { + new ServerWebSocketTests(createServerTest, ZioStreams, autoPing = true, failingPipe = false, handlePong = false) { override def functionToPipe[A, B](f: A => B): streams.Pipe[A, B] = in => in.map(f) override def emptyPipe[A, B]: streams.Pipe[A, B] = _ => ZStream.empty }.tests() ++ diff --git a/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsServerTest.scala b/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsServerTest.scala index e3252a25a1..49dfc5d479 100644 --- a/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsServerTest.scala +++ b/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsServerTest.scala @@ -41,7 +41,7 @@ class NettyCatsServerTest extends TestSuite with EitherValues { new ServerCancellationTests(createServerTest)(m, IO.asyncForIO).tests() ++ new NettyFs2StreamingCancellationTest(createServerTest).tests() ++ new ServerGracefulShutdownTests(createServerTest, ioSleeper).tests() ++ - new ServerWebSocketTests(createServerTest, Fs2Streams[IO]) { + new ServerWebSocketTests(createServerTest, Fs2Streams[IO], autoPing = true, failingPipe = true, handlePong = true) { override def functionToPipe[A, B](f: A => B): streams.Pipe[A, B] = in => in.map(f) override def emptyPipe[A, B]: fs2.Pipe[IO, A, B] = _ => fs2.Stream.empty }.tests() diff --git a/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/WebSocketControlFrameHandler.scala b/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/WebSocketControlFrameHandler.scala index ffdb7bbe06..cd7e6ca919 100644 --- a/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/WebSocketControlFrameHandler.scala +++ b/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/WebSocketControlFrameHandler.scala @@ -17,8 +17,10 @@ class NettyControlFrameHandler(ignorePong: Boolean, autoPongOnPing: Boolean, dec if (autoPongOnPing) { val _ = ctx.writeAndFlush(new PongWebSocketFrame(ping.content().retain())) } - case pong: PongWebSocketFrame if !ignorePong => - val _ = ctx.fireChannelRead(pong) + case pong: PongWebSocketFrame => + if (!ignorePong) { + val _ = ctx.fireChannelRead(pong) + } case close: CloseWebSocketFrame => if (decodeCloseRequests) { // Passing the Close frame for further processing diff --git a/server/pekko-http-server/src/test/scala/sttp/tapir/server/pekkohttp/PekkoHttpServerTest.scala b/server/pekko-http-server/src/test/scala/sttp/tapir/server/pekkohttp/PekkoHttpServerTest.scala index 7f15eecf29..a48dc86063 100644 --- a/server/pekko-http-server/src/test/scala/sttp/tapir/server/pekkohttp/PekkoHttpServerTest.scala +++ b/server/pekko-http-server/src/test/scala/sttp/tapir/server/pekkohttp/PekkoHttpServerTest.scala @@ -105,7 +105,7 @@ class PekkoHttpServerTest extends TestSuite with EitherValues { new AllServerTests(createServerTest, interpreter, backend).tests() ++ new ServerStreamingTests(createServerTest).tests(PekkoStreams)(drainPekko) ++ - new ServerWebSocketTests(createServerTest, PekkoStreams) { + new ServerWebSocketTests(createServerTest, PekkoStreams, autoPing = false, failingPipe = true, handlePong = false) { override def functionToPipe[A, B](f: A => B): streams.Pipe[A, B] = Flow.fromFunction(f) override def emptyPipe[A, B]: Flow[A, B, Any] = Flow.fromSinkAndSource(Sink.ignore, Source.empty) }.tests() ++ diff --git a/server/play-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala b/server/play-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala index 3662a63fd3..af479d230e 100644 --- a/server/play-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala +++ b/server/play-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala @@ -122,7 +122,7 @@ class PlayServerTest extends TestSuite { ).tests() ++ new ServerStreamingTests(createServerTest).tests(PekkoStreams)(drainPekko) ++ new PlayServerWithContextTest(backend).tests() ++ - new ServerWebSocketTests(createServerTest, PekkoStreams) { + new ServerWebSocketTests(createServerTest, PekkoStreams, autoPing = false, failingPipe = true, handlePong = false) { override def functionToPipe[A, B](f: A => B): streams.Pipe[A, B] = Flow.fromFunction(f) override def emptyPipe[A, B]: Flow[A, B, Any] = Flow.fromSinkAndSource(Sink.ignore, Source.empty) }.tests() ++ diff --git a/server/play29-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala b/server/play29-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala index 096b077f44..2634bc5fa9 100644 --- a/server/play29-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala +++ b/server/play29-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala @@ -117,7 +117,7 @@ class PlayServerTest extends TestSuite { new AllServerTests(createServerTest, interpreter, backend, basic = false, multipart = false, options = false).tests() ++ new ServerStreamingTests(createServerTest).tests(AkkaStreams)(drainAkka) ++ new PlayServerWithContextTest(backend).tests() ++ - new ServerWebSocketTests(createServerTest, AkkaStreams) { + new ServerWebSocketTests(createServerTest, AkkaStreams, autoPing = false, failingPipe = true, handlePong = false) { override def functionToPipe[A, B](f: A => B): streams.Pipe[A, B] = Flow.fromFunction(f) override def emptyPipe[A, B]: Flow[A, B, Any] = Flow.fromSinkAndSource(Sink.ignore, Source.empty) }.tests() ++ diff --git a/server/tests/src/main/scala/sttp/tapir/server/tests/ServerWebSocketTests.scala b/server/tests/src/main/scala/sttp/tapir/server/tests/ServerWebSocketTests.scala index 37dee6d201..7c5fac8d63 100644 --- a/server/tests/src/main/scala/sttp/tapir/server/tests/ServerWebSocketTests.scala +++ b/server/tests/src/main/scala/sttp/tapir/server/tests/ServerWebSocketTests.scala @@ -19,10 +19,14 @@ import sttp.tapir.tests.data.Fruit import sttp.ws.{WebSocket, WebSocketFrame} import scala.concurrent.duration._ +import sttp.tapir.model.UnsupportedWebSocketFrameException abstract class ServerWebSocketTests[F[_], S <: Streams[S], OPTIONS, ROUTE]( createServerTest: CreateServerTest[F, S with WebSockets, OPTIONS, ROUTE], - val streams: S + val streams: S, + autoPing: Boolean, + failingPipe: Boolean, + handlePong: Boolean )(implicit m: MonadError[F] ) extends EitherValues { @@ -123,61 +127,6 @@ abstract class ServerWebSocketTests[F[_], S <: Streams[S], OPTIONS, ROUTE]( ) ) }, - testServer( - endpoint.out(webSocketBody[String, CodecFormat.TextPlain, String, CodecFormat.TextPlain](streams)), - "failing pipe" - )((_: Unit) => - pureResult(functionToPipe[String, String] { - case "error-trigger" => throw new Exception("Boom!") - case msg => s"echo: $msg" - }.asRight[Unit]) - ) { (backend, baseUri) => - basicRequest - .response(asWebSocket { (ws: WebSocket[IO]) => - for { - _ <- ws.sendText("test1") - _ <- ws.sendText("test2") - _ <- ws.sendText("error-trigger") - m1 <- ws.eitherClose(ws.receiveText()) - m2 <- ws.eitherClose(ws.receiveText()) - m3 <- ws.eitherClose(ws.receiveText()) - } yield List(m1, m2, m3) - }) - .get(baseUri.scheme("ws")) - .send(backend) - .map { r => - val results = r.body.map(_.map(_.left.map(_.statusCode))).value - results.take(2) shouldBe - List(Right("echo: test1"), Right("echo: test2")) - val closeCode = results.last.left.value - assert(closeCode == 1000 || closeCode == 1011) // some servers respond with Close(normal), some with Close(error) - } - }, - testServer( - endpoint.out( - webSocketBody[String, CodecFormat.TextPlain, String, CodecFormat.TextPlain](streams) - .autoPing(Some((200.millis, WebSocketFrame.ping))) - ), - "auto ping" - )((_: Unit) => pureResult(stringEcho.asRight[Unit])) { (backend, baseUri) => - basicRequest - .response(asWebSocket { (ws: WebSocket[IO]) => - for { - _ <- ws.sendText("test1") - _ <- IO.sleep(250.millis) - _ <- ws.sendText("test2") - m1 <- ws.receive() - m2 <- ws.receive() - _ <- ws.sendText("test3") - m3 <- ws.receive() - } yield List(m1, m2, m3) - }) - .get(baseUri.scheme("ws")) - .send(backend) - .map((r: Response[Either[String, List[WebSocketFrame]]]) => - assert(r.body.value.exists(_.isInstanceOf[WebSocketFrame.Ping]), s"Missing Ping frame in WS responses: $r") - ) - }, testServer( endpoint.out( webSocketBody[String, CodecFormat.TextPlain, String, CodecFormat.TextPlain](streams) @@ -199,7 +148,10 @@ abstract class ServerWebSocketTests[F[_], S <: Streams[S], OPTIONS, ROUTE]( .get(baseUri.scheme("ws")) .send(backend) .map((r: Response[Either[String, List[WebSocketFrame]]]) => - assert(r.body.value.exists(_.isInstanceOf[WebSocketFrame.Pong]), s"Missing Pong frame in WS responses: $r") + assert(r.body.value exists { + case WebSocketFrame.Pong(array) => array sameElements "test-ping-text".getBytes + case _ => false + }, s"Missing Pong(test-ping-text) in ${r.body}") ) }, testServer( @@ -249,7 +201,112 @@ abstract class ServerWebSocketTests[F[_], S <: Streams[S], OPTIONS, ROUTE]( .send(backend) .map(_.body shouldBe Left("Not a WS!")) } - ) + ) ++ autoPingTests ++ failingPipeTests ++ handlePongTests + + val autoPingTests = + if (autoPing) + List( + testServer( + endpoint.out( + webSocketBody[String, CodecFormat.TextPlain, String, CodecFormat.TextPlain](streams) + .autoPing(Some((200.millis, WebSocketFrame.ping))) + ), + "auto ping" + )((_: Unit) => pureResult(stringEcho.asRight[Unit])) { (backend, baseUri) => + basicRequest + .response(asWebSocket { (ws: WebSocket[IO]) => + for { + _ <- ws.sendText("test1") + _ <- IO.sleep(250.millis) + _ <- ws.sendText("test2") + m1 <- ws.receive() + m2 <- ws.receive() + _ <- ws.sendText("test3") + m3 <- ws.receive() + } yield List(m1, m2, m3) + }) + .get(baseUri.scheme("ws")) + .send(backend) + .map((r: Response[Either[String, List[WebSocketFrame]]]) => + assert(r.body.value.exists(_.isInstanceOf[WebSocketFrame.Ping]), s"Missing Ping frame in WS responses: $r") + ) + } + ) + else List.empty + + val failingPipeTests = + if (failingPipe) + List( + testServer( + endpoint.out(webSocketBody[String, CodecFormat.TextPlain, String, CodecFormat.TextPlain](streams)), + "failing pipe" + )((_: Unit) => + pureResult(functionToPipe[String, String] { + case "error-trigger" => throw new Exception("Boom!") + case msg => s"echo: $msg" + }.asRight[Unit]) + ) { (backend, baseUri) => + basicRequest + .response(asWebSocket { (ws: WebSocket[IO]) => + for { + _ <- ws.sendText("test1") + _ <- ws.sendText("test2") + _ <- ws.sendText("error-trigger") + m1 <- ws.eitherClose(ws.receiveText()) + m2 <- ws.eitherClose(ws.receiveText()) + m3 <- ws.eitherClose(ws.receiveText()) + } yield List(m1, m2, m3) + }) + .get(baseUri.scheme("ws")) + .send(backend) + .map { r => + val results = r.body.map(_.map(_.left.map(_.statusCode))).value + results.take(2) shouldBe + List(Right("echo: test1"), Right("echo: test2")) + val closeCode = results.last.left.value + assert(closeCode == 1000 || closeCode == 1011) // some servers respond with Close(normal), some with Close(error) + } + } + ) + else List.empty - // TODO: tests for ping/pong (control frames handling) + val handlePongTests = if (handlePong) List( + testServer( + { + implicit def textOrPongWebSocketFrame[A, CF <: CodecFormat](implicit + stringCodec: Codec[String, A, CF] + ): Codec[WebSocketFrame, A, CF] = + Codec // A custom codec to handle Pongs + .id[WebSocketFrame, CF](stringCodec.format, Schema.string) + .mapDecode { + case WebSocketFrame.Text(p, _, _) => stringCodec.decode(p) + case WebSocketFrame.Pong(payload) => + println(payload.length) + stringCodec.decode(new String(payload)) + case f => DecodeResult.Error(f.toString, new UnsupportedWebSocketFrameException(f)) + }(a => WebSocketFrame.text(stringCodec.encode(a))) + .schema(stringCodec.schema) + + endpoint.out( + webSocketBody[String, CodecFormat.TextPlain, String, CodecFormat.TextPlain](streams) + .autoPing(None) + .ignorePong(false) + ) + }, + "not ignore pong" + )((_: Unit) => pureResult(stringEcho.asRight[Unit])) { (backend, baseUri) => + basicRequest + .response(asWebSocket { (ws: WebSocket[IO]) => + for { + _ <- ws.sendText("test1") + _ <- ws.send(WebSocketFrame.Pong("test-pong-text".getBytes())) + m1 <- ws.receiveText() + _ <- ws.sendText("test2") + m2 <- ws.receiveText() + } yield List(m1, m2) + }) + .get(baseUri.scheme("ws")) + .send(backend) + .map(_.body shouldBe Right(List("echo: test1", "echo: test-pong-text"))) + }) else List.empty } diff --git a/server/vertx-server/cats/src/test/scala/sttp/tapir/server/vertx/cats/CatsVertxServerTest.scala b/server/vertx-server/cats/src/test/scala/sttp/tapir/server/vertx/cats/CatsVertxServerTest.scala index 9a96f67fd1..aac2e40122 100644 --- a/server/vertx-server/cats/src/test/scala/sttp/tapir/server/vertx/cats/CatsVertxServerTest.scala +++ b/server/vertx-server/cats/src/test/scala/sttp/tapir/server/vertx/cats/CatsVertxServerTest.scala @@ -37,7 +37,7 @@ class CatsVertxServerTest extends TestSuite { partOtherHeaderSupport = false ).tests() ++ new ServerStreamingTests(createServerTest).tests(Fs2Streams.apply[IO])(drainFs2) ++ - new ServerWebSocketTests(createServerTest, Fs2Streams.apply[IO]) { + new ServerWebSocketTests(createServerTest, Fs2Streams.apply[IO], autoPing = false, failingPipe = true, handlePong = true) { override def functionToPipe[A, B](f: A => B): streams.Pipe[A, B] = in => in.map(f) override def emptyPipe[A, B]: streams.Pipe[A, B] = _ => Stream.empty }.tests() diff --git a/server/vertx-server/src/test/scala/sttp/tapir/server/vertx/VertxServerTest.scala b/server/vertx-server/src/test/scala/sttp/tapir/server/vertx/VertxServerTest.scala index 8a7a8f57ce..394a8d22a2 100644 --- a/server/vertx-server/src/test/scala/sttp/tapir/server/vertx/VertxServerTest.scala +++ b/server/vertx-server/src/test/scala/sttp/tapir/server/vertx/VertxServerTest.scala @@ -27,7 +27,7 @@ class VertxServerTest extends TestSuite { def drainVertx[T](source: ReadStream[T]): Future[Unit] = { val p = Promise[Unit]() // Handler for stream data - do nothing with the data - val dataHandler: Handler[T] = (_: T) => () + val dataHandler: Handler[T] = (_: T) => () // End handler - complete the promise when the stream ends val endHandler: Handler[Void] = (_: Void) => p.success(()) @@ -53,7 +53,7 @@ class VertxServerTest extends TestSuite { partContentTypeHeaderSupport = true, partOtherHeaderSupport = false ).tests() ++ new ServerStreamingTests(createServerTest).tests(VertxStreams)(drainVertx[Buffer]) ++ - (new ServerWebSocketTests(createServerTest, VertxStreams) { + (new ServerWebSocketTests(createServerTest, VertxStreams, autoPing = false, failingPipe = false, handlePong = true) { override def functionToPipe[A, B](f: A => B): VertxStreams.Pipe[A, B] = in => new ReadStreamMapping(in, f) override def emptyPipe[A, B]: VertxStreams.Pipe[A, B] = _ => new EmptyReadStream() }).tests() diff --git a/server/vertx-server/zio/src/test/scala/sttp/tapir/server/vertx/zio/ZioVertxServerTest.scala b/server/vertx-server/zio/src/test/scala/sttp/tapir/server/vertx/zio/ZioVertxServerTest.scala index 0a8c27f4e4..f2ca8846c5 100644 --- a/server/vertx-server/zio/src/test/scala/sttp/tapir/server/vertx/zio/ZioVertxServerTest.scala +++ b/server/vertx-server/zio/src/test/scala/sttp/tapir/server/vertx/zio/ZioVertxServerTest.scala @@ -43,7 +43,7 @@ class ZioVertxServerTest extends TestSuite with OptionValues { partOtherHeaderSupport = false ).tests() ++ additionalTests() ++ new ServerStreamingTests(createServerTest).tests(ZioStreams)(drainZStream) ++ - new ServerWebSocketTests(createServerTest, ZioStreams) { + new ServerWebSocketTests(createServerTest, ZioStreams, autoPing = true, failingPipe = true, handlePong = true) { override def functionToPipe[A, B](f: A => B): streams.Pipe[A, B] = in => in.map(f) override def emptyPipe[A, B]: streams.Pipe[A, B] = _ => ZStream.empty }.tests() diff --git a/server/zio-http-server/src/test/scala/sttp/tapir/server/ziohttp/ZioHttpServerTest.scala b/server/zio-http-server/src/test/scala/sttp/tapir/server/ziohttp/ZioHttpServerTest.scala index 828290d902..7e6043166a 100644 --- a/server/zio-http-server/src/test/scala/sttp/tapir/server/ziohttp/ZioHttpServerTest.scala +++ b/server/zio-http-server/src/test/scala/sttp/tapir/server/ziohttp/ZioHttpServerTest.scala @@ -266,7 +266,7 @@ class ZioHttpServerTest extends TestSuite { ).tests() ++ new ServerStreamingTests(createServerTest).tests(ZioStreams)(drainZStream) ++ new ZioHttpCompositionTest(createServerTest).tests() ++ - new ServerWebSocketTests(createServerTest, ZioStreams) { + new ServerWebSocketTests(createServerTest, ZioStreams, autoPing = true, failingPipe = false, handlePong = false) { override def functionToPipe[A, B](f: A => B): ZioStreams.Pipe[A, B] = in => in.map(f) override def emptyPipe[A, B]: ZioStreams.Pipe[A, B] = _ => ZStream.empty }.tests() ++