Skip to content

Commit

Permalink
refactor: Simplified step configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
jruaux committed Apr 19, 2024
1 parent 1220604 commit c386146
Show file tree
Hide file tree
Showing 11 changed files with 45 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import org.springframework.batch.item.database.AbstractCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.jdbc.core.ColumnMapRowMapper;
import org.springframework.util.ClassUtils;

import com.redis.riot.core.AbstractImport;

Expand Down Expand Up @@ -92,8 +91,7 @@ public void setVerifyCursorPosition(boolean verifyCursorPosition) {

@Override
protected Job job() {
String name = ClassUtils.getShortName(getClass());
return jobBuilder().start(step(name, reader(), writer())).build();
return jobBuilder().start(step(getName(), reader(), writer()).build()).build();
}

private ItemReader<Map<String, Object>> reader() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@
import java.util.Map;

import org.springframework.batch.core.Job;
import org.springframework.batch.item.ItemWriter;
import org.springframework.expression.Expression;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

import com.redis.lettucemod.api.sync.RediSearchCommands;
import com.redis.lettucemod.search.Field;
Expand Down Expand Up @@ -64,10 +62,7 @@ public void setLocale(Locale locale) {

@Override
protected Job job() {
String name = ClassUtils.getShortName(getClass());
FakerItemReader reader = reader();
ItemWriter<Map<String, Object>> writer = writer();
return jobBuilder().start(step(name, reader, writer)).build();
return jobBuilder().start(step(getName(), reader(), writer()).build()).build();
}

private FakerItemReader reader() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.io.IOException;

import org.springframework.batch.core.Job;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.json.JacksonJsonObjectMarshaller;
import org.springframework.batch.item.json.JsonObjectMarshaller;
Expand Down Expand Up @@ -126,7 +127,8 @@ protected Job job() {
RedisItemReader<String, String, KeyValue<String, Object>> reader = RedisItemReader.struct();
reader.setClient(getRedisClient());
configureReader(reader);
return jobBuilder().start(step(reader, processor(StringCodec.UTF8), writer())).build();
ItemProcessor<KeyValue<String, Object>, KeyValue<String, Object>> processor = processor(StringCodec.UTF8);
return jobBuilder().start(step(getName(), reader, writer()).processor(processor).build()).build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

import com.redis.riot.core.AbstractStructImport;
import com.redis.spring.batch.KeyValue;
import com.redis.spring.batch.RedisItemWriter;

public class FileDumpImport extends AbstractStructImport {

Expand Down Expand Up @@ -47,9 +46,7 @@ protected Job job() {
}
List<TaskletStep> steps = new ArrayList<>();
for (Resource resource : resources) {
ItemReader<KeyValue<String, Object>> reader = reader(resource);
RedisItemWriter<String, String, KeyValue<String, Object>> writer = writer();
steps.add(step(resource.getFilename(), reader, writer));
steps.add(step(resource.getFilename(), reader(resource), writer()).build());
}
Iterator<TaskletStep> iterator = steps.iterator();
SimpleJobBuilder job = jobBuilder().start(iterator.next());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
Expand Down Expand Up @@ -146,9 +145,7 @@ private TaskletStep step(Resource resource) {
if (maxItemCount != null && reader instanceof AbstractItemCountingItemStreamItemReader) {
((AbstractItemCountingItemStreamItemReader<Map<String, Object>>) reader).setMaxItemCount(maxItemCount);
}
ItemProcessor<Map<String, Object>, Map<String, Object>> processor = processor();
ItemWriter<Map<String, Object>> writer = writer();
return step(resource.getFilename(), reader, processor, writer);
return step(resource.getFilename(), reader, writer()).processor(processor()).build();
}

private ItemReader<Map<String, Object>> reader(Resource resource) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,7 @@ public class GeneratorImport extends AbstractStructImport {

@Override
protected Job job() {
return jobBuilder().start(step(reader(), processor(), writer())).build();
}

private ItemProcessor<Item, KeyValue<String, Object>> processor() {
return PROCESSOR;
return jobBuilder().start(step(getName(), reader(), writer()).processor(PROCESSOR).build()).build();
}

private GeneratorItemReader reader() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.springframework.batch.core.job.builder.JobFlowBuilder;
import org.springframework.batch.core.job.builder.SimpleJobBuilder;
import org.springframework.batch.core.job.flow.support.SimpleFlow;
import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.item.ItemProcessor;
Expand Down Expand Up @@ -101,21 +102,21 @@ protected void close() {
protected Job job() {
ItemProcessor<KeyValue<byte[], Object>, KeyValue<byte[], Object>> processor = processor(
ByteArrayCodec.INSTANCE);
SimpleStepBuilder<KeyValue<byte[], Object>, KeyValue<byte[], Object>> scanStep = stepBuilder(STEP_SCAN,
reader(), processor, writer());
SimpleStepBuilder<KeyValue<byte[], Object>, KeyValue<byte[], Object>> scanStep = step(STEP_SCAN, reader(),
writer()).processor(processor);
RedisItemReader<byte[], byte[], KeyValue<byte[], Object>> liveReader = reader();
liveReader.setMode(ReaderMode.LIVE);
FlushingStepBuilder<KeyValue<byte[], Object>, KeyValue<byte[], Object>> liveStep = flushingStep(
stepBuilder(STEP_LIVE, liveReader, processor, writer()));
step(STEP_LIVE, liveReader, writer()).processor(processor));
KeyComparisonStatusCountItemWriter compareWriter = new KeyComparisonStatusCountItemWriter();
TaskletStep compareStep = step(STEP_COMPARE, comparisonReader(), compareWriter);
TaskletStep compareStep = step(STEP_COMPARE, comparisonReader(), compareWriter).build();
switch (mode) {
case COMPARE:
return jobBuilder().start(compareStep).build();
case LIVE:
checkKeyspaceNotificationEnabled();
SimpleFlow scanFlow = flow("scan").start(build(scanStep)).build();
SimpleFlow liveFlow = flow("live").start(build(liveStep)).build();
SimpleFlow scanFlow = flow("scan").start(scanStep.build()).build();
SimpleFlow liveFlow = flow("live").start(liveStep.build()).build();
SimpleFlow replicateFlow = flow("replicate").split(new SimpleAsyncTaskExecutor()).add(liveFlow, scanFlow)
.build();
JobFlowBuilder live = jobBuilder().start(replicateFlow);
Expand Down Expand Up @@ -146,9 +147,8 @@ private boolean shouldCompare() {
}

@Override
protected void configureStep(SimpleStepBuilder<?, ?> step, String name, ItemReader<?> reader,
ItemWriter<?> writer) {
super.configureStep(step, name, reader, writer);
protected <I, O> FaultTolerantStepBuilder<I, O> step(String name, ItemReader<I> reader, ItemWriter<O> writer) {
FaultTolerantStepBuilder<I, O> step = super.step(name, reader, writer);
switch (name) {
case STEP_COMPARE:
if (showDiffs) {
Expand All @@ -165,6 +165,7 @@ protected void configureStep(SimpleStepBuilder<?, ?> step, String name, ItemRead
default:
break;
}
return step;
}

private void checkKeyspaceNotificationEnabled() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.redis.riot.core;

import java.text.ParseException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -15,18 +16,13 @@
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.core.step.skip.NeverSkipItemSkipPolicy;
import org.springframework.batch.core.step.skip.SkipPolicy;
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.batch.item.ItemStreamSupport;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.SynchronizedItemReader;
import org.springframework.batch.item.support.SynchronizedItemStreamReader;
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.retry.policy.MaxAttemptsRetryPolicy;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.ClassUtils;

import com.redis.spring.batch.RedisItemReader;
Expand All @@ -50,7 +46,7 @@ public abstract class AbstractJobRunnable extends AbstractRunnable {
private static final String FAILED_JOB_MESSAGE = "Error executing job %s";

private String name;
private List<StepConfigurator> stepConfigurators = new ArrayList<>();
private List<StepConfiguration> stepConfigurations = new ArrayList<>();
private int threads = DEFAULT_THREADS;
private int chunkSize = DEFAULT_CHUNK_SIZE;
private Duration sleep = DEFAULT_SLEEP;
Expand All @@ -74,8 +70,8 @@ protected String name(String... suffixes) {
return String.join("-", elements);
}

public void addStepConfigurator(StepConfigurator configurator) {
stepConfigurators.add(configurator);
public void addStepConfiguration(StepConfiguration configuration) {
stepConfigurations.add(configuration);
}

public void setJobFactory(JobFactory jobFactory) {
Expand Down Expand Up @@ -136,80 +132,45 @@ protected void writer(RedisItemWriter<?, ?, ?> writer, RedisWriterOptions option
}
}

protected <T> TaskletStep step(ItemReader<T> reader, ItemWriter<T> writer) {
return step(getName(), reader, null, writer);
}

protected <I, O> TaskletStep step(ItemReader<I> reader, ItemProcessor<I, O> processor, ItemWriter<O> writer) {
return step(getName(), reader, processor, writer);
}

protected <I, O> TaskletStep step(String name, ItemReader<I> reader, ItemWriter<O> writer) {
return step(name, reader, null, writer);
}

protected <I, O> TaskletStep step(String name, ItemReader<I> reader, ItemProcessor<I, O> processor,
ItemWriter<O> writer) {
return faultTolerant(stepBuilder(name, reader, processor, writer)).build();
}

protected <I, O> SimpleStepBuilder<I, O> stepBuilder(String name, ItemReader<I> reader,
ItemProcessor<I, O> processor, ItemWriter<O> writer) {
protected <I, O> FaultTolerantStepBuilder<I, O> step(String name, ItemReader<I> reader, ItemWriter<O> writer) {
SimpleStepBuilder<I, O> builder = jobFactory.step(name, chunkSize);
if (reader instanceof ItemStreamSupport) {
((ItemStreamSupport) reader).setName(name(name, "reader"));
}
builder.reader(synchronize(reader));
builder.processor(processor);
if (isMultiThreaded()) {
builder.reader(synchronize(reader));
builder.taskExecutor(JobFactory.threadPoolTaskExecutor(threads));
} else {
builder.reader(reader);
}
builder.writer(writer(writer));
builder.taskExecutor(taskExecutor());
configureStep(builder, name, reader, writer);
stepConfigurators.forEach(s -> s.configure(builder, name, reader, writer));
return builder;
}

protected void configureStep(SimpleStepBuilder<?, ?> step, String name, ItemReader<?> reader,
ItemWriter<?> writer) {
}

protected <I, O> TaskletStep build(SimpleStepBuilder<I, O> step) {
return faultTolerant(step).build();
stepConfigurations.forEach(s -> s.configure(builder, name, reader, writer));
return faultTolerant(builder);
}

protected <I, O> FaultTolerantStepBuilder<I, O> faultTolerant(SimpleStepBuilder<I, O> step) {
FaultTolerantStepBuilder<I, O> ftStep = step.faultTolerant();
ftStep.skipLimit(skipLimit);
ftStep.retryLimit(retryLimit);
ftStep.retry(RedisCommandTimeoutException.class);
ftStep.skip(ParseException.class);
ftStep.skip(RedisCommandExecutionException.class);
ftStep.noRetry(ParseException.class);
ftStep.noRetry(RedisCommandExecutionException.class);
ftStep.noSkip(RedisCommandTimeoutException.class);
ftStep.retry(RedisCommandTimeoutException.class);
return ftStep;
}

private TaskExecutor taskExecutor() {
if (isMultiThreaded()) {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setMaxPoolSize(threads);
taskExecutor.setCorePoolSize(threads);
taskExecutor.setQueueCapacity(threads);
taskExecutor.initialize();
return taskExecutor;
}
return new SyncTaskExecutor();
}

private <T> ItemReader<T> synchronize(ItemReader<T> reader) {
if (reader instanceof RedisItemReader) {
return reader;
}
if (isMultiThreaded()) {
if (reader instanceof ItemStreamReader) {
SynchronizedItemStreamReader<T> synchronizedReader = new SynchronizedItemStreamReader<>();
synchronizedReader.setDelegate((ItemStreamReader<T>) reader);
return synchronizedReader;
}
return new SynchronizedItemReader<>(reader);
if (reader instanceof ItemStreamReader) {
SynchronizedItemStreamReader<T> synchronizedReader = new SynchronizedItemStreamReader<>();
synchronizedReader.setDelegate((ItemStreamReader<T>) reader);
return synchronizedReader;
}
return reader;
return new SynchronizedItemReader<>(reader);
}

private boolean isMultiThreaded() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,7 @@ public void setKeyRegex(Pattern pattern) {

@Override
protected Job job() {
RedisItemReader<String, String, KeyValue<String, Object>> reader = reader();
ItemProcessor<KeyValue<String, Object>, Map<String, Object>> processor = processor();
ItemWriter<Map<String, Object>> writer = writer();
return jobBuilder().start(step(reader, processor, writer)).build();
return jobBuilder().start(step(getName(), reader(), writer()).processor(processor()).build()).build();
}

protected RedisItemReader<String, String, KeyValue<String, Object>> reader() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;

public interface StepConfigurator {
public interface StepConfiguration {

void configure(SimpleStepBuilder<?, ?> step, String stepName, ItemReader<?> reader, ItemWriter<?> writer);
<I, O> void configure(SimpleStepBuilder<I, O> step, String name, ItemReader<I> reader, ItemWriter<O> writer);

}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ protected AbstractJobRunnable runnable() {
runnable.setSleep(Duration.ofMillis(sleep));
runnable.setThreads(threads);
if (progressStyle != ProgressStyle.NONE) {
runnable.addStepConfigurator(this::configureProgress);
runnable.addStepConfiguration(this::configureProgress);
}
return runnable;
}
Expand Down

0 comments on commit c386146

Please sign in to comment.