diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 086fc01298ca..913ed2e81b63 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -740,6 +740,7 @@ jobs:
REDSHIFT_IAM_ROLES: ${{ vars.REDSHIFT_IAM_ROLES }}
REDSHIFT_VPC_SECURITY_GROUP_IDS: ${{ vars.REDSHIFT_VPC_SECURITY_GROUP_IDS }}
REDSHIFT_S3_TPCH_TABLES_ROOT: ${{ vars.REDSHIFT_S3_TPCH_TABLES_ROOT }}
+ REDSHIFT_S3_UNLOAD_ROOT: ${{ vars.REDSHIFT_S3_UNLOAD_ROOT }}
if: >-
contains(matrix.modules, 'trino-redshift') &&
(contains(matrix.profile, 'cloud-tests') || contains(matrix.profile, 'fte-tests')) &&
@@ -752,6 +753,7 @@ jobs:
-Dtest.redshift.jdbc.password="${REDSHIFT_PASSWORD}" \
-Dtest.redshift.jdbc.endpoint="${REDSHIFT_ENDPOINT}:${REDSHIFT_PORT}/" \
-Dtest.redshift.s3.tpch.tables.root="${REDSHIFT_S3_TPCH_TABLES_ROOT}" \
+ -Dtest.redshift.s3.unload.root="${REDSHIFT_S3_UNLOAD_ROOT}" \
-Dtest.redshift.iam.role="${REDSHIFT_IAM_ROLES}" \
-Dtest.redshift.aws.region="${AWS_REGION}" \
-Dtest.redshift.aws.access-key="${AWS_ACCESS_KEY_ID}" \
diff --git a/docs/src/main/sphinx/connector/redshift.md b/docs/src/main/sphinx/connector/redshift.md
index ba21b13134cd..659905f56ff5 100644
--- a/docs/src/main/sphinx/connector/redshift.md
+++ b/docs/src/main/sphinx/connector/redshift.md
@@ -64,6 +64,43 @@ documentation](https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-configura
```{include} jdbc-authentication.fragment
```
+### UNLOAD configuration
+
+This feature enables using Amazon S3 to efficiently transfer data out of Redshift
+instead of the default single-threaded, JDBC-based implementation.
+The connector automatically triggers the appropriate `UNLOAD` command
+on Redshift to extract the output from Redshift to the configured
+S3 bucket in the form of Parquet files. These Parquet files are read in parallel
+from S3 to reduce the latency of reading from Redshift tables. The Parquet
+files are removed when Trino finishes executing the query. It is recommended
+to define a custom lifecycle policy on the S3 bucket used for unloading the
+Redshift query results.
+This feature is supported only when the Redshift cluster and the configured S3
+bucket are in the same AWS region.
+
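+As an illustrative sketch, the command issued by the connector has the
+following shape; the query text, S3 path, and IAM role ARN are placeholders:
+
+```sql
+UNLOAD ('SELECT nationkey, name FROM nation')
+TO 's3://example-bucket/unload/<query-id>/<fragment-uuid>/'
+IAM_ROLE 'arn:aws:iam::123456789000:role/example_redshift_role'
+FORMAT PARQUET MAXFILESIZE 64MB MANIFEST VERBOSE
+```
+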
+The following table describes the configuration properties for using the
+`UNLOAD` command in the Redshift connector. `redshift.unload-location` must be
+set to use `UNLOAD`.
+
+:::{list-table} UNLOAD configuration properties
+:widths: 30, 60
+:header-rows: 1
+
+* - Property name
+ - Description
+* - `redshift.unload-location`
+  - A writeable location in Amazon S3 to be used for temporarily unloading
+    Redshift query results.
+* - `redshift.unload-iam-role`
+  - Optional. Fully specified ARN of the IAM role attached to the Redshift
+    cluster, to be used in the `UNLOAD` command. The IAM role must have access
+    to the Redshift cluster and write access to the S3 bucket. The default IAM
+    role attached to the Redshift cluster is used when this property is not
+    configured.
+:::
+
+Additionally, define the appropriate [S3 configuration](/object-storage/file-system-s3)
+properties, except `fs.native-s3.enabled`, which are required to read the
+Parquet files from the S3 bucket.
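+
+A minimal catalog file sketch with `UNLOAD` enabled might look like the
+following, where the endpoint, credentials, bucket, IAM role ARN, and region
+are placeholder values:
+
+```properties
+connector.name=redshift
+connection-url=jdbc:redshift://example.redshift.amazonaws.com:5439/database
+connection-user=admin
+connection-password=secret
+redshift.unload-location=s3://example-bucket/unload
+redshift.unload-iam-role=arn:aws:iam::123456789000:role/example_redshift_role
+s3.region=us-east-1
+```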
+
### Multiple Redshift databases or clusters
The Redshift connector can only access a single database within
diff --git a/plugin/trino-redshift/pom.xml b/plugin/trino-redshift/pom.xml
index e755eaa2ae92..8ebcc07a11a8 100644
--- a/plugin/trino-redshift/pom.xml
+++ b/plugin/trino-redshift/pom.xml
@@ -20,6 +20,11 @@
            <version>2.1.0.30</version>
        </dependency>

+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
@@ -30,21 +35,61 @@
            <artifactId>guice</artifactId>
        </dependency>

+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>bootstrap</artifactId>
+        </dependency>
+
        <dependency>
            <groupId>io.airlift</groupId>
            <artifactId>configuration</artifactId>
        </dependency>

+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>json</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>log</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>units</artifactId>
+        </dependency>
+
        <dependency>
            <groupId>io.trino</groupId>
            <artifactId>trino-base-jdbc</artifactId>
        </dependency>

+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-filesystem</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-filesystem-s3</artifactId>
+        </dependency>
+
        <dependency>
            <groupId>io.trino</groupId>
            <artifactId>trino-matching</artifactId>
        </dependency>

+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-memory-context</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-parquet</artifactId>
+        </dependency>
+
        <dependency>
            <groupId>io.trino</groupId>
            <artifactId>trino-plugin-toolkit</artifactId>
@@ -55,6 +100,16 @@
            <artifactId>jakarta.validation-api</artifactId>
        </dependency>

+        <dependency>
+            <groupId>joda-time</groupId>
+            <artifactId>joda-time</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.parquet</groupId>
+            <artifactId>parquet-column</artifactId>
+        </dependency>
+
        <dependency>
            <groupId>org.jdbi</groupId>
            <artifactId>jdbi3-core</artifactId>
@@ -116,19 +171,31 @@
        <dependency>
            <groupId>io.airlift</groupId>
-            <artifactId>log</artifactId>
+            <artifactId>log-manager</artifactId>
            <scope>runtime</scope>
        </dependency>

        <dependency>
-            <groupId>io.airlift</groupId>
-            <artifactId>log-manager</artifactId>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>auth</artifactId>
            <scope>runtime</scope>
        </dependency>

        <dependency>
-            <groupId>io.airlift</groupId>
-            <artifactId>units</artifactId>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>aws-core</artifactId>
+            <scope>runtime</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>regions</artifactId>
+            <scope>runtime</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>s3</artifactId>
            <scope>runtime</scope>
        </dependency>
@@ -236,9 +303,11 @@
                        <exclude>**/TestRedshiftAutomaticJoinPushdown.java</exclude>
                        <exclude>**/TestRedshiftCastPushdown.java</exclude>
                        <exclude>**/TestRedshiftConnectorTest.java</exclude>
+                        <exclude>**/TestRedshiftUnload.java</exclude>
                        <exclude>**/TestRedshiftConnectorSmokeTest.java</exclude>
                        <exclude>**/TestRedshiftTableStatisticsReader.java</exclude>
                        <exclude>**/TestRedshiftTypeMapping.java</exclude>
+                        <exclude>**/TestRedshiftUnloadTypeMapping.java</exclude>
                        <exclude>**/Test*FailureRecoveryTest.java</exclude>
                        <exclude>**/Test*FailureRecoverySmokeTest.java</exclude>
@@ -265,6 +334,8 @@
                        <exclude>**/TestRedshiftCastPushdown.java</exclude>
                        <exclude>**/TestRedshiftConnectorSmokeTest.java</exclude>
+                        <exclude>**/TestRedshiftUnloadTypeMapping.java</exclude>
+                        <exclude>**/TestRedshiftUnload.java</exclude>
diff --git a/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftClientModule.java b/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftClientModule.java
index 93962bd5a501..0c6e01999ab1 100644
--- a/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftClientModule.java
+++ b/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftClientModule.java
@@ -15,32 +15,44 @@
import com.amazon.redshift.Driver;
import com.google.inject.Binder;
+import com.google.inject.Key;
import com.google.inject.Provides;
import com.google.inject.Scopes;
import com.google.inject.Singleton;
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.opentelemetry.api.OpenTelemetry;
+import io.trino.filesystem.s3.S3FileSystemModule;
+import io.trino.plugin.base.metrics.FileFormatDataSourceStats;
import io.trino.plugin.jdbc.BaseJdbcConfig;
import io.trino.plugin.jdbc.ConnectionFactory;
import io.trino.plugin.jdbc.DecimalModule;
import io.trino.plugin.jdbc.DriverConnectionFactory;
import io.trino.plugin.jdbc.ForBaseJdbc;
+import io.trino.plugin.jdbc.ForJdbcDynamicFiltering;
import io.trino.plugin.jdbc.JdbcClient;
+import io.trino.plugin.jdbc.JdbcConnector;
import io.trino.plugin.jdbc.JdbcJoinPushdownSupportModule;
import io.trino.plugin.jdbc.JdbcMetadataConfig;
+import io.trino.plugin.jdbc.JdbcQueryEventListener;
import io.trino.plugin.jdbc.JdbcRecordSetProvider;
+import io.trino.plugin.jdbc.JdbcSplitManager;
import io.trino.plugin.jdbc.JdbcStatisticsConfig;
import io.trino.plugin.jdbc.RemoteQueryCancellationModule;
import io.trino.plugin.jdbc.credential.CredentialProvider;
import io.trino.plugin.jdbc.ptf.Query;
+import io.trino.spi.connector.Connector;
import io.trino.spi.connector.ConnectorRecordSetProvider;
+import io.trino.spi.connector.ConnectorSplitManager;
import io.trino.spi.function.table.ConnectorTableFunction;
import java.util.Properties;
import static com.google.inject.Scopes.SINGLETON;
import static com.google.inject.multibindings.Multibinder.newSetBinder;
+import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
+import static io.airlift.configuration.ConditionalModule.conditionalModule;
import static io.airlift.configuration.ConfigBinder.configBinder;
+import static io.trino.plugin.jdbc.JdbcModule.bindSessionPropertiesProvider;
public class RedshiftClientModule
extends AbstractConfigurationAwareModule
@@ -53,13 +65,28 @@ public void setup(Binder binder)
configBinder(binder).bindConfigDefaults(JdbcMetadataConfig.class, config -> config.setBulkListColumns(true));
newSetBinder(binder, ConnectorTableFunction.class).addBinding().toProvider(Query.class).in(SINGLETON);
configBinder(binder).bindConfig(JdbcStatisticsConfig.class);
+ bindSessionPropertiesProvider(binder, RedshiftSessionProperties.class);
install(new DecimalModule());
install(new JdbcJoinPushdownSupportModule());
install(new RemoteQueryCancellationModule());
binder.bind(ConnectorRecordSetProvider.class).to(JdbcRecordSetProvider.class).in(Scopes.SINGLETON);
- binder.bind(RedshiftConnector.class).in(Scopes.SINGLETON);
+ install(conditionalModule(
+ RedshiftConfig.class,
+ config -> config.getUnloadLocation().isPresent(),
+ unloadBinder -> {
+ install(new S3FileSystemModule());
+ unloadBinder.bind(JdbcSplitManager.class).in(Scopes.SINGLETON);
+ unloadBinder.bind(Connector.class).to(RedshiftUnloadConnector.class).in(Scopes.SINGLETON);
+ unloadBinder.bind(FileFormatDataSourceStats.class).in(Scopes.SINGLETON);
+
+ newSetBinder(unloadBinder, JdbcQueryEventListener.class).addBinding().to(RedshiftUnloadJdbcQueryEventListener.class).in(Scopes.SINGLETON);
+
+ newOptionalBinder(unloadBinder, Key.get(ConnectorSplitManager.class, ForJdbcDynamicFiltering.class))
+ .setBinding().to(RedshiftSplitManager.class).in(SINGLETON);
+ },
+ jdbcBinder -> jdbcBinder.bind(Connector.class).to(JdbcConnector.class).in(Scopes.SINGLETON)));
}
@Singleton
diff --git a/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftConfig.java b/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftConfig.java
index 370f31e7dc8b..0d8a193ff5f4 100644
--- a/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftConfig.java
+++ b/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftConfig.java
@@ -17,6 +17,7 @@
import io.airlift.configuration.ConfigDescription;
import io.airlift.configuration.DefunctConfig;
import jakarta.validation.constraints.Min;
+import jakarta.validation.constraints.Pattern;
import java.util.Optional;
@@ -27,6 +28,8 @@
public class RedshiftConfig
{
private Integer fetchSize;
+ private String unloadLocation;
+ private String unloadIamRole;
public Optional<@Min(0) Integer> getFetchSize()
{
@@ -40,4 +43,30 @@ public RedshiftConfig setFetchSize(Integer fetchSize)
this.fetchSize = fetchSize;
return this;
}
+
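+ // Accepts s3://<bucket> or s3://<bucket>/<path-segment> with no trailing slash,
+ // because the connector appends "/" plus the query ID when building the unload path.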
+ public Optional<@Pattern(regexp = "^s3://[^/]+(/[^/]+)?$", message = "Path shouldn't end with trailing slash") String> getUnloadLocation()
+ {
+ return Optional.ofNullable(unloadLocation);
+ }
+
+ @Config("redshift.unload-location")
+ @ConfigDescription("A writeable location in Amazon S3, to be used for unloading Redshift query results")
+ public RedshiftConfig setUnloadLocation(String unloadLocation)
+ {
+ this.unloadLocation = unloadLocation;
+ return this;
+ }
+
+ public Optional<String> getUnloadIamRole()
+ {
+ return Optional.ofNullable(unloadIamRole);
+ }
+
+ @Config("redshift.unload-iam-role")
+ @ConfigDescription("Fully specified ARN of the IAM Role attached to the Redshift cluster and having access to S3")
+ public RedshiftConfig setUnloadIamRole(String unloadIamRole)
+ {
+ this.unloadIamRole = unloadIamRole;
+ return this;
+ }
}
diff --git a/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftConnectorFactory.java b/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftConnectorFactory.java
index 5312ca010f68..6a2a5e2e563c 100644
--- a/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftConnectorFactory.java
+++ b/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftConnectorFactory.java
@@ -63,6 +63,6 @@ public Connector create(String catalogName, Map<String, String> requiredConfig,
.setRequiredConfigurationProperties(requiredConfig)
.initialize();
- return injector.getInstance(RedshiftConnector.class);
+ return injector.getInstance(Connector.class);
}
}
diff --git a/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftErrorCode.java b/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftErrorCode.java
index e09279270270..2d9fa999a2ac 100644
--- a/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftErrorCode.java
+++ b/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftErrorCode.java
@@ -23,6 +23,10 @@ public enum RedshiftErrorCode
implements ErrorCodeSupplier
{
REDSHIFT_INVALID_TYPE(0, EXTERNAL),
+ REDSHIFT_PARQUET_BAD_DATA(1, EXTERNAL),
+ REDSHIFT_PARQUET_CURSOR_ERROR(2, EXTERNAL),
+ REDSHIFT_FILESYSTEM_ERROR(3, EXTERNAL),
+ REDSHIFT_S3_CROSS_REGION_UNSUPPORTED(4, EXTERNAL),
/**/;
private final ErrorCode errorCode;
diff --git a/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftPageSourceProvider.java b/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftPageSourceProvider.java
index 60f22c656d29..ff14082a2878 100644
--- a/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftPageSourceProvider.java
+++ b/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftPageSourceProvider.java
@@ -13,6 +13,22 @@
*/
package io.trino.plugin.redshift;
+import com.google.common.collect.ImmutableList;
+import io.trino.filesystem.Location;
+import io.trino.filesystem.TrinoFileSystem;
+import io.trino.filesystem.TrinoFileSystemFactory;
+import io.trino.filesystem.TrinoInputFile;
+import io.trino.parquet.Column;
+import io.trino.parquet.ParquetReaderOptions;
+import io.trino.parquet.metadata.BlockMetadata;
+import io.trino.parquet.metadata.ParquetMetadata;
+import io.trino.parquet.reader.MetadataReader;
+import io.trino.parquet.reader.ParquetReader;
+import io.trino.parquet.reader.RowGroupInfo;
+import io.trino.plugin.base.metrics.FileFormatDataSourceStats;
+import io.trino.plugin.jdbc.JdbcColumnHandle;
+import io.trino.plugin.jdbc.JdbcSplit;
+import io.trino.spi.TrinoException;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.connector.ConnectorPageSourceProvider;
@@ -23,19 +39,38 @@
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.RecordPageSource;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.schema.MessageType;
+import org.joda.time.DateTimeZone;
+import java.io.IOException;
import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
+import static io.trino.parquet.ParquetTypeUtils.constructField;
+import static io.trino.parquet.ParquetTypeUtils.getColumnIO;
+import static io.trino.parquet.ParquetTypeUtils.getDescriptors;
+import static io.trino.parquet.ParquetTypeUtils.lookupColumnByName;
+import static io.trino.parquet.metadata.PrunedBlockMetadata.createPrunedColumnsMetadata;
+import static io.trino.plugin.redshift.RedshiftErrorCode.REDSHIFT_PARQUET_CURSOR_ERROR;
+import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
public class RedshiftPageSourceProvider
implements ConnectorPageSourceProvider
{
private final ConnectorRecordSetProvider recordSetProvider;
+ private final TrinoFileSystemFactory fileSystemFactory;
+ private final FileFormatDataSourceStats fileFormatDataSourceStats;
- public RedshiftPageSourceProvider(ConnectorRecordSetProvider recordSetProvider)
+ public RedshiftPageSourceProvider(ConnectorRecordSetProvider recordSetProvider, TrinoFileSystemFactory fileSystemFactory, FileFormatDataSourceStats fileFormatDataSourceStats)
{
this.recordSetProvider = requireNonNull(recordSetProvider, "recordSetProvider is null");
+ this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
+ this.fileFormatDataSourceStats = requireNonNull(fileFormatDataSourceStats, "fileFormatDataSourceStats is null");
}
@Override
@@ -47,7 +82,64 @@ public ConnectorPageSource createPageSource(
List<ColumnHandle> columns,
DynamicFilter dynamicFilter)
{
- // TODO switch between two modes of operation - JDBC and UNLOAD
- return new RecordPageSource(recordSetProvider.getRecordSet(transaction, session, split, table, columns));
+ if (split instanceof JdbcSplit) {
+ return new RecordPageSource(recordSetProvider.getRecordSet(transaction, session, split, table, columns));
+ }
+
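+ // Otherwise the split refers to a Parquet file produced by the UNLOAD command; read it through the S3 file system.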
+ RedshiftUnloadSplit redshiftUnloadSplit = ((RedshiftUnloadSplit) split);
+ String path = redshiftUnloadSplit.path();
+ Location location = Location.of(path);
+ TrinoFileSystem fileSystem = fileSystemFactory.create(session);
+ TrinoInputFile inputFile = fileSystem.newInputFile(location, redshiftUnloadSplit.length());
+ ParquetReader parquetReader;
+ try {
+ parquetReader = parquetReader(inputFile, columns);
+ }
+ catch (IOException e) {
+ throw new TrinoException(REDSHIFT_PARQUET_CURSOR_ERROR, format("Failed to open Parquet file: %s", path), e);
+ }
+ return new RedshiftParquetPageSource(parquetReader);
+ }
+
+ private ParquetReader parquetReader(TrinoInputFile inputFile, List<ColumnHandle> columns)
+ throws IOException
+ {
+ ParquetReaderOptions options = new ParquetReaderOptions();
+ TrinoParquetDataSource dataSource = new TrinoParquetDataSource(inputFile, options, fileFormatDataSourceStats);
+ ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty());
+ MessageType fileSchema = parquetMetadata.getFileMetaData().getSchema();
+ MessageColumnIO messageColumn = getColumnIO(fileSchema, fileSchema);
+ Map<List<String>, ColumnDescriptor> descriptorsByPath = getDescriptors(fileSchema, fileSchema);
+ DateTimeZone timeZone = DateTimeZone.UTC;
+ List<Column> fields = fields(columns, messageColumn);
+ long nextStart = 0;
+ ImmutableList.Builder<RowGroupInfo> rowGroupInfoBuilder = ImmutableList.builder();
+ for (BlockMetadata block : parquetMetadata.getBlocks()) {
+ rowGroupInfoBuilder.add(new RowGroupInfo(createPrunedColumnsMetadata(block, dataSource.getId(), descriptorsByPath), nextStart, Optional.empty()));
+ nextStart += block.rowCount();
+ }
+ return new ParquetReader(
+ Optional.ofNullable(parquetMetadata.getFileMetaData().getCreatedBy()),
+ fields,
+ rowGroupInfoBuilder.build(),
+ dataSource,
+ timeZone,
+ newSimpleAggregatedMemoryContext(),
+ options,
+ RedshiftParquetPageSource::handleException,
+ Optional.empty(),
+ Optional.empty());
+ }
+
+ private static List<Column> fields(List<ColumnHandle> columns, MessageColumnIO messageColumn)
+ {
+ ImmutableList.Builder<Column> parquetColumnFieldsBuilder = ImmutableList.builder();
+ for (ColumnHandle column : columns) {
+ JdbcColumnHandle jdbcColumn = (JdbcColumnHandle) column;
+ constructField(jdbcColumn.getColumnType(), lookupColumnByName(messageColumn, jdbcColumn.getColumnName()))
+ .ifPresent(field -> parquetColumnFieldsBuilder.add(new Column(jdbcColumn.getColumnName(), field)));
+ }
+
+ return parquetColumnFieldsBuilder.build();
}
}
diff --git a/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftParquetPageSource.java b/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftParquetPageSource.java
new file mode 100644
index 000000000000..3371f28c6e98
--- /dev/null
+++ b/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftParquetPageSource.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.redshift;
+
+import io.trino.parquet.ParquetCorruptionException;
+import io.trino.parquet.reader.ParquetReader;
+import io.trino.spi.Page;
+import io.trino.spi.TrinoException;
+import io.trino.spi.connector.ConnectorPageSource;
+import io.trino.spi.metrics.Metrics;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.OptionalLong;
+
+import static io.trino.plugin.base.util.Closables.closeAllSuppress;
+import static io.trino.plugin.redshift.RedshiftErrorCode.REDSHIFT_PARQUET_BAD_DATA;
+import static io.trino.plugin.redshift.RedshiftErrorCode.REDSHIFT_PARQUET_CURSOR_ERROR;
+import static java.util.Objects.requireNonNull;
+
+public class RedshiftParquetPageSource
+ implements ConnectorPageSource
+{
+ private final ParquetReader parquetReader;
+ private boolean closed;
+ private long completedPositions;
+
+ public RedshiftParquetPageSource(ParquetReader parquetReader)
+ {
+ this.parquetReader = requireNonNull(parquetReader, "parquetReader is null");
+ }
+
+ @Override
+ public long getCompletedBytes()
+ {
+ return parquetReader.getDataSource().getReadBytes();
+ }
+
+ @Override
+ public OptionalLong getCompletedPositions()
+ {
+ return OptionalLong.of(completedPositions);
+ }
+
+ @Override
+ public long getReadTimeNanos()
+ {
+ return parquetReader.getDataSource().getReadTimeNanos();
+ }
+
+ @Override
+ public boolean isFinished()
+ {
+ return closed;
+ }
+
+ @Override
+ public Page getNextPage()
+ {
+ Page page;
+ try {
+ page = parquetReader.nextPage();
+ }
+ catch (IOException | RuntimeException e) {
+ closeAllSuppress(e, this);
+ throw handleException(e);
+ }
+
+ if (closed || page == null) {
+ close();
+ return null;
+ }
+
+ completedPositions += page.getPositionCount();
+ return page;
+ }
+
+ @Override
+ public long getMemoryUsage()
+ {
+ return parquetReader.getMemoryContext().getBytes();
+ }
+
+ @Override
+ public void close()
+ {
+ if (closed) {
+ return;
+ }
+ closed = true;
+
+ try {
+ parquetReader.close();
+ }
+ catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ @Override
+ public Metrics getMetrics()
+ {
+ return parquetReader.getMetrics();
+ }
+
+ static TrinoException handleException(Exception exception)
+ {
+ if (exception instanceof TrinoException) {
+ return (TrinoException) exception;
+ }
+ if (exception instanceof ParquetCorruptionException) {
+ return new TrinoException(REDSHIFT_PARQUET_BAD_DATA, exception);
+ }
+ return new TrinoException(REDSHIFT_PARQUET_CURSOR_ERROR, exception.getMessage(), exception);
+ }
+}
diff --git a/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftSessionProperties.java b/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftSessionProperties.java
new file mode 100644
index 000000000000..48c7c75a2290
--- /dev/null
+++ b/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftSessionProperties.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.redshift;
+
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Inject;
+import io.trino.plugin.base.session.SessionPropertiesProvider;
+import io.trino.spi.TrinoException;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.session.PropertyMetadata;
+
+import java.util.List;
+
+import static io.trino.spi.StandardErrorCode.INVALID_SESSION_PROPERTY;
+import static io.trino.spi.session.PropertyMetadata.booleanProperty;
+
+public class RedshiftSessionProperties
+ implements SessionPropertiesProvider
+{
+ private static final String UNLOAD_ENABLED = "unload_enabled";
+
+ private final List<PropertyMetadata<?>> sessionProperties;
+
+ @Inject
+ public RedshiftSessionProperties(RedshiftConfig config)
+ {
+ sessionProperties = ImmutableList.<PropertyMetadata<?>>builder()
+ .add(booleanProperty(
+ UNLOAD_ENABLED,
+ "Use UNLOAD for reading query results",
+ config.getUnloadLocation().isPresent(),
+ value -> {
+ if (value && config.getUnloadLocation().isEmpty()) {
+ throw new TrinoException(INVALID_SESSION_PROPERTY, "Cannot use UNLOAD when unload location is not configured");
+ }
+ },
+ false))
+ .build();
+ }
+
+ @Override
+ public List<PropertyMetadata<?>> getSessionProperties()
+ {
+ return sessionProperties;
+ }
+
+ public static boolean isUnloadEnabled(ConnectorSession session)
+ {
+ return session.getProperty(UNLOAD_ENABLED, Boolean.class);
+ }
+}
diff --git a/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftSplitManager.java b/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftSplitManager.java
new file mode 100644
index 000000000000..5851bfb3c151
--- /dev/null
+++ b/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftSplitManager.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.redshift;
+
+import com.google.inject.Inject;
+import io.airlift.log.Logger;
+import io.trino.filesystem.TrinoFileSystemFactory;
+import io.trino.filesystem.s3.FileSystemS3;
+import io.trino.plugin.jdbc.ForRecordCursor;
+import io.trino.plugin.jdbc.JdbcClient;
+import io.trino.plugin.jdbc.JdbcColumnHandle;
+import io.trino.plugin.jdbc.JdbcProcedureHandle;
+import io.trino.plugin.jdbc.JdbcSplit;
+import io.trino.plugin.jdbc.JdbcSplitManager;
+import io.trino.plugin.jdbc.JdbcTableHandle;
+import io.trino.plugin.jdbc.QueryBuilder;
+import io.trino.plugin.jdbc.logging.RemoteQueryModifier;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorSplitManager;
+import io.trino.spi.connector.ConnectorSplitSource;
+import io.trino.spi.connector.ConnectorTableHandle;
+import io.trino.spi.connector.ConnectorTransactionHandle;
+import io.trino.spi.connector.Constraint;
+import io.trino.spi.connector.DynamicFilter;
+import io.trino.spi.connector.FixedSplitSource;
+import io.trino.spi.type.DecimalType;
+import io.trino.spi.type.TimeType;
+import io.trino.spi.type.VarbinaryType;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+
+import static io.trino.plugin.jdbc.JdbcDynamicFilteringSessionProperties.dynamicFilteringEnabled;
+import static io.trino.plugin.redshift.RedshiftSessionProperties.isUnloadEnabled;
+import static java.util.Objects.requireNonNull;
+
+public class RedshiftSplitManager
+ implements ConnectorSplitManager
+{
+ private static final Logger log = Logger.get(RedshiftSplitManager.class);
+
+ private final JdbcClient jdbcClient;
+ private final QueryBuilder queryBuilder;
+ private final RemoteQueryModifier queryModifier;
+ private final JdbcSplitManager jdbcSplitManager;
+ private final Optional<String> unloadLocation;
+ private final Optional<String> unloadAuthorization;
+ private final ExecutorService executor;
+ private final TrinoFileSystemFactory fileSystemFactory;
+
+ @Inject
+ public RedshiftSplitManager(
+ JdbcClient jdbcClient,
+ QueryBuilder queryBuilder,
+ RemoteQueryModifier queryModifier,
+ JdbcSplitManager jdbcSplitManager,
+ RedshiftConfig redshiftConfig,
+ @FileSystemS3 TrinoFileSystemFactory fileSystemFactory,
+ @ForRecordCursor ExecutorService executor)
+ {
+ this.jdbcClient = requireNonNull(jdbcClient, "jdbcClient is null");
+ this.queryBuilder = requireNonNull(queryBuilder, "queryBuilder is null");
+ this.queryModifier = requireNonNull(queryModifier, "queryModifier is null");
+ this.jdbcSplitManager = requireNonNull(jdbcSplitManager, "jdbcSplitManager is null");
+ this.unloadLocation = redshiftConfig.getUnloadLocation();
+ this.unloadAuthorization = redshiftConfig.getUnloadIamRole();
+ this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
+ this.executor = requireNonNull(executor, "executor is null");
+ }
+
+ @Override
+ public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableHandle table, DynamicFilter dynamicFilter, Constraint constraint)
+ {
+ if (table instanceof JdbcProcedureHandle) {
+ return jdbcSplitManager.getSplits(transaction, session, table, dynamicFilter, constraint);
+ }
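+ // A single empty JDBC split routes the scan through the row-based JDBC read path in RedshiftPageSourceProvider.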
+ ConnectorSplitSource fallbackSplitSource = new FixedSplitSource(new JdbcSplit(Optional.empty()));
+ if (!isUnloadEnabled(session)) {
+ return fallbackSplitSource;
+ }
+ JdbcTableHandle jdbcTable = (JdbcTableHandle) table;
+ JdbcTableHandle jdbcTableHandle = dynamicFilteringEnabled(session) ? jdbcTable.intersectedWithConstraint(dynamicFilter.getCurrentPredicate()) : jdbcTable;
+ List<JdbcColumnHandle> columns = jdbcTableHandle.getColumns()
+ .orElseGet(() -> jdbcClient.getColumns(
+ session,
+ jdbcTableHandle.getRequiredNamedRelation().getSchemaTableName(),
+ jdbcTableHandle.getRequiredNamedRelation().getRemoteTableName()));
+
+ if (!isUnloadSupported(jdbcTable, columns)) {
+ log.debug("Unsupported query shape detected. Falling back to using JDBC");
+ return fallbackSplitSource;
+ }
+ return new RedshiftUnloadSplitSource(
+ executor,
+ session,
+ jdbcClient,
+ jdbcTableHandle,
+ columns,
+ queryBuilder,
+ queryModifier,
+ unloadLocation.orElseThrow(),
+ unloadAuthorization,
+ fileSystemFactory.create(session));
+ }
+
+ private static boolean isUnloadSupported(JdbcTableHandle table, List<JdbcColumnHandle> columns)
+ {
+ // Nothing to unload as there are no columns to be fetched from Redshift
+ if (table.getColumns().isPresent() && table.getColumns().get().isEmpty()) {
+ return false;
+ }
+ if (containsUnsupportedType(columns)) {
+ return false;
+ }
+ // The UNLOAD command doesn't support a LIMIT clause. However, Trino can implement a workaround of wrapping the limited query as an inner query. See https://github.com/trinodb/trino/issues/24480
+ if (table.getLimit().isPresent()) {
+ return false;
+ }
+ if (containsFilterConditionOnDecimalTypeColumn(table)) {
+ return false;
+ }
+ return true;
+ }
+
+ // Data types not supported by the UNLOAD command when using the Parquet output file format
+ private static boolean containsUnsupportedType(List<JdbcColumnHandle> columns)
+ {
+ // ERROR: UNLOAD varbyte column "col_0" is only supported for TEXT/CSV.
+ // ERROR: UNLOAD time without time zone column "value" is only supported for TEXT/CSV.
+ return columns.stream().anyMatch(column -> column.getColumnType() instanceof TimeType || column.getColumnType() instanceof VarbinaryType);
+ }
+
+ // The Redshift driver generates an incorrect cast precision in the select query for filter conditions on decimal columns. See https://github.com/aws/amazon-redshift-jdbc-driver/issues/129
+ private static boolean containsFilterConditionOnDecimalTypeColumn(JdbcTableHandle table)
+ {
+ if (table.getConstraint().getDomains()
+ .map(domains -> domains.keySet().stream().anyMatch(column -> ((JdbcColumnHandle) column).getColumnType() instanceof DecimalType))
+ .orElse(false)) {
+ return true;
+ }
+ return table.getConstraintExpressions().stream()
+ .flatMap(expression -> expression.parameters().stream())
+ .anyMatch(parameter -> parameter.getType() instanceof DecimalType);
+ }
+}
diff --git a/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftConnector.java b/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftUnloadConnector.java
similarity index 93%
rename from plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftConnector.java
rename to plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftUnloadConnector.java
index e1f98eed5703..c3a17e9642e5 100644
--- a/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftConnector.java
+++ b/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftUnloadConnector.java
@@ -16,7 +16,10 @@
import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;
import io.airlift.bootstrap.LifeCycleManager;
+import io.trino.filesystem.TrinoFileSystemFactory;
+import io.trino.filesystem.s3.FileSystemS3;
import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorMetadata;
+import io.trino.plugin.base.metrics.FileFormatDataSourceStats;
import io.trino.plugin.base.session.SessionPropertiesProvider;
import io.trino.plugin.jdbc.JdbcTransactionManager;
import io.trino.plugin.jdbc.TablePropertiesProvider;
@@ -44,7 +47,7 @@
import static io.trino.spi.connector.ConnectorCapabilities.NOT_NULL_COLUMN_CONSTRAINT;
import static java.util.Objects.requireNonNull;
-public class RedshiftConnector
+public class RedshiftUnloadConnector
implements Connector
{
private final LifeCycleManager lifeCycleManager;
@@ -59,7 +62,7 @@ public class RedshiftConnector
private final RedshiftPageSourceProvider pageSourceProvider;
@Inject
- public RedshiftConnector(
+ public RedshiftUnloadConnector(
LifeCycleManager lifeCycleManager,
ConnectorSplitManager jdbcSplitManager,
ConnectorRecordSetProvider jdbcRecordSetProvider,
@@ -69,7 +72,9 @@ public RedshiftConnector(
Set<ConnectorTableFunction> connectorTableFunctions,
Set<SessionPropertiesProvider> sessionProperties,
Set<TablePropertiesProvider> tableProperties,
- JdbcTransactionManager transactionManager)
+ JdbcTransactionManager transactionManager,
+ @FileSystemS3 TrinoFileSystemFactory fileSystemFactory,
+ FileFormatDataSourceStats fileFormatDataSourceStats)
{
this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
this.jdbcSplitManager = requireNonNull(jdbcSplitManager, "jdbcSplitManager is null");
@@ -84,7 +89,7 @@ public RedshiftConnector(
.flatMap(tablePropertiesProvider -> tablePropertiesProvider.getTableProperties().stream())
.collect(toImmutableList());
this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
- this.pageSourceProvider = new RedshiftPageSourceProvider(jdbcRecordSetProvider);
+ this.pageSourceProvider = new RedshiftPageSourceProvider(jdbcRecordSetProvider, fileSystemFactory, fileFormatDataSourceStats);
}
@Override
diff --git a/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftUnloadJdbcQueryEventListener.java b/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftUnloadJdbcQueryEventListener.java
new file mode 100644
index 000000000000..5b3055a3db17
--- /dev/null
+++ b/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftUnloadJdbcQueryEventListener.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.redshift;
+
+import com.google.inject.Inject;
+import io.trino.filesystem.Location;
+import io.trino.filesystem.TrinoFileSystemFactory;
+import io.trino.filesystem.s3.FileSystemS3;
+import io.trino.plugin.jdbc.JdbcQueryEventListener;
+import io.trino.spi.connector.ConnectorSession;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+
+import static io.trino.plugin.redshift.RedshiftSessionProperties.isUnloadEnabled;
+import static java.util.Objects.requireNonNull;
+
+public class RedshiftUnloadJdbcQueryEventListener
+ implements JdbcQueryEventListener
+{
+ private final TrinoFileSystemFactory fileSystemFactory;
+ private final String unloadLocation;
+
+ @Inject
+ public RedshiftUnloadJdbcQueryEventListener(@FileSystemS3 TrinoFileSystemFactory fileSystemFactory, RedshiftConfig redshiftConfig)
+ {
+ this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
+ this.unloadLocation = redshiftConfig.getUnloadLocation().orElseThrow();
+ }
+
+ @Override
+ public void beginQuery(ConnectorSession session) {}
+
+ @Override
+ public void cleanupQuery(ConnectorSession session)
+ {
+ if (isUnloadEnabled(session)) {
+ try {
+ fileSystemFactory.create(session).deleteDirectory(Location.of(unloadLocation + "/" + session.getQueryId()));
+ }
+ catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+ }
+}
diff --git a/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftUnloadSplit.java b/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftUnloadSplit.java
new file mode 100644
index 000000000000..049c6fc69b58
--- /dev/null
+++ b/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftUnloadSplit.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.redshift;
+
+import com.google.common.collect.ImmutableMap;
+import io.trino.spi.connector.ConnectorSplit;
+
+import java.util.Map;
+
+import static io.airlift.slice.SizeOf.estimatedSizeOf;
+import static io.airlift.slice.SizeOf.instanceSize;
+import static io.airlift.slice.SizeOf.sizeOf;
+import static java.util.Objects.requireNonNull;
+
+public record RedshiftUnloadSplit(String path, long length)
+ implements ConnectorSplit
+{
+ private static final int INSTANCE_SIZE = instanceSize(RedshiftUnloadSplit.class);
+
+ public RedshiftUnloadSplit
+ {
+ requireNonNull(path, "path is null");
+ }
+
+ @Override
+ public Map<String, String> getSplitInfo()
+ {
+ return ImmutableMap.of("path", path);
+ }
+
+ @Override
+ public long getRetainedSizeInBytes()
+ {
+ return INSTANCE_SIZE + estimatedSizeOf(path) + sizeOf(length);
+ }
+}
diff --git a/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftUnloadSplitSource.java b/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftUnloadSplitSource.java
new file mode 100644
index 000000000000..5acb5dd6bd21
--- /dev/null
+++ b/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftUnloadSplitSource.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.redshift;
+
+import com.amazon.redshift.jdbc.RedshiftPreparedStatement;
+import com.amazon.redshift.util.RedshiftException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import io.airlift.json.ObjectMapperProvider;
+import io.airlift.log.Logger;
+import io.trino.filesystem.Location;
+import io.trino.filesystem.TrinoFileSystem;
+import io.trino.filesystem.TrinoInputFile;
+import io.trino.filesystem.TrinoInputStream;
+import io.trino.plugin.jdbc.JdbcClient;
+import io.trino.plugin.jdbc.JdbcColumnHandle;
+import io.trino.plugin.jdbc.JdbcTableHandle;
+import io.trino.plugin.jdbc.PreparedQuery;
+import io.trino.plugin.jdbc.QueryBuilder;
+import io.trino.plugin.jdbc.logging.RemoteQueryModifier;
+import io.trino.spi.TrinoException;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorSplit;
+import io.trino.spi.connector.ConnectorSplitSource;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.airlift.units.Duration.nanosSince;
+import static io.trino.plugin.redshift.RedshiftErrorCode.REDSHIFT_FILESYSTEM_ERROR;
+import static io.trino.plugin.redshift.RedshiftErrorCode.REDSHIFT_S3_CROSS_REGION_UNSUPPORTED;
+import static java.util.Objects.requireNonNull;
+
+public class RedshiftUnloadSplitSource
+ implements ConnectorSplitSource
+{
+ private static final Logger log = Logger.get(RedshiftUnloadSplitSource.class);
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapperProvider().get();
+
+ private final JdbcClient jdbcClient;
+ private final QueryBuilder queryBuilder;
+ private final RemoteQueryModifier queryModifier;
+ private final Optional<String> unloadAuthorization;
+ private final String unloadOutputPath;
+ private final TrinoFileSystem fileSystem;
+ private final CompletableFuture<Void> resultSetFuture;
+
+ private boolean finished;
+
+ public RedshiftUnloadSplitSource(
+ ExecutorService executor,
+ ConnectorSession session,
+ JdbcClient jdbcClient,
+ JdbcTableHandle jdbcTableHandle,
+ List<JdbcColumnHandle> columns,
+ QueryBuilder queryBuilder,
+ RemoteQueryModifier queryModifier,
+ String unloadLocation,
+ Optional<String> unloadAuthorization,
+ TrinoFileSystem fileSystem)
+ {
+ requireNonNull(executor, "executor is null");
+ requireNonNull(session, "session is null");
+ this.jdbcClient = requireNonNull(jdbcClient, "jdbcClient is null");
+ requireNonNull(jdbcTableHandle, "jdbcTableHandle is null");
+ requireNonNull(columns, "columns is null");
+ this.queryBuilder = requireNonNull(queryBuilder, "queryBuilder is null");
+ this.queryModifier = requireNonNull(queryModifier, "queryModifier is null");
+ this.unloadAuthorization = requireNonNull(unloadAuthorization, "unloadAuthorization is null");
+ this.fileSystem = requireNonNull(fileSystem, "fileSystem is null");
+
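+ // Unload into <unload-location>/<queryId>/<random-UUID>/ so that RedshiftUnloadJdbcQueryEventListener
+ // can delete the whole <unload-location>/<queryId> directory when the query finishes.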
+ String queryFragmentId = session.getQueryId() + "/" + UUID.randomUUID();
+ this.unloadOutputPath = unloadLocation + "/" + queryFragmentId + "/";
+
+ resultSetFuture = CompletableFuture.runAsync(() -> {
+ try (Connection connection = jdbcClient.getConnection(session)) {
+ String redshiftSelectSql = buildRedshiftSelectSql(session, connection, jdbcTableHandle, columns);
+ try (PreparedStatement statement = buildRedshiftUnloadSql(session, connection, columns, redshiftSelectSql, unloadOutputPath)) {
+ // Explicitly set readOnly to false to avoid the query failing with "ERROR: transaction is read-only".
+ connection.setReadOnly(false);
+ log.debug("Executing: %s", statement);
+ long start = System.nanoTime();
+ statement.execute(); // The return value of `statement.execute()` is not useful for determining whether the UNLOAD command produced any results, as it always returns false.
+ log.info("Redshift UNLOAD command for %s query took %s", queryFragmentId, nanosSince(start));
+ }
+ }
+ catch (SQLException e) {
+ if (e instanceof RedshiftException && e.getMessage() != null && e.getMessage().contains("The S3 bucket addressed by the query is in a different region from this cluster")) {
+ throw new TrinoException(REDSHIFT_S3_CROSS_REGION_UNSUPPORTED, "Redshift cluster and S3 bucket in different regions is not supported", e);
+ }
+ throw new RuntimeException(e);
+ }
+ }, executor);
+ }
+
+ @Override
+ public CompletableFuture<ConnectorSplitBatch> getNextBatch(int maxSize)
+ {
+ return resultSetFuture
+ .thenApply(_ -> {
+ ConnectorSplitBatch connectorSplitBatch = new ConnectorSplitBatch(readUnloadedFilePaths().stream()
+ .map(fileInfo -> (ConnectorSplit) new RedshiftUnloadSplit(fileInfo.path, fileInfo.size))
+ .collect(toImmutableList()), true);
+ finished = true;
+ return connectorSplitBatch;
+ });
+ }
+
+ @Override
+ public void close()
+ {
+ resultSetFuture.cancel(true);
+ }
+
+ @Override
+ public boolean isFinished()
+ {
+ return finished;
+ }
+
+ private String buildRedshiftSelectSql(ConnectorSession session, Connection connection, JdbcTableHandle table, List<JdbcColumnHandle> columns)
+ throws SQLException
+ {
+ PreparedQuery preparedQuery = jdbcClient.prepareQuery(session, table, Optional.empty(), columns, ImmutableMap.of());
+ String selectQuerySql;
+ try (PreparedStatement openTelemetryPreparedStatement = queryBuilder.prepareStatement(jdbcClient, session, connection, preparedQuery, Optional.of(columns.size()))) {
+ RedshiftPreparedStatement redshiftPreparedStatement = openTelemetryPreparedStatement.unwrap(RedshiftPreparedStatement.class);
+ selectQuerySql = redshiftPreparedStatement.toString();
+ }
+ return queryModifier.apply(session, selectQuerySql);
+ }
+
+ private PreparedStatement buildRedshiftUnloadSql(ConnectorSession session, Connection connection, List<JdbcColumnHandle> columns, String redshiftSelectSql, String unloadOutputPath)
+ throws SQLException
+ {
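+ // MAXFILESIZE 64MB splits the output into multiple Parquet files that Trino can read in parallel.
+ // MANIFEST VERBOSE writes a manifest JSON listing each unloaded file with its content length,
+ // which readUnloadedFilePaths uses to build splits without listing S3.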
+ String unloadSql = "UNLOAD ('%s') TO '%s' IAM_ROLE %s FORMAT PARQUET MAXFILESIZE 64MB MANIFEST VERBOSE".formatted(
+ escapeUnloadIllegalCharacters(redshiftSelectSql),
+ unloadOutputPath,
+ unloadAuthorization.map("'%s'"::formatted).orElse("DEFAULT"));
+ return queryBuilder.prepareStatement(jdbcClient, session, connection, new PreparedQuery(unloadSql, List.of()), Optional.of(columns.size()));
+ }
+
+ private List<FileInfo> readUnloadedFilePaths()
+ {
+ Location manifestLocation = Location.of(unloadOutputPath + "manifest");
+ TrinoInputFile inputFile = fileSystem.newInputFile(manifestLocation);
+ JsonNode outputFileEntries;
+ try (TrinoInputStream inputStream = inputFile.newStream()) {
+ byte[] manifestContent = inputStream.readAllBytes();
+ outputFileEntries = OBJECT_MAPPER.readTree(manifestContent).path("entries");
+ }
+ // The manifest is not generated if the unload query doesn't produce any results.
+ // Rely on catching `FileNotFoundException`, as opposed to calling `TrinoInputFile#exists`, to determine the absence of the manifest file, as `TrinoInputFile#exists` adds an additional call to S3.
+ catch (FileNotFoundException e) {
+ return ImmutableList.of();
+ }
+ catch (IOException e) {
+ throw new TrinoException(REDSHIFT_FILESYSTEM_ERROR, e);
+ }
+ ImmutableList.Builder<FileInfo> unloadedFilePaths = ImmutableList.builder();
+ outputFileEntries.elements()
+ .forEachRemaining(fileInfo -> unloadedFilePaths.add(new FileInfo(fileInfo.get("url").asText(), fileInfo.get("meta").get("content_length").longValue())));
+ return unloadedFilePaths.build();
+ }
+
+ private static String escapeUnloadIllegalCharacters(String value)
+ {
+ return value
+ .replace("'", "''") // escape single quotes with single quotes
+ .replace("\\b", "\\\\b"); // escape backspace with backslash
+ }
+
+ private record FileInfo(String path, long size) {}
+}
diff --git a/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/TrinoParquetDataSource.java b/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/TrinoParquetDataSource.java
new file mode 100644
index 000000000000..57d5a1f7fc9d
--- /dev/null
+++ b/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/TrinoParquetDataSource.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.redshift;
+
+import io.airlift.slice.Slice;
+import io.trino.filesystem.TrinoInput;
+import io.trino.filesystem.TrinoInputFile;
+import io.trino.parquet.AbstractParquetDataSource;
+import io.trino.parquet.ParquetDataSourceId;
+import io.trino.parquet.ParquetReaderOptions;
+import io.trino.plugin.base.metrics.FileFormatDataSourceStats;
+
+import java.io.IOException;
+
+import static java.util.Objects.requireNonNull;
+
+// Copied as-is from io.trino.plugin.hive.parquet.TrinoParquetDataSource
+public class TrinoParquetDataSource
+ extends AbstractParquetDataSource
+{
+ private final FileFormatDataSourceStats stats;
+ private final TrinoInput input;
+
+ public TrinoParquetDataSource(TrinoInputFile file, ParquetReaderOptions options, FileFormatDataSourceStats stats)
+ throws IOException
+ {
+ super(new ParquetDataSourceId(file.location().toString()), file.length(), options);
+ this.stats = requireNonNull(stats, "stats is null");
+ this.input = file.newInput();
+ }
+
+ @Override
+ public void close()
+ throws IOException
+ {
+ input.close();
+ }
+
+ @Override
+ protected Slice readTailInternal(int length)
+ throws IOException
+ {
+ long readStart = System.nanoTime();
+ Slice tail = input.readTail(length);
+ stats.readDataBytesPerSecond(tail.length(), System.nanoTime() - readStart);
+ return tail;
+ }
+
+ @Override
+ protected void readInternal(long position, byte[] buffer, int bufferOffset, int bufferLength)
+ throws IOException
+ {
+ long readStart = System.nanoTime();
+ input.readFully(position, buffer, bufferOffset, bufferLength);
+ stats.readDataBytesPerSecond(bufferLength, System.nanoTime() - readStart);
+ }
+}
diff --git a/plugin/trino-redshift/src/test/java/io/trino/plugin/redshift/RedshiftQueryRunner.java b/plugin/trino-redshift/src/test/java/io/trino/plugin/redshift/RedshiftQueryRunner.java
index d1d806f0a2eb..c74274090d2d 100644
--- a/plugin/trino-redshift/src/test/java/io/trino/plugin/redshift/RedshiftQueryRunner.java
+++ b/plugin/trino-redshift/src/test/java/io/trino/plugin/redshift/RedshiftQueryRunner.java
@@ -56,7 +56,7 @@ private RedshiftQueryRunner() {}
private static final Logger log = Logger.get(RedshiftQueryRunner.class);
private static final String S3_TPCH_TABLES_ROOT = requiredNonEmptySystemProperty("test.redshift.s3.tpch.tables.root");
- private static final String IAM_ROLE = requiredNonEmptySystemProperty("test.redshift.iam.role");
+ public static final String IAM_ROLE = requiredNonEmptySystemProperty("test.redshift.iam.role");
private static final String TEST_CATALOG = "redshift";
private static final String CONNECTOR_NAME = "redshift";
diff --git a/plugin/trino-redshift/src/test/java/io/trino/plugin/redshift/TestRedshiftConfig.java b/plugin/trino-redshift/src/test/java/io/trino/plugin/redshift/TestRedshiftConfig.java
index 6a507d29d87b..2c7498cb19d4 100644
--- a/plugin/trino-redshift/src/test/java/io/trino/plugin/redshift/TestRedshiftConfig.java
+++ b/plugin/trino-redshift/src/test/java/io/trino/plugin/redshift/TestRedshiftConfig.java
@@ -28,7 +28,9 @@ public class TestRedshiftConfig
public void testDefaults()
{
assertRecordedDefaults(recordDefaults(RedshiftConfig.class)
- .setFetchSize(null));
+ .setFetchSize(null)
+ .setUnloadLocation(null)
+ .setUnloadIamRole(null));
}
@Test
@@ -36,10 +38,14 @@ public void testExplicitPropertyMappings()
{
Map<String, String> properties = ImmutableMap.<String, String>builder()
.put("redshift.fetch-size", "2000")
+ .put("redshift.unload-location", "s3://bucket")
+ .put("redshift.unload-iam-role", "arn:aws:iam::123456789000:role/redshift_iam_role")
.buildOrThrow();
RedshiftConfig expected = new RedshiftConfig()
- .setFetchSize(2000);
+ .setFetchSize(2000)
+ .setUnloadLocation("s3://bucket")
+ .setUnloadIamRole("arn:aws:iam::123456789000:role/redshift_iam_role");
assertFullMapping(properties, expected);
}
diff --git a/plugin/trino-redshift/src/test/java/io/trino/plugin/redshift/TestRedshiftPlugin.java b/plugin/trino-redshift/src/test/java/io/trino/plugin/redshift/TestRedshiftPlugin.java
index 9d4fdaad3e80..596ec7ffb1a1 100644
--- a/plugin/trino-redshift/src/test/java/io/trino/plugin/redshift/TestRedshiftPlugin.java
+++ b/plugin/trino-redshift/src/test/java/io/trino/plugin/redshift/TestRedshiftPlugin.java
@@ -35,4 +35,22 @@ public void testCreateConnector()
"bootstrap.quiet", "true"),
new TestingConnectorContext()).shutdown();
}
+
+ @Test
+ public void testCreateUnloadConnector()
+ {
+ Plugin plugin = new RedshiftPlugin();
+ ConnectorFactory factory = getOnlyElement(plugin.getConnectorFactories());
+ factory.create(
+ "test",
+ ImmutableMap.of(
+ "connection-url", "jdbc:redshift:test",
+ "redshift.unload-location", "s3://bucket/path",
+ "redshift.unload-iam-role", "role",
+ "s3.aws-access-key", "access-key",
+ "s3.aws-secret-key", "secret-key",
+ "s3.region", "region",
+ "bootstrap.quiet", "true"),
+ new TestingConnectorContext()).shutdown();
+ }
}
diff --git a/plugin/trino-redshift/src/test/java/io/trino/plugin/redshift/TestRedshiftUnload.java b/plugin/trino-redshift/src/test/java/io/trino/plugin/redshift/TestRedshiftUnload.java
new file mode 100644
index 000000000000..d526361e98ce
--- /dev/null
+++ b/plugin/trino-redshift/src/test/java/io/trino/plugin/redshift/TestRedshiftUnload.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.redshift;
+
+import com.google.common.collect.ImmutableList;
+import io.trino.Session;
+import io.trino.operator.OperatorInfo;
+import io.trino.operator.SplitOperatorInfo;
+import io.trino.testing.AbstractTestQueries;
+import io.trino.testing.QueryRunner;
+import io.trino.testing.sql.SqlExecutor;
+import io.trino.testing.sql.TestTable;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.parallel.Execution;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static com.google.common.collect.MoreCollectors.onlyElement;
+import static io.trino.plugin.redshift.RedshiftQueryRunner.IAM_ROLE;
+import static io.trino.plugin.redshift.TestingRedshiftServer.TEST_SCHEMA;
+import static io.trino.testing.TestingNames.randomNameSuffix;
+import static io.trino.testing.TestingProperties.requiredNonEmptySystemProperty;
+import static io.trino.testing.TestingSession.testSessionBuilder;
+import static io.trino.tpch.TpchTable.NATION;
+import static java.util.Locale.ENGLISH;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;
+import static org.junit.jupiter.api.parallel.ExecutionMode.CONCURRENT;
+
+@TestInstance(PER_CLASS)
+@Execution(CONCURRENT)
+final class TestRedshiftUnload
+ extends AbstractTestQueries
+{
+ private static final String S3_UNLOAD_ROOT = requiredNonEmptySystemProperty("test.redshift.s3.unload.root");
+ private static final String AWS_REGION = requiredNonEmptySystemProperty("test.redshift.aws.region");
+ private static final String AWS_ACCESS_KEY = requiredNonEmptySystemProperty("test.redshift.aws.access-key");
+ private static final String AWS_SECRET_KEY = requiredNonEmptySystemProperty("test.redshift.aws.secret-key");
+
+ @Override
+ protected QueryRunner createQueryRunner()
+ throws Exception
+ {
+ return RedshiftQueryRunner.builder()
+ .setConnectorProperties(
+ Map.of(
+ "redshift.unload-location", S3_UNLOAD_ROOT,
+ "redshift.unload-iam-role", IAM_ROLE,
+ "s3.region", AWS_REGION,
+ "s3.aws-access-key", AWS_ACCESS_KEY,
+ "s3.aws-secret-key", AWS_SECRET_KEY))
+ .setInitialTables(List.of(NATION))
+ .build();
+ }
+
+ @Test
+ void testUnloadEnabled()
+ {
+ assertQuery(
+ "SHOW SESSION LIKE 'redshift.unload_enabled'",
+ "VALUES ('redshift.unload_enabled', 'true', 'true', 'boolean', 'Use UNLOAD for reading query results')");
+ }
+
+ @Test
+ void testUnload()
+ {
+ assertQueryStats(
+ getSession(),
+ """
+ SELECT nationkey, name FROM nation WHERE regionkey = 0
+ UNION
+ SELECT nationkey, name FROM nation WHERE regionkey = 1
+ """,
+ queryStats -> {
+ List