diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/MasterServiceEventMetrics.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/MasterServiceEventMetrics.java index 0488e6b2..520e0e96 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/MasterServiceEventMetrics.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/MasterServiceEventMetrics.java @@ -31,8 +31,8 @@ import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor; import com.amazon.opendistro.elasticsearch.performanceanalyzer.ESResources; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.Master_Metric_Dimensions; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.Master_Metric_Values; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.MasterMetricDimensions; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.MasterMetricValues; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.MetricsConfiguration; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.MetricsProcessor; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.ThreadIDUtil; @@ -107,17 +107,17 @@ public void collectMetrics(long startTime) { lastTaskInsertionOrder = firstPending.insertionOrder; int firstSpaceIndex = task.source().indexOf(" "); value.append(PerformanceAnalyzerMetrics.getCurrentTimeMetric()); - PerformanceAnalyzerMetrics.addMetricEntry(value, Master_Metric_Dimensions.MASTER_TASK_PRIORITY.toString(), + PerformanceAnalyzerMetrics.addMetricEntry(value, MasterMetricDimensions.MASTER_TASK_PRIORITY.toString(), firstPending.priority.toString()); //- as it is sampling, we won't exactly know the start time of the current task, we will be //- capturing start time as midpoint of previous time bucket - PerformanceAnalyzerMetrics.addMetricEntry(value, Master_Metric_Values.START_TIME.toString(), + PerformanceAnalyzerMetrics.addMetricEntry(value, MasterMetricValues.START_TIME.toString(), startTime - SAMPLING_TIME_INTERVAL / 2); - PerformanceAnalyzerMetrics.addMetricEntry(value, Master_Metric_Dimensions.MASTER_TASK_TYPE.toString(), + PerformanceAnalyzerMetrics.addMetricEntry(value, MasterMetricDimensions.MASTER_TASK_TYPE.toString(), firstSpaceIndex == -1 ? task.source() : task.source().substring(0, firstSpaceIndex)); - PerformanceAnalyzerMetrics.addMetricEntry(value, Master_Metric_Dimensions.MASTER_TASK_METADATA.toString(), + PerformanceAnalyzerMetrics.addMetricEntry(value, MasterMetricDimensions.MASTER_TASK_METADATA.toString(), firstSpaceIndex == -1 ? 
"" : task.source().substring(firstSpaceIndex)); - PerformanceAnalyzerMetrics.addMetricEntry(value, Master_Metric_Dimensions.MASTER_TASK_AGE.toString(), + PerformanceAnalyzerMetrics.addMetricEntry(value, MasterMetricDimensions.MASTER_TASK_QUEUE_TIME.toString(), task.getAgeInMillis()); saveMetricValues(value.toString(), startTime, String.valueOf(getMasterThreadId()), @@ -137,7 +137,7 @@ public void collectMetrics(long startTime) { private void generateFinishMetrics(long startTime) { if (lastTaskInsertionOrder != -1) { value.append(PerformanceAnalyzerMetrics.getCurrentTimeMetric()); - PerformanceAnalyzerMetrics.addMetricEntry(value, Master_Metric_Values.FINISH_TIME.toString(), + PerformanceAnalyzerMetrics.addMetricEntry(value, MasterMetricValues.FINISH_TIME.toString(), startTime - SAMPLING_TIME_INTERVAL / 2); saveMetricValues(value.toString(), startTime, String.valueOf(currentThreadId), String.valueOf(lastTaskInsertionOrder), PerformanceAnalyzerMetrics.FINISH_FILE_NAME); diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/metrics/AllMetrics.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/metrics/AllMetrics.java index f7199517..876263b4 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/metrics/AllMetrics.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/metrics/AllMetrics.java @@ -576,15 +576,17 @@ public static class Constants { } } - public enum Master_Metric_Dimensions implements MetricDimension { - MASTER_TASK_PRIORITY("Master_Task_Priority"), - MASTER_TASK_TYPE("Master_Task_Type"), - MASTER_TASK_METADATA("Master_Task_Metadata"), - MASTER_TASK_AGE("Master_Task_Age"); + public enum MasterMetricDimensions implements MetricDimension { + MASTER_TASK_PRIORITY("MasterTaskPriority"), + MASTER_TASK_TYPE("MasterTaskType"), + MASTER_TASK_METADATA("MasterTaskMetadata"), + MASTER_TASK_QUEUE_TIME("MasterTaskQueueTime"), + MASTER_TASK_RUN_TIME("MasterTaskRunTime"), + MASTER_TASK_INSERT_ORDER("MasterTaskInsertOrder"); private final String value; - Master_Metric_Dimensions(String value) { + MasterMetricDimensions(String value) { this.value = value; } @@ -594,14 +596,16 @@ public String toString() { } } - public enum Master_Metric_Values implements MetricValue { + public enum MasterMetricValues implements MetricValue { //-todo : Migrate to CommonMetric.Constants + MASTER_TASK_QUEUE_TIME("Master_Task_Queue_Time"), + MASTER_TASK_RUN_TIME("Master_Task_Run_Time"), START_TIME("StartTime"), FINISH_TIME("FinishTime"); private final String value; - Master_Metric_Values(String value) { + MasterMetricValues(String value) { this.value = value; } diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/model/MetricsModel.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/model/MetricsModel.java index 8f0877b3..e0d1bd13 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/model/MetricsModel.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/model/MetricsModel.java @@ -18,6 +18,7 @@ import java.util.HashMap; import java.util.Map; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.AggregatedOSDimension; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.CircuitBreakerDimension; import 
com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.CircuitBreakerValue; @@ -227,5 +228,11 @@ public class MetricsModel { // Master Metrics ALL_METRICS.put(MasterPendingValue.MASTER_PENDING_QUEUE_SIZE.toString(), new MetricAttributes(MetricUnits.COUNT.toString(), EmptyDimension.values())); + + ALL_METRICS.put(AllMetrics.MasterMetricValues.MASTER_TASK_QUEUE_TIME.toString(), + new MetricAttributes(MetricUnits.MILLISECOND.toString(), AllMetrics.MasterMetricDimensions.values())); + + ALL_METRICS.put(AllMetrics.MasterMetricValues.MASTER_TASK_RUN_TIME.toString(), + new MetricAttributes(MetricUnits.MILLISECOND.toString(), AllMetrics.MasterMetricDimensions.values())); } } diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/MasterEventMetricsSnapshot.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/MasterEventMetricsSnapshot.java new file mode 100644 index 00000000..644fc637 --- /dev/null +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/MasterEventMetricsSnapshot.java @@ -0,0 +1,311 @@ +/* + * Copyright <2019> Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistro.elasticsearch.performanceanalyzer.reader; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.DBUtils; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.MetricsConfiguration; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metricsdb.MetricsDB; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.jooq.BatchBindStep; +import org.jooq.DSLContext; +import org.jooq.Field; +import org.jooq.Record; +import org.jooq.Result; +import org.jooq.SQLDialect; +import org.jooq.SelectField; +import org.jooq.SelectHavingStep; +import org.jooq.impl.DSL; + + +import java.sql.Connection; +import java.util.ArrayList; +import java.util.List; + + +public class MasterEventMetricsSnapshot implements Removable { + private static final Logger LOG = LogManager.getLogger(MasterEventMetricsSnapshot.class); + + private final DSLContext create; + private final Long windowStartTime; + private final String tableName; + private static final Long EXPIRE_AFTER = 1200000L; + private List> columns; + + + public enum Fields { + + TID("tid"), + IS_CURRENT("isCurrent"), + OLD_START("oldStart"), + ST("st"), + ET("et"), + LAT ("lat"); + + private final String fieldValue; + + Fields(String fieldValue) { + this.fieldValue = fieldValue; + } + + @Override + public String toString() { + return fieldValue; + } + }; + + public MasterEventMetricsSnapshot(Connection conn, Long windowStartTime) { + this.create = DSL.using(conn, SQLDialect.SQLITE); + this.windowStartTime = windowStartTime; + this.tableName = "master_event_" + windowStartTime; + + this.columns = new ArrayList>() { { + this.add(DSL.field(DSL.name(Fields.TID.toString()), 
String.class)); + this.add(DSL.field(DSL.name(AllMetrics.MasterMetricDimensions.MASTER_TASK_INSERT_ORDER.toString()), String.class)); + this.add(DSL.field(DSL.name(AllMetrics.MasterMetricDimensions.MASTER_TASK_PRIORITY.toString()), String.class)); + this.add(DSL.field(DSL.name(AllMetrics.MasterMetricDimensions.MASTER_TASK_TYPE.toString()), String.class)); + this.add(DSL.field(DSL.name(AllMetrics.MasterMetricDimensions.MASTER_TASK_METADATA.toString()), String.class)); + this.add(DSL.field(DSL.name(AllMetrics.MasterMetricDimensions.MASTER_TASK_QUEUE_TIME.toString()), String.class)); + this.add(DSL.field(DSL.name(Fields.ST.toString()), Long.class)); + this.add(DSL.field(DSL.name(Fields.ET.toString()), Long.class)); + } }; + + create.createTable(this.tableName) + .columns(columns) + .execute(); + } + + + @Override + public void remove() throws Exception { + + create.dropTable(DSL.table(this.tableName)).execute(); + } + + public void rolloverInflightRequests(MasterEventMetricsSnapshot prevSnap) { + //Fetch all entries that have not ended and write to current table. + create.insertInto(DSL.table(this.tableName)).select(prevSnap.fetchInflightRequests()).execute(); + + LOG.debug("Inflight shard requests"); + LOG.debug(() -> fetchAll()); + } + + private SelectHavingStep fetchInflightRequests() { + + ArrayList> fields = new ArrayList>() { { + this.add(DSL.field(DSL.name(Fields.TID.toString()), String.class)); + this.add(DSL.field(DSL.name(AllMetrics.MasterMetricDimensions.MASTER_TASK_INSERT_ORDER.toString()), String.class)); + this.add(DSL.field(DSL.name(AllMetrics.MasterMetricDimensions.MASTER_TASK_PRIORITY.toString()), String.class)); + this.add(DSL.field(DSL.name(AllMetrics.MasterMetricDimensions.MASTER_TASK_TYPE.toString()), String.class)); + this.add(DSL.field(DSL.name(AllMetrics.MasterMetricDimensions.MASTER_TASK_METADATA.toString()), String.class)); + this.add(DSL.field(DSL.name(AllMetrics.MasterMetricDimensions.MASTER_TASK_QUEUE_TIME.toString()), String.class)); + this.add(DSL.field(DSL.name(Fields.ST.toString()), Long.class)); + this.add(DSL.field(DSL.name(Fields.ET.toString()), Long.class)); + } }; + + return create.select(fields).from(groupByInsertOrder()) + .where(DSL.field(Fields.ST.toString()).isNotNull() + .and(DSL.field(Fields.ET.toString()).isNull()) + .and(DSL.field(Fields.ST.toString()).gt(this.windowStartTime - EXPIRE_AFTER))); + } + + + /** + * Return all master task event in the current window. + * + * Actual Table + * |tid |insertOrder|taskType |priority|queueTime|metadata| st| et| + * +-----+-----------+------------+--------+---------+--------+-------------+-------------+ + * |111 |1 |create-index|urgent |3 |{string}|1535065340625| {null}| + * |111 |2 |create-index|urgent |12 |{string}|1535065340825| {null}| + * |111 |1 | {null}| {null}| {null}| {null}| {null}|1535065340725| + * + * @return aggregated master task + */ + public Result fetchAll() { + + return create.select().from(DSL.table(this.tableName)).fetch(); + } + + public BatchBindStep startBatchPut() { + + List dummyValues = new ArrayList<>(); + for (int i = 0; i < columns.size(); i++) { + dummyValues.add(null); + } + return create.batch(create.insertInto(DSL.table(this.tableName)).values(dummyValues)); + } + + + /** + * Return one row per master task event. Group by the InsertOrder. 
+ * It has 12 columns + * |InsertOrder|Priority|Type|Metadata|SUM_QueueTime|AVG_QueueTime|MIN_QueueTime|MAX_QueueTime| + * SUM_RUNTIME|AVG_RUNTIME|MIN_RUNTIME|MAX_RUNTIME| + * + * @return aggregated master task + */ + public Result fetchQueueAndRunTime() { + + List> fields = new ArrayList>() { { + this.add(DSL.field(DSL.name(AllMetrics.MasterMetricDimensions.MASTER_TASK_INSERT_ORDER.toString()), String.class)); + this.add(DSL.field(DSL.name(AllMetrics.MasterMetricDimensions.MASTER_TASK_PRIORITY.toString()), String.class)); + this.add(DSL.field(DSL.name(AllMetrics.MasterMetricDimensions.MASTER_TASK_TYPE.toString()), String.class)); + this.add(DSL.field(DSL.name(AllMetrics.MasterMetricDimensions.MASTER_TASK_METADATA.toString()), String.class)); + + this.add(DSL.sum(DSL.field(DSL.name(AllMetrics.MasterMetricDimensions.MASTER_TASK_QUEUE_TIME.toString()), Double.class)) + .as(DBUtils.getAggFieldName(AllMetrics.MasterMetricDimensions.MASTER_TASK_QUEUE_TIME.toString(), MetricsDB.SUM))); + this.add(DSL.avg(DSL.field(DSL.name(AllMetrics.MasterMetricDimensions.MASTER_TASK_QUEUE_TIME.toString()), Double.class)) + .as(DBUtils.getAggFieldName(AllMetrics.MasterMetricDimensions.MASTER_TASK_QUEUE_TIME.toString(), MetricsDB.AVG))); + this.add(DSL.min(DSL.field(DSL.name(AllMetrics.MasterMetricDimensions.MASTER_TASK_QUEUE_TIME.toString()), Double.class)) + .as(DBUtils.getAggFieldName(AllMetrics.MasterMetricDimensions.MASTER_TASK_QUEUE_TIME.toString(), MetricsDB.MIN))); + this.add(DSL.max(DSL.field(DSL.name(AllMetrics.MasterMetricDimensions.MASTER_TASK_QUEUE_TIME.toString()), Double.class)) + .as(DBUtils.getAggFieldName(AllMetrics.MasterMetricDimensions.MASTER_TASK_QUEUE_TIME.toString(), MetricsDB.MAX))); + + this.add(DSL.sum(DSL.field(DSL.name(AllMetrics.MasterMetricDimensions.MASTER_TASK_RUN_TIME.toString()), Double.class)) + .as(DBUtils.getAggFieldName(AllMetrics.MasterMetricDimensions.MASTER_TASK_RUN_TIME.toString(), MetricsDB.SUM))); + this.add(DSL.avg(DSL.field(DSL.name(AllMetrics.MasterMetricDimensions.MASTER_TASK_RUN_TIME.toString()), Double.class)) + .as(DBUtils.getAggFieldName(AllMetrics.MasterMetricDimensions.MASTER_TASK_RUN_TIME.toString(), MetricsDB.AVG))); + this.add(DSL.min(DSL.field(DSL.name(AllMetrics.MasterMetricDimensions.MASTER_TASK_RUN_TIME.toString()), Double.class)) + .as(DBUtils.getAggFieldName(AllMetrics.MasterMetricDimensions.MASTER_TASK_RUN_TIME.toString(), MetricsDB.MIN))); + this.add(DSL.max(DSL.field(DSL.name(AllMetrics.MasterMetricDimensions.MASTER_TASK_RUN_TIME.toString()), Double.class)) + .as(DBUtils.getAggFieldName(AllMetrics.MasterMetricDimensions.MASTER_TASK_RUN_TIME.toString(), MetricsDB.MAX))); + } }; + + ArrayList> groupByFields = new ArrayList>() { { + this.add(DSL.field(DSL.name(AllMetrics.MasterMetricDimensions.MASTER_TASK_INSERT_ORDER.toString()), String.class)); + } }; + + return create.select(fields).from(fetchRunTimeHelper()) + .groupBy(groupByFields) + .fetch(); + } + + private SelectHavingStep fetchRunTimeHelper() { + + List> fields = new ArrayList>() { { + this.add(DSL.field(DSL.name(AllMetrics.MasterMetricDimensions.MASTER_TASK_INSERT_ORDER.toString()), String.class)); + this.add(DSL.field(DSL.name(AllMetrics.MasterMetricDimensions.MASTER_TASK_PRIORITY.toString()), String.class)); + this.add(DSL.field(DSL.name(AllMetrics.MasterMetricDimensions.MASTER_TASK_TYPE.toString()), String.class)); + this.add(DSL.field(DSL.name(AllMetrics.MasterMetricDimensions.MASTER_TASK_METADATA.toString()), String.class)); + 
this.add(DSL.field(DSL.name(AllMetrics.MasterMetricDimensions.MASTER_TASK_QUEUE_TIME.toString()), String.class)); + this.add(DSL.field(Fields.ET.toString()).minus(DSL.field(Fields.ST.toString())). + as(DSL.name(AllMetrics.MasterMetricDimensions.MASTER_TASK_RUN_TIME.toString()))); + } }; + + return create.select(fields).from(groupByInsertOrderAndAutoFillEndTime()) + .where(DSL.field(Fields.ET.toString()).isNotNull().and( + DSL.field(Fields.ST.toString()).isNotNull())); + } + + + /** + * Return one row per master task event. Group by the InsertOrder. + * For a master task without a finish event, we will use the current window end time + * + * CurrentWindowEndTime: 1535065341025 + * Actual Table + * |tid |insertOrder|taskType |priority|queueTime|metadata| st| et| + * +-----+-----------+------------+--------+---------+--------+-------------+-------------+ + * |111 |1 |create-index|urgent |3 |{string}|1535065340625| {null}| + * |111 |2 |create-index|urgent |12 |{string}|1535065340825| {null}| + * |111 |1 | {null}| {null}| {null}| {null}| {null}|1535065340725| + * + * Returned: + * + * |tid |insertOrder|taskType |priority|queueTime|metadata| st| et| + * +-----+-----------+------------+--------+---------+--------+-------------+-------------+ + * |111 |1 |create-index|urgent |3 |{string}|1535065340625|1535065340725| + * |111 |2 |create-index|urgent |12 |{string}|1535065340825|1535065341025| + * + * @return aggregated master task + */ + private SelectHavingStep groupByInsertOrderAndAutoFillEndTime() { + + Long endTime = windowStartTime + MetricsConfiguration.SAMPLING_INTERVAL; + List> fields = getGroupByInsertOrderSelectFields(); + fields.add(DSL.least(DSL.coalesce(DSL.max(DSL.field(Fields.ET.toString(), Long.class)), endTime), endTime) + .as(DSL.name(Fields.ET.toString()))); + + ArrayList> groupByInsertOrder = new ArrayList>() { { + this.add(DSL.field(DSL.name(AllMetrics.MasterMetricDimensions.MASTER_TASK_INSERT_ORDER.toString()), String.class)); + } }; + + return create.select(fields) + .from(DSL.table(this.tableName)) + .groupBy(groupByInsertOrder); + } + + /** + * Return one row per master task event. 
Group by the InsertOrder, with possible et remains as null + * + * Actual Table + * |tid |insertOrder|taskType |priority|queueTime|metadata| st| et| + * +-----+-----------+------------+--------+---------+--------+-------------+-------------+ + * |111 |1 |create-index|urgent |3 |{string}|1535065340625| {null}| + * |111 |2 |create-index|urgent |12 |{string}|1535065340825| {null}| + * |111 |1 | {null}| {null}| {null}| {null}| {null}|1535065340725| + * + * Returned: + * + * |tid |insertOrder|taskType |priority|queueTime|metadata| st| et| + * +-----+-----------+------------+--------+---------+--------+-------------+-------------+ + * |111 |1 |create-index|urgent |3 |{string}|1535065340625|1535065340725| + * |111 |2 |create-index|urgent |12 |{string}|1535065340825| {null}| + * + * @return aggregated latency rows for each shard request + */ + private SelectHavingStep groupByInsertOrder() { + + ArrayList> fields = getGroupByInsertOrderSelectFields(); + + fields.add(DSL.max(DSL.field(Fields.ET.toString(), Long.class)).as(DSL.name(Fields.ET.toString()))); + fields.add(DSL.field(DSL.name(Fields.TID.toString()), String.class)); + + ArrayList> groupByInsertOrder = new ArrayList>() { { + this.add(DSL.field(DSL.name(AllMetrics.MasterMetricDimensions.MASTER_TASK_INSERT_ORDER.toString()), String.class)); + } }; + + return create.select(fields) + .from(DSL.table(this.tableName)) + .groupBy(groupByInsertOrder); + } + + private ArrayList> getGroupByInsertOrderSelectFields() { + + ArrayList> fields = new ArrayList>() { { + this.add(DSL.field(DSL.name(AllMetrics.MasterMetricDimensions.MASTER_TASK_INSERT_ORDER.toString()), String.class)); + + this.add(DSL.max(DSL.field(AllMetrics.MasterMetricDimensions.MASTER_TASK_TYPE.toString())) + .as(DSL.name(AllMetrics.MasterMetricDimensions.MASTER_TASK_TYPE.toString()))); + + this.add(DSL.max(DSL.field(AllMetrics.MasterMetricDimensions.MASTER_TASK_METADATA.toString())) + .as(DSL.name(AllMetrics.MasterMetricDimensions.MASTER_TASK_METADATA.toString()))); + + this.add(DSL.max(DSL.field(AllMetrics.MasterMetricDimensions.MASTER_TASK_QUEUE_TIME.toString())) + .as(DSL.name(AllMetrics.MasterMetricDimensions.MASTER_TASK_QUEUE_TIME.toString()))); + + this.add(DSL.max(DSL.field(AllMetrics.MasterMetricDimensions.MASTER_TASK_PRIORITY.toString())) + .as(DSL.name(AllMetrics.MasterMetricDimensions.MASTER_TASK_PRIORITY.toString()))); + + this.add(DSL.max(DSL.field(Fields.ST.toString(), Long.class)).as(DSL.name(Fields.ST.toString()))); + + } }; + + return fields; + } +} diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/MetricsEmitter.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/MetricsEmitter.java index c2e0f1c3..408442ac 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/MetricsEmitter.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/MetricsEmitter.java @@ -61,6 +61,16 @@ public class MetricsEmitter { private static final Pattern TRANS_SERVER_PATTERN = Pattern.compile(".*elasticsearch.*\\[transport_server_worker.*"); private static final Pattern TRANS_CLIENT_PATTERN = Pattern.compile(".*elasticsearch.*\\[transport_client_boss\\].*"); + private static final List LATENCY_TABLE_DIMENSIONS = new ArrayList() { { + this.add(ShardRequestMetricsSnapshot.Fields.OPERATION.toString()); + this.add(HttpRequestMetricsSnapshot.Fields.EXCEPTION.toString()); + this.add(HttpRequestMetricsSnapshot.Fields.INDICES.toString()); + 
this.add(HttpRequestMetricsSnapshot.Fields.HTTP_RESP_CODE.toString()); + this.add(ShardRequestMetricsSnapshot.Fields.SHARD_ID.toString()); + this.add(ShardRequestMetricsSnapshot.Fields.INDEX_NAME.toString()); + this.add(ShardRequestMetricsSnapshot.Fields.SHARD_ROLE.toString()); + } }; + public static void emitAggregatedOSMetrics(final DSLContext create, final MetricsDB db, final OSMetricsSnapshot osMetricsSnap, @@ -195,20 +205,11 @@ public static void emitWorkloadMetrics(final DSLContext create, final MetricsDB final ShardRequestMetricsSnapshot rqMetricsSnap) throws Exception { long mCurrT = System.currentTimeMillis(); Result res = rqMetricsSnap.fetchLatencyByOp(); - List dims = new ArrayList() { { - this.add(ShardRequestMetricsSnapshot.Fields.OPERATION.toString()); - this.add(HttpRequestMetricsSnapshot.Fields.EXCEPTION.toString()); - this.add(HttpRequestMetricsSnapshot.Fields.INDICES.toString()); - this.add(HttpRequestMetricsSnapshot.Fields.HTTP_RESP_CODE.toString()); - this.add(ShardRequestMetricsSnapshot.Fields.SHARD_ID.toString()); - this.add(ShardRequestMetricsSnapshot.Fields.INDEX_NAME.toString()); - this.add(ShardRequestMetricsSnapshot.Fields.SHARD_ROLE.toString()); - } }; db.createMetric(new Metric(CommonMetric.LATENCY.toString(), 0d), - dims); + LATENCY_TABLE_DIMENSIONS); BatchBindStep handle = db.startBatchPut(new Metric( - CommonMetric.LATENCY.toString(), 0d), dims); + CommonMetric.LATENCY.toString(), 0d), LATENCY_TABLE_DIMENSIONS); //Dims need to be changed. List shardDims = new ArrayList() { { @@ -381,8 +382,10 @@ public static void emitHttpMetrics(final DSLContext create, final MetricsDB db, this.add(HttpRequestMetricsSnapshot.Fields.INDICES.toString()); this.add(HttpRequestMetricsSnapshot.Fields.HTTP_RESP_CODE.toString()); } }; + db.createMetric(new Metric(AllMetrics.CommonMetric.LATENCY.toString(), 0d), - dims); + LATENCY_TABLE_DIMENSIONS); + db.createMetric(new Metric(AllMetrics.HttpMetric.HTTP_TOTAL_REQUESTS.toString(), 0d), dims); db.createMetric(new Metric(AllMetrics.HttpMetric.HTTP_REQUEST_DOCS.toString(), 0d), @@ -440,6 +443,103 @@ public static void emitHttpMetrics(final DSLContext create, final MetricsDB db, LOG.info("Total time taken for writing http metrics metricsdb: {}", mFinalT - mCurrT); } + public static void emitMasterEventMetrics(MetricsDB metricsDB, MasterEventMetricsSnapshot masterEventMetricsSnapshot) { + + long mCurrT = System.currentTimeMillis(); + Result queueAndRunTimeResult = masterEventMetricsSnapshot.fetchQueueAndRunTime(); + + List dims = new ArrayList() { { + this.add(AllMetrics.MasterMetricDimensions.MASTER_TASK_INSERT_ORDER.toString()); + this.add(AllMetrics.MasterMetricDimensions.MASTER_TASK_PRIORITY.toString()); + this.add(AllMetrics.MasterMetricDimensions.MASTER_TASK_TYPE.toString()); + this.add(AllMetrics.MasterMetricDimensions.MASTER_TASK_METADATA.toString()); + } }; + + emitQueueTimeMetric(metricsDB, queueAndRunTimeResult, dims); + emitRuntimeMetric(metricsDB, queueAndRunTimeResult, dims); + + long mFinalT = System.currentTimeMillis(); + LOG.info("Total time taken for writing master event queue metrics metricsdb: {}", mFinalT - mCurrT); + } + + private static void emitRuntimeMetric(MetricsDB metricsDB, Result res, List dims) { + + metricsDB.createMetric( + new Metric(AllMetrics.MasterMetricValues.MASTER_TASK_RUN_TIME.toString(), 0d), dims); + + BatchBindStep handle = metricsDB.startBatchPut(new Metric( + AllMetrics.MasterMetricValues.MASTER_TASK_RUN_TIME.toString(), 0d), dims); + + for (Record r: res) { + + Double sumQueueTime = 
Double.parseDouble(r.get(DBUtils. + getAggFieldName(AllMetrics.MasterMetricDimensions.MASTER_TASK_RUN_TIME.toString(), MetricsDB.SUM)) + .toString()); + + Double avgQueueTime = Double.parseDouble(r.get(DBUtils. + getAggFieldName(AllMetrics.MasterMetricDimensions.MASTER_TASK_RUN_TIME.toString(), MetricsDB.AVG)) + .toString()); + + Double minQueueTime = Double.parseDouble(r.get(DBUtils. + getAggFieldName(AllMetrics.MasterMetricDimensions.MASTER_TASK_RUN_TIME.toString(), MetricsDB.MIN)) + .toString()); + + Double maxQueueTime = Double.parseDouble(r.get(DBUtils. + getAggFieldName(AllMetrics.MasterMetricDimensions.MASTER_TASK_RUN_TIME.toString(), MetricsDB.MAX)) + .toString()); + + handle.bind(r.get(AllMetrics.MasterMetricDimensions.MASTER_TASK_INSERT_ORDER.toString()).toString(), + r.get(AllMetrics.MasterMetricDimensions.MASTER_TASK_PRIORITY.toString()).toString(), + r.get(AllMetrics.MasterMetricDimensions.MASTER_TASK_TYPE.toString()).toString(), + r.get(AllMetrics.MasterMetricDimensions.MASTER_TASK_METADATA.toString()).toString(), + sumQueueTime, + avgQueueTime, + minQueueTime, + maxQueueTime); + } + + handle.execute(); + } + + private static void emitQueueTimeMetric(MetricsDB metricsDB, Result res, List dims) { + + metricsDB.createMetric( + new Metric(AllMetrics.MasterMetricValues.MASTER_TASK_QUEUE_TIME.toString(), 0d), dims); + + BatchBindStep handle = metricsDB.startBatchPut(new Metric( + AllMetrics.MasterMetricValues.MASTER_TASK_QUEUE_TIME.toString(), 0d), dims); + + for (Record r: res) { + + Double sumQueueTime = Double.parseDouble(r.get(DBUtils. + getAggFieldName(AllMetrics.MasterMetricDimensions.MASTER_TASK_QUEUE_TIME.toString(), MetricsDB.SUM)) + .toString()); + + Double avgQueueTime = Double.parseDouble(r.get(DBUtils. + getAggFieldName(AllMetrics.MasterMetricDimensions.MASTER_TASK_QUEUE_TIME.toString(), MetricsDB.AVG)) + .toString()); + + Double minQueueTime = Double.parseDouble(r.get(DBUtils. + getAggFieldName(AllMetrics.MasterMetricDimensions.MASTER_TASK_QUEUE_TIME.toString(), MetricsDB.MIN)) + .toString()); + + Double maxQueueTime = Double.parseDouble(r.get(DBUtils. + getAggFieldName(AllMetrics.MasterMetricDimensions.MASTER_TASK_QUEUE_TIME.toString(), MetricsDB.MAX)) + .toString()); + + handle.bind(r.get(AllMetrics.MasterMetricDimensions.MASTER_TASK_INSERT_ORDER.toString()).toString(), + r.get(AllMetrics.MasterMetricDimensions.MASTER_TASK_PRIORITY.toString()).toString(), + r.get(AllMetrics.MasterMetricDimensions.MASTER_TASK_TYPE.toString()).toString(), + r.get(AllMetrics.MasterMetricDimensions.MASTER_TASK_METADATA.toString()).toString(), + sumQueueTime, + avgQueueTime, + minQueueTime, + maxQueueTime); + } + + handle.execute(); + } + /** * TODO: Some of these metrics have default value like tcp.SSThresh:-1. * Should we count them in aggregation? 
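
Note on the queue-time / run-time arithmetic that MasterEventMetricsSnapshot computes and MetricsEmitter writes above: a task's run time is the gap between its start row and its finish row, and a task with no finish row in the window is clipped to the window end. Queue time is not derived on the reader side at all; it is the task age sampled by the collector (task.getAgeInMillis()) and carried through as MasterTaskQueueTime. A minimal plain-Java restatement of the run-time logic, with illustrative class and method names that are not part of the patch:

    // Equivalent of groupByInsertOrderAndAutoFillEndTime() + fetchRunTimeHelper():
    // effective et = LEAST(COALESCE(MAX(et), windowEnd), windowEnd); run time = et - st.
    final class MasterTaskRunTimeMath {

        /**
         * @param st            start timestamp written by the start event (sampling-bucket midpoint)
         * @param et            finish timestamp, or null when no finish event landed in this window
         * @param windowEndTime windowStartTime + MetricsConfiguration.SAMPLING_INTERVAL
         * @return the value emitted as MasterTaskRunTime / Master_Task_Run_Time
         */
        static long runTimeMillis(long st, Long et, long windowEndTime) {
            long effectiveEnd = Math.min(et == null ? windowEndTime : et, windowEndTime);
            return effectiveEnd - st;
        }
    }
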
diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/MetricsParser.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/MetricsParser.java index f01b60ff..f1395729 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/MetricsParser.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/MetricsParser.java @@ -18,8 +18,10 @@ import java.io.File; import java.util.ArrayList; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Queue; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -31,6 +33,8 @@ import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.OSMetrics; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.ShardBulkDimension; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.ShardBulkMetric; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.CommonMetric; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.MasterMetricDimensions; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.PerformanceAnalyzerMetrics; import com.amazon.opendistro.elasticsearch.performanceanalyzer.util.FileHelper; @@ -159,6 +163,159 @@ public void parseHttpMetrics(String rootLocation, long startTime, } } + public void parseMasterEventMetrics(String rootLocation, + long startTime, + long endTime, + MasterEventMetricsSnapshot masterEventMetricsSnapshot) { + + long startTimeThirtySecondBucket = getThirtySecondBucket(startTime); + File threadsFile = new File(rootLocation + File.separator + + startTimeThirtySecondBucket + File.separator + + PerformanceAnalyzerMetrics.sThreadsPath); + + BatchBindStep handle = masterEventMetricsSnapshot.startBatchPut(); + if (threadsFile.exists()) { + + try { + // Perform level order traversal on file directories + Queue queue = new LinkedList<>(); + Queue idQueue = new LinkedList<>(); + + expandThreadDirectory(threadsFile, queue); + expandThreadIDDirectory(queue, idQueue); + expandOperationDirectory(queue, idQueue); + expandInsertOrderDirectory(queue, idQueue); + + emitMasterStartFinishMetrics(startTime, endTime, handle, queue, idQueue); + } catch (Exception e) { + LOG.error("Failed to parse master metrics", e); + } + } + + if (handle.size() > 0) { + handle.execute(); + } + } + + private void emitMasterStartFinishMetrics(long startTime, + long endTime, + BatchBindStep handle, + Queue queue, + Queue idQueue) { + + // process start and finish + while (!queue.isEmpty()) { + File metricsFile = queue.poll(); + String threadID = idQueue.poll(); + String insertOder = idQueue.poll(); + + long lastModified = FileHelper.getLastModified(metricsFile, startTime, endTime); + if (lastModified < startTime || lastModified >= endTime) { + continue; + } + + String metrics = PerformanceAnalyzerMetrics.getMetric(metricsFile.getAbsolutePath()); + try { + if (metricsFile.getName().equals(PerformanceAnalyzerMetrics.START_FILE_NAME)) { + emitStartMasterEventMetric(metrics, insertOder, threadID, handle); + } else if (metricsFile.getName().equals(PerformanceAnalyzerMetrics.FINISH_FILE_NAME)) { + emitEndMasterEventMetric(metrics, insertOder, threadID, handle); + } + } catch (Exception e) { + LOG.error(e, e); + LOG.error("Error parsing file - {},\n {}", metricsFile.getAbsolutePath(), metrics); + } + } + } + + 
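
The expand*Directory helpers that follow walk a fixed on-disk layout using two parallel queues. The sketch below spells out that layout and what each queue holds after every pass; the literal directory names are assumptions, since the code only references PerformanceAnalyzerMetrics.sThreadsPath, sMasterTaskPath, START_FILE_NAME and FINISH_FILE_NAME:

    // Assumed layout (literal names are illustrative):
    //   {rootLocation}/{30s bucket}/threads/{tid}/master_task/{insertOrder}/{start|finish}
    //
    // queue / idQueue contents after each expansion pass:
    //   expandThreadDirectory      -> queue: per-thread dirs (http dir skipped)   idQueue: (empty)
    //   expandThreadIDDirectory    -> queue: master task dirs                     idQueue: tid
    //   expandOperationDirectory   -> queue: insertOrder dirs                     idQueue: tid
    //   expandInsertOrderDirectory -> queue: start/finish files                   idQueue: tid, insertOrder
    //
    // emitMasterStartFinishMetrics() then polls files and ids back in matching order
    // and binds one row per start event (st filled) or finish event (et filled).
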
private void expandInsertOrderDirectory(Queue queue, Queue idQueue) { + + int size = queue.size(); + for (int i = 0; i < size; i++) { + File insertOrderFile = queue.poll(); + String threadID = idQueue.poll(); + String insertOder = insertOrderFile.getName(); + + for (File metricsFile: insertOrderFile.listFiles()) { + queue.add(metricsFile); + idQueue.add(threadID); + idQueue.add(insertOder); + } + } + } + + private void expandOperationDirectory(Queue queue, Queue idQueue) { + + int size = queue.size(); + for (int i = 0; i < size; i++) { + File opFile = queue.poll(); + String threadId = idQueue.poll(); + + for (File insertOrderFile : opFile.listFiles()) { + queue.add(insertOrderFile); + idQueue.add(threadId); + } + } + } + + private void expandThreadIDDirectory(Queue queue, Queue idQueue) { + + int size = queue.size(); + for (int i = 0; i < size; i++) { + File threadIDFile = queue.poll(); + String threadID = threadIDFile.getName(); + + for (File opFile : threadIDFile.listFiles()) { + if (opFile.getName().equals(PerformanceAnalyzerMetrics.sMasterTaskPath)) { + queue.add(opFile); + idQueue.add(threadID); + } + } + } + } + + private void expandThreadDirectory(File threadsFile, Queue queue) { + + for (File threadIDFile: threadsFile.listFiles()) { + if (!threadIDFile.getName().equals(PerformanceAnalyzerMetrics.sHttpPath)) { + queue.add(threadIDFile); + } + } + } + + private void emitStartMasterEventMetric(String startMetrics, + String insertOrder, + String threadId, + BatchBindStep handle) { + + String priority = PerformanceAnalyzerMetrics.extractMetricValue(startMetrics, + MasterMetricDimensions.MASTER_TASK_PRIORITY.toString()); + + long st = Long.parseLong(PerformanceAnalyzerMetrics.extractMetricValue(startMetrics, + CommonMetric.START_TIME.toString())); + + String taskType = PerformanceAnalyzerMetrics.extractMetricValue(startMetrics, + MasterMetricDimensions.MASTER_TASK_TYPE.toString()); + + String taskMetadata = PerformanceAnalyzerMetrics.extractMetricValue(startMetrics, + MasterMetricDimensions.MASTER_TASK_METADATA.toString()); + + long queueTime = Long.parseLong(PerformanceAnalyzerMetrics.extractMetricValue(startMetrics, + MasterMetricDimensions.MASTER_TASK_QUEUE_TIME.toString())); + + handle.bind(threadId, insertOrder, priority, taskType, taskMetadata, queueTime, st, null); + } + + private void emitEndMasterEventMetric(String startMetrics, + String insertOrder, + String threadId, + BatchBindStep handle) { + + long finishTime = Long.parseLong(PerformanceAnalyzerMetrics.extractMetricValue(startMetrics, + CommonMetric.FINISH_TIME.toString())); + handle.bind(threadId, insertOrder, null, null, null, null, null, finishTime); + } + private void emitStartHttpMetric(File metricFile, String rid, String operation, BatchBindStep handle) { diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/ReaderMetricsProcessor.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/ReaderMetricsProcessor.java index 0fc73caf..ae45ee4b 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/ReaderMetricsProcessor.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/ReaderMetricsProcessor.java @@ -52,11 +52,13 @@ public class ReaderMetricsProcessor implements Runnable { private NavigableMap osMetricsMap; private NavigableMap shardRqMetricsMap; private NavigableMap httpRqMetricsMap; + private NavigableMap masterEventMetricsMap; private Map> nodeMetricsMap; private static final int 
MAX_DATABASES = 4; private static final int OS_SNAPSHOTS = 4; private static final int RQ_SNAPSHOTS = 4; private static final int HTTP_RQ_SNAPSHOTS = 4; + private static final int MASTER_EVENT_SNAPSHOTS = 4; private final MetricsParser metricsParser; private final String rootLocation; @@ -69,6 +71,7 @@ public ReaderMetricsProcessor(String rootLocation) throws Exception { osMetricsMap = new TreeMap<>(); shardRqMetricsMap = new TreeMap<>(); httpRqMetricsMap = new TreeMap<>(); + masterEventMetricsMap = new TreeMap<>(); metricsParser = new MetricsParser(); this.rootLocation = rootLocation; @@ -166,6 +169,7 @@ public void trimOldSnapshots() throws Exception { trimMap(osMetricsMap, OS_SNAPSHOTS); trimMap(shardRqMetricsMap, RQ_SNAPSHOTS); trimMap(httpRqMetricsMap, HTTP_RQ_SNAPSHOTS); + trimMap(masterEventMetricsMap, MASTER_EVENT_SNAPSHOTS); trimMap(metricsDBMap, MAX_DATABASES); for (NavigableMap snap : nodeMetricsMap .values()) { @@ -375,47 +379,93 @@ public void emitMetrics(long currWindowStartTime) throws Exception { } long mCurrT = System.currentTimeMillis(); - - Map.Entry prevRqEntry = shardRqMetricsMap.floorEntry(prevWindowStartTime); - if (prevRqEntry == null) { - LOG.info("Request snapshot for the previous window does not exist. Not emitting metrics."); - return; - } - ShardRequestMetricsSnapshot prevRqSnap = prevRqEntry.getValue(); - prevWindowStartTime = prevRqEntry.getKey(); - //This is object holds a reference to the temporary os snapshot. It is used to delete tables at the end of this //reader cycle. The OSMetricsSnapshot expects windowEndTime in the constructor. OSMetricsSnapshot alignedOSSnapHolder = new OSMetricsSnapshot(this.conn, "os_aligned_", currWindowStartTime); - OSMetricsSnapshot osAlignedSnap = alignOSMetrics(prevWindowStartTime, prevWindowStartTime + MetricsConfiguration.SAMPLING_INTERVAL, alignedOSSnapHolder); long mFinalT = System.currentTimeMillis(); LOG.info("Total time taken for aligning OS Metrics: {}", mFinalT - mCurrT); - if (osAlignedSnap == null) { - LOG.info("OS snapshot for the previous window does not exist. 
Not emitting metrics."); - alignedOSSnapHolder.remove(); - return; - } - - MetricsDB metricsDB = createMetricsDB(prevWindowStartTime); mCurrT = System.currentTimeMillis(); - MetricsEmitter.emitAggregatedOSMetrics(create, metricsDB, osAlignedSnap, prevRqSnap); - MetricsEmitter.emitWorkloadMetrics(create, metricsDB, prevRqSnap); - MetricsEmitter.emitThreadNameMetrics(create, metricsDB, osAlignedSnap); - HttpRequestMetricsSnapshot prevHttpRqSnap = httpRqMetricsMap.get(prevWindowStartTime); - MetricsEmitter.emitHttpMetrics(create, metricsDB, prevHttpRqSnap); - alignedOSSnapHolder.remove(); + MetricsDB metricsDB = createMetricsDB(prevWindowStartTime); + + emitMasterMetrics(prevWindowStartTime, metricsDB); + emitShardRequestMetrics(prevWindowStartTime, alignedOSSnapHolder, osAlignedSnap, metricsDB); + emitHttpRequestMetrics(prevWindowStartTime, metricsDB); emitNodeMetrics(currWindowStartTime, metricsDB); + metricsDB.commit(); metricsDBMap.put(prevWindowStartTime, metricsDB); mFinalT = System.currentTimeMillis(); LOG.info("Total time taken for emitting Metrics: {}", mFinalT - mCurrT); } + private void emitHttpRequestMetrics(long prevWindowStartTime, MetricsDB metricsDB) throws Exception { + + if (httpRqMetricsMap.containsKey(prevWindowStartTime)) { + + HttpRequestMetricsSnapshot prevHttpRqSnap = httpRqMetricsMap.get(prevWindowStartTime); + MetricsEmitter.emitHttpMetrics(create, metricsDB, prevHttpRqSnap); + }else { + LOG.info("Http request snapshot for the previous window does not exist. Not emitting metrics."); + } + } + + private void emitShardRequestMetrics(long prevWindowStartTime, + OSMetricsSnapshot alignedOSSnapHolder, + OSMetricsSnapshot osAlignedSnap, + MetricsDB metricsDB) throws Exception { + + if (shardRqMetricsMap.containsKey(prevWindowStartTime)) { + + ShardRequestMetricsSnapshot preShardRequestMetricsSnapshot = shardRqMetricsMap.get(prevWindowStartTime); + MetricsEmitter.emitWorkloadMetrics(create, metricsDB, preShardRequestMetricsSnapshot); // calculate latency + if (osAlignedSnap != null) { + MetricsEmitter.emitAggregatedOSMetrics(create, metricsDB, osAlignedSnap, preShardRequestMetricsSnapshot); // table join + MetricsEmitter.emitThreadNameMetrics(create, metricsDB, osAlignedSnap); // threads other than bulk and query + } + alignedOSSnapHolder.remove(); + }else { + LOG.info("Shard request snapshot for the previous window does not exist. Not emitting metrics."); + } + } + + private void emitMasterMetrics(long prevWindowStartTime, MetricsDB metricsDB) { + + if (masterEventMetricsMap.containsKey(prevWindowStartTime)) { + + MasterEventMetricsSnapshot preMasterEventSnapshot = masterEventMetricsMap.get(prevWindowStartTime); + MetricsEmitter.emitMasterEventMetrics(metricsDB, preMasterEventSnapshot); + }else { + LOG.info("Master snapshot for the previous window does not exist. 
Not emitting metrics."); + } + } + + private void parseMasterEventMetrics(String rootLocation, long currWindowStartTime, long currWindowEndTime) { + + long mCurrT = System.currentTimeMillis(); + if (masterEventMetricsMap.get(currWindowStartTime) == null) { + MasterEventMetricsSnapshot masterEventMetricsSnapshot = new MasterEventMetricsSnapshot(conn, currWindowStartTime); + Map.Entry entry = masterEventMetricsMap.lastEntry(); + + if (entry != null) { + masterEventMetricsSnapshot.rolloverInflightRequests(entry.getValue()); + } + + metricsParser.parseMasterEventMetrics(rootLocation, currWindowStartTime, + currWindowEndTime, masterEventMetricsSnapshot); + LOG.debug(() -> masterEventMetricsSnapshot.fetchAll()); + masterEventMetricsMap.put(currWindowStartTime, masterEventMetricsSnapshot); + LOG.info("Adding new Master Event snapshot- currTimestamp {}", currWindowStartTime); + } + + long mFinalT = System.currentTimeMillis(); + LOG.info("Total time taken for parsing Master Event Metrics: {}", mFinalT - mCurrT); + } + public void processMetrics(String rootLocation, long currTimestamp) throws Exception { parseNodeMetrics(currTimestamp); long currWindowEndTime = PerformanceAnalyzerMetrics.getTimeInterval(currTimestamp, MetricsConfiguration.SAMPLING_INTERVAL); @@ -423,6 +473,7 @@ public void processMetrics(String rootLocation, long currTimestamp) throws Excep parseOSMetrics(rootLocation, currWindowEndTime, currWindowEndTime + MetricsConfiguration.SAMPLING_INTERVAL); parseRequestMetrics(rootLocation, currWindowStartTime, currWindowEndTime); parseHttpRequestMetrics(rootLocation, currWindowStartTime, currWindowEndTime); + parseMasterEventMetrics(rootLocation, currWindowStartTime, currWindowEndTime); emitMetrics(currWindowStartTime); } diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/MasterEventMetricsSnapshotTests.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/MasterEventMetricsSnapshotTests.java new file mode 100644 index 00000000..71a7c6c8 --- /dev/null +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/MasterEventMetricsSnapshotTests.java @@ -0,0 +1,163 @@ +/* + * Copyright <2019> Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + + +package com.amazon.opendistro.elasticsearch.performanceanalyzer.reader; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics; +import org.jooq.BatchBindStep; +import org.jooq.Record; +import org.jooq.Result; +import org.junit.Before; +import org.junit.Test; + +import java.math.BigDecimal; +import java.sql.Connection; +import java.sql.DriverManager; + +import static org.junit.Assert.assertEquals; + + +public class MasterEventMetricsSnapshotTests { + + private static final String DB_URL = "jdbc:sqlite:"; + private Connection conn; + + @Before + public void setup() throws Exception { + + conn = DriverManager.getConnection(DB_URL); + } + + @Test + public void testStartEventOnly() { + MasterEventMetricsSnapshot masterEventMetricsSnapshot = new MasterEventMetricsSnapshot(conn, 1535065195000L); + BatchBindStep handle = masterEventMetricsSnapshot.startBatchPut(); + + handle.bind("111","1","urgent","create-index","metadata",12,1535065195001L,null); + handle.execute(); + Result rt = masterEventMetricsSnapshot.fetchQueueAndRunTime(); + + assertEquals(1, rt.size()); + assertEquals(4999L, + ((BigDecimal)(rt.get(0).get("sum_" + AllMetrics.MasterMetricDimensions.MASTER_TASK_RUN_TIME.toString()))).longValue()); + assertEquals("urgent", + rt.get(0).get(AllMetrics.MasterMetricDimensions.MASTER_TASK_PRIORITY.toString())); + assertEquals("create-index", + rt.get(0).get(AllMetrics.MasterMetricDimensions.MASTER_TASK_TYPE.toString())); + assertEquals("metadata", + rt.get(0).get(AllMetrics.MasterMetricDimensions.MASTER_TASK_METADATA.toString())); + assertEquals(12L, + ((BigDecimal)(rt.get(0).get("sum_" + AllMetrics.MasterMetricDimensions.MASTER_TASK_QUEUE_TIME.toString()))).longValue()); + } + + @Test + public void testStartAndEndEvents() { + MasterEventMetricsSnapshot masterEventMetricsSnapshot = new MasterEventMetricsSnapshot(conn, 1535065195000L); + BatchBindStep handle = masterEventMetricsSnapshot.startBatchPut(); + + handle.bind("111","1","urgent","create-index","metadata",12,1535065195001L,null); + handle.bind("111","1",null,null,null,12, null, 1535065195005L); + handle.execute(); + Result rt = masterEventMetricsSnapshot.fetchQueueAndRunTime(); + + assertEquals(1, rt.size()); + assertEquals(4L, + ((BigDecimal)(rt.get(0).get("sum_" + AllMetrics.MasterMetricDimensions.MASTER_TASK_RUN_TIME.toString()))).longValue()); + assertEquals("urgent", + rt.get(0).get(AllMetrics.MasterMetricDimensions.MASTER_TASK_PRIORITY.toString())); + assertEquals("create-index", + rt.get(0).get(AllMetrics.MasterMetricDimensions.MASTER_TASK_TYPE.toString())); + assertEquals("metadata", + rt.get(0).get(AllMetrics.MasterMetricDimensions.MASTER_TASK_METADATA.toString())); + assertEquals(12L, + ((BigDecimal)(rt.get(0).get("sum_" + AllMetrics.MasterMetricDimensions.MASTER_TASK_QUEUE_TIME.toString()))).longValue()); + } + + @Test + public void testMultipleInsertOrderStartAndEndEvents() { + MasterEventMetricsSnapshot masterEventMetricsSnapshot = new MasterEventMetricsSnapshot(conn, 1535065195000L); + BatchBindStep handle = masterEventMetricsSnapshot.startBatchPut(); + + handle.bind("111","1","urgent","create-index","metadata",12,1535065195001L,null); + handle.bind("111","1",null,null,null,12, null, 1535065195005L); + handle.bind("111","2","high","remapping","metadata2",2,1535065195007L,null); + handle.execute(); + + Result rt = masterEventMetricsSnapshot.fetchQueueAndRunTime(); + + assertEquals(2, rt.size()); + assertEquals(4L, + ((BigDecimal)(rt.get(0).get("sum_" + 
AllMetrics.MasterMetricDimensions.MASTER_TASK_RUN_TIME.toString()))).longValue()); + assertEquals("urgent", + rt.get(0).get(AllMetrics.MasterMetricDimensions.MASTER_TASK_PRIORITY.toString())); + assertEquals("create-index", + rt.get(0).get(AllMetrics.MasterMetricDimensions.MASTER_TASK_TYPE.toString())); + assertEquals("metadata", + rt.get(0).get(AllMetrics.MasterMetricDimensions.MASTER_TASK_METADATA.toString())); + assertEquals(12L, + ((BigDecimal)(rt.get(0).get("sum_" + AllMetrics.MasterMetricDimensions.MASTER_TASK_QUEUE_TIME.toString()))).longValue()); + + assertEquals(4993L, + ((BigDecimal)(rt.get(1).get("sum_" + AllMetrics.MasterMetricDimensions.MASTER_TASK_RUN_TIME.toString()))).longValue()); + assertEquals("high", + rt.get(1).get(AllMetrics.MasterMetricDimensions.MASTER_TASK_PRIORITY.toString())); + assertEquals("remapping", + rt.get(1).get(AllMetrics.MasterMetricDimensions.MASTER_TASK_TYPE.toString())); + assertEquals("metadata2", + rt.get(1).get(AllMetrics.MasterMetricDimensions.MASTER_TASK_METADATA.toString())); + assertEquals(2L, + ((BigDecimal)(rt.get(1).get("sum_" + AllMetrics.MasterMetricDimensions.MASTER_TASK_QUEUE_TIME.toString()))).longValue()); + + } + + @Test + public void testRollOver() { + MasterEventMetricsSnapshot masterEventMetricsSnapshotPre = new MasterEventMetricsSnapshot(conn, 1535065195000L); + BatchBindStep handle = masterEventMetricsSnapshotPre.startBatchPut(); + + handle.bind("111","1","urgent","create-index","metadata",12,1535065195001L,null); + handle.execute(); + + MasterEventMetricsSnapshot masterEventMetricsSnapshotCurrent = new MasterEventMetricsSnapshot(conn, 1535065200000L); + Result rt = masterEventMetricsSnapshotCurrent.fetchAll(); + assertEquals(0, rt.size()); + + masterEventMetricsSnapshotCurrent.rolloverInflightRequests(masterEventMetricsSnapshotPre); + + Result rt2 = masterEventMetricsSnapshotCurrent.fetchAll(); + assertEquals(1, rt2.size()); + } + + @Test + public void testNotRollOverExpired() { + MasterEventMetricsSnapshot masterEventMetricsSnapshotPre = new MasterEventMetricsSnapshot(conn, 1535065195000L); + BatchBindStep handle = masterEventMetricsSnapshotPre.startBatchPut(); + + handle.bind("111","1","urgent","create-index","metadata",12,1435065195001L,null); + handle.execute(); + + MasterEventMetricsSnapshot masterEventMetricsSnapshotCurrent = new MasterEventMetricsSnapshot(conn, 1535065200000L); + Result rt = masterEventMetricsSnapshotCurrent.fetchAll(); + assertEquals(0, rt.size()); + + masterEventMetricsSnapshotCurrent.rolloverInflightRequests(masterEventMetricsSnapshotPre); + + Result rt2 = masterEventMetricsSnapshotCurrent.fetchAll(); + assertEquals(0, rt2.size()); + } + +} +
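
For readers checking the expected values in these tests: a task with no finish row has its end time auto-filled with the snapshot's window end, so the asserted run times follow directly from the timestamps bound above. The arithmetic below assumes MetricsConfiguration.SAMPLING_INTERVAL is 5000 ms, which is what the assertions imply:

    // Why the tests expect 4999, 4, and 4993.
    final class MasterEventTestExpectations {
        static void explain() {
            long windowStart = 1535065195000L;
            long windowEnd = windowStart + 5000L;                     // et auto-filled for unfinished tasks
            long startOnly = windowEnd - 1535065195001L;              // 4999 (testStartEventOnly)
            long startAndFinish = 1535065195005L - 1535065195001L;    // 4    (testStartAndEndEvents)
            long secondTaskStartOnly = windowEnd - 1535065195007L;    // 4993 (testMultipleInsertOrderStartAndEndEvents)
        }
    }
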