Skip to content

Commit

Permalink
Change query range response structure
Browse files Browse the repository at this point in the history
Signed-off-by: Vamsi Manohar <[email protected]>
  • Loading branch information
vmmusings committed Jul 18, 2023
1 parent 8c8e08c commit 8f04562
Show file tree
Hide file tree
Showing 10 changed files with 117 additions and 152 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,28 @@

package org.opensearch.sql.ppl;

import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.LABELS;
import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.TIMESTAMP;
import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.VALUE;
import static org.opensearch.sql.util.MatcherUtils.assertJsonEquals;
import static org.opensearch.sql.util.MatcherUtils.schema;
import static org.opensearch.sql.util.MatcherUtils.verifySchema;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Resources;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.text.SimpleDateFormat;
import java.util.Date;
import lombok.Data;
import lombok.SneakyThrows;
import org.apache.commons.lang3.StringUtils;
import org.json.JSONArray;
import org.json.JSONObject;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.jupiter.api.Assertions;
Expand All @@ -43,7 +49,7 @@ public class PrometheusDataSourceCommandsIT extends PPLIntegTestCase {
*/
@BeforeClass
protected static void metricGenerationWait() throws InterruptedException {
Thread.sleep(10000);
Thread.sleep(30000);
}

@Override
Expand Down Expand Up @@ -218,4 +224,34 @@ public void testMetricSumAggregationCommand() {
}
}


@Test
@SneakyThrows
public void testQueryRange() {
long currentTimestamp = new Date().getTime();
JSONObject response =
executeQuery("source=my_prometheus.query_range('prometheus_http_requests_total',"
+ ((currentTimestamp/1000)-3600) + "," + currentTimestamp/1000 + ", " + 14 + ")" );
System.out.println(response.toString());
verifySchema(response,
schema(VALUE, "array"),
schema(TIMESTAMP, "array"),
schema(LABELS, "struct"));
Assertions.assertTrue(response.getInt("size") > 0);
}

@Test
public void explainQueryRange() throws Exception {
String expected = loadFromFile("expectedOutput/ppl/explain_query_range.json");
assertJsonEquals(
expected,
explainQueryToString("source = my_prometheus"
+ ".query_range('prometheus_http_requests_total',1689281439,1689291439,14)")
);
}

