diff --git a/CHANGELOG.md b/CHANGELOG.md index 9a3a152ee..a4a4fa9f8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Enhancements ### Bug Fixes ### Infrastructure +- Update batch related tests to use batch_size in processor & refactor BWC version check ([#852](https://github.com/opensearch-project/neural-search/pull/852)) ### Documentation ### Maintenance ### Refactoring diff --git a/qa/restart-upgrade/build.gradle b/qa/restart-upgrade/build.gradle index fe3db254c..0163c36c1 100644 --- a/qa/restart-upgrade/build.gradle +++ b/qa/restart-upgrade/build.gradle @@ -54,6 +54,13 @@ testClusters { } } +def versionsBelow2_11 = ["2.9", "2.10"] +def versionsBelow2_12 = versionsBelow2_11 + "2.11" +def versionsBelow2_13 = versionsBelow2_12 + "2.12" +def versionsBelow2_14 = versionsBelow2_13 + "2.13" +def versionsBelow2_15 = versionsBelow2_14 + "2.14" +def versionsBelow2_16 = versionsBelow2_15 + "2.15" + // Task to run BWC tests against the old cluster task testAgainstOldCluster(type: StandaloneRestIntegTestTask) { if(!ext.bwcBundleTest){ @@ -67,7 +74,7 @@ task testAgainstOldCluster(type: StandaloneRestIntegTestTask) { // Excluding MultiModalSearchIT, HybridSearchIT, NeuralSparseSearchIT, NeuralQueryEnricherProcessorIT tests from neural search version 2.9 and 2.10 // because these features were released in 2.11 version. - if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10")){ + if (versionsBelow2_11.any { ext.neural_search_bwc_version.startsWith(it) }){ filter { excludeTestsMatching "org.opensearch.neuralsearch.bwc.MultiModalSearchIT.*" excludeTestsMatching "org.opensearch.neuralsearch.bwc.HybridSearchIT.*" @@ -76,35 +83,35 @@ task testAgainstOldCluster(type: StandaloneRestIntegTestTask) { } } - // Excluding the test because we introduce this feature in 2.13 - if (ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12")){ + // Excluding the these tests because we introduce them in 2.13 + if (versionsBelow2_13.any { ext.neural_search_bwc_version.startsWith(it) }){ filter { excludeTestsMatching "org.opensearch.neuralsearch.bwc.NeuralQueryEnricherProcessorIT.testNeuralQueryEnricherProcessor_NeuralSparseSearch_E2EFlow" - } - } - - // Excluding the text chunking processor test because we introduce this feature in 2.13 - if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10") || ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12")){ - filter { excludeTestsMatching "org.opensearch.neuralsearch.bwc.TextChunkingProcessorIT.*" } } - // Excluding the k-NN radial search tests and batch ingestion tests because we introduce these features in 2.14 - if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10") || ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12") || ext.neural_search_bwc_version.startsWith("2.13")){ + // Excluding the k-NN radial search tests because we introduce this feature in 2.14 + if (versionsBelow2_14.any { ext.neural_search_bwc_version.startsWith(it) }){ filter { excludeTestsMatching "org.opensearch.neuralsearch.bwc.KnnRadialSearchIT.*" - excludeTestsMatching "org.opensearch.neuralsearch.bwc.BatchIngestionIT.*" } } // Excluding the NeuralSparseQuery two-phase search pipeline tests because we introduce this feature in 2.15 - if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10") || ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12") || ext.neural_search_bwc_version.startsWith("2.13") || ext.neural_search_bwc_version.startsWith("2.14")){ + if (versionsBelow2_15.any { ext.neural_search_bwc_version.startsWith(it) }){ filter { excludeTestsMatching "org.opensearch.neuralsearch.bwc.NeuralSparseTwoPhaseProcessorIT.*" } } + // Excluding the batching processor tests because we introduce this feature in 2.16 + if (versionsBelow2_16.any { ext.neural_search_bwc_version.startsWith(it) }){ + filter { + excludeTestsMatching "org.opensearch.neuralsearch.bwc.BatchIngestionIT.*" + } + } + nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}".allHttpSocketURI.join(",")}") nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}".getName()}") systemProperty 'tests.security.manager', 'false' @@ -131,7 +138,7 @@ task testAgainstNewCluster(type: StandaloneRestIntegTestTask) { // Excluding MultiModalSearchIT, HybridSearchIT, NeuralSparseSearchIT, NeuralQueryEnricherProcessorIT tests from neural search version 2.9 and 2.10 // because these features were released in 2.11 version. - if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10")){ + if (versionsBelow2_11.any { ext.neural_search_bwc_version.startsWith(it) }){ filter { excludeTestsMatching "org.opensearch.neuralsearch.bwc.MultiModalSearchIT.*" excludeTestsMatching "org.opensearch.neuralsearch.bwc.HybridSearchIT.*" @@ -140,35 +147,35 @@ task testAgainstNewCluster(type: StandaloneRestIntegTestTask) { } } - // Excluding the test because we introduce this feature in 2.13 - if (ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12")){ + // Excluding these tests because we introduce them in 2.13 + if (versionsBelow2_13.any { ext.neural_search_bwc_version.startsWith(it) }){ filter { excludeTestsMatching "org.opensearch.neuralsearch.bwc.NeuralQueryEnricherProcessorIT.testNeuralQueryEnricherProcessor_NeuralSparseSearch_E2EFlow" - } - } - - // Excluding the text chunking processor test because we introduce this feature in 2.13 - if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10") || ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12")){ - filter { excludeTestsMatching "org.opensearch.neuralsearch.bwc.TextChunkingProcessorIT.*" } } - // Excluding the k-NN radial search tests and batch ingestion tests because we introduce these features in 2.14 - if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10") || ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12") || ext.neural_search_bwc_version.startsWith("2.13")){ + // Excluding the k-NN radial search tests because we introduce this feature in 2.14 + if (versionsBelow2_14.any { ext.neural_search_bwc_version.startsWith(it) }){ filter { excludeTestsMatching "org.opensearch.neuralsearch.bwc.KnnRadialSearchIT.*" - excludeTestsMatching "org.opensearch.neuralsearch.bwc.BatchIngestionIT.*" } } // Excluding the NeuralSparseQuery two-phase search pipeline tests because we introduce this feature in 2.15 - if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10") || ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12") || ext.neural_search_bwc_version.startsWith("2.13") || ext.neural_search_bwc_version.startsWith("2.14")){ + if (versionsBelow2_15.any { ext.neural_search_bwc_version.startsWith(it) }){ filter { excludeTestsMatching "org.opensearch.neuralsearch.bwc.NeuralSparseTwoPhaseProcessorIT.*" } } + // Excluding the batch processor tests because we introduce this feature in 2.16 + if (versionsBelow2_16.any { ext.neural_search_bwc_version.startsWith(it) }){ + filter { + excludeTestsMatching "org.opensearch.neuralsearch.bwc.BatchIngestionIT.*" + } + } + nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}".allHttpSocketURI.join(",")}") nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}".getName()}") systemProperty 'tests.security.manager', 'false' diff --git a/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/AbstractRestartUpgradeRestTestCase.java b/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/AbstractRestartUpgradeRestTestCase.java index bdbba92e8..7028888ca 100644 --- a/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/AbstractRestartUpgradeRestTestCase.java +++ b/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/AbstractRestartUpgradeRestTestCase.java @@ -76,7 +76,7 @@ protected String registerModelGroupAndGetModelId(final String requestBody) throw protected void createPipelineProcessor(final String modelId, final String pipelineName) throws Exception { String requestBody = Files.readString(Path.of(classLoader.getResource("processor/PipelineConfiguration.json").toURI())); - createPipelineProcessor(requestBody, pipelineName, modelId); + createPipelineProcessor(requestBody, pipelineName, modelId, null); } protected String uploadSparseEncodingModel() throws Exception { @@ -90,20 +90,25 @@ protected void createPipelineForTextImageProcessor(final String modelId, final S String requestBody = Files.readString( Path.of(classLoader.getResource("processor/PipelineForTextImageProcessorConfiguration.json").toURI()) ); - createPipelineProcessor(requestBody, pipelineName, modelId); + createPipelineProcessor(requestBody, pipelineName, modelId, null); } - protected void createPipelineForSparseEncodingProcessor(final String modelId, final String pipelineName) throws Exception { + protected void createPipelineForSparseEncodingProcessor(final String modelId, final String pipelineName, final Integer batchSize) + throws Exception { String requestBody = Files.readString( Path.of(classLoader.getResource("processor/PipelineForSparseEncodingProcessorConfiguration.json").toURI()) ); - createPipelineProcessor(requestBody, pipelineName, modelId); + createPipelineProcessor(requestBody, pipelineName, modelId, batchSize); + } + + protected void createPipelineForSparseEncodingProcessor(final String modelId, final String pipelineName) throws Exception { + createPipelineForSparseEncodingProcessor(modelId, pipelineName, null); } protected void createPipelineForTextChunkingProcessor(String pipelineName) throws Exception { String requestBody = Files.readString( Path.of(classLoader.getResource("processor/PipelineForTextChunkingProcessorConfiguration.json").toURI()) ); - createPipelineProcessor(requestBody, pipelineName, ""); + createPipelineProcessor(requestBody, pipelineName, "", null); } } diff --git a/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/BatchIngestionIT.java b/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/BatchIngestionIT.java index 0e490e2e4..f9cd11251 100644 --- a/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/BatchIngestionIT.java +++ b/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/BatchIngestionIT.java @@ -27,14 +27,14 @@ public void testBatchIngestionWithNeuralSparseProcessor_E2EFlow() throws Excepti if (isRunningAgainstOldCluster()) { String modelId = uploadSparseEncodingModel(); loadModel(modelId); - createPipelineForSparseEncodingProcessor(modelId, PIPELINE_NAME); + createPipelineForSparseEncodingProcessor(modelId, PIPELINE_NAME, batchSize); createIndexWithConfiguration( indexName, Files.readString(Path.of(classLoader.getResource("processor/SparseIndexMappings.json").toURI())), PIPELINE_NAME ); List> docs = prepareDataForBulkIngestion(0, 5); - bulkAddDocuments(indexName, TEXT_FIELD_NAME, PIPELINE_NAME, docs, batchSize); + bulkAddDocuments(indexName, TEXT_FIELD_NAME, PIPELINE_NAME, docs); validateDocCountAndInfo(indexName, 5, () -> getDocById(indexName, "4"), EMBEDDING_FIELD_NAME, Map.class); } else { String modelId = null; @@ -42,7 +42,7 @@ public void testBatchIngestionWithNeuralSparseProcessor_E2EFlow() throws Excepti loadModel(modelId); try { List> docs = prepareDataForBulkIngestion(5, 5); - bulkAddDocuments(indexName, TEXT_FIELD_NAME, PIPELINE_NAME, docs, batchSize); + bulkAddDocuments(indexName, TEXT_FIELD_NAME, PIPELINE_NAME, docs); validateDocCountAndInfo(indexName, 10, () -> getDocById(indexName, "9"), EMBEDDING_FIELD_NAME, Map.class); } finally { wipeOfTestResources(indexName, PIPELINE_NAME, modelId, null); diff --git a/qa/rolling-upgrade/build.gradle b/qa/rolling-upgrade/build.gradle index 7d21c5f9e..68e2c5566 100644 --- a/qa/rolling-upgrade/build.gradle +++ b/qa/rolling-upgrade/build.gradle @@ -54,6 +54,12 @@ testClusters { } } +def versionsBelow2_11 = ["2.9", "2.10"] +def versionsBelow2_12 = versionsBelow2_11 + "2.11" +def versionsBelow2_13 = versionsBelow2_12 + "2.12" +def versionsBelow2_14 = versionsBelow2_13 + "2.13" +def versionsBelow2_15 = versionsBelow2_14 + "2.14" +def versionsBelow2_16 = versionsBelow2_15 + "2.15" // Task to run BWC tests against the old cluster task testAgainstOldCluster(type: StandaloneRestIntegTestTask) { @@ -67,7 +73,7 @@ task testAgainstOldCluster(type: StandaloneRestIntegTestTask) { //Excluding MultiModalSearchIT, HybridSearchIT, NeuralSparseSearchIT, NeuralQueryEnricherProcessorIT tests from neural search version 2.9 and 2.10 // because these features were released in 2.11 version. - if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10")){ + if (versionsBelow2_11.any { ext.neural_search_bwc_version.startsWith(it) }){ filter { excludeTestsMatching "org.opensearch.neuralsearch.bwc.MultiModalSearchIT.*" excludeTestsMatching "org.opensearch.neuralsearch.bwc.HybridSearchIT.*" @@ -76,37 +82,34 @@ task testAgainstOldCluster(type: StandaloneRestIntegTestTask) { } } - // Excluding the test because we introduce this feature in 2.13 - if (ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12")){ + // Excluding the tests because we introduce these features in 2.13 + if (versionsBelow2_13.any { ext.neural_search_bwc_version.startsWith(it) }){ filter { excludeTestsMatching "org.opensearch.neuralsearch.bwc.NeuralQueryEnricherProcessorIT.testNeuralQueryEnricherProcessor_NeuralSparseSearch_E2EFlow" - } - } - - // Excluding the text chunking processor test because we introduce this feature in 2.13 - if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10") || ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12")){ - filter { excludeTestsMatching "org.opensearch.neuralsearch.bwc.TextChunkingProcessorIT.*" } } // Excluding the k-NN radial search and batch ingestion tests because we introduce these features in 2.14 - if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10") || ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12") || ext.neural_search_bwc_version.startsWith("2.13")){ + if (versionsBelow2_14.any { ext.neural_search_bwc_version.startsWith(it) }){ filter { excludeTestsMatching "org.opensearch.neuralsearch.bwc.KnnRadialSearchIT.*" - excludeTestsMatching "org.opensearch.neuralsearch.bwc.BatchIngestionIT.*" } } // Excluding the neural sparse two phase processor test because we introduce this feature in 2.15 - if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10") - || ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12") - || ext.neural_search_bwc_version.startsWith("2.13") || ext.neural_search_bwc_version.startsWith("2.14")){ + if (versionsBelow2_15.any { ext.neural_search_bwc_version.startsWith(it) }){ filter { excludeTestsMatching "org.opensearch.neuralsearch.bwc.NeuralSparseTwoPhaseProcessorIT.*" } } + // Excluding the batching processor tests because we introduce this feature in 2.16 + if (versionsBelow2_16.any { ext.neural_search_bwc_version.startsWith(it) }){ + filter { + excludeTestsMatching "org.opensearch.neuralsearch.bwc.BatchIngestionIT.*" + } + } nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}".allHttpSocketURI.join(",")}") nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}".getName()}") @@ -135,7 +138,7 @@ task testAgainstOneThirdUpgradedCluster(type: StandaloneRestIntegTestTask) { //Excluding MultiModalSearchIT, HybridSearchIT, NeuralSparseSearchIT, NeuralQueryEnricherProcessorIT tests from neural search version 2.9 and 2.10 // because these features were released in 2.11 version. - if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10")){ + if (versionsBelow2_11.any { ext.neural_search_bwc_version.startsWith(it) }){ filter { excludeTestsMatching "org.opensearch.neuralsearch.bwc.MultiModalSearchIT.*" excludeTestsMatching "org.opensearch.neuralsearch.bwc.HybridSearchIT.*" @@ -144,22 +147,16 @@ task testAgainstOneThirdUpgradedCluster(type: StandaloneRestIntegTestTask) { } } - // Excluding the test because we introduce this feature in 2.13 - if (ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12")){ + // Excluding the tests because we introduce these features in 2.13 + if (versionsBelow2_13.any { ext.neural_search_bwc_version.startsWith(it) }){ filter { excludeTestsMatching "org.opensearch.neuralsearch.bwc.NeuralQueryEnricherProcessorIT.testNeuralQueryEnricherProcessor_NeuralSparseSearch_E2EFlow" - } - } - - // Excluding the text chunking processor test because we introduce this feature in 2.13 - if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10") || ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12")){ - filter { excludeTestsMatching "org.opensearch.neuralsearch.bwc.TextChunkingProcessorIT.*" } } // Excluding the k-NN radial search and batch ingestion tests because we introduce these features in 2.14 - if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10") || ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12") || ext.neural_search_bwc_version.startsWith("2.13")){ + if (versionsBelow2_14.any { ext.neural_search_bwc_version.startsWith(it) }){ filter { excludeTestsMatching "org.opensearch.neuralsearch.bwc.KnnRadialSearchIT.*" excludeTestsMatching "org.opensearch.neuralsearch.bwc.BatchIngestionIT.*" @@ -167,14 +164,18 @@ task testAgainstOneThirdUpgradedCluster(type: StandaloneRestIntegTestTask) { } // Excluding the neural sparse two phase processor test because we introduce this feature in 2.15 - if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10") - || ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12") - || ext.neural_search_bwc_version.startsWith("2.13") || ext.neural_search_bwc_version.startsWith("2.14")){ + if (versionsBelow2_15.any { ext.neural_search_bwc_version.startsWith(it) }){ filter { excludeTestsMatching "org.opensearch.neuralsearch.bwc.NeuralSparseTwoPhaseProcessorIT.*" } } + // Excluding the batching processor tests because we introduce this feature in 2.16 + if (versionsBelow2_16.any { ext.neural_search_bwc_version.startsWith(it) }){ + filter { + excludeTestsMatching "org.opensearch.neuralsearch.bwc.BatchIngestionIT.*" + } + } nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}".allHttpSocketURI.join(",")}") nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}".getName()}") @@ -202,7 +203,7 @@ task testAgainstTwoThirdsUpgradedCluster(type: StandaloneRestIntegTestTask) { // Excluding MultiModalSearchIT, HybridSearchIT, NeuralSparseSearchIT, NeuralQueryEnricherProcessorIT tests from neural search version 2.9 and 2.10 // because these features were released in 2.11 version. - if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10")){ + if (versionsBelow2_11.any { ext.neural_search_bwc_version.startsWith(it) }){ filter { excludeTestsMatching "org.opensearch.neuralsearch.bwc.MultiModalSearchIT.*" excludeTestsMatching "org.opensearch.neuralsearch.bwc.HybridSearchIT.*" @@ -211,22 +212,16 @@ task testAgainstTwoThirdsUpgradedCluster(type: StandaloneRestIntegTestTask) { } } - // Excluding the test because we introduce this feature in 2.13 - if (ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12")){ + // Excluding the tests because we introduce these features in 2.13 + if (versionsBelow2_13.any { ext.neural_search_bwc_version.startsWith(it) }){ filter { excludeTestsMatching "org.opensearch.neuralsearch.bwc.NeuralQueryEnricherProcessorIT.testNeuralQueryEnricherProcessor_NeuralSparseSearch_E2EFlow" - } - } - - // Excluding the text chunking processor test because we introduce this feature in 2.13 - if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10") || ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12")){ - filter { excludeTestsMatching "org.opensearch.neuralsearch.bwc.TextChunkingProcessorIT.*" } } // Excluding the k-NN radial search and batch ingestion tests because we introduce these features in 2.14 - if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10") || ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12") || ext.neural_search_bwc_version.startsWith("2.13")){ + if (versionsBelow2_14.any { ext.neural_search_bwc_version.startsWith(it) }){ filter { excludeTestsMatching "org.opensearch.neuralsearch.bwc.KnnRadialSearchIT.*" excludeTestsMatching "org.opensearch.neuralsearch.bwc.BatchIngestionIT.*" @@ -234,14 +229,18 @@ task testAgainstTwoThirdsUpgradedCluster(type: StandaloneRestIntegTestTask) { } // Excluding the neural sparse two phase processor test because we introduce this feature in 2.15 - if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10") - || ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12") - || ext.neural_search_bwc_version.startsWith("2.13") || ext.neural_search_bwc_version.startsWith("2.14")){ + if (versionsBelow2_15.any { ext.neural_search_bwc_version.startsWith(it) }){ filter { excludeTestsMatching "org.opensearch.neuralsearch.bwc.NeuralSparseTwoPhaseProcessorIT.*" } } + // Excluding the batching processor tests because we introduce this feature in 2.16 + if (versionsBelow2_16.any { ext.neural_search_bwc_version.startsWith(it) }){ + filter { + excludeTestsMatching "org.opensearch.neuralsearch.bwc.BatchIngestionIT.*" + } + } nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}".allHttpSocketURI.join(",")}") nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}".getName()}") @@ -269,7 +268,7 @@ task testRollingUpgrade(type: StandaloneRestIntegTestTask) { //Excluding MultiModalSearchIT, HybridSearchIT, NeuralSparseSearchIT, NeuralQueryEnricherProcessorIT tests from neural search version 2.9 and 2.10 // because these features were released in 2.11 version. - if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10")){ + if (versionsBelow2_11.any { ext.neural_search_bwc_version.startsWith(it) }){ filter { excludeTestsMatching "org.opensearch.neuralsearch.bwc.MultiModalSearchIT.*" excludeTestsMatching "org.opensearch.neuralsearch.bwc.HybridSearchIT.*" @@ -278,22 +277,16 @@ task testRollingUpgrade(type: StandaloneRestIntegTestTask) { } } - // Excluding the test because we introduce this feature in 2.13 - if (ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12")){ + // Excluding the tests because we introduce these features in 2.13 + if (versionsBelow2_13.any { ext.neural_search_bwc_version.startsWith(it) }){ filter { excludeTestsMatching "org.opensearch.neuralsearch.bwc.NeuralQueryEnricherProcessorIT.testNeuralQueryEnricherProcessor_NeuralSparseSearch_E2EFlow" - } - } - - // Excluding the text chunking processor test because we introduce this feature in 2.13 - if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10") || ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12")){ - filter { excludeTestsMatching "org.opensearch.neuralsearch.bwc.TextChunkingProcessorIT.*" } } // Excluding the k-NN radial search and batch ingestion tests because we introduce these features in 2.14 - if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10") || ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12") || ext.neural_search_bwc_version.startsWith("2.13")){ + if (versionsBelow2_14.any { ext.neural_search_bwc_version.startsWith(it) }){ filter { excludeTestsMatching "org.opensearch.neuralsearch.bwc.KnnRadialSearchIT.*" excludeTestsMatching "org.opensearch.neuralsearch.bwc.BatchIngestionIT.*" @@ -301,14 +294,19 @@ task testRollingUpgrade(type: StandaloneRestIntegTestTask) { } // Excluding the neural sparse two phase processor test because we introduce this feature in 2.15 - if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10") - || ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12") - || ext.neural_search_bwc_version.startsWith("2.13") || ext.neural_search_bwc_version.startsWith("2.14")){ + if (versionsBelow2_15.any { ext.neural_search_bwc_version.startsWith(it) }){ filter { excludeTestsMatching "org.opensearch.neuralsearch.bwc.NeuralSparseTwoPhaseProcessorIT.*" } } + // Excluding the batching processor tests because we introduce this feature in 2.16 + if (versionsBelow2_16.any { ext.neural_search_bwc_version.startsWith(it) }){ + filter { + excludeTestsMatching "org.opensearch.neuralsearch.bwc.BatchIngestionIT.*" + } + } + nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}".allHttpSocketURI.join(",")}") nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}".getName()}") systemProperty 'tests.security.manager', 'false' diff --git a/qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/AbstractRollingUpgradeTestCase.java b/qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/AbstractRollingUpgradeTestCase.java index fffc878e8..48688938c 100644 --- a/qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/AbstractRollingUpgradeTestCase.java +++ b/qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/AbstractRollingUpgradeTestCase.java @@ -102,7 +102,7 @@ protected String registerModelGroupAndGetModelId(String requestBody) throws Exce protected void createPipelineProcessor(String modelId, String pipelineName) throws Exception { String requestBody = Files.readString(Path.of(classLoader.getResource("processor/PipelineConfiguration.json").toURI())); - createPipelineProcessor(requestBody, pipelineName, modelId); + createPipelineProcessor(requestBody, pipelineName, modelId, null); } protected String uploadTextImageEmbeddingModel() throws Exception { @@ -114,7 +114,7 @@ protected void createPipelineForTextImageProcessor(String modelId, String pipeli String requestBody = Files.readString( Path.of(classLoader.getResource("processor/PipelineForTextImageProcessorConfiguration.json").toURI()) ); - createPipelineProcessor(requestBody, pipelineName, modelId); + createPipelineProcessor(requestBody, pipelineName, modelId, null); } protected String uploadSparseEncodingModel() throws Exception { @@ -124,17 +124,29 @@ protected String uploadSparseEncodingModel() throws Exception { return registerModelGroupAndGetModelId(requestBody); } - protected void createPipelineForSparseEncodingProcessor(String modelId, String pipelineName) throws Exception { + protected void createPipelineForSparseEncodingProcessor(String modelId, String pipelineName, Integer batchSize) throws Exception { String requestBody = Files.readString( Path.of(classLoader.getResource("processor/PipelineForSparseEncodingProcessorConfiguration.json").toURI()) ); - createPipelineProcessor(requestBody, pipelineName, modelId); + final String batchSizeTag = "{{batch_size}}"; + if (requestBody.contains(batchSizeTag)) { + if (batchSize != null) { + requestBody = requestBody.replace(batchSizeTag, String.format(LOCALE, "\n\"batch_size\": %d,\n", batchSize)); + } else { + requestBody = requestBody.replace(batchSizeTag, ""); + } + } + createPipelineProcessor(requestBody, pipelineName, modelId, null); + } + + protected void createPipelineForSparseEncodingProcessor(String modelId, String pipelineName) throws Exception { + createPipelineForSparseEncodingProcessor(modelId, pipelineName, null); } protected void createPipelineForTextChunkingProcessor(String pipelineName) throws Exception { String requestBody = Files.readString( Path.of(classLoader.getResource("processor/PipelineForTextChunkingProcessorConfiguration.json").toURI()) ); - createPipelineProcessor(requestBody, pipelineName, ""); + createPipelineProcessor(requestBody, pipelineName, "", null); } } diff --git a/qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/BatchIngestionIT.java b/qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/BatchIngestionIT.java index 3052b48cd..e57802816 100644 --- a/qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/BatchIngestionIT.java +++ b/qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/BatchIngestionIT.java @@ -28,21 +28,21 @@ public void testBatchIngestion_SparseEncodingProcessor_E2EFlow() throws Exceptio case OLD: sparseModelId = uploadSparseEncodingModel(); loadModel(sparseModelId); - createPipelineForSparseEncodingProcessor(sparseModelId, SPARSE_PIPELINE); + createPipelineForSparseEncodingProcessor(sparseModelId, SPARSE_PIPELINE, 2); createIndexWithConfiguration( indexName, Files.readString(Path.of(classLoader.getResource("processor/SparseIndexMappings.json").toURI())), SPARSE_PIPELINE ); List> docs = prepareDataForBulkIngestion(0, 5); - bulkAddDocuments(indexName, TEXT_FIELD_NAME, SPARSE_PIPELINE, docs, 2); + bulkAddDocuments(indexName, TEXT_FIELD_NAME, SPARSE_PIPELINE, docs); validateDocCountAndInfo(indexName, 5, () -> getDocById(indexName, "4"), EMBEDDING_FIELD_NAME, Map.class); break; case MIXED: sparseModelId = TestUtils.getModelId(getIngestionPipeline(SPARSE_PIPELINE), SPARSE_ENCODING_PROCESSOR); loadModel(sparseModelId); List> docsForMixed = prepareDataForBulkIngestion(5, 5); - bulkAddDocuments(indexName, TEXT_FIELD_NAME, SPARSE_PIPELINE, docsForMixed, 3); + bulkAddDocuments(indexName, TEXT_FIELD_NAME, SPARSE_PIPELINE, docsForMixed); validateDocCountAndInfo(indexName, 10, () -> getDocById(indexName, "9"), EMBEDDING_FIELD_NAME, Map.class); break; case UPGRADED: @@ -50,7 +50,7 @@ public void testBatchIngestion_SparseEncodingProcessor_E2EFlow() throws Exceptio sparseModelId = TestUtils.getModelId(getIngestionPipeline(SPARSE_PIPELINE), SPARSE_ENCODING_PROCESSOR); loadModel(sparseModelId); List> docsForUpgraded = prepareDataForBulkIngestion(10, 5); - bulkAddDocuments(indexName, TEXT_FIELD_NAME, SPARSE_PIPELINE, docsForUpgraded, 2); + bulkAddDocuments(indexName, TEXT_FIELD_NAME, SPARSE_PIPELINE, docsForUpgraded); validateDocCountAndInfo(indexName, 15, () -> getDocById(indexName, "14"), EMBEDDING_FIELD_NAME, Map.class); } finally { wipeOfTestResources(indexName, SPARSE_PIPELINE, sparseModelId, null); diff --git a/qa/rolling-upgrade/src/test/resources/processor/PipelineForSparseEncodingProcessorConfiguration.json b/qa/rolling-upgrade/src/test/resources/processor/PipelineForSparseEncodingProcessorConfiguration.json index fe885a0a2..a597c2939 100644 --- a/qa/rolling-upgrade/src/test/resources/processor/PipelineForSparseEncodingProcessorConfiguration.json +++ b/qa/rolling-upgrade/src/test/resources/processor/PipelineForSparseEncodingProcessorConfiguration.json @@ -4,6 +4,7 @@ { "sparse_encoding": { "model_id": "%s", + "batch_size": "%d", "field_map": { "passage_text": "passage_embedding" } diff --git a/src/test/java/org/opensearch/neuralsearch/processor/TextChunkingProcessorIT.java b/src/test/java/org/opensearch/neuralsearch/processor/TextChunkingProcessorIT.java index d85865bb5..de5ca820e 100644 --- a/src/test/java/org/opensearch/neuralsearch/processor/TextChunkingProcessorIT.java +++ b/src/test/java/org/opensearch/neuralsearch/processor/TextChunkingProcessorIT.java @@ -196,7 +196,7 @@ private void createPipelineProcessor(String pipelineName) throws Exception { URL pipelineURLPath = classLoader.getResource(PIPELINE_CONFIGS_BY_NAME.get(pipelineName)); Objects.requireNonNull(pipelineURLPath); String requestBody = Files.readString(Path.of(pipelineURLPath.toURI())); - createPipelineProcessor(requestBody, pipelineName, ""); + createPipelineProcessor(requestBody, pipelineName, "", null); } private void createTextChunkingIndex(String indexName, String pipelineName) throws Exception { diff --git a/src/test/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessorIT.java b/src/test/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessorIT.java index 6bd452e55..638a34a3c 100644 --- a/src/test/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessorIT.java +++ b/src/test/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessorIT.java @@ -82,9 +82,9 @@ public void testTextEmbeddingProcessor_batch() throws Exception { try { modelId = uploadTextEmbeddingModel(); loadModel(modelId); - createPipelineProcessor(modelId, PIPELINE_NAME, ProcessorType.TEXT_EMBEDDING); + createPipelineProcessor(modelId, PIPELINE_NAME, ProcessorType.TEXT_EMBEDDING, 2); createTextEmbeddingIndex(); - ingestBatchDocumentWithBulk("batch_", 2, 2, Collections.emptySet(), Collections.emptySet()); + ingestBatchDocumentWithBulk("batch_", 2, Collections.emptySet(), Collections.emptySet()); assertEquals(2, getDocCount(INDEX_NAME)); ingestDocument(String.format(LOCALE, INGEST_DOC1, "success"), "1"); @@ -182,10 +182,10 @@ public void testTextEmbeddingProcessor_withBatchSizeInProcessor() throws Excepti URL pipelineURLPath = classLoader.getResource("processor/PipelineConfigurationWithBatchSize.json"); Objects.requireNonNull(pipelineURLPath); String requestBody = Files.readString(Path.of(pipelineURLPath.toURI())); - createPipelineProcessor(requestBody, PIPELINE_NAME, modelId); + createPipelineProcessor(requestBody, PIPELINE_NAME, modelId, null); createTextEmbeddingIndex(); int docCount = 5; - ingestBatchDocumentWithBulk("batch_", docCount, docCount, Collections.emptySet(), Collections.emptySet()); + ingestBatchDocumentWithBulk("batch_", docCount, Collections.emptySet(), Collections.emptySet()); assertEquals(5, getDocCount(INDEX_NAME)); for (int i = 0; i < docCount; ++i) { @@ -214,10 +214,10 @@ public void testTextEmbeddingProcessor_withFailureAndSkip() throws Exception { URL pipelineURLPath = classLoader.getResource("processor/PipelineConfigurationWithBatchSize.json"); Objects.requireNonNull(pipelineURLPath); String requestBody = Files.readString(Path.of(pipelineURLPath.toURI())); - createPipelineProcessor(requestBody, PIPELINE_NAME, modelId); + createPipelineProcessor(requestBody, PIPELINE_NAME, modelId, null); createTextEmbeddingIndex(); int docCount = 5; - ingestBatchDocumentWithBulk("batch_", docCount, docCount, Set.of(0), Set.of(1)); + ingestBatchDocumentWithBulk("batch_", docCount, Set.of(0), Set.of(1)); assertEquals(3, getDocCount(INDEX_NAME)); for (int i = 2; i < docCount; ++i) { @@ -274,7 +274,7 @@ private void ingestDocument(String doc, String id) throws Exception { assertEquals("created", map.get("result")); } - private void ingestBatchDocumentWithBulk(String idPrefix, int docCount, int batchSize, Set failedIds, Set droppedIds) + private void ingestBatchDocumentWithBulk(String idPrefix, int docCount, Set failedIds, Set droppedIds) throws Exception { StringBuilder payloadBuilder = new StringBuilder(); for (int i = 0; i < docCount; ++i) { @@ -294,7 +294,6 @@ private void ingestBatchDocumentWithBulk(String idPrefix, int docCount, int batc final String payload = payloadBuilder.toString(); Map params = new HashMap<>(); params.put("refresh", "true"); - params.put("batch_size", String.valueOf(batchSize)); Response response = makeRequest( client(), "POST", diff --git a/src/test/resources/processor/PipelineConfiguration.json b/src/test/resources/processor/PipelineConfiguration.json index d833576a0..65dce44a2 100644 --- a/src/test/resources/processor/PipelineConfiguration.json +++ b/src/test/resources/processor/PipelineConfiguration.json @@ -4,6 +4,7 @@ { "text_embedding": { "model_id": "%s", + "batch_size": "%d", "field_map": { "title": "title_knn", "favor_list": "favor_list_knn", diff --git a/src/test/resources/processor/SparseEncodingPipelineConfiguration.json b/src/test/resources/processor/SparseEncodingPipelineConfiguration.json index 04a4baf80..4166e2082 100644 --- a/src/test/resources/processor/SparseEncodingPipelineConfiguration.json +++ b/src/test/resources/processor/SparseEncodingPipelineConfiguration.json @@ -4,6 +4,7 @@ { "sparse_encoding": { "model_id": "%s", + "batch_size": "%d", "field_map": { "title": "title_sparse", "favor_list": "favor_list_sparse", diff --git a/src/testFixtures/java/org/opensearch/neuralsearch/BaseNeuralSearchIT.java b/src/testFixtures/java/org/opensearch/neuralsearch/BaseNeuralSearchIT.java index 13c8e230a..3b0339047 100644 --- a/src/testFixtures/java/org/opensearch/neuralsearch/BaseNeuralSearchIT.java +++ b/src/testFixtures/java/org/opensearch/neuralsearch/BaseNeuralSearchIT.java @@ -294,17 +294,31 @@ protected void createIndexWithConfiguration(final String indexName, String index protected void createPipelineProcessor(final String modelId, final String pipelineName, final ProcessorType processorType) throws Exception { + createPipelineProcessor(modelId, pipelineName, processorType, null); + } + + protected void createPipelineProcessor( + final String modelId, + final String pipelineName, + final ProcessorType processorType, + final Integer batchSize + ) throws Exception { String requestBody = Files.readString(Path.of(classLoader.getResource(PIPELINE_CONFIGS_BY_TYPE.get(processorType)).toURI())); - createPipelineProcessor(requestBody, pipelineName, modelId); + createPipelineProcessor(requestBody, pipelineName, modelId, batchSize); } - protected void createPipelineProcessor(final String requestBody, final String pipelineName, final String modelId) throws Exception { + protected void createPipelineProcessor( + final String requestBody, + final String pipelineName, + final String modelId, + final Integer batchSize + ) throws Exception { Response pipelineCreateResponse = makeRequest( client(), "PUT", "/_ingest/pipeline/" + pipelineName, null, - toHttpEntity(String.format(LOCALE, requestBody, modelId)), + toHttpEntity(String.format(LOCALE, requestBody, modelId, batchSize == null ? 1 : batchSize)), ImmutableList.of(new BasicHeader(HttpHeaders.USER_AGENT, DEFAULT_USER_AGENT)) ); Map node = XContentHelper.convertToMap( @@ -747,13 +761,8 @@ protected void addSparseEncodingDoc( assertEquals(request.getEndpoint() + ": failed", RestStatus.CREATED, RestStatus.fromCode(response.getStatusLine().getStatusCode())); } - protected void bulkAddDocuments( - final String index, - final String textField, - final String pipeline, - final List> docs, - final int batchSize - ) throws IOException, ParseException { + protected void bulkAddDocuments(final String index, final String textField, final String pipeline, final List> docs) + throws IOException { StringBuilder builder = new StringBuilder(); for (int i = 0; i < docs.size(); ++i) { String doc = String.format( @@ -767,10 +776,7 @@ protected void bulkAddDocuments( builder.append(doc); builder.append("\n"); } - Request request = new Request( - "POST", - String.format(Locale.ROOT, "/_bulk?refresh=true&pipeline=%s&batch_size=%d", pipeline, batchSize) - ); + Request request = new Request("POST", String.format(Locale.ROOT, "/_bulk?refresh=true&pipeline=%s", pipeline)); request.setJsonEntity(builder.toString()); Response response = client().performRequest(request);