Skip to content

Commit

Permalink
Merge 882b99a into 572e7bc
Browse files Browse the repository at this point in the history
  • Loading branch information
khushbr authored Aug 14, 2023
2 parents 572e7bc + 882b99a commit 1c73519
Show file tree
Hide file tree
Showing 6 changed files with 219 additions and 21 deletions.
17 changes: 9 additions & 8 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ buildscript {

dependencies {
classpath "org.opensearch.gradle:build-tools:${opensearch_version}"
classpath group: 'com.google.guava', name: 'guava', version: '31.1-jre'
}
}

Expand Down Expand Up @@ -186,15 +185,15 @@ jacocoTestCoverageVerification {
}
}
}
}
}
else {
violationRules {
rule {
limit {
minimum = 0.6
}
}
}
}
}
}

Expand Down Expand Up @@ -373,16 +372,18 @@ dependencies {
strictly "2.23.0"
}
}
testImplementation group: 'org.powermock', name: 'powermock-core', version: '2.0.0'
testImplementation group: 'org.powermock', name: 'powermock-api-support', version: '2.0.0'
testImplementation group: 'org.powermock', name: 'powermock-module-junit4-common', version: '2.0.0'
testImplementation group: 'org.javassist', name: 'javassist', version: '3.24.0-GA'
testImplementation group: 'org.powermock', name: 'powermock-reflect', version: '2.0.0'
testImplementation group: 'org.powermock', name: 'powermock-core', version: '2.0.2'
testImplementation group: 'org.powermock', name: 'powermock-api-support', version: '2.0.2'
testImplementation group: 'org.powermock', name: 'powermock-module-junit4-common', version: '2.0.2'
testImplementation group: 'org.javassist', name: 'javassist', version: '3.28.0-GA'
testImplementation group: 'org.powermock', name: 'powermock-reflect', version: '2.0.2'
testImplementation group: 'net.bytebuddy', name: 'byte-buddy', version: '1.9.3'
testImplementation group: 'org.objenesis', name: 'objenesis', version: '3.0.1'
testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: '2.1'
testImplementation group: 'org.hamcrest', name: 'hamcrest', version: '2.1'
testImplementation group: 'junit', name: 'junit', version: "${junitVersion}"
testImplementation group: 'org.xerial', name: 'sqlite-jdbc', version: '3.41.2.2'
testImplementation group: 'com.github.stefanbirkner', name: 'system-rules', version: '1.19.0'

// Required for Docker build
dockerBuild group: 'org.opensearch.plugin', name:'performance-analyzer', version: "${opensearch_build}"
Expand Down
2 changes: 0 additions & 2 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ services:

opensearch2:
container_name: opensearch2
environment:
- node.cluster_manager=false
image: opensearch/pa-rca:3.0
mem_limit: 4g
networks:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.performanceanalyzer;


