Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend JdbcSplit with additional predicate field #71

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,8 @@ public ConnectorSplitSource getSplits(JdbcTableLayoutHandle layoutHandle)
tableHandle.getCatalogName(),
tableHandle.getSchemaName(),
tableHandle.getTableName(),
layoutHandle.getTupleDomain());
layoutHandle.getTupleDomain(),
Optional.empty());
return new FixedSplitSource(ImmutableList.of(jdbcSplit));
}

Expand Down Expand Up @@ -258,7 +259,8 @@ public PreparedStatement buildSql(Connection connection, JdbcSplit split, List<J
split.getSchemaName(),
split.getTableName(),
columnHandles,
split.getTupleDomain());
split.getTupleDomain(),
split.getAdditionalPredicate());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import javax.annotation.Nullable;

import java.util.List;
import java.util.Optional;

import static java.util.Objects.requireNonNull;

Expand All @@ -35,20 +36,23 @@ public class JdbcSplit
private final String schemaName;
private final String tableName;
private final TupleDomain<ColumnHandle> tupleDomain;
private final Optional<String> additionalPredicate;

@JsonCreator
public JdbcSplit(
@JsonProperty("connectorId") String connectorId,
@JsonProperty("catalogName") @Nullable String catalogName,
@JsonProperty("schemaName") @Nullable String schemaName,
@JsonProperty("tableName") String tableName,
@JsonProperty("tupleDomain") TupleDomain<ColumnHandle> tupleDomain)
@JsonProperty("tupleDomain") TupleDomain<ColumnHandle> tupleDomain,
@JsonProperty("additionalProperty") Optional<String> additionalPredicate)
{
this.connectorId = requireNonNull(connectorId, "connector id is null");
this.catalogName = catalogName;
this.schemaName = schemaName;
this.tableName = requireNonNull(tableName, "table name is null");
this.tupleDomain = requireNonNull(tupleDomain, "tupleDomain is null");
this.additionalPredicate = requireNonNull(additionalPredicate, "additionalPredicate is null");
}

@JsonProperty
Expand Down Expand Up @@ -83,6 +87,12 @@ public TupleDomain<ColumnHandle> getTupleDomain()
return tupleDomain;
}

@JsonProperty
public Optional<String> getAdditionalPredicate()
{
return additionalPredicate;
}

@Override
public boolean isRemotelyAccessible()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
Expand Down Expand Up @@ -90,7 +91,15 @@ public QueryBuilder(String quote)
this.quote = requireNonNull(quote, "quote is null");
}

