Skip to content

Commit

Permalink
Fix Workplace Search Client tests
Browse files Browse the repository at this point in the history
  • Loading branch information
dadoonet committed Feb 10, 2022
1 parent 5e4d021 commit 92495ee
Show file tree
Hide file tree
Showing 9 changed files with 148 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,30 @@

package fr.pilato.elasticsearch.crawler.fs.thirdparty.wpsearch;

import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import fr.pilato.elasticsearch.crawler.fs.framework.FsCrawlerUtil;
import fr.pilato.elasticsearch.crawler.fs.framework.TimeValue;
import fr.pilato.elasticsearch.crawler.fs.framework.Version;
import fr.pilato.elasticsearch.crawler.fs.framework.bulk.FsCrawlerBulkProcessor;
import fr.pilato.elasticsearch.crawler.fs.framework.bulk.FsCrawlerRetryBulkProcessorListener;
import jakarta.ws.rs.NotFoundException;
import jakarta.ws.rs.ProcessingException;
import jakarta.ws.rs.WebApplicationException;
import jakarta.ws.rs.client.Client;
import jakarta.ws.rs.client.ClientBuilder;
import jakarta.ws.rs.client.Entity;
import jakarta.ws.rs.client.Invocation;
import jakarta.ws.rs.client.WebTarget;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import org.apache.commons.io.FilenameUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.client.ClientProperties;
import org.glassfish.jersey.client.authentication.HttpAuthenticationFeature;
import org.glassfish.jersey.logging.LoggingFeature;

import java.io.Closeable;
import java.io.IOException;
Expand All @@ -47,9 +52,11 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;

import static fr.pilato.elasticsearch.crawler.fs.framework.FsCrawlerUtil.*;
import static fr.pilato.elasticsearch.crawler.fs.framework.JsonUtil.parseJson;
import static fr.pilato.elasticsearch.crawler.fs.framework.JsonUtil.parseJsonAsDocumentContext;

/**
* Workplace Search Java client
Expand All @@ -60,16 +67,16 @@ public class WPSearchClient implements Closeable {

private static final Logger logger = LogManager.getLogger(WPSearchClient.class);

private final static String DEFAULT_ENDPOINT = "/api/ws/v1/";
private static final String USER_AGENT = "FSCrawler-Rest-Client-" + Version.getVersion();

final static String DEFAULT_WS_ENDPOINT = "/api/ws/v1/";
final static String DEFAULT_ENT_ENDPOINT = "/api/ent/v1/";
private final static String DEFAULT_HOST = "http://127.0.0.1:3002";

private Client client;
private String userAgent;
private String endpoint = DEFAULT_ENDPOINT;
private String host = DEFAULT_HOST;
private String username;
private String password;
private String urlForApi;
private int bulkSize;
private TimeValue flushInterval;

Expand All @@ -85,7 +92,6 @@ public class WPSearchClient implements Closeable {
public WPSearchClient(Path rootDir, Path jobMappingDir) {
this.rootDir = rootDir;
this.jobMappingDir = jobMappingDir;
this.urlForApi = host + endpoint;
}

/**
Expand Down Expand Up @@ -118,35 +124,13 @@ public WPSearchClient withPassword(String password, String defaultValue) {
return this;
}

/**
* If needed we can allow passing a specific user-agent
* @param userAgent User Agent
* @return the current instance
*/
public WPSearchClient withUserAgent(String userAgent) {
this.userAgent = userAgent;
return this;
}

/**
* Define a specific endpoint. Defaults to "/api/ws/v1"
* @param endpoint If we need to change the default endpoint
* @return the current instance
*/
public WPSearchClient withEndpoint(String endpoint) {
this.endpoint = endpoint;
this.urlForApi = host + endpoint;
return this;
}

/**
* Define a specific host. Defaults to "http://127.0.0.1:3002"
* @param host If we need to change the default host
* @return the current instance
*/
public WPSearchClient withHost(String host) {
this.host = host;
this.urlForApi = host + endpoint;
return this;
}

