Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fault Tolerant Execution for PostgreSQL and MySQL connectors #14445

Merged
merged 4 commits into from
Oct 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ jobs:
run: |
set -x
# Fake identity to tag rebased commits with.
# These commits will be discarded, but the important part is that `git rebase` executes commands between all commits.
# These commits will be discarded, but the important part is that `git rebase` executes commands between all commits.
git config user.name "Compile all commits builder"
git config user.email "[email protected]"
# Show the entire PR branch and the base ref all the way to the fork point.
Expand Down Expand Up @@ -477,6 +477,8 @@ jobs:
- { modules: testing/trino-faulttolerant-tests, profile: test-fault-tolerant-hive-1 }
- { modules: testing/trino-faulttolerant-tests, profile: test-fault-tolerant-hive-2 }
- { modules: testing/trino-faulttolerant-tests, profile: test-fault-tolerant-iceberg }
- { modules: testing/trino-faulttolerant-tests, profile: test-fault-tolerant-postgresql }
- { modules: testing/trino-faulttolerant-tests, profile: test-fault-tolerant-mysql }
- { modules: testing/trino-tests }
EOF
./.github/bin/build-matrix-from-impacted.py -v -i gib-impacted.log -m .github/test-matrix.yaml -o matrix.json
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.trino.spi.Page;
import io.trino.spi.block.Block;
import io.trino.spi.connector.ConnectorMergeSink;
import io.trino.split.PageSinkId;
import io.trino.split.PageSinkManager;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.sql.planner.plan.TableWriterNode.MergeTarget;
Expand Down Expand Up @@ -59,7 +60,7 @@ public Operator createOperator(DriverContext driverContext)
{
checkState(!closed, "Factory is already closed");
OperatorContext context = driverContext.addOperatorContext(operatorId, planNodeId, MergeWriterOperator.class.getSimpleName());
ConnectorMergeSink mergeSink = pageSinkManager.createMergeSink(session, target.getMergeHandle().orElseThrow());
ConnectorMergeSink mergeSink = pageSinkManager.createMergeSink(session, target.getMergeHandle().orElseThrow(), PageSinkId.fromTaskId(driverContext.getTaskId()));
return new MergeWriterOperator(context, mergeSink, pagePreprocessor);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.trino.spi.block.RunLengthEncodedBlock;
import io.trino.spi.connector.ConnectorPageSink;
import io.trino.spi.type.Type;
import io.trino.split.PageSinkId;
import io.trino.split.PageSinkManager;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.sql.planner.plan.TableWriterNode;
Expand Down Expand Up @@ -112,22 +113,22 @@ public Operator createOperator(DriverContext driverContext)
OperatorContext context = driverContext.addOperatorContext(operatorId, planNodeId, TableWriterOperator.class.getSimpleName());
Operator statisticsAggregationOperator = statisticsAggregationOperatorFactory.createOperator(driverContext);
boolean statisticsCpuTimerEnabled = !(statisticsAggregationOperator instanceof DevNullOperator) && isStatisticsCpuTimerEnabled(session);
return new TableWriterOperator(context, createPageSink(), columnChannels, statisticsAggregationOperator, types, statisticsCpuTimerEnabled);
return new TableWriterOperator(context, createPageSink(driverContext), columnChannels, statisticsAggregationOperator, types, statisticsCpuTimerEnabled);
}

