From e280866ad6a248e852528326ecf568c95b4958a2 Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Fri, 18 Nov 2022 14:54:47 -0800 Subject: [PATCH] Move DataSourceServiceImpl to core module (#1084) Signed-off-by: Peng Huo --- .../sql/datasource/DataSourceService.java | 25 ++- .../sql/datasource/DataSourceServiceImpl.java | 99 +++++++++ .../sql/datasource/model/DataSource.java | 5 +- .../datasource/model/DataSourceMetadata.java | 20 +- ...ConnectorType.java => DataSourceType.java} | 2 +- .../sql/storage/DataSourceFactory.java | 31 +++ .../sql/storage/StorageEngineFactory.java | 19 -- .../sql/analysis/AnalyzerTestBase.java | 17 +- .../datasource/DataSourceServiceImplTest.java | 158 +++++++++++++++ .../model/auth/AuthenticationTypeTest.java | 29 +++ .../datasource/DataSourceTableScanTest.java | 6 +- .../org/opensearch/sql/ppl/StandaloneIT.java | 16 +- .../storage/OpenSearchDataSourceFactory.java | 34 ++++ .../OpenSearchDataSourceFactoryTest.java | 51 +++++ .../org/opensearch/sql/plugin/SQLPlugin.java | 71 +++++-- .../datasource/DataSourceServiceImpl.java | 191 ------------------ .../datasource/DataSourceMetaDataTest.java | 129 ++++++++++++ .../datasource/DataSourceServiceImplTest.java | 141 ------------- .../resources/datasource_missing_name.json | 11 - .../resources/duplicate_datasource_names.json | 22 -- .../src/test/resources/empty_datasource.json | 1 - .../resources/illegal_datasource_name.json | 12 -- .../storage/PrometheusStorageFactory.java | 22 +- .../storage/PrometheusStorageFactoryTest.java | 22 +- 24 files changed, 686 insertions(+), 448 deletions(-) create mode 100644 core/src/main/java/org/opensearch/sql/datasource/DataSourceServiceImpl.java rename core/src/main/java/org/opensearch/sql/datasource/model/{ConnectorType.java => DataSourceType.java} (84%) create mode 100644 core/src/main/java/org/opensearch/sql/storage/DataSourceFactory.java delete mode 100644 core/src/main/java/org/opensearch/sql/storage/StorageEngineFactory.java create mode 100644 core/src/test/java/org/opensearch/sql/datasource/DataSourceServiceImplTest.java create mode 100644 core/src/test/java/org/opensearch/sql/datasource/model/auth/AuthenticationTypeTest.java create mode 100644 opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchDataSourceFactory.java create mode 100644 opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchDataSourceFactoryTest.java delete mode 100644 plugin/src/main/java/org/opensearch/sql/plugin/datasource/DataSourceServiceImpl.java create mode 100644 plugin/src/test/java/org/opensearch/sql/plugin/datasource/DataSourceMetaDataTest.java delete mode 100644 plugin/src/test/java/org/opensearch/sql/plugin/datasource/DataSourceServiceImplTest.java delete mode 100644 plugin/src/test/resources/datasource_missing_name.json delete mode 100644 plugin/src/test/resources/duplicate_datasource_names.json delete mode 100644 plugin/src/test/resources/empty_datasource.json delete mode 100644 plugin/src/test/resources/illegal_datasource_name.json diff --git a/core/src/main/java/org/opensearch/sql/datasource/DataSourceService.java b/core/src/main/java/org/opensearch/sql/datasource/DataSourceService.java index 88ba8e508c..37e6f8e085 100644 --- a/core/src/main/java/org/opensearch/sql/datasource/DataSourceService.java +++ b/core/src/main/java/org/opensearch/sql/datasource/DataSourceService.java @@ -7,34 +7,37 @@ import java.util.Set; import org.opensearch.sql.datasource.model.DataSource; -import org.opensearch.sql.storage.StorageEngine; +import org.opensearch.sql.datasource.model.DataSourceMetadata; /** - * DataSource Service manages datasources. + * DataSource Service manage {@link DataSource}. */ public interface DataSourceService { /** - * Returns all datasource objects. + * Returns all DataSource objects. * - * @return DataSource datasources. + * @return set of {@link DataSource}. */ Set getDataSources(); /** - * Returns DataSource with corresponding to the datasource name. + * Returns {@link DataSource} with corresponding to the DataSource name. * - * @param dataSourceName Name of the datasource. - * @return DataSource datasource. + * @param dataSourceName Name of the {@link DataSource}. + * @return {@link DataSource}. */ DataSource getDataSource(String dataSourceName); /** - * Default opensearch engine is not defined in datasources config. - * So the registration of default datasource happens separately. + * Register {@link DataSource} defined by {@link DataSourceMetadata}. * - * @param storageEngine StorageEngine. + * @param metadatas list of {@link DataSourceMetadata}. */ - void registerDefaultOpenSearchDataSource(StorageEngine storageEngine); + void addDataSource(DataSourceMetadata... metadatas); + /** + * remove all the registered {@link DataSource}. + */ + void clear(); } diff --git a/core/src/main/java/org/opensearch/sql/datasource/DataSourceServiceImpl.java b/core/src/main/java/org/opensearch/sql/datasource/DataSourceServiceImpl.java new file mode 100644 index 0000000000..274024e548 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/datasource/DataSourceServiceImpl.java @@ -0,0 +1,99 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.datasource; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import org.opensearch.sql.common.utils.StringUtils; +import org.opensearch.sql.datasource.model.DataSource; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.datasource.model.DataSourceType; +import org.opensearch.sql.storage.DataSourceFactory; + +/** + * Default implementation of {@link DataSourceService}. It is per-jvm single instance. + * + *

{@link DataSourceService} is constructed by the list of {@link DataSourceFactory} at service + * bootstrap time. The set of {@link DataSourceFactory} is immutable. Client could add {@link + * DataSource} defined by {@link DataSourceMetadata} at any time. {@link DataSourceService} use + * {@link DataSourceFactory} to create {@link DataSource}. + */ +public class DataSourceServiceImpl implements DataSourceService { + + private static String DATASOURCE_NAME_REGEX = "[@*A-Za-z]+?[*a-zA-Z_\\-0-9]*"; + + private final ConcurrentHashMap dataSourceMap; + + private final Map dataSourceFactoryMap; + + /** + * Construct from the set of {@link DataSourceFactory} at bootstrap time. + */ + public DataSourceServiceImpl(Set dataSourceFactories) { + dataSourceFactoryMap = + dataSourceFactories.stream() + .collect(Collectors.toMap(DataSourceFactory::getDataSourceType, f -> f)); + dataSourceMap = new ConcurrentHashMap<>(); + } + + @Override + public Set getDataSources() { + return Set.copyOf(dataSourceMap.values()); + } + + @Override + public DataSource getDataSource(String dataSourceName) { + if (!dataSourceMap.containsKey(dataSourceName)) { + throw new IllegalArgumentException( + String.format("DataSource with name %s doesn't exist.", dataSourceName)); + } + return dataSourceMap.get(dataSourceName); + } + + @Override + public void addDataSource(DataSourceMetadata... metadatas) { + for (DataSourceMetadata metadata : metadatas) { + validateDataSourceMetaData(metadata); + dataSourceMap.put( + metadata.getName(), + dataSourceFactoryMap.get(metadata.getConnector()).createDataSource(metadata)); + } + } + + @Override + public void clear() { + dataSourceMap.clear(); + } + + /** + * This can be moved to a different validator class when we introduce more connectors. + * + * @param metadata {@link DataSourceMetadata}. + */ + private void validateDataSourceMetaData(DataSourceMetadata metadata) { + Preconditions.checkArgument( + !Strings.isNullOrEmpty(metadata.getName()), + "Missing Name Field from a DataSource. Name is a required parameter."); + Preconditions.checkArgument( + !dataSourceMap.containsKey(metadata.getName()), + StringUtils.format( + "Datasource name should be unique, Duplicate datasource found %s.", + metadata.getName())); + Preconditions.checkArgument( + metadata.getName().matches(DATASOURCE_NAME_REGEX), + StringUtils.format( + "DataSource Name: %s contains illegal characters. Allowed characters: a-zA-Z0-9_-*@.", + metadata.getName())); + Preconditions.checkArgument( + !Objects.isNull(metadata.getProperties()), + "Missing properties field in catalog configuration. Properties are required parameters."); + } +} diff --git a/core/src/main/java/org/opensearch/sql/datasource/model/DataSource.java b/core/src/main/java/org/opensearch/sql/datasource/model/DataSource.java index a6ac9a6d66..5deb460961 100644 --- a/core/src/main/java/org/opensearch/sql/datasource/model/DataSource.java +++ b/core/src/main/java/org/opensearch/sql/datasource/model/DataSource.java @@ -12,6 +12,9 @@ import lombok.RequiredArgsConstructor; import org.opensearch.sql.storage.StorageEngine; +/** + * Each user configured datasource mapping to one instance of DataSource per JVM. + */ @Getter @RequiredArgsConstructor @EqualsAndHashCode @@ -19,7 +22,7 @@ public class DataSource { private final String name; - private final ConnectorType connectorType; + private final DataSourceType connectorType; @EqualsAndHashCode.Exclude private final StorageEngine storageEngine; diff --git a/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceMetadata.java b/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceMetadata.java index dbde5040e9..f97d272bb9 100644 --- a/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceMetadata.java +++ b/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceMetadata.java @@ -5,16 +5,23 @@ package org.opensearch.sql.datasource.model; + +import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME; + import com.fasterxml.jackson.annotation.JsonFormat; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableMap; import java.util.Map; +import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.Setter; +import org.opensearch.sql.datasource.DataSourceService; @JsonIgnoreProperties(ignoreUnknown = true) @Getter @Setter +@EqualsAndHashCode public class DataSourceMetadata { @JsonProperty(required = true) @@ -22,9 +29,20 @@ public class DataSourceMetadata { @JsonProperty(required = true) @JsonFormat(with = JsonFormat.Feature.ACCEPT_CASE_INSENSITIVE_PROPERTIES) - private ConnectorType connector; + private DataSourceType connector; @JsonProperty(required = true) private Map properties; + /** + * Default OpenSearch {@link DataSourceMetadata}. Which is used to register default OpenSearch + * {@link DataSource} to {@link DataSourceService}. + */ + public static DataSourceMetadata defaultOpenSearchDataSourceMetadata() { + DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); + dataSourceMetadata.setName(DEFAULT_DATASOURCE_NAME); + dataSourceMetadata.setConnector(DataSourceType.OPENSEARCH); + dataSourceMetadata.setProperties(ImmutableMap.of()); + return dataSourceMetadata; + } } diff --git a/core/src/main/java/org/opensearch/sql/datasource/model/ConnectorType.java b/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceType.java similarity index 84% rename from core/src/main/java/org/opensearch/sql/datasource/model/ConnectorType.java rename to core/src/main/java/org/opensearch/sql/datasource/model/DataSourceType.java index b540d7d401..b6176de5b4 100644 --- a/core/src/main/java/org/opensearch/sql/datasource/model/ConnectorType.java +++ b/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceType.java @@ -5,6 +5,6 @@ package org.opensearch.sql.datasource.model; -public enum ConnectorType { +public enum DataSourceType { PROMETHEUS,OPENSEARCH } diff --git a/core/src/main/java/org/opensearch/sql/storage/DataSourceFactory.java b/core/src/main/java/org/opensearch/sql/storage/DataSourceFactory.java new file mode 100644 index 0000000000..20d263e601 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/storage/DataSourceFactory.java @@ -0,0 +1,31 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.storage; + +import org.opensearch.sql.datasource.DataSourceService; +import org.opensearch.sql.datasource.model.DataSource; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.datasource.model.DataSourceType; + +/** + * {@link DataSourceFactory} is used to create {@link DataSource} from {@link DataSourceMetadata}. + * Each data source define {@link DataSourceFactory} and register to {@link DataSourceService}. + * {@link DataSourceFactory} is one instance per JVM . Each {@link DataSourceType} mapping to one + * {@link DataSourceFactory}. + */ +public interface DataSourceFactory { + /** + * Get {@link DataSourceType}. + */ + DataSourceType getDataSourceType(); + + /** + * Create {@link DataSource}. + */ + DataSource createDataSource(DataSourceMetadata metadata); +} diff --git a/core/src/main/java/org/opensearch/sql/storage/StorageEngineFactory.java b/core/src/main/java/org/opensearch/sql/storage/StorageEngineFactory.java deleted file mode 100644 index 85d29abf5c..0000000000 --- a/core/src/main/java/org/opensearch/sql/storage/StorageEngineFactory.java +++ /dev/null @@ -1,19 +0,0 @@ -/* - * - * * Copyright OpenSearch Contributors - * * SPDX-License-Identifier: Apache-2.0 - * - */ - -package org.opensearch.sql.storage; - -import java.util.Map; -import org.opensearch.sql.datasource.model.ConnectorType; - -public interface StorageEngineFactory { - - ConnectorType getConnectorType(); - - StorageEngine getStorageEngine(String catalogName, Map requiredConfig); - -} diff --git a/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java b/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java index b1da7b3e86..a485541a34 100644 --- a/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java +++ b/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java @@ -22,10 +22,10 @@ import org.opensearch.sql.config.TestConfig; import org.opensearch.sql.data.type.ExprType; import org.opensearch.sql.datasource.DataSourceService; -import org.opensearch.sql.datasource.model.ConnectorType; import org.opensearch.sql.datasource.model.DataSource; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.datasource.model.DataSourceType; import org.opensearch.sql.exception.ExpressionEvaluationException; -import org.opensearch.sql.expression.DSL; import org.opensearch.sql.expression.Expression; import org.opensearch.sql.expression.ReferenceExpression; import org.opensearch.sql.expression.env.Environment; @@ -144,9 +144,7 @@ protected Environment typeEnv() { @Bean protected Analyzer analyzer(ExpressionAnalyzer expressionAnalyzer, DataSourceService dataSourceService, - StorageEngine storageEngine, Table table) { - dataSourceService.registerDefaultOpenSearchDataSource(storageEngine); BuiltinFunctionRepository functionRepository = BuiltinFunctionRepository.getInstance(); functionRepository.register("prometheus", new FunctionResolver() { @@ -195,7 +193,7 @@ private class DefaultDataSourceService implements DataSourceService { private StorageEngine storageEngine = storageEngine(); private final DataSource dataSource - = new DataSource("prometheus", ConnectorType.PROMETHEUS, storageEngine); + = new DataSource("prometheus", DataSourceType.PROMETHEUS, storageEngine); @Override @@ -209,8 +207,13 @@ public DataSource getDataSource(String dataSourceName) { } @Override - public void registerDefaultOpenSearchDataSource(StorageEngine storageEngine) { - this.storageEngine = storageEngine; + public void addDataSource(DataSourceMetadata... metadatas) { + throw new UnsupportedOperationException(); + } + + @Override + public void clear() { + throw new UnsupportedOperationException(); } } diff --git a/core/src/test/java/org/opensearch/sql/datasource/DataSourceServiceImplTest.java b/core/src/test/java/org/opensearch/sql/datasource/DataSourceServiceImplTest.java new file mode 100644 index 0000000000..2b40b32ee6 --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/datasource/DataSourceServiceImplTest.java @@ -0,0 +1,158 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.datasource; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.when; +import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME; + +import com.google.common.collect.ImmutableMap; +import java.util.HashSet; +import java.util.Map; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.datasource.model.DataSource; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.datasource.model.DataSourceType; +import org.opensearch.sql.storage.DataSourceFactory; +import org.opensearch.sql.storage.StorageEngine; + +@ExtendWith(MockitoExtension.class) +class DataSourceServiceImplTest { + + static final String NAME = "opensearch"; + + @Mock private DataSourceFactory dataSourceFactory; + + @Mock private StorageEngine storageEngine; + + private DataSourceService dataSourceService; + + @BeforeEach + public void setup() { + lenient() + .doAnswer( + invocation -> { + DataSourceMetadata metadata = invocation.getArgument(0); + return new DataSource(metadata.getName(), metadata.getConnector(), storageEngine); + }) + .when(dataSourceFactory) + .createDataSource(any()); + when(dataSourceFactory.getDataSourceType()).thenReturn(DataSourceType.OPENSEARCH); + dataSourceService = + new DataSourceServiceImpl( + new HashSet<>() { + { + add(dataSourceFactory); + } + }); + } + + @AfterEach + public void clear() { + dataSourceService.clear(); + } + + @Test + void getDataSourceSuccess() { + dataSourceService.addDataSource(DataSourceMetadata.defaultOpenSearchDataSourceMetadata()); + + assertEquals( + new DataSource(DEFAULT_DATASOURCE_NAME, DataSourceType.OPENSEARCH, storageEngine), + dataSourceService.getDataSource(DEFAULT_DATASOURCE_NAME)); + } + + @Test + void getNotExistDataSourceShouldFail() { + IllegalArgumentException exception = + assertThrows(IllegalArgumentException.class, () -> dataSourceService.getDataSource("mock")); + assertEquals("DataSource with name mock doesn't exist.", exception.getMessage()); + } + + @Test + void getAddDataSourcesShouldSuccess() { + assertEquals(0, dataSourceService.getDataSources().size()); + + dataSourceService.addDataSource(metadata(NAME, DataSourceType.OPENSEARCH, ImmutableMap.of())); + assertEquals(1, dataSourceService.getDataSources().size()); + } + + @Test + void noDataSourceExistAfterClear() { + dataSourceService.addDataSource(metadata(NAME, DataSourceType.OPENSEARCH, ImmutableMap.of())); + assertEquals(1, dataSourceService.getDataSources().size()); + + dataSourceService.clear(); + assertEquals(0, dataSourceService.getDataSources().size()); + } + + @Test + void metaDataMissingNameShouldFail() { + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, + () -> + dataSourceService.addDataSource( + metadata(null, DataSourceType.OPENSEARCH, ImmutableMap.of()))); + assertEquals( + "Missing Name Field from a DataSource. Name is a required parameter.", + exception.getMessage()); + } + + @Test + void metaDataHasIllegalDataSourceNameShouldFail() { + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, + () -> + dataSourceService.addDataSource( + metadata("prometheus.test", DataSourceType.OPENSEARCH, ImmutableMap.of()))); + assertEquals( + "DataSource Name: prometheus.test contains illegal characters. " + + "Allowed characters: a-zA-Z0-9_-*@.", + exception.getMessage()); + } + + @Test + void metaDataMissingPropertiesShouldFail() { + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, + () -> dataSourceService.addDataSource(metadata(NAME, DataSourceType.OPENSEARCH, null))); + assertEquals( + "Missing properties field in catalog configuration. Properties are required parameters.", + exception.getMessage()); + } + + @Test + void metaDataHasDuplicateNameShouldFail() { + dataSourceService.addDataSource(metadata(NAME, DataSourceType.OPENSEARCH, ImmutableMap.of())); + assertEquals(1, dataSourceService.getDataSources().size()); + + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, + () -> dataSourceService.addDataSource(metadata(NAME, DataSourceType.OPENSEARCH, null))); + assertEquals( + String.format("Datasource name should be unique, Duplicate datasource found %s.", NAME), + exception.getMessage()); + } + + DataSourceMetadata metadata(String name, DataSourceType type, Map properties) { + DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); + dataSourceMetadata.setName(name); + dataSourceMetadata.setConnector(type); + dataSourceMetadata.setProperties(properties); + return dataSourceMetadata; + } +} diff --git a/core/src/test/java/org/opensearch/sql/datasource/model/auth/AuthenticationTypeTest.java b/core/src/test/java/org/opensearch/sql/datasource/model/auth/AuthenticationTypeTest.java new file mode 100644 index 0000000000..f9e4f3ce59 --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/datasource/model/auth/AuthenticationTypeTest.java @@ -0,0 +1,29 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.datasource.model.auth; + + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +import org.junit.jupiter.api.Test; + +class AuthenticationTypeTest { + @Test + void getAuthType() { + assertEquals( + AuthenticationType.BASICAUTH, + AuthenticationType.get(AuthenticationType.BASICAUTH.getName())); + assertEquals( + AuthenticationType.AWSSIGV4AUTH, + AuthenticationType.get(AuthenticationType.AWSSIGV4AUTH.getName())); + } + + @Test + void getNotExistAuthType() { + assertNull(AuthenticationType.get("mock")); + } +} diff --git a/core/src/test/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableScanTest.java b/core/src/test/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableScanTest.java index a57d1b1a89..2f7188a248 100644 --- a/core/src/test/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableScanTest.java +++ b/core/src/test/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableScanTest.java @@ -24,8 +24,8 @@ import org.opensearch.sql.data.model.ExprTupleValue; import org.opensearch.sql.data.model.ExprValueUtils; import org.opensearch.sql.datasource.DataSourceService; -import org.opensearch.sql.datasource.model.ConnectorType; import org.opensearch.sql.datasource.model.DataSource; +import org.opensearch.sql.datasource.model.DataSourceType; import org.opensearch.sql.storage.StorageEngine; @ExtendWith(MockitoExtension.class) @@ -52,8 +52,8 @@ void testExplain() { @Test void testIterator() { Set dataSourceSet = new HashSet<>(); - dataSourceSet.add(new DataSource("prometheus", ConnectorType.PROMETHEUS, storageEngine)); - dataSourceSet.add(new DataSource("opensearch", ConnectorType.OPENSEARCH, storageEngine)); + dataSourceSet.add(new DataSource("prometheus", DataSourceType.PROMETHEUS, storageEngine)); + dataSourceSet.add(new DataSource("opensearch", DataSourceType.OPENSEARCH, storageEngine)); when(dataSourceService.getDataSources()).thenReturn(dataSourceSet); assertFalse(dataSourceTableScan.hasNext()); diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java index a3b341c4a8..b8abc4d45e 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java @@ -6,9 +6,11 @@ package org.opensearch.sql.ppl; +import static org.opensearch.sql.datasource.model.DataSourceMetadata.defaultOpenSearchDataSourceMetadata; import static org.opensearch.sql.protocol.response.format.JsonResponseFormatter.Style.PRETTY; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import java.io.IOException; import java.util.Collections; import java.util.List; @@ -23,6 +25,7 @@ import org.opensearch.sql.common.response.ResponseListener; import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.datasource.DataSourceService; +import org.opensearch.sql.datasource.DataSourceServiceImpl; import org.opensearch.sql.executor.DefaultQueryManager; import org.opensearch.sql.executor.ExecutionEngine; import org.opensearch.sql.executor.ExecutionEngine.QueryResponse; @@ -35,14 +38,14 @@ import org.opensearch.sql.opensearch.client.OpenSearchRestClient; import org.opensearch.sql.opensearch.executor.OpenSearchExecutionEngine; import org.opensearch.sql.opensearch.executor.protector.OpenSearchExecutionProtector; -import org.opensearch.sql.opensearch.storage.OpenSearchStorageEngine; +import org.opensearch.sql.opensearch.storage.OpenSearchDataSourceFactory; import org.opensearch.sql.planner.Planner; import org.opensearch.sql.planner.optimizer.LogicalPlanOptimizer; -import org.opensearch.sql.plugin.datasource.DataSourceServiceImpl; import org.opensearch.sql.ppl.config.PPLServiceConfig; import org.opensearch.sql.ppl.domain.PPLQueryRequest; import org.opensearch.sql.protocol.response.QueryResult; import org.opensearch.sql.protocol.response.format.SimpleJsonResponseFormatter; +import org.opensearch.sql.storage.DataSourceFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.AnnotationConfigApplicationContext; @@ -72,9 +75,12 @@ public void init() { new OpenSearchExecutionProtector(new AlwaysHealthyMonitor()))); context.registerBean(OpenSearchClient.class, () -> client); context.registerBean(Settings.class, () -> defaultSettings()); - OpenSearchStorageEngine openSearchStorageEngine = new OpenSearchStorageEngine(client, defaultSettings()); - DataSourceServiceImpl.getInstance().registerDefaultOpenSearchDataSource(openSearchStorageEngine); - context.registerBean(DataSourceService.class, DataSourceServiceImpl::getInstance); + DataSourceService dataSourceService = new DataSourceServiceImpl( + new ImmutableSet.Builder() + .add(new OpenSearchDataSourceFactory(client, defaultSettings())) + .build()); + dataSourceService.addDataSource(defaultOpenSearchDataSourceMetadata()); + context.registerBean(DataSourceService.class, () -> dataSourceService); context.register(StandaloneConfig.class); context.register(PPLServiceConfig.class); context.refresh(); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchDataSourceFactory.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchDataSourceFactory.java new file mode 100644 index 0000000000..011f6236fb --- /dev/null +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchDataSourceFactory.java @@ -0,0 +1,34 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.opensearch.storage; + +import lombok.RequiredArgsConstructor; +import org.opensearch.sql.common.setting.Settings; +import org.opensearch.sql.datasource.model.DataSource; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.datasource.model.DataSourceType; +import org.opensearch.sql.opensearch.client.OpenSearchClient; +import org.opensearch.sql.storage.DataSourceFactory; + +@RequiredArgsConstructor +public class OpenSearchDataSourceFactory implements DataSourceFactory { + + /** OpenSearch client connection. */ + private final OpenSearchClient client; + + private final Settings settings; + + @Override + public DataSourceType getDataSourceType() { + return DataSourceType.OPENSEARCH; + } + + @Override + public DataSource createDataSource(DataSourceMetadata metadata) { + return new DataSource(metadata.getName(), DataSourceType.OPENSEARCH, + new OpenSearchStorageEngine(client, settings)); + } +} diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchDataSourceFactoryTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchDataSourceFactoryTest.java new file mode 100644 index 0000000000..a9e4e153fc --- /dev/null +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchDataSourceFactoryTest.java @@ -0,0 +1,51 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.opensearch.storage; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.when; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.common.setting.Settings; +import org.opensearch.sql.datasource.model.DataSource; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.datasource.model.DataSourceType; +import org.opensearch.sql.opensearch.client.OpenSearchClient; + +@ExtendWith(MockitoExtension.class) +class OpenSearchDataSourceFactoryTest { + + @Mock private OpenSearchClient client; + + @Mock private Settings settings; + + @Mock private DataSourceMetadata dataSourceMetadata; + + private OpenSearchDataSourceFactory factory; + + @BeforeEach + public void setup() { + factory = new OpenSearchDataSourceFactory(client, settings); + } + + @Test + void getDataSourceType() { + assertEquals(DataSourceType.OPENSEARCH, factory.getDataSourceType()); + } + + @Test + void createDataSource() { + when(dataSourceMetadata.getName()).thenReturn("opensearch"); + + DataSource dataSource = factory.createDataSource(dataSourceMetadata); + assertEquals("opensearch", dataSource.getName()); + assertEquals(DataSourceType.OPENSEARCH, dataSource.getConnectorType()); + } +} diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index 137422d4ac..fab14966d8 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -5,13 +5,24 @@ package org.opensearch.sql.plugin; +import static org.opensearch.sql.datasource.model.DataSourceMetadata.defaultOpenSearchDataSourceMetadata; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import java.io.IOException; +import java.io.InputStream; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.function.Supplier; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.opensearch.action.ActionRequest; import org.opensearch.action.ActionResponse; import org.opensearch.action.ActionType; @@ -41,6 +52,9 @@ import org.opensearch.script.ScriptEngine; import org.opensearch.script.ScriptService; import org.opensearch.sql.datasource.DataSourceService; +import org.opensearch.sql.datasource.DataSourceServiceImpl; +import org.opensearch.sql.datasource.model.DataSource; +import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.legacy.esdomain.LocalClusterState; import org.opensearch.sql.legacy.executor.AsyncRestExecutor; import org.opensearch.sql.legacy.metrics.Metrics; @@ -50,11 +64,10 @@ import org.opensearch.sql.opensearch.security.SecurityAccess; import org.opensearch.sql.opensearch.setting.LegacyOpenDistroSettings; import org.opensearch.sql.opensearch.setting.OpenSearchSettings; -import org.opensearch.sql.opensearch.storage.OpenSearchStorageEngine; +import org.opensearch.sql.opensearch.storage.OpenSearchDataSourceFactory; import org.opensearch.sql.opensearch.storage.script.ExpressionScriptEngine; import org.opensearch.sql.opensearch.storage.serialization.DefaultExpressionSerializer; import org.opensearch.sql.plugin.config.OpenSearchPluginConfig; -import org.opensearch.sql.plugin.datasource.DataSourceServiceImpl; import org.opensearch.sql.plugin.datasource.DataSourceSettings; import org.opensearch.sql.plugin.rest.RestPPLQueryAction; import org.opensearch.sql.plugin.rest.RestPPLStatsAction; @@ -63,8 +76,9 @@ import org.opensearch.sql.plugin.transport.TransportPPLQueryAction; import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse; import org.opensearch.sql.ppl.config.PPLServiceConfig; +import org.opensearch.sql.prometheus.storage.PrometheusStorageFactory; import org.opensearch.sql.sql.config.SQLServiceConfig; -import org.opensearch.sql.storage.StorageEngine; +import org.opensearch.sql.storage.DataSourceFactory; import org.opensearch.threadpool.ExecutorBuilder; import org.opensearch.threadpool.FixedExecutorBuilder; import org.opensearch.threadpool.ThreadPool; @@ -73,6 +87,8 @@ public class SQLPlugin extends Plugin implements ActionPlugin, ScriptPlugin, ReloadablePlugin { + private static final Logger LOG = LogManager.getLogger(); + private ClusterService clusterService; /** @@ -84,6 +100,8 @@ public class SQLPlugin extends Plugin implements ActionPlugin, ScriptPlugin, Rel private AnnotationConfigApplicationContext applicationContext; + private DataSourceService dataSourceService; + public String name() { return "sql"; } @@ -142,9 +160,15 @@ public Collection createComponents( this.clusterService = clusterService; this.pluginSettings = new OpenSearchSettings(clusterService.getClusterSettings()); this.client = (NodeClient) client; - DataSourceServiceImpl.getInstance().loadConnectors(clusterService.getSettings()); - DataSourceServiceImpl.getInstance() - .registerDefaultOpenSearchDataSource(openSearchStorageEngine()); + this.dataSourceService = + new DataSourceServiceImpl( + new ImmutableSet.Builder() + .add(new OpenSearchDataSourceFactory( + new OpenSearchNodeClient(this.client), pluginSettings)) + .add(new PrometheusStorageFactory()) + .build()); + dataSourceService.addDataSource(defaultOpenSearchDataSourceMetadata()); + loadDataSources(dataSourceService, clusterService.getSettings()); LocalClusterState.state().setClusterService(clusterService); LocalClusterState.state().setPluginSettings((OpenSearchSettings) pluginSettings); @@ -156,7 +180,7 @@ public Collection createComponents( applicationContext.registerBean( org.opensearch.sql.common.setting.Settings.class, () -> pluginSettings); applicationContext.registerBean( - DataSourceService.class, () -> DataSourceServiceImpl.getInstance()); + DataSourceService.class, () -> dataSourceService); applicationContext.register(OpenSearchPluginConfig.class); applicationContext.register(PPLServiceConfig.class); applicationContext.register(SQLServiceConfig.class); @@ -196,13 +220,34 @@ public ScriptEngine getScriptEngine(Settings settings, Collection { + InputStream inputStream = DataSourceSettings.DATASOURCE_CONFIG.get(settings); + if (inputStream != null) { + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + try { + List metadataList = + objectMapper.readValue(inputStream, new TypeReference<>() {}); + dataSourceService.addDataSource(metadataList.toArray(new DataSourceMetadata[0])); + } catch (IOException e) { + LOG.error( + "DataSource Configuration File uploaded is malformed. Verify and re-upload.", e); + } catch (Throwable e) { + LOG.error("DataSource construction failed.", e); + } + } + return null; + }); } - } diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/datasource/DataSourceServiceImpl.java b/plugin/src/main/java/org/opensearch/sql/plugin/datasource/DataSourceServiceImpl.java deleted file mode 100644 index be7cc70af0..0000000000 --- a/plugin/src/main/java/org/opensearch/sql/plugin/datasource/DataSourceServiceImpl.java +++ /dev/null @@ -1,191 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.plugin.datasource; - -import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME; - -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import java.io.IOException; -import java.io.InputStream; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import org.apache.commons.lang3.StringUtils; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.opensearch.common.settings.Settings; -import org.opensearch.sql.datasource.DataSourceService; -import org.opensearch.sql.datasource.model.ConnectorType; -import org.opensearch.sql.datasource.model.DataSource; -import org.opensearch.sql.datasource.model.DataSourceMetadata; -import org.opensearch.sql.expression.function.BuiltinFunctionRepository; -import org.opensearch.sql.opensearch.security.SecurityAccess; -import org.opensearch.sql.prometheus.storage.PrometheusStorageFactory; -import org.opensearch.sql.storage.StorageEngine; -import org.opensearch.sql.storage.StorageEngineFactory; - -/** - * This class manages datasources and responsible for creating connectors to the datasources. - */ -public class DataSourceServiceImpl implements DataSourceService { - - private static final DataSourceServiceImpl INSTANCE = new DataSourceServiceImpl(); - - private static final String DATASOURCE_NAME_REGEX = "[@*A-Za-z]+?[*a-zA-Z_\\-0-9]*"; - - private static final Logger LOG = LogManager.getLogger(); - - private Map datasourceMap = new HashMap<>(); - - private final Map connectorTypeStorageEngineFactoryMap; - - public static DataSourceServiceImpl getInstance() { - return INSTANCE; - } - - private DataSourceServiceImpl() { - connectorTypeStorageEngineFactoryMap = new HashMap<>(); - PrometheusStorageFactory prometheusStorageFactory = new PrometheusStorageFactory(); - connectorTypeStorageEngineFactoryMap.put(prometheusStorageFactory.getConnectorType(), - prometheusStorageFactory); - } - - /** - * This function reads settings and loads connectors to the data stores. - * This will be invoked during start up and also when settings are updated. - * - * @param settings settings. - */ - public void loadConnectors(Settings settings) { - SecurityAccess.doPrivileged(() -> { - InputStream inputStream = DataSourceSettings.DATASOURCE_CONFIG.get(settings); - if (inputStream != null) { - ObjectMapper objectMapper = new ObjectMapper(); - objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - try { - List dataSourceMetadataList = - objectMapper.readValue(inputStream, new TypeReference<>() { - }); - validateDataSourceMetadata(dataSourceMetadataList); - constructConnectors(dataSourceMetadataList); - } catch (IOException e) { - LOG.error("DataSources Configuration File uploaded is malformed. Verify and re-upload.", - e); - } catch (Throwable e) { - LOG.error("DataSource construction failed.", e); - } - } - return null; - }); - } - - @Override - public Set getDataSources() { - return new HashSet<>(datasourceMap.values()); - } - - @Override - public DataSource getDataSource(String dataSourceName) { - if (!datasourceMap.containsKey(dataSourceName)) { - throw new IllegalArgumentException( - String.format("DataSource with name %s doesn't exist.", dataSourceName)); - } - return datasourceMap.get(dataSourceName); - } - - - @Override - public void registerDefaultOpenSearchDataSource(StorageEngine storageEngine) { - if (storageEngine == null) { - throw new IllegalArgumentException("Default storage engine can't be null"); - } - datasourceMap.put(DEFAULT_DATASOURCE_NAME, - new DataSource(DEFAULT_DATASOURCE_NAME, ConnectorType.OPENSEARCH, storageEngine)); - registerFunctions(DEFAULT_DATASOURCE_NAME, storageEngine); - } - - private StorageEngine createStorageEngine(DataSourceMetadata dataSourceMetadata) { - ConnectorType connector = dataSourceMetadata.getConnector(); - switch (connector) { - case PROMETHEUS: - StorageEngine storageEngine = connectorTypeStorageEngineFactoryMap - .get(dataSourceMetadata.getConnector()) - .getStorageEngine(dataSourceMetadata.getName(), dataSourceMetadata.getProperties()); - registerFunctions(dataSourceMetadata.getName(), storageEngine); - return storageEngine; - default: - throw new IllegalStateException( - String.format("Unsupported Connector: %s", connector.name())); - } - } - - private void constructConnectors(List dataSourceMetadataList) { - datasourceMap = new HashMap<>(); - for (DataSourceMetadata dataSourceMetadata : dataSourceMetadataList) { - try { - String dataSourceName = dataSourceMetadata.getName(); - StorageEngine storageEngine = createStorageEngine(dataSourceMetadata); - datasourceMap.put(dataSourceName, - new DataSource(dataSourceMetadata.getName(), dataSourceMetadata.getConnector(), - storageEngine)); - } catch (Throwable e) { - LOG.error("DataSource : {} storage engine creation failed with the following message: {}", - dataSourceMetadata.getName(), e.getMessage(), e); - } - } - } - - /** - * This can be moved to a different validator class - * when we introduce more connectors. - * - * @param dataSourceMetadataList dataSourceMetadataList. - */ - private void validateDataSourceMetadata(List dataSourceMetadataList) { - - Set reviewedDataSources = new HashSet<>(); - for (DataSourceMetadata dataSourceMetadata : dataSourceMetadataList) { - - if (StringUtils.isEmpty(dataSourceMetadata.getName())) { - throw new IllegalArgumentException( - "Missing Name Field for a dataSource. Name is a required parameter."); - } - - if (!dataSourceMetadata.getName().matches(DATASOURCE_NAME_REGEX)) { - throw new IllegalArgumentException( - String.format("DataSource Name: %s contains illegal characters." - + " Allowed characters: a-zA-Z0-9_-*@ ", dataSourceMetadata.getName())); - } - - String dataSourceName = dataSourceMetadata.getName(); - if (reviewedDataSources.contains(dataSourceName)) { - throw new IllegalArgumentException("DataSources with same name are not allowed."); - } else { - reviewedDataSources.add(dataSourceName); - } - - if (Objects.isNull(dataSourceMetadata.getProperties())) { - throw new IllegalArgumentException("Missing properties field in dataSource configuration." - + "Properties are required parameters"); - } - - } - } - - // TODO: for now register storage engine functions here which should be static per storage engine - private void registerFunctions(String catalogName, StorageEngine storageEngine) { - storageEngine.getFunctions() - .forEach(functionResolver -> - BuiltinFunctionRepository.getInstance().register(catalogName, functionResolver)); - } - - -} diff --git a/plugin/src/test/java/org/opensearch/sql/plugin/datasource/DataSourceMetaDataTest.java b/plugin/src/test/java/org/opensearch/sql/plugin/datasource/DataSourceMetaDataTest.java new file mode 100644 index 0000000000..dc6eadbf7e --- /dev/null +++ b/plugin/src/test/java/org/opensearch/sql/plugin/datasource/DataSourceMetaDataTest.java @@ -0,0 +1,129 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.plugin.datasource; + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import com.google.common.collect.ImmutableMap; +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import lombok.SneakyThrows; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import org.opensearch.common.settings.MockSecureSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.sql.datasource.DataSourceService; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.datasource.model.DataSourceType; +import org.opensearch.sql.plugin.SQLPlugin; + +@RunWith(MockitoJUnitRunner.class) +public class DataSourceMetaDataTest { + + public static final String DATASOURCE_SETTING_METADATA_KEY = + "plugins.query.federation.datasources.config"; + + @Mock + private DataSourceService dataSourceService; + + @SneakyThrows + @Test + public void testLoadConnectors() { + Settings settings = getDataSourceSettings("datasources.json"); + loadConnectors(settings); + List expected = + new ArrayList<>() { + { + add( + metadata( + "prometheus", + DataSourceType.PROMETHEUS, + ImmutableMap.of( + "prometheus.uri", "http://localhost:9090", + "prometheus.auth.type", "basicauth", + "prometheus.auth.username", "admin", + "prometheus.auth.password", "type"))); + } + }; + + verifyAddDataSourceWithMetadata(expected); + } + + @SneakyThrows + @Test + public void testLoadConnectorsWithMultipleDataSources() { + Settings settings = getDataSourceSettings("multiple_datasources.json"); + loadConnectors(settings); + List expected = new ArrayList<>() {{ + add(metadata("prometheus", DataSourceType.PROMETHEUS, ImmutableMap.of( + "prometheus.uri", "http://localhost:9090", + "prometheus.auth.type", "basicauth", + "prometheus.auth.username", "admin", + "prometheus.auth.password", "type" + ))); + add(metadata("prometheus-1", DataSourceType.PROMETHEUS, ImmutableMap.of( + "prometheus.uri", "http://localhost:9090", + "prometheus.auth.type", "awssigv4", + "prometheus.auth.region", "us-east-1", + "prometheus.auth.access_key", "accessKey", + "prometheus.auth.secret_key", "secretKey" + ))); + }}; + + verifyAddDataSourceWithMetadata(expected); + } + + @SneakyThrows + @Test + public void testLoadConnectorsWithMalformedJson() { + Settings settings = getDataSourceSettings("malformed_datasources.json"); + loadConnectors(settings); + + verify(dataSourceService, never()).addDataSource(any()); + } + + private Settings getDataSourceSettings(String filename) throws URISyntaxException, IOException { + MockSecureSettings mockSecureSettings = new MockSecureSettings(); + ClassLoader classLoader = getClass().getClassLoader(); + Path filepath = Paths.get(classLoader.getResource(filename).toURI()); + mockSecureSettings.setFile(DATASOURCE_SETTING_METADATA_KEY, Files.readAllBytes(filepath)); + return Settings.builder().setSecureSettings(mockSecureSettings).build(); + } + + void loadConnectors(Settings settings) { + SQLPlugin.loadDataSources(dataSourceService, settings); + } + + void verifyAddDataSourceWithMetadata(List metadataList) { + ArgumentCaptor metadataCaptor = + ArgumentCaptor.forClass(DataSourceMetadata.class); + verify(dataSourceService, times(1)).addDataSource(metadataCaptor.capture()); + List actualValues = metadataCaptor.getAllValues(); + assertEquals(metadataList.size(), actualValues.size()); + assertEquals(metadataList, actualValues); + } + + DataSourceMetadata metadata(String name, DataSourceType type, Map properties) { + DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); + dataSourceMetadata.setName(name); + dataSourceMetadata.setConnector(type); + dataSourceMetadata.setProperties(properties); + return dataSourceMetadata; + } +} diff --git a/plugin/src/test/java/org/opensearch/sql/plugin/datasource/DataSourceServiceImplTest.java b/plugin/src/test/java/org/opensearch/sql/plugin/datasource/DataSourceServiceImplTest.java deleted file mode 100644 index e53cdd64c3..0000000000 --- a/plugin/src/test/java/org/opensearch/sql/plugin/datasource/DataSourceServiceImplTest.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.plugin.datasource; - -import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME; - -import java.io.IOException; -import java.net.URISyntaxException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.HashSet; -import java.util.Set; -import lombok.SneakyThrows; -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; -import org.opensearch.common.settings.MockSecureSettings; -import org.opensearch.common.settings.Settings; -import org.opensearch.sql.datasource.model.ConnectorType; -import org.opensearch.sql.datasource.model.DataSource; -import org.opensearch.sql.storage.StorageEngine; - -@RunWith(MockitoJUnitRunner.class) -public class DataSourceServiceImplTest { - - public static final String DATASOURCE_SETTING_METADATA_KEY = - "plugins.query.federation.datasources.config"; - - @Mock - private StorageEngine storageEngine; - - @SneakyThrows - @Test - public void testLoadConnectors() { - Settings settings = getDataSourceSettings("datasources.json"); - DataSourceServiceImpl.getInstance().loadConnectors(settings); - Set expected = new HashSet<>() {{ - add(new DataSource("prometheus", ConnectorType.PROMETHEUS, storageEngine)); - }}; - Assert.assertEquals(expected, DataSourceServiceImpl.getInstance().getDataSources()); - } - - - @SneakyThrows - @Test - public void testLoadConnectorsWithMultipleDataSources() { - Settings settings = getDataSourceSettings("multiple_datasources.json"); - DataSourceServiceImpl.getInstance().loadConnectors(settings); - Set expected = new HashSet<>() {{ - add(new DataSource("prometheus", ConnectorType.PROMETHEUS, storageEngine)); - add(new DataSource("prometheus-1", ConnectorType.PROMETHEUS, storageEngine)); - }}; - Assert.assertEquals(expected, DataSourceServiceImpl.getInstance().getDataSources()); - } - - @SneakyThrows - @Test - public void testLoadConnectorsWithMissingName() { - Settings settings = getDataSourceSettings("datasource_missing_name.json"); - Set expected = DataSourceServiceImpl.getInstance().getDataSources(); - DataSourceServiceImpl.getInstance().loadConnectors(settings); - Assert.assertEquals(expected, DataSourceServiceImpl.getInstance().getDataSources()); - } - - @SneakyThrows - @Test - public void testLoadConnectorsWithDuplicateDataSourceNames() { - Settings settings = getDataSourceSettings("duplicate_datasource_names.json"); - Set expected = DataSourceServiceImpl.getInstance().getDataSources(); - DataSourceServiceImpl.getInstance().loadConnectors(settings); - Assert.assertEquals(expected, DataSourceServiceImpl.getInstance().getDataSources()); - } - - @SneakyThrows - @Test - public void testLoadConnectorsWithMalformedJson() { - Settings settings = getDataSourceSettings("malformed_datasources.json"); - Set expected = DataSourceServiceImpl.getInstance().getDataSources(); - DataSourceServiceImpl.getInstance().loadConnectors(settings); - Assert.assertEquals(expected, DataSourceServiceImpl.getInstance().getDataSources()); - } - - @SneakyThrows - @Test - public void testGetStorageEngineAfterGetDataSources() { - Settings settings = getDataSourceSettings("empty_datasource.json"); - DataSourceServiceImpl.getInstance().loadConnectors(settings); - DataSourceServiceImpl.getInstance().registerDefaultOpenSearchDataSource(storageEngine); - Set expected = new HashSet<>(); - expected.add(new DataSource(DEFAULT_DATASOURCE_NAME, ConnectorType.OPENSEARCH, storageEngine)); - Assert.assertEquals(expected, DataSourceServiceImpl.getInstance().getDataSources()); - Assert.assertEquals(storageEngine, - DataSourceServiceImpl.getInstance() - .getDataSource(DEFAULT_DATASOURCE_NAME).getStorageEngine()); - Assert.assertEquals(expected, DataSourceServiceImpl.getInstance().getDataSources()); - Assert.assertEquals(storageEngine, - DataSourceServiceImpl.getInstance() - .getDataSource(DEFAULT_DATASOURCE_NAME).getStorageEngine()); - IllegalArgumentException illegalArgumentException - = Assert.assertThrows(IllegalArgumentException.class, - () -> DataSourceServiceImpl.getInstance().getDataSource("test")); - Assert.assertEquals("DataSource with name test doesn't exist.", - illegalArgumentException.getMessage()); - } - - - @SneakyThrows - @Test - public void testGetStorageEngineAfterLoadingConnectors() { - Settings settings = getDataSourceSettings("empty_datasource.json"); - DataSourceServiceImpl.getInstance().registerDefaultOpenSearchDataSource(storageEngine); - //Load Connectors will empty the dataSourceMap.So OpenSearch Storage Engine - DataSourceServiceImpl.getInstance().loadConnectors(settings); - Set expected = new HashSet<>(); - Assert.assertEquals(expected, DataSourceServiceImpl.getInstance().getDataSources()); - } - - @SneakyThrows - @Test - public void testLoadConnectorsWithIllegalDataSourceNames() { - Settings settings = getDataSourceSettings("illegal_datasource_name.json"); - Set expected = DataSourceServiceImpl.getInstance().getDataSources(); - DataSourceServiceImpl.getInstance().loadConnectors(settings); - Assert.assertEquals(expected, DataSourceServiceImpl.getInstance().getDataSources()); - } - - private Settings getDataSourceSettings(String filename) throws URISyntaxException, IOException { - MockSecureSettings mockSecureSettings = new MockSecureSettings(); - ClassLoader classLoader = getClass().getClassLoader(); - Path filepath = Paths.get(classLoader.getResource(filename).toURI()); - mockSecureSettings.setFile(DATASOURCE_SETTING_METADATA_KEY, Files.readAllBytes(filepath)); - return Settings.builder().setSecureSettings(mockSecureSettings).build(); - } - -} diff --git a/plugin/src/test/resources/datasource_missing_name.json b/plugin/src/test/resources/datasource_missing_name.json deleted file mode 100644 index 4491ebb0db..0000000000 --- a/plugin/src/test/resources/datasource_missing_name.json +++ /dev/null @@ -1,11 +0,0 @@ -[ - { - "connector": "prometheus", - "properties" : { - "prometheus.uri" : "http://localhost:9090", - "prometheus.auth.type" : "basicauth", - "prometheus.auth.username" : "admin", - "prometheus.auth.password" : "type" - } - } -] \ No newline at end of file diff --git a/plugin/src/test/resources/duplicate_datasource_names.json b/plugin/src/test/resources/duplicate_datasource_names.json deleted file mode 100644 index eefc56b6ef..0000000000 --- a/plugin/src/test/resources/duplicate_datasource_names.json +++ /dev/null @@ -1,22 +0,0 @@ -[ - { - "name" : "prometheus", - "connector": "prometheus", - "properties" : { - "prometheus.uri" : "http://localhost:9090", - "prometheus.auth.type" : "basicauth", - "prometheus.auth.username" : "admin", - "prometheus.auth.password" : "type" - } - }, - { - "name" : "prometheus", - "connector": "prometheus", - "properties" : { - "prometheus.uri" : "http://localhost:9090", - "prometheus.auth.type" : "basicauth", - "prometheus.auth.username" : "admin", - "prometheus.auth.password" : "type" - } - } -] \ No newline at end of file diff --git a/plugin/src/test/resources/empty_datasource.json b/plugin/src/test/resources/empty_datasource.json deleted file mode 100644 index 0637a088a0..0000000000 --- a/plugin/src/test/resources/empty_datasource.json +++ /dev/null @@ -1 +0,0 @@ -[] \ No newline at end of file diff --git a/plugin/src/test/resources/illegal_datasource_name.json b/plugin/src/test/resources/illegal_datasource_name.json deleted file mode 100644 index 212ca6ec93..0000000000 --- a/plugin/src/test/resources/illegal_datasource_name.json +++ /dev/null @@ -1,12 +0,0 @@ -[ - { - "name" : "prometheus.test", - "connector": "prometheus", - "properties" : { - "prometheus.uri" : "http://localhost:9090", - "prometheus.auth.type" : "basicauth", - "prometheus.auth.username" : "admin", - "prometheus.auth.password" : "type" - } - } -] \ No newline at end of file diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactory.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactory.java index 6aec951b3d..4e8b30af2f 100644 --- a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactory.java +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactory.java @@ -14,16 +14,18 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import okhttp3.OkHttpClient; -import org.opensearch.sql.datasource.model.ConnectorType; +import org.opensearch.sql.datasource.model.DataSource; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.datasource.model.DataSourceType; import org.opensearch.sql.datasource.model.auth.AuthenticationType; import org.opensearch.sql.prometheus.authinterceptors.AwsSigningInterceptor; import org.opensearch.sql.prometheus.authinterceptors.BasicAuthenticationInterceptor; import org.opensearch.sql.prometheus.client.PrometheusClient; import org.opensearch.sql.prometheus.client.PrometheusClientImpl; +import org.opensearch.sql.storage.DataSourceFactory; import org.opensearch.sql.storage.StorageEngine; -import org.opensearch.sql.storage.StorageEngineFactory; -public class PrometheusStorageFactory implements StorageEngineFactory { +public class PrometheusStorageFactory implements DataSourceFactory { public static final String URI = "prometheus.uri"; public static final String AUTH_TYPE = "prometheus.auth.type"; @@ -33,14 +35,20 @@ public class PrometheusStorageFactory implements StorageEngineFactory { public static final String ACCESS_KEY = "prometheus.auth.access_key"; public static final String SECRET_KEY = "prometheus.auth.secret_key"; - @Override - public ConnectorType getConnectorType() { - return ConnectorType.PROMETHEUS; + public DataSourceType getDataSourceType() { + return DataSourceType.PROMETHEUS; } @Override - public StorageEngine getStorageEngine(String catalogName, Map requiredConfig) { + public DataSource createDataSource(DataSourceMetadata metadata) { + return new DataSource( + metadata.getName(), + DataSourceType.PROMETHEUS, + getStorageEngine(metadata.getName(), metadata.getProperties())); + } + + StorageEngine getStorageEngine(String catalogName, Map requiredConfig) { validateFieldsInConfig(requiredConfig, Set.of(URI)); PrometheusClient prometheusClient; try { diff --git a/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactoryTest.java b/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactoryTest.java index d2f445a093..91cb8df1ea 100644 --- a/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactoryTest.java +++ b/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactoryTest.java @@ -13,7 +13,9 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.sql.datasource.model.ConnectorType; +import org.opensearch.sql.datasource.model.DataSource; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.datasource.model.DataSourceType; import org.opensearch.sql.storage.StorageEngine; @ExtendWith(MockitoExtension.class) @@ -22,7 +24,8 @@ public class PrometheusStorageFactoryTest { @Test void testGetConnectorType() { PrometheusStorageFactory prometheusStorageFactory = new PrometheusStorageFactory(); - Assertions.assertEquals(ConnectorType.PROMETHEUS, prometheusStorageFactory.getConnectorType()); + Assertions.assertEquals( + DataSourceType.PROMETHEUS, prometheusStorageFactory.getDataSourceType()); } @Test @@ -130,6 +133,21 @@ void testGetStorageEngineWithInvalidURISyntax() { exception.getMessage().contains("Prometheus Client creation failed due to:")); } + @Test + void createDataSourceSuccess() { + HashMap properties = new HashMap<>(); + properties.put("prometheus.uri", "http://dummyprometheus:9090"); + properties.put("prometheus.auth.type", "basicauth"); + properties.put("prometheus.auth.username", "admin"); + properties.put("prometheus.auth.password", "admin"); + DataSourceMetadata metadata = new DataSourceMetadata(); + metadata.setName("prometheus"); + metadata.setConnector(DataSourceType.PROMETHEUS); + metadata.setProperties(properties); + + DataSource dataSource = new PrometheusStorageFactory().createDataSource(metadata); + Assertions.assertTrue(dataSource.getStorageEngine() instanceof PrometheusStorageEngine); + } }