Skip to content

Commit

Permalink
Fix AutoShardingCondition parsing
Browse files · Browse the repository at this point in the history
andreidan committed Feb 18, 2024
1 parent 12a33bb commit 588275f
Showing 4 changed files with 97 additions and 55 deletions.
Original file line number Diff line number Diff line change
@@ -8,7 +8,9 @@

package org.elasticsearch.action.admin.indices.rollover;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService;
import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService.AutoShardingResult;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.common.io.stream.StreamInput;
@@ -22,25 +24,50 @@
import java.io.IOException;

import static org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService.AutoShardingType.INCREASE_NUMBER_OF_SHARDS;
import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg;

/**
* Condition for automatically increasing the number of shards for a data stream. The value is computed when the condition is
* evaluated.
*/
public class AutoShardingCondition extends Condition<AutoShardingResult> {
public static final String NAME = "auto_sharding";
private final boolean isConditionMet;

public static final ParseField AUTO_SHARDING_TYPE = new ParseField("type");
public static final ParseField CURRENT_NUMBER_OF_SHARDS = new ParseField("current_number_of_shards");
public static final ParseField TARGET_NUMBER_OF_SHARDS = new ParseField("target_number_of_shards");
public static final ParseField COOLDOWN_REMAINING = new ParseField("cool_down_remaining");
public static final ParseField WRITE_LOAD = new ParseField("write_load");

/**
 * Parses the flat {@code auto_sharding} condition representation, e.g.
 * {@code {"type": "INCREASE_NUMBER_OF_SHARDS", "current_number_of_shards": 1,
 * "target_number_of_shards": 3, "cool_down_remaining": "0s", "write_load": 2.0}},
 * reconstructing the {@link AutoShardingResult} this condition wraps.
 */
public static final ConstructingObjectParser<AutoShardingCondition, Void> PARSER = new ConstructingObjectParser<>(
    "auto_sharding_condition",
    false,
    (args, unused) -> new AutoShardingCondition(
        new AutoShardingResult(
            // args arrive in declaration order: type, current shards, target shards, cooldown, write load
            DataStreamAutoShardingService.AutoShardingType.valueOf((String) args[0]),
            (Integer) args[1],
            (Integer) args[2],
            (TimeValue) args[3],
            (Double) args[4]
        )
    )
);

static {
    PARSER.declareString(constructorArg(), AUTO_SHARDING_TYPE);
    PARSER.declareInt(constructorArg(), CURRENT_NUMBER_OF_SHARDS);
    PARSER.declareInt(constructorArg(), TARGET_NUMBER_OF_SHARDS);
    // cool_down_remaining is serialized as a human readable time value (e.g. "0s"),
    // so it must be parsed back through TimeValue.parseTimeValue
    PARSER.declareString(
        constructorArg(),
        value -> TimeValue.parseTimeValue(value, COOLDOWN_REMAINING.getPreferredName()),
        COOLDOWN_REMAINING
    );
    // write_load may legitimately be absent (null) when it is not yet known
    PARSER.declareDouble(optionalConstructorArg(), WRITE_LOAD);
}

private final boolean isConditionMet;

public AutoShardingCondition(AutoShardingResult autoShardingResult) {
super(NAME, Type.AUTOMATIC);
this.value = autoShardingResult;
@@ -71,7 +98,13 @@ public void writeTo(StreamOutput out) throws IOException {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
    // we only save this representation in the cluster state as part of met_conditions when this condition is met
    builder.startObject(NAME);
    builder.field(AUTO_SHARDING_TYPE.getPreferredName(), value.type());
    builder.field(CURRENT_NUMBER_OF_SHARDS.getPreferredName(), value.currentNumberOfShards());
    builder.field(TARGET_NUMBER_OF_SHARDS.getPreferredName(), value.targetNumberOfShards());
    // human readable form (e.g. "0s") so PARSER can round-trip it via TimeValue.parseTimeValue
    builder.field(COOLDOWN_REMAINING.getPreferredName(), value.coolDownRemaining().toHumanReadableString(2));
    builder.field(WRITE_LOAD.getPreferredName(), value.writeLoad());
    builder.endObject();
    return builder;
}

Original file line number Diff line number Diff line change
@@ -30,6 +30,7 @@
import org.elasticsearch.index.Index;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContentFragment;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;
@@ -135,37 +136,7 @@ public record AutoShardingResult(
int targetNumberOfShards,
TimeValue coolDownRemaining,
@Nullable Double writeLoad
) implements Writeable, ToXContentObject {

public static final ParseField AUTO_SHARDING_TYPE = new ParseField("type");
public static final ParseField CURRENT_NUMBER_OF_SHARDS = new ParseField("current_number_of_shards");
public static final ParseField TARGET_NUMBER_OF_SHARDS = new ParseField("target_number_of_shards");
public static final ParseField COOLDOWN_REMAINING = new ParseField("cool_down_remaining");
public static final ParseField WRITE_LOAD = new ParseField("write_load");

public static final ConstructingObjectParser<AutoShardingResult, Void> PARSER = new ConstructingObjectParser<>(
"auto_sharding",
false,
(args, unused) -> new AutoShardingResult(
AutoShardingType.valueOf((String) args[0]),
(Integer) args[1],
(Integer) args[2],
(TimeValue) args[3],
(Double) args[4]
)
);

static {
PARSER.declareString(constructorArg(), AUTO_SHARDING_TYPE);
PARSER.declareInt(constructorArg(), CURRENT_NUMBER_OF_SHARDS);
PARSER.declareInt(constructorArg(), TARGET_NUMBER_OF_SHARDS);
PARSER.declareString(
constructorArg(),
value -> TimeValue.parseTimeValue(value, COOLDOWN_REMAINING.getPreferredName()),
COOLDOWN_REMAINING
);
PARSER.declareDouble(optionalConstructorArg(), WRITE_LOAD);
}
) implements Writeable {

public AutoShardingResult(
AutoShardingType type,
@@ -185,22 +156,6 @@ public AutoShardingResult(StreamInput in) throws IOException {
this(in.readEnum(AutoShardingType.class), in.readVInt(), in.readVInt(), in.readTimeValue(), in.readOptionalDouble());
}

@Override
// Serializes this result as an anonymous object with the flat field layout
// declared by the ParseFields above (type, shard counts, cooldown, write load).
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(AUTO_SHARDING_TYPE.getPreferredName(), type);
builder.field(CURRENT_NUMBER_OF_SHARDS.getPreferredName(), currentNumberOfShards);
builder.field(TARGET_NUMBER_OF_SHARDS.getPreferredName(), targetNumberOfShards);
// cooldown is emitted human readable (2 fraction digits), matching what the parser expects
builder.field(COOLDOWN_REMAINING.getPreferredName(), coolDownRemaining.toHumanReadableString(2));
// NOTE(review): writeLoad is @Nullable — presumably the builder tolerates a null field value; confirm
builder.field(WRITE_LOAD.getPreferredName(), writeLoad);
builder.endObject();
return builder;
}

// Deserializes an AutoShardingResult previously written by toXContent, delegating to PARSER.
public static AutoShardingResult fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}

@Override
public String toString() {
return "{ type: "
Original file line number Diff line number Diff line change
@@ -8,12 +8,19 @@

package org.elasticsearch.action.admin.indices.rollover;

import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService.AutoShardingResult;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.EqualsHashCodeTestUtils;
import org.elasticsearch.xcontent.json.JsonXContent;

import java.io.IOException;
import java.util.List;

import static org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService.AutoShardingType.INCREASE_NUMBER_OF_SHARDS;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

public class ConditionTests extends ESTestCase {

@@ -349,6 +356,53 @@ public void testEqualsAndHashCode() {
);
}

// NOTE(review): method name has a typo ("Condtion" -> "Condition") — consider renaming.
// Verifies that AutoShardingCondition round-trips through its flat XContent representation,
// both stand-alone and nested under met_conditions inside a RolloverInfo.
public void testAutoShardCondtionXContent() throws IOException {
AutoShardingCondition autoShardingCondition = new AutoShardingCondition(
new AutoShardingResult(INCREASE_NUMBER_OF_SHARDS, 1, 3, TimeValue.ZERO, 2.0)
);
{
// stand-alone parsing of the flat condition representation

AutoShardingCondition parsedCondition = AutoShardingCondition.fromXContent(createParser(JsonXContent.jsonXContent, """
{
"type": "INCREASE_NUMBER_OF_SHARDS",
"cool_down_remaining": "0s",
"current_number_of_shards": 1,
"target_number_of_shards": 3,
"write_load": 2.0
}
"""));
assertThat(parsedCondition.value, is(autoShardingCondition.value));
}

{
// let's test the met_conditions parsing that is part of the rollover_info
long time = System.currentTimeMillis();
RolloverInfo info = new RolloverInfo("logs-nginx", List.of(autoShardingCondition), time);

RolloverInfo parsedInfo = RolloverInfo.parse(
createParser(
JsonXContent.jsonXContent,
"{\n"
+ " \"met_conditions\": {\n"
+ " \"auto_sharding\": {\n"
+ " \"type\": \"INCREASE_NUMBER_OF_SHARDS\",\n"
+ " \"cool_down_remaining\": \"0s\",\n"
+ " \"current_number_of_shards\": 1,\n"
+ " \"target_number_of_shards\": 3,\n"
+ " \"write_load\": 2.0\n"
+ " }\n"
+ " },\n"
+ " \"time\": "
+ time
+ "\n"
+ " }"
),
"logs-nginx"
);
// full-object equality also covers the parsed condition embedded in met_conditions
assertThat(parsedInfo, is(info));
}
}

/** Produces an arbitrary non-negative byte-size value for randomized assertions. */
private static ByteSizeValue randomByteSize() {
    final long bytes = randomNonNegativeLong();
    return ByteSizeValue.ofBytes(bytes);
}
Original file line number Diff line number Diff line change
@@ -164,7 +164,7 @@ protected DataStream mutateInstance(DataStream instance) {
public void testRollover() {
DataStream ds = DataStreamTestHelper.randomInstance().promoteDataStream();
Tuple<String, Long> newCoordinates = ds.nextWriteIndexAndGeneration(Metadata.EMPTY_METADATA);
final DataStream rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), false);
final DataStream rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), false, null);
assertThat(rolledDs.getName(), equalTo(ds.getName()));
assertThat(rolledDs.getGeneration(), equalTo(ds.getGeneration() + 1));
assertThat(rolledDs.getIndices().size(), equalTo(ds.getIndices().size() + 1));
@@ -189,7 +189,7 @@ public void testRolloverWithConflictingBackingIndexName() {
}

final Tuple<String, Long> newCoordinates = ds.nextWriteIndexAndGeneration(builder.build());
final DataStream rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), false);
final DataStream rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), false, null);
assertThat(rolledDs.getName(), equalTo(ds.getName()));
assertThat(rolledDs.getGeneration(), equalTo(ds.getGeneration() + numConflictingIndices + 1));
assertThat(rolledDs.getIndices().size(), equalTo(ds.getIndices().size() + 1));
@@ -219,7 +219,7 @@ public void testRolloverUpgradeToTsdbDataStream() {
);
var newCoordinates = ds.nextWriteIndexAndGeneration(Metadata.EMPTY_METADATA);

var rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), true);
var rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), true, null);
assertThat(rolledDs.getName(), equalTo(ds.getName()));
assertThat(rolledDs.getGeneration(), equalTo(ds.getGeneration() + 1));
assertThat(rolledDs.getIndices().size(), equalTo(ds.getIndices().size() + 1));
@@ -247,7 +247,7 @@ public void testRolloverDowngradeToRegularDataStream() {
);
var newCoordinates = ds.nextWriteIndexAndGeneration(Metadata.EMPTY_METADATA);

var rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), false);
var rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), false, null);
assertThat(rolledDs.getName(), equalTo(ds.getName()));
assertThat(rolledDs.getGeneration(), equalTo(ds.getGeneration() + 1));
assertThat(rolledDs.getIndices().size(), equalTo(ds.getIndices().size() + 1));

0 comments on commit 588275f

Please sign in to comment.