Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for Monix Task and Observable #214

Merged
merged 4 commits into from
Feb 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ lazy val root = project
.enablePlugins(ScalaJSPlugin)
.settings(skip in publish := true)
.settings(historyPath := None)
.aggregate(coreJVM, coreJS, http4s, akkaHttp, catsInteropJVM, catsInteropJS, codegen)
.aggregate(coreJVM, coreJS, http4s, akkaHttp, catsInteropJVM, catsInteropJS, monixInterop, codegen)

lazy val core = crossProject(JSPlatform, JVMPlatform)
.crossType(CrossType.Pure)
Expand Down Expand Up @@ -107,6 +107,19 @@ lazy val catsInterop = crossProject(JSPlatform, JVMPlatform)
lazy val catsInteropJVM = catsInterop.jvm
lazy val catsInteropJS = catsInterop.js

lazy val monixInterop = project
.in(file("interop/monix"))
.settings(name := "caliban-monix")
.settings(commonSettings)
.settings(
libraryDependencies ++= Seq(
"dev.zio" %% "zio-interop-reactivestreams" % "1.0.3.5-RC3",
"dev.zio" %% "zio-interop-cats" % "2.0.0.0-RC10",
yoohaemin marked this conversation as resolved.
Show resolved Hide resolved
"io.monix" %% "monix" % "3.1.0"
)
)
.dependsOn(coreJVM)

lazy val http4s = project
.in(file("http4s"))
.settings(name := "caliban-http4s")
Expand Down Expand Up @@ -151,7 +164,7 @@ lazy val examples = project
.in(file("examples"))
.settings(commonSettings)
.settings(skip in publish := true)
.dependsOn(akkaHttp, http4s, catsInteropJVM)
.dependsOn(akkaHttp, http4s, catsInteropJVM, monixInterop)

lazy val benchmarks = project
.in(file("benchmarks"))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package caliban.interop.monix

import caliban.GraphQL.graphQL
import caliban.ResponseValue.{ ObjectValue, StreamValue }
import caliban.RootResolver
import cats.effect.ExitCode
import monix.eval.{ Task, TaskApp }
import monix.reactive.Observable
import zio.DefaultRuntime
import zio.interop.reactiveStreams._

