diff --git a/gradle.properties b/gradle.properties
index fed62c48..6bea4621 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -5,4 +5,5 @@ grpcVersion=1.40.1
 protobufVersion=3.17.3
 micronautVersion=3.2.3
 okHttpVersion=4.9.3
-gsonVersion=2.8.9
\ No newline at end of file
+gsonVersion=2.8.9
+amazonVersion=2.17.140
\ No newline at end of file
diff --git a/zulia-server/build.gradle.kts b/zulia-server/build.gradle.kts
index 7d7bf03d..608c9088 100644
--- a/zulia-server/build.gradle.kts
+++ b/zulia-server/build.gradle.kts
@@ -9,6 +9,7 @@ val luceneVersion: String by project
 val mongoDriverVersion: String by project
 val protobufVersion: String by project
 val micronautVersion: String by project
+val amazonVersion: String by project
 
 defaultTasks("build", "installDist")
 
@@ -30,6 +31,8 @@ dependencies {
 	api("org.mongodb:mongodb-driver-sync:$mongoDriverVersion")
 	api("org.apache.commons:commons-compress:1.20")
 
+	implementation(platform("software.amazon.awssdk:bom:$amazonVersion"))
+	implementation("software.amazon.awssdk:s3")
 
 	annotationProcessor(platform("io.micronaut:micronaut-bom:$micronautVersion"))
 	annotationProcessor("io.micronaut:micronaut-inject-java")
diff --git a/zulia-server/src/main/java/io/zulia/server/config/ZuliaConfig.java b/zulia-server/src/main/java/io/zulia/server/config/ZuliaConfig.java
index 20f42783..3c72e709 100644
--- a/zulia-server/src/main/java/io/zulia/server/config/ZuliaConfig.java
+++ b/zulia-server/src/main/java/io/zulia/server/config/ZuliaConfig.java
@@ -2,6 +2,7 @@
 
 import io.zulia.server.config.cluster.MongoAuth;
 import io.zulia.server.config.cluster.MongoServer;
+import io.zulia.server.config.cluster.S3Config;
 import jakarta.inject.Singleton;
 
 import java.util.Collections;
@@ -13,6 +14,8 @@ public class ZuliaConfig {
 	private String dataPath = "data";
 	private boolean cluster = false;
 	private String clusterName = "zulia";
+	private String clusterStorageEngine = "gridfs";
+	private S3Config s3;
 	private List<MongoServer> mongoServers = Collections.singletonList(new MongoServer());
 	private MongoAuth mongoAuth;
 	private String serverAddress = null; //null means autodetect
@@ -85,4 +88,20 @@ public MongoAuth getMongoAuth() {
 	public void setMongoAuth(MongoAuth mongoAuth) {
 		this.mongoAuth = mongoAuth;
 	}
+
+	public String getClusterStorageEngine() {
+		return clusterStorageEngine;
+	}
+
+	public void setClusterStorageEngine(String clusterStorageEngine) {
+		this.clusterStorageEngine = clusterStorageEngine;
+	}
+
+	public S3Config getS3() {
+		return s3;
+	}
+
+	public void setS3(S3Config s3) {
+		this.s3 = s3;
+	}
 }
diff --git a/zulia-server/src/main/java/io/zulia/server/config/cluster/S3Config.java b/zulia-server/src/main/java/io/zulia/server/config/cluster/S3Config.java
new file mode 100644
index 00000000..9dc04505
--- /dev/null
+++ b/zulia-server/src/main/java/io/zulia/server/config/cluster/S3Config.java
@@ -0,0 +1,22 @@
+package io.zulia.server.config.cluster;
+
+public class S3Config {
+	private String s3BucketName;
+	private String region;
+
+	public String getS3BucketName() {
+		return s3BucketName;
+	}
+
+	public void setS3BucketName(String s3BucketName) {
+		this.s3BucketName = s3BucketName;
+	}
+
+	public String getRegion() {
+		return region;
+	}
+
+	public void setRegion(String region) {
+		this.region = region;
+	}
+}
diff --git a/zulia-server/src/main/java/io/zulia/server/filestorage/S3DocumentStorage.java b/zulia-server/src/main/java/io/zulia/server/filestorage/S3DocumentStorage.java
new file mode 100644
index 00000000..ad26858a
--- /dev/null
+++ b/zulia-server/src/main/java/io/zulia/server/filestorage/S3DocumentStorage.java
@@ -0,0 +1,320 @@
+package io.zulia.server.filestorage;
+
+import com.google.protobuf.ByteString;
+import com.mongodb.BasicDBObject;
+import com.mongodb.client.FindIterable;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.model.Filters;
+import com.mongodb.client.model.IndexOptions;
+import io.zulia.message.ZuliaBase.*;
+import io.zulia.message.ZuliaQuery.*;
+import io.zulia.server.config.cluster.S3Config;
+import io.zulia.server.filestorage.io.S3OutputStream;
+import io.zulia.util.ZuliaUtil;
+import org.bson.Document;
+import software.amazon.awssdk.core.ResponseInputStream;
+import software.amazon.awssdk.core.sync.RequestBody;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.*;
+
+import java.io.*;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.ForkJoinPool;
+import java.util.stream.Collectors;
+
+public class S3DocumentStorage implements DocumentStorage {
+	private static final String TIMESTAMP = "_tstamp_";
+	private static final String DOCUMENT_UNIQUE_ID_KEY = "_uid_";
+	private static final String FILE_UNIQUE_ID_KEY = "_fid_";
+	private static final String COLLECTION = "associatedFiles.info";
+	public static final String FILENAME = "filename";
+
+	private final MongoClient client;
+	private final String indexName;
+	private final String dbName;
+	private final boolean sharded;
+	private final String bucket;
+	private final S3Client s3;
+	private final String region;
+
+	public S3DocumentStorage(MongoClient mongoClient, String indexName, String dbName, boolean sharded, S3Config s3Config) {
+		if (null == s3Config)
+			throw new IllegalArgumentException("Must provide the s3 config section");
+		if (null == s3Config.getS3BucketName())
+			throw new IllegalArgumentException("Must provide the S3 bucket that is going to be used to store content");
+		if (null == s3Config.getRegion())
+			throw new IllegalArgumentException("Must provide the region the s3 bucket lives in.");
+		this.bucket = s3Config.getS3BucketName();
+		this.region = s3Config.getRegion();
+		this.client = mongoClient;
+		this.indexName = indexName;
+		this.dbName = dbName;
+		this.sharded = sharded;
+		this.s3 = S3Client.builder().region(Region.of(this.region)).build();
+
+		ForkJoinPool.commonPool().execute(() -> {
+			MongoDatabase db = client.getDatabase(dbName);
+			MongoCollection<Document> coll = db.getCollection(COLLECTION);
+			coll.createIndex(new Document("metadata." + DOCUMENT_UNIQUE_ID_KEY, 1), new IndexOptions().background(true));
+			coll.createIndex(new Document("metadata." + FILE_UNIQUE_ID_KEY, 1), new IndexOptions().background(true));
+			if (sharded) {
+				MongoDatabase adminDb = client.getDatabase("admin");
+				Document enableCommand = new Document();
+				enableCommand.put("enablesharding", dbName);
+				adminDb.runCommand(enableCommand);
+
+				Document shardCommand = new Document();
+				MongoCollection<Document> collection = db.getCollection(COLLECTION);
+				shardCommand.put("shardcollection", collection.getNamespace().getFullName());
+				shardCommand.put("key", new BasicDBObject("_id", 1));
+				adminDb.runCommand(shardCommand);
+			}
+		});
+	}
+
+	@Override
+	public void storeAssociatedDocument(AssociatedDocument doc) throws Exception {
+		byte[] bytes = doc.getDocument().toByteArray();
+
+		Document TOC = parseAssociated(doc, (long) bytes.length);
+
+		String key = String.join("/", indexName, doc.getDocumentUniqueId(), doc.getFilename());
+		Document s3Location = new Document();
+		s3Location.put("bucket", bucket);
+		s3Location.put("region", region);
+		s3Location.put("key", key);
+		TOC.put("s3", s3Location);
+
+		PutObjectRequest req = PutObjectRequest.builder().bucket(bucket).key(key).contentLength((long) bytes.length).build();
+		s3.putObject(req, RequestBody.fromBytes(bytes));
+		client.getDatabase(dbName).getCollection(COLLECTION).insertOne(TOC);
+	}
+
+	@Override
+	public List<AssociatedDocument> getAssociatedDocuments(String uniqueId, FetchType fetchType) throws Exception {
+		if (FetchType.NONE.equals(fetchType))
+			return Collections.emptyList();
+		FindIterable<Document> found = client.getDatabase(dbName).getCollection(COLLECTION).find(Filters.eq("metadata." + DOCUMENT_UNIQUE_ID_KEY, uniqueId));
+
+		List<AssociatedDocument> docs = new ArrayList<>();
+		for (Document doc : found) {
+			docs.add(parseTOC(doc));
+		}
+		return docs;
+	}
+
+	@Override
+	public AssociatedDocument getAssociatedDocument(String uniqueId, String filename, FetchType fetchType) throws Exception {
+		if (!FetchType.NONE.equals(fetchType)) {
+			String uid = String.join("-", uniqueId, filename);
+			FindIterable<Document> found = client.getDatabase(dbName).getCollection(COLLECTION).find(Filters.eq("metadata." + FILE_UNIQUE_ID_KEY, uid));
+			Document doc = found.first();
+			if (null != doc) {
+				return parseTOC(doc);
+			}
+		}
+		return null;
+	}
+
+	@Override
+	public void getAssociatedDocuments(Writer outputstream, Document filter) throws Exception {
+		FindIterable<Document> found = client.getDatabase(dbName).getCollection(COLLECTION).find(filter);
+		outputstream.write("{\n");
+		outputstream.write(" \"associatedDocs\": [\n");
+
+		boolean first = true;
+		for (Document doc : found) {
+			if (first) {
+				first = false;
+			}
+			else {
+				outputstream.write(",\n");
+			}
+
+			Document metadata = doc.get("metadata", Document.class);
+
+			String uniqueId = metadata.getString(DOCUMENT_UNIQUE_ID_KEY);
+			outputstream.write(" { \"uniqueId\": \"" + uniqueId + "\", ");
+
+			String filename = doc.getString("filename");
+			outputstream.write("\"filename\": \"" + filename + "\", ");
+
+			Date uploadDate = doc.getDate("uploadDate");
+			outputstream.write("\"uploadDate\": {\"$date\":" + uploadDate.getTime() + "}");
+
+			metadata.remove(TIMESTAMP);
+			metadata.remove(DOCUMENT_UNIQUE_ID_KEY);
+			metadata.remove(FILE_UNIQUE_ID_KEY);
+
+			if (!metadata.isEmpty()) {
+				String metaJson = metadata.toJson();
+				String metaString = ", \"meta\": " + metaJson;
+				outputstream.write(metaString);
+			}
+			outputstream.write(" }");
+		}
+		outputstream.write("\n ]\n}");
+	}
+
+	@Override
+	public OutputStream getAssociatedDocumentOutputStream(String uniqueId, String fileName, long timestamp, Document metadataMap) throws Exception {
+		deleteAssociatedDocument(uniqueId, fileName);
+
+		Document TOC = new Document();
+		TOC.put("filename", fileName);
+		TOC.put("metadata", metadataMap);
+		metadataMap.put(TIMESTAMP, timestamp);
+		metadataMap.put(DOCUMENT_UNIQUE_ID_KEY, uniqueId);
+		metadataMap.put(FILE_UNIQUE_ID_KEY, String.join("-", uniqueId, fileName));
+
+		String key = String.join("/", indexName, uniqueId, fileName);
+		Document s3Location = new Document();
+		s3Location.put("bucket", bucket);
+		s3Location.put("region", region);
+		s3Location.put("key", key);
+		TOC.put("s3", s3Location);
+
+		client.getDatabase(dbName).getCollection(COLLECTION).insertOne(TOC);
+
+		return new S3OutputStream(s3, bucket, key);
+	}
+
+	@Override
+	public InputStream getAssociatedDocumentStream(String uniqueId, String filename) throws Exception {
+		FindIterable<Document> found = client.getDatabase(dbName).getCollection(COLLECTION).find(Filters.eq("metadata." + FILE_UNIQUE_ID_KEY, String.join("-", uniqueId, filename)));
+		Document doc = found.first();
+		if (null != doc) {
+			Document s3Info = doc.get("s3", Document.class);
+			GetObjectRequest gor = GetObjectRequest.builder().bucket(s3Info.getString("bucket")).key(s3Info.getString("key")).build();
+			ResponseInputStream<GetObjectResponse> results = s3.getObject(gor);
+			return results;
+		}
+		return null;
+	}
+
+	@Override
+	public List<String> getAssociatedFilenames(String uniqueId) throws Exception {
+		FindIterable<Document> found = client.getDatabase(dbName).getCollection(COLLECTION).find(Filters.eq("metadata." + DOCUMENT_UNIQUE_ID_KEY, uniqueId));
+		List<String> files = new ArrayList<>();
+		found.map(doc -> doc.getString(FILENAME)).forEach(files::add);
+		return files;
+	}
+
+	@Override
+	public void deleteAssociatedDocument(String uniqueId, String fileName) throws Exception {
+		FindIterable<Document> found = client.getDatabase(dbName).getCollection(COLLECTION).find(Filters.eq("metadata." + FILE_UNIQUE_ID_KEY, String.join("-", uniqueId, fileName)));
+		Document doc = found.first();
+		if (null != doc) {
+			client.getDatabase(dbName).getCollection(COLLECTION).deleteOne(Filters.eq("_id", doc.getObjectId("_id")));
+			Document s3Info = doc.get("s3", Document.class);
+			DeleteObjectRequest dor = DeleteObjectRequest.builder().bucket(s3Info.getString("bucket")).key(s3Info.getString("key")).build();
+			s3.deleteObject(dor);
+		}
+	}
+
+	@Override
+	public void deleteAssociatedDocuments(String uniqueId) throws Exception {
+		FindIterable<Document> found = client.getDatabase(dbName).getCollection(COLLECTION).find(Filters.eq("metadata." + DOCUMENT_UNIQUE_ID_KEY, uniqueId));
+		for (Document doc : found) {
+			client.getDatabase(dbName).getCollection(COLLECTION).deleteOne(Filters.eq("_id", doc.getObjectId("_id")));
+			Document s3Info = doc.get("s3", Document.class);
+			DeleteObjectRequest dor = DeleteObjectRequest.builder().bucket(s3Info.getString("bucket")).key(s3Info.getString("key")).build();
+			s3.deleteObject(dor);
+		}
+	}
+
+	@Override
+	public void drop() throws Exception {
+		FindIterable<Document> found = client.getDatabase(dbName).getCollection(COLLECTION).find();
+		deleteAllKeys(found);
+		client.getDatabase(dbName).drop();
+	}
+
+	@Override
+	public void deleteAllDocuments() throws Exception {
+		FindIterable<Document> found = client.getDatabase(dbName).getCollection(COLLECTION).find();
+		deleteAllKeys(found);
+		client.getDatabase(dbName).getCollection(COLLECTION).drop();
+	}
+
+	private void deleteAllKeys(FindIterable<Document> found) {
+		List<String> keyBatch = new ArrayList<>(1000);
+		for (Document doc : found) {
+			Document s3Info = doc.get("s3", Document.class);
+			keyBatch.add(s3Info.getString("key"));
+			if (keyBatch.size() % 1000 == 0) {
+				deleteKeys(keyBatch);
+				keyBatch.clear();
+			}
+		}
+		if (keyBatch.size() > 0) {
+			deleteKeys(keyBatch);
+			keyBatch.clear();
+		}
+	}
+
+	private void deleteKeys(List<String> keyBatch) {
+		DeleteObjectsRequest dor = DeleteObjectsRequest.builder()
+				.bucket(bucket)
+				.bypassGovernanceRetention(true)
+				.delete(Delete.builder()
+						.objects(
+								keyBatch.stream()
+										.map(s -> ObjectIdentifier.builder().key(s).build())
+										.collect(Collectors.toList())
+						).build())
+				.build();
+		s3.deleteObjects(dor);
+	}
+
+	private Document parseAssociated(AssociatedDocument doc, Long length) {
+		Document metadata;
+		if (!doc.getMetadata().isEmpty()) {
+			metadata = ZuliaUtil.byteArrayToMongoDocument(doc.getMetadata().toByteArray());
+		} else {
+			metadata = new Document();
+		}
+
+		metadata.put(TIMESTAMP, doc.getTimestamp());
+		metadata.put(FILE_UNIQUE_ID_KEY, String.join("-", doc.getDocumentUniqueId(), doc.getFilename()));
+		metadata.put(DOCUMENT_UNIQUE_ID_KEY, doc.getDocumentUniqueId());
+
+		Document TOC = new Document();
+		TOC.put("metadata", metadata);
+		TOC.put(FILENAME, doc.getFilename());
+		TOC.put("length", length);
+		TOC.put("uploadDate", Instant.now());
+		return TOC;
+	}
+
+	private AssociatedDocument parseTOC(Document doc) throws IOException {
+		AssociatedDocument.Builder aBuilder = AssociatedDocument.newBuilder();
+		aBuilder.setFilename(doc.getString(FILENAME));
+
+		Document meta = doc.get("metadata", Document.class);
+		aBuilder.setDocumentUniqueId(meta.getString(DOCUMENT_UNIQUE_ID_KEY));
+		aBuilder.setTimestamp(meta.getLong(TIMESTAMP));
+		aBuilder.setIndexName(indexName);
+		meta.remove(TIMESTAMP);
+		meta.remove(DOCUMENT_UNIQUE_ID_KEY);
+		meta.remove(FILE_UNIQUE_ID_KEY);
+		aBuilder.setMetadata(ZuliaUtil.mongoDocumentToByteString(meta));
+
+		Document s3Info = doc.get("s3", Document.class);
+		GetObjectRequest gor = GetObjectRequest.builder()
+				.bucket(s3Info.getString("bucket"))
+				.key(s3Info.getString("key"))
+				.build();
+		ResponseInputStream<GetObjectResponse> results = s3.getObject(gor);
+		aBuilder.setDocument(ByteString.readFrom(results));
+
+		return aBuilder.build();
+	}
+}
diff --git a/zulia-server/src/main/java/io/zulia/server/filestorage/io/S3OutputStream.java b/zulia-server/src/main/java/io/zulia/server/filestorage/io/S3OutputStream.java
new file mode 100644
index 00000000..79b2e3eb
--- /dev/null
+++ b/zulia-server/src/main/java/io/zulia/server/filestorage/io/S3OutputStream.java
@@ -0,0 +1,176 @@
+package io.zulia.server.filestorage.io;
+
+import java.io.ByteArrayInputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import software.amazon.awssdk.core.sync.RequestBody;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.*;
+
+/**
+ * OutputStream which wraps S3Client, with support for streaming large files directly to S3
+ */
+public class S3OutputStream extends OutputStream {
+
+	/** Default chunk size is 10MB */
+	protected static final int BUFFER_SIZE = 10000000;
+
+	/** S3 client. */
+	private final S3Client s3Client;
+
+	/** The bucket-name on S3 */
+	private final String bucket;
+
+	/** The key name within the bucket */
+	private final String key;
+
+	/** The temporary buffer used for storing the chunks */
+	private final byte[] buf;
+
+	/** The position in the buffer */
+	private int position;
+
+	/** The unique id for this upload */
+	private String uploadId;
+
+	/** List of parts that have been completed; so I can close the stream */
+	private final List<CompletedPart> completedParts;
+
+	/** indicates whether the stream is still open / valid */
+	private boolean open;
+
+	/**
+	 * Creates a new S3 OutputStream
+	 * @param s3Client the AmazonS3 client
+	 * @param bucket name of the bucket
+	 * @param key path within the bucket
+	 */
+	public S3OutputStream(S3Client s3Client, String bucket, String key) {
+		this.s3Client = s3Client;
+		this.bucket = bucket;
+		this.key = key;
+		this.buf = new byte[BUFFER_SIZE];
+		this.position = 0;
+		this.completedParts = new ArrayList<>();
+		this.open = true;
+	}
+
+	/**
+	 * Write an array to the S3 output stream.
+	 *
+	 * @param b the byte-array to append
+	 */
+	@Override
+	public void write(byte[] b) {
+		write(b, 0, b.length);
+	}
+
+	/**
+	 * Writes an array to the S3 Output Stream
+	 *
+	 * @param byteArray the array to write
+	 * @param o the offset into the array
+	 * @param l the number of bytes to write
+	 */
+	@Override
+	public void write(final byte[] byteArray, final int o, final int l) {
+		this.assertOpen();
+		int ofs = o, len = l;
+		int size;
+		while (len > (size = this.buf.length - position)) {
+			System.arraycopy(byteArray, ofs, this.buf, this.position, size);
+			this.position += size;
+			flushBufferAndRewind();
+			ofs += size;
+			len -= size;
+		}
+		System.arraycopy(byteArray, ofs, this.buf, this.position, len);
+		this.position += len;
+	}
+
+	/**
+	 * Flushing to S3 is deferred until a full part has been buffered or the stream is closed;
+	 * this only asserts that the stream is still open.
+	 */
+	@Override
+	public synchronized void flush() {
+		this.assertOpen();
+	}
+
+	protected void flushBufferAndRewind() {
+		if (uploadId == null) {
+			final CreateMultipartUploadRequest cmur = CreateMultipartUploadRequest.builder().bucket(this.bucket).key(this.key).build();
+			CreateMultipartUploadResponse resp = s3Client.createMultipartUpload(cmur);
+			this.uploadId = resp.uploadId();
+		}
+		uploadPart();
+		this.position = 0;
+	}
+
+	protected synchronized void uploadPart() {
+		int partNumber = this.completedParts.size() + 1;
+		UploadPartRequest upr = UploadPartRequest.builder()
+				.bucket(this.bucket)
+				.key(this.key)
+				.uploadId(this.uploadId)
+				.partNumber(partNumber)
+				.build();
+		UploadPartResponse resp = s3Client.uploadPart(upr, RequestBody.fromInputStream(new ByteArrayInputStream(buf, 0, this.position), this.position));
+		completedParts.add(CompletedPart.builder().partNumber(partNumber).eTag(resp.eTag()).build());
+	}
+
+	@Override
+	public void close() {
+		if (this.open) {
+			this.open = false;
+			if (this.uploadId != null) {
+				if (this.position > 0) {
+					uploadPart();
+				}
+				CompletedMultipartUpload cmu = CompletedMultipartUpload.builder().parts(this.completedParts).build();
+				CompleteMultipartUploadRequest cmur = CompleteMultipartUploadRequest.builder()
+						.bucket(this.bucket)
+						.key(this.key)
+						.uploadId(this.uploadId)
+						.multipartUpload(cmu)
+						.build();
+
+				this.s3Client.completeMultipartUpload(cmur);
+			}
+			else {
+				//only upload the bytes actually written, not the whole pre-allocated buffer
+				PutObjectRequest req = PutObjectRequest.builder().bucket(bucket).key(key).contentLength((long) this.position).build();
+				s3Client.putObject(req, RequestBody.fromInputStream(new ByteArrayInputStream(this.buf, 0, this.position), this.position));
+			}
+		}
+	}
+
+	public void cancel() {
+		this.open = false;
+		if (this.uploadId != null) {
+			AbortMultipartUploadRequest amur = AbortMultipartUploadRequest.builder()
+					.bucket(bucket)
+					.key(key)
+					.uploadId(uploadId)
+					.build();
+			this.s3Client.abortMultipartUpload(amur);
+		}
+	}
+
+	@Override
+	public void write(int b) {
+		this.assertOpen();
+		if (position >= this.buf.length) {
+			flushBufferAndRewind();
+		}
+		this.buf[position++] = (byte) b;
+	}
+
+	private void assertOpen() {
+		if (!this.open) {
+			throw new IllegalStateException("Closed");
+		}
+	}
+}
\ No newline at end of file
diff --git a/zulia-server/src/main/java/io/zulia/server/index/ZuliaIndexManager.java b/zulia-server/src/main/java/io/zulia/server/index/ZuliaIndexManager.java
index a11e18b1..8fd124a2 100644
--- a/zulia-server/src/main/java/io/zulia/server/index/ZuliaIndexManager.java
+++ b/zulia-server/src/main/java/io/zulia/server/index/ZuliaIndexManager.java
@@ -22,6 +22,7 @@
 import io.zulia.server.filestorage.DocumentStorage;
 import io.zulia.server.filestorage.FileDocumentStorage;
 import io.zulia.server.filestorage.MongoDocumentStorage;
+import io.zulia.server.filestorage.S3DocumentStorage;
 import io.zulia.server.index.federator.ClearRequestFederator;
 import io.zulia.server.index.federator.CreateIndexAliasRequestFederator;
 import io.zulia.server.index.federator.CreateIndexRequestFederator;
@@ -172,7 +173,14 @@ private void loadIndex(IndexSettings indexSettings) throws Exception {
 
 		DocumentStorage documentStorage;
 		if (zuliaConfig.isCluster()) {
-			documentStorage = new MongoDocumentStorage(MongoProvider.getMongoClient(), serverIndexConfig.getIndexName(), dbName, false);
+			switch (zuliaConfig.getClusterStorageEngine()) {
+				case "s3":
+					documentStorage = new S3DocumentStorage(MongoProvider.getMongoClient(), serverIndexConfig.getIndexName(), dbName, false, zuliaConfig.getS3());
+					break;
+				default:
+					documentStorage = new MongoDocumentStorage(MongoProvider.getMongoClient(), serverIndexConfig.getIndexName(), dbName, false);
+					break;
+			}
 		}
 		else {
 			documentStorage = new FileDocumentStorage(zuliaConfig, serverIndexConfig.getIndexName());