import io.netty.handler.codec.http.HttpMethod;
import java.io.DataOutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class ESLocalhostConnection {

private static final Logger LOG = LogManager.getLogger(ESLocalhostConnection.class);

private static final int TIMEOUT_MILLIS = 30000;

public static int makeHttpRequest(String path, HttpMethod httpMethod, String requestBody) {
HttpURLConnection connection = null;
try {
connection = createHTTPConnection(path, httpMethod);
DataOutputStream stream = new DataOutputStream(connection.getOutputStream());
stream.writeBytes(requestBody);
stream.flush();
stream.close();
return connection.getResponseCode();
} catch (Exception e) {
throw new RuntimeException("Request failed: " + e.getMessage(), e);
} finally {
if (connection != null) {
connection.disconnect();
}
}
}

public static HttpURLConnection createHTTPConnection(String path, HttpMethod httpMethod) {
try {
String endPoint = "https://localhost:9200" + path;
URL endpointUrl = new URL(endPoint);
HttpURLConnection connection = (HttpURLConnection) endpointUrl.openConnection();
connection.setRequestMethod(httpMethod.toString());

connection.setConnectTimeout(TIMEOUT_MILLIS);
connection.setReadTimeout(TIMEOUT_MILLIS);
connection.setUseCaches(false);
connection.setDoOutput(true);
return connection;
} catch (Exception e) {
throw new RuntimeException(
"Failed to create OpenSearch Connection: " + e.getMessage(), e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.sun.net.httpserver.HttpServer;
import io.netty.handler.codec.http.HttpMethod;
import java.net.HttpURLConnection;
import java.util.*;
import java.util.concurrent.*;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -51,6 +53,7 @@ public class PerformanceAnalyzerApp {

private static final Logger LOG = LogManager.getLogger(PerformanceAnalyzerApp.class);

public static final int READER_RESTART_MAX_ATTEMPTS = 3;
private static final int EXCEPTION_QUEUE_LENGTH = 1;
private static final ScheduledMetricCollectorsExecutor METRIC_COLLECTOR_EXECUTOR =
new ScheduledMetricCollectorsExecutor(1, false);
Expand Down Expand Up @@ -234,13 +237,14 @@ public static Thread startGrpcServerThread(
return grpcServerThread;
}

private static void startReaderThread(
public static Thread startReaderThread(
final AppContext appContext, final ThreadProvider threadProvider) {
PluginSettings settings = PluginSettings.instance();
final Thread readerThread =
threadProvider.createThreadForRunnable(
() -> {
while (true) {
int retryAttemptLeft = READER_RESTART_MAX_ATTEMPTS;
while (retryAttemptLeft > 0) {
try {
ReaderMetricsProcessor mp =
new ReaderMetricsProcessor(
Expand All @@ -250,21 +254,66 @@ private static void startReaderThread(
ReaderMetricsProcessor.setCurrentInstance(mp);
mp.run();
} catch (Throwable e) {
if (TroubleshootingConfig.getEnableDevAssert()) {
break;
}
retryAttemptLeft--;
LOG.error(
"Error in ReaderMetricsProcessor...restarting, ExceptionCode: {}",
StatExceptionCode.READER_RESTART_PROCESSING.toString());
"Error in ReaderMetricsProcessor...restarting, retryCount left: {}."
+ "Exception: {}",
retryAttemptLeft,
e.getMessage());
StatsCollector.instance()
.logException(
StatExceptionCode.READER_RESTART_PROCESSING);

if (TroubleshootingConfig.getEnableDevAssert()) break;

// All retry attempts exhausted; handle thread failure
if (retryAttemptLeft <= 0) handleReaderThreadFailed();
}
}
},
PerformanceAnalyzerThreads.PA_READER);

readerThread.start();
return readerThread;
}

private static void handleReaderThreadFailed() {
// Reader subcomponent is responsible for processing, cleaning metrics written by PA.
// Since Reader thread fails to start successfully, execute following:
//
// 1. Disable PA - Stop collecting OpenSearch metrics
// 2. Terminate RCA Process - Gracefully shutdown all
// existing resources/channels, including Reader.
try {
LOG.info(
"Exhausted {} attempts - unable to start Reader Thread successfully; disable PA",
READER_RESTART_MAX_ATTEMPTS);
disablePA();
LOG.info("Attempt to disable PA succeeded.");
StatsCollector.instance()
.logException(StatExceptionCode.READER_ERROR_PA_DISABLE_SUCCESS);
} catch (Throwable e) {
LOG.info("Attempt to disable PA failing: {}", e.getMessage());
StatsCollector.instance()
.logException(StatExceptionCode.READER_ERROR_PA_DISABLE_FAILED);
}

LOG.info("Reader thread not coming up successfully - Shutting down RCA Runtime");
StatsCollector.instance().logException(StatExceptionCode.READER_ERROR_RCA_AGENT_STOPPED);

// Terminate Java Runtime, executes {@link #shutDownGracefully(ClientServers clientServers)}
System.exit(1);
}

private static void disablePA() {
String PA_CONFIG_PATH = Util.PA_BASE_URL + "/cluster/config";
String PA_DISABLE_PAYLOAD = "{\"enabled\": false}";

int resCode =
ESLocalhostConnection.makeHttpRequest(
PA_CONFIG_PATH, HttpMethod.POST, PA_DISABLE_PAYLOAD);
if (resCode != HttpURLConnection.HTTP_OK) {
throw new RuntimeException("Failed to disable PA");
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ public final class Version {
* transferred packets should be dropped. Every increment here should be accompanied with a line
* describing the version bump.
*
* Note: The RCA version is agnostic of OpenSearch version.
* <p>Note: The RCA version is agnostic of OpenSearch version.
*/
static final class Major {
// Bumping this post the Commons Lib(https://github.com/opensearch-project/performance-analyzer-commons/issues/2)
// and Service Metrics(https://github.com/opensearch-project/performance-analyzer-commons/issues/8) change
// Bumping this post the Commons
// Lib(https://github.com/opensearch-project/performance-analyzer-commons/issues/2)
// and Service
// Metrics(https://github.com/opensearch-project/performance-analyzer-commons/issues/8)
// change
static final int RCA_MAJ_VERSION = 1;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,46 @@

package org.opensearch.performanceanalyzer;

import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.powermock.api.mockito.PowerMockito.doThrow;
import static org.powermock.api.mockito.PowerMockito.mock;

import io.netty.handler.codec.http.HttpMethod;
import java.util.concurrent.ArrayBlockingQueue;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.contrib.java.lang.system.ExpectedSystemExit;
import org.junit.runner.RunWith;
import org.opensearch.performanceanalyzer.commons.config.ConfigStatus;
import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode;
import org.opensearch.performanceanalyzer.rca.RcaTestHelper;
import org.opensearch.performanceanalyzer.reader.ReaderMetricsProcessor;
import org.opensearch.performanceanalyzer.threads.ThreadProvider;
import org.opensearch.performanceanalyzer.threads.exceptions.PAThreadException;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

@RunWith(PowerMockRunner.class)
@PrepareForTest({PerformanceAnalyzerApp.class, ESLocalhostConnection.class})
@PowerMockIgnore({
"com.sun.org.apache.xerces.*",
"javax.xml.*",
"org.xml.*",
"javax.management.*",
"org.w3c.*"
})
public class PerformanceAnalyzerAppTest {
@Rule public final ExpectedSystemExit exit = ExpectedSystemExit.none();

@Before
public void setup() {
System.setProperty("performanceanalyzer.metrics.log.enabled", "False");
RcaTestHelper.cleanUpLogs();
}

@Test
Expand Down Expand Up @@ -49,4 +73,70 @@ public void testStartErrorHandlingThread() throws InterruptedException {
RcaTestHelper.verifyStatException(
StatExceptionCode.ERROR_HANDLER_THREAD_STOPPED.toString()));
}

@Test
public void testStartReaderThreadOneAttemptFailed() throws Exception {
ThreadProvider threadProvider = new ThreadProvider();
AppContext appContext = new AppContext();

Thread readerThread = PerformanceAnalyzerApp.startReaderThread(appContext, threadProvider);
readerThread.interrupt();
Assert.assertTrue(
"READER_RESTART_PROCESSING metric missing",
RcaTestHelper.verifyStatException(
StatExceptionCode.READER_RESTART_PROCESSING.toString()));
Assert.assertFalse(
"READER_ERROR_RCA_AGENT_STOPPED metric present",
RcaTestHelper.verifyStatException(
StatExceptionCode.READER_ERROR_RCA_AGENT_STOPPED.toString()));
}

@Test
public void testStartReaderThreadAllAttemptFail() throws Exception {
ThreadProvider threadProvider = new ThreadProvider();
AppContext appContext = new AppContext();

PowerMockito.mockStatic(ESLocalhostConnection.class);
ReaderMetricsProcessor readerMetricsProcessor = mock(ReaderMetricsProcessor.class);
doThrow(new RuntimeException("Force Crashing Reader Thread"))
.when(readerMetricsProcessor)
.run();
PowerMockito.whenNew(ReaderMetricsProcessor.class)
.withAnyArguments()
.thenReturn(readerMetricsProcessor);

// PA Disable Success
PowerMockito.when(
ESLocalhostConnection.makeHttpRequest(
anyString(), eq(HttpMethod.POST), anyString()))
.thenReturn(200);
PerformanceAnalyzerApp.startReaderThread(appContext, threadProvider);
Assert.assertTrue(
RcaTestHelper.verifyStatException(
StatExceptionCode.READER_RESTART_PROCESSING.toString()));
Assert.assertTrue(
RcaTestHelper.verifyStatException(
StatExceptionCode.READER_ERROR_PA_DISABLE_SUCCESS.toString()));
Assert.assertTrue(
RcaTestHelper.verifyStatException(
StatExceptionCode.READER_ERROR_RCA_AGENT_STOPPED.toString()));
exit.expectSystemExitWithStatus(1);

// PA Disable Fail
PowerMockito.when(
ESLocalhostConnection.makeHttpRequest(
anyString(), eq(HttpMethod.POST), anyString()))
.thenReturn(500);
PerformanceAnalyzerApp.startReaderThread(appContext, threadProvider);
Assert.assertTrue(
RcaTestHelper.verifyStatException(
StatExceptionCode.READER_RESTART_PROCESSING.toString()));
Assert.assertTrue(
RcaTestHelper.verifyStatException(
StatExceptionCode.READER_ERROR_PA_DISABLE_FAILED.toString()));
Assert.assertTrue(
RcaTestHelper.verifyStatException(
StatExceptionCode.READER_ERROR_RCA_AGENT_STOPPED.toString()));
exit.expectSystemExitWithStatus(1);
}
}

0 comments on commit 1c73519

Please sign in to comment.