diff --git a/connectors/riot-file/src/main/java/com/redis/riot/file/FileUtils.java b/connectors/riot-file/src/main/java/com/redis/riot/file/FileUtils.java index 8e85a54dc..9823517bf 100644 --- a/connectors/riot-file/src/main/java/com/redis/riot/file/FileUtils.java +++ b/connectors/riot-file/src/main/java/com/redis/riot/file/FileUtils.java @@ -137,11 +137,11 @@ public static boolean isGoogleStorageResource(String file) { } @SuppressWarnings({ "rawtypes", "unchecked" }) - public static JsonItemReader jsonReader(Resource resource, Class clazz) { + public static JsonItemReader jsonReader(Resource resource, Class type) { JsonItemReaderBuilder jsonReaderBuilder = new JsonItemReaderBuilder<>(); jsonReaderBuilder.name(resource.getFilename() + "-json-file-reader"); jsonReaderBuilder.resource(resource); - JacksonJsonObjectReader jsonObjectReader = new JacksonJsonObjectReader(clazz); + JacksonJsonObjectReader jsonObjectReader = new JacksonJsonObjectReader(type); jsonObjectReader.setMapper(objectMapper()); jsonReaderBuilder.jsonObjectReader(jsonObjectReader); return jsonReaderBuilder.build(); @@ -154,11 +154,11 @@ public static ObjectMapper objectMapper() { } @SuppressWarnings({ "unchecked", "rawtypes" }) - public static XmlItemReader xmlReader(Resource resource, Class clazz) { + public static XmlItemReader xmlReader(Resource resource, Class type) { XmlItemReaderBuilder xmlReaderBuilder = new XmlItemReaderBuilder<>(); xmlReaderBuilder.name(resource.getFilename() + "-xml-file-reader"); xmlReaderBuilder.resource(resource); - XmlObjectReader xmlObjectReader = new XmlObjectReader(clazz); + XmlObjectReader xmlObjectReader = new XmlObjectReader(type); xmlObjectReader.setMapper(xmlMapper()); xmlReaderBuilder.xmlObjectReader(xmlObjectReader); return xmlReaderBuilder.build(); diff --git a/connectors/riot-file/src/test/java/com/redis/riot/file/FileTests.java b/connectors/riot-file/src/test/java/com/redis/riot/file/FileTests.java index e9a1b3d13..51a92dd22 100644 --- a/connectors/riot-file/src/test/java/com/redis/riot/file/FileTests.java +++ b/connectors/riot-file/src/test/java/com/redis/riot/file/FileTests.java @@ -16,7 +16,6 @@ import org.springframework.batch.item.UnexpectedInputException; import com.redis.riot.core.RedisOptions; -import com.redis.riot.core.RedisUriOptions; import com.redis.riot.core.operation.HsetBuilder; import com.redis.spring.batch.test.AbstractTestBase; @@ -53,9 +52,7 @@ void fileImportJSON() throws UnexpectedInputException, ParseException, NonTransi private RedisOptions redisClientOptions() { RedisOptions options = new RedisOptions(); options.setCluster(getRedisServer().isCluster()); - RedisUriOptions uriOptions = new RedisUriOptions(); - uriOptions.setUri(getRedisServer().getRedisURI()); - options.setUriOptions(uriOptions); + options.setUri(getRedisServer().getRedisURI()); return options; } diff --git a/core/riot-core/src/main/java/com/redis/riot/core/AbstractExport.java b/core/riot-core/src/main/java/com/redis/riot/core/AbstractExport.java index 2b838e19a..13ee834fb 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/AbstractExport.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/AbstractExport.java @@ -22,83 +22,79 @@ public abstract class AbstractExport extends AbstractJobRunnable { - private RedisReaderOptions readerOptions = new RedisReaderOptions(); - - private KeyFilterOptions keyFilterOptions = new KeyFilterOptions(); - - protected KeyValueProcessorOptions processorOptions = new KeyValueProcessorOptions(); - - public void setKeyFilterOptions(KeyFilterOptions keyFilterOptions) { - this.keyFilterOptions = keyFilterOptions; - } - - public void setReaderOptions(RedisReaderOptions readerOptions) { - this.readerOptions = readerOptions; - } - - public void setProcessorOptions(KeyValueProcessorOptions options) { - this.processorOptions = options; - } - - protected ItemProcessor, KeyValue> processor(RedisCodec codec, RiotContext context) { - ToStringKeyValueFunction code = new ToStringKeyValueFunction<>(codec); - StringKeyValueFunction decode = new StringKeyValueFunction<>(codec); - return new FunctionItemProcessor<>(code.andThen(function(context.getEvaluationContext())).andThen(decode)); - } - - private Function, KeyValue> function(EvaluationContext context) { - KeyValueOperator operator = new KeyValueOperator(); - if (processorOptions.getKeyExpression() != null) { - operator.setKeyFunction(ExpressionFunction.of(context, processorOptions.getKeyExpression())); - } - if (processorOptions.isDropTtl()) { - operator.setTtlFunction(t -> 0); - } else { - if (processorOptions.getTtlExpression() != null) { - operator.setTtlFunction(new LongExpressionFunction<>(context, processorOptions.getTtlExpression())); - } - } - if (processorOptions.isDropStreamMessageId() && isStruct()) { - operator.setValueFunction(new DropStreamMessageIdFunction()); - } - if (processorOptions.getTypeExpression() != null) { - Function, String> function = ExpressionFunction.of(context, processorOptions.getTypeExpression()); - operator.setTypeFunction(function.andThen(DataType::of)); - } - return operator; - } - - protected abstract boolean isStruct(); - - protected void configureReader(RedisItemReader reader, RedisContext context) { - reader.setChunkSize(readerOptions.getChunkSize()); - reader.setDatabase(context.getUri().getDatabase()); - reader.setKeyProcessor(keyFilteringProcessor(reader.getCodec())); - reader.setKeyPattern(readerOptions.getKeyPattern()); - reader.setKeyType(readerOptions.getKeyType()); - reader.setFlushInterval(readerOptions.getFlushInterval()); - reader.setIdleTimeout(readerOptions.getIdleTimeout()); - if (reader instanceof KeyValueItemReader) { - KeyValueItemReader keyValueReader = (KeyValueItemReader) reader; - keyValueReader.setMemoryUsageLimit(readerOptions.getMemoryUsageLimit()); - keyValueReader.setMemoryUsageSamples(readerOptions.getMemoryUsageSamples()); - keyValueReader.setPoolSize(readerOptions.getPoolSize()); - } - reader.setNotificationQueueCapacity(readerOptions.getNotificationQueueCapacity()); - reader.setOrderingStrategy(readerOptions.getOrderingStrategy()); - reader.setPollTimeout(readerOptions.getPollTimeout()); - reader.setQueueCapacity(readerOptions.getQueueCapacity()); - reader.setReadFrom(readerOptions.getReadFrom()); - reader.setScanCount(readerOptions.getScanCount()); - reader.setThreads(readerOptions.getThreads()); - } - - public ItemProcessor keyFilteringProcessor(RedisCodec codec) { - Predicate predicate = RiotUtils.keyFilterPredicate(codec, keyFilterOptions); - if (predicate == null) { - return null; - } - return new PredicateItemProcessor<>(predicate); - } + private RedisReaderOptions readerOptions = new RedisReaderOptions(); + + protected KeyValueProcessorOptions processorOptions = new KeyValueProcessorOptions(); + + public void setReaderOptions(RedisReaderOptions readerOptions) { + this.readerOptions = readerOptions; + } + + public void setProcessorOptions(KeyValueProcessorOptions options) { + this.processorOptions = options; + } + + protected ItemProcessor, KeyValue> processor(RedisCodec codec, RiotContext context) { + ToStringKeyValueFunction code = new ToStringKeyValueFunction<>(codec); + StringKeyValueFunction decode = new StringKeyValueFunction<>(codec); + Function, KeyValue> function = processorFunction(context.getEvaluationContext()); + return new FunctionItemProcessor<>(code.andThen(function).andThen(decode)); + } + + private Function, KeyValue> processorFunction(EvaluationContext context) { + KeyValueOperator operator = new KeyValueOperator(); + if (processorOptions.getKeyExpression() != null) { + operator.setKeyFunction(ExpressionFunction.of(context, processorOptions.getKeyExpression())); + } + if (processorOptions.isDropTtl()) { + operator.setTtlFunction(t -> 0); + } else { + if (processorOptions.getTtlExpression() != null) { + operator.setTtlFunction(new LongExpressionFunction<>(context, processorOptions.getTtlExpression())); + } + } + if (processorOptions.isDropStreamMessageId() && isStruct()) { + operator.setValueFunction(new DropStreamMessageIdFunction()); + } + if (processorOptions.getTypeExpression() != null) { + Function, String> function = ExpressionFunction.of(context, + processorOptions.getTypeExpression()); + operator.setTypeFunction(function.andThen(DataType::of)); + } + return operator; + } + + protected abstract boolean isStruct(); + + protected void configureReader(RedisItemReader reader, RedisContext context) { + reader.setChunkSize(readerOptions.getChunkSize()); + reader.setDatabase(context.getUri().getDatabase()); + reader.setKeyProcessor(keyFilteringProcessor(reader.getCodec())); + reader.setKeyPattern(readerOptions.getKeyPattern()); + reader.setKeyType(readerOptions.getKeyType()); + reader.setFlushInterval(readerOptions.getFlushInterval()); + reader.setIdleTimeout(readerOptions.getIdleTimeout()); + if (reader instanceof KeyValueItemReader) { + KeyValueItemReader keyValueReader = (KeyValueItemReader) reader; + keyValueReader.setMemoryUsageLimit(readerOptions.getMemoryUsageLimit()); + keyValueReader.setMemoryUsageSamples(readerOptions.getMemoryUsageSamples()); + keyValueReader.setPoolSize(readerOptions.getPoolSize()); + } + reader.setNotificationQueueCapacity(readerOptions.getNotificationQueueCapacity()); + reader.setOrderingStrategy(readerOptions.getOrderingStrategy()); + reader.setPollTimeout(readerOptions.getPollTimeout()); + reader.setQueueCapacity(readerOptions.getQueueCapacity()); + reader.setReadFrom(readerOptions.getReadFrom()); + reader.setScanCount(readerOptions.getScanCount()); + reader.setThreads(readerOptions.getThreads()); + } + + public ItemProcessor keyFilteringProcessor(RedisCodec codec) { + Predicate predicate = RiotUtils.keyFilterPredicate(codec, readerOptions.getKeyFilterOptions()); + if (predicate == null) { + return null; + } + return new PredicateItemProcessor<>(predicate); + } } diff --git a/core/riot-core/src/main/java/com/redis/riot/core/AbstractJobRunnable.java b/core/riot-core/src/main/java/com/redis/riot/core/AbstractJobRunnable.java index 19d952bc3..d7141134d 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/AbstractJobRunnable.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/AbstractJobRunnable.java @@ -1,5 +1,6 @@ package com.redis.riot.core; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -21,6 +22,8 @@ import org.springframework.batch.core.repository.support.AbstractJobRepositoryFactoryBean; import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder; import org.springframework.batch.core.step.builder.SimpleStepBuilder; +import org.springframework.batch.core.step.skip.NeverSkipItemSkipPolicy; +import org.springframework.batch.core.step.skip.SkipPolicy; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemStreamReader; @@ -28,6 +31,7 @@ import org.springframework.batch.item.support.SynchronizedItemStreamReader; import org.springframework.beans.factory.InitializingBean; import org.springframework.core.task.SyncTaskExecutor; +import org.springframework.retry.policy.MaxAttemptsRetryPolicy; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.util.ClassUtils; @@ -42,6 +46,18 @@ public abstract class AbstractJobRunnable extends AbstractRiotRunnable { + public static final SkipPolicy DEFAULT_SKIP_POLICY = new NeverSkipItemSkipPolicy(); + + public static final int DEFAULT_SKIP_LIMIT = 0; + + public static final int DEFAULT_RETRY_LIMIT = MaxAttemptsRetryPolicy.DEFAULT_MAX_ATTEMPTS; + + public static final Duration DEFAULT_SLEEP = Duration.ZERO; + + public static final int DEFAULT_CHUNK_SIZE = 50; + + public static final int DEFAULT_THREADS = 1; + private static final String FAILED_JOB_MESSAGE = "Error executing job %s"; protected final Logger log = LoggerFactory.getLogger(getClass()); @@ -52,8 +68,6 @@ public abstract class AbstractJobRunnable extends AbstractRiotRunnable { private PlatformTransactionManager transactionManager; - private StepOptions stepOptions = new StepOptions(); - private JobBuilderFactory jobBuilderFactory; private StepBuilderFactory stepBuilderFactory; @@ -63,6 +77,66 @@ public abstract class AbstractJobRunnable extends AbstractRiotRunnable { private Consumer> stepConfigurer = s -> { }; + private int threads = DEFAULT_THREADS; + + private int chunkSize = DEFAULT_CHUNK_SIZE; + + private Duration sleep = DEFAULT_SLEEP; + + private boolean dryRun; + + private int skipLimit = DEFAULT_SKIP_LIMIT; + + private int retryLimit = DEFAULT_RETRY_LIMIT; + + public boolean isDryRun() { + return dryRun; + } + + public void setDryRun(boolean dryRun) { + this.dryRun = dryRun; + } + + public int getThreads() { + return threads; + } + + public void setThreads(int threads) { + this.threads = threads; + } + + public int getChunkSize() { + return chunkSize; + } + + public void setChunkSize(int chunkSize) { + this.chunkSize = chunkSize; + } + + public Duration getSleep() { + return sleep; + } + + public void setSleep(Duration sleep) { + this.sleep = sleep; + } + + public int getSkipLimit() { + return skipLimit; + } + + public void setSkipLimit(int skipLimit) { + this.skipLimit = skipLimit; + } + + public int getRetryLimit() { + return retryLimit; + } + + public void setRetryLimit(int retryLimit) { + this.retryLimit = retryLimit; + } + public String getName() { return name; } @@ -78,14 +152,6 @@ protected String name(String... suffixes) { return String.join("-", elements); } - public StepOptions getStepOptions() { - return stepOptions; - } - - public void setStepOptions(StepOptions stepOptions) { - this.stepOptions = stepOptions; - } - public void setJobRepository(JobRepository jobRepository) { this.jobRepository = jobRepository; } @@ -182,18 +248,18 @@ protected FaultTolerantStepBuilder step(String name, ItemReader } protected FaultTolerantStepBuilder step(RiotStep riotStep) { - SimpleStepBuilder step = stepBuilderFactory.get(riotStep.getName()).chunk(stepOptions.getChunkSize()); + SimpleStepBuilder step = stepBuilderFactory.get(riotStep.getName()).chunk(chunkSize); step.reader(reader(riotStep.getReader())); step.processor(processor(riotStep.getProcessor())); step.writer(writer(riotStep.getWriter())); - if (stepOptions.getThreads() > 1) { + if (threads > 1) { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); - taskExecutor.setMaxPoolSize(stepOptions.getThreads()); - taskExecutor.setCorePoolSize(stepOptions.getThreads()); - taskExecutor.setQueueCapacity(stepOptions.getThreads()); + taskExecutor.setMaxPoolSize(threads); + taskExecutor.setCorePoolSize(threads); + taskExecutor.setQueueCapacity(threads); taskExecutor.afterPropertiesSet(); step.taskExecutor(taskExecutor); - step.throttleLimit(stepOptions.getThreads()); + step.throttleLimit(threads); } riotStep.getConfigurer().accept(step); if (riotStep.getReader() instanceof RedisItemReader) { @@ -206,8 +272,8 @@ protected FaultTolerantStepBuilder step(RiotStep riotStep) { } } FaultTolerantStepBuilder ftStep = step.faultTolerant(); - ftStep.skipLimit(stepOptions.getSkipLimit()); - ftStep.retryLimit(stepOptions.getRetryLimit()); + ftStep.skipLimit(skipLimit); + ftStep.retryLimit(retryLimit); ftStep.retry(RedisCommandTimeoutException.class); ftStep.noRetry(RedisCommandExecutionException.class); return ftStep; @@ -233,7 +299,7 @@ private ItemReader reader(ItemReader reader) { if (reader instanceof RedisItemReader) { return reader; } - if (stepOptions.getThreads() > 1 && reader instanceof ItemStreamReader) { + if (threads > 1 && reader instanceof ItemStreamReader) { SynchronizedItemStreamReader synchronizedReader = new SynchronizedItemStreamReader<>(); synchronizedReader.setDelegate((ItemStreamReader) reader); return synchronizedReader; @@ -242,14 +308,14 @@ private ItemReader reader(ItemReader reader) { } private ItemWriter writer(ItemWriter writer) { - if (stepOptions.isDryRun()) { + if (dryRun) { return new NoopItemWriter<>(); } initializeBean(writer); - if (stepOptions.getSleep() == null || stepOptions.getSleep().isNegative() || stepOptions.getSleep().isZero()) { - return writer; + if (RiotUtils.isPositive(sleep)) { + return new ThrottledItemWriter<>(writer, sleep); } - return new ThrottledItemWriter<>(writer, stepOptions.getSleep()); + return writer; } } diff --git a/core/riot-core/src/main/java/com/redis/riot/core/AbstractMapImport.java b/core/riot-core/src/main/java/com/redis/riot/core/AbstractMapImport.java index 8762e1b71..7edbddba0 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/AbstractMapImport.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/AbstractMapImport.java @@ -1,14 +1,31 @@ package com.redis.riot.core; +import java.util.ArrayList; import java.util.Arrays; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Collectors; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemWriter; +import org.springframework.batch.item.function.FunctionItemProcessor; +import org.springframework.context.expression.MapAccessor; +import org.springframework.expression.AccessException; +import org.springframework.expression.EvaluationContext; +import org.springframework.expression.Expression; +import org.springframework.expression.TypedValue; +import org.springframework.expression.spel.support.StandardEvaluationContext; +import org.springframework.lang.Nullable; import org.springframework.util.Assert; +import org.springframework.util.CollectionUtils; +import com.redis.lettucemod.util.GeoLocation; +import com.redis.riot.core.function.ExpressionFunction; +import com.redis.riot.core.function.MapFunction; import com.redis.spring.batch.RedisItemWriter; import com.redis.spring.batch.writer.OperationItemWriter; import com.redis.spring.batch.writer.WriteOperation; @@ -17,18 +34,77 @@ public abstract class AbstractMapImport extends AbstractJobRunnable { - private ProcessorOptions processorOptions = new ProcessorOptions(); + private Map processorExpressions; + + private Expression filterExpression; + + public Map getProcessorExpressions() { + return processorExpressions; + } + + public void setProcessorExpressions(Map expressions) { + this.processorExpressions = expressions; + } + + public Expression getFilterExpression() { + return filterExpression; + } + + public void setFilterExpression(Expression filter) { + this.filterExpression = filter; + } + + public ItemProcessor, Map> processor(StandardEvaluationContext context) { + context.addPropertyAccessor(new QuietMapAccessor()); + try { + context.registerFunction("geo", GeoLocation.class.getDeclaredMethod("toString", String.class, String.class)); + } catch (NoSuchMethodException e) { + // ignore + } + List, Map>> processors = new ArrayList<>(); + if (!CollectionUtils.isEmpty(processorExpressions)) { + Map, Object>> functions = new LinkedHashMap<>(); + for (Entry field : processorExpressions.entrySet()) { + functions.put(field.getKey(), new ExpressionFunction<>(context, field.getValue(), Object.class)); + } + processors.add(new FunctionItemProcessor<>(new MapFunction(functions))); + } + if (filterExpression != null) { + Predicate> predicate = RiotUtils.predicate(context, filterExpression); + processors.add(new PredicateItemProcessor<>(predicate)); + } + return RiotUtils.processor(processors); + } + + /** + * {@link org.springframework.context.expression.MapAccessor} that always returns true for canRead and does not throw + * AccessExceptions + * + */ + public static class QuietMapAccessor extends MapAccessor { + + @Override + public boolean canRead(EvaluationContext context, @Nullable Object target, String name) { + return true; + } + + @Override + public TypedValue read(EvaluationContext context, @Nullable Object target, String name) { + try { + return super.read(context, target, name); + } catch (AccessException e) { + return new TypedValue(null); + } + } + + } private RedisWriterOptions writerOptions = new RedisWriterOptions(); private List>> operations; protected ItemProcessor, Map> processor(RiotContext context) { - return processorOptions.processor(context.getEvaluationContext()); - } - - public void setProcessorOptions(ProcessorOptions options) { - this.processorOptions = options; + return processor(context.getEvaluationContext()); } @SuppressWarnings("unchecked") diff --git a/core/riot-core/src/main/java/com/redis/riot/core/AbstractRiotRunnable.java b/core/riot-core/src/main/java/com/redis/riot/core/AbstractRiotRunnable.java index c0c095c38..ad6409424 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/AbstractRiotRunnable.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/AbstractRiotRunnable.java @@ -2,7 +2,10 @@ import java.text.SimpleDateFormat; import java.time.Duration; +import java.util.LinkedHashMap; +import java.util.Map; +import org.springframework.expression.Expression; import org.springframework.expression.spel.support.StandardEvaluationContext; import org.springframework.util.CollectionUtils; import org.springframework.util.ObjectUtils; @@ -34,10 +37,40 @@ public abstract class AbstractRiotRunnable implements Runnable { private RedisOptions redisOptions = new RedisOptions(); - private EvaluationContextOptions evaluationContextOptions = new EvaluationContextOptions(); + public static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZ"; - public void setEvaluationContextOptions(EvaluationContextOptions options) { - this.evaluationContextOptions = options; + private String dateFormat = DEFAULT_DATE_FORMAT; + + private Map expressions = new LinkedHashMap<>(); + + private Map variables = new LinkedHashMap<>(); + + public String getDateFormat() { + return dateFormat; + } + + public void setDateFormat(String format) { + this.dateFormat = format; + } + + public Map getExpressions() { + return expressions; + } + + public void setExpressions(Map expressions) { + this.expressions = expressions; + } + + public Map getVariables() { + return variables; + } + + public void setVariables(Map variables) { + this.variables = variables; + } + + public RedisOptions getRedisOptions() { + return redisOptions; } public void setRedisOptions(RedisOptions options) { @@ -58,27 +91,27 @@ protected RiotContext createExecutionContext() { } protected RedisContext redisContext(RedisOptions options) { - RedisURI redisURI = redisURI(options.getUriOptions()); + RedisURI redisURI = redisURI(options); AbstractRedisClient client = client(redisURI, options); return new RedisContext(redisURI, client); } private StandardEvaluationContext evaluationContext(RedisContext redisContext) { StandardEvaluationContext context = new StandardEvaluationContext(); - context.setVariable(DATE_VARIABLE_NAME, new SimpleDateFormat(evaluationContextOptions.getDateFormat())); + context.setVariable(DATE_VARIABLE_NAME, new SimpleDateFormat(dateFormat)); context.setVariable(REDIS_VARIABLE_NAME, redisContext.getConnection().sync()); - if (!CollectionUtils.isEmpty(evaluationContextOptions.getVariables())) { - evaluationContextOptions.getVariables().forEach(context::setVariable); + if (!CollectionUtils.isEmpty(variables)) { + variables.forEach(context::setVariable); } - if (!CollectionUtils.isEmpty(evaluationContextOptions.getExpressions())) { - evaluationContextOptions.getExpressions().forEach((k, v) -> context.setVariable(k, v.getValue(context))); + if (!CollectionUtils.isEmpty(expressions)) { + expressions.forEach((k, v) -> context.setVariable(k, v.getValue(context))); } return context; } protected abstract void execute(RiotContext executionContext); - public RedisURI redisURI(RedisUriOptions options) { + public RedisURI redisURI(RedisOptions options) { RedisURI.Builder builder = redisURIBuilder(options); if (options.getDatabase() > 0) { builder.withDatabase(options.getDatabase()); @@ -103,7 +136,7 @@ public RedisURI redisURI(RedisUriOptions options) { return builder.build(); } - private RedisURI.Builder redisURIBuilder(RedisUriOptions options) { + private RedisURI.Builder redisURIBuilder(RedisOptions options) { if (StringUtils.hasLength(options.getUri())) { return RedisURI.builder(RedisURI.create(options.getUri())); } @@ -117,22 +150,22 @@ private AbstractRedisClient client(RedisURI redisURI, RedisOptions options) { ClientResources resources = clientResources(options); if (options.isCluster()) { RedisModulesClusterClient client = RedisModulesClusterClient.create(resources, redisURI); - client.setOptions(clientOptions(ClusterClientOptions.builder(), options.getClientOptions()).build()); + client.setOptions(clientOptions(ClusterClientOptions.builder(), options).build()); return client; } RedisModulesClient client = RedisModulesClient.create(resources, redisURI); - client.setOptions(clientOptions(ClientOptions.builder(), options.getClientOptions()).build()); + client.setOptions(clientOptions(ClientOptions.builder(), options).build()); return client; } - private B clientOptions(B builder, RedisClientOptions options) { + private B clientOptions(B builder, RedisOptions options) { builder.autoReconnect(options.isAutoReconnect()); - builder.sslOptions(sslOptions(options.getSslOptions())); + builder.sslOptions(sslOptions(options)); builder.protocolVersion(options.getProtocolVersion()); return builder; } - public SslOptions sslOptions(RedisSslOptions options) { + public SslOptions sslOptions(RedisOptions options) { Builder ssl = SslOptions.builder(); if (options.getKey() != null) { ssl.keyManager(options.getKeyCert(), options.getKey(), options.getKeyPassword()); diff --git a/core/riot-core/src/main/java/com/redis/riot/core/CompareMode.java b/core/riot-core/src/main/java/com/redis/riot/core/CompareMode.java new file mode 100644 index 000000000..645efaacf --- /dev/null +++ b/core/riot-core/src/main/java/com/redis/riot/core/CompareMode.java @@ -0,0 +1,7 @@ +package com.redis.riot.core; + +public enum CompareMode { + + FULL, QUICK, NONE + +} diff --git a/core/riot-core/src/main/java/com/redis/riot/core/EvaluationContextOptions.java b/core/riot-core/src/main/java/com/redis/riot/core/EvaluationContextOptions.java deleted file mode 100644 index c4408a9c5..000000000 --- a/core/riot-core/src/main/java/com/redis/riot/core/EvaluationContextOptions.java +++ /dev/null @@ -1,42 +0,0 @@ -package com.redis.riot.core; - -import java.util.LinkedHashMap; -import java.util.Map; - -import org.springframework.expression.Expression; - -public class EvaluationContextOptions { - - public static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZ"; - - private String dateFormat = DEFAULT_DATE_FORMAT; - - private Map expressions = new LinkedHashMap<>(); - - private Map variables = new LinkedHashMap<>(); - - public String getDateFormat() { - return dateFormat; - } - - public void setDateFormat(String format) { - this.dateFormat = format; - } - - public Map getExpressions() { - return expressions; - } - - public void setExpressions(Map expressions) { - this.expressions = expressions; - } - - public Map getVariables() { - return variables; - } - - public void setVariables(Map variables) { - this.variables = variables; - } - -} diff --git a/core/riot-core/src/main/java/com/redis/riot/core/KeyComparisonMode.java b/core/riot-core/src/main/java/com/redis/riot/core/KeyComparisonMode.java deleted file mode 100644 index a90955d95..000000000 --- a/core/riot-core/src/main/java/com/redis/riot/core/KeyComparisonMode.java +++ /dev/null @@ -1,7 +0,0 @@ -package com.redis.riot.core; - -public enum KeyComparisonMode { - - FULL, QUICK - -} diff --git a/core/riot-core/src/main/java/com/redis/riot/core/KeyComparisonOptions.java b/core/riot-core/src/main/java/com/redis/riot/core/KeyComparisonOptions.java deleted file mode 100644 index f5419fbed..000000000 --- a/core/riot-core/src/main/java/com/redis/riot/core/KeyComparisonOptions.java +++ /dev/null @@ -1,51 +0,0 @@ -package com.redis.riot.core; - -import java.time.Duration; - -import com.redis.spring.batch.reader.KeyComparisonValueReader; - -public class KeyComparisonOptions { - - public static final KeyComparisonMode DEFAULT_MODE = KeyComparisonMode.QUICK; - - private boolean skip; - - private boolean showDiffs; - - private KeyComparisonMode mode = DEFAULT_MODE; - - private Duration ttlTolerance = KeyComparisonValueReader.DEFAULT_TTL_TOLERANCE; - - public KeyComparisonMode getMode() { - return mode; - } - - public void setMode(KeyComparisonMode mode) { - this.mode = mode; - } - - public boolean isSkip() { - return skip; - } - - public void setSkip(boolean skip) { - this.skip = skip; - } - - public boolean isShowDiffs() { - return showDiffs; - } - - public void setShowDiffs(boolean showDiff) { - this.showDiffs = showDiff; - } - - public Duration getTtlTolerance() { - return ttlTolerance; - } - - public void setTtlTolerance(Duration ttlTolerance) { - this.ttlTolerance = ttlTolerance; - } - -} diff --git a/core/riot-core/src/main/java/com/redis/riot/core/ProcessorOptions.java b/core/riot-core/src/main/java/com/redis/riot/core/ProcessorOptions.java deleted file mode 100644 index 450821ada..000000000 --- a/core/riot-core/src/main/java/com/redis/riot/core/ProcessorOptions.java +++ /dev/null @@ -1,93 +0,0 @@ -package com.redis.riot.core; - -import java.util.ArrayList; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.function.Function; -import java.util.function.Predicate; - -import org.springframework.batch.item.ItemProcessor; -import org.springframework.batch.item.function.FunctionItemProcessor; -import org.springframework.context.expression.MapAccessor; -import org.springframework.expression.AccessException; -import org.springframework.expression.EvaluationContext; -import org.springframework.expression.Expression; -import org.springframework.expression.TypedValue; -import org.springframework.expression.spel.support.StandardEvaluationContext; -import org.springframework.lang.Nullable; -import org.springframework.util.CollectionUtils; - -import com.redis.lettucemod.util.GeoLocation; -import com.redis.riot.core.function.ExpressionFunction; -import com.redis.riot.core.function.MapFunction; - -public class ProcessorOptions { - - private Map expressions; - - private Expression filter; - - public Map getExpressions() { - return expressions; - } - - public void setExpressions(Map expressions) { - this.expressions = expressions; - } - - public Expression getFilter() { - return filter; - } - - public void setFilter(Expression filter) { - this.filter = filter; - } - - public ItemProcessor, Map> processor(StandardEvaluationContext context) { - context.addPropertyAccessor(new QuietMapAccessor()); - try { - context.registerFunction("geo", GeoLocation.class.getDeclaredMethod("toString", String.class, String.class)); - } catch (NoSuchMethodException e) { - // ignore - } - List, Map>> processors = new ArrayList<>(); - if (!CollectionUtils.isEmpty(expressions)) { - Map, Object>> functions = new LinkedHashMap<>(); - for (Entry field : expressions.entrySet()) { - functions.put(field.getKey(), new ExpressionFunction<>(context, field.getValue(), Object.class)); - } - processors.add(new FunctionItemProcessor<>(new MapFunction(functions))); - } - if (filter != null) { - Predicate> predicate = RiotUtils.predicate(context, filter); - processors.add(new PredicateItemProcessor<>(predicate)); - } - return RiotUtils.processor(processors); - } - - /** - * {@link org.springframework.context.expression.MapAccessor} that always returns true for canRead and does not throw - * AccessExceptions - * - */ - public static class QuietMapAccessor extends MapAccessor { - - @Override - public boolean canRead(EvaluationContext context, @Nullable Object target, String name) { - return true; - } - - @Override - public TypedValue read(EvaluationContext context, @Nullable Object target, String name) { - try { - return super.read(context, target, name); - } catch (AccessException e) { - return new TypedValue(null); - } - } - - } - -} diff --git a/core/riot-core/src/main/java/com/redis/riot/core/RedisClientOptions.java b/core/riot-core/src/main/java/com/redis/riot/core/RedisClientOptions.java deleted file mode 100644 index 9b8a13e97..000000000 --- a/core/riot-core/src/main/java/com/redis/riot/core/RedisClientOptions.java +++ /dev/null @@ -1,38 +0,0 @@ -package com.redis.riot.core; - -import io.lettuce.core.ClientOptions; -import io.lettuce.core.protocol.ProtocolVersion; - -public class RedisClientOptions { - - private RedisSslOptions sslOptions = new RedisSslOptions(); - - private boolean autoReconnect = ClientOptions.DEFAULT_AUTO_RECONNECT; - - private ProtocolVersion protocolVersion; - - public RedisSslOptions getSslOptions() { - return sslOptions; - } - - public void setSslOptions(RedisSslOptions sslOptions) { - this.sslOptions = sslOptions; - } - - public boolean isAutoReconnect() { - return autoReconnect; - } - - public void setAutoReconnect(boolean autoReconnect) { - this.autoReconnect = autoReconnect; - } - - public ProtocolVersion getProtocolVersion() { - return protocolVersion; - } - - public void setProtocolVersion(ProtocolVersion protocolVersion) { - this.protocolVersion = protocolVersion; - } - -} diff --git a/core/riot-core/src/main/java/com/redis/riot/core/RedisOptions.java b/core/riot-core/src/main/java/com/redis/riot/core/RedisOptions.java index 59aa7e7c9..0bb65a052 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/RedisOptions.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/RedisOptions.java @@ -1,31 +1,233 @@ package com.redis.riot.core; +import java.io.File; import java.time.Duration; +import io.lettuce.core.ClientOptions; +import io.lettuce.core.RedisURI; +import io.lettuce.core.SslVerifyMode; +import io.lettuce.core.protocol.ProtocolVersion; + public class RedisOptions { - private RedisUriOptions uriOptions = new RedisUriOptions(); + public static final String DEFAULT_HOST = "127.0.0.1"; + + public static final int DEFAULT_PORT = 6379; + + public static final SslVerifyMode DEFAULT_VERIFY_PEER = SslVerifyMode.FULL; + + private String uri; + + private String host = DEFAULT_HOST; + + private int port = DEFAULT_PORT; + + private String socket; + + private String username; + + private char[] password; + + private Duration timeout = RedisURI.DEFAULT_TIMEOUT_DURATION; + + private int database; + + private boolean tls; + + private SslVerifyMode verifyPeer = DEFAULT_VERIFY_PEER; - private RedisClientOptions clientOptions = new RedisClientOptions(); + private String clientName; private boolean cluster; private Duration metricsStep; - public RedisUriOptions getUriOptions() { - return uriOptions; + private boolean autoReconnect = ClientOptions.DEFAULT_AUTO_RECONNECT; + + private ProtocolVersion protocolVersion; + + private File keystore; + + private char[] keystorePassword; + + private File truststore; + + private char[] truststorePassword; + + private File keyCert; + + private File key; + + private char[] keyPassword; + + private File trustedCerts; + + public String getClientName() { + return clientName; + } + + public void setClientName(String clientName) { + this.clientName = clientName; + } + + public String getUri() { + return uri; + } + + public void setUri(String uri) { + this.uri = uri; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + public String getSocket() { + return socket; + } + + public void setSocket(String socket) { + this.socket = socket; + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public char[] getPassword() { + return password; + } + + public void setPassword(char[] password) { + this.password = password; + } + + public Duration getTimeout() { + return timeout; + } + + public void setTimeout(Duration timeout) { + this.timeout = timeout; + } + + public int getDatabase() { + return database; + } + + public void setDatabase(int database) { + this.database = database; + } + + public boolean isTls() { + return tls; + } + + public void setTls(boolean tls) { + this.tls = tls; + } + + public SslVerifyMode getVerifyPeer() { + return verifyPeer; + } + + public void setVerifyPeer(SslVerifyMode mode) { + this.verifyPeer = mode; + } + + public File getKeystore() { + return keystore; + } + + public void setKeystore(File keystore) { + this.keystore = keystore; + } + + public char[] getKeystorePassword() { + return keystorePassword; + } + + public void setKeystorePassword(char[] keystorePassword) { + this.keystorePassword = keystorePassword; + } + + public File getTruststore() { + return truststore; + } + + public void setTruststore(File truststore) { + this.truststore = truststore; + } + + public char[] getTruststorePassword() { + return truststorePassword; + } + + public void setTruststorePassword(char[] truststorePassword) { + this.truststorePassword = truststorePassword; + } + + public File getKeyCert() { + return keyCert; + } + + public void setKeyCert(File keyCert) { + this.keyCert = keyCert; + } + + public File getKey() { + return key; + } + + public void setKey(File key) { + this.key = key; + } + + public char[] getKeyPassword() { + return keyPassword; + } + + public void setKeyPassword(char[] keyPassword) { + this.keyPassword = keyPassword; + } + + public File getTrustedCerts() { + return trustedCerts; + } + + public void setTrustedCerts(File trustedCerts) { + this.trustedCerts = trustedCerts; + } + + public boolean isAutoReconnect() { + return autoReconnect; } - public void setUriOptions(RedisUriOptions uriOptions) { - this.uriOptions = uriOptions; + public void setAutoReconnect(boolean autoReconnect) { + this.autoReconnect = autoReconnect; } - public RedisClientOptions getClientOptions() { - return clientOptions; + public ProtocolVersion getProtocolVersion() { + return protocolVersion; } - public void setClientOptions(RedisClientOptions clientOptions) { - this.clientOptions = clientOptions; + public void setProtocolVersion(ProtocolVersion protocolVersion) { + this.protocolVersion = protocolVersion; } public boolean isCluster() { diff --git a/core/riot-core/src/main/java/com/redis/riot/core/RedisReaderOptions.java b/core/riot-core/src/main/java/com/redis/riot/core/RedisReaderOptions.java index cb29290b5..be1b141e4 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/RedisReaderOptions.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/RedisReaderOptions.java @@ -14,178 +14,188 @@ public class RedisReaderOptions { - public static final int DEFAULT_QUEUE_CAPACITY = RedisItemReader.DEFAULT_QUEUE_CAPACITY; + public static final int DEFAULT_QUEUE_CAPACITY = RedisItemReader.DEFAULT_QUEUE_CAPACITY; - public static final Duration DEFAULT_POLL_TIMEOUT = RedisItemReader.DEFAULT_POLL_TIMEOUT; + public static final Duration DEFAULT_POLL_TIMEOUT = RedisItemReader.DEFAULT_POLL_TIMEOUT; - public static final int DEFAULT_THREADS = RedisItemReader.DEFAULT_THREADS; + public static final int DEFAULT_THREADS = RedisItemReader.DEFAULT_THREADS; - public static final int DEFAULT_CHUNK_SIZE = RedisItemReader.DEFAULT_CHUNK_SIZE; + public static final int DEFAULT_CHUNK_SIZE = RedisItemReader.DEFAULT_CHUNK_SIZE; - public static final int DEFAULT_POOL_SIZE = KeyValueItemReader.DEFAULT_POOL_SIZE; + public static final int DEFAULT_POOL_SIZE = KeyValueItemReader.DEFAULT_POOL_SIZE; - public static final DataSize DEFAULT_MEMORY_USAGE_LIMIT = KeyValueItemReader.DEFAULT_MEMORY_USAGE_LIMIT; + public static final DataSize DEFAULT_MEMORY_USAGE_LIMIT = KeyValueItemReader.DEFAULT_MEMORY_USAGE_LIMIT; - public static final int DEFAULT_MEMORY_USAGE_SAMPLES = KeyValueItemReader.DEFAULT_MEMORY_USAGE_SAMPLES; + public static final int DEFAULT_MEMORY_USAGE_SAMPLES = KeyValueItemReader.DEFAULT_MEMORY_USAGE_SAMPLES; - public static final OrderingStrategy DEFAULT_ORDERING = RedisItemReader.DEFAULT_ORDERING; + public static final OrderingStrategy DEFAULT_ORDERING = RedisItemReader.DEFAULT_ORDERING; - public static final int DEFAULT_NOTIFICATION_QUEUE_CAPACITY = RedisItemReader.DEFAULT_NOTIFICATION_QUEUE_CAPACITY; + public static final int DEFAULT_NOTIFICATION_QUEUE_CAPACITY = RedisItemReader.DEFAULT_NOTIFICATION_QUEUE_CAPACITY; - public static final long DEFAULT_SCAN_COUNT = 1000; + public static final long DEFAULT_SCAN_COUNT = 1000; - public static final Duration DEFAULT_FLUSH_INTERVAL = FlushingChunkProvider.DEFAULT_FLUSH_INTERVAL; + public static final Duration DEFAULT_FLUSH_INTERVAL = FlushingChunkProvider.DEFAULT_FLUSH_INTERVAL; - public static final Duration DEFAULT_IDLE_TIMEOUT = FlushingChunkProvider.DEFAULT_IDLE_TIMEOUT; + public static final Duration DEFAULT_IDLE_TIMEOUT = FlushingChunkProvider.DEFAULT_IDLE_TIMEOUT; - private String keyPattern; + private String keyPattern; - private DataType keyType; + private DataType keyType; - private long scanCount = DEFAULT_SCAN_COUNT; + private long scanCount = DEFAULT_SCAN_COUNT; - private int queueCapacity = DEFAULT_QUEUE_CAPACITY; + private int queueCapacity = DEFAULT_QUEUE_CAPACITY; - private Duration pollTimeout = DEFAULT_POLL_TIMEOUT; + private Duration pollTimeout = DEFAULT_POLL_TIMEOUT; - private int threads = DEFAULT_THREADS; + private int threads = DEFAULT_THREADS; - private int chunkSize = DEFAULT_CHUNK_SIZE; + private int chunkSize = DEFAULT_CHUNK_SIZE; - private int poolSize = DEFAULT_POOL_SIZE; + private int poolSize = DEFAULT_POOL_SIZE; - private ReadFrom readFrom; + private ReadFrom readFrom; - private DataSize memoryUsageLimit = DEFAULT_MEMORY_USAGE_LIMIT; + private DataSize memoryUsageLimit = DEFAULT_MEMORY_USAGE_LIMIT; - private int memoryUsageSamples = DEFAULT_MEMORY_USAGE_SAMPLES; + private int memoryUsageSamples = DEFAULT_MEMORY_USAGE_SAMPLES; - private OrderingStrategy orderingStrategy = DEFAULT_ORDERING; + private OrderingStrategy orderingStrategy = DEFAULT_ORDERING; - private int notificationQueueCapacity = DEFAULT_NOTIFICATION_QUEUE_CAPACITY; + private int notificationQueueCapacity = DEFAULT_NOTIFICATION_QUEUE_CAPACITY; - private Duration flushInterval = DEFAULT_FLUSH_INTERVAL; + private Duration flushInterval = DEFAULT_FLUSH_INTERVAL; - private Duration idleTimeout = DEFAULT_IDLE_TIMEOUT; + private Duration idleTimeout = DEFAULT_IDLE_TIMEOUT; - public Duration getFlushInterval() { - return flushInterval; - } + private KeyFilterOptions keyFilterOptions = new KeyFilterOptions(); - public void setFlushInterval(Duration flushInterval) { - this.flushInterval = flushInterval; - } + public KeyFilterOptions getKeyFilterOptions() { + return keyFilterOptions; + } - public Duration getIdleTimeout() { - return idleTimeout; - } + public void setKeyFilterOptions(KeyFilterOptions keyFilterOptions) { + this.keyFilterOptions = keyFilterOptions; + } - public void setIdleTimeout(Duration idleTimeout) { - this.idleTimeout = idleTimeout; - } + public Duration getFlushInterval() { + return flushInterval; + } - public OrderingStrategy getOrderingStrategy() { - return orderingStrategy; - } + public void setFlushInterval(Duration flushInterval) { + this.flushInterval = flushInterval; + } - public void setOrderingStrategy(OrderingStrategy orderingStrategy) { - this.orderingStrategy = orderingStrategy; - } + public Duration getIdleTimeout() { + return idleTimeout; + } - public int getNotificationQueueCapacity() { - return notificationQueueCapacity; - } + public void setIdleTimeout(Duration idleTimeout) { + this.idleTimeout = idleTimeout; + } - public void setNotificationQueueCapacity(int notificationQueueCapacity) { - this.notificationQueueCapacity = notificationQueueCapacity; - } + public OrderingStrategy getOrderingStrategy() { + return orderingStrategy; + } - public String getKeyPattern() { - return keyPattern; - } + public void setOrderingStrategy(OrderingStrategy orderingStrategy) { + this.orderingStrategy = orderingStrategy; + } - public void setKeyPattern(String scanMatch) { - this.keyPattern = scanMatch; - } + public int getNotificationQueueCapacity() { + return notificationQueueCapacity; + } - public long getScanCount() { - return scanCount; - } + public void setNotificationQueueCapacity(int notificationQueueCapacity) { + this.notificationQueueCapacity = notificationQueueCapacity; + } - public void setScanCount(long count) { - this.scanCount = count; - } + public String getKeyPattern() { + return keyPattern; + } - public DataType getKeyType() { - return keyType; - } + public void setKeyPattern(String scanMatch) { + this.keyPattern = scanMatch; + } - public void setKeyType(DataType type) { - this.keyType = type; - } + public long getScanCount() { + return scanCount; + } - public int getQueueCapacity() { - return queueCapacity; - } + public void setScanCount(long count) { + this.scanCount = count; + } - public void setQueueCapacity(int queueCapacity) { - this.queueCapacity = queueCapacity; - } + public DataType getKeyType() { + return keyType; + } - public Duration getPollTimeout() { - return pollTimeout; - } + public void setKeyType(DataType type) { + this.keyType = type; + } - public void setPollTimeout(Duration pollTimeout) { - this.pollTimeout = pollTimeout; - } + public int getQueueCapacity() { + return queueCapacity; + } - public int getThreads() { - return threads; - } + public void setQueueCapacity(int queueCapacity) { + this.queueCapacity = queueCapacity; + } - public void setThreads(int threads) { - this.threads = threads; - } + public Duration getPollTimeout() { + return pollTimeout; + } - public int getChunkSize() { - return chunkSize; - } + public void setPollTimeout(Duration pollTimeout) { + this.pollTimeout = pollTimeout; + } - public void setChunkSize(int chunkSize) { - this.chunkSize = chunkSize; - } + public int getThreads() { + return threads; + } - public int getPoolSize() { - return poolSize; - } + public void setThreads(int threads) { + this.threads = threads; + } - public void setPoolSize(int poolSize) { - this.poolSize = poolSize; - } + public int getChunkSize() { + return chunkSize; + } - public ReadFrom getReadFrom() { - return readFrom; - } + public void setChunkSize(int chunkSize) { + this.chunkSize = chunkSize; + } - public void setReadFrom(ReadFrom readFrom) { - this.readFrom = readFrom; - } + public int getPoolSize() { + return poolSize; + } - public DataSize getMemoryUsageLimit() { - return memoryUsageLimit; - } + public void setPoolSize(int poolSize) { + this.poolSize = poolSize; + } - public void setMemoryUsageLimit(DataSize memoryUsageLimit) { - this.memoryUsageLimit = memoryUsageLimit; - } + public ReadFrom getReadFrom() { + return readFrom; + } - public int getMemoryUsageSamples() { - return memoryUsageSamples; - } + public void setReadFrom(ReadFrom readFrom) { + this.readFrom = readFrom; + } - public void setMemoryUsageSamples(int memoryUsageSamples) { - this.memoryUsageSamples = memoryUsageSamples; - } + public DataSize getMemoryUsageLimit() { + return memoryUsageLimit; + } + + public void setMemoryUsageLimit(DataSize memoryUsageLimit) { + this.memoryUsageLimit = memoryUsageLimit; + } + + public int getMemoryUsageSamples() { + return memoryUsageSamples; + } + + public void setMemoryUsageSamples(int memoryUsageSamples) { + this.memoryUsageSamples = memoryUsageSamples; + } } diff --git a/core/riot-core/src/main/java/com/redis/riot/core/RedisSslOptions.java b/core/riot-core/src/main/java/com/redis/riot/core/RedisSslOptions.java deleted file mode 100644 index 4c2d03036..000000000 --- a/core/riot-core/src/main/java/com/redis/riot/core/RedisSslOptions.java +++ /dev/null @@ -1,87 +0,0 @@ -package com.redis.riot.core; - -import java.io.File; - -public class RedisSslOptions { - - private File keystore; - - private char[] keystorePassword; - - private File truststore; - - private char[] truststorePassword; - - private File keyCert; - - private File key; - - private char[] keyPassword; - - private File trustedCerts; - - public File getKeystore() { - return keystore; - } - - public void setKeystore(File keystore) { - this.keystore = keystore; - } - - public char[] getKeystorePassword() { - return keystorePassword; - } - - public void setKeystorePassword(char[] keystorePassword) { - this.keystorePassword = keystorePassword; - } - - public File getTruststore() { - return truststore; - } - - public void setTruststore(File truststore) { - this.truststore = truststore; - } - - public char[] getTruststorePassword() { - return truststorePassword; - } - - public void setTruststorePassword(char[] truststorePassword) { - this.truststorePassword = truststorePassword; - } - - public File getKeyCert() { - return keyCert; - } - - public void setKeyCert(File keyCert) { - this.keyCert = keyCert; - } - - public File getKey() { - return key; - } - - public void setKey(File key) { - this.key = key; - } - - public char[] getKeyPassword() { - return keyPassword; - } - - public void setKeyPassword(char[] keyPassword) { - this.keyPassword = keyPassword; - } - - public File getTrustedCerts() { - return trustedCerts; - } - - public void setTrustedCerts(File trustedCerts) { - this.trustedCerts = trustedCerts; - } - -} diff --git a/core/riot-core/src/main/java/com/redis/riot/core/RedisUriOptions.java b/core/riot-core/src/main/java/com/redis/riot/core/RedisUriOptions.java deleted file mode 100644 index 10187f5a5..000000000 --- a/core/riot-core/src/main/java/com/redis/riot/core/RedisUriOptions.java +++ /dev/null @@ -1,126 +0,0 @@ -package com.redis.riot.core; - -import java.time.Duration; - -import io.lettuce.core.RedisURI; -import io.lettuce.core.SslVerifyMode; - -public class RedisUriOptions { - - public static final String DEFAULT_HOST = "127.0.0.1"; - - public static final int DEFAULT_PORT = 6379; - - public static final SslVerifyMode DEFAULT_VERIFY_PEER = SslVerifyMode.FULL; - - private String uri; - - private String host = DEFAULT_HOST; - - private int port = DEFAULT_PORT; - - private String socket; - - private String username; - - private char[] password; - - private Duration timeout = RedisURI.DEFAULT_TIMEOUT_DURATION; - - private int database; - - private boolean tls; - - private SslVerifyMode verifyPeer = DEFAULT_VERIFY_PEER; - - private String clientName; - - public String getClientName() { - return clientName; - } - - public void setClientName(String clientName) { - this.clientName = clientName; - } - - public String getUri() { - return uri; - } - - public void setUri(String uri) { - this.uri = uri; - } - - public String getHost() { - return host; - } - - public void setHost(String host) { - this.host = host; - } - - public int getPort() { - return port; - } - - public void setPort(int port) { - this.port = port; - } - - public String getSocket() { - return socket; - } - - public void setSocket(String socket) { - this.socket = socket; - } - - public String getUsername() { - return username; - } - - public void setUsername(String username) { - this.username = username; - } - - public char[] getPassword() { - return password; - } - - public void setPassword(char[] password) { - this.password = password; - } - - public Duration getTimeout() { - return timeout; - } - - public void setTimeout(Duration timeout) { - this.timeout = timeout; - } - - public int getDatabase() { - return database; - } - - public void setDatabase(int database) { - this.database = database; - } - - public boolean isTls() { - return tls; - } - - public void setTls(boolean tls) { - this.tls = tls; - } - - public SslVerifyMode getVerifyPeer() { - return verifyPeer; - } - - public void setVerifyPeer(SslVerifyMode mode) { - this.verifyPeer = mode; - } - -} diff --git a/core/riot-core/src/main/java/com/redis/riot/core/Replication.java b/core/riot-core/src/main/java/com/redis/riot/core/Replication.java index e3fdfa3d9..ed67a626e 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/Replication.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/Replication.java @@ -1,5 +1,7 @@ package com.redis.riot.core; +import java.time.Duration; + import org.springframework.batch.core.Job; import org.springframework.batch.core.job.builder.FlowBuilder; import org.springframework.batch.core.job.builder.JobFlowBuilder; @@ -20,6 +22,7 @@ import com.redis.spring.batch.common.KeyComparisonItemReader; import com.redis.spring.batch.common.KeyValue; import com.redis.spring.batch.reader.DumpItemReader; +import com.redis.spring.batch.reader.KeyComparisonValueReader; import com.redis.spring.batch.reader.KeyTypeItemReader; import com.redis.spring.batch.reader.KeyValueItemReader; import com.redis.spring.batch.reader.StructItemReader; @@ -55,28 +58,74 @@ public class Replication extends AbstractExport { private ReplicationType type = DEFAULT_TYPE; + public static final CompareMode DEFAULT_COMPARE_MODE = CompareMode.QUICK; + + private boolean showDiffs; + + private CompareMode compareMode = DEFAULT_COMPARE_MODE; + + private Duration ttlTolerance = KeyComparisonValueReader.DEFAULT_TTL_TOLERANCE; + private RedisOptions targetRedisOptions = new RedisOptions(); private ReadFrom targetReadFrom; - private RedisWriterOptions targetWriterOptions = new RedisWriterOptions(); + private RedisWriterOptions writerOptions = new RedisWriterOptions(); + + public CompareMode getCompareMode() { + return compareMode; + } + + public void setCompareMode(CompareMode mode) { + this.compareMode = mode; + } + + public boolean isShowDiffs() { + return showDiffs; + } + + public void setShowDiffs(boolean showDiff) { + this.showDiffs = showDiff; + } + + public Duration getTtlTolerance() { + return ttlTolerance; + } + + public void setTtlTolerance(Duration ttlTolerance) { + this.ttlTolerance = ttlTolerance; + } - private KeyComparisonOptions comparisonOptions = new KeyComparisonOptions(); + public RedisOptions getTargetRedisOptions() { + return targetRedisOptions; + } + + public void setTargetRedisOptions(RedisOptions targetRedisOptions) { + this.targetRedisOptions = targetRedisOptions; + } - public void setTargetRedisOptions(RedisOptions options) { - this.targetRedisOptions = options; + public ReadFrom getTargetReadFrom() { + return targetReadFrom; } - public void setTargetReadFrom(ReadFrom readFrom) { - this.targetReadFrom = readFrom; + public void setTargetReadFrom(ReadFrom targetReadFrom) { + this.targetReadFrom = targetReadFrom; } - public void setTargetWriterOptions(RedisWriterOptions options) { - this.targetWriterOptions = options; + public RedisWriterOptions getWriterOptions() { + return writerOptions; } - public void setComparisonOptions(KeyComparisonOptions options) { - this.comparisonOptions = options; + public void setWriterOptions(RedisWriterOptions writerOptions) { + this.writerOptions = writerOptions; + } + + public ReplicationMode getMode() { + return mode; + } + + public ReplicationType getType() { + return type; } public void setMode(ReplicationMode mode) { @@ -139,7 +188,7 @@ private FlowBuilder flow(String name) { } private boolean shouldCompare() { - return !comparisonOptions.isSkip() && !getStepOptions().isDryRun(); + return compareMode != CompareMode.NONE && !isDryRun(); } private FaultTolerantStepBuilder, KeyValue> scanStep(ReplicationContext context) { @@ -197,7 +246,7 @@ private TaskletStep compareStep(ReplicationContext context) { reader.setName(name(STEP_COMPARE, "reader")); KeyComparisonStatusCountItemWriter writer = new KeyComparisonStatusCountItemWriter(); FaultTolerantStepBuilder step = step(name(STEP_COMPARE), reader, writer); - if (comparisonOptions.isShowDiffs()) { + if (showDiffs) { step.listener(new KeyComparisonDiffLogger()); } step.listener(new KeyComparisonSummaryLogger(writer)); @@ -209,30 +258,26 @@ private KeyComparisonItemReader comparisonReader(ReplicationContext context) { configureReader(sourceReader, context.getRedisContext()); KeyValueItemReader targetReader = comparisonKeyValueReader(context.getTargetRedisContext().getClient()); targetReader.setReadFrom(targetReadFrom); - targetReader.setPoolSize(targetWriterOptions.getPoolSize()); + targetReader.setPoolSize(writerOptions.getPoolSize()); KeyComparisonItemReader comparisonReader = new KeyComparisonItemReader(sourceReader, targetReader); configureReader(comparisonReader, context.getRedisContext()); comparisonReader.setProcessor(processor(StringCodec.UTF8, context)); - comparisonReader.setTtlTolerance(comparisonOptions.getTtlTolerance()); + comparisonReader.setTtlTolerance(ttlTolerance); comparisonReader.setCompareStreamMessageIds(!processorOptions.isDropStreamMessageId()); return comparisonReader; } private KeyValueItemReader comparisonKeyValueReader(AbstractRedisClient client) { - if (isFullComparison()) { + if (compareMode == CompareMode.FULL) { return new StructItemReader<>(client, StringCodec.UTF8); } return new KeyTypeItemReader<>(client, StringCodec.UTF8); } - private boolean isFullComparison() { - return comparisonOptions.getMode() == KeyComparisonMode.FULL; - } - private RedisItemWriter> writer(ReplicationContext context) { AbstractRedisClient targetRedisClient = context.getTargetRedisContext().getClient(); KeyValueItemWriter writer = writer(targetRedisClient); - return writer(writer, targetWriterOptions); + return writer(writer, writerOptions); } private KeyValueItemWriter writer(AbstractRedisClient client) { diff --git a/core/riot-core/src/main/java/com/redis/riot/core/RiotUtils.java b/core/riot-core/src/main/java/com/redis/riot/core/RiotUtils.java index 80060c6e3..3b2a03ed3 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/RiotUtils.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/RiotUtils.java @@ -1,5 +1,6 @@ package com.redis.riot.core; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -131,4 +132,8 @@ public static ItemWriter writer(Collection> write return composite; } + public static boolean isPositive(Duration duration) { + return duration != null && !duration.isNegative() && !duration.isZero(); + } + } diff --git a/core/riot-core/src/main/java/com/redis/riot/core/StepOptions.java b/core/riot-core/src/main/java/com/redis/riot/core/StepOptions.java deleted file mode 100644 index 582e87375..000000000 --- a/core/riot-core/src/main/java/com/redis/riot/core/StepOptions.java +++ /dev/null @@ -1,83 +0,0 @@ -package com.redis.riot.core; - -import java.time.Duration; - -import org.springframework.batch.core.step.skip.NeverSkipItemSkipPolicy; -import org.springframework.batch.core.step.skip.SkipPolicy; -import org.springframework.retry.policy.MaxAttemptsRetryPolicy; - -public class StepOptions { - - public static final SkipPolicy DEFAULT_SKIP_POLICY = new NeverSkipItemSkipPolicy(); - - public static final int DEFAULT_SKIP_LIMIT = 0; - - public static final int DEFAULT_RETRY_LIMIT = MaxAttemptsRetryPolicy.DEFAULT_MAX_ATTEMPTS; - - public static final Duration DEFAULT_SLEEP = Duration.ZERO; - - public static final int DEFAULT_CHUNK_SIZE = 50; - - public static final int DEFAULT_THREADS = 1; - - private int threads = DEFAULT_THREADS; - - private int chunkSize = DEFAULT_CHUNK_SIZE; - - private Duration sleep = DEFAULT_SLEEP; - - private boolean dryRun; - - private int skipLimit = DEFAULT_SKIP_LIMIT; - - private int retryLimit = DEFAULT_RETRY_LIMIT; - - public boolean isDryRun() { - return dryRun; - } - - public void setDryRun(boolean dryRun) { - this.dryRun = dryRun; - } - - public int getThreads() { - return threads; - } - - public void setThreads(int threads) { - this.threads = threads; - } - - public int getChunkSize() { - return chunkSize; - } - - public void setChunkSize(int chunkSize) { - this.chunkSize = chunkSize; - } - - public Duration getSleep() { - return sleep; - } - - public void setSleep(Duration sleep) { - this.sleep = sleep; - } - - public int getSkipLimit() { - return skipLimit; - } - - public void setSkipLimit(int skipLimit) { - this.skipLimit = skipLimit; - } - - public int getRetryLimit() { - return retryLimit; - } - - public void setRetryLimit(int retryLimit) { - this.retryLimit = retryLimit; - } - -} diff --git a/core/riot-core/src/main/java/com/redis/riot/core/ThrottledItemWriter.java b/core/riot-core/src/main/java/com/redis/riot/core/ThrottledItemWriter.java index 09ff3f349..14ee4f34c 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/ThrottledItemWriter.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/ThrottledItemWriter.java @@ -17,13 +17,12 @@ public class ThrottledItemWriter extends AbstractItemStreamItemWriter { private final long sleep; - public ThrottledItemWriter(ItemWriter delegate, Duration sleepDuration) { + public ThrottledItemWriter(ItemWriter delegate, Duration sleep) { setName(ClassUtils.getShortName(getClass())); Assert.notNull(delegate, "Delegate must not be null"); - Assert.notNull(sleepDuration, "Sleep duration must not be null"); - Assert.isTrue(!sleepDuration.isNegative() && !sleepDuration.isZero(), "Sleep duration must be strictly positive"); + Assert.isTrue(RiotUtils.isPositive(sleep), "Sleep duration must be strictly positive"); this.delegate = delegate; - this.sleep = sleepDuration.toMillis(); + this.sleep = sleep.toMillis(); } @Override diff --git a/core/riot-core/src/test/java/com/redis/riot/core/test/FunctionTests.java b/core/riot-core/src/test/java/com/redis/riot/core/test/FunctionTests.java index 536946517..92b4da1da 100644 --- a/core/riot-core/src/test/java/com/redis/riot/core/test/FunctionTests.java +++ b/core/riot-core/src/test/java/com/redis/riot/core/test/FunctionTests.java @@ -1,17 +1,11 @@ package com.redis.riot.core.test; import java.util.HashMap; -import java.util.LinkedHashMap; import java.util.Map; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import org.springframework.batch.item.ItemProcessor; -import org.springframework.expression.Expression; -import org.springframework.expression.spel.support.StandardEvaluationContext; -import com.redis.riot.core.ProcessorOptions; -import com.redis.riot.core.RiotUtils; import com.redis.riot.core.function.StringToMapFunction; import com.redis.riot.core.function.StructToMapFunction; import com.redis.spring.batch.common.DataType; @@ -39,16 +33,4 @@ void keyValueToMap() { Assertions.assertEquals("value1", hashMap.get("field1")); } - @Test - void testMapProcessor() throws Exception { - ProcessorOptions options = new ProcessorOptions(); - Map expressions = new LinkedHashMap<>(); - expressions.put("field1", RiotUtils.parse("'test:1'")); - options.setExpressions(expressions); - ItemProcessor, Map> processor = options.processor(new StandardEvaluationContext()); - Map map = processor.process(new HashMap<>()); - Assertions.assertEquals("test:1", map.get("field1")); - // Assertions.assertEquals("1", map.get("id")); - } - } diff --git a/core/riot-core/src/test/java/com/redis/riot/core/test/ProcessorTests.java b/core/riot-core/src/test/java/com/redis/riot/core/test/ProcessorTests.java index b1badf1cc..7a00b44df 100644 --- a/core/riot-core/src/test/java/com/redis/riot/core/test/ProcessorTests.java +++ b/core/riot-core/src/test/java/com/redis/riot/core/test/ProcessorTests.java @@ -10,12 +10,14 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.springframework.batch.core.Job; import org.springframework.batch.item.ItemProcessor; import org.springframework.expression.Expression; import org.springframework.expression.spel.support.StandardEvaluationContext; +import com.redis.riot.core.AbstractMapImport; import com.redis.riot.core.KeyFilterOptions; -import com.redis.riot.core.ProcessorOptions; +import com.redis.riot.core.RiotContext; import com.redis.riot.core.RiotUtils; class ProcessorTests { @@ -30,17 +32,40 @@ void keyFilter() { Assertions.assertFalse(predicate.test("key")); } + @Test + void testMapProcessor() throws Exception { + DummyMapImport mapImport = new DummyMapImport(); + Map expressions = new LinkedHashMap<>(); + expressions.put("field1", RiotUtils.parse("'test:1'")); + mapImport.setProcessorExpressions(expressions); + StandardEvaluationContext evaluationContext = new StandardEvaluationContext(); + ItemProcessor, Map> processor = mapImport.processor(evaluationContext); + Map map = processor.process(new HashMap<>()); + Assertions.assertEquals("test:1", map.get("field1")); + // Assertions.assertEquals("1", map.get("id")); + } + + private static class DummyMapImport extends AbstractMapImport { + + @Override + protected Job job(RiotContext executionContext) { + return null; + } + + } + @Test void processor() throws Exception { - ProcessorOptions options = new ProcessorOptions(); + DummyMapImport mapImport = new DummyMapImport(); Map expressions = new LinkedHashMap<>(); expressions.put("field1", RiotUtils.parse("'value1'")); expressions.put("field2", RiotUtils.parse("field1")); expressions.put("field3", RiotUtils.parse("1")); expressions.put("field4", RiotUtils.parse("2")); expressions.put("field5", RiotUtils.parse("field3+field4")); - options.setExpressions(expressions); - ItemProcessor, Map> processor = options.processor(new StandardEvaluationContext()); + mapImport.setProcessorExpressions(expressions); + ItemProcessor, Map> processor = mapImport + .processor(new StandardEvaluationContext()); for (int index = 0; index < 10; index++) { Map result = processor.process(new HashMap<>()); assertEquals(5, result.size()); @@ -52,9 +77,10 @@ void processor() throws Exception { @Test void processorFilter() throws Exception { - ProcessorOptions options = new ProcessorOptions(); - options.setFilter(RiotUtils.parse("index<10")); - ItemProcessor, Map> processor = options.processor(new StandardEvaluationContext()); + DummyMapImport mapImport = new DummyMapImport(); + mapImport.setFilterExpression(RiotUtils.parse("index<10")); + ItemProcessor, Map> processor = mapImport + .processor(new StandardEvaluationContext()); for (int index = 0; index < 100; index++) { Map map = new HashMap<>(); map.put("index", index); diff --git a/core/riot-core/src/test/java/com/redis/riot/core/test/ReplicationTests.java b/core/riot-core/src/test/java/com/redis/riot/core/test/ReplicationTests.java index 1d7128f2e..f8998138f 100644 --- a/core/riot-core/src/test/java/com/redis/riot/core/test/ReplicationTests.java +++ b/core/riot-core/src/test/java/com/redis/riot/core/test/ReplicationTests.java @@ -13,7 +13,6 @@ import com.redis.riot.core.KeyValueProcessorOptions; import com.redis.riot.core.PredicateItemProcessor; import com.redis.riot.core.RedisOptions; -import com.redis.riot.core.RedisUriOptions; import com.redis.riot.core.Replication; import com.redis.riot.core.ReplicationType; import com.redis.riot.core.RiotUtils; @@ -59,9 +58,7 @@ protected void execute(Replication replication, TestInfo info) { private RedisOptions redisOptions(RedisServer redis) { RedisOptions options = new RedisOptions(); - RedisUriOptions uriOptions = new RedisUriOptions(); - uriOptions.setUri(redis.getRedisURI()); - options.setUriOptions(uriOptions); + options.setUri(redis.getRedisURI()); options.setCluster(redis.isCluster()); return options; } diff --git a/plugins/riot/src/main/java/com/redis/riot/cli/AbstractCommand.java b/plugins/riot/src/main/java/com/redis/riot/cli/AbstractCommand.java index 737f4e9ba..40e66d9b8 100644 --- a/plugins/riot/src/main/java/com/redis/riot/cli/AbstractCommand.java +++ b/plugins/riot/src/main/java/com/redis/riot/cli/AbstractCommand.java @@ -14,7 +14,7 @@ public abstract class AbstractCommand extends BaseCommand implements Runnable { @Override public void run() { AbstractRiotRunnable executable = executable(); - executable.setRedisOptions(parent.redisArgs.redisClientOptions()); + executable.setRedisOptions(parent.redisArgs.redisOptions()); executable.run(); } diff --git a/plugins/riot/src/main/java/com/redis/riot/cli/AbstractExportCommand.java b/plugins/riot/src/main/java/com/redis/riot/cli/AbstractExportCommand.java index 288f329a5..672817505 100644 --- a/plugins/riot/src/main/java/com/redis/riot/cli/AbstractExportCommand.java +++ b/plugins/riot/src/main/java/com/redis/riot/cli/AbstractExportCommand.java @@ -12,12 +12,9 @@ public abstract class AbstractExportCommand extends AbstractJobCommand { - @ArgGroup(exclusive = false, heading = "Redis reader options%n") + @ArgGroup(exclusive = false, heading = "Reader options%n") RedisReaderArgs readerArgs = new RedisReaderArgs(); - @ArgGroup(exclusive = false) - KeyFilterArgs keyFilterArgs = new KeyFilterArgs(); - @ArgGroup(exclusive = false, heading = "Processor options%n") KeyValueProcessorArgs processorArgs = new KeyValueProcessorArgs(); @@ -25,8 +22,6 @@ public abstract class AbstractExportCommand extends AbstractJobCommand { protected AbstractJobRunnable getJobExecutable() { AbstractExport export = getExport(); export.setReaderOptions(readerArgs.readerOptions()); - export.setKeyFilterOptions(keyFilterArgs.keyFilterOptions()); - export.setEvaluationContextOptions(evaluationContextOptions()); export.setProcessorOptions(processorArgs.processorOptions()); return export; } diff --git a/plugins/riot/src/main/java/com/redis/riot/cli/AbstractImportCommand.java b/plugins/riot/src/main/java/com/redis/riot/cli/AbstractImportCommand.java index f58b97a79..273825f23 100644 --- a/plugins/riot/src/main/java/com/redis/riot/cli/AbstractImportCommand.java +++ b/plugins/riot/src/main/java/com/redis/riot/cli/AbstractImportCommand.java @@ -5,6 +5,8 @@ import java.util.Map; import java.util.stream.Collectors; +import org.springframework.expression.Expression; + import com.redis.riot.cli.operation.DelCommand; import com.redis.riot.cli.operation.ExpireCommand; import com.redis.riot.cli.operation.GeoaddCommand; @@ -23,8 +25,8 @@ import com.redis.riot.core.RiotStep; import com.redis.spring.batch.writer.WriteOperation; -import picocli.CommandLine.ArgGroup; import picocli.CommandLine.Command; +import picocli.CommandLine.Option; @Command(subcommands = { ExpireCommand.class, DelCommand.class, GeoaddCommand.class, HsetCommand.class, LpushCommand.class, RpushCommand.class, SaddCommand.class, SetCommand.class, XaddCommand.class, ZaddCommand.class, SugaddCommand.class, @@ -32,8 +34,11 @@ TsAddCommand.class }, subcommandsRepeatable = true, synopsisSubcommandLabel = "[REDIS COMMAND...]", commandListHeading = "Redis commands:%n") public abstract class AbstractImportCommand extends AbstractJobCommand { - @ArgGroup(exclusive = false, heading = "Processor options%n") - ProcessorArgs processorArgs = new ProcessorArgs(); + @Option(arity = "1..*", names = "--proc", description = "SpEL expressions in the form field1=\"exp\" field2=\"exp\"...", paramLabel = "") + Map processorExpressions; + + @Option(names = "--filter", description = "Discard records using a SpEL expression.", paramLabel = "") + Expression filter; /** * Initialized manually during command parsing @@ -56,7 +61,8 @@ protected List>> operations() protected AbstractMapImport getJobExecutable() { AbstractMapImport executable = getMapImportExecutable(); executable.setOperations(operations()); - executable.setProcessorOptions(processorArgs.processorOptions()); + executable.setProcessorExpressions(processorExpressions); + executable.setFilterExpression(filter); return executable; } diff --git a/plugins/riot/src/main/java/com/redis/riot/cli/AbstractJobCommand.java b/plugins/riot/src/main/java/com/redis/riot/cli/AbstractJobCommand.java index 9317561fd..76c45486c 100644 --- a/plugins/riot/src/main/java/com/redis/riot/cli/AbstractJobCommand.java +++ b/plugins/riot/src/main/java/com/redis/riot/cli/AbstractJobCommand.java @@ -1,5 +1,7 @@ package com.redis.riot.cli; +import java.time.Duration; +import java.util.Map; import java.util.function.LongSupplier; import java.util.function.Supplier; @@ -7,27 +9,56 @@ import org.slf4j.LoggerFactory; import org.springframework.batch.core.ItemWriteListener; import org.springframework.batch.core.StepExecutionListener; +import org.springframework.expression.Expression; import org.springframework.util.ClassUtils; -import com.redis.riot.cli.ProgressArgs.ProgressStyle; import com.redis.riot.core.AbstractJobRunnable; -import com.redis.riot.core.EvaluationContextOptions; import com.redis.riot.core.RiotStep; import me.tongfei.progressbar.DelegatingProgressBarConsumer; import me.tongfei.progressbar.ProgressBarBuilder; -import picocli.CommandLine.ArgGroup; +import me.tongfei.progressbar.ProgressBarStyle; +import picocli.CommandLine.Option; abstract class AbstractJobCommand extends AbstractCommand { - @ArgGroup(exclusive = false, heading = "Execution options%n") - StepArgs stepArgs = new StepArgs(); + public enum ProgressStyle { + BLOCK, BAR, ASCII, LOG, NONE + } + + @Option(names = "--sleep", description = "Duration in ms to sleep after each batch write (default: ${DEFAULT-VALUE}).", paramLabel = "") + long sleep; + + @Option(names = "--threads", description = "Thread count (default: ${DEFAULT-VALUE}).", paramLabel = "") + int threads = AbstractJobRunnable.DEFAULT_THREADS; + + @Option(names = { "-b", + "--batch" }, description = "Number of items in each batch (default: ${DEFAULT-VALUE}).", paramLabel = "") + int chunkSize = AbstractJobRunnable.DEFAULT_CHUNK_SIZE; + + @Option(names = "--dry-run", description = "Enable dummy writes.") + boolean dryRun; + + @Option(names = "--ft", description = "Enable step fault-tolerance. Use in conjunction with retry and skip limit/policy.") + boolean faultTolerance; + + @Option(names = "--skip-limit", description = "LIMIT skip policy: max number of failed items before considering the transfer has failed (default: ${DEFAULT-VALUE}).", paramLabel = "") + int skipLimit = AbstractJobRunnable.DEFAULT_SKIP_LIMIT; + + @Option(names = "--retry-limit", description = "Maximum number of times to try failed items. 0 and 1 both translate to no retry. (default: ${DEFAULT-VALUE}).", paramLabel = "") + private int retryLimit = AbstractJobRunnable.DEFAULT_RETRY_LIMIT; - @ArgGroup(exclusive = false) - ProgressArgs progressArgs = new ProgressArgs(); + @Option(names = "--progress", description = "Progress style: ${COMPLETION-CANDIDATES} (default: ${DEFAULT-VALUE}).", paramLabel = "