Skip to content

Commit

Permalink
[Transform] Introduce _transform/_node_stats API
Browse files Browse the repository at this point in the history
  • Loading branch information
przemekwitek committed May 13, 2024
1 parent c22ec19 commit 00d7de2
Show file tree
Hide file tree
Showing 15 changed files with 615 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"transform.get_node_stats":{
"documentation":{
"url":"https://www.elastic.co/guide/en/elasticsearch/reference/current/get-transform-node-stats.html",
"description":"Retrieves transform usage information for transform nodes."
},
"stability":"stable",
"visibility":"public",
"headers":{
"accept": [ "application/json"]
},
"url":{
"paths":[
{
"path":"/_transform/_node_stats",
"methods":[
"GET"
]
}
]
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.transform.action;

import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.List;

public class GetTransformNodeStatsAction extends ActionType<GetTransformNodeStatsAction.NodesStatsResponse> {

public static final GetTransformNodeStatsAction INSTANCE = new GetTransformNodeStatsAction();
public static final String NAME = "cluster:admin/transform/node_stats";

private static final String TOTAL_FIELD_NAME = "total";
private static final String REGISTERED_TRANSFORM_COUNT_FIELD_NAME = "registered_transform_count";

private GetTransformNodeStatsAction() {
super(NAME);
}

public static class NodesStatsRequest extends BaseNodesRequest<NodesStatsRequest> {

public NodesStatsRequest() {
super(Strings.EMPTY_ARRAY);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
TransportAction.localOnly();
}
}

public static class NodesStatsResponse extends BaseNodesResponse<NodeStatsResponse> implements ToXContentObject {

public int getTotalRegisteredTransformCount() {
int totalRegisteredTransformCount = 0;
for (var nodeResponse : getNodes()) {
totalRegisteredTransformCount += nodeResponse.getRegisteredTransformCount();
}
return totalRegisteredTransformCount;
}

public NodesStatsResponse(ClusterName clusterName, List<NodeStatsResponse> nodes, List<FailedNodeException> failures) {
super(clusterName, nodes, failures);
}

public RestStatus status() {
return this.hasFailures() ? RestStatus.INTERNAL_SERVER_ERROR : RestStatus.OK;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
for (var nodeEntry : getNodesMap().entrySet()) {
String nodeName = nodeEntry.getKey();
NodeStatsResponse nodeResponse = nodeEntry.getValue();
builder.field(nodeName);
nodeResponse.toXContent(builder, params);
}
builder.startObject(TOTAL_FIELD_NAME);
builder.field(REGISTERED_TRANSFORM_COUNT_FIELD_NAME, getTotalRegisteredTransformCount());
builder.endObject();
return builder.endObject();
}

@Override
protected List<NodeStatsResponse> readNodesFrom(StreamInput in) throws IOException {
return TransportAction.localOnly();
}

@Override
protected void writeNodesTo(StreamOutput out, List<NodeStatsResponse> nodes) throws IOException {
TransportAction.localOnly();
}
}

public static class NodeStatsRequest extends TransportRequest {

public NodeStatsRequest() {}

public NodeStatsRequest(StreamInput in) throws IOException {
super(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
}
}

public static class NodeStatsResponse extends BaseNodeResponse implements ToXContentObject {

private final int registeredTransformCount;

public int getRegisteredTransformCount() {
return this.registeredTransformCount;
}

public NodeStatsResponse(DiscoveryNode node, int registeredTransformCount) {
super(node);
this.registeredTransformCount = registeredTransformCount;
}

public NodeStatsResponse(StreamInput in) throws IOException {
super(in);
this.registeredTransformCount = in.readVInt();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(this.registeredTransformCount);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject();
builder.field(REGISTERED_TRANSFORM_COUNT_FIELD_NAME, registeredTransformCount);
return builder.endObject();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.transform.action;

import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.transform.action.GetTransformNodeStatsAction.NodeStatsResponse;
import org.elasticsearch.xpack.core.transform.action.GetTransformNodeStatsAction.NodesStatsResponse;

import java.util.List;

import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

public class GetTransformNodeStatsActionNodesStatsResponseTests extends ESTestCase {

private static final ClusterName CLUSTER_NAME = new ClusterName("my-cluster");

public void testEmptyResponse() {
var nodesStatsResponse = new NodesStatsResponse(CLUSTER_NAME, List.of(), List.of());
assertThat(nodesStatsResponse.getNodes(), is(empty()));
assertThat(nodesStatsResponse.failures(), is(empty()));
assertThat(nodesStatsResponse.getTotalRegisteredTransformCount(), is(equalTo(0)));
}

public void testResponse() {
var nodeA = new NodeStatsResponse(createNode("node-A"), 7);
var nodeB = new NodeStatsResponse(createNode("node-B"), 0);
var nodeC = new NodeStatsResponse(createNode("node-C"), 4);

var nodesStatsResponse = new NodesStatsResponse(CLUSTER_NAME, List.of(nodeA, nodeB, nodeC), List.of());
assertThat(nodesStatsResponse.getNodes(), containsInAnyOrder(nodeA, nodeB, nodeC));
assertThat(nodesStatsResponse.failures(), is(empty()));
assertThat(nodesStatsResponse.getTotalRegisteredTransformCount(), is(equalTo(11)));
}

public void testResponseWithFailure() {
var nodeA = new NodeStatsResponse(createNode("node-A"), 7);
var nodeB = new NodeStatsResponse(createNode("node-B"), 0);
var nodeC = new FailedNodeException("node-C", "node C failed", null);

var nodesStatsResponse = new NodesStatsResponse(CLUSTER_NAME, List.of(nodeA, nodeB), List.of(nodeC));
assertThat(nodesStatsResponse.getNodes(), containsInAnyOrder(nodeA, nodeB));
assertThat(nodesStatsResponse.failures(), contains(nodeC));
assertThat(nodesStatsResponse.getTotalRegisteredTransformCount(), is(equalTo(7)));
}

private static DiscoveryNode createNode(String name) {
return DiscoveryNodeUtils.builder(UUIDs.randomBase64UUID(random())).name(name).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public class Constants {
"cluster:admin/features/reset",
"cluster:admin/tasks/cancel",
"cluster:admin/transform/delete",
"cluster:admin/transform/node_stats",
"cluster:admin/transform/preview",
"cluster:admin/transform/put",
"cluster:admin/transform/reset",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
setup:
- do:
indices.create:
index: airline-data
body:
mappings:
properties:
time:
type: date
airline:
type: keyword
responsetime:
type: float
event_rate:
type: integer
- do:
transform.put_transform:
transform_id: "airline-transform-stats"
body: >
{
"source": { "index": "airline-data" },
"dest": { "index": "airline-data-by-airline-stats" },
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
},
"sync": { "time": { "field": "time", "delay": "1m" } }
}
- do:
transform.put_transform:
transform_id: "airline-transform-stats-dos"
body: >
{
"source": { "index": "airline-data" },
"dest": { "index": "airline-data-by-airline-stats-dos" },
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
},
"sync": { "time": { "field": "time", "delay": "1m" } }
}
- do:
transform.put_transform:
transform_id: "airline-transform-stats-the-third"
body: >
{
"source": { "index": "airline-data" },
"dest": { "index": "airline-data-by-airline-stats-the-third" },
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
},
"sync": { "time": { "field": "time", "delay": "1m" } }
}
---
teardown:
- do:
transform.stop_transform:
wait_for_checkpoint: false
transform_id: "airline-transform-stats"
wait_for_completion: true
- do:
transform.delete_transform:
transform_id: "airline-transform-stats"
- do:
transform.stop_transform:
wait_for_checkpoint: false
transform_id: "airline-transform-stats-dos"
wait_for_completion: true
- do:
transform.delete_transform:
transform_id: "airline-transform-stats-dos"
- do:
transform.stop_transform:
wait_for_checkpoint: false
transform_id: "airline-transform-stats-the-third"
wait_for_completion: true
- do:
transform.delete_transform:
transform_id: "airline-transform-stats-the-third"

---
"Test get node stats":
- do:
transform.get_node_stats: {}
- match: { total.registered_transform_count: 0 }

- do:
transform.start_transform:
transform_id: "airline-transform-stats"

- do:
transform.get_node_stats: {}
- match: { total.registered_transform_count: 1 }

- do:
transform.start_transform:
transform_id: "airline-transform-stats-dos"

- do:
transform.get_node_stats: {}
- match: { total.registered_transform_count: 2 }

- do:
transform.start_transform:
transform_id: "airline-transform-stats-the-third"

- do:
transform.get_node_stats: {}
- match: { total.registered_transform_count: 3 }
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ protected List<String> getTransformTasksFromClusterState(String transformId) thr
return tasks.stream().map(t -> (String) t.get("id")).filter(transformId::equals).toList();
}

protected int getTotalRegisteredTransformCount() throws IOException {
Response response = adminClient().performRequest(new Request("GET", "/_transform/_node_stats"));
return (int) XContentMapValues.extractValue(entityAsMap(response), "total", "registered_transform_count");
}

@SuppressWarnings("unchecked")
protected void logAudits() throws Exception {
logger.info("writing audit messages to the log");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,16 +245,19 @@ public void testTransformLifecycleInALoop() throws Exception {
putTransform(transformId, config, RequestOptions.DEFAULT);
assertThat(getTransformTasks(), is(empty()));
assertThat(getTransformTasksFromClusterState(transformId), is(empty()));
assertThat(getTotalRegisteredTransformCount(), is(equalTo(0)));

startTransform(transformId, RequestOptions.DEFAULT);
// There is 1 transform task after start.
assertThat(getTransformTasks(), hasSize(1));
assertThat(getTransformTasksFromClusterState(transformId), hasSize(1));
assertThat(getTotalRegisteredTransformCount(), is(equalTo(1)));

Thread.sleep(sleepAfterStartMillis);
// There should still be 1 transform task as the transform is continuous.
assertThat(getTransformTasks(), hasSize(1));
assertThat(getTransformTasksFromClusterState(transformId), hasSize(1));
assertThat(getTotalRegisteredTransformCount(), is(equalTo(1)));

// Stop the transform with force set randomly.
stopTransform(transformId, true, null, false, force);
Expand All @@ -268,6 +271,7 @@ public void testTransformLifecycleInALoop() throws Exception {
}
// After the transform is stopped, there should be no transform task left in the cluster state.
assertThat(getTransformTasksFromClusterState(transformId), is(empty()));
assertThat(getTotalRegisteredTransformCount(), is(equalTo(0)));

// Delete the transform
deleteTransform(transformId);
Expand Down
Loading

0 comments on commit 00d7de2

Please sign in to comment.