Skip to content

Commit

Permalink
refactor: Added context creation hook
Browse files Browse the repository at this point in the history
  • Loading branch information
Julien Ruaux committed Aug 15, 2022
1 parent 724ef3b commit 991cdbd
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.springframework.expression.spel.support.StandardEvaluationContext;

import com.redis.riot.FlushingTransferOptions;
import com.redis.riot.JobCommandContext;
import com.redis.riot.KeyValueProcessorOptions;
import com.redis.riot.ProgressMonitor;
import com.redis.riot.RedisWriterOptions;
Expand Down Expand Up @@ -69,7 +70,7 @@ public RedisWriterOptions getWriterOptions() {
}

@Override
protected Job job(TargetCommandContext context) {
protected Job job(JobCommandContext context) {
switch (replicationOptions.getMode()) {
case LIVE:
SimpleFlow liveFlow = new FlowBuilder<SimpleFlow>("live-replication-live-flow")
Expand All @@ -90,13 +91,13 @@ protected Job job(TargetCommandContext context) {
}
}

private Job job(TargetCommandContext context, String name, Function<TargetCommandContext, Step> step) {
private Job job(JobCommandContext context, String name, Function<JobCommandContext, Step> step) {
SimpleJobBuilder job = context.job(name).start(step.apply(context));
optionalVerificationStep(context).ifPresent(job::next);
return job.build();
}

protected Optional<Step> optionalVerificationStep(TargetCommandContext context) {
protected Optional<Step> optionalVerificationStep(JobCommandContext context) {
if (replicationOptions.isVerify()) {
if (writerOptions.isDryRun()) {
return Optional.empty();
Expand All @@ -111,15 +112,15 @@ protected Optional<Step> optionalVerificationStep(TargetCommandContext context)
return Optional.empty();
}

private TaskletStep scanStep(TargetCommandContext context) {
private TaskletStep scanStep(JobCommandContext context) {
RedisItemReader<byte[], T> reader = reader(context, "scan-reader").build();
RedisItemWriter<byte[], byte[], T> writer = createWriter(context).build();
RedisScanSizeEstimator estimator = estimator(context).build();
ProgressMonitor monitor = progressMonitor().task("Scanning").initialMax(estimator::execute).build();
return step(step(context, "snapshot-replication", reader, processor(context), writer), monitor).build();
}

private TaskletStep liveReplicationStep(TargetCommandContext context) {
private TaskletStep liveReplicationStep(JobCommandContext context) {
RedisItemReader<byte[], T> reader = flushingTransferOptions.configure(reader(context, "redis-live-reader")
.live().notificationQueueCapacity(replicationOptions.getNotificationQueueCapacity())
.database(context.getRedisOptions().uri().getDatabase())).build();
Expand All @@ -130,28 +131,29 @@ private TaskletStep liveReplicationStep(TargetCommandContext context) {
return step(step, monitor).build();
}

private RedisItemWriter.Builder<byte[], byte[], T> createWriter(TargetCommandContext context) {
private RedisItemWriter.Builder<byte[], byte[], T> createWriter(JobCommandContext context) {
if (writerOptions.isDryRun()) {
return RedisItemWriter.operation(context.getTargetRedisClient(), ByteArrayCodec.INSTANCE, new Noop<>());
return RedisItemWriter.operation(((TargetCommandContext) context).getTargetRedisClient(),
ByteArrayCodec.INSTANCE, new Noop<>());
}
return writerOptions.configure(writer(context));
return writerOptions.configure(writer((TargetCommandContext) context));
}

private RedisItemReader.Builder<byte[], byte[], T> reader(TargetCommandContext context, String name) {
private RedisItemReader.Builder<byte[], byte[], T> reader(JobCommandContext context, String name) {
return readerOptions.configure(reader(context)).name(name);
}

protected abstract RedisItemWriter.Builder<byte[], byte[], T> writer(TargetCommandContext context);

protected abstract RedisItemReader.Builder<byte[], byte[], T> reader(TargetCommandContext context);
protected abstract RedisItemReader.Builder<byte[], byte[], T> reader(JobCommandContext context);

private ItemProcessor<T, T> processor(TargetCommandContext context) {
private ItemProcessor<T, T> processor(JobCommandContext context) {
SpelExpressionParser parser = new SpelExpressionParser();
List<ItemProcessor<? extends KeyValue<byte[], ?>, ? extends KeyValue<byte[], ?>>> processors = new ArrayList<>();
processorOptions.getKeyProcessor().ifPresent(p -> {
EvaluationContext evaluationContext = new StandardEvaluationContext();
evaluationContext.setVariable("src", context.getRedisOptions().uri());
evaluationContext.setVariable("dest", context.getTargetRedisOptions().uri());
evaluationContext.setVariable("dest", ((TargetCommandContext) context).getTargetRedisOptions().uri());
Expression expression = parser.parseExpression(p, new TemplateParserContext());
processors.add(new KeyValueKeyProcessor<>(expression, evaluationContext));
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import java.util.logging.Logger;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.listener.StepExecutionListenerSupport;
Expand All @@ -22,6 +21,7 @@
import com.redis.spring.batch.compare.KeyComparisonItemWriter;
import com.redis.spring.batch.compare.KeyComparisonLogger;
import com.redis.spring.batch.compare.KeyComparisonResults;
import com.redis.spring.batch.support.JobRunner;

import picocli.CommandLine.ArgGroup;
import picocli.CommandLine.Mixin;
Expand Down Expand Up @@ -54,24 +54,23 @@ public CompareOptions getCompareOptions() {
}

@Override
protected Job job(JobCommandContext context) throws Exception {
return job(new TargetCommandContext(context, targetRedisOptions));
protected JobCommandContext context(JobRunner jobRunner, RedisOptions redisOptions) {
return new TargetCommandContext(jobRunner, redisOptions, targetRedisOptions);
}

protected abstract Job job(TargetCommandContext context);

@Override
protected RedisScanSizeEstimator.Builder estimator(JobCommandContext context) {
return super.estimator(context).match(readerOptions.getScanMatch()).sampleSize(readerOptions.getSampleSize())
.type(readerOptions.getScanType());
}

protected Step verificationStep(TargetCommandContext context) {
protected Step verificationStep(JobCommandContext context) {
RedisItemReader<String, DataStructure<String>> sourceReader = readerOptions
.configure(RedisItemReader.dataStructure(context.getRedisClient())).jobRunner(context.getJobRunner())
.build();
RedisItemReader<String, DataStructure<String>> targetReader = readerOptions
.configure(RedisItemReader.dataStructure(context.getTargetRedisClient())).build();
.configure(RedisItemReader.dataStructure(((TargetCommandContext) context).getTargetRedisClient()))
.build();
log.log(Level.FINE, "Creating key comparator with TTL tolerance of {0} seconds",
compareOptions.getTtlTolerance());
KeyComparisonItemWriter writer = KeyComparisonItemWriter.valueReader(targetReader.getValueReader())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import org.springframework.batch.core.Job;

import com.redis.riot.JobCommandContext;

import picocli.CommandLine.Command;

@Command(name = "compare", description = "Compare 2 Redis databases and print the differences")
Expand All @@ -10,8 +12,8 @@ public class CompareCommand extends AbstractTargetCommand {
private static final String NAME = "compare";

@Override
protected Job job(TargetCommandContext context) {
return context.job(NAME).start(verificationStep(context)).build();
protected Job job(JobCommandContext context) {
return context.job(NAME).start(verificationStep((TargetCommandContext) context)).build();
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.redis.riot.redis;

import com.redis.riot.JobCommandContext;
import com.redis.spring.batch.KeyValue;
import com.redis.spring.batch.RedisItemReader;
import com.redis.spring.batch.RedisItemWriter;
Expand All @@ -16,7 +17,7 @@ protected RedisItemWriter.Builder<byte[], byte[], KeyValue<byte[], byte[]>> writ
}

@Override
protected RedisItemReader.Builder<byte[], byte[], KeyValue<byte[], byte[]>> reader(TargetCommandContext context) {
protected RedisItemReader.Builder<byte[], byte[], KeyValue<byte[], byte[]>> reader(JobCommandContext context) {
return RedisItemReader.keyDump(context.getRedisClient(), ByteArrayCodec.INSTANCE);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.redis.riot.redis;

import com.redis.riot.JobCommandContext;
import com.redis.spring.batch.DataStructure;
import com.redis.spring.batch.RedisItemReader;
import com.redis.spring.batch.RedisItemWriter;
Expand All @@ -17,7 +18,7 @@ protected RedisItemWriter.Builder<byte[], byte[], DataStructure<byte[]>> writer(
}

@Override
protected RedisItemReader.Builder<byte[], byte[], DataStructure<byte[]>> reader(TargetCommandContext context) {
protected RedisItemReader.Builder<byte[], byte[], DataStructure<byte[]>> reader(JobCommandContext context) {
return RedisItemReader.dataStructure(context.getRedisClient(), ByteArrayCodec.INSTANCE);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,6 @@ public class TargetCommandContext extends JobCommandContext {
private final RedisOptions targetRedisOptions;
private final AbstractRedisClient targetRedisClient;

public TargetCommandContext(JobCommandContext context, RedisOptions targetRedisOptions) {
this(context.getJobRunner(), context.getRedisOptions(), targetRedisOptions);
}

public TargetCommandContext(JobRunner jobRunner, RedisOptions redisOptions, RedisOptions targetRedisOptions) {
super(jobRunner, redisOptions);
this.targetRedisOptions = targetRedisOptions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,17 @@ public void setApp(RiotApp app) {
@Override
public Integer call() throws Exception {
JobRunner jobRunner = JobRunner.inMemory();
JobExecution execution = jobRunner.run(job(new JobCommandContext(jobRunner, app.getRedisOptions())));
JobExecution execution = jobRunner.run(job(context(jobRunner, app.getRedisOptions())));
if (execution.getStatus().isUnsuccessful()) {
return 1;
}
return 0;
}

protected JobCommandContext context(JobRunner jobRunner, RedisOptions redisOptions) {
return new JobCommandContext(jobRunner, redisOptions);
}

protected abstract Job job(JobCommandContext context) throws Exception;

protected SimpleJobBuilder job(JobCommandContext context, String name, Step step) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public RedisURI uri() {
return redisURI;
}

public ClientResources clientResources() {
private ClientResources clientResources() {
DefaultClientResources.Builder builder = DefaultClientResources.builder();
if (showMetrics) {
builder.commandLatencyRecorder(
Expand Down

0 comments on commit 991cdbd

Please sign in to comment.