Merge branch 'opensearch-project:2.x' into 2.x
YANG-DB authored Oct 12, 2023
2 parents d044caa + ab02486 commit 6488247
Showing 12 changed files with 294 additions and 78 deletions.
@@ -34,6 +34,8 @@
public class DataSourceServiceImpl implements DataSourceService {

private static String DATASOURCE_NAME_REGEX = "[@*A-Za-z]+?[*a-zA-Z_\\-0-9]*";
+ public static final Set<String> CONFIDENTIAL_AUTH_KEYS =
+     Set.of("auth.username", "auth.password", "auth.access_key", "auth.secret_key");

private final DataSourceLoaderCache dataSourceLoaderCache;

@@ -159,7 +161,12 @@ private void removeAuthInfo(Set<DataSourceMetadata> dataSourceMetadataSet) {

private void removeAuthInfo(DataSourceMetadata dataSourceMetadata) {
HashMap<String, String> safeProperties = new HashMap<>(dataSourceMetadata.getProperties());
- safeProperties.entrySet().removeIf(entry -> entry.getKey().contains("auth"));
+ safeProperties
+     .entrySet()
+     .removeIf(
+         entry ->
+             CONFIDENTIAL_AUTH_KEYS.stream()
+                 .anyMatch(confidentialKey -> entry.getKey().endsWith(confidentialKey)));
dataSourceMetadata.setProperties(safeProperties);
}
}
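For context, a minimal self-contained sketch (not part of the commit) of the new redaction behavior: only keys ending in one of the CONFIDENTIAL_AUTH_KEYS suffixes are dropped, so a non-secret key such as prometheus.auth.type now survives, where the old contains("auth") check removed it along with everything else.

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class RedactionSketch {
  static final Set<String> CONFIDENTIAL_AUTH_KEYS =
      Set.of("auth.username", "auth.password", "auth.access_key", "auth.secret_key");

  public static void main(String[] args) {
    Map<String, String> properties = new HashMap<>();
    properties.put("prometheus.auth.type", "basicauth"); // kept: no confidential suffix
    properties.put("prometheus.auth.username", "admin"); // dropped: ends with "auth.username"
    properties.put("prometheus.auth.password", "s3cret"); // dropped: ends with "auth.password"

    // Same removeIf logic as the new removeAuthInfo above.
    properties
        .entrySet()
        .removeIf(
            entry ->
                CONFIDENTIAL_AUTH_KEYS.stream()
                    .anyMatch(confidentialKey -> entry.getKey().endsWith(confidentialKey)));

    System.out.println(properties); // {prometheus.auth.type=basicauth}
  }
}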
@@ -233,7 +233,7 @@ void testGetDataSourceMetadataSet() {
assertEquals(1, dataSourceMetadataSet.size());
DataSourceMetadata dataSourceMetadata = dataSourceMetadataSet.iterator().next();
assertTrue(dataSourceMetadata.getProperties().containsKey("prometheus.uri"));
- assertFalse(dataSourceMetadata.getProperties().containsKey("prometheus.auth.type"));
+ assertTrue(dataSourceMetadata.getProperties().containsKey("prometheus.auth.type"));
assertFalse(dataSourceMetadata.getProperties().containsKey("prometheus.auth.username"));
assertFalse(dataSourceMetadata.getProperties().containsKey("prometheus.auth.password"));
assertFalse(
@@ -352,11 +352,72 @@ void testRemovalOfAuthorizationInfo() {
DataSourceMetadata dataSourceMetadata1 = dataSourceService.getDataSourceMetadata("testDS");
assertEquals("testDS", dataSourceMetadata1.getName());
assertEquals(DataSourceType.PROMETHEUS, dataSourceMetadata1.getConnector());
- assertFalse(dataSourceMetadata1.getProperties().containsKey("prometheus.auth.type"));
+ assertTrue(dataSourceMetadata1.getProperties().containsKey("prometheus.auth.type"));
assertFalse(dataSourceMetadata1.getProperties().containsKey("prometheus.auth.username"));
assertFalse(dataSourceMetadata1.getProperties().containsKey("prometheus.auth.password"));
}

+ @Test
+ void testRemovalOfAuthorizationInfoForAccessKeyAndSecretKey() {
+ HashMap<String, String> properties = new HashMap<>();
+ properties.put("prometheus.uri", "https://localhost:9090");
+ properties.put("prometheus.auth.type", "awssigv4");
+ properties.put("prometheus.auth.access_key", "access_key");
+ properties.put("prometheus.auth.secret_key", "secret_key");
+ DataSourceMetadata dataSourceMetadata =
+ new DataSourceMetadata(
+ "testDS",
+ DataSourceType.PROMETHEUS,
+ Collections.singletonList("prometheus_access"),
+ properties,
+ null);
+ when(dataSourceMetadataStorage.getDataSourceMetadata("testDS"))
+ .thenReturn(Optional.of(dataSourceMetadata));
+
+ DataSourceMetadata dataSourceMetadata1 = dataSourceService.getDataSourceMetadata("testDS");
+ assertEquals("testDS", dataSourceMetadata1.getName());
+ assertEquals(DataSourceType.PROMETHEUS, dataSourceMetadata1.getConnector());
+ assertTrue(dataSourceMetadata1.getProperties().containsKey("prometheus.auth.type"));
+ assertFalse(dataSourceMetadata1.getProperties().containsKey("prometheus.auth.access_key"));
+ assertFalse(dataSourceMetadata1.getProperties().containsKey("prometheus.auth.secret_key"));
+ }

+ @Test
+ void testRemovalOfAuthorizationInfoForGlueWithRoleARN() {
+ HashMap<String, String> properties = new HashMap<>();
+ properties.put("glue.auth.type", "iam_role");
+ properties.put("glue.auth.role_arn", "role_arn");
+ properties.put("glue.indexstore.opensearch.uri", "http://localhost:9200");
+ properties.put("glue.indexstore.opensearch.auth", "basicauth");
+ properties.put("glue.indexstore.opensearch.auth.username", "username");
+ properties.put("glue.indexstore.opensearch.auth.password", "password");
+ DataSourceMetadata dataSourceMetadata =
+ new DataSourceMetadata(
+ "testGlue",
+ DataSourceType.S3GLUE,
+ Collections.singletonList("glue_access"),
+ properties,
+ null);
+ when(dataSourceMetadataStorage.getDataSourceMetadata("testGlue"))
+ .thenReturn(Optional.of(dataSourceMetadata));
+
+ DataSourceMetadata dataSourceMetadata1 = dataSourceService.getDataSourceMetadata("testGlue");
+ assertEquals("testGlue", dataSourceMetadata1.getName());
+ assertEquals(DataSourceType.S3GLUE, dataSourceMetadata1.getConnector());
+ assertTrue(dataSourceMetadata1.getProperties().containsKey("glue.auth.type"));
+ assertTrue(dataSourceMetadata1.getProperties().containsKey("glue.auth.role_arn"));
+ assertTrue(dataSourceMetadata1.getProperties().containsKey("glue.indexstore.opensearch.uri"));
+ assertTrue(dataSourceMetadata1.getProperties().containsKey("glue.indexstore.opensearch.auth"));
+ assertFalse(
+ dataSourceMetadata1
+ .getProperties()
+ .containsKey("glue.indexstore.opensearch.auth.username"));
+ assertFalse(
+ dataSourceMetadata1
+ .getProperties()
+ .containsKey("glue.indexstore.opensearch.auth.password"));
+ }

@Test
void testGetDataSourceMetadataForNonExistingDataSource() {
when(dataSourceMetadataStorage.getDataSourceMetadata("testDS")).thenReturn(Optional.empty());
@@ -381,7 +442,7 @@ void testGetDataSourceMetadataForSpecificDataSourceName() {
"testDS", DataSourceType.PROMETHEUS, Collections.emptyList(), properties)));
DataSourceMetadata dataSourceMetadata = this.dataSourceService.getDataSourceMetadata("testDS");
assertTrue(dataSourceMetadata.getProperties().containsKey("prometheus.uri"));
- assertFalse(dataSourceMetadata.getProperties().containsKey("prometheus.auth.type"));
+ assertTrue(dataSourceMetadata.getProperties().containsKey("prometheus.auth.type"));
assertFalse(dataSourceMetadata.getProperties().containsKey("prometheus.auth.username"));
assertFalse(dataSourceMetadata.getProperties().containsKey("prometheus.auth.password"));
verify(dataSourceMetadataStorage, times(1)).getDataSourceMetadata("testDS");
@@ -85,6 +85,10 @@ public void createDataSourceAPITest() {
new Gson().fromJson(getResponseString, DataSourceMetadata.class);
Assert.assertEquals(
"https://localhost:9090", dataSourceMetadata.getProperties().get("prometheus.uri"));
+ Assert.assertEquals(
+     "basicauth", dataSourceMetadata.getProperties().get("prometheus.auth.type"));
+ Assert.assertNull(dataSourceMetadata.getProperties().get("prometheus.auth.username"));
+ Assert.assertNull(dataSourceMetadata.getProperties().get("prometheus.auth.password"));
Assert.assertEquals("Prometheus Creation for Integ test", dataSourceMetadata.getDescription());
}
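For reference, the GET datasource response these assertions expect would look roughly like the trimmed sketch below: the credential keys are redacted server-side while prometheus.auth.type is kept. The exact field layout of the serialized DataSourceMetadata is an assumption here, not shown in this diff.

{
  "properties": {
    "prometheus.uri": "https://localhost:9090",
    "prometheus.auth.type": "basicauth"
  },
  "description": "Prometheus Creation for Integ test",
  ...
}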

@@ -239,6 +243,10 @@ public void issue2196() {
new Gson().fromJson(getResponseString, DataSourceMetadata.class);
Assert.assertEquals(
"https://localhost:9090", dataSourceMetadata.getProperties().get("prometheus.uri"));
+ Assert.assertEquals(
+     "basicauth", dataSourceMetadata.getProperties().get("prometheus.auth.type"));
+ Assert.assertNull(dataSourceMetadata.getProperties().get("prometheus.auth.username"));
+ Assert.assertNull(dataSourceMetadata.getProperties().get("prometheus.auth.password"));
Assert.assertEquals("Prometheus Creation for Integ test", dataSourceMetadata.getDescription());
}
}
34 changes: 21 additions & 13 deletions plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
@@ -96,6 +96,8 @@
import org.opensearch.sql.spark.client.EMRServerlessClient;
import org.opensearch.sql.spark.client.EmrServerlessClientImpl;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfig;
+ import org.opensearch.sql.spark.config.SparkExecutionEngineConfigSupplier;
+ import org.opensearch.sql.spark.config.SparkExecutionEngineConfigSupplierImpl;
import org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher;
import org.opensearch.sql.spark.flint.FlintIndexMetadataReaderImpl;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;
@@ -216,15 +218,21 @@ public Collection<Object> createComponents(
dataSourceService.createDataSource(defaultOpenSearchDataSourceMetadata());
LocalClusterState.state().setClusterService(clusterService);
LocalClusterState.state().setPluginSettings((OpenSearchSettings) pluginSettings);
- if (StringUtils.isEmpty(this.pluginSettings.getSettingValue(SPARK_EXECUTION_ENGINE_CONFIG))) {
+ SparkExecutionEngineConfigSupplier sparkExecutionEngineConfigSupplier =
+     new SparkExecutionEngineConfigSupplierImpl(pluginSettings);
+ SparkExecutionEngineConfig sparkExecutionEngineConfig =
+     sparkExecutionEngineConfigSupplier.getSparkExecutionEngineConfig();
+ if (StringUtils.isEmpty(sparkExecutionEngineConfig.getRegion())) {
LOGGER.warn(
String.format(
"Async Query APIs are disabled as %s is not configured in cluster settings. "
"Async Query APIs are disabled as %s is not configured properly in cluster settings. "
+ "Please configure and restart the domain to enable Async Query APIs",
SPARK_EXECUTION_ENGINE_CONFIG.getKeyValue()));
this.asyncQueryExecutorService = new AsyncQueryExecutorServiceImpl();
} else {
- this.asyncQueryExecutorService = createAsyncQueryExecutorService();
+ this.asyncQueryExecutorService =
+     createAsyncQueryExecutorService(
+         sparkExecutionEngineConfigSupplier, sparkExecutionEngineConfig);
}

ModulesBuilder modules = new ModulesBuilder();
@@ -295,10 +303,13 @@ private DataSourceServiceImpl createDataSourceService() {
dataSourceUserAuthorizationHelper);
}

- private AsyncQueryExecutorService createAsyncQueryExecutorService() {
+ private AsyncQueryExecutorService createAsyncQueryExecutorService(
+     SparkExecutionEngineConfigSupplier sparkExecutionEngineConfigSupplier,
+     SparkExecutionEngineConfig sparkExecutionEngineConfig) {
AsyncQueryJobMetadataStorageService asyncQueryJobMetadataStorageService =
new OpensearchAsyncQueryJobMetadataStorageService(client, clusterService);
- EMRServerlessClient emrServerlessClient = createEMRServerlessClient();
+ EMRServerlessClient emrServerlessClient =
+     createEMRServerlessClient(sparkExecutionEngineConfig.getRegion());
JobExecutionResponseReader jobExecutionResponseReader = new JobExecutionResponseReader(client);
SparkQueryDispatcher sparkQueryDispatcher =
new SparkQueryDispatcher(
Expand All @@ -309,21 +320,18 @@ private AsyncQueryExecutorService createAsyncQueryExecutorService() {
new FlintIndexMetadataReaderImpl(client),
client);
return new AsyncQueryExecutorServiceImpl(
- asyncQueryJobMetadataStorageService, sparkQueryDispatcher, pluginSettings);
+ asyncQueryJobMetadataStorageService,
+     sparkQueryDispatcher,
+     sparkExecutionEngineConfigSupplier);
}

- private EMRServerlessClient createEMRServerlessClient() {
- String sparkExecutionEngineConfigString =
-     this.pluginSettings.getSettingValue(SPARK_EXECUTION_ENGINE_CONFIG);
+ private EMRServerlessClient createEMRServerlessClient(String region) {
return AccessController.doPrivileged(
(PrivilegedAction<EMRServerlessClient>)
() -> {
- SparkExecutionEngineConfig sparkExecutionEngineConfig =
-     SparkExecutionEngineConfig.toSparkExecutionEngineConfig(
-         sparkExecutionEngineConfigString);
AWSEMRServerless awsemrServerless =
AWSEMRServerlessClientBuilder.standard()
- .withRegion(sparkExecutionEngineConfig.getRegion())
+ .withRegion(region)
.withCredentials(new DefaultAWSCredentialsProviderChain())
.build();
return new EmrServerlessClientImpl(awsemrServerless);
@@ -5,26 +5,22 @@

package org.opensearch.sql.spark.asyncquery;

- import static org.opensearch.sql.common.setting.Settings.Key.CLUSTER_NAME;
- import static org.opensearch.sql.common.setting.Settings.Key.SPARK_EXECUTION_ENGINE_CONFIG;
import static org.opensearch.sql.spark.data.constants.SparkConstants.ERROR_FIELD;
import static org.opensearch.sql.spark.data.constants.SparkConstants.STATUS_FIELD;

import com.amazonaws.services.emrserverless.model.JobRunState;
- import java.security.AccessController;
- import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import lombok.AllArgsConstructor;
import org.json.JSONObject;
- import org.opensearch.cluster.ClusterName;
- import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.spark.asyncquery.exceptions.AsyncQueryNotFoundException;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryExecutionResponse;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfig;
+ import org.opensearch.sql.spark.config.SparkExecutionEngineConfigSupplier;
import org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
@@ -37,7 +33,7 @@
public class AsyncQueryExecutorServiceImpl implements AsyncQueryExecutorService {
private AsyncQueryJobMetadataStorageService asyncQueryJobMetadataStorageService;
private SparkQueryDispatcher sparkQueryDispatcher;
- private Settings settings;
+ private SparkExecutionEngineConfigSupplier sparkExecutionEngineConfigSupplier;
private Boolean isSparkJobExecutionEnabled;

public AsyncQueryExecutorServiceImpl() {
@@ -47,26 +43,19 @@ public AsyncQueryExecutorServiceImpl() {
public AsyncQueryExecutorServiceImpl(
AsyncQueryJobMetadataStorageService asyncQueryJobMetadataStorageService,
SparkQueryDispatcher sparkQueryDispatcher,
- Settings settings) {
+ SparkExecutionEngineConfigSupplier sparkExecutionEngineConfigSupplier) {
this.isSparkJobExecutionEnabled = Boolean.TRUE;
this.asyncQueryJobMetadataStorageService = asyncQueryJobMetadataStorageService;
this.sparkQueryDispatcher = sparkQueryDispatcher;
- this.settings = settings;
+ this.sparkExecutionEngineConfigSupplier = sparkExecutionEngineConfigSupplier;
}

@Override
public CreateAsyncQueryResponse createAsyncQuery(
CreateAsyncQueryRequest createAsyncQueryRequest) {
validateSparkExecutionEngineSettings();
- String sparkExecutionEngineConfigString =
-     settings.getSettingValue(SPARK_EXECUTION_ENGINE_CONFIG);
SparkExecutionEngineConfig sparkExecutionEngineConfig =
- AccessController.doPrivileged(
-     (PrivilegedAction<SparkExecutionEngineConfig>)
-         () ->
-             SparkExecutionEngineConfig.toSparkExecutionEngineConfig(
-                 sparkExecutionEngineConfigString));
- ClusterName clusterName = settings.getSettingValue(CLUSTER_NAME);
+ sparkExecutionEngineConfigSupplier.getSparkExecutionEngineConfig();
DispatchQueryResponse dispatchQueryResponse =
sparkQueryDispatcher.dispatch(
new DispatchQueryRequest(
@@ -75,7 +64,7 @@ public CreateAsyncQueryResponse createAsyncQuery(
createAsyncQueryRequest.getDatasource(),
createAsyncQueryRequest.getLang(),
sparkExecutionEngineConfig.getExecutionRoleARN(),
- clusterName.value(),
+ sparkExecutionEngineConfig.getClusterName(),
sparkExecutionEngineConfig.getSparkSubmitParameters()));
asyncQueryJobMetadataStorageService.storeJobMetadata(
new AsyncQueryJobMetadata(
@@ -1,25 +1,21 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.config;

- import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
- import com.google.gson.Gson;
+ import lombok.AllArgsConstructor;
import lombok.Data;
+ import lombok.NoArgsConstructor;

+ /**
+  * POJO for spark Execution Engine Config. Interface between {@link
+  * org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService} and {@link
+  * SparkExecutionEngineConfigSupplier}
+  */
@Data
- @JsonIgnoreProperties(ignoreUnknown = true)
+ @NoArgsConstructor
+ @AllArgsConstructor
public class SparkExecutionEngineConfig {
private String applicationId;
private String region;
private String executionRoleARN;

/** Additional Spark submit parameters to append to request. */
private String sparkSubmitParameters;

- public static SparkExecutionEngineConfig toSparkExecutionEngineConfig(String jsonString) {
-   return new Gson().fromJson(jsonString, SparkExecutionEngineConfig.class);
- }
+ private String clusterName;
}
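A small usage sketch with hypothetical values (not from the commit): with Lombok's @NoArgsConstructor/@AllArgsConstructor, callers can build the slimmed-down POJO directly, and the cluster name now travels with the config instead of being read separately from cluster settings.

SparkExecutionEngineConfig config =
    new SparkExecutionEngineConfig(
        "application-id", // applicationId (hypothetical)
        "us-east-1", // region (hypothetical)
        "arn:aws:iam::123456789012:role/emr-job", // executionRoleARN (hypothetical)
        null, // sparkSubmitParameters, optional extras
        "my-cluster"); // clusterName (hypothetical)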
@@ -0,0 +1,30 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.config;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.google.gson.Gson;
import lombok.Data;

/**
* This POJO is just for reading stringified json in `plugins.query.executionengine.spark.config`
* setting.
*/
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class SparkExecutionEngineConfigClusterSetting {
private String applicationId;
private String region;
private String executionRoleARN;

/** Additional Spark submit parameters to append to request. */
private String sparkSubmitParameters;

public static SparkExecutionEngineConfigClusterSetting toSparkExecutionEngineConfig(
String jsonString) {
return new Gson().fromJson(jsonString, SparkExecutionEngineConfigClusterSetting.class);
}
}
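For illustration, the stringified JSON carried by plugins.query.executionengine.spark.config would look roughly like the sketch below. The field names come from the POJO above; the values are hypothetical.

String json =
    "{\"applicationId\": \"app-id\","
        + " \"region\": \"us-west-2\","
        + " \"executionRoleARN\": \"arn:aws:iam::123456789012:role/emr-access\","
        + " \"sparkSubmitParameters\": \"--conf spark.dynamicAllocation.enabled=false\"}";
SparkExecutionEngineConfigClusterSetting setting =
    SparkExecutionEngineConfigClusterSetting.toSparkExecutionEngineConfig(json);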
@@ -0,0 +1,12 @@
package org.opensearch.sql.spark.config;

/** Interface for extracting and providing SparkExecutionEngineConfig */
public interface SparkExecutionEngineConfigSupplier {

/**
* Get SparkExecutionEngineConfig
*
* @return {@link SparkExecutionEngineConfig}.
*/
SparkExecutionEngineConfig getSparkExecutionEngineConfig();
}
(The remaining changed files of the 12 did not load in this view.)
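SparkExecutionEngineConfigSupplierImpl, referenced in SQLPlugin above, is among the files that did not load. The following is a plausible sketch only, assuming it parses the cluster setting through SparkExecutionEngineConfigClusterSetting and attaches the cluster name from settings; the actual committed implementation may differ.

import lombok.AllArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.opensearch.cluster.ClusterName;
import org.opensearch.sql.common.setting.Settings;

// Hypothetical reconstruction, not the committed source.
@AllArgsConstructor
public class SparkExecutionEngineConfigSupplierImpl implements SparkExecutionEngineConfigSupplier {

  private Settings settings;

  @Override
  public SparkExecutionEngineConfig getSparkExecutionEngineConfig() {
    SparkExecutionEngineConfig config = new SparkExecutionEngineConfig();
    String configJson = settings.getSettingValue(Settings.Key.SPARK_EXECUTION_ENGINE_CONFIG);
    if (!StringUtils.isBlank(configJson)) {
      // Reuse the Gson-based parser from the new cluster-setting POJO.
      SparkExecutionEngineConfigClusterSetting clusterSetting =
          SparkExecutionEngineConfigClusterSetting.toSparkExecutionEngineConfig(configJson);
      config.setApplicationId(clusterSetting.getApplicationId());
      config.setRegion(clusterSetting.getRegion());
      config.setExecutionRoleARN(clusterSetting.getExecutionRoleARN());
      config.setSparkSubmitParameters(clusterSetting.getSparkSubmitParameters());
    }
    // Cluster name moves into the config, replacing the CLUSTER_NAME lookup
    // removed from AsyncQueryExecutorServiceImpl above.
    ClusterName clusterName = settings.getSettingValue(Settings.Key.CLUSTER_NAME);
    config.setClusterName(clusterName.value());
    return config;
  }
}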