Expand Down Expand Up @@ -187,6 +171,13 @@ public void start() {
config.property(ClientProperties.SUPPRESS_HTTP_COMPLIANCE_VALIDATION, true);
HttpAuthenticationFeature feature = HttpAuthenticationFeature.basic(username, password);
client = ClientBuilder.newClient(config);
if (logger.isTraceEnabled()) {
client
// .property(LoggingFeature.LOGGING_FEATURE_LOGGER_NAME_CLIENT, WPSearchClient.class.getName())
.property(LoggingFeature.LOGGING_FEATURE_LOGGER_LEVEL_CLIENT, Level.FINEST.getName())
.property(LoggingFeature.LOGGING_FEATURE_VERBOSITY_CLIENT, LoggingFeature.Verbosity.PAYLOAD_ANY)
.property(LoggingFeature.LOGGING_FEATURE_MAX_ENTITY_SIZE_CLIENT, 8000);
}
client.register(feature);

// Create the BulkProcessor instance
Expand All @@ -198,6 +189,15 @@ public void start() {
.setFlushInterval(flushInterval)
.build();

// We check that the service is available
try {
String version = getVersion();
logger.info("Wokplace Search Client connected to a service running version {}", version);
} catch (Exception e) {
logger.warn("failed to create workplace search client on {}, disabling crawler...", host);
throw e;
}

started = true;
}

Expand Down Expand Up @@ -256,7 +256,7 @@ public String getDocument(String id) {
checkStarted();
logger.debug("Getting document {} to custom source {}", id, sourceId);
try {
return get("/sources/" + sourceId + "/documents/" + id, String.class);
return get(DEFAULT_WS_ENDPOINT, "sources/" + sourceId + "/documents/" + id, String.class);
} catch (NotFoundException e) {
return null;
}
Expand All @@ -272,7 +272,7 @@ public boolean destroyDocuments(String sourceId, List<String> ids) {
checkStarted();
logger.debug("Removing from source {} documents {}", sourceId, ids);
try {
String response = post("sources/" + sourceId + "/documents/bulk_destroy", ids, String.class);
String response = post(DEFAULT_WS_ENDPOINT, "sources/" + sourceId + "/documents/bulk_destroy", ids, String.class);
logger.debug("Removing documents response: {}", response);
// TODO parse the response to check for errors
return true;
Expand Down Expand Up @@ -310,7 +310,7 @@ public String search(String query, Map<String, Object> filters) {
request.put("filters", filters);
}

String json = post("search", request, String.class);
String json = post(DEFAULT_WS_ENDPOINT, "search", request, String.class);

logger.debug("Search response: {}", json);
return json;
Expand All @@ -322,7 +322,7 @@ public String search(String query, Map<String, Object> filters) {
* @return the source as a json document.
*/
public String getCustomSourceById(String id) {
return get("sources/" + id, String.class);
return get(DEFAULT_WS_ENDPOINT, "sources/" + id, String.class);
}

/**
Expand Down Expand Up @@ -366,7 +366,7 @@ public List<String> getCustomSourcesByName(String name) {

// TODO add pagination
public String listAllCustomSources(int page) {
return get("sources", String.class);
return get(DEFAULT_WS_ENDPOINT, "sources", String.class);
}

/**
Expand All @@ -386,7 +386,7 @@ public String createCustomSource(String sourceName, String version) throws IOExc
// We need to replace the place holder values
json = json.replaceAll("SOURCE_NAME", sourceName);

String response = post("sources/", json, String.class);
String response = post(DEFAULT_WS_ENDPOINT, "sources/", json, String.class);
logger.trace("Source [{}] created. Response: {}", sourceName, response);

// We parse the json
Expand All @@ -406,7 +406,7 @@ public void removeCustomSource(String id) {
checkStarted();

// Delete the source
String response = delete("sources/" + id, null, String.class);
String response = delete(DEFAULT_WS_ENDPOINT, "sources/" + id, null, String.class);
logger.debug("removeCustomSource({}): {}", id, response);
}

Expand All @@ -415,8 +415,10 @@ public void removeCustomSource(String id) {
* @return the version number
*/
public String getVersion() {
throw new jakarta.ws.rs.ServiceUnavailableException("The service is not running... Check the service.");
// return null;
logger.debug("get version");
String home = get(DEFAULT_ENT_ENDPOINT, "internal/version", String.class);
DocumentContext context = parseJsonAsDocumentContext(home);
return context.read("$.number");
}

public void flush() {
Expand Down Expand Up @@ -445,57 +447,67 @@ private void checkStarted() {
}
}

<T> T get(String path, Class<T> clazz) {
logger.trace("Calling GET {}{}", urlForApi, path);
<T> T get(String urlForApi, String path, Class<T> clazz) {
logger.debug("Calling GET {}{}", urlForApi, path);
try {
return prepareHttpCall(path).get(clazz);
Response response = prepareHttpCall(urlForApi, path).build("GET").invoke();
logger.trace("Response headers: {}", response.getHeaders());
T entity = response.readEntity(clazz);
logger.trace("Response entity: {}", entity);
logger.trace("Status: {}", response.getStatusInfo());
if (response.getStatusInfo().getStatusCode() == Response.Status.NOT_FOUND.getStatusCode()) {
throw new NotFoundException();
}

return entity;
} catch (NotFoundException e) {
logger.debug("Calling GET {}{} gives {}", urlForApi, path, e.getMessage());
throw e;
} catch (WebApplicationException e) {
logger.warn("Error while running GET {}{}: {}", urlForApi, path, e.getResponse().readEntity(String.class));
throw e;
}
}

<T> T post(String path, Object data, Class<T> clazz) {
<T> T post(String urlForApi, String path, Object data, Class<T> clazz) {
logger.trace("Calling POST {}{}", urlForApi, path);
try {
return prepareHttpCall(path).post(Entity.json(data), clazz);
return prepareHttpCall(urlForApi, path).post(Entity.json(data), clazz);
} catch (WebApplicationException e) {
logger.warn("Error while running POST {}{}: {}", urlForApi, path, e.getResponse().readEntity(String.class));
throw e;
}
}

private <T> T delete(String path, Object data, Class<T> clazz) {
private <T> T delete(String urlForApi, String path, Object data, Class<T> clazz) {
logger.trace("Calling DELETE {}{}", urlForApi, path);
try {
return prepareHttpCall(path).method("DELETE", Entity.json(data), clazz);
return prepareHttpCall(urlForApi, path).method("DELETE", Entity.json(data), clazz);
} catch (WebApplicationException e) {
logger.warn("Error while running DELETE {}{}: {}", urlForApi, path, e.getResponse().readEntity(String.class));
throw e;
}
}

private Invocation.Builder prepareHttpCall(String path) {
private Invocation.Builder prepareHttpCall(String urlForApi, String path) {
WebTarget target = client
.target(urlForApi)
.target(host)
.path(urlForApi)
.path(path);

Invocation.Builder builder = target
.request(MediaType.APPLICATION_JSON)
.header("Content-Type", "application/json");

if (userAgent != null) {
builder.header("User-Agent", userAgent);
}
builder.header("User-Agent", USER_AGENT);

return builder;
}

@Override
public String toString() {
return "WPSearchClient{" + "endpoint='" + endpoint + '\'' +
return "WPSearchClient{" +
", host='" + host + '\'' +
", urlForApi='" + urlForApi + '\'' +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import java.util.List;
import java.util.Map;

import static fr.pilato.elasticsearch.crawler.fs.thirdparty.wpsearch.WPSearchClient.DEFAULT_WS_ENDPOINT;

public class WPSearchEngine implements Engine<WPSearchOperation, WPSearchBulkRequest, WPSearchBulkResponse> {
private static final Logger logger = LogManager.getLogger(WPSearchEngine.class);
private final WPSearchClient wpSearchClient;
Expand Down Expand Up @@ -55,7 +57,7 @@ public WPSearchBulkResponse bulk(WPSearchBulkRequest request) {

logger.debug("Sending a bulk request of [{}] documents to the Workplace Search service [{}]",
operationsBySource.get(sourceId).size(), wpSearchClient.toString());
String response = wpSearchClient.post(urlForBulkCreate, operationsBySource.get(sourceId), String.class);
String response = wpSearchClient.post(DEFAULT_WS_ENDPOINT, urlForBulkCreate, operationsBySource.get(sourceId), String.class);
responses.put(sourceId, response);
} catch (Exception e) {
logger.error(e);
Expand Down
4 changes: 3 additions & 1 deletion contrib/docker-compose-it/.env
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
# Elastic Stack settings
ELASTIC_VERSION=7.17.0
ELASTIC_PASSWORD=changeme
LICENSE_TYPE=basic
# Trial license is only needed to run integration tests
# LICENSE_TYPE=basic
LICENSE_TYPE=trial

# Elasticsearch settings
IMG_ELASTICSEARCH=docker.elastic.co/elasticsearch/elasticsearch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import fr.pilato.elasticsearch.crawler.fs.client.WorkplaceSearchClient;
import fr.pilato.elasticsearch.crawler.fs.framework.JsonUtil;
import fr.pilato.elasticsearch.crawler.fs.settings.FsSettings;
import jakarta.ws.rs.ServiceUnavailableException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -60,7 +61,11 @@ public FsCrawlerDocumentServiceWorkplaceSearchImpl(Path config, FsSettings setti

@Override
public void start() throws IOException {
client.start();
try {
client.start();
} catch (ServiceUnavailableException e) {
logger.fatal("Can not start the Workplace Search client.");
}
logger.debug("Workplace Search Document Service started");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import fr.pilato.elasticsearch.crawler.fs.framework.bulk.FsCrawlerBulkProcessor;
import fr.pilato.elasticsearch.crawler.fs.framework.bulk.FsCrawlerRetryBulkProcessorListener;
import fr.pilato.elasticsearch.crawler.fs.settings.FsSettings;
import fr.pilato.elasticsearch.crawler.fs.thirdparty.wpsearch.WPSearchClient;
import jakarta.ws.rs.NotFoundException;
import jakarta.ws.rs.WebApplicationException;
import jakarta.ws.rs.client.Client;
Expand Down Expand Up @@ -59,6 +60,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;

import static fr.pilato.elasticsearch.crawler.fs.framework.FsCrawlerUtil.*;
import static fr.pilato.elasticsearch.crawler.fs.framework.JsonUtil.*;
Expand Down Expand Up @@ -111,8 +113,11 @@ public void start() throws IOException {
settings.getElasticsearch().getPassword());
client = ClientBuilder.newClient(config);
if (logger.isTraceEnabled()) {
client.property(LoggingFeature.LOGGING_FEATURE_VERBOSITY_CLIENT, LoggingFeature.Verbosity.PAYLOAD_ANY)
.property(LoggingFeature.LOGGING_FEATURE_LOGGER_LEVEL_CLIENT, "FINER");
client
// .property(LoggingFeature.LOGGING_FEATURE_LOGGER_NAME_CLIENT, ElasticsearchClient.class.getName())
.property(LoggingFeature.LOGGING_FEATURE_LOGGER_LEVEL_CLIENT, Level.FINEST.getName())
.property(LoggingFeature.LOGGING_FEATURE_VERBOSITY_CLIENT, LoggingFeature.Verbosity.PAYLOAD_ANY)
.property(LoggingFeature.LOGGING_FEATURE_MAX_ENTITY_SIZE_CLIENT, 8000);
}
client.register(feature);

Expand Down Expand Up @@ -705,6 +710,7 @@ private String httpCall(String method, String path, Object data, Map.Entry<Strin
logger.trace("{} {}/{} gives {}", method, node, path == null ? "" : path, response);
return response;
} catch (WebApplicationException e) {
// TODO Test with non-existent nodes. It should raise a ProcessingException -> ConnectException
if (e.getResponse().getStatusInfo().getFamily() == Response.Status.Family.SERVER_ERROR) {
logger.warn("Error on server side. We need to try another node if possible. {} -> {}",
e.getResponse().getStatus(),
Expand Down
Loading

0 comments on commit 92495ee

Please sign in to comment.