Skip to content

Commit

Permalink
refactor: Added file reader and writer registries
Browse files Browse the repository at this point in the history
  • Loading branch information
jruaux committed Nov 21, 2024
1 parent 13f2bc4 commit 15e7613
Show file tree
Hide file tree
Showing 54 changed files with 1,469 additions and 945 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,26 @@ public abstract class AbstractCallableCommand extends BaseCommand implements Cal

@Override
public Integer call() throws Exception {
initialize();
try {
execute();
} finally {
teardown();
}
return 0;
}

protected void initialize() throws RiotInitializationException {
if (log == null) {
log = LoggerFactory.getLogger(getClass());
}
execute();
return 0;
}

protected abstract void execute() throws Exception;
protected abstract void execute() throws RiotExecutionException;

protected void teardown() {
// do nothing
}

public Logger getLog() {
return log;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,32 @@ public abstract class AbstractJobCommand extends AbstractCallableCommand {
private PlatformTransactionManager transactionManager;
private JobLauncher jobLauncher;

private TaskExecutorJobLauncher taskExecutorJobLauncher() throws Exception {
@Override
protected void initialize() throws RiotInitializationException {
super.initialize();
if (jobName == null) {
jobName = jobName();
}
if (jobRepository == null) {
try {
jobRepository = JobUtils.jobRepositoryFactoryBean(jobRepositoryName).getObject();
} catch (Exception e) {
throw new RiotInitializationException("Could not create job repository", e);
}
}
if (transactionManager == null) {
transactionManager = JobUtils.resourcelessTransactionManager();
}
if (jobLauncher == null) {
try {
jobLauncher = jobLauncher();
} catch (Exception e) {
throw new RiotInitializationException("Could not create job launcher", e);
}
}
}

private JobLauncher jobLauncher() throws Exception {
TaskExecutorJobLauncher launcher = new TaskExecutorJobLauncher();
launcher.setJobRepository(jobRepository);
launcher.setTaskExecutor(new SyncTaskExecutor());
Expand All @@ -82,26 +107,20 @@ private JobBuilder jobBuilder() {
}

@Override
protected void execute() throws Exception {
if (jobName == null) {
jobName = jobName();
}
if (jobRepository == null) {
jobRepository = JobUtils.jobRepositoryFactoryBean(jobRepositoryName).getObject();
}
if (transactionManager == null) {
transactionManager = JobUtils.resourcelessTransactionManager();
}
if (jobLauncher == null) {
jobLauncher = taskExecutorJobLauncher();
protected void execute() throws RiotExecutionException {
Job job = job();
JobExecution jobExecution;
try {
jobExecution = jobLauncher.run(job, new JobParameters());
} catch (JobExecutionException e) {
throw new RiotExecutionException("Could not run job " + job.getName(), e);
}
JobExecution jobExecution = jobLauncher.run(job(), new JobParameters());
if (JobUtils.isFailed(jobExecution.getExitStatus())) {
for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
ExitStatus stepExitStatus = stepExecution.getExitStatus();
if (JobUtils.isFailed(stepExitStatus)) {
if (CollectionUtils.isEmpty(stepExecution.getFailureExceptions())) {
throw new JobExecutionException(stepExitStatus.getExitDescription());
throw new RiotExecutionException(stepExitStatus.getExitDescription());
}
throw wrapException(stepExecution.getFailureExceptions());
}
Expand All @@ -117,11 +136,11 @@ private String jobName() {
return commandSpec.name();
}

private JobExecutionException wrapException(List<Throwable> throwables) {
private RiotExecutionException wrapException(List<Throwable> throwables) {
if (throwables.isEmpty()) {
return new JobExecutionException("Job failed");
return new RiotExecutionException("Job failed");
}
return new JobExecutionException("Job failed", throwables.get(0));
return new RiotExecutionException("Job failed", throwables.get(0));
}

protected Job job(Step<?, ?>... steps) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.redis.riot.core;

@SuppressWarnings("serial")
public class RiotExecutionException extends Exception {

public RiotExecutionException(String message, Throwable cause) {
super(message, cause);
}

public RiotExecutionException(String message) {
super(message);
}

public RiotExecutionException(Throwable cause) {
super(cause);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.redis.riot.core;

@SuppressWarnings("serial")
public class RiotInitializationException extends Exception {

public RiotInitializationException(String message, Throwable cause) {
super(message, cause);
}

public RiotInitializationException(String message) {
super(message);
}

public RiotInitializationException(Throwable cause) {
super(cause);
}

}

This file was deleted.

Loading

0 comments on commit 15e7613

Please sign in to comment.