diff --git a/connectors/riot-db/src/main/java/com/redis/riot/db/DatabaseImport.java b/connectors/riot-db/src/main/java/com/redis/riot/db/DatabaseImport.java index 265dda470..493aa21f7 100644 --- a/connectors/riot-db/src/main/java/com/redis/riot/db/DatabaseImport.java +++ b/connectors/riot-db/src/main/java/com/redis/riot/db/DatabaseImport.java @@ -7,7 +7,6 @@ import org.springframework.batch.item.database.AbstractCursorItemReader; import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder; import org.springframework.jdbc.core.ColumnMapRowMapper; -import org.springframework.util.ClassUtils; import com.redis.riot.core.AbstractImport; @@ -92,8 +91,7 @@ public void setVerifyCursorPosition(boolean verifyCursorPosition) { @Override protected Job job() { - String name = ClassUtils.getShortName(getClass()); - return jobBuilder().start(step(name, reader(), writer())).build(); + return jobBuilder().start(step(getName(), reader(), writer()).build()).build(); } private ItemReader> reader() { diff --git a/connectors/riot-faker/src/main/java/com/redis/riot/faker/FakerImport.java b/connectors/riot-faker/src/main/java/com/redis/riot/faker/FakerImport.java index da210bfdb..8fef49f5c 100644 --- a/connectors/riot-faker/src/main/java/com/redis/riot/faker/FakerImport.java +++ b/connectors/riot-faker/src/main/java/com/redis/riot/faker/FakerImport.java @@ -5,10 +5,8 @@ import java.util.Map; import org.springframework.batch.core.Job; -import org.springframework.batch.item.ItemWriter; import org.springframework.expression.Expression; import org.springframework.util.Assert; -import org.springframework.util.ClassUtils; import com.redis.lettucemod.api.sync.RediSearchCommands; import com.redis.lettucemod.search.Field; @@ -64,10 +62,7 @@ public void setLocale(Locale locale) { @Override protected Job job() { - String name = ClassUtils.getShortName(getClass()); - FakerItemReader reader = reader(); - ItemWriter> writer = writer(); - return jobBuilder().start(step(name, reader, writer)).build(); + return jobBuilder().start(step(getName(), reader(), writer()).build()).build(); } private FakerItemReader reader() { diff --git a/connectors/riot-file/src/main/java/com/redis/riot/file/FileDumpExport.java b/connectors/riot-file/src/main/java/com/redis/riot/file/FileDumpExport.java index 000d3f6c6..fc20a3c74 100644 --- a/connectors/riot-file/src/main/java/com/redis/riot/file/FileDumpExport.java +++ b/connectors/riot-file/src/main/java/com/redis/riot/file/FileDumpExport.java @@ -3,6 +3,7 @@ import java.io.IOException; import org.springframework.batch.core.Job; +import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.json.JacksonJsonObjectMarshaller; import org.springframework.batch.item.json.JsonObjectMarshaller; @@ -126,7 +127,8 @@ protected Job job() { RedisItemReader> reader = RedisItemReader.struct(); reader.setClient(getRedisClient()); configureReader(reader); - return jobBuilder().start(step(reader, processor(StringCodec.UTF8), writer())).build(); + ItemProcessor, KeyValue> processor = processor(StringCodec.UTF8); + return jobBuilder().start(step(getName(), reader, writer()).processor(processor).build()).build(); } } diff --git a/connectors/riot-file/src/main/java/com/redis/riot/file/FileDumpImport.java b/connectors/riot-file/src/main/java/com/redis/riot/file/FileDumpImport.java index 0e62ab39a..74730617c 100644 --- a/connectors/riot-file/src/main/java/com/redis/riot/file/FileDumpImport.java +++ b/connectors/riot-file/src/main/java/com/redis/riot/file/FileDumpImport.java @@ -13,7 +13,6 @@ import com.redis.riot.core.AbstractStructImport; import com.redis.spring.batch.KeyValue; -import com.redis.spring.batch.RedisItemWriter; public class FileDumpImport extends AbstractStructImport { @@ -47,9 +46,7 @@ protected Job job() { } List steps = new ArrayList<>(); for (Resource resource : resources) { - ItemReader> reader = reader(resource); - RedisItemWriter> writer = writer(); - steps.add(step(resource.getFilename(), reader, writer)); + steps.add(step(resource.getFilename(), reader(resource), writer()).build()); } Iterator iterator = steps.iterator(); SimpleJobBuilder job = jobBuilder().start(iterator.next()); diff --git a/connectors/riot-file/src/main/java/com/redis/riot/file/FileImport.java b/connectors/riot-file/src/main/java/com/redis/riot/file/FileImport.java index 91e5ac2ce..74f761cd5 100644 --- a/connectors/riot-file/src/main/java/com/redis/riot/file/FileImport.java +++ b/connectors/riot-file/src/main/java/com/redis/riot/file/FileImport.java @@ -18,7 +18,6 @@ import org.springframework.batch.core.step.tasklet.TaskletStep; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemReader; -import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.file.FlatFileItemReader; import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder; import org.springframework.batch.item.file.mapping.FieldSetMapper; @@ -146,9 +145,7 @@ private TaskletStep step(Resource resource) { if (maxItemCount != null && reader instanceof AbstractItemCountingItemStreamItemReader) { ((AbstractItemCountingItemStreamItemReader>) reader).setMaxItemCount(maxItemCount); } - ItemProcessor, Map> processor = processor(); - ItemWriter> writer = writer(); - return step(resource.getFilename(), reader, processor, writer); + return step(resource.getFilename(), reader, writer()).processor(processor()).build(); } private ItemReader> reader(Resource resource) { diff --git a/connectors/riot-redis/src/main/java/com/redis/riot/redis/GeneratorImport.java b/connectors/riot-redis/src/main/java/com/redis/riot/redis/GeneratorImport.java index 3d307631f..9eb3ffd2e 100644 --- a/connectors/riot-redis/src/main/java/com/redis/riot/redis/GeneratorImport.java +++ b/connectors/riot-redis/src/main/java/com/redis/riot/redis/GeneratorImport.java @@ -43,11 +43,7 @@ public class GeneratorImport extends AbstractStructImport { @Override protected Job job() { - return jobBuilder().start(step(reader(), processor(), writer())).build(); - } - - private ItemProcessor> processor() { - return PROCESSOR; + return jobBuilder().start(step(getName(), reader(), writer()).processor(PROCESSOR).build()).build(); } private GeneratorItemReader reader() { diff --git a/connectors/riot-redis/src/main/java/com/redis/riot/redis/Replication.java b/connectors/riot-redis/src/main/java/com/redis/riot/redis/Replication.java index 5cb994d78..33b77a082 100644 --- a/connectors/riot-redis/src/main/java/com/redis/riot/redis/Replication.java +++ b/connectors/riot-redis/src/main/java/com/redis/riot/redis/Replication.java @@ -9,6 +9,7 @@ import org.springframework.batch.core.job.builder.JobFlowBuilder; import org.springframework.batch.core.job.builder.SimpleJobBuilder; import org.springframework.batch.core.job.flow.support.SimpleFlow; +import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder; import org.springframework.batch.core.step.builder.SimpleStepBuilder; import org.springframework.batch.core.step.tasklet.TaskletStep; import org.springframework.batch.item.ItemProcessor; @@ -101,21 +102,21 @@ protected void close() { protected Job job() { ItemProcessor, KeyValue> processor = processor( ByteArrayCodec.INSTANCE); - SimpleStepBuilder, KeyValue> scanStep = stepBuilder(STEP_SCAN, - reader(), processor, writer()); + SimpleStepBuilder, KeyValue> scanStep = step(STEP_SCAN, reader(), + writer()).processor(processor); RedisItemReader> liveReader = reader(); liveReader.setMode(ReaderMode.LIVE); FlushingStepBuilder, KeyValue> liveStep = flushingStep( - stepBuilder(STEP_LIVE, liveReader, processor, writer())); + step(STEP_LIVE, liveReader, writer()).processor(processor)); KeyComparisonStatusCountItemWriter compareWriter = new KeyComparisonStatusCountItemWriter(); - TaskletStep compareStep = step(STEP_COMPARE, comparisonReader(), compareWriter); + TaskletStep compareStep = step(STEP_COMPARE, comparisonReader(), compareWriter).build(); switch (mode) { case COMPARE: return jobBuilder().start(compareStep).build(); case LIVE: checkKeyspaceNotificationEnabled(); - SimpleFlow scanFlow = flow("scan").start(build(scanStep)).build(); - SimpleFlow liveFlow = flow("live").start(build(liveStep)).build(); + SimpleFlow scanFlow = flow("scan").start(scanStep.build()).build(); + SimpleFlow liveFlow = flow("live").start(liveStep.build()).build(); SimpleFlow replicateFlow = flow("replicate").split(new SimpleAsyncTaskExecutor()).add(liveFlow, scanFlow) .build(); JobFlowBuilder live = jobBuilder().start(replicateFlow); @@ -146,9 +147,8 @@ private boolean shouldCompare() { } @Override - protected void configureStep(SimpleStepBuilder step, String name, ItemReader reader, - ItemWriter writer) { - super.configureStep(step, name, reader, writer); + protected FaultTolerantStepBuilder step(String name, ItemReader reader, ItemWriter writer) { + FaultTolerantStepBuilder step = super.step(name, reader, writer); switch (name) { case STEP_COMPARE: if (showDiffs) { @@ -165,6 +165,7 @@ protected void configureStep(SimpleStepBuilder step, String name, ItemRead default: break; } + return step; } private void checkKeyspaceNotificationEnabled() { diff --git a/core/riot-core/src/main/java/com/redis/riot/core/AbstractJobRunnable.java b/core/riot-core/src/main/java/com/redis/riot/core/AbstractJobRunnable.java index f4adbe9d9..efce2dab7 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/AbstractJobRunnable.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/AbstractJobRunnable.java @@ -1,5 +1,6 @@ package com.redis.riot.core; +import java.text.ParseException; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -15,18 +16,13 @@ import org.springframework.batch.core.step.builder.SimpleStepBuilder; import org.springframework.batch.core.step.skip.NeverSkipItemSkipPolicy; import org.springframework.batch.core.step.skip.SkipPolicy; -import org.springframework.batch.core.step.tasklet.TaskletStep; -import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemStreamReader; import org.springframework.batch.item.ItemStreamSupport; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.support.SynchronizedItemReader; import org.springframework.batch.item.support.SynchronizedItemStreamReader; -import org.springframework.core.task.SyncTaskExecutor; -import org.springframework.core.task.TaskExecutor; import org.springframework.retry.policy.MaxAttemptsRetryPolicy; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.util.ClassUtils; import com.redis.spring.batch.RedisItemReader; @@ -50,7 +46,7 @@ public abstract class AbstractJobRunnable extends AbstractRunnable { private static final String FAILED_JOB_MESSAGE = "Error executing job %s"; private String name; - private List stepConfigurators = new ArrayList<>(); + private List stepConfigurations = new ArrayList<>(); private int threads = DEFAULT_THREADS; private int chunkSize = DEFAULT_CHUNK_SIZE; private Duration sleep = DEFAULT_SLEEP; @@ -74,8 +70,8 @@ protected String name(String... suffixes) { return String.join("-", elements); } - public void addStepConfigurator(StepConfigurator configurator) { - stepConfigurators.add(configurator); + public void addStepConfiguration(StepConfiguration configuration) { + stepConfigurations.add(configuration); } public void setJobFactory(JobFactory jobFactory) { @@ -136,80 +132,45 @@ protected void writer(RedisItemWriter writer, RedisWriterOptions option } } - protected TaskletStep step(ItemReader reader, ItemWriter writer) { - return step(getName(), reader, null, writer); - } - - protected TaskletStep step(ItemReader reader, ItemProcessor processor, ItemWriter writer) { - return step(getName(), reader, processor, writer); - } - - protected TaskletStep step(String name, ItemReader reader, ItemWriter writer) { - return step(name, reader, null, writer); - } - - protected TaskletStep step(String name, ItemReader reader, ItemProcessor processor, - ItemWriter writer) { - return faultTolerant(stepBuilder(name, reader, processor, writer)).build(); - } - - protected SimpleStepBuilder stepBuilder(String name, ItemReader reader, - ItemProcessor processor, ItemWriter writer) { + protected FaultTolerantStepBuilder step(String name, ItemReader reader, ItemWriter writer) { SimpleStepBuilder builder = jobFactory.step(name, chunkSize); if (reader instanceof ItemStreamSupport) { ((ItemStreamSupport) reader).setName(name(name, "reader")); } - builder.reader(synchronize(reader)); - builder.processor(processor); + if (isMultiThreaded()) { + builder.reader(synchronize(reader)); + builder.taskExecutor(JobFactory.threadPoolTaskExecutor(threads)); + } else { + builder.reader(reader); + } builder.writer(writer(writer)); - builder.taskExecutor(taskExecutor()); - configureStep(builder, name, reader, writer); - stepConfigurators.forEach(s -> s.configure(builder, name, reader, writer)); - return builder; - } - - protected void configureStep(SimpleStepBuilder step, String name, ItemReader reader, - ItemWriter writer) { - } - - protected TaskletStep build(SimpleStepBuilder step) { - return faultTolerant(step).build(); + stepConfigurations.forEach(s -> s.configure(builder, name, reader, writer)); + return faultTolerant(builder); } protected FaultTolerantStepBuilder faultTolerant(SimpleStepBuilder step) { FaultTolerantStepBuilder ftStep = step.faultTolerant(); ftStep.skipLimit(skipLimit); ftStep.retryLimit(retryLimit); - ftStep.retry(RedisCommandTimeoutException.class); + ftStep.skip(ParseException.class); + ftStep.skip(RedisCommandExecutionException.class); + ftStep.noRetry(ParseException.class); ftStep.noRetry(RedisCommandExecutionException.class); + ftStep.noSkip(RedisCommandTimeoutException.class); + ftStep.retry(RedisCommandTimeoutException.class); return ftStep; } - private TaskExecutor taskExecutor() { - if (isMultiThreaded()) { - ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); - taskExecutor.setMaxPoolSize(threads); - taskExecutor.setCorePoolSize(threads); - taskExecutor.setQueueCapacity(threads); - taskExecutor.initialize(); - return taskExecutor; - } - return new SyncTaskExecutor(); - } - private ItemReader synchronize(ItemReader reader) { if (reader instanceof RedisItemReader) { return reader; } - if (isMultiThreaded()) { - if (reader instanceof ItemStreamReader) { - SynchronizedItemStreamReader synchronizedReader = new SynchronizedItemStreamReader<>(); - synchronizedReader.setDelegate((ItemStreamReader) reader); - return synchronizedReader; - } - return new SynchronizedItemReader<>(reader); + if (reader instanceof ItemStreamReader) { + SynchronizedItemStreamReader synchronizedReader = new SynchronizedItemStreamReader<>(); + synchronizedReader.setDelegate((ItemStreamReader) reader); + return synchronizedReader; } - return reader; + return new SynchronizedItemReader<>(reader); } private boolean isMultiThreaded() { diff --git a/core/riot-core/src/main/java/com/redis/riot/core/AbstractMapExport.java b/core/riot-core/src/main/java/com/redis/riot/core/AbstractMapExport.java index e77af11bd..d387cdbde 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/AbstractMapExport.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/AbstractMapExport.java @@ -27,10 +27,7 @@ public void setKeyRegex(Pattern pattern) { @Override protected Job job() { - RedisItemReader> reader = reader(); - ItemProcessor, Map> processor = processor(); - ItemWriter> writer = writer(); - return jobBuilder().start(step(reader, processor, writer)).build(); + return jobBuilder().start(step(getName(), reader(), writer()).processor(processor()).build()).build(); } protected RedisItemReader> reader() { diff --git a/core/riot-core/src/main/java/com/redis/riot/core/StepConfigurator.java b/core/riot-core/src/main/java/com/redis/riot/core/StepConfiguration.java similarity index 58% rename from core/riot-core/src/main/java/com/redis/riot/core/StepConfigurator.java rename to core/riot-core/src/main/java/com/redis/riot/core/StepConfiguration.java index 38bb73402..318198197 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/StepConfigurator.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/StepConfiguration.java @@ -4,8 +4,8 @@ import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; -public interface StepConfigurator { +public interface StepConfiguration { - void configure(SimpleStepBuilder step, String stepName, ItemReader reader, ItemWriter writer); + void configure(SimpleStepBuilder step, String name, ItemReader reader, ItemWriter writer); } diff --git a/plugins/riot/src/main/java/com/redis/riot/cli/AbstractJobCommand.java b/plugins/riot/src/main/java/com/redis/riot/cli/AbstractJobCommand.java index aa44af189..43e9212be 100644 --- a/plugins/riot/src/main/java/com/redis/riot/cli/AbstractJobCommand.java +++ b/plugins/riot/src/main/java/com/redis/riot/cli/AbstractJobCommand.java @@ -85,7 +85,7 @@ protected AbstractJobRunnable runnable() { runnable.setSleep(Duration.ofMillis(sleep)); runnable.setThreads(threads); if (progressStyle != ProgressStyle.NONE) { - runnable.addStepConfigurator(this::configureProgress); + runnable.addStepConfiguration(this::configureProgress); } return runnable; }