private ConnectorPageSink createPageSink()
private ConnectorPageSink createPageSink(DriverContext driverContext)
{
if (target instanceof CreateTarget) {
return pageSinkManager.createPageSink(session, ((CreateTarget) target).getHandle());
return pageSinkManager.createPageSink(session, ((CreateTarget) target).getHandle(), PageSinkId.fromTaskId(driverContext.getTaskId()));
}
if (target instanceof InsertTarget) {
return pageSinkManager.createPageSink(session, ((InsertTarget) target).getHandle());
return pageSinkManager.createPageSink(session, ((InsertTarget) target).getHandle(), PageSinkId.fromTaskId(driverContext.getTaskId()));
}
if (target instanceof TableWriterNode.RefreshMaterializedViewTarget) {
return pageSinkManager.createPageSink(session, ((TableWriterNode.RefreshMaterializedViewTarget) target).getInsertHandle());
return pageSinkManager.createPageSink(session, ((TableWriterNode.RefreshMaterializedViewTarget) target).getInsertHandle(), PageSinkId.fromTaskId(driverContext.getTaskId()));
}
if (target instanceof TableWriterNode.TableExecuteTarget) {
return pageSinkManager.createPageSink(session, ((TableWriterNode.TableExecuteTarget) target).getExecuteHandle());
return pageSinkManager.createPageSink(session, ((TableWriterNode.TableExecuteTarget) target).getExecuteHandle(), PageSinkId.fromTaskId(driverContext.getTaskId()));
}
throw new UnsupportedOperationException("Unhandled target type: " + target.getClass().getName());
}
Expand Down
47 changes: 47 additions & 0 deletions core/trino-main/src/main/java/io/trino/split/PageSinkId.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.split;

import io.trino.execution.TaskId;
import io.trino.spi.connector.ConnectorPageSinkId;

import static com.google.common.base.Preconditions.checkArgument;

