From fa5eb8ed60ee9df2d3843e42ea790e63eec0f58e Mon Sep 17 00:00:00 2001 From: Gaurav Sehgal Date: Wed, 9 Aug 2023 22:56:31 -0400 Subject: [PATCH] Add connector SPI for writer scaling options Using WriterScalingOptions connector can control scaling by providing the following configurations. 1. isWriterTasksScalingEnabled 2. isPerTaskWriterScalingEnabled 3. perTaskMaxScaledWriterCount Additionally, for now scaling is only enabled for hive, iceberg and delta connector. --- .../main/java/io/trino/metadata/Metadata.java | 11 ++ .../io/trino/metadata/MetadataManager.java | 17 +++ .../sql/planner/LocalExecutionPlanner.java | 40 +++++-- .../io/trino/sql/planner/LogicalPlanner.java | 6 +- .../planner/optimizations/AddExchanges.java | 5 +- .../optimizations/AddLocalExchanges.java | 4 +- .../optimizations/BeginTableWrite.java | 14 ++- .../sql/planner/plan/TableWriterNode.java | 112 +++++++++++++++++- .../sanity/ValidateScaledWritersUsage.java | 42 ++++--- .../tracing/TracingConnectorMetadata.java | 19 +++ .../io/trino/tracing/TracingMetadata.java | 19 +++ .../io/trino/connector/MockConnector.java | 18 ++- .../trino/connector/MockConnectorFactory.java | 20 +++- .../trino/metadata/AbstractMockMetadata.java | 13 ++ .../operator/TestTableWriterOperator.java | 4 +- .../sql/planner/TestingWriterTarget.java | 7 ++ .../TestRemoveEmptyMergeWriterRuleSet.java | 3 +- .../iterative/rule/test/PlanBuilder.java | 13 +- .../TestAddExchangesScaledWriters.java | 23 +++- ...tAddLocalExchangesForTaskScaleWriters.java | 93 +++++++++++++-- .../TestLimitMaxWriterNodesCount.java | 2 + .../TestValidateScaledWritersUsage.java | 56 ++++++++- .../spi/connector/ConnectorMetadata.java | 10 ++ .../spi/connector/WriterScalingOptions.java | 63 ++++++++++ .../ClassLoaderSafeConnectorMetadata.java | 17 +++ .../plugin/deltalake/DeltaLakeMetadata.java | 13 ++ .../io/trino/plugin/hive/HiveMetadata.java | 13 ++ .../trino/plugin/iceberg/IcebergMetadata.java | 13 ++ 28 files changed, 605 insertions(+), 65 deletions(-) create mode 100644 core/trino-spi/src/main/java/io/trino/spi/connector/WriterScalingOptions.java diff --git a/core/trino-main/src/main/java/io/trino/metadata/Metadata.java b/core/trino-main/src/main/java/io/trino/metadata/Metadata.java index 92f79f46d31e..326de733d15e 100644 --- a/core/trino-main/src/main/java/io/trino/metadata/Metadata.java +++ b/core/trino-main/src/main/java/io/trino/metadata/Metadata.java @@ -47,6 +47,7 @@ import io.trino.spi.connector.TableFunctionApplicationResult; import io.trino.spi.connector.TableScanRedirectApplicationResult; import io.trino.spi.connector.TopNApplicationResult; +import io.trino.spi.connector.WriterScalingOptions; import io.trino.spi.expression.ConnectorExpression; import io.trino.spi.function.AggregationFunctionMetadata; import io.trino.spi.function.FunctionMetadata; @@ -751,4 +752,14 @@ default boolean isMaterializedView(Session session, QualifiedObjectName viewName * Note: It is ignored when retry policy is set to TASK */ OptionalInt getMaxWriterTasks(Session session, String catalogName); + + /** + * Returns writer scaling options for the specified table. This method is called when table handle is not available during CTAS. + */ + WriterScalingOptions getNewTableWriterScalingOptions(Session session, QualifiedObjectName tableName, Map tableProperties); + + /** + * Returns writer scaling options for the specified table. + */ + WriterScalingOptions getInsertWriterScalingOptions(Session session, TableHandle tableHandle); } diff --git a/core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java b/core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java index 942027c99026..a7b2e1ed75c1 100644 --- a/core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java +++ b/core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java @@ -83,6 +83,7 @@ import io.trino.spi.connector.TableFunctionApplicationResult; import io.trino.spi.connector.TableScanRedirectApplicationResult; import io.trino.spi.connector.TopNApplicationResult; +import io.trino.spi.connector.WriterScalingOptions; import io.trino.spi.expression.ConnectorExpression; import io.trino.spi.expression.Variable; import io.trino.spi.function.AggregationFunctionMetadata; @@ -2671,6 +2672,22 @@ public OptionalInt getMaxWriterTasks(Session session, String catalogName) return catalogMetadata.getMetadata(session).getMaxWriterTasks(session.toConnectorSession(catalogHandle)); } + @Override + public WriterScalingOptions getNewTableWriterScalingOptions(Session session, QualifiedObjectName tableName, Map tableProperties) + { + CatalogMetadata catalogMetadata = getCatalogMetadataForWrite(session, tableName.getCatalogName()); + CatalogHandle catalogHandle = catalogMetadata.getCatalogHandle(session, tableName); + ConnectorMetadata metadata = catalogMetadata.getMetadataFor(session, catalogHandle); + return metadata.getNewTableWriterScalingOptions(session.toConnectorSession(catalogHandle), tableName.asSchemaTableName(), tableProperties); + } + + @Override + public WriterScalingOptions getInsertWriterScalingOptions(Session session, TableHandle tableHandle) + { + ConnectorMetadata metadata = getMetadataForWrite(session, tableHandle.getCatalogHandle()); + return metadata.getInsertWriterScalingOptions(session.toConnectorSession(tableHandle.getCatalogHandle()), tableHandle.getConnectorHandle()); + } + private Optional toConnectorVersion(Optional version) { Optional connectorVersion = Optional.empty(); diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java index 44595589c953..19247305128c 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java @@ -161,6 +161,7 @@ import io.trino.spi.connector.DynamicFilter; import io.trino.spi.connector.RecordSet; import io.trino.spi.connector.SortOrder; +import io.trino.spi.connector.WriterScalingOptions; import io.trino.spi.function.AggregationImplementation; import io.trino.spi.function.BoundSignature; import io.trino.spi.function.FunctionId; @@ -379,6 +380,7 @@ import static io.trino.util.SpatialJoinUtils.ST_WITHIN; import static io.trino.util.SpatialJoinUtils.extractSupportedSpatialComparisons; import static io.trino.util.SpatialJoinUtils.extractSupportedSpatialFunctions; +import static java.lang.Math.min; import static java.lang.Math.toIntExact; import static java.lang.String.format; import static java.util.Objects.requireNonNull; @@ -3280,7 +3282,11 @@ public PhysicalOperation visitRefreshMaterializedView(RefreshMaterializedViewNod public PhysicalOperation visitTableWriter(TableWriterNode node, LocalExecutionPlanContext context) { // Set table writer count - int maxWriterCount = getWriterCount(session, node.getPartitioningScheme(), node.getSource()); + int maxWriterCount = getWriterCount( + session, + node.getTarget().getWriterScalingOptions(metadata, session), + node.getPartitioningScheme(), + node.getSource()); context.setDriverInstanceCount(maxWriterCount); context.taskContext.setMaxWriterCount(maxWriterCount); @@ -3438,7 +3444,11 @@ public PhysicalOperation visitSimpleTableExecuteNode(SimpleTableExecuteNode node public PhysicalOperation visitTableExecute(TableExecuteNode node, LocalExecutionPlanContext context) { // Set table writer count - int maxWriterCount = getWriterCount(session, node.getPartitioningScheme(), node.getSource()); + int maxWriterCount = getWriterCount( + session, + node.getTarget().getWriterScalingOptions(metadata, session), + node.getPartitioningScheme(), + node.getSource()); context.setDriverInstanceCount(maxWriterCount); context.taskContext.setMaxWriterCount(maxWriterCount); @@ -3465,7 +3475,7 @@ public PhysicalOperation visitTableExecute(TableExecuteNode node, LocalExecution return new PhysicalOperation(operatorFactory, outputMapping.buildOrThrow(), context, source); } - private int getWriterCount(Session session, Optional partitioningScheme, PlanNode source) + private int getWriterCount(Session session, WriterScalingOptions connectorScalingOptions, Optional partitioningScheme, PlanNode source) { // This check is required because we don't know which writer count to use when exchange is // single distribution. It could be possible that when scaling is enabled, a single distribution is @@ -3475,12 +3485,24 @@ private int getWriterCount(Session session, Optional partiti return 1; } - // The default value of partitioned writer count is 32 which is high enough to use it - // for both cases when scaling is enabled or not. Additionally, it doesn't lead to too many - // small files since when scaling is disabled only single writer will handle a single partition. - return partitioningScheme - .map(scheme -> getTaskPartitionedWriterCount(session)) - .orElseGet(() -> isLocalScaledWriterExchange(source) ? getTaskScaleWritersMaxWriterCount(session) : getTaskWriterCount(session)); + if (partitioningScheme.isPresent()) { + // The default value of partitioned writer count is 32 which is high enough to use it + // for both cases when scaling is enabled or not. Additionally, it doesn't lead to too many + // small files since when scaling is disabled only single writer will handle a single partition. + if (isLocalScaledWriterExchange(source)) { + return connectorScalingOptions.perTaskMaxScaledWriterCount() + .map(writerCount -> min(writerCount, getTaskPartitionedWriterCount(session))) + .orElse(getTaskPartitionedWriterCount(session)); + } + return getTaskPartitionedWriterCount(session); + } + + if (isLocalScaledWriterExchange(source)) { + return connectorScalingOptions.perTaskMaxScaledWriterCount() + .map(writerCount -> min(writerCount, getTaskScaleWritersMaxWriterCount(session))) + .orElse(getTaskScaleWritersMaxWriterCount(session)); + } + return getTaskWriterCount(session); } private boolean isSingleGatheringExchange(PlanNode node) diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/LogicalPlanner.java b/core/trino-main/src/main/java/io/trino/sql/planner/LogicalPlanner.java index fc04b1cb320a..198530887691 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/LogicalPlanner.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/LogicalPlanner.java @@ -944,7 +944,11 @@ private RelationPlan createTableExecutePlan(Analysis analysis, TableExecute stat .map(ColumnMetadata::getName) .collect(toImmutableList()); - TableWriterNode.TableExecuteTarget tableExecuteTarget = new TableWriterNode.TableExecuteTarget(executeHandle, Optional.empty(), tableName.asSchemaTableName()); + TableWriterNode.TableExecuteTarget tableExecuteTarget = new TableWriterNode.TableExecuteTarget( + executeHandle, + Optional.empty(), + tableName.asSchemaTableName(), + metadata.getInsertWriterScalingOptions(session, tableHandle)); Optional layout = metadata.getLayoutForTableExecute(session, executeHandle); diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddExchanges.java b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddExchanges.java index 454472b6adde..9c1f9391859b 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddExchanges.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddExchanges.java @@ -33,6 +33,7 @@ import io.trino.spi.connector.CatalogHandle; import io.trino.spi.connector.GroupingProperty; import io.trino.spi.connector.LocalProperty; +import io.trino.spi.connector.WriterScalingOptions; import io.trino.sql.PlannerContext; import io.trino.sql.planner.DomainTranslator; import io.trino.sql.planner.Partitioning; @@ -760,13 +761,14 @@ public PlanWithProperties visitMergeWriter(MergeWriterNode node, PreferredProper private PlanWithProperties getWriterPlanWithProperties(Optional partitioningScheme, PlanWithProperties newSource, TableWriterNode.WriterTarget writerTarget) { + WriterScalingOptions scalingOptions = writerTarget.getWriterScalingOptions(plannerContext.getMetadata(), session); if (partitioningScheme.isEmpty()) { // use maxWritersTasks to set PartitioningScheme.partitionCount field to limit number of tasks that will take part in executing writing stage int maxWriterTasks = writerTarget.getMaxWriterTasks(plannerContext.getMetadata(), session).orElse(getMaxWriterTaskCount(session)); Optional maxWritersNodesCount = getRetryPolicy(session) != RetryPolicy.TASK ? Optional.of(Math.min(maxWriterTasks, getMaxWriterTaskCount(session))) : Optional.empty(); - if (scaleWriters) { + if (scaleWriters && scalingOptions.isWriterTasksScalingEnabled()) { partitioningScheme = Optional.of(new PartitioningScheme(Partitioning.create(SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION, ImmutableList.of()), newSource.getNode().getOutputSymbols(), Optional.empty(), false, Optional.empty(), maxWritersNodesCount)); } else if (redistributeWrites) { @@ -774,6 +776,7 @@ else if (redistributeWrites) { } } else if (scaleWriters + && scalingOptions.isWriterTasksScalingEnabled() && writerTarget.supportsMultipleWritersPerPartition(plannerContext.getMetadata(), session) // do not insert an exchange if partitioning is compatible && !newSource.getProperties().isCompatibleTablePartitioningWith(partitioningScheme.get().getPartitioning(), false, plannerContext.getMetadata(), session)) { diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddLocalExchanges.java b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddLocalExchanges.java index 952f7ecc590d..73fc9743d9e2 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddLocalExchanges.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddLocalExchanges.java @@ -23,6 +23,7 @@ import io.trino.spi.connector.ConstantProperty; import io.trino.spi.connector.GroupingProperty; import io.trino.spi.connector.LocalProperty; +import io.trino.spi.connector.WriterScalingOptions; import io.trino.sql.PlannerContext; import io.trino.sql.planner.Partitioning; import io.trino.sql.planner.PartitioningHandle; @@ -686,13 +687,14 @@ public PlanWithProperties visitSimpleTableExecuteNode(SimpleTableExecuteNode nod @Override public PlanWithProperties visitTableWriter(TableWriterNode node, StreamPreferredProperties parentPreferences) { + WriterScalingOptions scalingOptions = node.getTarget().getWriterScalingOptions(plannerContext.getMetadata(), session); return visitTableWriter( node, node.getPartitioningScheme(), node.getSource(), parentPreferences, node.getTarget(), - isTaskScaleWritersEnabled(session)); + isTaskScaleWritersEnabled(session) && scalingOptions.isPerTaskWriterScalingEnabled()); } @Override diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/BeginTableWrite.java b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/BeginTableWrite.java index ff2d94831fb6..33d8fd70e2fa 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/BeginTableWrite.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/BeginTableWrite.java @@ -208,7 +208,8 @@ public WriterTarget getWriterTarget(PlanNode node) return new TableExecuteTarget( target.getExecuteHandle(), findTableScanHandleForTableExecute(((TableExecuteNode) node).getSource()), - target.getSchemaTableName()); + target.getSchemaTableName(), + target.getWriterScalingOptions()); } if (node instanceof MergeWriterNode mergeWriterNode) { @@ -244,14 +245,16 @@ private WriterTarget createWriterTarget(WriterTarget target) metadata.beginCreateTable(session, create.getCatalog(), create.getTableMetadata(), create.getLayout()), create.getTableMetadata().getTable(), target.supportsMultipleWritersPerPartition(metadata, session), - target.getMaxWriterTasks(metadata, session)); + target.getMaxWriterTasks(metadata, session), + target.getWriterScalingOptions(metadata, session)); } if (target instanceof InsertReference insert) { return new InsertTarget( metadata.beginInsert(session, insert.getHandle(), insert.getColumns()), metadata.getTableName(session, insert.getHandle()).getSchemaTableName(), target.supportsMultipleWritersPerPartition(metadata, session), - target.getMaxWriterTasks(metadata, session)); + target.getMaxWriterTasks(metadata, session), + target.getWriterScalingOptions(metadata, session)); } if (target instanceof MergeTarget merge) { MergeHandle mergeHandle = metadata.beginMerge(session, merge.getHandle()); @@ -266,11 +269,12 @@ private WriterTarget createWriterTarget(WriterTarget target) refreshMV.getStorageTableHandle(), metadata.beginRefreshMaterializedView(session, refreshMV.getStorageTableHandle(), refreshMV.getSourceTableHandles()), metadata.getTableName(session, refreshMV.getStorageTableHandle()).getSchemaTableName(), - refreshMV.getSourceTableHandles()); + refreshMV.getSourceTableHandles(), + refreshMV.getWriterScalingOptions(metadata, session)); } if (target instanceof TableExecuteTarget tableExecute) { BeginTableExecuteResult result = metadata.beginTableExecute(session, tableExecute.getExecuteHandle(), tableExecute.getMandatorySourceHandle()); - return new TableExecuteTarget(result.getTableExecuteHandle(), Optional.of(result.getSourceHandle()), tableExecute.getSchemaTableName()); + return new TableExecuteTarget(result.getTableExecuteHandle(), Optional.of(result.getSourceHandle()), tableExecute.getSchemaTableName(), tableExecute.getWriterScalingOptions()); } throw new IllegalArgumentException("Unhandled target type: " + target.getClass().getSimpleName()); } diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/plan/TableWriterNode.java b/core/trino-main/src/main/java/io/trino/sql/planner/plan/TableWriterNode.java index c8cf83aebbd7..099ae3465f96 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/plan/TableWriterNode.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/plan/TableWriterNode.java @@ -26,6 +26,7 @@ import io.trino.metadata.MergeHandle; import io.trino.metadata.Metadata; import io.trino.metadata.OutputTableHandle; +import io.trino.metadata.QualifiedObjectName; import io.trino.metadata.TableExecuteHandle; import io.trino.metadata.TableHandle; import io.trino.metadata.TableLayout; @@ -33,6 +34,7 @@ import io.trino.spi.connector.ConnectorTableMetadata; import io.trino.spi.connector.RowChangeParadigm; import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.connector.WriterScalingOptions; import io.trino.spi.type.Type; import io.trino.sql.planner.PartitioningScheme; import io.trino.sql.planner.Symbol; @@ -195,6 +197,8 @@ public abstract static class WriterTarget public abstract boolean supportsMultipleWritersPerPartition(Metadata metadata, Session session); public abstract OptionalInt getMaxWriterTasks(Metadata metadata, Session session); + + public abstract WriterScalingOptions getWriterScalingOptions(Metadata metadata, Session session); } // only used during planning -- will not be serialized @@ -229,6 +233,16 @@ public OptionalInt getMaxWriterTasks(Metadata metadata, Session session) return metadata.getMaxWriterTasks(session, catalog); } + @Override + public WriterScalingOptions getWriterScalingOptions(Metadata metadata, Session session) + { + QualifiedObjectName tableName = new QualifiedObjectName( + catalog, + tableMetadata.getTableSchema().getTable().getSchemaName(), + tableMetadata.getTableSchema().getTable().getTableName()); + return metadata.getNewTableWriterScalingOptions(session, tableName, tableMetadata.getProperties()); + } + public Optional getLayout() { return layout; @@ -253,18 +267,21 @@ public static class CreateTarget private final SchemaTableName schemaTableName; private final boolean multipleWritersPerPartitionSupported; private final OptionalInt maxWriterTasks; + private final WriterScalingOptions writerScalingOptions; @JsonCreator public CreateTarget( @JsonProperty("handle") OutputTableHandle handle, @JsonProperty("schemaTableName") SchemaTableName schemaTableName, @JsonProperty("multipleWritersPerPartitionSupported") boolean multipleWritersPerPartitionSupported, - @JsonProperty("maxWriterTasks") OptionalInt maxWriterTasks) + @JsonProperty("maxWriterTasks") OptionalInt maxWriterTasks, + @JsonProperty("writerScalingOptions") WriterScalingOptions writerScalingOptions) { this.handle = requireNonNull(handle, "handle is null"); this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null"); this.multipleWritersPerPartitionSupported = multipleWritersPerPartitionSupported; this.maxWriterTasks = requireNonNull(maxWriterTasks, "maxWriterTasks is null"); + this.writerScalingOptions = requireNonNull(writerScalingOptions, "writerScalingOptions is null"); } @JsonProperty @@ -285,6 +302,12 @@ public boolean isMultipleWritersPerPartitionSupported() return multipleWritersPerPartitionSupported; } + @JsonProperty + public WriterScalingOptions getWriterScalingOptions() + { + return writerScalingOptions; + } + @Override public String toString() { @@ -302,6 +325,12 @@ public OptionalInt getMaxWriterTasks(Metadata metadata, Session session) { return maxWriterTasks; } + + @Override + public WriterScalingOptions getWriterScalingOptions(Metadata metadata, Session session) + { + return writerScalingOptions; + } } // only used during planning -- will not be serialized @@ -346,6 +375,12 @@ public OptionalInt getMaxWriterTasks(Metadata metadata, Session session) { return metadata.getMaxWriterTasks(session, handle.getCatalogHandle().getCatalogName()); } + + @Override + public WriterScalingOptions getWriterScalingOptions(Metadata metadata, Session session) + { + return metadata.getInsertWriterScalingOptions(session, handle); + } } public static class InsertTarget @@ -355,18 +390,21 @@ public static class InsertTarget private final SchemaTableName schemaTableName; private final boolean multipleWritersPerPartitionSupported; private final OptionalInt maxWriterTasks; + private final WriterScalingOptions writerScalingOptions; @JsonCreator public InsertTarget( @JsonProperty("handle") InsertTableHandle handle, @JsonProperty("schemaTableName") SchemaTableName schemaTableName, @JsonProperty("multipleWritersPerPartitionSupported") boolean multipleWritersPerPartitionSupported, - @JsonProperty("maxWriterTasks") OptionalInt maxWriterTasks) + @JsonProperty("maxWriterTasks") OptionalInt maxWriterTasks, + @JsonProperty("writerScalingOptions") WriterScalingOptions writerScalingOptions) { this.handle = requireNonNull(handle, "handle is null"); this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null"); this.multipleWritersPerPartitionSupported = multipleWritersPerPartitionSupported; this.maxWriterTasks = requireNonNull(maxWriterTasks, "maxWriterTasks is null"); + this.writerScalingOptions = requireNonNull(writerScalingOptions, "writerScalingOptions is null"); } @JsonProperty @@ -387,6 +425,12 @@ public boolean isMultipleWritersPerPartitionSupported() return multipleWritersPerPartitionSupported; } + @JsonProperty + public WriterScalingOptions getWriterScalingOptions() + { + return writerScalingOptions; + } + @Override public String toString() { @@ -404,6 +448,12 @@ public OptionalInt getMaxWriterTasks(Metadata metadata, Session session) { return maxWriterTasks; } + + @Override + public WriterScalingOptions getWriterScalingOptions(Metadata metadata, Session session) + { + return writerScalingOptions; + } } public static class RefreshMaterializedViewReference @@ -449,6 +499,12 @@ public OptionalInt getMaxWriterTasks(Metadata metadata, Session session) { return metadata.getMaxWriterTasks(session, storageTableHandle.getCatalogHandle().getCatalogName()); } + + @Override + public WriterScalingOptions getWriterScalingOptions(Metadata metadata, Session session) + { + return metadata.getInsertWriterScalingOptions(session, storageTableHandle); + } } public static class RefreshMaterializedViewTarget @@ -458,18 +514,21 @@ public static class RefreshMaterializedViewTarget private final InsertTableHandle insertHandle; private final SchemaTableName schemaTableName; private final List sourceTableHandles; + private final WriterScalingOptions writerScalingOptions; @JsonCreator public RefreshMaterializedViewTarget( @JsonProperty("tableHandle") TableHandle tableHandle, @JsonProperty("insertHandle") InsertTableHandle insertHandle, @JsonProperty("schemaTableName") SchemaTableName schemaTableName, - @JsonProperty("sourceTableHandles") List sourceTableHandles) + @JsonProperty("sourceTableHandles") List sourceTableHandles, + @JsonProperty("writerScalingOptions") WriterScalingOptions writerScalingOptions) { this.tableHandle = requireNonNull(tableHandle, "tableHandle is null"); this.insertHandle = requireNonNull(insertHandle, "insertHandle is null"); this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null"); this.sourceTableHandles = ImmutableList.copyOf(sourceTableHandles); + this.writerScalingOptions = requireNonNull(writerScalingOptions, "writerScalingOptions is null"); } @JsonProperty @@ -496,6 +555,12 @@ public List getSourceTableHandles() return sourceTableHandles; } + @JsonProperty + public WriterScalingOptions getWriterScalingOptions() + { + return writerScalingOptions; + } + @Override public String toString() { @@ -515,6 +580,12 @@ public OptionalInt getMaxWriterTasks(Metadata metadata, Session session) { return metadata.getMaxWriterTasks(session, tableHandle.getCatalogHandle().getCatalogName()); } + + @Override + public WriterScalingOptions getWriterScalingOptions(Metadata metadata, Session session) + { + return writerScalingOptions; + } } public static class DeleteTarget @@ -567,6 +638,12 @@ public OptionalInt getMaxWriterTasks(Metadata metadata, Session session) { throw new UnsupportedOperationException(); } + + @Override + public WriterScalingOptions getWriterScalingOptions(Metadata metadata, Session session) + { + throw new UnsupportedOperationException(); + } } public static class UpdateTarget @@ -638,6 +715,12 @@ public OptionalInt getMaxWriterTasks(Metadata metadata, Session session) { throw new UnsupportedOperationException(); } + + @Override + public WriterScalingOptions getWriterScalingOptions(Metadata metadata, Session session) + { + throw new UnsupportedOperationException(); + } } public static class TableExecuteTarget @@ -646,16 +729,19 @@ public static class TableExecuteTarget private final TableExecuteHandle executeHandle; private final Optional sourceHandle; private final SchemaTableName schemaTableName; + private final WriterScalingOptions writerScalingOptions; @JsonCreator public TableExecuteTarget( @JsonProperty("executeHandle") TableExecuteHandle executeHandle, @JsonProperty("sourceHandle") Optional sourceHandle, - @JsonProperty("schemaTableName") SchemaTableName schemaTableName) + @JsonProperty("schemaTableName") SchemaTableName schemaTableName, + @JsonProperty("writerScalingOptions") WriterScalingOptions writerScalingOptions) { this.executeHandle = requireNonNull(executeHandle, "handle is null"); this.sourceHandle = requireNonNull(sourceHandle, "sourceHandle is null"); this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null"); + this.writerScalingOptions = requireNonNull(writerScalingOptions, "writerScalingOptions is null"); } @JsonProperty @@ -681,6 +767,12 @@ public SchemaTableName getSchemaTableName() return schemaTableName; } + @JsonProperty + public WriterScalingOptions getWriterScalingOptions() + { + return writerScalingOptions; + } + @Override public String toString() { @@ -700,6 +792,12 @@ public OptionalInt getMaxWriterTasks(Metadata metadata, Session session) { return metadata.getMaxWriterTasks(session, executeHandle.getCatalogHandle().getCatalogName()); } + + @Override + public WriterScalingOptions getWriterScalingOptions(Metadata metadata, Session session) + { + return writerScalingOptions; + } } public static class MergeTarget @@ -764,6 +862,12 @@ public OptionalInt getMaxWriterTasks(Metadata metadata, Session session) { return OptionalInt.empty(); } + + @Override + public WriterScalingOptions getWriterScalingOptions(Metadata metadata, Session session) + { + return WriterScalingOptions.DISABLED; + } } public static class MergeParadigmAndTypes diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/sanity/ValidateScaledWritersUsage.java b/core/trino-main/src/main/java/io/trino/sql/planner/sanity/ValidateScaledWritersUsage.java index 3aaf0879ac93..5bd5938b50b9 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/sanity/ValidateScaledWritersUsage.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/sanity/ValidateScaledWritersUsage.java @@ -16,6 +16,7 @@ import com.google.common.collect.ImmutableList; import io.trino.Session; import io.trino.execution.warnings.WarningCollector; +import io.trino.spi.connector.WriterScalingOptions; import io.trino.sql.PlannerContext; import io.trino.sql.planner.PartitioningHandle; import io.trino.sql.planner.TypeAnalyzer; @@ -55,7 +56,7 @@ public void validate( } private static class Visitor - extends PlanVisitor, Void> + extends PlanVisitor, Void> { private final Session session; private final PlannerContext plannerContext; @@ -67,39 +68,50 @@ private Visitor(Session session, PlannerContext plannerContext) } @Override - protected List visitPlan(PlanNode node, Void context) + protected List visitPlan(PlanNode node, Void context) { - return collectPartitioningHandles(node.getSources()); + return collectExchanges(node.getSources()); } @Override - public List visitTableWriter(TableWriterNode node, Void context) + public List visitTableWriter(TableWriterNode node, Void context) { - List children = collectPartitioningHandles(node.getSources()); - List scaleWriterPartitioningHandle = children.stream() - .filter(PartitioningHandle::isScaleWriters) + List scaleWriterExchanges = collectExchanges(node.getSources()).stream() + .filter(exchangeNode -> exchangeNode.getPartitioningScheme().getPartitioning().getHandle().isScaleWriters()) .collect(toImmutableList()); TableWriterNode.WriterTarget target = node.getTarget(); - scaleWriterPartitioningHandle.forEach(partitioningHandle -> { - if (isScaledWriterHashDistribution(partitioningHandle)) { + scaleWriterExchanges.forEach(exchangeNode -> { + PartitioningHandle handle = exchangeNode.getPartitioningScheme().getPartitioning().getHandle(); + WriterScalingOptions scalingOptions = target.getWriterScalingOptions(plannerContext.getMetadata(), session); + if (exchangeNode.getScope() == ExchangeNode.Scope.LOCAL) { + checkState(scalingOptions.isPerTaskWriterScalingEnabled(), + "The scaled writer per task partitioning scheme is set but writer target %s doesn't support it", target); + } + + if (exchangeNode.getScope() == ExchangeNode.Scope.REMOTE) { + checkState(scalingOptions.isWriterTasksScalingEnabled(), + "The scaled writer across tasks partitioning scheme is set but writer target %s doesn't support it", target); + } + + if (isScaledWriterHashDistribution(handle)) { checkState(target.supportsMultipleWritersPerPartition(plannerContext.getMetadata(), session), "The hash scaled writer partitioning scheme is set for the partitioned write but writer target %s doesn't support multiple writers per partition", target); } }); - return children; + return scaleWriterExchanges; } @Override - public List visitExchange(ExchangeNode node, Void context) + public List visitExchange(ExchangeNode node, Void context) { - return ImmutableList.builder() - .add(node.getPartitioningScheme().getPartitioning().getHandle()) - .addAll(collectPartitioningHandles(node.getSources())) + return ImmutableList.builder() + .add(node) + .addAll(collectExchanges(node.getSources())) .build(); } - private List collectPartitioningHandles(List nodes) + private List collectExchanges(List nodes) { return nodes.stream() .map(node -> node.accept(this, null)) diff --git a/core/trino-main/src/main/java/io/trino/tracing/TracingConnectorMetadata.java b/core/trino-main/src/main/java/io/trino/tracing/TracingConnectorMetadata.java index 9d2ab408622d..1ba927fa79b4 100644 --- a/core/trino-main/src/main/java/io/trino/tracing/TracingConnectorMetadata.java +++ b/core/trino-main/src/main/java/io/trino/tracing/TracingConnectorMetadata.java @@ -63,6 +63,7 @@ import io.trino.spi.connector.TableFunctionApplicationResult; import io.trino.spi.connector.TableScanRedirectApplicationResult; import io.trino.spi.connector.TopNApplicationResult; +import io.trino.spi.connector.WriterScalingOptions; import io.trino.spi.expression.ConnectorExpression; import io.trino.spi.function.AggregationFunctionMetadata; import io.trino.spi.function.BoundSignature; @@ -1245,6 +1246,24 @@ public OptionalInt getMaxWriterTasks(ConnectorSession session) } } + @Override + public WriterScalingOptions getNewTableWriterScalingOptions(ConnectorSession session, SchemaTableName tableName, Map tableProperties) + { + Span span = startSpan("getNewTableWriterScalingOptions", tableName); + try (var ignored = scopedSpan(span)) { + return delegate.getNewTableWriterScalingOptions(session, tableName, tableProperties); + } + } + + @Override + public WriterScalingOptions getInsertWriterScalingOptions(ConnectorSession session, ConnectorTableHandle tableHandle) + { + Span span = startSpan("getInsertWriterScalingOptions", tableHandle); + try (var ignored = scopedSpan(span)) { + return delegate.getInsertWriterScalingOptions(session, tableHandle); + } + } + private Span startSpan(String methodName) { return tracer.spanBuilder("ConnectorMetadata." + methodName) diff --git a/core/trino-main/src/main/java/io/trino/tracing/TracingMetadata.java b/core/trino-main/src/main/java/io/trino/tracing/TracingMetadata.java index 1386a7d1232b..d83a313061c0 100644 --- a/core/trino-main/src/main/java/io/trino/tracing/TracingMetadata.java +++ b/core/trino-main/src/main/java/io/trino/tracing/TracingMetadata.java @@ -74,6 +74,7 @@ import io.trino.spi.connector.TableFunctionApplicationResult; import io.trino.spi.connector.TableScanRedirectApplicationResult; import io.trino.spi.connector.TopNApplicationResult; +import io.trino.spi.connector.WriterScalingOptions; import io.trino.spi.expression.ConnectorExpression; import io.trino.spi.function.AggregationFunctionMetadata; import io.trino.spi.function.FunctionMetadata; @@ -1358,6 +1359,24 @@ public OptionalInt getMaxWriterTasks(Session session, String catalogName) } } + @Override + public WriterScalingOptions getNewTableWriterScalingOptions(Session session, QualifiedObjectName tableName, Map tableProperties) + { + Span span = startSpan("getNewTableWriterScalingOptions", tableName); + try (var ignored = scopedSpan(span)) { + return delegate.getNewTableWriterScalingOptions(session, tableName, tableProperties); + } + } + + @Override + public WriterScalingOptions getInsertWriterScalingOptions(Session session, TableHandle tableHandle) + { + Span span = startSpan("getInsertWriterScalingOptions", tableHandle); + try (var ignored = scopedSpan(span)) { + return delegate.getInsertWriterScalingOptions(session, tableHandle); + } + } + private Span startSpan(String methodName) { return tracer.spanBuilder("Metadata." + methodName) diff --git a/core/trino-main/src/test/java/io/trino/connector/MockConnector.java b/core/trino-main/src/test/java/io/trino/connector/MockConnector.java index f7a09f1970b5..a741bea2ac57 100644 --- a/core/trino-main/src/test/java/io/trino/connector/MockConnector.java +++ b/core/trino-main/src/test/java/io/trino/connector/MockConnector.java @@ -83,6 +83,7 @@ import io.trino.spi.connector.TableProcedureMetadata; import io.trino.spi.connector.TableScanRedirectApplicationResult; import io.trino.spi.connector.TopNApplicationResult; +import io.trino.spi.connector.WriterScalingOptions; import io.trino.spi.eventlistener.EventListener; import io.trino.spi.expression.ConnectorExpression; import io.trino.spi.function.FunctionProvider; @@ -178,6 +179,7 @@ public class MockConnector private final Function tableFunctionSplitsSources; private final OptionalInt maxWriterTasks; private final BiFunction> getLayoutForTableExecute; + private final WriterScalingOptions writerScalingOptions; MockConnector( Function metadataWrapper, @@ -224,7 +226,8 @@ public class MockConnector Supplier>> columnProperties, Function tableFunctionSplitsSources, OptionalInt maxWriterTasks, - BiFunction> getLayoutForTableExecute) + BiFunction> getLayoutForTableExecute, + WriterScalingOptions writerScalingOptions) { this.metadataWrapper = requireNonNull(metadataWrapper, "metadataWrapper is null"); this.sessionProperties = ImmutableList.copyOf(requireNonNull(sessionProperties, "sessionProperties is null")); @@ -271,6 +274,7 @@ public class MockConnector this.tableFunctionSplitsSources = requireNonNull(tableFunctionSplitsSources, "tableFunctionSplitsSources is null"); this.maxWriterTasks = requireNonNull(maxWriterTasks, "maxWriterTasks is null"); this.getLayoutForTableExecute = requireNonNull(getLayoutForTableExecute, "getLayoutForTableExecute is null"); + this.writerScalingOptions = requireNonNull(writerScalingOptions, "writerScalingOptions is null"); } @Override @@ -872,6 +876,18 @@ public BeginTableExecuteResult(tableExecuteHandle, updatedSourceTableHandle); } + @Override + public WriterScalingOptions getNewTableWriterScalingOptions(ConnectorSession session, SchemaTableName tableName, Map tableProperties) + { + return writerScalingOptions; + } + + @Override + public WriterScalingOptions getInsertWriterScalingOptions(ConnectorSession session, ConnectorTableHandle tableHandle) + { + return writerScalingOptions; + } + private MockConnectorAccessControl getMockAccessControl() { return (MockConnectorAccessControl) getAccessControl(); diff --git a/core/trino-main/src/test/java/io/trino/connector/MockConnectorFactory.java b/core/trino-main/src/test/java/io/trino/connector/MockConnectorFactory.java index b57d66d60e43..648415a5b59f 100644 --- a/core/trino-main/src/test/java/io/trino/connector/MockConnectorFactory.java +++ b/core/trino-main/src/test/java/io/trino/connector/MockConnectorFactory.java @@ -51,6 +51,7 @@ import io.trino.spi.connector.TableProcedureMetadata; import io.trino.spi.connector.TableScanRedirectApplicationResult; import io.trino.spi.connector.TopNApplicationResult; +import io.trino.spi.connector.WriterScalingOptions; import io.trino.spi.eventlistener.EventListener; import io.trino.spi.expression.ConnectorExpression; import io.trino.spi.function.FunctionProvider; @@ -138,6 +139,8 @@ public class MockConnectorFactory private final OptionalInt maxWriterTasks; private final BiFunction> getLayoutForTableExecute; + private final WriterScalingOptions writerScalingOptions; + private MockConnectorFactory( String name, List> sessionProperty, @@ -184,7 +187,8 @@ private MockConnectorFactory( boolean allowMissingColumnsOnInsert, Function tableFunctionSplitsSources, OptionalInt maxWriterTasks, - BiFunction> getLayoutForTableExecute) + BiFunction> getLayoutForTableExecute, + WriterScalingOptions writerScalingOptions) { this.name = requireNonNull(name, "name is null"); this.sessionProperty = ImmutableList.copyOf(requireNonNull(sessionProperty, "sessionProperty is null")); @@ -232,6 +236,7 @@ private MockConnectorFactory( this.tableFunctionSplitsSources = requireNonNull(tableFunctionSplitsSources, "tableFunctionSplitsSources is null"); this.maxWriterTasks = maxWriterTasks; this.getLayoutForTableExecute = requireNonNull(getLayoutForTableExecute, "getLayoutForTableExecute is null"); + this.writerScalingOptions = requireNonNull(writerScalingOptions, "writerScalingOptions is null"); } @Override @@ -288,7 +293,8 @@ public Connector create(String catalogName, Map config, Connecto columnProperties, tableFunctionSplitsSources, maxWriterTasks, - getLayoutForTableExecute); + getLayoutForTableExecute, + writerScalingOptions); } public static MockConnectorFactory create() @@ -438,6 +444,7 @@ public static final class Builder private boolean allowMissingColumnsOnInsert; private OptionalInt maxWriterTasks = OptionalInt.empty(); private BiFunction> getLayoutForTableExecute = (session, handle) -> Optional.empty(); + private WriterScalingOptions writerScalingOptions = WriterScalingOptions.DISABLED; private Builder() {} @@ -759,6 +766,12 @@ public Builder withAllowMissingColumnsOnInsert(boolean allowMissingColumnsOnInse return this; } + public Builder withWriterScalingOptions(WriterScalingOptions writerScalingOptions) + { + this.writerScalingOptions = writerScalingOptions; + return this; + } + public MockConnectorFactory build() { Optional accessControl = Optional.empty(); @@ -811,7 +824,8 @@ public MockConnectorFactory build() allowMissingColumnsOnInsert, tableFunctionSplitsSources, maxWriterTasks, - getLayoutForTableExecute); + getLayoutForTableExecute, + writerScalingOptions); } public static Function> defaultListSchemaNames() diff --git a/core/trino-main/src/test/java/io/trino/metadata/AbstractMockMetadata.java b/core/trino-main/src/test/java/io/trino/metadata/AbstractMockMetadata.java index 38cb84ca41fa..606dca48852b 100644 --- a/core/trino-main/src/test/java/io/trino/metadata/AbstractMockMetadata.java +++ b/core/trino-main/src/test/java/io/trino/metadata/AbstractMockMetadata.java @@ -53,6 +53,7 @@ import io.trino.spi.connector.TableFunctionApplicationResult; import io.trino.spi.connector.TableScanRedirectApplicationResult; import io.trino.spi.connector.TopNApplicationResult; +import io.trino.spi.connector.WriterScalingOptions; import io.trino.spi.expression.ConnectorExpression; import io.trino.spi.function.AggregationFunctionMetadata; import io.trino.spi.function.BoundSignature; @@ -934,4 +935,16 @@ public OptionalInt getMaxWriterTasks(Session session, String catalogName) { throw new UnsupportedOperationException(); } + + @Override + public WriterScalingOptions getNewTableWriterScalingOptions(Session session, QualifiedObjectName tableName, Map tableProperties) + { + throw new UnsupportedOperationException(); + } + + @Override + public WriterScalingOptions getInsertWriterScalingOptions(Session session, TableHandle tableHandle) + { + throw new UnsupportedOperationException(); + } } diff --git a/core/trino-main/src/test/java/io/trino/operator/TestTableWriterOperator.java b/core/trino-main/src/test/java/io/trino/operator/TestTableWriterOperator.java index 8c581dca4e83..5b59409a848e 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestTableWriterOperator.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestTableWriterOperator.java @@ -35,6 +35,7 @@ import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorTransactionHandle; import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.connector.WriterScalingOptions; import io.trino.spi.type.Type; import io.trino.split.PageSinkManager; import io.trino.sql.planner.plan.PlanNodeId; @@ -298,7 +299,8 @@ private Operator createTableWriterOperator( new ConnectorOutputTableHandle() {}), schemaTableName, false, - OptionalInt.empty()), + OptionalInt.empty(), + WriterScalingOptions.DISABLED), ImmutableList.of(0), session, statisticsAggregation, diff --git a/core/trino-main/src/test/java/io/trino/sql/planner/TestingWriterTarget.java b/core/trino-main/src/test/java/io/trino/sql/planner/TestingWriterTarget.java index a7ea3662a885..636c5480a84a 100644 --- a/core/trino-main/src/test/java/io/trino/sql/planner/TestingWriterTarget.java +++ b/core/trino-main/src/test/java/io/trino/sql/planner/TestingWriterTarget.java @@ -16,6 +16,7 @@ import io.trino.Session; import io.trino.metadata.Metadata; +import io.trino.spi.connector.WriterScalingOptions; import io.trino.sql.planner.plan.TableWriterNode; import java.util.OptionalInt; @@ -40,4 +41,10 @@ public OptionalInt getMaxWriterTasks(Metadata metadata, Session session) { return OptionalInt.empty(); } + + @Override + public WriterScalingOptions getWriterScalingOptions(Metadata metadata, Session session) + { + return WriterScalingOptions.DISABLED; + } } diff --git a/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/TestRemoveEmptyMergeWriterRuleSet.java b/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/TestRemoveEmptyMergeWriterRuleSet.java index fa70ef647aff..639732dff4ab 100644 --- a/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/TestRemoveEmptyMergeWriterRuleSet.java +++ b/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/TestRemoveEmptyMergeWriterRuleSet.java @@ -15,6 +15,7 @@ import io.trino.spi.connector.CatalogHandle; import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.connector.WriterScalingOptions; import io.trino.sql.planner.Partitioning; import io.trino.sql.planner.PartitioningScheme; import io.trino.sql.planner.Symbol; @@ -89,7 +90,7 @@ private void testRemoveEmptyMergeRewrite(Rule rule, boolean pla List.of(rowCount)); return p.tableFinish( planWithExchange ? withExchange(p, merge, rowCount) : merge, - p.createTarget(catalogHandle, schemaTableName, true), + p.createTarget(catalogHandle, schemaTableName, true, WriterScalingOptions.ENABLED), rowCount); }) .matches(values("A")); diff --git a/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/test/PlanBuilder.java b/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/test/PlanBuilder.java index 7d431f8d0c4e..2463d5a313ef 100644 --- a/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/test/PlanBuilder.java +++ b/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/test/PlanBuilder.java @@ -33,6 +33,7 @@ import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.SortOrder; +import io.trino.spi.connector.WriterScalingOptions; import io.trino.spi.function.table.ConnectorTableFunctionHandle; import io.trino.spi.predicate.TupleDomain; import io.trino.spi.type.Type; @@ -716,7 +717,7 @@ public TableFinishNode tableWithExchangeCreate(WriterTarget target, PlanNode sou rowCountSymbol); } - public CreateTarget createTarget(CatalogHandle catalogHandle, SchemaTableName schemaTableName, boolean multipleWritersPerPartitionSupported, OptionalInt maxWriterTasks) + public CreateTarget createTarget(CatalogHandle catalogHandle, SchemaTableName schemaTableName, boolean multipleWritersPerPartitionSupported, OptionalInt maxWriterTasks, WriterScalingOptions writerScalingOptions) { OutputTableHandle tableHandle = new OutputTableHandle( catalogHandle, @@ -727,12 +728,13 @@ public CreateTarget createTarget(CatalogHandle catalogHandle, SchemaTableName sc tableHandle, schemaTableName, multipleWritersPerPartitionSupported, - maxWriterTasks); + maxWriterTasks, + writerScalingOptions); } - public CreateTarget createTarget(CatalogHandle catalogHandle, SchemaTableName schemaTableName, boolean multipleWritersPerPartitionSupported) + public CreateTarget createTarget(CatalogHandle catalogHandle, SchemaTableName schemaTableName, boolean multipleWritersPerPartitionSupported, WriterScalingOptions writerScalingOptions) { - return createTarget(catalogHandle, schemaTableName, multipleWritersPerPartitionSupported, OptionalInt.empty()); + return createTarget(catalogHandle, schemaTableName, multipleWritersPerPartitionSupported, OptionalInt.empty(), writerScalingOptions); } public MergeWriterNode merge(SchemaTableName schemaTableName, PlanNode mergeSource, Symbol mergeRow, Symbol rowId, List outputs) @@ -1206,7 +1208,8 @@ public TableExecuteNode tableExecute( TestingTransactionHandle.create(), new TestingTableExecuteHandle()), Optional.empty(), - new SchemaTableName("schemaName", "tableName")), + new SchemaTableName("schemaName", "tableName"), + WriterScalingOptions.DISABLED), symbol("partialrows", BIGINT), symbol("fragment", VARBINARY), columns, diff --git a/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestAddExchangesScaledWriters.java b/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestAddExchangesScaledWriters.java index 7ac201fd02b9..9a24c905a2a1 100644 --- a/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestAddExchangesScaledWriters.java +++ b/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestAddExchangesScaledWriters.java @@ -18,6 +18,7 @@ import io.trino.Session; import io.trino.connector.MockConnectorFactory; import io.trino.plugin.tpch.TpchConnectorFactory; +import io.trino.spi.connector.WriterScalingOptions; import io.trino.sql.planner.LogicalPlanner; import io.trino.sql.planner.SubPlan; import io.trino.sql.planner.assertions.BasePlanTest; @@ -42,15 +43,17 @@ protected LocalQueryRunner createLocalQueryRunner() .build(); LocalQueryRunner queryRunner = LocalQueryRunner.create(session); queryRunner.createCatalog("tpch", new TpchConnectorFactory(1), ImmutableMap.of()); - queryRunner.createCatalog("catalog", createConnectorFactory("catalog"), ImmutableMap.of()); + queryRunner.createCatalog("catalog_with_scaled_writers", createConnectorFactory("catalog_with_scaled_writers", true), ImmutableMap.of()); + queryRunner.createCatalog("catalog_without_scaled_writers", createConnectorFactory("catalog_without_scaled_writers", false), ImmutableMap.of()); return queryRunner; } - private MockConnectorFactory createConnectorFactory(String name) + private MockConnectorFactory createConnectorFactory(String name, boolean writerScalingEnabledAcrossTasks) { return MockConnectorFactory.builder() .withGetTableHandle(((session, schemaTableName) -> null)) .withName(name) + .withWriterScalingOptions(new WriterScalingOptions(writerScalingEnabledAcrossTasks, true)) .build(); } @@ -68,7 +71,7 @@ public void testScaledWriters(boolean isScaleWritersEnabled) .build(); @Language("SQL") - String query = "CREATE TABLE catalog.mock.test AS SELECT * FROM tpch.tiny.nation"; + String query = "CREATE TABLE catalog_with_scaled_writers.mock.test AS SELECT * FROM tpch.tiny.nation"; SubPlan subPlan = subplan(query, LogicalPlanner.Stage.OPTIMIZED_AND_VALIDATED, false, session); if (isScaleWritersEnabled) { assertThat(subPlan.getAllFragments().get(1).getPartitioning().getConnectorHandle()).isEqualTo(SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION.getConnectorHandle()); @@ -78,4 +81,18 @@ public void testScaledWriters(boolean isScaleWritersEnabled) fragment -> assertThat(fragment.getPartitioning().getConnectorHandle()).isNotEqualTo(SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION.getConnectorHandle())); } } + + @Test(dataProvider = "scale_writers") + public void testScaledWritersWithTasksScalingDisabled(boolean isScaleWritersEnabled) + { + Session session = testSessionBuilder() + .setSystemProperty("scale_writers", Boolean.toString(isScaleWritersEnabled)) + .build(); + + @Language("SQL") + String query = "CREATE TABLE catalog_without_scaled_writers.mock.test AS SELECT * FROM tpch.tiny.nation"; + SubPlan subPlan = subplan(query, LogicalPlanner.Stage.OPTIMIZED_AND_VALIDATED, false, session); + subPlan.getAllFragments().forEach( + fragment -> assertThat(fragment.getPartitioning().getConnectorHandle()).isNotEqualTo(SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION.getConnectorHandle())); + } } diff --git a/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestAddLocalExchangesForTaskScaleWriters.java b/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestAddLocalExchangesForTaskScaleWriters.java index a46ade141e6e..0b7224b0902f 100644 --- a/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestAddLocalExchangesForTaskScaleWriters.java +++ b/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestAddLocalExchangesForTaskScaleWriters.java @@ -27,6 +27,7 @@ import io.trino.spi.connector.ConnectorPartitioningHandle; import io.trino.spi.connector.ConnectorTableLayout; import io.trino.spi.connector.TableProcedureMetadata; +import io.trino.spi.connector.WriterScalingOptions; import io.trino.spi.session.PropertyMetadata; import io.trino.spi.statistics.ColumnStatistics; import io.trino.spi.statistics.Estimate; @@ -75,19 +76,24 @@ protected LocalQueryRunner createLocalQueryRunner() { LocalQueryRunner queryRunner = LocalQueryRunner.create(testSessionBuilder().build()); queryRunner.createCatalog( - "mock_without_multiple_writer_per_partition", - createConnectorFactory("mock_without_multiple_writer_per_partition", false), + "mock_with_scaled_writers", + createConnectorFactory("mock_with_scaled_writers", true, true), + ImmutableMap.of()); + queryRunner.createCatalog( + "mock_without_scaled_writers", + createConnectorFactory("mock_without_scaled_writers", true, false), ImmutableMap.of()); queryRunner.createCatalog( - "mock_with_multiple_writer_per_partition", - createConnectorFactory("mock_with_multiple_writer_per_partition", true), + "mock_without_multiple_writer_per_partition", + createConnectorFactory("mock_without_multiple_writer_per_partition", false, true), ImmutableMap.of()); return queryRunner; } private MockConnectorFactory createConnectorFactory( String catalogHandle, - boolean supportsMultipleWritersPerPartition) + boolean supportsMultipleWritersPerPartition, + boolean writerScalingEnabledWithinTask) { return MockConnectorFactory.builder() .withGetTableHandle(((session, tableName) -> { @@ -99,6 +105,7 @@ private MockConnectorFactory createConnectorFactory( } return null; })) + .withWriterScalingOptions(new WriterScalingOptions(true, writerScalingEnabledWithinTask)) .withGetTableStatistics(tableName -> { if (tableName.getTableName().equals("source_table")) { return new TableStatistics( @@ -175,6 +182,42 @@ public void testLocalScaledUnpartitionedWriterDistribution() tableScan("source_table", ImmutableMap.of("customer", "customer", "year", "year"))))))); } + @Test + public void testLocalScaledUnpartitionedWriterWithPerTaskScalingDisabled() + { + assertDistributedPlan( + "INSERT INTO unpartitioned_table SELECT * FROM source_table", + testSessionBuilder() + .setCatalog("mock_without_scaled_writers") + .setSchema("mock") + .setSystemProperty(TASK_SCALE_WRITERS_ENABLED, "true") + .setSystemProperty(SCALE_WRITERS, "false") + .build(), + anyTree( + tableWriter( + ImmutableList.of("customer", "year"), + ImmutableList.of("customer", "year"), + exchange(LOCAL, GATHER, SINGLE_DISTRIBUTION, + exchange(REMOTE, REPARTITION, FIXED_ARBITRARY_DISTRIBUTION, + tableScan("source_table", ImmutableMap.of("customer", "customer", "year", "year"))))))); + + assertDistributedPlan( + "INSERT INTO unpartitioned_table SELECT * FROM source_table", + testSessionBuilder() + .setCatalog("mock_without_scaled_writers") + .setSchema("mock") + .setSystemProperty(TASK_SCALE_WRITERS_ENABLED, "false") + .setSystemProperty(SCALE_WRITERS, "false") + .build(), + anyTree( + tableWriter( + ImmutableList.of("customer", "year"), + ImmutableList.of("customer", "year"), + exchange(LOCAL, GATHER, SINGLE_DISTRIBUTION, + exchange(REMOTE, REPARTITION, FIXED_ARBITRARY_DISTRIBUTION, + tableScan("source_table", ImmutableMap.of("customer", "customer", "year", "year"))))))); + } + @Test(dataProvider = "taskScaleWritersOption") public void testLocalScaledPartitionedWriterWithoutSupportForMultipleWritersPerPartition(boolean taskScaleWritersEnabled) { @@ -201,6 +244,32 @@ public void testLocalScaledPartitionedWriterWithoutSupportForMultipleWritersPerP tableScan("source_table", ImmutableMap.of("customer", "customer", "year", "year"))))))); } + @Test(dataProvider = "taskScaleWritersOption") + public void testLocalScaledPartitionedWriterWithPerTaskScalingDisabled(boolean taskScaleWritersEnabled) + { + String catalogName = "mock_without_scaled_writers"; + PartitioningHandle partitioningHandle = new PartitioningHandle( + Optional.of(getCatalogHandle(catalogName)), + Optional.of(MockConnectorTransactionHandle.INSTANCE), + CONNECTOR_PARTITIONING_HANDLE); + + assertDistributedPlan( + "INSERT INTO connector_partitioned_table SELECT * FROM source_table", + testSessionBuilder() + .setCatalog(catalogName) + .setSchema("mock") + .setSystemProperty(TASK_SCALE_WRITERS_ENABLED, String.valueOf(taskScaleWritersEnabled)) + .setSystemProperty(SCALE_WRITERS, "false") + .build(), + anyTree( + tableWriter( + ImmutableList.of("customer", "year"), + ImmutableList.of("customer", "year"), + exchange(LOCAL, REPARTITION, partitioningHandle, + exchange(REMOTE, REPARTITION, partitioningHandle, + tableScan("source_table", ImmutableMap.of("customer", "customer", "year", "year"))))))); + } + @DataProvider public Object[][] taskScaleWritersOption() { @@ -213,7 +282,7 @@ public void testLocalScaledPartitionedWriterForSystemPartitioningWithEnforcedPre assertDistributedPlan( "INSERT INTO system_partitioned_table SELECT * FROM source_table", testSessionBuilder() - .setCatalog("mock_with_multiple_writer_per_partition") + .setCatalog("mock_with_scaled_writers") .setSchema("mock") // Enforce preferred partitioning .setSystemProperty(USE_PREFERRED_WRITE_PARTITIONING, "true") @@ -233,7 +302,7 @@ public void testLocalScaledPartitionedWriterForSystemPartitioningWithEnforcedPre assertDistributedPlan( "INSERT INTO system_partitioned_table SELECT * FROM source_table", testSessionBuilder() - .setCatalog("mock_with_multiple_writer_per_partition") + .setCatalog("mock_with_scaled_writers") .setSchema("mock") // Enforce preferred partitioning .setSystemProperty(USE_PREFERRED_WRITE_PARTITIONING, "true") @@ -254,7 +323,7 @@ public void testLocalScaledPartitionedWriterForSystemPartitioningWithEnforcedPre @Test public void testLocalScaledPartitionedWriterForConnectorPartitioning() { - String catalogName = "mock_with_multiple_writer_per_partition"; + String catalogName = "mock_with_scaled_writers"; PartitioningHandle partitioningHandle = new PartitioningHandle( Optional.of(getCatalogHandle(catalogName)), Optional.of(MockConnectorTransactionHandle.INSTANCE), @@ -304,7 +373,7 @@ public void testLocalScaledPartitionedWriterWithEnforcedLocalPreferredPartitioni assertDistributedPlan( "INSERT INTO system_partitioned_table SELECT * FROM source_table", testSessionBuilder() - .setCatalog("mock_with_multiple_writer_per_partition") + .setCatalog("mock_with_scaled_writers") .setSchema("mock") .setSystemProperty(TASK_SCALE_WRITERS_ENABLED, "true") .setSystemProperty(SCALE_WRITERS, "false") @@ -322,7 +391,7 @@ public void testLocalScaledPartitionedWriterWithEnforcedLocalPreferredPartitioni assertDistributedPlan( "INSERT INTO system_partitioned_table SELECT * FROM source_table", testSessionBuilder() - .setCatalog("mock_with_multiple_writer_per_partition") + .setCatalog("mock_with_scaled_writers") .setSchema("mock") .setSystemProperty(TASK_SCALE_WRITERS_ENABLED, "false") .setSystemProperty(SCALE_WRITERS, "false") @@ -344,7 +413,7 @@ public void testTableExecuteLocalScalingDisabledForPartitionedTable() @Language("SQL") String query = "ALTER TABLE system_partitioned_table EXECUTE optimize(file_size_threshold => '10MB')"; Session session = Session.builder(getQueryRunner().getDefaultSession()) - .setCatalog("mock_with_multiple_writer_per_partition") + .setCatalog("mock_with_scaled_writers") .setSchema("mock") .setSystemProperty(TASK_SCALE_WRITERS_ENABLED, "true") .build(); @@ -367,7 +436,7 @@ public void testTableExecuteLocalScalingDisabledForUnpartitionedTable() @Language("SQL") String query = "ALTER TABLE unpartitioned_table EXECUTE optimize(file_size_threshold => '10MB')"; Session session = Session.builder(getQueryRunner().getDefaultSession()) - .setCatalog("mock_with_multiple_writer_per_partition") + .setCatalog("mock_with_scaled_writers") .setSchema("mock") .setSystemProperty(TASK_SCALE_WRITERS_ENABLED, "true") .build(); diff --git a/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestLimitMaxWriterNodesCount.java b/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestLimitMaxWriterNodesCount.java index 4cd5064693ef..68dc833fdc0f 100644 --- a/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestLimitMaxWriterNodesCount.java +++ b/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestLimitMaxWriterNodesCount.java @@ -25,6 +25,7 @@ import io.trino.spi.connector.ConnectorPartitioningHandle; import io.trino.spi.connector.ConnectorTableLayout; import io.trino.spi.connector.TableProcedureMetadata; +import io.trino.spi.connector.WriterScalingOptions; import io.trino.spi.session.PropertyMetadata; import io.trino.sql.planner.SystemPartitioningHandle; import io.trino.sql.planner.TestTableScanNodePartitioning; @@ -99,6 +100,7 @@ private MockConnectorFactory prepareConnectorFactory(String catalogName, Optiona } return null; })) + .withWriterScalingOptions(WriterScalingOptions.ENABLED) .withGetInsertLayout((session, tableMetadata) -> { if (tableMetadata.getTableName().equals(partitionedTable)) { return Optional.of(new ConnectorTableLayout(ImmutableList.of("column_a"))); diff --git a/core/trino-main/src/test/java/io/trino/sql/planner/sanity/TestValidateScaledWritersUsage.java b/core/trino-main/src/test/java/io/trino/sql/planner/sanity/TestValidateScaledWritersUsage.java index 19a3e4931da0..1575e0ac3f81 100644 --- a/core/trino-main/src/test/java/io/trino/sql/planner/sanity/TestValidateScaledWritersUsage.java +++ b/core/trino-main/src/test/java/io/trino/sql/planner/sanity/TestValidateScaledWritersUsage.java @@ -24,6 +24,7 @@ import io.trino.spi.connector.ConnectorPartitioningHandle; import io.trino.spi.connector.ConnectorTransactionHandle; import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.connector.WriterScalingOptions; import io.trino.sql.PlannerContext; import io.trino.sql.planner.Partitioning; import io.trino.sql.planner.PartitioningHandle; @@ -33,6 +34,7 @@ import io.trino.sql.planner.TypeProvider; import io.trino.sql.planner.assertions.BasePlanTest; import io.trino.sql.planner.iterative.rule.test.PlanBuilder; +import io.trino.sql.planner.plan.ExchangeNode; import io.trino.sql.planner.plan.PlanNode; import io.trino.sql.planner.plan.TableScanNode; import io.trino.testing.LocalQueryRunner; @@ -117,12 +119,60 @@ public void testScaledWritersUsedAndTargetSupportsIt(PartitioningHandle scaledWr PlanNode root = planBuilder.output( outputBuilder -> outputBuilder .source(planBuilder.tableWithExchangeCreate( - planBuilder.createTarget(catalog, schemaTableName, true), + planBuilder.createTarget(catalog, schemaTableName, true, WriterScalingOptions.ENABLED), tableWriterSource, symbol))); validatePlan(root); } + @Test(dataProvider = "scaledWriterPartitioningHandles") + public void testScaledWritersUsedAndTargetDoesNotSupportScalingPerTask(PartitioningHandle scaledWriterPartitionHandle) + { + PlanNode tableWriterSource = planBuilder.exchange(ex -> + ex + .partitioningScheme(new PartitioningScheme(Partitioning.create(scaledWriterPartitionHandle, ImmutableList.of()), ImmutableList.of(symbol))) + .addInputsSet(symbol) + .addSource(planBuilder.exchange(innerExchange -> + innerExchange + .scope(ExchangeNode.Scope.LOCAL) + .partitioningScheme(new PartitioningScheme(Partitioning.create(scaledWriterPartitionHandle, ImmutableList.of()), ImmutableList.of(symbol))) + .addInputsSet(symbol) + .addSource(tableScanNode)))); + PlanNode root = planBuilder.output( + outputBuilder -> outputBuilder + .source(planBuilder.tableWithExchangeCreate( + planBuilder.createTarget(catalog, schemaTableName, true, new WriterScalingOptions(true, false)), + tableWriterSource, + symbol))); + assertThatThrownBy(() -> validatePlan(root)) + .isInstanceOf(IllegalStateException.class) + .hasMessage("The scaled writer per task partitioning scheme is set but writer target catalog:INSTANCE doesn't support it"); + } + + @Test(dataProvider = "scaledWriterPartitioningHandles") + public void testScaledWritersUsedAndTargetDoesNotSupportScalingAcrossTasks(PartitioningHandle scaledWriterPartitionHandle) + { + PlanNode tableWriterSource = planBuilder.exchange(ex -> + ex + .partitioningScheme(new PartitioningScheme(Partitioning.create(scaledWriterPartitionHandle, ImmutableList.of()), ImmutableList.of(symbol))) + .addInputsSet(symbol) + .addSource(planBuilder.exchange(innerExchange -> + innerExchange + .scope(ExchangeNode.Scope.REMOTE) + .partitioningScheme(new PartitioningScheme(Partitioning.create(scaledWriterPartitionHandle, ImmutableList.of()), ImmutableList.of(symbol))) + .addInputsSet(symbol) + .addSource(tableScanNode)))); + PlanNode root = planBuilder.output( + outputBuilder -> outputBuilder + .source(planBuilder.tableWithExchangeCreate( + planBuilder.createTarget(catalog, schemaTableName, true, new WriterScalingOptions(false, true)), + tableWriterSource, + symbol))); + assertThatThrownBy(() -> validatePlan(root)) + .isInstanceOf(IllegalStateException.class) + .hasMessage("The scaled writer across tasks partitioning scheme is set but writer target catalog:INSTANCE doesn't support it"); + } + @Test(dataProvider = "scaledWriterPartitioningHandles") public void testScaledWriterUsedAndTargetDoesNotSupportMultipleWritersPerPartition(PartitioningHandle scaledWriterPartitionHandle) { @@ -138,7 +188,7 @@ public void testScaledWriterUsedAndTargetDoesNotSupportMultipleWritersPerPartiti PlanNode root = planBuilder.output( outputBuilder -> outputBuilder .source(planBuilder.tableWithExchangeCreate( - planBuilder.createTarget(catalog, schemaTableName, false), + planBuilder.createTarget(catalog, schemaTableName, false, WriterScalingOptions.ENABLED), tableWriterSource, symbol))); @@ -173,7 +223,7 @@ public void testScaledWriterWithMultipleSourceExchangesAndTargetDoesNotSupportMu PlanNode root = planBuilder.output( outputBuilder -> outputBuilder .source(planBuilder.tableWithExchangeCreate( - planBuilder.createTarget(catalog, schemaTableName, false), + planBuilder.createTarget(catalog, schemaTableName, false, WriterScalingOptions.ENABLED), tableWriterSource, symbol))); diff --git a/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java index e838032a021a..17b23b05d37c 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java +++ b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java @@ -1604,6 +1604,16 @@ default OptionalInt getMaxWriterTasks(ConnectorSession session) return OptionalInt.empty(); } + default WriterScalingOptions getNewTableWriterScalingOptions(ConnectorSession session, SchemaTableName tableName, Map tableProperties) + { + return WriterScalingOptions.DISABLED; + } + + default WriterScalingOptions getInsertWriterScalingOptions(ConnectorSession session, ConnectorTableHandle tableHandle) + { + return WriterScalingOptions.DISABLED; + } + final class Helper { private Helper() {} diff --git a/core/trino-spi/src/main/java/io/trino/spi/connector/WriterScalingOptions.java b/core/trino-spi/src/main/java/io/trino/spi/connector/WriterScalingOptions.java new file mode 100644 index 000000000000..52acb3632092 --- /dev/null +++ b/core/trino-spi/src/main/java/io/trino/spi/connector/WriterScalingOptions.java @@ -0,0 +1,63 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.spi.connector; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Optional; + +import static java.util.Objects.requireNonNull; + +public record WriterScalingOptions(boolean isWriterTasksScalingEnabled, boolean isPerTaskWriterScalingEnabled, Optional perTaskMaxScaledWriterCount) +{ + public static final WriterScalingOptions DISABLED = new WriterScalingOptions(false, false); + public static final WriterScalingOptions ENABLED = new WriterScalingOptions(true, true); + + public WriterScalingOptions(boolean writerTasksScalingEnabled, boolean perTaskWriterScalingEnabled) + { + this(writerTasksScalingEnabled, perTaskWriterScalingEnabled, Optional.empty()); + } + + @JsonCreator + public WriterScalingOptions( + @JsonProperty boolean isWriterTasksScalingEnabled, + @JsonProperty boolean isPerTaskWriterScalingEnabled, + @JsonProperty Optional perTaskMaxScaledWriterCount) + { + this.isWriterTasksScalingEnabled = isWriterTasksScalingEnabled; + this.isPerTaskWriterScalingEnabled = isPerTaskWriterScalingEnabled; + this.perTaskMaxScaledWriterCount = requireNonNull(perTaskMaxScaledWriterCount, "perTaskMaxScaledWriterCount is null"); + } + + @Override + @JsonProperty + public boolean isWriterTasksScalingEnabled() + { + return isWriterTasksScalingEnabled; + } + + @Override + @JsonProperty + public boolean isPerTaskWriterScalingEnabled() + { + return isPerTaskWriterScalingEnabled; + } + + @JsonProperty + public Optional perTaskMaxScaledWriterCount() + { + return perTaskMaxScaledWriterCount; + } +} diff --git a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java index 01d52b1374a0..e80c958ac10a 100644 --- a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java +++ b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java @@ -63,6 +63,7 @@ import io.trino.spi.connector.TableFunctionApplicationResult; import io.trino.spi.connector.TableScanRedirectApplicationResult; import io.trino.spi.connector.TopNApplicationResult; +import io.trino.spi.connector.WriterScalingOptions; import io.trino.spi.expression.ConnectorExpression; import io.trino.spi.function.AggregationFunctionMetadata; import io.trino.spi.function.BoundSignature; @@ -1136,4 +1137,20 @@ public OptionalInt getMaxWriterTasks(ConnectorSession session) return delegate.getMaxWriterTasks(session); } } + + @Override + public WriterScalingOptions getNewTableWriterScalingOptions(ConnectorSession session, SchemaTableName tableName, Map tableProperties) + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + return delegate.getNewTableWriterScalingOptions(session, tableName, tableProperties); + } + } + + @Override + public WriterScalingOptions getInsertWriterScalingOptions(ConnectorSession session, ConnectorTableHandle tableHandle) + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + return delegate.getInsertWriterScalingOptions(session, tableHandle); + } + } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index 899cd3681643..4c55aa5c3ee5 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -109,6 +109,7 @@ import io.trino.spi.connector.TableColumnsMetadata; import io.trino.spi.connector.TableNotFoundException; import io.trino.spi.connector.TableScanRedirectApplicationResult; +import io.trino.spi.connector.WriterScalingOptions; import io.trino.spi.expression.ConnectorExpression; import io.trino.spi.expression.Variable; import io.trino.spi.predicate.Domain; @@ -3148,6 +3149,18 @@ private Optional getRawSystemTable(SchemaTableName systemTableName) }; } + @Override + public WriterScalingOptions getNewTableWriterScalingOptions(ConnectorSession session, SchemaTableName tableName, Map tableProperties) + { + return WriterScalingOptions.ENABLED; + } + + @Override + public WriterScalingOptions getInsertWriterScalingOptions(ConnectorSession session, ConnectorTableHandle tableHandle) + { + return WriterScalingOptions.ENABLED; + } + private static Map toDeltaLakeColumnStatistics(Collection computedStatistics) { // Only statistics for whole table are collected diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java index 943b40d8be29..29ab11dd2297 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java @@ -102,6 +102,7 @@ import io.trino.spi.connector.TableNotFoundException; import io.trino.spi.connector.TableScanRedirectApplicationResult; import io.trino.spi.connector.ViewNotFoundException; +import io.trino.spi.connector.WriterScalingOptions; import io.trino.spi.expression.ConnectorExpression; import io.trino.spi.expression.Variable; import io.trino.spi.predicate.Domain; @@ -3844,6 +3845,18 @@ private Optional redirectTableToHudi(Optional ta return Optional.empty(); } + @Override + public WriterScalingOptions getNewTableWriterScalingOptions(ConnectorSession session, SchemaTableName tableName, Map tableProperties) + { + return WriterScalingOptions.ENABLED; + } + + @Override + public WriterScalingOptions getInsertWriterScalingOptions(ConnectorSession session, ConnectorTableHandle tableHandle) + { + return WriterScalingOptions.ENABLED; + } + private static TableNameSplitResult splitTableName(String tableName) { int metadataMarkerIndex = tableName.lastIndexOf('$'); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index cf57078bdbc6..eec0ce128bb0 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -88,6 +88,7 @@ import io.trino.spi.connector.SystemTable; import io.trino.spi.connector.TableColumnsMetadata; import io.trino.spi.connector.TableNotFoundException; +import io.trino.spi.connector.WriterScalingOptions; import io.trino.spi.expression.ConnectorExpression; import io.trino.spi.expression.FunctionName; import io.trino.spi.expression.Variable; @@ -2884,6 +2885,18 @@ public Optional redirectTable(ConnectorSession session, return catalog.redirectTable(session, tableName, targetCatalogName.get()); } + @Override + public WriterScalingOptions getNewTableWriterScalingOptions(ConnectorSession session, SchemaTableName tableName, Map tableProperties) + { + return WriterScalingOptions.ENABLED; + } + + @Override + public WriterScalingOptions getInsertWriterScalingOptions(ConnectorSession session, ConnectorTableHandle tableHandle) + { + return WriterScalingOptions.ENABLED; + } + private static CollectedStatistics processComputedTableStatistics(Table table, Collection computedStatistics) { Map columnNameToId = table.schema().columns().stream()