Skip to content

Commit

Permalink
NIFI-13719 allow Elasticsearch response to contain Long values for th…
Browse files Browse the repository at this point in the history
…e took field
  • Loading branch information
ChrisSamo632 committed Sep 7, 2024
1 parent fafdb96 commit c356562
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ public boolean hasErrors() {
@SuppressWarnings("unchecked")
public static IndexOperationResponse fromJsonResponse(final String response) throws IOException {
final Map<String, Object> parsedResponse = OBJECT_MAPPER.readValue(response, Map.class);
final int took = (int) parsedResponse.get("took");
// took should be an int, but could be a long (bg in Elasticsearch 8.15.0)
final long took = Long.parseLong(String.valueOf(parsedResponse.get("took")));
final boolean hasErrors = (boolean) parsedResponse.get("errors");
final List<Map<String, Object>> items = (List<Map<String, Object>>) parsedResponse.get("items");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ public class SearchResponse implements OperationResponse {
private final List<Map<String, Object>> hits;
private final Map<String, Object> aggregations;
private final long numberOfHits;
private final int took;
private final long took;
private final boolean timedOut;
private final String pitId;
private final String scrollId;
private final String searchAfter;
private final List<String> warnings;

public SearchResponse(final List<Map<String, Object>> hits, final Map<String, Object> aggregations, final String pitId,
final String scrollId, final String searchAfter, final int numberOfHits, final int took, final boolean timedOut,
final String scrollId, final String searchAfter, final int numberOfHits, final long took, final boolean timedOut,
final List<String> warnings) {
this.hits = hits;
this.aggregations = aggregations;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ private List<String> parseResponseWarningHeaders(final Response response) {
final List<String> warnings = Arrays.stream(response.getHeaders())
.filter(h -> "Warning".equalsIgnoreCase(h.getName()))
.map(Header::getValue)
.collect(Collectors.toList());
.toList();

warnings.forEach(w -> getLogger().warn("Elasticsearch Warning: {}", w));

Expand Down Expand Up @@ -627,13 +627,11 @@ protected void buildRequest(final IndexOperationRequest request, final StringBui
final String header = buildBulkHeader(request);
builder.append(header).append("\n");
switch (request.getOperation()) {
case Index:
case Create:
case Index, Create:
final String indexDocument = mapper.writeValueAsString(request.getFields());
builder.append(indexDocument).append("\n");
break;
case Update:
case Upsert:
case Update, Upsert:
final Map<String, Object> updateBody = new HashMap<>(2, 1);
if (request.getScript() != null && !request.getScript().isEmpty()) {
updateBody.put("script", request.getScript());
Expand Down Expand Up @@ -945,7 +943,8 @@ private SearchResponse buildSearchResponse(final Response response) throws JsonP
final Map<String, Object> parsed = parseResponse(response);
final List<String> warnings = parseResponseWarningHeaders(response);

final int took = (Integer) parsed.get("took");
// took should be an int, but could be a long (bg in Elasticsearch 8.15.0)
final long took = Long.parseLong(String.valueOf(parsed.get("took")));
final boolean timedOut = (Boolean) parsed.get("timed_out");
final String pitId = parsed.get("pit_id") != null ? (String) parsed.get("pit_id") : null;
final String scrollId = parsed.get("_scroll_id") != null ? (String) parsed.get("_scroll_id") : null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.elasticsearch;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.List;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;

class IndexOperationResponseTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

@Test
void testTook() {
long took = 100;
final IndexOperationResponse response = new IndexOperationResponse(took);
assertEquals(took, response.getTook());
}

@Test
void testFromJson() throws Exception {
final long took = 100;
final boolean errors = false;
final List<Map<String, Object>> items = Collections.emptyList();

final Map<String, Object> responseMap = Map.of(
"took", Long.valueOf(took).intValue(),
"errors", errors,
"items", items
);
final String responseJson = OBJECT_MAPPER.writeValueAsString(responseMap);
final IndexOperationResponse response = IndexOperationResponse.fromJsonResponse(responseJson);

assertEquals(took, response.getTook());
assertEquals(errors, response.hasErrors());
assertEquals(items, response.getItems());
}

@Test
void testLongTookTime() {
final long took = 34493262031L; // example "took" from Elasticsearch 8.15.0
final IndexOperationResponse response = new IndexOperationResponse(took);
assertEquals(took, response.getTook());
}

@Test
void testFromJsonLongTook() throws Exception {
final long took = 34493262031L; // example "took" from Elasticsearch 8.15.0
final boolean errors = true;
final List<Map<String, Object>> items = Collections.singletonList(Collections.emptyMap());

final Map<String, Object> responseMap = Map.of(
"took", took,
"errors", errors,
"items", items
);
final String responseJson = OBJECT_MAPPER.writeValueAsString(responseMap);
final IndexOperationResponse response = IndexOperationResponse.fromJsonResponse(responseJson);

assertEquals(took, response.getTook());
assertEquals(errors, response.hasErrors());
assertEquals(items, response.getItems());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class SearchResponseTest {
class SearchResponseTest {
@Test
void test() {
final List<Map<String, Object>> results = new ArrayList<>();
Expand All @@ -37,7 +37,7 @@ void test() {
final String scrollId = "scrollId";
final String searchAfter = "searchAfter";
final int num = 10;
final int took = 100;
final long took = 100;
final boolean timeout = false;
final List<String> warnings = Collections.singletonList("auth");
final SearchResponse response = new SearchResponse(results, aggs, pitId, scrollId, searchAfter, num, took, timeout, warnings);
Expand All @@ -61,4 +61,20 @@ void test() {
assertTrue(str.contains("timedOut"));
assertTrue(str.contains("warnings"));
}

@Test
void testLongTookTime() {
final List<Map<String, Object>> results = new ArrayList<>();
final Map<String, Object> aggs = new HashMap<>();
final String pitId = "pitId";
final String scrollId = "scrollId";
final String searchAfter = "searchAfter";
final int num = 10;
final long took = 34493262031L; // example "took" from Elasticsearch 8.15.0
final boolean timeout = false;
final List<String> warnings = Collections.singletonList("auth");
final SearchResponse response = new SearchResponse(results, aggs, pitId, scrollId, searchAfter, num, took, timeout, warnings);

assertEquals(took, response.getTook());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
public abstract class AbstractElasticsearchITBase {
// default Elasticsearch version should (ideally) match that in the nifi-elasticsearch-bundle#pom.xml for the integration-tests profile
protected static final DockerImageName IMAGE = DockerImageName
.parse(System.getProperty("elasticsearch.docker.image", "docker.elastic.co/elasticsearch/elasticsearch:8.13.3"));
.parse(System.getProperty("elasticsearch.docker.image", "docker.elastic.co/elasticsearch/elasticsearch:8.15.1"));
protected static final String ELASTIC_USER_PASSWORD = System.getProperty("elasticsearch.elastic_user.password", RandomStringUtils.randomAlphanumeric(10, 20));
private static final int PORT = 9200;
protected static final ElasticsearchContainer ELASTICSEARCH_CONTAINER = new ElasticsearchContainer(IMAGE)
Expand Down
6 changes: 3 additions & 3 deletions nifi-extension-bundles/nifi-elasticsearch-bundle/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ language governing permissions and limitations under the License. -->
</modules>

<properties>
<elasticsearch.client.version>8.15.0</elasticsearch.client.version>
<elasticsearch.client.version>8.15.1</elasticsearch.client.version>
</properties>

<dependencyManagement>
Expand Down Expand Up @@ -88,7 +88,7 @@ language governing permissions and limitations under the License. -->
</activation>
<properties>
<!-- also update the default Elasticsearch version in nifi-elasticsearch-test-utils#src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java-->
<elasticsearch_docker_image>8.15.0</elasticsearch_docker_image>
<elasticsearch_docker_image>${elasticsearch.client.version}</elasticsearch_docker_image>
<elasticsearch.elastic.password>s3cret</elasticsearch.elastic.password>
</properties>
<build>
Expand Down Expand Up @@ -119,7 +119,7 @@ language governing permissions and limitations under the License. -->
<profile>
<id>elasticsearch7</id>
<properties>
<elasticsearch_docker_image>7.17.21</elasticsearch_docker_image>
<elasticsearch_docker_image>7.17.23</elasticsearch_docker_image>
</properties>
</profile>
</profiles>
Expand Down

0 comments on commit c356562

Please sign in to comment.