Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test PR with search concurrency #338

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
- name: Checkout Branch
uses: actions/checkout@v2
- name: Build with Gradle
run: ./gradlew build
run: ./gradlew build -Dtests.heap.size=1G
- name: Pull and Run Docker for security tests
run: |
version=`./gradlew properties -q | grep "opensearch_version:" | awk '{print $2}'`
Expand Down Expand Up @@ -74,10 +74,10 @@ jobs:
if [ $security -gt 0 ]
then
echo "Security plugin is available"
./gradlew integTest -Dtests.rest.cluster=localhost:9200 -Dtests.cluster=localhost:9200 -Dtests.clustername="docker-cluster" -Dhttps=true -Duser=admin -Dpassword=admin
./gradlew integTest -Dtests.rest.cluster=localhost:9200 -Dtests.cluster=localhost:9200 -Dtests.clustername="docker-cluster" -Dhttps=true -Duser=admin -Dpassword=admin -Dtests.heap.size=1G
else
echo "Security plugin is NOT available"
./gradlew integTest -Dtests.rest.cluster=localhost:9200 -Dtests.cluster=localhost:9200 -Dtests.clustername="docker-cluster"
./gradlew integTest -Dtests.rest.cluster=localhost:9200 -Dtests.cluster=localhost:9200 -Dtests.clustername="docker-cluster" -Dtests.heap.size=1G
fi
- name: Upload failed logs
uses: actions/upload-artifact@v2
Expand Down
5 changes: 3 additions & 2 deletions .github/workflows/multi-node-test-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@ jobs:
with:
java-version: ${{ env.java_version }}
# This step uses the checkout Github action: https://github.com/actions/checkout
# Custom setting - heap size = 1G ( 512MB is default )
- name: Checkout Branch
uses: actions/checkout@v2
- name: Run integration tests with multi node config
run: ./gradlew integTest -PnumNodes=5
run: ./gradlew integTest -PnumNodes=5 -Dtests.heap.size=1G
- name: Run Backwards Compatibility Tests
run: |
echo "Running backwards compatibility tests ..."
./gradlew bwcTestSuite -Dtests.security.manager=false
./gradlew bwcTestSuite -Dtests.security.manager=false -Dtests.heap.size=1G
- name: Upload failed logs
uses: actions/upload-artifact@v2
if: failure()
Expand Down
6 changes: 5 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ apply plugin: 'opensearch.pluginzip'


checkstyle {
toolVersion = '8.29'
toolVersion = '10.12.1'
configFile file("checkstyle/checkstyle.xml")
}

