Initial commit for apache geode connector
cheleb committed Jun 2, 2017
1 parent 3993ce7 commit d9dec58
Showing 45 changed files with 2,385 additions and 0 deletions.
10 changes: 10 additions & 0 deletions build.sbt
@@ -9,6 +9,7 @@ lazy val alpakka = project
    dynamodb,
    files,
    ftp,
    geode,
    googleCloudPubSub,
    hbase,
    ironmq,
@@ -98,6 +99,15 @@ lazy val ftp = project
    parallelExecution in Test := false
  )

lazy val geode = project
  .enablePlugins(AutomateHeaderPlugin)
  .settings(
    name := "akka-stream-alpakka-geode",
    Dependencies.Geode,
    fork in Test := true,
    parallelExecution in Test := false
  )

lazy val googleCloudPubSub = project
  .in(file("google-cloud-pub-sub"))
  .enablePlugins(AutomateHeaderPlugin)
20 changes: 20 additions & 0 deletions docker-compose.yml
@@ -22,6 +22,26 @@ services:
    image: deangiberson/aws-dynamodb-local
    ports:
      - "8001:8000"
  geode:
    container_name: geode
    image: apachegeode/geode
    hostname: geode
    mem_limit: 2g
    expose:
      - "10334"
      - "1099"
      - "7575"
      - "40404"
    ports:
      - "1099:1099"
      - "10334:10334"
      - "7575:7575"
      - "7070:7070"
      - "40404:40404"
      - "8081:8080"
    volumes:
      - ./geode/scripts/:/scripts/
    command: /scripts/geode.sh
  ironauth:
    image: iron/auth
    ports:
1 change: 1 addition & 0 deletions docs/src/main/paradox/connectors.md
@@ -3,6 +3,7 @@
@@@ index

* [AMQP Connector](amqp.md)
* [Apache Geode Connector](geode.md)
* [AWS DynamoDB Connector](dynamodb.md)
* [AWS Lambda Connector](awslambda.md)
* [AWS S3 Connector](s3.md)
180 changes: 180 additions & 0 deletions docs/src/main/paradox/geode.md
@@ -0,0 +1,180 @@
#Apache Geode connector

[Apache Geode](http://geode.apache.org) is a distributed data grid (formerly known as GemFire).

This connector provides a flow and a sink to write elements into Geode, and a source to read elements from it.

Geode stores data as key/value pairs. Both keys and values must be serializable; more on this below.
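
To make the key/value model concrete, here is a plain-Scala illustration of key extraction; the `Person` class and the extraction function are hypothetical examples for this documentation, not part of the connector API:

```scala
// Hypothetical domain class used in the examples below.
case class Person(id: Int, name: String)

// Geode stores each value under a key; with this connector the key is
// computed from the value by a user-supplied extraction function.
val keyExtractor: Person => Int = _.id

println(keyExtractor(Person(42, "Ada"))) // 42
```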

## Artifacts

sbt
: @@@vars
```scala
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-geode" % "$version$"
```
@@@

Maven
: @@@vars
```xml
<dependency>
<groupId>com.lightbend.akka</groupId>
<artifactId>akka-stream-alpakka-geode_$scala.binaryVersion$</artifactId>
<version>$version$</version>
</dependency>
```
@@@

Gradle
: @@@vars
```gradle
dependencies {
compile group: "com.lightbend.akka", name: "akka-stream-alpakka-geode_$scala.binaryVersion$", version: "$version$"
}
```
@@@


#Usage

##Connection

First, connect to the Geode cache. In a client application, the connection is handled by a
[ClientCache](https://geode.apache.org/docs/guide/11/basic_config/the_cache/managing_a_client_cache.html). A single
ClientCache per application is enough. The ClientCache also holds a single PdxSerializer.

scala
: @@snip (../../../../geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeFlowSpec.scala) { #connection }

java
: @@snip (../../../../geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeBaseTestCase.java) { #connection }
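
As a rough sketch, a connection built with the javadsl added in this commit could look as follows; the way `GeodeSettings` is constructed here is an assumption (check the actual factory methods):

```java
// Sketch only: the GeodeSettings constructor shape is an assumption.
GeodeSettings settings = new GeodeSettings("localhost", 10334);

// Plain client cache, without server event subscription:
ReactiveGeode reactiveGeode = new ReactiveGeode(settings);
```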

Apache Geode supports continuous queries. A continuous query relies on server events, so reactive Geode needs to listen to
those events. Since this behaviour consumes more resources, it is isolated in a Scala trait and a specialized Java class.

scala
: @@snip (../../../../geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeContinuousSourceSpec.scala) { #connection-with-pool }

java
: @@snip (../../../../geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeBaseTestCase.java) { #connection-with-pool }

##Region

Define a [region](https://geode.apache.org/docs/guide/11/basic_config/data_regions/chapter_overview.html) setting that
describes how to access the region and which function extracts the key from a value.

scala
: @@snip (../../../../geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeBaseSpec.scala) { #region }

java
: @@snip (../../../../geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeBaseTestCase.java) { #region }
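
As a sketch, a region setting pairs the region name with a key extraction function; the exact `RegionSettings` constructor shape is an assumption, and `Person` is a hypothetical class:

```scala
// Sketch: region name plus key extraction function (signature assumed).
val personsRegionSettings = RegionSettings("persons", (p: Person) => p.id)
```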


###Serialization

Objects must be serialized before they can flow into a Geode region. Candidate formats are:

* an opaque format (e.g. JSON or XML)
* Java serialization
* Geode's PDX format

Currently, only the PDX format is supported.

PDX serialization supports many options; see [gemfire_pdx_serialization.html](http://geode.apache.org/docs/guide/11/developing/data_serialization/gemfire_pdx_serialization.html).

A PdxSerializer must be provided to Geode when reading from or writing to a region.

scala
: @@snip (../../../../geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/PersonPdxSerializer.scala) { #person-pdx-serializer }

java
: @@snip (../../../../geode/src/test/java/akka/stream/alpakka/geode/javadsl/PersonPdxSerializer.java) { #person-pdx-serializer }
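
As a sketch, a hand-written serializer for a hypothetical `Person` class could look as follows; `toData`/`fromData` are Geode's standard `PdxSerializer` methods, and `clazz()` is the extra method required by the `AkkaPdxSerializer` interface added in this commit (assuming it extends Geode's `PdxSerializer`):

```java
public class PersonPdxSerializer implements AkkaPdxSerializer<Person> {

    @Override
    public Class<Person> clazz() {
        return Person.class;
    }

    @Override
    public boolean toData(Object o, PdxWriter out) {
        if (o instanceof Person) {
            Person p = (Person) o;
            out.writeInt("id", p.getId());
            out.writeString("name", p.getName());
            return true;
        }
        return false;
    }

    @Override
    public Object fromData(Class<?> clazz, PdxReader in) {
        return new Person(in.readInt("id"), in.readString("name"));
    }
}
```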



For Scala users, this project provides a generic solution based on [shapeless](https://github.com/milessabin/shapeless): if no serializer is provided for a case class, one is generated at compile time.
Java users need to write their custom serializers by hand.

Runtime reflection is also an option; see [auto_serialization.html](http://geode.apache.org/docs/guide/11/developing/data_serialization/auto_serialization.html).

###Flow usage

This sample stores (case) classes in Geode.

scala
: @@snip (../../../../geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeFlowSpec.scala) { #flow }

java
: @@snip (../../../../geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeFlowTestCase.java) { #flow }
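
Sketched in Scala, wiring a stream of values through the Geode flow could look like this (names reuse the hypothetical `Person`, region settings and serializer from the sections above; exact scaladsl signatures are assumptions):

```scala
// Sketch: each Person is written to the region as it flows through.
Source(List(Person(1, "Ada"), Person(2, "Alan")))
  .via(reactiveGeode.flow(personsRegionSettings, personPdxSerializer))
  .runWith(Sink.ignore)
```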


###Sink usage

scala
: @@snip (../../../../geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeSinkSpec.scala) { #sink }

java
: @@snip (../../../../geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeSinkTestCase.java) { #sink }


###Source usage

####Simple query

Apache Geode supports simple queries.

scala
: @@snip (../../../../geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeFiniteSourceSpec.scala) { #query }

java
: @@snip (../../../../geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeFiniteSourceTestCase.java) { #query }
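
A sketch of a finite source backed by an OQL query, following the `query(String, AkkaPdxSerializer)` shape visible in the javadsl added by this commit (the scaladsl call shape is an assumption):

```scala
// Sketch: the source completes once all query results have been emitted.
reactiveGeode
  .query[Person]("select * from /persons", personPdxSerializer)
  .runWith(Sink.foreach(println))
```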


####Continuous query


scala
: @@snip (../../../../geode/src/test/scala/akka/stream/alpakka/geode/scaladsl/GeodeContinuousSourceSpec.scala) { #continuousQuery }

java
: @@snip (../../../../geode/src/test/java/akka/stream/alpakka/geode/javadsl/GeodeContinuousSourceTestCase.java) { #continuousQuery }
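
A sketch of the continuous variant, following the `continuousQuery(queryName, query, serializer)` and `closeContinuousQuery(name)` methods visible in the javadsl added by this commit (scaladsl shape assumed):

```scala
// Sketch: a continuous query stays registered on the server and keeps
// pushing matching events downstream until it is closed by name.
reactiveGeode
  .continuousQuery[Person]("persons-cq", "select * from /persons", personPdxSerializer)
  .runWith(Sink.foreach(println))

// Later, unregister the continuous query on the server:
reactiveGeode.closeContinuousQuery("persons-cq")
```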


##Geode basic commands

Assuming Apache Geode is installed, start the shell:

```
gfsh
```

From the Geode shell:

```
start locator --name=locator
configure pdx --read-serialized=true
start server --name=server
create region --name=animals --type=PARTITION_REDUNDANT --redundant-copies=2
create region --name=persons --type=PARTITION_REDUNDANT --redundant-copies=2
```
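
To sanity-check the setup from the same shell, the standard gfsh commands `list regions` and `query` can be used:

```
list regions
query --query="select * from /persons"
```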

###Run the tests

Integration tests run against a Geode instance on localhost; set the IT_GEODE_HOSTNAME environment variable to target another host:

```bash
export IT_GEODE_HOSTNAME=geode-host-locator

sbt
```

From the sbt shell:

```sbtshell
project geode
test
```
29 changes: 29 additions & 0 deletions geode/scripts/cache.xml
@@ -0,0 +1,29 @@
<?xml version="1.0" encoding="UTF-8"?>
<cache
xmlns="http://geode.apache.org/schema/cache"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://geode.apache.org/schema/cache http://geode.apache.org/schema/cache/cache-1.0.xsd"
version="1.0">
  <cache-server hostname-for-clients="localhost" />

  <region name="persons">
    <region-attributes refid="PARTITION_REDUNDANT">
      <partition-attributes redundant-copies="2" />
    </region-attributes>
  </region>

  <region name="animals">
    <region-attributes refid="PARTITION_REDUNDANT">
      <partition-attributes redundant-copies="2" />
    </region-attributes>
  </region>

  <!-- This last region is only used in Scala tests with the shapeless serializer -->
  <region name="complexes">
    <region-attributes refid="PARTITION_REDUNDANT">
      <partition-attributes redundant-copies="2" />
    </region-attributes>
  </region>

</cache>
13 changes: 13 additions & 0 deletions geode/scripts/geode.sh
@@ -0,0 +1,13 @@
#!/bin/bash

rm -rf /geode
mkdir -p /geode/locator
mkdir -p /geode/server


gfsh -e "start locator --name=$HOSTNAME-locator --dir=/geode/locator --mcast-port=0 --hostname-for-clients=0.0.0.0" -e "start server --name=$HOSTNAME-server --locators=localhost[10334] --dir=/geode/server --server-port=40404 --max-heap=1G --hostname-for-clients=0.0.0.0 --cache-xml-file=/scripts/cache.xml"

while true; do
sleep 10
done

2 changes: 2 additions & 0 deletions geode/scripts/start-local.sh
@@ -0,0 +1,2 @@
gfsh -e "start locator --name=locator" -e "configure pdx --read-serialized=true" -e "start server --name=server"

@@ -0,0 +1,107 @@
/*
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.geode.javadsl;

import akka.Done;
import akka.NotUsed;
import akka.stream.alpakka.geode.AkkaPdxSerializer;
import akka.stream.alpakka.geode.GeodeSettings;
import akka.stream.alpakka.geode.RegionSettings;
import akka.stream.alpakka.geode.internal.GeodeCache;
import akka.stream.alpakka.geode.internal.stage.GeodeContinuousSourceStage;
import akka.stream.alpakka.geode.internal.stage.GeodeFiniteSourceStage;
import akka.stream.alpakka.geode.internal.stage.GeodeFlowStage;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.query.CqException;
import org.apache.geode.cache.query.CqQuery;
import org.apache.geode.cache.query.QueryService;
import scala.Symbol;
import scala.concurrent.Future;

import java.util.concurrent.CompletionStage;


/**
 * Reactive Geode without server event subscription. Cannot build a continuous source.
 */
public class ReactiveGeode extends GeodeCache {

    final GeodeSettings geodeSettings;

    public ReactiveGeode(GeodeSettings settings) {
        super(settings);
        this.geodeSettings = settings;
    }

    @Override
    public ClientCacheFactory configure(ClientCacheFactory factory) {
        return factory.addPoolLocator(geodeSettings.hostname(), geodeSettings.port());
    }

    public <V> Source<V, Future<Done>> query(String query, AkkaPdxSerializer<V> serializer) {
        registerPDXSerializer(serializer, serializer.clazz());
        return Source.fromGraph(new GeodeFiniteSourceStage<V>(cache(), query));
    }

    public <K, V> Flow<V, V, NotUsed> flow(RegionSettings<K, V> regionSettings, AkkaPdxSerializer<V> serializer) {
        registerPDXSerializer(serializer, serializer.clazz());
        return Flow.fromGraph(new GeodeFlowStage<K, V>(cache(), regionSettings));
    }

    public <K, V> Sink<V, CompletionStage<Done>> sink(RegionSettings<K, V> regionSettings, AkkaPdxSerializer<V> serializer) {
        return flow(regionSettings, serializer)
                .toMat(Sink.ignore(), Keep.right());
    }

    public void close() {
        close(false);
    }
}

/**
 * Reactive Geode with server event subscription. Can build a continuous source.
 */
class ReactiveGeodeWithPoolSubscription extends ReactiveGeode {

    /**
     * Subscribes to server events.
     *
     * @return ClientCacheFactory with server event subscription enabled.
     */
    @Override
    public final ClientCacheFactory configure(ClientCacheFactory factory) {
        return super.configure(factory).setPoolSubscriptionEnabled(true);
    }

    public ReactiveGeodeWithPoolSubscription(GeodeSettings settings) {
        super(settings);
    }

    public <V> Source<V, Future<Done>> continuousQuery(String queryName, String query, AkkaPdxSerializer<V> serializer) {
        registerPDXSerializer(serializer, serializer.clazz());
        return Source.fromGraph(new GeodeContinuousSourceStage<V>(cache(), Symbol.apply(queryName), query));
    }

    public boolean closeContinuousQuery(String name) throws CqException {
        QueryService qs = cache().getQueryService();
        CqQuery query = qs.getCq(name);
        if (query == null)
            return false;
        query.close();
        return true;
    }
}
