Skip to content

Commit

Permalink
Change catalog to datasource
Browse files Browse the repository at this point in the history
Signed-off-by: Peng Huo <[email protected]>
  • Loading branch information
penghuo committed Nov 17, 2022
1 parent 0504c71 commit 738020f
Show file tree
Hide file tree
Showing 38 changed files with 805 additions and 611 deletions.
58 changes: 25 additions & 33 deletions core/src/main/java/org/opensearch/sql/analysis/Analyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,10 @@
import static org.opensearch.sql.ast.tree.Sort.SortOrder.ASC;
import static org.opensearch.sql.ast.tree.Sort.SortOrder.DESC;
import static org.opensearch.sql.data.type.ExprCoreType.STRUCT;
import static org.opensearch.sql.utils.MLCommonsConstants.ACTION;
import static org.opensearch.sql.utils.MLCommonsConstants.MODELID;
import static org.opensearch.sql.utils.MLCommonsConstants.PREDICT;
import static org.opensearch.sql.utils.MLCommonsConstants.RCF_ANOMALOUS;
import static org.opensearch.sql.utils.MLCommonsConstants.RCF_ANOMALY_GRADE;
import static org.opensearch.sql.utils.MLCommonsConstants.RCF_SCORE;
import static org.opensearch.sql.utils.MLCommonsConstants.RCF_TIMESTAMP;
import static org.opensearch.sql.utils.MLCommonsConstants.STATUS;
import static org.opensearch.sql.utils.MLCommonsConstants.TASKID;
import static org.opensearch.sql.utils.MLCommonsConstants.TIME_FIELD;
import static org.opensearch.sql.utils.MLCommonsConstants.TRAIN;
import static org.opensearch.sql.utils.MLCommonsConstants.TRAINANDPREDICT;
import static org.opensearch.sql.utils.SystemIndexUtils.CATALOGS_TABLE_NAME;

import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -69,8 +61,8 @@
import org.opensearch.sql.ast.tree.TableFunction;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.ast.tree.Values;
import org.opensearch.sql.catalog.CatalogService;
import org.opensearch.sql.catalog.model.Catalog;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.data.model.ExprMissingValue;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.exception.SemanticCheckException;
Expand Down Expand Up @@ -101,7 +93,7 @@
import org.opensearch.sql.planner.logical.LogicalRename;
import org.opensearch.sql.planner.logical.LogicalSort;
import org.opensearch.sql.planner.logical.LogicalValues;
import org.opensearch.sql.planner.physical.catalog.CatalogTable;
import org.opensearch.sql.planner.physical.datasource.DataSourceTable;
import org.opensearch.sql.storage.Table;
import org.opensearch.sql.utils.ParseUtils;

Expand All @@ -117,7 +109,7 @@ public class Analyzer extends AbstractNodeVisitor<LogicalPlan, AnalysisContext>

private final NamedExpressionAnalyzer namedExpressionAnalyzer;

private final CatalogService catalogService;
private final DataSourceService dataSourceService;

private final BuiltinFunctionRepository repository;

