Skip to content
This repository has been archived by the owner on Apr 5, 2022. It is now read-only.

Adding SQL support for JDBC incremental sink to HDFS #1894

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.batch.integration.x;

import java.util.HashMap;
Expand Down Expand Up @@ -56,6 +57,8 @@ public class IncrementalColumnRangePartitioner implements Partitioner, StepExecu

private String table;

private String sql;

private String column;

private int partitions;
Expand Down Expand Up @@ -120,6 +123,10 @@ public void setOverrideValue(Long overrideValue) {
this.overrideValue = overrideValue;
}

/**
 * Sets the SQL select statement to use as the data source when no table name is
 * configured. The partitioner wraps this query as a derived table (subquery) to
 * compute the incremental max of the check column and the min/max partition
 * bounds of the partition column.
 *
 * @param sql the source select statement (mutually exclusive with {@code table};
 *            {@code table} takes precedence when both are set)
 */
public void setSql(String sql) {
this.sql = sql;
}

/**
* Partition a database table assuming that the data in the column specified
* are uniformly distributed. The execution context values will have keys
Expand All @@ -133,14 +140,14 @@ public Map<String, ExecutionContext> partition(int gridSize) {
StringBuilder incrementalClause = new StringBuilder();
Map<String, ExecutionContext> result = new HashMap<>();

if(!StringUtils.hasText(checkColumn) && !StringUtils.hasText(column)) {
if (!StringUtils.hasText(checkColumn) && !StringUtils.hasText(column)) {
ExecutionContext value = new ExecutionContext();
value.put("partClause", "");
result.put("partition0", value);
value.put("partSuffix", "");
}
else {
if(StringUtils.hasText(checkColumn)) {
if (StringUtils.hasText(checkColumn)) {
incrementalClause.append(checkColumn).append(" > ").append(this.incrementalMin);
}

Expand All @@ -158,14 +165,15 @@ public Map<String, ExecutionContext> partition(int gridSize) {
end = this.partitionMax;
}

if(StringUtils.hasText(checkColumn)) {
value.putString("partClause", String.format("WHERE (%s BETWEEN %s AND %s) AND %s", column, start, end, incrementalClause.toString()));
if (StringUtils.hasText(checkColumn)) {
value.putString("partClause", String.format("WHERE (%s BETWEEN %s AND %s) AND %s", column, start,
end, incrementalClause.toString()));
}
else {
value.putString("partClause", String.format("WHERE (%s BETWEEN %s AND %s)", column, start, end));
}

value.putString("partSuffix", "-p"+number);
value.putString("partSuffix", "-p" + number);
start += targetSize;
end += targetSize;
number++;
Expand All @@ -179,9 +187,9 @@ public Map<String, ExecutionContext> partition(int gridSize) {

@Override
public void beforeStep(StepExecution stepExecution) {
if(StringUtils.hasText(checkColumn)) {
if (StringUtils.hasText(checkColumn)) {

if(overrideValue != null && overrideValue >= 0) {
if (overrideValue != null && overrideValue >= 0) {
this.incrementalMin = overrideValue;
}
else {
Expand All @@ -190,20 +198,20 @@ public void beforeStep(StepExecution stepExecution) {
// Get the last jobInstance...not the current one
List<JobInstance> jobInstances = jobExplorer.getJobInstances(jobName, 1, 1);

if(jobInstances.size() > 0) {
if (jobInstances.size() > 0) {
JobInstance lastInstance = jobInstances.get(jobInstances.size() - 1);

List<JobExecution> executions = jobExplorer.getJobExecutions(lastInstance);

JobExecution lastExecution = executions.get(0);

for (JobExecution execution : executions) {
if(lastExecution.getEndTime().getTime() < execution.getEndTime().getTime()) {
if (lastExecution.getEndTime().getTime() < execution.getEndTime().getTime()) {
lastExecution = execution;
}
}

if(lastExecution.getExecutionContext().containsKey(BATCH_INCREMENTAL_MAX_ID)) {
if (lastExecution.getExecutionContext().containsKey(BATCH_INCREMENTAL_MAX_ID)) {
this.incrementalMin = lastExecution.getExecutionContext().getLong(BATCH_INCREMENTAL_MAX_ID);
}
else {
Expand All @@ -215,15 +223,24 @@ public void beforeStep(StepExecution stepExecution) {
}
}

long newMin = jdbcTemplate.queryForObject(String.format("select max(%s) from %s", checkColumn, table), Integer.class);

stepExecution.getExecutionContext().put(BATCH_INCREMENTAL_MAX_ID, newMin);
if (StringUtils.hasText(table)) {
long newMin = jdbcTemplate.queryForObject(String.format("select max(%s) from %s", checkColumn, table),
Integer.class);
stepExecution.getExecutionContext().put(BATCH_INCREMENTAL_MAX_ID, newMin);
}
else if (StringUtils.hasText(sql)) {
String maxSqlStr = "SELECT max(" + checkColumn + ") from (" + sql + ") as boundQry";
long newMin = jdbcTemplate.queryForObject(maxSqlStr, Long.class);
stepExecution.getExecutionContext().put(BATCH_INCREMENTAL_MAX_ID, newMin);
}
}

if(StringUtils.hasText(column) && StringUtils.hasText(table)) {
if(StringUtils.hasText(checkColumn)) {
Long minResult = jdbcTemplate.queryForObject("SELECT MIN(" + column + ") from " + table + " where " + checkColumn + " > " + this.incrementalMin, Long.class);
Long maxResult = jdbcTemplate.queryForObject("SELECT MAX(" + column + ") from " + table + " where " + checkColumn + " > " + this.incrementalMin, Long.class);
if (StringUtils.hasText(column) && StringUtils.hasText(table)) {
if (StringUtils.hasText(checkColumn)) {
Long minResult = jdbcTemplate.queryForObject("SELECT MIN(" + column + ") from " + table + " where "
+ checkColumn + " > " + this.incrementalMin, Long.class);
Long maxResult = jdbcTemplate.queryForObject("SELECT MAX(" + column + ") from " + table + " where "
+ checkColumn + " > " + this.incrementalMin, Long.class);
this.partitionMin = minResult != null ? minResult : Long.MIN_VALUE;
this.partitionMax = maxResult != null ? maxResult : Long.MAX_VALUE;
}
Expand All @@ -234,6 +251,16 @@ public void beforeStep(StepExecution stepExecution) {
this.partitionMax = maxResult != null ? maxResult : Long.MAX_VALUE;
}
}
else if (StringUtils.hasText(sql)) {
String maxSqlStr = "SELECT MIN(" + column + ") from (" + sql + ") as boundQry where " + checkColumn + " > "
+ this.incrementalMin;
String minSqlStr = "SELECT MAX(" + column + ") from (" + sql + ") as boundQry where " + checkColumn + " > "
+ this.incrementalMin;
Long minResult = jdbcTemplate.queryForObject(maxSqlStr, Long.class);
Long maxResult = jdbcTemplate.queryForObject(minSqlStr, Long.class);
this.partitionMin = minResult != null ? minResult : Long.MIN_VALUE;
this.partitionMax = maxResult != null ? maxResult : Long.MAX_VALUE;
}
}

@Override
Expand All @@ -243,7 +270,7 @@ public ExitStatus afterStep(StepExecution stepExecution) {

@Override
public void afterPropertiesSet() throws Exception {
if(!StringUtils.hasText(this.column)) {
if (!StringUtils.hasText(this.column)) {
this.column = this.checkColumn;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;

import javax.sql.DataSource;
Expand All @@ -29,9 +34,16 @@
import org.junit.runner.RunWith;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.explore.support.MapJobExplorerFactoryBean;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
Expand Down Expand Up @@ -106,4 +118,106 @@ public void testFivePartitions() {
assertEquals("-p4", partitions.get("partition4").get("partSuffix"));
}


@Test
public void testTwoPartitionsForSql() throws Throwable {
    // Seed four rows so the sql-based partitioner has the range 1..4 to split in two.
    jdbc.execute("insert into bar (foo) values (1), (2), (3), (4)");
    partitioner.setCheckColumn("foo");
    partitioner.setColumn("foo");
    partitioner.setSql("select * from bar");
    partitioner.setPartitions(2);

    // In-memory job repository + explorer so beforeStep() can look up previous runs.
    MapJobRepositoryFactoryBean repositoryFactory = new MapJobRepositoryFactoryBean();
    repositoryFactory.afterPropertiesSet();
    JobRepository jobRepository = repositoryFactory.getObject();
    SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
    jobLauncher.setJobRepository(jobRepository);
    jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
    jobLauncher.afterPropertiesSet();
    JobExplorer jobExplorer = new MapJobExplorerFactoryBean(repositoryFactory).getObject();
    partitioner.setJobExplorer(jobExplorer);

    // Minimal job/step scaffolding for the StepExecutionListener callback.
    JobExecution jobExec = new JobExecution(5l);
    jobExec.setJobInstance(new JobInstance(1l, "testIncrementalJDBCSqlJob"));
    List<StepExecution> stepExecutions = new ArrayList<StepExecution>();
    stepExecutions.add(new StepExecution("step1", jobExec));
    jobExec.addStepExecutions(stepExecutions);

    partitioner.beforeStep(new StepExecution("step1", jobExec));
    Map<String, ExecutionContext> partitions = partitioner.partition(1);

    // With no prior execution the incremental minimum is Long.MIN_VALUE,
    // and the 1..4 range splits into two equal partitions.
    assertEquals(2, partitions.size());
    assertTrue(partitions.containsKey("partition0"));
    assertEquals("WHERE (foo BETWEEN 1 AND 2) AND foo > " + Long.MIN_VALUE,
            (String) partitions.get("partition0").get("partClause"));
    assertEquals("-p0", partitions.get("partition0").get("partSuffix"));
    assertTrue(partitions.containsKey("partition1"));
    assertEquals("WHERE (foo BETWEEN 3 AND 4) AND foo > " + Long.MIN_VALUE,
            (String) partitions.get("partition1").get("partClause"));
    assertEquals("-p1", partitions.get("partition1").get("partSuffix"));
}


/**
 * Verifies that successive runs of the sql-based incremental partitioner carry the
 * previous run's max id forward: run 1 starts from Long.MIN_VALUE (no history),
 * run 2 starts from the max recorded by run 1 (4), and run 3 starts from the max
 * recorded by run 2 (8). The JobExplorer is mocked to simulate the job history.
 */
