diff --git a/docs/user/ppl/admin/prometheus_connector.rst b/docs/user/ppl/admin/prometheus_connector.rst index fd84f8114c..237181778e 100644 --- a/docs/user/ppl/admin/prometheus_connector.rst +++ b/docs/user/ppl/admin/prometheus_connector.rst @@ -111,6 +111,7 @@ Prometheus Connector Limitations * Only one aggregation is supported in stats command. * Span Expression is compulsory in stats command. * AVG, MAX, MIN, SUM, COUNT are the only aggregations supported in prometheus connector. +* Where clause only supports EQUALS(=) operation on metric dimensions and Comparative(> , < , >= , <=) Operations on @timestamp attribute. Example queries --------------- diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/PrometheusDataSourceCommandsIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/PrometheusDataSourceCommandsIT.java index 292ea26fc1..492a40066d 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/PrometheusDataSourceCommandsIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/PrometheusDataSourceCommandsIT.java @@ -12,6 +12,8 @@ import static org.opensearch.sql.util.MatcherUtils.schema; import static org.opensearch.sql.util.MatcherUtils.verifySchema; +import java.text.SimpleDateFormat; +import java.util.Date; import lombok.SneakyThrows; import org.apache.commons.lang3.StringUtils; import org.json.JSONArray; @@ -42,6 +44,27 @@ public void testSourceMetricCommand() { } } + @Test + @SneakyThrows + public void testSourceMetricCommandWithTimestamp() { + SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + String query = "source=my_prometheus.prometheus_http_requests_total | where @timestamp > '" + + format.format(new Date(System.currentTimeMillis() - 3600 * 1000)) + + "' | sort + @timestamp | head 5"; + + JSONObject response = + executeQuery(query); + verifySchema(response, + schema(VALUE, "double"), + schema(TIMESTAMP, "timestamp"), + schema("handler", "string"), + schema("code", "string"), + schema("instance", "string"), + schema("job", "string")); + // Currently, data is not injected into prometheus, + // so asserting on result is not possible. Verifying only schema. + } + @Test @SneakyThrows public void testMetricAvgAggregationCommand() { diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/querybuilder/SeriesSelectionQueryBuilder.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/querybuilder/SeriesSelectionQueryBuilder.java index c749c12758..461b5341f8 100644 --- a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/querybuilder/SeriesSelectionQueryBuilder.java +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/querybuilder/SeriesSelectionQueryBuilder.java @@ -17,6 +17,7 @@ import org.opensearch.sql.expression.ExpressionNodeVisitor; import org.opensearch.sql.expression.FunctionExpression; import org.opensearch.sql.expression.ReferenceExpression; +import org.opensearch.sql.expression.function.BuiltinFunctionName; /** * This class builds metric selection query from the filter condition @@ -47,23 +48,25 @@ public static String build(String metricName, Expression filterCondition) { static class SeriesSelectionExpressionNodeVisitor extends ExpressionNodeVisitor { @Override public String visitFunction(FunctionExpression func, Object context) { - if (func.getFunctionName().getFunctionName().equals("and")) { + if (BuiltinFunctionName.AND.getName().equals(func.getFunctionName())) { return func.getArguments().stream() .map(arg -> visitFunction((FunctionExpression) arg, context)) .filter(StringUtils::isNotEmpty) .collect(Collectors.joining(" , ")); - } else if (func.getFunctionName().getFunctionName().contains("=")) { - ReferenceExpression ref = (ReferenceExpression) func.getArguments().get(0); - if (!ref.getAttr().equals(TIMESTAMP)) { - return func.getArguments().get(0) - + func.getFunctionName().getFunctionName() - + func.getArguments().get(1); - } else { - return null; - } + } else if ((BuiltinFunctionName.LTE.getName().equals(func.getFunctionName()) + || BuiltinFunctionName.GTE.getName().equals(func.getFunctionName()) + || BuiltinFunctionName.LESS.getName().equals(func.getFunctionName()) + || BuiltinFunctionName.GREATER.getName().equals(func.getFunctionName())) + && ((ReferenceExpression) func.getArguments().get(0)).getAttr().equals(TIMESTAMP)) { + return null; + } else if (BuiltinFunctionName.EQUAL.getName().equals(func.getFunctionName())) { + return func.getArguments().get(0) + + func.getFunctionName().getFunctionName() + + func.getArguments().get(1); } else { throw new RuntimeException( - String.format("Prometheus Catalog doesn't support %s in where command.", + String.format("Prometheus Datasource doesn't support %s " + + "in where command.", func.getFunctionName().getFunctionName())); } } diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/querybuilder/TimeRangeParametersResolver.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/querybuilder/TimeRangeParametersResolver.java index 810ed71379..b462f6bafe 100644 --- a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/querybuilder/TimeRangeParametersResolver.java +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/querybuilder/TimeRangeParametersResolver.java @@ -7,6 +7,8 @@ package org.opensearch.sql.prometheus.storage.querybuilder; +import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.TIMESTAMP; + import java.util.Date; import lombok.NoArgsConstructor; import org.apache.commons.math3.util.Pair; @@ -15,6 +17,7 @@ import org.opensearch.sql.expression.ExpressionNodeVisitor; import org.opensearch.sql.expression.FunctionExpression; import org.opensearch.sql.expression.ReferenceExpression; +import org.opensearch.sql.expression.function.BuiltinFunctionName; @NoArgsConstructor public class TimeRangeParametersResolver extends ExpressionNodeVisitor { @@ -53,10 +56,13 @@ public Pair resolve(Expression filterCondition) { @Override public Void visitFunction(FunctionExpression func, Object context) { - if (func.getFunctionName().getFunctionName().contains("=")) { + if ((BuiltinFunctionName.LTE.getName().equals(func.getFunctionName()) + || BuiltinFunctionName.GTE.getName().equals(func.getFunctionName()) + || BuiltinFunctionName.LESS.getName().equals(func.getFunctionName()) + || BuiltinFunctionName.GREATER.getName().equals(func.getFunctionName()))) { ReferenceExpression ref = (ReferenceExpression) func.getArguments().get(0); Expression rightExpr = func.getArguments().get(1); - if (ref.getAttr().equals("@timestamp")) { + if (ref.getAttr().equals(TIMESTAMP)) { ExprValue literalValue = rightExpr.valueOf(); if (func.getFunctionName().getFunctionName().contains(">")) { startTime = literalValue.timestampValue().toEpochMilli() / 1000; @@ -67,6 +73,8 @@ public Void visitFunction(FunctionExpression func, Object context) { } } else { func.getArguments() + .stream() + .filter(arg -> arg instanceof FunctionExpression) .forEach(arg -> visitFunction((FunctionExpression) arg, context)); } return null; diff --git a/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusMetricTableTest.java b/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusMetricTableTest.java index b03b0b9ebc..01e3e8d899 100644 --- a/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusMetricTableTest.java +++ b/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusMetricTableTest.java @@ -696,7 +696,8 @@ void testImplementWithORConditionInWhereClause() { DSL.equal(DSL.ref("handler", STRING), DSL.literal(stringValue("/ready/"))))); RuntimeException exception = assertThrows(RuntimeException.class, () -> prometheusMetricTable.implement(plan)); - assertEquals("Prometheus Catalog doesn't support or in where command.", exception.getMessage()); + assertEquals("Prometheus Datasource doesn't support or in where command.", + exception.getMessage()); } @Test @@ -753,6 +754,66 @@ void testImplementWithRelationAndTimestampFilter() { assertEquals(List.of(VALUE, TIMESTAMP), outputFields); } + + @Test + void testImplementWithRelationAndTimestampLTFilter() { + List finalProjectList = new ArrayList<>(); + finalProjectList.add(DSL.named(VALUE, DSL.ref(VALUE, STRING))); + finalProjectList.add(DSL.named(TIMESTAMP, DSL.ref(TIMESTAMP, ExprCoreType.TIMESTAMP))); + DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + Long endTime = new Date(System.currentTimeMillis()).getTime(); + PrometheusMetricTable prometheusMetricTable = + new PrometheusMetricTable(client, "prometheus_http_total_requests"); + LogicalPlan logicalPlan = project(indexScan("prometheus_http_total_requests", + DSL.less(DSL.ref("@timestamp", ExprCoreType.TIMESTAMP), + DSL.literal( + fromObjectValue(dateFormat.format(new Date(endTime)), + ExprCoreType.TIMESTAMP))) + ), finalProjectList, null); + PhysicalPlan physicalPlan = prometheusMetricTable.implement(logicalPlan); + assertTrue(physicalPlan instanceof ProjectOperator); + assertTrue(((ProjectOperator) physicalPlan).getInput() instanceof PrometheusMetricScan); + PrometheusQueryRequest request + = ((PrometheusMetricScan) ((ProjectOperator) physicalPlan).getInput()).getRequest(); + assertEquals((3600 / 250) + "s", request.getStep()); + assertEquals("prometheus_http_total_requests", + request.getPromQl()); + List projectList = ((ProjectOperator) physicalPlan).getProjectList(); + List outputFields + = projectList.stream().map(NamedExpression::getName).collect(Collectors.toList()); + assertEquals(List.of(VALUE, TIMESTAMP), outputFields); + } + + + @Test + void testImplementWithRelationAndTimestampGTFilter() { + List finalProjectList = new ArrayList<>(); + finalProjectList.add(DSL.named(VALUE, DSL.ref(VALUE, STRING))); + finalProjectList.add(DSL.named(TIMESTAMP, DSL.ref(TIMESTAMP, ExprCoreType.TIMESTAMP))); + DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + Long endTime = new Date(System.currentTimeMillis()).getTime(); + PrometheusMetricTable prometheusMetricTable = + new PrometheusMetricTable(client, "prometheus_http_total_requests"); + LogicalPlan logicalPlan = project(indexScan("prometheus_http_total_requests", + DSL.greater(DSL.ref("@timestamp", ExprCoreType.TIMESTAMP), + DSL.literal( + fromObjectValue(dateFormat.format(new Date(endTime)), + ExprCoreType.TIMESTAMP))) + ), finalProjectList, null); + PhysicalPlan physicalPlan = prometheusMetricTable.implement(logicalPlan); + assertTrue(physicalPlan instanceof ProjectOperator); + assertTrue(((ProjectOperator) physicalPlan).getInput() instanceof PrometheusMetricScan); + PrometheusQueryRequest request + = ((PrometheusMetricScan) ((ProjectOperator) physicalPlan).getInput()).getRequest(); + assertEquals((3600 / 250) + "s", request.getStep()); + assertEquals("prometheus_http_total_requests", + request.getPromQl()); + List projectList = ((ProjectOperator) physicalPlan).getProjectList(); + List outputFields + = projectList.stream().map(NamedExpression::getName).collect(Collectors.toList()); + assertEquals(List.of(VALUE, TIMESTAMP), outputFields); + } + @Test void testOptimize() { PrometheusQueryRequest prometheusQueryRequest = new PrometheusQueryRequest(); @@ -803,4 +864,41 @@ void testImplementPrometheusQueryWithBackQuotedFieldNamesInStatsQuery() { prometheusQueryRequest.getPromQl()); } + + @Test + void testImplementPrometheusQueryWithFilterQuery() { + + PrometheusMetricTable prometheusMetricTable = + new PrometheusMetricTable(client, "prometheus_http_total_requests"); + + // IndexScanAgg without Filter + PhysicalPlan plan = prometheusMetricTable.implement( + indexScan("prometheus_http_total_requests", + DSL.and(DSL.equal(DSL.ref("code", STRING), DSL.literal(stringValue("200"))), + DSL.equal(DSL.ref("handler", STRING), DSL.literal(stringValue("/ready/")))))); + + assertTrue(plan instanceof PrometheusMetricScan); + PrometheusQueryRequest prometheusQueryRequest = + ((PrometheusMetricScan) plan).getRequest(); + assertEquals( + "prometheus_http_total_requests{code=\"200\" , handler=\"/ready/\"}", + prometheusQueryRequest.getPromQl()); + } + + @Test + void testImplementPrometheusQueryWithUnsupportedFilterQuery() { + + PrometheusMetricTable prometheusMetricTable = + new PrometheusMetricTable(client, "prometheus_http_total_requests"); + + RuntimeException exception = assertThrows(RuntimeException.class, + () -> prometheusMetricTable.implement(indexScan("prometheus_http_total_requests", + DSL.and(DSL.lte(DSL.ref("code", STRING), DSL.literal(stringValue("200"))), + DSL.equal(DSL.ref("handler", STRING), DSL.literal(stringValue("/ready/"))))))); + assertEquals("Prometheus Datasource doesn't support <= in where command.", + exception.getMessage()); + } + + + } diff --git a/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/querybuilders/TimeRangeParametersResolverTest.java b/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/querybuilders/TimeRangeParametersResolverTest.java new file mode 100644 index 0000000000..73839e2152 --- /dev/null +++ b/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/querybuilders/TimeRangeParametersResolverTest.java @@ -0,0 +1,30 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.prometheus.storage.querybuilders; + +import static org.opensearch.sql.data.model.ExprValueUtils.stringValue; +import static org.opensearch.sql.data.type.ExprCoreType.STRING; + +import org.apache.commons.math3.util.Pair; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.opensearch.sql.expression.DSL; +import org.opensearch.sql.prometheus.storage.querybuilder.TimeRangeParametersResolver; + +public class TimeRangeParametersResolverTest { + + @Test + void testTimeRangeParametersWithoutTimestampFilter() { + TimeRangeParametersResolver timeRangeParametersResolver = new TimeRangeParametersResolver(); + Pair result = timeRangeParametersResolver.resolve( + DSL.and(DSL.less(DSL.ref("code", STRING), DSL.literal(stringValue("200"))), + DSL.equal(DSL.ref("handler", STRING), DSL.literal(stringValue("/ready/"))))); + Assertions.assertNotNull(result); + Assertions.assertEquals(3600, result.getSecond() - result.getFirst()); + } +}