Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add pekko grpc support #3124

Merged
merged 4 commits into from
Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ lazy val rawAllAggregates = core.projectRefs ++
protobuf.projectRefs ++
pbDirectProtobuf.projectRefs ++
grpcExamples.projectRefs ++
pekkoGrpcExamples.projectRefs ++
apispecDocs.projectRefs ++
openapiDocs.projectRefs ++
asyncapiDocs.projectRefs ++
Expand All @@ -200,6 +201,7 @@ lazy val rawAllAggregates = core.projectRefs ++
akkaHttpServer.projectRefs ++
akkaGrpcServer.projectRefs ++
pekkoHttpServer.projectRefs ++
pekkoGrpcServer.projectRefs ++
armeriaServer.projectRefs ++
armeriaServerCats.projectRefs ++
armeriaServerZio.projectRefs ++
Expand Down Expand Up @@ -987,6 +989,23 @@ lazy val grpcExamples: ProjectMatrix = (projectMatrix in file("grpc/examples"))
akkaGrpcServer
)

lazy val pekkoGrpcExamples: ProjectMatrix = (projectMatrix in file("grpc/pekko-examples"))
pjfanning marked this conversation as resolved.
Show resolved Hide resolved
.settings(commonSettings)
.settings(
name := "tapir-pekko-grpc-examples",
libraryDependencies ++= Seq(
"org.apache.pekko" %% "pekko-discovery" % "1.0.1"
),
fork := true
)
.enablePlugins(PekkoGrpcPlugin)
.jvmPlatform(scalaVersions = scala2Versions)
.dependsOn(
protobuf,
pbDirectProtobuf,
pekkoGrpcServer
)

// metrics