public class PageSinkId
implements ConnectorPageSinkId
{
private final long id;

public static PageSinkId fromTaskId(TaskId taskId)
{
long stageId = taskId.getStageId().getId();
mwd410 marked this conversation as resolved.
Show resolved Hide resolved
long partitionId = taskId.getPartitionId();
checkArgument(partitionId == (partitionId & 0x00FFFFFF), "partitionId is out of allowable range");
long attemptId = taskId.getAttemptId();
checkArgument(attemptId == (attemptId & 0xFF), "attemptId is out of allowable range");
long id = (stageId << 32) + (partitionId << 8) + attemptId;
return new PageSinkId(id);
}

private PageSinkId(long id)
{
this.id = id;
}

@Override
public long getId()
{
return this.id;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.trino.metadata.TableHandle;
import io.trino.spi.connector.ConnectorMergeSink;
import io.trino.spi.connector.ConnectorPageSink;
import io.trino.spi.connector.ConnectorPageSinkId;
import io.trino.spi.connector.ConnectorPageSinkProvider;
import io.trino.spi.connector.ConnectorSession;

Expand All @@ -42,35 +43,35 @@ public PageSinkManager(CatalogServiceProvider<ConnectorPageSinkProvider> pageSin
}

@Override
public ConnectorPageSink createPageSink(Session session, OutputTableHandle tableHandle)
public ConnectorPageSink createPageSink(Session session, OutputTableHandle tableHandle, ConnectorPageSinkId pageSinkId)
{
ConnectorSession connectorSession = session.toConnectorSession(tableHandle.getCatalogHandle());
return providerFor(tableHandle.getCatalogHandle()).createPageSink(tableHandle.getTransactionHandle(), connectorSession, tableHandle.getConnectorHandle());
return providerFor(tableHandle.getCatalogHandle()).createPageSink(tableHandle.getTransactionHandle(), connectorSession, tableHandle.getConnectorHandle(), pageSinkId);
}

@Override
public ConnectorPageSink createPageSink(Session session, InsertTableHandle tableHandle)
public ConnectorPageSink createPageSink(Session session, InsertTableHandle tableHandle, ConnectorPageSinkId pageSinkId)
{
// assumes connectorId and catalog are the same
ConnectorSession connectorSession = session.toConnectorSession(tableHandle.getCatalogHandle());
return providerFor(tableHandle.getCatalogHandle()).createPageSink(tableHandle.getTransactionHandle(), connectorSession, tableHandle.getConnectorHandle());
return providerFor(tableHandle.getCatalogHandle()).createPageSink(tableHandle.getTransactionHandle(), connectorSession, tableHandle.getConnectorHandle(), pageSinkId);
}

@Override
public ConnectorPageSink createPageSink(Session session, TableExecuteHandle tableHandle)
public ConnectorPageSink createPageSink(Session session, TableExecuteHandle tableHandle, ConnectorPageSinkId pageSinkId)
{
// assumes connectorId and catalog are the same
ConnectorSession connectorSession = session.toConnectorSession(tableHandle.getCatalogHandle());
return providerFor(tableHandle.getCatalogHandle()).createPageSink(tableHandle.getTransactionHandle(), connectorSession, tableHandle.getConnectorHandle());
return providerFor(tableHandle.getCatalogHandle()).createPageSink(tableHandle.getTransactionHandle(), connectorSession, tableHandle.getConnectorHandle(), pageSinkId);
}

@Override
public ConnectorMergeSink createMergeSink(Session session, MergeHandle mergeHandle)
public ConnectorMergeSink createMergeSink(Session session, MergeHandle mergeHandle, ConnectorPageSinkId pageSinkId)
{
// assumes connectorId and catalog are the same
TableHandle tableHandle = mergeHandle.getTableHandle();
ConnectorSession connectorSession = session.toConnectorSession(tableHandle.getCatalogHandle());
return providerFor(tableHandle.getCatalogHandle()).createMergeSink(tableHandle.getTransaction(), connectorSession, mergeHandle.getConnectorMergeHandle());
return providerFor(tableHandle.getCatalogHandle()).createMergeSink(tableHandle.getTransaction(), connectorSession, mergeHandle.getConnectorMergeHandle(), pageSinkId);
}

private ConnectorPageSinkProvider providerFor(CatalogHandle catalogHandle)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,24 @@
import io.trino.metadata.TableExecuteHandle;
import io.trino.spi.connector.ConnectorMergeSink;
import io.trino.spi.connector.ConnectorPageSink;
import io.trino.spi.connector.ConnectorPageSinkId;

public interface PageSinkProvider
{
/*
* Used for CTAS
*/
ConnectorPageSink createPageSink(Session session, OutputTableHandle tableHandle);
ConnectorPageSink createPageSink(Session session, OutputTableHandle tableHandle, ConnectorPageSinkId pageSinkId);

/*
* Used to insert into an existing table
*/
ConnectorPageSink createPageSink(Session session, InsertTableHandle tableHandle);
ConnectorPageSink createPageSink(Session session, InsertTableHandle tableHandle, ConnectorPageSinkId pageSinkId);

ConnectorPageSink createPageSink(Session session, TableExecuteHandle tableHandle);
ConnectorPageSink createPageSink(Session session, TableExecuteHandle tableHandle, ConnectorPageSinkId pageSinkId);

/*
* Used to write the result of SQL MERGE to an existing table
*/
ConnectorMergeSink createMergeSink(Session session, MergeHandle mergeHandle);
ConnectorMergeSink createMergeSink(Session session, MergeHandle mergeHandle, ConnectorPageSinkId pageSinkId);
}
58 changes: 58 additions & 0 deletions core/trino-main/src/test/java/io/trino/split/TestPageSinkId.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.split;

import io.trino.execution.StageId;
import io.trino.execution.TaskId;
import io.trino.spi.QueryId;
import org.testng.annotations.Test;

import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.testng.Assert.assertEquals;

public class TestPageSinkId
{
private PageSinkId fromTaskId(int stageId, int partitionId, int attemptId)
{
return PageSinkId.fromTaskId(new TaskId(new StageId(new QueryId("query"), stageId), partitionId, attemptId));
}

@Test
public void testFromTaskId()
{
PageSinkId pageSinkId = fromTaskId(1, 2, 3);
long expected = (1L << 32) + (2L << 8) + 3L;
assertEquals(pageSinkId.getId(), expected);
}

@Test
public void testFromTaskIdChecks()
{
assertThatThrownBy(() -> {
fromTaskId(1, 1 << 24, 3);
}).hasMessageContaining("partitionId is out of allowable range");

assertThatThrownBy(() -> {
fromTaskId(1, -1, 3);
}).hasMessageContaining("partitionId is negative");

assertThatThrownBy(() -> {
fromTaskId(1, 2, 256);
}).hasMessageContaining("attemptId is out of allowable range");

assertThatThrownBy(() -> {
fromTaskId(1, 2, -1);
}).hasMessageContaining("attemptId is negative");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.spi.connector;

/**
* Represents an identifier for the associated {@link ConnectorPageSink}
* which will be unique within a particular query.
*/
public interface ConnectorPageSinkId
{
/**
* @return An 8-byte representation of this identifier
*/
long getId();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,41 @@

public interface ConnectorPageSinkProvider
{
@Deprecated // TODO(Issue #14705): Remove
ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorOutputTableHandle outputTableHandle);

default ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorOutputTableHandle outputTableHandle, ConnectorPageSinkId pageSinkId)
mwd410 marked this conversation as resolved.
Show resolved Hide resolved
{
return createPageSink(transactionHandle, session, outputTableHandle);
}

@Deprecated // TODO(Issue #14705): Remove
ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorInsertTableHandle insertTableHandle);

default ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorInsertTableHandle insertTableHandle, ConnectorPageSinkId pageSinkId)
{
return createPageSink(transactionHandle, session, insertTableHandle);
}

@Deprecated // TODO(Issue #14705): Remove
default ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle)
{
throw new IllegalArgumentException("createPageSink not supported for tableExecuteHandle");
}

default ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle, ConnectorPageSinkId pageSinkId)
{
return createPageSink(transactionHandle, session, tableExecuteHandle);
}

