From d9dec580d5eabd22e35bdbec3778fe3434b3b8ec Mon Sep 17 00:00:00 2001 From: Olivier NOUGUIER Date: Thu, 23 Feb 2017 21:45:29 +0100 Subject: [PATCH] Initial commit for apache geode connector --- build.sbt | 10 + docker-compose.yml | 20 ++ docs/src/main/paradox/connectors.md | 1 + docs/src/main/paradox/geode.md | 180 ++++++++++++++ geode/scripts/cache.xml | 29 +++ geode/scripts/geode.sh | 13 + geode/scripts/start-local.sh | 2 + .../alpakka/geode/javadsl/ReactiveGeode.java | 107 +++++++++ .../alpakka/geode/AkkaPdxSerializer.scala | 10 + .../stream/alpakka/geode/GeodeSettings.scala | 41 ++++ .../alpakka/geode/internal/GeodeCache.scala | 47 ++++ .../geode/internal/GeodeCapabilities.scala | 32 +++ .../pdx/DelegatingPdxSerializer.scala | 61 +++++ .../geode/internal/pdx/PdxDecoder.scala | 165 +++++++++++++ .../geode/internal/pdx/PdxEncoder.scala | 227 ++++++++++++++++++ .../internal/pdx/ShapelessPdxSerializer.scala | 33 +++ .../stage/GeodeCQueryGraphLogic.scala | 119 +++++++++ .../stage/GeodeContinuousSourceStage.scala | 62 +++++ .../stage/GeodeFiniteSourceStage.scala | 45 ++++ .../geode/internal/stage/GeodeFlowStage.scala | 54 +++++ .../internal/stage/GeodeQueryGraphLogic.scala | 24 ++ .../stage/GeodeSourceStageLogic.scala | 31 +++ .../geode/scaladsl/ReactiveGeode.scala | 112 +++++++++ .../stream/alpakka/geode/javadsl/Animal.java | 34 +++ .../geode/javadsl/AnimalPdxSerializer.java | 39 +++ .../geode/javadsl/GeodeBaseTestCase.java | 82 +++++++ .../GeodeContinuousSourceTestCase.java | 54 +++++ .../javadsl/GeodeFiniteSourceTestCase.java | 40 +++ .../geode/javadsl/GeodeFlowTestCase.java | 45 ++++ .../geode/javadsl/GeodeSinkTestCase.java | 64 +++++ .../stream/alpakka/geode/javadsl/Person.java | 36 +++ .../geode/javadsl/PersonPdxSerializer.java | 40 +++ geode/src/test/resources/log4j2.xml | 23 ++ geode/src/test/resources/logback.xml | 16 ++ .../geode/internal/pdx/PDXDecoderSpec.scala | 47 ++++ .../geode/internal/pdx/PDXEncodeSpec.scala | 36 +++ .../geode/internal/pdx/PdxWriterMock.scala | 82 +++++++ .../geode/scaladsl/GeodeBaseSpec.scala | 48 ++++ .../scaladsl/GeodeContinuousSourceSpec.scala | 56 +++++ .../scaladsl/GeodeFiniteSourceSpec.scala | 49 ++++ .../geode/scaladsl/GeodeFlowSpec.scala | 47 ++++ .../geode/scaladsl/GeodeSinkSpec.scala | 64 +++++ .../stream/alpakka/geode/scaladsl/Model.scala | 11 + .../geode/scaladsl/PersonPdxSerializer.scala | 32 +++ project/Dependencies.scala | 15 ++ 45 files changed, 2385 insertions(+) create mode 100644 docs/src/main/paradox/geode.md create mode 100644 geode/scripts/cache.xml create mode 100755 geode/scripts/geode.sh create mode 100755 geode/scripts/start-local.sh create mode 100644 geode/src/main/java/akka/stream/alpakka/geode/javadsl/ReactiveGeode.java create mode 100644 geode/src/main/scala/akka/stream/alpakka/geode/AkkaPdxSerializer.scala create mode 100644 geode/src/main/scala/akka/stream/alpakka/geode/GeodeSettings.scala create mode 100644 geode/src/main/scala/akka/stream/alpakka/geode/internal/GeodeCache.scala create mode 100644 geode/src/main/scala/akka/stream/alpakka/geode/internal/GeodeCapabilities.scala create mode 100644 geode/src/main/scala/akka/stream/alpakka/geode/internal/pdx/DelegatingPdxSerializer.scala create mode 100644 geode/src/main/scala/akka/stream/alpakka/geode/internal/pdx/PdxDecoder.scala create mode 100644 geode/src/main/scala/akka/stream/alpakka/geode/internal/pdx/PdxEncoder.scala create mode 100644 geode/src/main/scala/akka/stream/alpakka/geode/internal/pdx/ShapelessPdxSerializer.scala create mode 100644 geode/src/main/scala/akka/stream/alpakka/geode/internal/stage/GeodeCQueryGraphLogic.scala create mode 100644 geode/src/main/scala/akka/stream/alpakka/geode/internal/stage/GeodeContinuousSourceStage.scala create mode 100644 geode/src/main/scala/akka/stream/alpakka/geode/internal/stage/GeodeFiniteSourceStage.scala create mode 100644 geode/src/main/scala/akka/stream/alpakka/geode/internal/stage/GeodeFlowStage.scala create mode 100644 geode/src/main/scala/akka/stream/alpakka/geode/internal/stage/GeodeQueryGraphLogic.scala create mode 100644 geode/src/main/scala/akka/stream/alpakka/geode/internal/stage/GeodeSourceStageLogic.scala create mode 100644 geode/src/main/scala/akka/stream/alpakka/geode/scaladsl/ReactiveGeode.scala create mode 100644 geode/src/test/java/akka/stream/alpakka/geode/javadsl/Animal.java create mode 100644 geode/src/test/java/akka/stream/alpakka/geode/javadsl/AnimalPdxSerializer.java create mode 100644 geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeBaseTestCase.java create mode 100644 geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeContinuousSourceTestCase.java create mode 100644 geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeFiniteSourceTestCase.java create mode 100644 geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeFlowTestCase.java create mode 100644 geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeSinkTestCase.java create mode 100644 geode/src/test/java/akka/stream/alpakka/geode/javadsl/Person.java create mode 100644 geode/src/test/java/akka/stream/alpakka/geode/javadsl/PersonPdxSerializer.java create mode 100644 geode/src/test/resources/log4j2.xml create mode 100644 geode/src/test/resources/logback.xml create mode 100644 geode/src/test/scala/akka/stream/alpakka/geode/internal/pdx/PDXDecoderSpec.scala create mode 100644 geode/src/test/scala/akka/stream/alpakka/geode/internal/pdx/PDXEncodeSpec.scala create mode 100644 geode/src/test/scala/akka/stream/alpakka/geode/internal/pdx/PdxWriterMock.scala create mode 100644 geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeBaseSpec.scala create mode 100644 geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeContinuousSourceSpec.scala create mode 100644 geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeFiniteSourceSpec.scala create mode 100644 geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeFlowSpec.scala create mode 100644 geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeSinkSpec.scala create mode 100644 geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/Model.scala create mode 100644 geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/PersonPdxSerializer.scala diff --git a/build.sbt b/build.sbt index 60a957a802..e4f17a87de 100644 --- a/build.sbt +++ b/build.sbt @@ -9,6 +9,7 @@ lazy val alpakka = project dynamodb, files, ftp, + geode, googleCloudPubSub, hbase, ironmq, @@ -98,6 +99,15 @@ lazy val ftp = project parallelExecution in Test := false ) +lazy val geode = project + .enablePlugins(AutomateHeaderPlugin) + .settings( + name := "akka-stream-alpakka-geode", + Dependencies.Geode, + fork in Test := true, + parallelExecution in Test := false + ) + lazy val googleCloudPubSub = project .in(file("google-cloud-pub-sub")) .enablePlugins(AutomateHeaderPlugin) diff --git a/docker-compose.yml b/docker-compose.yml index 12db706d89..a45ed61cdc 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -22,6 +22,26 @@ services: image: deangiberson/aws-dynamodb-local ports: - "8001:8000" + geode: + container_name: geode + image: apachegeode/geode + hostname: geode + mem_limit: 2g + expose: + - "10334" + - "1099" + - "7575" + - "40404" + ports: + - "1099:1099" + - "10334:10334" + - "7575:7575" + - "7070:7070" + - "40404:40404" + - "8081:8080" + volumes: + - ./geode/scripts/:/scripts/ + command: /scripts/geode.sh ironauth: image: iron/auth ports: diff --git a/docs/src/main/paradox/connectors.md b/docs/src/main/paradox/connectors.md index 2be0bb3b2c..cbac74f149 100644 --- a/docs/src/main/paradox/connectors.md +++ b/docs/src/main/paradox/connectors.md @@ -3,6 +3,7 @@ @@@ index * [AMQP Connector](amqp.md) +* [Apache Geode Connector](geode.md) * [AWS DynamoDB Connector](dynamodb.md) * [AWS Lambda Connector](awslambda.md) * [AWS S3 Connector](s3.md) diff --git a/docs/src/main/paradox/geode.md b/docs/src/main/paradox/geode.md new file mode 100644 index 0000000000..a59ea6aec7 --- /dev/null +++ b/docs/src/main/paradox/geode.md @@ -0,0 +1,180 @@ +#Apache Geode connector + +[Apache Geode](http://geode.apache.org) is a distributed datagrid (ex Gemfire). + +This connector provides flow and a sink to put element in and source to retrieve element from geode. + +Basically it can store data as key, value. Key and value must be serialized, more on this later. + +## Artifacts + +sbt +: @@@vars + ```scala + libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-geode" % "$version$" + ``` + @@@ + +Maven +: @@@vars + ```xml + + com.lightbend.akka + akka-stream-alpakka-geode_$scala.binaryVersion$ + $version$ + + ``` + @@@ + +Gradle +: @@@vars + ```gradle + dependencies { + compile group: "com.lightbend.akka", name: "akka-stream-alpakka-geode_$scala.binaryVersion$", version: "$version$" + } + ``` + @@@ + + +#Usage + +##Connection + +First of all you need to connect to the geode cache. In a client application, connection is handle by a + [ClientCache](https://geode.apache.org/docs/guide/11/basic_config/the_cache/managing_a_client_cache.html). A single + ClientCache per application is enough. ClientCache also holds a single PDXSerializer. + +scala +: @@snip (../../../../geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeFlowSpec.scala) { #connection } + +java +: @@snip (../../../../geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeBaseTestCase.java) { #connection } + +Apache Geode supports continuous queries. Continuous query relies on server event, thus reactive geode needs to listen to + those event. This behaviour, as it consumes more resources is isolated in a scala trait and/or an specialized java class. + +scala +: @@snip (../../../../geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeContinuousSourceSpec.scala) { #connection-with-pool } + +java +: @@snip (../../../../geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeBaseTestCase.java) { #connection-with-pool } + +##Region + +Define a [region](https://geode.apache.org/docs/guide/11/basic_config/data_regions/chapter_overview.html) setting to +describe how to access region and the key extraction function. + +scala +: @@snip (../../../../geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeBaseSpec.scala) { #region } + +java +: @@snip (../../../../geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeBaseTestCase.java) { #region } + + +###Serialization + +Object must be serialized to flow in a geode region. + +* opaque format (eq json/xml) +* java serialisation +* pdx geode format + +PDX format is the only one supported. + +PDXEncoder support many options, see [gemfire_pdx_serialization.html](http://geode.apache.org/docs/guide/11/developing/data_serialization/gemfire_pdx_serialization.html) + +PdxSerializer must be provided to geode when reading or writing to a region. + +scala +: @@snip (../../../../geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/PersonPdxSerializer.scala) { #person-pdx-serializer } + +java +: @@snip (../../../../geode/src/test/java/akka/stream/alpakka/geode/javadsl/PersonPdxSerializer.java) { #person-pdx-serializer } + + + +This project provides a generic solution for scala user based on [shapeless](https://github.com/milessabin/shapeless), then case classe serializer if not provided will be generated compile time. +Java user will need to write by hand their custom serializer. + + +Runtime reflection is also an option see [auto_serialization.html](http://geode.apache.org/docs/guide/11/developing/data_serialization/auto_serialization.html). + +###Flow usage + +This sample stores (case) classes in Geode. + +scala +: @@snip (../../../../geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeFlowSpec.scala) { #flow } + +java +: @@snip (../../../../geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeFlowTestCase.java) { #flow } + + +###Sink usage + +scala +: @@snip (../../../../geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeSinkSpec.scala) { #sink } + +java +: @@snip (../../../../geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeSinkTestCase.java) { #sink } + + +###Source usage + +####Simple query + +Apache Geode support simple queries. + +scala +: @@snip (../../../../geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeFiniteSourceSpec.scala) { #query } + +java +: @@snip (../../../../geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeFiniteSourceTestCase.java) { #query } + + +####Continuous query + + +scala +: @@snip (../../../../geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeContinuousSourceSpec.scala) { #continuousQuery } + +java +: @@snip (../../../../geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeContinuousSourceTestCase.java) { #continuousQuery } + + +##Geode basic command: + +Assuming Apache geode is installed: + +``` +gfsh +``` + +From the geode shell: + +``` +start locator --name=locator +configure pdx --read-serialized=true +start server --name=server + +create region --name=animals --type=PARTITION_REDUNDANT --redundant-copies=2 +create region --name=persons --type=PARTITION_REDUNDANT --redundant-copies=2 + +``` + +###Run the test + +Integration test are run against localhost geode, but IT_GEODE_HOSTNAME environment variable can change this: + +```bash +export IT_GEODE_HOSTNAME=geode-host-locator + +sbt +``` + +From sbt shell + +```sbtshell +project geode +test +``` diff --git a/geode/scripts/cache.xml b/geode/scripts/cache.xml new file mode 100644 index 0000000000..ab32281104 --- /dev/null +++ b/geode/scripts/cache.xml @@ -0,0 +1,29 @@ + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/geode/scripts/geode.sh b/geode/scripts/geode.sh new file mode 100755 index 0000000000..ddc1af16ce --- /dev/null +++ b/geode/scripts/geode.sh @@ -0,0 +1,13 @@ +#!/bin/bash + +rm -rf /geode +mkdir -p /geode/locator +mkdir -p /geode/server + + +gfsh -e "start locator --name=$HOSTNAME-locator --dir=/geode/locator --mcast-port=0 --hostname-for-clients=0.0.0.0" -e "start server --name=$HOSTNAME-server --locators=localhost[10334] --dir=/geode/server --server-port=40404 --max-heap=1G --hostname-for-clients=0.0.0.0 --cache-xml-file=/scripts/cache.xml" + +while true; do + sleep 10 +done + diff --git a/geode/scripts/start-local.sh b/geode/scripts/start-local.sh new file mode 100755 index 0000000000..4559010830 --- /dev/null +++ b/geode/scripts/start-local.sh @@ -0,0 +1,2 @@ +gfsh -e"start locator --name=locator" -e"configure pdx --read-serialized=true" -e "start server --name=server" + diff --git a/geode/src/main/java/akka/stream/alpakka/geode/javadsl/ReactiveGeode.java b/geode/src/main/java/akka/stream/alpakka/geode/javadsl/ReactiveGeode.java new file mode 100644 index 0000000000..a6bf588f60 --- /dev/null +++ b/geode/src/main/java/akka/stream/alpakka/geode/javadsl/ReactiveGeode.java @@ -0,0 +1,107 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.geode.javadsl; + +import akka.Done; +import akka.NotUsed; +import akka.stream.alpakka.geode.AkkaPdxSerializer; +import akka.stream.alpakka.geode.GeodeSettings; +import akka.stream.alpakka.geode.RegionSettings; +import akka.stream.alpakka.geode.internal.GeodeCache; +import akka.stream.alpakka.geode.internal.stage.GeodeContinuousSourceStage; +import akka.stream.alpakka.geode.internal.stage.GeodeFiniteSourceStage; +import akka.stream.alpakka.geode.internal.stage.GeodeFlowStage; +import akka.stream.javadsl.Flow; +import akka.stream.javadsl.Keep; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; +import org.apache.geode.cache.client.ClientCacheFactory; +import org.apache.geode.cache.query.CqException; +import org.apache.geode.cache.query.CqQuery; +import org.apache.geode.cache.query.QueryService; +import scala.Symbol; +import scala.concurrent.Future; + +import java.util.concurrent.CompletionStage; + + +/** + * Reactive geode without server event subscription. Cannot build continuous source. + */ +public class ReactiveGeode extends GeodeCache { + + final GeodeSettings geodeSettings; + + public ReactiveGeode(GeodeSettings settings) { + super(settings); + this.geodeSettings = settings; + } + + @Override + public ClientCacheFactory configure(ClientCacheFactory factory) { + return factory.addPoolLocator(geodeSettings.hostname(), geodeSettings.port()); + } + + public Source> query( String query, AkkaPdxSerializer serializer) { + + registerPDXSerializer(serializer, serializer.clazz()); + return Source.fromGraph(new GeodeFiniteSourceStage(cache(), query)); + } + + + public Flow flow(RegionSettings regionSettings, AkkaPdxSerializer serializer) { + + registerPDXSerializer(serializer, serializer.clazz()); + + return Flow.fromGraph(new GeodeFlowStage(cache(), regionSettings)); + + } + + public Sink> sink(RegionSettings regionSettings, AkkaPdxSerializer serializer) { + return flow(regionSettings, serializer) + .toMat(Sink.ignore(), Keep.right()); + } + + public void close() { + close(false); + } + + +} + +/** + * Reactive geode with server event subscription. Can build continuous source. + */ +class ReactiveGeodeWithPoolSubscription extends ReactiveGeode { + + /** + * Subscribes to server event. + * @return ClientCacheFactory with server event subscription. + */ + final public ClientCacheFactory configure(ClientCacheFactory factory) { + return super.configure(factory).setPoolSubscriptionEnabled(true); + } + + public ReactiveGeodeWithPoolSubscription(GeodeSettings settings) { + super(settings); + } + + public Source> continuousQuery(String queryName, String query, AkkaPdxSerializer serializer) { + + registerPDXSerializer(serializer, serializer.clazz()); + return Source.fromGraph(new GeodeContinuousSourceStage(cache(), Symbol.apply(queryName), query)); + } + + public boolean closeContinuousQuery(String name) throws CqException { + + QueryService qs = cache().getQueryService(); + CqQuery query = qs.getCq(name); + if (query == null) + return false; + query.close(); + return true; + + } + +} \ No newline at end of file diff --git a/geode/src/main/scala/akka/stream/alpakka/geode/AkkaPdxSerializer.scala b/geode/src/main/scala/akka/stream/alpakka/geode/AkkaPdxSerializer.scala new file mode 100644 index 0000000000..f16bb62161 --- /dev/null +++ b/geode/src/main/scala/akka/stream/alpakka/geode/AkkaPdxSerializer.scala @@ -0,0 +1,10 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.geode + +import org.apache.geode.pdx.PdxSerializer + +trait AkkaPdxSerializer[V] extends PdxSerializer { + def clazz: Class[V] +} diff --git a/geode/src/main/scala/akka/stream/alpakka/geode/GeodeSettings.scala b/geode/src/main/scala/akka/stream/alpakka/geode/GeodeSettings.scala new file mode 100644 index 0000000000..c9b425d895 --- /dev/null +++ b/geode/src/main/scala/akka/stream/alpakka/geode/GeodeSettings.scala @@ -0,0 +1,41 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.geode + +import org.apache.geode.cache.client.ClientCacheFactory + +/** + * General settings to connect Apache Geode. + * + * @param hostname + * @param port default to 10334 + * @param pdxCompat a function that determines if two class are equivalent (java class / scala case class) + */ +case class GeodeSettings(hostname: String, + port: Int = 10334, + configure: Option[ClientCacheFactory => ClientCacheFactory] = None, + pdxCompat: (Class[_], Class[_]) => Boolean = (c1, c2) => + c1.getSimpleName equals c2.getSimpleName) { + + /** + * @param configure function to configure geode client factory + */ + def withConfiguration(configure: ClientCacheFactory => ClientCacheFactory) = + copy(configure = Some(configure)) + + /** + * @param pdxCompat a function that determines if two class are equivalent (java class / scala case class) + */ + def withPdxCompat(pdxCompat: (Class[_], Class[_]) => Boolean) = + copy(pdxCompat = pdxCompat) +} + +object GeodeSettings { + def create(hostname: String) = new GeodeSettings(hostname) + + def create(hostname: String, port: Int) = new GeodeSettings(hostname, port) + +} + +case class RegionSettings[K, V](name: String, keyExtractor: V => K) diff --git a/geode/src/main/scala/akka/stream/alpakka/geode/internal/GeodeCache.scala b/geode/src/main/scala/akka/stream/alpakka/geode/internal/GeodeCache.scala new file mode 100644 index 0000000000..9dc2c6ded3 --- /dev/null +++ b/geode/src/main/scala/akka/stream/alpakka/geode/internal/GeodeCache.scala @@ -0,0 +1,47 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.geode.internal + +import akka.stream.alpakka.geode.GeodeSettings +import akka.stream.alpakka.geode.internal.pdx.DelegatingPdxSerializer +import org.apache.geode.cache.client.{ClientCache, ClientCacheFactory} +import org.apache.geode.pdx.PdxSerializer + +/** + * Base of all geode client. + * + */ +abstract class GeodeCache(geodeSettings: GeodeSettings) { + + private lazy val serializer = new DelegatingPdxSerializer(geodeSettings.pdxCompat) + + protected def registerPDXSerializer[V](pdxSerializer: PdxSerializer, clazz: Class[V]): Unit = + serializer.register(pdxSerializer, clazz) + + /** + * This method will overloaded to provide server event subscription. + * + * @return + */ + protected def configure(factory: ClientCacheFactory): ClientCacheFactory + + /** + * Return ClientCacheFactory: + *
    + *
  • with PDX support
  • + *
  • configured by sub classes
  • + *
  • customized by client application
  • + *
