Skip to content

Commit

Permalink
refactor: Reorganized options and CLI args
Browse files Browse the repository at this point in the history
  • Loading branch information
jruaux committed Nov 2, 2023
1 parent a752cb7 commit 6ec48bd
Show file tree
Hide file tree
Showing 46 changed files with 1,140 additions and 1,350 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,11 @@ public static boolean isGoogleStorageResource(String file) {
}

@SuppressWarnings({ "rawtypes", "unchecked" })
public static <T> JsonItemReader<T> jsonReader(Resource resource, Class<? super T> clazz) {
public static <T> JsonItemReader<T> jsonReader(Resource resource, Class<? super T> type) {
JsonItemReaderBuilder<T> jsonReaderBuilder = new JsonItemReaderBuilder<>();
jsonReaderBuilder.name(resource.getFilename() + "-json-file-reader");
jsonReaderBuilder.resource(resource);
JacksonJsonObjectReader<T> jsonObjectReader = new JacksonJsonObjectReader(clazz);
JacksonJsonObjectReader<T> jsonObjectReader = new JacksonJsonObjectReader(type);
jsonObjectReader.setMapper(objectMapper());
jsonReaderBuilder.jsonObjectReader(jsonObjectReader);
return jsonReaderBuilder.build();
Expand All @@ -154,11 +154,11 @@ public static ObjectMapper objectMapper() {
}

@SuppressWarnings({ "unchecked", "rawtypes" })
public static <T> XmlItemReader<T> xmlReader(Resource resource, Class<? super T> clazz) {
public static <T> XmlItemReader<T> xmlReader(Resource resource, Class<? super T> type) {
XmlItemReaderBuilder<T> xmlReaderBuilder = new XmlItemReaderBuilder<>();
xmlReaderBuilder.name(resource.getFilename() + "-xml-file-reader");
xmlReaderBuilder.resource(resource);
XmlObjectReader<T> xmlObjectReader = new XmlObjectReader(clazz);
XmlObjectReader<T> xmlObjectReader = new XmlObjectReader(type);
xmlObjectReader.setMapper(xmlMapper());
xmlReaderBuilder.xmlObjectReader(xmlObjectReader);
return xmlReaderBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.springframework.batch.item.UnexpectedInputException;

import com.redis.riot.core.RedisOptions;
import com.redis.riot.core.RedisUriOptions;
import com.redis.riot.core.operation.HsetBuilder;
import com.redis.spring.batch.test.AbstractTestBase;

Expand Down Expand Up @@ -53,9 +52,7 @@ void fileImportJSON() throws UnexpectedInputException, ParseException, NonTransi
private RedisOptions redisClientOptions() {
RedisOptions options = new RedisOptions();
options.setCluster(getRedisServer().isCluster());
RedisUriOptions uriOptions = new RedisUriOptions();
uriOptions.setUri(getRedisServer().getRedisURI());
options.setUriOptions(uriOptions);
options.setUri(getRedisServer().getRedisURI());
return options;
}

Expand Down
152 changes: 74 additions & 78 deletions core/riot-core/src/main/java/com/redis/riot/core/AbstractExport.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,83 +22,79 @@

public abstract class AbstractExport extends AbstractJobRunnable {

private RedisReaderOptions readerOptions = new RedisReaderOptions();

private KeyFilterOptions keyFilterOptions = new KeyFilterOptions();

protected KeyValueProcessorOptions processorOptions = new KeyValueProcessorOptions();

public void setKeyFilterOptions(KeyFilterOptions keyFilterOptions) {
this.keyFilterOptions = keyFilterOptions;
}

public void setReaderOptions(RedisReaderOptions readerOptions) {
this.readerOptions = readerOptions;
}

public void setProcessorOptions(KeyValueProcessorOptions options) {
this.processorOptions = options;
}

protected <K> ItemProcessor<KeyValue<K>, KeyValue<K>> processor(RedisCodec<K, ?> codec, RiotContext context) {
ToStringKeyValueFunction<K> code = new ToStringKeyValueFunction<>(codec);
StringKeyValueFunction<K> decode = new StringKeyValueFunction<>(codec);
return new FunctionItemProcessor<>(code.andThen(function(context.getEvaluationContext())).andThen(decode));
}

private Function<KeyValue<String>, KeyValue<String>> function(EvaluationContext context) {
KeyValueOperator operator = new KeyValueOperator();
if (processorOptions.getKeyExpression() != null) {
operator.setKeyFunction(ExpressionFunction.of(context, processorOptions.getKeyExpression()));
}
if (processorOptions.isDropTtl()) {
operator.setTtlFunction(t -> 0);
} else {
if (processorOptions.getTtlExpression() != null) {
operator.setTtlFunction(new LongExpressionFunction<>(context, processorOptions.getTtlExpression()));
}
}
if (processorOptions.isDropStreamMessageId() && isStruct()) {
operator.setValueFunction(new DropStreamMessageIdFunction());
}
if (processorOptions.getTypeExpression() != null) {
Function<KeyValue<String>, String> function = ExpressionFunction.of(context, processorOptions.getTypeExpression());
operator.setTypeFunction(function.andThen(DataType::of));
}
return operator;
}

protected abstract boolean isStruct();

protected <K, V> void configureReader(RedisItemReader<K, V, ?> reader, RedisContext context) {
reader.setChunkSize(readerOptions.getChunkSize());
reader.setDatabase(context.getUri().getDatabase());
reader.setKeyProcessor(keyFilteringProcessor(reader.getCodec()));
reader.setKeyPattern(readerOptions.getKeyPattern());
reader.setKeyType(readerOptions.getKeyType());
reader.setFlushInterval(readerOptions.getFlushInterval());
reader.setIdleTimeout(readerOptions.getIdleTimeout());
if (reader instanceof KeyValueItemReader) {
KeyValueItemReader<?, ?> keyValueReader = (KeyValueItemReader<?, ?>) reader;
keyValueReader.setMemoryUsageLimit(readerOptions.getMemoryUsageLimit());
keyValueReader.setMemoryUsageSamples(readerOptions.getMemoryUsageSamples());
keyValueReader.setPoolSize(readerOptions.getPoolSize());
}
reader.setNotificationQueueCapacity(readerOptions.getNotificationQueueCapacity());
reader.setOrderingStrategy(readerOptions.getOrderingStrategy());
reader.setPollTimeout(readerOptions.getPollTimeout());
reader.setQueueCapacity(readerOptions.getQueueCapacity());
reader.setReadFrom(readerOptions.getReadFrom());
reader.setScanCount(readerOptions.getScanCount());
reader.setThreads(readerOptions.getThreads());
}

public <K> ItemProcessor<K, K> keyFilteringProcessor(RedisCodec<K, ?> codec) {
Predicate<K> predicate = RiotUtils.keyFilterPredicate(codec, keyFilterOptions);
if (predicate == null) {
return null;
}
return new PredicateItemProcessor<>(predicate);
}
private RedisReaderOptions readerOptions = new RedisReaderOptions();

protected KeyValueProcessorOptions processorOptions = new KeyValueProcessorOptions();

public void setReaderOptions(RedisReaderOptions readerOptions) {
this.readerOptions = readerOptions;
}

public void setProcessorOptions(KeyValueProcessorOptions options) {
this.processorOptions = options;
}

protected <K> ItemProcessor<KeyValue<K>, KeyValue<K>> processor(RedisCodec<K, ?> codec, RiotContext context) {
ToStringKeyValueFunction<K> code = new ToStringKeyValueFunction<>(codec);
StringKeyValueFunction<K> decode = new StringKeyValueFunction<>(codec);
Function<KeyValue<String>, KeyValue<String>> function = processorFunction(context.getEvaluationContext());
return new FunctionItemProcessor<>(code.andThen(function).andThen(decode));
}

private Function<KeyValue<String>, KeyValue<String>> processorFunction(EvaluationContext context) {
KeyValueOperator operator = new KeyValueOperator();
if (processorOptions.getKeyExpression() != null) {
operator.setKeyFunction(ExpressionFunction.of(context, processorOptions.getKeyExpression()));
}
if (processorOptions.isDropTtl()) {
operator.setTtlFunction(t -> 0);
} else {
if (processorOptions.getTtlExpression() != null) {
operator.setTtlFunction(new LongExpressionFunction<>(context, processorOptions.getTtlExpression()));
}
}
if (processorOptions.isDropStreamMessageId() && isStruct()) {
operator.setValueFunction(new DropStreamMessageIdFunction());
}
if (processorOptions.getTypeExpression() != null) {
Function<KeyValue<String>, String> function = ExpressionFunction.of(context,
processorOptions.getTypeExpression());
operator.setTypeFunction(function.andThen(DataType::of));
}
return operator;
}

protected abstract boolean isStruct();

protected <K, V> void configureReader(RedisItemReader<K, V, ?> reader, RedisContext context) {
reader.setChunkSize(readerOptions.getChunkSize());
reader.setDatabase(context.getUri().getDatabase());
reader.setKeyProcessor(keyFilteringProcessor(reader.getCodec()));
reader.setKeyPattern(readerOptions.getKeyPattern());
reader.setKeyType(readerOptions.getKeyType());
reader.setFlushInterval(readerOptions.getFlushInterval());
reader.setIdleTimeout(readerOptions.getIdleTimeout());
if (reader instanceof KeyValueItemReader) {
KeyValueItemReader<?, ?> keyValueReader = (KeyValueItemReader<?, ?>) reader;
keyValueReader.setMemoryUsageLimit(readerOptions.getMemoryUsageLimit());
keyValueReader.setMemoryUsageSamples(readerOptions.getMemoryUsageSamples());
keyValueReader.setPoolSize(readerOptions.getPoolSize());
}
reader.setNotificationQueueCapacity(readerOptions.getNotificationQueueCapacity());
reader.setOrderingStrategy(readerOptions.getOrderingStrategy());
reader.setPollTimeout(readerOptions.getPollTimeout());
reader.setQueueCapacity(readerOptions.getQueueCapacity());
reader.setReadFrom(readerOptions.getReadFrom());
reader.setScanCount(readerOptions.getScanCount());
reader.setThreads(readerOptions.getThreads());
}

public <K> ItemProcessor<K, K> keyFilteringProcessor(RedisCodec<K, ?> codec) {
Predicate<K> predicate = RiotUtils.keyFilterPredicate(codec, readerOptions.getKeyFilterOptions());
if (predicate == null) {
return null;
}
return new PredicateItemProcessor<>(predicate);
}

}
Loading

0 comments on commit 6ec48bd

Please sign in to comment.