From 41b86901fc98fbca8b4e667aaf6709832505692d Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Tue, 5 Sep 2023 12:30:40 +0530 Subject: [PATCH 1/2] Increasing test heap size and incrementing guava version Signed-off-by: Bharathwaj G --- .github/workflows/build.yml | 6 +++--- .github/workflows/multi-node-test-workflow.yml | 5 +++-- build.gradle | 6 +++++- 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 60cf75d1..43c7a950 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -29,7 +29,7 @@ jobs: - name: Checkout Branch uses: actions/checkout@v2 - name: Build with Gradle - run: ./gradlew build + run: ./gradlew build -Dtests.heap.size=1G - name: Pull and Run Docker for security tests run: | version=`./gradlew properties -q | grep "opensearch_version:" | awk '{print $2}'` @@ -74,10 +74,10 @@ jobs: if [ $security -gt 0 ] then echo "Security plugin is available" - ./gradlew integTest -Dtests.rest.cluster=localhost:9200 -Dtests.cluster=localhost:9200 -Dtests.clustername="docker-cluster" -Dhttps=true -Duser=admin -Dpassword=admin + ./gradlew integTest -Dtests.rest.cluster=localhost:9200 -Dtests.cluster=localhost:9200 -Dtests.clustername="docker-cluster" -Dhttps=true -Duser=admin -Dpassword=admin -Dtests.heap.size=1G else echo "Security plugin is NOT available" - ./gradlew integTest -Dtests.rest.cluster=localhost:9200 -Dtests.cluster=localhost:9200 -Dtests.clustername="docker-cluster" + ./gradlew integTest -Dtests.rest.cluster=localhost:9200 -Dtests.cluster=localhost:9200 -Dtests.clustername="docker-cluster" -Dtests.heap.size=1G fi - name: Upload failed logs uses: actions/upload-artifact@v2 diff --git a/.github/workflows/multi-node-test-workflow.yml b/.github/workflows/multi-node-test-workflow.yml index 60bfa30b..2613ccd1 100644 --- a/.github/workflows/multi-node-test-workflow.yml +++ b/.github/workflows/multi-node-test-workflow.yml @@ -24,14 +24,15 @@ jobs: with: java-version: ${{ env.java_version }} # This step uses the checkout Github action: https://github.com/actions/checkout + # Custom setting - heap size = 1G ( 512MB is default ) - name: Checkout Branch uses: actions/checkout@v2 - name: Run integration tests with multi node config - run: ./gradlew integTest -PnumNodes=5 + run: ./gradlew integTest -PnumNodes=5 -Dtests.heap.size=1G - name: Run Backwards Compatibility Tests run: | echo "Running backwards compatibility tests ..." - ./gradlew bwcTestSuite -Dtests.security.manager=false + ./gradlew bwcTestSuite -Dtests.security.manager=false -Dtests.heap.size=1G - name: Upload failed logs uses: actions/upload-artifact@v2 if: failure() diff --git a/build.gradle b/build.gradle index 4917cfa6..76cca3f4 100644 --- a/build.gradle +++ b/build.gradle @@ -62,7 +62,7 @@ apply plugin: 'opensearch.pluginzip' checkstyle { - toolVersion = '8.29' + toolVersion = '10.12.1' configFile file("checkstyle/checkstyle.xml") } @@ -134,6 +134,7 @@ dependencies { implementation "org.opensearch:common-utils:${common_utils_version}" configurations.all { resolutionStrategy { + force "com.google.guava:guava:32.0.1-jre" force "com.puppycrawl.tools:checkstyle:${project.checkstyle.toolVersion}" } } @@ -170,6 +171,7 @@ def securityEnabled = System.getProperty("security", "false") == "true" test { systemProperty 'tests.security.manager', 'false' systemProperty 'es.set.netty.runtime.available.processors', 'false' + systemProperty 'tests.heap.size', '1G' } @@ -196,6 +198,7 @@ testClusters.integTest { integTest { systemProperty 'tests.security.manager', 'false' + systemProperty 'tests.heap.size', '1G' systemProperty 'java.io.tmpdir', opensearch_tmp_dir.absolutePath systemProperty 'buildDir', buildDir.path systemProperty "https", System.getProperty("https", securityEnabled.toString()) @@ -229,6 +232,7 @@ task integTestRemote(type: RestIntegTestTask) { testClassesDirs = sourceSets.test.output.classesDirs classpath = sourceSets.test.runtimeClasspath systemProperty 'tests.security.manager', 'false' + systemProperty 'tests.heap.size', '1G' systemProperty 'java.io.tmpdir', opensearch_tmp_dir.absolutePath systemProperty "https", System.getProperty("https") From d5983d3f84cc6aeb36ba112250bbba9dfe3e6b33 Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Wed, 6 Sep 2023 13:28:11 +0530 Subject: [PATCH 2/2] search changes Signed-off-by: Bharathwaj G --- .../AsynchronousSearchRestTestCase.java | 15 ++ .../restIT/AsynchronousSearchSettingsIT.java | 152 +++++++++++++++--- .../asynchronous/utils/RestTestUtils.java | 23 +++ 3 files changed, 166 insertions(+), 24 deletions(-) diff --git a/src/test/java/org/opensearch/search/asynchronous/restIT/AsynchronousSearchRestTestCase.java b/src/test/java/org/opensearch/search/asynchronous/restIT/AsynchronousSearchRestTestCase.java index 94c78893..b78374bd 100644 --- a/src/test/java/org/opensearch/search/asynchronous/restIT/AsynchronousSearchRestTestCase.java +++ b/src/test/java/org/opensearch/search/asynchronous/restIT/AsynchronousSearchRestTestCase.java @@ -5,6 +5,7 @@ package org.opensearch.search.asynchronous.restIT; +import org.opensearch.action.search.SearchRequest; import org.opensearch.core.common.Strings; import org.opensearch.core.xcontent.DeprecationHandler; import org.opensearch.core.xcontent.MediaType; @@ -105,6 +106,12 @@ protected AsynchronousSearchResponse executeSubmitAsynchronousSearch( return executeSubmitAsynchronousSearch(submitAsynchronousSearchRequest, false); } + protected SearchResponse executeSubmitAsynchronousSearch( + @Nullable SearchRequest submitAsynchronousSearchRequest) + throws IOException { + return executeSubmitAsynchronousSearch(submitAsynchronousSearchRequest, false); + } + protected AsynchronousSearchResponse executeSubmitAsynchronousSearch( @Nullable SubmitAsynchronousSearchRequest submitAsynchronousSearchRequest, boolean shouldUseLegacyApi) throws IOException { @@ -113,6 +120,14 @@ protected AsynchronousSearchResponse executeSubmitAsynchronousSearch( return parseEntity(resp.getEntity(), AsynchronousSearchResponse::fromXContent); } + + protected SearchResponse executeSubmitAsynchronousSearch( + @Nullable SearchRequest submitAsynchronousSearchRequest, boolean shouldUseLegacyApi) + throws IOException { + Request request = RestTestUtils.buildHttpRequest(submitAsynchronousSearchRequest, shouldUseLegacyApi); + Response resp = client().performRequest(request); + return parseEntity(resp.getEntity(), SearchResponse::fromXContent); + } Response executeDeleteAsynchronousSearch(DeleteAsynchronousSearchRequest deleteAsynchronousSearchRequest) throws IOException { Request request = RestTestUtils.buildHttpRequest(deleteAsynchronousSearchRequest); return client().performRequest(request); diff --git a/src/test/java/org/opensearch/search/asynchronous/restIT/AsynchronousSearchSettingsIT.java b/src/test/java/org/opensearch/search/asynchronous/restIT/AsynchronousSearchSettingsIT.java index d2a6cf17..c8098a80 100644 --- a/src/test/java/org/opensearch/search/asynchronous/restIT/AsynchronousSearchSettingsIT.java +++ b/src/test/java/org/opensearch/search/asynchronous/restIT/AsynchronousSearchSettingsIT.java @@ -5,6 +5,8 @@ package org.opensearch.search.asynchronous.restIT; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.monitor.jvm.JvmInfo; import org.opensearch.search.asynchronous.context.active.AsynchronousSearchActiveStore; import org.opensearch.search.asynchronous.context.state.AsynchronousSearchState; import org.opensearch.search.asynchronous.request.GetAsynchronousSearchRequest; @@ -16,6 +18,8 @@ import org.opensearch.common.unit.TimeValue; import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; import java.util.Arrays; import java.util.LinkedList; import java.util.List; @@ -59,33 +63,46 @@ public void testSubmitInvalidWaitForCompletion() throws Exception { } public void testMaxRunningAsynchronousSearchContexts() throws Exception { - int numThreads = 50; - List threadsList = new LinkedList<>(); - CyclicBarrier barrier = new CyclicBarrier(numThreads + 1); - for (int i = 0; i < numThreads; i++) { - threadsList.add(new Thread(() -> { - try { - SubmitAsynchronousSearchRequest validRequest = new SubmitAsynchronousSearchRequest(new SearchRequest()); - validRequest.keepAlive(TimeValue.timeValueHours(1)); - AsynchronousSearchResponse asResponse = executeSubmitAsynchronousSearch(validRequest); - assertNotNull(asResponse.getSearchResponse()); - } catch (IOException e) { - fail("submit request failed"); - } finally { + JvmInfo jvmInfo = JvmInfo.jvmInfo(); + //ByteSizeValue maxHeapSize = jvmInfo.getMem().getHeapMax(); + String useCompressedOops = jvmInfo.useCompressedOops(); + MemoryMXBean MEMORY_MX_BEAN = ManagementFactory.getMemoryMXBean(); + double size =MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed(); + logger.info("USED size here : {}", MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed()/1024/1024); + logger.info("heap size [{}], compressed ordinary object pointers [{}]", JvmInfo.jvmInfo().getMem().getHeapMax(), useCompressedOops); + try { + int numThreads = 50; + List threadsList = new LinkedList<>(); + logger.info(threadsList.size()); + CyclicBarrier barrier = new CyclicBarrier(numThreads + 1); + for (int i = 0; i < numThreads; i++) { + threadsList.add(new Thread(() -> { try { - barrier.await(); - } catch (Exception e) { - fail(); + SubmitAsynchronousSearchRequest validRequest = new SubmitAsynchronousSearchRequest(new SearchRequest()); + validRequest.keepAlive(TimeValue.timeValueHours(1)); + logger.info("USED size before : {}", MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed() / 1024 / 1024); + AsynchronousSearchResponse asResponse = executeSubmitAsynchronousSearch(validRequest); + assertNotNull(asResponse.getSearchResponse()); + logger.info("USED size after : {}", MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed() / 1024 / 1024); + } catch (IOException e) { + fail("submit request failed"); + } finally { + try { + barrier.await(); + } catch (Exception e) { + fail(); + } } } + )); + } + threadsList.forEach(Thread::start); + barrier.await(); + for (Thread thread : threadsList) { + logger.info("USED size thread : {}", MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed() / 1024 / 1024); + thread.join(); } - )); - } - threadsList.forEach(Thread::start); - barrier.await(); - for (Thread thread : threadsList) { - thread.join(); - } + updateClusterSettings(AsynchronousSearchActiveStore.NODE_CONCURRENT_RUNNING_SEARCHES_SETTING.getKey(), 0); threadsList.clear(); @@ -116,9 +133,96 @@ public void testMaxRunningAsynchronousSearchContexts() throws Exception { for (Thread thread : threadsList) { thread.join(); } - assertEquals(numFailures.get(), 50); + assertEquals(numFailures.get(), 170); updateClusterSettings(AsynchronousSearchActiveStore.NODE_CONCURRENT_RUNNING_SEARCHES_SETTING.getKey(), AsynchronousSearchActiveStore.NODE_CONCURRENT_RUNNING_SEARCHES); + } catch (Exception e) { + logger.info("========== EXCEPTION : " + e.getMessage()); + logger.info("============== USED SIZE : " + MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed() / 1024 / 1024); + } + } + + public void testMaxRunningAsynchronousSearchContexts1() throws Exception { + JvmInfo jvmInfo = JvmInfo.jvmInfo(); + //ByteSizeValue maxHeapSize = jvmInfo.getMem().getHeapMax(); + String useCompressedOops = jvmInfo.useCompressedOops(); + MemoryMXBean MEMORY_MX_BEAN = ManagementFactory.getMemoryMXBean(); + double size =MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed(); + logger.info("USED size here : {}", MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed()/1024/1024); + logger.info("heap size [{}], compressed ordinary object pointers [{}]", JvmInfo.jvmInfo().getMem().getHeapMax(), useCompressedOops); + try { + int numThreads = 50; + List threadsList = new LinkedList<>(); + logger.info(threadsList.size()); + CyclicBarrier barrier = new CyclicBarrier(numThreads + 1); + for (int i = 0; i < numThreads; i++) { + threadsList.add(new Thread(() -> { + try { + SearchRequest validRequest = new SearchRequest(); + validRequest.indices("test"); + //validRequest.keepAlive(TimeValue.timeValueHours(1)); + logger.info("USED size before : {}", MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed() / 1024 / 1024); + SearchResponse asResponse = executeSubmitAsynchronousSearch(validRequest); + assertNotNull(asResponse); + logger.info("USED size after : {}", MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed() / 1024 / 1024); + } catch (IOException e) { + fail("submit request failed"); + } finally { + try { + + barrier.await(); + } catch (Exception e) { + fail(); + } + } + } + )); + } + threadsList.forEach(Thread::start); + barrier.await(); + for (Thread thread : threadsList) { + logger.info("USED size thread : {}", MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed() / 1024 / 1024); + thread.join(); + } + + + //updateClusterSettings(AsynchronousSearchActiveStore.NODE_CONCURRENT_RUNNING_SEARCHES_SETTING.getKey(), 0); + threadsList.clear(); + AtomicInteger numFailures = new AtomicInteger(); + for (int i = 0; i < numThreads; i++) { + threadsList.add(new Thread(() -> { + try { + SearchRequest validRequest = new SearchRequest(); + //validRequest.waitForCompletionTimeout(TimeValue.timeValueMillis(1)); + SearchResponse asResponse = executeSubmitAsynchronousSearch(validRequest); + } catch (Exception e) { + assertTrue(e instanceof ResponseException); + assertThat(e.getMessage(), containsString("Trying to create too many concurrent searches")); + numFailures.getAndIncrement(); + + } finally { + try { + numFailures.getAndIncrement(); + barrier.await(); + } catch (Exception e) { + fail(); + } + } + } + )); + } + threadsList.forEach(Thread::start); + barrier.await(); + for (Thread thread : threadsList) { + thread.join(); + } + assertEquals(numFailures.get(), 50); +// updateClusterSettings(AsynchronousSearchActiveStore.NODE_CONCURRENT_RUNNING_SEARCHES_SETTING.getKey(), +// AsynchronousSearchActiveStore.NODE_CONCURRENT_RUNNING_SEARCHES); + } catch (Exception e) { + logger.info("========== EXCEPTION : " + e.getMessage()); + logger.info("============== USED SIZE : " + MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed() / 1024 / 1024); + } } public void testStoreAsyncSearchWithFailures() throws Exception { diff --git a/src/test/java/org/opensearch/search/asynchronous/utils/RestTestUtils.java b/src/test/java/org/opensearch/search/asynchronous/utils/RestTestUtils.java index 86aeb5a0..6432ddea 100644 --- a/src/test/java/org/opensearch/search/asynchronous/utils/RestTestUtils.java +++ b/src/test/java/org/opensearch/search/asynchronous/utils/RestTestUtils.java @@ -78,6 +78,29 @@ public static Request buildHttpRequest(SubmitAsynchronousSearchRequest submitAsy return request; } + public static Request buildHttpRequest(SearchRequest submitAsynchronousSearchRequest + , boolean shouldUseLegacyApi) throws IOException { + + SearchRequest searchRequest = submitAsynchronousSearchRequest; + Request request = + new Request( + HttpGet.METHOD_NAME, + /*trim first backslash*/ + endpoint( + searchRequest.indices(), + "_search")); + + Params params = new Params(); + //addSearchRequestParams(params, searchRequest); + //addSubmitAsynchronousSearchRequestParams(params, submitAsynchronousSearchRequest); + + if (searchRequest.source() != null) { + request.setEntity(createEntity(searchRequest.source(), REQUEST_BODY_CONTENT_TYPE)); + } + request.addParameters(params.asMap()); + return request; + } + public static Request buildHttpRequest(GetAsynchronousSearchRequest getAsynchronousSearchRequest) { return buildHttpRequest(getAsynchronousSearchRequest, false); }