diff --git a/build.sbt b/build.sbt
index 60a957a802..e4f17a87de 100644
--- a/build.sbt
+++ b/build.sbt
@@ -9,6 +9,7 @@ lazy val alpakka = project
dynamodb,
files,
ftp,
+ geode,
googleCloudPubSub,
hbase,
ironmq,
@@ -98,6 +99,15 @@ lazy val ftp = project
parallelExecution in Test := false
)
+lazy val geode = project
+ .enablePlugins(AutomateHeaderPlugin)
+ .settings(
+ name := "akka-stream-alpakka-geode",
+ Dependencies.Geode,
+ fork in Test := true,
+ parallelExecution in Test := false
+ )
+
lazy val googleCloudPubSub = project
.in(file("google-cloud-pub-sub"))
.enablePlugins(AutomateHeaderPlugin)
diff --git a/docker-compose.yml b/docker-compose.yml
index 12db706d89..a45ed61cdc 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -22,6 +22,26 @@ services:
image: deangiberson/aws-dynamodb-local
ports:
- "8001:8000"
+ geode:
+ container_name: geode
+ image: apachegeode/geode
+ hostname: geode
+ mem_limit: 2g
+ expose:
+ - "10334"
+ - "1099"
+ - "7575"
+ - "40404"
+ ports:
+ - "1099:1099"
+ - "10334:10334"
+ - "7575:7575"
+ - "7070:7070"
+ - "40404:40404"
+ - "8081:8080"
+ volumes:
+ - ./geode/scripts/:/scripts/
+ command: /scripts/geode.sh
ironauth:
image: iron/auth
ports:
diff --git a/docs/src/main/paradox/connectors.md b/docs/src/main/paradox/connectors.md
index 2be0bb3b2c..cbac74f149 100644
--- a/docs/src/main/paradox/connectors.md
+++ b/docs/src/main/paradox/connectors.md
@@ -3,6 +3,7 @@
@@@ index
* [AMQP Connector](amqp.md)
+* [Apache Geode Connector](geode.md)
* [AWS DynamoDB Connector](dynamodb.md)
* [AWS Lambda Connector](awslambda.md)
* [AWS S3 Connector](s3.md)
diff --git a/docs/src/main/paradox/geode.md b/docs/src/main/paradox/geode.md
new file mode 100644
index 0000000000..a59ea6aec7
--- /dev/null
+++ b/docs/src/main/paradox/geode.md
@@ -0,0 +1,180 @@
+# Apache Geode connector
+
+[Apache Geode](http://geode.apache.org) is a distributed data grid (formerly known as GemFire).
+
+This connector provides a flow and a sink to put elements into Geode, and a source to retrieve elements from it.
+
+Geode stores data as key/value pairs. Both keys and values must be serializable; more on this below.
+
+## Artifacts
+
+sbt
+: @@@vars
+ ```scala
+ libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-geode" % "$version$"
+ ```
+ @@@
+
+Maven
+: @@@vars
+ ```xml
+  <dependency>
+    <groupId>com.lightbend.akka</groupId>
+    <artifactId>akka-stream-alpakka-geode_$scala.binaryVersion$</artifactId>
+    <version>$version$</version>
+  </dependency>
+ ```
+ @@@
+
+Gradle
+: @@@vars
+ ```gradle
+ dependencies {
+ compile group: "com.lightbend.akka", name: "akka-stream-alpakka-geode_$scala.binaryVersion$", version: "$version$"
+ }
+ ```
+ @@@
+
+
+# Usage
+
+## Connection
+
+First of all you need to connect to the Geode cache. In a client application, the connection is handled by a
+ [ClientCache](https://geode.apache.org/docs/guide/11/basic_config/the_cache/managing_a_client_cache.html). A single
+ ClientCache per application is enough. The ClientCache also holds a single PdxSerializer.
+
+scala
+: @@snip (../../../../geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeFlowSpec.scala) { #connection }
+
+java
+: @@snip (../../../../geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeBaseTestCase.java) { #connection }
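+
+For illustration, connecting in Scala may look like the following sketch (hostname and pool timeout are assumed values, not taken from the test sources):
+
+```scala
+import akka.stream.alpakka.geode.GeodeSettings
+import akka.stream.alpakka.geode.scaladsl.ReactiveGeode
+
+// Connect to a Geode locator; "localhost" is an assumed hostname.
+val settings = GeodeSettings("localhost", port = 10334)
+  .withConfiguration(_.setPoolIdleTimeout(10))
+
+val reactiveGeode = new ReactiveGeode(settings)
+```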
+
+Apache Geode supports continuous queries. A continuous query relies on server events, thus reactive Geode needs to
+ listen to those events. Since this behaviour consumes more resources, it is isolated in a Scala trait and a
+ specialized Java class.
+
+scala
+: @@snip (../../../../geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeContinuousSourceSpec.scala) { #connection-with-pool }
+
+java
+: @@snip (../../../../geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeBaseTestCase.java) { #connection-with-pool }
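+
+A minimal sketch of the event-listening variant, reusing the `settings` value from above:
+
+```scala
+import akka.stream.alpakka.geode.scaladsl.{PoolSubscription, ReactiveGeode}
+
+// Mixing in PoolSubscription enables server event subscription,
+// which continuous queries require.
+val reactiveGeode = new ReactiveGeode(settings) with PoolSubscription
+```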
+
+## Region
+
+Define a [region](https://geode.apache.org/docs/guide/11/basic_config/data_regions/chapter_overview.html) setting to
+describe how to access the region and how to extract keys from values.
+
+scala
+: @@snip (../../../../geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeBaseSpec.scala) { #region }
+
+java
+: @@snip (../../../../geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeBaseTestCase.java) { #region }
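+
+For illustration, a region setting for a hypothetical `Person` case class (mirroring the test sources) could look like:
+
+```scala
+import java.util.Date
+import akka.stream.alpakka.geode.RegionSettings
+
+// Person is an assumed domain class, not part of the connector.
+case class Person(id: Int, name: String, birthDate: Date)
+
+// "persons" is the region name; the function extracts the key from a value.
+val personsRegionSettings = RegionSettings("persons", (p: Person) => p.id)
+```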
+
+
+### Serialization
+
+Objects must be serialized to flow into a Geode region. Several formats are conceivable:
+
+* an opaque format (e.g. JSON/XML)
+* Java serialization
+* PDX, Geode's own format
+
+PDX is the only format supported by this connector.
+
+PDXEncoder supports many options; see [Geode PDX serialization](http://geode.apache.org/docs/guide/11/developing/data_serialization/gemfire_pdx_serialization.html) for details.
+
+A PdxSerializer must be provided to Geode when reading from or writing to a region.
+
+scala
+: @@snip (../../../../geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/PersonPdxSerializer.scala) { #person-pdx-serializer }
+
+java
+: @@snip (../../../../geode/src/test/java/akka/stream/alpakka/geode/javadsl/PersonPdxSerializer.java) { #person-pdx-serializer }
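+
+A hand-written serializer for the hypothetical `Person` class above might look like this sketch:
+
+```scala
+import akka.stream.alpakka.geode.AkkaPdxSerializer
+import org.apache.geode.pdx.{PdxReader, PdxWriter}
+
+object PersonPdxSerializer extends AkkaPdxSerializer[Person] {
+  override def clazz: Class[Person] = classOf[Person]
+
+  // Writes each field to the PDX stream; returns false for unsupported types.
+  override def toData(o: scala.Any, out: PdxWriter): Boolean = o match {
+    case p: Person =>
+      out.writeInt("id", p.id)
+      out.writeString("name", p.name)
+      out.writeDate("birthDate", p.birthDate)
+      true
+    case _ => false
+  }
+
+  // Reads the fields back in the same order they were written.
+  override def fromData(clazz: Class[_], in: PdxReader): AnyRef =
+    Person(in.readInt("id"), in.readString("name"), in.readDate("birthDate"))
+}
+```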
+
+
+
+This connector provides a generic solution for Scala users based on [shapeless](https://github.com/milessabin/shapeless): if no serializer is provided for a case class, one is generated at compile time (see the sketch below).
+Java users need to write their custom serializers by hand.
+
+
+Runtime reflection is also an option; see [Geode auto-serialization](http://geode.apache.org/docs/guide/11/developing/data_serialization/auto_serialization.html).
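+
+With the shapeless-based derivation in scope, a query over case classes needs no hand-written serializer at all, for instance (sketch):
+
+```scala
+// PdxEncoder[Person] and PdxDecoder[Person] are derived at compile time.
+val persons = reactiveGeode.query[Person]("select * from /persons")
+```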
+
+### Flow usage
+
+This sample stores (case) classes in Geode.
+
+scala
+: @@snip (../../../../geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeFlowSpec.scala) { #flow }
+
+java
+: @@snip (../../../../geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeFlowTestCase.java) { #flow }
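+
+Schematically, using the pieces defined above and an implicit `Materializer` (a sketch, not the snipped test code):
+
+```scala
+import akka.stream.scaladsl.{Sink, Source}
+
+val flow = reactiveGeode.flow(personsRegionSettings, PersonPdxSerializer)
+
+// Each element is put into the "persons" region, then passed downstream.
+val saved = Source(1 to 10)
+  .map(i => Person(i, s"Person $i", new Date()))
+  .via(flow)
+  .runWith(Sink.seq)
+```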
+
+
+### Sink usage
+
+scala
+: @@snip (../../../../geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeSinkSpec.scala) { #sink }
+
+java
+: @@snip (../../../../geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeSinkTestCase.java) { #sink }
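+
+The sink is the same flow terminated by `Sink.ignore`, so it materializes a completion signal (sketch):
+
+```scala
+// The Future[Done] completes once the upstream finishes.
+val done = Source(1 to 10)
+  .map(i => Person(i, s"Person $i", new Date()))
+  .runWith(reactiveGeode.sink(personsRegionSettings, PersonPdxSerializer))
+```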
+
+
+### Source usage
+
+#### Simple query
+
+Apache Geode supports simple queries.
+
+scala
+: @@snip (../../../../geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeFiniteSourceSpec.scala) { #query }
+
+java
+: @@snip (../../../../geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeFiniteSourceTestCase.java) { #query }
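+
+A simple OQL query emits all matching values and then completes (sketch):
+
+```scala
+val allPersons = reactiveGeode
+  .query("select * from /persons", PersonPdxSerializer)
+  .runWith(Sink.seq)
+```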
+
+
+#### Continuous query
+
+A continuous query source first emits the results of the initial query, then pushes every new matching value as server events arrive.
+
+scala
+: @@snip (../../../../geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeContinuousSourceSpec.scala) { #continuousQuery }
+
+java
+: @@snip (../../../../geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeContinuousSourceTestCase.java) { #continuousQuery }
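+
+Sketched in Scala; this requires the `PoolSubscription` variant, and the query name is arbitrary:
+
+```scala
+val events = reactiveGeode
+  .continuousQuery('personsWatch, "select * from /persons", PersonPdxSerializer)
+  .runForeach(println)
+
+// Unregister the continuous query on the server once it is no longer needed.
+reactiveGeode.closeContinuousQuery('personsWatch)
+```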
+
+
+## Geode basic commands
+
+Assuming Apache Geode is installed:
+
+```
+gfsh
+```
+
+From the geode shell:
+
+```
+start locator --name=locator
+configure pdx --read-serialized=true
+start server --name=server
+
+create region --name=animals --type=PARTITION_REDUNDANT --redundant-copies=2
+create region --name=persons --type=PARTITION_REDUNDANT --redundant-copies=2
+
+```
+
+### Run the test
+
+Integration tests run against a Geode instance on localhost by default; set the IT_GEODE_HOSTNAME environment variable to target another host:
+
+```bash
+export IT_GEODE_HOSTNAME=geode-host-locator
+
+sbt
+```
+
+From the sbt shell:
+
+```sbtshell
+project geode
+test
+```
diff --git a/geode/scripts/cache.xml b/geode/scripts/cache.xml
new file mode 100644
index 0000000000..ab32281104
--- /dev/null
+++ b/geode/scripts/cache.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Cache declaration; contents assumed: PDX read-serialized mode plus the regions used by the docs and tests. -->
+<cache xmlns="http://geode.apache.org/schema/cache"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://geode.apache.org/schema/cache http://geode.apache.org/schema/cache/cache-1.0.xsd"
+       version="1.0">
+
+    <pdx read-serialized="true"/>
+
+    <region name="persons">
+        <region-attributes data-policy="partition"/>
+    </region>
+
+    <region name="animals">
+        <region-attributes data-policy="partition"/>
+    </region>
+
+</cache>
diff --git a/geode/scripts/geode.sh b/geode/scripts/geode.sh
new file mode 100755
index 0000000000..ddc1af16ce
--- /dev/null
+++ b/geode/scripts/geode.sh
@@ -0,0 +1,13 @@
+#!/bin/bash
+
+rm -rf /geode
+mkdir -p /geode/locator
+mkdir -p /geode/server
+
+
+gfsh -e "start locator --name=$HOSTNAME-locator --dir=/geode/locator --mcast-port=0 --hostname-for-clients=0.0.0.0" -e "start server --name=$HOSTNAME-server --locators=localhost[10334] --dir=/geode/server --server-port=40404 --max-heap=1G --hostname-for-clients=0.0.0.0 --cache-xml-file=/scripts/cache.xml"
+
+while true; do
+ sleep 10
+done
+
diff --git a/geode/scripts/start-local.sh b/geode/scripts/start-local.sh
new file mode 100755
index 0000000000..4559010830
--- /dev/null
+++ b/geode/scripts/start-local.sh
@@ -0,0 +1,2 @@
+gfsh -e "start locator --name=locator" -e "configure pdx --read-serialized=true" -e "start server --name=server"
+
diff --git a/geode/src/main/java/akka/stream/alpakka/geode/javadsl/ReactiveGeode.java b/geode/src/main/java/akka/stream/alpakka/geode/javadsl/ReactiveGeode.java
new file mode 100644
index 0000000000..a6bf588f60
--- /dev/null
+++ b/geode/src/main/java/akka/stream/alpakka/geode/javadsl/ReactiveGeode.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright (C) 2016-2017 Lightbend Inc.
+ */
+package akka.stream.alpakka.geode.javadsl;
+
+import akka.Done;
+import akka.NotUsed;
+import akka.stream.alpakka.geode.AkkaPdxSerializer;
+import akka.stream.alpakka.geode.GeodeSettings;
+import akka.stream.alpakka.geode.RegionSettings;
+import akka.stream.alpakka.geode.internal.GeodeCache;
+import akka.stream.alpakka.geode.internal.stage.GeodeContinuousSourceStage;
+import akka.stream.alpakka.geode.internal.stage.GeodeFiniteSourceStage;
+import akka.stream.alpakka.geode.internal.stage.GeodeFlowStage;
+import akka.stream.javadsl.Flow;
+import akka.stream.javadsl.Keep;
+import akka.stream.javadsl.Sink;
+import akka.stream.javadsl.Source;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.query.CqException;
+import org.apache.geode.cache.query.CqQuery;
+import org.apache.geode.cache.query.QueryService;
+import scala.Symbol;
+import scala.concurrent.Future;
+
+import java.util.concurrent.CompletionStage;
+
+
+/**
+ * Reactive Geode client without server event subscription. Cannot build continuous sources.
+ */
+public class ReactiveGeode extends GeodeCache {
+
+ final GeodeSettings geodeSettings;
+
+ public ReactiveGeode(GeodeSettings settings) {
+ super(settings);
+ this.geodeSettings = settings;
+ }
+
+ @Override
+ public ClientCacheFactory configure(ClientCacheFactory factory) {
+ return factory.addPoolLocator(geodeSettings.hostname(), geodeSettings.port());
+ }
+
+    public <V> Source<V, Future<Done>> query(String query, AkkaPdxSerializer<V> serializer) {
+
+        registerPDXSerializer(serializer, serializer.clazz());
+        return Source.fromGraph(new GeodeFiniteSourceStage<V>(cache(), query));
+    }
+
+
+    public <K, V> Flow<V, V, NotUsed> flow(RegionSettings<K, V> regionSettings, AkkaPdxSerializer<V> serializer) {
+
+        registerPDXSerializer(serializer, serializer.clazz());
+
+        return Flow.fromGraph(new GeodeFlowStage<K, V>(cache(), regionSettings));
+
+    }
+
+    public <K, V> Sink<V, CompletionStage<Done>> sink(RegionSettings<K, V> regionSettings, AkkaPdxSerializer<V> serializer) {
+        return flow(regionSettings, serializer)
+                .toMat(Sink.ignore(), Keep.right());
+    }
+
+ public void close() {
+ close(false);
+ }
+
+
+}
+
+/**
+ * Reactive Geode client with server event subscription. Can build continuous sources.
+ */
+class ReactiveGeodeWithPoolSubscription extends ReactiveGeode {
+
+    /**
+     * Subscribes to server events.
+     *
+     * @return ClientCacheFactory with server event subscription enabled.
+     */
+    @Override
+    public final ClientCacheFactory configure(ClientCacheFactory factory) {
+        return super.configure(factory).setPoolSubscriptionEnabled(true);
+    }
+
+ public ReactiveGeodeWithPoolSubscription(GeodeSettings settings) {
+ super(settings);
+ }
+
+    public <V> Source<V, Future<Done>> continuousQuery(String queryName, String query, AkkaPdxSerializer<V> serializer) {
+
+        registerPDXSerializer(serializer, serializer.clazz());
+        return Source.fromGraph(new GeodeContinuousSourceStage<V>(cache(), Symbol.apply(queryName), query));
+    }
+
+ public boolean closeContinuousQuery(String name) throws CqException {
+
+ QueryService qs = cache().getQueryService();
+ CqQuery query = qs.getCq(name);
+ if (query == null)
+ return false;
+ query.close();
+ return true;
+
+ }
+
+}
\ No newline at end of file
diff --git a/geode/src/main/scala/akka/stream/alpakka/geode/AkkaPdxSerializer.scala b/geode/src/main/scala/akka/stream/alpakka/geode/AkkaPdxSerializer.scala
new file mode 100644
index 0000000000..f16bb62161
--- /dev/null
+++ b/geode/src/main/scala/akka/stream/alpakka/geode/AkkaPdxSerializer.scala
@@ -0,0 +1,10 @@
+/*
+ * Copyright (C) 2016-2017 Lightbend Inc.
+ */
+package akka.stream.alpakka.geode
+
+import org.apache.geode.pdx.PdxSerializer
+
+trait AkkaPdxSerializer[V] extends PdxSerializer {
+ def clazz: Class[V]
+}
diff --git a/geode/src/main/scala/akka/stream/alpakka/geode/GeodeSettings.scala b/geode/src/main/scala/akka/stream/alpakka/geode/GeodeSettings.scala
new file mode 100644
index 0000000000..c9b425d895
--- /dev/null
+++ b/geode/src/main/scala/akka/stream/alpakka/geode/GeodeSettings.scala
@@ -0,0 +1,41 @@
+/*
+ * Copyright (C) 2016-2017 Lightbend Inc.
+ */
+package akka.stream.alpakka.geode
+
+import org.apache.geode.cache.client.ClientCacheFactory
+
+/**
+ * General settings to connect Apache Geode.
+ *
+ * @param hostname hostname of a Geode locator
+ * @param port port of the Geode locator, defaults to 10334
+ * @param pdxCompat a function that determines whether two classes are equivalent (Java class / Scala case class)
+ */
+case class GeodeSettings(hostname: String,
+ port: Int = 10334,
+ configure: Option[ClientCacheFactory => ClientCacheFactory] = None,
+ pdxCompat: (Class[_], Class[_]) => Boolean = (c1, c2) =>
+ c1.getSimpleName equals c2.getSimpleName) {
+
+ /**
+   * @param configure a function to configure the Geode client cache factory
+ */
+ def withConfiguration(configure: ClientCacheFactory => ClientCacheFactory) =
+ copy(configure = Some(configure))
+
+ /**
+   * @param pdxCompat a function that determines whether two classes are equivalent (Java class / Scala case class)
+ */
+ def withPdxCompat(pdxCompat: (Class[_], Class[_]) => Boolean) =
+ copy(pdxCompat = pdxCompat)
+}
+
+object GeodeSettings {
+ def create(hostname: String) = new GeodeSettings(hostname)
+
+ def create(hostname: String, port: Int) = new GeodeSettings(hostname, port)
+
+}
+
+case class RegionSettings[K, V](name: String, keyExtractor: V => K)
diff --git a/geode/src/main/scala/akka/stream/alpakka/geode/internal/GeodeCache.scala b/geode/src/main/scala/akka/stream/alpakka/geode/internal/GeodeCache.scala
new file mode 100644
index 0000000000..9dc2c6ded3
--- /dev/null
+++ b/geode/src/main/scala/akka/stream/alpakka/geode/internal/GeodeCache.scala
@@ -0,0 +1,47 @@
+/*
+ * Copyright (C) 2016-2017 Lightbend Inc.
+ */
+package akka.stream.alpakka.geode.internal
+
+import akka.stream.alpakka.geode.GeodeSettings
+import akka.stream.alpakka.geode.internal.pdx.DelegatingPdxSerializer
+import org.apache.geode.cache.client.{ClientCache, ClientCacheFactory}
+import org.apache.geode.pdx.PdxSerializer
+
+/**
+ * Base class of all Geode clients.
+ */
+abstract class GeodeCache(geodeSettings: GeodeSettings) {
+
+ private lazy val serializer = new DelegatingPdxSerializer(geodeSettings.pdxCompat)
+
+ protected def registerPDXSerializer[V](pdxSerializer: PdxSerializer, clazz: Class[V]): Unit =
+ serializer.register(pdxSerializer, clazz)
+
+ /**
+   * Overridden by subclasses to enable server event subscription.
+   *
+   * @return the configured client cache factory
+ */
+ protected def configure(factory: ClientCacheFactory): ClientCacheFactory
+
+ /**
+   * Returns a ClientCacheFactory:
+   *
+   *  - with PDX support
+   *  - configured by subclasses
+   *  - customized by the client application
+ */
+ final protected def newCacheFactory(): ClientCacheFactory = {
+ val factory = configure(new ClientCacheFactory().setPdxSerializer(serializer))
+ geodeSettings.configure.map(_(factory)).getOrElse(factory)
+ }
+
+ lazy val cache: ClientCache = newCacheFactory().create()
+
+ def close(keepAlive: Boolean = false): Unit = cache.close(keepAlive)
+
+}
diff --git a/geode/src/main/scala/akka/stream/alpakka/geode/internal/GeodeCapabilities.scala b/geode/src/main/scala/akka/stream/alpakka/geode/internal/GeodeCapabilities.scala
new file mode 100644
index 0000000000..29914334aa
--- /dev/null
+++ b/geode/src/main/scala/akka/stream/alpakka/geode/internal/GeodeCapabilities.scala
@@ -0,0 +1,32 @@
+/*
+ * Copyright (C) 2016-2017 Lightbend Inc.
+ */
+package akka.stream.alpakka.geode.internal
+
+import akka.stream.alpakka.geode.RegionSettings
+import akka.stream.stage.StageLogging
+import org.apache.geode.cache.client.{ClientCache, ClientRegionShortcut}
+
+import scala.util.control.NonFatal
+
+trait GeodeCapabilities[K, V] { this: StageLogging =>
+
+ def regionSettings: RegionSettings[K, V]
+
+ def clientCache: ClientCache
+
+ private lazy val region =
+ clientCache.createClientRegionFactory[K, V](ClientRegionShortcut.CACHING_PROXY).create(regionSettings.name)
+
+ def put(v: V): Unit = region.put(regionSettings.keyExtractor(v), v)
+
+ def close(): Unit =
+ try {
+ if (clientCache.isClosed)
+ return
+ region.close()
+ log.debug("region closed")
+ } catch {
+      case NonFatal(ex) => log.error(ex, "Problem occurred while closing the region")
+ }
+}
diff --git a/geode/src/main/scala/akka/stream/alpakka/geode/internal/pdx/DelegatingPdxSerializer.scala b/geode/src/main/scala/akka/stream/alpakka/geode/internal/pdx/DelegatingPdxSerializer.scala
new file mode 100644
index 0000000000..bf4f6d27da
--- /dev/null
+++ b/geode/src/main/scala/akka/stream/alpakka/geode/internal/pdx/DelegatingPdxSerializer.scala
@@ -0,0 +1,61 @@
+/*
+ * Copyright (C) 2016-2017 Lightbend Inc.
+ */
+package akka.stream.alpakka.geode.internal.pdx
+
+import java.util.Properties
+
+import org.apache.geode.cache.Declarable
+import org.apache.geode.pdx.{PdxReader, PdxSerializer, PdxWriter}
+
+/**
+ * A Geode ClientCache does not support more than one PdxSerializer.
+ *
+ * This serializer delegates to lazily registered serializers.
+ */
+class DelegatingPdxSerializer(
+ isPdxCompat: (Class[_], Class[_]) => Boolean
+) extends PdxSerializer
+ with Declarable {
+
+ private var serializers = Map[Class[_], PdxSerializer]()
+
+ def register[V](serializer: PdxSerializer, clazz: Class[V]): Unit = synchronized {
+ if (!serializers.contains(clazz))
+ serializers += (clazz -> serializer)
+ }
+
+ /**
+   * Marshals an object with its registered serializer.
+ *
+ * @return true on success
+ */
+ override def toData(o: scala.Any, out: PdxWriter): Boolean =
+ serializers.get(o.getClass).map(_.toData(o, out)).isDefined
+
+ /**
+   * Unmarshals with a registered serializer.
+   *
+   * Tries to find a registered serializer for a given class:
+   *
+   *  - by a direct lookup on the class
+   *  - by iterating through all serializers to find a compatible one
+   *
+   * This way, a Java POJO can be unmarshalled from a Scala case class (and vice versa).
+   *
+   * @return the unmarshalled object, or null
+ */
+ override def fromData(clazz: Class[_], in: PdxReader): AnyRef =
+ serializers
+ .get(clazz)
+ .map(_.fromData(clazz, in))
+ .orElse(serializers.collectFirst {
+ case (c, ser) if isPdxCompat(c, clazz) =>
+ val v = ser.fromData(clazz, in)
+ if (v != null) register(ser, clazz)
+ v
+ })
+ .orNull
+
+ override def init(props: Properties): Unit = {}
+}
diff --git a/geode/src/main/scala/akka/stream/alpakka/geode/internal/pdx/PdxDecoder.scala b/geode/src/main/scala/akka/stream/alpakka/geode/internal/pdx/PdxDecoder.scala
new file mode 100644
index 0000000000..56d05b2d3e
--- /dev/null
+++ b/geode/src/main/scala/akka/stream/alpakka/geode/internal/pdx/PdxDecoder.scala
@@ -0,0 +1,165 @@
+/*
+ * Copyright (C) 2016-2017 Lightbend Inc.
+ */
+package akka.stream.alpakka.geode.internal.pdx
+
+import java.util.{Date, UUID}
+
+import org.apache.geode.pdx.PdxReader
+
+import scala.util.{Failure, Success, Try}
+
+trait PdxDecoder[A] {
+
+ def decode(reader: PdxReader, fieldName: Symbol = null): Try[A]
+
+}
+
+object PdxDecoder {
+
+ import shapeless._
+ import shapeless.labelled._
+
+ private def instance[A](f: (PdxReader, Symbol) => Try[A]): PdxDecoder[A] =
+ new PdxDecoder[A] {
+ def decode(reader: PdxReader, fieldName: Symbol) = f(reader, fieldName)
+ }
+
+ implicit val hnilDecoder: PdxDecoder[HNil] = instance((_, _) => Success(HNil))
+
+ implicit val booleanDecoder: PdxDecoder[Boolean] = instance {
+ case (reader, fieldName) =>
+ Success(reader.readBoolean(fieldName.name))
+ }
+
+ implicit val booleanListDecoder: PdxDecoder[List[Boolean]] = instance {
+ case (reader, fieldName) =>
+ Success(reader.readBooleanArray(fieldName.name).toList)
+ }
+
+ implicit val booleanArrayDecoder: PdxDecoder[Array[Boolean]] = instance {
+ case (reader, fieldName) =>
+ Success(reader.readBooleanArray(fieldName.name))
+ }
+
+ implicit val intDecoder: PdxDecoder[Int] = instance { (reader, fieldName) =>
+ Success(reader.readInt(fieldName.name))
+ }
+
+ implicit val intListDecoder: PdxDecoder[List[Int]] = instance { (reader, fieldName) =>
+ Success(reader.readIntArray(fieldName.name).toList)
+ }
+
+ implicit val intArrayDecoder: PdxDecoder[Array[Int]] = instance { (reader, fieldName) =>
+ Success(reader.readIntArray(fieldName.name))
+ }
+
+ implicit val doubleDecoder: PdxDecoder[Double] = instance { (reader, fieldName) =>
+ Success(reader.readDouble(fieldName.name))
+ }
+
+ implicit val doubleListDecoder: PdxDecoder[List[Double]] = instance { (reader, fieldName) =>
+ Success(reader.readDoubleArray(fieldName.name).toList)
+ }
+
+ implicit val doubleArrayDecoder: PdxDecoder[Array[Double]] = instance { (reader, fieldName) =>
+ Success(reader.readDoubleArray(fieldName.name))
+ }
+
+ implicit val floatDecoder: PdxDecoder[Float] = instance { (reader, fieldName) =>
+ Success(reader.readFloat(fieldName.name))
+ }
+
+ implicit val floatListDecoder: PdxDecoder[List[Float]] = instance { (reader, fieldName) =>
+ Success(reader.readFloatArray(fieldName.name).toList)
+ }
+ implicit val floatArrayDecoder: PdxDecoder[Array[Float]] = instance { (reader, fieldName) =>
+ Success(reader.readFloatArray(fieldName.name))
+ }
+
+ implicit val longDecoder: PdxDecoder[Long] = instance { (reader, fieldName) =>
+ Success(reader.readLong(fieldName.name))
+ }
+
+ implicit val longListDecoder: PdxDecoder[List[Long]] = instance { (reader, fieldName) =>
+ Success(reader.readLongArray(fieldName.name).toList)
+ }
+ implicit val longArrayDecoder: PdxDecoder[Array[Long]] = instance { (reader, fieldName) =>
+ Success(reader.readLongArray(fieldName.name))
+ }
+
+ implicit val charDecoder: PdxDecoder[Char] = instance {
+ case (reader, fieldName) =>
+ Success(reader.readChar(fieldName.name))
+ }
+
+ implicit val charListDecoder: PdxDecoder[List[Char]] = instance {
+ case (reader, fieldName) =>
+ Success(reader.readCharArray(fieldName.name).toList)
+ }
+ implicit val charArrayDecoder: PdxDecoder[Array[Char]] = instance {
+ case (reader, fieldName) =>
+ Success(reader.readCharArray(fieldName.name))
+ }
+
+ implicit val stringDecoder: PdxDecoder[String] = instance {
+ case (reader, fieldName) =>
+ Success(reader.readString(fieldName.name))
+ }
+
+ implicit val stringListDecoder: PdxDecoder[List[String]] = instance {
+ case (reader, fieldName) =>
+ Success(reader.readStringArray(fieldName.name).toList)
+ }
+
+ implicit val stringArrayDecoder: PdxDecoder[Array[String]] = instance {
+ case (reader, fieldName) =>
+ Success(reader.readStringArray(fieldName.name))
+ }
+
+  implicit val dateDecoder: PdxDecoder[Date] = instance {
+ case (reader, fieldName) =>
+ Success(reader.readDate(fieldName.name))
+ }
+
+ implicit val uuidDecoder: PdxDecoder[UUID] = instance {
+ case (reader, fieldName) =>
+ Try(UUID.fromString(reader.readString(fieldName.name)))
+ }
+
+ implicit def listDecoder[T <: AnyRef]: PdxDecoder[List[T]] = instance {
+ case (reader, fieldName) =>
+ Try(reader.readObjectArray(fieldName.name).toList.asInstanceOf[List[T]])
+ }
+
+ implicit def setDecoder[T <: AnyRef]: PdxDecoder[Set[T]] = instance {
+ case (reader, fieldName) =>
+ Try(reader.readObjectArray(fieldName.name).toSet.asInstanceOf[Set[T]])
+ }
+
+ implicit def hlistDecoder[K <: Symbol, H, T <: HList](
+ implicit witness: Witness.Aux[K],
+ hDecoder: Lazy[PdxDecoder[H]],
+ tDecoder: Lazy[PdxDecoder[T]]
+ ): PdxDecoder[FieldType[K, H] :: T] = instance {
+ case (reader, fieldName) => {
+ val headField = hDecoder.value.decode(reader, witness.value)
+ val tailFields = tDecoder.value.decode(reader, fieldName)
+ (headField, tailFields) match {
+ case (Success(h), Success(t)) => Success(field[K](h) :: t)
+ case _ => Failure(null)
+ }
+ }
+ case e => Failure(null)
+ }
+
+ implicit def objectDecoder[A, Repr <: HList](
+ implicit gen: LabelledGeneric.Aux[A, Repr],
+ hlistDecoder: PdxDecoder[Repr]
+ ): PdxDecoder[A] = instance { (reader, fieldName) =>
+ hlistDecoder.decode(reader, fieldName).map(gen.from)
+ }
+
+ def apply[A](implicit ev: PdxDecoder[A]): PdxDecoder[A] = ev
+
+}
diff --git a/geode/src/main/scala/akka/stream/alpakka/geode/internal/pdx/PdxEncoder.scala b/geode/src/main/scala/akka/stream/alpakka/geode/internal/pdx/PdxEncoder.scala
new file mode 100644
index 0000000000..16441d39ea
--- /dev/null
+++ b/geode/src/main/scala/akka/stream/alpakka/geode/internal/pdx/PdxEncoder.scala
@@ -0,0 +1,227 @@
+/*
+ * Copyright (C) 2016-2017 Lightbend Inc.
+ */
+package akka.stream.alpakka.geode.internal.pdx
+
+import java.util.{Date, UUID}
+
+import org.apache.geode.pdx.PdxWriter
+import shapeless.ops.hlist.IsHCons
+
+trait PdxEncoder[A] {
+ def encode(writer: PdxWriter, a: A, fieldName: Symbol = null): Boolean
+}
+
+object PdxEncoder {
+
+ import shapeless._
+ import shapeless.labelled._
+
+ private def instance[A](f: (PdxWriter, A, Symbol) => Boolean) =
+ new PdxEncoder[A] {
+ def encode(writer: PdxWriter, a: A, fieldName: Symbol = null): Boolean = f(writer, a, fieldName)
+ }
+
+ implicit val hnilEncoder: PdxEncoder[HNil] =
+ instance[HNil] { case _ => true }
+
+ implicit def hlistEncoder[K <: Symbol, H, T <: shapeless.HList](
+ implicit witness: Witness.Aux[K],
+ isHCons: IsHCons.Aux[H :: T, H, T],
+ hEncoder: Lazy[PdxEncoder[H]],
+ tEncoder: Lazy[PdxEncoder[T]]
+ ): PdxEncoder[FieldType[K, H] :: T] =
+ instance[FieldType[K, H] :: T] {
+ case (writer, o, fieldName) =>
+ hEncoder.value.encode(writer, isHCons.head(o), witness.value)
+ tEncoder.value.encode(writer, isHCons.tail(o), fieldName)
+
+ }
+
+ implicit def objectEncoder[A, Repr <: HList](
+ implicit gen: LabelledGeneric.Aux[A, Repr],
+ hlistEncoder: Lazy[PdxEncoder[Repr]]
+ ): PdxEncoder[A] = instance {
+ case (writer, o, fieldName) =>
+ hlistEncoder.value.encode(writer, gen.to(o), fieldName)
+ }
+
+ def apply[A](implicit enc: PdxEncoder[A]): PdxEncoder[A] = enc
+
+ implicit def booleanEncoder: PdxEncoder[Boolean] = instance {
+ case (writer: PdxWriter, b: Boolean, fieldName: Symbol) =>
+ writer.writeBoolean(fieldName.name, b)
+ true
+ case _ => false
+ }
+
+ implicit def booleanListEncoder: PdxEncoder[List[Boolean]] = instance {
+ case (writer: PdxWriter, bs: List[Boolean], fieldName: Symbol) =>
+ writer.writeBooleanArray(fieldName.name, bs.toArray)
+ true
+ case _ => false
+ }
+
+ implicit def booleanArrayEncoder: PdxEncoder[Array[Boolean]] = instance {
+ case (writer: PdxWriter, bs: Array[Boolean], fieldName: Symbol) =>
+ writer.writeBooleanArray(fieldName.name, bs)
+ true
+ case _ => false
+ }
+
+ implicit def intEncoder: PdxEncoder[Int] = instance {
+ case (writer: PdxWriter, i: Int, fieldName: Symbol) =>
+ writer.writeInt(fieldName.name, i)
+ true
+ case _ => false
+ }
+
+ implicit def intListEncoder: PdxEncoder[List[Int]] = instance {
+ case (writer: PdxWriter, is: List[Int], fieldName: Symbol) =>
+ writer.writeIntArray(fieldName.name, is.toArray)
+ true
+ case _ => false
+ }
+
+ implicit def intArrayEncoder: PdxEncoder[Array[Int]] = instance {
+ case (writer: PdxWriter, is: Array[Int], fieldName: Symbol) =>
+ writer.writeIntArray(fieldName.name, is)
+ true
+ case _ => false
+ }
+
+ implicit def doubleEncoder: PdxEncoder[Double] = instance {
+ case (writer: PdxWriter, d: Double, fieldName: Symbol) =>
+ writer.writeDouble(fieldName.name, d)
+ true
+ case _ => false
+ }
+
+ implicit def doubleListEncoder: PdxEncoder[List[Double]] = instance {
+ case (writer: PdxWriter, ds: List[Double], fieldName: Symbol) =>
+ writer.writeDoubleArray(fieldName.name, ds.toArray)
+ true
+ case _ => false
+ }
+ implicit def doubleArrayEncoder: PdxEncoder[Array[Double]] = instance {
+ case (writer: PdxWriter, ds: Array[Double], fieldName: Symbol) =>
+ writer.writeDoubleArray(fieldName.name, ds)
+ true
+ case _ => false
+ }
+
+ implicit def floatEncoder: PdxEncoder[Float] = instance {
+ case (writer: PdxWriter, f: Float, fieldName: Symbol) =>
+ writer.writeFloat(fieldName.name, f)
+ true
+ case _ => false
+ }
+
+ implicit def floatListEncoder: PdxEncoder[List[Float]] = instance {
+ case (writer: PdxWriter, fs: List[Float], fieldName: Symbol) =>
+ writer.writeFloatArray(fieldName.name, fs.toArray)
+ true
+ case _ => false
+ }
+ implicit def floatArrayEncoder: PdxEncoder[Array[Float]] = instance {
+ case (writer: PdxWriter, fs: Array[Float], fieldName: Symbol) =>
+ writer.writeFloatArray(fieldName.name, fs)
+ true
+ case _ => false
+ }
+
+ implicit def longEncoder: PdxEncoder[Long] = instance {
+ case (writer: PdxWriter, l: Long, fieldName: Symbol) =>
+ writer.writeLong(fieldName.name, l)
+ true
+ case _ => false
+ }
+
+ implicit def longListEncoder: PdxEncoder[List[Long]] = instance {
+ case (writer: PdxWriter, ls: List[Long], fieldName: Symbol) =>
+ writer.writeLongArray(fieldName.name, ls.toArray)
+ true
+ case _ => false
+ }
+ implicit def longArrayEncoder: PdxEncoder[Array[Long]] = instance {
+ case (writer: PdxWriter, ls: Array[Long], fieldName: Symbol) =>
+ writer.writeLongArray(fieldName.name, ls)
+ true
+ case _ => false
+ }
+
+ implicit def dateEncoder: PdxEncoder[Date] = instance {
+ case (writer: PdxWriter, d: Date, fieldName: Symbol) =>
+ writer.writeDate(fieldName.name, d)
+ true
+ case _ => false
+ }
+
+ implicit def charEncoder: PdxEncoder[Char] =
+ instance {
+ case (writer: PdxWriter, c: Char, fieldName: Symbol) =>
+ writer.writeChar(fieldName.name, c)
+ true
+ case _ => false
+ }
+ implicit def charListEncoder: PdxEncoder[List[Char]] =
+ instance {
+ case (writer: PdxWriter, cs: List[Char], fieldName: Symbol) =>
+ writer.writeCharArray(fieldName.name, cs.toArray)
+ true
+ case _ => false
+ }
+ implicit def charArrayEncoder: PdxEncoder[Array[Char]] =
+ instance {
+ case (writer: PdxWriter, cs: Array[Char], fieldName: Symbol) =>
+ writer.writeCharArray(fieldName.name, cs)
+ true
+ case _ => false
+ }
+
+ implicit def stringEncoder: PdxEncoder[String] =
+ instance {
+ case (writer: PdxWriter, str: String, fieldName: Symbol) =>
+ writer.writeString(fieldName.name, str)
+ true
+ case _ => false
+ }
+
+ implicit def stringListEncoder: PdxEncoder[List[String]] =
+ instance {
+ case (writer: PdxWriter, strs: List[String], fieldName: Symbol) =>
+ writer.writeStringArray(fieldName.name, strs.toArray)
+ true
+ case _ => false
+ }
+ implicit def stringArrayEncoder: PdxEncoder[Array[String]] =
+ instance {
+ case (writer: PdxWriter, strs: Array[String], fieldName: Symbol) =>
+ writer.writeStringArray(fieldName.name, strs)
+ true
+ case _ => false
+ }
+
+ implicit def uuidEncoder: PdxEncoder[UUID] =
+ instance {
+ case (writer: PdxWriter, uuid: UUID, fieldName: Symbol) =>
+ writer.writeString(fieldName.name, uuid.toString)
+ true
+ case _ => false
+ }
+
+ implicit def listEncoder[T <: AnyRef]: PdxEncoder[List[T]] = instance {
+ case (writer: PdxWriter, list: List[T], fieldName: Symbol) =>
+ writer.writeObjectArray(fieldName.name, list.toArray)
+ true
+ case _ => false
+ }
+
+ implicit def setEncoder[T <: AnyRef]: PdxEncoder[Set[T]] = instance {
+ case (writer: PdxWriter, set: Set[T], fieldName: Symbol) =>
+ writer.writeObjectArray(fieldName.name, set.toArray)
+ true
+ case _ => false
+ }
+
+}
diff --git a/geode/src/main/scala/akka/stream/alpakka/geode/internal/pdx/ShapelessPdxSerializer.scala b/geode/src/main/scala/akka/stream/alpakka/geode/internal/pdx/ShapelessPdxSerializer.scala
new file mode 100644
index 0000000000..c6844a1027
--- /dev/null
+++ b/geode/src/main/scala/akka/stream/alpakka/geode/internal/pdx/ShapelessPdxSerializer.scala
@@ -0,0 +1,33 @@
+/*
+ * Copyright (C) 2016-2017 Lightbend Inc.
+ */
+package akka.stream.alpakka.geode.internal.pdx
+
+import java.util.Properties
+
+import org.apache.geode.cache.Declarable
+import org.apache.geode.pdx.{PdxReader, PdxSerializer, PdxWriter}
+
+import scala.reflect.ClassTag
+import scala.util.Success
+
+//#shapeless-pdx-serializer
+private[geode] class ShapelessPdxSerializer[A <: AnyRef](enc: PdxEncoder[A],
+ dec: PdxDecoder[A])(implicit tag: ClassTag[A])
+ extends PdxSerializer
+ with Declarable {
+
+ override def toData(o: scala.Any, out: PdxWriter): Boolean =
+ tag.runtimeClass.isInstance(o) &&
+ enc.encode(out, o.asInstanceOf[A])
+
+ override def fromData(clazz: Class[_], in: PdxReader): A =
+ dec.decode(in, null) match {
+ case Success(e) => e
+ case _ => null.asInstanceOf[A]
+ }
+
+ override def init(props: Properties): Unit = {}
+}
+
+//#shapeless-pdx-serializer
diff --git a/geode/src/main/scala/akka/stream/alpakka/geode/internal/stage/GeodeCQueryGraphLogic.scala b/geode/src/main/scala/akka/stream/alpakka/geode/internal/stage/GeodeCQueryGraphLogic.scala
new file mode 100644
index 0000000000..e7d08399c3
--- /dev/null
+++ b/geode/src/main/scala/akka/stream/alpakka/geode/internal/stage/GeodeCQueryGraphLogic.scala
@@ -0,0 +1,119 @@
+/*
+ * Copyright (C) 2016-2017 Lightbend Inc.
+ */
+package akka.stream.alpakka.geode.internal.stage
+
+import java.util
+import java.util.concurrent.Semaphore
+
+import akka.stream.stage.{AsyncCallback, StageLogging}
+import akka.stream.{Outlet, SourceShape}
+import org.apache.geode.cache.client.ClientCache
+import org.apache.geode.cache.query.{CqAttributesFactory, CqEvent, CqQuery, Struct}
+import org.apache.geode.cache.util.CqListenerAdapter
+
+import scala.collection.mutable
+import scala.util.Try
+
+abstract class GeodeCQueryGraphLogic[V](val shape: SourceShape[V],
+ val clientCache: ClientCache,
+ val queryName: Symbol,
+ val sql: String)
+ extends GeodeSourceStageLogic[V](shape, clientCache)
+ with StageLogging {
+
+  /**
+   * Queue buffering elements received from Geode until downstream demand arrives.
+   */
+ private val incomingQueue = mutable.Queue[V]()
+
+ private val semaphore = new Semaphore(10)
+
+ val onElement: AsyncCallback[V]
+
+ private var query: CqQuery = _
+
+ override def executeQuery() = Try {
+
+ val cqf = new CqAttributesFactory()
+
+ val eventListener = new CqListenerAdapter() {
+ override def onEvent(ev: CqEvent): Unit =
+        onGeodeElement(ev.getNewValue().asInstanceOf[V])
+
+ override def onError(ev: CqEvent): Unit =
+ log.error(ev.getThrowable, s"$ev")
+
+ override def close(): Unit = {
+ log.debug("closes")
+ inFinish.invoke(())
+ }
+ }
+ cqf.addCqListener(eventListener)
+
+ val cqa = cqf.create()
+
+ query = qs.newCq(queryName.name, sql, cqa)
+
+    buildInitialResultsIterator(query)
+
+ }
+
+  private def buildInitialResultsIterator(q: CqQuery) = {
+ val res = q.executeWithInitialResults[Struct]
+ val it = res.iterator()
+ new util.Iterator[V] {
+ override def next(): V =
+ it.next().getFieldValues()(1).asInstanceOf[V]
+
+ override def hasNext: Boolean = it.hasNext
+ }
+ }
+
+ /**
+   * May block on semaphore.acquire().
+ */
+ protected def onGeodeElement(v: V): Unit = {
+ semaphore.acquire()
+ onElement.invoke(v)
+ }
+
+ protected def enqueue(v: V): Unit =
+ incomingQueue.enqueue(v)
+
+ protected def dequeue(): Option[V] =
+ if (incomingQueue.isEmpty)
+ None
+ else
+ Some(incomingQueue.dequeue())
+
+ /**
+   * Pushes an element downstream and releases the semaphore permit acquired in onGeodeElement.
+ */
+ protected def pushElement(out: Outlet[V], element: V) = {
+ push(out, element)
+ semaphore.release()
+ }
+
+ override def postStop(): Unit = {
+ if (clientCache.isClosed)
+ return
+ qs.closeCqs()
+ }
+
+ /**
+   * Set when the Geode upstream has terminated.
+ */
+ @volatile
+ private var upstreamTerminated = false
+
+ val inFinish: AsyncCallback[Unit] = getAsyncCallback[Unit] { v =>
+ upstreamTerminated = true
+    handleTermination()
+ }
+
+  def handleTermination() =
+ if (upstreamTerminated && incomingQueue.isEmpty)
+ completeStage()
+
+}
diff --git a/geode/src/main/scala/akka/stream/alpakka/geode/internal/stage/GeodeContinuousSourceStage.scala b/geode/src/main/scala/akka/stream/alpakka/geode/internal/stage/GeodeContinuousSourceStage.scala
new file mode 100644
index 0000000000..1da48bb4f6
--- /dev/null
+++ b/geode/src/main/scala/akka/stream/alpakka/geode/internal/stage/GeodeContinuousSourceStage.scala
@@ -0,0 +1,62 @@
+/*
+ * Copyright (C) 2016-2017 Lightbend Inc.
+ */
+package akka.stream.alpakka.geode.internal.stage
+
+import akka.Done
+import akka.stream.stage._
+import akka.stream.{ActorAttributes, Attributes, Outlet, SourceShape}
+import org.apache.geode.cache.client.ClientCache
+
+import scala.concurrent.{Future, Promise}
+
+class GeodeContinuousSourceStage[V](cache: ClientCache, name: Symbol, sql: String)
+ extends GraphStageWithMaterializedValue[SourceShape[V], Future[Done]] {
+
+ override protected def initialAttributes: Attributes =
+ Attributes
+ .name("GeodeContinuousSource")
+ .and(ActorAttributes.dispatcher("akka.stream.default-blocking-io-dispatcher"))
+
+  val out = Outlet[V]("geode.continuousSource")
+
+ override def shape: SourceShape[V] = SourceShape.of(out)
+
+ override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = {
+ val subPromise = Promise[Done]
+
+ (new GeodeCQueryGraphLogic[V](shape, cache, name, sql) {
+
+ override val onConnect: AsyncCallback[Unit] = getAsyncCallback[Unit] { v =>
+ subPromise.success(Done)
+ }
+
+ val onElement: AsyncCallback[V] = getAsyncCallback[V] { element =>
+ if (isAvailable(out)) {
+ pushElement(out, element)
+ } else
+ enqueue(element)
+        handleTermination()
+ }
+
+ //
+      // This handler first forwards the initial (existing) results, then the new ones (continuous).
+ //
+ setHandler(
+ out,
+ new OutHandler {
+ override def onPull() = {
+ if (initialResultsIterator.hasNext)
+ push(out, initialResultsIterator.next())
+ else
+ dequeue() foreach { e =>
+ pushElement(out, e)
+ }
+            handleTermination()
+ }
+ }
+ )
+
+ }, subPromise.future)
+ }
+}
diff --git a/geode/src/main/scala/akka/stream/alpakka/geode/internal/stage/GeodeFiniteSourceStage.scala b/geode/src/main/scala/akka/stream/alpakka/geode/internal/stage/GeodeFiniteSourceStage.scala
new file mode 100644
index 0000000000..e1792032a1
--- /dev/null
+++ b/geode/src/main/scala/akka/stream/alpakka/geode/internal/stage/GeodeFiniteSourceStage.scala
@@ -0,0 +1,45 @@
+/*
+ * Copyright (C) 2016-2017 Lightbend Inc.
+ */
+package akka.stream.alpakka.geode.internal.stage
+
+import akka.Done
+import akka.stream.stage._
+import akka.stream.{ActorAttributes, Attributes, Outlet, SourceShape}
+import org.apache.geode.cache.client.ClientCache
+
+import scala.concurrent.{Future, Promise}
+
+class GeodeFiniteSourceStage[V](cache: ClientCache, sql: String)
+ extends GraphStageWithMaterializedValue[SourceShape[V], Future[Done]] {
+
+ override protected def initialAttributes: Attributes =
+ Attributes.name("GeodeFiniteSource").and(ActorAttributes.dispatcher("akka.stream.default-blocking-io-dispatcher"))
+
+ val out = Outlet[V]("geode.finiteSource")
+
+ override def shape: SourceShape[V] = SourceShape.of(out)
+
+ override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = {
+ val subPromise = Promise[Done]
+
+ (new GeodeQueryGraphLogic[V](shape, cache, sql) {
+
+ override val onConnect: AsyncCallback[Unit] = getAsyncCallback[Unit] { v =>
+ subPromise.success(Done)
+ }
+
+ setHandler(
+ out,
+ new OutHandler {
+ override def onPull() =
+ if (initialResultsIterator.hasNext)
+ push(out, initialResultsIterator.next())
+ else
+ completeStage()
+ }
+ )
+
+ }, subPromise.future)
+ }
+}
diff --git a/geode/src/main/scala/akka/stream/alpakka/geode/internal/stage/GeodeFlowStage.scala b/geode/src/main/scala/akka/stream/alpakka/geode/internal/stage/GeodeFlowStage.scala
new file mode 100644
index 0000000000..373c0ee3c8
--- /dev/null
+++ b/geode/src/main/scala/akka/stream/alpakka/geode/internal/stage/GeodeFlowStage.scala
@@ -0,0 +1,54 @@
+/*
+ * Copyright (C) 2016-2017 Lightbend Inc.
+ */
+package akka.stream.alpakka.geode.internal.stage
+
+import akka.stream._
+import akka.stream.alpakka.geode.RegionSettings
+import akka.stream.alpakka.geode.internal.GeodeCapabilities
+import akka.stream.stage._
+import org.apache.geode.cache.client.ClientCache
+
+private[geode] class GeodeFlowStage[K, T <: AnyRef](cache: ClientCache, settings: RegionSettings[K, T])
+ extends GraphStage[FlowShape[T, T]] {
+
+ override protected def initialAttributes: Attributes =
+ Attributes.name("GeodeFLow").and(ActorAttributes.dispatcher("akka.stream.default-blocking-io-dispatcher"))
+
+ private val in = Inlet[T]("geode.in")
+ private val out = Outlet[T]("geode.out")
+
+ override val shape = FlowShape(in, out)
+
+ override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
+ new GraphStageLogic(shape) with StageLogging with GeodeCapabilities[K, T] {
+
+ override protected def logSource = classOf[GeodeFlowStage[K, T]]
+
+ val regionSettings = settings
+
+ val clientCache = cache
+
+ setHandler(out, new OutHandler {
+ override def onPull() =
+ pull(in)
+ })
+
+ setHandler(in, new InHandler {
+ override def onPush() = {
+ val msg = grab(in)
+
+ put(msg)
+
+ push(out, msg)
+ }
+
+ })
+
+ override def postStop() = {
+ log.debug("Stage completed")
+ close()
+ }
+ }
+
+}
diff --git a/geode/src/main/scala/akka/stream/alpakka/geode/internal/stage/GeodeQueryGraphLogic.scala b/geode/src/main/scala/akka/stream/alpakka/geode/internal/stage/GeodeQueryGraphLogic.scala
new file mode 100644
index 0000000000..1ced1896dc
--- /dev/null
+++ b/geode/src/main/scala/akka/stream/alpakka/geode/internal/stage/GeodeQueryGraphLogic.scala
@@ -0,0 +1,24 @@
+/*
+ * Copyright (C) 2016-2017 Lightbend Inc.
+ */
+package akka.stream.alpakka.geode.internal.stage
+
+import akka.stream.SourceShape
+import akka.stream.stage.StageLogging
+import org.apache.geode.cache.client.ClientCache
+import org.apache.geode.cache.query.SelectResults
+
+import scala.util.Try
+
+abstract class GeodeQueryGraphLogic[V](val shape: SourceShape[V], val clientCache: ClientCache, val query: String)
+ extends GeodeSourceStageLogic[V](shape, clientCache)
+ with StageLogging {
+
+ override def executeQuery() = Try {
+ qs.newQuery(query)
+ .execute()
+ .asInstanceOf[SelectResults[V]]
+ .iterator()
+ }
+
+}
diff --git a/geode/src/main/scala/akka/stream/alpakka/geode/internal/stage/GeodeSourceStageLogic.scala b/geode/src/main/scala/akka/stream/alpakka/geode/internal/stage/GeodeSourceStageLogic.scala
new file mode 100644
index 0000000000..9ab64e1ef9
--- /dev/null
+++ b/geode/src/main/scala/akka/stream/alpakka/geode/internal/stage/GeodeSourceStageLogic.scala
@@ -0,0 +1,31 @@
+/*
+ * Copyright (C) 2016-2017 Lightbend Inc.
+ */
+package akka.stream.alpakka.geode.internal.stage
+
+import akka.stream.SourceShape
+import akka.stream.stage.{AsyncCallback, GraphStageLogic}
+import org.apache.geode.cache.client.ClientCache
+
+import scala.util.{Failure, Success, Try}
+
+abstract class GeodeSourceStageLogic[V](shape: SourceShape[V], clientCache: ClientCache)
+ extends GraphStageLogic(shape) {
+
+ protected var initialResultsIterator: java.util.Iterator[V] = _
+
+ val onConnect: AsyncCallback[Unit]
+
+ lazy val qs = clientCache.getQueryService()
+
+ def executeQuery(): Try[java.util.Iterator[V]]
+
+ final override def preStart(): Unit = executeQuery() match {
+ case Success(it) =>
+ initialResultsIterator = it
+ onConnect.invoke(())
+ case Failure(e) =>
+ failStage(e)
+
+ }
+}
diff --git a/geode/src/main/scala/akka/stream/alpakka/geode/scaladsl/ReactiveGeode.scala b/geode/src/main/scala/akka/stream/alpakka/geode/scaladsl/ReactiveGeode.scala
new file mode 100644
index 0000000000..0b8da425bc
--- /dev/null
+++ b/geode/src/main/scala/akka/stream/alpakka/geode/scaladsl/ReactiveGeode.scala
@@ -0,0 +1,112 @@
+/*
+ * Copyright (C) 2016-2017 Lightbend Inc.
+ */
+package akka.stream.alpakka.geode.scaladsl
+
+import akka.stream.alpakka.geode.internal._
+import akka.stream.alpakka.geode.internal.pdx.{PdxDecoder, PdxEncoder, ShapelessPdxSerializer}
+import akka.stream.alpakka.geode.internal.stage.{GeodeContinuousSourceStage, GeodeFiniteSourceStage, GeodeFlowStage}
+import akka.stream.alpakka.geode.{AkkaPdxSerializer, GeodeSettings, RegionSettings}
+import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
+import akka.{Done, NotUsed}
+import org.apache.geode.cache.client.ClientCacheFactory
+
+import scala.concurrent.Future
+import scala.reflect.ClassTag
+
+class ReactiveGeode(settings: GeodeSettings) extends GeodeCache(settings) {
+
+ /**
+   * Overridden by [[PoolSubscription]] to enable server event subscription.
+ */
+ override protected def configure(factory: ClientCacheFactory): ClientCacheFactory =
+ factory.addPoolLocator(settings.hostname, settings.port)
+
+ def query[V <: AnyRef](query: String, serializer: AkkaPdxSerializer[V]): Source[V, Future[Done]] = {
+
+ registerPDXSerializer(serializer, serializer.clazz)
+
+ Source.fromGraph(new GeodeFiniteSourceStage[V](cache, query))
+ }
+
+ def flow[K, V <: AnyRef](settings: RegionSettings[K, V], serializer: AkkaPdxSerializer[V]): Flow[V, V, NotUsed] = {
+
+ registerPDXSerializer(serializer, serializer.clazz)
+
+ Flow.fromGraph(new GeodeFlowStage[K, V](cache, settings))
+ }
+
+ def sink[K, V <: AnyRef](settings: RegionSettings[K, V], serializer: AkkaPdxSerializer[V]): Sink[V, Future[Done]] =
+ Flow[V].via(flow(settings, serializer)).toMat(Sink.ignore)(Keep.right)
+
+ /**
+ * Shapeless powered implicit serializer.
+ */
+ def query[V <: AnyRef](
+ query: String
+ )(implicit tag: ClassTag[V], enc: PdxEncoder[V], dec: PdxDecoder[V]): Source[V, Future[Done]] = {
+
+ registerPDXSerializer(new ShapelessPdxSerializer[V](enc, dec), tag.runtimeClass)
+
+ Source.fromGraph(new GeodeFiniteSourceStage[V](cache, query))
+ }
+
+ /**
+ * Shapeless powered implicit serializer.
+ */
+ def flow[K, V <: AnyRef](
+ settings: RegionSettings[K, V]
+ )(implicit tag: ClassTag[V], enc: PdxEncoder[V], dec: PdxDecoder[V]): Flow[V, V, NotUsed] = {
+
+ registerPDXSerializer(new ShapelessPdxSerializer[V](enc, dec), tag.runtimeClass)
+
+ Flow.fromGraph(new GeodeFlowStage[K, V](cache, settings))
+ }
+
+ /**
+ * Shapeless powered implicit serializer.
+ */
+ def sink[K, V <: AnyRef](
+ settings: RegionSettings[K, V]
+ )(implicit tag: ClassTag[V], enc: PdxEncoder[V], dec: PdxDecoder[V]): Sink[V, Future[Done]] =
+ Flow[V].via(flow(settings)).toMat(Sink.ignore)(Keep.right)
+
+}
+
+trait PoolSubscription extends ReactiveGeode {
+
+ /**
+   * Pool subscription is mandatory for continuous queries.
+ */
+ final override protected def configure(factory: ClientCacheFactory) =
+ super.configure(factory).setPoolSubscriptionEnabled(true)
+
+ def continuousQuery[V <: AnyRef](queryName: Symbol,
+ query: String,
+ serializer: AkkaPdxSerializer[V]): Source[V, Future[Done]] = {
+
+ registerPDXSerializer(serializer, serializer.clazz)
+
+ Source.fromGraph(new GeodeContinuousSourceStage[V](cache, queryName, query))
+ }
+
+ /**
+ * Shapeless powered implicit serializer.
+ */
+ def continuousQuery[V <: AnyRef](
+ queryName: Symbol,
+ query: String
+ )(implicit tag: ClassTag[V], enc: PdxEncoder[V], dec: PdxDecoder[V]): Source[V, Future[Done]] = {
+
+ registerPDXSerializer(new ShapelessPdxSerializer[V](enc, dec), tag.runtimeClass)
+
+ Source.fromGraph(new GeodeContinuousSourceStage[V](cache, queryName, query))
+ }
+
+  def closeContinuousQuery(queryName: Symbol) =
+    for {
+      qs <- Option(cache.getQueryService())
+      query <- Option(qs.getCq(queryName.name))
+    } yield query.close()
+
+}
diff --git a/geode/src/test/java/akka/stream/alpakka/geode/javadsl/Animal.java b/geode/src/test/java/akka/stream/alpakka/geode/javadsl/Animal.java
new file mode 100644
index 0000000000..9667572473
--- /dev/null
+++ b/geode/src/test/java/akka/stream/alpakka/geode/javadsl/Animal.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright (C) 2016-2017 Lightbend Inc.
+ */
+package akka.stream.alpakka.geode.javadsl;
+
+public class Animal {
+ final private int id;
+ final private String name;
+ final private int owner;
+
+
+ public Animal(int id, String name, int owner) {
+ this.id = id;
+ this.name = name;
+ this.owner = owner;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public int getOwner() {
+ return owner;
+ }
+
+ @Override
+ public String toString() {
+ return getId() +": " + getName() + " owner: " + getOwner();
+ }
+}
\ No newline at end of file
diff --git a/geode/src/test/java/akka/stream/alpakka/geode/javadsl/AnimalPdxSerializer.java b/geode/src/test/java/akka/stream/alpakka/geode/javadsl/AnimalPdxSerializer.java
new file mode 100644
index 0000000000..d977752641
--- /dev/null
+++ b/geode/src/test/java/akka/stream/alpakka/geode/javadsl/AnimalPdxSerializer.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright (C) 2016-2017 Lightbend Inc.
+ */
+package akka.stream.alpakka.geode.javadsl;
+
+import akka.stream.alpakka.geode.AkkaPdxSerializer;
+import org.apache.geode.pdx.PdxReader;
+import org.apache.geode.pdx.PdxWriter;
+
+//#animal-pdx-serializer
+public class AnimalPdxSerializer implements AkkaPdxSerializer<Animal> {
+    @Override
+    public Class<Animal> clazz() {
+        return Animal.class;
+    }
+
+ @Override
+ public boolean toData(Object o, PdxWriter out) {
+ if(o instanceof Animal){
+ Animal p = (Animal) o;
+ out.writeInt("id", p.getId());
+ out.writeString("name", p.getName());
+ out.writeInt("owner", p.getOwner());
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+    public Object fromData(Class<?> clazz, PdxReader in) {
+ int id = in.readInt("id");
+ String name = in.readString("name");
+ int owner = in.readInt("owner");
+ return new Animal(id, name, owner);
+ }
+
+
+}
+//#animal-pdx-serializer
\ No newline at end of file
diff --git a/geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeBaseTestCase.java b/geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeBaseTestCase.java
new file mode 100644
index 0000000000..c685e070e8
--- /dev/null
+++ b/geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeBaseTestCase.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright (C) 2016-2017 Lightbend Inc.
+ */
+package akka.stream.alpakka.geode.javadsl;
+
+import akka.NotUsed;
+import akka.actor.ActorSystem;
+import akka.stream.ActorMaterializer;
+import akka.stream.Materializer;
+import akka.stream.alpakka.geode.GeodeSettings;
+import akka.stream.alpakka.geode.RegionSettings;
+import akka.stream.javadsl.Source;
+import akka.testkit.JavaTestKit;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Date;
+
+import static scala.compat.java8.JFunction.func;
+
+public class GeodeBaseTestCase {
+
+
+ protected static final Logger LOGGER = LoggerFactory.getLogger(GeodeFlowTestCase.class);
+
+ private static ActorSystem system;
+ protected static Materializer materializer;
+ private String geodeDockerHostname="localhost";
+
+ {
+ String geodeItHostname=System.getenv("IT_GEODE_HOSTNAME");
+ if(geodeItHostname!=null)
+ geodeDockerHostname=geodeItHostname;
+ }
+
+ //#region
+    protected RegionSettings<Integer, Person> personRegionSettings = new RegionSettings<>("persons", func(Person::getId));
+    protected RegionSettings<Integer, Animal> animalRegionSettings = new RegionSettings<>("animals", func(Animal::getId));
+ //#region
+
+ @BeforeClass
+ public static void setup() {
+ system = ActorSystem.create();
+ materializer = ActorMaterializer.create(system);
+
+ }
+
+    static Source<Person, NotUsed> buildPersonsSource(Integer... ids) {
+ return Source.from(Arrays.asList(ids))
+ .map((i) -> new Person(i, String.format("Person Java %d", i), new Date()));
+ }
+
+    static Source<Animal, NotUsed> buildAnimalsSource(Integer... ids) {
+ return Source.from(Arrays.asList(ids))
+ .map((i) -> new Animal(i, String.format("Animal Java %d", i), 1));
+ }
+
+ protected ReactiveGeode createReactiveGeode() {
+ //#connection
+ GeodeSettings settings = GeodeSettings.create(geodeDockerHostname, 10334)
+ .withConfiguration(func(c->c.setPoolIdleTimeout(10)));
+ return new ReactiveGeode(settings);
+ //#connection
+ }
+
+ protected ReactiveGeodeWithPoolSubscription createReactiveGeodeWithPoolSubscription() {
+ GeodeSettings settings = GeodeSettings.create(geodeDockerHostname, 10334);
+ //#connection-with-pool
+ return new ReactiveGeodeWithPoolSubscription(settings);
+ //#connection-with-pool
+ }
+
+ @AfterClass
+ public static void teardown() {
+ JavaTestKit.shutdownActorSystem(system);
+ }
+
+}
diff --git a/geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeContinuousSourceTestCase.java b/geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeContinuousSourceTestCase.java
new file mode 100644
index 0000000000..27f64352ba
--- /dev/null
+++ b/geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeContinuousSourceTestCase.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright (C) 2016-2017 Lightbend Inc.
+ */
+package akka.stream.alpakka.geode.javadsl;
+
+import akka.Done;
+import akka.NotUsed;
+import akka.japi.Pair;
+import akka.stream.javadsl.Flow;
+import akka.stream.javadsl.Keep;
+import akka.stream.javadsl.Sink;
+import akka.stream.javadsl.Source;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutionException;
+
+
+public class GeodeContinuousSourceTestCase extends GeodeBaseTestCase {
+
+ @Test
+ public void continuousSourceTest() throws ExecutionException, InterruptedException {
+
+ ReactiveGeodeWithPoolSubscription reactiveGeode = createReactiveGeodeWithPoolSubscription();
+
+ //#continuousQuery
+        CompletionStage<Done> fut = reactiveGeode.continuousQuery("test", "select * from /persons", new PersonPdxSerializer())
+ .runForeach(p -> {
+ LOGGER.debug(p.toString());
+ if (p.getId() == 120) {
+ reactiveGeode.closeContinuousQuery("test");
+ }
+ }, materializer);
+ //#continuousQuery
+
+        Flow<Person, Person, NotUsed> flow = reactiveGeode.flow(personRegionSettings, new PersonPdxSerializer());
+
+        Pair<NotUsed, CompletionStage<List<Person>>> run = Source.from(Arrays.asList(120)).map((i)
+                -> new Person(i, String.format("Java flow %d", i), new Date())).via(flow).toMat(Sink.seq(), Keep.both()).run(materializer);
+
+ run.second().toCompletableFuture().get();
+
+
+ fut.toCompletableFuture().get();
+
+ reactiveGeode.close();
+
+ }
+
+
+}
\ No newline at end of file
diff --git a/geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeFiniteSourceTestCase.java b/geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeFiniteSourceTestCase.java
new file mode 100644
index 0000000000..946e20a054
--- /dev/null
+++ b/geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeFiniteSourceTestCase.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright (C) 2016-2017 Lightbend Inc.
+ */
+package akka.stream.alpakka.geode.javadsl;
+
+import akka.Done;
+import org.junit.Test;
+
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutionException;
+
+
+public class GeodeFiniteSourceTestCase extends GeodeBaseTestCase {
+
+ @Test
+ public void finiteSourceTest() throws ExecutionException, InterruptedException {
+
+ ReactiveGeode reactiveGeode = createReactiveGeode();
+
+ //#query
+        CompletionStage<Done> personsDone = reactiveGeode.query("select * from /persons", new PersonPdxSerializer())
+ .runForeach(p -> {
+ LOGGER.debug(p.toString());
+ }, materializer);
+ //#query
+
+ personsDone.toCompletableFuture().get();
+
+        CompletionStage<Done> animalsDone = reactiveGeode.query("select * from /animals", new AnimalPdxSerializer())
+ .runForeach(p -> {
+ LOGGER.debug(p.toString());
+ }, materializer);
+
+
+ animalsDone.toCompletableFuture().get();
+ reactiveGeode.close();
+
+ }
+
+}
\ No newline at end of file
diff --git a/geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeFlowTestCase.java b/geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeFlowTestCase.java
new file mode 100644
index 0000000000..1fff0a9bab
--- /dev/null
+++ b/geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeFlowTestCase.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright (C) 2016-2017 Lightbend Inc.
+ */
+package akka.stream.alpakka.geode.javadsl;
+
+import akka.NotUsed;
+import akka.stream.javadsl.Flow;
+import akka.stream.javadsl.Keep;
+import akka.stream.javadsl.Sink;
+import akka.stream.javadsl.Source;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutionException;
+
+
+
+public class GeodeFlowTestCase extends GeodeBaseTestCase {
+
+
+ @Test
+ public void flow() throws ExecutionException, InterruptedException {
+
+ ReactiveGeode reactiveGeode = createReactiveGeode();
+
+        Source<Person, NotUsed> source = buildPersonsSource(110, 111, 113, 114, 115);
+
+ //#flow
+        Flow<Person, Person, NotUsed> flow = reactiveGeode.flow(personRegionSettings, new PersonPdxSerializer());
+
+        CompletionStage<List<Person>> run = source
+ .via(flow)
+ .toMat(Sink.seq(), Keep.right())
+ .run(materializer);
+ //#flow
+
+ run.toCompletableFuture().get();
+
+ reactiveGeode.close();
+
+ }
+
+
+}
\ No newline at end of file
diff --git a/geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeSinkTestCase.java b/geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeSinkTestCase.java
new file mode 100644
index 0000000000..9dcbdb9cbb
--- /dev/null
+++ b/geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeSinkTestCase.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright (C) 2016-2017 Lightbend Inc.
+ */
+package akka.stream.alpakka.geode.javadsl;
+
+import akka.Done;
+import akka.NotUsed;
+import akka.stream.javadsl.Keep;
+import akka.stream.javadsl.RunnableGraph;
+import akka.stream.javadsl.Sink;
+import akka.stream.javadsl.Source;
+import org.junit.Test;
+
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutionException;
+
+
+public class GeodeSinkTestCase extends GeodeBaseTestCase {
+
+
+ @Test
+ public void sinkTest() throws ExecutionException, InterruptedException {
+
+ ReactiveGeode reactiveGeode = createReactiveGeode();
+
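+ // The sink puts every incoming Person into the region; the materialized CompletionStage completes with the stream.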
+ Sink<Person, CompletionStage<Done>> sink = reactiveGeode.sink(personRegionSettings, new PersonPdxSerializer());
+
+ Source<Person, NotUsed> source = buildPersonsSource(100, 101, 103, 104, 105);
+
+ RunnableGraph<CompletionStage<Done>> runnableGraph = source
+ .toMat(sink, Keep.right());
+
+ CompletionStage<Done> stage = runnableGraph.run(materializer);
+
+ stage.toCompletableFuture().get();
+
+ reactiveGeode.close();
+
+ }
+
+ @Test
+ public void sinkAnimalTest() throws ExecutionException, InterruptedException {
+
+ ReactiveGeode reactiveGeode = createReactiveGeode();
+
+ Source<Animal, NotUsed> source = buildAnimalsSource(100, 101, 103, 104, 105);
+
+ //#sink
+ Sink<Animal, CompletionStage<Done>> sink = reactiveGeode.sink(animalRegionSettings, new AnimalPdxSerializer());
+
+ RunnableGraph<CompletionStage<Done>> runnableGraph = source
+ .toMat(sink, Keep.right());
+ //#sink
+
+ CompletionStage<Done> stage = runnableGraph.run(materializer);
+
+ stage.toCompletableFuture().get();
+
+ reactiveGeode.close();
+
+ }
+
+
+}
\ No newline at end of file
diff --git a/geode/src/test/java/akka/stream/alpakka/geode/javadsl/Person.java b/geode/src/test/java/akka/stream/alpakka/geode/javadsl/Person.java
new file mode 100644
index 0000000000..a4853f0bb2
--- /dev/null
+++ b/geode/src/test/java/akka/stream/alpakka/geode/javadsl/Person.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright (C) 2016-2017 Lightbend Inc.
+ */
+package akka.stream.alpakka.geode.javadsl;
+
+import java.util.Date;
+
+public class Person {
+ final private int id;
+ final private String name;
+ final private Date birthDate;
+
+
+ public Person(int id, String name, Date birthDate) {
+ this.id = id;
+ this.name = name;
+ this.birthDate = birthDate;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public Date getBirthDate() {
+ return birthDate;
+ }
+
+ @Override
+ public String toString() {
+ return getId() +": " + getName();
+ }
+}
diff --git a/geode/src/test/java/akka/stream/alpakka/geode/javadsl/PersonPdxSerializer.java b/geode/src/test/java/akka/stream/alpakka/geode/javadsl/PersonPdxSerializer.java
new file mode 100644
index 0000000000..82c1456f10
--- /dev/null
+++ b/geode/src/test/java/akka/stream/alpakka/geode/javadsl/PersonPdxSerializer.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright (C) 2016-2017 Lightbend Inc.
+ */
+package akka.stream.alpakka.geode.javadsl;
+
+import akka.stream.alpakka.geode.AkkaPdxSerializer;
+import org.apache.geode.pdx.PdxReader;
+import org.apache.geode.pdx.PdxWriter;
+
+import java.util.Date;
+
+//#person-pdx-serializer
+public class PersonPdxSerializer implements AkkaPdxSerializer<Person> {
+
+ @Override
+ public Class<Person> clazz() {
+ return Person.class;
+ }
+
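+ // toData returns false for non-Person objects so Geode can fall back to another registered serializer.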
+ @Override
+ public boolean toData(Object o, PdxWriter out) {
+ if (o instanceof Person) {
+ Person p = (Person)o;
+ out.writeInt("id", p.getId());
+ out.writeString("name", p.getName());
+ out.writeDate("birthDate", p.getBirthDate());
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public Object fromData(Class<?> clazz, PdxReader in) {
+ int id = in.readInt("id");
+ String name = in.readString("name");
+ Date birthDate = in.readDate("birthDate");
+ return new Person(id, name, birthDate);
+ }
+}
+//#person-pdx-serializer
diff --git a/geode/src/test/resources/log4j2.xml b/geode/src/test/resources/log4j2.xml
new file mode 100644
index 0000000000..d75580b962
--- /dev/null
+++ b/geode/src/test/resources/log4j2.xml
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<Configuration status="WARN">
+  <Appenders>
+    <Console name="STDOUT" target="SYSTEM_OUT">
+      <PatternLayout alwaysWriteExceptions="true"
+                     pattern="[%level{lowerCase=true} %date{yyyy/MM/dd HH:mm:ss.SSS z} &lt;%thread&gt; tid=%tid] %message%n%throwable%n"/>
+    </Console>
+  </Appenders>
+  <Loggers>
+    <Root level="info">
+      <AppenderRef ref="STDOUT"/>
+    </Root>
+  </Loggers>
+</Configuration>
diff --git a/geode/src/test/resources/logback.xml b/geode/src/test/resources/logback.xml
new file mode 100644
index 0000000000..c15c9b85a5
--- /dev/null
+++ b/geode/src/test/resources/logback.xml
@@ -0,0 +1,16 @@
+<configuration>
+
+  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+    <target>System.out</target>
+    <encoder>
+      <pattern>%-6level[%logger{0}]: %msg%n</pattern>
+    </encoder>
+  </appender>
+
+  <root level="debug">
+    <appender-ref ref="STDOUT"/>
+  </root>
+
+</configuration>
diff --git a/geode/src/test/scala/akka/stream/alpakka/geode/internal/pdx/PDXDecoderSpec.scala b/geode/src/test/scala/akka/stream/alpakka/geode/internal/pdx/PDXDecoderSpec.scala
new file mode 100644
index 0000000000..19d1205bb6
--- /dev/null
+++ b/geode/src/test/scala/akka/stream/alpakka/geode/internal/pdx/PDXDecoderSpec.scala
@@ -0,0 +1,47 @@
+/*
+ * Copyright (C) 2016-2017 Lightbend Inc.
+ */
+package akka.stream.alpakka.geode.internal.pdx
+
+import java.util.{Date, UUID}
+
+import org.scalatest.{Matchers, WordSpec}
+
+class PDXDecoderSpec extends WordSpec with Matchers {
+
+ "PDX decoder" should {
+ "decode primitive type" in {
+ PdxDecoder[Boolean]
+ PdxDecoder[Int]
+ PdxDecoder[List[Int]]
+ PdxDecoder[Array[Int]]
+ PdxDecoder[Long]
+ PdxDecoder[List[Long]]
+ PdxDecoder[Array[Long]]
+ PdxDecoder[Float]
+ PdxDecoder[List[Float]]
+ PdxDecoder[Array[Float]]
+ PdxDecoder[Double]
+ PdxDecoder[List[Double]]
+ PdxDecoder[Array[Double]]
+
+ PdxDecoder[Char]
+ PdxDecoder[List[Char]]
+ PdxDecoder[Array[Char]]
+ PdxDecoder[String]
+ PdxDecoder[List[String]]
+ PdxDecoder[Array[String]]
+
+ }
+
+ "decode basic types" in {
+ PdxDecoder[Date]
+ PdxDecoder[List[Date]]
+
+ PdxDecoder[UUID]
+ PdxDecoder[List[UUID]]
+
+ }
+ }
+
+}
diff --git a/geode/src/test/scala/akka/stream/alpakka/geode/internal/pdx/PDXEncodeSpec.scala b/geode/src/test/scala/akka/stream/alpakka/geode/internal/pdx/PDXEncodeSpec.scala
new file mode 100644
index 0000000000..02d73dd637
--- /dev/null
+++ b/geode/src/test/scala/akka/stream/alpakka/geode/internal/pdx/PDXEncodeSpec.scala
@@ -0,0 +1,36 @@
+/*
+ * Copyright (C) 2016-2017 Lightbend Inc.
+ */
+package akka.stream.alpakka.geode.internal.pdx
+
+import java.util.{Date, UUID}
+
+import org.scalatest.{Matchers, WordSpec}
+
+class PDXEncodeSpec extends WordSpec with Matchers {
+
+ "PDXEncoder" should {
+
+ "provides encoder for primitive types" in {
+ PdxEncoder[Boolean]
+ PdxEncoder[Int]
+ PdxEncoder[List[Int]]
+ PdxEncoder[Array[Int]]
+ PdxEncoder[Double]
+ PdxEncoder[List[Double]]
+ PdxEncoder[Array[Double]]
+ PdxEncoder[Float]
+ PdxEncoder[List[Float]]
+ PdxEncoder[Array[Float]]
+ PdxEncoder[Long]
+ PdxEncoder[Char]
+ PdxEncoder[String]
+
+ }
+
+ "provides encoder for basic types" in {
+ PdxEncoder[Date]
+ PdxEncoder[UUID]
+ }
+ }
+}
diff --git a/geode/src/test/scala/akka/stream/alpakka/geode/internal/pdx/PdxWriterMock.scala b/geode/src/test/scala/akka/stream/alpakka/geode/internal/pdx/PdxWriterMock.scala
new file mode 100644
index 0000000000..b1e88ccaae
--- /dev/null
+++ b/geode/src/test/scala/akka/stream/alpakka/geode/internal/pdx/PdxWriterMock.scala
@@ -0,0 +1,82 @@
+/*
+ * Copyright (C) 2016-2017 Lightbend Inc.
+ */
+package akka.stream.alpakka.geode.internal.pdx
+
+import java.util.Date
+
+import org.apache.geode.pdx.{PdxUnreadFields, PdxWriter}
+
+object PdxMocks {
+
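+ // A stub PdxWriter that only logs writes, so encoder instances can be exercised without a Geode cache.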
+ class WriterMock extends PdxWriter {
+ class WriterMock extends PdxWriter {
+ override def writeShortArray(fieldName: String, value: Array[Short]) = { println(s"Write $value"); this }
+
+ override def markIdentityField(fieldName: String) = { println(s"Mark identity field $fieldName"); this }
+
+ override def writeString(fieldName: String, value: String) = { println(s"Write $value"); this }
+
+ override def writeDate(fieldName: String, value: Date) = { println(s"Write $value"); this }
+
+ override def writeFloat(fieldName: String, value: Float) = { println(s"Write $value"); this }
+
+ override def writeCharArray(fieldName: String, value: Array[Char]) = { println(s"Write $value"); this }
+
+ override def writeDouble(fieldName: String, value: Double) = { println(s"Write $value"); this }
+
+ override def writeObjectArray(fieldName: String, value: Array[AnyRef]) = { println(s"Write $value"); this }
+
+ override def writeObjectArray(fieldName: String, value: Array[AnyRef], checkPortability: Boolean) = {
+ println(s"Write $value"); this
+ }
+
+ override def writeShort(fieldName: String, value: Short) = { println(s"Write $value"); this }
+
+ override def writeArrayOfByteArrays(fieldName: String, value: Array[Array[Byte]]) = {
+ println(s"Write $value"); this
+ }
+
+ override def writeField[CT, VT <: CT](fieldName: String, fieldValue: VT, fieldType: Class[CT]) = {
+ println(s"Write $fieldName"); this
+ }
+
+ override def writeField[CT, VT <: CT](fieldName: String,
+ fieldValue: VT,
+ fieldType: Class[CT],
+ checkPortability: Boolean) = { println(s"Write $fieldName"); this }
+
+ override def writeInt(fieldName: String, value: Int) = { println(s"Write $value"); this }
+
+ override def writeDoubleArray(fieldName: String, value: Array[Double]) = { println(s"Write $value"); this }
+
+ override def writeLongArray(fieldName: String, value: Array[Long]) = { println(s"Write $value"); this }
+
+ override def writeByteArray(fieldName: String, value: Array[Byte]) = { println(s"Write $value"); this }
+
+ override def writeBooleanArray(fieldName: String, value: Array[Boolean]) = { println(s"Write $value"); this }
+
+ override def writeIntArray(fieldName: String, value: Array[Int]) = { println(s"Write $value"); this }
+
+ override def writeBoolean(fieldName: String, value: Boolean) = { println(s"Write $value"); this }
+
+ override def writeStringArray(fieldName: String, value: Array[String]) = { println(s"Write $value"); this }
+
+ override def writeObject(fieldName: String, value: scala.Any) = { println(s"Write $value"); this }
+
+ override def writeObject(fieldName: String, value: scala.Any, checkPortability: Boolean) = {
+ println(s"Write $value"); this
+ }
+
+ override def writeFloatArray(fieldName: String, value: Array[Float]) = { println(s"Write $value"); this }
+
+ override def writeChar(fieldName: String, value: Char) = { println(s"Write $value"); this }
+
+ override def writeLong(fieldName: String, value: Long) = { println(s"Write $value"); this }
+
+ override def writeUnreadFields(unread: PdxUnreadFields) = { println(s"Write $unread"); this }
+
+ override def writeByte(fieldName: String, value: Byte) = { println(s"Write $value"); this }
+ }
+
+ implicit val writerMock = new WriterMock()
+}
diff --git a/geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeBaseSpec.scala b/geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeBaseSpec.scala
new file mode 100644
index 0000000000..3590bd0dc3
--- /dev/null
+++ b/geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeBaseSpec.scala
@@ -0,0 +1,48 @@
+/*
+ * Copyright (C) 2016-2017 Lightbend Inc.
+ */
+package akka.stream.alpakka.geode.scaladsl
+
+import java.util.{Date, UUID}
+
+import akka.actor.ActorSystem
+import akka.stream.ActorMaterializer
+import akka.stream.alpakka.geode.{GeodeSettings, RegionSettings}
+import akka.stream.scaladsl.Source
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec}
+
+import scala.concurrent.Await
+import scala.concurrent.duration.DurationInt
+import scala.language.postfixOps
+
+class GeodeBaseSpec extends WordSpec with Matchers with BeforeAndAfterAll {
+
+ implicit val system = ActorSystem("test")
+ implicit val materializer = ActorMaterializer()
+
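+ // A RegionSettings pairs a region name with the key extractor applied to each element on put.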
+ //#region
+ val personsRegionSettings = RegionSettings("persons", (p: Person) => p.id)
+ val animalsRegionSettings = RegionSettings("animals", (a: Animal) => a.id)
+ val complexesRegionSettings = RegionSettings("complexes", (a: Complex) => a.id)
+
+ //#region
+
+ /**
+ * Runs an integration test against the Geode instance reachable at the host
+ * given by the IT_GEODE_HOSTNAME environment variable (localhost by default).
+ * @param f the test body, invoked with the resolved GeodeSettings
+ */
+ def it(f: GeodeSettings => Unit): Unit =
+ f(GeodeSettings(sys.env.getOrElse("IT_GEODE_HOSTNAME", "localhost")))
+
+ protected def buildPersonsSource(range: Range): Source[Person, Any] =
+ Source(range).map(i => Person(i, s"Person Scala $i", new Date()))
+
+ protected def buildAnimalsSource(range: Range): Source[Animal, Any] =
+ Source(range).map(i => Animal(i, s"Animal Scala $i", 1))
+
+ protected def buildComplexesSource(range: Range): Source[Complex, Any] =
+ Source(range).map(i => Complex(UUID.randomUUID(), List(1, 2, 3), List(new Date()), Set(UUID.randomUUID())))
+
+ override protected def afterAll(): Unit =
+ Await.result(system.terminate(), 10 seconds)
+}
diff --git a/geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeContinuousSourceSpec.scala b/geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeContinuousSourceSpec.scala
new file mode 100644
index 0000000000..805133ea87
--- /dev/null
+++ b/geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeContinuousSourceSpec.scala
@@ -0,0 +1,56 @@
+/*
+ * Copyright (C) 2016-2017 Lightbend Inc.
+ */
+package akka.stream.alpakka.geode.scaladsl
+
+import akka.NotUsed
+import akka.stream.scaladsl.{Flow, Sink}
+import org.slf4j.LoggerFactory
+
+import scala.concurrent.Await
+import scala.concurrent.duration.DurationInt
+import scala.language.postfixOps
+
+class GeodeContinuousSourceSpec extends GeodeBaseSpec {
+
+ private val log = LoggerFactory.getLogger(classOf[GeodeContinuousSourceSpec])
+
+ "Geode continuousQuery" should {
+ it { geodeSettings =>
+ "retrieves continuously elements from geode" in {
+
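+ // PoolSubscription enables the server-event subscription that continuous queries rely on.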
+ //#connection-with-pool
+ val reactiveGeode = new ReactiveGeode(geodeSettings) with PoolSubscription
+ //#connection-with-pool
+
+ val flow: Flow[Person, Person, NotUsed] = reactiveGeode.flow(personsRegionSettings)
+
+ //#continuousQuery
+ val source =
+ reactiveGeode
+ .continuousQuery[Person]('test, "select * from /persons")
+ .runWith(Sink.fold(0) { (c, p) =>
+ log.debug(s"$p $c")
+ if (c == 19) {
+ reactiveGeode.closeContinuousQuery('test).foreach { _ =>
+ log.debug("test cQuery is closed")
+ }
+
+ }
+ c + 1
+ })
+ //#continuousQuery
+
+ val f = buildPersonsSource(1 to 20)
+ .via(flow) //geode flow
+ .runWith(Sink.ignore)
+
+ Await.result(f, 10 seconds)
+
+ Await.result(source, 5 seconds)
+
+ reactiveGeode.close()
+ }
+ }
+ }
+}
diff --git a/geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeFiniteSourceSpec.scala b/geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeFiniteSourceSpec.scala
new file mode 100644
index 0000000000..b192dfb2db
--- /dev/null
+++ b/geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeFiniteSourceSpec.scala
@@ -0,0 +1,49 @@
+/*
+ * Copyright (C) 2016-2017 Lightbend Inc.
+ */
+package akka.stream.alpakka.geode.scaladsl
+
+import akka.stream.scaladsl.Sink
+import org.slf4j.LoggerFactory
+
+import scala.concurrent.Await
+import scala.concurrent.duration.DurationInt
+import scala.language.postfixOps
+
+class GeodeFiniteSourceSpec extends GeodeBaseSpec {
+
+ private val log = LoggerFactory.getLogger(classOf[GeodeFiniteSourceSpec])
+
+ "Geode finite source" should {
+ it { geodeSettings =>
+ "retrieves finite elements from geode" in {
+
+ val reactiveGeode = new ReactiveGeode(geodeSettings)
+
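+ // OQL queries address regions by path (e.g. /persons); the source completes after the last result.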
+ //#query
+ val source =
+ reactiveGeode
+ .query[Person](s"select * from /persons order by id")
+ .runWith(Sink.foreach(e => log.debug(s"$e")))
+ //#query
+ Await.ready(source, 10 seconds)
+
+ val animals =
+ reactiveGeode
+ .query[Animal](s"select * from /animals order by id")
+ .runWith(Sink.foreach(e => log.debug(s"$e")))
+
+ Await.ready(animals, 10 seconds)
+
+ val complexes =
+ reactiveGeode
+ .query[Complex](s"select * from /complexes order by id")
+ .runWith(Sink.foreach(e => log.debug(s"$e")))
+
+ Await.ready(complexes, 10 seconds)
+
+ reactiveGeode.close()
+ }
+ }
+ }
+}
diff --git a/geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeFlowSpec.scala b/geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeFlowSpec.scala
new file mode 100644
index 0000000000..2276431cd4
--- /dev/null
+++ b/geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeFlowSpec.scala
@@ -0,0 +1,47 @@
+/*
+ * Copyright (C) 2016-2017 Lightbend Inc.
+ */
+package akka.stream.alpakka.geode.scaladsl
+
+import akka.NotUsed
+import akka.stream.scaladsl.{Flow, Sink}
+
+import scala.concurrent.Await
+import scala.concurrent.duration.DurationInt
+import scala.language.postfixOps
+
+class GeodeFlowSpec extends GeodeBaseSpec {
+
+ "Reactive geode" should {
+ it { geodeSettings =>
+ "flow with shapeless pdx serializer" in {
+ //#connection
+ val reactiveGeode = new ReactiveGeode(geodeSettings)
+ //#connection
+
+ val source = buildPersonsSource(1 to 10)
+
+ //#flow
+ val flow: Flow[Person, Person, NotUsed] = reactiveGeode.flow(personsRegionSettings)
+
+ val fut = source.via(flow).runWith(Sink.ignore)
+ //#flow
+ Await.ready(fut, 10 seconds)
+
+ reactiveGeode.close()
+ }
+ "flow with explicit pdx serializer" in {
+ val reactiveGeode = new ReactiveGeode(geodeSettings)
+
+ val source = buildPersonsSource(1 to 20)
+
+ val flow: Flow[Person, Person, NotUsed] = reactiveGeode.flow(personsRegionSettings, PersonPdxSerializer)
+
+ val fut = source.via(flow).runWith(Sink.ignore)
+ Await.ready(fut, 10 seconds)
+
+ reactiveGeode.close()
+ }
+ }
+ }
+}
diff --git a/geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeSinkSpec.scala b/geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeSinkSpec.scala
new file mode 100644
index 0000000000..ba2576d444
--- /dev/null
+++ b/geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeSinkSpec.scala
@@ -0,0 +1,64 @@
+/*
+ * Copyright (C) 2016-2017 Lightbend Inc.
+ */
+package akka.stream.alpakka.geode.scaladsl
+
+import scala.concurrent.Await
+import scala.concurrent.duration.DurationInt
+import scala.language.postfixOps
+
+class GeodeSinkSpec extends GeodeBaseSpec {
+
+ "Reactive geode sink" should {
+ it { geodeSettings =>
+ "stores persons in geode" in {
+
+ val reactiveGeode = new ReactiveGeode(geodeSettings) with PoolSubscription
+
+ val source = buildPersonsSource(30 to 40)
+
+ val sink = reactiveGeode.sink(personsRegionSettings)
+ val fut = source.runWith(sink)
+
+ Await.ready(fut, 5 seconds)
+
+ reactiveGeode.close()
+ }
+
+ "stores animals in geode" in {
+
+ val reactiveGeode = new ReactiveGeode(geodeSettings) with PoolSubscription
+
+ val source = buildAnimalsSource(1 to 40)
+
+ //#sink
+ val sink = reactiveGeode.sink(animalsRegionSettings)
+
+ val fut = source.runWith(sink)
+ //#sink
+ Await.ready(fut, 10 seconds)
+
+ reactiveGeode.close()
+ }
+
+ "stores complex in geode" in {
+
+ val reactiveGeode = new ReactiveGeode(geodeSettings) with PoolSubscription
+
+ val source = buildComplexesSource(1 to 40)
+
+ //#sink
+ val sink = reactiveGeode.sink(complexesRegionSettings)
+
+ val fut = source.runWith(sink)
+ //#sink
+ Await.ready(fut, 10 seconds)
+
+ reactiveGeode.close()
+ }
+
+ }
+
+ }
+
+}
diff --git a/geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/Model.scala b/geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/Model.scala
new file mode 100644
index 0000000000..2783095e55
--- /dev/null
+++ b/geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/Model.scala
@@ -0,0 +1,11 @@
+/*
+ * Copyright (C) 2016-2017 Lightbend Inc.
+ */
+package akka.stream.alpakka.geode.scaladsl
+
+import java.util.{Date, UUID}
+
+case class Person(id: Int, name: String, birthDate: Date)
+case class Animal(id: Int, name: String, owner: Int)
+
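+// Complex exercises the generic PDX codecs on UUID and collection fields.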
+case class Complex(id: UUID, ints: List[Int], dates: List[Date], ids: Set[UUID] = Set())
diff --git a/geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/PersonPdxSerializer.scala b/geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/PersonPdxSerializer.scala
new file mode 100644
index 0000000000..9b2d3945f1
--- /dev/null
+++ b/geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/PersonPdxSerializer.scala
@@ -0,0 +1,32 @@
+/*
+ * Copyright (C) 2016-2017 Lightbend Inc.
+ */
+package akka.stream.alpakka.geode.scaladsl
+
+import java.util.Date
+
+import akka.stream.alpakka.geode.AkkaPdxSerializer
+import org.apache.geode.pdx.{PdxReader, PdxWriter}
+
+//#person-pdx-serializer
+object PersonPdxSerializer extends AkkaPdxSerializer[Person] {
+ override def clazz: Class[Person] = classOf[Person]
+
+ override def toData(o: scala.Any, out: PdxWriter): Boolean =
+ if (o.isInstanceOf[Person]) {
+ val p = o.asInstanceOf[Person]
+ out.writeInt("id", p.id)
+ out.writeString("name", p.name)
+ out.writeDate("birthDate", p.birthDate)
+ true
+ } else
+ false
+
+ override def fromData(clazz: Class[_], in: PdxReader): AnyRef = {
+ val id: Int = in.readInt("id")
+ val name: String = in.readString("name")
+ val birthDate: Date = in.readDate("birthDate")
+ Person(id, name, birthDate)
+ }
+}
+//#person-pdx-serializer
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 485b881205..864ea275f2 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -48,6 +48,21 @@ object Dependencies {
val Csv = Seq()
+ val Geode = {
+ val geodeVersion = "1.1.1"
+ Seq(
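+ // shapeless backs the derived (generic) PDX serializers used by the Scala DSL; slf4j-log4j12 is
+ // excluded to avoid conflicting bindings, and log4j-over-slf4j bridges Geode's log4j output in tests.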
+ libraryDependencies ++= Seq("com.chuusai" %% "shapeless" % "2.3.2") ++
+ Seq("geode-core","geode-cq")
+ .map("org.apache.geode" % _ % geodeVersion exclude("org.slf4j", "slf4j-log4j12")) ++
+ Seq("org.slf4j" % "log4j-over-slf4j" % "1.7.21" % Test, // MIT like: http://www.slf4j.org/license.html
+ "org.slf4j" % "slf4j-api" % "1.7.21" % Test, // MIT like: http://www.slf4j.org/license.html
+ "ch.qos.logback" % "logback-classic" % "1.1.7" % Test, // Eclipse Public License 1.0: http://logback.qos.ch/license.html
+ "ch.qos.logback" % "logback-core" % "1.1.7" % Test // Eclipse Public License 1.0: http://logback.qos.ch/license.html
+ )
+ )
+ }
+
val HBase = {
val hbaseVersion = "1.2.4"
val hadoopVersion = "2.5.1"