From a5c6b314fa023be69a2321b75a75859c7de29ae5 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Tue, 29 Aug 2023 19:01:55 -0700 Subject: [PATCH 1/5] Add _primary preference only for segment replication enabled indices Signed-off-by: Suraj Singh --- .../storage/OpenSearchDataSourceMetadataStorage.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorage.java b/datasources/src/main/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorage.java index 3de924dcbb..dfe16aba6e 100644 --- a/datasources/src/main/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorage.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorage.java @@ -32,6 +32,7 @@ import org.opensearch.action.update.UpdateRequest; import org.opensearch.action.update.UpdateResponse; import org.opensearch.client.Client; +import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.action.ActionFuture; import org.opensearch.common.util.concurrent.ThreadContext; @@ -84,7 +85,7 @@ public List getDataSourceMetadata() { createDataSourcesIndex(); return Collections.emptyList(); } - return searchInDataSourcesIndex(QueryBuilders.matchAllQuery()); + return searchInDataSourcesIndex(this.clusterService.state(), QueryBuilders.matchAllQuery()); } @Override @@ -93,7 +94,7 @@ public Optional getDataSourceMetadata(String datasourceName) createDataSourcesIndex(); return Optional.empty(); } - return searchInDataSourcesIndex(QueryBuilders.termQuery("name", datasourceName)).stream() + return searchInDataSourcesIndex(this.clusterService.state(), QueryBuilders.termQuery("name", datasourceName)).stream() .findFirst() .map(x -> this.encryptDecryptAuthenticationData(x, false)); } @@ -217,13 +218,17 @@ private void createDataSourcesIndex() { } } - private List searchInDataSourcesIndex(QueryBuilder query) { + private List searchInDataSourcesIndex(ClusterState state, QueryBuilder query) { SearchRequest searchRequest = new SearchRequest(); searchRequest.indices(DATASOURCE_INDEX_NAME); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.query(query); searchSourceBuilder.size(DATASOURCE_QUERY_RESULT_SIZE); searchRequest.source(searchSourceBuilder); + if (state.isSegmentReplicationEnabled(DATASOURCE_INDEX_NAME)) { + // https://github.com/opensearch-project/sql/issues/1801. + searchRequest.preference("_primary"); + } ActionFuture searchResponseActionFuture; try (ThreadContext.StoredContext ignored = client.threadPool().getThreadContext().stashContext()) { From 83df30447d750e5469a8a776ec9d090968b66a3c Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Wed, 30 Aug 2023 09:41:13 -0700 Subject: [PATCH 2/5] Add test for segment replication tests Signed-off-by: Suraj Singh --- ...enSearchDataSourceMetadataStorageTest.java | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/datasources/src/test/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorageTest.java b/datasources/src/test/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorageTest.java index 7d41737b2d..7c0b517e4b 100644 --- a/datasources/src/test/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorageTest.java +++ b/datasources/src/test/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorageTest.java @@ -32,6 +32,7 @@ import org.opensearch.action.search.SearchResponse; import org.opensearch.action.update.UpdateResponse; import org.opensearch.client.Client; +import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.action.ActionFuture; import org.opensearch.core.index.shard.ShardId; @@ -103,6 +104,39 @@ public void testGetDataSourceMetadata() { "basicauth", dataSourceMetadata.getProperties().get("prometheus.auth.type")); } + @SneakyThrows + @Test + public void testGetDataSourceMetadataWithSegRepEnabled() { + Mockito.when(clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) + .thenReturn(true); + Mockito.when(clusterService.state().isSegmentReplicationEnabled(DATASOURCE_INDEX_NAME)) + .thenReturn(true); + Mockito.when(client.search(ArgumentMatchers.any())).thenReturn(searchResponseActionFuture); + Mockito.when(searchResponseActionFuture.actionGet()).thenReturn(searchResponse); + Mockito.when(searchResponse.status()).thenReturn(RestStatus.OK); + Mockito.when(searchResponse.getHits()) + .thenReturn( + new SearchHits( + new SearchHit[] {searchHit}, new TotalHits(21, TotalHits.Relation.EQUAL_TO), 1.0F)); + Mockito.when(searchHit.getSourceAsString()).thenReturn(getBasicDataSourceMetadataString()); + Mockito.when(encryptor.decrypt("password")).thenReturn("password"); + Mockito.when(encryptor.decrypt("username")).thenReturn("username"); + + Optional dataSourceMetadataOptional = + openSearchDataSourceMetadataStorage.getDataSourceMetadata(TEST_DATASOURCE_INDEX_NAME); + + Assertions.assertFalse(dataSourceMetadataOptional.isEmpty()); + DataSourceMetadata dataSourceMetadata = dataSourceMetadataOptional.get(); + Assertions.assertEquals(TEST_DATASOURCE_INDEX_NAME, dataSourceMetadata.getName()); + Assertions.assertEquals(DataSourceType.PROMETHEUS, dataSourceMetadata.getConnector()); + Assertions.assertEquals( + "password", dataSourceMetadata.getProperties().get("prometheus.auth.password")); + Assertions.assertEquals( + "username", dataSourceMetadata.getProperties().get("prometheus.auth.username")); + Assertions.assertEquals( + "basicauth", dataSourceMetadata.getProperties().get("prometheus.auth.type")); + } + @SneakyThrows @Test public void testGetDataSourceMetadataWith404SearchResponse() { From 02825b3f886738d3d683e05807c4122682b8a9cb Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Wed, 30 Aug 2023 13:15:14 -0700 Subject: [PATCH 3/5] Remove isSegRepEnabled check and use _primary_first preference Signed-off-by: Suraj Singh --- .../OpenSearchDataSourceMetadataStorage.java | 6 ++-- ...enSearchDataSourceMetadataStorageTest.java | 33 ------------------- 2 files changed, 2 insertions(+), 37 deletions(-) diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorage.java b/datasources/src/main/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorage.java index dfe16aba6e..fb56e47233 100644 --- a/datasources/src/main/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorage.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorage.java @@ -225,10 +225,8 @@ private List searchInDataSourcesIndex(ClusterState state, Qu searchSourceBuilder.query(query); searchSourceBuilder.size(DATASOURCE_QUERY_RESULT_SIZE); searchRequest.source(searchSourceBuilder); - if (state.isSegmentReplicationEnabled(DATASOURCE_INDEX_NAME)) { - // https://github.com/opensearch-project/sql/issues/1801. - searchRequest.preference("_primary"); - } + // https://github.com/opensearch-project/sql/issues/1801. + searchRequest.preference("_primary_first"); ActionFuture searchResponseActionFuture; try (ThreadContext.StoredContext ignored = client.threadPool().getThreadContext().stashContext()) { diff --git a/datasources/src/test/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorageTest.java b/datasources/src/test/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorageTest.java index 7c0b517e4b..7bca9d201c 100644 --- a/datasources/src/test/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorageTest.java +++ b/datasources/src/test/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorageTest.java @@ -104,39 +104,6 @@ public void testGetDataSourceMetadata() { "basicauth", dataSourceMetadata.getProperties().get("prometheus.auth.type")); } - @SneakyThrows - @Test - public void testGetDataSourceMetadataWithSegRepEnabled() { - Mockito.when(clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) - .thenReturn(true); - Mockito.when(clusterService.state().isSegmentReplicationEnabled(DATASOURCE_INDEX_NAME)) - .thenReturn(true); - Mockito.when(client.search(ArgumentMatchers.any())).thenReturn(searchResponseActionFuture); - Mockito.when(searchResponseActionFuture.actionGet()).thenReturn(searchResponse); - Mockito.when(searchResponse.status()).thenReturn(RestStatus.OK); - Mockito.when(searchResponse.getHits()) - .thenReturn( - new SearchHits( - new SearchHit[] {searchHit}, new TotalHits(21, TotalHits.Relation.EQUAL_TO), 1.0F)); - Mockito.when(searchHit.getSourceAsString()).thenReturn(getBasicDataSourceMetadataString()); - Mockito.when(encryptor.decrypt("password")).thenReturn("password"); - Mockito.when(encryptor.decrypt("username")).thenReturn("username"); - - Optional dataSourceMetadataOptional = - openSearchDataSourceMetadataStorage.getDataSourceMetadata(TEST_DATASOURCE_INDEX_NAME); - - Assertions.assertFalse(dataSourceMetadataOptional.isEmpty()); - DataSourceMetadata dataSourceMetadata = dataSourceMetadataOptional.get(); - Assertions.assertEquals(TEST_DATASOURCE_INDEX_NAME, dataSourceMetadata.getName()); - Assertions.assertEquals(DataSourceType.PROMETHEUS, dataSourceMetadata.getConnector()); - Assertions.assertEquals( - "password", dataSourceMetadata.getProperties().get("prometheus.auth.password")); - Assertions.assertEquals( - "username", dataSourceMetadata.getProperties().get("prometheus.auth.username")); - Assertions.assertEquals( - "basicauth", dataSourceMetadata.getProperties().get("prometheus.auth.type")); - } - @SneakyThrows @Test public void testGetDataSourceMetadataWith404SearchResponse() { From 1980ddaea65d6900b54b2a449ca2dccaa1fd695a Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Wed, 30 Aug 2023 13:18:15 -0700 Subject: [PATCH 4/5] Remove unused ClusterState reference Signed-off-by: Suraj Singh --- .../storage/OpenSearchDataSourceMetadataStorage.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorage.java b/datasources/src/main/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorage.java index fb56e47233..5f5e087ce0 100644 --- a/datasources/src/main/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorage.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorage.java @@ -32,7 +32,6 @@ import org.opensearch.action.update.UpdateRequest; import org.opensearch.action.update.UpdateResponse; import org.opensearch.client.Client; -import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.action.ActionFuture; import org.opensearch.common.util.concurrent.ThreadContext; @@ -85,7 +84,7 @@ public List getDataSourceMetadata() { createDataSourcesIndex(); return Collections.emptyList(); } - return searchInDataSourcesIndex(this.clusterService.state(), QueryBuilders.matchAllQuery()); + return searchInDataSourcesIndex(QueryBuilders.matchAllQuery()); } @Override @@ -94,7 +93,7 @@ public Optional getDataSourceMetadata(String datasourceName) createDataSourcesIndex(); return Optional.empty(); } - return searchInDataSourcesIndex(this.clusterService.state(), QueryBuilders.termQuery("name", datasourceName)).stream() + return searchInDataSourcesIndex(QueryBuilders.termQuery("name", datasourceName)).stream() .findFirst() .map(x -> this.encryptDecryptAuthenticationData(x, false)); } @@ -218,7 +217,7 @@ private void createDataSourcesIndex() { } } - private List searchInDataSourcesIndex(ClusterState state, QueryBuilder query) { + private List searchInDataSourcesIndex(QueryBuilder query) { SearchRequest searchRequest = new SearchRequest(); searchRequest.indices(DATASOURCE_INDEX_NAME); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); From f9e6ee0bd05f847c876679dae289aac9356df563 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Wed, 30 Aug 2023 13:19:10 -0700 Subject: [PATCH 5/5] Self review Signed-off-by: Suraj Singh --- .../storage/OpenSearchDataSourceMetadataStorageTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/datasources/src/test/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorageTest.java b/datasources/src/test/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorageTest.java index 7bca9d201c..7d41737b2d 100644 --- a/datasources/src/test/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorageTest.java +++ b/datasources/src/test/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorageTest.java @@ -32,7 +32,6 @@ import org.opensearch.action.search.SearchResponse; import org.opensearch.action.update.UpdateResponse; import org.opensearch.client.Client; -import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.action.ActionFuture; import org.opensearch.core.index.shard.ShardId;