Expand Down Expand Up @@ -134,6 +134,7 @@ dependencies {
implementation "org.opensearch:common-utils:${common_utils_version}"
configurations.all {
resolutionStrategy {
force "com.google.guava:guava:32.0.1-jre"
force "com.puppycrawl.tools:checkstyle:${project.checkstyle.toolVersion}"
}
}
Expand Down Expand Up @@ -170,6 +171,7 @@ def securityEnabled = System.getProperty("security", "false") == "true"
test {
systemProperty 'tests.security.manager', 'false'
systemProperty 'es.set.netty.runtime.available.processors', 'false'
systemProperty 'tests.heap.size', '1G'

}

Expand All @@ -196,6 +198,7 @@ testClusters.integTest {

integTest {
systemProperty 'tests.security.manager', 'false'
systemProperty 'tests.heap.size', '1G'
systemProperty 'java.io.tmpdir', opensearch_tmp_dir.absolutePath
systemProperty 'buildDir', buildDir.path
systemProperty "https", System.getProperty("https", securityEnabled.toString())
Expand Down Expand Up @@ -229,6 +232,7 @@ task integTestRemote(type: RestIntegTestTask) {
testClassesDirs = sourceSets.test.output.classesDirs
classpath = sourceSets.test.runtimeClasspath
systemProperty 'tests.security.manager', 'false'
systemProperty 'tests.heap.size', '1G'
systemProperty 'java.io.tmpdir', opensearch_tmp_dir.absolutePath

systemProperty "https", System.getProperty("https")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.search.asynchronous.restIT;

import org.opensearch.action.search.SearchRequest;
import org.opensearch.core.common.Strings;
import org.opensearch.core.xcontent.DeprecationHandler;
import org.opensearch.core.xcontent.MediaType;
Expand Down Expand Up @@ -105,6 +106,12 @@ protected AsynchronousSearchResponse executeSubmitAsynchronousSearch(
return executeSubmitAsynchronousSearch(submitAsynchronousSearchRequest, false);
}

protected SearchResponse executeSubmitAsynchronousSearch(
@Nullable SearchRequest submitAsynchronousSearchRequest)
throws IOException {
return executeSubmitAsynchronousSearch(submitAsynchronousSearchRequest, false);
}

protected AsynchronousSearchResponse executeSubmitAsynchronousSearch(
@Nullable SubmitAsynchronousSearchRequest submitAsynchronousSearchRequest, boolean shouldUseLegacyApi)
throws IOException {
Expand All @@ -113,6 +120,14 @@ protected AsynchronousSearchResponse executeSubmitAsynchronousSearch(
return parseEntity(resp.getEntity(), AsynchronousSearchResponse::fromXContent);
}


protected SearchResponse executeSubmitAsynchronousSearch(
@Nullable SearchRequest submitAsynchronousSearchRequest, boolean shouldUseLegacyApi)
throws IOException {
Request request = RestTestUtils.buildHttpRequest(submitAsynchronousSearchRequest, shouldUseLegacyApi);
Response resp = client().performRequest(request);
return parseEntity(resp.getEntity(), SearchResponse::fromXContent);
}
Response executeDeleteAsynchronousSearch(DeleteAsynchronousSearchRequest deleteAsynchronousSearchRequest) throws IOException {
Request request = RestTestUtils.buildHttpRequest(deleteAsynchronousSearchRequest);
return client().performRequest(request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.search.asynchronous.restIT;

import org.opensearch.action.search.SearchResponse;
import org.opensearch.monitor.jvm.JvmInfo;
import org.opensearch.search.asynchronous.context.active.AsynchronousSearchActiveStore;
import org.opensearch.search.asynchronous.context.state.AsynchronousSearchState;
import org.opensearch.search.asynchronous.request.GetAsynchronousSearchRequest;
Expand All @@ -16,6 +18,8 @@
import org.opensearch.common.unit.TimeValue;

import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -59,33 +63,46 @@ public void testSubmitInvalidWaitForCompletion() throws Exception {
}

public void testMaxRunningAsynchronousSearchContexts() throws Exception {
int numThreads = 50;
List<Thread> threadsList = new LinkedList<>();
CyclicBarrier barrier = new CyclicBarrier(numThreads + 1);
for (int i = 0; i < numThreads; i++) {
threadsList.add(new Thread(() -> {
try {
SubmitAsynchronousSearchRequest validRequest = new SubmitAsynchronousSearchRequest(new SearchRequest());
validRequest.keepAlive(TimeValue.timeValueHours(1));
AsynchronousSearchResponse asResponse = executeSubmitAsynchronousSearch(validRequest);
assertNotNull(asResponse.getSearchResponse());
} catch (IOException e) {
fail("submit request failed");
} finally {
JvmInfo jvmInfo = JvmInfo.jvmInfo();
//ByteSizeValue maxHeapSize = jvmInfo.getMem().getHeapMax();
String useCompressedOops = jvmInfo.useCompressedOops();
MemoryMXBean MEMORY_MX_BEAN = ManagementFactory.getMemoryMXBean();
double size =MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed();
logger.info("USED size here : {}", MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed()/1024/1024);
logger.info("heap size [{}], compressed ordinary object pointers [{}]", JvmInfo.jvmInfo().getMem().getHeapMax(), useCompressedOops);
try {
int numThreads = 50;
List<Thread> threadsList = new LinkedList<>();
logger.info(threadsList.size());
CyclicBarrier barrier = new CyclicBarrier(numThreads + 1);
for (int i = 0; i < numThreads; i++) {
threadsList.add(new Thread(() -> {
try {
barrier.await();
} catch (Exception e) {
fail();
SubmitAsynchronousSearchRequest validRequest = new SubmitAsynchronousSearchRequest(new SearchRequest());
validRequest.keepAlive(TimeValue.timeValueHours(1));
logger.info("USED size before : {}", MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed() / 1024 / 1024);
AsynchronousSearchResponse asResponse = executeSubmitAsynchronousSearch(validRequest);
assertNotNull(asResponse.getSearchResponse());
logger.info("USED size after : {}", MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed() / 1024 / 1024);
} catch (IOException e) {
fail("submit request failed");
} finally {
try {
barrier.await();
} catch (Exception e) {
fail();
}
}
}
));
}
threadsList.forEach(Thread::start);
barrier.await();
for (Thread thread : threadsList) {
logger.info("USED size thread : {}", MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed() / 1024 / 1024);
thread.join();
}
));
}
threadsList.forEach(Thread::start);
barrier.await();
for (Thread thread : threadsList) {
thread.join();
}


updateClusterSettings(AsynchronousSearchActiveStore.NODE_CONCURRENT_RUNNING_SEARCHES_SETTING.getKey(), 0);
threadsList.clear();
Expand Down Expand Up @@ -116,9 +133,96 @@ public void testMaxRunningAsynchronousSearchContexts() throws Exception {
for (Thread thread : threadsList) {
thread.join();
}
assertEquals(numFailures.get(), 50);
assertEquals(numFailures.get(), 170);
updateClusterSettings(AsynchronousSearchActiveStore.NODE_CONCURRENT_RUNNING_SEARCHES_SETTING.getKey(),
AsynchronousSearchActiveStore.NODE_CONCURRENT_RUNNING_SEARCHES);
} catch (Exception e) {
logger.info("========== EXCEPTION : " + e.getMessage());
logger.info("============== USED SIZE : " + MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed() / 1024 / 1024);
}
}

public void testMaxRunningAsynchronousSearchContexts1() throws Exception {
JvmInfo jvmInfo = JvmInfo.jvmInfo();
//ByteSizeValue maxHeapSize = jvmInfo.getMem().getHeapMax();
String useCompressedOops = jvmInfo.useCompressedOops();
MemoryMXBean MEMORY_MX_BEAN = ManagementFactory.getMemoryMXBean();
double size =MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed();
logger.info("USED size here : {}", MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed()/1024/1024);
logger.info("heap size [{}], compressed ordinary object pointers [{}]", JvmInfo.jvmInfo().getMem().getHeapMax(), useCompressedOops);
try {
int numThreads = 50;
List<Thread> threadsList = new LinkedList<>();
logger.info(threadsList.size());
CyclicBarrier barrier = new CyclicBarrier(numThreads + 1);
for (int i = 0; i < numThreads; i++) {
threadsList.add(new Thread(() -> {
try {
SearchRequest validRequest = new SearchRequest();
validRequest.indices("test");
//validRequest.keepAlive(TimeValue.timeValueHours(1));
logger.info("USED size before : {}", MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed() / 1024 / 1024);
SearchResponse asResponse = executeSubmitAsynchronousSearch(validRequest);
assertNotNull(asResponse);
logger.info("USED size after : {}", MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed() / 1024 / 1024);
} catch (IOException e) {
fail("submit request failed");
} finally {
try {

barrier.await();
} catch (Exception e) {
fail();
}
}
}
));
}
threadsList.forEach(Thread::start);
barrier.await();
for (Thread thread : threadsList) {
logger.info("USED size thread : {}", MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed() / 1024 / 1024);
thread.join();
}


//updateClusterSettings(AsynchronousSearchActiveStore.NODE_CONCURRENT_RUNNING_SEARCHES_SETTING.getKey(), 0);
threadsList.clear();
AtomicInteger numFailures = new AtomicInteger();
for (int i = 0; i < numThreads; i++) {
threadsList.add(new Thread(() -> {
try {
SearchRequest validRequest = new SearchRequest();
//validRequest.waitForCompletionTimeout(TimeValue.timeValueMillis(1));
SearchResponse asResponse = executeSubmitAsynchronousSearch(validRequest);
} catch (Exception e) {
assertTrue(e instanceof ResponseException);
assertThat(e.getMessage(), containsString("Trying to create too many concurrent searches"));
numFailures.getAndIncrement();

} finally {
try {
numFailures.getAndIncrement();
barrier.await();
} catch (Exception e) {
fail();
}
}
}
));
}
threadsList.forEach(Thread::start);
barrier.await();
for (Thread thread : threadsList) {
thread.join();
}
assertEquals(numFailures.get(), 50);
// updateClusterSettings(AsynchronousSearchActiveStore.NODE_CONCURRENT_RUNNING_SEARCHES_SETTING.getKey(),
// AsynchronousSearchActiveStore.NODE_CONCURRENT_RUNNING_SEARCHES);
} catch (Exception e) {
logger.info("========== EXCEPTION : " + e.getMessage());
logger.info("============== USED SIZE : " + MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed() / 1024 / 1024);
}
}

public void testStoreAsyncSearchWithFailures() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,29 @@ public static Request buildHttpRequest(SubmitAsynchronousSearchRequest submitAsy
return request;
}

public static Request buildHttpRequest(SearchRequest submitAsynchronousSearchRequest
, boolean shouldUseLegacyApi) throws IOException {

SearchRequest searchRequest = submitAsynchronousSearchRequest;
Request request =
new Request(
HttpGet.METHOD_NAME,
/*trim first backslash*/
endpoint(
searchRequest.indices(),
"_search"));

Params params = new Params();
//addSearchRequestParams(params, searchRequest);
//addSubmitAsynchronousSearchRequestParams(params, submitAsynchronousSearchRequest);

if (searchRequest.source() != null) {
request.setEntity(createEntity(searchRequest.source(), REQUEST_BODY_CONTENT_TYPE));
}
request.addParameters(params.asMap());
return request;
}

public static Request buildHttpRequest(GetAsynchronousSearchRequest getAsynchronousSearchRequest) {
return buildHttpRequest(getAsynchronousSearchRequest, false);
}
Expand Down