Skip to content

Commit

Permalink
Initial restart upgrade bwc test
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Bogan <[email protected]>
  • Loading branch information
ryanbogan committed Dec 2, 2024
1 parent 5033a31 commit 2c00025
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,12 @@
public class HybridSearchIT extends AbstractRestartUpgradeRestTestCase {
private static final String PIPELINE_NAME = "nlp-hybrid-pipeline";
private static final String PIPELINE1_NAME = "nlp-hybrid-1-pipeline";
private static final String PIPELINE_RRF_NAME = "nlp-hybrid-rrf-pipeline";
private static final String PIPELINE1_RRF_NAME = "nlp-hybrid-rrf-1-pipeline";
private static final String SEARCH_PIPELINE_NAME = "nlp-search-pipeline";
private static final String SEARCH_PIPELINE1_NAME = "nlp-search-1-pipeline";
private static final String SEARCH_PIPELINE_RRF_NAME = "nlp-search-rrf-pipeline";
private static final String SEARCH_PIPELINE1_RRF_NAME = "nlp-search-rrf-1-pipeline";
private static final String TEST_FIELD = "passage_text";
private static final String TEXT_1 = "Hello world";
private static final String TEXT_2 = "Hi planet";
Expand All @@ -41,17 +45,31 @@ public class HybridSearchIT extends AbstractRestartUpgradeRestTestCase {
// Create Text Embedding Processor, Ingestion Pipeline, add document and search pipeline with normalization processor
// Validate processor, pipeline and document count in restart-upgrade scenario
public void testNormalizationProcessor_whenIndexWithMultipleShards_E2EFlow() throws Exception {
    // Runs the shared restart-upgrade flow against a multi-shard index mapping;
    // isRRF=false selects the normalization-processor search pipeline.
    validateProcessor("processor/IndexMappingMultipleShard.json", PIPELINE_NAME, SEARCH_PIPELINE_NAME, false);
}

// Test restart-upgrade normalization processor when index with single shard
// Create Text Embedding Processor, Ingestion Pipeline, add document and search pipeline with normalization processor
// Validate processor, pipeline and document count in restart-upgrade scenario
public void testNormalizationProcessor_whenIndexWithSingleShard_E2EFlow() throws Exception {
    // Runs the shared restart-upgrade flow against a single-shard index mapping;
    // isRRF=false selects the normalization-processor search pipeline.
    validateProcessor("processor/IndexMappingSingleShard.json", PIPELINE1_NAME, SEARCH_PIPELINE1_NAME, false);
}

private void validateNormalizationProcessor(final String fileName, final String pipelineName, final String searchPipelineName)
// Test restart-upgrade rrf processor when index with multiple shards
// Create Text Embedding Processor, Ingestion Pipeline, add document and search pipeline with rrf processor
// Validate processor, pipeline and document count in restart-upgrade scenario
public void testRRFProcessor_whenIndexWithMultipleShards_E2EFlow() throws Exception {
    // Exercise the restart-upgrade flow for the RRF search pipeline on a
    // multi-shard index; isRRF=true selects createRRFSearchPipeline downstream.
    final String indexMappingFile = "processor/IndexMappingMultipleShard.json";
    validateProcessor(indexMappingFile, PIPELINE_RRF_NAME, SEARCH_PIPELINE_RRF_NAME, true);
}

// Test restart-upgrade rrf processor when index with single shard
// Create Text Embedding Processor, Ingestion Pipeline, add document and search pipeline with rrf processor
// Validate processor, pipeline and document count in restart-upgrade scenario
public void testRRFProcessor_whenIndexWithSingleShard_E2EFlow() throws Exception {
    // Exercise the restart-upgrade flow for the RRF search pipeline on a
    // single-shard index; isRRF=true selects createRRFSearchPipeline downstream.
    final String indexMappingFile = "processor/IndexMappingSingleShard.json";
    validateProcessor(indexMappingFile, PIPELINE1_RRF_NAME, SEARCH_PIPELINE1_RRF_NAME, true);
}

private void validateProcessor(final String fileName, final String pipelineName, final String searchPipelineName, boolean isRRF)
throws Exception {
waitForClusterHealthGreen(NODES_BWC_CLUSTER);
if (isRunningAgainstOldCluster()) {
Expand All @@ -64,7 +82,11 @@ private void validateNormalizationProcessor(final String fileName, final String
pipelineName
);
addDocuments(getIndexNameForTest(), true);
createSearchPipeline(searchPipelineName);
if (isRRF) {
createRRFSearchPipeline(searchPipelineName);
} else {
createSearchPipeline(searchPipelineName);
}
} else {
String modelId = null;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public void testRRF_whenValidInput_thenSucceed() {
Collections.singletonList(new KNNFieldConfig("passage_embedding", RRF_DIMENSION, TEST_SPACE_TYPE))
);
addDocuments();
createDefaultRRFSearchPipeline();
createRRFSearchPipeline(RRF_SEARCH_PIPELINE);

HybridQueryBuilder hybridQueryBuilder = getHybridQueryBuilder();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1444,7 +1444,7 @@ protected enum ProcessorType {
}

@SneakyThrows
protected void createDefaultRRFSearchPipeline() {
protected void createRRFSearchPipeline(String pipelineName) {
String requestBody = XContentFactory.jsonBuilder()
.startObject()
.field("description", "Post processor for hybrid search")
Expand All @@ -1463,7 +1463,7 @@ protected void createDefaultRRFSearchPipeline() {
makeRequest(
client(),
"PUT",
String.format(LOCALE, "/_search/pipeline/%s", RRF_SEARCH_PIPELINE),
String.format(LOCALE, "/_search/pipeline/%s", pipelineName),
null,
toHttpEntity(String.format(LOCALE, requestBody)),
ImmutableList.of(new BasicHeader(HttpHeaders.USER_AGENT, DEFAULT_USER_AGENT))
Expand Down

0 comments on commit 2c00025

Please sign in to comment.