Skip to content

Commit

Permalink
Prototype of google cloud pub/sub connector using akka-grpc
Browse files Browse the repository at this point in the history
  • Loading branch information
2m committed Mar 16, 2018
1 parent 6fcbda3 commit 5048e1e
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 0 deletions.
8 changes: 8 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ lazy val modules: Seq[ProjectReference] = Seq(
ftp,
geode,
googleCloudPubSub,
googleCloudPubSubGrpc,
hbase,
ironmq,
jms,
Expand Down Expand Up @@ -102,6 +103,13 @@ lazy val googleCloudPubSub = alpakkaProject(
parallelExecution in Test := false
)

lazy val googleCloudPubSubGrpc = alpakkaProject(
"google-cloud-pub-sub-grpc",
"googlecloud.pubsub.grpc",
Dependencies.GooglePubSubGrpc,
PB.protoSources in Compile += target.value / "protobuf_external"

This comment has been minimized.

Copy link
@raboof

raboof Mar 16, 2018

Perhaps we should add this by default from the AkkaGrpcPlugin? (added akka#109)

).enablePlugins(AkkaGrpcPlugin, JavaAgent)

lazy val hbase = alpakkaProject("hbase", "hbase", Dependencies.HBase, fork in Test := true)

lazy val ironmq = alpakkaProject("ironmq", "ironmq", Dependencies.IronMq)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright (C) 2016-2018 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.stream.alpakka.googlecloud.pubsub

import akka.stream.Materializer
import com.google.auth.Credentials
import com.google.pubsub.v1.pubsub.{PublisherClient, SubscriberClient}
import io.grpc.auth.MoreCallCredentials
import io.grpc.{CallOptions, ManagedChannelBuilder}

import scala.concurrent.ExecutionContext

private object GrpcApi extends GrpcApi {
final val DefaultPubSubGoogleApisHost = "pubsub.googleapis.com"
}

private trait GrpcApi {
import GrpcApi._

private val channel = ManagedChannelBuilder.forTarget(DefaultPubSubGoogleApisHost).build()

def publisher(credentials: Credentials)(implicit mat: Materializer, ec: ExecutionContext) =
new PublisherClient(channel,
CallOptions.DEFAULT
.withCallCredentials(MoreCallCredentials.from(credentials)))

def subscriber(credentials: Credentials)(implicit mat: Materializer, ec: ExecutionContext) =
new SubscriberClient(channel,
CallOptions.DEFAULT
.withCallCredentials(MoreCallCredentials.from(credentials)))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright (C) 2016-2018 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.stream.alpakka.googlecloud.pubsub.scaladsl

import akka.stream.Materializer
import akka.stream.alpakka.googlecloud.pubsub._
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.{Done, NotUsed}
import com.google.auth.Credentials
import com.google.pubsub.v1.pubsub._

import scala.collection.immutable
import scala.concurrent.Future

object GooglePubSub extends GooglePubSub {
private[pubsub] override val grpcApi = GrpcApi
}

protected[pubsub] trait GooglePubSub {
private[pubsub] def grpcApi: GrpcApi

/**
* Creates a flow to that publish messages to a topic and emits the message ids
*/
def publish(creds: Credentials, parallelism: Int = 1)(
implicit materializer: Materializer
): Flow[PublishRequest, immutable.Seq[String], NotUsed] = {
import materializer.executionContext

Flow[PublishRequest]
.mapAsyncUnordered(parallelism)(grpcApi.publisher(creds).publish)
.map(_.messageIds.toVector)
}

def subscribe(subscription: String, creds: Credentials)(
implicit materializer: Materializer
): Source[ReceivedMessage, NotUsed] = {
import materializer.executionContext

val req = StreamingPullRequest(subscription = subscription)
grpcApi
.subscriber(creds)
.streamingPull(Source.single(req))
.mapConcat(_.receivedMessages.toVector)
.mapMaterializedValue(_ => NotUsed)
}

def acknowledge(creds: Credentials,
parallelism: Int = 1)(implicit materializer: Materializer): Sink[AcknowledgeRequest, Future[Done]] = {
import materializer.executionContext

Flow[AcknowledgeRequest]
.mapAsyncUnordered(parallelism)(grpcApi.subscriber(creds).acknowledge)
.toMat(Sink.ignore)(Keep.right)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright (C) 2016-2018 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.stream.alpakka.googlecloud.pubsub

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.alpakka.googlecloud.pubsub.scaladsl.GooglePubSub
import akka.stream.scaladsl.Source
import com.google.auth.oauth2.GoogleCredentials
import com.google.protobuf.ByteString
import com.google.pubsub.v1.pubsub.{PublishRequest, PubsubMessage}

import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

object TestApp {
def main(args: Array[String]): Unit = {
implicit val sys = ActorSystem()
implicit val mat = ActorMaterializer()

try {
run()
scala.io.StdIn.readLine()
} finally {
Await.ready(sys.terminate(), 5.seconds)
}
}

def run()(implicit sys: ActorSystem, mat: Materializer): Unit = {
val requests = Source(
immutable.Seq(
PublishRequest(
"projects/kraziu-tvirtove/topics/testTopic",
Seq(
PubsubMessage(ByteString.copyFromUtf8("hi")),
PubsubMessage(ByteString.copyFromUtf8("my name is")),
PubsubMessage(ByteString.copyFromUtf8("who"))
)
)
)
)

// Loads credentials from a json file, which is found
// from GOOGLE_APPLICATION_CREDENTIALS env variable
val creds = GoogleCredentials.getApplicationDefault

requests.via(GooglePubSub.publish(creds)).runForeach(println)
}
}
8 changes: 8 additions & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,14 @@ object Dependencies {
)
)

val GooglePubSubGrpc = Seq(
libraryDependencies ++= Seq(
"io.grpc" % "grpc-auth" % "1.10.0", // ApacheV2
"com.google.api.grpc" % "grpc-google-cloud-pubsub-v1" % "0.3.0" % "protobuf", // ApacheV2
"com.google.auth" % "google-auth-library-oauth2-http" % "0.9.0" % "test" // BSD 3-clause
)
)

val HBase = {
val hbaseVersion = "1.2.4"
val hadoopVersion = "2.5.1"
Expand Down
3 changes: 3 additions & 0 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,6 @@ addSbtPlugin("com.eed3si9n" % "sbt-unidoc" % "0.4.1")
addSbtPlugin("com.thoughtworks.sbt-api-mappings" % "sbt-api-mappings" % "2.0.2")
addSbtPlugin("org.foundweekends" % "sbt-bintray" % "0.5.2")
addSbtPlugin("com.lightbend" % "sbt-whitesource" % "0.1.10")

addSbtPlugin("com.lightbend.akka.grpc" % "akka-grpc-sbt-plugin" % "4ba91516")
addSbtPlugin("com.lightbend.sbt" % "sbt-javaagent" % "0.1.4")

0 comments on commit 5048e1e

Please sign in to comment.