Skip to content

Commit

Permalink
Added handling local file to the REST as opposed to forcing byte arra… (
Browse files Browse the repository at this point in the history
#65)

* Added handling local file to the REST as opposed to forcing byte array
* Updated FileStorageTest and ZuliaCmdUtil to use the file based REST.
* Remove workaround for micronaut-projects/micronaut-core#6084
* Merged in micronaut bug workaround
* bumped micronaut to the latest (3.4.3 from 3.2.3)
* Added better timeouts for okhttp rest client

Co-authored-by: Matthew Davis <[email protected]>
  • Loading branch information
payammeyer and mdavis95 authored May 9, 2022
1 parent b115ed3 commit e8f6890
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 119 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ luceneVersion=9.0.0
mongoDriverVersion=4.5.0
grpcVersion=1.45.1
protobufVersion=3.19.2
micronautVersion=3.2.3
micronautVersion=3.4.3
okHttpVersion=4.9.3
gsonVersion=2.9.0
amazonVersion=2.17.140
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@
import io.zulia.client.result.StoreLargeAssociatedResult;
import org.bson.Document;

import java.io.File;

public class StoreLargeAssociated extends RESTCommand<StoreLargeAssociatedResult> implements ShardRoutableCommand {

private String uniqueId;
private String fileName;
private String indexName;
private final String uniqueId;
private final String fileName;
private final String indexName;
private Document meta;
private byte[] bytes;
private File file;

public StoreLargeAssociated(String uniqueId, String indexName, String fileName, byte[] bytes) {
this.uniqueId = uniqueId;
Expand All @@ -21,6 +24,13 @@ public StoreLargeAssociated(String uniqueId, String indexName, String fileName,
this.bytes = bytes;
}

public StoreLargeAssociated(String uniqueId, String indexName, String fileName, File file) {
this.uniqueId = uniqueId;
this.fileName = fileName;
this.indexName = indexName;
this.file = file;
}

public Document getMeta() {
return meta;
}
Expand All @@ -45,6 +55,9 @@ public StoreLargeAssociatedResult execute(ZuliaRESTClient zuliaRESTClient) throw
if (bytes != null) {
zuliaRESTClient.storeAssociated(uniqueId, indexName, fileName, meta, bytes);
}
else if (file != null) {
zuliaRESTClient.storeAssociated(uniqueId, indexName, fileName, meta, file);
}
else {
throw new Exception("File or input stream must be set");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.zip.ZipEntry;
Expand All @@ -35,30 +36,38 @@ public class ZuliaRESTClient {
public ZuliaRESTClient(String server, int restPort) {
url = "http://" + server + ":" + restPort;

client = new OkHttpClient().newBuilder().build();
client = new OkHttpClient().newBuilder().readTimeout(120, TimeUnit.SECONDS).connectTimeout(120, TimeUnit.SECONDS).callTimeout(120, TimeUnit.SECONDS)
.build();
LOG.info("Created OkHttp client for url: " + url);
}

public void storeAssociated(String uniqueId, String indexName, String fileName, Document metadata, byte[] bytes) throws Exception {

try {

RequestBody body;
if (metadata != null) {
body = new MultipartBody.Builder().setType(MultipartBody.FORM).addFormDataPart("id", uniqueId).addFormDataPart("fileName", fileName)
.addFormDataPart("indexName", indexName).addFormDataPart("metaJson", metadata.toJson())
.addFormDataPart("file", fileName, RequestBody.create(bytes, MediaType.parse(ZuliaUtil.guessExtension(bytes)))).build();
RequestBody body = getRequestBody(uniqueId, indexName, fileName, metadata, null, bytes);
Request request = new Request.Builder().url(url + ZuliaConstants.ASSOCIATED_DOCUMENTS_URL).method("POST", body).build();
Response response = client.newCall(request).execute();
response.close();
}
catch (Exception e) {
if (e.getMessage().startsWith("Out of size:")) {
LOG.log(Level.WARNING, "Failed to store file <" + fileName + "> due to mismatch size.");
}
else {
body = new MultipartBody.Builder().setType(MultipartBody.FORM).addFormDataPart("id", uniqueId).addFormDataPart("fileName", fileName)
.addFormDataPart("indexName", indexName)
.addFormDataPart("file", fileName, RequestBody.create(bytes, MediaType.parse(ZuliaUtil.guessExtension(bytes)))).build();
LOG.log(Level.SEVERE, "Failed to store file <" + fileName + ">", e);
throw e;
}
}

}

public void storeAssociated(String uniqueId, String indexName, String fileName, Document metadata, File file) throws Exception {

try {
RequestBody body = getRequestBody(uniqueId, indexName, fileName, metadata, file, null);
Request request = new Request.Builder().url(url + ZuliaConstants.ASSOCIATED_DOCUMENTS_URL).method("POST", body).build();
Response response = client.newCall(request).execute();
response.close();

}
catch (Exception e) {
if (e.getMessage().startsWith("Out of size:")) {
Expand Down Expand Up @@ -162,4 +171,30 @@ private HashMap<String, Object> createParameters(String uniqueId, String indexNa
return parameters;
}

private RequestBody getRequestBody(String uniqueId, String indexName, String fileName, Document metadata, File file, byte[] bytes) {

RequestBody body;
RequestBody uploadFileRequest;

if (file != null) {
uploadFileRequest = RequestBody.create(file, MediaType.parse(ZuliaUtil.guessExtension(file)));
}
else {
uploadFileRequest = RequestBody.create(bytes, MediaType.parse(ZuliaUtil.guessExtension(bytes)));
}

if (metadata != null) {
body = new MultipartBody.Builder().setType(MultipartBody.FORM).addFormDataPart("id", uniqueId).addFormDataPart("fileName", fileName)
.addFormDataPart("indexName", indexName).addFormDataPart("metaJson", metadata.toJson()).addFormDataPart("file", fileName, uploadFileRequest)
.build();
}
else {
body = new MultipartBody.Builder().setType(MultipartBody.FORM).addFormDataPart("id", uniqueId).addFormDataPart("fileName", fileName)
.addFormDataPart("indexName", indexName).addFormDataPart("file", fileName, uploadFileRequest).build();
}

return body;

}

}
21 changes: 19 additions & 2 deletions zulia-common/src/main/java/io/zulia/util/ZuliaUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.io.BasicOutputBuffer;

import java.io.File;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.LinkedHashSet;
Expand Down Expand Up @@ -170,10 +171,26 @@ public static CodecRegistry getPojoCodecRegistry() {
return pojoCodecRegistry;
}

public static String guessExtension(File file) {
ContentInfoUtil util = new ContentInfoUtil();
try {
ContentInfo info = util.findMatch(file);
return info.getContentType().getMimeType();
}
catch (Exception e) {
return "";
}
}

public static String guessExtension(byte[] bytes) {
ContentInfoUtil util = new ContentInfoUtil();
ContentInfo info = util.findMatch(bytes);
return info.getMimeType();
try {
ContentInfo info = util.findMatch(bytes);
return info.getContentType().getMimeType();
}
catch (Exception e) {
return "";
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import io.zulia.client.pool.ZuliaWorkPool;
import io.zulia.client.result.AssociatedResult;
import io.zulia.client.result.FetchResult;
import io.zulia.doc.AssociatedBuilder;
import io.zulia.doc.ResultDocBuilder;
import org.apache.commons.compress.archivers.zip.ZipArchiveEntry;
import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream;
Expand Down Expand Up @@ -154,15 +153,16 @@ public static void index(String inputDir, String recordsFilename, String idField
}

Document meta = null;
byte[] associatedBytes = new byte[0];

String filename = null;
File file = null;
for (Path filePath : filesPaths) {
try {
if (filePath.toFile().getName().endsWith("_metadata.json")) {
meta = Document.parse(Files.readString(filePath));
}
else {
associatedBytes = Files.readAllBytes(filePath);
file = filePath.toFile();
filename = filePath.toFile().getName();
}
}
Expand All @@ -173,15 +173,15 @@ public static void index(String inputDir, String recordsFilename, String idField

if (skipExistingFiles) {
if (!fileExists(workPool, id, filename, index)) {
storeAssociatedDoc(index, workPool, id, filename, meta, associatedBytes);
storeAssociatedDoc(index, workPool, id, filename, meta, file);
}
}
else {
storeAssociatedDoc(index, workPool, id, filename, meta, associatedBytes);
storeAssociatedDoc(index, workPool, id, filename, meta, file);
}
}
catch (Throwable t) {
LOG.log(Level.SEVERE, "Could not list the individual files for dir <" + path.getFileName() + ">");
LOG.log(Level.SEVERE, "Could not list the individual files for dir <" + path.getFileName() + ">", t);
}
}
else {
Expand All @@ -192,6 +192,7 @@ public static void index(String inputDir, String recordsFilename, String idField
// clean up temp work
try (Stream<Path> walk = Files.walk(destDir.toPath())) {
walk.sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete);
destDir.delete();
}

}
Expand Down Expand Up @@ -252,16 +253,8 @@ private static void decompressZipEntryToDisk(File destDir, byte[] buffer, ZipArc
}
}

private static void storeAssociatedDoc(String index, ZuliaWorkPool workPool, String id, String filename, Document meta, byte[] associatedBytes)
throws Exception {
if (associatedBytes.length > 32 * 1024 * 1024) {
workPool.storeLargeAssociated(new StoreLargeAssociated(id, index, filename, associatedBytes).setMeta(meta));
}
else {
Store associatedDocStore = new Store(id, index);
associatedDocStore.addAssociatedDocument(AssociatedBuilder.newBuilder().setDocument(associatedBytes).setMetadata(meta).setFilename(filename));
workPool.store(associatedDocStore);
}
private static void storeAssociatedDoc(String index, ZuliaWorkPool workPool, String id, String filename, Document meta, File file) throws Exception {
workPool.storeLargeAssociated(new StoreLargeAssociated(id, index, filename, file).setMeta(meta));
}

private static boolean fileExists(ZuliaWorkPool zuliaWorkPool, String id, String fileName, String indexName) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,14 @@
import io.micronaut.http.annotation.Post;
import io.micronaut.http.annotation.Produces;
import io.micronaut.http.annotation.QueryValue;
import io.micronaut.http.multipart.MultipartException;
import io.micronaut.http.multipart.PartData;
import io.micronaut.http.multipart.StreamingFileUpload;
import io.micronaut.http.server.types.files.StreamedFile;
import io.micronaut.scheduling.TaskExecutors;
import io.zulia.ZuliaConstants;
import io.zulia.message.ZuliaBase;
import io.zulia.server.index.ZuliaIndexManager;
import io.zulia.server.util.ZuliaNodeProvider;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.bson.Document;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
Expand All @@ -37,7 +30,6 @@
import java.io.OutputStream;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand All @@ -50,68 +42,6 @@ public class AssociatedController {

private final static Logger LOG = Logger.getLogger(AssociatedController.class.getSimpleName());

@Inject
@Named(TaskExecutors.IO)
ExecutorService ioExecutor;

//work around for
//https://github.com/micronaut-projects/micronaut-core/issues/6084
public static Publisher<Boolean> transferToStream(ExecutorService ioExecutor, StreamingFileUpload fileUpload, OutputStream outputStream) {

return Mono.<Boolean>create(emitter ->

Flux.from(fileUpload).subscribeOn(Schedulers.fromExecutorService(ioExecutor)).subscribe(new Subscriber<>() {
Subscription subscription;

@Override
public void onSubscribe(Subscription s) {
subscription = s;
subscription.request(1);
}

@Override
public void onNext(PartData o) {
try {
outputStream.write(o.getBytes());
subscription.request(1);
}
catch (IOException e) {
handleError(e);
}
}

@Override
public void onError(Throwable t) {
emitter.error(t);
try {
if (outputStream != null) {
outputStream.close();
}
}
catch (IOException e) {
System.err.println("Failed to close file stream : " + fileUpload.getName());
}
}

@Override
public void onComplete() {
try {
outputStream.close();
emitter.success(true);
}
catch (IOException e) {
System.err.println("Failed to close file stream : " + fileUpload.getName());
emitter.success(false);
}
}

private void handleError(Throwable t) {
subscription.cancel();
onError(new MultipartException("Error transferring file: " + fileUpload.getName(), t));
}
})).flux();

}

@Get("/metadata")
@Produces(MediaType.APPLICATION_OCTET_STREAM)
Expand Down Expand Up @@ -212,8 +142,9 @@ public Publisher<HttpResponse<?>> post(StreamingFileUpload file, Map<String, Obj
OutputStream associatedDocumentOutputStream;
try {
associatedDocumentOutputStream = indexManager.getAssociatedDocumentOutputStream(indexName, id, fileName, metaDoc);
Publisher<Boolean> uploadPublisher = transferToStream(ioExecutor, file, associatedDocumentOutputStream);
return Flux.from(uploadPublisher).map(success -> {

Publisher<Boolean> uploadPublisher = file.transferTo(associatedDocumentOutputStream);
return Flux.from(uploadPublisher).publishOn(Schedulers.boundedElastic()).map(success -> {
if (success) {
try {
associatedDocumentOutputStream.close();
Expand Down
Loading

0 comments on commit e8f6890

Please sign in to comment.