Skip to content

Commit

Permalink
Merge branch 'millmanw-main'
Browse files Browse the repository at this point in the history
  • Loading branch information
mdavis95 committed Mar 17, 2022
2 parents acd7623 + e17078f commit b3acb92
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 11 deletions.
1 change: 1 addition & 0 deletions zulia-server/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ dependencies {
api("org.mongodb:mongodb-driver-sync:$mongoDriverVersion")

api("org.apache.commons:commons-compress:1.20")
implementation("org.xerial.snappy:snappy-java:1.1.8.4")
implementation(platform("software.amazon.awssdk:bom:$amazonVersion"))
implementation("software.amazon.awssdk:s3")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.IndexOptions;
import com.mongodb.internal.HexUtils;
import io.zulia.message.ZuliaBase.AssociatedDocument;
import io.zulia.message.ZuliaQuery.FetchType;
import io.zulia.server.config.cluster.S3Config;
import io.zulia.server.filestorage.io.S3OutputStream;
import io.zulia.util.ZuliaUtil;
import org.bson.Document;
import org.xerial.snappy.SnappyInputStream;
import org.xerial.snappy.SnappyOutputStream;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.regions.Region;
Expand All @@ -26,11 +29,9 @@
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Writer;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -95,15 +96,25 @@ public void storeAssociatedDocument(AssociatedDocument doc) throws Exception {

Document TOC = parseAssociated(doc, (long) bytes.length);

String key = String.join("/", indexName, doc.getDocumentUniqueId(), doc.getFilename());
String hex = HexUtils.hexMD5(doc.getFilename().getBytes(StandardCharsets.UTF_8));
String key = String.join("/", indexName, doc.getDocumentUniqueId(), String.join(".", hex, "sz"));

Document s3Location = new Document();
s3Location.put("bucket", bucket);
s3Location.put("region", region);
s3Location.put("key", key);
TOC.put("s3", s3Location);

PutObjectRequest req = PutObjectRequest.builder().bucket(bucket).key(key).contentLength((long) bytes.length).build();
s3.putObject(req, RequestBody.fromBytes(bytes));

ByteArrayOutputStream baos = new ByteArrayOutputStream(bytes.length);
SnappyOutputStream os = new SnappyOutputStream(baos);
os.write(bytes);
os.flush();
byte[] compressed = baos.toByteArray();
os.close();

s3.putObject(req, RequestBody.fromBytes(compressed));
client.getDatabase(dbName).getCollection(COLLECTION).insertOne(TOC);
}

Expand Down Expand Up @@ -184,7 +195,9 @@ public OutputStream getAssociatedDocumentOutputStream(String uniqueId, String fi
metadataMap.put(DOCUMENT_UNIQUE_ID_KEY, uniqueId);
metadataMap.put(FILE_UNIQUE_ID_KEY, String.join("-", uniqueId, fileName));

String key = String.join("/", indexName, uniqueId, fileName);
String hex = HexUtils.hexMD5(fileName.getBytes(StandardCharsets.UTF_8));
String key = String.join("/", indexName, uniqueId, String.join(".", hex, "sz"));

Document s3Location = new Document();
s3Location.put("bucket", bucket);
s3Location.put("region", region);
Expand All @@ -193,7 +206,7 @@ public OutputStream getAssociatedDocumentOutputStream(String uniqueId, String fi

client.getDatabase(dbName).getCollection(COLLECTION).insertOne(TOC);

return new S3OutputStream(s3, bucket, key);
return new SnappyOutputStream(new S3OutputStream(s3, bucket, key));
}

@Override
Expand All @@ -204,7 +217,7 @@ public InputStream getAssociatedDocumentStream(String uniqueId, String filename)
Document s3Info = doc.get("s3", Document.class);
GetObjectRequest gor = GetObjectRequest.builder().bucket(s3Info.getString("bucket")).key(s3Info.getString("key")).build();
ResponseInputStream<GetObjectResponse> results = s3.getObject(gor);
return new BufferedInputStream(results);
return new BufferedInputStream(new SnappyInputStream(results));
}
return null;
}
Expand Down Expand Up @@ -323,7 +336,10 @@ private AssociatedDocument parseTOC(Document doc) throws IOException {
.key(s3Info.getString("key"))
.build() ;
ResponseInputStream<GetObjectResponse> results = s3.getObject(gor);
aBuilder.setDocument(ByteString.readFrom(results));
InputStream compression = new SnappyInputStream(results);
try (compression) {
aBuilder.setDocument(ByteString.readFrom(compression));
}

return aBuilder.build();
}
Expand Down

0 comments on commit b3acb92

Please sign in to comment.