From 80a13133316bcb0612a42cb577dad3af69167700 Mon Sep 17 00:00:00 2001 From: Kamil Kloch Date: Fri, 24 Nov 2023 12:45:27 +0100 Subject: [PATCH 1/3] Improve Http4sWebSockets.pipeToBody --- .../server/http4s/Http4sWebSockets.scala | 52 +++++++++---------- 1 file changed, 25 insertions(+), 27 deletions(-) diff --git a/server/http4s-server/src/main/scala/sttp/tapir/server/http4s/Http4sWebSockets.scala b/server/http4s-server/src/main/scala/sttp/tapir/server/http4s/Http4sWebSockets.scala index 41aca09379..b152429f95 100644 --- a/server/http4s-server/src/main/scala/sttp/tapir/server/http4s/Http4sWebSockets.scala +++ b/server/http4s-server/src/main/scala/sttp/tapir/server/http4s/Http4sWebSockets.scala @@ -17,7 +17,7 @@ private[http4s] object Http4sWebSockets { pipe: Pipe[F, REQ, RESP], o: WebSocketBodyOutput[Pipe[F, REQ, RESP], REQ, RESP, _, Fs2Streams[F]] ): F[Pipe[F, Http4sWebSocketFrame, Http4sWebSocketFrame]] = { - Queue.bounded[F, WebSocketFrame](1).map { pongs => (in: Stream[F, Http4sWebSocketFrame]) => + Queue.bounded[F, WebSocketFrame](2).map { pongs => (in: Stream[F, Http4sWebSocketFrame]) => val sttpFrames = in.map(http4sFrameToFrame) val concatenated = optionallyConcatenateFrames(sttpFrames, o.concatenateFragmentedFrames) val ignorePongs = optionallyIgnorePong(concatenated, o.ignorePong) @@ -28,15 +28,16 @@ private[http4s] object Http4sWebSockets { } (autoPongs - .map { - case _: WebSocketFrame.Close if !o.decodeCloseRequests => None - case f => - o.requests.decode(f) match { - case failure: DecodeResult.Failure => throw new WebSocketFrameDecodeFailure(f, failure) - case DecodeResult.Value(v) => Some(v) - } + .takeWhile { + case _: WebSocketFrame.Close if !o.decodeCloseRequests => false + case _ => true + } + .map { f => + o.requests.decode(f) match { + case x: DecodeResult.Value[_] => x.v + case failure: DecodeResult.Failure => throw new WebSocketFrameDecodeFailure(f, failure) + } } - .unNoneTerminate .through(pipe) .map(o.responses.encode) .mergeHaltL(Stream.repeatEval(pongs.take)) @@ -47,22 +48,21 @@ private[http4s] object Http4sWebSockets { private def http4sFrameToFrame(f: Http4sWebSocketFrame): WebSocketFrame = f match { - case t: Http4sWebSocketFrame.Text => WebSocketFrame.Text(t.str, t.last, None) - case Http4sWebSocketFrame.Ping(data) => WebSocketFrame.Ping(data.toArray) - case Http4sWebSocketFrame.Pong(data) => WebSocketFrame.Pong(data.toArray) - case c: Http4sWebSocketFrame.Close => WebSocketFrame.Close(c.closeCode, "") - case _ => WebSocketFrame.Binary(f.data.toArray, f.last, None) + case t: Http4sWebSocketFrame.Text => WebSocketFrame.Text(t.str, t.last, None) + case x: Http4sWebSocketFrame.Ping => WebSocketFrame.Ping(x.data.toArray) + case x: Http4sWebSocketFrame.Pong => WebSocketFrame.Pong(x.data.toArray) + case c: Http4sWebSocketFrame.Close => WebSocketFrame.Close(c.closeCode, "") + case _ => WebSocketFrame.Binary(f.data.toArray, f.last, None) } - private def frameToHttp4sFrame(w: WebSocketFrame): Http4sWebSocketFrame = { + private def frameToHttp4sFrame(w: WebSocketFrame): Http4sWebSocketFrame = w match { - case WebSocketFrame.Text(p, finalFragment, _) => Http4sWebSocketFrame.Text(p, finalFragment) - case WebSocketFrame.Binary(p, finalFragment, _) => Http4sWebSocketFrame.Binary(ByteVector(p), finalFragment) - case WebSocketFrame.Ping(p) => Http4sWebSocketFrame.Ping(ByteVector(p)) - case WebSocketFrame.Pong(p) => Http4sWebSocketFrame.Pong(ByteVector(p)) - case WebSocketFrame.Close(code, reason) => Http4sWebSocketFrame.Close(code, reason).fold(throw _, identity) + case x: WebSocketFrame.Text => Http4sWebSocketFrame.Text(x.payload, x.finalFragment) + case x: WebSocketFrame.Binary => Http4sWebSocketFrame.Binary(ByteVector(x.payload), x.finalFragment) + case x: WebSocketFrame.Ping => Http4sWebSocketFrame.Ping(ByteVector(x.payload)) + case x: WebSocketFrame.Pong => Http4sWebSocketFrame.Pong(ByteVector(x.payload)) + case x: WebSocketFrame.Close => Http4sWebSocketFrame.Close(x.statusCode, x.reasonText).fold(throw _, identity) } - } private def optionallyConcatenateFrames[F[_]](s: Stream[F, WebSocketFrame], doConcatenate: Boolean): Stream[F, WebSocketFrame] = { if (doConcatenate) { @@ -87,7 +87,7 @@ private[http4s] object Http4sWebSockets { private def optionallyIgnorePong[F[_]](s: Stream[F, WebSocketFrame], doIgnore: Boolean): Stream[F, WebSocketFrame] = { if (doIgnore) { s.filter { - case WebSocketFrame.Pong(_) => false + case _: WebSocketFrame.Pong => false case _ => true } } else s @@ -99,11 +99,9 @@ private[http4s] object Http4sWebSockets { doAuto: Boolean ): Stream[F, WebSocketFrame] = { if (doAuto) { - s.evalMap { - case WebSocketFrame.Ping(payload) => pongs.offer(WebSocketFrame.Pong(payload)).map(_ => none[WebSocketFrame]) - case f => f.some.pure[F] - }.collect { case Some(f) => - f + s.evalMapFilter { + case ping: WebSocketFrame.Ping => pongs.offer(WebSocketFrame.Pong(ping.payload)).as(None) + case f => f.some.pure[F] } } else s } From c885d857fbd2a7333a68ea93781191fe825058fc Mon Sep 17 00:00:00 2001 From: Kamil Kloch Date: Fri, 24 Nov 2023 13:01:31 +0100 Subject: [PATCH 2/3] Fix compilation error. --- .../scala/sttp/tapir/server/http4s/Http4sWebSockets.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/http4s-server/src/main/scala/sttp/tapir/server/http4s/Http4sWebSockets.scala b/server/http4s-server/src/main/scala/sttp/tapir/server/http4s/Http4sWebSockets.scala index b152429f95..fe8d37c6b4 100644 --- a/server/http4s-server/src/main/scala/sttp/tapir/server/http4s/Http4sWebSockets.scala +++ b/server/http4s-server/src/main/scala/sttp/tapir/server/http4s/Http4sWebSockets.scala @@ -34,7 +34,7 @@ private[http4s] object Http4sWebSockets { } .map { f => o.requests.decode(f) match { - case x: DecodeResult.Value[_] => x.v + case x: DecodeResult.Value[REQ] => x.v case failure: DecodeResult.Failure => throw new WebSocketFrameDecodeFailure(f, failure) } } @@ -100,7 +100,7 @@ private[http4s] object Http4sWebSockets { ): Stream[F, WebSocketFrame] = { if (doAuto) { s.evalMapFilter { - case ping: WebSocketFrame.Ping => pongs.offer(WebSocketFrame.Pong(ping.payload)).as(None) + case ping: WebSocketFrame.Ping => pongs.offer(WebSocketFrame.Pong(ping.payload)).as[Option[WebSocketFrame]](None) case f => f.some.pure[F] } } else s From 38597974d5ef780d553d55bd744f8d772e7962b0 Mon Sep 17 00:00:00 2001 From: Kamil Kloch Date: Fri, 24 Nov 2023 13:24:37 +0100 Subject: [PATCH 3/3] Improve Http4sWebSockets.pipeToBody. --- .../tapir/server/http4s/Http4sWebSockets.scala | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/server/http4s-server/src/main/scala/sttp/tapir/server/http4s/Http4sWebSockets.scala b/server/http4s-server/src/main/scala/sttp/tapir/server/http4s/Http4sWebSockets.scala index fe8d37c6b4..28886769bb 100644 --- a/server/http4s-server/src/main/scala/sttp/tapir/server/http4s/Http4sWebSockets.scala +++ b/server/http4s-server/src/main/scala/sttp/tapir/server/http4s/Http4sWebSockets.scala @@ -26,15 +26,12 @@ private[http4s] object Http4sWebSockets { case Some((interval, frame)) => Stream.awakeEvery[F](interval).map(_ => frame) case None => Stream.empty } + val decodeClose = optionallyDecodeClose(autoPongs, o.decodeCloseRequests) - (autoPongs - .takeWhile { - case _: WebSocketFrame.Close if !o.decodeCloseRequests => false - case _ => true - } + (decodeClose .map { f => o.requests.decode(f) match { - case x: DecodeResult.Value[REQ] => x.v + case x: DecodeResult.Value[REQ] => x.v case failure: DecodeResult.Failure => throw new WebSocketFrameDecodeFailure(f, failure) } } @@ -105,4 +102,12 @@ private[http4s] object Http4sWebSockets { } } else s } + + private def optionallyDecodeClose[F[_]](s: Stream[F, WebSocketFrame], doDecodeClose: Boolean): Stream[F, WebSocketFrame] = + if (!doDecodeClose) { + s.takeWhile { + case _: WebSocketFrame.Close => false + case _ => true + } + } else s }