From f65ea657a50c65b050789427336e81848685faf7 Mon Sep 17 00:00:00 2001 From: Kamil Kloch Date: Wed, 13 Dec 2023 14:45:30 +0100 Subject: [PATCH] Updated Http4sWebSockets, removed `ignorePing` from WebSocketBodyOutput. --- .../main/scala/sttp/tapir/EndpointIO.scala | 151 ++++++++---------- core/src/main/scala/sttp/tapir/Tapir.scala | 83 +++++----- .../http4s/Http4sServerInterpreter.scala | 46 +++--- .../server/http4s/Http4sToResponseBody.scala | 23 ++- .../server/http4s/Http4sWebSockets.scala | 44 ++--- .../server/http4s/ztapir/ConvertStreams.scala | 11 +- 6 files changed, 165 insertions(+), 193 deletions(-) diff --git a/core/src/main/scala/sttp/tapir/EndpointIO.scala b/core/src/main/scala/sttp/tapir/EndpointIO.scala index 6ca9c78273..e0d1b119f5 100644 --- a/core/src/main/scala/sttp/tapir/EndpointIO.scala +++ b/core/src/main/scala/sttp/tapir/EndpointIO.scala @@ -240,11 +240,11 @@ object EndpointInput extends EndpointInputMacros { /** An input with authentication credentials metadata, used when generating documentation. */ case class Auth[T, TYPE <: AuthType]( - input: Single[T], - challenge: WWWAuthenticateChallenge, - authType: TYPE, - info: AuthInfo - ) extends Single[T] { + input: Single[T], + challenge: WWWAuthenticateChallenge, + authType: TYPE, + info: AuthInfo + ) extends Single[T] { override private[tapir] type ThisType[X] = Auth[X, TYPE] override def show: String = if (isInputEmpty) s"auth(-)" else @@ -285,11 +285,11 @@ object EndpointInput extends EndpointInputMacros { } case class ApiKey() extends AuthType case class OAuth2( - authorizationUrl: Option[String], - tokenUrl: Option[String], - scopes: ListMap[String, String], - refreshUrl: Option[String] - ) extends AuthType { + authorizationUrl: Option[String], + tokenUrl: Option[String], + scopes: ListMap[String, String], + refreshUrl: Option[String] + ) extends AuthType { def requiredScopes(requiredScopes: Seq[String]): ScopedOAuth2 = ScopedOAuth2(this, requiredScopes) } case class ScopedOAuth2(oauth2: OAuth2, requiredScopes: Seq[String]) extends AuthType { @@ -298,12 +298,12 @@ object EndpointInput extends EndpointInputMacros { } case class AuthInfo( - securitySchemeName: Option[String], - description: Option[String], - attributes: AttributeMap, - group: Option[String], - bearerFormat: Option[String] - ) { + securitySchemeName: Option[String], + description: Option[String], + attributes: AttributeMap, + group: Option[String], + bearerFormat: Option[String] + ) { def securitySchemeName(name: String): AuthInfo = copy(securitySchemeName = Some(name)) def description(d: String): AuthInfo = copy(description = Some(d)) def group(g: String): AuthInfo = copy(group = Some(g)) @@ -325,12 +325,12 @@ object EndpointInput extends EndpointInputMacros { } case class Pair[T, U, TU]( - left: EndpointInput[T], - right: EndpointInput[U], - private[tapir] val combine: CombineParams, - private[tapir] val split: SplitParams - ) extends EndpointInput[TU] - with EndpointTransput.Pair[TU] { + left: EndpointInput[T], + right: EndpointInput[U], + private[tapir] val combine: CombineParams, + private[tapir] val split: SplitParams + ) extends EndpointInput[TU] + with EndpointTransput.Pair[TU] { override private[tapir] type ThisType[X] = EndpointInput[X] override def map[V](m: Mapping[TU, V]): EndpointInput[V] = MappedPair[T, U, TU, V](this, m) } @@ -351,10 +351,10 @@ object EndpointOutput extends EndpointOutputMacros { // case class StatusCode[T]( - documentedCodes: Map[Either[sttp.model.StatusCode, StatusCodeRange], Info[Unit]], - codec: Codec[sttp.model.StatusCode, T, TextPlain], - info: Info[T] - ) extends Atom[T] { + documentedCodes: Map[Either[sttp.model.StatusCode, StatusCodeRange], Info[Unit]], + codec: Codec[sttp.model.StatusCode, T, TextPlain], + info: Info[T] + ) extends Atom[T] { override private[tapir] type ThisType[X] = StatusCode[X] override private[tapir] type L = sttp.model.StatusCode override private[tapir] type CF = TextPlain @@ -411,9 +411,9 @@ object EndpointOutput extends EndpointOutputMacros { * checking the run-time class of the value, due to type erasure (if `O` has type parameters). */ case class OneOfVariant[O] private[tapir] ( - output: EndpointOutput[O], - appliesTo: Any => Boolean - ) + output: EndpointOutput[O], + appliesTo: Any => Boolean + ) case class OneOf[O, T](variants: List[OneOfVariant[_ <: O]], mapping: Mapping[O, T]) extends Single[T] { override private[tapir] type ThisType[X] = OneOf[O, X] @@ -441,12 +441,12 @@ object EndpointOutput extends EndpointOutputMacros { } case class Pair[T, U, TU]( - left: EndpointOutput[T], - right: EndpointOutput[U], - private[tapir] val combine: CombineParams, - private[tapir] val split: SplitParams - ) extends EndpointOutput[TU] - with EndpointTransput.Pair[TU] { + left: EndpointOutput[T], + right: EndpointOutput[U], + private[tapir] val combine: CombineParams, + private[tapir] val split: SplitParams + ) extends EndpointOutput[TU] + with EndpointTransput.Pair[TU] { override private[tapir] type ThisType[X] = EndpointOutput[X] override def map[V](m: Mapping[TU, V]): EndpointOutput[V] = MappedPair[T, U, TU, V](this, m) } @@ -558,12 +558,12 @@ object EndpointIO { } case class Pair[T, U, TU]( - left: EndpointIO[T], - right: EndpointIO[U], - private[tapir] val combine: CombineParams, - private[tapir] val split: SplitParams - ) extends EndpointIO[TU] - with EndpointTransput.Pair[TU] { + left: EndpointIO[T], + right: EndpointIO[U], + private[tapir] val combine: CombineParams, + private[tapir] val split: SplitParams + ) extends EndpointIO[TU] + with EndpointTransput.Pair[TU] { override private[tapir] type ThisType[X] = EndpointIO[X] override def map[V](m: Mapping[TU, V]): EndpointIO[V] = MappedPair[T, U, TU, V](this, m) } @@ -585,11 +585,11 @@ object EndpointIO { Example(value, name, summary, description) def copy[TT]( - value: TT = this.value, - name: Option[String] = this.name, - summary: Option[String] = this.summary, - description: Option[String] = this.description - ): Example[TT] = Example(value, name, summary, description) + value: TT = this.value, + name: Option[String] = this.name, + summary: Option[String] = this.summary, + description: Option[String] = this.description + ): Example[TT] = Example(value, name, summary, description) } object Example { @@ -601,11 +601,11 @@ object EndpointIO { } case class Info[T]( - description: Option[String], - examples: List[Example[T]], - deprecated: Boolean, - attributes: AttributeMap - ) { + description: Option[String], + examples: List[Example[T]], + deprecated: Boolean, + attributes: AttributeMap + ) { def description(d: String): Info[T] = copy(description = Some(d)) def example: Option[T] = examples.headOption.map(_.value) def example(value: T): Info[T] = example(Example.of(value)) @@ -694,12 +694,12 @@ However, this decreases type safety, as the streaming requirement is lost. BS == streams.BinaryStream, but we can't express this using dependent types here. */ case class StreamBodyIO[BS, T, S]( - streams: Streams[S], - codec: Codec[BS, T, CodecFormat], - info: Info[T], - charset: Option[Charset], - encodedExamples: List[Example[Any]] -) extends EndpointTransput.Atom[T] { + streams: Streams[S], + codec: Codec[BS, T, CodecFormat], + info: Info[T], + charset: Option[Charset], + encodedExamples: List[Example[Any]] + ) extends EndpointTransput.Atom[T] { override private[tapir] type ThisType[X] = StreamBodyIO[BS, X, S] override private[tapir] type L = BS override private[tapir] type CF = CodecFormat @@ -728,21 +728,20 @@ Same rationale as for StreamBodyIO applies. P == streams.Pipe, but we can't express this using dependent types here. */ case class WebSocketBodyOutput[PIPE_REQ_RESP, REQ, RESP, T, S]( - streams: Streams[S], - requests: Codec[WebSocketFrame, REQ, CodecFormat], - responses: Codec[WebSocketFrame, RESP, CodecFormat], - codec: Codec[PIPE_REQ_RESP, T, CodecFormat], - info: Info[T], - requestsInfo: Info[REQ], - responsesInfo: Info[RESP], - concatenateFragmentedFrames: Boolean, - ignorePing: Boolean, - ignorePong: Boolean, - autoPongOnPing: Boolean, - decodeCloseRequests: Boolean, - decodeCloseResponses: Boolean, - autoPing: Option[(FiniteDuration, WebSocketFrame.Ping)] -) extends EndpointTransput.Atom[T] { + streams: Streams[S], + requests: Codec[WebSocketFrame, REQ, CodecFormat], + responses: Codec[WebSocketFrame, RESP, CodecFormat], + codec: Codec[PIPE_REQ_RESP, T, CodecFormat], + info: Info[T], + requestsInfo: Info[REQ], + responsesInfo: Info[RESP], + concatenateFragmentedFrames: Boolean, + ignorePong: Boolean, + autoPongOnPing: Boolean, + decodeCloseRequests: Boolean, + decodeCloseResponses: Boolean, + autoPing: Option[(FiniteDuration, WebSocketFrame.Ping)] + ) extends EndpointTransput.Atom[T] { override private[tapir] type ThisType[X] = WebSocketBodyOutput[PIPE_REQ_RESP, REQ, RESP, X, S] override private[tapir] type L = PIPE_REQ_RESP override private[tapir] type CF = CodecFormat @@ -775,14 +774,6 @@ case class WebSocketBodyOutput[PIPE_REQ_RESP, REQ, RESP, T, S]( def concatenateFragmentedFrames(c: Boolean): WebSocketBodyOutput[PIPE_REQ_RESP, REQ, RESP, T, S] = this.copy(concatenateFragmentedFrames = c) - /** Note: some interpreters ignore this setting. - * - * @param i - * If `true`, [[WebSocketFrame.Ping]] frames will be ignored and won't be passed to the codecs for decoding. Note that only some - * interpreters expose ping-pong frames. - */ - def ignorePing(i: Boolean): WebSocketBodyOutput[PIPE_REQ_RESP, REQ, RESP, T, S] = this.copy(ignorePing = i) - /** Note: some interpreters ignore this setting. * @param i * If `true`, [[WebSocketFrame.Pong]] frames will be ignored and won't be passed to the codecs for decoding. Note that only some diff --git a/core/src/main/scala/sttp/tapir/Tapir.scala b/core/src/main/scala/sttp/tapir/Tapir.scala index 55f3ee092d..dbdf74790d 100644 --- a/core/src/main/scala/sttp/tapir/Tapir.scala +++ b/core/src/main/scala/sttp/tapir/Tapir.scala @@ -133,8 +133,8 @@ trait Tapir extends TapirExtensions with TapirComputedInputs with TapirStaticCon * A supported streams implementation. */ def streamBinaryBody[S]( - s: Streams[S] - )(format: CodecFormat): StreamBodyIO[s.BinaryStream, s.BinaryStream, S] = + s: Streams[S] + )(format: CodecFormat): StreamBodyIO[s.BinaryStream, s.BinaryStream, S] = StreamBodyIO(s, Codec.id(format, Schema.binary), EndpointIO.Info.empty, None, Nil) /** Creates a stream body with a text schema. @@ -146,8 +146,8 @@ trait Tapir extends TapirExtensions with TapirComputedInputs with TapirStaticCon * An optional charset of the resulting stream's data, to be used in the content type. */ def streamTextBody[S]( - s: Streams[S] - )(format: CodecFormat, charset: Option[Charset] = None): StreamBodyIO[s.BinaryStream, s.BinaryStream, S] = + s: Streams[S] + )(format: CodecFormat, charset: Option[Charset] = None): StreamBodyIO[s.BinaryStream, s.BinaryStream, S] = StreamBodyIO(s, Codec.id(format, Schema.string), EndpointIO.Info.empty, charset, Nil) /** Creates a stream body with the given schema. @@ -161,18 +161,18 @@ trait Tapir extends TapirExtensions with TapirComputedInputs with TapirStaticCon * An optional charset of the resulting stream's data, to be used in the content type. */ def streamBody[S, T]( - s: Streams[S] - )(schema: Schema[T], format: CodecFormat, charset: Option[Charset] = None): StreamBodyIO[s.BinaryStream, s.BinaryStream, S] = + s: Streams[S] + )(schema: Schema[T], format: CodecFormat, charset: Option[Charset] = None): StreamBodyIO[s.BinaryStream, s.BinaryStream, S] = StreamBodyIO(s, Codec.id(format, schema.as[s.BinaryStream]), EndpointIO.Info.empty, charset, Nil) // the intermediate class is needed so that the S type parameter can be inferred final class WebSocketBodyBuilder[REQ, REQ_CF <: CodecFormat, RESP, RESP_CF <: CodecFormat] { def apply[S]( - s: Streams[S] - )(implicit - requests: Codec[WebSocketFrame, REQ, REQ_CF], - responses: Codec[WebSocketFrame, RESP, RESP_CF] - ): WebSocketBodyOutput[s.Pipe[REQ, RESP], REQ, RESP, s.Pipe[REQ, RESP], S] = + s: Streams[S] + )(implicit + requests: Codec[WebSocketFrame, REQ, REQ_CF], + responses: Codec[WebSocketFrame, RESP, RESP_CF] + ): WebSocketBodyOutput[s.Pipe[REQ, RESP], REQ, RESP, s.Pipe[REQ, RESP], S] = WebSocketBodyOutput( s, requests, @@ -182,7 +182,6 @@ trait Tapir extends TapirExtensions with TapirComputedInputs with TapirStaticCon EndpointIO.Info.empty, EndpointIO.Info.empty, concatenateFragmentedFrames = true, - ignorePing = false, ignorePong = true, autoPongOnPing = true, decodeCloseRequests = requests.schema.isOptional, @@ -203,8 +202,8 @@ trait Tapir extends TapirExtensions with TapirComputedInputs with TapirStaticCon def webSocketBody[REQ, REQ_CF <: CodecFormat, RESP, RESP_CF <: CodecFormat]: WebSocketBodyBuilder[REQ, REQ_CF, RESP, RESP_CF] = new WebSocketBodyBuilder[REQ, REQ_CF, RESP, RESP_CF] def webSocketBodyRaw[S]( - s: Streams[S] - ): WebSocketBodyOutput[s.Pipe[WebSocketFrame, WebSocketFrame], WebSocketFrame, WebSocketFrame, s.Pipe[ + s: Streams[S] + ): WebSocketBodyOutput[s.Pipe[WebSocketFrame, WebSocketFrame], WebSocketFrame, WebSocketFrame, s.Pipe[ WebSocketFrame, WebSocketFrame ], S] = @@ -297,9 +296,9 @@ trait Tapir extends TapirExtensions with TapirComputedInputs with TapirStaticCon * Should be used in [[oneOf]] output descriptions. */ def oneOfVariantClassMatcher[T]( - output: EndpointOutput[T], - runtimeClass: Class[_] - ): OneOfVariant[T] = { + output: EndpointOutput[T], + runtimeClass: Class[_] + ): OneOfVariant[T] = { // when used with a primitive type or Unit, the class tag will correspond to the primitive type, but at runtime // we'll get boxed values val rc = primitiveToBoxedClasses.getOrElse(runtimeClass, runtimeClass) @@ -312,17 +311,17 @@ trait Tapir extends TapirExtensions with TapirComputedInputs with TapirStaticCon * Should be used in [[oneOf]] output descriptions. */ def oneOfVariantClassMatcher[T]( - code: StatusCode, - output: EndpointOutput[T], - runtimeClass: Class[_] - ): OneOfVariant[T] = oneOfVariantClassMatcher(statusCode(code).and(output), runtimeClass) + code: StatusCode, + output: EndpointOutput[T], + runtimeClass: Class[_] + ): OneOfVariant[T] = oneOfVariantClassMatcher(statusCode(code).and(output), runtimeClass) /** Create a one-of-variant which uses `output` if the provided value (when interpreting as a server matches the `matcher` predicate). * * Should be used in [[oneOf]] output descriptions. */ def oneOfVariantValueMatcher[T](output: EndpointOutput[T])( - matcher: PartialFunction[Any, Boolean] + matcher: PartialFunction[Any, Boolean] ): OneOfVariant[T] = OneOfVariant(output, matcher.lift.andThen(_.getOrElse(false))) @@ -332,7 +331,7 @@ trait Tapir extends TapirExtensions with TapirComputedInputs with TapirStaticCon * Should be used in [[oneOf]] output descriptions. */ def oneOfVariantValueMatcher[T](code: StatusCode, output: EndpointOutput[T])( - matcher: PartialFunction[Any, Boolean] + matcher: PartialFunction[Any, Boolean] ): OneOfVariant[T] = OneOfVariant(statusCode(code).and(output), matcher.lift.andThen(_.getOrElse(false))) @@ -341,11 +340,11 @@ trait Tapir extends TapirExtensions with TapirComputedInputs with TapirStaticCon * Should be used in [[oneOf]] output descriptions. */ def oneOfVariantExactMatcher[T: ClassTag]( - output: EndpointOutput[T] - )( - firstExactValue: T, - rest: T* - ): OneOfVariant[T] = + output: EndpointOutput[T] + )( + firstExactValue: T, + rest: T* + ): OneOfVariant[T] = oneOfVariantValueMatcher(output)(exactMatch(rest.toSet + firstExactValue)) /** Create a one-of-variant which uses `output` if the provided value exactly matches one of the values provided in the second argument @@ -354,12 +353,12 @@ trait Tapir extends TapirExtensions with TapirComputedInputs with TapirStaticCon * Should be used in [[oneOf]] output descriptions. */ def oneOfVariantExactMatcher[T: ClassTag]( - code: StatusCode, - output: EndpointOutput[T] - )( - firstExactValue: T, - rest: T* - ): OneOfVariant[T] = + code: StatusCode, + output: EndpointOutput[T] + )( + firstExactValue: T, + rest: T* + ): OneOfVariant[T] = oneOfVariantValueMatcher(code, output)(exactMatch(rest.toSet + firstExactValue)) /** Create a one-of-variant which uses `output` if the provided value matches the target type, as checked by [[MatchType]]. Instances of @@ -424,9 +423,9 @@ trait Tapir extends TapirExtensions with TapirComputedInputs with TapirStaticCon * as specified by the body's codec. This is only used when choosing which body to decode. */ def oneOfBody[T]( - first: (ContentTypeRange, EndpointIO.Body[_, T]), - others: (ContentTypeRange, EndpointIO.Body[_, T])* - ): EndpointIO.OneOfBody[T, T] = + first: (ContentTypeRange, EndpointIO.Body[_, T]), + others: (ContentTypeRange, EndpointIO.Body[_, T])* + ): EndpointIO.OneOfBody[T, T] = EndpointIO.OneOfBody[T, T]((first +: others.toList).map { case (r, b) => EndpointIO.OneOfBodyVariant(r, Left(b)) }, Mapping.id) /** Streaming variant of [[oneOfBody]]. @@ -435,11 +434,11 @@ trait Tapir extends TapirExtensions with TapirComputedInputs with TapirStaticCon * as specified by the body's codec. This is only used when choosing which body to decode. */ def oneOfBody[T]( - first: (ContentTypeRange, EndpointIO.StreamBodyWrapper[_, T]), - // this is needed so that the signature is different from the previous method - second: (ContentTypeRange, EndpointIO.StreamBodyWrapper[_, T]), - others: (ContentTypeRange, EndpointIO.StreamBodyWrapper[_, T])* - ): EndpointIO.OneOfBody[T, T] = + first: (ContentTypeRange, EndpointIO.StreamBodyWrapper[_, T]), + // this is needed so that the signature is different from the previous method + second: (ContentTypeRange, EndpointIO.StreamBodyWrapper[_, T]), + others: (ContentTypeRange, EndpointIO.StreamBodyWrapper[_, T])* + ): EndpointIO.OneOfBody[T, T] = EndpointIO.OneOfBody[T, T]( (first +: second +: others.toList).map { case (r, b) => EndpointIO.OneOfBodyVariant(r, Right(b)) }, Mapping.id diff --git a/server/http4s-server/src/main/scala/sttp/tapir/server/http4s/Http4sServerInterpreter.scala b/server/http4s-server/src/main/scala/sttp/tapir/server/http4s/Http4sServerInterpreter.scala index 6a081953c0..9f00f4b8c2 100644 --- a/server/http4s-server/src/main/scala/sttp/tapir/server/http4s/Http4sServerInterpreter.scala +++ b/server/http4s-server/src/main/scala/sttp/tapir/server/http4s/Http4sServerInterpreter.scala @@ -18,7 +18,6 @@ import sttp.tapir.server.interpreter.{BodyListener, FilterServerEndpoints, Serve import sttp.tapir.server.model.ServerResponse import scala.reflect.ClassTag -import cats.NonEmptyParallel class Http4sInvalidWebSocketUse(val message: String) extends Exception @@ -28,8 +27,7 @@ class Http4sInvalidWebSocketUse(val message: String) extends Exception trait Context[T] trait Http4sServerInterpreter[F[_]] { - implicit def fa: Async[F] - implicit def nep: NonEmptyParallel[F] + implicit def fa: Async[F] def http4sServerOptions: Http4sServerOptions[F] = Http4sServerOptions.default[F] @@ -45,8 +43,8 @@ trait Http4sServerInterpreter[F[_]] { toWebSocketRoutes(List(se)) def toWebSocketRoutes( - serverEndpoints: List[ServerEndpoint[Fs2Streams[F] with WebSockets, F]] - ): WebSocketBuilder2[F] => HttpRoutes[F] = wsb => toRoutes(serverEndpoints, Some(wsb)) + serverEndpoints: List[ServerEndpoint[Fs2Streams[F] with WebSockets, F]] + ): WebSocketBuilder2[F] => HttpRoutes[F] = wsb => toRoutes(serverEndpoints, Some(wsb)) def toContextRoutes[T: ClassTag](se: ServerEndpoint[Fs2Streams[F] with Context[T], F]): ContextRoutes[T, F] = toContextRoutes(contextAttributeKey[T], List(se), None) @@ -55,8 +53,8 @@ trait Http4sServerInterpreter[F[_]] { toContextRoutes(contextAttributeKey[T], ses, None) private def createInterpreter[T]( - serverEndpoints: List[ServerEndpoint[Fs2Streams[F] with WebSockets with Context[T], F]] - ): ServerInterpreter[Fs2Streams[F] with WebSockets with Context[T], F, Http4sResponseBody[F], Fs2Streams[F]] = { + serverEndpoints: List[ServerEndpoint[Fs2Streams[F] with WebSockets with Context[T], F]] + ): ServerInterpreter[Fs2Streams[F] with WebSockets with Context[T], F, Http4sResponseBody[F], Fs2Streams[F]] = { implicit val monad: CatsMonadError[F] = new CatsMonadError[F] implicit val bodyListener: BodyListener[F, Http4sResponseBody[F]] = new Http4sBodyListener[F] @@ -70,19 +68,19 @@ trait Http4sServerInterpreter[F[_]] { } private def toResponse[T]( - interpreter: ServerInterpreter[Fs2Streams[F] with WebSockets with Context[T], F, Http4sResponseBody[F], Fs2Streams[F]], - serverRequest: Http4sServerRequest[F], - webSocketBuilder: Option[WebSocketBuilder2[F]] - ): OptionT[F, Response[F]] = + interpreter: ServerInterpreter[Fs2Streams[F] with WebSockets with Context[T], F, Http4sResponseBody[F], Fs2Streams[F]], + serverRequest: Http4sServerRequest[F], + webSocketBuilder: Option[WebSocketBuilder2[F]] + ): OptionT[F, Response[F]] = OptionT(interpreter(serverRequest).flatMap { case _: RequestResult.Failure => none.pure[F] case RequestResult.Response(response) => serverResponseToHttp4s(response, webSocketBuilder).map(_.some) }) private def toRoutes( - serverEndpoints: List[ServerEndpoint[Fs2Streams[F] with WebSockets, F]], - webSocketBuilder: Option[WebSocketBuilder2[F]] - ): HttpRoutes[F] = { + serverEndpoints: List[ServerEndpoint[Fs2Streams[F] with WebSockets, F]], + webSocketBuilder: Option[WebSocketBuilder2[F]] + ): HttpRoutes[F] = { val interpreter = createInterpreter(serverEndpoints) Kleisli { (req: Request[F]) => @@ -92,10 +90,10 @@ trait Http4sServerInterpreter[F[_]] { } private def toContextRoutes[T]( - contextAttributeKey: AttributeKey[T], - serverEndpoints: List[ServerEndpoint[Fs2Streams[F] with WebSockets with Context[T], F]], - webSocketBuilder: Option[WebSocketBuilder2[F]] - ): ContextRoutes[T, F] = { + contextAttributeKey: AttributeKey[T], + serverEndpoints: List[ServerEndpoint[Fs2Streams[F] with WebSockets with Context[T], F]], + webSocketBuilder: Option[WebSocketBuilder2[F]] + ): ContextRoutes[T, F] = { val interpreter = createInterpreter(serverEndpoints) Kleisli { (contextRequest: ContextRequest[F, T]) => @@ -108,9 +106,9 @@ trait Http4sServerInterpreter[F[_]] { } private def serverResponseToHttp4s( - response: ServerResponse[Http4sResponseBody[F]], - webSocketBuilder: Option[WebSocketBuilder2[F]] - ): F[Response[F]] = { + response: ServerResponse[Http4sResponseBody[F]], + webSocketBuilder: Option[WebSocketBuilder2[F]] + ): F[Response[F]] = { implicit val monad: CatsMonadError[F] = new CatsMonadError[F] val statusCode = statusCodeToHttp4sStatus(response.code) @@ -148,17 +146,15 @@ trait Http4sServerInterpreter[F[_]] { object Http4sServerInterpreter { - def apply[F[_]]()(implicit _fa: Async[F], _nep: NonEmptyParallel[F]): Http4sServerInterpreter[F] = { + def apply[F[_]]()(implicit _fa: Async[F]): Http4sServerInterpreter[F] = { new Http4sServerInterpreter[F] { override implicit def fa: Async[F] = _fa - override implicit def nep: NonEmptyParallel[F] = _nep } } - def apply[F[_]](serverOptions: Http4sServerOptions[F])(implicit _fa: Async[F], _nep: NonEmptyParallel[F]): Http4sServerInterpreter[F] = { + def apply[F[_]](serverOptions: Http4sServerOptions[F])(implicit _fa: Async[F]): Http4sServerInterpreter[F] = { new Http4sServerInterpreter[F] { override implicit def fa: Async[F] = _fa - override implicit def nep: NonEmptyParallel[F] = _nep override def http4sServerOptions: Http4sServerOptions[F] = serverOptions } } diff --git a/server/http4s-server/src/main/scala/sttp/tapir/server/http4s/Http4sToResponseBody.scala b/server/http4s-server/src/main/scala/sttp/tapir/server/http4s/Http4sToResponseBody.scala index 4126cdd5ff..9b95506e85 100644 --- a/server/http4s-server/src/main/scala/sttp/tapir/server/http4s/Http4sToResponseBody.scala +++ b/server/http4s-server/src/main/scala/sttp/tapir/server/http4s/Http4sToResponseBody.scala @@ -1,6 +1,5 @@ package sttp.tapir.server.http4s -import cats.NonEmptyParallel import cats.effect.{Async, Sync} import cats.syntax.all._ import fs2.io.file.Files @@ -18,26 +17,26 @@ import sttp.tapir.{CodecFormat, RawBodyType, RawPart, WebSocketBodyOutput} import java.io.InputStream import java.nio.charset.Charset -private[http4s] class Http4sToResponseBody[F[_]: Async: NonEmptyParallel]( - serverOptions: Http4sServerOptions[F] -) extends ToResponseBody[Http4sResponseBody[F], Fs2Streams[F]] { +private[http4s] class Http4sToResponseBody[F[_]: Async]( + serverOptions: Http4sServerOptions[F] + ) extends ToResponseBody[Http4sResponseBody[F], Fs2Streams[F]] { override val streams: Fs2Streams[F] = Fs2Streams[F] override def fromRawValue[R](v: R, headers: HasHeaders, format: CodecFormat, bodyType: RawBodyType[R]): Http4sResponseBody[F] = Right(rawValueToEntity(bodyType, v)) override def fromStreamValue( - v: Stream[F, Byte], - headers: HasHeaders, - format: CodecFormat, - charset: Option[Charset] - ): Http4sResponseBody[F] = + v: Stream[F, Byte], + headers: HasHeaders, + format: CodecFormat, + charset: Option[Charset] + ): Http4sResponseBody[F] = Right((v, None)) override def fromWebSocketPipe[REQ, RESP]( - pipe: streams.Pipe[REQ, RESP], - o: WebSocketBodyOutput[streams.Pipe[REQ, RESP], REQ, RESP, _, Fs2Streams[F]] - ): Http4sResponseBody[F] = Left(Http4sWebSockets.pipeToBody(pipe, o)) + pipe: streams.Pipe[REQ, RESP], + o: WebSocketBodyOutput[streams.Pipe[REQ, RESP], REQ, RESP, _, Fs2Streams[F]] + ): Http4sResponseBody[F] = Left(Http4sWebSockets.pipeToBody(pipe, o)) private def rawValueToEntity[CF <: CodecFormat, R](bodyType: RawBodyType[R], r: R): (EntityBody[F], Option[Long]) = { bodyType match { diff --git a/server/http4s-server/src/main/scala/sttp/tapir/server/http4s/Http4sWebSockets.scala b/server/http4s-server/src/main/scala/sttp/tapir/server/http4s/Http4sWebSockets.scala index ebff140479..d9b68f9af5 100644 --- a/server/http4s-server/src/main/scala/sttp/tapir/server/http4s/Http4sWebSockets.scala +++ b/server/http4s-server/src/main/scala/sttp/tapir/server/http4s/Http4sWebSockets.scala @@ -1,9 +1,8 @@ package sttp.tapir.server.http4s -import cats.effect.{Temporal, Sync} -import cats.NonEmptyParallel +import cats.effect.Temporal +import cats.{Applicative, Monad} import cats.syntax.all._ -import cats.{Monad, NonEmptyParallel, Applicative} import fs2._ import fs2.concurrent.Channel import org.http4s.websocket.{WebSocketFrame => Http4sWebSocketFrame} @@ -12,13 +11,14 @@ import sttp.capabilities.fs2.Fs2Streams import sttp.tapir.model.WebSocketFrameDecodeFailure import sttp.tapir.{DecodeResult, WebSocketBodyOutput} import sttp.ws.WebSocketFrame +import cats.effect.implicits.parallelForGenSpawn private[http4s] object Http4sWebSockets { - def pipeToBody[F[_]: NonEmptyParallel: Temporal, REQ, RESP]( + def pipeToBody[F[_]: Temporal, REQ, RESP]( pipe: Pipe[F, REQ, RESP], o: WebSocketBodyOutput[Pipe[F, REQ, RESP], REQ, RESP, _, Fs2Streams[F]] ): F[Pipe[F, Http4sWebSocketFrame, Http4sWebSocketFrame]] = { - if ((!o.concatenateFragmentedFrames) && (!o.ignorePing) && (!o.ignorePong) && (!o.autoPongOnPing) && o.autoPing.isEmpty) { + if ((!o.concatenateFragmentedFrames) && (!o.ignorePong) && (!o.autoPongOnPing) && o.autoPing.isEmpty) { // fast track: lift Http4sWebSocketFrames into REQ, run through pipe, convert RESP back to Http4sWebSocketFrame (in: Stream[F, Http4sWebSocketFrame]) => @@ -42,14 +42,14 @@ private[http4s] object Http4sWebSockets { val decodeClose = optionallyDecodeClose(in, o.decodeCloseRequests) val sttpFrames = decodeClose.map(http4sFrameToFrame) val concatenated = optionallyConcatenateFrames(sttpFrames, o.concatenateFragmentedFrames) - val autoPongs = optionallyAutoPong(concatenated, c, o.autoPongOnPing) - val ignorePingPongs = optionallyIgnorePingPong(autoPongs, o.ignorePing, o.ignorePong) + val ignorePongs = optionallyIgnorePong(concatenated, o.ignorePong) + val autoPongs = optionallyAutoPong(ignorePongs, c, o.autoPongOnPing) val autoPings = o.autoPing match { case Some((interval, frame)) => (c.send(Chunk(frameToHttp4sFrame(frame))) >> Temporal[F].sleep(interval)).foreverM[Unit] case None => Applicative[F].unit } - val outputProducer = ignorePingPongs + val outputProducer = autoPongs .map { f => o.requests.decode(f) match { case x: DecodeResult.Value[REQ] => x.v @@ -105,26 +105,14 @@ private[http4s] object Http4sWebSockets { }.collect { case (_, Some(f)) => f } } else s - private def optionallyIgnorePingPong[F[_]](s: Stream[F, WebSocketFrame], ignorePing: Boolean, ignorePong: Boolean): Stream[F, WebSocketFrame] = - (ignorePing, ignorePong) match { - case (false, false) => s - case (true, false) => - s.filter { - case _: WebSocketFrame.Ping => false - case _ => true - } - case (false, true) => - s.filter { - case _: WebSocketFrame.Pong => false - case _ => true - } - case (true, true) => - s.filter { - case _: WebSocketFrame.Ping => false - case _: WebSocketFrame.Pong => false - case _ => true - } - } + private def optionallyIgnorePong[F[_]](s: Stream[F, WebSocketFrame], doIgnore: Boolean): Stream[F, WebSocketFrame] = { + if (doIgnore) { + s.filter { + case _: WebSocketFrame.Pong => false + case _ => true + } + } else s + } private def optionallyAutoPong[F[_] : Monad]( s: Stream[F, WebSocketFrame], diff --git a/server/http4s-server/zio/src/main/scala/sttp/tapir/server/http4s/ztapir/ConvertStreams.scala b/server/http4s-server/zio/src/main/scala/sttp/tapir/server/http4s/ztapir/ConvertStreams.scala index 874fd0a817..f8bd4f4a27 100644 --- a/server/http4s-server/zio/src/main/scala/sttp/tapir/server/http4s/ztapir/ConvertStreams.scala +++ b/server/http4s-server/zio/src/main/scala/sttp/tapir/server/http4s/ztapir/ConvertStreams.scala @@ -14,8 +14,8 @@ import zio.{RIO, Task} object ConvertStreams { def apply[R, C]( - se: ZServerEndpoint[R, ZioStreams with C] - ): ServerEndpoint[Fs2Streams[RIO[R, *]] with C, RIO[R, *]] = + se: ZServerEndpoint[R, ZioStreams with C] + ): ServerEndpoint[Fs2Streams[RIO[R, *]] with C, RIO[R, *]] = ServerEndpoint( Endpoint( forInput(se.securityInput).asInstanceOf[EndpointInput[se.SECURITY_INPUT]], @@ -112,7 +112,7 @@ object ConvertStreams { } private def fs2PipeToZioPipeCodec[A, B] - : Codec[fs2.Pipe[Task, A, B], zio.stream.Stream[Throwable, A] => zio.stream.Stream[Throwable, B], OctetStream] = + : Codec[fs2.Pipe[Task, A, B], zio.stream.Stream[Throwable, A] => zio.stream.Stream[Throwable, B], OctetStream] = Codec .id[fs2.Pipe[Task, A, B], OctetStream](OctetStream(), Schema.binary) .map { (fs2Pipe: fs2.Pipe[Task, A, B]) => (zioStreamA: zio.stream.Stream[Throwable, A]) => @@ -122,8 +122,8 @@ object ConvertStreams { } private def apply[R, PIPE_REQ_RESP, REQ, RESP, T, S]( - w: WebSocketBodyOutput[PIPE_REQ_RESP, REQ, RESP, T, S] - ): WebSocketBodyOutput[fs2.Pipe[Task, REQ, RESP], REQ, RESP, T, Fs2Streams[RIO[R, *]]] = { + w: WebSocketBodyOutput[PIPE_REQ_RESP, REQ, RESP, T, S] + ): WebSocketBodyOutput[fs2.Pipe[Task, REQ, RESP], REQ, RESP, T, Fs2Streams[RIO[R, *]]] = { // we know that: // * PIPE_REQ_RESP == zio.stream.Stream[Throwable, REQ] => zio.stream.Stream[Throwable, RESP] // * S == ZioStreams @@ -141,7 +141,6 @@ object ConvertStreams { w2.requestsInfo, w2.responsesInfo, w2.concatenateFragmentedFrames, - w2.ignorePing, w2.ignorePong, w2.autoPongOnPing, w2.decodeCloseRequests,