object ExampleMonixInterop extends TaskApp {

import caliban.interop.monix.implicits._

implicit val runtime: DefaultRuntime = new DefaultRuntime {}

case class Number(value: Int)
case class Queries(numbers: List[Number], randomNumber: Task[Number])
case class Subscriptions(numbers: Observable[Int])

val numbers = List(1, 2, 3, 4).map(Number)
val randomNumber = Task.eval(scala.util.Random.nextInt()).map(Number)
val queries = Queries(numbers, randomNumber)

val subscriptions = Subscriptions(Observable.fromIterable(List(1, 2, 3)))
val api = graphQL(RootResolver(queries, Option.empty[Unit], Some(subscriptions)))
val interpreter = api.interpreter

val query = """
{
numbers {
value
}

randomNumber {
value
}
}"""

val subscription = """
subscription {
numbers
}"""

override def run(args: List[String]): Task[ExitCode] =
for {
_ <- api.checkAsync(query)
result <- interpreter.executeAsync(query)
_ <- Task.eval(println(result.data))

_ <- api.checkAsync(subscription)
result <- interpreter.executeAsync(subscription)
_ <- result.data match {
case ObjectValue(("numbers", StreamValue(stream)) :: Nil) =>
// get back an observable
val obs = Observable.fromReactivePublisher(runtime.unsafeRun(stream.toPublisher))
obs.foreachL(println)
case _ => Task.eval(println(s"Wrong result: ${result.data}"))
}
} yield ExitCode.Success
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package caliban.interop.monix

import caliban.introspection.adt.__Type
import caliban.schema.Step.{ QueryStep, StreamStep }
import caliban.schema.{ Schema, Step }
import caliban.{ GraphQL, GraphQLInterpreter, GraphQLResponse, InputValue }
import cats.effect.ConcurrentEffect
import monix.eval.{ Task => MonixTask }
import monix.reactive.Observable
import zio._
import zio.interop.catz._
import zio.interop.reactiveStreams._
import zio.stream.ZStream
import zquery.ZQuery

object MonixInterop {

def executeAsync[R, E](graphQL: GraphQLInterpreter[R, E])(
query: String,
operationName: Option[String] = None,
variables: Map[String, InputValue] = Map(),
skipValidation: Boolean = false
)(implicit runtime: Runtime[R]): MonixTask[GraphQLResponse[E]] =
MonixTask.async { cb =>
val execution = graphQL.execute(query, operationName, variables, skipValidation)
runtime.unsafeRunAsync(execution)(exit => cb(exit.toEither))
}

def checkAsync[R](graphQL: GraphQL[R])(query: String)(implicit runtime: Runtime[R]): MonixTask[Unit] =
MonixTask.async { cb =>
runtime.unsafeRunAsync(graphQL.check(query))(exit => cb(exit.toEither))
}

def taskSchema[R, A](implicit ev: Schema[R, A], ev2: ConcurrentEffect[MonixTask]): Schema[R, MonixTask[A]] =
new Schema[R, MonixTask[A]] {
override def toType(isInput: Boolean): __Type = ev.toType(isInput)
override def optional: Boolean = ev.optional
override def resolve(value: MonixTask[A]): Step[R] =
QueryStep(ZQuery.fromEffect(value.to[Task].map(ev.resolve)))
}

def observableSchema[R, A](
queueSize: Int
)(implicit ev: Schema[R, A], ev2: ConcurrentEffect[MonixTask]): Schema[R, Observable[A]] =
new Schema[R, Observable[A]] {
override def optional: Boolean = ev.optional
override def toType(isInput: Boolean = false): __Type = ev.toType(isInput)
override def resolve(value: Observable[A]): Step[R] =
StreamStep(
ZStream.flatten(
ZStream.fromEffect(
MonixTask
.deferAction(
implicit sc => MonixTask.eval(value.toReactivePublisher.toStream(queueSize).map(ev.resolve))
)
.to[Task]
)
)
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package caliban.interop.monix

import caliban.schema.{ Schema, SubscriptionSchema }
import caliban.{ GraphQL, GraphQLInterpreter, GraphQLResponse, InputValue }
import cats.effect.ConcurrentEffect
import monix.eval.Task
import monix.reactive.Observable
import zio.Runtime

package object implicits {

implicit class MonixGraphQLInterpreter[R, E](underlying: GraphQLInterpreter[R, E]) {

def executeAsync(
query: String,
operationName: Option[String] = None,
variables: Map[String, InputValue] = Map(),
skipValidation: Boolean = false
)(implicit runtime: Runtime[R]): Task[GraphQLResponse[E]] =
MonixInterop.executeAsync(underlying)(
query,
operationName,
variables,
skipValidation
)
}

implicit class MonixGraphQL[R, E](underlying: GraphQL[R]) {

def checkAsync(query: String)(implicit runtime: Runtime[R]): Task[Unit] =
MonixInterop.checkAsync(underlying)(query)
}

implicit def effectSchema[R, A](implicit ev: Schema[R, A], ev2: ConcurrentEffect[Task]): Schema[R, Task[A]] =
MonixInterop.taskSchema

implicit def observableSchema[R, A](
implicit ev: Schema[R, A],
ev2: ConcurrentEffect[Task]
): Schema[R, Observable[A]] =
MonixInterop.observableSchema(16) // Size of the internal buffer. Use a power of 2 for best performance.

implicit def observableSubscriptionSchema[A]: SubscriptionSchema[Observable[A]] =
new SubscriptionSchema[Observable[A]] {}
}
5 changes: 3 additions & 2 deletions vuepress/docs/docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ libraryDependencies += "com.github.ghostdogpr" %% "caliban" % "0.5.1"
The following modules are optional:

```
libraryDependencies += "com.github.ghostdogpr" %% "caliban-http4s" % "0.5.1" // routes for http4s
libraryDependencies += "com.github.ghostdogpr" %% "caliban-http4s" % "0.5.1" // routes for http4s
libraryDependencies += "com.github.ghostdogpr" %% "caliban-akka-http" % "0.5.1" // routes for akka-http
libraryDependencies += "com.github.ghostdogpr" %% "caliban-cats" % "0.5.1" // interop with cats effect
libraryDependencies += "com.github.ghostdogpr" %% "caliban-cats" % "0.5.1" // interop with cats effect
libraryDependencies += "com.github.ghostdogpr" %% "caliban-monix" % "0.5.1" // interop with monix
```

Note that Caliban is also available for ScalaJS.
Expand Down
49 changes: 47 additions & 2 deletions vuepress/docs/docs/interop.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# Interop (Cats)
# Interop (Cats, Monix)

If you prefer using [Monix](https://github.com/monix/monix) or [Cats IO](https://github.com/typelevel/cats-effect) rather than ZIO, you can use the `caliban-cats` module.
If you prefer using [Cats Effect](https://github.com/typelevel/cats-effect) or [Monix](https://github.com/monix/monix) rather than ZIO, you can use the respective `caliban-cats` and `caliban-monix` modules.

## Cats Effect
You first need to import `caliban.interop.cats.implicits._` and have an implicit `zio.Runtime` in scope. Then a few helpers are available:

- the GraphQL object is enriched with `executeAsync` and `checkAsync`, variants of `execute` and `check` that return an `F[_]: Async` instead of a `ZIO`.
Expand Down Expand Up @@ -42,3 +43,47 @@ object ExampleCatsInterop extends IOApp {
```

You can find this example within the [examples](https://github.com/ghostdogpr/caliban/blob/master/examples/src/main/scala/caliban/interop/cats/ExampleCatsInterop.scala) project.

## Monix
You first need to import `caliban.interop.monix.implicits._` and have an implicit `zio.Runtime` in scope. Then a few helpers are available:

- the GraphQL object is enriched with `executeAsync` and `checkAsync`, variants of `execute` and `check` that return a Monix `Task` instead of a `ZIO`.

In addition to that, a `Schema` for any Monix `Task` as well as `Observable` is provided.

The following example shows how to create an interpreter and run a query while only using Monix Task.

```scala
import caliban.GraphQL.graphQL
import caliban.RootResolver
import caliban.interop.monix.implicits._
import cats.effect.ExitCode
import monix.eval.{ Task, TaskApp }
import monix.execution.Scheduler
import zio.DefaultRuntime

object ExampleMonixInterop extends TaskApp {

implicit val runtime = new DefaultRuntime {}
implicit val monixScheduler: Scheduler = scheduler

case class Queries(numbers: List[Int], randomNumber: Task[Int])

val queries = Queries(List(1, 2, 3, 4), Task.eval(scala.util.Random.nextInt()))
val interpreter = graphQL(RootResolver(queries)).interpreter

val query = """
{
numbers
randomNumber
}"""

override def run(args: List[String]): Task[ExitCode] =
for {
result <- interpreter.executeAsync(query)
_ <- Task.eval(println(result.data))
} yield ExitCode.Success
}
```

You can find this example within the [examples](https://github.com/ghostdogpr/caliban/blob/master/examples/src/main/scala/caliban/interop/monix/ExampleMonixInterop.scala) project.