Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change query range response structure #1867

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,28 @@

package org.opensearch.sql.ppl;

import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.LABELS;
import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.TIMESTAMP;
import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.VALUE;
import static org.opensearch.sql.util.MatcherUtils.assertJsonEquals;
import static org.opensearch.sql.util.MatcherUtils.schema;
import static org.opensearch.sql.util.MatcherUtils.verifySchema;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Resources;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.text.SimpleDateFormat;
import java.util.Date;
import lombok.Data;
import lombok.SneakyThrows;
import org.apache.commons.lang3.StringUtils;
import org.json.JSONArray;
import org.json.JSONObject;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -218,4 +224,33 @@ public void testMetricSumAggregationCommand() {
}
}


@Test
@SneakyThrows
public void testQueryRange() {
  // query_range takes epoch SECONDS; build a one-hour window ending now.
  // Avoid legacy java.util.Date just to read the current time.
  long endSeconds = System.currentTimeMillis() / 1000;
  long startSeconds = endSeconds - 3600;
  JSONObject response =
      executeQuery("source=my_prometheus.query_range('prometheus_http_requests_total',"
          + startSeconds + "," + endSeconds + ", " + 14 + ")");
  // Result rows expose parallel timestamp/value arrays plus a labels struct.
  verifySchema(response,
      schema(VALUE, "array"),
      schema(TIMESTAMP, "array"),
      schema(LABELS, "struct"));
  Assertions.assertTrue(response.getInt("size") > 0);
}

@Test
public void explainQueryRange() throws Exception {
  // The explain output of a query_range call is pinned by a checked-in fixture.
  String query = "source = my_prometheus"
      + ".query_range('prometheus_http_requests_total',1689281439,1689291439,14)";
  String expected = loadFromFile("expectedOutput/ppl/explain_query_range.json");
  assertJsonEquals(expected, explainQueryToString(query));
}

/**
 * Reads a classpath resource into a string.
 *
 * @param filename resource path relative to the classpath root
 * @return the resource contents decoded as UTF-8
 * @throws Exception if the resource cannot be located or read
 */
