Skip to content

Commit

Permalink
Merge pull request #2 from kxu913/develop
Browse files Browse the repository at this point in the history
refactor handler and support batch insert using esclient.
  • Loading branch information
kxu913 authored Nov 24, 2023
2 parents a438777 + 1ce656b commit f21a268
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import lombok.extern.slf4j.Slf4j;

import java.util.function.Consumer;
import java.util.function.Function;

@Slf4j
Expand All @@ -18,9 +19,9 @@ public class AsyncResultHandler {
* @param func, the function that handle the success object.
* @param <T> success object that used to handle.
*/
public static <T> void handleAsyncResult(AsyncResult<T> ar, Function<T, Void> func) {
public static <T> void handleAsyncResult(AsyncResult<T> ar, Consumer<T> func) {
if (ar.succeeded()) {
func.apply(ar.result());
func.accept(ar.result());
} else {
log.error(ar.cause().getMessage(), ar.cause());
}
Expand Down Expand Up @@ -55,7 +56,7 @@ public static <T, R> R handleAsyncResultWithReturn(AsyncResult<T> ar, Function<T
* @param func, the function that handle the success object.
* @param <T> success object that used to handle
*/
public static <T> void handleFuture(Future<T> future, Function<T, Void> func) {
public static <T> void handleFuture(Future<T> future, Consumer<T> func) {
future.onComplete(ar -> {
handleAsyncResult(ar, func);
}).onFailure(err -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponseInterceptor;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.RestClient;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Slf4j
Expand Down Expand Up @@ -88,7 +90,12 @@ public RestClient getESRestClient() {
return RestClient.builder(new HttpHost(esConfig.getString("host"), esConfig.getInteger("port"), esConfig.getString("schema")))
.setDefaultHeaders(new Header[]{
new BasicHeader("Content-type", esConfig.getString("data-type"))
}).build();
}).setHttpClientConfigCallback(httpAsyncClientBuilder ->
httpAsyncClientBuilder
.addInterceptorLast((HttpResponseInterceptor) (response, context) ->
response.setHeader("X-Elastic-Product", esConfig.getString("product-id"))
)
.setMaxConnPerRoute(100)).build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.vertx.ext.web.RoutingContext;
import lombok.extern.slf4j.Slf4j;

import java.util.function.Consumer;
import java.util.function.Function;

@Slf4j
Expand All @@ -18,9 +19,9 @@ public class AsyncResultHandler {
* @param func, the function that handle the success object.
* @param <T> success object that used to handle.
*/
public static <T> void handleAsyncResult(AsyncResult<T> ar, RoutingContext ctx, Function<T, Void> func) {
public static <T> void handleAsyncResult(AsyncResult<T> ar, RoutingContext ctx, Consumer<T> func) {
if (ar.succeeded()) {
func.apply(ar.result());
func.accept(ar.result());
} else {
log.error(ar.cause().getMessage(), ar.cause());
ctx.fail(500, ar.cause());
Expand Down Expand Up @@ -55,7 +56,7 @@ public static <T, R> R handleAsyncResultWithReturn(AsyncResult<T> ar, Function<T
* @param func, the function that handle the success object.
* @param <T> success object that used to handle
*/
public static <T> void handleFuture(Future<T> future, RoutingContext ctx, Function<T, Void> func) {
public static <T> void handleFuture(Future<T> future, RoutingContext ctx, Consumer<T> func) {
future.onComplete(ar -> {
handleAsyncResult(ar, ctx, func);
}).onFailure(err -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"host": "localhost",
"port": 9200,
"schema": "http",
"data-type": "application/json"
"data-type": "application/json",
"product-id": "es"
}
}

0 comments on commit f21a268

Please sign in to comment.