Skip to content

Commit

Permalink
Some refactoring of the statement batcher.
Browse files Browse the repository at this point in the history
  • Loading branch information
ePaul committed Jul 28, 2023
1 parent 494af9c commit c6941de
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,26 @@
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toList;
import static org.springframework.jdbc.core.namedparam.SqlParameterSource.TYPE_UNKNOWN;
import static org.zalando.fahrschein.Preconditions.checkArgument;

/**
* A helper class to simulate query batching for SQL statements which return data,
* i.e. SELECT or anything with a RETURNING clause.
* Inspired by https://javaranch.com/journal/200510/batching.html.
* Inspired by <a href="https://javaranch.com/journal/200510/batching.html">Batching Select Statements in JDBC</a>
* (by Jeanne Boyarski).
* <p>
* The idea here is to prepare prepared statements returning result sets of a common row type,
* for several input batch sizes (e.g. 51, 11, 4, 1), and then split our actual input into these
Expand All @@ -37,69 +40,113 @@
*/
public class QueryStatementBatcher<T> {

final private String templatePrefix;
final private String templateSuffix;
final private String templateRepeated;
final private String templateSeparator;
final private String templatePlaceholder;
public static final String DEFAULT_TEMPLATE_PLACEHOLDER = "#";
public static final String DEFAULT_TEMPLATE_SEPARATOR = ", ";

final private RowMapper<T> resultRowMapper;
private static final int[] DEFAULT_TEMPLATE_SIZES = {51, 13, 4, 1};

final private RowMapper<T> resultRowMapper;
final List<SubTemplate> subTemplates;

public QueryStatementBatcher(String templatePrefix, String templateRepeated, String templateSeparator, String templateSuffix, RowMapper<T> resultMapper) {
this(templatePrefix, templateRepeated, "#", templateSeparator, templateSuffix, resultMapper);
/**
* Sets up a QueryStatementBatcher for a specific set of statements, composed from prefix, repeated part, (default separator) and suffix.
* Sizes will be determined
* @param templatePrefix A prefix which will be prepended to the repeated part of the query. It can contain
* parameter placeholders as usual for NamedParameterJdbcTemplate.
* @param templateRepeated The part of the query string which will be repeated according to the number of parameter sets.
* The parameter placeholders in here should contain {@code "#"}.
* Occurrences of this in the generated queries will
* be separated by the default operator (a comma).
* @param templateSuffix A suffix which will be used after the repeated part. It can contain
* parameter placeholders as usual for NamedParameterJdbcTemplate.
* @param resultMapper A mapper which will be used to map the results of the queries (JDBC ResultSets) to whatever
* output format is desired.
*/
public QueryStatementBatcher(String templatePrefix, String templateRepeated, String templateSuffix, RowMapper<T> resultMapper) {
this(templatePrefix, templateRepeated, DEFAULT_TEMPLATE_PLACEHOLDER, DEFAULT_TEMPLATE_SEPARATOR, templateSuffix, resultMapper, DEFAULT_TEMPLATE_SIZES);
}

QueryStatementBatcher(String templatePrefix, String templateRepeated, String templatePlaceholder, String templateSeparator, String templateSuffix, RowMapper<T> resultMapper) {
this(templatePrefix, templateRepeated, templatePlaceholder, templateSeparator, templateSuffix, resultMapper, 51, 13, 4, 1);
this(templatePrefix, templateRepeated, templatePlaceholder, templateSeparator, templateSuffix, resultMapper, DEFAULT_TEMPLATE_SIZES);
}

QueryStatementBatcher(String templatePrefix, String templateRepeated, String templateSeparator, String templateSuffix, RowMapper<T> resultMapper,
QueryStatementBatcher(String templatePrefix, String templateRepeated, String templateSuffix, RowMapper<T> resultMapper,
int... templateSizes) {
this(templatePrefix, templateRepeated, "#", templateSeparator, templateSuffix, resultMapper, templateSizes);
this(templatePrefix, templateRepeated, DEFAULT_TEMPLATE_PLACEHOLDER, DEFAULT_TEMPLATE_SEPARATOR, templateSuffix, resultMapper, templateSizes);
}

/**
* @param templatePrefix
* @param templateRepeated
* @param templatePlaceholder
* @param templateSeparator
* @param templateSuffix
* @param templateSizes An descending ordered sequence of integers. Last one needs to be 1.
* Sets up a QueryStatementBatcher for a specific set of statements composed from prefix, repeated part, separator and suffix.
* @param templatePrefix A prefix which will be prepended to the repeated part of the query. It can contain
* parameter placeholders as usual for NamedParameterJdbcTemplate.
* @param templateRepeated The part of the query string which will be repeated according to the number of parameter sets.
* The parameter placeholders in here (if they vary between parameter sets) should contain the
* templatePlaceholder.
* @param templatePlaceholder This placeholder is to be used as part of the parameter names in the repeated templates.
* @param templateSeparator This separator will be used between the repeated parts of the query.
* @param templateSuffix A suffix which will be used after the repeated part. It can contain
* parameter placeholders as usual for NamedParameterJdbcTemplate.
* @param resultMapper A mapper which will be used to map the results of the queries (JDBC ResultSets) to whatever
* output format is desired.
* @param templateSizes A sequence of integers. Smallest one needs to be 1.
* This indicates the sizes (number of parameter sets used) to be used for the individual queries.
*/
QueryStatementBatcher(String templatePrefix, String templateRepeated, String templatePlaceholder,
String templateSeparator, String templateSuffix, RowMapper<T> resultMapper,
int... templateSizes) {
this.templatePrefix = templatePrefix;
this.templateSuffix = templateSuffix;
this.templateRepeated = templateRepeated;
this.templateSeparator = templateSeparator;
this.templatePlaceholder = templatePlaceholder;
this.resultRowMapper = resultMapper;

sortDescending(templateSizes);
checkArgument(templateSizes[templateSizes.length-1] == 1,
"smallest template size is not 1!");
this.subTemplates = IntStream.of(templateSizes)
.mapToObj(size -> new SubTemplate(size, composeTemplate(size), templatePlaceholder))
.collect(Collectors.toList());
.mapToObj(size -> new SubTemplate(
size,
composeTemplate(size, templatePrefix, templateRepeated, templatePlaceholder,
templateSeparator, templateSuffix),
templatePlaceholder))
.collect(toList());
}

String composeTemplate(int valueCount) {
static String composeTemplate(int valueCount, String prefix, String repeated, String placeholder, String separator, String suffix) {
return IntStream.range(0, valueCount)
.mapToObj(i -> templateRepeated.replace(templatePlaceholder, String.valueOf(i)))
.collect(joining(templateSeparator, templatePrefix, templateSuffix));
.mapToObj(i -> repeated.replace(placeholder, String.valueOf(i)))
.collect(joining(separator, prefix, suffix));
}

public Stream<T> queryForStream(NamedParameterJdbcTemplate template,
/**
* Queries the database for a set of parameter sources, in an optimized way.
* This version should be used if there are no parameters in the non-repeated part
* of the query tempate.
* @param database the DB connection in form of a spring NamedParameterJdbcTemplate.
* @param repeatedInputs A stream of repeated inputs. The names of the parameters here
* should contain the placeholder (by default "#").
* @return A stream of results, one for each parameter source in the repeated input.
*/
public Stream<T> queryForStream(NamedParameterJdbcTemplate database,
Stream<MapSqlParameterSource> repeatedInputs) {
return queryForStream(template, new MapSqlParameterSource(), repeatedInputs);
return queryForStream(database, new MapSqlParameterSource(), repeatedInputs);
}

public Stream<T> queryForStream(NamedParameterJdbcTemplate template,
/**
* Queries the database for a set of parameter sources, in an optimized way.
* This version should be used if there are parameters in the non-repeated part
* of the template.
* @param database the DB connection in form of a spring NamedParameterJdbcTemplate.
* @param commonArguments a parameter source for any template parameters in the
* non-repeated part of the query (or parameters in the
* repeated part which don't change between input).
* @param repeatedInputs A stream of repeated inputs. The names of the parameters here
* * should contain the placeholder (by default "#").
* @return A stream of results, one for each parameter source in the repeated input.
*/
public Stream<T> queryForStream(NamedParameterJdbcTemplate database,
MapSqlParameterSource commonArguments,
Stream<MapSqlParameterSource> repeatedInputs) {
return queryForStreamRecursive(template, commonArguments, repeatedInputs, 0);
return queryForStreamRecursive(database, commonArguments, repeatedInputs, 0);
}

private Stream<T> queryForStreamRecursive(NamedParameterJdbcTemplate template,
private Stream<T> queryForStreamRecursive(NamedParameterJdbcTemplate database,
MapSqlParameterSource commonArguments,
Stream<MapSqlParameterSource> repeatedInputs,
int subTemplateIndex) {
Expand All @@ -108,13 +155,16 @@ private Stream<T> queryForStreamRecursive(NamedParameterJdbcTemplate template,
Stream<List<MapSqlParameterSource>> chunkedStream = chunkStream(repeatedInputs, firstSubTemplate.inputCount);
return chunkedStream.flatMap(chunk -> {
if (chunk.size() == firstSubTemplate.inputCount) {
return firstSubTemplate.queryForStream(template, commonArguments, chunk, resultRowMapper);
return firstSubTemplate.queryForStream(database, commonArguments, chunk, resultRowMapper);
} else {
return queryForStreamRecursive(template, commonArguments, chunk.stream(), subTemplateIndex + 1);
return queryForStreamRecursive(database, commonArguments, chunk.stream(), subTemplateIndex + 1);
}
});
}

/**
* This nested class handles a single "batch size".
*/
static class SubTemplate {
final int inputCount;
final String expandedTemplate;
Expand All @@ -126,13 +176,13 @@ private SubTemplate(int inputCount, String expandedTemplate, String namePlacehol
this.namePlaceholder = namePlaceholder;
}

<T> Stream<T> queryForStream(NamedParameterJdbcTemplate template,
<T> Stream<T> queryForStream(NamedParameterJdbcTemplate database,
MapSqlParameterSource commonArguments,
List<MapSqlParameterSource> repeatedInputs,
List<? extends MapSqlParameterSource> repeatedInputs,
RowMapper<T> mapper) {
if (repeatedInputs.size() != inputCount) {
throw new IllegalArgumentException(String.format("input size = %s != %s = inputCount", repeatedInputs.size(), inputCount));
}
checkArgument(repeatedInputs.size() == inputCount,
"input size = %s != %s = inputCount", repeatedInputs.size(), inputCount);

MapSqlParameterSource params = new MapSqlParameterSource();
Stream.of(commonArguments.getParameterNames())
.forEach(name -> copyTypeAndValue(commonArguments, name, params, name));
Expand All @@ -145,10 +195,11 @@ <T> Stream<T> queryForStream(NamedParameterJdbcTemplate template,
params, name.replace(namePlaceholder, textIndex)));
});

return template.queryForStream(expandedTemplate, params, mapper);
return database.queryForStream(expandedTemplate, params, mapper);
}

private void copyTypeAndValue(MapSqlParameterSource source, String sourceName, MapSqlParameterSource target, String targetName) {
private static void copyTypeAndValue(MapSqlParameterSource source, String sourceName,
MapSqlParameterSource target, String targetName) {
target.addValue(targetName, source.getValue(sourceName));
int type = source.getSqlType(sourceName);
if (type != TYPE_UNKNOWN) {
Expand All @@ -175,11 +226,13 @@ public String toString() {
* This is a terminal operation on {@code input} (it's spliterator is requested), but its elements are
* only accessed when the return stream is processed.
*
* @param input
* @param input a stream of elements to be chunked.
* @param chunkSize the size of each chunk.
* @param <T> the type of elements in input.
* @return a new stream of lists. The returned lists can be modified, but that
* doesn't have any impact on the source of input.
* doesn't have any impact on the source of input.
* The stream is non-null, and preserves the ordered/immutable/concurrent/distinct
* properties of the input stream.
*/
static <T> Stream<List<T>> chunkStream(Stream<T> input, int chunkSize) {
// inspired by https://stackoverflow.com/a/59164175/600500
Expand Down Expand Up @@ -214,4 +267,20 @@ public List<T> next() {
}
}, characteristics), false);
}

private static void sortDescending(int[] templateSizes) {
// there is no Arrays.sort with comparator (or with flag to tell "descending"), so we sort it normally and then reverse it.
Arrays.sort(templateSizes);
reverse(templateSizes);
}

private static void reverse(int[] array) {
// https://stackoverflow.com/a/3523066/600500
for(int left = 0, right = array.length -1; left < right; left++, right --) {
int temp = array[left];
array[left] = array[right];
array[right] = temp;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public void dropTable() {
@Test
public void testStreamEvents() {
QueryStatementBatcher<Integer> batcher = new QueryStatementBatcher<>(
"INSERT INTO x (a, b) VALUES ", "(:a#, :b#)", ", ", " RETURNING id",
"INSERT INTO x (a, b) VALUES ", "(:a#, :b#)", " RETURNING id",
(row, n) -> row.getInt("id"),
51, 13, 4, 1);
MapSqlParameterSource commonArguments = new MapSqlParameterSource();
Expand All @@ -44,7 +44,7 @@ public void testStreamEvents() {


List<Integer> resultList = batcher.queryForStream(
jdbcTemplate, commonArguments, repeatedInputs.stream())
jdbcTemplate, repeatedInputs.stream())
.collect(Collectors.toList());
assertThat(resultList, hasSize(expectedCount));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,33 +10,37 @@ public class QueryStatementBatcherTest
{
@Test
public void testComposeTemplateInsert() {
QueryStatementBatcher<Integer> batcher = new QueryStatementBatcher<>(
"INSERT INTO x (a, b) VALUES ",
"(:a#, :b#)",
", ",
" RETURNING id",
(row, n) -> row.getInt("id")
);

assertThat(batcher.composeTemplate(1), is("INSERT INTO x (a, b) VALUES (:a0, :b0) RETURNING id"));
assertThat(batcher.composeTemplate(2), is("INSERT INTO x (a, b) VALUES (:a0, :b0), (:a1, :b1) RETURNING id"));

String prefix = "INSERT INTO x (a, b) VALUES ";
String repeated = "(:a#, :b#)";
String placeholder = "#";
String separator = ", ";
String suffix = " RETURNING id";

assertThat(QueryStatementBatcher.composeTemplate(1, prefix, repeated, placeholder, separator, suffix),
is("INSERT INTO x (a, b) VALUES (:a0, :b0) RETURNING id"));
assertThat(QueryStatementBatcher.composeTemplate(2, prefix, repeated, placeholder, separator, suffix),
is("INSERT INTO x (a, b) VALUES (:a0, :b0), (:a1, :b1) RETURNING id"));
}

@Test
public void testComposeTemplateSelectWhere() {
QueryStatementBatcher<Void> batcher = new QueryStatementBatcher<>(
"SELECT a, b FROM x WHERE id IN (", ":id#", ", ", ")",
(row, n) -> null
);
String prefix = "SELECT a, b FROM x WHERE id IN (";
String repeated = ":id#";
String separator = ", ";
String placeholder = "#";
String suffix = ")";

assertThat(batcher.composeTemplate(1), is("SELECT a, b FROM x WHERE id IN (:id0)"));
assertThat(batcher.composeTemplate(2), is("SELECT a, b FROM x WHERE id IN (:id0, :id1)"));
assertThat(QueryStatementBatcher.composeTemplate(1, prefix, repeated, placeholder, separator, suffix),
is("SELECT a, b FROM x WHERE id IN (:id0)"));
assertThat(QueryStatementBatcher.composeTemplate(2, prefix, repeated, placeholder, separator, suffix),
is("SELECT a, b FROM x WHERE id IN (:id0, :id1)"));
}

@Test
public void testCreateSubTemplates() {
QueryStatementBatcher<Void> batcher = new QueryStatementBatcher<>(
"SELECT a, b FROM x WHERE id IN (", ":id#", ", ", ")",
"SELECT a, b FROM x WHERE id IN (", ":id#", ")",
(row, n) -> null,
21, 6, 1);
assertThat(batcher.subTemplates, hasSize(3));
Expand Down

0 comments on commit c6941de

Please sign in to comment.