Skip to content

Commit

Permalink
temp
Browse files Browse the repository at this point in the history
  • Loading branch information
andreidan committed Feb 7, 2024
1 parent e7199f6 commit 8b040c8
Show file tree
Hide file tree
Showing 7 changed files with 173 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.elasticsearch.datastreams;

import org.elasticsearch.action.admin.indices.rollover.LazyRolloverAction;
import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService;
import org.elasticsearch.datastreams.lifecycle.health.DataStreamLifecycleHealthInfoPublisher;
import org.elasticsearch.features.FeatureSpecification;
import org.elasticsearch.features.NodeFeature;
Expand All @@ -24,7 +25,8 @@ public class DataStreamFeatures implements FeatureSpecification {
public Set<NodeFeature> getFeatures() {
return Set.of(
DataStreamLifecycleHealthInfoPublisher.DSL_HEALTH_INFO_FEATURE, // Added in 8.12
LazyRolloverAction.DATA_STREAM_LAZY_ROLLOVER // Added in 8.13
LazyRolloverAction.DATA_STREAM_LAZY_ROLLOVER, // Added in 8.13
DataStreamAutoShardingService.DATA_STREAM_AUTO_SHARDING_FEATURE
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,53 +14,38 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;

import static org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService.AutoShardingResult.CURRENT_NUMBER_OF_SHARDS;
import static org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService.AutoShardingResult.TARGET_NUMBER_OF_SHARDS;
import static org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService.AutoShardingType.SCALE_UP;
import static org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService.AutoShardingResult.WRITE_LOAD;
import static org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService.AutoShardingType.INCREASE_NUMBER_OF_SHARDS;

/**
* Condition for automatically increasing the number of shards for a data stream. The value is computed when the condition is
* evaluated.
*/
public class AutoShardingCondition extends Condition<AutoShardingResult> {
public static final String NAME = "auto_sharding";

private static final ParseField WRITE_LOAD = new ParseField("write_load");

private Double writeIndexLoad;
private boolean isConditionMet;

public AutoShardingCondition(AutoShardingResult autoShardingResult) {
super(NAME, Type.INTERNAL);
super(NAME, Type.AUTOMATIC);
this.value = autoShardingResult;
this.writeIndexLoad = null;
}

public AutoShardingCondition(AutoShardingResult autoShardingResult, Double writeIndexLoad) {
super(NAME, Type.INTERNAL);
this.value = autoShardingResult;
this.writeIndexLoad = writeIndexLoad;
this.isConditionMet = value.type() == INCREASE_NUMBER_OF_SHARDS && value.coolDownRemaining().equals(TimeValue.ZERO);
}

public AutoShardingCondition(StreamInput in) throws IOException {
super(NAME, Type.INTERNAL);
super(NAME, Type.AUTOMATIC);
this.value = new AutoShardingResult(in);
this.writeIndexLoad = in.readOptionalDouble();
}

@Override
public Result evaluate(final Stats stats) {
writeIndexLoad = stats.writeIndexLoad();
if (value.type() == SCALE_UP && value.coolDownRemaining().equals(TimeValue.ZERO)) {
return new Result(this, true);
} else {
return new Result(this, false);
}
return new Result(this, isConditionMet);
}

@Override
Expand All @@ -71,17 +56,18 @@ public String getWriteableName() {
@Override
public void writeTo(StreamOutput out) throws IOException {
value.writeTo(out);
out.writeOptionalDouble(writeIndexLoad);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
// we only save this representation in the cluster state as part of meet_conditions when this condition is met
if (value != null && value.type().equals(SCALE_UP)) {
if (isConditionMet) {
builder.startObject(NAME);
builder.field(CURRENT_NUMBER_OF_SHARDS.getPreferredName(), value.currentNumberOfShards());
builder.field(TARGET_NUMBER_OF_SHARDS.getPreferredName(), value.targetNumberOfShards());
builder.field(WRITE_LOAD.getPreferredName(), writeIndexLoad);
assert value.writeLoad() != null
: "when the condition matches, a change in number of shards is executed and a write load must be present";
builder.field(WRITE_LOAD.getPreferredName(), value.writeLoad());
builder.endObject();
}
return builder;
Expand All @@ -90,8 +76,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
public static AutoShardingCondition fromXContent(XContentParser parser) throws IOException {
if (parser.nextToken() == XContentParser.Token.START_OBJECT) {
return new AutoShardingCondition(
new AutoShardingResult(SCALE_UP, parser.intValue(), parser.intValue(), TimeValue.ZERO),
parser.doubleValue()
new AutoShardingResult(INCREASE_NUMBER_OF_SHARDS, parser.intValue(), parser.intValue(), TimeValue.ZERO, parser.doubleValue())
);
} else {
throw new IllegalArgumentException("invalid token when parsing " + NAME + " condition: " + parser.currentToken());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.TransportVersion;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.xcontent.ToXContentFragment;

import java.util.Objects;
Expand All @@ -20,13 +21,14 @@
*/
public abstract class Condition<T> implements NamedWriteable, ToXContentFragment {

/**
* Describes the type of condition - a min_* condition (MIN), max_* condition (MAX), or an internal (usually) condition
/*
* Describes the type of condition - a min_* condition (MIN), max_* condition (MAX), or an automatic condition (automatic conditions
* are something that the platform configures and manages)
*/
public enum Type {
MIN,
MAX,
INTERNAL
AUTOMATIC
}

protected T value;
Expand Down Expand Up @@ -91,7 +93,7 @@ public record Stats(
ByteSizeValue indexSize,
ByteSizeValue maxPrimaryShardSize,
long maxPrimaryShardDocs,
double writeIndexLoad
@Nullable Double writeIndexLoad
) {}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
Expand Down Expand Up @@ -59,6 +60,7 @@ public TransportLazyRolloverAction(
MetadataRolloverService rolloverService,
AllocationService allocationService,
MetadataDataStreamsService metadataDataStreamsService,
DataStreamAutoShardingService dataStreamAutoShardingService,
Client client
) {
super(
Expand All @@ -71,7 +73,8 @@ public TransportLazyRolloverAction(
rolloverService,
client,
allocationService,
metadataDataStreamsService
metadataDataStreamsService,
dataStreamAutoShardingService
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,12 +244,12 @@ public boolean areConditionsMet(Map<String, Boolean> conditionResults) {
.filter(c -> Condition.Type.MAX == c.type())
.anyMatch(c -> conditionResults.getOrDefault(c.toString(), false));

boolean anyImplicitConditionsMet = conditions.values()
boolean anyInternalConditionsMet = conditions.values()
.stream()
.filter(c -> Condition.Type.INTERNAL == c.type())
.filter(c -> Condition.Type.AUTOMATIC == c.type())
.anyMatch(c -> conditionResults.getOrDefault(c.toString(), false));

return conditionResults.size() == 0 || (allMinConditionsMet && anyMaxConditionsMet) || anyImplicitConditionsMet;
return conditionResults.size() == 0 || (allMinConditionsMet && anyMaxConditionsMet) || anyInternalConditionsMet;
}

public static RolloverConditions fromXContent(XContentParser parser) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
Expand All @@ -29,6 +30,7 @@
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexMetadataStats;
Expand All @@ -43,6 +45,7 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.Nullable;
Expand All @@ -64,6 +67,9 @@
import java.util.Optional;
import java.util.stream.Collectors;

import static org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService.AutoShardingType.NOT_APPLICABLE;
import static org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService.AutoShardingType.INCREASE_NUMBER_OF_SHARDS;

/**
* Main class to swap the index pointed to by an alias, given some conditions
*/
Expand All @@ -74,6 +80,7 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
private final Client client;
private final MasterServiceTaskQueue<RolloverTask> rolloverTaskQueue;
private final MetadataDataStreamsService metadataDataStreamsService;
private final DataStreamAutoShardingService dataStreamAutoShardingService;

@Inject
public TransportRolloverAction(
Expand All @@ -85,7 +92,8 @@ public TransportRolloverAction(
MetadataRolloverService rolloverService,
Client client,
AllocationService allocationService,
MetadataDataStreamsService metadataDataStreamsService
MetadataDataStreamsService metadataDataStreamsService,
DataStreamAutoShardingService dataStreamAutoShardingService
) {
this(
RolloverAction.INSTANCE,
Expand All @@ -97,7 +105,8 @@ public TransportRolloverAction(
rolloverService,
client,
allocationService,
metadataDataStreamsService
metadataDataStreamsService,
dataStreamAutoShardingService
);
}

Expand All @@ -111,7 +120,8 @@ public TransportRolloverAction(
MetadataRolloverService rolloverService,
Client client,
AllocationService allocationService,
MetadataDataStreamsService metadataDataStreamsService
MetadataDataStreamsService metadataDataStreamsService,
DataStreamAutoShardingService dataStreamAutoShardingService
) {
super(
actionType.name(),
Expand All @@ -131,6 +141,7 @@ public TransportRolloverAction(
new RolloverExecutor(clusterService, allocationService, rolloverService, threadPool)
);
this.metadataDataStreamsService = metadataDataStreamsService;
this.dataStreamAutoShardingService = dataStreamAutoShardingService;
}

@Override
Expand Down Expand Up @@ -171,15 +182,6 @@ protected void masterOperation(
MetadataRolloverService.validateIndexName(clusterState, trialRolloverIndexName);

boolean isDataStream = metadata.dataStreams().containsKey(rolloverRequest.getRolloverTarget());
final IndexAbstraction indexAbstraction = clusterState.metadata().getIndicesLookup().get(rolloverRequest.getRolloverTarget());
if (indexAbstraction.getType().equals(IndexAbstraction.Type.DATA_STREAM)) {
RolloverConditions conditionsIncludingImplicit = RolloverConditions.newBuilder(rolloverRequest.getConditions())
.addAutoShardingCondition(
new AutoShardingResult(DataStreamAutoShardingService.AutoShardingType.SCALE_UP, 1, 3, TimeValue.ZERO)
)
.build();
rolloverRequest.setConditions(conditionsIncludingImplicit);
}
if (rolloverRequest.isLazy()) {
if (isDataStream == false || rolloverRequest.getConditions().hasConditions()) {
String message;
Expand Down Expand Up @@ -234,11 +236,44 @@ protected void masterOperation(

listener.delegateFailureAndWrap((delegate, statsResponse) -> {

final IndexAbstraction indexAbstraction = clusterState.metadata()
.getIndicesLookup()
.get(rolloverRequest.getRolloverTarget());
Condition.Stats stats = buildStats(metadata.index(trialSourceIndexName), statsResponse);
if (indexAbstraction.getType().equals(IndexAbstraction.Type.DATA_STREAM)) {
DataStream dataStream = (DataStream) indexAbstraction;
AutoShardingResult autoShardingResult = dataStreamAutoShardingService.calculate(
clusterState,
dataStream,
stats.writeIndexLoad()
);
if (autoShardingResult.type().equals(NOT_APPLICABLE) == false) {
logger.debug("data stream auto sharding result is [{}]", autoShardingResult);
if (autoShardingResult.type().equals(INCREASE_NUMBER_OF_SHARDS)) {
if (autoShardingResult.coolDownRemaining().equals(TimeValue.ZERO)) {
logger.info(
"Data stream auto sharding changing the number of shards for data stream from [{}] to [{}]",
autoShardingResult.currentNumberOfShards(),
autoShardingResult.targetNumberOfShards()
);
CreateIndexRequest createIndexRequest = rolloverRequest.getCreateIndexRequest();
Settings settingsWithAutoSharding = Settings.builder()
.put(createIndexRequest.settings())
.put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), autoShardingResult.targetNumberOfShards())
.build();
createIndexRequest.settings(settingsWithAutoSharding);
}
}

RolloverConditions conditionsIncludingImplicit = RolloverConditions.newBuilder(rolloverRequest.getConditions())
.addAutoShardingCondition(autoShardingResult)
.build();
rolloverRequest.setConditions(conditionsIncludingImplicit);
}
}

// Evaluate the conditions, so that we can tell without a cluster state update whether a rollover would occur.
final Map<String, Boolean> trialConditionResults = evaluateConditions(
rolloverRequest.getConditionValues(),
buildStats(metadata.index(trialSourceIndexName), statsResponse)
);
final Map<String, Boolean> trialConditionResults = evaluateConditions(rolloverRequest.getConditionValues(), stats);

final RolloverResponse trialRolloverResponse = new RolloverResponse(
trialSourceIndexName,
Expand Down Expand Up @@ -316,7 +351,7 @@ static Condition.Stats buildStats(@Nullable final IndexMetadata metadata, @Nulla
.max()
.orElse(0);

double writeLoad = 0.0;
Double writeLoad = null;
if (statsResponse != null) {
IndexingStats indexing = statsResponse.getTotal().getIndexing();
if (indexing != null) {
Expand Down Expand Up @@ -394,16 +429,6 @@ public ClusterState executeTask(
final var rolloverTask = rolloverTaskContext.getTask();
final var rolloverRequest = rolloverTask.rolloverRequest();

final IndexAbstraction indexAbstraction = currentState.metadata().getIndicesLookup().get(rolloverRequest.getRolloverTarget());
if (indexAbstraction.getType().equals(IndexAbstraction.Type.DATA_STREAM)) {
RolloverConditions conditionsIncludingImplicit = RolloverConditions.newBuilder(rolloverRequest.getConditions())
.addAutoShardingCondition(
new AutoShardingResult(DataStreamAutoShardingService.AutoShardingType.SCALE_UP, 1, 3, TimeValue.ZERO)
)
.build();
rolloverRequest.setConditions(conditionsIncludingImplicit);
}

// Regenerate the rollover names, as a rollover could have happened in between the pre-check and the cluster state update
final var rolloverNames = MetadataRolloverService.resolveRolloverNames(
currentState,
Expand Down Expand Up @@ -433,6 +458,14 @@ public ClusterState executeTask(
? IndexMetadataStats.fromStatsResponse(rolloverSourceIndex, rolloverTask.statsResponse())
: null;

final IndexAbstraction indexAbstraction = currentState.metadata()
.getIndicesLookup()
.get(rolloverRequest.getRolloverTarget());

if (indexAbstraction.getType().equals(IndexAbstraction.Type.DATA_STREAM)) {
// TODO: we scale down the number of shards only when rolling over due to other conditions
}

// Perform the actual rollover
final var rolloverResult = rolloverService.rolloverClusterState(
currentState,
Expand Down
Loading

0 comments on commit 8b040c8

Please sign in to comment.