From 2c00025875dfef51e9e0fbfbb1166affb2680df4 Mon Sep 17 00:00:00 2001 From: Ryan Bogan Date: Mon, 2 Dec 2024 12:01:11 -0800 Subject: [PATCH] Initial restart upgrade bwc test Signed-off-by: Ryan Bogan --- .../neuralsearch/bwc/HybridSearchIT.java | 30 ++++++++++++++++--- .../processor/RRFProcessorIT.java | 2 +- .../neuralsearch/BaseNeuralSearchIT.java | 4 +-- 3 files changed, 29 insertions(+), 7 deletions(-) diff --git a/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/HybridSearchIT.java b/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/HybridSearchIT.java index fe69c577e..d540e1ef8 100644 --- a/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/HybridSearchIT.java +++ b/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/HybridSearchIT.java @@ -26,8 +26,12 @@ public class HybridSearchIT extends AbstractRestartUpgradeRestTestCase { private static final String PIPELINE_NAME = "nlp-hybrid-pipeline"; private static final String PIPELINE1_NAME = "nlp-hybrid-1-pipeline"; + private static final String PIPELINE_RRF_NAME = "nlp-hybrid-rrf-pipeline"; + private static final String PIPELINE1_RRF_NAME = "nlp-hybrid-rrf-1-pipeline"; private static final String SEARCH_PIPELINE_NAME = "nlp-search-pipeline"; private static final String SEARCH_PIPELINE1_NAME = "nlp-search-1-pipeline"; + private static final String SEARCH_PIPELINE_RRF_NAME = "nlp-search-rrf-pipeline"; + private static final String SEARCH_PIPELINE1_RRF_NAME = "nlp-search-rrf-1-pipeline"; private static final String TEST_FIELD = "passage_text"; private static final String TEXT_1 = "Hello world"; private static final String TEXT_2 = "Hi planet"; @@ -41,17 +45,31 @@ public class HybridSearchIT extends AbstractRestartUpgradeRestTestCase { // Create Text Embedding Processor, Ingestion Pipeline, add document and search pipeline with normalization processor // Validate process , pipeline and document count in restart-upgrade scenario public void testNormalizationProcessor_whenIndexWithMultipleShards_E2EFlow() throws Exception { - validateNormalizationProcessor("processor/IndexMappingMultipleShard.json", PIPELINE_NAME, SEARCH_PIPELINE_NAME); + validateProcessor("processor/IndexMappingMultipleShard.json", PIPELINE_NAME, SEARCH_PIPELINE_NAME, false); } // Test restart-upgrade normalization processor when index with single shard // Create Text Embedding Processor, Ingestion Pipeline, add document and search pipeline with normalization processor // Validate process , pipeline and document count in restart-upgrade scenario public void testNormalizationProcessor_whenIndexWithSingleShard_E2EFlow() throws Exception { - validateNormalizationProcessor("processor/IndexMappingSingleShard.json", PIPELINE1_NAME, SEARCH_PIPELINE1_NAME); + validateProcessor("processor/IndexMappingSingleShard.json", PIPELINE1_NAME, SEARCH_PIPELINE1_NAME, false); } - private void validateNormalizationProcessor(final String fileName, final String pipelineName, final String searchPipelineName) + // Test restart-upgrade rrf processor when index with multiple shards + // Create Text Embedding Processor, Ingestion Pipeline, add document and search pipeline with rrf processor + // Validate process , pipeline and document count in restart-upgrade scenario + public void testRRFProcessor_whenIndexWithMultipleShards_E2EFlow() throws Exception { + validateProcessor("processor/IndexMappingMultipleShard.json", PIPELINE_RRF_NAME, SEARCH_PIPELINE_RRF_NAME, true); + } + + // Test restart-upgrade rrf processor when index with single shard + // Create Text Embedding Processor, Ingestion Pipeline, add document and search pipeline with rrf processor + // Validate process , pipeline and document count in restart-upgrade scenario + public void testRRFProcessor_whenIndexWithSingleShard_E2EFlow() throws Exception { + validateProcessor("processor/IndexMappingSingleShard.json", PIPELINE1_RRF_NAME, SEARCH_PIPELINE1_RRF_NAME, true); + } + + private void validateProcessor(final String fileName, final String pipelineName, final String searchPipelineName, boolean isRRF) throws Exception { waitForClusterHealthGreen(NODES_BWC_CLUSTER); if (isRunningAgainstOldCluster()) { @@ -64,7 +82,11 @@ private void validateNormalizationProcessor(final String fileName, final String pipelineName ); addDocuments(getIndexNameForTest(), true); - createSearchPipeline(searchPipelineName); + if (isRRF) { + createRRFSearchPipeline(searchPipelineName); + } else { + createSearchPipeline(searchPipelineName); + } } else { String modelId = null; try { diff --git a/src/test/java/org/opensearch/neuralsearch/processor/RRFProcessorIT.java b/src/test/java/org/opensearch/neuralsearch/processor/RRFProcessorIT.java index fccabab5c..8609097c6 100644 --- a/src/test/java/org/opensearch/neuralsearch/processor/RRFProcessorIT.java +++ b/src/test/java/org/opensearch/neuralsearch/processor/RRFProcessorIT.java @@ -36,7 +36,7 @@ public void testRRF_whenValidInput_thenSucceed() { Collections.singletonList(new KNNFieldConfig("passage_embedding", RRF_DIMENSION, TEST_SPACE_TYPE)) ); addDocuments(); - createDefaultRRFSearchPipeline(); + createRRFSearchPipeline(RRF_SEARCH_PIPELINE); HybridQueryBuilder hybridQueryBuilder = getHybridQueryBuilder(); diff --git a/src/testFixtures/java/org/opensearch/neuralsearch/BaseNeuralSearchIT.java b/src/testFixtures/java/org/opensearch/neuralsearch/BaseNeuralSearchIT.java index f8021b08e..f48c66d0d 100644 --- a/src/testFixtures/java/org/opensearch/neuralsearch/BaseNeuralSearchIT.java +++ b/src/testFixtures/java/org/opensearch/neuralsearch/BaseNeuralSearchIT.java @@ -1444,7 +1444,7 @@ protected enum ProcessorType { } @SneakyThrows - protected void createDefaultRRFSearchPipeline() { + protected void createRRFSearchPipeline(String pipelineName) { String requestBody = XContentFactory.jsonBuilder() .startObject() .field("description", "Post processor for hybrid search") @@ -1463,7 +1463,7 @@ protected void createDefaultRRFSearchPipeline() { makeRequest( client(), "PUT", - String.format(LOCALE, "/_search/pipeline/%s", RRF_SEARCH_PIPELINE), + String.format(LOCALE, "/_search/pipeline/%s", pipelineName), null, toHttpEntity(String.format(LOCALE, requestBody)), ImmutableList.of(new BasicHeader(HttpHeaders.USER_AGENT, DEFAULT_USER_AGENT))