From f5b7aad8b7c2ba3bf04633391faebaccf4d7dfe1 Mon Sep 17 00:00:00 2001 From: Przemko Robakowski Date: Tue, 23 Mar 2021 14:34:32 +0100 Subject: [PATCH] Add stats endpoint to GeoIpDownloader (#70282) This change adds _geoip/stats endpoint that can be used to collect basic data about geoip downloader (successful, failed and skipped downloads, current db count and total time spent downloading). It also fixes missing/wrong origins for clients that will break if used with security. Relates to #68920 --- client/rest-high-level/build.gradle | 5 + .../client/GeoIpStatsResponse.java | 222 ++++++++++++++++++ .../elasticsearch/client/IngestClient.java | 59 +++-- .../client/IngestRequestConverters.java | 5 + .../GeoIpStatsResponseTests.java | 44 ++++ .../elasticsearch/client/IngestClientIT.java | 14 +- modules/ingest-geoip/build.gradle | 5 + .../ingest/geoip/GeoIpDownloaderStatsIT.java | 106 +++++++++ .../ingest/geoip/DatabaseRegistry.java | 37 ++- .../ingest/geoip/GeoIpDownloader.java | 17 +- .../geoip/GeoIpDownloaderTaskExecutor.java | 16 +- .../ingest/geoip/IngestGeoIpPlugin.java | 48 +++- .../geoip/stats/GeoIpDownloaderStats.java | 156 ++++++++++++ .../stats/GeoIpDownloaderStatsAction.java | 194 +++++++++++++++ .../GeoIpDownloaderStatsTransportAction.java | 70 ++++++ .../stats/RestGeoIpDownloaderStatsAction.java | 37 +++ ...atsActionNodeResponseSerializingTests.java | 38 +++ ...erStatsActionResponseSerializingTests.java | 32 +++ .../GeoIpDownloaderStatsSerializingTests.java | 50 ++++ .../test/ingest_geoip/30_geoip_stats.yml | 10 + .../api/ingest.geo_ip_stats.json | 25 ++ 21 files changed, 1146 insertions(+), 44 deletions(-) create mode 100644 client/rest-high-level/src/main/java/org/elasticsearch/client/GeoIpStatsResponse.java create mode 100644 client/rest-high-level/src/test/java/org/elasticsearch/GeoIpStatsResponseTests.java create mode 100644 modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderStatsIT.java create mode 100644 modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStats.java create mode 100644 modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStatsAction.java create mode 100644 modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStatsTransportAction.java create mode 100644 modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/RestGeoIpDownloaderStatsAction.java create mode 100644 modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStatsActionNodeResponseSerializingTests.java create mode 100644 modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStatsActionResponseSerializingTests.java create mode 100644 modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStatsSerializingTests.java create mode 100644 modules/ingest-geoip/src/yamlRestTest/resources/rest-api-spec/test/ingest_geoip/30_geoip_stats.yml create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/api/ingest.geo_ip_stats.json diff --git a/client/rest-high-level/build.gradle b/client/rest-high-level/build.gradle index 6872f6e7da894..252abd51c8626 100644 --- a/client/rest-high-level/build.gradle +++ b/client/rest-high-level/build.gradle @@ -50,6 +50,9 @@ dependencies { testImplementation(project(':x-pack:plugin:core')) { exclude group: 'org.elasticsearch', module: 'elasticsearch-rest-high-level-client' } + 
testImplementation(project(':modules:ingest-geoip')) { + exclude group: 'com.fasterxml.jackson.core', module: 'jackson-annotations' + } testImplementation(project(':x-pack:plugin:eql')) } @@ -84,6 +87,8 @@ tasks.named("check").configure { testClusters.all { testDistribution = 'DEFAULT' systemProperty 'es.scripting.update.ctx_in_params', 'false' + systemProperty 'es.geoip_v2_feature_flag_enabled', 'true' + setting 'geoip.downloader.enabled', 'false' setting 'reindex.remote.whitelist', '[ "[::1]:*", "127.0.0.1:*" ]' setting 'xpack.license.self_generated.type', 'trial' setting 'xpack.security.enabled', 'true' diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/GeoIpStatsResponse.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/GeoIpStatsResponse.java new file mode 100644 index 0000000000000..fad22de5d6800 --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/GeoIpStatsResponse.java @@ -0,0 +1,222 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.client; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; + +public class GeoIpStatsResponse implements ToXContentObject { + + @SuppressWarnings("unchecked") + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("geoip_stats", a -> { + Map stats = (Map) a[0]; + List> nodes = (List>) a[1]; + + return new GeoIpStatsResponse((int) stats.get("successful_downloads"), (int) stats.get("failed_downloads"), + ((Number) stats.get("total_download_time")).longValue(), (int) stats.get("databases_count"), (int) stats.get("skipped_updates"), + nodes.stream().collect(Collectors.toMap(Tuple::v1, Tuple::v2))); + }); + + static { + PARSER.declareObject(constructorArg(), (p, c) -> p.map(), new ParseField("stats")); + PARSER.declareNamedObjects(constructorArg(), (p, c, name) -> Tuple.tuple(name, NodeInfo.PARSER.apply(p, c)), + new ParseField("nodes")); + } + + private final int successfulDownloads; + private final int failedDownloads; + private final long totalDownloadTime; + private final int databasesCount; + private final int skippedDownloads; + private final Map nodes; + + public GeoIpStatsResponse(int successfulDownloads, int failedDownloads, long totalDownloadTime, int databasesCount, + int skippedDownloads, Map nodes) { + this.successfulDownloads = successfulDownloads; + this.failedDownloads = failedDownloads; + this.totalDownloadTime = totalDownloadTime; + this.databasesCount = databasesCount; + this.skippedDownloads = skippedDownloads; + this.nodes 
= nodes; + } + + public int getSuccessfulDownloads() { + return successfulDownloads; + } + + public int getFailedDownloads() { + return failedDownloads; + } + + public long getTotalDownloadTime() { + return totalDownloadTime; + } + + public int getDatabasesCount() { + return databasesCount; + } + + public int getSkippedDownloads() { + return skippedDownloads; + } + + public Map getNodes() { + return nodes; + } + + public static GeoIpStatsResponse fromXContent(XContentParser parser) { + return PARSER.apply(parser, null); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + GeoIpStatsResponse that = (GeoIpStatsResponse) o; + return successfulDownloads == that.successfulDownloads + && failedDownloads == that.failedDownloads + && totalDownloadTime == that.totalDownloadTime + && databasesCount == that.databasesCount + && skippedDownloads == that.skippedDownloads + && nodes.equals(that.nodes); + } + + @Override + public int hashCode() { + return Objects.hash(successfulDownloads, failedDownloads, totalDownloadTime, databasesCount, skippedDownloads, nodes); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.startObject("stats"); + { + builder.field("successful_downloads", successfulDownloads); + builder.field("failed_downloads", failedDownloads); + builder.field("skipped_updates", skippedDownloads); + builder.field("total_download_time", totalDownloadTime); + builder.field("databases_count", databasesCount); + } + builder.endObject(); + builder.field("nodes", nodes); + builder.endObject(); + return builder; + } + + public static final class NodeInfo implements ToXContentObject { + @SuppressWarnings("unchecked") + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("node_info", a -> { + List databases = (List) a[1]; + return new NodeInfo((Collection) a[0], databases.stream().collect(Collectors.toMap(DatabaseInfo::getName, + Function.identity()))); + }); + + static { + PARSER.declareStringArray(optionalConstructorArg(), new ParseField("files_in_temp")); + PARSER.declareObjectArray(optionalConstructorArg(), DatabaseInfo.PARSER, new ParseField("databases")); + } + + private final List filesInTemp; + private final Map databases; + + public NodeInfo(Collection filesInTemp, Map databases) { + this.filesInTemp = List.copyOf(filesInTemp); + this.databases = Map.copyOf(databases); + } + + public List getFilesInTemp() { + return filesInTemp; + } + + public Map getDatabases() { + return databases; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("files_in_temp", filesInTemp); + builder.field("databases", databases.entrySet().stream() + .sorted(Map.Entry.comparingByKey()) + .map(Map.Entry::getValue) + .collect(Collectors.toList())); + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + NodeInfo nodeInfo = (NodeInfo) o; + return filesInTemp.equals(nodeInfo.filesInTemp) && databases.equals(nodeInfo.databases); + } + + @Override + public int hashCode() { + return Objects.hash(filesInTemp, databases); + } + } + + public static final class DatabaseInfo implements ToXContentObject { + + private static final ConstructingObjectParser PARSER = new 
ConstructingObjectParser<>("database_info", + a -> new DatabaseInfo((String) a[0])); + + static { + PARSER.declareString(constructorArg(), new ParseField("name")); + } + + private final String name; + + public DatabaseInfo(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("name", name); + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DatabaseInfo that = (DatabaseInfo) o; + return name.equals(that.name); + } + + @Override + public int hashCode() { + return Objects.hash(name); + } + } +} diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/IngestClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/IngestClient.java index a5a00fb3edc4f..9dbf3f7f8f072 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/IngestClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/IngestClient.java @@ -16,6 +16,7 @@ import org.elasticsearch.action.ingest.SimulatePipelineRequest; import org.elasticsearch.action.ingest.SimulatePipelineResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.core.MainRequest; import java.io.IOException; import java.util.Collections; @@ -39,13 +40,14 @@ public final class IngestClient { * Add a pipeline or update an existing pipeline. * See * Put Pipeline API on elastic.co + * * @param request the request * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized * @return the response * @throws IOException in case there is a problem sending the request or parsing back the response */ public AcknowledgedResponse putPipeline(PutPipelineRequest request, RequestOptions options) throws IOException { - return restHighLevelClient.performRequestAndParseEntity( request, IngestRequestConverters::putPipeline, options, + return restHighLevelClient.performRequestAndParseEntity(request, IngestRequestConverters::putPipeline, options, AcknowledgedResponse::fromXContent, emptySet()); } @@ -53,13 +55,14 @@ public AcknowledgedResponse putPipeline(PutPipelineRequest request, RequestOptio * Asynchronously add a pipeline or update an existing pipeline. * See * Put Pipeline API on elastic.co - * @param request the request - * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * + * @param request the request + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized * @param listener the listener to be notified upon request completion * @return cancellable that may be used to cancel the request */ public Cancellable putPipelineAsync(PutPipelineRequest request, RequestOptions options, ActionListener listener) { - return restHighLevelClient.performRequestAsyncAndParseEntity( request, IngestRequestConverters::putPipeline, options, + return restHighLevelClient.performRequestAsyncAndParseEntity(request, IngestRequestConverters::putPipeline, options, AcknowledgedResponse::fromXContent, listener, emptySet()); } @@ -67,13 +70,14 @@ public Cancellable putPipelineAsync(PutPipelineRequest request, RequestOptions o * Get an existing pipeline. 
* See * Get Pipeline API on elastic.co + * * @param request the request * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized * @return the response * @throws IOException in case there is a problem sending the request or parsing back the response */ public GetPipelineResponse getPipeline(GetPipelineRequest request, RequestOptions options) throws IOException { - return restHighLevelClient.performRequestAndParseEntity( request, IngestRequestConverters::getPipeline, options, + return restHighLevelClient.performRequestAndParseEntity(request, IngestRequestConverters::getPipeline, options, GetPipelineResponse::fromXContent, Collections.singleton(404)); } @@ -81,13 +85,14 @@ public GetPipelineResponse getPipeline(GetPipelineRequest request, RequestOption * Asynchronously get an existing pipeline. * See * Get Pipeline API on elastic.co - * @param request the request - * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * + * @param request the request + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized * @param listener the listener to be notified upon request completion * @return cancellable that may be used to cancel the request */ public Cancellable getPipelineAsync(GetPipelineRequest request, RequestOptions options, ActionListener listener) { - return restHighLevelClient.performRequestAsyncAndParseEntity( request, IngestRequestConverters::getPipeline, options, + return restHighLevelClient.performRequestAsyncAndParseEntity(request, IngestRequestConverters::getPipeline, options, GetPipelineResponse::fromXContent, listener, Collections.singleton(404)); } @@ -95,14 +100,15 @@ public Cancellable getPipelineAsync(GetPipelineRequest request, RequestOptions o * Delete an existing pipeline. * See * - * Delete Pipeline API on elastic.co + * Delete Pipeline API on elastic.co + * * @param request the request * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized * @return the response * @throws IOException in case there is a problem sending the request or parsing back the response */ public AcknowledgedResponse deletePipeline(DeletePipelineRequest request, RequestOptions options) throws IOException { - return restHighLevelClient.performRequestAndParseEntity( request, IngestRequestConverters::deletePipeline, options, + return restHighLevelClient.performRequestAndParseEntity(request, IngestRequestConverters::deletePipeline, options, AcknowledgedResponse::fromXContent, emptySet()); } @@ -110,15 +116,16 @@ public AcknowledgedResponse deletePipeline(DeletePipelineRequest request, Reques * Asynchronously delete an existing pipeline. * See * - * Delete Pipeline API on elastic.co - * @param request the request - * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * Delete Pipeline API on elastic.co + * + * @param request the request + * @param options the request options (e.g. 
headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized * @param listener the listener to be notified upon request completion * @return cancellable that may be used to cancel the request */ public Cancellable deletePipelineAsync(DeletePipelineRequest request, RequestOptions options, ActionListener listener) { - return restHighLevelClient.performRequestAsyncAndParseEntity( request, + return restHighLevelClient.performRequestAsyncAndParseEntity(request, IngestRequestConverters::deletePipeline, options, AcknowledgedResponse::fromXContent, listener, emptySet()); } @@ -128,14 +135,15 @@ public Cancellable deletePipelineAsync(DeletePipelineRequest request, RequestOpt *
* See * - * Simulate Pipeline API on elastic.co + * Simulate Pipeline API on elastic.co + * * @param request the request * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized * @return the response * @throws IOException in case there is a problem sending the request or parsing back the response */ public SimulatePipelineResponse simulate(SimulatePipelineRequest request, RequestOptions options) throws IOException { - return restHighLevelClient.performRequestAndParseEntity( request, IngestRequestConverters::simulatePipeline, options, + return restHighLevelClient.performRequestAndParseEntity(request, IngestRequestConverters::simulatePipeline, options, SimulatePipelineResponse::fromXContent, emptySet()); } @@ -144,16 +152,27 @@ public SimulatePipelineResponse simulate(SimulatePipelineRequest request, Reques *
* See * - * Simulate Pipeline API on elastic.co - * @param request the request - * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * Simulate Pipeline API on elastic.co + * + * @param request the request + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized * @param listener the listener to be notified upon request completion * @return cancellable that may be used to cancel the request */ public Cancellable simulateAsync(SimulatePipelineRequest request, RequestOptions options, ActionListener listener) { - return restHighLevelClient.performRequestAsyncAndParseEntity( request, IngestRequestConverters::simulatePipeline, options, + return restHighLevelClient.performRequestAsyncAndParseEntity(request, IngestRequestConverters::simulatePipeline, options, SimulatePipelineResponse::fromXContent, listener, emptySet()); } + + public GeoIpStatsResponse geoIpStats(MainRequest request, RequestOptions options) throws IOException { + return restHighLevelClient.performRequestAndParseEntity(new MainRequest(), IngestRequestConverters::geoIpStats, options, + GeoIpStatsResponse::fromXContent, emptySet()); + } + + public Cancellable geoIpStatsAsync(MainRequest request, RequestOptions options, ActionListener listener) { + return restHighLevelClient.performRequestAsyncAndParseEntity(request, IngestRequestConverters::geoIpStats, options, + GeoIpStatsResponse::fromXContent, listener, emptySet()); + } } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/IngestRequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/IngestRequestConverters.java index 25a59270202f9..fc4ca502a0d3a 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/IngestRequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/IngestRequestConverters.java @@ -16,6 +16,7 @@ import org.elasticsearch.action.ingest.GetPipelineRequest; import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.action.ingest.SimulatePipelineRequest; +import org.elasticsearch.client.core.MainRequest; import java.io.IOException; @@ -79,4 +80,8 @@ static Request simulatePipeline(SimulatePipelineRequest simulatePipelineRequest) request.setEntity(RequestConverters.createEntity(simulatePipelineRequest, RequestConverters.REQUEST_BODY_CONTENT_TYPE)); return request; } + + static Request geoIpStats(MainRequest ignore) { + return new Request("GET", "_ingest/geoip/stats"); + } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/GeoIpStatsResponseTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/GeoIpStatsResponseTests.java new file mode 100644 index 0000000000000..48e25c935220f --- /dev/null +++ b/client/rest-high-level/src/test/java/org/elasticsearch/GeoIpStatsResponseTests.java @@ -0,0 +1,44 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +package org.elasticsearch; + +import org.elasticsearch.client.GeoIpStatsResponse; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractXContentTestCase; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.stream.Collectors; + +public class GeoIpStatsResponseTests extends AbstractXContentTestCase { + + @Override + protected GeoIpStatsResponse createTestInstance() { + HashMap nodes = new HashMap<>(); + int nodeCount = randomInt(10); + for (int i = 0; i < nodeCount; i++) { + List databases = randomList(5, + () -> new GeoIpStatsResponse.DatabaseInfo(randomAlphaOfLength(5))); + nodes.put(randomAlphaOfLength(5), new GeoIpStatsResponse.NodeInfo(randomList(5, () -> randomAlphaOfLength(5)), + databases.stream().collect(Collectors.toMap(GeoIpStatsResponse.DatabaseInfo::getName, d -> d)))); + } + return new GeoIpStatsResponse(randomInt(), randomInt(), randomNonNegativeLong(), randomInt(), randomInt(), nodes); + } + + @Override + protected GeoIpStatsResponse doParseInstance(XContentParser parser) throws IOException { + return GeoIpStatsResponse.fromXContent(parser); + } + + @Override + protected boolean supportsUnknownFields() { + return false; + } +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/IngestClientIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/IngestClientIT.java index 1cb1ab8860899..48e5e3f1e05f6 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/IngestClientIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/IngestClientIT.java @@ -18,6 +18,7 @@ import org.elasticsearch.action.ingest.SimulatePipelineRequest; import org.elasticsearch.action.ingest.SimulatePipelineResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.core.MainRequest; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; @@ -158,7 +159,7 @@ private void testSimulatePipeline(boolean isVerbose, } } else { assertThat(results.get(0), instanceOf(SimulateDocumentBaseResult.class)); - SimulateDocumentBaseResult baseResult = (SimulateDocumentBaseResult)results.get(0); + SimulateDocumentBaseResult baseResult = (SimulateDocumentBaseResult) results.get(0); if (isFailure) { assertNotNull(baseResult.getFailure()); assertThat(baseResult.getFailure().getMessage(), @@ -177,4 +178,15 @@ private void testSimulatePipeline(boolean isVerbose, } } } + + public void testGeoIpStats() throws IOException { + GeoIpStatsResponse response = execute(new MainRequest(), highLevelClient().ingest()::geoIpStats, + highLevelClient().ingest()::geoIpStatsAsync); + assertEquals(0, response.getDatabasesCount()); + assertEquals(0, response.getSkippedDownloads()); + assertEquals(0, response.getSuccessfulDownloads()); + assertEquals(0, response.getFailedDownloads()); + assertEquals(0, response.getTotalDownloadTime()); + assertEquals(0, response.getNodes().size()); + } } diff --git a/modules/ingest-geoip/build.gradle b/modules/ingest-geoip/build.gradle index 8a4b902d027f7..3152e0c4c904a 100644 --- a/modules/ingest-geoip/build.gradle +++ b/modules/ingest-geoip/build.gradle @@ -57,6 +57,11 @@ tasks.named("internalClusterTest").configure { } } +testClusters.all { + systemProperty "es.geoip_v2_feature_flag_enabled", "true" + setting "geoip.downloader.enabled", "false" +} + 
tasks.register("copyDefaultGeoIp2DatabaseFiles", Copy) { from { zipTree(configurations.testCompileClasspath.files.find { it.name.contains('geolite2-databases') }) } into "${project.buildDir}/ingest-geoip" diff --git a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderStatsIT.java b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderStatsIT.java new file mode 100644 index 0000000000000..12a20a5be139e --- /dev/null +++ b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderStatsIT.java @@ -0,0 +1,106 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.ingest.geoip; + +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.index.reindex.ReindexPlugin; +import org.elasticsearch.ingest.geoip.stats.GeoIpDownloaderStatsAction; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.XContentTestUtils; +import org.junit.After; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasSize; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, maxNumDataNodes = 1) +public class GeoIpDownloaderStatsIT extends AbstractGeoIpIT { + + private static final String ENDPOINT = System.getProperty("geoip_endpoint"); + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(ReindexPlugin.class, IngestGeoIpPlugin.class, GeoIpProcessorNonIngestNodeIT.IngestGeoIpSettingsPlugin.class); + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + Settings.Builder settings = Settings.builder().put(super.nodeSettings(nodeOrdinal)); + if (ENDPOINT != null) { + settings.put(GeoIpDownloader.ENDPOINT_SETTING.getKey(), ENDPOINT); + } + return settings.build(); + } + + @After + public void disableDownloader() { + ClusterUpdateSettingsResponse settingsResponse = client().admin().cluster() + .prepareUpdateSettings() + .setPersistentSettings(Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), (String) null)) + .get(); + assertTrue(settingsResponse.isAcknowledged()); + } + + public void testStats() throws Exception { + GeoIpDownloaderStatsAction.Request req = new GeoIpDownloaderStatsAction.Request(); + GeoIpDownloaderStatsAction.Response response = client().execute(GeoIpDownloaderStatsAction.INSTANCE, req).actionGet(); + XContentTestUtils.JsonMapView jsonMapView = new 
XContentTestUtils.JsonMapView(convertToMap(response)); + assertThat(jsonMapView.get("stats.successful_downloads"), equalTo(0)); + assertThat(jsonMapView.get("stats.failed_downloads"), equalTo(0)); + assertThat(jsonMapView.get("stats.skipped_updates"), equalTo(0)); + assertThat(jsonMapView.get("stats.databases_count"), equalTo(0)); + assertThat(jsonMapView.get("stats.total_download_time"), equalTo(0)); + assertEquals(0, jsonMapView.>get("nodes").size()); + + + ClusterUpdateSettingsResponse settingsResponse = client().admin().cluster() + .prepareUpdateSettings() + .setPersistentSettings(Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), true)) + .get(); + assertTrue(settingsResponse.isAcknowledged()); + + assertBusy(() -> { + GeoIpDownloaderStatsAction.Response res = client().execute(GeoIpDownloaderStatsAction.INSTANCE, req).actionGet(); + XContentTestUtils.JsonMapView view = new XContentTestUtils.JsonMapView(convertToMap(res)); + assertThat(view.get("stats.successful_downloads"), equalTo(3)); + assertThat(view.get("stats.failed_downloads"), equalTo(0)); + assertThat(view.get("stats.skipped_updates"), equalTo(0)); + assertThat(view.get("stats.databases_count"), equalTo(3)); + assertThat(view.get("stats.total_download_time"), greaterThan(0)); + Map>>> nodes = view.get("nodes"); + assertThat(nodes.values(), hasSize(greaterThan(0))); + for (Map>> value : nodes.values()) { + assertThat(value.get("databases").stream().map(m -> m.get("name")).collect(Collectors.toSet()), + containsInAnyOrder("GeoLite2-City.mmdb", "GeoLite2-ASN.mmdb", "GeoLite2-Country.mmdb")); + } + }); + } + + public static Map convertToMap(ToXContent part) throws IOException { + XContentBuilder builder = XContentFactory.jsonBuilder(); + part.toXContent(builder, EMPTY_PARAMS); + return XContentHelper.convertToMap(BytesReference.bytes(builder), false, builder.contentType()).v2(); + } +} diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseRegistry.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseRegistry.java index c8fa5e1a7af96..69a9dcac852fe 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseRegistry.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseRegistry.java @@ -32,6 +32,7 @@ import java.io.Closeable; import java.io.IOException; +import java.io.UncheckedIOException; import java.nio.file.FileAlreadyExistsException; import java.nio.file.FileVisitResult; import java.nio.file.FileVisitor; @@ -47,31 +48,34 @@ import java.util.List; import java.util.Locale; import java.util.Objects; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.Stream; import java.util.zip.GZIPInputStream; /** * A component that is responsible for making the databases maintained by {@link GeoIpDownloader} * available for ingest processors. - * + *
* Also provided a lookup mechanism for geoip processors with fallback to {@link LocalDatabases}. * All databases are downloaded into a geoip tmp directory, which is created at node startup. - * + *
* The following high level steps are executed after each cluster state update: * 1) Check which databases are available in {@link GeoIpTaskState}, - * which is part of the geoip downloader persistent task. + * which is part of the geoip downloader persistent task. * 2) For each database check whether the databases have changed - * by comparing the local and remote md5 hash or are locally missing. + * by comparing the local and remote md5 hash or are locally missing. * 3) For each database identified in step 2 start downloading the database - * chunks. Each chunks is appended to a tmp file (inside geoip tmp dir) and - * after all chunks have been downloaded, the database is uncompressed and - * renamed to the final filename.After this the database is loaded and - * if there is an old instance of this database then that is closed. + * chunks. Each chunks is appended to a tmp file (inside geoip tmp dir) and + * after all chunks have been downloaded, the database is uncompressed and + * renamed to the final filename.After this the database is loaded and + * if there is an old instance of this database then that is closed. * 4) Cleanup locally loaded databases that are no longer mentioned in {@link GeoIpTaskState}. */ -final class DatabaseRegistry implements Closeable { +public final class DatabaseRegistry implements Closeable { private static final Logger LOGGER = LogManager.getLogger(DatabaseRegistry.class); @@ -87,7 +91,7 @@ final class DatabaseRegistry implements Closeable { DatabaseRegistry(Environment environment, Client client, GeoIpCache cache, Consumer genericExecutor) { this( environment.tmpFile(), - new OriginSettingClient(client, "geoip"), + new OriginSettingClient(client, IngestService.INGEST_ORIGIN), cache, new LocalDatabases(environment, cache), genericExecutor @@ -128,7 +132,7 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) { @Override public FileVisitResult visitFileFailed(Path file, IOException e) { - if(e instanceof NoSuchFileException == false) { + if (e instanceof NoSuchFileException == false) { LOGGER.warn("can't delete stale file [" + file + "]", e); } return FileVisitResult.CONTINUE; @@ -353,4 +357,15 @@ static void decompress(Path source, Path target) throws IOException { } } + public Set getAvailableDatabases() { + return Set.copyOf(databases.keySet()); + } + + public Set getFilesInTemp() { + try (Stream files = Files.list(geoipTmpDirectory)) { + return files.map(Path::getFileName).map(Path::toString).collect(Collectors.toSet()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } } diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java index 528c02bdf2973..851fbacc83548 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java @@ -15,6 +15,7 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Client; +import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.hash.MessageDigests; import org.elasticsearch.common.settings.Setting; @@ -30,7 +31,9 @@ import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.index.reindex.DeleteByQueryAction; import 
org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.elasticsearch.ingest.IngestService; import org.elasticsearch.ingest.geoip.GeoIpTaskState.Metadata; +import org.elasticsearch.ingest.geoip.stats.GeoIpDownloaderStats; import org.elasticsearch.persistent.AllocatedPersistentTask; import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask; import org.elasticsearch.tasks.TaskId; @@ -51,7 +54,7 @@ * Downloads are verified against MD5 checksum provided by the server * Current state of all stored databases is stored in cluster state in persistent task state */ -class GeoIpDownloader extends AllocatedPersistentTask { +public class GeoIpDownloader extends AllocatedPersistentTask { private static final Logger logger = LogManager.getLogger(GeoIpDownloader.class); @@ -75,13 +78,14 @@ class GeoIpDownloader extends AllocatedPersistentTask { protected volatile GeoIpTaskState state; private volatile TimeValue pollInterval; private volatile Scheduler.ScheduledCancellable scheduled; + private volatile GeoIpDownloaderStats stats = GeoIpDownloaderStats.EMPTY; GeoIpDownloader(Client client, HttpClient httpClient, ClusterService clusterService, ThreadPool threadPool, Settings settings, long id, String type, String action, String description, TaskId parentTask, Map headers) { super(id, type, action, description, parentTask, headers); this.httpClient = httpClient; - this.client = client; + this.client = new OriginSettingClient(client, IngestService.INGEST_ORIGIN); this.threadPool = threadPool; endpoint = ENDPOINT_SETTING.get(settings); pollInterval = POLL_INTERVAL_SETTING.get(settings); @@ -125,16 +129,19 @@ void processDatabase(Map databaseInfo) { } logger.info("updating geoip database [" + name + "]"); String url = databaseInfo.get("url").toString(); + long start = System.currentTimeMillis(); try (InputStream is = httpClient.get(url)) { int firstChunk = state.contains(name) ? state.get(name).getLastChunk() + 1 : 0; int lastChunk = indexChunks(name, is, firstChunk, md5); if (lastChunk > firstChunk) { state = state.put(name, new Metadata(System.currentTimeMillis(), firstChunk, lastChunk - 1, md5)); updateTaskState(); + stats = stats.successfulDownload(System.currentTimeMillis() - start).count(state.getDatabases().size()); logger.info("updated geoip database [" + name + "]"); deleteOldChunks(name, firstChunk); } } catch (Exception e) { + stats = stats.failedDownload(); logger.error("error updating geoip database [" + name + "]", e); } } @@ -155,6 +162,7 @@ void deleteOldChunks(String name, int firstChunk) { protected void updateTimestamp(String name, Metadata old) { logger.info("geoip database [" + name + "] is up to date, updated timestamp"); state = state.put(name, new Metadata(System.currentTimeMillis(), old.getFirstChunk(), old.getLastChunk(), old.getMd5())); + stats = stats.skippedDownload(); updateTaskState(); } @@ -224,6 +232,11 @@ protected void onCancelled() { } } + @Override + public GeoIpDownloaderStats getStatus() { + return isCancelled() || isCompleted() ? 
null: stats; + } + private void scheduleNextRun(TimeValue time) { scheduled = threadPool.schedule(this::runDownloader, time, ThreadPool.Names.GENERIC); } diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java index 6289a2ad36b52..7a28f6800fc73 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java @@ -27,6 +27,7 @@ import org.elasticsearch.threadpool.ThreadPool; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.ingest.geoip.GeoIpDownloader.GEOIP_DOWNLOADER; import static org.elasticsearch.ingest.geoip.GeoIpDownloader.GEOIP_V2_FEATURE_FLAG_ENABLED; @@ -35,7 +36,7 @@ * Persistent task executor that is responsible for starting {@link GeoIpDownloader} after task is allocated by master node. * Also bootstraps GeoIP download task on clean cluster and handles changes to the 'geoip.downloader.enabled' setting */ -final class GeoIpDownloaderTaskExecutor extends PersistentTasksExecutor implements ClusterStateListener { +public final class GeoIpDownloaderTaskExecutor extends PersistentTasksExecutor implements ClusterStateListener { public static final Setting ENABLED_SETTING = Setting.boolSetting("geoip.downloader.enabled", GEOIP_V2_FEATURE_FLAG_ENABLED, Setting.Property.Dynamic, Setting.Property.NodeScope); @@ -48,15 +49,15 @@ final class GeoIpDownloaderTaskExecutor extends PersistentTasksExecutor currentTask = new AtomicReference<>(); - GeoIpDownloaderTaskExecutor(Client client, HttpClient httpClient, ClusterService clusterService, ThreadPool threadPool, - Settings settings) { + GeoIpDownloaderTaskExecutor(Client client, HttpClient httpClient, ClusterService clusterService, ThreadPool threadPool) { super(GEOIP_DOWNLOADER, ThreadPool.Names.GENERIC); this.client = client; this.httpClient = httpClient; this.clusterService = clusterService; this.threadPool = threadPool; - this.settings = settings; + this.settings = clusterService.getSettings(); persistentTasksService = new PersistentTasksService(clusterService, threadPool, client); if (ENABLED_SETTING.get(settings)) { clusterService.addListener(this); @@ -81,13 +82,14 @@ private void setEnabled(boolean enabled) { @Override protected void nodeOperation(AllocatedPersistentTask task, GeoIpTaskParams params, PersistentTaskState state) { GeoIpDownloader downloader = (GeoIpDownloader) task; + currentTask.set(downloader); GeoIpTaskState geoIpTaskState = state == null ? 
GeoIpTaskState.EMPTY : (GeoIpTaskState) state; downloader.setState(geoIpTaskState); downloader.runDownloader(); } @Override - protected AllocatedPersistentTask createTask(long id, String type, String action, TaskId parentTaskId, + protected GeoIpDownloader createTask(long id, String type, String action, TaskId parentTaskId, PersistentTasksCustomMetadata.PersistentTask taskInProgress, Map headers) { return new GeoIpDownloader(client, httpClient, clusterService, threadPool, settings, id, type, action, @@ -112,4 +114,8 @@ private void startTask(Runnable onFailure) { } })); } + + public GeoIpDownloader getCurrentTask(){ + return currentTask.get(); + } } diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java index 5484a61d4d8e6..3d01a881f36e8 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java @@ -10,14 +10,20 @@ import org.apache.lucene.util.SetOnce; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.settings.SettingsFilter; import org.elasticsearch.common.settings.SettingsModule; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -26,15 +32,23 @@ import org.elasticsearch.indices.SystemIndexDescriptor; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.ingest.Processor; +import org.elasticsearch.ingest.geoip.stats.GeoIpDownloaderStats; +import org.elasticsearch.ingest.geoip.stats.GeoIpDownloaderStatsAction; +import org.elasticsearch.ingest.geoip.stats.GeoIpDownloaderStatsTransportAction; +import org.elasticsearch.ingest.geoip.stats.RestGeoIpDownloaderStatsAction; import org.elasticsearch.persistent.PersistentTaskParams; import org.elasticsearch.persistent.PersistentTaskState; import org.elasticsearch.persistent.PersistentTasksExecutor; +import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.IngestPlugin; import org.elasticsearch.plugins.PersistentTaskPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.SystemIndexPlugin; import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestHandler; import org.elasticsearch.script.ScriptService; +import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; @@ -55,7 +69,7 @@ import static org.elasticsearch.ingest.geoip.GeoIpDownloader.GEOIP_DOWNLOADER; import static org.elasticsearch.ingest.geoip.GeoIpDownloader.GEOIP_V2_FEATURE_FLAG_ENABLED; -public class 
IngestGeoIpPlugin extends Plugin implements IngestPlugin, SystemIndexPlugin, Closeable, PersistentTaskPlugin { +public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, SystemIndexPlugin, Closeable, PersistentTaskPlugin, ActionPlugin { public static final Setting CACHE_SIZE = Setting.longSetting("ingest.geoip.cache_size", 1000, 0, Setting.Property.NodeScope); @@ -63,6 +77,7 @@ public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, SystemInd private final SetOnce ingestService = new SetOnce<>(); private final SetOnce databaseRegistry = new SetOnce<>(); + private GeoIpDownloaderTaskExecutor geoIpDownloaderTaskExecutor; @Override public List> getSettings() { @@ -104,7 +119,11 @@ public Collection createComponents(Client client, } catch (IOException e) { throw new UncheckedIOException(e); } - return List.of(); + if (GEOIP_V2_FEATURE_FLAG_ENABLED) { + geoIpDownloaderTaskExecutor = new GeoIpDownloaderTaskExecutor(client, new HttpClient(), clusterService, threadPool); + return List.of(databaseRegistry.get(), geoIpDownloaderTaskExecutor); + } + return List.of(databaseRegistry.get()); } @Override @@ -117,13 +136,31 @@ public List> getPersistentTasksExecutor(ClusterServic Client client, SettingsModule settingsModule, IndexNameExpressionResolver expressionResolver) { if (GEOIP_V2_FEATURE_FLAG_ENABLED) { - Settings settings = settingsModule.getSettings(); - return List.of(new GeoIpDownloaderTaskExecutor(client, new HttpClient(), clusterService, threadPool, settings)); + return List.of(geoIpDownloaderTaskExecutor); } else { return List.of(); } } + @Override + public List> getActions() { + if (GEOIP_V2_FEATURE_FLAG_ENABLED) { + return List.of(new ActionHandler<>(GeoIpDownloaderStatsAction.INSTANCE, GeoIpDownloaderStatsTransportAction.class)); + } + return Collections.emptyList(); + } + + @Override + public List getRestHandlers(Settings settings, RestController restController, ClusterSettings clusterSettings, + IndexScopedSettings indexScopedSettings, SettingsFilter settingsFilter, + IndexNameExpressionResolver indexNameExpressionResolver, + Supplier nodesInCluster) { + if (GEOIP_V2_FEATURE_FLAG_ENABLED) { + return List.of(new RestGeoIpDownloaderStatsAction()); + } + return Collections.emptyList(); + } + @Override public List getNamedXContent() { return List.of(new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(GEOIP_DOWNLOADER), @@ -134,7 +171,8 @@ public List getNamedXContent() { @Override public List getNamedWriteables() { return List.of(new NamedWriteableRegistry.Entry(PersistentTaskState.class, GEOIP_DOWNLOADER, GeoIpTaskState::new), - new NamedWriteableRegistry.Entry(PersistentTaskParams.class, GEOIP_DOWNLOADER, GeoIpTaskParams::new)); + new NamedWriteableRegistry.Entry(PersistentTaskParams.class, GEOIP_DOWNLOADER, GeoIpTaskParams::new), + new NamedWriteableRegistry.Entry(Task.Status.class, GEOIP_DOWNLOADER, GeoIpDownloaderStats::new)); } @Override diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStats.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStats.java new file mode 100644 index 0000000000000..730405ec3a97a --- /dev/null +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStats.java @@ -0,0 +1,156 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.ingest.geoip.stats; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.ingest.geoip.GeoIpDownloader; +import org.elasticsearch.tasks.Task; + +import java.io.IOException; +import java.util.Objects; + +public class GeoIpDownloaderStats implements Task.Status { + + public static final GeoIpDownloaderStats EMPTY = new GeoIpDownloaderStats(0, 0, 0, 0, 0); + + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "geoip_downloader_stats", a -> new GeoIpDownloaderStats((int) a[0], (int) a[1], (long) a[2], (int) a[3], (int) a[4])); + + private static final ParseField SUCCESSFUL_DOWNLOADS = new ParseField("successful_downloads"); + private static final ParseField FAILED_DOWNLOADS = new ParseField("failed_downloads"); + private static final ParseField TOTAL_DOWNLOAD_TIME = new ParseField("total_download_time"); + private static final ParseField DATABASES_COUNT = new ParseField("databases_count"); + private static final ParseField SKIPPED_DOWNLOADS = new ParseField("skipped_updates"); + + static { + PARSER.declareInt(ConstructingObjectParser.constructorArg(), SUCCESSFUL_DOWNLOADS); + PARSER.declareInt(ConstructingObjectParser.constructorArg(), FAILED_DOWNLOADS); + PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_DOWNLOAD_TIME); + PARSER.declareInt(ConstructingObjectParser.constructorArg(), DATABASES_COUNT); + PARSER.declareInt(ConstructingObjectParser.constructorArg(), SKIPPED_DOWNLOADS); + } + + private final int successfulDownloads; + private final int failedDownloads; + private final long totalDownloadTime; + private final int databasesCount; + private final int skippedDownloads; + + public GeoIpDownloaderStats(StreamInput in) throws IOException { + successfulDownloads = in.readVInt(); + failedDownloads = in.readVInt(); + totalDownloadTime = in.readVLong(); + databasesCount = in.readVInt(); + skippedDownloads = in.readVInt(); + } + + private GeoIpDownloaderStats(int successfulDownloads, int failedDownloads, long totalDownloadTime, int databasesCount, + int skippedDownloads) { + this.successfulDownloads = successfulDownloads; + this.failedDownloads = failedDownloads; + this.totalDownloadTime = totalDownloadTime; + this.databasesCount = databasesCount; + this.skippedDownloads = skippedDownloads; + } + + public int getSuccessfulDownloads() { + return successfulDownloads; + } + + public int getFailedDownloads() { + return failedDownloads; + } + + public long getTotalDownloadTime() { + return totalDownloadTime; + } + + public int getDatabasesCount() { + return databasesCount; + } + + public int getSkippedDownloads() { + return skippedDownloads; + } + + public GeoIpDownloaderStats skippedDownload() { + return new GeoIpDownloaderStats(successfulDownloads, failedDownloads, totalDownloadTime, databasesCount, skippedDownloads + 1); + } + + public GeoIpDownloaderStats successfulDownload(long downloadTime) { + return new 
GeoIpDownloaderStats(successfulDownloads + 1, failedDownloads, totalDownloadTime + downloadTime, databasesCount, + skippedDownloads); + } + + public GeoIpDownloaderStats failedDownload() { + return new GeoIpDownloaderStats(successfulDownloads, failedDownloads + 1, totalDownloadTime, databasesCount, skippedDownloads); + } + + public GeoIpDownloaderStats count(int databasesCount) { + return new GeoIpDownloaderStats(successfulDownloads, failedDownloads, totalDownloadTime, databasesCount, skippedDownloads); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(SUCCESSFUL_DOWNLOADS.getPreferredName(), successfulDownloads); + builder.field(FAILED_DOWNLOADS.getPreferredName(), failedDownloads); + builder.field(TOTAL_DOWNLOAD_TIME.getPreferredName(), totalDownloadTime); + builder.field(DATABASES_COUNT.getPreferredName(), databasesCount); + builder.field(SKIPPED_DOWNLOADS.getPreferredName(), skippedDownloads); + builder.endObject(); + return builder; + } + + public static GeoIpDownloaderStats fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVInt(successfulDownloads); + out.writeVInt(failedDownloads); + out.writeVLong(totalDownloadTime); + out.writeVInt(databasesCount); + out.writeVInt(skippedDownloads); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + GeoIpDownloaderStats that = (GeoIpDownloaderStats) o; + return successfulDownloads == that.successfulDownloads && + failedDownloads == that.failedDownloads && + totalDownloadTime == that.totalDownloadTime && + databasesCount == that.databasesCount && + skippedDownloads == that.skippedDownloads; + } + + @Override + public int hashCode() { + return Objects.hash(successfulDownloads, failedDownloads, totalDownloadTime, databasesCount, skippedDownloads); + } + + @Override + public String toString() { + return Strings.toString(this); + } + + @Override + public String getWriteableName() { + return GeoIpDownloader.GEOIP_DOWNLOADER; + } +} diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStatsAction.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStatsAction.java new file mode 100644 index 0000000000000..a9c817dbc8ad6 --- /dev/null +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStatsAction.java @@ -0,0 +1,194 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +package org.elasticsearch.ingest.geoip.stats; + +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.nodes.BaseNodeResponse; +import org.elasticsearch.action.support.nodes.BaseNodesRequest; +import org.elasticsearch.action.support.nodes.BaseNodesResponse; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.transport.TransportRequest; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +public class GeoIpDownloaderStatsAction extends ActionType { + + public static final GeoIpDownloaderStatsAction INSTANCE = new GeoIpDownloaderStatsAction(); + public static final String NAME = "cluster:monitor/ingest/geoip/stats"; + + public GeoIpDownloaderStatsAction() { + super(NAME, Response::new); + } + + public static class Request extends BaseNodesRequest implements ToXContentObject { + + public Request() { + super((String[]) null); + } + + public Request(StreamInput in) throws IOException { + super(in); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.endObject(); + return builder; + } + + @Override + public int hashCode() { + // Nothing to hash atm, so just use the action name + return Objects.hashCode(NAME); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + return true; + } + } + + public static class NodeRequest extends TransportRequest { + public NodeRequest(StreamInput in) throws IOException { + super(in); + } + + public NodeRequest(Request request) { + + } + } + + public static class Response extends BaseNodesResponse implements Writeable, ToXContentObject { + public Response(StreamInput in) throws IOException { + super(in); + } + + public Response(ClusterName clusterName, List nodes, List failures) { + super(clusterName, nodes, failures); + } + + @Override + protected List readNodesFrom(StreamInput in) throws IOException { + return in.readList(NodeResponse::new); + } + + @Override + protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { + out.writeList(nodes); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + GeoIpDownloaderStats stats = + getNodes().stream().map(n -> n.stats).filter(Objects::nonNull).findFirst().orElse(GeoIpDownloaderStats.EMPTY); + builder.startObject(); + builder.field("stats", stats); + builder.startObject("nodes"); + for (Map.Entry e : getNodesMap().entrySet()) { + NodeResponse response = e.getValue(); + if (response.filesInTemp.isEmpty() && response.databases.isEmpty()) { + continue; + } + builder.startObject(e.getKey()); + if (response.databases.isEmpty() == false) { + builder.startArray("databases"); + for (String database : response.databases) { + builder.startObject(); + builder.field("name", database); + builder.endObject(); + } + builder.endArray(); + } + if (response.filesInTemp.isEmpty() == false) { + builder.array("files_in_temp", 
+
+    public static class Response extends BaseNodesResponse<NodeResponse> implements Writeable, ToXContentObject {
+        public Response(StreamInput in) throws IOException {
+            super(in);
+        }
+
+        public Response(ClusterName clusterName, List<NodeResponse> nodes, List<FailedNodeException> failures) {
+            super(clusterName, nodes, failures);
+        }
+
+        @Override
+        protected List<NodeResponse> readNodesFrom(StreamInput in) throws IOException {
+            return in.readList(NodeResponse::new);
+        }
+
+        @Override
+        protected void writeNodesTo(StreamOutput out, List<NodeResponse> nodes) throws IOException {
+            out.writeList(nodes);
+        }
+
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            GeoIpDownloaderStats stats =
+                getNodes().stream().map(n -> n.stats).filter(Objects::nonNull).findFirst().orElse(GeoIpDownloaderStats.EMPTY);
+            builder.startObject();
+            builder.field("stats", stats);
+            builder.startObject("nodes");
+            for (Map.Entry<String, NodeResponse> e : getNodesMap().entrySet()) {
+                NodeResponse response = e.getValue();
+                if (response.filesInTemp.isEmpty() && response.databases.isEmpty()) {
+                    continue;
+                }
+                builder.startObject(e.getKey());
+                if (response.databases.isEmpty() == false) {
+                    builder.startArray("databases");
+                    for (String database : response.databases) {
+                        builder.startObject();
+                        builder.field("name", database);
+                        builder.endObject();
+                    }
+                    builder.endArray();
+                }
+                if (response.filesInTemp.isEmpty() == false) {
+                    builder.array("files_in_temp", response.filesInTemp.toArray(String[]::new));
+                }
+                builder.endObject();
+            }
+            builder.endObject();
+            builder.endObject();
+            return builder;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            Response that = (Response) o;
+            return Objects.equals(getNodes(), that.getNodes()) && Objects.equals(failures(), that.failures());
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(getNodes(), failures());
+        }
+    }
+
+    public static class NodeResponse extends BaseNodeResponse {
+
+        private final GeoIpDownloaderStats stats;
+        private final Set<String> databases;
+        private final Set<String> filesInTemp;
+
+        protected NodeResponse(StreamInput in) throws IOException {
+            super(in);
+            stats = in.readBoolean() ? new GeoIpDownloaderStats(in) : null;
+            databases = in.readSet(StreamInput::readString);
+            filesInTemp = in.readSet(StreamInput::readString);
+        }
+
+        protected NodeResponse(DiscoveryNode node, GeoIpDownloaderStats stats, Set<String> databases, Set<String> filesInTemp) {
+            super(node);
+            this.stats = stats;
+            this.databases = databases;
+            this.filesInTemp = filesInTemp;
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            out.writeBoolean(stats != null);
+            if (stats != null) {
+                stats.writeTo(out);
+            }
+            out.writeCollection(databases, StreamOutput::writeString);
+            out.writeCollection(filesInTemp, StreamOutput::writeString);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            NodeResponse that = (NodeResponse) o;
+            return stats.equals(that.stats) && databases.equals(that.databases) && filesInTemp.equals(that.filesInTemp);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(stats, databases, filesInTemp);
+        }
+    }
+}
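Rendered through the toXContent method above, a response would look roughly like the JSON below. The node id, database name, temp file name and all numbers are invented for illustration; the field names come from the serialization code in this file and from the YAML test at the end of the patch, and nodes that report neither databases nor temp files are omitted from the nodes object entirely.

    {
      "stats" : {
        "successful_downloads" : 5,
        "failed_downloads" : 1,
        "total_download_time" : 8923,
        "databases_count" : 3,
        "skipped_updates" : 2
      },
      "nodes" : {
        "J6yXXP3bQ1qwvN4n8T9vXg" : {
          "databases" : [ { "name" : "GeoLite2-City.mmdb" } ],
          "files_in_temp" : [ "GeoLite2-City.mmdb.tmp.gz" ]
        }
      }
    }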
diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStatsTransportAction.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStatsTransportAction.java
new file mode 100644
index 0000000000000..738970729694a
--- /dev/null
+++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStatsTransportAction.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.ingest.geoip.stats;
+
+import org.elasticsearch.action.FailedNodeException;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.nodes.TransportNodesAction;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.ingest.geoip.DatabaseRegistry;
+import org.elasticsearch.ingest.geoip.GeoIpDownloader;
+import org.elasticsearch.ingest.geoip.GeoIpDownloaderTaskExecutor;
+import org.elasticsearch.ingest.geoip.stats.GeoIpDownloaderStatsAction.NodeRequest;
+import org.elasticsearch.ingest.geoip.stats.GeoIpDownloaderStatsAction.NodeResponse;
+import org.elasticsearch.ingest.geoip.stats.GeoIpDownloaderStatsAction.Request;
+import org.elasticsearch.ingest.geoip.stats.GeoIpDownloaderStatsAction.Response;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportService;
+
+import java.io.IOException;
+import java.util.List;
+
+public class GeoIpDownloaderStatsTransportAction extends TransportNodesAction<Request, Response, NodeRequest, NodeResponse> {
+
+    private final TransportService transportService;
+    private final DatabaseRegistry registry;
+    private final GeoIpDownloaderTaskExecutor geoIpDownloaderTaskExecutor;
+
+    @Inject
+    public GeoIpDownloaderStatsTransportAction(TransportService transportService, ClusterService clusterService,
+                                               ThreadPool threadPool, ActionFilters actionFilters, DatabaseRegistry registry,
+                                               GeoIpDownloaderTaskExecutor geoIpDownloaderTaskExecutor) {
+        super(GeoIpDownloaderStatsAction.NAME, threadPool, clusterService, transportService, actionFilters, Request::new,
+            NodeRequest::new, ThreadPool.Names.MANAGEMENT, NodeResponse.class);
+        this.transportService = transportService;
+        this.registry = registry;
+        this.geoIpDownloaderTaskExecutor = geoIpDownloaderTaskExecutor;
+    }
+
+    @Override
+    protected Response newResponse(Request request, List<NodeResponse> nodeResponses, List<FailedNodeException> failures) {
+        return new Response(clusterService.getClusterName(), nodeResponses, failures);
+    }
+
+    @Override
+    protected NodeRequest newNodeRequest(Request request) {
+        return new NodeRequest(request);
+    }
+
+    @Override
+    protected NodeResponse newNodeResponse(StreamInput in) throws IOException {
+        return new NodeResponse(in);
+    }
+
+    @Override
+    protected NodeResponse nodeOperation(NodeRequest request, Task task) {
+        GeoIpDownloader geoIpTask = geoIpDownloaderTaskExecutor.getCurrentTask();
+        GeoIpDownloaderStats stats = geoIpTask == null || geoIpTask.getStatus() == null ? null : geoIpTask.getStatus();
+        return new NodeResponse(transportService.getLocalNode(), stats, registry.getAvailableDatabases(), registry.getFilesInTemp());
+    }
+}
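For this transport action to be reachable it still has to be wired up in IngestGeoIpPlugin, which is modified by this patch but not reproduced in this excerpt. Assuming the usual ActionPlugin hook, the registration would look something like the sketch below; treat it as illustrative rather than as the exact code in that file. RestGeoIpDownloaderStatsAction would be registered through the matching getRestHandlers hook in the same class.

    // Hypothetical sketch of the wiring in IngestGeoIpPlugin (the real registration lives in that file, not shown here).
    @Override
    public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
        return List.of(new ActionHandler<>(GeoIpDownloaderStatsAction.INSTANCE, GeoIpDownloaderStatsTransportAction.class));
    }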
diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/RestGeoIpDownloaderStatsAction.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/RestGeoIpDownloaderStatsAction.java
new file mode 100644
index 0000000000000..5d20480565265
--- /dev/null
+++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/RestGeoIpDownloaderStatsAction.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.ingest.geoip.stats;
+
+import org.elasticsearch.client.node.NodeClient;
+import org.elasticsearch.rest.BaseRestHandler;
+import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.action.RestToXContentListener;
+
+import java.util.List;
+
+import static org.elasticsearch.rest.RestRequest.Method.GET;
+
+public class RestGeoIpDownloaderStatsAction extends BaseRestHandler {
+
+    @Override
+    public String getName() {
+        return "geoip_downloader_stats";
+    }
+
+    @Override
+    public List<Route> routes() {
+        return List.of(new Route(GET, "/_ingest/geoip/stats"));
+    }
+
+    @Override
+    protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
+        return channel -> client.execute(GeoIpDownloaderStatsAction.INSTANCE, new GeoIpDownloaderStatsAction.Request(),
+            new RestToXContentListener<>(channel));
+    }
+}
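The REST handler above is a thin shim over the transport action, so the same stats can be fetched programmatically by any code that holds a node client. A minimal sketch, with the client and logger variables assumed to exist in the surrounding code:

    // Sketch: invoking the stats action directly, mirroring what the REST handler does per request.
    client.execute(GeoIpDownloaderStatsAction.INSTANCE, new GeoIpDownloaderStatsAction.Request(),
        ActionListener.wrap(
            response -> logger.info("collected geoip downloader stats from [{}] node(s)", response.getNodes().size()),
            e -> logger.warn("failed to collect geoip downloader stats", e)));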
diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStatsActionNodeResponseSerializingTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStatsActionNodeResponseSerializingTests.java
new file mode 100644
index 0000000000000..9a5046e503d58
--- /dev/null
+++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStatsActionNodeResponseSerializingTests.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.ingest.geoip.stats;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.test.AbstractWireSerializingTestCase;
+
+import java.util.Set;
+
+public class GeoIpDownloaderStatsActionNodeResponseSerializingTests extends
+    AbstractWireSerializingTestCase<GeoIpDownloaderStatsAction.NodeResponse> {
+
+    @Override
+    protected Writeable.Reader<GeoIpDownloaderStatsAction.NodeResponse> instanceReader() {
+        return GeoIpDownloaderStatsAction.NodeResponse::new;
+    }
+
+    @Override
+    protected GeoIpDownloaderStatsAction.NodeResponse createTestInstance() {
+        return createRandomInstance();
+    }
+
+    static GeoIpDownloaderStatsAction.NodeResponse createRandomInstance() {
+        DiscoveryNode node = new DiscoveryNode("id", buildNewFakeTransportAddress(), Version.CURRENT);
+        Set<String> databases = Set.copyOf(randomList(10, () -> randomAlphaOfLengthBetween(5, 10)));
+        Set<String> files = Set.copyOf(randomList(10, () -> randomAlphaOfLengthBetween(5, 10)));
+        return new GeoIpDownloaderStatsAction.NodeResponse(node, GeoIpDownloaderStatsSerializingTests.createRandomInstance(), databases,
+            files);
+    }
+}
diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStatsActionResponseSerializingTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStatsActionResponseSerializingTests.java
new file mode 100644
index 0000000000000..b2066682c4d4b
--- /dev/null
+++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStatsActionResponseSerializingTests.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.ingest.geoip.stats;
+
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.test.AbstractWireSerializingTestCase;
+
+import java.util.Collections;
+import java.util.List;
+
+public class GeoIpDownloaderStatsActionResponseSerializingTests extends
+    AbstractWireSerializingTestCase<GeoIpDownloaderStatsAction.Response> {
+
+    @Override
+    protected Writeable.Reader<GeoIpDownloaderStatsAction.Response> instanceReader() {
+        return GeoIpDownloaderStatsAction.Response::new;
+    }
+
+    @Override
+    protected GeoIpDownloaderStatsAction.Response createTestInstance() {
+        List<GeoIpDownloaderStatsAction.NodeResponse> nodeResponses = randomList(10,
+            GeoIpDownloaderStatsActionNodeResponseSerializingTests::createRandomInstance);
+        return new GeoIpDownloaderStatsAction.Response(ClusterName.DEFAULT, nodeResponses, Collections.emptyList());
+    }
+}
diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStatsSerializingTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStatsSerializingTests.java
new file mode 100644
index 0000000000000..7bb30fd7c203b
--- /dev/null
+++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStatsSerializingTests.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.ingest.geoip.stats;
+
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.test.AbstractSerializingTestCase;
+
+import java.io.IOException;
+
+public class GeoIpDownloaderStatsSerializingTests extends AbstractSerializingTestCase<GeoIpDownloaderStats> {
+
+    @Override
+    protected GeoIpDownloaderStats doParseInstance(XContentParser parser) throws IOException {
+        return GeoIpDownloaderStats.fromXContent(parser);
+    }
+
+    @Override
+    protected Writeable.Reader<GeoIpDownloaderStats> instanceReader() {
+        return GeoIpDownloaderStats::new;
+    }
+
+    @Override
+    protected GeoIpDownloaderStats createTestInstance() {
+        return createRandomInstance();
+    }
+
+    static GeoIpDownloaderStats createRandomInstance() {
+        GeoIpDownloaderStats stats = GeoIpDownloaderStats.EMPTY.count(randomInt(1000));
+        int successes = randomInt(20);
+        for (int i = 0; i < successes; i++) {
+            stats = stats.successfulDownload(randomLongBetween(0, 3000));
+        }
+        int failures = randomInt(20);
+        for (int i = 0; i < failures; i++) {
+            stats = stats.failedDownload();
+        }
+        int skipped = randomInt(20);
+        for (int i = 0; i < skipped; i++) {
+            stats = stats.skippedDownload();
+        }
+        return stats;
+    }
+}
diff --git a/modules/ingest-geoip/src/yamlRestTest/resources/rest-api-spec/test/ingest_geoip/30_geoip_stats.yml b/modules/ingest-geoip/src/yamlRestTest/resources/rest-api-spec/test/ingest_geoip/30_geoip_stats.yml
new file mode 100644
index 0000000000000..852b2047a47ec
--- /dev/null
+++ b/modules/ingest-geoip/src/yamlRestTest/resources/rest-api-spec/test/ingest_geoip/30_geoip_stats.yml
@@ -0,0 +1,10 @@
+---
+"Test geoip stats":
+  - do:
+      ingest.geo_ip_stats: {}
+  - gte: { stats.successful_downloads: 0 }
+  - gte: { stats.failed_downloads: 0 }
+  - gte: { stats.skipped_updates: 0 }
+  - gte: { stats.databases_count: 0 }
+  - gte: { stats.total_download_time: 0 }
+  - is_true: nodes
diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/ingest.geo_ip_stats.json b/rest-api-spec/src/main/resources/rest-api-spec/api/ingest.geo_ip_stats.json
new file mode 100644
index 0000000000000..1013d7dc6fc5e
--- /dev/null
+++ b/rest-api-spec/src/main/resources/rest-api-spec/api/ingest.geo_ip_stats.json
@@ -0,0 +1,25 @@
+{
+  "ingest.geo_ip_stats": {
+    "documentation": {
+      "url": "https://www.elastic.co/guide/en/elasticsearch/reference/master/geoip-stats-api.html",
+      "description": "Returns statistical information about geoip databases"
+    },
+    "stability": "stable",
+    "visibility": "public",
+    "headers": {
+      "accept": [
+        "application/json"
+      ]
+    },
+    "url": {
+      "paths": [
+        {
+          "path": "/_ingest/geoip/stats",
+          "methods": [
+            "GET"
+          ]
+        }
+      ]
+    }
+  }
+}
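Outside of the YAML test above, the endpoint can be smoke-tested against a running node with the low-level REST client; the class name, host and port below are placeholders rather than anything defined by this patch.

    import org.apache.http.HttpHost;
    import org.apache.http.util.EntityUtils;
    import org.elasticsearch.client.Request;
    import org.elasticsearch.client.Response;
    import org.elasticsearch.client.RestClient;

    public class GeoIpStatsSmokeTest {
        public static void main(String[] args) throws Exception {
            // Placeholder endpoint; point this at any node of the cluster under test.
            try (RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
                Response response = restClient.performRequest(new Request("GET", "/_ingest/geoip/stats"));
                System.out.println(EntityUtils.toString(response.getEntity()));
            }
        }
    }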