Multi-index queries for ElasticSearch Integration: Fixes Hyperfoil#1207
johnaohara committed Feb 4, 2024
1 parent e669f43 commit e1b6fa2
Showing 11 changed files with 219 additions and 26 deletions.
58 changes: 55 additions & 3 deletions docs/site/content/en/docs/Tutorials/query-elasticserach/index.md
@@ -37,7 +37,7 @@ The `json` payload for a single Elasticsearch document is as follows:
```json
{
"index": ".ds-kibana_sample_data_logs-2024.01.11-000001",
"type": "DOC",
"type": "doc",
"query": "RUO1-IwBIG0DwQQtm-ea"
}
```
@@ -58,14 +58,14 @@ $ curl 'http://localhost:8080/api/run/data?test='$TEST'&start='$START'&stop='$ST

The API will return the `RunID` for the document retrieved and analyzed from Elasticsearch.

### Query Multiple documents from Elasticsearch datastore
### Query Multiple documents from a single index in Elasticsearch datastore

It is also possible to query multiple documents from Elasticsearch with a single call to the Horreum API.

```json
{
"index": ".ds-kibana_sample_data_logs-2023.12.13-000001",
"type": "SEARCH",
"type": "search",
"query": {
"from" : 0, "size" : 100,
"query": {
@@ -107,6 +107,58 @@ $ curl 'http://localhost:8080/api/run/data?test='$TEST'&start='$START'&stop='$ST

The query will return a list of `RunID`s, one for each document retrieved and analyzed from Elasticsearch.

### Query Multiple Indexes for documents in Elasticsearch datastore

If your Elasticsearch instance stores meta-data and the associated documents in separate indexes, it is possible to query the meta-data index to retrieve a list of documents to analyze with Horreum using a "multi-index" query:

```json
{
"index": ".ds-elastic_meta-data-index",
"type": "multi-index",
"query": {
"targetIndex": ".ds-elastic_secondary-index",
"docField": "remoteDocField",
"metaQuery": {
"from": 0,
"size": 100,
"query": {
"bool": {
"must": [
{
"term": {
"host": "artifacts.elastic.co"
}
}
],
"filter": {
"range": {
"utc_time": {
"gte": "2023-12-01T09:28:48+00:00",
"lte": "2023-12-14T09:28:48+00:00"
}
}
},
"boost": 1.0
}
}
}
}
}
```

where:

- **index**: name of the Elasticsearch index storing the **meta-data**
- **type**: "multi-index" for a multi-index query
- **query**:
- **targetIndex**: the name of the index containing the documents to analyze
- **docField**: the field in the meta-data index that contains the document id of the document to analyze
- **metaQuery**: the Elasticsearch query to execute on the meta-data index

Horreum will query the **meta-data** index, retrieve all matching documents, and then fetch each referenced document from the **targetIndex**. Both the meta-data and the document contents can be used in any Horreum analysis.
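
Each run payload produced by a multi-index query is the meta-data document, annotated with the upload's schema URI under `$schema` and the retrieved target document under `$doc`. A sketch of the resulting shape for the query above, with all field values illustrative:

```json
{
  "$schema": "uri:my-schema:0.1",
  "host": "artifacts.elastic.co",
  "utc_time": "2023-12-07T09:28:48+00:00",
  "remoteDocField": "RUO1-IwBIG0DwQQtm-ea",
  "$doc": {
    "note": "full contents of the matching document fetched from the targetIndex"
  }
}
```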

The query will return a list of `RunID`s, one for each document retrieved and analyzed from Elasticsearch.
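
As with the single-index examples, the multi-index query is posted to the Horreum run upload API. A minimal sketch, assuming the query above is saved as `multi_index_query.json` and `$TEST`, `$START` and `$STOP` are set as in the earlier examples (authentication and any extra parameters depend on your Horreum installation):

```bash
$ curl 'http://localhost:8080/api/run/data?test='$TEST'&start='$START'&stop='$STOP \
  -s -X POST -H 'content-type: application/json' \
  -d @multi_index_query.json
```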

## What Next?

After successfully querying data from Elasticsearch, you can now:
@@ -12,10 +12,6 @@
import jakarta.ws.rs.BadRequestException;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
@@ -79,6 +75,10 @@ public DatastoreResponse handleRun(JsonNode payload,

Request request;
String finalString;
String schemaUri;
ArrayNode elasticResults;
ArrayNode extractedResults;

switch (apiRequest.type){
case DOC:
request = new Request(
@@ -95,7 +95,7 @@

return new DatastoreResponse(mapper.readTree(finalString).get("_source"), payload);
case SEARCH:
String schemaUri = schemaUriOptional.orElse(null);
schemaUri = schemaUriOptional.orElse(null);
if( schemaUri == null){
throw new BadRequestException("Schema is required for search requests");
}
@@ -106,12 +106,72 @@ public DatastoreResponse handleRun(JsonNode payload,
request.setJsonEntity(mapper.writeValueAsString(apiRequest.query));
finalString = extracted(restClient, request);

ArrayNode elasticResults = (ArrayNode) mapper.readTree(finalString).get("hits").get("hits");
ArrayNode extractedResults = mapper.createArrayNode();
elasticResults = (ArrayNode) mapper.readTree(finalString).get("hits").get("hits");
extractedResults = mapper.createArrayNode();

elasticResults.forEach(jsonNode -> extractedResults.add(((ObjectNode) jsonNode.get("_source")).put("$schema", schemaUri)));

return new DatastoreResponse(extractedResults, payload);

case MULTI_INDEX:
schemaUri = schemaUriOptional.orElse(null);
if( schemaUri == null){
throw new BadRequestException("Schema is required for multi-index requests");
}

//TODO: error handling
final MultiIndexQuery multiIndexQuery = mapper.treeToValue(apiRequest.query, MultiIndexQuery.class);

//1st retrieve the list of docs from 1st Index
request = new Request(
"GET",
"/" + apiRequest.index + "/_search");

request.setJsonEntity(mapper.writeValueAsString(multiIndexQuery.metaQuery));
finalString = extracted(restClient, request);

elasticResults = (ArrayNode) mapper.readTree(finalString).get("hits").get("hits");
extractedResults = mapper.createArrayNode();

//2nd retrieve the docs from 2nd Index and combine into a single result with metadata and doc contents
elasticResults.forEach(jsonNode -> {

ObjectNode result = ((ObjectNode) jsonNode.get("_source")).put("$schema", schemaUri);
String docString = """
{
"error": "Could not retrieve doc from secondary index",
"msg": "ERR_MSG"
}
""";

var subRequest = new Request(
"GET",
"/" + multiIndexQuery.targetIndex + "/_doc/" + jsonNode.get("_source").get(multiIndexQuery.docField).textValue());

try {
docString = extracted(restClient, subRequest);

} catch (IOException e) {

// String is immutable; the replacement must be assigned back
docString = docString.replace("ERR_MSG", e.getMessage());
String msg = String.format("Could not query doc request: index: %s; docID: %s (%s)", multiIndexQuery.targetIndex, multiIndexQuery.docField, e.getMessage());
log.error(msg);
}

try {
result.set("$doc", mapper.readTree(docString));
} catch (JsonProcessingException e) {
String msg = String.format("Could not parse doc result: %s, %s", docString, e.getMessage());
log.error(msg);
}

extractedResults.add(result);

});

return new DatastoreResponse(extractedResults, payload);

default:
throw new BadRequestException("Invalid request type: " + apiRequest.type);
}
@@ -146,15 +206,29 @@ public UploadType uploadType() {
}

private static class ElasticRequest {
public ElasticRequest() {
}

public String index;
public RequestType type;
public JsonNode query;

}

static class MultiIndexQuery {
public MultiIndexQuery() {
}

public String docField;
public String targetIndex;
public JsonNode metaQuery;
}


private enum RequestType {
DOC ("doc"),
SEARCH ("search");
SEARCH ("search"),
MULTI_INDEX ("multi-index");
private final String type;

RequestType(String s) {
@@ -52,10 +52,6 @@ public class DatasourceTest extends BaseServiceTest{
@Inject
RestClient elasticRestClient;

@Inject
ConfigService configService;


@org.junit.jupiter.api.Test
public void testRetrieveDataFromElastic(TestInfo info) throws InterruptedException {

@@ -69,7 +65,7 @@ public void testRetrieveDataFromElastic(TestInfo info) throws InterruptedExcepti
"type": "DOC",
"query": "{docID}"
}
""".replace("{docID}", "1047");
""".replace("{docID}", "f4a0c0ea-a3cc-4c2e-bb28-00d1a25b0135");

String runID = uploadRun(payload, testConfig.test.name, testConfig.schema.uri);

@@ -89,8 +85,6 @@ public void testRetrieveDataFromElastic(TestInfo info) throws InterruptedExcepti
public void multidocPayload(TestInfo info) throws InterruptedException {
TestConfig testConfig = createNewTestAndDatastores(info);

BlockingQueue<Dataset.EventNew> dataSetQueue = eventConsumerQueue(Dataset.EventNew.class, MessageBusChannels.DATASET_NEW, e -> e.testId == testConfig.test.id);

String payload = """
{
"index": "tfb",
@@ -111,10 +105,48 @@
String runID = uploadRun(payload, testConfig.test.name, testConfig.schema.uri);

assertNotNull(runID);
// Assert.assertNotEquals(runID, "");
Assert.assertEquals(4, runID.split(",").length);


}

@org.junit.jupiter.api.Test
public void multiQueryPayload(TestInfo info) throws InterruptedException {
TestConfig testConfig = createNewTestAndDatastores(info);


String payload = """
{
"index": "meta",
"type": "MULTI_INDEX",
"query": {
"targetIndex": "tfb",
"docField": "uid",
"metaQuery": {
"from": 0,
"size": 100,
"query": {
"bool": {
"must": [
{
"term": {
"env": "aws"
}
}
],
"boost": 1.0
}
}
}
}
}
""";
String runID = uploadRun(payload, testConfig.test.name, testConfig.schema.uri);

assertNotNull(runID);
Assert.assertEquals(2, runID.split(",").length);

}

@org.junit.jupiter.api.Test
public void testDeleteDatasource(TestInfo info) throws InterruptedException {
@@ -198,19 +230,30 @@ public TestConfig(Test test, Schema schema, Datastore datastore) {
@BeforeAll
public void configureElasticDatasets(){

uploadDoc("data/experiment-ds1.json");
uploadDoc("data/experiment-ds2.json");
uploadDoc("data/experiment-ds3.json");
uploadDoc("data/config-quickstart.jvm.json");
uploadDoc("meta", "uid", "data/experiment-meta-data-d1.json");
uploadDoc("meta", "uid", "data/experiment-meta-data-d2.json");
uploadDoc("meta", "uid", "data/experiment-meta-data-d3.json");

uploadDoc("tfb", "uid", "data/experiment-ds1.json");
uploadDoc("tfb", "uid", "data/experiment-ds2.json");
uploadDoc("tfb", "uid", "data/experiment-ds3.json");
uploadDoc("tfb", "uid", "data/config-quickstart.jvm.json");

//nasty hack; sleep for 10 seconds to "ensure" that the uploaded test data is indexed by ES
try {
Thread.sleep(10_000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

}

private void uploadDoc(String resourcepath){
private void uploadDoc(String index, String idField, String resourcepath){
try {
JsonNode payload = new ObjectMapper().readTree(resourceToString(resourcepath));
Request request = new Request(
"PUT",
"/tfb/_doc/" + payload.get("build-id").toString());
"/" + index + "/_doc/" + payload.get(idField).textValue());
request.setJsonEntity(payload.toString());
Response response = elasticRestClient.performRequest(request);
assertNotNull(response);
@@ -1,4 +1,5 @@
{
"uid": "f4a0c0ea-a3cc-4c2e-bb28-00d1a25b0135",
"start": "2021-07-12T04:17:48Z",
"stop": "2021-07-12T04:25:59Z",
"job": "Dummy",
@@ -1,4 +1,5 @@
{
"uid": "1361483a-dfb0-4a49-95b6-7b2f07bde40a",
"start": "2021-07-12T04:17:48Z",
"stop": "2021-07-12T04:25:59Z",
"job": "Dummy",
@@ -1,4 +1,5 @@
{
"uid": "bdce584b-c5a5-4287-8259-1d12f2424ab3",
"start": "2021-07-13T04:17:48Z",
"stop": "2021-07-13T04:25:59Z",
"job": "Dummy",
@@ -1,4 +1,5 @@
{
"uid": "0bb2b7ce-b13e-4d9a-86e7-2060aec5f298",
"start": "2021-07-14T04:17:48Z",
"stop": "2021-07-14T04:25:59Z",
"job": "Dummy",
@@ -1,4 +1,6 @@
{"start": "2021-07-12T04:17:48Z",
{
"uid": "74d24337-b90e-4e49-9386-9d0d84da88ca",
"start": "2021-07-12T04:17:48Z",
"stop": "2021-07-12T04:25:59Z",
"job": "Dummy",
"build-id": 101,
@@ -0,0 +1,6 @@
{
"start": "2021-07-12T04:17:48Z",
"job": "Dummy",
"env": "aws",
"uid": "1361483a-dfb0-4a49-95b6-7b2f07bde40a"
}
@@ -0,0 +1,6 @@
{
"start": "2021-07-13T04:17:48Z",
"job": "Dummy",
"env": "aws",
"uid": "bdce584b-c5a5-4287-8259-1d12f2424ab3"
}
@@ -0,0 +1,6 @@
{
"start": "2021-07-14T04:17:48Z",
"job": "Dummy",
"env": "gcc",
"uid": "0bb2b7ce-b13e-4d9a-86e7-2060aec5f298"
}
