Skip to content

Commit

Permalink
Remove function name from TableFunctionHandle
Browse files Browse the repository at this point in the history
Use the ConnectorTableFunctionHandle as the function's
identity in ConnectorSplitManager and FunctionProvider.
  • Loading branch information
kasiafi committed Apr 21, 2023
1 parent a50cfc1 commit c4c4859
Show file tree
Hide file tree
Showing 23 changed files with 113 additions and 139 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import io.trino.spi.connector.ConnectorSplitSource;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.SystemTable;
import io.trino.spi.function.SchemaFunctionName;
import io.trino.spi.procedure.Procedure;
import io.trino.spi.ptf.ConnectorTableFunction;
import io.trino.spi.ptf.ConnectorTableFunctionHandle;
Expand Down Expand Up @@ -93,7 +92,7 @@ public ConnectorSplitManager getSplitManager()
return new ConnectorSplitManager()
{
@Override
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, SchemaFunctionName name, ConnectorTableFunctionHandle functionHandle)
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableFunctionHandle functionHandle)
{
if (functionHandle instanceof SequenceFunctionHandle sequenceFunctionHandle) {
return getSequenceFunctionSplitSource(sequenceFunctionHandle);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import io.trino.spi.function.InvocationConvention;
import io.trino.spi.function.InvocationConvention.InvocationArgumentConvention;
import io.trino.spi.function.ScalarFunctionImplementation;
import io.trino.spi.function.SchemaFunctionName;
import io.trino.spi.function.WindowFunctionSupplier;
import io.trino.spi.ptf.TableFunctionProcessorProvider;
import io.trino.spi.type.Type;
Expand Down Expand Up @@ -153,18 +152,17 @@ private WindowFunctionSupplier getWindowFunctionSupplierInternal(ResolvedFunctio
public TableFunctionProcessorProvider getTableFunctionProcessorProvider(TableFunctionHandle tableFunctionHandle)
{
CatalogHandle catalogHandle = tableFunctionHandle.getCatalogHandle();
SchemaFunctionName functionName = tableFunctionHandle.getSchemaFunctionName();

FunctionProvider provider;
if (catalogHandle.equals(GlobalSystemConnector.CATALOG_HANDLE)) {
provider = globalFunctionCatalog;
}
else {
provider = functionProviders.getService(catalogHandle);
checkArgument(provider != null, "No function provider for catalog: '%s' (function '%s')", catalogHandle, functionName);
checkArgument(provider != null, "No function provider for catalog: '%s'", catalogHandle);
}

return provider.getTableFunctionProcessorProvider(functionName);
return provider.getTableFunctionProcessorProvider(tableFunctionHandle.getFunctionHandle());
}

private FunctionDependencies getFunctionDependencies(ResolvedFunction resolvedFunction)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Multimap;
import io.trino.operator.table.ExcludeColumns;
import io.trino.operator.table.Sequence;
import io.trino.operator.table.ExcludeColumns.ExcludeColumnsFunctionHandle;
import io.trino.operator.table.Sequence.SequenceFunctionHandle;
import io.trino.spi.function.AggregationFunctionMetadata;
import io.trino.spi.function.AggregationImplementation;
import io.trino.spi.function.BoundSignature;
Expand All @@ -33,6 +33,7 @@
import io.trino.spi.function.SchemaFunctionName;
import io.trino.spi.function.Signature;
import io.trino.spi.function.WindowFunctionSupplier;
import io.trino.spi.ptf.ConnectorTableFunctionHandle;
import io.trino.spi.ptf.TableFunctionProcessorProvider;
import io.trino.spi.type.TypeSignature;

Expand Down Expand Up @@ -178,12 +179,12 @@ public ScalarFunctionImplementation getScalarFunctionImplementation(
}

@Override
public TableFunctionProcessorProvider getTableFunctionProcessorProvider(SchemaFunctionName name)
public TableFunctionProcessorProvider getTableFunctionProcessorProvider(ConnectorTableFunctionHandle functionHandle)
{
if (name.equals(new SchemaFunctionName(BUILTIN_SCHEMA, ExcludeColumns.NAME))) {
if (functionHandle instanceof ExcludeColumnsFunctionHandle) {
return getExcludeColumnsFunctionProcessorProvider();
}
if (name.equals(new SchemaFunctionName(BUILTIN_SCHEMA, Sequence.NAME))) {
if (functionHandle instanceof SequenceFunctionHandle) {
return getSequenceFunctionProcessorProvider();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,23 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import io.trino.spi.connector.CatalogHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.function.SchemaFunctionName;
import io.trino.spi.ptf.ConnectorTableFunctionHandle;

import static java.util.Objects.requireNonNull;

public class TableFunctionHandle
{
private final CatalogHandle catalogHandle;
private final SchemaFunctionName schemaFunctionName;
private final ConnectorTableFunctionHandle functionHandle;
private final ConnectorTransactionHandle transactionHandle;

@JsonCreator
public TableFunctionHandle(
@JsonProperty("catalogHandle") CatalogHandle catalogHandle,
@JsonProperty("schemaFunctionName") SchemaFunctionName schemaFunctionName,
@JsonProperty("functionHandle") ConnectorTableFunctionHandle functionHandle,
@JsonProperty("transactionHandle") ConnectorTransactionHandle transactionHandle)
{
this.catalogHandle = requireNonNull(catalogHandle, "catalogHandle is null");
this.schemaFunctionName = requireNonNull(schemaFunctionName, "schemaFunctionName is null");
this.functionHandle = requireNonNull(functionHandle, "functionHandle is null");
this.transactionHandle = requireNonNull(transactionHandle, "transactionHandle is null");
}
Expand All @@ -48,12 +44,6 @@ public CatalogHandle getCatalogHandle()
return catalogHandle;
}

@JsonProperty
public SchemaFunctionName getSchemaFunctionName()
{
return schemaFunctionName;
}

@JsonProperty
public ConnectorTableFunctionHandle getFunctionHandle()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public TableFunctionAnalysis analyze(ConnectorSession session, ConnectorTransact
return TableFunctionAnalysis.builder()
.requiredColumns(TABLE_ARGUMENT_NAME, requiredColumns.build())
.returnedType(new Descriptor(returnedType))
// there's no information to remember. All logic is effectively delegated to the engine via `requiredColumns`. We do not pass a ConnectorTableHandle. EMPTY_HANDLE will be used.
.handle(new ExcludeColumnsFunctionHandle())
.build();
}
}
Expand All @@ -161,4 +161,10 @@ public TableFunctionDataProcessor getDataProcessor(ConnectorTableFunctionHandle
}
};
}

public record ExcludeColumnsFunctionHandle()
implements ConnectorTableFunctionHandle
{
// there's no information to remember. All logic is effectively delegated to the engine via `requiredColumns`.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ public SplitSource getSplits(Session session, Span parentSpan, TableFunctionHand
ConnectorSplitSource source = splitManager.getSplits(
function.getTransactionHandle(),
session.toConnectorSession(catalogHandle),
function.getSchemaFunctionName(),
function.getFunctionHandle());

SplitSource splitSource = new ConnectorAwareSplitSource(catalogHandle, source);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2234,7 +2234,6 @@ public TableArgumentAnalysis build()
public static class TableFunctionInvocationAnalysis
{
private final CatalogHandle catalogHandle;
private final String schemaName;
private final String functionName;
private final Map<String, Argument> arguments;
private final List<TableArgumentAnalysis> tableArgumentAnalyses;
Expand All @@ -2246,7 +2245,6 @@ public static class TableFunctionInvocationAnalysis

public TableFunctionInvocationAnalysis(
CatalogHandle catalogHandle,
String schemaName,
String functionName,
Map<String, Argument> arguments,
List<TableArgumentAnalysis> tableArgumentAnalyses,
Expand All @@ -2257,7 +2255,6 @@ public TableFunctionInvocationAnalysis(
ConnectorTransactionHandle transactionHandle)
{
this.catalogHandle = requireNonNull(catalogHandle, "catalogHandle is null");
this.schemaName = requireNonNull(schemaName, "schemaName is null");
this.functionName = requireNonNull(functionName, "functionName is null");
this.arguments = ImmutableMap.copyOf(arguments);
this.tableArgumentAnalyses = ImmutableList.copyOf(tableArgumentAnalyses);
Expand All @@ -2274,11 +2271,6 @@ public CatalogHandle getCatalogHandle()
return catalogHandle;
}

public String getSchemaName()
{
return schemaName;
}

public String getFunctionName()
{
return functionName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1690,7 +1690,6 @@ else if (argument.getPartitionBy().isPresent()) {

analysis.setTableFunctionAnalysis(node, new TableFunctionInvocationAnalysis(
catalogHandle,
function.getSchema(),
function.getName(),
argumentsAnalysis.getPassedArguments(),
orderedTableArguments.build(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import io.trino.metadata.TableFunctionHandle;
import io.trino.metadata.TableHandle;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.function.SchemaFunctionName;
import io.trino.spi.type.RowType;
import io.trino.spi.type.Type;
import io.trino.sql.ExpressionUtils;
Expand Down Expand Up @@ -476,7 +475,6 @@ else if (tableArgument.getPartitionBy().isPresent()) {
functionAnalysis.getCopartitioningLists(),
new TableFunctionHandle(
functionAnalysis.getCatalogHandle(),
new SchemaFunctionName(functionAnalysis.getSchemaName(), functionAnalysis.getFunctionName()),
functionAnalysis.getConnectorTableFunctionHandle(),
functionAnalysis.getTransactionHandle()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -902,8 +902,7 @@ public Optional<TableFunctionApplicationResult<TableHandle>> applyTableFunction(
{
Span span = startSpan("applyTableFunction")
.setAttribute(TrinoAttributes.CATALOG, handle.getCatalogHandle().getCatalogName())
.setAttribute(TrinoAttributes.SCHEMA, handle.getSchemaFunctionName().getSchemaName())
.setAttribute(TrinoAttributes.FUNCTION, handle.getSchemaFunctionName().getFunctionName());
.setAttribute(TrinoAttributes.HANDLE, handle.getFunctionHandle().toString());
try (var ignored = scopedSpan(span)) {
return delegate.applyTableFunction(session, handle);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@
import io.trino.spi.eventlistener.EventListener;
import io.trino.spi.expression.ConnectorExpression;
import io.trino.spi.function.FunctionProvider;
import io.trino.spi.function.SchemaFunctionName;
import io.trino.spi.metrics.Metrics;
import io.trino.spi.procedure.Procedure;
import io.trino.spi.ptf.ConnectorTableFunction;
Expand Down Expand Up @@ -173,7 +172,7 @@ public class MockConnector
private final Supplier<List<PropertyMetadata<?>>> tableProperties;
private final Supplier<List<PropertyMetadata<?>>> columnProperties;
private final List<PropertyMetadata<?>> sessionProperties;
private final Map<SchemaFunctionName, Function<ConnectorTableFunctionHandle, ConnectorSplitSource>> tableFunctionSplitsSources;
private final Function<ConnectorTableFunctionHandle, ConnectorSplitSource> tableFunctionSplitsSources;
private final OptionalInt maxWriterTasks;
private final BiFunction<ConnectorSession, ConnectorTableExecuteHandle, Optional<ConnectorTableLayout>> getLayoutForTableExecute;

Expand Down Expand Up @@ -219,7 +218,7 @@ public class MockConnector
Supplier<List<PropertyMetadata<?>>> tableProperties,
Supplier<List<PropertyMetadata<?>>> columnProperties,
boolean supportsReportingWrittenBytes,
Map<SchemaFunctionName, Function<ConnectorTableFunctionHandle, ConnectorSplitSource>> tableFunctionSplitsSources,
Function<ConnectorTableFunctionHandle, ConnectorSplitSource> tableFunctionSplitsSources,
OptionalInt maxWriterTasks,
BiFunction<ConnectorSession, ConnectorTableExecuteHandle, Optional<ConnectorTableLayout>> getLayoutForTableExecute)
{
Expand Down Expand Up @@ -264,7 +263,7 @@ public class MockConnector
this.schemaProperties = requireNonNull(schemaProperties, "schemaProperties is null");
this.tableProperties = requireNonNull(tableProperties, "tableProperties is null");
this.columnProperties = requireNonNull(columnProperties, "columnProperties is null");
this.tableFunctionSplitsSources = ImmutableMap.copyOf(tableFunctionSplitsSources);
this.tableFunctionSplitsSources = requireNonNull(tableFunctionSplitsSources, "tableFunctionSplitsSources is null");
this.maxWriterTasks = requireNonNull(maxWriterTasks, "maxWriterTasks is null");
this.getLayoutForTableExecute = requireNonNull(getLayoutForTableExecute, "getLayoutForTableExecute is null");
}
Expand Down Expand Up @@ -316,11 +315,10 @@ public ConnectorSplitSource getSplits(
}

@Override
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, SchemaFunctionName name, ConnectorTableFunctionHandle functionHandle)
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableFunctionHandle functionHandle)
{
Function<ConnectorTableFunctionHandle, ConnectorSplitSource> splitSourceProvider = tableFunctionSplitsSources.get(name);
requireNonNull(splitSourceProvider, "missing ConnectorSplitSource for table function " + name);
return splitSourceProvider.apply(functionHandle);
ConnectorSplitSource splits = tableFunctionSplitsSources.apply(functionHandle);
return requireNonNull(splits, "missing ConnectorSplitSource for table function handle " + functionHandle.getClass().getSimpleName());
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import io.trino.spi.eventlistener.EventListener;
import io.trino.spi.expression.ConnectorExpression;
import io.trino.spi.function.FunctionProvider;
import io.trino.spi.function.SchemaFunctionName;
import io.trino.spi.metrics.Metrics;
import io.trino.spi.procedure.Procedure;
import io.trino.spi.ptf.ConnectorTableFunction;
Expand All @@ -64,7 +63,6 @@
import io.trino.spi.statistics.TableStatistics;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -126,7 +124,7 @@ public class MockConnectorFactory
private final Supplier<List<PropertyMetadata<?>>> tableProperties;
private final Supplier<List<PropertyMetadata<?>>> columnProperties;
private final Optional<ConnectorNodePartitioningProvider> partitioningProvider;
private final Map<SchemaFunctionName, Function<ConnectorTableFunctionHandle, ConnectorSplitSource>> tableFunctionSplitsSources;
private final Function<ConnectorTableFunctionHandle, ConnectorSplitSource> tableFunctionSplitsSources;

// access control
private final ListRoleGrants roleGrants;
Expand Down Expand Up @@ -178,7 +176,7 @@ private MockConnectorFactory(
boolean supportsReportingWrittenBytes,
Optional<ConnectorAccessControl> accessControl,
boolean allowMissingColumnsOnInsert,
Map<SchemaFunctionName, Function<ConnectorTableFunctionHandle, ConnectorSplitSource>> tableFunctionSplitsSources,
Function<ConnectorTableFunctionHandle, ConnectorSplitSource> tableFunctionSplitsSources,
OptionalInt maxWriterTasks,
BiFunction<ConnectorSession, ConnectorTableExecuteHandle, Optional<ConnectorTableLayout>> getLayoutForTableExecute)
{
Expand Down Expand Up @@ -224,7 +222,7 @@ private MockConnectorFactory(
this.functionProvider = requireNonNull(functionProvider, "functionProvider is null");
this.allowMissingColumnsOnInsert = allowMissingColumnsOnInsert;
this.supportsReportingWrittenBytes = supportsReportingWrittenBytes;
this.tableFunctionSplitsSources = ImmutableMap.copyOf(tableFunctionSplitsSources);
this.tableFunctionSplitsSources = requireNonNull(tableFunctionSplitsSources, "tableFunctionSplitsSources is null");
this.maxWriterTasks = maxWriterTasks;
this.getLayoutForTableExecute = requireNonNull(getLayoutForTableExecute, "getLayoutForTableExecute is null");
}
Expand Down Expand Up @@ -409,7 +407,7 @@ public static final class Builder
private Supplier<List<PropertyMetadata<?>>> tableProperties = ImmutableList::of;
private Supplier<List<PropertyMetadata<?>>> columnProperties = ImmutableList::of;
private Optional<ConnectorNodePartitioningProvider> partitioningProvider = Optional.empty();
private final Map<SchemaFunctionName, Function<ConnectorTableFunctionHandle, ConnectorSplitSource>> tableFunctionSplitsSources = new HashMap<>();
private Function<ConnectorTableFunctionHandle, ConnectorSplitSource> tableFunctionSplitsSources = handle -> null;

// access control
private boolean provideAccessControl;
Expand Down Expand Up @@ -678,9 +676,9 @@ public Builder withPartitionProvider(ConnectorNodePartitioningProvider partition
return this;
}

public Builder withTableFunctionSplitSource(SchemaFunctionName name, Function<ConnectorTableFunctionHandle, ConnectorSplitSource> sourceProvider)
public Builder withTableFunctionSplitSources(Function<ConnectorTableFunctionHandle, ConnectorSplitSource> sourceProvider)
{
tableFunctionSplitsSources.put(name, sourceProvider);
tableFunctionSplitsSources = requireNonNull(sourceProvider, "sourceProvider is null");
return this;
}

Expand Down
Loading

0 comments on commit c4c4859

Please sign in to comment.