Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle Reader thread termination gracefully #476

Merged
merged 5 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ buildscript {

dependencies {
classpath "org.opensearch.gradle:build-tools:${opensearch_version}"
classpath group: 'com.google.guava', name: 'guava', version: '31.1-jre'
}
}

Expand Down Expand Up @@ -186,15 +185,15 @@ jacocoTestCoverageVerification {
}
}
}
}
}
else {
violationRules {
rule {
limit {
minimum = 0.6
}
}
}
}
}
}

Expand Down Expand Up @@ -373,16 +372,18 @@ dependencies {
strictly "2.23.0"
}
}
testImplementation group: 'org.powermock', name: 'powermock-core', version: '2.0.0'
testImplementation group: 'org.powermock', name: 'powermock-api-support', version: '2.0.0'
testImplementation group: 'org.powermock', name: 'powermock-module-junit4-common', version: '2.0.0'
testImplementation group: 'org.javassist', name: 'javassist', version: '3.24.0-GA'
testImplementation group: 'org.powermock', name: 'powermock-reflect', version: '2.0.0'
testImplementation group: 'org.powermock', name: 'powermock-core', version: '2.0.2'
testImplementation group: 'org.powermock', name: 'powermock-api-support', version: '2.0.2'
testImplementation group: 'org.powermock', name: 'powermock-module-junit4-common', version: '2.0.2'
testImplementation group: 'org.javassist', name: 'javassist', version: '3.28.0-GA'
testImplementation group: 'org.powermock', name: 'powermock-reflect', version: '2.0.2'
testImplementation group: 'net.bytebuddy', name: 'byte-buddy', version: '1.9.3'
testImplementation group: 'org.objenesis', name: 'objenesis', version: '3.0.1'
testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: '2.1'
testImplementation group: 'org.hamcrest', name: 'hamcrest', version: '2.1'
testImplementation group: 'junit', name: 'junit', version: "${junitVersion}"
testImplementation group: 'org.xerial', name: 'sqlite-jdbc', version: '3.41.2.2'
testImplementation group: 'com.github.stefanbirkner', name: 'system-rules', version: '1.19.0'

// Required for Docker build
dockerBuild group: 'org.opensearch.plugin', name:'performance-analyzer', version: "${opensearch_build}"
Expand Down
2 changes: 0 additions & 2 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ services:

opensearch2:
container_name: opensearch2
environment:
- node.cluster_manager=false
image: opensearch/pa-rca:3.0
mem_limit: 4g
networks:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.performanceanalyzer;


