Skip to content

Commit

Permalink
Resolve merge conflicts
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Froh <[email protected]>
  • Loading branch information
msfroh committed Jun 28, 2023
1 parent cb845b6 commit 096417c
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ protected void afterResponseProcessor(Processor processor, long timeInNanos) {}

protected void onResponseProcessorFailed(Processor processor) {}

SearchRequest transformRequest(SearchRequest request) throws Exception {
SearchRequest transformRequest(SearchRequest request) {
if (searchRequestProcessors.isEmpty() == false) {
long pipelineStart = relativeTimeSupplier.getAsLong();
beforeTransformRequest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,22 @@ class PipelineWithMetrics extends Pipeline {
Integer version,
List<SearchRequestProcessor> requestProcessors,
List<SearchResponseProcessor> responseProcessors,
List<SearchPhaseResultsProcessor> phaseResultsProcessorsm,
NamedWriteableRegistry namedWriteableRegistry,
OperationMetrics totalRequestMetrics,
OperationMetrics totalResponseMetrics,
LongSupplier relativeTimeSupplier
) {
super(id, description, version, requestProcessors, responseProcessors, namedWriteableRegistry, relativeTimeSupplier);
super(
id,
description,
version,
requestProcessors,
responseProcessors,
phaseResultsProcessorsm,
namedWriteableRegistry,
relativeTimeSupplier
);
this.totalRequestMetrics = totalRequestMetrics;
this.totalResponseMetrics = totalResponseMetrics;
for (Processor requestProcessor : getSearchRequestProcessors()) {
Expand All @@ -64,6 +74,7 @@ static PipelineWithMetrics create(
Map<String, Object> config,
Map<String, Processor.Factory<SearchRequestProcessor>> requestProcessorFactories,
Map<String, Processor.Factory<SearchResponseProcessor>> responseProcessorFactories,
Map<String, Processor.Factory<SearchPhaseResultsProcessor>> phaseResultsProcessorFactories,
NamedWriteableRegistry namedWriteableRegistry,
OperationMetrics totalRequestProcessingMetrics,
OperationMetrics totalResponseProcessingMetrics
Expand All @@ -79,6 +90,16 @@ static PipelineWithMetrics create(
RESPONSE_PROCESSORS_KEY
);
List<SearchResponseProcessor> responseProcessors = readProcessors(responseProcessorFactories, responseProcessorConfigs);
final List<Map<String, Object>> phaseProcessorConfigs = ConfigurationUtils.readOptionalList(
null,
null,
config,
PHASE_PROCESSORS_KEY
);
final List<SearchPhaseResultsProcessor> phaseResultsProcessors = readProcessors(
phaseResultsProcessorFactories,
phaseProcessorConfigs
);
if (config.isEmpty() == false) {
throw new OpenSearchParseException(
"pipeline ["
Expand All @@ -93,6 +114,7 @@ static PipelineWithMetrics create(
version,
requestProcessors,
responseProcessors,
phaseResultsProcessors,
namedWriteableRegistry,
totalRequestProcessingMetrics,
totalResponseProcessingMetrics,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public class SearchPipelineService implements ClusterStateApplier, ReportingServ
private final Map<String, Processor.Factory<SearchRequestProcessor>> requestProcessorFactories;
private final Map<String, Processor.Factory<SearchResponseProcessor>> responseProcessorFactories;

private final Map<String, Processor.Factory<SearchPhaseResultsProcessor>> phaseInjectorProcessorFactories;
private final Map<String, Processor.Factory<SearchPhaseResultsProcessor>> phaseResultsProcessorFactories;
private volatile Map<String, PipelineHolder> pipelines = Collections.emptyMap();
private final ThreadPool threadPool;
private final List<Consumer<ClusterState>> searchPipelineClusterStateListeners = new CopyOnWriteArrayList<>();
Expand Down Expand Up @@ -118,7 +118,7 @@ public SearchPipelineService(
);
this.requestProcessorFactories = processorFactories(searchPipelinePlugins, p -> p.getRequestProcessors(parameters));
this.responseProcessorFactories = processorFactories(searchPipelinePlugins, p -> p.getResponseProcessors(parameters));
this.phaseInjectorProcessorFactories = processorFactories(searchPipelinePlugins, p -> p.getPhaseResultsProcessors(parameters));
this.phaseResultsProcessorFactories = processorFactories(searchPipelinePlugins, p -> p.getPhaseResultsProcessors(parameters));
putPipelineTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.PUT_SEARCH_PIPELINE_KEY, true);
deletePipelineTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.DELETE_SEARCH_PIPELINE_KEY, true);
this.isEnabled = isEnabled;
Expand Down Expand Up @@ -184,7 +184,7 @@ void innerUpdatePipelines(SearchPipelineMetadata newSearchPipelineMetadata) {
newConfiguration.getConfigAsMap(),
requestProcessorFactories,
responseProcessorFactories,
phaseInjectorProcessorFactories,
phaseResultsProcessorFactories,
namedWriteableRegistry,
totalRequestProcessingMetrics,
totalResponseProcessingMetrics
Expand Down Expand Up @@ -284,7 +284,7 @@ void validatePipeline(Map<DiscoveryNode, SearchPipelineInfo> searchPipelineInfos
pipelineConfig,
requestProcessorFactories,
responseProcessorFactories,
phaseInjectorProcessorFactories
phaseResultsProcessorFactories,
namedWriteableRegistry,
new OperationMetrics(), // Use ephemeral metrics for validation
new OperationMetrics()
Expand Down Expand Up @@ -383,7 +383,7 @@ public PipelinedRequest resolvePipeline(SearchRequest searchRequest) {
searchRequest.source().searchPipelineSource(),
requestProcessorFactories,
responseProcessorFactories,
phaseInjectorProcessorFactories,
phaseResultsProcessorFactories,
namedWriteableRegistry,
totalRequestProcessingMetrics,
totalResponseProcessingMetrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -982,7 +982,7 @@ public void testStats() throws Exception {
"throwing_response",
(pf, t, f, c) -> throwingResponseProcessor
);
SearchPipelineService searchPipelineService = createWithProcessors(requestProcessors, responseProcessors);
SearchPipelineService searchPipelineService = createWithProcessors(requestProcessors, responseProcessors, Collections.emptyMap());

SearchPipelineMetadata metadata = new SearchPipelineMetadata(
Map.of(
Expand Down

0 comments on commit 096417c

Please sign in to comment.