diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java index 71f0dc248b8b6..770eaec7bd141 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java @@ -428,6 +428,56 @@ public void testPivotWithBucketScriptAgg() throws Exception { assertEquals(3.878048780, actual.doubleValue(), 0.000001); } + public void testPivotWithGeoCentroidAgg() throws Exception { + String transformId = "geoCentroidPivot"; + String dataFrameIndex = "geo_centroid_pivot_reviews"; + setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, dataFrameIndex); + + final Request createDataframeTransformRequest = createRequestWithAuth("PUT", DATAFRAME_ENDPOINT + transformId, + BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS); + + String config = "{" + + " \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\"}," + + " \"dest\": {\"index\":\"" + dataFrameIndex + "\"},"; + + config += " \"pivot\": {" + + " \"group_by\": {" + + " \"reviewer\": {" + + " \"terms\": {" + + " \"field\": \"user_id\"" + + " } } }," + + " \"aggregations\": {" + + " \"avg_rating\": {" + + " \"avg\": {" + + " \"field\": \"stars\"" + + " } }," + + " \"location\": {" + + " \"geo_centroid\": {\"field\": \"location\"}" + + " } } }" + + "}"; + + createDataframeTransformRequest.setJsonEntity(config); + Map createDataframeTransformResponse = entityAsMap(client().performRequest(createDataframeTransformRequest)); + assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); + + startAndWaitForTransform(transformId, dataFrameIndex, BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS); + assertTrue(indexExists(dataFrameIndex)); + + // we expect 27 documents as there shall be 27 user_id's + Map indexStats = getAsMap(dataFrameIndex + "/_stats"); + assertEquals(27, XContentMapValues.extractValue("_all.total.docs.count", indexStats)); + + // get and check some users + Map searchResult = getAsMap(dataFrameIndex + "/_search?q=reviewer:user_4"); + assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult)); + Number actual = (Number) ((List) XContentMapValues.extractValue("hits.hits._source.avg_rating", searchResult)).get(0); + assertEquals(3.878048780, actual.doubleValue(), 0.000001); + String actualString = (String) ((List) XContentMapValues.extractValue("hits.hits._source.location", searchResult)).get(0); + String[] latlon = actualString.split(","); + assertEquals((4 + 10), Double.valueOf(latlon[0]), 0.000001); + assertEquals((4 + 15), Double.valueOf(latlon[1]), 0.000001); + } + private void assertOnePivotValue(String query, double expected) throws IOException { Map searchResult = getAsMap(query); diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java index 4344aa823b4cc..db07e8513cc2d 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java @@ -77,6 +77,9 @@ protected void createReviewsIndex() throws IOException { .startObject("stars") .field("type", "integer") .endObject() + .startObject("location") + .field("type", "geo_point") + .endObject() .endObject() .endObject(); } @@ -104,6 +107,7 @@ protected void createReviewsIndex() throws IOException { min = 10 + (i % 49); } int sec = 10 + (i % 49); + String location = (user + 10) + "," + (user + 15); String date_string = "2017-01-" + day + "T" + hour + ":" + min + ":" + sec + "Z"; bulk.append("{\"user_id\":\"") @@ -114,7 +118,9 @@ protected void createReviewsIndex() throws IOException { .append(business) .append("\",\"stars\":") .append(stars) - .append(",\"timestamp\":\"") + .append(",\"location\":\"") + .append(location) + .append("\",\"timestamp\":\"") .append(date_string) .append("\"}\n"); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtils.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtils.java index 8c4fa96a144ec..f8857591b2322 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtils.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtils.java @@ -13,6 +13,7 @@ import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.PipelineAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation; +import org.elasticsearch.search.aggregations.metrics.GeoCentroid; import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation; import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation.SingleValue; import org.elasticsearch.search.aggregations.metrics.ScriptedMetric; @@ -84,6 +85,8 @@ public static Stream> extractCompositeAggregationResults(Com } } else if (aggResult instanceof ScriptedMetric) { updateDocument(document, aggName, ((ScriptedMetric) aggResult).aggregation()); + } else if (aggResult instanceof GeoCentroid) { + updateDocument(document, aggName, ((GeoCentroid) aggResult).centroid().toString()); } else { // Execution should never reach this point! // Creating transforms with unsupported aggregations shall not be possible diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/Aggregations.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/Aggregations.java index e7257c463ce7d..615c9b2e8d2e6 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/Aggregations.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/Aggregations.java @@ -35,6 +35,7 @@ enum AggregationType { MAX("max", SOURCE), MIN("min", SOURCE), SUM("sum", SOURCE), + GEO_CENTROID("geo_centroid", "geo_point"), SCRIPTED_METRIC("scripted_metric", DYNAMIC), BUCKET_SCRIPT("bucket_script", DYNAMIC); diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationsTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationsTests.java index 5fb8463ae5412..8443699430a2a 100644 --- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationsTests.java +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationsTests.java @@ -38,11 +38,15 @@ public void testResolveTargetMapping() { assertEquals("double", Aggregations.resolveTargetMapping("sum", "double")); assertEquals("half_float", Aggregations.resolveTargetMapping("sum", "half_float")); + // geo_centroid + assertEquals("geo_point", Aggregations.resolveTargetMapping("geo_centroid", "geo_point")); + assertEquals("geo_point", Aggregations.resolveTargetMapping("geo_centroid", null)); + // scripted_metric assertEquals("_dynamic", Aggregations.resolveTargetMapping("scripted_metric", null)); assertEquals("_dynamic", Aggregations.resolveTargetMapping("scripted_metric", "int")); - // scripted_metric + // bucket_script assertEquals("_dynamic", Aggregations.resolveTargetMapping("bucket_script", null)); assertEquals("_dynamic", Aggregations.resolveTargetMapping("bucket_script", "int")); }