Skip to content

Commit

Permalink
Support merge in Postgresql connector
Browse files Browse the repository at this point in the history
Co-Authored-By: Grzegorz Kokosiński <[email protected]>
  • Loading branch information
chenjian2664 and kokosing committed Dec 9, 2024
1 parent c37f5f4 commit e88e2b1
Show file tree
Hide file tree
Showing 24 changed files with 439 additions and 110 deletions.
12 changes: 12 additions & 0 deletions docs/src/main/sphinx/connector/postgresql.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,18 @@ catalog named `sales` using the configured connector.
```{include} non-transactional-insert.fragment
```

### Non-transactional MERGE

The connector supports adding rows using {doc}`MERGE statements </sql/merge>`.
However, the connector only support merge modifying directly to the target
table at current, to use merge you need to set the `merge.non-transactional-merge.enabled`
catalog property or the corresponding `non_transactional_merge_enabled` catalog session property to
`true`.

Note that with this property enabled, data can be corrupted in rare cases where
exceptions occur during the merge operation. With transactions disabled, no
rollback can be performed.

(postgresql-fte-support)=
### Fault-tolerant execution support

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1584,6 +1584,12 @@ public boolean isLimitGuaranteed(ConnectorSession session)
throw new TrinoException(JDBC_ERROR, "limitFunction() is implemented without isLimitGuaranteed()");
}

@Override
public boolean supportsMerge()
{
return false;
}

@Override
public String quoted(String name)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,12 @@ public boolean isLimitGuaranteed(ConnectorSession session)
return delegate.isLimitGuaranteed(session);
}

@Override
public boolean supportsMerge()
{
return delegate.supportsMerge();
}