String loadFromFile(String filename) throws Exception {
URI uri = Resources.getResource(filename).toURI();
return new String(Files.readAllBytes(Paths.get(uri)));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"root": {
"name": "QueryRangeFunctionTableScanOperator",
"description": {
"request": "query_range(prometheus_http_requests_total, 1689281439, 1689291439, 14)"
},
"children": []
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,25 @@

package org.opensearch.sql.prometheus.functions.response;

import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.LABELS;
import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.TIMESTAMP;
import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.VALUE;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import org.jetbrains.annotations.NotNull;
import org.json.JSONArray;
import org.json.JSONObject;
import org.opensearch.sql.data.model.ExprCollectionValue;
import org.opensearch.sql.data.model.ExprDoubleValue;
import org.opensearch.sql.data.model.ExprStringValue;
import org.opensearch.sql.data.model.ExprTimestampValue;
import org.opensearch.sql.data.model.ExprTupleValue;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants;

/**
* Default implementation of QueryRangeFunctionResponseHandle.
Expand All @@ -40,63 +41,61 @@ public class DefaultQueryRangeFunctionResponseHandle implements QueryRangeFuncti
*/
public DefaultQueryRangeFunctionResponseHandle(JSONObject responseObject) {
this.responseObject = responseObject;
constructIteratorAndSchema();
constructSchema();
constructIterator();
}

private void constructIteratorAndSchema() {
private void constructIterator() {
List<ExprValue> result = new ArrayList<>();
List<ExecutionEngine.Schema.Column> columnList = new ArrayList<>();
if ("matrix".equals(responseObject.getString("resultType"))) {
JSONArray itemArray = responseObject.getJSONArray("result");
for (int i = 0; i < itemArray.length(); i++) {
LinkedHashMap<String, ExprValue> linkedHashMap = new LinkedHashMap<>();
JSONObject item = itemArray.getJSONObject(i);
JSONObject metric = item.getJSONObject("metric");
JSONArray values = item.getJSONArray("values");
if (i == 0) {
columnList = getColumnList(metric);
}
for (int j = 0; j < values.length(); j++) {
LinkedHashMap<String, ExprValue> linkedHashMap =
extractRow(metric, values.getJSONArray(j), columnList);
result.add(new ExprTupleValue(linkedHashMap));
}
linkedHashMap.put(LABELS, extractLabels(item.getJSONObject("metric")));
extractTimestampAndValues(item.getJSONArray("values"), linkedHashMap);
result.add(new ExprTupleValue(linkedHashMap));
}
} else {
throw new RuntimeException(String.format("Unexpected Result Type: %s during Prometheus "
+ "Response Parsing. 'matrix' resultType is expected",
responseObject.getString("resultType")));
}
this.schema = new ExecutionEngine.Schema(columnList);
this.responseIterator = result.iterator();
}

@NotNull
private static LinkedHashMap<String, ExprValue> extractRow(JSONObject metric,
JSONArray values, List<ExecutionEngine.Schema.Column> columnList) {
LinkedHashMap<String, ExprValue> linkedHashMap = new LinkedHashMap<>();
for (ExecutionEngine.Schema.Column column : columnList) {
if (PrometheusFieldConstants.TIMESTAMP.equals(column.getName())) {
linkedHashMap.put(PrometheusFieldConstants.TIMESTAMP,
new ExprTimestampValue(Instant.ofEpochMilli((long) (values.getDouble(0) * 1000))));
} else if (column.getName().equals(VALUE)) {
linkedHashMap.put(VALUE, new ExprDoubleValue(values.getDouble(1)));
} else {
linkedHashMap.put(column.getName(),
new ExprStringValue(metric.getString(column.getName())));
}
private static void extractTimestampAndValues(JSONArray values,
LinkedHashMap<String, ExprValue> linkedHashMap) {
List<ExprValue> timestampList = new ArrayList<>();
List<ExprValue> valueList = new ArrayList<>();
for (int j = 0; j < values.length(); j++) {
JSONArray value = values.getJSONArray(j);
timestampList.add(new ExprTimestampValue(
Instant.ofEpochMilli((long) (value.getDouble(0) * 1000))));
valueList.add(new ExprDoubleValue(value.getDouble(1)));
}
return linkedHashMap;
linkedHashMap.put(TIMESTAMP,
new ExprCollectionValue(timestampList));
linkedHashMap.put(VALUE, new ExprCollectionValue(valueList));
}

private void constructSchema() {
this.schema = new ExecutionEngine.Schema(getColumnList());
}

private List<ExecutionEngine.Schema.Column> getColumnList(JSONObject metric) {
private ExprValue extractLabels(JSONObject metric) {
LinkedHashMap<String, ExprValue> labelsMap = new LinkedHashMap<>();
metric.keySet().forEach(key
-> labelsMap.put(key, new ExprStringValue(metric.getString(key))));
return new ExprTupleValue(labelsMap);
}


private List<ExecutionEngine.Schema.Column> getColumnList() {
List<ExecutionEngine.Schema.Column> columnList = new ArrayList<>();
columnList.add(new ExecutionEngine.Schema.Column(PrometheusFieldConstants.TIMESTAMP,
PrometheusFieldConstants.TIMESTAMP, ExprCoreType.TIMESTAMP));
columnList.add(new ExecutionEngine.Schema.Column(VALUE, VALUE, ExprCoreType.DOUBLE));
for (String key : metric.keySet()) {
columnList.add(new ExecutionEngine.Schema.Column(key, key, ExprCoreType.STRING));
}
columnList.add(new ExecutionEngine.Schema.Column(TIMESTAMP, TIMESTAMP, ExprCoreType.ARRAY));
columnList.add(new ExecutionEngine.Schema.Column(VALUE, VALUE, ExprCoreType.ARRAY));
columnList.add(new ExecutionEngine.Schema.Column(LABELS, LABELS, ExprCoreType.STRUCT));
return columnList;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ public class PrometheusResponse implements Iterable<ExprValue> {

private final PrometheusResponseFieldNames prometheusResponseFieldNames;

private final Boolean isQueryRangeFunctionScan;

/**
* Constructor.
*
Expand All @@ -46,11 +44,9 @@ public class PrometheusResponse implements Iterable<ExprValue> {
* and timestamp fieldName.
*/
public PrometheusResponse(JSONObject responseObject,
PrometheusResponseFieldNames prometheusResponseFieldNames,
Boolean isQueryRangeFunctionScan) {
PrometheusResponseFieldNames prometheusResponseFieldNames) {
this.responseObject = responseObject;
this.prometheusResponseFieldNames = prometheusResponseFieldNames;
this.isQueryRangeFunctionScan = isQueryRangeFunctionScan;
}

@NonNull
Expand All @@ -70,24 +66,7 @@ public Iterator<ExprValue> iterator() {
new ExprTimestampValue(Instant.ofEpochMilli((long) (val.getDouble(0) * 1000))));
linkedHashMap.put(prometheusResponseFieldNames.getValueFieldName(), getValue(val, 1,
prometheusResponseFieldNames.getValueType()));
// Concept:
// {\"instance\":\"localhost:9090\",\"__name__\":\"up\",\"job\":\"prometheus\"}"
// This is the label string in the prometheus response.
// Q: how do we map this to columns in a table.
// For queries like source = prometheus.metric_name | ....
// we can get the labels list in prior as we know which metric we are working on.
// In case of commands like source = prometheus.query_range('promQL');
// Any arbitrary command can be written and we don't know the labels
// in the prometheus response in prior.
// So for PPL like commands...output structure is @value, @timestamp
// and each label is treated as a separate column where as in case of query_range
// function irrespective of promQL, the output structure is
// @value, @timestamp, @labels [jsonfied string of all the labels for a data point]
if (isQueryRangeFunctionScan) {
linkedHashMap.put(LABELS, new ExprStringValue(metric.toString()));
} else {
insertLabels(linkedHashMap, metric);
}
insertLabels(linkedHashMap, metric);
result.add(new ExprTupleValue(linkedHashMap));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,6 @@ public class PrometheusMetricScan extends TableScanOperator {

private Iterator<ExprValue> iterator;

@Setter
@Getter
private Boolean isQueryRangeFunctionScan = Boolean.FALSE;

@Setter
private PrometheusResponseFieldNames prometheusResponseFieldNames;

Expand All @@ -69,8 +65,7 @@ public void open() {
JSONObject responseObject = prometheusClient.queryRange(
request.getPromQl(),
request.getStartTime(), request.getEndTime(), request.getStep());
return new PrometheusResponse(responseObject, prometheusResponseFieldNames,
isQueryRangeFunctionScan).iterator();
return new PrometheusResponse(responseObject, prometheusResponseFieldNames).iterator();
} catch (IOException e) {
LOG.error(e.getMessage());
throw new RuntimeException("Error fetching data from prometheus server. " + e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,6 @@ public Map<String, ExprType> getFieldTypes() {
public PhysicalPlan implement(LogicalPlan plan) {
PrometheusMetricScan metricScan =
new PrometheusMetricScan(prometheusClient);
if (prometheusQueryRequest != null) {
metricScan.setRequest(prometheusQueryRequest);
metricScan.setIsQueryRangeFunctionScan(Boolean.TRUE);
}
return plan.accept(new PrometheusDefaultImplementor(), metricScan);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,12 @@ public PhysicalPlan visitIndexAggregation(PrometheusLogicalMetricAgg node,
public PhysicalPlan visitRelation(LogicalRelation node,
PrometheusMetricScan context) {
PrometheusMetricTable prometheusMetricTable = (PrometheusMetricTable) node.getTable();
if (prometheusMetricTable.getMetricName() != null) {
String query = SeriesSelectionQueryBuilder.build(node.getRelationName(), null);
context.getRequest().setPromQl(query);
setTimeRangeParameters(null, context);
context.getRequest()
.setStep(StepParameterResolver.resolve(context.getRequest().getStartTime(),
context.getRequest().getEndTime(), null));
}
String query = SeriesSelectionQueryBuilder.build(node.getRelationName(), null);
context.getRequest().setPromQl(query);
setTimeRangeParameters(null, context);
context.getRequest()
.setStep(StepParameterResolver.resolve(context.getRequest().getStartTime(),
context.getRequest().getEndTime(), null));
return context;
}

Expand Down
Loading

0 comments on commit 8f04562

Please sign in to comment.