Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Merging Async Query APIs feature branch into main. #2163

Merged
merged 1 commit into from
Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ dependencies {
api group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0'
api group: 'com.squareup.okhttp3', name: 'okhttp', version: '4.9.3'
implementation 'com.github.babbel:okhttp-aws-signer:1.0.2'
api group: 'com.amazonaws', name: 'aws-java-sdk-core', version: '1.12.1'
api group: 'com.amazonaws', name: 'aws-java-sdk-sts', version: '1.12.1'
api group: 'com.amazonaws', name: 'aws-java-sdk-core', version: '1.12.545'
api group: 'com.amazonaws', name: 'aws-java-sdk-sts', version: '1.12.545'
implementation "com.github.seancfoley:ipaddress:5.4.0"

testImplementation group: 'junit', name: 'junit', version: '4.13.2'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public enum Key {

METRICS_ROLLING_WINDOW("plugins.query.metrics.rolling_window"),
METRICS_ROLLING_INTERVAL("plugins.query.metrics.rolling_interval"),

SPARK_EXECUTION_ENGINE_CONFIG("plugins.query.executionengine.spark.config"),
CLUSTER_NAME("cluster.name");

@Getter private final String keyValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,15 @@ public interface DataSourceService {
*/
DataSourceMetadata getDataSourceMetadata(String name);

/**
* Returns dataSourceMetadata object with specific name. The returned objects contain all the
* metadata information without any filtering.
*
* @param name name of the {@link DataSource}.
* @return set of {@link DataSourceMetadata}.
*/
DataSourceMetadata getRawDataSourceMetadata(String name);

/**
* Register {@link DataSource} defined by {@link DataSourceMetadata}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
public enum DataSourceType {
PROMETHEUS("prometheus"),
OPENSEARCH("opensearch"),
SPARK("spark");
SPARK("spark"),
S3GLUE("s3glue");

private String text;

DataSourceType(String text) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,11 @@ public DataSourceMetadata getDataSourceMetadata(String name) {
return null;
}

@Override
public DataSourceMetadata getRawDataSourceMetadata(String name) {
return null;
}

@Override
public void createDataSource(DataSourceMetadata metadata) {
throw new UnsupportedOperationException("unsupported operation");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package org.opensearch.sql.datasources.glue;

import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.Set;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceType;
import org.opensearch.sql.datasources.utils.DatasourceValidationUtils;
import org.opensearch.sql.storage.DataSourceFactory;

@RequiredArgsConstructor
public class GlueDataSourceFactory implements DataSourceFactory {

private final Settings pluginSettings;

// Glue configuration properties
public static final String GLUE_AUTH_TYPE = "glue.auth.type";
public static final String GLUE_ROLE_ARN = "glue.auth.role_arn";
public static final String FLINT_URI = "glue.indexstore.opensearch.uri";
public static final String FLINT_AUTH = "glue.indexstore.opensearch.auth";
public static final String FLINT_REGION = "glue.indexstore.opensearch.region";

@Override
public DataSourceType getDataSourceType() {
return DataSourceType.S3GLUE;
}

@Override
public DataSource createDataSource(DataSourceMetadata metadata) {
try {
validateGlueDataSourceConfiguration(metadata.getProperties());
return new DataSource(
metadata.getName(),
metadata.getConnector(),
(dataSourceSchemaName, tableName) -> {
throw new UnsupportedOperationException("Glue storage engine is not supported.");
});
} catch (URISyntaxException | UnknownHostException e) {
throw new IllegalArgumentException("Invalid flint host in properties.");
}
}

private void validateGlueDataSourceConfiguration(Map<String, String> dataSourceMetadataConfig)
throws URISyntaxException, UnknownHostException {
DatasourceValidationUtils.validateLengthAndRequiredFields(
dataSourceMetadataConfig,
Set.of(GLUE_AUTH_TYPE, GLUE_ROLE_ARN, FLINT_URI, FLINT_REGION, FLINT_AUTH));
DatasourceValidationUtils.validateHost(
dataSourceMetadataConfig.get(FLINT_URI),
pluginSettings.getSettingValue(Settings.Key.DATASOURCES_URI_HOSTS_DENY_LIST));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ public List<Route> routes() {
new Route(GET, BASE_DATASOURCE_ACTION_URL),

/*
* GET datasources
* Request URL: GET
* PUT datasources
* Request body:
* Ref
* [org.opensearch.sql.plugin.transport.datasource.model.UpdateDataSourceActionRequest]
Expand All @@ -100,8 +99,7 @@ public List<Route> routes() {
new Route(PUT, BASE_DATASOURCE_ACTION_URL),

/*
* GET datasources
* Request URL: GET
* DELETE datasources
* Request body: Ref
* [org.opensearch.sql.plugin.transport.datasource.model.DeleteDataSourceActionRequest]
* Response body: Ref
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,29 +64,17 @@ public Set<DataSourceMetadata> getDataSourceMetadata(boolean isDefaultDataSource
}

@Override
public DataSourceMetadata getDataSourceMetadata(String datasourceName) {
Optional<DataSourceMetadata> dataSourceMetadataOptional =
getDataSourceMetadataFromName(datasourceName);
if (dataSourceMetadataOptional.isEmpty()) {
throw new IllegalArgumentException(
"DataSource with name: " + datasourceName + " doesn't exist.");
}
removeAuthInfo(dataSourceMetadataOptional.get());
return dataSourceMetadataOptional.get();
public DataSourceMetadata getDataSourceMetadata(String dataSourceName) {
DataSourceMetadata dataSourceMetadata = getRawDataSourceMetadata(dataSourceName);
removeAuthInfo(dataSourceMetadata);
return dataSourceMetadata;
}

@Override
public DataSource getDataSource(String dataSourceName) {
Optional<DataSourceMetadata> dataSourceMetadataOptional =
getDataSourceMetadataFromName(dataSourceName);
if (dataSourceMetadataOptional.isEmpty()) {
throw new DataSourceNotFoundException(
String.format("DataSource with name %s doesn't exist.", dataSourceName));
} else {
DataSourceMetadata dataSourceMetadata = dataSourceMetadataOptional.get();
this.dataSourceUserAuthorizationHelper.authorizeDataSource(dataSourceMetadata);
return dataSourceLoaderCache.getOrLoadDataSource(dataSourceMetadata);
}
DataSourceMetadata dataSourceMetadata = getRawDataSourceMetadata(dataSourceName);
this.dataSourceUserAuthorizationHelper.authorizeDataSource(dataSourceMetadata);
return dataSourceLoaderCache.getOrLoadDataSource(dataSourceMetadata);
}

@Override
Expand Down Expand Up @@ -146,11 +134,20 @@ private void validateDataSourceMetaData(DataSourceMetadata metadata) {
+ " Properties are required parameters.");
}

private Optional<DataSourceMetadata> getDataSourceMetadataFromName(String dataSourceName) {
@Override
public DataSourceMetadata getRawDataSourceMetadata(String dataSourceName) {
if (dataSourceName.equals(DEFAULT_DATASOURCE_NAME)) {
return Optional.of(DataSourceMetadata.defaultOpenSearchDataSourceMetadata());
return DataSourceMetadata.defaultOpenSearchDataSourceMetadata();

} else {
return this.dataSourceMetadataStorage.getDataSourceMetadata(dataSourceName);
Optional<DataSourceMetadata> dataSourceMetadataOptional =
this.dataSourceMetadataStorage.getDataSourceMetadata(dataSourceName);
if (dataSourceMetadataOptional.isEmpty()) {
throw new DataSourceNotFoundException(
String.format("DataSource with name %s doesn't exist.", dataSourceName));
} else {
return dataSourceMetadataOptional.get();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ public static void validateLengthAndRequiredFields(
StringBuilder errorStringBuilder = new StringBuilder();
if (missingFields.size() > 0) {
errorStringBuilder.append(
String.format(
"Missing %s fields in the Prometheus connector properties.", missingFields));
String.format("Missing %s fields in the connector properties.", missingFields));
}

if (invalidLengthFields.size() > 0) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package org.opensearch.sql.datasources.glue;

import static org.mockito.Mockito.when;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import lombok.SneakyThrows;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.sql.DataSourceSchemaName;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceType;

@ExtendWith(MockitoExtension.class)
public class GlueDataSourceFactoryTest {

@Mock private Settings settings;

@Test
void testGetConnectorType() {
GlueDataSourceFactory glueDatasourceFactory = new GlueDataSourceFactory(settings);
Assertions.assertEquals(DataSourceType.S3GLUE, glueDatasourceFactory.getDataSourceType());
}

@Test
@SneakyThrows
void testCreateGLueDatSource() {
when(settings.getSettingValue(Settings.Key.DATASOURCES_URI_HOSTS_DENY_LIST))
.thenReturn(Collections.emptyList());
GlueDataSourceFactory glueDatasourceFactory = new GlueDataSourceFactory(settings);

DataSourceMetadata metadata = new DataSourceMetadata();
HashMap<String, String> properties = new HashMap<>();
properties.put("glue.auth.type", "iam_role");
properties.put("glue.auth.role_arn", "role_arn");
properties.put("glue.indexstore.opensearch.uri", "http://localhost:9200");
properties.put("glue.indexstore.opensearch.auth", "false");
properties.put("glue.indexstore.opensearch.region", "us-west-2");

metadata.setName("my_glue");
metadata.setConnector(DataSourceType.S3GLUE);
metadata.setProperties(properties);
DataSource dataSource = glueDatasourceFactory.createDataSource(metadata);
Assertions.assertEquals(DataSourceType.S3GLUE, dataSource.getConnectorType());
UnsupportedOperationException unsupportedOperationException =
Assertions.assertThrows(
UnsupportedOperationException.class,
() ->
dataSource
.getStorageEngine()
.getTable(new DataSourceSchemaName("my_glue", "default"), "alb_logs"));
Assertions.assertEquals(
"Glue storage engine is not supported.", unsupportedOperationException.getMessage());
}

@Test
@SneakyThrows
void testCreateGLueDatSourceWithInvalidFlintHost() {
when(settings.getSettingValue(Settings.Key.DATASOURCES_URI_HOSTS_DENY_LIST))
.thenReturn(List.of("127.0.0.0/8"));
GlueDataSourceFactory glueDatasourceFactory = new GlueDataSourceFactory(settings);

DataSourceMetadata metadata = new DataSourceMetadata();
HashMap<String, String> properties = new HashMap<>();
properties.put("glue.auth.type", "iam_role");
properties.put("glue.auth.role_arn", "role_arn");
properties.put("glue.indexstore.opensearch.uri", "http://localhost:9200");
properties.put("glue.indexstore.opensearch.auth", "false");
properties.put("glue.indexstore.opensearch.region", "us-west-2");

metadata.setName("my_glue");
metadata.setConnector(DataSourceType.S3GLUE);
metadata.setProperties(properties);
IllegalArgumentException illegalArgumentException =
Assertions.assertThrows(
IllegalArgumentException.class, () -> glueDatasourceFactory.createDataSource(metadata));
Assertions.assertEquals(
"Disallowed hostname in the uri. "
+ "Validate with plugins.query.datasources.uri.hosts.denylist config",
illegalArgumentException.getMessage());
}

@Test
@SneakyThrows
void testCreateGLueDatSourceWithInvalidFlintHostSyntax() {
when(settings.getSettingValue(Settings.Key.DATASOURCES_URI_HOSTS_DENY_LIST))
.thenReturn(List.of("127.0.0.0/8"));
GlueDataSourceFactory glueDatasourceFactory = new GlueDataSourceFactory(settings);

DataSourceMetadata metadata = new DataSourceMetadata();
HashMap<String, String> properties = new HashMap<>();
properties.put("glue.auth.type", "iam_role");
properties.put("glue.auth.role_arn", "role_arn");
properties.put(
"glue.indexstore.opensearch.uri",
"http://dummyprometheus.com:9090? paramt::localhost:9200");
properties.put("glue.indexstore.opensearch.auth", "false");
properties.put("glue.indexstore.opensearch.region", "us-west-2");

metadata.setName("my_glue");
metadata.setConnector(DataSourceType.S3GLUE);
metadata.setProperties(properties);
IllegalArgumentException illegalArgumentException =
Assertions.assertThrows(
IllegalArgumentException.class, () -> glueDatasourceFactory.createDataSource(metadata));
Assertions.assertEquals(
"Invalid flint host in properties.", illegalArgumentException.getMessage());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -359,11 +359,11 @@ void testRemovalOfAuthorizationInfo() {
@Test
void testGetDataSourceMetadataForNonExistingDataSource() {
when(dataSourceMetadataStorage.getDataSourceMetadata("testDS")).thenReturn(Optional.empty());
IllegalArgumentException exception =
DataSourceNotFoundException exception =
assertThrows(
IllegalArgumentException.class,
DataSourceNotFoundException.class,
() -> dataSourceService.getDataSourceMetadata("testDS"));
assertEquals("DataSource with name: testDS doesn't exist.", exception.getMessage());
assertEquals("DataSource with name testDS doesn't exist.", exception.getMessage());
}

@Test
Expand All @@ -385,4 +385,28 @@ void testGetDataSourceMetadataForSpecificDataSourceName() {
assertFalse(dataSourceMetadata.getProperties().containsKey("prometheus.auth.password"));
verify(dataSourceMetadataStorage, times(1)).getDataSourceMetadata("testDS");
}

@Test
void testGetRawDataSourceMetadata() {
HashMap<String, String> properties = new HashMap<>();
properties.put("prometheus.uri", "https://localhost:9090");
properties.put("prometheus.auth.type", "basicauth");
properties.put("prometheus.auth.username", "username");
properties.put("prometheus.auth.password", "password");
DataSourceMetadata dataSourceMetadata =
new DataSourceMetadata(
"testDS",
DataSourceType.PROMETHEUS,
Collections.singletonList("prometheus_access"),
properties);
when(dataSourceMetadataStorage.getDataSourceMetadata("testDS"))
.thenReturn(Optional.of(dataSourceMetadata));

DataSourceMetadata dataSourceMetadata1 = dataSourceService.getRawDataSourceMetadata("testDS");
assertEquals("testDS", dataSourceMetadata1.getName());
assertEquals(DataSourceType.PROMETHEUS, dataSourceMetadata1.getConnector());
assertTrue(dataSourceMetadata1.getProperties().containsKey("prometheus.auth.type"));
assertTrue(dataSourceMetadata1.getProperties().containsKey("prometheus.auth.username"));
assertTrue(dataSourceMetadata1.getProperties().containsKey("prometheus.auth.password"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public void testValidateLengthAndRequiredFieldsWithAbsentField() {
DatasourceValidationUtils.validateLengthAndRequiredFields(
config, Set.of("s3.uri", "s3.auth.type")));
Assertions.assertEquals(
"Missing [s3.auth.type] fields in the Prometheus connector properties.",
"Missing [s3.auth.type] fields in the connector properties.",
illegalArgumentException.getMessage());
}

Expand All @@ -63,7 +63,7 @@ public void testValidateLengthAndRequiredFieldsWithInvalidLength() {
DatasourceValidationUtils.validateLengthAndRequiredFields(
config, Set.of("s3.uri", "s3.auth.type")));
Assertions.assertEquals(
"Missing [s3.auth.type] fields in the Prometheus connector properties.Fields "
"Missing [s3.auth.type] fields in the connector properties.Fields "
+ "[s3.uri] exceeds more than 1000 characters.",
illegalArgumentException.getMessage());
}
Expand Down
Loading
Loading