Skip to content

Commit

Permalink
Add new metrics - ShardBulkDocs and ShardEvents
Browse files Browse the repository at this point in the history
* ShardBulkDocs - number of documents indexed by the shard in the window.
* ShardEvents - number of events (bulk, query, fetch) run on the shard.
  • Loading branch information
Adithya Chandra committed Feb 18, 2019
1 parent 1753626 commit ffbd759
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -695,7 +695,8 @@ public enum ShardBulkMetric implements MetricValue {
START_TIME(Constants.START_TIME_VALUE),
ITEM_COUNT(Constants.ITEM_COUNT_VALUE),
FINISH_TIME(Constants.FINISH_TIME_VALUE),
LATENCY(Constants.LATENCY_VALUE);
LATENCY(Constants.LATENCY_VALUE),
DOC_COUNT(Constants.DOC_COUNT);

private final String value;

Expand All @@ -713,6 +714,26 @@ public static class Constants {
public static final String ITEM_COUNT_VALUE = "ItemCount";
public static final String FINISH_TIME_VALUE = CommonMetric.FINISH_TIME.toString();
public static final String LATENCY_VALUE = CommonMetric.LATENCY.toString();
public static final String DOC_COUNT = "ShardBulkDocs";
}
}

/**
 * Metric counting operations executed on a shard — per the commit description,
 * the number of events (bulk, query, fetch) run on the shard.
 */
public enum ShardOperationMetric implements MetricValue {
    SHARD_OP_COUNT(Constants.SHARD_OP_COUNT_VALUE);

    // Name under which this metric is reported.
    private final String value;

    ShardOperationMetric(String metricName) {
        this.value = metricName;
    }

    /** Returns the reported metric name, e.g. {@code "ShardEvents"}. */
    @Override
    public String toString() {
        return value;
    }

