
Add spark connector #1780

Merged
merged 19 commits on Jul 5, 2023
Changes from 16 commits
@@ -8,8 +8,8 @@
 public enum DataSourceType {
   PROMETHEUS("prometheus"),
   OPENSEARCH("opensearch"),
-  JDBC("jdbc");
-
+  JDBC("jdbc"),
+  SPARK("spark");
   private String text;

   DataSourceType(String text) {
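
With SPARK registered, datasource metadata that declares the connector text "spark" maps to the new constant. A minimal lookup sketch (this fromText helper is illustrative only, not part of the PR, and it assumes a getText() accessor on the enum):

    // Illustrative helper, not in this PR: resolve a DataSourceType by its text value.
    public static DataSourceType fromText(String text) {
      for (DataSourceType type : DataSourceType.values()) {
        if (type.getText().equalsIgnoreCase(text)) {
          return type; // e.g. "spark" resolves to DataSourceType.SPARK
        }
      }
      throw new IllegalArgumentException("No DataSourceType with text: " + text);
    }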
1 change: 1 addition & 0 deletions plugin/build.gradle
@@ -126,6 +126,7 @@ dependencies {
     api project(':opensearch')
     api project(':prometheus')
     api project(':datasources')
+    api project(':spark')

     testImplementation group: 'net.bytebuddy', name: 'byte-buddy-agent', version: '1.12.13'
     testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: '2.1'
2 changes: 2 additions & 0 deletions plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
@@ -81,6 +81,7 @@
 import org.opensearch.sql.plugin.transport.TransportPPLQueryAction;
 import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse;
 import org.opensearch.sql.prometheus.storage.PrometheusStorageFactory;
+import org.opensearch.sql.spark.storage.SparkStorageFactory;
 import org.opensearch.sql.storage.DataSourceFactory;
 import org.opensearch.threadpool.ExecutorBuilder;
 import org.opensearch.threadpool.FixedExecutorBuilder;
@@ -221,6 +222,7 @@ private DataSourceServiceImpl createDataSourceService() {
         .add(new OpenSearchDataSourceFactory(
             new OpenSearchNodeClient(this.client), pluginSettings))
         .add(new PrometheusStorageFactory(pluginSettings))
+        .add(new SparkStorageFactory(this.client, pluginSettings))
         .build(),
     dataSourceMetadataStorage,
     dataSourceUserAuthorizationHelper);
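
The factory registered above has to satisfy the DataSourceFactory contract. A hedged sketch of that contract for the spark case, modeled on the existing Prometheus factory (the class name and engine wiring here are illustrative; the PR's actual SparkStorageFactory lives in the spark module and is not shown in this excerpt):

    // Illustrative sketch, not the PR's code; engine wiring is an assumption.
    public class SparkStorageFactorySketch implements DataSourceFactory {
      private final StorageEngine engine;

      public SparkStorageFactorySketch(StorageEngine engine) {
        this.engine = engine;
      }

      @Override
      public DataSourceType getDataSourceType() {
        return DataSourceType.SPARK;
      }

      @Override
      public DataSource createDataSource(DataSourceMetadata metadata) {
        return new DataSource(metadata.getName(), DataSourceType.SPARK, engine);
      }
    }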
4 changes: 3 additions & 1 deletion settings.gradle
@@ -19,4 +19,6 @@ include 'legacy'
 include 'sql'
 include 'prometheus'
 include 'benchmarks'
-include 'datasources'
+include 'datasources'
+include 'spark'
+
42 changes: 42 additions & 0 deletions spark/.gitignore
@@ -0,0 +1,42 @@
.gradle
build/
!gradle/wrapper/gradle-wrapper.jar
!**/src/main/**/build/
!**/src/test/**/build/

### IntelliJ IDEA ###
.idea/modules.xml
.idea/jarRepositories.xml
.idea/compiler.xml
.idea/libraries/
*.iws
*.iml
*.ipr
out/
!**/src/main/**/out/
!**/src/test/**/out/

### Eclipse ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
bin/
!**/src/main/**/bin/
!**/src/test/**/bin/

### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/

### VS Code ###
.vscode/

### Mac OS ###
.DS_Store
73 changes: 73 additions & 0 deletions spark/build.gradle
@@ -0,0 +1,73 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

plugins {
id 'java-library'
id "io.freefair.lombok"
id 'jacoco'
}

repositories {
mavenCentral()
}

dependencies {
api project(':core')
implementation project(':datasources')

implementation group: 'org.opensearch', name: 'opensearch', version: "${opensearch_version}"
implementation group: 'org.json', name: 'json', version: '20230227'

testImplementation('org.junit.jupiter:junit-jupiter:5.6.2')
testImplementation group: 'org.mockito', name: 'mockito-core', version: '3.12.4'
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '3.12.4'
}

test {
useJUnitPlatform()
testLogging {
events "passed", "skipped", "failed"
exceptionFormat "full"
}
}

jacocoTestReport {
reports {
html.enabled true
xml.enabled true
}
afterEvaluate {
classDirectories.setFrom(files(classDirectories.files.collect {
fileTree(dir: it)
}))
}
}
test.finalizedBy(project.tasks.jacocoTestReport)

jacocoTestCoverageVerification {
violationRules {
rule {
element = 'CLASS'
excludes = [
'org.opensearch.sql.spark.data.constants.*'
]
limit {
counter = 'LINE'
minimum = 1.0
}
limit {
counter = 'BRANCH'
minimum = 1.0
}
}
}
afterEvaluate {
classDirectories.setFrom(files(classDirectories.files.collect {
fileTree(dir: it)
}))
}
}
check.dependsOn jacocoTestCoverageVerification
jacocoTestCoverageVerification.dependsOn jacocoTestReport
3 changes: 3 additions & 0 deletions spark/lombok.config
@@ -0,0 +1,3 @@
# This file is generated by the 'io.freefair.lombok' Gradle plugin
config.stopBubbling = true
lombok.addLombokGeneratedAnnotation = true
22 changes: 22 additions & 0 deletions spark/src/main/java/org/opensearch/sql/spark/client/SparkClient.java
@@ -0,0 +1,22 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.client;

import java.io.IOException;
import org.json.JSONObject;

/**
* Interface class for Spark Client.
*/
public interface SparkClient {
/**
   * Executes the given Spark SQL query.
   *
   * @param query Spark SQL query
   * @return Spark query response
*/
JSONObject sql(String query) throws IOException;
}
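
A minimal stub of this interface, handy for unit tests elsewhere in the module (a sketch only; the response fields below are assumptions, not a documented contract):

    import java.io.IOException;
    import org.json.JSONObject;
    import org.opensearch.sql.spark.client.SparkClient;

    // Minimal test stub; the response shape is an assumption for illustration.
    public class StubSparkClient implements SparkClient {
      @Override
      public JSONObject sql(String query) throws IOException {
        JSONObject response = new JSONObject();
        response.put("query", query);
        response.put("data", new JSONObject()); // empty result payload
        return response;
      }
    }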
102 changes: 102 additions & 0 deletions spark/src/main/java/org/opensearch/sql/spark/functions/implementation/SqlFunctionImplementation.java
@@ -0,0 +1,102 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.functions.implementation;

import static org.opensearch.sql.spark.functions.resolver.SqlTableFunctionResolver.QUERY;

import java.util.List;
import java.util.stream.Collectors;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.exception.ExpressionEvaluationException;
import org.opensearch.sql.expression.Expression;
import org.opensearch.sql.expression.FunctionExpression;
import org.opensearch.sql.expression.NamedArgumentExpression;
import org.opensearch.sql.expression.env.Environment;
import org.opensearch.sql.expression.function.FunctionName;
import org.opensearch.sql.expression.function.TableFunctionImplementation;
import org.opensearch.sql.spark.client.SparkClient;
import org.opensearch.sql.spark.request.SparkQueryRequest;
import org.opensearch.sql.spark.storage.SparkMetricTable;
import org.opensearch.sql.storage.Table;

/**
* Spark SQL function implementation.
*/
public class SqlFunctionImplementation extends FunctionExpression
implements TableFunctionImplementation {

private final FunctionName functionName;
private final List<Expression> arguments;
private final SparkClient sparkClient;

/**
* Constructor for spark sql function.
*
* @param functionName name of the function
* @param arguments a list of expressions
* @param sparkClient spark client
*/
public SqlFunctionImplementation(
FunctionName functionName, List<Expression> arguments, SparkClient sparkClient) {
super(functionName, arguments);
this.functionName = functionName;
this.arguments = arguments;
this.sparkClient = sparkClient;
}

@Override
public ExprValue valueOf(Environment<Expression, ExprValue> valueEnv) {
throw new UnsupportedOperationException(String.format(
"Spark defined function [%s] is only "
+ "supported in SOURCE clause with spark connector catalog", functionName));
}

@Override
public ExprType type() {
return ExprCoreType.STRUCT;
}

@Override
public String toString() {
List<String> args = arguments.stream()
.map(arg -> String.format("%s=%s",
((NamedArgumentExpression) arg).getArgName(),
((NamedArgumentExpression) arg).getValue().toString()))
.collect(Collectors.toList());
return String.format("%s(%s)", functionName, String.join(", ", args));
}

@Override
public Table applyArguments() {
return new SparkMetricTable(sparkClient, buildQueryFromSqlFunction(arguments));
}

/**
* This method builds a spark query request.
*
* @param arguments spark sql function arguments
* @return spark query request
*/
private SparkQueryRequest buildQueryFromSqlFunction(List<Expression> arguments) {

SparkQueryRequest sparkQueryRequest = new SparkQueryRequest();
arguments.forEach(arg -> {
String argName = ((NamedArgumentExpression) arg).getArgName();
Collaborator review comment on this line: Is arg always a NamedArgumentExpression? How do you guarantee that? You could do the cast in the constructor instead.

Expression argValue = ((NamedArgumentExpression) arg).getValue();
ExprValue literalValue = argValue.valueOf();
if (argName.equals(QUERY)) {
sparkQueryRequest.setSql((String) literalValue.value());
} else {
throw new ExpressionEvaluationException(
String.format("Invalid Function Argument:%s", argName));
}
});
return sparkQueryRequest;
}

}
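
A hedged test sketch exercising the implementation end to end, assuming JUnit 5 and the DSL helpers (DSL.namedArgument, DSL.literal) used by this repo's other connector tests; StubSparkClient is the hypothetical stub shown above:

    @Test
    void sqlFunctionBuildsSparkQueryRequest() {
      SparkClient client = new StubSparkClient(); // hypothetical stub from above
      List<Expression> args =
          List.of(DSL.namedArgument("query", DSL.literal("SELECT 1")));
      SqlFunctionImplementation impl =
          new SqlFunctionImplementation(FunctionName.of("sql"), args, client);
      // applyArguments() parses the named arguments into a SparkQueryRequest.
      Table table = impl.applyArguments();
      assertTrue(table instanceof SparkMetricTable);
    }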
77 changes: 77 additions & 0 deletions spark/src/main/java/org/opensearch/sql/spark/functions/resolver/SqlTableFunctionResolver.java
@@ -0,0 +1,77 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.functions.resolver;

import static org.opensearch.sql.data.type.ExprCoreType.STRING;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.sql.exception.SemanticCheckException;
import org.opensearch.sql.expression.Expression;
import org.opensearch.sql.expression.NamedArgumentExpression;
import org.opensearch.sql.expression.function.FunctionBuilder;
import org.opensearch.sql.expression.function.FunctionName;
import org.opensearch.sql.expression.function.FunctionResolver;
import org.opensearch.sql.expression.function.FunctionSignature;
import org.opensearch.sql.spark.client.SparkClient;
import org.opensearch.sql.spark.functions.implementation.SqlFunctionImplementation;

/**
* Function resolver for sql function of spark connector.
*/
@RequiredArgsConstructor
public class SqlTableFunctionResolver implements FunctionResolver {
private final SparkClient sparkClient;

public static final String SQL = "sql";
public static final String QUERY = "query";

@Override
public Pair<FunctionSignature, FunctionBuilder> resolve(FunctionSignature unresolvedSignature) {
FunctionName functionName = FunctionName.of(SQL);
FunctionSignature functionSignature =
new FunctionSignature(functionName, List.of(STRING));
final List<String> argumentNames = List.of(QUERY);

FunctionBuilder functionBuilder = (functionProperties, arguments) -> {
Boolean argumentsPassedByName = arguments.stream()
.noneMatch(arg -> StringUtils.isEmpty(((NamedArgumentExpression) arg).getArgName()));
Boolean argumentsPassedByPosition = arguments.stream()
.allMatch(arg -> StringUtils.isEmpty(((NamedArgumentExpression) arg).getArgName()));
if (!(argumentsPassedByName || argumentsPassedByPosition)) {
throw new SemanticCheckException("Arguments should be either passed by name or position");
}

if (arguments.size() != argumentNames.size()) {
throw new SemanticCheckException(
String.format("Missing arguments:[%s]",
String.join(",", argumentNames.subList(arguments.size(), argumentNames.size()))));
}

if (argumentsPassedByPosition) {
List<Expression> namedArguments = new ArrayList<>();
for (int i = 0; i < arguments.size(); i++) {
namedArguments.add(new NamedArgumentExpression(argumentNames.get(i),
((NamedArgumentExpression) arguments.get(i)).getValue()));
}
return new SqlFunctionImplementation(functionName, namedArguments, sparkClient);
}
return new SqlFunctionImplementation(functionName, arguments, sparkClient);
};
return Pair.of(functionSignature, functionBuilder);
}

@Override
public FunctionName getFunctionName() {
return FunctionName.of(SQL);
}
}
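
A test sketch for the positional-argument path, modeled on the existing Prometheus resolver tests (STRING is the static import from ExprCoreType used above; the FunctionProperties mock is a stand-in):

    @Test
    void resolvesPositionalArgumentIntoNamedArgument() {
      SparkClient client = mock(SparkClient.class);
      SqlTableFunctionResolver resolver = new SqlTableFunctionResolver(client);
      // Positional style: the argument name is left empty.
      List<Expression> args =
          List.of(DSL.namedArgument("", DSL.literal("SELECT 1")));
      Pair<FunctionSignature, FunctionBuilder> resolved = resolver.resolve(
          new FunctionSignature(FunctionName.of("sql"), List.of(STRING)));
      FunctionImplementation impl =
          resolved.getValue().apply(mock(FunctionProperties.class), args);
      assertTrue(impl instanceof SqlFunctionImplementation);
    }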