@Override
public Optional<JdbcTableHandle> getTableHandle(ConnectorSession session, SchemaTableName schemaTableName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
import static io.trino.plugin.jdbc.JdbcMetadataSessionProperties.isJoinPushdownEnabled;
import static io.trino.plugin.jdbc.JdbcMetadataSessionProperties.isTopNPushdownEnabled;
import static io.trino.plugin.jdbc.JdbcWriteSessionProperties.isNonTransactionalInsert;
import static io.trino.plugin.jdbc.JdbcWriteSessionProperties.isNonTransactionalMerge;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.connector.RetryMode.NO_RETRIES;
import static io.trino.spi.connector.RowChangeParadigm.CHANGE_ONLY_UPDATED_COLUMNS;
Expand Down Expand Up @@ -1297,11 +1298,24 @@ public ColumnHandle getMergeRowIdColumnHandle(ConnectorSession session, Connecto
@Override
public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorTableHandle tableHandle, Map<Integer, Collection<ColumnHandle>> updateColumnHandles, RetryMode retryMode)
{
if (retryMode != NO_RETRIES) {
throw new TrinoException(NOT_SUPPORTED, "This connector does not support MERGE with fault-tolerant execution");
}

if (!jdbcClient.supportsMerge()) {
throw new TrinoException(NOT_SUPPORTED, MODIFYING_ROWS_MESSAGE);
}

if (!isNonTransactionalMerge(session)) {
throw new TrinoException(NOT_SUPPORTED, "Non-transactional MERGE is disabled");
}

JdbcTableHandle handle = (JdbcTableHandle) tableHandle;
checkArgument(handle.isNamedRelation(), "Merge target must be named relation table");

List<JdbcColumnHandle> primaryKeys = jdbcClient.getPrimaryKeys(session, handle.getRequiredNamedRelation().getRemoteTableName());
if (primaryKeys.isEmpty()) {
throw new TrinoException(NOT_SUPPORTED, MODIFYING_ROWS_MESSAGE);
throw new TrinoException(NOT_SUPPORTED, "The connector can not perform merge on the target table without primary keys");
}

SchemaTableName schemaTableName = handle.getRequiredNamedRelation().getSchemaTableName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,12 @@ public boolean isLimitGuaranteed(ConnectorSession session)
return delegate().isLimitGuaranteed(session);
}

@Override
public boolean supportsMerge()
{
return delegate().supportsMerge();
}

@Override
public Optional<String> getTableComment(ResultSet resultSet)
throws SQLException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ Optional<PreparedQuery> legacyImplementJoin(

boolean isLimitGuaranteed(ConnectorSession session);

boolean supportsMerge();

default Optional<String> getTableComment(ResultSet resultSet)
throws SQLException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ public class JdbcWriteConfig
// This means that the write operation can fail and leave the table in an inconsistent state.
private boolean nonTransactionalInsert;

// Do not create temporary table during merge.
// This means that the write operation can fail and leave the table in an inconsistent state.
private boolean nonTransactionalMerge;

@Min(1)
@Max(MAX_ALLOWED_WRITE_BATCH_SIZE)
public int getWriteBatchSize()
Expand Down Expand Up @@ -59,6 +63,20 @@ public JdbcWriteConfig setNonTransactionalInsert(boolean nonTransactionalInsert)
return this;
}

public boolean isNonTransactionalMerge()
{
return nonTransactionalMerge;
}

@Config("merge.non-transactional-merge.enabled")
@ConfigDescription("Enables support for non-transactional MERGE. " +
"This means that the write operation can fail and leave the table in an inconsistent state.")
public JdbcWriteConfig setNonTransactionalMerge(boolean nonTransactionalMerge)
{
this.nonTransactionalMerge = nonTransactionalMerge;
return this;
}

@Min(1)
@Max(128)
public int getWriteParallelism()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class JdbcWriteSessionProperties
{
public static final String WRITE_BATCH_SIZE = "write_batch_size";
public static final String NON_TRANSACTIONAL_INSERT = "non_transactional_insert";
public static final String NON_TRANSACTIONAL_MERGE = "non_transactional_merge";
public static final String WRITE_PARALLELISM = "write_parallelism";

private final List<PropertyMetadata<?>> properties;
Expand All @@ -49,9 +50,14 @@ public JdbcWriteSessionProperties(JdbcWriteConfig writeConfig)
false))
.add(booleanProperty(
NON_TRANSACTIONAL_INSERT,
"Do not use temporary table on insert to table",
"Enables support for non-transactional MERGE",
writeConfig.isNonTransactionalInsert(),
false))
.add(booleanProperty(
NON_TRANSACTIONAL_MERGE,
"Do not use temporary table on merge",
writeConfig.isNonTransactionalMerge(),
false))
.add(integerProperty(
WRITE_PARALLELISM,
"Maximum number of parallel write tasks",
Expand Down Expand Up @@ -81,6 +87,11 @@ public static boolean isNonTransactionalInsert(ConnectorSession session)
return session.getProperty(NON_TRANSACTIONAL_INSERT, Boolean.class);
}

public static boolean isNonTransactionalMerge(ConnectorSession session)
{
return session.getProperty(NON_TRANSACTIONAL_MERGE, Boolean.class);
}

private static void validateWriteBatchSize(int maxBatchSize)
{
if (maxBatchSize < 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,13 @@ public boolean isLimitGuaranteed(ConnectorSession session)
return delegate.isLimitGuaranteed(session);
}

@Override
public boolean supportsMerge()
{
// there should be no remote database interaction
return delegate.supportsMerge();
}

@Override
public Optional<String> getTableComment(ResultSet resultSet)
throws SQLException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,12 @@ public boolean isLimitGuaranteed(ConnectorSession session)
return delegate().isLimitGuaranteed(session);
}

@Override
public boolean supportsMerge()
{
return delegate().supportsMerge();
}

@Override
public void createSchema(ConnectorSession session, String schemaName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1875,8 +1875,8 @@ public void testConstantUpdateWithVarcharEqualityPredicates()
public void testConstantUpdateWithVarcharInequalityPredicates()
{
skipTestUnless(hasBehavior(SUPPORTS_CREATE_TABLE) && hasBehavior(SUPPORTS_UPDATE));
try (TestTable table = new TestTable(getQueryRunner()::execute, "test_update_varchar", "(col1 INT, col2 varchar(1))", ImmutableList.of("1, 'a'", "2, 'A'"))) {
if (!hasBehavior(SUPPORTS_PREDICATE_PUSHDOWN_WITH_VARCHAR_INEQUALITY)) {
try (TestTable table = createTestTableForWrites("test_update_varchar", "(col1 INT, col2 varchar(1))", ImmutableList.of("1, 'a'", "2, 'A'"), "col2")) {
if (!hasBehavior(SUPPORTS_PREDICATE_PUSHDOWN_WITH_VARCHAR_INEQUALITY) && !hasBehavior(SUPPORTS_ROW_LEVEL_UPDATE)) {
assertQueryFails("UPDATE " + table.getName() + " SET col1 = 20 WHERE col2 != 'A'", MODIFYING_ROWS_MESSAGE);
return;
}
Expand All @@ -1890,8 +1890,8 @@ public void testConstantUpdateWithVarcharInequalityPredicates()
public void testConstantUpdateWithVarcharGreaterAndLowerPredicate()
{
skipTestUnless(hasBehavior(SUPPORTS_CREATE_TABLE) && hasBehavior(SUPPORTS_UPDATE));
try (TestTable table = new TestTable(getQueryRunner()::execute, "test_update_varchar", "(col1 INT, col2 varchar(1))", ImmutableList.of("1, 'a'", "2, 'A'"))) {
if (!hasBehavior(SUPPORTS_PREDICATE_PUSHDOWN_WITH_VARCHAR_INEQUALITY)) {
try (TestTable table = createTestTableForWrites("test_update_varchar", "(col1 INT, col2 varchar(1))", ImmutableList.of("1, 'a'", "2, 'A'"), "col2")) {
if (!hasBehavior(SUPPORTS_PREDICATE_PUSHDOWN_WITH_VARCHAR_INEQUALITY) && !hasBehavior(SUPPORTS_ROW_LEVEL_UPDATE)) {
assertQueryFails("UPDATE " + table.getName() + " SET col1 = 20 WHERE col2 > 'A'", MODIFYING_ROWS_MESSAGE);
assertQueryFails("UPDATE " + table.getName() + " SET col1 = 20 WHERE col2 < 'A'", MODIFYING_ROWS_MESSAGE);
return;
Expand Down Expand Up @@ -1943,14 +1943,14 @@ public void testDeleteWithVarcharInequalityPredicate()
{
skipTestUnless(hasBehavior(SUPPORTS_CREATE_TABLE) && hasBehavior(SUPPORTS_ROW_LEVEL_DELETE));
// TODO (https://github.com/trinodb/trino/issues/5901) Use longer table name once Oracle version is updated
try (TestTable table = new TestTable(getQueryRunner()::execute, "test_delete_varchar", "(col varchar(1))", ImmutableList.of("'a'", "'A'", "null"))) {
try (TestTable table = createTestTableForWrites("test_delete_varchar", "(col varchar(1), pk int)", ImmutableList.of("'a', 0", "'A', 1", "null, 2"), "pk")) {
if (!hasBehavior(SUPPORTS_PREDICATE_PUSHDOWN_WITH_VARCHAR_INEQUALITY) && !hasBehavior(SUPPORTS_MERGE)) {
assertQueryFails("DELETE FROM " + table.getName() + " WHERE col != 'A'", MODIFYING_ROWS_MESSAGE);
return;
}

assertUpdate("DELETE FROM " + table.getName() + " WHERE col != 'A'", 1);
assertQuery("SELECT * FROM " + table.getName(), "VALUES 'A', null");
assertQuery("SELECT col FROM " + table.getName(), "VALUES 'A', null");
}
}

Expand All @@ -1959,17 +1959,17 @@ public void testDeleteWithVarcharGreaterAndLowerPredicate()
{
skipTestUnless(hasBehavior(SUPPORTS_CREATE_TABLE) && hasBehavior(SUPPORTS_ROW_LEVEL_DELETE));
// TODO (https://github.com/trinodb/trino/issues/5901) Use longer table name once Oracle version is updated
try (TestTable table = new TestTable(getQueryRunner()::execute, "test_delete_varchar", "(col varchar(1))", ImmutableList.of("'0'", "'a'", "'A'", "'b'", "null"))) {
try (TestTable table = createTestTableForWrites("test_delete_varchar", "(col varchar(1), pk int)", ImmutableList.of("'0', 0", "'a', 1", "'A', 2", "'b', 3", "null, 4"), "pk")) {
if (!hasBehavior(SUPPORTS_PREDICATE_PUSHDOWN_WITH_VARCHAR_INEQUALITY) && !hasBehavior(SUPPORTS_MERGE)) {
assertQueryFails("DELETE FROM " + table.getName() + " WHERE col < 'A'", MODIFYING_ROWS_MESSAGE);
assertQueryFails("DELETE FROM " + table.getName() + " WHERE col > 'A'", MODIFYING_ROWS_MESSAGE);
return;
}

assertUpdate("DELETE FROM " + table.getName() + " WHERE col < 'A'", 1);
assertQuery("SELECT * FROM " + table.getName(), "VALUES 'a', 'A', 'b', null");
assertQuery("SELECT col FROM " + table.getName(), "VALUES 'a', 'A', 'b', null");
assertUpdate("DELETE FROM " + table.getName() + " WHERE col > 'A'", 2);
assertQuery("SELECT * FROM " + table.getName(), "VALUES 'A', null");
assertQuery("SELECT col FROM " + table.getName(), "VALUES 'A', null");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ public void testDefaults()
assertRecordedDefaults(recordDefaults(JdbcWriteConfig.class)
.setWriteBatchSize(1000)
.setWriteParallelism(8)
.setNonTransactionalInsert(false));
.setNonTransactionalInsert(false)
.setNonTransactionalMerge(false));
}

@Test
Expand All @@ -43,12 +44,14 @@ public void testExplicitPropertyMappings()
Map<String, String> properties = ImmutableMap.<String, String>builder()
.put("write.batch-size", "24")
.put("insert.non-transactional-insert.enabled", "true")
.put("merge.non-transactional-merge.enabled", "true")
.put("write.parallelism", "16")
.buildOrThrow();

JdbcWriteConfig expected = new JdbcWriteConfig()
.setWriteBatchSize(24)
.setNonTransactionalInsert(true)
.setNonTransactionalMerge(true)
.setWriteParallelism(16);

assertFullMapping(properties, expected);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,9 +283,9 @@ public void verifySupportsRowLevelUpdateDeclaration()
}

@Override
protected String createTableForWrites(String createTable)
protected void createTableForWrites(String createTable, String tableName, Optional<String> primaryKey, OptionalInt updateCount)
{
return createTable + " WITH (transactional = true)";
assertUpdate(createTable + " WITH (transactional = true)");
}

@Override
Expand Down
Loading

0 comments on commit e88e2b1

Please sign in to comment.