Skip to content

Commit

Permalink
Add support for Redshift fault tolerant execution
Browse files Browse the repository at this point in the history
  • Loading branch information
grantatspothero committed Apr 12, 2023
1 parent 98e3d47 commit c0adbdd
Show file tree
Hide file tree
Showing 8 changed files with 205 additions and 6 deletions.
26 changes: 26 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,7 @@ jobs:
- { modules: plugin/trino-redis }
- { modules: plugin/trino-redshift }
- { modules: plugin/trino-redshift, profile: cloud-tests }
- { modules: testing/trino-faulttolerant-tests, profile: test-fault-tolerant-redshift }
- { modules: plugin/trino-singlestore }
- { modules: plugin/trino-sqlserver }
- { modules: testing/trino-faulttolerant-tests, profile: default }
Expand Down Expand Up @@ -602,6 +603,7 @@ jobs:
&& ! (contains(matrix.modules, 'trino-iceberg') && contains(matrix.profile, 'cloud-tests'))
&& ! (contains(matrix.modules, 'trino-bigquery') && contains(matrix.profile, 'cloud-tests-arrow'))
&& ! (contains(matrix.modules, 'trino-redshift') && contains(matrix.profile, 'cloud-tests'))
&& ! (contains(matrix.modules, 'testing/trino-faulttolerant-tests') && contains(matrix.profile, 'test-fault-tolerant-redshift'))
run: $MAVEN test ${MAVEN_TEST} -pl ${{ matrix.modules }} ${{ matrix.profile != '' && format('-P {0}', matrix.profile) || '' }}
# Additional tests for selected modules
- name: Cloud Delta Lake Tests
Expand Down Expand Up @@ -707,6 +709,30 @@ jobs:
-Dtest.redshift.aws.region="${AWS_REGION}" \
-Dtest.redshift.aws.access-key="${AWS_ACCESS_KEY_ID}" \
-Dtest.redshift.aws.secret-key="${AWS_SECRET_ACCESS_KEY}"
- name: Fault Tolerant Redshift Tests
env:
AWS_REGION: ${{ vars.REDSHIFT_AWS_REGION }}
AWS_ACCESS_KEY_ID: ${{ secrets.REDSHIFT_AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.REDSHIFT_AWS_SECRET_ACCESS_KEY }}
REDSHIFT_SUBNET_GROUP_NAME: ${{ vars.REDSHIFT_SUBNET_GROUP_NAME }}
REDSHIFT_IAM_ROLES: ${{ vars.REDSHIFT_IAM_ROLES }}
REDSHIFT_VPC_SECURITY_GROUP_IDS: ${{ vars.REDSHIFT_VPC_SECURITY_GROUP_IDS }}
REDSHIFT_S3_TPCH_TABLES_ROOT: ${{ vars.REDSHIFT_S3_TPCH_TABLES_ROOT }}
if: >-
contains(matrix.modules, 'testing/trino-faulttolerant-tests') && contains(matrix.profile, 'test-fault-tolerant-redshift') &&
(env.AWS_ACCESS_KEY_ID != '' || env.REDSHIFT_SUBNET_GROUP_NAME != '')
run: |
source .github/bin/redshift/setup-aws-redshift.sh
$MAVEN test ${MAVEN_TEST} -pl 'testing/trino-faulttolerant-tests' ${{ format('-P {0}', matrix.profile) }} \
-Dtest.redshift.jdbc.user="${REDSHIFT_USER}" \
-Dtest.redshift.jdbc.password="${REDSHIFT_PASSWORD}" \
-Dtest.redshift.jdbc.endpoint="${REDSHIFT_ENDPOINT}:${REDSHIFT_PORT}/" \
-Dtest.redshift.s3.tpch.tables.root="${REDSHIFT_S3_TPCH_TABLES_ROOT}" \
-Dtest.redshift.iam.role="${REDSHIFT_IAM_ROLES}" \
-Dtest.redshift.aws.region="${AWS_REGION}" \
-Dtest.redshift.aws.access-key="${AWS_ACCESS_KEY_ID}" \
-Dtest.redshift.aws.secret-key="${AWS_SECRET_ACCESS_KEY}"
- name: Cleanup ephemeral Redshift Cluster
env:
AWS_REGION: ${{ vars.REDSHIFT_AWS_REGION }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ public RedshiftClient(
RemoteQueryModifier queryModifier,
RedshiftConfig redshiftConfig)
{
super("\"", connectionFactory, queryBuilder, config.getJdbcTypesMappedToVarchar(), identifierMapping, queryModifier, false);
super("\"", connectionFactory, queryBuilder, config.getJdbcTypesMappedToVarchar(), identifierMapping, queryModifier, true);
this.disableAutomaticFetchSize = redshiftConfig.isDisableAutomaticFetchSize();
this.legacyTypeMapping = redshiftConfig.isLegacyTypeMapping();
ConnectorExpressionRewriter<ParameterizedExpression> connectorExpressionRewriter = JdbcConnectorExpressionRewriterBuilder.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

import static io.airlift.testing.Closeables.closeAllSuppress;
import static io.trino.plugin.tpch.TpchMetadata.TINY_SCHEMA_NAME;
Expand Down Expand Up @@ -79,20 +80,43 @@ public static DistributedQueryRunner createRedshiftQueryRunner(
return createRedshiftQueryRunner(
createSession(),
extraProperties,
Map.of(),
connectorProperties,
tables);
tables,
queryRunner -> {});
}

public static DistributedQueryRunner createRedshiftQueryRunner(
Map<String, String> extraProperties,
Map<String, String> coordinatorProperties,
Map<String, String> connectorProperties,
Iterable<TpchTable<?>> tables,
Consumer<QueryRunner> additionalSetup)
throws Exception
{
return createRedshiftQueryRunner(
createSession(),
extraProperties,
coordinatorProperties,
connectorProperties,
tables,
additionalSetup);
}

public static DistributedQueryRunner createRedshiftQueryRunner(
Session session,
Map<String, String> extraProperties,
Map<String, String> coordinatorProperties,
Map<String, String> connectorProperties,
Iterable<TpchTable<?>> tables)
Iterable<TpchTable<?>> tables,
Consumer<QueryRunner> additionalSetup)
throws Exception
{
DistributedQueryRunner.Builder<?> builder = DistributedQueryRunner.builder(session);
extraProperties.forEach(builder::addExtraProperty);
DistributedQueryRunner runner = builder.build();
DistributedQueryRunner runner = DistributedQueryRunner.builder(session)
.setExtraProperties(extraProperties)
.setCoordinatorProperties(coordinatorProperties)
.setAdditionalSetup(additionalSetup)
.build();
try {
runner.installPlugin(new TpchPlugin());
runner.createCatalog(TPCH_CATALOG, "tpch", Map.of());
Expand Down
13 changes: 13 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,19 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-redshift</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-redshift</artifactId>
<type>test-jar</type>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-resource-group-managers</artifactId>
Expand Down
32 changes: 32 additions & 0 deletions testing/trino-faulttolerant-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,19 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-redshift</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-redshift</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-resource-group-managers</artifactId>
Expand Down Expand Up @@ -490,6 +503,7 @@
<exclude>**/io/trino/faulttolerant/delta/Test*.java</exclude>
<exclude>**/io/trino/faulttolerant/hive/Test*.java</exclude>
<exclude>**/io/trino/faulttolerant/iceberg/Test*.java</exclude>
<exclude>**/io/trino/faulttolerant/redshift/Test*.java</exclude>
</excludes>
</configuration>
</plugin>
Expand Down Expand Up @@ -553,5 +567,23 @@
</plugins>
</build>
</profile>
<profile>
<id>test-fault-tolerant-redshift</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<!-- Failure recovery tests spend most of the time waiting for a retry -->
<threadCount>4</threadCount>
<includes>
<include>**/io/trino/faulttolerant/redshift/Test*.java</include>
</includes>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.faulttolerant.redshift;

import com.google.common.collect.ImmutableMap;
import io.trino.operator.RetryPolicy;
import io.trino.plugin.exchange.filesystem.FileSystemExchangePlugin;
import io.trino.plugin.jdbc.BaseJdbcFailureRecoveryTest;
import io.trino.testing.QueryRunner;
import io.trino.tpch.TpchTable;

import java.util.List;
import java.util.Map;

import static io.trino.plugin.redshift.RedshiftQueryRunner.createRedshiftQueryRunner;

public abstract class BaseRedshiftFailureRecoveryTest
extends BaseJdbcFailureRecoveryTest
{
public BaseRedshiftFailureRecoveryTest(RetryPolicy retryPolicy)
{
super(retryPolicy);
}

@Override
protected QueryRunner createQueryRunner(
List<TpchTable<?>> requiredTpchTables,
Map<String, String> configProperties,
Map<String, String> coordinatorProperties)
throws Exception
{
return createRedshiftQueryRunner(
configProperties,
coordinatorProperties,
Map.of(),
requiredTpchTables,
runner -> {
runner.installPlugin(new FileSystemExchangePlugin());
runner.loadExchangeManager("filesystem", ImmutableMap.of(
"exchange.base-directories", System.getProperty("java.io.tmpdir") + "/trino-local-file-system-exchange-manager"));
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.faulttolerant.redshift;

import io.trino.operator.RetryPolicy;

public class TestRedshiftQueryFailureRecoveryTest
extends BaseRedshiftFailureRecoveryTest
{
public TestRedshiftQueryFailureRecoveryTest()
{
super(RetryPolicy.QUERY);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.faulttolerant.redshift;

import io.trino.operator.RetryPolicy;

public class TestRedshiftTaskFailureRecoveryTest
extends BaseRedshiftFailureRecoveryTest
{
public TestRedshiftTaskFailureRecoveryTest()
{
super(RetryPolicy.TASK);
}
}

0 comments on commit c0adbdd

Please sign in to comment.