diff --git a/core/trino-main/src/main/java/io/trino/connector/system/GlobalSystemConnector.java b/core/trino-main/src/main/java/io/trino/connector/system/GlobalSystemConnector.java index a08b3a97e147..e981679a8a65 100644 --- a/core/trino-main/src/main/java/io/trino/connector/system/GlobalSystemConnector.java +++ b/core/trino-main/src/main/java/io/trino/connector/system/GlobalSystemConnector.java @@ -23,7 +23,6 @@ import io.trino.spi.connector.ConnectorSplitSource; import io.trino.spi.connector.ConnectorTransactionHandle; import io.trino.spi.connector.SystemTable; -import io.trino.spi.function.SchemaFunctionName; import io.trino.spi.procedure.Procedure; import io.trino.spi.ptf.ConnectorTableFunction; import io.trino.spi.ptf.ConnectorTableFunctionHandle; @@ -93,7 +92,7 @@ public ConnectorSplitManager getSplitManager() return new ConnectorSplitManager() { @Override - public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, SchemaFunctionName name, ConnectorTableFunctionHandle functionHandle) + public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableFunctionHandle functionHandle) { if (functionHandle instanceof SequenceFunctionHandle sequenceFunctionHandle) { return getSequenceFunctionSplitSource(sequenceFunctionHandle); diff --git a/core/trino-main/src/main/java/io/trino/metadata/FunctionManager.java b/core/trino-main/src/main/java/io/trino/metadata/FunctionManager.java index 0e18eca190ae..1d47190b8e11 100644 --- a/core/trino-main/src/main/java/io/trino/metadata/FunctionManager.java +++ b/core/trino-main/src/main/java/io/trino/metadata/FunctionManager.java @@ -32,7 +32,6 @@ import io.trino.spi.function.InvocationConvention; import io.trino.spi.function.InvocationConvention.InvocationArgumentConvention; import io.trino.spi.function.ScalarFunctionImplementation; -import io.trino.spi.function.SchemaFunctionName; import io.trino.spi.function.WindowFunctionSupplier; import io.trino.spi.ptf.TableFunctionProcessorProvider; import io.trino.spi.type.Type; @@ -153,7 +152,6 @@ private WindowFunctionSupplier getWindowFunctionSupplierInternal(ResolvedFunctio public TableFunctionProcessorProvider getTableFunctionProcessorProvider(TableFunctionHandle tableFunctionHandle) { CatalogHandle catalogHandle = tableFunctionHandle.getCatalogHandle(); - SchemaFunctionName functionName = tableFunctionHandle.getSchemaFunctionName(); FunctionProvider provider; if (catalogHandle.equals(GlobalSystemConnector.CATALOG_HANDLE)) { @@ -161,10 +159,10 @@ public TableFunctionProcessorProvider getTableFunctionProcessorProvider(TableFun } else { provider = functionProviders.getService(catalogHandle); - checkArgument(provider != null, "No function provider for catalog: '%s' (function '%s')", catalogHandle, functionName); + checkArgument(provider != null, "No function provider for catalog: '%s'", catalogHandle); } - return provider.getTableFunctionProcessorProvider(functionName); + return provider.getTableFunctionProcessorProvider(tableFunctionHandle.getFunctionHandle()); } private FunctionDependencies getFunctionDependencies(ResolvedFunction resolvedFunction) diff --git a/core/trino-main/src/main/java/io/trino/metadata/GlobalFunctionCatalog.java b/core/trino-main/src/main/java/io/trino/metadata/GlobalFunctionCatalog.java index 08cb59e97a92..803fe0d3b888 100644 --- a/core/trino-main/src/main/java/io/trino/metadata/GlobalFunctionCatalog.java +++ b/core/trino-main/src/main/java/io/trino/metadata/GlobalFunctionCatalog.java @@ -17,8 +17,8 @@ import com.google.common.collect.ImmutableListMultimap; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Multimap; -import io.trino.operator.table.ExcludeColumns; -import io.trino.operator.table.Sequence; +import io.trino.operator.table.ExcludeColumns.ExcludeColumnsFunctionHandle; +import io.trino.operator.table.Sequence.SequenceFunctionHandle; import io.trino.spi.function.AggregationFunctionMetadata; import io.trino.spi.function.AggregationImplementation; import io.trino.spi.function.BoundSignature; @@ -33,6 +33,7 @@ import io.trino.spi.function.SchemaFunctionName; import io.trino.spi.function.Signature; import io.trino.spi.function.WindowFunctionSupplier; +import io.trino.spi.ptf.ConnectorTableFunctionHandle; import io.trino.spi.ptf.TableFunctionProcessorProvider; import io.trino.spi.type.TypeSignature; @@ -178,12 +179,12 @@ public ScalarFunctionImplementation getScalarFunctionImplementation( } @Override - public TableFunctionProcessorProvider getTableFunctionProcessorProvider(SchemaFunctionName name) + public TableFunctionProcessorProvider getTableFunctionProcessorProvider(ConnectorTableFunctionHandle functionHandle) { - if (name.equals(new SchemaFunctionName(BUILTIN_SCHEMA, ExcludeColumns.NAME))) { + if (functionHandle instanceof ExcludeColumnsFunctionHandle) { return getExcludeColumnsFunctionProcessorProvider(); } - if (name.equals(new SchemaFunctionName(BUILTIN_SCHEMA, Sequence.NAME))) { + if (functionHandle instanceof SequenceFunctionHandle) { return getSequenceFunctionProcessorProvider(); } diff --git a/core/trino-main/src/main/java/io/trino/metadata/TableFunctionHandle.java b/core/trino-main/src/main/java/io/trino/metadata/TableFunctionHandle.java index 6d2885f7d69f..2bc0b3fccb65 100644 --- a/core/trino-main/src/main/java/io/trino/metadata/TableFunctionHandle.java +++ b/core/trino-main/src/main/java/io/trino/metadata/TableFunctionHandle.java @@ -17,7 +17,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import io.trino.spi.connector.CatalogHandle; import io.trino.spi.connector.ConnectorTransactionHandle; -import io.trino.spi.function.SchemaFunctionName; import io.trino.spi.ptf.ConnectorTableFunctionHandle; import static java.util.Objects.requireNonNull; @@ -25,19 +24,16 @@ public class TableFunctionHandle { private final CatalogHandle catalogHandle; - private final SchemaFunctionName schemaFunctionName; private final ConnectorTableFunctionHandle functionHandle; private final ConnectorTransactionHandle transactionHandle; @JsonCreator public TableFunctionHandle( @JsonProperty("catalogHandle") CatalogHandle catalogHandle, - @JsonProperty("schemaFunctionName") SchemaFunctionName schemaFunctionName, @JsonProperty("functionHandle") ConnectorTableFunctionHandle functionHandle, @JsonProperty("transactionHandle") ConnectorTransactionHandle transactionHandle) { this.catalogHandle = requireNonNull(catalogHandle, "catalogHandle is null"); - this.schemaFunctionName = requireNonNull(schemaFunctionName, "schemaFunctionName is null"); this.functionHandle = requireNonNull(functionHandle, "functionHandle is null"); this.transactionHandle = requireNonNull(transactionHandle, "transactionHandle is null"); } @@ -48,12 +44,6 @@ public CatalogHandle getCatalogHandle() return catalogHandle; } - @JsonProperty - public SchemaFunctionName getSchemaFunctionName() - { - return schemaFunctionName; - } - @JsonProperty public ConnectorTableFunctionHandle getFunctionHandle() { diff --git a/core/trino-main/src/main/java/io/trino/operator/table/ExcludeColumns.java b/core/trino-main/src/main/java/io/trino/operator/table/ExcludeColumns.java index a05ec926c65c..755447922987 100644 --- a/core/trino-main/src/main/java/io/trino/operator/table/ExcludeColumns.java +++ b/core/trino-main/src/main/java/io/trino/operator/table/ExcludeColumns.java @@ -140,7 +140,7 @@ public TableFunctionAnalysis analyze(ConnectorSession session, ConnectorTransact return TableFunctionAnalysis.builder() .requiredColumns(TABLE_ARGUMENT_NAME, requiredColumns.build()) .returnedType(new Descriptor(returnedType)) - // there's no information to remember. All logic is effectively delegated to the engine via `requiredColumns`. We do not pass a ConnectorTableHandle. EMPTY_HANDLE will be used. + .handle(new ExcludeColumnsFunctionHandle()) .build(); } } @@ -161,4 +161,10 @@ public TableFunctionDataProcessor getDataProcessor(ConnectorTableFunctionHandle } }; } + + public record ExcludeColumnsFunctionHandle() + implements ConnectorTableFunctionHandle + { + // there's no information to remember. All logic is effectively delegated to the engine via `requiredColumns`. + } } diff --git a/core/trino-main/src/main/java/io/trino/split/SplitManager.java b/core/trino-main/src/main/java/io/trino/split/SplitManager.java index dbb11680b375..9e1d62dbfd22 100644 --- a/core/trino-main/src/main/java/io/trino/split/SplitManager.java +++ b/core/trino-main/src/main/java/io/trino/split/SplitManager.java @@ -96,7 +96,6 @@ public SplitSource getSplits(Session session, Span parentSpan, TableFunctionHand ConnectorSplitSource source = splitManager.getSplits( function.getTransactionHandle(), session.toConnectorSession(catalogHandle), - function.getSchemaFunctionName(), function.getFunctionHandle()); SplitSource splitSource = new ConnectorAwareSplitSource(catalogHandle, source); diff --git a/core/trino-main/src/main/java/io/trino/sql/analyzer/Analysis.java b/core/trino-main/src/main/java/io/trino/sql/analyzer/Analysis.java index 597cceaa400d..136e976cf121 100644 --- a/core/trino-main/src/main/java/io/trino/sql/analyzer/Analysis.java +++ b/core/trino-main/src/main/java/io/trino/sql/analyzer/Analysis.java @@ -2234,7 +2234,6 @@ public TableArgumentAnalysis build() public static class TableFunctionInvocationAnalysis { private final CatalogHandle catalogHandle; - private final String schemaName; private final String functionName; private final Map arguments; private final List tableArgumentAnalyses; @@ -2246,7 +2245,6 @@ public static class TableFunctionInvocationAnalysis public TableFunctionInvocationAnalysis( CatalogHandle catalogHandle, - String schemaName, String functionName, Map arguments, List tableArgumentAnalyses, @@ -2257,7 +2255,6 @@ public TableFunctionInvocationAnalysis( ConnectorTransactionHandle transactionHandle) { this.catalogHandle = requireNonNull(catalogHandle, "catalogHandle is null"); - this.schemaName = requireNonNull(schemaName, "schemaName is null"); this.functionName = requireNonNull(functionName, "functionName is null"); this.arguments = ImmutableMap.copyOf(arguments); this.tableArgumentAnalyses = ImmutableList.copyOf(tableArgumentAnalyses); @@ -2274,11 +2271,6 @@ public CatalogHandle getCatalogHandle() return catalogHandle; } - public String getSchemaName() - { - return schemaName; - } - public String getFunctionName() { return functionName; diff --git a/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java b/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java index 419fd8da0a7f..4b67f20f9ddd 100644 --- a/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java +++ b/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java @@ -1690,7 +1690,6 @@ else if (argument.getPartitionBy().isPresent()) { analysis.setTableFunctionAnalysis(node, new TableFunctionInvocationAnalysis( catalogHandle, - function.getSchema(), function.getName(), argumentsAnalysis.getPassedArguments(), orderedTableArguments.build(), diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/RelationPlanner.java b/core/trino-main/src/main/java/io/trino/sql/planner/RelationPlanner.java index fbe2a093e905..ae1527599426 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/RelationPlanner.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/RelationPlanner.java @@ -22,7 +22,6 @@ import io.trino.metadata.TableFunctionHandle; import io.trino.metadata.TableHandle; import io.trino.spi.connector.ColumnHandle; -import io.trino.spi.function.SchemaFunctionName; import io.trino.spi.type.RowType; import io.trino.spi.type.Type; import io.trino.sql.ExpressionUtils; @@ -476,7 +475,6 @@ else if (tableArgument.getPartitionBy().isPresent()) { functionAnalysis.getCopartitioningLists(), new TableFunctionHandle( functionAnalysis.getCatalogHandle(), - new SchemaFunctionName(functionAnalysis.getSchemaName(), functionAnalysis.getFunctionName()), functionAnalysis.getConnectorTableFunctionHandle(), functionAnalysis.getTransactionHandle())); diff --git a/core/trino-main/src/main/java/io/trino/tracing/TracingMetadata.java b/core/trino-main/src/main/java/io/trino/tracing/TracingMetadata.java index f8e09503bace..92cb4efaf53e 100644 --- a/core/trino-main/src/main/java/io/trino/tracing/TracingMetadata.java +++ b/core/trino-main/src/main/java/io/trino/tracing/TracingMetadata.java @@ -902,8 +902,7 @@ public Optional> applyTableFunction( { Span span = startSpan("applyTableFunction") .setAttribute(TrinoAttributes.CATALOG, handle.getCatalogHandle().getCatalogName()) - .setAttribute(TrinoAttributes.SCHEMA, handle.getSchemaFunctionName().getSchemaName()) - .setAttribute(TrinoAttributes.FUNCTION, handle.getSchemaFunctionName().getFunctionName()); + .setAttribute(TrinoAttributes.HANDLE, handle.getFunctionHandle().toString()); try (var ignored = scopedSpan(span)) { return delegate.applyTableFunction(session, handle); } diff --git a/core/trino-main/src/test/java/io/trino/connector/MockConnector.java b/core/trino-main/src/test/java/io/trino/connector/MockConnector.java index fb1f220c0d3e..08a910be0335 100644 --- a/core/trino-main/src/test/java/io/trino/connector/MockConnector.java +++ b/core/trino-main/src/test/java/io/trino/connector/MockConnector.java @@ -85,7 +85,6 @@ import io.trino.spi.eventlistener.EventListener; import io.trino.spi.expression.ConnectorExpression; import io.trino.spi.function.FunctionProvider; -import io.trino.spi.function.SchemaFunctionName; import io.trino.spi.metrics.Metrics; import io.trino.spi.procedure.Procedure; import io.trino.spi.ptf.ConnectorTableFunction; @@ -173,7 +172,7 @@ public class MockConnector private final Supplier>> tableProperties; private final Supplier>> columnProperties; private final List> sessionProperties; - private final Map> tableFunctionSplitsSources; + private final Function tableFunctionSplitsSources; private final OptionalInt maxWriterTasks; private final BiFunction> getLayoutForTableExecute; @@ -219,7 +218,7 @@ public class MockConnector Supplier>> tableProperties, Supplier>> columnProperties, boolean supportsReportingWrittenBytes, - Map> tableFunctionSplitsSources, + Function tableFunctionSplitsSources, OptionalInt maxWriterTasks, BiFunction> getLayoutForTableExecute) { @@ -264,7 +263,7 @@ public class MockConnector this.schemaProperties = requireNonNull(schemaProperties, "schemaProperties is null"); this.tableProperties = requireNonNull(tableProperties, "tableProperties is null"); this.columnProperties = requireNonNull(columnProperties, "columnProperties is null"); - this.tableFunctionSplitsSources = ImmutableMap.copyOf(tableFunctionSplitsSources); + this.tableFunctionSplitsSources = requireNonNull(tableFunctionSplitsSources, "tableFunctionSplitsSources is null"); this.maxWriterTasks = requireNonNull(maxWriterTasks, "maxWriterTasks is null"); this.getLayoutForTableExecute = requireNonNull(getLayoutForTableExecute, "getLayoutForTableExecute is null"); } @@ -316,11 +315,10 @@ public ConnectorSplitSource getSplits( } @Override - public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, SchemaFunctionName name, ConnectorTableFunctionHandle functionHandle) + public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableFunctionHandle functionHandle) { - Function splitSourceProvider = tableFunctionSplitsSources.get(name); - requireNonNull(splitSourceProvider, "missing ConnectorSplitSource for table function " + name); - return splitSourceProvider.apply(functionHandle); + ConnectorSplitSource splits = tableFunctionSplitsSources.apply(functionHandle); + return requireNonNull(splits, "missing ConnectorSplitSource for table function handle " + functionHandle.getClass().getSimpleName()); } }; } diff --git a/core/trino-main/src/test/java/io/trino/connector/MockConnectorFactory.java b/core/trino-main/src/test/java/io/trino/connector/MockConnectorFactory.java index f079b91c3206..07517e732e4d 100644 --- a/core/trino-main/src/test/java/io/trino/connector/MockConnectorFactory.java +++ b/core/trino-main/src/test/java/io/trino/connector/MockConnectorFactory.java @@ -52,7 +52,6 @@ import io.trino.spi.eventlistener.EventListener; import io.trino.spi.expression.ConnectorExpression; import io.trino.spi.function.FunctionProvider; -import io.trino.spi.function.SchemaFunctionName; import io.trino.spi.metrics.Metrics; import io.trino.spi.procedure.Procedure; import io.trino.spi.ptf.ConnectorTableFunction; @@ -64,7 +63,6 @@ import io.trino.spi.statistics.TableStatistics; import java.util.ArrayList; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -126,7 +124,7 @@ public class MockConnectorFactory private final Supplier>> tableProperties; private final Supplier>> columnProperties; private final Optional partitioningProvider; - private final Map> tableFunctionSplitsSources; + private final Function tableFunctionSplitsSources; // access control private final ListRoleGrants roleGrants; @@ -178,7 +176,7 @@ private MockConnectorFactory( boolean supportsReportingWrittenBytes, Optional accessControl, boolean allowMissingColumnsOnInsert, - Map> tableFunctionSplitsSources, + Function tableFunctionSplitsSources, OptionalInt maxWriterTasks, BiFunction> getLayoutForTableExecute) { @@ -224,7 +222,7 @@ private MockConnectorFactory( this.functionProvider = requireNonNull(functionProvider, "functionProvider is null"); this.allowMissingColumnsOnInsert = allowMissingColumnsOnInsert; this.supportsReportingWrittenBytes = supportsReportingWrittenBytes; - this.tableFunctionSplitsSources = ImmutableMap.copyOf(tableFunctionSplitsSources); + this.tableFunctionSplitsSources = requireNonNull(tableFunctionSplitsSources, "tableFunctionSplitsSources is null"); this.maxWriterTasks = maxWriterTasks; this.getLayoutForTableExecute = requireNonNull(getLayoutForTableExecute, "getLayoutForTableExecute is null"); } @@ -409,7 +407,7 @@ public static final class Builder private Supplier>> tableProperties = ImmutableList::of; private Supplier>> columnProperties = ImmutableList::of; private Optional partitioningProvider = Optional.empty(); - private final Map> tableFunctionSplitsSources = new HashMap<>(); + private Function tableFunctionSplitsSources = handle -> null; // access control private boolean provideAccessControl; @@ -678,9 +676,9 @@ public Builder withPartitionProvider(ConnectorNodePartitioningProvider partition return this; } - public Builder withTableFunctionSplitSource(SchemaFunctionName name, Function sourceProvider) + public Builder withTableFunctionSplitSources(Function sourceProvider) { - tableFunctionSplitsSources.put(name, sourceProvider); + tableFunctionSplitsSources = requireNonNull(sourceProvider, "sourceProvider is null"); return this; } diff --git a/core/trino-main/src/test/java/io/trino/connector/TestingTableFunctions.java b/core/trino-main/src/test/java/io/trino/connector/TestingTableFunctions.java index faad15a6b910..3df5c413921a 100644 --- a/core/trino-main/src/test/java/io/trino/connector/TestingTableFunctions.java +++ b/core/trino-main/src/test/java/io/trino/connector/TestingTableFunctions.java @@ -28,6 +28,7 @@ import io.trino.spi.connector.ConnectorTransactionHandle; import io.trino.spi.connector.FixedSplitSource; import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.function.SchemaFunctionName; import io.trino.spi.predicate.TupleDomain; import io.trino.spi.ptf.AbstractConnectorTableFunction; import io.trino.spi.ptf.Argument; @@ -479,11 +480,13 @@ public MockConnectorTableHandle getTableHandle() public static class IdentityFunction extends AbstractConnectorTableFunction { + public static final String FUNCTION_NAME = "identity_function"; + public IdentityFunction() { super( SCHEMA_NAME, - "identity_function", + FUNCTION_NAME, ImmutableList.of( TableArgumentSpecification.builder() .name("INPUT") @@ -500,7 +503,7 @@ public TableFunctionAnalysis analyze(ConnectorSession session, ConnectorTransact .map(field -> new Descriptor.Field(field.getName().orElse("anonymous_column"), Optional.of(field.getType()))) .collect(toImmutableList())); return TableFunctionAnalysis.builder() - .handle(new EmptyTableFunctionHandle()) + .handle(new EmptyTableFunctionHandle(new SchemaFunctionName(SCHEMA_NAME, FUNCTION_NAME))) .returnedType(returnedType) .requiredColumns("INPUT", IntStream.range(0, inputColumns.size()).boxed().collect(toImmutableList())) .build(); @@ -526,11 +529,13 @@ public TableFunctionDataProcessor getDataProcessor(ConnectorTableFunctionHandle public static class IdentityPassThroughFunction extends AbstractConnectorTableFunction { + public static final String FUNCTION_NAME = "identity_pass_through_function"; + public IdentityPassThroughFunction() { super( SCHEMA_NAME, - "identity_pass_through_function", + FUNCTION_NAME, ImmutableList.of( TableArgumentSpecification.builder() .name("INPUT") @@ -544,7 +549,7 @@ public IdentityPassThroughFunction() public TableFunctionAnalysis analyze(ConnectorSession session, ConnectorTransactionHandle transaction, Map arguments) { return TableFunctionAnalysis.builder() - .handle(new EmptyTableFunctionHandle()) + .handle(new EmptyTableFunctionHandle(new SchemaFunctionName(SCHEMA_NAME, FUNCTION_NAME))) .requiredColumns("INPUT", ImmutableList.of(0)) // per spec, function must require at least one column .build(); } @@ -707,11 +712,13 @@ public TableFunctionProcessorState process(List> input) public static class EmptyOutputFunction extends AbstractConnectorTableFunction { + public static final String FUNCTION_NAME = "empty_output"; + public EmptyOutputFunction() { super( SCHEMA_NAME, - "empty_output", + FUNCTION_NAME, ImmutableList.of(TableArgumentSpecification.builder() .name("INPUT") .keepWhenEmpty() @@ -723,7 +730,7 @@ public EmptyOutputFunction() public TableFunctionAnalysis analyze(ConnectorSession session, ConnectorTransactionHandle transaction, Map arguments) { return TableFunctionAnalysis.builder() - .handle(new EmptyTableFunctionHandle()) + .handle(new EmptyTableFunctionHandle(new SchemaFunctionName(SCHEMA_NAME, FUNCTION_NAME))) .requiredColumns("INPUT", IntStream.range(0, ((TableArgument) arguments.get("INPUT")).getRowType().getFields().size()).boxed().collect(toImmutableList())) .build(); } @@ -758,11 +765,13 @@ public TableFunctionProcessorState process(List> input) public static class EmptyOutputWithPassThroughFunction extends AbstractConnectorTableFunction { + public static final String FUNCTION_NAME = "empty_output_with_pass_through"; + public EmptyOutputWithPassThroughFunction() { super( SCHEMA_NAME, - "empty_output_with_pass_through", + FUNCTION_NAME, ImmutableList.of(TableArgumentSpecification.builder() .name("INPUT") .keepWhenEmpty() @@ -775,7 +784,7 @@ public EmptyOutputWithPassThroughFunction() public TableFunctionAnalysis analyze(ConnectorSession session, ConnectorTransactionHandle transaction, Map arguments) { return TableFunctionAnalysis.builder() - .handle(new EmptyTableFunctionHandle()) + .handle(new EmptyTableFunctionHandle(new SchemaFunctionName(SCHEMA_NAME, FUNCTION_NAME))) .requiredColumns("INPUT", IntStream.range(0, ((TableArgument) arguments.get("INPUT")).getRowType().getFields().size()).boxed().collect(toImmutableList())) .build(); } @@ -813,11 +822,13 @@ public TableFunctionProcessorState process(List> input) public static class TestInputsFunction extends AbstractConnectorTableFunction { + public static final String FUNCTION_NAME = "test_inputs_function"; + public TestInputsFunction() { super( SCHEMA_NAME, - "test_inputs_function", + FUNCTION_NAME, ImmutableList.of( TableArgumentSpecification.builder() .rowSemantics() @@ -842,7 +853,7 @@ public TestInputsFunction() public TableFunctionAnalysis analyze(ConnectorSession session, ConnectorTransactionHandle transaction, Map arguments) { return TableFunctionAnalysis.builder() - .handle(new EmptyTableFunctionHandle()) + .handle(new EmptyTableFunctionHandle(new SchemaFunctionName(SCHEMA_NAME, FUNCTION_NAME))) .requiredColumns("INPUT_1", IntStream.range(0, ((TableArgument) arguments.get("INPUT_1")).getRowType().getFields().size()).boxed().collect(toImmutableList())) .requiredColumns("INPUT_2", IntStream.range(0, ((TableArgument) arguments.get("INPUT_2")).getRowType().getFields().size()).boxed().collect(toImmutableList())) .requiredColumns("INPUT_3", IntStream.range(0, ((TableArgument) arguments.get("INPUT_3")).getRowType().getFields().size()).boxed().collect(toImmutableList())) @@ -874,11 +885,13 @@ public TableFunctionDataProcessor getDataProcessor(ConnectorTableFunctionHandle public static class PassThroughInputFunction extends AbstractConnectorTableFunction { + public static final String FUNCTION_NAME = "pass_through"; + public PassThroughInputFunction() { super( SCHEMA_NAME, - "pass_through", + FUNCTION_NAME, ImmutableList.of( TableArgumentSpecification.builder() .name("INPUT_1") @@ -899,7 +912,7 @@ public PassThroughInputFunction() public TableFunctionAnalysis analyze(ConnectorSession session, ConnectorTransactionHandle transaction, Map arguments) { return TableFunctionAnalysis.builder() - .handle(new EmptyTableFunctionHandle()) + .handle(new EmptyTableFunctionHandle(new SchemaFunctionName(SCHEMA_NAME, FUNCTION_NAME))) .requiredColumns("INPUT_1", ImmutableList.of(0)) .requiredColumns("INPUT_2", ImmutableList.of(0)) .build(); @@ -977,11 +990,13 @@ public TableFunctionProcessorState process(List> input) public static class TestInputFunction extends AbstractConnectorTableFunction { + public static final String FUNCTION_NAME = "test_input"; + public TestInputFunction() { super( SCHEMA_NAME, - "test_input", + FUNCTION_NAME, ImmutableList.of(TableArgumentSpecification.builder() .name("INPUT") .keepWhenEmpty() @@ -993,7 +1008,7 @@ public TestInputFunction() public TableFunctionAnalysis analyze(ConnectorSession session, ConnectorTransactionHandle transaction, Map arguments) { return TableFunctionAnalysis.builder() - .handle(new EmptyTableFunctionHandle()) + .handle(new EmptyTableFunctionHandle(new SchemaFunctionName(SCHEMA_NAME, FUNCTION_NAME))) .requiredColumns("INPUT", IntStream.range(0, ((TableArgument) arguments.get("INPUT")).getRowType().getFields().size()).boxed().collect(toImmutableList())) .build(); } @@ -1035,11 +1050,13 @@ public TableFunctionProcessorState process(List> input) public static class TestSingleInputRowSemanticsFunction extends AbstractConnectorTableFunction { + public static final String FUNCTION_NAME = "test_single_input_function"; + public TestSingleInputRowSemanticsFunction() { super( SCHEMA_NAME, - "test_single_input_function", + FUNCTION_NAME, ImmutableList.of(TableArgumentSpecification.builder() .rowSemantics() .name("INPUT") @@ -1051,7 +1068,7 @@ public TestSingleInputRowSemanticsFunction() public TableFunctionAnalysis analyze(ConnectorSession session, ConnectorTransactionHandle transaction, Map arguments) { return TableFunctionAnalysis.builder() - .handle(new EmptyTableFunctionHandle()) + .handle(new EmptyTableFunctionHandle(new SchemaFunctionName(SCHEMA_NAME, FUNCTION_NAME))) .requiredColumns("INPUT", IntStream.range(0, ((TableArgument) arguments.get("INPUT")).getRowType().getFields().size()).boxed().collect(toImmutableList())) .build(); } @@ -1247,11 +1264,13 @@ public long getRetainedSizeInBytes() public static class EmptySourceFunction extends AbstractConnectorTableFunction { + public static final String FUNCTION_NAME = "empty_source"; + public EmptySourceFunction() { super( SCHEMA_NAME, - "empty_source", + FUNCTION_NAME, ImmutableList.of(), new DescribedTable(new Descriptor(ImmutableList.of(new Descriptor.Field("column", Optional.of(BOOLEAN)))))); } @@ -1260,7 +1279,7 @@ public EmptySourceFunction() public TableFunctionAnalysis analyze(ConnectorSession session, ConnectorTransactionHandle transaction, Map arguments) { return TableFunctionAnalysis.builder() - .handle(new EmptyTableFunctionHandle()) + .handle(new EmptyTableFunctionHandle(new SchemaFunctionName(SCHEMA_NAME, FUNCTION_NAME))) .build(); } @@ -1293,8 +1312,12 @@ public TableFunctionProcessorState process() } } - public static class EmptyTableFunctionHandle + public record EmptyTableFunctionHandle(SchemaFunctionName name) implements ConnectorTableFunctionHandle { + public EmptyTableFunctionHandle + { + requireNonNull(name, "name is null"); + } } } diff --git a/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/test/PlanBuilder.java b/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/test/PlanBuilder.java index 2dfe735377a7..ededc71dbbac 100644 --- a/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/test/PlanBuilder.java +++ b/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/test/PlanBuilder.java @@ -33,7 +33,6 @@ import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.SortOrder; -import io.trino.spi.function.SchemaFunctionName; import io.trino.spi.predicate.TupleDomain; import io.trino.spi.ptf.ConnectorTableFunctionHandle; import io.trino.spi.type.Type; @@ -1237,7 +1236,7 @@ public TableFunctionNode tableFunction( sources, tableArgumentProperties, copartitioningLists, - new TableFunctionHandle(TEST_CATALOG_HANDLE, new SchemaFunctionName("system", name), new ConnectorTableFunctionHandle() {}, TestingTransactionHandle.create())); + new TableFunctionHandle(TEST_CATALOG_HANDLE, new ConnectorTableFunctionHandle() {}, TestingTransactionHandle.create())); } public TableFunctionProcessorNode tableFunctionProcessor(Consumer consumer) diff --git a/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/test/TableFunctionProcessorBuilder.java b/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/test/TableFunctionProcessorBuilder.java index 2b271f8674d1..5b4f3604cef6 100644 --- a/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/test/TableFunctionProcessorBuilder.java +++ b/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/test/TableFunctionProcessorBuilder.java @@ -16,7 +16,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import io.trino.metadata.TableFunctionHandle; -import io.trino.spi.function.SchemaFunctionName; import io.trino.spi.ptf.ConnectorTableFunctionHandle; import io.trino.sql.planner.PlanNodeIdAllocator; import io.trino.sql.planner.Symbol; @@ -130,6 +129,6 @@ public TableFunctionProcessorNode build(PlanNodeIdAllocator idAllocator) prePartitioned, preSorted, hashSymbol, - new TableFunctionHandle(TEST_CATALOG_HANDLE, new SchemaFunctionName("system", name), new ConnectorTableFunctionHandle() {}, TestingTransactionHandle.create())); + new TableFunctionHandle(TEST_CATALOG_HANDLE, new ConnectorTableFunctionHandle() {}, TestingTransactionHandle.create())); } } diff --git a/core/trino-spi/pom.xml b/core/trino-spi/pom.xml index 7eaaa0aa84cc..998395f8981a 100644 --- a/core/trino-spi/pom.xml +++ b/core/trino-spi/pom.xml @@ -239,6 +239,12 @@ interface io.trino.spi.ptf.TableFunctionSplitProcessor It is just marked @Experimental + + true + java.class.removed + enum io.trino.spi.ptf.EmptyTableFunctionHandle + Default implementation should not be used, as ConnectorTableHandle represents table function identity. + true java.class.externalClassExposedInAPI diff --git a/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorSplitManager.java b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorSplitManager.java index 4fe598a96c29..a1be4d4fa1a6 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorSplitManager.java +++ b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorSplitManager.java @@ -14,7 +14,6 @@ package io.trino.spi.connector; import io.trino.spi.Experimental; -import io.trino.spi.function.SchemaFunctionName; import io.trino.spi.ptf.ConnectorTableFunctionHandle; public interface ConnectorSplitManager @@ -33,7 +32,6 @@ default ConnectorSplitSource getSplits( default ConnectorSplitSource getSplits( ConnectorTransactionHandle transaction, ConnectorSession session, - SchemaFunctionName name, ConnectorTableFunctionHandle function) { throw new UnsupportedOperationException(); diff --git a/core/trino-spi/src/main/java/io/trino/spi/function/FunctionProvider.java b/core/trino-spi/src/main/java/io/trino/spi/function/FunctionProvider.java index 30e9a7c189eb..00f831226d26 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/function/FunctionProvider.java +++ b/core/trino-spi/src/main/java/io/trino/spi/function/FunctionProvider.java @@ -14,6 +14,7 @@ package io.trino.spi.function; import io.trino.spi.Experimental; +import io.trino.spi.ptf.ConnectorTableFunctionHandle; import io.trino.spi.ptf.TableFunctionProcessorProvider; @Experimental(eta = "2023-03-31") @@ -38,7 +39,7 @@ default WindowFunctionSupplier getWindowFunctionSupplier(FunctionId functionId, throw new UnsupportedOperationException("%s does not provide window functions".formatted(getClass().getName())); } - default TableFunctionProcessorProvider getTableFunctionProcessorProvider(SchemaFunctionName name) + default TableFunctionProcessorProvider getTableFunctionProcessorProvider(ConnectorTableFunctionHandle functionHandle) { throw new UnsupportedOperationException("%s does not provide table functions".formatted(getClass().getName())); } diff --git a/core/trino-spi/src/main/java/io/trino/spi/ptf/EmptyTableFunctionHandle.java b/core/trino-spi/src/main/java/io/trino/spi/ptf/EmptyTableFunctionHandle.java deleted file mode 100644 index ab49c94a77f4..000000000000 --- a/core/trino-spi/src/main/java/io/trino/spi/ptf/EmptyTableFunctionHandle.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.spi.ptf; - -public enum EmptyTableFunctionHandle - implements ConnectorTableFunctionHandle -{ - EMPTY_HANDLE -} diff --git a/core/trino-spi/src/main/java/io/trino/spi/ptf/TableFunctionAnalysis.java b/core/trino-spi/src/main/java/io/trino/spi/ptf/TableFunctionAnalysis.java index 583103f707cc..cb58e97f9b55 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/ptf/TableFunctionAnalysis.java +++ b/core/trino-spi/src/main/java/io/trino/spi/ptf/TableFunctionAnalysis.java @@ -20,7 +20,6 @@ import java.util.Map; import java.util.Optional; -import static io.trino.spi.ptf.EmptyTableFunctionHandle.EMPTY_HANDLE; import static io.trino.spi.ptf.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.toMap; @@ -82,7 +81,7 @@ public static final class Builder { private Descriptor returnedType; private final Map> requiredColumns = new HashMap<>(); - private ConnectorTableFunctionHandle handle = EMPTY_HANDLE; + private ConnectorTableFunctionHandle handle; private Builder() {} diff --git a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorSplitManager.java b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorSplitManager.java index 716687081720..ae14194a0374 100644 --- a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorSplitManager.java +++ b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorSplitManager.java @@ -21,7 +21,6 @@ import io.trino.spi.connector.ConnectorTransactionHandle; import io.trino.spi.connector.Constraint; import io.trino.spi.connector.DynamicFilter; -import io.trino.spi.function.SchemaFunctionName; import io.trino.spi.ptf.ConnectorTableFunctionHandle; import javax.inject.Inject; @@ -58,11 +57,10 @@ public ConnectorSplitSource getSplits( public ConnectorSplitSource getSplits( ConnectorTransactionHandle transaction, ConnectorSession session, - SchemaFunctionName name, ConnectorTableFunctionHandle function) { try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { - return delegate.getSplits(transaction, session, name, function); + return delegate.getSplits(transaction, session, function); } } } diff --git a/testing/trino-tests/src/test/java/io/trino/tests/TestMockConnector.java b/testing/trino-tests/src/test/java/io/trino/tests/TestMockConnector.java index a3a109be2ed6..fd2333469752 100644 --- a/testing/trino-tests/src/test/java/io/trino/tests/TestMockConnector.java +++ b/testing/trino-tests/src/test/java/io/trino/tests/TestMockConnector.java @@ -235,7 +235,7 @@ public void testTableProcedure() public void testTableFunction() { assertThatThrownBy(() -> assertUpdate("SELECT * FROM TABLE(mock.system.simple_table_function())")) - .hasMessage("missing ConnectorSplitSource for table function system.simple_table_function"); + .hasMessage("missing ConnectorSplitSource for table function handle SimpleTableFunctionHandle"); assertThatThrownBy(() -> assertUpdate("SELECT * FROM TABLE(mock.system.non_existing_table_function())")) .hasMessageContaining("Table function mock.system.non_existing_table_function not registered"); } diff --git a/testing/trino-tests/src/test/java/io/trino/tests/TestTableFunctionInvocation.java b/testing/trino-tests/src/test/java/io/trino/tests/TestTableFunctionInvocation.java index d40d838658d3..24c3cf54c89f 100644 --- a/testing/trino-tests/src/test/java/io/trino/tests/TestTableFunctionInvocation.java +++ b/testing/trino-tests/src/test/java/io/trino/tests/TestTableFunctionInvocation.java @@ -26,6 +26,7 @@ import io.trino.connector.TestingTableFunctions.EmptyOutputWithPassThroughFunction.EmptyOutputWithPassThroughProcessorProvider; import io.trino.connector.TestingTableFunctions.EmptySourceFunction; import io.trino.connector.TestingTableFunctions.EmptySourceFunction.EmptySourceFunctionProcessorProvider; +import io.trino.connector.TestingTableFunctions.EmptyTableFunctionHandle; import io.trino.connector.TestingTableFunctions.IdentityFunction; import io.trino.connector.TestingTableFunctions.IdentityFunction.IdentityFunctionProcessorProvider; import io.trino.connector.TestingTableFunctions.IdentityPassThroughFunction; @@ -33,6 +34,7 @@ import io.trino.connector.TestingTableFunctions.PassThroughInputFunction; import io.trino.connector.TestingTableFunctions.PassThroughInputFunction.PassThroughInputProcessorProvider; import io.trino.connector.TestingTableFunctions.RepeatFunction; +import io.trino.connector.TestingTableFunctions.RepeatFunction.RepeatFunctionHandle; import io.trino.connector.TestingTableFunctions.RepeatFunction.RepeatFunctionProcessorProvider; import io.trino.connector.TestingTableFunctions.SimpleTableFunction; import io.trino.connector.TestingTableFunctions.SimpleTableFunction.SimpleTableFunctionHandle; @@ -47,6 +49,7 @@ import io.trino.spi.connector.TableFunctionApplicationResult; import io.trino.spi.function.FunctionProvider; import io.trino.spi.function.SchemaFunctionName; +import io.trino.spi.ptf.ConnectorTableFunctionHandle; import io.trino.spi.ptf.TableFunctionProcessorProvider; import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.DistributedQueryRunner; @@ -107,51 +110,42 @@ public void setUp() .withFunctionProvider(Optional.of(new FunctionProvider() { @Override - public TableFunctionProcessorProvider getTableFunctionProcessorProvider(SchemaFunctionName name) + public TableFunctionProcessorProvider getTableFunctionProcessorProvider(ConnectorTableFunctionHandle functionHandle) { - if (name.equals(new SchemaFunctionName("system", "identity_function"))) { - return new IdentityFunctionProcessorProvider(); + if (functionHandle instanceof EmptyTableFunctionHandle handle) { + return switch (handle.name().getFunctionName()) { + case "identity_function" -> new IdentityFunctionProcessorProvider(); + case "identity_pass_through_function" -> new IdentityPassThroughFunctionProcessorProvider(); + case "empty_output" -> new EmptyOutputProcessorProvider(); + case "empty_output_with_pass_through" -> new EmptyOutputWithPassThroughProcessorProvider(); + case "test_inputs_function" -> new TestInputsFunctionProcessorProvider(); + case "pass_through" -> new PassThroughInputProcessorProvider(); + case "test_input" -> new TestInputProcessorProvider(); + case "test_single_input_function" -> new TestSingleInputFunctionProcessorProvider(); + case "empty_source" -> new EmptySourceFunctionProcessorProvider(); + default -> throw new IllegalArgumentException("unexpected table function: " + handle.name()); + }; } - if (name.equals(new SchemaFunctionName("system", "identity_pass_through_function"))) { - return new IdentityPassThroughFunctionProcessorProvider(); - } - if (name.equals(new SchemaFunctionName("system", "repeat"))) { + if (functionHandle instanceof RepeatFunctionHandle) { return new RepeatFunctionProcessorProvider(); } - if (name.equals(new SchemaFunctionName("system", "empty_output"))) { - return new EmptyOutputProcessorProvider(); - } - if (name.equals(new SchemaFunctionName("system", "empty_output_with_pass_through"))) { - return new EmptyOutputWithPassThroughProcessorProvider(); - } - if (name.equals(new SchemaFunctionName("system", "test_inputs_function"))) { - return new TestInputsFunctionProcessorProvider(); - } - if (name.equals(new SchemaFunctionName("system", "pass_through"))) { - return new PassThroughInputProcessorProvider(); - } - if (name.equals(new SchemaFunctionName("system", "test_input"))) { - return new TestInputProcessorProvider(); - } - if (name.equals(new SchemaFunctionName("system", "test_single_input_function"))) { - return new TestSingleInputFunctionProcessorProvider(); - } - if (name.equals(new SchemaFunctionName("system", "constant"))) { + if (functionHandle instanceof ConstantFunctionHandle) { return new ConstantFunctionProcessorProvider(); } - if (name.equals(new SchemaFunctionName("system", "empty_source"))) { - return new EmptySourceFunctionProcessorProvider(); - } return null; } })) - .withTableFunctionSplitSource( - new SchemaFunctionName("system", "constant"), - handle -> getConstantFunctionSplitSource((ConstantFunctionHandle) handle)) - .withTableFunctionSplitSource( - new SchemaFunctionName("system", "empty_source"), - handle -> new FixedSplitSource(ImmutableList.of(MOCK_CONNECTOR_SPLIT))) + .withTableFunctionSplitSources(functionHandle -> { + if (functionHandle instanceof ConstantFunctionHandle handle) { + return getConstantFunctionSplitSource(handle); + } + if (functionHandle instanceof EmptyTableFunctionHandle handle && handle.name().equals(new SchemaFunctionName("system", "empty_source"))) { + return new FixedSplitSource(ImmutableList.of(MOCK_CONNECTOR_SPLIT)); + } + + return null; + }) .build())); queryRunner.createCatalog(TESTING_CATALOG, "mock");