Skip to content

Commit

Permalink
Allow bypassing temporary table during insert for jdbc-based connectors
Browse files Browse the repository at this point in the history
This allows INSERT to write data directly to the target table, bypassing
creation of a temporary table and moving data to a target table. This in
some cases could improve performance of INSERT queries for JDBC based
connectors.
  • Loading branch information
wendigo authored and hashhar committed Jul 13, 2021
1 parent e84aeb1 commit 22ddd59
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,15 @@
import java.util.function.Function;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Strings.emptyToNull;
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.trino.plugin.jdbc.JdbcErrorCode.JDBC_ERROR;
import static io.trino.plugin.jdbc.JdbcMetadataSessionProperties.isNonTransactionalInsert;
import static io.trino.plugin.jdbc.PredicatePushdownController.DISABLE_PUSHDOWN;
import static io.trino.plugin.jdbc.StandardColumnMappings.bigintWriteFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.booleanWriteFunction;
Expand Down Expand Up @@ -599,7 +601,6 @@ public JdbcOutputTableHandle beginInsertTable(ConnectorSession session, JdbcTabl
try (Connection connection = connectionFactory.openConnection(session)) {
String remoteSchema = identifierMapping.toRemoteSchemaName(identity, connection, schemaTableName.getSchemaName());
String remoteTable = identifierMapping.toRemoteTableName(identity, connection, remoteSchema, schemaTableName.getTableName());
String remoteTemporaryTableName = identifierMapping.toRemoteTableName(identity, connection, remoteSchema, generateTemporaryTableName());
String catalog = connection.getCatalog();

ImmutableList.Builder<String> columnNames = ImmutableList.builder();
Expand All @@ -611,6 +612,18 @@ public JdbcOutputTableHandle beginInsertTable(ConnectorSession session, JdbcTabl
jdbcColumnTypes.add(column.getJdbcTypeHandle());
}

if (isNonTransactionalInsert(session)) {
return new JdbcOutputTableHandle(
catalog,
remoteSchema,
remoteTable,
columnNames.build(),
columnTypes.build(),
Optional.of(jdbcColumnTypes.build()),
remoteTable);
}

String remoteTemporaryTableName = identifierMapping.toRemoteTableName(identity, connection, remoteSchema, generateTemporaryTableName());
copyTableSchema(connection, catalog, remoteSchema, remoteTable, remoteTemporaryTableName, columnNames.build());

return new JdbcOutputTableHandle(
Expand Down Expand Up @@ -683,6 +696,11 @@ protected void renameTable(ConnectorSession session, String catalogName, String
@Override
public void finishInsertTable(ConnectorSession session, JdbcOutputTableHandle handle)
{
if (isNonTransactionalInsert(session)) {
checkState(handle.getTemporaryTableName().equals(handle.getTableName()), "Unexpected use of temporary table when non transactional inserts are enabled");
return;
}

RemoteTableName temporaryTable = new RemoteTableName(
Optional.ofNullable(handle.getCatalogName()),
Optional.ofNullable(handle.getSchemaName()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ public class JdbcMetadataConfig

private int insertBatchSize = 1000;

// Do not create temporary table during insert.
// This means that the write operation can fail and leave the table in an inconsistent state.
private boolean nonTransactionalInsert;

public boolean isAllowDropTable()
{
return allowDropTable;
Expand Down Expand Up @@ -127,4 +131,18 @@ public JdbcMetadataConfig setInsertBatchSize(int insertBatchSize)
this.insertBatchSize = insertBatchSize;
return this;
}

public boolean isNonTransactionalInsert()
{
return nonTransactionalInsert;
}

@Config("insert.non-transactional-insert.enabled")
@ConfigDescription("Do not create temporary table during insert. " +
"This means that the write operation can fail and leave the table in an inconsistent state.")
public JdbcMetadataConfig setNonTransactionalInsert(boolean nonTransactionalInsert)
{
this.nonTransactionalInsert = nonTransactionalInsert;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class JdbcMetadataSessionProperties
public static final String TOPN_PUSHDOWN_ENABLED = "topn_pushdown_enabled";
public static final String DOMAIN_COMPACTION_THRESHOLD = "domain_compaction_threshold";
public static final String INSERT_BATCH_SIZE = "insert_batch_size";
public static final String NON_TRANSACTIONAL_INSERT = "non_transactional_insert";

private final List<PropertyMetadata<?>> properties;

Expand Down Expand Up @@ -73,6 +74,11 @@ public JdbcMetadataSessionProperties(JdbcMetadataConfig jdbcMetadataConfig, @Max
jdbcMetadataConfig.getInsertBatchSize(),
value -> validateInsertBatchSize(value, MAX_ALLOWED_INSERT_BATCH_SIZE),
false))
.add(booleanProperty(
NON_TRANSACTIONAL_INSERT,
"Do not use temporary table on insert to table",
jdbcMetadataConfig.isNonTransactionalInsert(),
false))
.build();
}

Expand Down Expand Up @@ -107,6 +113,11 @@ public static int getInsertBatchSize(ConnectorSession session)
return session.getProperty(INSERT_BATCH_SIZE, Integer.class);
}

public static boolean isNonTransactionalInsert(ConnectorSession session)
{
return session.getProperty(NON_TRANSACTIONAL_INSERT, Boolean.class);
}

private static void validateDomainCompactionThreshold(int domainCompactionThreshold, Optional<Integer> maxDomainCompactionThreshold)
{
if (domainCompactionThreshold < 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1260,6 +1260,26 @@ public void testDeleteWithVarcharPredicate()
throw new SkipException("This is implemented by testDeleteWithVarcharEqualityPredicate");
}

@Test
public void testInsertWithoutTemporaryTable()
{
if (!hasBehavior(SUPPORTS_CREATE_TABLE)) {
throw new SkipException("CREATE TABLE is required for testing non-transactional write support");
}
Session session = Session.builder(getSession())
.setCatalogSessionProperty(getSession().getCatalog().orElseThrow(), "non_transactional_insert", "false")
.build();

try (TestTable table = new TestTable(
getQueryRunner()::execute,
"test_bypass_temp",
"(a varchar(36), b bigint)")) {
String values = String.join(",", buildRowsForInsert(5000));
assertUpdate(session, "INSERT INTO " + table.getName() + " (a, b) VALUES " + values, 5000);
assertQuery("SELECT COUNT(*) FROM " + table.getName(), format("VALUES %d", 5000));
}
}

@Test(dataProvider = "batchSizeAndTotalNumberOfRowsToInsertDataProvider")
public void testInsertBatchSizeSessionProperty(Integer batchSize, Integer numberOfRows)
{
Expand All @@ -1274,13 +1294,13 @@ public void testInsertBatchSizeSessionProperty(Integer batchSize, Integer number
getQueryRunner()::execute,
"test_insert_batch_size",
"(a varchar(36), b bigint)")) {
String values = String.join(",", makeValuesForInsertBatchSizeSessionPropertyTest(numberOfRows));
String values = String.join(",", buildRowsForInsert(numberOfRows));
assertUpdate(session, "INSERT INTO " + table.getName() + " (a, b) VALUES " + values, numberOfRows);
assertQuery("SELECT COUNT(*) FROM " + table.getName(), format("VALUES %d", numberOfRows));
}
}

private static List<String> makeValuesForInsertBatchSizeSessionPropertyTest(int numberOfRows)
private static List<String> buildRowsForInsert(int numberOfRows)
{
List<String> result = new ArrayList<>(numberOfRows);
for (int i = 0; i < numberOfRows; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ public void testDefaults()
.setAggregationPushdownEnabled(true)
.setTopNPushdownEnabled(true)
.setDomainCompactionThreshold(32)
.setInsertBatchSize(1000));
.setInsertBatchSize(1000)
.setNonTransactionalInsert(false));
}

@Test
Expand All @@ -49,6 +50,7 @@ public void testExplicitPropertyMappings()
.put("domain-compaction-threshold", "42")
.put("topn-pushdown.enabled", "false")
.put("insert.batch-size", "24")
.put("insert.non-transactional-insert.enabled", "true")
.build();

JdbcMetadataConfig expected = new JdbcMetadataConfig()
Expand All @@ -57,7 +59,8 @@ public void testExplicitPropertyMappings()
.setAggregationPushdownEnabled(false)
.setTopNPushdownEnabled(false)
.setDomainCompactionThreshold(42)
.setInsertBatchSize(24);
.setInsertBatchSize(24)
.setNonTransactionalInsert(true);

assertFullMapping(properties, expected);
}
Expand Down

0 comments on commit 22ddd59

Please sign in to comment.