Reciprocal Rank Fusion (RRF) normalization technique in hybrid query #874
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
package org.opensearch.neuralsearch.processor; | ||
|
||
import lombok.AllArgsConstructor; | ||
import lombok.Builder; | ||
import lombok.Getter; | ||
import lombok.NonNull; | ||
import org.opensearch.neuralsearch.processor.combination.ScoreCombinationTechnique; | ||
import org.opensearch.neuralsearch.processor.normalization.ScoreNormalizationTechnique; | ||
import org.opensearch.search.fetch.FetchSearchResult; | ||
import org.opensearch.search.query.QuerySearchResult; | ||
|
||
import java.util.List; | ||
import java.util.Optional; | ||
|
||
/** | ||
* DTO object that holds the data passed to the execute method | ||
* of NormalizationProcessorWorkflow. | ||
*/ | ||
@AllArgsConstructor | ||
@Builder | ||
@Getter | ||
public class NormalizationExecuteDTO { | ||
@NonNull | ||
private List<QuerySearchResult> querySearchResults; | ||
@NonNull | ||
private Optional<FetchSearchResult> fetchSearchResultOptional; | ||
@NonNull | ||
private ScoreNormalizationTechnique normalizationTechnique; | ||
@NonNull | ||
private ScoreCombinationTechnique combinationTechnique; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
package org.opensearch.neuralsearch.processor; | ||
|
||
import lombok.AllArgsConstructor; | ||
import lombok.Builder; | ||
import lombok.Getter; | ||
import lombok.NonNull; | ||
import org.opensearch.neuralsearch.processor.normalization.ScoreNormalizationTechnique; | ||
|
||
import java.util.List; | ||
|
||
/** | ||
* DTO object to hold data required for score normalization. | ||
*/ | ||
@AllArgsConstructor | ||
@Builder | ||
@Getter | ||
public class NormalizeScoresDTO { | ||
@NonNull | ||
private List<CompoundTopDocs> queryTopDocs; | ||
@NonNull | ||
private ScoreNormalizationTechnique normalizationTechnique; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,139 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
package org.opensearch.neuralsearch.processor; | ||
|
||
import static org.opensearch.neuralsearch.search.util.HybridSearchResultFormatUtil.isHybridQueryStartStopElement; | ||
|
||
import java.util.stream.Collectors; | ||
|
||
import java.util.List; | ||
import java.util.Objects; | ||
import java.util.Optional; | ||
|
||
import lombok.Getter; | ||
import org.opensearch.neuralsearch.processor.combination.ScoreCombinationTechnique; | ||
import org.opensearch.neuralsearch.processor.normalization.ScoreNormalizationTechnique; | ||
import org.opensearch.search.fetch.FetchSearchResult; | ||
import org.opensearch.search.query.QuerySearchResult; | ||
|
||
import org.opensearch.action.search.QueryPhaseResultConsumer; | ||
import org.opensearch.action.search.SearchPhaseContext; | ||
import org.opensearch.action.search.SearchPhaseName; | ||
import org.opensearch.action.search.SearchPhaseResults; | ||
import org.opensearch.search.SearchPhaseResult; | ||
import org.opensearch.search.internal.SearchContext; | ||
import org.opensearch.search.pipeline.SearchPhaseResultsProcessor; | ||
|
||
import lombok.AllArgsConstructor; | ||
import lombok.extern.log4j.Log4j2; | ||
|
||
/** | ||
* Processor for implementing reciprocal rank fusion technique on post | ||
* query search results. Updates query results with | ||
* normalized and combined scores for next phase (typically it's FETCH) | ||
* by using ranks from individual subqueries to calculate 'normalized' | ||
* scores before combining results from subqueries into final results | ||
*/ | ||
@Log4j2 | ||
@AllArgsConstructor | ||
public class RRFProcessor implements SearchPhaseResultsProcessor { | ||
public static final String TYPE = "score-ranker-processor"; | ||
|
||
|
||
@Getter | ||
private final String tag; | ||
@Getter | ||
private final String description; | ||
private final ScoreNormalizationTechnique normalizationTechnique; | ||
private final ScoreCombinationTechnique combinationTechnique; | ||
|
||
private final NormalizationProcessorWorkflow normalizationWorkflow; | ||
|
||
/** | ||
* Method abstracts functional aspect of score normalization and score combination. Exact methods for each processing stage | ||
* are set as part of class constructor | ||
* @param searchPhaseResult {@link SearchPhaseResults} DTO that has query search results. Results will be mutated as part of this method execution | ||
* @param searchPhaseContext {@link SearchPhaseContext} | ||
*/ | ||
@Override | ||
public <Result extends SearchPhaseResult> void process( | ||
final SearchPhaseResults<Result> searchPhaseResult, | ||
final SearchPhaseContext searchPhaseContext | ||
) { | ||
if (shouldSkipProcessor(searchPhaseResult)) { | ||
log.debug("Query results are not compatible with RRF processor"); | ||
return; | ||
} | ||
List<QuerySearchResult> querySearchResults = getQueryPhaseSearchResults(searchPhaseResult); | ||
Optional<FetchSearchResult> fetchSearchResult = getFetchSearchResults(searchPhaseResult); | ||
|
||
// Build the data transfer object for the workflow. execute() receives a DTO with 4 or 5 fields, | ||
// depending on whether it is called from NormalizationProcessor or RRFProcessor. | ||
NormalizationExecuteDTO normalizationExecuteDTO = NormalizationExecuteDTO.builder() | ||
.querySearchResults(querySearchResults) | ||
.fetchSearchResultOptional(fetchSearchResult) | ||
.normalizationTechnique(normalizationTechnique) | ||
.combinationTechnique(combinationTechnique) | ||
.build(); | ||
normalizationWorkflow.execute(normalizationExecuteDTO); | ||
} | ||
|
||
@Override | ||
public SearchPhaseName getBeforePhase() { | ||
return SearchPhaseName.QUERY; | ||
} | ||
|
||
@Override | ||
public SearchPhaseName getAfterPhase() { | ||
return SearchPhaseName.FETCH; | ||
} | ||
|
||
@Override | ||
public String getType() { | ||
return TYPE; | ||
} | ||
|
||
@Override | ||
public boolean isIgnoreFailure() { | ||
return false; | ||
} | ||
|
||
private <Result extends SearchPhaseResult> boolean shouldSkipProcessor(SearchPhaseResults<Result> searchPhaseResult) { | ||
if (Objects.isNull(searchPhaseResult) || !(searchPhaseResult instanceof QueryPhaseResultConsumer queryPhaseResultConsumer)) { | ||
return true; | ||
} | ||
|
||
return queryPhaseResultConsumer.getAtomicArray().asList().stream().filter(Objects::nonNull).noneMatch(this::isHybridQuery); | ||
} | ||
|
||
/** | ||
* Return true if results are from hybrid query. | ||
* @param searchPhaseResult search phase result to inspect | ||
* @return true if results are from hybrid query | ||
*/ | ||
private boolean isHybridQuery(final SearchPhaseResult searchPhaseResult) { | ||
// check for the hybrid query start/stop delimiter in the first score doc | ||
return Objects.nonNull(searchPhaseResult.queryResult()) | ||
&& Objects.nonNull(searchPhaseResult.queryResult().topDocs()) | ||
&& Objects.nonNull(searchPhaseResult.queryResult().topDocs().topDocs.scoreDocs) | ||
&& searchPhaseResult.queryResult().topDocs().topDocs.scoreDocs.length > 0 | ||
&& isHybridQueryStartStopElement(searchPhaseResult.queryResult().topDocs().topDocs.scoreDocs[0]); | ||
} | ||
|
||
private <Result extends SearchPhaseResult> List<QuerySearchResult> getQueryPhaseSearchResults( | ||
final SearchPhaseResults<Result> results | ||
) { | ||
return results.getAtomicArray() | ||
.asList() | ||
.stream() | ||
.map(result -> result == null ? null : result.queryResult()) | ||
.collect(Collectors.toList()); | ||
} | ||
|
||
private <Result extends SearchPhaseResult> Optional<FetchSearchResult> getFetchSearchResults( | ||
final SearchPhaseResults<Result> searchPhaseResults | ||
) { | ||
Optional<Result> optionalFirstSearchPhaseResult = searchPhaseResults.getAtomicArray().asList().stream().findFirst(); | ||
return optionalFirstSearchPhaseResult.map(SearchPhaseResult::fetchResult); | ||
} | ||
} |
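The class Javadoc above says the processor uses ranks from individual subqueries to calculate 'normalized' scores. As a rough illustration of that idea only, here is a minimal standalone sketch. The class name `RrfRankScoreSketch`, the method `rankScores`, and the rank constant of 60 are assumptions made for this example; they are not taken from the diff, and this is not the plugin's normalization technique class.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RrfRankScoreSketch {
    // Assumed default for illustration; 60 is a commonly used RRF constant, not a value from this PR.
    static final int RANK_CONSTANT = 60;

    /**
     * Maps each document id to 1 / (rankConstant + rank), where rank is the
     * 1-based position of the document in a single subquery's result list.
     */
    static Map<String, Float> rankScores(List<String> rankedDocIds, int rankConstant) {
        Map<String, Float> scores = new LinkedHashMap<>();
        for (int i = 0; i < rankedDocIds.size(); i++) {
            int rank = i + 1; // ranks start at 1
            scores.put(rankedDocIds.get(i), 1.0f / (rankConstant + rank));
        }
        return scores;
    }

    public static void main(String[] args) {
        // One subquery returned three documents, best match first.
        Map<String, Float> scores = rankScores(List.of("doc-a", "doc-b", "doc-c"), RANK_CONSTANT);
        scores.forEach((id, score) -> System.out.println(id + " -> " + score));
    }
}
```

Because the score depends only on position, the original subquery scores (for example BM25 versus k-NN similarity) never need to be on a comparable scale.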
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
package org.opensearch.neuralsearch.processor.combination; | ||
|
||
import lombok.ToString; | ||
import lombok.extern.log4j.Log4j2; | ||
|
||
import java.util.Map; | ||
|
||
/** | ||
* Abstracts combination of scores based on reciprocal rank fusion algorithm | ||
*/ | ||
@Log4j2 | ||
@ToString(onlyExplicitlyIncluded = true) | ||
public class RRFScoreCombinationTechnique implements ScoreCombinationTechnique { | ||
@ToString.Include | ||
public static final String TECHNIQUE_NAME = "rrf"; | ||
|
||
// Not currently using weights for RRF, no need to modify or verify these params | ||
public RRFScoreCombinationTechnique(final Map<String, Object> params, final ScoreCombinationUtil combinationUtil) {} | ||
I'm OK that weights are not supported in the first release. This class does nothing but add all the scores together. I'm afraid it's over-designed to introduce a new class for a single sum operation.
We're reusing NormalizationProcessorWorkflow, which is quite a big class, and it accepts both normalization and combination technique classes as input arguments. Plus, it follows the single responsibility principle.
I understand. For this PR, both `params` and `combinationUtil` are unused. You'd better delete them.
ack, will do in next PR, please remind me if I forget
||
|
||
@Override | ||
public float combine(final float[] scores) { | ||
float sumScores = 0.0f; | ||
for (float score : scores) { | ||
sumScores += score; | ||
} | ||
return sumScores; | ||
} | ||
} |
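To show how a plain sum can serve as the combination step, here is a minimal end-to-end sketch. The `combine` method mirrors the summation in the diff above; everything else (the class name `RrfFusionSketch`, the `fuse` method, the rank constant passed in by the caller, and the choice to let a document missing from a subquery contribute 0) is an assumption made for illustration, not the plugin's workflow.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RrfFusionSketch {
    // Same summation as combine(float[]) in the diff above.
    static float combine(final float[] scores) {
        float sumScores = 0.0f;
        for (float score : scores) {
            sumScores += score;
        }
        return sumScores;
    }

    /**
     * Fuses several ranked lists: each document gets 1 / (rankConstant + rank)
     * per subquery (0 if absent, an assumption of this sketch), and the
     * per-subquery scores are summed. Returns document ids, best first.
     */
    static List<String> fuse(List<List<String>> subqueryRankings, int rankConstant) {
        int subqueryCount = subqueryRankings.size();
        Map<String, float[]> scoresPerDoc = new HashMap<>();
        for (int q = 0; q < subqueryCount; q++) {
            List<String> ranking = subqueryRankings.get(q);
            for (int i = 0; i < ranking.size(); i++) {
                float[] scores = scoresPerDoc.computeIfAbsent(ranking.get(i), id -> new float[subqueryCount]);
                scores[q] = 1.0f / (rankConstant + i + 1); // 1-based rank
            }
        }
        List<String> fused = new ArrayList<>(scoresPerDoc.keySet());
        // Sort by combined score, highest first.
        fused.sort((a, b) -> Float.compare(combine(scoresPerDoc.get(b)), combine(scoresPerDoc.get(a))));
        return fused;
    }

    public static void main(String[] args) {
        List<List<String>> rankings = List.of(
            List.of("a", "b", "c"), // e.g. results of a lexical subquery
            List.of("b", "c", "d")  // e.g. results of a vector subquery
        );
        System.out.println(fuse(rankings, 60));
    }
}
```

Documents that appear in both lists accumulate two reciprocal-rank terms, so they tend to rise above documents that rank well in only one list.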
I have a very high-level question: instead of creating a new processor, can't we reuse the existing normalization processor, where the user can send "rrf" in place of the normalization technique?
We can, but we don't want to. There will be some limitations if we go that route: the factory logic will become overloaded and no longer generic, mainly because the normalization processor today allows any combination of techniques, which would not be the case with RRF techniques. There are some rank-based techniques that we can add later; they will follow the same route and make sense only in the scope of ranking. Basically, with such an approach we'd be leaning towards a single multifunctional processor, while the current design is more like smaller specialized processors.
The RFC lists the single-processor approach as an alternative. It has been reviewed, and the team agreed on the new-processor approach as the preferred one: #865
Also having the same question.
I think the two of you are asking different questions: @vibrantvarun is referring to a single processor for RRF and score normalization, and @yuye-aws is mentioning Alternative 2, which is about adding a new processor for RRF but exposing both the normalization and combination techniques as params to the end user.
I can answer both in a similar fashion:
- Fundamentally, score normalization and rank-based combination are different, so combining them in the existing normalization processor isn't intuitive. Besides that, it would require additional validation logic and, at the code level, would ruin existing abstractions, mainly because the normalization processor today allows pairing any normalization technique with any combination technique. With the addition of RRF we would have to break this.
- RRF leans towards being a combination method, as per an offline discussion with our PM; exposing the normalization function doesn't make sense and adds no value.
Is there somewhere to validate that `RRFNormalizationTechnique` is used together with `RRFScoreCombinationTechnique`? The `execute` method in the `NormalizationProcessorWorkflow` class does normalization and then combination.