import io.netty.handler.codec.http.HttpMethod;
import java.io.DataOutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * Static helpers for sending an HTTP request with a body to the endpoint at
 * {@code https://localhost:9200}.
 *
 * <p>NOTE(review): no {@code Content-Type} header is set on the connection; confirm the target
 * endpoint accepts the default that {@link HttpURLConnection} sends for requests with a body.
 */
public class ESLocalhostConnection {

    private static final Logger LOG = LogManager.getLogger(ESLocalhostConnection.class);

    /** Connect and read timeout applied to every connection, in milliseconds. */
    private static final int TIMEOUT_MILLIS = 30000;

    /**
     * Sends {@code requestBody} to {@code path} on the local endpoint and returns the HTTP status.
     *
     * @param path request path appended to the base URL, e.g. {@code "/_plugins/..."}
     * @param httpMethod HTTP method to use for the request
     * @param requestBody payload to send; encoded as UTF-8
     * @return the HTTP response status code
     * @throws RuntimeException if the connection cannot be created, the body cannot be written,
     *     or the response code cannot be read; the original exception is kept as the cause
     */
    public static int makeHttpRequest(String path, HttpMethod httpMethod, String requestBody) {
        HttpURLConnection connection = null;
        try {
            connection = createHTTPConnection(path, httpMethod);
            // try-with-resources closes the stream even when the write fails.
            try (DataOutputStream stream = new DataOutputStream(connection.getOutputStream())) {
                // DataOutputStream.writeBytes(String) drops the high byte of every char, which
                // corrupts non-ASCII payloads; write explicit UTF-8 bytes instead.
                stream.write(requestBody.getBytes(StandardCharsets.UTF_8));
                stream.flush();
            }
            return connection.getResponseCode();
        } catch (Exception e) {
            throw new RuntimeException("Request failed: " + e.getMessage(), e);
        } finally {
            if (connection != null) {
                connection.disconnect();
            }
        }
    }

    /**
     * Opens (but does not connect) an {@link HttpURLConnection} to {@code path} on the local
     * endpoint, configured with the request method, timeouts, caching disabled and output enabled.
     *
     * @param path request path appended to {@code https://localhost:9200}
     * @param httpMethod HTTP method to set on the connection
     * @return the configured connection
     * @throws RuntimeException if the URL is malformed or the connection cannot be configured;
     *     the original exception is kept as the cause
     */
    public static HttpURLConnection createHTTPConnection(String path, HttpMethod httpMethod) {
        try {
            String endPoint = "https://localhost:9200" + path;
            URL endpointUrl = new URL(endPoint);
            HttpURLConnection connection = (HttpURLConnection) endpointUrl.openConnection();
            connection.setRequestMethod(httpMethod.toString());

            connection.setConnectTimeout(TIMEOUT_MILLIS);
            connection.setReadTimeout(TIMEOUT_MILLIS);
            connection.setUseCaches(false);
            // Output is always enabled because makeHttpRequest always writes a body.
            connection.setDoOutput(true);
            return connection;
        } catch (Exception e) {
            throw new RuntimeException(
                    "Failed to create OpenSearch Connection: " + e.getMessage(), e);
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.sun.net.httpserver.HttpServer;
import io.netty.handler.codec.http.HttpMethod;
import java.net.HttpURLConnection;
import java.util.*;
import java.util.concurrent.*;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -51,6 +53,7 @@ public class PerformanceAnalyzerApp {

private static final Logger LOG = LogManager.getLogger(PerformanceAnalyzerApp.class);

public static final int READER_RESTART_MAX_ATTEMPTS = 3;
khushbr marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably do some test runs in the docker env to understand what could be the correct value for READER_RESTART_MAX_ATTEMPTS. We can also consider exponential backoff here in case the env set up takes time.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will increase the number of attempts here to 12(1min).

Exponential backoff makes sense for API calls, but here a thread is crashing while attempting to read data from disk, so a backoff retry is unlikely to help.

private static final int EXCEPTION_QUEUE_LENGTH = 1;
private static final ScheduledMetricCollectorsExecutor METRIC_COLLECTOR_EXECUTOR =
new ScheduledMetricCollectorsExecutor(1, false);
Expand Down Expand Up @@ -234,13 +237,14 @@ public static Thread startGrpcServerThread(
return grpcServerThread;
}

private static void startReaderThread(
public static void startReaderThread(
final AppContext appContext, final ThreadProvider threadProvider) {
PluginSettings settings = PluginSettings.instance();
final Thread readerThread =
threadProvider.createThreadForRunnable(
() -> {
while (true) {
int retryAttemptLeft = READER_RESTART_MAX_ATTEMPTS;
while (retryAttemptLeft > 0) {
try {
ReaderMetricsProcessor mp =
new ReaderMetricsProcessor(
Expand All @@ -250,23 +254,71 @@ private static void startReaderThread(
ReaderMetricsProcessor.setCurrentInstance(mp);
mp.run();
} catch (Throwable e) {
if (TroubleshootingConfig.getEnableDevAssert()) {
break;
}
retryAttemptLeft--;
LOG.error(
"Error in ReaderMetricsProcessor...restarting, ExceptionCode: {}",
StatExceptionCode.READER_RESTART_PROCESSING.toString());
"Error in ReaderMetricsProcessor...restarting, retryCount left: {}."
+ "Exception: {}",
retryAttemptLeft,
e.getMessage());
StatsCollector.instance()
.logException(
StatExceptionCode.READER_RESTART_PROCESSING);

if (TroubleshootingConfig.getEnableDevAssert()) break;

// All retry attempts exhausted; handle thread failure
if (retryAttemptLeft <= 0) handleReaderThreadFailed();
}
}
},
PerformanceAnalyzerThreads.PA_READER);

readerThread.start();
}

/**
 * Handles the case where the Reader thread could not be started after all retry attempts.
 *
 * <p>Reader subcomponent is responsible for processing, cleaning metrics written by PA. Since
 * the Reader thread fails to start successfully, this method:
 *
 * <ol>
 *   <li>Disables PA - stop collecting OpenSearch metrics.
 *   <li>Terminates the RCA process via {@code cleanupAndExit()}, whether or not disabling PA
 *       succeeded.
 * </ol>
 */
private static void handleReaderThreadFailed() {
    try {
        // Exhausting every restart attempt is a failure condition, so report it at ERROR.
        LOG.error(
                "Exhausted {} attempts - unable to start Reader Thread successfully; disable PA",
                READER_RESTART_MAX_ATTEMPTS);
        disablePA();
        LOG.info("Attempt to disable PA succeeded.");
        StatsCollector.instance()
                .logException(StatExceptionCode.READER_ERROR_PA_DISABLE_SUCCESS);
    } catch (Throwable e) {
        // Pass the throwable as the trailing argument so the stack trace is logged too.
        LOG.error("Attempt to disable PA failing: {}", e.getMessage(), e);
        StatsCollector.instance()
                .logException(StatExceptionCode.READER_ERROR_PA_DISABLE_FAILED);
    } finally {
        // Always shut down, regardless of the outcome of the disable attempt.
        cleanupAndExit();
    }
}

private static void disablePA() {
String PA_CONFIG_PATH = Util.PA_BASE_URL + "/cluster/config";
String PA_DISABLE_PAYLOAD = "{\"enabled\": false}";

int resCode =
ESLocalhostConnection.makeHttpRequest(
PA_CONFIG_PATH, HttpMethod.POST, PA_DISABLE_PAYLOAD);
khushbr marked this conversation as resolved.
Show resolved Hide resolved
if (resCode != HttpURLConnection.HTTP_OK) {
throw new RuntimeException("Failed to disable PA");
khushbr marked this conversation as resolved.
Show resolved Hide resolved
}
}

private static void cleanupAndExit() {
LOG.info("Reader thread not coming up successfully - Shutting down RCA Runtime");
StatsCollector.instance().logException(StatExceptionCode.READER_ERROR_RCA_AGENT_STOPPED);

// Terminate Java Runtime, executes {@link #shutDownGracefully(ClientServers clientServers)}
System.exit(1);
Copy link
Contributor

@sgup432 sgup432 Aug 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case users want(or try) to enable PA/RCA back via CLI dynamically, I guess it wouldn't work?
How do we handle such scenarios?

Does it make sense if we just terminate the reader thread? As killing rca process, disabling PA plugin(via 9200) all seems too intrusive.

Copy link
Collaborator Author

@khushbr khushbr Aug 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I disagree: killing only the PA thread while keeping the process idle wastes system resources. The RCA process, as we know, depends on the reader subcomponent to provide the data, which then flows through the analysis graph and is made available to nodes and users via the gRPC and web servers, respectively.

In case users want(or try) to enable PA/RCA back via CLI dynamically, I guess it wouldn't work?
How do we handle such scenarios?

PA can be enabled using the REST API call, while starting RCA will require bringing up the RCA Agent via the performance-analyzer-agent tool. This updated behavior will be added to the documentation.

Let me know your thoughts.

}

/**
* Start all the servers and clients for request processing. We start two servers: - httpServer:
* To handle the curl requests sent to the endpoint. This is human readable and also used by the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ public final class Version {
* transferred packets should be dropped. Every increment here should be accompanied with a line
* describing the version bump.
*
* Note: The RCA version is agnostic of OpenSearch version.
* <p>Note: The RCA version is agnostic of OpenSearch version.
*/
static final class Major {
// Bumping this post the Commons Lib(https://github.com/opensearch-project/performance-analyzer-commons/issues/2)
// and Service Metrics(https://github.com/opensearch-project/performance-analyzer-commons/issues/8) change
// Bumping this post the Commons
// Lib(https://github.com/opensearch-project/performance-analyzer-commons/issues/2)
// and Service
// Metrics(https://github.com/opensearch-project/performance-analyzer-commons/issues/8)
// change
static final int RCA_MAJ_VERSION = 1;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,49 @@

package org.opensearch.performanceanalyzer;

import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
import static org.powermock.api.mockito.PowerMockito.doNothing;
import static org.powermock.api.mockito.PowerMockito.doThrow;
import static org.powermock.api.mockito.PowerMockito.mock;
import static org.powermock.api.mockito.PowerMockito.verifyPrivate;

import io.netty.handler.codec.http.HttpMethod;
import java.util.concurrent.ArrayBlockingQueue;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.contrib.java.lang.system.ExpectedSystemExit;
import org.junit.runner.RunWith;
import org.opensearch.performanceanalyzer.commons.config.ConfigStatus;
import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode;
import org.opensearch.performanceanalyzer.rca.RcaTestHelper;
import org.opensearch.performanceanalyzer.reader.ReaderMetricsProcessor;
import org.opensearch.performanceanalyzer.threads.ThreadProvider;
import org.opensearch.performanceanalyzer.threads.exceptions.PAThreadException;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

@RunWith(PowerMockRunner.class)
@PrepareForTest({ESLocalhostConnection.class, PerformanceAnalyzerApp.class})
@PowerMockIgnore({
"com.sun.org.apache.xerces.*",
"javax.xml.*",
"org.xml.*",
"javax.management.*",
"org.w3c.*"
})
public class PerformanceAnalyzerAppTest {
@Rule public final ExpectedSystemExit exit = ExpectedSystemExit.none();

/**
 * Runs before each test: sets the {@code performanceanalyzer.metrics.log.enabled} system
 * property to {@code "False"} and calls {@code RcaTestHelper.cleanUpLogs()}.
 */
@Before
public void setup() {
System.setProperty("performanceanalyzer.metrics.log.enabled", "False");
RcaTestHelper.cleanUpLogs();
}

@Test
Expand Down Expand Up @@ -49,4 +76,54 @@ public void testStartErrorHandlingThread() throws InterruptedException {
RcaTestHelper.verifyStatException(
StatExceptionCode.ERROR_HANDLER_THREAD_STOPPED.toString()));
}

/**
 * Exercises {@code PerformanceAnalyzerApp.startReaderThread} when every
 * {@code ReaderMetricsProcessor.run()} call throws, in two phases:
 *
 * <ol>
 *   <li>the mocked PA-disable HTTP request returns 200, and
 *       {@code READER_ERROR_PA_DISABLE_SUCCESS} is expected;
 *   <li>the mocked request returns 500, and {@code READER_ERROR_PA_DISABLE_FAILED} is expected.
 * </ol>
 *
 * In both phases {@code READER_RESTART_PROCESSING} must be recorded and the private
 * {@code cleanupAndExit} must have been invoked.
 *
 * <p>NOTE(review): startReaderThread runs the reader on a separate thread, so the assertions
 * below depend on RcaTestHelper.verifyStatException observing stats written by that thread -
 * confirm it waits or polls.
 */
@Test
public void testStartReaderThreadAllAttemptFail() throws Exception {
ThreadProvider threadProvider = new ThreadProvider();
AppContext appContext = new AppContext();

// Mock the static methods of ESLocalhostConnection so makeHttpRequest can be stubbed below.
PowerMockito.mockStatic(ESLocalhostConnection.class);
// Every ReaderMetricsProcessor constructed in the prepared classes is replaced by this mock,
// whose run() always throws, forcing the reader to fail.
ReaderMetricsProcessor readerMetricsProcessor = mock(ReaderMetricsProcessor.class);
doThrow(new RuntimeException("Force Crashing Reader Thread"))
.when(readerMetricsProcessor)
.run();
PowerMockito.whenNew(ReaderMetricsProcessor.class)
.withAnyArguments()
.thenReturn(readerMetricsProcessor);

// Spy on the app class and stub the private cleanupAndExit to do nothing, so its invocation
// can be verified without executing its body.
PowerMockito.spy(PerformanceAnalyzerApp.class);
doNothing().when(PerformanceAnalyzerApp.class, "cleanupAndExit");

// PA Disable Success: the stubbed POST returns HTTP 200.
PowerMockito.when(
ESLocalhostConnection.makeHttpRequest(
anyString(), eq(HttpMethod.POST), anyString()))
.thenReturn(200);
PerformanceAnalyzerApp.startReaderThread(appContext, threadProvider);
Assert.assertTrue(
"READER_RESTART_PROCESSING metric missing",
RcaTestHelper.verifyStatException(
StatExceptionCode.READER_RESTART_PROCESSING.toString()));
Assert.assertTrue(
"READER_ERROR_PA_DISABLE_SUCCESS metric missing",
RcaTestHelper.verifyStatException(
StatExceptionCode.READER_ERROR_PA_DISABLE_SUCCESS.toString()));
verifyPrivate(PerformanceAnalyzerApp.class, times(1)).invoke("cleanupAndExit");

// PA Disable Fail: the stubbed POST now returns HTTP 500.
PowerMockito.when(
ESLocalhostConnection.makeHttpRequest(
anyString(), eq(HttpMethod.POST), anyString()))
.thenReturn(500);
PerformanceAnalyzerApp.startReaderThread(appContext, threadProvider);
Assert.assertTrue(
"READER_RESTART_PROCESSING metric missing",
RcaTestHelper.verifyStatException(
StatExceptionCode.READER_RESTART_PROCESSING.toString()));
Assert.assertTrue(
"READER_ERROR_PA_DISABLE_FAILED metric missing",
RcaTestHelper.verifyStatException(
StatExceptionCode.READER_ERROR_PA_DISABLE_FAILED.toString()));
// The invocation count is cumulative across both startReaderThread calls in this test.
verifyPrivate(PerformanceAnalyzerApp.class, times(2)).invoke("cleanupAndExit");
}
}
1 change: 1 addition & 0 deletions src/test/resources/rca/batch_metrics_enabled.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
false