Skip to content

Commit

Permalink
Searchbackpressure Service Reader and UTs added (opensearch-project#427
Browse files Browse the repository at this point in the history
…) (opensearch-project#495)

* Remove log files and add DCO (Signed-off-by: Jeffrey Liu [email protected])

Signed-off-by: CoderJeffrey <[email protected]>

* Remove extra files (Signed-off-by: Jeffrey Liu [email protected])

Signed-off-by: CoderJeffrey <[email protected]>

* Remove styling difference (Signed-off-by: Jeffrey Liu [email protected])

Signed-off-by: CoderJeffrey <[email protected]>

* Remove unnecessary file changes (Signed-off-by: Jeffrey Liu [email protected])

Signed-off-by: CoderJeffrey <[email protected]>

* Use Commons package for metric table naming and column naming (Signed-off-by: Jeffrey Liu [email protected])

Signed-off-by: CoderJeffrey <[email protected]>

* Remove unnecessary comments and add more descriptive comments (Signed-off-by: Jeffrey Liu [email protected])

Signed-off-by: CoderJeffrey <[email protected]>

* Remove unnecessary global variables (Signed-off-by: Jeffrey Liu [email protected])

Signed-off-by: CoderJeffrey <[email protected]>

* Add comments and adjust UTs (Signed-off-by: Jeffrey Liu [email protected])

Signed-off-by: CoderJeffrey <[email protected]>

* Remove tab space and change avg datatype to LONG  (Signed-off-by: Jeffrey Liu [email protected])

Signed-off-by: CoderJeffrey <[email protected]>

* Remove tab space and change avg datatype to LONG and additional JAVADOC (Signed-off-by: Jeffrey Liu [email protected])

Signed-off-by: CoderJeffrey <[email protected]>

* Add description for searchbp stats in sqlitedb (signed-off-by: Jeffrey Liu [email protected])

Signed-off-by: CoderJeffrey <[email protected]>

---------

Signed-off-by: CoderJeffrey <[email protected]>
(cherry picked from commit 53e3b94)

Co-authored-by: Jeffrey Liu <[email protected]>
  • Loading branch information
1 parent d29ac81 commit aef3981
Show file tree
Hide file tree
Showing 7 changed files with 838 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,13 @@ public enum ReaderMetrics implements MeasurementSet {
"FaultDetectionMetricsEmitterExecutionTime",
"millis",
StatsType.LATENCIES,
Statistics.SUM);
Statistics.SUM),
SEARCH_BACK_PRESSURE_METRICS_EMITTER_EXECUTION_TIME(
"SearchBackPressureMetricsEmitterExecutionTime",
"millis",
StatsType.LATENCIES,
Statistics.SUM),
;

/** What we want to appear as the metric name. */
private String name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,136 @@ public static void emitGarbageCollectionInfo(
ReaderMetrics.GC_INFO_EMITTER_EXECUTION_TIME, mFinalT - mCurrT);
}