String loadFromFile(String filename) throws Exception {
  URI uri = Resources.getResource(filename).toURI();
  // Decode explicitly as UTF-8: new String(byte[]) uses the platform default
  // charset and can corrupt the JSON fixture on non-UTF-8 systems.
  return new String(Files.readAllBytes(Paths.get(uri)),
      java.nio.charset.StandardCharsets.UTF_8);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"root": {
"name": "QueryRangeFunctionTableScanOperator",
"description": {
"request": "query_range(prometheus_http_requests_total, 1689281439, 1689291439, 14)"
},
"children": []
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,25 @@

package org.opensearch.sql.prometheus.functions.response;

import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.LABELS;
import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.TIMESTAMP;
import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.VALUE;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import org.jetbrains.annotations.NotNull;
import org.json.JSONArray;
import org.json.JSONObject;
import org.opensearch.sql.data.model.ExprCollectionValue;
import org.opensearch.sql.data.model.ExprDoubleValue;
import org.opensearch.sql.data.model.ExprStringValue;
import org.opensearch.sql.data.model.ExprTimestampValue;
import org.opensearch.sql.data.model.ExprTupleValue;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants;

/**
* Default implementation of QueryRangeFunctionResponseHandle.
Expand All @@ -40,63 +41,61 @@ public class DefaultQueryRangeFunctionResponseHandle implements QueryRangeFuncti
*/
public DefaultQueryRangeFunctionResponseHandle(JSONObject responseObject) {
  this.responseObject = responseObject;
  // NOTE(review): the pre-refactor call constructIteratorAndSchema() was left
  // interleaved in the diff; the refactor split it into the two calls below.
  // Schema is fixed for query_range, so build it before parsing rows.
  constructSchema();
  constructIterator();
}

private void constructIteratorAndSchema() {
private void constructIterator() {
List<ExprValue> result = new ArrayList<>();
List<ExecutionEngine.Schema.Column> columnList = new ArrayList<>();
if ("matrix".equals(responseObject.getString("resultType"))) {
JSONArray itemArray = responseObject.getJSONArray("result");
for (int i = 0; i < itemArray.length(); i++) {
LinkedHashMap<String, ExprValue> linkedHashMap = new LinkedHashMap<>();
JSONObject item = itemArray.getJSONObject(i);
JSONObject metric = item.getJSONObject("metric");
JSONArray values = item.getJSONArray("values");
if (i == 0) {
columnList = getColumnList(metric);
}
for (int j = 0; j < values.length(); j++) {
LinkedHashMap<String, ExprValue> linkedHashMap =
extractRow(metric, values.getJSONArray(j), columnList);
result.add(new ExprTupleValue(linkedHashMap));
}
linkedHashMap.put(LABELS, extractLabels(item.getJSONObject("metric")));
extractTimestampAndValues(item.getJSONArray("values"), linkedHashMap);
result.add(new ExprTupleValue(linkedHashMap));
}
} else {
throw new RuntimeException(String.format("Unexpected Result Type: %s during Prometheus "
+ "Response Parsing. 'matrix' resultType is expected",
responseObject.getString("resultType")));
}
this.schema = new ExecutionEngine.Schema(columnList);
this.responseIterator = result.iterator();
}

@NotNull
private static LinkedHashMap<String, ExprValue> extractRow(JSONObject metric,
JSONArray values, List<ExecutionEngine.Schema.Column> columnList) {
LinkedHashMap<String, ExprValue> linkedHashMap = new LinkedHashMap<>();
for (ExecutionEngine.Schema.Column column : columnList) {
if (PrometheusFieldConstants.TIMESTAMP.equals(column.getName())) {
linkedHashMap.put(PrometheusFieldConstants.TIMESTAMP,
new ExprTimestampValue(Instant.ofEpochMilli((long) (values.getDouble(0) * 1000))));
} else if (column.getName().equals(VALUE)) {
linkedHashMap.put(VALUE, new ExprDoubleValue(values.getDouble(1)));
} else {
linkedHashMap.put(column.getName(),
new ExprStringValue(metric.getString(column.getName())));
}
private static void extractTimestampAndValues(JSONArray values,
LinkedHashMap<String, ExprValue> linkedHashMap) {
List<ExprValue> timestampList = new ArrayList<>();
List<ExprValue> valueList = new ArrayList<>();
for (int j = 0; j < values.length(); j++) {
JSONArray value = values.getJSONArray(j);
timestampList.add(new ExprTimestampValue(
Instant.ofEpochMilli((long) (value.getDouble(0) * 1000))));
valueList.add(new ExprDoubleValue(value.getDouble(1)));
}
return linkedHashMap;
linkedHashMap.put(TIMESTAMP,
new ExprCollectionValue(timestampList));
linkedHashMap.put(VALUE, new ExprCollectionValue(valueList));
}

/** Builds the fixed three-column schema (timestamp array, value array, labels struct). */
private void constructSchema() {
  List<ExecutionEngine.Schema.Column> columns = getColumnList();
  this.schema = new ExecutionEngine.Schema(columns);
}

private List<ExecutionEngine.Schema.Column> getColumnList(JSONObject metric) {
private ExprValue extractLabels(JSONObject metric) {
LinkedHashMap<String, ExprValue> labelsMap = new LinkedHashMap<>();
metric.keySet().forEach(key
-> labelsMap.put(key, new ExprStringValue(metric.getString(key))));
return new ExprTupleValue(labelsMap);
}


/**
 * Returns the static schema of a query_range result row: the timestamp and
 * value columns are arrays, the labels column is a struct.
 * (The diff left the pre-refactor columns — TIMESTAMP/DOUBLE plus a per-label
 * STRING loop over an undefined {@code metric} parameter — interleaved here;
 * this is the clean post-refactor method.)
 */
private List<ExecutionEngine.Schema.Column> getColumnList() {
  List<ExecutionEngine.Schema.Column> columnList = new ArrayList<>();
  columnList.add(new ExecutionEngine.Schema.Column(TIMESTAMP, TIMESTAMP, ExprCoreType.ARRAY));
  columnList.add(new ExecutionEngine.Schema.Column(VALUE, VALUE, ExprCoreType.ARRAY));
  columnList.add(new ExecutionEngine.Schema.Column(LABELS, LABELS, ExprCoreType.STRUCT));
  return columnList;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ public class PrometheusResponse implements Iterable<ExprValue> {

private final PrometheusResponseFieldNames prometheusResponseFieldNames;

private final Boolean isQueryRangeFunctionScan;

/**
* Constructor.
*
Expand All @@ -46,11 +44,9 @@ public class PrometheusResponse implements Iterable<ExprValue> {
* and timestamp fieldName.
*/
public PrometheusResponse(JSONObject responseObject,
PrometheusResponseFieldNames prometheusResponseFieldNames,
Boolean isQueryRangeFunctionScan) {
PrometheusResponseFieldNames prometheusResponseFieldNames) {
this.responseObject = responseObject;
this.prometheusResponseFieldNames = prometheusResponseFieldNames;
this.isQueryRangeFunctionScan = isQueryRangeFunctionScan;
}

@NonNull
Expand All @@ -70,24 +66,7 @@ public Iterator<ExprValue> iterator() {
new ExprTimestampValue(Instant.ofEpochMilli((long) (val.getDouble(0) * 1000))));
linkedHashMap.put(prometheusResponseFieldNames.getValueFieldName(), getValue(val, 1,
prometheusResponseFieldNames.getValueType()));
// Concept:
// {\"instance\":\"localhost:9090\",\"__name__\":\"up\",\"job\":\"prometheus\"}"
// This is the label string in the prometheus response.
// Q: how do we map this to columns in a table.
// For queries like source = prometheus.metric_name | ....
// we can get the labels list in prior as we know which metric we are working on.
// In case of commands like source = prometheus.query_range('promQL');
// Any arbitrary command can be written and we don't know the labels
// in the prometheus response in prior.
// So for PPL like commands...output structure is @value, @timestamp
// and each label is treated as a separate column where as in case of query_range
// function irrespective of promQL, the output structure is
// @value, @timestamp, @labels [jsonfied string of all the labels for a data point]
if (isQueryRangeFunctionScan) {
linkedHashMap.put(LABELS, new ExprStringValue(metric.toString()));
} else {
insertLabels(linkedHashMap, metric);
}
insertLabels(linkedHashMap, metric);
result.add(new ExprTupleValue(linkedHashMap));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,6 @@ public class PrometheusMetricScan extends TableScanOperator {

private Iterator<ExprValue> iterator;

@Setter
@Getter
private Boolean isQueryRangeFunctionScan = Boolean.FALSE;

@Setter
private PrometheusResponseFieldNames prometheusResponseFieldNames;

Expand All @@ -69,8 +65,7 @@ public void open() {
JSONObject responseObject = prometheusClient.queryRange(
request.getPromQl(),
request.getStartTime(), request.getEndTime(), request.getStep());
return new PrometheusResponse(responseObject, prometheusResponseFieldNames,
isQueryRangeFunctionScan).iterator();
return new PrometheusResponse(responseObject, prometheusResponseFieldNames).iterator();
} catch (IOException e) {
LOG.error(e.getMessage());
throw new RuntimeException("Error fetching data from prometheus server. " + e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,6 @@ public Map<String, ExprType> getFieldTypes() {
public PhysicalPlan implement(LogicalPlan plan) {
PrometheusMetricScan metricScan =
new PrometheusMetricScan(prometheusClient);
if (prometheusQueryRequest != null) {
metricScan.setRequest(prometheusQueryRequest);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see request field is still in use in metric scan. So where we set it now?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is set in https://github.com/opensearch-project/sql/blob/main/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/implementor/PrometheusDefaultImplementor.java#L109 here

Implementation of query_range and ppl.prometheus_http_requests_total got coupled with weird logic. Need to refactor in another PR.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

query_range...we directly set the prometheusQueryRequest.
In case of source = my_prometheus.prometheus_http_requests_total we set these parameters in the PrometheusDefaultImplementor class.

metricScan.setIsQueryRangeFunctionScan(Boolean.TRUE);
}
return plan.accept(new PrometheusDefaultImplementor(), metricScan);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,12 @@ public PhysicalPlan visitIndexAggregation(PrometheusLogicalMetricAgg node,
public PhysicalPlan visitRelation(LogicalRelation node,
PrometheusMetricScan context) {
PrometheusMetricTable prometheusMetricTable = (PrometheusMetricTable) node.getTable();
if (prometheusMetricTable.getMetricName() != null) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just my feel that it's a little hard to follow why several if check removed in this PR. it's fine if it's covered in UT.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the earlier implementation, the query_range function implementation was coupled with the implementation of normal PPL queries (source = my_prometheus.prometheus_http_requests_total). This PR at least decouples a few of those things.

Need to refactor the existing implementation. I tried to refactor in this PR, the changes are becoming huge. Will cover them in another PR.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the new implementation. Visit Relation will always have metricName.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Please add TODO comment or put all refactor items in Github issue. Thanks!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

String query = SeriesSelectionQueryBuilder.build(node.getRelationName(), null);
context.getRequest().setPromQl(query);
setTimeRangeParameters(null, context);
context.getRequest()
.setStep(StepParameterResolver.resolve(context.getRequest().getStartTime(),
context.getRequest().getEndTime(), null));
}
String query = SeriesSelectionQueryBuilder.build(node.getRelationName(), null);
context.getRequest().setPromQl(query);
setTimeRangeParameters(null, context);
context.getRequest()
.setStep(StepParameterResolver.resolve(context.getRequest().getStartTime(),
context.getRequest().getEndTime(), null));
return context;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
import static org.opensearch.sql.prometheus.constants.TestConstants.QUERY;
import static org.opensearch.sql.prometheus.constants.TestConstants.STARTTIME;
import static org.opensearch.sql.prometheus.constants.TestConstants.STEP;
import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.LABELS;
import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.TIMESTAMP;
import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.VALUE;
import static org.opensearch.sql.prometheus.utils.TestUtils.getJson;

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import lombok.SneakyThrows;
import org.json.JSONObject;
Expand All @@ -30,17 +32,19 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.sql.data.model.ExprCollectionValue;
import org.opensearch.sql.data.model.ExprDoubleValue;
import org.opensearch.sql.data.model.ExprStringValue;
import org.opensearch.sql.data.model.ExprTimestampValue;
import org.opensearch.sql.data.model.ExprTupleValue;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.prometheus.client.PrometheusClient;
import org.opensearch.sql.prometheus.request.PrometheusQueryRequest;

@ExtendWith(MockitoExtension.class)
public class QueryRangeFunctionTableScanOperatorTest {
class QueryRangeFunctionTableScanOperatorTest {
@Mock
private PrometheusClient prometheusClient;

Expand All @@ -61,22 +65,32 @@ void testQueryResponseIterator() {
.thenReturn(new JSONObject(getJson("query_range_result.json")));
queryRangeFunctionTableScanOperator.open();
Assertions.assertTrue(queryRangeFunctionTableScanOperator.hasNext());
ExprTupleValue firstRow = new ExprTupleValue(new LinkedHashMap<>() {{
put(TIMESTAMP, new ExprTimestampValue(Instant.ofEpochMilli(1435781430781L)));
put(VALUE, new ExprDoubleValue(1));
LinkedHashMap<String, ExprValue> labelsMap = new LinkedHashMap<>() {{
put("instance", new ExprStringValue("localhost:9090"));
put("__name__", new ExprStringValue("up"));
put("job", new ExprStringValue("prometheus"));
}};
ExprTupleValue firstRow = new ExprTupleValue(new LinkedHashMap<>() {{
put(LABELS, new ExprTupleValue(labelsMap));
put(TIMESTAMP, new ExprCollectionValue(Collections
.singletonList(new ExprTimestampValue(Instant.ofEpochMilli(1435781430781L)))));
put(VALUE, new ExprCollectionValue(Collections.singletonList(new ExprDoubleValue(1))));
}
});

assertEquals(firstRow, queryRangeFunctionTableScanOperator.next());
Assertions.assertTrue(queryRangeFunctionTableScanOperator.hasNext());
ExprTupleValue secondRow = new ExprTupleValue(new LinkedHashMap<>() {{
put("@timestamp", new ExprTimestampValue(Instant.ofEpochMilli(1435781430781L)));
put("@value", new ExprDoubleValue(0));

LinkedHashMap<String, ExprValue> labelsMap2 = new LinkedHashMap<>() {{
put("instance", new ExprStringValue("localhost:9091"));
put("__name__", new ExprStringValue("up"));
put("job", new ExprStringValue("node"));
}};
ExprTupleValue secondRow = new ExprTupleValue(new LinkedHashMap<>() {{
put(LABELS, new ExprTupleValue(labelsMap2));
put(TIMESTAMP, new ExprCollectionValue(Collections
.singletonList(new ExprTimestampValue(Instant.ofEpochMilli(1435781430781L)))));
put(VALUE, new ExprCollectionValue(Collections.singletonList(new ExprDoubleValue(0))));
}
});
assertEquals(secondRow, queryRangeFunctionTableScanOperator.next());
Expand Down Expand Up @@ -120,11 +134,9 @@ void testQuerySchema() {
.thenReturn(new JSONObject(getJson("query_range_result.json")));
queryRangeFunctionTableScanOperator.open();
ArrayList<ExecutionEngine.Schema.Column> columns = new ArrayList<>();
columns.add(new ExecutionEngine.Schema.Column(TIMESTAMP, TIMESTAMP, ExprCoreType.TIMESTAMP));
columns.add(new ExecutionEngine.Schema.Column(VALUE, VALUE, ExprCoreType.DOUBLE));
columns.add(new ExecutionEngine.Schema.Column("instance", "instance", ExprCoreType.STRING));
columns.add(new ExecutionEngine.Schema.Column("__name__", "__name__", ExprCoreType.STRING));
columns.add(new ExecutionEngine.Schema.Column("job", "job", ExprCoreType.STRING));
columns.add(new ExecutionEngine.Schema.Column(TIMESTAMP, TIMESTAMP, ExprCoreType.ARRAY));
columns.add(new ExecutionEngine.Schema.Column(VALUE, VALUE, ExprCoreType.ARRAY));
columns.add(new ExecutionEngine.Schema.Column(LABELS, LABELS, ExprCoreType.STRUCT));
ExecutionEngine.Schema expectedSchema = new ExecutionEngine.Schema(columns);
assertEquals(expectedSchema, queryRangeFunctionTableScanOperator.schema());
}
Expand Down
Loading
Loading