Skip to content

Commit

Permalink
refactor: Merged all redis-* into one
Browse files Browse the repository at this point in the history
  • Loading branch information
Julien Ruaux committed Mar 20, 2023
1 parent c5ba4d9 commit 391ddae
Show file tree
Hide file tree
Showing 111 changed files with 1,847 additions and 2,015 deletions.
4 changes: 4 additions & 0 deletions core/riot-core/riot-core.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,8 @@ asciidoctor {
include '**/*'
}
}
}

compileJava {
options.release = 8
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@ public void setReaderOptions(RedisReaderOptions readerOptions) {
}

protected RedisItemReader<String, DataStructure<String>> reader(JobCommandContext context) {
return RedisItemReader.dataStructure(context.pool(), context.getJobRunner())
.options(readerOptions.readerOptions()).build();
return context.reader().readerOptions(readerOptions.readerOptions()).scanOptions(readerOptions.scanOptions())
.dataStructure();
}

protected <I, O> Job job(JobCommandContext context, String name, SimpleStepBuilder<I, O> step, String task) {
ScanSizeEstimator estimator = ScanSizeEstimator.builder(context.pool())
.options(readerOptions.estimatorOptions()).build();
return super.job(context, name, step, progressMonitor().task(task).initialMax(estimator::execute).build());
ScanSizeEstimator estimator = ScanSizeEstimator.client(context.getRedisClient())
.options(readerOptions.scanSizeEstimatorOptions()).build();
ProgressMonitor monitor = progressMonitor().task(task).initialMax(estimator::execute).build();
return context.job(name).start(step(step, monitor).build()).build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import java.util.Map;
import java.util.stream.Collectors;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
Expand Down Expand Up @@ -40,7 +39,6 @@
import com.redis.riot.processor.MapAccessor;
import com.redis.riot.processor.MapProcessor;
import com.redis.riot.processor.SpelProcessor;
import com.redis.spring.batch.RedisItemWriter;
import com.redis.spring.batch.writer.Operation;

import picocli.CommandLine.ArgGroup;
Expand Down Expand Up @@ -113,7 +111,7 @@ protected ItemProcessor<Map<String, Object>, Map<String, Object>> processor(JobC
if (processorOptions.hasFilters()) {
processors.add(new FilteringProcessor(processorOptions.getFilters()));
}
return CompositeItemStreamItemProcessor.delegates(processors.toArray(ItemProcessor[]::new));
return CompositeItemStreamItemProcessor.delegates(processors.toArray(new ItemProcessor[0]));
}

@SuppressWarnings({ "unchecked", "rawtypes" })
Expand All @@ -132,12 +130,7 @@ protected ItemWriter<Map<String, Object>> writer(JobCommandContext context) {

private ItemWriter<Map<String, Object>> writer(JobCommandContext context,
Operation<String, String, Map<String, Object>> operation) {
return RedisItemWriter.operation(context.pool(), operation).options(writerOptions.writerOptions()).build();
}

protected Job job(JobCommandContext context, String name,
AbstractItemCountingItemStreamItemReader<Map<String, Object>> reader, ProgressMonitor monitor) {
return job(context, name, step(context, name, reader), monitor);
return context.writer().options(writerOptions.writerOptions()).operation(operation);
}

protected SimpleStepBuilder<Map<String, Object>, Map<String, Object>> step(JobCommandContext context, String name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,17 @@

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.SimpleJobBuilder;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.core.JobParameters;

import com.redis.spring.batch.common.JobRunner;

import picocli.CommandLine.Command;
import picocli.CommandLine.Mixin;
import picocli.CommandLine.Model.CommandSpec;
import picocli.CommandLine.ParentCommand;
import picocli.CommandLine.Spec;

@Command(usageHelpAutoWidth = true)
public abstract class AbstractJobCommand implements Callable<Integer> {

@Spec
protected CommandSpec commandSpec;
@ParentCommand
private Main app;
@Mixin
Expand All @@ -34,7 +28,8 @@ public void setApp(Main app) {
public Integer call() throws Exception {
JobRunner jobRunner = JobRunner.inMemory();
try (JobCommandContext context = context(jobRunner, app.getRedisOptions())) {
JobExecution execution = jobRunner.run(job(context));
JobExecution execution = jobRunner.getJobLauncher().run(job(context), new JobParameters());
jobRunner.awaitTermination(execution);
if (execution.getStatus().isUnsuccessful()) {
return 1;
}
Expand All @@ -46,14 +41,6 @@ protected JobCommandContext context(JobRunner jobRunner, RedisOptions redisOptio
return new JobCommandContext(jobRunner, redisOptions);
}

protected abstract Job job(JobCommandContext context) throws Exception;

protected SimpleJobBuilder job(JobCommandContext context, String name, Step step) {
return context.job(name).start(step);
}

protected SimpleJobBuilder job(JobCommandContext context, String name, Tasklet tasklet) {
return job(context, name, context.step(name).tasklet(tasklet).build());
}
protected abstract Job job(JobCommandContext context);

}
Original file line number Diff line number Diff line change
@@ -1,31 +1,18 @@
package com.redis.riot;

import java.time.Duration;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.springframework.batch.core.ItemWriteListener;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy;
import org.springframework.batch.core.step.skip.SkipPolicy;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.SynchronizedItemStreamReader;
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import com.redis.spring.batch.common.JobRunner;
import com.redis.spring.batch.reader.PollableItemReader;

import io.lettuce.core.RedisCommandExecutionException;
import io.lettuce.core.RedisCommandTimeoutException;
import picocli.CommandLine.Mixin;

public abstract class AbstractTransferCommand extends AbstractJobCommand {
Expand All @@ -43,8 +30,17 @@ public void setTransferOptions(TransferOptions options) {

protected <I, O> SimpleStepBuilder<I, O> step(JobCommandContext context, String name, ItemReader<I> reader,
ItemProcessor<I, O> processor, ItemWriter<O> writer) {
return context.step(name).<I, O>chunk(options.getChunkSize()).reader(throttle(synchronize(reader)))
.processor(processor).writer(writer);
SimpleStepBuilder<I, O> step = context.step(name).<I, O>chunk(options.getChunkSize())
.reader(throttle(synchronize(reader))).processor(processor).writer(writer);
JobRunner.multiThreaded(step, options.getThreads());
return step;
}

private <I> ItemReader<I> synchronize(ItemReader<I> reader) {
if (options.getThreads() > 1) {
return JobRunner.synchronize(reader);
}
return reader;
}

private <I> ItemReader<I> throttle(ItemReader<I> reader) {
Expand All @@ -54,56 +50,16 @@ private <I> ItemReader<I> throttle(ItemReader<I> reader) {
return ThrottledItemReader.create(reader, Duration.ofMillis(options.getSleep()));
}

protected <I, O> Job job(JobCommandContext context, String name, SimpleStepBuilder<I, O> step,
ProgressMonitor monitor) {
return job(context, name, step(step, monitor).build()).build();
}

protected ProgressMonitor.Builder progressMonitor() {
return options.progressMonitor();
}

protected <I, O> FaultTolerantStepBuilder<I, O> step(SimpleStepBuilder<I, O> step, ProgressMonitor monitor) {
if (options.getThreads() > 1) {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(options.getThreads());
taskExecutor.setMaxPoolSize(options.getThreads());
taskExecutor.setQueueCapacity(options.getThreads());
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
taskExecutor.afterPropertiesSet();
step.taskExecutor(taskExecutor);
step.throttleLimit(options.getThreads());
} else {
step.taskExecutor(new SyncTaskExecutor());
}
if (options.isProgressEnabled()) {
step.listener((StepExecutionListener) monitor);
step.listener((ItemWriteListener<Object>) monitor);
}
SkipPolicy skipPolicy = options.getSkipPolicy().getSkipPolicy();
if (skipPolicy instanceof LimitCheckingItemSkipPolicy) {
LimitCheckingItemSkipPolicy limitSkipPolicy = (LimitCheckingItemSkipPolicy) skipPolicy;
limitSkipPolicy.setSkippableExceptionMap(
Stream.of(RedisCommandExecutionException.class, RedisCommandTimeoutException.class,
TimeoutException.class).collect(Collectors.toMap(Function.identity(), t -> true)));
}
return step.faultTolerant().skipPolicy(options.getSkipPolicy().getSkipPolicy());
}

private <I> ItemReader<I> synchronize(ItemReader<I> reader) {
if (options.getThreads() > 1) {
if (reader instanceof PollableItemReader) {
SynchronizedPollableItemReader<I> pollableReader = new SynchronizedPollableItemReader<>();
pollableReader.setDelegate((PollableItemReader<I>) reader);
return pollableReader;
}
if (reader instanceof ItemStreamReader) {
SynchronizedItemStreamReader<I> streamReader = new SynchronizedItemStreamReader<>();
streamReader.setDelegate((ItemStreamReader<I>) reader);
return streamReader;
}
}
return reader;
return JobRunner.faultTolerant(step, options.faultToleranceOptions());
}

protected <I, O> FaultTolerantStepBuilder<I, O> faultTolerant(SimpleStepBuilder<I, O> step) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@

import org.springframework.util.Assert;

import com.redis.spring.batch.step.FlushingOptions;
import com.redis.spring.batch.common.FlushingOptions;
import com.redis.spring.batch.step.FlushingChunkProvider;

import picocli.CommandLine.Option;

public class FlushingTransferOptions {

@Option(names = "--flush-interval", description = "Max duration between flushes (default: ${DEFAULT-VALUE}).", paramLabel = "<ms>")
private long flushInterval = FlushingOptions.DEFAULT_FLUSHING_INTERVAL.toMillis();
private long flushInterval = FlushingChunkProvider.DEFAULT_FLUSHING_INTERVAL.toMillis();
@Option(names = "--idle-timeout", description = "Min duration of inactivity to consider transfer complete.", paramLabel = "<ms>")
private Optional<Long> idleTimeout = Optional.empty();

Expand All @@ -32,8 +33,8 @@ public String toString() {
}

public FlushingOptions flushingOptions() {
return FlushingOptions.builder().interval(Duration.ofMillis(flushInterval))
.timeout(idleTimeout.map(Duration::ofMillis)).build();
return FlushingOptions.builder().flushingInterval(Duration.ofMillis(flushInterval))
.idleTimeout(idleTimeout.map(Duration::ofMillis)).build();
}

}
55 changes: 37 additions & 18 deletions core/riot-core/src/main/java/com/redis/riot/JobCommandContext.java
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
package com.redis.riot;

import org.apache.commons.pool2.impl.GenericObjectPool;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.step.builder.StepBuilder;

import com.redis.lettucemod.RedisModulesClient;
import com.redis.lettucemod.api.StatefulRedisModulesConnection;
import com.redis.lettucemod.cluster.RedisModulesClusterClient;
import com.redis.spring.batch.RedisItemReader;
import com.redis.spring.batch.RedisItemWriter;
import com.redis.spring.batch.common.JobRunner;

import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;

public class JobCommandContext implements AutoCloseable {
Expand All @@ -29,10 +30,6 @@ public JobCommandContext(JobRunner jobRunner, RedisOptions redisOptions) {
this.redisClient = redisOptions.client();
}

public JobRunner getJobRunner() {
return jobRunner;
}

public RedisOptions getRedisOptions() {
return redisOptions;
}
Expand All @@ -51,14 +48,6 @@ public void close() throws Exception {
redisClient.getResources().shutdown();
}

public JobBuilder job(String name) {
return jobRunner.job(name);
}

public StepBuilder step(String name) {
return jobRunner.step(name);
}

public StatefulRedisModulesConnection<String, String> connection() {
return connection(redisClient);
}
Expand All @@ -82,12 +71,42 @@ public <K, V> StatefulRedisPubSubConnection<K, V> pubSubConnection(AbstractRedis
return ((RedisModulesClient) client).connectPubSub(codec);
}

public GenericObjectPool<StatefulConnection<String, String>> pool() {
return redisOptions.pool(redisClient);
public <K, V> RedisItemReader.Builder<K, V> reader(RedisCodec<K, V> codec) {
return reader(redisClient, codec);
}

public RedisItemReader.Builder<String, String> reader() {
return reader(StringCodec.UTF8);
}

protected <K, V> RedisItemReader.Builder<K, V> reader(AbstractRedisClient client, RedisCodec<K, V> codec) {
if (client instanceof RedisModulesClusterClient) {
return RedisItemReader.client((RedisModulesClusterClient) client, codec).jobRunner(jobRunner);
}
return RedisItemReader.client((RedisModulesClient) client, codec).jobRunner(jobRunner);
}

public RedisItemWriter.Builder<String, String> writer() {
return writer(StringCodec.UTF8);
}

public <K, V> RedisItemWriter.Builder<K, V> writer(RedisCodec<K, V> codec) {
return writer(redisClient, codec);
}

public <K, V> GenericObjectPool<StatefulConnection<K, V>> pool(RedisCodec<K, V> codec) {
return redisOptions.pool(redisClient, codec);
protected static <K, V> RedisItemWriter.Builder<K, V> writer(AbstractRedisClient client, RedisCodec<K, V> codec) {
if (client instanceof RedisModulesClusterClient) {
return RedisItemWriter.client((RedisModulesClusterClient) client, codec);
}
return RedisItemWriter.client((RedisModulesClient) client, codec);
}

public StepBuilder step(String name) {
return jobRunner.step(name);
}

public JobBuilder job(String name) {
return jobRunner.job(name);
}

}
Loading

0 comments on commit 391ddae

Please sign in to comment.