diff --git a/.gitignore b/.gitignore index c3e7efc778..a2b684eeb8 100644 --- a/.gitignore +++ b/.gitignore @@ -24,7 +24,7 @@ core/native/local.sbt .metals/ .bloop/ -project/metals.sbt +metals.sbt .bsp/ .java-version diff --git a/build.sbt b/build.sbt index c35d147e2c..bface22422 100644 --- a/build.sbt +++ b/build.sbt @@ -145,6 +145,10 @@ val akkaHttp = "com.typesafe.akka" %% "akka-http" % "10.2.10" val akkaStreamVersion = "2.6.20" val akkaStreams = "com.typesafe.akka" %% "akka-stream" % akkaStreamVersion +val pekkoHttp = "org.apache.pekko" %% "pekko-http" % "1.0.0" +val pekkoStreamVersion = "1.0.1" +val pekkoStreams = "org.apache.pekko" %% "pekko-stream" % pekkoStreamVersion + val scalaTest = libraryDependencies ++= Seq("freespec", "funsuite", "flatspec", "wordspec", "shouldmatchers").map(m => "org.scalatest" %%% s"scalatest-$m" % "3.2.15" % Test ) @@ -155,7 +159,7 @@ val zio1InteropRsVersion = "1.3.12" val zio2InteropRsVersion = "2.0.1" val sttpModelVersion = "1.5.5" -val sttpSharedVersion = "1.3.15" +val sttpSharedVersion = "1.3.16" val logback = "ch.qos.logback" % "logback-classic" % "1.4.5" @@ -196,6 +200,7 @@ lazy val allAggregates = projectsWithOptionalNative ++ zio1.projectRefs ++ zio.projectRefs ++ akkaHttpBackend.projectRefs ++ + pekkoHttpBackend.projectRefs ++ asyncHttpClientBackend.projectRefs ++ asyncHttpClientFutureBackend.projectRefs ++ asyncHttpClientScalazBackend.projectRefs ++ @@ -541,6 +546,25 @@ lazy val akkaHttpBackend = (projectMatrix in file("akka-http-backend")) scalaVersions = scala2alive ) +//-- pekko +lazy val pekkoHttpBackend = (projectMatrix in file("pekko-http-backend")) + .settings(commonJvmSettings) + .settings(testServerSettings) + .settings( + name := "pekko-http-backend", + libraryDependencies ++= Seq( + pekkoHttp, + // provided as we don't want to create a transitive dependency on a specific streams version, + // just as akka-http doesn't + pekkoStreams % "provided", + "com.softwaremill.sttp.shared" %% "pekko" % sttpSharedVersion + ) + ) + .dependsOn(core % compileAndTest) + .jvmPlatform( + scalaVersions = scala2alive ++ scala3 + ) + //-- async http client lazy val asyncHttpClientBackend = (projectMatrix in file("async-http-client-backend")) .settings(commonJvmSettings) @@ -1001,7 +1025,8 @@ lazy val examples = (projectMatrix in file("examples")) libraryDependencies ++= dependenciesFor(scalaVersion.value)( "io.circe" %% "circe-generic" % circeVersion(_), _ => "org.json4s" %% "json4s-native" % json4sVersion, - _ => akkaStreams, + _ => akkaStreams.exclude("org.scala-lang.modules", "scala-java8-compat_2.12"), + _ => pekkoStreams, _ => logback ) ) @@ -1010,6 +1035,7 @@ lazy val examples = (projectMatrix in file("examples")) core, asyncHttpClientZioBackend, akkaHttpBackend, + pekkoHttpBackend, asyncHttpClientFs2Backend, json4s, circe, @@ -1036,7 +1062,8 @@ lazy val docs: ProjectMatrix = (projectMatrix in file("generated-docs")) // impo "BRAVE_OPENTRACING_VERSION" -> braveOpentracingVersion, "ZIPKIN_SENDER_OKHTTP_VERSION" -> zipkinSenderOkHttpVersion, "AKKA_STREAM_VERSION" -> akkaStreamVersion, - "CIRCE_VERSION" -> circeVersion(None) + "CIRCE_VERSION" -> circeVersion(None), + "PEKKO_STREAM_VERSION" -> pekkoStreamVersion ), mdocOut := file("generated-docs/out"), mdocExtraArguments := Seq("--clean-target"), @@ -1053,13 +1080,15 @@ lazy val docs: ProjectMatrix = (projectMatrix in file("generated-docs")) // impo "io.opentracing.brave" % "brave-opentracing" % braveOpentracingVersion, "io.zipkin.reporter2" % "zipkin-sender-okhttp3" % zipkinSenderOkHttpVersion, "io.opentelemetry" % "opentelemetry-semconv" % "1.2.0-alpha", - akkaStreams + akkaStreams, + pekkoStreams ), evictionErrorLevel := Level.Info ) .dependsOn( core % "compile->test", akkaHttpBackend, + pekkoHttpBackend, json4s, circe, sprayJson, diff --git a/docs/backends/pekko.md b/docs/backends/pekko.md new file mode 100644 index 0000000000..0216aed9f9 --- /dev/null +++ b/docs/backends/pekko.md @@ -0,0 +1,123 @@ +# Pekko backend + +This backend is based on [pekko-http](https://pekko.apache.org/docs/pekko-http/current/). To use, add the following dependency to your project: + +``` +"com.softwaremill.sttp.client3" %% "pekko-http-backend" % "@VERSION@" +``` + +A fully **asynchronous** backend. Uses the `Future` effect to return responses. There are also [other `Future`-based backends](future.md), which don't depend on Pekko. + +Note that you'll also need an explicit dependency on pekko-streams, as pekko-http doesn't depend on any specific pekko-streams version. So you'll also need to add, for example: + +``` +"org.apache.pekko" %% "pekko-stream" % "@PEKKO_STREAM_VERSION@" +``` + +Next you'll need to add create the backend instance: + +```scala mdoc:compile-only +import sttp.client3.pekkohttp._ +val backend = PekkoHttpBackend() +``` + +or, if you'd like to use an existing actor system: + +```scala mdoc:compile-only +import sttp.client3.pekkohttp._ +import org.apache.pekko.actor.ActorSystem + +val actorSystem: ActorSystem = ??? +val backend = PekkoHttpBackend.usingActorSystem(actorSystem) +``` + +This backend supports sending and receiving [pekko-streams](https://pekko.apache.org/docs/pekko/current/stream/index.html) streams. The streams capability is represented as `sttp.client3.pekkohttp.PekkoStreams`. + +To set the request body as a stream: + +```scala mdoc:compile-only +import sttp.capabilities.pekko.PekkoStreams +import sttp.client3._ + +import org.apache.pekko +import pekko.stream.scaladsl.Source +import pekko.util.ByteString + +val source: Source[ByteString, Any] = ??? + +basicRequest + .post(uri"...") + .streamBody(PekkoStreams)(source) +``` + +To receive the response body as a stream: + +```scala mdoc:compile-only +import scala.concurrent.Future +import sttp.capabilities.pekko.PekkoStreams +import sttp.client3._ +import sttp.client3.pekkohttp.PekkoHttpBackend + +import org.apache.pekko +import pekko.stream.scaladsl.Source +import pekko.util.ByteString + +val backend = PekkoHttpBackend() + +val response: Future[Response[Either[String, Source[ByteString, Any]]]] = + basicRequest + .post(uri"...") + .response(asStreamUnsafe(PekkoStreams)) + .send(backend) +``` + +The pekko-http backend support both regular and streaming [websockets](../websockets.md). + +## Testing + +Apart from testing using [the stub](../testing.md), you can create a backend using any `HttpRequest => Future[HttpResponse]` function, or an pekko-http `Route`. + +That way, you can "mock" a server that the backend will talk to, without starting any actual server or making any HTTP calls. + +If your application provides a client library for its dependants to use, this is a great way to ensure that the client actually matches the routes exposed by your application: + +```scala mdoc:compile-only +import sttp.client3.pekkohttp._ +import org.apache.pekko +import pekko.http.scaladsl.server.Route +import pekko.actor.ActorSystem + +val route: Route = ??? +implicit val system: ActorSystem = ??? + +val backend = PekkoHttpBackend.usingClient(system, http = PekkoHttpClient.stubFromRoute(route)) +``` + +## WebSockets + +Non-standard behavior: + +* pekko always automatically responds with a `Pong` to a `Ping` message +* `WebSocketFrame.Ping` and `WebSocketFrame.Pong` frames are ignored; instead, you can configure automatic [keep-alive pings](https://pekko.apache.org/docs/pekko-http/current/client-side/websocket-support.html#automatic-keep-alive-ping-support) + +## Server-sent events + +Received data streams can be parsed to a stream of server-sent events (SSE): + +```scala mdoc:compile-only +import scala.concurrent.Future + +import org.apache.pekko.stream.scaladsl.Source + +import sttp.capabilities.pekko.PekkoStreams +import sttp.client3.pekkohttp.PekkoHttpServerSentEvents +import sttp.model.sse.ServerSentEvent +import sttp.client3._ + +def processEvents(source: Source[ServerSentEvent, Any]): Future[Unit] = ??? + +basicRequest + .get(uri"...") + .response(asStream(PekkoStreams)(stream => + processEvents(stream.via(PekkoHttpServerSentEvents.parse)))) +``` diff --git a/docs/examples.md b/docs/examples.md index 12278df643..8b4eb79252 100644 --- a/docs/examples.md +++ b/docs/examples.md @@ -201,6 +201,21 @@ Example code: :language: scala ``` +## Open a websocket using Pekko + +Required dependencies: + +```scala +libraryDependencies ++= List("com.softwaremill.sttp.client3" %% "pekko-http-backend" % "@VERSION@") +``` + +Example code: + +```eval_rst +.. literalinclude:: ../../examples/src/main/scala/sttp/client3/examples/WebSocketPekko.scala + :language: scala +``` + ## Open a websocket using Monix Required dependencies: diff --git a/docs/index.md b/docs/index.md index e66f869dd9..cf98c2a0d0 100644 --- a/docs/index.md +++ b/docs/index.md @@ -7,7 +7,7 @@ Welcome! [sttp client](https://github.com/softwaremill/sttp) is an open-source library which provides a clean, programmer-friendly API to describe HTTP requests and how to handle responses. Requests are sent using one of the backends, which wrap other Scala or Java HTTP client implementations. The backends can integrate with a variety of Scala stacks, providing both synchronous and asynchronous, procedural and functional interfaces. -Backend implementations include ones based on [akka-http](https://doc.akka.io/docs/akka-http/current/scala/http/), [http4s](https://http4s.org), [OkHttp](http://square.github.io/okhttp/), and HTTP clients which ship with Java. They integrate with [Akka](https://akka.io), [Monix](https://monix.io), [fs2](https://github.com/functional-streams-for-scala/fs2), [cats-effect](https://github.com/typelevel/cats-effect), [scalaz](https://github.com/scalaz/scalaz) and [ZIO](https://github.com/zio/zio). Supported Scala versions include 2.11, 2.12, 2.13 and 3, Scala.JS and Scala Native. +Backend implementations include ones based on [akka-http](https://doc.akka.io/docs/akka-http/current/scala/http/), [pekko-http](https://pekko.apache.org/docs/pekko-http/current/), [http4s](https://http4s.org), [OkHttp](http://square.github.io/okhttp/), and HTTP clients which ship with Java. They integrate with [Akka](https://akka.io), [Monix](https://monix.io), [fs2](https://github.com/functional-streams-for-scala/fs2), [cats-effect](https://github.com/typelevel/cats-effect), [scalaz](https://github.com/scalaz/scalaz) and [ZIO](https://github.com/zio/zio). Supported Scala versions include 2.11, 2.12, 2.13 and 3, Scala.JS and Scala Native. Here's a quick example of sttp client in action: @@ -124,6 +124,7 @@ We offer commercial support for sttp and related technologies, as well as develo backends/start_stop backends/synchronous backends/akka + backends/pekko backends/future backends/monix backends/catseffect diff --git a/examples/src/main/scala/sttp/client3/examples/WebSocketPekko.scala b/examples/src/main/scala/sttp/client3/examples/WebSocketPekko.scala new file mode 100644 index 0000000000..0f71c71837 --- /dev/null +++ b/examples/src/main/scala/sttp/client3/examples/WebSocketPekko.scala @@ -0,0 +1,29 @@ +package sttp.client3.examples + +import sttp.client3._ +import sttp.client3.pekkohttp.PekkoHttpBackend +import sttp.ws.WebSocket + +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.Future + +object WebSocketPekko extends App { + def useWebSocket(ws: WebSocket[Future]): Future[Unit] = { + def send(i: Int) = ws.sendText(s"Hello $i!") + def receive() = ws.receiveText().map(t => println(s"RECEIVED: $t")) + for { + _ <- send(1) + _ <- send(2) + _ <- receive() + _ <- receive() + } yield () + } + + val backend = PekkoHttpBackend() + + basicRequest + .get(uri"wss://ws.postman-echo.com/raw") + .response(asWebSocket(useWebSocket)) + .send(backend) + .onComplete(_ => backend.close()) +} diff --git a/generated-docs/out/examples.md b/generated-docs/out/examples.md index 32b3fdcd56..ca4b0cdc70 100644 --- a/generated-docs/out/examples.md +++ b/generated-docs/out/examples.md @@ -201,6 +201,21 @@ Example code: :language: scala ``` +## Open a websocket using Pekko + +Required dependencies: + +```scala +libraryDependencies ++= List("com.softwaremill.sttp.client3" %% "pekko-http-backend" % "@VERSION@") +``` + +Example code: + +```eval_rst +.. literalinclude:: ../../examples/src/main/scala/sttp/client3/examples/WebSocketPekko.scala + :language: scala +``` + ## Open a websocket using Monix Required dependencies: diff --git a/pekko-http-backend/src/main/scala/sttp/client3/pekkohttp/BodyFromPekko.scala b/pekko-http-backend/src/main/scala/sttp/client3/pekkohttp/BodyFromPekko.scala new file mode 100644 index 0000000000..d026d3cced --- /dev/null +++ b/pekko-http-backend/src/main/scala/sttp/client3/pekkohttp/BodyFromPekko.scala @@ -0,0 +1,218 @@ +package sttp.client3.pekkohttp + +import java.util.concurrent.atomic.AtomicBoolean +import org.apache.pekko +import pekko.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage} +import pekko.http.scaladsl.model.{HttpEntity, HttpResponse} +import pekko.stream.scaladsl.{FileIO, Flow, Sink, SinkQueueWithCancel, Source, SourceQueueWithComplete} +import pekko.stream.{Materializer, OverflowStrategy, QueueOfferResult} +import pekko.util.ByteString +import pekko.{Done, NotUsed} +import sttp.capabilities.pekko.PekkoStreams +import sttp.client3._ +import sttp.client3.internal._ +import sttp.client3.ws.{GotAWebSocketException, NotAWebSocketException} +import sttp.model.{Headers, ResponseMetadata} +import sttp.monad.{FutureMonad, MonadError} +import sttp.ws.{WebSocket, WebSocketBufferFull, WebSocketClosed, WebSocketFrame} + +import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.util.Failure + +private[pekkohttp] class BodyFromPekko()(implicit ec: ExecutionContext, mat: Materializer, m: MonadError[Future]) { + def apply[T, R]( + responseAs: ResponseAs[T, R], + meta: ResponseMetadata, + response: Either[HttpResponse, Promise[Flow[Message, Message, NotUsed]]] + ): Future[T] = + bodyFromResponseAs(responseAs, meta, response) + + private lazy val bodyFromResponseAs = + new BodyFromResponseAs[Future, HttpResponse, Promise[Flow[Message, Message, NotUsed]], PekkoStreams.BinaryStream] { + override protected def withReplayableBody( + response: HttpResponse, + replayableBody: Either[Array[Byte], SttpFile] + ): Future[HttpResponse] = { + val replayEntity = replayableBody match { + case Left(byteArray) => HttpEntity(byteArray) + case Right(file) => HttpEntity.fromFile(response.entity.contentType, file.toFile) + } + + Future.successful(response.copy(entity = replayEntity)) + } + + override protected def regularIgnore(response: HttpResponse): Future[Unit] = + // todo: Replace with HttpResponse#discardEntityBytes() once https://github.com/akka/akka-http/issues/1459 is resolved + response.entity.dataBytes.runWith(Sink.ignore).map(_ => ()) + + override protected def regularAsByteArray(response: HttpResponse): Future[Array[Byte]] = + response.entity.dataBytes + .runFold(ByteString(""))(_ ++ _) + .map(_.toArray[Byte]) + + override protected def regularAsFile(response: HttpResponse, file: SttpFile): Future[SttpFile] = { + val f = file.toFile + if (!f.exists()) { + f.getParentFile.mkdirs() + f.createNewFile() + } + + response.entity.dataBytes.runWith(FileIO.toPath(file.toPath)).map(_ => file) + } + + override protected def regularAsStream( + response: HttpResponse + ): Future[(Source[ByteString, Any], () => Future[Unit])] = + Future.successful( + ( + response.entity.dataBytes, + // ignoring exceptions that occur when discarding (i.e. double-materialisation exceptions) + () => response.discardEntityBytes().future().map(_ => ()).recover { case _ => () } + ) + ) + + override protected def handleWS[T]( + responseAs: WebSocketResponseAs[T, _], + meta: ResponseMetadata, + ws: Promise[Flow[Message, Message, NotUsed]] + ): Future[T] = wsFromPekko(responseAs, ws, meta) + + override protected def cleanupWhenNotAWebSocket(response: HttpResponse, e: NotAWebSocketException): Future[Unit] = + response.entity.discardBytes().future().map(_ => ()) + + override protected def cleanupWhenGotWebSocket( + response: Promise[Flow[Message, Message, NotUsed]], + e: GotAWebSocketException + ): Future[Unit] = Future.successful(response.failure(e)) + } + + private def wsFromPekko[T, R]( + rr: WebSocketResponseAs[T, R], + wsFlow: Promise[Flow[Message, Message, NotUsed]], + meta: ResponseMetadata + )(implicit ec: ExecutionContext, mat: Materializer): Future[T] = + rr match { + case ResponseAsWebSocket(f) => + val (flow, wsFuture) = webSocketAndFlow(meta) + wsFlow.success(flow) + wsFuture.flatMap { ws => + val result = f.asInstanceOf[(WebSocket[Future], ResponseMetadata) => Future[T]](ws, meta) + result.onComplete(_ => ws.close()) + result + } + case ResponseAsWebSocketUnsafe() => + val (flow, wsFuture) = webSocketAndFlow(meta) + wsFlow.success(flow) + wsFuture.asInstanceOf[Future[T]] + case ResponseAsWebSocketStream(_, p) => + val donePromise = Promise[Done]() + + val flow = Flow[Message] + .mapAsync(1)(messageToFrame) + .via(p.asInstanceOf[PekkoStreams.Pipe[WebSocketFrame.Data[_], WebSocketFrame]]) + .takeWhile { + case WebSocketFrame.Close(_, _) => false + case _ => true + } + .mapConcat(incoming => frameToMessage(incoming).toList) + .watchTermination() { (notUsed, done) => + donePromise.completeWith(done) + notUsed + } + + wsFlow.success(flow) + + donePromise.future.map(_ => ()) + } + + private def webSocketAndFlow(meta: ResponseMetadata)(implicit + ec: ExecutionContext, + mat: Materializer + ): (Flow[Message, Message, NotUsed], Future[WebSocket[Future]]) = { + val sinkQueuePromise = Promise[SinkQueueWithCancel[Message]]() + val sink = Sink + .queue[Message]() + .mapMaterializedValue(sinkQueuePromise.success) + + val sourceQueuePromise = Promise[SourceQueueWithComplete[Message]]() + val source = + Source.queue[Message](1, OverflowStrategy.backpressure).mapMaterializedValue(sourceQueuePromise.success) + + val flow = Flow.fromSinkAndSource(sink, source) + + val ws = for { + sinkQueue <- sinkQueuePromise.future + sourceQueue <- sourceQueuePromise.future + } yield new WebSocket[Future] { + private val open = new AtomicBoolean(true) + private val closeReceived = new AtomicBoolean(false) + + override def receive(): Future[WebSocketFrame] = { + val result = sinkQueue.pull().flatMap { + case Some(m) => messageToFrame(m) + case None => + open.set(false) + val c = closeReceived.getAndSet(true) + if (!c) Future.successful(WebSocketFrame.close) + else Future.failed(WebSocketClosed(Some(WebSocketFrame.close))) + } + + result.onComplete { + case Failure(_) => open.set(false) + case _ => + } + + result + } + + override def send(f: WebSocketFrame, isContinuation: Boolean): Future[Unit] = + f match { + case WebSocketFrame.Close(_, _) => + val wasOpen = open.getAndSet(false) + if (wasOpen) sourceQueue.complete() + sourceQueue.watchCompletion().map(_ => ()) + + case frame: WebSocketFrame => + frameToMessage(frame) match { + case Some(m) => + sourceQueue.offer(m).flatMap { + case QueueOfferResult.Enqueued => Future.successful(()) + case QueueOfferResult.Dropped => + Future.failed(throw new IllegalStateException(WebSocketBufferFull(1))) + case QueueOfferResult.Failure(cause) => Future.failed(cause) + case QueueOfferResult.QueueClosed => + Future.failed(throw new IllegalStateException(WebSocketClosed(None))) + } + case None => Future.successful(()) + } + } + + override def upgradeHeaders: Headers = Headers(meta.headers) + + override def isOpen(): Future[Boolean] = Future.successful(open.get()) + + override implicit def monad: MonadError[Future] = new FutureMonad()(ec) + } + + (flow, ws) + } + + private def messageToFrame( + m: Message + )(implicit ec: ExecutionContext, mat: Materializer): Future[WebSocketFrame.Data[_]] = + m match { + case msg: TextMessage => + msg.textStream.runFold("")(_ + _).map(t => WebSocketFrame.text(t)) + case msg: BinaryMessage => + msg.dataStream.runFold(ByteString.empty)(_ ++ _).map(b => WebSocketFrame.binary(b.toArray)) + } + + private def frameToMessage(w: WebSocketFrame): Option[Message] = + w match { + case WebSocketFrame.Text(p, _, _) => Some(TextMessage(p)) + case WebSocketFrame.Binary(p, _, _) => Some(BinaryMessage(ByteString(p))) + case WebSocketFrame.Ping(_) => None + case WebSocketFrame.Pong(_) => None + case WebSocketFrame.Close(_, _) => throw WebSocketClosed(None) + } +} diff --git a/pekko-http-backend/src/main/scala/sttp/client3/pekkohttp/BodyToPekko.scala b/pekko-http-backend/src/main/scala/sttp/client3/pekkohttp/BodyToPekko.scala new file mode 100644 index 0000000000..0278e1f5c4 --- /dev/null +++ b/pekko-http-backend/src/main/scala/sttp/client3/pekkohttp/BodyToPekko.scala @@ -0,0 +1,103 @@ +package sttp.client3.pekkohttp + +import org.apache.pekko +import pekko.http.scaladsl.model.{ + ContentType, + HttpCharsets, + HttpEntity, + HttpRequest, + MediaType, + Multipart => PekkoMultipart, + RequestEntity +} +import pekko.stream.scaladsl.{Source, StreamConverters} +import pekko.util.ByteString +import sttp.capabilities.pekko.PekkoStreams +import sttp.client3.internal.throwNestedMultipartNotAllowed +import sttp.client3._ +import sttp.model.{HeaderNames, Part} + +import scala.collection.immutable.Seq +import scala.util.{Failure, Success, Try} + +private[pekkohttp] object BodyToPekko { + def apply[R]( + r: Request[_, R], + body: RequestBody[R], + ar: HttpRequest + ): Try[HttpRequest] = { + def ctWithCharset(ct: ContentType, charset: String) = + HttpCharsets + .getForKey(charset) + .map(hc => ContentType.apply(ct.mediaType, () => hc)) + .getOrElse(ct) + + def contentLength = r.headers.find(_.is(HeaderNames.ContentLength)).flatMap(h => Try(h.value.toLong).toOption) + + def toBodyPart(mp: Part[RequestBody[_]]): Try[PekkoMultipart.FormData.BodyPart] = { + def streamPartEntity(contentType: ContentType, s: PekkoStreams.BinaryStream) = + mp.contentLength match { + case None => HttpEntity.IndefiniteLength(contentType, s) + case Some(l) => HttpEntity(contentType, l, s) + } + + def entity(ct: ContentType) = + mp.body match { + case StringBody(b, encoding, _) => HttpEntity(ctWithCharset(ct, encoding), b.getBytes(encoding)) + case ByteArrayBody(b, _) => HttpEntity(ct, b) + case ByteBufferBody(b, _) => HttpEntity(ct, ByteString(b)) + case isb: InputStreamBody => streamPartEntity(ct, StreamConverters.fromInputStream(() => isb.b)) + case FileBody(b, _) => HttpEntity.fromPath(ct, b.toPath) + case StreamBody(b) => streamPartEntity(ct, b.asInstanceOf[PekkoStreams.BinaryStream]) + case MultipartBody(_) => throwNestedMultipartNotAllowed + case NoBody => HttpEntity.Empty + } + + for { + ct <- Util.parseContentTypeOrOctetStream(mp.contentType) + headers <- ToPekko.headers(mp.headers.toList) + } yield PekkoMultipart.FormData.BodyPart(mp.name, entity(ct), mp.dispositionParams, headers) + } + + def streamEntity(contentType: ContentType, s: PekkoStreams.BinaryStream) = + contentLength match { + case None => HttpEntity(contentType, s) + case Some(l) => HttpEntity(contentType, l, s) + } + + Util.parseContentTypeOrOctetStream(r).flatMap { ct => + body match { + case NoBody => Success(ar) + case StringBody(b, encoding, _) => Success(ar.withEntity(ctWithCharset(ct, encoding), b.getBytes(encoding))) + case ByteArrayBody(b, _) => Success(ar.withEntity(HttpEntity(ct, b))) + case ByteBufferBody(b, _) => Success(ar.withEntity(HttpEntity(ct, ByteString(b)))) + case InputStreamBody(b, _) => + Success(ar.withEntity(streamEntity(ct, StreamConverters.fromInputStream(() => b)))) + case FileBody(b, _) => Success(ar.withEntity(ct, b.toPath)) + case StreamBody(s) => Success(ar.withEntity(streamEntity(ct, s.asInstanceOf[PekkoStreams.BinaryStream]))) + case m: MultipartBody[_] => + Util + .traverseTry(m.parts.map(toBodyPart)) + .flatMap(bodyParts => multipartEntity(r, bodyParts).map(ar.withEntity)) + } + } + } + + private def multipartEntity( + r: Request[_, _], + bodyParts: Seq[PekkoMultipart.FormData.BodyPart] + ): Try[RequestEntity] = + r.headers.find(Util.isContentType) match { + case None => Success(PekkoMultipart.FormData(bodyParts: _*).toEntity) + case Some(ct) => + Util.parseContentType(ct.value).map(_.mediaType).flatMap { + case m: MediaType.Multipart => + Success( + PekkoMultipart + .General(m, Source(bodyParts.map(bp => PekkoMultipart.General.BodyPart(bp.entity, bp.headers)))) + .toEntity + ) + case _ => Failure(new RuntimeException(s"Non-multipart content type: $ct")) + } + } +} diff --git a/pekko-http-backend/src/main/scala/sttp/client3/pekkohttp/FromPekko.scala b/pekko-http-backend/src/main/scala/sttp/client3/pekkohttp/FromPekko.scala new file mode 100644 index 0000000000..def617801e --- /dev/null +++ b/pekko-http-backend/src/main/scala/sttp/client3/pekkohttp/FromPekko.scala @@ -0,0 +1,32 @@ +package sttp.client3.pekkohttp + +import org.apache.pekko +import pekko.http.scaladsl.model.HttpResponse +import sttp.client3.{Request, SttpClientException} +import sttp.model.{Header, HeaderNames} + +import scala.collection.immutable.Seq + +private[pekkohttp] object FromPekko { + def headers(hr: HttpResponse): Seq[Header] = { + val ch = Header(HeaderNames.ContentType, hr.entity.contentType.toString()) + val cl = + hr.entity.contentLengthOption.map(v => Header.contentLength(v)) + val other = hr.headers.map(h => Header(h.name, h.value)) + ch :: (cl.toList ++ other) + } + + def exception(request: Request[_, _], e: Exception): Option[Exception] = + e match { + case e: pekko.stream.ConnectionException => Some(new SttpClientException.ConnectException(request, e)) + case e: pekko.stream.StreamTcpException => + e.getCause match { + case ee: Exception => + exception(request, ee).orElse(Some(new SttpClientException.ReadException(request, e))) + case _ => Some(new SttpClientException.ReadException(request, e)) + } + case e: pekko.stream.scaladsl.TcpIdleTimeoutException => + Some(new SttpClientException.TimeoutException(request, e)) + case e: Exception => SttpClientException.defaultExceptionToSttpClientException(request, e) + } +} diff --git a/pekko-http-backend/src/main/scala/sttp/client3/pekkohttp/PekkoHttpBackend.scala b/pekko-http-backend/src/main/scala/sttp/client3/pekkohttp/PekkoHttpBackend.scala new file mode 100644 index 0000000000..d0c9ad5c7f --- /dev/null +++ b/pekko-http-backend/src/main/scala/sttp/client3/pekkohttp/PekkoHttpBackend.scala @@ -0,0 +1,303 @@ +package sttp.client3.pekkohttp + +import java.io.UnsupportedEncodingException +import org.apache.pekko +import pekko.{Done, NotUsed} +import pekko.actor.{ActorSystem, CoordinatedShutdown} +import pekko.event.LoggingAdapter +import pekko.http.scaladsl.coding.Coders +import pekko.http.scaladsl.model.headers.{BasicHttpCredentials, HttpEncoding, HttpEncodings} +import pekko.http.scaladsl.model.ws.{InvalidUpgradeResponse, Message, ValidUpgrade, WebSocketRequest} +import pekko.http.scaladsl.model.{StatusCode => _, _} +import pekko.http.scaladsl.settings.ConnectionPoolSettings +import pekko.http.scaladsl.{ClientTransport, Http, HttpsConnectionContext} +import pekko.stream.Materializer +import pekko.stream.scaladsl.{Flow, Sink} +import sttp.capabilities.pekko.PekkoStreams +import sttp.capabilities.{Effect, WebSockets} +import sttp.client3 +import sttp.client3.pekkohttp.PekkoHttpBackend.EncodingHandler +import sttp.client3.testing.SttpBackendStub +import sttp.client3.{FollowRedirectsBackend, Response, SttpBackend, SttpBackendOptions, _} +import sttp.model.{ResponseMetadata, StatusCode} +import sttp.monad.{FutureMonad, MonadError} + +import scala.concurrent.{ExecutionContext, Future, Promise} + +class PekkoHttpBackend private ( + actorSystem: ActorSystem, + ec: ExecutionContext, + terminateActorSystemOnClose: Boolean, + opts: SttpBackendOptions, + customConnectionPoolSettings: Option[ConnectionPoolSettings], + http: PekkoHttpClient, + customizeRequest: HttpRequest => HttpRequest, + customizeWebsocketRequest: WebSocketRequest => WebSocketRequest, + customizeResponse: (HttpRequest, HttpResponse) => HttpResponse, + customEncodingHandler: EncodingHandler +) extends SttpBackend[Future, PekkoStreams with WebSockets] { + type PE = PekkoStreams with WebSockets with Effect[Future] + + private implicit val as: ActorSystem = actorSystem + private implicit val _ec: ExecutionContext = ec + + private val connectionPoolSettings = customConnectionPoolSettings + .getOrElse(ConnectionPoolSettings(actorSystem)) + .withUpdatedConnectionSettings(_.withConnectingTimeout(opts.connectionTimeout)) + + override def send[T, R >: PE](r: Request[T, R]): Future[Response[T]] = + adjustExceptions(r) { + if (r.isWebSocket) sendWebSocket(r) else sendRegular(r) + } + + private def sendRegular[T, R >: PE](r: Request[T, R]): Future[Response[T]] = + Future + .fromTry(ToPekko.request(r).flatMap(BodyToPekko(r, r.body, _))) + .map(customizeRequest) + .flatMap(request => + http + .singleRequest(request, connectionSettings(r)) + .flatMap(response => + Future(customizeResponse(request, response)) + .flatMap(response => responseFromPekko(r, response, None).recoverWith(consumeResponseOnFailure(response))) + ) + ) + + private def sendWebSocket[T, R >: PE](r: Request[T, R]): Future[Response[T]] = { + val pekkoWebsocketRequest = ToPekko + .headers(r.headers) + .map(h => WebSocketRequest(uri = r.uri.toString, extraHeaders = h)) + .map(customizeWebsocketRequest) + + val flowPromise = Promise[Flow[Message, Message, NotUsed]]() + + Future + .fromTry(pekkoWebsocketRequest) + .flatMap(request => + http.singleWebsocketRequest( + request, + Flow.futureFlow(flowPromise.future), + connectionSettings(r).connectionSettings + ) + ) + .flatMap { + case (ValidUpgrade(response, _), _) => + responseFromPekko(r, response, Some(flowPromise)).recoverWith(consumeResponseOnFailure(response)) + case (InvalidUpgradeResponse(response, _), _) => + flowPromise.failure(new InterruptedException) + responseFromPekko(r, response, None).recoverWith(consumeResponseOnFailure(response)) + } + } + + private def consumeResponseOnFailure[T](response: HttpResponse): PartialFunction[Throwable, Future[T]] = { + case t: Throwable => + response.entity.dataBytes + .runWith(Sink.ignore) + .flatMap(_ => Future.failed(t)) + .recoverWith { case _ => Future.failed(t) } + } + + override val responseMonad: MonadError[Future] = new FutureMonad()(ec) + + private def connectionSettings(r: Request[_, _]): ConnectionPoolSettings = { + val connectionPoolSettingsWithProxy = opts.proxy match { + case Some(p) if r.uri.host.forall(!p.ignoreProxy(_)) => + val clientTransport = p.auth match { + case Some(proxyAuth) => + ClientTransport.httpsProxy( + p.inetSocketAddress, + BasicHttpCredentials(proxyAuth.username, proxyAuth.password) + ) + case None => ClientTransport.httpsProxy(p.inetSocketAddress) + } + connectionPoolSettings.withTransport(clientTransport) + case _ => connectionPoolSettings + } + connectionPoolSettingsWithProxy + .withUpdatedConnectionSettings(_.withIdleTimeout(r.options.readTimeout)) + } + + private lazy val bodyFromPekko = new BodyFromPekko()(ec, implicitly[Materializer], responseMonad) + + private def responseFromPekko[T]( + r: Request[T, PE], + hr: HttpResponse, + wsFlow: Option[Promise[Flow[Message, Message, NotUsed]]] + ): Future[Response[T]] = { + val code = StatusCode(hr.status.intValue()) + val statusText = hr.status.reason() + + val headers = FromPekko.headers(hr) + + val responseMetadata = ResponseMetadata(code, statusText, headers) + val body = bodyFromPekko( + r.response, + responseMetadata, + wsFlow.map(Right(_)).getOrElse(Left(decodePekkoResponse(hr, r.autoDecompressionDisabled))) + ) + + body.map(client3.Response(_, code, statusText, headers, Nil, r.onlyMetadata)) + } + + // http://doc.akka.io/docs/akka-http/10.0.7/scala/http/common/de-coding.html + private def decodePekkoResponse(response: HttpResponse, disableAutoDecompression: Boolean): HttpResponse = + if (!response.status.allowsEntity() || disableAutoDecompression) response + else customEncodingHandler.orElse(EncodingHandler(standardEncoding)).apply(response -> response.encoding) + + private def standardEncoding: (HttpResponse, HttpEncoding) => HttpResponse = { + case (body, HttpEncodings.gzip) => Coders.Gzip.decodeMessage(body) + case (body, HttpEncodings.deflate) => Coders.Deflate.decodeMessage(body) + case (body, HttpEncodings.identity) => Coders.NoCoding.decodeMessage(body) + case (_, ce) => throw new UnsupportedEncodingException(s"Unsupported encoding: $ce") + } + + private def adjustExceptions[T](request: Request[_, _])(t: => Future[T]): Future[T] = + SttpClientException.adjustExceptions(responseMonad)(t)(FromPekko.exception(request, _)) + + override def close(): Future[Unit] = + if (terminateActorSystemOnClose) { + CoordinatedShutdown(as).addTask( + CoordinatedShutdown.PhaseServiceRequestsDone, + "shut down all connection pools" + )(() => Http(as).shutdownAllConnectionPools().map(_ => Done)) + actorSystem.terminate().map(_ => ()) + } else Future.successful(()) +} + +object PekkoHttpBackend { + type EncodingHandler = PartialFunction[(HttpResponse, HttpEncoding), HttpResponse] + object EncodingHandler { + def apply(f: (HttpResponse, HttpEncoding) => HttpResponse): EncodingHandler = { case (body, encoding) => + f(body, encoding) + } + } + + private def make( + actorSystem: ActorSystem, + ec: ExecutionContext, + terminateActorSystemOnClose: Boolean, + options: SttpBackendOptions, + customConnectionPoolSettings: Option[ConnectionPoolSettings], + http: PekkoHttpClient, + customizeRequest: HttpRequest => HttpRequest, + customizeWebsocketRequest: WebSocketRequest => WebSocketRequest = identity, + customizeResponse: (HttpRequest, HttpResponse) => HttpResponse = (_, r) => r, + customEncodingHandler: EncodingHandler = PartialFunction.empty + ): SttpBackend[Future, PekkoStreams with WebSockets] = + new FollowRedirectsBackend( + new PekkoHttpBackend( + actorSystem, + ec, + terminateActorSystemOnClose, + options, + customConnectionPoolSettings, + http, + customizeRequest, + customizeWebsocketRequest, + customizeResponse, + customEncodingHandler + ) + ) + + /** @param ec + * The execution context for running non-network related operations, e.g. mapping responses. Defaults to the + * execution context backing the given `actorSystem`. + */ + def apply( + options: SttpBackendOptions = SttpBackendOptions.Default, + customHttpsContext: Option[HttpsConnectionContext] = None, + customConnectionPoolSettings: Option[ConnectionPoolSettings] = None, + customLog: Option[LoggingAdapter] = None, + customizeRequest: HttpRequest => HttpRequest = identity, + customizeWebsocketRequest: WebSocketRequest => WebSocketRequest = identity, + customizeResponse: (HttpRequest, HttpResponse) => HttpResponse = (_, r) => r, + customEncodingHandler: EncodingHandler = PartialFunction.empty + )(implicit + ec: Option[ExecutionContext] = None + ): SttpBackend[Future, PekkoStreams with WebSockets] = { + val actorSystem = ActorSystem("sttp") + + make( + actorSystem, + ec.getOrElse(actorSystem.dispatcher), + terminateActorSystemOnClose = true, + options, + customConnectionPoolSettings, + PekkoHttpClient.default(actorSystem, customHttpsContext, customLog), + customizeRequest, + customizeWebsocketRequest, + customizeResponse, + customEncodingHandler + ) + } + + /** @param actorSystem + * The actor system which will be used for the http-client actors. + * @param ec + * The execution context for running non-network related operations, e.g. mapping responses. Defaults to the + * execution context backing the given `actorSystem`. + */ + def usingActorSystem( + actorSystem: ActorSystem, + options: SttpBackendOptions = SttpBackendOptions.Default, + customHttpsContext: Option[HttpsConnectionContext] = None, + customConnectionPoolSettings: Option[ConnectionPoolSettings] = None, + customLog: Option[LoggingAdapter] = None, + customizeRequest: HttpRequest => HttpRequest = identity, + customizeWebsocketRequest: WebSocketRequest => WebSocketRequest = identity, + customizeResponse: (HttpRequest, HttpResponse) => HttpResponse = (_, r) => r, + customEncodingHandler: EncodingHandler = PartialFunction.empty + )(implicit + ec: Option[ExecutionContext] = None + ): SttpBackend[Future, PekkoStreams with WebSockets] = + usingClient( + actorSystem, + options, + customConnectionPoolSettings, + PekkoHttpClient.default(actorSystem, customHttpsContext, customLog), + customizeRequest, + customizeWebsocketRequest, + customizeResponse, + customEncodingHandler + ) + + /** @param actorSystem + * The actor system which will be used for the http-client actors. + * @param ec + * The execution context for running non-network related operations, e.g. mapping responses. Defaults to the + * execution context backing the given `actorSystem`. + */ + def usingClient( + actorSystem: ActorSystem, + options: SttpBackendOptions = SttpBackendOptions.Default, + customConnectionPoolSettings: Option[ConnectionPoolSettings] = None, + http: PekkoHttpClient, + customizeRequest: HttpRequest => HttpRequest = identity, + customizeWebsocketRequest: WebSocketRequest => WebSocketRequest = identity, + customizeResponse: (HttpRequest, HttpResponse) => HttpResponse = (_, r) => r, + customEncodingHandler: EncodingHandler = PartialFunction.empty + )(implicit + ec: Option[ExecutionContext] = None + ): SttpBackend[Future, PekkoStreams with WebSockets] = + make( + actorSystem, + ec.getOrElse(actorSystem.dispatcher), + terminateActorSystemOnClose = false, + options, + customConnectionPoolSettings, + http, + customizeRequest, + customizeWebsocketRequest, + customizeResponse, + customEncodingHandler + ) + + /** Create a stub backend for testing, which uses the [[Future]] response wrapper, and doesn't support streaming. + * + * See [[SttpBackendStub]] for details on how to configure stub responses. + */ + def stub(implicit + ec: ExecutionContext = ExecutionContext.global + ): SttpBackendStub[Future, PekkoStreams with WebSockets] = + SttpBackendStub(new FutureMonad()) +} diff --git a/pekko-http-backend/src/main/scala/sttp/client3/pekkohttp/PekkoHttpClient.scala b/pekko-http-backend/src/main/scala/sttp/client3/pekkohttp/PekkoHttpClient.scala new file mode 100644 index 0000000000..03e8e65784 --- /dev/null +++ b/pekko-http-backend/src/main/scala/sttp/client3/pekkohttp/PekkoHttpClient.scala @@ -0,0 +1,88 @@ +package sttp.client3.pekkohttp + +import org.apache.pekko +import pekko.actor.ActorSystem +import pekko.event.LoggingAdapter +import pekko.http.scaladsl.model.ws.{Message, WebSocketRequest, WebSocketUpgradeResponse} +import pekko.http.scaladsl.model.{HttpRequest, HttpResponse} +import pekko.http.scaladsl.server.{ExceptionHandler, RejectionHandler, Route, RoutingLog} +import pekko.http.scaladsl.settings.{ClientConnectionSettings, ConnectionPoolSettings, ParserSettings, RoutingSettings} +import pekko.http.scaladsl.{Http, HttpsConnectionContext} +import pekko.stream.Materializer +import pekko.stream.scaladsl.Flow + +import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future} + +trait PekkoHttpClient { + def singleRequest( + request: HttpRequest, + settings: ConnectionPoolSettings + ): Future[HttpResponse] + + def singleWebsocketRequest[WS_RESULT]( + request: WebSocketRequest, + clientFlow: Flow[Message, Message, WS_RESULT], + settings: ClientConnectionSettings + )(implicit ec: ExecutionContext, mat: Materializer): Future[(WebSocketUpgradeResponse, WS_RESULT)] +} + +object PekkoHttpClient { + def default( + system: ActorSystem, + connectionContext: Option[HttpsConnectionContext], + customLog: Option[LoggingAdapter] + ): PekkoHttpClient = + new PekkoHttpClient { + private val http = Http()(system) + + override def singleRequest( + request: HttpRequest, + settings: ConnectionPoolSettings + ): Future[HttpResponse] = + http.singleRequest( + request, + connectionContext.getOrElse(http.defaultClientHttpsContext), + settings, + customLog.getOrElse(system.log) + ) + + override def singleWebsocketRequest[WS_RESULT]( + request: WebSocketRequest, + clientFlow: Flow[Message, Message, WS_RESULT], + settings: ClientConnectionSettings + )(implicit ec: ExecutionContext, mat: Materializer): Future[(WebSocketUpgradeResponse, WS_RESULT)] = { + val (wsResponse, wsResult) = http.singleWebSocketRequest( + request, + clientFlow, + connectionContext.getOrElse(http.defaultClientHttpsContext), + None, + settings, + customLog.getOrElse(system.log) + ) + wsResponse.map((_, wsResult)) + } + } + + def stubFromAsyncHandler(run: HttpRequest => Future[HttpResponse]): PekkoHttpClient = + new PekkoHttpClient { + def singleRequest(request: HttpRequest, settings: ConnectionPoolSettings): Future[HttpResponse] = + run(request) + + override def singleWebsocketRequest[WS_RESULT]( + request: WebSocketRequest, + clientFlow: Flow[Message, Message, WS_RESULT], + settings: ClientConnectionSettings + )(implicit ec: ExecutionContext, mat: Materializer): Future[(WebSocketUpgradeResponse, WS_RESULT)] = + Future.failed(new RuntimeException("Websockets are not supported")) + } + + def stubFromRoute(route: Route)(implicit + routingSettings: RoutingSettings, + parserSettings: ParserSettings, + materializer: Materializer, + routingLog: RoutingLog, + executionContext: ExecutionContextExecutor = null, + rejectionHandler: RejectionHandler = RejectionHandler.default, + exceptionHandler: ExceptionHandler = null + ): PekkoHttpClient = stubFromAsyncHandler(Route.asyncHandler(route)) +} diff --git a/pekko-http-backend/src/main/scala/sttp/client3/pekkohttp/PekkoHttpServerSentEvents.scala b/pekko-http-backend/src/main/scala/sttp/client3/pekkohttp/PekkoHttpServerSentEvents.scala new file mode 100644 index 0000000000..2e04c8d1b6 --- /dev/null +++ b/pekko-http-backend/src/main/scala/sttp/client3/pekkohttp/PekkoHttpServerSentEvents.scala @@ -0,0 +1,16 @@ +package sttp.client3.pekkohttp + +import org.apache.pekko +import pekko.NotUsed +import pekko.stream.scaladsl.{Flow, Framing} +import pekko.util.ByteString +import sttp.model.sse.ServerSentEvent + +object PekkoHttpServerSentEvents { + val parse: Flow[ByteString, ServerSentEvent, NotUsed] = + Framing + .delimiter(ByteString("\n\n"), maximumFrameLength = Int.MaxValue, allowTruncation = true) + .map(_.utf8String) + .map(_.split("\n").toList) + .map(ServerSentEvent.parse) +} diff --git a/pekko-http-backend/src/main/scala/sttp/client3/pekkohttp/ToPekko.scala b/pekko-http-backend/src/main/scala/sttp/client3/pekkohttp/ToPekko.scala new file mode 100644 index 0000000000..dec03fb591 --- /dev/null +++ b/pekko-http-backend/src/main/scala/sttp/client3/pekkohttp/ToPekko.scala @@ -0,0 +1,53 @@ +package sttp.client3.pekkohttp + +import org.apache.pekko +import pekko.http.scaladsl.model.{HttpHeader, HttpMethod, HttpMethods, HttpRequest} +import pekko.http.scaladsl.model.HttpHeader.ParsingResult +import sttp.client3.Request +import sttp.model.{Header, Method} + +import scala.collection.immutable.Seq +import scala.util.{Failure, Success, Try} + +private[pekkohttp] object ToPekko { + def request(r: Request[_, _]): Try[HttpRequest] = { + val ar = HttpRequest(uri = r.uri.toString, method = method(r.method)) + ToPekko.headers(r.headers).map(ar.withHeaders) + } + + def headers(headers: Seq[Header]): Try[Seq[HttpHeader]] = { + // content-type and content-length headers have to be set via the body + // entity, not as headers + val parsed = + headers + .filterNot(Util.isContentType) + .filterNot(Util.isContentLength) + .map(h => HttpHeader.parse(h.name, h.value)) + val errors = parsed.collect { case ParsingResult.Error(e) => + e + } + if (errors.isEmpty) { + val headers = parsed.collect { case ParsingResult.Ok(h, _) => + h + } + + Success(headers.toList) + } else { + Failure(new RuntimeException(s"Cannot parse headers: $errors")) + } + } + + private def method(m: Method): HttpMethod = + m match { + case Method.GET => HttpMethods.GET + case Method.HEAD => HttpMethods.HEAD + case Method.POST => HttpMethods.POST + case Method.PUT => HttpMethods.PUT + case Method.DELETE => HttpMethods.DELETE + case Method.OPTIONS => HttpMethods.OPTIONS + case Method.PATCH => HttpMethods.PATCH + case Method.CONNECT => HttpMethods.CONNECT + case Method.TRACE => HttpMethods.TRACE + case _ => HttpMethod.custom(m.method) + } +} diff --git a/pekko-http-backend/src/main/scala/sttp/client3/pekkohttp/Util.scala b/pekko-http-backend/src/main/scala/sttp/client3/pekkohttp/Util.scala new file mode 100644 index 0000000000..23889fe03e --- /dev/null +++ b/pekko-http-backend/src/main/scala/sttp/client3/pekkohttp/Util.scala @@ -0,0 +1,45 @@ +package sttp.client3.pekkohttp + +import org.apache.pekko +import pekko.http.scaladsl.model.ContentType +import pekko.http.scaladsl.model.ContentTypes.`application/octet-stream` +import pekko.http.scaladsl.model.headers.{`Content-Length`, `Content-Type`} +import sttp.client3.Request +import sttp.model.Header + +import scala.collection.immutable.Seq +import scala.util.{Failure, Success, Try} + +private[pekkohttp] object Util { + def traverseTry[T](l: Seq[Try[T]]): Try[Seq[T]] = { + // https://stackoverflow.com/questions/15495678/flatten-scala-try + val (ss: Seq[Success[T]] @unchecked, fs: Seq[Failure[T]] @unchecked) = + l.partition(_.isSuccess) + + if (fs.isEmpty) Success(ss.map(_.get)) + else Failure[Seq[T]](fs.head.exception) + } + + def parseContentTypeOrOctetStream(r: Request[_, _]): Try[ContentType] = + parseContentTypeOrOctetStream( + r.headers + .find(isContentType) + .map(_.value) + ) + + def parseContentTypeOrOctetStream(ctHeader: Option[String]): Try[ContentType] = + ctHeader + .map(parseContentType) + .getOrElse(Success(`application/octet-stream`)) + + def parseContentType(ctHeader: String): Try[ContentType] = + ContentType + .parse(ctHeader) + .fold(errors => Failure(new RuntimeException(s"Cannot parse content type: $errors")), Success(_)) + + def isContentType(header: Header): Boolean = + header.name.toLowerCase.contains(`Content-Type`.lowercaseName) + + def isContentLength(header: Header): Boolean = + header.name.toLowerCase.contains(`Content-Length`.lowercaseName) +} diff --git a/pekko-http-backend/src/test/scala/sttp/client3/pekkohttp/BackendStubPekkoTests.scala b/pekko-http-backend/src/test/scala/sttp/client3/pekkohttp/BackendStubPekkoTests.scala new file mode 100644 index 0000000000..6dff7afa54 --- /dev/null +++ b/pekko-http-backend/src/test/scala/sttp/client3/pekkohttp/BackendStubPekkoTests.scala @@ -0,0 +1,38 @@ +package sttp.client3.pekkohttp + +import org.apache.pekko +import pekko.actor.ActorSystem +import org.scalatest.BeforeAndAfterAll +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import sttp.client3._ + +import scala.concurrent.Await +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration._ + +class BackendStubPekkoTests extends AnyFlatSpec with Matchers with ScalaFutures with BeforeAndAfterAll { + + implicit val system: ActorSystem = ActorSystem() + + override protected def afterAll(): Unit = { + Await.result(system.terminate().map(_ => ()), 5.seconds) + } + + "backend stub" should "cycle through responses using a single sent request" in { + // given + val backend = PekkoHttpBackend.stub + .whenRequestMatches(_ => true) + .thenRespondCyclic("a", "b", "c") + + // when + def r = basicRequest.get(uri"http://example.org/a/b/c").send(backend).futureValue + + // then + r.body shouldBe Right("a") + r.body shouldBe Right("b") + r.body shouldBe Right("c") + r.body shouldBe Right("a") + } +} diff --git a/pekko-http-backend/src/test/scala/sttp/client3/pekkohttp/PekkoHttpClientHttpTest.scala b/pekko-http-backend/src/test/scala/sttp/client3/pekkohttp/PekkoHttpClientHttpTest.scala new file mode 100644 index 0000000000..9c2926f78e --- /dev/null +++ b/pekko-http-backend/src/test/scala/sttp/client3/pekkohttp/PekkoHttpClientHttpTest.scala @@ -0,0 +1,14 @@ +package sttp.client3.pekkohttp + +import sttp.client3.SttpBackend +import sttp.client3.testing.{ConvertToFuture, HttpTest} + +import scala.concurrent.Future + +class PekkoHttpClientHttpTest extends HttpTest[Future] { + override val backend: SttpBackend[Future, Any] = PekkoHttpBackend() + override implicit val convertToFuture: ConvertToFuture[Future] = ConvertToFuture.future + + override def supportsCancellation: Boolean = false + override def timeoutToNone[T](t: Future[T], timeoutMillis: Int): Future[Option[T]] = t.map(Some(_)) +} diff --git a/pekko-http-backend/src/test/scala/sttp/client3/pekkohttp/PekkoHttpRouteBackendTest.scala b/pekko-http-backend/src/test/scala/sttp/client3/pekkohttp/PekkoHttpRouteBackendTest.scala new file mode 100644 index 0000000000..64091401b8 --- /dev/null +++ b/pekko-http-backend/src/test/scala/sttp/client3/pekkohttp/PekkoHttpRouteBackendTest.scala @@ -0,0 +1,56 @@ +package sttp.client3.pekkohttp + +import org.apache.pekko +import pekko.actor.ActorSystem +import pekko.http.scaladsl.server.Route +import org.scalatest.BeforeAndAfterAll +import sttp.model.StatusCode + +import scala.concurrent.duration._ +import scala.concurrent.{Await, Future} +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AsyncWordSpec +import sttp.client3.SttpBackend + +class PekkoHttpRouteBackendTest extends AsyncWordSpec with Matchers with BeforeAndAfterAll { + + implicit val system: ActorSystem = ActorSystem() + + override protected def afterAll(): Unit = { + Await.result(system.terminate(), 5.seconds) + } + + val backend: SttpBackend[Future, Any] = + PekkoHttpBackend.usingClient(system, http = PekkoHttpClient.stubFromRoute(Routes.route)) + + import sttp.client3._ + + "matched route" should { + + "respond" in { + basicRequest.get(uri"http://localhost/hello").send(backend).map { response => + response.code shouldBe StatusCode.Ok + response.body.right.get shouldBe "Hello, world!" + } + } + } + + "unmatched route" should { + "respond with 404" in { + basicRequest.get(uri"http://localhost/not-matching").send(backend).map { response => + response.code shouldBe StatusCode.NotFound + response.body.left.get shouldBe "The requested resource could not be found." + } + } + } + +} + +object Routes { + import pekko.http.scaladsl.server.Directives._ + + val route: Route = + pathPrefix("hello") { + complete("Hello, world!") + } +} diff --git a/pekko-http-backend/src/test/scala/sttp/client3/pekkohttp/PekkoHttpStreamingTest.scala b/pekko-http-backend/src/test/scala/sttp/client3/pekkohttp/PekkoHttpStreamingTest.scala new file mode 100644 index 0000000000..aed9856c8d --- /dev/null +++ b/pekko-http-backend/src/test/scala/sttp/client3/pekkohttp/PekkoHttpStreamingTest.scala @@ -0,0 +1,36 @@ +package sttp.client3.pekkohttp + +import org.apache.pekko +import pekko.actor.ActorSystem +import pekko.stream.scaladsl.Source +import pekko.util.ByteString +import sttp.capabilities.pekko.PekkoStreams +import sttp.model.sse.ServerSentEvent +import sttp.client3.testing.ConvertToFuture +import sttp.client3.testing.streaming.StreamingTest + +import scala.concurrent.Future +import sttp.client3.SttpBackend + +class PekkoHttpStreamingTest extends StreamingTest[Future, PekkoStreams] { + override val streams: PekkoStreams = PekkoStreams + + private implicit val actorSystem: ActorSystem = ActorSystem("sttp-test") + + override val backend: SttpBackend[Future, PekkoStreams] = + PekkoHttpBackend.usingActorSystem(actorSystem) + + override implicit val convertToFuture: ConvertToFuture[Future] = + ConvertToFuture.future + + override def bodyProducer(chunks: Iterable[Array[Byte]]): Source[ByteString, Any] = + Source.apply(chunks.toList.map(ByteString(_))) + + override def bodyConsumer(stream: Source[ByteString, Any]): Future[String] = + stream.map(_.utf8String).runReduce(_ + _) + + override def sseConsumer(stream: Source[ByteString, Any]): Future[List[ServerSentEvent]] = + stream.via(PekkoHttpServerSentEvents.parse).runFold(List.empty[ServerSentEvent]) { case (list, event) => + list :+ event + } +} diff --git a/pekko-http-backend/src/test/scala/sttp/client3/pekkohttp/PekkoHttpWebSocketTest.scala b/pekko-http-backend/src/test/scala/sttp/client3/pekkohttp/PekkoHttpWebSocketTest.scala new file mode 100644 index 0000000000..f6b7d5a741 --- /dev/null +++ b/pekko-http-backend/src/test/scala/sttp/client3/pekkohttp/PekkoHttpWebSocketTest.scala @@ -0,0 +1,41 @@ +package sttp.client3.pekkohttp + +import org.apache.pekko +import pekko.stream.scaladsl.{Flow, Source} +import sttp.capabilities.pekko.PekkoStreams +import sttp.client3._ +import sttp.client3.testing.ConvertToFuture +import sttp.client3.testing.websocket.{WebSocketConcurrentTest, WebSocketStreamingTest, WebSocketTest} +import sttp.monad.{FutureMonad, MonadError} +import sttp.ws.WebSocketFrame + +import scala.concurrent.{ExecutionContext, Future} +import sttp.capabilities.WebSockets + +class PekkoHttpWebSocketTest + extends WebSocketTest[Future] + with WebSocketStreamingTest[Future, PekkoStreams] + with WebSocketConcurrentTest[Future] { + override val streams: PekkoStreams = PekkoStreams + implicit val ec: ExecutionContext = scala.concurrent.ExecutionContext.global + + override val backend: SttpBackend[Future, PekkoStreams with WebSockets] = PekkoHttpBackend() + override implicit val convertToFuture: ConvertToFuture[Future] = ConvertToFuture.future + override implicit val monad: MonadError[Future] = new FutureMonad + + override def functionToPipe( + f: WebSocketFrame.Data[_] => Option[WebSocketFrame] + ): Flow[WebSocketFrame.Data[_], WebSocketFrame, Any] = + Flow.fromFunction(f).mapConcat(_.toList): Flow[WebSocketFrame.Data[_], WebSocketFrame, Any] + + override def prepend( + item: WebSocketFrame.Text + )(to: Flow[WebSocketFrame.Data[_], WebSocketFrame, Any]): Flow[WebSocketFrame.Data[_], WebSocketFrame, Any] = + to.prepend(Source(List(item))) + + override def fromTextPipe(function: String => WebSocketFrame): Flow[WebSocketFrame.Data[_], WebSocketFrame, Any] = { + Flow[WebSocketFrame.Data[_]].collect { case tf: WebSocketFrame.Text => function(tf.payload) } + } + + override def concurrently[T](fs: List[() => Future[T]]): Future[List[T]] = Future.sequence(fs.map(_())) +} diff --git a/project/plugins.sbt b/project/plugins.sbt index eda7ceb76c..6022984d57 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -4,7 +4,7 @@ addSbtPlugin("org.scala-native" % "sbt-scala-native" % "0.4.14") addSbtPlugin("io.spray" % "sbt-revolver" % "0.9.1") addSbtPlugin("com.eed3si9n" % "sbt-projectmatrix" % "0.9.0") -val sbtSoftwareMillVersion = "2.0.12" +val sbtSoftwareMillVersion = "2.0.13" addSbtPlugin("com.softwaremill.sbt-softwaremill" % "sbt-softwaremill-common" % sbtSoftwareMillVersion) addSbtPlugin("com.softwaremill.sbt-softwaremill" % "sbt-softwaremill-publish" % sbtSoftwareMillVersion) addSbtPlugin("com.softwaremill.sbt-softwaremill" % "sbt-softwaremill-browser-test-js" % sbtSoftwareMillVersion)