Skip to content

Commit

Permalink
feat: Improved error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
jruaux committed Oct 11, 2023
1 parent c16e831 commit a6cf0d5
Show file tree
Hide file tree
Showing 61 changed files with 1,068 additions and 1,353 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@

import org.springframework.batch.core.Job;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.AbstractCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.jdbc.core.ColumnMapRowMapper;
import org.springframework.util.ClassUtils;

import com.redis.riot.core.AbstractMapImport;
import com.redis.riot.core.RiotContext;
import com.redis.riot.core.StepBuilder;

public class DatabaseImport extends AbstractMapImport {

Expand Down Expand Up @@ -104,11 +105,10 @@ public void setVerifyCursorPosition(boolean verifyCursorPosition) {

@Override
protected Job job(RiotContext executionContext) {
StepBuilder<Map<String, Object>, Map<String, Object>> step = createStep();
step.name(getName());
step.reader(reader());
step.writer(writer(executionContext));
return jobBuilder().start(step.build().build()).build();
String name = ClassUtils.getShortName(getClass());
ItemReader<Map<String, Object>> reader = reader();
ItemWriter<Map<String, Object>> writer = writer(executionContext);
return jobBuilder().start(step(name, reader, writer).build()).build();
}

private ItemReader<Map<String, Object>> reader() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
import java.util.Map;

import org.springframework.batch.core.Job;
import org.springframework.batch.item.ItemWriter;
import org.springframework.expression.Expression;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

import com.redis.lettucemod.api.sync.RediSearchCommands;
import com.redis.lettucemod.search.Field;
Expand All @@ -15,7 +17,6 @@
import com.redis.riot.core.AbstractMapImport;
import com.redis.riot.core.RiotContext;
import com.redis.riot.core.RiotUtils;
import com.redis.riot.core.StepBuilder;
import com.redis.spring.batch.common.Range;

public class FakerImport extends AbstractMapImport {
Expand Down Expand Up @@ -69,11 +70,10 @@ public void setLocale(Locale locale) {

@Override
protected Job job(RiotContext executionContext) {
StepBuilder<Map<String, Object>, Map<String, Object>> step = createStep();
step.name(getName());
step.reader(reader(executionContext));
step.writer(writer(executionContext));
return jobBuilder().start(step.build().build()).build();
String name = ClassUtils.getShortName(getClass());
FakerItemReader reader = reader(executionContext);
ItemWriter<Map<String, Object>> writer = writer(executionContext);
return jobBuilder().start(step(name, reader, writer).build()).build();
}

private FakerItemReader reader(RiotContext executionContext) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,26 @@
package com.redis.riot.faker;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.batch.item.ExecutionContext;

import com.redis.spring.batch.util.BatchUtils;
import org.springframework.batch.item.ItemReader;

class FakerReaderTests {

@SuppressWarnings("deprecation")
public static <T> List<T> readAll(ItemReader<T> reader) throws Exception {
List<T> list = new ArrayList<>();
T element;
while ((element = reader.read()) != null) {
list.add(element);
}
return list;
}

@Test
void fakerReader() throws Exception {
int count = 100;
Expand All @@ -25,7 +33,7 @@ void fakerReader() throws Exception {
reader.setStringFields(fields);
reader.setMaxItemCount(count);
reader.open(new ExecutionContext());
List<Map<String, Object>> items = BatchUtils.readAll(reader);
List<Map<String, Object>> items = readAll(reader);
reader.close();
Assertions.assertEquals(count, items.size());
Assertions.assertEquals(1, items.get(0).get("index"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.io.IOException;

import org.springframework.batch.core.Job;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.json.JacksonJsonObjectMarshaller;
import org.springframework.batch.item.json.JsonObjectMarshaller;
Expand All @@ -14,7 +15,6 @@
import com.redis.riot.core.RedisContext;
import com.redis.riot.core.RiotContext;
import com.redis.riot.core.RiotExecutionException;
import com.redis.riot.core.StepBuilder;
import com.redis.riot.file.resource.JsonResourceItemWriter;
import com.redis.riot.file.resource.JsonResourceItemWriterBuilder;
import com.redis.riot.file.resource.XmlResourceItemWriter;
Expand Down Expand Up @@ -129,12 +129,10 @@ private JsonObjectMarshaller<KeyValue<String>> xmlMarshaller() {

@Override
protected Job job(RiotContext context) {
StepBuilder<KeyValue<String>, KeyValue<String>> step = createStep();
step.name(getName());
step.reader(reader(context.getRedisContext()));
step.writer(writer());
step.processor(processor(StringCodec.UTF8, context));
return jobBuilder().start(step.build().build()).build();
StructItemReader<String, String> reader = reader(context.getRedisContext());
ItemWriter<KeyValue<String>> writer = writer();
ItemProcessor<KeyValue<String>, KeyValue<String>> processor = processor(StringCodec.UTF8, context);
return jobBuilder().start(step(getName(), reader, processor, writer).build()).build();
}

private StructItemReader<String, String> reader(RedisContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

import com.redis.riot.core.AbstractStructImport;
import com.redis.riot.core.RiotContext;
import com.redis.riot.core.StepBuilder;
import com.redis.spring.batch.RedisItemWriter;
import com.redis.spring.batch.common.KeyValue;

public class FileDumpImport extends AbstractStructImport {
Expand Down Expand Up @@ -43,7 +43,7 @@ public void setType(FileDumpType type) {
@Override
protected Job job(RiotContext executionContext) {
Iterator<TaskletStep> steps = FileUtils.inputResources(files, fileOptions).stream().map(r -> step(executionContext, r))
.map(StepBuilder::build).map(FaultTolerantStepBuilder::build).iterator();
.map(FaultTolerantStepBuilder::build).iterator();
if (!steps.hasNext()) {
throw new IllegalArgumentException("No file found");
}
Expand All @@ -54,17 +54,13 @@ protected Job job(RiotContext executionContext) {
return job.build();
}

@SuppressWarnings({ "rawtypes", "unchecked" })
private StepBuilder<KeyValue<String>, KeyValue<String>> step(RiotContext executionContext, Resource resource) {
StepBuilder<KeyValue<String>, KeyValue<String>> step = createStep();
step.name(resource.getDescription());
step.reader((ItemReader) reader(resource));
step.writer(writer(executionContext));
return step;
private FaultTolerantStepBuilder<KeyValue<String>, KeyValue<String>> step(RiotContext executionContext, Resource resource) {
ItemReader<KeyValue<String>> reader = reader(resource);
RedisItemWriter<String, String, KeyValue<String>> writer = writer(executionContext);
return step(resource.getDescription(), reader, writer);
}

@SuppressWarnings("rawtypes")
private ItemReader<KeyValue> reader(Resource resource) {
private ItemReader<KeyValue<String>> reader(Resource resource) {
if (type == FileDumpType.XML) {
return FileUtils.xmlReader(resource, KeyValue.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.SimpleJobBuilder;
import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
Expand All @@ -37,11 +39,10 @@

import com.redis.riot.core.AbstractMapImport;
import com.redis.riot.core.RiotContext;
import com.redis.riot.core.StepBuilder;
import com.redis.riot.core.RiotUtils;
import com.redis.riot.core.function.MapToFieldFunction;
import com.redis.riot.core.function.RegexNamedGroupFunction;
import com.redis.riot.core.function.ToMapFunction;
import com.redis.spring.batch.util.BatchUtils;

public class FileImport extends AbstractMapImport {

Expand Down Expand Up @@ -160,15 +161,13 @@ private Step step(RiotContext context, Resource resource) {
if (maxItemCount != null && reader instanceof AbstractItemCountingItemStreamItemReader) {
((AbstractItemCountingItemStreamItemReader<Map<String, Object>>) reader).setMaxItemCount(maxItemCount);
}
String name = resource.getDescription();
StepBuilder<Map<String, Object>, Map<String, Object>> step = createStep();
step.name(name);
step.reader(reader);
step.writer(writer(context));
step.processor(processor(context));
step.addSkippableException(ParseException.class);
step.addNonRetriableException(ParseException.class);
return step.build().build();
ItemProcessor<Map<String, Object>, Map<String, Object>> processor = processor(context);
ItemWriter<Map<String, Object>> writer = writer(context);
FaultTolerantStepBuilder<Map<String, Object>, Map<String, Object>> step = step(resource.getDescription(), reader,
processor, writer);
step.skip(ParseException.class);
step.noRetry(ParseException.class);
return step.build();
}

private ItemReader<Map<String, Object>> reader(Resource resource) {
Expand Down Expand Up @@ -319,7 +318,7 @@ protected ItemProcessor<Map<String, Object>, Map<String, Object>> processor(Riot
if (processor == null) {
return regexProcessor;
}
return BatchUtils.processor(processor, regexProcessor);
return RiotUtils.processor(processor, regexProcessor);

}

Expand Down
2 changes: 1 addition & 1 deletion core/riot-core/riot-core.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ dependencies {
implementation 'org.apache.commons:commons-pool2'
testImplementation 'org.slf4j:slf4j-simple'
testImplementation group: 'com.redis', name: 'spring-batch-redis', version: springBatchRedisVersion, classifier: 'tests'
testImplementation group: 'org.awaitility', name: 'awaitility', version: awaitilityVersion
testImplementation 'org.awaitility:awaitility'
}

compileJava {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
import org.springframework.batch.item.function.FunctionItemProcessor;
import org.springframework.expression.EvaluationContext;

import com.redis.riot.core.function.DropStreamMessageIdFunction;
import com.redis.riot.core.function.ExpressionFunction;
import com.redis.riot.core.function.KeyValueOperator;
import com.redis.riot.core.function.LongExpressionFunction;
import com.redis.riot.core.function.StreamMessageIdDropOperator;
import com.redis.riot.core.function.StringKeyValueFunction;
import com.redis.riot.core.function.ToStringKeyValueFunction;
import com.redis.spring.batch.RedisItemReader;
Expand All @@ -26,7 +26,7 @@ public abstract class AbstractExport extends AbstractJobRunnable {

private KeyFilterOptions keyFilterOptions = new KeyFilterOptions();

private KeyValueProcessorOptions processorOptions = new KeyValueProcessorOptions();
protected KeyValueProcessorOptions processorOptions = new KeyValueProcessorOptions();

public void setKeyFilterOptions(KeyFilterOptions keyFilterOptions) {
this.keyFilterOptions = keyFilterOptions;
Expand All @@ -46,7 +46,6 @@ protected <K> ItemProcessor<KeyValue<K>, KeyValue<K>> processor(RedisCodec<K, ?>
return new FunctionItemProcessor<>(code.andThen(function(context.getEvaluationContext())).andThen(decode));
}

@SuppressWarnings({ "unchecked", "rawtypes" })
private Function<KeyValue<String>, KeyValue<String>> function(EvaluationContext context) {
KeyValueOperator operator = new KeyValueOperator();
if (processorOptions.getKeyExpression() != null) {
Expand All @@ -59,8 +58,8 @@ private Function<KeyValue<String>, KeyValue<String>> function(EvaluationContext
operator.setTtlFunction(new LongExpressionFunction<>(context, processorOptions.getTtlExpression()));
}
}
if (processorOptions.isDropStreamMessageId()) {
operator.setValueFunction((Function) new StreamMessageIdDropOperator());
if (processorOptions.isDropStreamMessageId() && isStruct()) {
operator.setValueFunction(new DropStreamMessageIdFunction());
}
if (processorOptions.getTypeExpression() != null) {
Function<KeyValue<String>, String> function = ExpressionFunction.of(context, processorOptions.getTypeExpression());
Expand All @@ -69,6 +68,10 @@ private Function<KeyValue<String>, KeyValue<String>> function(EvaluationContext
return operator;
}

protected boolean isStruct() {
return true;
}

protected <K, V> void configureReader(RedisItemReader<K, V, ?> reader, RedisContext context) {
reader.setChunkSize(readerOptions.getChunkSize());
reader.setDatabase(context.getUri().getDatabase());
Expand Down
Loading

0 comments on commit a6cf0d5

Please sign in to comment.