diff --git a/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java b/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java index d7222d466f..b6e2600041 100644 --- a/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java +++ b/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java @@ -70,7 +70,7 @@ public Pair resolve( FunctionSignature unresolvedSignature) { FunctionName functionName = FunctionName.of("query_range"); FunctionSignature functionSignature = - new FunctionSignature(functionName, List.of(STRING, LONG, LONG, LONG)); + new FunctionSignature(functionName, List.of(STRING, LONG, LONG, STRING)); return Pair.of( functionSignature, (functionProperties, args) -> diff --git a/docs/user/ppl/admin/prometheus_connector.rst b/docs/user/ppl/admin/prometheus_connector.rst index 15f1f2445c..1dfe6cda22 100644 --- a/docs/user/ppl/admin/prometheus_connector.rst +++ b/docs/user/ppl/admin/prometheus_connector.rst @@ -191,12 +191,11 @@ PromQL Support for prometheus Connector `query_range` Table Function ---------------------------- -Prometheus connector offers `query_range` table function. This table function can be used to query metrics in a specific time range using promQL. -The function takes inputs similar to parameters mentioned for query range api mentioned here: https://prometheus.io/docs/prometheus/latest/querying/api/ -Arguments should be either passed by name or positionArguments should be either passed by name or position. -`source=my_prometheus.query_range('prometheus_http_requests_total', 1686694425, 1686700130, 14)` -or -`source=my_prometheus.query_range(query='prometheus_http_requests_total', starttime=1686694425, endtime=1686700130, step=14)` +* Prometheus connector offers `query_range` table function. This table function can be used to query metrics in a specific time range using promQL. +* The function takes inputs similar to parameters mentioned for query range api mentioned here: https://prometheus.io/docs/prometheus/latest/querying/api/ +* Arguments should be either passed by name or positionArguments should be either passed by name or position. + - `source=my_prometheus.query_range('prometheus_http_requests_total', 1686694425, 1686700130, 14)` + - `source=my_prometheus.query_range(query='prometheus_http_requests_total', starttime=1686694425, endtime=1686700130, step=14)` Example:: > source=my_prometheus.query_range('prometheus_http_requests_total', 1686694425, 1686700130, 14) @@ -210,3 +209,71 @@ Example:: | 9 | "2022-11-03 07:18:54" | "/-/promql" | 400 | 192.15.2.1 | prometheus | | 11 | "2022-11-03 07:18:64" |"/-/metrics" | 500 | 192.15.2.1 | prometheus | +------------+------------------------+--------------------------------+---------------+-------------+-------------+ + + +Prometheus Connector Table Functions +========================================== + +`query_exemplars` Table Function +---------------------------- +* This table function can be used to fetch exemplars of a query in a specific time range. +* The function takes inputs similar to parameters mentioned for query exemplars api mentioned here: https://prometheus.io/docs/prometheus/latest/querying/api/ +* Arguments should be either passed by name or positionArguments should be either passed by name or position. + - `source=my_prometheus.query_exemplars('prometheus_http_requests_total', 1686694425, 1686700130)` + - `source=my_prometheus.query_exemplars(query='prometheus_http_requests_total', starttime=1686694425, endtime=1686700130)` +Example:: + + > source=my_prometheus.query_exemplars('prometheus_http_requests_total', 1686694425, 1686700130) + "schema": [ + { + "name": "seriesLabels", + "type": "struct" + }, + { + "name": "exemplars", + "type": "array" + } + ], + "datarows": [ + [ + { + "instance": "localhost:8090", + "__name__": "test_exemplar_metric_total", + "service": "bar", + "job": "prometheus" + }, + [ + { + "labels": { + "traceID": "EpTxMJ40fUus7aGY" + }, + "timestamp": "2020-09-14 15:22:25.479", + "value": 6.0 + } + ] + ], + [ + { + "instance": "localhost:8090", + "__name__": "test_exemplar_metric_total", + "service": "foo", + "job": "prometheus" + }, + [ + { + "labels": { + "traceID": "Olp9XHlq763ccsfa" + }, + "timestamp": "2020-09-14 15:22:35.479", + "value": 19.0 + }, + { + "labels": { + "traceID": "hCtjygkIHwAN9vs4" + }, + "timestamp": "2020-09-14 15:22:45.489", + "value": 20.0 + } + ] + ] + ] diff --git a/integ-test/build.gradle b/integ-test/build.gradle index 95762d079b..4a5f2015e0 100644 --- a/integ-test/build.gradle +++ b/integ-test/build.gradle @@ -38,7 +38,7 @@ apply plugin: 'java' apply plugin: 'io.freefair.lombok' apply plugin: 'com.wiredforcode.spawn' -String baseVersion = "2.9.0" +String baseVersion = "2.10.0" String bwcVersion = baseVersion + ".0"; String baseName = "sqlBwcCluster" String bwcFilePath = "src/test/resources/bwc/" diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/PrometheusDataSourceCommandsIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/PrometheusDataSourceCommandsIT.java index d0b682594b..011f91eed5 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/PrometheusDataSourceCommandsIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/PrometheusDataSourceCommandsIT.java @@ -23,7 +23,6 @@ import java.nio.file.Paths; import java.text.SimpleDateFormat; import java.util.Date; -import lombok.Data; import lombok.SneakyThrows; import org.apache.commons.lang3.StringUtils; import org.json.JSONArray; @@ -231,11 +230,11 @@ public void testQueryRange() { long currentTimestamp = new Date().getTime(); JSONObject response = executeQuery("source=my_prometheus.query_range('prometheus_http_requests_total'," - + ((currentTimestamp/1000)-3600) + "," + currentTimestamp/1000 + ", " + 14 + ")" ); + + ((currentTimestamp/1000)-3600) + "," + currentTimestamp/1000 + ", " + "'14'" + ")" ); verifySchema(response, + schema(LABELS, "struct"), schema(VALUE, "array"), - schema(TIMESTAMP, "array"), - schema(LABELS, "struct")); + schema(TIMESTAMP, "array")); Assertions.assertTrue(response.getInt("size") > 0); } @@ -249,8 +248,20 @@ public void explainQueryRange() throws Exception { ); } + @Test + public void testExplainForQueryExemplars() throws Exception { + String expected = loadFromFile("expectedOutput/ppl/explain_query_exemplars.json"); + assertJsonEquals( + expected, + explainQueryToString("source = my_prometheus." + + "query_exemplars('app_ads_ad_requests_total',1689228292,1689232299)") + ); + } + String loadFromFile(String filename) throws Exception { URI uri = Resources.getResource(filename).toURI(); return new String(Files.readAllBytes(Paths.get(uri))); } + + } diff --git a/integ-test/src/test/resources/expectedOutput/ppl/explain_query_exemplars.json b/integ-test/src/test/resources/expectedOutput/ppl/explain_query_exemplars.json new file mode 100644 index 0000000000..b7af2e79b0 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/ppl/explain_query_exemplars.json @@ -0,0 +1,9 @@ +{ + "root": { + "name": "QueryExemplarsFunctionTableScanOperator", + "description": { + "request": "query_exemplars(app_ads_ad_requests_total, 1689228292, 1689232299)" + }, + "children": [] + } +} \ No newline at end of file diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/client/PrometheusClient.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/client/PrometheusClient.java index ebc3e2dd39..f58ca56a8c 100644 --- a/prometheus/src/main/java/org/opensearch/sql/prometheus/client/PrometheusClient.java +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/client/PrometheusClient.java @@ -8,6 +8,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import org.json.JSONArray; import org.json.JSONObject; import org.opensearch.sql.prometheus.request.system.model.MetricMetadata; @@ -18,4 +19,6 @@ public interface PrometheusClient { List getLabels(String metricName) throws IOException; Map> getAllMetrics() throws IOException; + + JSONArray queryExemplars(String query, Long start, Long end) throws IOException; } diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/client/PrometheusClientImpl.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/client/PrometheusClientImpl.java index cc625fc9ec..9472be7487 100644 --- a/prometheus/src/main/java/org/opensearch/sql/prometheus/client/PrometheusClientImpl.java +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/client/PrometheusClientImpl.java @@ -83,6 +83,20 @@ public Map> getAllMetrics() throws IOException { return new ObjectMapper().readValue(jsonObject.getJSONObject("data").toString(), typeRef); } + @Override + public JSONArray queryExemplars(String query, Long start, Long end) throws IOException { + String queryUrl = String.format("%s/api/v1/query_exemplars?query=%s&start=%s&end=%s", + uri.toString().replaceAll("/$", ""), URLEncoder.encode(query, StandardCharsets.UTF_8), + start, end); + logger.debug("queryUrl: " + queryUrl); + Request request = new Request.Builder() + .url(queryUrl) + .build(); + Response response = this.okHttpClient.newCall(request).execute(); + JSONObject jsonObject = readResponse(response); + return jsonObject.getJSONArray("data"); + } + private List toListOfLabels(JSONArray array) { List result = new ArrayList<>(); for (int i = 0; i < array.length(); i++) { diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/data/constants/PrometheusFieldConstants.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/data/constants/PrometheusFieldConstants.java index 1afab200b3..88e9df6a88 100644 --- a/prometheus/src/main/java/org/opensearch/sql/prometheus/data/constants/PrometheusFieldConstants.java +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/data/constants/PrometheusFieldConstants.java @@ -9,4 +9,15 @@ public class PrometheusFieldConstants { public static final String TIMESTAMP = "@timestamp"; public static final String VALUE = "@value"; public static final String LABELS = "@labels"; + public static final String MATRIX_KEY = "matrix"; + public static final String RESULT_TYPE_KEY = "resultType"; + public static final String METRIC_KEY = "metric"; + public static final String RESULT_KEY = "result"; + public static final String VALUES_KEY = "values"; + public static final String SERIES_LABELS_KEY = "seriesLabels"; + public static final String EXEMPLARS_KEY = "exemplars"; + public static final String TRACE_ID_KEY = "traceID"; + public static final String LABELS_KEY = "labels"; + public static final String TIMESTAMP_KEY = "timestamp"; + public static final String VALUE_KEY = "value"; } diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/implementation/QueryExemplarFunctionImplementation.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/implementation/QueryExemplarFunctionImplementation.java new file mode 100644 index 0000000000..9d455b3cfc --- /dev/null +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/implementation/QueryExemplarFunctionImplementation.java @@ -0,0 +1,105 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.prometheus.functions.implementation; + +import static org.opensearch.sql.prometheus.functions.resolver.QueryRangeTableFunctionResolver.ENDTIME; +import static org.opensearch.sql.prometheus.functions.resolver.QueryRangeTableFunctionResolver.QUERY; +import static org.opensearch.sql.prometheus.functions.resolver.QueryRangeTableFunctionResolver.STARTTIME; + +import java.util.List; +import java.util.stream.Collectors; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.data.type.ExprCoreType; +import org.opensearch.sql.data.type.ExprType; +import org.opensearch.sql.exception.ExpressionEvaluationException; +import org.opensearch.sql.expression.Expression; +import org.opensearch.sql.expression.FunctionExpression; +import org.opensearch.sql.expression.NamedArgumentExpression; +import org.opensearch.sql.expression.env.Environment; +import org.opensearch.sql.expression.function.FunctionName; +import org.opensearch.sql.expression.function.TableFunctionImplementation; +import org.opensearch.sql.prometheus.client.PrometheusClient; +import org.opensearch.sql.prometheus.request.PrometheusQueryExemplarsRequest; +import org.opensearch.sql.prometheus.storage.QueryExemplarsTable; +import org.opensearch.sql.storage.Table; + +public class QueryExemplarFunctionImplementation extends FunctionExpression implements + TableFunctionImplementation { + + private final FunctionName functionName; + private final List arguments; + private final PrometheusClient prometheusClient; + + /** + * Required argument constructor. + * + * @param functionName name of the function + * @param arguments a list of arguments provided + */ + public QueryExemplarFunctionImplementation(FunctionName functionName, List arguments, + PrometheusClient prometheusClient) { + super(functionName, arguments); + this.functionName = functionName; + this.arguments = arguments; + this.prometheusClient = prometheusClient; + } + + @Override + public ExprValue valueOf(Environment valueEnv) { + throw new UnsupportedOperationException(String.format( + "Prometheus defined function [%s] is only " + + "supported in SOURCE clause with prometheus connector catalog", + functionName)); + } + + @Override + public ExprType type() { + return ExprCoreType.STRUCT; + } + + @Override + public String toString() { + List args = arguments.stream() + .map(arg -> String.format("%s=%s", ((NamedArgumentExpression) arg) + .getArgName(), ((NamedArgumentExpression) arg).getValue().toString())) + .collect(Collectors.toList()); + return String.format("%s(%s)", functionName, String.join(", ", args)); + } + + @Override + public Table applyArguments() { + return new QueryExemplarsTable(prometheusClient, buildExemplarsQueryRequest(arguments)); + } + + private PrometheusQueryExemplarsRequest buildExemplarsQueryRequest(List arguments) { + + PrometheusQueryExemplarsRequest request = new PrometheusQueryExemplarsRequest(); + arguments.forEach(arg -> { + String argName = ((NamedArgumentExpression) arg).getArgName(); + Expression argValue = ((NamedArgumentExpression) arg).getValue(); + ExprValue literalValue = argValue.valueOf(); + switch (argName) { + case QUERY: + request + .setQuery((String) literalValue.value()); + break; + case STARTTIME: + request.setStartTime(((Number) literalValue.value()).longValue()); + break; + case ENDTIME: + request.setEndTime(((Number) literalValue.value()).longValue()); + break; + default: + throw new ExpressionEvaluationException( + String.format("Invalid Function Argument:%s", argName)); + } + }); + return request; + } + +} diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/resolver/QueryExemplarsTableFunctionResolver.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/resolver/QueryExemplarsTableFunctionResolver.java new file mode 100644 index 0000000000..a82e5a397a --- /dev/null +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/resolver/QueryExemplarsTableFunctionResolver.java @@ -0,0 +1,58 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.prometheus.functions.resolver; + +import static org.opensearch.sql.data.type.ExprCoreType.LONG; +import static org.opensearch.sql.data.type.ExprCoreType.STRING; +import static org.opensearch.sql.prometheus.utils.TableFunctionUtils.getNamedArgumentsOfTableFunction; +import static org.opensearch.sql.prometheus.utils.TableFunctionUtils.validatePrometheusTableFunctionArguments; + +import java.util.List; +import lombok.RequiredArgsConstructor; +import org.apache.commons.lang3.tuple.Pair; +import org.opensearch.sql.expression.Expression; +import org.opensearch.sql.expression.function.FunctionBuilder; +import org.opensearch.sql.expression.function.FunctionName; +import org.opensearch.sql.expression.function.FunctionResolver; +import org.opensearch.sql.expression.function.FunctionSignature; +import org.opensearch.sql.prometheus.client.PrometheusClient; +import org.opensearch.sql.prometheus.functions.implementation.QueryExemplarFunctionImplementation; + +/** + * This class is for query_exemplars table function resolver {@link FunctionResolver}. + * It takes care of validating function arguments and also creating + * required {@link org.opensearch.sql.expression.function.TableFunctionImplementation} Class. + */ +@RequiredArgsConstructor +public class QueryExemplarsTableFunctionResolver implements FunctionResolver { + + private final PrometheusClient prometheusClient; + public static final String QUERY_EXEMPLARS = "query_exemplars"; + + public static final String QUERY = "query"; + public static final String STARTTIME = "starttime"; + public static final String ENDTIME = "endtime"; + + @Override + public Pair resolve(FunctionSignature unresolvedSignature) { + final FunctionName functionName = FunctionName.of(QUERY_EXEMPLARS); + FunctionSignature functionSignature = + new FunctionSignature(FunctionName.of(QUERY_EXEMPLARS), List.of(STRING, LONG, LONG)); + FunctionBuilder functionBuilder = (functionProperties, arguments) -> { + final List argumentNames = List.of(QUERY, STARTTIME, ENDTIME); + validatePrometheusTableFunctionArguments(arguments, argumentNames); + List namedArguments = getNamedArgumentsOfTableFunction(arguments, argumentNames); + return new QueryExemplarFunctionImplementation(functionName, + namedArguments, prometheusClient); + }; + return Pair.of(functionSignature, functionBuilder); + } + + @Override + public FunctionName getFunctionName() { + return FunctionName.of(QUERY_EXEMPLARS); + } +} diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/resolver/QueryRangeTableFunctionResolver.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/resolver/QueryRangeTableFunctionResolver.java index a1d72f98c3..8bb2a2d758 100644 --- a/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/resolver/QueryRangeTableFunctionResolver.java +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/resolver/QueryRangeTableFunctionResolver.java @@ -9,18 +9,13 @@ import static org.opensearch.sql.data.type.ExprCoreType.LONG; import static org.opensearch.sql.data.type.ExprCoreType.STRING; +import static org.opensearch.sql.prometheus.utils.TableFunctionUtils.getNamedArgumentsOfTableFunction; +import static org.opensearch.sql.prometheus.utils.TableFunctionUtils.validatePrometheusTableFunctionArguments; -import java.util.ArrayList; -import java.util.HashSet; import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; -import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; -import org.opensearch.sql.exception.SemanticCheckException; import org.opensearch.sql.expression.Expression; -import org.opensearch.sql.expression.NamedArgumentExpression; import org.opensearch.sql.expression.function.FunctionBuilder; import org.opensearch.sql.expression.function.FunctionName; import org.opensearch.sql.expression.function.FunctionResolver; @@ -32,7 +27,6 @@ public class QueryRangeTableFunctionResolver implements FunctionResolver { private final PrometheusClient prometheusClient; - public static final String QUERY_RANGE = "query_range"; public static final String QUERY = "query"; public static final String STARTTIME = "starttime"; @@ -43,53 +37,16 @@ public class QueryRangeTableFunctionResolver implements FunctionResolver { public Pair resolve(FunctionSignature unresolvedSignature) { FunctionName functionName = FunctionName.of(QUERY_RANGE); FunctionSignature functionSignature = - new FunctionSignature(functionName, List.of(STRING, LONG, LONG, LONG)); + new FunctionSignature(functionName, List.of(STRING, LONG, LONG, STRING)); final List argumentNames = List.of(QUERY, STARTTIME, ENDTIME, STEP); - FunctionBuilder functionBuilder = (functionProperties, arguments) -> { - Boolean argumentsPassedByName = arguments.stream() - .noneMatch(arg -> StringUtils.isEmpty(((NamedArgumentExpression) arg).getArgName())); - Boolean argumentsPassedByPosition = arguments.stream() - .allMatch(arg -> StringUtils.isEmpty(((NamedArgumentExpression) arg).getArgName())); - if (!(argumentsPassedByName || argumentsPassedByPosition)) { - throw new SemanticCheckException("Arguments should be either passed by name or position"); - } - - if (arguments.size() != argumentNames.size()) { - throw new SemanticCheckException( - generateErrorMessageForMissingArguments(argumentsPassedByPosition, arguments, - argumentNames)); - } - - if (argumentsPassedByPosition) { - List namedArguments = new ArrayList<>(); - for (int i = 0; i < arguments.size(); i++) { - namedArguments.add(new NamedArgumentExpression(argumentNames.get(i), - ((NamedArgumentExpression) arguments.get(i)).getValue())); - } - return new QueryRangeFunctionImplementation(functionName, namedArguments, prometheusClient); - } - return new QueryRangeFunctionImplementation(functionName, arguments, prometheusClient); + validatePrometheusTableFunctionArguments(arguments, argumentNames); + List namedArguments = getNamedArgumentsOfTableFunction(arguments, argumentNames); + return new QueryRangeFunctionImplementation(functionName, namedArguments, prometheusClient); }; return Pair.of(functionSignature, functionBuilder); } - private String generateErrorMessageForMissingArguments(Boolean argumentsPassedByPosition, - List arguments, - List argumentNames) { - if (argumentsPassedByPosition) { - return String.format("Missing arguments:[%s]", - String.join(",", argumentNames.subList(arguments.size(), argumentNames.size()))); - } else { - Set requiredArguments = new HashSet<>(argumentNames); - Set providedArguments = - arguments.stream().map(expression -> ((NamedArgumentExpression) expression).getArgName()) - .collect(Collectors.toSet()); - requiredArguments.removeAll(providedArguments); - return String.format("Missing arguments:[%s]", String.join(",", requiredArguments)); - } - } - @Override public FunctionName getFunctionName() { return FunctionName.of(QUERY_RANGE); diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/response/DefaultQueryRangeFunctionResponseHandle.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/response/DefaultQueryRangeFunctionResponseHandle.java deleted file mode 100644 index d4353d2f99..0000000000 --- a/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/response/DefaultQueryRangeFunctionResponseHandle.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.prometheus.functions.response; - -import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.LABELS; -import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.TIMESTAMP; -import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.VALUE; - -import java.time.Instant; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import org.json.JSONArray; -import org.json.JSONObject; -import org.opensearch.sql.data.model.ExprCollectionValue; -import org.opensearch.sql.data.model.ExprDoubleValue; -import org.opensearch.sql.data.model.ExprStringValue; -import org.opensearch.sql.data.model.ExprTimestampValue; -import org.opensearch.sql.data.model.ExprTupleValue; -import org.opensearch.sql.data.model.ExprValue; -import org.opensearch.sql.data.type.ExprCoreType; -import org.opensearch.sql.executor.ExecutionEngine; - -/** - * Default implementation of QueryRangeFunctionResponseHandle. - */ -public class DefaultQueryRangeFunctionResponseHandle implements QueryRangeFunctionResponseHandle { - - private final JSONObject responseObject; - private Iterator responseIterator; - private ExecutionEngine.Schema schema; - - /** - * Constructor. - * - * @param responseObject Prometheus responseObject. - */ - public DefaultQueryRangeFunctionResponseHandle(JSONObject responseObject) { - this.responseObject = responseObject; - constructSchema(); - constructIterator(); - } - - private void constructIterator() { - List result = new ArrayList<>(); - if ("matrix".equals(responseObject.getString("resultType"))) { - JSONArray itemArray = responseObject.getJSONArray("result"); - for (int i = 0; i < itemArray.length(); i++) { - LinkedHashMap linkedHashMap = new LinkedHashMap<>(); - JSONObject item = itemArray.getJSONObject(i); - linkedHashMap.put(LABELS, extractLabels(item.getJSONObject("metric"))); - extractTimestampAndValues(item.getJSONArray("values"), linkedHashMap); - result.add(new ExprTupleValue(linkedHashMap)); - } - } else { - throw new RuntimeException(String.format("Unexpected Result Type: %s during Prometheus " - + "Response Parsing. 'matrix' resultType is expected", - responseObject.getString("resultType"))); - } - this.responseIterator = result.iterator(); - } - - private static void extractTimestampAndValues(JSONArray values, - LinkedHashMap linkedHashMap) { - List timestampList = new ArrayList<>(); - List valueList = new ArrayList<>(); - for (int j = 0; j < values.length(); j++) { - JSONArray value = values.getJSONArray(j); - timestampList.add(new ExprTimestampValue( - Instant.ofEpochMilli((long) (value.getDouble(0) * 1000)))); - valueList.add(new ExprDoubleValue(value.getDouble(1))); - } - linkedHashMap.put(TIMESTAMP, - new ExprCollectionValue(timestampList)); - linkedHashMap.put(VALUE, new ExprCollectionValue(valueList)); - } - - private void constructSchema() { - this.schema = new ExecutionEngine.Schema(getColumnList()); - } - - private ExprValue extractLabels(JSONObject metric) { - LinkedHashMap labelsMap = new LinkedHashMap<>(); - metric.keySet().forEach(key - -> labelsMap.put(key, new ExprStringValue(metric.getString(key)))); - return new ExprTupleValue(labelsMap); - } - - - private List getColumnList() { - List columnList = new ArrayList<>(); - columnList.add(new ExecutionEngine.Schema.Column(TIMESTAMP, TIMESTAMP, ExprCoreType.ARRAY)); - columnList.add(new ExecutionEngine.Schema.Column(VALUE, VALUE, ExprCoreType.ARRAY)); - columnList.add(new ExecutionEngine.Schema.Column(LABELS, LABELS, ExprCoreType.STRUCT)); - return columnList; - } - - @Override - public boolean hasNext() { - return responseIterator.hasNext(); - } - - @Override - public ExprValue next() { - return responseIterator.next(); - } - - @Override - public ExecutionEngine.Schema schema() { - return schema; - } -} diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/response/PrometheusFunctionResponseHandle.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/response/PrometheusFunctionResponseHandle.java new file mode 100644 index 0000000000..f2cefa85ec --- /dev/null +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/response/PrometheusFunctionResponseHandle.java @@ -0,0 +1,31 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.prometheus.functions.response; + +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.executor.ExecutionEngine; + +/** + * Handle Prometheus response. + */ +public interface PrometheusFunctionResponseHandle { + + /** + * Return true if Prometheus response has more result. + */ + boolean hasNext(); + + /** + * Return Prometheus response as {@link ExprValue}. Attention, the method must been called when + * hasNext return true. + */ + ExprValue next(); + + /** + * Return ExecutionEngine.Schema of the Prometheus response. + */ + ExecutionEngine.Schema schema(); +} diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/response/QueryExemplarsFunctionResponseHandle.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/response/QueryExemplarsFunctionResponseHandle.java new file mode 100644 index 0000000000..f734159720 --- /dev/null +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/response/QueryExemplarsFunctionResponseHandle.java @@ -0,0 +1,115 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.prometheus.functions.response; + +import static org.opensearch.sql.data.type.ExprCoreType.STRUCT; +import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.EXEMPLARS_KEY; +import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.LABELS_KEY; +import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.SERIES_LABELS_KEY; +import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.TIMESTAMP_KEY; +import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.TRACE_ID_KEY; +import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.VALUE_KEY; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import org.json.JSONArray; +import org.json.JSONObject; +import org.opensearch.sql.data.model.ExprCollectionValue; +import org.opensearch.sql.data.model.ExprDoubleValue; +import org.opensearch.sql.data.model.ExprStringValue; +import org.opensearch.sql.data.model.ExprTimestampValue; +import org.opensearch.sql.data.model.ExprTupleValue; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.data.type.ExprCoreType; +import org.opensearch.sql.executor.ExecutionEngine; + +public class QueryExemplarsFunctionResponseHandle implements PrometheusFunctionResponseHandle { + private Iterator responseIterator; + private ExecutionEngine.Schema schema; + + /** + * Constructor. + * + * @param responseArray Query Exemplars responseObject. + */ + public QueryExemplarsFunctionResponseHandle(JSONArray responseArray) { + constructIteratorAndSchema(responseArray); + } + + private void constructIteratorAndSchema(JSONArray responseArray) { + List columnList = new ArrayList<>(); + columnList.add(new ExecutionEngine.Schema.Column(SERIES_LABELS_KEY, SERIES_LABELS_KEY, STRUCT)); + columnList.add(new ExecutionEngine.Schema.Column(EXEMPLARS_KEY, EXEMPLARS_KEY, + ExprCoreType.ARRAY)); + this.schema = new ExecutionEngine.Schema(columnList); + List result = new ArrayList<>(); + for (int i = 0; i < responseArray.length(); i++) { + JSONObject rowObject = responseArray.getJSONObject(i); + LinkedHashMap rowMap = new LinkedHashMap<>(); + JSONObject seriesLabels = rowObject.getJSONObject(SERIES_LABELS_KEY); + rowMap.put(SERIES_LABELS_KEY, constructSeriesLabels(seriesLabels)); + JSONArray exemplars = rowObject.getJSONArray(EXEMPLARS_KEY); + rowMap.put(EXEMPLARS_KEY, constructExemplarList(exemplars)); + result.add(new ExprTupleValue(rowMap)); + } + this.responseIterator = result.iterator(); + } + + private ExprValue constructSeriesLabels(JSONObject seriesLabels) { + LinkedHashMap seriesLabelsMap = new LinkedHashMap<>(); + seriesLabels.keySet() + .forEach(key -> seriesLabelsMap.put(key, new ExprStringValue(seriesLabels.getString(key)))); + return new ExprTupleValue(seriesLabelsMap); + } + + private ExprValue constructExemplarList(JSONArray exemplars) { + List exemplarsList = new ArrayList<>(); + for (int i = 0; i < exemplars.length(); i++) { + JSONObject exemplarsJSONObject = exemplars.getJSONObject(i); + exemplarsList.add(constructExemplar(exemplarsJSONObject)); + } + return new ExprCollectionValue(exemplarsList); + } + + private ExprValue constructExemplar(JSONObject exemplarsJSONObject) { + LinkedHashMap exemplarHashMap = new LinkedHashMap<>(); + exemplarHashMap.put(LABELS_KEY, + constructLabelsInExemplar(exemplarsJSONObject.getJSONObject(LABELS_KEY))); + exemplarHashMap.put(TIMESTAMP_KEY, + new ExprTimestampValue(Instant.ofEpochMilli((long)( + exemplarsJSONObject.getDouble(TIMESTAMP_KEY) * 1000)))); + exemplarHashMap.put(VALUE_KEY, + new ExprDoubleValue(exemplarsJSONObject.getDouble(VALUE_KEY))); + return new ExprTupleValue(exemplarHashMap); + } + + private ExprValue constructLabelsInExemplar(JSONObject labelsObject) { + LinkedHashMap labelsMap = new LinkedHashMap<>(); + for (String key : labelsObject.keySet()) { + labelsMap.put(key, new ExprStringValue(labelsObject.getString(key))); + } + return new ExprTupleValue(labelsMap); + } + + @Override + public boolean hasNext() { + return responseIterator.hasNext(); + } + + @Override + public ExprValue next() { + return responseIterator.next(); + } + + + @Override + public ExecutionEngine.Schema schema() { + return schema; + } +} diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/response/QueryRangeFunctionResponseHandle.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/response/QueryRangeFunctionResponseHandle.java index 3bc666b703..a3c68617e8 100644 --- a/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/response/QueryRangeFunctionResponseHandle.java +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/response/QueryRangeFunctionResponseHandle.java @@ -5,27 +5,117 @@ package org.opensearch.sql.prometheus.functions.response; +import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.LABELS; +import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.MATRIX_KEY; +import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.METRIC_KEY; +import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.RESULT_KEY; +import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.RESULT_TYPE_KEY; +import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.TIMESTAMP; +import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.VALUE; +import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.VALUES_KEY; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import org.json.JSONArray; +import org.json.JSONObject; +import org.opensearch.sql.data.model.ExprCollectionValue; +import org.opensearch.sql.data.model.ExprDoubleValue; +import org.opensearch.sql.data.model.ExprStringValue; +import org.opensearch.sql.data.model.ExprTimestampValue; +import org.opensearch.sql.data.model.ExprTupleValue; import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.data.type.ExprCoreType; import org.opensearch.sql.executor.ExecutionEngine; /** - * Handle Prometheus response. + * Default implementation of QueryRangeFunctionResponseHandle. */ -public interface QueryRangeFunctionResponseHandle { +public class QueryRangeFunctionResponseHandle implements PrometheusFunctionResponseHandle { - /** - * Return true if Prometheus response has more result. - */ - boolean hasNext(); + private final JSONObject responseObject; + private Iterator responseIterator; + private ExecutionEngine.Schema schema; /** - * Return Prometheus response as {@link ExprValue}. Attention, the method must been called when - * hasNext return true. + * Constructor. + * + * @param responseObject Prometheus responseObject. */ - ExprValue next(); + public QueryRangeFunctionResponseHandle(JSONObject responseObject) { + this.responseObject = responseObject; + constructSchema(); + constructIterator(); + } - /** - * Return ExecutionEngine.Schema of the Prometheus response. - */ - ExecutionEngine.Schema schema(); + private void constructIterator() { + List result = new ArrayList<>(); + if (MATRIX_KEY.equals(responseObject.getString(RESULT_TYPE_KEY))) { + JSONArray itemArray = responseObject.getJSONArray(RESULT_KEY); + for (int i = 0; i < itemArray.length(); i++) { + LinkedHashMap linkedHashMap = new LinkedHashMap<>(); + JSONObject item = itemArray.getJSONObject(i); + linkedHashMap.put(LABELS, extractLabels(item.getJSONObject(METRIC_KEY))); + extractTimestampAndValues(item.getJSONArray(VALUES_KEY), linkedHashMap); + result.add(new ExprTupleValue(linkedHashMap)); + } + } else { + throw new RuntimeException(String.format("Unexpected Result Type: %s during Prometheus " + + "Response Parsing. 'matrix' resultType is expected", + responseObject.getString("resultType"))); + } + this.responseIterator = result.iterator(); + } + + private static void extractTimestampAndValues(JSONArray values, + LinkedHashMap linkedHashMap) { + List timestampList = new ArrayList<>(); + List valueList = new ArrayList<>(); + for (int j = 0; j < values.length(); j++) { + JSONArray value = values.getJSONArray(j); + timestampList.add(new ExprTimestampValue( + Instant.ofEpochMilli((long) (value.getDouble(0) * 1000)))); + valueList.add(new ExprDoubleValue(value.getDouble(1))); + } + linkedHashMap.put(TIMESTAMP, + new ExprCollectionValue(timestampList)); + linkedHashMap.put(VALUE, new ExprCollectionValue(valueList)); + } + + private void constructSchema() { + this.schema = new ExecutionEngine.Schema(getColumnList()); + } + + private ExprValue extractLabels(JSONObject metric) { + LinkedHashMap labelsMap = new LinkedHashMap<>(); + metric.keySet().forEach(key + -> labelsMap.put(key, new ExprStringValue(metric.getString(key)))); + return new ExprTupleValue(labelsMap); + } + + + private List getColumnList() { + List columnList = new ArrayList<>(); + columnList.add(new ExecutionEngine.Schema.Column(LABELS, LABELS, ExprCoreType.STRUCT)); + columnList.add(new ExecutionEngine.Schema.Column(TIMESTAMP, TIMESTAMP, ExprCoreType.ARRAY)); + columnList.add(new ExecutionEngine.Schema.Column(VALUE, VALUE, ExprCoreType.ARRAY)); + return columnList; + } + + @Override + public boolean hasNext() { + return responseIterator.hasNext(); + } + + @Override + public ExprValue next() { + return responseIterator.next(); + } + + @Override + public ExecutionEngine.Schema schema() { + return schema; + } } diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/scan/QueryExemplarsFunctionTableScanBuilder.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/scan/QueryExemplarsFunctionTableScanBuilder.java new file mode 100644 index 0000000000..8364173889 --- /dev/null +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/scan/QueryExemplarsFunctionTableScanBuilder.java @@ -0,0 +1,38 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.prometheus.functions.scan; + +import lombok.AllArgsConstructor; +import org.opensearch.sql.planner.logical.LogicalProject; +import org.opensearch.sql.prometheus.client.PrometheusClient; +import org.opensearch.sql.prometheus.request.PrometheusQueryExemplarsRequest; +import org.opensearch.sql.storage.TableScanOperator; +import org.opensearch.sql.storage.read.TableScanBuilder; + +/** + * TableScanBuilder for query_exemplars table function of prometheus connector. + */ +@AllArgsConstructor +public class QueryExemplarsFunctionTableScanBuilder extends TableScanBuilder { + + private final PrometheusClient prometheusClient; + + private final PrometheusQueryExemplarsRequest prometheusQueryExemplarsRequest; + + @Override + public TableScanOperator build() { + return new QueryExemplarsFunctionTableScanOperator(prometheusClient, + prometheusQueryExemplarsRequest); + } + + // Since we are determining the schema after table scan, + // we are ignoring default Logical Project added in the plan. + @Override + public boolean pushDownProject(LogicalProject project) { + return true; + } + +} diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/scan/QueryExemplarsFunctionTableScanOperator.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/scan/QueryExemplarsFunctionTableScanOperator.java new file mode 100644 index 0000000000..85ba6c854a --- /dev/null +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/scan/QueryExemplarsFunctionTableScanOperator.java @@ -0,0 +1,85 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.prometheus.functions.scan; + +import java.io.IOException; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.Locale; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.json.JSONArray; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.executor.ExecutionEngine; +import org.opensearch.sql.prometheus.client.PrometheusClient; +import org.opensearch.sql.prometheus.functions.response.QueryExemplarsFunctionResponseHandle; +import org.opensearch.sql.prometheus.request.PrometheusQueryExemplarsRequest; +import org.opensearch.sql.storage.TableScanOperator; + +/** + * This class is for QueryExemplars function {@link TableScanOperator}. + * This takes care of getting exemplar data from prometheus by making + * {@link PrometheusQueryExemplarsRequest}. + */ +@RequiredArgsConstructor +public class QueryExemplarsFunctionTableScanOperator extends TableScanOperator { + + private final PrometheusClient prometheusClient; + + @Getter + private final PrometheusQueryExemplarsRequest request; + private QueryExemplarsFunctionResponseHandle queryExemplarsFunctionResponseHandle; + private static final Logger LOG = LogManager.getLogger(); + + @Override + public void open() { + super.open(); + this.queryExemplarsFunctionResponseHandle + = AccessController + .doPrivileged((PrivilegedAction) () -> { + try { + JSONArray responseArray = prometheusClient.queryExemplars( + request.getQuery(), + request.getStartTime(), request.getEndTime()); + return new QueryExemplarsFunctionResponseHandle(responseArray); + } catch (IOException e) { + LOG.error(e.getMessage()); + throw new RuntimeException( + String.format("Error fetching data from prometheus server: %s", e.getMessage())); + } + }); + } + + @Override + public void close() { + super.close(); + } + + @Override + public boolean hasNext() { + return this.queryExemplarsFunctionResponseHandle.hasNext(); + } + + @Override + public ExprValue next() { + return this.queryExemplarsFunctionResponseHandle.next(); + } + + @Override + public String explain() { + return String.format(Locale.ROOT, "query_exemplars(%s, %s, %s)", + request.getQuery(), + request.getStartTime(), + request.getEndTime()); + } + + @Override + public ExecutionEngine.Schema schema() { + return queryExemplarsFunctionResponseHandle.schema(); + } +} diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/scan/QueryRangeFunctionTableScanOperator.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/scan/QueryRangeFunctionTableScanOperator.java index 019f9cffcb..68b9b60643 100644 --- a/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/scan/QueryRangeFunctionTableScanOperator.java +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/scan/QueryRangeFunctionTableScanOperator.java @@ -18,7 +18,7 @@ import org.opensearch.sql.data.model.ExprValue; import org.opensearch.sql.executor.ExecutionEngine; import org.opensearch.sql.prometheus.client.PrometheusClient; -import org.opensearch.sql.prometheus.functions.response.DefaultQueryRangeFunctionResponseHandle; +import org.opensearch.sql.prometheus.functions.response.PrometheusFunctionResponseHandle; import org.opensearch.sql.prometheus.functions.response.QueryRangeFunctionResponseHandle; import org.opensearch.sql.prometheus.request.PrometheusQueryRequest; import org.opensearch.sql.storage.TableScanOperator; @@ -32,7 +32,7 @@ public class QueryRangeFunctionTableScanOperator extends TableScanOperator { private final PrometheusClient prometheusClient; private final PrometheusQueryRequest request; - private QueryRangeFunctionResponseHandle prometheusResponseHandle; + private PrometheusFunctionResponseHandle prometheusResponseHandle; private static final Logger LOG = LogManager.getLogger(); @@ -40,12 +40,12 @@ public class QueryRangeFunctionTableScanOperator extends TableScanOperator { public void open() { super.open(); this.prometheusResponseHandle - = AccessController.doPrivileged((PrivilegedAction) () -> { + = AccessController.doPrivileged((PrivilegedAction) () -> { try { JSONObject responseObject = prometheusClient.queryRange( request.getPromQl(), request.getStartTime(), request.getEndTime(), request.getStep()); - return new DefaultQueryRangeFunctionResponseHandle(responseObject); + return new QueryRangeFunctionResponseHandle(responseObject); } catch (IOException e) { LOG.error(e.getMessage()); throw new RuntimeException( diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/request/PrometheusQueryExemplarsRequest.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/request/PrometheusQueryExemplarsRequest.java new file mode 100644 index 0000000000..9cf3d41522 --- /dev/null +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/request/PrometheusQueryExemplarsRequest.java @@ -0,0 +1,40 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + + +package org.opensearch.sql.prometheus.request; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; +import lombok.ToString; + +/** + * Prometheus metric query request. + */ +@EqualsAndHashCode +@Data +@ToString +@AllArgsConstructor +@NoArgsConstructor +public class PrometheusQueryExemplarsRequest { + + /** + * PromQL. + */ + private String query; + + /** + * startTime of the query. + */ + private Long startTime; + + /** + * endTime of the query. + */ + private Long endTime; + +} diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/response/PrometheusResponse.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/response/PrometheusResponse.java index bd9e36ccdc..2c75588e4c 100644 --- a/prometheus/src/main/java/org/opensearch/sql/prometheus/response/PrometheusResponse.java +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/response/PrometheusResponse.java @@ -8,6 +8,11 @@ import static org.opensearch.sql.data.type.ExprCoreType.INTEGER; import static org.opensearch.sql.data.type.ExprCoreType.LONG; import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.LABELS; +import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.MATRIX_KEY; +import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.METRIC_KEY; +import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.RESULT_KEY; +import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.RESULT_TYPE_KEY; +import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.VALUES_KEY; import java.time.Instant; import java.util.ArrayList; @@ -53,12 +58,12 @@ public PrometheusResponse(JSONObject responseObject, @Override public Iterator iterator() { List result = new ArrayList<>(); - if ("matrix".equals(responseObject.getString("resultType"))) { - JSONArray itemArray = responseObject.getJSONArray("result"); + if (MATRIX_KEY.equals(responseObject.getString(RESULT_TYPE_KEY))) { + JSONArray itemArray = responseObject.getJSONArray(RESULT_KEY); for (int i = 0; i < itemArray.length(); i++) { JSONObject item = itemArray.getJSONObject(i); - JSONObject metric = item.getJSONObject("metric"); - JSONArray values = item.getJSONArray("values"); + JSONObject metric = item.getJSONObject(METRIC_KEY); + JSONArray values = item.getJSONArray(VALUES_KEY); for (int j = 0; j < values.length(); j++) { LinkedHashMap linkedHashMap = new LinkedHashMap<>(); JSONArray val = values.getJSONArray(j); @@ -73,7 +78,7 @@ public Iterator iterator() { } else { throw new RuntimeException(String.format("Unexpected Result Type: %s during Prometheus " + "Response Parsing. 'matrix' resultType is expected", - responseObject.getString("resultType"))); + responseObject.getString(RESULT_TYPE_KEY))); } return result.iterator(); } diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageEngine.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageEngine.java index 4bda60b822..e19b369a97 100644 --- a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageEngine.java +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageEngine.java @@ -10,6 +10,7 @@ import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.INFORMATION_SCHEMA_NAME; import static org.opensearch.sql.utils.SystemIndexUtils.isSystemIndex; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import lombok.RequiredArgsConstructor; @@ -17,6 +18,7 @@ import org.opensearch.sql.exception.SemanticCheckException; import org.opensearch.sql.expression.function.FunctionResolver; import org.opensearch.sql.prometheus.client.PrometheusClient; +import org.opensearch.sql.prometheus.functions.resolver.QueryExemplarsTableFunctionResolver; import org.opensearch.sql.prometheus.functions.resolver.QueryRangeTableFunctionResolver; import org.opensearch.sql.prometheus.storage.system.PrometheusSystemTable; import org.opensearch.sql.storage.StorageEngine; @@ -34,8 +36,10 @@ public class PrometheusStorageEngine implements StorageEngine { @Override public Collection getFunctions() { - return Collections.singletonList( - new QueryRangeTableFunctionResolver(prometheusClient)); + ArrayList functionList = new ArrayList<>(); + functionList.add(new QueryRangeTableFunctionResolver(prometheusClient)); + functionList.add(new QueryExemplarsTableFunctionResolver(prometheusClient)); + return functionList; } @Override diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/QueryExemplarsTable.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/QueryExemplarsTable.java new file mode 100644 index 0000000000..dcb87c2cce --- /dev/null +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/QueryExemplarsTable.java @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.prometheus.storage; + +import java.util.Collections; +import java.util.Map; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import org.opensearch.sql.data.type.ExprType; +import org.opensearch.sql.planner.logical.LogicalPlan; +import org.opensearch.sql.planner.physical.PhysicalPlan; +import org.opensearch.sql.prometheus.client.PrometheusClient; +import org.opensearch.sql.prometheus.functions.scan.QueryExemplarsFunctionTableScanBuilder; +import org.opensearch.sql.prometheus.request.PrometheusQueryExemplarsRequest; +import org.opensearch.sql.prometheus.storage.implementor.PrometheusDefaultImplementor; +import org.opensearch.sql.storage.Table; +import org.opensearch.sql.storage.read.TableScanBuilder; + +/** + * This is {@link Table} for querying exemplars in prometheus Table. + * Since {@link PrometheusMetricTable} is overloaded with query_range and normal + * PPL metric queries. Created a separate table for handling + * {@link PrometheusQueryExemplarsRequest} + */ +@RequiredArgsConstructor +public class QueryExemplarsTable implements Table { + + @Getter + private final PrometheusClient prometheusClient; + + @Getter + private final PrometheusQueryExemplarsRequest exemplarsRequest; + + + @Override + public Map getFieldTypes() { + return Collections.emptyMap(); + } + + @Override + public PhysicalPlan implement(LogicalPlan plan) { + return plan.accept(new PrometheusDefaultImplementor(), null); + } + + @Override + public TableScanBuilder createScanBuilder() { + return new QueryExemplarsFunctionTableScanBuilder(prometheusClient, exemplarsRequest); + } + +} diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/utils/TableFunctionUtils.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/utils/TableFunctionUtils.java new file mode 100644 index 0000000000..35edc83614 --- /dev/null +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/utils/TableFunctionUtils.java @@ -0,0 +1,89 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.prometheus.utils; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import lombok.experimental.UtilityClass; +import org.apache.commons.lang3.StringUtils; +import org.opensearch.sql.exception.SemanticCheckException; +import org.opensearch.sql.expression.Expression; +import org.opensearch.sql.expression.NamedArgumentExpression; + +/** + * Utility class for common table function methods. + */ +@UtilityClass +public class TableFunctionUtils { + + /** + * Validates if function arguments are valid + * in both the cases when the arguments are passed by position or name. + * + * @param arguments arguments of function provided in the input order. + * @param argumentNames ordered argument names of the function. + */ + public static void validatePrometheusTableFunctionArguments(List arguments, + List argumentNames) { + Boolean argumentsPassedByName = arguments.stream() + .noneMatch(arg -> StringUtils.isEmpty(((NamedArgumentExpression) arg).getArgName())); + Boolean argumentsPassedByPosition = arguments.stream() + .allMatch(arg -> StringUtils.isEmpty(((NamedArgumentExpression) arg).getArgName())); + if (!(argumentsPassedByName || argumentsPassedByPosition)) { + throw new SemanticCheckException("Arguments should be either passed by name or position"); + } + + if (arguments.size() != argumentNames.size()) { + throw new SemanticCheckException( + generateErrorMessageForMissingArguments(argumentsPassedByPosition, arguments, + argumentNames)); + } + } + + /** + * Get Named Arguments of Table Function Arguments. + * If they are passed by position create new ones or else return the same arguments passed. + * + * @param arguments arguments of function provided in the input order. + * @param argumentNames ordered argument names of the function. + */ + public static List getNamedArgumentsOfTableFunction(List arguments, + List argumentNames) { + boolean argumentsPassedByPosition = arguments.stream() + .allMatch(arg -> StringUtils.isEmpty(((NamedArgumentExpression) arg).getArgName())); + if (argumentsPassedByPosition) { + List namedArguments = new ArrayList<>(); + for (int i = 0; i < arguments.size(); i++) { + namedArguments.add(new NamedArgumentExpression(argumentNames.get(i), + ((NamedArgumentExpression) arguments.get(i)).getValue())); + } + return namedArguments; + } + return arguments; + } + + private static String generateErrorMessageForMissingArguments( + Boolean areArgumentsPassedByPosition, + List arguments, + List argumentNames) { + if (areArgumentsPassedByPosition) { + return String.format("Missing arguments:[%s]", + String.join(",", argumentNames.subList(arguments.size(), argumentNames.size()))); + } else { + Set requiredArguments = new HashSet<>(argumentNames); + Set providedArguments = + arguments.stream().map(expression -> ((NamedArgumentExpression) expression).getArgName()) + .collect(Collectors.toSet()); + requiredArguments.removeAll(providedArguments); + return String.format("Missing arguments:[%s]", String.join(",", requiredArguments)); + } + } + + +} diff --git a/prometheus/src/test/java/org/opensearch/sql/prometheus/client/PrometheusClientImplTest.java b/prometheus/src/test/java/org/opensearch/sql/prometheus/client/PrometheusClientImplTest.java index 76abb05751..b26a45e301 100644 --- a/prometheus/src/test/java/org/opensearch/sql/prometheus/client/PrometheusClientImplTest.java +++ b/prometheus/src/test/java/org/opensearch/sql/prometheus/client/PrometheusClientImplTest.java @@ -28,6 +28,7 @@ import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockWebServer; import okhttp3.mockwebserver.RecordedRequest; +import org.json.JSONArray; import org.json.JSONObject; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -133,6 +134,20 @@ void testGetAllMetrics() { verifyGetAllMetricsCall(recordedRequest); } + + @Test + @SneakyThrows + void testQueryExemplars() { + MockResponse mockResponse = new MockResponse() + .addHeader("Content-Type", "application/json; charset=utf-8") + .setBody(getJson("query_exemplars_response.json")); + mockWebServer.enqueue(mockResponse); + JSONArray jsonArray = prometheusClient.queryExemplars(QUERY, STARTTIME, ENDTIME); + assertTrue(new JSONArray(getJson("query_exemplars_result.json")).similar(jsonArray)); + RecordedRequest recordedRequest = mockWebServer.takeRequest(); + verifyQueryExemplarsCall(recordedRequest); + } + @AfterEach void tearDown() throws IOException { mockWebServer.shutdown(); @@ -164,4 +179,13 @@ private void verifyGetAllMetricsCall(RecordedRequest recordedRequest) { assertEquals("/api/v1/metadata", httpUrl.encodedPath()); } + private void verifyQueryExemplarsCall(RecordedRequest recordedRequest) { + HttpUrl httpUrl = recordedRequest.getRequestUrl(); + assertEquals("GET", recordedRequest.getMethod()); + assertNotNull(httpUrl); + assertEquals("/api/v1/query_exemplars", httpUrl.encodedPath()); + assertEquals(QUERY, httpUrl.queryParameter("query")); + assertEquals(STARTTIME.toString(), httpUrl.queryParameter("start")); + assertEquals(ENDTIME.toString(), httpUrl.queryParameter("end")); + } } diff --git a/prometheus/src/test/java/org/opensearch/sql/prometheus/functions/implementation/QueryExemplarsFunctionImplementationTest.java b/prometheus/src/test/java/org/opensearch/sql/prometheus/functions/implementation/QueryExemplarsFunctionImplementationTest.java new file mode 100644 index 0000000000..b5a52b0e8a --- /dev/null +++ b/prometheus/src/test/java/org/opensearch/sql/prometheus/functions/implementation/QueryExemplarsFunctionImplementationTest.java @@ -0,0 +1,92 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.prometheus.functions.implementation; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.List; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.data.type.ExprCoreType; +import org.opensearch.sql.exception.ExpressionEvaluationException; +import org.opensearch.sql.expression.DSL; +import org.opensearch.sql.expression.Expression; +import org.opensearch.sql.expression.function.FunctionName; +import org.opensearch.sql.prometheus.client.PrometheusClient; +import org.opensearch.sql.prometheus.functions.implementation.QueryExemplarFunctionImplementation; +import org.opensearch.sql.prometheus.request.PrometheusQueryExemplarsRequest; +import org.opensearch.sql.prometheus.storage.QueryExemplarsTable; + + +@ExtendWith(MockitoExtension.class) +class QueryExemplarsFunctionImplementationTest { + + @Mock + private PrometheusClient client; + + + @Test + void testValueOfAndTypeAndToString() { + FunctionName functionName = new FunctionName("query_exemplars"); + List namedArgumentExpressionList + = List.of(DSL.namedArgument("query", DSL.literal("http_latency")), + DSL.namedArgument("starttime", DSL.literal(12345)), + DSL.namedArgument("endtime", DSL.literal(12345))); + QueryExemplarFunctionImplementation queryExemplarFunctionImplementation + = + new QueryExemplarFunctionImplementation(functionName, namedArgumentExpressionList, client); + UnsupportedOperationException exception = assertThrows(UnsupportedOperationException.class, + () -> queryExemplarFunctionImplementation.valueOf()); + assertEquals("Prometheus defined function [query_exemplars] is only " + + "supported in SOURCE clause with prometheus connector catalog", exception.getMessage()); + assertEquals("query_exemplars(query=\"http_latency\", starttime=12345, endtime=12345)", + queryExemplarFunctionImplementation.toString()); + assertEquals(ExprCoreType.STRUCT, queryExemplarFunctionImplementation.type()); + } + + @Test + void testApplyArguments() { + FunctionName functionName = new FunctionName("query_exemplars"); + List namedArgumentExpressionList + = List.of(DSL.namedArgument("query", DSL.literal("http_latency")), + DSL.namedArgument("starttime", DSL.literal(12345)), + DSL.namedArgument("endtime", DSL.literal(1234))); + QueryExemplarFunctionImplementation queryExemplarFunctionImplementation + = + new QueryExemplarFunctionImplementation(functionName, namedArgumentExpressionList, client); + QueryExemplarsTable queryExemplarsTable + = (QueryExemplarsTable) queryExemplarFunctionImplementation.applyArguments(); + assertNotNull(queryExemplarsTable.getExemplarsRequest()); + PrometheusQueryExemplarsRequest request = queryExemplarsTable.getExemplarsRequest(); + assertEquals("http_latency", request.getQuery()); + assertEquals(12345, request.getStartTime()); + assertEquals(1234, request.getEndTime()); + } + + @Test + void testApplyArgumentsException() { + FunctionName functionName = new FunctionName("query_exemplars"); + List namedArgumentExpressionList + = List.of(DSL.namedArgument("query", DSL.literal("http_latency")), + DSL.namedArgument("starttime", DSL.literal(12345)), + DSL.namedArgument("end_time", DSL.literal(1234))); + QueryExemplarFunctionImplementation queryExemplarFunctionImplementation + = + new QueryExemplarFunctionImplementation(functionName, namedArgumentExpressionList, client); + ExpressionEvaluationException exception = assertThrows(ExpressionEvaluationException.class, + () -> queryExemplarFunctionImplementation.applyArguments()); + assertEquals("Invalid Function Argument:end_time", exception.getMessage()); + } + + +} + diff --git a/prometheus/src/test/java/org/opensearch/sql/prometheus/functions/QueryRangeFunctionImplementationTest.java b/prometheus/src/test/java/org/opensearch/sql/prometheus/functions/implementation/QueryRangeFunctionImplementationTest.java similarity index 98% rename from prometheus/src/test/java/org/opensearch/sql/prometheus/functions/QueryRangeFunctionImplementationTest.java rename to prometheus/src/test/java/org/opensearch/sql/prometheus/functions/implementation/QueryRangeFunctionImplementationTest.java index 98195eecf7..9732999a92 100644 --- a/prometheus/src/test/java/org/opensearch/sql/prometheus/functions/QueryRangeFunctionImplementationTest.java +++ b/prometheus/src/test/java/org/opensearch/sql/prometheus/functions/implementation/QueryRangeFunctionImplementationTest.java @@ -5,7 +5,7 @@ * */ -package org.opensearch.sql.prometheus.functions; +package org.opensearch.sql.prometheus.functions.implementation; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; diff --git a/prometheus/src/test/java/org/opensearch/sql/prometheus/functions/resolver/QueryExemplarsTableFunctionResolverTest.java b/prometheus/src/test/java/org/opensearch/sql/prometheus/functions/resolver/QueryExemplarsTableFunctionResolverTest.java new file mode 100644 index 0000000000..1e296a80c3 --- /dev/null +++ b/prometheus/src/test/java/org/opensearch/sql/prometheus/functions/resolver/QueryExemplarsTableFunctionResolverTest.java @@ -0,0 +1,75 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.prometheus.functions.resolver; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.opensearch.sql.data.type.ExprCoreType.LONG; +import static org.opensearch.sql.data.type.ExprCoreType.STRING; + +import java.util.List; +import java.util.stream.Collectors; +import org.apache.commons.lang3.tuple.Pair; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.expression.DSL; +import org.opensearch.sql.expression.Expression; +import org.opensearch.sql.expression.function.FunctionBuilder; +import org.opensearch.sql.expression.function.FunctionName; +import org.opensearch.sql.expression.function.FunctionProperties; +import org.opensearch.sql.expression.function.FunctionSignature; +import org.opensearch.sql.expression.function.TableFunctionImplementation; +import org.opensearch.sql.prometheus.client.PrometheusClient; +import org.opensearch.sql.prometheus.functions.implementation.QueryExemplarFunctionImplementation; +import org.opensearch.sql.prometheus.request.PrometheusQueryExemplarsRequest; +import org.opensearch.sql.prometheus.storage.QueryExemplarsTable; + +@ExtendWith(MockitoExtension.class) +class QueryExemplarsTableFunctionResolverTest { + + @Mock + private PrometheusClient client; + + @Mock + private FunctionProperties functionProperties; + + @Test + void testResolve() { + QueryExemplarsTableFunctionResolver queryExemplarsTableFunctionResolver + = new QueryExemplarsTableFunctionResolver(client); + FunctionName functionName = FunctionName.of("query_exemplars"); + List expressions + = List.of(DSL.namedArgument("query", DSL.literal("http_latency")), + DSL.namedArgument("starttime", DSL.literal(12345)), + DSL.namedArgument("endtime", DSL.literal(12345))); + FunctionSignature functionSignature = new FunctionSignature(functionName, expressions + .stream().map(Expression::type).collect(Collectors.toList())); + Pair resolution + = queryExemplarsTableFunctionResolver.resolve(functionSignature); + assertEquals(functionName, resolution.getKey().getFunctionName()); + assertEquals(functionName, queryExemplarsTableFunctionResolver.getFunctionName()); + assertEquals(List.of(STRING, LONG, LONG), resolution.getKey().getParamTypeList()); + FunctionBuilder functionBuilder = resolution.getValue(); + TableFunctionImplementation functionImplementation + = (TableFunctionImplementation) functionBuilder.apply(functionProperties, expressions); + assertTrue(functionImplementation instanceof QueryExemplarFunctionImplementation); + QueryExemplarsTable queryExemplarsTable + = (QueryExemplarsTable) functionImplementation.applyArguments(); + assertNotNull(queryExemplarsTable.getExemplarsRequest()); + PrometheusQueryExemplarsRequest prometheusQueryExemplarsRequest = + queryExemplarsTable.getExemplarsRequest(); + assertEquals("http_latency", prometheusQueryExemplarsRequest.getQuery()); + assertEquals(12345L, prometheusQueryExemplarsRequest.getStartTime()); + assertEquals(12345L, prometheusQueryExemplarsRequest.getEndTime()); + } + +} + diff --git a/prometheus/src/test/java/org/opensearch/sql/prometheus/functions/QueryRangeTableFunctionResolverTest.java b/prometheus/src/test/java/org/opensearch/sql/prometheus/functions/resolver/QueryRangeTableFunctionResolverTest.java similarity index 94% rename from prometheus/src/test/java/org/opensearch/sql/prometheus/functions/QueryRangeTableFunctionResolverTest.java rename to prometheus/src/test/java/org/opensearch/sql/prometheus/functions/resolver/QueryRangeTableFunctionResolverTest.java index 06f003c9b6..0f7aa91abc 100644 --- a/prometheus/src/test/java/org/opensearch/sql/prometheus/functions/QueryRangeTableFunctionResolverTest.java +++ b/prometheus/src/test/java/org/opensearch/sql/prometheus/functions/resolver/QueryRangeTableFunctionResolverTest.java @@ -5,7 +5,7 @@ * */ -package org.opensearch.sql.prometheus.functions; +package org.opensearch.sql.prometheus.functions.resolver; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -60,7 +60,7 @@ void testResolve() { = queryRangeTableFunctionResolver.resolve(functionSignature); assertEquals(functionName, resolution.getKey().getFunctionName()); assertEquals(functionName, queryRangeTableFunctionResolver.getFunctionName()); - assertEquals(List.of(STRING, LONG, LONG, LONG), resolution.getKey().getParamTypeList()); + assertEquals(List.of(STRING, LONG, LONG, STRING), resolution.getKey().getParamTypeList()); FunctionBuilder functionBuilder = resolution.getValue(); TableFunctionImplementation functionImplementation = (TableFunctionImplementation) functionBuilder.apply(functionProperties, expressions); @@ -94,7 +94,7 @@ void testArgumentsPassedByPosition() { assertEquals(functionName, resolution.getKey().getFunctionName()); assertEquals(functionName, queryRangeTableFunctionResolver.getFunctionName()); - assertEquals(List.of(STRING, LONG, LONG, LONG), resolution.getKey().getParamTypeList()); + assertEquals(List.of(STRING, LONG, LONG, STRING), resolution.getKey().getParamTypeList()); FunctionBuilder functionBuilder = resolution.getValue(); TableFunctionImplementation functionImplementation = (TableFunctionImplementation) functionBuilder.apply(functionProperties, expressions); @@ -129,7 +129,7 @@ void testArgumentsPassedByNameWithDifferentOrder() { assertEquals(functionName, resolution.getKey().getFunctionName()); assertEquals(functionName, queryRangeTableFunctionResolver.getFunctionName()); - assertEquals(List.of(STRING, LONG, LONG, LONG), resolution.getKey().getParamTypeList()); + assertEquals(List.of(STRING, LONG, LONG, STRING), resolution.getKey().getParamTypeList()); FunctionBuilder functionBuilder = resolution.getValue(); TableFunctionImplementation functionImplementation = (TableFunctionImplementation) functionBuilder.apply(functionProperties, expressions); @@ -162,7 +162,7 @@ void testMixedArgumentTypes() { assertEquals(functionName, resolution.getKey().getFunctionName()); assertEquals(functionName, queryRangeTableFunctionResolver.getFunctionName()); - assertEquals(List.of(STRING, LONG, LONG, LONG), resolution.getKey().getParamTypeList()); + assertEquals(List.of(STRING, LONG, LONG, STRING), resolution.getKey().getParamTypeList()); SemanticCheckException exception = assertThrows(SemanticCheckException.class, () -> resolution.getValue().apply(functionProperties, expressions)); @@ -184,7 +184,7 @@ void testWrongArgumentsSizeWhenPassedByName() { assertEquals(functionName, resolution.getKey().getFunctionName()); assertEquals(functionName, queryRangeTableFunctionResolver.getFunctionName()); - assertEquals(List.of(STRING, LONG, LONG, LONG), resolution.getKey().getParamTypeList()); + assertEquals(List.of(STRING, LONG, LONG, STRING), resolution.getKey().getParamTypeList()); SemanticCheckException exception = assertThrows(SemanticCheckException.class, () -> resolution.getValue().apply(functionProperties, expressions)); @@ -206,7 +206,7 @@ void testWrongArgumentsSizeWhenPassedByPosition() { assertEquals(functionName, resolution.getKey().getFunctionName()); assertEquals(functionName, queryRangeTableFunctionResolver.getFunctionName()); - assertEquals(List.of(STRING, LONG, LONG, LONG), resolution.getKey().getParamTypeList()); + assertEquals(List.of(STRING, LONG, LONG, STRING), resolution.getKey().getParamTypeList()); SemanticCheckException exception = assertThrows(SemanticCheckException.class, () -> resolution.getValue().apply(functionProperties, expressions)); diff --git a/prometheus/src/test/java/org/opensearch/sql/prometheus/functions/scan/QueryExemplarsFunctionTableScanBuilderTest.java b/prometheus/src/test/java/org/opensearch/sql/prometheus/functions/scan/QueryExemplarsFunctionTableScanBuilderTest.java new file mode 100644 index 0000000000..6fd782b417 --- /dev/null +++ b/prometheus/src/test/java/org/opensearch/sql/prometheus/functions/scan/QueryExemplarsFunctionTableScanBuilderTest.java @@ -0,0 +1,61 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.prometheus.functions.scan; + + +import static org.opensearch.sql.prometheus.constants.TestConstants.ENDTIME; +import static org.opensearch.sql.prometheus.constants.TestConstants.QUERY; +import static org.opensearch.sql.prometheus.constants.TestConstants.STARTTIME; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.opensearch.sql.planner.logical.LogicalProject; +import org.opensearch.sql.prometheus.client.PrometheusClient; +import org.opensearch.sql.prometheus.request.PrometheusQueryExemplarsRequest; +import org.opensearch.sql.storage.TableScanOperator; + +public class QueryExemplarsFunctionTableScanBuilderTest { + + @Mock + private PrometheusClient prometheusClient; + + @Mock + private LogicalProject logicalProject; + + @Test + void testBuild() { + PrometheusQueryExemplarsRequest exemplarsRequest + = new PrometheusQueryExemplarsRequest(); + exemplarsRequest.setQuery(QUERY); + exemplarsRequest.setStartTime(STARTTIME); + exemplarsRequest.setEndTime(ENDTIME); + + QueryExemplarsFunctionTableScanBuilder queryExemplarsFunctionTableScanBuilder + = new QueryExemplarsFunctionTableScanBuilder(prometheusClient, exemplarsRequest); + TableScanOperator queryExemplarsFunctionTableScanOperator + = queryExemplarsFunctionTableScanBuilder.build(); + Assertions.assertNotNull(queryExemplarsFunctionTableScanOperator); + Assertions.assertTrue(queryExemplarsFunctionTableScanOperator + instanceof QueryExemplarsFunctionTableScanOperator); + } + + @Test + void testPushProject() { + PrometheusQueryExemplarsRequest exemplarsRequest + = new PrometheusQueryExemplarsRequest(); + exemplarsRequest.setQuery(QUERY); + exemplarsRequest.setStartTime(STARTTIME); + exemplarsRequest.setEndTime(ENDTIME); + + QueryExemplarsFunctionTableScanBuilder queryExemplarsFunctionTableScanBuilder + = new QueryExemplarsFunctionTableScanBuilder(prometheusClient, exemplarsRequest); + Assertions.assertTrue(queryExemplarsFunctionTableScanBuilder + .pushDownProject(logicalProject)); + } +} diff --git a/prometheus/src/test/java/org/opensearch/sql/prometheus/functions/scan/QueryExemplarsFunctionTableScanOperatorTest.java b/prometheus/src/test/java/org/opensearch/sql/prometheus/functions/scan/QueryExemplarsFunctionTableScanOperatorTest.java new file mode 100644 index 0000000000..bd77eb8c2b --- /dev/null +++ b/prometheus/src/test/java/org/opensearch/sql/prometheus/functions/scan/QueryExemplarsFunctionTableScanOperatorTest.java @@ -0,0 +1,192 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.prometheus.functions.scan; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; +import static org.opensearch.sql.prometheus.constants.TestConstants.ENDTIME; +import static org.opensearch.sql.prometheus.constants.TestConstants.QUERY; +import static org.opensearch.sql.prometheus.constants.TestConstants.STARTTIME; +import static org.opensearch.sql.prometheus.utils.TestUtils.getJson; + +import java.io.IOException; +import java.time.Instant; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import lombok.SneakyThrows; +import org.json.JSONArray; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.data.model.ExprCollectionValue; +import org.opensearch.sql.data.model.ExprDoubleValue; +import org.opensearch.sql.data.model.ExprStringValue; +import org.opensearch.sql.data.model.ExprTimestampValue; +import org.opensearch.sql.data.model.ExprTupleValue; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.data.type.ExprCoreType; +import org.opensearch.sql.executor.ExecutionEngine; +import org.opensearch.sql.prometheus.client.PrometheusClient; +import org.opensearch.sql.prometheus.request.PrometheusQueryExemplarsRequest; + +@ExtendWith(MockitoExtension.class) +public class QueryExemplarsFunctionTableScanOperatorTest { + @Mock + private PrometheusClient prometheusClient; + + @Test + @SneakyThrows + void testQueryResponseIterator() { + + PrometheusQueryExemplarsRequest prometheusQueryExemplarsRequest + = new PrometheusQueryExemplarsRequest(); + prometheusQueryExemplarsRequest.setQuery(QUERY); + prometheusQueryExemplarsRequest.setStartTime(STARTTIME); + prometheusQueryExemplarsRequest.setEndTime(ENDTIME); + + QueryExemplarsFunctionTableScanOperator queryExemplarsFunctionTableScanOperator + = new QueryExemplarsFunctionTableScanOperator(prometheusClient, + prometheusQueryExemplarsRequest); + + when(prometheusClient.queryExemplars(any(), any(), any())) + .thenReturn(new JSONArray(getJson("query_exemplars_result.json"))); + queryExemplarsFunctionTableScanOperator.open(); + Assertions.assertTrue(queryExemplarsFunctionTableScanOperator.hasNext()); + LinkedHashMap seriesLabelsHashMap = new LinkedHashMap<>(); + seriesLabelsHashMap.put("instance", new ExprStringValue("localhost:8090")); + seriesLabelsHashMap.put("__name__", new ExprStringValue("test_exemplar_metric_total")); + seriesLabelsHashMap.put("service", new ExprStringValue("bar")); + seriesLabelsHashMap.put("job", new ExprStringValue("prometheus")); + LinkedHashMap exemplarMap = new LinkedHashMap<>(); + exemplarMap.put("labels", new ExprTupleValue(new LinkedHashMap<>() { + { + put("traceID", new ExprStringValue("EpTxMJ40fUus7aGY")); + } + }) + ); + exemplarMap.put("timestamp", new ExprTimestampValue(Instant.ofEpochMilli(1600096945479L))); + exemplarMap.put("value", new ExprDoubleValue(6)); + List exprValueList = new ArrayList<>(); + exprValueList.add(new ExprTupleValue(exemplarMap)); + ExprCollectionValue exemplars = new ExprCollectionValue(exprValueList); + ExprTupleValue seriesLabels = new ExprTupleValue(seriesLabelsHashMap); + ExprTupleValue firstRow = new ExprTupleValue(new LinkedHashMap<>() { + { + put("seriesLabels", seriesLabels); + put("exemplars", exemplars); + } + }); + + assertEquals(firstRow, queryExemplarsFunctionTableScanOperator.next()); + } + + @Test + @SneakyThrows + void testEmptyQueryWithNoMatrixKeyInResultJson() { + PrometheusQueryExemplarsRequest prometheusQueryExemplarsRequest + = new PrometheusQueryExemplarsRequest(); + prometheusQueryExemplarsRequest.setQuery(QUERY); + prometheusQueryExemplarsRequest.setStartTime(STARTTIME); + prometheusQueryExemplarsRequest.setEndTime(ENDTIME); + + QueryExemplarsFunctionTableScanOperator queryExemplarsFunctionTableScanOperator + = new QueryExemplarsFunctionTableScanOperator(prometheusClient, + prometheusQueryExemplarsRequest); + + when(prometheusClient.queryExemplars(any(), any(), any())) + .thenReturn(new JSONArray(getJson("query_exemplars_empty_result.json"))); + queryExemplarsFunctionTableScanOperator.open(); + Assertions.assertFalse(queryExemplarsFunctionTableScanOperator.hasNext()); + } + + @Test + @SneakyThrows + void testQuerySchema() { + + PrometheusQueryExemplarsRequest prometheusQueryExemplarsRequest + = new PrometheusQueryExemplarsRequest(); + prometheusQueryExemplarsRequest.setQuery(QUERY); + prometheusQueryExemplarsRequest.setStartTime(STARTTIME); + prometheusQueryExemplarsRequest.setEndTime(ENDTIME); + + QueryExemplarsFunctionTableScanOperator queryExemplarsFunctionTableScanOperator + = new QueryExemplarsFunctionTableScanOperator(prometheusClient, + prometheusQueryExemplarsRequest); + + when(prometheusClient.queryExemplars(any(), any(), any())) + .thenReturn(new JSONArray(getJson("query_exemplars_result.json"))); + queryExemplarsFunctionTableScanOperator.open(); + Assertions.assertTrue(queryExemplarsFunctionTableScanOperator.hasNext()); + + ArrayList columns = new ArrayList<>(); + columns.add( + new ExecutionEngine.Schema.Column("seriesLabels", "seriesLabels", ExprCoreType.STRUCT)); + columns.add(new ExecutionEngine.Schema.Column("exemplars", "exemplars", ExprCoreType.ARRAY)); + ExecutionEngine.Schema expectedSchema = new ExecutionEngine.Schema(columns); + assertEquals(expectedSchema, queryExemplarsFunctionTableScanOperator.schema()); + } + + @Test + @SneakyThrows + void testEmptyQueryWithException() { + + PrometheusQueryExemplarsRequest prometheusQueryExemplarsRequest + = new PrometheusQueryExemplarsRequest(); + prometheusQueryExemplarsRequest.setQuery(QUERY); + prometheusQueryExemplarsRequest.setStartTime(STARTTIME); + prometheusQueryExemplarsRequest.setEndTime(ENDTIME); + + QueryExemplarsFunctionTableScanOperator queryExemplarsFunctionTableScanOperator + = new QueryExemplarsFunctionTableScanOperator(prometheusClient, + prometheusQueryExemplarsRequest); + when(prometheusClient.queryExemplars(any(), any(), any())) + .thenThrow(new IOException("Error Message")); + RuntimeException runtimeException + = assertThrows(RuntimeException.class, queryExemplarsFunctionTableScanOperator::open); + assertEquals("Error fetching data from prometheus server: Error Message", + runtimeException.getMessage()); + } + + + @Test + @SneakyThrows + void testExplain() { + + PrometheusQueryExemplarsRequest prometheusQueryExemplarsRequest + = new PrometheusQueryExemplarsRequest(); + prometheusQueryExemplarsRequest.setQuery(QUERY); + prometheusQueryExemplarsRequest.setStartTime(STARTTIME); + prometheusQueryExemplarsRequest.setEndTime(ENDTIME); + + QueryExemplarsFunctionTableScanOperator queryExemplarsFunctionTableScanOperator + = new QueryExemplarsFunctionTableScanOperator(prometheusClient, + prometheusQueryExemplarsRequest); + Assertions.assertEquals("query_exemplars(test_query, 1664767694133, 1664771294133)", + queryExemplarsFunctionTableScanOperator.explain()); + } + + @Test + @SneakyThrows + void testClose() { + PrometheusQueryExemplarsRequest prometheusQueryExemplarsRequest + = new PrometheusQueryExemplarsRequest(); + prometheusQueryExemplarsRequest.setQuery(QUERY); + prometheusQueryExemplarsRequest.setStartTime(STARTTIME); + prometheusQueryExemplarsRequest.setEndTime(ENDTIME); + + QueryExemplarsFunctionTableScanOperator queryExemplarsFunctionTableScanOperator + = new QueryExemplarsFunctionTableScanOperator(prometheusClient, + prometheusQueryExemplarsRequest); + queryExemplarsFunctionTableScanOperator.close(); + } +} \ No newline at end of file diff --git a/prometheus/src/test/java/org/opensearch/sql/prometheus/functions/scan/QueryRangeFunctionTableScanBuilderTest.java b/prometheus/src/test/java/org/opensearch/sql/prometheus/functions/scan/QueryRangeFunctionTableScanBuilderTest.java index 5997439029..8532a35395 100644 --- a/prometheus/src/test/java/org/opensearch/sql/prometheus/functions/scan/QueryRangeFunctionTableScanBuilderTest.java +++ b/prometheus/src/test/java/org/opensearch/sql/prometheus/functions/scan/QueryRangeFunctionTableScanBuilderTest.java @@ -21,7 +21,7 @@ import org.opensearch.sql.prometheus.request.PrometheusQueryRequest; import org.opensearch.sql.storage.TableScanOperator; -public class QueryRangeFunctionTableScanBuilderTest { +public class QueryRangeFunctionTableScanBuilderTest { @Mock private PrometheusClient prometheusClient; diff --git a/prometheus/src/test/java/org/opensearch/sql/prometheus/functions/scan/QueryRangeFunctionTableScanOperatorTest.java b/prometheus/src/test/java/org/opensearch/sql/prometheus/functions/scan/QueryRangeFunctionTableScanOperatorTest.java index 79da8b466c..b476471153 100644 --- a/prometheus/src/test/java/org/opensearch/sql/prometheus/functions/scan/QueryRangeFunctionTableScanOperatorTest.java +++ b/prometheus/src/test/java/org/opensearch/sql/prometheus/functions/scan/QueryRangeFunctionTableScanOperatorTest.java @@ -134,9 +134,9 @@ void testQuerySchema() { .thenReturn(new JSONObject(getJson("query_range_result.json"))); queryRangeFunctionTableScanOperator.open(); ArrayList columns = new ArrayList<>(); + columns.add(new ExecutionEngine.Schema.Column(LABELS, LABELS, ExprCoreType.STRUCT)); columns.add(new ExecutionEngine.Schema.Column(TIMESTAMP, TIMESTAMP, ExprCoreType.ARRAY)); columns.add(new ExecutionEngine.Schema.Column(VALUE, VALUE, ExprCoreType.ARRAY)); - columns.add(new ExecutionEngine.Schema.Column(LABELS, LABELS, ExprCoreType.STRUCT)); ExecutionEngine.Schema expectedSchema = new ExecutionEngine.Schema(columns); assertEquals(expectedSchema, queryRangeFunctionTableScanOperator.schema()); } diff --git a/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusStorageEngineTest.java b/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusStorageEngineTest.java index f4c9734e15..4e8d470373 100644 --- a/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusStorageEngineTest.java +++ b/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusStorageEngineTest.java @@ -12,6 +12,7 @@ import static org.opensearch.sql.utils.SystemIndexUtils.TABLE_INFO; import java.util.Collection; +import java.util.Iterator; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -20,6 +21,7 @@ import org.opensearch.sql.exception.SemanticCheckException; import org.opensearch.sql.expression.function.FunctionResolver; import org.opensearch.sql.prometheus.client.PrometheusClient; +import org.opensearch.sql.prometheus.functions.resolver.QueryExemplarsTableFunctionResolver; import org.opensearch.sql.prometheus.functions.resolver.QueryRangeTableFunctionResolver; import org.opensearch.sql.prometheus.storage.system.PrometheusSystemTable; import org.opensearch.sql.storage.Table; @@ -44,9 +46,12 @@ public void getFunctions() { Collection functionResolverCollection = engine.getFunctions(); assertNotNull(functionResolverCollection); - assertEquals(1, functionResolverCollection.size()); + assertEquals(2, functionResolverCollection.size()); + Iterator iterator = functionResolverCollection.iterator(); assertTrue( - functionResolverCollection.iterator().next() instanceof QueryRangeTableFunctionResolver); + iterator.next() instanceof QueryRangeTableFunctionResolver); + assertTrue( + iterator.next() instanceof QueryExemplarsTableFunctionResolver); } @Test diff --git a/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/QueryExemplarsTableTest.java b/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/QueryExemplarsTableTest.java new file mode 100644 index 0000000000..81af30769e --- /dev/null +++ b/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/QueryExemplarsTableTest.java @@ -0,0 +1,82 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.prometheus.storage; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.opensearch.sql.prometheus.constants.TestConstants.ENDTIME; +import static org.opensearch.sql.prometheus.constants.TestConstants.QUERY; +import static org.opensearch.sql.prometheus.constants.TestConstants.STARTTIME; + +import java.util.Map; +import lombok.SneakyThrows; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.data.type.ExprType; +import org.opensearch.sql.planner.logical.LogicalPlan; +import org.opensearch.sql.planner.physical.PhysicalPlan; +import org.opensearch.sql.prometheus.client.PrometheusClient; +import org.opensearch.sql.prometheus.functions.scan.QueryExemplarsFunctionTableScanBuilder; +import org.opensearch.sql.prometheus.functions.scan.QueryExemplarsFunctionTableScanOperator; +import org.opensearch.sql.prometheus.request.PrometheusQueryExemplarsRequest; +import org.opensearch.sql.storage.read.TableScanBuilder; + +@ExtendWith(MockitoExtension.class) +class QueryExemplarsTableTest { + + @Mock + private PrometheusClient client; + + @Test + @SneakyThrows + void testGetFieldTypes() { + PrometheusQueryExemplarsRequest exemplarsRequest + = new PrometheusQueryExemplarsRequest(); + exemplarsRequest.setQuery(QUERY); + exemplarsRequest.setStartTime(STARTTIME); + exemplarsRequest.setEndTime(ENDTIME); + QueryExemplarsTable queryExemplarsTable = new QueryExemplarsTable(client, exemplarsRequest); + Map fieldTypes = queryExemplarsTable.getFieldTypes(); + Assertions.assertNotNull(fieldTypes); + assertEquals(0, fieldTypes.size()); + } + + @Test + void testImplementWithBasicMetricQuery() { + + PrometheusQueryExemplarsRequest exemplarsRequest + = new PrometheusQueryExemplarsRequest(); + exemplarsRequest.setQuery(QUERY); + exemplarsRequest.setStartTime(STARTTIME); + exemplarsRequest.setEndTime(ENDTIME); + LogicalPlan logicalPlan = new QueryExemplarsFunctionTableScanBuilder(client, exemplarsRequest); + QueryExemplarsTable queryExemplarsTable = new QueryExemplarsTable(client, exemplarsRequest); + PhysicalPlan physicalPlan = queryExemplarsTable.implement(logicalPlan); + + assertTrue(physicalPlan instanceof QueryExemplarsFunctionTableScanOperator); + QueryExemplarsFunctionTableScanOperator tableScanOperator = + (QueryExemplarsFunctionTableScanOperator) physicalPlan; + assertEquals("test_query", tableScanOperator.getRequest().getQuery()); + } + + @Test + void testCreateScanBuilderWithQueryRangeTableFunction() { + PrometheusQueryExemplarsRequest exemplarsRequest + = new PrometheusQueryExemplarsRequest(); + exemplarsRequest.setQuery(QUERY); + exemplarsRequest.setStartTime(STARTTIME); + exemplarsRequest.setEndTime(ENDTIME); + QueryExemplarsTable queryExemplarsTable = new QueryExemplarsTable(client, exemplarsRequest); + TableScanBuilder tableScanBuilder = queryExemplarsTable.createScanBuilder(); + Assertions.assertNotNull(tableScanBuilder); + Assertions.assertTrue(tableScanBuilder instanceof QueryExemplarsFunctionTableScanBuilder); + } + +} + diff --git a/prometheus/src/test/resources/query_exemplars_empty_result.json b/prometheus/src/test/resources/query_exemplars_empty_result.json new file mode 100644 index 0000000000..c44dc44f37 --- /dev/null +++ b/prometheus/src/test/resources/query_exemplars_empty_result.json @@ -0,0 +1,3 @@ +[ + +] \ No newline at end of file diff --git a/prometheus/src/test/resources/query_exemplars_response.json b/prometheus/src/test/resources/query_exemplars_response.json new file mode 100644 index 0000000000..50c078d855 --- /dev/null +++ b/prometheus/src/test/resources/query_exemplars_response.json @@ -0,0 +1,46 @@ +{ + "status": "success", + "data": [ + { + "seriesLabels": { + "__name__": "test_exemplar_metric_total", + "instance": "localhost:8090", + "job": "prometheus", + "service": "bar" + }, + "exemplars": [ + { + "labels": { + "traceID": "EpTxMJ40fUus7aGY" + }, + "value": "6", + "timestamp": 1600096945.479 + } + ] + }, + { + "seriesLabels": { + "__name__": "test_exemplar_metric_total", + "instance": "localhost:8090", + "job": "prometheus", + "service": "foo" + }, + "exemplars": [ + { + "labels": { + "traceID": "Olp9XHlq763ccsfa" + }, + "value": "19", + "timestamp": 1600096955.479 + }, + { + "labels": { + "traceID": "hCtjygkIHwAN9vs4" + }, + "value": "20", + "timestamp": 1600096965.489 + } + ] + } + ] +} \ No newline at end of file diff --git a/prometheus/src/test/resources/query_exemplars_result.json b/prometheus/src/test/resources/query_exemplars_result.json new file mode 100644 index 0000000000..3c1cda5f96 --- /dev/null +++ b/prometheus/src/test/resources/query_exemplars_result.json @@ -0,0 +1,43 @@ +[ + { + "seriesLabels": { + "__name__": "test_exemplar_metric_total", + "instance": "localhost:8090", + "job": "prometheus", + "service": "bar" + }, + "exemplars": [ + { + "labels": { + "traceID": "EpTxMJ40fUus7aGY" + }, + "value": "6", + "timestamp": 1600096945.479 + } + ] + }, + { + "seriesLabels": { + "__name__": "test_exemplar_metric_total", + "instance": "localhost:8090", + "job": "prometheus", + "service": "foo" + }, + "exemplars": [ + { + "labels": { + "traceID": "Olp9XHlq763ccsfa" + }, + "value": "19", + "timestamp": 1600096955.479 + }, + { + "labels": { + "traceID": "hCtjygkIHwAN9vs4" + }, + "value": "20", + "timestamp": 1600096965.489 + } + ] + } +] \ No newline at end of file