public static void emitSearchBackPressureMetrics(
MetricsDB metricsDB,
SearchBackPressureMetricsSnapShot searchBackPressureMetricsSnapShot) {
long mCurrT = System.currentTimeMillis();
Result<Record> searchbp_records = searchBackPressureMetricsSnapShot.fetchAll();

// String SEARCHBP_MODE_DIM = "searchbp_mode";
String SEARCHBP_TYPE_DIM =
AllMetrics.SearchBackPressureStatsValue.SEARCHBP_TYPE_DIM.toString();
String SEARCHBP_TABLE_NAME =
AllMetrics.SearchBackPressureStatsValue.SEARCHBP_TABLE_NAME.toString();

List<String> dims =
new ArrayList<String>() {
{
this.add(SEARCHBP_TYPE_DIM);
}
};

// stats type in sqlitedb is similar to:
// stats_type_name | sum | avg | min | max
List<String> stats_types =
new ArrayList<String>() {
{
// Shard/Task Stats Cancellation Count
// searchbp_shard_stats_cancellationCount|0.0|0.0|0.0|0.0
this.add(
AllMetrics.SearchBackPressureStatsValue
.SEARCHBP_SHARD_STATS_CANCELLATIONCOUNT
.toString());
// searchbp_task_stats_cancellationCount|0.0|0.0|0.0|0.0
this.add(
AllMetrics.SearchBackPressureStatsValue
.SEARCHBP_TASK_STATS_CANCELLATIONCOUNT
.toString());
// Shard Stats Resource Heap / CPU Usage
// searchbp_shard_stats_resource_heap_usage_cancellationCount|0.0|0.0|0.0|0.0
this.add(
AllMetrics.SearchBackPressureStatsValue
.SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_CANCELLATIONCOUNT
.toString());
// searchbp_shard_stats_resource_heap_usage_currentMax|0.0|0.0|0.0|0.0
this.add(
AllMetrics.SearchBackPressureStatsValue
.SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_CURRENTMAX
.toString());
// searchbp_shard_stats_resource_heap_usage_rollingAvg|0.0|0.0|0.0|0.0
this.add(
AllMetrics.SearchBackPressureStatsValue
.SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_ROLLINGAVG
.toString());
// searchbp_shard_stats_resource_cpu_usage_cancellationCount|0.0|0.0|0.0|0.0
this.add(
AllMetrics.SearchBackPressureStatsValue
.SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT
.toString());
// searchbp_shard_stats_resource_cpu_usage_currentMax|0.0|0.0|0.0|0.0
this.add(
AllMetrics.SearchBackPressureStatsValue
.SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CURRENTMAX
.toString());
// searchbp_shard_stats_resource_cpu_usage_currentAvg|0.0|0.0|0.0|0.0
this.add(
AllMetrics.SearchBackPressureStatsValue
.SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CURRENTAVG
.toString());
// Task Stats Resource Heap / CPU Usage
// searchbp_task_stats_resource_heap_usage_cancellationCount|0.0|0.0|0.0|0.0
this.add(
AllMetrics.SearchBackPressureStatsValue
.SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_CANCELLATIONCOUNT
.toString());
// searchbp_task_stats_resource_heap_usage_currentMax|0.0|0.0|0.0|0.0
this.add(
AllMetrics.SearchBackPressureStatsValue
.SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_CURRENTMAX
.toString());
// searchbp_task_stats_resource_heap_usage_rollingAvg|0.0|0.0|0.0|0.0
this.add(
AllMetrics.SearchBackPressureStatsValue
.SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_ROLLINGAVG
.toString());
// searchbp_task_stats_resource_cpu_usage_cancellationCount|0.0|0.0|0.0|0.0
this.add(
AllMetrics.SearchBackPressureStatsValue
.SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT
.toString());
// searchbp_task_stats_resource_cpu_usage_currentMax|0.0|0.0|0.0|0.0
this.add(
AllMetrics.SearchBackPressureStatsValue
.SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CURRENTMAX
.toString());
// searchbp_task_stats_resource_cpu_usage_currentAvg|0.0|0.0|0.0|0.0
this.add(
AllMetrics.SearchBackPressureStatsValue
.SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CURRENTAVG
.toString());
}
};

metricsDB.createMetric(new Metric<>(SEARCHBP_TABLE_NAME, 0d), dims);

BatchBindStep handle = metricsDB.startBatchPut(new Metric<>(SEARCHBP_TABLE_NAME, 0d), dims);

for (Record record : searchbp_records) {
for (String stats_type : stats_types) {
Optional<Object> tmpStatsObj = Optional.ofNullable(record.get(stats_type));

handle.bind(
stats_type,
// the rest are agg fields: sum, avg, min, max which don't make sense for
// searchbackpressure
tmpStatsObj.map(o -> Long.parseLong(o.toString())).orElse(0L),
tmpStatsObj.map(o -> Long.parseLong(o.toString())).orElse(0L),
tmpStatsObj.map(o -> Long.parseLong(o.toString())).orElse(0L),
tmpStatsObj.map(o -> Long.parseLong(o.toString())).orElse(0L));
}
}

handle.execute();

long mFinalT = System.currentTimeMillis();
LOG.debug(
"Total time taken for writing Search Back Pressure info into metricsDB: {}",
mFinalT - mCurrT);
ServiceMetrics.READER_METRICS_AGGREGATOR.updateStat(
ReaderMetrics.SEARCH_BACK_PRESSURE_METRICS_EMITTER_EXECUTION_TIME,
mFinalT - mCurrT);
}

