Skip to content

Commit

Permalink
Split pipeline metrics tracking into its own class
Browse files Browse the repository at this point in the history
Thanks @saratvemulapalli for the suggestion! This lets the Pipeline
class focus on transforming requests / responses, while the subclass
focuses on tracking and managing metrics.

Signed-off-by: Michael Froh <[email protected]>
  • Loading branch information
msfroh committed Jun 22, 2023
1 parent 585231f commit 159e568
Show file tree
Hide file tree
Showing 3 changed files with 280 additions and 182 deletions.
223 changes: 47 additions & 176 deletions server/src/main/java/org/opensearch/search/pipeline/Pipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,18 @@

package org.opensearch.search.pipeline;

import org.opensearch.OpenSearchParseException;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.common.Nullable;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.metrics.OperationMetrics;
import org.opensearch.ingest.ConfigurationUtils;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;

import static org.opensearch.ingest.ConfigurationUtils.TAG_KEY;
import static org.opensearch.ingest.Pipeline.DESCRIPTION_KEY;
import static org.opensearch.ingest.Pipeline.VERSION_KEY;

/**
* Concrete representation of a search pipeline, holding multiple processors.
Expand All @@ -46,111 +34,30 @@ class Pipeline {

// TODO: Refactor org.opensearch.ingest.CompoundProcessor to implement our generic Processor interface
// Then these can be CompoundProcessors instead of lists.
private final List<ProcessorWithMetrics<SearchRequestProcessor>> searchRequestProcessors;
private final List<ProcessorWithMetrics<SearchResponseProcessor>> searchResponseProcessors;
private final List<SearchRequestProcessor> searchRequestProcessors;
private final List<SearchResponseProcessor> searchResponseProcessors;

private final NamedWriteableRegistry namedWriteableRegistry;
private final OperationMetrics totalRequestMetrics;
private final OperationMetrics totalResponseMetrics;
private final OperationMetrics pipelineRequestMetrics = new OperationMetrics();
private final OperationMetrics pipelineResponseMetrics = new OperationMetrics();
private final LongSupplier relativeTimeSupplier;

private static class ProcessorWithMetrics<T extends Processor> {
private final T processor;
private final OperationMetrics metrics = new OperationMetrics();

public ProcessorWithMetrics(T processor) {
this.processor = processor;
}
}

private Pipeline(
Pipeline(
String id,
@Nullable String description,
@Nullable Integer version,
List<SearchRequestProcessor> requestProcessors,
List<SearchResponseProcessor> responseProcessors,
NamedWriteableRegistry namedWriteableRegistry,
OperationMetrics totalRequestMetrics,
OperationMetrics totalResponseMetrics,
LongSupplier relativeTimeSupplier
) {
this.id = id;
this.description = description;
this.version = version;
this.searchRequestProcessors = requestProcessors.stream().map(ProcessorWithMetrics::new).collect(Collectors.toList());
this.searchResponseProcessors = responseProcessors.stream().map(ProcessorWithMetrics::new).collect(Collectors.toList());
this.searchRequestProcessors = Collections.unmodifiableList(requestProcessors);
this.searchResponseProcessors = Collections.unmodifiableList(responseProcessors);
this.namedWriteableRegistry = namedWriteableRegistry;
this.totalRequestMetrics = totalRequestMetrics;
this.totalResponseMetrics = totalResponseMetrics;
this.relativeTimeSupplier = relativeTimeSupplier;
}

static Pipeline create(
String id,
Map<String, Object> config,
Map<String, Processor.Factory<SearchRequestProcessor>> requestProcessorFactories,
Map<String, Processor.Factory<SearchResponseProcessor>> responseProcessorFactories,
NamedWriteableRegistry namedWriteableRegistry,
OperationMetrics totalRequestProcessingMetrics,
OperationMetrics totalResponseProcessingMetrics
) throws Exception {
String description = ConfigurationUtils.readOptionalStringProperty(null, null, config, DESCRIPTION_KEY);
Integer version = ConfigurationUtils.readIntProperty(null, null, config, VERSION_KEY, null);
List<Map<String, Object>> requestProcessorConfigs = ConfigurationUtils.readOptionalList(null, null, config, REQUEST_PROCESSORS_KEY);
List<SearchRequestProcessor> requestProcessors = readProcessors(requestProcessorFactories, requestProcessorConfigs);
List<Map<String, Object>> responseProcessorConfigs = ConfigurationUtils.readOptionalList(
null,
null,
config,
RESPONSE_PROCESSORS_KEY
);
List<SearchResponseProcessor> responseProcessors = readProcessors(responseProcessorFactories, responseProcessorConfigs);
if (config.isEmpty() == false) {
throw new OpenSearchParseException(
"pipeline ["
+ id
+ "] doesn't support one or more provided configuration parameters "
+ Arrays.toString(config.keySet().toArray())
);
}
return new Pipeline(
id,
description,
version,
requestProcessors,
responseProcessors,
namedWriteableRegistry,
totalRequestProcessingMetrics,
totalResponseProcessingMetrics,
System::nanoTime
);
}

private static <T extends Processor> List<T> readProcessors(
Map<String, Processor.Factory<T>> processorFactories,
List<Map<String, Object>> requestProcessorConfigs
) throws Exception {
List<T> processors = new ArrayList<>();
if (requestProcessorConfigs == null) {
return processors;
}
for (Map<String, Object> processorConfigWithKey : requestProcessorConfigs) {
for (Map.Entry<String, Object> entry : processorConfigWithKey.entrySet()) {
String type = entry.getKey();
if (!processorFactories.containsKey(type)) {
throw new IllegalArgumentException("Invalid processor type " + type);
}
Map<String, Object> config = (Map<String, Object>) entry.getValue();
String tag = ConfigurationUtils.readOptionalStringProperty(null, null, config, TAG_KEY);
String description = ConfigurationUtils.readOptionalStringProperty(null, tag, config, DESCRIPTION_KEY);
processors.add(processorFactories.get(type).create(processorFactories, tag, description, config));
}
}
return Collections.unmodifiableList(processors);
}

String getId() {
return id;
}
Expand All @@ -164,18 +71,41 @@ Integer getVersion() {
}

List<SearchRequestProcessor> getSearchRequestProcessors() {
return searchRequestProcessors.stream().map(p -> p.processor).collect(Collectors.toList());
return searchRequestProcessors;
}

List<SearchResponseProcessor> getSearchResponseProcessors() {
return searchResponseProcessors.stream().map(p -> p.processor).collect(Collectors.toList());
return searchResponseProcessors;
}

protected void beforeTransformRequest() {}

protected void afterTransformRequest(long timeInNanos) {}

protected void onTransformRequestFailure() {}

protected void beforeRequestProcessor(Processor processor) {}

protected void afterRequestProcessor(Processor processor, long timeInNanos) {}

protected void onRequestProcessorFailed(Processor processor) {}

protected void beforeTransformResponse() {}

protected void afterTransformResponse(long timeInNanos) {}

protected void onTransformResponseFailure() {}

protected void beforeResponseProcessor(Processor processor) {}

protected void afterResponseProcessor(Processor processor, long timeInNanos) {}

protected void onResponseProcessorFailed(Processor processor) {}

SearchRequest transformRequest(SearchRequest request) throws SearchPipelineProcessingException {
if (searchRequestProcessors.isEmpty() == false) {
long pipelineStart = relativeTimeSupplier.getAsLong();
totalRequestMetrics.before();
pipelineRequestMetrics.before();
beforeTransformRequest();
try {
try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) {
request.writeTo(bytesStreamOutput);
Expand All @@ -185,27 +115,25 @@ SearchRequest transformRequest(SearchRequest request) throws SearchPipelineProce
}
}
}
for (ProcessorWithMetrics<SearchRequestProcessor> processorWithMetrics : searchRequestProcessors) {
processorWithMetrics.metrics.before();
for (SearchRequestProcessor processor : searchRequestProcessors) {
beforeRequestProcessor(processor);
long start = relativeTimeSupplier.getAsLong();
try {
request = processorWithMetrics.processor.processRequest(request);
request = processor.processRequest(request);
} catch (Exception e) {
processorWithMetrics.metrics.failed();
onRequestProcessorFailed(processor);
throw e;
} finally {
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start);
processorWithMetrics.metrics.after(took);
afterRequestProcessor(processor, took);
}
}
} catch (Exception e) {
totalRequestMetrics.failed();
pipelineRequestMetrics.failed();
onTransformRequestFailure();
throw new SearchPipelineProcessingException(e);
} finally {
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - pipelineStart);
totalRequestMetrics.after(took);
pipelineRequestMetrics.after(took);
afterTransformRequest(took);
}
}
return request;
Expand All @@ -214,30 +142,27 @@ SearchRequest transformRequest(SearchRequest request) throws SearchPipelineProce
SearchResponse transformResponse(SearchRequest request, SearchResponse response) throws SearchPipelineProcessingException {
if (searchResponseProcessors.isEmpty() == false) {
long pipelineStart = relativeTimeSupplier.getAsLong();
totalResponseMetrics.before();
pipelineResponseMetrics.before();
beforeTransformResponse();
try {
for (ProcessorWithMetrics<SearchResponseProcessor> processorWithMetrics : searchResponseProcessors) {
processorWithMetrics.metrics.before();
for (SearchResponseProcessor processor : searchResponseProcessors) {
beforeResponseProcessor(processor);
long start = relativeTimeSupplier.getAsLong();
try {
response = processorWithMetrics.processor.processResponse(request, response);
response = processor.processResponse(request, response);
} catch (Exception e) {
processorWithMetrics.metrics.failed();
onResponseProcessorFailed(processor);
throw e;
} finally {
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start);
processorWithMetrics.metrics.after(took);
afterResponseProcessor(processor, took);
}
}
} catch (Exception e) {
totalResponseMetrics.failed();
pipelineResponseMetrics.failed();
onTransformResponseFailure();
throw new SearchPipelineProcessingException(e);
} finally {
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - pipelineStart);
totalResponseMetrics.after(took);
pipelineResponseMetrics.after(took);
afterTransformResponse(took);
}
}
return response;
Expand All @@ -250,61 +175,7 @@ SearchResponse transformResponse(SearchRequest request, SearchResponse response)
Collections.emptyList(),
Collections.emptyList(),
null,
new OperationMetrics(),
new OperationMetrics(),
() -> 0L
);

void copyMetrics(Pipeline oldPipeline) {
pipelineRequestMetrics.add(oldPipeline.pipelineRequestMetrics);
pipelineResponseMetrics.add(oldPipeline.pipelineResponseMetrics);
copyProcessorMetrics(searchRequestProcessors, oldPipeline.searchRequestProcessors);
copyProcessorMetrics(searchResponseProcessors, oldPipeline.searchResponseProcessors);
}

private static <T extends Processor> void copyProcessorMetrics(
List<ProcessorWithMetrics<T>> newProcessorsWithMetrics,
List<ProcessorWithMetrics<T>> oldProcessorsWithMetrics
) {
Map<String, ProcessorWithMetrics<T>> requestProcessorsByKey = new HashMap<>();
for (ProcessorWithMetrics<T> processorWithMetrics : newProcessorsWithMetrics) {
requestProcessorsByKey.putIfAbsent(getProcessorKey(processorWithMetrics.processor), processorWithMetrics);
}
for (ProcessorWithMetrics<T> oldProcessorWithMetrics : oldProcessorsWithMetrics) {
ProcessorWithMetrics<T> newProcessor = requestProcessorsByKey.get(getProcessorKey(oldProcessorWithMetrics.processor));
if (newProcessor != null) {
newProcessor.metrics.add(oldProcessorWithMetrics.metrics);
}
}
}

private static String getProcessorKey(Processor processor) {
String key = processor.getType();
if (processor.getTag() != null) {
return key + ":" + processor.getTag();
}
return key;
}

void populateStats(SearchPipelineStats.Builder statsBuilder) {
statsBuilder.addPipelineStats(getId(), pipelineRequestMetrics, pipelineResponseMetrics);
for (ProcessorWithMetrics<SearchRequestProcessor> requestProcessorWithMetrics : searchRequestProcessors) {
Processor processor = requestProcessorWithMetrics.processor;
statsBuilder.addRequestProcessorStats(
getId(),
getProcessorKey(processor),
processor.getType(),
requestProcessorWithMetrics.metrics
);
}
for (ProcessorWithMetrics<SearchResponseProcessor> responseProcessorWithMetrics : searchResponseProcessors) {
Processor processor = responseProcessorWithMetrics.processor;
statsBuilder.addResponseProcessorStats(
getId(),
getProcessorKey(processor),
processor.getType(),
responseProcessorWithMetrics.metrics
);
}
}
}
Loading

0 comments on commit 159e568

Please sign in to comment.