Create Job API
Signed-off-by: Vamsi Manohar <[email protected]>
vmmusings committed Sep 14, 2023
1 parent a7af359 commit fb37a58
Showing 33 changed files with 913 additions and 22 deletions.
4 changes: 2 additions & 2 deletions common/build.gradle
@@ -39,8 +39,8 @@ dependencies {
api group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0'
api group: 'com.squareup.okhttp3', name: 'okhttp', version: '4.9.3'
implementation 'com.github.babbel:okhttp-aws-signer:1.0.2'
-api group: 'com.amazonaws', name: 'aws-java-sdk-core', version: '1.12.1'
-api group: 'com.amazonaws', name: 'aws-java-sdk-sts', version: '1.12.1'
+api group: 'com.amazonaws', name: 'aws-java-sdk-core', version: '1.12.545'
+api group: 'com.amazonaws', name: 'aws-java-sdk-sts', version: '1.12.545'
implementation "com.github.seancfoley:ipaddress:5.4.0"

testImplementation group: 'junit', name: 'junit', version: '4.13.2'
@@ -35,7 +35,7 @@ public enum Key {

METRICS_ROLLING_WINDOW("plugins.query.metrics.rolling_window"),
METRICS_ROLLING_INTERVAL("plugins.query.metrics.rolling_interval"),

SPARK_EXECUTION_ENGINE_CONFIG("plugins.query.executionengine.spark.config"),
CLUSTER_NAME("cluster.name");

@Getter private final String keyValue;
@@ -39,6 +39,15 @@ public interface DataSourceService {
*/
DataSourceMetadata getDataSourceMetadata(String name);

/**
* Returns the dataSourceMetadata object with the given name. The returned object contains all
* the metadata information.
*
* @param name name of the {@link DataSource}.
* @return {@link DataSourceMetadata}.
*/
DataSourceMetadata getRawDataSourceMetadata(String name);

/**
* Register {@link DataSource} defined by {@link DataSourceMetadata}.
*
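The diff adds a raw variant alongside the existing getter; how the two differ (for example, whether the non-raw variant masks authentication properties) is not shown here. A minimal usage sketch with a placeholder datasource name:

// Hypothetical usage; "my_glue" is a placeholder name and
// dataSourceService is any DataSourceService implementation.
DataSourceMetadata processed = dataSourceService.getDataSourceMetadata("my_glue");
DataSourceMetadata raw = dataSourceService.getRawDataSourceMetadata("my_glue");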
@@ -45,6 +45,7 @@ void execute(
/** Data class that encapsulates ExprValue. */
@Data
class QueryResponse {
private String status = "COMPLETED";
private final Schema schema;
private final List<ExprValue> results;
private final Cursor cursor;
@@ -75,6 +75,17 @@ public DataSourceMetadata getDataSourceMetadata(String datasourceName) {
return dataSourceMetadataOptional.get();
}

@Override
public DataSourceMetadata getRawDataSourceMetadata(String datasourceName) {
Optional<DataSourceMetadata> dataSourceMetadataOptional =
getDataSourceMetadataFromName(datasourceName);
if (dataSourceMetadataOptional.isEmpty()) {
throw new IllegalArgumentException(
"DataSource with name: " + datasourceName + " doesn't exist.");
}
return dataSourceMetadataOptional.get();
}

@Override
public DataSource getDataSource(String dataSourceName) {
Optional<DataSourceMetadata> dataSourceMetadataOptional =
1 change: 1 addition & 0 deletions integ-test/build.gradle
@@ -162,6 +162,7 @@ configurations.all {
resolutionStrategy.force "org.jetbrains.kotlin:kotlin-stdlib-jdk7:1.5.31"
resolutionStrategy.force "joda-time:joda-time:2.10.12"
resolutionStrategy.force "org.slf4j:slf4j-api:1.7.36"
resolutionStrategy.force "com.amazonaws:aws-java-sdk-core:1.12.545"
}

configurations {
@@ -129,6 +129,12 @@ public class OpenSearchSettings extends Settings {
Setting.Property.NodeScope,
Setting.Property.Dynamic);

public static final Setting<String> SPARK_EXECUTION_ENGINE_CONFIG =
Setting.simpleString(
Key.SPARK_EXECUTION_ENGINE_CONFIG.getKeyValue(),
Setting.Property.NodeScope,
Setting.Property.Dynamic);

/** Construct OpenSearchSetting. The OpenSearchSetting must be singleton. */
@SuppressWarnings("unchecked")
public OpenSearchSettings(ClusterSettings clusterSettings) {
@@ -193,6 +199,12 @@ public OpenSearchSettings(ClusterSettings clusterSettings) {
Key.DATASOURCES_URI_HOSTS_DENY_LIST,
DATASOURCE_URI_HOSTS_DENY_LIST,
new Updater(Key.DATASOURCES_URI_HOSTS_DENY_LIST));
register(
settingBuilder,
clusterSettings,
Key.SPARK_EXECUTION_ENGINE_CONFIG,
SPARK_EXECUTION_ENGINE_CONFIG,
new Updater(Key.SPARK_EXECUTION_ENGINE_CONFIG));
registerNonDynamicSettings(
settingBuilder, clusterSettings, Key.CLUSTER_NAME, ClusterName.CLUSTER_NAME_SETTING);
defaultSettings = settingBuilder.build();
@@ -257,6 +269,7 @@ public static List<Setting<?>> pluginSettings() {
.add(METRICS_ROLLING_WINDOW_SETTING)
.add(METRICS_ROLLING_INTERVAL_SETTING)
.add(DATASOURCE_URI_HOSTS_DENY_LIST)
.add(SPARK_EXECUTION_ENGINE_CONFIG)
.build();
}

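Because the setting is registered with Setting.Property.Dynamic, it can be updated at runtime without a restart. A sketch of setting it through the node client follows; the JSON fields other than region are assumptions, since only getRegion() is read from the parsed config in this diff:

// Hypothetical example; the applicationId/executionRoleARN field names
// in the JSON value are assumptions.
client
    .admin()
    .cluster()
    .prepareUpdateSettings()
    .setTransientSettings(
        org.opensearch.common.settings.Settings.builder()
            .put(
                "plugins.query.executionengine.spark.config",
                "{\"applicationId\":\"00fd775baqpu4g0p\",\"region\":\"us-west-2\","
                    + "\"executionRoleARN\":\"arn:aws:iam::123456789012:role/emr-job-role\"}"))
    .get();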
49 changes: 48 additions & 1 deletion plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
@@ -6,9 +6,15 @@
package org.opensearch.sql.plugin;

import static org.opensearch.sql.datasource.model.DataSourceMetadata.defaultOpenSearchDataSourceMetadata;
import static org.opensearch.sql.spark.data.constants.SparkConstants.STEP_ID_FIELD;

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.emrserverless.AWSEMRServerless;
import com.amazonaws.services.emrserverless.AWSEMRServerlessClientBuilder;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -83,6 +89,15 @@
import org.opensearch.sql.plugin.transport.TransportPPLQueryAction;
import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse;
import org.opensearch.sql.prometheus.storage.PrometheusStorageFactory;
import org.opensearch.sql.spark.client.EmrServerlessClient;
import org.opensearch.sql.spark.client.EmrServerlessClientImpl;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfig;
import org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher;
import org.opensearch.sql.spark.jobs.JobManagementService;
import org.opensearch.sql.spark.jobs.JobManagementServiceImpl;
import org.opensearch.sql.spark.jobs.JobMetadataStorageService;
import org.opensearch.sql.spark.jobs.OpensearchJobMetadataStorageService;
import org.opensearch.sql.spark.response.SparkResponse;
import org.opensearch.sql.spark.rest.RestJobManagementAction;
import org.opensearch.sql.spark.storage.SparkStorageFactory;
import org.opensearch.sql.spark.transport.TransportCreateJobRequestAction;
@@ -110,6 +125,7 @@ public class SQLPlugin extends Plugin implements ActionPlugin, ScriptPlugin {

private NodeClient client;
private DataSourceServiceImpl dataSourceService;
private JobManagementService jobManagementService;
private Injector injector;

public String name() {
@@ -202,6 +218,7 @@ public Collection<Object> createComponents(
dataSourceService.createDataSource(defaultOpenSearchDataSourceMetadata());
LocalClusterState.state().setClusterService(clusterService);
LocalClusterState.state().setPluginSettings((OpenSearchSettings) pluginSettings);
this.jobManagementService = createJobManagementService();

ModulesBuilder modules = new ModulesBuilder();
modules.add(new OpenSearchPluginModule());
@@ -213,7 +230,7 @@
});

injector = modules.createInjector();
-    return ImmutableList.of(dataSourceService);
+    return ImmutableList.of(dataSourceService, jobManagementService);
}

@Override
@@ -270,4 +287,34 @@ private DataSourceServiceImpl createDataSourceService() {
dataSourceMetadataStorage,
dataSourceUserAuthorizationHelper);
}

private JobManagementService createJobManagementService() {
JobMetadataStorageService jobMetadataStorageService =
new OpensearchJobMetadataStorageService(client, clusterService);
EmrServerlessClient emrServerlessClient = createEMRServerlessClient();
SparkQueryDispatcher sparkQueryDispatcher =
new SparkQueryDispatcher(emrServerlessClient, this.dataSourceService);
return new JobManagementServiceImpl(
jobMetadataStorageService, sparkQueryDispatcher, pluginSettings);
}

private EmrServerlessClient createEMRServerlessClient() {
String sparkExecutionEngineConfigString =
this.pluginSettings.getSettingValue(
org.opensearch.sql.common.setting.Settings.Key.SPARK_EXECUTION_ENGINE_CONFIG);
return AccessController.doPrivileged(
(PrivilegedAction<EmrServerlessClient>)
() -> {
SparkExecutionEngineConfig sparkExecutionEngineConfig =
SparkExecutionEngineConfig.toSparkExecutionEngineConfig(
sparkExecutionEngineConfigString);
AWSEMRServerless awsemrServerless =
AWSEMRServerlessClientBuilder.standard()
.withRegion(sparkExecutionEngineConfig.getRegion())
.withCredentials(new DefaultAWSCredentialsProviderChain())
.build();
return new EmrServerlessClientImpl(
awsemrServerless, new SparkResponse(client, null, STEP_ID_FIELD));
});
}
}
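SparkExecutionEngineConfig itself is not included in this diff. Based on the calls above (the toSparkExecutionEngineConfig(String) factory and getRegion()), a plausible reconstruction, assuming a Gson-style JSON binding, is:

// Hypothetical reconstruction: only the static factory and getRegion()
// are confirmed by this commit; the other fields and the use of Gson
// are assumptions.
package org.opensearch.sql.spark.config;

import com.google.gson.Gson;
import lombok.Data;

@Data
public class SparkExecutionEngineConfig {
  private String applicationId;
  private String region;
  private String executionRoleARN;

  public static SparkExecutionEngineConfig toSparkExecutionEngineConfig(String jsonString) {
    return new Gson().fromJson(jsonString, SparkExecutionEngineConfig.class);
  }
}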
9 changes: 9 additions & 0 deletions plugin/src/main/plugin-metadata/plugin-security.policy
@@ -15,4 +15,13 @@ grant {

// ml-commons client
permission java.lang.RuntimePermission "setContextClassLoader";

// AWS credentials: lets the SDK's DefaultAWSCredentialsProviderChain read
// the local AWS credentials/config files
permission java.io.FilePermission "${user.home}${/}.aws${/}*", "read";

// Permissions for the AWS EMR Serverless SDK (request-metrics MBeans)
permission javax.management.MBeanServerPermission "createMBeanServer";
permission javax.management.MBeanServerPermission "findMBeanServer";
permission javax.management.MBeanPermission "com.amazonaws.metrics.*", "*";
permission javax.management.MBeanTrustPermission "register";
};
@@ -12,6 +12,7 @@
import java.util.Map;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.data.model.ExprValueUtils;
import org.opensearch.sql.executor.ExecutionEngine;
@@ -25,6 +26,8 @@
@RequiredArgsConstructor
public class QueryResult implements Iterable<Object[]> {

@Setter @Getter private String status;

@Getter private final ExecutionEngine.Schema schema;

/** Results which are collection of expression. */
5 changes: 4 additions & 1 deletion spark/build.gradle
@@ -15,11 +15,14 @@ repositories {

dependencies {
api project(':core')
implementation project(':protocol')
implementation project(':datasources')

implementation group: 'org.opensearch', name: 'opensearch', version: "${opensearch_version}"
implementation group: 'org.json', name: 'json', version: '20230227'
-implementation group: 'com.amazonaws', name: 'aws-java-sdk-emr', version: '1.12.1'
+api group: 'com.amazonaws', name: 'aws-java-sdk-emr', version: '1.12.545'
+api group: 'com.amazonaws', name: 'aws-java-sdk-emrserverless', version: '1.12.545'
implementation group: 'commons-io', name: 'commons-io', version: '2.8.0'

testImplementation('org.junit.jupiter:junit-jupiter:5.6.2')
testImplementation group: 'org.mockito', name: 'mockito-core', version: '5.2.0'
@@ -0,0 +1,16 @@
package org.opensearch.sql.spark.client;

import org.json.JSONObject;
import org.opensearch.sql.spark.helper.FlintHelper;

public interface EmrServerlessClient {

String startJobRun(
String applicationId,
String query,
String datasourceRoleArn,
String executionRoleArn,
FlintHelper flintHelper);

JSONObject getJobResult(String applicationId, String jobId);
}
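A minimal usage sketch of the new client interface; the application ID, query, and role ARNs are placeholders, and FlintHelper is taken as a parameter because its construction is not shown in this commit:

import org.json.JSONObject;
import org.opensearch.sql.spark.helper.FlintHelper;

class EmrServerlessClientSketch {
  // Hypothetical usage; all IDs and ARNs below are placeholders.
  static JSONObject runQuery(EmrServerlessClient client, FlintHelper flint) {
    String applicationId = "00fd775baqpu4g0p";
    String jobRunId =
        client.startJobRun(
            applicationId,
            "SELECT 1",
            "arn:aws:iam::123456789012:role/datasource-role",
            "arn:aws:iam::123456789012:role/execution-role",
            flint);
    // While the job is still running this returns {"status": <state>};
    // once results land in the Spark result index, it returns them instead.
    return client.getJobResult(applicationId, jobRunId);
  }
}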
@@ -0,0 +1,137 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.client;

import static org.opensearch.sql.spark.data.constants.SparkConstants.GLUE_CATALOG_HIVE_JAR;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_INDEX_NAME;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_SQL_APPLICATION_JAR;

import com.amazonaws.services.emrserverless.AWSEMRServerless;
import com.amazonaws.services.emrserverless.model.CancelJobRunRequest;
import com.amazonaws.services.emrserverless.model.GetJobRunRequest;
import com.amazonaws.services.emrserverless.model.GetJobRunResult;
import com.amazonaws.services.emrserverless.model.JobDriver;
import com.amazonaws.services.emrserverless.model.SparkSubmit;
import com.amazonaws.services.emrserverless.model.StartJobRunRequest;
import com.amazonaws.services.emrserverless.model.StartJobRunResult;
import java.util.Set;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONObject;
import org.opensearch.sql.spark.helper.FlintHelper;
import org.opensearch.sql.spark.response.SparkResponse;

public class EmrServerlessClientImpl implements EmrServerlessClient {

private final AWSEMRServerless emrServerless;
private final String sparkApplicationJar;
private SparkResponse sparkResponse;
private static final Logger logger = LogManager.getLogger(EmrServerlessClientImpl.class);
private static final Set<String> terminalStates = Set.of("CANCELLED", "FAILED", "SUCCESS");
private static final String JOB_NAME = "flint-opensearch-query";

public EmrServerlessClientImpl(AWSEMRServerless emrServerless, SparkResponse sparkResponse) {
this.emrServerless = emrServerless;
this.sparkApplicationJar = SPARK_SQL_APPLICATION_JAR;
this.sparkResponse = sparkResponse;
}

@Override
public String startJobRun(
String applicationId,
String query,
String datasourceRoleArn,
String executionRoleArn,
FlintHelper flint) {
StartJobRunRequest request =
new StartJobRunRequest()
.withName(JOB_NAME)
.withApplicationId(applicationId)
.withExecutionRoleArn(executionRoleArn)
.withJobDriver(
new JobDriver()
.withSparkSubmit(
new SparkSubmit()
.withEntryPoint(sparkApplicationJar)
.withEntryPointArguments(query, SPARK_INDEX_NAME)
.withSparkSubmitParameters(
"--class org.opensearch.sql.FlintJob --conf"
+ " spark.hadoop.fs.s3.customAWSCredentialsProvider=com.amazonaws.emr.AssumeRoleAWSCredentialsProvider"
+ " --conf"
+ " spark.emr-serverless.driverEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN="
+ datasourceRoleArn
+ " --conf spark.executorEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN="
+ datasourceRoleArn
+ " --conf"
+ " spark.hadoop.aws.catalog.credentials.provider.factory.class="
+ "com.amazonaws.glue.catalog.metastore.STSAssumeRoleSessionCredentialsProviderFactory"
+ " --conf spark.hive.metastore.glue.role.arn="
+ datasourceRoleArn
// + " --conf
// spark.driver.cores=1"
// + " --conf
// spark.driver.memory=1g"
// + " --conf
// spark.executor.cores=2"
// + " --conf
// spark.executor.memory=4g"
+ " --conf spark.jars="
+ GLUE_CATALOG_HIVE_JAR
+ " --conf spark.jars.packages="
+ "org.opensearch:opensearch-spark-standalone_2.12:0.1.0-SNAPSHOT"
+ " --conf spark.jars.repositories="
+ "https://aws.oss.sonatype.org/content/repositories/snapshots"
+ " --conf"
+ " spark.emr-serverless.driverEnv.JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64/"
+ " --conf"
+ " spark.executorEnv.JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64/"
+ " --conf spark.datasource.flint.host="
+ flint.getFlintHost()
+ " --conf spark.datasource.flint.port="
+ flint.getFlintPort()
+ " --conf spark.datasource.flint.scheme="
+ flint.getFlintScheme()
+ " --conf spark.datasource.flint.auth="
+ flint.getFlintAuth()
+ " --conf spark.datasource.flint.region="
+ flint.getFlintRegion()
+ " --conf"
+ " spark.datasource.flint.customAWSCredentialsProvider=com.amazonaws.emr.AssumeRoleAWSCredentialsProvider"
+ " --conf"
+ " spark.sql.extensions=org.opensearch.flint.spark.FlintSparkExtensions"
+ " --conf spark.hadoop.hive.metastore.client.factory.class="
+ "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory")));
StartJobRunResult response = emrServerless.startJobRun(request);
logger.info("Job Run ID: " + response.getJobRunId());
sparkResponse.setValue(response.getJobRunId());
return response.getJobRunId();
}

@Override
public JSONObject getJobResult(String applicationId, String jobId) {
sparkResponse.setValue(jobId);
JSONObject result = sparkResponse.getResultFromOpensearchIndex();
if (result == null) {
result = new JSONObject();
result.put("status", getJobRunState(applicationId, jobId));
}
return result;
}

public String getJobRunState(String applicationId, String jobRunId) {
GetJobRunRequest request =
new GetJobRunRequest().withApplicationId(applicationId).withJobRunId(jobRunId);
GetJobRunResult response = emrServerless.getJobRun(request);
logger.info("Job Run state: " + response.getJobRun().getState());
return response.getJobRun().getState();
}

public void cancelJobRun(String applicationId, String jobRunId) {
// Cancel the job run
emrServerless.cancelJobRun(
new CancelJobRunRequest().withApplicationId(applicationId).withJobRunId(jobRunId));
}
}
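Callers that need the final output can poll getJobResult until the run reaches one of the terminal states above. A sketch, assuming the interim {"status": ...} object is the only case where polling should continue:

// Hypothetical polling helper; the poll interval and the assumption that
// a bare {"status": ...} object means "not finished yet" are not part of
// this commit.
static JSONObject waitForResult(
    EmrServerlessClient client, String applicationId, String jobRunId)
    throws InterruptedException {
  java.util.Set<String> terminal = java.util.Set.of("CANCELLED", "FAILED", "SUCCESS");
  while (true) {
    JSONObject result = client.getJobResult(applicationId, jobRunId);
    if (!result.has("status") || terminal.contains(result.getString("status"))) {
      return result;
    }
    Thread.sleep(1000L);
  }
}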