lazy val prometheusMetrics: ProjectMatrix = (projectMatrix in file("metrics/prometheus-metrics"))
Expand Down Expand Up @@ -1216,6 +1235,17 @@ lazy val akkaGrpcServer: ProjectMatrix = (projectMatrix in file("server/akka-grp
.jvmPlatform(scalaVersions = scala2Versions)
.dependsOn(serverCore, akkaHttpServer)

lazy val pekkoGrpcServer: ProjectMatrix = (projectMatrix in file("server/pekko-grpc-server"))
.settings(commonJvmSettings)
.settings(
name := "tapir-pekko-grpc-server",
libraryDependencies ++= Seq(
"org.apache.pekko" %% "pekko-grpc-runtime" % "1.0.0"
)
)
.jvmPlatform(scalaVersions = scala2And3Versions)
.dependsOn(serverCore, pekkoHttpServer)

lazy val armeriaServer: ProjectMatrix = (projectMatrix in file("server/armeria-server"))
.settings(commonJvmSettings)
.settings(
Expand Down
2 changes: 2 additions & 0 deletions doc/grpc.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ from these endpoints' definitions.
necessary to add this module for generating proto files with the `protobuf` module.
* `akkaGrpcServer` - a module that provides `AkkaGrpcServerInterpreter` implementation. It should be used to serve tapir
grpc endpoints.
* `pekkoGrpcServer` - a module that provides `PekkoGrpcServerInterpreter` implementation. It can be used as an
alternative to `akkaGrpcServer`.
* `grpcExamples` - contains example use cases

## Defining endpoints
Expand Down
19 changes: 19 additions & 0 deletions grpc/pekko-examples/src/main/protobuf/simple_books_example.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
syntax = "proto3";

option java_multiple_files = true;
option java_package = "sttp.tapir.grpc.examples.grpc_simple_books_example.gen";

service Library {
rpc AddBook (AddBookMsg) returns (SimpleBook) {}
}

message SimpleBook {
int32 id = 1;
string title = 2;
string description = 3;
}

message AddBookMsg {
string title = 1;
string description = 2;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package sttp.tapir.grpc.examples

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.grpc.GrpcClientSettings
import org.apache.pekko.http.scaladsl.Http
import cats.implicits._
import com.typesafe.config.ConfigFactory
import com.typesafe.scalalogging.StrictLogging
import sttp.tapir._
import sttp.tapir.grpc.protobuf._
import sttp.tapir.grpc.protobuf.pbdirect._
import sttp.tapir.server.ServerEndpoint
import sttp.tapir.server.pekkogrpc.PekkoGrpcServerInterpreter
import sttp.tapir.generic.auto._
import sttp.tapir.grpc.examples.grpc_simple_books_example.gen.{
Library => GenLibrary,
LibraryClient => GenLibraryClient,
AddBookMsg => GenAddBookMsg
}

import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.{Await, ExecutionContext, Future}

case class SimpleBook(id: Long, title: String, description: String)
case class AddBookMsg(title: String, description: String)

/** Descriptions of endpoints used in the example.
*/
object Endpoints {
val addBook = endpoint
.in("Library" / "AddBook")
.in(grpcBody[AddBookMsg])
.out(grpcBody[SimpleBook])

val endpoints = List(addBook)
}

object SimpleBooksExampleServer extends StrictLogging {

import Endpoints._

private val counter = new AtomicLong(0)

def booksServerEndpoints: List[ServerEndpoint[Any, Future]] =
List(
addBook.serverLogic { book =>
logger.info(s"Adding a new book [$book]")
Future.successful(SimpleBook(counter.getAndIncrement(), book.title, book.description).asRight[Unit])
}
)

def main(args: Array[String]): Unit = {
val conf = ConfigFactory
.parseString("pekko.http.server.preview.enable-http2 = on")
.withFallback(ConfigFactory.defaultApplication())
val system = ActorSystem("HelloWorld", conf)

new ExampleGrpcServer(system).run()
}
}

class ExampleGrpcServer(system: ActorSystem) extends StrictLogging {
def run(): Future[Http.ServerBinding] = {
// Pekko boot up code
implicit val sys: ActorSystem = system
implicit val ec: ExecutionContext = sys.dispatcher

val route = PekkoGrpcServerInterpreter().toRoute(SimpleBooksExampleServer.booksServerEndpoints)

val binding = Http().newServerAt("127.0.0.1", 8080).bind(route)

// report successful binding
binding.foreach { binding => logger.info(s"gRPC server bound to: ${binding.localAddress}") }

binding
}
}

object SimpleBookExampleProtoGenerator extends App {
ProtoSchemaGenerator.renderToFile(
path = "grpc/examples/src/main/protobuf/simple_books_example.proto",
packageName = "sttp.tapir.grpc.examples.grpc_simple_books_example.gen",
endpoints = Endpoints.endpoints
)
}

object SimpleBookExampleClient extends App with StrictLogging {

import scala.concurrent.duration._

implicit val sys = ActorSystem("HelloWorldClient")
implicit val ec = sys.dispatcher

val client = GenLibraryClient(GrpcClientSettings.connectToServiceAt("localhost", 8080).withTls(false))
val result = Await.result(client.addBook(GenAddBookMsg("TEST_BOOK", "TEST")), 10.second)

logger.info(s"Result: [$result]")
}
3 changes: 3 additions & 0 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,8 @@ addSbtPlugin("io.gatling" % "gatling-sbt" % "4.5.0")
addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.11.0")
addSbtPlugin("org.scala-native" % "sbt-scala-native" % "0.4.15")
addSbtPlugin("com.lightbend.akka.grpc" % "sbt-akka-grpc" % "2.1.4")
addSbtPlugin("org.apache.pekko" % "pekko-grpc-sbt-plugin" % "1.0.0")
// needed to override the Android flavor of Guava coming from pekko-grpc-sbt-plugin, which causes failures in Scala.JS builds
dependencyOverrides += "com.google.guava" % "guava" % "32.1.2-jre"

addDependencyTreePlugin
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package sttp.tapir.server.pekkogrpc

import org.apache.pekko.grpc.internal.{GrpcProtocolNative, Identity, SingleParameterSink}
import org.apache.pekko.http.scaladsl.model.HttpEntity
import org.apache.pekko.http.scaladsl.server.RequestContext
import org.apache.pekko.stream.Materializer
import org.apache.pekko.util.ByteString
import sttp.capabilities.pekko.PekkoStreams
import sttp.tapir.{InputStreamRange, RawBodyType}
import sttp.tapir.model.ServerRequest
import sttp.tapir.server.pekkohttp.PekkoHttpServerOptions
import sttp.tapir.server.interpreter.{RawValue, RequestBody}

import java.io.ByteArrayInputStream
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try

private[pekkogrpc] class PekkoGrpcRequestBody(serverOptions: PekkoHttpServerOptions)(implicit
mat: Materializer,
ec: ExecutionContext
) extends RequestBody[Future, PekkoStreams] {
private val grpcProtocol = GrpcProtocolNative.newReader(Identity)

override val streams: PekkoStreams = PekkoStreams
override def toRaw[R](request: ServerRequest, bodyType: RawBodyType[R]): Future[RawValue[R]] =
toRawFromEntity(request, akkaRequestEntity(request), bodyType)

override def toStream(request: ServerRequest): streams.BinaryStream = ???

private def akkaRequestEntity(request: ServerRequest) = request.underlying.asInstanceOf[RequestContext].request.entity

private def toRawFromEntity[R](request: ServerRequest, body: HttpEntity, bodyType: RawBodyType[R]): Future[RawValue[R]] = {
// Copy-paste from akka.grpc.scaladsl.GrpcMarshalling#unmarshal
body match {
case HttpEntity.Strict(_, data) => Future.fromTry(Try(toExpectedBodyType(data, bodyType)))
case _ => body.dataBytes.via(grpcProtocol.dataFrameDecoder).map(toExpectedBodyType(_, bodyType)).runWith(SingleParameterSink())
}
}

private def toExpectedBodyType[R](byteString: ByteString, bodyType: RawBodyType[R]): RawValue[R] = {
bodyType match {
case RawBodyType.ByteArrayBody => RawValue(byteString.toArray)
case RawBodyType.ByteBufferBody => RawValue(byteString.asByteBuffer)
case RawBodyType.InputStreamBody => RawValue(new ByteArrayInputStream(byteString.toArray))
case RawBodyType.InputStreamRangeBody => RawValue(InputStreamRange(() => new ByteArrayInputStream(byteString.toArray)))
case RawBodyType.FileBody => ???
case m: RawBodyType.MultipartBody => ???
case _ => ???
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package sttp.tapir.server.pekkogrpc

import org.apache.pekko.http.scaladsl.server.Route
import sttp.capabilities.WebSockets
import sttp.capabilities.pekko.PekkoStreams
import sttp.tapir.server.ServerEndpoint
import sttp.tapir.server.pekkohttp.PekkoHttpServerInterpreter

import scala.concurrent.{ExecutionContext, Future}

trait PekkoGrpcServerInterpreter extends PekkoHttpServerInterpreter {
override def toRoute(ses: List[ServerEndpoint[PekkoStreams with WebSockets, Future]]): Route =
toRoute(new PekkoGrpcRequestBody(pekkoHttpServerOptions)(_, _), new PekkoGrpcToResponseBody()(_, _))(ses)

}

object PekkoGrpcServerInterpreter {
def apply()(implicit _ec: ExecutionContext): PekkoGrpcServerInterpreter = new PekkoGrpcServerInterpreter {
override implicit def executionContext: ExecutionContext = _ec
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package sttp.tapir.server.pekkogrpc

import org.apache.pekko.grpc.internal.AbstractGrpcProtocol
import org.apache.pekko.http.scaladsl.model._
import org.apache.pekko.stream.Materializer
import org.apache.pekko.util.ByteString
import sttp.capabilities.pekko.PekkoStreams
import sttp.model.HasHeaders
import sttp.tapir.internal.charset
import sttp.tapir.server.pekkohttp.PekkoResponseBody
import sttp.tapir.server.interpreter.ToResponseBody
import sttp.tapir.{CodecFormat, RawBodyType, WebSocketBodyOutput}

import java.nio.charset.{Charset, StandardCharsets}
import scala.concurrent.ExecutionContext

private[pekkogrpc] class PekkoGrpcToResponseBody(implicit m: Materializer, ec: ExecutionContext)
extends ToResponseBody[PekkoResponseBody, PekkoStreams] {
override val streams: PekkoStreams = PekkoStreams

override def fromRawValue[R](v: R, headers: HasHeaders, format: CodecFormat, bodyType: RawBodyType[R]): PekkoResponseBody =
Right(
overrideContentTypeIfDefined(
rawValueToResponseEntity(bodyType, formatToContentType(format, charset(bodyType)), headers.contentLength, v),
headers
)
)

override def fromStreamValue(
v: streams.BinaryStream,
headers: HasHeaders,
format: CodecFormat,
charset: Option[Charset]
): PekkoResponseBody = ???

override def fromWebSocketPipe[REQ, RESP](
pipe: streams.Pipe[REQ, RESP],
o: WebSocketBodyOutput[streams.Pipe[REQ, RESP], REQ, RESP, _, PekkoStreams]
): PekkoResponseBody = ???

private def rawValueToResponseEntity[CF <: CodecFormat, R](
bodyType: RawBodyType[R],
ct: ContentType,
contentLength: Option[Long],
r: R
): ResponseEntity = {
bodyType match {
case RawBodyType.StringBody(charset) => ???
case RawBodyType.ByteArrayBody => HttpEntity(ct, encodeDataToFrameBytes(ByteString(r)))
case RawBodyType.ByteBufferBody => HttpEntity(ct, encodeDataToFrameBytes(ByteString(r)))
case RawBodyType.InputStreamBody => ???
case RawBodyType.InputStreamRangeBody => ???
case RawBodyType.FileBody => ???
case m: RawBodyType.MultipartBody => ???
}
}

private def formatToContentType(format: CodecFormat, charset: Option[Charset]): ContentType = {
format match {
case CodecFormat.Json() => ContentTypes.`application/json`
case CodecFormat.TextPlain() => MediaTypes.`text/plain`.withCharset(charsetToHttpCharset(charset.getOrElse(StandardCharsets.UTF_8)))
case CodecFormat.TextHtml() => MediaTypes.`text/html`.withCharset(charsetToHttpCharset(charset.getOrElse(StandardCharsets.UTF_8)))
case CodecFormat.OctetStream() => MediaTypes.`application/octet-stream`
case CodecFormat.Zip() => MediaTypes.`application/zip`
case CodecFormat.XWwwFormUrlencoded() => MediaTypes.`application/x-www-form-urlencoded`
case CodecFormat.MultipartFormData() => MediaTypes.`multipart/form-data`
case f =>
val mt = if (f.mediaType.isText) charset.fold(f.mediaType)(f.mediaType.charset(_)) else f.mediaType
parseContentType(mt.toString())
}
}

private def parseContentType(ct: String): ContentType =
ContentType.parse(ct).getOrElse(throw new IllegalArgumentException(s"Cannot parse content type: $ct"))

private def charsetToHttpCharset(charset: Charset): HttpCharset = HttpCharset.custom(charset.name())

private def overrideContentTypeIfDefined[RE <: ResponseEntity](re: RE, headers: HasHeaders): RE = {
headers.contentType match {
case Some(ct) => re.withContentType(parseContentType(ct)).asInstanceOf[RE]
case None => re
}
}

// TODO support for compressed body
private def encodeDataToFrameBytes(data: ByteString): ByteString =
AbstractGrpcProtocol.encodeFrameData(data, isCompressed = false, isTrailer = false)
}
Loading