diff --git a/build.sbt b/build.sbt index 03c7127ea9..114f2ea079 100644 --- a/build.sbt +++ b/build.sbt @@ -987,6 +987,23 @@ lazy val grpcExamples: ProjectMatrix = (projectMatrix in file("grpc/examples")) akkaGrpcServer ) +lazy val pekkoGrpcExamples: ProjectMatrix = (projectMatrix in file("grpc/pekko-examples")) + .settings(commonSettings) + .settings( + name := "tapir-pekko-grpc-examples", + libraryDependencies ++= Seq( + "org.apache.pekko" %% "pekko-discovery" % "1.0.1" + ), + fork := true + ) + .enablePlugins(PekkoGrpcPlugin) + .jvmPlatform(scalaVersions = scala2And3Versions) + .dependsOn( + protobuf, + pbDirectProtobuf, + pekkoGrpcServer + ) + // metrics lazy val prometheusMetrics: ProjectMatrix = (projectMatrix in file("metrics/prometheus-metrics")) @@ -1216,6 +1233,17 @@ lazy val akkaGrpcServer: ProjectMatrix = (projectMatrix in file("server/akka-grp .jvmPlatform(scalaVersions = scala2Versions) .dependsOn(serverCore, akkaHttpServer) +lazy val pekkoGrpcServer: ProjectMatrix = (projectMatrix in file("server/pekko-grpc-server")) + .settings(commonJvmSettings) + .settings( + name := "tapir-pekko-grpc-server", + libraryDependencies ++= Seq( + "org.apache.pekko" %% "pekko-grpc-runtime" % "1.0.0" + ) + ) + .jvmPlatform(scalaVersions = scala2And3Versions) + .dependsOn(serverCore, pekkoHttpServer) + lazy val armeriaServer: ProjectMatrix = (projectMatrix in file("server/armeria-server")) .settings(commonJvmSettings) .settings( diff --git a/doc/grpc.md b/doc/grpc.md index 6006e72087..740a061766 100644 --- a/doc/grpc.md +++ b/doc/grpc.md @@ -13,6 +13,8 @@ from these endpoints' definitions. necessary to add this module for generating proto files with the `protobuf` module. * `akkaGrpcServer` - a module that provides `AkkaGrpcServerInterpreter` implementation. It should be used to serve tapir grpc endpoints. +* `pekkoGrpcServer` - a module that provides `PekkoGrpcServerInterpreter` implementation. It can be used as an + alternative to `akkaGrpcServer`. * `grpcExamples` - contains example use cases ## Defining endpoints diff --git a/grpc/pekko-examples/src/main/protobuf/simple_books_example.proto b/grpc/pekko-examples/src/main/protobuf/simple_books_example.proto new file mode 100644 index 0000000000..5e0e7a99c3 --- /dev/null +++ b/grpc/pekko-examples/src/main/protobuf/simple_books_example.proto @@ -0,0 +1,19 @@ +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "sttp.tapir.grpc.examples.grpc_simple_books_example.gen"; + +service Library { + rpc AddBook (AddBookMsg) returns (SimpleBook) {} +} + +message SimpleBook { + int32 id = 1; + string title = 2; + string description = 3; +} + +message AddBookMsg { + string title = 1; + string description = 2; +} \ No newline at end of file diff --git a/grpc/pekko-examples/src/main/scala/sttp/tapir/grpc/examples/PekkoGrpcSimpleBooksExample.scala b/grpc/pekko-examples/src/main/scala/sttp/tapir/grpc/examples/PekkoGrpcSimpleBooksExample.scala new file mode 100644 index 0000000000..3021adf857 --- /dev/null +++ b/grpc/pekko-examples/src/main/scala/sttp/tapir/grpc/examples/PekkoGrpcSimpleBooksExample.scala @@ -0,0 +1,98 @@ +package sttp.tapir.grpc.examples + +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.grpc.GrpcClientSettings +import org.apache.pekko.http.scaladsl.Http +import cats.implicits._ +import com.typesafe.config.ConfigFactory +import com.typesafe.scalalogging.StrictLogging +import sttp.tapir._ +import sttp.tapir.grpc.protobuf._ +import sttp.tapir.grpc.protobuf.pbdirect._ +import sttp.tapir.server.ServerEndpoint +import sttp.tapir.server.pekkogrpc.PekkoGrpcServerInterpreter +import sttp.tapir.generic.auto._ +import sttp.tapir.grpc.examples.grpc_simple_books_example.gen.{ + Library => GenLibrary, + LibraryClient => GenLibraryClient, + AddBookMsg => GenAddBookMsg +} + +import java.util.concurrent.atomic.AtomicLong +import scala.concurrent.{Await, ExecutionContext, Future} + +case class SimpleBook(id: Long, title: String, description: String) +case class AddBookMsg(title: String, description: String) + +/** Descriptions of endpoints used in the example. + */ +object Endpoints { + val addBook = endpoint + .in("Library" / "AddBook") + .in(grpcBody[AddBookMsg]) + .out(grpcBody[SimpleBook]) + + val endpoints = List(addBook) +} + +object SimpleBooksExampleServer extends StrictLogging { + + import Endpoints._ + + private val counter = new AtomicLong(0) + + def booksServerEndpoints: List[ServerEndpoint[Any, Future]] = + List( + addBook.serverLogic { book => + logger.info(s"Adding a new book [$book]") + Future.successful(SimpleBook(counter.getAndIncrement(), book.title, book.description).asRight[Unit]) + } + ) + + def main(args: Array[String]): Unit = { + val conf = ConfigFactory + .parseString("pekko.http.server.preview.enable-http2 = on") + .withFallback(ConfigFactory.defaultApplication()) + val system = ActorSystem("HelloWorld", conf) + + new ExampleGrpcServer(system).run() + } +} + +class ExampleGrpcServer(system: ActorSystem) extends StrictLogging { + def run(): Future[Http.ServerBinding] = { + // Pekko boot up code + implicit val sys: ActorSystem = system + implicit val ec: ExecutionContext = sys.dispatcher + + val route = PekkoGrpcServerInterpreter().toRoute(SimpleBooksExampleServer.booksServerEndpoints) + + val binding = Http().newServerAt("127.0.0.1", 8080).bind(route) + + // report successful binding + binding.foreach { binding => logger.info(s"gRPC server bound to: ${binding.localAddress}") } + + binding + } +} + +object SimpleBookExampleProtoGenerator extends App { + ProtoSchemaGenerator.renderToFile( + path = "grpc/examples/src/main/protobuf/simple_books_example.proto", + packageName = "sttp.tapir.grpc.examples.grpc_simple_books_example.gen", + endpoints = Endpoints.endpoints + ) +} + +object SimpleBookExampleClient extends App with StrictLogging { + + import scala.concurrent.duration._ + + implicit val sys = ActorSystem("HelloWorldClient") + implicit val ec = sys.dispatcher + + val client = GenLibraryClient(GrpcClientSettings.connectToServiceAt("localhost", 8080).withTls(false)) + val result = Await.result(client.addBook(GenAddBookMsg("TEST_BOOK", "TEST")), 10.second) + + logger.info(s"Result: [$result]") +} diff --git a/project/plugins.sbt b/project/plugins.sbt index cb648bf7f5..4a58419e9b 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -18,5 +18,6 @@ addSbtPlugin("io.gatling" % "gatling-sbt" % "4.5.0") addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.11.0") addSbtPlugin("org.scala-native" % "sbt-scala-native" % "0.4.15") addSbtPlugin("com.lightbend.akka.grpc" % "sbt-akka-grpc" % "2.1.4") +addSbtPlugin("org.apache.pekko" % "pekko-grpc-sbt-plugin" % "1.0.0") addDependencyTreePlugin diff --git a/server/pekko-grpc-server/src/main/scala/sttp/tapir/server/pekkogrpc/PekkoGrpcRequestBody.scala b/server/pekko-grpc-server/src/main/scala/sttp/tapir/server/pekkogrpc/PekkoGrpcRequestBody.scala new file mode 100644 index 0000000000..3eccd49401 --- /dev/null +++ b/server/pekko-grpc-server/src/main/scala/sttp/tapir/server/pekkogrpc/PekkoGrpcRequestBody.scala @@ -0,0 +1,52 @@ +package sttp.tapir.server.pekkogrpc + +import org.apache.pekko.grpc.internal.{GrpcProtocolNative, Identity, SingleParameterSink} +import org.apache.pekko.http.scaladsl.model.HttpEntity +import org.apache.pekko.http.scaladsl.server.RequestContext +import org.apache.pekko.stream.Materializer +import org.apache.pekko.util.ByteString +import sttp.capabilities.pekko.PekkoStreams +import sttp.tapir.{InputStreamRange, RawBodyType} +import sttp.tapir.model.ServerRequest +import sttp.tapir.server.pekkohttp.PekkoHttpServerOptions +import sttp.tapir.server.interpreter.{RawValue, RequestBody} + +import java.io.ByteArrayInputStream +import scala.concurrent.{ExecutionContext, Future} +import scala.util.Try + +private[pekkogrpc] class PekkoGrpcRequestBody(serverOptions: PekkoHttpServerOptions)(implicit + mat: Materializer, + ec: ExecutionContext +) extends RequestBody[Future, PekkoStreams] { + private val grpcProtocol = GrpcProtocolNative.newReader(Identity) + + override val streams: PekkoStreams = PekkoStreams + override def toRaw[R](request: ServerRequest, bodyType: RawBodyType[R]): Future[RawValue[R]] = + toRawFromEntity(request, akkaRequestEntity(request), bodyType) + + override def toStream(request: ServerRequest): streams.BinaryStream = ??? + + private def akkaRequestEntity(request: ServerRequest) = request.underlying.asInstanceOf[RequestContext].request.entity + + private def toRawFromEntity[R](request: ServerRequest, body: HttpEntity, bodyType: RawBodyType[R]): Future[RawValue[R]] = { + // Copy-paste from akka.grpc.scaladsl.GrpcMarshalling#unmarshal + body match { + case HttpEntity.Strict(_, data) => Future.fromTry(Try(toExpectedBodyType(data, bodyType))) + case _ => body.dataBytes.via(grpcProtocol.dataFrameDecoder).map(toExpectedBodyType(_, bodyType)).runWith(SingleParameterSink()) + } + } + + private def toExpectedBodyType[R](byteString: ByteString, bodyType: RawBodyType[R]): RawValue[R] = { + bodyType match { + case RawBodyType.ByteArrayBody => RawValue(byteString.toArray) + case RawBodyType.ByteBufferBody => RawValue(byteString.asByteBuffer) + case RawBodyType.InputStreamBody => RawValue(new ByteArrayInputStream(byteString.toArray)) + case RawBodyType.InputStreamRangeBody => RawValue(InputStreamRange(() => new ByteArrayInputStream(byteString.toArray))) + case RawBodyType.FileBody => ??? + case m: RawBodyType.MultipartBody => ??? + case _ => ??? + } + } + +} diff --git a/server/pekko-grpc-server/src/main/scala/sttp/tapir/server/pekkogrpc/PekkoGrpcServerInterpreter.scala b/server/pekko-grpc-server/src/main/scala/sttp/tapir/server/pekkogrpc/PekkoGrpcServerInterpreter.scala new file mode 100644 index 0000000000..075baf4ff9 --- /dev/null +++ b/server/pekko-grpc-server/src/main/scala/sttp/tapir/server/pekkogrpc/PekkoGrpcServerInterpreter.scala @@ -0,0 +1,21 @@ +package sttp.tapir.server.pekkogrpc + +import org.apache.pekko.http.scaladsl.server.Route +import sttp.capabilities.WebSockets +import sttp.capabilities.pekko.PekkoStreams +import sttp.tapir.server.ServerEndpoint +import sttp.tapir.server.pekkohttp.PekkoHttpServerInterpreter + +import scala.concurrent.{ExecutionContext, Future} + +trait PekkoGrpcServerInterpreter extends PekkoHttpServerInterpreter { + override def toRoute(ses: List[ServerEndpoint[PekkoStreams with WebSockets, Future]]): Route = + toRoute(new PekkoGrpcRequestBody(pekkoHttpServerOptions)(_, _), new PekkoGrpcToResponseBody()(_, _))(ses) + +} + +object PekkoGrpcServerInterpreter { + def apply()(implicit _ec: ExecutionContext): PekkoGrpcServerInterpreter = new PekkoGrpcServerInterpreter { + override implicit def executionContext: ExecutionContext = _ec + } +} diff --git a/server/pekko-grpc-server/src/main/scala/sttp/tapir/server/pekkogrpc/PekkoGrpcToResponseBody.scala b/server/pekko-grpc-server/src/main/scala/sttp/tapir/server/pekkogrpc/PekkoGrpcToResponseBody.scala new file mode 100644 index 0000000000..6eb0cd13ff --- /dev/null +++ b/server/pekko-grpc-server/src/main/scala/sttp/tapir/server/pekkogrpc/PekkoGrpcToResponseBody.scala @@ -0,0 +1,88 @@ +package sttp.tapir.server.pekkogrpc + +import org.apache.pekko.grpc.internal.AbstractGrpcProtocol +import org.apache.pekko.http.scaladsl.model._ +import org.apache.pekko.stream.Materializer +import org.apache.pekko.util.ByteString +import sttp.capabilities.pekko.PekkoStreams +import sttp.model.HasHeaders +import sttp.tapir.internal.charset +import sttp.tapir.server.pekkohttp.PekkoResponseBody +import sttp.tapir.server.interpreter.ToResponseBody +import sttp.tapir.{CodecFormat, RawBodyType, WebSocketBodyOutput} + +import java.nio.charset.{Charset, StandardCharsets} +import scala.concurrent.ExecutionContext + +private[pekkogrpc] class PekkoGrpcToResponseBody(implicit m: Materializer, ec: ExecutionContext) + extends ToResponseBody[PekkoResponseBody, PekkoStreams] { + override val streams: PekkoStreams = PekkoStreams + + override def fromRawValue[R](v: R, headers: HasHeaders, format: CodecFormat, bodyType: RawBodyType[R]): PekkoResponseBody = + Right( + overrideContentTypeIfDefined( + rawValueToResponseEntity(bodyType, formatToContentType(format, charset(bodyType)), headers.contentLength, v), + headers + ) + ) + + override def fromStreamValue( + v: streams.BinaryStream, + headers: HasHeaders, + format: CodecFormat, + charset: Option[Charset] + ): PekkoResponseBody = ??? + + override def fromWebSocketPipe[REQ, RESP]( + pipe: streams.Pipe[REQ, RESP], + o: WebSocketBodyOutput[streams.Pipe[REQ, RESP], REQ, RESP, _, PekkoStreams] + ): PekkoResponseBody = ??? + + private def rawValueToResponseEntity[CF <: CodecFormat, R]( + bodyType: RawBodyType[R], + ct: ContentType, + contentLength: Option[Long], + r: R + ): ResponseEntity = { + bodyType match { + case RawBodyType.StringBody(charset) => ??? + case RawBodyType.ByteArrayBody => HttpEntity(ct, encodeDataToFrameBytes(ByteString(r))) + case RawBodyType.ByteBufferBody => HttpEntity(ct, encodeDataToFrameBytes(ByteString(r))) + case RawBodyType.InputStreamBody => ??? + case RawBodyType.InputStreamRangeBody => ??? + case RawBodyType.FileBody => ??? + case m: RawBodyType.MultipartBody => ??? + } + } + + private def formatToContentType(format: CodecFormat, charset: Option[Charset]): ContentType = { + format match { + case CodecFormat.Json() => ContentTypes.`application/json` + case CodecFormat.TextPlain() => MediaTypes.`text/plain`.withCharset(charsetToHttpCharset(charset.getOrElse(StandardCharsets.UTF_8))) + case CodecFormat.TextHtml() => MediaTypes.`text/html`.withCharset(charsetToHttpCharset(charset.getOrElse(StandardCharsets.UTF_8))) + case CodecFormat.OctetStream() => MediaTypes.`application/octet-stream` + case CodecFormat.Zip() => MediaTypes.`application/zip` + case CodecFormat.XWwwFormUrlencoded() => MediaTypes.`application/x-www-form-urlencoded` + case CodecFormat.MultipartFormData() => MediaTypes.`multipart/form-data` + case f => + val mt = if (f.mediaType.isText) charset.fold(f.mediaType)(f.mediaType.charset(_)) else f.mediaType + parseContentType(mt.toString()) + } + } + + private def parseContentType(ct: String): ContentType = + ContentType.parse(ct).getOrElse(throw new IllegalArgumentException(s"Cannot parse content type: $ct")) + + private def charsetToHttpCharset(charset: Charset): HttpCharset = HttpCharset.custom(charset.name()) + + private def overrideContentTypeIfDefined[RE <: ResponseEntity](re: RE, headers: HasHeaders): RE = { + headers.contentType match { + case Some(ct) => re.withContentType(parseContentType(ct)).asInstanceOf[RE] + case None => re + } + } + + // TODO support for compressed body + private def encodeDataToFrameBytes(data: ByteString): ByteString = + AbstractGrpcProtocol.encodeFrameData(data, isCompressed = false, isTrailer = false) +}