Expand All @@ -126,10 +118,10 @@ public class Analyzer extends AbstractNodeVisitor<LogicalPlan, AnalysisContext>
*/
public Analyzer(
ExpressionAnalyzer expressionAnalyzer,
CatalogService catalogService,
DataSourceService dataSourceService,
BuiltinFunctionRepository repository) {
this.expressionAnalyzer = expressionAnalyzer;
this.catalogService = catalogService;
this.dataSourceService = dataSourceService;
this.selectExpressionAnalyzer = new SelectExpressionAnalyzer(expressionAnalyzer);
this.namedExpressionAnalyzer = new NamedExpressionAnalyzer(expressionAnalyzer);
this.repository = repository;
Expand All @@ -142,25 +134,25 @@ public LogicalPlan analyze(UnresolvedPlan unresolved, AnalysisContext context) {
@Override
public LogicalPlan visitRelation(Relation node, AnalysisContext context) {
QualifiedName qualifiedName = node.getTableQualifiedName();
Set<String> allowedCatalogNames = catalogService.getCatalogs()
Set<String> allowedCatalogNames = dataSourceService.getDataSources()
.stream()
.map(Catalog::getName)
.map(DataSource::getName)
.collect(Collectors.toSet());
CatalogSchemaIdentifierNameResolver catalogSchemaIdentifierNameResolver
= new CatalogSchemaIdentifierNameResolver(qualifiedName.getParts(), allowedCatalogNames);
String tableName = catalogSchemaIdentifierNameResolver.getIdentifierName();
DataSourceSchemaIdentifierNameResolver dataSourceSchemaIdentifierNameResolver
= new DataSourceSchemaIdentifierNameResolver(qualifiedName.getParts(), allowedCatalogNames);
String tableName = dataSourceSchemaIdentifierNameResolver.getIdentifierName();
context.push();
TypeEnvironment curEnv = context.peek();
Table table;
if (CATALOGS_TABLE_NAME.equals(tableName)) {
table = new CatalogTable(catalogService);
table = new DataSourceTable(dataSourceService);
} else {
table = catalogService
.getCatalog(catalogSchemaIdentifierNameResolver.getCatalogName())
table = dataSourceService
.getDataSource(dataSourceSchemaIdentifierNameResolver.getDataSourceName())
.getStorageEngine()
.getTable(new CatalogSchemaName(catalogSchemaIdentifierNameResolver.getCatalogName(),
catalogSchemaIdentifierNameResolver.getSchemaName()),
catalogSchemaIdentifierNameResolver.getIdentifierName());
.getTable(new CatalogSchemaName(dataSourceSchemaIdentifierNameResolver.getDataSourceName(),
dataSourceSchemaIdentifierNameResolver.getSchemaName()),
dataSourceSchemaIdentifierNameResolver.getIdentifierName());
}
table.getFieldTypes().forEach((k, v) -> curEnv.define(new Symbol(Namespace.FIELD_NAME, k), v));

Expand Down Expand Up @@ -188,28 +180,28 @@ public LogicalPlan visitRelationSubquery(RelationSubquery node, AnalysisContext
@Override
public LogicalPlan visitTableFunction(TableFunction node, AnalysisContext context) {
QualifiedName qualifiedName = node.getFunctionName();
Set<String> allowedCatalogNames = catalogService.getCatalogs()
Set<String> allowedCatalogNames = dataSourceService.getDataSources()
.stream()
.map(Catalog::getName)
.map(DataSource::getName)
.collect(Collectors.toSet());
CatalogSchemaIdentifierNameResolver catalogSchemaIdentifierNameResolver
= new CatalogSchemaIdentifierNameResolver(qualifiedName.getParts(), allowedCatalogNames);
DataSourceSchemaIdentifierNameResolver dataSourceSchemaIdentifierNameResolver
= new DataSourceSchemaIdentifierNameResolver(qualifiedName.getParts(), allowedCatalogNames);

FunctionName functionName
= FunctionName.of(catalogSchemaIdentifierNameResolver.getIdentifierName());
= FunctionName.of(dataSourceSchemaIdentifierNameResolver.getIdentifierName());
List<Expression> arguments = node.getArguments().stream()
.map(unresolvedExpression -> this.expressionAnalyzer.analyze(unresolvedExpression, context))
.collect(Collectors.toList());
TableFunctionImplementation tableFunctionImplementation
= (TableFunctionImplementation) repository.compile(
catalogSchemaIdentifierNameResolver.getCatalogName(), functionName, arguments);
dataSourceSchemaIdentifierNameResolver.getDataSourceName(), functionName, arguments);
context.push();
TypeEnvironment curEnv = context.peek();
Table table = tableFunctionImplementation.applyArguments();
table.getFieldTypes().forEach((k, v) -> curEnv.define(new Symbol(Namespace.FIELD_NAME, k), v));
curEnv.define(new Symbol(Namespace.INDEX_NAME,
catalogSchemaIdentifierNameResolver.getIdentifierName()), STRUCT);
return new LogicalRelation(catalogSchemaIdentifierNameResolver.getIdentifierName(),
dataSourceSchemaIdentifierNameResolver.getIdentifierName()), STRUCT);
return new LogicalRelation(dataSourceSchemaIdentifierNameResolver.getIdentifierName(),
tableFunctionImplementation.applyArguments());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@
import java.util.List;
import java.util.Set;

public class CatalogSchemaIdentifierNameResolver {
public class DataSourceSchemaIdentifierNameResolver {

public static final String DEFAULT_CATALOG_NAME = "@opensearch";
public static final String DEFAULT_DATASOURCE_NAME = "@opensearch";
public static final String DEFAULT_SCHEMA_NAME = "default";
public static final String INFORMATION_SCHEMA_NAME = "information_schema";

private String catalogName = DEFAULT_CATALOG_NAME;
private String dataSourceName = DEFAULT_DATASOURCE_NAME;
private String schemaName = DEFAULT_SCHEMA_NAME;
private String identifierName;

Expand All @@ -31,17 +31,17 @@ public class CatalogSchemaIdentifierNameResolver {
* @param parts parts of qualifiedName.
* @param allowedCatalogs allowedCatalogs.
*/
public CatalogSchemaIdentifierNameResolver(List<String> parts, Set<String> allowedCatalogs) {
List<String> remainingParts = captureSchemaName(captureCatalogName(parts, allowedCatalogs));
public DataSourceSchemaIdentifierNameResolver(List<String> parts, Set<String> allowedCatalogs) {
List<String> remainingParts = captureSchemaName(captureDataSourceName(parts, allowedCatalogs));
identifierName = String.join(DOT, remainingParts);
}

public String getIdentifierName() {
return identifierName;
}

public String getCatalogName() {
return catalogName;
public String getDataSourceName() {
return dataSourceName;
}

public String getSchemaName() {
Expand All @@ -51,10 +51,10 @@ public String getSchemaName() {

// Capture catalog name and return remaining parts(schema name and table name)
// from the fully qualified name.
private List<String> captureCatalogName(List<String> parts, Set<String> allowedCatalogs) {
private List<String> captureDataSourceName(List<String> parts, Set<String> allowedCatalogs) {
if (parts.size() > 1 && allowedCatalogs.contains(parts.get(0))
|| DEFAULT_CATALOG_NAME.equals(parts.get(0))) {
catalogName = parts.get(0);
|| DEFAULT_DATASOURCE_NAME.equals(parts.get(0))) {
dataSourceName = parts.get(0);
return parts.subList(1, parts.size());
} else {
return parts;
Expand Down
40 changes: 0 additions & 40 deletions core/src/main/java/org/opensearch/sql/catalog/CatalogService.java

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.datasource;

import java.util.Set;
import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.datasource.model.DataSourceMetadata;

/**
* DataSource Service manage {@link DataSource}.
*/
public interface DataSourceService {

/**
* Returns all DataSource objects.
*
* @return set of {@link DataSource}.
*/
Set<DataSource> getDataSources();

/**
* Returns {@link DataSource} with corresponding to the DataSource name.
*
* @param dataSourceName Name of the {@link DataSource}.
* @return {@link DataSource}.
*/
DataSource getDataSource(String dataSourceName);

/**
* Register {@link DataSource} defined by {@link DataSourceMetadata}.
*
* @param dataSourceMetadata {@link DataSourceMetadata}.
*/
void addDataSource(DataSourceMetadata dataSourceMetadata);

/**
* remove all the registered {@link DataSource}.
*/
void clear();
}
Loading

0 comments on commit 738020f

Please sign in to comment.