@Deprecated // TODO(Issue #14705): Remove
default ConnectorMergeSink createMergeSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorMergeTableHandle mergeHandle)
{
throw new TrinoException(NOT_SUPPORTED, "This connector does not support SQL MERGE operations");
}

default ConnectorMergeSink createMergeSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorMergeTableHandle mergeHandle, ConnectorPageSinkId pageSinkId)
{
return createMergeSink(transactionHandle, session, mergeHandle);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.trino.spi.connector.ConnectorMergeTableHandle;
import io.trino.spi.connector.ConnectorOutputTableHandle;
import io.trino.spi.connector.ConnectorPageSink;
import io.trino.spi.connector.ConnectorPageSinkId;
import io.trino.spi.connector.ConnectorPageSinkProvider;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableExecuteHandle;
Expand Down Expand Up @@ -49,6 +50,14 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
}
}

@Override
public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorOutputTableHandle outputTableHandle, ConnectorPageSinkId pageSinkId)
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
return new ClassLoaderSafeConnectorPageSink(delegate.createPageSink(transactionHandle, session, outputTableHandle, pageSinkId), classLoader);
}
}

@Override
public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorInsertTableHandle insertTableHandle)
{
Expand All @@ -57,6 +66,14 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
}
}

@Override
public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorInsertTableHandle insertTableHandle, ConnectorPageSinkId pageSinkId)
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
return new ClassLoaderSafeConnectorPageSink(delegate.createPageSink(transactionHandle, session, insertTableHandle, pageSinkId), classLoader);
}
}

@Override
public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle)
{
Expand All @@ -65,11 +82,27 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
}
}

@Override
public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle, ConnectorPageSinkId pageSinkId)
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
return new ClassLoaderSafeConnectorPageSink(delegate.createPageSink(transactionHandle, session, tableExecuteHandle, pageSinkId), classLoader);
}
}

@Override
public ConnectorMergeSink createMergeSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorMergeTableHandle mergeHandle)
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
return new ClassLoaderSafeConnectorMergeSink(delegate.createMergeSink(transactionHandle, session, mergeHandle), classLoader);
}
}

@Override
public ConnectorMergeSink createMergeSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorMergeTableHandle mergeHandle, ConnectorPageSinkId pageSinkId)
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
return new ClassLoaderSafeConnectorMergeSink(delegate.createMergeSink(transactionHandle, session, mergeHandle, pageSinkId), classLoader);
}
}
}
Loading