From de26f1e0139eb7beec96fb4293f56cfef8eaeaca Mon Sep 17 00:00:00 2001
From: Vamsi Manohar
Date: Fri, 8 Sep 2023 17:03:37 -0700
Subject: [PATCH] Create Job API

Signed-off-by: Vamsi Manohar
---
 common/build.gradle                           |   4 +-
 .../sql/common/setting/Settings.java          |   2 +-
 integ-test/build.gradle                       |   1 +
 .../setting/OpenSearchSettings.java           |   6 +
 .../org/opensearch/sql/plugin/SQLPlugin.java  |  42 ++++-
 .../plugin-metadata/plugin-security.policy    |   3 +
 spark/build.gradle                            |   4 +-
 .../sql/spark/client/EmrServerlessClient.java |   9 +
 .../spark/client/EmrServerlessClientImpl.java | 104 +++++++++++
 .../config/SparkExecutionEngineConfig.java    |  21 +++
 .../spark/data/constants/SparkConstants.java  |   1 +
 .../dispatcher/SparkQueryDispatcher.java      |  52 ++++++
 .../sql/spark/jobs/JobManagementService.java  |  15 ++
 .../spark/jobs/JobManagementServiceImpl.java  |  42 +++++
 .../spark/jobs/JobMetadataStorageService.java |  11 ++
 .../OpensearchJobMetadataStorageService.java  | 162 ++++++++++++++++++
 .../sql/spark/jobs/model/JobMetadata.java     |  87 ++++++++++
 .../resources/job-metadata-index-mapping.yml  |  20 +++
 .../resources/job-metadata-index-settings.yml |  11 ++
 .../client/EmrServerlessClientImplTest.java   |  62 +++++++
 .../sql/spark/constants/TestConstants.java    |   3 +
 21 files changed, 657 insertions(+), 5 deletions(-)
 create mode 100644 spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClient.java
 create mode 100644 spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImpl.java
 create mode 100644 spark/src/main/java/org/opensearch/sql/spark/config/SparkExecutionEngineConfig.java
 create mode 100644 spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java
 create mode 100644 spark/src/main/java/org/opensearch/sql/spark/jobs/JobManagementService.java
 create mode 100644 spark/src/main/java/org/opensearch/sql/spark/jobs/JobManagementServiceImpl.java
 create mode 100644 spark/src/main/java/org/opensearch/sql/spark/jobs/JobMetadataStorageService.java
 create mode 100644 spark/src/main/java/org/opensearch/sql/spark/jobs/OpensearchJobMetadataStorageService.java
 create mode 100644 spark/src/main/java/org/opensearch/sql/spark/jobs/model/JobMetadata.java
 create mode 100644 spark/src/main/resources/job-metadata-index-mapping.yml
 create mode 100644 spark/src/main/resources/job-metadata-index-settings.yml
 create mode 100644 spark/src/test/java/org/opensearch/sql/spark/client/EmrServerlessClientImplTest.java

diff --git a/common/build.gradle b/common/build.gradle
index 5cf219fbae..109cad59cb 100644
--- a/common/build.gradle
+++ b/common/build.gradle
@@ -39,8 +39,8 @@ dependencies {
     api group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0'
     api group: 'com.squareup.okhttp3', name: 'okhttp', version: '4.9.3'
     implementation 'com.github.babbel:okhttp-aws-signer:1.0.2'
-    api group: 'com.amazonaws', name: 'aws-java-sdk-core', version: '1.12.1'
-    api group: 'com.amazonaws', name: 'aws-java-sdk-sts', version: '1.12.1'
+    api group: 'com.amazonaws', name: 'aws-java-sdk-core', version: '1.12.545'
+    api group: 'com.amazonaws', name: 'aws-java-sdk-sts', version: '1.12.545'
     implementation "com.github.seancfoley:ipaddress:5.4.0"

     testImplementation group: 'junit', name: 'junit', version: '4.13.2'
diff --git a/common/src/main/java/org/opensearch/sql/common/setting/Settings.java b/common/src/main/java/org/opensearch/sql/common/setting/Settings.java
index be780e8d80..8daf0e9bf6 100644
--- a/common/src/main/java/org/opensearch/sql/common/setting/Settings.java
+++ b/common/src/main/java/org/opensearch/sql/common/setting/Settings.java
@@ -35,7 +35,7 @@ public enum Key {
     METRICS_ROLLING_WINDOW("plugins.query.metrics.rolling_window"),
     METRICS_ROLLING_INTERVAL("plugins.query.metrics.rolling_interval"),
-
+    SPARK_EXECUTION_ENGINE_CONFIG("plugins.query.executionengine.spark.config"),
     CLUSTER_NAME("cluster.name");

     @Getter private final String keyValue;
diff --git a/integ-test/build.gradle b/integ-test/build.gradle
index 0404900450..dc92f9ebb3 100644
--- a/integ-test/build.gradle
+++ b/integ-test/build.gradle
@@ -162,6 +162,7 @@ configurations.all {
     resolutionStrategy.force "org.jetbrains.kotlin:kotlin-stdlib-jdk7:1.5.31"
     resolutionStrategy.force "joda-time:joda-time:2.10.12"
     resolutionStrategy.force "org.slf4j:slf4j-api:1.7.36"
+    resolutionStrategy.force "com.amazonaws:aws-java-sdk-core:1.12.545"
 }

 configurations {
diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java
index 48ceacaf10..7efc955aee 100644
--- a/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java
+++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java
@@ -129,6 +129,12 @@ public class OpenSearchSettings extends Settings {
           Setting.Property.NodeScope,
           Setting.Property.Dynamic);

+  public static final Setting<String> SPARK_EXECUTION_ENGINE_CONFIG =
+      Setting.simpleString(
+          Key.SPARK_EXECUTION_ENGINE_CONFIG.getKeyValue(),
+          Setting.Property.NodeScope,
+          Setting.Property.Dynamic);
+
   /** Construct OpenSearchSetting. The OpenSearchSetting must be singleton. */
   @SuppressWarnings("unchecked")
   public OpenSearchSettings(ClusterSettings clusterSettings) {
diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
index 80e1a6b1a3..97c617865c 100644
--- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
+++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
@@ -6,7 +6,11 @@
 package org.opensearch.sql.plugin;

 import static org.opensearch.sql.datasource.model.DataSourceMetadata.defaultOpenSearchDataSourceMetadata;
+import static org.opensearch.sql.spark.data.constants.SparkConstants.STEP_ID_FIELD;

+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
+import com.amazonaws.services.emrserverless.AWSEMRServerless;
+import com.amazonaws.services.emrserverless.AWSEMRServerlessClientBuilder;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import java.util.Arrays;
@@ -83,6 +87,15 @@
 import org.opensearch.sql.plugin.transport.TransportPPLQueryAction;
 import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse;
 import org.opensearch.sql.prometheus.storage.PrometheusStorageFactory;
+import org.opensearch.sql.spark.client.EmrServerlessClient;
+import org.opensearch.sql.spark.client.EmrServerlessClientImpl;
+import org.opensearch.sql.spark.config.SparkExecutionEngineConfig;
+import org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher;
+import org.opensearch.sql.spark.jobs.JobManagementService;
+import org.opensearch.sql.spark.jobs.JobManagementServiceImpl;
+import org.opensearch.sql.spark.jobs.JobMetadataStorageService;
+import org.opensearch.sql.spark.jobs.OpensearchJobMetadataStorageService;
+import org.opensearch.sql.spark.response.SparkResponse;
 import org.opensearch.sql.spark.rest.RestJobManagementAction;
 import org.opensearch.sql.spark.storage.SparkStorageFactory;
 import org.opensearch.sql.spark.transport.TransportCreateJobRequestAction;
@@ -110,6 +123,7 @@ public class SQLPlugin extends Plugin implements ActionPlugin, ScriptPlugin {
   private NodeClient client;

   private DataSourceServiceImpl dataSourceService;
+  private JobManagementService jobManagementService;
   private Injector injector;

   public String name() {
@@ -202,6 +216,7 @@ public Collection<Object> createComponents(
     dataSourceService.createDataSource(defaultOpenSearchDataSourceMetadata());
     LocalClusterState.state().setClusterService(clusterService);
     LocalClusterState.state().setPluginSettings((OpenSearchSettings) pluginSettings);
+    this.jobManagementService = createJobManagementService();
     ModulesBuilder modules = new ModulesBuilder();
     modules.add(new OpenSearchPluginModule());
@@ -213,7 +228,7 @@ public Collection<Object> createComponents(
         });

     injector = modules.createInjector();
-    return ImmutableList.of(dataSourceService);
+    return ImmutableList.of(dataSourceService, jobManagementService);
   }

   @Override
@@ -270,4 +285,29 @@ private DataSourceServiceImpl createDataSourceService() {
         dataSourceMetadataStorage,
         dataSourceUserAuthorizationHelper);
   }
+
+  private JobManagementService createJobManagementService() {
+    JobMetadataStorageService jobMetadataStorageService =
+        new OpensearchJobMetadataStorageService(client, clusterService);
+    EmrServerlessClient emrServerlessClient = createEMRServerlessClient();
+    SparkQueryDispatcher sparkQueryDispatcher =
+        new SparkQueryDispatcher(emrServerlessClient, this.dataSourceService);
+    return new JobManagementServiceImpl(
+        jobMetadataStorageService, sparkQueryDispatcher, pluginSettings);
+  }
+
+  private EmrServerlessClient createEMRServerlessClient() {
+    String sparkExecutionEngineConfigString =
+        this.pluginSettings.getSettingValue(
+            org.opensearch.sql.common.setting.Settings.Key.SPARK_EXECUTION_ENGINE_CONFIG);
+    SparkExecutionEngineConfig sparkExecutionEngineConfig =
+        SparkExecutionEngineConfig.toSparkExecutionEngineConfig(sparkExecutionEngineConfigString);
+    AWSEMRServerless awsemrServerless =
+        AWSEMRServerlessClientBuilder.standard()
+            .withRegion(sparkExecutionEngineConfig.getRegion())
+            .withCredentials(new DefaultAWSCredentialsProviderChain())
+            .build();
+    return new EmrServerlessClientImpl(
+        awsemrServerless, new SparkResponse(client, null, STEP_ID_FIELD));
+  }
 }
diff --git a/plugin/src/main/plugin-metadata/plugin-security.policy b/plugin/src/main/plugin-metadata/plugin-security.policy
index aec517aa84..dfdb033960 100644
--- a/plugin/src/main/plugin-metadata/plugin-security.policy
+++ b/plugin/src/main/plugin-metadata/plugin-security.policy
@@ -15,4 +15,7 @@ grant {

   // ml-commons client
   permission java.lang.RuntimePermission "setContextClassLoader";
+
+  // aws credentials
+  permission java.io.FilePermission "${user.home}${/}.aws${/}*", "read";
 };
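Note: the plugin wiring above assumes `plugins.query.executionengine.spark.config` holds a small JSON object that Gson can map onto the SparkExecutionEngineConfig class added later in this patch, and the `~/.aws` FilePermission is there because DefaultAWSCredentialsProviderChain may fall back to the shared credentials file. A minimal sketch of that parsing step, not part of this change; the applicationId and region values are placeholders:

    // Sketch only: shows the setting-to-config mapping createEMRServerlessClient() relies on.
    import org.opensearch.sql.spark.config.SparkExecutionEngineConfig;

    public class SparkConfigExample {
      public static void main(String[] args) {
        // Hypothetical value of plugins.query.executionengine.spark.config.
        String configJson = "{\"applicationId\":\"00example-app-id\",\"region\":\"us-west-2\"}";
        SparkExecutionEngineConfig config =
            SparkExecutionEngineConfig.toSparkExecutionEngineConfig(configJson);
        System.out.println(config.getApplicationId() + " / " + config.getRegion());
      }
    }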
diff --git a/spark/build.gradle b/spark/build.gradle
index b93e3327ce..1c03d242ff 100644
--- a/spark/build.gradle
+++ b/spark/build.gradle
@@ -19,7 +19,9 @@ dependencies {
     implementation group: 'org.opensearch', name: 'opensearch', version: "${opensearch_version}"
     implementation group: 'org.json', name: 'json', version: '20230227'
-    implementation group: 'com.amazonaws', name: 'aws-java-sdk-emr', version: '1.12.1'
+    api group: 'com.amazonaws', name: 'aws-java-sdk-emr', version: '1.12.545'
+    api group: 'com.amazonaws', name: 'aws-java-sdk-emrserverless', version: '1.12.545'
+    implementation group: 'commons-io', name: 'commons-io', version: '2.8.0'

     testImplementation('org.junit.jupiter:junit-jupiter:5.6.2')
     testImplementation group: 'org.mockito', name: 'mockito-core', version: '5.2.0'
diff --git a/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClient.java b/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClient.java
new file mode 100644
index 0000000000..3cde40ffb7
--- /dev/null
+++ b/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClient.java
@@ -0,0 +1,9 @@
+package org.opensearch.sql.spark.client;
+
+import org.opensearch.sql.spark.helper.FlintHelper;
+
+public interface EmrServerlessClient {
+
+  String startJobRun(
+      String applicationId, String query, String executionRoleArn, FlintHelper flintHelper);
+}
diff --git a/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImpl.java b/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImpl.java
new file mode 100644
index 0000000000..fc4d4a3b8b
--- /dev/null
+++ b/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImpl.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.spark.client;
+
+import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_INDEX_NAME;
+import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_SQL_APPLICATION_JAR;
+
+import com.amazonaws.services.emrserverless.AWSEMRServerless;
+import com.amazonaws.services.emrserverless.model.CancelJobRunRequest;
+import com.amazonaws.services.emrserverless.model.GetJobRunRequest;
+import com.amazonaws.services.emrserverless.model.GetJobRunResult;
+import com.amazonaws.services.emrserverless.model.JobDriver;
+import com.amazonaws.services.emrserverless.model.SparkSubmit;
+import com.amazonaws.services.emrserverless.model.StartJobRunRequest;
+import com.amazonaws.services.emrserverless.model.StartJobRunResult;
+import java.util.Set;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.opensearch.sql.spark.helper.FlintHelper;
+import org.opensearch.sql.spark.response.SparkResponse;
+
+public class EmrServerlessClientImpl implements EmrServerlessClient {
+
+  private final AWSEMRServerless emrServerless;
+  private final String sparkApplicationJar;
+  private SparkResponse sparkResponse;
+  private static final Logger logger = LogManager.getLogger(EmrServerlessClientImpl.class);
+  private static final Set<String> terminalStates = Set.of("CANCELLED", "FAILED", "SUCCESS");
+  private static final String JOB_NAME = "flint-opensearch-query";
+
+  public EmrServerlessClientImpl(AWSEMRServerless emrServerless, SparkResponse sparkResponse) {
+    this.emrServerless = emrServerless;
+    this.sparkApplicationJar = SPARK_SQL_APPLICATION_JAR;
+    this.sparkResponse = sparkResponse;
+  }
+
+  @Override
+  public String startJobRun(
+      String applicationId, String query, String executionRoleArn, FlintHelper flint) {
+    StartJobRunRequest request =
+        new StartJobRunRequest()
+            .withName(JOB_NAME)
+            .withApplicationId(applicationId)
+            .withExecutionRoleArn(executionRoleArn)
+            .withJobDriver(
+                new JobDriver()
+                    .withSparkSubmit(
+                        new SparkSubmit()
+                            .withEntryPoint(sparkApplicationJar)
+                            .withEntryPointArguments(
+                                query,
+                                SPARK_INDEX_NAME,
+                                flint.getFlintHost(),
+                                flint.getFlintPort(),
+                                flint.getFlintScheme(),
+                                flint.getFlintAuth(),
+                                flint.getFlintRegion())
+                            .withSparkSubmitParameters(
+                                "--class org.opensearch.sql.SQLJob"
+                                    + " --conf spark.driver.cores=1"
+                                    + " --conf spark.driver.memory=1g"
+                                    + " --conf spark.executor.cores=2"
+                                    + " --conf spark.executor.memory=4g"
+                                    + " --conf spark.jars="
+                                    + flint.getFlintIntegrationJar()
+                                    + " --conf spark.datasource.flint.host="
+                                    + flint.getFlintHost()
+                                    + " --conf spark.datasource.flint.port="
+                                    + flint.getFlintPort()
+                                    + " --conf spark.datasource.flint.scheme="
+                                    + flint.getFlintScheme()
+                                    + " --conf spark.datasource.flint.auth="
+                                    + flint.getFlintAuth()
+                                    + " --conf spark.datasource.flint.region="
+                                    + flint.getFlintRegion()
+                                    + " --conf"
+                                    + " spark.emr-serverless.driverEnv.JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64/"
+                                    + " --conf"
+                                    + " spark.executorEnv.JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64/"
+                                    + " --conf"
+                                    + " spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory")));
+    StartJobRunResult response = emrServerless.startJobRun(request);
+    logger.info("Job Run ID: " + response.getJobRunId());
+    sparkResponse.setValue(response.getJobRunId());
+    return response.getJobRunId();
+  }
+
+  public String getJobRunState(String applicationId, String jobRunId) {
+    GetJobRunRequest request =
+        new GetJobRunRequest().withApplicationId(applicationId).withJobRunId(jobRunId);
+    GetJobRunResult response = emrServerless.getJobRun(request);
+    logger.info("Job Run state: " + response.getJobRun().getState());
+    return response.getJobRun().getState();
+  }
+
+  public void cancelJobRun(String applicationId, String jobRunId) {
+    // Cancel the job run
+    emrServerless.cancelJobRun(
+        new CancelJobRunRequest().withApplicationId(applicationId).withJobRunId(jobRunId));
+  }
+}
diff --git a/spark/src/main/java/org/opensearch/sql/spark/config/SparkExecutionEngineConfig.java b/spark/src/main/java/org/opensearch/sql/spark/config/SparkExecutionEngineConfig.java
new file mode 100644
index 0000000000..3879f7c566
--- /dev/null
+++ b/spark/src/main/java/org/opensearch/sql/spark/config/SparkExecutionEngineConfig.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.spark.config;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.google.gson.Gson;
+import lombok.Data;
+
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class SparkExecutionEngineConfig {
+  private String applicationId;
+  private String region;
+
+  public static SparkExecutionEngineConfig toSparkExecutionEngineConfig(String jsonString) {
+    return new Gson().fromJson(jsonString, SparkExecutionEngineConfig.class);
+  }
+}
diff --git a/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java b/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java
index 65d5a01ba2..3441090353 100644
--- a/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java
+++ b/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java
@@ -7,6 +7,7 @@
 public class SparkConstants {
   public static final String EMR = "emr";
+  public static final String EMRS = "emr-serverless";
   public static final String STEP_ID_FIELD = "stepId.keyword";
   public static final String SPARK_SQL_APPLICATION_JAR = "s3://spark-datasource/sql-job.jar";
   public static final String SPARK_INDEX_NAME = ".query_execution_result";
diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java
new file mode 100644
index 0000000000..08ac4cd142
--- /dev/null
+++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.spark.dispatcher;
+
+import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_AUTH;
+import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_HOST;
+import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_PORT;
+import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_REGION;
+import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_SCHEME;
+import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INTEGRATION_JAR;
+
+import lombok.AllArgsConstructor;
+import org.opensearch.sql.datasource.DataSourceService;
+import org.opensearch.sql.datasource.model.DataSourceMetadata;
+import org.opensearch.sql.spark.client.EmrServerlessClient;
+import org.opensearch.sql.spark.helper.FlintHelper;
+
+@AllArgsConstructor
+public class SparkQueryDispatcher {
+
+  private EmrServerlessClient emrServerlessClient;
+
+  private DataSourceService dataSourceService;
+
+  public String dispatch(String applicationId, String query) {
+    String datasourceName = getDataSourceName(query);
+    return emrServerlessClient.startJobRun(
+        applicationId, query, getExecutionRoleARN(datasourceName), getFlintHelper(datasourceName));
+  }
+
+  private String getDataSourceName(String query) {
+    return "myS3glue";
+  }
+
+  private String getExecutionRoleARN(String datasourceName) {
+    DataSourceMetadata dataSourceMetadata = dataSourceService.getDataSourceMetadata(datasourceName);
+    return dataSourceMetadata.getProperties().get("glue.auth.role_arn");
+  }
+
+  private FlintHelper getFlintHelper(String datasourceName) {
+    return new FlintHelper(
+        FLINT_INTEGRATION_JAR,
+        FLINT_DEFAULT_HOST,
+        FLINT_DEFAULT_PORT,
+        FLINT_DEFAULT_SCHEME,
+        FLINT_DEFAULT_AUTH,
+        FLINT_DEFAULT_REGION);
+  }
+}
diff --git a/spark/src/main/java/org/opensearch/sql/spark/jobs/JobManagementService.java b/spark/src/main/java/org/opensearch/sql/spark/jobs/JobManagementService.java
new file mode 100644
index 0000000000..7081a0b87e
--- /dev/null
+++ b/spark/src/main/java/org/opensearch/sql/spark/jobs/JobManagementService.java
@@ -0,0 +1,15 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.spark.jobs;
+
+public interface JobManagementService {
+
+  String createJob(String query);
+
+  String getJob(String jobId);
+
+  String cancelJob(String jobIds);
+}
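The dispatcher above currently hard-codes the datasource name "myS3glue" and resolves the EMR Serverless execution role from that datasource's properties. A minimal sketch, not part of this patch, of the one property it reads; the role ARN below is a placeholder:

    // Sketch only: the key that SparkQueryDispatcher.getExecutionRoleARN() looks up
    // on DataSourceMetadata.getProperties() for the "myS3glue" datasource.
    import java.util.Map;

    public class DispatcherPropertiesExample {
      public static Map<String, String> glueDatasourcePropertiesSketch() {
        return Map.of(
            "glue.auth.role_arn", "arn:aws:iam::123456789012:role/example-emr-execution-role");
      }
    }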
diff --git a/spark/src/main/java/org/opensearch/sql/spark/jobs/JobManagementServiceImpl.java b/spark/src/main/java/org/opensearch/sql/spark/jobs/JobManagementServiceImpl.java
new file mode 100644
index 0000000000..f288addcbf
--- /dev/null
+++ b/spark/src/main/java/org/opensearch/sql/spark/jobs/JobManagementServiceImpl.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.spark.jobs;
+
+import lombok.AllArgsConstructor;
+import org.opensearch.sql.common.setting.Settings;
+import org.opensearch.sql.spark.config.SparkExecutionEngineConfig;
+import org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher;
+import org.opensearch.sql.spark.jobs.model.JobMetadata;
+
+@AllArgsConstructor
+public class JobManagementServiceImpl implements JobManagementService {
+  private JobMetadataStorageService jobMetadataStorageService;
+  private SparkQueryDispatcher sparkQueryDispatcher;
+  private Settings settings;
+
+  @Override
+  public String createJob(String query) {
+    String sparkExecutionEngineConfigString =
+        settings.getSettingValue(Settings.Key.SPARK_EXECUTION_ENGINE_CONFIG);
+    SparkExecutionEngineConfig sparkExecutionEngineConfig =
+        SparkExecutionEngineConfig.toSparkExecutionEngineConfig(sparkExecutionEngineConfigString);
+    String jobId =
+        sparkQueryDispatcher.dispatch(sparkExecutionEngineConfig.getApplicationId(), query);
+    jobMetadataStorageService.storeJobMetadata(
+        new JobMetadata(jobId, sparkExecutionEngineConfig.getApplicationId()));
+    return jobId;
+  }
+
+  @Override
+  public String getJob(String jobId) {
+    return null;
+  }
+
+  @Override
+  public String cancelJob(String jobIds) {
+    return null;
+  }
+}
diff --git a/spark/src/main/java/org/opensearch/sql/spark/jobs/JobMetadataStorageService.java b/spark/src/main/java/org/opensearch/sql/spark/jobs/JobMetadataStorageService.java
new file mode 100644
index 0000000000..52873d4c25
--- /dev/null
+++ b/spark/src/main/java/org/opensearch/sql/spark/jobs/JobMetadataStorageService.java
@@ -0,0 +1,11 @@
+package org.opensearch.sql.spark.jobs;
+
+import java.util.Optional;
+import org.opensearch.sql.spark.jobs.model.JobMetadata;
+
+public interface JobMetadataStorageService {
+
+  void storeJobMetadata(JobMetadata jobMetadata);
+
+  Optional<JobMetadata> getJobMetadata(String jobId);
+}
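For orientation, a test-style sketch, not part of this patch, of how createJob() threads a query through the dispatcher and the metadata store; the setting value, application id, and job run id are placeholders:

    // Sketch only: exercises JobManagementServiceImpl.createJob() with mocked collaborators.
    import static org.mockito.ArgumentMatchers.any;
    import static org.mockito.ArgumentMatchers.anyString;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.verify;
    import static org.mockito.Mockito.when;

    import org.opensearch.sql.common.setting.Settings;
    import org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher;
    import org.opensearch.sql.spark.jobs.JobManagementServiceImpl;
    import org.opensearch.sql.spark.jobs.JobMetadataStorageService;
    import org.opensearch.sql.spark.jobs.model.JobMetadata;

    public class CreateJobFlowSketch {
      public static void main(String[] args) {
        JobMetadataStorageService storage = mock(JobMetadataStorageService.class);
        SparkQueryDispatcher dispatcher = mock(SparkQueryDispatcher.class);
        Settings settings = mock(Settings.class);

        // Hypothetical setting value; applicationId and region are placeholders.
        when(settings.getSettingValue(Settings.Key.SPARK_EXECUTION_ENGINE_CONFIG))
            .thenReturn("{\"applicationId\":\"00example-app-id\",\"region\":\"us-west-2\"}");
        when(dispatcher.dispatch(anyString(), anyString())).thenReturn("00example-job-run-id");

        JobManagementServiceImpl service =
            new JobManagementServiceImpl(storage, dispatcher, settings);
        String jobId = service.createJob("select 1");

        // The dispatched job run id is persisted to .ql-job-metadata and returned to the caller.
        verify(storage).storeJobMetadata(any(JobMetadata.class));
        System.out.println(jobId); // 00example-job-run-id
      }
    }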
"job-metadata-index-settings.yml"; + private static final Logger LOG = LogManager.getLogger(); + private final Client client; + private final ClusterService clusterService; + + /** + * This class implements JobMetadataStorageService interface using OpenSearch as underlying + * storage. + * + * @param client opensearch NodeClient. + * @param clusterService ClusterService. + */ + public OpensearchJobMetadataStorageService(Client client, ClusterService clusterService) { + this.client = client; + this.clusterService = clusterService; + } + + @Override + public void storeJobMetadata(JobMetadata jobMetadata) { + if (!this.clusterService.state().routingTable().hasIndex(JOB_METADATA_INDEX)) { + createDataSourcesIndex(); + } + IndexRequest indexRequest = new IndexRequest(JOB_METADATA_INDEX); + indexRequest.id(jobMetadata.getJobId()); + indexRequest.opType(DocWriteRequest.OpType.CREATE); + indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + ActionFuture indexResponseActionFuture; + IndexResponse indexResponse; + try (ThreadContext.StoredContext storedContext = + client.threadPool().getThreadContext().stashContext()) { + indexRequest.source(JobMetadata.convertToXContent(jobMetadata)); + indexResponseActionFuture = client.index(indexRequest); + indexResponse = indexResponseActionFuture.actionGet(); + } catch (Exception e) { + throw new RuntimeException(e); + } + + if (indexResponse.getResult().equals(DocWriteResponse.Result.CREATED)) { + LOG.debug("JobMetadata : {} successfully created", jobMetadata.getJobId()); + } else { + throw new RuntimeException( + "Saving dataSource metadata information failed with result : " + + indexResponse.getResult().getLowercase()); + } + } + + @Override + public Optional getJobMetadata(String jobId) { + if (!this.clusterService.state().routingTable().hasIndex(JOB_METADATA_INDEX)) { + createDataSourcesIndex(); + return Optional.empty(); + } + return searchInDataSourcesIndex(QueryBuilders.termQuery("jobId", jobId)).stream().findFirst(); + } + + private void createDataSourcesIndex() { + try { + InputStream mappingFileStream = + OpensearchJobMetadataStorageService.class + .getClassLoader() + .getResourceAsStream(JOB_METADATA_INDEX_MAPPING_FILE_NAME); + InputStream settingsFileStream = + OpensearchJobMetadataStorageService.class + .getClassLoader() + .getResourceAsStream(JOB_METADATA_INDEX_SETTINGS_FILE_NAME); + CreateIndexRequest createIndexRequest = new CreateIndexRequest(JOB_METADATA_INDEX); + createIndexRequest + .mapping(IOUtils.toString(mappingFileStream, StandardCharsets.UTF_8), XContentType.YAML) + .settings( + IOUtils.toString(settingsFileStream, StandardCharsets.UTF_8), XContentType.YAML); + ActionFuture createIndexResponseActionFuture; + try (ThreadContext.StoredContext ignored = + client.threadPool().getThreadContext().stashContext()) { + createIndexResponseActionFuture = client.admin().indices().create(createIndexRequest); + } + CreateIndexResponse createIndexResponse = createIndexResponseActionFuture.actionGet(); + if (createIndexResponse.isAcknowledged()) { + LOG.info("Index: {} creation Acknowledged", JOB_METADATA_INDEX); + } else { + throw new RuntimeException("Index creation is not acknowledged."); + } + } catch (Throwable e) { + throw new RuntimeException( + "Internal server error while creating" + + JOB_METADATA_INDEX + + " index:: " + + e.getMessage()); + } + } + + private List searchInDataSourcesIndex(QueryBuilder query) { + SearchRequest searchRequest = new SearchRequest(); + searchRequest.indices(JOB_METADATA_INDEX); + 
diff --git a/spark/src/main/java/org/opensearch/sql/spark/jobs/model/JobMetadata.java b/spark/src/main/java/org/opensearch/sql/spark/jobs/model/JobMetadata.java
new file mode 100644
index 0000000000..4af54730a0
--- /dev/null
+++ b/spark/src/main/java/org/opensearch/sql/spark/jobs/model/JobMetadata.java
@@ -0,0 +1,87 @@
+package org.opensearch.sql.spark.jobs.model;
+
+import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
+
+import java.io.IOException;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import org.opensearch.common.xcontent.XContentFactory;
+import org.opensearch.common.xcontent.XContentType;
+import org.opensearch.core.xcontent.DeprecationHandler;
+import org.opensearch.core.xcontent.NamedXContentRegistry;
+import org.opensearch.core.xcontent.XContentBuilder;
+import org.opensearch.core.xcontent.XContentParser;
+
+/** This class models all the metadata required for a job. */
+@Data
+@AllArgsConstructor
+public class JobMetadata {
+  private String jobId;
+  private String applicationId;
+
+  /**
+   * Converts JobMetadata to XContentBuilder.
+   *
+   * @param metadata metadata.
+   * @return XContentBuilder {@link XContentBuilder}
+   * @throws Exception Exception.
+   */
+  public static XContentBuilder convertToXContent(JobMetadata metadata) throws Exception {
+    XContentBuilder builder = XContentFactory.jsonBuilder();
+    builder.startObject();
+    builder.field("jobId", metadata.getJobId());
+    builder.field("applicationId", metadata.getApplicationId());
+    builder.endObject();
+    return builder;
+  }
+
+  /**
+   * Converts json string to JobMetadata.
+   *
+   * @param json jsonstring.
+   * @return jobmetadata {@link JobMetadata}
+   * @throws java.io.IOException IOException.
+   */
+  public static JobMetadata toJobMetadata(String json) throws IOException {
+    try (XContentParser parser =
+        XContentType.JSON
+            .xContent()
+            .createParser(
+                NamedXContentRegistry.EMPTY,
+                DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
+                json)) {
+      return toJobMetadata(parser);
+    }
+  }
+
+  /**
+   * Convert xcontent parser to JobMetadata.
+   *
+   * @param parser parser.
+   * @return JobMetadata {@link JobMetadata}
+   * @throws IOException IOException.
+   */
+  public static JobMetadata toJobMetadata(XContentParser parser) throws IOException {
+    String jobId = null;
+    String applicationId = null;
+    ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
+    while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
+      String fieldName = parser.currentName();
+      parser.nextToken();
+      switch (fieldName) {
+        case "jobId":
+          jobId = parser.textOrNull();
+          break;
+        case "applicationId":
+          applicationId = parser.textOrNull();
+          break;
+        default:
+          throw new IllegalArgumentException("Unknown field: " + fieldName);
+      }
+    }
+    if (jobId == null || applicationId == null) {
+      throw new IllegalArgumentException("jobId and applicationId are required fields.");
+    }
+    return new JobMetadata(jobId, applicationId);
+  }
+}
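The document written to .ql-job-metadata therefore carries just the two ids. A minimal sketch, not part of this patch, of that shape and of parsing it back with toJobMetadata(); the id values below are placeholders:

    // Sketch only: convertToXContent() emits the same two-field JSON object parsed here.
    import org.opensearch.sql.spark.jobs.model.JobMetadata;

    public class JobMetadataRoundTripExample {
      public static void main(String[] args) throws Exception {
        String json = "{\"jobId\":\"00example-job-run-id\",\"applicationId\":\"00example-app-id\"}";
        JobMetadata parsed = JobMetadata.toJobMetadata(json);
        System.out.println(parsed.getJobId() + " / " + parsed.getApplicationId());
      }
    }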
diff --git a/spark/src/main/resources/job-metadata-index-mapping.yml b/spark/src/main/resources/job-metadata-index-mapping.yml
new file mode 100644
index 0000000000..ec2c83a4df
--- /dev/null
+++ b/spark/src/main/resources/job-metadata-index-mapping.yml
@@ -0,0 +1,20 @@
+---
+##
+# Copyright OpenSearch Contributors
+# SPDX-License-Identifier: Apache-2.0
+##
+
+# Schema file for the .ql-job-metadata index
+# Also "dynamic" is set to "false" so that other fields cannot be added.
+dynamic: false
+properties:
+  jobId:
+    type: text
+    fields:
+      keyword:
+        type: keyword
+  applicationId:
+    type: text
+    fields:
+      keyword:
+        type: keyword
\ No newline at end of file
diff --git a/spark/src/main/resources/job-metadata-index-settings.yml b/spark/src/main/resources/job-metadata-index-settings.yml
new file mode 100644
index 0000000000..be93f4645c
--- /dev/null
+++ b/spark/src/main/resources/job-metadata-index-settings.yml
@@ -0,0 +1,11 @@
+---
+##
+# Copyright OpenSearch Contributors
+# SPDX-License-Identifier: Apache-2.0
+##
+
+# Settings file for the .ql-job-metadata index
+index:
+  number_of_shards: "1"
+  auto_expand_replicas: "0-2"
+  number_of_replicas: "0"
\ No newline at end of file
diff --git a/spark/src/test/java/org/opensearch/sql/spark/client/EmrServerlessClientImplTest.java b/spark/src/test/java/org/opensearch/sql/spark/client/EmrServerlessClientImplTest.java
new file mode 100644
index 0000000000..f8f28a0333
--- /dev/null
+++ b/spark/src/test/java/org/opensearch/sql/spark/client/EmrServerlessClientImplTest.java
@@ -0,0 +1,62 @@
+/* Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.spark.client;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+import static org.opensearch.sql.spark.constants.TestConstants.EMRS_APPLICATION_ID;
+import static org.opensearch.sql.spark.constants.TestConstants.EMRS_EXECUTION_ROLE;
+import static org.opensearch.sql.spark.constants.TestConstants.QUERY;
+
+import com.amazonaws.services.emrserverless.AWSEMRServerless;
+import com.amazonaws.services.emrserverless.model.CancelJobRunResult;
+import com.amazonaws.services.emrserverless.model.GetJobRunResult;
+import com.amazonaws.services.emrserverless.model.JobRun;
+import com.amazonaws.services.emrserverless.model.StartJobRunResult;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.opensearch.sql.spark.helper.FlintHelper;
+import org.opensearch.sql.spark.response.SparkResponse;
+
+@ExtendWith(MockitoExtension.class)
+public class EmrServerlessClientImplTest {
+  @Mock private AWSEMRServerless emrServerless;
+  @Mock private FlintHelper flint;
+  @Mock private SparkResponse sparkResponse;
+
+  @Test
+  void testStartJobRun() {
+    StartJobRunResult response = new StartJobRunResult();
+    when(emrServerless.startJobRun(any())).thenReturn(response);
+
+    EmrServerlessClientImpl emrServerlessClient =
+        new EmrServerlessClientImpl(emrServerless, sparkResponse);
+    emrServerlessClient.startJobRun(EMRS_APPLICATION_ID, QUERY, EMRS_EXECUTION_ROLE, flint);
+  }
+
+  @Test
+  void testGetJobRunState() {
+    JobRun jobRun = new JobRun();
+    jobRun.setState("Running");
+    GetJobRunResult response = new GetJobRunResult();
+    response.setJobRun(jobRun);
+    when(emrServerless.getJobRun(any())).thenReturn(response);
+
+    EmrServerlessClientImpl emrServerlessClient =
+        new EmrServerlessClientImpl(emrServerless, sparkResponse);
+    emrServerlessClient.getJobRunState(EMRS_APPLICATION_ID, "123");
+  }
+
+  @Test
+  void testCancelJobRun() {
+    when(emrServerless.cancelJobRun(any())).thenReturn(new CancelJobRunResult());
+
+    EmrServerlessClientImpl emrServerlessClient =
+        new EmrServerlessClientImpl(emrServerless, sparkResponse);
+    emrServerlessClient.cancelJobRun(EMRS_APPLICATION_ID, "123");
+  }
+}
diff --git a/spark/src/test/java/org/opensearch/sql/spark/constants/TestConstants.java b/spark/src/test/java/org/opensearch/sql/spark/constants/TestConstants.java
index 2b1020568a..e8a5c2c5cd 100644
--- a/spark/src/test/java/org/opensearch/sql/spark/constants/TestConstants.java
+++ b/spark/src/test/java/org/opensearch/sql/spark/constants/TestConstants.java
@@ -8,4 +8,7 @@ public class TestConstants {
   public static final String QUERY = "select 1";
   public static final String EMR_CLUSTER_ID = "j-123456789";
+  public static final String EMRS_APPLICATION_ID = "xxxxx";
+  public static final String EMRS_EXECUTION_ROLE = "execution_role";
+  public static final String EMRS_JOB_NAME = "job_name";
 }