Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Add concurrent writes reconciliation for blind append INSERT in Delta Lake #18506

Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ Optional<TableExecuteHandle> getTableHandleForExecute(
/**
 * Finish insert query.
 *
 * @param sourceTableHandles handles of the tables scanned by the insert's source query;
 *        passed through to the connector (presumably for reconciling concurrent writes —
 *        see the PR title; confirm against connector usage)
 */
Optional<ConnectorOutputMetadata> finishInsert(Session session, InsertTableHandle tableHandle, List<TableHandle> sourceTableHandles, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics);

/**
* Returns true if materialized view refresh should be delegated to connector
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1153,11 +1153,15 @@ public boolean supportsMissingColumnsOnInsert(Session session, TableHandle table
}

@Override
public Optional<ConnectorOutputMetadata> finishInsert(Session session, InsertTableHandle tableHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
public Optional<ConnectorOutputMetadata> finishInsert(Session session, InsertTableHandle tableHandle, List<TableHandle> sourceTableHandles, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
{
CatalogHandle catalogHandle = tableHandle.getCatalogHandle();
ConnectorMetadata metadata = getMetadata(session, catalogHandle);
return metadata.finishInsert(session.toConnectorSession(catalogHandle), tableHandle.getConnectorHandle(), fragments, computedStatistics);
List<ConnectorTableHandle> sourceConnectorHandles = sourceTableHandles.stream()
.filter(handle -> handle.getCatalogHandle().equals(catalogHandle))
.map(TableHandle::getConnectorHandle)
findinpath marked this conversation as resolved.
Show resolved Hide resolved
.collect(toImmutableList());
return metadata.finishInsert(session.toConnectorSession(catalogHandle), tableHandle.getConnectorHandle(), sourceConnectorHandles, fragments, computedStatistics);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4154,8 +4154,8 @@ private static TableFinisher createTableFinisher(Session session, TableFinishNod
if (target instanceof CreateTarget) {
return metadata.finishCreateTable(session, ((CreateTarget) target).getHandle(), fragments, statistics);
}
if (target instanceof InsertTarget) {
return metadata.finishInsert(session, ((InsertTarget) target).getHandle(), fragments, statistics);
if (target instanceof InsertTarget insertTarget) {
return metadata.finishInsert(session, insertTarget.getHandle(), insertTarget.getSourceTableHandles(), fragments, statistics);
}
if (target instanceof TableWriterNode.RefreshMaterializedViewTarget refreshTarget) {
return metadata.finishRefreshMaterializedView(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.concurrent.atomic.AtomicInteger;

import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.trino.sql.planner.planprinter.PlanPrinter.textLogicalPlan;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -171,7 +172,7 @@ public PlanNode visitTableFinish(TableFinishNode node, RewriteContext<Optional<W
PlanNode child = node.getSource();

WriterTarget originalTarget = getWriterTarget(child);
WriterTarget newTarget = createWriterTarget(originalTarget);
WriterTarget newTarget = createWriterTarget(originalTarget, child);

child = context.rewrite(child, Optional.of(newTarget));

Expand Down Expand Up @@ -222,7 +223,7 @@ public WriterTarget getWriterTarget(PlanNode node)
throw new IllegalArgumentException("Invalid child for TableCommitNode: " + node.getClass().getSimpleName());
}

private WriterTarget createWriterTarget(WriterTarget target)
private WriterTarget createWriterTarget(WriterTarget target, PlanNode planNode)
{
// TODO: begin these operations in pre-execution step, not here
// TODO: we shouldn't need to store the schemaTableName in the handles, but there isn't a good way to pass this around with the current architecture
Expand All @@ -241,7 +242,8 @@ private WriterTarget createWriterTarget(WriterTarget target)
metadata.getTableName(session, insert.getHandle()).getSchemaTableName(),
target.supportsMultipleWritersPerPartition(metadata, session),
target.getMaxWriterTasks(metadata, session),
target.getWriterScalingOptions(metadata, session));
target.getWriterScalingOptions(metadata, session),
findSourceTableHandles(planNode));
}
if (target instanceof MergeTarget merge) {
MergeHandle mergeHandle = metadata.beginMerge(session, merge.getHandle());
Expand All @@ -267,6 +269,17 @@ private WriterTarget createWriterTarget(WriterTarget target)
throw new IllegalArgumentException("Unhandled target type: " + target.getClass().getSimpleName());
}

// Collects the table handle of every table scan reachable from the given plan subtree.
private static List<TableHandle> findSourceTableHandles(PlanNode startNode)
{
    List<PlanNode> tableScans = PlanNodeSearcher.searchFrom(startNode)
            .where(node -> node instanceof TableScanNode)
            .findAll();
    return tableScans.stream()
            .map(node -> ((TableScanNode) node).getTable())
            .collect(toImmutableList());
}

private Optional<TableHandle> findTableScanHandleForTableExecute(PlanNode startNode)
{
List<PlanNode> tableScanNodes = PlanNodeSearcher.searchFrom(startNode)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,20 +407,23 @@ public static class InsertTarget
private final boolean multipleWritersPerPartitionSupported;
private final OptionalInt maxWriterTasks;
private final WriterScalingOptions writerScalingOptions;
private final List<TableHandle> sourceTableHandles;

@JsonCreator
public InsertTarget(
@JsonProperty("handle") InsertTableHandle handle,
@JsonProperty("schemaTableName") SchemaTableName schemaTableName,
@JsonProperty("multipleWritersPerPartitionSupported") boolean multipleWritersPerPartitionSupported,
@JsonProperty("maxWriterTasks") OptionalInt maxWriterTasks,
@JsonProperty("writerScalingOptions") WriterScalingOptions writerScalingOptions)
@JsonProperty("writerScalingOptions") WriterScalingOptions writerScalingOptions,
@JsonProperty("sourceTableHandles") List<TableHandle> sourceTableHandles)
{
this.handle = requireNonNull(handle, "handle is null");
this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null");
this.multipleWritersPerPartitionSupported = multipleWritersPerPartitionSupported;
this.maxWriterTasks = requireNonNull(maxWriterTasks, "maxWriterTasks is null");
this.writerScalingOptions = requireNonNull(writerScalingOptions, "writerScalingOptions is null");
this.sourceTableHandles = ImmutableList.copyOf(sourceTableHandles);
}

@JsonProperty
Expand Down Expand Up @@ -470,6 +473,12 @@ public WriterScalingOptions getWriterScalingOptions(Metadata metadata, Session s
{
return writerScalingOptions;
}

@JsonProperty
public List<TableHandle> getSourceTableHandles()
{
// Handles of the tables scanned by the insert's source query; the list is
// defensively copied to an immutable list in the constructor.
return sourceTableHandles;
}
}

public static class RefreshMaterializedViewReference
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto
}

@Override
public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session, ConnectorInsertTableHandle insertHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session, ConnectorInsertTableHandle insertHandle, List<ConnectorTableHandle> sourceTableHandles, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
{
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,18 @@ public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,
}
}

// Tracing wrapper: records a "finishInsert" span around the delegate call.
@Override
public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session, ConnectorInsertTableHandle insertHandle, List<ConnectorTableHandle> sourceTableHandles, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
{
Span span = startSpan("finishInsert");
// Avoid the insertHandle.toString() cost unless the span is actually sampled/recorded
if (span.isRecording()) {
span.setAttribute(TrinoAttributes.HANDLE, insertHandle.toString());
}
// scopedSpan ends the span when the try block exits, including on exception
try (var ignored = scopedSpan(span)) {
return delegate.finishInsert(session, insertHandle, sourceTableHandles, fragments, computedStatistics);
}
}

@Override
public boolean delegateMaterializedViewRefreshToConnector(ConnectorSession session, SchemaTableName viewName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -696,14 +696,14 @@ public boolean supportsMissingColumnsOnInsert(Session session, TableHandle table
}

@Override
public Optional<ConnectorOutputMetadata> finishInsert(Session session, InsertTableHandle tableHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
public Optional<ConnectorOutputMetadata> finishInsert(Session session, InsertTableHandle tableHandle, List<TableHandle> sourceTableHandles, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
{
Span span = startSpan("finishInsert", tableHandle.getCatalogHandle().getCatalogName());
if (span.isRecording()) {
span.setAttribute(TrinoAttributes.TABLE, tableHandle.getConnectorHandle().toString());
}
try (var ignored = scopedSpan(span)) {
return delegate.finishInsert(session, tableHandle, fragments, computedStatistics);
return delegate.finishInsert(session, tableHandle, sourceTableHandles, fragments, computedStatistics);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ public boolean supportsMissingColumnsOnInsert(Session session, TableHandle table
}

@Override
public Optional<ConnectorOutputMetadata> finishInsert(Session session, InsertTableHandle tableHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
public Optional<ConnectorOutputMetadata> finishInsert(Session session, InsertTableHandle tableHandle, List<TableHandle> sourceTableHandles, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
{
throw new UnsupportedOperationException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -850,12 +850,29 @@ default boolean supportsMissingColumnsOnInsert()

/**
 * Finish insert query.
 * <p>
 * Connectors that implement {@code beginInsert} must override one of the
 * {@code finishInsert} variants; otherwise this default throws.
 *
 * @deprecated use {@link #finishInsert(ConnectorSession, ConnectorInsertTableHandle, List, Collection, Collection)}
 */
@Deprecated
default Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session, ConnectorInsertTableHandle insertHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
{
throw new TrinoException(GENERIC_INTERNAL_ERROR, "ConnectorMetadata beginInsert() is implemented without finishInsert()")
}

/**
 * Finish insert query.
 *
 * @param sourceTableHandles handles of the tables read by the insert's source query;
 *        connectors may inspect them (e.g. for concurrent-write reconciliation —
 *        presumed from context, confirm against connector implementations)
 */
default Optional<ConnectorOutputMetadata> finishInsert(
ConnectorSession session,
ConnectorInsertTableHandle insertHandle,
List<ConnectorTableHandle> sourceTableHandles,
Collection<Slice> fragments,
Collection<ComputedStatistics> computedStatistics)
{
// Delegate to deprecated SPI to not break existing connectors
return finishInsert(session, insertHandle, fragments, computedStatistics);
}

/**
* Returns true if materialized view refresh should be delegated to connector using {@link ConnectorMetadata#refreshMaterializedView}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.Storage.BlobTargetOption;
import com.google.cloud.storage.StorageException;
import io.airlift.slice.Slice;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoOutputFile;
import io.trino.memory.context.AggregatedMemoryContext;

import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.nio.file.FileAlreadyExistsException;

import static com.google.common.base.Preconditions.checkArgument;
Expand Down Expand Up @@ -80,6 +82,13 @@ public void createExclusive(Slice content, AggregatedMemoryContext memoryContext
catch (FileAlreadyExistsException e) {
throw e;
}
catch (StorageException e) {
// When the file corresponding to `location` already exists, the operation will fail with the exception message `412 Precondition Failed`
if (e.getCode() == HttpURLConnection.HTTP_PRECON_FAILED) {
throw new FileAlreadyExistsException(location.toString());
}
throw handleGcsException(e, "writing file", location);
}
catch (RuntimeException e) {
throw handleGcsException(e, "writing file", location);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,14 @@ public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,
}
}

// Runs the delegate's finishInsert with the plugin classloader installed as the thread
// context classloader, restoring the previous one when the try block exits.
@Override
public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session, ConnectorInsertTableHandle insertHandle, List<ConnectorTableHandle> sourceTableHandles, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
return delegate.finishInsert(session, insertHandle, sourceTableHandles, fragments, computedStatistics);
}
}

@Override
public boolean delegateMaterializedViewRefreshToConnector(ConnectorSession session, SchemaTableName viewName)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.deltalake;

import com.google.common.collect.ImmutableList;
import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;

import java.util.List;
import java.util.Optional;

import static java.util.Objects.requireNonNull;

public class DeltaLakeCommitSummary
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to update this class when supporting v2 checkpoint #19507? Just asking, no requested change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reading through https://github.com/delta-io/delta/blob/master/PROTOCOL.md#v2-spec makes me think that there are no expected changes in this class . The way that the add / remove files are organized is transparent to this class.

{
private final List<MetadataEntry> metadataUpdates;
private final Optional<ProtocolEntry> protocol;
private final Optional<Boolean> isBlindAppend;

public DeltaLakeCommitSummary(List<DeltaLakeTransactionLogEntry> transactionLogEntries)
{
requireNonNull(transactionLogEntries, "transactionLogEntries is null");
ImmutableList.Builder<MetadataEntry> metadataUpdatesBuilder = ImmutableList.builder();
Optional<ProtocolEntry> optionalProtocol = Optional.empty();
Optional<CommitInfoEntry> optionalCommitInfo = Optional.empty();

for (DeltaLakeTransactionLogEntry transactionLogEntry : transactionLogEntries) {
if (transactionLogEntry.getMetaData() != null) {
metadataUpdatesBuilder.add(transactionLogEntry.getMetaData());
}
else if (transactionLogEntry.getProtocol() != null) {
optionalProtocol = Optional.of(transactionLogEntry.getProtocol());
}
else if (transactionLogEntry.getCommitInfo() != null) {
optionalCommitInfo = Optional.of(transactionLogEntry.getCommitInfo());
}
}

metadataUpdates = metadataUpdatesBuilder.build();
protocol = optionalProtocol;
isBlindAppend = optionalCommitInfo.flatMap(CommitInfoEntry::isBlindAppend);
}

public List<MetadataEntry> getMetadataUpdates()
{
return metadataUpdates;
}

public Optional<ProtocolEntry> getProtocol()
{
return protocol;
}

public Optional<Boolean> getIsBlindAppend()
{
return isBlindAppend;
}
}
Loading
Loading