Skip to content

Commit

Permalink
fix: Added systematic name on steps
Browse files Browse the repository at this point in the history
  • Loading branch information
jruaux committed Jun 11, 2024
1 parent 9911589 commit 1165b0c
Show file tree
Hide file tree
Showing 31 changed files with 240 additions and 300 deletions.
1 change: 1 addition & 0 deletions connectors/riot-faker/riot-faker.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* limitations under the License.
*/
dependencies {
implementation project(':riot-core')
implementation 'org.springframework.batch:spring-batch-core'
implementation group: 'net.datafaker', name: 'datafaker', version: datafakerVersion
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@

import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.support.DataBindingMethodResolver;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

import com.redis.riot.core.Expression;

import net.datafaker.Faker;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
import org.junit.jupiter.api.Test;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;

import com.redis.riot.core.Expression;

class FakerReaderTests {

Expand All @@ -25,14 +25,13 @@ public static <T> List<T> readAll(ItemReader<T> reader) throws Exception {

@Test
void fakerReader() throws Exception {
SpelExpressionParser parser = new SpelExpressionParser();
int count = 100;
FakerItemReader reader = new FakerItemReader();
Map<String, Expression> fields = new LinkedHashMap<>();
fields.put("index", parser.parseExpression("index"));
fields.put("firstName", parser.parseExpression("name.firstName"));
fields.put("lastName", parser.parseExpression("name.lastName"));
fields.put("thread", parser.parseExpression("thread"));
fields.put("index", Expression.parse("index"));
fields.put("firstName", Expression.parse("name.firstName"));
fields.put("lastName", Expression.parse("name.lastName"));
fields.put("thread", Expression.parse("thread"));
reader.setFields(fields);
reader.setMaxItemCount(count);
reader.open(new ExecutionContext());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,11 @@ public void setHeader(boolean header) {
this.header = header;
}

@Override
public String toString() {
return "amazonS3Args=" + amazonS3Args + ", googleStorageArgs=" + googleStorageArgs + ", delimiter=" + delimiter
+ ", encoding=" + encoding + ", fileType=" + fileType + ", gzipped=" + gzipped + ", header=" + header
+ ", quoteCharacter=" + quoteCharacter;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ public class FileReaderArgs extends FileArgs {

@Override
public Resource resource(String location) {

if (FileUtils.isStdin(location)) {
return new FilenameInputStreamResource(System.in, "stdin", "Standard Input");
}
Expand Down Expand Up @@ -72,6 +71,10 @@ public Resource resource(String location) {
return resource;
}

public List<Resource> resources() {
return files.stream().flatMap(FileUtils::expand).map(this::resource).collect(Collectors.toList());
}

public List<String> getFiles() {
return files;
}
Expand Down Expand Up @@ -140,8 +143,12 @@ public void setMaxItemCount(int maxItemCount) {
this.maxItemCount = maxItemCount;
}

public List<Resource> resources() {
return files.stream().flatMap(FileUtils::expand).map(this::resource).collect(Collectors.toList());
@Override
public String toString() {
return "FileReaderArgs [files=" + files + ", " + super.toString() + ", columnRanges=" + columnRanges
+ ", continuationString=" + continuationString + ", fields=" + fields + ", headerLine=" + headerLine
+ ", includedFields=" + includedFields + ", linesToSkip=" + linesToSkip + ", maxItemCount="
+ maxItemCount + "]";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,22 @@ public WritableResource resource(String location) {
return writableResource;
}

public FileType fileType() {
try {
return fileType(file);
} catch (IOException e) {
throw new IllegalArgumentException("Could not determine type of file " + file, e);
}
}

@Override
public String toString() {
return "FileWriterArgs [file=" + file + ", " + super.toString() + ", formatterString=" + formatterString
+ ", append=" + append + ", forceSync=" + forceSync + ", rootName=" + rootName + ", elementName="
+ elementName + ", lineSeparator=" + lineSeparator + ", shouldDeleteIfEmpty=" + shouldDeleteIfEmpty
+ ", shouldDeleteIfExists=" + shouldDeleteIfExists + ", transactional=" + transactional + "]";
}

public String getFile() {
return file;
}
Expand Down Expand Up @@ -166,12 +182,4 @@ public void setTransactional(boolean transactional) {
this.transactional = transactional;
}

public FileType fileType() {
try {
return fileType(file);
} catch (IOException e) {
throw new IllegalArgumentException("Could not determine type of file " + file, e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ public class FileWriterFactory {

private final Logger log = LoggerFactory.getLogger(getClass());

private FileWriterArgs options = new FileWriterArgs();
private FileWriterArgs args = new FileWriterArgs();
private Supplier<Map<String, Object>> headerSupplier = () -> null;

@SuppressWarnings("unchecked")
public <T> ItemWriter<T> create(String file) {
WritableResource resource = options.resource(file);
FileType type = options.fileType(resource);
WritableResource resource = args.resource(file);
FileType type = args.fileType(resource);
switch (type) {
case CSV:
return (ItemWriter<T>) delimitedWriter(resource);
Expand Down Expand Up @@ -67,9 +67,9 @@ private <T extends ObjectMapper> T objectMapper(T objectMapper) {
private <T> JsonFileItemWriter<T> jsonWriter(WritableResource resource) {
JsonFileItemWriterBuilder<T> writer = new JsonFileItemWriterBuilder<>();
writer.name(resource.getFilename());
writer.append(options.isAppend());
writer.encoding(options.getEncoding());
writer.lineSeparator(options.getLineSeparator());
writer.append(args.isAppend());
writer.encoding(args.getEncoding());
writer.lineSeparator(args.getLineSeparator());
writer.resource(resource);
writer.saveState(false);
ObjectMapper mapper = objectMapper(new ObjectMapper());
Expand All @@ -80,31 +80,31 @@ private <T> JsonFileItemWriter<T> jsonWriter(WritableResource resource) {
private <T> XmlResourceItemWriter<T> xmlWriter(WritableResource resource) {
XmlResourceItemWriterBuilder<T> writer = new XmlResourceItemWriterBuilder<>();
writer.name(resource.getFilename());
writer.append(options.isAppend());
writer.encoding(options.getEncoding());
writer.lineSeparator(options.getLineSeparator());
writer.rootName(options.getRootName());
writer.append(args.isAppend());
writer.encoding(args.getEncoding());
writer.lineSeparator(args.getLineSeparator());
writer.rootName(args.getRootName());
writer.resource(resource);
writer.saveState(false);
XmlMapper mapper = objectMapper(new XmlMapper());
mapper.setConfig(mapper.getSerializationConfig().withRootName(options.getElementName()));
mapper.setConfig(mapper.getSerializationConfig().withRootName(args.getElementName()));
writer.xmlObjectMarshaller(new JacksonJsonObjectMarshaller<>(mapper));
return writer.build();
}

private ItemWriter<Map<String, Object>> delimitedWriter(WritableResource resource) {
FlatFileItemWriterBuilder<Map<String, Object>> writer = flatFileWriter(resource);
DelimitedBuilder<Map<String, Object>> delimitedBuilder = writer.delimited();
delimitedBuilder.delimiter(options.getDelimiter());
delimitedBuilder.delimiter(args.getDelimiter());
delimitedBuilder.fieldExtractor(new PassThroughFieldExtractor<>());
delimitedBuilder.quoteCharacter(String.valueOf(options.getQuoteCharacter()));
delimitedBuilder.quoteCharacter(String.valueOf(args.getQuoteCharacter()));
return writer(writer, delimitedBuilder.build());
}

private FlatFileItemWriter<Map<String, Object>> writer(FlatFileItemWriterBuilder<Map<String, Object>> writer,
LineAggregator<Map<String, Object>> lineAggregator) {
writer.lineAggregator(lineAggregator);
if (options.isHeader()) {
if (args.isHeader()) {
Map<String, Object> headerRecord = headerSupplier.get();
if (CollectionUtils.isEmpty(headerRecord)) {
log.warn("Could not determine header");
Expand All @@ -122,7 +122,7 @@ private FlatFileItemWriter<Map<String, Object>> writer(FlatFileItemWriterBuilder
private ItemWriter<Map<String, Object>> fixedLengthWriter(WritableResource resource) {
FlatFileItemWriterBuilder<Map<String, Object>> writer = flatFileWriter(resource);
FormattedBuilder<Map<String, Object>> formattedBuilder = writer.formatted();
formattedBuilder.format(options.getFormatterString());
formattedBuilder.format(args.getFormatterString());
formattedBuilder.fieldExtractor(new PassThroughFieldExtractor<>());
return writer(writer, formattedBuilder.build());
}
Expand All @@ -131,23 +131,23 @@ private <T> FlatFileItemWriterBuilder<T> flatFileWriter(WritableResource resourc
FlatFileItemWriterBuilder<T> builder = new FlatFileItemWriterBuilder<>();
builder.name(resource.getFilename());
builder.resource(resource);
builder.append(options.isAppend());
builder.encoding(options.getEncoding());
builder.forceSync(options.isForceSync());
builder.lineSeparator(options.getLineSeparator());
builder.append(args.isAppend());
builder.encoding(args.getEncoding());
builder.forceSync(args.isForceSync());
builder.lineSeparator(args.getLineSeparator());
builder.saveState(false);
builder.shouldDeleteIfEmpty(options.isShouldDeleteIfEmpty());
builder.shouldDeleteIfExists(options.isShouldDeleteIfExists());
builder.transactional(options.isTransactional());
builder.shouldDeleteIfEmpty(args.isShouldDeleteIfEmpty());
builder.shouldDeleteIfExists(args.isShouldDeleteIfExists());
builder.transactional(args.isTransactional());
return builder;
}

public FileWriterArgs getOptions() {
return options;
public FileWriterArgs getArgs() {
return args;
}

public void setOptions(FileWriterArgs options) {
this.options = options;
public void setArgs(FileWriterArgs options) {
this.args = options;
}

public Supplier<Map<String, Object>> getHeaderSupplier() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,18 +159,14 @@ private <I, O> TaskletStep step(Step<I, O> step) {
}

private <I, O> SimpleStepBuilder<I, O> simpleStepBuilder(Step<I, O> step) {
if (step.getName() == null) {
step.name(jobName);
} else {
step.name(jobName + "-" + step.getName());
}
String stepName = jobName + "-" + step.getName();
if (step.getReader() instanceof ItemStreamSupport) {
ItemStreamSupport support = (ItemStreamSupport) step.getReader();
Assert.notNull(support.getName(), "No name specified for reader in step " + step.getName());
support.setName(step.getName() + "-" + support.getName());
Assert.notNull(support.getName(), "No name specified for reader in step " + stepName);
support.setName(stepName + "-" + support.getName());
}
log.info("Creating step {} with chunk size {}", step.getName(), stepArgs.getChunkSize());
SimpleStepBuilder<I, O> builder = new StepBuilder(step.getName(), jobRepository).chunk(stepArgs.getChunkSize(),
log.info("Creating step {} with chunk size {}", stepName, stepArgs.getChunkSize());
SimpleStepBuilder<I, O> builder = new StepBuilder(stepName, jobRepository).chunk(stepArgs.getChunkSize(),
transactionManager);
builder.reader(reader(step));
builder.writer(writer(step));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import java.util.LinkedHashMap;
import java.util.Map;

import org.springframework.expression.Expression;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.util.CollectionUtils;

Expand All @@ -16,10 +15,10 @@ public class EvaluationContextArgs {
public static final String DATE_VAR = "date";

@Option(arity = "1..*", names = "--var", description = "SpEL expressions for context variables, in the form var=\"exp\"", paramLabel = "<v=exp>")
Map<String, Expression> varExpressions = new LinkedHashMap<>();
private Map<String, Expression> varExpressions = new LinkedHashMap<>();

@Option(names = "--date-format", description = "Date/time format (default: ${DEFAULT-VALUE}). For details see https://www.baeldung.com/java-simple-date-format#date_time_patterns", paramLabel = "<fmt>")
String dateFormat = DEFAULT_DATE_FORMAT;
private String dateFormat = DEFAULT_DATE_FORMAT;

private Map<String, Object> vars = new LinkedHashMap<>();

Expand Down
52 changes: 52 additions & 0 deletions core/riot-core/src/main/java/com/redis/riot/core/Expression.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.redis.riot.core;

import java.util.function.Predicate;

import org.springframework.expression.EvaluationContext;
import org.springframework.expression.common.TemplateParserContext;
import org.springframework.expression.spel.standard.SpelExpressionParser;

public class Expression {

protected static final SpelExpressionParser PARSER = new SpelExpressionParser();

protected final org.springframework.expression.Expression spelExpression;

public Expression(org.springframework.expression.Expression expression) {
this.spelExpression = expression;
}

@Override
public String toString() {
return spelExpression.getExpressionString();
}

public <T> Predicate<T> predicate(EvaluationContext context) {
return t -> spelExpression.getValue(context, t, Boolean.class);
}

public Object getValue(EvaluationContext context) {
return spelExpression.getValue(context);
}

public Object getValue(EvaluationContext context, Object rootObject) {
return spelExpression.getValue(context, rootObject);
}

public Long getLong(EvaluationContext context, Object rootObject) {
return spelExpression.getValue(context, rootObject, Long.class);
}

public String getString(EvaluationContext context, Object rootObject) {
return spelExpression.getValue(context, rootObject, String.class);
}

public static Expression parse(String expression) {
return new Expression(PARSER.parseExpression(expression));
}

public static TemplateExpression parseTemplate(String expression) {
return new TemplateExpression(PARSER.parseExpression(expression, new TemplateParserContext()));
}

}
28 changes: 0 additions & 28 deletions core/riot-core/src/main/java/com/redis/riot/core/RiotUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand All @@ -25,32 +24,12 @@
import org.springframework.batch.item.function.FunctionItemProcessor;
import org.springframework.batch.item.support.CompositeItemProcessor;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.common.TemplateParserContext;
import org.springframework.expression.spel.standard.SpelExpressionParser;

public abstract class RiotUtils {

private static final SpelExpressionParser parser = new SpelExpressionParser();

private RiotUtils() {
}

public static Expression parse(String expressionString) {
return parser.parseExpression(expressionString);
}

public static TemplateExpression parseTemplate(String expressionString) {
TemplateExpression expression = new TemplateExpression();
expression.setExpression(parser.parseExpression(expressionString, new TemplateParserContext()));
return expression;
}

public static <T> Predicate<T> predicate(EvaluationContext context, Expression expression) {
return t -> expression.getValue(context, t, Boolean.class);
}

public static <S, T> ItemProcessor<S, T> processor(Collection<? extends Function<?, ?>> functions) {
return processor(functions.toArray(new Function[0]));
}
Expand Down Expand Up @@ -125,11 +104,4 @@ public static String toString(ByteArrayOutputStream out) {
}
}

public static String toString(Expression expression) {
if (expression == null) {
return String.valueOf(expression);
}
return expression.getExpressionString();
}

}
Loading

0 comments on commit 1165b0c

Please sign in to comment.