Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML-DataFrame] add field reason to DataFrameTransformState and add hlrc protocol tests #40736

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,16 @@ public class DataFrameTransformState {
private static final ParseField TASK_STATE = new ParseField("task_state");
private static final ParseField CURRENT_POSITION = new ParseField("current_position");
private static final ParseField GENERATION = new ParseField("generation");
private static final ParseField REASON = new ParseField("reason");

@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<DataFrameTransformState, Void> PARSER =
new ConstructingObjectParser<>("data_frame_transform_state",
new ConstructingObjectParser<>("data_frame_transform_state", true,
args -> new DataFrameTransformState((DataFrameTransformTaskState) args[0],
(IndexerState) args[1],
(HashMap<String, Object>) args[2],
(long) args[3]));
(long) args[3],
(String) args[4]));

static {
PARSER.declareField(constructorArg(),
Expand All @@ -68,6 +70,7 @@ public class DataFrameTransformState {
throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
}, CURRENT_POSITION, ObjectParser.ValueType.VALUE_OBJECT_ARRAY);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), GENERATION);
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), REASON);
}

public static DataFrameTransformState fromXContent(XContentParser parser) throws IOException {
Expand All @@ -78,15 +81,18 @@ public static DataFrameTransformState fromXContent(XContentParser parser) throws
private final IndexerState indexerState;
private final long generation;
private final SortedMap<String, Object> currentPosition;
private final String reason;

public DataFrameTransformState(DataFrameTransformTaskState taskState,
IndexerState indexerState,
@Nullable Map<String, Object> position,
long generation) {
long generation,
@Nullable String reason) {
this.taskState = taskState;
this.indexerState = indexerState;
this.currentPosition = position == null ? null : Collections.unmodifiableSortedMap(new TreeMap<>(position));
this.generation = generation;
this.reason = reason;
}

public IndexerState getIndexerState() {
Expand All @@ -106,6 +112,11 @@ public long getGeneration() {
return generation;
}

@Nullable
public String getReason() {
return reason;
}

@Override
public boolean equals(Object other) {
if (this == other) {
Expand All @@ -121,11 +132,13 @@ public boolean equals(Object other) {
return Objects.equals(this.taskState, that.taskState) &&
Objects.equals(this.indexerState, that.indexerState) &&
Objects.equals(this.currentPosition, that.currentPosition) &&
this.generation == that.generation;
this.generation == that.generation &&
Objects.equals(this.reason, that.reason);
}

@Override
public int hashCode() {
return Objects.hash(taskState, indexerState, currentPosition, generation);
return Objects.hash(taskState, indexerState, currentPosition, generation, reason);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ public void testGetStats() throws Exception {
DataFrameTransformStateAndStats stateAndStats = response.getTransformsStateAndStats().get(0);
assertEquals(IndexerState.STARTED, stateAndStats.getTransformState().getIndexerState());
assertEquals(DataFrameTransformTaskState.STARTED, stateAndStats.getTransformState().getTaskState());
assertEquals(null, stateAndStats.getTransformState().getReason());
assertNotEquals(zeroIndexerStats, stateAndStats.getTransformStats());
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,17 @@ public void testFromXContent() throws IOException {
DataFrameTransformStateTests::randomDataFrameTransformState,
DataFrameTransformStateTests::toXContent,
DataFrameTransformState::fromXContent)
.supportsUnknownFields(false)
.supportsUnknownFields(true)
.randomFieldsExcludeFilter(field -> field.equals("current_position"))
.test();
}

public static DataFrameTransformState randomDataFrameTransformState() {
return new DataFrameTransformState(randomFrom(DataFrameTransformTaskState.values()),
randomFrom(IndexerState.values()),
randomPositionMap(),
randomLongBetween(0,10));
randomLongBetween(0,10),
randomBoolean() ? null : randomAlphaOfLength(10));
}

public static void toXContent(DataFrameTransformState state, XContentBuilder builder) throws IOException {
Expand All @@ -55,6 +57,9 @@ public static void toXContent(DataFrameTransformState state, XContentBuilder bui
builder.field("current_position", state.getPosition());
}
builder.field("generation", state.getGeneration());
if (state.getReason() != null) {
builder.field("reason", state.getReason());
}
builder.endObject();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.core.dataframe.transforms;

import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.protocol.AbstractHlrcXContentTestCase;
import org.elasticsearch.xpack.core.indexing.IndexerState;

import java.io.IOException;
import java.util.function.Predicate;

public class DataFrameTransformStateHlrcTests extends AbstractHlrcXContentTestCase<DataFrameTransformState,
org.elasticsearch.client.dataframe.transforms.DataFrameTransformState> {

@Override
public org.elasticsearch.client.dataframe.transforms.DataFrameTransformState doHlrcParseInstance(XContentParser parser)
throws IOException {
return org.elasticsearch.client.dataframe.transforms.DataFrameTransformState.fromXContent(parser);
}

@Override
public DataFrameTransformState convertHlrcToInternal(org.elasticsearch.client.dataframe.transforms.DataFrameTransformState instance) {
return new DataFrameTransformState(DataFrameTransformTaskState.fromString(instance.getTaskState().value()),
IndexerState.fromString(instance.getIndexerState().value()),
instance.getPosition(), instance.getGeneration(), instance.getReason());
}

@Override
protected DataFrameTransformState createTestInstance() {
return DataFrameTransformStateTests.randomDataFrameTransformState();
}

@Override
protected DataFrameTransformState doParseInstance(XContentParser parser) throws IOException {
return DataFrameTransformState.fromXContent(parser);
}

@Override
protected boolean supportsUnknownFields() {
return true;
}

@Override
protected Predicate<String> getRandomFieldsExcludeFilter() {
return field -> field.equals("current_position");
}
}