Skip to content

Commit

Permalink
refactor: Made job runnable top level
Browse files Browse the repository at this point in the history
  • Loading branch information
jruaux committed Apr 19, 2024
1 parent 840e38f commit b87d16c
Show file tree
Hide file tree
Showing 14 changed files with 61 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@

import org.HdrHistogram.Histogram;
import org.LatencyUtils.LatencyStats;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.step.tasklet.CallableTaskletAdapter;
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.util.Assert;

import com.redis.riot.core.AbstractRunnable;
Expand Down Expand Up @@ -83,7 +87,17 @@ public void setPercentiles(double[] percentiles) {
}

@Override
protected void doRun() {
protected Job job() {
TaskletStep step = new TaskletStep();
CallableTaskletAdapter tasklet = new CallableTaskletAdapter();
tasklet.setCallable(this::call);
step.setName(getName());
step.setJobRepository(getJobFactory().getJobRepository());
step.setTasklet(tasklet);
return jobBuilder().start(step).build();
}

private RepeatStatus call() {
for (int iteration = 0; iteration < iterations; iteration++) {
LatencyStats stats = new LatencyStats();
for (int index = 0; index < count; index++) {
Expand Down Expand Up @@ -114,6 +128,7 @@ protected void doRun() {
}
}
}
return RepeatStatus.FINISHED;
}

private long toTimeUnit(long value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.expression.spel.support.StandardEvaluationContext;

import com.redis.lettucemod.api.StatefulRedisModulesConnection;
import com.redis.lettucemod.util.RedisModulesUtils;
import com.redis.riot.core.AbstractExport;
import com.redis.riot.core.RedisClientOptions;
import com.redis.riot.core.RedisWriterOptions;
Expand Down Expand Up @@ -64,19 +62,22 @@ public class Replication extends AbstractExport {

private RedisURI targetRedisURI;
private AbstractRedisClient targetRedisClient;
private StatefulRedisModulesConnection<String, String> targetRedisConnection;

@Override
protected boolean isStruct() {
return type == ReplicationType.STRUCT;
}

@Override
protected void open() throws Exception {
targetRedisURI = targetRedisClientOptions.redisURI();
targetRedisClient = targetRedisClientOptions.client(targetRedisURI);
targetRedisConnection = RedisModulesUtils.connection(targetRedisClient);
super.open();
public void run() {
try {
targetRedisURI = targetRedisClientOptions.redisURI();
targetRedisClient = targetRedisClientOptions.client(targetRedisURI);
super.run();
} finally {
targetRedisClient.close();
targetRedisClient.getResources().shutdown();
}
}

@Override
Expand All @@ -87,17 +88,6 @@ protected StandardEvaluationContext evaluationContext() {
return evaluationContext;
}

@Override
protected void close() {
super.close();
try {
targetRedisConnection.close();
} finally {
targetRedisClient.close();
targetRedisClient.getResources().shutdown();
}
}

@Override
protected Job job() {
ItemProcessor<KeyValue<byte[], Object>, KeyValue<byte[], Object>> processor = processor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

import io.lettuce.core.codec.RedisCodec;

public abstract class AbstractExport extends AbstractJobRunnable {
public abstract class AbstractExport extends AbstractRunnable {

private static final String REDIS_VAR = "redis";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import io.lettuce.core.AbstractRedisClient;

public abstract class AbstractImport extends AbstractJobRunnable {
public abstract class AbstractImport extends AbstractRunnable {

private RedisWriterOptions writerOptions = new RedisWriterOptions();
private EvaluationContextOptions evaluationContextOptions = new EvaluationContextOptions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,8 @@
import java.util.Arrays;
import java.util.List;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
Expand All @@ -34,7 +31,7 @@
import io.lettuce.core.RedisCommandExecutionException;
import io.lettuce.core.RedisCommandTimeoutException;

public abstract class AbstractJobRunnable extends AbstractRunnable {
public abstract class AbstractJobRunnable implements Runnable {

public static final SkipPolicy DEFAULT_SKIP_POLICY = new NeverSkipItemSkipPolicy();
public static final int DEFAULT_SKIP_LIMIT = 0;
Expand Down Expand Up @@ -79,44 +76,24 @@ public void setJobFactory(JobFactory jobFactory) {
}

@Override
protected void open() throws Exception {
super.open();
public void run() {
if (jobFactory == null) {
jobFactory = new JobFactory();
jobFactory.afterPropertiesSet();
try {
jobFactory.afterPropertiesSet();
} catch (Exception e) {
throw new ExecutionException("Could not initialize job infrastructure", e);
}
}
}

@Override
protected void doRun() {
Job job = job();
JobExecution jobExecution;
try {
jobExecution = jobFactory.run(job);
JobFactory.checkJobExecution(jobFactory.run(job()));
} catch (JobExecutionException e) {
throw new ExecutionException(String.format(FAILED_JOB_MESSAGE, job.getName()), e);
}
if (jobExecution.getExitStatus().getExitCode().equals(ExitStatus.FAILED.getExitCode())) {
for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
ExitStatus exitStatus = stepExecution.getExitStatus();
if (exitStatus.getExitCode().equals(ExitStatus.FAILED.getExitCode())) {
String message = String.format("Error executing step %s in job %s: %s", stepExecution.getStepName(),
job.getName(), exitStatus.getExitDescription());
if (stepExecution.getFailureExceptions().isEmpty()) {
throw new ExecutionException(message);
}
throw new ExecutionException(message, stepExecution.getFailureExceptions().get(0));
}
}
if (jobExecution.getAllFailureExceptions().isEmpty()) {
throw new ExecutionException(String.format("Error executing job %s: %s", job.getName(),
jobExecution.getExitStatus().getExitDescription()));
}
throw new ExecutionException(String.format(FAILED_JOB_MESSAGE, name), e);
}
}

protected JobBuilder jobBuilder() {
return jobFactory.jobBuilder(getName());
return jobFactory.jobBuilder(name);
}

protected abstract Job job();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.RedisURI;

public abstract class AbstractRunnable implements Runnable {
public abstract class AbstractRunnable extends AbstractJobRunnable {

private RedisClientOptions redisClientOptions = new RedisClientOptions();

Expand All @@ -16,25 +16,11 @@ public abstract class AbstractRunnable implements Runnable {

@Override
public void run() {
try {
open();
} catch (Exception e) {
throw new ExecutionException("Could not initialize RIOT", e);
}
doRun();
close();
}

protected abstract void doRun();

protected void open() throws Exception {
redisURI = redisClientOptions.redisURI();
redisClient = redisClientOptions.client(redisURI);
redisConnection = RedisModulesUtils.connection(redisClient);
}

protected void close() {
try {
redisClient = redisClientOptions.client(redisURI);
redisConnection = RedisModulesUtils.connection(redisClient);
super.run();
redisConnection.close();
} finally {
redisClient.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import com.redis.spring.batch.KeyValue;
import com.redis.spring.batch.RedisItemWriter;

public abstract class AbstractStructImport extends AbstractJobRunnable {
public abstract class AbstractStructImport extends AbstractRunnable {

private RedisWriterOptions writerOptions = new RedisWriterOptions();

Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ latencyutilsVersion = 2.0.3
lettucemodVersion = 3.7.3
picocliVersion = 4.7.5
progressbarVersion = 0.10.0
springBatchRedisVersion = 4.1.2
springBatchRedisVersion = 4.1.3-SNAPSHOT
testcontainersRedisVersion = 2.2.0

org.gradle.daemon = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import org.springframework.batch.item.ItemReader;

import com.redis.riot.core.AbstractExport;
import com.redis.riot.core.AbstractJobRunnable;
import com.redis.riot.core.AbstractRunnable;
import com.redis.spring.batch.RedisItemReader;
import com.redis.spring.batch.RedisItemReader.ReaderMode;
import com.redis.spring.batch.reader.ScanSizeEstimator;
Expand All @@ -21,7 +21,7 @@ public abstract class AbstractExportCommand extends AbstractJobCommand {
KeyValueProcessorArgs processorArgs = new KeyValueProcessorArgs();

@Override
protected AbstractJobRunnable jobRunnable() {
protected AbstractRunnable runnable() {
AbstractExport export = exportRunnable();
export.setReaderOptions(readerArgs.readerOptions());
export.setProcessorOptions(processorArgs.processorOptions());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ protected List<Operation<String, String, Map<String, Object>, Object>> operation
}

@Override
protected AbstractImport jobRunnable() {
protected AbstractImport runnable() {
AbstractImport runnable = importRunnable();
runnable.setOperations(operations());
runnable.setEvaluationContextOptions(evaluationContextArgs.evaluationContextOptions());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,25 @@
import org.springframework.util.ClassUtils;

import com.redis.riot.core.AbstractJobRunnable;
import com.redis.riot.core.AbstractRunnable;

import me.tongfei.progressbar.DelegatingProgressBarConsumer;
import me.tongfei.progressbar.ProgressBarBuilder;
import me.tongfei.progressbar.ProgressBarStyle;
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;
import picocli.CommandLine.ParentCommand;

abstract class AbstractJobCommand extends AbstractSubCommand {
@Command
abstract class AbstractJobCommand extends BaseCommand implements Runnable {

public enum ProgressStyle {
BLOCK, BAR, ASCII, LOG, NONE
}

@ParentCommand
protected AbstractMainCommand parent;

@Option(names = "--sleep", description = "Duration in ms to sleep after each batch write (default: ${DEFAULT-VALUE}).", paramLabel = "<ms>")
long sleep;

Expand Down Expand Up @@ -73,8 +80,8 @@ private ProgressBarStyle progressBarStyle() {
}

@Override
protected AbstractJobRunnable runnable() {
AbstractJobRunnable runnable = jobRunnable();
public void run() {
AbstractRunnable runnable = runnable();
if (name != null) {
runnable.setName(name);
}
Expand All @@ -87,7 +94,8 @@ protected AbstractJobRunnable runnable() {
if (progressStyle != ProgressStyle.NONE) {
runnable.addStepConfiguration(this::configureProgress);
}
return runnable;
runnable.setRedisClientOptions(parent.redisArgs.redisOptions());
runnable.run();
}

private void configureProgress(SimpleStepBuilder<?, ?> step, String stepName, ItemReader<?> reader,
Expand Down Expand Up @@ -120,6 +128,6 @@ protected LongSupplier initialMaxSupplier(String stepName, ItemReader<?> reader)
return () -> ProgressStepExecutionListener.UNKNOWN_SIZE;
}

protected abstract AbstractJobRunnable jobRunnable();
protected abstract AbstractRunnable runnable();

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public abstract class AbstractStructImportCommand extends AbstractJobCommand {
RedisWriterArgs writerArgs = new RedisWriterArgs();

@Override
protected AbstractStructImport jobRunnable() {
protected AbstractStructImport runnable() {
AbstractStructImport runnable = importRunnable();
runnable.setWriterOptions(writerArgs.writerOptions());
return runnable;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import picocli.CommandLine.Option;

@Command(name = "ping", description = "Test connectivity to a Redis server.")
public class PingCommand extends AbstractSubCommand {
public class PingCommand extends AbstractJobCommand {

@Option(names = "--iterations", description = "Number of test iterations. Use a negative value to test endlessly. (default: ${DEFAULT-VALUE}).", paramLabel = "<count>")
int iterations = Ping.DEFAULT_ITERATIONS;
Expand Down

0 comments on commit b87d16c

Please sign in to comment.