From d15011785f93f41f59351bbdf6cc373df3ff22be Mon Sep 17 00:00:00 2001 From: Mingshi Liu <113382730+mingshl@users.noreply.github.com> Date: Wed, 5 Jul 2023 15:00:10 -0700 Subject: [PATCH] [Backport 2.x] [Search Pipelines] Add stats for search pipelines (#8376) backport commit https://github.com/opensearch-project/OpenSearch/commit/46c9a211b6b9490f6a7ac9425e946986cd51bed2 to `2.x` branch. * [Search Pipelines] Add stats for search pipelines (#8053) This adds statistics on executions and time spent on search pipeline operations, similar to the stats that are available for ingest pipelines. Signed-off-by: Mingshi Liu Co-authored-by: Michael Froh --- CHANGELOG.md | 1 + .../ingest/common/IngestRestartIT.java | 2 +- .../admin/cluster/node/stats/NodeStats.java | 24 +- .../cluster/node/stats/NodesStatsRequest.java | 3 +- .../node/stats/TransportNodesStatsAction.java | 3 +- .../cluster/stats/ClusterStatsNodes.java | 19 +- .../stats/TransportClusterStatsAction.java | 1 + .../opensearch/common/metrics/MeanMetric.java | 5 + .../common/metrics/OperationMetrics.java | 68 ++++ .../common/metrics/OperationStats.java | 107 +++++ .../opensearch/ingest/CompoundProcessor.java | 17 +- .../ingest/ConditionalProcessor.java | 13 +- .../org/opensearch/ingest/IngestMetric.java | 112 ------ .../org/opensearch/ingest/IngestService.java | 39 +- .../org/opensearch/ingest/IngestStats.java | 131 +------ .../java/org/opensearch/ingest/Pipeline.java | 13 +- .../java/org/opensearch/node/NodeService.java | 6 +- .../opensearch/search/pipeline/Pipeline.java | 170 ++++---- .../search/pipeline/PipelineWithMetrics.java | 227 +++++++++++ .../pipeline/SearchPipelineService.java | 52 ++- .../search/pipeline/SearchPipelineStats.java | 367 ++++++++++++++++++ .../cluster/node/stats/NodeStatsTests.java | 49 +-- .../cluster/stats/ClusterStatsNodesTests.java | 16 +- .../opensearch/cluster/DiskUsageTests.java | 6 + .../ingest/CompoundProcessorTests.java | 11 +- .../ingest/ConditionalProcessorTests.java | 11 +- .../opensearch/ingest/IngestServiceTests.java | 13 +- .../opensearch/ingest/IngestStatsTests.java | 29 +- .../ingest/PipelineProcessorTests.java | 31 +- .../pipeline/SearchPipelineServiceTests.java | 123 ++++++ .../pipeline/SearchPipelineStatsTests.java | 185 +++++++++ .../MockInternalClusterInfoService.java | 3 +- .../opensearch/test/InternalTestCluster.java | 1 + 33 files changed, 1399 insertions(+), 459 deletions(-) create mode 100644 server/src/main/java/org/opensearch/common/metrics/OperationMetrics.java create mode 100644 server/src/main/java/org/opensearch/common/metrics/OperationStats.java delete mode 100644 server/src/main/java/org/opensearch/ingest/IngestMetric.java create mode 100644 server/src/main/java/org/opensearch/search/pipeline/PipelineWithMetrics.java create mode 100644 server/src/main/java/org/opensearch/search/pipeline/SearchPipelineStats.java create mode 100644 server/src/test/java/org/opensearch/search/pipeline/SearchPipelineStatsTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 99873e441c5ab..1ab9c6b2d4166 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Implement concurrent aggregations support without profile option ([#7514](https://github.com/opensearch-project/OpenSearch/pull/7514)) - Add dynamic index and cluster setting for concurrent segment search ([#7956](https://github.com/opensearch-project/OpenSearch/pull/7956)) - Add descending order search optimization through reverse segment read. ([#7967](https://github.com/opensearch-project/OpenSearch/pull/7967)) +- [Search pipelines] Added search pipelines output to node stats ([#8053](https://github.com/opensearch-project/OpenSearch/pull/8053)) - Update components of segrep backpressure to support remote store. ([#8020](https://github.com/opensearch-project/OpenSearch/pull/8020)) - Make remote cluster connection setup in async ([#8038](https://github.com/opensearch-project/OpenSearch/pull/8038)) - Add API to initialize extensions ([#8029]()https://github.com/opensearch-project/OpenSearch/pull/8029) diff --git a/modules/ingest-common/src/internalClusterTest/java/org/opensearch/ingest/common/IngestRestartIT.java b/modules/ingest-common/src/internalClusterTest/java/org/opensearch/ingest/common/IngestRestartIT.java index f27c903d8795f..3fb29f7f64f08 100644 --- a/modules/ingest-common/src/internalClusterTest/java/org/opensearch/ingest/common/IngestRestartIT.java +++ b/modules/ingest-common/src/internalClusterTest/java/org/opensearch/ingest/common/IngestRestartIT.java @@ -132,7 +132,7 @@ public void testFailureInConditionalProcessor() { for (int k = 0; k < nodeCount; k++) { List stats = r.getNodes().get(k).getIngestStats().getProcessorStats().get(pipelineId); for (IngestStats.ProcessorStat st : stats) { - assertThat(st.getStats().getIngestCurrent(), greaterThanOrEqualTo(0L)); + assertThat(st.getStats().getCurrent(), greaterThanOrEqualTo(0L)); } } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java index 8599884fa4c90..57b6c8de5889b 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java @@ -60,6 +60,7 @@ import org.opensearch.script.ScriptCacheStats; import org.opensearch.script.ScriptStats; import org.opensearch.search.backpressure.stats.SearchBackpressureStats; +import org.opensearch.search.pipeline.SearchPipelineStats; import org.opensearch.tasks.TaskCancellationStats; import org.opensearch.threadpool.ThreadPoolStats; import org.opensearch.transport.TransportStats; @@ -139,6 +140,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment { @Nullable private TaskCancellationStats taskCancellationStats; + @Nullable + private SearchPipelineStats searchPipelineStats; + public NodeStats(StreamInput in) throws IOException { super(in); timestamp = in.readVLong(); @@ -202,6 +206,11 @@ public NodeStats(StreamInput in) throws IOException { } else { taskCancellationStats = null; } + if (in.getVersion().onOrAfter(Version.V_2_9_0)) { + searchPipelineStats = in.readOptionalWriteable(SearchPipelineStats::new); + } else { + searchPipelineStats = null; + } } public NodeStats( @@ -227,7 +236,8 @@ public NodeStats( @Nullable ClusterManagerThrottlingStats clusterManagerThrottlingStats, @Nullable WeightedRoutingStats weightedRoutingStats, @Nullable FileCacheStats fileCacheStats, - @Nullable TaskCancellationStats taskCancellationStats + @Nullable TaskCancellationStats taskCancellationStats, + @Nullable SearchPipelineStats searchPipelineStats ) { super(node); this.timestamp = timestamp; @@ -252,6 +262,7 @@ public NodeStats( this.weightedRoutingStats = weightedRoutingStats; this.fileCacheStats = fileCacheStats; this.taskCancellationStats = taskCancellationStats; + this.searchPipelineStats = searchPipelineStats; } public long getTimestamp() { @@ -384,6 +395,11 @@ public TaskCancellationStats getTaskCancellationStats() { return taskCancellationStats; } + @Nullable + public SearchPipelineStats getSearchPipelineStats() { + return searchPipelineStats; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -430,6 +446,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_2_9_0)) { out.writeOptionalWriteable(taskCancellationStats); } + if (out.getVersion().onOrAfter(Version.V_2_9_0)) { + out.writeOptionalWriteable(searchPipelineStats); + } } @Override @@ -517,6 +536,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (getTaskCancellationStats() != null) { getTaskCancellationStats().toXContent(builder, params); } + if (getSearchPipelineStats() != null) { + getSearchPipelineStats().toXContent(builder, params); + } return builder; } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java index d65cb621904ca..341cd9f3cedf8 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java @@ -242,7 +242,8 @@ public enum Metric { CLUSTER_MANAGER_THROTTLING("cluster_manager_throttling"), WEIGHTED_ROUTING_STATS("weighted_routing"), FILE_CACHE_STATS("file_cache"), - TASK_CANCELLATION("task_cancellation"); + TASK_CANCELLATION("task_cancellation"), + SEARCH_PIPELINE("search_pipeline"); private String metricName; diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java index 286b1a308efce..05d0bbceefe7d 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java @@ -123,7 +123,8 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) { NodesStatsRequest.Metric.CLUSTER_MANAGER_THROTTLING.containedIn(metrics), NodesStatsRequest.Metric.WEIGHTED_ROUTING_STATS.containedIn(metrics), NodesStatsRequest.Metric.FILE_CACHE_STATS.containedIn(metrics), - NodesStatsRequest.Metric.TASK_CANCELLATION.containedIn(metrics) + NodesStatsRequest.Metric.TASK_CANCELLATION.containedIn(metrics), + NodesStatsRequest.Metric.SEARCH_PIPELINE.containedIn(metrics) ); } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodes.java b/server/src/main/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodes.java index cf66ed480c7a7..bbeda453578aa 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodes.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodes.java @@ -41,6 +41,7 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.common.Strings; +import org.opensearch.common.metrics.OperationStats; import org.opensearch.common.network.NetworkModule; import org.opensearch.common.settings.Settings; import org.opensearch.common.transport.TransportAddress; @@ -800,18 +801,18 @@ static class IngestStats implements ToXContentFragment { pipelineIds.add(processorStats.getKey()); for (org.opensearch.ingest.IngestStats.ProcessorStat stat : processorStats.getValue()) { stats.compute(stat.getType(), (k, v) -> { - org.opensearch.ingest.IngestStats.Stats nodeIngestStats = stat.getStats(); + OperationStats nodeIngestStats = stat.getStats(); if (v == null) { return new long[] { - nodeIngestStats.getIngestCount(), - nodeIngestStats.getIngestFailedCount(), - nodeIngestStats.getIngestCurrent(), - nodeIngestStats.getIngestTimeInMillis() }; + nodeIngestStats.getCount(), + nodeIngestStats.getFailedCount(), + nodeIngestStats.getCurrent(), + nodeIngestStats.getTotalTimeInMillis() }; } else { - v[0] += nodeIngestStats.getIngestCount(); - v[1] += nodeIngestStats.getIngestFailedCount(); - v[2] += nodeIngestStats.getIngestCurrent(); - v[3] += nodeIngestStats.getIngestTimeInMillis(); + v[0] += nodeIngestStats.getCount(); + v[1] += nodeIngestStats.getFailedCount(); + v[2] += nodeIngestStats.getCurrent(); + v[3] += nodeIngestStats.getTotalTimeInMillis(); return v; } }); diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java index 166474104fa49..b1f36c13b8419 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -167,6 +167,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq false, false, false, + false, false ); List shardsStats = new ArrayList<>(); diff --git a/server/src/main/java/org/opensearch/common/metrics/MeanMetric.java b/server/src/main/java/org/opensearch/common/metrics/MeanMetric.java index 79c04d431e97b..33f12c8cb42d3 100644 --- a/server/src/main/java/org/opensearch/common/metrics/MeanMetric.java +++ b/server/src/main/java/org/opensearch/common/metrics/MeanMetric.java @@ -49,6 +49,11 @@ public void inc(long n) { sum.add(n); } + public void add(MeanMetric other) { + counter.add(other.counter.sum()); + sum.add(other.sum.sum()); + } + public void dec(long n) { counter.decrement(); sum.add(-n); diff --git a/server/src/main/java/org/opensearch/common/metrics/OperationMetrics.java b/server/src/main/java/org/opensearch/common/metrics/OperationMetrics.java new file mode 100644 index 0000000000000..97fbbc2ce5cde --- /dev/null +++ b/server/src/main/java/org/opensearch/common/metrics/OperationMetrics.java @@ -0,0 +1,68 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.metrics; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * Mutable tracker of a repeated operation. + * + * @opensearch.internal + */ +public class OperationMetrics { + /** + * The mean time it takes to complete the measured item. + */ + private final MeanMetric time = new MeanMetric(); + /** + * The current count of things being measured. + * Useful when aggregating multiple metrics to see how many things are in flight. + */ + private final AtomicLong current = new AtomicLong(); + /** + * The non-decreasing count of failures + */ + private final CounterMetric failed = new CounterMetric(); + + /** + * Invoked before the given operation begins. + */ + public void before() { + current.incrementAndGet(); + } + + /** + * Invoked upon completion (success or failure) of the given operation + * @param currentTime elapsed time of the operation + */ + public void after(long currentTime) { + current.decrementAndGet(); + time.inc(currentTime); + } + + /** + * Invoked upon failure of the operation. + */ + public void failed() { + failed.inc(); + } + + public void add(OperationMetrics other) { + // Don't try copying over current, since in-flight requests will be linked to the existing metrics instance. + failed.inc(other.failed.count()); + time.add(other.time); + } + + /** + * @return an immutable snapshot of the current metric values. + */ + public OperationStats createStats() { + return new OperationStats(time.count(), time.sum(), current.get(), failed.count()); + } +} diff --git a/server/src/main/java/org/opensearch/common/metrics/OperationStats.java b/server/src/main/java/org/opensearch/common/metrics/OperationStats.java new file mode 100644 index 0000000000000..a820f848393bb --- /dev/null +++ b/server/src/main/java/org/opensearch/common/metrics/OperationStats.java @@ -0,0 +1,107 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.metrics; + +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +/** + * An immutable representation of a {@link OperationMetrics} + */ +public class OperationStats implements Writeable, ToXContentFragment { + private final long count; + private final long totalTimeInMillis; + private final long current; + private final long failedCount; + + public OperationStats(long count, long totalTimeInMillis, long current, long failedCount) { + this.count = count; + this.totalTimeInMillis = totalTimeInMillis; + this.current = current; + this.failedCount = failedCount; + } + + /** + * Read from a stream. + */ + public OperationStats(StreamInput in) throws IOException { + count = in.readVLong(); + totalTimeInMillis = in.readVLong(); + current = in.readVLong(); + failedCount = in.readVLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(count); + out.writeVLong(totalTimeInMillis); + out.writeVLong(current); + out.writeVLong(failedCount); + } + + /** + * @return The total number of executed operations. + */ + public long getCount() { + return count; + } + + /** + * @return The total time spent of in millis. + */ + public long getTotalTimeInMillis() { + return totalTimeInMillis; + } + + /** + * @return The total number of operations currently executing. + */ + public long getCurrent() { + return current; + } + + /** + * @return The total number of operations that have failed. + */ + public long getFailedCount() { + return failedCount; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return builder.field("count", count) + .humanReadableField("time_in_millis", "time", new TimeValue(totalTimeInMillis, TimeUnit.MILLISECONDS)) + .field("current", current) + .field("failed", failedCount); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + OperationStats that = (OperationStats) o; + return Objects.equals(count, that.count) + && Objects.equals(totalTimeInMillis, that.totalTimeInMillis) + && Objects.equals(failedCount, that.failedCount) + && Objects.equals(current, that.current); + } + + @Override + public int hashCode() { + return Objects.hash(count, totalTimeInMillis, failedCount, current); + } +} diff --git a/server/src/main/java/org/opensearch/ingest/CompoundProcessor.java b/server/src/main/java/org/opensearch/ingest/CompoundProcessor.java index 8cdbc487dc137..a5f4870029e87 100644 --- a/server/src/main/java/org/opensearch/ingest/CompoundProcessor.java +++ b/server/src/main/java/org/opensearch/ingest/CompoundProcessor.java @@ -34,6 +34,7 @@ import org.opensearch.OpenSearchException; import org.opensearch.common.collect.Tuple; +import org.opensearch.common.metrics.OperationMetrics; import java.util.ArrayList; import java.util.Arrays; @@ -60,7 +61,7 @@ public class CompoundProcessor implements Processor { private final boolean ignoreFailure; private final List processors; private final List onFailureProcessors; - private final List> processorsWithMetrics; + private final List> processorsWithMetrics; private final LongSupplier relativeTimeProvider; CompoundProcessor(LongSupplier relativeTimeProvider, Processor... processor) { @@ -87,10 +88,10 @@ public CompoundProcessor(boolean ignoreFailure, List processors, List this.onFailureProcessors = onFailureProcessors; this.relativeTimeProvider = relativeTimeProvider; this.processorsWithMetrics = new ArrayList<>(processors.size()); - processors.forEach(p -> processorsWithMetrics.add(new Tuple<>(p, new IngestMetric()))); + processors.forEach(p -> processorsWithMetrics.add(new Tuple<>(p, new OperationMetrics()))); } - List> getProcessorsWithMetrics() { + List> getProcessorsWithMetrics() { return processorsWithMetrics; } @@ -155,17 +156,17 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, BiConsume return; } - Tuple processorWithMetric = processorsWithMetrics.get(currentProcessor); + Tuple processorWithMetric = processorsWithMetrics.get(currentProcessor); final Processor processor = processorWithMetric.v1(); - final IngestMetric metric = processorWithMetric.v2(); + final OperationMetrics metric = processorWithMetric.v2(); final long startTimeInNanos = relativeTimeProvider.getAsLong(); - metric.preIngest(); + metric.before(); processor.execute(ingestDocument, (result, e) -> { long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos); - metric.postIngest(ingestTimeInMillis); + metric.after(ingestTimeInMillis); if (e != null) { - metric.ingestFailed(); + metric.failed(); if (ignoreFailure) { innerExecute(currentProcessor + 1, ingestDocument, handler); } else { diff --git a/server/src/main/java/org/opensearch/ingest/ConditionalProcessor.java b/server/src/main/java/org/opensearch/ingest/ConditionalProcessor.java index 591a71fd72b8f..8bf489805f7ca 100644 --- a/server/src/main/java/org/opensearch/ingest/ConditionalProcessor.java +++ b/server/src/main/java/org/opensearch/ingest/ConditionalProcessor.java @@ -32,6 +32,7 @@ package org.opensearch.ingest; +import org.opensearch.common.metrics.OperationMetrics; import org.opensearch.script.IngestConditionalScript; import org.opensearch.script.Script; import org.opensearch.script.ScriptException; @@ -66,7 +67,7 @@ public class ConditionalProcessor extends AbstractProcessor implements WrappingP private final Script condition; private final ScriptService scriptService; private final Processor processor; - private final IngestMetric metric; + private final OperationMetrics metric; private final LongSupplier relativeTimeProvider; private final IngestConditionalScript precompiledConditionScript; @@ -86,7 +87,7 @@ public class ConditionalProcessor extends AbstractProcessor implements WrappingP this.condition = script; this.scriptService = scriptService; this.processor = processor; - this.metric = new IngestMetric(); + this.metric = new OperationMetrics(); this.relativeTimeProvider = relativeTimeProvider; try { @@ -114,12 +115,12 @@ public void execute(IngestDocument ingestDocument, BiConsumer { long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos); - metric.postIngest(ingestTimeInMillis); + metric.after(ingestTimeInMillis); if (e != null) { - metric.ingestFailed(); + metric.failed(); handler.accept(null, e); } else { handler.accept(result, null); @@ -148,7 +149,7 @@ public Processor getInnerProcessor() { return processor; } - IngestMetric getMetric() { + OperationMetrics getMetric() { return metric; } diff --git a/server/src/main/java/org/opensearch/ingest/IngestMetric.java b/server/src/main/java/org/opensearch/ingest/IngestMetric.java deleted file mode 100644 index 2d4a1dc9cfdee..0000000000000 --- a/server/src/main/java/org/opensearch/ingest/IngestMetric.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/* - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. - */ - -package org.opensearch.ingest; - -import org.opensearch.common.metrics.CounterMetric; -import org.opensearch.common.metrics.MeanMetric; - -import java.util.concurrent.atomic.AtomicLong; - -/** - *

Metrics to measure ingest actions. - *

This counts measure documents and timings for a given scope. - * The scope is determined by the calling code. For example you can use this class to count all documents across all pipeline, - * or you can use this class to count documents for a given pipeline or a specific processor. - * This class does not make assumptions about it's given scope. - * - * @opensearch.internal - */ -class IngestMetric { - - /** - * The time it takes to complete the measured item. - */ - private final MeanMetric ingestTime = new MeanMetric(); - /** - * The current count of things being measure. Should most likely ever be 0 or 1. - * Useful when aggregating multiple metrics to see how many things are in flight. - */ - private final AtomicLong ingestCurrent = new AtomicLong(); - /** - * The ever increasing count of things being measured - */ - private final CounterMetric ingestCount = new CounterMetric(); - /** - * The only increasing count of failures - */ - private final CounterMetric ingestFailed = new CounterMetric(); - - /** - * Call this prior to the ingest action. - */ - void preIngest() { - ingestCurrent.incrementAndGet(); - } - - /** - * Call this after the performing the ingest action, even if the action failed. - * @param ingestTimeInMillis The time it took to perform the action. - */ - void postIngest(long ingestTimeInMillis) { - ingestCurrent.decrementAndGet(); - ingestTime.inc(ingestTimeInMillis); - ingestCount.inc(); - } - - /** - * Call this if the ingest action failed. - */ - void ingestFailed() { - ingestFailed.inc(); - } - - /** - *

Add two sets of metrics together. - *

Note - this method does not add the current count values. - * The current count value is ephemeral and requires a increase/decrease operation pairs to keep the value correct. - * - * @param metrics The metric to add. - */ - void add(IngestMetric metrics) { - ingestCount.inc(metrics.ingestCount.count()); - ingestTime.inc(metrics.ingestTime.sum()); - ingestFailed.inc(metrics.ingestFailed.count()); - } - - /** - * Creates a serializable representation for these metrics. - */ - IngestStats.Stats createStats() { - return new IngestStats.Stats(ingestCount.count(), ingestTime.sum(), ingestCurrent.get(), ingestFailed.count()); - } -} diff --git a/server/src/main/java/org/opensearch/ingest/IngestService.java b/server/src/main/java/org/opensearch/ingest/IngestService.java index b9785d9ec036f..0984046ca3077 100644 --- a/server/src/main/java/org/opensearch/ingest/IngestService.java +++ b/server/src/main/java/org/opensearch/ingest/IngestService.java @@ -60,6 +60,7 @@ import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.collect.Tuple; +import org.opensearch.common.metrics.OperationMetrics; import org.opensearch.common.regex.Regex; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; @@ -114,7 +115,7 @@ public class IngestService implements ClusterStateApplier, ReportingService pipelines = Collections.emptyMap(); private final ThreadPool threadPool; - private final IngestMetric totalMetrics = new IngestMetric(); + private final OperationMetrics totalMetrics = new OperationMetrics(); private final List> ingestClusterStateListeners = new CopyOnWriteArrayList<>(); private final ClusterManagerTaskThrottler.ThrottlingKey putPipelineTaskKey; private final ClusterManagerTaskThrottler.ThrottlingKey deletePipelineTaskKey; @@ -440,17 +441,17 @@ Map pipelines() { * Recursive method to obtain all of the non-failure processors for given compoundProcessor. Since conditionals are implemented as * wrappers to the actual processor, always prefer the actual processor's metric over the conditional processor's metric. * @param compoundProcessor The compound processor to start walking the non-failure processors - * @param processorMetrics The list of {@link Processor} {@link IngestMetric} tuples. + * @param processorMetrics The list of {@link Processor} {@link OperationMetrics} tuples. * @return the processorMetrics for all non-failure processor that belong to the original compoundProcessor */ - private static List> getProcessorMetrics( + private static List> getProcessorMetrics( CompoundProcessor compoundProcessor, - List> processorMetrics + List> processorMetrics ) { // only surface the top level non-failure processors, on-failure processor times will be included in the top level non-failure - for (Tuple processorWithMetric : compoundProcessor.getProcessorsWithMetrics()) { + for (Tuple processorWithMetric : compoundProcessor.getProcessorsWithMetrics()) { Processor processor = processorWithMetric.v1(); - IngestMetric metric = processorWithMetric.v2(); + OperationMetrics metric = processorWithMetric.v2(); if (processor instanceof CompoundProcessor) { getProcessorMetrics((CompoundProcessor) processor, processorMetrics); } else { @@ -614,7 +615,7 @@ private void executePipelines( if (Objects.equals(originalIndex, newIndex) == false) { if (hasFinalPipeline && it.hasNext() == false) { - totalMetrics.ingestFailed(); + totalMetrics.failed(); onFailure.accept( slot, new IllegalStateException("final pipeline [" + pipelineId + "] can't change the target index") @@ -680,11 +681,11 @@ public IngestStats stats() { Pipeline pipeline = holder.pipeline; CompoundProcessor rootProcessor = pipeline.getCompoundProcessor(); statsBuilder.addPipelineMetrics(id, pipeline.getMetrics()); - List> processorMetrics = new ArrayList<>(); + List> processorMetrics = new ArrayList<>(); getProcessorMetrics(rootProcessor, processorMetrics); processorMetrics.forEach(t -> { Processor processor = t.v1(); - IngestMetric processorMetric = t.v2(); + OperationMetrics processorMetric = t.v2(); statsBuilder.addProcessorMetrics(id, getProcessorName(processor), processor.getType(), processorMetric); }); }); @@ -739,7 +740,7 @@ private void innerExecute( long startTimeInNanos = System.nanoTime(); // the pipeline specific stat holder may not exist and that is fine: // (e.g. the pipeline may have been removed while we're ingesting a document - totalMetrics.preIngest(); + totalMetrics.before(); String index = indexRequest.index(); String id = indexRequest.id(); String routing = indexRequest.routing(); @@ -749,9 +750,9 @@ private void innerExecute( IngestDocument ingestDocument = new IngestDocument(index, id, routing, version, versionType, sourceAsMap); ingestDocument.executePipeline(pipeline, (result, e) -> { long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos); - totalMetrics.postIngest(ingestTimeInMillis); + totalMetrics.after(ingestTimeInMillis); if (e != null) { - totalMetrics.ingestFailed(); + totalMetrics.failed(); handler.accept(e); } else if (result == null) { itemDroppedHandler.accept(slot); @@ -835,22 +836,22 @@ void innerUpdatePipelines(IngestMetadata newIngestMetadata) { } Pipeline oldPipeline = previous.pipeline; newPipeline.getMetrics().add(oldPipeline.getMetrics()); - List> oldPerProcessMetrics = new ArrayList<>(); - List> newPerProcessMetrics = new ArrayList<>(); + List> oldPerProcessMetrics = new ArrayList<>(); + List> newPerProcessMetrics = new ArrayList<>(); getProcessorMetrics(oldPipeline.getCompoundProcessor(), oldPerProcessMetrics); getProcessorMetrics(newPipeline.getCompoundProcessor(), newPerProcessMetrics); // Best attempt to populate new processor metrics using a parallel array of the old metrics. This is not ideal since // the per processor metrics may get reset when the arrays don't match. However, to get to an ideal model, unique and // consistent id's per processor and/or semantic equals for each processor will be needed. if (newPerProcessMetrics.size() == oldPerProcessMetrics.size()) { - Iterator> oldMetricsIterator = oldPerProcessMetrics.iterator(); - for (Tuple compositeMetric : newPerProcessMetrics) { + Iterator> oldMetricsIterator = oldPerProcessMetrics.iterator(); + for (Tuple compositeMetric : newPerProcessMetrics) { String type = compositeMetric.v1().getType(); - IngestMetric metric = compositeMetric.v2(); + OperationMetrics metric = compositeMetric.v2(); if (oldMetricsIterator.hasNext()) { - Tuple oldCompositeMetric = oldMetricsIterator.next(); + Tuple oldCompositeMetric = oldMetricsIterator.next(); String oldType = oldCompositeMetric.v1().getType(); - IngestMetric oldMetric = oldCompositeMetric.v2(); + OperationMetrics oldMetric = oldCompositeMetric.v2(); if (type.equals(oldType)) { metric.add(oldMetric); } diff --git a/server/src/main/java/org/opensearch/ingest/IngestStats.java b/server/src/main/java/org/opensearch/ingest/IngestStats.java index c2a97be398ec9..ad05466f1370d 100644 --- a/server/src/main/java/org/opensearch/ingest/IngestStats.java +++ b/server/src/main/java/org/opensearch/ingest/IngestStats.java @@ -36,7 +36,8 @@ import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.io.stream.Writeable; -import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.metrics.OperationMetrics; +import org.opensearch.common.metrics.OperationStats; import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; @@ -47,15 +48,14 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.concurrent.TimeUnit; /** - * Stats for an ingest processor pipeline + * OperationStats for an ingest processor pipeline * * @opensearch.internal */ public class IngestStats implements Writeable, ToXContentFragment { - private final Stats totalStats; + private final OperationStats totalStats; private final List pipelineStats; private final Map> processorStats; @@ -65,7 +65,7 @@ public class IngestStats implements Writeable, ToXContentFragment { * @param pipelineStats - The stats for a given ingest pipeline. * @param processorStats - The per-processor stats for a given pipeline. A map keyed by the pipeline identifier. */ - public IngestStats(Stats totalStats, List pipelineStats, Map> processorStats) { + public IngestStats(OperationStats totalStats, List pipelineStats, Map> processorStats) { this.totalStats = totalStats; this.pipelineStats = pipelineStats; this.processorStats = processorStats; @@ -75,13 +75,13 @@ public IngestStats(Stats totalStats, List pipelineStats, Map(size); this.processorStats = new HashMap<>(size); for (int i = 0; i < size; i++) { String pipelineId = in.readString(); - Stats pipelineStat = new Stats(in); + OperationStats pipelineStat = new OperationStats(in); this.pipelineStats.add(new PipelineStat(pipelineId, pipelineStat)); int processorsSize = in.readVInt(); List processorStatsPerPipeline = new ArrayList<>(processorsSize); @@ -91,7 +91,7 @@ public IngestStats(StreamInput in) throws IOException { if (in.getVersion().onOrAfter(LegacyESVersion.V_7_6_0)) { processorType = in.readString(); } - Stats processorStat = new Stats(in); + OperationStats processorStat = new OperationStats(in); processorStatsPerPipeline.add(new ProcessorStat(processorName, processorType, processorStat)); } this.processorStats.put(pipelineId, processorStatsPerPipeline); @@ -153,7 +153,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder; } - public Stats getTotalStats() { + public OperationStats getTotalStats() { return totalStats; } @@ -181,115 +181,24 @@ public int hashCode() { } /** - * The ingest statistics. - * - * @opensearch.internal - */ - public static class Stats implements Writeable, ToXContentFragment { - - private final long ingestCount; - private final long ingestTimeInMillis; - private final long ingestCurrent; - private final long ingestFailedCount; - - public Stats(long ingestCount, long ingestTimeInMillis, long ingestCurrent, long ingestFailedCount) { - this.ingestCount = ingestCount; - this.ingestTimeInMillis = ingestTimeInMillis; - this.ingestCurrent = ingestCurrent; - this.ingestFailedCount = ingestFailedCount; - } - - /** - * Read from a stream. - */ - public Stats(StreamInput in) throws IOException { - ingestCount = in.readVLong(); - ingestTimeInMillis = in.readVLong(); - ingestCurrent = in.readVLong(); - ingestFailedCount = in.readVLong(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeVLong(ingestCount); - out.writeVLong(ingestTimeInMillis); - out.writeVLong(ingestCurrent); - out.writeVLong(ingestFailedCount); - } - - /** - * @return The total number of executed ingest preprocessing operations. - */ - public long getIngestCount() { - return ingestCount; - } - - /** - * @return The total time spent of ingest preprocessing in millis. - */ - public long getIngestTimeInMillis() { - return ingestTimeInMillis; - } - - /** - * @return The total number of ingest preprocessing operations currently executing. - */ - public long getIngestCurrent() { - return ingestCurrent; - } - - /** - * @return The total number of ingest preprocessing operations that have failed. - */ - public long getIngestFailedCount() { - return ingestFailedCount; - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.field("count", ingestCount); - builder.humanReadableField("time_in_millis", "time", new TimeValue(ingestTimeInMillis, TimeUnit.MILLISECONDS)); - builder.field("current", ingestCurrent); - builder.field("failed", ingestFailedCount); - return builder; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - IngestStats.Stats that = (IngestStats.Stats) o; - return Objects.equals(ingestCount, that.ingestCount) - && Objects.equals(ingestTimeInMillis, that.ingestTimeInMillis) - && Objects.equals(ingestFailedCount, that.ingestFailedCount) - && Objects.equals(ingestCurrent, that.ingestCurrent); - } - - @Override - public int hashCode() { - return Objects.hash(ingestCount, ingestTimeInMillis, ingestFailedCount, ingestCurrent); - } - } - - /** - * Easy conversion from scoped {@link IngestMetric} objects to a serializable Stats objects + * Easy conversion from scoped {@link OperationMetrics} objects to a serializable OperationStats objects */ static class Builder { - private Stats totalStats; + private OperationStats totalStats; private List pipelineStats = new ArrayList<>(); private Map> processorStats = new HashMap<>(); - Builder addTotalMetrics(IngestMetric totalMetric) { + Builder addTotalMetrics(OperationMetrics totalMetric) { this.totalStats = totalMetric.createStats(); return this; } - Builder addPipelineMetrics(String pipelineId, IngestMetric pipelineMetric) { + Builder addPipelineMetrics(String pipelineId, OperationMetrics pipelineMetric) { this.pipelineStats.add(new PipelineStat(pipelineId, pipelineMetric.createStats())); return this; } - Builder addProcessorMetrics(String pipelineId, String processorName, String processorType, IngestMetric metric) { + Builder addProcessorMetrics(String pipelineId, String processorName, String processorType, OperationMetrics metric) { this.processorStats.computeIfAbsent(pipelineId, k -> new ArrayList<>()) .add(new ProcessorStat(processorName, processorType, metric.createStats())); return this; @@ -305,9 +214,9 @@ IngestStats build() { */ public static class PipelineStat { private final String pipelineId; - private final Stats stats; + private final OperationStats stats; - public PipelineStat(String pipelineId, Stats stats) { + public PipelineStat(String pipelineId, OperationStats stats) { this.pipelineId = pipelineId; this.stats = stats; } @@ -316,7 +225,7 @@ public String getPipelineId() { return pipelineId; } - public Stats getStats() { + public OperationStats getStats() { return stats; } @@ -340,9 +249,9 @@ public int hashCode() { public static class ProcessorStat { private final String name; private final String type; - private final Stats stats; + private final OperationStats stats; - public ProcessorStat(String name, String type, Stats stats) { + public ProcessorStat(String name, String type, OperationStats stats) { this.name = name; this.type = type; this.stats = stats; @@ -356,7 +265,7 @@ public String getType() { return type; } - public Stats getStats() { + public OperationStats getStats() { return stats; } diff --git a/server/src/main/java/org/opensearch/ingest/Pipeline.java b/server/src/main/java/org/opensearch/ingest/Pipeline.java index 9b3725fd65d9d..766fb9cd66777 100644 --- a/server/src/main/java/org/opensearch/ingest/Pipeline.java +++ b/server/src/main/java/org/opensearch/ingest/Pipeline.java @@ -43,6 +43,7 @@ import java.util.function.BiConsumer; import java.util.function.LongSupplier; +import org.opensearch.common.metrics.OperationMetrics; import org.opensearch.script.ScriptService; /** @@ -63,7 +64,7 @@ public final class Pipeline { @Nullable private final Integer version; private final CompoundProcessor compoundProcessor; - private final IngestMetric metrics; + private final OperationMetrics metrics; private final LongSupplier relativeTimeProvider; public Pipeline(String id, @Nullable String description, @Nullable Integer version, CompoundProcessor compoundProcessor) { @@ -82,7 +83,7 @@ public Pipeline(String id, @Nullable String description, @Nullable Integer versi this.description = description; this.compoundProcessor = compoundProcessor; this.version = version; - this.metrics = new IngestMetric(); + this.metrics = new OperationMetrics(); this.relativeTimeProvider = relativeTimeProvider; } @@ -129,12 +130,12 @@ public static Pipeline create( */ public void execute(IngestDocument ingestDocument, BiConsumer handler) { final long startTimeInNanos = relativeTimeProvider.getAsLong(); - metrics.preIngest(); + metrics.before(); compoundProcessor.execute(ingestDocument, (result, e) -> { long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos); - metrics.postIngest(ingestTimeInMillis); + metrics.after(ingestTimeInMillis); if (e != null) { - metrics.ingestFailed(); + metrics.failed(); } handler.accept(result, e); }); @@ -198,7 +199,7 @@ public List flattenAllProcessors() { /** * The metrics associated with this pipeline. */ - public IngestMetric getMetrics() { + public OperationMetrics getMetrics() { return metrics; } } diff --git a/server/src/main/java/org/opensearch/node/NodeService.java b/server/src/main/java/org/opensearch/node/NodeService.java index 9382746081c18..6f4fe1e083ad7 100644 --- a/server/src/main/java/org/opensearch/node/NodeService.java +++ b/server/src/main/java/org/opensearch/node/NodeService.java @@ -216,7 +216,8 @@ public NodeStats stats( boolean clusterManagerThrottling, boolean weightedRoutingStats, boolean fileCacheStats, - boolean taskCancellation + boolean taskCancellation, + boolean searchPipelineStats ) { // for indices stats we want to include previous allocated shards stats as well (it will // only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats) @@ -243,7 +244,8 @@ public NodeStats stats( clusterManagerThrottling ? this.clusterService.getClusterManagerService().getThrottlingStats() : null, weightedRoutingStats ? WeightedRoutingStats.getInstance() : null, fileCacheStats && fileCache != null ? fileCache.fileCacheStats() : null, - taskCancellation ? this.taskCancellationMonitoringService.stats() : null + taskCancellation ? this.taskCancellationMonitoringService.stats() : null, + searchPipelineStats ? this.searchPipelineService.stats() : null ); } diff --git a/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java b/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java index c9a5f865d507e..6f44daf48ed21 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java +++ b/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java @@ -8,7 +8,6 @@ package org.opensearch.search.pipeline; -import org.opensearch.OpenSearchParseException; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; import org.opensearch.common.Nullable; @@ -16,17 +15,11 @@ import org.opensearch.common.io.stream.NamedWriteableAwareStreamInput; import org.opensearch.common.io.stream.NamedWriteableRegistry; import org.opensearch.common.io.stream.StreamInput; -import org.opensearch.ingest.ConfigurationUtils; -import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.Map; - -import static org.opensearch.ingest.ConfigurationUtils.TAG_KEY; -import static org.opensearch.ingest.Pipeline.DESCRIPTION_KEY; -import static org.opensearch.ingest.Pipeline.VERSION_KEY; +import java.util.concurrent.TimeUnit; +import java.util.function.LongSupplier; /** * Concrete representation of a search pipeline, holding multiple processors. @@ -45,73 +38,24 @@ class Pipeline { private final List searchResponseProcessors; private final NamedWriteableRegistry namedWriteableRegistry; + private final LongSupplier relativeTimeSupplier; - private Pipeline( + Pipeline( String id, @Nullable String description, @Nullable Integer version, List requestProcessors, List responseProcessors, - NamedWriteableRegistry namedWriteableRegistry + NamedWriteableRegistry namedWriteableRegistry, + LongSupplier relativeTimeSupplier ) { this.id = id; this.description = description; this.version = version; - this.searchRequestProcessors = requestProcessors; - this.searchResponseProcessors = responseProcessors; + this.searchRequestProcessors = Collections.unmodifiableList(requestProcessors); + this.searchResponseProcessors = Collections.unmodifiableList(responseProcessors); this.namedWriteableRegistry = namedWriteableRegistry; - } - - static Pipeline create( - String id, - Map config, - Map> requestProcessorFactories, - Map> responseProcessorFactories, - NamedWriteableRegistry namedWriteableRegistry - ) throws Exception { - String description = ConfigurationUtils.readOptionalStringProperty(null, null, config, DESCRIPTION_KEY); - Integer version = ConfigurationUtils.readIntProperty(null, null, config, VERSION_KEY, null); - List> requestProcessorConfigs = ConfigurationUtils.readOptionalList(null, null, config, REQUEST_PROCESSORS_KEY); - List requestProcessors = readProcessors(requestProcessorFactories, requestProcessorConfigs); - List> responseProcessorConfigs = ConfigurationUtils.readOptionalList( - null, - null, - config, - RESPONSE_PROCESSORS_KEY - ); - List responseProcessors = readProcessors(responseProcessorFactories, responseProcessorConfigs); - if (config.isEmpty() == false) { - throw new OpenSearchParseException( - "pipeline [" - + id - + "] doesn't support one or more provided configuration parameters " - + Arrays.toString(config.keySet().toArray()) - ); - } - return new Pipeline(id, description, version, requestProcessors, responseProcessors, namedWriteableRegistry); - } - - private static List readProcessors( - Map> processorFactories, - List> requestProcessorConfigs - ) throws Exception { - List processors = new ArrayList<>(); - if (requestProcessorConfigs == null) { - return processors; - } - for (Map processorConfigWithKey : requestProcessorConfigs) { - for (Map.Entry entry : processorConfigWithKey.entrySet()) { - String type = entry.getKey(); - if (!processorFactories.containsKey(type)) { - throw new IllegalArgumentException("Invalid processor type " + type); - } - Map config = (Map) entry.getValue(); - String tag = ConfigurationUtils.readOptionalStringProperty(null, null, config, TAG_KEY); - String description = ConfigurationUtils.readOptionalStringProperty(null, tag, config, DESCRIPTION_KEY); - processors.add(processorFactories.get(type).create(processorFactories, tag, description, config)); - } - } - return Collections.unmodifiableList(processors); + this.relativeTimeSupplier = relativeTimeSupplier; } String getId() { @@ -134,32 +78,94 @@ List getSearchResponseProcessors() { return searchResponseProcessors; } - SearchRequest transformRequest(SearchRequest request) throws Exception { + protected void beforeTransformRequest() {} + + protected void afterTransformRequest(long timeInNanos) {} + + protected void onTransformRequestFailure() {} + + protected void beforeRequestProcessor(Processor processor) {} + + protected void afterRequestProcessor(Processor processor, long timeInNanos) {} + + protected void onRequestProcessorFailed(Processor processor) {} + + protected void beforeTransformResponse() {} + + protected void afterTransformResponse(long timeInNanos) {} + + protected void onTransformResponseFailure() {} + + protected void beforeResponseProcessor(Processor processor) {} + + protected void afterResponseProcessor(Processor processor, long timeInNanos) {} + + protected void onResponseProcessorFailed(Processor processor) {} + + SearchRequest transformRequest(SearchRequest request) throws SearchPipelineProcessingException { if (searchRequestProcessors.isEmpty() == false) { - try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) { - request.writeTo(bytesStreamOutput); - try (StreamInput in = bytesStreamOutput.bytes().streamInput()) { - try (StreamInput input = new NamedWriteableAwareStreamInput(in, namedWriteableRegistry)) { - request = new SearchRequest(input); + long pipelineStart = relativeTimeSupplier.getAsLong(); + beforeTransformRequest(); + try { + try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) { + request.writeTo(bytesStreamOutput); + try (StreamInput in = bytesStreamOutput.bytes().streamInput()) { + try (StreamInput input = new NamedWriteableAwareStreamInput(in, namedWriteableRegistry)) { + request = new SearchRequest(input); + } } } - } - for (SearchRequestProcessor searchRequestProcessor : searchRequestProcessors) { - request = searchRequestProcessor.processRequest(request); + for (SearchRequestProcessor processor : searchRequestProcessors) { + beforeRequestProcessor(processor); + long start = relativeTimeSupplier.getAsLong(); + try { + request = processor.processRequest(request); + } catch (Exception e) { + onRequestProcessorFailed(processor); + throw e; + } finally { + long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start); + afterRequestProcessor(processor, took); + } + } + } catch (Exception e) { + onTransformRequestFailure(); + throw new SearchPipelineProcessingException(e); + } finally { + long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - pipelineStart); + afterTransformRequest(took); } } return request; } SearchResponse transformResponse(SearchRequest request, SearchResponse response) throws SearchPipelineProcessingException { - try { - for (SearchResponseProcessor responseProcessor : searchResponseProcessors) { - response = responseProcessor.processResponse(request, response); + if (searchResponseProcessors.isEmpty() == false) { + long pipelineStart = relativeTimeSupplier.getAsLong(); + beforeTransformResponse(); + try { + for (SearchResponseProcessor processor : searchResponseProcessors) { + beforeResponseProcessor(processor); + long start = relativeTimeSupplier.getAsLong(); + try { + response = processor.processResponse(request, response); + } catch (Exception e) { + onResponseProcessorFailed(processor); + throw e; + } finally { + long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start); + afterResponseProcessor(processor, took); + } + } + } catch (Exception e) { + onTransformResponseFailure(); + throw new SearchPipelineProcessingException(e); + } finally { + long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - pipelineStart); + afterTransformResponse(took); } - return response; - } catch (Exception e) { - throw new SearchPipelineProcessingException(e); } + return response; } static final Pipeline NO_OP_PIPELINE = new Pipeline( @@ -168,6 +174,8 @@ SearchResponse transformResponse(SearchRequest request, SearchResponse response) 0, Collections.emptyList(), Collections.emptyList(), - null + null, + () -> 0L ); + } diff --git a/server/src/main/java/org/opensearch/search/pipeline/PipelineWithMetrics.java b/server/src/main/java/org/opensearch/search/pipeline/PipelineWithMetrics.java new file mode 100644 index 0000000000000..662473f190006 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/pipeline/PipelineWithMetrics.java @@ -0,0 +1,227 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.pipeline; + +import org.opensearch.OpenSearchParseException; +import org.opensearch.common.io.stream.NamedWriteableRegistry; +import org.opensearch.common.metrics.OperationMetrics; +import org.opensearch.ingest.ConfigurationUtils; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.LongSupplier; + +import static org.opensearch.ingest.ConfigurationUtils.TAG_KEY; +import static org.opensearch.ingest.Pipeline.DESCRIPTION_KEY; +import static org.opensearch.ingest.Pipeline.VERSION_KEY; + +/** + * Specialization of {@link Pipeline} that adds metrics to track executions of the pipeline and individual processors. + */ +class PipelineWithMetrics extends Pipeline { + + private final OperationMetrics totalRequestMetrics; + private final OperationMetrics totalResponseMetrics; + private final OperationMetrics pipelineRequestMetrics = new OperationMetrics(); + private final OperationMetrics pipelineResponseMetrics = new OperationMetrics(); + private final Map requestProcessorMetrics = new HashMap<>(); + private final Map responseProcessorMetrics = new HashMap<>(); + + PipelineWithMetrics( + String id, + String description, + Integer version, + List requestProcessors, + List responseProcessors, + NamedWriteableRegistry namedWriteableRegistry, + OperationMetrics totalRequestMetrics, + OperationMetrics totalResponseMetrics, + LongSupplier relativeTimeSupplier + ) { + super(id, description, version, requestProcessors, responseProcessors, namedWriteableRegistry, relativeTimeSupplier); + this.totalRequestMetrics = totalRequestMetrics; + this.totalResponseMetrics = totalResponseMetrics; + for (Processor requestProcessor : getSearchRequestProcessors()) { + requestProcessorMetrics.putIfAbsent(getProcessorKey(requestProcessor), new OperationMetrics()); + } + for (Processor responseProcessor : getSearchResponseProcessors()) { + responseProcessorMetrics.putIfAbsent(getProcessorKey(responseProcessor), new OperationMetrics()); + } + } + + static PipelineWithMetrics create( + String id, + Map config, + Map> requestProcessorFactories, + Map> responseProcessorFactories, + NamedWriteableRegistry namedWriteableRegistry, + OperationMetrics totalRequestProcessingMetrics, + OperationMetrics totalResponseProcessingMetrics + ) throws Exception { + String description = ConfigurationUtils.readOptionalStringProperty(null, null, config, DESCRIPTION_KEY); + Integer version = ConfigurationUtils.readIntProperty(null, null, config, VERSION_KEY, null); + List> requestProcessorConfigs = ConfigurationUtils.readOptionalList(null, null, config, REQUEST_PROCESSORS_KEY); + List requestProcessors = readProcessors(requestProcessorFactories, requestProcessorConfigs); + List> responseProcessorConfigs = ConfigurationUtils.readOptionalList( + null, + null, + config, + RESPONSE_PROCESSORS_KEY + ); + List responseProcessors = readProcessors(responseProcessorFactories, responseProcessorConfigs); + if (config.isEmpty() == false) { + throw new OpenSearchParseException( + "pipeline [" + + id + + "] doesn't support one or more provided configuration parameters " + + Arrays.toString(config.keySet().toArray()) + ); + } + return new PipelineWithMetrics( + id, + description, + version, + requestProcessors, + responseProcessors, + namedWriteableRegistry, + totalRequestProcessingMetrics, + totalResponseProcessingMetrics, + System::nanoTime + ); + + } + + private static List readProcessors( + Map> processorFactories, + List> requestProcessorConfigs + ) throws Exception { + List processors = new ArrayList<>(); + if (requestProcessorConfigs == null) { + return processors; + } + for (Map processorConfigWithKey : requestProcessorConfigs) { + for (Map.Entry entry : processorConfigWithKey.entrySet()) { + String type = entry.getKey(); + if (!processorFactories.containsKey(type)) { + throw new IllegalArgumentException("Invalid processor type " + type); + } + Map config = (Map) entry.getValue(); + String tag = ConfigurationUtils.readOptionalStringProperty(null, null, config, TAG_KEY); + String description = ConfigurationUtils.readOptionalStringProperty(null, tag, config, DESCRIPTION_KEY); + processors.add(processorFactories.get(type).create(processorFactories, tag, description, config)); + } + } + return Collections.unmodifiableList(processors); + } + + @Override + protected void beforeTransformRequest() { + super.beforeTransformRequest(); + totalRequestMetrics.before(); + pipelineRequestMetrics.before(); + } + + @Override + protected void afterTransformRequest(long timeInNanos) { + super.afterTransformRequest(timeInNanos); + totalRequestMetrics.after(timeInNanos); + pipelineRequestMetrics.after(timeInNanos); + } + + @Override + protected void onTransformRequestFailure() { + super.onTransformRequestFailure(); + totalRequestMetrics.failed(); + pipelineRequestMetrics.failed(); + } + + protected void beforeRequestProcessor(Processor processor) { + requestProcessorMetrics.get(getProcessorKey(processor)).before(); + } + + protected void afterRequestProcessor(Processor processor, long timeInNanos) { + requestProcessorMetrics.get(getProcessorKey(processor)).after(timeInNanos); + } + + protected void onRequestProcessorFailed(Processor processor) { + requestProcessorMetrics.get(getProcessorKey(processor)).failed(); + } + + protected void beforeTransformResponse() { + super.beforeTransformRequest(); + totalResponseMetrics.before(); + pipelineResponseMetrics.before(); + } + + protected void afterTransformResponse(long timeInNanos) { + super.afterTransformResponse(timeInNanos); + totalResponseMetrics.after(timeInNanos); + pipelineResponseMetrics.after(timeInNanos); + } + + protected void onTransformResponseFailure() { + super.onTransformResponseFailure(); + totalResponseMetrics.failed(); + pipelineResponseMetrics.failed(); + } + + protected void beforeResponseProcessor(Processor processor) { + responseProcessorMetrics.get(getProcessorKey(processor)).before(); + } + + protected void afterResponseProcessor(Processor processor, long timeInNanos) { + responseProcessorMetrics.get(getProcessorKey(processor)).after(timeInNanos); + } + + protected void onResponseProcessorFailed(Processor processor) { + responseProcessorMetrics.get(getProcessorKey(processor)).failed(); + } + + void copyMetrics(PipelineWithMetrics oldPipeline) { + pipelineRequestMetrics.add(oldPipeline.pipelineRequestMetrics); + pipelineResponseMetrics.add(oldPipeline.pipelineResponseMetrics); + copyProcessorMetrics(requestProcessorMetrics, oldPipeline.requestProcessorMetrics); + copyProcessorMetrics(responseProcessorMetrics, oldPipeline.responseProcessorMetrics); + } + + private static void copyProcessorMetrics( + Map newProcessorMetrics, + Map oldProcessorMetrics + ) { + for (Map.Entry oldProcessorMetric : oldProcessorMetrics.entrySet()) { + if (newProcessorMetrics.containsKey(oldProcessorMetric.getKey())) { + newProcessorMetrics.get(oldProcessorMetric.getKey()).add(oldProcessorMetric.getValue()); + } + } + } + + private static String getProcessorKey(Processor processor) { + String key = processor.getType(); + if (processor.getTag() != null) { + return key + ":" + processor.getTag(); + } + return key; + } + + void populateStats(SearchPipelineStats.Builder statsBuilder) { + statsBuilder.addPipelineStats(getId(), pipelineRequestMetrics, pipelineResponseMetrics); + for (Processor processor : getSearchRequestProcessors()) { + String key = getProcessorKey(processor); + statsBuilder.addRequestProcessorStats(getId(), key, processor.getType(), requestProcessorMetrics.get(key)); + } + for (Processor processor : getSearchResponseProcessors()) { + String key = getProcessorKey(processor); + statsBuilder.addResponseProcessorStats(getId(), key, processor.getType(), responseProcessorMetrics.get(key)); + } + } +} diff --git a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java index 87c09bd971284..434c8fbfacc74 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java +++ b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java @@ -30,6 +30,7 @@ import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.io.stream.NamedWriteableRegistry; +import org.opensearch.common.metrics.OperationMetrics; import org.opensearch.common.regex.Regex; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; @@ -80,6 +81,9 @@ public class SearchPipelineService implements ClusterStateApplier, ReportingServ private final NamedWriteableRegistry namedWriteableRegistry; private volatile ClusterState state; + private final OperationMetrics totalRequestProcessingMetrics = new OperationMetrics(); + private final OperationMetrics totalResponseProcessingMetrics = new OperationMetrics(); + private final boolean isEnabled; public SearchPipelineService( @@ -172,26 +176,26 @@ void innerUpdatePipelines(SearchPipelineMetadata newSearchPipelineMetadata) { newPipelines = new HashMap<>(existingPipelines); } try { - Pipeline newPipeline = Pipeline.create( + PipelineWithMetrics newPipeline = PipelineWithMetrics.create( newConfiguration.getId(), newConfiguration.getConfigAsMap(), requestProcessorFactories, responseProcessorFactories, - namedWriteableRegistry + namedWriteableRegistry, + totalRequestProcessingMetrics, + totalResponseProcessingMetrics ); newPipelines.put(newConfiguration.getId(), new PipelineHolder(newConfiguration, newPipeline)); - if (previous == null) { - continue; + if (previous != null) { + newPipeline.copyMetrics(previous.pipeline); } - // TODO -- once we add in pipeline metrics (like in ingest pipelines), we will need to deep-copy - // the old pipeline's metrics into the new pipeline. } catch (Exception e) { OpenSearchParseException parseException = new OpenSearchParseException( "Error updating pipeline with id [" + newConfiguration.getId() + "]", e ); - // TODO -- replace pipeline with one that throws an exception when we try to use it + // TODO -- replace pipeline with one that throws this exception when we try to use it if (exceptions == null) { exceptions = new ArrayList<>(); } @@ -271,12 +275,14 @@ void validatePipeline(Map searchPipelineInfos throw new IllegalStateException("Search pipeline info is empty"); } Map pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2(); - Pipeline pipeline = Pipeline.create( + Pipeline pipeline = PipelineWithMetrics.create( request.getId(), pipelineConfig, requestProcessorFactories, responseProcessorFactories, - namedWriteableRegistry + namedWriteableRegistry, + new OperationMetrics(), // Use ephemeral metrics for validation + new OperationMetrics() ); List exceptions = new ArrayList<>(); for (SearchRequestProcessor processor : pipeline.getSearchRequestProcessors()) { @@ -367,12 +373,14 @@ public PipelinedRequest resolvePipeline(SearchRequest searchRequest) throws Exce ); } try { - pipeline = Pipeline.create( + pipeline = PipelineWithMetrics.create( AD_HOC_PIPELINE_ID, searchRequest.source().searchPipelineSource(), requestProcessorFactories, responseProcessorFactories, - namedWriteableRegistry + namedWriteableRegistry, + totalRequestProcessingMetrics, + totalResponseProcessingMetrics ); } catch (Exception e) { throw new SearchPipelineProcessingException(e); @@ -400,12 +408,8 @@ public PipelinedRequest resolvePipeline(SearchRequest searchRequest) throws Exce pipeline = pipelineHolder.pipeline; } } - try { - SearchRequest transformedRequest = pipeline.transformRequest(searchRequest); - return new PipelinedRequest(pipeline, transformedRequest); - } catch (Exception e) { - throw new SearchPipelineProcessingException(e); - } + SearchRequest transformedRequest = pipeline.transformRequest(searchRequest); + return new PipelinedRequest(pipeline, transformedRequest); } Map> getRequestProcessorFactories() { @@ -431,6 +435,16 @@ public SearchPipelineInfo info() { ); } + public SearchPipelineStats stats() { + SearchPipelineStats.Builder builder = new SearchPipelineStats.Builder(); + builder.withTotalStats(totalRequestProcessingMetrics, totalResponseProcessingMetrics); + for (PipelineHolder pipelineHolder : pipelines.values()) { + PipelineWithMetrics pipeline = pipelineHolder.pipeline; + pipeline.populateStats(builder); + } + return builder.build(); + } + public static List getPipelines(ClusterState clusterState, String... ids) { SearchPipelineMetadata metadata = clusterState.getMetadata().custom(SearchPipelineMetadata.TYPE); return innerGetPipelines(metadata, ids); @@ -474,9 +488,9 @@ Map getPipelines() { static class PipelineHolder { final PipelineConfiguration configuration; - final Pipeline pipeline; + final PipelineWithMetrics pipeline; - PipelineHolder(PipelineConfiguration configuration, Pipeline pipeline) { + PipelineHolder(PipelineConfiguration configuration, PipelineWithMetrics pipeline) { this.configuration = Objects.requireNonNull(configuration); this.pipeline = Objects.requireNonNull(pipeline); } diff --git a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineStats.java b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineStats.java new file mode 100644 index 0000000000000..4261bfe99160a --- /dev/null +++ b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineStats.java @@ -0,0 +1,367 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.pipeline; + +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.common.metrics.OperationMetrics; +import org.opensearch.common.metrics.OperationStats; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.TreeMap; + +import static java.util.Collections.emptyList; +import static java.util.Collections.unmodifiableList; +import static java.util.Collections.unmodifiableMap; + +/** + * Serializable, immutable search pipeline statistics to be returned via stats APIs. + * + * @opensearch.internal + */ +public class SearchPipelineStats implements Writeable, ToXContentFragment { + + private final OperationStats totalRequestStats; + private final OperationStats totalResponseStats; + private final List perPipelineStats; + private final Map perPipelineProcessorStats; + + public SearchPipelineStats( + OperationStats totalRequestStats, + OperationStats totalResponseStats, + List perPipelineStats, + Map perPipelineProcessorStats + ) { + this.totalRequestStats = totalRequestStats; + this.totalResponseStats = totalResponseStats; + this.perPipelineStats = perPipelineStats; + this.perPipelineProcessorStats = perPipelineProcessorStats; + } + + public SearchPipelineStats(StreamInput in) throws IOException { + this.totalRequestStats = new OperationStats(in); + this.totalResponseStats = new OperationStats(in); + int size = in.readVInt(); + List perPipelineStats = new ArrayList<>(size); + Map pipelineDetailStatsMap = new TreeMap<>(); + for (int i = 0; i < size; i++) { + String pipelineId = in.readString(); + OperationStats pipelineRequestStats = new OperationStats(in); + OperationStats pipelineResponseStats = new OperationStats(in); + perPipelineStats.add(new PerPipelineStats(pipelineId, pipelineRequestStats, pipelineResponseStats)); + int numRequestProcessors = in.readVInt(); + List requestProcessorStats = new ArrayList<>(numRequestProcessors); + for (int j = 0; j < numRequestProcessors; j++) { + String processorName = in.readString(); + String processorType = in.readString(); + OperationStats processorStats = new OperationStats(in); + requestProcessorStats.add(new ProcessorStats(processorName, processorType, processorStats)); + } + int numResponseProcessors = in.readVInt(); + List responseProcessorStats = new ArrayList<>(numResponseProcessors); + for (int j = 0; j < numResponseProcessors; j++) { + String processorName = in.readString(); + String processorType = in.readString(); + OperationStats processorStats = new OperationStats(in); + responseProcessorStats.add(new ProcessorStats(processorName, processorType, processorStats)); + } + pipelineDetailStatsMap.put( + pipelineId, + new PipelineDetailStats(unmodifiableList(requestProcessorStats), unmodifiableList(responseProcessorStats)) + ); + } + this.perPipelineStats = unmodifiableList(perPipelineStats); + this.perPipelineProcessorStats = unmodifiableMap(pipelineDetailStatsMap); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject("search_pipeline"); + builder.startObject("total_request"); + totalRequestStats.toXContent(builder, params); + builder.endObject(); + builder.startObject("total_response"); + totalResponseStats.toXContent(builder, params); + builder.endObject(); + builder.startObject("pipelines"); + for (PerPipelineStats pipelineStat : perPipelineStats) { + builder.startObject(pipelineStat.pipelineId); + builder.startObject("request"); + pipelineStat.requestStats.toXContent(builder, params); + builder.endObject(); + builder.startObject("response"); + pipelineStat.responseStats.toXContent(builder, params); + builder.endObject(); + + PipelineDetailStats pipelineDetailStats = perPipelineProcessorStats.get(pipelineStat.pipelineId); + builder.startArray("request_processors"); + for (ProcessorStats processorStats : pipelineDetailStats.requestProcessorStats) { + builder.startObject(); + processorStats.toXContent(builder, params); + builder.endObject(); + } + builder.endArray(); + builder.startArray("response_processors"); + for (ProcessorStats processorStats : pipelineDetailStats.responseProcessorStats) { + builder.startObject(); + processorStats.toXContent(builder, params); + builder.endObject(); + } + builder.endArray(); + builder.endObject(); + } + builder.endObject(); + builder.endObject(); + return builder; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + totalRequestStats.writeTo(out); + totalResponseStats.writeTo(out); + out.writeVInt(perPipelineStats.size()); + for (PerPipelineStats pipelineStat : perPipelineStats) { + out.writeString(pipelineStat.pipelineId); + pipelineStat.requestStats.writeTo(out); + pipelineStat.responseStats.writeTo(out); + PipelineDetailStats pipelineDetailStats = perPipelineProcessorStats.get(pipelineStat.pipelineId); + out.writeVInt(pipelineDetailStats.requestProcessorStats.size()); + for (ProcessorStats processorStats : pipelineDetailStats.requestProcessorStats) { + out.writeString(processorStats.processorName); + out.writeString(processorStats.processorType); + processorStats.stats.writeTo(out); + } + out.writeVInt(pipelineDetailStats.responseProcessorStats.size()); + for (ProcessorStats processorStats : pipelineDetailStats.responseProcessorStats) { + out.writeString(processorStats.processorName); + out.writeString(processorStats.processorType); + processorStats.stats.writeTo(out); + } + } + } + + static class Builder { + private OperationStats totalRequestStats; + private OperationStats totalResponseStats; + private final List perPipelineStats = new ArrayList<>(); + private final Map> requestProcessorStatsPerPipeline = new HashMap<>(); + private final Map> responseProcessorStatsPerPipeline = new HashMap<>(); + + Builder withTotalStats(OperationMetrics totalRequestMetrics, OperationMetrics totalResponseMetrics) { + this.totalRequestStats = totalRequestMetrics.createStats(); + this.totalResponseStats = totalResponseMetrics.createStats(); + return this; + } + + Builder addPipelineStats(String pipelineId, OperationMetrics pipelineRequestMetrics, OperationMetrics pipelineResponseMetrics) { + this.perPipelineStats.add( + new PerPipelineStats(pipelineId, pipelineRequestMetrics.createStats(), pipelineResponseMetrics.createStats()) + ); + return this; + } + + Builder addRequestProcessorStats(String pipelineId, String processorName, String processorType, OperationMetrics processorMetrics) { + this.requestProcessorStatsPerPipeline.computeIfAbsent(pipelineId, k -> new ArrayList<>()) + .add(new ProcessorStats(processorName, processorType, processorMetrics.createStats())); + return this; + } + + Builder addResponseProcessorStats( + String pipelineId, + String processorName, + String processorType, + OperationMetrics processorMetrics + ) { + this.responseProcessorStatsPerPipeline.computeIfAbsent(pipelineId, k -> new ArrayList<>()) + .add(new ProcessorStats(processorName, processorType, processorMetrics.createStats())); + return this; + } + + SearchPipelineStats build() { + Map pipelineDetailStatsMap = new TreeMap<>(); + for (PerPipelineStats pipelineStat : perPipelineStats) { + List requestProcessorStats = requestProcessorStatsPerPipeline.getOrDefault( + pipelineStat.pipelineId, + emptyList() + ); + List responseProcessorStats = responseProcessorStatsPerPipeline.getOrDefault( + pipelineStat.pipelineId, + emptyList() + ); + PipelineDetailStats pipelineDetailStats = new PipelineDetailStats( + unmodifiableList(requestProcessorStats), + unmodifiableList(responseProcessorStats) + ); + pipelineDetailStatsMap.put(pipelineStat.pipelineId, pipelineDetailStats); + } + return new SearchPipelineStats( + totalRequestStats, + totalResponseStats, + unmodifiableList(perPipelineStats), + unmodifiableMap(pipelineDetailStatsMap) + ); + } + } + + static class PerPipelineStats { + private final String pipelineId; + private final OperationStats requestStats; + private final OperationStats responseStats; + + public PerPipelineStats(String pipelineId, OperationStats requestStats, OperationStats responseStats) { + this.pipelineId = pipelineId; + this.requestStats = requestStats; + this.responseStats = responseStats; + } + + public String getPipelineId() { + return pipelineId; + } + + public OperationStats getRequestStats() { + return requestStats; + } + + public OperationStats getResponseStats() { + return responseStats; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + PerPipelineStats that = (PerPipelineStats) o; + return pipelineId.equals(that.pipelineId) && requestStats.equals(that.requestStats) && responseStats.equals(that.responseStats); + } + + @Override + public int hashCode() { + return Objects.hash(pipelineId, requestStats, responseStats); + } + } + + static class PipelineDetailStats { + private final List requestProcessorStats; + private final List responseProcessorStats; + + public PipelineDetailStats(List requestProcessorStats, List responseProcessorStats) { + this.requestProcessorStats = requestProcessorStats; + this.responseProcessorStats = responseProcessorStats; + } + + public List requestProcessorStats() { + return requestProcessorStats; + } + + public List responseProcessorStats() { + return responseProcessorStats; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + PipelineDetailStats that = (PipelineDetailStats) o; + return requestProcessorStats.equals(that.requestProcessorStats) && responseProcessorStats.equals(that.responseProcessorStats); + } + + @Override + public int hashCode() { + return Objects.hash(requestProcessorStats, responseProcessorStats); + } + } + + static class ProcessorStats implements ToXContentFragment { + private final String processorName; // type:tag + private final String processorType; + private final OperationStats stats; + + public ProcessorStats(String processorName, String processorType, OperationStats stats) { + this.processorName = processorName; + this.processorType = processorType; + this.stats = stats; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ProcessorStats that = (ProcessorStats) o; + return processorName.equals(that.processorName) && processorType.equals(that.processorType) && stats.equals(that.stats); + } + + @Override + public int hashCode() { + return Objects.hash(processorName, processorType, stats); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(processorName); + builder.field("type", processorType); + builder.startObject("stats"); + stats.toXContent(builder, params); + builder.endObject(); + builder.endObject(); + return builder; + } + + String getProcessorName() { + return processorName; + } + + String getProcessorType() { + return processorType; + } + + OperationStats getStats() { + return stats; + } + } + + OperationStats getTotalRequestStats() { + return totalRequestStats; + } + + OperationStats getTotalResponseStats() { + return totalResponseStats; + } + + List getPipelineStats() { + return perPipelineStats; + } + + Map getPerPipelineProcessorStats() { + return perPipelineProcessorStats; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + SearchPipelineStats stats = (SearchPipelineStats) o; + return totalRequestStats.equals(stats.totalRequestStats) + && totalResponseStats.equals(stats.totalResponseStats) + && perPipelineStats.equals(stats.perPipelineStats) + && perPipelineProcessorStats.equals(stats.perPipelineProcessorStats); + } + + @Override + public int hashCode() { + return Objects.hash(totalRequestStats, totalResponseStats, perPipelineStats, perPipelineProcessorStats); + } +} diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java index d99b93b780140..1e2e085333e50 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -37,6 +37,7 @@ import org.opensearch.cluster.service.ClusterManagerThrottlingStats; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.metrics.OperationStats; import org.opensearch.discovery.DiscoveryStats; import org.opensearch.cluster.coordination.PendingClusterStateStats; import org.opensearch.cluster.coordination.PublishClusterStateStats; @@ -338,40 +339,31 @@ public void testSerialization() throws IOException { if (ingestStats == null) { assertNull(deserializedIngestStats); } else { - IngestStats.Stats totalStats = ingestStats.getTotalStats(); - assertEquals(totalStats.getIngestCount(), deserializedIngestStats.getTotalStats().getIngestCount()); - assertEquals(totalStats.getIngestCurrent(), deserializedIngestStats.getTotalStats().getIngestCurrent()); - assertEquals(totalStats.getIngestFailedCount(), deserializedIngestStats.getTotalStats().getIngestFailedCount()); - assertEquals(totalStats.getIngestTimeInMillis(), deserializedIngestStats.getTotalStats().getIngestTimeInMillis()); + OperationStats totalStats = ingestStats.getTotalStats(); + assertEquals(totalStats.getCount(), deserializedIngestStats.getTotalStats().getCount()); + assertEquals(totalStats.getCurrent(), deserializedIngestStats.getTotalStats().getCurrent()); + assertEquals(totalStats.getFailedCount(), deserializedIngestStats.getTotalStats().getFailedCount()); + assertEquals(totalStats.getTotalTimeInMillis(), deserializedIngestStats.getTotalStats().getTotalTimeInMillis()); assertEquals(ingestStats.getPipelineStats().size(), deserializedIngestStats.getPipelineStats().size()); for (IngestStats.PipelineStat pipelineStat : ingestStats.getPipelineStats()) { String pipelineId = pipelineStat.getPipelineId(); - IngestStats.Stats deserializedPipelineStats = getPipelineStats( - deserializedIngestStats.getPipelineStats(), - pipelineId - ); - assertEquals(pipelineStat.getStats().getIngestFailedCount(), deserializedPipelineStats.getIngestFailedCount()); - assertEquals(pipelineStat.getStats().getIngestTimeInMillis(), deserializedPipelineStats.getIngestTimeInMillis()); - assertEquals(pipelineStat.getStats().getIngestCurrent(), deserializedPipelineStats.getIngestCurrent()); - assertEquals(pipelineStat.getStats().getIngestCount(), deserializedPipelineStats.getIngestCount()); + OperationStats deserializedPipelineStats = getPipelineStats(deserializedIngestStats.getPipelineStats(), pipelineId); + assertEquals(pipelineStat.getStats().getFailedCount(), deserializedPipelineStats.getFailedCount()); + assertEquals(pipelineStat.getStats().getTotalTimeInMillis(), deserializedPipelineStats.getTotalTimeInMillis()); + assertEquals(pipelineStat.getStats().getCurrent(), deserializedPipelineStats.getCurrent()); + assertEquals(pipelineStat.getStats().getCount(), deserializedPipelineStats.getCount()); List processorStats = ingestStats.getProcessorStats().get(pipelineId); // intentionally validating identical order Iterator it = deserializedIngestStats.getProcessorStats().get(pipelineId).iterator(); for (IngestStats.ProcessorStat processorStat : processorStats) { IngestStats.ProcessorStat deserializedProcessorStat = it.next(); + assertEquals(processorStat.getStats().getFailedCount(), deserializedProcessorStat.getStats().getFailedCount()); assertEquals( - processorStat.getStats().getIngestFailedCount(), - deserializedProcessorStat.getStats().getIngestFailedCount() - ); - assertEquals( - processorStat.getStats().getIngestTimeInMillis(), - deserializedProcessorStat.getStats().getIngestTimeInMillis() + processorStat.getStats().getTotalTimeInMillis(), + deserializedProcessorStat.getStats().getTotalTimeInMillis() ); - assertEquals( - processorStat.getStats().getIngestCurrent(), - deserializedProcessorStat.getStats().getIngestCurrent() - ); - assertEquals(processorStat.getStats().getIngestCount(), deserializedProcessorStat.getStats().getIngestCount()); + assertEquals(processorStat.getStats().getCurrent(), deserializedProcessorStat.getStats().getCurrent()); + assertEquals(processorStat.getStats().getCount(), deserializedProcessorStat.getStats().getCount()); } assertFalse(it.hasNext()); } @@ -650,7 +642,7 @@ public static NodeStats createNodeStats() { : null; IngestStats ingestStats = null; if (frequently()) { - IngestStats.Stats totalStats = new IngestStats.Stats( + OperationStats totalStats = new OperationStats( randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), @@ -665,7 +657,7 @@ public static NodeStats createNodeStats() { ingestPipelineStats.add( new IngestStats.PipelineStat( pipelineId, - new IngestStats.Stats( + new OperationStats( randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), @@ -676,7 +668,7 @@ public static NodeStats createNodeStats() { List processorPerPipeline = new ArrayList<>(numProcessors); for (int j = 0; j < numProcessors; j++) { - IngestStats.Stats processorStats = new IngestStats.Stats( + OperationStats processorStats = new OperationStats( randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), @@ -750,11 +742,12 @@ public static NodeStats createNodeStats() { clusterManagerThrottlingStats, weightedRoutingStats, null, + null, null ); } - private IngestStats.Stats getPipelineStats(List pipelineStats, String id) { + private OperationStats getPipelineStats(List pipelineStats, String id) { return pipelineStats.stream().filter(p1 -> p1.getPipelineId().equals(id)).findFirst().map(p2 -> p2.getStats()).orElse(null); } } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodesTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodesTests.java index d3a40868bc389..627ada7092273 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodesTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodesTests.java @@ -89,15 +89,15 @@ public void testIngestStats() throws Exception { processorStats.compute(stat.getType(), (key, value) -> { if (value == null) { return new long[] { - stat.getStats().getIngestCount(), - stat.getStats().getIngestFailedCount(), - stat.getStats().getIngestCurrent(), - stat.getStats().getIngestTimeInMillis() }; + stat.getStats().getCount(), + stat.getStats().getFailedCount(), + stat.getStats().getCurrent(), + stat.getStats().getTotalTimeInMillis() }; } else { - value[0] += stat.getStats().getIngestCount(); - value[1] += stat.getStats().getIngestFailedCount(); - value[2] += stat.getStats().getIngestCurrent(); - value[3] += stat.getStats().getIngestTimeInMillis(); + value[0] += stat.getStats().getCount(); + value[1] += stat.getStats().getFailedCount(); + value[2] += stat.getStats().getCurrent(); + value[3] += stat.getStats().getTotalTimeInMillis(); return value; } }); diff --git a/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java b/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java index 73349d45bd5c7..e5833ea619774 100644 --- a/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java +++ b/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java @@ -189,6 +189,7 @@ public void testFillDiskUsage() { null, null, null, + null, null ), new NodeStats( @@ -214,6 +215,7 @@ public void testFillDiskUsage() { null, null, null, + null, null ), new NodeStats( @@ -239,6 +241,7 @@ public void testFillDiskUsage() { null, null, null, + null, null ) ); @@ -295,6 +298,7 @@ public void testFillDiskUsageSomeInvalidValues() { null, null, null, + null, null ), new NodeStats( @@ -320,6 +324,7 @@ public void testFillDiskUsageSomeInvalidValues() { null, null, null, + null, null ), new NodeStats( @@ -345,6 +350,7 @@ public void testFillDiskUsageSomeInvalidValues() { null, null, null, + null, null ) ); diff --git a/server/src/test/java/org/opensearch/ingest/CompoundProcessorTests.java b/server/src/test/java/org/opensearch/ingest/CompoundProcessorTests.java index b299ac4d66996..76301acac0c19 100644 --- a/server/src/test/java/org/opensearch/ingest/CompoundProcessorTests.java +++ b/server/src/test/java/org/opensearch/ingest/CompoundProcessorTests.java @@ -33,6 +33,7 @@ package org.opensearch.ingest; import org.opensearch.OpenSearchException; +import org.opensearch.common.metrics.OperationStats; import org.opensearch.test.OpenSearchTestCase; import org.junit.Before; @@ -433,10 +434,10 @@ private void assertStats(CompoundProcessor compoundProcessor, long count, long f } private void assertStats(int processor, CompoundProcessor compoundProcessor, long current, long count, long failed, long time) { - IngestStats.Stats stats = compoundProcessor.getProcessorsWithMetrics().get(processor).v2().createStats(); - assertThat(stats.getIngestCount(), equalTo(count)); - assertThat(stats.getIngestCurrent(), equalTo(current)); - assertThat(stats.getIngestFailedCount(), equalTo(failed)); - assertThat(stats.getIngestTimeInMillis(), equalTo(time)); + OperationStats stats = compoundProcessor.getProcessorsWithMetrics().get(processor).v2().createStats(); + assertThat(stats.getCount(), equalTo(count)); + assertThat(stats.getCurrent(), equalTo(current)); + assertThat(stats.getFailedCount(), equalTo(failed)); + assertThat(stats.getTotalTimeInMillis(), equalTo(time)); } } diff --git a/server/src/test/java/org/opensearch/ingest/ConditionalProcessorTests.java b/server/src/test/java/org/opensearch/ingest/ConditionalProcessorTests.java index a383ab9b97918..921ac10c02862 100644 --- a/server/src/test/java/org/opensearch/ingest/ConditionalProcessorTests.java +++ b/server/src/test/java/org/opensearch/ingest/ConditionalProcessorTests.java @@ -32,6 +32,7 @@ package org.opensearch.ingest; +import org.opensearch.common.metrics.OperationStats; import org.opensearch.common.settings.Settings; import org.opensearch.script.IngestConditionalScript; import org.opensearch.script.MockScriptEngine; @@ -250,10 +251,10 @@ private static void assertMutatingCtxThrows(Consumer> mutati } private static void assertStats(ConditionalProcessor conditionalProcessor, long count, long failed, long time) { - IngestStats.Stats stats = conditionalProcessor.getMetric().createStats(); - assertThat(stats.getIngestCount(), equalTo(count)); - assertThat(stats.getIngestCurrent(), equalTo(0L)); - assertThat(stats.getIngestFailedCount(), equalTo(failed)); - assertThat(stats.getIngestTimeInMillis(), greaterThanOrEqualTo(time)); + OperationStats stats = conditionalProcessor.getMetric().createStats(); + assertThat(stats.getCount(), equalTo(count)); + assertThat(stats.getCurrent(), equalTo(0L)); + assertThat(stats.getFailedCount(), equalTo(failed)); + assertThat(stats.getTotalTimeInMillis(), greaterThanOrEqualTo(time)); } } diff --git a/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java b/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java index 4176a32e32ad3..19fef468c529e 100644 --- a/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java @@ -58,6 +58,7 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.SetOnce; import org.opensearch.common.bytes.BytesArray; +import org.opensearch.common.metrics.OperationStats; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.core.xcontent.XContentBuilder; @@ -1739,14 +1740,14 @@ private void assertPipelineStats(List pipelineStats, S assertStats(getPipelineStats(pipelineStats, pipelineId), count, failed, time); } - private void assertStats(IngestStats.Stats stats, long count, long failed, long time) { - assertThat(stats.getIngestCount(), equalTo(count)); - assertThat(stats.getIngestCurrent(), equalTo(0L)); - assertThat(stats.getIngestFailedCount(), equalTo(failed)); - assertThat(stats.getIngestTimeInMillis(), greaterThanOrEqualTo(time)); + private void assertStats(OperationStats stats, long count, long failed, long time) { + assertThat(stats.getCount(), equalTo(count)); + assertThat(stats.getCurrent(), equalTo(0L)); + assertThat(stats.getFailedCount(), equalTo(failed)); + assertThat(stats.getTotalTimeInMillis(), greaterThanOrEqualTo(time)); } - private IngestStats.Stats getPipelineStats(List pipelineStats, String id) { + private OperationStats getPipelineStats(List pipelineStats, String id) { return pipelineStats.stream().filter(p1 -> p1.getPipelineId().equals(id)).findFirst().map(p2 -> p2.getStats()).orElse(null); } } diff --git a/server/src/test/java/org/opensearch/ingest/IngestStatsTests.java b/server/src/test/java/org/opensearch/ingest/IngestStatsTests.java index 0486c9f29f86e..2dae90e858d29 100644 --- a/server/src/test/java/org/opensearch/ingest/IngestStatsTests.java +++ b/server/src/test/java/org/opensearch/ingest/IngestStatsTests.java @@ -36,6 +36,7 @@ import org.opensearch.common.collect.MapBuilder; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.metrics.OperationStats; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.VersionUtils; @@ -50,7 +51,7 @@ public class IngestStatsTests extends OpenSearchTestCase { public void testSerialization() throws IOException { - IngestStats.Stats totalStats = new IngestStats.Stats(50, 100, 200, 300); + OperationStats totalStats = new OperationStats(50, 100, 200, 300); List pipelineStats = createPipelineStats(); Map> processorStats = createProcessorStats(pipelineStats); IngestStats ingestStats = new IngestStats(totalStats, pipelineStats, processorStats); @@ -59,7 +60,7 @@ public void testSerialization() throws IOException { } public void testBWCIngestProcessorTypeStats() throws IOException { - IngestStats.Stats totalStats = new IngestStats.Stats(50, 100, 200, 300); + OperationStats totalStats = new OperationStats(50, 100, 200, 300); List pipelineStats = createPipelineStats(); Map> processorStats = createProcessorStats(pipelineStats); IngestStats expectedIngestStats = new IngestStats(totalStats, pipelineStats, processorStats); @@ -76,20 +77,20 @@ public void testBWCIngestProcessorTypeStats() throws IOException { } private List createPipelineStats() { - IngestStats.PipelineStat pipeline1Stats = new IngestStats.PipelineStat("pipeline1", new IngestStats.Stats(3, 3, 3, 3)); - IngestStats.PipelineStat pipeline2Stats = new IngestStats.PipelineStat("pipeline2", new IngestStats.Stats(47, 97, 197, 297)); - IngestStats.PipelineStat pipeline3Stats = new IngestStats.PipelineStat("pipeline3", new IngestStats.Stats(0, 0, 0, 0)); + IngestStats.PipelineStat pipeline1Stats = new IngestStats.PipelineStat("pipeline1", new OperationStats(3, 3, 3, 3)); + IngestStats.PipelineStat pipeline2Stats = new IngestStats.PipelineStat("pipeline2", new OperationStats(47, 97, 197, 297)); + IngestStats.PipelineStat pipeline3Stats = new IngestStats.PipelineStat("pipeline3", new OperationStats(0, 0, 0, 0)); return Stream.of(pipeline1Stats, pipeline2Stats, pipeline3Stats).collect(Collectors.toList()); } private Map> createProcessorStats(List pipelineStats) { assert (pipelineStats.size() >= 2); - IngestStats.ProcessorStat processor1Stat = new IngestStats.ProcessorStat("processor1", "type", new IngestStats.Stats(1, 1, 1, 1)); - IngestStats.ProcessorStat processor2Stat = new IngestStats.ProcessorStat("processor2", "type", new IngestStats.Stats(2, 2, 2, 2)); + IngestStats.ProcessorStat processor1Stat = new IngestStats.ProcessorStat("processor1", "type", new OperationStats(1, 1, 1, 1)); + IngestStats.ProcessorStat processor2Stat = new IngestStats.ProcessorStat("processor2", "type", new OperationStats(2, 2, 2, 2)); IngestStats.ProcessorStat processor3Stat = new IngestStats.ProcessorStat( "processor3", "type", - new IngestStats.Stats(47, 97, 197, 297) + new OperationStats(47, 97, 197, 297) ); // pipeline1 -> processor1,processor2; pipeline2 -> processor3 return MapBuilder.>newMapBuilder() @@ -151,14 +152,14 @@ private void assertIngestStats( } - private void assertStats(IngestStats.Stats fromObject, IngestStats.Stats fromStream) { - assertEquals(fromObject.getIngestCount(), fromStream.getIngestCount()); - assertEquals(fromObject.getIngestFailedCount(), fromStream.getIngestFailedCount()); - assertEquals(fromObject.getIngestTimeInMillis(), fromStream.getIngestTimeInMillis()); - assertEquals(fromObject.getIngestCurrent(), fromStream.getIngestCurrent()); + private void assertStats(OperationStats fromObject, OperationStats fromStream) { + assertEquals(fromObject.getCount(), fromStream.getCount()); + assertEquals(fromObject.getFailedCount(), fromStream.getFailedCount()); + assertEquals(fromObject.getTotalTimeInMillis(), fromStream.getTotalTimeInMillis()); + assertEquals(fromObject.getCurrent(), fromStream.getCurrent()); } - private IngestStats.Stats getPipelineStats(List pipelineStats, String id) { + private OperationStats getPipelineStats(List pipelineStats, String id) { return pipelineStats.stream().filter(p1 -> p1.getPipelineId().equals(id)).findFirst().map(p2 -> p2.getStats()).orElse(null); } } diff --git a/server/src/test/java/org/opensearch/ingest/PipelineProcessorTests.java b/server/src/test/java/org/opensearch/ingest/PipelineProcessorTests.java index 9f8dda15eeb65..3708b5bc32955 100644 --- a/server/src/test/java/org/opensearch/ingest/PipelineProcessorTests.java +++ b/server/src/test/java/org/opensearch/ingest/PipelineProcessorTests.java @@ -32,6 +32,7 @@ package org.opensearch.ingest; import org.opensearch.OpenSearchException; +import org.opensearch.common.metrics.OperationStats; import org.opensearch.script.ScriptService; import org.opensearch.script.TemplateScript; import org.opensearch.test.OpenSearchTestCase; @@ -192,29 +193,29 @@ public void testPipelineProcessorWithPipelineChain() throws Exception { assertNotNull(ingestDocument.getSourceAndMetadata().get(key1)); // check the stats - IngestStats.Stats pipeline1Stats = pipeline1.getMetrics().createStats(); - IngestStats.Stats pipeline2Stats = pipeline2.getMetrics().createStats(); - IngestStats.Stats pipeline3Stats = pipeline3.getMetrics().createStats(); + OperationStats pipeline1Stats = pipeline1.getMetrics().createStats(); + OperationStats pipeline2Stats = pipeline2.getMetrics().createStats(); + OperationStats pipeline3Stats = pipeline3.getMetrics().createStats(); // current - assertThat(pipeline1Stats.getIngestCurrent(), equalTo(0L)); - assertThat(pipeline2Stats.getIngestCurrent(), equalTo(0L)); - assertThat(pipeline3Stats.getIngestCurrent(), equalTo(0L)); + assertThat(pipeline1Stats.getCurrent(), equalTo(0L)); + assertThat(pipeline2Stats.getCurrent(), equalTo(0L)); + assertThat(pipeline3Stats.getCurrent(), equalTo(0L)); // count - assertThat(pipeline1Stats.getIngestCount(), equalTo(1L)); - assertThat(pipeline2Stats.getIngestCount(), equalTo(1L)); - assertThat(pipeline3Stats.getIngestCount(), equalTo(1L)); + assertThat(pipeline1Stats.getCount(), equalTo(1L)); + assertThat(pipeline2Stats.getCount(), equalTo(1L)); + assertThat(pipeline3Stats.getCount(), equalTo(1L)); // time - assertThat(pipeline1Stats.getIngestTimeInMillis(), equalTo(0L)); - assertThat(pipeline2Stats.getIngestTimeInMillis(), equalTo(3L)); - assertThat(pipeline3Stats.getIngestTimeInMillis(), equalTo(2L)); + assertThat(pipeline1Stats.getTotalTimeInMillis(), equalTo(0L)); + assertThat(pipeline2Stats.getTotalTimeInMillis(), equalTo(3L)); + assertThat(pipeline3Stats.getTotalTimeInMillis(), equalTo(2L)); // failure - assertThat(pipeline1Stats.getIngestFailedCount(), equalTo(0L)); - assertThat(pipeline2Stats.getIngestFailedCount(), equalTo(0L)); - assertThat(pipeline3Stats.getIngestFailedCount(), equalTo(1L)); + assertThat(pipeline1Stats.getFailedCount(), equalTo(0L)); + assertThat(pipeline2Stats.getFailedCount(), equalTo(0L)); + assertThat(pipeline3Stats.getFailedCount(), equalTo(1L)); } public void testIngestPipelineMetadata() { diff --git a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java index d49d9fd41031c..219dddff40b35 100644 --- a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java +++ b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java @@ -30,6 +30,7 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.bytes.BytesArray; import org.opensearch.common.io.stream.NamedWriteableRegistry; +import org.opensearch.common.metrics.OperationStats; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.common.xcontent.XContentType; @@ -786,4 +787,126 @@ public void testExceptionOnResponseProcessing() throws Exception { // Exception thrown when processing response expectThrows(SearchPipelineProcessingException.class, () -> pipelinedRequest.transformResponse(response)); } + + public void testStats() throws Exception { + SearchRequestProcessor throwingRequestProcessor = new FakeRequestProcessor("throwing_request", "1", null, r -> { + throw new RuntimeException(); + }); + Map> requestProcessors = Map.of( + "successful_request", + (pf, t, f, c) -> new FakeRequestProcessor("successful_request", "2", null, r -> {}), + "throwing_request", + (pf, t, f, c) -> throwingRequestProcessor + ); + SearchResponseProcessor throwingResponseProcessor = new FakeResponseProcessor("throwing_response", "3", null, r -> { + throw new RuntimeException(); + }); + Map> responseProcessors = Map.of( + "successful_response", + (pf, t, f, c) -> new FakeResponseProcessor("successful_response", "4", null, r -> {}), + "throwing_response", + (pf, t, f, c) -> throwingResponseProcessor + ); + SearchPipelineService searchPipelineService = createWithProcessors(requestProcessors, responseProcessors); + + SearchPipelineMetadata metadata = new SearchPipelineMetadata( + Map.of( + "good_response_pipeline", + new PipelineConfiguration( + "good_response_pipeline", + new BytesArray("{\"response_processors\" : [ { \"successful_response\": {} } ] }"), + XContentType.JSON + ), + "bad_response_pipeline", + new PipelineConfiguration( + "bad_response_pipeline", + new BytesArray("{\"response_processors\" : [ { \"throwing_response\": {} } ] }"), + XContentType.JSON + ), + "good_request_pipeline", + new PipelineConfiguration( + "good_request_pipeline", + new BytesArray("{\"request_processors\" : [ { \"successful_request\": {} } ] }"), + XContentType.JSON + ), + "bad_request_pipeline", + new PipelineConfiguration( + "bad_request_pipeline", + new BytesArray("{\"request_processors\" : [ { \"throwing_request\": {} } ] }"), + XContentType.JSON + ) + ) + ); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + ClusterState previousState = clusterState; + clusterState = ClusterState.builder(clusterState) + .metadata(Metadata.builder().putCustom(SearchPipelineMetadata.TYPE, metadata)) + .build(); + searchPipelineService.applyClusterState(new ClusterChangedEvent("", clusterState, previousState)); + + SearchRequest request = new SearchRequest(); + SearchResponse response = new SearchResponse(null, null, 0, 0, 0, 0, null, null); + + searchPipelineService.resolvePipeline(request.pipeline("good_request_pipeline")).transformResponse(response); + expectThrows( + SearchPipelineProcessingException.class, + () -> searchPipelineService.resolvePipeline(request.pipeline("bad_request_pipeline")).transformResponse(response) + ); + searchPipelineService.resolvePipeline(request.pipeline("good_response_pipeline")).transformResponse(response); + expectThrows( + SearchPipelineProcessingException.class, + () -> searchPipelineService.resolvePipeline(request.pipeline("bad_response_pipeline")).transformResponse(response) + ); + + SearchPipelineStats stats = searchPipelineService.stats(); + assertPipelineStats(stats.getTotalRequestStats(), 2, 1); + assertPipelineStats(stats.getTotalResponseStats(), 2, 1); + for (SearchPipelineStats.PerPipelineStats perPipelineStats : stats.getPipelineStats()) { + SearchPipelineStats.PipelineDetailStats detailStats = stats.getPerPipelineProcessorStats() + .get(perPipelineStats.getPipelineId()); + switch (perPipelineStats.getPipelineId()) { + case "good_request_pipeline": + assertPipelineStats(perPipelineStats.getRequestStats(), 1, 0); + assertPipelineStats(perPipelineStats.getResponseStats(), 0, 0); + assertEquals(1, detailStats.requestProcessorStats().size()); + assertEquals(0, detailStats.responseProcessorStats().size()); + assertEquals("successful_request:2", detailStats.requestProcessorStats().get(0).getProcessorName()); + assertEquals("successful_request", detailStats.requestProcessorStats().get(0).getProcessorType()); + assertPipelineStats(detailStats.requestProcessorStats().get(0).getStats(), 1, 0); + break; + case "bad_request_pipeline": + assertPipelineStats(perPipelineStats.getRequestStats(), 1, 1); + assertPipelineStats(perPipelineStats.getResponseStats(), 0, 0); + assertEquals(1, detailStats.requestProcessorStats().size()); + assertEquals(0, detailStats.responseProcessorStats().size()); + assertEquals("throwing_request:1", detailStats.requestProcessorStats().get(0).getProcessorName()); + assertEquals("throwing_request", detailStats.requestProcessorStats().get(0).getProcessorType()); + assertPipelineStats(detailStats.requestProcessorStats().get(0).getStats(), 1, 1); + break; + case "good_response_pipeline": + assertPipelineStats(perPipelineStats.getRequestStats(), 0, 0); + assertPipelineStats(perPipelineStats.getResponseStats(), 1, 0); + assertEquals(0, detailStats.requestProcessorStats().size()); + assertEquals(1, detailStats.responseProcessorStats().size()); + assertEquals("successful_response:4", detailStats.responseProcessorStats().get(0).getProcessorName()); + assertEquals("successful_response", detailStats.responseProcessorStats().get(0).getProcessorType()); + assertPipelineStats(detailStats.responseProcessorStats().get(0).getStats(), 1, 0); + break; + case "bad_response_pipeline": + assertPipelineStats(perPipelineStats.getRequestStats(), 0, 0); + assertPipelineStats(perPipelineStats.getResponseStats(), 1, 1); + assertEquals(0, detailStats.requestProcessorStats().size()); + assertEquals(1, detailStats.responseProcessorStats().size()); + assertEquals("throwing_response:3", detailStats.responseProcessorStats().get(0).getProcessorName()); + assertEquals("throwing_response", detailStats.responseProcessorStats().get(0).getProcessorType()); + assertPipelineStats(detailStats.responseProcessorStats().get(0).getStats(), 1, 1); + break; + } + } + } + + private static void assertPipelineStats(OperationStats stats, long count, long failed) { + assertEquals(stats.getCount(), count); + assertEquals(stats.getFailedCount(), failed); + } } diff --git a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineStatsTests.java b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineStatsTests.java new file mode 100644 index 0000000000000..dac41f0db4e00 --- /dev/null +++ b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineStatsTests.java @@ -0,0 +1,185 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.pipeline; + +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.metrics.OperationStats; +import org.opensearch.common.xcontent.XContentHelper; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.core.xcontent.DeprecationHandler; +import org.opensearch.core.xcontent.MediaType; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +public class SearchPipelineStatsTests extends OpenSearchTestCase { + public void testSerializationRoundtrip() throws IOException { + SearchPipelineStats stats = createStats(); + SearchPipelineStats deserialized; + try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) { + stats.writeTo(bytesStreamOutput); + try (StreamInput bytesStreamInput = bytesStreamOutput.bytes().streamInput()) { + deserialized = new SearchPipelineStats(bytesStreamInput); + } + } + assertEquals(stats, deserialized); + } + + private static SearchPipelineStats createStats() { + return new SearchPipelineStats( + new OperationStats(1, 2, 3, 4), + new OperationStats(5, 6, 7, 8), + List.of( + new SearchPipelineStats.PerPipelineStats("p1", new OperationStats(9, 10, 11, 12), new OperationStats(13, 14, 15, 16)), + new SearchPipelineStats.PerPipelineStats("p2", new OperationStats(17, 18, 19, 20), new OperationStats(21, 22, 23, 24)) + + ), + Map.of( + "p1", + new SearchPipelineStats.PipelineDetailStats( + List.of(new SearchPipelineStats.ProcessorStats("req1:a", "req1", new OperationStats(25, 26, 27, 28))), + List.of(new SearchPipelineStats.ProcessorStats("rsp1:a", "rsp1", new OperationStats(29, 30, 31, 32))) + ), + "p2", + new SearchPipelineStats.PipelineDetailStats( + List.of( + new SearchPipelineStats.ProcessorStats("req1:a", "req1", new OperationStats(33, 34, 35, 36)), + new SearchPipelineStats.ProcessorStats("req2", "req2", new OperationStats(37, 38, 39, 40)) + ), + List.of() + ) + ) + ); + } + + public void testToXContent() throws IOException { + XContentBuilder actualBuilder = XContentBuilder.builder(JsonXContent.jsonXContent); + actualBuilder.startObject(); + createStats().toXContent(actualBuilder, null); + actualBuilder.endObject(); + + String expected = "{" + + " \"search_pipeline\" : {" + + " \"total_request\" : {" + + " \"count\" : 1," + + " \"time_in_millis\" : 2," + + " \"current\" : 3," + + " \"failed\" : 4" + + " }," + + " \"total_response\" : {" + + " \"count\" : 5," + + " \"time_in_millis\" : 6," + + " \"current\" : 7," + + " \"failed\" : 8" + + " }," + + " \"pipelines\" : {" + + " \"p1\" : {" + + " \"request\" : {" + + " \"count\" : 9," + + " \"time_in_millis\" : 10," + + " \"current\" : 11," + + " \"failed\" : 12" + + " }," + + " \"response\" : {" + + " \"count\" : 13," + + " \"time_in_millis\" : 14," + + " \"current\" : 15," + + " \"failed\" : 16" + + " }," + + " \"request_processors\" : [" + + " {" + + " \"req1:a\" : {" + + " \"type\" : \"req1\"," + + " \"stats\" : {" + + " \"count\" : 25," + + " \"time_in_millis\" : 26," + + " \"current\" : 27," + + " \"failed\" : 28" + + " }" + + " }" + + " }" + + " ]," + + " \"response_processors\" : [" + + " {" + + " \"rsp1:a\" : {" + + " \"type\" : \"rsp1\"," + + " \"stats\" : {" + + " \"count\" : 29," + + " \"time_in_millis\" : 30," + + " \"current\" : 31," + + " \"failed\" : 32" + + " }" + + " }" + + " }" + + " ]" + + " }," + + " \"p2\" : {" + + " \"request\" : {" + + " \"count\" : 17," + + " \"time_in_millis\" : 18," + + " \"current\" : 19," + + " \"failed\" : 20" + + " }," + + " \"response\" : {" + + " \"count\" : 21," + + " \"time_in_millis\" : 22," + + " \"current\" : 23," + + " \"failed\" : 24" + + " }," + + " \"request_processors\" : [" + + " {" + + " \"req1:a\" : {" + + " \"type\" : \"req1\"," + + " \"stats\" : {" + + " \"count\" : 33," + + " \"time_in_millis\" : 34," + + " \"current\" : 35," + + " \"failed\" : 36" + + " }" + + " }" + + " }," + + " {" + + " \"req2\" : {" + + " \"type\" : \"req2\"," + + " \"stats\" : {" + + " \"count\" : 37," + + " \"time_in_millis\" : 38," + + " \"current\" : 39," + + " \"failed\" : 40" + + " }" + + " }" + + " }" + + " ]," + + " \"response_processors\" : [ ]" + + " }" + + " }" + + " }" + + "}"; + + XContentParser expectedParser = JsonXContent.jsonXContent.createParser( + this.xContentRegistry(), + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, + expected + ); + XContentBuilder expectedBuilder = XContentBuilder.builder(JsonXContent.jsonXContent); + expectedBuilder.generator().copyCurrentStructure(expectedParser); + + assertEquals( + XContentHelper.convertToMap(BytesReference.bytes(expectedBuilder), false, (MediaType) XContentType.JSON), + XContentHelper.convertToMap(BytesReference.bytes(actualBuilder), false, (MediaType) XContentType.JSON) + ); + } +} diff --git a/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java b/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java index cf5f6613c3ea1..6634d1b4dbafc 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java +++ b/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java @@ -119,7 +119,8 @@ List adjustNodesStats(List nodesStats) { nodeStats.getClusterManagerThrottlingStats(), nodeStats.getWeightedRoutingStats(), nodeStats.getFileCacheStats(), - nodeStats.getTaskCancellationStats() + nodeStats.getTaskCancellationStats(), + nodeStats.getSearchPipelineStats() ); }).collect(Collectors.toList()); } diff --git a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java index 6bc4ecacbd980..f67924a2a7719 100644 --- a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java @@ -2685,6 +2685,7 @@ public void ensureEstimatedStats() { false, false, false, + false, false ); assertThat(