@Test
public void testIncrementalSqlNextIterationValue() throws Throwable {
    jdbc.execute("insert into bar (foo) values (1), (2), (3), (4)");
    partitioner.setCheckColumn("foo");
    partitioner.setColumn("foo");
    partitioner.setSql("select * from bar");
    partitioner.setPartitions(2);
    JobExplorer jobExplorer = mock(JobExplorer.class);
    partitioner.setJobExplorer(jobExplorer);
    JobExecution jobExec = new JobExecution(1l);
    JobInstance jobInstance1 = new JobInstance(1l, "testIncrementalJDBCSqlJob");
    jobExec.setJobInstance(jobInstance1);
    StepExecution stepExecution = new StepExecution("step1", jobExec);

    // No previous instances yet: history lookup returns an empty list.
    when(jobExplorer.getJobInstances("testIncrementalJDBCSqlJob", 1, 1)).thenReturn(new ArrayList<JobInstance>());
    partitioner.beforeStep(stepExecution);
    //first time the value is long minimum as there is no previous instance
    Map<String, ExecutionContext> partitions = partitioner.partition(1);
    String queryPartClause = (String) partitions.get("partition0").get("partClause");
    assertTrue(queryPartClause.endsWith(Long.MIN_VALUE + ""));
    //mark end of job and adjust the max
    jobExec.setEndTime(new Date(System.currentTimeMillis()));
    jobExec.getExecutionContext().put("batch.incremental.maxId", 4l);

    // New data arrives between runs.
    jdbc.execute("insert into bar (foo) values (5), (6), (7), (8)");

    // Run 2: history now contains the finished first execution (maxId = 4).
    List<JobInstance> jobInstances = new ArrayList<JobInstance>();
    jobInstances.add(jobInstance1);
    JobInstance jobInstance2 = new JobInstance(2l, "testIncrementalJDBCSqlJob");
    jobExec.setJobInstance(jobInstance2);
    jobInstances.add(jobInstance2);
    when(jobExplorer.getJobInstances("testIncrementalJDBCSqlJob", 1, 1)).thenReturn(jobInstances);
    List<JobExecution> executions = new ArrayList<JobExecution>();
    executions.add(jobExec);
    when(jobExplorer.getJobExecutions(jobInstance2)).thenReturn(executions);
    partitioner.beforeStep(new StepExecution("step1", jobExec));
    //this time the value should be 4
    partitions = partitioner.partition(1);
    queryPartClause = (String) partitions.get("partition0").get("partClause");
    assertTrue(queryPartClause.endsWith(4l + ""));
    //mark end of job and adjust the max
    jobExec.setEndTime(new Date(System.currentTimeMillis()));
    jobExec.getExecutionContext().put("batch.incremental.maxId", 8l);

    // More data before the third run (7 and 8 are intentional duplicates).
    jdbc.execute("insert into bar (foo) values (9), (10), (7), (8)");

    // Run 3: history contains the second finished execution (maxId = 8).
    JobInstance jobInstance3 = new JobInstance(3l, "testIncrementalJDBCSqlJob");
    jobExec.setJobInstance(jobInstance3);
    jobInstances.add(jobInstance3);
    when(jobExplorer.getJobInstances("testIncrementalJDBCSqlJob", 1, 1)).thenReturn(jobInstances);
    executions.add(jobExec);
    when(jobExplorer.getJobExecutions(jobInstance3)).thenReturn(executions);
    partitioner.beforeStep(new StepExecution("step1", jobExec));
    //this time the value should be 8
    partitions = partitioner.partition(1);
    queryPartClause = (String) partitions.get("partition0").get("partClause");
    assertTrue(queryPartClause.endsWith(8l + ""));
    //mark end of job and adjust the max
    jobExec.setEndTime(new Date(System.currentTimeMillis()));
    jobExec.getExecutionContext().put("batch.incremental.maxId", 8l);
}
}
1 change: 1 addition & 0 deletions modules/job/jdbchdfs/config/jdbchdfs.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
<bean id="partitioner" class="org.springframework.batch.integration.x.IncrementalColumnRangePartitioner" scope="step">
<property name="dataSource" ref="moduleDataSource"/>
<property name="table" value="${tableName}"/>
<property name="sql" value="${sql}" />
<property name="column" value="${partitionColumn}"/>
<property name="partitions" value="${partitions}"/>
<property name="jobExplorer" ref="jobExplorer"/>
Expand Down