Commit 26485d7: Addressing Martin Comments

Signed-off-by: Varun Jain <[email protected]>
vibrantvarun committed Jan 9, 2024
1 parent 98a9805 commit 26485d7
Showing 13 changed files with 161 additions and 50 deletions.
@@ -8,6 +8,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.opensearch.index.query.MatchQueryBuilder;
import static org.opensearch.neuralsearch.TestUtils.getModelId;
@@ -21,44 +22,97 @@

public class HybridSearchIT extends AbstractRestartUpgradeRestTestCase {
private static final String PIPELINE_NAME = "nlp-hybrid-pipeline";
private static final String PIPELINE1_NAME = "nlp-hybrid-1-pipeline";
private static final String SEARCH_PIPELINE_NAME = "nlp-search-pipeline";
private static final String SEARCH_PIPELINE1_NAME = "nlp-search-1-pipeline";
private static final String TEST_FIELD = "passage_text";
private static final String TEXT = "Hello world";
private static final String TEXT_1 = "Hi planet";
private static final String query = "Hi world";
private static final String TEXT_1 = "Hello world";
private static final String TEXT_2 = "Hi planet";
private static final String TEXT_3 = "Hi earth";
private static final String TEXT_4 = "Hi amazon";
private static final String TEXT_5 = "Hi mars";
private static final String TEXT_6 = "Hi opensearch";
private static final String QUERY = "Hi world";

// Test restart-upgrade Hybrid Search
// Create Text Embedding Processor, Ingestion Pipeline, add document and search pipeline with noramlization processor
// Test restart-upgrade normalization processor when the index has multiple shards
// Create Text Embedding Processor, Ingestion Pipeline, add documents and search pipeline with normalization processor
// Validate processor, pipeline, and document count in restart-upgrade scenario
public void testHybridSearch_E2EFlow() throws Exception {
public void testNormalizationProcessor_whenIndexWithMultipleShards_E2EFlow() throws Exception {
waitForClusterHealthGreen(NODES_BWC_CLUSTER);
if (isRunningAgainstOldCluster()) {
String index = getIndexNameForTest();
String modelId = uploadTextEmbeddingModel();
loadModel(modelId);
createPipelineProcessor(modelId, PIPELINE_NAME);
createIndexWithConfiguration(
getIndexNameForTest(),
index,
Files.readString(Path.of(classLoader.getResource("processor/IndexMappings.json").toURI())),
PIPELINE_NAME
);
addDocument(getIndexNameForTest(), "0", TEST_FIELD, TEXT, null, null);
addDocument(index, "0", TEST_FIELD, TEXT_1, null, null);
addDocument(index, "1", TEST_FIELD, TEXT_2, null, null);
addDocument(index, "2", TEST_FIELD, TEXT_3, null, null);
addDocument(index, "3", TEST_FIELD, TEXT_4, null, null);
addDocument(index, "4", TEST_FIELD, TEXT_5, null, null);
createSearchPipeline(
SEARCH_PIPELINE_NAME,
DEFAULT_NORMALIZATION_METHOD,
DEFAULT_COMBINATION_METHOD,
Map.of(PARAM_NAME_WEIGHTS, Arrays.toString(new float[] { 0.3f, 0.7f }))
);
} else {
String index = getIndexNameForTest();
Map<String, Object> pipeline = getIngestionPipeline(PIPELINE_NAME);
assertNotNull(pipeline);
String modelId = getModelId(pipeline, TEXT_EMBEDDING_PROCESSOR);
loadModel(modelId);
addDocument(getIndexNameForTest(), "1", TEST_FIELD, TEXT_1, null, null);
validateTestIndex(modelId);
addDocument(index, "5", TEST_FIELD, TEXT_6, null, null);
validateTestIndex(modelId, index, SEARCH_PIPELINE_NAME);
deleteSearchPipeline(SEARCH_PIPELINE_NAME);
deletePipeline(PIPELINE_NAME);
deleteModel(modelId);
deleteIndex(getIndexNameForTest());
deleteIndex(index);
}
}
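
As background for the pipeline used above: createSearchPipeline is presumed to PUT a search pipeline whose body wires in the normalization processor. A minimal sketch follows, assuming DEFAULT_NORMALIZATION_METHOD resolves to "min_max", DEFAULT_COMBINATION_METHOD to "arithmetic_mean", and the standard REST test client; none of these internals are shown in this diff.

    // Hedged sketch only; the real helper lives in the BWC test base class.
    // Assumes org.opensearch.client.Request and the client() of OpenSearchRestTestCase.
    private void createNormalizationSearchPipelineSketch(String name) throws Exception {
        String body = "{"
            + "\"phase_results_processors\": [{"
            + "\"normalization-processor\": {"
            + "\"normalization\": {\"technique\": \"min_max\"},"
            + "\"combination\": {\"technique\": \"arithmetic_mean\","
            + "\"parameters\": {\"weights\": [0.3, 0.7]}}"
            + "}}]}";
        Request request = new Request("PUT", "/_search/pipeline/" + name);
        request.setJsonEntity(body);
        client().performRequest(request);
    }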

// Test restart-upgrade normalization processor when the index has a single shard
// Create Text Embedding Processor, Ingestion Pipeline, add documents and search pipeline with normalization processor
// Validate processor, pipeline, and document count in restart-upgrade scenario
public void testNormalizationProcessor_whenIndexWithSingleShard_E2EFlow() throws Exception {
waitForClusterHealthGreen(NODES_BWC_CLUSTER);
if (isRunningAgainstOldCluster()) {
String index = getIndexNameForTest() + "1";
String modelId = uploadTextEmbeddingModel();
loadModel(modelId);
createPipelineProcessor(modelId, PIPELINE1_NAME);
createIndexWithConfiguration(
index,
Files.readString(Path.of(classLoader.getResource("processor/Index1Mappings.json").toURI())),
PIPELINE1_NAME
);
addDocument(index, "0", TEST_FIELD, TEXT_1, null, null);
addDocument(index, "1", TEST_FIELD, TEXT_2, null, null);
addDocument(index, "2", TEST_FIELD, TEXT_3, null, null);
addDocument(index, "3", TEST_FIELD, TEXT_4, null, null);
addDocument(index, "4", TEST_FIELD, TEXT_5, null, null);
createSearchPipeline(
SEARCH_PIPELINE1_NAME,
DEFAULT_NORMALIZATION_METHOD,
DEFAULT_COMBINATION_METHOD,
Map.of(PARAM_NAME_WEIGHTS, Arrays.toString(new float[] { 0.3f, 0.7f }))
);
} else {
String index = getIndexNameForTest() + "1";
Map<String, Object> pipeline = getIngestionPipeline(PIPELINE1_NAME);
assertNotNull(pipeline);
String modelId = getModelId(pipeline, TEXT_EMBEDDING_PROCESSOR);
loadModel(modelId);
addDocument(index, "5", TEST_FIELD, TEXT_6, null, null);
validateTestIndex(modelId, index, SEARCH_PIPELINE1_NAME);
deleteSearchPipeline(SEARCH_PIPELINE1_NAME);
deletePipeline(PIPELINE1_NAME);
deleteModel(modelId);
deleteIndex(index);
}
}

@@ -82,31 +136,28 @@ private void createPipelineProcessor(String modelId, String pipelineName) throws Exception {
createPipelineProcessor(requestBody, pipelineName, modelId);
}

private void validateTestIndex(String modelId) throws Exception {
int docCount = getDocCount(getIndexNameForTest());
assertEquals(2, docCount);
loadModel(modelId);
private void validateTestIndex(String modelId, String index, String searchPipeline) throws Exception {
int docCount = getDocCount(index);
assertEquals(6, docCount);
HybridQueryBuilder hybridQueryBuilder = getQueryBuilder(modelId);
Map<String, Object> searchResponseAsMap = search(
getIndexNameForTest(),
hybridQueryBuilder,
null,
1,
Map.of("search_pipeline", SEARCH_PIPELINE_NAME)
);
Map<String, Object> searchResponseAsMap = search(index, hybridQueryBuilder, null, 1, Map.of("search_pipeline", searchPipeline));
assertNotNull(searchResponseAsMap);
int hits = getHitCount(searchResponseAsMap);
assertEquals(1, hits);
List<Double> scoresList = getNormalizationScoreList(searchResponseAsMap);
for (Double score : scoresList) {
assertTrue(0 < score && score < 1);
}
}
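
The (0, 1) range check above depends on getNormalizationScoreList from TestUtils, whose body is not part of this diff. A plausible reconstruction, assuming the standard _search response shape and java.util imports (List, Map, ArrayList):

    // Hedged reconstruction; the actual TestUtils helper may differ.
    @SuppressWarnings("unchecked")
    private static List<Double> getScoresSketch(Map<String, Object> searchResponseAsMap) {
        Map<String, Object> hitsRoot = (Map<String, Object>) searchResponseAsMap.get("hits");
        List<Map<String, Object>> hits = (List<Map<String, Object>>) hitsRoot.get("hits");
        List<Double> scores = new ArrayList<>();
        for (Map<String, Object> hit : hits) {
            // Each hit carries the pipeline-normalized "_score".
            scores.add(((Number) hit.get("_score")).doubleValue());
        }
        return scores;
    }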

public HybridQueryBuilder getQueryBuilder(String modelId) {
NeuralQueryBuilder neuralQueryBuilder = new NeuralQueryBuilder();
neuralQueryBuilder.fieldName("passage_embedding");
neuralQueryBuilder.modelId(modelId);
neuralQueryBuilder.queryText(query);
neuralQueryBuilder.queryText(QUERY);
neuralQueryBuilder.k(5);

MatchQueryBuilder matchQueryBuilder = new MatchQueryBuilder("text", query);
MatchQueryBuilder matchQueryBuilder = new MatchQueryBuilder("text", QUERY);

HybridQueryBuilder hybridQueryBuilder = new HybridQueryBuilder();
hybridQueryBuilder.add(matchQueryBuilder);
@@ -22,10 +22,10 @@ public class MultiModalSearchIT extends AbstractRestartUpgradeRestTestCase {
private static final String TEST_IMAGE_TEXT = "/9j/4AAQSkZJRgABAQAASABIAAD";
private static final String TEST_IMAGE_TEXT_1 = "/9j/4AAQSkZJRgbdwoeicfhoid";

// Test restart-upgrade MultiModal Search
// Test restart-upgrade text image embedding processor
// Create Text Image Embedding Processor, Ingestion Pipeline and add document
// Validate processor, pipeline, and document count in restart-upgrade scenario
public void testMultiModalSearch_E2EFlow() throws Exception {
public void testTextImageEmbeddingProcessor_E2EFlow() throws Exception {
waitForClusterHealthGreen(NODES_BWC_CLUSTER);

if (isRunningAgainstOldCluster()) {
@@ -54,7 +54,6 @@ public void testMultiModalSearch_E2EFlow() throws Exception {
private void validateTestIndex(String modelId) throws Exception {
int docCount = getDocCount(getIndexNameForTest());
assertEquals(2, docCount);
loadModel(modelId);
NeuralQueryBuilder neuralQueryBuilder = new NeuralQueryBuilder("passage_embedding", TEXT, TEST_IMAGE_TEXT, modelId, 1, null, null);
Map<String, Object> response = search(getIndexNameForTest(), neuralQueryBuilder, 1);
assertNotNull(response);
@@ -77,7 +76,7 @@ private String registerModelGroupAndGetModelId(String requestBody) throws Exception {

protected void createPipelineProcessor(String modelId, String pipelineName) throws Exception {
String requestBody = Files.readString(
Path.of(classLoader.getResource("processor/PipelineForTextImagingProcessorConfiguration.json").toURI())
Path.of(classLoader.getResource("processor/PipelineForTextImageProcessorConfiguration.json").toURI())
);
createPipelineProcessor(requestBody, pipelineName, modelId);
}
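
The renamed resource file's contents are not part of this diff; for orientation, a text-image embedding ingest pipeline body generally takes the shape sketched below, with field names mirroring this test's mappings (an assumption, not the file's verified contents):

    // Hedged sketch of PipelineForTextImageProcessorConfiguration.json's likely
    // shape; "%s" is where the model id is substituted.
    String pipelineBody = "{"
        + "\"description\": \"text image embedding pipeline\","
        + "\"processors\": [{"
        + "\"text_image_embedding\": {"
        + "\"model_id\": \"%s\","
        + "\"embedding\": \"passage_embedding\","
        + "\"field_map\": {\"text\": \"passage_text\", \"image\": \"passage_image\"}"
        + "}}]}";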
@@ -16,11 +16,14 @@
public class NeuralSparseSearchIT extends AbstractRestartUpgradeRestTestCase {
private static final String PIPELINE_NAME = "nlp-ingest-pipeline-sparse";
private static final String TEST_FIELD = "passage_text";
private static final String TEXT = "Hello world";
private static final String TEXT_1 = "Hi planet";
private static final String query = "Hi world";
private static final String TEXT_1 = "Hello world";
private static final String TEXT_2 = "Hi planet";
private static final String QUERY = "Hi world";

public void testNeuralSparseSearch_E2EFlow() throws Exception {
// Test restart-upgrade sparse encoding processor
// Create Sparse Encoding Processor, Ingestion Pipeline and add document
// Validate processor, pipeline, and document count in restart-upgrade scenario
public void testSparseEncodingProcessor_E2EFlow() throws Exception {
waitForClusterHealthGreen(NODES_BWC_CLUSTER);
if (isRunningAgainstOldCluster()) {
String modelId = uploadTextEmbeddingModel();
@@ -31,13 +34,13 @@ public void testNeuralSparseSearch_E2EFlow() throws Exception {
Files.readString(Path.of(classLoader.getResource("processor/SparseIndexMappings.json").toURI())),
PIPELINE_NAME
);
addDocument(getIndexNameForTest(), "0", TEST_FIELD, TEXT, null, null);
addDocument(getIndexNameForTest(), "0", TEST_FIELD, TEXT_1, null, null);
} else {
Map<String, Object> pipeline = getIngestionPipeline(PIPELINE_NAME);
assertNotNull(pipeline);
String modelId = TestUtils.getModelId(pipeline, SPARSE_ENCODING_PROCESSOR);
loadModel(modelId);
addDocument(getIndexNameForTest(), "1", TEST_FIELD, TEXT_1, null, null);
addDocument(getIndexNameForTest(), "1", TEST_FIELD, TEXT_2, null, null);
validateTestIndex(modelId);
deletePipeline(PIPELINE_NAME);
deleteModel(modelId);
@@ -49,10 +52,9 @@ public void testNeuralSparseSearch_E2EFlow() throws Exception {
private void validateTestIndex(String modelId) throws Exception {
int docCount = getDocCount(getIndexNameForTest());
assertEquals(2, docCount);
loadModel(modelId);
NeuralSparseQueryBuilder neuralSparseQueryBuilder = new NeuralSparseQueryBuilder();
neuralSparseQueryBuilder.fieldName("passage_embedding");
neuralSparseQueryBuilder.queryText(query);
neuralSparseQueryBuilder.queryText(QUERY);
neuralSparseQueryBuilder.modelId(modelId);
Map<String, Object> response = search(getIndexNameForTest(), neuralSparseQueryBuilder, 1);
assertNotNull(response);
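
For orientation, the sparse encoding ingest pipeline behind PIPELINE_NAME typically has the shape below; the test's actual JSON resource is not shown in this diff, so treat this as a sketch:

    // Hedged sketch of a sparse encoding ingest pipeline body; the real
    // resource may differ. "%s" is where the model id is substituted.
    String sparsePipelineBody = "{"
        + "\"description\": \"sparse encoding pipeline\","
        + "\"processors\": [{"
        + "\"sparse_encoding\": {"
        + "\"model_id\": \"%s\","
        + "\"field_map\": {\"passage_text\": \"passage_embedding\"}"
        + "}}]}";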
@@ -54,7 +54,6 @@ public void testTextEmbeddingProcessor_E2EFlow() throws Exception {
private void validateTestIndex(String modelId) throws Exception {
int docCount = getDocCount(getIndexNameForTest());
assertEquals(2, docCount);
loadModel(modelId);
NeuralQueryBuilder neuralQueryBuilder = new NeuralQueryBuilder();
neuralQueryBuilder.fieldName("passage_embedding");
neuralQueryBuilder.modelId(modelId);
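
The builder above issues a plain neural query; serialized, it takes roughly this form (placeholders, not values from this diff):

    // Hedged sketch of the serialized neural query; "<model-id>" and
    // "<query-text>" are placeholders.
    String neuralQuery = "{"
        + "\"query\": {\"neural\": {\"passage_embedding\": {"
        + "\"query_text\": \"<query-text>\", \"model_id\": \"<model-id>\", \"k\": 1"
        + "}}}}";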
@@ -0,0 +1,35 @@
{
"settings": {
"index": {
"knn": true,
"knn.algo_param.ef_search": 100,
"refresh_interval": "30s",
"default_pipeline": "%s"
},
"number_of_shards": 1,
"number_of_replicas": 0
},
"mappings": {
"properties": {
"passage_embedding": {
"type": "knn_vector",
"dimension": 768,
"method": {
"name": "hnsw",
"space_type": "l2",
"engine": "lucene",
"parameters": {
"ef_construction": 128,
"m": 24
}
}
},
"passage_text": {
"type": "text"
},
"passage_image": {
"type": "binary"
}
}
}
}
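
The "%s" under default_pipeline above is filled in by createIndexWithConfiguration; a minimal sketch of that flow, assuming the usual REST test plumbing (the helper's real internals are not in this diff):

    // Hedged sketch: load the mapping, substitute the ingest pipeline name,
    // and create the index. Assumes org.opensearch.client.Request,
    // java.util.Locale, and the client() of OpenSearchRestTestCase.
    String mapping = Files.readString(
        Path.of(classLoader.getResource("processor/Index1Mappings.json").toURI())
    );
    Request createIndex = new Request("PUT", "/" + index);
    createIndex.setJsonEntity(String.format(Locale.ROOT, mapping, PIPELINE1_NAME));
    client().performRequest(createIndex);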
@@ -6,7 +6,7 @@
"refresh_interval": "30s",
"default_pipeline": "%s"
},
"number_of_shards": 6,
"number_of_shards": 3,
"number_of_replicas": 1
},
"mappings": {
@@ -27,13 +27,13 @@ public class HybridSearchIT extends AbstractRollingUpgradeTestCase {
private static final String TEXT = "Hello world";
private static final String TEXT_MIXED = "Hi planet";
private static final String TEXT_UPGRADED = "Hi earth";
private static final String query = "Hi world";
private static final String QUERY = "Hi world";
private static final int NUM_DOCS_PER_ROUND = 1;

// Test rolling-upgrade Hybrid Search
// Create Text Embedding Processor, Ingestion Pipeline, add document and search pipeline with noramlization processor
// Create Text Embedding Processor, Ingestion Pipeline, add document and search pipeline with normalization processor
// Validate processor, pipeline, and document count in rolling-upgrade scenario
public void testHybridSearch_E2EFlow() throws Exception {
public void testNormalizationProcessor_E2EFlow() throws Exception {
waitForClusterHealthGreen(NODES_BWC_CLUSTER);
switch (getClusterType()) {
case OLD:
@@ -118,10 +118,10 @@ public HybridQueryBuilder getQueryBuilder(String modelId) {
NeuralQueryBuilder neuralQueryBuilder = new NeuralQueryBuilder();
neuralQueryBuilder.fieldName("passage_embedding");
neuralQueryBuilder.modelId(modelId);
neuralQueryBuilder.queryText(query);
neuralQueryBuilder.queryText(QUERY);
neuralQueryBuilder.k(5);

MatchQueryBuilder matchQueryBuilder = new MatchQueryBuilder("text", query);
MatchQueryBuilder matchQueryBuilder = new MatchQueryBuilder("text", QUERY);

HybridQueryBuilder hybridQueryBuilder = new HybridQueryBuilder();
hybridQueryBuilder.add(matchQueryBuilder);
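
Serialized with the search pipeline selected per request, the hybrid search issued by this test takes roughly the following shape; the exact JSON comes from the builders above, and the model id is a placeholder:

    // Hedged sketch: POST /{index}/_search?search_pipeline=<name> with a
    // hybrid clause wrapping the lexical match and the neural sub-query.
    String hybridQuery = "{"
        + "\"query\": {\"hybrid\": {\"queries\": ["
        + "{\"match\": {\"text\": \"Hi world\"}},"
        + "{\"neural\": {\"passage_embedding\": {"
        + "\"query_text\": \"Hi world\", \"model_id\": \"<model-id>\", \"k\": 5"
        + "}}}"
        + "]}}}";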
@@ -26,10 +26,10 @@ public class MultiModalSearchIT extends AbstractRollingUpgradeTestCase {

private static final int NUM_DOCS_PER_ROUND = 1;

// Test rolling-upgrade MultiModal Search
// Test rolling-upgrade text image embedding processor
// Create Text Image Embedding Processor, Ingestion Pipeline and add document
// Validate processor, pipeline, and document count in rolling-upgrade scenario
public void testMultiModalSearch_E2EFlow() throws Exception {
public void testTextImageEmbeddingProcessor_E2EFlow() throws Exception {
waitForClusterHealthGreen(NODES_BWC_CLUSTER);
switch (getClusterType()) {
case OLD:
@@ -94,7 +94,7 @@ private String registerModelGroupAndGetModelId(String requestBody) throws Exception {

protected void createPipelineProcessor(String modelId, String pipelineName) throws Exception {
String requestBody = Files.readString(
Path.of(classLoader.getResource("processor/PipelineForTextImagingProcessorConfiguration.json").toURI())
Path.of(classLoader.getResource("processor/PipelineForTextImageProcessorConfiguration.json").toURI())
);
createPipelineProcessor(requestBody, pipelineName, modelId);
}
@@ -20,9 +20,12 @@ public class NeuralSparseSearchIT extends AbstractRollingUpgradeTestCase {
private static final String TEXT_MIXED = "Hi planet";
private static final String TEXT_UPGRADED = "Hi earth";
private static final int NUM_DOCS_PER_ROUND = 1;
private static final String query = "Hi world";
private static final String QUERY = "Hi world";

public void testNeuralSparseSearch_E2EFlow() throws Exception {
// Test rolling-upgrade sparse encoding processor
// Create Sparse Encoding Processor, Ingestion Pipeline and add document
// Validate processor, pipeline, and document count in rolling-upgrade scenario
public void testSparseEncodingProcessor_E2EFlow() throws Exception {
waitForClusterHealthGreen(NODES_BWC_CLUSTER);
switch (getClusterType()) {
case OLD:
@@ -67,7 +70,7 @@ private void validateTestIndexOnUpgrade(int numberOfDocs, String modelId) throws Exception {
loadModel(modelId);
NeuralSparseQueryBuilder neuralSparseQueryBuilder = new NeuralSparseQueryBuilder();
neuralSparseQueryBuilder.fieldName("passage_embedding");
neuralSparseQueryBuilder.queryText(query);
neuralSparseQueryBuilder.queryText(QUERY);
neuralSparseQueryBuilder.modelId(modelId);
Map<String, Object> response = search(getIndexNameForTest(), neuralSparseQueryBuilder, 1);
assertNotNull(response);
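
The sparse builder above serializes to a neural_sparse clause, roughly as follows (the model id is a placeholder):

    // Hedged sketch of the serialized neural_sparse query.
    String sparseQuery = "{"
        + "\"query\": {\"neural_sparse\": {\"passage_embedding\": {"
        + "\"query_text\": \"Hi world\", \"model_id\": \"<model-id>\""
        + "}}}}";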
@@ -6,7 +6,7 @@
"refresh_interval": "30s",
"default_pipeline": "%s"
},
"number_of_shards": 6,
"number_of_shards": 3,
"number_of_replicas": 1
},
"mappings": {
@@ -26,6 +26,9 @@
},
"passage_text": {
"type": "text"
},
"passage_image": {
"type": "binary"
}
}
}