    /** Raw metric-name string constants backing this enum. */
    public static class Constants {
        public static final String SHARD_OP_COUNT_VALUE = "ShardEvents";
    }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import com.amazon.opendistro.performanceanalyzer.metrics.AllMetrics.MasterPendingValue;
import com.amazon.opendistro.performanceanalyzer.metrics.AllMetrics.MetricUnits;
import com.amazon.opendistro.performanceanalyzer.metrics.AllMetrics.OSMetrics;
import com.amazon.opendistro.performanceanalyzer.metrics.AllMetrics.ShardBulkMetric;
import com.amazon.opendistro.performanceanalyzer.metrics.AllMetrics.ShardOperationMetric;
import com.amazon.opendistro.performanceanalyzer.metrics.AllMetrics.ShardStatsValue;
import com.amazon.opendistro.performanceanalyzer.metrics.AllMetrics.ShardStatsDerivedDimension;
import com.amazon.opendistro.performanceanalyzer.metrics.AllMetrics.TCPDimension;
Expand Down Expand Up @@ -85,6 +87,11 @@ public class MetricsModel {
ALL_METRICS.put(CommonMetric.LATENCY.toString(),
new MetricAttributes(MetricUnits.MILLISECOND.toString(), LatencyDimension.values()));

ALL_METRICS.put(ShardOperationMetric.SHARD_OP_COUNT.toString(),
new MetricAttributes(MetricUnits.COUNT.toString(), AggregatedOSDimension.values()));
ALL_METRICS.put(ShardBulkMetric.DOC_COUNT.toString(),
new MetricAttributes(MetricUnits.COUNT.toString(), AggregatedOSDimension.values()));

// HTTP Metrics
ALL_METRICS.put(HttpMetric.HTTP_REQUEST_DOCS.toString(),
new MetricAttributes(MetricUnits.COUNT.toString(), HttpOnlyDimension.values()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,23 @@ public static void emitWorkloadMetrics(final DSLContext create, final MetricsDB
BatchBindStep handle = db.startBatchPut(new Metric<Double>(
CommonMetric.LATENCY.toString(), 0d), dims);

//Dims need to be changed.
List<String> shardDims = new ArrayList<String>() { {
this.add(ShardRequestMetricsSnapshot.Fields.OPERATION.toString());
this.add(ShardRequestMetricsSnapshot.Fields.SHARD_ID.toString());
this.add(ShardRequestMetricsSnapshot.Fields.INDEX_NAME.toString());
this.add(ShardRequestMetricsSnapshot.Fields.SHARD_ROLE.toString());
} };

db.createMetric(new Metric<Double>(AllMetrics.ShardOperationMetric.SHARD_OP_COUNT.toString(), 0d),
shardDims);
BatchBindStep countHandle = db.startBatchPut(new Metric<Double>(
"ShardOpCount", 0d), shardDims);

db.createMetric(new Metric<Double>(AllMetrics.ShardBulkMetric.DOC_COUNT.toString(), 0d),
shardDims);
BatchBindStep bulkDocHandle = db.startBatchPut(new Metric<Double>(
AllMetrics.ShardBulkMetric.DOC_COUNT.toString(), 0d), shardDims);

for (Record r: res) {
Double sumLatency = Double.parseDouble(r.get(DBUtils.
Expand Down Expand Up @@ -237,7 +254,35 @@ public static void emitWorkloadMetrics(final DSLContext create, final MetricsDB
minLatency,
maxLatency
);

Double count = Double.parseDouble(r.get("ShardOpCount").toString());
countHandle.bind(r.get(ShardRequestMetricsSnapshot.Fields.OPERATION.toString()).toString(),
r.get(ShardRequestMetricsSnapshot.Fields.SHARD_ID.toString()).toString(),
r.get(ShardRequestMetricsSnapshot.Fields.INDEX_NAME.toString()).toString(),
r.get(ShardRequestMetricsSnapshot.Fields.SHARD_ROLE.toString()).toString(),
count,
count,
count,
count
);

Object bulkDocCountObj = r.get(AllMetrics.ShardBulkMetric.DOC_COUNT.toString());
if (bulkDocCountObj != null) {
Double bulkDocCount = Double.parseDouble(bulkDocCountObj.toString());
bulkDocHandle.bind(r.get(ShardRequestMetricsSnapshot.Fields.OPERATION.toString()).toString(),
r.get(ShardRequestMetricsSnapshot.Fields.SHARD_ID.toString()).toString(),
r.get(ShardRequestMetricsSnapshot.Fields.INDEX_NAME.toString()).toString(),
r.get(ShardRequestMetricsSnapshot.Fields.SHARD_ROLE.toString()).toString(),
bulkDocCount,
bulkDocCount,
bulkDocCount,
bulkDocCount
);
}
}
handle.execute();
countHandle.execute();
bulkDocHandle.execute();
long mFinalT = System.currentTimeMillis();
LOG.info("Total time taken for writing workload metrics metricsdb: {}", mFinalT - mCurrT);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,13 @@ private void emitStartMetric(String startMetrics, String rid, String threadId,
ShardBulkDimension.SHARD_ID.toString());
String primary = getPrimary(PerformanceAnalyzerMetrics.extractMetricValue(startMetrics,
ShardBulkDimension.PRIMARY.toString()));
handle.bind(shardId, indexName, rid, threadId, operation, primary, st, null);
String docCountString = PerformanceAnalyzerMetrics.extractMetricValue(startMetrics,
ShardBulkMetric.ITEM_COUNT.toString());
long docCount = 0;
if (docCountString != null) {
docCount = Long.parseLong(docCountString);
}
handle.bind(shardId, indexName, rid, threadId, operation, primary, st, null, docCount);
}

private void emitFinishMetric(String finishMetrics, String rid, String threadId,
Expand All @@ -289,7 +295,7 @@ private void emitFinishMetric(String finishMetrics, String rid, String threadId,
ShardBulkDimension.SHARD_ID.toString());
String primary = getPrimary(PerformanceAnalyzerMetrics.extractMetricValue(finishMetrics,
ShardBulkDimension.PRIMARY.toString()));
handle.bind(shardId, indexName, rid, threadId, operation, primary, null, ft);
handle.bind(shardId, indexName, rid, threadId, operation, primary, null, ft, null);
}

private void handleidFile(File idFile, String threadID, long startTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@

import com.amazon.opendistro.performanceanalyzer.DBUtils;
import com.amazon.opendistro.performanceanalyzer.metrics.AllMetrics.CommonDimension;
import com.amazon.opendistro.performanceanalyzer.metrics.AllMetrics.ShardBulkMetric;
import com.amazon.opendistro.performanceanalyzer.metrics.AllMetrics.ShardOperationMetric;
import com.amazon.opendistro.performanceanalyzer.metrics.MetricsConfiguration;
import com.amazon.opendistro.performanceanalyzer.metricsdb.MetricsDB;

Expand Down Expand Up @@ -70,7 +72,8 @@ public enum Fields {
LAT(HttpRequestMetricsSnapshot.Fields.LAT.toString()),
TUTIL("tUtil"),
TTIME("ttime"),
LATEST("latest");
LATEST("latest"),
DOC_COUNT(ShardBulkMetric.DOC_COUNT.toString());

private final String fieldValue;

Expand Down Expand Up @@ -99,6 +102,7 @@ public ShardRequestMetricsSnapshot(Connection conn, Long windowStartTime) throws
this.add(DSL.field(DSL.name(Fields.SHARD_ROLE.toString()), String.class));
this.add(DSL.field(DSL.name(Fields.ST.toString()), Long.class));
this.add(DSL.field(DSL.name(Fields.ET.toString()), Long.class));
this.add(DSL.field(DSL.name(Fields.DOC_COUNT.toString()), Long.class));
} };

create.createTable(this.tableName)
Expand Down Expand Up @@ -173,6 +177,7 @@ public SelectHavingStep<Record> fetchLatency() {
this.add(DSL.field(DSL.name(Fields.SHARD_ROLE.toString()), String.class));
this.add(DSL.field(DSL.name(Fields.ST.toString()), Long.class));
this.add(DSL.field(DSL.name(Fields.ET.toString()), Long.class));
this.add(DSL.field(DSL.name(Fields.DOC_COUNT.toString()), Double.class));
this.add(DSL.field(Fields.ET.toString()).minus(DSL.field(Fields.ST.toString())).as(DSL.name(Fields.LAT.toString())));
} };

Expand Down Expand Up @@ -217,6 +222,9 @@ public Result<Record> fetchLatencyByOp() {
.as(DBUtils.getAggFieldName(Fields.LAT.toString(), MetricsDB.MIN)));
this.add(DSL.max(DSL.field(DSL.name(Fields.LAT.toString()), Double.class))
.as(DBUtils.getAggFieldName(Fields.LAT.toString(), MetricsDB.MAX)));
this.add(DSL.count().as(ShardOperationMetric.SHARD_OP_COUNT.toString()));
this.add(DSL.sum(DSL.field(DSL.name(Fields.DOC_COUNT.toString()), Double.class))
.as(ShardBulkMetric.DOC_COUNT.toString()));
} };

ArrayList<Field<?>> groupByFields = new ArrayList<Field<?>>() { {
Expand Down Expand Up @@ -299,6 +307,7 @@ public SelectHavingStep<Record> groupByRidOpSelect() {
this.add(DSL.field(DSL.name(Fields.TID.toString()), String.class));
this.add(DSL.field(DSL.name(Fields.OPERATION.toString()), String.class));
this.add(DSL.field(DSL.name(Fields.SHARD_ROLE.toString()), String.class));
this.add(DSL.field(DSL.name(Fields.DOC_COUNT.toString()), Double.class));
this.add(DSL.max(DSL.field(Fields.ST.toString())).as(DSL.name(Fields.ST.toString())));
this.add(DSL.max(DSL.field(Fields.ET.toString())).as(DSL.name(Fields.ET.toString())));
} };
Expand All @@ -321,6 +330,7 @@ public SelectHavingStep<Record> requestsPerThreadSelect() {
this.add(DSL.field(DSL.name(Fields.SHARD_ROLE.toString()), String.class));
this.add(DSL.field(DSL.name(Fields.ST.toString()), Long.class));
this.add(DSL.field(DSL.name(Fields.ET.toString()), Long.class));
this.add(DSL.field(DSL.name(Fields.DOC_COUNT.toString()), Double.class));
this.add(DSL.field(DSL.name(Fields.LATEST.toString()), Long.class));
} };
SelectHavingStep<Record2<Long, String>> threadTable = create
Expand Down Expand Up @@ -376,6 +386,7 @@ public SelectHavingStep<Record> fetchInflightSelect() {
this.add(DSL.field(DSL.name(Fields.SHARD_ROLE.toString()), String.class));
this.add(DSL.field(DSL.name(Fields.ST.toString()), Long.class));
this.add(DSL.field(DSL.name(Fields.ET.toString()), Long.class));
this.add(DSL.field(DSL.name(Fields.DOC_COUNT.toString()), Long.class));
} };

SelectHavingStep<Record> reqPerThread = requestsPerThreadSelect();
Expand Down

0 comments on commit ffbd759

Please sign in to comment.