Skip to content

Commit

Permalink
Merge pull request #50 from zuliaio/okhttp
Browse files Browse the repository at this point in the history
Removed micronaut client dependency and introduced okHttp. Bumped mic…
  • Loading branch information
payammeyer authored Dec 29, 2021
2 parents 5b2c3e7 + fd6802e commit 9e23b9a
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 142 deletions.
4 changes: 3 additions & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,6 @@ luceneVersion=8.9.0
mongoDriverVersion=4.4.0
grpcVersion=1.40.1
protobufVersion=3.17.3
micronautVersion=3.0.0
micronautVersion=3.2.3
okHttpVersion=4.9.3
gsonVersion=2.8.9
8 changes: 4 additions & 4 deletions zulia-client/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ plugins {

description = "Zulia Client"

val micronautVersion: String by project
val okHttpVersion: String by project
val gsonVersion: String by project

dependencies {
api(project(":zulia-common"))
implementation("com.google.code.gson:gson:2.8.6")
implementation("io.micronaut:micronaut-http-client:$micronautVersion") // forcing to working 3.0 until we switch to native java http client.
implementation("io.micronaut.reactor:micronaut-reactor-http-client:2.0.0")
implementation("com.google.code.gson:gson:$gsonVersion")
implementation("com.squareup.okhttp3:okhttp:$okHttpVersion")
}

Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package io.zulia.client.command.builder;

import io.zulia.message.ZuliaQuery;
import org.jetbrains.annotations.NotNull;

import javax.validation.constraints.NotNull;
import java.util.List;

public abstract class StandardQuery implements QueryBuilder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,7 @@ public void updateNodes(List<Node> nodes) {
connection.close();
}

ZuliaRESTClient restClient = zuliaRestPoolMap.remove(removedNode);
if (restClient != null) {
restClient.close();
}
zuliaRestPoolMap.remove(removedNode);

}
catch (Exception e) {
Expand Down Expand Up @@ -271,9 +268,7 @@ public void close() {
for (GenericObjectPool<ZuliaConnection> pool : zuliaConnectionPoolMap.values()) {
pool.close();
}
for (ZuliaRESTClient pool : zuliaRestPoolMap.values()) {
pool.close();
}

isClosed = true;
}

Expand Down

This file was deleted.

123 changes: 59 additions & 64 deletions zulia-client/src/main/java/io/zulia/client/rest/ZuliaRESTClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,22 @@
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.MediaType;
import io.micronaut.http.client.multipart.MultipartBody;
import io.zulia.ZuliaConstants;
import io.zulia.util.HttpHelper;
import io.zulia.util.ZuliaUtil;
import okhttp3.MediaType;
import okhttp3.MultipartBody;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import okio.BufferedSink;
import okio.Okio;
import org.bson.Document;
import reactor.core.publisher.Flux;

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URLConnection;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Objects;
Expand All @@ -24,38 +27,41 @@
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

import static io.micronaut.http.HttpRequest.GET;

public class ZuliaRESTClient {

private static final Logger LOG = Logger.getLogger(ZuliaRESTClient.class.getName());
private final String url;
private MicronautHttpClient client;
private final OkHttpClient client;

public ZuliaRESTClient(String server, int restPort) {
url = "http://" + server + ":" + restPort;

client = MicronautHttpClient.createClient(url);

client = new OkHttpClient().newBuilder().build();
LOG.info("Created OkHttp client for url: " + url);
}

public void storeAssociated(String uniqueId, String indexName, String fileName, Document metadata, byte[] bytes) throws Exception {

MultipartBody body;
if (metadata != null) {
body = MultipartBody.builder().addPart("id", uniqueId).addPart("indexName", indexName).addPart("fileName", fileName)
.addPart("metaJson", metadata.toJson()).addPart("file", fileName, MediaType.forFilename(fileName), bytes).build();
}
else {
body = MultipartBody.builder().addPart("id", uniqueId).addPart("indexName", indexName).addPart("fileName", fileName)
.addPart("file", fileName, MediaType.forFilename(fileName), bytes).build();
}

try {
Flux<HttpResponse<String>> post = Flux.from(client.exchange(
HttpRequest.POST(ZuliaConstants.ASSOCIATED_DOCUMENTS_URL, body).contentType(MediaType.MULTIPART_FORM_DATA).accept(MediaType.TEXT_PLAIN),
String.class));
post.blockFirst();

RequestBody body;
if (metadata != null) {
body = new MultipartBody.Builder().setType(MultipartBody.FORM).addFormDataPart("id", uniqueId).addFormDataPart("fileName", fileName)
.addFormDataPart("indexName", indexName).addFormDataPart("metaJson", metadata.toJson())
.addFormDataPart("file", fileName, RequestBody.create(bytes, MediaType.parse(URLConnection.guessContentTypeFromName(fileName))))
.build();
}
else {
body = new MultipartBody.Builder().setType(MultipartBody.FORM).addFormDataPart("id", uniqueId).addFormDataPart("fileName", fileName)
.addFormDataPart("indexName", indexName)
.addFormDataPart("file", fileName, RequestBody.create(bytes, MediaType.parse(URLConnection.guessContentTypeFromName(fileName))))
.build();
}

Request request = new Request.Builder().url(url + ZuliaConstants.ASSOCIATED_DOCUMENTS_URL).method("POST", body).build();
Response response = client.newCall(request).execute();
response.close();

}
catch (Exception e) {
if (e.getMessage().startsWith("Out of size:")) {
Expand All @@ -72,59 +78,52 @@ public void storeAssociated(String uniqueId, String indexName, String fileName,
public void fetchAssociated(String uniqueId, String indexName, String fileName, OutputStream destination, boolean closeStream) throws Exception {

try {

Flux<HttpResponse<byte[]>> data = Flux.from(client.exchange(
GET(ZuliaConstants.ASSOCIATED_DOCUMENTS_URL + "?" + HttpHelper.createQuery(createParameters(uniqueId, indexName, fileName))),
byte[].class));

data.doOnNext(httpResponse -> {
try {
destination.write(Objects.requireNonNull(httpResponse.body(), "No body for file"));
}
catch (IOException e) {
throw new RuntimeException("Failed to fetch <" + fileName + "> for id <" + uniqueId + "> for index <" + indexName + ">: " + e.getMessage());
}
}).blockLast();

OkHttpClient client = new OkHttpClient().newBuilder().build();
Request request = new Request.Builder().url(
url + ZuliaConstants.ASSOCIATED_DOCUMENTS_URL + "?" + HttpHelper.createQuery(createParameters(uniqueId, indexName, fileName)))
.method("GET", null).build();
Response response = client.newCall(request).execute();
BufferedSink sink = Okio.buffer(Okio.sink(destination));
sink.writeAll(Objects.requireNonNull(response.body(), "No body for file '" + fileName + "'.").source());
sink.close();
response.close();
}
finally {
if (closeStream) {
destination.close();
}
}

}

public void fetchAssociatedMetadata(String uniqueId, String indexName, String fileName, OutputStream destination) {

try {

Flux<HttpResponse<byte[]>> data = Flux.from(client.exchange(
GET(ZuliaConstants.ASSOCIATED_DOCUMENTS_METADATA_URL + "?" + HttpHelper.createQuery(createParameters(uniqueId, indexName, fileName))),
byte[].class));

data.doOnNext(httpResponse -> {
try {
Document document = ZuliaUtil.byteArrayToMongoDocument(httpResponse.body());
destination.write(Objects.requireNonNull(document.toJson().getBytes(StandardCharsets.UTF_8), "No body for file"));
}
catch (IOException e) {
throw new RuntimeException(
"Failed to fetch metadata for file <" + fileName + "> for id <" + uniqueId + "> for index <" + indexName + ">: " + e.getMessage());
}
}).blockLast();

OkHttpClient client = new OkHttpClient().newBuilder().build();
Request request = new Request.Builder().url(
url + ZuliaConstants.ASSOCIATED_DOCUMENTS_URL + "?" + HttpHelper.createQuery(createParameters(uniqueId, indexName, fileName)))
.method("GET", null).build();
Response response = client.newCall(request).execute();
Document document = ZuliaUtil.byteArrayToMongoDocument(Objects.requireNonNull(response.body()).bytes());
destination.write(Objects.requireNonNull(document.toJson().getBytes(StandardCharsets.UTF_8), "No body for file"));
response.close();
}
catch (Throwable throwable) {
catch (Throwable t) {
LOG.log(Level.SEVERE,
"Failed to fetch metadata for file <" + fileName + "> for id <" + uniqueId + "> for index <" + indexName + ">: " + throwable.getMessage());
"Failed to fetch metadata for file <" + fileName + "> for id <" + uniqueId + "> for index <" + indexName + ">: " + t.getMessage());
}

}

public void fetchAssociated(String uniqueId, String indexName, OutputStream destination, boolean closeStream) throws Exception {

String allIdsJson = client.toBlocking()
.retrieve(GET(ZuliaConstants.ASSOCIATED_DOCUMENTS_ALL_FOR_ID_URL + "?" + HttpHelper.createQuery(createParameters(uniqueId, indexName))));
OkHttpClient client = new OkHttpClient().newBuilder().build();
Request request = new Request.Builder().url(
url + ZuliaConstants.ASSOCIATED_DOCUMENTS_ALL_FOR_ID_URL + "?" + HttpHelper.createQuery(createParameters(uniqueId, indexName)))
.method("GET", null).build();
Response response = client.newCall(request).execute();

String allIdsJson = Objects.requireNonNull(response.body()).string();
JsonObject result = JsonParser.parseString(allIdsJson).getAsJsonObject();
JsonArray filenames = result.getAsJsonArray("filenames");

Expand All @@ -149,6 +148,7 @@ public void fetchAssociated(String uniqueId, String indexName, OutputStream dest
zipOutputStream.close();
}
}
response.close();
}

}
Expand All @@ -168,9 +168,4 @@ private HashMap<String, Object> createParameters(String uniqueId, String indexNa
return parameters;
}

public void close() {
LOG.info("Closing REST client pool to " + url);
client.close();
}

}

0 comments on commit 9e23b9a

Please sign in to comment.