From 46bd63e4079d03254767cec6a0710eaf486a4f6b Mon Sep 17 00:00:00 2001 From: CoderJeffrey Date: Thu, 22 Jun 2023 11:29:11 -0700 Subject: [PATCH 01/11] Remove log files and add DCO (Signed-off-by: Jeffrey Liu ujeffliu@amazon.com) Signed-off-by: CoderJeffrey --- .../model/MetricAttributes.java | 1 - .../model/MetricsModel.java | 5 + .../performanceanalyzer/rca/Version.java | 7 +- .../api/metrics/SearchBackPressureStats.java | 15 ++ .../rca/framework/metrics/ReaderMetrics.java | 8 +- .../rca/store/OpenSearchAnalysisGraph.java | 3 +- .../reader/MetricsEmitter.java | 114 ++++++++++ .../reader/ReaderMetricsProcessor.java | 23 ++ .../SearchBackPressureMetricsProcessor.java | 197 ++++++++++++++++++ .../SearchBackPressureMetricsSnapShot.java | 179 ++++++++++++++++ ...earchBackPressureMetricsProcessorTest.java | 161 ++++++++++++++ ...SearchBackPressureMetricsSnapShotTest.java | 136 ++++++++++++ 12 files changed, 844 insertions(+), 5 deletions(-) create mode 100644 src/main/java/org/opensearch/performanceanalyzer/rca/framework/api/metrics/SearchBackPressureStats.java create mode 100644 src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsProcessor.java create mode 100644 src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShot.java create mode 100644 src/test/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsProcessorTest.java create mode 100644 src/test/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShotTest.java diff --git a/src/main/java/org/opensearch/performanceanalyzer/model/MetricAttributes.java b/src/main/java/org/opensearch/performanceanalyzer/model/MetricAttributes.java index 36230b8e5..414e266b7 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/model/MetricAttributes.java +++ b/src/main/java/org/opensearch/performanceanalyzer/model/MetricAttributes.java @@ -14,7 +14,6 @@ public class MetricAttributes { public HashSet dimensionNames; 
MetricAttributes(String unit, MetricDimension[] dimensions) { - this.unit = unit; this.dimensionNames = new HashSet(); for (MetricDimension dimension : dimensions) { diff --git a/src/main/java/org/opensearch/performanceanalyzer/model/MetricsModel.java b/src/main/java/org/opensearch/performanceanalyzer/model/MetricsModel.java index 5b144ac12..f7c781ac1 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/model/MetricsModel.java +++ b/src/main/java/org/opensearch/performanceanalyzer/model/MetricsModel.java @@ -464,6 +464,11 @@ public class MetricsModel { MetricUnits.MILLISECOND.toString(), AllMetrics.ShardIndexingPressureDimension.values())); + // Search Back Pressure Metrics + allMetricsInitializer.put( + AllMetrics.SearchBackPressureStatsValue.SEARCHBP_SHARD_STATS_CANCELLATIONCOUNT + .toString(), + new MetricAttributes(MetricUnits.COUNT.toString(), EmptyDimension.values())); ALL_METRICS = Collections.unmodifiableMap(allMetricsInitializer); } } diff --git a/src/main/java/org/opensearch/performanceanalyzer/rca/Version.java b/src/main/java/org/opensearch/performanceanalyzer/rca/Version.java index bfc85fcd3..ac53b4d72 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/rca/Version.java +++ b/src/main/java/org/opensearch/performanceanalyzer/rca/Version.java @@ -22,8 +22,11 @@ public final class Version { * Note: The RCA version is agnostic of OpenSearch version. 
*/ static final class Major { - // Bumping this post the Commons Lib(https://github.com/opensearch-project/performance-analyzer-commons/issues/2) - // and Service Metrics(https://github.com/opensearch-project/performance-analyzer-commons/issues/8) change + // Bumping this post the Commons + // Lib(https://github.com/opensearch-project/performance-analyzer-commons/issues/2) + // and Service + // Metrics(https://github.com/opensearch-project/performance-analyzer-commons/issues/8) + // change static final int RCA_MAJ_VERSION = 1; } diff --git a/src/main/java/org/opensearch/performanceanalyzer/rca/framework/api/metrics/SearchBackPressureStats.java b/src/main/java/org/opensearch/performanceanalyzer/rca/framework/api/metrics/SearchBackPressureStats.java new file mode 100644 index 000000000..ae5c59814 --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/rca/framework/api/metrics/SearchBackPressureStats.java @@ -0,0 +1,15 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.rca.framework.api.metrics; + + +import org.opensearch.performanceanalyzer.rca.framework.api.Metric; + +public class SearchBackPressureStats extends Metric { + public SearchBackPressureStats(long evaluationIntervalSeconds) { + super("searchbp_shard_stats_cancellationCount", evaluationIntervalSeconds); + } +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/rca/framework/metrics/ReaderMetrics.java b/src/main/java/org/opensearch/performanceanalyzer/rca/framework/metrics/ReaderMetrics.java index 4c9fc5a04..ec6ce0fd9 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/rca/framework/metrics/ReaderMetrics.java +++ b/src/main/java/org/opensearch/performanceanalyzer/rca/framework/metrics/ReaderMetrics.java @@ -86,7 +86,13 @@ public enum ReaderMetrics implements MeasurementSet { "FaultDetectionMetricsEmitterExecutionTime", "millis", StatsType.LATENCIES, - Statistics.SUM); + Statistics.SUM), + 
SEARCH_BACK_PRESSURE_METRICS_EMITTER_EXECUTION_TIME( + "SearchBackPressureMetricsEmitterExecutionTime", + "millis", + StatsType.LATENCIES, + Statistics.SUM), + ; /** What we want to appear as the metric name. */ private String name; diff --git a/src/main/java/org/opensearch/performanceanalyzer/rca/store/OpenSearchAnalysisGraph.java b/src/main/java/org/opensearch/performanceanalyzer/rca/store/OpenSearchAnalysisGraph.java index e144f2ee1..80763befb 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/rca/store/OpenSearchAnalysisGraph.java +++ b/src/main/java/org/opensearch/performanceanalyzer/rca/store/OpenSearchAnalysisGraph.java @@ -183,7 +183,8 @@ public void construct() { // Use EVALUATION_INTERVAL_SECONDS instead of RCA_PERIOD which resolved to 12 seconds. // This is resulting in this RCA not getting executed in every 5 seconds. Rca> threadMetricsRca = - new ThreadMetricsRca(threadBlockedTime, threadWaitedTime, EVALUATION_INTERVAL_SECONDS); + new ThreadMetricsRca( + threadBlockedTime, threadWaitedTime, EVALUATION_INTERVAL_SECONDS); threadMetricsRca.addTag( RcaConsts.RcaTagConstants.TAG_LOCUS, RcaConsts.RcaTagConstants.LOCUS_DATA_CLUSTER_MANAGER_NODE); diff --git a/src/main/java/org/opensearch/performanceanalyzer/reader/MetricsEmitter.java b/src/main/java/org/opensearch/performanceanalyzer/reader/MetricsEmitter.java index d209bf7f1..b060fb346 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/reader/MetricsEmitter.java +++ b/src/main/java/org/opensearch/performanceanalyzer/reader/MetricsEmitter.java @@ -749,6 +749,120 @@ public static void emitGarbageCollectionInfo( ReaderMetrics.GC_INFO_EMITTER_EXECUTION_TIME, mFinalT - mCurrT); } + public static void emitSearchBackPressureMetrics( + MetricsDB metricsDB, + SearchBackPressureMetricsSnapShot searchBackPressureMetricsSnapShot) { + long mCurrT = System.currentTimeMillis(); + Result searchbp_records = searchBackPressureMetricsSnapShot.fetchAll(); + + // String SEARCHBP_MODE_DIM = "searchbp_mode"; 
+ String SEARCHBP_TYPE_DIM = "SearchBackPressureStats"; + String SEARCHBP_TABLE_NAME = "searchbp_stats"; + + List dims = + new ArrayList() { + { + this.add(SEARCHBP_TYPE_DIM); + } + }; + + List stats_types = + new ArrayList() { + { + // Shard/Task Stats Cancellation Count + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_CANCELLATIONCOUNT + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_CANCELLATIONCOUNT + .toString()); + // Shard Stats Resource Heap / CPU Usage + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_CANCELLATIONCOUNT + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_CURRENTMAX + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_ROLLINGAVG + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CURRENTMAX + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CURRENTAVG + .toString()); + // Task Stats Resource Heap / CPU Usage + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_CANCELLATIONCOUNT + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_CURRENTMAX + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_ROLLINGAVG + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CURRENTMAX + .toString()); + this.add( + 
AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CURRENTAVG + .toString()); + } + }; + + metricsDB.createMetric(new Metric<>(SEARCHBP_TABLE_NAME, 0d), dims); + + BatchBindStep handle = metricsDB.startBatchPut(new Metric<>(SEARCHBP_TABLE_NAME, 0d), dims); + + for (Record record : searchbp_records) { + for (String stats_type : stats_types) { + Optional tmpStatsObj = Optional.ofNullable(record.get(stats_type)); + // LOG.info(stats_type + " is: " + tmpStatsObj.map(o -> + // Long.parseLong(o.toString())).toString()); + + handle.bind( + stats_type, + // the rest are agg fields: sum, avg, min, max which don't make sense for + // searchbackpressure + tmpStatsObj.map(o -> Long.parseLong(o.toString())).orElse(0L), + tmpStatsObj.map(o -> Long.parseLong(o.toString())).orElse(0L), + tmpStatsObj.map(o -> Long.parseLong(o.toString())).orElse(0L), + tmpStatsObj.map(o -> Long.parseLong(o.toString())).orElse(0L)); + } + } + + handle.execute(); + + long mFinalT = System.currentTimeMillis(); + LOG.debug( + "Total time taken for writing Search Back Pressure info into metricsDB: {}", + mFinalT - mCurrT); + ServiceMetrics.READER_METRICS_AGGREGATOR.updateStat( + ReaderMetrics.SEARCH_BACK_PRESSURE_METRICS_EMITTER_EXECUTION_TIME, + mFinalT - mCurrT); + } + public static void emitAdmissionControlMetrics( MetricsDB metricsDB, AdmissionControlSnapshot snapshot) { long mCurrT = System.currentTimeMillis(); diff --git a/src/main/java/org/opensearch/performanceanalyzer/reader/ReaderMetricsProcessor.java b/src/main/java/org/opensearch/performanceanalyzer/reader/ReaderMetricsProcessor.java index 512c52f6d..3b446d95e 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/reader/ReaderMetricsProcessor.java +++ b/src/main/java/org/opensearch/performanceanalyzer/reader/ReaderMetricsProcessor.java @@ -70,6 +70,7 @@ public class ReaderMetricsProcessor implements Runnable { clusterManagerThrottlingMetricsMap; private NavigableMap shardStateMetricsMap; private 
NavigableMap admissionControlMetricsMap; + private NavigableMap searchBackPressureMetricsMap; private static final int MAX_DATABASES = 2; private static final int OS_SNAPSHOTS = 4; @@ -81,6 +82,7 @@ public class ReaderMetricsProcessor implements Runnable { private static final int GC_INFO_SNAPSHOTS = 4; private static final int CLUSTER_MANAGER_THROTTLING_SNAPSHOTS = 2; private static final int AC_SNAPSHOTS = 2; + private static final int SEARCH_BP_SNAPSHOTS = 4; private final String rootLocation; private final AppContext appContext; @@ -125,6 +127,8 @@ public ReaderMetricsProcessor( gcInfoMap = new TreeMap<>(); clusterManagerThrottlingMetricsMap = new TreeMap<>(); admissionControlMetricsMap = new TreeMap<>(); + searchBackPressureMetricsMap = new TreeMap<>(); + this.rootLocation = rootLocation; this.configOverridesApplier = new ConfigOverridesApplier(); @@ -268,6 +272,7 @@ public void trimOldSnapshots() throws Exception { trimMap(gcInfoMap, GC_INFO_SNAPSHOTS); trimMap(clusterManagerThrottlingMetricsMap, CLUSTER_MANAGER_THROTTLING_SNAPSHOTS); trimMap(admissionControlMetricsMap, AC_SNAPSHOTS); + trimMap(searchBackPressureMetricsMap, SEARCH_BP_SNAPSHOTS); for (NavigableMap snap : nodeMetricsMap.values()) { // do the same thing as OS_SNAPSHOTS. 
Eventually MemoryDBSnapshot @@ -397,6 +402,7 @@ private void emitMetrics(long currWindowStartTime) throws Exception { emitAdmissionControlMetrics(prevWindowStartTime, metricsDB); emitClusterManagerMetrics(prevWindowStartTime, metricsDB); emitClusterManagerThrottlingMetrics(prevWindowStartTime, metricsDB); + emitSearchBackPressureMetrics(prevWindowStartTime, metricsDB); metricsDB.commit(); metricsDBMap.put(prevWindowStartTime, metricsDB); @@ -594,6 +600,19 @@ private void emitClusterManagerThrottlingMetrics( } } + private void emitSearchBackPressureMetrics(long prevWindowStartTime, MetricsDB metricsDB) + throws Exception { + if (searchBackPressureMetricsMap.containsKey(prevWindowStartTime)) { + SearchBackPressureMetricsSnapShot prevSearchBPSnapShot = + searchBackPressureMetricsMap.get(prevWindowStartTime); + MetricsEmitter.emitSearchBackPressureMetrics(metricsDB, prevSearchBPSnapShot); + } else { + LOG.debug( + "Search Back Pressure snapshot does not exist for the previous window. " + + "Not emitting metrics."); + } + } + /** * OS, Request, Http and cluster_manager first aligns the currentTimeStamp with a 5 second * interval. In the current format, a file (previously a directory) is written every 5 seconds. @@ -679,6 +698,9 @@ is ready so it starts to read that file (go back two windows and EventProcessor admissionControlProcessor = AdmissionControlProcessor.build( currWindowStartTime, conn, admissionControlMetricsMap); + EventProcessor searchBackPressureMetricsProcessor = + SearchBackPressureMetricsProcessor.buildSearchBackPressureMetricsProcessor( + currWindowStartTime, conn, searchBackPressureMetricsMap); // The event dispatcher dispatches events to each of the registered event processors. 
// In addition to event processing each processor has an initialize/finalize function that @@ -702,6 +724,7 @@ is ready so it starts to read that file (go back two windows and eventDispatcher.registerEventProcessor(faultDetectionProcessor); eventDispatcher.registerEventProcessor(garbageCollectorInfoProcessor); eventDispatcher.registerEventProcessor(admissionControlProcessor); + eventDispatcher.registerEventProcessor(searchBackPressureMetricsProcessor); eventDispatcher.initializeProcessing( currWindowStartTime, currWindowStartTime + MetricsConfiguration.SAMPLING_INTERVAL); diff --git a/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsProcessor.java b/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsProcessor.java new file mode 100644 index 000000000..8c6e93d8c --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsProcessor.java @@ -0,0 +1,197 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.reader; + + +import java.sql.Connection; +import java.util.ArrayList; +import java.util.Map; +import java.util.NavigableMap; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.jooq.BatchBindStep; +import org.opensearch.performanceanalyzer.commons.event_process.Event; +import org.opensearch.performanceanalyzer.commons.event_process.EventProcessor; +import org.opensearch.performanceanalyzer.commons.metrics.AllMetrics; +import org.opensearch.performanceanalyzer.commons.metrics.PerformanceAnalyzerMetrics; +import org.opensearch.performanceanalyzer.commons.util.JsonConverter; + +public class SearchBackPressureMetricsProcessor implements EventProcessor { + + private static final Logger LOG = + LogManager.getLogger(SearchBackPressureMetricsProcessor.class); + + // instance of SearchBackPressureMetricsSnapShot to interact with the backend db + 
private SearchBackPressureMetricsSnapShot searchBackPressureMetricsSnapShot; + + // entry point for batch queries + private BatchBindStep handle; + + // normally startTime and endTime are gapped by 5 seconds (default sampling interval) + private long startTime; + private long endTime; + + private SearchBackPressureMetricsProcessor( + SearchBackPressureMetricsSnapShot searchBackPressureMetricsSnapShot) { + this.searchBackPressureMetricsSnapShot = searchBackPressureMetricsSnapShot; + } + + /* + * if current SnapShotMap has the snapshot for currentWindowStartTime, use the snapshot to build the processor + * else create a new Instance of SearchBackPressureMetricsSnapShot to initialize the processor + */ + static SearchBackPressureMetricsProcessor buildSearchBackPressureMetricsProcessor( + long currentWindowStartTime, + Connection connection, + NavigableMap + searchBackPressureSnapshotNavigableMap) { + // no snapshot in the map for this window yet: create one and register it + if (searchBackPressureSnapshotNavigableMap.get(currentWindowStartTime) == null) { + SearchBackPressureMetricsSnapShot searchBackPressureMetricsSnapShot = + new SearchBackPressureMetricsSnapShot(connection, currentWindowStartTime); + searchBackPressureSnapshotNavigableMap.put( + currentWindowStartTime, searchBackPressureMetricsSnapShot); + return new SearchBackPressureMetricsProcessor(searchBackPressureMetricsSnapShot); + } + + return new SearchBackPressureMetricsProcessor( + searchBackPressureSnapshotNavigableMap.get(currentWindowStartTime)); + } + + @Override + public void initializeProcessing(long startTime, long endTime) { + this.startTime = startTime; + this.endTime = endTime; + this.handle = searchBackPressureMetricsSnapShot.startBatchPut(); + } + + @Override + public void finalizeProcessing() { + if (handle.size() > 0) { + handle.execute(); + } + } + + @Override + public boolean shouldProcessEvent(Event event) { + return 
event.key.contains(PerformanceAnalyzerMetrics.sSearchBackPressureMetricsPath); + } + + @Override + public void commitBatchIfRequired() { + if (handle.size() >= BATCH_LIMIT) { + handle.execute(); + handle = searchBackPressureMetricsSnapShot.startBatchPut(); + } + } + + // Handler method for incoming events + private void handleSearchBackPressureEvent(String eventValue) { + String[] lines = eventValue.split(System.lineSeparator()); + // 0th line is the current time string (e.g. {current_time:1686952296889}) + // 1st line is the metrics payload + if (lines.length < 2) { + throw new RuntimeException("Missing SearchBackPressure Metrics payload and timestamp."); + // return; + } + + // Parse metrics payload + parseJsonLine(lines[1]); + } + + private void parseJsonLine(final String jsonString) { + Map map = JsonConverter.createMapFrom(jsonString); + + if (map.isEmpty()) { + throw new RuntimeException("Missing SearchBackPressure Metrics payload."); + // return; + } + // A list of dims to be collected + ArrayList required_searchbp_dims = + new ArrayList() { + { + // Shard/Task Stats Cancellation Count + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_CANCELLATIONCOUNT + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_CANCELLATIONCOUNT + .toString()); + + // Shard Stats Resource Heap / CPU Usage + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_CANCELLATIONCOUNT + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_CURRENTMAX + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_ROLLINGAVG + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CURRENTMAX + 
.toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CURRENTAVG + .toString()); + + // Task Stats Resource Heap / CPU Usage + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_CANCELLATIONCOUNT + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_CURRENTMAX + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_ROLLINGAVG + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CURRENTMAX + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CURRENTAVG + .toString()); + } + }; + + Object[] bindVals = new Object[required_searchbp_dims.size()]; + int idx = 0; + for (String dimension : required_searchbp_dims) { + bindVals[idx++] = map.get(dimension); + } + + handle.bind(bindVals); + } + + @Override + public void processEvent(Event event) { + // Handler method for incoming event + handleSearchBackPressureEvent(event.value); + + // commit the batched queries if the batch has reached the limit + commitBatchIfRequired(); + } +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShot.java b/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShot.java new file mode 100644 index 000000000..b995cbe44 --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShot.java @@ -0,0 +1,179 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.reader; + + +import java.sql.Connection; +import java.util.ArrayList; +import java.util.List; 
+import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.jooq.BatchBindStep; +import org.jooq.DSLContext; +import org.jooq.Field; +import org.jooq.Record; +import org.jooq.Result; +import org.jooq.SQLDialect; +import org.jooq.impl.DSL; +import org.opensearch.performanceanalyzer.commons.metrics.AllMetrics; + +public class SearchBackPressureMetricsSnapShot implements Removable { + + // Logger for current class + private static final Logger LOG = LogManager.getLogger(SearchBackPressureMetricsSnapShot.class); + + // entry point to interact with SQLite db + private final DSLContext create; + + private final String tableName; + private List> columns; + + // Global variables for naming + private static final String SEARCHBP_CONTROLLER_NAME_VALUE = "ControllerName"; + private static final String SEARCHBP_MODE_VALUE = "searchbp_mode"; + + // Create a table with specified fields (columns) + public SearchBackPressureMetricsSnapShot(Connection conn, Long windowStartTime) { + this.create = DSL.using(conn, SQLDialect.SQLITE); + this.tableName = "search_back_pressure_" + windowStartTime; + + // Add one Long column per search back pressure stat to the table + this.columns = + new ArrayList>() { + { + // Shard/Task Stats Cancellation Count + this.add( + DSL.field( + DSL.name( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_CANCELLATIONCOUNT + .toString()), + Long.class)); + this.add( + DSL.field( + DSL.name( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_CANCELLATIONCOUNT + .toString()), + Long.class)); + + // Shard Stats Resource Heap / CPU Usage + this.add( + DSL.field( + DSL.name( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_CANCELLATIONCOUNT + .toString()), + Long.class)); + this.add( + DSL.field( + DSL.name( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_CURRENTMAX + .toString()), + Long.class)); + this.add( + DSL.field( + 
DSL.name( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_ROLLINGAVG + .toString()), + Long.class)); + this.add( + DSL.field( + DSL.name( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT + .toString()), + Long.class)); + + this.add( + DSL.field( + DSL.name( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CURRENTMAX + .toString()), + Long.class)); + this.add( + DSL.field( + DSL.name( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CURRENTAVG + .toString()), + Long.class)); + + // Task Stats Resource Heap / CPU Usage + this.add( + DSL.field( + DSL.name( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_CANCELLATIONCOUNT + .toString()), + Long.class)); + this.add( + DSL.field( + DSL.name( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_CURRENTMAX + .toString()), + Long.class)); + this.add( + DSL.field( + DSL.name( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_ROLLINGAVG + .toString()), + Long.class)); + this.add( + DSL.field( + DSL.name( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT + .toString()), + Long.class)); + + this.add( + DSL.field( + DSL.name( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CURRENTMAX + .toString()), + Long.class)); + this.add( + DSL.field( + DSL.name( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CURRENTAVG + .toString()), + Long.class)); + } + }; + + // create table with columns specified + create.createTable(tableName).columns(columns).execute(); + } + + public DSLContext getDSLContext() { + return create; + } + + public BatchBindStep startBatchPut() { + List dummyValues = new ArrayList<>(); + for (int i = 0; i < columns.size(); i++) { + 
dummyValues.add(null); + } + return create.batch(create.insertInto(DSL.table(this.tableName)).values(dummyValues)); + } + + public Result fetchAll() { + return create.select().from(DSL.table(tableName)).fetch(); + } + + @Override + public void remove() throws Exception { + create.dropTable(DSL.table(tableName)).execute(); + } +} diff --git a/src/test/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsProcessorTest.java b/src/test/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsProcessorTest.java new file mode 100644 index 000000000..6abbf4a90 --- /dev/null +++ b/src/test/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsProcessorTest.java @@ -0,0 +1,161 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.reader; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.util.NavigableMap; +import java.util.TreeMap; +import org.jooq.Record; +import org.jooq.Result; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.opensearch.performanceanalyzer.commons.event_process.Event; +import org.opensearch.performanceanalyzer.commons.metrics.AllMetrics; +import org.opensearch.performanceanalyzer.commons.metrics.PerformanceAnalyzerMetrics; + +public class SearchBackPressureMetricsProcessorTest { + private static final String DB_URL = "jdbc:sqlite:"; + // private static final String TEST_MEM_POOL = "testMemPool"; + // private static final String COLLECTOR_NAME = "testCollectorName"; + private static final String SEARCH_BACK_PRESSURE_STATS_KEY = "search_back_pressure_stats"; + private SearchBackPressureMetricsProcessor searchBackPressureMetricsProcessor; + private long currTimeStamp; + + private NavigableMap searchBackPressureStatsMap; + Connection conn; + + // mock 
SearchBackPressureStatsCollector to test Event processing + private static final String SERIALIZED_EVENT = + "{\"searchbp_shard_stats_cancellationCount\":2," + + "\"searchbp_shard_stats_limitReachedCount\":2," + + "\"searchbp_shard_stats_resource_heap_usage_cancellationCount\":3," + + "\"searchbp_shard_stats_resource_heap_usage_currentMax\":3," + + "\"searchbp_shard_stats_resource_heap_usage_rollingAvg\":3," + + "\"searchbp_shard_stats_resource_cpu_usage_cancellationCount\":5," + + "\"searchbp_shard_stats_resource_cpu_usage_currentMax\":5," + + "\"searchbp_shard_stats_resource_cpu_usage_currentAvg\":5," + + "\"searchbp_shard_stats_resource_elaspedtime_usage_cancellationCount\":2," + + "\"searchbp_shard_stats_resource_elaspedtime_usage_currentMax\":2," + + "\"searchbp_shard_stats_resource_elaspedtime_usage_currentAvg\":2," + + "\"searchbp_task_stats_cancellationCount\":0," + + "\"searchbp_task_stats_limitReachedCount\":0," + + "\"searchbp_task_stats_resource_heap_usage_cancellationCount\":0," + + "\"searchbp_task_stats_resource_heap_usage_currentMax\":0," + + "\"searchbp_task_stats_resource_heap_usage_rollingAvg\":0," + + "\"searchbp_task_stats_resource_cpu_usage_cancellationCount\":0," + + "\"searchbp_task_stats_resource_cpu_usage_currentMax\":0," + + "\"searchbp_task_stats_resource_cpu_usage_currentAvg\":0," + + "\"searchbp_task_stats_resource_elaspedtime_usage_cancellationCount\":0," + + "\"searchbp_task_stats_resource_elaspedtime_usage_currentMax\":0," + + "\"searchbp_task_stats_resource_elaspedtime_usage_currentAvg\":0," + + "\"searchbp_mode\":\"MONITOR_ONLY\"," + + "\"searchbp_nodeid\":\"FgNAAAQQQDSROABCDEFHTX\"}"; + + @Before + public void setup() throws Exception { + Class.forName("org.sqlite.JDBC"); + System.setProperty("java.io.tmpdir", "/tmp"); + conn = DriverManager.getConnection(DB_URL); + this.currTimeStamp = System.currentTimeMillis(); + this.searchBackPressureStatsMap = new TreeMap<>(); + this.searchBackPressureMetricsProcessor = + 
searchBackPressureMetricsProcessor.buildSearchBackPressureMetricsProcessor( + currTimeStamp, conn, searchBackPressureStatsMap); + } + + // Test valid case of the handleSearchBackPressureEvent() + @Test + public void testSearchBackPressureProcessEvent() throws Exception { + // Create a SearchBackPressureEvent + Event testEvent = buildTestSearchBackPressureStatsEvent(); + + // Test the SearchBackPressureMetricsSnapShot + searchBackPressureMetricsProcessor.initializeProcessing( + this.currTimeStamp, System.currentTimeMillis()); + assertTrue(searchBackPressureMetricsProcessor.shouldProcessEvent(testEvent)); + + searchBackPressureMetricsProcessor.processEvent(testEvent); + searchBackPressureMetricsProcessor.finalizeProcessing(); + + SearchBackPressureMetricsSnapShot currSnapshot = + searchBackPressureStatsMap.get(this.currTimeStamp); + Result result = currSnapshot.fetchAll(); + assertEquals(1, result.size()); + + // SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_ROLLINGAVG value is 3L according to the + // SERIALIZED_EVENT, should EQUAL + Assert.assertEquals( + 3L, + result.get(0) + .get( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_ROLLINGAVG + .toString())); + // SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT value is 0L according to the + // SERIALIZED_EVENT, should EQUAL + Assert.assertEquals( + 0L, + result.get(0) + .get( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT + .toString())); + + // SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT value is 0L according to the + // SERIALIZED_EVENT, should NOT EQUAL + Assert.assertNotEquals( + 2L, + result.get(0) + .get( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT + .toString())); + } + + @Test + public void testEmptySearchBackPressureProcessEvent() throws Exception { + // Create a SearchBackPressureEvent + Event testEvent = 
buildEmptyTestSearchBackPressureStatsEvent(); + + // Test the SearchBackPressureMetricsSnapShot + searchBackPressureMetricsProcessor.initializeProcessing( + this.currTimeStamp, System.currentTimeMillis()); + assertTrue(searchBackPressureMetricsProcessor.shouldProcessEvent(testEvent)); + + try { + searchBackPressureMetricsProcessor.processEvent(testEvent); + Assert.assertFalse( + "Negative scenario test: Should catch a RuntimeException and skip this test", + true); + } catch (RuntimeException ex) { + // should catch the exception and the previous assertion should not be executed + } + } + + private Event buildTestSearchBackPressureStatsEvent() { + StringBuilder str = new StringBuilder(); + str.append(PerformanceAnalyzerMetrics.getJsonCurrentMilliSeconds()) + .append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor); + + str.append(SERIALIZED_EVENT).append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor); + return new Event( + SEARCH_BACK_PRESSURE_STATS_KEY, str.toString(), System.currentTimeMillis()); + } + + private Event buildEmptyTestSearchBackPressureStatsEvent() { + StringBuilder str = new StringBuilder(); + str.append(PerformanceAnalyzerMetrics.getJsonCurrentMilliSeconds()) + .append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor); + + return new Event( + SEARCH_BACK_PRESSURE_STATS_KEY, str.toString(), System.currentTimeMillis()); + } +} diff --git a/src/test/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShotTest.java b/src/test/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShotTest.java new file mode 100644 index 000000000..eeaa1a30f --- /dev/null +++ b/src/test/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShotTest.java @@ -0,0 +1,136 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.reader; + +import static org.junit.Assert.assertEquals; + +import java.sql.Connection; +import 
java.sql.DriverManager; +import java.util.ArrayList; +import org.jooq.BatchBindStep; +import org.jooq.Record; +import org.jooq.Result; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.opensearch.performanceanalyzer.commons.metrics.AllMetrics; + +public class SearchBackPressureMetricsSnapShotTest { + private static final String DB_URL = "jdbc:sqlite:"; + private Connection conn; + SearchBackPressureMetricsSnapShot snapshot; + + ArrayList required_searchbp_dims = + new ArrayList() { + { + // Shard/Task Stats Cancellation Count + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_CANCELLATIONCOUNT + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_CANCELLATIONCOUNT + .toString()); + + // Shard Stats Resource Heap / CPU Usage + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_CANCELLATIONCOUNT + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_CURRENTMAX + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_ROLLINGAVG + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CURRENTMAX + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CURRENTAVG + .toString()); + + // Task Stats Resource Heap / CPU Usage + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_CANCELLATIONCOUNT + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_CURRENTMAX + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + 
.SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_ROLLINGAVG + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CURRENTMAX + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CURRENTAVG + .toString()); + } + }; + + @Before + public void setup() throws Exception { + Class.forName("org.sqlite.JDBC"); + System.setProperty("java.io.tmpdir", "/tmp"); + conn = DriverManager.getConnection(DB_URL); + snapshot = new SearchBackPressureMetricsSnapShot(conn, System.currentTimeMillis()); + } + + @Test + public void testReadSearchBackPressureMetricsSnapshot() throws Exception { + final BatchBindStep handle = snapshot.startBatchPut(); + insertIntoTable(handle); + + final Result result = snapshot.fetchAll(); + + assertEquals(1, result.size()); + // for 14 (length of required_searchbp_dims) fields, each assign a value from 0 to 13 + // test each field and verify the result + for (long i = 0; i < required_searchbp_dims.size(); i++) { + Assert.assertEquals( + AllMetrics.SearchBackPressureStatsValue.SEARCHBP_SHARD_STATS_CANCELLATIONCOUNT + .toString() + + " should be " + + String.valueOf(i), + i, + result.get(0).get(required_searchbp_dims.get((int) i))); + } + } + + @After + public void tearDown() throws Exception { + conn.close(); + } + + private void insertIntoTable(BatchBindStep handle) { + Object[] bindVals = new Object[required_searchbp_dims.size()]; + for (int i = 0; i < required_searchbp_dims.size(); i++) { + bindVals[i] = Long.valueOf(i); + } + + handle.bind(bindVals).execute(); + } +} From 92c3fc808a7aff3e914e44797847b8bbbb2b5264 Mon Sep 17 00:00:00 2001 From: CoderJeffrey Date: Thu, 22 Jun 2023 11:38:55 -0700 Subject: [PATCH 02/11] Remove extra files (Signed-off-by: Jeffrey Liu ujeffliu@amazon.com) Signed-off-by: CoderJeffrey --- 
.../api/metrics/SearchBackPressureStats.java | 15 --------------- .../SearchBackPressureMetricsProcessor.java | 2 +- 2 files changed, 1 insertion(+), 16 deletions(-) delete mode 100644 src/main/java/org/opensearch/performanceanalyzer/rca/framework/api/metrics/SearchBackPressureStats.java diff --git a/src/main/java/org/opensearch/performanceanalyzer/rca/framework/api/metrics/SearchBackPressureStats.java b/src/main/java/org/opensearch/performanceanalyzer/rca/framework/api/metrics/SearchBackPressureStats.java deleted file mode 100644 index ae5c59814..000000000 --- a/src/main/java/org/opensearch/performanceanalyzer/rca/framework/api/metrics/SearchBackPressureStats.java +++ /dev/null @@ -1,15 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.performanceanalyzer.rca.framework.api.metrics; - - -import org.opensearch.performanceanalyzer.rca.framework.api.Metric; - -public class SearchBackPressureStats extends Metric { - public SearchBackPressureStats(long evaluationIntervalSeconds) { - super("searchbp_shard_stats_cancellationCount", evaluationIntervalSeconds); - } -} diff --git a/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsProcessor.java b/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsProcessor.java index 8c6e93d8c..8eec8a831 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsProcessor.java +++ b/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsProcessor.java @@ -38,7 +38,7 @@ private SearchBackPressureMetricsProcessor( SearchBackPressureMetricsSnapShot searchBackPressureMetricsSnapShot) { this.searchBackPressureMetricsSnapShot = searchBackPressureMetricsSnapShot; } - + /* * if current SnapShotMap has the snapshot for currentWindowStartTime, use the snapshot to build the processor * else create a new Instance of SearchBackPressureMetricsSnapShot to initialize 
the processor From a47388af973f39d93f049ef60e9d9b043bf9aa2b Mon Sep 17 00:00:00 2001 From: CoderJeffrey Date: Thu, 22 Jun 2023 11:42:43 -0700 Subject: [PATCH 03/11] Remove styling difference (Signed-off-by: Jeffrey Liu ujeffliu@amazon.com) Signed-off-by: CoderJeffrey --- .../performanceanalyzer/model/MetricAttributes.java | 1 + .../org/opensearch/performanceanalyzer/rca/Version.java | 7 ++----- .../rca/store/OpenSearchAnalysisGraph.java | 3 +-- 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/opensearch/performanceanalyzer/model/MetricAttributes.java b/src/main/java/org/opensearch/performanceanalyzer/model/MetricAttributes.java index 414e266b7..36230b8e5 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/model/MetricAttributes.java +++ b/src/main/java/org/opensearch/performanceanalyzer/model/MetricAttributes.java @@ -14,6 +14,7 @@ public class MetricAttributes { public HashSet dimensionNames; MetricAttributes(String unit, MetricDimension[] dimensions) { + this.unit = unit; this.dimensionNames = new HashSet(); for (MetricDimension dimension : dimensions) { diff --git a/src/main/java/org/opensearch/performanceanalyzer/rca/Version.java b/src/main/java/org/opensearch/performanceanalyzer/rca/Version.java index ac53b4d72..bfc85fcd3 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/rca/Version.java +++ b/src/main/java/org/opensearch/performanceanalyzer/rca/Version.java @@ -22,11 +22,8 @@ public final class Version { * Note: The RCA version is agnostic of OpenSearch version. 
*/ static final class Major { - // Bumping this post the Commons - // Lib(https://github.com/opensearch-project/performance-analyzer-commons/issues/2) - // and Service - // Metrics(https://github.com/opensearch-project/performance-analyzer-commons/issues/8) - // change + // Bumping this post the Commons Lib(https://github.com/opensearch-project/performance-analyzer-commons/issues/2) + // and Service Metrics(https://github.com/opensearch-project/performance-analyzer-commons/issues/8) change static final int RCA_MAJ_VERSION = 1; } diff --git a/src/main/java/org/opensearch/performanceanalyzer/rca/store/OpenSearchAnalysisGraph.java b/src/main/java/org/opensearch/performanceanalyzer/rca/store/OpenSearchAnalysisGraph.java index 80763befb..e144f2ee1 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/rca/store/OpenSearchAnalysisGraph.java +++ b/src/main/java/org/opensearch/performanceanalyzer/rca/store/OpenSearchAnalysisGraph.java @@ -183,8 +183,7 @@ public void construct() { // Use EVALUATION_INTERVAL_SECONDS instead of RCA_PERIOD which resolved to 12 seconds. // This is resulting in this RCA not getting executed in every 5 seconds. 
Rca> threadMetricsRca = - new ThreadMetricsRca( - threadBlockedTime, threadWaitedTime, EVALUATION_INTERVAL_SECONDS); + new ThreadMetricsRca(threadBlockedTime, threadWaitedTime, EVALUATION_INTERVAL_SECONDS); threadMetricsRca.addTag( RcaConsts.RcaTagConstants.TAG_LOCUS, RcaConsts.RcaTagConstants.LOCUS_DATA_CLUSTER_MANAGER_NODE); From 8b86501bec7ec07d0c87c82515883c61e7dacb72 Mon Sep 17 00:00:00 2001 From: CoderJeffrey Date: Thu, 22 Jun 2023 12:00:58 -0700 Subject: [PATCH 04/11] Remove unnecessary file changes (Signed-off-by: Jeffrey Liu ujeffliu@amazon.com) Signed-off-by: CoderJeffrey --- .../opensearch/performanceanalyzer/model/MetricsModel.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/main/java/org/opensearch/performanceanalyzer/model/MetricsModel.java b/src/main/java/org/opensearch/performanceanalyzer/model/MetricsModel.java index f7c781ac1..5b144ac12 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/model/MetricsModel.java +++ b/src/main/java/org/opensearch/performanceanalyzer/model/MetricsModel.java @@ -464,11 +464,6 @@ public class MetricsModel { MetricUnits.MILLISECOND.toString(), AllMetrics.ShardIndexingPressureDimension.values())); - // Search Back Pressure Metrics - allMetricsInitializer.put( - AllMetrics.SearchBackPressureStatsValue.SEARCHBP_SHARD_STATS_CANCELLATIONCOUNT - .toString(), - new MetricAttributes(MetricUnits.COUNT.toString(), EmptyDimension.values())); ALL_METRICS = Collections.unmodifiableMap(allMetricsInitializer); } } From 8ace36ca6aae5fb645f1d71758d1eda590500c69 Mon Sep 17 00:00:00 2001 From: CoderJeffrey Date: Mon, 26 Jun 2023 16:20:15 -0700 Subject: [PATCH 05/11] Use Commons package for metric table naming and column naming (Signed-off-by: Jeffrey Liu ujeffliu@amazon.com) Signed-off-by: CoderJeffrey --- .../opensearch/performanceanalyzer/reader/MetricsEmitter.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/opensearch/performanceanalyzer/reader/MetricsEmitter.java 
b/src/main/java/org/opensearch/performanceanalyzer/reader/MetricsEmitter.java index b060fb346..28d7b6dbe 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/reader/MetricsEmitter.java +++ b/src/main/java/org/opensearch/performanceanalyzer/reader/MetricsEmitter.java @@ -756,8 +756,8 @@ public static void emitSearchBackPressureMetrics( Result searchbp_records = searchBackPressureMetricsSnapShot.fetchAll(); // String SEARCHBP_MODE_DIM = "searchbp_mode"; - String SEARCHBP_TYPE_DIM = "SearchBackPressureStats"; - String SEARCHBP_TABLE_NAME = "searchbp_stats"; + String SEARCHBP_TYPE_DIM = AllMetrics.SearchBackPressureStatsValue.SEARCHBP_TYPE_DIM.toString(); + String SEARCHBP_TABLE_NAME = AllMetrics.SearchBackPressureStatsValue.SEARCHBP_TABLE_NAME.toString(); List dims = new ArrayList() { From 43208c71f56f362275e2dc0ccb460b248f88ec06 Mon Sep 17 00:00:00 2001 From: CoderJeffrey Date: Tue, 27 Jun 2023 15:24:21 -0700 Subject: [PATCH 06/11] Remove unnecessary comments and add more descriptive comments (Signed-off-by: Jeffrey Liu ujeffliu@amazon.com) Signed-off-by: CoderJeffrey --- .../reader/SearchBackPressureMetricsProcessor.java | 5 +---- .../reader/SearchBackPressureMetricsSnapShot.java | 1 + 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsProcessor.java b/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsProcessor.java index 8eec8a831..e3ab8a90a 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsProcessor.java +++ b/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsProcessor.java @@ -91,11 +91,8 @@ public void commitBatchIfRequired() { // Handler method for incoming events private void handleSearchBackPressureEvent(String eventValue) { String[] lines = eventValue.split(System.lineSeparator()); - // 0thline is current time string (e.g. 
{current_time:1686952296889}) - // 1st line is the payload the metrics if (lines.length < 2) { throw new RuntimeException("Missing SearchBackPressure Metrics payload and timestamp."); - // return; } // Parse metrics payload @@ -107,8 +104,8 @@ private void parseJsonLine(final String jsonString) { if (map.isEmpty()) { throw new RuntimeException("Missing SearchBackPressure Metrics payload."); - // return; } + // A list of dims to be collected ArrayList required_searchbp_dims = new ArrayList() { diff --git a/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShot.java b/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShot.java index b995cbe44..259b33797 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShot.java +++ b/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShot.java @@ -161,6 +161,7 @@ public DSLContext getDSLContext() { } public BatchBindStep startBatchPut() { + // Add dummy values because jooq requires this to support multiple bind statements with single insert query List dummyValues = new ArrayList<>(); for (int i = 0; i < columns.size(); i++) { dummyValues.add(null); From 55cd8c0b22cbee0ce0d19f92cf6f852b09aef51d Mon Sep 17 00:00:00 2001 From: CoderJeffrey Date: Tue, 27 Jun 2023 15:49:50 -0700 Subject: [PATCH 07/11] Remove unnecessary global variables (Signed-off-by: Jeffrey Liu ujeffliu@amazon.com) Signed-off-by: CoderJeffrey --- .../reader/SearchBackPressureMetricsSnapShot.java | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShot.java b/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShot.java index 259b33797..0f4246cd9 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShot.java +++ 
b/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShot.java @@ -31,10 +31,6 @@ public class SearchBackPressureMetricsSnapShot implements Removable { private final String tableName; private List> columns; - // Global variables for naming - private static final String SEARCHBP_CONTROLLER_NAME_VALUE = "ControllerName"; - private static final String SEARCHBP_MODE_VALUE = "searchbp_mode"; - // Create a table with specifed fields (columns) public SearchBackPressureMetricsSnapShot(Connection conn, Long windowStartTime) { this.create = DSL.using(conn, SQLDialect.SQLITE); @@ -161,7 +157,8 @@ public DSLContext getDSLContext() { } public BatchBindStep startBatchPut() { - // Add dummy values because jooq requires this to support multiple bind statements with single insert query + // Add dummy values because jooq requires this to support multiple bind statements with + // single insert query List dummyValues = new ArrayList<>(); for (int i = 0; i < columns.size(); i++) { dummyValues.add(null); From 6578073b269cf96b294cee174535e9f5432d64cf Mon Sep 17 00:00:00 2001 From: CoderJeffrey Date: Thu, 29 Jun 2023 17:36:21 -0700 Subject: [PATCH 08/11] Add comments and adjust UTs (Signed-off-by: Jeffrey Liu ujeffliu@amazon.com) Signed-off-by: CoderJeffrey --- .../rca/store/OpenSearchAnalysisGraph.java | 3 +- .../reader/MetricsEmitter.java | 8 ++--- .../SearchBackPressureMetricsProcessor.java | 4 +-- .../SearchBackPressureMetricsSnapShot.java | 35 +++++++++++-------- ...earchBackPressureMetricsProcessorTest.java | 6 ++-- ...SearchBackPressureMetricsSnapShotTest.java | 4 +-- 6 files changed, 34 insertions(+), 26 deletions(-) diff --git a/src/main/java/org/opensearch/performanceanalyzer/rca/store/OpenSearchAnalysisGraph.java b/src/main/java/org/opensearch/performanceanalyzer/rca/store/OpenSearchAnalysisGraph.java index e144f2ee1..80763befb 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/rca/store/OpenSearchAnalysisGraph.java +++ 
b/src/main/java/org/opensearch/performanceanalyzer/rca/store/OpenSearchAnalysisGraph.java @@ -183,7 +183,8 @@ public void construct() { // Use EVALUATION_INTERVAL_SECONDS instead of RCA_PERIOD which resolved to 12 seconds. // This is resulting in this RCA not getting executed in every 5 seconds. Rca> threadMetricsRca = - new ThreadMetricsRca(threadBlockedTime, threadWaitedTime, EVALUATION_INTERVAL_SECONDS); + new ThreadMetricsRca( + threadBlockedTime, threadWaitedTime, EVALUATION_INTERVAL_SECONDS); threadMetricsRca.addTag( RcaConsts.RcaTagConstants.TAG_LOCUS, RcaConsts.RcaTagConstants.LOCUS_DATA_CLUSTER_MANAGER_NODE); diff --git a/src/main/java/org/opensearch/performanceanalyzer/reader/MetricsEmitter.java b/src/main/java/org/opensearch/performanceanalyzer/reader/MetricsEmitter.java index 28d7b6dbe..30d07ce8b 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/reader/MetricsEmitter.java +++ b/src/main/java/org/opensearch/performanceanalyzer/reader/MetricsEmitter.java @@ -756,8 +756,10 @@ public static void emitSearchBackPressureMetrics( Result searchbp_records = searchBackPressureMetricsSnapShot.fetchAll(); // String SEARCHBP_MODE_DIM = "searchbp_mode"; - String SEARCHBP_TYPE_DIM = AllMetrics.SearchBackPressureStatsValue.SEARCHBP_TYPE_DIM.toString(); - String SEARCHBP_TABLE_NAME = AllMetrics.SearchBackPressureStatsValue.SEARCHBP_TABLE_NAME.toString(); + String SEARCHBP_TYPE_DIM = + AllMetrics.SearchBackPressureStatsValue.SEARCHBP_TYPE_DIM.toString(); + String SEARCHBP_TABLE_NAME = + AllMetrics.SearchBackPressureStatsValue.SEARCHBP_TABLE_NAME.toString(); List dims = new ArrayList() { @@ -838,8 +840,6 @@ public static void emitSearchBackPressureMetrics( for (Record record : searchbp_records) { for (String stats_type : stats_types) { Optional tmpStatsObj = Optional.ofNullable(record.get(stats_type)); - // LOG.info(stats_type + " is: " + tmpStatsObj.map(o -> - // Long.parseLong(o.toString())).toString()); handle.bind( stats_type, diff --git 
a/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsProcessor.java b/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsProcessor.java index e3ab8a90a..766ed8bbe 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsProcessor.java +++ b/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsProcessor.java @@ -56,7 +56,7 @@ static SearchBackPressureMetricsProcessor buildSearchBackPressureMetricsProcesso currentWindowStartTime, searchBackPressureMetricsSnapShot); return new SearchBackPressureMetricsProcessor(searchBackPressureMetricsSnapShot); } - + return new SearchBackPressureMetricsProcessor( searchBackPressureSnapshotNavigableMap.get(currentWindowStartTime)); } @@ -105,7 +105,7 @@ private void parseJsonLine(final String jsonString) { if (map.isEmpty()) { throw new RuntimeException("Missing SearchBackPressure Metrics payload."); } - + // A list of dims to be collected ArrayList required_searchbp_dims = new ArrayList() { diff --git a/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShot.java b/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShot.java index 0f4246cd9..907ab5540 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShot.java +++ b/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShot.java @@ -28,7 +28,14 @@ public class SearchBackPressureMetricsSnapShot implements Removable { // entry point to interact with SQLite db private final DSLContext create; + /* + * This is a tmp table created to populate searchbp stats + * table name is the search_back_pressure_ + windowStartTime + */ private final String tableName; + + /* columns are the key metrics to be collected (e.g. 
shard-level search back pressure cancellation count) + */ private List> columns; // Create a table with specifed fields (columns) @@ -47,14 +54,14 @@ public SearchBackPressureMetricsSnapShot(Connection conn, Long windowStartTime) AllMetrics.SearchBackPressureStatsValue .SEARCHBP_SHARD_STATS_CANCELLATIONCOUNT .toString()), - Long.class)); + Integer.class)); this.add( DSL.field( DSL.name( AllMetrics.SearchBackPressureStatsValue .SEARCHBP_TASK_STATS_CANCELLATIONCOUNT .toString()), - Long.class)); + Integer.class)); // Shard Stats Resource Heap / CPU Usage this.add( @@ -63,28 +70,28 @@ public SearchBackPressureMetricsSnapShot(Connection conn, Long windowStartTime) AllMetrics.SearchBackPressureStatsValue .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_CANCELLATIONCOUNT .toString()), - Long.class)); + Integer.class)); this.add( DSL.field( DSL.name( AllMetrics.SearchBackPressureStatsValue .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_CURRENTMAX .toString()), - Long.class)); + Integer.class)); this.add( DSL.field( DSL.name( AllMetrics.SearchBackPressureStatsValue .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_ROLLINGAVG .toString()), - Long.class)); + Integer.class)); this.add( DSL.field( DSL.name( AllMetrics.SearchBackPressureStatsValue .SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT .toString()), - Long.class)); + Integer.class)); this.add( DSL.field( @@ -92,14 +99,14 @@ public SearchBackPressureMetricsSnapShot(Connection conn, Long windowStartTime) AllMetrics.SearchBackPressureStatsValue .SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CURRENTMAX .toString()), - Long.class)); + Integer.class)); this.add( DSL.field( DSL.name( AllMetrics.SearchBackPressureStatsValue .SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CURRENTAVG .toString()), - Long.class)); + Integer.class)); // Task Stats Resource Heap / CPU Usage this.add( @@ -108,28 +115,28 @@ public SearchBackPressureMetricsSnapShot(Connection conn, Long windowStartTime) AllMetrics.SearchBackPressureStatsValue 
.SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_CANCELLATIONCOUNT .toString()), - Long.class)); + Integer.class)); this.add( DSL.field( DSL.name( AllMetrics.SearchBackPressureStatsValue .SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_CURRENTMAX .toString()), - Long.class)); + Integer.class)); this.add( DSL.field( DSL.name( AllMetrics.SearchBackPressureStatsValue .SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_ROLLINGAVG .toString()), - Long.class)); + Integer.class)); this.add( DSL.field( DSL.name( AllMetrics.SearchBackPressureStatsValue .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT .toString()), - Long.class)); + Integer.class)); this.add( DSL.field( @@ -137,14 +144,14 @@ public SearchBackPressureMetricsSnapShot(Connection conn, Long windowStartTime) AllMetrics.SearchBackPressureStatsValue .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CURRENTMAX .toString()), - Long.class)); + Integer.class)); this.add( DSL.field( DSL.name( AllMetrics.SearchBackPressureStatsValue .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CURRENTAVG .toString()), - Long.class)); + Integer.class)); } }; diff --git a/src/test/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsProcessorTest.java b/src/test/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsProcessorTest.java index 6abbf4a90..2f5fc10ed 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsProcessorTest.java +++ b/src/test/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsProcessorTest.java @@ -93,7 +93,7 @@ public void testSearchBackPressureProcessEvent() throws Exception { // SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_ROLLINGAVG value is 3L according to the // SERIALIZED_EVENT, should EQUAL Assert.assertEquals( - 3L, + 3, result.get(0) .get( AllMetrics.SearchBackPressureStatsValue @@ -102,7 +102,7 @@ public void testSearchBackPressureProcessEvent() throws Exception { // SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT value is 0L according to 
the // SERIALIZED_EVENT, should EQUAL Assert.assertEquals( - 0L, + 0, result.get(0) .get( AllMetrics.SearchBackPressureStatsValue @@ -112,7 +112,7 @@ public void testSearchBackPressureProcessEvent() throws Exception { // SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT value is 0L according to the // SERIALIZED_EVENT, should NOT EQUAL Assert.assertNotEquals( - 2L, + 2, result.get(0) .get( AllMetrics.SearchBackPressureStatsValue diff --git a/src/test/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShotTest.java b/src/test/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShotTest.java index eeaa1a30f..b2c3626d5 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShotTest.java +++ b/src/test/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShotTest.java @@ -109,7 +109,7 @@ public void testReadSearchBackPressureMetricsSnapshot() throws Exception { assertEquals(1, result.size()); // for 14 (length of required_searchbp_dims) fields, each assign a value from 0 to 13 // test each field and verify the result - for (long i = 0; i < required_searchbp_dims.size(); i++) { + for (int i = 0; i < required_searchbp_dims.size(); i++) { Assert.assertEquals( AllMetrics.SearchBackPressureStatsValue.SEARCHBP_SHARD_STATS_CANCELLATIONCOUNT .toString() @@ -128,7 +128,7 @@ public void tearDown() throws Exception { private void insertIntoTable(BatchBindStep handle) { Object[] bindVals = new Object[required_searchbp_dims.size()]; for (int i = 0; i < required_searchbp_dims.size(); i++) { - bindVals[i] = Long.valueOf(i); + bindVals[i] = Integer.valueOf(i); } handle.bind(bindVals).execute(); From 353c3a6490476c42dd28d3a9b9e2274cc8dadace Mon Sep 17 00:00:00 2001 From: CoderJeffrey Date: Mon, 3 Jul 2023 11:14:55 -0700 Subject: [PATCH 09/11] Remove tab space and change avg datatype to LONG (Signed-off-by: Jeffrey Liu ujeffliu@amazon.com) Signed-off-by: CoderJeffrey 
--- .../reader/SearchBackPressureMetricsProcessor.java | 1 - .../reader/SearchBackPressureMetricsSnapShot.java | 8 ++++---- .../reader/SearchBackPressureMetricsProcessorTest.java | 2 +- .../reader/SearchBackPressureMetricsSnapShotTest.java | 2 +- 4 files changed, 6 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsProcessor.java b/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsProcessor.java index 766ed8bbe..6e9547754 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsProcessor.java +++ b/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsProcessor.java @@ -56,7 +56,6 @@ static SearchBackPressureMetricsProcessor buildSearchBackPressureMetricsProcesso currentWindowStartTime, searchBackPressureMetricsSnapShot); return new SearchBackPressureMetricsProcessor(searchBackPressureMetricsSnapShot); } - return new SearchBackPressureMetricsProcessor( searchBackPressureSnapshotNavigableMap.get(currentWindowStartTime)); } diff --git a/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShot.java b/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShot.java index 907ab5540..1f1a05ea7 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShot.java +++ b/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShot.java @@ -84,7 +84,7 @@ public SearchBackPressureMetricsSnapShot(Connection conn, Long windowStartTime) AllMetrics.SearchBackPressureStatsValue .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_ROLLINGAVG .toString()), - Integer.class)); + Long.class)); this.add( DSL.field( DSL.name( @@ -106,7 +106,7 @@ public SearchBackPressureMetricsSnapShot(Connection conn, Long windowStartTime) AllMetrics.SearchBackPressureStatsValue 
.SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CURRENTAVG .toString()), - Integer.class)); + Long.class)); // Task Stats Resource Heap / CPU Usage this.add( @@ -129,7 +129,7 @@ public SearchBackPressureMetricsSnapShot(Connection conn, Long windowStartTime) AllMetrics.SearchBackPressureStatsValue .SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_ROLLINGAVG .toString()), - Integer.class)); + Long.class)); this.add( DSL.field( DSL.name( @@ -151,7 +151,7 @@ public SearchBackPressureMetricsSnapShot(Connection conn, Long windowStartTime) AllMetrics.SearchBackPressureStatsValue .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CURRENTAVG .toString()), - Integer.class)); + Long.class)); } }; diff --git a/src/test/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsProcessorTest.java b/src/test/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsProcessorTest.java index 2f5fc10ed..575712a2d 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsProcessorTest.java +++ b/src/test/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsProcessorTest.java @@ -93,7 +93,7 @@ public void testSearchBackPressureProcessEvent() throws Exception { // SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_ROLLINGAVG value is 3L according to the // SERIALIZED_EVENT, should EQUAL Assert.assertEquals( - 3, + 3L, result.get(0) .get( AllMetrics.SearchBackPressureStatsValue diff --git a/src/test/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShotTest.java b/src/test/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShotTest.java index b2c3626d5..2e88aa574 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShotTest.java +++ b/src/test/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShotTest.java @@ -116,7 +116,7 @@ public void testReadSearchBackPressureMetricsSnapshot() throws Exception { + " should be " + 
String.valueOf(i), i, - result.get(0).get(required_searchbp_dims.get((int) i))); + ((Number) result.get(0).get(required_searchbp_dims.get(i))).intValue()); } } From ebde9f7999400b7deaa3e348fb65ae69543e859d Mon Sep 17 00:00:00 2001 From: CoderJeffrey Date: Mon, 3 Jul 2023 11:18:15 -0700 Subject: [PATCH 10/11] Remove tab space and change avg datatype to LONG and additional JAVADOC (Signed-off-by: Jeffrey Liu ujeffliu@amazon.com) Signed-off-by: CoderJeffrey --- .../reader/SearchBackPressureMetricsSnapShot.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShot.java b/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShot.java index 1f1a05ea7..bcf5c09ad 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShot.java +++ b/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShot.java @@ -20,6 +20,10 @@ import org.jooq.impl.DSL; import org.opensearch.performanceanalyzer.commons.metrics.AllMetrics; +/* + * SearchBackPressure cluster/node-level RCA would consume these data in the snapshots and determine whether the search back pressure service + * has cancelled too many / too few requests, by comparing with a predefined threshold. 
+ */ public class SearchBackPressureMetricsSnapShot implements Removable { // Logger for current class From 16b6db415e5e833883f5997a6c559a9bf8ee5464 Mon Sep 17 00:00:00 2001 From: CoderJeffrey Date: Mon, 3 Jul 2023 11:45:19 -0700 Subject: [PATCH 11/11] Add description for searchbp stats in sqlitedb (signed-off-by: Jeffrey Liu ujeffliu@amazon.com) Signed-off-by: CoderJeffrey --- .../reader/MetricsEmitter.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/src/main/java/org/opensearch/performanceanalyzer/reader/MetricsEmitter.java b/src/main/java/org/opensearch/performanceanalyzer/reader/MetricsEmitter.java index 30d07ce8b..8b2043c77 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/reader/MetricsEmitter.java +++ b/src/main/java/org/opensearch/performanceanalyzer/reader/MetricsEmitter.java @@ -768,64 +768,80 @@ public static void emitSearchBackPressureMetrics( } }; + // stats type in sqlitedb is similar to: + // stats_type_name | sum | avg | min | max List stats_types = new ArrayList() { { // Shard/Task Stats Cancellation Count + // searchbp_shard_stats_cancellationCount|0.0|0.0|0.0|0.0 this.add( AllMetrics.SearchBackPressureStatsValue .SEARCHBP_SHARD_STATS_CANCELLATIONCOUNT .toString()); + // searchbp_task_stats_cancellationCount|0.0|0.0|0.0|0.0 this.add( AllMetrics.SearchBackPressureStatsValue .SEARCHBP_TASK_STATS_CANCELLATIONCOUNT .toString()); // Shard Stats Resource Heap / CPU Usage + // searchbp_shard_stats_resource_heap_usage_cancellationCount|0.0|0.0|0.0|0.0 this.add( AllMetrics.SearchBackPressureStatsValue .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_CANCELLATIONCOUNT .toString()); + // searchbp_shard_stats_resource_heap_usage_currentMax|0.0|0.0|0.0|0.0 this.add( AllMetrics.SearchBackPressureStatsValue .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_CURRENTMAX .toString()); + // searchbp_shard_stats_resource_heap_usage_rollingAvg|0.0|0.0|0.0|0.0 this.add( AllMetrics.SearchBackPressureStatsValue 
.SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_ROLLINGAVG .toString()); + // searchbp_shard_stats_resource_cpu_usage_cancellationCount|0.0|0.0|0.0|0.0 this.add( AllMetrics.SearchBackPressureStatsValue .SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT .toString()); + // searchbp_shard_stats_resource_cpu_usage_currentMax|0.0|0.0|0.0|0.0 this.add( AllMetrics.SearchBackPressureStatsValue .SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CURRENTMAX .toString()); + // searchbp_shard_stats_resource_cpu_usage_currentAvg|0.0|0.0|0.0|0.0 this.add( AllMetrics.SearchBackPressureStatsValue .SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CURRENTAVG .toString()); // Task Stats Resource Heap / CPU Usage + // searchbp_task_stats_resource_heap_usage_cancellationCount|0.0|0.0|0.0|0.0 this.add( AllMetrics.SearchBackPressureStatsValue .SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_CANCELLATIONCOUNT .toString()); + // searchbp_task_stats_resource_heap_usage_currentMax|0.0|0.0|0.0|0.0 this.add( AllMetrics.SearchBackPressureStatsValue .SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_CURRENTMAX .toString()); + // searchbp_task_stats_resource_heap_usage_rollingAvg|0.0|0.0|0.0|0.0 this.add( AllMetrics.SearchBackPressureStatsValue .SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_ROLLINGAVG .toString()); + // searchbp_task_stats_resource_cpu_usage_cancellationCount|0.0|0.0|0.0|0.0 this.add( AllMetrics.SearchBackPressureStatsValue .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT .toString()); + // searchbp_task_stats_resource_cpu_usage_currentMax|0.0|0.0|0.0|0.0 this.add( AllMetrics.SearchBackPressureStatsValue .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CURRENTMAX .toString()); + // searchbp_task_stats_resource_cpu_usage_currentAvg|0.0|0.0|0.0|0.0 this.add( AllMetrics.SearchBackPressureStatsValue .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CURRENTAVG