Skip to content

Commit

Permalink
Support metadata DELETE in JDBC connectors
Browse files Browse the repository at this point in the history
  • Loading branch information
kokosing committed Jul 2, 2021
1 parent 3013849 commit 825e6d8
Show file tree
Hide file tree
Showing 19 changed files with 367 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.UUID;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -1003,6 +1004,25 @@ public Map<String, Object> getTableProperties(ConnectorSession session, JdbcTabl
return emptyMap();
}

@Override
public OptionalLong delete(ConnectorSession session, JdbcTableHandle handle)
{
checkArgument(handle.isNamedRelation(), "Unable to delete from synthetic table: %s", handle);
checkArgument(handle.getLimit().isEmpty(), "Unable to delete when limit is set: %s", handle);
checkArgument(handle.getSortOrder().isEmpty(), "Unable to delete when sort order is set: %s", handle);
try (Connection connection = connectionFactory.openConnection(session)) {
verify(connection.getAutoCommit());
QueryBuilder queryBuilder = new QueryBuilder(this);
PreparedQuery preparedQuery = queryBuilder.prepareDelete(session, connection, handle.getRequiredNamedRelation(), handle.getConstraint());
try (PreparedStatement preparedStatement = queryBuilder.prepareStatement(session, connection, preparedQuery)) {
return OptionalLong.of(preparedStatement.executeUpdate());
}
}
catch (SQLException e) {
throw new TrinoException(JDBC_ERROR, e);
}
}

protected String quoted(@Nullable String catalog, @Nullable String schema, String table)
{
StringBuilder sb = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -451,6 +452,14 @@ public void onDataChanged(JdbcTableHandle handle)
invalidateCache(statisticsCache, key -> key.tableHandle.equals(handle));
}

@Override
public OptionalLong delete(ConnectorSession session, JdbcTableHandle handle)
{
OptionalLong deletedRowsCount = delegate.delete(session, handle);
onDataChanged(handle.getRequiredNamedRelation().getSchemaTableName());
return deletedRowsCount;
}

private JdbcIdentityCacheKey getIdentityKey(ConnectorSession session)
{
return identityMapping.getRemoteUserCacheKey(JdbcIdentity.from(session));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.trino.spi.connector.ConnectorOutputTableHandle;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTableLayoutHandle;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.ConnectorTableProperties;
import io.trino.spi.connector.ConnectorTableSchema;
Expand Down Expand Up @@ -57,6 +58,7 @@
import io.trino.spi.statistics.ComputedStatistics;
import io.trino.spi.statistics.TableStatistics;

import java.sql.Types;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
Expand All @@ -75,7 +77,9 @@
import static io.trino.plugin.jdbc.JdbcMetadataSessionProperties.isAggregationPushdownEnabled;
import static io.trino.plugin.jdbc.JdbcMetadataSessionProperties.isJoinPushdownEnabled;
import static io.trino.plugin.jdbc.JdbcMetadataSessionProperties.isTopNPushdownEnabled;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.StandardErrorCode.PERMISSION_DENIED;
import static io.trino.spi.type.BigintType.BIGINT;
import static java.lang.Math.max;
import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -660,6 +664,40 @@ public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,
return Optional.empty();
}

@Override
public ColumnHandle getDeleteRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle)
{
// The column is used for row-level delete, which is not supported, but it's required during analysis anyway.
return new JdbcColumnHandle(
"$update_row_id",
new JdbcTypeHandle(Types.BIGINT, Optional.of("bigint"), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty()),
BIGINT);
}

@Override
public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTableHandle tableHandle)
{
throw new TrinoException(NOT_SUPPORTED, "Unsupported delete");
}

@Override
public boolean supportsMetadataDelete(ConnectorSession session, ConnectorTableHandle tableHandle, ConnectorTableLayoutHandle tableLayoutHandle)
{
return true;
}

@Override
public Optional<ConnectorTableHandle> applyDelete(ConnectorSession session, ConnectorTableHandle handle)
{
return Optional.of(handle);
}

@Override
public OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandle handle)
{
return jdbcClient.delete(session, (JdbcTableHandle) handle);
}

@Override
public void setColumnComment(ConnectorSession session, ConnectorTableHandle table, ColumnHandle column, Optional<String> comment)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.function.Supplier;

Expand Down Expand Up @@ -332,4 +333,10 @@ public Optional<TableScanRedirectApplicationResult> getTableScanRedirection(Conn
{
return delegate().getTableScanRedirection(session, tableHandle);
}

@Override
public OptionalLong delete(ConnectorSession session, JdbcTableHandle handle)
{
return delegate().delete(session, handle);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;

import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
Expand Down Expand Up @@ -171,4 +172,6 @@ default Optional<TableScanRedirectApplicationResult> getTableScanRedirection(Con
{
return Optional.empty();
}

OptionalLong delete(ConnectorSession session, JdbcTableHandle handle);
}
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,23 @@ protected static String formatJoinType(JoinType joinType)
throw new IllegalStateException("Unsupported join type: " + joinType);
}

public PreparedQuery prepareDelete(
ConnectorSession session,
Connection connection,
JdbcNamedRelationHandle baseRelation,
TupleDomain<ColumnHandle> tupleDomain)
{
String sql = "DELETE FROM " + getRelation(baseRelation.getRemoteTableName());

ImmutableList.Builder<QueryParameter> accumulator = ImmutableList.builder();

List<String> clauses = toConjuncts(session, connection, tupleDomain, accumulator::add);
if (!clauses.isEmpty()) {
sql += " WHERE " + Joiner.on(" AND ").join(clauses);
}
return new PreparedQuery(sql, accumulator.build());
}

public PreparedStatement prepareStatement(
ConnectorSession session,
Connection connection,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public final class JdbcClientStats
private final JdbcApiStats toWriteMapping = new JdbcApiStats();
private final JdbcApiStats implementAggregation = new JdbcApiStats();
private final JdbcApiStats getTableScanRedirection = new JdbcApiStats();
private final JdbcApiStats delete = new JdbcApiStats();

@Managed
@Nested
Expand Down Expand Up @@ -290,4 +291,11 @@ public JdbcApiStats getGetTableScanRedirection()
{
return getTableScanRedirection;
}

@Managed
@Nested
public JdbcApiStats getDelete()
{
return delete;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;

import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -351,4 +352,10 @@ public Optional<TableScanRedirectApplicationResult> getTableScanRedirection(Conn
{
return stats.getGetTableScanRedirection().wrap(() -> delegate().getTableScanRedirection(session, tableHandle));
}

@Override
public OptionalLong delete(ConnectorSession session, JdbcTableHandle handle)
{
return stats.getDelete().wrap(() -> delegate().delete(session, handle));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,20 @@
package io.trino.plugin.jdbc;

import io.trino.testing.BaseConnectorSmokeTest;
import io.trino.testing.TestingConnectorBehavior;

public abstract class BaseJdbcConnectorSmokeTest
extends BaseConnectorSmokeTest {}
extends BaseConnectorSmokeTest
{
@Override
protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
{
switch (connectorBehavior) {
case SUPPORTS_DELETE:
return true;

default:
return super.hasBehavior(connectorBehavior);
}
}
}
Loading

0 comments on commit 825e6d8

Please sign in to comment.