public static void emitAdmissionControlMetrics(
MetricsDB metricsDB, AdmissionControlSnapshot snapshot) {
long mCurrT = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public class ReaderMetricsProcessor implements Runnable {
clusterManagerThrottlingMetricsMap;
private NavigableMap<Long, ShardStateMetricsSnapshot> shardStateMetricsMap;
private NavigableMap<Long, AdmissionControlSnapshot> admissionControlMetricsMap;
private NavigableMap<Long, SearchBackPressureMetricsSnapShot> searchBackPressureMetricsMap;

private static final int MAX_DATABASES = 2;
private static final int OS_SNAPSHOTS = 4;
Expand All @@ -81,6 +82,7 @@ public class ReaderMetricsProcessor implements Runnable {
private static final int GC_INFO_SNAPSHOTS = 4;
private static final int CLUSTER_MANAGER_THROTTLING_SNAPSHOTS = 2;
private static final int AC_SNAPSHOTS = 2;
private static final int SEARCH_BP_SNAPSHOTS = 4;
private final String rootLocation;

private final AppContext appContext;
Expand Down Expand Up @@ -125,6 +127,8 @@ public ReaderMetricsProcessor(
gcInfoMap = new TreeMap<>();
clusterManagerThrottlingMetricsMap = new TreeMap<>();
admissionControlMetricsMap = new TreeMap<>();
searchBackPressureMetricsMap = new TreeMap<>();

this.rootLocation = rootLocation;
this.configOverridesApplier = new ConfigOverridesApplier();

Expand Down Expand Up @@ -268,6 +272,7 @@ public void trimOldSnapshots() throws Exception {
trimMap(gcInfoMap, GC_INFO_SNAPSHOTS);
trimMap(clusterManagerThrottlingMetricsMap, CLUSTER_MANAGER_THROTTLING_SNAPSHOTS);
trimMap(admissionControlMetricsMap, AC_SNAPSHOTS);
trimMap(searchBackPressureMetricsMap, SEARCH_BP_SNAPSHOTS);

for (NavigableMap<Long, MemoryDBSnapshot> snap : nodeMetricsMap.values()) {
// do the same thing as OS_SNAPSHOTS. Eventually MemoryDBSnapshot
Expand Down Expand Up @@ -397,6 +402,7 @@ private void emitMetrics(long currWindowStartTime) throws Exception {
emitAdmissionControlMetrics(prevWindowStartTime, metricsDB);
emitClusterManagerMetrics(prevWindowStartTime, metricsDB);
emitClusterManagerThrottlingMetrics(prevWindowStartTime, metricsDB);
emitSearchBackPressureMetrics(prevWindowStartTime, metricsDB);

metricsDB.commit();
metricsDBMap.put(prevWindowStartTime, metricsDB);
Expand Down Expand Up @@ -594,6 +600,19 @@ private void emitClusterManagerThrottlingMetrics(
}
}

private void emitSearchBackPressureMetrics(long prevWindowStartTime, MetricsDB metricsDB)
throws Exception {
if (searchBackPressureMetricsMap.containsKey(prevWindowStartTime)) {
SearchBackPressureMetricsSnapShot prevSearchBPSnapShot =
searchBackPressureMetricsMap.get(prevWindowStartTime);
MetricsEmitter.emitSearchBackPressureMetrics(metricsDB, prevSearchBPSnapShot);
} else {
LOG.debug(
"Search Back Pressure snapshot does not exist for the previous window. "
+ "Not emitting metrics.");
}
}

/**
* OS, Request, Http and cluster_manager first aligns the currentTimeStamp with a 5 second
* interval. In the current format, a file (previously a directory) is written every 5 seconds.
Expand Down Expand Up @@ -679,6 +698,9 @@ is ready so it starts to read that file (go back two windows and
EventProcessor admissionControlProcessor =
AdmissionControlProcessor.build(
currWindowStartTime, conn, admissionControlMetricsMap);
EventProcessor searchBackPressureMetricsProcessor =
SearchBackPressureMetricsProcessor.buildSearchBackPressureMetricsProcessor(
currWindowStartTime, conn, searchBackPressureMetricsMap);

// The event dispatcher dispatches events to each of the registered event processors.
// In addition to event processing each processor has an initialize/finalize function that
Expand All @@ -702,6 +724,7 @@ is ready so it starts to read that file (go back two windows and
eventDispatcher.registerEventProcessor(faultDetectionProcessor);
eventDispatcher.registerEventProcessor(garbageCollectorInfoProcessor);
eventDispatcher.registerEventProcessor(admissionControlProcessor);
eventDispatcher.registerEventProcessor(searchBackPressureMetricsProcessor);

eventDispatcher.initializeProcessing(
currWindowStartTime, currWindowStartTime + MetricsConfiguration.SAMPLING_INTERVAL);
Expand Down
Loading

0 comments on commit aef3981

Please sign in to comment.