From 00d7de2dd7d7771c85706e9bc75e305d790dbf8c Mon Sep 17 00:00:00 2001 From: Przemyslaw Witek Date: Tue, 9 Apr 2024 17:24:56 +0200 Subject: [PATCH 1/2] [Transform] Introduce _transform/_node_stats API --- .../api/transform.get_node_stats.json | 23 +++ .../action/GetTransformNodeStatsAction.java | 143 ++++++++++++++++++ ...odeStatsActionNodesStatsResponseTests.java | 63 ++++++++ .../xpack/security/operator/Constants.java | 1 + .../test/transform/transforms_node_stats.yml | 111 ++++++++++++++ .../common/TransformCommonRestTestCase.java | 5 + .../transform/integration/TransformIT.java | 4 + .../integration/TransformNodeStatsIT.java | 91 +++++++++++ .../xpack/transform/Transform.java | 7 +- .../TransportGetTransformNodeStatsAction.java | 83 ++++++++++ .../RestGetTransformNodeStatsAction.java | 42 +++++ .../TransformScheduledTaskQueue.java | 9 ++ .../scheduling/TransformScheduler.java | 9 ++ .../TransformScheduledTaskQueueTests.java | 12 ++ .../scheduling/TransformSchedulerTests.java | 13 ++ 15 files changed, 615 insertions(+), 1 deletion(-) create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/api/transform.get_node_stats.json create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetTransformNodeStatsAction.java create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/GetTransformNodeStatsActionNodesStatsResponseTests.java create mode 100644 x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/transform/transforms_node_stats.yml create mode 100644 x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformNodeStatsIT.java create mode 100644 x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformNodeStatsAction.java create mode 100644 x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestGetTransformNodeStatsAction.java diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/transform.get_node_stats.json b/rest-api-spec/src/main/resources/rest-api-spec/api/transform.get_node_stats.json new file mode 100644 index 0000000000000..ca3fde65f6363 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/transform.get_node_stats.json @@ -0,0 +1,23 @@ +{ + "transform.get_node_stats":{ + "documentation":{ + "url":"https://www.elastic.co/guide/en/elasticsearch/reference/current/get-transform-node-stats.html", + "description":"Retrieves transform usage information for transform nodes." + }, + "stability":"stable", + "visibility":"public", + "headers":{ + "accept": [ "application/json"] + }, + "url":{ + "paths":[ + { + "path":"/_transform/_node_stats", + "methods":[ + "GET" + ] + } + ] + } + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetTransformNodeStatsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetTransformNodeStatsAction.java new file mode 100644 index 0000000000000..9eebacc49bcca --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetTransformNodeStatsAction.java @@ -0,0 +1,143 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.transform.action; + +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.TransportAction; +import org.elasticsearch.action.support.nodes.BaseNodeResponse; +import org.elasticsearch.action.support.nodes.BaseNodesRequest; +import org.elasticsearch.action.support.nodes.BaseNodesResponse; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.ToXContentObject; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.List; + +public class GetTransformNodeStatsAction extends ActionType { + + public static final GetTransformNodeStatsAction INSTANCE = new GetTransformNodeStatsAction(); + public static final String NAME = "cluster:admin/transform/node_stats"; + + private static final String TOTAL_FIELD_NAME = "total"; + private static final String REGISTERED_TRANSFORM_COUNT_FIELD_NAME = "registered_transform_count"; + + private GetTransformNodeStatsAction() { + super(NAME); + } + + public static class NodesStatsRequest extends BaseNodesRequest { + + public NodesStatsRequest() { + super(Strings.EMPTY_ARRAY); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + TransportAction.localOnly(); + } + } + + public static class NodesStatsResponse extends BaseNodesResponse implements ToXContentObject { + + public int getTotalRegisteredTransformCount() { + int totalRegisteredTransformCount = 0; + for (var nodeResponse : getNodes()) { + totalRegisteredTransformCount += nodeResponse.getRegisteredTransformCount(); + } + return totalRegisteredTransformCount; + } + + public NodesStatsResponse(ClusterName clusterName, List nodes, List failures) { + super(clusterName, nodes, failures); + } + + public RestStatus status() { + return this.hasFailures() ? RestStatus.INTERNAL_SERVER_ERROR : RestStatus.OK; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + for (var nodeEntry : getNodesMap().entrySet()) { + String nodeName = nodeEntry.getKey(); + NodeStatsResponse nodeResponse = nodeEntry.getValue(); + builder.field(nodeName); + nodeResponse.toXContent(builder, params); + } + builder.startObject(TOTAL_FIELD_NAME); + builder.field(REGISTERED_TRANSFORM_COUNT_FIELD_NAME, getTotalRegisteredTransformCount()); + builder.endObject(); + return builder.endObject(); + } + + @Override + protected List readNodesFrom(StreamInput in) throws IOException { + return TransportAction.localOnly(); + } + + @Override + protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { + TransportAction.localOnly(); + } + } + + public static class NodeStatsRequest extends TransportRequest { + + public NodeStatsRequest() {} + + public NodeStatsRequest(StreamInput in) throws IOException { + super(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + } + } + + public static class NodeStatsResponse extends BaseNodeResponse implements ToXContentObject { + + private final int registeredTransformCount; + + public int getRegisteredTransformCount() { + return this.registeredTransformCount; + } + + public NodeStatsResponse(DiscoveryNode node, int registeredTransformCount) { + super(node); + this.registeredTransformCount = registeredTransformCount; + } + + public NodeStatsResponse(StreamInput in) throws IOException { + super(in); + this.registeredTransformCount = in.readVInt(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeVInt(this.registeredTransformCount); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + builder.startObject(); + builder.field(REGISTERED_TRANSFORM_COUNT_FIELD_NAME, registeredTransformCount); + return builder.endObject(); + } + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/GetTransformNodeStatsActionNodesStatsResponseTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/GetTransformNodeStatsActionNodesStatsResponseTests.java new file mode 100644 index 0000000000000..6051f90904a8c --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/GetTransformNodeStatsActionNodesStatsResponseTests.java @@ -0,0 +1,63 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.transform.action; + +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeUtils; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.transform.action.GetTransformNodeStatsAction.NodeStatsResponse; +import org.elasticsearch.xpack.core.transform.action.GetTransformNodeStatsAction.NodesStatsResponse; + +import java.util.List; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +public class GetTransformNodeStatsActionNodesStatsResponseTests extends ESTestCase { + + private static final ClusterName CLUSTER_NAME = new ClusterName("my-cluster"); + + public void testEmptyResponse() { + var nodesStatsResponse = new NodesStatsResponse(CLUSTER_NAME, List.of(), List.of()); + assertThat(nodesStatsResponse.getNodes(), is(empty())); + assertThat(nodesStatsResponse.failures(), is(empty())); + assertThat(nodesStatsResponse.getTotalRegisteredTransformCount(), is(equalTo(0))); + } + + public void testResponse() { + var nodeA = new NodeStatsResponse(createNode("node-A"), 7); + var nodeB = new NodeStatsResponse(createNode("node-B"), 0); + var nodeC = new NodeStatsResponse(createNode("node-C"), 4); + + var nodesStatsResponse = new NodesStatsResponse(CLUSTER_NAME, List.of(nodeA, nodeB, nodeC), List.of()); + assertThat(nodesStatsResponse.getNodes(), containsInAnyOrder(nodeA, nodeB, nodeC)); + assertThat(nodesStatsResponse.failures(), is(empty())); + assertThat(nodesStatsResponse.getTotalRegisteredTransformCount(), is(equalTo(11))); + } + + public void testResponseWithFailure() { + var nodeA = new NodeStatsResponse(createNode("node-A"), 7); + var nodeB = new NodeStatsResponse(createNode("node-B"), 0); + var nodeC = new FailedNodeException("node-C", "node C failed", null); + + var nodesStatsResponse = new NodesStatsResponse(CLUSTER_NAME, List.of(nodeA, nodeB), List.of(nodeC)); + assertThat(nodesStatsResponse.getNodes(), containsInAnyOrder(nodeA, nodeB)); + assertThat(nodesStatsResponse.failures(), contains(nodeC)); + assertThat(nodesStatsResponse.getTotalRegisteredTransformCount(), is(equalTo(7))); + } + + private static DiscoveryNode createNode(String name) { + return DiscoveryNodeUtils.builder(UUIDs.randomBase64UUID(random())).name(name).build(); + } +} diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index 2fc894c69aa4c..e71e3a468cb87 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -99,6 +99,7 @@ public class Constants { "cluster:admin/features/reset", "cluster:admin/tasks/cancel", "cluster:admin/transform/delete", + "cluster:admin/transform/node_stats", "cluster:admin/transform/preview", "cluster:admin/transform/put", "cluster:admin/transform/reset", diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/transform/transforms_node_stats.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/transform/transforms_node_stats.yml new file mode 100644 index 0000000000000..219dfd068b360 --- /dev/null +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/transform/transforms_node_stats.yml @@ -0,0 +1,111 @@ +setup: + - do: + indices.create: + index: airline-data + body: + mappings: + properties: + time: + type: date + airline: + type: keyword + responsetime: + type: float + event_rate: + type: integer + - do: + transform.put_transform: + transform_id: "airline-transform-stats" + body: > + { + "source": { "index": "airline-data" }, + "dest": { "index": "airline-data-by-airline-stats" }, + "pivot": { + "group_by": { "airline": {"terms": {"field": "airline"}}}, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + }, + "sync": { "time": { "field": "time", "delay": "1m" } } + } + - do: + transform.put_transform: + transform_id: "airline-transform-stats-dos" + body: > + { + "source": { "index": "airline-data" }, + "dest": { "index": "airline-data-by-airline-stats-dos" }, + "pivot": { + "group_by": { "airline": {"terms": {"field": "airline"}}}, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + }, + "sync": { "time": { "field": "time", "delay": "1m" } } + } + - do: + transform.put_transform: + transform_id: "airline-transform-stats-the-third" + body: > + { + "source": { "index": "airline-data" }, + "dest": { "index": "airline-data-by-airline-stats-the-third" }, + "pivot": { + "group_by": { "airline": {"terms": {"field": "airline"}}}, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + }, + "sync": { "time": { "field": "time", "delay": "1m" } } + } + +--- +teardown: + - do: + transform.stop_transform: + wait_for_checkpoint: false + transform_id: "airline-transform-stats" + wait_for_completion: true + - do: + transform.delete_transform: + transform_id: "airline-transform-stats" + - do: + transform.stop_transform: + wait_for_checkpoint: false + transform_id: "airline-transform-stats-dos" + wait_for_completion: true + - do: + transform.delete_transform: + transform_id: "airline-transform-stats-dos" + - do: + transform.stop_transform: + wait_for_checkpoint: false + transform_id: "airline-transform-stats-the-third" + wait_for_completion: true + - do: + transform.delete_transform: + transform_id: "airline-transform-stats-the-third" + +--- +"Test get node stats": + - do: + transform.get_node_stats: {} + - match: { total.registered_transform_count: 0 } + + - do: + transform.start_transform: + transform_id: "airline-transform-stats" + + - do: + transform.get_node_stats: {} + - match: { total.registered_transform_count: 1 } + + - do: + transform.start_transform: + transform_id: "airline-transform-stats-dos" + + - do: + transform.get_node_stats: {} + - match: { total.registered_transform_count: 2 } + + - do: + transform.start_transform: + transform_id: "airline-transform-stats-the-third" + + - do: + transform.get_node_stats: {} + - match: { total.registered_transform_count: 3 } diff --git a/x-pack/plugin/transform/qa/common/src/main/java/org/elasticsearch/xpack/transform/integration/common/TransformCommonRestTestCase.java b/x-pack/plugin/transform/qa/common/src/main/java/org/elasticsearch/xpack/transform/integration/common/TransformCommonRestTestCase.java index 98cf817d6c018..1a69e221573ab 100644 --- a/x-pack/plugin/transform/qa/common/src/main/java/org/elasticsearch/xpack/transform/integration/common/TransformCommonRestTestCase.java +++ b/x-pack/plugin/transform/qa/common/src/main/java/org/elasticsearch/xpack/transform/integration/common/TransformCommonRestTestCase.java @@ -77,6 +77,11 @@ protected List getTransformTasksFromClusterState(String transformId) thr return tasks.stream().map(t -> (String) t.get("id")).filter(transformId::equals).toList(); } + protected int getTotalRegisteredTransformCount() throws IOException { + Response response = adminClient().performRequest(new Request("GET", "/_transform/_node_stats")); + return (int) XContentMapValues.extractValue(entityAsMap(response), "total", "registered_transform_count"); + } + @SuppressWarnings("unchecked") protected void logAudits() throws Exception { logger.info("writing audit messages to the log"); diff --git a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIT.java b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIT.java index 4db0d0d8baaf1..61d0af8c54fb6 100644 --- a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIT.java +++ b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIT.java @@ -245,16 +245,19 @@ public void testTransformLifecycleInALoop() throws Exception { putTransform(transformId, config, RequestOptions.DEFAULT); assertThat(getTransformTasks(), is(empty())); assertThat(getTransformTasksFromClusterState(transformId), is(empty())); + assertThat(getTotalRegisteredTransformCount(), is(equalTo(0))); startTransform(transformId, RequestOptions.DEFAULT); // There is 1 transform task after start. assertThat(getTransformTasks(), hasSize(1)); assertThat(getTransformTasksFromClusterState(transformId), hasSize(1)); + assertThat(getTotalRegisteredTransformCount(), is(equalTo(1))); Thread.sleep(sleepAfterStartMillis); // There should still be 1 transform task as the transform is continuous. assertThat(getTransformTasks(), hasSize(1)); assertThat(getTransformTasksFromClusterState(transformId), hasSize(1)); + assertThat(getTotalRegisteredTransformCount(), is(equalTo(1))); // Stop the transform with force set randomly. stopTransform(transformId, true, null, false, force); @@ -268,6 +271,7 @@ public void testTransformLifecycleInALoop() throws Exception { } // After the transform is stopped, there should be no transform task left in the cluster state. assertThat(getTransformTasksFromClusterState(transformId), is(empty())); + assertThat(getTotalRegisteredTransformCount(), is(equalTo(0))); // Delete the transform deleteTransform(transformId); diff --git a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformNodeStatsIT.java b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformNodeStatsIT.java new file mode 100644 index 0000000000000..bee7d7fac3df5 --- /dev/null +++ b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformNodeStatsIT.java @@ -0,0 +1,91 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.transform.integration; + +import org.elasticsearch.client.Request; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; +import org.elasticsearch.xpack.core.transform.transforms.QueryConfig; +import org.elasticsearch.xpack.core.transform.transforms.pivot.TermsGroupSource; +import org.junit.After; + +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.Matchers.is; + +public class TransformNodeStatsIT extends TransformRestTestCase { + + private static final int NUM_USERS = 28; + + static Integer getUserIdForRow(int row) { + return row % NUM_USERS; + } + + static String getDateStringForRow(int row) { + int day = (11 + (row / 100)) % 28; + int hour = 10 + (row % 13); + int min = 10 + (row % 49); + int sec = 10 + (row % 49); + return "2017-01-" + (day < 10 ? "0" + day : day) + "T" + hour + ":" + min + ":" + sec + "Z"; + } + + @After + public void cleanTransforms() throws Exception { + cleanUp(); + } + + @SuppressWarnings("unchecked") + public void testTransformNodeStats() throws Exception { + var transformId = "transform-basic-stats"; + createStoppedTransform("basic-stats-reviews", transformId); + + var nodesInfo = getNodesInfo(adminClient()); + assertThat("Nodes were: " + nodesInfo, nodesInfo.size(), is(equalTo(3))); + + var response = entityAsMap(adminClient().performRequest(new Request("GET", "/_transform/_node_stats"))); + assertThat(response, hasKey("total")); + assertThat(XContentMapValues.extractValue(response, "total", "registered_transform_count"), is(equalTo(0))); + for (String nodeId : nodesInfo.keySet()) { + assertThat(response, hasKey(nodeId)); + assertThat(XContentMapValues.extractValue(response, nodeId, "registered_transform_count"), is(equalTo(0))); + } + } + + private void createStoppedTransform(String indexName, String transformId) throws Exception { + createReviewsIndex(indexName, 100, NUM_USERS, TransformNodeStatsIT::getUserIdForRow, TransformNodeStatsIT::getDateStringForRow); + + var groups = Map.of( + "by-day", + createDateHistogramGroupSourceWithCalendarInterval("timestamp", DateHistogramInterval.DAY, null), + "by-user", + new TermsGroupSource("user_id", null, false), + "by-business", + new TermsGroupSource("business_id", null, false) + ); + + var aggs = AggregatorFactories.builder() + .addAggregator(AggregationBuilders.avg("review_score").field("stars")) + .addAggregator(AggregationBuilders.max("timestamp").field("timestamp")); + + var config = createTransformConfigBuilder(transformId, "reviews-by-user-business-day", QueryConfig.matchAll(), indexName) + .setPivotConfig(createPivotConfig(groups, aggs)) + .build(); + + putTransform(transformId, Strings.toString(config), RequestOptions.DEFAULT); + startTransform(config.getId(), RequestOptions.DEFAULT); + + waitUntilCheckpoint(config.getId(), 1L); + stopTransform(config.getId()); + } +} diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java index 5b6d0f5dbe608..efc2edb93026d 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java @@ -58,6 +58,7 @@ import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction; import org.elasticsearch.xpack.core.transform.action.GetCheckpointNodeAction; import org.elasticsearch.xpack.core.transform.action.GetTransformAction; +import org.elasticsearch.xpack.core.transform.action.GetTransformNodeStatsAction; import org.elasticsearch.xpack.core.transform.action.GetTransformStatsAction; import org.elasticsearch.xpack.core.transform.action.PreviewTransformAction; import org.elasticsearch.xpack.core.transform.action.PutTransformAction; @@ -74,6 +75,7 @@ import org.elasticsearch.xpack.transform.action.TransportGetCheckpointAction; import org.elasticsearch.xpack.transform.action.TransportGetCheckpointNodeAction; import org.elasticsearch.xpack.transform.action.TransportGetTransformAction; +import org.elasticsearch.xpack.transform.action.TransportGetTransformNodeStatsAction; import org.elasticsearch.xpack.transform.action.TransportGetTransformStatsAction; import org.elasticsearch.xpack.transform.action.TransportPreviewTransformAction; import org.elasticsearch.xpack.transform.action.TransportPutTransformAction; @@ -93,6 +95,7 @@ import org.elasticsearch.xpack.transform.rest.action.RestCatTransformAction; import org.elasticsearch.xpack.transform.rest.action.RestDeleteTransformAction; import org.elasticsearch.xpack.transform.rest.action.RestGetTransformAction; +import org.elasticsearch.xpack.transform.rest.action.RestGetTransformNodeStatsAction; import org.elasticsearch.xpack.transform.rest.action.RestGetTransformStatsAction; import org.elasticsearch.xpack.transform.rest.action.RestPreviewTransformAction; import org.elasticsearch.xpack.transform.rest.action.RestPutTransformAction; @@ -191,7 +194,8 @@ public List getRestHandlers( new RestCatTransformAction(), new RestUpgradeTransformsAction(), new RestResetTransformAction(), - new RestScheduleNowTransformAction() + new RestScheduleNowTransformAction(), + new RestGetTransformNodeStatsAction() ); } @@ -211,6 +215,7 @@ public List getRestHandlers( new ActionHandler<>(UpgradeTransformsAction.INSTANCE, TransportUpgradeTransformsAction.class), new ActionHandler<>(ResetTransformAction.INSTANCE, TransportResetTransformAction.class), new ActionHandler<>(ScheduleNowTransformAction.INSTANCE, TransportScheduleNowTransformAction.class), + new ActionHandler<>(GetTransformNodeStatsAction.INSTANCE, TransportGetTransformNodeStatsAction.class), // internal, no rest endpoint new ActionHandler<>(ValidateTransformAction.INSTANCE, TransportValidateTransformAction.class), diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformNodeStatsAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformNodeStatsAction.java new file mode 100644 index 0000000000000..e4f4b3298517c --- /dev/null +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformNodeStatsAction.java @@ -0,0 +1,83 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.transform.action; + +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.nodes.TransportNodesAction; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.transform.action.GetTransformNodeStatsAction; +import org.elasticsearch.xpack.core.transform.action.GetTransformNodeStatsAction.NodeStatsRequest; +import org.elasticsearch.xpack.core.transform.action.GetTransformNodeStatsAction.NodeStatsResponse; +import org.elasticsearch.xpack.core.transform.action.GetTransformNodeStatsAction.NodesStatsRequest; +import org.elasticsearch.xpack.core.transform.action.GetTransformNodeStatsAction.NodesStatsResponse; +import org.elasticsearch.xpack.transform.TransformServices; +import org.elasticsearch.xpack.transform.transforms.scheduling.TransformScheduler; + +import java.io.IOException; +import java.util.List; + +/** + * {@link TransportGetTransformNodeStatsAction} class fetches transform-related metrics from all the nodes and aggregates these metrics. + */ +public class TransportGetTransformNodeStatsAction extends TransportNodesAction< + NodesStatsRequest, + NodesStatsResponse, + NodeStatsRequest, + NodeStatsResponse> { + + private final TransportService transportService; + private final TransformScheduler scheduler; + + @Inject + public TransportGetTransformNodeStatsAction( + ThreadPool threadPool, + ClusterService clusterService, + TransportService transportService, + ActionFilters actionFilters, + TransformServices transformServices + ) { + super( + GetTransformNodeStatsAction.NAME, + clusterService, + transportService, + actionFilters, + NodeStatsRequest::new, + threadPool.executor(ThreadPool.Names.MANAGEMENT) + ); + this.transportService = transportService; + this.scheduler = transformServices.getScheduler(); + } + + @Override + protected NodesStatsResponse newResponse(NodesStatsRequest request, List nodes, List failures) { + return new NodesStatsResponse(clusterService.getClusterName(), nodes, failures); + } + + @Override + protected NodeStatsRequest newNodeRequest(NodesStatsRequest request) { + return new NodeStatsRequest(); + } + + @Override + protected NodeStatsResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException { + return new NodeStatsResponse(in); + } + + @Override + protected NodeStatsResponse nodeOperation(NodeStatsRequest request, Task task) { + final DiscoveryNode localNode = transportService.getLocalNode(); + return new NodeStatsResponse(localNode, scheduler.getRegisteredTransformCount()); + } +} diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestGetTransformNodeStatsAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestGetTransformNodeStatsAction.java new file mode 100644 index 0000000000000..30d3b6dbdcaae --- /dev/null +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestGetTransformNodeStatsAction.java @@ -0,0 +1,42 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.transform.rest.action; + +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.Scope; +import org.elasticsearch.rest.ServerlessScope; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.core.transform.TransformField; +import org.elasticsearch.xpack.core.transform.action.GetTransformNodeStatsAction; +import org.elasticsearch.xpack.core.transform.action.GetTransformNodeStatsAction.NodesStatsRequest; + +import java.util.List; + +import static org.elasticsearch.rest.RestRequest.Method.GET; + +@ServerlessScope(Scope.PUBLIC) +public class RestGetTransformNodeStatsAction extends BaseRestHandler { + + @Override + public List routes() { + return List.of(new Route(GET, TransformField.REST_BASE_PATH_TRANSFORMS + "_node_stats")); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) { + NodesStatsRequest request = new NodesStatsRequest(); + return channel -> client.execute(GetTransformNodeStatsAction.INSTANCE, request, new RestToXContentListener<>(channel)); + } + + @Override + public String getName() { + return "transform_get_transform_node_stats_action"; + } +} diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduledTaskQueue.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduledTaskQueue.java index e11da6af1c285..cd3630a095ed1 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduledTaskQueue.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduledTaskQueue.java @@ -108,6 +108,15 @@ public synchronized TransformScheduledTask remove(String transformId) { return task; } + /** + * Returns the current queue size. + * + * @return the current queue size + */ + public synchronized int size() { + return tasks.size(); + } + // Visible for testing /** * @return the set of all the transform ids diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduler.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduler.java index a02f2aac956e2..233a62af5b0df 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduler.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduler.java @@ -270,6 +270,15 @@ public void deregisterTransform(String transformId) { scheduledTasks.remove(transformId); } + /** + * Returns the number of transforms currently in the queue. + * + * @return number of transforms currently in the queue + */ + public int getRegisteredTransformCount() { + return scheduledTasks.size(); + } + // Visible for testing /** * @return queue current contents diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduledTaskQueueTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduledTaskQueueTests.java index 5030d42f9c17c..6c032e752613b 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduledTaskQueueTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduledTaskQueueTests.java @@ -54,6 +54,7 @@ public void testEmptyQueue() { public void testNonEmptyQueue() { queue.add(createTask("task-1", 5)); assertThat(queue.first(), is(notNullValue())); + assertThat(queue.size(), is(equalTo(1))); } public void testAddAndRemove() { @@ -63,6 +64,7 @@ public void testAddAndRemove() { assertThat(queue.first(), is(notNullValue())); assertThat(queue.getTransformIds(), containsInAnyOrder("task-1", "task-2", "task-3")); assertThat(queue.first(), is(equalTo(createTask("task-2", 1)))); + assertThat(queue.size(), is(equalTo(3))); queue.remove("task-1"); queue.remove("task-2"); @@ -86,6 +88,7 @@ public void testConcurrentAddAndRemove() throws Exception { } assertThat(queue.first(), is(notNullValue())); assertThat(queue.getTransformIds(), hasSize(100)); + assertThat(queue.size(), is(equalTo(100))); { Set removedTaskIds = new HashSet<>(); @@ -107,11 +110,13 @@ public void testConcurrentAddAndRemove() throws Exception { public void testAddNoOp() { queue.add(createTask("task-1", 5)); assertThat(queue.first(), is(equalTo(createTask("task-1", 5)))); + assertThat(queue.size(), is(equalTo(1))); // Try adding a task with a duplicate key queue.add(createTask("task-1", 6)); // Verify that the add operation had no effect assertThat(queue.first(), is(equalTo(createTask("task-1", 5)))); + assertThat(queue.size(), is(equalTo(1))); } public void testRemoveNoOp() { @@ -121,6 +126,7 @@ public void testRemoveNoOp() { assertThat(queue.first(), is(notNullValue())); assertThat(queue.getTransformIds(), containsInAnyOrder("task-1")); assertThat(queue.first(), is(equalTo(createTask("task-1", 5)))); + assertThat(queue.size(), is(equalTo(1))); } public void testUpdateNoOp() { @@ -130,6 +136,7 @@ public void testUpdateNoOp() { assertThat(queue.first(), is(notNullValue())); assertThat(queue.getTransformIds(), containsInAnyOrder("task-1")); assertThat(queue.first(), is(equalTo(createTask("task-1", 5)))); + assertThat(queue.size(), is(equalTo(1))); } public void testUpdateModifiesId() { @@ -154,6 +161,7 @@ public void testRemoveAll() { containsInAnyOrder("task-1", "task-2", "task-3", "task-4", "task-5", "task-6", "task-7", "task-8", "task-9") ); assertThat(queue.first(), is(equalTo(createTask("task-7", 0)))); + assertThat(queue.size(), is(equalTo(9))); List tasksByPriority = new ArrayList<>(); while (queue.first() != null) { @@ -184,15 +192,18 @@ public void testUpdatePriority() { queue.add(createTask("task-3", 9)); assertThat(queue.getTransformIds(), containsInAnyOrder("task-1", "task-2", "task-3")); assertThat(queue.first(), is(equalTo(createTask("task-2", 1)))); + assertThat(queue.size(), is(equalTo(3))); queue.update("task-3", task -> createTask(task.getTransformId(), -999)); assertThat(queue.getTransformIds(), containsInAnyOrder("task-1", "task-2", "task-3")); assertThat(queue.first(), is(equalTo(createTask("task-3", -999)))); + assertThat(queue.size(), is(equalTo(3))); queue.update("task-1", task -> createTask(task.getTransformId(), 0)); queue.remove("task-3"); assertThat(queue.getTransformIds(), containsInAnyOrder("task-1", "task-2")); assertThat(queue.first(), is(equalTo(createTask("task-1", 0)))); + assertThat(queue.size(), is(equalTo(2))); } private static TransformScheduledTask createTask(String transformId, long nextScheduledTimeMillis) { @@ -213,5 +224,6 @@ private static void failUnexpectedCall(Event event) { private void assertThatQueueIsEmpty() { assertThat(queue.first(), is(nullValue())); assertThat(queue.getTransformIds(), is(empty())); + assertThat(queue.size(), is(equalTo(0))); } } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformSchedulerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformSchedulerTests.java index 8d3220a5b4de3..06fdfd7b538b1 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformSchedulerTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformSchedulerTests.java @@ -80,7 +80,9 @@ private void testScheduling(int frequencySeconds, int minFreqencySeconds) { TransformScheduler.Listener listener = events::add; TransformScheduler transformScheduler = new TransformScheduler(clock, threadPool, SETTINGS, minFrequency); + assertThat(transformScheduler.getRegisteredTransformCount(), is(equalTo(0))); transformScheduler.registerTransform(transformTaskParams, listener); + assertThat(transformScheduler.getRegisteredTransformCount(), is(equalTo(1))); assertThat( transformScheduler.getTransformScheduledTasks(), contains(new TransformScheduledTask(transformId, fiveSeconds, 0L, 0, 5000, listener)) @@ -125,6 +127,7 @@ private void testScheduling(int frequencySeconds, int minFreqencySeconds) { assertThat(events.get(2), is(equalTo(new TransformScheduler.Event(transformId, 10005, 10010)))); transformScheduler.deregisterTransform(transformId); + assertThat(transformScheduler.getRegisteredTransformCount(), is(equalTo(0))); assertThat(transformScheduler.getTransformScheduledTasks(), is(empty())); transformScheduler.stop(); @@ -139,7 +142,9 @@ public void testSchedulingWithFailures() { TransformScheduler.Listener listener = events::add; TransformScheduler transformScheduler = new TransformScheduler(clock, threadPool, SETTINGS, TimeValue.ZERO); + assertThat(transformScheduler.getRegisteredTransformCount(), is(equalTo(0))); transformScheduler.registerTransform(transformTaskParams, listener); + assertThat(transformScheduler.getRegisteredTransformCount(), is(equalTo(1))); assertThat( transformScheduler.getTransformScheduledTasks(), contains(new TransformScheduledTask(transformId, frequency, 0L, 0, 60 * 60 * 1000, listener)) @@ -177,6 +182,7 @@ public void testSchedulingWithFailures() { ); transformScheduler.deregisterTransform(transformId); + assertThat(transformScheduler.getRegisteredTransformCount(), is(equalTo(0))); assertThat(transformScheduler.getTransformScheduledTasks(), is(empty())); transformScheduler.stop(); @@ -191,7 +197,9 @@ public void testScheduleNow() { TransformScheduler.Listener listener = events::add; TransformScheduler transformScheduler = new TransformScheduler(clock, threadPool, SETTINGS, TimeValue.ZERO); + assertThat(transformScheduler.getRegisteredTransformCount(), is(equalTo(0))); transformScheduler.registerTransform(transformTaskParams, listener); + assertThat(transformScheduler.getRegisteredTransformCount(), is(equalTo(1))); assertThat( transformScheduler.getTransformScheduledTasks(), contains(new TransformScheduledTask(transformId, frequency, 0L, 0, 60 * 60 * 1000, listener)) @@ -226,6 +234,7 @@ public void testScheduleNow() { assertThat(events.get(2), is(equalTo(new TransformScheduler.Event(transformId, 31 * 60 * 1000, 31 * 60 * 1000)))); transformScheduler.deregisterTransform(transformId); + assertThat(transformScheduler.getRegisteredTransformCount(), is(equalTo(0))); assertThat(transformScheduler.getTransformScheduledTasks(), is(empty())); transformScheduler.stop(); @@ -402,9 +411,11 @@ public void testRegisterMultipleTransforms() { TransformScheduler.Listener listener = events::add; TransformScheduler transformScheduler = new TransformScheduler(clock, threadPool, SETTINGS, TimeValue.ZERO); + assertThat(transformScheduler.getRegisteredTransformCount(), is(equalTo(0))); transformScheduler.registerTransform(transformTaskParams1, listener); transformScheduler.registerTransform(transformTaskParams2, listener); transformScheduler.registerTransform(transformTaskParams3, listener); + assertThat(transformScheduler.getRegisteredTransformCount(), is(equalTo(3))); assertThat( transformScheduler.getTransformScheduledTasks(), contains( @@ -432,9 +443,11 @@ public void testMultipleTransformsEligibleForProcessingAtOnce() { TransformScheduler.Listener listener = events::add; TransformScheduler transformScheduler = new TransformScheduler(clock, threadPool, SETTINGS, TimeValue.ZERO); + assertThat(transformScheduler.getRegisteredTransformCount(), is(equalTo(0))); transformScheduler.registerTransform(transformTaskParams1, listener); transformScheduler.registerTransform(transformTaskParams2, listener); transformScheduler.registerTransform(transformTaskParams3, listener); + assertThat(transformScheduler.getRegisteredTransformCount(), is(equalTo(3))); assertThat( transformScheduler.getTransformScheduledTasks(), contains( From 40c5f500c3d18d49f967839180ef296ceb21782e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20Witek?= Date: Mon, 13 May 2024 14:13:39 +0200 Subject: [PATCH 2/2] Update docs/changelog/107279.yaml --- docs/changelog/107279.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/107279.yaml diff --git a/docs/changelog/107279.yaml b/docs/changelog/107279.yaml new file mode 100644 index 0000000000000..a2940ecc9ba2d --- /dev/null +++ b/docs/changelog/107279.yaml @@ -0,0 +1,5 @@ +pr: 107279 +summary: Introduce _transform/_node_stats API +area: Transform +type: feature +issues: []