diff --git a/connectors/riot-db/src/main/java/com/redis/riot/db/DatabaseImport.java b/connectors/riot-db/src/main/java/com/redis/riot/db/DatabaseImport.java index cf2a2938c..7d023d041 100644 --- a/connectors/riot-db/src/main/java/com/redis/riot/db/DatabaseImport.java +++ b/connectors/riot-db/src/main/java/com/redis/riot/db/DatabaseImport.java @@ -6,13 +6,14 @@ import org.springframework.batch.core.Job; import org.springframework.batch.item.ItemReader; +import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.database.AbstractCursorItemReader; import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder; import org.springframework.jdbc.core.ColumnMapRowMapper; +import org.springframework.util.ClassUtils; import com.redis.riot.core.AbstractMapImport; import com.redis.riot.core.RiotContext; -import com.redis.riot.core.StepBuilder; public class DatabaseImport extends AbstractMapImport { @@ -104,11 +105,10 @@ public void setVerifyCursorPosition(boolean verifyCursorPosition) { @Override protected Job job(RiotContext executionContext) { - StepBuilder, Map> step = createStep(); - step.name(getName()); - step.reader(reader()); - step.writer(writer(executionContext)); - return jobBuilder().start(step.build().build()).build(); + String name = ClassUtils.getShortName(getClass()); + ItemReader> reader = reader(); + ItemWriter> writer = writer(executionContext); + return jobBuilder().start(step(name, reader, writer).build()).build(); } private ItemReader> reader() { diff --git a/connectors/riot-faker/src/main/java/com/redis/riot/faker/FakerImport.java b/connectors/riot-faker/src/main/java/com/redis/riot/faker/FakerImport.java index 6c18941b3..45c38a999 100644 --- a/connectors/riot-faker/src/main/java/com/redis/riot/faker/FakerImport.java +++ b/connectors/riot-faker/src/main/java/com/redis/riot/faker/FakerImport.java @@ -5,8 +5,10 @@ import java.util.Map; import org.springframework.batch.core.Job; +import org.springframework.batch.item.ItemWriter; import org.springframework.expression.Expression; import org.springframework.util.Assert; +import org.springframework.util.ClassUtils; import com.redis.lettucemod.api.sync.RediSearchCommands; import com.redis.lettucemod.search.Field; @@ -15,7 +17,6 @@ import com.redis.riot.core.AbstractMapImport; import com.redis.riot.core.RiotContext; import com.redis.riot.core.RiotUtils; -import com.redis.riot.core.StepBuilder; import com.redis.spring.batch.common.Range; public class FakerImport extends AbstractMapImport { @@ -69,11 +70,10 @@ public void setLocale(Locale locale) { @Override protected Job job(RiotContext executionContext) { - StepBuilder, Map> step = createStep(); - step.name(getName()); - step.reader(reader(executionContext)); - step.writer(writer(executionContext)); - return jobBuilder().start(step.build().build()).build(); + String name = ClassUtils.getShortName(getClass()); + FakerItemReader reader = reader(executionContext); + ItemWriter> writer = writer(executionContext); + return jobBuilder().start(step(name, reader, writer).build()).build(); } private FakerItemReader reader(RiotContext executionContext) { diff --git a/connectors/riot-faker/src/test/java/com/redis/riot/faker/FakerReaderTests.java b/connectors/riot-faker/src/test/java/com/redis/riot/faker/FakerReaderTests.java index cff4b5317..95603d814 100644 --- a/connectors/riot-faker/src/test/java/com/redis/riot/faker/FakerReaderTests.java +++ b/connectors/riot-faker/src/test/java/com/redis/riot/faker/FakerReaderTests.java @@ -1,5 +1,6 @@ package com.redis.riot.faker; +import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -7,12 +8,19 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.springframework.batch.item.ExecutionContext; - -import com.redis.spring.batch.util.BatchUtils; +import org.springframework.batch.item.ItemReader; class FakerReaderTests { - @SuppressWarnings("deprecation") + public static List readAll(ItemReader reader) throws Exception { + List list = new ArrayList<>(); + T element; + while ((element = reader.read()) != null) { + list.add(element); + } + return list; + } + @Test void fakerReader() throws Exception { int count = 100; @@ -25,7 +33,7 @@ void fakerReader() throws Exception { reader.setStringFields(fields); reader.setMaxItemCount(count); reader.open(new ExecutionContext()); - List> items = BatchUtils.readAll(reader); + List> items = readAll(reader); reader.close(); Assertions.assertEquals(count, items.size()); Assertions.assertEquals(1, items.get(0).get("index")); diff --git a/connectors/riot-file/src/main/java/com/redis/riot/file/FileDumpExport.java b/connectors/riot-file/src/main/java/com/redis/riot/file/FileDumpExport.java index e80460caa..c9bad66bd 100644 --- a/connectors/riot-file/src/main/java/com/redis/riot/file/FileDumpExport.java +++ b/connectors/riot-file/src/main/java/com/redis/riot/file/FileDumpExport.java @@ -3,6 +3,7 @@ import java.io.IOException; import org.springframework.batch.core.Job; +import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.json.JacksonJsonObjectMarshaller; import org.springframework.batch.item.json.JsonObjectMarshaller; @@ -14,7 +15,6 @@ import com.redis.riot.core.RedisContext; import com.redis.riot.core.RiotContext; import com.redis.riot.core.RiotExecutionException; -import com.redis.riot.core.StepBuilder; import com.redis.riot.file.resource.JsonResourceItemWriter; import com.redis.riot.file.resource.JsonResourceItemWriterBuilder; import com.redis.riot.file.resource.XmlResourceItemWriter; @@ -129,12 +129,10 @@ private JsonObjectMarshaller> xmlMarshaller() { @Override protected Job job(RiotContext context) { - StepBuilder, KeyValue> step = createStep(); - step.name(getName()); - step.reader(reader(context.getRedisContext())); - step.writer(writer()); - step.processor(processor(StringCodec.UTF8, context)); - return jobBuilder().start(step.build().build()).build(); + StructItemReader reader = reader(context.getRedisContext()); + ItemWriter> writer = writer(); + ItemProcessor, KeyValue> processor = processor(StringCodec.UTF8, context); + return jobBuilder().start(step(getName(), reader, processor, writer).build()).build(); } private StructItemReader reader(RedisContext context) { diff --git a/connectors/riot-file/src/main/java/com/redis/riot/file/FileDumpImport.java b/connectors/riot-file/src/main/java/com/redis/riot/file/FileDumpImport.java index 5cf4cd4b7..1586960c5 100644 --- a/connectors/riot-file/src/main/java/com/redis/riot/file/FileDumpImport.java +++ b/connectors/riot-file/src/main/java/com/redis/riot/file/FileDumpImport.java @@ -13,7 +13,7 @@ import com.redis.riot.core.AbstractStructImport; import com.redis.riot.core.RiotContext; -import com.redis.riot.core.StepBuilder; +import com.redis.spring.batch.RedisItemWriter; import com.redis.spring.batch.common.KeyValue; public class FileDumpImport extends AbstractStructImport { @@ -43,7 +43,7 @@ public void setType(FileDumpType type) { @Override protected Job job(RiotContext executionContext) { Iterator steps = FileUtils.inputResources(files, fileOptions).stream().map(r -> step(executionContext, r)) - .map(StepBuilder::build).map(FaultTolerantStepBuilder::build).iterator(); + .map(FaultTolerantStepBuilder::build).iterator(); if (!steps.hasNext()) { throw new IllegalArgumentException("No file found"); } @@ -54,17 +54,13 @@ protected Job job(RiotContext executionContext) { return job.build(); } - @SuppressWarnings({ "rawtypes", "unchecked" }) - private StepBuilder, KeyValue> step(RiotContext executionContext, Resource resource) { - StepBuilder, KeyValue> step = createStep(); - step.name(resource.getDescription()); - step.reader((ItemReader) reader(resource)); - step.writer(writer(executionContext)); - return step; + private FaultTolerantStepBuilder, KeyValue> step(RiotContext executionContext, Resource resource) { + ItemReader> reader = reader(resource); + RedisItemWriter> writer = writer(executionContext); + return step(resource.getDescription(), reader, writer); } - @SuppressWarnings("rawtypes") - private ItemReader reader(Resource resource) { + private ItemReader> reader(Resource resource) { if (type == FileDumpType.XML) { return FileUtils.xmlReader(resource, KeyValue.class); } diff --git a/connectors/riot-file/src/main/java/com/redis/riot/file/FileImport.java b/connectors/riot-file/src/main/java/com/redis/riot/file/FileImport.java index e7dddacc8..e778221fc 100644 --- a/connectors/riot-file/src/main/java/com/redis/riot/file/FileImport.java +++ b/connectors/riot-file/src/main/java/com/redis/riot/file/FileImport.java @@ -16,8 +16,10 @@ import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.job.builder.SimpleJobBuilder; +import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemReader; +import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.file.FlatFileItemReader; import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder; import org.springframework.batch.item.file.mapping.FieldSetMapper; @@ -37,11 +39,10 @@ import com.redis.riot.core.AbstractMapImport; import com.redis.riot.core.RiotContext; -import com.redis.riot.core.StepBuilder; +import com.redis.riot.core.RiotUtils; import com.redis.riot.core.function.MapToFieldFunction; import com.redis.riot.core.function.RegexNamedGroupFunction; import com.redis.riot.core.function.ToMapFunction; -import com.redis.spring.batch.util.BatchUtils; public class FileImport extends AbstractMapImport { @@ -160,15 +161,13 @@ private Step step(RiotContext context, Resource resource) { if (maxItemCount != null && reader instanceof AbstractItemCountingItemStreamItemReader) { ((AbstractItemCountingItemStreamItemReader>) reader).setMaxItemCount(maxItemCount); } - String name = resource.getDescription(); - StepBuilder, Map> step = createStep(); - step.name(name); - step.reader(reader); - step.writer(writer(context)); - step.processor(processor(context)); - step.addSkippableException(ParseException.class); - step.addNonRetriableException(ParseException.class); - return step.build().build(); + ItemProcessor, Map> processor = processor(context); + ItemWriter> writer = writer(context); + FaultTolerantStepBuilder, Map> step = step(resource.getDescription(), reader, + processor, writer); + step.skip(ParseException.class); + step.noRetry(ParseException.class); + return step.build(); } private ItemReader> reader(Resource resource) { @@ -319,7 +318,7 @@ protected ItemProcessor, Map> processor(Riot if (processor == null) { return regexProcessor; } - return BatchUtils.processor(processor, regexProcessor); + return RiotUtils.processor(processor, regexProcessor); } diff --git a/core/riot-core/riot-core.gradle b/core/riot-core/riot-core.gradle index 4b7d06cf2..4c848d6a5 100644 --- a/core/riot-core/riot-core.gradle +++ b/core/riot-core/riot-core.gradle @@ -32,7 +32,7 @@ dependencies { implementation 'org.apache.commons:commons-pool2' testImplementation 'org.slf4j:slf4j-simple' testImplementation group: 'com.redis', name: 'spring-batch-redis', version: springBatchRedisVersion, classifier: 'tests' - testImplementation group: 'org.awaitility', name: 'awaitility', version: awaitilityVersion + testImplementation 'org.awaitility:awaitility' } compileJava { diff --git a/core/riot-core/src/main/java/com/redis/riot/core/AbstractExport.java b/core/riot-core/src/main/java/com/redis/riot/core/AbstractExport.java index 8cf2d4529..2eb448b19 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/AbstractExport.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/AbstractExport.java @@ -7,10 +7,10 @@ import org.springframework.batch.item.function.FunctionItemProcessor; import org.springframework.expression.EvaluationContext; +import com.redis.riot.core.function.DropStreamMessageIdFunction; import com.redis.riot.core.function.ExpressionFunction; import com.redis.riot.core.function.KeyValueOperator; import com.redis.riot.core.function.LongExpressionFunction; -import com.redis.riot.core.function.StreamMessageIdDropOperator; import com.redis.riot.core.function.StringKeyValueFunction; import com.redis.riot.core.function.ToStringKeyValueFunction; import com.redis.spring.batch.RedisItemReader; @@ -26,7 +26,7 @@ public abstract class AbstractExport extends AbstractJobRunnable { private KeyFilterOptions keyFilterOptions = new KeyFilterOptions(); - private KeyValueProcessorOptions processorOptions = new KeyValueProcessorOptions(); + protected KeyValueProcessorOptions processorOptions = new KeyValueProcessorOptions(); public void setKeyFilterOptions(KeyFilterOptions keyFilterOptions) { this.keyFilterOptions = keyFilterOptions; @@ -46,7 +46,6 @@ protected ItemProcessor, KeyValue> processor(RedisCodec return new FunctionItemProcessor<>(code.andThen(function(context.getEvaluationContext())).andThen(decode)); } - @SuppressWarnings({ "unchecked", "rawtypes" }) private Function, KeyValue> function(EvaluationContext context) { KeyValueOperator operator = new KeyValueOperator(); if (processorOptions.getKeyExpression() != null) { @@ -59,8 +58,8 @@ private Function, KeyValue> function(EvaluationContext operator.setTtlFunction(new LongExpressionFunction<>(context, processorOptions.getTtlExpression())); } } - if (processorOptions.isDropStreamMessageId()) { - operator.setValueFunction((Function) new StreamMessageIdDropOperator()); + if (processorOptions.isDropStreamMessageId() && isStruct()) { + operator.setValueFunction(new DropStreamMessageIdFunction()); } if (processorOptions.getTypeExpression() != null) { Function, String> function = ExpressionFunction.of(context, processorOptions.getTypeExpression()); @@ -69,6 +68,10 @@ private Function, KeyValue> function(EvaluationContext return operator; } + protected boolean isStruct() { + return true; + } + protected void configureReader(RedisItemReader reader, RedisContext context) { reader.setChunkSize(readerOptions.getChunkSize()); reader.setDatabase(context.getUri().getDatabase()); diff --git a/core/riot-core/src/main/java/com/redis/riot/core/AbstractJobRunnable.java b/core/riot-core/src/main/java/com/redis/riot/core/AbstractJobRunnable.java index f085025f8..f02c4d425 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/AbstractJobRunnable.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/AbstractJobRunnable.java @@ -1,8 +1,7 @@ package com.redis.riot.core; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.List; +import java.util.Optional; +import java.util.function.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -10,44 +9,54 @@ import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobExecutionException; import org.springframework.batch.core.JobParameters; +import org.springframework.batch.core.StepExecution; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.job.builder.JobBuilder; -import org.springframework.batch.core.launch.JobLauncher; import org.springframework.batch.core.launch.support.SimpleJobLauncher; import org.springframework.batch.core.repository.JobRepository; -import org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean; -import org.springframework.batch.support.transaction.ResourcelessTransactionManager; +import org.springframework.batch.core.repository.support.AbstractJobRepositoryFactoryBean; +import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder; +import org.springframework.batch.core.step.builder.SimpleStepBuilder; +import org.springframework.batch.item.ItemProcessor; +import org.springframework.batch.item.ItemReader; +import org.springframework.batch.item.ItemStreamReader; +import org.springframework.batch.item.ItemWriter; +import org.springframework.batch.item.support.SynchronizedItemStreamReader; +import org.springframework.beans.factory.InitializingBean; import org.springframework.core.task.SyncTaskExecutor; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.transaction.PlatformTransactionManager; import org.springframework.util.ClassUtils; +import com.redis.spring.batch.RedisItemReader; +import com.redis.spring.batch.step.FlushingStepBuilder; import com.redis.spring.batch.writer.AbstractOperationItemWriter; import com.redis.spring.batch.writer.StructItemWriter; -@SuppressWarnings("deprecation") +import io.lettuce.core.RedisCommandExecutionException; +import io.lettuce.core.RedisCommandTimeoutException; + public abstract class AbstractJobRunnable extends AbstractRiotRunnable { protected final Logger log = LoggerFactory.getLogger(getClass()); - protected JobRepository jobRepository; + private String name = ClassUtils.getShortName(getClass()); - private JobBuilderFactory jobFactory; + protected JobRepository jobRepository; - private StepBuilderFactory stepFactory; + private PlatformTransactionManager transactionManager; private StepOptions stepOptions = new StepOptions(); - private String name; + private JobBuilderFactory jobBuilderFactory; - private List stepConfigurationStrategies = new ArrayList<>(); + private StepBuilderFactory stepBuilderFactory; - protected AbstractJobRunnable() { - setName(ClassUtils.getShortName(getClass())); - } + private SimpleJobLauncher jobLauncher; - public void addStepConfigurationStrategy(StepConfigurationStrategy strategy) { - stepConfigurationStrategies.add(strategy); - } + private Consumer> stepConfigurer = s -> { + }; public String getName() { return name; @@ -65,65 +74,64 @@ public void setStepOptions(StepOptions stepOptions) { this.stepOptions = stepOptions; } + public void setJobRepository(JobRepository jobRepository) { + this.jobRepository = jobRepository; + } + + public void setTransactionManager(PlatformTransactionManager transactionManager) { + this.transactionManager = transactionManager; + } + + public void setStepConfigurer(Consumer> stepConfigurer) { + this.stepConfigurer = stepConfigurer; + } + @Override protected void execute(RiotContext executionContext) { - checkJobRepository(); - jobFactory = new JobBuilderFactory(jobRepository); - stepFactory = stepBuilderFactory(); + initialize(); Job job = job(executionContext); JobExecution execution; try { - execution = jobLauncher().run(job, new JobParameters()); + execution = jobLauncher.run(job, new JobParameters()); + if (execution.getStatus().isUnsuccessful()) { + Optional failedStepExecution = execution.getStepExecutions().stream() + .filter(e -> e.getStatus().isUnsuccessful()).findFirst(); + failedStepExecution.ifPresent(this::handleFailedStepExecution); + throw new RiotExecutionException("Error during job execution: " + execution.getStatus()); + } } catch (JobExecutionException e) { throw new RiotExecutionException("Could not run job", e); } - if (execution.getStatus().isUnsuccessful()) { - List exceptions = execution.getAllFailureExceptions(); - String msg = MessageFormat.format("Error executing {0}", execution.getJobInstance().getJobName()); - if (exceptions.isEmpty()) { - throw new RiotExecutionException(msg); - } - throw new RiotExecutionException(msg, exceptions.get(0)); - } } - private void checkJobRepository() { - if (jobRepository == null) { - MapJobRepositoryFactoryBean bean = new MapJobRepositoryFactoryBean(); - try { - bean.afterPropertiesSet(); - jobRepository = bean.getObject(); - } catch (Exception e) { - throw new RiotExecutionException("Could not initialize job repository", e); - } - } - } - - private StepBuilderFactory stepBuilderFactory() { - return new StepBuilderFactory(jobRepository, new ResourcelessTransactionManager()); - } - - private JobLauncher jobLauncher() { - SimpleJobLauncher launcher = new SimpleJobLauncher(); - launcher.setJobRepository(jobRepository); - launcher.setTaskExecutor(new SyncTaskExecutor()); - return launcher; + protected JobBuilder jobBuilder() { + return jobBuilderFactory.get(name); } - protected JobBuilder jobBuilder() { - return jobFactory.get(name); + private void initialize() { + if (jobRepository == null || transactionManager == null) { + @SuppressWarnings("deprecation") + AbstractJobRepositoryFactoryBean bean = new org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean(); + if (jobRepository == null) { + try { + jobRepository = bean.getObject(); + } catch (Exception e) { + throw new RiotExecutionException("Could not initialize job repository", e); + } + } + if (transactionManager == null) { + transactionManager = bean.getTransactionManager(); + } + } + jobBuilderFactory = new JobBuilderFactory(jobRepository); + stepBuilderFactory = new StepBuilderFactory(jobRepository, transactionManager); + jobLauncher = new SimpleJobLauncher(); + jobLauncher.setJobRepository(jobRepository); + jobLauncher.setTaskExecutor(new SyncTaskExecutor()); } protected abstract Job job(RiotContext executionContext); - protected StepBuilder createStep() { - StepBuilder step = new StepBuilder<>(stepFactory); - step.name(getName()); - step.options(stepOptions); - step.configurationStrategies(stepConfigurationStrategies); - return step; - } - protected > W writer(W writer, RedisWriterOptions options) { writer.setMultiExec(options.isMultiExec()); writer.setPoolSize(options.getPoolSize()); @@ -135,4 +143,98 @@ protected StepBuilder createStep() { return writer; } + private void handleFailedStepExecution(StepExecution stepExecution) { + String msg = String.format("Error executing step %s", stepExecution.getStepName()); + if (stepExecution.getFailureExceptions().isEmpty()) { + throw new RiotExecutionException(msg); + } + throw new RiotExecutionException(msg, stepExecution.getFailureExceptions().get(0)); + } + + protected FaultTolerantStepBuilder step(String name, ItemReader reader, ItemWriter writer) { + return step(name, reader, null, writer); + } + + protected FaultTolerantStepBuilder step(String name, ItemReader reader, ItemProcessor processor, + ItemWriter writer) { + RiotStep step = new RiotStep<>(); + step.setName(name); + step.setReader(reader); + step.setProcessor(processor); + step.setWriter(writer); + stepConfigurer.accept(step); + return step(step); + } + + protected FaultTolerantStepBuilder step(RiotStep riotStep) { + SimpleStepBuilder step = stepBuilderFactory.get(riotStep.getName()).chunk(stepOptions.getChunkSize()); + step.reader(reader(riotStep.getReader())); + step.processor(processor(riotStep.getProcessor())); + step.writer(writer(riotStep.getWriter())); + if (stepOptions.getThreads() > 1) { + ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); + taskExecutor.setMaxPoolSize(stepOptions.getThreads()); + taskExecutor.setCorePoolSize(stepOptions.getThreads()); + taskExecutor.setQueueCapacity(stepOptions.getThreads()); + taskExecutor.afterPropertiesSet(); + step.taskExecutor(taskExecutor); + step.throttleLimit(stepOptions.getThreads()); + } + riotStep.getConfigurer().accept(step); + if (riotStep.getReader() instanceof RedisItemReader) { + RedisItemReader redisReader = (RedisItemReader) riotStep.getReader(); + if (redisReader.isLive()) { + FlushingStepBuilder flushingStep = new FlushingStepBuilder<>(step); + flushingStep.interval(redisReader.getFlushInterval()); + flushingStep.idleTimeout(redisReader.getIdleTimeout()); + step = flushingStep; + } + } + FaultTolerantStepBuilder ftStep = step.faultTolerant(); + ftStep.skipLimit(stepOptions.getSkipLimit()); + ftStep.retryLimit(stepOptions.getRetryLimit()); + ftStep.retry(RedisCommandTimeoutException.class); + ftStep.noRetry(RedisCommandExecutionException.class); + return ftStep; + } + + private ItemProcessor processor(ItemProcessor processor) { + initialize(processor); + return processor; + } + + private void initialize(Object object) { + if (object instanceof InitializingBean) { + try { + ((InitializingBean) object).afterPropertiesSet(); + } catch (Exception e) { + throw new RiotExecutionException("Could not initialize " + object, e); + } + } + } + + private ItemReader reader(ItemReader reader) { + initialize(reader); + if (reader instanceof RedisItemReader) { + return reader; + } + if (stepOptions.getThreads() > 1 && reader instanceof ItemStreamReader) { + SynchronizedItemStreamReader synchronizedReader = new SynchronizedItemStreamReader<>(); + synchronizedReader.setDelegate((ItemStreamReader) reader); + return synchronizedReader; + } + return reader; + } + + private ItemWriter writer(ItemWriter writer) { + if (stepOptions.isDryRun()) { + return new NoopItemWriter<>(); + } + initialize(writer); + if (stepOptions.getSleep() == null || stepOptions.getSleep().isNegative() || stepOptions.getSleep().isZero()) { + return writer; + } + return new ThrottledItemWriter<>(writer, stepOptions.getSleep()); + } + } diff --git a/core/riot-core/src/main/java/com/redis/riot/core/AbstractMapExport.java b/core/riot-core/src/main/java/com/redis/riot/core/AbstractMapExport.java index 4d5012a2f..03933f8e8 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/AbstractMapExport.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/AbstractMapExport.java @@ -12,7 +12,6 @@ import com.redis.riot.core.function.StructToMapFunction; import com.redis.spring.batch.common.KeyValue; import com.redis.spring.batch.reader.StructItemReader; -import com.redis.spring.batch.util.BatchUtils; import io.lettuce.core.codec.StringCodec; @@ -28,12 +27,10 @@ public void setKeyPattern(Pattern pattern) { @Override protected Job job(RiotContext context) { - StepBuilder, Map> step = createStep(); - step.name(getName()); - step.reader(reader(context.getRedisContext())); - step.writer(writer()); - step.processor(processor(context)); - return jobBuilder().start(step.build().build()).build(); + StructItemReader reader = reader(context.getRedisContext()); + ItemProcessor, Map> processor = processor(context); + ItemWriter> writer = writer(); + return jobBuilder().start(step(getName(), reader, processor, writer).build()).build(); } protected StructItemReader reader(RedisContext context) { @@ -42,13 +39,13 @@ protected StructItemReader reader(RedisContext context) { return reader; } - private ItemProcessor, Map> processor(RiotContext context) { + protected ItemProcessor, Map> processor(RiotContext context) { ItemProcessor, KeyValue> processor = processor(StringCodec.UTF8, context); StructToMapFunction toMapFunction = new StructToMapFunction(); if (keyPattern != null) { toMapFunction.setKey(new RegexNamedGroupFunction(keyPattern)); } - return BatchUtils.processor(processor, new FunctionItemProcessor<>(toMapFunction)); + return RiotUtils.processor(processor, new FunctionItemProcessor<>(toMapFunction)); } protected abstract ItemWriter> writer(); diff --git a/core/riot-core/src/main/java/com/redis/riot/core/AbstractMapImport.java b/core/riot-core/src/main/java/com/redis/riot/core/AbstractMapImport.java index 641c818c9..8762e1b71 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/AbstractMapImport.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/AbstractMapImport.java @@ -10,7 +10,6 @@ import org.springframework.util.Assert; import com.redis.spring.batch.RedisItemWriter; -import com.redis.spring.batch.util.BatchUtils; import com.redis.spring.batch.writer.OperationItemWriter; import com.redis.spring.batch.writer.WriteOperation; @@ -48,7 +47,7 @@ public void setWriterOptions(RedisWriterOptions operationOptions) { protected ItemWriter> writer(RiotContext context) { Assert.notEmpty(operations, "No operation specified"); AbstractRedisClient client = context.getRedisContext().getClient(); - return BatchUtils.writer(operations.stream().map(o -> writer(client, o)).collect(Collectors.toList())); + return RiotUtils.writer(operations.stream().map(o -> writer(client, o)).collect(Collectors.toList())); } private ItemWriter writer(AbstractRedisClient client, WriteOperation operation) { diff --git a/core/riot-core/src/main/java/com/redis/riot/core/AbstractRiotRunnable.java b/core/riot-core/src/main/java/com/redis/riot/core/AbstractRiotRunnable.java index defef7a76..c0c095c38 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/AbstractRiotRunnable.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/AbstractRiotRunnable.java @@ -28,9 +28,9 @@ public abstract class AbstractRiotRunnable implements Runnable { - public static final String DATE_VARIABLE_NAME = "date"; + private static final String DATE_VARIABLE_NAME = "date"; - public static final String REDIS_VARIABLE_NAME = "redis"; + private static final String REDIS_VARIABLE_NAME = "redis"; private RedisOptions redisOptions = new RedisOptions(); diff --git a/core/riot-core/src/main/java/com/redis/riot/core/GeneratorImport.java b/core/riot-core/src/main/java/com/redis/riot/core/GeneratorImport.java index 8f446a6e0..323d47ffa 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/GeneratorImport.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/GeneratorImport.java @@ -4,6 +4,7 @@ import org.springframework.batch.core.Job; +import com.redis.spring.batch.RedisItemWriter; import com.redis.spring.batch.common.DataType; import com.redis.spring.batch.common.KeyValue; import com.redis.spring.batch.common.Range; @@ -151,10 +152,9 @@ public void setTypes(List types) { @Override protected Job job(RiotContext context) { - StepBuilder, KeyValue> step = createStep(); - step.reader(reader()); - step.writer(writer(context)); - return jobBuilder().start(step.build().build()).build(); + GeneratorItemReader reader = reader(); + RedisItemWriter> writer = writer(context); + return jobBuilder().start(step(getName(), reader, writer).build()).build(); } private GeneratorItemReader reader() { diff --git a/core/riot-core/src/main/java/com/redis/riot/core/KeyComparisonOptions.java b/core/riot-core/src/main/java/com/redis/riot/core/KeyComparisonOptions.java index 6c39ff87b..f5419fbed 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/KeyComparisonOptions.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/KeyComparisonOptions.java @@ -2,7 +2,7 @@ import java.time.Duration; -import com.redis.spring.batch.common.KeyComparisonItemReader; +import com.redis.spring.batch.reader.KeyComparisonValueReader; public class KeyComparisonOptions { @@ -14,7 +14,7 @@ public class KeyComparisonOptions { private KeyComparisonMode mode = DEFAULT_MODE; - private Duration ttlTolerance = KeyComparisonItemReader.DEFAULT_TTL_TOLERANCE; + private Duration ttlTolerance = KeyComparisonValueReader.DEFAULT_TTL_TOLERANCE; public KeyComparisonMode getMode() { return mode; diff --git a/core/riot-core/src/main/java/com/redis/riot/core/KeyComparisonStatusCountItemWriter.java b/core/riot-core/src/main/java/com/redis/riot/core/KeyComparisonStatusCountItemWriter.java index fd7b76ff7..05dc1bf9b 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/KeyComparisonStatusCountItemWriter.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/KeyComparisonStatusCountItemWriter.java @@ -40,12 +40,8 @@ public long getCount(Status status) { return counts.get(status).get(); } - public Long[] getCounts(Status... statuses) { - Long[] array = new Long[statuses.length]; - for (int index = 0; index < statuses.length; index++) { - array[index] = getCount(statuses[index]); - } - return array; + public List getCounts(Status... statuses) { + return Stream.of(statuses).map(this::getCount).collect(Collectors.toList()); } public long getTotal() { diff --git a/core/riot-core/src/main/java/com/redis/riot/core/NoopItemWriter.java b/core/riot-core/src/main/java/com/redis/riot/core/NoopItemWriter.java new file mode 100644 index 000000000..2023a7538 --- /dev/null +++ b/core/riot-core/src/main/java/com/redis/riot/core/NoopItemWriter.java @@ -0,0 +1,14 @@ +package com.redis.riot.core; + +import java.util.List; + +import org.springframework.batch.item.ItemWriter; + +public class NoopItemWriter implements ItemWriter { + + @Override + public void write(List items) throws Exception { + // Do nothing + } + +} diff --git a/core/riot-core/src/main/java/com/redis/riot/core/ProcessorOptions.java b/core/riot-core/src/main/java/com/redis/riot/core/ProcessorOptions.java index bfb443ac4..450821ada 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/ProcessorOptions.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/ProcessorOptions.java @@ -22,7 +22,6 @@ import com.redis.lettucemod.util.GeoLocation; import com.redis.riot.core.function.ExpressionFunction; import com.redis.riot.core.function.MapFunction; -import com.redis.spring.batch.util.BatchUtils; public class ProcessorOptions { @@ -65,7 +64,7 @@ public ItemProcessor, Map> processor(Standar Predicate> predicate = RiotUtils.predicate(context, filter); processors.add(new PredicateItemProcessor<>(predicate)); } - return BatchUtils.processor(processors); + return RiotUtils.processor(processors); } /** diff --git a/core/riot-core/src/main/java/com/redis/riot/core/Replication.java b/core/riot-core/src/main/java/com/redis/riot/core/Replication.java index f52551767..75015a13a 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/Replication.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/Replication.java @@ -3,11 +3,13 @@ import java.io.PrintWriter; import org.springframework.batch.core.Job; -import org.springframework.batch.core.Step; import org.springframework.batch.core.job.builder.FlowBuilder; import org.springframework.batch.core.job.builder.JobFlowBuilder; import org.springframework.batch.core.job.builder.SimpleJobBuilder; import org.springframework.batch.core.job.flow.support.SimpleFlow; +import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder; +import org.springframework.batch.core.step.tasklet.TaskletStep; +import org.springframework.batch.item.ItemProcessor; import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.core.task.TaskExecutor; import org.springframework.expression.spel.support.StandardEvaluationContext; @@ -93,14 +95,18 @@ public void setType(ReplicationType type) { this.type = type; } + @Override + protected boolean isStruct() { + return type == ReplicationType.STRUCT; + } + @Override protected RiotContext createExecutionContext() { - RiotContext baseContext = super.createExecutionContext(); - RedisContext targetRedisContext = redisContext(targetRedisOptions); - StandardEvaluationContext evaluationContext = baseContext.getEvaluationContext(); - evaluationContext.setVariable(VARIABLE_SOURCE, baseContext.getRedisContext().getUri()); - evaluationContext.setVariable(VARIABLE_TARGET, targetRedisContext.getUri()); - return new ReplicationContext(baseContext.getRedisContext(), evaluationContext, targetRedisContext); + ReplicationContext context = new ReplicationContext(super.createExecutionContext(), redisContext(targetRedisOptions)); + StandardEvaluationContext evaluationContext = context.getEvaluationContext(); + evaluationContext.setVariable(VARIABLE_SOURCE, context.getRedisContext().getUri()); + evaluationContext.setVariable(VARIABLE_TARGET, context.getTargetRedisContext().getUri()); + return context; } @Override @@ -111,8 +117,8 @@ protected Job job(RiotContext context) { case COMPARE: return jobBuilder().start(compareStep(replicationContext)).build(); case LIVE: - SimpleFlow scanFlow = flow("scan").start(scanStep(replicationContext).build().build()).build(); - SimpleFlow liveFlow = flow("live").start(liveStep(replicationContext).build().build()).build(); + SimpleFlow scanFlow = flow("scan").start(scanStep(replicationContext).build()).build(); + SimpleFlow liveFlow = flow("live").start(liveStep(replicationContext).build()).build(); SimpleFlow replicateFlow = flow("replicate").split(asyncTaskExecutor()).add(liveFlow, scanFlow).build(); JobFlowBuilder live = jobBuilder().start(replicateFlow); if (shouldCompare()) { @@ -120,9 +126,9 @@ protected Job job(RiotContext context) { } return live.build().build(); case LIVEONLY: - return jobBuilder().start(liveStep(replicationContext).build().build()).build(); + return jobBuilder().start(liveStep(replicationContext).build()).build(); case SNAPSHOT: - SimpleJobBuilder snapshot = jobBuilder().start(scanStep(replicationContext).build().build()); + SimpleJobBuilder snapshot = jobBuilder().start(scanStep(replicationContext).build()); if (shouldCompare()) { snapshot.next(compareStep(replicationContext)); } @@ -144,25 +150,24 @@ private boolean shouldCompare() { return !comparisonOptions.isSkip() && !getStepOptions().isDryRun(); } - private StepBuilder, KeyValue> scanStep(ReplicationContext context) { + private FaultTolerantStepBuilder, KeyValue> scanStep(ReplicationContext context) { return step(context, STEP_SCAN, reader(context.getRedisContext())); } - private StepBuilder, KeyValue> step(ReplicationContext context, String name, + private FaultTolerantStepBuilder, KeyValue> step(ReplicationContext context, String name, RedisItemReader> reader) { reader.setName(name + "-reader"); - StepBuilder, KeyValue> step = createStep(); - step.name(name); - step.reader(reader); - step.processor(processor(ByteArrayCodec.INSTANCE, context)); - step.writer(writer(context)); + RedisItemWriter> writer = writer(context); + writer.setName(name + "-writer"); + ItemProcessor, KeyValue> processor = processor(ByteArrayCodec.INSTANCE, context); + FaultTolerantStepBuilder, KeyValue> step = step(name, reader, processor, writer); if (log.isDebugEnabled()) { - step.addWriteListener(new KeyValueWriteListener<>(reader.getCodec(), log)); + step.listener(new KeyValueWriteListener<>(reader.getCodec(), log)); } return step; } - private StepBuilder, KeyValue> liveStep(ReplicationContext context) { + private FaultTolerantStepBuilder, KeyValue> liveStep(ReplicationContext context) { validateRedisConfig(context); RedisItemReader> reader = reader(context.getRedisContext()); reader.setMode(ReaderMode.LIVE); @@ -176,7 +181,7 @@ private RedisItemReader> reader(RedisContext co } private KeyValueItemReader reader(AbstractRedisClient client) { - if (type == ReplicationType.STRUCT) { + if (isStruct()) { return new StructItemReader<>(client, ByteArrayCodec.INSTANCE); } return new DumpItemReader(client); @@ -195,34 +200,39 @@ private void validateRedisConfig(ReplicationContext context) { } } - private Step compareStep(ReplicationContext context) { + private TaskletStep compareStep(ReplicationContext context) { KeyComparisonItemReader reader = comparisonReader(context); KeyComparisonStatusCountItemWriter writer = new KeyComparisonStatusCountItemWriter(); - StepBuilder step = createStep(); - step.name(STEP_COMPARE); - step.reader(reader); - step.writer(writer); + FaultTolerantStepBuilder step = step(STEP_COMPARE, reader, writer); if (comparisonOptions.isShowDiffs()) { - step.addWriteListener(new KeyComparisonDiffLogger(out)); + step.listener(new KeyComparisonDiffLogger(out)); } - step.addExecutionListener(new KeyComparisonSummaryLogger(writer, out)); - return step.build().build(); + step.listener(new KeyComparisonSummaryLogger(writer, out)); + return step.build(); } private KeyComparisonItemReader comparisonReader(ReplicationContext context) { - KeyValueItemReader sourceReader = compareReader(context.getRedisContext()); + KeyValueItemReader sourceReader = comparisonKeyValueReader(context.getRedisContext().getClient()); configureReader(sourceReader, context.getRedisContext()); - KeyValueItemReader targetReader = compareReader(context.getTargetRedisContext()); + KeyValueItemReader targetReader = comparisonKeyValueReader(context.getTargetRedisContext().getClient()); targetReader.setReadFrom(targetReadFrom); targetReader.setPoolSize(targetWriterOptions.getPoolSize()); - return new KeyComparisonItemReader(sourceReader, targetReader); + KeyComparisonItemReader comparisonReader = new KeyComparisonItemReader(sourceReader, targetReader); + comparisonReader.setProcessor(processor(StringCodec.UTF8, context)); + comparisonReader.setTtlTolerance(comparisonOptions.getTtlTolerance()); + comparisonReader.setCompareStreamMessageIds(!processorOptions.isDropStreamMessageId()); + return comparisonReader; } - private KeyValueItemReader compareReader(RedisContext context) { - if (comparisonOptions.getMode() == KeyComparisonMode.FULL) { - return new StructItemReader<>(context.getClient(), StringCodec.UTF8); + private KeyValueItemReader comparisonKeyValueReader(AbstractRedisClient client) { + if (isFullComparison()) { + return new StructItemReader<>(client, StringCodec.UTF8); } - return new KeyTypeItemReader<>(context.getClient(), StringCodec.UTF8); + return new KeyTypeItemReader<>(client, StringCodec.UTF8); + } + + private boolean isFullComparison() { + return comparisonOptions.getMode() == KeyComparisonMode.FULL; } private RedisItemWriter> writer(ReplicationContext context) { @@ -232,7 +242,7 @@ private RedisItemWriter> writer(ReplicationCont } private KeyValueItemWriter writer(AbstractRedisClient client) { - if (type == ReplicationType.STRUCT) { + if (isStruct()) { return new StructItemWriter<>(client, ByteArrayCodec.INSTANCE); } return new DumpItemWriter(client); diff --git a/core/riot-core/src/main/java/com/redis/riot/core/ReplicationContext.java b/core/riot-core/src/main/java/com/redis/riot/core/ReplicationContext.java index f20366619..326b994a1 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/ReplicationContext.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/ReplicationContext.java @@ -1,13 +1,11 @@ package com.redis.riot.core; -import org.springframework.expression.spel.support.StandardEvaluationContext; - public class ReplicationContext extends RiotContext { private final RedisContext targetRedisContext; - public ReplicationContext(RedisContext source, StandardEvaluationContext evaluationContext, RedisContext target) { - super(source, evaluationContext); + public ReplicationContext(RiotContext context, RedisContext target) { + super(context.getRedisContext(), context.getEvaluationContext()); this.targetRedisContext = target; } diff --git a/core/riot-core/src/main/java/com/redis/riot/core/RiotStep.java b/core/riot-core/src/main/java/com/redis/riot/core/RiotStep.java new file mode 100644 index 000000000..0fbf87d66 --- /dev/null +++ b/core/riot-core/src/main/java/com/redis/riot/core/RiotStep.java @@ -0,0 +1,63 @@ +package com.redis.riot.core; + +import java.util.function.Consumer; + +import org.springframework.batch.core.step.builder.SimpleStepBuilder; +import org.springframework.batch.item.ItemProcessor; +import org.springframework.batch.item.ItemReader; +import org.springframework.batch.item.ItemWriter; + +public class RiotStep { + + private String name; + + private ItemReader reader; + + private ItemProcessor processor; + + private ItemWriter writer; + + private Consumer> configurer = b -> { + }; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public ItemReader getReader() { + return reader; + } + + public void setReader(ItemReader reader) { + this.reader = reader; + } + + public ItemProcessor getProcessor() { + return processor; + } + + public void setProcessor(ItemProcessor processor) { + this.processor = processor; + } + + public ItemWriter getWriter() { + return writer; + } + + public void setWriter(ItemWriter writer) { + this.writer = writer; + } + + public Consumer> getConfigurer() { + return configurer; + } + + public void setConfigurer(Consumer> configurer) { + this.configurer = configurer; + } + +} diff --git a/core/riot-core/src/main/java/com/redis/riot/core/RiotUtils.java b/core/riot-core/src/main/java/com/redis/riot/core/RiotUtils.java index c50259586..80060c6e3 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/RiotUtils.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/RiotUtils.java @@ -1,8 +1,15 @@ package com.redis.riot.core; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.function.Predicate; +import org.springframework.batch.item.ItemProcessor; +import org.springframework.batch.item.ItemWriter; +import org.springframework.batch.item.support.CompositeItemProcessor; +import org.springframework.batch.item.support.CompositeItemWriter; import org.springframework.expression.EvaluationContext; import org.springframework.expression.Expression; import org.springframework.expression.common.TemplateParserContext; @@ -90,4 +97,38 @@ public static Predicate globPredicate(List patterns) { return Predicates.or(patterns.stream().map(Predicates::glob)); } + public static ItemProcessor processor(ItemProcessor... processors) { + return processor(Arrays.asList(processors)); + } + + @SuppressWarnings("unchecked") + public static ItemProcessor processor(Collection> processors) { + if (processors.isEmpty()) { + return null; + } + if (processors.size() == 1) { + return (ItemProcessor) processors.iterator().next(); + } + CompositeItemProcessor composite = new CompositeItemProcessor<>(); + composite.setDelegates(new ArrayList<>(processors)); + return composite; + } + + @SuppressWarnings("unchecked") + public static ItemWriter writer(ItemWriter... writers) { + return writer(Arrays.asList(writers)); + } + + public static ItemWriter writer(Collection> writers) { + if (writers.isEmpty()) { + throw new IllegalArgumentException("At least one writer must be specified"); + } + if (writers.size() == 1) { + return writers.iterator().next(); + } + CompositeItemWriter composite = new CompositeItemWriter<>(); + composite.setDelegates(new ArrayList<>(writers)); + return composite; + } + } diff --git a/core/riot-core/src/main/java/com/redis/riot/core/StepBuilder.java b/core/riot-core/src/main/java/com/redis/riot/core/StepBuilder.java deleted file mode 100644 index 2ebf166af..000000000 --- a/core/riot-core/src/main/java/com/redis/riot/core/StepBuilder.java +++ /dev/null @@ -1,226 +0,0 @@ -package com.redis.riot.core; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -import org.springframework.batch.core.ItemWriteListener; -import org.springframework.batch.core.StepExecutionListener; -import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; -import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder; -import org.springframework.batch.core.step.builder.SimpleStepBuilder; -import org.springframework.batch.item.ItemProcessor; -import org.springframework.batch.item.ItemReader; -import org.springframework.batch.item.ItemStreamReader; -import org.springframework.batch.item.ItemWriter; -import org.springframework.batch.item.support.SynchronizedItemStreamReader; -import org.springframework.beans.factory.InitializingBean; - -import com.redis.spring.batch.RedisItemReader; -import com.redis.spring.batch.step.FlushingStepBuilder; -import com.redis.spring.batch.util.BatchUtils; - -import io.lettuce.core.RedisCommandExecutionException; -import io.lettuce.core.RedisCommandTimeoutException; - -public class StepBuilder { - - private final StepBuilderFactory factory; - - private String name; - - private ItemReader reader; - - private ItemWriter writer; - - private ItemProcessor processor; - - private List executionListeners = new ArrayList<>(); - - private List> writeListeners = new ArrayList<>(); - - private StepOptions options = new StepOptions(); - - private List configurationStrategies = new ArrayList<>(); - - private List> skippableExceptions = defaultNonRetriableExceptions(); - - private List> nonSkippableExceptions = defaultRetriableExceptions(); - - private List> retriableExceptions = defaultRetriableExceptions(); - - private List> nonRetriableExceptions = defaultNonRetriableExceptions(); - - @SuppressWarnings("unchecked") - public static List> defaultRetriableExceptions() { - return modifiableList(RedisCommandTimeoutException.class); - } - - @SuppressWarnings("unchecked") - public static List> defaultNonRetriableExceptions() { - return modifiableList(RedisCommandExecutionException.class); - } - - @SuppressWarnings("unchecked") - private static List modifiableList(T... elements) { - return new ArrayList<>(Arrays.asList(elements)); - } - - public StepBuilder(StepBuilderFactory factory) { - this.factory = factory; - } - - public void addSkippableException(Class exception) { - skippableExceptions.add(exception); - } - - public void addNonSkippableException(Class exception) { - nonSkippableExceptions.add(exception); - } - - public void addRetriableException(Class exception) { - retriableExceptions.add(exception); - } - - public void addNonRetriableException(Class exception) { - nonRetriableExceptions.add(exception); - } - - public String getName() { - return name; - } - - public ItemReader getReader() { - return reader; - } - - public ItemWriter getWriter() { - return writer; - } - - public StepBuilder reader(ItemReader reader) { - this.reader = reader; - return this; - } - - public StepBuilder writer(ItemWriter writer) { - this.writer = writer; - return this; - } - - public StepBuilder name(String name) { - this.name = name; - return this; - } - - public StepBuilder options(StepOptions options) { - this.options = options; - return this; - } - - public void accept(StepConfigurationStrategy strategy) { - strategy.configure(this); - } - - public void addWriteListener(ItemWriteListener listener) { - this.writeListeners.add(listener); - } - - public void addExecutionListener(StepExecutionListener listener) { - this.executionListeners.add(listener); - } - - public StepBuilder processor(ItemProcessor processor) { - this.processor = processor; - return this; - } - - public FaultTolerantStepBuilder build() { - configurationStrategies.forEach(s -> s.configure(this)); - FaultTolerantStepBuilder step = simpleStep().faultTolerant(); - step.reader(reader()); - step.processor(processor()); - step.writer(writer()); - if (options.getThreads() > 1) { - step.taskExecutor(BatchUtils.threadPoolTaskExecutor(options.getThreads())); - step.throttleLimit(options.getThreads()); - } - executionListeners.forEach(step::listener); - writeListeners.forEach(step::listener); - step.skipLimit(options.getSkipLimit()); - step.retryLimit(options.getRetryLimit()); - skippableExceptions.forEach(step::skip); - nonSkippableExceptions.forEach(step::noSkip); - retriableExceptions.forEach(step::retry); - nonRetriableExceptions.forEach(step::noRetry); - return step; - } - - private SimpleStepBuilder simpleStep() { - SimpleStepBuilder step = factory.get(name).chunk(options.getChunkSize()); - if (reader instanceof RedisItemReader) { - RedisItemReader redisReader = (RedisItemReader) reader; - if (redisReader.isLive()) { - FlushingStepBuilder flushingStep = new FlushingStepBuilder<>(step); - flushingStep.interval(redisReader.getFlushInterval()); - flushingStep.idleTimeout(redisReader.getIdleTimeout()); - return flushingStep; - } - } - return step; - } - - private ItemProcessor processor() { - initialize(processor); - return processor; - } - - private void initialize(Object object) { - if (object instanceof InitializingBean) { - try { - ((InitializingBean) object).afterPropertiesSet(); - } catch (Exception e) { - throw new RiotExecutionException("Could not initialize " + object, e); - } - } - } - - private ItemReader reader() { - initialize(reader); - if (reader instanceof RedisItemReader) { - return reader; - } - if (options.getThreads() > 1 && reader instanceof ItemStreamReader) { - SynchronizedItemStreamReader synchronizedReader = new SynchronizedItemStreamReader<>(); - synchronizedReader.setDelegate((ItemStreamReader) reader); - return synchronizedReader; - } - return reader; - } - - private ItemWriter writer() { - initialize(writer); - if (options.isDryRun()) { - return new NoopItemWriter<>(); - } - if (options.getSleep() == null || options.getSleep().isNegative() || options.getSleep().isZero()) { - return writer; - } - return new ThrottledItemWriter<>(writer, options.getSleep()); - } - - private static class NoopItemWriter implements ItemWriter { - - @Override - public void write(List items) throws Exception { - // Do nothing - } - - } - - public StepBuilder configurationStrategies(List strategies) { - this.configurationStrategies = strategies; - return this; - } - -} diff --git a/core/riot-core/src/main/java/com/redis/riot/core/StepConfigurationStrategy.java b/core/riot-core/src/main/java/com/redis/riot/core/StepConfigurationStrategy.java deleted file mode 100644 index 492bcc8d7..000000000 --- a/core/riot-core/src/main/java/com/redis/riot/core/StepConfigurationStrategy.java +++ /dev/null @@ -1,7 +0,0 @@ -package com.redis.riot.core; - -public interface StepConfigurationStrategy { - - void configure(StepBuilder step); - -} diff --git a/core/riot-core/src/main/java/com/redis/riot/core/function/DropStreamMessageIdFunction.java b/core/riot-core/src/main/java/com/redis/riot/core/function/DropStreamMessageIdFunction.java new file mode 100644 index 000000000..cc575f38f --- /dev/null +++ b/core/riot-core/src/main/java/com/redis/riot/core/function/DropStreamMessageIdFunction.java @@ -0,0 +1,34 @@ +package com.redis.riot.core.function; + +import java.util.Collection; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.springframework.util.CollectionUtils; + +import com.redis.spring.batch.common.DataType; +import com.redis.spring.batch.common.KeyValue; + +import io.lettuce.core.StreamMessage; + +@SuppressWarnings("unchecked") +public class DropStreamMessageIdFunction implements Function, Object> { + + @SuppressWarnings("rawtypes") + @Override + public Object apply(KeyValue t) { + if (t.getType() == DataType.STREAM) { + Collection messages = (Collection) t.getValue(); + if (!CollectionUtils.isEmpty(messages)) { + return messages.stream().map(this::message).collect(Collectors.toList()); + } + } + return t.getValue(); + } + + @SuppressWarnings("rawtypes") + private StreamMessage message(StreamMessage message) { + return new StreamMessage(message.getStream(), null, message.getBody()); + } + +} diff --git a/core/riot-core/src/main/java/com/redis/riot/core/function/StreamMessageIdDropOperator.java b/core/riot-core/src/main/java/com/redis/riot/core/function/StreamMessageIdDropOperator.java deleted file mode 100644 index c7df0573f..000000000 --- a/core/riot-core/src/main/java/com/redis/riot/core/function/StreamMessageIdDropOperator.java +++ /dev/null @@ -1,26 +0,0 @@ -package com.redis.riot.core.function; - -import java.util.Collection; -import java.util.function.UnaryOperator; -import java.util.stream.Collectors; - -import com.redis.spring.batch.common.DataType; -import com.redis.spring.batch.common.KeyValue; - -import io.lettuce.core.StreamMessage; - -@SuppressWarnings("unchecked") -public class StreamMessageIdDropOperator implements UnaryOperator> { - - @SuppressWarnings("rawtypes") - @Override - public KeyValue apply(KeyValue t) { - if (t.getType() == DataType.STREAM) { - Collection messages = (Collection) t.getValue(); - t.setValue(messages.stream().map(m -> new StreamMessage(m.getStream(), null, m.getBody())) - .collect(Collectors.toList())); - } - return t; - } - -} diff --git a/core/riot-core/src/main/java/com/redis/riot/core/operation/CompositeBatchWriteOperation.java b/core/riot-core/src/main/java/com/redis/riot/core/operation/CompositeBatchWriteOperation.java index af6374b1c..779a15758 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/operation/CompositeBatchWriteOperation.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/operation/CompositeBatchWriteOperation.java @@ -17,7 +17,7 @@ public CompositeBatchWriteOperation(List> delegates } @Override - public List> execute(BaseRedisAsyncCommands commands, List items) { + public List> execute(BaseRedisAsyncCommands commands, List items) { List> futures = new ArrayList<>(); for (BatchWriteOperation delegate : delegates) { futures.addAll(delegate.execute(commands, items)); diff --git a/core/riot-core/src/test/java/com/redis/riot/core/test/JsonSerdeTests.java b/core/riot-core/src/test/java/com/redis/riot/core/test/JsonSerdeTests.java index b4fdb3d6b..24bb79412 100644 --- a/core/riot-core/src/test/java/com/redis/riot/core/test/JsonSerdeTests.java +++ b/core/riot-core/src/test/java/com/redis/riot/core/test/JsonSerdeTests.java @@ -25,7 +25,7 @@ import com.redis.spring.batch.common.DataType; import com.redis.spring.batch.common.KeyValue; import com.redis.spring.batch.gen.GeneratorItemReader; -import com.redis.spring.batch.util.BatchUtils; +import com.redis.spring.batch.test.AbstractTestBase; @TestInstance(Lifecycle.PER_CLASS) class JsonSerdeTests { @@ -76,7 +76,7 @@ void serde() throws Exception { GeneratorItemReader reader = new GeneratorItemReader(); reader.setMaxItemCount(17); reader.open(new ExecutionContext()); - List> items = BatchUtils.readAll(reader); + List> items = AbstractTestBase.readAll(reader); for (KeyValue item : items) { String json = mapper.writeValueAsString(item); KeyValue result = mapper.readValue(json, KeyValue.class); diff --git a/core/riot-core/src/test/java/com/redis/riot/core/test/ReplicationTests.java b/core/riot-core/src/test/java/com/redis/riot/core/test/ReplicationTests.java index 0f9394039..39187646b 100644 --- a/core/riot-core/src/test/java/com/redis/riot/core/test/ReplicationTests.java +++ b/core/riot-core/src/test/java/com/redis/riot/core/test/ReplicationTests.java @@ -11,7 +11,6 @@ import org.springframework.batch.core.JobExecutionException; import org.springframework.batch.item.support.ListItemWriter; -import com.redis.lettucemod.util.RedisModulesUtils; import com.redis.riot.core.KeyValueProcessorOptions; import com.redis.riot.core.PredicateItemProcessor; import com.redis.riot.core.RedisOptions; @@ -111,9 +110,9 @@ void keyProcessorWithDate(TestInfo info) throws Throwable { protected KeyComparisonItemReader comparisonReader(TestInfo info) { StructItemReader sourceReader = RedisItemReader.struct(client); - configureReader(new SimpleTestInfo(info, "source"), sourceReader); + configureReader(testInfo(info, "source"), sourceReader); StructItemReader targetReader = RedisItemReader.struct(targetClient); - configureReader(new SimpleTestInfo(info, "target"), targetReader); + configureReader(testInfo(info, "target"), targetReader); KeyComparisonItemReader comparator = new KeyComparisonItemReader(sourceReader, targetReader); comparator.setTtlTolerance(Duration.ofMillis(100)); return comparator; @@ -121,7 +120,7 @@ protected KeyComparisonItemReader comparisonReader(TestInfo info) { @Test void filterKeySlot(TestInfo info) throws Exception { - RedisModulesUtils.connection(client).sync().configSet("notify-keyspace-events", "AK"); + enableKeyspaceNotifications(client); RedisItemReader> reader = RedisItemReader.struct(client); configureReader(info, reader); reader.setMode(ReaderMode.LIVE); @@ -132,8 +131,7 @@ void filterKeySlot(TestInfo info) throws Exception { Executors.newSingleThreadScheduledExecutor().execute(() -> { awaitUntil(reader::isOpen); int count = 100; - GeneratorItemReader gen = new GeneratorItemReader(); - gen.setMaxItemCount(count); + GeneratorItemReader gen = generator(count); try { generate(info, gen); } catch (JobExecutionException e) { diff --git a/gradle.properties b/gradle.properties index 76ae6ade2..32784ea46 100644 --- a/gradle.properties +++ b/gradle.properties @@ -27,13 +27,11 @@ gitPluginVersion = 3.0.0 kordampBuildVersion = 3.1.0 kordampPluginVersion = 0.51.0 -awaitilityVersion = 4.2.0 awsVersion = 2.2.6.RELEASE commonsIoVersion = 2.11.0 db2Version = 11.5.8.0 datafakerVersion = 2.0.1 gcpVersion = 1.2.8.RELEASE -googleHttpVersion = 1.38.0 jacocoVersion = 0.8.10 jdksPluginVersion = 1.5.0 jsr305Version = 3.0.2 @@ -45,7 +43,7 @@ oracleVersion = 19.3.0.0 picocliVersion = 4.7.5 plexusVersion = 4.0.0 progressbarVersion = 0.10.0 -springBatchRedisVersion = 3.7.1 +springBatchRedisVersion = 3.7.2 sqliteVersion = 3.43.0.0 testcontainersRedisVersion = 1.6.4 testcontainersVersion = 1.19.0 diff --git a/plugins/riot/riot.gradle b/plugins/riot/riot.gradle index 312fd0570..9bee6b621 100644 --- a/plugins/riot/riot.gradle +++ b/plugins/riot/riot.gradle @@ -65,7 +65,7 @@ dependencies { implementation group: 'org.codehaus.plexus', name: 'plexus-utils', version:plexusVersion implementation 'org.slf4j:slf4j-simple' testImplementation group: 'com.redis', name: 'spring-batch-redis', version: springBatchRedisVersion, classifier: 'tests' - testImplementation group: 'org.awaitility', name: 'awaitility', version: awaitilityVersion + testImplementation 'org.awaitility:awaitility' testImplementation 'org.springframework.boot:spring-boot-autoconfigure' testImplementation 'org.springframework:spring-jdbc' testImplementation group: 'org.testcontainers', name: 'postgresql', version: testcontainersVersion diff --git a/plugins/riot/src/main/java/com/redis/riot/cli/AbstractExportCommand.java b/plugins/riot/src/main/java/com/redis/riot/cli/AbstractExportCommand.java index e1aeb3eb0..288f329a5 100644 --- a/plugins/riot/src/main/java/com/redis/riot/cli/AbstractExportCommand.java +++ b/plugins/riot/src/main/java/com/redis/riot/cli/AbstractExportCommand.java @@ -1,12 +1,12 @@ package com.redis.riot.cli; -import java.util.function.Supplier; +import java.util.function.LongSupplier; import com.redis.riot.core.AbstractExport; import com.redis.riot.core.AbstractJobRunnable; -import com.redis.riot.core.StepBuilder; +import com.redis.riot.core.RiotStep; +import com.redis.spring.batch.RedisItemReader; import com.redis.spring.batch.reader.ScanSizeEstimator; -import com.redis.spring.batch.reader.StructItemReader; import picocli.CommandLine.ArgGroup; @@ -33,26 +33,18 @@ protected AbstractJobRunnable getJobExecutable() { protected abstract AbstractExport getExport(); - @SuppressWarnings("unchecked") @Override - protected long size(StepBuilder step) { - StructItemReader reader = (StructItemReader) step.getReader(); + protected LongSupplier initialMaxSupplier(RiotStep step) { + RedisItemReader reader = (RedisItemReader) step.getReader(); + if (reader.isLive()) { + return () -> ProgressStepExecutionListener.UNKNOWN_SIZE; + } ScanSizeEstimator estimator = new ScanSizeEstimator(reader.getClient()); estimator.setScanMatch(readerArgs.scanMatch); if (readerArgs.scanType != null) { estimator.setScanType(readerArgs.scanType.getString()); } - return estimator.getAsLong(); - } - - @Override - protected String taskName(StepBuilder step) { - return "Exporting"; - } - - @Override - protected Supplier extraMessage(StepBuilder step) { - return null; + return estimator; } } diff --git a/plugins/riot/src/main/java/com/redis/riot/cli/AbstractImportCommand.java b/plugins/riot/src/main/java/com/redis/riot/cli/AbstractImportCommand.java index 42d1be90e..f58b97a79 100644 --- a/plugins/riot/src/main/java/com/redis/riot/cli/AbstractImportCommand.java +++ b/plugins/riot/src/main/java/com/redis/riot/cli/AbstractImportCommand.java @@ -3,7 +3,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.function.Supplier; import java.util.stream.Collectors; import com.redis.riot.cli.operation.DelCommand; @@ -21,7 +20,7 @@ import com.redis.riot.cli.operation.XaddCommand; import com.redis.riot.cli.operation.ZaddCommand; import com.redis.riot.core.AbstractMapImport; -import com.redis.riot.core.StepBuilder; +import com.redis.riot.core.RiotStep; import com.redis.spring.batch.writer.WriteOperation; import picocli.CommandLine.ArgGroup; @@ -64,13 +63,8 @@ protected AbstractMapImport getJobExecutable() { protected abstract AbstractMapImport getMapImportExecutable(); @Override - protected String taskName(StepBuilder step) { + protected String taskName(RiotStep step) { return "Importing"; } - @Override - protected Supplier extraMessage(StepBuilder step) { - return null; - } - } diff --git a/plugins/riot/src/main/java/com/redis/riot/cli/AbstractJobCommand.java b/plugins/riot/src/main/java/com/redis/riot/cli/AbstractJobCommand.java index 3e8289fa4..57aff94cd 100644 --- a/plugins/riot/src/main/java/com/redis/riot/cli/AbstractJobCommand.java +++ b/plugins/riot/src/main/java/com/redis/riot/cli/AbstractJobCommand.java @@ -1,14 +1,18 @@ package com.redis.riot.cli; +import java.util.function.LongSupplier; import java.util.function.Supplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.batch.core.ItemWriteListener; +import org.springframework.batch.core.StepExecutionListener; +import org.springframework.util.ClassUtils; import com.redis.riot.cli.ProgressArgs.ProgressStyle; import com.redis.riot.core.AbstractJobRunnable; import com.redis.riot.core.EvaluationContextOptions; -import com.redis.riot.core.StepBuilder; +import com.redis.riot.core.RiotStep; import me.tongfei.progressbar.DelegatingProgressBarConsumer; import me.tongfei.progressbar.ProgressBarBuilder; @@ -30,7 +34,7 @@ protected AbstractJobRunnable executable() { AbstractJobRunnable executable = getJobExecutable(); executable.setStepOptions(stepArgs.stepOptions()); executable.setEvaluationContextOptions(evaluationContextOptions()); - executable.addStepConfigurationStrategy(this::configure); + executable.setStepConfigurer(this::configureStep); return executable; } @@ -38,12 +42,12 @@ protected EvaluationContextOptions evaluationContextOptions() { return evaluationContextArgs.evaluationContextOptions(); } - @SuppressWarnings("unchecked") - protected void configure(StepBuilder step) { + private void configureStep(RiotStep step) { if (progressArgs.style == ProgressStyle.NONE) { return; } ProgressBarBuilder progressBar = new ProgressBarBuilder(); + progressBar.setTaskName(taskName(step)); progressBar.setStyle(progressArgs.progressBarStyle()); progressBar.setUpdateIntervalMillis(progressArgs.updateInterval); progressBar.showSpeed(); @@ -51,22 +55,26 @@ protected void configure(StepBuilder step) { Logger logger = LoggerFactory.getLogger(getClass()); progressBar.setConsumer(new DelegatingProgressBarConsumer(logger::info)); } - progressBar.setInitialMax(size(step)); - progressBar.setTaskName(taskName(step)); - ProgressStepListener listener = new ProgressStepListener(progressBar); - Supplier extraMessage = extraMessage(step); - if (extraMessage != null) { - listener = listener.extraMessage(extraMessage); - } - step.addExecutionListener(listener); - step.addWriteListener(listener); + ProgressStepExecutionListener listener = new ProgressStepExecutionListener(progressBar); + listener.setExtraMessageSupplier(extraMessageSupplier(step)); + listener.setInitialMaxSupplier(initialMaxSupplier(step)); + step.setConfigurer(s -> { + s.listener((StepExecutionListener) listener); + s.listener((ItemWriteListener) listener); + }); } - protected abstract Supplier extraMessage(StepBuilder step); + protected String taskName(RiotStep step) { + return ClassUtils.getShortName(getClass()); + } - protected abstract String taskName(StepBuilder step); + protected Supplier extraMessageSupplier(RiotStep step) { + return () -> ProgressStepExecutionListener.EMPTY_STRING; + } - protected abstract long size(StepBuilder step); + protected LongSupplier initialMaxSupplier(RiotStep step) { + return () -> ProgressStepExecutionListener.UNKNOWN_SIZE; + } protected abstract AbstractJobRunnable getJobExecutable(); diff --git a/plugins/riot/src/main/java/com/redis/riot/cli/AbstractStructImportCommand.java b/plugins/riot/src/main/java/com/redis/riot/cli/AbstractStructImportCommand.java index 5d27e1b94..8f85e1f98 100644 --- a/plugins/riot/src/main/java/com/redis/riot/cli/AbstractStructImportCommand.java +++ b/plugins/riot/src/main/java/com/redis/riot/cli/AbstractStructImportCommand.java @@ -1,9 +1,6 @@ package com.redis.riot.cli; -import java.util.function.Supplier; - import com.redis.riot.core.AbstractStructImport; -import com.redis.riot.core.StepBuilder; import picocli.CommandLine.ArgGroup; @@ -21,9 +18,4 @@ protected AbstractStructImport getJobExecutable() { protected abstract AbstractStructImport getKeyValueImportExecutable(); - @Override - protected Supplier extraMessage(StepBuilder step) { - return null; - } - } diff --git a/plugins/riot/src/main/java/com/redis/riot/cli/DatabaseImportCommand.java b/plugins/riot/src/main/java/com/redis/riot/cli/DatabaseImportCommand.java index f47355ad6..190ddbfb4 100644 --- a/plugins/riot/src/main/java/com/redis/riot/cli/DatabaseImportCommand.java +++ b/plugins/riot/src/main/java/com/redis/riot/cli/DatabaseImportCommand.java @@ -1,9 +1,7 @@ package com.redis.riot.cli; import com.redis.riot.core.AbstractMapImport; -import com.redis.riot.core.StepBuilder; import com.redis.riot.db.DatabaseImport; -import com.redis.spring.batch.util.BatchUtils; import picocli.CommandLine.ArgGroup; import picocli.CommandLine.Command; @@ -29,9 +27,4 @@ protected AbstractMapImport getMapImportExecutable() { return executable; } - @Override - protected long size(StepBuilder step) { - return BatchUtils.SIZE_UNKNOWN; - } - } diff --git a/plugins/riot/src/main/java/com/redis/riot/cli/FakerImportCommand.java b/plugins/riot/src/main/java/com/redis/riot/cli/FakerImportCommand.java index f2b9a0da4..934d1c62e 100644 --- a/plugins/riot/src/main/java/com/redis/riot/cli/FakerImportCommand.java +++ b/plugins/riot/src/main/java/com/redis/riot/cli/FakerImportCommand.java @@ -2,10 +2,11 @@ import java.util.Locale; import java.util.Map; +import java.util.function.LongSupplier; import org.springframework.expression.Expression; -import com.redis.riot.core.StepBuilder; +import com.redis.riot.core.RiotStep; import com.redis.riot.faker.FakerImport; import picocli.CommandLine.Command; @@ -38,8 +39,8 @@ protected FakerImport getMapImportExecutable() { } @Override - protected long size(StepBuilder step) { - return count; + protected LongSupplier initialMaxSupplier(RiotStep step) { + return () -> count; } } diff --git a/plugins/riot/src/main/java/com/redis/riot/cli/FileDumpImportCommand.java b/plugins/riot/src/main/java/com/redis/riot/cli/FileDumpImportCommand.java index 85cbf1859..b73700bae 100644 --- a/plugins/riot/src/main/java/com/redis/riot/cli/FileDumpImportCommand.java +++ b/plugins/riot/src/main/java/com/redis/riot/cli/FileDumpImportCommand.java @@ -2,10 +2,9 @@ import java.util.List; -import com.redis.riot.core.StepBuilder; +import com.redis.riot.core.RiotStep; import com.redis.riot.file.FileDumpImport; import com.redis.riot.file.FileDumpType; -import com.redis.spring.batch.util.BatchUtils; import picocli.CommandLine.ArgGroup; import picocli.CommandLine.Command; @@ -34,13 +33,8 @@ protected FileDumpImport getKeyValueImportExecutable() { } @Override - protected String taskName(StepBuilder step) { + protected String taskName(RiotStep step) { return "Importing"; } - @Override - protected long size(StepBuilder step) { - return BatchUtils.SIZE_UNKNOWN; - } - } diff --git a/plugins/riot/src/main/java/com/redis/riot/cli/FileImportCommand.java b/plugins/riot/src/main/java/com/redis/riot/cli/FileImportCommand.java index 10c9472d8..cfd2bd7fb 100644 --- a/plugins/riot/src/main/java/com/redis/riot/cli/FileImportCommand.java +++ b/plugins/riot/src/main/java/com/redis/riot/cli/FileImportCommand.java @@ -6,10 +6,8 @@ import java.util.regex.Pattern; import com.redis.riot.core.AbstractMapImport; -import com.redis.riot.core.StepBuilder; import com.redis.riot.file.FileImport; import com.redis.riot.file.FileType; -import com.redis.spring.batch.util.BatchUtils; import picocli.CommandLine.ArgGroup; import picocli.CommandLine.Command; @@ -81,9 +79,4 @@ protected AbstractMapImport getMapImportExecutable() { return executable; } - @Override - protected long size(StepBuilder step) { - return BatchUtils.SIZE_UNKNOWN; - } - } diff --git a/plugins/riot/src/main/java/com/redis/riot/cli/GenerateCommand.java b/plugins/riot/src/main/java/com/redis/riot/cli/GenerateCommand.java index 2a2b49967..85a7bbc97 100644 --- a/plugins/riot/src/main/java/com/redis/riot/cli/GenerateCommand.java +++ b/plugins/riot/src/main/java/com/redis/riot/cli/GenerateCommand.java @@ -2,9 +2,10 @@ import java.time.Instant; import java.util.List; +import java.util.function.LongSupplier; import com.redis.riot.core.GeneratorImport; -import com.redis.riot.core.StepBuilder; +import com.redis.riot.core.RiotStep; import com.redis.spring.batch.common.DataType; import com.redis.spring.batch.common.Range; import com.redis.spring.batch.gen.CollectionOptions; @@ -21,6 +22,8 @@ @Command(name = "generate", description = "Generate data structures.") public class GenerateCommand extends AbstractStructImportCommand { + private static final String TASK_NAME = "Generating"; + @Option(names = "--count", description = "Number of items to generate (default: ${DEFAULT-VALUE}).", paramLabel = "") int count = GeneratorImport.DEFAULT_COUNT; @@ -88,13 +91,13 @@ public class GenerateCommand extends AbstractStructImportCommand { Range zsetScore = ZsetOptions.DEFAULT_SCORE; @Override - protected String taskName(StepBuilder step) { - return "Generating"; + protected String taskName(RiotStep step) { + return TASK_NAME; } @Override - protected long size(StepBuilder step) { - return count; + protected LongSupplier initialMaxSupplier(RiotStep step) { + return () -> count; } @Override @@ -127,7 +130,9 @@ private ZsetOptions zsetOptions() { private TimeSeriesOptions timeseriesOptions() { TimeSeriesOptions options = new TimeSeriesOptions(); options.setSampleCount(timeseriesSampleCount); - options.setStartTime(timeseriesStartTime); + if (timeseriesStartTime != null) { + options.setStartTime(timeseriesStartTime); + } return options; } diff --git a/plugins/riot/src/main/java/com/redis/riot/cli/KeyValueProcessorArgs.java b/plugins/riot/src/main/java/com/redis/riot/cli/KeyValueProcessorArgs.java index 048265956..6878fbaad 100644 --- a/plugins/riot/src/main/java/com/redis/riot/cli/KeyValueProcessorArgs.java +++ b/plugins/riot/src/main/java/com/redis/riot/cli/KeyValueProcessorArgs.java @@ -21,8 +21,8 @@ public class KeyValueProcessorArgs { @Option(names = "--no-ttl", description = "Do not propagate key TTLs.") boolean dropTtl; - @Option(names = "--no-stream-id", description = "Do not propagate stream message IDs.") - boolean dropStreamMessageId; + @Option(names = "--stream-ids", description = "Propagate stream message IDs from source to target.") + boolean streamMessageIds; public KeyValueProcessorOptions processorOptions() { KeyValueProcessorOptions options = new KeyValueProcessorOptions(); @@ -30,7 +30,7 @@ public KeyValueProcessorOptions processorOptions() { options.setTtlExpression(ttlExpression); options.setTypeExpression(typeExpression); options.setDropTtl(dropTtl); - options.setDropStreamMessageId(dropStreamMessageId); + options.setDropStreamMessageId(!streamMessageIds); return options; } diff --git a/plugins/riot/src/main/java/com/redis/riot/cli/LoggingMixin.java b/plugins/riot/src/main/java/com/redis/riot/cli/LoggingMixin.java index ca7608a5b..828cc23cf 100644 --- a/plugins/riot/src/main/java/com/redis/riot/cli/LoggingMixin.java +++ b/plugins/riot/src/main/java/com/redis/riot/cli/LoggingMixin.java @@ -33,7 +33,7 @@ public class LoggingMixin { @Spec(Target.MIXEE) private CommandSpec mixee; - Level level; + Level level = Level.WARN; String logFile; diff --git a/plugins/riot/src/main/java/com/redis/riot/cli/ProgressStepExecutionListener.java b/plugins/riot/src/main/java/com/redis/riot/cli/ProgressStepExecutionListener.java new file mode 100644 index 000000000..4dad70322 --- /dev/null +++ b/plugins/riot/src/main/java/com/redis/riot/cli/ProgressStepExecutionListener.java @@ -0,0 +1,75 @@ +package com.redis.riot.cli; + +import java.util.List; +import java.util.function.LongSupplier; +import java.util.function.Supplier; + +import org.springframework.batch.core.ExitStatus; +import org.springframework.batch.core.StepExecution; +import org.springframework.batch.core.StepExecutionListener; +import org.springframework.batch.core.listener.ItemListenerSupport; + +import me.tongfei.progressbar.ProgressBar; +import me.tongfei.progressbar.ProgressBarBuilder; + +/** + * Listener tracking writer or step progress with by a progress bar. + * + * @author Julien Ruaux + * @since 3.1.2 + */ +@SuppressWarnings("rawtypes") +public class ProgressStepExecutionListener extends ItemListenerSupport implements StepExecutionListener { + + public static final long UNKNOWN_SIZE = -1; + + public static final String EMPTY_STRING = ""; + + private final ProgressBarBuilder builder; + + private LongSupplier initialMaxSupplier = () -> UNKNOWN_SIZE; + + private Supplier extraMessageSupplier = () -> EMPTY_STRING; + + private ProgressBar progressBar; + + public ProgressStepExecutionListener(ProgressBarBuilder builder) { + this.builder = builder; + } + + public void setInitialMaxSupplier(LongSupplier supplier) { + this.initialMaxSupplier = supplier; + } + + public void setExtraMessageSupplier(Supplier supplier) { + this.extraMessageSupplier = supplier; + } + + @Override + public void beforeStep(StepExecution stepExecution) { + progressBar = builder.build(); + progressBar.maxHint(initialMaxSupplier.getAsLong()); + } + + @Override + public ExitStatus afterStep(StepExecution stepExecution) { + if (!stepExecution.getStatus().isUnsuccessful()) { + progressBar.stepTo(progressBar.getMax()); + } + progressBar.close(); + return stepExecution.getExitStatus(); + } + + @Override + public void afterWrite(List items) { + progressBar.stepBy(items.size()); + progressBar.setExtraMessage(extraMessageSupplier.get()); + } + + @Override + public void onWriteError(Exception exception, List items) { + progressBar.stepBy(items.size()); + + } + +} diff --git a/plugins/riot/src/main/java/com/redis/riot/cli/ProgressStepListener.java b/plugins/riot/src/main/java/com/redis/riot/cli/ProgressStepListener.java deleted file mode 100644 index 3e0220886..000000000 --- a/plugins/riot/src/main/java/com/redis/riot/cli/ProgressStepListener.java +++ /dev/null @@ -1,84 +0,0 @@ -package com.redis.riot.cli; - -import java.util.List; -import java.util.function.Supplier; - -import org.springframework.batch.core.ExitStatus; -import org.springframework.batch.core.StepExecution; -import org.springframework.batch.core.listener.StepListenerSupport; - -import me.tongfei.progressbar.ProgressBar; -import me.tongfei.progressbar.ProgressBarBuilder; - -/** - * Listener tracking writer or step progress with by a progress bar. - * - * @author Julien Ruaux - * @since 3.1.2 - */ -@SuppressWarnings("rawtypes") -public class ProgressStepListener extends StepListenerSupport { - - private final ProgressBarBuilder progressBarBuilder; - - protected ProgressBar progressBar; - - public ProgressStepListener(ProgressBarBuilder progressBarBuilder) { - this.progressBarBuilder = progressBarBuilder; - } - - public ProgressBarBuilder getProgressBarBuilder() { - return progressBarBuilder; - } - - @Override - public void beforeStep(StepExecution stepExecution) { - progressBar = progressBarBuilder.build(); - } - - @Override - public ExitStatus afterStep(StepExecution stepExecution) { - if (!stepExecution.getStatus().isUnsuccessful()) { - progressBar.stepTo(progressBar.getMax()); - } - progressBar.close(); - return stepExecution.getExitStatus(); - } - - @Override - public void beforeWrite(List items) { - // Do nothing - } - - @Override - public void afterWrite(List items) { - progressBar.stepBy(items.size()); - } - - @Override - public void onWriteError(Exception exception, List items) { - progressBar.stepBy(items.size()); - } - - public ExtraMessageProgressStepListener extraMessage(Supplier extraMessage) { - return new ExtraMessageProgressStepListener(progressBarBuilder, extraMessage); - } - - private static class ExtraMessageProgressStepListener extends ProgressStepListener { - - private final Supplier extraMessage; - - public ExtraMessageProgressStepListener(ProgressBarBuilder progressBarBuilder, Supplier extraMessage) { - super(progressBarBuilder); - this.extraMessage = extraMessage; - } - - @Override - public void afterWrite(List items) { - progressBar.setExtraMessage(extraMessage.get()); - super.afterWrite(items); - } - - } - -} diff --git a/plugins/riot/src/main/java/com/redis/riot/cli/ReplicateCommand.java b/plugins/riot/src/main/java/com/redis/riot/cli/ReplicateCommand.java index 8c191b55e..75f421fd2 100644 --- a/plugins/riot/src/main/java/com/redis/riot/cli/ReplicateCommand.java +++ b/plugins/riot/src/main/java/com/redis/riot/cli/ReplicateCommand.java @@ -1,5 +1,7 @@ package com.redis.riot.cli; +import java.util.HashMap; +import java.util.Map; import java.util.function.Supplier; import com.redis.riot.cli.RedisReaderArgs.ReadFromEnum; @@ -8,11 +10,10 @@ import com.redis.riot.core.Replication; import com.redis.riot.core.ReplicationMode; import com.redis.riot.core.ReplicationType; -import com.redis.riot.core.StepBuilder; +import com.redis.riot.core.RiotStep; import com.redis.spring.batch.RedisItemReader; import com.redis.spring.batch.common.KeyComparison.Status; import com.redis.spring.batch.reader.KeyspaceNotificationItemReader; -import com.redis.spring.batch.util.BatchUtils; import picocli.CommandLine.ArgGroup; import picocli.CommandLine.Command; @@ -23,11 +24,13 @@ public class ReplicateCommand extends AbstractExportCommand { private static final Status[] STATUSES = { Status.OK, Status.MISSING, Status.TYPE, Status.VALUE, Status.TTL }; - private static final String QUEUE_MESSAGE = " | %,d queue capacity"; + private static final String QUEUE_MESSAGE = " | %,d queue space"; private static final String NUMBER_FORMAT = "%,d"; - private static final String COMPARE_MESSAGE = " | %s: %s"; + private static final String COMPARE_MESSAGE = compareMessageFormat(); + + private static final Map taskNames = taskNames(); @Option(names = "--mode", description = "Replication mode: ${COMPLETION-CANDIDATES} (default: ${DEFAULT-VALUE}).", paramLabel = "") ReplicationMode mode = ReplicationMode.SNAPSHOT; @@ -47,6 +50,14 @@ public class ReplicateCommand extends AbstractExportCommand { @ArgGroup(exclusive = false, heading = "Compare options%n") ReplicationCompareArgs compareArgs = new ReplicationCompareArgs(); + private static Map taskNames() { + Map map = new HashMap<>(); + map.put(Replication.STEP_SCAN, "Scanning"); + map.put(Replication.STEP_LIVE, "Listening"); + map.put(Replication.STEP_COMPARE, "Comparing"); + return map; + } + @Override protected AbstractExport getExport() { Replication replication = new Replication(parent.out); @@ -62,68 +73,43 @@ protected AbstractExport getExport() { } @Override - protected String taskName(StepBuilder step) { - switch (step.getName()) { - case Replication.STEP_SCAN: - return "Scanning"; - case Replication.STEP_LIVE: - return "Listening"; - case Replication.STEP_COMPARE: - return "Comparing"; - default: - return "Unknown"; - } + protected String taskName(RiotStep step) { + return taskNames.getOrDefault(step.getName(), "Unknown"); } - @Override - protected long size(StepBuilder step) { - if (Replication.STEP_LIVE.equals(step.getName())) { - return BatchUtils.SIZE_UNKNOWN; + private static String compareMessageFormat() { + StringBuilder builder = new StringBuilder(); + for (Status status : STATUSES) { + builder.append(String.format(" | %s: %s", status.name().toLowerCase(), NUMBER_FORMAT)); } - return super.size(step); + return builder.toString(); } @Override - protected Supplier extraMessage(StepBuilder step) { + protected Supplier extraMessageSupplier(RiotStep step) { switch (step.getName()) { case Replication.STEP_COMPARE: - return compareMessage(step); + return () -> compareExtraMessage(step); case Replication.STEP_LIVE: - return liveMessage(step); + RedisItemReader reader = (RedisItemReader) step.getReader(); + return () -> liveExtraMessage(reader); default: - return super.extraMessage(step); + return super.extraMessageSupplier(step); } } - private Supplier liveMessage(StepBuilder step) { - RedisItemReader reader = (RedisItemReader) step.getReader(); - return () -> liveMessage(reader); + private String compareExtraMessage(RiotStep step) { + KeyComparisonStatusCountItemWriter writer = (KeyComparisonStatusCountItemWriter) step.getWriter(); + return String.format(COMPARE_MESSAGE, writer.getCounts(STATUSES).toArray()); } - private String liveMessage(RedisItemReader reader) { + private String liveExtraMessage(RedisItemReader reader) { KeyspaceNotificationItemReader keyReader = (KeyspaceNotificationItemReader) reader.getKeyReader(); if (keyReader == null) { - return ""; + return ProgressStepExecutionListener.EMPTY_STRING; } return String.format(QUEUE_MESSAGE, keyReader.getQueue().remainingCapacity()); } - private Supplier compareMessage(StepBuilder step) { - String format = compareFormat(); - return () -> String.format(format, counts(step)); - } - - private String compareFormat() { - StringBuilder builder = new StringBuilder(); - for (Status status : STATUSES) { - builder.append(String.format(COMPARE_MESSAGE, status.name().toLowerCase(), NUMBER_FORMAT)); - } - return builder.toString(); - } - - private Object[] counts(StepBuilder step) { - return ((KeyComparisonStatusCountItemWriter) step.getWriter()).getCounts(STATUSES); - } - } diff --git a/plugins/riot/src/main/java/com/redis/riot/cli/ReplicationCompareArgs.java b/plugins/riot/src/main/java/com/redis/riot/cli/ReplicationCompareArgs.java index 6e2d50b74..43f7a6896 100644 --- a/plugins/riot/src/main/java/com/redis/riot/cli/ReplicationCompareArgs.java +++ b/plugins/riot/src/main/java/com/redis/riot/cli/ReplicationCompareArgs.java @@ -4,7 +4,7 @@ import com.redis.riot.core.KeyComparisonMode; import com.redis.riot.core.KeyComparisonOptions; -import com.redis.spring.batch.common.KeyComparisonItemReader; +import com.redis.spring.batch.reader.KeyComparisonValueReader; import picocli.CommandLine.Option; @@ -14,7 +14,7 @@ public class ReplicationCompareArgs { boolean noVerify; @Option(names = "--ttl-tolerance", description = "Max TTL offset in millis to use for dataset verification (default: ${DEFAULT-VALUE}).", paramLabel = "") - long ttlTolerance = KeyComparisonItemReader.DEFAULT_TTL_TOLERANCE.toMillis(); + long ttlTolerance = KeyComparisonValueReader.DEFAULT_TTL_TOLERANCE.toMillis(); @Option(names = "--show-diffs", description = "Print details of key mismatches during dataset verification. Disables progress reporting.") boolean showDiffs; diff --git a/plugins/riot/src/test/java/com/redis/riot/cli/AbstractRiotTests.java b/plugins/riot/src/test/java/com/redis/riot/cli/AbstractRiotTests.java deleted file mode 100644 index 8c19be1cd..000000000 --- a/plugins/riot/src/test/java/com/redis/riot/cli/AbstractRiotTests.java +++ /dev/null @@ -1,245 +0,0 @@ -package com.redis.riot.cli; - -import java.io.InputStream; -import java.io.PrintWriter; -import java.nio.charset.Charset; -import java.time.Duration; -import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.function.Supplier; - -import org.awaitility.Awaitility; -import org.codehaus.plexus.util.cli.CommandLineUtils; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.TestInstance; -import org.junit.jupiter.api.TestInstance.Lifecycle; -import org.slf4j.event.Level; -import org.slf4j.impl.SimpleLogger; -import org.springframework.batch.core.JobExecution; -import org.springframework.batch.core.JobExecutionException; -import org.springframework.batch.core.JobParameters; -import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; -import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; -import org.springframework.batch.core.launch.support.SimpleJobLauncher; -import org.springframework.batch.core.repository.JobRepository; -import org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean; -import org.springframework.batch.core.step.builder.SimpleStepBuilder; -import org.springframework.batch.item.ItemReader; -import org.springframework.batch.item.ItemWriter; -import org.springframework.batch.support.transaction.ResourcelessTransactionManager; -import org.springframework.core.task.SyncTaskExecutor; -import org.springframework.transaction.PlatformTransactionManager; - -import com.redis.lettucemod.RedisModulesClient; -import com.redis.lettucemod.api.StatefulRedisModulesConnection; -import com.redis.lettucemod.api.sync.RedisModulesCommands; -import com.redis.lettucemod.cluster.RedisModulesClusterClient; -import com.redis.lettucemod.util.RedisModulesUtils; -import com.redis.riot.cli.ProgressArgs.ProgressStyle; -import com.redis.riot.cli.operation.OperationCommand; -import com.redis.spring.batch.common.DataType; -import com.redis.spring.batch.common.Range; -import com.redis.spring.batch.gen.GeneratorItemReader; -import com.redis.spring.batch.reader.StructItemReader; -import com.redis.spring.batch.writer.StructItemWriter; -import com.redis.testcontainers.RedisServer; - -import io.lettuce.core.AbstractRedisClient; -import io.lettuce.core.RedisURI; -import io.lettuce.core.codec.StringCodec; -import io.micrometer.core.instrument.util.IOUtils; -import picocli.CommandLine.ExitCode; -import picocli.CommandLine.IExecutionStrategy; -import picocli.CommandLine.ParseResult; - -@SuppressWarnings("deprecation") -@TestInstance(Lifecycle.PER_CLASS) -public abstract class AbstractRiotTests { - - private static final String PREFIX = "riot "; - - private static final Duration DEFAULT_AWAIT_TIMEOUT = Duration.ofSeconds(1); - - private static final Range DEFAULT_GENERATOR_KEY_RANGE = Range.to(10000); - - protected static final int DEFAULT_BATCH_SIZE = 50; - - private static final int DEFAULT_GENERATOR_COUNT = 100; - - protected JobRepository jobRepository; - - private JobBuilderFactory jobBuilderFactory; - - private SimpleJobLauncher jobLauncher; - - private StepBuilderFactory stepBuilderFactory; - - protected AbstractRedisClient client; - - protected StatefulRedisModulesConnection connection; - - private PlatformTransactionManager transactionManager; - - protected abstract RedisServer getRedisServer(); - - protected static void assertExecutionSuccessful(int exitCode) { - Assertions.assertEquals(0, exitCode); - } - - @BeforeAll - void setup() { - System.setProperty(SimpleLogger.DEFAULT_LOG_LEVEL_KEY, Level.DEBUG.name()); - RedisServer redis = getRedisServer(); - redis.start(); - client = client(redis); - connection = RedisModulesUtils.connection(client); - } - - @AfterAll - void teardown() { - connection.close(); - client.shutdown(); - client.getResources().shutdown(); - getRedisServer().close(); - } - - @BeforeEach - void setupTest() throws Exception { - MapJobRepositoryFactoryBean bean = new MapJobRepositoryFactoryBean(); - bean.afterPropertiesSet(); - jobRepository = bean.getObject(); - transactionManager = bean.getTransactionManager(); - jobBuilderFactory = new JobBuilderFactory(jobRepository); - jobLauncher = new SimpleJobLauncher(); - jobLauncher.setJobRepository(jobRepository); - jobLauncher.setTaskExecutor(new SyncTaskExecutor()); - stepBuilderFactory = new StepBuilderFactory(jobRepository, new ResourcelessTransactionManager()); - connection.sync().flushall(); - RedisModulesCommands sync = connection.sync(); - awaitEquals(() -> 0l, sync::dbsize); - } - - protected T command(ParseResult parseResult) { - return parseResult.subcommands().get(0).commandSpec().commandLine().getCommand(); - } - - protected void awaitUntilFalse(Callable conditionEvaluator) { - awaitUntil(() -> !conditionEvaluator.call()); - } - - protected void awaitUntil(Callable conditionEvaluator) { - Awaitility.await().timeout(DEFAULT_AWAIT_TIMEOUT).until(conditionEvaluator); - } - - protected void awaitEquals(Supplier expected, Supplier actual) { - awaitUntil(() -> expected.get().equals(actual.get())); - } - - protected AbstractRedisClient client(RedisServer server) { - RedisURI uri = RedisURI.create(server.getRedisURI()); - if (server.isCluster()) { - return RedisModulesClusterClient.create(uri); - } - return RedisModulesClient.create(uri); - } - - protected int execute(String filename, IExecutionStrategy... executionStrategies) throws Exception { - String[] args = args(filename); - return Main.run(new PrintWriter(System.out), new PrintWriter(System.err), args, executionStrategy(executionStrategies)); - } - - private IExecutionStrategy executionStrategy(IExecutionStrategy... executionStrategies) { - CompositeExecutionStrategy strategy = new CompositeExecutionStrategy(); - strategy.addDelegates(this::execute); - strategy.addDelegates(executionStrategies); - return strategy; - } - - private int execute(ParseResult parseResult) { - Main main = (Main) parseResult.commandSpec().commandLine().getCommand(); - main.redisArgs.uriArgs.uri = getRedisServer().getRedisURI(); - main.redisArgs.cluster = getRedisServer().isCluster(); - for (ParseResult subParseResult : parseResult.subcommands()) { - Object command = subParseResult.commandSpec().commandLine().getCommand(); - if (command instanceof OperationCommand) { - command = subParseResult.commandSpec().parent().commandLine().getCommand(); - } - configureCommand(command); - } - return ExitCode.OK; - } - - protected void configureCommand(Object command) { - if (command instanceof AbstractJobCommand) { - AbstractJobCommand jobCommand = ((AbstractJobCommand) command); - jobCommand.progressArgs.style = ProgressStyle.NONE; - } - } - - private static String[] args(String filename) throws Exception { - try (InputStream inputStream = Main.class.getResourceAsStream("/" + filename)) { - String command = IOUtils.toString(inputStream, Charset.defaultCharset()); - if (command.startsWith(PREFIX)) { - command = command.substring(PREFIX.length()); - } - return CommandLineUtils.translateCommandline(command); - } - } - - protected GeneratorItemReader generator() { - return generator(DEFAULT_GENERATOR_COUNT); - } - - protected GeneratorItemReader generator(int count) { - GeneratorItemReader generator = new GeneratorItemReader(); - generator.setKeyRange(DEFAULT_GENERATOR_KEY_RANGE); - generator.setTypes(DataType.ZSET); - generator.setMaxItemCount(count); - return generator; - } - - protected void generate(String name) throws JobExecutionException { - generate(name, DEFAULT_BATCH_SIZE, generator()); - } - - protected void generate(String name, GeneratorItemReader reader) throws JobExecutionException { - generate(name, DEFAULT_BATCH_SIZE, reader); - } - - protected void generate(String name, int chunkSize, GeneratorItemReader reader) throws JobExecutionException { - run(name + "-generate", chunkSize, reader, structWriter(client)); - awaitUntilFalse(reader::isOpen); - } - - protected StructItemWriter structWriter(AbstractRedisClient client) { - return new StructItemWriter<>(client, StringCodec.UTF8); - } - - protected StructItemReader structReader(AbstractRedisClient client) { - StructItemReader reader = new StructItemReader<>(client, StringCodec.UTF8); - reader.setJobRepository(jobRepository); - reader.setTransactionManager(transactionManager); - return reader; - } - - protected JobExecution run(String name, int chunkSize, ItemReader reader, ItemWriter writer) - throws JobExecutionException { - SimpleStepBuilder step = step(name, chunkSize, reader, writer); - return jobLauncher.run(jobBuilderFactory.get(name).start(step.build()).build(), new JobParameters()); - } - - protected String id() { - return UUID.randomUUID().toString(); - } - - protected SimpleStepBuilder step(String name, int chunkSize, ItemReader reader, ItemWriter writer) { - SimpleStepBuilder step = stepBuilderFactory.get(name).chunk(chunkSize); - step.reader(reader); - step.writer(writer); - return step; - } - -} diff --git a/plugins/riot/src/test/java/com/redis/riot/cli/DatabaseTests.java b/plugins/riot/src/test/java/com/redis/riot/cli/DatabaseTests.java index 385ef9f67..b29724fbd 100644 --- a/plugins/riot/src/test/java/com/redis/riot/cli/DatabaseTests.java +++ b/plugins/riot/src/test/java/com/redis/riot/cli/DatabaseTests.java @@ -14,13 +14,15 @@ import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties; import org.testcontainers.containers.JdbcDatabaseContainer; +import com.redis.spring.batch.common.DataType; +import com.redis.spring.batch.test.AbstractTestBase; import com.redis.testcontainers.RedisServer; import com.redis.testcontainers.RedisStackContainer; import picocli.CommandLine.ExitCode; import picocli.CommandLine.ParseResult; -abstract class DatabaseTests extends AbstractRiotTests { +abstract class DatabaseTests extends RiotTests { private static final RedisStackContainer REDIS = new RedisStackContainer( RedisStackContainer.DEFAULT_IMAGE_NAME.withTag(RedisStackContainer.DEFAULT_TAG)); @@ -31,6 +33,11 @@ abstract class DatabaseTests extends AbstractRiotTests { protected DataSource dataSource; + @Override + protected DataType[] generatorDataTypes() { + return AbstractTestBase.REDIS_MODULES_GENERATOR_TYPES; + } + @BeforeAll public void setupContainers() throws SQLException { JdbcDatabaseContainer container = getJdbcDatabaseContainer(); @@ -54,6 +61,11 @@ protected RedisServer getRedisServer() { return REDIS; } + @Override + protected RedisServer getTargetRedisServer() { + return REDIS; + } + protected void executeScript(String file) throws IOException, SQLException { SqlScriptRunner scriptRunner = new SqlScriptRunner(databaseConnection); scriptRunner.setAutoCommit(false); @@ -70,7 +82,7 @@ protected int executeDatabaseImport(ParseResult parseResult) { configureDatabase(command.args); return ExitCode.OK; } - + protected int executeDatabaseExport(ParseResult parseResult) { DatabaseExportCommand command = command(parseResult); configureDatabase(command.args); diff --git a/plugins/riot/src/test/java/com/redis/riot/cli/EnterpriseToStackTests.java b/plugins/riot/src/test/java/com/redis/riot/cli/EnterpriseToStackTests.java new file mode 100644 index 000000000..df5809dc4 --- /dev/null +++ b/plugins/riot/src/test/java/com/redis/riot/cli/EnterpriseToStackTests.java @@ -0,0 +1,32 @@ +package com.redis.riot.cli; + +import org.junit.jupiter.api.condition.EnabledOnOs; +import org.junit.jupiter.api.condition.OS; + +import com.redis.spring.batch.common.DataType; +import com.redis.spring.batch.test.AbstractTestBase; +import com.redis.testcontainers.RedisServer; + +@EnabledOnOs(OS.LINUX) +class EnterpriseToStackTests extends IntegrationTests { + + private static final RedisServer SOURCE = RedisContainerFactory.enterprise(); + + private static final RedisServer TARGET = RedisContainerFactory.stack(); + + @Override + protected RedisServer getRedisServer() { + return SOURCE; + } + + @Override + protected RedisServer getTargetRedisServer() { + return TARGET; + } + + @Override + protected DataType[] generatorDataTypes() { + return AbstractTestBase.REDIS_MODULES_GENERATOR_TYPES; + } + +} diff --git a/plugins/riot/src/test/java/com/redis/riot/cli/AbstractIntegrationTests.java b/plugins/riot/src/test/java/com/redis/riot/cli/IntegrationTests.java similarity index 50% rename from plugins/riot/src/test/java/com/redis/riot/cli/AbstractIntegrationTests.java rename to plugins/riot/src/test/java/com/redis/riot/cli/IntegrationTests.java index f1e903551..7427af2b1 100644 --- a/plugins/riot/src/test/java/com/redis/riot/cli/AbstractIntegrationTests.java +++ b/plugins/riot/src/test/java/com/redis/riot/cli/IntegrationTests.java @@ -4,42 +4,29 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; -import java.time.Duration; import java.time.Instant; -import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.zip.GZIPInputStream; -import org.awaitility.Awaitility; -import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.batch.core.JobExecutionException; +import org.junit.jupiter.api.TestInfo; import org.springframework.batch.item.ExecutionContext; import org.springframework.batch.item.json.JacksonJsonObjectReader; import org.springframework.batch.item.json.JsonItemReader; import org.springframework.batch.item.json.builder.JsonItemReaderBuilder; -import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader; import org.springframework.core.io.FileSystemResource; import org.springframework.core.io.InputStreamResource; import org.springframework.util.FileCopyUtils; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.xml.XmlMapper; -import com.redis.lettucemod.api.StatefulRedisModulesConnection; -import com.redis.lettucemod.api.sync.RedisModulesCommands; import com.redis.lettucemod.search.CreateOptions; import com.redis.lettucemod.search.Document; import com.redis.lettucemod.search.Field; @@ -50,50 +37,26 @@ import com.redis.lettucemod.timeseries.MRangeOptions; import com.redis.lettucemod.timeseries.RangeResult; import com.redis.lettucemod.timeseries.TimeRange; -import com.redis.lettucemod.util.RedisModulesUtils; import com.redis.riot.core.GeneratorImport; -import com.redis.riot.core.ReplicationMode; import com.redis.riot.file.resource.XmlItemReader; import com.redis.riot.file.resource.XmlItemReaderBuilder; import com.redis.riot.file.resource.XmlObjectReader; -import com.redis.spring.batch.common.DataType; -import com.redis.spring.batch.common.KeyComparison; -import com.redis.spring.batch.common.KeyComparison.Status; -import com.redis.spring.batch.common.KeyComparisonItemReader; import com.redis.spring.batch.common.KeyValue; import com.redis.spring.batch.gen.GeneratorItemReader; -import com.redis.testcontainers.RedisServer; -import io.lettuce.core.AbstractRedisClient; import io.lettuce.core.GeoArgs; import io.lettuce.core.Range; import io.lettuce.core.StreamMessage; -import io.lettuce.core.api.sync.RedisGeoCommands; -import io.lettuce.core.api.sync.RedisHashCommands; -import io.lettuce.core.api.sync.RedisKeyCommands; -import io.lettuce.core.api.sync.RedisServerCommands; -import io.lettuce.core.api.sync.RedisSetCommands; -import io.lettuce.core.api.sync.RedisSortedSetCommands; -import io.lettuce.core.api.sync.RedisStreamCommands; -import io.lettuce.core.cluster.SlotHash; import picocli.CommandLine.ExitCode; import picocli.CommandLine.ParseResult; @SuppressWarnings("unchecked") -public abstract class AbstractIntegrationTests extends AbstractRiotTests { - - private final Logger log = LoggerFactory.getLogger(getClass()); +public abstract class IntegrationTests extends RiotTests { public static final int BEER_CSV_COUNT = 2410; public static final int BEER_JSON_COUNT = 216; - private static final Duration DEFAULT_IDLE_TIMEOUT = Duration.ofSeconds(1); - - private static final Duration COMPARE_TIMEOUT = Duration.ofSeconds(3); - - private static final int DEFAULT_NOTIFICATION_QUEUE_CAPACITY = 100000; - private static Path tempDir; protected static String name(Map beer) { @@ -115,54 +78,11 @@ public void setupFiles() throws IOException { protected List testImport(String filename, String pattern, int count) throws Exception { execute(filename); - RedisKeyCommands sync = connection.sync(); - List keys = sync.keys(pattern); + List keys = commands.keys(pattern); Assertions.assertEquals(count, keys.size()); return keys; } - private AbstractRedisClient targetClient; - - private StatefulRedisModulesConnection targetConnection; - - @BeforeAll - void setupTarget() { - RedisServer target = getTargetRedisServer(); - target.start(); - targetClient = client(target); - targetConnection = RedisModulesUtils.connection(targetClient); - } - - @AfterAll - void teardownTarget() { - targetConnection.close(); - targetClient.shutdown(); - targetClient.getResources().shutdown(); - getTargetRedisServer().close(); - } - - protected abstract RedisServer getTargetRedisServer(); - - @BeforeEach - void flushAllTarget() throws InterruptedException { - targetConnection.sync().flushall(); - RedisModulesCommands sync = targetConnection.sync(); - awaitEquals(() -> 0L, sync::dbsize); - } - - @Override - protected void configureCommand(Object command) { - super.configureCommand(command); - if (command instanceof ReplicateCommand) { - ReplicateCommand replicationCommand = (ReplicateCommand) command; - replicationCommand.targetRedisClientArgs.uriArgs.uri = getTargetRedisServer().getRedisURI(); - if (replicationCommand.mode == ReplicationMode.LIVE || replicationCommand.mode == ReplicationMode.LIVEONLY) { - replicationCommand.readerArgs.setIdleTimeout(DEFAULT_IDLE_TIMEOUT.toMillis()); - replicationCommand.readerArgs.setNotificationQueueCapacity(DEFAULT_NOTIFICATION_QUEUE_CAPACITY); - } - } - } - private String replace(String file) { return file.replace("/tmp", tempDir.toString()); } @@ -175,22 +95,10 @@ protected Path tempFile(String filename) throws IOException { return path; } - protected List readAll(AbstractItemCountingItemStreamItemReader reader) throws Exception { - reader.open(new ExecutionContext()); - List records = new ArrayList<>(); - T record; - while ((record = reader.read()) != null) { - records.add(record); - } - reader.close(); - return records; - } - @Test void fileImportFW() throws Exception { testImport("file-import-fw", "account:*", 5); - RedisHashCommands hash = connection.sync(); - Map account101 = hash.hgetall("account:101"); + Map account101 = commands.hgetall("account:101"); // Account LastName FirstName Balance CreditLimit AccountCreated Rating // 101 Reeves Keanu 9315.45 10000.00 1/17/1998 A Assertions.assertEquals("Reeves", account101.get("LastName")); @@ -231,8 +139,7 @@ void fileImportType() throws Exception { @Test void fileImportExclude() throws Exception { execute("file-import-exclude"); - RedisHashCommands sync = connection.sync(); - Map beer1036 = sync.hgetall("beer:1036"); + Map beer1036 = commands.hgetall("beer:1036"); Assertions.assertEquals("Lower De Boom", name(beer1036)); Assertions.assertEquals("American Barleywine", style(beer1036)); Assertions.assertEquals("368", beer1036.get("brewery_id")); @@ -243,8 +150,7 @@ void fileImportExclude() throws Exception { @Test void fileImportInclude() throws Exception { execute("file-import-include"); - RedisHashCommands sync = connection.sync(); - Map beer1036 = sync.hgetall("beer:1036"); + Map beer1036 = commands.hgetall("beer:1036"); Assertions.assertEquals(3, beer1036.size()); Assertions.assertEquals("Lower De Boom", name(beer1036)); Assertions.assertEquals("American Barleywine", style(beer1036)); @@ -259,8 +165,7 @@ void fileImportFilter() throws Exception { @Test void fileImportRegex() throws Exception { execute("file-import-regex"); - RedisHashCommands sync = connection.sync(); - Map airport1 = sync.hgetall("airport:1"); + Map airport1 = commands.hgetall("airport:1"); Assertions.assertEquals("Pacific", airport1.get("region")); Assertions.assertEquals("Port_Moresby", airport1.get("city")); } @@ -268,8 +173,7 @@ void fileImportRegex() throws Exception { @Test void fileImportGlob() throws Exception { execute("file-import-glob", this::executeImportGlob); - RedisKeyCommands sync = connection.sync(); - List keys = sync.keys("beer:*"); + List keys = commands.keys("beer:*"); Assertions.assertEquals(BEER_CSV_COUNT, keys.size()); } @@ -293,8 +197,7 @@ private int executeImportGlob(ParseResult parseResult) { @Test void fileImportGeoadd() throws Exception { execute("file-import-geoadd"); - RedisGeoCommands sync = connection.sync(); - Set results = sync.georadius("airportgeo", -21, 64, 200, GeoArgs.Unit.mi); + Set results = commands.georadius("airportgeo", -21, 64, 200, GeoArgs.Unit.mi); Assertions.assertTrue(results.contains("18")); Assertions.assertTrue(results.contains("19")); Assertions.assertTrue(results.contains("11")); @@ -303,16 +206,14 @@ void fileImportGeoadd() throws Exception { @Test void fileImportGeoProcessor() throws Exception { execute("file-import-geo-processor"); - RedisHashCommands sync = connection.sync(); - Map airport3469 = sync.hgetall("airport:18"); + Map airport3469 = commands.hgetall("airport:18"); Assertions.assertEquals("-21.9405994415,64.1299972534", airport3469.get("location")); } @Test void fileImportProcess() throws Exception { testImport("file-import-process", "event:*", 568); - RedisHashCommands hash = connection.sync(); - Map event = hash.hgetall("event:248206"); + Map event = commands.hgetall("event:248206"); Instant date = Instant.ofEpochMilli(Long.parseLong(event.get("EpochStart"))); Assertions.assertTrue(date.isBefore(Instant.now())); } @@ -320,23 +221,21 @@ void fileImportProcess() throws Exception { @Test void fileImportProcessElvis() throws Exception { testImport("file-import-process-elvis", "beer:*", BEER_CSV_COUNT); - Map beer1436 = connection.sync().hgetall("beer:1436"); + Map beer1436 = commands.hgetall("beer:1436"); Assertions.assertEquals("10", beer1436.get("ibu")); } @Test void fileImportMultiCommands() throws Exception { execute("file-import-multi-commands"); - RedisKeyCommands sync = connection.sync(); - List beers = sync.keys("beer:*"); + List beers = commands.keys("beer:*"); Assertions.assertEquals(BEER_CSV_COUNT, beers.size()); for (String beer : beers) { - Map hash = ((RedisHashCommands) sync).hgetall(beer); + Map hash = commands.hgetall(beer); Assertions.assertTrue(hash.containsKey("name")); Assertions.assertTrue(hash.containsKey("brewery_id")); } - RedisSetCommands set = connection.sync(); - Set breweries = set.smembers("breweries"); + Set breweries = commands.smembers("breweries"); Assertions.assertEquals(558, breweries.size()); } @@ -348,25 +247,24 @@ void fileImportBad() throws Exception { @Test void fileImportGCS() throws Exception { testImport("file-import-gcs", "beer:*", 4432); - Map beer1 = connection.sync().hgetall("beer:1"); + Map beer1 = commands.hgetall("beer:1"); Assertions.assertEquals("Hocus Pocus", name(beer1)); } @Test void fileImportS3() throws Exception { testImport("file-import-s3", "beer:*", 4432); - Map beer1 = connection.sync().hgetall("beer:1"); + Map beer1 = commands.hgetall("beer:1"); Assertions.assertEquals("Hocus Pocus", name(beer1)); } @SuppressWarnings("rawtypes") @Test - void fileDumpImport() throws Exception { - List records = exportToJsonFile(); - RedisServerCommands sync = connection.sync(); - sync.flushall(); + void fileDumpImport(TestInfo info) throws Exception { + List records = exportToJsonFile(info); + commands.flushall(); execute("dump-import", this::executeFileDumpImport); - awaitEquals(records::size, () -> Math.toIntExact(sync.dbsize())); + awaitUntil(() -> records.size() == Math.toIntExact(commands.dbsize())); } private int executeFileDumpImport(ParseResult parseResult) { @@ -385,9 +283,8 @@ private int executeFileDumpExport(ParseResult parseResult) { @Disabled("Needs update") void fileImportJSONElastic() throws Exception { execute("file-import-json-elastic"); - RedisKeyCommands sync = connection.sync(); - Assertions.assertEquals(2, sync.keys("estest:*").size()); - Map doc1 = ((RedisHashCommands) sync).hgetall("estest:doc1"); + Assertions.assertEquals(2, commands.keys("estest:*").size()); + Map doc1 = commands.hgetall("estest:doc1"); Assertions.assertEquals("ruan", doc1.get("_source.name")); Assertions.assertEquals("3", doc1.get("_source.articles[1]")); } @@ -395,23 +292,22 @@ void fileImportJSONElastic() throws Exception { @Test void fileImportJSON() throws Exception { testImport("file-import-json", "beer:*", BEER_JSON_COUNT); - Map beer1 = connection.sync().hgetall("beer:1"); + Map beer1 = commands.hgetall("beer:1"); Assertions.assertEquals("Hocus Pocus", beer1.get("name")); } @Test void fileImportXML() throws Exception { testImport("file-import-xml", "trade:*", 3); - Map trade1 = connection.sync().hgetall("trade:1"); + Map trade1 = commands.hgetall("trade:1"); Assertions.assertEquals("XYZ0001", trade1.get("isin")); } @SuppressWarnings("rawtypes") @Test - void fileExportJSON() throws Exception { - List records = exportToJsonFile(); - RedisServerCommands sync = connection.sync(); - Assertions.assertEquals(sync.dbsize(), records.size()); + void fileExportJSON(TestInfo info) throws Exception { + List records = exportToJsonFile(info); + Assertions.assertEquals(commands.dbsize(), records.size()); } @SuppressWarnings("rawtypes") @@ -429,16 +325,21 @@ void fileExportJSONGz() throws Exception { objectReader.setMapper(new ObjectMapper()); builder.jsonObjectReader(objectReader); JsonItemReader reader = builder.build(); - List records = readAll(reader); - RedisKeyCommands sync = connection.sync(); - Assertions.assertEquals(sync.keys("beer:*").size(), records.size()); + reader.open(new ExecutionContext()); + try { + List records = readAll(reader); + Assertions.assertEquals(commands.keys("beer:*").size(), records.size()); + } finally { + reader.close(); + } } @SuppressWarnings("rawtypes") - private List exportToJsonFile() throws Exception { + private List exportToJsonFile(TestInfo info) throws Exception { String filename = "file-export-json"; Path file = tempFile("redis.json"); - generate(filename); + generate(info); + Thread.sleep(300); execute(filename, this::executeFileDumpExport); JsonItemReaderBuilder builder = new JsonItemReaderBuilder<>(); builder.name("json-data-structure-file-reader"); @@ -447,14 +348,19 @@ private List exportToJsonFile() throws Exception { objectReader.setMapper(new ObjectMapper()); builder.jsonObjectReader(objectReader); JsonItemReader reader = builder.build(); - return readAll(reader); + reader.open(new ExecutionContext()); + try { + return readAll(reader); + } finally { + reader.close(); + } } @SuppressWarnings("rawtypes") @Test - void fileExportXml() throws Exception { + void fileExportXml(TestInfo info) throws Exception { String filename = "file-export-xml"; - generate(filename); + generate(info); Path file = tempFile("redis.xml"); execute(filename, this::executeFileDumpExport); XmlItemReaderBuilder builder = new XmlItemReaderBuilder<>(); @@ -464,17 +370,18 @@ void fileExportXml() throws Exception { xmlObjectReader.setMapper(new XmlMapper()); builder.xmlObjectReader(xmlObjectReader); XmlItemReader reader = builder.build(); + reader.open(new ExecutionContext()); List records = readAll(reader); - RedisModulesCommands sync = connection.sync(); - Assertions.assertEquals(sync.dbsize(), records.size()); + reader.close(); + Assertions.assertEquals(commands.dbsize(), records.size()); for (KeyValue record : records) { String key = record.getKey(); switch (record.getType()) { case HASH: - Assertions.assertEquals(record.getValue(), sync.hgetall(key)); + Assertions.assertEquals(record.getValue(), commands.hgetall(key)); break; case STRING: - Assertions.assertEquals(record.getValue(), sync.get(key)); + Assertions.assertEquals(record.getValue(), commands.get(key)); break; default: break; @@ -490,7 +397,7 @@ void fileImportJSONGzip() throws Exception { @Test void fileImportSugadd() throws Exception { assertExecutionSuccessful(execute("file-import-sugadd")); - List> suggestions = connection.sync().ftSugget("names", "Bea", + List> suggestions = commands.ftSugget("names", "Bea", SuggetOptions.builder().withPayloads(true).build()); Assertions.assertEquals(5, suggestions.size()); Assertions.assertEquals("American Blonde Ale", suggestions.get(0).getPayload()); @@ -499,10 +406,9 @@ void fileImportSugadd() throws Exception { @Test void fileImportElasticJSON() throws Exception { assertExecutionSuccessful(execute("file-import-json-elastic-jsonset")); - RedisModulesCommands sync = connection.sync(); - Assertions.assertEquals(2, sync.keys("elastic:*").size()); + Assertions.assertEquals(2, commands.keys("elastic:*").size()); ObjectMapper mapper = new ObjectMapper(); - String doc1 = sync.jsonGet("elastic:doc1"); + String doc1 = commands.jsonGet("elastic:doc1"); String expected = "{\"_index\":\"test-index\",\"_type\":\"docs\",\"_id\":\"doc1\",\"_score\":1,\"_source\":{\"name\":\"ruan\",\"age\":30,\"articles\":[\"1\",\"3\"]}}"; Assertions.assertEquals(mapper.readTree(expected), mapper.readTree(doc1)); } @@ -510,7 +416,7 @@ void fileImportElasticJSON() throws Exception { @Test void fakerHash() throws Exception { List keys = testImport("faker-import-hset", "person:*", 1000); - Map person = connection.sync().hgetall(keys.get(0)); + Map person = commands.hgetall(keys.get(0)); Assertions.assertTrue(person.containsKey("firstName")); Assertions.assertTrue(person.containsKey("lastName")); Assertions.assertTrue(person.containsKey("address")); @@ -519,8 +425,7 @@ void fakerHash() throws Exception { @Test void fakerSet() throws Exception { execute("faker-import-sadd"); - RedisSetCommands sync = connection.sync(); - Set names = sync.smembers("got:characters"); + Set names = commands.smembers("got:characters"); Assertions.assertTrue(names.size() > 10); for (String name : names) { Assertions.assertFalse(name.isEmpty()); @@ -530,18 +435,16 @@ void fakerSet() throws Exception { @Test void fakerZset() throws Exception { execute("faker-import-zadd"); - RedisKeyCommands sync = connection.sync(); - List keys = sync.keys("leases:*"); + List keys = commands.keys("leases:*"); Assertions.assertTrue(keys.size() > 100); String key = keys.get(0); - Assertions.assertTrue(((RedisSortedSetCommands) sync).zcard(key) > 0); + Assertions.assertTrue(commands.zcard(key) > 0); } @Test void fakerStream() throws Exception { execute("faker-import-xadd"); - RedisStreamCommands sync = connection.sync(); - List> messages = sync.xrange("teststream:1", Range.unbounded()); + List> messages = commands.xrange("teststream:1", Range.unbounded()); Assertions.assertTrue(messages.size() > 0); } @@ -554,12 +457,12 @@ void fakerInfer() throws Exception { String FIELD_NAME = "name"; String FIELD_STYLE = "style"; String FIELD_OUNCES = "ounces"; - connection.sync().ftCreate(INDEX, CreateOptions. builder().prefix("beer:").build(), + commands.ftCreate(INDEX, CreateOptions. builder().prefix("beer:").build(), Field.tag(FIELD_ID).sortable().build(), Field.text(FIELD_NAME).sortable().build(), Field.text(FIELD_STYLE).matcher(PhoneticMatcher.ENGLISH).sortable().build(), Field.numeric(FIELD_ABV).sortable().build(), Field.numeric(FIELD_OUNCES).sortable().build()); execute("faker-import-infer"); - SearchResults results = connection.sync().ftSearch(INDEX, "*"); + SearchResults results = commands.ftSearch(INDEX, "*"); Assertions.assertEquals(1000, results.getCount()); Document doc1 = results.get(0); Assertions.assertNotNull(doc1.get(FIELD_ABV)); @@ -569,13 +472,13 @@ void fakerInfer() throws Exception { @Disabled("Flaky test") void fakerTsAdd() throws Exception { execute("faker-import-tsadd"); - Assertions.assertEquals(10, connection.sync().tsRange("ts:gen", TimeRange.unbounded()).size()); + Assertions.assertEquals(10, commands.tsRange("ts:gen", TimeRange.unbounded()).size()); } @Test void fakerTsAddWithOptions() throws Exception { execute("faker-import-tsadd-options"); - List> results = connection.sync().tsMrange(TimeRange.unbounded(), + List> results = commands.tsMrange(TimeRange.unbounded(), MRangeOptions. filters("character1=Einstein").build()); Assertions.assertFalse(results.isEmpty()); } @@ -584,185 +487,7 @@ void fakerTsAddWithOptions() throws Exception { void generateTypes() throws Exception { execute("generate"); Assertions.assertEquals(Math.min(GeneratorImport.DEFAULT_COUNT, GeneratorItemReader.DEFAULT_KEY_RANGE.getMax()), - connection.sync().dbsize()); - } - - @Test - void replicate() throws Throwable { - String filename = "replicate"; - generate(filename); - Assertions.assertTrue(connection.sync().dbsize() > 0); - execute(filename); - } - - @Test - void replicateDryRun() throws Throwable { - String filename = "replicate-dry-run"; - generate(filename); - Assertions.assertTrue(connection.sync().dbsize() > 0); - execute(filename); - Assertions.assertEquals(0, targetConnection.sync().dbsize()); - } - - @Test - void replicateHLL() throws Throwable { - String key = "crawled:20171124"; - String value = "http://www.google.com/"; - connection.sync().pfadd(key, value); - Assertions.assertEquals(0, execute("replicate-hll")); - awaitCompare(); - } - - private void awaitCompare() { - Awaitility.await().timeout(COMPARE_TIMEOUT).until(this::compare); - } - - @Test - void replicateKeyProcessor() throws Throwable { - String filename = "replicate-key-processor"; - GeneratorItemReader generator = generator(); - generator.setMaxItemCount(200); - generator.setTypes(DataType.HASH); - generate(filename, DEFAULT_BATCH_SIZE, generator); - Long sourceSize = connection.sync().dbsize(); - Assertions.assertTrue(sourceSize > 0); - execute(filename); - Assertions.assertEquals(sourceSize, targetConnection.sync().dbsize()); - Assertions.assertEquals(connection.sync().hgetall("gen:1"), targetConnection.sync().hgetall("0:gen:1")); - } - - @Test - void replicateKeyExclude() throws Throwable { - String filename = "replicate-key-exclude"; - int goodCount = 200; - GeneratorItemReader generator = generator(); - generator.setMaxItemCount(goodCount); - generator.setTypes(DataType.HASH); - generate(filename, generator); - GeneratorItemReader generator2 = generator(); - int badCount = 100; - generator2.setMaxItemCount(badCount); - generator2.setKeyspace("bad"); - generator2.setTypes(DataType.HASH); - generate(filename + "-2", generator2); - Assertions.assertEquals(badCount, connection.sync().keys("bad:*").size()); - execute(filename); - Assertions.assertEquals(goodCount, targetConnection.sync().keys("gen:*").size()); - } - - @Test - void replicateLiveKeyExclude() throws Throwable { - int goodCount = 200; - int badCount = 100; - String filename = "replicate-live-key-exclude"; - connection.sync().configSet("notify-keyspace-events", "AK"); - ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); - executor.schedule(() -> { - GeneratorItemReader generator = generator(); - generator.setMaxItemCount(goodCount); - generator.setTypes(DataType.HASH); - GeneratorItemReader generator2 = generator(); - generator2.setMaxItemCount(badCount); - generator2.setTypes(DataType.HASH); - generator2.setKeyspace("bad"); - try { - generate(filename, generator); - generate(filename + "-2", generator2); - } catch (Exception e) { - log.error("Could not generate data", e); - } - }, 500, TimeUnit.MILLISECONDS); - execute(filename); - awaitUntil(() -> connection.sync().keys("bad:*").size() == badCount); - awaitUntil(() -> targetConnection.sync().keys("gen:*").size() == goodCount); - } - - @Test - void replicateLive() throws Exception { - runLiveReplication("replicate-live"); - } - - @Test - void replicateLiveKeySlot() throws Exception { - String filename = "replicate-live-keyslot"; - connection.sync().configSet("notify-keyspace-events", "AK"); - ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); - GeneratorItemReader generator = generator(); - generator.setMaxItemCount(300); - executor.schedule(() -> { - try { - generate(filename, 1, generator); - } catch (Exception e) { - log.error("Could not generate data", e); - } - }, 500, TimeUnit.MILLISECONDS); - execute(filename); - List keys = targetConnection.sync().keys("*"); - for (String key : keys) { - int slot = SlotHash.getSlot(key); - Assertions.assertTrue(slot >= 0 && slot <= 8000); - } - } - - @Test - void replicateDs() throws Throwable { - String filename = "replicate-ds"; - GeneratorItemReader generator = generator(); - generator.setMaxItemCount(12000); - generate(filename, DEFAULT_BATCH_SIZE, generator); - Assertions.assertTrue(connection.sync().dbsize() > 0); - execute(filename); - } - - protected void runLiveReplication(String filename) throws Exception { - connection.sync().configSet("notify-keyspace-events", "AK"); - generate(filename, DEFAULT_BATCH_SIZE, generator(3000)); - ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); - executor.schedule(() -> { - GeneratorItemReader generator = generator(3500); - generator.setCurrentItemCount(3000); - generator.setMaxItemCount(3500); - try { - run(filename + "-generate-live", 1, generator, structWriter(client)); - } catch (Exception e) { - log.error("Could not generate data", e); - } - awaitUntilFalse(generator::isOpen); - }, 500, TimeUnit.MILLISECONDS); - execute(filename); - awaitCompare(); - } - - protected KeyComparisonItemReader comparisonReader() { - KeyComparisonItemReader reader = new KeyComparisonItemReader(structReader(client), structReader(targetClient)); - reader.setTtlTolerance(Duration.ofMillis(100)); - return reader; - } - - protected boolean compare() throws JobExecutionException { - if (connection.sync().dbsize().equals(0L)) { - log.info("Source database is empty"); - return false; - } - if (!connection.sync().dbsize().equals(targetConnection.sync().dbsize())) { - log.info("Source and target databases have different sizes"); - return false; - } - KeyComparisonItemReader reader = comparisonReader(); - SynchronizedListItemWriter writer = new SynchronizedListItemWriter<>(); - run("compare-" + id(), DEFAULT_BATCH_SIZE, reader, writer); - awaitUntilFalse(reader::isOpen); - if (writer.getWrittenItems().isEmpty()) { - log.info("No comparison items were written"); - return false; - } - for (KeyComparison comparison : writer.getWrittenItems()) { - if (comparison.getStatus() != Status.OK) { - log.error(String.format("Key %s has status %s", comparison.getSource().getKey(), comparison.getStatus())); - return false; - } - } - return true; + commands.dbsize()); } } diff --git a/plugins/riot/src/test/java/com/redis/riot/cli/PostgresTests.java b/plugins/riot/src/test/java/com/redis/riot/cli/PostgresTests.java index 72547f096..fedd21ce4 100644 --- a/plugins/riot/src/test/java/com/redis/riot/cli/PostgresTests.java +++ b/plugins/riot/src/test/java/com/redis/riot/cli/PostgresTests.java @@ -14,13 +14,13 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; import org.testcontainers.containers.JdbcDatabaseContainer; import org.testcontainers.containers.PostgreSQLContainer; import org.testcontainers.utility.DockerImageName; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectReader; -import com.redis.lettucemod.api.sync.RedisModulesCommands; import com.redis.spring.batch.common.DataType; import com.redis.spring.batch.gen.GeneratorItemReader; @@ -49,14 +49,14 @@ void clearTables() throws SQLException { } @Test - void export() throws Exception { + void export(TestInfo info) throws Exception { String filename = "db-export-postgresql"; try (Statement statement = databaseConnection.createStatement()) { statement.execute("CREATE TABLE mytable (id smallint NOT NULL, field1 bpchar, field2 bpchar)"); statement.execute("ALTER TABLE ONLY mytable ADD CONSTRAINT pk_mytable PRIMARY KEY (id)"); GeneratorItemReader generator = generator(); generator.setTypes(DataType.HASH); - generate(filename, DEFAULT_BATCH_SIZE, generator); + generate(testInfo(info, "gen"), generator); execute(filename, this::executeDatabaseExport); statement.execute("SELECT COUNT(*) AS count FROM mytable"); ResultSet countResultSet = statement.getResultSet(); @@ -70,7 +70,7 @@ void export() throws Exception { Assertions.assertNotNull(resultSet.getString("field2")); count++; } - Assertions.assertEquals(connection.sync().dbsize(), count); + Assertions.assertEquals(commands.dbsize(), count); } } @@ -79,14 +79,13 @@ void nullValueExport() throws Exception { try (Statement statement = databaseConnection.createStatement()) { statement.execute("CREATE TABLE mytable (id smallint NOT NULL, field1 bpchar, field2 bpchar)"); statement.execute("ALTER TABLE ONLY mytable ADD CONSTRAINT pk_mytable PRIMARY KEY (id)"); - RedisModulesCommands sync = connection.sync(); Map hash1 = new HashMap<>(); hash1.put("field1", "value1"); hash1.put("field2", "value2"); - sync.hmset("gen:1", hash1); + commands.hmset("gen:1", hash1); Map hash2 = new HashMap<>(); hash2.put("field2", "value2"); - sync.hmset("gen:2", hash2); + commands.hmset("gen:2", hash2); execute("db-export-postgresql", this::executeDatabaseExport); statement.execute("SELECT COUNT(*) AS count FROM mytable"); ResultSet countResultSet = statement.getResultSet(); @@ -99,7 +98,7 @@ void nullValueExport() throws Exception { Assertions.assertEquals(index + 1, resultSet.getInt("id")); index++; } - Assertions.assertEquals(sync.dbsize().longValue(), index); + Assertions.assertEquals(commands.dbsize().longValue(), index); } } @@ -108,12 +107,11 @@ void hashImport() throws Exception { execute("db-import-postgresql", this::executeDatabaseImport); try (Statement statement = databaseConnection.createStatement()) { statement.execute("SELECT COUNT(*) AS count FROM orders"); - RedisModulesCommands sync = connection.sync(); - List keys = sync.keys("order:*"); + List keys = commands.keys("order:*"); ResultSet resultSet = statement.getResultSet(); resultSet.next(); Assertions.assertEquals(resultSet.getLong("count"), keys.size()); - Map order = sync.hgetall("order:10248"); + Map order = commands.hgetall("order:10248"); Assertions.assertEquals("10248", order.get("order_id")); Assertions.assertEquals("VINET", order.get("customer_id")); } @@ -122,14 +120,13 @@ void hashImport() throws Exception { @Test void multiThreadedImport() throws Exception { execute("db-import-postgresql-multithreaded", this::executeDatabaseImport); - RedisModulesCommands sync = connection.sync(); - List keys = sync.keys("order:*"); + List keys = commands.keys("order:*"); try (Statement statement = databaseConnection.createStatement()) { try (ResultSet resultSet = statement.executeQuery("SELECT COUNT(*) AS count FROM orders")) { Assertions.assertTrue(resultSet.next()); Assertions.assertEquals(resultSet.getLong("count"), keys.size()); } - Map order = sync.hgetall("order:10248"); + Map order = commands.hgetall("order:10248"); Assertions.assertEquals("10248", order.get("order_id")); Assertions.assertEquals("VINET", order.get("customer_id")); } @@ -141,11 +138,10 @@ void setImport() throws Exception { try (Statement statement = databaseConnection.createStatement()) { statement.execute("SELECT * FROM orders"); ResultSet resultSet = statement.getResultSet(); - RedisModulesCommands sync = connection.sync(); long count = 0; while (resultSet.next()) { int orderId = resultSet.getInt("order_id"); - String order = sync.get("order:" + orderId); + String order = commands.get("order:" + orderId); ObjectMapper mapper = new ObjectMapper(); ObjectReader reader = mapper.readerFor(Map.class); Map orderMap = reader.readValue(order); @@ -157,7 +153,7 @@ void setImport() throws Exception { Assertions.assertEquals(resultSet.getFloat("freight"), ((Double) orderMap.get("freight")).floatValue(), 0); count++; } - List keys = sync.keys("order:*"); + List keys = commands.keys("order:*"); Assertions.assertEquals(count, keys.size()); } } diff --git a/plugins/riot/src/test/java/com/redis/riot/cli/ReplicationTests.java b/plugins/riot/src/test/java/com/redis/riot/cli/ReplicationTests.java new file mode 100644 index 000000000..3bd46148e --- /dev/null +++ b/plugins/riot/src/test/java/com/redis/riot/cli/ReplicationTests.java @@ -0,0 +1,179 @@ +package com.redis.riot.cli; + +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.slf4j.event.Level; +import org.slf4j.impl.SimpleLogger; +import org.springframework.batch.core.step.builder.SimpleStepBuilder; + +import com.redis.spring.batch.RedisItemWriter; +import com.redis.spring.batch.common.DataType; +import com.redis.spring.batch.common.KeyComparison; +import com.redis.spring.batch.common.KeyValue; +import com.redis.spring.batch.gen.GeneratorItemReader; +import com.redis.spring.batch.writer.StructItemWriter; + +import io.lettuce.core.cluster.SlotHash; + +abstract class ReplicationTests extends RiotTests { + + static { + System.setProperty(SimpleLogger.LOG_KEY_PREFIX + "com.redis.spring", Level.DEBUG.name()); + } + + @Test + void replicate(TestInfo info) throws Throwable { + String filename = "replicate"; + generate(info); + Assertions.assertTrue(commands.dbsize() > 0); + execute(filename); + } + + @Test + void replicateDryRun(TestInfo info) throws Throwable { + String filename = "replicate-dry-run"; + generate(info); + Assertions.assertTrue(commands.dbsize() > 0); + execute(filename); + Assertions.assertEquals(0, targetCommands.dbsize()); + } + + @Test + void replicateHLL(TestInfo info) throws Throwable { + String key = "crawled:20171124"; + String value = "http://www.google.com/"; + commands.pfadd(key, value); + Assertions.assertEquals(0, execute("replicate-hll")); + Assertions.assertTrue(compare(info).isEmpty()); + } + + @Test + void replicateKeyProcessor(TestInfo info) throws Throwable { + String filename = "replicate-key-processor"; + GeneratorItemReader gen = generator(1, DataType.HASH); + generate(info, gen); + Long sourceSize = commands.dbsize(); + Assertions.assertTrue(sourceSize > 0); + execute(filename); + Assertions.assertEquals(sourceSize, targetCommands.dbsize()); + Assertions.assertEquals(commands.hgetall("gen:1"), targetCommands.hgetall("0:gen:1")); + } + + @Test + void replicateKeyExclude(TestInfo info) throws Throwable { + String filename = "replicate-key-exclude"; + int goodCount = 200; + GeneratorItemReader gen = generator(goodCount, DataType.HASH); + generate(info, gen); + int badCount = 100; + GeneratorItemReader generator2 = generator(badCount, DataType.HASH); + generator2.setKeyspace("bad"); + generate(testInfo(info, "2"), generator2); + Assertions.assertEquals(badCount, commands.keys("bad:*").size()); + execute(filename); + Assertions.assertEquals(goodCount, targetCommands.keys("gen:*").size()); + } + + @Test + void replicateLiveKeyExclude(TestInfo info) throws Throwable { + int goodCount = 200; + int badCount = 100; + String filename = "replicate-live-key-exclude"; + enableKeyspaceNotifications(client); + ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + executor.schedule(() -> { + GeneratorItemReader generator = generator(goodCount, DataType.HASH); + GeneratorItemReader generator2 = generator(badCount, DataType.HASH); + generator2.setKeyspace("bad"); + try { + generate(testInfo(info, "gen1"), generator); + generate(testInfo(info, "gen2"), generator2); + } catch (Exception e) { + log.error("Could not generate data", e); + } + }, 500, TimeUnit.MILLISECONDS); + execute(filename); + Assertions.assertEquals(badCount, commands.keys("bad:*").size()); + Assertions.assertEquals(goodCount, targetCommands.keys("gen:*").size()); + } + + @Test + void replicateLive(TestInfo info) throws Exception { + runLiveReplication(info, "replicate-live"); + List diffs = compare(info); + Assertions.assertTrue(diffs.isEmpty()); + } + + @Test + void replicateLiveMultiThreaded(TestInfo info) throws Exception { + runLiveReplication(info, "replicate-live-threads"); + Assertions.assertTrue(compare(info).isEmpty()); + } + + @Test + void replicateLiveStruct(TestInfo info) throws Exception { + runLiveReplication(info, "replicate-live-struct"); + List diffs = compare(info); + Assertions.assertTrue(diffs.isEmpty()); + } + + @Test + void replicateLiveKeySlot(TestInfo info) throws Exception { + String filename = "replicate-live-keyslot"; + enableKeyspaceNotifications(client); + ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + GeneratorItemReader generator = generator(300); + executor.schedule(() -> { + TestInfo genInfo = testInfo(info, filename, "gen-live"); + try { + run(genInfo, step(genInfo, 1, generator, null, RedisItemWriter.struct(client))); + } catch (Exception e) { + log.error("Could not generate data", e); + } + }, 500, TimeUnit.MILLISECONDS); + execute(filename); + List keys = targetCommands.keys("*"); + for (String key : keys) { + int slot = SlotHash.getSlot(key); + Assertions.assertTrue(slot >= 0 && slot <= 8000); + } + } + + @Test + void replicateStruct(TestInfo info) throws Throwable { + String filename = "replicate-struct"; + GeneratorItemReader generator = generator(12000); + generate(testInfo(info, "gen"), generator); + Assertions.assertTrue(commands.dbsize() > 0); + execute(filename); + } + + protected void runLiveReplication(TestInfo info, String filename) throws Exception { + enableKeyspaceNotifications(client); + GeneratorItemReader gen = generator(3000, DataType.HASH, DataType.STRING, DataType.LIST, DataType.ZSET); + generate(testInfo(info, "liveReplicationGen"), gen); + ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + executor.schedule(() -> { + GeneratorItemReader generator = generator(3500, DataType.HASH, DataType.STRING, DataType.LIST, DataType.ZSET); + generator.setCurrentItemCount(3000); + StructItemWriter writer = RedisItemWriter.struct(client); + TestInfo stepInfo = testInfo(info, filename, "gen-live"); + SimpleStepBuilder, KeyValue> step = step(stepInfo, 1, generator, null, writer); + try { + run(testInfo(info, filename, "gen-live"), step); + } catch (Exception e) { + log.error("Could not generate data", e); + } + awaitUntilFalse(generator::isOpen); + }, 500, TimeUnit.MILLISECONDS); + execute(filename); + Thread.sleep(300); + } + +} diff --git a/plugins/riot/src/test/java/com/redis/riot/cli/RiotTests.java b/plugins/riot/src/test/java/com/redis/riot/cli/RiotTests.java new file mode 100644 index 000000000..c98ac56ff --- /dev/null +++ b/plugins/riot/src/test/java/com/redis/riot/cli/RiotTests.java @@ -0,0 +1,85 @@ +package com.redis.riot.cli; + +import java.io.InputStream; +import java.io.PrintWriter; +import java.nio.charset.Charset; + +import org.codehaus.plexus.util.cli.CommandLineUtils; +import org.junit.jupiter.api.Assertions; + +import com.redis.riot.cli.ProgressArgs.ProgressStyle; +import com.redis.riot.cli.operation.OperationCommand; +import com.redis.riot.core.ReplicationMode; +import com.redis.spring.batch.test.AbstractTargetTestBase; + +import io.micrometer.core.instrument.util.IOUtils; +import picocli.CommandLine.ExitCode; +import picocli.CommandLine.IExecutionStrategy; +import picocli.CommandLine.ParseResult; + +public abstract class RiotTests extends AbstractTargetTestBase { + + private static final int DEFAULT_NOTIFICATION_QUEUE_CAPACITY = 100000; + + private static final String PREFIX = "riot "; + + protected static void assertExecutionSuccessful(int exitCode) { + Assertions.assertEquals(0, exitCode); + } + + protected T command(ParseResult parseResult) { + return parseResult.subcommands().get(0).commandSpec().commandLine().getCommand(); + } + + protected int execute(String filename, IExecutionStrategy... executionStrategies) throws Exception { + String[] args = args(filename); + return Main.run(new PrintWriter(System.out), new PrintWriter(System.err), args, executionStrategy(executionStrategies)); + } + + private IExecutionStrategy executionStrategy(IExecutionStrategy... executionStrategies) { + CompositeExecutionStrategy strategy = new CompositeExecutionStrategy(); + strategy.addDelegates(this::execute); + strategy.addDelegates(executionStrategies); + return strategy; + } + + private int execute(ParseResult parseResult) { + Main main = (Main) parseResult.commandSpec().commandLine().getCommand(); + main.redisArgs.uriArgs.uri = getRedisServer().getRedisURI(); + main.redisArgs.cluster = getRedisServer().isCluster(); + for (ParseResult subParseResult : parseResult.subcommands()) { + Object command = subParseResult.commandSpec().commandLine().getCommand(); + if (command instanceof OperationCommand) { + command = subParseResult.commandSpec().parent().commandLine().getCommand(); + } + configureCommand(command); + } + return ExitCode.OK; + } + + protected void configureCommand(Object command) { + if (command instanceof AbstractJobCommand) { + AbstractJobCommand jobCommand = ((AbstractJobCommand) command); + jobCommand.progressArgs.style = ProgressStyle.NONE; + } + if (command instanceof ReplicateCommand) { + ReplicateCommand replicationCommand = (ReplicateCommand) command; + replicationCommand.targetRedisClientArgs.uriArgs.uri = getTargetRedisServer().getRedisURI(); + if (replicationCommand.mode == ReplicationMode.LIVE || replicationCommand.mode == ReplicationMode.LIVEONLY) { + replicationCommand.readerArgs.setIdleTimeout(DEFAULT_IDLE_TIMEOUT.toMillis()); + replicationCommand.readerArgs.setNotificationQueueCapacity(DEFAULT_NOTIFICATION_QUEUE_CAPACITY); + } + } + } + + private static String[] args(String filename) throws Exception { + try (InputStream inputStream = Main.class.getResourceAsStream("/" + filename)) { + String command = IOUtils.toString(inputStream, Charset.defaultCharset()); + if (command.startsWith(PREFIX)) { + command = command.substring(PREFIX.length()); + } + return CommandLineUtils.translateCommandline(command); + } + } + +} diff --git a/plugins/riot/src/test/java/com/redis/riot/cli/StackEnterpriseTests.java b/plugins/riot/src/test/java/com/redis/riot/cli/StackEnterpriseTests.java deleted file mode 100644 index d6aa637c2..000000000 --- a/plugins/riot/src/test/java/com/redis/riot/cli/StackEnterpriseTests.java +++ /dev/null @@ -1,23 +0,0 @@ -package com.redis.riot.cli; - -import org.junit.jupiter.api.condition.EnabledOnOs; -import org.junit.jupiter.api.condition.OS; - -import com.redis.testcontainers.RedisServer; - -@EnabledOnOs(OS.LINUX) -class StackEnterpriseTests extends AbstractIntegrationTests { - - private static final RedisServer SOURCE = RedisContainerFactory.stack(); - private static final RedisServer TARGET = RedisContainerFactory.enterprise(); - - @Override - protected RedisServer getRedisServer() { - return SOURCE; - } - - @Override - protected RedisServer getTargetRedisServer() { - return TARGET; - } -} diff --git a/plugins/riot/src/test/java/com/redis/riot/cli/StackTests.java b/plugins/riot/src/test/java/com/redis/riot/cli/StackTests.java deleted file mode 100644 index d8f074021..000000000 --- a/plugins/riot/src/test/java/com/redis/riot/cli/StackTests.java +++ /dev/null @@ -1,33 +0,0 @@ -package com.redis.riot.cli; - -import org.junit.jupiter.api.Test; - -import com.redis.testcontainers.RedisServer; -import com.redis.testcontainers.RedisStackContainer; - -class StackTests extends AbstractIntegrationTests { - - public static final RedisStackContainer SOURCE = RedisContainerFactory.stack(); - public static final RedisStackContainer TARGET = RedisContainerFactory.stack(); - - @Override - protected RedisServer getRedisServer() { - return SOURCE; - } - - @Override - protected RedisServer getTargetRedisServer() { - return TARGET; - } - - @Test - void replicateLiveMultiThreaded() throws Exception { - runLiveReplication("replicate-live-threads"); - } - - @Test - void replicateDsLive() throws Exception { - runLiveReplication("replicate-ds-live"); - } - -} diff --git a/plugins/riot/src/test/java/com/redis/riot/cli/EnterpriseStackTests.java b/plugins/riot/src/test/java/com/redis/riot/cli/StackToEnterpriseTests.java similarity index 65% rename from plugins/riot/src/test/java/com/redis/riot/cli/EnterpriseStackTests.java rename to plugins/riot/src/test/java/com/redis/riot/cli/StackToEnterpriseTests.java index a7961c450..35b1a8583 100644 --- a/plugins/riot/src/test/java/com/redis/riot/cli/EnterpriseStackTests.java +++ b/plugins/riot/src/test/java/com/redis/riot/cli/StackToEnterpriseTests.java @@ -3,14 +3,16 @@ import org.junit.jupiter.api.condition.EnabledOnOs; import org.junit.jupiter.api.condition.OS; +import com.redis.spring.batch.common.DataType; +import com.redis.spring.batch.test.AbstractTestBase; import com.redis.testcontainers.RedisServer; @EnabledOnOs(OS.LINUX) -class EnterpriseStackTests extends AbstractIntegrationTests { +class StackToEnterpriseTests extends IntegrationTests { - private static final RedisServer SOURCE = RedisContainerFactory.enterprise(); + private static final RedisServer SOURCE = RedisContainerFactory.stack(); - private static final RedisServer TARGET = RedisContainerFactory.stack(); + private static final RedisServer TARGET = RedisContainerFactory.enterprise(); @Override protected RedisServer getRedisServer() { @@ -22,4 +24,9 @@ protected RedisServer getTargetRedisServer() { return TARGET; } + @Override + protected DataType[] generatorDataTypes() { + return AbstractTestBase.REDIS_MODULES_GENERATOR_TYPES; + } + } diff --git a/plugins/riot/src/test/java/com/redis/riot/cli/StackToStackIntegrationTests.java b/plugins/riot/src/test/java/com/redis/riot/cli/StackToStackIntegrationTests.java new file mode 100644 index 000000000..54a6f75b7 --- /dev/null +++ b/plugins/riot/src/test/java/com/redis/riot/cli/StackToStackIntegrationTests.java @@ -0,0 +1,29 @@ +package com.redis.riot.cli; + +import com.redis.spring.batch.common.DataType; +import com.redis.spring.batch.test.AbstractTestBase; +import com.redis.testcontainers.RedisServer; +import com.redis.testcontainers.RedisStackContainer; + +class StackToStackIntegrationTests extends IntegrationTests { + + public static final RedisStackContainer SOURCE = RedisContainerFactory.stack(); + + public static final RedisStackContainer TARGET = RedisContainerFactory.stack(); + + @Override + protected RedisServer getRedisServer() { + return SOURCE; + } + + @Override + protected RedisServer getTargetRedisServer() { + return TARGET; + } + + @Override + protected DataType[] generatorDataTypes() { + return AbstractTestBase.REDIS_MODULES_GENERATOR_TYPES; + } + +} diff --git a/plugins/riot/src/test/java/com/redis/riot/cli/StackToStackReplicationTests.java b/plugins/riot/src/test/java/com/redis/riot/cli/StackToStackReplicationTests.java new file mode 100644 index 000000000..dd046d519 --- /dev/null +++ b/plugins/riot/src/test/java/com/redis/riot/cli/StackToStackReplicationTests.java @@ -0,0 +1,29 @@ +package com.redis.riot.cli; + +import com.redis.spring.batch.common.DataType; +import com.redis.spring.batch.test.AbstractTestBase; +import com.redis.testcontainers.RedisServer; +import com.redis.testcontainers.RedisStackContainer; + +class StackToStackReplicationTests extends ReplicationTests { + + public static final RedisStackContainer SOURCE = RedisContainerFactory.stack(); + + public static final RedisStackContainer TARGET = RedisContainerFactory.stack(); + + @Override + protected RedisServer getRedisServer() { + return SOURCE; + } + + @Override + protected RedisServer getTargetRedisServer() { + return TARGET; + } + + @Override + protected DataType[] generatorDataTypes() { + return AbstractTestBase.REDIS_MODULES_GENERATOR_TYPES; + } + +} diff --git a/plugins/riot/src/test/java/com/redis/riot/cli/SynchronizedListItemWriter.java b/plugins/riot/src/test/java/com/redis/riot/cli/SynchronizedListItemWriter.java deleted file mode 100644 index f1f994f07..000000000 --- a/plugins/riot/src/test/java/com/redis/riot/cli/SynchronizedListItemWriter.java +++ /dev/null @@ -1,21 +0,0 @@ -package com.redis.riot.cli; - -import java.util.ArrayList; -import java.util.List; - -import org.springframework.batch.item.support.AbstractItemStreamItemWriter; - -public class SynchronizedListItemWriter extends AbstractItemStreamItemWriter { - - private List writtenItems = new ArrayList<>(); - - @Override - public synchronized void write(List items) throws Exception { - writtenItems.addAll(items); - } - - public List getWrittenItems() { - return this.writtenItems; - } - -} \ No newline at end of file diff --git a/plugins/riot/src/test/resources/replicate-key-processor b/plugins/riot/src/test/resources/replicate-key-processor index 09a4ce63e..2738aa13c 100644 --- a/plugins/riot/src/test/resources/replicate-key-processor +++ b/plugins/riot/src/test/resources/replicate-key-processor @@ -1 +1 @@ -riot -h source -p 6379 replicate -h target -p 6380 --batch 10 --key-proc="#{#source.database}:#{key}" \ No newline at end of file +riot -h source -p 6379 replicate -h target -p 6380 --batch 1 --key-proc="#{#source.database}:#{key}" \ No newline at end of file diff --git a/plugins/riot/src/test/resources/replicate-ds-live b/plugins/riot/src/test/resources/replicate-live-struct similarity index 100% rename from plugins/riot/src/test/resources/replicate-ds-live rename to plugins/riot/src/test/resources/replicate-live-struct diff --git a/plugins/riot/src/test/resources/replicate-ds b/plugins/riot/src/test/resources/replicate-struct similarity index 100% rename from plugins/riot/src/test/resources/replicate-ds rename to plugins/riot/src/test/resources/replicate-struct