public PreparedStatement buildSql(JdbcClient client, Connection connection, String catalog, String schema, String table, List<JdbcColumnHandle> columns, TupleDomain<ColumnHandle> tupleDomain)
public PreparedStatement buildSql(
JdbcClient client,
Connection connection,
String catalog,
String schema,
String table,
List<JdbcColumnHandle> columns,
TupleDomain<ColumnHandle> tupleDomain,
Optional<String> additionalPredicate)
throws SQLException
{
StringBuilder sql = new StringBuilder();
Expand Down Expand Up @@ -118,6 +127,12 @@ public PreparedStatement buildSql(JdbcClient client, Connection connection, Stri
List<TypeAndValue> accumulator = new ArrayList<>();

List<String> clauses = toConjuncts(columns, tupleDomain, accumulator);
if (additionalPredicate.isPresent()) {
clauses = ImmutableList.<String>builder()
.addAll(clauses)
.add(additionalPredicate.get())
.build();
}
if (!clauses.isEmpty()) {
sql.append(" WHERE ")
.append(Joiner.on(" AND ").join(clauses));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.time.LocalDateTime;
import java.util.List;
import java.util.Locale;
import java.util.Optional;

import static io.airlift.slice.Slices.utf8Slice;
import static io.airlift.testing.Assertions.assertContains;
Expand Down Expand Up @@ -196,7 +197,7 @@ public void testNormalBuildSql()
.build());

Connection connection = database.getConnection();
try (PreparedStatement preparedStatement = new QueryBuilder("\"").buildSql(jdbcClient, connection, "", "", "test_table", columns, tupleDomain);
try (PreparedStatement preparedStatement = new QueryBuilder("\"").buildSql(jdbcClient, connection, "", "", "test_table", columns, tupleDomain, Optional.empty());
ResultSet resultSet = preparedStatement.executeQuery()) {
ImmutableSet.Builder<Long> builder = ImmutableSet.builder();
while (resultSet.next()) {
Expand All @@ -219,7 +220,7 @@ public void testBuildSqlWithFloat()
false)));

Connection connection = database.getConnection();
try (PreparedStatement preparedStatement = new QueryBuilder("\"").buildSql(jdbcClient, connection, "", "", "test_table", columns, tupleDomain);
try (PreparedStatement preparedStatement = new QueryBuilder("\"").buildSql(jdbcClient, connection, "", "", "test_table", columns, tupleDomain, Optional.empty());
ResultSet resultSet = preparedStatement.executeQuery()) {
ImmutableSet.Builder<Long> longBuilder = ImmutableSet.builder();
ImmutableSet.Builder<Float> floatBuilder = ImmutableSet.builder();
Expand All @@ -245,7 +246,7 @@ public void testBuildSqlWithVarchar()
false)));

Connection connection = database.getConnection();
try (PreparedStatement preparedStatement = new QueryBuilder("\"").buildSql(jdbcClient, connection, "", "", "test_table", columns, tupleDomain);
try (PreparedStatement preparedStatement = new QueryBuilder("\"").buildSql(jdbcClient, connection, "", "", "test_table", columns, tupleDomain, Optional.empty());
ResultSet resultSet = preparedStatement.executeQuery()) {
ImmutableSet.Builder<String> builder = ImmutableSet.builder();
while (resultSet.next()) {
Expand Down Expand Up @@ -273,7 +274,7 @@ public void testBuildSqlWithChar()
false)));

Connection connection = database.getConnection();
try (PreparedStatement preparedStatement = new QueryBuilder("\"").buildSql(jdbcClient, connection, "", "", "test_table", columns, tupleDomain);
try (PreparedStatement preparedStatement = new QueryBuilder("\"").buildSql(jdbcClient, connection, "", "", "test_table", columns, tupleDomain, Optional.empty());
ResultSet resultSet = preparedStatement.executeQuery()) {
ImmutableSet.Builder<String> builder = ImmutableSet.builder();
while (resultSet.next()) {
Expand Down Expand Up @@ -306,7 +307,7 @@ public void testBuildSqlWithDateTime()
false)));

Connection connection = database.getConnection();
try (PreparedStatement preparedStatement = new QueryBuilder("\"").buildSql(jdbcClient, connection, "", "", "test_table", columns, tupleDomain);
try (PreparedStatement preparedStatement = new QueryBuilder("\"").buildSql(jdbcClient, connection, "", "", "test_table", columns, tupleDomain, Optional.empty());
ResultSet resultSet = preparedStatement.executeQuery()) {
ImmutableSet.Builder<Date> dateBuilder = ImmutableSet.builder();
ImmutableSet.Builder<Time> timeBuilder = ImmutableSet.builder();
Expand Down Expand Up @@ -339,7 +340,7 @@ public void testBuildSqlWithTimestamp()
false)));

Connection connection = database.getConnection();
try (PreparedStatement preparedStatement = new QueryBuilder("\"").buildSql(jdbcClient, connection, "", "", "test_table", columns, tupleDomain);
try (PreparedStatement preparedStatement = new QueryBuilder("\"").buildSql(jdbcClient, connection, "", "", "test_table", columns, tupleDomain, Optional.empty());
ResultSet resultSet = preparedStatement.executeQuery()) {
ImmutableSet.Builder<Timestamp> builder = ImmutableSet.builder();
while (resultSet.next()) {
Expand All @@ -366,7 +367,7 @@ public void testEmptyBuildSql()
columns.get(1), Domain.onlyNull(DOUBLE)));

Connection connection = database.getConnection();
try (PreparedStatement preparedStatement = new QueryBuilder("\"").buildSql(jdbcClient, connection, "", "", "test_table", columns, tupleDomain);
try (PreparedStatement preparedStatement = new QueryBuilder("\"").buildSql(jdbcClient, connection, "", "", "test_table", columns, tupleDomain, Optional.empty());
ResultSet resultSet = preparedStatement.executeQuery()) {
assertEquals(resultSet.next(), false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
import io.prestosql.spi.predicate.TupleDomain;
import org.testng.annotations.Test;

import java.util.Optional;

import static io.airlift.json.JsonCodec.jsonCodec;
import static org.testng.Assert.assertEquals;

public class TestJdbcSplit
{
private final JdbcSplit split = new JdbcSplit("connectorId", "catalog", "schemaName", "tableName", TupleDomain.all());
private final JdbcSplit split = new JdbcSplit("connectorId", "catalog", "schemaName", "tableName", TupleDomain.all(), Optional.empty());

@Test
public void testAddresses()
Expand All @@ -32,7 +34,7 @@ public void testAddresses()
assertEquals(split.getAddresses(), ImmutableList.of());
assertEquals(split.isRemotelyAccessible(), true);

JdbcSplit jdbcSplit = new JdbcSplit("connectorId", "catalog", "schemaName", "tableName", TupleDomain.all());
JdbcSplit jdbcSplit = new JdbcSplit("connectorId", "catalog", "schemaName", "tableName", TupleDomain.all(), Optional.empty());
assertEquals(jdbcSplit.getAddresses(), ImmutableList.of());
}

Expand Down