Skip to content

Commit

Permalink
refactored SpEL processor
Browse files Browse the repository at this point in the history
  • Loading branch information
jruaux committed Mar 21, 2021
1 parent 48698b8 commit c4f25a9
Show file tree
Hide file tree
Showing 18 changed files with 132 additions and 120 deletions.
16 changes: 10 additions & 6 deletions README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -23,34 +23,38 @@ image:https://snyk.io/test/github/redis-developer/riot/badge.svg?targetFile=buil
image:https://img.shields.io/github/license/redis-developer/riot.svg["License", link="https://github.com/redis-developer/riot"]
endif::[]

[.lead]
Redis Input/Output Tools (RIOT) is a series of utilities designed to help you get data in and out of Redis.

== {site-url}/db[RIOT DB]: SQL import/export
== {site-url}/db[RIOT DB]
.SQL import/export
[source,bash]
----
riot-db import "SELECT * FROM emp" --url jdbc:oracle:thin@host:1521:orcl hset -p emp -k id
----

== {site-url}/file[RIOT File]: File import/export
== {site-url}/file[RIOT File]
.File import/export
[source,bash]
----
riot-file import beers.csv --header hset --keyspace beer --keys id
----

== {site-url}/gen[RIOT Gen]: Data generator
== {site-url}/gen[RIOT Gen]
.Data generator
[source,bash]
----
riot-gen import fact=chuckNorris.fact sadd --keyspace facts --members fact
----

== {site-url}/redis[RIOT Redis]: Redis to Redis replication
== {site-url}/redis[RIOT Redis]
.Redis to Redis replication
[source,bash]
----
riot-redis --uri redis://source replicate --uri redis://target --mode live
----

== {site-url}/stream[RIOT Stream]: Streaming between Kafka and Redis
== {site-url}/stream[RIOT Stream]
.Streaming between Kafka and Redis
[source,bash]
----
riot-stream import topic --broker localhost:9092
Expand Down
4 changes: 2 additions & 2 deletions connectors/db/src/docs/asciidoc/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ Use the `import` command to import the result set of a SQL statement:
include::src/test/resources/import-postgresql[]
----

include::../../core/src/docs/asciidoc/processing.adoc[]

include::../../core/src/docs/asciidoc/redis-commands.adoc[]

include::../../core/src/docs/asciidoc/processing.adoc[]

== Exporting

Use the `export` command to export data from Redis to the database:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ protected Flow flow() throws Exception {
}

@Override
protected ItemProcessor<Map<String, Object>, Map<String, Object>> processor() {
return processingOptions.processor(connection);
protected ItemProcessor<Map<String, Object>, Map<String, Object>> processor() throws NoSuchMethodException {
return processingOptions.processor(client);
}
}
4 changes: 2 additions & 2 deletions connectors/file/src/docs/asciidoc/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,10 @@ These files look like this:
include::src/docs/asciidoc/redis-dump.json[]
----

include::../../core/src/docs/asciidoc/processing.adoc[]

include::../../core/src/docs/asciidoc/redis-commands.adoc[]

include::../../core/src/docs/asciidoc/processing.adoc[]

== Exporting

Use the `export` command to export data from Redis to files:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ private FileType type(String file) {
return type;
}

