Skip to content

Commit

Permalink
add pekko grpc support
Browse files Browse the repository at this point in the history
pekko example

pekko grpc release
  • Loading branch information
pjfanning authored and adamw committed Sep 20, 2023
1 parent adbeec5 commit ab64270
Show file tree
Hide file tree
Showing 8 changed files with 309 additions and 0 deletions.
28 changes: 28 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -987,6 +987,23 @@ lazy val grpcExamples: ProjectMatrix = (projectMatrix in file("grpc/examples"))
akkaGrpcServer
)

lazy val pekkoGrpcExamples: ProjectMatrix = (projectMatrix in file("grpc/pekko-examples"))
.settings(commonSettings)
.settings(
name := "tapir-pekko-grpc-examples",
libraryDependencies ++= Seq(
"org.apache.pekko" %% "pekko-discovery" % "1.0.1"
),
fork := true
)
.enablePlugins(PekkoGrpcPlugin)
.jvmPlatform(scalaVersions = scala2And3Versions)
.dependsOn(
protobuf,
pbDirectProtobuf,
pekkoGrpcServer
)

// metrics

lazy val prometheusMetrics: ProjectMatrix = (projectMatrix in file("metrics/prometheus-metrics"))
Expand Down Expand Up @@ -1216,6 +1233,17 @@ lazy val akkaGrpcServer: ProjectMatrix = (projectMatrix in file("server/akka-grp
.jvmPlatform(scalaVersions = scala2Versions)
.dependsOn(serverCore, akkaHttpServer)

lazy val pekkoGrpcServer: ProjectMatrix = (projectMatrix in file("server/pekko-grpc-server"))
.settings(commonJvmSettings)
.settings(
name := "tapir-pekko-grpc-server",
libraryDependencies ++= Seq(
"org.apache.pekko" %% "pekko-grpc-runtime" % "1.0.0"
)
)
.jvmPlatform(scalaVersions = scala2And3Versions)
.dependsOn(serverCore, pekkoHttpServer)

lazy val armeriaServer: ProjectMatrix = (projectMatrix in file("server/armeria-server"))
.settings(commonJvmSettings)
.settings(
Expand Down
2 changes: 2 additions & 0 deletions doc/grpc.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ from these endpoints' definitions.
necessary to add this module for generating proto files with the `protobuf` module.
* `akkaGrpcServer` - a module that provides `AkkaGrpcServerInterpreter` implementation. It should be used to serve tapir
grpc endpoints.
* `pekkoGrpcServer` - a module that provides `PekkoGrpcServerInterpreter` implementation. It can be used as an
alternative to `akkaGrpcServer`.
* `grpcExamples` - contains example use cases

## Defining endpoints
Expand Down
19 changes: 19 additions & 0 deletions grpc/pekko-examples/src/main/protobuf/simple_books_example.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
syntax = "proto3";

option java_multiple_files = true;
option java_package = "sttp.tapir.grpc.examples.grpc_simple_books_example.gen";

service Library {
rpc AddBook (AddBookMsg) returns (SimpleBook) {}
}

message SimpleBook {
int32 id = 1;
string title = 2;
string description = 3;
}

message AddBookMsg {
string title = 1;
string description = 2;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package sttp.tapir.grpc.examples

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.grpc.GrpcClientSettings
import org.apache.pekko.http.scaladsl.Http
import cats.implicits._
import com.typesafe.config.ConfigFactory
import com.typesafe.scalalogging.StrictLogging
import sttp.tapir._
import sttp.tapir.grpc.protobuf._
import sttp.tapir.grpc.protobuf.pbdirect._
import sttp.tapir.server.ServerEndpoint
import sttp.tapir.server.pekkogrpc.PekkoGrpcServerInterpreter
import sttp.tapir.generic.auto._
import sttp.tapir.grpc.examples.grpc_simple_books_example.gen.{
Library => GenLibrary,
LibraryClient => GenLibraryClient,
AddBookMsg => GenAddBookMsg
}

import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.{Await, ExecutionContext, Future}

case class SimpleBook(id: Long, title: String, description: String)
case class AddBookMsg(title: String, description: String)

/** Descriptions of endpoints used in the example.
*/
object Endpoints {
val addBook = endpoint
.in("Library" / "AddBook")
.in(grpcBody[AddBookMsg])
.out(grpcBody[SimpleBook])

val endpoints = List(addBook)
}

object SimpleBooksExampleServer extends StrictLogging {

import Endpoints._

private val counter = new AtomicLong(0)

def booksServerEndpoints: List[ServerEndpoint[Any, Future]] =
List(
addBook.serverLogic { book =>
logger.info(s"Adding a new book [$book]")
Future.successful(SimpleBook(counter.getAndIncrement(), book.title, book.description).asRight[Unit])
}
)

def main(args: Array[String]): Unit = {
val conf = ConfigFactory
.parseString("pekko.http.server.preview.enable-http2 = on")
.withFallback(ConfigFactory.defaultApplication())
val system = ActorSystem("HelloWorld", conf)

new ExampleGrpcServer(system).run()
}
}

class ExampleGrpcServer(system: ActorSystem) extends StrictLogging {
def run(): Future[Http.ServerBinding] = {
// Pekko boot up code
implicit val sys: ActorSystem = system
implicit val ec: ExecutionContext = sys.dispatcher

val route = PekkoGrpcServerInterpreter().toRoute(SimpleBooksExampleServer.booksServerEndpoints)

val binding = Http().newServerAt("127.0.0.1", 8080).bind(route)

// report successful binding
binding.foreach { binding => logger.info(s"gRPC server bound to: ${binding.localAddress}") }

binding
}
}

object SimpleBookExampleProtoGenerator extends App {
ProtoSchemaGenerator.renderToFile(
path = "grpc/examples/src/main/protobuf/simple_books_example.proto",
packageName = "sttp.tapir.grpc.examples.grpc_simple_books_example.gen",
endpoints = Endpoints.endpoints
)
}

object SimpleBookExampleClient extends App with StrictLogging {

import scala.concurrent.duration._

implicit val sys = ActorSystem("HelloWorldClient")
implicit val ec = sys.dispatcher

val client = GenLibraryClient(GrpcClientSettings.connectToServiceAt("localhost", 8080).withTls(false))
val result = Await.result(client.addBook(GenAddBookMsg("TEST_BOOK", "TEST")), 10.second)

logger.info(s"Result: [$result]")
}
1 change: 1 addition & 0 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@ addSbtPlugin("io.gatling" % "gatling-sbt" % "4.5.0")
addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.11.0")
addSbtPlugin("org.scala-native" % "sbt-scala-native" % "0.4.15")
addSbtPlugin("com.lightbend.akka.grpc" % "sbt-akka-grpc" % "2.1.4")
addSbtPlugin("org.apache.pekko" % "pekko-grpc-sbt-plugin" % "1.0.0")

addDependencyTreePlugin
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package sttp.tapir.server.pekkogrpc

import org.apache.pekko.grpc.internal.{GrpcProtocolNative, Identity, SingleParameterSink}
import org.apache.pekko.http.scaladsl.model.HttpEntity
import org.apache.pekko.http.scaladsl.server.RequestContext
import org.apache.pekko.stream.Materializer
import org.apache.pekko.util.ByteString
import sttp.capabilities.pekko.PekkoStreams
import sttp.tapir.{InputStreamRange, RawBodyType}
import sttp.tapir.model.ServerRequest
import sttp.tapir.server.pekkohttp.PekkoHttpServerOptions
import sttp.tapir.server.interpreter.{RawValue, RequestBody}

import java.io.ByteArrayInputStream
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try

private[pekkogrpc] class PekkoGrpcRequestBody(serverOptions: PekkoHttpServerOptions)(implicit
mat: Materializer,
ec: ExecutionContext
) extends RequestBody[Future, PekkoStreams] {
private val grpcProtocol = GrpcProtocolNative.newReader(Identity)

override val streams: PekkoStreams = PekkoStreams
override def toRaw[R](request: ServerRequest, bodyType: RawBodyType[R]): Future[RawValue[R]] =
toRawFromEntity(request, akkaRequestEntity(request), bodyType)

override def toStream(request: ServerRequest): streams.BinaryStream = ???

private def akkaRequestEntity(request: ServerRequest) = request.underlying.asInstanceOf[RequestContext].request.entity

private def toRawFromEntity[R](request: ServerRequest, body: HttpEntity, bodyType: RawBodyType[R]): Future[RawValue[R]] = {
// Copy-paste from akka.grpc.scaladsl.GrpcMarshalling#unmarshal
body match {
case HttpEntity.Strict(_, data) => Future.fromTry(Try(toExpectedBodyType(data, bodyType)))
case _ => body.dataBytes.via(grpcProtocol.dataFrameDecoder).map(toExpectedBodyType(_, bodyType)).runWith(SingleParameterSink())
}
}

private def toExpectedBodyType[R](byteString: ByteString, bodyType: RawBodyType[R]): RawValue[R] = {
bodyType match {
case RawBodyType.ByteArrayBody => RawValue(byteString.toArray)
case RawBodyType.ByteBufferBody => RawValue(byteString.asByteBuffer)
case RawBodyType.InputStreamBody => RawValue(new ByteArrayInputStream(byteString.toArray))
case RawBodyType.InputStreamRangeBody => RawValue(InputStreamRange(() => new ByteArrayInputStream(byteString.toArray)))
case RawBodyType.FileBody => ???
case m: RawBodyType.MultipartBody => ???
case _ => ???
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package sttp.tapir.server.pekkogrpc

import org.apache.pekko.http.scaladsl.server.Route
import sttp.capabilities.WebSockets
import sttp.capabilities.pekko.PekkoStreams
import sttp.tapir.server.ServerEndpoint
import sttp.tapir.server.pekkohttp.PekkoHttpServerInterpreter

import scala.concurrent.{ExecutionContext, Future}

trait PekkoGrpcServerInterpreter extends PekkoHttpServerInterpreter {
override def toRoute(ses: List[ServerEndpoint[PekkoStreams with WebSockets, Future]]): Route =
toRoute(new PekkoGrpcRequestBody(pekkoHttpServerOptions)(_, _), new PekkoGrpcToResponseBody()(_, _))(ses)

}

object PekkoGrpcServerInterpreter {
def apply()(implicit _ec: ExecutionContext): PekkoGrpcServerInterpreter = new PekkoGrpcServerInterpreter {
override implicit def executionContext: ExecutionContext = _ec
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package sttp.tapir.server.pekkogrpc

import org.apache.pekko.grpc.internal.AbstractGrpcProtocol
import org.apache.pekko.http.scaladsl.model._
import org.apache.pekko.stream.Materializer
import org.apache.pekko.util.ByteString
import sttp.capabilities.pekko.PekkoStreams
import sttp.model.HasHeaders
import sttp.tapir.internal.charset
import sttp.tapir.server.pekkohttp.PekkoResponseBody
import sttp.tapir.server.interpreter.ToResponseBody
import sttp.tapir.{CodecFormat, RawBodyType, WebSocketBodyOutput}

import java.nio.charset.{Charset, StandardCharsets}
import scala.concurrent.ExecutionContext

private[pekkogrpc] class PekkoGrpcToResponseBody(implicit m: Materializer, ec: ExecutionContext)
extends ToResponseBody[PekkoResponseBody, PekkoStreams] {
override val streams: PekkoStreams = PekkoStreams

override def fromRawValue[R](v: R, headers: HasHeaders, format: CodecFormat, bodyType: RawBodyType[R]): PekkoResponseBody =
Right(
overrideContentTypeIfDefined(
rawValueToResponseEntity(bodyType, formatToContentType(format, charset(bodyType)), headers.contentLength, v),
headers
)
)

override def fromStreamValue(
v: streams.BinaryStream,
headers: HasHeaders,
format: CodecFormat,
charset: Option[Charset]
): PekkoResponseBody = ???

override def fromWebSocketPipe[REQ, RESP](
pipe: streams.Pipe[REQ, RESP],
o: WebSocketBodyOutput[streams.Pipe[REQ, RESP], REQ, RESP, _, PekkoStreams]
): PekkoResponseBody = ???

private def rawValueToResponseEntity[CF <: CodecFormat, R](
bodyType: RawBodyType[R],
ct: ContentType,
contentLength: Option[Long],
r: R
): ResponseEntity = {
bodyType match {
case RawBodyType.StringBody(charset) => ???
case RawBodyType.ByteArrayBody => HttpEntity(ct, encodeDataToFrameBytes(ByteString(r)))
case RawBodyType.ByteBufferBody => HttpEntity(ct, encodeDataToFrameBytes(ByteString(r)))
case RawBodyType.InputStreamBody => ???
case RawBodyType.InputStreamRangeBody => ???
case RawBodyType.FileBody => ???
case m: RawBodyType.MultipartBody => ???
}
}

private def formatToContentType(format: CodecFormat, charset: Option[Charset]): ContentType = {
format match {
case CodecFormat.Json() => ContentTypes.`application/json`
case CodecFormat.TextPlain() => MediaTypes.`text/plain`.withCharset(charsetToHttpCharset(charset.getOrElse(StandardCharsets.UTF_8)))
case CodecFormat.TextHtml() => MediaTypes.`text/html`.withCharset(charsetToHttpCharset(charset.getOrElse(StandardCharsets.UTF_8)))
case CodecFormat.OctetStream() => MediaTypes.`application/octet-stream`
case CodecFormat.Zip() => MediaTypes.`application/zip`
case CodecFormat.XWwwFormUrlencoded() => MediaTypes.`application/x-www-form-urlencoded`
case CodecFormat.MultipartFormData() => MediaTypes.`multipart/form-data`
case f =>
val mt = if (f.mediaType.isText) charset.fold(f.mediaType)(f.mediaType.charset(_)) else f.mediaType
parseContentType(mt.toString())
}
}

private def parseContentType(ct: String): ContentType =
ContentType.parse(ct).getOrElse(throw new IllegalArgumentException(s"Cannot parse content type: $ct"))

private def charsetToHttpCharset(charset: Charset): HttpCharset = HttpCharset.custom(charset.name())

private def overrideContentTypeIfDefined[RE <: ResponseEntity](re: RE, headers: HasHeaders): RE = {
headers.contentType match {
case Some(ct) => re.withContentType(parseContentType(ct)).asInstanceOf[RE]
case None => re
}
}

// TODO support for compressed body
private def encodeDataToFrameBytes(data: ByteString): ByteString =
AbstractGrpcProtocol.encodeFrameData(data, isCompressed = false, isTrailer = false)
}

0 comments on commit ab64270

Please sign in to comment.