Skip to content

Commit

Permalink
Autoshard data streams on rollover
Browse files Browse the repository at this point in the history
  • Loading branch information
andreidan committed Feb 14, 2024
1 parent c6381af commit f9fc212
Show file tree
Hide file tree
Showing 15 changed files with 320 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.elasticsearch.datastreams;

import org.elasticsearch.action.admin.indices.rollover.LazyRolloverAction;
import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService;
import org.elasticsearch.datastreams.lifecycle.health.DataStreamLifecycleHealthInfoPublisher;
import org.elasticsearch.features.FeatureSpecification;
import org.elasticsearch.features.NodeFeature;
Expand All @@ -24,7 +25,8 @@ public class DataStreamFeatures implements FeatureSpecification {
public Set<NodeFeature> getFeatures() {
return Set.of(
DataStreamLifecycleHealthInfoPublisher.DSL_HEALTH_INFO_FEATURE, // Added in 8.12
LazyRolloverAction.DATA_STREAM_LAZY_ROLLOVER // Added in 8.13
LazyRolloverAction.DATA_STREAM_LAZY_ROLLOVER, // Added in 8.13
DataStreamAutoShardingService.DATA_STREAM_AUTO_SHARDING_FEATURE
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.action.datastreams.MigrateToDataStreamAction;
import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
import org.elasticsearch.action.datastreams.PromoteDataStreamAction;
import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService;
import org.elasticsearch.client.internal.OriginSettingClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
Expand Down Expand Up @@ -132,6 +133,7 @@ public static TimeValue getLookAheadTime(Settings settings) {
private final SetOnce<DataStreamLifecycleService> dataLifecycleInitialisationService = new SetOnce<>();
private final SetOnce<DataStreamLifecycleHealthInfoPublisher> dataStreamLifecycleErrorsPublisher = new SetOnce<>();
private final SetOnce<DataStreamLifecycleHealthIndicatorService> dataStreamLifecycleHealthIndicatorService = new SetOnce<>();
private final SetOnce<DataStreamAutoShardingService> dataStreamAutoShardingServiceSetOnce = new SetOnce<>();
private final Settings settings;

public DataStreamsPlugin(Settings settings) {
Expand Down Expand Up @@ -166,6 +168,11 @@ public List<Setting<?>> getSettings() {
pluginSettings.add(DataStreamLifecycleService.DATA_STREAM_MERGE_POLICY_TARGET_FLOOR_SEGMENT_SETTING);
pluginSettings.add(DataStreamLifecycleService.DATA_STREAM_MERGE_POLICY_TARGET_FACTOR_SETTING);
pluginSettings.add(DataStreamLifecycleService.DATA_STREAM_SIGNALLING_ERROR_RETRY_INTERVAL_SETTING);
pluginSettings.add(DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_EXCLUDES_SETTING);
pluginSettings.add(DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_INCREASE_SHARDS_COOLDOWN);
pluginSettings.add(DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_DECREASE_SHARDS_COOLDOWN);
pluginSettings.add(DataStreamAutoShardingService.CLUSTER_AUTO_SHARDING_MIN_NUMBER_WRITE_THREADS);
pluginSettings.add(DataStreamAutoShardingService.CLUSTER_AUTO_SHARDING_MAX_NUMBER_WRITE_THREADS);
return pluginSettings;
}

Expand Down Expand Up @@ -206,9 +213,19 @@ public Collection<?> createComponents(PluginServices services) {
dataLifecycleInitialisationService.get().init();
dataStreamLifecycleHealthIndicatorService.set(new DataStreamLifecycleHealthIndicatorService());

dataStreamAutoShardingServiceSetOnce.set(
new DataStreamAutoShardingService(
settings,
services.clusterService(),
services.featureService(),
services.threadPool()::absoluteTimeInMillis
)
);

components.add(errorStoreInitialisationService.get());
components.add(dataLifecycleInitialisationService.get());
components.add(dataStreamLifecycleErrorsPublisher.get());
components.add(dataStreamAutoShardingServiceSetOnce.get());
return components;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ private MetadataRolloverService.RolloverResult rolloverOver(ClusterState state,
MaxDocsCondition condition = new MaxDocsCondition(randomNonNegativeLong());
List<Condition<?>> metConditions = Collections.singletonList(condition);
CreateIndexRequest createIndexRequest = new CreateIndexRequest("_na_");
return rolloverService.rolloverClusterState(state, name, null, createIndexRequest, metConditions, time, false, false, null);
return rolloverService.rolloverClusterState(state, name, null, createIndexRequest, metConditions, time, false, false, null, null);
}

private Index getWriteIndex(ClusterState state, String name, String timestamp) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ public void testRolloverClusterStateForDataStream() throws Exception {
now,
randomBoolean(),
false,
indexStats
indexStats,
null
);
long after = testThreadPool.absoluteTimeInMillis();

Expand Down Expand Up @@ -218,6 +219,7 @@ public void testRolloverAndMigrateDataStream() throws Exception {
now,
randomBoolean(),
false,
null,
null
);

Expand Down Expand Up @@ -310,6 +312,7 @@ public void testChangingIndexModeFromTimeSeriesToSomethingElseNoEffectOnExisting
now,
randomBoolean(),
false,
null,
null
);

Expand Down Expand Up @@ -375,7 +378,8 @@ public void testRolloverClusterStateWithBrokenOlderTsdbDataStream() throws Excep
now,
randomBoolean(),
false,
indexStats
indexStats,
null
);
long after = testThreadPool.absoluteTimeInMillis();

Expand Down Expand Up @@ -455,7 +459,8 @@ public void testRolloverClusterStateWithBrokenTsdbDataStream() throws Exception
now,
randomBoolean(),
false,
indexStats
indexStats,
null
)
);
assertThat(e.getMessage(), containsString("is overlapping with backing index"));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.admin.indices.rollover;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService.AutoShardingResult;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;

import static org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService.AutoShardingResult.CURRENT_NUMBER_OF_SHARDS;
import static org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService.AutoShardingResult.TARGET_NUMBER_OF_SHARDS;
import static org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService.AutoShardingResult.WRITE_LOAD;
import static org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService.AutoShardingType.INCREASE_NUMBER_OF_SHARDS;

/**
* Condition for automatically increasing the number of shards for a data stream. The value is computed when the condition is
* evaluated.
*/
public class AutoShardingCondition extends Condition<AutoShardingResult> {
public static final String NAME = "auto_sharding";
private boolean isConditionMet;

public AutoShardingCondition(AutoShardingResult autoShardingResult) {
super(NAME, Type.AUTOMATIC);
this.value = autoShardingResult;
this.isConditionMet = (value.type() == INCREASE_NUMBER_OF_SHARDS && value.coolDownRemaining().equals(TimeValue.ZERO));
}

public AutoShardingCondition(StreamInput in) throws IOException {
super(NAME, Type.AUTOMATIC);
this.value = new AutoShardingResult(in);
}

@Override
public Result evaluate(final Stats stats) {
return new Result(this, isConditionMet);
}

@Override
public String getWriteableName() {
return NAME;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
value.writeTo(out);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
// we only save this representation in the cluster state as part of meet_conditions when this condition is met
if (isConditionMet) {
builder.startObject(NAME);
builder.field(CURRENT_NUMBER_OF_SHARDS.getPreferredName(), value.currentNumberOfShards());
builder.field(TARGET_NUMBER_OF_SHARDS.getPreferredName(), value.targetNumberOfShards());
assert value.writeLoad() != null
: "when the condition matches, a change in number of shards is executed and a write load must be present";
builder.field(WRITE_LOAD.getPreferredName(), value.writeLoad());
builder.endObject();
}
return builder;
}

public static AutoShardingCondition fromXContent(XContentParser parser) throws IOException {
if (parser.nextToken() == XContentParser.Token.START_OBJECT) {
return new AutoShardingCondition(
new AutoShardingResult(
INCREASE_NUMBER_OF_SHARDS,
parser.intValue(),
parser.intValue(),
TimeValue.ZERO,
parser.doubleValue()
)
);
} else {
throw new IllegalArgumentException("invalid token when parsing " + NAME + " condition: " + parser.currentToken());
}
}

@Override
boolean includedInVersion(TransportVersion version) {
return version.onOrAfter(DataStream.ADDED_AUTO_SHARDING_EVENT_VERSION);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
*/
public abstract class Condition<T> implements NamedWriteable, ToXContentFragment {

/**
* Describes the type of condition - a min_* condition (MIN) or max_* condition (MAX).
/*
* Describes the type of condition - a min_* condition (MIN), max_* condition (MAX), or an automatic condition (automatic conditions
* are something that the platform configures and manages)
*/
public enum Type {
MIN,
MAX
MAX,
AUTOMATIC
}

protected T value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
Expand Down Expand Up @@ -59,6 +60,7 @@ public TransportLazyRolloverAction(
MetadataRolloverService rolloverService,
AllocationService allocationService,
MetadataDataStreamsService metadataDataStreamsService,
DataStreamAutoShardingService dataStreamAutoShardingService,
Client client
) {
super(
Expand All @@ -71,7 +73,8 @@ public TransportLazyRolloverAction(
rolloverService,
client,
allocationService,
metadataDataStreamsService
metadataDataStreamsService,
dataStreamAutoShardingService
);
}

Expand Down Expand Up @@ -121,6 +124,7 @@ protected void masterOperation(
new RolloverRequest(rolloverRequest.getRolloverTarget(), null),
null,
trialRolloverResponse,
null,
listener
);
submitRolloverTask(rolloverRequest, source, rolloverTask);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,18 @@

package org.elasticsearch.action.admin.indices.rollover;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService.AutoShardingResult;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasAction;
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamAutoShardingEvent;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexMetadataStats;
Expand All @@ -31,6 +35,7 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexMode;
Expand All @@ -48,6 +53,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;

import static org.elasticsearch.cluster.metadata.IndexAbstraction.Type.ALIAS;
Expand All @@ -61,6 +67,7 @@
* Service responsible for handling rollover requests for write aliases and data streams
*/
public class MetadataRolloverService {
private static final Logger logger = LogManager.getLogger(MetadataRolloverService.class);
private static final Pattern INDEX_NAME_PATTERN = Pattern.compile("^.*-\\d+$");
private static final List<IndexAbstraction.Type> VALID_ROLLOVER_TARGETS = List.of(ALIAS, DATA_STREAM);

Expand Down Expand Up @@ -110,7 +117,8 @@ public RolloverResult rolloverClusterState(
Instant now,
boolean silent,
boolean onlyValidate,
@Nullable IndexMetadataStats sourceIndexStats
@Nullable IndexMetadataStats sourceIndexStats,
@Nullable AutoShardingResult autoShardingResult
) throws Exception {
validate(currentState.metadata(), rolloverTarget, newIndexName, createIndexRequest);
final IndexAbstraction indexAbstraction = currentState.metadata().getIndicesLookup().get(rolloverTarget);
Expand All @@ -134,7 +142,8 @@ public RolloverResult rolloverClusterState(
now,
silent,
onlyValidate,
sourceIndexStats
sourceIndexStats,
autoShardingResult
);
default ->
// the validate method above prevents this case
Expand Down Expand Up @@ -244,7 +253,8 @@ private RolloverResult rolloverDataStream(
Instant now,
boolean silent,
boolean onlyValidate,
@Nullable IndexMetadataStats sourceIndexStats
@Nullable IndexMetadataStats sourceIndexStats,
@Nullable AutoShardingResult autoShardingResult
) throws Exception {

if (SnapshotsService.snapshottingDataStreams(currentState, Collections.singleton(dataStream.getName())).isEmpty() == false) {
Expand Down Expand Up @@ -281,6 +291,34 @@ private RolloverResult rolloverDataStream(
return new RolloverResult(newWriteIndexName, originalWriteIndex.getName(), currentState);
}

AtomicReference<DataStreamAutoShardingEvent> newAutoShardingEvent = new AtomicReference<>();
if (autoShardingResult != null) {
// we're auto sharding on rollover
assert autoShardingResult.coolDownRemaining().equals(TimeValue.ZERO) : "the auto sharding result must be ready to apply";
logger.info("Auto sharding data stream [{}] to [{}]", dataStreamName, autoShardingResult);
Settings settingsWithAutoSharding = Settings.builder()
.put(createIndexRequest.settings())
.put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), autoShardingResult.targetNumberOfShards())
.build();
createIndexRequest.settings(settingsWithAutoSharding);
newAutoShardingEvent.set(
new DataStreamAutoShardingEvent(
dataStream.getWriteIndex().getName(),
dataStream.getGeneration(),
autoShardingResult.targetNumberOfShards(),
now.toEpochMilli()
)
);
} else if (dataStream.getAutoShardingEvent() != null) {
// we're not auto sharding on this rollover but maybe a previous rollover did so we have to use the number of shards
// configured by the previous auto sharding event
Settings settingsWithAutoSharding = Settings.builder()
.put(createIndexRequest.settings())
.put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), dataStream.getAutoShardingEvent().targetNumberOfShards())
.build();
createIndexRequest.settings(settingsWithAutoSharding);
}

var createIndexClusterStateRequest = prepareDataStreamCreateIndexRequest(
dataStreamName,
newWriteIndexName,
Expand All @@ -298,7 +336,14 @@ private RolloverResult rolloverDataStream(
silent,
(builder, indexMetadata) -> {
downgradeBrokenTsdbBackingIndices(dataStream, builder);
builder.put(dataStream.rollover(indexMetadata.getIndex(), newGeneration, metadata.isTimeSeriesTemplate(templateV2)));
builder.put(
dataStream.rollover(
indexMetadata.getIndex(),
newGeneration,
metadata.isTimeSeriesTemplate(templateV2),
newAutoShardingEvent.get()
)
);
},
rerouteCompletionIsNotRequired()
);
Expand Down
Loading

0 comments on commit f9fc212

Please sign in to comment.