@SuppressWarnings({"unchecked", "rawtypes"})
private AbstractItemStreamItemReader<Map<String, Object>> reader(String file, FileType fileType, Resource resource) {
switch (fileType) {
case DELIMITED:
Expand Down Expand Up @@ -145,8 +146,8 @@ private String delimiter(String file) {
}

@Override
protected ItemProcessor<Map<String, Object>, Map<String, Object>> processor() {
return processingOptions.processor(connection);
protected ItemProcessor<Map<String, Object>, Map<String, Object>> processor() throws NoSuchMethodException {
return processingOptions.processor(client);
}

private FlatFileItemReader<Map<String, Object>> flatFileReader(Resource resource, AbstractLineTokenizer tokenizer) {
Expand Down
4 changes: 2 additions & 2 deletions connectors/gen/src/docs/asciidoc/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ include::src/test/resources/import-hset[]
include::src/test/resources/import-sadd[]
----

include::../../core/src/docs/asciidoc/processing.adoc[]

include::../../core/src/docs/asciidoc/redis-commands.adoc[]

include::../../core/src/docs/asciidoc/processing.adoc[]

=== Introspecting RediSearch

[source,bash]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public class GenerateCommand extends AbstractImportCommand<Map<String, Object>,
private ProcessorOptions processingOptions = ProcessorOptions.builder().build();

@Override
protected Flow flow() {
protected Flow flow() throws Exception {
log.info("Creating Faker reader with {}", options);
Map<String, String> fields = options.getFakerFields() == null ? new LinkedHashMap<>() : new LinkedHashMap<>(options.getFakerFields());
if (options.getFakerIndex() != null) {
Expand Down Expand Up @@ -62,7 +62,7 @@ private Map<String, String> fieldsFromIndex(String index) {
}

@Override
protected ItemProcessor<Map<String, Object>, Map<String, Object>> processor() {
return processingOptions.processor(connection);
protected ItemProcessor<Map<String, Object>, Map<String, Object>> processor() throws NoSuchMethodException {
return processingOptions.processor(client);
}
}
2 changes: 1 addition & 1 deletion connectors/gen/src/test/resources/import-sadd
Original file line number Diff line number Diff line change
@@ -1 +1 @@
riot-gen import name="gameOfThrones.character" --max 10000 sadd --members name --keyspace got:characters
riot-gen import name="gameOfThrones.character" --max 10000 sadd --keyspace got:characters --members name
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ enum ReplicationStrategy {
public void afterPropertiesSet() throws Exception {
this.targetClient = targetRedisOptions.client();
this.targetPool = pool(targetRedisOptions, targetClient);
this.targetConnection = connection(targetClient);
this.targetConnection = RedisOptions.connection(targetClient);
super.afterPropertiesSet();
if (mode == ReplicationMode.LIVE || mode == ReplicationMode.LIVEONLY) {
this.pubSubConnection = pubSubConnection(client);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public KafkaItemReader(Properties consumerProperties, String topicName, List<Int
Assert.isTrue(consumerProperties.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG + " property must be provided");
this.consumerProperties = consumerProperties;
Assert.hasLength(topicName, "Topic name must not be null or empty");
Assert.isTrue(!partitions.isEmpty(), "At least one partition must be provided");
Assert.notEmpty(partitions, "At least one partition must be provided");
this.topicPartitions = new ArrayList<>();
for (Integer partition : partitions) {
this.topicPartitions.add(new TopicPartition(topicName, partition));
Expand Down
38 changes: 12 additions & 26 deletions core/src/docs/asciidoc/processing.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -8,41 +8,27 @@ The following processors can be applied to records in that order:
==== Transforms

Produce key/value pairs using https://docs.spring.io/spring/docs/current/spring-framework-reference/core.html#expressions[Spring Expression Language] (SpEL):
Transforms allow you to create/update/delete fields using the https://docs.spring.io/spring/docs/current/spring-framework-reference/core.html#expressions[Spring Expression Language] (SpEL):

[source,bash]
----
--process field1="<exp>" field2="<exp>" ...
----
* `field1='foo'` -> generate a field named `field1` containing the string `foo`
* `temp=(temp-32)*5/9` -> convert temperature from Fahrenheit to Celsius
* `name=remove(first).concat(remove(last))` -> concatenate `first` and `last` fields and delete them
* `field2=null` -> delete `field2`

For example `--process "field1='foo'"` generates a field named `field1` with always the same value `foo`.

Input fields are accessed by name (e.g. `field3=field1+field2`).

The processor also exposes the following variables that can be called with the `#` prefix:

* `redis`

Redis connection to issue any command
[source,bash]
----
--process name="#redis.hget('person1','lastName')"
----

* `date`

Date parser/formatter
[source,bash]
----
--process epoch="#date.parse(mydate).getTime()"
----
The transform processor also exposes functions and variables that can be accessed using the `#` prefix:

* `index`
`date`;; Date parser/formatter (https://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html[API doc])
`geo`;; Convenience method that takes a longitude and a latitude to produce a RediSearch geo-location string in the form: `longitude,latitude`
`index`;; Sequence number of the item being generated
`redis`;; Handle to invoke Redis commands (https://lettuce.io/core/release/api/io/lettuce/core/api/sync/RedisCommands.html[API doc])

Sequence number of the item being generated
.Processor Example
[source,bash]
----
--process "id=#index"
--process epoch="#date.parse(mydate).getTime()" location="#geo(lon,lat)" id="#index" name="#redis.hget('person1','lastName')"
----

==== Regular Expressions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ public abstract class AbstractImportCommand<I, O> extends AbstractTransferComman
@Getter
private final List<RedisCommand<O>> redisCommands = new ArrayList<>();

protected AbstractTaskletStepBuilder<SimpleStepBuilder<I, O>> step(String name, String taskName, ItemReader<I> reader) {
protected AbstractTaskletStepBuilder<SimpleStepBuilder<I, O>> step(String name, String taskName, ItemReader<I> reader) throws Exception {
StepBuilder<I, O> step = stepBuilder(name, taskName);
return step.reader(reader).processor(processor()).writer(writer()).build();
}

protected abstract ItemProcessor<I, O> processor();
protected abstract ItemProcessor<I, O> processor() throws Exception;

protected ItemWriter<O> writer() {
Assert.notNull(redisCommands, "RedisCommands not set");
Expand Down
45 changes: 36 additions & 9 deletions core/src/main/java/com/redislabs/riot/ProcessorOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,24 @@

import com.redislabs.riot.convert.RegexNamedGroupsExtractor;
import com.redislabs.riot.processor.FilteringProcessor;
import com.redislabs.riot.processor.MapAccessor;
import com.redislabs.riot.processor.MapProcessor;
import com.redislabs.riot.processor.SpelProcessor;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.AbstractRedisClient;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.NoArgsConstructor;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.support.CompositeItemProcessor;
import org.springframework.core.convert.converter.Converter;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.util.ObjectUtils;
import picocli.CommandLine.Option;

import java.lang.reflect.Method;
import java.text.SimpleDateFormat;
import java.util.*;

Expand All @@ -22,22 +28,22 @@
@AllArgsConstructor
public class ProcessorOptions {

@Option(arity = "1..*", names = "--process", description = "SpEL processors in the form: --process field1=\"<expression>\" field2=\"<expression>\"", paramLabel = "<f=exp>")
private Map<String, String> spelFields;
@Option(arity = "1..*", names = "--var", description = "Register a variable in the SpEL processor context", paramLabel = "<v=exp>")
@Option(arity = "1..*", names = "--process", description = "SpEL processing expressions in the form: <field>=\"<exp>\"", paramLabel = "<f=exp>")
private Map<String, Expression> spelFields;
@Option(arity = "1..*", names = "--var", description = "Register a variable in the SpEL processor context.", paramLabel = "<v=exp>")
private Map<String, String> variables;
@Builder.Default
@Option(names = "--date", description = "Processor date format (default: ${DEFAULT-VALUE})", paramLabel = "<string>")
@Option(names = "--date", description = "Processor date format (default: ${DEFAULT-VALUE}).", paramLabel = "<string>")
private String dateFormat = new SimpleDateFormat().toPattern();
@Option(arity = "1..*",names = "--filter", description = "Discard records using SpEL boolean expressions", paramLabel = "<exp>")
@Option(arity = "1..*", names = "--filter", description = "Discard records using SpEL boolean expressions.", paramLabel = "<exp>")
private String[] filters;
@Option(arity = "1..*", names = "--regex", description = "Extract named values from source field using regex", paramLabel = "<f=exp>")
@Option(arity = "1..*", names = "--regex", description = "Extract named values from source field using regex.", paramLabel = "<f=exp>")
private Map<String, String> regexes;

public ItemProcessor<Map<String, Object>, Map<String, Object>> processor(StatefulConnection<String, String> connection) {
public ItemProcessor<Map<String, Object>, Map<String, Object>> processor(AbstractRedisClient client) throws NoSuchMethodException {
List<ItemProcessor<Map<String, Object>, Map<String, Object>>> processors = new ArrayList<>();
if (!ObjectUtils.isEmpty(spelFields)) {
processors.add(new SpelProcessor(connection, new SimpleDateFormat(dateFormat), variables, spelFields));
processors.add(new SpelProcessor(context(client), spelFields));
}
if (!ObjectUtils.isEmpty(regexes)) {
Map<String, Converter<String, Map<String, String>>> fields = new LinkedHashMap<>();
Expand All @@ -58,4 +64,25 @@ public ItemProcessor<Map<String, Object>, Map<String, Object>> processor(Statefu
return compositeItemProcessor;
}

private EvaluationContext context(AbstractRedisClient client) throws NoSuchMethodException {
StandardEvaluationContext context = new StandardEvaluationContext();
context.setVariable("date", new SimpleDateFormat(dateFormat));
context.setVariable("redis", RedisOptions.commands(client));
SpelExpressionParser parser = new SpelExpressionParser();
if (variables != null) {
variables.forEach((k, v) -> context.setVariable(k, parser.parseExpression(v).getValue(context)));
}
Method geoMethod = getClass().getDeclaredMethod("geo", String.class, String.class);
context.registerFunction("geo", geoMethod);
context.setPropertyAccessors(Collections.singletonList(new MapAccessor()));
return context;
}

public static String geo(String longitude, String latitude) {
if (longitude == null || latitude == null) {
return null;
}
return longitude + "," + latitude;
}

}
18 changes: 17 additions & 1 deletion core/src/main/java/com/redislabs/riot/RedisOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.lettuce.core.*;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.sync.BaseRedisCommands;
import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.event.DefaultEventPublisherOptions;
Expand Down Expand Up @@ -87,6 +88,13 @@ public class RedisOptions {
@Option(names = "--client", description = "Client name used to connect to Redis.", paramLabel = "<name>")
private String clientName;

public static BaseRedisCommands<String, String> commands(AbstractRedisClient client) {
if (client instanceof RedisClusterClient) {
return ((RedisClusterClient) client).connect().sync();
}
return ((RedisClient) client).connect().sync();
}

public List<RedisURI> uris() {
List<RedisURI> redisURIs = new ArrayList<>();
if (ObjectUtils.isEmpty(uris)) {
Expand Down Expand Up @@ -163,7 +171,7 @@ public RedisClient redisClient() {
return client;
}

public <T extends StatefulConnection<String, String>> GenericObjectPoolConfig<T> poolConfig() {
public <T> GenericObjectPoolConfig<T> poolConfig() {
GenericObjectPoolConfig<T> config = new GenericObjectPoolConfig<>();
config.setMaxTotal(poolMaxTotal);
return config;
Expand All @@ -183,4 +191,12 @@ public <B extends CommandTimeoutBuilder<B>> B configureCommandTimeout(B builder)
log.info("Setting {} command timeout to {}", ClassUtils.getShortName(builder.getClass()), commandTimeout);
return builder.commandTimeout(commandTimeout);
}

public static StatefulConnection<String, String> connection(AbstractRedisClient client) {
if (client instanceof RedisClusterClient) {
return ((RedisClusterClient) client).connect();
}
return ((RedisClient) client).connect();
}

}
15 changes: 14 additions & 1 deletion core/src/main/java/com/redislabs/riot/RiotApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import io.netty.util.internal.logging.JdkLoggerFactory;
import lombok.Getter;
import org.springframework.core.NestedExceptionUtils;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import picocli.AutoComplete;
import picocli.CommandLine;
import picocli.CommandLine.*;
Expand Down Expand Up @@ -197,8 +199,19 @@ public RedisURI convert(String value) {

}

static class ExpressionConverter implements CommandLine.ITypeConverter<Expression> {

private final SpelExpressionParser parser = new SpelExpressionParser();

@Override
public Expression convert(String value) throws Exception {
return parser.parseExpression(value);
}
}

protected void registerConverters(CommandLine commandLine) {
commandLine.registerConverter(io.lettuce.core.RedisURI.class, new RedisURIConverter());
commandLine.registerConverter(RedisURI.class, new RedisURIConverter());
commandLine.registerConverter(Expression.class, new ExpressionConverter());
}

/**
Expand Down
Loading

0 comments on commit c4f25a9

Please sign in to comment.