Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] [Prometheus]Bug fix for less than and greater than operators on @time… #1271

Merged
merged 1 commit into from
Jan 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/user/ppl/admin/prometheus_connector.rst
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ Prometheus Connector Limitations
* Only one aggregation is supported in stats command.
* Span Expression is compulsory in stats command.
* AVG, MAX, MIN, SUM, COUNT are the only aggregations supported in prometheus connector.
* Where clause only supports EQUALS(=) operation on metric dimensions and Comparative(> , < , >= , <=) Operations on @timestamp attribute.

Example queries
---------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import static org.opensearch.sql.util.MatcherUtils.schema;
import static org.opensearch.sql.util.MatcherUtils.verifySchema;

import java.text.SimpleDateFormat;
import java.util.Date;
import lombok.SneakyThrows;
import org.apache.commons.lang3.StringUtils;
import org.json.JSONArray;
Expand Down Expand Up @@ -42,6 +44,27 @@ public void testSourceMetricCommand() {
}
}

@Test
@SneakyThrows
public void testSourceMetricCommandWithTimestamp() {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String query = "source=my_prometheus.prometheus_http_requests_total | where @timestamp > '"
+ format.format(new Date(System.currentTimeMillis() - 3600 * 1000))
+ "' | sort + @timestamp | head 5";

JSONObject response =
executeQuery(query);
verifySchema(response,
schema(VALUE, "double"),
schema(TIMESTAMP, "timestamp"),
schema("handler", "string"),
schema("code", "string"),
schema("instance", "string"),
schema("job", "string"));
// <TODO>Currently, data is not injected into prometheus,
// so asserting on result is not possible. Verifying only schema.
}

@Test
@SneakyThrows
public void testMetricAvgAggregationCommand() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.sql.expression.ExpressionNodeVisitor;
import org.opensearch.sql.expression.FunctionExpression;
import org.opensearch.sql.expression.ReferenceExpression;
import org.opensearch.sql.expression.function.BuiltinFunctionName;

