From e8f68903f5d502f32285ebb234941fafbe66462f Mon Sep 17 00:00:00 2001 From: Payam Meyer Date: Mon, 9 May 2022 13:42:10 -0400 Subject: [PATCH] =?UTF-8?q?Added=20handling=20local=20file=20to=20the=20RE?= =?UTF-8?q?ST=20as=20opposed=20to=20forcing=20byte=20arra=E2=80=A6=20(#65)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Added handling local file to the REST as opposed to forcing byte array * Updated FileStorageTest and ZuliaCmdUtil to use the file based REST. * Remove workaround for https://github.com/micronaut-projects/micronaut-core/issues/6084 * Merged in micronaut bug workaround * bumped micronaut to the latest (3.4.3 from 3.2.3) * Added better timeouts for okhttp rest client Co-authored-by: Matthew Davis --- gradle.properties | 2 +- .../client/command/StoreLargeAssociated.java | 19 ++++- .../io/zulia/client/rest/ZuliaRESTClient.java | 57 +++++++++++--- .../main/java/io/zulia/util/ZuliaUtil.java | 21 +++++- .../zulia/server/cmd/common/ZuliaCmdUtil.java | 25 +++---- .../controllers/AssociatedController.java | 75 +------------------ .../server/test/util/FileStorageTest.java | 21 ++---- 7 files changed, 101 insertions(+), 119 deletions(-) diff --git a/gradle.properties b/gradle.properties index 1151d665..ad34c27f 100644 --- a/gradle.properties +++ b/gradle.properties @@ -3,7 +3,7 @@ luceneVersion=9.0.0 mongoDriverVersion=4.5.0 grpcVersion=1.45.1 protobufVersion=3.19.2 -micronautVersion=3.2.3 +micronautVersion=3.4.3 okHttpVersion=4.9.3 gsonVersion=2.9.0 amazonVersion=2.17.140 \ No newline at end of file diff --git a/zulia-client/src/main/java/io/zulia/client/command/StoreLargeAssociated.java b/zulia-client/src/main/java/io/zulia/client/command/StoreLargeAssociated.java index 9bf85f72..7c9cfdad 100644 --- a/zulia-client/src/main/java/io/zulia/client/command/StoreLargeAssociated.java +++ b/zulia-client/src/main/java/io/zulia/client/command/StoreLargeAssociated.java @@ -6,13 +6,16 @@ import io.zulia.client.result.StoreLargeAssociatedResult; import org.bson.Document; +import java.io.File; + public class StoreLargeAssociated extends RESTCommand implements ShardRoutableCommand { - private String uniqueId; - private String fileName; - private String indexName; + private final String uniqueId; + private final String fileName; + private final String indexName; private Document meta; private byte[] bytes; + private File file; public StoreLargeAssociated(String uniqueId, String indexName, String fileName, byte[] bytes) { this.uniqueId = uniqueId; @@ -21,6 +24,13 @@ public StoreLargeAssociated(String uniqueId, String indexName, String fileName, this.bytes = bytes; } + public StoreLargeAssociated(String uniqueId, String indexName, String fileName, File file) { + this.uniqueId = uniqueId; + this.fileName = fileName; + this.indexName = indexName; + this.file = file; + } + public Document getMeta() { return meta; } @@ -45,6 +55,9 @@ public StoreLargeAssociatedResult execute(ZuliaRESTClient zuliaRESTClient) throw if (bytes != null) { zuliaRESTClient.storeAssociated(uniqueId, indexName, fileName, meta, bytes); } + else if (file != null) { + zuliaRESTClient.storeAssociated(uniqueId, indexName, fileName, meta, file); + } else { throw new Exception("File or input stream must be set"); } diff --git a/zulia-client/src/main/java/io/zulia/client/rest/ZuliaRESTClient.java b/zulia-client/src/main/java/io/zulia/client/rest/ZuliaRESTClient.java index 9b34d953..643ef823 100644 --- a/zulia-client/src/main/java/io/zulia/client/rest/ZuliaRESTClient.java +++ b/zulia-client/src/main/java/io/zulia/client/rest/ZuliaRESTClient.java @@ -21,6 +21,7 @@ import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Objects; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import java.util.zip.ZipEntry; @@ -35,30 +36,38 @@ public class ZuliaRESTClient { public ZuliaRESTClient(String server, int restPort) { url = "http://" + server + ":" + restPort; - client = new OkHttpClient().newBuilder().build(); + client = new OkHttpClient().newBuilder().readTimeout(120, TimeUnit.SECONDS).connectTimeout(120, TimeUnit.SECONDS).callTimeout(120, TimeUnit.SECONDS) + .build(); LOG.info("Created OkHttp client for url: " + url); } public void storeAssociated(String uniqueId, String indexName, String fileName, Document metadata, byte[] bytes) throws Exception { try { - - RequestBody body; - if (metadata != null) { - body = new MultipartBody.Builder().setType(MultipartBody.FORM).addFormDataPart("id", uniqueId).addFormDataPart("fileName", fileName) - .addFormDataPart("indexName", indexName).addFormDataPart("metaJson", metadata.toJson()) - .addFormDataPart("file", fileName, RequestBody.create(bytes, MediaType.parse(ZuliaUtil.guessExtension(bytes)))).build(); + RequestBody body = getRequestBody(uniqueId, indexName, fileName, metadata, null, bytes); + Request request = new Request.Builder().url(url + ZuliaConstants.ASSOCIATED_DOCUMENTS_URL).method("POST", body).build(); + Response response = client.newCall(request).execute(); + response.close(); + } + catch (Exception e) { + if (e.getMessage().startsWith("Out of size:")) { + LOG.log(Level.WARNING, "Failed to store file <" + fileName + "> due to mismatch size."); } else { - body = new MultipartBody.Builder().setType(MultipartBody.FORM).addFormDataPart("id", uniqueId).addFormDataPart("fileName", fileName) - .addFormDataPart("indexName", indexName) - .addFormDataPart("file", fileName, RequestBody.create(bytes, MediaType.parse(ZuliaUtil.guessExtension(bytes)))).build(); + LOG.log(Level.SEVERE, "Failed to store file <" + fileName + ">", e); + throw e; } + } + + } + + public void storeAssociated(String uniqueId, String indexName, String fileName, Document metadata, File file) throws Exception { + try { + RequestBody body = getRequestBody(uniqueId, indexName, fileName, metadata, file, null); Request request = new Request.Builder().url(url + ZuliaConstants.ASSOCIATED_DOCUMENTS_URL).method("POST", body).build(); Response response = client.newCall(request).execute(); response.close(); - } catch (Exception e) { if (e.getMessage().startsWith("Out of size:")) { @@ -162,4 +171,30 @@ private HashMap createParameters(String uniqueId, String indexNa return parameters; } + private RequestBody getRequestBody(String uniqueId, String indexName, String fileName, Document metadata, File file, byte[] bytes) { + + RequestBody body; + RequestBody uploadFileRequest; + + if (file != null) { + uploadFileRequest = RequestBody.create(file, MediaType.parse(ZuliaUtil.guessExtension(file))); + } + else { + uploadFileRequest = RequestBody.create(bytes, MediaType.parse(ZuliaUtil.guessExtension(bytes))); + } + + if (metadata != null) { + body = new MultipartBody.Builder().setType(MultipartBody.FORM).addFormDataPart("id", uniqueId).addFormDataPart("fileName", fileName) + .addFormDataPart("indexName", indexName).addFormDataPart("metaJson", metadata.toJson()).addFormDataPart("file", fileName, uploadFileRequest) + .build(); + } + else { + body = new MultipartBody.Builder().setType(MultipartBody.FORM).addFormDataPart("id", uniqueId).addFormDataPart("fileName", fileName) + .addFormDataPart("indexName", indexName).addFormDataPart("file", fileName, uploadFileRequest).build(); + } + + return body; + + } + } diff --git a/zulia-common/src/main/java/io/zulia/util/ZuliaUtil.java b/zulia-common/src/main/java/io/zulia/util/ZuliaUtil.java index 1e37bfe3..d4867fd7 100644 --- a/zulia-common/src/main/java/io/zulia/util/ZuliaUtil.java +++ b/zulia-common/src/main/java/io/zulia/util/ZuliaUtil.java @@ -12,6 +12,7 @@ import org.bson.codecs.configuration.CodecRegistry; import org.bson.io.BasicOutputBuffer; +import java.io.File; import java.nio.ByteBuffer; import java.util.Collection; import java.util.LinkedHashSet; @@ -170,10 +171,26 @@ public static CodecRegistry getPojoCodecRegistry() { return pojoCodecRegistry; } + public static String guessExtension(File file) { + ContentInfoUtil util = new ContentInfoUtil(); + try { + ContentInfo info = util.findMatch(file); + return info.getContentType().getMimeType(); + } + catch (Exception e) { + return ""; + } + } + public static String guessExtension(byte[] bytes) { ContentInfoUtil util = new ContentInfoUtil(); - ContentInfo info = util.findMatch(bytes); - return info.getMimeType(); + try { + ContentInfo info = util.findMatch(bytes); + return info.getContentType().getMimeType(); + } + catch (Exception e) { + return ""; + } } } diff --git a/zulia-server/src/main/java/io/zulia/server/cmd/common/ZuliaCmdUtil.java b/zulia-server/src/main/java/io/zulia/server/cmd/common/ZuliaCmdUtil.java index 0d47591c..8ea3b192 100644 --- a/zulia-server/src/main/java/io/zulia/server/cmd/common/ZuliaCmdUtil.java +++ b/zulia-server/src/main/java/io/zulia/server/cmd/common/ZuliaCmdUtil.java @@ -12,7 +12,6 @@ import io.zulia.client.pool.ZuliaWorkPool; import io.zulia.client.result.AssociatedResult; import io.zulia.client.result.FetchResult; -import io.zulia.doc.AssociatedBuilder; import io.zulia.doc.ResultDocBuilder; import org.apache.commons.compress.archivers.zip.ZipArchiveEntry; import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream; @@ -154,15 +153,16 @@ public static void index(String inputDir, String recordsFilename, String idField } Document meta = null; - byte[] associatedBytes = new byte[0]; + String filename = null; + File file = null; for (Path filePath : filesPaths) { try { if (filePath.toFile().getName().endsWith("_metadata.json")) { meta = Document.parse(Files.readString(filePath)); } else { - associatedBytes = Files.readAllBytes(filePath); + file = filePath.toFile(); filename = filePath.toFile().getName(); } } @@ -173,15 +173,15 @@ public static void index(String inputDir, String recordsFilename, String idField if (skipExistingFiles) { if (!fileExists(workPool, id, filename, index)) { - storeAssociatedDoc(index, workPool, id, filename, meta, associatedBytes); + storeAssociatedDoc(index, workPool, id, filename, meta, file); } } else { - storeAssociatedDoc(index, workPool, id, filename, meta, associatedBytes); + storeAssociatedDoc(index, workPool, id, filename, meta, file); } } catch (Throwable t) { - LOG.log(Level.SEVERE, "Could not list the individual files for dir <" + path.getFileName() + ">"); + LOG.log(Level.SEVERE, "Could not list the individual files for dir <" + path.getFileName() + ">", t); } } else { @@ -192,6 +192,7 @@ public static void index(String inputDir, String recordsFilename, String idField // clean up temp work try (Stream walk = Files.walk(destDir.toPath())) { walk.sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete); + destDir.delete(); } } @@ -252,16 +253,8 @@ private static void decompressZipEntryToDisk(File destDir, byte[] buffer, ZipArc } } - private static void storeAssociatedDoc(String index, ZuliaWorkPool workPool, String id, String filename, Document meta, byte[] associatedBytes) - throws Exception { - if (associatedBytes.length > 32 * 1024 * 1024) { - workPool.storeLargeAssociated(new StoreLargeAssociated(id, index, filename, associatedBytes).setMeta(meta)); - } - else { - Store associatedDocStore = new Store(id, index); - associatedDocStore.addAssociatedDocument(AssociatedBuilder.newBuilder().setDocument(associatedBytes).setMetadata(meta).setFilename(filename)); - workPool.store(associatedDocStore); - } + private static void storeAssociatedDoc(String index, ZuliaWorkPool workPool, String id, String filename, Document meta, File file) throws Exception { + workPool.storeLargeAssociated(new StoreLargeAssociated(id, index, filename, file).setMeta(meta)); } private static boolean fileExists(ZuliaWorkPool zuliaWorkPool, String id, String fileName, String indexName) throws Exception { diff --git a/zulia-server/src/main/java/io/zulia/server/rest/controllers/AssociatedController.java b/zulia-server/src/main/java/io/zulia/server/rest/controllers/AssociatedController.java index bb1adb86..7f7d391d 100644 --- a/zulia-server/src/main/java/io/zulia/server/rest/controllers/AssociatedController.java +++ b/zulia-server/src/main/java/io/zulia/server/rest/controllers/AssociatedController.java @@ -12,21 +12,14 @@ import io.micronaut.http.annotation.Post; import io.micronaut.http.annotation.Produces; import io.micronaut.http.annotation.QueryValue; -import io.micronaut.http.multipart.MultipartException; -import io.micronaut.http.multipart.PartData; import io.micronaut.http.multipart.StreamingFileUpload; import io.micronaut.http.server.types.files.StreamedFile; -import io.micronaut.scheduling.TaskExecutors; import io.zulia.ZuliaConstants; import io.zulia.message.ZuliaBase; import io.zulia.server.index.ZuliaIndexManager; import io.zulia.server.util.ZuliaNodeProvider; -import jakarta.inject.Inject; -import jakarta.inject.Named; import org.bson.Document; import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -37,7 +30,6 @@ import java.io.OutputStream; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutorService; import java.util.logging.Level; import java.util.logging.Logger; @@ -50,68 +42,6 @@ public class AssociatedController { private final static Logger LOG = Logger.getLogger(AssociatedController.class.getSimpleName()); - @Inject - @Named(TaskExecutors.IO) - ExecutorService ioExecutor; - - //work around for - //https://github.com/micronaut-projects/micronaut-core/issues/6084 - public static Publisher transferToStream(ExecutorService ioExecutor, StreamingFileUpload fileUpload, OutputStream outputStream) { - - return Mono.create(emitter -> - - Flux.from(fileUpload).subscribeOn(Schedulers.fromExecutorService(ioExecutor)).subscribe(new Subscriber<>() { - Subscription subscription; - - @Override - public void onSubscribe(Subscription s) { - subscription = s; - subscription.request(1); - } - - @Override - public void onNext(PartData o) { - try { - outputStream.write(o.getBytes()); - subscription.request(1); - } - catch (IOException e) { - handleError(e); - } - } - - @Override - public void onError(Throwable t) { - emitter.error(t); - try { - if (outputStream != null) { - outputStream.close(); - } - } - catch (IOException e) { - System.err.println("Failed to close file stream : " + fileUpload.getName()); - } - } - - @Override - public void onComplete() { - try { - outputStream.close(); - emitter.success(true); - } - catch (IOException e) { - System.err.println("Failed to close file stream : " + fileUpload.getName()); - emitter.success(false); - } - } - - private void handleError(Throwable t) { - subscription.cancel(); - onError(new MultipartException("Error transferring file: " + fileUpload.getName(), t)); - } - })).flux(); - - } @Get("/metadata") @Produces(MediaType.APPLICATION_OCTET_STREAM) @@ -212,8 +142,9 @@ public Publisher> post(StreamingFileUpload file, Map uploadPublisher = transferToStream(ioExecutor, file, associatedDocumentOutputStream); - return Flux.from(uploadPublisher).map(success -> { + + Publisher uploadPublisher = file.transferTo(associatedDocumentOutputStream); + return Flux.from(uploadPublisher).publishOn(Schedulers.boundedElastic()).map(success -> { if (success) { try { associatedDocumentOutputStream.close(); diff --git a/zulia-server/src/test/java/io/zulia/server/test/util/FileStorageTest.java b/zulia-server/src/test/java/io/zulia/server/test/util/FileStorageTest.java index 9bf2d26e..4112ad91 100644 --- a/zulia-server/src/test/java/io/zulia/server/test/util/FileStorageTest.java +++ b/zulia-server/src/test/java/io/zulia/server/test/util/FileStorageTest.java @@ -10,12 +10,12 @@ import io.zulia.client.config.ZuliaPoolConfig; import io.zulia.client.pool.ZuliaWorkPool; import io.zulia.client.result.SearchResult; -import io.zulia.doc.AssociatedBuilder; import io.zulia.doc.ResultDocBuilder; import io.zulia.fields.FieldConfigBuilder; import io.zulia.message.ZuliaIndex; import org.bson.Document; +import java.io.File; import java.nio.file.Files; import java.nio.file.Paths; import java.util.Random; @@ -35,7 +35,7 @@ public static void main(String[] args) throws Exception { fileStorageTest.createIndex(); // This needs to be changed to a local dir with PDF files. - String pdfsDir = "/data/chemrxiv"; + String pdfsDir = "/path/to/dir/of/pdfs"; System.out.println("Indexing documents."); AtomicInteger idCounter = new AtomicInteger(1); @@ -50,7 +50,7 @@ public static void main(String[] args) throws Exception { Document meta = new Document(); meta.put("extension", "pdf"); meta.put("contentType", "application/pdf"); - fileStorageTest.storeFile(doc.getString("id"), file.toFile().getName(), meta, Files.readAllBytes(file)); + fileStorageTest.storeFile(doc.getString("id"), file.toFile().getName(), meta, file.toFile()); } catch (Exception e) { e.printStackTrace(); @@ -126,20 +126,13 @@ private void createIndex() throws Exception { zuliaWorkPool.createIndex(createOrUpdateIndex); } - private void storeFile(String documentId, String filename, Document meta, byte[] content) throws Exception { + private void storeFile(String documentId, String filename, Document meta, File content) throws Exception { zuliaWorkPool.delete(new DeleteAssociated(documentId, TEST_INDEX, filename)); - if (content.length > 32 * 1024 * 1024) { - StoreLargeAssociated storeLargeAssociated = new StoreLargeAssociated(documentId, TEST_INDEX, filename, content); - storeLargeAssociated.setMeta(meta); - zuliaWorkPool.storeLargeAssociated(storeLargeAssociated); - } - else { - Store associatedDocStore = new Store(documentId, TEST_INDEX); - associatedDocStore.addAssociatedDocument(AssociatedBuilder.newBuilder().setDocument(content).setFilename(filename).setMetadata(meta)); - zuliaWorkPool.store(associatedDocStore); - } + StoreLargeAssociated storeLargeAssociated = new StoreLargeAssociated(documentId, TEST_INDEX, filename, content); + storeLargeAssociated.setMeta(meta); + zuliaWorkPool.storeLargeAssociated(storeLargeAssociated); }