Skip to content

Commit

Permalink
Resolve table function based StorageEngine provided function resolver (
Browse files Browse the repository at this point in the history
…#1354)

Signed-off-by: Peng Huo <[email protected]>
(cherry picked from commit bcfda37)
  • Loading branch information
penghuo authored and github-actions[bot] committed Mar 10, 2023
1 parent 11d0d6d commit d0e2db9
Show file tree
Hide file tree
Showing 7 changed files with 194 additions and 172 deletions.
2 changes: 1 addition & 1 deletion core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ dependencies {
test {
useJUnitPlatform()
testLogging {
events "passed", "skipped", "failed"
events "skipped", "failed"
exceptionFormat "full"
}
}
Expand Down
4 changes: 3 additions & 1 deletion core/src/main/java/org/opensearch/sql/analysis/Analyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,9 @@ public LogicalPlan visitTableFunction(TableFunction node, AnalysisContext contex
.collect(Collectors.toList());
TableFunctionImplementation tableFunctionImplementation
= (TableFunctionImplementation) repository.compile(context.getFunctionProperties(),
dataSourceSchemaIdentifierNameResolver.getDataSourceName(), functionName, arguments);
dataSourceService
.getDataSource(dataSourceSchemaIdentifierNameResolver.getDataSourceName())
.getStorageEngine().getFunctions(), functionName, arguments);
context.push();
TypeEnvironment curEnv = context.peek();
Table table = tableFunctionImplementation.applyArguments();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@
public enum DataSourceType {
PROMETHEUS,
OPENSEARCH,
FILESYSTEM
JDBC
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@

import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
Expand All @@ -32,6 +35,7 @@
import org.opensearch.sql.expression.system.SystemFunctions;
import org.opensearch.sql.expression.text.TextFunction;
import org.opensearch.sql.expression.window.WindowFunctions;
import org.opensearch.sql.storage.StorageEngine;

/**
* Builtin Function Repository.
Expand All @@ -40,22 +44,20 @@
*
*/
public class BuiltinFunctionRepository {
public static final String DEFAULT_NAMESPACE = "default";

private final Map<String, Map<FunctionName, FunctionResolver>> namespaceFunctionResolverMap;
private final Map<FunctionName, FunctionResolver> functionResolverMap;

/** The singleton instance. */
private static BuiltinFunctionRepository instance;

/**
* Construct a function repository with the given function registered. This is only used in test.
*
* @param namespaceFunctionResolverMap function supported
* @param functionResolverMap function supported
*/
@VisibleForTesting
BuiltinFunctionRepository(
Map<String, Map<FunctionName, FunctionResolver>> namespaceFunctionResolverMap) {
this.namespaceFunctionResolverMap = namespaceFunctionResolverMap;
BuiltinFunctionRepository(Map<FunctionName, FunctionResolver> functionResolverMap) {
this.functionResolverMap = functionResolverMap;
}

/**
Expand Down Expand Up @@ -86,106 +88,86 @@ public static synchronized BuiltinFunctionRepository getInstance() {
}

/**
* Register {@link DefaultFunctionResolver} to the Builtin Function Repository
* under default namespace.
* Register {@link DefaultFunctionResolver} to the Builtin Function Repository.
*
* @param resolver {@link DefaultFunctionResolver} to be registered
*/
public void register(FunctionResolver resolver) {
register(DEFAULT_NAMESPACE, resolver);
functionResolverMap.put(resolver.getFunctionName(), resolver);
}

/**
* Register {@link DefaultFunctionResolver} to the Builtin Function Repository with
* specified namespace.
*
* @param resolver {@link DefaultFunctionResolver} to be registered
*/
public void register(String namespace, FunctionResolver resolver) {
Map<FunctionName, FunctionResolver> functionResolverMap;
if (!namespaceFunctionResolverMap.containsKey(namespace)) {
functionResolverMap = new HashMap<>();
namespaceFunctionResolverMap.put(namespace, functionResolverMap);
}
namespaceFunctionResolverMap.get(namespace).put(resolver.getFunctionName(), resolver);
}

/**
* Compile FunctionExpression under default namespace.
* Compile FunctionExpression using core function resolver.
*
*/
public FunctionImplementation compile(FunctionProperties functionProperties,
FunctionName functionName, List<Expression> expressions) {
return compile(functionProperties, DEFAULT_NAMESPACE, functionName, expressions);
return compile(functionProperties, Collections.emptyList(), functionName, expressions);
}


/**
* Compile FunctionExpression within given namespace.
* Checks for default namespace first and then tries to compile from given namespace.
* Compile FunctionExpression within {@link StorageEngine} provided {@link FunctionResolver}.
*/
public FunctionImplementation compile(FunctionProperties functionProperties,
String namespace,
Collection<FunctionResolver> dataSourceFunctionResolver,
FunctionName functionName,
List<Expression> expressions) {
List<String> namespaceList = new ArrayList<>(List.of(DEFAULT_NAMESPACE));
if (!namespace.equals(DEFAULT_NAMESPACE)) {
namespaceList.add(namespace);
}
FunctionBuilder resolvedFunctionBuilder = resolve(
namespaceList, new FunctionSignature(functionName, expressions
.stream().map(Expression::type).collect(Collectors.toList())));
FunctionBuilder resolvedFunctionBuilder =
resolve(
dataSourceFunctionResolver,
new FunctionSignature(
functionName,
expressions.stream().map(Expression::type).collect(Collectors.toList())));
return resolvedFunctionBuilder.apply(functionProperties, expressions);
}

/**
* Resolve the {@link FunctionBuilder} in
* repository under a list of namespaces.
* Returns the First FunctionBuilder found.
* So list of namespaces is also the priority of namespaces.
* Resolve the {@link FunctionBuilder} in repository under a list of namespaces. Returns the First
* FunctionBuilder found. So list of namespaces is also the priority of namespaces.
*
* @param functionSignature {@link FunctionSignature} functionsignature.
* @return Original function builder if it's a cast function or all arguments have expected types
* or otherwise wrap its arguments by cast function as needed.
* or otherwise wrap its arguments by cast function as needed.
*/
public FunctionBuilder
resolve(List<String> namespaces,
FunctionSignature functionSignature) {
FunctionName functionName = functionSignature.getFunctionName();
FunctionBuilder result = null;
for (String namespace : namespaces) {
if (namespaceFunctionResolverMap.containsKey(namespace)
&& namespaceFunctionResolverMap.get(namespace).containsKey(functionName)) {
result = getFunctionBuilder(functionSignature, functionName,
namespaceFunctionResolverMap.get(namespace));
break;
}
}
if (result == null) {
throw new ExpressionEvaluationException(
String.format("unsupported function name: %s", functionName.getFunctionName()));
} else {
return result;
}
@VisibleForTesting
public FunctionBuilder resolve(
Collection<FunctionResolver> dataSourceFunctionResolver,
FunctionSignature functionSignature) {
Map<FunctionName, FunctionResolver> dataSourceFunctionMap = dataSourceFunctionResolver.stream()
.collect(Collectors.toMap(FunctionResolver::getFunctionName, t -> t));

// first, resolve in datasource provide function resolver.
// second, resolve in builtin function resolver.
return resolve(functionSignature, dataSourceFunctionMap)
.or(() -> resolve(functionSignature, functionResolverMap))
.orElseThrow(
() ->
new ExpressionEvaluationException(
String.format(
"unsupported function name: %s", functionSignature.getFunctionName())));
}

private FunctionBuilder getFunctionBuilder(
private Optional<FunctionBuilder> resolve(
FunctionSignature functionSignature,
FunctionName functionName,
Map<FunctionName, FunctionResolver> functionResolverMap) {
Pair<FunctionSignature, FunctionBuilder> resolvedSignature =
functionResolverMap.get(functionName).resolve(functionSignature);

List<ExprType> sourceTypes = functionSignature.getParamTypeList();
List<ExprType> targetTypes = resolvedSignature.getKey().getParamTypeList();
FunctionBuilder funcBuilder = resolvedSignature.getValue();
if (isCastFunction(functionName)
|| FunctionSignature.isVarArgFunction(targetTypes)
|| sourceTypes.equals(targetTypes)) {
return funcBuilder;
FunctionName functionName = functionSignature.getFunctionName();
if (functionResolverMap.containsKey(functionName)) {
Pair<FunctionSignature, FunctionBuilder> resolvedSignature =
functionResolverMap.get(functionName).resolve(functionSignature);

List<ExprType> sourceTypes = functionSignature.getParamTypeList();
List<ExprType> targetTypes = resolvedSignature.getKey().getParamTypeList();
FunctionBuilder funcBuilder = resolvedSignature.getValue();
if (isCastFunction(functionName)
|| FunctionSignature.isVarArgFunction(targetTypes)
|| sourceTypes.equals(targetTypes)) {
return Optional.of(funcBuilder);
}
return Optional.of(castArguments(sourceTypes, targetTypes, funcBuilder));
} else {
return Optional.empty();
}
return castArguments(sourceTypes,
targetTypes, funcBuilder);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,23 @@
package org.opensearch.sql.analysis;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME;
import static org.opensearch.sql.data.type.ExprCoreType.LONG;
import static org.opensearch.sql.data.type.ExprCoreType.STRING;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.sql.DataSourceSchemaName;
import org.opensearch.sql.analysis.symbol.Namespace;
import org.opensearch.sql.analysis.symbol.Symbol;
import org.opensearch.sql.analysis.symbol.SymbolTable;
Expand Down Expand Up @@ -53,6 +60,39 @@ protected StorageEngine storageEngine() {
return (dataSourceSchemaName, tableName) -> table;
}

protected StorageEngine prometheusStorageEngine() {
return new StorageEngine() {
@Override
public Collection<FunctionResolver> getFunctions() {
return Collections.singletonList(
new FunctionResolver() {

@Override
public Pair<FunctionSignature, FunctionBuilder> resolve(
FunctionSignature unresolvedSignature) {
FunctionName functionName = FunctionName.of("query_range");
FunctionSignature functionSignature =
new FunctionSignature(functionName, List.of(STRING, LONG, LONG, LONG));
return Pair.of(
functionSignature,
(functionProperties, args) ->
new TestTableFunctionImplementation(functionName, args, table));
}

@Override
public FunctionName getFunctionName() {
return FunctionName.of("query_range");
}
});
}

@Override
public Table getTable(DataSourceSchemaName dataSourceSchemaName, String tableName) {
return table;
}
};
}

protected Table table() {
return Optional.ofNullable(table).orElseGet(() -> new Table() {
@Override
Expand Down Expand Up @@ -110,30 +150,11 @@ protected Environment<Expression, ExprType> typeEnv() {

protected DataSourceService dataSourceService = dataSourceService();

protected Analyzer analyzer = analyzer(expressionAnalyzer(), dataSourceService, table);
protected Analyzer analyzer = analyzer(expressionAnalyzer(), dataSourceService);

protected Analyzer analyzer(ExpressionAnalyzer expressionAnalyzer,
DataSourceService dataSourceService,
Table table) {
DataSourceService dataSourceService) {
BuiltinFunctionRepository functionRepository = BuiltinFunctionRepository.getInstance();
functionRepository.register("prometheus", new FunctionResolver() {

@Override
public Pair<FunctionSignature, FunctionBuilder> resolve(
FunctionSignature unresolvedSignature) {
FunctionName functionName = FunctionName.of("query_range");
FunctionSignature functionSignature =
new FunctionSignature(functionName, List.of(STRING, LONG, LONG, LONG));
return Pair.of(functionSignature,
(functionProperties, args) -> new TestTableFunctionImplementation(functionName, args,
table));
}

@Override
public FunctionName getFunctionName() {
return FunctionName.of("query_range");
}
});
return new Analyzer(expressionAnalyzer, dataSourceService, functionRepository);
}

Expand All @@ -159,20 +180,26 @@ protected LogicalPlan analyze(UnresolvedPlan unresolvedPlan) {

private class DefaultDataSourceService implements DataSourceService {

private StorageEngine storageEngine = storageEngine();
private final DataSource dataSource
= new DataSource("prometheus", DataSourceType.PROMETHEUS, storageEngine);
private final DataSource opensearchDataSource = new DataSource(DEFAULT_DATASOURCE_NAME,
DataSourceType.OPENSEARCH, storageEngine());
private final DataSource prometheusDataSource
= new DataSource("prometheus", DataSourceType.PROMETHEUS, prometheusStorageEngine());


@Override
public Set<DataSourceMetadata> getDataSourceMetadataSet() {
return ImmutableSet.of(new DataSourceMetadata(dataSource.getName(),
dataSource.getConnectorType(), ImmutableMap.of()));
return Stream.of(opensearchDataSource, prometheusDataSource)
.map(ds -> new DataSourceMetadata(ds.getName(),
ds.getConnectorType(), ImmutableMap.of())).collect(Collectors.toSet());
}

@Override
public DataSource getDataSource(String dataSourceName) {
return dataSource;
if ("prometheus".equals(dataSourceName)) {
return prometheusDataSource;
} else {
return opensearchDataSource;
}
}

@Override
Expand Down
Loading

0 comments on commit d0e2db9

Please sign in to comment.