diff --git a/core/src/main/java/org/opensearch/sql/datasource/DataSourceLoaderCache.java b/core/src/main/java/org/opensearch/sql/datasource/DataSourceLoaderCache.java new file mode 100644 index 0000000000..cce70fe584 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/datasource/DataSourceLoaderCache.java @@ -0,0 +1,20 @@ +package org.opensearch.sql.datasource; + +import org.opensearch.sql.datasource.model.DataSource; +import org.opensearch.sql.datasource.model.DataSourceMetadata; + +/** + * Interface for DataSourceLoaderCache which provides methods for + * fetch, loading and invalidating DataSource cache. + */ +public interface DataSourceLoaderCache { + + /** + * Returns cached datasource object or loads a new one if not present. + * + * @param dataSourceMetadata {@link DataSourceMetadata}. + * @return {@link DataSource} + */ + DataSource getOrLoadDataSource(DataSourceMetadata dataSourceMetadata); + +} diff --git a/core/src/main/java/org/opensearch/sql/datasource/DataSourceLoaderCacheImpl.java b/core/src/main/java/org/opensearch/sql/datasource/DataSourceLoaderCacheImpl.java new file mode 100644 index 0000000000..56b7bec08b --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/datasource/DataSourceLoaderCacheImpl.java @@ -0,0 +1,50 @@ +package org.opensearch.sql.datasource; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.opensearch.sql.datasource.model.DataSource; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.datasource.model.DataSourceType; +import org.opensearch.sql.storage.DataSourceFactory; + +/** + * Default implementation of DataSourceLoaderCache. This implementation + * utilizes Google Guava Cache {@link Cache} for caching DataSource objects + * against {@link DataSourceMetadata}. Expires the cache objects every 24 hrs after + * the last access. + */ +public class DataSourceLoaderCacheImpl implements DataSourceLoaderCache { + private final Map dataSourceFactoryMap; + private final Cache dataSourceCache; + + /** + * DataSourceLoaderCacheImpl constructor. + * + * @param dataSourceFactorySet set of {@link DataSourceFactory}. + */ + public DataSourceLoaderCacheImpl(Set dataSourceFactorySet) { + this.dataSourceFactoryMap = dataSourceFactorySet.stream() + .collect(Collectors.toMap(DataSourceFactory::getDataSourceType, f -> f)); + this.dataSourceCache = CacheBuilder.newBuilder() + .maximumSize(1000) + .expireAfterAccess(24, TimeUnit.HOURS) + .build(); + } + + @Override + public DataSource getOrLoadDataSource(DataSourceMetadata dataSourceMetadata) { + DataSource dataSource = this.dataSourceCache.getIfPresent(dataSourceMetadata); + if (dataSource == null) { + dataSource = this.dataSourceFactoryMap.get(dataSourceMetadata.getConnector()) + .createDataSource(dataSourceMetadata); + this.dataSourceCache.put(dataSourceMetadata, dataSource); + return dataSource; + } + return dataSource; + } + +} diff --git a/core/src/main/java/org/opensearch/sql/datasource/DataSourceService.java b/core/src/main/java/org/opensearch/sql/datasource/DataSourceService.java index f621ce5c55..9167737a70 100644 --- a/core/src/main/java/org/opensearch/sql/datasource/DataSourceService.java +++ b/core/src/main/java/org/opensearch/sql/datasource/DataSourceService.java @@ -27,9 +27,21 @@ public interface DataSourceService { * Returns all dataSource Metadata objects. The returned objects won't contain * any of the credential info. * + * @param isDefaultDataSourceRequired is used to specify + * if default opensearch connector is required in the output list. * @return set of {@link DataSourceMetadata}. */ - Set getDataSourceMetadataSet(); + Set getDataSourceMetadata(boolean isDefaultDataSourceRequired); + + + /** + * Returns dataSourceMetadata object with specific name. + * The returned objects won't contain any crendetial info. + * + * @param name name of the {@link DataSource}. + * @return set of {@link DataSourceMetadata}. + */ + DataSourceMetadata getDataSourceMetadata(String name); /** * Register {@link DataSource} defined by {@link DataSourceMetadata}. diff --git a/core/src/main/java/org/opensearch/sql/datasource/DataSourceServiceImpl.java b/core/src/main/java/org/opensearch/sql/datasource/DataSourceServiceImpl.java index 51bad94af8..bc3f2d0abf 100644 --- a/core/src/main/java/org/opensearch/sql/datasource/DataSourceServiceImpl.java +++ b/core/src/main/java/org/opensearch/sql/datasource/DataSourceServiceImpl.java @@ -9,6 +9,10 @@ import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -17,7 +21,9 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; +import javax.xml.crypto.Data; import org.opensearch.sql.common.utils.StringUtils; +import org.opensearch.sql.datasource.exceptions.DataSourceNotFoundException; import org.opensearch.sql.datasource.model.DataSource; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasource.model.DataSourceType; @@ -35,9 +41,7 @@ public class DataSourceServiceImpl implements DataSourceService { private static String DATASOURCE_NAME_REGEX = "[@*A-Za-z]+?[*a-zA-Z_\\-0-9]*"; - private final ConcurrentHashMap dataSourceMap; - - private final Map dataSourceFactoryMap; + private final DataSourceLoaderCache dataSourceLoaderCache; private final DataSourceMetadataStorage dataSourceMetadataStorage; @@ -50,36 +54,48 @@ public DataSourceServiceImpl(Set dataSourceFactories, DataSourceMetadataStorage dataSourceMetadataStorage, DataSourceUserAuthorizationHelper dataSourceUserAuthorizationHelper) { - dataSourceFactoryMap = - dataSourceFactories.stream() - .collect(Collectors.toMap(DataSourceFactory::getDataSourceType, f -> f)); - dataSourceMap = new ConcurrentHashMap<>(); this.dataSourceMetadataStorage = dataSourceMetadataStorage; this.dataSourceUserAuthorizationHelper = dataSourceUserAuthorizationHelper; + this.dataSourceLoaderCache = new DataSourceLoaderCacheImpl(dataSourceFactories); } @Override - public Set getDataSourceMetadataSet() { + public Set getDataSourceMetadata(boolean isDefaultDataSourceRequired) { List dataSourceMetadataList = this.dataSourceMetadataStorage.getDataSourceMetadata(); Set dataSourceMetadataSet = new HashSet<>(dataSourceMetadataList); - dataSourceMetadataSet.add(DataSourceMetadata.defaultOpenSearchDataSourceMetadata()); + if (isDefaultDataSourceRequired) { + dataSourceMetadataSet.add(DataSourceMetadata.defaultOpenSearchDataSourceMetadata()); + } + removeAuthInfo(dataSourceMetadataSet); return dataSourceMetadataSet; } + @Override + public DataSourceMetadata getDataSourceMetadata(String datasourceName) { + Optional dataSourceMetadataOptional + = getDataSourceMetadataFromName(datasourceName); + if (dataSourceMetadataOptional.isEmpty()) { + throw new IllegalArgumentException("DataSource with name: " + datasourceName + + " doesn't exist."); + } + removeAuthInfo(dataSourceMetadataOptional.get()); + return dataSourceMetadataOptional.get(); + } + @Override public DataSource getDataSource(String dataSourceName) { Optional - dataSourceMetadataOptional = getDataSourceMetadata(dataSourceName); + dataSourceMetadataOptional = getDataSourceMetadataFromName(dataSourceName); if (dataSourceMetadataOptional.isEmpty()) { - throw new IllegalArgumentException( + throw new DataSourceNotFoundException( String.format("DataSource with name %s doesn't exist.", dataSourceName)); } else { DataSourceMetadata dataSourceMetadata = dataSourceMetadataOptional.get(); this.dataSourceUserAuthorizationHelper .authorizeDataSource(dataSourceMetadata); - return getDataSourceFromMetadata(dataSourceMetadata); + return dataSourceLoaderCache.getOrLoadDataSource(dataSourceMetadata); } } @@ -87,20 +103,31 @@ public DataSource getDataSource(String dataSourceName) { public void createDataSource(DataSourceMetadata metadata) { validateDataSourceMetaData(metadata); if (!metadata.getName().equals(DEFAULT_DATASOURCE_NAME)) { + this.dataSourceLoaderCache.getOrLoadDataSource(metadata); this.dataSourceMetadataStorage.createDataSourceMetadata(metadata); } - dataSourceMap.put(metadata, - dataSourceFactoryMap.get(metadata.getConnector()).createDataSource(metadata)); } @Override public void updateDataSource(DataSourceMetadata dataSourceMetadata) { - throw new UnsupportedOperationException("will be supported in future"); + validateDataSourceMetaData(dataSourceMetadata); + if (!dataSourceMetadata.getName().equals(DEFAULT_DATASOURCE_NAME)) { + this.dataSourceLoaderCache.getOrLoadDataSource(dataSourceMetadata); + this.dataSourceMetadataStorage.updateDataSourceMetadata(dataSourceMetadata); + } else { + throw new UnsupportedOperationException( + "Not allowed to update default datasource :" + DEFAULT_DATASOURCE_NAME); + } } @Override public void deleteDataSource(String dataSourceName) { - throw new UnsupportedOperationException("will be supported in future"); + if (dataSourceName.equals(DEFAULT_DATASOURCE_NAME)) { + throw new UnsupportedOperationException( + "Not allowed to delete default datasource :" + DEFAULT_DATASOURCE_NAME); + } else { + this.dataSourceMetadataStorage.deleteDataSourceMetadata(dataSourceName); + } } @Override @@ -130,7 +157,7 @@ private void validateDataSourceMetaData(DataSourceMetadata metadata) { + " Properties are required parameters."); } - private Optional getDataSourceMetadata(String dataSourceName) { + private Optional getDataSourceMetadataFromName(String dataSourceName) { if (dataSourceName.equals(DEFAULT_DATASOURCE_NAME)) { return Optional.of(DataSourceMetadata.defaultOpenSearchDataSourceMetadata()); } else { @@ -138,19 +165,19 @@ private Optional getDataSourceMetadata(String dataSourceName } } - private DataSource getDataSourceFromMetadata(DataSourceMetadata dataSourceMetadata) { - if (!dataSourceMap.containsKey(dataSourceMetadata)) { - clearDataSource(dataSourceMetadata); - dataSourceMap.put(dataSourceMetadata, - dataSourceFactoryMap.get(dataSourceMetadata.getConnector()) - .createDataSource(dataSourceMetadata)); - } - return dataSourceMap.get(dataSourceMetadata); - } - private void clearDataSource(DataSourceMetadata dataSourceMetadata) { - dataSourceMap.entrySet() - .removeIf(entry -> entry.getKey().getName().equals(dataSourceMetadata.getName())); + // It is advised to avoid sending any kind credential + // info in api response from security point of view. + private void removeAuthInfo(Set dataSourceMetadataSet) { + dataSourceMetadataSet.forEach(this::removeAuthInfo); } + private void removeAuthInfo(DataSourceMetadata dataSourceMetadata) { + HashMap safeProperties + = new HashMap<>(dataSourceMetadata.getProperties()); + safeProperties + .entrySet() + .removeIf(entry -> entry.getKey().contains("auth")); + dataSourceMetadata.setProperties(safeProperties); + } } diff --git a/core/src/main/java/org/opensearch/sql/datasource/exceptions/DataSourceNotFoundException.java b/core/src/main/java/org/opensearch/sql/datasource/exceptions/DataSourceNotFoundException.java new file mode 100644 index 0000000000..0a068ccdfc --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/datasource/exceptions/DataSourceNotFoundException.java @@ -0,0 +1,18 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.datasource.exceptions; + +/** + * DataSourceNotFoundException. + */ +public class DataSourceNotFoundException extends RuntimeException { + public DataSourceNotFoundException(String message) { + super(message); + } + +} diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableScan.java b/core/src/main/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableScan.java index 60969f4d54..93e65054b5 100644 --- a/core/src/main/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableScan.java +++ b/core/src/main/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableScan.java @@ -48,7 +48,7 @@ public String explain() { public void open() { List exprValues = new ArrayList<>(); Set dataSourceMetadataSet - = dataSourceService.getDataSourceMetadataSet(); + = dataSourceService.getDataSourceMetadata(true); for (DataSourceMetadata dataSourceMetadata : dataSourceMetadataSet) { exprValues.add( new ExprTupleValue(new LinkedHashMap<>(ImmutableMap.of( diff --git a/core/src/main/java/org/opensearch/sql/storage/DataSourceFactory.java b/core/src/main/java/org/opensearch/sql/storage/DataSourceFactory.java index 20d263e601..d0f24d0e5a 100644 --- a/core/src/main/java/org/opensearch/sql/storage/DataSourceFactory.java +++ b/core/src/main/java/org/opensearch/sql/storage/DataSourceFactory.java @@ -7,6 +7,7 @@ package org.opensearch.sql.storage; +import java.util.Map; import org.opensearch.sql.datasource.DataSourceService; import org.opensearch.sql.datasource.model.DataSource; import org.opensearch.sql.datasource.model.DataSourceMetadata; @@ -28,4 +29,5 @@ public interface DataSourceFactory { * Create {@link DataSource}. */ DataSource createDataSource(DataSourceMetadata metadata); + } diff --git a/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java b/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java index a040e2a53f..2ec411ba54 100644 --- a/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java +++ b/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java @@ -185,13 +185,18 @@ private class DefaultDataSourceService implements DataSourceService { @Override - public Set getDataSourceMetadataSet() { + public Set getDataSourceMetadata(boolean isDefaultDataSourceRequired) { return Stream.of(opensearchDataSource, prometheusDataSource) .map(ds -> new DataSourceMetadata(ds.getName(), ds.getConnectorType(),Collections.emptyList(), ImmutableMap.of())).collect(Collectors.toSet()); } + @Override + public DataSourceMetadata getDataSourceMetadata(String name) { + return null; + } + @Override public void createDataSource(DataSourceMetadata metadata) { throw new UnsupportedOperationException("unsupported operation"); diff --git a/core/src/test/java/org/opensearch/sql/datasource/DataSourceLoaderCacheImplTest.java b/core/src/test/java/org/opensearch/sql/datasource/DataSourceLoaderCacheImplTest.java new file mode 100644 index 0000000000..fae69e7feb --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/datasource/DataSourceLoaderCacheImplTest.java @@ -0,0 +1,85 @@ +package org.opensearch.sql.datasource; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableMap; +import java.util.Collections; +import java.util.List; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.datasource.model.DataSource; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.datasource.model.DataSourceType; +import org.opensearch.sql.storage.DataSourceFactory; +import org.opensearch.sql.storage.StorageEngine; + +@ExtendWith(MockitoExtension.class) +class DataSourceLoaderCacheImplTest { + + @Mock + private DataSourceFactory dataSourceFactory; + + @Mock + private StorageEngine storageEngine; + + @BeforeEach + public void setup() { + lenient() + .doAnswer( + invocation -> { + DataSourceMetadata metadata = invocation.getArgument(0); + return new DataSource(metadata.getName(), metadata.getConnector(), storageEngine); + }) + .when(dataSourceFactory) + .createDataSource(any()); + when(dataSourceFactory.getDataSourceType()).thenReturn(DataSourceType.OPENSEARCH); + } + + @Test + void testGetOrLoadDataSource() { + DataSourceLoaderCache dataSourceLoaderCache = + new DataSourceLoaderCacheImpl(Collections.singleton(dataSourceFactory)); + DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); + dataSourceMetadata.setName("testDS"); + dataSourceMetadata.setConnector(DataSourceType.OPENSEARCH); + dataSourceMetadata.setAllowedRoles(Collections.emptyList()); + dataSourceMetadata.setProperties(ImmutableMap.of()); + DataSource dataSource = dataSourceLoaderCache.getOrLoadDataSource(dataSourceMetadata); + verify(dataSourceFactory, times(1)).createDataSource(dataSourceMetadata); + Assertions.assertEquals(dataSource, + dataSourceLoaderCache.getOrLoadDataSource(dataSourceMetadata)); + verifyNoMoreInteractions(dataSourceFactory); + } + + @Test + void testGetOrLoadDataSourceWithMetadataUpdate() { + DataSourceLoaderCache dataSourceLoaderCache = + new DataSourceLoaderCacheImpl(Collections.singleton(dataSourceFactory)); + DataSourceMetadata dataSourceMetadata = getMetadata(); + dataSourceLoaderCache.getOrLoadDataSource(dataSourceMetadata); + dataSourceLoaderCache.getOrLoadDataSource(dataSourceMetadata); + dataSourceMetadata.setAllowedRoles(List.of("testDS_access")); + dataSourceLoaderCache.getOrLoadDataSource(dataSourceMetadata); + dataSourceLoaderCache.getOrLoadDataSource(dataSourceMetadata); + verify(dataSourceFactory, times(2)).createDataSource(dataSourceMetadata); + } + + private DataSourceMetadata getMetadata() { + DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); + dataSourceMetadata.setName("testDS"); + dataSourceMetadata.setConnector(DataSourceType.OPENSEARCH); + dataSourceMetadata.setAllowedRoles(Collections.emptyList()); + dataSourceMetadata.setProperties(ImmutableMap.of()); + return dataSourceMetadata; + } + +} diff --git a/core/src/test/java/org/opensearch/sql/datasource/DataSourceServiceImplTest.java b/core/src/test/java/org/opensearch/sql/datasource/DataSourceServiceImplTest.java index 68a9475f76..98e17e9166 100644 --- a/core/src/test/java/org/opensearch/sql/datasource/DataSourceServiceImplTest.java +++ b/core/src/test/java/org/opensearch/sql/datasource/DataSourceServiceImplTest.java @@ -6,6 +6,7 @@ package org.opensearch.sql.datasource; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; @@ -15,13 +16,13 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; -import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME; import com.google.common.collect.ImmutableMap; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -33,6 +34,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.datasource.exceptions.DataSourceNotFoundException; import org.opensearch.sql.datasource.model.DataSource; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasource.model.DataSourceType; @@ -42,8 +44,6 @@ @ExtendWith(MockitoExtension.class) class DataSourceServiceImplTest { - static final String NAME = "opensearch"; - @Mock private DataSourceFactory dataSourceFactory; @Mock @@ -91,9 +91,9 @@ void testGetDataSourceForDefaultOpenSearchDataSource() { void testGetDataSourceForNonExistingDataSource() { when(dataSourceMetadataStorage.getDataSourceMetadata("test")) .thenReturn(Optional.empty()); - IllegalArgumentException exception = + DataSourceNotFoundException exception = assertThrows( - IllegalArgumentException.class, + DataSourceNotFoundException.class, () -> dataSourceService.getDataSource("test")); assertEquals("DataSource with name test doesn't exist.", exception.getMessage()); @@ -130,7 +130,7 @@ void testGetDataSourceWithAuthorizationFailure() { SecurityException securityException = Assertions.assertThrows(SecurityException.class, - () -> dataSourceService.getDataSource("test")); + () -> dataSourceService.getDataSource("test")); Assertions.assertEquals("User is not authorized to access datasource test. " + "User should be mapped to any of the roles in [prometheus_access] for access.", securityException.getMessage()); @@ -158,7 +158,6 @@ void testCreateDataSourceSuccessCase() { assertEquals("testDS", dataSource.getName()); assertEquals(storageEngine, dataSource.getStorageEngine()); assertEquals(DataSourceType.OPENSEARCH, dataSource.getConnectorType()); - verifyNoMoreInteractions(dataSourceFactory); } @Test @@ -213,6 +212,32 @@ void testCreateDataSourceWithNullParameters() { @Test void testGetDataSourceMetadataSet() { + HashMap properties = new HashMap<>(); + properties.put("prometheus.uri", "http://localhost:9200"); + properties.put("prometheus.auth.type", "basicauth"); + properties.put("prometheus.auth.username", "username"); + properties.put("prometheus.auth.password", "password"); + when(dataSourceMetadataStorage.getDataSourceMetadata()).thenReturn(new ArrayList<>() { + { + add(metadata("testDS", DataSourceType.PROMETHEUS, Collections.emptyList(), + properties)); + } + }); + Set dataSourceMetadataSet + = dataSourceService.getDataSourceMetadata(false); + assertEquals(1, dataSourceMetadataSet.size()); + DataSourceMetadata dataSourceMetadata = dataSourceMetadataSet.iterator().next(); + assertTrue(dataSourceMetadata.getProperties().containsKey("prometheus.uri")); + assertFalse(dataSourceMetadata.getProperties().containsKey("prometheus.auth.type")); + assertFalse(dataSourceMetadata.getProperties().containsKey("prometheus.auth.username")); + assertFalse(dataSourceMetadata.getProperties().containsKey("prometheus.auth.password")); + assertFalse(dataSourceMetadataSet + .contains(DataSourceMetadata.defaultOpenSearchDataSourceMetadata())); + verify(dataSourceMetadataStorage, times(1)).getDataSourceMetadata(); + } + + @Test + void testGetDataSourceMetadataSetWithDefaultDatasource() { when(dataSourceMetadataStorage.getDataSourceMetadata()).thenReturn(new ArrayList<>() { { add(metadata("testDS", DataSourceType.PROMETHEUS, Collections.emptyList(), @@ -220,7 +245,7 @@ void testGetDataSourceMetadataSet() { } }); Set dataSourceMetadataSet - = dataSourceService.getDataSourceMetadataSet(); + = dataSourceService.getDataSourceMetadata(true); assertEquals(2, dataSourceMetadataSet.size()); assertTrue(dataSourceMetadataSet .contains(DataSourceMetadata.defaultOpenSearchDataSourceMetadata())); @@ -228,17 +253,42 @@ void testGetDataSourceMetadataSet() { } @Test - void testUpdateDatasource() { - assertThrows( - UnsupportedOperationException.class, - () -> dataSourceService.updateDataSource(new DataSourceMetadata())); + void testUpdateDataSourceSuccessCase() { + + DataSourceMetadata dataSourceMetadata = metadata("testDS", DataSourceType.OPENSEARCH, + Collections.emptyList(), ImmutableMap.of()); + dataSourceService.updateDataSource(dataSourceMetadata); + verify(dataSourceMetadataStorage, times(1)) + .updateDataSourceMetadata(dataSourceMetadata); + verify(dataSourceFactory, times(1)) + .createDataSource(dataSourceMetadata); + } + + @Test + void testUpdateDefaultDataSource() { + DataSourceMetadata dataSourceMetadata = metadata(DEFAULT_DATASOURCE_NAME, + DataSourceType.OPENSEARCH, Collections.emptyList(), ImmutableMap.of()); + UnsupportedOperationException unsupportedOperationException + = assertThrows(UnsupportedOperationException.class, + () -> dataSourceService.updateDataSource(dataSourceMetadata)); + assertEquals("Not allowed to update default datasource :" + DEFAULT_DATASOURCE_NAME, + unsupportedOperationException.getMessage()); } @Test void testDeleteDatasource() { - assertThrows( - UnsupportedOperationException.class, - () -> dataSourceService.deleteDataSource(NAME)); + dataSourceService.deleteDataSource("testDS"); + verify(dataSourceMetadataStorage, times(1)) + .deleteDataSourceMetadata("testDS"); + } + + @Test + void testDeleteDefaultDatasource() { + UnsupportedOperationException unsupportedOperationException + = assertThrows(UnsupportedOperationException.class, + () -> dataSourceService.deleteDataSource(DEFAULT_DATASOURCE_NAME)); + assertEquals("Not allowed to delete default datasource :" + DEFAULT_DATASOURCE_NAME, + unsupportedOperationException.getMessage()); } @Test @@ -268,4 +318,56 @@ DataSourceMetadata metadata(String name, DataSourceType type, dataSourceMetadata.setProperties(properties); return dataSourceMetadata; } + + @Test + void testRemovalOfAuthorizationInfo() { + HashMap properties = new HashMap<>(); + properties.put("prometheus.uri", "https://localhost:9090"); + properties.put("prometheus.auth.type", "basicauth"); + properties.put("prometheus.auth.username", "username"); + properties.put("prometheus.auth.password", "password"); + DataSourceMetadata dataSourceMetadata = + new DataSourceMetadata("testDS", DataSourceType.PROMETHEUS, + Collections.singletonList("prometheus_access"), properties); + when(dataSourceMetadataStorage.getDataSourceMetadata("testDS")) + .thenReturn(Optional.of(dataSourceMetadata)); + + DataSourceMetadata dataSourceMetadata1 + = dataSourceService.getDataSourceMetadata("testDS"); + assertEquals("testDS", dataSourceMetadata1.getName()); + assertEquals(DataSourceType.PROMETHEUS, dataSourceMetadata1.getConnector()); + assertFalse(dataSourceMetadata1.getProperties().containsKey("prometheus.auth.type")); + assertFalse(dataSourceMetadata1.getProperties().containsKey("prometheus.auth.username")); + assertFalse(dataSourceMetadata1.getProperties().containsKey("prometheus.auth.password")); + } + + @Test + void testGetDataSourceMetadataForNonExistingDataSource() { + when(dataSourceMetadataStorage.getDataSourceMetadata("testDS")) + .thenReturn(Optional.empty()); + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, + () -> dataSourceService.getDataSourceMetadata("testDS")); + assertEquals("DataSource with name: testDS doesn't exist.", exception.getMessage()); + } + + @Test + void testGetDataSourceMetadataForSpecificDataSourceName() { + HashMap properties = new HashMap<>(); + properties.put("prometheus.uri", "http://localhost:9200"); + properties.put("prometheus.auth.type", "basicauth"); + properties.put("prometheus.auth.username", "username"); + properties.put("prometheus.auth.password", "password"); + when(dataSourceMetadataStorage.getDataSourceMetadata("testDS")) + .thenReturn(Optional.ofNullable( + metadata("testDS", DataSourceType.PROMETHEUS, Collections.emptyList(), + properties))); + DataSourceMetadata dataSourceMetadata + = this.dataSourceService.getDataSourceMetadata("testDS"); + assertTrue(dataSourceMetadata.getProperties().containsKey("prometheus.uri")); + assertFalse(dataSourceMetadata.getProperties().containsKey("prometheus.auth.type")); + assertFalse(dataSourceMetadata.getProperties().containsKey("prometheus.auth.username")); + assertFalse(dataSourceMetadata.getProperties().containsKey("prometheus.auth.password")); + verify(dataSourceMetadataStorage, times(1)).getDataSourceMetadata("testDS"); + } + } diff --git a/core/src/test/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableScanTest.java b/core/src/test/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableScanTest.java index 0f95f05944..93c02def86 100644 --- a/core/src/test/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableScanTest.java +++ b/core/src/test/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableScanTest.java @@ -62,7 +62,7 @@ void testIterator() { .map(dataSource -> new DataSourceMetadata(dataSource.getName(), dataSource.getConnectorType(), Collections.emptyList(), ImmutableMap.of())) .collect(Collectors.toSet()); - when(dataSourceService.getDataSourceMetadataSet()).thenReturn(dataSourceMetadata); + when(dataSourceService.getDataSourceMetadata(true)).thenReturn(dataSourceMetadata); assertFalse(dataSourceTableScan.hasNext()); dataSourceTableScan.open(); diff --git a/integ-test/src/test/java/org/opensearch/sql/datasource/DataSourceAPIsIT.java b/integ-test/src/test/java/org/opensearch/sql/datasource/DataSourceAPIsIT.java index e324e976ba..d559207cc1 100644 --- a/integ-test/src/test/java/org/opensearch/sql/datasource/DataSourceAPIsIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/datasource/DataSourceAPIsIT.java @@ -1,49 +1,192 @@ -package org.opensearch.sql.datasource;/* +/* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 */ +package org.opensearch.sql.datasource; +import static org.opensearch.sql.legacy.TestUtils.getResponseBody; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.gson.Gson; -import java.io.IOException; +import com.google.gson.JsonObject; +import com.google.gson.reflect.TypeToken; +import java.lang.reflect.Type; import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; +import java.util.List; +import lombok.SneakyThrows; +import org.apache.commons.lang3.StringUtils; import org.junit.Assert; import org.junit.Test; import org.opensearch.client.Request; import org.opensearch.client.RequestOptions; +import org.opensearch.client.Response; +import org.opensearch.client.ResponseException; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasource.model.DataSourceType; import org.opensearch.sql.ppl.PPLIntegTestCase; public class DataSourceAPIsIT extends PPLIntegTestCase { + @SneakyThrows + @Test + public void createDataSourceAPITest() { + //create datasource + DataSourceMetadata createDSM = + new DataSourceMetadata("create_prometheus", DataSourceType.PROMETHEUS, + ImmutableList.of(), ImmutableMap.of("prometheus.uri", "https://localhost:9090")); + Request createRequest = getCreateDataSourceRequest(createDSM); + Response response = client().performRequest(createRequest); + Assert.assertEquals(201, response.getStatusLine().getStatusCode()); + String createResponseString = getResponseBody(response); + Assert.assertEquals("Created DataSource with name create_prometheus", createResponseString); + //Datasource is not immediately created. so introducing a sleep of 2s. + Thread.sleep(2000); + + //get datasource to validate the creation. + Request getRequest = getFetchDataSourceRequest("create_prometheus"); + Response getResponse = client().performRequest(getRequest); + Assert.assertEquals(200, getResponse.getStatusLine().getStatusCode()); + String getResponseString = getResponseBody(getResponse); + DataSourceMetadata dataSourceMetadata = + new Gson().fromJson(getResponseString, DataSourceMetadata.class); + Assert.assertEquals("https://localhost:9090", + dataSourceMetadata.getProperties().get("prometheus.uri")); + } + + + @SneakyThrows + @Test + public void updateDataSourceAPITest() { + //create datasource + DataSourceMetadata createDSM = + new DataSourceMetadata("update_prometheus", DataSourceType.PROMETHEUS, + ImmutableList.of(), ImmutableMap.of("prometheus.uri", "https://localhost:9090")); + Request createRequest = getCreateDataSourceRequest(createDSM); + client().performRequest(createRequest); + //Datasource is not immediately created. so introducing a sleep of 2s. + Thread.sleep(2000); + + //update datasource + DataSourceMetadata updateDSM = + new DataSourceMetadata("update_prometheus", DataSourceType.PROMETHEUS, + ImmutableList.of(), ImmutableMap.of("prometheus.uri", "https://randomtest:9090")); + Request updateRequest = getUpdateDataSourceRequest(updateDSM); + Response updateResponse = client().performRequest(updateRequest); + Assert.assertEquals(200, updateResponse.getStatusLine().getStatusCode()); + String updateResponseString = getResponseBody(updateResponse); + Assert.assertEquals("Updated DataSource with name update_prometheus", updateResponseString); + + //Datasource is not immediately updated. so introducing a sleep of 2s. + Thread.sleep(2000); + + //get datasource to validate the modification. + //get datasource + Request getRequest = getFetchDataSourceRequest("update_prometheus"); + Response getResponse = client().performRequest(getRequest); + Assert.assertEquals(200, getResponse.getStatusLine().getStatusCode()); + String getResponseString = getResponseBody(getResponse); + DataSourceMetadata dataSourceMetadata = + new Gson().fromJson(getResponseString, DataSourceMetadata.class); + Assert.assertEquals("https://randomtest:9090", + dataSourceMetadata.getProperties().get("prometheus.uri")); + } + + + @SneakyThrows + @Test + public void deleteDataSourceTest() { + + //create datasource for deletion + DataSourceMetadata createDSM = + new DataSourceMetadata("delete_prometheus", DataSourceType.PROMETHEUS, + ImmutableList.of(), ImmutableMap.of("prometheus.uri", "https://localhost:9090")); + Request createRequest = getCreateDataSourceRequest(createDSM); + client().performRequest(createRequest); + //Datasource is not immediately created. so introducing a sleep of 2s. + Thread.sleep(2000); + + //delete datasource + Request deleteRequest = getDeleteDataSourceRequest("delete_prometheus"); + Response deleteResponse = client().performRequest(deleteRequest); + Assert.assertEquals(204, deleteResponse.getStatusLine().getStatusCode()); + + //Datasource is not immediately deleted. so introducing a sleep of 2s. + Thread.sleep(2000); + + //get datasources to verify the deletion + final Request prometheusGetRequest = getFetchDataSourceRequest("delete_prometheus"); + ResponseException prometheusGetResponseException + = Assert.assertThrows(ResponseException.class, () -> client().performRequest(prometheusGetRequest)); + Assert.assertEquals( 400, prometheusGetResponseException.getResponse().getStatusLine().getStatusCode()); + String prometheusGetResponseString = getResponseBody(prometheusGetResponseException.getResponse()); + JsonObject errorMessage = new Gson().fromJson(prometheusGetResponseString, JsonObject.class); + Assert.assertEquals("DataSource with name: delete_prometheus doesn't exist.", + errorMessage.get("error").getAsJsonObject().get("details").getAsString()); + + } + + @SneakyThrows @Test - public void createDataSourceTest() throws IOException { - Request request = getCreateDataSourceRequest(getDataSourceMetadataJsonString()); - String response = executeRequest(request); - Assert.assertEquals("Created DataSource with name prometheus1", response); + public void getAllDataSourceTest() { +//create datasource for deletion + DataSourceMetadata createDSM = + new DataSourceMetadata("get_all_prometheus", DataSourceType.PROMETHEUS, + ImmutableList.of(), ImmutableMap.of("prometheus.uri", "https://localhost:9090")); + Request createRequest = getCreateDataSourceRequest(createDSM); + client().performRequest(createRequest); + //Datasource is not immediately created. so introducing a sleep of 2s. + Thread.sleep(2000); + + Request getRequest = getFetchDataSourceRequest(null); + Response getResponse = client().performRequest(getRequest); + Assert.assertEquals(200, getResponse.getStatusLine().getStatusCode()); + String getResponseString = getResponseBody(getResponse); + Type listType = new TypeToken>() {}.getType(); + List dataSourceMetadataList = + new Gson().fromJson(getResponseString, listType); + Assert.assertTrue( + dataSourceMetadataList.stream().anyMatch(ds -> ds.getName().equals("get_all_prometheus"))); } - private Request getCreateDataSourceRequest(String dataSourceMetadataJson) { + + private Request getCreateDataSourceRequest(DataSourceMetadata dataSourceMetadata) { Request request = new Request("POST", "/_plugins/_query/_datasources"); - request.setJsonEntity(dataSourceMetadataJson); + request.setJsonEntity(new Gson().toJson(dataSourceMetadata)); + RequestOptions.Builder restOptionsBuilder = RequestOptions.DEFAULT.toBuilder(); + restOptionsBuilder.addHeader("Content-Type", "application/json"); + request.setOptions(restOptionsBuilder); + return request; + } + + private Request getUpdateDataSourceRequest(DataSourceMetadata dataSourceMetadata) { + Request request = new Request("PUT", "/_plugins/_query/_datasources"); + request.setJsonEntity(new Gson().toJson(dataSourceMetadata)); + RequestOptions.Builder restOptionsBuilder = RequestOptions.DEFAULT.toBuilder(); + restOptionsBuilder.addHeader("Content-Type", "application/json"); + request.setOptions(restOptionsBuilder); + return request; + } + + private Request getFetchDataSourceRequest(String name) { + Request request = new Request("GET", "/_plugins/_query/_datasources" + "/" + name); + if (StringUtils.isEmpty(name)) { + request = new Request("GET", "/_plugins/_query/_datasources"); + } RequestOptions.Builder restOptionsBuilder = RequestOptions.DEFAULT.toBuilder(); restOptionsBuilder.addHeader("Content-Type", "application/json"); request.setOptions(restOptionsBuilder); return request; } - private String getDataSourceMetadataJsonString() { - DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); - dataSourceMetadata.setName("prometheus1"); - dataSourceMetadata.setConnector(DataSourceType.PROMETHEUS); - dataSourceMetadata.setAllowedRoles(new ArrayList<>()); - Map propertiesMap = new HashMap<>(); - propertiesMap.put("prometheus.uri", "http://localhost:9200"); - dataSourceMetadata.setProperties(propertiesMap); - return new Gson().toJson(dataSourceMetadata); + + private Request getDeleteDataSourceRequest(String name) { + Request request = new Request("DELETE", "/_plugins/_query/_datasources" + "/" + name); + RequestOptions.Builder restOptionsBuilder = RequestOptions.DEFAULT.toBuilder(); + restOptionsBuilder.addHeader("Content-Type", "application/json"); + request.setOptions(restOptionsBuilder); + return request; } } diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index 3d733233be..2fc47082d7 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -47,10 +47,12 @@ import org.opensearch.script.ScriptEngine; import org.opensearch.script.ScriptService; import org.opensearch.sql.common.encryptor.EncryptorImpl; +import org.opensearch.sql.datasource.DataSourceLoaderCache; import org.opensearch.sql.datasource.DataSourceMetadataStorage; import org.opensearch.sql.datasource.DataSourceService; import org.opensearch.sql.datasource.DataSourceServiceImpl; import org.opensearch.sql.datasource.DataSourceUserAuthorizationHelper; +import org.opensearch.sql.datasource.model.DataSource; import org.opensearch.sql.legacy.esdomain.LocalClusterState; import org.opensearch.sql.legacy.executor.AsyncRestExecutor; import org.opensearch.sql.legacy.metrics.Metrics; @@ -67,6 +69,9 @@ import org.opensearch.sql.plugin.datasource.DataSourceUserAuthorizationHelperImpl; import org.opensearch.sql.plugin.datasource.OpenSearchDataSourceMetadataStorage; import org.opensearch.sql.plugin.model.CreateDataSourceActionResponse; +import org.opensearch.sql.plugin.model.DeleteDataSourceActionResponse; +import org.opensearch.sql.plugin.model.GetDataSourceActionResponse; +import org.opensearch.sql.plugin.model.UpdateDataSourceActionResponse; import org.opensearch.sql.plugin.rest.RestDataSourceQueryAction; import org.opensearch.sql.plugin.rest.RestPPLQueryAction; import org.opensearch.sql.plugin.rest.RestPPLStatsAction; @@ -75,6 +80,9 @@ import org.opensearch.sql.plugin.transport.TransportPPLQueryAction; import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse; import org.opensearch.sql.plugin.transport.datasource.TransportCreateDataSourceAction; +import org.opensearch.sql.plugin.transport.datasource.TransportDeleteDataSourceAction; +import org.opensearch.sql.plugin.transport.datasource.TransportGetDataSourceAction; +import org.opensearch.sql.plugin.transport.datasource.TransportUpdateDataSourceAction; import org.opensearch.sql.prometheus.storage.PrometheusStorageFactory; import org.opensearch.sql.storage.DataSourceFactory; import org.opensearch.threadpool.ExecutorBuilder; @@ -91,7 +99,7 @@ public class SQLPlugin extends Plugin implements ActionPlugin, ScriptPlugin { */ private org.opensearch.sql.common.setting.Settings pluginSettings; private NodeClient client; - private DataSourceService dataSourceService; + private DataSourceServiceImpl dataSourceService; private Injector injector; public String name() { @@ -136,7 +144,13 @@ public List getRestHandlers( new ActionType<>(PPLQueryAction.NAME, TransportPPLQueryResponse::new), TransportPPLQueryAction.class), new ActionHandler<>(new ActionType<>(TransportCreateDataSourceAction.NAME, - CreateDataSourceActionResponse::new), TransportCreateDataSourceAction.class)); + CreateDataSourceActionResponse::new), TransportCreateDataSourceAction.class), + new ActionHandler<>(new ActionType<>(TransportGetDataSourceAction.NAME, + GetDataSourceActionResponse::new), TransportGetDataSourceAction.class), + new ActionHandler<>(new ActionType<>(TransportUpdateDataSourceAction.NAME, + UpdateDataSourceActionResponse::new), TransportUpdateDataSourceAction.class), + new ActionHandler<>(new ActionType<>(TransportDeleteDataSourceAction.NAME, + DeleteDataSourceActionResponse::new), TransportDeleteDataSourceAction.class)); } @Override @@ -155,22 +169,7 @@ public Collection createComponents( this.clusterService = clusterService; this.pluginSettings = new OpenSearchSettings(clusterService.getClusterSettings()); this.client = (NodeClient) client; - String masterKey = DataSourceSettings - .DATASOURCE_MASTER_SECRET_KEY.get(clusterService.getSettings()); - DataSourceMetadataStorage dataSourceMetadataStorage - = new OpenSearchDataSourceMetadataStorage(client, clusterService, - new EncryptorImpl(masterKey)); - DataSourceUserAuthorizationHelper dataSourceUserAuthorizationHelper - = new DataSourceUserAuthorizationHelperImpl(client); - this.dataSourceService = - new DataSourceServiceImpl( - new ImmutableSet.Builder() - .add(new OpenSearchDataSourceFactory( - new OpenSearchNodeClient(this.client), pluginSettings)) - .add(new PrometheusStorageFactory()) - .build(), - dataSourceMetadataStorage, - dataSourceUserAuthorizationHelper); + this.dataSourceService = createDataSourceService(); dataSourceService.createDataSource(defaultOpenSearchDataSourceMetadata()); LocalClusterState.state().setClusterService(clusterService); LocalClusterState.state().setPluginSettings((OpenSearchSettings) pluginSettings); @@ -213,4 +212,22 @@ public ScriptEngine getScriptEngine(Settings settings, Collection() + .add(new OpenSearchDataSourceFactory( + new OpenSearchNodeClient(this.client), pluginSettings)) + .add(new PrometheusStorageFactory()) + .build(), + dataSourceMetadataStorage, + dataSourceUserAuthorizationHelper); + } + } diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/datasource/OpenSearchDataSourceMetadataStorage.java b/plugin/src/main/java/org/opensearch/sql/plugin/datasource/OpenSearchDataSourceMetadataStorage.java index b3c433f7e6..7e2a2ad705 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/datasource/OpenSearchDataSourceMetadataStorage.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/datasource/OpenSearchDataSourceMetadataStorage.java @@ -20,23 +20,32 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.action.ActionFuture; +import org.opensearch.action.DocWriteRequest; import org.opensearch.action.DocWriteResponse; import org.opensearch.action.admin.indices.create.CreateIndexRequest; import org.opensearch.action.admin.indices.create.CreateIndexResponse; +import org.opensearch.action.delete.DeleteRequest; +import org.opensearch.action.delete.DeleteResponse; import org.opensearch.action.index.IndexRequest; import org.opensearch.action.index.IndexResponse; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.support.WriteRequest; +import org.opensearch.action.update.UpdateRequest; +import org.opensearch.action.update.UpdateResponse; import org.opensearch.client.Client; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.xcontent.XContentType; +import org.opensearch.index.engine.DocumentMissingException; +import org.opensearch.index.engine.VersionConflictEngineException; import org.opensearch.index.query.QueryBuilder; import org.opensearch.index.query.QueryBuilders; import org.opensearch.search.SearchHit; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.sql.common.encryptor.Encryptor; import org.opensearch.sql.datasource.DataSourceMetadataStorage; +import org.opensearch.sql.datasource.exceptions.DataSourceNotFoundException; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasource.model.auth.AuthenticationType; import org.opensearch.sql.plugin.utils.XContentParserUtils; @@ -45,6 +54,8 @@ public class OpenSearchDataSourceMetadataStorage implements DataSourceMetadataSt public static final String DATASOURCE_INDEX_NAME = ".ql-datasources"; private static final String DATASOURCE_INDEX_MAPPING_FILE_NAME = "datasources-index-mapping.yml"; + + private static final Integer DATASOURCE_QUERY_RESULT_SIZE = 10000; private static final String DATASOURCE_INDEX_SETTINGS_FILE_NAME = "datasources-index-settings.yml"; private static final Logger LOG = LogManager.getLogger(); @@ -96,28 +107,77 @@ public void createDataSourceMetadata(DataSourceMetadata dataSourceMetadata) { } IndexRequest indexRequest = new IndexRequest(DATASOURCE_INDEX_NAME); indexRequest.id(dataSourceMetadata.getName()); + indexRequest.opType(DocWriteRequest.OpType.CREATE); + indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); ActionFuture indexResponseActionFuture; + IndexResponse indexResponse; try (ThreadContext.StoredContext storedContext = client.threadPool().getThreadContext() .stashContext()) { indexRequest.source(XContentParserUtils.convertToXContent(dataSourceMetadata)); indexResponseActionFuture = client.index(indexRequest); + indexResponse = indexResponseActionFuture.actionGet(); + } catch (VersionConflictEngineException exception) { + throw new IllegalArgumentException("A datasource already exists with name: " + + dataSourceMetadata.getName()); } catch (Exception e) { throw new RuntimeException(e); } - IndexResponse indexResponse = indexResponseActionFuture.actionGet(); + if (indexResponse.getResult().equals(DocWriteResponse.Result.CREATED)) { LOG.debug("DatasourceMetadata : {} successfully created", dataSourceMetadata.getName()); + } else { + throw new RuntimeException("Saving dataSource metadata information failed with result : " + + indexResponse.getResult().getLowercase()); } } @Override public void updateDataSourceMetadata(DataSourceMetadata dataSourceMetadata) { - throw new UnsupportedOperationException("will be supported in future."); + encryptDecryptAuthenticationData(dataSourceMetadata, true); + UpdateRequest updateRequest + = new UpdateRequest(DATASOURCE_INDEX_NAME, dataSourceMetadata.getName()); + UpdateResponse updateResponse; + try (ThreadContext.StoredContext storedContext = client.threadPool().getThreadContext() + .stashContext()) { + updateRequest.doc(XContentParserUtils.convertToXContent(dataSourceMetadata)); + updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + ActionFuture updateResponseActionFuture + = client.update(updateRequest); + updateResponse = updateResponseActionFuture.actionGet(); + } catch (DocumentMissingException exception) { + throw new DataSourceNotFoundException("Datasource with name: " + + dataSourceMetadata.getName() + " doesn't exist"); + } catch (Exception e) { + throw new RuntimeException(e); + } + + if (updateResponse.getResult().equals(DocWriteResponse.Result.UPDATED)) { + LOG.debug("DatasourceMetadata : {} successfully updated", dataSourceMetadata.getName()); + } else { + throw new RuntimeException("Saving dataSource metadata information failed with result : " + + updateResponse.getResult().getLowercase()); + } } @Override public void deleteDataSourceMetadata(String datasourceName) { - throw new UnsupportedOperationException("will be supported in future."); + DeleteRequest deleteRequest = new DeleteRequest(DATASOURCE_INDEX_NAME); + deleteRequest.id(datasourceName); + ActionFuture deleteResponseActionFuture; + try (ThreadContext.StoredContext storedContext = client.threadPool().getThreadContext() + .stashContext()) { + deleteResponseActionFuture = client.delete(deleteRequest); + } + DeleteResponse deleteResponse = deleteResponseActionFuture.actionGet(); + if (deleteResponse.getResult().equals(DocWriteResponse.Result.DELETED)) { + LOG.debug("DatasourceMetadata : {} successfully deleted", datasourceName); + } else if (deleteResponse.getResult().equals(DocWriteResponse.Result.NOT_FOUND)) { + throw new DataSourceNotFoundException("Datasource with name: " + + datasourceName + " doesn't exist"); + } else { + throw new RuntimeException("Deleting dataSource metadata information failed with result : " + + deleteResponse.getResult().getLowercase()); + } } private void createDataSourcesIndex() { @@ -141,7 +201,7 @@ private void createDataSourcesIndex() { if (createIndexResponse.isAcknowledged()) { LOG.info("Index: {} creation Acknowledged", DATASOURCE_INDEX_NAME); } else { - throw new IllegalStateException( + throw new RuntimeException( String.format("Index: %s creation failed", DATASOURCE_INDEX_NAME)); } } catch (Throwable e) { @@ -156,6 +216,7 @@ private List searchInDataSourcesIndex(QueryBuilder query) { searchRequest.indices(DATASOURCE_INDEX_NAME); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.query(query); + searchSourceBuilder.size(DATASOURCE_QUERY_RESULT_SIZE); searchRequest.source(searchSourceBuilder); ActionFuture searchResponseActionFuture; try (ThreadContext.StoredContext storedContext = client.threadPool().getThreadContext() @@ -164,12 +225,12 @@ private List searchInDataSourcesIndex(QueryBuilder query) { } SearchResponse searchResponse = searchResponseActionFuture.actionGet(); if (searchResponse.status().getStatus() != 200) { - throw new RuntimeException( - "Internal server error while fetching datasource metadata information"); + throw new RuntimeException("Fetching dataSource metadata information failed with status : " + + searchResponse.status()); } else { List list = new ArrayList<>(); - for (SearchHit documentFields : searchResponse.getHits().getHits()) { - String sourceAsString = documentFields.getSourceAsString(); + for (SearchHit searchHit : searchResponse.getHits().getHits()) { + String sourceAsString = searchHit.getSourceAsString(); DataSourceMetadata dataSourceMetadata; try { dataSourceMetadata = XContentParserUtils.toDataSourceMetadata(sourceAsString); diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/model/DeleteDataSourceActionRequest.java b/plugin/src/main/java/org/opensearch/sql/plugin/model/DeleteDataSourceActionRequest.java new file mode 100644 index 0000000000..7377f25dcb --- /dev/null +++ b/plugin/src/main/java/org/opensearch/sql/plugin/model/DeleteDataSourceActionRequest.java @@ -0,0 +1,51 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.plugin.model; + +import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME; + +import java.io.IOException; +import lombok.Getter; +import org.apache.commons.lang3.StringUtils; +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.common.io.stream.StreamInput; + +public class DeleteDataSourceActionRequest extends ActionRequest { + + @Getter + private String dataSourceName; + + /** Constructor of DeleteDataSourceActionRequest from StreamInput. */ + public DeleteDataSourceActionRequest(StreamInput in) throws IOException { + super(in); + } + + public DeleteDataSourceActionRequest(String dataSourceName) { + this.dataSourceName = dataSourceName; + } + + @Override + public ActionRequestValidationException validate() { + if (StringUtils.isEmpty(this.dataSourceName)) { + ActionRequestValidationException exception = new ActionRequestValidationException(); + exception + .addValidationError("Datasource Name cannot be empty or null"); + return exception; + } else if (this.dataSourceName.equals(DEFAULT_DATASOURCE_NAME)) { + ActionRequestValidationException exception = new ActionRequestValidationException(); + exception + .addValidationError( + "Not allowed to delete datasource with name : " + DEFAULT_DATASOURCE_NAME); + return exception; + } else { + return null; + } + } + +} \ No newline at end of file diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/model/DeleteDataSourceActionResponse.java b/plugin/src/main/java/org/opensearch/sql/plugin/model/DeleteDataSourceActionResponse.java new file mode 100644 index 0000000000..3a2f136e31 --- /dev/null +++ b/plugin/src/main/java/org/opensearch/sql/plugin/model/DeleteDataSourceActionResponse.java @@ -0,0 +1,33 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.plugin.model; + +import java.io.IOException; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import org.opensearch.action.ActionResponse; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; + +@RequiredArgsConstructor +public class DeleteDataSourceActionResponse extends ActionResponse { + + @Getter + private final String result; + + public DeleteDataSourceActionResponse(StreamInput in) throws IOException { + super(in); + result = in.readString(); + } + + @Override + public void writeTo(StreamOutput streamOutput) throws IOException { + streamOutput.writeString(result); + } + +} \ No newline at end of file diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/model/GetDataSourceActionRequest.java b/plugin/src/main/java/org/opensearch/sql/plugin/model/GetDataSourceActionRequest.java new file mode 100644 index 0000000000..41b854888e --- /dev/null +++ b/plugin/src/main/java/org/opensearch/sql/plugin/model/GetDataSourceActionRequest.java @@ -0,0 +1,49 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.plugin.model; + +import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME; + +import java.io.IOException; +import lombok.Getter; +import lombok.NoArgsConstructor; +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.common.io.stream.StreamInput; + +@NoArgsConstructor +public class GetDataSourceActionRequest extends ActionRequest { + + @Getter + private String dataSourceName; + + /** + * Constructor of GetDataSourceActionRequest from StreamInput. + */ + public GetDataSourceActionRequest(StreamInput in) throws IOException { + super(in); + } + + public GetDataSourceActionRequest(String dataSourceName) { + this.dataSourceName = dataSourceName; + } + + @Override + public ActionRequestValidationException validate() { + if (this.dataSourceName != null && this.dataSourceName.equals(DEFAULT_DATASOURCE_NAME)) { + ActionRequestValidationException exception = new ActionRequestValidationException(); + exception + .addValidationError( + "Not allowed to fetch datasource with name : " + DEFAULT_DATASOURCE_NAME); + return exception; + } else { + return null; + } + } + +} \ No newline at end of file diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/model/GetDataSourceActionResponse.java b/plugin/src/main/java/org/opensearch/sql/plugin/model/GetDataSourceActionResponse.java new file mode 100644 index 0000000000..e419ad1e08 --- /dev/null +++ b/plugin/src/main/java/org/opensearch/sql/plugin/model/GetDataSourceActionResponse.java @@ -0,0 +1,33 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.plugin.model; + +import java.io.IOException; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import org.opensearch.action.ActionResponse; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; + +@RequiredArgsConstructor +public class GetDataSourceActionResponse extends ActionResponse { + + @Getter + private final String result; + + public GetDataSourceActionResponse(StreamInput in) throws IOException { + super(in); + result = in.readString(); + } + + @Override + public void writeTo(StreamOutput streamOutput) throws IOException { + streamOutput.writeString(result); + } + +} \ No newline at end of file diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/model/UpdateDataSourceActionRequest.java b/plugin/src/main/java/org/opensearch/sql/plugin/model/UpdateDataSourceActionRequest.java new file mode 100644 index 0000000000..73140c6b29 --- /dev/null +++ b/plugin/src/main/java/org/opensearch/sql/plugin/model/UpdateDataSourceActionRequest.java @@ -0,0 +1,49 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.plugin.model; + + +import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME; + +import java.io.IOException; +import lombok.Getter; +import org.json.JSONObject; +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.protocol.response.format.JsonResponseFormatter; + +public class UpdateDataSourceActionRequest + extends ActionRequest { + + @Getter + private DataSourceMetadata dataSourceMetadata; + + /** Constructor of UpdateDataSourceActionRequest from StreamInput. */ + public UpdateDataSourceActionRequest(StreamInput in) throws IOException { + super(in); + } + + public UpdateDataSourceActionRequest(DataSourceMetadata dataSourceMetadata) { + this.dataSourceMetadata = dataSourceMetadata; + } + + @Override + public ActionRequestValidationException validate() { + if (this.dataSourceMetadata.getName().equals(DEFAULT_DATASOURCE_NAME)) { + ActionRequestValidationException exception = new ActionRequestValidationException(); + exception + .addValidationError( + "Not allowed to update datasource with name : " + DEFAULT_DATASOURCE_NAME); + return exception; + } else { + return null; + } + } +} \ No newline at end of file diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/model/UpdateDataSourceActionResponse.java b/plugin/src/main/java/org/opensearch/sql/plugin/model/UpdateDataSourceActionResponse.java new file mode 100644 index 0000000000..f2ae4e6472 --- /dev/null +++ b/plugin/src/main/java/org/opensearch/sql/plugin/model/UpdateDataSourceActionResponse.java @@ -0,0 +1,33 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.plugin.model; + +import java.io.IOException; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import org.opensearch.action.ActionResponse; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; + +@RequiredArgsConstructor +public class UpdateDataSourceActionResponse + extends ActionResponse { + + @Getter + private final String result; + + public UpdateDataSourceActionResponse(StreamInput in) throws IOException { + super(in); + result = in.readString(); + } + + @Override + public void writeTo(StreamOutput streamOutput) throws IOException { + streamOutput.writeString(result); + } +} \ No newline at end of file diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestDataSourceQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestDataSourceQueryAction.java index 7314362e39..2468d004a9 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestDataSourceQueryAction.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestDataSourceQueryAction.java @@ -7,31 +7,45 @@ package org.opensearch.sql.plugin.rest; +import static org.opensearch.rest.RestRequest.Method.DELETE; +import static org.opensearch.rest.RestRequest.Method.GET; import static org.opensearch.rest.RestRequest.Method.POST; +import static org.opensearch.rest.RestRequest.Method.PUT; import static org.opensearch.rest.RestStatus.BAD_REQUEST; +import static org.opensearch.rest.RestStatus.NOT_FOUND; import static org.opensearch.rest.RestStatus.SERVICE_UNAVAILABLE; import static org.opensearch.sql.plugin.utils.Scheduler.schedule; import com.google.common.collect.ImmutableList; import java.io.IOException; import java.util.List; +import java.util.Locale; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.action.ActionListener; import org.opensearch.client.node.NodeClient; -import org.opensearch.index.IndexNotFoundException; import org.opensearch.rest.BaseRestHandler; import org.opensearch.rest.BytesRestResponse; import org.opensearch.rest.RestChannel; import org.opensearch.rest.RestRequest; import org.opensearch.rest.RestStatus; +import org.opensearch.sql.datasource.exceptions.DataSourceNotFoundException; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.legacy.metrics.MetricName; import org.opensearch.sql.legacy.metrics.Metrics; import org.opensearch.sql.opensearch.response.error.ErrorMessageFactory; import org.opensearch.sql.plugin.model.CreateDataSourceActionRequest; import org.opensearch.sql.plugin.model.CreateDataSourceActionResponse; +import org.opensearch.sql.plugin.model.DeleteDataSourceActionRequest; +import org.opensearch.sql.plugin.model.DeleteDataSourceActionResponse; +import org.opensearch.sql.plugin.model.GetDataSourceActionRequest; +import org.opensearch.sql.plugin.model.GetDataSourceActionResponse; +import org.opensearch.sql.plugin.model.UpdateDataSourceActionRequest; +import org.opensearch.sql.plugin.model.UpdateDataSourceActionResponse; import org.opensearch.sql.plugin.transport.datasource.TransportCreateDataSourceAction; +import org.opensearch.sql.plugin.transport.datasource.TransportDeleteDataSourceAction; +import org.opensearch.sql.plugin.transport.datasource.TransportGetDataSourceAction; +import org.opensearch.sql.plugin.transport.datasource.TransportUpdateDataSourceAction; import org.opensearch.sql.plugin.utils.XContentParserUtils; public class RestDataSourceQueryAction extends BaseRestHandler { @@ -59,7 +73,42 @@ public List routes() { * Response body: * Ref [org.opensearch.sql.plugin.transport.datasource.model.CreateDataSourceActionResponse] */ - new Route(POST, BASE_DATASOURCE_ACTION_URL) + new Route(POST, BASE_DATASOURCE_ACTION_URL), + + /* + * GET datasources + * Request URL: GET + * Request body: + * Ref [org.opensearch.sql.plugin.transport.datasource.model.GetDataSourceActionRequest] + * Response body: + * Ref [org.opensearch.sql.plugin.transport.datasource.model.GetDataSourceActionResponse] + */ + new Route(GET, String.format(Locale.ROOT, "%s/{%s}", + BASE_DATASOURCE_ACTION_URL, "dataSourceName")), + new Route(GET, BASE_DATASOURCE_ACTION_URL), + + /* + * GET datasources + * Request URL: GET + * Request body: + * Ref + * [org.opensearch.sql.plugin.transport.datasource.model.UpdateDataSourceActionRequest] + * Response body: + * Ref + * [org.opensearch.sql.plugin.transport.datasource.model.UpdateDataSourceActionResponse] + */ + new Route(PUT, BASE_DATASOURCE_ACTION_URL), + + /* + * GET datasources + * Request URL: GET + * Request body: Ref + * [org.opensearch.sql.plugin.transport.datasource.model.DeleteDataSourceActionRequest] + * Response body: Ref + * [org.opensearch.sql.plugin.transport.datasource.model.DeleteDataSourceActionResponse] + */ + new Route(DELETE, String.format(Locale.ROOT, "%s/{%s}", + BASE_DATASOURCE_ACTION_URL, "dataSourceName")) ); } @@ -69,6 +118,12 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient switch (restRequest.method()) { case POST: return executePostRequest(restRequest, nodeClient); + case GET: + return executeGetRequest(restRequest, nodeClient); + case PUT: + return executeUpdateRequest(restRequest, nodeClient); + case DELETE: + return executeDeleteRequest(restRequest, nodeClient); default: return restChannel -> restChannel.sendResponse(new BytesRestResponse(RestStatus.METHOD_NOT_ALLOWED, @@ -89,32 +144,103 @@ private RestChannelConsumer executePostRequest(RestRequest restRequest, public void onResponse( CreateDataSourceActionResponse createDataSourceActionResponse) { restChannel.sendResponse( - new BytesRestResponse(RestStatus.OK, "application/json; charset=UTF-8", + new BytesRestResponse(RestStatus.CREATED, "application/json; charset=UTF-8", createDataSourceActionResponse.getResult())); } @Override public void onFailure(Exception e) { - if (e instanceof IllegalAccessException) { - reportError(restChannel, e, BAD_REQUEST); - } else { - LOG.error("Error happened during query handling", e); - if (isClientError(e)) { - Metrics.getInstance() - .getNumericalMetric(MetricName.DATASOURCE_FAILED_REQ_COUNT_CUS) - .increment(); - reportError(restChannel, e, BAD_REQUEST); - } else { - Metrics.getInstance() - .getNumericalMetric(MetricName.DATASOURCE_FAILED_REQ_COUNT_SYS) - .increment(); - reportError(restChannel, e, SERVICE_UNAVAILABLE); - } - } + handleException(e, restChannel); + } + })); + } + + private RestChannelConsumer executeGetRequest(RestRequest restRequest, + NodeClient nodeClient) { + String dataSourceName = restRequest.param("dataSourceName"); + return restChannel -> schedule(nodeClient, + () -> nodeClient.execute(TransportGetDataSourceAction.ACTION_TYPE, + new GetDataSourceActionRequest(dataSourceName), + new ActionListener<>() { + @Override + public void onResponse(GetDataSourceActionResponse getDataSourceActionResponse) { + restChannel.sendResponse( + new BytesRestResponse(RestStatus.OK, "application/json; charset=UTF-8", + getDataSourceActionResponse.getResult())); + } + + @Override + public void onFailure(Exception e) { + handleException(e, restChannel); + } + })); + } + + private RestChannelConsumer executeUpdateRequest(RestRequest restRequest, + NodeClient nodeClient) throws IOException { + DataSourceMetadata dataSourceMetadata + = XContentParserUtils.toDataSourceMetadata(restRequest.contentParser()); + return restChannel -> schedule(nodeClient, + () -> nodeClient.execute(TransportUpdateDataSourceAction.ACTION_TYPE, + new UpdateDataSourceActionRequest(dataSourceMetadata), + new ActionListener<>() { + @Override + public void onResponse( + UpdateDataSourceActionResponse updateDataSourceActionResponse) { + restChannel.sendResponse( + new BytesRestResponse(RestStatus.OK, "application/json; charset=UTF-8", + updateDataSourceActionResponse.getResult())); + } + + @Override + public void onFailure(Exception e) { + handleException(e, restChannel); + } + })); + } + + private RestChannelConsumer executeDeleteRequest(RestRequest restRequest, + NodeClient nodeClient) { + + String dataSourceName = restRequest.param("dataSourceName"); + return restChannel -> schedule(nodeClient, + () -> nodeClient.execute(TransportDeleteDataSourceAction.ACTION_TYPE, + new DeleteDataSourceActionRequest(dataSourceName), + new ActionListener<>() { + @Override + public void onResponse( + DeleteDataSourceActionResponse deleteDataSourceActionResponse) { + restChannel.sendResponse( + new BytesRestResponse(RestStatus.NO_CONTENT, "application/json; charset=UTF-8", + deleteDataSourceActionResponse.getResult())); + } + + @Override + public void onFailure(Exception e) { + handleException(e, restChannel); } })); } + private void handleException(Exception e, RestChannel restChannel) { + if (e instanceof DataSourceNotFoundException) { + reportError(restChannel, e, NOT_FOUND); + } else { + LOG.error("Error happened during request handling", e); + if (isClientError(e)) { + Metrics.getInstance() + .getNumericalMetric(MetricName.DATASOURCE_FAILED_REQ_COUNT_CUS) + .increment(); + reportError(restChannel, e, BAD_REQUEST); + } else { + Metrics.getInstance() + .getNumericalMetric(MetricName.DATASOURCE_FAILED_REQ_COUNT_SYS) + .increment(); + reportError(restChannel, e, SERVICE_UNAVAILABLE); + } + } + } + private void reportError(final RestChannel channel, final Exception e, final RestStatus status) { channel.sendResponse( new BytesRestResponse( @@ -124,8 +250,7 @@ private void reportError(final RestChannel channel, final Exception e, final Res private static boolean isClientError(Exception e) { return e instanceof NullPointerException // NPE is hard to differentiate but more likely caused by bad query - || e instanceof IllegalArgumentException - || e instanceof IndexNotFoundException; + || e instanceof IllegalArgumentException; } } \ No newline at end of file diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/transport/datasource/TransportCreateDataSourceAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/transport/datasource/TransportCreateDataSourceAction.java index 006837c256..05d724ba2a 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/transport/datasource/TransportCreateDataSourceAction.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/transport/datasource/TransportCreateDataSourceAction.java @@ -30,7 +30,7 @@ public class TransportCreateDataSourceAction extends HandledTransportAction { private static final Logger LOG = LogManager.getLogger(); - public static final String NAME = "cluster:admin/opensearch/datasources/create"; + public static final String NAME = "cluster:admin/opensearch/ql/datasources/create"; public static final ActionType ACTION_TYPE = new ActionType<>(NAME, CreateDataSourceActionResponse::new); @@ -61,13 +61,10 @@ protected void doExecute(Task task, CreateDataSourceActionRequest request, ActionListener actionListener) { Metrics.getInstance().getNumericalMetric(MetricName.DATASOURCE_REQ_COUNT).increment(); - actionListener.onResponse(execute(request.getDataSourceMetadata())); - } - - private CreateDataSourceActionResponse execute(DataSourceMetadata dataSourceMetadata) { + DataSourceMetadata dataSourceMetadata = request.getDataSourceMetadata(); dataSourceService.createDataSource(dataSourceMetadata); - return new CreateDataSourceActionResponse("Created DataSource with name " - + dataSourceMetadata.getName()); + actionListener.onResponse(new CreateDataSourceActionResponse("Created DataSource with name " + + dataSourceMetadata.getName())); } } \ No newline at end of file diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/transport/datasource/TransportDeleteDataSourceAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/transport/datasource/TransportDeleteDataSourceAction.java new file mode 100644 index 0000000000..9557daee4e --- /dev/null +++ b/plugin/src/main/java/org/opensearch/sql/plugin/transport/datasource/TransportDeleteDataSourceAction.java @@ -0,0 +1,65 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.plugin.transport.datasource; + +import org.opensearch.action.ActionListener; +import org.opensearch.action.ActionType; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.client.Client; +import org.opensearch.client.node.NodeClient; +import org.opensearch.common.inject.Inject; +import org.opensearch.sql.datasource.DataSourceService; +import org.opensearch.sql.datasource.DataSourceServiceImpl; +import org.opensearch.sql.legacy.metrics.MetricName; +import org.opensearch.sql.legacy.metrics.Metrics; +import org.opensearch.sql.opensearch.security.SecurityAccess; +import org.opensearch.sql.plugin.model.DeleteDataSourceActionRequest; +import org.opensearch.sql.plugin.model.DeleteDataSourceActionResponse; +import org.opensearch.tasks.Task; +import org.opensearch.transport.TransportService; + +public class TransportDeleteDataSourceAction + extends HandledTransportAction { + + public static final String NAME = "cluster:admin/opensearch/ql/datasources/delete"; + public static final ActionType + ACTION_TYPE = new ActionType<>(NAME, DeleteDataSourceActionResponse::new); + + private DataSourceService dataSourceService; + private Client client; + + /** + * TransportDeleteDataSourceAction action for deleting datasource. + * + * @param transportService transportService. + * @param actionFilters actionFilters. + * @param client client. + * @param dataSourceService dataSourceService. + */ + @Inject + public TransportDeleteDataSourceAction(TransportService transportService, + ActionFilters actionFilters, + NodeClient client, + DataSourceServiceImpl dataSourceService) { + super(TransportDeleteDataSourceAction.NAME, transportService, actionFilters, + DeleteDataSourceActionRequest::new); + this.client = client; + this.dataSourceService = dataSourceService; + } + + @Override + protected void doExecute(Task task, DeleteDataSourceActionRequest request, + ActionListener actionListener) { + Metrics.getInstance().getNumericalMetric(MetricName.DATASOURCE_REQ_COUNT).increment(); + dataSourceService.deleteDataSource(request.getDataSourceName()); + actionListener.onResponse(new DeleteDataSourceActionResponse("Deleted DataSource with name " + + request.getDataSourceName())); + } + +} \ No newline at end of file diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/transport/datasource/TransportGetDataSourceAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/transport/datasource/TransportGetDataSourceAction.java new file mode 100644 index 0000000000..179535ff9f --- /dev/null +++ b/plugin/src/main/java/org/opensearch/sql/plugin/transport/datasource/TransportGetDataSourceAction.java @@ -0,0 +1,101 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.plugin.transport.datasource; + +import static org.opensearch.sql.protocol.response.format.JsonResponseFormatter.Style.PRETTY; + +import java.util.Set; +import org.opensearch.action.ActionListener; +import org.opensearch.action.ActionType; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.client.Client; +import org.opensearch.client.node.NodeClient; +import org.opensearch.common.inject.Inject; +import org.opensearch.sql.datasource.DataSourceService; +import org.opensearch.sql.datasource.DataSourceServiceImpl; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.plugin.model.GetDataSourceActionRequest; +import org.opensearch.sql.plugin.model.GetDataSourceActionResponse; +import org.opensearch.sql.protocol.response.format.JsonResponseFormatter; +import org.opensearch.tasks.Task; +import org.opensearch.transport.TransportService; + +public class TransportGetDataSourceAction + extends HandledTransportAction { + + public static final String NAME = "cluster:admin/opensearch/ql/datasources/read"; + public static final ActionType + ACTION_TYPE = new ActionType<>(NAME, GetDataSourceActionResponse::new); + + private DataSourceService dataSourceService; + private Client client; + + /** + * TransportGetDataSourceAction action for getting datasource. + * + * @param transportService transportService. + * @param actionFilters actionFilters. + * @param client client. + * @param dataSourceService dataSourceService. + */ + @Inject + public TransportGetDataSourceAction(TransportService transportService, + ActionFilters actionFilters, + NodeClient client, + DataSourceServiceImpl dataSourceService) { + super(TransportGetDataSourceAction.NAME, transportService, actionFilters, + GetDataSourceActionRequest::new); + this.client = client; + this.dataSourceService = dataSourceService; + } + + @Override + protected void doExecute(Task task, GetDataSourceActionRequest request, + ActionListener actionListener) { + try { + String responseContent; + if (request.getDataSourceName() == null) { + responseContent = handleGetAllDataSourcesRequest(); + + } else { + responseContent = handleSingleDataSourceRequest(request.getDataSourceName()); + } + actionListener.onResponse(new GetDataSourceActionResponse(responseContent)); + } catch (Exception e) { + actionListener.onFailure(e); + } + } + + private String handleGetAllDataSourcesRequest() { + String responseContent; + Set dataSourceMetadataSet = + dataSourceService.getDataSourceMetadata(false); + responseContent = new JsonResponseFormatter>(PRETTY) { + @Override + protected Object buildJsonObject(Set response) { + return response; + } + }.format(dataSourceMetadataSet); + return responseContent; + } + + private String handleSingleDataSourceRequest(String datasourceName) { + String responseContent; + DataSourceMetadata dataSourceMetadata + = dataSourceService + .getDataSourceMetadata(datasourceName); + responseContent = new JsonResponseFormatter(PRETTY) { + @Override + protected Object buildJsonObject(DataSourceMetadata response) { + return response; + } + }.format(dataSourceMetadata); + return responseContent; + } +} \ No newline at end of file diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/transport/datasource/TransportUpdateDataSourceAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/transport/datasource/TransportUpdateDataSourceAction.java new file mode 100644 index 0000000000..dfc7311e77 --- /dev/null +++ b/plugin/src/main/java/org/opensearch/sql/plugin/transport/datasource/TransportUpdateDataSourceAction.java @@ -0,0 +1,69 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.plugin.transport.datasource; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.ActionListener; +import org.opensearch.action.ActionType; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.client.Client; +import org.opensearch.client.node.NodeClient; +import org.opensearch.common.inject.Inject; +import org.opensearch.sql.datasource.DataSourceService; +import org.opensearch.sql.datasource.DataSourceServiceImpl; +import org.opensearch.sql.legacy.metrics.MetricName; +import org.opensearch.sql.legacy.metrics.Metrics; +import org.opensearch.sql.opensearch.security.SecurityAccess; +import org.opensearch.sql.plugin.model.UpdateDataSourceActionRequest; +import org.opensearch.sql.plugin.model.UpdateDataSourceActionResponse; +import org.opensearch.tasks.Task; +import org.opensearch.transport.TransportService; + +public class TransportUpdateDataSourceAction + extends HandledTransportAction { + + private static final Logger LOG = LogManager.getLogger(); + public static final String NAME = "cluster:admin/opensearch/ql/datasources/update"; + public static final ActionType + ACTION_TYPE = new ActionType<>(NAME, UpdateDataSourceActionResponse::new); + + private DataSourceService dataSourceService; + private Client client; + + /** + * TransportUpdateDataSourceAction action for updating datasource. + * + * @param transportService transportService. + * @param actionFilters actionFilters. + * @param client client. + * @param dataSourceService dataSourceService. + */ + @Inject + public TransportUpdateDataSourceAction(TransportService transportService, + ActionFilters actionFilters, + NodeClient client, + DataSourceServiceImpl dataSourceService) { + super(TransportUpdateDataSourceAction.NAME, transportService, actionFilters, + UpdateDataSourceActionRequest::new); + this.dataSourceService = dataSourceService; + this.client = client; + } + + @Override + protected void doExecute(Task task, UpdateDataSourceActionRequest request, + ActionListener actionListener) { + + Metrics.getInstance().getNumericalMetric(MetricName.DATASOURCE_REQ_COUNT).increment(); + dataSourceService.updateDataSource(request.getDataSourceMetadata()); + actionListener.onResponse(new UpdateDataSourceActionResponse("Updated DataSource with name " + + request.getDataSourceMetadata().getName())); + } + +} \ No newline at end of file diff --git a/plugin/src/test/java/org/opensearch/sql/plugin/datasource/OpenSearchDataSourceMetadataStorageTest.java b/plugin/src/test/java/org/opensearch/sql/plugin/datasource/OpenSearchDataSourceMetadataStorageTest.java index 140d4e0edd..471cb7f9db 100644 --- a/plugin/src/test/java/org/opensearch/sql/plugin/datasource/OpenSearchDataSourceMetadataStorageTest.java +++ b/plugin/src/test/java/org/opensearch/sql/plugin/datasource/OpenSearchDataSourceMetadataStorageTest.java @@ -7,10 +7,10 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; import static org.opensearch.sql.plugin.datasource.OpenSearchDataSourceMetadataStorage.DATASOURCE_INDEX_NAME; @@ -29,8 +29,12 @@ import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; import org.opensearch.action.ActionFuture; +import org.opensearch.action.DocWriteResponse; import org.opensearch.action.admin.indices.create.CreateIndexResponse; +import org.opensearch.action.delete.DeleteResponse; +import org.opensearch.action.index.IndexResponse; import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.update.UpdateResponse; import org.opensearch.client.Client; import org.opensearch.cluster.service.ClusterService; import org.opensearch.rest.RestStatus; @@ -58,6 +62,18 @@ public class OpenSearchDataSourceMetadataStorageTest { @Mock private ActionFuture createIndexResponseActionFuture; @Mock + private ActionFuture indexResponseActionFuture; + @Mock + private IndexResponse indexResponse; + @Mock + private ActionFuture updateResponseActionFuture; + @Mock + private UpdateResponse updateResponse; + @Mock + private ActionFuture deleteResponseActionFuture; + @Mock + private DeleteResponse deleteResponse; + @Mock private SearchHit searchHit; @InjectMocks private OpenSearchDataSourceMetadataStorage openSearchDataSourceMetadataStorage; @@ -144,6 +160,9 @@ public void testCreateDataSourceMetadata() { .thenReturn(createIndexResponseActionFuture); when(createIndexResponseActionFuture.actionGet()) .thenReturn(new CreateIndexResponse(true, true, DATASOURCE_INDEX_NAME)); + when(client.index(any())).thenReturn(indexResponseActionFuture); + when(indexResponseActionFuture.actionGet()).thenReturn(indexResponse); + when(indexResponse.getResult()).thenReturn(DocWriteResponse.Result.CREATED); DataSourceMetadata dataSourceMetadata = getDataSourceMetadata(); this.openSearchDataSourceMetadataStorage.createDataSourceMetadata(dataSourceMetadata); @@ -159,18 +178,38 @@ public void testCreateDataSourceMetadata() { @Test public void testUpdateDataSourceMetadata() { - assertThrows( - UnsupportedOperationException.class, - () -> openSearchDataSourceMetadataStorage - .updateDataSourceMetadata(new DataSourceMetadata())); + when(clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) + .thenReturn(Boolean.TRUE); + when(encryptor.encrypt("secret_key")).thenReturn("secret_key"); + when(encryptor.encrypt("access_key")).thenReturn("access_key"); + when(client.update(any())).thenReturn(updateResponseActionFuture); + when(updateResponseActionFuture.actionGet()).thenReturn(updateResponse); + when(updateResponse.getResult()).thenReturn(DocWriteResponse.Result.UPDATED); + DataSourceMetadata dataSourceMetadata = getDataSourceMetadata(); + + this.openSearchDataSourceMetadataStorage.updateDataSourceMetadata(dataSourceMetadata); + + verify(encryptor, times(1)).encrypt("secret_key"); + verify(encryptor, times(1)).encrypt("access_key"); + verify(client.admin().indices(), times(0)).create(any()); + verify(client, times(1)).update(any()); + verify(client.threadPool().getThreadContext(), times(1)).stashContext(); + } @Test public void testDeleteDataSourceMetadata() { - assertThrows( - UnsupportedOperationException.class, - () -> openSearchDataSourceMetadataStorage - .deleteDataSourceMetadata(TEST_DATASOURCE_INDEX_NAME)); + when(client.delete(any())).thenReturn(deleteResponseActionFuture); + when(deleteResponseActionFuture.actionGet()).thenReturn(deleteResponse); + when(deleteResponse.getResult()).thenReturn(DocWriteResponse.Result.DELETED); + DataSourceMetadata dataSourceMetadata = getDataSourceMetadata(); + + this.openSearchDataSourceMetadataStorage.deleteDataSourceMetadata("testDS"); + + verifyNoInteractions(encryptor); + verify(client.admin().indices(), times(0)).create(any()); + verify(client, times(1)).delete(any()); + verify(client.threadPool().getThreadContext(), times(1)).stashContext(); } private String getBasicDataSourceMetadataString() throws JsonProcessingException { diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactory.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactory.java index dbc753f1f5..bb95ca93a2 100644 --- a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactory.java +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactory.java @@ -39,6 +39,8 @@ public class PrometheusStorageFactory implements DataSourceFactory { public static final String ACCESS_KEY = "prometheus.auth.access_key"; public static final String SECRET_KEY = "prometheus.auth.secret_key"; + private static final Integer MAX_LENGTH_FOR_CONFIG_PROPERTY = 1000; + @Override public DataSourceType getDataSourceType() { return DataSourceType.PROMETHEUS; @@ -52,8 +54,24 @@ public DataSource createDataSource(DataSourceMetadata metadata) { getStorageEngine(metadata.getName(), metadata.getProperties())); } + + private void validateDataSourceConfigProperties(Map dataSourceMetadataConfig) { + if (dataSourceMetadataConfig.get(AUTH_TYPE) != null) { + AuthenticationType authenticationType + = AuthenticationType.get(dataSourceMetadataConfig.get(AUTH_TYPE)); + if (AuthenticationType.BASICAUTH.equals(authenticationType)) { + validateFields(dataSourceMetadataConfig, Set.of(URI, USERNAME, PASSWORD)); + } else if (AuthenticationType.AWSSIGV4AUTH.equals(authenticationType)) { + validateFields(dataSourceMetadataConfig, Set.of(URI, ACCESS_KEY, SECRET_KEY, + REGION)); + } + } else { + validateFields(dataSourceMetadataConfig, Set.of(URI)); + } + } + StorageEngine getStorageEngine(String catalogName, Map requiredConfig) { - validateFieldsInConfig(requiredConfig, Set.of(URI)); + validateDataSourceConfigProperties(requiredConfig); PrometheusClient prometheusClient; prometheusClient = AccessController.doPrivileged((PrivilegedAction) () -> { @@ -76,11 +94,9 @@ private OkHttpClient getHttpClient(Map config) { if (config.get(AUTH_TYPE) != null) { AuthenticationType authenticationType = AuthenticationType.get(config.get(AUTH_TYPE)); if (AuthenticationType.BASICAUTH.equals(authenticationType)) { - validateFieldsInConfig(config, Set.of(USERNAME, PASSWORD)); okHttpClient.addInterceptor(new BasicAuthenticationInterceptor(config.get(USERNAME), config.get(PASSWORD))); } else if (AuthenticationType.AWSSIGV4AUTH.equals(authenticationType)) { - validateFieldsInConfig(config, Set.of(REGION, ACCESS_KEY, SECRET_KEY)); okHttpClient.addInterceptor(new AwsSigningInterceptor( new AWSStaticCredentialsProvider( new BasicAWSCredentials(config.get(ACCESS_KEY), config.get(SECRET_KEY))), @@ -94,17 +110,29 @@ private OkHttpClient getHttpClient(Map config) { return okHttpClient.build(); } - private void validateFieldsInConfig(Map config, Set fields) { + private void validateFields(Map config, Set fields) { Set missingFields = new HashSet<>(); + Set invalidLengthFields = new HashSet<>(); for (String field : fields) { if (!config.containsKey(field)) { missingFields.add(field); + } else if (config.get(field).length() > MAX_LENGTH_FOR_CONFIG_PROPERTY) { + invalidLengthFields.add(field); } } + StringBuilder errorStringBuilder = new StringBuilder(); if (missingFields.size() > 0) { - throw new IllegalArgumentException(String.format( + errorStringBuilder.append(String.format( "Missing %s fields in the Prometheus connector properties.", missingFields)); } + + if (invalidLengthFields.size() > 0) { + errorStringBuilder.append(String.format( + "Fields %s exceeds more than 1000 characters.", invalidLengthFields)); + } + if (errorStringBuilder.length() > 0) { + throw new IllegalArgumentException(errorStringBuilder.toString()); + } } diff --git a/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactoryTest.java b/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactoryTest.java index 91cb8df1ea..36f7e5b5f1 100644 --- a/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactoryTest.java +++ b/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactoryTest.java @@ -9,6 +9,7 @@ import java.util.HashMap; import lombok.SneakyThrows; +import org.apache.commons.lang3.RandomStringUtils; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -90,6 +91,24 @@ void testGetStorageEngineWithMissingRegionInAWS() { exception.getMessage()); } + + @Test + @SneakyThrows + void testGetStorageEngineWithLongConfigProperties() { + PrometheusStorageFactory prometheusStorageFactory = new PrometheusStorageFactory(); + HashMap properties = new HashMap<>(); + properties.put("prometheus.uri", RandomStringUtils.random(1001)); + properties.put("prometheus.auth.type", "awssigv4"); + properties.put("prometheus.auth.secret_key", "accessKey"); + properties.put("prometheus.auth.access_key", "secretKey"); + IllegalArgumentException exception = Assertions.assertThrows(IllegalArgumentException.class, + () -> prometheusStorageFactory.getStorageEngine("my_prometheus", properties)); + Assertions.assertEquals("Missing [prometheus.auth.region] fields in the " + + "Prometheus connector properties." + + "Fields [prometheus.uri] exceeds more than 1000 characters.", + exception.getMessage()); + } + @Test @SneakyThrows void testGetStorageEngineWithWrongAuthType() {