+ * + */ + final protected def newCacheFactory(): ClientCacheFactory = { + val factory = configure(new ClientCacheFactory().setPdxSerializer(serializer)) + geodeSettings.configure.map(_(factory)).getOrElse(factory) + } + + lazy val cache: ClientCache = newCacheFactory().create() + + def close(keepAlive: Boolean = false): Unit = cache.close(keepAlive) + +} diff --git a/geode/src/main/scala/akka/stream/alpakka/geode/internal/GeodeCapabilities.scala b/geode/src/main/scala/akka/stream/alpakka/geode/internal/GeodeCapabilities.scala new file mode 100644 index 0000000000..29914334aa --- /dev/null +++ b/geode/src/main/scala/akka/stream/alpakka/geode/internal/GeodeCapabilities.scala @@ -0,0 +1,32 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.geode.internal + +import akka.stream.alpakka.geode.RegionSettings +import akka.stream.stage.StageLogging +import org.apache.geode.cache.client.{ClientCache, ClientRegionShortcut} + +import scala.util.control.NonFatal + +trait GeodeCapabilities[K, V] { this: StageLogging => + + def regionSettings: RegionSettings[K, V] + + def clientCache: ClientCache + + private lazy val region = + clientCache.createClientRegionFactory[K, V](ClientRegionShortcut.CACHING_PROXY).create(regionSettings.name) + + def put(v: V): Unit = region.put(regionSettings.keyExtractor(v), v) + + def close(): Unit = + try { + if (clientCache.isClosed) + return + region.close() + log.debug("region closed") + } catch { + case NonFatal(ex) => log.error(ex, "Problem occurred during producer region closing") + } +} diff --git a/geode/src/main/scala/akka/stream/alpakka/geode/internal/pdx/DelegatingPdxSerializer.scala b/geode/src/main/scala/akka/stream/alpakka/geode/internal/pdx/DelegatingPdxSerializer.scala new file mode 100644 index 0000000000..bf4f6d27da --- /dev/null +++ b/geode/src/main/scala/akka/stream/alpakka/geode/internal/pdx/DelegatingPdxSerializer.scala @@ -0,0 +1,61 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.geode.internal.pdx + +import java.util.Properties + +import org.apache.geode.cache.Declarable +import org.apache.geode.pdx.{PdxReader, PdxSerializer, PdxWriter} + +/** + * Geode ClientCache does not support more than one serializer. + *
+ * This serializer delegates to lazily registered serializer. + */ +class DelegatingPdxSerializer( + isPdxCompat: (Class[_], Class[_]) => Boolean +) extends PdxSerializer + with Declarable { + + private var serializers = Map[Class[_], PdxSerializer]() + + def register[V](serializer: PdxSerializer, clazz: Class[V]): Unit = synchronized { + if (!serializers.contains(clazz)) + serializers += (clazz -> serializer) + } + + /** + * Marshalls a class with a registered serializer. + * + * @return true on success + */ + override def toData(o: scala.Any, out: PdxWriter): Boolean = + serializers.get(o.getClass).map(_.toData(o, out)).isDefined + + /** + * Unmarshalls with registered serializer. + *
+ * Tries to find a registered serializer for a given class + *
    + *
  • Lookup on class basis
  • + *
  • Iterating through all serializer to find a compatible one
  • + *
+ * By doing this, a java pojo can be unmarshalled from a scala case class (and vice versa) + * + * @return unmarshalled class or null + */ + override def fromData(clazz: Class[_], in: PdxReader): AnyRef = + serializers + .get(clazz) + .map(_.fromData(clazz, in)) + .orElse(serializers.collectFirst { + case (c, ser) if isPdxCompat(c, clazz) => + val v = ser.fromData(clazz, in) + if (v != null) register(ser, clazz) + v + }) + .orNull + + override def init(props: Properties): Unit = {} +} diff --git a/geode/src/main/scala/akka/stream/alpakka/geode/internal/pdx/PdxDecoder.scala b/geode/src/main/scala/akka/stream/alpakka/geode/internal/pdx/PdxDecoder.scala new file mode 100644 index 0000000000..56d05b2d3e --- /dev/null +++ b/geode/src/main/scala/akka/stream/alpakka/geode/internal/pdx/PdxDecoder.scala @@ -0,0 +1,165 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.geode.internal.pdx + +import java.util.{Date, UUID} + +import org.apache.geode.pdx.PdxReader + +import scala.util.{Failure, Success, Try} + +trait PdxDecoder[A] { + + def decode(reader: PdxReader, fieldName: Symbol = null): Try[A] + +} + +object PdxDecoder { + + import shapeless._ + import shapeless.labelled._ + + private def instance[A](f: (PdxReader, Symbol) => Try[A]): PdxDecoder[A] = + new PdxDecoder[A] { + def decode(reader: PdxReader, fieldName: Symbol) = f(reader, fieldName) + } + + implicit val hnilDecoder: PdxDecoder[HNil] = instance((_, _) => Success(HNil)) + + implicit val booleanDecoder: PdxDecoder[Boolean] = instance { + case (reader, fieldName) => + Success(reader.readBoolean(fieldName.name)) + } + + implicit val booleanListDecoder: PdxDecoder[List[Boolean]] = instance { + case (reader, fieldName) => + Success(reader.readBooleanArray(fieldName.name).toList) + } + + implicit val booleanArrayDecoder: PdxDecoder[Array[Boolean]] = instance { + case (reader, fieldName) => + Success(reader.readBooleanArray(fieldName.name)) + } + + implicit val intDecoder: PdxDecoder[Int] = instance { (reader, fieldName) => + Success(reader.readInt(fieldName.name)) + } + + implicit val intListDecoder: PdxDecoder[List[Int]] = instance { (reader, fieldName) => + Success(reader.readIntArray(fieldName.name).toList) + } + + implicit val intArrayDecoder: PdxDecoder[Array[Int]] = instance { (reader, fieldName) => + Success(reader.readIntArray(fieldName.name)) + } + + implicit val doubleDecoder: PdxDecoder[Double] = instance { (reader, fieldName) => + Success(reader.readDouble(fieldName.name)) + } + + implicit val doubleListDecoder: PdxDecoder[List[Double]] = instance { (reader, fieldName) => + Success(reader.readDoubleArray(fieldName.name).toList) + } + + implicit val doubleArrayDecoder: PdxDecoder[Array[Double]] = instance { (reader, fieldName) => + Success(reader.readDoubleArray(fieldName.name)) + } + + implicit val floatDecoder: PdxDecoder[Float] = instance { (reader, fieldName) => + Success(reader.readFloat(fieldName.name)) + } + + implicit val floatListDecoder: PdxDecoder[List[Float]] = instance { (reader, fieldName) => + Success(reader.readFloatArray(fieldName.name).toList) + } + implicit val floatArrayDecoder: PdxDecoder[Array[Float]] = instance { (reader, fieldName) => + Success(reader.readFloatArray(fieldName.name)) + } + + implicit val longDecoder: PdxDecoder[Long] = instance { (reader, fieldName) => + Success(reader.readLong(fieldName.name)) + } + + implicit val longListDecoder: PdxDecoder[List[Long]] = instance { (reader, fieldName) => + Success(reader.readLongArray(fieldName.name).toList) + } + implicit val longArrayDecoder: PdxDecoder[Array[Long]] = instance { (reader, fieldName) => + Success(reader.readLongArray(fieldName.name)) + } + + implicit val charDecoder: PdxDecoder[Char] = instance { + case (reader, fieldName) => + Success(reader.readChar(fieldName.name)) + } + + implicit val charListDecoder: PdxDecoder[List[Char]] = instance { + case (reader, fieldName) => + Success(reader.readCharArray(fieldName.name).toList) + } + implicit val charArrayDecoder: PdxDecoder[Array[Char]] = instance { + case (reader, fieldName) => + Success(reader.readCharArray(fieldName.name)) + } + + implicit val stringDecoder: PdxDecoder[String] = instance { + case (reader, fieldName) => + Success(reader.readString(fieldName.name)) + } + + implicit val stringListDecoder: PdxDecoder[List[String]] = instance { + case (reader, fieldName) => + Success(reader.readStringArray(fieldName.name).toList) + } + + implicit val stringArrayDecoder: PdxDecoder[Array[String]] = instance { + case (reader, fieldName) => + Success(reader.readStringArray(fieldName.name)) + } + + implicit val dategDecoder: PdxDecoder[Date] = instance { + case (reader, fieldName) => + Success(reader.readDate(fieldName.name)) + } + + implicit val uuidDecoder: PdxDecoder[UUID] = instance { + case (reader, fieldName) => + Try(UUID.fromString(reader.readString(fieldName.name))) + } + + implicit def listDecoder[T <: AnyRef]: PdxDecoder[List[T]] = instance { + case (reader, fieldName) => + Try(reader.readObjectArray(fieldName.name).toList.asInstanceOf[List[T]]) + } + + implicit def setDecoder[T <: AnyRef]: PdxDecoder[Set[T]] = instance { + case (reader, fieldName) => + Try(reader.readObjectArray(fieldName.name).toSet.asInstanceOf[Set[T]]) + } + + implicit def hlistDecoder[K <: Symbol, H, T <: HList]( + implicit witness: Witness.Aux[K], + hDecoder: Lazy[PdxDecoder[H]], + tDecoder: Lazy[PdxDecoder[T]] + ): PdxDecoder[FieldType[K, H] :: T] = instance { + case (reader, fieldName) => { + val headField = hDecoder.value.decode(reader, witness.value) + val tailFields = tDecoder.value.decode(reader, fieldName) + (headField, tailFields) match { + case (Success(h), Success(t)) => Success(field[K](h) :: t) + case _ => Failure(null) + } + } + case e => Failure(null) + } + + implicit def objectDecoder[A, Repr <: HList]( + implicit gen: LabelledGeneric.Aux[A, Repr], + hlistDecoder: PdxDecoder[Repr] + ): PdxDecoder[A] = instance { (reader, fieldName) => + hlistDecoder.decode(reader, fieldName).map(gen.from) + } + + def apply[A](implicit ev: PdxDecoder[A]): PdxDecoder[A] = ev + +} diff --git a/geode/src/main/scala/akka/stream/alpakka/geode/internal/pdx/PdxEncoder.scala b/geode/src/main/scala/akka/stream/alpakka/geode/internal/pdx/PdxEncoder.scala new file mode 100644 index 0000000000..16441d39ea --- /dev/null +++ b/geode/src/main/scala/akka/stream/alpakka/geode/internal/pdx/PdxEncoder.scala @@ -0,0 +1,227 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.geode.internal.pdx + +import java.util.{Date, UUID} + +import org.apache.geode.pdx.PdxWriter +import shapeless.ops.hlist.IsHCons + +trait PdxEncoder[A] { + def encode(writer: PdxWriter, a: A, fieldName: Symbol = null): Boolean +} + +object PdxEncoder { + + import shapeless._ + import shapeless.labelled._ + + private def instance[A](f: (PdxWriter, A, Symbol) => Boolean) = + new PdxEncoder[A] { + def encode(writer: PdxWriter, a: A, fieldName: Symbol = null): Boolean = f(writer, a, fieldName) + } + + implicit val hnilEncoder: PdxEncoder[HNil] = + instance[HNil] { case _ => true } + + implicit def hlistEncoder[K <: Symbol, H, T <: shapeless.HList]( + implicit witness: Witness.Aux[K], + isHCons: IsHCons.Aux[H :: T, H, T], + hEncoder: Lazy[PdxEncoder[H]], + tEncoder: Lazy[PdxEncoder[T]] + ): PdxEncoder[FieldType[K, H] :: T] = + instance[FieldType[K, H] :: T] { + case (writer, o, fieldName) => + hEncoder.value.encode(writer, isHCons.head(o), witness.value) + tEncoder.value.encode(writer, isHCons.tail(o), fieldName) + + } + + implicit def objectEncoder[A, Repr <: HList]( + implicit gen: LabelledGeneric.Aux[A, Repr], + hlistEncoder: Lazy[PdxEncoder[Repr]] + ): PdxEncoder[A] = instance { + case (writer, o, fieldName) => + hlistEncoder.value.encode(writer, gen.to(o), fieldName) + } + + def apply[A](implicit enc: PdxEncoder[A]): PdxEncoder[A] = enc + + implicit def booleanEncoder: PdxEncoder[Boolean] = instance { + case (writer: PdxWriter, b: Boolean, fieldName: Symbol) => + writer.writeBoolean(fieldName.name, b) + true + case _ => false + } + + implicit def booleanListEncoder: PdxEncoder[List[Boolean]] = instance { + case (writer: PdxWriter, bs: List[Boolean], fieldName: Symbol) => + writer.writeBooleanArray(fieldName.name, bs.toArray) + true + case _ => false + } + + implicit def booleanArrayEncoder: PdxEncoder[Array[Boolean]] = instance { + case (writer: PdxWriter, bs: Array[Boolean], fieldName: Symbol) => + writer.writeBooleanArray(fieldName.name, bs) + true + case _ => false + } + + implicit def intEncoder: PdxEncoder[Int] = instance { + case (writer: PdxWriter, i: Int, fieldName: Symbol) => + writer.writeInt(fieldName.name, i) + true + case _ => false + } + + implicit def intListEncoder: PdxEncoder[List[Int]] = instance { + case (writer: PdxWriter, is: List[Int], fieldName: Symbol) => + writer.writeIntArray(fieldName.name, is.toArray) + true + case _ => false + } + + implicit def intArrayEncoder: PdxEncoder[Array[Int]] = instance { + case (writer: PdxWriter, is: Array[Int], fieldName: Symbol) => + writer.writeIntArray(fieldName.name, is) + true + case _ => false + } + + implicit def doubleEncoder: PdxEncoder[Double] = instance { + case (writer: PdxWriter, d: Double, fieldName: Symbol) => + writer.writeDouble(fieldName.name, d) + true + case _ => false + } + + implicit def doubleListEncoder: PdxEncoder[List[Double]] = instance { + case (writer: PdxWriter, ds: List[Double], fieldName: Symbol) => + writer.writeDoubleArray(fieldName.name, ds.toArray) + true + case _ => false + } + implicit def doubleArrayEncoder: PdxEncoder[Array[Double]] = instance { + case (writer: PdxWriter, ds: Array[Double], fieldName: Symbol) => + writer.writeDoubleArray(fieldName.name, ds) + true + case _ => false + } + + implicit def floatEncoder: PdxEncoder[Float] = instance { + case (writer: PdxWriter, f: Float, fieldName: Symbol) => + writer.writeFloat(fieldName.name, f) + true + case _ => false + } + + implicit def floatListEncoder: PdxEncoder[List[Float]] = instance { + case (writer: PdxWriter, fs: List[Float], fieldName: Symbol) => + writer.writeFloatArray(fieldName.name, fs.toArray) + true + case _ => false + } + implicit def floatArrayEncoder: PdxEncoder[Array[Float]] = instance { + case (writer: PdxWriter, fs: Array[Float], fieldName: Symbol) => + writer.writeFloatArray(fieldName.name, fs) + true + case _ => false + } + + implicit def longEncoder: PdxEncoder[Long] = instance { + case (writer: PdxWriter, l: Long, fieldName: Symbol) => + writer.writeLong(fieldName.name, l) + true + case _ => false + } + + implicit def longListEncoder: PdxEncoder[List[Long]] = instance { + case (writer: PdxWriter, ls: List[Long], fieldName: Symbol) => + writer.writeLongArray(fieldName.name, ls.toArray) + true + case _ => false + } + implicit def longArrayEncoder: PdxEncoder[Array[Long]] = instance { + case (writer: PdxWriter, ls: Array[Long], fieldName: Symbol) => + writer.writeLongArray(fieldName.name, ls) + true + case _ => false + } + + implicit def dateEncoder: PdxEncoder[Date] = instance { + case (writer: PdxWriter, d: Date, fieldName: Symbol) => + writer.writeDate(fieldName.name, d) + true + case _ => false + } + + implicit def charEncoder: PdxEncoder[Char] = + instance { + case (writer: PdxWriter, c: Char, fieldName: Symbol) => + writer.writeChar(fieldName.name, c) + true + case _ => false + } + implicit def charListEncoder: PdxEncoder[List[Char]] = + instance { + case (writer: PdxWriter, cs: List[Char], fieldName: Symbol) => + writer.writeCharArray(fieldName.name, cs.toArray) + true + case _ => false + } + implicit def charArrayEncoder: PdxEncoder[Array[Char]] = + instance { + case (writer: PdxWriter, cs: Array[Char], fieldName: Symbol) => + writer.writeCharArray(fieldName.name, cs) + true + case _ => false + } + + implicit def stringEncoder: PdxEncoder[String] = + instance { + case (writer: PdxWriter, str: String, fieldName: Symbol) => + writer.writeString(fieldName.name, str) + true + case _ => false + } + + implicit def stringListEncoder: PdxEncoder[List[String]] = + instance { + case (writer: PdxWriter, strs: List[String], fieldName: Symbol) => + writer.writeStringArray(fieldName.name, strs.toArray) + true + case _ => false + } + implicit def stringArrayEncoder: PdxEncoder[Array[String]] = + instance { + case (writer: PdxWriter, strs: Array[String], fieldName: Symbol) => + writer.writeStringArray(fieldName.name, strs) + true + case _ => false + } + + implicit def uuidEncoder: PdxEncoder[UUID] = + instance { + case (writer: PdxWriter, uuid: UUID, fieldName: Symbol) => + writer.writeString(fieldName.name, uuid.toString) + true + case _ => false + } + + implicit def listEncoder[T <: AnyRef]: PdxEncoder[List[T]] = instance { + case (writer: PdxWriter, list: List[T], fieldName: Symbol) => + writer.writeObjectArray(fieldName.name, list.toArray) + true + case _ => false + } + + implicit def setEncoder[T <: AnyRef]: PdxEncoder[Set[T]] = instance { + case (writer: PdxWriter, set: Set[T], fieldName: Symbol) => + writer.writeObjectArray(fieldName.name, set.toArray) + true + case _ => false + } + +} diff --git a/geode/src/main/scala/akka/stream/alpakka/geode/internal/pdx/ShapelessPdxSerializer.scala b/geode/src/main/scala/akka/stream/alpakka/geode/internal/pdx/ShapelessPdxSerializer.scala new file mode 100644 index 0000000000..c6844a1027 --- /dev/null +++ b/geode/src/main/scala/akka/stream/alpakka/geode/internal/pdx/ShapelessPdxSerializer.scala @@ -0,0 +1,33 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.geode.internal.pdx + +import java.util.Properties + +import org.apache.geode.cache.Declarable +import org.apache.geode.pdx.{PdxReader, PdxSerializer, PdxWriter} + +import scala.reflect.ClassTag +import scala.util.Success + +//#shapeless-pdx-serializer +private[geode] class ShapelessPdxSerializer[A <: AnyRef](enc: PdxEncoder[A], + dec: PdxDecoder[A])(implicit tag: ClassTag[A]) + extends PdxSerializer + with Declarable { + + override def toData(o: scala.Any, out: PdxWriter): Boolean = + tag.runtimeClass.isInstance(o) && + enc.encode(out, o.asInstanceOf[A]) + + override def fromData(clazz: Class[_], in: PdxReader): A = + dec.decode(in, null) match { + case Success(e) => e + case _ => null.asInstanceOf[A] + } + + override def init(props: Properties): Unit = {} +} + +//#shapeless-pdx-serializer diff --git a/geode/src/main/scala/akka/stream/alpakka/geode/internal/stage/GeodeCQueryGraphLogic.scala b/geode/src/main/scala/akka/stream/alpakka/geode/internal/stage/GeodeCQueryGraphLogic.scala new file mode 100644 index 0000000000..e7d08399c3 --- /dev/null +++ b/geode/src/main/scala/akka/stream/alpakka/geode/internal/stage/GeodeCQueryGraphLogic.scala @@ -0,0 +1,119 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.geode.internal.stage + +import java.util +import java.util.concurrent.Semaphore + +import akka.stream.stage.{AsyncCallback, StageLogging} +import akka.stream.{Outlet, SourceShape} +import org.apache.geode.cache.client.ClientCache +import org.apache.geode.cache.query.{CqAttributesFactory, CqEvent, CqQuery, Struct} +import org.apache.geode.cache.util.CqListenerAdapter + +import scala.collection.mutable +import scala.util.Try + +abstract class GeodeCQueryGraphLogic[V](val shape: SourceShape[V], + val clientCache: ClientCache, + val queryName: Symbol, + val sql: String) + extends GeodeSourceStageLogic[V](shape, clientCache) + with StageLogging { + + /** + * Queue containing, only + */ + private val incomingQueue = mutable.Queue[V]() + + private val semaphore = new Semaphore(10) + + val onElement: AsyncCallback[V] + + private var query: CqQuery = _ + + override def executeQuery() = Try { + + val cqf = new CqAttributesFactory() + + val eventListener = new CqListenerAdapter() { + override def onEvent(ev: CqEvent): Unit = + onGeodeElement((ev.getNewValue().asInstanceOf[V])) + + override def onError(ev: CqEvent): Unit = + log.error(ev.getThrowable, s"$ev") + + override def close(): Unit = { + log.debug("closes") + inFinish.invoke(()) + } + } + cqf.addCqListener(eventListener) + + val cqa = cqf.create() + + query = qs.newCq(queryName.name, sql, cqa) + + buildInitialResulsIterator(query) + + } + + private def buildInitialResulsIterator(q: CqQuery) = { + val res = q.executeWithInitialResults[Struct] + val it = res.iterator() + new util.Iterator[V] { + override def next(): V = + it.next().getFieldValues()(1).asInstanceOf[V] + + override def hasNext: Boolean = it.hasNext + } + } + + /** + * May lock on semaphore.acquires(). + */ + protected def onGeodeElement(v: V): Unit = { + semaphore.acquire() + onElement.invoke(v) + } + + protected def enqueue(v: V): Unit = + incomingQueue.enqueue(v) + + protected def dequeue(): Option[V] = + if (incomingQueue.isEmpty) + None + else + Some(incomingQueue.dequeue()) + + /** + * Pushes an element downstream and releases a semaphore acquired in onGeodeElement. + */ + protected def pushElement(out: Outlet[V], element: V) = { + push(out, element) + semaphore.release() + } + + override def postStop(): Unit = { + if (clientCache.isClosed) + return + qs.closeCqs() + } + + /** + * Geode upstream is terminated. + */ + @volatile + private var upstreamTerminated = false + + val inFinish: AsyncCallback[Unit] = getAsyncCallback[Unit] { v => + upstreamTerminated = true + handleTerminaison() + } + + def handleTerminaison() = + if (upstreamTerminated && incomingQueue.isEmpty) + completeStage() + +} diff --git a/geode/src/main/scala/akka/stream/alpakka/geode/internal/stage/GeodeContinuousSourceStage.scala b/geode/src/main/scala/akka/stream/alpakka/geode/internal/stage/GeodeContinuousSourceStage.scala new file mode 100644 index 0000000000..1da48bb4f6 --- /dev/null +++ b/geode/src/main/scala/akka/stream/alpakka/geode/internal/stage/GeodeContinuousSourceStage.scala @@ -0,0 +1,62 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.geode.internal.stage + +import akka.Done +import akka.stream.stage._ +import akka.stream.{ActorAttributes, Attributes, Outlet, SourceShape} +import org.apache.geode.cache.client.ClientCache + +import scala.concurrent.{Future, Promise} + +class GeodeContinuousSourceStage[V](cache: ClientCache, name: Symbol, sql: String) + extends GraphStageWithMaterializedValue[SourceShape[V], Future[Done]] { + + override protected def initialAttributes: Attributes = + Attributes + .name("GeodeContinuousSource") + .and(ActorAttributes.dispatcher("akka.stream.default-blocking-io-dispatcher")) + + val out = Outlet[V](s"geode.continuousSource") + + override def shape: SourceShape[V] = SourceShape.of(out) + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = { + val subPromise = Promise[Done] + + (new GeodeCQueryGraphLogic[V](shape, cache, name, sql) { + + override val onConnect: AsyncCallback[Unit] = getAsyncCallback[Unit] { v => + subPromise.success(Done) + } + + val onElement: AsyncCallback[V] = getAsyncCallback[V] { element => + if (isAvailable(out)) { + pushElement(out, element) + } else + enqueue(element) + handleTerminaison() + } + + // + // This handler, will first forward initial (old) result, then new ones (continuous). + // + setHandler( + out, + new OutHandler { + override def onPull() = { + if (initialResultsIterator.hasNext) + push(out, initialResultsIterator.next()) + else + dequeue() foreach { e => + pushElement(out, e) + } + handleTerminaison() + } + } + ) + + }, subPromise.future) + } +} diff --git a/geode/src/main/scala/akka/stream/alpakka/geode/internal/stage/GeodeFiniteSourceStage.scala b/geode/src/main/scala/akka/stream/alpakka/geode/internal/stage/GeodeFiniteSourceStage.scala new file mode 100644 index 0000000000..e1792032a1 --- /dev/null +++ b/geode/src/main/scala/akka/stream/alpakka/geode/internal/stage/GeodeFiniteSourceStage.scala @@ -0,0 +1,45 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.geode.internal.stage + +import akka.Done +import akka.stream.stage._ +import akka.stream.{ActorAttributes, Attributes, Outlet, SourceShape} +import org.apache.geode.cache.client.ClientCache + +import scala.concurrent.{Future, Promise} + +class GeodeFiniteSourceStage[V](cache: ClientCache, sql: String) + extends GraphStageWithMaterializedValue[SourceShape[V], Future[Done]] { + + override protected def initialAttributes: Attributes = + Attributes.name("GeodeFiniteSource").and(ActorAttributes.dispatcher("akka.stream.default-blocking-io-dispatcher")) + + val out = Outlet[V]("geode.finiteSource") + + override def shape: SourceShape[V] = SourceShape.of(out) + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = { + val subPromise = Promise[Done] + + (new GeodeQueryGraphLogic[V](shape, cache, sql) { + + override val onConnect: AsyncCallback[Unit] = getAsyncCallback[Unit] { v => + subPromise.success(Done) + } + + setHandler( + out, + new OutHandler { + override def onPull() = + if (initialResultsIterator.hasNext) + push(out, initialResultsIterator.next()) + else + completeStage() + } + ) + + }, subPromise.future) + } +} diff --git a/geode/src/main/scala/akka/stream/alpakka/geode/internal/stage/GeodeFlowStage.scala b/geode/src/main/scala/akka/stream/alpakka/geode/internal/stage/GeodeFlowStage.scala new file mode 100644 index 0000000000..373c0ee3c8 --- /dev/null +++ b/geode/src/main/scala/akka/stream/alpakka/geode/internal/stage/GeodeFlowStage.scala @@ -0,0 +1,54 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.geode.internal.stage + +import akka.stream._ +import akka.stream.alpakka.geode.RegionSettings +import akka.stream.alpakka.geode.internal.GeodeCapabilities +import akka.stream.stage._ +import org.apache.geode.cache.client.ClientCache + +private[geode] class GeodeFlowStage[K, T <: AnyRef](cache: ClientCache, settings: RegionSettings[K, T]) + extends GraphStage[FlowShape[T, T]] { + + override protected def initialAttributes: Attributes = + Attributes.name("GeodeFLow").and(ActorAttributes.dispatcher("akka.stream.default-blocking-io-dispatcher")) + + private val in = Inlet[T]("geode.in") + private val out = Outlet[T]("geode.out") + + override val shape = FlowShape(in, out) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with StageLogging with GeodeCapabilities[K, T] { + + override protected def logSource = classOf[GeodeFlowStage[K, T]] + + val regionSettings = settings + + val clientCache = cache + + setHandler(out, new OutHandler { + override def onPull() = + pull(in) + }) + + setHandler(in, new InHandler { + override def onPush() = { + val msg = grab(in) + + put(msg) + + push(out, msg) + } + + }) + + override def postStop() = { + log.debug("Stage completed") + close() + } + } + +} diff --git a/geode/src/main/scala/akka/stream/alpakka/geode/internal/stage/GeodeQueryGraphLogic.scala b/geode/src/main/scala/akka/stream/alpakka/geode/internal/stage/GeodeQueryGraphLogic.scala new file mode 100644 index 0000000000..1ced1896dc --- /dev/null +++ b/geode/src/main/scala/akka/stream/alpakka/geode/internal/stage/GeodeQueryGraphLogic.scala @@ -0,0 +1,24 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.geode.internal.stage + +import akka.stream.SourceShape +import akka.stream.stage.StageLogging +import org.apache.geode.cache.client.ClientCache +import org.apache.geode.cache.query.SelectResults + +import scala.util.Try + +abstract class GeodeQueryGraphLogic[V](val shape: SourceShape[V], val clientCache: ClientCache, val query: String) + extends GeodeSourceStageLogic[V](shape, clientCache) + with StageLogging { + + override def executeQuery() = Try { + qs.newQuery(query) + .execute() + .asInstanceOf[SelectResults[V]] + .iterator() + } + +} diff --git a/geode/src/main/scala/akka/stream/alpakka/geode/internal/stage/GeodeSourceStageLogic.scala b/geode/src/main/scala/akka/stream/alpakka/geode/internal/stage/GeodeSourceStageLogic.scala new file mode 100644 index 0000000000..9ab64e1ef9 --- /dev/null +++ b/geode/src/main/scala/akka/stream/alpakka/geode/internal/stage/GeodeSourceStageLogic.scala @@ -0,0 +1,31 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.geode.internal.stage + +import akka.stream.SourceShape +import akka.stream.stage.{AsyncCallback, GraphStageLogic} +import org.apache.geode.cache.client.ClientCache + +import scala.util.{Failure, Success, Try} + +abstract class GeodeSourceStageLogic[V](shape: SourceShape[V], clientCache: ClientCache) + extends GraphStageLogic(shape) { + + protected var initialResultsIterator: java.util.Iterator[V] = _ + + val onConnect: AsyncCallback[Unit] + + lazy val qs = clientCache.getQueryService() + + def executeQuery(): Try[java.util.Iterator[V]] + + final override def preStart(): Unit = executeQuery() match { + case Success(it) => + initialResultsIterator = it + onConnect.invoke(()) + case Failure(e) => + failStage(e) + + } +} diff --git a/geode/src/main/scala/akka/stream/alpakka/geode/scaladsl/ReactiveGeode.scala b/geode/src/main/scala/akka/stream/alpakka/geode/scaladsl/ReactiveGeode.scala new file mode 100644 index 0000000000..0b8da425bc --- /dev/null +++ b/geode/src/main/scala/akka/stream/alpakka/geode/scaladsl/ReactiveGeode.scala @@ -0,0 +1,112 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.geode.scaladsl + +import akka.stream.alpakka.geode.internal._ +import akka.stream.alpakka.geode.internal.pdx.{PdxDecoder, PdxEncoder, ShapelessPdxSerializer} +import akka.stream.alpakka.geode.internal.stage.{GeodeContinuousSourceStage, GeodeFiniteSourceStage, GeodeFlowStage} +import akka.stream.alpakka.geode.{AkkaPdxSerializer, GeodeSettings, RegionSettings} +import akka.stream.scaladsl.{Flow, Keep, Sink, Source} +import akka.{Done, NotUsed} +import org.apache.geode.cache.client.ClientCacheFactory + +import scala.concurrent.Future +import scala.reflect.ClassTag + +class ReactiveGeode(settings: GeodeSettings) extends GeodeCache(settings) { + + /** + * This method will overloaded to provide server event subscription. + */ + override protected def configure(factory: ClientCacheFactory): ClientCacheFactory = + factory.addPoolLocator(settings.hostname, settings.port) + + def query[V <: AnyRef](query: String, serializer: AkkaPdxSerializer[V]): Source[V, Future[Done]] = { + + registerPDXSerializer(serializer, serializer.clazz) + + Source.fromGraph(new GeodeFiniteSourceStage[V](cache, query)) + } + + def flow[K, V <: AnyRef](settings: RegionSettings[K, V], serializer: AkkaPdxSerializer[V]): Flow[V, V, NotUsed] = { + + registerPDXSerializer(serializer, serializer.clazz) + + Flow.fromGraph(new GeodeFlowStage[K, V](cache, settings)) + } + + def sink[K, V <: AnyRef](settings: RegionSettings[K, V], serializer: AkkaPdxSerializer[V]): Sink[V, Future[Done]] = + Flow[V].via(flow(settings, serializer)).toMat(Sink.ignore)(Keep.right) + + /** + * Shapeless powered implicit serializer. + */ + def query[V <: AnyRef]( + query: String + )(implicit tag: ClassTag[V], enc: PdxEncoder[V], dec: PdxDecoder[V]): Source[V, Future[Done]] = { + + registerPDXSerializer(new ShapelessPdxSerializer[V](enc, dec), tag.runtimeClass) + + Source.fromGraph(new GeodeFiniteSourceStage[V](cache, query)) + } + + /** + * Shapeless powered implicit serializer. + */ + def flow[K, V <: AnyRef]( + settings: RegionSettings[K, V] + )(implicit tag: ClassTag[V], enc: PdxEncoder[V], dec: PdxDecoder[V]): Flow[V, V, NotUsed] = { + + registerPDXSerializer(new ShapelessPdxSerializer[V](enc, dec), tag.runtimeClass) + + Flow.fromGraph(new GeodeFlowStage[K, V](cache, settings)) + } + + /** + * Shapeless powered implicit serializer. + */ + def sink[K, V <: AnyRef]( + settings: RegionSettings[K, V] + )(implicit tag: ClassTag[V], enc: PdxEncoder[V], dec: PdxDecoder[V]): Sink[V, Future[Done]] = + Flow[V].via(flow(settings)).toMat(Sink.ignore)(Keep.right) + +} + +trait PoolSubscription extends ReactiveGeode { + + /** + * Pool subscription is mandatory for continuous query. + */ + final override protected def configure(factory: ClientCacheFactory) = + super.configure(factory).setPoolSubscriptionEnabled(true) + + def continuousQuery[V <: AnyRef](queryName: Symbol, + query: String, + serializer: AkkaPdxSerializer[V]): Source[V, Future[Done]] = { + + registerPDXSerializer(serializer, serializer.clazz) + + Source.fromGraph(new GeodeContinuousSourceStage[V](cache, queryName, query)) + } + + /** + * Shapeless powered implicit serializer. + */ + def continuousQuery[V <: AnyRef]( + queryName: Symbol, + query: String + )(implicit tag: ClassTag[V], enc: PdxEncoder[V], dec: PdxDecoder[V]): Source[V, Future[Done]] = { + + registerPDXSerializer(new ShapelessPdxSerializer[V](enc, dec), tag.runtimeClass) + + Source.fromGraph(new GeodeContinuousSourceStage[V](cache, queryName, query)) + } + + def closeContinuousQuery(queryName: Symbol) = + for { + qs <- Option(cache.getQueryService()) + query <- Option(qs.getCq(queryName.name)) + } yield (query.close()) + +} diff --git a/geode/src/test/java/akka/stream/alpakka/geode/javadsl/Animal.java b/geode/src/test/java/akka/stream/alpakka/geode/javadsl/Animal.java new file mode 100644 index 0000000000..9667572473 --- /dev/null +++ b/geode/src/test/java/akka/stream/alpakka/geode/javadsl/Animal.java @@ -0,0 +1,34 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.geode.javadsl; + +public class Animal { + final private int id; + final private String name; + final private int owner; + + + public Animal(int id, String name, int owner) { + this.id = id; + this.name = name; + this.owner = owner; + } + + public int getId() { + return id; + } + + public String getName() { + return name; + } + + public int getOwner() { + return owner; + } + + @Override + public String toString() { + return getId() +": " + getName() + " owner: " + getOwner(); + } +} \ No newline at end of file diff --git a/geode/src/test/java/akka/stream/alpakka/geode/javadsl/AnimalPdxSerializer.java b/geode/src/test/java/akka/stream/alpakka/geode/javadsl/AnimalPdxSerializer.java new file mode 100644 index 0000000000..d977752641 --- /dev/null +++ b/geode/src/test/java/akka/stream/alpakka/geode/javadsl/AnimalPdxSerializer.java @@ -0,0 +1,39 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.geode.javadsl; + +import akka.stream.alpakka.geode.AkkaPdxSerializer; +import org.apache.geode.pdx.PdxReader; +import org.apache.geode.pdx.PdxWriter; + +//#animal-pdx-serializer +public class AnimalPdxSerializer implements AkkaPdxSerializer { + @Override + public Class clazz() { + return Animal.class; + } + + @Override + public boolean toData(Object o, PdxWriter out) { + if(o instanceof Animal){ + Animal p = (Animal) o; + out.writeInt("id", p.getId()); + out.writeString("name", p.getName()); + out.writeInt("owner", p.getOwner()); + return true; + } + return false; + } + + @Override + public Object fromData(Class clazz, PdxReader in) { + int id = in.readInt("id"); + String name = in.readString("name"); + int owner = in.readInt("owner"); + return new Animal(id, name, owner); + } + + +} +//#animal-pdx-serializer \ No newline at end of file diff --git a/geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeBaseTestCase.java b/geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeBaseTestCase.java new file mode 100644 index 0000000000..c685e070e8 --- /dev/null +++ b/geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeBaseTestCase.java @@ -0,0 +1,82 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.geode.javadsl; + +import akka.NotUsed; +import akka.actor.ActorSystem; +import akka.stream.ActorMaterializer; +import akka.stream.Materializer; +import akka.stream.alpakka.geode.GeodeSettings; +import akka.stream.alpakka.geode.RegionSettings; +import akka.stream.javadsl.Source; +import akka.testkit.JavaTestKit; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Date; + +import static scala.compat.java8.JFunction.func; + +public class GeodeBaseTestCase { + + + protected static final Logger LOGGER = LoggerFactory.getLogger(GeodeFlowTestCase.class); + + private static ActorSystem system; + protected static Materializer materializer; + private String geodeDockerHostname="localhost"; + + { + String geodeItHostname=System.getenv("IT_GEODE_HOSTNAME"); + if(geodeItHostname!=null) + geodeDockerHostname=geodeItHostname; + } + + //#region + protected RegionSettings personRegionSettings = new RegionSettings<>("persons", func(Person::getId)); + protected RegionSettings animalRegionSettings = new RegionSettings<>("animals", func(Animal::getId)); + //#region + + @BeforeClass + public static void setup() { + system = ActorSystem.create(); + materializer = ActorMaterializer.create(system); + + } + + static Source buildPersonsSource(Integer... ids) { + return Source.from(Arrays.asList(ids)) + .map((i) -> new Person(i, String.format("Person Java %d", i), new Date())); + } + + static Source buildAnimalsSource(Integer... ids) { + return Source.from(Arrays.asList(ids)) + .map((i) -> new Animal(i, String.format("Animal Java %d", i), 1)); + } + + protected ReactiveGeode createReactiveGeode() { + //#connection + GeodeSettings settings = GeodeSettings.create(geodeDockerHostname, 10334) + .withConfiguration(func(c->c.setPoolIdleTimeout(10))); + return new ReactiveGeode(settings); + //#connection + } + + protected ReactiveGeodeWithPoolSubscription createReactiveGeodeWithPoolSubscription() { + GeodeSettings settings = GeodeSettings.create(geodeDockerHostname, 10334); + //#connection-with-pool + return new ReactiveGeodeWithPoolSubscription(settings); + //#connection-with-pool + } + + @AfterClass + public static void teardown() { + JavaTestKit.shutdownActorSystem(system); + } + +} diff --git a/geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeContinuousSourceTestCase.java b/geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeContinuousSourceTestCase.java new file mode 100644 index 0000000000..27f64352ba --- /dev/null +++ b/geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeContinuousSourceTestCase.java @@ -0,0 +1,54 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.geode.javadsl; + +import akka.Done; +import akka.NotUsed; +import akka.japi.Pair; +import akka.stream.javadsl.Flow; +import akka.stream.javadsl.Keep; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Date; +import java.util.List; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; + + +public class GeodeContinuousSourceTestCase extends GeodeBaseTestCase { + + @Test + public void continuousSourceTest() throws ExecutionException, InterruptedException { + + ReactiveGeodeWithPoolSubscription reactiveGeode = createReactiveGeodeWithPoolSubscription(); + + //#continuousQuery + CompletionStage fut = reactiveGeode.continuousQuery("test", "select * from /persons", new PersonPdxSerializer()) + .runForeach(p -> { + LOGGER.debug(p.toString()); + if (p.getId() == 120) { + reactiveGeode.closeContinuousQuery("test"); + } + }, materializer); + //#continuousQuery + + Flow flow = reactiveGeode.flow(personRegionSettings, new PersonPdxSerializer()); + + Pair>> run = Source.from(Arrays.asList(120)).map((i) + -> new Person(i, String.format("Java flow %d", i), new Date())).via(flow).toMat(Sink.seq(), Keep.both()).run(materializer); + + run.second().toCompletableFuture().get(); + + + fut.toCompletableFuture().get(); + + reactiveGeode.close(); + + } + + +} \ No newline at end of file diff --git a/geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeFiniteSourceTestCase.java b/geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeFiniteSourceTestCase.java new file mode 100644 index 0000000000..946e20a054 --- /dev/null +++ b/geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeFiniteSourceTestCase.java @@ -0,0 +1,40 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.geode.javadsl; + +import akka.Done; +import org.junit.Test; + +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; + + +public class GeodeFiniteSourceTestCase extends GeodeBaseTestCase { + + @Test + public void finiteSourceTest() throws ExecutionException, InterruptedException { + + ReactiveGeode reactiveGeode = createReactiveGeode(); + + //#query + CompletionStage personsDone = reactiveGeode.query("select * from /persons", new PersonPdxSerializer()) + .runForeach(p -> { + LOGGER.debug(p.toString()); + }, materializer); + //#query + + personsDone.toCompletableFuture().get(); + + CompletionStage animalsDone = reactiveGeode.query("select * from /animals", new AnimalPdxSerializer()) + .runForeach(p -> { + LOGGER.debug(p.toString()); + }, materializer); + + + animalsDone.toCompletableFuture().get(); + reactiveGeode.close(); + + } + +} \ No newline at end of file diff --git a/geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeFlowTestCase.java b/geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeFlowTestCase.java new file mode 100644 index 0000000000..1fff0a9bab --- /dev/null +++ b/geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeFlowTestCase.java @@ -0,0 +1,45 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.geode.javadsl; + +import akka.NotUsed; +import akka.stream.javadsl.Flow; +import akka.stream.javadsl.Keep; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; +import org.junit.Test; + +import java.util.List; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; + + + +public class GeodeFlowTestCase extends GeodeBaseTestCase { + + + @Test + public void flow() throws ExecutionException, InterruptedException { + + ReactiveGeode reactiveGeode = createReactiveGeode(); + + Source source = buildPersonsSource(110, 111, 113, 114, 115); + + //#flow + Flow flow = reactiveGeode.flow(personRegionSettings, new PersonPdxSerializer()); + + CompletionStage> run = source + .via(flow) + .toMat(Sink.seq(), Keep.right()) + .run(materializer); + //#flow + + run.toCompletableFuture().get(); + + reactiveGeode.close(); + + } + + +} \ No newline at end of file diff --git a/geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeSinkTestCase.java b/geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeSinkTestCase.java new file mode 100644 index 0000000000..9dcbdb9cbb --- /dev/null +++ b/geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeSinkTestCase.java @@ -0,0 +1,64 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.geode.javadsl; + +import akka.Done; +import akka.NotUsed; +import akka.stream.javadsl.Keep; +import akka.stream.javadsl.RunnableGraph; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; +import org.junit.Test; + +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; + + +public class GeodeSinkTestCase extends GeodeBaseTestCase { + + + @Test + public void sinkTest() throws ExecutionException, InterruptedException { + + ReactiveGeode reactiveGeode = createReactiveGeode(); + + Sink> sink = reactiveGeode.sink(personRegionSettings, new PersonPdxSerializer()); + + Source source = buildPersonsSource(100, 101, 103, 104, 105); + + RunnableGraph> runnableGraph = source + .toMat(sink, Keep.right()); + + CompletionStage stage = runnableGraph.run(materializer); + + stage.toCompletableFuture().get(); + + reactiveGeode.close(); + + } + + @Test + public void sinkAnimalTest() throws ExecutionException, InterruptedException { + + ReactiveGeode reactiveGeode = createReactiveGeode(); + + Source source = buildAnimalsSource(100, 101, 103, 104, 105); + + //#sink + Sink> sink = reactiveGeode.sink(animalRegionSettings, new AnimalPdxSerializer()); + + RunnableGraph> runnableGraph = source + .toMat(sink, Keep.right()); + //#sink + + CompletionStage stage = runnableGraph.run(materializer); + + stage.toCompletableFuture().get(); + + reactiveGeode.close(); + + } + + +} \ No newline at end of file diff --git a/geode/src/test/java/akka/stream/alpakka/geode/javadsl/Person.java b/geode/src/test/java/akka/stream/alpakka/geode/javadsl/Person.java new file mode 100644 index 0000000000..a4853f0bb2 --- /dev/null +++ b/geode/src/test/java/akka/stream/alpakka/geode/javadsl/Person.java @@ -0,0 +1,36 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.geode.javadsl; + +import java.util.Date; + +public class Person { + final private int id; + final private String name; + final private Date birthDate; + + + public Person(int id, String name, Date birthDate) { + this.id = id; + this.name = name; + this.birthDate = birthDate; + } + + public int getId() { + return id; + } + + public String getName() { + return name; + } + + public Date getBirthDate() { + return birthDate; + } + + @Override + public String toString() { + return getId() +": " + getName(); + } +} diff --git a/geode/src/test/java/akka/stream/alpakka/geode/javadsl/PersonPdxSerializer.java b/geode/src/test/java/akka/stream/alpakka/geode/javadsl/PersonPdxSerializer.java new file mode 100644 index 0000000000..82c1456f10 --- /dev/null +++ b/geode/src/test/java/akka/stream/alpakka/geode/javadsl/PersonPdxSerializer.java @@ -0,0 +1,40 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.geode.javadsl; + +import akka.stream.alpakka.geode.AkkaPdxSerializer; +import org.apache.geode.pdx.PdxReader; +import org.apache.geode.pdx.PdxWriter; + +import java.util.Date; + +//#person-pdx-serializer +public class PersonPdxSerializer implements AkkaPdxSerializer { + + @Override + public Class clazz() { + return Person.class; + } + + @Override + public boolean toData(Object o, PdxWriter out) { + if(o instanceof Person){ + Person p = (Person)o; + out.writeInt("id", p.getId()); + out.writeString("name", p.getName()); + out.writeDate("birthDate", p.getBirthDate()); + return true; + } + return false; + } + + @Override + public Object fromData(Class clazz, PdxReader in) { + int id = in.readInt("id"); + String name = in.readString("name"); + Date birthDate = in.readDate("birthDate"); + return new Person(id, name, birthDate); + } +} +//#person-pdx-serializer diff --git a/geode/src/test/resources/log4j2.xml b/geode/src/test/resources/log4j2.xml new file mode 100644 index 0000000000..d75580b962 --- /dev/null +++ b/geode/src/test/resources/log4j2.xml @@ -0,0 +1,23 @@ + + + + [%level{lowerCase=true} %date{yyyy/MM/dd HH:mm:ss.SSS z} <%thread> tid=%tid] %message%n%throwable%n + true + + + + + + + + + + + + + + + + + + diff --git a/geode/src/test/resources/logback.xml b/geode/src/test/resources/logback.xml new file mode 100644 index 0000000000..c15c9b85a5 --- /dev/null +++ b/geode/src/test/resources/logback.xml @@ -0,0 +1,16 @@ + + + + System.out + + + %-6level[%logger{0}]: %msg%n + + + + + + + + + diff --git a/geode/src/test/scala/akka/stream/alpakka/geode/internal/pdx/PDXDecoderSpec.scala b/geode/src/test/scala/akka/stream/alpakka/geode/internal/pdx/PDXDecoderSpec.scala new file mode 100644 index 0000000000..19d1205bb6 --- /dev/null +++ b/geode/src/test/scala/akka/stream/alpakka/geode/internal/pdx/PDXDecoderSpec.scala @@ -0,0 +1,47 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.geode.internal.pdx + +import java.util.{Date, UUID} + +import org.scalatest.{Matchers, WordSpec} + +class PDXDecoderSpec extends WordSpec with Matchers { + + "PDX decoder" should { + "decode primitive type" in { + PdxDecoder[Boolean] + PdxDecoder[Int] + PdxDecoder[List[Int]] + PdxDecoder[Array[Int]] + PdxDecoder[Long] + PdxDecoder[List[Long]] + PdxDecoder[Array[Long]] + PdxDecoder[Float] + PdxDecoder[List[Float]] + PdxDecoder[Array[Float]] + PdxDecoder[Double] + PdxDecoder[List[Double]] + PdxDecoder[Array[Double]] + + PdxDecoder[Char] + PdxDecoder[List[Char]] + PdxDecoder[Array[Char]] + PdxDecoder[String] + PdxDecoder[List[String]] + PdxDecoder[Array[String]] + + } + + "decode basic types" in { + PdxDecoder[Date] + PdxDecoder[List[Date]] + + PdxDecoder[UUID] + PdxDecoder[List[UUID]] + + } + } + +} diff --git a/geode/src/test/scala/akka/stream/alpakka/geode/internal/pdx/PDXEncodeSpec.scala b/geode/src/test/scala/akka/stream/alpakka/geode/internal/pdx/PDXEncodeSpec.scala new file mode 100644 index 0000000000..02d73dd637 --- /dev/null +++ b/geode/src/test/scala/akka/stream/alpakka/geode/internal/pdx/PDXEncodeSpec.scala @@ -0,0 +1,36 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.geode.internal.pdx + +import java.util.{Date, UUID} + +import org.scalatest.{Matchers, WordSpec} + +class PDXEncodeSpec extends WordSpec with Matchers { + + "PDXEncoder" should { + + "provides encoder for primitive types" in { + PdxEncoder[Boolean] + PdxEncoder[Int] + PdxEncoder[List[Int]] + PdxEncoder[Array[Int]] + PdxEncoder[Double] + PdxEncoder[List[Double]] + PdxEncoder[Array[Double]] + PdxEncoder[Float] + PdxEncoder[List[Float]] + PdxEncoder[Array[Float]] + PdxEncoder[Long] + PdxEncoder[Char] + PdxEncoder[String] + + } + + "provides encoder for basic types" in { + PdxEncoder[Date] + PdxEncoder[UUID] + } + } +} diff --git a/geode/src/test/scala/akka/stream/alpakka/geode/internal/pdx/PdxWriterMock.scala b/geode/src/test/scala/akka/stream/alpakka/geode/internal/pdx/PdxWriterMock.scala new file mode 100644 index 0000000000..b1e88ccaae --- /dev/null +++ b/geode/src/test/scala/akka/stream/alpakka/geode/internal/pdx/PdxWriterMock.scala @@ -0,0 +1,82 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.geode.internal.pdx + +import java.util.Date + +import org.apache.geode.pdx.{PdxUnreadFields, PdxWriter} + +object PdxMocks { + + class WriterMock extends PdxWriter { + override def writeShortArray(fieldName: String, value: Array[Short]) = { println("Write value"); this } + + override def markIdentityField(fieldName: String) = { println(s"Write value"); this } + + override def writeString(fieldName: String, value: String) = { println(s"Write $value"); this } + + override def writeDate(fieldName: String, value: Date) = { println(s"Write $value"); this } + + override def writeFloat(fieldName: String, value: Float) = { println(s"Write $value"); this } + + override def writeCharArray(fieldName: String, value: Array[Char]) = { println(s"Write $value"); this } + + override def writeDouble(fieldName: String, value: Double) = { println(s"Write $value"); this } + + override def writeObjectArray(fieldName: String, value: Array[AnyRef]) = { println(s"Write $value"); this } + + override def writeObjectArray(fieldName: String, value: Array[AnyRef], checkPortability: Boolean) = { + println(s"Write $value"); this + } + + override def writeShort(fieldName: String, value: Short) = { println(s"Write $value"); this } + + override def writeArrayOfByteArrays(fieldName: String, value: Array[Array[Byte]]) = { + println(s"Write $value"); this + } + + override def writeField[CT, VT <: CT](fieldName: String, fieldValue: VT, fieldType: Class[CT]) = { + println(s"Write $fieldName"); this + } + + override def writeField[CT, VT <: CT](fieldName: String, + fieldValue: VT, + fieldType: Class[CT], + checkPortability: Boolean) = { println(s"Write $fieldName"); this } + + override def writeInt(fieldName: String, value: Int) = { println(s"Write $value"); this } + + override def writeDoubleArray(fieldName: String, value: Array[Double]) = { println(s"Write $value"); this } + + override def writeLongArray(fieldName: String, value: Array[Long]) = { println(s"Write $value"); this } + + override def writeByteArray(fieldName: String, value: Array[Byte]) = { println(s"Write $value"); this } + + override def writeBooleanArray(fieldName: String, value: Array[Boolean]) = { println(s"Write $value"); this } + + override def writeIntArray(fieldName: String, value: Array[Int]) = { println(s"Write $value"); this } + + override def writeBoolean(fieldName: String, value: Boolean) = { println(s"Write $value"); this } + + override def writeStringArray(fieldName: String, value: Array[String]) = { println(s"Write $value"); this } + + override def writeObject(fieldName: String, value: scala.Any) = { println(s"Write $value"); this } + + override def writeObject(fieldName: String, value: scala.Any, checkPortability: Boolean) = { + println(s"Write $value"); this + } + + override def writeFloatArray(fieldName: String, value: Array[Float]) = { println(s"Write $value"); this } + + override def writeChar(fieldName: String, value: Char) = { println(s"Write $value"); this } + + override def writeLong(fieldName: String, value: Long) = { println(s"Write $value"); this } + + override def writeUnreadFields(unread: PdxUnreadFields) = { println(s"Write $unread"); this } + + override def writeByte(fieldName: String, value: Byte) = { println(s"Write $value"); this } + } + + implicit val writerMock = new WriterMock() +} diff --git a/geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeBaseSpec.scala b/geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeBaseSpec.scala new file mode 100644 index 0000000000..3590bd0dc3 --- /dev/null +++ b/geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeBaseSpec.scala @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.geode.scaladsl + +import java.util.{Date, UUID} + +import akka.actor.ActorSystem +import akka.stream.ActorMaterializer +import akka.stream.alpakka.geode.{GeodeSettings, RegionSettings} +import akka.stream.scaladsl.Source +import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec} + +import scala.concurrent.Await +import scala.concurrent.duration.DurationInt +import scala.language.postfixOps + +class GeodeBaseSpec extends WordSpec with Matchers with BeforeAndAfterAll { + + implicit val system = ActorSystem("test") + implicit val materializer = ActorMaterializer() + + //#region + val personsRegionSettings = RegionSettings("persons", (p: Person) => p.id) + val animalsRegionSettings = RegionSettings("animals", (a: Animal) => a.id) + val complexesRegionSettings = RegionSettings("complexes", (a: Complex) => a.id) + + //#region + + /** + * Run IT test only if geode is available. + * @param f + */ + def it(f: GeodeSettings => Unit): Unit = + f(GeodeSettings(sys.env.get("IT_GEODE_HOSTNAME").getOrElse("localhost"))) + + protected def buildPersonsSource(range: Range): Source[Person, Any] = + Source(range).map(i => Person(i, s"Person Scala $i", new Date())) + + protected def buildAnimalsSource(range: Range): Source[Animal, Any] = + Source(range).map(i => Animal(i, s"Animal Scala $i", 1)) + + protected def buildComplexesSource(range: Range): Source[Complex, Any] = + Source(range).map(i => Complex(UUID.randomUUID(), List(1, 2, 3), List(new Date()), Set(UUID.randomUUID()))) + + override protected def afterAll(): Unit = + Await.result(system.terminate(), 10 seconds) +} diff --git a/geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeContinuousSourceSpec.scala b/geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeContinuousSourceSpec.scala new file mode 100644 index 0000000000..805133ea87 --- /dev/null +++ b/geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeContinuousSourceSpec.scala @@ -0,0 +1,56 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.geode.scaladsl + +import akka.NotUsed +import akka.stream.scaladsl.{Flow, Sink} +import org.slf4j.LoggerFactory + +import scala.concurrent.Await +import scala.concurrent.duration.DurationInt +import scala.language.postfixOps + +class GeodeContinuousSourceSpec extends GeodeBaseSpec { + + private val log = LoggerFactory.getLogger(classOf[GeodeContinuousSourceSpec]) + + "Geode continuousQuery" should { + it { geodeSettings => + "retrieves continuously elements from geode" in { + + //#connection-with-pool + val reactiveGeode = new ReactiveGeode(geodeSettings) with PoolSubscription + //#connection-with-pool + + val flow: Flow[Person, Person, NotUsed] = reactiveGeode.flow(personsRegionSettings) + + //#continuousQuery + val source = + reactiveGeode + .continuousQuery[Person]('test, s"select * from /persons") + .runWith(Sink.fold(0) { (c, p) => + log.debug(s"$p $c") + if (c == 19) { + reactiveGeode.closeContinuousQuery('test).foreach { _ => + log.debug("test cQuery is closed") + } + + } + c + 1 + }) + //#continuousQuery + + val f = buildPersonsSource(1 to 20) + .via(flow) //geode flow + .runWith(Sink.ignore) + + Await.result(f, 10 seconds) + + Await.result(source, 5 seconds) + + reactiveGeode.close() + } + } + } +} diff --git a/geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeFiniteSourceSpec.scala b/geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeFiniteSourceSpec.scala new file mode 100644 index 0000000000..b192dfb2db --- /dev/null +++ b/geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeFiniteSourceSpec.scala @@ -0,0 +1,49 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.geode.scaladsl + +import akka.stream.scaladsl.Sink +import org.slf4j.LoggerFactory + +import scala.concurrent.Await +import scala.concurrent.duration.DurationInt +import scala.language.postfixOps + +class GeodeFiniteSourceSpec extends GeodeBaseSpec { + + private val log = LoggerFactory.getLogger(classOf[GeodeFiniteSourceSpec]) + + "Geode finite source" should { + it { geodeSettings => + "retrieves finite elements from geode" in { + + val reactiveGeode = new ReactiveGeode(geodeSettings) + + //#query + val source = + reactiveGeode + .query[Person](s"select * from /persons order by id") + .runWith(Sink.foreach(e => log.debug(s"$e"))) + //#query + Await.ready(source, 10 seconds) + + val animals = + reactiveGeode + .query[Animal](s"select * from /animals order by id") + .runWith(Sink.foreach(e => log.debug(s"$e"))) + + Await.ready(animals, 10 seconds) + + val complexes = + reactiveGeode + .query[Complex](s"select * from /complexes order by id") + .runWith(Sink.foreach(e => log.debug(s"$e"))) + + Await.ready(complexes, 10 seconds) + + reactiveGeode.close() + } + } + } +} diff --git a/geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeFlowSpec.scala b/geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeFlowSpec.scala new file mode 100644 index 0000000000..2276431cd4 --- /dev/null +++ b/geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeFlowSpec.scala @@ -0,0 +1,47 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.geode.scaladsl + +import akka.NotUsed +import akka.stream.scaladsl.{Flow, Sink} + +import scala.concurrent.Await +import scala.concurrent.duration.DurationInt +import scala.language.postfixOps + +class GeodeFlowSpec extends GeodeBaseSpec { + + "Reactive geode" should { + it { geodeSettings => + "flow with shapeless pdx serializer" in { + //#connection + val reactiveGeode = new ReactiveGeode(geodeSettings) + //#connection + + val source = buildPersonsSource(1 to 10) + + //#flow + val flow: Flow[Person, Person, NotUsed] = reactiveGeode.flow(personsRegionSettings) + + val fut = source.via(flow).runWith(Sink.ignore) + //#flow + Await.ready(fut, 10 seconds) + + reactiveGeode.close() + } + "flow with explicit pdx serializer" in { + val reactiveGeode = new ReactiveGeode(geodeSettings) + + val source = buildPersonsSource(1 to 20) + + val flow: Flow[Person, Person, NotUsed] = reactiveGeode.flow(personsRegionSettings, PersonPdxSerializer) + + val fut = source.via(flow).runWith(Sink.ignore) + Await.ready(fut, 10 seconds) + + reactiveGeode.close() + } + } + } +} diff --git a/geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeSinkSpec.scala b/geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeSinkSpec.scala new file mode 100644 index 0000000000..ba2576d444 --- /dev/null +++ b/geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeSinkSpec.scala @@ -0,0 +1,64 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.geode.scaladsl + +import scala.concurrent.Await +import scala.concurrent.duration.DurationInt +import scala.language.postfixOps + +class GeodeSinkSpec extends GeodeBaseSpec { + + "Reactive geode sink" should { + it { geodeSettings => + "stores persons in geode" in { + + val reactiveGeode = new ReactiveGeode(geodeSettings) with PoolSubscription + + val source = buildPersonsSource(30 to 40) + + val sink = reactiveGeode.sink(personsRegionSettings) + val fut = source.runWith(sink) + + Await.ready(fut, 5 seconds) + + reactiveGeode.close() + } + + "stores animals in geode" in { + + val reactiveGeode = new ReactiveGeode(geodeSettings) with PoolSubscription + + val source = buildAnimalsSource(1 to 40) + + //#sink + val sink = reactiveGeode.sink(animalsRegionSettings) + + val fut = source.runWith(sink) + //#sink + Await.ready(fut, 10 seconds) + + reactiveGeode.close() + } + + "stores complex in geode" in { + + val reactiveGeode = new ReactiveGeode(geodeSettings) with PoolSubscription + + val source = buildComplexesSource(1 to 40) + + //#sink + val sink = reactiveGeode.sink(complexesRegionSettings) + + val fut = source.runWith(sink) + //#sink + Await.ready(fut, 10 seconds) + + reactiveGeode.close() + } + + } + + } + +} diff --git a/geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/Model.scala b/geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/Model.scala new file mode 100644 index 0000000000..2783095e55 --- /dev/null +++ b/geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/Model.scala @@ -0,0 +1,11 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.geode.scaladsl + +import java.util.{Date, UUID} + +case class Person(id: Int, name: String, birthDate: Date) +case class Animal(id: Int, name: String, owner: Int) + +case class Complex(id: UUID, ints: List[Int], dates: List[Date], ids: Set[UUID] = Set()) diff --git a/geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/PersonPdxSerializer.scala b/geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/PersonPdxSerializer.scala new file mode 100644 index 0000000000..9b2d3945f1 --- /dev/null +++ b/geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/PersonPdxSerializer.scala @@ -0,0 +1,32 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.geode.scaladsl + +import java.util.Date + +import akka.stream.alpakka.geode.AkkaPdxSerializer +import org.apache.geode.pdx.{PdxReader, PdxWriter} + +//#person-pdx-serializer +object PersonPdxSerializer extends AkkaPdxSerializer[Person] { + override def clazz: Class[Person] = classOf[Person] + + override def toData(o: scala.Any, out: PdxWriter): Boolean = + if (o.isInstanceOf[Person]) { + val p = o.asInstanceOf[Person] + out.writeInt("id", p.id) + out.writeString("name", p.name) + out.writeDate("birthDate", p.birthDate) + true + } else + false + + override def fromData(clazz: Class[_], in: PdxReader): AnyRef = { + val id: Int = in.readInt("id") + val name: String = in.readString("name") + val birthDate: Date = in.readDate("birthDate") + Person(id, name, birthDate) + } +} +//#person-pdx-serializer diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 485b881205..864ea275f2 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -48,6 +48,21 @@ object Dependencies { val Csv = Seq() + val Geode = { + val geodeVersion = "1.1.1" + Seq( + libraryDependencies ++= Seq("com.chuusai" %% "shapeless" % "2.3.2") ++ + Seq("geode-core","geode-cq") + .map("org.apache.geode" % _ % geodeVersion exclude("org.slf4j", "slf4j-log4j12")) ++ + Seq("org.slf4j" % "log4j-over-slf4j" % "1.7.21" % Test, // MIT like: http://www.slf4j.org/license.html + "org.slf4j" % "slf4j-api" % "1.7.21" % Test, // MIT like: http://www.slf4j.org/license.html + "ch.qos.logback" % "logback-classic" % "1.1.7" % Test, // Eclipse Public License 1.0: http://logback.qos.ch/license.html + "ch.qos.logback" % "logback-core" % "1.1.7" % Test // Eclipse Public License 1.0: http://logback.qos.ch/license.html + ) + + ) + } + val HBase = { val hbaseVersion = "1.2.4" val hadoopVersion = "2.5.1"