Skip to content

Commit

Permalink
Parameterize tests
Browse files Browse the repository at this point in the history
  • Loading branch information
kciesielski committed Mar 27, 2024
1 parent b1a7293 commit 21788be
Show file tree
Hide file tree
Showing 13 changed files with 132 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ class AkkaHttpServerTest extends TestSuite with EitherValues {

new AllServerTests(createServerTest, interpreter, backend).tests() ++
new ServerStreamingTests(createServerTest).tests(AkkaStreams)(drainAkka) ++
new ServerWebSocketTests(createServerTest, AkkaStreams) {
new ServerWebSocketTests(createServerTest, AkkaStreams, autoPing = false, failingPipe = true, handlePong = false) {
override def functionToPipe[A, B](f: A => B): streams.Pipe[A, B] = Flow.fromFunction(f)
override def emptyPipe[A, B]: Flow[A, B, Any] = Flow.fromSinkAndSource(Sink.ignore, Source.empty)
}.tests() ++
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ class Http4sServerTest[R >: Fs2Streams[IO] with WebSockets] extends TestSuite wi

new AllServerTests(createServerTest, interpreter, backend).tests() ++
new ServerStreamingTests(createServerTest).tests(Fs2Streams[IO])(drainFs2) ++
new ServerWebSocketTests(createServerTest, Fs2Streams[IO]) {
new ServerWebSocketTests(createServerTest, Fs2Streams[IO], autoPing = true, failingPipe = true, handlePong = false) {
override def functionToPipe[A, B](f: A => B): streams.Pipe[A, B] = in => in.map(f)
override def emptyPipe[A, B]: Pipe[IO, A, B] = _ => fs2.Stream.empty
}.tests() ++
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class ZHttp4sServerTest extends TestSuite with OptionValues {

new AllServerTests(createServerTest, interpreter, backend).tests() ++
new ServerStreamingTests(createServerTest).tests(ZioStreams)(drainZStream) ++
new ServerWebSocketTests(createServerTest, ZioStreams) {
new ServerWebSocketTests(createServerTest, ZioStreams, autoPing = true, failingPipe = false, handlePong = false) {
override def functionToPipe[A, B](f: A => B): streams.Pipe[A, B] = in => in.map(f)
override def emptyPipe[A, B]: streams.Pipe[A, B] = _ => ZStream.empty
}.tests() ++
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class NettyCatsServerTest extends TestSuite with EitherValues {
new ServerCancellationTests(createServerTest)(m, IO.asyncForIO).tests() ++
new NettyFs2StreamingCancellationTest(createServerTest).tests() ++
new ServerGracefulShutdownTests(createServerTest, ioSleeper).tests() ++
new ServerWebSocketTests(createServerTest, Fs2Streams[IO]) {
new ServerWebSocketTests(createServerTest, Fs2Streams[IO], autoPing = true, failingPipe = true, handlePong = true) {
override def functionToPipe[A, B](f: A => B): streams.Pipe[A, B] = in => in.map(f)
override def emptyPipe[A, B]: fs2.Pipe[IO, A, B] = _ => fs2.Stream.empty
}.tests()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ class NettyControlFrameHandler(ignorePong: Boolean, autoPongOnPing: Boolean, dec
if (autoPongOnPing) {
val _ = ctx.writeAndFlush(new PongWebSocketFrame(ping.content().retain()))
}
case pong: PongWebSocketFrame if !ignorePong =>
val _ = ctx.fireChannelRead(pong)
case pong: PongWebSocketFrame =>
if (!ignorePong) {
val _ = ctx.fireChannelRead(pong)
}
case close: CloseWebSocketFrame =>
if (decodeCloseRequests) {
// Passing the Close frame for further processing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class PekkoHttpServerTest extends TestSuite with EitherValues {

new AllServerTests(createServerTest, interpreter, backend).tests() ++
new ServerStreamingTests(createServerTest).tests(PekkoStreams)(drainPekko) ++
new ServerWebSocketTests(createServerTest, PekkoStreams) {
new ServerWebSocketTests(createServerTest, PekkoStreams, autoPing = false, failingPipe = true, handlePong = false) {
override def functionToPipe[A, B](f: A => B): streams.Pipe[A, B] = Flow.fromFunction(f)
override def emptyPipe[A, B]: Flow[A, B, Any] = Flow.fromSinkAndSource(Sink.ignore, Source.empty)
}.tests() ++
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ class PlayServerTest extends TestSuite {
).tests() ++
new ServerStreamingTests(createServerTest).tests(PekkoStreams)(drainPekko) ++
new PlayServerWithContextTest(backend).tests() ++
new ServerWebSocketTests(createServerTest, PekkoStreams) {
new ServerWebSocketTests(createServerTest, PekkoStreams, autoPing = false, failingPipe = true, handlePong = false) {
override def functionToPipe[A, B](f: A => B): streams.Pipe[A, B] = Flow.fromFunction(f)
override def emptyPipe[A, B]: Flow[A, B, Any] = Flow.fromSinkAndSource(Sink.ignore, Source.empty)
}.tests() ++
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class PlayServerTest extends TestSuite {
new AllServerTests(createServerTest, interpreter, backend, basic = false, multipart = false, options = false).tests() ++
new ServerStreamingTests(createServerTest).tests(AkkaStreams)(drainAkka) ++
new PlayServerWithContextTest(backend).tests() ++
new ServerWebSocketTests(createServerTest, AkkaStreams) {
new ServerWebSocketTests(createServerTest, AkkaStreams, autoPing = false, failingPipe = true, handlePong = false) {
override def functionToPipe[A, B](f: A => B): streams.Pipe[A, B] = Flow.fromFunction(f)
override def emptyPipe[A, B]: Flow[A, B, Any] = Flow.fromSinkAndSource(Sink.ignore, Source.empty)
}.tests() ++
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@ import sttp.tapir.tests.data.Fruit
import sttp.ws.{WebSocket, WebSocketFrame}

import scala.concurrent.duration._
import sttp.tapir.model.UnsupportedWebSocketFrameException

abstract class ServerWebSocketTests[F[_], S <: Streams[S], OPTIONS, ROUTE](
createServerTest: CreateServerTest[F, S with WebSockets, OPTIONS, ROUTE],
val streams: S
val streams: S,
autoPing: Boolean,
failingPipe: Boolean,
handlePong: Boolean
)(implicit
m: MonadError[F]
) extends EitherValues {
Expand Down Expand Up @@ -123,61 +127,6 @@ abstract class ServerWebSocketTests[F[_], S <: Streams[S], OPTIONS, ROUTE](
)
)
},
testServer(
endpoint.out(webSocketBody[String, CodecFormat.TextPlain, String, CodecFormat.TextPlain](streams)),
"failing pipe"
)((_: Unit) =>
pureResult(functionToPipe[String, String] {
case "error-trigger" => throw new Exception("Boom!")
case msg => s"echo: $msg"
}.asRight[Unit])
) { (backend, baseUri) =>
basicRequest
.response(asWebSocket { (ws: WebSocket[IO]) =>
for {
_ <- ws.sendText("test1")
_ <- ws.sendText("test2")
_ <- ws.sendText("error-trigger")
m1 <- ws.eitherClose(ws.receiveText())
m2 <- ws.eitherClose(ws.receiveText())
m3 <- ws.eitherClose(ws.receiveText())
} yield List(m1, m2, m3)
})
.get(baseUri.scheme("ws"))
.send(backend)
.map { r =>
val results = r.body.map(_.map(_.left.map(_.statusCode))).value
results.take(2) shouldBe
List(Right("echo: test1"), Right("echo: test2"))
val closeCode = results.last.left.value
assert(closeCode == 1000 || closeCode == 1011) // some servers respond with Close(normal), some with Close(error)
}
},
testServer(
endpoint.out(
webSocketBody[String, CodecFormat.TextPlain, String, CodecFormat.TextPlain](streams)
.autoPing(Some((200.millis, WebSocketFrame.ping)))
),
"auto ping"
)((_: Unit) => pureResult(stringEcho.asRight[Unit])) { (backend, baseUri) =>
basicRequest
.response(asWebSocket { (ws: WebSocket[IO]) =>
for {
_ <- ws.sendText("test1")
_ <- IO.sleep(250.millis)
_ <- ws.sendText("test2")
m1 <- ws.receive()
m2 <- ws.receive()
_ <- ws.sendText("test3")
m3 <- ws.receive()
} yield List(m1, m2, m3)
})
.get(baseUri.scheme("ws"))
.send(backend)
.map((r: Response[Either[String, List[WebSocketFrame]]]) =>
assert(r.body.value.exists(_.isInstanceOf[WebSocketFrame.Ping]), s"Missing Ping frame in WS responses: $r")
)
},
testServer(
endpoint.out(
webSocketBody[String, CodecFormat.TextPlain, String, CodecFormat.TextPlain](streams)
Expand All @@ -199,7 +148,10 @@ abstract class ServerWebSocketTests[F[_], S <: Streams[S], OPTIONS, ROUTE](
.get(baseUri.scheme("ws"))
.send(backend)
.map((r: Response[Either[String, List[WebSocketFrame]]]) =>
assert(r.body.value.exists(_.isInstanceOf[WebSocketFrame.Pong]), s"Missing Pong frame in WS responses: $r")
assert(r.body.value exists {
case WebSocketFrame.Pong(array) => array sameElements "test-ping-text".getBytes
case _ => false
}, s"Missing Pong(test-ping-text) in ${r.body}")
)
},
testServer(
Expand Down Expand Up @@ -249,7 +201,112 @@ abstract class ServerWebSocketTests[F[_], S <: Streams[S], OPTIONS, ROUTE](
.send(backend)
.map(_.body shouldBe Left("Not a WS!"))
}
)
) ++ autoPingTests ++ failingPipeTests ++ handlePongTests

val autoPingTests =
if (autoPing)
List(
testServer(
endpoint.out(
webSocketBody[String, CodecFormat.TextPlain, String, CodecFormat.TextPlain](streams)
.autoPing(Some((200.millis, WebSocketFrame.ping)))
),
"auto ping"
)((_: Unit) => pureResult(stringEcho.asRight[Unit])) { (backend, baseUri) =>
basicRequest
.response(asWebSocket { (ws: WebSocket[IO]) =>
for {
_ <- ws.sendText("test1")
_ <- IO.sleep(250.millis)
_ <- ws.sendText("test2")
m1 <- ws.receive()
m2 <- ws.receive()
_ <- ws.sendText("test3")
m3 <- ws.receive()
} yield List(m1, m2, m3)
})
.get(baseUri.scheme("ws"))
.send(backend)
.map((r: Response[Either[String, List[WebSocketFrame]]]) =>
assert(r.body.value.exists(_.isInstanceOf[WebSocketFrame.Ping]), s"Missing Ping frame in WS responses: $r")
)
}
)
else List.empty

val failingPipeTests =
if (failingPipe)
List(
testServer(
endpoint.out(webSocketBody[String, CodecFormat.TextPlain, String, CodecFormat.TextPlain](streams)),
"failing pipe"
)((_: Unit) =>
pureResult(functionToPipe[String, String] {
case "error-trigger" => throw new Exception("Boom!")
case msg => s"echo: $msg"
}.asRight[Unit])
) { (backend, baseUri) =>
basicRequest
.response(asWebSocket { (ws: WebSocket[IO]) =>
for {
_ <- ws.sendText("test1")
_ <- ws.sendText("test2")
_ <- ws.sendText("error-trigger")
m1 <- ws.eitherClose(ws.receiveText())
m2 <- ws.eitherClose(ws.receiveText())
m3 <- ws.eitherClose(ws.receiveText())
} yield List(m1, m2, m3)
})
.get(baseUri.scheme("ws"))
.send(backend)
.map { r =>
val results = r.body.map(_.map(_.left.map(_.statusCode))).value
results.take(2) shouldBe
List(Right("echo: test1"), Right("echo: test2"))
val closeCode = results.last.left.value
assert(closeCode == 1000 || closeCode == 1011) // some servers respond with Close(normal), some with Close(error)
}
}
)
else List.empty

// TODO: tests for ping/pong (control frames handling)
val handlePongTests = if (handlePong) List(
testServer(
{
implicit def textOrPongWebSocketFrame[A, CF <: CodecFormat](implicit
stringCodec: Codec[String, A, CF]
): Codec[WebSocketFrame, A, CF] =
Codec // A custom codec to handle Pongs
.id[WebSocketFrame, CF](stringCodec.format, Schema.string)
.mapDecode {
case WebSocketFrame.Text(p, _, _) => stringCodec.decode(p)
case WebSocketFrame.Pong(payload) =>
println(payload.length)
stringCodec.decode(new String(payload))
case f => DecodeResult.Error(f.toString, new UnsupportedWebSocketFrameException(f))
}(a => WebSocketFrame.text(stringCodec.encode(a)))
.schema(stringCodec.schema)

endpoint.out(
webSocketBody[String, CodecFormat.TextPlain, String, CodecFormat.TextPlain](streams)
.autoPing(None)
.ignorePong(false)
)
},
"not ignore pong"
)((_: Unit) => pureResult(stringEcho.asRight[Unit])) { (backend, baseUri) =>
basicRequest
.response(asWebSocket { (ws: WebSocket[IO]) =>
for {
_ <- ws.sendText("test1")
_ <- ws.send(WebSocketFrame.Pong("test-pong-text".getBytes()))
m1 <- ws.receiveText()
_ <- ws.sendText("test2")
m2 <- ws.receiveText()
} yield List(m1, m2)
})
.get(baseUri.scheme("ws"))
.send(backend)
.map(_.body shouldBe Right(List("echo: test1", "echo: test-pong-text")))
}) else List.empty
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class CatsVertxServerTest extends TestSuite {
partOtherHeaderSupport = false
).tests() ++
new ServerStreamingTests(createServerTest).tests(Fs2Streams.apply[IO])(drainFs2) ++
new ServerWebSocketTests(createServerTest, Fs2Streams.apply[IO]) {
new ServerWebSocketTests(createServerTest, Fs2Streams.apply[IO], autoPing = false, failingPipe = true, handlePong = true) {
override def functionToPipe[A, B](f: A => B): streams.Pipe[A, B] = in => in.map(f)
override def emptyPipe[A, B]: streams.Pipe[A, B] = _ => Stream.empty
}.tests()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class VertxServerTest extends TestSuite {
def drainVertx[T](source: ReadStream[T]): Future[Unit] = {
val p = Promise[Unit]()
// Handler for stream data - do nothing with the data
val dataHandler: Handler[T] = (_: T) => ()
val dataHandler: Handler[T] = (_: T) => ()

// End handler - complete the promise when the stream ends
val endHandler: Handler[Void] = (_: Void) => p.success(())
Expand All @@ -53,7 +53,7 @@ class VertxServerTest extends TestSuite {
partContentTypeHeaderSupport = true,
partOtherHeaderSupport = false
).tests() ++ new ServerStreamingTests(createServerTest).tests(VertxStreams)(drainVertx[Buffer]) ++
(new ServerWebSocketTests(createServerTest, VertxStreams) {
(new ServerWebSocketTests(createServerTest, VertxStreams, autoPing = false, failingPipe = false, handlePong = true) {
override def functionToPipe[A, B](f: A => B): VertxStreams.Pipe[A, B] = in => new ReadStreamMapping(in, f)
override def emptyPipe[A, B]: VertxStreams.Pipe[A, B] = _ => new EmptyReadStream()
}).tests()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class ZioVertxServerTest extends TestSuite with OptionValues {
partOtherHeaderSupport = false
).tests() ++ additionalTests() ++
new ServerStreamingTests(createServerTest).tests(ZioStreams)(drainZStream) ++
new ServerWebSocketTests(createServerTest, ZioStreams) {
new ServerWebSocketTests(createServerTest, ZioStreams, autoPing = true, failingPipe = true, handlePong = true) {
override def functionToPipe[A, B](f: A => B): streams.Pipe[A, B] = in => in.map(f)
override def emptyPipe[A, B]: streams.Pipe[A, B] = _ => ZStream.empty
}.tests()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ class ZioHttpServerTest extends TestSuite {
).tests() ++
new ServerStreamingTests(createServerTest).tests(ZioStreams)(drainZStream) ++
new ZioHttpCompositionTest(createServerTest).tests() ++
new ServerWebSocketTests(createServerTest, ZioStreams) {
new ServerWebSocketTests(createServerTest, ZioStreams, autoPing = true, failingPipe = false, handlePong = false) {
override def functionToPipe[A, B](f: A => B): ZioStreams.Pipe[A, B] = in => in.map(f)
override def emptyPipe[A, B]: ZioStreams.Pipe[A, B] = _ => ZStream.empty
}.tests() ++
Expand Down

0 comments on commit 21788be

Please sign in to comment.