Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reuse KNNVectorFieldData for reduce disk usage #1571

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 3.0](https://github.com/opensearch-project/k-NN/compare/2.x...HEAD)
### Features
* Reuse KNNVectorFieldData for reduce disk usage [#1571](https://github.com/opensearch-project/k-NN/pull/1571)
### Enhancements
### Bug Fixes
### Infrastructure
Expand Down
24 changes: 23 additions & 1 deletion src/main/java/org/opensearch/knn/index/KNNSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.knn.index.memory.NativeMemoryCacheManager;
import org.opensearch.knn.index.memory.NativeMemoryCacheManagerDto;
import org.opensearch.knn.index.util.IndexHyperParametersUtil;
Expand Down Expand Up @@ -80,6 +81,7 @@ public class KNNSettings {
public static final String MODEL_CACHE_SIZE_LIMIT = "knn.model.cache.size.limit";
public static final String ADVANCED_FILTERED_EXACT_SEARCH_THRESHOLD = "index.knn.advanced.filtered_exact_search_threshold";
public static final String KNN_FAISS_AVX2_DISABLED = "knn.faiss.avx2.disabled";
public static final String KNN_SYNTHETIC_SOURCE_ENABLED = "index.knn.synthetic_source.enabled";

/**
* Default setting values
Expand Down Expand Up @@ -252,6 +254,13 @@ public class KNNSettings {
NodeScope
);

public static final Setting<Boolean> KNN_SYNTHETIC_SOURCE_ENABLED_SETTING = Setting.boolSetting(
KNN_SYNTHETIC_SOURCE_ENABLED,
false,
IndexScope,
Dynamic
);

/**
* Dynamic settings
*/
Expand Down Expand Up @@ -369,6 +378,10 @@ private Setting<?> getSetting(String key) {
return KNN_VECTOR_STREAMING_MEMORY_LIMIT_PCT_SETTING;
}

if (KNN_SYNTHETIC_SOURCE_ENABLED.equals(key)) {
return KNN_SYNTHETIC_SOURCE_ENABLED_SETTING;
}

throw new IllegalArgumentException("Cannot find setting by key [" + key + "]");
}

Expand All @@ -387,7 +400,8 @@ public List<Setting<?>> getSettings() {
MODEL_CACHE_SIZE_LIMIT_SETTING,
ADVANCED_FILTERED_EXACT_SEARCH_THRESHOLD_SETTING,
KNN_FAISS_AVX2_DISABLED_SETTING,
KNN_VECTOR_STREAMING_MEMORY_LIMIT_PCT_SETTING
KNN_VECTOR_STREAMING_MEMORY_LIMIT_PCT_SETTING,
KNN_SYNTHETIC_SOURCE_ENABLED_SETTING
);
return Stream.concat(settings.stream(), dynamicCacheSettings.values().stream()).collect(Collectors.toList());
}
Expand Down Expand Up @@ -432,6 +446,14 @@ public static Integer getFilteredExactSearchThreshold(final String indexName) {
.getAsInt(ADVANCED_FILTERED_EXACT_SEARCH_THRESHOLD, ADVANCED_FILTERED_EXACT_SEARCH_THRESHOLD_DEFAULT_VALUE);
}

/**
* check this index enabled/disabled synthetic source
* @param indexSettings settings
*/
public static boolean isKNNSyntheticSourceEnabled(IndexSettings indexSettings) {
return indexSettings.getValue(KNN_SYNTHETIC_SOURCE_ENABLED_SETTING);
}

public void initialize(Client client, ClusterService clusterService) {
this.client = client;
this.clusterService = clusterService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@

package org.opensearch.knn.index;

import org.apache.lucene.index.BinaryDocValues;
import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.search.DocIdSetIterator;
import org.opensearch.index.fielddata.LeafFieldData;
import org.opensearch.index.fielddata.ScriptDocValues;
import org.opensearch.index.fielddata.SortedBinaryDocValues;
import org.opensearch.index.mapper.DocValueFetcher;
import org.opensearch.search.DocValueFormat;

import java.io.IOException;

Expand Down Expand Up @@ -70,4 +73,41 @@ public ScriptDocValues<float[]> getScriptValues() {
public SortedBinaryDocValues getBytesValues() {
throw new UnsupportedOperationException("knn vector field '" + fieldName + "' doesn't support sorting");
}

@Override
public DocValueFetcher.Leaf getLeafValueFetcher(DocValueFormat format) {
final BinaryDocValues binaryDocValues;

try {
binaryDocValues = DocValues.getBinary(reader, fieldName);
} catch (IOException e) {
throw new IllegalStateException("Cannot load KNNDocValues from lucene", e);
}

return new DocValueFetcher.Leaf() {
float[] floats;
boolean docExists = false;

@Override
public boolean advanceExact(int docId) throws IOException {
if (binaryDocValues.advanceExact(docId)) {
docExists = true;
floats = vectorDataType.getVectorFromBytesRef(binaryDocValues.binaryValue());
return docExists;
}
docExists = false;
return docExists;
}

@Override
public int docValueCount() throws IOException {
return 1;
}

@Override
public Object nextValue() throws IOException {
return floats;
}
};
}
}
259 changes: 259 additions & 0 deletions src/main/java/org/opensearch/knn/index/fetch/KNNFetchSubPhase.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.knn.index.fetch;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.extern.log4j.Log4j2;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.BitSet;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.lucene.search.Queries;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.mapper.DocValueFetcher;
import org.opensearch.index.mapper.DocumentMapper;
import org.opensearch.index.mapper.MappedFieldType;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.mapper.ObjectMapper;
import org.opensearch.index.mapper.ValueFetcher;
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.index.mapper.KNNVectorFieldMapper;
import org.opensearch.search.SearchHit;
import org.opensearch.search.fetch.FetchContext;
import org.opensearch.search.fetch.FetchSubPhase;
import org.opensearch.search.fetch.FetchSubPhaseProcessor;
import org.opensearch.search.internal.ContextIndexSearcher;
import org.opensearch.search.lookup.SourceLookup;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.opensearch.knn.common.KNNConstants.BYTES_PER_KILOBYTES;

/**
* Fetch sub phase which pull data from doc values.
* and fulfill the value into source map
*/
@Log4j2
public class KNNFetchSubPhase implements FetchSubPhase {
navneet1v marked this conversation as resolved.
Show resolved Hide resolved
luyuncheng marked this conversation as resolved.
Show resolved Hide resolved

@Override
public FetchSubPhaseProcessor getProcessor(FetchContext fetchContext) throws IOException {
luyuncheng marked this conversation as resolved.
Show resolved Hide resolved
IndexSettings indexSettings = fetchContext.getIndexSettings();
if (!KNNSettings.isKNNSyntheticSourceEnabled(indexSettings)) {
log.debug("Synthetic is disabled for index: {}", fetchContext.getIndexName());
return null;
}
MapperService mapperService = fetchContext.mapperService();

List<DocValueField> fields = new ArrayList<>();
for (MappedFieldType mappedFieldType : mapperService.fieldTypes()) {
if (mappedFieldType != null && mappedFieldType instanceof KNNVectorFieldMapper.KNNVectorFieldType) {
String fieldName = mappedFieldType.name();
ValueFetcher fetcher = new DocValueFetcher(
mappedFieldType.docValueFormat(null, null),
fetchContext.searchLookup().doc().getForField(mappedFieldType)
);
fields.add(new DocValueField(fieldName, fetcher));
}
}
return new KNNFetchSubPhaseProcessor(fetchContext, fields);
}

@AllArgsConstructor
@Getter
class KNNFetchSubPhaseProcessor implements FetchSubPhaseProcessor {

private final FetchContext fetchContext;
private final List<DocValueField> fields;

@Override
public void setNextReader(LeafReaderContext readerContext) throws IOException {
for (DocValueField f : fields) {
f.fetcher.setNextReader(readerContext);
}
}

@Override
public void process(HitContext hitContext) throws IOException {
MapperService mapperService = fetchContext.mapperService();
final boolean hasNested = mapperService.hasNested();
SearchHit hit = hitContext.hit();
Map<String, Object> maps = hit.getSourceAsMap();
if (maps == null) {
// when source is disabled, return
return;
}

if (hasNested) {
syntheticNestedFieldWithDocValues(mapperService, hitContext, maps);
}
for (DocValueField f : fields) {
if (maps.containsKey(f.field)) {
continue;
}
List<Object> docValuesSource = f.fetcher.fetchValues(hitContext.sourceLookup());
if (docValuesSource.size() > 0) {
maps.put(f.field, docValuesSource.get(0));
}
}
BytesStreamOutput streamOutput = new BytesStreamOutput(BYTES_PER_KILOBYTES);
jmazanec15 marked this conversation as resolved.
Show resolved Hide resolved
XContentBuilder builder = new XContentBuilder(XContentType.JSON.xContent(), streamOutput);
builder.value(maps);
hitContext.hit().sourceRef(BytesReference.bytes(builder));
}

protected void syntheticNestedFieldWithDocValues(MapperService mapperService, HitContext hitContext, Map<String, Object> sourceMaps)
throws IOException {
luyuncheng marked this conversation as resolved.
Show resolved Hide resolved
DocumentMapper documentMapper = mapperService.documentMapper();
Map<String, ObjectMapper> mapperMap = documentMapper.objectMappers();

for (ObjectMapper objectMapper : mapperMap.values()) {
if (objectMapper == null) {
continue;
}
if (!objectMapper.nested().isNested()) {
continue;
}
String path = objectMapper.fullPath();
for (DocValueField f : fields) {
if (!checkNestedField(path, f, sourceMaps)) {
continue;
}
// nested array in one nested path
Object nestedObj = sourceMaps.get(path);
ArrayList nestedDocList = (ArrayList) nestedObj;

log.debug(
"object mapper: nested:"
+ objectMapper.nested().isNested()
+ " Value:"
+ objectMapper.fullPath()
+ " field:"
+ f.field
);

innerProcessOneNestedField(objectMapper, hitContext, nestedDocList, f, path);
}
}
}

private void innerProcessOneNestedField(
ObjectMapper objectMapper,
HitContext hitContext,
ArrayList nestedDocList,
DocValueField f,
String path
) throws IOException {

BitSet parentBits = getParentDocBitSet(hitContext);
DocIdSetIterator childIter = getChildDocIdSetIterator(objectMapper, hitContext);
LeafReaderContext subReaderContext = hitContext.readerContext();

SearchHit hit = hitContext.hit();
int currentParent = hit.docId() - subReaderContext.docBase;
int previousParent = parentBits.prevSetBit(currentParent - 1);
int childDocId = childIter.advance(previousParent + 1);
SourceLookup nestedVecSourceLookup = new SourceLookup();

// when nested field only have vector field and exclude source, list is empty
boolean isEmpty = nestedDocList.isEmpty();

for (int offset = 0; childDocId < currentParent && childDocId != DocIdSetIterator.NO_MORE_DOCS; childDocId = childIter
.nextDoc(), offset++) {
nestedVecSourceLookup.setSegmentAndDocument(subReaderContext, childDocId);
List<Object> nestedVecDocValuesSource = f.fetcher.fetchValues(nestedVecSourceLookup);
if (nestedVecDocValuesSource == null || nestedVecDocValuesSource.isEmpty()) {
continue;
}
if (isEmpty) {
nestedDocList.add(new HashMap<String, Object>());
}
if (offset < nestedDocList.size()) {
Object sourceObj = nestedDocList.get(offset);
if (sourceObj instanceof Map) {
Map<String, Object> sourceMap = (Map<String, Object>) sourceObj;
String suffix = f.field.substring(path.length() + 1);
sourceMap.put(suffix, nestedVecDocValuesSource.get(0));
}
} else {
/**
* TODO nested field partial doc only have vector and source exclude
* this source map nestedDocList would out-of-order, can not fill the vector into right offset
* "nested_field" : [
* {"nested_vector": [2.6, 2.6]},
* {"nested_numeric": 2, "nested_vector": [3.1, 2.3]}
* ]
*/
throw new UnsupportedOperationException(
String.format("\"Nested Path \"%s\" in Field \"%s\" with _ID \"%s\" can not be empty\"", path, f.field, hit.getId())
);
}
}
}

private BitSet getParentDocBitSet(HitContext hitContext) throws IOException {
Query parentFilter = Queries.newNonNestedFilter();
LeafReaderContext subReaderContext = hitContext.readerContext();
BitSet parentBits = fetchContext.getQueryShardContext().bitsetFilter(parentFilter).getBitSet(subReaderContext);
return parentBits;
}

private DocIdSetIterator getChildDocIdSetIterator(ObjectMapper objectMapper, HitContext hitContext) throws IOException {
Query childFilter = objectMapper.nestedTypeFilter();
ContextIndexSearcher searcher = fetchContext.searcher();
LeafReaderContext subReaderContext = hitContext.readerContext();
final Weight childWeight = searcher.createWeight(searcher.rewrite(childFilter), ScoreMode.COMPLETE_NO_SCORES, 1f);
Scorer childScorer = childWeight.scorer(subReaderContext);
DocIdSetIterator childIter = childScorer.iterator();
return childIter;
}

private boolean checkNestedField(String path, DocValueField f, Map<String, Object> sourceMaps) {
if (!f.field.startsWith(path)) {
return false;
}
if (!sourceMaps.containsKey(path)) {
return false;
}

// path to nested field:
Object nestedObj = sourceMaps.get(path);
if (!(nestedObj instanceof ArrayList)) {
return false;
}
return true;
}
}

@Getter
public static class DocValueField {
private final String field;
private final ValueFetcher fetcher;

DocValueField(String field, ValueFetcher fetcher) {
this.field = field;
this.fetcher = fetcher;
}
}
}
Loading
Loading