/**
* This class builds metric selection query from the filter condition
Expand Down Expand Up @@ -47,23 +48,25 @@ public static String build(String metricName, Expression filterCondition) {
static class SeriesSelectionExpressionNodeVisitor extends ExpressionNodeVisitor<String, Object> {
@Override
public String visitFunction(FunctionExpression func, Object context) {
if (func.getFunctionName().getFunctionName().equals("and")) {
if (BuiltinFunctionName.AND.getName().equals(func.getFunctionName())) {
return func.getArguments().stream()
.map(arg -> visitFunction((FunctionExpression) arg, context))
.filter(StringUtils::isNotEmpty)
.collect(Collectors.joining(" , "));
} else if (func.getFunctionName().getFunctionName().contains("=")) {
ReferenceExpression ref = (ReferenceExpression) func.getArguments().get(0);
if (!ref.getAttr().equals(TIMESTAMP)) {
return func.getArguments().get(0)
+ func.getFunctionName().getFunctionName()
+ func.getArguments().get(1);
} else {
return null;
}
} else if ((BuiltinFunctionName.LTE.getName().equals(func.getFunctionName())
|| BuiltinFunctionName.GTE.getName().equals(func.getFunctionName())
|| BuiltinFunctionName.LESS.getName().equals(func.getFunctionName())
|| BuiltinFunctionName.GREATER.getName().equals(func.getFunctionName()))
&& ((ReferenceExpression) func.getArguments().get(0)).getAttr().equals(TIMESTAMP)) {
return null;
} else if (BuiltinFunctionName.EQUAL.getName().equals(func.getFunctionName())) {
return func.getArguments().get(0)
+ func.getFunctionName().getFunctionName()
+ func.getArguments().get(1);
} else {
throw new RuntimeException(
String.format("Prometheus Catalog doesn't support %s in where command.",
String.format("Prometheus Datasource doesn't support %s "
+ "in where command.",
func.getFunctionName().getFunctionName()));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

package org.opensearch.sql.prometheus.storage.querybuilder;

import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.TIMESTAMP;

import java.util.Date;
import lombok.NoArgsConstructor;
import org.apache.commons.math3.util.Pair;
Expand All @@ -15,6 +17,7 @@
import org.opensearch.sql.expression.ExpressionNodeVisitor;
import org.opensearch.sql.expression.FunctionExpression;
import org.opensearch.sql.expression.ReferenceExpression;
import org.opensearch.sql.expression.function.BuiltinFunctionName;

@NoArgsConstructor
public class TimeRangeParametersResolver extends ExpressionNodeVisitor<Void, Object> {
Expand Down Expand Up @@ -53,10 +56,13 @@ public Pair<Long, Long> resolve(Expression filterCondition) {

@Override
public Void visitFunction(FunctionExpression func, Object context) {
if (func.getFunctionName().getFunctionName().contains("=")) {
if ((BuiltinFunctionName.LTE.getName().equals(func.getFunctionName())
|| BuiltinFunctionName.GTE.getName().equals(func.getFunctionName())
|| BuiltinFunctionName.LESS.getName().equals(func.getFunctionName())
|| BuiltinFunctionName.GREATER.getName().equals(func.getFunctionName()))) {
ReferenceExpression ref = (ReferenceExpression) func.getArguments().get(0);
Expression rightExpr = func.getArguments().get(1);
if (ref.getAttr().equals("@timestamp")) {
if (ref.getAttr().equals(TIMESTAMP)) {
ExprValue literalValue = rightExpr.valueOf();
if (func.getFunctionName().getFunctionName().contains(">")) {
startTime = literalValue.timestampValue().toEpochMilli() / 1000;
Expand All @@ -67,6 +73,8 @@ public Void visitFunction(FunctionExpression func, Object context) {
}
} else {
func.getArguments()
.stream()
.filter(arg -> arg instanceof FunctionExpression)
.forEach(arg -> visitFunction((FunctionExpression) arg, context));
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,8 @@ void testImplementWithORConditionInWhereClause() {
DSL.equal(DSL.ref("handler", STRING), DSL.literal(stringValue("/ready/")))));
RuntimeException exception
= assertThrows(RuntimeException.class, () -> prometheusMetricTable.implement(plan));
assertEquals("Prometheus Catalog doesn't support or in where command.", exception.getMessage());
assertEquals("Prometheus Datasource doesn't support or in where command.",
exception.getMessage());
}

@Test
Expand Down Expand Up @@ -753,6 +754,66 @@ void testImplementWithRelationAndTimestampFilter() {
assertEquals(List.of(VALUE, TIMESTAMP), outputFields);
}


@Test
void testImplementWithRelationAndTimestampLTFilter() {
List<NamedExpression> finalProjectList = new ArrayList<>();
finalProjectList.add(DSL.named(VALUE, DSL.ref(VALUE, STRING)));
finalProjectList.add(DSL.named(TIMESTAMP, DSL.ref(TIMESTAMP, ExprCoreType.TIMESTAMP)));
DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Long endTime = new Date(System.currentTimeMillis()).getTime();
PrometheusMetricTable prometheusMetricTable =
new PrometheusMetricTable(client, "prometheus_http_total_requests");
LogicalPlan logicalPlan = project(indexScan("prometheus_http_total_requests",
DSL.less(DSL.ref("@timestamp", ExprCoreType.TIMESTAMP),
DSL.literal(
fromObjectValue(dateFormat.format(new Date(endTime)),
ExprCoreType.TIMESTAMP)))
), finalProjectList, null);
PhysicalPlan physicalPlan = prometheusMetricTable.implement(logicalPlan);
assertTrue(physicalPlan instanceof ProjectOperator);
assertTrue(((ProjectOperator) physicalPlan).getInput() instanceof PrometheusMetricScan);
PrometheusQueryRequest request
= ((PrometheusMetricScan) ((ProjectOperator) physicalPlan).getInput()).getRequest();
assertEquals((3600 / 250) + "s", request.getStep());
assertEquals("prometheus_http_total_requests",
request.getPromQl());
List<NamedExpression> projectList = ((ProjectOperator) physicalPlan).getProjectList();
List<String> outputFields
= projectList.stream().map(NamedExpression::getName).collect(Collectors.toList());
assertEquals(List.of(VALUE, TIMESTAMP), outputFields);
}


@Test
void testImplementWithRelationAndTimestampGTFilter() {
List<NamedExpression> finalProjectList = new ArrayList<>();
finalProjectList.add(DSL.named(VALUE, DSL.ref(VALUE, STRING)));
finalProjectList.add(DSL.named(TIMESTAMP, DSL.ref(TIMESTAMP, ExprCoreType.TIMESTAMP)));
DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Long endTime = new Date(System.currentTimeMillis()).getTime();
PrometheusMetricTable prometheusMetricTable =
new PrometheusMetricTable(client, "prometheus_http_total_requests");
LogicalPlan logicalPlan = project(indexScan("prometheus_http_total_requests",
DSL.greater(DSL.ref("@timestamp", ExprCoreType.TIMESTAMP),
DSL.literal(
fromObjectValue(dateFormat.format(new Date(endTime)),
ExprCoreType.TIMESTAMP)))
), finalProjectList, null);
PhysicalPlan physicalPlan = prometheusMetricTable.implement(logicalPlan);
assertTrue(physicalPlan instanceof ProjectOperator);
assertTrue(((ProjectOperator) physicalPlan).getInput() instanceof PrometheusMetricScan);
PrometheusQueryRequest request
= ((PrometheusMetricScan) ((ProjectOperator) physicalPlan).getInput()).getRequest();
assertEquals((3600 / 250) + "s", request.getStep());
assertEquals("prometheus_http_total_requests",
request.getPromQl());
List<NamedExpression> projectList = ((ProjectOperator) physicalPlan).getProjectList();
List<String> outputFields
= projectList.stream().map(NamedExpression::getName).collect(Collectors.toList());
assertEquals(List.of(VALUE, TIMESTAMP), outputFields);
}

@Test
void testOptimize() {
PrometheusQueryRequest prometheusQueryRequest = new PrometheusQueryRequest();
Expand Down Expand Up @@ -803,4 +864,41 @@ void testImplementPrometheusQueryWithBackQuotedFieldNamesInStatsQuery() {
prometheusQueryRequest.getPromQl());

}

@Test
void testImplementPrometheusQueryWithFilterQuery() {

PrometheusMetricTable prometheusMetricTable =
new PrometheusMetricTable(client, "prometheus_http_total_requests");

// IndexScanAgg without Filter
PhysicalPlan plan = prometheusMetricTable.implement(
indexScan("prometheus_http_total_requests",
DSL.and(DSL.equal(DSL.ref("code", STRING), DSL.literal(stringValue("200"))),
DSL.equal(DSL.ref("handler", STRING), DSL.literal(stringValue("/ready/"))))));

assertTrue(plan instanceof PrometheusMetricScan);
PrometheusQueryRequest prometheusQueryRequest =
((PrometheusMetricScan) plan).getRequest();
assertEquals(
"prometheus_http_total_requests{code=\"200\" , handler=\"/ready/\"}",
prometheusQueryRequest.getPromQl());
}

@Test
void testImplementPrometheusQueryWithUnsupportedFilterQuery() {

PrometheusMetricTable prometheusMetricTable =
new PrometheusMetricTable(client, "prometheus_http_total_requests");

RuntimeException exception = assertThrows(RuntimeException.class,
() -> prometheusMetricTable.implement(indexScan("prometheus_http_total_requests",
DSL.and(DSL.lte(DSL.ref("code", STRING), DSL.literal(stringValue("200"))),
DSL.equal(DSL.ref("handler", STRING), DSL.literal(stringValue("/ready/")))))));
assertEquals("Prometheus Datasource doesn't support <= in where command.",
exception.getMessage());
}



}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.opensearch.sql.prometheus.storage.querybuilders;

import static org.opensearch.sql.data.model.ExprValueUtils.stringValue;
import static org.opensearch.sql.data.type.ExprCoreType.STRING;

import org.apache.commons.math3.util.Pair;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.opensearch.sql.expression.DSL;
import org.opensearch.sql.prometheus.storage.querybuilder.TimeRangeParametersResolver;

public class TimeRangeParametersResolverTest {

@Test
void testTimeRangeParametersWithoutTimestampFilter() {
TimeRangeParametersResolver timeRangeParametersResolver = new TimeRangeParametersResolver();
Pair<Long, Long> result = timeRangeParametersResolver.resolve(
DSL.and(DSL.less(DSL.ref("code", STRING), DSL.literal(stringValue("200"))),
DSL.equal(DSL.ref("handler", STRING), DSL.literal(stringValue("/ready/")))));
Assertions.assertNotNull(result);
Assertions.assertEquals(3600, result.getSecond() - result.getFirst());
}
}