diff --git a/build.sbt b/build.sbt index 5088e6cb3..41ab6053f 100644 --- a/build.sbt +++ b/build.sbt @@ -160,6 +160,13 @@ lazy val `prometheus` = project //// DROPWIZARD //// //////////////////// +lazy val `dropwizard` = project + .in(file("modules/metrics/dropwizard")) + .dependsOn(`internal-core`) + .dependsOn(testing % "test->test") + .settings(moduleName := "mu-rpc-dropwizard") + .settings(dropwizardMetricsSettings) + lazy val `dropwizard-server` = project .in(file("modules/dropwizard/server")) .dependsOn(`prometheus-server` % "compile->compile;test->test") @@ -175,12 +182,16 @@ lazy val `dropwizard-client` = project .settings(moduleName := "mu-rpc-dropwizard-client") .settings(dropwizardSettings) -lazy val `dropwizard` = project - .in(file("modules/metrics/dropwizard")) - .dependsOn(`internal-core`) - .dependsOn(testing % "test->test") - .settings(moduleName := "mu-rpc-dropwizard") - .settings(dropwizardMetricsSettings) +/////////////////// +//// HTTP/REST //// +/////////////////// + +lazy val `http` = project + .in(file("modules/http")) + .dependsOn(common % "compile->compile;test->test") + .dependsOn(server % "compile->compile;test->test") + .settings(moduleName := "mu-rpc-http") + .settings(httpSettings) //////////////// //// IDLGEN //// @@ -395,6 +406,7 @@ lazy val allModules: Seq[ProjectReference] = Seq( testing, ssl, `idlgen-core`, + `http`, `marshallers-jodatime`, `example-routeguide-protocol`, `example-routeguide-common`, diff --git a/modules/common/src/main/scala/higherkindness/mu/rpc/protocol/protocol.scala b/modules/common/src/main/scala/higherkindness/mu/rpc/protocol/protocol.scala index fb455d6e8..3561f1a62 100644 --- a/modules/common/src/main/scala/higherkindness/mu/rpc/protocol/protocol.scala +++ b/modules/common/src/main/scala/higherkindness/mu/rpc/protocol/protocol.scala @@ -37,6 +37,7 @@ class message extends StaticAnnotation class option(name: String, value: Any) extends StaticAnnotation class outputPackage(value: String) extends StaticAnnotation class outputName(value: String) extends StaticAnnotation +class http extends StaticAnnotation @message object Empty diff --git a/modules/http/src/main/scala/higherkindness/mu/http/implicits.scala b/modules/http/src/main/scala/higherkindness/mu/http/implicits.scala new file mode 100644 index 000000000..6b7494a31 --- /dev/null +++ b/modules/http/src/main/scala/higherkindness/mu/http/implicits.scala @@ -0,0 +1,133 @@ +/* + * Copyright 2017-2019 47 Degrees, LLC. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package higherkindness.mu.http + +import cats.ApplicativeError +import cats.effect._ +import cats.implicits._ +import cats.syntax.either._ +import fs2.{RaiseThrowable, Stream} +import io.grpc.Status.Code._ +import org.typelevel.jawn.ParseException +import io.circe._ +import io.circe.jawn.CirceSupportParser.facade +import io.circe.syntax._ +import io.grpc.{Status => _, _} +import jawnfs2._ +import org.http4s._ +import org.http4s.dsl.Http4sDsl +import org.http4s.Status.Ok +import scala.util.control.NoStackTrace + +object implicits { + + implicit val unexpectedErrorEncoder: Encoder[UnexpectedError] = new Encoder[UnexpectedError] { + final def apply(a: UnexpectedError): Json = Json.obj( + ("className", Json.fromString(a.className)), + ("msg", a.msg.fold(Json.Null)(s => Json.fromString(s))) + ) + } + + implicit val unexpectedErrorDecoder: Decoder[UnexpectedError] = new Decoder[UnexpectedError] { + final def apply(c: HCursor): Decoder.Result[UnexpectedError] = + for { + className <- c.downField("className").as[String] + msg <- c.downField("msg").as[Option[String]] + } yield UnexpectedError(className, msg) + } + + implicit def EitherDecoder[A, B](implicit a: Decoder[A], b: Decoder[B]): Decoder[Either[A, B]] = + a.map(Left.apply) or b.map(Right.apply) + + implicit def EitherEncoder[A, B](implicit ea: Encoder[A], eb: Encoder[B]): Encoder[Either[A, B]] = + new Encoder[Either[A, B]] { + final def apply(a: Either[A, B]): Json = a.fold(_.asJson, _.asJson) + } + + implicit class MessageOps[F[_]](private val message: Message[F]) extends AnyVal { + + def jsonBodyAsStream[A]( + implicit decoder: Decoder[A], + F: ApplicativeError[F, Throwable]): Stream[F, A] = + message.body.chunks.parseJsonStream.map(_.as[A]).rethrow + } + + implicit class RequestOps[F[_]](private val request: Request[F]) { + + def asStream[A](implicit decoder: Decoder[A], F: ApplicativeError[F, Throwable]): Stream[F, A] = + request + .jsonBodyAsStream[A] + .adaptError { // mimic behavior of MessageOps.as[T] in handling of parsing errors + case ex: ParseException => + MalformedMessageBodyFailure(ex.getMessage, Some(ex)) // will return 400 instead of 500 + } + } + + implicit class ResponseOps[F[_]](private val response: Response[F]) { + + def asStream[A]( + implicit decoder: Decoder[A], + F: ApplicativeError[F, Throwable], + R: RaiseThrowable[F]): Stream[F, A] = + if (response.status.code != Ok.code) Stream.raiseError(ResponseError(response.status)) + else response.jsonBodyAsStream[Either[UnexpectedError, A]].rethrow + } + + implicit class Fs2StreamOps[F[_], A](private val stream: Stream[F, A]) { + + def asJsonEither(implicit encoder: Encoder[A]): Stream[F, Json] = + stream.attempt.map(_.bimap(_.toUnexpected, identity).asJson) + } + + implicit class FResponseOps[F[_]: Sync](private val response: F[Response[F]]) + extends Http4sDsl[F] { + + def adaptErrors: F[Response[F]] = response.handleErrorWith { + case se: StatusException => errorFromStatus(se.getStatus, se.getMessage) + case sre: StatusRuntimeException => errorFromStatus(sre.getStatus, sre.getMessage) + case other: Throwable => InternalServerError(other.getMessage) + } + + private def errorFromStatus(status: io.grpc.Status, message: String): F[Response[F]] = + status.getCode match { + case INVALID_ARGUMENT => BadRequest(message) + case UNAUTHENTICATED => Forbidden(message) + case PERMISSION_DENIED => Forbidden(message) + case NOT_FOUND => NotFound(message) + case UNAVAILABLE => ServiceUnavailable(message) + case _ => InternalServerError(message) + } + } + + def handleResponseError[F[_]: Sync](errorResponse: Response[F]): F[Throwable] = + errorResponse.bodyAsText.compile.foldMonoid.map(body => + ResponseError(errorResponse.status, Some(body).filter(_.nonEmpty))) + + implicit class ThrowableOps(self: Throwable) { + def toUnexpected: UnexpectedError = + UnexpectedError(self.getClass.getName, Option(self.getMessage)) + } + +} + +final case class UnexpectedError(className: String, msg: Option[String]) + extends RuntimeException(className + msg.fold("")(": " + _)) + with NoStackTrace + +final case class ResponseError(status: Status, msg: Option[String] = None) + extends RuntimeException(status + msg.fold("")(": " + _)) + with NoStackTrace diff --git a/modules/http/src/main/scala/higherkindness/mu/http/protocol.scala b/modules/http/src/main/scala/higherkindness/mu/http/protocol.scala new file mode 100644 index 000000000..d7e8bda25 --- /dev/null +++ b/modules/http/src/main/scala/higherkindness/mu/http/protocol.scala @@ -0,0 +1,37 @@ +/* + * Copyright 2017-2019 47 Degrees, LLC. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package higherkindness.mu.http + +import cats.effect.{ConcurrentEffect, Timer} +import org.http4s.HttpRoutes +import org.http4s.server.blaze.BlazeServerBuilder +import org.http4s.implicits._ +import org.http4s.server.Router + +case class RouteMap[F[_]](prefix: String, route: HttpRoutes[F]) + +object HttpServer { + + def bind[F[_]: ConcurrentEffect: Timer]( + port: Int, + host: String, + routes: RouteMap[F]*): BlazeServerBuilder[F] = + BlazeServerBuilder[F] + .bindHttp(port, host) + .withHttpApp(Router(routes.map(r => (s"/${r.prefix}", r.route)): _*).orNotFound) + +} diff --git a/modules/http/src/test/resources/logback-test.xml b/modules/http/src/test/resources/logback-test.xml new file mode 100644 index 000000000..c3523a582 --- /dev/null +++ b/modules/http/src/test/resources/logback-test.xml @@ -0,0 +1,11 @@ + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + diff --git a/modules/http/src/test/scala/higherkindness/mu/rpc/http/GreeterDerivedRestTests.scala b/modules/http/src/test/scala/higherkindness/mu/rpc/http/GreeterDerivedRestTests.scala new file mode 100644 index 000000000..98d0d6dd0 --- /dev/null +++ b/modules/http/src/test/scala/higherkindness/mu/rpc/http/GreeterDerivedRestTests.scala @@ -0,0 +1,200 @@ +/* + * Copyright 2017-2019 47 Degrees, LLC. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package higherkindness.mu.rpc.http + +import cats.effect.{IO, _} +import fs2.Stream +import fs2.interop.reactivestreams._ +import higherkindness.mu.http.{HttpServer, ResponseError, RouteMap, UnexpectedError} +import higherkindness.mu.rpc.common.RpcBaseTestSuite +import monix.reactive.Observable +import io.circe.generic.auto._ +import org.http4s._ +import org.http4s.client.blaze.BlazeClientBuilder +import org.http4s.server.blaze._ +import org.scalatest._ + +import scala.concurrent.duration._ + +class GreeterDerivedRestTests extends RpcBaseTestSuite with BeforeAndAfter { + + val host = "localhost" + val port = 8080 + val serviceUri: Uri = Uri.unsafeFromString(s"http://$host:$port") + + implicit val ec = monix.execution.Scheduler.Implicits.global + implicit val cs: ContextShift[IO] = IO.contextShift(ec) + implicit val timer: Timer[IO] = IO.timer(ec) + + implicit val unaryHandlerIO = new UnaryGreeterHandler[IO] + implicit val fs2HandlerIO = new Fs2GreeterHandler[IO] + implicit val monixHandlerIO = new MonixGreeterHandler[IO] + + val unaryRoute: RouteMap[IO] = UnaryGreeter.route[IO] + val fs2Route: RouteMap[IO] = Fs2Greeter.route[IO] + val monixRoute: RouteMap[IO] = MonixGreeter.route[IO] + + val server: BlazeServerBuilder[IO] = HttpServer.bind(port, host, unaryRoute, fs2Route, monixRoute) + + var serverTask: Fiber[IO, Nothing] = _ + before(serverTask = server.resource.use(_ => IO.never).start.unsafeRunSync()) + after(serverTask.cancel) + + "REST Service" should { + + val unaryClient = UnaryGreeter.httpClient[IO](serviceUri) + val fs2Client = Fs2Greeter.httpClient[IO](serviceUri) + val monixClient = MonixGreeter.httpClient[IO](serviceUri) + + "serve a GET request" in { + val response: IO[HelloResponse] = + BlazeClientBuilder[IO](ec).resource.use(unaryClient.getHello(_)) + response.unsafeRunSync() shouldBe HelloResponse("hey") + } + + "serve a unary POST request" in { + val response: IO[HelloResponse] = + BlazeClientBuilder[IO](ec).resource.use(unaryClient.sayHello(HelloRequest("hey"))(_)) + response.unsafeRunSync() shouldBe HelloResponse("hey") + } + + "handle a raised gRPC exception in a unary POST request" in { + val response: IO[HelloResponse] = + BlazeClientBuilder[IO](ec).resource.use(unaryClient.sayHello(HelloRequest("SRE"))(_)) + + the[ResponseError] thrownBy response.unsafeRunSync() shouldBe ResponseError( + Status.BadRequest, + Some("INVALID_ARGUMENT: SRE")) + } + + "handle a raised non-gRPC exception in a unary POST request" in { + val response: IO[HelloResponse] = + BlazeClientBuilder[IO](ec).resource.use(unaryClient.sayHello(HelloRequest("RTE"))(_)) + + the[ResponseError] thrownBy response.unsafeRunSync() shouldBe ResponseError( + Status.InternalServerError, + Some("RTE")) + } + + "handle a thrown exception in a unary POST request" in { + val response: IO[HelloResponse] = + BlazeClientBuilder[IO](ec).resource.use(unaryClient.sayHello(HelloRequest("TR"))(_)) + + the[ResponseError] thrownBy response.unsafeRunSync() shouldBe ResponseError( + Status.InternalServerError) + } + + "serve a POST request with fs2 streaming request" in { + + val requests = Stream(HelloRequest("hey"), HelloRequest("there")) + + val response: IO[HelloResponse] = + BlazeClientBuilder[IO](ec).resource.use(fs2Client.sayHellos(requests)(_)) + response.unsafeRunSync() shouldBe HelloResponse("hey, there") + } + + "serve a POST request with empty fs2 streaming request" in { + val requests = Stream.empty + val response = + BlazeClientBuilder[IO](ec).resource.use(fs2Client.sayHellos(requests)(_)) + response.unsafeRunSync() shouldBe HelloResponse("") + } + + "serve a POST request with Observable streaming request" in { + val requests = Observable(HelloRequest("hey"), HelloRequest("there")) + val response = + BlazeClientBuilder[IO](ec).resource.use(monixClient.sayHellos(requests)(_)) + response.unsafeRunSync() shouldBe HelloResponse("hey, there") + } + + "serve a POST request with empty Observable streaming request" in { + val requests = Observable.empty + val response = + BlazeClientBuilder[IO](ec).resource.use(monixClient.sayHellos(requests)(_)) + response.unsafeRunSync() shouldBe HelloResponse("") + } + + "serve a POST request with fs2 streaming response" in { + val request = HelloRequest("hey") + val responses = + BlazeClientBuilder[IO](ec).stream.flatMap(fs2Client.sayHelloAll(request)(_)) + responses.compile.toList + .unsafeRunSync() shouldBe List(HelloResponse("hey"), HelloResponse("hey")) + } + + "serve a POST request with Observable streaming response" in { + val request = HelloRequest("hey") + val responses = BlazeClientBuilder[IO](ec).stream + .flatMap(monixClient.sayHelloAll(request)(_).toReactivePublisher.toStream[IO]) + responses.compile.toList + .unsafeRunTimed(10.seconds) + .getOrElse(sys.error("Stuck!")) shouldBe List(HelloResponse("hey"), HelloResponse("hey")) + } + + "handle errors with fs2 streaming response" in { + val request = HelloRequest("") + val responses = + BlazeClientBuilder[IO](ec).stream.flatMap(fs2Client.sayHelloAll(request)(_)) + the[UnexpectedError] thrownBy responses.compile.toList + .unsafeRunSync() should have message "java.lang.IllegalArgumentException: empty greeting" + } + + "handle errors with Observable streaming response" in { + val request = HelloRequest("") + val responses = BlazeClientBuilder[IO](ec).stream + .flatMap(monixClient.sayHelloAll(request)(_).toReactivePublisher.toStream[IO]) + the[UnexpectedError] thrownBy responses.compile.toList + .unsafeRunTimed(10.seconds) + .getOrElse(sys.error("Stuck!")) should have message "java.lang.IllegalArgumentException: empty greeting" + } + + "serve a POST request with bidirectional fs2 streaming" in { + val requests = Stream(HelloRequest("hey"), HelloRequest("there")) + val responses = + BlazeClientBuilder[IO](ec).stream.flatMap(fs2Client.sayHellosAll(requests)(_)) + responses.compile.toList + .unsafeRunSync() shouldBe List(HelloResponse("hey"), HelloResponse("there")) + } + + "serve an empty POST request with bidirectional fs2 streaming" in { + val requests = Stream.empty + val responses = + BlazeClientBuilder[IO](ec).stream.flatMap(fs2Client.sayHellosAll(requests)(_)) + responses.compile.toList.unsafeRunSync() shouldBe Nil + } + + "serve a POST request with bidirectional Observable streaming" in { + val requests = Observable(HelloRequest("hey"), HelloRequest("there")) + val responses = BlazeClientBuilder[IO](ec).stream + .flatMap(monixClient.sayHellosAll(requests)(_).toReactivePublisher.toStream[IO]) + responses.compile.toList + .unsafeRunTimed(10.seconds) + .getOrElse(sys.error("Stuck!")) shouldBe List(HelloResponse("hey"), HelloResponse("there")) + } + + "serve an empty POST request with bidirectional Observable streaming" in { + val requests = Observable.empty + val responses = BlazeClientBuilder[IO](ec).stream + .flatMap(monixClient.sayHellosAll(requests)(_).toReactivePublisher.toStream[IO]) + responses.compile.toList + .unsafeRunTimed(10.seconds) + .getOrElse(sys.error("Stuck!")) shouldBe Nil + } + + } + +} diff --git a/modules/http/src/test/scala/higherkindness/mu/rpc/http/GreeterHandlers.scala b/modules/http/src/test/scala/higherkindness/mu/rpc/http/GreeterHandlers.scala new file mode 100644 index 000000000..5fa8ac058 --- /dev/null +++ b/modules/http/src/test/scala/higherkindness/mu/rpc/http/GreeterHandlers.scala @@ -0,0 +1,80 @@ +/* + * Copyright 2017-2019 47 Degrees, LLC. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package higherkindness.mu.rpc.http + +import cats.{Applicative, MonadError} +import cats.effect._ + +class UnaryGreeterHandler[F[_]: Applicative](implicit F: MonadError[F, Throwable]) + extends UnaryGreeter[F] { + + import cats.syntax.applicative._ + import higherkindness.mu.rpc.protocol.Empty + import io.grpc.Status._ + + def getHello(request: Empty.type): F[HelloResponse] = HelloResponse("hey").pure + + def sayHello(request: HelloRequest): F[HelloResponse] = request.hello match { + case "SE" => F.raiseError(INVALID_ARGUMENT.withDescription("SE").asException) + case "SRE" => F.raiseError(INVALID_ARGUMENT.withDescription("SRE").asRuntimeException) + case "RTE" => F.raiseError(new IllegalArgumentException("RTE")) + case "TR" => throw new IllegalArgumentException("Thrown") + case other => HelloResponse(other).pure + } + +} + +class Fs2GreeterHandler[F[_]: Sync] extends Fs2Greeter[F] { + + import fs2.Stream + + def sayHellos(requests: Stream[F, HelloRequest]): F[HelloResponse] = + requests.compile.fold(HelloResponse("")) { + case (response, request) => + HelloResponse( + if (response.hello.isEmpty) request.hello else s"${response.hello}, ${request.hello}") + } + + def sayHelloAll(request: HelloRequest): Stream[F, HelloResponse] = + if (request.hello.isEmpty) Stream.raiseError(new IllegalArgumentException("empty greeting")) + else Stream(HelloResponse(request.hello), HelloResponse(request.hello)) + + def sayHellosAll(requests: Stream[F, HelloRequest]): Stream[F, HelloResponse] = + requests.map(request => HelloResponse(request.hello)) +} + +class MonixGreeterHandler[F[_]: Async](implicit sc: monix.execution.Scheduler) + extends MonixGreeter[F] { + + import monix.reactive.Observable + + def sayHellos(requests: Observable[HelloRequest]): F[HelloResponse] = + requests + .foldLeftL(HelloResponse("")) { + case (response, request) => + HelloResponse( + if (response.hello.isEmpty) request.hello else s"${response.hello}, ${request.hello}") + } + .to[F] + + def sayHelloAll(request: HelloRequest): Observable[HelloResponse] = + if (request.hello.isEmpty) Observable.raiseError(new IllegalArgumentException("empty greeting")) + else Observable(HelloResponse(request.hello), HelloResponse(request.hello)) + + def sayHellosAll(requests: Observable[HelloRequest]): Observable[HelloResponse] = + requests.map(request => HelloResponse(request.hello)) +} diff --git a/modules/http/src/test/scala/higherkindness/mu/rpc/http/GreeterRestClients.scala b/modules/http/src/test/scala/higherkindness/mu/rpc/http/GreeterRestClients.scala new file mode 100644 index 000000000..f22e8d2ab --- /dev/null +++ b/modules/http/src/test/scala/higherkindness/mu/rpc/http/GreeterRestClients.scala @@ -0,0 +1,111 @@ +/* + * Copyright 2017-2019 47 Degrees, LLC. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package higherkindness.mu.rpc.http + +import cats.effect._ +import fs2.Stream +import fs2.interop.reactivestreams._ +import io.circe.syntax._ +import higherkindness.mu.http.implicits._ +import org.http4s._ +import org.http4s.circe._ +import org.http4s.client._ + +class UnaryGreeterRestClient[F[_]: Sync](uri: Uri) { + + def getHello()(client: Client[F])( + implicit decoderHelloResponse: io.circe.Decoder[HelloResponse]): F[HelloResponse] = { + val request = Request[F](Method.GET, uri / "getHello") + client.expectOr[HelloResponse](request)(handleResponseError)(jsonOf[F, HelloResponse]) + } + + def sayHello(arg: HelloRequest)(client: Client[F])( + implicit encoderHelloRequest: io.circe.Encoder[HelloRequest], + decoderHelloResponse: io.circe.Decoder[HelloResponse]): F[HelloResponse] = { + val request = Request[F](Method.POST, uri / "sayHello") + client.expectOr[HelloResponse](request.withEntity(arg.asJson))(handleResponseError)( + jsonOf[F, HelloResponse]) + } + +} + +class Fs2GreeterRestClient[F[_]: Sync](uri: Uri) { + + def sayHellos(arg: Stream[F, HelloRequest])(client: Client[F])( + implicit encoderHelloRequest: io.circe.Encoder[HelloRequest], + decoderHelloResponse: io.circe.Decoder[HelloResponse]): F[HelloResponse] = { + val request = Request[F](Method.POST, uri / "sayHellos") + client.expectOr[HelloResponse](request.withEntity(arg.map(_.asJson)))(handleResponseError)( + jsonOf[F, HelloResponse]) + } + + def sayHelloAll(arg: HelloRequest)(client: Client[F])( + implicit encoderHelloRequest: io.circe.Encoder[HelloRequest], + decoderHelloResponse: io.circe.Decoder[HelloResponse]): Stream[F, HelloResponse] = { + val request = Request[F](Method.POST, uri / "sayHelloAll") + client.stream(request.withEntity(arg.asJson)).flatMap(_.asStream[HelloResponse]) + } + + def sayHellosAll(arg: Stream[F, HelloRequest])(client: Client[F])( + implicit encoderHelloRequest: io.circe.Encoder[HelloRequest], + decoderHelloResponse: io.circe.Decoder[HelloResponse]): Stream[F, HelloResponse] = { + val request = Request[F](Method.POST, uri / "sayHellosAll") + client.stream(request.withEntity(arg.map(_.asJson))).flatMap(_.asStream[HelloResponse]) + } + +} + +class MonixGreeterRestClient[F[_]: ConcurrentEffect](uri: Uri)( + implicit ec: scala.concurrent.ExecutionContext) { + + import monix.reactive.Observable + import higherkindness.mu.http.implicits._ + + implicit val sc: monix.execution.Scheduler = monix.execution.Scheduler(ec) + + def sayHellos(arg: Observable[HelloRequest])(client: Client[F])( + implicit encoderHelloRequest: io.circe.Encoder[HelloRequest], + decoderHelloResponse: io.circe.Decoder[HelloResponse]): F[HelloResponse] = { + val request = Request[F](Method.POST, uri / "sayHellos") + client.expectOr[HelloResponse]( + request.withEntity(arg.toReactivePublisher.toStream.map(_.asJson)))(handleResponseError)( + jsonOf[F, HelloResponse]) + } + + def sayHelloAll(arg: HelloRequest)(client: Client[F])( + implicit encoderHelloRequest: io.circe.Encoder[HelloRequest], + decoderHelloResponse: io.circe.Decoder[HelloResponse]): Observable[HelloResponse] = { + val request = Request[F](Method.POST, uri / "sayHelloAll") + Observable.fromReactivePublisher( + client + .stream(request.withEntity(arg.asJson)) + .flatMap(_.asStream[HelloResponse]) + .toUnicastPublisher) + } + + def sayHellosAll(arg: Observable[HelloRequest])(client: Client[F])( + implicit encoderHelloRequest: io.circe.Encoder[HelloRequest], + decoderHelloResponse: io.circe.Decoder[HelloResponse]): Observable[HelloResponse] = { + val request = Request[F](Method.POST, uri / "sayHellosAll") + Observable.fromReactivePublisher( + client + .stream(request.withEntity(arg.toReactivePublisher.toStream.map(_.asJson))) + .flatMap(_.asStream[HelloResponse]) + .toUnicastPublisher) + } + +} diff --git a/modules/http/src/test/scala/higherkindness/mu/rpc/http/GreeterRestServices.scala b/modules/http/src/test/scala/higherkindness/mu/rpc/http/GreeterRestServices.scala new file mode 100644 index 000000000..319d64b50 --- /dev/null +++ b/modules/http/src/test/scala/higherkindness/mu/rpc/http/GreeterRestServices.scala @@ -0,0 +1,112 @@ +/* + * Copyright 2017-2019 47 Degrees, LLC. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package higherkindness.mu.rpc.http + +import cats.effect._ +import cats.syntax.flatMap._ +import cats.syntax.functor._ +import io.circe.syntax._ +import higherkindness.mu.http.implicits._ +import fs2.interop.reactivestreams._ +import monix.reactive.Observable +import org.http4s._ +import org.http4s.circe._ +import org.http4s.dsl.Http4sDsl + +class UnaryGreeterRestService[F[_]: Sync]( + implicit handler: UnaryGreeter[F], + decoderHelloRequest: io.circe.Decoder[HelloRequest], + encoderHelloResponse: io.circe.Encoder[HelloResponse]) + extends Http4sDsl[F] { + + import higherkindness.mu.rpc.protocol.Empty + + private implicit val requestDecoder: EntityDecoder[F, HelloRequest] = jsonOf[F, HelloRequest] + + def service: HttpRoutes[F] = HttpRoutes.of[F] { + + case GET -> Root / "getHello" => Ok(handler.getHello(Empty).map(_.asJson)) + + case msg @ POST -> Root / "sayHello" => + for { + request <- msg.as[HelloRequest] + response <- Ok(handler.sayHello(request).map(_.asJson)).adaptErrors + } yield response + } +} + +class Fs2GreeterRestService[F[_]: Sync]( + implicit handler: Fs2Greeter[F], + decoderHelloRequest: io.circe.Decoder[HelloRequest], + encoderHelloResponse: io.circe.Encoder[HelloResponse]) + extends Http4sDsl[F] { + + private implicit val requestDecoder: EntityDecoder[F, HelloRequest] = jsonOf[F, HelloRequest] + + def service: HttpRoutes[F] = HttpRoutes.of[F] { + + case msg @ POST -> Root / "sayHellos" => + val requests = msg.asStream[HelloRequest] + Ok(handler.sayHellos(requests).map(_.asJson)) + + case msg @ POST -> Root / "sayHelloAll" => + for { + request <- msg.as[HelloRequest] + responses <- Ok(handler.sayHelloAll(request).asJsonEither) + } yield responses + + case msg @ POST -> Root / "sayHellosAll" => + val requests = msg.asStream[HelloRequest] + Ok(handler.sayHellosAll(requests).asJsonEither) + } +} + +class MonixGreeterRestService[F[_]: ConcurrentEffect]( + implicit handler: MonixGreeter[F], + ec: scala.concurrent.ExecutionContext, + decoderHelloRequest: io.circe.Decoder[HelloRequest], + encoderHelloResponse: io.circe.Encoder[HelloResponse]) + extends Http4sDsl[F] { + + private implicit val requestDecoder: EntityDecoder[F, HelloRequest] = jsonOf[F, HelloRequest] + implicit val scheduler: monix.execution.Scheduler = monix.execution.Scheduler(ec) + + def service: HttpRoutes[F] = HttpRoutes.of[F] { + + case msg @ POST -> Root / "sayHellos" => + val requests = msg.asStream[HelloRequest] + Ok( + handler + .sayHellos(Observable.fromReactivePublisher(requests.toUnicastPublisher)) + .map(_.asJson)) + + case msg @ POST -> Root / "sayHelloAll" => + for { + request <- msg.as[HelloRequest] + responses <- Ok(handler.sayHelloAll(request).toReactivePublisher.toStream.asJsonEither) + } yield responses + + case msg @ POST -> Root / "sayHellosAll" => + val requests = msg.asStream[HelloRequest] + Ok( + handler + .sayHellosAll(Observable.fromReactivePublisher(requests.toUnicastPublisher)) + .toReactivePublisher + .toStream + .asJsonEither) + } +} diff --git a/modules/http/src/test/scala/higherkindness/mu/rpc/http/GreeterRestTests.scala b/modules/http/src/test/scala/higherkindness/mu/rpc/http/GreeterRestTests.scala new file mode 100644 index 000000000..ae9dca4ed --- /dev/null +++ b/modules/http/src/test/scala/higherkindness/mu/rpc/http/GreeterRestTests.scala @@ -0,0 +1,254 @@ +/* + * Copyright 2017-2019 47 Degrees, LLC. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package higherkindness.mu.rpc.http + +import cats.effect.{IO, _} +import fs2.Stream +import fs2.interop.reactivestreams._ +import higherkindness.mu.http.{ResponseError, UnexpectedError} +import higherkindness.mu.rpc.common.RpcBaseTestSuite +import higherkindness.mu.http.implicits._ +import io.circe.Json +import io.circe.generic.auto._ +import io.circe.syntax._ +import monix.reactive.Observable +import org.http4s._ +import org.http4s.circe._ +import org.http4s.client.UnexpectedStatus +import org.http4s.client.blaze.BlazeClientBuilder +import org.http4s.server.blaze._ +import org.scalatest._ +import org.http4s.implicits._ +import org.http4s.server.Router + +import scala.concurrent.duration._ + +class GreeterRestTests extends RpcBaseTestSuite with BeforeAndAfter { + + val Hostname = "localhost" + val Port = 8080 + + val serviceUri: Uri = Uri.unsafeFromString(s"http://$Hostname:$Port") + + val UnaryServicePrefix = "UnaryGreeter" + val Fs2ServicePrefix = "Fs2Greeter" + val MonixServicePrefix = "MonixGreeter" + + implicit val ec = monix.execution.Scheduler.Implicits.global + implicit val cs: ContextShift[IO] = IO.contextShift(ec) + implicit val timer: Timer[IO] = IO.timer(ec) + + implicit val unaryHandlerIO = new UnaryGreeterHandler[IO] + implicit val fs2HandlerIO = new Fs2GreeterHandler[IO] + implicit val monixHandlerIO = new MonixGreeterHandler[IO] + + val unaryService: HttpRoutes[IO] = new UnaryGreeterRestService[IO].service + val fs2Service: HttpRoutes[IO] = new Fs2GreeterRestService[IO].service + val monixService: HttpRoutes[IO] = new MonixGreeterRestService[IO].service + + val server: BlazeServerBuilder[IO] = BlazeServerBuilder[IO] + .bindHttp(Port, Hostname) + .withHttpApp( + Router( + s"/$UnaryServicePrefix" -> unaryService, + s"/$Fs2ServicePrefix" -> fs2Service, + s"/$MonixServicePrefix" -> monixService).orNotFound) + + var serverTask: Fiber[IO, Nothing] = _ + before(serverTask = server.resource.use(_ => IO.never).start.unsafeRunSync()) + after(serverTask.cancel) + + "REST Server" should { + + "serve a GET request" in { + val request = Request[IO](Method.GET, serviceUri / UnaryServicePrefix / "getHello") + val response = BlazeClientBuilder[IO](ec).resource.use(_.expect[Json](request)) + response.unsafeRunSync() shouldBe HelloResponse("hey").asJson + } + + "serve a POST request" in { + val request = Request[IO](Method.POST, serviceUri / UnaryServicePrefix / "sayHello") + val requestBody = HelloRequest("hey").asJson + val response = + BlazeClientBuilder[IO](ec).resource.use(_.expect[Json](request.withEntity(requestBody))) + response.unsafeRunSync() shouldBe HelloResponse("hey").asJson + } + + "return a 400 Bad Request for a malformed unary POST request" in { + val request = Request[IO](Method.POST, serviceUri / UnaryServicePrefix / "sayHello") + val requestBody = "{" + val response = + BlazeClientBuilder[IO](ec).resource.use(_.expect[Json](request.withEntity(requestBody))) + the[UnexpectedStatus] thrownBy response.unsafeRunSync() shouldBe UnexpectedStatus( + Status.BadRequest) + } + + "return a 400 Bad Request for a malformed streaming POST request" in { + val request = Request[IO](Method.POST, serviceUri / Fs2ServicePrefix / "sayHellos") + val requestBody = "{" + val response = + BlazeClientBuilder[IO](ec).resource.use(_.expect[Json](request.withEntity(requestBody))) + the[UnexpectedStatus] thrownBy response.unsafeRunSync() shouldBe UnexpectedStatus( + Status.BadRequest) + } + + } + + val unaryServiceClient: UnaryGreeterRestClient[IO] = + new UnaryGreeterRestClient[IO](serviceUri / UnaryServicePrefix) + val fs2ServiceClient: Fs2GreeterRestClient[IO] = + new Fs2GreeterRestClient[IO](serviceUri / Fs2ServicePrefix) + val monixServiceClient: MonixGreeterRestClient[IO] = + new MonixGreeterRestClient[IO](serviceUri / MonixServicePrefix) + + "REST Service" should { + + "serve a GET request" in { + val response = BlazeClientBuilder[IO](ec).resource.use(unaryServiceClient.getHello()(_)) + response.unsafeRunSync() shouldBe HelloResponse("hey") + } + + "serve a unary POST request" in { + val request = HelloRequest("hey") + val response = + BlazeClientBuilder[IO](ec).resource.use(unaryServiceClient.sayHello(request)(_)) + response.unsafeRunSync() shouldBe HelloResponse("hey") + } + + "handle a raised gRPC exception in a unary POST request" in { + val request = HelloRequest("SRE") + val response = + BlazeClientBuilder[IO](ec).resource.use(unaryServiceClient.sayHello(request)(_)) + the[ResponseError] thrownBy response.unsafeRunSync() shouldBe ResponseError( + Status.BadRequest, + Some("INVALID_ARGUMENT: SRE")) + } + + "handle a raised non-gRPC exception in a unary POST request" in { + val request = HelloRequest("RTE") + val response = + BlazeClientBuilder[IO](ec).resource.use(unaryServiceClient.sayHello(request)(_)) + the[ResponseError] thrownBy response.unsafeRunSync() shouldBe ResponseError( + Status.InternalServerError, + Some("RTE")) + } + + "handle a thrown exception in a unary POST request" in { + val request = HelloRequest("TR") + val response = + BlazeClientBuilder[IO](ec).resource.use(unaryServiceClient.sayHello(request)(_)) + the[ResponseError] thrownBy response.unsafeRunSync() shouldBe ResponseError( + Status.InternalServerError) + } + + "serve a POST request with fs2 streaming request" in { + val requests = Stream(HelloRequest("hey"), HelloRequest("there")) + val response = + BlazeClientBuilder[IO](ec).resource.use(fs2ServiceClient.sayHellos(requests)(_)) + response.unsafeRunSync() shouldBe HelloResponse("hey, there") + } + + "serve a POST request with empty fs2 streaming request" in { + val requests = Stream.empty + val response = + BlazeClientBuilder[IO](ec).resource.use(fs2ServiceClient.sayHellos(requests)(_)) + response.unsafeRunSync() shouldBe HelloResponse("") + } + + "serve a POST request with Observable streaming request" in { + val requests = Observable(HelloRequest("hey"), HelloRequest("there")) + val response = + BlazeClientBuilder[IO](ec).resource.use(monixServiceClient.sayHellos(requests)(_)) + response.unsafeRunSync() shouldBe HelloResponse("hey, there") + } + + "serve a POST request with empty Observable streaming request" in { + val requests = Observable.empty + val response = + BlazeClientBuilder[IO](ec).resource.use(monixServiceClient.sayHellos(requests)(_)) + response.unsafeRunSync() shouldBe HelloResponse("") + } + + "serve a POST request with fs2 streaming response" in { + val request = HelloRequest("hey") + val responses = + BlazeClientBuilder[IO](ec).stream.flatMap(fs2ServiceClient.sayHelloAll(request)(_)) + responses.compile.toList + .unsafeRunSync() shouldBe List(HelloResponse("hey"), HelloResponse("hey")) + } + + "serve a POST request with Observable streaming response" in { + val request = HelloRequest("hey") + val responses = BlazeClientBuilder[IO](ec).stream + .flatMap(monixServiceClient.sayHelloAll(request)(_).toReactivePublisher.toStream[IO]) + responses.compile.toList + .unsafeRunTimed(10.seconds) + .getOrElse(sys.error("Stuck!")) shouldBe List(HelloResponse("hey"), HelloResponse("hey")) + } + + "handle errors with fs2 streaming response" in { + val request = HelloRequest("") + val responses = + BlazeClientBuilder[IO](ec).stream.flatMap(fs2ServiceClient.sayHelloAll(request)(_)) + the[UnexpectedError] thrownBy responses.compile.toList + .unsafeRunSync() should have message "java.lang.IllegalArgumentException: empty greeting" + } + + "handle errors with Observable streaming response" in { + val request = HelloRequest("") + val responses = BlazeClientBuilder[IO](ec).stream + .flatMap(monixServiceClient.sayHelloAll(request)(_).toReactivePublisher.toStream[IO]) + the[UnexpectedError] thrownBy responses.compile.toList + .unsafeRunTimed(10.seconds) + .getOrElse(sys.error("Stuck!")) should have message "java.lang.IllegalArgumentException: empty greeting" + } + + "serve a POST request with bidirectional fs2 streaming" in { + val requests = Stream(HelloRequest("hey"), HelloRequest("there")) + val responses = + BlazeClientBuilder[IO](ec).stream.flatMap(fs2ServiceClient.sayHellosAll(requests)(_)) + responses.compile.toList + .unsafeRunSync() shouldBe List(HelloResponse("hey"), HelloResponse("there")) + } + + "serve an empty POST request with bidirectional fs2 streaming" in { + val requests = Stream.empty + val responses = + BlazeClientBuilder[IO](ec).stream.flatMap(fs2ServiceClient.sayHellosAll(requests)(_)) + responses.compile.toList.unsafeRunSync() shouldBe Nil + } + + "serve a POST request with bidirectional Observable streaming" in { + val requests = Observable(HelloRequest("hey"), HelloRequest("there")) + val responses = BlazeClientBuilder[IO](ec).stream + .flatMap(monixServiceClient.sayHellosAll(requests)(_).toReactivePublisher.toStream[IO]) + responses.compile.toList + .unsafeRunTimed(10.seconds) + .getOrElse(sys.error("Stuck!")) shouldBe List(HelloResponse("hey"), HelloResponse("there")) + } + + "serve an empty POST request with bidirectional Observable streaming" in { + val requests = Observable.empty + val responses = BlazeClientBuilder[IO](ec).stream + .flatMap(monixServiceClient.sayHellosAll(requests)(_).toReactivePublisher.toStream[IO]) + responses.compile.toList + .unsafeRunTimed(10.seconds) + .getOrElse(sys.error("Stuck!")) shouldBe Nil + } + + } +} diff --git a/modules/http/src/test/scala/higherkindness/mu/rpc/http/GreeterServices.scala b/modules/http/src/test/scala/higherkindness/mu/rpc/http/GreeterServices.scala new file mode 100644 index 000000000..7ac15e1f7 --- /dev/null +++ b/modules/http/src/test/scala/higherkindness/mu/rpc/http/GreeterServices.scala @@ -0,0 +1,54 @@ +/* + * Copyright 2017-2019 47 Degrees, LLC. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package higherkindness.mu.rpc.http + +import higherkindness.mu.rpc.protocol._ + +@message final case class HelloRequest(hello: String) + +@message final case class HelloResponse(hello: String) + +// We don't actually need to split the various streaming types into their own services, +// but this allows for more specific dependencies and type constraints (Sync, Async, Effect...) in their implementations. + +@service(Avro) trait UnaryGreeter[F[_]] { + + @http def getHello(request: Empty.type): F[HelloResponse] + + @http def sayHello(request: HelloRequest): F[HelloResponse] +} + +import fs2.Stream +@service(Avro) trait Fs2Greeter[F[_]] { + + @http def sayHellos(requests: Stream[F, HelloRequest]): F[HelloResponse] + + @http def sayHelloAll(request: HelloRequest): Stream[F, HelloResponse] + + @http def sayHellosAll(requests: Stream[F, HelloRequest]): Stream[F, HelloResponse] +} + +import monix.reactive.Observable +@service(Avro) trait MonixGreeter[F[_]] { + + @http def sayHellos(requests: Observable[HelloRequest]): F[HelloResponse] + + @http def sayHelloAll(request: HelloRequest): Observable[HelloResponse] + + @http def sayHellosAll(requests: Observable[HelloRequest]): Observable[HelloResponse] + +} diff --git a/modules/internal/src/main/scala/higherkindness/mu/rpc/internal/serviceImpl.scala b/modules/internal/src/main/scala/higherkindness/mu/rpc/internal/serviceImpl.scala index f690bcd4b..830c1c138 100644 --- a/modules/internal/src/main/scala/higherkindness/mu/rpc/internal/serviceImpl.scala +++ b/modules/internal/src/main/scala/higherkindness/mu/rpc/internal/serviceImpl.scala @@ -28,6 +28,70 @@ object serviceImpl { import c.universe._ import Flag._ + abstract class TypeTypology(tpe: Tree, inner: Option[Tree]) extends Product with Serializable { + def getTpe: Tree = tpe + def getInner: Option[Tree] = inner + def safeInner: Tree = inner.getOrElse(tpe) + def safeType: Tree = tpe match { + case tq"$s[..$tpts]" if isStreaming => tpts.last + case other => other + } + def flatName: String = safeInner.toString + + def isEmpty: Boolean = this match { + case _: EmptyTpe => true + case _ => false + } + + def isStreaming: Boolean = this match { + case _: Fs2StreamTpe => true + case _: MonixObservableTpe => true + case _ => false + } + } + object TypeTypology { + def apply(t: Tree): TypeTypology = t match { + case tq"Observable[..$tpts]" => MonixObservableTpe(t, tpts.headOption) + case tq"Stream[$carrier, ..$tpts]" => Fs2StreamTpe(t, tpts.headOption) + case tq"Empty.type" => EmptyTpe(t) + case tq"$carrier[..$tpts]" => UnaryTpe(t, tpts.headOption) + } + } + case class EmptyTpe(tpe: Tree) extends TypeTypology(tpe, None) + case class UnaryTpe(tpe: Tree, inner: Option[Tree]) extends TypeTypology(tpe, inner) + case class Fs2StreamTpe(tpe: Tree, inner: Option[Tree]) extends TypeTypology(tpe, inner) + case class MonixObservableTpe(tpe: Tree, inner: Option[Tree]) extends TypeTypology(tpe, inner) + + case class Operation(name: TermName, request: TypeTypology, response: TypeTypology) { + + val isStreaming: Boolean = request.isStreaming || response.isStreaming + + val streamingType: Option[StreamingType] = (request.isStreaming, response.isStreaming) match { + case (true, true) => Some(BidirectionalStreaming) + case (true, false) => Some(RequestStreaming) + case (false, true) => Some(ResponseStreaming) + case _ => None + } + + val validStreamingComb: Boolean = (request, response) match { + case (Fs2StreamTpe(_, _), MonixObservableTpe(_, _)) => false + case (MonixObservableTpe(_, _), Fs2StreamTpe(_, _)) => false + case _ => true + } + + require( + validStreamingComb, + s"RPC service $name has different streaming implementations for request and response") + + val isMonixObservable: Boolean = List(request, response).collect { + case m: MonixObservableTpe => m + }.nonEmpty + + val prevalentStreamingTarget: TypeTypology = + if (streamingType.contains(ResponseStreaming)) response else request + + } + trait SupressWarts[T] { def supressWarts(warts: String*)(t: T): T } @@ -117,7 +181,10 @@ object serviceImpl { params <- d.vparamss _ = require(params.length == 1, s"RPC call ${d.name} has more than one request parameter") p <- params.headOption.toList - } yield RpcRequest(d.name, p.tpt, d.tpt, compressionType(serviceDef.mods.annotations)) + } yield + RpcRequest( + Operation(d.name, TypeTypology(p.tpt), TypeTypology(d.tpt)), + compressionType(serviceDef.mods.annotations)) val imports: List[Tree] = defs.collect { case imp: Import => imp @@ -259,40 +326,25 @@ object serviceImpl { .getOrElse(if (params.isDefinedAt(pos)) params(pos).toString else default.getOrElse(sys.error(s"Missing annotation parameter $name"))) + private def findAnnotation(mods: Modifiers, name: String): Option[Tree] = + mods.annotations find { + case Apply(Select(New(Ident(TypeName(`name`))), _), _) => true + case Apply(Select(New(Select(_, TypeName(`name`))), _), _) => true + case _ => false + } + //todo: validate that the request and responses are case classes, if possible case class RpcRequest( - methodName: TermName, - requestType: Tree, - responseType: Tree, + operation: Operation, compressionOption: Tree ) { - private val requestStreamingImpl: Option[StreamingImpl] = streamingImplFor(requestType) - private val responseStreamingImpl: Option[StreamingImpl] = streamingImplFor(responseType) - private val streamingImpls: Set[StreamingImpl] = - Set(requestStreamingImpl, responseStreamingImpl).flatten - require( - streamingImpls.size < 2, - s"RPC service $serviceName has different streaming implementations for request and response") - private val streamingImpl: Option[StreamingImpl] = streamingImpls.headOption - - private val streamingType: Option[StreamingType] = - if (requestStreamingImpl.isDefined && responseStreamingImpl.isDefined) - Some(BidirectionalStreaming) - else if (requestStreamingImpl.isDefined) Some(RequestStreaming) - else if (responseStreamingImpl.isDefined) Some(ResponseStreaming) - else None - - private def streamingImplFor(t: Tree): Option[StreamingImpl] = t match { - case tq"$tpt[..$tpts]" if tpt.toString.endsWith("Observable") => Some(MonixObservable) - case tq"$tpt[..$tpts]" if tpt.toString.endsWith("Stream") => Some(Fs2Stream) - case _ => None - } + import operation._ - private val clientCallsImpl = streamingImpl match { - case Some(Fs2Stream) => q"_root_.higherkindness.mu.rpc.internal.client.fs2Calls" - case Some(MonixObservable) => q"_root_.higherkindness.mu.rpc.internal.client.monixCalls" - case None => q"_root_.higherkindness.mu.rpc.internal.client.unaryCalls" + private val clientCallsImpl = prevalentStreamingTarget match { + case _: Fs2StreamTpe => q"_root_.higherkindness.mu.rpc.internal.client.fs2Calls" + case _: MonixObservableTpe => q"_root_.higherkindness.mu.rpc.internal.client.monixCalls" + case _ => q"_root_.higherkindness.mu.rpc.internal.client.unaryCalls" } private val streamingMethodType = { @@ -305,15 +357,11 @@ object serviceImpl { q"_root_.io.grpc.MethodDescriptor.MethodType.${TermName(suffix)}" } - private val methodDescriptorName = TermName(methodName + "MethodDescriptor") + private val methodDescriptorName = TermName(name + "MethodDescriptor") - private val reqType = requestType match { - case tq"$s[..$tpts]" if requestStreamingImpl.isDefined => tpts.last - case other => other - } - private val respType = responseType match { - case tq"$x[..$tpts]" => tpts.last - } + private val reqType = request.safeType + + private val respType = response.safeInner val methodDescriptor: DefDef = q""" def $methodDescriptorName(implicit @@ -327,7 +375,7 @@ object serviceImpl { .setType($streamingMethodType) .setFullMethodName( _root_.io.grpc.MethodDescriptor.generateFullMethodName(${lit(serviceName)}, ${lit( - methodName)})) + name)})) .build() } """.supressWarts("Null", "ExplicitImplicitTypes") @@ -338,47 +386,242 @@ object serviceImpl { val clientDef: Tree = streamingType match { case Some(RequestStreaming) => q""" - def $methodName(input: $requestType): $responseType = ${clientCallMethodFor( + def $name(input: ${request.getTpe}): ${response.getTpe} = ${clientCallMethodFor( "clientStreaming")}""" case Some(ResponseStreaming) => q""" - def $methodName(input: $requestType): $responseType = ${clientCallMethodFor( + def $name(input: ${request.getTpe}): ${response.getTpe} = ${clientCallMethodFor( "serverStreaming")}""" case Some(BidirectionalStreaming) => q""" - def $methodName(input: $requestType): $responseType = ${clientCallMethodFor( + def $name(input: ${request.getTpe}): ${response.getTpe} = ${clientCallMethodFor( "bidiStreaming")}""" case None => q""" - def $methodName(input: $requestType): $responseType = ${clientCallMethodFor("unary")}""" + def $name(input: ${request.getTpe}): ${response.getTpe} = ${clientCallMethodFor("unary")}""" } private def serverCallMethodFor(serverMethodName: String) = - q"_root_.higherkindness.mu.rpc.internal.server.monixCalls.${TermName(serverMethodName)}(algebra.$methodName, $compressionOption)" + q"_root_.higherkindness.mu.rpc.internal.server.monixCalls.${TermName(serverMethodName)}(algebra.$name, $compressionOption)" val descriptorAndHandler: Tree = { - val handler = (streamingType, streamingImpl) match { - case (Some(RequestStreaming), Some(Fs2Stream)) => - q"_root_.higherkindness.mu.rpc.internal.server.fs2Calls.clientStreamingMethod(algebra.$methodName, $compressionOption)" - case (Some(RequestStreaming), Some(MonixObservable)) => + val handler = (streamingType, prevalentStreamingTarget) match { + case (Some(RequestStreaming), Fs2StreamTpe(_, _)) => + q"_root_.higherkindness.mu.rpc.internal.server.fs2Calls.clientStreamingMethod(algebra.$name, $compressionOption)" + case (Some(RequestStreaming), MonixObservableTpe(_, _)) => q"_root_.io.grpc.stub.ServerCalls.asyncClientStreamingCall(${serverCallMethodFor("clientStreamingMethod")})" - case (Some(ResponseStreaming), Some(Fs2Stream)) => - q"_root_.higherkindness.mu.rpc.internal.server.fs2Calls.serverStreamingMethod(algebra.$methodName, $compressionOption)" - case (Some(ResponseStreaming), Some(MonixObservable)) => + case (Some(ResponseStreaming), Fs2StreamTpe(_, _)) => + q"_root_.higherkindness.mu.rpc.internal.server.fs2Calls.serverStreamingMethod(algebra.$name, $compressionOption)" + case (Some(ResponseStreaming), MonixObservableTpe(_, _)) => q"_root_.io.grpc.stub.ServerCalls.asyncServerStreamingCall(${serverCallMethodFor("serverStreamingMethod")})" - case (Some(BidirectionalStreaming), Some(Fs2Stream)) => - q"_root_.higherkindness.mu.rpc.internal.server.fs2Calls.bidiStreamingMethod(algebra.$methodName, $compressionOption)" - case (Some(BidirectionalStreaming), Some(MonixObservable)) => + case (Some(BidirectionalStreaming), Fs2StreamTpe(_, _)) => + q"_root_.higherkindness.mu.rpc.internal.server.fs2Calls.bidiStreamingMethod(algebra.$name, $compressionOption)" + case (Some(BidirectionalStreaming), MonixObservableTpe(_, _)) => q"_root_.io.grpc.stub.ServerCalls.asyncBidiStreamingCall(${serverCallMethodFor("bidiStreamingMethod")})" - case (None, None) => - q"_root_.io.grpc.stub.ServerCalls.asyncUnaryCall(_root_.higherkindness.mu.rpc.internal.server.unaryCalls.unaryMethod(algebra.$methodName, $compressionOption))" + case (None, _) => + q"_root_.io.grpc.stub.ServerCalls.asyncUnaryCall(_root_.higherkindness.mu.rpc.internal.server.unaryCalls.unaryMethod(algebra.$name, $compressionOption))" case _ => sys.error( - s"Unable to define a handler for the streaming type $streamingType and $streamingImpl for the method $methodName in the service $serviceName") + s"Unable to define a handler for the streaming type $streamingType and $prevalentStreamingTarget for the method $name in the service $serviceName") } q"($methodDescriptorName, $handler)" } } + + case class HttpOperation(operation: Operation) { + + import operation._ + + val uri = name.toString + + val method: TermName = request match { + case _: EmptyTpe => TermName("GET") + case _ => TermName("POST") + } + + val executionClient: Tree = response match { + case MonixObservableTpe(_, _) => + q"_root_.monix.reactive.Observable.fromReactivePublisher(client.stream(request).flatMap(_.asStream[${response.safeInner}]).toUnicastPublisher)" + case Fs2StreamTpe(_, _) => + q"client.stream(request).flatMap(_.asStream[${response.safeInner}])" + case _ => + q"""client.expectOr[${response.safeInner}](request)(handleResponseError)(jsonOf[F, ${response.safeInner}])""" + } + + val requestTypology: Tree = request match { + case _: UnaryTpe => + q"val request = _root_.org.http4s.Request[F](_root_.org.http4s.Method.$method, uri / ${uri + .replace("\"", "")}).withEntity(req.asJson)" + case _: Fs2StreamTpe => + q"val request = _root_.org.http4s.Request[F](_root_.org.http4s.Method.$method, uri / ${uri + .replace("\"", "")}).withEntity(req.map(_.asJson))" + case _: MonixObservableTpe => + q"val request = _root_.org.http4s.Request[F](_root_.org.http4s.Method.$method, uri / ${uri + .replace("\"", "")}).withEntity(req.toReactivePublisher.toStream.map(_.asJson))" + case _ => + q"val request = _root_.org.http4s.Request[F](_root_.org.http4s.Method.$method, uri / ${uri + .replace("\"", "")})" + } + + val responseEncoder = + q"""implicit val responseEntityDecoder: _root_.org.http4s.EntityDecoder[F, ${response.safeInner}] = jsonOf[F, ${response.safeInner}]""" + + def toRequestTree: Tree = request match { + case _: EmptyTpe => + q"""def $name(client: _root_.org.http4s.client.Client[F])( + implicit responseDecoder: _root_.io.circe.Decoder[${response.safeInner}]): ${response.getTpe} = { + $responseEncoder + $requestTypology + $executionClient + }""" + case _ => + q"""def $name(req: ${request.getTpe})(client: _root_.org.http4s.client.Client[F])( + implicit requestEncoder: _root_.io.circe.Encoder[${request.safeInner}], + responseDecoder: _root_.io.circe.Decoder[${response.safeInner}] + ): ${response.getTpe} = { + $responseEncoder + $requestTypology + $executionClient + }""" + } + + val routeTypology: Tree = (request, response) match { + case (_: Fs2StreamTpe, _: UnaryTpe) => + q"""val requests = msg.asStream[${operation.request.safeInner}] + _root_.org.http4s.Status.Ok.apply(handler.${operation.name}(requests).map(_.asJson))""" + + case (_: UnaryTpe, _: Fs2StreamTpe) => + q"""for { + request <- msg.as[${operation.request.safeInner}] + responses <- _root_.org.http4s.Status.Ok.apply(handler.${operation.name}(request).asJsonEither) + } yield responses""" + + case (_: Fs2StreamTpe, _: Fs2StreamTpe) => + q"""val requests = msg.asStream[${operation.request.safeInner}] + _root_.org.http4s.Status.Ok.apply(handler.${operation.name}(requests).asJsonEither)""" + + case (_: MonixObservableTpe, _: UnaryTpe) => + q"""val requests = msg.asStream[${operation.request.safeInner}] + _root_.org.http4s.Status.Ok.apply(handler.${operation.name}(_root_.monix.reactive.Observable.fromReactivePublisher(requests.toUnicastPublisher)).map(_.asJson))""" + + case (_: UnaryTpe, _: MonixObservableTpe) => + q"""for { + request <- msg.as[${operation.request.safeInner}] + responses <- _root_.org.http4s.Status.Ok.apply(handler.${operation.name}(request).toReactivePublisher.toStream.asJsonEither) + } yield responses""" + + case (_: MonixObservableTpe, _: MonixObservableTpe) => + q"""val requests = msg.asStream[${operation.request.safeInner}] + _root_.org.http4s.Status.Ok.apply(handler.${operation.name}(_root_.monix.reactive.Observable.fromReactivePublisher(requests.toUnicastPublisher)).toReactivePublisher.toStream.asJsonEither)""" + + case (_: EmptyTpe, _) => + q"""_root_.org.http4s.Status.Ok.apply(handler.${operation.name}(_root_.higherkindness.mu.rpc.protocol.Empty).map(_.asJson))""" + + case _ => + q"""for { + request <- msg.as[${operation.request.safeInner}] + response <- _root_.org.http4s.Status.Ok.apply(handler.${operation.name}(request).map(_.asJson)).adaptErrors + } yield response""" + } + + val getPattern = + pq"_root_.org.http4s.Method.GET -> _root_.org.http4s.dsl.impl.Root / ${operation.name.toString}" + val postPattern = + pq"msg @ _root_.org.http4s.Method.POST -> _root_.org.http4s.dsl.impl.Root / ${operation.name.toString}" + + def toRouteTree: Tree = request match { + case _: EmptyTpe => cq"$getPattern => $routeTypology" + case _ => cq"$postPattern => $routeTypology" + } + + } + + val operations: List[HttpOperation] = for { + d <- rpcDefs.collect { case x if findAnnotation(x.mods, "http").isDefined => x } + args <- findAnnotation(d.mods, "http").collect({ case Apply(_, args) => args }).toList + params <- d.vparamss + _ = require(params.length == 1, s"RPC call ${d.name} has more than one request parameter") + p <- params.headOption.toList + } yield HttpOperation(Operation(d.name, TypeTypology(p.tpt), TypeTypology(d.tpt))) + + val streamConstraints: List[Tree] = operations + .find(_.operation.isMonixObservable) + .fold(List(q"F: _root_.cats.effect.Sync[$F]"))( + _ => + List( + q"F: _root_.cats.effect.ConcurrentEffect[$F]", + q"ec: _root_.scala.concurrent.ExecutionContext" + )) + + val executionContextStreaming: List[Tree] = operations + .find(_.operation.isMonixObservable) + .fold(List.empty[Tree])(_ => + List(q"implicit val sc: _root_.monix.execution.Scheduler = _root_.monix.execution.Scheduler(ec)")) + + val httpRequests = operations.map(_.toRequestTree) + + val HttpClient = TypeName("HttpClient") + val httpClientClass = q""" + class $HttpClient[$F_](uri: _root_.org.http4s.Uri)(implicit ..$streamConstraints) { + ..$executionContextStreaming + ..$httpRequests + }""" + + val httpClient = q""" + def httpClient[$F_](uri: _root_.org.http4s.Uri) + (implicit ..$streamConstraints): $HttpClient[$F] = { + new $HttpClient[$F](uri / ${serviceDef.name.toString}) + }""" + + val httpImports: List[Tree] = List( + q"import _root_.higherkindness.mu.http.implicits._", + q"import _root_.fs2.interop.reactivestreams._", + q"import _root_.cats.syntax.flatMap._", + q"import _root_.cats.syntax.functor._", + q"import _root_.org.http4s.circe._", + q"import _root_.io.circe.syntax._" + ) + + val httpRoutesCases: Seq[Tree] = operations.map(_.toRouteTree) + + val routesPF: Tree = q"{ case ..$httpRoutesCases }" + + val requestTypes: Set[String] = + operations.filterNot(_.operation.request.isEmpty).map(_.operation.request.flatName).toSet + + val responseTypes: Set[String] = + operations.filterNot(_.operation.response.isEmpty).map(_.operation.response.flatName).toSet + + val requestDecoders = + requestTypes.map(n => + q"""implicit private val ${TermName("entityDecoder" + n)}:_root_.org.http4s.EntityDecoder[F, ${TypeName( + n)}] = jsonOf[F, ${TypeName(n)}]""") + + val HttpRestService: TypeName = TypeName(serviceDef.name.toString + "RestService") + + val arguments: List[Tree] = List(q"handler: ${serviceDef.name}[F]") ++ + requestTypes.map(n => + q"${TermName("decoder" + n)}: _root_.io.circe.Decoder[${TypeName(n)}]") ++ + responseTypes.map(n => + q"${TermName("encoder" + n)}: _root_.io.circe.Encoder[${TypeName(n)}]") ++ + streamConstraints + + val httpRestServiceClass: Tree = q""" + class $HttpRestService[$F_](implicit ..$arguments) extends _root_.org.http4s.dsl.Http4sDsl[F] { + ..$requestDecoders + ..$executionContextStreaming + def service = _root_.org.http4s.HttpRoutes.of[F]{$routesPF} + }""" + + val httpService = q""" + def route[$F_](implicit ..$arguments): _root_.higherkindness.mu.http.RouteMap[F] = { + _root_.higherkindness.mu.http.RouteMap[F](${serviceDef.name.toString}, new $HttpRestService[$F].service) + }""" + + val http = + if (httpRequests.isEmpty) Nil + else + httpImports ++ List(httpClientClass, httpClient, httpRestServiceClass, httpService) } val classAndMaybeCompanion = annottees.map(_.tree) @@ -420,9 +663,10 @@ object serviceImpl { service.clientFromChannel, service.unsafeClient, service.unsafeClientFromChannel - ) + ) ++ service.http ) ) + List(serviceDef, enrichedCompanion) case _ => sys.error("@service-annotated definition must be a trait or abstract class") } @@ -430,8 +674,4 @@ object serviceImpl { } } -sealed trait StreamingImpl extends Product with Serializable -case object Fs2Stream extends StreamingImpl -case object MonixObservable extends StreamingImpl - // $COVERAGE-ON$ diff --git a/modules/prometheus/client/src/test/scala/higherkindness/mu/rpc/prometheus/client/BaseMonitorClientInterceptorTests.scala b/modules/prometheus/client/src/test/scala/higherkindness/mu/rpc/prometheus/client/BaseMonitorClientInterceptorTests.scala index c390205c6..758a04206 100644 --- a/modules/prometheus/client/src/test/scala/higherkindness/mu/rpc/prometheus/client/BaseMonitorClientInterceptorTests.scala +++ b/modules/prometheus/client/src/test/scala/higherkindness/mu/rpc/prometheus/client/BaseMonitorClientInterceptorTests.scala @@ -307,8 +307,6 @@ abstract class BaseMonitorClientInterceptorTests extends RpcBaseTestSuite { "work when combining multiple calls" in { - ignoreOnTravis("TODO: restore once https://github.com/higherkindness/mu/issues/168 is fixed") - def unary[F[_]](implicit APP: MyRPCClient[F]): F[C] = APP.u(a1.x, a1.y) @@ -327,12 +325,13 @@ abstract class BaseMonitorClientInterceptorTests extends RpcBaseTestSuite { import clientRuntime._ (for { - _ <- serverStart[ConcurrentMonad] - _ <- unary[ConcurrentMonad] - _ <- clientStreaming[ConcurrentMonad] - assertion <- check - _ <- serverStop[ConcurrentMonad] - } yield assertion).unsafeRunSync() + _ <- serverStart[ConcurrentMonad] + _ <- unary[ConcurrentMonad] + _ <- clientStreaming[ConcurrentMonad] + _ <- serverStop[ConcurrentMonad] + } yield ()).unsafeRunSync() + + check.unsafeRunSync() } diff --git a/project/ProjectPlugin.scala b/project/ProjectPlugin.scala index 960a0382e..cc67afeae 100644 --- a/project/ProjectPlugin.scala +++ b/project/ProjectPlugin.scala @@ -34,6 +34,7 @@ object ProjectPlugin extends AutoPlugin { val fs2Grpc: String = "0.4.0-M6" val grpc: String = "1.18.0" val jodaTime: String = "2.10.1" + val http4s = "0.20.0-M6" val kindProjector: String = "0.9.9" val log4s: String = "1.7.0" val logback: String = "1.2.3" @@ -127,6 +128,19 @@ object ProjectPlugin extends AutoPlugin { ) ) + lazy val httpSettings: Seq[Def.Setting[_]] = Seq( + libraryDependencies ++= Seq( + %%("http4s-dsl", V.http4s), + %%("http4s-blaze-server", V.http4s), + %%("http4s-circe", V.http4s), + "co.fs2" %% "fs2-reactive-streams" % V.reactiveStreams, + %%("monix", V.monix), + %%("http4s-blaze-client", V.http4s) % Test, + %%("circe-generic") % Test, + "ch.qos.logback" % "logback-classic" % V.logback % Test + ) + ) + lazy val configSettings: Seq[Def.Setting[_]] = Seq( libraryDependencies ++= Seq( %%("pureconfig", V.pureconfig)