Skip to content

Commit

Permalink
add snappy compression for stored BSON documents and metadata by default. enable it to be turned off via an index flag in create/update index (#109)
Browse files Browse the repository at this point in the history
  • Loading branch information
mdavis95 authored May 28, 2023
1 parent c2828db commit 07f1d15
Show file tree
Hide file tree
Showing 12 changed files with 86 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,10 @@ public int getRAMBufferMB() {
return indexSettings.getRamBufferMB();
}

/**
 * Reports whether compression is enabled for this index.
 *
 * @return {@code true} unless the index settings have the disableCompression flag set
 */
public boolean isCompressionEnabled() {
    boolean compressionDisabled = indexSettings.getDisableCompression();
    return !compressionDisabled;
}

/**
 * Looks up the index fields that match the given field.
 *
 * Delegates to {@code getMatchingIndexFields} with its boolean argument set to {@code true}.
 * NOTE(review): the meaning of that flag is not visible from here; confirm against the helper.
 *
 * @param field the field to match
 * @return the set produced by {@code getMatchingIndexFields}
 */
public Set<String> getMatchingFields(String field) {
    Set<String> matchingFields = getMatchingIndexFields(field, true);
    return matchingFields;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ public class UpdateIndex extends SimpleCommand<UpdateIndexRequest, UpdateIndexRe
private Integer ramBufferMB;
private Integer numberOfReplicas;

private Boolean disableCompression;

private final UpdateIndexSettings.Operation.Builder analyzerSettingsOperation = UpdateIndexSettings.Operation.newBuilder();
private List<ZuliaIndex.AnalyzerSettings> analyzerSettingsList = Collections.emptyList();

Expand Down Expand Up @@ -311,6 +313,15 @@ public UpdateIndex setRamBufferMB(Integer ramBufferMB) {
return this;
}

/**
 * Returns the disableCompression value held by this command.
 *
 * @return the stored flag; {@code null} means no change to the setting was requested
 */
public Boolean getDisableCompression() {
return disableCompression;
}

/**
 * Stores the disableCompression flag on this command.
 *
 * When the stored value is non-null, getRequest() marks the setting as explicitly set
 * and forwards the value in the update request.
 *
 * @param disableCompression the flag to send, or {@code null} to leave the setting out of the request
 * @return this command, to allow call chaining
 */
public UpdateIndex setDisableCompression(Boolean disableCompression) {
this.disableCompression = disableCompression;
return this;
}

/**
 * Returns the numberOfReplicas value held by this command.
 *
 * @return the stored value (a nullable {@code Integer})
 */
public Integer getNumberOfReplicas() {
return numberOfReplicas;
}
Expand Down Expand Up @@ -482,6 +493,11 @@ public UpdateIndexRequest getRequest() {
updateIndexSettings.setRamBufferMB(ramBufferMB);
}

if (disableCompression != null) {
updateIndexSettings.setSetDisableCompression(true);
updateIndexSettings.setDisableCompression(disableCompression);
}

updateIndexSettings.setMetaUpdateOperation(metaDataOperation);
if (!metadata.isEmpty()) {
updateIndexSettings.setMetadata(ZuliaUtil.mongoDocumentToByteString(metadata));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public class ClientIndexConfig {
private Integer ramBufferMB;
private Integer numberOfReplicas;

private Boolean disableCompression;

private TreeMap<String, FieldConfig> fieldMap;
private TreeMap<String, AnalyzerSettings> analyzerSettingsMap;

Expand Down Expand Up @@ -102,6 +104,11 @@ public Integer getNumberOfShards() {
return numberOfShards;
}

/**
 * Stores the number of shards on this index configuration.
 *
 * @param numberOfShards the shard count to store
 * @return this configuration, to allow call chaining
 */
public ClientIndexConfig setNumberOfShards(Integer numberOfShards) {
this.numberOfShards = numberOfShards;
return this;
}

/**
 * Returns the ramBufferMB value held by this index configuration.
 *
 * @return the stored value (a nullable {@code Integer}); presumably megabytes, per the name
 */
public Integer getRamBufferMB() {
return ramBufferMB;
}
Expand All @@ -111,8 +118,12 @@ public ClientIndexConfig setRamBufferMB(Integer ramBufferMB) {
return this;
}

public ClientIndexConfig setNumberOfShards(Integer numberOfShards) {
this.numberOfShards = numberOfShards;
/**
 * Returns the disableCompression value held by this index configuration.
 *
 * @return the stored flag (a nullable {@code Boolean})
 */
public Boolean getDisableCompression() {
return disableCompression;
}

/**
 * Stores the disableCompression flag on this index configuration.
 *
 * getIndexSettings() copies the value into the built settings only when it is non-null.
 *
 * @param disableCompression the flag to store, or {@code null} to leave it out of the built settings
 * @return this configuration, to allow call chaining
 */
public ClientIndexConfig setDisableCompression(Boolean disableCompression) {
this.disableCompression = disableCompression;
return this;
}

Expand Down Expand Up @@ -337,6 +348,10 @@ public IndexSettings getIndexSettings() {
isb.setRamBufferMB(ramBufferMB);
}

if (disableCompression != null) {
isb.setDisableCompression(disableCompression);
}

if (meta != null) {
isb.setMeta(ZuliaUtil.mongoDocumentToByteString(meta));
}
Expand Down Expand Up @@ -394,6 +409,7 @@ public void configure(IndexSettings indexSettings) {

this.indexWeight = indexSettings.getIndexWeight();
this.ramBufferMB = indexSettings.getRamBufferMB();
this.disableCompression = indexSettings.getDisableCompression();

this.meta = ZuliaUtil.byteStringToMongoDocument(indexSettings.getMeta());

Expand Down
1 change: 1 addition & 0 deletions zulia-common/src/main/proto/zulia_base.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ message IdInfo {
uint64 timestamp = 2;
uint32 majorVersion = 3;
uint32 minorVersion = 4;
bool compressedDoc = 5;
}

enum MasterSlaveSettings {
Expand Down
6 changes: 6 additions & 0 deletions zulia-common/src/main/proto/zulia_index.proto
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ message IndexSettings {

repeated FieldMapping fieldMapping = 21;

bool disableCompression = 22;

}


Expand Down Expand Up @@ -117,6 +119,10 @@ message UpdateIndexSettings {

repeated FieldMapping fieldMapping = 29;
Operation fieldMappingOperation = 30; // keyed by alias

bool setDisableCompression = 31;
bool disableCompression = 32;

}


Expand Down
2 changes: 1 addition & 1 deletion zulia-server/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ dependencies {
implementation("org.mongodb:mongodb-driver-sync:$mongoDriverVersion")

implementation("org.apache.commons:commons-compress:1.22")
implementation("org.xerial.snappy:snappy-java:1.1.9.1")
implementation("org.xerial.snappy:snappy-java:1.1.10.0")
implementation(platform("software.amazon.awssdk:bom:$amazonVersion"))
implementation("software.amazon.awssdk:s3")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.highlight.TextFragment;
import org.apache.lucene.util.BytesRef;
import org.xerial.snappy.Snappy;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -119,13 +120,19 @@ protected ZuliaQuery.ScoredResult handleDocument(LeafReaderContext currentLeaf,
if (meta) {
if (metaDocValues != null && metaDocValues.advanceExact(localDocId)) {
byte[] metaBytes = BytesRefUtil.getByteArray(metaDocValues.binaryValue());
if (idInfo.getCompressedDoc()) {
metaBytes = Snappy.uncompress(metaBytes);
}
rdBuilder.setMetadata(ByteString.copyFrom(metaBytes));
}
}

if (full) {
if (fullDocValues != null && fullDocValues.advanceExact(localDocId)) {
byte[] docBytes = BytesRefUtil.getByteArray(fullDocValues.binaryValue());
if (idInfo.getCompressedDoc()) {
docBytes = Snappy.uncompress(docBytes);
}
rdBuilder.setDocument(ByteString.copyFrom(docBytes));

if (needsHighlight || needsAnalysis || needsDocFiltering) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.lucene.index.VectorSimilarityFunction;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.NumericUtils;
import org.xerial.snappy.Snappy;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -80,18 +81,21 @@ public Document getIndexDocument(String uniqueId, long timestamp, DocumentContai
luceneDocument.add(new SortedSetDocValuesField(idSortField, new BytesRef(uniqueId)));
luceneDocument.add(new LongPoint(ZuliaFieldConstants.TIMESTAMP_FIELD, timestamp));

boolean compressionEnabled = indexConfig.isCompressionEnabled();
ZuliaBase.IdInfo idInfo = ZuliaBase.IdInfo.newBuilder().setId(uniqueId).setTimestamp(timestamp).setMajorVersion(majorVersion)
.setMinorVersion(minorVersion).build();
.setMinorVersion(minorVersion).setCompressedDoc(compressionEnabled).build();

byte[] idInfoBytes = idInfo.toByteArray();

luceneDocument.add(new BinaryDocValuesField(ZuliaFieldConstants.STORED_ID_FIELD, new BytesRef(idInfoBytes)));

if (metadata.hasDocument()) {
luceneDocument.add(new BinaryDocValuesField(ZuliaFieldConstants.STORED_META_FIELD, new BytesRef(metadata.getByteArray())));
byte[] bytes = compressionEnabled ? Snappy.compress(metadata.getByteArray()) : metadata.getByteArray();
luceneDocument.add(new BinaryDocValuesField(ZuliaFieldConstants.STORED_META_FIELD, new BytesRef(bytes)));
}
if (mongoDocument.hasDocument()) {
luceneDocument.add(new BinaryDocValuesField(ZuliaFieldConstants.STORED_DOC_FIELD, new BytesRef(mongoDocument.getByteArray())));
byte[] bytes = compressionEnabled ? Snappy.compress(mongoDocument.getByteArray()) : mongoDocument.getByteArray();
luceneDocument.add(new BinaryDocValuesField(ZuliaFieldConstants.STORED_DOC_FIELD, new BytesRef(bytes)));
addUserFields(mongoDocument.getDocument(), luceneDocument, taxoWriter);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,10 @@ public UpdateIndexResponse updateIndex(UpdateIndexRequest request) throws Except
existingSettings.setIndexWeight(updateIndexSettings.getIndexWeight());
}

if (updateIndexSettings.getSetDisableCompression()) {
existingSettings.setDisableCompression(updateIndexSettings.getDisableCompression());
}

Operation metaUpdateOperation = updateIndexSettings.getMetaUpdateOperation();
if (metaUpdateOperation.getEnable()) {
Document existingMeta = ZuliaUtil.byteStringToMongoDocument(existingSettings.getMeta());
Expand Down
13 changes: 11 additions & 2 deletions zulia-server/src/main/java/io/zulia/server/index/ZuliaShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.zulia.server.search.ShardQuery;
import io.zulia.server.util.BytesRefUtil;
import org.apache.lucene.search.Query;
import org.xerial.snappy.Snappy;

import java.io.IOException;
import java.util.EnumSet;
Expand Down Expand Up @@ -157,8 +158,16 @@ public void reindex() throws IOException {

String uniqueId = idInfo.getId();

DocumentContainer metadata = new DocumentContainer(d.meta());
DocumentContainer mongoDocument = new DocumentContainer(d.fullDoc());
DocumentContainer metadata;
DocumentContainer mongoDocument;
if (idInfo.getCompressedDoc()) {
metadata = new DocumentContainer(d.meta() != null ? Snappy.uncompress(BytesRefUtil.getByteArray(d.meta())) : null);
mongoDocument = new DocumentContainer(d.fullDoc() != null ? Snappy.uncompress(BytesRefUtil.getByteArray(d.fullDoc())) : null);
}
else {
metadata = new DocumentContainer(d.meta());
mongoDocument = new DocumentContainer(d.fullDoc());
}

if (!trackedIds.contains(uniqueId)) {
shardWriteManager.indexDocument(uniqueId, timestamp, mongoDocument, metadata);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public void createIndex() throws Exception {
.setPinToCache(true));

indexConfig.setIndexName(INDEX_TEST);
indexConfig.setDisableCompression(true);
indexConfig.setNumberOfShards(1);

zuliaWorkPool.createIndex(indexConfig);
Expand All @@ -70,6 +71,8 @@ public void createIndex() throws Exception {

ClientIndexConfig indexConfigFromServer = zuliaWorkPool.getIndexConfig(INDEX_TEST).getIndexConfig();

Assertions.assertTrue(indexConfigFromServer.getDisableCompression());

Assertions.assertEquals(4, indexConfigFromServer.getFieldConfigMap().size());

ZuliaIndex.FieldConfig idFieldConfig = indexConfigFromServer.getFieldConfig("id");
Expand Down Expand Up @@ -139,13 +142,17 @@ public void createIndex() throws Exception {
new Search(INDEX_TEST).setSearchLabel("searching for cash").addQuery(new ScoredQuery("title:cash")).setPinToCache(true));
indexConfig.addFieldMapping(new FieldMapping("title").addMappedFields("category").includeSelf());
indexConfig.addFieldMapping(new FieldMapping("test").addMappedFields("title", "category"));
indexConfig.setDisableCompression(false);

zuliaWorkPool.createIndex(indexConfig);

}

{
ClientIndexConfig indexConfigFromServer = zuliaWorkPool.getIndexConfig(INDEX_TEST).getIndexConfig();

Assertions.assertFalse(indexConfigFromServer.getDisableCompression());

Assertions.assertEquals(indexConfigFromServer.getFieldConfigMap().size(), 3);

List<String> defaultSearchFields = indexConfigFromServer.getDefaultSearchFields();
Expand All @@ -163,6 +170,7 @@ public void updateIndex() throws Exception {
{
UpdateIndex updateIndex = new UpdateIndex(INDEX_TEST);
updateIndex.setIndexWeight(4);
updateIndex.setDisableCompression(true);

FieldConfigBuilder newField = FieldConfigBuilder.createString("newField").indexAs(DefaultAnalyzers.LC_KEYWORD).sort();
updateIndex.mergeFieldConfig(newField);
Expand All @@ -176,6 +184,7 @@ public void updateIndex() throws Exception {
ClientIndexConfig indexConfigFromServer = zuliaWorkPool.getIndexConfig(INDEX_TEST).getIndexConfig();

Assertions.assertEquals(4, indexConfigFromServer.getIndexWeight());
Assertions.assertTrue(indexConfigFromServer.getDisableCompression());
Assertions.assertEquals(4, indexConfigFromServer.getFieldConfigMap().size());
ZuliaIndex.FieldConfig newField = indexConfigFromServer.getFieldConfig("newField");
Assertions.assertEquals(1, newField.getSortAsCount());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public void createIndex() throws Exception {
indexConfig.setIndexName(FACET_TEST_INDEX);
indexConfig.setNumberOfShards(1);
indexConfig.setShardCommitInterval(20); //force some commits
indexConfig.setDisableCompression(true);

//optional meta
indexConfig.setMeta(new Document().append("createTime", new Date()).append("myLabel", "greatLabel"));
Expand Down Expand Up @@ -386,6 +387,7 @@ public void reindex() throws Exception {
indexConfig.addFieldConfig(FieldConfigBuilder.createBool("testBool").index().facet().sort());
indexConfig.setIndexName(FACET_TEST_INDEX);
indexConfig.setNumberOfShards(1);
indexConfig.setDisableCompression(false); // default values, just for clarity

zuliaWorkPool.createIndex(indexConfig);

Expand Down

0 comments on commit 07f1d15

Please sign in to comment.