Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use RemoteTableName in JdbcOutputTableHandle #23090

Merged
merged 1 commit into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -931,9 +931,7 @@ protected JdbcOutputTableHandle createTable(
}

return new JdbcOutputTableHandle(
catalog,
remoteSchema,
remoteTable,
new RemoteTableName(Optional.ofNullable(catalog), Optional.ofNullable(remoteSchema), remoteTable),
columnNames.build(),
columnTypes.build(),
Optional.empty(),
Expand Down Expand Up @@ -1015,9 +1013,7 @@ protected JdbcOutputTableHandle beginInsertTable(

if (isNonTransactionalInsert(session)) {
return new JdbcOutputTableHandle(
catalog,
remoteSchema,
remoteTable,
new RemoteTableName(Optional.ofNullable(catalog), Optional.ofNullable(remoteSchema), remoteTable),
columnNames.build(),
columnTypes.build(),
Optional.of(jdbcColumnTypes.build()),
Expand All @@ -1039,9 +1035,7 @@ protected JdbcOutputTableHandle beginInsertTable(
}

return new JdbcOutputTableHandle(
catalog,
remoteSchema,
remoteTable,
new RemoteTableName(Optional.ofNullable(catalog), Optional.ofNullable(remoteSchema), remoteTable),
columnNames.build(),
columnTypes.build(),
Optional.of(jdbcColumnTypes.build()),
Expand Down Expand Up @@ -1075,10 +1069,10 @@ public void commitCreateTable(ConnectorSession session, JdbcOutputTableHandle ha
else {
renameTable(
session,
handle.getCatalogName(),
handle.getSchemaName(),
handle.getRemoteTableName().getCatalogName().orElse(null),
handle.getRemoteTableName().getSchemaName().orElse(null),
handle.getTemporaryTableName().orElseThrow(() -> new IllegalStateException("Temporary table name missing")),
new SchemaTableName(handle.getSchemaName(), handle.getTableName()));
handle.getRemoteTableName().getSchemaTableName());
}
}

Expand Down Expand Up @@ -1123,8 +1117,8 @@ private RemoteTableName constructPageSinkIdsTable(ConnectorSession session, Conn
verify(handle.getPageSinkIdColumnName().isPresent(), "Output table handle's pageSinkIdColumn is empty");

RemoteTableName pageSinkTable = new RemoteTableName(
Optional.ofNullable(handle.getCatalogName()),
Optional.ofNullable(handle.getSchemaName()),
handle.getRemoteTableName().getCatalogName(),
handle.getRemoteTableName().getSchemaName(),
generateTemporaryTableName(session));

int maxBatchSize = getWriteBatchSize(session);
Expand Down Expand Up @@ -1173,13 +1167,9 @@ public void finishInsertTable(ConnectorSession session, JdbcOutputTableHandle ha
}

RemoteTableName temporaryTable = new RemoteTableName(
Optional.ofNullable(handle.getCatalogName()),
Optional.ofNullable(handle.getSchemaName()),
handle.getRemoteTableName().getCatalogName(),
handle.getRemoteTableName().getSchemaName(),
handle.getTemporaryTableName().orElseThrow());
RemoteTableName targetTable = new RemoteTableName(
Optional.ofNullable(handle.getCatalogName()),
Optional.ofNullable(handle.getSchemaName()),
handle.getTableName());

// We conditionally create more than the one table, so keep a list of the tables that need to be dropped.
Closer closer = Closer.create();
Expand All @@ -1192,7 +1182,7 @@ public void finishInsertTable(ConnectorSession session, JdbcOutputTableHandle ha
.collect(joining(", "));

String insertSql = format("INSERT INTO %s (%s) SELECT %s FROM %s temp_table",
postProcessInsertTableNameClause(session, quoted(targetTable)),
postProcessInsertTableNameClause(session, quoted(handle.getRemoteTableName())),
columns,
columns,
quoted(temporaryTable));
Expand Down Expand Up @@ -1358,8 +1348,8 @@ public void rollbackCreateTable(ConnectorSession session, JdbcOutputTableHandle
if (handle.getTemporaryTableName().isPresent()) {
dropTable(session,
new RemoteTableName(
Optional.ofNullable(handle.getCatalogName()),
Optional.ofNullable(handle.getSchemaName()),
handle.getRemoteTableName().getCatalogName(),
handle.getRemoteTableName().getSchemaName(),
handle.getTemporaryTableName().get()),
true);
}
Expand All @@ -1383,7 +1373,10 @@ public String buildInsertSql(JdbcOutputTableHandle handle, List<WriteFunction> c
checkArgument(handle.getColumnNames().size() == columnWriters.size(), "handle and columnWriters mismatch: %s, %s", handle, columnWriters);
return format(
"INSERT INTO %s (%s%s) VALUES (%s%s)",
quoted(handle.getCatalogName(), handle.getSchemaName(), handle.getTemporaryTableName().orElseGet(handle::getTableName)),
quoted(
chenjian2664 marked this conversation as resolved.
Show resolved Hide resolved
handle.getRemoteTableName().getCatalogName().orElse(null),
handle.getRemoteTableName().getSchemaName().orElse(null),
handle.getTemporaryTableName().orElseGet(() -> handle.getRemoteTableName().getTableName())),
handle.getColumnNames().stream()
.map(this::quoted)
.collect(joining(", ")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ public JdbcProcedureHandle getProcedureHandle(ConnectorSession session, Procedur
public void commitCreateTable(ConnectorSession session, JdbcOutputTableHandle handle, Set<Long> pageSinkIds)
{
delegate.commitCreateTable(session, handle, pageSinkIds);
invalidateTableCaches(new SchemaTableName(handle.getSchemaName(), handle.getTableName()));
invalidateTableCaches(handle.getRemoteTableName().getSchemaTableName());
}

@Override
Expand All @@ -401,7 +401,7 @@ public JdbcOutputTableHandle beginInsertTable(ConnectorSession session, JdbcTabl
public void finishInsertTable(ConnectorSession session, JdbcOutputTableHandle handle, Set<Long> pageSinkIds)
{
delegate.finishInsertTable(session, handle, pageSinkIds);
onDataChanged(new SchemaTableName(handle.getSchemaName(), handle.getTableName()));
onDataChanged(handle.getRemoteTableName().getSchemaTableName());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,18 @@
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorOutputTableHandle;
import io.trino.spi.type.Type;
import jakarta.annotation.Nullable;

import java.util.List;
import java.util.Objects;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

public class JdbcOutputTableHandle
implements ConnectorOutputTableHandle, ConnectorInsertTableHandle
{
private final String catalogName;
private final String schemaName;
private final String tableName;
private final RemoteTableName remoteTableName;
private final List<String> columnNames;
private final List<Type> columnTypes;
private final Optional<List<JdbcTypeHandle>> jdbcColumnTypes;
Expand All @@ -43,18 +39,14 @@ public class JdbcOutputTableHandle

@JsonCreator
public JdbcOutputTableHandle(
@JsonProperty("catalogName") @Nullable String catalogName,
@JsonProperty("schemaName") @Nullable String schemaName,
@JsonProperty("tableName") String tableName,
@JsonProperty("remoteTableName") RemoteTableName remoteTableName,
chenjian2664 marked this conversation as resolved.
Show resolved Hide resolved
@JsonProperty("columnNames") List<String> columnNames,
@JsonProperty("columnTypes") List<Type> columnTypes,
@JsonProperty("jdbcColumnTypes") Optional<List<JdbcTypeHandle>> jdbcColumnTypes,
@JsonProperty("temporaryTableName") Optional<String> temporaryTableName,
@JsonProperty("pageSinkIdColumnName") Optional<String> pageSinkIdColumnName)
{
this.catalogName = catalogName;
this.schemaName = schemaName;
this.tableName = requireNonNull(tableName, "tableName is null");
this.remoteTableName = requireNonNull(remoteTableName, "remoteTableName is null");
this.temporaryTableName = requireNonNull(temporaryTableName, "temporaryTableName is null");

requireNonNull(columnNames, "columnNames is null");
Expand All @@ -68,23 +60,9 @@ public JdbcOutputTableHandle(
}

@JsonProperty
@Nullable
public String getCatalogName()
public RemoteTableName getRemoteTableName()
{
return catalogName;
}

@JsonProperty
@Nullable
public String getSchemaName()
{
return schemaName;
}

@JsonProperty
public String getTableName()
{
return tableName;
return remoteTableName;
}

@JsonProperty
Expand Down Expand Up @@ -120,16 +98,14 @@ public Optional<String> getPageSinkIdColumnName()
@Override
public String toString()
{
return format("jdbc:%s.%s.%s", catalogName, schemaName, tableName);
return "jdbc:%s".formatted(remoteTableName);
}

@Override
public int hashCode()
{
return Objects.hash(
catalogName,
schemaName,
tableName,
remoteTableName,
columnNames,
columnTypes,
jdbcColumnTypes,
Expand All @@ -147,9 +123,7 @@ public boolean equals(Object obj)
return false;
}
JdbcOutputTableHandle other = (JdbcOutputTableHandle) obj;
return Objects.equals(this.catalogName, other.catalogName) &&
Objects.equals(this.schemaName, other.schemaName) &&
Objects.equals(this.tableName, other.tableName) &&
return Objects.equals(this.remoteTableName, other.remoteTableName) &&
Objects.equals(this.columnNames, other.columnNames) &&
Objects.equals(this.columnTypes, other.columnTypes) &&
Objects.equals(this.jdbcColumnTypes, other.jdbcColumnTypes) &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Joiner;
import io.trino.spi.connector.SchemaTableName;

import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -58,6 +59,11 @@ public String getTableName()
return tableName;
}

public SchemaTableName getSchemaTableName()
{
return new SchemaTableName(schemaName.orElseThrow(), tableName);
}

@Override
public boolean equals(Object o)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ public class TestJdbcOutputTableHandle
public void testJsonRoundTrip()
{
JdbcOutputTableHandle handleForCreate = new JdbcOutputTableHandle(
"catalog",
"schema",
"table",
new RemoteTableName(Optional.of("catalog"), Optional.of("schema"), "table"),
ImmutableList.of("abc", "xyz"),
ImmutableList.of(VARCHAR, VARCHAR),
Optional.empty(),
Expand All @@ -41,9 +39,7 @@ public void testJsonRoundTrip()
assertJsonRoundTrip(OUTPUT_TABLE_CODEC, handleForCreate);

JdbcOutputTableHandle handleForInsert = new JdbcOutputTableHandle(
"catalog",
"schema",
"table",
new RemoteTableName(Optional.of("catalog"), Optional.of("schema"), "table"),
ImmutableList.of("abc", "xyz"),
ImmutableList.of(VARCHAR, VARCHAR),
Optional.of(ImmutableList.of(JDBC_VARCHAR, JDBC_VARCHAR)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.trino.plugin.jdbc.LongWriteFunction;
import io.trino.plugin.jdbc.PreparedQuery;
import io.trino.plugin.jdbc.QueryBuilder;
import io.trino.plugin.jdbc.RemoteTableName;
import io.trino.plugin.jdbc.WriteFunction;
import io.trino.plugin.jdbc.WriteMapping;
import io.trino.plugin.jdbc.aggregation.ImplementAvgFloatingPoint;
Expand Down Expand Up @@ -415,8 +416,7 @@ public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, Connecto
execute(session, connection, sql);

return new IgniteOutputTableHandle(
schemaTableName.getSchemaName(),
schemaTableName.getTableName(),
new RemoteTableName(Optional.empty(), Optional.of(schemaTableName.getSchemaName()), schemaTableName.getTableName()),
columnNames,
columnTypes.build(),
Optional.empty(),
Expand Down Expand Up @@ -568,7 +568,7 @@ public String buildInsertSql(JdbcOutputTableHandle handle, List<WriteFunction> c
}
return format(
"INSERT INTO %s (%s) VALUES (%s)",
quoted(null, handle.getSchemaName(), handle.getTableName()),
quoted(handle.getRemoteTableName()),
columns,
params);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto

RemoteTableName remoteTableName = handle.asPlainTable().getRemoteTableName();
return new IgniteOutputTableHandle(
remoteTableName.getSchemaName().orElse(null),
remoteTableName.getTableName(),
remoteTableName,
columnNames.build(),
columnTypes.build(),
Optional.of(columnJdbcTypeHandles.build()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import io.trino.plugin.jdbc.JdbcOutputTableHandle;
import io.trino.plugin.jdbc.JdbcTypeHandle;
import io.trino.plugin.jdbc.RemoteTableName;
import io.trino.spi.type.Type;
import jakarta.annotation.Nullable;

import java.util.List;
import java.util.Optional;
Expand All @@ -32,14 +32,13 @@ public class IgniteOutputTableHandle

@JsonCreator
public IgniteOutputTableHandle(
@Nullable @JsonProperty("schemaName") String schemaName,
@JsonProperty("tableName") String tableName,
@JsonProperty("remoteTableName") RemoteTableName remoteTableName,
@JsonProperty("columnNames") List<String> columnNames,
@JsonProperty("columnTypes") List<Type> columnTypes,
@JsonProperty("jdbcColumnTypes") Optional<List<JdbcTypeHandle>> jdbcColumnTypes,
@JsonProperty("dummyIdColumn") Optional<String> dummyIdColumn)
{
super("", schemaName, tableName, columnNames, columnTypes, jdbcColumnTypes, Optional.empty(), Optional.empty());
super(remoteTableName, columnNames, columnTypes, jdbcColumnTypes, Optional.empty(), Optional.empty());
this.dummyIdColumn = requireNonNull(dummyIdColumn, "dummyIdColumn is null");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,13 +410,13 @@ public String buildInsertSql(JdbcOutputTableHandle handle, List<WriteFunction> c
if (outputHandle.rowkeyColumn().isPresent()) {
String nextId = format(
"NEXT VALUE FOR %s, ",
quoted(null, handle.getSchemaName(), handle.getTableName() + "_sequence"));
quoted(null, handle.getRemoteTableName().getSchemaName().orElse(null), handle.getRemoteTableName().getTableName() + "_sequence"));
params = nextId + params;
columns = outputHandle.rowkeyColumn().get() + ", " + columns;
}
return format(
"UPSERT INTO %s (%s) VALUES (%s)",
quoted(null, handle.getSchemaName(), handle.getTableName()),
quoted(handle.getRemoteTableName()),
columns,
params);
}
Expand Down Expand Up @@ -696,8 +696,7 @@ public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, Connecto
execute(session, sql);

return new PhoenixOutputTableHandle(
schema,
table,
new RemoteTableName(Optional.empty(), Optional.ofNullable(schema), table),
columnNames.build(),
columnTypes.build(),
Optional.empty(),
Expand Down
Loading