diff --git a/core/trino-main/src/main/java/io/trino/metadata/Metadata.java b/core/trino-main/src/main/java/io/trino/metadata/Metadata.java index 92f79f46d31e..326de733d15e 100644 --- a/core/trino-main/src/main/java/io/trino/metadata/Metadata.java +++ b/core/trino-main/src/main/java/io/trino/metadata/Metadata.java @@ -47,6 +47,7 @@ import io.trino.spi.connector.TableFunctionApplicationResult; import io.trino.spi.connector.TableScanRedirectApplicationResult; import io.trino.spi.connector.TopNApplicationResult; +import io.trino.spi.connector.WriterScalingOptions; import io.trino.spi.expression.ConnectorExpression; import io.trino.spi.function.AggregationFunctionMetadata; import io.trino.spi.function.FunctionMetadata; @@ -751,4 +752,14 @@ default boolean isMaterializedView(Session session, QualifiedObjectName viewName * Note: It is ignored when retry policy is set to TASK */ OptionalInt getMaxWriterTasks(Session session, String catalogName); + + /** + * Returns writer scaling options for the specified table. This method is called when the table handle is not yet available, e.g. during CTAS. + */ + WriterScalingOptions getNewTableWriterScalingOptions(Session session, QualifiedObjectName tableName, Map<String, Object> tableProperties); + + /** + * Returns writer scaling options for the specified table. + */ + WriterScalingOptions getInsertWriterScalingOptions(Session session, TableHandle tableHandle); } diff --git a/core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java b/core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java index 942027c99026..a7b2e1ed75c1 100644 --- a/core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java +++ b/core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java @@ -83,6 +83,7 @@ import io.trino.spi.connector.TableFunctionApplicationResult; import io.trino.spi.connector.TableScanRedirectApplicationResult; import io.trino.spi.connector.TopNApplicationResult; +import io.trino.spi.connector.WriterScalingOptions; import io.trino.spi.expression.ConnectorExpression; import io.trino.spi.expression.Variable; import io.trino.spi.function.AggregationFunctionMetadata; @@ -2671,6 +2672,22 @@ public OptionalInt getMaxWriterTasks(Session session, String catalogName) return catalogMetadata.getMetadata(session).getMaxWriterTasks(session.toConnectorSession(catalogHandle)); } + @Override + public WriterScalingOptions getNewTableWriterScalingOptions(Session session, QualifiedObjectName tableName, Map<String, Object> tableProperties) + { + CatalogMetadata catalogMetadata = getCatalogMetadataForWrite(session, tableName.getCatalogName()); + CatalogHandle catalogHandle = catalogMetadata.getCatalogHandle(session, tableName); + ConnectorMetadata metadata = catalogMetadata.getMetadataFor(session, catalogHandle); + return metadata.getNewTableWriterScalingOptions(session.toConnectorSession(catalogHandle), tableName.asSchemaTableName(), tableProperties); + } + + @Override + public WriterScalingOptions getInsertWriterScalingOptions(Session session, TableHandle tableHandle) + { + ConnectorMetadata metadata = getMetadataForWrite(session, tableHandle.getCatalogHandle()); + return metadata.getInsertWriterScalingOptions(session.toConnectorSession(tableHandle.getCatalogHandle()), tableHandle.getConnectorHandle()); + } + private Optional<ConnectorTableVersion> toConnectorVersion(Optional<TableVersion> version) { Optional<ConnectorTableVersion> connectorVersion = Optional.empty(); diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java index
44595589c953..19247305128c 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java @@ -161,6 +161,7 @@ import io.trino.spi.connector.DynamicFilter; import io.trino.spi.connector.RecordSet; import io.trino.spi.connector.SortOrder; +import io.trino.spi.connector.WriterScalingOptions; import io.trino.spi.function.AggregationImplementation; import io.trino.spi.function.BoundSignature; import io.trino.spi.function.FunctionId; @@ -379,6 +380,7 @@ import static io.trino.util.SpatialJoinUtils.ST_WITHIN; import static io.trino.util.SpatialJoinUtils.extractSupportedSpatialComparisons; import static io.trino.util.SpatialJoinUtils.extractSupportedSpatialFunctions; +import static java.lang.Math.min; import static java.lang.Math.toIntExact; import static java.lang.String.format; import static java.util.Objects.requireNonNull; @@ -3280,7 +3282,11 @@ public PhysicalOperation visitRefreshMaterializedView(RefreshMaterializedViewNod public PhysicalOperation visitTableWriter(TableWriterNode node, LocalExecutionPlanContext context) { // Set table writer count - int maxWriterCount = getWriterCount(session, node.getPartitioningScheme(), node.getSource()); + int maxWriterCount = getWriterCount( + session, + node.getTarget().getWriterScalingOptions(metadata, session), + node.getPartitioningScheme(), + node.getSource()); context.setDriverInstanceCount(maxWriterCount); context.taskContext.setMaxWriterCount(maxWriterCount); @@ -3438,7 +3444,11 @@ public PhysicalOperation visitSimpleTableExecuteNode(SimpleTableExecuteNode node public PhysicalOperation visitTableExecute(TableExecuteNode node, LocalExecutionPlanContext context) { // Set table writer count - int maxWriterCount = getWriterCount(session, node.getPartitioningScheme(), node.getSource()); + int maxWriterCount = getWriterCount( + session, + node.getTarget().getWriterScalingOptions(metadata, session), + node.getPartitioningScheme(), + node.getSource()); context.setDriverInstanceCount(maxWriterCount); context.taskContext.setMaxWriterCount(maxWriterCount); @@ -3465,7 +3475,7 @@ public PhysicalOperation visitTableExecute(TableExecuteNode node, LocalExecution return new PhysicalOperation(operatorFactory, outputMapping.buildOrThrow(), context, source); } - private int getWriterCount(Session session, Optional<PartitioningScheme> partitioningScheme, PlanNode source) + private int getWriterCount(Session session, WriterScalingOptions connectorScalingOptions, Optional<PartitioningScheme> partitioningScheme, PlanNode source) { // This check is required because we don't know which writer count to use when exchange is // single distribution. It could be possible that when scaling is enabled, a single distribution is @@ -3475,12 +3485,24 @@ private int getWriterCount(Session session, Optional<PartitioningScheme> partiti return 1; } - // The default value of partitioned writer count is 32 which is high enough to use it - // for both cases when scaling is enabled or not. Additionally, it doesn't lead to too many - // small files since when scaling is disabled only single writer will handle a single partition. - return partitioningScheme - .map(scheme -> getTaskPartitionedWriterCount(session)) - .orElseGet(() -> isLocalScaledWriterExchange(source) ? getTaskScaleWritersMaxWriterCount(session) : getTaskWriterCount(session)); + if (partitioningScheme.isPresent()) { + // The default value of partitioned writer count is 32 which is high enough to use it + // for both cases when scaling is enabled or not.
Additionally, it doesn't lead to too many + small files since when scaling is disabled only single writer will handle a single partition. + if (isLocalScaledWriterExchange(source)) { + return connectorScalingOptions.perTaskMaxScaledWriterCount() + .map(writerCount -> min(writerCount, getTaskPartitionedWriterCount(session))) + .orElse(getTaskPartitionedWriterCount(session)); + } + return getTaskPartitionedWriterCount(session); + } + + if (isLocalScaledWriterExchange(source)) { + return connectorScalingOptions.perTaskMaxScaledWriterCount() + .map(writerCount -> min(writerCount, getTaskScaleWritersMaxWriterCount(session))) + .orElse(getTaskScaleWritersMaxWriterCount(session)); + } + return getTaskWriterCount(session); } private boolean isSingleGatheringExchange(PlanNode node) diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/LogicalPlanner.java b/core/trino-main/src/main/java/io/trino/sql/planner/LogicalPlanner.java index fc04b1cb320a..198530887691 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/LogicalPlanner.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/LogicalPlanner.java @@ -944,7 +944,11 @@ private RelationPlan createTableExecutePlan(Analysis analysis, TableExecute stat .map(ColumnMetadata::getName) .collect(toImmutableList()); - TableWriterNode.TableExecuteTarget tableExecuteTarget = new TableWriterNode.TableExecuteTarget(executeHandle, Optional.empty(), tableName.asSchemaTableName()); + TableWriterNode.TableExecuteTarget tableExecuteTarget = new TableWriterNode.TableExecuteTarget( + executeHandle, + Optional.empty(), + tableName.asSchemaTableName(), + metadata.getInsertWriterScalingOptions(session, tableHandle)); Optional<TableLayout> layout = metadata.getLayoutForTableExecute(session, executeHandle); diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddExchanges.java b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddExchanges.java index 454472b6adde..9c1f9391859b 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddExchanges.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddExchanges.java @@ -33,6 +33,7 @@ import io.trino.spi.connector.CatalogHandle; import io.trino.spi.connector.GroupingProperty; import io.trino.spi.connector.LocalProperty; +import io.trino.spi.connector.WriterScalingOptions; import io.trino.sql.PlannerContext; import io.trino.sql.planner.DomainTranslator; import io.trino.sql.planner.Partitioning; @@ -760,13 +761,14 @@ public PlanWithProperties visitMergeWriter(MergeWriterNode node, PreferredProper private PlanWithProperties getWriterPlanWithProperties(Optional<PartitioningScheme> partitioningScheme, PlanWithProperties newSource, TableWriterNode.WriterTarget writerTarget) { + WriterScalingOptions scalingOptions = writerTarget.getWriterScalingOptions(plannerContext.getMetadata(), session); if (partitioningScheme.isEmpty()) { // use maxWritersTasks to set PartitioningScheme.partitionCount field to limit number of tasks that will take part in executing writing stage int maxWriterTasks = writerTarget.getMaxWriterTasks(plannerContext.getMetadata(), session).orElse(getMaxWriterTaskCount(session)); Optional<Integer> maxWritersNodesCount = getRetryPolicy(session) != RetryPolicy.TASK ?
Optional.of(Math.min(maxWriterTasks, getMaxWriterTaskCount(session))) : Optional.empty(); - if (scaleWriters) { + if (scaleWriters && scalingOptions.isWriterTasksScalingEnabled()) { partitioningScheme = Optional.of(new PartitioningScheme(Partitioning.create(SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION, ImmutableList.of()), newSource.getNode().getOutputSymbols(), Optional.empty(), false, Optional.empty(), maxWritersNodesCount)); } else if (redistributeWrites) { @@ -774,6 +776,7 @@ else if (redistributeWrites) { } } else if (scaleWriters + && scalingOptions.isWriterTasksScalingEnabled() && writerTarget.supportsMultipleWritersPerPartition(plannerContext.getMetadata(), session) // do not insert an exchange if partitioning is compatible && !newSource.getProperties().isCompatibleTablePartitioningWith(partitioningScheme.get().getPartitioning(), false, plannerContext.getMetadata(), session)) { diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddLocalExchanges.java b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddLocalExchanges.java index 952f7ecc590d..73fc9743d9e2 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddLocalExchanges.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddLocalExchanges.java @@ -23,6 +23,7 @@ import io.trino.spi.connector.ConstantProperty; import io.trino.spi.connector.GroupingProperty; import io.trino.spi.connector.LocalProperty; +import io.trino.spi.connector.WriterScalingOptions; import io.trino.sql.PlannerContext; import io.trino.sql.planner.Partitioning; import io.trino.sql.planner.PartitioningHandle; @@ -686,13 +687,14 @@ public PlanWithProperties visitSimpleTableExecuteNode(SimpleTableExecuteNode nod @Override public PlanWithProperties visitTableWriter(TableWriterNode node, StreamPreferredProperties parentPreferences) { + WriterScalingOptions scalingOptions = node.getTarget().getWriterScalingOptions(plannerContext.getMetadata(), session); return visitTableWriter( node, node.getPartitioningScheme(), node.getSource(), parentPreferences, node.getTarget(), - isTaskScaleWritersEnabled(session)); + isTaskScaleWritersEnabled(session) && scalingOptions.isPerTaskWriterScalingEnabled()); } @Override diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/BeginTableWrite.java b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/BeginTableWrite.java index ff2d94831fb6..33d8fd70e2fa 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/BeginTableWrite.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/BeginTableWrite.java @@ -208,7 +208,8 @@ public WriterTarget getWriterTarget(PlanNode node) return new TableExecuteTarget( target.getExecuteHandle(), findTableScanHandleForTableExecute(((TableExecuteNode) node).getSource()), - target.getSchemaTableName()); + target.getSchemaTableName(), + target.getWriterScalingOptions()); } if (node instanceof MergeWriterNode mergeWriterNode) { @@ -244,14 +245,16 @@ private WriterTarget createWriterTarget(WriterTarget target) metadata.beginCreateTable(session, create.getCatalog(), create.getTableMetadata(), create.getLayout()), create.getTableMetadata().getTable(), target.supportsMultipleWritersPerPartition(metadata, session), - target.getMaxWriterTasks(metadata, session)); + target.getMaxWriterTasks(metadata, session), + target.getWriterScalingOptions(metadata, session)); } if (target instanceof InsertReference insert) { return new InsertTarget( 
metadata.beginInsert(session, insert.getHandle(), insert.getColumns()), metadata.getTableName(session, insert.getHandle()).getSchemaTableName(), target.supportsMultipleWritersPerPartition(metadata, session), - target.getMaxWriterTasks(metadata, session)); + target.getMaxWriterTasks(metadata, session), + target.getWriterScalingOptions(metadata, session)); } if (target instanceof MergeTarget merge) { MergeHandle mergeHandle = metadata.beginMerge(session, merge.getHandle()); @@ -266,11 +269,12 @@ private WriterTarget createWriterTarget(WriterTarget target) refreshMV.getStorageTableHandle(), metadata.beginRefreshMaterializedView(session, refreshMV.getStorageTableHandle(), refreshMV.getSourceTableHandles()), metadata.getTableName(session, refreshMV.getStorageTableHandle()).getSchemaTableName(), - refreshMV.getSourceTableHandles()); + refreshMV.getSourceTableHandles(), + refreshMV.getWriterScalingOptions(metadata, session)); } if (target instanceof TableExecuteTarget tableExecute) { BeginTableExecuteResult<TableExecuteHandle, TableHandle> result = metadata.beginTableExecute(session, tableExecute.getExecuteHandle(), tableExecute.getMandatorySourceHandle()); - return new TableExecuteTarget(result.getTableExecuteHandle(), Optional.of(result.getSourceHandle()), tableExecute.getSchemaTableName()); + return new TableExecuteTarget(result.getTableExecuteHandle(), Optional.of(result.getSourceHandle()), tableExecute.getSchemaTableName(), tableExecute.getWriterScalingOptions()); } throw new IllegalArgumentException("Unhandled target type: " + target.getClass().getSimpleName()); } diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/plan/TableWriterNode.java b/core/trino-main/src/main/java/io/trino/sql/planner/plan/TableWriterNode.java index c8cf83aebbd7..099ae3465f96 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/plan/TableWriterNode.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/plan/TableWriterNode.java @@ -26,6 +26,7 @@ import io.trino.metadata.MergeHandle; import io.trino.metadata.Metadata; import io.trino.metadata.OutputTableHandle; +import io.trino.metadata.QualifiedObjectName; import io.trino.metadata.TableExecuteHandle; import io.trino.metadata.TableHandle; import io.trino.metadata.TableLayout; @@ -33,6 +34,7 @@ import io.trino.spi.connector.ConnectorTableMetadata; import io.trino.spi.connector.RowChangeParadigm; import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.connector.WriterScalingOptions; import io.trino.spi.type.Type; import io.trino.sql.planner.PartitioningScheme; import io.trino.sql.planner.Symbol; @@ -195,6 +197,8 @@ public abstract static class WriterTarget public abstract boolean supportsMultipleWritersPerPartition(Metadata metadata, Session session); public abstract OptionalInt getMaxWriterTasks(Metadata metadata, Session session); + + public abstract WriterScalingOptions getWriterScalingOptions(Metadata metadata, Session session); } // only used during planning -- will not be serialized @@ -229,6 +233,16 @@ public OptionalInt getMaxWriterTasks(Metadata metadata, Session session) return metadata.getMaxWriterTasks(session, catalog); } + @Override + public WriterScalingOptions getWriterScalingOptions(Metadata metadata, Session session) + { + QualifiedObjectName tableName = new QualifiedObjectName( + catalog, + tableMetadata.getTableSchema().getTable().getSchemaName(), + tableMetadata.getTableSchema().getTable().getTableName()); + return metadata.getNewTableWriterScalingOptions(session, tableName, tableMetadata.getProperties()); + } + public Optional<TableLayout> getLayout()
{ return layout; @@ -253,18 +267,21 @@ public static class CreateTarget private final SchemaTableName schemaTableName; private final boolean multipleWritersPerPartitionSupported; private final OptionalInt maxWriterTasks; + private final WriterScalingOptions writerScalingOptions; @JsonCreator public CreateTarget( @JsonProperty("handle") OutputTableHandle handle, @JsonProperty("schemaTableName") SchemaTableName schemaTableName, @JsonProperty("multipleWritersPerPartitionSupported") boolean multipleWritersPerPartitionSupported, - @JsonProperty("maxWriterTasks") OptionalInt maxWriterTasks) + @JsonProperty("maxWriterTasks") OptionalInt maxWriterTasks, + @JsonProperty("writerScalingOptions") WriterScalingOptions writerScalingOptions) { this.handle = requireNonNull(handle, "handle is null"); this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null"); this.multipleWritersPerPartitionSupported = multipleWritersPerPartitionSupported; this.maxWriterTasks = requireNonNull(maxWriterTasks, "maxWriterTasks is null"); + this.writerScalingOptions = requireNonNull(writerScalingOptions, "writerScalingOptions is null"); } @JsonProperty @@ -285,6 +302,12 @@ public boolean isMultipleWritersPerPartitionSupported() return multipleWritersPerPartitionSupported; } + @JsonProperty + public WriterScalingOptions getWriterScalingOptions() + { + return writerScalingOptions; + } + @Override public String toString() { @@ -302,6 +325,12 @@ public OptionalInt getMaxWriterTasks(Metadata metadata, Session session) { return maxWriterTasks; } + + @Override + public WriterScalingOptions getWriterScalingOptions(Metadata metadata, Session session) + { + return writerScalingOptions; + } } // only used during planning -- will not be serialized @@ -346,6 +375,12 @@ public OptionalInt getMaxWriterTasks(Metadata metadata, Session session) { return metadata.getMaxWriterTasks(session, handle.getCatalogHandle().getCatalogName()); } + + @Override + public WriterScalingOptions getWriterScalingOptions(Metadata metadata, Session session) + { + return metadata.getInsertWriterScalingOptions(session, handle); + } } public static class InsertTarget @@ -355,18 +390,21 @@ public static class InsertTarget private final SchemaTableName schemaTableName; private final boolean multipleWritersPerPartitionSupported; private final OptionalInt maxWriterTasks; + private final WriterScalingOptions writerScalingOptions; @JsonCreator public InsertTarget( @JsonProperty("handle") InsertTableHandle handle, @JsonProperty("schemaTableName") SchemaTableName schemaTableName, @JsonProperty("multipleWritersPerPartitionSupported") boolean multipleWritersPerPartitionSupported, - @JsonProperty("maxWriterTasks") OptionalInt maxWriterTasks) + @JsonProperty("maxWriterTasks") OptionalInt maxWriterTasks, + @JsonProperty("writerScalingOptions") WriterScalingOptions writerScalingOptions) { this.handle = requireNonNull(handle, "handle is null"); this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null"); this.multipleWritersPerPartitionSupported = multipleWritersPerPartitionSupported; this.maxWriterTasks = requireNonNull(maxWriterTasks, "maxWriterTasks is null"); + this.writerScalingOptions = requireNonNull(writerScalingOptions, "writerScalingOptions is null"); } @JsonProperty @@ -387,6 +425,12 @@ public boolean isMultipleWritersPerPartitionSupported() return multipleWritersPerPartitionSupported; } + @JsonProperty + public WriterScalingOptions getWriterScalingOptions() + { + return writerScalingOptions; + } + @Override public String 
toString() { @@ -404,6 +448,12 @@ public OptionalInt getMaxWriterTasks(Metadata metadata, Session session) { return maxWriterTasks; } + + @Override + public WriterScalingOptions getWriterScalingOptions(Metadata metadata, Session session) + { + return writerScalingOptions; + } } public static class RefreshMaterializedViewReference @@ -449,6 +499,12 @@ public OptionalInt getMaxWriterTasks(Metadata metadata, Session session) { return metadata.getMaxWriterTasks(session, storageTableHandle.getCatalogHandle().getCatalogName()); } + + @Override + public WriterScalingOptions getWriterScalingOptions(Metadata metadata, Session session) + { + return metadata.getInsertWriterScalingOptions(session, storageTableHandle); + } } public static class RefreshMaterializedViewTarget @@ -458,18 +514,21 @@ public static class RefreshMaterializedViewTarget private final InsertTableHandle insertHandle; private final SchemaTableName schemaTableName; private final List<TableHandle> sourceTableHandles; + private final WriterScalingOptions writerScalingOptions; @JsonCreator public RefreshMaterializedViewTarget( @JsonProperty("tableHandle") TableHandle tableHandle, @JsonProperty("insertHandle") InsertTableHandle insertHandle, @JsonProperty("schemaTableName") SchemaTableName schemaTableName, - @JsonProperty("sourceTableHandles") List<TableHandle> sourceTableHandles) + @JsonProperty("sourceTableHandles") List<TableHandle> sourceTableHandles, + @JsonProperty("writerScalingOptions") WriterScalingOptions writerScalingOptions) { this.tableHandle = requireNonNull(tableHandle, "tableHandle is null"); this.insertHandle = requireNonNull(insertHandle, "insertHandle is null"); this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null"); this.sourceTableHandles = ImmutableList.copyOf(sourceTableHandles); + this.writerScalingOptions = requireNonNull(writerScalingOptions, "writerScalingOptions is null"); } @JsonProperty @@ -496,6 +555,12 @@ public List<TableHandle> getSourceTableHandles() return sourceTableHandles; } + @JsonProperty + public WriterScalingOptions getWriterScalingOptions() + { + return writerScalingOptions; + } + @Override public String toString() { @@ -515,6 +580,12 @@ public OptionalInt getMaxWriterTasks(Metadata metadata, Session session) { return metadata.getMaxWriterTasks(session, tableHandle.getCatalogHandle().getCatalogName()); } + + @Override + public WriterScalingOptions getWriterScalingOptions(Metadata metadata, Session session) + { + return writerScalingOptions; + } } public static class DeleteTarget @@ -567,6 +638,12 @@ public OptionalInt getMaxWriterTasks(Metadata metadata, Session session) { throw new UnsupportedOperationException(); } + + @Override + public WriterScalingOptions getWriterScalingOptions(Metadata metadata, Session session) + { + throw new UnsupportedOperationException(); + } } public static class UpdateTarget @@ -638,6 +715,12 @@ public OptionalInt getMaxWriterTasks(Metadata metadata, Session session) { throw new UnsupportedOperationException(); } + + @Override + public WriterScalingOptions getWriterScalingOptions(Metadata metadata, Session session) + { + throw new UnsupportedOperationException(); + } } public static class TableExecuteTarget @@ -646,16 +729,19 @@ public static class TableExecuteTarget private final TableExecuteHandle executeHandle; private final Optional<TableHandle> sourceHandle; private final SchemaTableName schemaTableName; + private final WriterScalingOptions writerScalingOptions; @JsonCreator public TableExecuteTarget( @JsonProperty("executeHandle") TableExecuteHandle executeHandle, @JsonProperty("sourceHandle")
Optional<TableHandle> sourceHandle, - @JsonProperty("schemaTableName") SchemaTableName schemaTableName) + @JsonProperty("schemaTableName") SchemaTableName schemaTableName, + @JsonProperty("writerScalingOptions") WriterScalingOptions writerScalingOptions) { this.executeHandle = requireNonNull(executeHandle, "handle is null"); this.sourceHandle = requireNonNull(sourceHandle, "sourceHandle is null"); this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null"); + this.writerScalingOptions = requireNonNull(writerScalingOptions, "writerScalingOptions is null"); } @JsonProperty @@ -681,6 +767,12 @@ public SchemaTableName getSchemaTableName() return schemaTableName; } + @JsonProperty + public WriterScalingOptions getWriterScalingOptions() + { + return writerScalingOptions; + } + @Override public String toString() { @@ -700,6 +792,12 @@ public OptionalInt getMaxWriterTasks(Metadata metadata, Session session) { return metadata.getMaxWriterTasks(session, executeHandle.getCatalogHandle().getCatalogName()); } + + @Override + public WriterScalingOptions getWriterScalingOptions(Metadata metadata, Session session) + { + return writerScalingOptions; + } } public static class MergeTarget @@ -764,6 +862,12 @@ public OptionalInt getMaxWriterTasks(Metadata metadata, Session session) { return OptionalInt.empty(); } + + @Override + public WriterScalingOptions getWriterScalingOptions(Metadata metadata, Session session) + { + return WriterScalingOptions.DISABLED; + } } public static class MergeParadigmAndTypes diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/sanity/ValidateScaledWritersUsage.java b/core/trino-main/src/main/java/io/trino/sql/planner/sanity/ValidateScaledWritersUsage.java index 3aaf0879ac93..5bd5938b50b9 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/sanity/ValidateScaledWritersUsage.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/sanity/ValidateScaledWritersUsage.java @@ -16,6 +16,7 @@ import com.google.common.collect.ImmutableList; import io.trino.Session; import io.trino.execution.warnings.WarningCollector; +import io.trino.spi.connector.WriterScalingOptions; import io.trino.sql.PlannerContext; import io.trino.sql.planner.PartitioningHandle; import io.trino.sql.planner.TypeAnalyzer; @@ -55,7 +56,7 @@ public void validate( } private static class Visitor - extends PlanVisitor<List<PartitioningHandle>, Void> + extends PlanVisitor<List<ExchangeNode>, Void> { private final Session session; private final PlannerContext plannerContext; @@ -67,39 +68,50 @@ private Visitor(Session session, PlannerContext plannerContext) } @Override - protected List<PartitioningHandle> visitPlan(PlanNode node, Void context) + protected List<ExchangeNode> visitPlan(PlanNode node, Void context) { - return collectPartitioningHandles(node.getSources()); + return collectExchanges(node.getSources()); } @Override - public List<PartitioningHandle> visitTableWriter(TableWriterNode node, Void context) + public List<ExchangeNode> visitTableWriter(TableWriterNode node, Void context) { - List<PartitioningHandle> children = collectPartitioningHandles(node.getSources()); - List<PartitioningHandle> scaleWriterPartitioningHandle = children.stream() - .filter(PartitioningHandle::isScaleWriters) + List<ExchangeNode> scaleWriterExchanges = collectExchanges(node.getSources()).stream() + .filter(exchangeNode -> exchangeNode.getPartitioningScheme().getPartitioning().getHandle().isScaleWriters()) .collect(toImmutableList()); TableWriterNode.WriterTarget target = node.getTarget(); - scaleWriterPartitioningHandle.forEach(partitioningHandle -> { - if (isScaledWriterHashDistribution(partitioningHandle)) { + scaleWriterExchanges.forEach(exchangeNode -> {
PartitioningHandle handle = exchangeNode.getPartitioningScheme().getPartitioning().getHandle(); + WriterScalingOptions scalingOptions = target.getWriterScalingOptions(plannerContext.getMetadata(), session); + if (exchangeNode.getScope() == ExchangeNode.Scope.LOCAL) { + checkState(scalingOptions.isPerTaskWriterScalingEnabled(), + "The scaled writer per task partitioning scheme is set but writer target %s doesn't support it", target); + } + + if (exchangeNode.getScope() == ExchangeNode.Scope.REMOTE) { + checkState(scalingOptions.isWriterTasksScalingEnabled(), + "The scaled writer across tasks partitioning scheme is set but writer target %s doesn't support it", target); + } + + if (isScaledWriterHashDistribution(handle)) { checkState(target.supportsMultipleWritersPerPartition(plannerContext.getMetadata(), session), "The hash scaled writer partitioning scheme is set for the partitioned write but writer target %s doesn't support multiple writers per partition", target); } }); - return children; + return scaleWriterExchanges; } @Override - public List<PartitioningHandle> visitExchange(ExchangeNode node, Void context) + public List<ExchangeNode> visitExchange(ExchangeNode node, Void context) { - return ImmutableList.<PartitioningHandle>builder() - .add(node.getPartitioningScheme().getPartitioning().getHandle()) - .addAll(collectPartitioningHandles(node.getSources())) + return ImmutableList.<ExchangeNode>builder() + .add(node) + .addAll(collectExchanges(node.getSources())) .build(); } - private List<PartitioningHandle> collectPartitioningHandles(List<PlanNode> nodes) + private List<ExchangeNode> collectExchanges(List<PlanNode> nodes) { return nodes.stream() .map(node -> node.accept(this, null)) diff --git a/core/trino-main/src/main/java/io/trino/tracing/TracingConnectorMetadata.java b/core/trino-main/src/main/java/io/trino/tracing/TracingConnectorMetadata.java index 9d2ab408622d..1ba927fa79b4 100644 --- a/core/trino-main/src/main/java/io/trino/tracing/TracingConnectorMetadata.java +++ b/core/trino-main/src/main/java/io/trino/tracing/TracingConnectorMetadata.java @@ -63,6 +63,7 @@ import io.trino.spi.connector.TableFunctionApplicationResult; import io.trino.spi.connector.TableScanRedirectApplicationResult; import io.trino.spi.connector.TopNApplicationResult; +import io.trino.spi.connector.WriterScalingOptions; import io.trino.spi.expression.ConnectorExpression; import io.trino.spi.function.AggregationFunctionMetadata; import io.trino.spi.function.BoundSignature; @@ -1245,6 +1246,24 @@ public OptionalInt getMaxWriterTasks(ConnectorSession session) } } + @Override + public WriterScalingOptions getNewTableWriterScalingOptions(ConnectorSession session, SchemaTableName tableName, Map<String, Object> tableProperties) + { + Span span = startSpan("getNewTableWriterScalingOptions", tableName); + try (var ignored = scopedSpan(span)) { + return delegate.getNewTableWriterScalingOptions(session, tableName, tableProperties); + } + } + + @Override + public WriterScalingOptions getInsertWriterScalingOptions(ConnectorSession session, ConnectorTableHandle tableHandle) + { + Span span = startSpan("getInsertWriterScalingOptions", tableHandle); + try (var ignored = scopedSpan(span)) { + return delegate.getInsertWriterScalingOptions(session, tableHandle); + } + } + private Span startSpan(String methodName) { return tracer.spanBuilder("ConnectorMetadata."
+ methodName) diff --git a/core/trino-main/src/main/java/io/trino/tracing/TracingMetadata.java b/core/trino-main/src/main/java/io/trino/tracing/TracingMetadata.java index 1386a7d1232b..d83a313061c0 100644 --- a/core/trino-main/src/main/java/io/trino/tracing/TracingMetadata.java +++ b/core/trino-main/src/main/java/io/trino/tracing/TracingMetadata.java @@ -74,6 +74,7 @@ import io.trino.spi.connector.TableFunctionApplicationResult; import io.trino.spi.connector.TableScanRedirectApplicationResult; import io.trino.spi.connector.TopNApplicationResult; +import io.trino.spi.connector.WriterScalingOptions; import io.trino.spi.expression.ConnectorExpression; import io.trino.spi.function.AggregationFunctionMetadata; import io.trino.spi.function.FunctionMetadata; @@ -1358,6 +1359,24 @@ public OptionalInt getMaxWriterTasks(Session session, String catalogName) } } + @Override + public WriterScalingOptions getNewTableWriterScalingOptions(Session session, QualifiedObjectName tableName, Map<String, Object> tableProperties) + { + Span span = startSpan("getNewTableWriterScalingOptions", tableName); + try (var ignored = scopedSpan(span)) { + return delegate.getNewTableWriterScalingOptions(session, tableName, tableProperties); + } + } + + @Override + public WriterScalingOptions getInsertWriterScalingOptions(Session session, TableHandle tableHandle) + { + Span span = startSpan("getInsertWriterScalingOptions", tableHandle); + try (var ignored = scopedSpan(span)) { + return delegate.getInsertWriterScalingOptions(session, tableHandle); + } + } + private Span startSpan(String methodName) { return tracer.spanBuilder("Metadata." + methodName) diff --git a/core/trino-main/src/test/java/io/trino/connector/MockConnector.java b/core/trino-main/src/test/java/io/trino/connector/MockConnector.java index f7a09f1970b5..a741bea2ac57 100644 --- a/core/trino-main/src/test/java/io/trino/connector/MockConnector.java +++ b/core/trino-main/src/test/java/io/trino/connector/MockConnector.java @@ -83,6 +83,7 @@ import io.trino.spi.connector.TableProcedureMetadata; import io.trino.spi.connector.TableScanRedirectApplicationResult; import io.trino.spi.connector.TopNApplicationResult; +import io.trino.spi.connector.WriterScalingOptions; import io.trino.spi.eventlistener.EventListener; import io.trino.spi.expression.ConnectorExpression; import io.trino.spi.function.FunctionProvider; @@ -178,6 +179,7 @@ public class MockConnector private final Function<ConnectorTableFunctionHandle, ConnectorSplitSource> tableFunctionSplitsSources; private final OptionalInt maxWriterTasks; private final BiFunction<ConnectorSession, ConnectorTableExecuteHandle, Optional<ConnectorTableLayout>> getLayoutForTableExecute; + private final WriterScalingOptions writerScalingOptions; MockConnector( Function<ConnectorMetadata, ConnectorMetadata> metadataWrapper, @@ -224,7 +226,8 @@ public class MockConnector Supplier<List<PropertyMetadata<?>>> columnProperties, Function<ConnectorTableFunctionHandle, ConnectorSplitSource> tableFunctionSplitsSources, OptionalInt maxWriterTasks, - BiFunction<ConnectorSession, ConnectorTableExecuteHandle, Optional<ConnectorTableLayout>> getLayoutForTableExecute) + BiFunction<ConnectorSession, ConnectorTableExecuteHandle, Optional<ConnectorTableLayout>> getLayoutForTableExecute, + WriterScalingOptions writerScalingOptions) { this.metadataWrapper = requireNonNull(metadataWrapper, "metadataWrapper is null"); this.sessionProperties = ImmutableList.copyOf(requireNonNull(sessionProperties, "sessionProperties is null")); @@ -271,6 +274,7 @@ public class MockConnector this.tableFunctionSplitsSources = requireNonNull(tableFunctionSplitsSources, "tableFunctionSplitsSources is null"); this.maxWriterTasks = requireNonNull(maxWriterTasks, "maxWriterTasks is null"); this.getLayoutForTableExecute = requireNonNull(getLayoutForTableExecute, "getLayoutForTableExecute is null"); + this.writerScalingOptions = requireNonNull(writerScalingOptions, "writerScalingOptions
is null"); } @Override @@ -872,6 +876,18 @@ public BeginTableExecuteResult<ConnectorTableExecuteHandle, ConnectorTableHandle return new BeginTableExecuteResult<>(tableExecuteHandle, updatedSourceTableHandle); } + @Override + public WriterScalingOptions getNewTableWriterScalingOptions(ConnectorSession session, SchemaTableName tableName, Map<String, Object> tableProperties) + { + return writerScalingOptions; + } + + @Override + public WriterScalingOptions getInsertWriterScalingOptions(ConnectorSession session, ConnectorTableHandle tableHandle) + { + return writerScalingOptions; + } + private MockConnectorAccessControl getMockAccessControl() { return (MockConnectorAccessControl) getAccessControl(); diff --git a/core/trino-main/src/test/java/io/trino/connector/MockConnectorFactory.java b/core/trino-main/src/test/java/io/trino/connector/MockConnectorFactory.java index b57d66d60e43..648415a5b59f 100644 --- a/core/trino-main/src/test/java/io/trino/connector/MockConnectorFactory.java +++ b/core/trino-main/src/test/java/io/trino/connector/MockConnectorFactory.java @@ -51,6 +51,7 @@ import io.trino.spi.connector.TableProcedureMetadata; import io.trino.spi.connector.TableScanRedirectApplicationResult; import io.trino.spi.connector.TopNApplicationResult; +import io.trino.spi.connector.WriterScalingOptions; import io.trino.spi.eventlistener.EventListener; import io.trino.spi.expression.ConnectorExpression; import io.trino.spi.function.FunctionProvider; @@ -138,6 +139,8 @@ public class MockConnectorFactory private final OptionalInt maxWriterTasks; private final BiFunction<ConnectorSession, ConnectorTableExecuteHandle, Optional<ConnectorTableLayout>> getLayoutForTableExecute; + private final WriterScalingOptions writerScalingOptions; + private MockConnectorFactory( String name, List<PropertyMetadata<?>> sessionProperty, @@ -184,7 +187,8 @@ private MockConnectorFactory( boolean allowMissingColumnsOnInsert, Function<ConnectorTableFunctionHandle, ConnectorSplitSource> tableFunctionSplitsSources, OptionalInt maxWriterTasks, - BiFunction<ConnectorSession, ConnectorTableExecuteHandle, Optional<ConnectorTableLayout>> getLayoutForTableExecute) + BiFunction<ConnectorSession, ConnectorTableExecuteHandle, Optional<ConnectorTableLayout>> getLayoutForTableExecute, + WriterScalingOptions writerScalingOptions) { this.name = requireNonNull(name, "name is null"); this.sessionProperty = ImmutableList.copyOf(requireNonNull(sessionProperty, "sessionProperty is null")); @@ -232,6 +236,7 @@ private MockConnectorFactory( this.tableFunctionSplitsSources = requireNonNull(tableFunctionSplitsSources, "tableFunctionSplitsSources is null"); this.maxWriterTasks = maxWriterTasks; this.getLayoutForTableExecute = requireNonNull(getLayoutForTableExecute, "getLayoutForTableExecute is null"); + this.writerScalingOptions = requireNonNull(writerScalingOptions, "writerScalingOptions is null"); } @Override @@ -288,7 +293,8 @@ public Connector create(String catalogName, Map<String, String> config, Connecto columnProperties, tableFunctionSplitsSources, maxWriterTasks, - getLayoutForTableExecute); + getLayoutForTableExecute, + writerScalingOptions); } public static MockConnectorFactory create() @@ -438,6 +444,7 @@ public static final class Builder private boolean allowMissingColumnsOnInsert; private OptionalInt maxWriterTasks = OptionalInt.empty(); private BiFunction<ConnectorSession, ConnectorTableExecuteHandle, Optional<ConnectorTableLayout>> getLayoutForTableExecute = (session, handle) -> Optional.empty(); + private WriterScalingOptions writerScalingOptions = WriterScalingOptions.DISABLED; private Builder() {} @@ -759,6 +766,12 @@ public Builder withAllowMissingColumnsOnInsert(boolean allowMissingColumnsOnInse return this; } + public Builder withWriterScalingOptions(WriterScalingOptions writerScalingOptions) + { + this.writerScalingOptions = writerScalingOptions; + return this; + } + public MockConnectorFactory build() { Optional accessControl = Optional.empty(); @@ -811,7 +824,8 @@ public MockConnectorFactory build()
allowMissingColumnsOnInsert, tableFunctionSplitsSources, maxWriterTasks, - getLayoutForTableExecute); + getLayoutForTableExecute, + writerScalingOptions); } public static Function<ConnectorSession, List<String>> defaultListSchemaNames() diff --git a/core/trino-main/src/test/java/io/trino/metadata/AbstractMockMetadata.java b/core/trino-main/src/test/java/io/trino/metadata/AbstractMockMetadata.java index 38cb84ca41fa..606dca48852b 100644 --- a/core/trino-main/src/test/java/io/trino/metadata/AbstractMockMetadata.java +++ b/core/trino-main/src/test/java/io/trino/metadata/AbstractMockMetadata.java @@ -53,6 +53,7 @@ import io.trino.spi.connector.TableFunctionApplicationResult; import io.trino.spi.connector.TableScanRedirectApplicationResult; import io.trino.spi.connector.TopNApplicationResult; +import io.trino.spi.connector.WriterScalingOptions; import io.trino.spi.expression.ConnectorExpression; import io.trino.spi.function.AggregationFunctionMetadata; import io.trino.spi.function.BoundSignature; @@ -934,4 +935,16 @@ public OptionalInt getMaxWriterTasks(Session session, String catalogName) { throw new UnsupportedOperationException(); } + + @Override + public WriterScalingOptions getNewTableWriterScalingOptions(Session session, QualifiedObjectName tableName, Map<String, Object> tableProperties) + { + throw new UnsupportedOperationException(); + } + + @Override + public WriterScalingOptions getInsertWriterScalingOptions(Session session, TableHandle tableHandle) + { + throw new UnsupportedOperationException(); + } } diff --git a/core/trino-main/src/test/java/io/trino/operator/TestTableWriterOperator.java b/core/trino-main/src/test/java/io/trino/operator/TestTableWriterOperator.java index 8c581dca4e83..5b59409a848e 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestTableWriterOperator.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestTableWriterOperator.java @@ -35,6 +35,7 @@ import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorTransactionHandle; import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.connector.WriterScalingOptions; import io.trino.spi.type.Type; import io.trino.split.PageSinkManager; import io.trino.sql.planner.plan.PlanNodeId; @@ -298,7 +299,8 @@ private Operator createTableWriterOperator( new ConnectorOutputTableHandle() {}), schemaTableName, false, - OptionalInt.empty(), + OptionalInt.empty(), + WriterScalingOptions.DISABLED), ImmutableList.of(0), session, statisticsAggregation, diff --git a/core/trino-main/src/test/java/io/trino/sql/planner/TestingWriterTarget.java b/core/trino-main/src/test/java/io/trino/sql/planner/TestingWriterTarget.java index a7ea3662a885..636c5480a84a 100644 --- a/core/trino-main/src/test/java/io/trino/sql/planner/TestingWriterTarget.java +++ b/core/trino-main/src/test/java/io/trino/sql/planner/TestingWriterTarget.java @@ -16,6 +16,7 @@ import io.trino.Session; import io.trino.metadata.Metadata; +import io.trino.spi.connector.WriterScalingOptions; import io.trino.sql.planner.plan.TableWriterNode; import java.util.OptionalInt; @@ -40,4 +41,10 @@ public OptionalInt getMaxWriterTasks(Metadata metadata, Session session) { return OptionalInt.empty(); } + + @Override + public WriterScalingOptions getWriterScalingOptions(Metadata metadata, Session session) + { + return WriterScalingOptions.DISABLED; + } } diff --git a/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/TestRemoveEmptyMergeWriterRuleSet.java b/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/TestRemoveEmptyMergeWriterRuleSet.java index
fa70ef647aff..639732dff4ab 100644 --- a/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/TestRemoveEmptyMergeWriterRuleSet.java +++ b/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/TestRemoveEmptyMergeWriterRuleSet.java @@ -15,6 +15,7 @@ import io.trino.spi.connector.CatalogHandle; import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.connector.WriterScalingOptions; import io.trino.sql.planner.Partitioning; import io.trino.sql.planner.PartitioningScheme; import io.trino.sql.planner.Symbol; @@ -89,7 +90,7 @@ private void testRemoveEmptyMergeRewrite(Rule<TableFinishNode> rule, boolean pla List.of(rowCount)); return p.tableFinish( planWithExchange ? withExchange(p, merge, rowCount) : merge, - p.createTarget(catalogHandle, schemaTableName, true), + p.createTarget(catalogHandle, schemaTableName, true, WriterScalingOptions.ENABLED), rowCount); }) .matches(values("A")); diff --git a/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/test/PlanBuilder.java b/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/test/PlanBuilder.java index 7d431f8d0c4e..2463d5a313ef 100644 --- a/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/test/PlanBuilder.java +++ b/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/test/PlanBuilder.java @@ -33,6 +33,7 @@ import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.SortOrder; +import io.trino.spi.connector.WriterScalingOptions; import io.trino.spi.function.table.ConnectorTableFunctionHandle; import io.trino.spi.predicate.TupleDomain; import io.trino.spi.type.Type; @@ -716,7 +717,7 @@ public TableFinishNode tableWithExchangeCreate(WriterTarget target, PlanNode sou rowCountSymbol); } - public CreateTarget createTarget(CatalogHandle catalogHandle, SchemaTableName schemaTableName, boolean multipleWritersPerPartitionSupported, OptionalInt maxWriterTasks) + public CreateTarget createTarget(CatalogHandle catalogHandle, SchemaTableName schemaTableName, boolean multipleWritersPerPartitionSupported, OptionalInt maxWriterTasks, WriterScalingOptions writerScalingOptions) { OutputTableHandle tableHandle = new OutputTableHandle( catalogHandle, @@ -727,12 +728,13 @@ public CreateTarget createTarget(CatalogHandle catalogHandle, SchemaTableName sc tableHandle, schemaTableName, multipleWritersPerPartitionSupported, - maxWriterTasks); + maxWriterTasks, + writerScalingOptions); } - public CreateTarget createTarget(CatalogHandle catalogHandle, SchemaTableName schemaTableName, boolean multipleWritersPerPartitionSupported) + public CreateTarget createTarget(CatalogHandle catalogHandle, SchemaTableName schemaTableName, boolean multipleWritersPerPartitionSupported, WriterScalingOptions writerScalingOptions) { - return createTarget(catalogHandle, schemaTableName, multipleWritersPerPartitionSupported, OptionalInt.empty()); + return createTarget(catalogHandle, schemaTableName, multipleWritersPerPartitionSupported, OptionalInt.empty(), writerScalingOptions); } public MergeWriterNode merge(SchemaTableName schemaTableName, PlanNode mergeSource, Symbol mergeRow, Symbol rowId, List<Symbol> outputs) @@ -1206,7 +1208,8 @@ public TableExecuteNode tableExecute( TestingTransactionHandle.create(), new TestingTableExecuteHandle()), Optional.empty(), - new SchemaTableName("schemaName", "tableName")), + new SchemaTableName("schemaName", "tableName"), + WriterScalingOptions.DISABLED), symbol("partialrows", BIGINT), symbol("fragment", VARBINARY), columns, diff
--git a/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestAddExchangesScaledWriters.java b/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestAddExchangesScaledWriters.java index 7ac201fd02b9..9a24c905a2a1 100644 --- a/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestAddExchangesScaledWriters.java +++ b/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestAddExchangesScaledWriters.java @@ -18,6 +18,7 @@ import io.trino.Session; import io.trino.connector.MockConnectorFactory; import io.trino.plugin.tpch.TpchConnectorFactory; +import io.trino.spi.connector.WriterScalingOptions; import io.trino.sql.planner.LogicalPlanner; import io.trino.sql.planner.SubPlan; import io.trino.sql.planner.assertions.BasePlanTest; @@ -42,15 +43,17 @@ protected LocalQueryRunner createLocalQueryRunner() .build(); LocalQueryRunner queryRunner = LocalQueryRunner.create(session); queryRunner.createCatalog("tpch", new TpchConnectorFactory(1), ImmutableMap.of()); - queryRunner.createCatalog("catalog", createConnectorFactory("catalog"), ImmutableMap.of()); + queryRunner.createCatalog("catalog_with_scaled_writers", createConnectorFactory("catalog_with_scaled_writers", true), ImmutableMap.of()); + queryRunner.createCatalog("catalog_without_scaled_writers", createConnectorFactory("catalog_without_scaled_writers", false), ImmutableMap.of()); return queryRunner; } - private MockConnectorFactory createConnectorFactory(String name) + private MockConnectorFactory createConnectorFactory(String name, boolean writerScalingEnabledAcrossTasks) { return MockConnectorFactory.builder() .withGetTableHandle(((session, schemaTableName) -> null)) .withName(name) + .withWriterScalingOptions(new WriterScalingOptions(writerScalingEnabledAcrossTasks, true)) .build(); } @@ -68,7 +71,7 @@ public void testScaledWriters(boolean isScaleWritersEnabled) .build(); @Language("SQL") - String query = "CREATE TABLE catalog.mock.test AS SELECT * FROM tpch.tiny.nation"; + String query = "CREATE TABLE catalog_with_scaled_writers.mock.test AS SELECT * FROM tpch.tiny.nation"; SubPlan subPlan = subplan(query, LogicalPlanner.Stage.OPTIMIZED_AND_VALIDATED, false, session); if (isScaleWritersEnabled) { assertThat(subPlan.getAllFragments().get(1).getPartitioning().getConnectorHandle()).isEqualTo(SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION.getConnectorHandle()); @@ -78,4 +81,18 @@ public void testScaledWriters(boolean isScaleWritersEnabled) fragment -> assertThat(fragment.getPartitioning().getConnectorHandle()).isNotEqualTo(SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION.getConnectorHandle())); } } + + @Test(dataProvider = "scale_writers") + public void testScaledWritersWithTasksScalingDisabled(boolean isScaleWritersEnabled) + { + Session session = testSessionBuilder() + .setSystemProperty("scale_writers", Boolean.toString(isScaleWritersEnabled)) + .build(); + + @Language("SQL") + String query = "CREATE TABLE catalog_without_scaled_writers.mock.test AS SELECT * FROM tpch.tiny.nation"; + SubPlan subPlan = subplan(query, LogicalPlanner.Stage.OPTIMIZED_AND_VALIDATED, false, session); + subPlan.getAllFragments().forEach( + fragment -> assertThat(fragment.getPartitioning().getConnectorHandle()).isNotEqualTo(SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION.getConnectorHandle())); + } } diff --git a/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestAddLocalExchangesForTaskScaleWriters.java b/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestAddLocalExchangesForTaskScaleWriters.java index 
a46ade141e6e..0b7224b0902f 100644 --- a/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestAddLocalExchangesForTaskScaleWriters.java +++ b/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestAddLocalExchangesForTaskScaleWriters.java @@ -27,6 +27,7 @@ import io.trino.spi.connector.ConnectorPartitioningHandle; import io.trino.spi.connector.ConnectorTableLayout; import io.trino.spi.connector.TableProcedureMetadata; +import io.trino.spi.connector.WriterScalingOptions; import io.trino.spi.session.PropertyMetadata; import io.trino.spi.statistics.ColumnStatistics; import io.trino.spi.statistics.Estimate; @@ -75,19 +76,24 @@ protected LocalQueryRunner createLocalQueryRunner() { LocalQueryRunner queryRunner = LocalQueryRunner.create(testSessionBuilder().build()); queryRunner.createCatalog( - "mock_without_multiple_writer_per_partition", - createConnectorFactory("mock_without_multiple_writer_per_partition", false), + "mock_with_scaled_writers", + createConnectorFactory("mock_with_scaled_writers", true, true), + ImmutableMap.of()); + queryRunner.createCatalog( + "mock_without_scaled_writers", + createConnectorFactory("mock_without_scaled_writers", true, false), ImmutableMap.of()); queryRunner.createCatalog( - "mock_with_multiple_writer_per_partition", - createConnectorFactory("mock_with_multiple_writer_per_partition", true), + "mock_without_multiple_writer_per_partition", + createConnectorFactory("mock_without_multiple_writer_per_partition", false, true), ImmutableMap.of()); return queryRunner; } private MockConnectorFactory createConnectorFactory( String catalogHandle, - boolean supportsMultipleWritersPerPartition) + boolean supportsMultipleWritersPerPartition, + boolean writerScalingEnabledWithinTask) { return MockConnectorFactory.builder() .withGetTableHandle(((session, tableName) -> { @@ -99,6 +105,7 @@ private MockConnectorFactory createConnectorFactory( } return null; })) + .withWriterScalingOptions(new WriterScalingOptions(true, writerScalingEnabledWithinTask)) .withGetTableStatistics(tableName -> { if (tableName.getTableName().equals("source_table")) { return new TableStatistics( @@ -175,6 +182,42 @@ public void testLocalScaledUnpartitionedWriterDistribution() tableScan("source_table", ImmutableMap.of("customer", "customer", "year", "year"))))))); } + @Test + public void testLocalScaledUnpartitionedWriterWithPerTaskScalingDisabled() + { + assertDistributedPlan( + "INSERT INTO unpartitioned_table SELECT * FROM source_table", + testSessionBuilder() + .setCatalog("mock_without_scaled_writers") + .setSchema("mock") + .setSystemProperty(TASK_SCALE_WRITERS_ENABLED, "true") + .setSystemProperty(SCALE_WRITERS, "false") + .build(), + anyTree( + tableWriter( + ImmutableList.of("customer", "year"), + ImmutableList.of("customer", "year"), + exchange(LOCAL, GATHER, SINGLE_DISTRIBUTION, + exchange(REMOTE, REPARTITION, FIXED_ARBITRARY_DISTRIBUTION, + tableScan("source_table", ImmutableMap.of("customer", "customer", "year", "year"))))))); + + assertDistributedPlan( + "INSERT INTO unpartitioned_table SELECT * FROM source_table", + testSessionBuilder() + .setCatalog("mock_without_scaled_writers") + .setSchema("mock") + .setSystemProperty(TASK_SCALE_WRITERS_ENABLED, "false") + .setSystemProperty(SCALE_WRITERS, "false") + .build(), + anyTree( + tableWriter( + ImmutableList.of("customer", "year"), + ImmutableList.of("customer", "year"), + exchange(LOCAL, GATHER, SINGLE_DISTRIBUTION, + exchange(REMOTE, REPARTITION, FIXED_ARBITRARY_DISTRIBUTION, + tableScan("source_table", 
ImmutableMap.of("customer", "customer", "year", "year"))))))); + } + @Test(dataProvider = "taskScaleWritersOption") public void testLocalScaledPartitionedWriterWithoutSupportForMultipleWritersPerPartition(boolean taskScaleWritersEnabled) { @@ -201,6 +244,32 @@ public void testLocalScaledPartitionedWriterWithoutSupportForMultipleWritersPerP tableScan("source_table", ImmutableMap.of("customer", "customer", "year", "year"))))))); } + @Test(dataProvider = "taskScaleWritersOption") + public void testLocalScaledPartitionedWriterWithPerTaskScalingDisabled(boolean taskScaleWritersEnabled) + { + String catalogName = "mock_without_scaled_writers"; + PartitioningHandle partitioningHandle = new PartitioningHandle( + Optional.of(getCatalogHandle(catalogName)), + Optional.of(MockConnectorTransactionHandle.INSTANCE), + CONNECTOR_PARTITIONING_HANDLE); + + assertDistributedPlan( + "INSERT INTO connector_partitioned_table SELECT * FROM source_table", + testSessionBuilder() + .setCatalog(catalogName) + .setSchema("mock") + .setSystemProperty(TASK_SCALE_WRITERS_ENABLED, String.valueOf(taskScaleWritersEnabled)) + .setSystemProperty(SCALE_WRITERS, "false") + .build(), + anyTree( + tableWriter( + ImmutableList.of("customer", "year"), + ImmutableList.of("customer", "year"), + exchange(LOCAL, REPARTITION, partitioningHandle, + exchange(REMOTE, REPARTITION, partitioningHandle, + tableScan("source_table", ImmutableMap.of("customer", "customer", "year", "year"))))))); + } + @DataProvider public Object[][] taskScaleWritersOption() { @@ -213,7 +282,7 @@ public void testLocalScaledPartitionedWriterForSystemPartitioningWithEnforcedPre assertDistributedPlan( "INSERT INTO system_partitioned_table SELECT * FROM source_table", testSessionBuilder() - .setCatalog("mock_with_multiple_writer_per_partition") + .setCatalog("mock_with_scaled_writers") .setSchema("mock") // Enforce preferred partitioning .setSystemProperty(USE_PREFERRED_WRITE_PARTITIONING, "true") @@ -233,7 +302,7 @@ public void testLocalScaledPartitionedWriterForSystemPartitioningWithEnforcedPre assertDistributedPlan( "INSERT INTO system_partitioned_table SELECT * FROM source_table", testSessionBuilder() - .setCatalog("mock_with_multiple_writer_per_partition") + .setCatalog("mock_with_scaled_writers") .setSchema("mock") // Enforce preferred partitioning .setSystemProperty(USE_PREFERRED_WRITE_PARTITIONING, "true") @@ -254,7 +323,7 @@ public void testLocalScaledPartitionedWriterForSystemPartitioningWithEnforcedPre @Test public void testLocalScaledPartitionedWriterForConnectorPartitioning() { - String catalogName = "mock_with_multiple_writer_per_partition"; + String catalogName = "mock_with_scaled_writers"; PartitioningHandle partitioningHandle = new PartitioningHandle( Optional.of(getCatalogHandle(catalogName)), Optional.of(MockConnectorTransactionHandle.INSTANCE), @@ -304,7 +373,7 @@ public void testLocalScaledPartitionedWriterWithEnforcedLocalPreferredPartitioni assertDistributedPlan( "INSERT INTO system_partitioned_table SELECT * FROM source_table", testSessionBuilder() - .setCatalog("mock_with_multiple_writer_per_partition") + .setCatalog("mock_with_scaled_writers") .setSchema("mock") .setSystemProperty(TASK_SCALE_WRITERS_ENABLED, "true") .setSystemProperty(SCALE_WRITERS, "false") @@ -322,7 +391,7 @@ public void testLocalScaledPartitionedWriterWithEnforcedLocalPreferredPartitioni assertDistributedPlan( "INSERT INTO system_partitioned_table SELECT * FROM source_table", testSessionBuilder() - .setCatalog("mock_with_multiple_writer_per_partition") + 
.setCatalog("mock_with_scaled_writers") .setSchema("mock") .setSystemProperty(TASK_SCALE_WRITERS_ENABLED, "false") .setSystemProperty(SCALE_WRITERS, "false") @@ -344,7 +413,7 @@ public void testTableExecuteLocalScalingDisabledForPartitionedTable() @Language("SQL") String query = "ALTER TABLE system_partitioned_table EXECUTE optimize(file_size_threshold => '10MB')"; Session session = Session.builder(getQueryRunner().getDefaultSession()) - .setCatalog("mock_with_multiple_writer_per_partition") + .setCatalog("mock_with_scaled_writers") .setSchema("mock") .setSystemProperty(TASK_SCALE_WRITERS_ENABLED, "true") .build(); @@ -367,7 +436,7 @@ public void testTableExecuteLocalScalingDisabledForUnpartitionedTable() @Language("SQL") String query = "ALTER TABLE unpartitioned_table EXECUTE optimize(file_size_threshold => '10MB')"; Session session = Session.builder(getQueryRunner().getDefaultSession()) - .setCatalog("mock_with_multiple_writer_per_partition") + .setCatalog("mock_with_scaled_writers") .setSchema("mock") .setSystemProperty(TASK_SCALE_WRITERS_ENABLED, "true") .build(); diff --git a/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestLimitMaxWriterNodesCount.java b/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestLimitMaxWriterNodesCount.java index 4cd5064693ef..68dc833fdc0f 100644 --- a/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestLimitMaxWriterNodesCount.java +++ b/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestLimitMaxWriterNodesCount.java @@ -25,6 +25,7 @@ import io.trino.spi.connector.ConnectorPartitioningHandle; import io.trino.spi.connector.ConnectorTableLayout; import io.trino.spi.connector.TableProcedureMetadata; +import io.trino.spi.connector.WriterScalingOptions; import io.trino.spi.session.PropertyMetadata; import io.trino.sql.planner.SystemPartitioningHandle; import io.trino.sql.planner.TestTableScanNodePartitioning; @@ -99,6 +100,7 @@ private MockConnectorFactory prepareConnectorFactory(String catalogName, Optiona } return null; })) + .withWriterScalingOptions(WriterScalingOptions.ENABLED) .withGetInsertLayout((session, tableMetadata) -> { if (tableMetadata.getTableName().equals(partitionedTable)) { return Optional.of(new ConnectorTableLayout(ImmutableList.of("column_a"))); diff --git a/core/trino-main/src/test/java/io/trino/sql/planner/sanity/TestValidateScaledWritersUsage.java b/core/trino-main/src/test/java/io/trino/sql/planner/sanity/TestValidateScaledWritersUsage.java index 19a3e4931da0..1575e0ac3f81 100644 --- a/core/trino-main/src/test/java/io/trino/sql/planner/sanity/TestValidateScaledWritersUsage.java +++ b/core/trino-main/src/test/java/io/trino/sql/planner/sanity/TestValidateScaledWritersUsage.java @@ -24,6 +24,7 @@ import io.trino.spi.connector.ConnectorPartitioningHandle; import io.trino.spi.connector.ConnectorTransactionHandle; import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.connector.WriterScalingOptions; import io.trino.sql.PlannerContext; import io.trino.sql.planner.Partitioning; import io.trino.sql.planner.PartitioningHandle; @@ -33,6 +34,7 @@ import io.trino.sql.planner.TypeProvider; import io.trino.sql.planner.assertions.BasePlanTest; import io.trino.sql.planner.iterative.rule.test.PlanBuilder; +import io.trino.sql.planner.plan.ExchangeNode; import io.trino.sql.planner.plan.PlanNode; import io.trino.sql.planner.plan.TableScanNode; import io.trino.testing.LocalQueryRunner; @@ -117,12 +119,60 @@ public void 
testScaledWritersUsedAndTargetSupportsIt(PartitioningHandle scaledWr PlanNode root = planBuilder.output( outputBuilder -> outputBuilder .source(planBuilder.tableWithExchangeCreate( - planBuilder.createTarget(catalog, schemaTableName, true), + planBuilder.createTarget(catalog, schemaTableName, true, WriterScalingOptions.ENABLED), tableWriterSource, symbol))); validatePlan(root); } + @Test(dataProvider = "scaledWriterPartitioningHandles") + public void testScaledWritersUsedAndTargetDoesNotSupportScalingPerTask(PartitioningHandle scaledWriterPartitionHandle) + { + PlanNode tableWriterSource = planBuilder.exchange(ex -> + ex + .partitioningScheme(new PartitioningScheme(Partitioning.create(scaledWriterPartitionHandle, ImmutableList.of()), ImmutableList.of(symbol))) + .addInputsSet(symbol) + .addSource(planBuilder.exchange(innerExchange -> + innerExchange + .scope(ExchangeNode.Scope.LOCAL) + .partitioningScheme(new PartitioningScheme(Partitioning.create(scaledWriterPartitionHandle, ImmutableList.of()), ImmutableList.of(symbol))) + .addInputsSet(symbol) + .addSource(tableScanNode)))); + PlanNode root = planBuilder.output( + outputBuilder -> outputBuilder + .source(planBuilder.tableWithExchangeCreate( + planBuilder.createTarget(catalog, schemaTableName, true, new WriterScalingOptions(true, false)), + tableWriterSource, + symbol))); + assertThatThrownBy(() -> validatePlan(root)) + .isInstanceOf(IllegalStateException.class) + .hasMessage("The scaled writer per task partitioning scheme is set but writer target catalog:INSTANCE doesn't support it"); + } + + @Test(dataProvider = "scaledWriterPartitioningHandles") + public void testScaledWritersUsedAndTargetDoesNotSupportScalingAcrossTasks(PartitioningHandle scaledWriterPartitionHandle) + { + PlanNode tableWriterSource = planBuilder.exchange(ex -> + ex + .partitioningScheme(new PartitioningScheme(Partitioning.create(scaledWriterPartitionHandle, ImmutableList.of()), ImmutableList.of(symbol))) + .addInputsSet(symbol) + .addSource(planBuilder.exchange(innerExchange -> + innerExchange + .scope(ExchangeNode.Scope.REMOTE) + .partitioningScheme(new PartitioningScheme(Partitioning.create(scaledWriterPartitionHandle, ImmutableList.of()), ImmutableList.of(symbol))) + .addInputsSet(symbol) + .addSource(tableScanNode)))); + PlanNode root = planBuilder.output( + outputBuilder -> outputBuilder + .source(planBuilder.tableWithExchangeCreate( + planBuilder.createTarget(catalog, schemaTableName, true, new WriterScalingOptions(false, true)), + tableWriterSource, + symbol))); + assertThatThrownBy(() -> validatePlan(root)) + .isInstanceOf(IllegalStateException.class) + .hasMessage("The scaled writer across tasks partitioning scheme is set but writer target catalog:INSTANCE doesn't support it"); + } + @Test(dataProvider = "scaledWriterPartitioningHandles") public void testScaledWriterUsedAndTargetDoesNotSupportMultipleWritersPerPartition(PartitioningHandle scaledWriterPartitionHandle) { @@ -138,7 +188,7 @@ public void testScaledWriterUsedAndTargetDoesNotSupportMultipleWritersPerPartiti PlanNode root = planBuilder.output( outputBuilder -> outputBuilder .source(planBuilder.tableWithExchangeCreate( - planBuilder.createTarget(catalog, schemaTableName, false), + planBuilder.createTarget(catalog, schemaTableName, false, WriterScalingOptions.ENABLED), tableWriterSource, symbol))); @@ -173,7 +223,7 @@ public void testScaledWriterWithMultipleSourceExchangesAndTargetDoesNotSupportMu PlanNode root = planBuilder.output( outputBuilder -> outputBuilder 
.source(planBuilder.tableWithExchangeCreate( - planBuilder.createTarget(catalog, schemaTableName, false), + planBuilder.createTarget(catalog, schemaTableName, false, WriterScalingOptions.ENABLED), tableWriterSource, symbol))); diff --git a/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java index e838032a021a..17b23b05d37c 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java +++ b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java @@ -1604,6 +1604,16 @@ default OptionalInt getMaxWriterTasks(ConnectorSession session) return OptionalInt.empty(); } + default WriterScalingOptions getNewTableWriterScalingOptions(ConnectorSession session, SchemaTableName tableName, Map<String, Object> tableProperties) + { + return WriterScalingOptions.DISABLED; + } + + default WriterScalingOptions getInsertWriterScalingOptions(ConnectorSession session, ConnectorTableHandle tableHandle) + { + return WriterScalingOptions.DISABLED; + } + final class Helper { private Helper() {} diff --git a/core/trino-spi/src/main/java/io/trino/spi/connector/WriterScalingOptions.java b/core/trino-spi/src/main/java/io/trino/spi/connector/WriterScalingOptions.java new file mode 100644 index 000000000000..52acb3632092 --- /dev/null +++ b/core/trino-spi/src/main/java/io/trino/spi/connector/WriterScalingOptions.java @@ -0,0 +1,63 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package io.trino.spi.connector; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Optional; + +import static java.util.Objects.requireNonNull; + +public record WriterScalingOptions(boolean isWriterTasksScalingEnabled, boolean isPerTaskWriterScalingEnabled, Optional<Integer> perTaskMaxScaledWriterCount) +{ + public static final WriterScalingOptions DISABLED = new WriterScalingOptions(false, false); + public static final WriterScalingOptions ENABLED = new WriterScalingOptions(true, true); + + public WriterScalingOptions(boolean writerTasksScalingEnabled, boolean perTaskWriterScalingEnabled) + { + this(writerTasksScalingEnabled, perTaskWriterScalingEnabled, Optional.empty()); + } + + @JsonCreator + public WriterScalingOptions( + @JsonProperty boolean isWriterTasksScalingEnabled, + @JsonProperty boolean isPerTaskWriterScalingEnabled, + @JsonProperty Optional<Integer> perTaskMaxScaledWriterCount) + { + this.isWriterTasksScalingEnabled = isWriterTasksScalingEnabled; + this.isPerTaskWriterScalingEnabled = isPerTaskWriterScalingEnabled; + this.perTaskMaxScaledWriterCount = requireNonNull(perTaskMaxScaledWriterCount, "perTaskMaxScaledWriterCount is null"); + } + + @Override + @JsonProperty + public boolean isWriterTasksScalingEnabled() + { + return isWriterTasksScalingEnabled; + } + + @Override + @JsonProperty + public boolean isPerTaskWriterScalingEnabled() + { + return isPerTaskWriterScalingEnabled; + } + + @JsonProperty + public Optional<Integer> perTaskMaxScaledWriterCount() + { + return perTaskMaxScaledWriterCount; + } +} diff --git a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java index 01d52b1374a0..e80c958ac10a 100644 --- a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java +++ b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java @@ -63,6 +63,7 @@ import io.trino.spi.connector.TableFunctionApplicationResult; import io.trino.spi.connector.TableScanRedirectApplicationResult; import io.trino.spi.connector.TopNApplicationResult; +import io.trino.spi.connector.WriterScalingOptions; import io.trino.spi.expression.ConnectorExpression; import io.trino.spi.function.AggregationFunctionMetadata; import io.trino.spi.function.BoundSignature; @@ -1136,4 +1137,20 @@ public OptionalInt getMaxWriterTasks(ConnectorSession session) return delegate.getMaxWriterTasks(session); } } + + @Override + public WriterScalingOptions getNewTableWriterScalingOptions(ConnectorSession session, SchemaTableName tableName, Map<String, Object> tableProperties) + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + return delegate.getNewTableWriterScalingOptions(session, tableName, tableProperties); + } + } + + @Override + public WriterScalingOptions getInsertWriterScalingOptions(ConnectorSession session, ConnectorTableHandle tableHandle) + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + return delegate.getInsertWriterScalingOptions(session, tableHandle); + } + } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index 899cd3681643..4c55aa5c3ee5 100644 ---
a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -109,6 +109,7 @@ import io.trino.spi.connector.TableColumnsMetadata; import io.trino.spi.connector.TableNotFoundException; import io.trino.spi.connector.TableScanRedirectApplicationResult; +import io.trino.spi.connector.WriterScalingOptions; import io.trino.spi.expression.ConnectorExpression; import io.trino.spi.expression.Variable; import io.trino.spi.predicate.Domain; @@ -3148,6 +3149,18 @@ private Optional<SystemTable> getRawSystemTable(SchemaTableName systemTableName) }; } + @Override + public WriterScalingOptions getNewTableWriterScalingOptions(ConnectorSession session, SchemaTableName tableName, Map<String, Object> tableProperties) + { + return WriterScalingOptions.ENABLED; + } + + @Override + public WriterScalingOptions getInsertWriterScalingOptions(ConnectorSession session, ConnectorTableHandle tableHandle) + { + return WriterScalingOptions.ENABLED; + } + private static Map<String, DeltaLakeColumnStatistics> toDeltaLakeColumnStatistics(Collection<ComputedStatistics> computedStatistics) { // Only statistics for whole table are collected diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java index 943b40d8be29..29ab11dd2297 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java @@ -102,6 +102,7 @@ import io.trino.spi.connector.TableNotFoundException; import io.trino.spi.connector.TableScanRedirectApplicationResult; import io.trino.spi.connector.ViewNotFoundException; +import io.trino.spi.connector.WriterScalingOptions; import io.trino.spi.expression.ConnectorExpression; import io.trino.spi.expression.Variable; import io.trino.spi.predicate.Domain; @@ -3844,6 +3845,18 @@ private Optional redirectTableToHudi(Optional ta return Optional.empty(); } + @Override + public WriterScalingOptions getNewTableWriterScalingOptions(ConnectorSession session, SchemaTableName tableName, Map<String, Object> tableProperties) + { + return WriterScalingOptions.ENABLED; + } + + @Override + public WriterScalingOptions getInsertWriterScalingOptions(ConnectorSession session, ConnectorTableHandle tableHandle) + { + return WriterScalingOptions.ENABLED; + } + private static TableNameSplitResult splitTableName(String tableName) { int metadataMarkerIndex = tableName.lastIndexOf('$'); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index cf57078bdbc6..eec0ce128bb0 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -88,6 +88,7 @@ import io.trino.spi.connector.SystemTable; import io.trino.spi.connector.TableColumnsMetadata; import io.trino.spi.connector.TableNotFoundException; +import io.trino.spi.connector.WriterScalingOptions; import io.trino.spi.expression.ConnectorExpression; import io.trino.spi.expression.FunctionName; import io.trino.spi.expression.Variable; @@ -2884,6 +2885,18 @@ public Optional<CatalogSchemaTableName> redirectTable(ConnectorSession session, return catalog.redirectTable(session, tableName, targetCatalogName.get()); } + @Override + public WriterScalingOptions getNewTableWriterScalingOptions(ConnectorSession session, SchemaTableName tableName, Map<String, Object> tableProperties) + { + return
WriterScalingOptions.ENABLED; + } + + @Override + public WriterScalingOptions getInsertWriterScalingOptions(ConnectorSession session, ConnectorTableHandle tableHandle) + { + return WriterScalingOptions.ENABLED; + } + private static CollectedStatistics processComputedTableStatistics(Table table, Collection<ComputedStatistics> computedStatistics) { Map<String, Integer> columnNameToId = table.schema().columns().stream()
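
Taken together, the SPI changes above give connector authors a small opt-in surface: ConnectorMetadata returns WriterScalingOptions.DISABLED from both new methods by default, and the Delta Lake, Hive, and Iceberg connectors opt in by returning WriterScalingOptions.ENABLED. As a rough sketch of how a hypothetical third-party connector might use the new hooks, including the optional per-task writer cap (the class name ExampleMetadata, the cap of 4, and the omission of all other metadata methods are illustrative assumptions, not part of this patch):

import io.trino.spi.connector.ConnectorMetadata;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.WriterScalingOptions;

import java.util.Map;
import java.util.Optional;

// Hypothetical connector metadata; all other ConnectorMetadata methods elided.
public class ExampleMetadata
        implements ConnectorMetadata
{
    @Override
    public WriterScalingOptions getNewTableWriterScalingOptions(ConnectorSession session, SchemaTableName tableName, Map<String, Object> tableProperties)
    {
        // CTAS path: allow scaling both across tasks and within each task.
        return WriterScalingOptions.ENABLED;
    }

    @Override
    public WriterScalingOptions getInsertWriterScalingOptions(ConnectorSession session, ConnectorTableHandle tableHandle)
    {
        // INSERT path: allow both kinds of scaling, but cap scaled local
        // writers per task at 4 via the optional third record component.
        return new WriterScalingOptions(true, true, Optional.of(4));
    }
}

Note that returning ENABLED only permits scaling: the engine still intersects these connector-reported options with the scale_writers and task_scale_writers_enabled session properties, as the planner tests above exercise.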