From c4f25a9e590f75c6b684ab49eca4780db3ab2648 Mon Sep 17 00:00:00 2001 From: jruaux Date: Sat, 20 Mar 2021 22:18:25 -0700 Subject: [PATCH] refactored SpEL processor --- README.adoc | 16 +++-- connectors/db/src/docs/asciidoc/index.adoc | 4 +- .../riot/db/DatabaseImportCommand.java | 4 +- connectors/file/src/docs/asciidoc/index.adoc | 4 +- .../riot/file/FileImportCommand.java | 5 +- connectors/gen/src/docs/asciidoc/index.adoc | 4 +- .../redislabs/riot/gen/GenerateCommand.java | 6 +- connectors/gen/src/test/resources/import-sadd | 2 +- .../riot/redis/ReplicateCommand.java | 2 +- .../riot/stream/kafka/KafkaItemReader.java | 2 +- core/src/docs/asciidoc/processing.adoc | 38 ++++------ .../redislabs/riot/AbstractImportCommand.java | 4 +- .../com/redislabs/riot/ProcessorOptions.java | 45 +++++++++--- .../java/com/redislabs/riot/RedisOptions.java | 18 ++++- .../main/java/com/redislabs/riot/RiotApp.java | 15 +++- .../java/com/redislabs/riot/RiotCommand.java | 10 +-- .../riot/convert/MapFilteringConverter.java | 3 +- .../riot/processor/SpelProcessor.java | 70 ++++++------------- 18 files changed, 132 insertions(+), 120 deletions(-) diff --git a/README.adoc b/README.adoc index 217da6605..fbf1cc968 100644 --- a/README.adoc +++ b/README.adoc @@ -23,34 +23,38 @@ image:https://snyk.io/test/github/redis-developer/riot/badge.svg?targetFile=buil image:https://img.shields.io/github/license/redis-developer/riot.svg["License", link="https://github.com/redis-developer/riot"] endif::[] -[.lead] Redis Input/Output Tools (RIOT) is a series of utilities designed to help you get data in and out of Redis. -== {site-url}/db[RIOT DB]: SQL import/export +== {site-url}/db[RIOT DB] +.SQL import/export [source,bash] ---- riot-db import "SELECT * FROM emp" --url jdbc:oracle:thin@host:1521:orcl hset -p emp -k id ---- -== {site-url}/file[RIOT File]: File import/export +== {site-url}/file[RIOT File] +.File import/export [source,bash] ---- riot-file import beers.csv --header hset --keyspace beer --keys id ---- -== {site-url}/gen[RIOT Gen]: Data generator +== {site-url}/gen[RIOT Gen] +.Data generator [source,bash] ---- riot-gen import fact=chuckNorris.fact sadd --keyspace facts --members fact ---- -== {site-url}/redis[RIOT Redis]: Redis to Redis replication +== {site-url}/redis[RIOT Redis] +.Redis to Redis replication [source,bash] ---- riot-redis --uri redis://source replicate --uri redis://target --mode live ---- -== {site-url}/stream[RIOT Stream]: Streaming between Kafka and Redis +== {site-url}/stream[RIOT Stream] +.Streaming between Kafka and Redis [source,bash] ---- riot-stream import topic --broker localhost:9092 diff --git a/connectors/db/src/docs/asciidoc/index.adoc b/connectors/db/src/docs/asciidoc/index.adoc index 7597654e5..58ebb31e1 100644 --- a/connectors/db/src/docs/asciidoc/index.adoc +++ b/connectors/db/src/docs/asciidoc/index.adoc @@ -59,10 +59,10 @@ Use the `import` command to import the result set of a SQL statement: include::src/test/resources/import-postgresql[] ---- -include::../../core/src/docs/asciidoc/processing.adoc[] - include::../../core/src/docs/asciidoc/redis-commands.adoc[] +include::../../core/src/docs/asciidoc/processing.adoc[] + == Exporting Use the `export` command to export data from Redis to the database: diff --git a/connectors/db/src/main/java/com/redislabs/riot/db/DatabaseImportCommand.java b/connectors/db/src/main/java/com/redislabs/riot/db/DatabaseImportCommand.java index 24370869b..9df82d661 100644 --- a/connectors/db/src/main/java/com/redislabs/riot/db/DatabaseImportCommand.java +++ b/connectors/db/src/main/java/com/redislabs/riot/db/DatabaseImportCommand.java @@ -54,7 +54,7 @@ protected Flow flow() throws Exception { } @Override - protected ItemProcessor, Map> processor() { - return processingOptions.processor(connection); + protected ItemProcessor, Map> processor() throws NoSuchMethodException { + return processingOptions.processor(client); } } diff --git a/connectors/file/src/docs/asciidoc/index.adoc b/connectors/file/src/docs/asciidoc/index.adoc index 7e547cce5..1581eb6e4 100644 --- a/connectors/file/src/docs/asciidoc/index.adoc +++ b/connectors/file/src/docs/asciidoc/index.adoc @@ -178,10 +178,10 @@ These files look like this: include::src/docs/asciidoc/redis-dump.json[] ---- -include::../../core/src/docs/asciidoc/processing.adoc[] - include::../../core/src/docs/asciidoc/redis-commands.adoc[] +include::../../core/src/docs/asciidoc/processing.adoc[] + == Exporting Use the `export` command to export data from Redis to files: diff --git a/connectors/file/src/main/java/com/redislabs/riot/file/FileImportCommand.java b/connectors/file/src/main/java/com/redislabs/riot/file/FileImportCommand.java index bc3b570f2..316f07552 100644 --- a/connectors/file/src/main/java/com/redislabs/riot/file/FileImportCommand.java +++ b/connectors/file/src/main/java/com/redislabs/riot/file/FileImportCommand.java @@ -94,6 +94,7 @@ private FileType type(String file) { return type; } + @SuppressWarnings({"unchecked", "rawtypes"}) private AbstractItemStreamItemReader> reader(String file, FileType fileType, Resource resource) { switch (fileType) { case DELIMITED: @@ -145,8 +146,8 @@ private String delimiter(String file) { } @Override - protected ItemProcessor, Map> processor() { - return processingOptions.processor(connection); + protected ItemProcessor, Map> processor() throws NoSuchMethodException { + return processingOptions.processor(client); } private FlatFileItemReader> flatFileReader(Resource resource, AbstractLineTokenizer tokenizer) { diff --git a/connectors/gen/src/docs/asciidoc/index.adoc b/connectors/gen/src/docs/asciidoc/index.adoc index b8e1b3d09..ee23cd134 100644 --- a/connectors/gen/src/docs/asciidoc/index.adoc +++ b/connectors/gen/src/docs/asciidoc/index.adoc @@ -36,10 +36,10 @@ include::src/test/resources/import-hset[] include::src/test/resources/import-sadd[] ---- -include::../../core/src/docs/asciidoc/processing.adoc[] - include::../../core/src/docs/asciidoc/redis-commands.adoc[] +include::../../core/src/docs/asciidoc/processing.adoc[] + === Introspecting RediSearch [source,bash] diff --git a/connectors/gen/src/main/java/com/redislabs/riot/gen/GenerateCommand.java b/connectors/gen/src/main/java/com/redislabs/riot/gen/GenerateCommand.java index 4f9600a4d..05ae483c1 100644 --- a/connectors/gen/src/main/java/com/redislabs/riot/gen/GenerateCommand.java +++ b/connectors/gen/src/main/java/com/redislabs/riot/gen/GenerateCommand.java @@ -22,7 +22,7 @@ public class GenerateCommand extends AbstractImportCommand, private ProcessorOptions processingOptions = ProcessorOptions.builder().build(); @Override - protected Flow flow() { + protected Flow flow() throws Exception { log.info("Creating Faker reader with {}", options); Map fields = options.getFakerFields() == null ? new LinkedHashMap<>() : new LinkedHashMap<>(options.getFakerFields()); if (options.getFakerIndex() != null) { @@ -62,7 +62,7 @@ private Map fieldsFromIndex(String index) { } @Override - protected ItemProcessor, Map> processor() { - return processingOptions.processor(connection); + protected ItemProcessor, Map> processor() throws NoSuchMethodException { + return processingOptions.processor(client); } } diff --git a/connectors/gen/src/test/resources/import-sadd b/connectors/gen/src/test/resources/import-sadd index 7f70ea772..a8855fa14 100644 --- a/connectors/gen/src/test/resources/import-sadd +++ b/connectors/gen/src/test/resources/import-sadd @@ -1 +1 @@ -riot-gen import name="gameOfThrones.character" --max 10000 sadd --members name --keyspace got:characters \ No newline at end of file +riot-gen import name="gameOfThrones.character" --max 10000 sadd --keyspace got:characters --members name \ No newline at end of file diff --git a/connectors/redis/src/main/java/com/redislabs/riot/redis/ReplicateCommand.java b/connectors/redis/src/main/java/com/redislabs/riot/redis/ReplicateCommand.java index 8d5768187..ead93e724 100644 --- a/connectors/redis/src/main/java/com/redislabs/riot/redis/ReplicateCommand.java +++ b/connectors/redis/src/main/java/com/redislabs/riot/redis/ReplicateCommand.java @@ -73,7 +73,7 @@ enum ReplicationStrategy { public void afterPropertiesSet() throws Exception { this.targetClient = targetRedisOptions.client(); this.targetPool = pool(targetRedisOptions, targetClient); - this.targetConnection = connection(targetClient); + this.targetConnection = RedisOptions.connection(targetClient); super.afterPropertiesSet(); if (mode == ReplicationMode.LIVE || mode == ReplicationMode.LIVEONLY) { this.pubSubConnection = pubSubConnection(client); diff --git a/connectors/stream/src/main/java/com/redislabs/riot/stream/kafka/KafkaItemReader.java b/connectors/stream/src/main/java/com/redislabs/riot/stream/kafka/KafkaItemReader.java index 13dda9ebf..a29babff7 100644 --- a/connectors/stream/src/main/java/com/redislabs/riot/stream/kafka/KafkaItemReader.java +++ b/connectors/stream/src/main/java/com/redislabs/riot/stream/kafka/KafkaItemReader.java @@ -77,7 +77,7 @@ public KafkaItemReader(Properties consumerProperties, String topicName, List(); for (Integer partition : partitions) { this.topicPartitions.add(new TopicPartition(topicName, partition)); diff --git a/core/src/docs/asciidoc/processing.adoc b/core/src/docs/asciidoc/processing.adoc index 9db227e39..12a6564ab 100644 --- a/core/src/docs/asciidoc/processing.adoc +++ b/core/src/docs/asciidoc/processing.adoc @@ -8,41 +8,27 @@ The following processors can be applied to records in that order: ==== Transforms -Produce key/value pairs using https://docs.spring.io/spring/docs/current/spring-framework-reference/core.html#expressions[Spring Expression Language] (SpEL): +Transforms allow you to create/update/delete fields using the https://docs.spring.io/spring/docs/current/spring-framework-reference/core.html#expressions[Spring Expression Language] (SpEL): -[source,bash] ----- ---process field1="" field2="" ... ----- +* `field1='foo'` -> generate a field named `field1` containing the string `foo` +* `temp=(temp-32)*5/9` -> convert temperature from Fahrenheit to Celsius +* `name=remove(first).concat(remove(last))` -> concatenate `first` and `last` fields and delete them +* `field2=null` -> delete `field2` -For example `--process "field1='foo'"` generates a field named `field1` with always the same value `foo`. Input fields are accessed by name (e.g. `field3=field1+field2`). -The processor also exposes the following variables that can be called with the `#` prefix: - -* `redis` - -Redis connection to issue any command -[source,bash] ----- ---process name="#redis.hget('person1','lastName')" ----- - -* `date` - -Date parser/formatter -[source,bash] ----- ---process epoch="#date.parse(mydate).getTime()" ----- +The transform processor also exposes functions and variables that can be accessed using the `#` prefix: -* `index` +`date`;; Date parser/formatter (https://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html[API doc]) +`geo`;; Convenience method that takes a longitude and a latitude to produce a RediSearch geo-location string in the form: `longitude,latitude` +`index`;; Sequence number of the item being generated +`redis`;; Handle to invoke Redis commands (https://lettuce.io/core/release/api/io/lettuce/core/api/sync/RedisCommands.html[API doc]) -Sequence number of the item being generated +.Processor Example [source,bash] ---- ---process "id=#index" +--process epoch="#date.parse(mydate).getTime()" location="#geo(lon,lat)" id="#index" name="#redis.hget('person1','lastName')" ---- ==== Regular Expressions diff --git a/core/src/main/java/com/redislabs/riot/AbstractImportCommand.java b/core/src/main/java/com/redislabs/riot/AbstractImportCommand.java index 9a60214b2..1231e656f 100644 --- a/core/src/main/java/com/redislabs/riot/AbstractImportCommand.java +++ b/core/src/main/java/com/redislabs/riot/AbstractImportCommand.java @@ -30,12 +30,12 @@ public abstract class AbstractImportCommand extends AbstractTransferComman @Getter private final List> redisCommands = new ArrayList<>(); - protected AbstractTaskletStepBuilder> step(String name, String taskName, ItemReader reader) { + protected AbstractTaskletStepBuilder> step(String name, String taskName, ItemReader reader) throws Exception { StepBuilder step = stepBuilder(name, taskName); return step.reader(reader).processor(processor()).writer(writer()).build(); } - protected abstract ItemProcessor processor(); + protected abstract ItemProcessor processor() throws Exception; protected ItemWriter writer() { Assert.notNull(redisCommands, "RedisCommands not set"); diff --git a/core/src/main/java/com/redislabs/riot/ProcessorOptions.java b/core/src/main/java/com/redislabs/riot/ProcessorOptions.java index e84a5ef89..9f8fc8d34 100644 --- a/core/src/main/java/com/redislabs/riot/ProcessorOptions.java +++ b/core/src/main/java/com/redislabs/riot/ProcessorOptions.java @@ -2,18 +2,24 @@ import com.redislabs.riot.convert.RegexNamedGroupsExtractor; import com.redislabs.riot.processor.FilteringProcessor; +import com.redislabs.riot.processor.MapAccessor; import com.redislabs.riot.processor.MapProcessor; import com.redislabs.riot.processor.SpelProcessor; -import io.lettuce.core.api.StatefulConnection; +import io.lettuce.core.AbstractRedisClient; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.NoArgsConstructor; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.support.CompositeItemProcessor; import org.springframework.core.convert.converter.Converter; +import org.springframework.expression.EvaluationContext; +import org.springframework.expression.Expression; +import org.springframework.expression.spel.standard.SpelExpressionParser; +import org.springframework.expression.spel.support.StandardEvaluationContext; import org.springframework.util.ObjectUtils; import picocli.CommandLine.Option; +import java.lang.reflect.Method; import java.text.SimpleDateFormat; import java.util.*; @@ -22,22 +28,22 @@ @AllArgsConstructor public class ProcessorOptions { - @Option(arity = "1..*", names = "--process", description = "SpEL processors in the form: --process field1=\"\" field2=\"\" …", paramLabel = "") - private Map spelFields; - @Option(arity = "1..*", names = "--var", description = "Register a variable in the SpEL processor context", paramLabel = "") + @Option(arity = "1..*", names = "--process", description = "SpEL processing expressions in the form: =\"\"", paramLabel = "") + private Map spelFields; + @Option(arity = "1..*", names = "--var", description = "Register a variable in the SpEL processor context.", paramLabel = "") private Map variables; @Builder.Default - @Option(names = "--date", description = "Processor date format (default: ${DEFAULT-VALUE})", paramLabel = "") + @Option(names = "--date", description = "Processor date format (default: ${DEFAULT-VALUE}).", paramLabel = "") private String dateFormat = new SimpleDateFormat().toPattern(); - @Option(arity = "1..*",names = "--filter", description = "Discard records using SpEL boolean expressions", paramLabel = "") + @Option(arity = "1..*", names = "--filter", description = "Discard records using SpEL boolean expressions.", paramLabel = "") private String[] filters; - @Option(arity = "1..*", names = "--regex", description = "Extract named values from source field using regex", paramLabel = "") + @Option(arity = "1..*", names = "--regex", description = "Extract named values from source field using regex.", paramLabel = "") private Map regexes; - public ItemProcessor, Map> processor(StatefulConnection connection) { + public ItemProcessor, Map> processor(AbstractRedisClient client) throws NoSuchMethodException { List, Map>> processors = new ArrayList<>(); if (!ObjectUtils.isEmpty(spelFields)) { - processors.add(new SpelProcessor(connection, new SimpleDateFormat(dateFormat), variables, spelFields)); + processors.add(new SpelProcessor(context(client), spelFields)); } if (!ObjectUtils.isEmpty(regexes)) { Map>> fields = new LinkedHashMap<>(); @@ -58,4 +64,25 @@ public ItemProcessor, Map> processor(Statefu return compositeItemProcessor; } + private EvaluationContext context(AbstractRedisClient client) throws NoSuchMethodException { + StandardEvaluationContext context = new StandardEvaluationContext(); + context.setVariable("date", new SimpleDateFormat(dateFormat)); + context.setVariable("redis", RedisOptions.commands(client)); + SpelExpressionParser parser = new SpelExpressionParser(); + if (variables != null) { + variables.forEach((k, v) -> context.setVariable(k, parser.parseExpression(v).getValue(context))); + } + Method geoMethod = getClass().getDeclaredMethod("geo", String.class, String.class); + context.registerFunction("geo", geoMethod); + context.setPropertyAccessors(Collections.singletonList(new MapAccessor())); + return context; + } + + public static String geo(String longitude, String latitude) { + if (longitude == null || latitude == null) { + return null; + } + return longitude + "," + latitude; + } + } diff --git a/core/src/main/java/com/redislabs/riot/RedisOptions.java b/core/src/main/java/com/redislabs/riot/RedisOptions.java index b0bfa2d80..66d3c3699 100644 --- a/core/src/main/java/com/redislabs/riot/RedisOptions.java +++ b/core/src/main/java/com/redislabs/riot/RedisOptions.java @@ -2,6 +2,7 @@ import io.lettuce.core.*; import io.lettuce.core.api.StatefulConnection; +import io.lettuce.core.api.sync.BaseRedisCommands; import io.lettuce.core.cluster.ClusterClientOptions; import io.lettuce.core.cluster.RedisClusterClient; import io.lettuce.core.event.DefaultEventPublisherOptions; @@ -87,6 +88,13 @@ public class RedisOptions { @Option(names = "--client", description = "Client name used to connect to Redis.", paramLabel = "") private String clientName; + public static BaseRedisCommands commands(AbstractRedisClient client) { + if (client instanceof RedisClusterClient) { + return ((RedisClusterClient) client).connect().sync(); + } + return ((RedisClient) client).connect().sync(); + } + public List uris() { List redisURIs = new ArrayList<>(); if (ObjectUtils.isEmpty(uris)) { @@ -163,7 +171,7 @@ public RedisClient redisClient() { return client; } - public > GenericObjectPoolConfig poolConfig() { + public GenericObjectPoolConfig poolConfig() { GenericObjectPoolConfig config = new GenericObjectPoolConfig<>(); config.setMaxTotal(poolMaxTotal); return config; @@ -183,4 +191,12 @@ public > B configureCommandTimeout(B builder) log.info("Setting {} command timeout to {}", ClassUtils.getShortName(builder.getClass()), commandTimeout); return builder.commandTimeout(commandTimeout); } + + public static StatefulConnection connection(AbstractRedisClient client) { + if (client instanceof RedisClusterClient) { + return ((RedisClusterClient) client).connect(); + } + return ((RedisClient) client).connect(); + } + } diff --git a/core/src/main/java/com/redislabs/riot/RiotApp.java b/core/src/main/java/com/redislabs/riot/RiotApp.java index e2bddd24a..c47eb9ccd 100644 --- a/core/src/main/java/com/redislabs/riot/RiotApp.java +++ b/core/src/main/java/com/redislabs/riot/RiotApp.java @@ -6,6 +6,8 @@ import io.netty.util.internal.logging.JdkLoggerFactory; import lombok.Getter; import org.springframework.core.NestedExceptionUtils; +import org.springframework.expression.Expression; +import org.springframework.expression.spel.standard.SpelExpressionParser; import picocli.AutoComplete; import picocli.CommandLine; import picocli.CommandLine.*; @@ -197,8 +199,19 @@ public RedisURI convert(String value) { } + static class ExpressionConverter implements CommandLine.ITypeConverter { + + private final SpelExpressionParser parser = new SpelExpressionParser(); + + @Override + public Expression convert(String value) throws Exception { + return parser.parseExpression(value); + } + } + protected void registerConverters(CommandLine commandLine) { - commandLine.registerConverter(io.lettuce.core.RedisURI.class, new RedisURIConverter()); + commandLine.registerConverter(RedisURI.class, new RedisURIConverter()); + commandLine.registerConverter(Expression.class, new ExpressionConverter()); } /** diff --git a/core/src/main/java/com/redislabs/riot/RiotCommand.java b/core/src/main/java/com/redislabs/riot/RiotCommand.java index ad9ab010b..13c0eae94 100644 --- a/core/src/main/java/com/redislabs/riot/RiotCommand.java +++ b/core/src/main/java/com/redislabs/riot/RiotCommand.java @@ -59,7 +59,7 @@ public Integer call() throws Exception { public void afterPropertiesSet() throws Exception { this.client = app.getRedisOptions().client(); this.pool = pool(app.getRedisOptions(), client); - this.connection = connection(client); + this.connection = RedisOptions.connection(client); } public void shutdown() { @@ -82,14 +82,6 @@ protected BaseRedisCommands sync() { return ((StatefulRedisConnection) connection).sync(); } - - protected StatefulConnection connection(AbstractRedisClient client) { - if (client instanceof RedisClusterClient) { - return ((RedisClusterClient) client).connect(); - } - return ((RedisClient) client).connect(); - } - protected GenericObjectPool> pool(RedisOptions redisOptions, AbstractRedisClient client) { if (client instanceof RedisClusterClient) { return ConnectionPoolSupport.createGenericObjectPool(((RedisClusterClient) client)::connect, redisOptions.poolConfig()); diff --git a/core/src/main/java/com/redislabs/riot/convert/MapFilteringConverter.java b/core/src/main/java/com/redislabs/riot/convert/MapFilteringConverter.java index 9e53f4fb3..29e1ca498 100644 --- a/core/src/main/java/com/redislabs/riot/convert/MapFilteringConverter.java +++ b/core/src/main/java/com/redislabs/riot/convert/MapFilteringConverter.java @@ -2,6 +2,7 @@ import org.springframework.core.convert.converter.Converter; import org.springframework.util.Assert; +import org.springframework.util.ObjectUtils; import java.util.*; @@ -17,7 +18,7 @@ public MapFilteringConverter(Set includes, Set excludes) { @Override public Map convert(Map source) { - Map filtered = includes.isEmpty() ? source : new LinkedHashMap<>(); + Map filtered = ObjectUtils.isEmpty(includes) ? source : new LinkedHashMap<>(); includes.forEach(f -> filtered.put(f, source.get(f))); excludes.forEach(filtered::remove); return filtered; diff --git a/core/src/main/java/com/redislabs/riot/processor/SpelProcessor.java b/core/src/main/java/com/redislabs/riot/processor/SpelProcessor.java index b569293d6..92bcc148a 100644 --- a/core/src/main/java/com/redislabs/riot/processor/SpelProcessor.java +++ b/core/src/main/java/com/redislabs/riot/processor/SpelProcessor.java @@ -1,54 +1,30 @@ package com.redislabs.riot.processor; -import java.lang.reflect.Method; -import java.text.DateFormat; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; - +import lombok.extern.slf4j.Slf4j; import org.springframework.batch.item.ItemProcessor; +import org.springframework.expression.EvaluationContext; import org.springframework.expression.Expression; import org.springframework.expression.ExpressionInvocationTargetException; -import org.springframework.expression.spel.standard.SpelExpressionParser; -import org.springframework.expression.spel.support.StandardEvaluationContext; import org.springframework.util.Assert; -import io.lettuce.core.api.StatefulConnection; -import io.lettuce.core.api.StatefulRedisConnection; -import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; -import lombok.extern.slf4j.Slf4j; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; @Slf4j public class SpelProcessor implements ItemProcessor, Map> { - private final StandardEvaluationContext context; - private final Map expressions = new LinkedHashMap<>(); - private final AtomicLong index = new AtomicLong(); - - public SpelProcessor(StatefulConnection connection, DateFormat dateFormat, Map variables, Map fields) { - Assert.notNull(connection, "A Redis connection is required."); - Assert.notNull(dateFormat, "A DateFormat instance is required."); - Assert.isTrue(fields != null && !fields.isEmpty(), "At least one field is required."); - // keep a reference to the connection object to avoid GC reclaiming it - this.context = new StandardEvaluationContext(); - context.setVariable("date", dateFormat); - context.setVariable("index", index); - context.setVariable("redis", connection instanceof StatefulRedisClusterConnection ? ((StatefulRedisClusterConnection) connection).sync() : ((StatefulRedisConnection) connection).sync()); - SpelExpressionParser parser = new SpelExpressionParser(); - if (variables != null) { - variables.forEach((k, v) -> context.setVariable(k, parser.parseExpression(v).getValue(context))); - } - Method geoMethod; - try { - geoMethod = getClass().getDeclaredMethod("geo", String.class, String.class); - context.registerFunction("geo", geoMethod); - } catch (NoSuchMethodException | SecurityException e) { - log.error("Could not register geo function", e); - } - context.setPropertyAccessors(Collections.singletonList(new MapAccessor())); - fields.forEach((k, v) -> expressions.put(k, parser.parseExpression(v))); + private final Map expressions; + private final EvaluationContext context; + private final AtomicLong index; + + public SpelProcessor(EvaluationContext context, Map expressions) { + Assert.notNull(context, "A SpEL evaluation context is required."); + Assert.notEmpty(expressions, "At least one field is required."); + this.index = new AtomicLong(); + this.context = context; + this.context.setVariable("index", index); + this.expressions = expressions; } @Override @@ -56,9 +32,12 @@ public Map process(Map item) { Map map = new HashMap<>(item); synchronized (context) { for (String field : expressions.keySet()) { + Expression expression = expressions.get(field); try { - Object value = expressions.get(field).getValue(context, map); - if (value != null) { + Object value = expression.getValue(context, map); + if (value == null) { + map.remove(field); + } else { map.put(field, value); } } catch (ExpressionInvocationTargetException e) { @@ -71,11 +50,4 @@ public Map process(Map item) { return map; } - public static String geo(String longitude, String latitude) { - if (longitude == null || latitude == null) { - return null; - } - return longitude + "," + latitude; - } - }